Merge branch 'add-recording-forwarding' of https://github.com/1and1/rtpengine

pull/420/head
Richard Fuchs 8 years ago
commit fb6575eefb

@ -747,9 +747,9 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
chopper = sdp_chopper_new(&sdp);
bencode_buffer_destroy_add(output->buffer, (free_func_t) sdp_chopper_destroy, chopper);
detect_setup_recording(call, &flags.record_call_str);
detect_setup_recording(call, &flags.record_call_str, &flags.metadata);
if (flags.record_call)
recording_start(call, NULL);
recording_start(call, NULL, &flags.metadata);
ret = monologue_offer_answer(monologue, &streams, &flags);
if (!ret)
@ -763,7 +763,8 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
meta_write_sdp_after(recording, sdp_iov, chopper->iov_num, chopper->str_len,
monologue, opmode);
if (flags.metadata.len) {
//only add METADATA chunk if value is changed
if (flags.metadata.len && str_cmp_str(&flags.metadata, &recording->metadata)) {
call_str_cpy(call, &recording->metadata, &flags.metadata);
recording_meta_chunk(recording, "METADATA", &flags.metadata);
}
@ -1192,14 +1193,16 @@ const char *call_list_ng(bencode_item_t *input, struct callmaster *m, bencode_it
const char *call_start_recording_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) {
str callid;
struct call *call;
str metadata;
if (!bencode_dictionary_get_str(input, "call-id", &callid))
return "No call-id in message";
bencode_dictionary_get_str(input, "metadata", &metadata);
call = call_get_opmode(&callid, m, OP_OTHER);
if (!call)
return "Unknown call-id";
recording_start(call, NULL);
recording_start(call, NULL, &metadata);
rwlock_unlock_w(&call->master_lock);
obj_put(call);

@ -214,7 +214,7 @@ static int pcap_create_spool_dir(const char *spoolpath) {
}
// lock must be held
void recording_start(struct call *call, const char *prefix) {
void recording_start(struct call *call, const char *prefix, str *metadata) {
if (call->recording) // already active
return;
@ -236,6 +236,9 @@ void recording_start(struct call *call, const char *prefix) {
}
else
recording->meta_prefix = strdup(prefix);
if (metadata->len) {
call_str_cpy(call, &recording->metadata, metadata);
}
_rm(init_struct, call);
@ -269,12 +272,12 @@ void recording_stop(struct call *call) {
*
* Returns a boolean for whether or not the call is being recorded.
*/
void detect_setup_recording(struct call *call, const str *recordcall) {
void detect_setup_recording(struct call *call, const str *recordcall, str *metadata) {
if (!recordcall || !recordcall->s)
return;
if (!str_cmp(recordcall, "yes") || !str_cmp(recordcall, "on"))
recording_start(call, NULL);
recording_start(call, NULL, metadata);
else if (!str_cmp(recordcall, "no") || !str_cmp(recordcall, "off"))
recording_stop(call);
else
@ -655,6 +658,8 @@ static void proc_init(struct call *call) {
append_meta_chunk_str(recording, &call->callid, "CALL-ID");
append_meta_chunk_s(recording, recording->meta_prefix, "PARENT");
if (recording->metadata.len)
recording_meta_chunk(recording, "METADATA", &recording->metadata);
}
static void sdp_before_proc(struct recording *recording, const str *sdp, struct call_monologue *ml,

@ -113,9 +113,9 @@ void recording_fs_init(const char *spooldir, const char *method, const char *for
*
* Returns a boolean for whether or not the call is being recorded.
*/
void detect_setup_recording(struct call *call, const str *recordcall);
void detect_setup_recording(struct call *call, const str *recordcall, str *metadata);
void recording_start(struct call *call, const char *);
void recording_start(struct call *call, const char *prefix, str *metadata);
void recording_stop(struct call *call);

@ -1440,7 +1440,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str *
struct redis_hash call;
struct redis_list tags, sfds, streams, medias, maps;
struct call *c = NULL;
str s, id;
str s, id, meta;
const char *err = 0;
int i;
@ -1547,10 +1547,8 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str *
// presence of this key determines whether we were recording at all
if (!redis_hash_get_str(&s, &call, "recording_meta_prefix")) {
recording_start(c, s.s);
if ((c->recording) && (!redis_hash_get_str(&s, &call, "recording_metadata")))
call_str_cpy(c, &c->recording->metadata, &s);
redis_hash_get_str(&meta, &call, "recording_metadata");
recording_start(c, s.s, &meta);
}
err = NULL;

@ -25,7 +25,7 @@ LDFLAGS+= `mysql_config --libs`
include ../lib/lib.Makefile
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \
decoder.c output.c mix.c resample.c db.c log.c
decoder.c output.c mix.c resample.c db.c log.c forward.c
LIBSRCS= loglib.c auxlib.c rtplib.c
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o)

@ -0,0 +1,71 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <fcntl.h>
#include "forward.h"
#include "main.h"
#include "log.h"
void start_forwarding_capture(metafile_t *mf, char *meta_info) {
int sock;
struct sockaddr_un addr;
if (mf->forward_fd >= 0) {
ilog(LOG_INFO, "Connection already established");
return;
}
if ((sock = socket(AF_UNIX, SOCK_SEQPACKET, 0)) == -1) {
ilog(LOG_ERR, "Error creating socket: %s", strerror(errno));
return;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, forward_to, sizeof(addr.sun_path) - 1);
if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
ilog(LOG_ERR, "Error setting socket non-blocking: %s", strerror(errno));
goto err;
}
if (connect(sock, (struct sockaddr*) &addr, sizeof(addr)) == -1) {
ilog(LOG_ERR, "Error connecting to socket %s : %s", addr.sun_path,
strerror(errno));
goto err;
}
if (send(sock, meta_info, strlen(meta_info), 0) == -1) {
ilog(LOG_ERR, "Error sending meta info: %s. Call will not be forwarded", strerror(errno));
goto err;
}
mf->forward_fd = sock;
return;
err:
close(sock);
}
int forward_packet(metafile_t *mf, unsigned char *buf, unsigned len) {
if (mf->forward_fd == -1) {
ilog(LOG_ERR,
"Trying to send packets, but connection not initialized!");
goto err;
}
if (send(mf->forward_fd, buf, len, 0) == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
ilog(LOG_DEBUG, "Dropping packet since call would block");
else
ilog(LOG_ERR, "Error sending: %s", strerror(errno));
goto err;
}
free(buf);
return 0;
err:
free(buf);
return -1;
}

@ -0,0 +1,9 @@
#ifndef _FORWARD_H_
#define _FORWARD_H_
#include "types.h"
void start_forwarding_capture(metafile_t *mf, char *meta_info);
int forward_packet(metafile_t *mf, unsigned char *buf, unsigned len);
#endif

@ -22,6 +22,7 @@
#include "auxlib.h"
#include "decoder.h"
#include "output.h"
#include "forward.h"
@ -32,12 +33,13 @@ const char *output_dir = "/var/lib/rtpengine-recording";
static const char *output_format = "wav";
int output_mixed;
int output_single;
int output_enabled = 1;
const char *c_mysql_host,
*c_mysql_user,
*c_mysql_pass,
*c_mysql_db;
int c_mysql_port;
const char *forward_to = NULL;
static GQueue threads = G_QUEUE_INIT; // only accessed from main thread
@ -90,23 +92,25 @@ static void avlog_ilog(void *ptr, int loglevel, const char *fmt, va_list ap) {
static void setup(void) {
log_init("rtpengine-recording");
av_register_all();
avcodec_register_all();
avfilter_register_all();
avformat_network_init();
if (output_enabled) {
av_register_all();
avcodec_register_all();
avfilter_register_all();
avformat_network_init();
av_log_set_callback(avlog_ilog);
output_init(output_format);
if (!g_file_test(output_dir, G_FILE_TEST_IS_DIR)) {
ilog(LOG_INFO, "Creating output dir '%s'", output_dir);
if (mkdir(output_dir, 0700))
die_errno("Failed to create output dir '%s'");
}
}
mysql_library_init(0, NULL, NULL);
signals();
metafile_setup();
epoll_setup();
inotify_setup();
av_log_set_callback(avlog_ilog);
output_init(output_format);
if (!g_file_test(output_dir, G_FILE_TEST_IS_DIR)) {
ilog(LOG_INFO, "Creating output dir '%s'", output_dir);
if (mkdir(output_dir, 0700))
die_errno("Failed to create output dir '%s'");
}
}
@ -167,7 +171,7 @@ static void options(int *argc, char ***argv) {
{ "spool-dir", 0, 0, G_OPTION_ARG_STRING, &spool_dir, "Directory containing rtpengine metadata files", "PATH" },
{ "num-threads", 0, 0, G_OPTION_ARG_INT, &num_threads, "Number of worker threads", "INT" },
{ "output-dir", 0, 0, G_OPTION_ARG_STRING, &output_dir, "Where to write media files to", "PATH" },
{ "output-format", 0, 0, G_OPTION_ARG_STRING, &output_format, "Write audio files of this type", "wav|mp3" },
{ "output-format", 0, 0, G_OPTION_ARG_STRING, &output_format, "Write audio files of this type", "wav|mp3|none" },
{ "resample-to", 0, 0, G_OPTION_ARG_INT, &resample_audio,"Resample all output audio", "INT" },
{ "mp3-bitrate", 0, 0, G_OPTION_ARG_INT, &mp3_bitrate, "Bits per second for MP3 encoding", "INT" },
{ "output-mixed", 0, 0, G_OPTION_ARG_NONE, &output_mixed, "Mix participating sources into a single output",NULL },
@ -177,13 +181,22 @@ static void options(int *argc, char ***argv) {
{ "mysql-user", 0, 0, G_OPTION_ARG_STRING, &c_mysql_user, "MySQL connection credentials", "USERNAME" },
{ "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &c_mysql_pass, "MySQL connection credentials", "PASSWORD" },
{ "mysql-db", 0, 0, G_OPTION_ARG_STRING, &c_mysql_db, "MySQL database name", "STRING" },
{ "forward-to", 0, 0, G_OPTION_ARG_STRING, &forward_to, "Where to forward to (unix socket)", "PATH" },
{ NULL, }
};
config_load(argc, argv, e, " - rtpengine recording daemon",
"/etc/rtpengine/rtpengine-recording.conf", "rtpengine-recording");
if (!output_mixed && !output_single)
if (!strcmp(output_format, "none")) {
output_enabled = 0;
if (output_mixed || output_single)
die("Output is disabled, but output-mixed or output-single is set");
if (!forward_to) {
//the daemon has no function
die("Both output and packet forwarding are disabled");
}
} else if (!output_mixed && !output_single)
output_mixed = output_single = 1;
}

@ -8,11 +8,13 @@ extern const char *spool_dir;
extern const char *output_dir;
extern int output_mixed;
extern int output_single;
extern int output_enabled;
extern const char *c_mysql_host,
*c_mysql_user,
*c_mysql_pass,
*c_mysql_db;
extern int c_mysql_port;
extern const char *forward_to;
extern volatile int shutdown_flag;

@ -15,7 +15,7 @@
#include "output.h"
#include "mix.h"
#include "db.h"
#include "forward.h"
static pthread_mutex_t metafiles_lock = PTHREAD_MUTEX_INITIALIZER;
static GHashTable *metafiles;
@ -25,8 +25,10 @@ static void meta_free(void *ptr) {
metafile_t *mf = ptr;
dbg("freeing metafile info for %s", mf->name);
output_close(mf->mix_out);
mix_destroy(mf->mix);
if (output_enabled) {
output_close(mf->mix_out);
mix_destroy(mf->mix);
}
g_string_chunk_free(mf->gsc);
for (int i = 0; i < mf->streams->len; i++) {
stream_t *stream = g_ptr_array_index(mf->streams, i);
@ -34,7 +36,8 @@ static void meta_free(void *ptr) {
stream_free(stream);
}
g_ptr_array_free(mf->streams, TRUE);
g_hash_table_destroy(mf->ssrc_hash);
if (output_enabled)
g_hash_table_destroy(mf->ssrc_hash);
g_slice_free1(sizeof(*mf), mf);
}
@ -48,6 +51,14 @@ static void meta_destroy(metafile_t *mf) {
stream_close(stream);
pthread_mutex_unlock(&stream->lock);
}
//close forward socket
if (mf->forward_fd >= 0) {
dbg("call [%s] forwarded %d packets. %d failed sends.", mf->call_id,
(int )g_atomic_int_get(&mf->forward_count),
(int )g_atomic_int_get(&mf->forward_failed));
close(mf->forward_fd);
mf->forward_fd = -1;
}
db_close_call(mf);
}
@ -55,17 +66,17 @@ static void meta_destroy(metafile_t *mf) {
// mf is locked
static void meta_stream_interface(metafile_t *mf, unsigned long snum, char *content) {
db_do_call(mf);
pthread_mutex_lock(&mf->mix_lock);
if (!mf->mix && output_mixed) {
char buf[256];
snprintf(buf, sizeof(buf), "%s-mix", mf->parent);
mf->mix_out = output_new(output_dir, buf);
mf->mix = mix_new();
db_do_stream(mf, mf->mix_out, "mixed", 0, 0);
if (output_enabled) {
pthread_mutex_lock(&mf->mix_lock);
if (!mf->mix && output_mixed) {
char buf[256];
snprintf(buf, sizeof(buf), "%s-mix", mf->parent);
mf->mix_out = output_new(output_dir, buf);
mf->mix = mix_new();
db_do_stream(mf, mf->mix_out, "mixed", 0, 0);
}
pthread_mutex_unlock(&mf->mix_lock);
}
pthread_mutex_unlock(&mf->mix_lock);
dbg("stream %lu interface %s", snum, content);
stream_open(mf, snum, content);
}
@ -87,10 +98,12 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i
ilog(LOG_ERR, "Payload type number %u is invalid", payload_num);
return;
}
pthread_mutex_lock(&mf->payloads_lock);
mf->payload_types[payload_num] = g_string_chunk_insert(mf->gsc, payload_type);
pthread_mutex_unlock(&mf->payloads_lock);
if (output_enabled) {
pthread_mutex_lock(&mf->payloads_lock);
mf->payload_types[payload_num] = g_string_chunk_insert(mf->gsc,
payload_type);
pthread_mutex_unlock(&mf->payloads_lock);
}
}
@ -98,6 +111,8 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i
static void meta_metadata(metafile_t *mf, char *content) {
mf->metadata = g_string_chunk_insert(mf->gsc, content);
db_do_call(mf);
if (forward_to)
start_forwarding_capture(mf, content);
}
@ -134,10 +149,16 @@ static metafile_t *metafile_get(char *name) {
mf->gsc = g_string_chunk_new(0);
mf->name = g_string_chunk_insert(mf->gsc, name);
pthread_mutex_init(&mf->lock, NULL);
pthread_mutex_init(&mf->payloads_lock, NULL);
pthread_mutex_init(&mf->mix_lock, NULL);
mf->streams = g_ptr_array_new();
mf->ssrc_hash = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_free);
mf->forward_fd = -1;
mf->forward_count = 0;
mf->forward_failed = 0;
if (output_enabled) {
pthread_mutex_init(&mf->payloads_lock, NULL);
pthread_mutex_init(&mf->mix_lock, NULL);
mf->ssrc_hash = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_free);
}
g_hash_table_insert(metafiles, mf->name, mf);

@ -10,6 +10,7 @@
#include "log.h"
#include "main.h"
#include "packet.h"
#include "forward.h"
#define MAXBUFLEN 65535
@ -67,7 +68,14 @@ static void stream_handler(handler_t *handler) {
// got a packet
pthread_mutex_unlock(&stream->lock);
packet_process(stream, buf, ret);
if (output_enabled)
packet_process(stream, buf, ret);
if (forward_to){
if (forward_packet(stream->metafile,buf,ret))
g_atomic_int_inc(&stream->metafile->forward_failed);
else
g_atomic_int_inc(&stream->metafile->forward_count);
}
log_info_call = NULL;
log_info_stream = NULL;
return;

@ -101,6 +101,10 @@ struct metafile_s {
mix_t *mix;
output_t *mix_out;
int forward_fd;
volatile gint forward_count;
volatile gint forward_failed;
pthread_mutex_t payloads_lock;
char *payload_types[128];
};

Loading…
Cancel
Save