TT#52651 produce output for TCP forwarding feature

Change-Id: I18543921577faf655679829684f5af46c0af5054
changes/33/27233/5
Richard Fuchs 6 years ago
parent 2ef8028eb2
commit 442c48f627

@ -989,7 +989,7 @@ static int __packet_decoded(decoder_t *decoder, AVFrame *frame, void *u1, void *
static int packet_decode(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp)
{
return decoder_input_data(ch->decoder, packet->payload, packet->ts, __packet_decoded, ch, mp, NULL);
return decoder_input_data(ch->decoder, packet->payload, packet->ts, __packet_decoded, ch, mp);
}
static int handler_func_transcode(struct codec_handler *h, struct media_packet *mp) {

@ -591,8 +591,7 @@ err:
}
int decoder_input_data(decoder_t *dec, const str *data, unsigned long ts,
int (*callback)(decoder_t *, AVFrame *, void *u1, void *u2, void *u3),
void *u1, void *u2, void *u3)
int (*callback)(decoder_t *, AVFrame *, void *u1, void *u2), void *u1, void *u2)
{
GQueue frames = G_QUEUE_INIT;
@ -634,7 +633,7 @@ int decoder_input_data(decoder_t *dec, const str *data, unsigned long ts,
ret = -1;
}
else {
if (callback(dec, rsmp_frame, u1, u2, u3))
if (callback(dec, rsmp_frame, u1, u2))
ret = -1;
}
av_frame_free(&frame);

@ -199,8 +199,7 @@ decoder_t *decoder_new_fmtp(const codec_def_t *def, int clockrate, int channels,
const str *fmtp);
void decoder_close(decoder_t *dec);
int decoder_input_data(decoder_t *dec, const str *data, unsigned long ts,
int (*callback)(decoder_t *, AVFrame *, void *u1, void *u2, void *u3),
void *u1, void *u2, void *u3);
int (*callback)(decoder_t *, AVFrame *, void *u1, void *u2), void *u1, void *u2);
encoder_t *encoder_new();

@ -15,6 +15,7 @@
#include "mix.h"
#include "resample.h"
#include "codeclib.h"
#include "streambuf.h"
int resample_audio;
@ -79,9 +80,10 @@ decode_t *decoder_new(const char *payload_str, output_t *outp) {
}
static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *op, void *mp, void *dp) {
metafile_t *metafile = mp;
output_t *output = op;
static int decoder_got_frame(decoder_t *dec, AVFrame *frame, void *sp, void *dp) {
ssrc_t *ssrc = sp;
metafile_t *metafile = ssrc->metafile;
output_t *output = ssrc->output;
decode_t *deco = dp;
dbg("got frame pts %llu samples %u contents %02x%02x%02x%02x...", (unsigned long long) frame->pts, frame->nb_samples,
@ -122,6 +124,39 @@ no_mix_out:
}
no_recording:
if (ssrc->tcp_fwd_stream) {
// XXX might be a second resampling to same format
AVFrame *dec_frame = resample_frame(&ssrc->tcp_fwd_resampler, frame, &ssrc->tcp_fwd_format);
if (!ssrc->tcp_fwd_poller.connected) {
int status = connect_socket_retry(&ssrc->tcp_fwd_sock);
if (status == 0) {
ssrc->tcp_fwd_poller.connected = 1;
ssrc->tcp_fwd_poller.blocked = 0;
}
else if (status < 0) {
ilog(LOG_ERR, "Failed to connect TCP socket: %s", strerror(errno));
streambuf_destroy(ssrc->tcp_fwd_stream);
ssrc->tcp_fwd_stream = NULL;
}
}
if (!ssrc->tcp_fwd_poller.connected && ssrc->tcp_fwd_poller.blocked) {
ssrc->tcp_fwd_poller.blocked = 0;
streambuf_writeable(ssrc->tcp_fwd_stream);
}
if (!ssrc->tcp_fwd_poller.intro) {
streambuf_write(ssrc->tcp_fwd_stream, metafile->metadata, strlen(metafile->metadata) + 1);
ssrc->tcp_fwd_poller.intro = 1;
}
streambuf_write(ssrc->tcp_fwd_stream, (char *) dec_frame->extended_data[0],
dec_frame->linesize[0]);
av_frame_free(&dec_frame);
}
av_frame_free(&frame);
return 0;
@ -131,8 +166,8 @@ err:
}
int decoder_input(decode_t *deco, const str *data, unsigned long ts, output_t *output, metafile_t *metafile) {
return decoder_input_data(deco->dec, data, ts, decoder_got_frame, output, metafile, deco);
int decoder_input(decode_t *deco, const str *data, unsigned long ts, ssrc_t *ssrc) {
return decoder_input_data(deco->dec, data, ts, decoder_got_frame, ssrc, deco);
}
void decoder_free(decode_t *deco) {

@ -9,7 +9,7 @@ extern int resample_audio;
decode_t *decoder_new(const char *payload_str, output_t *);
int decoder_input(decode_t *, const str *, unsigned long ts, output_t *, metafile_t *);
int decoder_input(decode_t *, const str *, unsigned long ts, ssrc_t *);
void decoder_free(decode_t *);

@ -70,11 +70,11 @@ static void signals(void) {
static void setup(void) {
log_init("rtpengine-recording");
if (decoding_enabled) {
socket_init();
if (decoding_enabled)
codeclib_init(0);
output_init(output_format);
}
if (output_enabled) {
output_init(output_format);
if (!g_file_test(output_dir, G_FILE_TEST_IS_DIR)) {
ilog(LOG_INFO, "Creating output dir '%s'", output_dir);
if (mkdir(output_dir, 0700))
@ -178,7 +178,7 @@ static void options(int *argc, char ***argv) {
output_enabled = 0;
if (output_mixed || output_single)
die("Output is disabled, but output-mixed or output-single is set");
if (!forward_to && !tcp_send_to_ep.address.family) {
if (!forward_to && !tcp_send_to_ep.port) {
//the daemon has no function
die("Both output and forwarding are disabled");
}
@ -186,7 +186,7 @@ static void options(int *argc, char ***argv) {
} else if (!output_mixed && !output_single)
output_mixed = output_single = 1;
if (output_enabled || tcp_send_to_ep.address.family)
if (output_enabled || tcp_send_to_ep.port)
decoding_enabled = 1;
if (!os_str || !strcmp(os_str, "file"))

@ -148,6 +148,8 @@ static void meta_section(metafile_t *mf, char *section, char *content, unsigned
tag_label(mf, lu, content);
else if (sscanf_match(section, "RECORDING %u", &u) == 1)
mf->recording_on = u;
else if (sscanf_match(section, "FORWARDING %u", &u) == 1)
mf->forwarding_on = u;
}

@ -12,8 +12,6 @@
static const codec_def_t *output_codec;
static const char *output_file_format;
static const codec_def_t *tcp_send_codec;
int mp3_bitrate;
@ -170,13 +168,6 @@ void output_close(output_t *output) {
void output_init(const char *format) {
str codec;
str_init(&codec, "PCM-S16LE");
tcp_send_codec = codec_find(&codec, MT_AUDIO);
assert(tcp_send_codec != NULL);
if (!format)
return;
if (!strcmp(format, "wav")) {
str_init(&codec, "PCM-S16LE");
output_file_format = "wav";

@ -13,6 +13,8 @@
#include "main.h"
#include "output.h"
#include "db.h"
#include "streambuf.h"
#include "resample.h"
static void packet_free(void *p) {
@ -30,6 +32,12 @@ void ssrc_free(void *p) {
output_close(s->output);
for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++)
decoder_free(s->decoders[i]);
if (s->tcp_fwd_stream) {
close_socket(&s->tcp_fwd_sock);
streambuf_destroy(s->tcp_fwd_stream);
s->tcp_fwd_stream = NULL;
resample_shutdown(&s->tcp_fwd_resampler);
}
g_slice_free1(sizeof(*s), s);
}
@ -49,18 +57,44 @@ static ssrc_t *ssrc_get(stream_t *stream, unsigned long ssrc) {
ret->ssrc = ssrc;
packet_sequencer_init(&ret->sequencer, packet_free);
char buf[256];
snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc);
if (output_single) {
ret->output = output_new(output_dir, buf);
db_do_stream(mf, ret->output, "single", stream, ssrc);
}
g_hash_table_insert(mf->ssrc_hash, GUINT_TO_POINTER(ssrc), ret);
out:
pthread_mutex_lock(&ret->lock);
pthread_mutex_unlock(&mf->lock);
if (mf->recording_on && !ret->output && output_single) {
char buf[256];
snprintf(buf, sizeof(buf), "%s-%08lx", mf->parent, ssrc);
ret->output = output_new(output_dir, buf);
db_do_stream(mf, ret->output, "single", stream, ssrc);
}
if (mf->forwarding_on && !ret->tcp_fwd_stream) {
ZERO(ret->tcp_fwd_poller);
int status = connect_socket_nb(&ret->tcp_fwd_sock, SOCK_STREAM, &tcp_send_to_ep);
if (status >= 0) {
ret->tcp_fwd_stream = streambuf_new(&ret->tcp_fwd_poller, ret->tcp_fwd_sock.fd);
if (status == 1)
ret->tcp_fwd_poller.blocked = 1;
else
ret->tcp_fwd_poller.connected = 1;
}
else
ilog(LOG_ERR, "Failed to open/connect TCP socket: %s", strerror(errno));
ret->tcp_fwd_format = (format_t) {
.clockrate = tcp_resample,
.channels = 1,
.format = AV_SAMPLE_FMT_S16,
};
}
else if (!mf->forwarding_on && ret->tcp_fwd_stream) {
// XXX same as above - unify
close_socket(&ret->tcp_fwd_sock);
streambuf_destroy(ret->tcp_fwd_stream);
ret->tcp_fwd_stream = NULL;
resample_shutdown(&ret->tcp_fwd_resampler);
}
return ret;
}
@ -103,7 +137,7 @@ static void packet_decode(ssrc_t *ssrc, packet_t *packet) {
}
if (decoder_input(ssrc->decoders[payload_type], &packet->payload, ntohl(packet->rtp->timestamp),
ssrc->output, ssrc->metafile))
ssrc))
ilog(LOG_ERR, "Failed to decode media packet");
}

@ -1,9 +1,10 @@
#include "poller.h"
void poller_blocked(struct poller *p, int fd) {
p->blocked = 1;
}
int poller_isblocked(struct poller *p, int fd) {
return 0;
return p->blocked ? 1 : 0;
}
void poller_error(struct poller *p, int fd) {
}

@ -2,7 +2,13 @@
#define __POLLER_H__
struct poller;
// dummy poller
struct poller {
int blocked:1;
int connected:1;
int error:1;
int intro:1;
};
void poller_blocked(struct poller *, int);
int poller_isblocked(struct poller *, int);

@ -12,12 +12,15 @@
#include <libavutil/audio_fifo.h>
#include "str.h"
#include "codeclib.h"
#include "poller.h"
#include "socket.h"
struct iphdr;
struct ip6_hdr;
struct udphdr;
struct rtp_header;
struct streambuf;
struct handler_s;
@ -75,6 +78,12 @@ struct ssrc_s {
packet_sequencer_t sequencer;
decode_t *decoders[128];
output_t *output;
format_t tcp_fwd_format;
resample_t tcp_fwd_resampler;
socket_t tcp_fwd_sock;
struct streambuf *tcp_fwd_stream;
struct poller tcp_fwd_poller;
};
typedef struct ssrc_s ssrc_t;
@ -114,6 +123,7 @@ struct metafile_s {
char *payload_types[128];
int recording_on:1;
int forwarding_on:1;
};

@ -54,7 +54,7 @@ static void do_test_amr_xx(const char *file, int line,
decoder_t *d = decoder_new_fmtp(def, clockrate, 1, &fmt, fmtp);
assert(d);
const str data = { data_s, data_len };
int ret = decoder_input_data(d, &data, 1, frame_cb, &expect_s, &expect_len, NULL);
int ret = decoder_input_data(d, &data, 1, frame_cb, &expect_s, &expect_len);
assert(!ret);
assert(expect_s == NULL);
decoder_close(d);

Loading…
Cancel
Save