TT#5566 rudimentary wav file output

Change-Id: Icdc97a9dc849bba6ba6add12d0bdd17f8b7712cd
changes/85/9685/8
Richard Fuchs 9 years ago
parent dbd1c36be0
commit d7fa0689f9

@ -7,7 +7,7 @@ CFLAGS+= `pkg-config --cflags zlib`
CFLAGS+= `pkg-config --cflags openssl`
CFLAGS+= `pkg-config --cflags libevent_pthreads`
CFLAGS+= `pcre-config --cflags`
CFLAGS+= -I../kernel-module/
CFLAGS+= -I. -I../kernel-module/ -I../lib/
CFLAGS+= -D_GNU_SOURCE
ifeq ($(RTPENGINE_VERSION),)
@ -68,7 +68,7 @@ endif
SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \
media_socket.c rtcp_xr.c homer.c recording.c
media_socket.c rtcp_xr.c homer.c recording.c rtplib.c
OBJS= $(SRCS:.c=.o)

@ -12,6 +12,7 @@
#include "rtp.h"
#include "rtcp.h"
#include "log.h"
#include "rtplib.h"

@ -19,6 +19,7 @@
#include "log_funcs.h"
#include "poller.h"
#include "recording.h"
#include "rtplib.h"
#ifndef PORT_RANDOM_MIN

@ -8,14 +8,7 @@
#include "str.h"
#include "crypto.h"
#include "log.h"
struct rtp_extension {
u_int16_t undefined;
u_int16_t length;
} __attribute__ ((packed));
#include "rtplib.h"
@ -89,52 +82,6 @@ error:
return -1;
}
int rtp_payload(struct rtp_header **out, str *p, const str *s) {
struct rtp_header *rtp;
struct rtp_extension *ext;
const char *err;
err = "short packet (header)";
if (s->len < sizeof(*rtp))
goto error;
rtp = (void *) s->s;
err = "invalid header version";
if ((rtp->v_p_x_cc & 0xc0) != 0x80) /* version 2 */
goto error;
if (!p)
goto done;
*p = *s;
/* fixed header */
str_shift(p, sizeof(*rtp));
/* csrc list */
err = "short packet (CSRC list)";
if (str_shift(p, (rtp->v_p_x_cc & 0xf) * 4))
goto error;
if ((rtp->v_p_x_cc & 0x10)) {
/* extension */
err = "short packet (extension header)";
if (p->len < sizeof(*ext))
goto error;
ext = (void *) p->s;
err = "short packet (header extensions)";
if (str_shift(p, 4 + ntohs(ext->length) * 4))
goto error;
}
done:
*out = rtp;
return 0;
error:
ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Error parsing RTP header: %s", err);
return -1;
}
static u_int64_t packet_index(struct crypto_context *c, struct rtp_header *rtp) {
u_int16_t seq;

@ -9,15 +9,7 @@
struct crypto_context;
struct rtp_header {
unsigned char v_p_x_cc;
unsigned char m_pt;
u_int16_t seq_num;
u_int32_t timestamp;
u_int32_t ssrc;
u_int32_t csrc[];
} __attribute__ ((packed));
struct rtp_header;
struct rtp_payload_type {
unsigned int payload_type;
@ -31,7 +23,6 @@ struct rtp_payload_type {
int rtp_payload(struct rtp_header **out, str *p, const str *s);
const struct rtp_payload_type *rtp_payload_type(unsigned int, GHashTable *);
int rtp_avp2savp(str *, struct crypto_context *);

@ -0,0 +1 @@
../lib/rtplib.c

1
lib/.gitignore vendored

@ -0,0 +1 @@
*.o

@ -0,0 +1,74 @@
#include "rtplib.h"
#include <arpa/inet.h>
#include "str.h"
#include "log.h"
struct rtp_extension {
u_int16_t undefined;
u_int16_t length;
} __attribute__ ((packed));
int rtp_payload(struct rtp_header **out, str *p, const str *s) {
struct rtp_header *rtp;
struct rtp_extension *ext;
const char *err;
err = "short packet (header)";
if (s->len < sizeof(*rtp))
goto error;
rtp = (void *) s->s;
err = "invalid header version";
if ((rtp->v_p_x_cc & 0xc0) != 0x80) /* version 2 */
goto error;
if (!p)
goto done;
*p = *s;
/* fixed header */
str_shift(p, sizeof(*rtp));
/* csrc list */
err = "short packet (CSRC list)";
if (str_shift(p, (rtp->v_p_x_cc & 0xf) * 4))
goto error;
if ((rtp->v_p_x_cc & 0x10)) {
/* extension */
err = "short packet (extension header)";
if (p->len < sizeof(*ext))
goto error;
ext = (void *) p->s;
err = "short packet (header extensions)";
if (str_shift(p, 4 + ntohs(ext->length) * 4))
goto error;
}
done:
*out = rtp;
return 0;
error:
ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Error parsing RTP header: %s", err);
return -1;
}
int rtp_padding(struct rtp_header *header, str *payload) {
if (!(header->v_p_x_cc & 0x20))
return 0; // no padding
if (payload->len == 0)
return -1;
unsigned int padding = (unsigned char) payload->s[payload->len - 1];
if (payload->len < padding)
return -1;
payload->len -= padding;
return 0;
}

@ -0,0 +1,22 @@
#ifndef _RTPLIB_H_
#define _RTPLIB_H_
#include <stdint.h>
#include "str.h"
struct rtp_header {
unsigned char v_p_x_cc;
unsigned char m_pt;
uint16_t seq_num;
uint32_t timestamp;
uint32_t ssrc;
uint32_t csrc[];
} __attribute__ ((packed));
int rtp_payload(struct rtp_header **out, str *p, const str *s);
int rtp_padding(struct rtp_header *header, str *payload);
#endif

@ -18,6 +18,7 @@ flags = [
'-fno-strict-aliasing',
'-I/usr/include/glib-2.0',
'-I/usr/lib/x86_64-linux-gnu/glib-2.0/include',
'-I../lib/',
'-pthread',
'-D_GNU_SOURCE',
'-D__DEBUG=1',

@ -1,12 +1,12 @@
TARGET= rtpengine-recording
CC?=gcc
CFLAGS= -g -Wall -pthread
CFLAGS= -g -Wall -pthread -I. -I../lib/
CFLAGS+= -std=c99
CFLAGS+= -D_GNU_SOURCE -D_POSIX_SOURCE -D_POSIX_C_SOURCE
CFLAGS+= `pkg-config --cflags glib-2.0`
CFLAGS+= `pkg-config --cflags gthread-2.0`
#CFLAGS+= `pcre-config --cflags`
CFLAGS+= `pcre-config --cflags`
CFLAGS+= `pkg-config --cflags libavcodec`
CFLAGS+= `pkg-config --cflags libavformat`
CFLAGS+= `pkg-config --cflags libavutil`
@ -20,7 +20,7 @@ endif
LDFLAGS= -lm
LDFLAGS+= `pkg-config --libs glib-2.0`
LDFLAGS+= `pkg-config --libs gthread-2.0`
#LDFLAGS+= `pcre-config --libs`
LDFLAGS+= `pcre-config --libs`
LDFLAGS+= `pkg-config --libs libavcodec`
LDFLAGS+= `pkg-config --libs libavformat`
LDFLAGS+= `pkg-config --libs libavutil`
@ -35,7 +35,8 @@ ifneq ($(DBG),yes)
endif
endif
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c aux.c
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c rtplib.c packet.c \
decoder.c
OBJS= $(SRCS:.c=.o)

@ -0,0 +1,200 @@
#include "decoder.h"
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <glib.h>
#include <stdint.h>
#include "types.h"
#include "log.h"
struct decoder_s {
AVCodecContext *avcctx;
AVPacket avpkt;
AVFrame *frame;
unsigned long rtp_ts;
uint64_t pts;
};
struct output_s {
AVCodecContext *avcctx;
AVFormatContext *fmtctx;
AVStream *avst;
AVPacket avpkt;
};
decoder_t *decoder_new(unsigned int payload_type, const char *payload_str) {
decoder_t *ret = g_slice_alloc0(sizeof(*ret));
// XXX error reporting
AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_PCM_ALAW);
ret->avcctx = avcodec_alloc_context3(codec);
if (!ret->avcctx)
goto err;
ret->avcctx->channels = 1;
ret->avcctx->sample_rate = 8000;
int i = avcodec_open2(ret->avcctx, codec, NULL);
if (i)
goto err;
av_init_packet(&ret->avpkt);
ret->frame = av_frame_alloc();
if (!ret->frame)
goto err;
ret->pts = (uint64_t) -1LL;
ret->rtp_ts = (unsigned long) -1L;
return ret;
err:
decoder_close(ret);
return NULL;
}
static int output_add(output_t *output, AVFrame *frame) {
if (!output)
return -1;
#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(57, 0, 0)
int ret = avcodec_send_frame(output->avcctx, frame);
dbg("send frame ret %i", ret);
if (ret)
return -1;
ret = avcodec_receive_packet(output->avcctx, &output->avpkt);
dbg("receive packet ret %i", ret);
if (ret)
return -1;
#else
int got_packet = 0;
int ret = avcodec_encode_audio2(output->avcctx, &output->avpkt, frame, &got_packet);
dbg("encode frame ret %i, got packet %i", ret, got_packet);
if (!got_packet)
return 0;
#endif
av_write_frame(output->fmtctx, &output->avpkt);
return 0;
}
int decoder_input(decoder_t *dec, const str *data, unsigned long ts, output_t *output) {
if (G_UNLIKELY(!dec))
return -1;
if (G_UNLIKELY(dec->rtp_ts == (unsigned long) -1L)) {
// initialize pts
dec->pts = 0;
}
else {
// shift pts according to rtp ts shift
dec->pts += (ts - dec->rtp_ts) * output->avst->time_base.num * 8000 / output->avst->time_base.den;
}
dec->rtp_ts = ts;
dec->avpkt.data = (unsigned char *) data->s;
dec->avpkt.size = data->len;
dec->avpkt.pts = dec->pts;
#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(57, 0, 0)
int ret = avcodec_send_packet(dec->avcctx, &dec->avpkt);
dbg("send packet ret %i", ret);
if (ret)
return -1;
ret = avcodec_receive_frame(dec->avcctx, dec->frame);
dbg("receive frame ret %i", ret);
if (ret)
return -1;
#else
int got_frame = 0;
int ret = avcodec_decode_audio4(dec->avcctx, dec->frame, &got_frame, &dec->avpkt);
dbg("decode frame ret %i, got frame %i", ret, got_frame);
if (!got_frame)
return 0;
#endif
dec->frame->pts = dec->frame->pkt_pts;
output_add(output, dec->frame);
return 0;
}
output_t *output_new(const char *filename) {
output_t *ret = g_slice_alloc0(sizeof(*ret));
// XXX error reporting
ret->fmtctx = avformat_alloc_context();
if (!ret->fmtctx)
goto err;
ret->fmtctx->oformat = av_guess_format("wav", NULL, NULL); // XXX better way?
if (!ret->fmtctx->oformat)
goto err;
AVCodec *codec = avcodec_find_encoder(AV_CODEC_ID_PCM_S16LE);
// XXX error handling
ret->avst = avformat_new_stream(ret->fmtctx, codec);
if (!ret->avst)
goto err;
#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(57, 0, 0)
ret->avcctx = avcodec_alloc_context3(codec);
if (!ret->avcctx)
goto err;
#else
ret->avcctx = ret->avst->codec;
#endif
ret->avcctx->channels = 1;
ret->avcctx->sample_rate = 8000;
ret->avcctx->sample_fmt = AV_SAMPLE_FMT_S16;
ret->avcctx->time_base = (AVRational){8000,1};
ret->avst->time_base = ret->avcctx->time_base;
#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(57, 0, 0)
avcodec_parameters_from_context(ret->avst->codecpar, ret->avcctx);
#endif
int i = avcodec_open2(ret->avcctx, codec, NULL);
if (i)
goto err;
i = avio_open(&ret->fmtctx->pb, filename, AVIO_FLAG_WRITE);
if (i < 0)
goto err;
i = avformat_write_header(ret->fmtctx, NULL);
if (i)
goto err;
av_init_packet(&ret->avpkt);
return ret;
err:
output_close(ret);
return NULL;
}
void decoder_close(decoder_t *dec) {
if (!dec)
return;
avcodec_free_context(&dec->avcctx);
av_frame_free(&dec->frame);
g_slice_free1(sizeof(*dec), dec);
}
void output_close(output_t *output) {
if (!output)
return;
av_write_trailer(output->fmtctx);
avcodec_close(output->avcctx);
avio_closep(&output->fmtctx->pb);
avformat_free_context(output->fmtctx);
g_slice_free1(sizeof(*output), output);
}

@ -0,0 +1,16 @@
#ifndef _DECODER_H_
#define _DECODER_H_
#include "types.h"
#include "str.h"
decoder_t *decoder_new(unsigned int payload_type, const char *payload_str);
int decoder_input(decoder_t *, const str *, unsigned long ts, output_t *);
void decoder_close(decoder_t *);
output_t *output_new(const char *filename);
void output_close(output_t *);
#endif

@ -10,7 +10,8 @@
#include "stream.h"
#include "garbage.h"
#include "main.h"
#include "aux.h"
#include "recaux.h"
#include "packet.h"
static pthread_mutex_t metafiles_lock = PTHREAD_MUTEX_INITIALIZER;
@ -32,6 +33,7 @@ static void meta_free(void *ptr) {
}
g_ptr_array_free(mf->streams, TRUE);
g_slice_free1(sizeof(*mf), mf);
g_hash_table_destroy(mf->ssrc_hash);
}
@ -65,6 +67,15 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i
char *payload_type)
{
dbg("payload type in media %lu num %u is %s", mnum, payload_num, payload_type);
if (payload_num >= 128) {
ilog(LOG_ERR, "Payload type number %u is invalid", payload_num);
return;
}
pthread_mutex_lock(&mf->payloads_lock);
mf->payload_types[payload_num] = g_string_chunk_insert(mf->gsc, payload_type);
pthread_mutex_unlock(&mf->payloads_lock);
}
@ -86,23 +97,37 @@ static void meta_section(metafile_t *mf, char *section, char *content, unsigned
}
void metafile_change(char *name) {
// returns mf locked
static metafile_t *metafile_get(char *name) {
// get or create metafile metadata
pthread_mutex_lock(&metafiles_lock);
metafile_t *mf = g_hash_table_lookup(metafiles, name);
if (!mf) {
dbg("allocating metafile info for %s", name);
mf = g_slice_alloc0(sizeof(*mf));
mf->gsc = g_string_chunk_new(0);
mf->name = g_string_chunk_insert(mf->gsc, name);
pthread_mutex_init(&mf->lock, NULL);
mf->streams = g_ptr_array_new();
g_hash_table_insert(metafiles, mf->name, mf);
}
if (mf)
goto out;
dbg("allocating metafile info for %s", name);
mf = g_slice_alloc0(sizeof(*mf));
mf->gsc = g_string_chunk_new(0);
mf->name = g_string_chunk_insert(mf->gsc, name);
pthread_mutex_init(&mf->lock, NULL);
pthread_mutex_init(&mf->payloads_lock, NULL);
mf->streams = g_ptr_array_new();
mf->ssrc_hash = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_free);
g_hash_table_insert(metafiles, mf->name, mf);
out:
// switch locks
pthread_mutex_lock(&mf->lock);
pthread_mutex_unlock(&metafiles_lock);
return mf;
}
void metafile_change(char *name) {
metafile_t *mf = metafile_get(name);
char fnbuf[PATH_MAX];
snprintf(fnbuf, sizeof(fnbuf), "%s/%s", SPOOL_DIR, name);
@ -131,6 +156,7 @@ void metafile_change(char *name) {
close(fd);
// process contents of metadata file
// XXX use "str" type?
char *head = s->str;
char *endp = s->str + s->len;
while (head < endp) {

@ -0,0 +1,181 @@
#include "packet.h"
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/udp.h>
#include <glib.h>
#include <unistd.h>
#include "types.h"
#include "log.h"
#include "rtplib.h"
#include "str.h"
#include "decoder.h"
static int packet_cmp(const void *A, const void *B, void *dummy) {
const packet_t *a = A, *b = B;
if (a->seq < b->seq)
return -1;
if (a->seq > b->seq)
return 1;
return 0;
}
static void packet_free(void *p) {
packet_t *packet = p;
if (!packet)
return;
free(packet->buffer);
g_slice_free1(sizeof(*packet), packet);
}
void ssrc_free(void *p) {
ssrc_t *s = p;
g_tree_destroy(s->packets);
output_close(s->output);
for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++)
decoder_close(s->decoders[i]);
g_slice_free1(sizeof(*s), s);
}
// mf must be unlocked; returns ssrc locked
static ssrc_t *ssrc_get(metafile_t *mf, unsigned long ssrc) {
pthread_mutex_lock(&mf->lock);
ssrc_t *ret = g_hash_table_lookup(mf->ssrc_hash, GUINT_TO_POINTER(ssrc));
if (ret)
goto out;
ret = g_slice_alloc0(sizeof(*ret));
pthread_mutex_init(&ret->lock, NULL);
ret->metafile = mf;
ret->ssrc = ssrc;
ret->packets = g_tree_new_full(packet_cmp, NULL, NULL, packet_free);
ret->seq = -1;
char buf[256];
snprintf(buf, sizeof(buf), "%s-%08lx.wav", mf->parent, ssrc);
ret->output = output_new(buf);
g_hash_table_insert(mf->ssrc_hash, GUINT_TO_POINTER(ssrc), ret);
out:
pthread_mutex_lock(&ret->lock);
pthread_mutex_unlock(&mf->lock);
return ret;
}
static gboolean ssrc_tree_get_first(void *key, void *val, void *data) {
packet_t **out = data;
*out = val;
return TRUE;
}
// ssrc is locked and must be unlocked when returning
static void ssrc_run(ssrc_t *ssrc) {
while (1) {
// inspect first packet to see if seq is correct
packet_t *first = NULL;
g_tree_foreach(ssrc->packets, ssrc_tree_get_first, &first);
if (!first)
break;
if (first->seq != ssrc->seq)
break; // need to wait for more
// determine payload type and run decoder
unsigned int payload_type = first->rtp->m_pt & 0x7f;
metafile_t *mf = ssrc->metafile;
pthread_mutex_lock(&mf->payloads_lock);
char *payload_str = mf->payload_types[payload_type];
pthread_mutex_unlock(&mf->payloads_lock);
dbg("processing packet seq %i, payload type is %s", first->seq, payload_str);
g_tree_steal(ssrc->packets, first);
// check if we have a decoder for this payload type yet
if (!ssrc->decoders[payload_type])
ssrc->decoders[payload_type] = decoder_new(payload_type, payload_str);
// XXX error handling
decoder_input(ssrc->decoders[payload_type], &first->payload, ntohl(first->rtp->timestamp),
ssrc->output);
packet_free(first);
dbg("packets left in queue: %i", g_tree_nnodes(ssrc->packets));
ssrc->seq = (ssrc->seq + 1) & 0xffff;
}
pthread_mutex_unlock(&ssrc->lock);
}
// stream is unlocked, buf is malloc'd
void packet_process(stream_t *stream, unsigned char *buf, unsigned len) {
packet_t *packet = g_slice_alloc0(sizeof(*packet));
packet->buffer = buf; // handing it over
// XXX more checking here
str bufstr;
str_init_len(&bufstr, packet->buffer, len);
packet->ip = (void *) bufstr.s;
// XXX kernel already does this - add metadata?
if (packet->ip->version == 4) {
if (str_shift(&bufstr, packet->ip->ihl << 2))
goto err;
}
else {
packet->ip = NULL;
packet->ip6 = (void *) bufstr.s;
if (str_shift(&bufstr, sizeof(*packet->ip6)))
goto err;
}
packet->udp = (void *) bufstr.s;
str_shift(&bufstr, sizeof(*packet->udp));
if (rtp_payload(&packet->rtp, &packet->payload, &bufstr))
goto err;
if (rtp_padding(packet->rtp, &packet->payload))
goto err;
packet->seq = ntohs(packet->rtp->seq_num);
dbg("packet parsed successfully, seq %u", packet->seq);
// insert into ssrc queue
ssrc_t *ssrc = ssrc_get(stream->metafile, ntohl(packet->rtp->ssrc));
// check seq for dupes
if (G_UNLIKELY(ssrc->seq == -1)) {
// first packet we see
ssrc->seq = packet->seq;
goto seq_ok;
}
int diff = packet->seq - ssrc->seq;
if (diff >= 0x8000)
goto dupe;
if (diff < 0 && diff > -0x8000)
goto dupe;
// seq ok - fall thru
seq_ok:
if (g_tree_lookup(ssrc->packets, packet))
goto dupe;
g_tree_insert(ssrc->packets, packet, packet);
// got a new packet, run the decoder
ssrc_run(ssrc);
return;
dupe:
dbg("skipping dupe packet (new seq %i prev seq %i)", packet->seq, ssrc->seq);
pthread_mutex_unlock(&ssrc->lock);
return;
err:
ilog(LOG_WARN, "Failed to parse packet headers");
packet_free(packet);
}

@ -0,0 +1,10 @@
#ifndef _PACKET_H_
#define _PACKET_H_
#include "types.h"
void ssrc_free(void *p);
void packet_process(stream_t *, unsigned char *, unsigned len);
#endif

@ -3,12 +3,12 @@
#include "log.h"
void pcre_build(pcre_t *out, const char *pattern) {
const char *errptr;
int erroff;
out->re = pcre_compile(pattern, PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
if (!out->re)
die("Failed to compile PCRE '%s': %s (at %i)", pattern, errptr, erroff);
out->extra = pcre_study(out->re, 0, &errptr);
}
//void pcre_build(pcre_t *out, const char *pattern) {
// const char *errptr;
// int erroff;
//
// out->re = pcre_compile(pattern, PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
// if (!out->re)
// die("Failed to compile PCRE '%s': %s (at %i)", pattern, errptr, erroff);
// out->extra = pcre_study(out->re, 0, &errptr);
//}

@ -3,6 +3,6 @@
#include "types.h"
void pcre_build(pcre_t *out, const char *pattern);
//void pcre_build(pcre_t *out, const char *pattern);
#endif

@ -1,4 +1,4 @@
#include "aux.h"
#include "recaux.h"
#include <stdio.h>
#include <stdarg.h>

@ -1,5 +1,5 @@
#ifndef _AUX_H_
#define _AUX_H_
#ifndef _RECAUX_H_
#define _RECAUX_H_
extern int __thread __sscanf_hack_var;

@ -0,0 +1 @@
../lib/rtplib.c

@ -4,11 +4,12 @@
#include <unistd.h>
#include <limits.h>
#include <fcntl.h>
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include "metafile.h"
#include "epoll.h"
#include "log.h"
#include "main.h"
#include "packet.h"
// stream is locked
@ -35,8 +36,17 @@ static void stream_handler(handler_t *handler) {
if (stream->fd == -1)
goto out;
char buf[65535];
int ret = read(stream->fd, buf, sizeof(buf));
static const int maxbuflen = 65535;
static const int alloclen = maxbuflen
#ifdef AV_INPUT_BUFFER_PADDING_SIZE
+ AV_INPUT_BUFFER_PADDING_SIZE
#endif
#ifdef FF_INPUT_BUFFER_PADDING_SIZE
+ FF_INPUT_BUFFER_PADDING_SIZE
#endif
;
unsigned char *buf = malloc(alloclen);
int ret = read(stream->fd, buf, maxbuflen);
if (ret == 0) {
ilog(LOG_INFO, "EOF on stream %s", stream->name);
stream_close(stream);
@ -48,6 +58,11 @@ static void stream_handler(handler_t *handler) {
goto out;
}
// got a packet
pthread_mutex_unlock(&stream->lock);
packet_process(stream, buf, ret);
return;
out:
pthread_mutex_unlock(&stream->lock);
}
@ -66,6 +81,7 @@ static stream_t *stream_get(metafile_t *mf, unsigned long id) {
pthread_mutex_init(&ret->lock, NULL);
ret->fd = -1;
ret->id = id;
ret->metafile = mf;
out:
return ret;
@ -89,15 +105,6 @@ void stream_open(metafile_t *mf, unsigned long id, char *name) {
return;
}
stream->avinf = av_find_input_format("rtp");
ilog(LOG_DEBUG, "avinf %p", stream->avinf);
stream->avfctx = avformat_alloc_context();
unsigned char *buf = av_malloc(1024); // ?
stream->avfctx->pb = avio_alloc_context(buf, 1024, 1, NULL, NULL, NULL, NULL);
int ret = avformat_open_input(&stream->avfctx, "", stream->avinf, NULL);
ilog(LOG_DEBUG, "ret %i avfctx %p", ret, stream->avfctx);
// add to epoll
stream->handler.ptr = stream;
stream->handler.func = stream_handler;

@ -6,38 +6,86 @@
#include <sys/types.h>
#include <glib.h>
#include <pcre.h>
#include <libavformat/avformat.h>
#include "str.h"
struct iphdr;
struct ip6_hdr;
struct udphdr;
struct rtp_header;
struct handler_s;
typedef struct handler_s handler_t;
struct metafile_s;
typedef struct metafile_s metafile_t;
struct decoder_s;
typedef struct decoder_s decoder_t;
struct output_s;
typedef struct output_s output_t;
typedef void handler_func(handler_t *);
struct handler_s {
handler_func *func;
void *ptr;
};
struct stream_s {
pthread_mutex_t lock;
char *name;
metafile_t *metafile;
unsigned long id;
int fd;
handler_t handler;
AVInputFormat *avinf;
AVFormatContext *avfctx;
};
typedef struct stream_s stream_t;
struct packet_s {
void *buffer;
// pointers into buffer
struct iphdr *ip;
struct ip6_hdr *ip6;
struct udphdr *udp;
int seq;
struct rtp_header *rtp;
str payload;
};
typedef struct packet_s packet_t;
struct ssrc_s {
pthread_mutex_t lock;
metafile_t *metafile;
unsigned long ssrc;
GTree *packets; // contains packet_t objects
int seq; // next expected seq
decoder_t *decoders[128];
output_t *output;
};
typedef struct ssrc_s ssrc_t;
struct metafile_s {
pthread_mutex_t lock;
char *name;
char *parent;
char *call_id;
off_t pos;
GStringChunk *gsc; // XXX limit max size
GPtrArray *streams;
GHashTable *ssrc_hash; // contains ssrc_t objects
pthread_mutex_t payloads_lock;
char *payload_types[128];
};
typedef struct metafile_s metafile_t;
// struct pcre_s {
// pcre *re;

Loading…
Cancel
Save