Commit 1fcd8954 authored by Rémi Denis-Courmont's avatar Rémi Denis-Courmont

Fix memleaks on error path, fix sign warnings, cleanup

parent 5da43874
...@@ -115,32 +115,21 @@ static ssize_t Write ( sout_access_out_t *, block_t * ); ...@@ -115,32 +115,21 @@ static ssize_t Write ( sout_access_out_t *, block_t * );
static int Seek ( sout_access_out_t *, off_t ); static int Seek ( sout_access_out_t *, off_t );
static int Control( sout_access_out_t *, int, va_list ); static int Control( sout_access_out_t *, int, va_list );
static void* ThreadWrite( vlc_object_t * ); static void* ThreadWrite( void * );
static block_t *NewUDPPacket( sout_access_out_t *, mtime_t ); static block_t *NewUDPPacket( sout_access_out_t *, mtime_t );
typedef struct sout_access_thread_t
{
VLC_COMMON_MEMBERS
block_fifo_t *p_fifo;
int i_handle;
int64_t i_caching;
int i_group;
block_fifo_t *p_empty_blocks;
} sout_access_thread_t;
struct sout_access_out_sys_t struct sout_access_out_sys_t
{ {
int i_mtu; mtime_t i_caching;
int i_handle;
bool b_mtu_warning; bool b_mtu_warning;
size_t i_mtu;
block_t *p_buffer; block_fifo_t *p_fifo;
block_fifo_t *p_empty_blocks;
sout_access_thread_t *p_thread; block_t *p_buffer;
vlc_thread_t thread;
}; };
#define DEFAULT_PORT 1234 #define DEFAULT_PORT 1234
...@@ -171,7 +160,7 @@ static int Open( vlc_object_t *p_this ) ...@@ -171,7 +160,7 @@ static int Open( vlc_object_t *p_this )
return VLC_ENOMEM; return VLC_ENOMEM;
} }
if( !( p_sys = calloc ( 1, sizeof( sout_access_out_sys_t ) ) ) ) if( !( p_sys = malloc ( sizeof( *p_sys ) ) ) )
return VLC_ENOMEM; return VLC_ENOMEM;
p_access->p_sys = p_sys; p_access->p_sys = p_sys;
...@@ -193,21 +182,6 @@ static int Open( vlc_object_t *p_this ) ...@@ -193,21 +182,6 @@ static int Open( vlc_object_t *p_this )
i_dst_port = atoi (psz_parser); i_dst_port = atoi (psz_parser);
} }
p_sys->p_thread =
vlc_object_create( p_access, sizeof( sout_access_thread_t ) );
if( !p_sys->p_thread )
{
free (p_sys);
free (psz_dst_addr);
return VLC_ENOMEM;
}
vlc_object_attach( p_sys->p_thread, p_access );
p_sys->p_thread->b_die = 0;
p_sys->p_thread->b_error= 0;
p_sys->p_thread->p_fifo = block_FifoNew();
p_sys->p_thread->p_empty_blocks = block_FifoNew();
i_handle = net_ConnectDgram( p_this, psz_dst_addr, i_dst_port, -1, i_handle = net_ConnectDgram( p_this, psz_dst_addr, i_dst_port, -1,
IPPROTO_UDP ); IPPROTO_UDP );
free (psz_dst_addr); free (psz_dst_addr);
...@@ -215,7 +189,6 @@ static int Open( vlc_object_t *p_this ) ...@@ -215,7 +189,6 @@ static int Open( vlc_object_t *p_this )
if( i_handle == -1 ) if( i_handle == -1 )
{ {
msg_Err( p_access, "failed to create raw UDP socket" ); msg_Err( p_access, "failed to create raw UDP socket" );
vlc_object_release (p_sys->p_thread);
free (p_sys); free (p_sys);
return VLC_EGENERIC; return VLC_EGENERIC;
} }
...@@ -238,23 +211,24 @@ static int Open( vlc_object_t *p_this ) ...@@ -238,23 +211,24 @@ static int Open( vlc_object_t *p_this )
var_SetInteger (p_access, "dst-port", port); var_SetInteger (p_access, "dst-port", port);
} }
} }
p_sys->p_thread->i_handle = i_handle;
shutdown( i_handle, SHUT_RD ); shutdown( i_handle, SHUT_RD );
p_sys->p_thread->i_caching = p_sys->i_caching = UINT64_C(1000)
(int64_t)1000 * var_GetInteger( p_access, SOUT_CFG_PREFIX "caching"); * var_GetInteger( p_access, SOUT_CFG_PREFIX "caching");
p_sys->p_thread->i_group = p_sys->i_handle = i_handle;
var_GetInteger( p_access, SOUT_CFG_PREFIX "group" );
p_sys->i_mtu = var_CreateGetInteger( p_this, "mtu" ); p_sys->i_mtu = var_CreateGetInteger( p_this, "mtu" );
p_sys->b_mtu_warning = false;
p_sys->p_fifo = block_FifoNew();
p_sys->p_empty_blocks = block_FifoNew();
p_sys->p_buffer = NULL; p_sys->p_buffer = NULL;
if( vlc_thread_create( p_sys->p_thread, "sout write thread", ThreadWrite, if( vlc_clone( &p_sys->thread, ThreadWrite, p_access,
VLC_THREAD_PRIORITY_HIGHEST ) ) VLC_THREAD_PRIORITY_HIGHEST ) )
{ {
msg_Err( p_access, "cannot spawn sout access thread" ); msg_Err( p_access, "cannot spawn sout access thread" );
block_FifoRelease( p_sys->p_fifo );
block_FifoRelease( p_sys->p_empty_blocks );
net_Close (i_handle); net_Close (i_handle);
vlc_object_release( p_sys->p_thread );
free (p_sys); free (p_sys);
return VLC_EGENERIC; return VLC_EGENERIC;
} }
...@@ -273,32 +247,15 @@ static void Close( vlc_object_t * p_this ) ...@@ -273,32 +247,15 @@ static void Close( vlc_object_t * p_this )
{ {
sout_access_out_t *p_access = (sout_access_out_t*)p_this; sout_access_out_t *p_access = (sout_access_out_t*)p_this;
sout_access_out_sys_t *p_sys = p_access->p_sys; sout_access_out_sys_t *p_sys = p_access->p_sys;
int i;
vlc_object_kill( p_sys->p_thread );
for( i = 0; i < 10; i++ )
{
block_t *p_dummy = block_New( p_access, p_sys->i_mtu );
p_dummy->i_dts = 0;
p_dummy->i_pts = 0;
p_dummy->i_length = 0;
memset( p_dummy->p_buffer, 0, p_dummy->i_buffer );
block_FifoPut( p_sys->p_thread->p_fifo, p_dummy );
}
vlc_thread_join( p_sys->p_thread );
block_FifoRelease( p_sys->p_thread->p_fifo ); vlc_cancel( p_sys->thread );
block_FifoRelease( p_sys->p_thread->p_empty_blocks ); vlc_join( p_sys->thread, NULL );
block_FifoRelease( p_sys->p_fifo );
block_FifoRelease( p_sys->p_empty_blocks );
if( p_sys->p_buffer ) block_Release( p_sys->p_buffer ); if( p_sys->p_buffer ) block_Release( p_sys->p_buffer );
net_Close( p_sys->p_thread->i_handle ); net_Close( p_sys->i_handle );
vlc_object_detach( p_sys->p_thread );
vlc_object_release( p_sys->p_thread );
msg_Dbg( p_access, "UDP access output closed" );
free( p_sys ); free( p_sys );
} }
...@@ -343,22 +300,21 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer ) ...@@ -343,22 +300,21 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
if( p_sys->p_buffer && if( p_sys->p_buffer &&
p_sys->p_buffer->i_buffer + p_buffer->i_buffer > p_sys->i_mtu ) p_sys->p_buffer->i_buffer + p_buffer->i_buffer > p_sys->i_mtu )
{ {
if( p_sys->p_buffer->i_dts + p_sys->p_thread->i_caching < now ) if( p_sys->p_buffer->i_dts + p_sys->i_caching < now )
{ {
msg_Dbg( p_access, "late packet for UDP input (%"PRId64 ")", msg_Dbg( p_access, "late packet for UDP input (%"PRId64 ")",
now - p_sys->p_buffer->i_dts now - p_sys->p_buffer->i_dts
- p_sys->p_thread->i_caching ); - p_sys->i_caching );
} }
block_FifoPut( p_sys->p_thread->p_fifo, p_sys->p_buffer ); block_FifoPut( p_sys->p_fifo, p_sys->p_buffer );
p_sys->p_buffer = NULL; p_sys->p_buffer = NULL;
} }
i_len += p_buffer->i_buffer; i_len += p_buffer->i_buffer;
while( p_buffer->i_buffer ) while( p_buffer->i_buffer )
{ {
int i_payload_size = p_sys->i_mtu; size_t i_payload_size = p_sys->i_mtu;
size_t i_write = __MIN( p_buffer->i_buffer, i_payload_size );
int i_write = __MIN( p_buffer->i_buffer, i_payload_size );
i_packets++; i_packets++;
...@@ -384,13 +340,13 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer ) ...@@ -384,13 +340,13 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
if( p_sys->p_buffer->i_buffer == p_sys->i_mtu || i_packets > 1 ) if( p_sys->p_buffer->i_buffer == p_sys->i_mtu || i_packets > 1 )
{ {
/* Flush */ /* Flush */
if( p_sys->p_buffer->i_dts + p_sys->p_thread->i_caching < now ) if( p_sys->p_buffer->i_dts + p_sys->i_caching < now )
{ {
msg_Dbg( p_access, "late packet for udp input (%"PRId64 ")", msg_Dbg( p_access, "late packet for udp input (%"PRId64 ")",
mdate() - p_sys->p_buffer->i_dts mdate() - p_sys->p_buffer->i_dts
- p_sys->p_thread->i_caching ); - p_sys->i_caching );
} }
block_FifoPut( p_sys->p_thread->p_fifo, p_sys->p_buffer ); block_FifoPut( p_sys->p_fifo, p_sys->p_buffer );
p_sys->p_buffer = NULL; p_sys->p_buffer = NULL;
} }
} }
...@@ -400,7 +356,7 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer ) ...@@ -400,7 +356,7 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
p_buffer = p_next; p_buffer = p_next;
} }
return( p_sys->p_thread->b_error ? -1 : i_len ); return i_len;
} }
/***************************************************************************** /*****************************************************************************
...@@ -408,6 +364,7 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer ) ...@@ -408,6 +364,7 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
*****************************************************************************/ *****************************************************************************/
static int Seek( sout_access_out_t *p_access, off_t i_pos ) static int Seek( sout_access_out_t *p_access, off_t i_pos )
{ {
(void) i_pos;
msg_Err( p_access, "UDP sout access cannot seek" ); msg_Err( p_access, "UDP sout access cannot seek" );
return -1; return -1;
} }
...@@ -420,19 +377,19 @@ static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts) ...@@ -420,19 +377,19 @@ static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts)
sout_access_out_sys_t *p_sys = p_access->p_sys; sout_access_out_sys_t *p_sys = p_access->p_sys;
block_t *p_buffer; block_t *p_buffer;
while ( block_FifoCount( p_sys->p_thread->p_empty_blocks ) > MAX_EMPTY_BLOCKS ) while ( block_FifoCount( p_sys->p_empty_blocks ) > MAX_EMPTY_BLOCKS )
{ {
p_buffer = block_FifoGet( p_sys->p_thread->p_empty_blocks ); p_buffer = block_FifoGet( p_sys->p_empty_blocks );
block_Release( p_buffer ); block_Release( p_buffer );
} }
if( block_FifoCount( p_sys->p_thread->p_empty_blocks ) == 0 ) if( block_FifoCount( p_sys->p_empty_blocks ) == 0 )
{ {
p_buffer = block_Alloc( p_sys->i_mtu ); p_buffer = block_Alloc( p_sys->i_mtu );
} }
else else
{ {
p_buffer = block_FifoGet(p_sys->p_thread->p_empty_blocks ); p_buffer = block_FifoGet(p_sys->p_empty_blocks );
p_buffer->i_flags = 0; p_buffer->i_flags = 0;
p_buffer = block_Realloc( p_buffer, 0, p_sys->i_mtu ); p_buffer = block_Realloc( p_buffer, 0, p_sys->i_mtu );
} }
...@@ -446,41 +403,31 @@ static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts) ...@@ -446,41 +403,31 @@ static block_t *NewUDPPacket( sout_access_out_t *p_access, mtime_t i_dts)
/***************************************************************************** /*****************************************************************************
* ThreadWrite: Write a packet on the network at the good time. * ThreadWrite: Write a packet on the network at the good time.
*****************************************************************************/ *****************************************************************************/
static void* ThreadWrite( vlc_object_t *p_this ) static void* ThreadWrite( void *data )
{ {
sout_access_thread_t *p_thread = (sout_access_thread_t*)p_this; sout_access_out_t *p_access = data;
mtime_t i_date_last = -1; sout_access_out_sys_t *p_sys = p_access->p_sys;
mtime_t i_to_send = p_thread->i_group; mtime_t i_date_last = -1;
int i_dropped_packets = 0; const unsigned i_group = var_GetInteger( p_access,
SOUT_CFG_PREFIX "group" );
mtime_t i_to_send = i_group;
unsigned i_dropped_packets = 0;
for (;;) for (;;)
{ {
block_t *p_pk; block_t *p_pk = block_FifoGet( p_sys->p_fifo );
mtime_t i_date, i_sent; mtime_t i_date, i_sent;
#if 0
if( (i++ % 1000)==0 ) {
int i = 0;
int j = 0;
block_t *p_tmp = p_thread->p_empty_blocks->p_first;
while( p_tmp ) { p_tmp = p_tmp->p_next; i++;}
p_tmp = p_thread->p_fifo->p_first;
while( p_tmp ) { p_tmp = p_tmp->p_next; j++;}
msg_Dbg( p_thread, "fifo depth: %d/%d, empty blocks: %d/%d",
p_thread->p_fifo->i_depth, j,p_thread->p_empty_blocks->i_depth,i );
}
#endif
p_pk = block_FifoGet( p_thread->p_fifo );
i_date = p_thread->i_caching + p_pk->i_dts; i_date = p_sys->i_caching + p_pk->i_dts;
if( i_date_last > 0 ) if( i_date_last > 0 )
{ {
if( i_date - i_date_last > 2000000 ) if( i_date - i_date_last > 2000000 )
{ {
if( !i_dropped_packets ) if( !i_dropped_packets )
msg_Dbg( p_thread, "mmh, hole (%"PRId64" > 2s) -> drop", msg_Dbg( p_access, "mmh, hole (%"PRId64" > 2s) -> drop",
i_date - i_date_last ); i_date - i_date_last );
block_FifoPut( p_thread->p_empty_blocks, p_pk ); block_FifoPut( p_sys->p_empty_blocks, p_pk );
i_date_last = i_date; i_date_last = i_date;
i_dropped_packets++; i_dropped_packets++;
...@@ -489,7 +436,7 @@ static void* ThreadWrite( vlc_object_t *p_this ) ...@@ -489,7 +436,7 @@ static void* ThreadWrite( vlc_object_t *p_this )
else if( i_date - i_date_last < -1000 ) else if( i_date - i_date_last < -1000 )
{ {
if( !i_dropped_packets ) if( !i_dropped_packets )
msg_Dbg( p_thread, "mmh, packets in the past (%"PRId64")", msg_Dbg( p_access, "mmh, packets in the past (%"PRId64")",
i_date_last - i_date ); i_date_last - i_date );
} }
} }
...@@ -499,16 +446,15 @@ static void* ThreadWrite( vlc_object_t *p_this ) ...@@ -499,16 +446,15 @@ static void* ThreadWrite( vlc_object_t *p_this )
if( !i_to_send || (p_pk->i_flags & BLOCK_FLAG_CLOCK) ) if( !i_to_send || (p_pk->i_flags & BLOCK_FLAG_CLOCK) )
{ {
mwait( i_date ); mwait( i_date );
i_to_send = p_thread->i_group; i_to_send = i_group;
} }
if ( send( p_thread->i_handle, p_pk->p_buffer, if ( send( p_sys->i_handle, p_pk->p_buffer, p_pk->i_buffer, 0 ) == -1 )
p_pk->i_buffer, 0 ) == -1 ) msg_Warn( p_access, "send error: %m" );
msg_Warn( p_thread, "send error: %m" );
vlc_cleanup_pop(); vlc_cleanup_pop();
if( i_dropped_packets ) if( i_dropped_packets )
{ {
msg_Dbg( p_thread, "dropped %i packets", i_dropped_packets ); msg_Dbg( p_access, "dropped %i packets", i_dropped_packets );
i_dropped_packets = 0; i_dropped_packets = 0;
} }
...@@ -516,12 +462,12 @@ static void* ThreadWrite( vlc_object_t *p_this ) ...@@ -516,12 +462,12 @@ static void* ThreadWrite( vlc_object_t *p_this )
i_sent = mdate(); i_sent = mdate();
if ( i_sent > i_date + 20000 ) if ( i_sent > i_date + 20000 )
{ {
msg_Dbg( p_thread, "packet has been sent too late (%"PRId64 ")", msg_Dbg( p_access, "packet has been sent too late (%"PRId64 ")",
i_sent - i_date ); i_sent - i_date );
} }
#endif #endif
block_FifoPut( p_thread->p_empty_blocks, p_pk ); block_FifoPut( p_sys->p_empty_blocks, p_pk );
i_date_last = i_date; i_date_last = i_date;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment