Commit 9ec2d132 authored by Francois Cartegnie's avatar Francois Cartegnie

demux: adaptative: fix/update chunk reading and buffering

(dash monosegment was broken)
need to read large chunks for parsing atoms
parent d7020419
...@@ -38,6 +38,7 @@ using namespace adaptative::http; ...@@ -38,6 +38,7 @@ using namespace adaptative::http;
AbstractChunkSource::AbstractChunkSource() AbstractChunkSource::AbstractChunkSource()
{ {
parentChunk = NULL; parentChunk = NULL;
contentLength = 0;
} }
AbstractChunkSource::~AbstractChunkSource() AbstractChunkSource::~AbstractChunkSource()
...@@ -50,10 +51,25 @@ void AbstractChunkSource::setParentChunk(AbstractChunk *parent) ...@@ -50,10 +51,25 @@ void AbstractChunkSource::setParentChunk(AbstractChunk *parent)
parentChunk = parent; parentChunk = parent;
} }
size_t AbstractChunkSource::getContentLength() const
{
return contentLength;
}
void AbstractChunkSource::setBytesRange(const BytesRange &range)
{
bytesRange = range;
if(bytesRange.isValid() && bytesRange.getEndByte())
contentLength = bytesRange.getEndByte() - bytesRange.getStartByte();
}
const BytesRange & AbstractChunkSource::getBytesRange() const
{
return bytesRange;
}
AbstractChunk::AbstractChunk(AbstractChunkSource *source_) AbstractChunk::AbstractChunk(AbstractChunkSource *source_)
{ {
length = 0;
bytesRead = 0; bytesRead = 0;
source = source_; source = source_;
source->setParentChunk(this); source->setParentChunk(this);
...@@ -64,52 +80,30 @@ AbstractChunk::~AbstractChunk() ...@@ -64,52 +80,30 @@ AbstractChunk::~AbstractChunk()
delete source; delete source;
} }
void AbstractChunk::setLength(size_t length)
{
this->length = length;
}
size_t AbstractChunk::getBytesRead() const size_t AbstractChunk::getBytesRead() const
{ {
return this->bytesRead; return this->bytesRead;
} }
void AbstractChunk::setBytesRead(size_t bytes)
{
this->bytesRead = bytes;
}
size_t AbstractChunk::getBytesToRead() const size_t AbstractChunk::getBytesToRead() const
{ {
return length - bytesRead; return source->getContentLength() - bytesRead;
} }
void AbstractChunk::setBytesRange(const BytesRange &range) block_t * AbstractChunk::read(size_t size, mtime_t *time)
{
bytesRange = range;
if(bytesRange.isValid() && bytesRange.getEndByte())
setLength(bytesRange.getEndByte() - bytesRange.getStartByte());
}
const BytesRange & AbstractChunk::getBytesRange() const
{
return bytesRange;
}
block_t * AbstractChunk::read(size_t sizehint, mtime_t *time)
{ {
if(!source) if(!source)
return NULL; return NULL;
*time = mdate(); *time = mdate();
block_t *block = source->read(sizehint); block_t *block = source->read(size);
*time = mdate() - *time; *time = mdate() - *time;
if(block) if(block)
{ {
if(getBytesRead() == 0) if(bytesRead == 0)
block->i_flags |= BLOCK_FLAG_HEADER; block->i_flags |= BLOCK_FLAG_HEADER;
setBytesRead(getBytesRead() + block->i_buffer); bytesRead += block->i_buffer;
onDownload(&block); onDownload(&block);
block->i_flags ^= BLOCK_FLAG_HEADER; block->i_flags ^= BLOCK_FLAG_HEADER;
} }
...@@ -119,9 +113,10 @@ block_t * AbstractChunk::read(size_t sizehint, mtime_t *time) ...@@ -119,9 +113,10 @@ block_t * AbstractChunk::read(size_t sizehint, mtime_t *time)
HTTPChunkSource::HTTPChunkSource(const std::string& url, HTTPConnectionManager *manager) : HTTPChunkSource::HTTPChunkSource(const std::string& url, HTTPConnectionManager *manager) :
AbstractChunkSource(), AbstractChunkSource(),
port (0),
connection (NULL), connection (NULL),
connManager (manager) connManager (manager),
consumed (0),
port (0)
{ {
if(!init(url)) if(!init(url))
throw VLC_EGENERIC; throw VLC_EGENERIC;
...@@ -169,7 +164,31 @@ bool HTTPChunkSource::init(const std::string &url) ...@@ -169,7 +164,31 @@ bool HTTPChunkSource::init(const std::string &url)
return true; return true;
} }
block_t * HTTPChunkSource::read(size_t maxread) block_t * HTTPChunkSource::consume(size_t readsize)
{
if(contentLength && readsize > contentLength - consumed)
readsize = contentLength - consumed;
block_t *p_block = block_Alloc(readsize);
if(!p_block)
return NULL;
ssize_t ret = connection->read(p_block->p_buffer, readsize);
if(ret < 0)
{
block_Release(p_block);
p_block = NULL;
}
else
{
p_block->i_buffer = (size_t) ret;
consumed += p_block->i_buffer;
}
return p_block;
}
block_t * HTTPChunkSource::read(size_t readsize)
{ {
if(!connManager || !parentChunk) if(!connManager || !parentChunk)
return NULL; return NULL;
...@@ -181,39 +200,93 @@ block_t * HTTPChunkSource::read(size_t maxread) ...@@ -181,39 +200,93 @@ block_t * HTTPChunkSource::read(size_t maxread)
return NULL; return NULL;
} }
if(parentChunk->getBytesRead() == 0) if(consumed == 0)
{ {
if( connection->query(path, parentChunk->getBytesRange()) != VLC_SUCCESS ) if( connection->query(path, bytesRange) != VLC_SUCCESS )
return NULL; return NULL;
parentChunk->setLength(connection->getContentLength()); /* Because we don't know Chunk size at start, we need to get size
from content length */
contentLength = connection->getContentLength();
} }
if( parentChunk->getBytesToRead() == 0 ) if(consumed == contentLength && consumed > 0)
return NULL; return NULL;
/* Because we don't know Chunk size at start, we need to get size return consume(readsize);
from content length */ }
size_t readsize = parentChunk->getBytesToRead();
if (readsize > maxread)
readsize = maxread;
block_t *block = block_Alloc(readsize); HTTPChunkBufferedSource::HTTPChunkBufferedSource(const std::string& url, HTTPConnectionManager *manager) :
if(!block) HTTPChunkSource(url, manager),
return NULL; p_buffer (NULL),
buffered (0)
{
}
ssize_t ret = connection->read(block->p_buffer, readsize); HTTPChunkBufferedSource::~HTTPChunkBufferedSource()
{
if(p_buffer)
block_ChainRelease(p_buffer);
}
void HTTPChunkBufferedSource::bufferize(size_t readsize)
{
if(readsize < 32768)
readsize = 32768;
if(contentLength && readsize > contentLength - consumed)
readsize = contentLength - consumed;
block_t *p_block = block_Alloc(readsize);
if(!p_block)
return;
ssize_t ret = connection->read(p_block->p_buffer, readsize);
if(ret < 0) if(ret < 0)
{ {
block_Release(block); block_Release(p_block);
}
else
{
p_block->i_buffer = (size_t) ret;
buffered += p_block->i_buffer;
block_ChainAppend(&p_buffer, p_block);
}
}
block_t * HTTPChunkBufferedSource::consume(size_t readsize)
{
if(readsize > buffered)
bufferize(readsize);
block_t *p_block = NULL;
if(!readsize || !buffered || !(p_block = block_Alloc(readsize)) )
return NULL; return NULL;
size_t copied = 0;
while(buffered && readsize)
{
const size_t toconsume = std::min(p_buffer->i_buffer, readsize);
memcpy(&p_block->p_buffer[copied], p_buffer->p_buffer, toconsume);
copied += toconsume;
readsize -= toconsume;
buffered -= toconsume;
p_buffer->i_buffer -= toconsume;
p_buffer->p_buffer += toconsume;
if(p_buffer->i_buffer == 0)
{
block_t *next = p_buffer->p_next;
p_buffer->p_next = NULL;
block_Release(p_buffer);
p_buffer = next;
}
} }
block->i_buffer = (size_t)ret; consumed += copied;
p_block->i_buffer = copied;
return block; return p_block;
} }
HTTPChunk::HTTPChunk(const std::string &url, HTTPConnectionManager *manager): HTTPChunk::HTTPChunk(const std::string &url, HTTPConnectionManager *manager):
AbstractChunk(new HTTPChunkSource(url, manager)) AbstractChunk(new HTTPChunkSource(url, manager))
{ {
......
...@@ -46,10 +46,15 @@ namespace adaptative ...@@ -46,10 +46,15 @@ namespace adaptative
AbstractChunkSource(); AbstractChunkSource();
virtual ~AbstractChunkSource(); virtual ~AbstractChunkSource();
virtual block_t * read(size_t) = 0; virtual block_t * read(size_t) = 0;
void setParentChunk(AbstractChunk *); void setParentChunk (AbstractChunk *);
void setBytesRange (const BytesRange &);
const BytesRange & getBytesRange () const;
size_t getContentLength() const;
protected: protected:
AbstractChunk *parentChunk; AbstractChunk *parentChunk;
size_t contentLength;
BytesRange bytesRange;
}; };
class AbstractChunk class AbstractChunk
...@@ -59,11 +64,6 @@ namespace adaptative ...@@ -59,11 +64,6 @@ namespace adaptative
size_t getBytesRead () const; size_t getBytesRead () const;
size_t getBytesToRead () const; size_t getBytesToRead () const;
const BytesRange & getBytesRange () const;
void setBytesRead (size_t bytes);
void setLength (size_t length);
void setBytesRange (const BytesRange &);
virtual block_t * read (size_t, mtime_t *); virtual block_t * read (size_t, mtime_t *);
virtual void onDownload (block_t **) = 0; virtual void onDownload (block_t **) = 0;
...@@ -73,9 +73,7 @@ namespace adaptative ...@@ -73,9 +73,7 @@ namespace adaptative
AbstractChunkSource *source; AbstractChunkSource *source;
private: private:
size_t length;
size_t bytesRead; size_t bytesRead;
BytesRange bytesRange;
}; };
class HTTPChunkSource : public AbstractChunkSource class HTTPChunkSource : public AbstractChunkSource
...@@ -86,6 +84,12 @@ namespace adaptative ...@@ -86,6 +84,12 @@ namespace adaptative
virtual block_t * read(size_t); /* impl */ virtual block_t * read(size_t); /* impl */
protected:
virtual block_t * consume(size_t);
HTTPConnection *connection;
HTTPConnectionManager *connManager;
size_t consumed; /* read pointer */
private: private:
bool init(const std::string &); bool init(const std::string &);
std::string url; std::string url;
...@@ -93,8 +97,21 @@ namespace adaptative ...@@ -93,8 +97,21 @@ namespace adaptative
std::string path; std::string path;
std::string hostname; std::string hostname;
uint16_t port; uint16_t port;
HTTPConnection *connection; };
HTTPConnectionManager *connManager;
class HTTPChunkBufferedSource : public HTTPChunkSource
{
public:
HTTPChunkBufferedSource(const std::string &url, HTTPConnectionManager *);
virtual ~HTTPChunkBufferedSource();
protected:
virtual block_t * consume(size_t);
private:
void bufferize(size_t);
block_t *p_buffer; /* read cache buffer */
size_t buffered; /* read cache size */
}; };
class HTTPChunk : public AbstractChunk class HTTPChunk : public AbstractChunk
......
...@@ -62,6 +62,8 @@ ISegment::~ISegment() ...@@ -62,6 +62,8 @@ ISegment::~ISegment()
SegmentChunk * ISegment::getChunk(const std::string &url, HTTPConnectionManager *connManager) SegmentChunk * ISegment::getChunk(const std::string &url, HTTPConnectionManager *connManager)
{ {
HTTPChunkSource *source = new HTTPChunkSource(url, connManager); HTTPChunkSource *source = new HTTPChunkSource(url, connManager);
if(startByte != endByte)
source->setBytesRange(BytesRange(startByte, endByte));
return new (std::nothrow) SegmentChunk(this, source); return new (std::nothrow) SegmentChunk(this, source);
} }
...@@ -82,10 +84,7 @@ SegmentChunk* ISegment::toChunk(size_t index, BaseRepresentation *ctxrep, HTTPCo ...@@ -82,10 +84,7 @@ SegmentChunk* ISegment::toChunk(size_t index, BaseRepresentation *ctxrep, HTTPCo
catch (int) catch (int)
{ {
return NULL; return NULL;
} };
if(startByte != endByte)
chunk->setBytesRange(BytesRange(startByte, endByte));
chunk->setRepresentation(ctxrep); chunk->setRepresentation(ctxrep);
......
...@@ -70,7 +70,7 @@ ssize_t ChunksSourceStream::Read(uint8_t *buf, size_t size) ...@@ -70,7 +70,7 @@ ssize_t ChunksSourceStream::Read(uint8_t *buf, size_t size)
while(i_toread && !b_eof) while(i_toread && !b_eof)
{ {
const size_t i_blocksize = __MIN(i_toread, 32768); const size_t i_blocksize = __MAX(i_toread, 32768);
if(!p_block && !(p_block = source->readNextBlock(i_blocksize))) if(!p_block && !(p_block = source->readNextBlock(i_blocksize)))
{ {
b_eof = true; b_eof = true;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment