Commit 50871da6 authored by bellard's avatar bellard

added RTSP and RTP server support - added daemon mode as default (use -d for...

added RTSP and RTP server support - added daemon mode as default (use -d for foreground mode) - added module support


git-svn-id: file:///var/local/repositories/ffmpeg/trunk@812 9553f0bf-9b14-0410-a0b8-cfaf0461ba5b
parent 0070639e
......@@ -28,7 +28,6 @@
#include <errno.h>
#include <sys/time.h>
#include <time.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/wait.h>
......@@ -36,6 +35,9 @@
#include <netdb.h>
#include <ctype.h>
#include <signal.h>
#include <dlfcn.h>
#include "ffserver.h"
/* maximum number of simultaneous HTTP connections */
#define HTTP_MAX_CONNECTIONS 2000
......@@ -44,30 +46,45 @@ enum HTTPState {
HTTPSTATE_WAIT_REQUEST,
HTTPSTATE_SEND_HEADER,
HTTPSTATE_SEND_DATA_HEADER,
HTTPSTATE_SEND_DATA,
HTTPSTATE_SEND_DATA, /* sending TCP or UDP data */
HTTPSTATE_SEND_DATA_TRAILER,
HTTPSTATE_RECEIVE_DATA,
HTTPSTATE_WAIT_FEED,
HTTPSTATE_RECEIVE_DATA,
HTTPSTATE_WAIT_FEED, /* wait for data from the feed */
HTTPSTATE_WAIT, /* wait before sending next packets */
HTTPSTATE_WAIT_SHORT, /* short wait for short term
bandwidth limitation */
HTTPSTATE_READY,
RTSPSTATE_WAIT_REQUEST,
RTSPSTATE_SEND_REPLY,
};
const char *http_state[] = {
"WAIT_REQUEST",
"SEND_HEADER",
"HTTP_WAIT_REQUEST",
"HTTP_SEND_HEADER",
"SEND_DATA_HEADER",
"SEND_DATA",
"SEND_DATA_TRAILER",
"RECEIVE_DATA",
"WAIT_FEED",
"WAIT",
"WAIT_SHORT",
"READY",
"RTSP_WAIT_REQUEST",
"RTSP_SEND_REPLY",
};
#define IOBUFFER_INIT_SIZE 8192
#define PBUFFER_INIT_SIZE 8192
/* coef for exponential mean for bitrate estimation in statistics */
#define AVG_COEF 0.9
/* timeouts are in ms */
#define REQUEST_TIMEOUT (15 * 1000)
#define HTTP_REQUEST_TIMEOUT (15 * 1000)
#define RTSP_REQUEST_TIMEOUT (3600 * 24 * 1000)
#define SYNC_TIMEOUT (10 * 1000)
typedef struct {
......@@ -91,17 +108,19 @@ typedef struct HTTPContext {
int feed_fd;
/* input format handling */
AVFormatContext *fmt_in;
long start_time; /* In milliseconds - this wraps fairly often */
INT64 first_pts; /* initial pts value */
int pts_stream_index; /* stream we choose as clock reference */
/* output format handling */
struct FFStream *stream;
/* -1 is invalid stream */
int feed_streams[MAX_STREAMS]; /* index of streams in the feed */
int switch_feed_streams[MAX_STREAMS]; /* index of streams in the feed */
int switch_pending;
AVFormatContext fmt_ctx;
AVFormatContext fmt_ctx; /* instance of FFStream for one user */
int last_packet_sent; /* true if last data packet was sent */
int suppress_log;
int bandwidth;
long start_time; /* In milliseconds - this wraps fairly often */
DataRateData datarate;
int wmp_client_id;
char protocol[16];
......@@ -109,8 +128,23 @@ typedef struct HTTPContext {
char url[128];
int buffer_size;
UINT8 *buffer;
int pbuffer_size;
UINT8 *pbuffer;
int is_packetized; /* if true, the stream is packetized */
int packet_stream_index; /* current stream for output in state machine */
/* RTSP state specific */
UINT8 *pb_buffer; /* XXX: use that in all the code */
ByteIOContext *pb;
int seq; /* RTSP sequence number */
/* RTP state specific */
enum RTSPProtocol rtp_protocol;
char session_id[32]; /* session id */
AVFormatContext *rtp_ctx[MAX_STREAMS];
URLContext *rtp_handles[MAX_STREAMS];
/* RTP short term bandwidth limitation */
int packet_byte_count;
int packet_start_time_us; /* used for short durations (a few
seconds max) */
} HTTPContext;
/* each generated stream is described here */
......@@ -124,7 +158,8 @@ enum StreamType {
typedef struct FFStream {
enum StreamType stream_type;
char filename[1024]; /* stream filename */
struct FFStream *feed;
struct FFStream *feed; /* feed we are using (can be null if
coming from file) */
AVOutputFormat *fmt;
int nb_streams;
int prebuffer; /* Number of millseconds early to start */
......@@ -142,8 +177,10 @@ typedef struct FFStream {
time_t pid_start; /* Of ffmpeg process */
char **child_argv;
struct FFStream *next;
/* RTSP options */
char *rtsp_option;
/* feed specific */
int feed_opened; /* true if someone if writing to feed */
int feed_opened; /* true if someone is writing to the feed */
int is_feed; /* true if it is a feed */
int conns_served;
INT64 bytes_served;
......@@ -158,23 +195,45 @@ typedef struct FeedData {
float avg_frame_size; /* frame size averraged over last frames with exponential mean */
} FeedData;
struct sockaddr_in my_addr;
struct sockaddr_in my_http_addr;
struct sockaddr_in my_rtsp_addr;
char logfilename[1024];
HTTPContext *first_http_ctx;
FFStream *first_feed; /* contains only feeds */
FFStream *first_stream; /* contains all streams, including feeds */
static int handle_http(HTTPContext *c);
static void new_connection(int server_fd, int is_rtsp);
static void close_connection(HTTPContext *c);
/* HTTP handling */
static int handle_connection(HTTPContext *c);
static int http_parse_request(HTTPContext *c);
static int http_send_data(HTTPContext *c);
static void compute_stats(HTTPContext *c);
static int open_input_stream(HTTPContext *c, const char *info);
static int http_start_receive_data(HTTPContext *c);
static int http_receive_data(HTTPContext *c);
static int compute_send_delay(HTTPContext *c);
/* RTSP handling */
static int rtsp_parse_request(HTTPContext *c);
static void rtsp_cmd_describe(HTTPContext *c, const char *url);
static void rtsp_cmd_setup(HTTPContext *c, const char *url, RTSPHeader *h);
static void rtsp_cmd_play(HTTPContext *c, const char *url, RTSPHeader *h);
static void rtsp_cmd_pause(HTTPContext *c, const char *url, RTSPHeader *h);
static void rtsp_cmd_teardown(HTTPContext *c, const char *url, RTSPHeader *h);
/* RTP handling */
static HTTPContext *rtp_new_connection(HTTPContext *rtsp_c,
FFStream *stream, const char *session_id);
static int rtp_new_av_stream(HTTPContext *c,
int stream_index, struct sockaddr_in *dest_addr);
static const char *my_program_name;
static int ffserver_debug;
static int ffserver_daemon;
static int no_launch;
static int need_to_start_children;
......@@ -306,13 +365,10 @@ static void start_children(FFStream *feed)
}
}
/* main loop of the http server */
static int http_server(struct sockaddr_in my_addr)
/* open a listening socket */
static int socket_open_listen(struct sockaddr_in *my_addr)
{
int server_fd, tmp, ret;
struct sockaddr_in from_addr;
struct pollfd poll_table[HTTP_MAX_CONNECTIONS + 1], *poll_entry;
HTTPContext *c, **cp;
int server_fd, tmp;
server_fd = socket(AF_INET,SOCK_STREAM,0);
if (server_fd < 0) {
......@@ -323,7 +379,7 @@ static int http_server(struct sockaddr_in my_addr)
tmp = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(tmp));
if (bind (server_fd, (struct sockaddr *) &my_addr, sizeof (my_addr)) < 0) {
if (bind (server_fd, (struct sockaddr *) my_addr, sizeof (*my_addr)) < 0) {
perror ("bind");
close(server_fd);
return -1;
......@@ -334,12 +390,31 @@ static int http_server(struct sockaddr_in my_addr)
close(server_fd);
return -1;
}
fcntl(server_fd, F_SETFL, O_NONBLOCK);
return server_fd;
}
/* main loop of the http server */
static int http_server(void)
{
int server_fd, ret, rtsp_server_fd, delay, delay1;
struct pollfd poll_table[HTTP_MAX_CONNECTIONS + 2], *poll_entry;
HTTPContext *c, *c_next;
server_fd = socket_open_listen(&my_http_addr);
if (server_fd < 0)
return -1;
rtsp_server_fd = socket_open_listen(&my_rtsp_addr);
if (rtsp_server_fd < 0)
return -1;
http_log("ffserver started.\n");
start_children(first_feed);
fcntl(server_fd, F_SETFL, O_NONBLOCK);
first_http_ctx = NULL;
nb_connections = 0;
first_http_ctx = NULL;
......@@ -349,40 +424,61 @@ static int http_server(struct sockaddr_in my_addr)
poll_entry->events = POLLIN;
poll_entry++;
poll_entry->fd = rtsp_server_fd;
poll_entry->events = POLLIN;
poll_entry++;
/* wait for events on each HTTP handle */
c = first_http_ctx;
delay = 1000;
while (c != NULL) {
int fd;
fd = c->fd;
switch(c->state) {
case HTTPSTATE_WAIT_REQUEST:
case HTTPSTATE_SEND_HEADER:
case RTSPSTATE_SEND_REPLY:
c->poll_entry = poll_entry;
poll_entry->fd = fd;
poll_entry->events = POLLIN;
poll_entry->events = POLLOUT;
poll_entry++;
break;
case HTTPSTATE_SEND_HEADER:
case HTTPSTATE_SEND_DATA_HEADER:
case HTTPSTATE_SEND_DATA:
case HTTPSTATE_SEND_DATA_TRAILER:
c->poll_entry = poll_entry;
poll_entry->fd = fd;
poll_entry->events = POLLOUT;
poll_entry++;
if (!c->is_packetized) {
/* for TCP, we output as much as we can (may need to put a limit) */
c->poll_entry = poll_entry;
poll_entry->fd = fd;
poll_entry->events = POLLOUT;
poll_entry++;
} else {
/* not strictly correct, but currently cannot add
more than one fd in poll entry */
delay = 0;
}
break;
case HTTPSTATE_WAIT_REQUEST:
case HTTPSTATE_RECEIVE_DATA:
c->poll_entry = poll_entry;
poll_entry->fd = fd;
poll_entry->events = POLLIN;
poll_entry++;
break;
case HTTPSTATE_WAIT_FEED:
case RTSPSTATE_WAIT_REQUEST:
/* need to catch errors */
c->poll_entry = poll_entry;
poll_entry->fd = fd;
poll_entry->events = POLLIN;/* Maybe this will work */
poll_entry++;
break;
case HTTPSTATE_WAIT:
c->poll_entry = NULL;
delay1 = compute_send_delay(c);
if (delay1 < delay)
delay = delay1;
break;
case HTTPSTATE_WAIT_SHORT:
c->poll_entry = NULL;
delay1 = 10; /* one tick wait XXX: 10 ms assumed */
if (delay1 < delay)
delay = delay1;
break;
default:
c->poll_entry = NULL;
break;
......@@ -393,7 +489,7 @@ static int http_server(struct sockaddr_in my_addr)
/* wait for an event on one connection. We poll at least every
second to handle timeouts */
do {
ret = poll(poll_table, poll_entry - poll_table, 1000);
ret = poll(poll_table, poll_entry - poll_table, delay);
} while (ret == -1);
cur_time = gettime_ms();
......@@ -404,81 +500,153 @@ static int http_server(struct sockaddr_in my_addr)
}
/* now handle the events */
cp = &first_http_ctx;
while ((*cp) != NULL) {
c = *cp;
if (handle_http (c) < 0) {
for(c = first_http_ctx; c != NULL; c = c_next) {
c_next = c->next;
if (handle_connection(c) < 0) {
/* close and free the connection */
log_connection(c);
close(c->fd);
if (c->fmt_in)
av_close_input_file(c->fmt_in);
*cp = c->next;
nb_bandwidth -= c->bandwidth;
av_free(c->buffer);
av_free(c->pbuffer);
av_free(c);
nb_connections--;
} else {
cp = &c->next;
close_connection(c);
}
}
/* new connection request ? */
poll_entry = poll_table;
/* new HTTP connection request ? */
if (poll_entry->revents & POLLIN) {
int fd, len;
len = sizeof(from_addr);
fd = accept(server_fd, (struct sockaddr *)&from_addr,
&len);
if (fd >= 0) {
fcntl(fd, F_SETFL, O_NONBLOCK);
/* XXX: should output a warning page when coming
close to the connection limit */
if (nb_connections >= nb_max_connections) {
c = NULL;
} else {
/* add a new connection */
c = av_mallocz(sizeof(HTTPContext));
if (c) {
c->next = first_http_ctx;
first_http_ctx = c;
c->fd = fd;
c->poll_entry = NULL;
c->from_addr = from_addr;
c->state = HTTPSTATE_WAIT_REQUEST;
c->buffer = av_malloc(c->buffer_size = IOBUFFER_INIT_SIZE);
c->pbuffer = av_malloc(c->pbuffer_size = PBUFFER_INIT_SIZE);
if (!c->buffer || !c->pbuffer) {
av_free(c->buffer);
av_free(c->pbuffer);
av_freep(&c);
} else {
c->buffer_ptr = c->buffer;
c->buffer_end = c->buffer + c->buffer_size;
c->timeout = cur_time + REQUEST_TIMEOUT;
c->start_time = cur_time;
nb_connections++;
}
}
}
if (!c) {
close(fd);
}
}
new_connection(server_fd, 0);
}
poll_entry++;
/* new RTSP connection request ? */
if (poll_entry->revents & POLLIN) {
new_connection(rtsp_server_fd, 1);
}
}
}
static int handle_http(HTTPContext *c)
/* start waiting for a new HTTP/RTSP request */
static void start_wait_request(HTTPContext *c, int is_rtsp)
{
int len;
c->buffer_ptr = c->buffer;
c->buffer_end = c->buffer + c->buffer_size - 1; /* leave room for '\0' */
if (is_rtsp) {
c->timeout = cur_time + RTSP_REQUEST_TIMEOUT;
c->state = RTSPSTATE_WAIT_REQUEST;
} else {
c->timeout = cur_time + HTTP_REQUEST_TIMEOUT;
c->state = HTTPSTATE_WAIT_REQUEST;
}
}
static void new_connection(int server_fd, int is_rtsp)
{
struct sockaddr_in from_addr;
int fd, len;
HTTPContext *c = NULL;
len = sizeof(from_addr);
fd = accept(server_fd, (struct sockaddr *)&from_addr,
&len);
if (fd < 0)
return;
fcntl(fd, F_SETFL, O_NONBLOCK);
/* XXX: should output a warning page when coming
close to the connection limit */
if (nb_connections >= nb_max_connections)
goto fail;
/* add a new connection */
c = av_mallocz(sizeof(HTTPContext));
if (!c)
goto fail;
c->next = first_http_ctx;
first_http_ctx = c;
c->fd = fd;
c->poll_entry = NULL;
c->from_addr = from_addr;
c->buffer_size = IOBUFFER_INIT_SIZE;
c->buffer = av_malloc(c->buffer_size);
if (!c->buffer)
goto fail;
nb_connections++;
start_wait_request(c, is_rtsp);
return;
fail:
if (c) {
av_free(c->buffer);
av_free(c);
}
close(fd);
}
static void close_connection(HTTPContext *c)
{
HTTPContext **cp, *c1;
int i, nb_streams;
AVFormatContext *ctx;
URLContext *h;
AVStream *st;
/* remove connection from list */
cp = &first_http_ctx;
while ((*cp) != NULL) {
c1 = *cp;
if (c1 == c) {
*cp = c->next;
} else {
cp = &c1->next;
}
}
/* remove connection associated resources */
if (c->fd >= 0)
close(c->fd);
if (c->fmt_in) {
/* close each frame parser */
for(i=0;i<c->fmt_in->nb_streams;i++) {
st = c->fmt_in->streams[i];
if (st->codec.codec) {
avcodec_close(&st->codec);
}
}
av_close_input_file(c->fmt_in);
}
/* free RTP output streams if any */
nb_streams = 0;
if (c->stream)
nb_streams = c->stream->nb_streams;
for(i=0;i<nb_streams;i++) {
ctx = c->rtp_ctx[i];
if (ctx) {
av_write_trailer(ctx);
av_free(ctx);
}
h = c->rtp_handles[i];
if (h) {
url_close(h);
}
}
nb_bandwidth -= c->bandwidth;
av_freep(&c->pb_buffer);
av_free(c->buffer);
av_free(c);
nb_connections--;
}
static int handle_connection(HTTPContext *c)
{
int len, ret;
switch(c->state) {
case HTTPSTATE_WAIT_REQUEST:
case RTSPSTATE_WAIT_REQUEST:
/* timeout ? */
if ((c->timeout - cur_time) < 0)
return -1;
......@@ -503,7 +671,12 @@ static int handle_http(HTTPContext *c)
if ((ptr >= c->buffer + 2 && !memcmp(ptr-2, "\n\n", 2)) ||
(ptr >= c->buffer + 4 && !memcmp(ptr-4, "\r\n\r\n", 4))) {
/* request found : parse it and reply */
if (http_parse_request(c) < 0)
if (c->state == HTTPSTATE_WAIT_REQUEST) {
ret = http_parse_request(c);
} else {
ret = rtsp_parse_request(c);
}
if (ret < 0)
return -1;
} else if (ptr >= c->buffer_end) {
/* request too long: cannot do anything */
......@@ -516,13 +689,14 @@ static int handle_http(HTTPContext *c)
if (c->poll_entry->revents & (POLLERR | POLLHUP))
return -1;
/* no need to read if no events */
/* no need to write if no events */
if (!(c->poll_entry->revents & POLLOUT))
return 0;
len = write(c->fd, c->buffer_ptr, c->buffer_end - c->buffer_ptr);
if (len < 0) {
if (errno != EAGAIN && errno != EINTR) {
/* error : close connection */
av_freep(&c->pb_buffer);
return -1;
}
} else {
......@@ -531,10 +705,12 @@ static int handle_http(HTTPContext *c)
c->stream->bytes_served += len;
c->data_count += len;
if (c->buffer_ptr >= c->buffer_end) {
av_freep(&c->pb_buffer);
/* if error, exit */
if (c->http_error)
if (c->http_error) {
return -1;
/* all the buffer was send : synchronize to the incoming stream */
}
/* all the buffer was sent : synchronize to the incoming stream */
c->state = HTTPSTATE_SEND_DATA_HEADER;
c->buffer_ptr = c->buffer_end = c->buffer;
}
......@@ -544,12 +720,17 @@ static int handle_http(HTTPContext *c)
case HTTPSTATE_SEND_DATA:
case HTTPSTATE_SEND_DATA_HEADER:
case HTTPSTATE_SEND_DATA_TRAILER:
/* no need to read if no events */
if (c->poll_entry->revents & (POLLERR | POLLHUP))
return -1;
if (!(c->poll_entry->revents & POLLOUT))
return 0;
/* for packetized output, we consider we can always write (the
input streams sets the speed). It may be better to verify
that we do not rely too much on the kernel queues */
if (!c->is_packetized) {
if (c->poll_entry->revents & (POLLERR | POLLHUP))
return -1;
/* no need to read if no events */
if (!(c->poll_entry->revents & POLLOUT))
return 0;
}
if (http_send_data(c) < 0)
return -1;
break;
......@@ -569,6 +750,45 @@ static int handle_http(HTTPContext *c)
/* nothing to do, we'll be waken up by incoming feed packets */
break;
case HTTPSTATE_WAIT:
/* if the delay expired, we can send new packets */
if (compute_send_delay(c) <= 0)
c->state = HTTPSTATE_SEND_DATA;
break;
case HTTPSTATE_WAIT_SHORT:
/* just return back to send data */
c->state = HTTPSTATE_SEND_DATA;
break;
case RTSPSTATE_SEND_REPLY:
if (c->poll_entry->revents & (POLLERR | POLLHUP)) {
av_freep(&c->pb_buffer);
return -1;
}
/* no need to write if no events */
if (!(c->poll_entry->revents & POLLOUT))
return 0;
len = write(c->fd, c->buffer_ptr, c->buffer_end - c->buffer_ptr);
if (len < 0) {
if (errno != EAGAIN && errno != EINTR) {
/* error : close connection */
av_freep(&c->pb_buffer);
return -1;
}
} else {
c->buffer_ptr += len;
c->data_count += len;
if (c->buffer_ptr >= c->buffer_end) {
/* all the buffer was sent : wait for a new request */
av_freep(&c->pb_buffer);
start_wait_request(c, 1);
}
}
break;
case HTTPSTATE_READY:
/* nothing to do */
break;
default:
return -1;
}
......@@ -708,6 +928,35 @@ static void do_switch_stream(HTTPContext *c, int i)
c->switch_feed_streams[i] = -1;
}
/* XXX: factorize in utils.c ? */
/* XXX: take care with different space meaning */
static void skip_spaces(const char **pp)
{
const char *p;
p = *pp;
while (*p == ' ' || *p == '\t')
p++;
*pp = p;
}
static void get_word(char *buf, int buf_size, const char **pp)
{
const char *p;
char *q;
p = *pp;
skip_spaces(&p);
q = buf;
while (!isspace(*p) && *p != '\0') {
if ((q - buf) < buf_size - 1)
*q++ = *p;
p++;
}
if (buf_size > 0)
*q = '\0';
*pp = p;
}
/* parse http request and prepare header */
static int http_parse_request(HTTPContext *c)
{
......@@ -716,6 +965,7 @@ static int http_parse_request(HTTPContext *c)
int doing_asx;
int doing_asf_redirector;
int doing_ram;
int doing_rtsp_redirector;
char cmd[32];
char info[1024], *filename;
char url[1024], *q;
......@@ -728,14 +978,7 @@ static int http_parse_request(HTTPContext *c)
char *useragent = 0;
p = c->buffer;
q = cmd;
while (!isspace(*p) && *p != '\0') {
if ((q - cmd) < sizeof(cmd) - 1)
*q++ = *p;
p++;
}
*q = '\0';
get_word(cmd, sizeof(cmd), (const char **)&p);
pstrcpy(c->method, sizeof(c->method), cmd);
if (!strcmp(cmd, "GET"))
......@@ -745,25 +988,10 @@ static int http_parse_request(HTTPContext *c)
else
return -1;
while (isspace(*p)) p++;
q = url;
while (!isspace(*p) && *p != '\0') {
if ((q - url) < sizeof(url) - 1)
*q++ = *p;
p++;
}
*q = '\0';
get_word(url, sizeof(url), (const char **)&p);
pstrcpy(c->url, sizeof(c->url), url);
while (isspace(*p)) p++;
q = protocol;
while (!isspace(*p) && *p != '\0') {
if ((q - protocol) < sizeof(protocol) - 1)
*q++ = *p;
p++;
}
*q = '\0';
get_word(protocol, sizeof(protocol), (const char **)&p);
if (strcmp(protocol, "HTTP/1.0") && strcmp(protocol, "HTTP/1.1"))
return -1;
......@@ -820,6 +1048,33 @@ static int http_parse_request(HTTPContext *c)
doing_ram = 0;
}
if (strlen(filename) > 5 &&
strcmp(".rtsp", filename + strlen(filename) - 5) == 0) {
char file1[1024];
char file2[1024];
char *p;
doing_rtsp_redirector = 1;
/* compute filename by matching without the file extensions */
pstrcpy(file1, sizeof(file1), filename);
p = strrchr(file1, '.');
if (p)
*p = '\0';
for(stream = first_stream; stream != NULL; stream = stream->next) {
pstrcpy(file2, sizeof(file2), stream->filename);
p = strrchr(file2, '.');
if (p)
*p = '\0';
if (!strcmp(file1, file2)) {
pstrcpy(url, sizeof(url), stream->filename);
filename = url;
break;
}
}
} else {
doing_rtsp_redirector = 0;
}
stream = first_stream;
while (stream != NULL) {
if (!strcmp(stream->filename, filename))
......@@ -902,7 +1157,8 @@ static int http_parse_request(HTTPContext *c)
return 0;
}
if (doing_asx || doing_ram || doing_asf_redirector) {
if (doing_asx || doing_ram || doing_asf_redirector ||
doing_rtsp_redirector) {
char *hostinfo = 0;
for (p = c->buffer; *p && *p != '\r' && *p != '\n'; ) {
......@@ -958,8 +1214,23 @@ static int http_parse_request(HTTPContext *c)
q += sprintf(q, "[Reference]\r\n");
q += sprintf(q, "Ref1=http://%s/%s%s\r\n",
hostbuf, filename, info);
} else
} else if (doing_rtsp_redirector) {
char hostname[256], *p;
/* extract only hostname */
pstrcpy(hostname, sizeof(hostname), hostbuf);
p = strrchr(hostname, ':');
if (p)
*p = '\0';
q += sprintf(q, "HTTP/1.0 200 RTSP Redirect follows\r\n");
/* XXX: incorrect mime type ? */
q += sprintf(q, "Content-type: application/x-rtsp\r\n");
q += sprintf(q, "\r\n");
q += sprintf(q, "rtsp://%s:%d/%s\r\n",
hostname, ntohs(my_rtsp_addr.sin_port),
filename);
} else {
av_abort();
}
/* prepare output buffer */
c->buffer_ptr = c->buffer;
......@@ -1116,7 +1387,7 @@ static int http_parse_request(HTTPContext *c)
return 0;
}
static int fmt_bytecount(char *q, INT64 count)
static void fmt_bytecount(ByteIOContext *pb, INT64 count)
{
static const char *suffix = " kMGTP";
const char *s;
......@@ -1124,64 +1395,68 @@ static int fmt_bytecount(char *q, INT64 count)
for (s = suffix; count >= 100000 && s[1]; count /= 1000, s++) {
}
return sprintf(q, "%lld%c", count, *s);
url_fprintf(pb, "%lld%c", count, *s);
}
static void compute_stats(HTTPContext *c)
{
HTTPContext *c1;
FFStream *stream;
char *q, *p;
char *p;
time_t ti;
int i;
char *new_buffer;
int i, len;
ByteIOContext pb1, *pb = &pb1;
new_buffer = av_malloc(65536);
if (new_buffer) {
av_free(c->buffer);
c->buffer_size = 65536;
c->buffer = new_buffer;
if (url_open_dyn_buf(pb) < 0) {
/* XXX: return an error ? */
c->buffer_ptr = c->buffer;
c->buffer_end = c->buffer + c->buffer_size;
c->buffer_end = c->buffer;
return;
}
q = c->buffer;
q += sprintf(q, "HTTP/1.0 200 OK\r\n");
q += sprintf(q, "Content-type: %s\r\n", "text/html");
q += sprintf(q, "Pragma: no-cache\r\n");
q += sprintf(q, "\r\n");
url_fprintf(pb, "HTTP/1.0 200 OK\r\n");
url_fprintf(pb, "Content-type: %s\r\n", "text/html");
url_fprintf(pb, "Pragma: no-cache\r\n");
url_fprintf(pb, "\r\n");
q += sprintf(q, "<HEAD><TITLE>FFServer Status</TITLE>\n");
url_fprintf(pb, "<HEAD><TITLE>FFServer Status</TITLE>\n");
if (c->stream->feed_filename) {
q += sprintf(q, "<link rel=\"shortcut icon\" href=\"%s\">\n", c->stream->feed_filename);
url_fprintf(pb, "<link rel=\"shortcut icon\" href=\"%s\">\n", c->stream->feed_filename);
}
q += sprintf(q, "</HEAD>\n<BODY>");
q += sprintf(q, "<H1>FFServer Status</H1>\n");
url_fprintf(pb, "</HEAD>\n<BODY>");
url_fprintf(pb, "<H1>FFServer Status</H1>\n");
/* format status */
q += sprintf(q, "<H2>Available Streams</H2>\n");
q += sprintf(q, "<TABLE cellspacing=0 cellpadding=4>\n");
q += sprintf(q, "<TR><Th valign=top>Path<th align=left>Served<br>Conns<Th><br>bytes<Th valign=top>Format<Th>Bit rate<br>kbits/s<Th align=left>Video<br>kbits/s<th><br>Codec<Th align=left>Audio<br>kbits/s<th><br>Codec<Th align=left valign=top>Feed\n");
url_fprintf(pb, "<H2>Available Streams</H2>\n");
url_fprintf(pb, "<TABLE cellspacing=0 cellpadding=4>\n");
url_fprintf(pb, "<TR><Th valign=top>Path<th align=left>Served<br>Conns<Th><br>bytes<Th valign=top>Format<Th>Bit rate<br>kbits/s<Th align=left>Video<br>kbits/s<th><br>Codec<Th align=left>Audio<br>kbits/s<th><br>Codec<Th align=left valign=top>Feed\n");
stream = first_stream;
while (stream != NULL) {
char sfilename[1024];
char *eosf;
if (stream->feed != stream) {
pstrcpy(sfilename, sizeof(sfilename) - 1, stream->filename);
pstrcpy(sfilename, sizeof(sfilename) - 10, stream->filename);
eosf = sfilename + strlen(sfilename);
if (eosf - sfilename >= 4) {
if (strcmp(eosf - 4, ".asf") == 0) {
strcpy(eosf - 4, ".asx");
} else if (strcmp(eosf - 3, ".rm") == 0) {
strcpy(eosf - 3, ".ram");
} else if (stream->fmt == &rtp_mux) {
/* generate a sample RTSP director - maybe should
generate a .sdp file ? */
eosf = strrchr(sfilename, '.');
if (!eosf)
eosf = sfilename + strlen(sfilename);
strcpy(eosf, ".rtsp");
}
}
q += sprintf(q, "<TR><TD><A HREF=\"/%s\">%s</A> ",
url_fprintf(pb, "<TR><TD><A HREF=\"/%s\">%s</A> ",
sfilename, stream->filename);
q += sprintf(q, "<td align=right> %d <td align=right> ",
url_fprintf(pb, "<td align=right> %d <td align=right> ",
stream->conns_served);
q += fmt_bytecount(q, stream->bytes_served);
fmt_bytecount(pb, stream->bytes_served);
switch(stream->stream_type) {
case STREAM_TYPE_LIVE:
{
......@@ -1216,58 +1491,63 @@ static void compute_stats(HTTPContext *c)
av_abort();
}
}
q += sprintf(q, "<TD align=center> %s <TD align=right> %d <TD align=right> %d <TD> %s %s <TD align=right> %d <TD> %s %s",
url_fprintf(pb, "<TD align=center> %s <TD align=right> %d <TD align=right> %d <TD> %s %s <TD align=right> %d <TD> %s %s",
stream->fmt->name,
(audio_bit_rate + video_bit_rate) / 1000,
video_bit_rate / 1000, video_codec_name, video_codec_name_extra,
audio_bit_rate / 1000, audio_codec_name, audio_codec_name_extra);
if (stream->feed) {
q += sprintf(q, "<TD>%s", stream->feed->filename);
url_fprintf(pb, "<TD>%s", stream->feed->filename);
} else {
q += sprintf(q, "<TD>%s", stream->feed_filename);
url_fprintf(pb, "<TD>%s", stream->feed_filename);
}
q += sprintf(q, "\n");
url_fprintf(pb, "\n");
}
break;
default:
q += sprintf(q, "<TD align=center> - <TD align=right> - <TD align=right> - <td><td align=right> - <TD>\n");
url_fprintf(pb, "<TD align=center> - <TD align=right> - <TD align=right> - <td><td align=right> - <TD>\n");
break;
}
}
stream = stream->next;
}
q += sprintf(q, "</TABLE>\n");
url_fprintf(pb, "</TABLE>\n");
stream = first_stream;
while (stream != NULL) {
if (stream->feed == stream) {
q += sprintf(q, "<h2>Feed %s</h2>", stream->filename);
url_fprintf(pb, "<h2>Feed %s</h2>", stream->filename);
if (stream->pid) {
FILE *pid_stat;
char ps_cmd[64];
q += sprintf(q, "Running as pid %d.\n", stream->pid);
#ifdef linux
/* This is somewhat linux specific I guess */
snprintf(ps_cmd, sizeof(ps_cmd), "ps -o \"%%cpu,bsdtime\" --no-headers %d", stream->pid);
pid_stat = popen(ps_cmd, "r");
if (pid_stat) {
char cpuperc[10];
char cpuused[64];
url_fprintf(pb, "Running as pid %d.\n", stream->pid);
if (fscanf(pid_stat, "%10s %64s", cpuperc, cpuused) == 2) {
q += sprintf(q, "Currently using %s%% of the cpu. Total time used %s.\n",
cpuperc, cpuused);
#if defined(linux) && !defined(CONFIG_NOCUTILS)
{
FILE *pid_stat;
char ps_cmd[64];
/* This is somewhat linux specific I guess */
snprintf(ps_cmd, sizeof(ps_cmd),
"ps -o \"%%cpu,cputime\" --no-headers %d",
stream->pid);
pid_stat = popen(ps_cmd, "r");
if (pid_stat) {
char cpuperc[10];
char cpuused[64];
if (fscanf(pid_stat, "%10s %64s", cpuperc,
cpuused) == 2) {
url_fprintf(pb, "Currently using %s%% of the cpu. Total time used %s.\n",
cpuperc, cpuused);
}
fclose(pid_stat);
}
fclose(pid_stat);
}
#endif
q += sprintf(q, "<p>");
url_fprintf(pb, "<p>");
}
q += sprintf(q, "<table cellspacing=0 cellpadding=4><tr><th>Stream<th>type<th>kbits/s<th align=left>codec<th align=left>Parameters\n");
url_fprintf(pb, "<table cellspacing=0 cellpadding=4><tr><th>Stream<th>type<th>kbits/s<th align=left>codec<th align=left>Parameters\n");
for (i = 0; i < stream->nb_streams; i++) {
AVStream *st = stream->streams[i];
......@@ -1289,10 +1569,10 @@ static void compute_stats(HTTPContext *c)
default:
av_abort();
}
q += sprintf(q, "<tr><td align=right>%d<td>%s<td align=right>%d<td>%s<td>%s\n",
url_fprintf(pb, "<tr><td align=right>%d<td>%s<td align=right>%d<td>%s<td>%s\n",
i, type, st->codec.bit_rate/1000, codec ? codec->name : "", parameters);
}
q += sprintf(q, "</table>\n");
url_fprintf(pb, "</table>\n");
}
stream = stream->next;
......@@ -1307,9 +1587,9 @@ static void compute_stats(HTTPContext *c)
/* feed status */
stream = first_feed;
while (stream != NULL) {
q += sprintf(q, "<H1>Feed '%s'</H1>\n", stream->filename);
q += sprintf(q, "<TABLE>\n");
q += sprintf(q, "<TR><TD>Parameters<TD>Frame count<TD>Size<TD>Avg bitrate (kbits/s)\n");
url_fprintf(pb, "<H1>Feed '%s'</H1>\n", stream->filename);
url_fprintf(pb, "<TABLE>\n");
url_fprintf(pb, "<TR><TD>Parameters<TD>Frame count<TD>Size<TD>Avg bitrate (kbits/s)\n");
for(i=0;i<stream->nb_streams;i++) {
AVStream *st = stream->streams[i];
FeedData *fdata = st->priv_data;
......@@ -1319,98 +1599,90 @@ static void compute_stats(HTTPContext *c)
avg = fdata->avg_frame_size * (float)enc->rate * 8.0;
if (enc->codec->type == CODEC_TYPE_AUDIO && enc->frame_size > 0)
avg /= enc->frame_size;
q += sprintf(q, "<TR><TD>%s <TD> %d <TD> %Ld <TD> %0.1f\n",
url_fprintf(pb, "<TR><TD>%s <TD> %d <TD> %Ld <TD> %0.1f\n",
buf, enc->frame_number, fdata->data_count, avg / 1000.0);
}
q += sprintf(q, "</TABLE>\n");
url_fprintf(pb, "</TABLE>\n");
stream = stream->next_feed;
}
}
#endif
/* connection status */
q += sprintf(q, "<H2>Connection Status</H2>\n");
url_fprintf(pb, "<H2>Connection Status</H2>\n");
q += sprintf(q, "Number of connections: %d / %d<BR>\n",
url_fprintf(pb, "Number of connections: %d / %d<BR>\n",
nb_connections, nb_max_connections);
q += sprintf(q, "Bandwidth in use: %dk / %dk<BR>\n",
url_fprintf(pb, "Bandwidth in use: %dk / %dk<BR>\n",
nb_bandwidth, nb_max_bandwidth);
q += sprintf(q, "<TABLE>\n");
q += sprintf(q, "<TR><Th>#<Th>File<Th>IP<Th>State<Th>Target bits/sec<Th>Actual bits/sec<Th>Bytes transferred\n");
url_fprintf(pb, "<TABLE>\n");
url_fprintf(pb, "<TR><th>#<th>File<th>IP<th>Proto<th>State<th>Target bits/sec<th>Actual bits/sec<th>Bytes transferred\n");
c1 = first_http_ctx;
i = 0;
while (c1 != NULL && q < (char *) c->buffer + c->buffer_size - 2048) {
while (c1 != NULL) {
int bitrate;
int j;
bitrate = 0;
for (j = 0; j < c1->stream->nb_streams; j++) {
if (c1->feed_streams[j] >= 0) {
bitrate += c1->stream->feed->streams[c1->feed_streams[j]]->codec.bit_rate;
if (c1->stream) {
for (j = 0; j < c1->stream->nb_streams; j++) {
if (!c1->stream->feed) {
bitrate += c1->stream->streams[j]->codec.bit_rate;
} else {
if (c1->feed_streams[j] >= 0) {
bitrate += c1->stream->feed->streams[c1->feed_streams[j]]->codec.bit_rate;
}
}
}
}
i++;
p = inet_ntoa(c1->from_addr.sin_addr);
q += sprintf(q, "<TR><TD><B>%d</B><TD>%s%s <TD> %s <TD> %s <td align=right>",
i, c1->stream->filename,
c1->state == HTTPSTATE_RECEIVE_DATA ? "(input)" : "",
p,
http_state[c1->state]);
q += fmt_bytecount(q, bitrate);
q += sprintf(q, "<td align=right>");
q += fmt_bytecount(q, compute_datarate(&c1->datarate, c1->data_count) * 8);
q += sprintf(q, "<td align=right>");
q += fmt_bytecount(q, c1->data_count);
*q++ = '\n';
url_fprintf(pb, "<TR><TD><B>%d</B><TD>%s%s<TD>%s<TD>%s<TD>%s<td align=right>",
i,
c1->stream ? c1->stream->filename : "",
c1->state == HTTPSTATE_RECEIVE_DATA ? "(input)" : "",
p,
c1->protocol,
http_state[c1->state]);
fmt_bytecount(pb, bitrate);
url_fprintf(pb, "<td align=right>");
fmt_bytecount(pb, compute_datarate(&c1->datarate, c1->data_count) * 8);
url_fprintf(pb, "<td align=right>");
fmt_bytecount(pb, c1->data_count);
url_fprintf(pb, "\n");
c1 = c1->next;
}
q += sprintf(q, "</TABLE>\n");
url_fprintf(pb, "</TABLE>\n");
/* date */
ti = time(NULL);
p = ctime(&ti);
q += sprintf(q, "<HR size=1 noshade>Generated at %s", p);
q += sprintf(q, "</BODY>\n</HTML>\n");
url_fprintf(pb, "<HR size=1 noshade>Generated at %s", p);
url_fprintf(pb, "</BODY>\n</HTML>\n");
c->buffer_ptr = c->buffer;
c->buffer_end = q;
len = url_close_dyn_buf(pb, &c->pb_buffer);
c->buffer_ptr = c->pb_buffer;
c->buffer_end = c->pb_buffer + len;
}
static void http_write_packet(void *opaque,
unsigned char *buf, int size)
/* check if the parser needs to be opened for stream i */
static void open_parser(AVFormatContext *s, int i)
{
HTTPContext *c = opaque;
if (c->buffer_ptr == c->buffer_end || !c->buffer_ptr)
c->buffer_ptr = c->buffer_end = c->buffer;
if (c->buffer_end - c->buffer + size > c->buffer_size) {
int new_buffer_size = c->buffer_size * 2;
UINT8 *new_buffer;
if (new_buffer_size <= c->buffer_end - c->buffer + size) {
new_buffer_size = c->buffer_end - c->buffer + size + c->buffer_size;
}
AVStream *st = s->streams[i];
AVCodec *codec;
new_buffer = av_malloc(new_buffer_size);
if (new_buffer) {
memcpy(new_buffer, c->buffer, c->buffer_end - c->buffer);
c->buffer_end += (new_buffer - c->buffer);
c->buffer_ptr += (new_buffer - c->buffer);
av_free(c->buffer);
c->buffer = new_buffer;
c->buffer_size = new_buffer_size;
} else {
av_abort();
if (!st->codec.codec) {
codec = avcodec_find_decoder(st->codec.codec_id);
if (codec && (codec->capabilities & CODEC_CAP_PARSE_ONLY)) {
st->codec.parse_only = 1;
if (avcodec_open(&st->codec, codec) < 0) {
st->codec.parse_only = 0;
}
}
}
memcpy(c->buffer_end, buf, size);
c->buffer_end += size;
}
static int open_input_stream(HTTPContext *c, const char *info)
......@@ -1418,7 +1690,7 @@ static int open_input_stream(HTTPContext *c, const char *info)
char buf[128];
char input_filename[1024];
AVFormatContext *s;
int buf_size;
int buf_size, i;
INT64 stream_pos;
/* find file name */
......@@ -1448,131 +1720,253 @@ static int open_input_stream(HTTPContext *c, const char *info)
return -1;
/* open stream */
if (av_open_input_file(&s, input_filename, NULL, buf_size, NULL) < 0)
if (av_open_input_file(&s, input_filename, NULL, buf_size, NULL) < 0) {
http_log("%s not found", input_filename);
return -1;
}
c->fmt_in = s;
/* open each parser */
for(i=0;i<s->nb_streams;i++)
open_parser(s, i);
/* choose stream as clock source (we favorize video stream if
present) for packet sending */
c->pts_stream_index = 0;
for(i=0;i<c->stream->nb_streams;i++) {
if (c->pts_stream_index == 0 &&
c->stream->streams[i]->codec.codec_type == CODEC_TYPE_VIDEO) {
c->pts_stream_index = i;
}
}
if (c->fmt_in->iformat->read_seek) {
c->fmt_in->iformat->read_seek(c->fmt_in, stream_pos);
}
// printf("stream %s opened pos=%0.6f\n", input_filename, stream_pos / 1000000.0);
/* set the start time (needed for maxtime and RTP packet timing) */
c->start_time = cur_time;
c->first_pts = AV_NOPTS_VALUE;
printf("stream %s opened pos=%0.6f\n", input_filename, stream_pos / 1000000.0);
return 0;
}
static int http_prepare_data(HTTPContext *c)
{
int i;
/* currently desactivated because the new PTS handling is not
satisfactory yet */
//#define AV_READ_FRAME
#ifdef AV_READ_FRAME
switch(c->state) {
case HTTPSTATE_SEND_DATA_HEADER:
memset(&c->fmt_ctx, 0, sizeof(c->fmt_ctx));
pstrcpy(c->fmt_ctx.author, sizeof(c->fmt_ctx.author), c->stream->author);
pstrcpy(c->fmt_ctx.comment, sizeof(c->fmt_ctx.comment), c->stream->comment);
pstrcpy(c->fmt_ctx.copyright, sizeof(c->fmt_ctx.copyright), c->stream->copyright);
pstrcpy(c->fmt_ctx.title, sizeof(c->fmt_ctx.title), c->stream->title);
if (c->stream->feed) {
/* open output stream by using specified codecs */
c->fmt_ctx.oformat = c->stream->fmt;
c->fmt_ctx.nb_streams = c->stream->nb_streams;
for(i=0;i<c->fmt_ctx.nb_streams;i++) {
AVStream *st;
st = av_mallocz(sizeof(AVStream));
c->fmt_ctx.streams[i] = st;
if (c->stream->feed == c->stream)
memcpy(st, c->stream->streams[i], sizeof(AVStream));
else
memcpy(st, c->stream->feed->streams[c->stream->feed_streams[i]], sizeof(AVStream));
/* XXX: generalize that in ffmpeg for picture/audio/data. Currently
the return packet MUST NOT be freed */
int av_read_frame(AVFormatContext *s, AVPacket *pkt)
{
AVStream *st;
int len, ret, old_nb_streams, i;
st->codec.frame_number = 0; /* XXX: should be done in
AVStream, not in codec */
/* see if remaining frames must be parsed */
for(;;) {
if (s->cur_len > 0) {
st = s->streams[s->cur_pkt.stream_index];
len = avcodec_parse_frame(&st->codec, &pkt->data, &pkt->size,
s->cur_ptr, s->cur_len);
if (len < 0) {
/* error: get next packet */
s->cur_len = 0;
} else {
s->cur_ptr += len;
s->cur_len -= len;
if (pkt->size) {
/* init pts counter if not done */
if (st->pts.den == 0) {
switch(st->codec.codec_type) {
case CODEC_TYPE_AUDIO:
st->pts_incr = (INT64)s->pts_den;
av_frac_init(&st->pts, st->pts.val, 0,
(INT64)s->pts_num * st->codec.sample_rate);
break;
case CODEC_TYPE_VIDEO:
st->pts_incr = (INT64)s->pts_den * FRAME_RATE_BASE;
av_frac_init(&st->pts, st->pts.val, 0,
(INT64)s->pts_num * st->codec.frame_rate);
break;
default:
av_abort();
}
}
/* a frame was read: return it */
pkt->pts = st->pts.val;
#if 0
printf("add pts=%Lx num=%Lx den=%Lx incr=%Lx\n",
st->pts.val, st->pts.num, st->pts.den, st->pts_incr);
#endif
switch(st->codec.codec_type) {
case CODEC_TYPE_AUDIO:
av_frac_add(&st->pts, st->pts_incr * st->codec.frame_size);
break;
case CODEC_TYPE_VIDEO:
av_frac_add(&st->pts, st->pts_incr);
break;
default:
av_abort();
}
pkt->stream_index = s->cur_pkt.stream_index;
/* we use the codec indication because it is
more accurate than the demux flags */
pkt->flags = 0;
if (st->codec.key_frame)
pkt->flags |= PKT_FLAG_KEY;
return 0;
}
}
c->got_key_frame = 0;
} else {
/* open output stream by using codecs in specified file */
c->fmt_ctx.oformat = c->stream->fmt;
c->fmt_ctx.nb_streams = c->fmt_in->nb_streams;
for(i=0;i<c->fmt_ctx.nb_streams;i++) {
AVStream *st;
st = av_mallocz(sizeof(AVStream));
c->fmt_ctx.streams[i] = st;
memcpy(st, c->fmt_in->streams[i], sizeof(AVStream));
st->codec.frame_number = 0; /* XXX: should be done in
AVStream, not in codec */
/* free previous packet */
av_free_packet(&s->cur_pkt);
old_nb_streams = s->nb_streams;
ret = av_read_packet(s, &s->cur_pkt);
if (ret)
return ret;
/* open parsers for each new streams */
for(i = old_nb_streams; i < s->nb_streams; i++)
open_parser(s, i);
st = s->streams[s->cur_pkt.stream_index];
/* update current pts (XXX: dts handling) from packet, or
use current pts if none given */
if (s->cur_pkt.pts != AV_NOPTS_VALUE) {
av_frac_set(&st->pts, s->cur_pkt.pts);
} else {
s->cur_pkt.pts = st->pts.val;
}
if (!st->codec.codec) {
/* no codec opened: just return the raw packet */
*pkt = s->cur_pkt;
/* no codec opened: just update the pts by considering we
have one frame and free the packet */
if (st->pts.den == 0) {
switch(st->codec.codec_type) {
case CODEC_TYPE_AUDIO:
st->pts_incr = (INT64)s->pts_den * st->codec.frame_size;
av_frac_init(&st->pts, st->pts.val, 0,
(INT64)s->pts_num * st->codec.sample_rate);
break;
case CODEC_TYPE_VIDEO:
st->pts_incr = (INT64)s->pts_den * FRAME_RATE_BASE;
av_frac_init(&st->pts, st->pts.val, 0,
(INT64)s->pts_num * st->codec.frame_rate);
break;
default:
av_abort();
}
}
av_frac_add(&st->pts, st->pts_incr);
return 0;
} else {
s->cur_ptr = s->cur_pkt.data;
s->cur_len = s->cur_pkt.size;
}
c->got_key_frame = 0;
}
init_put_byte(&c->fmt_ctx.pb, c->pbuffer, c->pbuffer_size,
1, c, NULL, http_write_packet, NULL);
c->fmt_ctx.pb.is_streamed = 1;
/* prepare header */
av_write_header(&c->fmt_ctx);
c->state = HTTPSTATE_SEND_DATA;
}
}
static int compute_send_delay(HTTPContext *c)
{
INT64 cur_pts, delta_pts, next_pts;
int delay1;
/* compute current pts value from system time */
cur_pts = ((INT64)(cur_time - c->start_time) * c->fmt_in->pts_den) /
(c->fmt_in->pts_num * 1000LL);
/* compute the delta from the stream we choose as
main clock (we do that to avoid using explicit
buffers to do exact packet reordering for each
stream */
/* XXX: really need to fix the number of streams */
if (c->pts_stream_index >= c->fmt_in->nb_streams)
next_pts = cur_pts;
else
next_pts = c->fmt_in->streams[c->pts_stream_index]->pts.val;
delta_pts = next_pts - cur_pts;
if (delta_pts <= 0) {
delay1 = 0;
} else {
delay1 = (delta_pts * 1000 * c->fmt_in->pts_num) / c->fmt_in->pts_den;
}
return delay1;
}
#else
/* just fall backs */
int av_read_frame(AVFormatContext *s, AVPacket *pkt)
{
return av_read_packet(s, pkt);
}
static int compute_send_delay(HTTPContext *c)
{
return 0;
}
#endif
static int http_prepare_data(HTTPContext *c)
{
int i, len, ret;
AVFormatContext *ctx;
switch(c->state) {
case HTTPSTATE_SEND_DATA_HEADER:
memset(&c->fmt_ctx, 0, sizeof(c->fmt_ctx));
pstrcpy(c->fmt_ctx.author, sizeof(c->fmt_ctx.author),
c->stream->author);
pstrcpy(c->fmt_ctx.comment, sizeof(c->fmt_ctx.comment),
c->stream->comment);
pstrcpy(c->fmt_ctx.copyright, sizeof(c->fmt_ctx.copyright),
c->stream->copyright);
pstrcpy(c->fmt_ctx.title, sizeof(c->fmt_ctx.title),
c->stream->title);
/* open output stream by using specified codecs */
c->fmt_ctx.oformat = c->stream->fmt;
c->fmt_ctx.nb_streams = c->stream->nb_streams;
for(i=0;i<c->fmt_ctx.nb_streams;i++) {
AVStream *st;
st = av_mallocz(sizeof(AVStream));
c->fmt_ctx.streams[i] = st;
/* if file or feed, then just take streams from FFStream struct */
if (!c->stream->feed ||
c->stream->feed == c->stream)
memcpy(st, c->stream->streams[i], sizeof(AVStream));
else
memcpy(st, c->stream->feed->streams[c->stream->feed_streams[i]],
sizeof(AVStream));
st->codec.frame_number = 0; /* XXX: should be done in
AVStream, not in codec */
}
c->got_key_frame = 0;
/* prepare header and save header data in a stream */
if (url_open_dyn_buf(&c->fmt_ctx.pb) < 0) {
/* XXX: potential leak */
return -1;
}
c->fmt_ctx.pb.is_streamed = 1;
av_write_header(&c->fmt_ctx);
len = url_close_dyn_buf(&c->fmt_ctx.pb, &c->pb_buffer);
c->buffer_ptr = c->pb_buffer;
c->buffer_end = c->pb_buffer + len;
c->state = HTTPSTATE_SEND_DATA;
c->last_packet_sent = 0;
break;
case HTTPSTATE_SEND_DATA:
/* find a new packet */
#if 0
fifo_total_size = http_fifo_write_count - c->last_http_fifo_write_count;
if (fifo_total_size >= ((3 * FIFO_MAX_SIZE) / 4)) {
/* overflow : resync. We suppose that wptr is at this
point a pointer to a valid packet */
c->rptr = http_fifo.wptr;
c->got_key_frame = 0;
}
start_rptr = c->rptr;
if (fifo_read(&http_fifo, (UINT8 *)&hdr, sizeof(hdr), &c->rptr) < 0)
return 0;
payload_size = ntohs(hdr.payload_size);
payload = av_malloc(payload_size);
if (fifo_read(&http_fifo, payload, payload_size, &c->rptr) < 0) {
/* cannot read all the payload */
av_free(payload);
c->rptr = start_rptr;
return 0;
}
c->last_http_fifo_write_count = http_fifo_write_count -
fifo_size(&http_fifo, c->rptr);
if (c->stream->stream_type != STREAM_TYPE_MASTER) {
/* test if the packet can be handled by this format */
ret = 0;
for(i=0;i<c->fmt_ctx.nb_streams;i++) {
AVStream *st = c->fmt_ctx.streams[i];
if (test_header(&hdr, &st->codec)) {
/* only begin sending when got a key frame */
if (st->codec.key_frame)
c->got_key_frame |= 1 << i;
if (c->got_key_frame & (1 << i)) {
ret = c->fmt_ctx.format->write_packet(&c->fmt_ctx, i,
payload, payload_size);
}
break;
}
}
if (ret) {
/* must send trailer now */
c->state = HTTPSTATE_SEND_DATA_TRAILER;
}
} else {
/* master case : send everything */
char *q;
q = c->buffer;
memcpy(q, &hdr, sizeof(hdr));
q += sizeof(hdr);
memcpy(q, payload, payload_size);
q += payload_size;
c->buffer_ptr = c->buffer;
c->buffer_end = q;
}
av_free(payload);
#endif
{
AVPacket pkt;
/* read a packet from the input stream */
if (c->stream->feed) {
ffm_set_write_index(c->fmt_in,
......@@ -1584,95 +1978,150 @@ static int http_prepare_data(HTTPContext *c)
c->stream->max_time + c->start_time - cur_time < 0) {
/* We have timed out */
c->state = HTTPSTATE_SEND_DATA_TRAILER;
} else if (av_read_packet(c->fmt_in, &pkt) < 0) {
if (c->stream->feed && c->stream->feed->feed_opened) {
/* if coming from feed, it means we reached the end of the
ffm file, so must wait for more data */
c->state = HTTPSTATE_WAIT_FEED;
return 1; /* state changed */
} else {
/* must send trailer now because eof or error */
c->state = HTTPSTATE_SEND_DATA_TRAILER;
}
} else {
/* send it to the appropriate stream */
if (c->stream->feed) {
/* if coming from a feed, select the right stream */
if (c->switch_pending) {
c->switch_pending = 0;
if (c->is_packetized) {
if (compute_send_delay(c) > 0) {
c->state = HTTPSTATE_WAIT;
return 1; /* state changed */
}
}
if (av_read_frame(c->fmt_in, &pkt) < 0) {
if (c->stream->feed && c->stream->feed->feed_opened) {
/* if coming from feed, it means we reached the end of the
ffm file, so must wait for more data */
c->state = HTTPSTATE_WAIT_FEED;
return 1; /* state changed */
} else {
/* must send trailer now because eof or error */
c->state = HTTPSTATE_SEND_DATA_TRAILER;
}
} else {
/* update first pts if needed */
if (c->first_pts == AV_NOPTS_VALUE)
c->first_pts = pkt.pts;
/* send it to the appropriate stream */
if (c->stream->feed) {
/* if coming from a feed, select the right stream */
if (c->switch_pending) {
c->switch_pending = 0;
for(i=0;i<c->stream->nb_streams;i++) {
if (c->switch_feed_streams[i] == pkt.stream_index) {
if (pkt.flags & PKT_FLAG_KEY) {
do_switch_stream(c, i);
}
}
if (c->switch_feed_streams[i] >= 0) {
c->switch_pending = 1;
}
}
}
for(i=0;i<c->stream->nb_streams;i++) {
if (c->switch_feed_streams[i] == pkt.stream_index) {
if (c->feed_streams[i] == pkt.stream_index) {
pkt.stream_index = i;
if (pkt.flags & PKT_FLAG_KEY) {
do_switch_stream(c, i);
c->got_key_frame |= 1 << i;
}
/* See if we have all the key frames, then
* we start to send. This logic is not quite
* right, but it works for the case of a
* single video stream with one or more
* audio streams (for which every frame is
* typically a key frame).
*/
if (!c->stream->send_on_key ||
((c->got_key_frame + 1) >> c->stream->nb_streams)) {
goto send_it;
}
}
if (c->switch_feed_streams[i] >= 0) {
c->switch_pending = 1;
}
}
}
for(i=0;i<c->stream->nb_streams;i++) {
if (c->feed_streams[i] == pkt.stream_index) {
pkt.stream_index = i;
if (pkt.flags & PKT_FLAG_KEY) {
c->got_key_frame |= 1 << i;
}
/* See if we have all the key frames, then
* we start to send. This logic is not quite
* right, but it works for the case of a
* single video stream with one or more
* audio streams (for which every frame is
* typically a key frame).
*/
if (!c->stream->send_on_key || ((c->got_key_frame + 1) >> c->stream->nb_streams)) {
goto send_it;
}
} else {
AVCodecContext *codec;
send_it:
/* specific handling for RTP: we use several
output stream (one for each RTP
connection). XXX: need more abstract handling */
if (c->is_packetized) {
c->packet_stream_index = pkt.stream_index;
ctx = c->rtp_ctx[c->packet_stream_index];
codec = &ctx->streams[0]->codec;
} else {
ctx = &c->fmt_ctx;
/* Fudge here */
codec = &ctx->streams[pkt.stream_index]->codec;
}
}
} else {
AVCodecContext *codec;
send_it:
/* Fudge here */
codec = &c->fmt_ctx.streams[pkt.stream_index]->codec;
codec->key_frame = ((pkt.flags & PKT_FLAG_KEY) != 0);
codec->key_frame = ((pkt.flags & PKT_FLAG_KEY) != 0);
#ifdef PJSG
if (codec->codec_type == CODEC_TYPE_AUDIO) {
codec->frame_size = (codec->sample_rate * pkt.duration + 500000) / 1000000;
/* printf("Calculated size %d, from sr %d, duration %d\n", codec->frame_size, codec->sample_rate, pkt.duration); */
if (codec->codec_type == CODEC_TYPE_AUDIO) {
codec->frame_size = (codec->sample_rate * pkt.duration + 500000) / 1000000;
/* printf("Calculated size %d, from sr %d, duration %d\n", codec->frame_size, codec->sample_rate, pkt.duration); */
}
#endif
if (c->is_packetized) {
ret = url_open_dyn_packet_buf(&ctx->pb,
url_get_max_packet_size(c->rtp_handles[c->packet_stream_index]));
c->packet_byte_count = 0;
c->packet_start_time_us = av_gettime();
} else {
ret = url_open_dyn_buf(&ctx->pb);
}
if (ret < 0) {
/* XXX: potential leak */
return -1;
}
if (av_write_packet(ctx, &pkt, pkt.pts)) {
c->state = HTTPSTATE_SEND_DATA_TRAILER;
}
len = url_close_dyn_buf(&ctx->pb, &c->pb_buffer);
c->buffer_ptr = c->pb_buffer;
c->buffer_end = c->pb_buffer + len;
codec->frame_number++;
}
#ifndef AV_READ_FRAME
av_free_packet(&pkt);
#endif
if (av_write_packet(&c->fmt_ctx, &pkt, 0))
c->state = HTTPSTATE_SEND_DATA_TRAILER;
codec->frame_number++;
}
av_free_packet(&pkt);
}
}
break;
default:
case HTTPSTATE_SEND_DATA_TRAILER:
/* last packet test ? */
if (c->last_packet_sent)
if (c->last_packet_sent || c->is_packetized)
return -1;
ctx = &c->fmt_ctx;
/* prepare header */
av_write_trailer(&c->fmt_ctx);
if (url_open_dyn_buf(&ctx->pb) < 0) {
/* XXX: potential leak */
return -1;
}
av_write_trailer(ctx);
len = url_close_dyn_buf(&ctx->pb, &c->pb_buffer);
c->buffer_ptr = c->pb_buffer;
c->buffer_end = c->pb_buffer + len;
c->last_packet_sent = 1;
break;
}
return 0;
}
/* in bit/s */
#define SHORT_TERM_BANDWIDTH 8000000
/* should convert the format at the same time */
static int http_send_data(HTTPContext *c)
{
int len, ret;
int len, ret, dt;
while (c->buffer_ptr >= c->buffer_end) {
av_freep(&c->pb_buffer);
ret = http_prepare_data(c);
if (ret < 0)
return -1;
......@@ -1684,20 +2133,58 @@ static int http_send_data(HTTPContext *c)
}
}
if (c->buffer_end > c->buffer_ptr) {
len = write(c->fd, c->buffer_ptr, c->buffer_end - c->buffer_ptr);
if (len < 0) {
if (errno != EAGAIN && errno != EINTR) {
/* error : close connection */
return -1;
if (c->buffer_ptr < c->buffer_end) {
if (c->is_packetized) {
/* RTP/UDP data output */
len = c->buffer_end - c->buffer_ptr;
if (len < 4) {
/* fail safe - should never happen */
fail1:
c->buffer_ptr = c->buffer_end;
return 0;
}
len = (c->buffer_ptr[0] << 24) |
(c->buffer_ptr[1] << 16) |
(c->buffer_ptr[2] << 8) |
(c->buffer_ptr[3]);
if (len > (c->buffer_end - c->buffer_ptr))
goto fail1;
/* short term bandwidth limitation */
dt = av_gettime() - c->packet_start_time_us;
if (dt < 1)
dt = 1;
if ((c->packet_byte_count + len) * (INT64)1000000 >=
(SHORT_TERM_BANDWIDTH / 8) * (INT64)dt) {
/* bandwidth overflow : wait at most one tick and retry */
c->state = HTTPSTATE_WAIT_SHORT;
return 0;
}
} else {
c->buffer_ptr += 4;
url_write(c->rtp_handles[c->packet_stream_index],
c->buffer_ptr, len);
c->buffer_ptr += len;
c->data_count += len;
update_datarate(&c->datarate, c->data_count);
if (c->stream)
c->stream->bytes_served += len;
c->packet_byte_count += len;
} else {
/* TCP data output */
len = write(c->fd, c->buffer_ptr, c->buffer_end - c->buffer_ptr);
if (len < 0) {
if (errno != EAGAIN && errno != EINTR) {
/* error : close connection */
return -1;
} else {
return 0;
}
} else {
c->buffer_ptr += len;
}
}
c->data_count += len;
update_datarate(&c->datarate, c->data_count);
if (c->stream)
c->stream->bytes_served += len;
}
return 0;
}
......@@ -1827,6 +2314,635 @@ static int http_receive_data(HTTPContext *c)
return -1;
}
/********************************************************************/
/* RTSP handling */
static void rtsp_reply_header(HTTPContext *c, enum RTSPStatusCode error_number)
{
const char *str;
time_t ti;
char *p;
char buf2[32];
switch(error_number) {
#define DEF(n, c, s) case c: str = s; break;
#include "rtspcodes.h"
#undef DEF
default:
str = "Unknown Error";
break;
}
url_fprintf(c->pb, "RTSP/1.0 %d %s\r\n", error_number, str);
url_fprintf(c->pb, "CSeq: %d\r\n", c->seq);
/* output GMT time */
ti = time(NULL);
p = ctime(&ti);
strcpy(buf2, p);
p = buf2 + strlen(p) - 1;
if (*p == '\n')
*p = '\0';
url_fprintf(c->pb, "Date: %s GMT\r\n", buf2);
}
static void rtsp_reply_error(HTTPContext *c, enum RTSPStatusCode error_number)
{
rtsp_reply_header(c, error_number);
url_fprintf(c->pb, "\r\n");
}
static int rtsp_parse_request(HTTPContext *c)
{
const char *p, *p1, *p2;
char cmd[32];
char url[1024];
char protocol[32];
char line[1024];
ByteIOContext pb1;
int len;
RTSPHeader header1, *header = &header1;
c->buffer_ptr[0] = '\0';
p = c->buffer;
get_word(cmd, sizeof(cmd), &p);
get_word(url, sizeof(url), &p);
get_word(protocol, sizeof(protocol), &p);
pstrcpy(c->method, sizeof(c->method), cmd);
pstrcpy(c->url, sizeof(c->url), url);
pstrcpy(c->protocol, sizeof(c->protocol), protocol);
c->pb = &pb1;
if (url_open_dyn_buf(c->pb) < 0) {
/* XXX: cannot do more */
c->pb = NULL; /* safety */
return -1;
}
/* check version name */
if (strcmp(protocol, "RTSP/1.0") != 0) {
rtsp_reply_error(c, RTSP_STATUS_VERSION);
goto the_end;
}
/* parse each header line */
memset(header, 0, sizeof(RTSPHeader));
/* skip to next line */
while (*p != '\n' && *p != '\0')
p++;
if (*p == '\n')
p++;
while (*p != '\0') {
p1 = strchr(p, '\n');
if (!p1)
break;
p2 = p1;
if (p2 > p && p2[-1] == '\r')
p2--;
/* skip empty line */
if (p2 == p)
break;
len = p2 - p;
if (len > sizeof(line) - 1)
len = sizeof(line) - 1;
memcpy(line, p, len);
line[len] = '\0';
rtsp_parse_line(header, line);
p = p1 + 1;
}
/* handle sequence number */
c->seq = header->seq;
if (!strcmp(cmd, "DESCRIBE")) {
rtsp_cmd_describe(c, url);
} else if (!strcmp(cmd, "SETUP")) {
rtsp_cmd_setup(c, url, header);
} else if (!strcmp(cmd, "PLAY")) {
rtsp_cmd_play(c, url, header);
} else if (!strcmp(cmd, "PAUSE")) {
rtsp_cmd_pause(c, url, header);
} else if (!strcmp(cmd, "TEARDOWN")) {
rtsp_cmd_teardown(c, url, header);
} else {
rtsp_reply_error(c, RTSP_STATUS_METHOD);
}
the_end:
len = url_close_dyn_buf(c->pb, &c->pb_buffer);
c->pb = NULL; /* safety */
if (len < 0) {
/* XXX: cannot do more */
return -1;
}
c->buffer_ptr = c->pb_buffer;
c->buffer_end = c->pb_buffer + len;
c->state = RTSPSTATE_SEND_REPLY;
return 0;
}
static int prepare_sdp_description(HTTPContext *c,
FFStream *stream, UINT8 **pbuffer)
{
ByteIOContext pb1, *pb = &pb1;
struct sockaddr_in my_addr;
int len, i, payload_type;
const char *ipstr, *title, *mediatype;
AVStream *st;
len = sizeof(my_addr);
getsockname(c->fd, (struct sockaddr *)&my_addr, &len);
ipstr = inet_ntoa(my_addr.sin_addr);
if (url_open_dyn_buf(pb) < 0)
return -1;
/* general media info */
url_fprintf(pb, "v=0\n");
url_fprintf(pb, "o=- 0 0 IN IP4 %s\n", ipstr);
title = stream->title;
if (title[0] == '\0')
title = "No Title";
url_fprintf(pb, "s=%s\n", title);
if (stream->comment[0] != '\0')
url_fprintf(pb, "i=%s\n", stream->comment);
/* for each stream, we output the necessary info */
for(i = 0; i < stream->nb_streams; i++) {
st = stream->streams[i];
switch(st->codec.codec_type) {
case CODEC_TYPE_AUDIO:
mediatype = "audio";
break;
case CODEC_TYPE_VIDEO:
mediatype = "video";
break;
default:
mediatype = "application";
break;
}
/* XXX: the port indication is not correct (but should be correct
for broadcast) */
payload_type = rtp_get_payload_type(&st->codec);
url_fprintf(pb, "m=%s %d RTP/AVP %d\n",
mediatype, 0, payload_type);
url_fprintf(pb, "a=control:streamid=%d\n", i);
}
return url_close_dyn_buf(pb, pbuffer);
}
static void rtsp_cmd_describe(HTTPContext *c, const char *url)
{
FFStream *stream;
char path1[1024];
const char *path;
UINT8 *content;
int content_length;
/* find which url is asked */
url_split(NULL, 0, NULL, 0, NULL, path1, sizeof(path1), url);
path = path1;
if (*path == '/')
path++;
for(stream = first_stream; stream != NULL; stream = stream->next) {
if (!stream->is_feed && stream->fmt == &rtp_mux &&
!strcmp(path, stream->filename)) {
goto found;
}
}
/* no stream found */
rtsp_reply_error(c, RTSP_STATUS_SERVICE); /* XXX: right error ? */
return;
found:
/* prepare the media description in sdp format */
content_length = prepare_sdp_description(c, stream, &content);
if (content_length < 0) {
rtsp_reply_error(c, RTSP_STATUS_INTERNAL);
return;
}
rtsp_reply_header(c, RTSP_STATUS_OK);
url_fprintf(c->pb, "Content-Type: application/sdp\r\n");
url_fprintf(c->pb, "Content-Length: %d\r\n", content_length);
url_fprintf(c->pb, "\r\n");
put_buffer(c->pb, content, content_length);
}
static HTTPContext *find_rtp_session(const char *session_id)
{
HTTPContext *c;
if (session_id[0] == '\0')
return NULL;
for(c = first_http_ctx; c != NULL; c = c->next) {
if (!strcmp(c->session_id, session_id))
return c;
}
return NULL;
}
RTSPTransportField *find_transport(RTSPHeader *h, enum RTSPProtocol protocol)
{
RTSPTransportField *th;
int i;
for(i=0;i<h->nb_transports;i++) {
th = &h->transports[i];
if (th->protocol == protocol)
return th;
}
return NULL;
}
static void rtsp_cmd_setup(HTTPContext *c, const char *url,
RTSPHeader *h)
{
FFStream *stream;
int stream_index, port;
char buf[1024];
char path1[1024];
const char *path;
HTTPContext *rtp_c;
RTSPTransportField *th;
struct sockaddr_in dest_addr;
RTSPActionServerSetup setup;
/* find which url is asked */
url_split(NULL, 0, NULL, 0, NULL, path1, sizeof(path1), url);
path = path1;
if (*path == '/')
path++;
/* now check each stream */
for(stream = first_stream; stream != NULL; stream = stream->next) {
if (!stream->is_feed && stream->fmt == &rtp_mux) {
/* accept aggregate filenames only if single stream */
if (!strcmp(path, stream->filename)) {
if (stream->nb_streams != 1) {
rtsp_reply_error(c, RTSP_STATUS_AGGREGATE);
return;
}
stream_index = 0;
goto found;
}
for(stream_index = 0; stream_index < stream->nb_streams;
stream_index++) {
snprintf(buf, sizeof(buf), "%s/streamid=%d",
stream->filename, stream_index);
if (!strcmp(path, buf))
goto found;
}
}
}
/* no stream found */
rtsp_reply_error(c, RTSP_STATUS_SERVICE); /* XXX: right error ? */
return;
found:
/* generate session id if needed */
if (h->session_id[0] == '\0') {
snprintf(h->session_id, sizeof(h->session_id),
"%08x%08x", (int)random(), (int)random());
}
/* find rtp session, and create it if none found */
rtp_c = find_rtp_session(h->session_id);
if (!rtp_c) {
rtp_c = rtp_new_connection(c, stream, h->session_id);
if (!rtp_c) {
rtsp_reply_error(c, RTSP_STATUS_BANDWIDTH);
return;
}
/* open input stream */
if (open_input_stream(rtp_c, "") < 0) {
rtsp_reply_error(c, RTSP_STATUS_INTERNAL);
return;
}
/* always prefer UDP */
th = find_transport(h, RTSP_PROTOCOL_RTP_UDP);
if (!th) {
th = find_transport(h, RTSP_PROTOCOL_RTP_TCP);
if (!th) {
rtsp_reply_error(c, RTSP_STATUS_TRANSPORT);
return;
}
}
rtp_c->rtp_protocol = th->protocol;
}
/* test if stream is OK (test needed because several SETUP needs
to be done for a given file) */
if (rtp_c->stream != stream) {
rtsp_reply_error(c, RTSP_STATUS_SERVICE);
return;
}
/* test if stream is already set up */
if (rtp_c->rtp_ctx[stream_index]) {
rtsp_reply_error(c, RTSP_STATUS_STATE);
return;
}
/* check transport */
th = find_transport(h, rtp_c->rtp_protocol);
if (!th || (th->protocol == RTSP_PROTOCOL_RTP_UDP &&
th->client_port_min <= 0)) {
rtsp_reply_error(c, RTSP_STATUS_TRANSPORT);
return;
}
/* setup default options */
setup.transport_option[0] = '\0';
dest_addr = rtp_c->from_addr;
dest_addr.sin_port = htons(th->client_port_min);
/* add transport option if needed */
if (ff_rtsp_callback) {
setup.ipaddr = ntohl(dest_addr.sin_addr.s_addr);
if (ff_rtsp_callback(RTSP_ACTION_SERVER_SETUP, rtp_c->session_id,
(char *)&setup, sizeof(setup),
stream->rtsp_option) < 0) {
rtsp_reply_error(c, RTSP_STATUS_TRANSPORT);
return;
}
dest_addr.sin_addr.s_addr = htonl(setup.ipaddr);
}
/* setup stream */
if (rtp_new_av_stream(rtp_c, stream_index, &dest_addr) < 0) {
rtsp_reply_error(c, RTSP_STATUS_TRANSPORT);
return;
}
/* now everything is OK, so we can send the connection parameters */
rtsp_reply_header(c, RTSP_STATUS_OK);
/* session ID */
url_fprintf(c->pb, "Session: %s\r\n", rtp_c->session_id);
switch(rtp_c->rtp_protocol) {
case RTSP_PROTOCOL_RTP_UDP:
port = rtp_get_local_port(rtp_c->rtp_handles[stream_index]);
url_fprintf(c->pb, "Transport: RTP/AVP/UDP;unicast;"
"client_port=%d-%d;server_port=%d-%d",
th->client_port_min, th->client_port_min + 1,
port, port + 1);
break;
case RTSP_PROTOCOL_RTP_TCP:
url_fprintf(c->pb, "Transport: RTP/AVP/TCP;interleaved=%d-%d",
stream_index * 2, stream_index * 2 + 1);
break;
default:
break;
}
if (setup.transport_option[0] != '\0') {
url_fprintf(c->pb, ";%s", setup.transport_option);
}
url_fprintf(c->pb, "\r\n");
url_fprintf(c->pb, "\r\n");
}
/* find an rtp connection by using the session ID. Check consistency
with filename */
static HTTPContext *find_rtp_session_with_url(const char *url,
const char *session_id)
{
HTTPContext *rtp_c;
char path1[1024];
const char *path;
rtp_c = find_rtp_session(session_id);
if (!rtp_c)
return NULL;
/* find which url is asked */
url_split(NULL, 0, NULL, 0, NULL, path1, sizeof(path1), url);
path = path1;
if (*path == '/')
path++;
if (strcmp(path, rtp_c->stream->filename) != 0)
return NULL;
return rtp_c;
}
static void rtsp_cmd_play(HTTPContext *c, const char *url, RTSPHeader *h)
{
HTTPContext *rtp_c;
rtp_c = find_rtp_session_with_url(url, h->session_id);
if (!rtp_c) {
rtsp_reply_error(c, RTSP_STATUS_SESSION);
return;
}
if (rtp_c->state != HTTPSTATE_SEND_DATA &&
rtp_c->state != HTTPSTATE_WAIT_FEED &&
rtp_c->state != HTTPSTATE_READY) {
rtsp_reply_error(c, RTSP_STATUS_STATE);
return;
}
rtp_c->state = HTTPSTATE_SEND_DATA;
/* now everything is OK, so we can send the connection parameters */
rtsp_reply_header(c, RTSP_STATUS_OK);
/* session ID */
url_fprintf(c->pb, "Session: %s\r\n", rtp_c->session_id);
url_fprintf(c->pb, "\r\n");
}
static void rtsp_cmd_pause(HTTPContext *c, const char *url, RTSPHeader *h)
{
HTTPContext *rtp_c;
rtp_c = find_rtp_session_with_url(url, h->session_id);
if (!rtp_c) {
rtsp_reply_error(c, RTSP_STATUS_SESSION);
return;
}
if (rtp_c->state != HTTPSTATE_SEND_DATA &&
rtp_c->state != HTTPSTATE_WAIT_FEED) {
rtsp_reply_error(c, RTSP_STATUS_STATE);
return;
}
rtp_c->state = HTTPSTATE_READY;
/* now everything is OK, so we can send the connection parameters */
rtsp_reply_header(c, RTSP_STATUS_OK);
/* session ID */
url_fprintf(c->pb, "Session: %s\r\n", rtp_c->session_id);
url_fprintf(c->pb, "\r\n");
}
static void rtsp_cmd_teardown(HTTPContext *c, const char *url, RTSPHeader *h)
{
HTTPContext *rtp_c;
rtp_c = find_rtp_session_with_url(url, h->session_id);
if (!rtp_c) {
rtsp_reply_error(c, RTSP_STATUS_SESSION);
return;
}
/* abort the session */
close_connection(rtp_c);
if (ff_rtsp_callback) {
ff_rtsp_callback(RTSP_ACTION_SERVER_TEARDOWN, rtp_c->session_id,
NULL, 0,
rtp_c->stream->rtsp_option);
}
/* now everything is OK, so we can send the connection parameters */
rtsp_reply_header(c, RTSP_STATUS_OK);
/* session ID */
url_fprintf(c->pb, "Session: %s\r\n", rtp_c->session_id);
url_fprintf(c->pb, "\r\n");
}
/********************************************************************/
/* RTP handling */
static HTTPContext *rtp_new_connection(HTTPContext *rtsp_c,
FFStream *stream, const char *session_id)
{
HTTPContext *c = NULL;
/* XXX: should output a warning page when coming
close to the connection limit */
if (nb_connections >= nb_max_connections)
goto fail;
/* add a new connection */
c = av_mallocz(sizeof(HTTPContext));
if (!c)
goto fail;
c->fd = -1;
c->poll_entry = NULL;
c->from_addr = rtsp_c->from_addr;
c->buffer_size = IOBUFFER_INIT_SIZE;
c->buffer = av_malloc(c->buffer_size);
if (!c->buffer)
goto fail;
nb_connections++;
c->stream = stream;
pstrcpy(c->session_id, sizeof(c->session_id), session_id);
c->state = HTTPSTATE_READY;
c->is_packetized = 1;
/* protocol is shown in statistics */
pstrcpy(c->protocol, sizeof(c->protocol), "RTP");
c->next = first_http_ctx;
first_http_ctx = c;
return c;
fail:
if (c) {
av_free(c->buffer);
av_free(c);
}
return NULL;
}
/* add a new RTP stream in an RTP connection (used in RTSP SETUP
command). if dest_addr is NULL, then TCP tunneling in RTSP is
used. */
static int rtp_new_av_stream(HTTPContext *c,
int stream_index, struct sockaddr_in *dest_addr)
{
AVFormatContext *ctx;
AVStream *st;
char *ipaddr;
URLContext *h;
UINT8 *dummy_buf;
/* now we can open the relevant output stream */
ctx = av_mallocz(sizeof(AVFormatContext));
if (!ctx)
return -1;
ctx->oformat = &rtp_mux;
st = av_mallocz(sizeof(AVStream));
if (!st)
goto fail;
ctx->nb_streams = 1;
ctx->streams[0] = st;
if (!c->stream->feed ||
c->stream->feed == c->stream) {
memcpy(st, c->stream->streams[stream_index], sizeof(AVStream));
} else {
memcpy(st,
c->stream->feed->streams[c->stream->feed_streams[stream_index]],
sizeof(AVStream));
}
if (dest_addr) {
/* build destination RTP address */
ipaddr = inet_ntoa(dest_addr->sin_addr);
snprintf(ctx->filename, sizeof(ctx->filename),
"rtp://%s:%d", ipaddr, ntohs(dest_addr->sin_port));
printf("open %s\n", ctx->filename);
if (url_open(&h, ctx->filename, URL_WRONLY) < 0)
goto fail;
c->rtp_handles[stream_index] = h;
} else {
goto fail;
}
/* normally, no packets should be output here, but the packet size may be checked */
if (url_open_dyn_packet_buf(&ctx->pb,
url_get_max_packet_size(h)) < 0) {
/* XXX: close stream */
goto fail;
}
if (av_write_header(ctx) < 0) {
fail:
if (h)
url_close(h);
av_free(ctx);
return -1;
}
url_close_dyn_buf(&ctx->pb, &dummy_buf);
av_free(dummy_buf);
c->rtp_ctx[stream_index] = ctx;
return 0;
}
/********************************************************************/
/* ffserver initialization */
AVStream *add_av_stream1(FFStream *stream, AVCodecContext *codec)
{
AVStream *fst;
fst = av_mallocz(sizeof(AVStream));
if (!fst)
return NULL;
fst->priv_data = av_mallocz(sizeof(FeedData));
memcpy(&fst->codec, codec, sizeof(AVCodecContext));
stream->streams[stream->nb_streams++] = fst;
return fst;
}
/* return the stream number in the feed */
int add_av_stream(FFStream *feed,
AVStream *st)
......@@ -1862,17 +2978,66 @@ int add_av_stream(FFStream *feed,
}
}
fst = av_mallocz(sizeof(AVStream));
fst = add_av_stream1(feed, av);
if (!fst)
return -1;
fst->priv_data = av_mallocz(sizeof(FeedData));
memcpy(&fst->codec, av, sizeof(AVCodecContext));
feed->streams[feed->nb_streams++] = fst;
return feed->nb_streams - 1;
found:
return i;
}
void remove_stream(FFStream *stream)
{
FFStream **ps;
ps = &first_stream;
while (*ps != NULL) {
if (*ps == stream) {
*ps = (*ps)->next;
} else {
ps = &(*ps)->next;
}
}
}
/* compute the needed AVStream for each file */
void build_file_streams(void)
{
FFStream *stream, *stream_next;
AVFormatContext *infile;
int i;
/* gather all streams */
for(stream = first_stream; stream != NULL; stream = stream_next) {
stream_next = stream->next;
if (stream->stream_type == STREAM_TYPE_LIVE &&
!stream->feed) {
/* the stream comes from a file */
/* try to open the file */
/* open stream */
if (av_open_input_file(&infile, stream->feed_filename,
NULL, 0, NULL) < 0) {
http_log("%s not found", stream->feed_filename);
/* remove stream (no need to spend more time on it) */
fail:
remove_stream(stream);
} else {
/* find all the AVStreams inside and reference them in
'stream' */
if (av_find_stream_info(infile) < 0) {
http_log("Could not find codec parameters from '%s'",
stream->feed_filename);
av_close_input_file(infile);
goto fail;
}
for(i=0;i<infile->nb_streams;i++) {
add_av_stream1(stream, &infile->streams[i]->codec);
}
av_close_input_file(infile);
}
}
}
}
/* compute the needed AVStream for each feed */
void build_feed_streams(void)
{
......@@ -1884,6 +3049,7 @@ void build_feed_streams(void)
feed = stream->feed;
if (feed) {
if (!stream->is_feed) {
/* we handle a stream coming from a feed */
for(i=0;i<stream->nb_streams;i++) {
stream->feed_streams[i] = add_av_stream(feed, stream->streams[i]);
}
......@@ -2060,6 +3226,30 @@ int opt_video_codec(const char *arg)
return p->id;
}
/* simplistic plugin support */
void load_module(const char *filename)
{
void *dll;
void (*init_func)(void);
dll = dlopen(filename, RTLD_NOW);
if (!dll) {
fprintf(stderr, "Could not load module '%s' - %s\n",
filename, dlerror());
return;
}
init_func = dlsym(dll, "ffserver_module_init");
if (!init_func) {
fprintf(stderr,
"%s: init function 'ffserver_module_init()' not found\n",
filename);
dlclose(dll);
}
init_func();
}
int parse_ffconfig(const char *filename)
{
FILE *f;
......@@ -2104,10 +3294,22 @@ int parse_ffconfig(const char *filename)
if (!strcasecmp(cmd, "Port")) {
get_arg(arg, sizeof(arg), &p);
my_addr.sin_port = htons (atoi(arg));
my_http_addr.sin_port = htons (atoi(arg));
} else if (!strcasecmp(cmd, "BindAddress")) {
get_arg(arg, sizeof(arg), &p);
if (!inet_aton(arg, &my_addr.sin_addr)) {
if (!inet_aton(arg, &my_http_addr.sin_addr)) {
fprintf(stderr, "%s:%d: Invalid IP address: %s\n",
filename, line_num, arg);
errors++;
}
} else if (!strcasecmp(cmd, "NoDaemon")) {
ffserver_daemon = 0;
} else if (!strcasecmp(cmd, "RTSPPort")) {
get_arg(arg, sizeof(arg), &p);
my_rtsp_addr.sin_port = htons (atoi(arg));
} else if (!strcasecmp(cmd, "RTSPBindAddress")) {
get_arg(arg, sizeof(arg), &p);
if (!inet_aton(arg, &my_rtsp_addr.sin_addr)) {
fprintf(stderr, "%s:%d: Invalid IP address: %s\n",
filename, line_num, arg);
errors++;
......@@ -2185,7 +3387,7 @@ int parse_ffconfig(const char *filename)
feed->child_argv[i] = av_malloc(30 + strlen(feed->filename));
snprintf(feed->child_argv[i], 256, "http://127.0.0.1:%d/%s",
ntohs(my_addr.sin_port), feed->filename);
ntohs(my_http_addr.sin_port), feed->filename);
}
} else if (!strcasecmp(cmd, "File")) {
if (feed) {
......@@ -2428,6 +3630,16 @@ int parse_ffconfig(const char *filename)
video_id = CODEC_ID_NONE;
} else if (!strcasecmp(cmd, "NoAudio")) {
audio_id = CODEC_ID_NONE;
} else if (!strcasecmp(cmd, "RTSPOption")) {
get_arg(arg, sizeof(arg), &p);
if (stream) {
av_freep(&stream->rtsp_option);
/* XXX: av_strdup ? */
stream->rtsp_option = av_malloc(strlen(arg) + 1);
if (stream->rtsp_option) {
strcpy(stream->rtsp_option, arg);
}
}
} else if (!strcasecmp(cmd, "</Stream>")) {
if (!stream) {
fprintf(stderr, "%s:%d: No corresponding <Stream> for </Stream>\n",
......@@ -2481,6 +3693,9 @@ int parse_ffconfig(const char *filename)
errors++;
}
redirect = NULL;
} else if (!strcasecmp(cmd, "LoadModule")) {
get_arg(arg, sizeof(arg), &p);
load_module(arg);
} else {
fprintf(stderr, "%s:%d: Incorrect keyword: '%s'\n",
filename, line_num, cmd);
......@@ -2496,12 +3711,6 @@ int parse_ffconfig(const char *filename)
}
void *http_server_thread(void *arg)
{
http_server(my_addr);
return NULL;
}
#if 0
static void write_packet(FFCodec *ffenc,
UINT8 *buf, int size)
......@@ -2590,9 +3799,10 @@ int main(int argc, char **argv)
config_filename = "/etc/ffserver.conf";
my_program_name = argv[0];
ffserver_daemon = 1;
for(;;) {
c = getopt_long_only(argc, argv, "ndLh?f:", NULL, NULL);
c = getopt(argc, argv, "ndLh?f:");
if (c == -1)
break;
switch(c) {
......@@ -2608,6 +3818,7 @@ int main(int argc, char **argv)
break;
case 'd':
ffserver_debug = 1;
ffserver_daemon = 0;
break;
case 'f':
config_filename = optarg;
......@@ -2619,10 +3830,16 @@ int main(int argc, char **argv)
putenv("http_proxy"); /* Kill the http_proxy */
/* address on which the server will handle connections */
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons (8080);
my_addr.sin_addr.s_addr = htonl (INADDR_ANY);
/* address on which the server will handle HTTP connections */
my_http_addr.sin_family = AF_INET;
my_http_addr.sin_port = htons (8080);
my_http_addr.sin_addr.s_addr = htonl (INADDR_ANY);
/* address on which the server will handle RTSP connections */
my_rtsp_addr.sin_family = AF_INET;
my_rtsp_addr.sin_port = htons (5454);
my_rtsp_addr.sin_addr.s_addr = htonl (INADDR_ANY);
nb_max_connections = 5;
nb_max_bandwidth = 1000;
first_stream = NULL;
......@@ -2638,8 +3855,34 @@ int main(int argc, char **argv)
exit(1);
}
build_file_streams();
build_feed_streams();
/* put the process in background and detach it from its TTY */
if (ffserver_daemon) {
int pid;
pid = fork();
if (pid < 0) {
perror("fork");
exit(1);
} else if (pid > 0) {
/* parent : exit */
exit(0);
} else {
/* child */
setsid();
chdir("/");
close(0);
close(1);
close(2);
open("/dev/null", O_RDWR);
dup(0);
dup(0);
}
}
/* signal init */
signal(SIGPIPE, SIG_IGN);
......@@ -2651,8 +3894,8 @@ int main(int argc, char **argv)
logfile = fopen(logfilename, "w");
}
if (http_server(my_addr) < 0) {
fprintf(stderr, "Could not start http server\n");
if (http_server() < 0) {
fprintf(stderr, "Could not start server\n");
exit(1);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment