Commit 4a3274fb authored by Marian Durkovic's avatar Marian Durkovic

modifications for RTP reordering:

 * for RTP, prebuffering is done within RTP access
 * reordering now working within this buffer
 * solves synchro problems since packet rate is preserved 
parent 60acd31a
......@@ -103,6 +103,8 @@ struct access_t
int i_title; /* idem, start from 0 (could be menu) */
int i_seekpoint;/* idem, start from 0 */
vlc_bool_t b_prebuffered; /* Read only for input */
} info;
access_sys_t *p_sys;
};
......
......@@ -77,6 +77,7 @@ typedef struct block_sys_t block_sys_t;
struct block_t
{
block_t *p_next;
block_t *p_prev;
uint32_t i_flags;
......@@ -87,6 +88,8 @@ struct block_t
int i_samples; /* Used for audio */
int i_rate;
uint16_t i_seqno; /* Used for RTP */
int i_buffer;
uint8_t *p_buffer;
......
......@@ -48,12 +48,11 @@
#define AUTO_MTU_LONGTEXT N_( \
"Allows growing the MTU if truncated packets are found" )
#define RTP_LATE_TEXT N_("Reorder timeout in ms for late RTP packets")
#define RTP_LATE_TEXT N_("RTP reordering timeout in ms")
#define RTP_LATE_LONGTEXT N_( \
"Allows you to modify the RTP packets reorder and late behaviour. " \
"If enabled (value>0) then out-of-order packets will be held for the " \
"specified timeout in ms. " \
"The default behaviour is not to reorder." )
"Allows you to modify the RTP reordering behaviour. " \
"RTP input will wait for late packets upto " \
"the specified timeout in milisecond units." )
static int Open ( vlc_object_t * );
static void Close( vlc_object_t * );
......@@ -66,11 +65,11 @@ vlc_module_begin();
add_integer( "udp-caching", DEFAULT_PTS_DELAY / 1000, NULL, CACHING_TEXT,
CACHING_LONGTEXT, VLC_TRUE );
add_integer( "rtp-late", 100, NULL, RTP_LATE_TEXT, RTP_LATE_LONGTEXT, VLC_TRUE );
add_bool( "udp-auto-mtu", 1, NULL,
AUTO_MTU_TEXT, AUTO_MTU_LONGTEXT, VLC_TRUE );
add_integer( "rtp-late", 0, NULL, RTP_LATE_TEXT, RTP_LATE_LONGTEXT, VLC_TRUE );
set_capability( "access2", 0 );
add_shortcut( "udp" );
add_shortcut( "udpstream" );
......@@ -86,7 +85,6 @@ vlc_module_end();
* Local prototypes
*****************************************************************************/
#define RTP_HEADER_LEN 12
#define RTP_SEQ_NUM_SIZE 65536
static block_t *BlockUDP( access_t * );
static block_t *BlockRTP( access_t * );
......@@ -100,17 +98,11 @@ struct access_sys_t
int i_mtu;
vlc_bool_t b_auto_mtu;
/* rtp only */
uint16_t i_sequence_number;
vlc_bool_t b_first_seqno;
/* reorder rtp packets when out-of-bounds
* the packets hold queue is one level deep
*/
uint32_t i_rtp_late; /* number of ms an RTP packet may be too late*/
uint32_t i_last_pcr; /* last known good PCR */
block_t *p_list; /* list of packets to rearrange */
block_t *p_end; /* last packet in p_list */
/* reorder rtp packets when out-of-sequence */
int64_t i_rtp_late;
uint16_t i_last_seqno;
block_t *p_list;
block_t *p_end;
};
/*****************************************************************************
......@@ -215,6 +207,7 @@ static int Open( vlc_object_t *p_this )
p_access->info.i_size = 0;
p_access->info.i_pos = 0;
p_access->info.b_eof = VLC_FALSE;
p_access->info.b_prebuffered = VLC_FALSE;
p_access->info.i_title = 0;
p_access->info.i_seekpoint = 0;
......@@ -242,16 +235,12 @@ static int Open( vlc_object_t *p_this )
/* Update default_pts to a suitable value for udp access */
var_Create( p_access, "udp-caching", VLC_VAR_INTEGER | VLC_VAR_DOINHERIT );
/* Keep track of RTP sequence number */
p_sys->i_sequence_number = 0;
p_sys->b_first_seqno = VLC_TRUE;
/* RTP reordering out-of-bound packets */
p_sys->i_last_pcr = 0;
p_sys->i_rtp_late = var_CreateGetInteger( p_access, "rtp-late" );
/* RTP reordering for out-of-sequence packets */
p_sys->i_rtp_late = var_CreateGetInteger( p_access, "rtp-late" ) * 1000;
p_sys->i_last_seqno = 0;
p_sys->p_list = NULL;
p_sys->p_end = NULL;
return VLC_SUCCESS;
}
......@@ -349,196 +338,77 @@ static block_t *BlockUDP( access_t *p_access )
* rtp_ChainInsert - insert a p_block in the chain and
* look at the sequence numbers.
*/
static inline void rtp_ChainInsert( access_t *p_access, block_t *p_block )
static inline vlc_bool_t rtp_ChainInsert( access_t *p_access, block_t *p_block )
{
access_sys_t *p_sys = (access_sys_t *) p_access->p_sys;
block_t *p_list = p_sys->p_list;
block_t *p_end = p_sys->p_end;
block_t *p = NULL;
uint16_t i_new = 0;
uint16_t i_cur = 0;
uint16_t i_expected = 0;
uint32_t i_pcr_new = 0;
if( !p_block ) return;
if( !p_list )
block_t *p_prev = NULL;
block_t *p = p_sys->p_end;
uint16_t i_new = p_block->i_seqno;
uint16_t i_tmp = 0;
if( !p_sys->p_list )
{
p_sys->p_list = p_block;
p_sys->p_end = p_block;
return;
return VLC_TRUE;
}
/* walk through the queue from top down since the new packet is in
most cases just appended to the end */
/* Appending packets at the end of the chain is the normal case */
i_pcr_new = ( (p_block->p_buffer[4] << 24) +
(p_block->p_buffer[5] << 16) +
(p_block->p_buffer[6] << 8) +
p_block->p_buffer[7] );
i_new = ( (p_block->p_buffer[2] << 8 ) + p_block->p_buffer[3] );
i_cur = ( (p_end->p_buffer[2] << 8 ) + p_end->p_buffer[3] );
i_expected = i_cur + 1;
if( (i_new - i_expected) >= 0 ) /* Append at the end? */
{
msg_Dbg( p_access, "RTP: append %u after %u", i_new, i_cur );
p_end->p_next = p_block;
p_sys->p_end = p_end->p_next;
return;
}
/* Add to the front fo the chain? */
p = p_list;
i_cur = ( (p->p_buffer[2] << 8 ) + p->p_buffer[3] );
if( (i_expected - i_new) > 0 )
{
msg_Dbg( p_access, "RTP: prepend %u before %u", i_cur, i_new );
p_block->p_next = p;
p_sys->p_list = p_block;
return;
}
/* The packet can't be added to the front or the end of the chain,
* thus walk the chain from the start.
*/
while( p )
for( ;; )
{
i_cur = (p->p_buffer[2] << 8 ) + p->p_buffer[3];
i_expected = i_cur+1;
i_tmp = i_new - p->i_seqno;
if( (i_cur - i_new) == 0 )
{
uint32_t i_pcr_cur = ( (p->p_buffer[4] << 24) +
(p->p_buffer[5] << 16) +
(p->p_buffer[6] << 8) +
p->p_buffer[7] );
/* This packet might be a duplicate, so check PCR's */
if( i_pcr_cur >= i_pcr_new )
{
/* packet way too late drop it. */
block_Release( p_block );
return;
}
/* Add it to list later on
* else if( i_pcr_cur < i_pcr_new ) */
if( !i_tmp ) /* trash duplicate */
break;
}
else if( (i_expected - i_new) >= 0 ) /* insert in chain */
{
block_t *p_tmp = NULL;
p_tmp = p->p_next;
msg_Dbg( p_access, "RTP: insert %u after %u", i_new, i_cur );
if ( i_tmp < 32768 )
{ /* insert after this block ( i_new > p->i_seqno ) */
p_block->p_next = p->p_next;
p->p_next = p_block;
p_block->p_next = p_tmp;
return;
}
if( !p->p_next ) break;
p = p->p_next;
}
msg_Dbg(p_access, "RTP: trashing duplicate %d", i_new );
block_Release( p_block );
}
/*
* rtp_ChainSend - look which packets are ready for sending.
*/
static inline block_t *rtp_ChainSend( access_t *p_access, block_t **pp_list, uint16_t i_seq )
{
access_sys_t *p_sys = (access_sys_t *) p_access->p_sys;
uint16_t i_cur = 0;
if( *pp_list )
{
/* Parse RTP header */
int i_skip = 0;
int i_extension_bit = 0;
int i_extension_length = 0;
int i_CSRC_count = 0;
int i_payload_type = 0;
uint32_t i_pcr_prev = 0;
uint16_t i_seq_prev = 0;
/* Data pointers */
block_t *p_prev = NULL;
block_t *p_send = *pp_list;
block_t *p = *pp_list;
while( p )
p_block->p_prev = p;
if (p_prev)
{
i_cur = (p->p_buffer[2] << 8 ) + p->p_buffer[3];
if( (i_cur - i_seq) == 0 )
{
msg_Dbg( p_access, "rtp_ChainSend: sequence number %u", i_seq );
i_seq++; /* sent all packets that are received in order */
/* Remember PCR and sequence number of packet
* for next iteration */
i_pcr_prev = ( (p->p_buffer[4] << 24) +
(p->p_buffer[5] << 16) +
(p->p_buffer[6] << 8) +
p->p_buffer[7] );
i_seq_prev = ( (p->p_buffer[2] << 8 ) +
p->p_buffer[3] );
/* Parse headerfields we need */
i_CSRC_count = p->p_buffer[0] & 0x0F;
i_payload_type = (p->p_buffer[1] & 0x7F);
i_extension_bit = ( p->p_buffer[0] & 0x10 ) >> 4;
if ( i_extension_bit == 1)
i_extension_length = ( (p->p_buffer[14] << 8 ) +
p->p_buffer[15] );
/* Skip header + CSRC extension field n*(32 bits) + extention */
i_skip = RTP_HEADER_LEN + 4*i_CSRC_count + i_extension_length;
if( i_payload_type == 14 ) i_skip += 4;
/* Return the packet without the RTP header. */
p->i_buffer -= i_skip;
p->p_buffer += i_skip;
p_prev->p_prev = p_block;
msg_Dbg(p_access, "RTP reordering: insert after %d, new %d",
p->i_seqno, i_new );
}
else if( (i_cur - i_seq) > 0 )
{
if( p_prev )
else
{
p_sys->p_list = p;
p_prev->p_next = NULL;
p_sys->i_last_pcr = i_pcr_prev;
p_sys->i_sequence_number = i_seq_prev;
return p_send;
p_sys->p_end = p_block;
}
goto out;
return VLC_TRUE;
}
p_prev = p;
if (!p->p_next) break;
p = p->p_next;
if( p == p_sys->p_list )
{ /* we've reached bottom of chain */
i_tmp = p_sys->i_last_seqno - i_new;
if( !p_access->info.b_prebuffered || (i_tmp > 32767) )
{
msg_Dbg(p_access, "RTP reordering: prepend %d before %d",
i_new, p->i_seqno );
p_block->p_next = p;
p->p_prev = p_block;
p_sys->p_list = p_block;
return VLC_TRUE;
}
out:
/* We have walked through the complete chain and all packets are
* in sequence - so send the whole chain
*/
i_payload_type = (p->p_buffer[1] & 0x7F);
i_CSRC_count = p->p_buffer[0] & 0x0F;
i_extension_bit = ( p->p_buffer[0] & 0x10 ) >> 4;
if( i_extension_bit == 1)
i_extension_length = ( (p->p_buffer[14] << 8 ) + p->p_buffer[15] );
/* Skip header + CSRC extension field n*(32 bits) + extention */
i_skip = RTP_HEADER_LEN + 4*i_CSRC_count + i_extension_length;
if( i_payload_type == 14 ) i_skip += 4;
if( !i_tmp ) /* trash duplicate */
break;
/* Update the list pointers */
p_sys->p_list = NULL;
p_sys->p_end = NULL;
p_sys->i_sequence_number = ( (p->p_buffer[2] << 8 ) +
p->p_buffer[3] );
p_sys->i_last_pcr = ( (p->p_buffer[4] << 24) +
(p->p_buffer[5] << 16) +
(p->p_buffer[6] << 8) +
p->p_buffer[7] );
/* Return the packet without the RTP header. */
p->i_buffer -= i_skip;
p->p_buffer += i_skip;
return p_send;
/* reordering failed - append the packet to the end of queue */
msg_Dbg(p_access, "RTP: sequence changed (or buffer too small) "
"new: %d, buffer %d...%d", i_new, p->i_seqno,
p_sys->p_end->i_seqno);
p_sys->p_end->p_next = p_block;
p_block->p_prev = p_sys->p_end;
p_sys->p_end = p_block;
return VLC_TRUE;
}
return NULL;
p_prev = p;
p = p->p_prev;
}
block_Release( p_block );
return VLC_FALSE;
}
/*****************************************************************************
......@@ -551,33 +421,23 @@ static block_t *BlockParseRTP( access_t *p_access, block_t *p_block )
int i_CSRC_count;
int i_payload_type;
int i_skip = 0;
uint16_t i_sequence_number = 0;
uint16_t i_sequence_expected = 0;
int i_extension_bit = 0;
int i_extension_flag = 0;
int i_extension_length = 0;
uint32_t i_pcr = 0;
uint16_t i_sequence_number = 0;
if( p_block == NULL )
return NULL;
if( p_block->i_buffer < RTP_HEADER_LEN )
{
msg_Warn( p_access, "received a too short packet for RTP" );
block_Release( p_block );
return NULL;
}
goto trash;
/* Parse the header and make some verifications.
* See RFC 3550. */
i_rtp_version = ( p_block->p_buffer[0] & 0xC0 ) >> 6;
i_CSRC_count = p_block->p_buffer[0] & 0x0F;
i_extension_flag = p_block->p_buffer[0] & 0x10;
i_payload_type = p_block->p_buffer[1] & 0x7F;
i_sequence_number = (p_block->p_buffer[2] << 8) +
p_block->p_buffer[3];
i_pcr = ( (p_block->p_buffer[4] << 24) +
(p_block->p_buffer[5] << 16) +
(p_block->p_buffer[6] << 8) +
p_block->p_buffer[7] );
i_extension_bit = ( p_block->p_buffer[0] & 0x10 ) >> 4;
i_sequence_number = (p_block->p_buffer[2] << 8 ) + p_block->p_buffer[3];
if( i_rtp_version != 2 )
msg_Dbg( p_access, "RTP version is %u, should be 2", i_rtp_version );
......@@ -586,33 +446,20 @@ static block_t *BlockParseRTP( access_t *p_access, block_t *p_block )
i_skip = 4;
else if( i_payload_type != 33 && i_payload_type != 32 )
msg_Dbg( p_access, "unsupported RTP payload type (%u)", i_payload_type );
if( i_extension_bit == 1)
if( i_extension_flag )
i_extension_length = 4 +
4 * ( (p_block->p_buffer[14] << 8) + p_block->p_buffer[15] );
/* Skip header + CSRC extension field n*(32 bits) + extention */
/* Skip header + CSRC extension field n*(32 bits) + extension */
i_skip += RTP_HEADER_LEN + 4*i_CSRC_count + i_extension_length;
if( i_skip >= p_block->i_buffer )
{
msg_Warn( p_access, "received a too short packet for RTP" );
block_Release( p_block );
return NULL;
}
goto trash;
/* Detect RTP packet loss through tracking sequence numbers,
* and take RTP PCR into account.
* See RFC 3550.
*/
if( p_sys->b_first_seqno )
{
p_sys->i_sequence_number = i_sequence_number;
p_sys->i_last_pcr = i_pcr;
p_sys->b_first_seqno = VLC_FALSE;
i_sequence_expected = i_sequence_number;
}
else
i_sequence_expected = p_sys->i_sequence_number + 1;
/* Return the packet without the RTP header, remember seqno */
p_block->i_buffer -= i_skip;
p_block->p_buffer += i_skip;
p_block->i_seqno = i_sequence_number;
#if 0
/* Emulate packet loss */
......@@ -624,120 +471,78 @@ static block_t *BlockParseRTP( access_t *p_access, block_t *p_block )
}
#endif
if( (i_sequence_expected - i_sequence_number) != 0 )
{
/* Handle out of order packets */
if( p_sys->i_rtp_late > 0 )
{
if( (i_sequence_number - i_sequence_expected) > 0 )
{
msg_Warn( p_access,
"RTP packet out of order (too early) expected %u, current %u",
i_sequence_expected, i_sequence_number );
if( (i_pcr - p_sys->i_last_pcr) >= (p_sys->i_rtp_late*90) )
{
block_t *p_start = p_sys->p_list;
uint16_t i_start = (!p_start) ? p_sys->i_sequence_number :
(p_start->p_buffer[2] << 8) +
p_start->p_buffer[3];
/* Gap too big, we have been holding this data for too long,
* send what we have.
*/
msg_Warn( p_access,
"Gap too big resyncing: delta %u, held for %d ms",
(i_pcr - p_sys->i_last_pcr), p_sys->i_rtp_late );
rtp_ChainInsert( p_access, p_block );
return rtp_ChainSend( p_access, &p_sys->p_list, i_start );
}
/* hold packets that arrive too early. */
rtp_ChainInsert( p_access, p_block );
return rtp_ChainSend( p_access, &p_sys->p_list, i_sequence_expected );
}
else if( /* ((i_sequence_expected - i_sequence_number ) > 0) && */
(p_sys->i_last_pcr - i_pcr) >= 0 )
{
msg_Warn( p_access,
"RTP packet out of order (duplicate or too late) expected %u, current %u .. trashing it",
i_sequence_expected, i_sequence_number );
return p_block;
trash:
msg_Warn( p_access, "received a too short packet for RTP" );
block_Release( p_block );
p_sys->i_sequence_number = i_sequence_number;
p_sys->i_last_pcr = i_pcr;
return NULL;
}
}
if( p_sys->p_list )
{
block_t *p = NULL;
block_t **p_send = &p_sys->p_list;
msg_Warn( p_access,
"RTP packet (unexpected condition) expected %u, current %u",
i_sequence_expected, i_sequence_number );
/* Append block to the end of chain and send whole chain */
block_ChainLastAppend( &p_send, p_block );
p_sys->p_list = p_sys->p_end = NULL;
p_sys->i_sequence_number = i_sequence_number;
p_sys->i_last_pcr = i_pcr;
/* Return the packet without the RTP header. */
p = *p_send;
while( p )
{
p->i_buffer -= i_skip;
p->p_buffer += i_skip;
if( !p->p_next ) break;
p = p->p_next;
}
return *p_send;
}
/* This code should never be reached !! */
msg_Err( p_access,
"Bug in algorithme: (unexpected condition) expected %u (pcr=%u), current %u (pcr=%u)",
i_sequence_expected, i_sequence_number, p_sys->i_last_pcr, i_pcr );
}
msg_Warn( p_access,
"RTP packet(s) lost, expected sequence number %d got %d",
i_sequence_expected, i_sequence_number );
static block_t *BlockPrebufferRTP( access_t *p_access, block_t *p_block )
{
access_sys_t *p_sys = p_access->p_sys;
int64_t i_first = mdate();
int i_count = 0;
block_t *p = p_block;
/* Mark transport error in the first TS packet in the RTP stream. */
if( (i_payload_type == 33) && (p_block->p_buffer[0] == 0x47) )
p_block->p_buffer[1] |= 0x80;
}
else if( (p_sys->i_rtp_late > 0) && p_sys->p_list )
for( ;; )
{
if( (p_sys->i_last_pcr - i_pcr) >= 0 )
int64_t i_date = mdate();
if( p && rtp_ChainInsert( p_access, p ))
i_count++;
/* Require at least 3 packets in the buffer */
if( i_count > 3 && (i_date - i_first) > p_sys->i_rtp_late )
break;
p = BlockParseRTP( p_access, BlockUDP( p_access ));
if( !p && (i_date - i_first) > p_sys->i_rtp_late )
{
msg_Warn( p_access,
"RTP packet out of order (duplicate) expected %u, current %u .. trashing it",
i_sequence_expected, i_sequence_number );
block_Release( p_block );
p_sys->i_sequence_number = i_sequence_number;
p_sys->i_last_pcr = i_pcr;
return NULL;
msg_Err( p_access, "Error in RTP prebuffering!" );
break;
}
rtp_ChainInsert( p_access, p_block );
return rtp_ChainSend( p_access, &p_sys->p_list, i_sequence_expected );
}
/* This is the normal case when no packet reordering is effective */
p_sys->i_sequence_number = i_sequence_number;
p_sys->i_last_pcr = i_pcr;
/* Return the packet without the RTP header. */
p_block->i_buffer -= i_skip;
p_block->p_buffer += i_skip;
return p_block;
msg_Dbg( p_access, "RTP: prebuffered %d packets", i_count - 1 );
p_access->info.b_prebuffered = VLC_TRUE;
p = p_sys->p_list;
p_sys->p_list = p_sys->p_list->p_next;
p_sys->i_last_seqno = p->i_seqno;
p->p_next = NULL;
return p;
}
static block_t *BlockRTP( access_t *p_access )
{
block_t *p_block = BlockUDP( p_access );
access_sys_t *p_sys = p_access->p_sys;
block_t *p;
if ( p_block != NULL )
return BlockParseRTP( p_access, p_block );
else
again:
p = BlockParseRTP( p_access, BlockUDP( p_access ));
if ( !p )
return NULL;
if ( !p_access->info.b_prebuffered )
return BlockPrebufferRTP( p_access, p );
if( !rtp_ChainInsert( p_access, p ))
goto again;
p = p_sys->p_list;
p_sys->p_list = p_sys->p_list->p_next;
p_sys->i_last_seqno++;
if( p_sys->i_last_seqno != p->i_seqno )
{
msg_Dbg( p_access, "RTP: packet(s) lost, expected %d, got %d",
p_sys->i_last_seqno, p->i_seqno );
p_sys->i_last_seqno = p->i_seqno;
}
p->p_next = NULL;
return p;
}
/*****************************************************************************
......@@ -800,7 +605,9 @@ static block_t *BlockChoose( access_t *p_access )
return p_block;
}
if( !BlockParseRTP( p_access, p_block )) return NULL;
p_access->pf_block = BlockRTP;
return BlockParseRTP( p_access, p_block );
return BlockPrebufferRTP( p_access, p_block );
}
......@@ -72,6 +72,7 @@ static access_t *access2_InternalNew( vlc_object_t *p_obj, char *psz_access,
p_access->info.i_size = 0;
p_access->info.i_pos = 0;
p_access->info.b_eof = VLC_FALSE;
p_access->info.b_prebuffered = VLC_FALSE;
p_access->info.i_title = 0;
p_access->info.i_seekpoint = 0;
......
......@@ -566,6 +566,7 @@ static int AStreamControl( stream_t *s, int i_query, va_list args )
static void AStreamPrebufferBlock( stream_t *s )
{
stream_sys_t *p_sys = s->p_sys;
access_t *p_access = p_sys->p_access;
int64_t i_first = 0;
int64_t i_start;
......@@ -606,12 +607,6 @@ static void AStreamPrebufferBlock( stream_t *s )
continue;
}
if( i_first == 0 )
{
i_first = mdate();
msg_Dbg( s, "received first data for our buffer");
}
while( b )
{
/* Append the block */
......@@ -622,6 +617,21 @@ static void AStreamPrebufferBlock( stream_t *s )
p_sys->stat.i_read_count++;
b = b->p_next;
}
if( p_access->info.b_prebuffered )
{
/* Access has already prebufferred - update stats and exit */
p_sys->stat.i_bytes = p_sys->block.i_size;
p_sys->stat.i_read_time = mdate() - i_start;
break;
}
if( i_first == 0 )
{
i_first = mdate();
msg_Dbg( s, "received first data for our buffer");
}
}
p_sys->block.p_current = p_sys->block.p_first;
......
......@@ -64,11 +64,13 @@ block_t *__block_New( vlc_object_t *p_obj, int i_size )
/* Fill all fields */
p_block->p_next = NULL;
p_block->p_prev = NULL;
p_block->i_flags = 0;
p_block->i_pts = 0;
p_block->i_dts = 0;
p_block->i_length = 0;
p_block->i_rate = 0;
p_block->i_seqno = 0;
p_block->i_buffer = i_size;
p_block->p_buffer =
&p_sys->p_allocated_buffer[BLOCK_PADDING_SIZE +
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment