Commit 561195e1 authored by Rémi Denis-Courmont's avatar Rémi Denis-Courmont

cache_read: "method 2" stream_access rewritten as a stream filter

parent 682e1602
...@@ -66,6 +66,7 @@ $Id$ ...@@ -66,6 +66,7 @@ $Id$
* bonjour: Zeroconf services discovery * bonjour: Zeroconf services discovery
* bpg: BPG image decoder using libbpg * bpg: BPG image decoder using libbpg
* caca: color ASCII art video output using libcaca * caca: color ASCII art video output using libcaca
* cache_read: byte stream caching stream filter
* caf: CAF demuxer * caf: CAF demuxer
* canvas: Automatically resize and padd a video * canvas: Automatically resize and padd a video
* caopengllayer: CoreAnimation OpenGL video output * caopengllayer: CoreAnimation OpenGL video output
......
...@@ -2,6 +2,9 @@ stream_filterdir = $(pluginsdir)/stream_filter ...@@ -2,6 +2,9 @@ stream_filterdir = $(pluginsdir)/stream_filter
stream_filter_LTLIBRARIES = stream_filter_LTLIBRARIES =
libcache_read_plugin_la_SOURCES = stream_filter/cache_read.c
stream_filter_LTLIBRARIES += libcache_read_plugin.la
libdecomp_plugin_la_SOURCES = stream_filter/decomp.c libdecomp_plugin_la_SOURCES = stream_filter/decomp.c
libdecomp_plugin_la_LIBADD = $(LIBPTHREAD) libdecomp_plugin_la_LIBADD = $(LIBPTHREAD)
if !HAVE_WIN32 if !HAVE_WIN32
......
/*****************************************************************************
* cache_read.c
*****************************************************************************
* Copyright (C) 1999-2004 VLC authors and VideoLAN
* $Id$
*
* Authors: Laurent Aimar <fenrir@via.ecp.fr>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <vlc_common.h>
#include <vlc_plugin.h>
#include <vlc_stream.h>
#include <vlc_interrupt.h>
// #define STREAM_DEBUG 1
/*
* Complex scheme using mutliple track to avoid seeking
*/
/* How many tracks we have, currently only used for stream mode */
#ifdef OPTIMIZE_MEMORY
# define STREAM_CACHE_TRACK 1
/* Max size of our cache 128Ko per track */
# define STREAM_CACHE_SIZE (STREAM_CACHE_TRACK*1024*128)
#else
# define STREAM_CACHE_TRACK 3
/* Max size of our cache 4Mo per track */
# define STREAM_CACHE_SIZE (4*STREAM_CACHE_TRACK*1024*1024)
#endif
/* How many data we try to prebuffer
* XXX it should be small to avoid useless latency but big enough for
* efficient demux probing */
#define STREAM_CACHE_PREBUFFER_SIZE (128)
/* Method:
* - We use ring buffers, only one if unseekable, all if seekable
* - Upon seek date current ring, then search if one ring match the pos,
* yes: switch to it, seek the access to match the end of the ring
* no: search the ring with i_end the closer to i_pos,
* if close enough, read data and use this ring
* else use the oldest ring, seek and use it.
*
* TODO: - with access non seekable: use all space available for only one ring, but
* we have to support seekable/non-seekable switch on the fly.
* - compute a good value for i_read_size
* - ?
*/
#define STREAM_READ_ATONCE 1024
#define STREAM_CACHE_TRACK_SIZE (STREAM_CACHE_SIZE/STREAM_CACHE_TRACK)
typedef struct
{
mtime_t date;
uint64_t i_start;
uint64_t i_end;
uint8_t *p_buffer;
} stream_track_t;
struct stream_sys_t
{
uint64_t i_pos; /* Current reading offset */
unsigned i_offset; /* Buffer offset in the current track */
int i_tk; /* Current track */
stream_track_t tk[STREAM_CACHE_TRACK];
/* Global buffer */
uint8_t *p_buffer;
/* */
unsigned i_used; /* Used since last read */
unsigned i_read_size;
struct
{
/* Stat about reading data */
uint64_t i_read_count;
uint64_t i_bytes;
uint64_t i_read_time;
} stat;
};
static int AStreamRefillStream(stream_t *s)
{
stream_sys_t *sys = s->p_sys;
stream_track_t *tk = &sys->tk[sys->i_tk];
/* We read but won't increase i_start after initial start + offset */
int i_toread =
__MIN(sys->i_used, STREAM_CACHE_TRACK_SIZE -
(tk->i_end - tk->i_start - sys->i_offset));
bool b_read = false;
if (i_toread <= 0) return VLC_EGENERIC; /* EOF */
#ifdef STREAM_DEBUG
msg_Dbg(s, "AStreamRefillStream: used=%d toread=%d",
sys->i_used, i_toread);
#endif
mtime_t start = mdate();
while (i_toread > 0)
{
int i_off = tk->i_end % STREAM_CACHE_TRACK_SIZE;
int i_read;
if (vlc_killed())
return VLC_EGENERIC;
i_read = __MIN(i_toread, STREAM_CACHE_TRACK_SIZE - i_off);
i_read = stream_Read(s->p_source, &tk->p_buffer[i_off], i_read);
/* msg_Dbg(s, "AStreamRefillStream: read=%d", i_read); */
if (i_read < 0)
{
continue;
}
else if (i_read == 0)
{
if (!b_read)
return VLC_EGENERIC;
return VLC_SUCCESS;
}
b_read = true;
/* Update end */
tk->i_end += i_read;
/* Windows of STREAM_CACHE_TRACK_SIZE */
if (tk->i_start + STREAM_CACHE_TRACK_SIZE < tk->i_end)
{
unsigned i_invalid = tk->i_end - tk->i_start - STREAM_CACHE_TRACK_SIZE;
tk->i_start += i_invalid;
sys->i_offset -= i_invalid;
}
i_toread -= i_read;
sys->i_used -= i_read;
sys->stat.i_bytes += i_read;
sys->stat.i_read_count++;
}
sys->stat.i_read_time += mdate() - start;
return VLC_SUCCESS;
}
static void AStreamPrebufferStream(stream_t *s)
{
stream_sys_t *sys = s->p_sys;
mtime_t start = mdate();
bool first = true;
msg_Dbg(s, "starting pre-buffering");
for (;;)
{
stream_track_t *tk = &sys->tk[sys->i_tk];
mtime_t now = mdate();
int i_read;
int i_buffered = tk->i_end - tk->i_start;
if (vlc_killed() || i_buffered >= STREAM_CACHE_PREBUFFER_SIZE)
{
int64_t i_byterate;
/* Update stat */
sys->stat.i_bytes = i_buffered;
sys->stat.i_read_time = now - start;
i_byterate = (CLOCK_FREQ * sys->stat.i_bytes) /
(sys->stat.i_read_time+1);
msg_Dbg(s, "pre-buffering done %"PRId64" bytes in %"PRId64"s - "
"%"PRId64" KiB/s", sys->stat.i_bytes,
sys->stat.i_read_time / CLOCK_FREQ, i_byterate / 1024);
break;
}
i_read = STREAM_CACHE_TRACK_SIZE - i_buffered;
i_read = __MIN((int)sys->i_read_size, i_read);
i_read = stream_Read(s->p_source, &tk->p_buffer[i_buffered], i_read);
if (i_read < 0)
continue;
else if (i_read == 0)
break; /* EOF */
if (first)
{
msg_Dbg(s, "received first data after %"PRId64" ms",
(mdate() - start) / 1000);
first = false;
}
tk->i_end += i_read;
sys->stat.i_read_count++;
}
}
/****************************************************************************
* AStreamControlReset:
****************************************************************************/
static void AStreamControlReset(stream_t *s)
{
stream_sys_t *sys = s->p_sys;
sys->i_pos = stream_Tell(s->p_source);
/* Setup our tracks */
sys->i_offset = 0;
sys->i_tk = 0;
sys->i_used = 0;
for (unsigned i = 0; i < STREAM_CACHE_TRACK; i++)
{
sys->tk[i].date = 0;
sys->tk[i].i_start = sys->i_pos;
sys->tk[i].i_end = sys->i_pos;
}
/* Do the prebuffering */
AStreamPrebufferStream(s);
}
static ssize_t AStreamReadNoSeekStream(stream_t *s, void *buf, size_t len)
{
stream_sys_t *sys = s->p_sys;
stream_track_t *tk = &sys->tk[sys->i_tk];
if (tk->i_start >= tk->i_end)
return 0; /* EOF */
#ifdef STREAM_DEBUG
msg_Dbg(s, "AStreamReadStream: %zd pos=%"PRId64" tk=%d start=%"PRId64
" offset=%d end=%"PRId64, len, sys->i_pos, p_sys->i_tk,
tk->i_start, sys->i_offset, tk->i_end);
#endif
unsigned i_off = (tk->i_start + sys->i_offset) % STREAM_CACHE_TRACK_SIZE;
size_t i_current = __MIN(tk->i_end - tk->i_start - sys->i_offset,
STREAM_CACHE_TRACK_SIZE - i_off);
ssize_t i_copy = __MIN(i_current, len);
if (i_copy <= 0)
return 0; /* EOF */
/* Copy data */
/* msg_Dbg(s, "AStreamReadStream: copy %zd", i_copy); */
if (buf != NULL)
memcpy(buf, &tk->p_buffer[i_off], i_copy);
sys->i_offset += i_copy;
/* Update pos now */
sys->i_pos += i_copy;
/* */
sys->i_used += i_copy;
if (tk->i_end + i_copy <= tk->i_start + sys->i_offset + len)
{
const size_t i_read_requested = VLC_CLIP(len - i_copy,
STREAM_READ_ATONCE / 2,
STREAM_READ_ATONCE * 10);
if (sys->i_used < i_read_requested)
sys->i_used = i_read_requested;
AStreamRefillStream(s);
}
return i_copy;
}
static int AStreamSeekStream(stream_t *s, uint64_t i_pos)
{
stream_sys_t *sys = s->p_sys;
stream_track_t *p_current = &sys->tk[sys->i_tk];
if (p_current->i_start >= p_current->i_end && i_pos >= p_current->i_end)
return 0; /* EOF */
#ifdef STREAM_DEBUG
msg_Dbg(s, "AStreamSeekStream: to %"PRId64" pos=%"PRId64
" tk=%d start=%"PRId64" offset=%d end=%"PRId64,
i_pos, sys->i_pos, sys->i_tk, p_current->i_start,
sys->i_offset, p_current->i_end);
#endif
bool b_aseek;
stream_Control(s->p_source, STREAM_CAN_SEEK, &b_aseek);
if (!b_aseek && i_pos < p_current->i_start)
{
msg_Warn(s, "AStreamSeekStream: can't seek");
return VLC_EGENERIC;
}
bool b_afastseek;
stream_Control(s->p_source, STREAM_CAN_FASTSEEK, &b_afastseek);
/* FIXME compute seek cost (instead of static 'stupid' value) */
uint64_t i_skip_threshold;
if (b_aseek)
i_skip_threshold = b_afastseek ? 128 : 3 * sys->i_read_size;
else
i_skip_threshold = INT64_MAX;
/* Date the current track */
p_current->date = mdate();
/* Search a new track slot */
stream_track_t *tk = NULL;
int i_tk_idx = -1;
/* Prefer the current track */
if (p_current->i_start <= i_pos && i_pos <= p_current->i_end + i_skip_threshold)
{
tk = p_current;
i_tk_idx = sys->i_tk;
}
if (!tk)
{
/* Try to maximize already read data */
for (int i = 0; i < STREAM_CACHE_TRACK; i++)
{
stream_track_t *t = &sys->tk[i];
if (t->i_start > i_pos || i_pos > t->i_end)
continue;
if (!tk || tk->i_end < t->i_end)
{
tk = t;
i_tk_idx = i;
}
}
}
if (!tk)
{
/* Use the oldest unused */
for (int i = 0; i < STREAM_CACHE_TRACK; i++)
{
stream_track_t *t = &sys->tk[i];
if (!tk || tk->date > t->date)
{
tk = t;
i_tk_idx = i;
}
}
}
assert(i_tk_idx >= 0 && i_tk_idx < STREAM_CACHE_TRACK);
if (tk != p_current)
i_skip_threshold = 0;
if (tk->i_start <= i_pos && i_pos <= tk->i_end + i_skip_threshold)
{
#ifdef STREAM_DEBUG
msg_Err(s, "AStreamSeekStream: reusing %d start=%"PRId64
" end=%"PRId64"(%s)",
i_tk_idx, tk->i_start, tk->i_end,
tk != p_current ? "seek" : i_pos > tk->i_end ? "skip" : "noseek");
#endif
if (tk != p_current)
{
assert(b_aseek);
/* Seek at the end of the buffer
* TODO it is stupid to seek now, it would be better to delay it
*/
if (stream_Seek(s->p_source, tk->i_end))
return VLC_EGENERIC;
}
else if (i_pos > tk->i_end)
{
uint64_t i_skip = i_pos - tk->i_end;
while (i_skip > 0)
{
const int i_read_max = __MIN(10 * STREAM_READ_ATONCE, i_skip);
if (AStreamReadNoSeekStream(s, NULL, i_read_max) != i_read_max)
return VLC_EGENERIC;
i_skip -= i_read_max;
}
}
}
else
{
#ifdef STREAM_DEBUG
msg_Err(s, "AStreamSeekStream: hard seek");
#endif
/* Nothing good, seek and choose oldest segment */
if (stream_Seek(s->p_source, i_pos))
return VLC_EGENERIC;
tk->i_start = i_pos;
tk->i_end = i_pos;
}
sys->i_offset = i_pos - tk->i_start;
sys->i_tk = i_tk_idx;
sys->i_pos = i_pos;
/* If there is not enough data left in the track, refill */
/* TODO How to get a correct value for
* - refilling threshold
* - how much to refill
*/
if (tk->i_end < tk->i_start + sys->i_offset + sys->i_read_size)
{
if (sys->i_used < STREAM_READ_ATONCE / 2)
sys->i_used = STREAM_READ_ATONCE / 2;
if (AStreamRefillStream(s) && i_pos >= tk->i_end)
return VLC_EGENERIC;
}
return VLC_SUCCESS;
}
static ssize_t AStreamReadStream(stream_t *s, void *p_read, size_t i_read)
{
stream_sys_t *sys = s->p_sys;
if (!p_read)
{
const uint64_t i_pos_wanted = sys->i_pos + i_read;
if (AStreamSeekStream(s, i_pos_wanted))
{
if (sys->i_pos != i_pos_wanted)
return 0;
}
return i_read;
}
return AStreamReadNoSeekStream(s, p_read, i_read);
}
/****************************************************************************
* AStreamControl:
****************************************************************************/
static int AStreamControl(stream_t *s, int i_query, va_list args)
{
stream_sys_t *sys = s->p_sys;
switch(i_query)
{
case STREAM_CAN_SEEK:
case STREAM_CAN_FASTSEEK:
case STREAM_CAN_PAUSE:
case STREAM_CAN_CONTROL_PACE:
case STREAM_IS_DIRECTORY:
case STREAM_GET_SIZE:
case STREAM_GET_PTS_DELAY:
case STREAM_GET_TITLE_INFO:
case STREAM_GET_TITLE:
case STREAM_GET_SEEKPOINT:
case STREAM_GET_META:
case STREAM_GET_CONTENT_TYPE:
case STREAM_GET_SIGNAL:
case STREAM_SET_PAUSE_STATE:
case STREAM_SET_PRIVATE_ID_STATE:
case STREAM_SET_PRIVATE_ID_CA:
case STREAM_GET_PRIVATE_ID_STATE:
return stream_vaControl(s->p_source, i_query, args);
case STREAM_GET_POSITION:
*va_arg(args, uint64_t *) = sys->i_pos;
break;
case STREAM_SET_POSITION:
{
uint64_t offset = va_arg(args, uint64_t);
return AStreamSeekStream(s, offset);
}
case STREAM_SET_TITLE:
case STREAM_SET_SEEKPOINT:
{
int ret = stream_vaControl(s->p_source, i_query, args);
if (ret == VLC_SUCCESS)
AStreamControlReset(s);
return ret;
}
case STREAM_SET_RECORD_STATE:
default:
msg_Err(s, "invalid stream_vaControl query=0x%x", i_query);
return VLC_EGENERIC;
}
return VLC_SUCCESS;
}
static int Open(vlc_object_t *obj)
{
stream_t *s = (stream_t *)obj;
stream_sys_t *sys = malloc(sizeof (*sys));
if (unlikely(sys == NULL))
return VLC_ENOMEM;
/* Common field */
sys->i_pos = stream_Tell(s->p_source);
/* Stats */
sys->stat.i_bytes = 0;
sys->stat.i_read_time = 0;
sys->stat.i_read_count = 0;
msg_Dbg(s, "Using stream method for AStream*");
/* Allocate/Setup our tracks */
sys->i_offset = 0;
sys->i_tk = 0;
sys->p_buffer = malloc(STREAM_CACHE_SIZE);
if (sys->p_buffer == NULL)
{
free(sys);
return VLC_ENOMEM;
}
sys->i_used = 0;
sys->i_read_size = STREAM_READ_ATONCE;
#if STREAM_READ_ATONCE < 256
# error "Invalid STREAM_READ_ATONCE value"
#endif
for (unsigned i = 0; i < STREAM_CACHE_TRACK; i++)
{
sys->tk[i].date = 0;
sys->tk[i].i_start = sys->i_pos;
sys->tk[i].i_end = sys->i_pos;
sys->tk[i].p_buffer = &sys->p_buffer[i * STREAM_CACHE_TRACK_SIZE];
}
s->p_sys = sys;
/* Do the prebuffering */
AStreamPrebufferStream(s);
if (sys->tk[sys->i_tk].i_end <= 0)
{
msg_Err(s, "cannot pre fill buffer");
free(sys->p_buffer);
free(sys);
return VLC_EGENERIC;
}
s->pf_read = AStreamReadStream;
s->pf_control = AStreamControl;
return VLC_SUCCESS;
}
/****************************************************************************
* AStreamDestroy:
****************************************************************************/
static void Close(vlc_object_t *obj)
{
stream_t *s = (stream_t *)obj;
stream_sys_t *sys = s->p_sys;
free(sys->p_buffer);
free(sys);
}
vlc_module_begin()
set_category(CAT_INPUT)
set_subcategory(SUBCAT_INPUT_STREAM_FILTER)
set_capability("stream_filter", 0)
set_description(N_("Byte stream cache"))
set_callbacks(Open, Close)
vlc_module_end()
...@@ -1033,6 +1033,7 @@ modules/services_discovery/upnp.cpp ...@@ -1033,6 +1033,7 @@ modules/services_discovery/upnp.cpp
modules/services_discovery/windrive.c modules/services_discovery/windrive.c
modules/services_discovery/xcb_apps.c modules/services_discovery/xcb_apps.c
modules/stream_filter/aribcam.c modules/stream_filter/aribcam.c
modules/stream_filter/cache_read.c
modules/stream_filter/decomp.c modules/stream_filter/decomp.c
modules/stream_filter/hds/hds.c modules/stream_filter/hds/hds.c
modules/stream_filter/record.c modules/stream_filter/record.c
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment