Commit f4348329 authored by Rémi Denis-Courmont's avatar Rémi Denis-Courmont

RTP: avoid pf_demux. Fix input thread latency

parent 06f4977c
...@@ -160,60 +160,62 @@ static block_t *rtp_recv (demux_t *demux) ...@@ -160,60 +160,62 @@ static block_t *rtp_recv (demux_t *demux)
} }
static void timer_cleanup (void *timer)
{
vlc_timer_destroy (timer);
}
static void rtp_process (void *data);
void *rtp_thread (void *data) void *rtp_thread (void *data)
{ {
demux_t *demux = data; demux_t *demux = data;
demux_sys_t *p_sys = demux->p_sys; demux_sys_t *p_sys = demux->p_sys;
bool autodetect = true; bool autodetect = true, reorder = false;
do if (vlc_timer_create (&p_sys->timer, rtp_process, data))
return NULL;
vlc_cleanup_push (timer_cleanup, &p_sys->timer);
for (;;)
{ {
block_t *block = rtp_recv (demux); block_t *block = rtp_recv (demux);
vlc_mutex_lock (&p_sys->lock);
if (block == NULL) if (block == NULL)
p_sys->dead = true; /* Fatal error: abort */ break;
else
{ if (autodetect)
if (autodetect) { /* Autodetect payload type, _before_ rtp_queue() */
{ /* Autodetect payload type, _before_ rtp_queue() */ /* No need for lock - the queue is empty. */
if (rtp_autodetect (demux, p_sys->session, block)) if (rtp_autodetect (demux, p_sys->session, block))
{ {
vlc_mutex_unlock (&p_sys->lock); block_Release (block);
block_Release (block); continue;
continue;
}
autodetect = false;
} }
rtp_queue (demux, p_sys->session, block); autodetect = false;
} }
vlc_cond_signal (&p_sys->wait);
vlc_mutex_lock (&p_sys->lock);
rtp_queue (demux, p_sys->session, block);
vlc_mutex_unlock (&p_sys->lock); vlc_mutex_unlock (&p_sys->lock);
}
while (!p_sys->dead);
rtp_process (demux);
}
vlc_cleanup_run ();
return NULL; return NULL;
} }
/** /**
* Process one RTP packet from the de-jitter queue. * Process one RTP packet from the de-jitter queue.
* @return 0 on success, -1 on EOF
*/ */
int rtp_process (demux_t *demux) static void rtp_process (void *data)
{ {
demux_t *demux = data;
demux_sys_t *p_sys = demux->p_sys; demux_sys_t *p_sys = demux->p_sys;
mtime_t deadline = INT64_MAX; mtime_t deadline;
int ret;
vlc_mutex_lock (&p_sys->lock); vlc_mutex_lock (&p_sys->lock);
if (rtp_dequeue (demux, p_sys->session, &deadline)) if (rtp_dequeue (demux, p_sys->session, &deadline))
/* Pace the demux thread */ vlc_timer_schedule (&p_sys->timer, true, deadline, 0);
vlc_cond_timedwait (&p_sys->wait, &p_sys->lock, deadline);
else
vlc_cond_wait (&p_sys->wait, &p_sys->lock);
ret = p_sys->dead ? -1 : 0;
vlc_mutex_unlock (&p_sys->lock); vlc_mutex_unlock (&p_sys->lock);
return ret;
} }
...@@ -139,7 +139,6 @@ vlc_module_end () ...@@ -139,7 +139,6 @@ vlc_module_end ()
/* /*
* Local prototypes * Local prototypes
*/ */
static int Demux (demux_t *);
static int Control (demux_t *, int i_query, va_list args); static int Control (demux_t *, int i_query, va_list args);
static int extract_port (char **phost); static int extract_port (char **phost);
...@@ -236,7 +235,6 @@ static int Open (vlc_object_t *obj) ...@@ -236,7 +235,6 @@ static int Open (vlc_object_t *obj)
} }
vlc_mutex_init (&p_sys->lock); vlc_mutex_init (&p_sys->lock);
vlc_cond_init (&p_sys->wait);
p_sys->srtp = NULL; p_sys->srtp = NULL;
p_sys->fd = fd; p_sys->fd = fd;
p_sys->rtcp_fd = rtcp_fd; p_sys->rtcp_fd = rtcp_fd;
...@@ -246,9 +244,8 @@ static int Open (vlc_object_t *obj) ...@@ -246,9 +244,8 @@ static int Open (vlc_object_t *obj)
p_sys->max_dropout = var_CreateGetInteger (obj, "rtp-max-dropout"); p_sys->max_dropout = var_CreateGetInteger (obj, "rtp-max-dropout");
p_sys->max_misorder = var_CreateGetInteger (obj, "rtp-max-misorder"); p_sys->max_misorder = var_CreateGetInteger (obj, "rtp-max-misorder");
p_sys->framed_rtp = (tp == IPPROTO_TCP); p_sys->framed_rtp = (tp == IPPROTO_TCP);
p_sys->dead = false;
demux->pf_demux = Demux; demux->pf_demux = NULL;
demux->pf_control = Control; demux->pf_control = Control;
demux->p_sys = p_sys; demux->p_sys = p_sys;
...@@ -303,7 +300,6 @@ static void Close (vlc_object_t *obj) ...@@ -303,7 +300,6 @@ static void Close (vlc_object_t *obj)
vlc_cancel (p_sys->thread); vlc_cancel (p_sys->thread);
vlc_join (p_sys->thread, NULL); vlc_join (p_sys->thread, NULL);
} }
vlc_cond_destroy (&p_sys->wait);
vlc_mutex_destroy (&p_sys->lock); vlc_mutex_destroy (&p_sys->lock);
if (p_sys->srtp) if (p_sys->srtp)
...@@ -676,11 +672,3 @@ int rtp_autodetect (demux_t *demux, rtp_session_t *session, ...@@ -676,11 +672,3 @@ int rtp_autodetect (demux_t *demux, rtp_session_t *session,
* Dynamic payload type handlers * Dynamic payload type handlers
* Hmm, none implemented yet. * Hmm, none implemented yet.
*/ */
/**
* Processing callback
*/
static int Demux (demux_t *demux)
{
return rtp_process (demux) ? 0 : 1;
}
...@@ -46,7 +46,6 @@ void rtp_queue (demux_t *, rtp_session_t *, block_t *); ...@@ -46,7 +46,6 @@ void rtp_queue (demux_t *, rtp_session_t *, block_t *);
bool rtp_dequeue (demux_t *, const rtp_session_t *, mtime_t *); bool rtp_dequeue (demux_t *, const rtp_session_t *, mtime_t *);
int rtp_add_type (demux_t *demux, rtp_session_t *ses, const rtp_pt_t *pt); int rtp_add_type (demux_t *demux, rtp_session_t *ses, const rtp_pt_t *pt);
int rtp_process (demux_t *demux);
void *rtp_thread (void *data); void *rtp_thread (void *data);
/* Global data */ /* Global data */
...@@ -57,8 +56,8 @@ struct demux_sys_t ...@@ -57,8 +56,8 @@ struct demux_sys_t
int fd; int fd;
int rtcp_fd; int rtcp_fd;
vlc_thread_t thread; vlc_thread_t thread;
vlc_timer_t timer;
vlc_mutex_t lock; vlc_mutex_t lock;
vlc_cond_t wait;
bool thread_ready; bool thread_ready;
unsigned caching; unsigned caching;
...@@ -67,6 +66,8 @@ struct demux_sys_t ...@@ -67,6 +66,8 @@ struct demux_sys_t
uint16_t max_misorder; /**< Max packet backward misordering */ uint16_t max_misorder; /**< Max packet backward misordering */
uint8_t max_src; /**< Max simultaneous RTP sources */ uint8_t max_src; /**< Max simultaneous RTP sources */
bool framed_rtp; /**< Framed RTP packets over TCP */ bool framed_rtp; /**< Framed RTP packets over TCP */
#if 0
bool dead; /**< End of stream */ bool dead; /**< End of stream */
#endif
}; };
...@@ -453,6 +453,8 @@ bool rtp_dequeue (demux_t *demux, const rtp_session_t *session, ...@@ -453,6 +453,8 @@ bool rtp_dequeue (demux_t *demux, const rtp_session_t *session,
mtime_t now = mdate (); mtime_t now = mdate ();
bool pending = false; bool pending = false;
*deadlinep = INT64_MAX;
for (unsigned i = 0, max = session->srcc; i < max; i++) for (unsigned i = 0, max = session->srcc; i < max; i++)
{ {
rtp_source_t *src = session->srcv[i]; rtp_source_t *src = session->srcv[i];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment