Commit c04ebc35 authored by Francois Cartegnie's avatar Francois Cartegnie

stream_filter: dash: change as demuxer

parent 33d200ee
......@@ -92,8 +92,6 @@ libdash_plugin_la_SOURCES = \
stream_filter/dash/xml/Node.h \
stream_filter/dash/dash.cpp \
stream_filter/dash/dash.hpp \
stream_filter/dash/DASHDownloader.cpp \
stream_filter/dash/DASHDownloader.h \
stream_filter/dash/DASHManager.cpp \
stream_filter/dash/DASHManager.h \
stream_filter/dash/Helper.cpp \
......
/*
* DASHDownloader.h
*****************************************************************************
* Copyright (C) 2010 - 2011 Klagenfurt University
*
* Created on: Aug 10, 2010
* Authors: Christopher Mueller <christopher.mueller@itec.uni-klu.ac.at>
* Christian Timmerer <christian.timmerer@itec.uni-klu.ac.at>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "DASHDownloader.h"
using namespace dash;
using namespace dash::http;
using namespace dash::logic;
using namespace dash::buffer;
DASHDownloader::DASHDownloader (HTTPConnectionManager *conManager_, BlockBuffer *buffer_)
{
conManager = conManager_;
buffer = buffer_;
}
DASHDownloader::~DASHDownloader ()
{
buffer->setEOF(true);
vlc_join(this->dashDLThread, NULL);
}
bool DASHDownloader::start ()
{
if(vlc_clone(&(this->dashDLThread), download, static_cast<void*>(this), VLC_THREAD_PRIORITY_LOW))
return false;
return true;
}
void* DASHDownloader::download(void *dashDownloader)
{
DASHDownloader *me = static_cast<DASHDownloader*>(dashDownloader);
int ret = 0;
do
{
block_t *block = NULL;
ret = me->conManager->read(Streams::VIDEO, &block);
if(ret > 0)
me->buffer->put(block);
}while(ret > 0 && !me->buffer->getEOF());
me->buffer->setEOF(true);
return NULL;
}
/*
* DASHDownloader.h
*****************************************************************************
* Copyright (C) 2010 - 2011 Klagenfurt University
*
* Created on: Aug 10, 2010
* Authors: Christopher Mueller <christopher.mueller@itec.uni-klu.ac.at>
* Christian Timmerer <christian.timmerer@itec.uni-klu.ac.at>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifndef DASHDOWNLOADER_H_
#define DASHDOWNLOADER_H_
#include "http/HTTPConnectionManager.h"
#include "adaptationlogic/IAdaptationLogic.h"
#include "buffer/BlockBuffer.h"
#define CHUNKDEFAULTBITRATE 1
#include <iostream>
namespace dash
{
class DASHDownloader
{
public:
DASHDownloader (http::HTTPConnectionManager *conManager, buffer::BlockBuffer *buffer);
virtual ~DASHDownloader ();
bool start ();
static void* download (void *);
private:
vlc_thread_t dashDLThread;
http::HTTPConnectionManager *conManager;
buffer::BlockBuffer *buffer;
};
}
#endif /* DASHDOWNLOADER_H_ */
......@@ -25,11 +25,13 @@
# include "config.h"
#endif
#define __STDC_CONSTANT_MACROS
#include "DASHManager.h"
#include "adaptationlogic/AdaptationLogicFactory.h"
using namespace dash;
using namespace dash::http;
using namespace dash::xml;
using namespace dash::logic;
using namespace dash::mpd;
using namespace dash::buffer;
......@@ -37,64 +39,103 @@ using namespace dash::buffer;
DASHManager::DASHManager ( MPD *mpd,
IAdaptationLogic::LogicType type, stream_t *stream) :
conManager ( NULL ),
adaptationLogic( NULL ),
logicType ( type ),
mpd ( mpd ),
stream ( stream ),
downloader ( NULL ),
buffer ( NULL )
stream ( stream )
{
for(int i=0; i<Streams::count; i++)
streams[i] = NULL;
}
DASHManager::~DASHManager ()
{
delete this->downloader;
delete this->buffer;
delete this->conManager;
delete this->adaptationLogic;
delete conManager;
for(int i=0; i<Streams::count; i++)
delete streams[i];
}
bool DASHManager::start()
bool DASHManager::start(demux_t *demux)
{
adaptationLogic = AdaptationLogicFactory::create( logicType, mpd );
if ( this->adaptationLogic == NULL )
const Period *period = mpd->getFirstPeriod();
if(!period)
return false;
this->conManager = new dash::http::HTTPConnectionManager(this->adaptationLogic, this->stream);
this->buffer = new BlockBuffer(this->stream);
this->downloader = new DASHDownloader(this->conManager, this->buffer);
for(int i=0; i<Streams::count; i++)
{
Streams::Type type = static_cast<Streams::Type>(i);
const AdaptationSet *set = period->getAdaptationSet(type);
if(set)
{
streams[type] = new Streams::Stream(type);
try
{
streams[type]->init(demux, AdaptationLogicFactory::create( logicType, mpd ) );
} catch (int) {
delete streams[type];
streams[type] = NULL;
}
}
}
this->buffer->attach(this->adaptationLogic);
conManager = new HTTPConnectionManager(stream);
if(!conManager)
return false;
return this->downloader->start();
return true;
}
size_t DASHManager::read()
{
size_t i_ret = 0;
for(int type=0; type<Streams::count; type++)
{
if(!streams[type])
continue;
i_ret += streams[type]->read(conManager);
}
return i_ret;
}
int DASHManager::read( void *p_buffer, size_t len )
mtime_t DASHManager::getPCR() const
{
return this->buffer->get(p_buffer, len);
mtime_t pcr = VLC_TS_INVALID;
for(int type=0; type<Streams::count; type++)
{
if(!streams[type])
continue;
if(pcr == VLC_TS_INVALID || pcr > streams[type]->getPCR())
pcr = streams[type]->getPCR();
}
return pcr;
}
int DASHManager::seekBackwards( unsigned i_len )
int DASHManager::getGroup() const
{
return this->buffer->seekBackwards( i_len );
for(int type=0; type<Streams::count; type++)
{
if(!streams[type])
continue;
return streams[type]->getGroup();
}
return -1;
}
int DASHManager::peek( const uint8_t **pp_peek, size_t i_peek )
int DASHManager::esCount() const
{
return this->buffer->peek(pp_peek, i_peek);
int es = 0;
for(int type=0; type<Streams::count; type++)
{
if(!streams[type])
continue;
es += streams[type]->esCount();
}
return es;
}
mtime_t DASHManager::getDuration() const
{
if (mpd->isLive())
{
return 0;
}
else
{
const Representation *rep = adaptationLogic->getCurrentRepresentation(Streams::VIDEO);
if ( !rep )
return 0;
else
return mpd->getDuration() * rep->getBandwidth() / 8;
}
return CLOCK_FREQ * mpd->getDuration();
}
......@@ -26,11 +26,7 @@
#define DASHMANAGER_H_
#include "http/HTTPConnectionManager.h"
#include "xml/Node.h"
#include "adaptationlogic/IAdaptationLogic.h"
#include "adaptationlogic/AdaptationLogicFactory.h"
#include "buffer/BlockBuffer.h"
#include "DASHDownloader.h"
#include "mpd/MPD.h"
namespace dash
......@@ -42,22 +38,21 @@ namespace dash
logic::IAdaptationLogic::LogicType type, stream_t *stream);
virtual ~DASHManager ();
bool start ();
int read ( void *p_buffer, size_t len );
int peek ( const uint8_t **pp_peek, size_t i_peek );
int seekBackwards ( unsigned len );
bool start (demux_t *);
size_t read();
mtime_t getDuration() const;
mtime_t getPCR() const;
int getGroup() const;
int esCount() const;
private:
http::HTTPConnectionManager *conManager;
logic::IAdaptationLogic *adaptationLogic;
logic::IAdaptationLogic::LogicType logicType;
mpd::MPD *mpd;
stream_t *stream;
DASHDownloader *downloader;
buffer::BlockBuffer *buffer;
Streams::Stream *streams[Streams::count];
};
}
#endif /* DASHMANAGER_H_ */
......@@ -17,18 +17,40 @@
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#define __STDC_CONSTANT_MACROS
#include "Streams.hpp"
#include "adaptationlogic/IAdaptationLogic.h"
#include "adaptationlogic/AdaptationLogicFactory.h"
#include <vlc_stream.h>
#include <vlc_demux.h>
using namespace dash::Streams;
using namespace dash::http;
using namespace dash::logic;
Stream::Stream(const std::string &mime)
{
type = mimeToType(mime);
init(mimeToType(mime));
}
Stream::Stream(const Type type)
{
this->type = type;
init(type);
}
void Stream::init(const Type type_)
{
type = type_;
output = NULL;
currentChunk = NULL;
eof = false;
}
Stream::~Stream()
{
delete currentChunk;
delete adaptationLogic;
delete output;
}
Type Stream::mimeToType(const std::string &mime)
......@@ -45,7 +67,214 @@ Type Stream::mimeToType(const std::string &mime)
return mimetype;
}
void Stream::init(demux_t *demux, IAdaptationLogic *logic)
{
output = new Streams::MP4StreamOutput(demux);
adaptationLogic = logic;
}
bool Stream::isEOF() const
{
return false;
}
mtime_t Stream::getPCR() const
{
return output->getPCR();
}
int Stream::getGroup() const
{
return output->getGroup();
}
int Stream::esCount() const
{
return output->esCount();
}
bool Stream::operator ==(const Stream &stream) const
{
return stream.type == type;
}
Chunk * Stream::getChunk()
{
if (currentChunk == NULL)
{
currentChunk = adaptationLogic->getNextChunk(type);
if (currentChunk == NULL)
eof = true;
}
return currentChunk;
}
size_t Stream::read(HTTPConnectionManager *connManager)
{
Chunk *chunk = getChunk();
if(!chunk)
return 0;
if(!chunk->getConnection())
{
if(!connManager->connectChunk(chunk))
return 0;
}
size_t readsize = 0;
/* Because we don't know Chunk size at start, we need to get size
from content length */
if(chunk->getBytesRead() == 0)
{
if(chunk->getConnection()->query(chunk->getPath()) == false)
readsize = 32768; /* we don't handle retry here :/ */
else
readsize = chunk->getBytesToRead();
}
else
{
readsize = chunk->getBytesToRead();
}
if (readsize > 128000)
readsize = 32768;
block_t *block = block_Alloc(readsize);
if(!block)
return 0;
mtime_t time = mdate();
ssize_t ret = chunk->getConnection()->read(block->p_buffer, readsize);
time = mdate() - time;
if(ret <= 0)
{
block_Release(block);
chunk->getConnection()->releaseChunk();
currentChunk = NULL;
delete chunk;
return 0;
}
else
{
block->i_buffer = (size_t)ret;
adaptationLogic->updateDownloadRate(block->i_buffer, time);
if (chunk->getBytesToRead() == 0)
{
chunk->onDownload(block->p_buffer, block->i_buffer);
chunk->getConnection()->releaseChunk();
currentChunk = NULL;
delete chunk;
}
}
readsize = block->i_buffer;
output->pushBlock(block);
return readsize;
}
AbstractStreamOutput::AbstractStreamOutput(demux_t *demux)
{
realdemux = demux;
demuxstream = NULL;
pcr = VLC_TS_0;
group = -1;
escount = 0;
fakeesout = new es_out_t;
if (!fakeesout)
throw VLC_ENOMEM;
fakeesout->pf_add = esOutAdd;
fakeesout->pf_control = esOutControl;
fakeesout->pf_del = esOutDel;
fakeesout->pf_destroy = esOutDestroy;
fakeesout->pf_send = esOutSend;
fakeesout->p_sys = (es_out_sys_t*) this;
}
AbstractStreamOutput::~AbstractStreamOutput()
{
if (demuxstream)
stream_Delete(demuxstream);
delete fakeesout;
}
mtime_t AbstractStreamOutput::getPCR() const
{
return pcr;
}
int AbstractStreamOutput::getGroup() const
{
return group;
}
int AbstractStreamOutput::esCount() const
{
return escount;
}
void AbstractStreamOutput::pushBlock(block_t *block)
{
stream_DemuxSend(demuxstream, block);
}
/* Static callbacks */
es_out_id_t * AbstractStreamOutput::esOutAdd(es_out_t *fakees, const es_format_t *p_fmt)
{
AbstractStreamOutput *me = (AbstractStreamOutput *) fakees->p_sys;
me->escount++;
return me->realdemux->out->pf_add(me->realdemux->out, p_fmt);
}
int AbstractStreamOutput::esOutSend(es_out_t *fakees, es_out_id_t *p_es, block_t *p_block)
{
AbstractStreamOutput *me = (AbstractStreamOutput *) fakees->p_sys;
return me->realdemux->out->pf_send(me->realdemux->out, p_es, p_block);
}
void AbstractStreamOutput::esOutDel(es_out_t *fakees, es_out_id_t *p_es)
{
AbstractStreamOutput *me = (AbstractStreamOutput *) fakees->p_sys;
me->escount--;
me->realdemux->out->pf_del(me->realdemux->out, p_es);
}
int AbstractStreamOutput::esOutControl(es_out_t *fakees, int i_query, va_list args)
{
AbstractStreamOutput *me = (AbstractStreamOutput *) fakees->p_sys;
if (i_query == ES_OUT_SET_PCR )
{
me->pcr = (int64_t)va_arg( args, int64_t );
return VLC_SUCCESS;
}
else if( i_query == ES_OUT_SET_GROUP_PCR )
{
me->group = (int) va_arg( args, int );
me->pcr = (int64_t)va_arg( args, int64_t );
return VLC_SUCCESS;
}
return me->realdemux->out->pf_control(me->realdemux->out, i_query, args);
}
void AbstractStreamOutput::esOutDestroy(es_out_t *fakees)
{
AbstractStreamOutput *me = (AbstractStreamOutput *) fakees->p_sys;
me->realdemux->out->pf_destroy(me->realdemux->out);
}
/* !Static callbacks */
MP4StreamOutput::MP4StreamOutput(demux_t *demux) :
AbstractStreamOutput(demux)
{
demuxstream = stream_DemuxNew(demux, "mp4", fakeesout);
if(!demuxstream)
throw VLC_EGENERIC;
}
......@@ -20,8 +20,16 @@
#ifndef STREAM_HPP
#define STREAM_HPP
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <string>
#include <vlc_common.h>
#include "StreamsType.hpp"
#include "adaptationlogic/IAdaptationLogic.h"
#include "http/HTTPConnectionManager.h"
#include "http/Chunk.h"
namespace dash
{
......@@ -34,13 +42,59 @@ namespace dash
public:
Stream(const std::string &mime);
Stream(const Type);
~Stream();
bool operator==(const Stream &) const;
static Type mimeToType(const std::string &mime);
void init(demux_t *, logic::IAdaptationLogic *);
bool isEOF() const;
mtime_t getPCR() const;
int getGroup() const;
int esCount() const;
size_t read(http::HTTPConnectionManager *);
private:
http::Chunk *getChunk();
void init(const Type);
Type type;
AbstractStreamOutput *output;
logic::IAdaptationLogic *adaptationLogic;
http::Chunk *currentChunk;
bool eof;
};
class AbstractStreamOutput
{
public:
AbstractStreamOutput(demux_t *);
virtual ~AbstractStreamOutput();
virtual void pushBlock(block_t *);
mtime_t getPCR() const;
int getGroup() const;
int esCount() const;
protected:
mtime_t pcr;
int group;
int escount;
es_out_t *fakeesout; /* to intercept/proxy what is sent from demuxstream */
stream_t *demuxstream;
private:
demux_t *realdemux;
static es_out_id_t *esOutAdd(es_out_t *, const es_format_t *);
static int esOutSend(es_out_t *, es_out_id_t *, block_t *);
static void esOutDel(es_out_t *, es_out_id_t *);
static int esOutControl(es_out_t *, int, va_list);
static void esOutDestroy(es_out_t *);
};
class MP4StreamOutput : public AbstractStreamOutput
{
public:
MP4StreamOutput(demux_t *);
virtual ~MP4StreamOutput(){}
};
}
}
#endif // STREAMS_HPP
......@@ -35,6 +35,7 @@ AbstractAdaptationLogic::AbstractAdaptationLogic (MPD *mpd_) :
mpd (mpd_),
currentPeriod (mpd->getFirstPeriod()),
count (0),
prevRepresentation (NULL),
bufferedMicroSec (0),
bufferedPercent (0)
{
......@@ -53,9 +54,15 @@ Chunk* AbstractAdaptationLogic::getNextChunk(Streams::Type type)
if ( rep == NULL )
return NULL;
bool reinit = count && (rep != prevRepresentation);
prevRepresentation = rep;
std::vector<ISegment *> segments = rep->getSegments();
ISegment *first = segments.empty() ? NULL : segments.front();
if (reinit && first && first->getClassId() == InitSegment::CLASSID_INITSEGMENT)
return first->toChunk(count, rep);
bool b_templated = (first && !first->isSingleShot());
if (count == segments.size() && !b_templated)
......
......@@ -54,6 +54,7 @@ namespace dash
dash::mpd::MPD *mpd;
dash::mpd::Period *currentPeriod;
size_t count;
const mpd::Representation *prevRepresentation;
private:
mtime_t bufferedMicroSec;
......
......@@ -29,7 +29,7 @@
#include <adaptationlogic/IDownloadRateObserver.h>
#include "mpd/Representation.h"
#include "buffer/IBufferObserver.h"
#include "Streams.hpp"
#include "StreamsType.hpp"
namespace dash
{
......
......@@ -29,170 +29,61 @@
using namespace dash::buffer;
BlockBuffer::BlockBuffer (stream_t *stream) :
sizeMicroSec (0),
BlockBuffer::BlockBuffer () :
sizeBytes (0),
stream (stream),
isEOF (false)
{
this->capacityMicroSec = var_InheritInteger(stream, "dash-buffersize") * 1000000;
if(this->capacityMicroSec <= 0)
this->capacityMicroSec = DEFAULTBUFFERLENGTH;
this->peekBlock = block_Alloc(INTIALPEEKSIZE);
block_BytestreamInit(&this->buffer);
vlc_mutex_init(&this->monitorMutex);
vlc_cond_init(&this->empty);
vlc_cond_init(&this->full);
fifo = block_FifoNew();
if(!fifo)
throw VLC_ENOMEM;
}
BlockBuffer::~BlockBuffer ()
{
block_Release(this->peekBlock);
block_BytestreamRelease(&this->buffer);
vlc_mutex_destroy(&this->monitorMutex);
vlc_cond_destroy(&this->empty);
vlc_cond_destroy(&this->full);
block_FifoRelease(fifo);
}
int BlockBuffer::peek (const uint8_t **pp_peek, unsigned int len)
int BlockBuffer::peek(const uint8_t **pp_peek, unsigned int len)
{
vlc_mutex_lock(&this->monitorMutex);
while(this->sizeBytes == 0 && !this->isEOF)
vlc_cond_wait(&this->full, &this->monitorMutex);
if(this->sizeBytes == 0)
{
vlc_cond_signal(&this->empty);
vlc_mutex_unlock(&this->monitorMutex);
block_t *p_block = block_FifoShow(fifo);
if(!p_block)
return 0;
}
size_t ret = len > this->sizeBytes ? this->sizeBytes : len;
if(ret > this->peekBlock->i_buffer)
this->peekBlock = block_Realloc(this->peekBlock, 0, ret);
block_PeekBytes(&this->buffer, this->peekBlock->p_buffer, ret);
*pp_peek = this->peekBlock->p_buffer;
vlc_mutex_unlock(&this->monitorMutex);
return ret;
*pp_peek = p_block->p_buffer;
return __MIN(len, p_block->i_buffer);
}
int BlockBuffer::seekBackwards (unsigned len)
block_t * BlockBuffer::get()
{
vlc_mutex_lock(&this->monitorMutex);
if( this->buffer.i_offset > len )
{
this->buffer.i_offset -= len;
this->sizeBytes += len;
vlc_mutex_unlock(&this->monitorMutex);
return VLC_SUCCESS;
}
vlc_mutex_unlock(&this->monitorMutex);
return VLC_EGENERIC;
block_t *p_block = block_FifoGet(fifo);
if(p_block)
notify();
return p_block;
}
int BlockBuffer::get (void *p_data, unsigned int len)
void BlockBuffer::put(block_t *block)
{
vlc_mutex_lock(&this->monitorMutex);
while(this->sizeBytes == 0 && !this->isEOF)
vlc_cond_wait(&this->full, &this->monitorMutex);
if(this->sizeBytes == 0)
{
vlc_cond_signal(&this->empty);
vlc_mutex_unlock(&this->monitorMutex);
return 0;
}
int ret = len > this->sizeBytes ? this->sizeBytes : len;
if(p_data == NULL)
block_SkipBytes(&this->buffer, ret);
else
block_GetBytes(&this->buffer, (uint8_t *)p_data, ret);
block_BytestreamFlush(&this->buffer);
this->updateBufferSize(ret);
this->notify();
vlc_cond_signal(&this->empty);
vlc_mutex_unlock(&this->monitorMutex);
return ret;
block_FifoPut(fifo, block);
notify();
}
void BlockBuffer::put (block_t *block)
{
vlc_mutex_lock(&this->monitorMutex);
while(this->sizeMicroSec >= this->capacityMicroSec && !this->isEOF)
vlc_cond_wait(&this->empty, &this->monitorMutex);
if(this->isEOF)
{
vlc_cond_signal(&this->full);
vlc_mutex_unlock(&this->monitorMutex);
return;
}
this->sizeMicroSec += block->i_length;
this->sizeBytes += block->i_buffer;
block_BytestreamPush(&this->buffer, block);
this->notify();
vlc_cond_signal(&this->full);
vlc_mutex_unlock(&this->monitorMutex);
}
void BlockBuffer::setEOF (bool value)
void BlockBuffer::setEOF(bool value)
{
vlc_mutex_lock(&this->monitorMutex);
this->isEOF = value;
vlc_cond_signal(&this->empty);
vlc_cond_signal(&this->full);
vlc_mutex_unlock(&this->monitorMutex);
isEOF = value;
block_FifoWake(fifo);
}
bool BlockBuffer::getEOF ()
{
vlc_mutex_locker lock(&this->monitorMutex);
return this->isEOF;
bool BlockBuffer::getEOF()
{
return isEOF;
}
void BlockBuffer::attach (IBufferObserver *observer)
{
this->bufferObservers.push_back(observer);
}
void BlockBuffer::notify ()
{
for(size_t i = 0; i < this->bufferObservers.size(); i++)
this->bufferObservers.at(i)->bufferLevelChanged(this->sizeMicroSec, ((float)this->sizeMicroSec / this->capacityMicroSec) * 100);
}
void BlockBuffer::updateBufferSize (size_t bytes)
{
block_t *block = this->buffer.p_block;
this->sizeMicroSec = 0;
while(block)
{
this->sizeMicroSec += block->i_length;
block = block->p_next;
}
this->sizeBytes -= bytes;
}
mtime_t BlockBuffer::size ()
{
vlc_mutex_lock(&this->monitorMutex);
mtime_t ret = this->sizeMicroSec;
vlc_mutex_unlock(&this->monitorMutex);
return ret;
// for(size_t i = 0; i < this->bufferObservers.size(); i++)
// this->bufferObservers.at(i)->bufferLevelChanged(this->sizeMicroSec, ((float)this->sizeMicroSec / this->capacityMicroSec) * 100);
}
......@@ -28,11 +28,8 @@
#include "buffer/IBufferObserver.h"
#include <vlc_stream.h>
#include <vlc_block_helper.h>
#include <vector>
#include <iostream>
#define DEFAULTBUFFERLENGTH 30000000
#define INTIALPEEKSIZE 32768
......@@ -43,34 +40,23 @@ namespace dash
class BlockBuffer
{
public:
BlockBuffer (stream_t *stream);
BlockBuffer ();
virtual ~BlockBuffer ();
void put (block_t *block);
int get (void *p_data, unsigned int len);
block_t *get ();
int peek (const uint8_t **pp_peek, unsigned int i_peek);
int seekBackwards (unsigned len);
void setEOF (bool value);
bool getEOF ();
mtime_t size ();
void attach (IBufferObserver *observer);
void notify ();
private:
mtime_t capacityMicroSec;
mtime_t sizeMicroSec;
size_t sizeBytes;
vlc_mutex_t monitorMutex;
vlc_cond_t empty;
vlc_cond_t full;
stream_t *stream;
bool isEOF;
block_bytestream_t buffer;
block_t *peekBlock;
block_fifo_t *fifo;
std::vector<IBufferObserver *> bufferObservers;
void updateBufferSize(size_t bytes);
};
}
}
......
This diff is collapsed.
......@@ -19,10 +19,8 @@
*****************************************************************************/
#include "DASHManager.h"
struct stream_sys_t
struct demux_sys_t
{
dash::DASHManager *p_dashManager;
dash::mpd::MPD *p_mpd;
uint64_t position;
char *psz_useragent;
};
......@@ -26,9 +26,7 @@
#endif
#include "HTTPConnectionManager.h"
#include "mpd/Segment.h"
#include <vlc_block.h>
#include "http/Chunk.h"
#include <vlc_stream.h>
......@@ -36,11 +34,8 @@ using namespace dash::http;
const uint64_t HTTPConnectionManager::CHUNKDEFAULTBITRATE = 1;
HTTPConnectionManager::HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream) :
currentChunk (NULL),
adaptationLogic (adaptationLogic),
stream (stream),
chunkCount (0)
HTTPConnectionManager::HTTPConnectionManager (stream_t *stream) :
stream (stream)
{
}
HTTPConnectionManager::~HTTPConnectionManager ()
......@@ -50,74 +45,15 @@ HTTPConnectionManager::~HTTPConnectionManager ()
void HTTPConnectionManager::closeAllConnections ()
{
releaseAllConnections();
vlc_delete_all(this->connectionPool);
for(int i=0; i<Streams::count; i++)
vlc_delete_all(downloadQueue[i]);
delete currentChunk;
}
ssize_t HTTPConnectionManager::read(Streams::Type type, block_t **pp_block)
void HTTPConnectionManager::releaseAllConnections()
{
Chunk *chunk;
if(downloadQueue[type].empty())
{
chunk = adaptationLogic->getNextChunk(type);
if(!connectChunk(chunk))
return -1;
else
downloadQueue[type].push_back(chunk);
}
chunk = downloadQueue[type].front();
if(chunk->getBytesRead() == 0)
{
if (!chunk->getConnection()->query(chunk->getPath()))
return -1;
}
/* chunk length should be set at connect/query reply time */
size_t readsize = chunk->getBytesToRead();
if (readsize > 128000)
readsize = 32768;
block_t *block = block_Alloc(readsize);
if(!block)
return -1;
mtime_t time = mdate();
ssize_t ret = chunk->getConnection()->read(block->p_buffer, readsize);
time = mdate() - time;
block->i_length = (mtime_t)((ret * 8) / ((float)chunk->getBitrate() / CLOCK_FREQ));
if(ret <= 0)
{
block_Release(block);
*pp_block = NULL;
delete(chunk);
downloadQueue[type].pop_front();
return read(type, pp_block);
}
else
{
block->i_buffer = ret;
adaptationLogic->updateDownloadRate(block->i_buffer, time);
if (chunk->getBytesToRead() == 0)
{
chunk->onDownload(block->p_buffer, block->i_buffer);
delete chunk;
downloadQueue[type].pop_front();
}
}
*pp_block = block;
return ret;
std::vector<PersistentConnection *>::iterator it;
for(it = connectionPool.begin(); it != connectionPool.end(); it++)
(*it)->releaseChunk();
}
PersistentConnection * HTTPConnectionManager::getConnectionForHost(const std::string &hostname)
......@@ -125,7 +61,7 @@ PersistentConnection * HTTPConnectionManager::getConnectionForHost(const std::st
std::vector<PersistentConnection *>::const_iterator it;
for(it = connectionPool.begin(); it != connectionPool.end(); it++)
{
if(!(*it)->getHostname().compare(hostname))
if(!(*it)->getHostname().compare(hostname) && (*it)->isAvailable())
return *it;
}
return NULL;
......@@ -135,6 +71,8 @@ bool HTTPConnectionManager::connectChunk(Chunk *chunk)
{
if(chunk == NULL)
return false;
if(chunk->getConnection())
return true;
msg_Dbg(stream, "Retrieving %s", chunk->getUrl().c_str());
......@@ -144,15 +82,13 @@ bool HTTPConnectionManager::connectChunk(Chunk *chunk)
conn = new PersistentConnection(stream, chunk);
if(!conn)
return false;
connectionPool.push_back(conn);
if (!chunk->getConnection()->connect(chunk->getHostname(), chunk->getPort()))
return false;
connectionPool.push_back(conn);
}
conn->bindChunk(chunk);
chunkCount++;
if(chunk->getBitrate() <= 0)
chunk->setBitrate(HTTPConnectionManager::CHUNKDEFAULTBITRATE);
......
......@@ -30,19 +30,10 @@
#endif
#include "http/PersistentConnection.h"
#include "adaptationlogic/IDownloadRateObserver.h"
#include <vlc_common.h>
#include <string>
#include <vector>
#include <deque>
#include <iostream>
#include <ctime>
#include <limits.h>
#include "http/PersistentConnection.h"
#include "adaptationlogic/IAdaptationLogic.h"
#include "Streams.hpp"
namespace dash
{
......@@ -51,25 +42,20 @@ namespace dash
class HTTPConnectionManager
{
public:
HTTPConnectionManager (logic::IAdaptationLogic *adaptationLogic, stream_t *stream);
HTTPConnectionManager (stream_t *stream);
virtual ~HTTPConnectionManager ();
void closeAllConnections ();
ssize_t read (Streams::Type, block_t **);
private:
std::deque<Chunk *> downloadQueue[Streams::count];
void releaseAllConnections ();
bool connectChunk (Chunk *chunk);
private:
Chunk *currentChunk;
std::vector<PersistentConnection *> connectionPool;
logic::IAdaptationLogic *adaptationLogic;
stream_t *stream;
int chunkCount;
static const uint64_t CHUNKDEFAULTBITRATE;
bool connectChunk (Chunk *chunk);
PersistentConnection * getConnectionForHost (const std::string &hostname);
};
}
......
......@@ -34,11 +34,13 @@ IHTTPConnection::IHTTPConnection(stream_t *stream_)
{
stream = stream_;
httpSocket = -1;
psz_useragent = var_InheritString(stream, "http-user-agent");
}
IHTTPConnection::~IHTTPConnection()
{
disconnect();
free(psz_useragent);
}
bool IHTTPConnection::connect(const std::string &hostname, int port)
......@@ -166,7 +168,7 @@ std::string IHTTPConnection::buildRequestHeader(const std::string &path) const
std::stringstream req;
req << "GET " << path << " HTTP/1.1\r\n" <<
"Host: " << hostname << "\r\n" <<
"User-Agent: " << std::string(stream->p_sys->psz_useragent) << "\r\n";
"User-Agent: " << std::string(psz_useragent) << "\r\n";
req << extraRequestHeaders();
return req.str();
}
......@@ -60,6 +60,7 @@ namespace dash
bool parseReply();
std::string readLine();
std::string hostname;
char * psz_useragent;
stream_t *stream;
private:
......
......@@ -28,6 +28,8 @@
#include "MPD.h"
#include "Helper.h"
#include "dash.hpp"
#include <vlc_common.h>
#include <vlc_stream.h>
using namespace dash::mpd;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment