Commit 69a87e57 authored by Christophe Massiot's avatar Christophe Massiot

* ALL: Introduce a buffering scheme to smooth packet output (see README).

parent 22ad9a93
...@@ -26,7 +26,8 @@ Current features ...@@ -26,7 +26,8 @@ Current features
- Lightweight program designed for extreme memory and CPU conditions - Lightweight program designed for extreme memory and CPU conditions
- Only one dependancy: libdvbpsi - Only one dependancy: libdvbpsi
- CAM menus (MMI) support via an external application - CAM menus (MMI) support via an external application
- The configuration file describing outputs can be reloaded without losing a single packet - The configuration file describing outputs can be reloaded without losing
a single packet
- Support for the new S2API of linux-dvb - Support for the new S2API of linux-dvb
- IPv6 network support - IPv6 network support
- UDP rather than RTP output for IPTV STBs which don't support RTP - UDP rather than RTP output for IPTV STBs which don't support RTP
...@@ -55,6 +56,11 @@ information. ...@@ -55,6 +56,11 @@ information.
Alternative inputs Alternative inputs
================== ==================
DVBlast may handle several DVB adapters in the same machine with the -a switch:
-a 3 will use /dev/dvb/adapter3. Additionally, selecting between frontends on
a single card is supported with the -n switch. This is useful for hybrid
DVB/S + DVB/T cards.
If you own a Computer Modules DVB-ASI input card, you can have DVBlast If you own a Computer Modules DVB-ASI input card, you can have DVBlast
filter and demultiplex the inputs. You just need to specify the slot number filter and demultiplex the inputs. You just need to specify the slot number
with -A. with -A.
...@@ -140,6 +146,48 @@ from the command-line : ...@@ -140,6 +146,48 @@ from the command-line :
dvblast -c /tmp/dvblast.conf dvblast -c /tmp/dvblast.conf
Buffering
=========
DVB cards usually output packets in big chunks. This can be problematic
with low bitrate multiplexes. By default, DVBlast bufferizes 200 ms
and tries to smooth the output. It may be desired to change this value
with the -L option. The appropriate value should be :
chunk_size / multiplex_bitrate
The chunk size for saa7146-based cards is 512000 bits ; for cx23885-based
cards is 192512 bits. The multiplex_bitrate depends on the symbol rate and
many other factors such as the modulation.
The current default value allows for multiplex_rates as low as 2.56 Mbi/s.
Smaller multiplexes are rare but exist, so in that case you may want to
increase the buffer size. A typical DVB multiplex is 30 or 40 Mbi/s, so
the default introduces a superfluous latency ; the buffer can be lowered
to 50 ms if latency is an issue.
DVBlast also has another parameter called "max retention time" (-E).
This controls how TS packets are grouped together in IP datagrams : the
difference between the theorical output times of the first and the last
TS packets cannot exceed the maximal retention time.
IP datagrams are normally output at the output time of the earliest TS
packet ; it implies that the next TS packets are sent too soon and must
be buffered at the receiver level. ISO/IEC 13818-1 makes no provision for
this, since IP wasn't in mind when designing TS, so in theory we risk a
buffer overflow.
However normal IP receivers feature a jitter buffer which can absorb the
overflow ; DVB recommends a 50 ms buffer. DVBlast's default maximal
retention time is just below, at 40 ms, which should be fine in most
situations.
If some anal set-top-box complains about buffer overflows or clock issues,
you may try to lower the value ; the drawback is that on low bitrate
streams it will introduce padding. People with low bitrate streams and
nice receivers with big buffers can raise this value to avoid superfluous
padding and lower the total bitrate.
Monitoring Monitoring
========== ==========
...@@ -163,11 +211,6 @@ dvblast_mmi.sh -r /tmp/dvblast.sock ...@@ -163,11 +211,6 @@ dvblast_mmi.sh -r /tmp/dvblast.sock
Advanced features Advanced features
================= =================
DVBlast may handle several DVB adapters in the same machine with the -a switch:
-a 3 will use /dev/dvb/adapter3. Additionally, selecting between frontends on
a single card is supported with the -n switch. This is useful for hybrid
DVB/S + DVB/T cards.
For better latency, run DVBlast in real-time priority: -i 1 (requires root For better latency, run DVBlast in real-time priority: -i 1 (requires root
privileges). privileges).
...@@ -183,3 +226,4 @@ unused ones, can be output. With -e, dvblast also streams EIT and SDT packets ...@@ -183,3 +226,4 @@ unused ones, can be output. With -e, dvblast also streams EIT and SDT packets
for the related services. for the related services.
Other options are self-understandable, and are listed in dvblast -h. Other options are self-understandable, and are listed in dvblast -h.
- Test and enhance the API for DVB-C, H, and ATSC support - Test and enhance the API for DVB-C, H, and ATSC support
- Win32 support - Win32 support
- Improve build system (autostuff) - Improve build system (autostuff)
- Rework the EN 50 221 machinery to avoid blocking TPDU_Recv and Send - bitstream support
...@@ -167,82 +167,86 @@ void asi_Open( void ) ...@@ -167,82 +167,86 @@ void asi_Open( void )
/***************************************************************************** /*****************************************************************************
* asi_Read : read packets from the device * asi_Read : read packets from the device
*****************************************************************************/ *****************************************************************************/
block_t *asi_Read( void ) block_t *asi_Read( mtime_t i_poll_timeout )
{ {
int i, i_len; struct pollfd pfd;
struct iovec p_iov[i_bufsize / TS_SIZE]; int i_ret;
block_t *p_ts, **pp_current = &p_ts;
for ( ; ; ) pfd.fd = i_handle;
{ pfd.events = POLLIN | POLLPRI;
struct pollfd pfd;
pfd.fd = i_handle; i_ret = poll( &pfd, 1, (i_poll_timeout + 999) / 1000 );
pfd.events = POLLIN | POLLPRI;
if ( poll(&pfd, 1, -1) < 0 ) i_wallclock = mdate();
{
if ( i_ret < 0 )
{
if( errno != EINTR )
msg_Err( NULL, "couldn't poll from device " ASI_DEVICE " (%s)", msg_Err( NULL, "couldn't poll from device " ASI_DEVICE " (%s)",
i_asi_adapter, strerror(errno) ); i_asi_adapter, strerror(errno) );
continue; return NULL;
} }
if ( (pfd.revents & POLLPRI) ) if ( (pfd.revents & POLLPRI) )
{
unsigned int i_val;
if ( ioctl(i_handle, ASI_IOC_RXGETEVENTS, &i_val) < 0 )
msg_Err( NULL, "couldn't RXGETEVENTS (%s)", strerror(errno) );
else
{ {
unsigned int i_val; if ( i_val & ASI_EVENT_RX_BUFFER )
msg_Warn( NULL, "driver receive buffer queue overrun" );
if ( ioctl(i_handle, ASI_IOC_RXGETEVENTS, &i_val) < 0 ) if ( i_val & ASI_EVENT_RX_FIFO )
msg_Err( NULL, "couldn't RXGETEVENTS (%s)", strerror(errno) ); msg_Warn( NULL, "onboard receive FIFO overrun" );
else if ( i_val & ASI_EVENT_RX_CARRIER )
{ msg_Warn( NULL, "carrier status change" );
if ( i_val & ASI_EVENT_RX_BUFFER ) if ( i_val & ASI_EVENT_RX_LOS )
msg_Warn( NULL, "driver receive buffer queue overrun" ); msg_Warn( NULL, "loss of packet synchronization" );
if ( i_val & ASI_EVENT_RX_FIFO ) if ( i_val & ASI_EVENT_RX_AOS )
msg_Warn( NULL, "onboard receive FIFO overrun" ); msg_Warn( NULL, "acquisition of packet synchronization" );
if ( i_val & ASI_EVENT_RX_CARRIER ) if ( i_val & ASI_EVENT_RX_DATA )
msg_Warn( NULL, "carrier status change" ); msg_Warn( NULL, "receive data status change" );
if ( i_val & ASI_EVENT_RX_LOS )
msg_Warn( NULL, "loss of packet synchronization" );
if ( i_val & ASI_EVENT_RX_AOS )
msg_Warn( NULL, "acquisition of packet synchronization" );
if ( i_val & ASI_EVENT_RX_DATA )
msg_Warn( NULL, "receive data status change" );
}
} }
if ( (pfd.revents & POLLIN) )
break;
} }
for ( i = 0; i < i_bufsize / TS_SIZE; i++ ) if ( (pfd.revents & POLLIN) )
{ {
*pp_current = block_New(); struct iovec p_iov[i_bufsize / TS_SIZE];
p_iov[i].iov_base = (*pp_current)->p_ts; block_t *p_ts, **pp_current = &p_ts;
p_iov[i].iov_len = TS_SIZE; int i, i_len;
pp_current = &(*pp_current)->p_next;
}
if ( (i_len = readv(i_handle, p_iov, i_bufsize / TS_SIZE)) < 0 ) for ( i = 0; i < i_bufsize / TS_SIZE; i++ )
{ {
msg_Err( NULL, "couldn't read from device " ASI_DEVICE " (%s)", *pp_current = block_New();
i_asi_adapter, strerror(errno) ); p_iov[i].iov_base = (*pp_current)->p_ts;
i_len = 0; p_iov[i].iov_len = TS_SIZE;
} pp_current = &(*pp_current)->p_next;
i_len /= TS_SIZE; }
pp_current = &p_ts; if ( (i_len = readv(i_handle, p_iov, i_bufsize / TS_SIZE)) < 0 )
while ( i_len && *pp_current ) {
{ msg_Err( NULL, "couldn't read from device " ASI_DEVICE " (%s)",
pp_current = &(*pp_current)->p_next; i_asi_adapter, strerror(errno) );
i_len--; i_len = 0;
} }
i_len /= TS_SIZE;
pp_current = &p_ts;
while ( i_len && *pp_current )
{
pp_current = &(*pp_current)->p_next;
i_len--;
}
if ( *pp_current ) if ( *pp_current )
msg_Dbg( NULL, "partial buffer received" ); msg_Dbg( NULL, "partial buffer received" );
block_DeleteChain( *pp_current ); block_DeleteChain( *pp_current );
*pp_current = NULL; *pp_current = NULL;
return p_ts; return p_ts;
}
return NULL;
} }
/***************************************************************************** /*****************************************************************************
...@@ -251,7 +255,7 @@ block_t *asi_Read( void ) ...@@ -251,7 +255,7 @@ block_t *asi_Read( void )
int asi_SetFilter( uint16_t i_pid ) int asi_SetFilter( uint16_t i_pid )
{ {
#ifdef USE_HARDWARE_FILTERING #ifdef USE_HARDWARE_FILTERING
p_pid_filter[ i_pid / 8 ] |= (0x01 << (i_pid % 8)); p_pid_filter[ i_pid / 8 ] |= (0x01 << (i_pid % 8));
if ( ioctl( i_handle, ASI_IOC_RXSETPF, p_pid_filter ) < 0 ) if ( ioctl( i_handle, ASI_IOC_RXSETPF, p_pid_filter ) < 0 )
msg_Warn( NULL, "couldn't add filter on PID %u", i_pid ); msg_Warn( NULL, "couldn't add filter on PID %u", i_pid );
......
/***************************************************************************** /*****************************************************************************
* demux.c * demux.c
***************************************************************************** *****************************************************************************
* Copyright (C) 2004, 2008-2009 VideoLAN * Copyright (C) 2004, 2008-2010 VideoLAN
* $Id$ * $Id$
* *
* Authors: Christophe Massiot <massiot@via.ecp.fr> * Authors: Christophe Massiot <massiot@via.ecp.fr>
...@@ -76,6 +76,7 @@ static dvbpsi_handle p_sdt_dvbpsi_handle; ...@@ -76,6 +76,7 @@ static dvbpsi_handle p_sdt_dvbpsi_handle;
static dvbpsi_handle p_eit_dvbpsi_handle; static dvbpsi_handle p_eit_dvbpsi_handle;
static dvbpsi_pat_t *p_current_pat = NULL; static dvbpsi_pat_t *p_current_pat = NULL;
static dvbpsi_sdt_t *p_current_sdt = NULL; static dvbpsi_sdt_t *p_current_sdt = NULL;
static mtime_t i_last_dts = -1;
static int i_demux_fd; static int i_demux_fd;
static int i_nb_errors = 0; static int i_nb_errors = 0;
static mtime_t i_last_error = 0; static mtime_t i_last_error = 0;
...@@ -84,6 +85,7 @@ static mtime_t i_last_error = 0; ...@@ -84,6 +85,7 @@ static mtime_t i_last_error = 0;
* Local prototypes * Local prototypes
*****************************************************************************/ *****************************************************************************/
static void demux_Handle( block_t *p_ts ); static void demux_Handle( block_t *p_ts );
static void SetDTS( block_t *p_list );
static void SetPID( uint16_t i_pid ); static void SetPID( uint16_t i_pid );
static void UnsetPID( uint16_t i_pid ); static void UnsetPID( uint16_t i_pid );
static void StartPID( output_t *p_output, uint16_t i_pid ); static void StartPID( output_t *p_output, uint16_t i_pid );
...@@ -98,9 +100,9 @@ static void GetPIDS( uint16_t **ppi_wanted_pids, int *pi_nb_wanted_pids, ...@@ -98,9 +100,9 @@ static void GetPIDS( uint16_t **ppi_wanted_pids, int *pi_nb_wanted_pids,
static int SIDIsSelected( uint16_t i_sid ); static int SIDIsSelected( uint16_t i_sid );
int PIDWouldBeSelected( dvbpsi_pmt_es_t *p_es ); int PIDWouldBeSelected( dvbpsi_pmt_es_t *p_es );
static int PMTNeedsDescrambling( dvbpsi_pmt_t *p_pmt ); static int PMTNeedsDescrambling( dvbpsi_pmt_t *p_pmt );
static void SendPAT( void ); static void SendPAT( mtime_t i_dts );
static void SendSDT( void ); static void SendSDT( mtime_t i_dts );
static void SendPMT( sid_t *p_sid ); static void SendPMT( sid_t *p_sid, mtime_t i_dts );
static void NewPAT( output_t *p_output ); static void NewPAT( output_t *p_output );
static void NewSDT( output_t *p_output ); static void NewSDT( output_t *p_output );
static void NewPMT( output_t *p_output ); static void NewPMT( output_t *p_output );
...@@ -148,9 +150,9 @@ void demux_Open( void ) ...@@ -148,9 +150,9 @@ void demux_Open( void )
/***************************************************************************** /*****************************************************************************
* demux_Run * demux_Run
*****************************************************************************/ *****************************************************************************/
void demux_Run( void ) void demux_Run( block_t *p_ts )
{ {
block_t *p_ts = pf_Read(); SetDTS( p_ts );
while ( p_ts != NULL ) while ( p_ts != NULL )
{ {
...@@ -207,7 +209,7 @@ static void demux_Handle( block_t *p_ts ) ...@@ -207,7 +209,7 @@ static void demux_Handle( block_t *p_ts )
{ {
dvbpsi_PushPacket( p_pat_dvbpsi_handle, p_ts->p_ts ); dvbpsi_PushPacket( p_pat_dvbpsi_handle, p_ts->p_ts );
if ( block_UnitStart( p_ts ) ) if ( block_UnitStart( p_ts ) )
SendPAT(); SendPAT( p_ts->i_dts );
} }
else if ( b_enable_epg && i_pid == EIT_PID ) else if ( b_enable_epg && i_pid == EIT_PID )
{ {
...@@ -217,11 +219,11 @@ static void demux_Handle( block_t *p_ts ) ...@@ -217,11 +219,11 @@ static void demux_Handle( block_t *p_ts )
{ {
dvbpsi_PushPacket( p_sdt_dvbpsi_handle, p_ts->p_ts ); dvbpsi_PushPacket( p_sdt_dvbpsi_handle, p_ts->p_ts );
if ( block_UnitStart( p_ts ) ) if ( block_UnitStart( p_ts ) )
{ {
uint8_t *p_payload = block_GetPayload( p_ts ); uint8_t *p_payload = block_GetPayload( p_ts );
if ( p_payload && *(p_payload + *p_payload + 1) == 0x42 ) if ( p_payload && *(p_payload + *p_payload + 1) == 0x42 )
SendSDT(); SendSDT( p_ts->i_dts );
} }
} }
...@@ -242,7 +244,7 @@ static void demux_Handle( block_t *p_ts ) ...@@ -242,7 +244,7 @@ static void demux_Handle( block_t *p_ts )
dvbpsi_PushPacket( pp_sids[i]->p_dvbpsi_handle, dvbpsi_PushPacket( pp_sids[i]->p_dvbpsi_handle,
p_ts->p_ts ); p_ts->p_ts );
if ( block_UnitStart( p_ts ) ) if ( block_UnitStart( p_ts ) )
SendPMT( pp_sids[i] ); SendPMT( pp_sids[i], p_ts->i_dts );
} }
} }
} }
...@@ -265,7 +267,7 @@ static void demux_Handle( block_t *p_ts ) ...@@ -265,7 +267,7 @@ static void demux_Handle( block_t *p_ts )
if ( pp_outputs[j]->i_sid == i_sid ) if ( pp_outputs[j]->i_sid == i_sid )
{ {
pp_outputs[j]->i_ref_timestamp = i_timestamp; pp_outputs[j]->i_ref_timestamp = i_timestamp;
pp_outputs[j]->i_ref_wallclock = 0; pp_outputs[j]->i_ref_wallclock = p_ts->i_dts;
} }
} }
} }
...@@ -430,7 +432,7 @@ void demux_Change( output_t *p_output, uint16_t i_sid, ...@@ -430,7 +432,7 @@ void demux_Change( output_t *p_output, uint16_t i_sid,
{ {
if ( pp_sids[i]->i_sid == i_sid ) if ( pp_sids[i]->i_sid == i_sid )
{ {
if ( pp_sids[i]->p_current_pmt != NULL if ( pp_sids[i]->p_current_pmt != NULL
&& PMTNeedsDescrambling( pp_sids[i]->p_current_pmt ) ) && PMTNeedsDescrambling( pp_sids[i]->p_current_pmt ) )
en50221_UpdatePMT( pp_sids[i]->p_current_pmt ); en50221_UpdatePMT( pp_sids[i]->p_current_pmt );
break; break;
...@@ -454,6 +456,40 @@ void demux_Change( output_t *p_output, uint16_t i_sid, ...@@ -454,6 +456,40 @@ void demux_Change( output_t *p_output, uint16_t i_sid,
NewPMT( p_output ); NewPMT( p_output );
} }
/*****************************************************************************
* SetDTS
*****************************************************************************/
static void SetDTS( block_t *p_list )
{
int i_nb_ts = 0, i;
mtime_t i_duration;
block_t *p_ts = p_list;
while ( p_ts != NULL )
{
i_nb_ts++;
p_ts = p_ts->p_next;
}
/* We suppose the stream is CBR, at least between two consecutive read().
* This is especially true in budget mode */
if ( i_last_dts == -1 )
i_duration = 0;
else
i_duration = i_wallclock - i_last_dts;
p_ts = p_list;
i = i_nb_ts - 1;
while ( p_ts != NULL )
{
p_ts->i_dts = i_wallclock - i_duration * i / i_nb_ts;
i--;
p_ts = p_ts->p_next;
}
i_last_dts = i_wallclock;
}
/***************************************************************************** /*****************************************************************************
* SetPID/UnsetPID * SetPID/UnsetPID
*****************************************************************************/ *****************************************************************************/
...@@ -630,7 +666,8 @@ static void GetPIDS( uint16_t **ppi_wanted_pids, int *pi_nb_wanted_pids, ...@@ -630,7 +666,8 @@ static void GetPIDS( uint16_t **ppi_wanted_pids, int *pi_nb_wanted_pids,
* WritePSISection * WritePSISection
*****************************************************************************/ *****************************************************************************/
static block_t *WritePSISection( dvbpsi_psi_section_t *p_section, static block_t *WritePSISection( dvbpsi_psi_section_t *p_section,
uint16_t i_pid, uint8_t *pi_cc ) uint16_t i_pid, uint8_t *pi_cc,
mtime_t i_dts )
{ {
block_t *p_block, **pp_last = &p_block; block_t *p_block, **pp_last = &p_block;
uint32_t i_length; uint32_t i_length;
...@@ -678,6 +715,8 @@ static block_t *WritePSISection( dvbpsi_psi_section_t *p_section, ...@@ -678,6 +715,8 @@ static block_t *WritePSISection( dvbpsi_psi_section_t *p_section,
for( i = 4 + b_first + i_copy; i < 188; i++ ) for( i = 4 + b_first + i_copy; i < 188; i++ )
p_ts->p_ts[i] = 0xff; p_ts->p_ts[i] = 0xff;
p_ts->i_dts = i_dts;
b_first = 0; b_first = 0;
i_length -= i_copy; i_length -= i_copy;
p_data += i_copy; p_data += i_copy;
...@@ -690,7 +729,7 @@ static block_t *WritePSISection( dvbpsi_psi_section_t *p_section, ...@@ -690,7 +729,7 @@ static block_t *WritePSISection( dvbpsi_psi_section_t *p_section,
/***************************************************************************** /*****************************************************************************
* SendPAT * SendPAT
*****************************************************************************/ *****************************************************************************/
static void SendPAT( void ) static void SendPAT( mtime_t i_dts )
{ {
int i; int i;
...@@ -720,7 +759,7 @@ static void SendPAT( void ) ...@@ -720,7 +759,7 @@ static void SendPAT( void )
block_t *p_block; block_t *p_block;
p_block = WritePSISection( pp_outputs[i]->p_pat_section, PAT_PID, p_block = WritePSISection( pp_outputs[i]->p_pat_section, PAT_PID,
&pp_outputs[i]->i_pat_cc ); &pp_outputs[i]->i_pat_cc, i_dts );
while ( p_block != NULL ) while ( p_block != NULL )
{ {
block_t *p_next = p_block->p_next; block_t *p_next = p_block->p_next;
...@@ -735,7 +774,7 @@ static void SendPAT( void ) ...@@ -735,7 +774,7 @@ static void SendPAT( void )
/***************************************************************************** /*****************************************************************************
* SendSDT * SendSDT
*****************************************************************************/ *****************************************************************************/
static void SendSDT( void ) static void SendSDT( mtime_t i_dts )
{ {
int i; int i;
...@@ -746,7 +785,7 @@ static void SendSDT( void ) ...@@ -746,7 +785,7 @@ static void SendSDT( void )
block_t *p_block; block_t *p_block;
p_block = WritePSISection( pp_outputs[i]->p_sdt_section, SDT_PID, p_block = WritePSISection( pp_outputs[i]->p_sdt_section, SDT_PID,
&pp_outputs[i]->i_sdt_cc ); &pp_outputs[i]->i_sdt_cc, i_dts );
while ( p_block != NULL ) while ( p_block != NULL )
{ {
block_t *p_next = p_block->p_next; block_t *p_next = p_block->p_next;
...@@ -761,7 +800,7 @@ static void SendSDT( void ) ...@@ -761,7 +800,7 @@ static void SendSDT( void )
/***************************************************************************** /*****************************************************************************
* SendPMT * SendPMT
*****************************************************************************/ *****************************************************************************/
static void SendPMT( sid_t *p_sid ) static void SendPMT( sid_t *p_sid, mtime_t i_dts )
{ {
int i; int i;
...@@ -775,7 +814,7 @@ static void SendPMT( sid_t *p_sid ) ...@@ -775,7 +814,7 @@ static void SendPMT( sid_t *p_sid )
p_block = WritePSISection( pp_outputs[i]->p_pmt_section, p_block = WritePSISection( pp_outputs[i]->p_pmt_section,
p_sid->i_pmt_pid, p_sid->i_pmt_pid,
&pp_outputs[i]->i_pmt_cc ); &pp_outputs[i]->i_pmt_cc, i_dts );
while ( p_block != NULL ) while ( p_block != NULL )
{ {
block_t *p_next = p_block->p_next; block_t *p_next = p_block->p_next;
...@@ -812,7 +851,7 @@ static void SendEIT( dvbpsi_psi_section_t *p_section, uint16_t i_sid, ...@@ -812,7 +851,7 @@ static void SendEIT( dvbpsi_psi_section_t *p_section, uint16_t i_sid,
p_block = WritePSISection( p_section, p_block = WritePSISection( p_section,
EIT_PID, EIT_PID,
&pp_outputs[i]->i_eit_cc ); &pp_outputs[i]->i_eit_cc, i_wallclock );
while ( p_block != NULL ) while ( p_block != NULL )
{ {
block_t *p_next = p_block->p_next; block_t *p_next = p_block->p_next;
......
/***************************************************************************** /*****************************************************************************
* dvb.c: linux-dvb input for DVBlast * dvb.c: linux-dvb input for DVBlast
***************************************************************************** *****************************************************************************
* Copyright (C) 2008-2009 VideoLAN * Copyright (C) 2008-2010 VideoLAN
* $Id$ * $Id$
* *
* Authors: Christophe Massiot <massiot@via.ecp.fr> * Authors: Christophe Massiot <massiot@via.ecp.fr>
...@@ -119,11 +119,10 @@ void dvb_Reset( void ) ...@@ -119,11 +119,10 @@ void dvb_Reset( void )
/***************************************************************************** /*****************************************************************************
* dvb_Read * dvb_Read
*****************************************************************************/ *****************************************************************************/
block_t *dvb_Read( void ) block_t *dvb_Read( mtime_t i_poll_timeout )
{ {
struct pollfd ufds[4]; struct pollfd ufds[4];
int i_ret, i_nb_fd = 2; int i_ret, i_nb_fd = 2;
mtime_t i_wallclock;
block_t *p_blocks = NULL; block_t *p_blocks = NULL;
memset( ufds, 0, sizeof(ufds) ); memset( ufds, 0, sizeof(ufds) );
...@@ -144,7 +143,9 @@ block_t *dvb_Read( void ) ...@@ -144,7 +143,9 @@ block_t *dvb_Read( void )
i_nb_fd++; i_nb_fd++;
} }
i_ret = poll( ufds, i_nb_fd, 100 ); i_ret = poll( ufds, i_nb_fd, (i_poll_timeout + 999) / 1000 );
i_wallclock = mdate();
if ( i_ret < 0 ) if ( i_ret < 0 )
{ {
...@@ -156,7 +157,6 @@ block_t *dvb_Read( void ) ...@@ -156,7 +157,6 @@ block_t *dvb_Read( void )
if ( ufds[0].revents ) if ( ufds[0].revents )
p_blocks = DVRRead(); p_blocks = DVRRead();
i_wallclock = mdate();
if ( i_ca_handle && i_ca_type == CA_CI_LINK ) if ( i_ca_handle && i_ca_type == CA_CI_LINK )
{ {
if ( ufds[i_nb_fd - 1].revents ) if ( ufds[i_nb_fd - 1].revents )
...@@ -353,7 +353,7 @@ static void FrontendPoll( void ) ...@@ -353,7 +353,7 @@ static void FrontendPoll( void )
else else
{ {
msg_Dbg( NULL, "frontend has lost lock" ); msg_Dbg( NULL, "frontend has lost lock" );
i_frontend_timeout = mdate() + FRONTEND_LOCK_TIMEOUT; i_frontend_timeout = i_wallclock + FRONTEND_LOCK_TIMEOUT;
} }
IF_UP( FE_REINIT ) IF_UP( FE_REINIT )
...@@ -802,7 +802,7 @@ static void FrontendSet( bool b_init ) ...@@ -802,7 +802,7 @@ static void FrontendSet( bool b_init )
} }
i_last_status = 0; i_last_status = 0;
i_frontend_timeout = mdate() + FRONTEND_LOCK_TIMEOUT; i_frontend_timeout = i_wallclock + FRONTEND_LOCK_TIMEOUT;
} }
#else /* !S2API */ #else /* !S2API */
...@@ -896,7 +896,7 @@ static void FrontendSet( bool b_init ) ...@@ -896,7 +896,7 @@ static void FrontendSet( bool b_init )
} }
i_last_status = 0; i_last_status = 0;
i_frontend_timeout = mdate() + FRONTEND_LOCK_TIMEOUT; i_frontend_timeout = i_wallclock + FRONTEND_LOCK_TIMEOUT;
} }
#endif /* S2API */ #endif /* S2API */
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
/***************************************************************************** /*****************************************************************************
* Local declarations * Local declarations
*****************************************************************************/ *****************************************************************************/
mtime_t i_wallclock = 0;
output_t **pp_outputs = NULL; output_t **pp_outputs = NULL;
int i_nb_outputs = 0; int i_nb_outputs = 0;
output_t output_dup = { 0 }; output_t output_dup = { 0 };
...@@ -65,6 +66,8 @@ int b_slow_cam = 0; ...@@ -65,6 +66,8 @@ int b_slow_cam = 0;
int b_output_udp = 0; int b_output_udp = 0;
int b_enable_epg = 0; int b_enable_epg = 0;
int b_unique_tsid = 0; int b_unique_tsid = 0;
mtime_t i_output_latency = DEFAULT_OUTPUT_LATENCY;
mtime_t i_max_retention = DEFAULT_MAX_RETENTION;
volatile int b_hup_received = 0; volatile int b_hup_received = 0;
int i_verbose = DEFAULT_VERBOSITY; int i_verbose = DEFAULT_VERBOSITY;
int i_syslog = 0; int i_syslog = 0;
...@@ -74,7 +77,7 @@ int b_src_rawudp = 0; ...@@ -74,7 +77,7 @@ int b_src_rawudp = 0;
int i_asi_adapter = 0; int i_asi_adapter = 0;
void (*pf_Open)( void ) = NULL; void (*pf_Open)( void ) = NULL;
block_t * (*pf_Read)( void ) = NULL; block_t * (*pf_Read)( mtime_t i_poll_timeout ) = NULL;
int (*pf_SetFilter)( uint16_t i_pid ) = NULL; int (*pf_SetFilter)( uint16_t i_pid ) = NULL;
void (*pf_UnsetFilter)( int i_fd, uint16_t i_pid ) = NULL; void (*pf_UnsetFilter)( int i_fd, uint16_t i_pid ) = NULL;
...@@ -315,7 +318,7 @@ static void DisplayVersion() ...@@ -315,7 +318,7 @@ static void DisplayVersion()
*****************************************************************************/ *****************************************************************************/
void usage() void usage()
{ {
msg_Raw( NULL, "Usage: dvblast [-q] [-c <config file>] [-r <remote socket>] [-t <ttl>] [-o <SSRC IP>] [-i <RT priority>] [-a <adapter>] [-n <frontend number>] [-S <diseqc>] [-f <frequency>|-D <src mcast>:<port>|-A <ASI adapter>] [-F <fec inner>] [-R <rolloff>] [-s <symbol rate>] [-v <0|13|18>] [-p] [-b <bandwidth>] [-m <modulation] [-u] [-W] [-U] [-d <dest IP:port>] [-e] [-T]" ); msg_Raw( NULL, "Usage: dvblast [-q] [-c <config file>] [-r <remote socket>] [-t <ttl>] [-o <SSRC IP>] [-i <RT priority>] [-a <adapter>] [-n <frontend number>] [-S <diseqc>] [-f <frequency>|-D <src mcast>:<port>|-A <ASI adapter>] [-F <fec inner>] [-R <rolloff>] [-s <symbol rate>] [-v <0|13|18>] [-p] [-b <bandwidth>] [-m <modulation] [-u] [-W] [-U] [-L <latency>] [-E <retention>] [-d <dest IP:port>] [-e] [-T]" );
msg_Raw( NULL, "Input:" ); msg_Raw( NULL, "Input:" );
msg_Raw( NULL, " -a --adapter <adapter>" ); msg_Raw( NULL, " -a --adapter <adapter>" );
...@@ -343,6 +346,8 @@ void usage() ...@@ -343,6 +346,8 @@ void usage()
msg_Raw( NULL, "Output:" ); msg_Raw( NULL, "Output:" );
msg_Raw( NULL, " -c --config-file <config file>" ); msg_Raw( NULL, " -c --config-file <config file>" );
msg_Raw( NULL, " -L --latency maximum latency allowed between input and output (default: 100 ms)" );
msg_Raw( NULL, " -E --retention maximum retention allowed between input and output (default: 40 ms)" );
msg_Raw( NULL, " -d --duplicate duplicate all received packets to a given destination" ); msg_Raw( NULL, " -d --duplicate duplicate all received packets to a given destination" );
msg_Raw( NULL, " -o --rtp-output <SSRC IP>" ); msg_Raw( NULL, " -o --rtp-output <SSRC IP>" );
msg_Raw( NULL, " -t --ttl <ttl> TTL of the output stream" ); msg_Raw( NULL, " -t --ttl <ttl> TTL of the output stream" );
...@@ -360,6 +365,7 @@ void usage() ...@@ -360,6 +365,7 @@ void usage()
int main( int i_argc, char **pp_argv ) int main( int i_argc, char **pp_argv )
{ {
mtime_t i_poll_timeout = MAX_POLL_TIMEOUT;
struct sched_param param; struct sched_param param;
int i_error; int i_error;
int c; int c;
...@@ -393,6 +399,8 @@ int main( int i_argc, char **pp_argv ) ...@@ -393,6 +399,8 @@ int main( int i_argc, char **pp_argv )
{ "slow-cam", no_argument, NULL, 'W' }, { "slow-cam", no_argument, NULL, 'W' },
{ "udp", no_argument, NULL, 'U' }, { "udp", no_argument, NULL, 'U' },
{ "unique-ts-id", no_argument, NULL, 'T' }, { "unique-ts-id", no_argument, NULL, 'T' },
{ "latency", required_argument, NULL, 'L' },
{ "retention", required_argument, NULL, 'E' },
{ "duplicate", required_argument, NULL, 'd' }, { "duplicate", required_argument, NULL, 'd' },
{ "rtp-input", required_argument, NULL, 'D' }, { "rtp-input", required_argument, NULL, 'D' },
{ "asi-adapter", required_argument, NULL, 'A' }, { "asi-adapter", required_argument, NULL, 'A' },
...@@ -401,9 +409,9 @@ int main( int i_argc, char **pp_argv ) ...@@ -401,9 +409,9 @@ int main( int i_argc, char **pp_argv )
{ "help", no_argument, NULL, 'h' }, { "help", no_argument, NULL, 'h' },
{ "version", no_argument, NULL, 'V' }, { "version", no_argument, NULL, 'V' },
{ 0, 0, 0, 0} { 0, 0, 0, 0}
}; };
while ( ( c = getopt_long(i_argc, pp_argv, "q::c:r:t:o:i:a:n:f:F:R:s:S:v:pb:m:uWUTd:D:A:lehV", long_options, NULL)) != -1 ) while ( ( c = getopt_long(i_argc, pp_argv, "q::c:r:t:o:i:a:n:f:F:R:s:S:v:pb:m:uWUTL:E:d:D:A:lehV", long_options, NULL)) != -1 )
{ {
switch ( c ) switch ( c )
{ {
...@@ -522,6 +530,14 @@ int main( int i_argc, char **pp_argv ) ...@@ -522,6 +530,14 @@ int main( int i_argc, char **pp_argv )
b_output_udp = 1; b_output_udp = 1;
break; break;
case 'L':
i_output_latency = strtoll( optarg, NULL, 0 ) * 1000;
break;
case 'E':
i_max_retention = strtoll( optarg, NULL, 0 ) * 1000;
break;
case 'd': case 'd':
{ {
char *psz_token, *psz_displayname; char *psz_token, *psz_displayname;
...@@ -720,6 +736,8 @@ int main( int i_argc, char **pp_argv ) ...@@ -720,6 +736,8 @@ int main( int i_argc, char **pp_argv )
for ( ; ; ) for ( ; ; )
{ {
block_t *p_ts;
if ( b_hup_received ) if ( b_hup_received )
{ {
b_hup_received = 0; b_hup_received = 0;
...@@ -727,7 +745,12 @@ int main( int i_argc, char **pp_argv ) ...@@ -727,7 +745,12 @@ int main( int i_argc, char **pp_argv )
ReadConfiguration( psz_conf_file ); ReadConfiguration( psz_conf_file );
} }
demux_Run(); p_ts = pf_Read( i_poll_timeout );
if ( p_ts != NULL )
demux_Run( p_ts );
i_poll_timeout = output_Send();
if ( i_poll_timeout == -1 || i_poll_timeout > MAX_POLL_TIMEOUT )
i_poll_timeout = MAX_POLL_TIMEOUT;
} }
if ( b_enable_syslog ) if ( b_enable_syslog )
......
...@@ -39,6 +39,9 @@ ...@@ -39,6 +39,9 @@
#define WATCHDOG_WAIT 10000000LL #define WATCHDOG_WAIT 10000000LL
#define MAX_ERRORS 1000 #define MAX_ERRORS 1000
#define DEFAULT_VERBOSITY 3 #define DEFAULT_VERBOSITY 3
#define MAX_POLL_TIMEOUT 100000 /* 100 ms */
#define DEFAULT_OUTPUT_LATENCY 200000 /* 200 ms */
#define DEFAULT_MAX_RETENTION 40000 /* 40 ms */
/***************************************************************************** /*****************************************************************************
* Output configuration flags (for output_t -> i_config) - bit values * Output configuration flags (for output_t -> i_config) - bit values
...@@ -61,9 +64,12 @@ typedef struct block_t ...@@ -61,9 +64,12 @@ typedef struct block_t
{ {
uint8_t p_ts[TS_SIZE]; uint8_t p_ts[TS_SIZE];
int i_refcount; int i_refcount;
mtime_t i_dts;
struct block_t *p_next; struct block_t *p_next;
} block_t; } block_t;
typedef struct packet_t packet_t;
typedef struct output_t typedef struct output_t
{ {
/* address information, protocol agnostic */ /* address information, protocol agnostic */
...@@ -75,8 +81,7 @@ typedef struct output_t ...@@ -75,8 +81,7 @@ typedef struct output_t
/* output */ /* output */
int i_handle; int i_handle;
block_t *pp_blocks[NB_BLOCKS]; packet_t *p_packets, *p_last_packet;
int i_depth;
uint16_t i_cc; uint16_t i_cc;
mtime_t i_ref_timestamp; mtime_t i_ref_timestamp;
mtime_t i_ref_wallclock; mtime_t i_ref_wallclock;
...@@ -124,6 +129,9 @@ extern int b_slow_cam; ...@@ -124,6 +129,9 @@ extern int b_slow_cam;
extern int b_output_udp; extern int b_output_udp;
extern int b_enable_epg; extern int b_enable_epg;
extern int b_unique_tsid; extern int b_unique_tsid;
extern mtime_t i_output_latency;
extern mtime_t i_max_retention;
extern mtime_t i_wallclock;
extern volatile int b_hup_received; extern volatile int b_hup_received;
extern int i_comm_fd; extern int i_comm_fd;
extern uint16_t i_src_port; extern uint16_t i_src_port;
...@@ -132,7 +140,7 @@ extern int b_src_rawudp; ...@@ -132,7 +140,7 @@ extern int b_src_rawudp;
extern int i_asi_adapter; extern int i_asi_adapter;
extern void (*pf_Open)( void ); extern void (*pf_Open)( void );
extern block_t * (*pf_Read)( void ); extern block_t * (*pf_Read)( mtime_t i_poll_timeout );
extern int (*pf_SetFilter)( uint16_t i_pid ); extern int (*pf_SetFilter)( uint16_t i_pid );
extern void (*pf_UnsetFilter)( int i_fd, uint16_t i_pid ); extern void (*pf_UnsetFilter)( int i_fd, uint16_t i_pid );
...@@ -158,32 +166,35 @@ void hexDump( uint8_t *p_data, uint32_t i_len ); ...@@ -158,32 +166,35 @@ void hexDump( uint8_t *p_data, uint32_t i_len );
void dvb_Open( void ); void dvb_Open( void );
void dvb_Reset( void ); void dvb_Reset( void );
block_t * dvb_Read( void ); block_t * dvb_Read( mtime_t i_poll_timeout );
int dvb_SetFilter( uint16_t i_pid ); int dvb_SetFilter( uint16_t i_pid );
void dvb_UnsetFilter( int i_fd, uint16_t i_pid ); void dvb_UnsetFilter( int i_fd, uint16_t i_pid );
uint8_t dvb_FrontendStatus( uint8_t *p_answer, ssize_t *pi_size ); uint8_t dvb_FrontendStatus( uint8_t *p_answer, ssize_t *pi_size );
void udp_Open( void ); void udp_Open( void );
block_t * udp_Read( void ); block_t * udp_Read( mtime_t i_poll_timeout );
int udp_SetFilter( uint16_t i_pid ); int udp_SetFilter( uint16_t i_pid );
void udp_UnsetFilter( int i_fd, uint16_t i_pid ); void udp_UnsetFilter( int i_fd, uint16_t i_pid );
void asi_Open( void ); void asi_Open( void );
block_t * asi_Read( void ); block_t * asi_Read( mtime_t i_poll_timeout );
int asi_SetFilter( uint16_t i_pid ); int asi_SetFilter( uint16_t i_pid );
void asi_UnsetFilter( int i_fd, uint16_t i_pid ); void asi_UnsetFilter( int i_fd, uint16_t i_pid );
void demux_Open( void ); void demux_Open( void );
void demux_Run( void ); void demux_Run( block_t *p_ts );
void demux_Change( output_t *p_output, uint16_t i_sid, void demux_Change( output_t *p_output, uint16_t i_sid,
uint16_t *pi_pids, int i_nb_pids ); uint16_t *pi_pids, int i_nb_pids );
void demux_ResendCAPMTs( void ); void demux_ResendCAPMTs( void );
int PIDIsSelected( uint16_t i_pid ); int PIDIsSelected( uint16_t i_pid );
output_t *output_Create( uint8_t i_config, const char *psz_displayname, void *p_init_data ); output_t *output_Create( uint8_t i_config, const char *psz_displayname,
int output_Init( output_t *p_output, uint8_t i_config, const char *psz_displayname, void *p_init_data ); void *p_init_data );
int output_Init( output_t *p_output, uint8_t i_config,
const char *psz_displayname, void *p_init_data );
void output_Close( output_t *p_output ); void output_Close( output_t *p_output );
void output_Put( output_t *p_output, block_t *p_block ); void output_Put( output_t *p_output, block_t *p_block );
mtime_t output_Send( void );
void comm_Open( void ); void comm_Open( void );
void comm_Read( void ); void comm_Read( void );
...@@ -302,4 +313,3 @@ static inline uint8_t *block_GetPayload( block_t *p_block ) ...@@ -302,4 +313,3 @@ static inline uint8_t *block_GetPayload( block_t *p_block )
return &p_block->p_ts[4]; return &p_block->p_ts[4];
return &p_block->p_ts[ 5 + p_block->p_ts[4] ]; return &p_block->p_ts[ 5 + p_block->p_ts[4] ];
} }
...@@ -1492,7 +1492,7 @@ static void DateTimeSend( access_t * p_access, int i_session_id ) ...@@ -1492,7 +1492,7 @@ static void DateTimeSend( access_t * p_access, int i_session_id )
APDUSend( p_access, i_session_id, AOT_DATE_TIME, p_response, 7 ); APDUSend( p_access, i_session_id, AOT_DATE_TIME, p_response, 7 );
p_date->i_last = mdate(); p_date->i_last = i_wallclock;
} }
} }
...@@ -1540,7 +1540,7 @@ static void DateTimeManage( access_t * p_access, int i_session_id ) ...@@ -1540,7 +1540,7 @@ static void DateTimeManage( access_t * p_access, int i_session_id )
(date_time_t *)p_sessions[i_session_id - 1].p_sys; (date_time_t *)p_sessions[i_session_id - 1].p_sys;
if ( p_date->i_interval if ( p_date->i_interval
&& mdate() > p_date->i_last + (mtime_t)p_date->i_interval * 1000000 ) && i_wallclock > p_date->i_last + (mtime_t)p_date->i_interval * 1000000 )
{ {
DateTimeSend( p_access, i_session_id ); DateTimeSend( p_access, i_session_id );
} }
...@@ -2188,7 +2188,7 @@ void en50221_Poll( void ) ...@@ -2188,7 +2188,7 @@ void en50221_Poll( void )
{ {
if ( !p_slot->i_init_timeout ) if ( !p_slot->i_init_timeout )
InitSlot( NULL, i_slot ); InitSlot( NULL, i_slot );
else if ( p_slot->i_init_timeout < mdate() ) else if ( p_slot->i_init_timeout < i_wallclock )
{ {
msg_Dbg( NULL, "en50221_Poll: resetting slot %d", i_slot ); msg_Dbg( NULL, "en50221_Poll: resetting slot %d", i_slot );
ResetSlot( i_slot ); ResetSlot( i_slot );
......
/***************************************************************************** /*****************************************************************************
* output.c * output.c
***************************************************************************** *****************************************************************************
* Copyright (C) 2004, 2008-2009 VideoLAN * Copyright (C) 2004, 2008-2010 VideoLAN
* $Id$ * $Id$
* *
* Authors: Christophe Massiot <massiot@via.ecp.fr> * Authors: Christophe Massiot <massiot@via.ecp.fr>
...@@ -40,13 +40,42 @@ ...@@ -40,13 +40,42 @@
/***************************************************************************** /*****************************************************************************
* Local prototypes * Local prototypes
*****************************************************************************/ *****************************************************************************/
struct packet_t
{
block_t *pp_blocks[NB_BLOCKS];
int i_depth;
mtime_t i_dts;
struct packet_t *p_next;
};
static uint8_t p_pad_ts[TS_SIZE] = {
0x47, 0x1f, 0xff, 0x10, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
};
static int net_Open( output_t *p_output ); static int net_Open( output_t *p_output );
static void rtp_SetHdr( output_t *p_output, uint8_t *p_hdr ); static void rtp_SetHdr( output_t *p_output, packet_t *p_packet,
uint8_t *p_hdr );
/***************************************************************************** /*****************************************************************************
* output_Create : called from main thread * output_Create : called from main thread
*****************************************************************************/ *****************************************************************************/
output_t *output_Create( uint8_t i_config, const char *psz_displayname, void *p_init_data ) output_t *output_Create( uint8_t i_config, const char *psz_displayname,
void *p_init_data )
{ {
int i; int i;
output_t *p_output = NULL; output_t *p_output = NULL;
...@@ -78,10 +107,11 @@ output_t *output_Create( uint8_t i_config, const char *psz_displayname, void *p_ ...@@ -78,10 +107,11 @@ output_t *output_Create( uint8_t i_config, const char *psz_displayname, void *p_
/***************************************************************************** /*****************************************************************************
* output_Init * output_Init
*****************************************************************************/ *****************************************************************************/
int output_Init( output_t *p_output, uint8_t i_config, const char *psz_displayname, void *p_init_data ) int output_Init( output_t *p_output, uint8_t i_config,
const char *psz_displayname, void *p_init_data )
{ {
p_output->i_sid = 0; p_output->i_sid = 0;
p_output->i_depth = 0; p_output->p_packets = p_output->p_last_packet = NULL;
p_output->pi_pids = NULL; p_output->pi_pids = NULL;
p_output->i_nb_pids = 0; p_output->i_nb_pids = 0;
...@@ -127,17 +157,21 @@ int output_Init( output_t *p_output, uint8_t i_config, const char *psz_displayna ...@@ -127,17 +157,21 @@ int output_Init( output_t *p_output, uint8_t i_config, const char *psz_displayna
*****************************************************************************/ *****************************************************************************/
void output_Close( output_t *p_output ) void output_Close( output_t *p_output )
{ {
int i; packet_t *p_packet = p_output->p_packets;
while ( p_packet != NULL )
for ( i = 0; i < p_output->i_depth; i++ )
{ {
p_output->pp_blocks[i]->i_refcount--; int i;
if ( !p_output->pp_blocks[i]->i_refcount )
block_Delete( p_output->pp_blocks[i] ); for ( i = 0; i < p_packet->i_depth; i++ )
{
p_packet->pp_blocks[i]->i_refcount--;
if ( !p_packet->pp_blocks[i]->i_refcount )
block_Delete( p_packet->pp_blocks[i] );
}
} }
p_output->p_packets = p_output->p_last_packet = NULL;
free( p_output->psz_displayname ); free( p_output->psz_displayname );
p_output->i_depth = 0;
p_output->i_config &= ~OUTPUT_VALID; p_output->i_config &= ~OUTPUT_VALID;
close( p_output->i_handle ); close( p_output->i_handle );
} }
...@@ -147,49 +181,55 @@ void output_Close( output_t *p_output ) ...@@ -147,49 +181,55 @@ void output_Close( output_t *p_output )
*****************************************************************************/ *****************************************************************************/
static void output_Flush( output_t *p_output ) static void output_Flush( output_t *p_output )
{ {
packet_t *p_packet = p_output->p_packets;
struct iovec p_iov[NB_BLOCKS + 1]; struct iovec p_iov[NB_BLOCKS + 1];
uint8_t p_rtp_hdr[RTP_SIZE]; uint8_t p_rtp_hdr[RTP_SIZE];
int i; int i_block_cnt = ( p_output->p_addr->ss_family == AF_INET6 ) ?
NB_BLOCKS_IPV6 : NB_BLOCKS;
int i_block_cnt = ( p_output->p_addr->ss_family == AF_INET6 ) ? NB_BLOCKS_IPV6 : NB_BLOCKS; int i_iov, i_block;
int i_outblocks = i_block_cnt;
if ( !b_output_udp && !(p_output->i_config & OUTPUT_UDP) ) if ( !b_output_udp && !(p_output->i_config & OUTPUT_UDP) )
{ {
p_iov[0].iov_base = p_rtp_hdr; p_iov[0].iov_base = p_rtp_hdr;
p_iov[0].iov_len = sizeof(p_rtp_hdr); p_iov[0].iov_len = sizeof(p_rtp_hdr);
rtp_SetHdr( p_output, p_rtp_hdr ); rtp_SetHdr( p_output, p_packet, p_rtp_hdr );
i_iov = 1;
for ( i = 1; i < i_block_cnt + 1; i++ )
{
p_iov[i].iov_base = p_output->pp_blocks[i - 1]->p_ts;
p_iov[i].iov_len = TS_SIZE;
}
i_outblocks += 1;
} }
else else
i_iov = 0;
for ( i_block = 0; i_block < p_packet->i_depth; i_block++ )
{ {
for ( i = 0; i < i_block_cnt; i++ ) p_iov[i_iov].iov_base = p_packet->pp_blocks[i_block]->p_ts;
{ p_iov[i_iov].iov_len = TS_SIZE;
p_iov[i].iov_base = p_output->pp_blocks[i]->p_ts; i_iov++;
p_iov[i].iov_len = TS_SIZE;
}
} }
if ( writev( p_output->i_handle, p_iov, i_outblocks ) < 0 ) for ( ; i_block < i_block_cnt; i_block++ )
{
p_iov[i_iov].iov_base = p_pad_ts;
p_iov[i_iov].iov_len = TS_SIZE;
i_iov++;
}
if ( writev( p_output->i_handle, p_iov, i_iov ) < 0 )
{ {
msg_Err( NULL, "couldn't writev to %s (%s)", msg_Err( NULL, "couldn't writev to %s (%s)",
p_output->psz_displayname, strerror(errno) ); p_output->psz_displayname, strerror(errno) );
} }
/* Update the wallclock because writev() can take some time. */
i_wallclock = mdate();
for ( i = 0; i < i_block_cnt; i++ ) for ( i_block = 0; i_block < p_packet->i_depth; i_block++ )
{ {
p_output->pp_blocks[i]->i_refcount--; p_packet->pp_blocks[i_block]->i_refcount--;
if ( !p_output->pp_blocks[i]->i_refcount ) if ( !p_packet->pp_blocks[i_block]->i_refcount )
block_Delete( p_output->pp_blocks[i] ); block_Delete( p_packet->pp_blocks[i_block] );
} }
p_output->i_depth = 0; p_output->p_packets = p_packet->p_next;
free( p_packet );
if ( p_output->p_packets == NULL )
p_output->p_last_packet = NULL;
} }
/***************************************************************************** /*****************************************************************************
...@@ -197,16 +237,64 @@ static void output_Flush( output_t *p_output ) ...@@ -197,16 +237,64 @@ static void output_Flush( output_t *p_output )
*****************************************************************************/ *****************************************************************************/
void output_Put( output_t *p_output, block_t *p_block ) void output_Put( output_t *p_output, block_t *p_block )
{ {
int i_block_cnt = ( p_output->p_addr->ss_family == AF_INET6 ) ?
NB_BLOCKS_IPV6 : NB_BLOCKS;
packet_t *p_packet;
p_block->i_refcount++; p_block->i_refcount++;
p_output->pp_blocks[p_output->i_depth] = p_block; if ( p_output->p_last_packet != NULL
p_output->i_depth++; && p_output->p_last_packet->i_depth < i_block_cnt
&& p_output->p_last_packet->i_dts + i_max_retention > p_block->i_dts )
{
p_packet = p_output->p_last_packet;
if ( block_HasPCR( p_block ) )
p_packet->i_dts = p_block->i_dts;
}
else
{
p_packet = malloc( sizeof(packet_t) );
p_packet->i_depth = 0;
p_packet->p_next = NULL;
p_packet->i_dts = p_block->i_dts;
if ( p_output->p_last_packet != NULL )
p_output->p_last_packet->p_next = p_packet;
else
p_output->p_packets = p_packet;
p_output->p_last_packet = p_packet;
}
p_packet->pp_blocks[p_packet->i_depth] = p_block;
p_packet->i_depth++;
}
/*****************************************************************************
* output_Send : called from main to flush the queues when needed
*****************************************************************************/
mtime_t output_Send( void )
{
mtime_t i_earliest_dts = -1;
int i;
for ( i = 0; i < i_nb_outputs; i++ )
{
output_t *p_output = pp_outputs[i];
if ( !( p_output->i_config & OUTPUT_VALID ) )
continue;
while ( p_output->p_packets != NULL
&& p_output->p_packets->i_dts + i_output_latency
<= i_wallclock )
output_Flush( p_output );
if ( p_output->p_packets != NULL
&& (p_output->p_packets->i_dts < i_earliest_dts
|| i_earliest_dts == -1) )
i_earliest_dts = p_output->p_packets->i_dts;
}
if ( ( ( p_output->p_addr->ss_family == AF_INET6 ) && return i_earliest_dts == -1 ? -1 :
( p_output->i_depth >= NB_BLOCKS_IPV6 ) ) || i_earliest_dts + i_output_latency - i_wallclock;
( ( p_output->p_addr->ss_family == AF_INET ) &&
( p_output->i_depth >= NB_BLOCKS ) ) )
output_Flush( p_output );
} }
/***************************************************************************** /*****************************************************************************
...@@ -244,7 +332,8 @@ static int net_Open( output_t *p_output ) ...@@ -244,7 +332,8 @@ static int net_Open( output_t *p_output )
} }
} }
if ( connect( i_handle, (struct sockaddr *)p_output->p_addr, p_output->i_addrlen ) < 0 ) if ( connect( i_handle, (struct sockaddr *)p_output->p_addr,
p_output->i_addrlen ) < 0 )
{ {
msg_Err( NULL, "couldn't connect socket to %s (%s)", msg_Err( NULL, "couldn't connect socket to %s (%s)",
p_output->psz_displayname, strerror(errno) ); p_output->psz_displayname, strerror(errno) );
...@@ -274,20 +363,12 @@ static int net_Open( output_t *p_output ) ...@@ -274,20 +363,12 @@ static int net_Open( output_t *p_output )
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/ */
static void rtp_SetHdr( output_t *p_output, uint8_t *p_hdr ) static void rtp_SetHdr( output_t *p_output, packet_t *p_packet, uint8_t *p_hdr )
{ {
mtime_t i_timestamp; mtime_t i_timestamp;
if (!p_output->i_ref_wallclock) i_timestamp = p_output->i_ref_timestamp
{ + (p_packet->i_dts - p_output->i_ref_wallclock) * 9 / 100;
i_timestamp = p_output->i_ref_timestamp;
p_output->i_ref_wallclock = mdate();
}
else
{
i_timestamp = p_output->i_ref_timestamp
+ (mdate() - p_output->i_ref_wallclock) * 9 / 100;
}
p_hdr[0] = 0x80; p_hdr[0] = 0x80;
p_hdr[1] = 33; p_hdr[1] = 33;
......
...@@ -96,52 +96,76 @@ void udp_Open( void ) ...@@ -96,52 +96,76 @@ void udp_Open( void )
/***************************************************************************** /*****************************************************************************
* udp_Read * udp_Read
*****************************************************************************/ *****************************************************************************/
block_t *udp_Read( void ) block_t *udp_Read( mtime_t i_poll_timeout )
{ {
int i = 0, i_len; struct pollfd pfd;
struct iovec p_iov[NB_BLOCKS + !b_src_rawudp]; int i_ret;
block_t *p_ts, **pp_current = &p_ts;
uint8_t p_rtp_hdr[RTP_SIZE];
if ( !b_src_rawudp ) pfd.fd = i_handle;
{ pfd.events = POLLIN;
/* FIXME : this is wrong if RTP header > 12 bytes */
p_iov[i].iov_base = p_rtp_hdr;
p_iov[i].iov_len = RTP_SIZE;
i++;
}
for ( ; i < NB_BLOCKS + !b_src_rawudp; i++ ) i_ret = poll( &pfd, 1, (i_poll_timeout + 999) / 1000 );
{
*pp_current = block_New(); i_wallclock = mdate();
p_iov[i].iov_base = (*pp_current)->p_ts;
p_iov[i].iov_len = TS_SIZE;
pp_current = &(*pp_current)->p_next;
}
if ( (i_len = readv( i_handle, p_iov, NB_BLOCKS + !b_src_rawudp )) < 0 ) if ( i_ret < 0 )
{ {
msg_Err( NULL, "couldn't read from network (%s)", if( errno != EINTR )
strerror(errno) ); msg_Err( NULL, "couldn't poll from socket (%s)",
i_len = 0; strerror(errno) );
return NULL;
} }
if ( !b_src_rawudp )
i_len -= RTP_SIZE;
i_len /= TS_SIZE;
pp_current = &p_ts; if ( pfd.revents )
while ( i_len && *pp_current )
{ {
pp_current = &(*pp_current)->p_next; struct iovec p_iov[NB_BLOCKS + !b_src_rawudp];
i_len--; block_t *p_ts, **pp_current = &p_ts;
} int i = 0, i_len;
uint8_t p_rtp_hdr[RTP_SIZE];
if ( !b_src_rawudp )
{
/* FIXME : this is wrong if RTP header > 12 bytes */
p_iov[i].iov_base = p_rtp_hdr;
p_iov[i].iov_len = RTP_SIZE;
i++;
}
for ( ; i < NB_BLOCKS + !b_src_rawudp; i++ )
{
*pp_current = block_New();
p_iov[i].iov_base = (*pp_current)->p_ts;
p_iov[i].iov_len = TS_SIZE;
pp_current = &(*pp_current)->p_next;
}
if ( (i_len = readv( i_handle, p_iov, NB_BLOCKS + !b_src_rawudp )) < 0 )
{
msg_Err( NULL, "couldn't read from network (%s)",
strerror(errno) );
i_len = 0;
}
if ( !b_src_rawudp )
i_len -= RTP_SIZE;
i_len /= TS_SIZE;
if ( *pp_current ) pp_current = &p_ts;
msg_Dbg( NULL, "partial buffer received" ); while ( i_len && *pp_current )
block_DeleteChain( *pp_current ); {
*pp_current = NULL; pp_current = &(*pp_current)->p_next;
i_len--;
}
i_wallclock = mdate();
return p_ts; if ( *pp_current )
msg_Dbg( NULL, "partial buffer received" );
block_DeleteChain( *pp_current );
*pp_current = NULL;
return p_ts;
}
return NULL;
} }
/* From now on these are just stubs */ /* From now on these are just stubs */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment