Commit a5d7067e authored by Francois Cartegnie's avatar Francois Cartegnie

demux: adaptative: add chunk backend/source instea of direct reads

because we'll need fake chunks
parent 8d8a63bc
......@@ -54,7 +54,7 @@ void SegmentTracker::resetCounter()
prevRepresentation = NULL;
}
SegmentChunk * SegmentTracker::getNextChunk(bool switch_allowed)
SegmentChunk * SegmentTracker::getNextChunk(bool switch_allowed, HTTPConnectionManager *connManager)
{
BaseRepresentation *rep;
ISegment *segment;
......@@ -92,7 +92,7 @@ SegmentChunk * SegmentTracker::getNextChunk(bool switch_allowed)
init_sent = true;
segment = rep->getSegment(BaseRepresentation::INFOTYPE_INIT);
if(segment)
return segment->toChunk(count, rep);
return segment->toChunk(count, rep, connManager);
}
if(!index_sent)
......@@ -100,7 +100,7 @@ SegmentChunk * SegmentTracker::getNextChunk(bool switch_allowed)
index_sent = true;
segment = rep->getSegment(BaseRepresentation::INFOTYPE_INDEX);
if(segment)
return segment->toChunk(count, rep);
return segment->toChunk(count, rep, connManager);
}
bool b_gap = false;
......@@ -120,7 +120,7 @@ SegmentChunk * SegmentTracker::getNextChunk(bool switch_allowed)
/* stop initializing after 1st chunk */
initializing = false;
SegmentChunk *chunk = segment->toChunk(count, rep);
SegmentChunk *chunk = segment->toChunk(count, rep, connManager);
if(chunk)
count++;
......
......@@ -29,6 +29,11 @@
namespace adaptative
{
namespace http
{
class HTTPConnectionManager;
}
namespace logic
{
class AbstractAdaptationLogic;
......@@ -44,6 +49,7 @@ namespace adaptative
using namespace playlist;
using namespace logic;
using namespace http;
class SegmentTrackerListenerInterface
{
......@@ -63,7 +69,7 @@ namespace adaptative
void setAdaptationLogic(AbstractAdaptationLogic *);
void resetCounter();
SegmentChunk* getNextChunk(bool);
SegmentChunk* getNextChunk(bool, HTTPConnectionManager *);
bool setPositionByTime(mtime_t, bool, bool);
void setPositionByNumber(uint64_t, bool);
mtime_t getSegmentStart() const;
......
......@@ -150,7 +150,7 @@ SegmentChunk * AbstractStream::getChunk()
disabled = true;
return NULL;
}
currentChunk = segmentTracker->getNextChunk(!fakeesout->restarting());
currentChunk = segmentTracker->getNextChunk(!fakeesout->restarting(), connManager);
if (currentChunk == NULL)
eof = true;
}
......@@ -294,7 +294,7 @@ AbstractStream::status AbstractStream::demux(mtime_t nz_deadline, bool send)
return AbstractStream::status_demuxed;
}
block_t * AbstractStream::readNextBlock(size_t)
block_t * AbstractStream::readNextBlock(size_t toread)
{
SegmentChunk *chunk = getChunk();
if(!chunk)
......@@ -320,63 +320,23 @@ block_t * AbstractStream::readNextBlock(size_t)
return NULL;
}
if(!chunk->getConnection())
{
if(!connManager->connectChunk(chunk))
return NULL;
}
size_t readsize = 0;
bool b_segment_head_chunk = false;
/* New chunk, do query */
if(chunk->getBytesRead() == 0)
{
if(chunk->getConnection()->query(chunk->getPath(), chunk->getBytesRange()) != VLC_SUCCESS)
{
currentChunk = NULL;
delete chunk;
return NULL;
}
chunk->setLength(chunk->getConnection()->getContentLength());
b_segment_head_chunk = true;
}
/* Because we don't know Chunk size at start, we need to get size
from content length */
readsize = chunk->getBytesToRead();
if (readsize > 32768)
readsize = 32768;
block_t *block = block_Alloc(readsize);
if(!block)
return NULL;
const bool b_segment_head_chunk = (chunk->getBytesRead() == 0);
mtime_t time = mdate();
ssize_t ret = chunk->getConnection()->read(block->p_buffer, readsize);
time = mdate() - time;
if(ret < 0)
mtime_t time;
block_t *block = chunk->read(toread, &time);
if(block == NULL)
{
block_Release(block);
currentChunk = NULL;
delete chunk;
return NULL;
}
else
{
chunk->setBytesRead(chunk->getBytesRead() + ret);
block->i_buffer = (size_t)ret;
adaptationLogic->updateDownloadRate(block->i_buffer, time);
chunk->onDownload(&block);
if (chunk->getBytesToRead() == 0)
{
currentChunk = NULL;
delete chunk;
}
}
block = checkBlock(block, b_segment_head_chunk);
......
......@@ -26,17 +26,111 @@
#endif
#include "Chunk.h"
#include "HTTPConnection.hpp"
#include "HTTPConnectionManager.h"
#include <vlc_common.h>
#include <vlc_url.h>
#include <vlc_block.h>
using namespace adaptative::http;
Chunk::Chunk (const std::string& url) :
AbstractChunkSource::AbstractChunkSource()
{
parentChunk = NULL;
}
AbstractChunkSource::~AbstractChunkSource()
{
}
void AbstractChunkSource::setParentChunk(AbstractChunk *parent)
{
parentChunk = parent;
}
AbstractChunk::AbstractChunk(AbstractChunkSource *source_)
{
length = 0;
bytesRead = 0;
source = source_;
source->setParentChunk(this);
}
AbstractChunk::~AbstractChunk()
{
delete source;
}
void AbstractChunk::setLength(size_t length)
{
this->length = length;
}
size_t AbstractChunk::getBytesRead() const
{
return this->bytesRead;
}
void AbstractChunk::setBytesRead(size_t bytes)
{
this->bytesRead = bytes;
}
size_t AbstractChunk::getBytesToRead() const
{
return length - bytesRead;
}
void AbstractChunk::setBytesRange(const BytesRange &range)
{
bytesRange = range;
if(bytesRange.isValid() && bytesRange.getEndByte())
setLength(bytesRange.getEndByte() - bytesRange.getStartByte());
}
const BytesRange & AbstractChunk::getBytesRange() const
{
return bytesRange;
}
block_t * AbstractChunk::read(size_t sizehint, mtime_t *time)
{
if(!source)
return NULL;
*time = mdate();
block_t *block = source->read(sizehint);
*time = mdate() - *time;
if(block)
{
setBytesRead(getBytesRead() + block->i_buffer);
onDownload(&block);
}
return block;
}
HTTPChunkSource::HTTPChunkSource(const std::string& url, HTTPConnectionManager *manager) :
AbstractChunkSource(),
port (0),
length (0),
bytesRead (0),
connection (NULL)
connection (NULL),
connManager (manager)
{
if(!init(url))
throw VLC_EGENERIC;
}
HTTPChunkSource::~HTTPChunkSource()
{
if(connection)
connection->setUsed(false);
}
bool HTTPChunkSource::init(const std::string &url)
{
this->url = url;
......@@ -47,7 +141,7 @@ Chunk::Chunk (const std::string& url) :
}
if(scheme != "http" && scheme != "https")
throw VLC_EGENERIC;
return false;
vlc_url_t url_components;
vlc_UrlParse(&url_components, url.c_str());
......@@ -67,73 +161,63 @@ Chunk::Chunk (const std::string& url) :
vlc_UrlClean(&url_components);
if(path.empty() || hostname.empty())
throw VLC_EGENERIC;
}
return false;
Chunk::~Chunk()
{
if(connection)
connection->setUsed(false);
return true;
}
const BytesRange & Chunk::getBytesRange() const
block_t * HTTPChunkSource::read(size_t maxread)
{
return bytesRange;
}
if(!connManager || !parentChunk)
return NULL;
const std::string& Chunk::getUrl () const
{
return url;
}
if(!connection)
{
connection = connManager->getConnection(scheme, hostname, port);
if(!connection)
return NULL;
}
void Chunk::setBytesRange(const BytesRange &range)
{
bytesRange = range;
if(bytesRange.isValid() && bytesRange.getEndByte())
length = bytesRange.getEndByte() - bytesRange.getStartByte();
}
if(parentChunk->getBytesRead() == 0)
{
if( connection->query(path, parentChunk->getBytesRange()) != VLC_SUCCESS )
return NULL;
parentChunk->setLength(connection->getContentLength());
}
const std::string& Chunk::getScheme () const
{
return scheme;
}
if( parentChunk->getBytesToRead() == 0 )
return NULL;
const std::string& Chunk::getHostname () const
{
return hostname;
}
const std::string& Chunk::getPath () const
{
return this->path;
}
int Chunk::getPort () const
{
return this->port;
}
/* Because we don't know Chunk size at start, we need to get size
from content length */
size_t readsize = parentChunk->getBytesToRead();
if (readsize > maxread)
readsize = maxread;
void Chunk::setLength (size_t length)
{
this->length = length;
}
size_t Chunk::getBytesRead () const
{
return this->bytesRead;
}
void Chunk::setBytesRead (size_t bytes)
{
this->bytesRead = bytes;
}
block_t *block = block_Alloc(readsize);
if(!block)
return NULL;
size_t Chunk::getBytesToRead () const
{
return length - bytesRead;
ssize_t ret = connection->read(block->p_buffer, readsize);
if(ret < 0)
{
block_Release(block);
return NULL;
}
block->i_buffer = (size_t)ret;
return block;
}
HTTPConnection* Chunk::getConnection () const
HTTPChunk::HTTPChunk(const std::string &url, HTTPConnectionManager *manager):
AbstractChunk(new HTTPChunkSource(url, manager))
{
return this->connection;
}
void Chunk::setConnection (HTTPConnection *connection)
HTTPChunk::~HTTPChunk()
{
this->connection = connection;
}
......@@ -37,40 +37,73 @@ namespace adaptative
namespace http
{
class HTTPConnection;
class HTTPConnectionManager;
class AbstractChunk;
class Chunk
class AbstractChunkSource
{
public:
Chunk (const std::string &url);
virtual ~Chunk ();
AbstractChunkSource();
virtual ~AbstractChunkSource();
virtual block_t * read(size_t) = 0;
void setParentChunk(AbstractChunk *);
protected:
AbstractChunk *parentChunk;
};
class AbstractChunk
{
public:
virtual ~AbstractChunk();
const BytesRange & getBytesRange () const;
const std::string& getUrl () const;
const std::string& getScheme () const;
const std::string& getHostname () const;
const std::string& getPath () const;
int getPort () const;
size_t getBytesRead () const;
size_t getBytesToRead () const;
HTTPConnection* getConnection () const;
const BytesRange & getBytesRange () const;
void setConnection (HTTPConnection *connection);
void setBytesRead (size_t bytes);
void setLength (size_t length);
void setBytesRange (const BytesRange &);
virtual void onDownload (block_t **) {}
virtual block_t * read (size_t, mtime_t *);
virtual void onDownload (block_t **) = 0;
protected:
AbstractChunk(AbstractChunkSource *);
AbstractChunkSource *source;
private:
size_t length;
size_t bytesRead;
BytesRange bytesRange;
};
class HTTPChunkSource : public AbstractChunkSource
{
public:
HTTPChunkSource(const std::string &url, HTTPConnectionManager *);
virtual ~HTTPChunkSource();
virtual block_t * read(size_t); /* impl */
private:
bool init(const std::string &);
std::string url;
std::string scheme;
std::string path;
std::string hostname;
int port;
BytesRange bytesRange;
size_t length;
size_t bytesRead;
uint16_t port;
HTTPConnection *connection;
HTTPConnectionManager *connManager;
};
class HTTPChunk : public AbstractChunk
{
public:
HTTPChunk(const std::string &url, HTTPConnectionManager *);
virtual ~HTTPChunk();
virtual void onDownload (block_t **) {} /* impl */
};
}
......
......@@ -82,6 +82,9 @@ int HTTPConnection::query(const std::string &path, const BytesRange &range)
{
queryOk = false;
msg_Dbg(stream, "Retrieving ://%s:%u%s @%zu", hostname.c_str(), port, path.c_str(),
range.isValid() ? range.getStartByte() : 0);
if(!connected() && ( hostname.empty() || !connect(hostname, port) ))
return VLC_EGENERIC;
......
......@@ -27,8 +27,8 @@
#include "HTTPConnectionManager.h"
#include "HTTPConnection.hpp"
#include "Chunk.h"
#include "Sockets.hpp"
#include <vlc_url.h>
using namespace adaptative::http;
......@@ -66,42 +66,37 @@ HTTPConnection * HTTPConnectionManager::getConnection(const std::string &hostnam
return NULL;
}
bool HTTPConnectionManager::connectChunk(Chunk *chunk)
HTTPConnection * HTTPConnectionManager::getConnection(const std::string &scheme,
const std::string &hostname,
uint16_t port)
{
if(chunk == NULL)
return false;
if(chunk->getConnection())
return true;
msg_Dbg(stream, "Retrieving %s @%zu", chunk->getUrl().c_str(),
chunk->getBytesRange().isValid() ? chunk->getBytesRange().getStartByte() : 0);
if((scheme != "http" && scheme != "https") || hostname.empty())
return NULL;
const int sockettype = (chunk->getScheme() == "https") ? TLSSocket::TLS : Socket::REGULAR;
HTTPConnection *conn = getConnection(chunk->getHostname(), chunk->getPort(), sockettype);
const int sockettype = (scheme == "https") ? TLSSocket::TLS : Socket::REGULAR;
HTTPConnection *conn = getConnection(hostname, port, sockettype);
if(!conn)
{
Socket *socket = (sockettype == TLSSocket::TLS) ? new (std::nothrow) TLSSocket()
: new (std::nothrow) Socket();
if(!socket)
return false;
return NULL;
/* disable pipelined tls until we have ticket/resume session support */
conn = new (std::nothrow) HTTPConnection(stream, socket, sockettype != TLSSocket::TLS);
if(!conn)
{
delete socket;
return false;
return NULL;
}
connectionPool.push_back(conn);
if (!conn->connect(chunk->getHostname(), chunk->getPort()))
if (!conn->connect(hostname, port))
{
return false;
return NULL;
}
}
conn->setUsed(true);
chunk->setConnection(conn);
return true;
return conn;
}
......@@ -38,7 +38,6 @@ namespace adaptative
namespace http
{
class HTTPConnection;
class Chunk;
class HTTPConnectionManager
{
......@@ -48,7 +47,9 @@ namespace adaptative
void closeAllConnections ();
void releaseAllConnections ();
bool connectChunk (Chunk *chunk);
HTTPConnection * getConnection(const std::string &scheme,
const std::string &hostname,
uint16_t port);
private:
std::vector<HTTPConnection *> connectionPool;
......
......@@ -29,6 +29,7 @@
namespace adaptative
{
namespace playlist
{
class BasePeriod;
......@@ -73,6 +74,7 @@ namespace adaptative
std::vector<std::string> baseUrls;
std::string playlistUrl;
std::string type;
};
}
}
......
......@@ -28,6 +28,7 @@
#include "Segment.h"
#include "BaseRepresentation.h"
#include "AbstractPlaylist.hpp"
#include "SegmentChunk.hpp"
#include "../http/BytesRange.hpp"
#include <cassert>
......@@ -58,9 +59,10 @@ ISegment::~ISegment()
assert(chunksuse.Get() == 0);
}
SegmentChunk * ISegment::getChunk(const std::string &url)
SegmentChunk * ISegment::getChunk(const std::string &url, HTTPConnectionManager *connManager)
{
return new (std::nothrow) SegmentChunk(this, url);
HTTPChunkSource *source = new HTTPChunkSource(url, connManager);
return new (std::nothrow) SegmentChunk(this, source);
}
void ISegment::onChunkDownload(block_t **, SegmentChunk *, BaseRepresentation *)
......@@ -68,12 +70,12 @@ void ISegment::onChunkDownload(block_t **, SegmentChunk *, BaseRepresentation *)
}
SegmentChunk* ISegment::toChunk(size_t index, BaseRepresentation *ctxrep)
SegmentChunk* ISegment::toChunk(size_t index, BaseRepresentation *ctxrep, HTTPConnectionManager *connManager)
{
SegmentChunk *chunk;
try
{
chunk = getChunk(getUrlSegment().toString(index, ctxrep));
chunk = getChunk(getUrlSegment().toString(index, ctxrep), connManager);
if (!chunk)
return NULL;
}
......
......@@ -35,6 +35,11 @@
namespace adaptative
{
namespace http
{
class HTTPConnectionManager;
}
namespace playlist
{
class BaseRepresentation;
......@@ -53,7 +58,7 @@ namespace adaptative
* That is basically true when using an Url, and false
* when using an UrlTemplate
*/
virtual SegmentChunk* toChunk (size_t, BaseRepresentation * = NULL);
virtual SegmentChunk* toChunk (size_t, BaseRepresentation *, HTTPConnectionManager *);
virtual void setByteRange (size_t start, size_t end);
virtual void setSequenceNumber(uint64_t);
virtual uint64_t getSequenceNumber() const;
......@@ -84,7 +89,7 @@ namespace adaptative
static const int SEQUENCE_INVALID;
static const int SEQUENCE_FIRST;
virtual SegmentChunk * getChunk(const std::string &);
virtual SegmentChunk * getChunk(const std::string &, HTTPConnectionManager *);
};
class Segment : public ISegment
......
......@@ -25,8 +25,8 @@
using namespace adaptative::playlist;
using namespace adaptative;
SegmentChunk::SegmentChunk(ISegment *segment_, const std::string &url) :
Chunk(url)
SegmentChunk::SegmentChunk(ISegment *segment_, AbstractChunkSource *source) :
AbstractChunk(source)
{
segment = segment_;
segment->chunksuse.Set(segment->chunksuse.Get() + 1);
......@@ -57,3 +57,4 @@ StreamFormat SegmentChunk::getStreamFormat() const
else
return StreamFormat();
}
......@@ -27,6 +27,7 @@
namespace adaptative
{
namespace playlist
{
using namespace http;
......@@ -34,10 +35,10 @@ namespace adaptative
class BaseRepresentation;
class ISegment;
class SegmentChunk : public Chunk
class SegmentChunk : public AbstractChunk
{
public:
SegmentChunk(ISegment *segment, const std::string &url);
SegmentChunk(ISegment *segment, AbstractChunkSource *);
virtual ~SegmentChunk();
void setRepresentation(BaseRepresentation *);
virtual void onDownload(block_t **); // reimpl
......
......@@ -70,7 +70,8 @@ ssize_t ChunksSourceStream::Read(uint8_t *buf, size_t size)
while(i_toread && !b_eof)
{
if(!p_block && !(p_block = source->readNextBlock(i_toread)))
const size_t i_blocksize = __MIN(i_toread, 32768);
if(!p_block && !(p_block = source->readNextBlock(i_blocksize)))
{
b_eof = true;
break;
......
......@@ -26,52 +26,19 @@
using namespace adaptative;
using namespace adaptative::http;
uint64_t Retrieve::HTTP(vlc_object_t *obj, const std::string &uri, void **pp_data)
block_t * Retrieve::HTTP(vlc_object_t *obj, const std::string &uri)
{
HTTPConnectionManager connManager(obj);
Chunk *datachunk;
HTTPChunk *datachunk;
try
{
datachunk = new Chunk(uri);
datachunk = new HTTPChunk(uri, &connManager);
} catch (int) {
*pp_data = NULL;
return 0;
}
;
if(!connManager.connectChunk(datachunk))
{
delete datachunk;
*pp_data = NULL;
return 0;
}
if( datachunk->getConnection()->query(datachunk->getPath()) == VLC_SUCCESS )
datachunk->setLength(datachunk->getConnection()->getContentLength());
if( datachunk->getBytesToRead() == 0 )
{
delete datachunk;
*pp_data = NULL;
return 0;
return NULL;
}
size_t i_data = datachunk->getBytesToRead();
*pp_data = malloc(i_data);
if(*pp_data)
{
ssize_t ret = datachunk->getConnection()->read(*pp_data, i_data);
if(ret < 0)
{
free(*pp_data);
*pp_data = NULL;
i_data = 0;
}
else
{
datachunk->setBytesRead(datachunk->getBytesRead() + ret);
i_data = ret;
}
}
mtime_t time;
block_t *block = datachunk->read(1<<21, &time);
delete datachunk;
return i_data;
return block;
}
......@@ -24,7 +24,6 @@
# include "config.h"
#endif
//#include <inttypes.h>
#include <vlc_common.h>
#include <string>
......@@ -33,7 +32,7 @@ namespace adaptative
class Retrieve
{
public:
static uint64_t HTTP(vlc_object_t *, const std::string &uri, void **pp_data);
static block_t * HTTP(vlc_object_t *, const std::string &uri);
};
}
......
......@@ -37,6 +37,7 @@
#include <vlc_stream.h>
#include <vlc_demux.h>
#include <vlc_meta.h>
#include <vlc_block.h>
#include "../adaptative/tools/Retrieve.hpp"
#include <algorithm>
......@@ -73,15 +74,14 @@ bool DASHManager::updatePlaylist()
url.append("://");
url.append(p_demux->psz_location);
uint8_t *p_data = NULL;
size_t i_data = Retrieve::HTTP(VLC_OBJECT(p_demux->s), url, (void**) &p_data);
if(!p_data)
block_t *p_block = Retrieve::HTTP(VLC_OBJECT(p_demux->s), url);
if(!p_block)
return false;
stream_t *mpdstream = stream_MemoryNew(p_demux->s, p_data, i_data, false);
stream_t *mpdstream = stream_MemoryNew(p_demux->s, p_block->p_buffer, p_block->i_buffer, true);
if(!mpdstream)
{
free(p_data);
block_Release(p_block);
nextPlaylistupdate = now + playlist->minUpdatePeriod.Get() / CLOCK_FREQ;
return false;
}
......@@ -90,6 +90,7 @@ bool DASHManager::updatePlaylist()
if(!parser.parse())
{
stream_Delete(mpdstream);
block_Release(p_block);
nextPlaylistupdate = now + playlist->minUpdatePeriod.Get() / CLOCK_FREQ;
return false;
}
......@@ -111,6 +112,7 @@ bool DASHManager::updatePlaylist()
delete newmpd;
}
stream_Delete(mpdstream);
block_Release(p_block);
}
/* Compute new MPD update time */
......
......@@ -187,11 +187,10 @@ void M3U8Parser::createAndFillRepresentation(vlc_object_t *p_obj, BaseAdaptation
bool M3U8Parser::appendSegmentsFromPlaylistURI(vlc_object_t *p_obj, Representation *rep)
{
void *p_data;
const size_t i_data = Retrieve::HTTP(p_obj, rep->getPlaylistUrl().toString(), &p_data);
if(p_data)
block_t *p_block = Retrieve::HTTP(p_obj, rep->getPlaylistUrl().toString());
if(p_block)
{
stream_t *substream = stream_MemoryNew(p_obj, (uint8_t *)p_data, i_data, false);
stream_t *substream = stream_MemoryNew(p_obj, p_block->p_buffer, p_block->i_buffer, true);
if(substream)
{
std::list<Tag *> tagslist = parseEntries(substream);
......@@ -202,6 +201,7 @@ bool M3U8Parser::appendSegmentsFromPlaylistURI(vlc_object_t *p_obj, Representati
releaseTagsList(tagslist);
return true;
}
block_Release(p_block);
}
return false;
}
......@@ -311,7 +311,6 @@ void M3U8Parser::parseSegments(vlc_object_t *p_obj, Representation *rep, const s
{
encryption.method = SegmentEncryption::AES_128;
encryption.key.clear();
uint8_t *p_data;
Url keyurl(keytag->getAttributeByName("URI")->quotedString());
if(!keyurl.hasScheme())
......@@ -319,15 +318,15 @@ void M3U8Parser::parseSegments(vlc_object_t *p_obj, Representation *rep, const s
keyurl.prepend(Helper::getDirectoryPath(rep->getPlaylistUrl().toString()).append("/"));
}
const uint64_t read = Retrieve::HTTP(p_obj, keyurl.toString(), (void **) &p_data);
if(p_data)
block_t *p_block = Retrieve::HTTP(p_obj, keyurl.toString());
if(p_block)
{
if(read == 16)
if(p_block->i_buffer == 16)
{
encryption.key.resize(16);
memcpy(&encryption.key[0], p_data, 16);
memcpy(&encryption.key[0], p_block->p_buffer, 16);
}
free(p_data);
block_Release(p_block);
}
if(keytag->getAttributeByName("IV"))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment