Commit 05c3deaf authored by Rémi Denis-Courmont's avatar Rémi Denis-Courmont

RTP: use poll() timeout, get rid of the threaded timer and lock

This gets rid of one thread per RTP input. This will also ease would-be
support for RTCP.
parent f0879515
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include <vlc_block.h> #include <vlc_block.h>
#include <vlc_network.h> #include <vlc_network.h>
#include <limits.h>
#include <unistd.h> #include <unistd.h>
#ifdef HAVE_POLL #ifdef HAVE_POLL
# include <poll.h> # include <poll.h>
...@@ -39,189 +40,156 @@ ...@@ -39,189 +40,156 @@
# include <srtp.h> # include <srtp.h>
#endif #endif
static bool fd_dead (int fd)
{
struct pollfd ufd = { .fd = fd, };
return (poll (&ufd, 1, 0) > 0) && (ufd.revents & POLLHUP);
}
/** /**
* Gets a datagram from the network. * Processes a packet received from the RTP socket.
* @param fd datagram file descriptor
* @return a block or NULL on fatal error (socket dead)
*/ */
static block_t *rtp_dgram_recv (vlc_object_t *obj, int fd) static void rtp_process (demux_t *demux, block_t *block)
{ {
block_t *block = block_Alloc (0xffff); demux_sys_t *sys = demux->p_sys;
ssize_t len;
block_cleanup_push (block); if (block->i_buffer < 2)
do goto drop;
{ const uint8_t ptype = rtp_ptype (block);
len = net_Read (obj, fd, NULL, if (ptype >= 72 && ptype <= 76)
block->p_buffer, block->i_buffer, false); goto drop; /* Muxed RTCP, ignore for now FIXME */
if (((len <= 0) && fd_dead (fd)) || !vlc_object_alive (obj)) #ifdef HAVE_SRTP
{ /* POLLHUP -> permanent (DCCP) socket error */ if (sys->srtp != NULL)
block_Release (block); {
block = NULL; size_t len = block->i_buffer;
break; if (srtp_recv (sys->srtp, block->p_buffer, &len))
{
msg_Dbg (demux, "SRTP authentication/decryption failed");
goto drop;
} }
block->i_buffer = len;
} }
while (len == -1); #endif
vlc_cleanup_pop ();
return block ? block_Realloc (block, 0, len) : NULL; /* TODO: use SDP and get rid of this hack */
if (unlikely(sys->autodetect))
{ /* Autodetect payload type, _before_ rtp_queue() */
if (rtp_autodetect (demux, sys->session, block))
goto drop;
sys->autodetect = false;
}
rtp_queue (demux, sys->session, block);
return;
drop:
block_Release (block);
} }
static int rtp_timeout (mtime_t deadline)
{
if (deadline == VLC_TS_INVALID)
return -1; /* infinite */
mtime_t t = mdate ();
if (t >= deadline)
return 0;
t = (deadline - t) / (CLOCK_FREQ / INT64_C(1000));
if (unlikely(t > INT_MAX))
return INT_MAX;
return t;
}
/** /**
* Gets a framed RTP packet. * RTP/RTCP session thread for datagram sockets
* @param fd stream file descriptor
* @return a block or NULL in case of fatal error
*/ */
static block_t *rtp_stream_recv (vlc_object_t *obj, int fd) void *rtp_dgram_thread (void *opaque)
{ {
ssize_t len = 0; demux_t *demux = opaque;
uint8_t hdr[2]; /* frame header */ demux_sys_t *sys = demux->p_sys;
mtime_t deadline = VLC_TS_INVALID;
/* Receives the RTP frame header */ int rtp_fd = sys->fd;
do
{
ssize_t val = net_Read (obj, fd, NULL, hdr + len, 2 - len, false);
if (val <= 0)
return NULL;
len += val;
}
while (len < 2);
block_t *block = block_Alloc (GetWBE (hdr)); struct pollfd ufd[1];
ufd[0].fd = rtp_fd;
ufd[0].events = POLLIN;
/* Receives the RTP packet */ for (;;)
for (ssize_t i = 0; i < len;)
{ {
ssize_t val; int n = poll (ufd, 1, rtp_timeout (deadline));
if (n == -1)
continue;
block_cleanup_push (block); int canc = vlc_savecancel ();
val = net_Read (obj, fd, NULL, if (n == 0)
block->p_buffer + i, block->i_buffer - i, false); goto dequeue;
vlc_cleanup_pop ();
if (val <= 0) if (ufd[0].revents)
{ {
block_Release (block); n--;
return NULL; if (unlikely(ufd[0].revents & POLLHUP))
} break; /* RTP socket dead (DCCP only) */
i += val;
}
return block; block_t *block = block_Alloc (0xffff); /* TODO: p_sys->mru */
} if (unlikely(block == NULL))
break; /* we are totallly screwed */
ssize_t len = recv (rtp_fd, block->p_buffer, block->i_buffer, 0);
static block_t *rtp_recv (demux_t *demux) if (len != -1)
{
demux_sys_t *p_sys = demux->p_sys;
for (block_t *block;; block_Release (block))
{ {
block = p_sys->framed_rtp block->i_buffer = len;
? rtp_stream_recv (VLC_OBJECT (demux), p_sys->fd) rtp_process (demux, block);
: rtp_dgram_recv (VLC_OBJECT (demux), p_sys->fd);
if (block == NULL)
{
msg_Err (demux, "RTP flow stopped");
break; /* fatal error */
} }
else
if (block->i_buffer < 2)
continue;
/* FIXME */
const uint8_t ptype = rtp_ptype (block);
if (ptype >= 72 && ptype <= 76)
continue; /* Muxed RTCP, ignore for now */
#ifdef HAVE_SRTP
if (p_sys->srtp)
{
size_t len = block->i_buffer;
int canc, err;
canc = vlc_savecancel ();
err = srtp_recv (p_sys->srtp, block->p_buffer, &len);
vlc_restorecancel (canc);
if (err)
{ {
msg_Dbg (demux, "SRTP authentication/decryption failed"); msg_Warn (demux, "RTP network error: %m");
continue; block_Release (block);
} }
block->i_buffer = len;
} }
#endif
return block; /* success! */ dequeue:
if (!rtp_dequeue (demux, sys->session, &deadline))
deadline = VLC_TS_INVALID;
vlc_restorecancel (canc);
} }
return NULL; return NULL;
} }
/**
static void timer_cleanup (void *timer) * RTP/RTCP session thread for stream sockets (framed RTP)
{ */
vlc_timer_destroy ((vlc_timer_t)timer); void *rtp_stream_thread (void *opaque)
}
static void rtp_process (void *data);
void *rtp_thread (void *data)
{ {
demux_t *demux = data; #ifndef WIN32
demux_sys_t *p_sys = demux->p_sys; demux_t *demux = opaque;
bool autodetect = true; demux_sys_t *sys = demux->p_sys;
int fd = sys->fd;
if (vlc_timer_create (&p_sys->timer, rtp_process, data))
return NULL;
vlc_cleanup_push (timer_cleanup, (void *)p_sys->timer);
for (;;) for (;;)
{ {
block_t *block = rtp_recv (demux); /* There is no reordering on stream sockets, so no timeout. */
if (block == NULL) /* FIXME: hack rtp_dequeue() to skip the reordering/timer */
ssize_t val;
uint16_t frame_len;
if (recv (fd, &frame_len, 2, MSG_WAITALL) != 2)
break; break;
if (autodetect) block_t *block = block_Alloc (ntohs (frame_len));
{ /* Autodetect payload type, _before_ rtp_queue() */ if (unlikely(block == NULL))
/* No need for lock - the queue is empty. */ break;
if (rtp_autodetect (demux, p_sys->session, block))
block_cleanup_push (block);
val = recv (fd, block->p_buffer, block->i_buffer, MSG_WAITALL);
vlc_cleanup_pop ();
if (val != (ssize_t)block->i_buffer)
{ {
block_Release (block); block_Release (block);
continue; break;
}
autodetect = false;
} }
int canc = vlc_savecancel (); int canc = vlc_savecancel ();
vlc_mutex_lock (&p_sys->lock); rtp_process (demux, block);
rtp_queue (demux, p_sys->session, block);
vlc_mutex_unlock (&p_sys->lock);
vlc_restorecancel (canc); vlc_restorecancel (canc);
rtp_process (demux);
} }
vlc_cleanup_run (); #else
(void) opaque;
#endif
return NULL; return NULL;
} }
/**
* Process one RTP packet from the de-jitter queue.
*/
static void rtp_process (void *data)
{
demux_t *demux = data;
demux_sys_t *p_sys = demux->p_sys;
mtime_t deadline;
vlc_mutex_lock (&p_sys->lock);
if (rtp_dequeue (demux, p_sys->session, &deadline))
vlc_timer_schedule (p_sys->timer, true, deadline, 0);
vlc_mutex_unlock (&p_sys->lock);
}
...@@ -257,7 +257,6 @@ static int Open (vlc_object_t *obj) ...@@ -257,7 +257,6 @@ static int Open (vlc_object_t *obj)
return VLC_EGENERIC; return VLC_EGENERIC;
} }
vlc_mutex_init (&p_sys->lock);
#ifdef HAVE_SRTP #ifdef HAVE_SRTP
p_sys->srtp = NULL; p_sys->srtp = NULL;
#endif #endif
...@@ -269,7 +268,8 @@ static int Open (vlc_object_t *obj) ...@@ -269,7 +268,8 @@ static int Open (vlc_object_t *obj)
* CLOCK_FREQ; * CLOCK_FREQ;
p_sys->max_dropout = var_CreateGetInteger (obj, "rtp-max-dropout"); p_sys->max_dropout = var_CreateGetInteger (obj, "rtp-max-dropout");
p_sys->max_misorder = var_CreateGetInteger (obj, "rtp-max-misorder"); p_sys->max_misorder = var_CreateGetInteger (obj, "rtp-max-misorder");
p_sys->framed_rtp = (tp == IPPROTO_TCP); p_sys->thread_ready = false;
p_sys->autodetect = true;
demux->pf_demux = NULL; demux->pf_demux = NULL;
demux->pf_control = Control; demux->pf_control = Control;
...@@ -303,8 +303,9 @@ static int Open (vlc_object_t *obj) ...@@ -303,8 +303,9 @@ static int Open (vlc_object_t *obj)
} }
#endif #endif
if (vlc_clone (&p_sys->thread, rtp_thread, demux, if (vlc_clone (&p_sys->thread,
VLC_THREAD_PRIORITY_INPUT)) (tp != IPPROTO_TCP) ? rtp_dgram_thread : rtp_stream_thread,
demux, VLC_THREAD_PRIORITY_INPUT))
goto error; goto error;
p_sys->thread_ready = true; p_sys->thread_ready = true;
return VLC_SUCCESS; return VLC_SUCCESS;
...@@ -328,7 +329,6 @@ static void Close (vlc_object_t *obj) ...@@ -328,7 +329,6 @@ static void Close (vlc_object_t *obj)
vlc_cancel (p_sys->thread); vlc_cancel (p_sys->thread);
vlc_join (p_sys->thread, NULL); vlc_join (p_sys->thread, NULL);
} }
vlc_mutex_destroy (&p_sys->lock);
#ifdef HAVE_SRTP #ifdef HAVE_SRTP
if (p_sys->srtp) if (p_sys->srtp)
......
...@@ -54,7 +54,8 @@ void rtp_queue (demux_t *, rtp_session_t *, block_t *); ...@@ -54,7 +54,8 @@ void rtp_queue (demux_t *, rtp_session_t *, block_t *);
bool rtp_dequeue (demux_t *, const rtp_session_t *, mtime_t *); bool rtp_dequeue (demux_t *, const rtp_session_t *, mtime_t *);
int rtp_add_type (demux_t *demux, rtp_session_t *ses, const rtp_pt_t *pt); int rtp_add_type (demux_t *demux, rtp_session_t *ses, const rtp_pt_t *pt);
void *rtp_thread (void *data); void *rtp_dgram_thread (void *data);
void *rtp_stream_thread (void *data);
/* Global data */ /* Global data */
struct demux_sys_t struct demux_sys_t
...@@ -66,18 +67,13 @@ struct demux_sys_t ...@@ -66,18 +67,13 @@ struct demux_sys_t
int fd; int fd;
int rtcp_fd; int rtcp_fd;
vlc_thread_t thread; vlc_thread_t thread;
vlc_timer_t timer;
vlc_mutex_t lock;
mtime_t timeout; mtime_t timeout;
unsigned caching; unsigned caching;
uint16_t max_dropout; /**< Max packet forward misordering */ uint16_t max_dropout; /**< Max packet forward misordering */
uint16_t max_misorder; /**< Max packet backward misordering */ uint16_t max_misorder; /**< Max packet backward misordering */
uint8_t max_src; /**< Max simultaneous RTP sources */ uint8_t max_src; /**< Max simultaneous RTP sources */
bool framed_rtp; /**< Framed RTP packets over TCP */
bool thread_ready; bool thread_ready;
#if 0 bool autodetect; /**< Payload type autodetection pending */
bool dead; /**< End of stream */
#endif
}; };
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment