Commit 9745c4a8 authored by Rémi Denis-Courmont's avatar Rémi Denis-Courmont

Sanitize/rewrite the message subscription API

parent a9d31e8c
...@@ -71,14 +71,6 @@ typedef struct ...@@ -71,14 +71,6 @@ typedef struct
* Used by interface plugins which subscribe to the message bank. * Used by interface plugins which subscribe to the message bank.
*/ */
typedef struct msg_subscription_t msg_subscription_t; typedef struct msg_subscription_t msg_subscription_t;
struct msg_subscription_t
{
int i_start;
int* pi_stop;
msg_item_t* p_msg;
vlc_mutex_t* p_lock;
};
/***************************************************************************** /*****************************************************************************
* Prototypes * Prototypes
...@@ -100,10 +92,16 @@ VLC_EXPORT( void, __msg_GenericVa, ( vlc_object_t *, int, const char *, const ch ...@@ -100,10 +92,16 @@ VLC_EXPORT( void, __msg_GenericVa, ( vlc_object_t *, int, const char *, const ch
__msg_Generic( VLC_OBJECT(p_this), VLC_MSG_DBG, \ __msg_Generic( VLC_OBJECT(p_this), VLC_MSG_DBG, \
MODULE_STRING, __VA_ARGS__ ) MODULE_STRING, __VA_ARGS__ )
#define msg_Subscribe(a) __msg_Subscribe(VLC_OBJECT(a)) typedef struct msg_cb_data_t msg_cb_data_t;
#define msg_Unsubscribe(a,b) __msg_Unsubscribe(VLC_OBJECT(a),b)
VLC_EXPORT( msg_subscription_t*, __msg_Subscribe, ( vlc_object_t * ) ); /**
VLC_EXPORT( void, __msg_Unsubscribe, ( vlc_object_t *, msg_subscription_t * ) ); * Message logging callback signature.
* Accepts one private data pointer, the message, and an overrun counter.
*/
typedef void (*msg_callback_t) (msg_cb_data_t *, msg_item_t *, unsigned);
VLC_EXPORT( msg_subscription_t*, msg_Subscribe, ( libvlc_int_t *, msg_callback_t, msg_cb_data_t * ) );
VLC_EXPORT( void, msg_Unsubscribe, ( msg_subscription_t * ) );
/* Enable or disable a certain object debug messages */ /* Enable or disable a certain object debug messages */
#define msg_EnableObjectPrinting(a,b) __msg_EnableObjectPrinting(VLC_OBJECT(a),b) #define msg_EnableObjectPrinting(a,b) __msg_EnableObjectPrinting(VLC_OBJECT(a),b)
......
...@@ -77,12 +77,13 @@ typedef struct msg_bank_t ...@@ -77,12 +77,13 @@ typedef struct msg_bank_t
{ {
/** Message queue lock */ /** Message queue lock */
vlc_mutex_t lock; vlc_mutex_t lock;
bool b_overflow; vlc_cond_t wait;
/* Message queue */ /* Message queue */
msg_item_t msg[VLC_MSG_QSIZE]; /**< message queue */ msg_item_t msg[VLC_MSG_QSIZE]; /**< message queue */
int i_start; int i_start;
int i_stop; int i_stop;
bool b_overflow;
/* Subscribers */ /* Subscribers */
int i_sub; int i_sub;
......
...@@ -223,8 +223,8 @@ __msg_DisableObjectPrinting ...@@ -223,8 +223,8 @@ __msg_DisableObjectPrinting
__msg_EnableObjectPrinting __msg_EnableObjectPrinting
__msg_Generic __msg_Generic
__msg_GenericVa __msg_GenericVa
__msg_Subscribe msg_Subscribe
__msg_Unsubscribe msg_Unsubscribe
msleep msleep
mstrtime mstrtime
mwait mwait
......
...@@ -76,6 +76,10 @@ static uintptr_t banks = 0; ...@@ -76,6 +76,10 @@ static uintptr_t banks = 0;
#endif #endif
#define QUEUE priv->msg_bank #define QUEUE priv->msg_bank
static inline msg_bank_t *libvlc_bank (libvlc_int_t *inst)
{
return &(libvlc_priv (inst))->msg_bank;
}
/***************************************************************************** /*****************************************************************************
* Local prototypes * Local prototypes
...@@ -94,7 +98,10 @@ static vlc_mutex_t msg_stack_lock = VLC_STATIC_MUTEX; ...@@ -94,7 +98,10 @@ static vlc_mutex_t msg_stack_lock = VLC_STATIC_MUTEX;
void msg_Create (libvlc_int_t *p_libvlc) void msg_Create (libvlc_int_t *p_libvlc)
{ {
libvlc_priv_t *priv = libvlc_priv (p_libvlc); libvlc_priv_t *priv = libvlc_priv (p_libvlc);
vlc_mutex_init( &QUEUE.lock ); msg_bank_t *bank = libvlc_bank (p_libvlc);
vlc_mutex_init (&bank->lock);
vlc_cond_init (&bank->wait);
vlc_dictionary_init( &priv->msg_enabled_objects, 0 ); vlc_dictionary_init( &priv->msg_enabled_objects, 0 );
priv->msg_all_objects_enabled = true; priv->msg_all_objects_enabled = true;
...@@ -168,9 +175,10 @@ void __msg_DisableObjectPrinting (vlc_object_t *p_this, char * psz_object) ...@@ -168,9 +175,10 @@ void __msg_DisableObjectPrinting (vlc_object_t *p_this, char * psz_object)
void msg_Destroy (libvlc_int_t *p_libvlc) void msg_Destroy (libvlc_int_t *p_libvlc)
{ {
libvlc_priv_t *priv = libvlc_priv (p_libvlc); libvlc_priv_t *priv = libvlc_priv (p_libvlc);
msg_bank_t *bank = libvlc_bank (p_libvlc);
if( QUEUE.i_sub ) if( QUEUE.i_sub )
msg_Err( p_libvlc, "stale interface subscribers" ); msg_Err( p_libvlc, "stale interface subscribers (VLC might crash)" );
FlushMsg( &QUEUE ); FlushMsg( &QUEUE );
...@@ -185,52 +193,101 @@ void msg_Destroy (libvlc_int_t *p_libvlc) ...@@ -185,52 +193,101 @@ void msg_Destroy (libvlc_int_t *p_libvlc)
vlc_dictionary_clear( &priv->msg_enabled_objects, NULL, NULL ); vlc_dictionary_clear( &priv->msg_enabled_objects, NULL, NULL );
/* Destroy lock */ vlc_cond_destroy (&bank->wait);
vlc_mutex_destroy( &QUEUE.lock ); vlc_mutex_destroy (&bank->lock);
}
struct msg_subscription_t
{
vlc_thread_t thread;
libvlc_int_t *instance;
msg_callback_t func;
msg_cb_data_t *opaque;
msg_item_t *items[VLC_MSG_QSIZE];
unsigned begin, end;
unsigned overruns;
};
static void *msg_thread (void *data)
{
msg_subscription_t *sub = data;
msg_bank_t *bank = libvlc_bank (sub->instance);
/* TODO: finer-grained locking and/or msg_item_t refcount */
vlc_mutex_lock (&bank->lock);
mutex_cleanup_push (&bank->lock);
for (;;)
{
/* Wait for messages */
assert (sub->begin < VLC_MSG_QSIZE);
assert (sub->end < VLC_MSG_QSIZE);
while (sub->begin != sub->end)
{
sub->func (sub->opaque, sub->items[sub->begin], sub->overruns);
if (++sub->begin == VLC_MSG_QSIZE)
sub->begin = 0;
sub->overruns = 0;
}
vlc_cond_wait (&bank->wait, &bank->lock);
}
vlc_cleanup_pop ();
assert (0);
} }
/** /**
* Subscribe to a message queue. * Subscribe to the message queue.
* Whenever a message is emitted, a callback will be called.
* Callback invocation are serialized within a subscription.
*
* @param instance LibVLC instance to get messages from
* @param cb callback function
* @param opaque data for the callback function
* @return a subscription pointer, or NULL in case of failure
*/ */
msg_subscription_t *__msg_Subscribe( vlc_object_t *p_this ) msg_subscription_t *msg_Subscribe (libvlc_int_t *instance, msg_callback_t cb,
msg_cb_data_t *opaque)
{ {
libvlc_priv_t *priv = libvlc_priv (p_this->p_libvlc); msg_subscription_t *sub = malloc (sizeof (*sub));
msg_subscription_t *p_sub = malloc( sizeof( msg_subscription_t ) ); if (sub == NULL)
if (p_sub == NULL)
return NULL; return NULL;
vlc_mutex_lock( &QUEUE.lock ); sub->instance = instance;
sub->func = cb;
TAB_APPEND( QUEUE.i_sub, QUEUE.pp_sub, p_sub ); sub->opaque = opaque;
sub->begin = sub->end = sub->overruns = 0;
p_sub->i_start = QUEUE.i_start; if (vlc_clone (&sub->thread, msg_thread, sub, VLC_THREAD_PRIORITY_LOW))
p_sub->pi_stop = &QUEUE.i_stop; {
p_sub->p_msg = QUEUE.msg; free (sub);
p_sub->p_lock = &QUEUE.lock; return NULL;
}
vlc_mutex_unlock( &QUEUE.lock ); msg_bank_t *bank = libvlc_bank (instance);
vlc_mutex_lock (&bank->lock);
TAB_APPEND (bank->i_sub, bank->pp_sub, sub);
vlc_mutex_unlock (&bank->lock);
return p_sub; return sub;
} }
/** /**
* Unsubscribe from a message queue. * Unsubscribe from the message queue.
* This function waits for the message callback to return if needed.
*/ */
void __msg_Unsubscribe( vlc_object_t *p_this, msg_subscription_t *p_sub ) void msg_Unsubscribe (msg_subscription_t *sub)
{ {
libvlc_priv_t *priv = libvlc_priv (p_this->p_libvlc); msg_bank_t *bank = libvlc_bank (sub->instance);
vlc_mutex_lock( &QUEUE.lock ); /* TODO: flush support? */
for( int j = 0 ; j< QUEUE.i_sub ; j++ ) vlc_cancel (sub->thread);
{ vlc_mutex_lock (&bank->lock);
if( QUEUE.pp_sub[j] == p_sub ) TAB_REMOVE (bank->i_sub, bank->pp_sub, sub);
{ vlc_mutex_unlock (&bank->lock);
REMOVE_ELEM( QUEUE.pp_sub, QUEUE.i_sub, j );
free( p_sub ); vlc_join (sub->thread, NULL);
} free (sub);
}
vlc_mutex_unlock( &QUEUE.lock );
} }
/***************************************************************************** /*****************************************************************************
...@@ -469,15 +526,32 @@ static void QueueMsg( vlc_object_t *p_this, int i_type, const char *psz_module, ...@@ -469,15 +526,32 @@ static void QueueMsg( vlc_object_t *p_this, int i_type, const char *psz_module,
p_item->psz_header = psz_header; p_item->psz_header = psz_header;
PrintMsg( p_this, p_item ); PrintMsg( p_this, p_item );
#define bank p_queue
if( p_queue->b_overflow ) if( p_item == &item )
{ {
free( p_item->psz_module ); free( p_item->psz_module );
free( p_item->psz_msg ); free( p_item->psz_msg );
free( p_item->psz_header ); free( p_item->psz_header );
for (int i = 0; i < bank->i_sub; i++)
bank->pp_sub[i]->overruns++;
} }
else
vlc_mutex_unlock ( &p_queue->lock ); {
for (int i = 0; i < bank->i_sub; i++)
{
msg_subscription_t *sub = bank->pp_sub[i];
if ((sub->end + 1 - sub->begin) % VLC_MSG_QSIZE)
{
sub->items[sub->end++] = p_item;
if (sub->end == VLC_MSG_QSIZE)
sub->end = 0;
}
else
sub->overruns++;
}
vlc_cond_broadcast (&bank->wait);
}
vlc_mutex_unlock (&bank->lock);
} }
/* following functions are local */ /* following functions are local */
...@@ -498,7 +572,7 @@ static void FlushMsg ( msg_bank_t *p_queue ) ...@@ -498,7 +572,7 @@ static void FlushMsg ( msg_bank_t *p_queue )
/* Check until which value we can free messages */ /* Check until which value we can free messages */
for( i_index = 0; i_index < p_queue->i_sub; i_index++ ) for( i_index = 0; i_index < p_queue->i_sub; i_index++ )
{ {
i_start = p_queue->pp_sub[ i_index ]->i_start; i_start = p_queue->pp_sub[ i_index ]->begin;
/* If this subscriber is late, we don't free messages before /* If this subscriber is late, we don't free messages before
* his i_start value, otherwise he'll miss messages */ * his i_start value, otherwise he'll miss messages */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment