Commit 37b1ee9a authored by Francois Cartegnie's avatar Francois Cartegnie Committed by Jean-Baptiste Kempf

stream_filter: httplive: factorize segment read and remove vlc_object_alive

(cherry picked from commit 39b6e3b4f3071294c098958756cc912f8671d7b2)
Signed-off-by: default avatarJean-Baptiste Kempf <jb@videolan.org>
parent 09f30022
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include <vlc_stream.h> #include <vlc_stream.h>
#include <vlc_memory.h> #include <vlc_memory.h>
#include <vlc_gcrypt.h> #include <vlc_gcrypt.h>
#include <vlc_atomic.h>
/***************************************************************************** /*****************************************************************************
* Module descriptor * Module descriptor
...@@ -153,6 +154,7 @@ struct stream_sys_t ...@@ -153,6 +154,7 @@ struct stream_sys_t
vlc_cond_t wait; vlc_cond_t wait;
vlc_mutex_t lock; vlc_mutex_t lock;
bool paused; bool paused;
atomic_bool closing;
}; };
/**************************************************************************** /****************************************************************************
...@@ -1588,13 +1590,16 @@ static int hls_DownloadSegmentData(stream_t *s, hls_stream_t *hls, segment_t *se ...@@ -1588,13 +1590,16 @@ static int hls_DownloadSegmentData(stream_t *s, hls_stream_t *hls, segment_t *se
} }
mtime_t start = mdate(); mtime_t start = mdate();
if (hls_Download(s, segment) != VLC_SUCCESS)
int i_ret = hls_Download(s, segment);
if (i_ret != VLC_SUCCESS)
{ {
msg_Err(s, "downloading segment %d from stream %d failed", msg_Err(s, "downloading segment %d from stream %d failed",
segment->sequence, *cur_stream); segment->sequence, *cur_stream);
vlc_mutex_unlock(&segment->lock); vlc_mutex_unlock(&segment->lock);
return VLC_EGENERIC; return i_ret;
} }
mtime_t duration = mdate() - start; mtime_t duration = mdate() - start;
if (hls->bandwidth == 0 && segment->duration > 0) if (hls->bandwidth == 0 && segment->duration > 0)
{ {
...@@ -1603,14 +1608,13 @@ static int hls_DownloadSegmentData(stream_t *s, hls_stream_t *hls, segment_t *se ...@@ -1603,14 +1608,13 @@ static int hls_DownloadSegmentData(stream_t *s, hls_stream_t *hls, segment_t *se
} }
/* If the segment is encrypted, decode it */ /* If the segment is encrypted, decode it */
if (hls_DecodeSegmentData(s, hls, segment) != VLC_SUCCESS) i_ret = hls_DecodeSegmentData(s, hls, segment);
{
vlc_mutex_unlock(&segment->lock);
return VLC_EGENERIC;
}
vlc_mutex_unlock(&segment->lock); vlc_mutex_unlock(&segment->lock);
if(i_ret != VLC_SUCCESS)
return i_ret;
msg_Dbg(s, "downloaded segment %d from stream %d", msg_Dbg(s, "downloaded segment %d from stream %d",
segment->sequence, *cur_stream); segment->sequence, *cur_stream);
...@@ -1636,9 +1640,7 @@ static void* hls_Thread(void *p_this) ...@@ -1636,9 +1640,7 @@ static void* hls_Thread(void *p_this)
stream_t *s = (stream_t *)p_this; stream_t *s = (stream_t *)p_this;
stream_sys_t *p_sys = s->p_sys; stream_sys_t *p_sys = s->p_sys;
int canc = vlc_savecancel(); for( ;; )
while (vlc_object_alive(s))
{ {
hls_stream_t *hls = hls_Get(p_sys->hls_stream, p_sys->download.stream); hls_stream_t *hls = hls_Get(p_sys->hls_stream, p_sys->download.stream);
assert(hls); assert(hls);
...@@ -1654,6 +1656,7 @@ static void* hls_Thread(void *p_this) ...@@ -1654,6 +1656,7 @@ static void* hls_Thread(void *p_this)
{ {
/* wait */ /* wait */
vlc_mutex_lock(&p_sys->download.lock_wait); vlc_mutex_lock(&p_sys->download.lock_wait);
mutex_cleanup_push(&p_sys->download.lock_wait); //CO
while (((p_sys->download.segment - p_sys->playback.segment > 6) || while (((p_sys->download.segment - p_sys->playback.segment > 6) ||
(p_sys->download.segment >= count)) && (p_sys->download.segment >= count)) &&
(p_sys->download.seek == -1)) (p_sys->download.seek == -1))
...@@ -1661,35 +1664,36 @@ static void* hls_Thread(void *p_this) ...@@ -1661,35 +1664,36 @@ static void* hls_Thread(void *p_this)
vlc_cond_wait(&p_sys->download.wait, &p_sys->download.lock_wait); vlc_cond_wait(&p_sys->download.wait, &p_sys->download.lock_wait);
if (p_sys->b_live /*&& (mdate() >= p_sys->playlist.wakeup)*/) if (p_sys->b_live /*&& (mdate() >= p_sys->playlist.wakeup)*/)
break; break;
if (!vlc_object_alive(s))
break;
} }
vlc_cleanup_pop( ); //CO
/* */ /* */
if (p_sys->download.seek >= 0) if (p_sys->download.seek >= 0)
{ {
p_sys->download.segment = p_sys->download.seek; p_sys->download.segment = p_sys->download.seek;
p_sys->download.seek = -1; p_sys->download.seek = -1;
} }
vlc_mutex_unlock(&p_sys->download.lock_wait); vlc_mutex_unlock(&p_sys->download.lock_wait);
} }
if (!vlc_object_alive(s)) break; vlc_testcancel();
vlc_mutex_lock(&hls->lock); vlc_mutex_lock(&hls->lock);
segment_t *segment = segment_GetSegment(hls, p_sys->download.segment); segment_t *segment = segment_GetSegment(hls, p_sys->download.segment);
vlc_mutex_unlock(&hls->lock); vlc_mutex_unlock(&hls->lock);
int i_canc = vlc_savecancel();
if ((segment != NULL) && if ((segment != NULL) &&
(hls_DownloadSegmentData(s, hls, segment, &p_sys->download.stream) != VLC_SUCCESS)) (hls_DownloadSegmentData(s, hls, segment, &p_sys->download.stream) != VLC_SUCCESS))
{ {
if (!vlc_object_alive(s)) break;
if (!p_sys->b_live) if (!p_sys->b_live)
{ {
p_sys->b_error = true; p_sys->b_error = true;
break; break;
} }
} }
vlc_restorecancel(i_canc);
/* download succeeded */ /* download succeeded */
/* determine next segment to download */ /* determine next segment to download */
...@@ -1708,9 +1712,10 @@ static void* hls_Thread(void *p_this) ...@@ -1708,9 +1712,10 @@ static void* hls_Thread(void *p_this)
vlc_mutex_lock(&p_sys->read.lock_wait); vlc_mutex_lock(&p_sys->read.lock_wait);
vlc_cond_signal(&p_sys->read.wait); vlc_cond_signal(&p_sys->read.wait);
vlc_mutex_unlock(&p_sys->read.lock_wait); vlc_mutex_unlock(&p_sys->read.lock_wait);
vlc_testcancel();
} }
vlc_restorecancel(canc);
return NULL; return NULL;
} }
...@@ -1721,14 +1726,14 @@ static void* hls_Reload(void *p_this) ...@@ -1721,14 +1726,14 @@ static void* hls_Reload(void *p_this)
assert(p_sys->b_live); assert(p_sys->b_live);
int canc = vlc_savecancel();
double wait = 1.0; double wait = 1.0;
while (vlc_object_alive(s)) for ( ;; )
{ {
mtime_t now = mdate(); mtime_t now = mdate();
if (now >= p_sys->playlist.wakeup) if (now >= p_sys->playlist.wakeup)
{ {
int canc = vlc_savecancel();
/* reload the m3u8 if there are less than 3 segments what aren't downloaded */ /* reload the m3u8 if there are less than 3 segments what aren't downloaded */
if ( ( p_sys->download.segment - p_sys->playback.segment < 3 ) && if ( ( p_sys->download.segment - p_sys->playback.segment < 3 ) &&
( hls_ReloadPlaylist(s) != VLC_SUCCESS) ) ( hls_ReloadPlaylist(s) != VLC_SUCCESS) )
...@@ -1752,6 +1757,8 @@ static void* hls_Reload(void *p_this) ...@@ -1752,6 +1757,8 @@ static void* hls_Reload(void *p_this)
wait = 1.0; wait = 1.0;
} }
vlc_restorecancel(canc);
hls_stream_t *hls = hls_Get(p_sys->hls_stream, p_sys->download.stream); hls_stream_t *hls = hls_Get(p_sys->hls_stream, p_sys->download.stream);
assert(hls); assert(hls);
...@@ -1768,7 +1775,6 @@ static void* hls_Reload(void *p_this) ...@@ -1768,7 +1775,6 @@ static void* hls_Reload(void *p_this)
mwait(p_sys->playlist.wakeup); mwait(p_sys->playlist.wakeup);
} }
vlc_restorecancel(canc);
return NULL; return NULL;
} }
...@@ -1788,7 +1794,7 @@ static int Prefetch(stream_t *s, int *current) ...@@ -1788,7 +1794,7 @@ static int Prefetch(stream_t *s, int *current)
/* Download ~10s worth of segments of this HLS stream if they exist */ /* Download ~10s worth of segments of this HLS stream if they exist */
unsigned segment_amount = (0.5f + 10/hls->duration); unsigned segment_amount = (0.5f + 10/hls->duration);
for (int i = 0; i < __MIN(vlc_array_count(hls->segments), segment_amount); i++) for (unsigned i = 0; i < __MIN((unsigned)vlc_array_count(hls->segments), segment_amount); i++)
{ {
segment_t *segment = segment_GetSegment(hls, p_sys->download.segment); segment_t *segment = segment_GetSegment(hls, p_sys->download.segment);
if (segment == NULL ) if (segment == NULL )
...@@ -1824,6 +1830,8 @@ static int Prefetch(stream_t *s, int *current) ...@@ -1824,6 +1830,8 @@ static int Prefetch(stream_t *s, int *current)
/**************************************************************************** /****************************************************************************
* *
****************************************************************************/ ****************************************************************************/
#define HLS_READ_SIZE 65536
static int hls_Download(stream_t *s, segment_t *segment) static int hls_Download(stream_t *s, segment_t *segment)
{ {
stream_sys_t *p_sys = s->p_sys; stream_sys_t *p_sys = s->p_sys;
...@@ -1834,82 +1842,72 @@ static int hls_Download(stream_t *s, segment_t *segment) ...@@ -1834,82 +1842,72 @@ static int hls_Download(stream_t *s, segment_t *segment)
vlc_cond_wait(&p_sys->wait, &p_sys->lock); vlc_cond_wait(&p_sys->wait, &p_sys->lock);
vlc_mutex_unlock(&p_sys->lock); vlc_mutex_unlock(&p_sys->lock);
if (atomic_load(&p_sys->closing))
return VLC_EGENERIC;
stream_t *p_ts = stream_UrlNew(s, segment->url); stream_t *p_ts = stream_UrlNew(s, segment->url);
if (p_ts == NULL) if (p_ts == NULL)
return VLC_EGENERIC; return VLC_EGENERIC;
segment->size = stream_Size(p_ts); int64_t size = stream_Size(p_ts);
if (size < 0)
if (segment->size == 0) { size = 0;
int chunk_size = 65536;
segment->data = block_Alloc(chunk_size);
if (!segment->data)
goto nomem;
do {
if (segment->data->i_buffer - segment->size < chunk_size) {
chunk_size *= 2;
block_t *p_block = block_Realloc(segment->data, 0, segment->data->i_buffer + chunk_size);
if (!p_block) {
block_Release(segment->data);
segment->data = NULL;
goto nomem;
}
segment->data = p_block;
}
ssize_t length = stream_Read(p_ts, segment->data->p_buffer + segment->size, chunk_size); unsigned i_total_read = 0;
if (length <= 0) { int i_return = VLC_SUCCESS;
segment->data->i_buffer = segment->size;
break;
}
segment->size += length;
} while (vlc_object_alive(s));
stream_Delete(p_ts); block_t *p_segment_data = block_Alloc(size ? size : HLS_READ_SIZE);
return VLC_SUCCESS; if (!p_segment_data)
{
i_return = VLC_ENOMEM;
goto end;
} }
for( ;; )
segment->data = block_Alloc(segment->size);
if (segment->data == NULL)
goto nomem;
assert(segment->data->i_buffer == segment->size);
ssize_t curlen = 0;
do
{ {
/* NOTE: Beware the size reported for a segment by the HLS server may not /* NOTE: Beware the size reported for a segment by the HLS server may not
* be correct, when downloading the segment data. Therefore check the size * be correct, when downloading the segment data. Therefore check the size
* and enlarge the segment data block if necessary. * and enlarge the segment data block if necessary.
*/ */
uint64_t size = stream_Size(p_ts); uint64_t i_toread = (size > 0) ? (uint64_t) size - i_total_read: HLS_READ_SIZE;
if (size > segment->size)
if (i_total_read + i_toread > p_segment_data->i_buffer)
{ {
msg_Dbg(s, "size changed %"PRIu64, segment->size); msg_Dbg(s, "size changed to %"PRIu64, i_total_read + i_toread);
block_t *p_block = block_Realloc(segment->data, 0, size); block_t *p_realloc_block = block_Realloc(p_segment_data, 0, i_total_read + i_toread);
if (p_block == NULL) if (p_realloc_block == NULL)
{ {
block_Release(segment->data); if(p_segment_data)
segment->data = NULL; block_Release(p_segment_data);
goto nomem; i_return = VLC_ENOMEM;
goto end;
} }
segment->data = p_block; p_segment_data = p_realloc_block;
segment->size = size; }
assert(segment->data->i_buffer == segment->size);
p_block = NULL; int i_canc = vlc_savecancel();
int i_length = stream_Read(p_ts, &p_segment_data->p_buffer[i_total_read], HLS_READ_SIZE);
vlc_restorecancel(i_canc);
if (i_length <= 0)
{
if(size > 0 && i_total_read < size)
msg_Warn(s, "segment read %"PRIu64"/%"PRIu64, size - i_total_read, size );
p_segment_data->i_buffer = i_total_read;
break;
} }
ssize_t length = stream_Read(p_ts, segment->data->p_buffer + curlen, segment->size - curlen);
if (length <= 0) i_total_read += i_length;
if (atomic_load(&p_sys->closing))
break; break;
curlen += length; };
} while (vlc_object_alive(s));
stream_Delete(p_ts); segment->data = p_segment_data;
return VLC_SUCCESS; segment->size = p_segment_data->i_buffer;
nomem: end:
stream_Delete(p_ts); stream_Delete(p_ts);
return VLC_ENOMEM; return i_return;
} }
/* Read M3U8 file */ /* Read M3U8 file */
...@@ -2068,6 +2066,7 @@ static int Open(vlc_object_t *p_this) ...@@ -2068,6 +2066,7 @@ static int Open(vlc_object_t *p_this)
s->pf_control = Control; s->pf_control = Control;
p_sys->paused = false; p_sys->paused = false;
atomic_init(&p_sys->closing, false);
vlc_cond_init(&p_sys->wait); vlc_cond_init(&p_sys->wait);
vlc_mutex_init(&p_sys->lock); vlc_mutex_init(&p_sys->lock);
...@@ -2128,7 +2127,10 @@ static int Open(vlc_object_t *p_this) ...@@ -2128,7 +2127,10 @@ static int Open(vlc_object_t *p_this)
if (vlc_clone(&p_sys->thread, hls_Thread, s, VLC_THREAD_PRIORITY_INPUT)) if (vlc_clone(&p_sys->thread, hls_Thread, s, VLC_THREAD_PRIORITY_INPUT))
{ {
if (p_sys->b_live) if (p_sys->b_live)
{
vlc_cancel(p_sys->reload);
vlc_join(p_sys->reload, NULL); vlc_join(p_sys->reload, NULL);
}
goto fail_thread; goto fail_thread;
} }
...@@ -2171,6 +2173,7 @@ static void Close(vlc_object_t *p_this) ...@@ -2171,6 +2173,7 @@ static void Close(vlc_object_t *p_this)
vlc_mutex_lock(&p_sys->lock); vlc_mutex_lock(&p_sys->lock);
p_sys->paused = false; p_sys->paused = false;
atomic_store(&p_sys->closing, true);
vlc_cond_signal(&p_sys->wait); vlc_cond_signal(&p_sys->wait);
vlc_mutex_unlock(&p_sys->lock); vlc_mutex_unlock(&p_sys->lock);
...@@ -2182,10 +2185,18 @@ static void Close(vlc_object_t *p_this) ...@@ -2182,10 +2185,18 @@ static void Close(vlc_object_t *p_this)
vlc_cond_signal(&p_sys->download.wait); vlc_cond_signal(&p_sys->download.wait);
vlc_mutex_unlock(&p_sys->download.lock_wait); vlc_mutex_unlock(&p_sys->download.lock_wait);
vlc_cond_signal(&p_sys->read.wait); /* set closing first */
/* */ /* */
if (p_sys->b_live) if (p_sys->b_live)
{
vlc_cancel(p_sys->reload);
vlc_join(p_sys->reload, NULL); vlc_join(p_sys->reload, NULL);
}
vlc_cancel(p_sys->thread);
vlc_join(p_sys->thread, NULL); vlc_join(p_sys->thread, NULL);
vlc_mutex_destroy(&p_sys->download.lock_wait); vlc_mutex_destroy(&p_sys->download.lock_wait);
vlc_cond_destroy(&p_sys->download.wait); vlc_cond_destroy(&p_sys->download.wait);
...@@ -2387,7 +2398,7 @@ static int Read(stream_t *s, void *buffer, unsigned int i_read) ...@@ -2387,7 +2398,7 @@ static int Read(stream_t *s, void *buffer, unsigned int i_read)
while (length == 0) while (length == 0)
{ {
// In case an error occurred or the stream was closed return 0 // In case an error occurred or the stream was closed return 0
if (p_sys->b_error || !vlc_object_alive(s)) if (p_sys->b_error || atomic_load(&p_sys->closing))
return 0; return 0;
// Lock the mutex before trying to read to avoid a race condition with the download thread // Lock the mutex before trying to read to avoid a race condition with the download thread
...@@ -2666,7 +2677,8 @@ static int segment_Seek(stream_t *s, const uint64_t pos) ...@@ -2666,7 +2677,8 @@ static int segment_Seek(stream_t *s, const uint64_t pos)
(p_sys->download.segment < count))) (p_sys->download.segment < count)))
{ {
vlc_cond_wait(&p_sys->download.wait, &p_sys->download.lock_wait); vlc_cond_wait(&p_sys->download.wait, &p_sys->download.lock_wait);
if (!vlc_object_alive(s) || s->b_error) break; if (p_sys->b_error || atomic_load(&p_sys->closing))
break;
} }
vlc_mutex_unlock(&p_sys->download.lock_wait); vlc_mutex_unlock(&p_sys->download.lock_wait);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment