Commit e63f18a0 authored by Rémi Denis-Courmont's avatar Rémi Denis-Courmont

RTP: try to improve packet re-ordering

parent 40f2e517
......@@ -3,7 +3,10 @@ if HAVE_LIBGCRYPT
libvlc_LTLIBRARIES += \
librtp_plugin.la
librtp_plugin_la_SOURCES = \
rtp.c rtp.h session.c
rtp.c \
rtp.h \
input.c \
session.c
librtp_plugin_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/libs/srtp
librtp_plugin_la_LIBADD = $(AM_LIBADD) \
$(top_builddir)/libs/srtp/libvlc_srtp.la
......
/**
* @file input.c
* @brief RTP packet input
*/
/*****************************************************************************
* Copyright © 2008 Rémi Denis-Courmont
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2.0
* of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
****************************************************************************/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <vlc_common.h>
#include <vlc_demux.h>
#include <vlc_block.h>
#include <vlc_network.h>
#include <unistd.h>
#ifdef HAVE_POLL
# include <poll.h>
#endif
#include "rtp.h"
#include <srtp.h>
/**
* Gets a datagram from the network.
* @param fd datagram file descriptor
* @return a block or NULL on fatal error (socket dead)
*/
static block_t *rtp_dgram_recv (int fd)
{
block_t *block = block_Alloc (0xffff);
ssize_t len;
do
{
struct pollfd ufd = { .fd = fd, .events = POLLIN, };
block_cleanup_push (block);
poll (&ufd, 1, -1);
len = read (fd, block->p_buffer, block->i_buffer);
vlc_cleanup_pop ();
if ((len <= 0) && (ufd.revents & POLLHUP))
{ /* POLLHUP -> permanent (DCCP) socket error */
block_Release (block);
return NULL;
}
}
while (len == -1);
return block_Realloc (block, 0, len);
}
/**
* Gets a framed RTP packet.
* @param fd stream file descriptor
* @return a block or NULL in case of fatal error
*/
static block_t *rtp_stream_recv (int fd)
{
ssize_t len = 0;
struct pollfd ufd = { .fd = fd, .events = POLLIN, };
uint8_t hdr[2]; /* frame header */
/* Receives the RTP frame header */
do
{
ssize_t val;
poll (&ufd, 1, -1);
val = read (fd, hdr + len, 2 - len);
if (val <= 0)
return NULL;
len += val;
}
while (len < 2);
block_t *block = block_Alloc (GetWBE (hdr));
/* Receives the RTP packet */
for (ssize_t i = 0; i < len;)
{
ssize_t val;
block_cleanup_push (block);
poll (&ufd, 1, -1);
val = read (fd, block->p_buffer + i, block->i_buffer - i);
vlc_cleanup_pop ();
if (val <= 0)
{
block_Release (block);
return NULL;
}
i += val;
}
return block;
}
static block_t *rtp_recv (demux_t *demux)
{
demux_sys_t *p_sys = demux->p_sys;
for (block_t *block;; block_Release (block))
{
block = p_sys->framed_rtp
? rtp_stream_recv (p_sys->fd)
: rtp_dgram_recv (p_sys->fd);
if (block == NULL)
{
msg_Err (demux, "RTP flow stopped");
break; /* fatal error */
}
if (block->i_buffer < 2)
continue;
/* FIXME */
const uint8_t ptype = rtp_ptype (block);
if (ptype >= 72 && ptype <= 76)
continue; /* Muxed RTCP, ignore for now */
if (p_sys->srtp)
{
size_t len = block->i_buffer;
int canc, err;
canc = vlc_savecancel ();
err = srtp_recv (p_sys->srtp, block->p_buffer, &len);
vlc_restorecancel (canc);
if (err)
{
msg_Dbg (demux, "SRTP authentication/decryption failed");
continue;
}
block->i_buffer = len;
}
return block; /* success! */
}
return NULL;
}
void *rtp_thread (void *data)
{
demux_t *demux = data;
demux_sys_t *p_sys = demux->p_sys;
for (;;)
{
block_t *block = rtp_recv (demux);
if (block == NULL)
break; /* fatal error: abort */
vlc_mutex_lock (&p_sys->lock);
/* Autodetect payload type, _before_ rtp_queue() */
if (p_sys->autodetect)
{
if (rtp_autodetect (demux, p_sys->session, block))
{
block_Release (block);
continue;
}
p_sys->autodetect = false;
}
rtp_queue (demux, p_sys->session, block);
vlc_cond_signal (&p_sys->wait);
vlc_mutex_unlock (&p_sys->lock);
}
/* TODO: return 0 from Demux */
return NULL;
}
void rtp_process (demux_t *demux)
{
demux_sys_t *p_sys = demux->p_sys;
mtime_t deadline;
vlc_mutex_lock (&p_sys->lock);
if (rtp_dequeue (demux, p_sys->session, &deadline))
/* Pace the demux thread */
vlc_cond_timedwait (&p_sys->wait, &p_sys->lock, deadline);
else
vlc_cond_wait (&p_sys->wait, &p_sys->lock);
vlc_mutex_unlock (&p_sys->lock);
}
......@@ -31,9 +31,6 @@
#include <vlc_demux.h>
#include <vlc_aout.h>
#include <vlc_network.h>
#ifdef HAVE_POLL
# include <poll.h>
#endif
#include <vlc_plugin.h>
#include <vlc_codecs.h>
......@@ -225,6 +222,8 @@ static int Open (vlc_object_t *obj)
return VLC_EGENERIC;
}
vlc_mutex_init (&p_sys->lock);
vlc_cond_init (&p_sys->wait);
p_sys->srtp = NULL;
p_sys->fd = fd;
p_sys->rtcp_fd = rtcp_fd;
......@@ -266,6 +265,10 @@ static int Open (vlc_object_t *obj)
}
}
if (vlc_clone (&p_sys->thread, rtp_thread, demux,
VLC_THREAD_PRIORITY_INPUT))
goto error;
p_sys->thread_ready = true;
return VLC_SUCCESS;
error:
......@@ -282,6 +285,14 @@ static void Close (vlc_object_t *obj)
demux_t *demux = (demux_t *)obj;
demux_sys_t *p_sys = demux->p_sys;
if (p_sys->thread_ready)
{
vlc_cancel (p_sys->thread);
vlc_join (p_sys->thread, NULL);
}
vlc_cond_destroy (&p_sys->wait);
vlc_mutex_destroy (&p_sys->lock);
if (p_sys->srtp)
srtp_destroy (p_sys->srtp);
if (p_sys->session)
......@@ -366,81 +377,6 @@ static int Control (demux_t *demux, int i_query, va_list args)
}
/**
* Checks if a file descriptor is hung up.
*/
static bool fd_dead (int fd)
{
struct pollfd ufd = { .fd = fd, };
return (poll (&ufd, 1, 0) == 1) && (ufd.revents & POLLHUP);
}
/**
* Gets a datagram from the network, or NULL in case of fatal error.
*/
static block_t *rtp_dgram_recv (demux_t *demux, int fd)
{
block_t *block = block_Alloc (0xffff);
ssize_t len;
do
{
len = net_Read (VLC_OBJECT (demux), fd, NULL,
block->p_buffer, block->i_buffer, false);
if (((len <= 0) && fd_dead (fd))
|| !vlc_object_alive (demux))
{
block_Release (block);
return NULL;
}
}
while (len == -1);
return block_Realloc (block, 0, len);
}
/**
* Gets a framed RTP packet, or NULL in case of fatal error.
*/
static block_t *rtp_stream_recv (demux_t *demux, int fd)
{
ssize_t len = 0;
uint8_t hdr[2]; /* frame header */
/* Receives the RTP frame header */
do
{
ssize_t val = net_Read (VLC_OBJECT (demux), fd, NULL,
hdr + len, 2 - len, false);
if (val <= 0)
return NULL;
len += val;
}
while (len < 2);
block_t *block = block_Alloc (GetWBE (hdr));
/* Receives the RTP packet */
for (ssize_t i = 0; i < len;)
{
ssize_t val;
val = net_Read (VLC_OBJECT (demux), fd, NULL,
block->p_buffer + i, block->i_buffer - i, false);
if (val <= 0)
{
block_Release (block);
return NULL;
}
i += val;
}
return block;
}
/*
* Generic packet handlers
*/
......@@ -613,6 +549,75 @@ static void *ts_init (demux_t *demux)
}
/* Not using SDP, we need to guess the payload format used */
/* see http://www.iana.org/assignments/rtp-parameters */
int rtp_autodetect (demux_t *demux, rtp_session_t *session,
const block_t *block)
{
uint8_t ptype = rtp_ptype (block);
rtp_pt_t pt = {
.init = NULL,
.destroy = codec_destroy,
.decode = codec_decode,
.frequency = 0,
.number = ptype,
};
switch (ptype)
{
case 0:
msg_Dbg (demux, "detected G.711 mu-law");
pt.init = pcmu_init;
pt.frequency = 8000;
break;
case 8:
msg_Dbg (demux, "detected G.711 A-law");
pt.init = pcma_init;
pt.frequency = 8000;
break;
case 10:
msg_Dbg (demux, "detected stereo PCM");
pt.init = l16s_init;
pt.frequency = 44100;
break;
case 11:
msg_Dbg (demux, "detected mono PCM");
pt.init = l16m_init;
pt.frequency = 44100;
break;
case 14:
msg_Dbg (demux, "detected MPEG Audio");
pt.init = mpa_init;
pt.decode = mpa_decode;
pt.frequency = 90000;
break;
case 32:
msg_Dbg (demux, "detected MPEG Video");
pt.init = mpv_init;
pt.decode = mpv_decode;
pt.frequency = 90000;
break;
case 33:
msg_Dbg (demux, "detected MPEG2 TS");
pt.init = ts_init;
pt.destroy = stream_destroy;
pt.decode = stream_decode;
pt.frequency = 90000;
break;
default:
return -1;
}
rtp_add_type (demux, session, &pt);
return 0;
}
/*
* Dynamic payload type handlers
* Hmm, none implemented yet.
......@@ -623,102 +628,6 @@ static void *ts_init (demux_t *demux)
*/
static int Demux (demux_t *demux)
{
demux_sys_t *p_sys = demux->p_sys;
block_t *block;
block = p_sys->framed_rtp
? rtp_stream_recv (demux, p_sys->fd)
: rtp_dgram_recv (demux, p_sys->fd);
if (!block)
return 0;
if (block->i_buffer < 2)
goto drop;
const uint8_t ptype = block->p_buffer[1] & 0x7F;
if (ptype >= 72 && ptype <= 76)
goto drop; /* Muxed RTCP, ignore for now */
if (p_sys->srtp)
{
size_t len = block->i_buffer;
if (srtp_recv (p_sys->srtp, block->p_buffer, &len))
{
msg_Dbg (demux, "SRTP authentication/decryption failed");
goto drop;
}
block->i_buffer = len;
}
/* Not using SDP, we need to guess the payload format used */
/* see http://www.iana.org/assignments/rtp-parameters */
if (p_sys->autodetect)
{
rtp_pt_t pt = {
.init = NULL,
.destroy = codec_destroy,
.decode = codec_decode,
.frequency = 0,
.number = ptype,
};
switch (ptype)
{
case 0:
msg_Dbg (demux, "detected G.711 mu-law");
pt.init = pcmu_init;
pt.frequency = 8000;
break;
case 8:
msg_Dbg (demux, "detected G.711 A-law");
pt.init = pcma_init;
pt.frequency = 8000;
break;
case 10:
msg_Dbg (demux, "detected stereo PCM");
pt.init = l16s_init;
pt.frequency = 44100;
break;
case 11:
msg_Dbg (demux, "detected mono PCM");
pt.init = l16m_init;
pt.frequency = 44100;
break;
case 14:
msg_Dbg (demux, "detected MPEG Audio");
pt.init = mpa_init;
pt.decode = mpa_decode;
pt.frequency = 90000;
break;
case 32:
msg_Dbg (demux, "detected MPEG Video");
pt.init = mpv_init;
pt.decode = mpv_decode;
pt.frequency = 90000;
break;
case 33:
msg_Dbg (demux, "detected MPEG2 TS");
pt.init = ts_init;
pt.destroy = stream_destroy;
pt.decode = stream_decode;
pt.frequency = 90000;
break;
default:
goto drop;
}
rtp_add_type (demux, p_sys->session, &pt);
p_sys->autodetect = false;
}
rtp_receive (demux, p_sys->session, block);
return 1;
drop:
block_Release (block);
rtp_process (demux);
return 1;
}
......@@ -20,8 +20,10 @@
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
****************************************************************************/
/* RTP payload format */
typedef struct rtp_pt_t rtp_pt_t;
typedef struct rtp_session_t rtp_session_t;
/** @section RTP payload format */
struct rtp_pt_t
{
void *(*init) (demux_t *);
......@@ -30,14 +32,22 @@ struct rtp_pt_t
uint32_t frequency; /* RTP clock rate (Hz) */
uint8_t number;
};
int rtp_autodetect (demux_t *, rtp_session_t *, const block_t *);
/* RTP session */
typedef struct rtp_session_t rtp_session_t;
static inline uint8_t rtp_ptype (const block_t *block)
{
return block->p_buffer[1] & 0x7F;
}
/** @section RTP session */
rtp_session_t *rtp_session_create (demux_t *);
void rtp_session_destroy (demux_t *, rtp_session_t *);
void rtp_receive (demux_t *, rtp_session_t *, block_t *);
void rtp_queue (demux_t *, rtp_session_t *, block_t *);
bool rtp_dequeue (demux_t *, const rtp_session_t *, mtime_t *);
int rtp_add_type (demux_t *demux, rtp_session_t *ses, const rtp_pt_t *pt);
void rtp_process (demux_t *demux);
void *rtp_thread (void *data);
/* Global data */
struct demux_sys_t
......@@ -46,6 +56,10 @@ struct demux_sys_t
struct srtp_session_t *srtp;
int fd;
int rtcp_fd;
vlc_thread_t thread;
vlc_mutex_t lock;
vlc_cond_t wait;
bool thread_ready;
unsigned caching;
unsigned timeout;
......
......@@ -192,11 +192,6 @@ rtp_source_destroy (demux_t *demux, const rtp_session_t *session,
free (source);
}
static inline uint8_t rtp_ptype (const block_t *block)
{
return block->p_buffer[1] & 0x7F;
}
static inline uint16_t rtp_seq (const block_t *block)
{
assert (block->i_buffer >= 4);
......@@ -234,7 +229,7 @@ rtp_find_ptype (const rtp_session_t *session, rtp_source_t *source,
* @param block RTP packet including the RTP header
*/
void
rtp_receive (demux_t *demux, rtp_session_t *session, block_t *block)
rtp_queue (demux_t *demux, rtp_session_t *session, block_t *block)
{
demux_sys_t *p_sys = demux->p_sys;
......@@ -368,7 +363,7 @@ rtp_receive (demux_t *demux, rtp_session_t *session, block_t *block)
block->p_next = *pp;
*pp = block;
rtp_decode (demux, session, src);
/*rtp_decode (demux, session, src);*/
return;
drop:
......@@ -381,16 +376,22 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src)
{
block_t *block = src->blocks;
/* Buffer underflow? */
if (!block || !block->p_next || !block->p_next->p_next)
return;
/* TODO: use time rather than packet counts for buffer measurement */
assert (block);
src->blocks = block->p_next;
block->p_next = NULL;
/* Discontinuity detection */
if (((src->last_seq + 1) & 0xffff) != rtp_seq (block))
uint16_t delta_seq = rtp_seq (block) - (src->last_seq + 1);
if (delta_seq != 0)
{
if (delta_seq >= 0x8000)
{ /* Unrecoverable if later packets have already been dequeued */
msg_Warn (demux, "ignoring late packet (sequence: %u)",
rtp_seq (block));
goto drop;
}
block->i_flags |= BLOCK_FLAG_DISCONTINUITY;
}
src->last_seq = rtp_seq (block);
/* Match the payload type */
......@@ -408,7 +409,7 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src)
* format, a single source MUST only use payloads of a chosen frequency.
* Otherwise it would be impossible to compute consistent timestamps. */
/* FIXME: handle timestamp wrap properly */
/* TODO: sync multiple sources sanely... */
/* TODO: inter-medias/sessions sync (using RTCP-SR) */
const uint32_t timestamp = rtp_timestamp (block);
block->i_pts = UINT64_C(1) * CLOCK_FREQ * timestamp / pt->frequency;
......@@ -437,3 +438,64 @@ rtp_decode (demux_t *demux, const rtp_session_t *session, rtp_source_t *src)
drop:
block_Release (block);
}
bool rtp_dequeue (demux_t *demux, const rtp_session_t *session,
mtime_t *restrict deadlinep)
{
mtime_t now = mdate ();
bool pending = false;
for (unsigned i = 0, max = session->srcc; i < max; i++)
{
rtp_source_t *src = session->srcv[i];
block_t *block;
/* Because of IP packet delay variation (IPDV), we need to guesstimate
* how long to wait for a missing packet in the RTP sequence
* (see RFC3393 for background on IPDV).
*
* This situation occurs if a packet got lost, or if the network has
* re-ordered packets. Unfortunately, the MSL is 2 minutes, orders of
* magnitude too long for multimedia. We need a tradeoff.
* If we underestimated IPDV, we may have to discard valid but late
* packets. If we overestimate it, we will either cause too much
* delay, or worse, underflow our downstream buffers, as we wait for
* definitely a lost packets.
*
* The rest of the "de-jitter buffer" work is done by the interval
* LibVLC E/S-out clock synchronization. Here, we need to bother about
* re-ordering packets, as decoders can't cope with mis-ordered data.
*/
while (((block = src->blocks)) != NULL)
{
#if 0
if (rtp_seq (block) == ((src->last_seq + 1) & 0xffff))
{ /* Next block ready, no need to wait */
rtp_decode (demux, session, src);
continue;
}
#endif
/* Wait for 3 times the inter-arrival delay variance (about 99.7%
* match for random gaussian jitter). Additionnaly, we implicitly
* wait for misordering times the packetization time.
*/
mtime_t deadline = src->last_rx;
const rtp_pt_t *pt = rtp_find_ptype (session, src, block, NULL);
if (pt)
deadline += UINT64_C(3) * CLOCK_FREQ * src->jitter
/ pt->frequency;
if (now >= deadline)
{
rtp_decode (demux, session, src);
continue;
}
if (*deadlinep > deadline)
*deadlinep = deadline;
pending = true; /* packet pending in buffer */
break;
}
}
return pending;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment