Commit 41e5a3bd authored by Jean-Paul Saman's avatar Jean-Paul Saman

examples/dvbinfo: limit fifo size

Implemented a function fifo_size() that returns the current size of the fifo.
The dvbinfo example program uses fifo_size() to detect if the total size of
buffers inside the fifo exceeds a certain threshold. The threshold is currently
defined at compile time.
parent fcb8d11c
...@@ -44,6 +44,7 @@ struct fifo_s ...@@ -44,6 +44,7 @@ struct fifo_s
pthread_cond_t wait; pthread_cond_t wait;
bool b_force_wake; bool b_force_wake;
ssize_t i_count; ssize_t i_count;
size_t i_size; /* fifo size in bytes */
buffer_t *p_first; buffer_t *p_first;
buffer_t **pp_last; buffer_t **pp_last;
}; };
...@@ -73,6 +74,7 @@ fifo_t *fifo_new(void) ...@@ -73,6 +74,7 @@ fifo_t *fifo_new(void)
if (fifo == NULL) return NULL; if (fifo == NULL) return NULL;
fifo->i_count = 0; fifo->i_count = 0;
fifo->i_size = 0;
fifo->b_force_wake = false; fifo->b_force_wake = false;
fifo->p_first = NULL; fifo->p_first = NULL;
fifo->pp_last = &fifo->p_first; fifo->pp_last = &fifo->p_first;
...@@ -129,6 +131,14 @@ ssize_t fifo_count(fifo_t *fifo) ...@@ -129,6 +131,14 @@ ssize_t fifo_count(fifo_t *fifo)
return count; return count;
} }
size_t fifo_size(fifo_t *fifo)
{
pthread_mutex_lock(&fifo->lock);
size_t size = fifo->i_size;
pthread_mutex_unlock(&fifo->lock);
return size;
}
void fifo_push(fifo_t *fifo, buffer_t *buffer) void fifo_push(fifo_t *fifo, buffer_t *buffer)
{ {
buffer_t *p_last; buffer_t *p_last;
...@@ -148,6 +158,7 @@ void fifo_push(fifo_t *fifo, buffer_t *buffer) ...@@ -148,6 +158,7 @@ void fifo_push(fifo_t *fifo, buffer_t *buffer)
*fifo->pp_last = buffer; *fifo->pp_last = buffer;
fifo->pp_last = &p_last->p_next; fifo->pp_last = &p_last->p_next;
fifo->i_count += i_depth; fifo->i_count += i_depth;
fifo->i_size += buffer->i_size;
assert(fifo->p_first != NULL); assert(fifo->p_first != NULL);
assert(fifo->pp_last != NULL); assert(fifo->pp_last != NULL);
...@@ -175,6 +186,7 @@ buffer_t *fifo_pop(fifo_t *fifo) ...@@ -175,6 +186,7 @@ buffer_t *fifo_pop(fifo_t *fifo)
fifo->p_first = buffer->p_next; fifo->p_first = buffer->p_next;
fifo->i_count--; fifo->i_count--;
fifo->i_size -= buffer->i_size;
if (fifo->p_first == NULL) if (fifo->p_first == NULL)
{ {
......
...@@ -46,6 +46,7 @@ void buffer_free(buffer_t *buffer); ...@@ -46,6 +46,7 @@ void buffer_free(buffer_t *buffer);
* fifo_new() - create a new fifo holding buffer_t pointers * fifo_new() - create a new fifo holding buffer_t pointers
* fifo_free() - release fifo and all buffers contained therein * fifo_free() - release fifo and all buffers contained therein
* fifo_count()- number of buffers in fifo_t * fifo_count()- number of buffers in fifo_t
* fifo_size() - total size of buffers in fifo_t
* fifo_push() - push buffer at end of fifo * fifo_push() - push buffer at end of fifo
* fifo_pop() - pop buffer from start of fifo * fifo_pop() - pop buffer from start of fifo
* fifo_wake() - wake up fifo listeners * fifo_wake() - wake up fifo listeners
...@@ -53,6 +54,7 @@ void buffer_free(buffer_t *buffer); ...@@ -53,6 +54,7 @@ void buffer_free(buffer_t *buffer);
fifo_t *fifo_new(void); fifo_t *fifo_new(void);
void fifo_free(fifo_t *fifo); void fifo_free(fifo_t *fifo);
ssize_t fifo_count(fifo_t *fifo); ssize_t fifo_count(fifo_t *fifo);
size_t fifo_size(fifo_t *fifo);
void fifo_push(fifo_t *fifo, buffer_t *buffer); void fifo_push(fifo_t *fifo, buffer_t *buffer);
buffer_t *fifo_pop(fifo_t *fifo); buffer_t *fifo_pop(fifo_t *fifo);
void fifo_wake(fifo_t *fifo); void fifo_wake(fifo_t *fifo);
......
...@@ -64,6 +64,7 @@ ...@@ -64,6 +64,7 @@
# include "tcp.h" # include "tcp.h"
#endif #endif
#define FIFO_THRESHOLD_SIZE (400 * 1024 * 1024) /* threshold in bytes */
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0])) #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
static const int i_summary_mode[] = { SUM_BANDWIDTH, SUM_TABLE, SUM_PACKET, SUM_WIRE }; static const int i_summary_mode[] = { SUM_BANDWIDTH, SUM_TABLE, SUM_PACKET, SUM_WIRE };
...@@ -77,6 +78,10 @@ typedef struct dvbinfo_capture_s ...@@ -77,6 +78,10 @@ typedef struct dvbinfo_capture_s
fifo_t *fifo; fifo_t *fifo;
fifo_t *empty; fifo_t *empty;
pthread_mutex_t lock;
pthread_cond_t fifo_full;
bool b_fifo_full;
size_t size; /* prefered capture size */ size_t size; /* prefered capture size */
params_t *params; params_t *params;
...@@ -277,6 +282,30 @@ static void *dvbinfo_capture(void *data) ...@@ -277,6 +282,30 @@ static void *dvbinfo_capture(void *data)
buffer->i_date = mdate(); buffer->i_date = mdate();
/* check fifo size */
if (fifo_size(capture->fifo) >= FIFO_THRESHOLD_SIZE)
{
pthread_mutex_lock(&capture->lock);
capture->b_fifo_full = true;
pthread_mutex_unlock(&capture->lock);
if (param->b_file)
{
/* wait till buffer becomes smaller again */
pthread_mutex_lock(&capture->lock);
while(capture->b_fifo_full)
pthread_cond_wait(&capture->fifo_full, &capture->lock);
pthread_mutex_unlock(&capture->lock);
}
else
{
libdvbpsi_log(capture->params, DVBINFO_LOG_ERROR,
"error fifo full discarding buffer");
fifo_push(capture->empty, buffer);
continue;
}
}
/* store buffer */ /* store buffer */
fifo_push(capture->fifo, buffer); fifo_push(capture->fifo, buffer);
buffer = NULL; buffer = NULL;
...@@ -374,6 +403,15 @@ static int dvbinfo_process(dvbinfo_capture_t *capture) ...@@ -374,6 +403,15 @@ static int dvbinfo_process(dvbinfo_capture_t *capture)
/* reuse buffer */ /* reuse buffer */
fifo_push(capture->empty, buffer); fifo_push(capture->empty, buffer);
buffer = NULL; buffer = NULL;
/* check fifo size */
if (fifo_size(capture->fifo) < FIFO_THRESHOLD_SIZE)
{
pthread_mutex_lock(&capture->lock);
capture->b_fifo_full = false;
pthread_cond_signal(&capture->fifo_full);
pthread_mutex_unlock(&capture->lock);
}
} }
libdvbpsi_exit(stream); libdvbpsi_exit(stream);
...@@ -407,6 +445,9 @@ int main(int argc, char **pp_argv) ...@@ -407,6 +445,9 @@ int main(int argc, char **pp_argv)
capture.params = param; capture.params = param;
capture.fifo = fifo_new(); capture.fifo = fifo_new();
capture.empty = fifo_new(); capture.empty = fifo_new();
capture.b_fifo_full = false;
pthread_mutex_init(&capture.lock, NULL);
pthread_cond_init(&capture.fifo_full, NULL);
static const struct option long_options[] = static const struct option long_options[] =
{ {
...@@ -461,6 +502,7 @@ int main(int argc, char **pp_argv) ...@@ -461,6 +502,7 @@ int main(int argc, char **pp_argv)
} }
/* */ /* */
param->pf_read = read; param->pf_read = read;
param->b_file = true;
} }
break; break;
...@@ -647,6 +689,9 @@ int main(int argc, char **pp_argv) ...@@ -647,6 +689,9 @@ int main(int argc, char **pp_argv)
fifo_free((&capture)->fifo); fifo_free((&capture)->fifo);
fifo_free((&capture)->empty); fifo_free((&capture)->empty);
pthread_mutex_destroy(&capture.lock);
pthread_cond_destroy(&capture.fifo_full);
#ifdef HAVE_SYS_SOCKET_H #ifdef HAVE_SYS_SOCKET_H
if (param->b_monitor) if (param->b_monitor)
closelog(); closelog();
......
...@@ -40,6 +40,7 @@ typedef struct params_s ...@@ -40,6 +40,7 @@ typedef struct params_s
int port; int port;
bool b_udp; bool b_udp;
bool b_tcp; bool b_tcp;
bool b_file;
/* */ /* */
int fd_in; int fd_in;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment