Record raw RTP to recording files on the filesystem.

Pass in "record-call" flag over `rtpengine_offer` or `rtpengine_answer`
message. RTP Engine tracks files used to record pcaps and send them back
in the response message.

Pipes call audio (unencrypted from both ends) to recording files.
Sets up file descriptors for local files to dump RTP recordings.
A file and a file descriptor per monologue in a call.

Recorded streams will be running in user daemon mode, not in kernel mode.
This removes first 12 octets from packet to record just the rtp.
pull/245/head
Eric Green 10 years ago
parent 351bc106be
commit 70eb6b9e81

@ -2210,6 +2210,11 @@ void call_destroy(struct call *c) {
obj_put(sfd);
}
while (c->recording_pcaps) {
free(c->recording_pcaps->data);
c->recording_pcaps = g_slist_delete_link(c->recording_pcaps, c->recording_pcaps);
}
rwlock_unlock_w(&c->master_lock);
}
@ -2401,6 +2406,27 @@ struct call_monologue *__monologue_create(struct call *call) {
ret->call = call;
ret->created = poller_now;
ret->other_tags = g_hash_table_new(str_hash, str_equal);
if (call->record_call) {
char recording_path[15];
char logbuf[15];
/*
*
* create a file descriptor per monologue which can be used for writing rtp to disk
* aka call recording.
*/
sprintf(recording_path, "/tmp/%d", rand());
GSList *list = NULL;
call->recording_pcaps = g_slist_prepend(call->recording_pcaps, g_strdup(recording_path));
ilog(LOG_INFO, "xxegreen: path2 %s", call->recording_pcaps->data);
ilog(LOG_INFO, "XXXECT: Creating new file descriptor for recording at path %s", recording_path);
ret->recording_fd = open(recording_path, O_WRONLY | O_CREAT | O_TRUNC);
sprintf(logbuf, "%d", ret->recording_fd);
ilog(LOG_INFO, "XXXECT: FD created: %s", logbuf);
} else {
ret->recording_fd = -1;
}
g_queue_init(&ret->medias);
gettimeofday(&ret->started, NULL);
@ -2474,6 +2500,9 @@ static void __monologue_destroy(struct call_monologue *monologue) {
GList *l;
call = monologue->call;
/* XXXECT BEGIN */
close(monologue->recording_fd);
/* XXXECT END */
g_hash_table_remove(call->tags, &monologue->tag);

@ -146,7 +146,7 @@ enum call_type {
#define PS_FLAG_CONFIRMED 0x00200000
#define PS_FLAG_KERNELIZED 0x00400000
#define PS_FLAG_NO_KERNEL_SUPPORT 0x00800000
#define PS_FLAG_UNUSED 0x01000000
#define PS_FLAG_FORCE_DAEMON_MODE 0x01000000
#define PS_FLAG_FINGERPRINT_VERIFIED 0x02000000
#define PS_FLAG_STRICT_SOURCE SHARED_FLAG_STRICT_SOURCE
#define PS_FLAG_MEDIA_HANDOVER SHARED_FLAG_MEDIA_HANDOVER
@ -397,7 +397,7 @@ struct call_monologue {
enum termination_reason term_reason;
GHashTable *other_tags;
struct call_monologue *active_dialogue;
int recording_fd;
GQueue medias;
};
@ -413,14 +413,17 @@ struct call {
rwlock_t master_lock;
GQueue monologues;
GQueue medias;
GHashTable *tags;
GHashTable *tags;
GHashTable *viabranches;
GQueue streams;
GQueue stream_fds;
GQueue endpoint_maps;
struct dtls_cert *dtls_cert; /* for outgoing */
str callid;
unsigned int record_call;
GSList *recording_pcaps;
str callid;
time_t created;
time_t last_signal;
time_t deleted;

@ -647,7 +647,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
bencode_item_t *output, enum call_opmode opmode, const char* addr,
const endpoint_t *sin)
{
str sdp, fromtag, totag = STR_NULL, callid, viabranch;
str sdp, fromtag, totag = STR_NULL, callid, viabranch, recordcall = STR_NULL;
char *errstr;
GQueue parsed = G_QUEUE_INIT;
GQueue streams = G_QUEUE_INIT;
@ -670,6 +670,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
str_swap(&totag, &fromtag);
}
bencode_dictionary_get_str(input, "via-branch", &viabranch);
bencode_dictionary_get_str(input, "record-call", &recordcall);
if (sdp_parse(&sdp, &parsed))
return "Failed to parse SDP";
@ -686,6 +687,12 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
if (!call)
goto out;
if (recordcall.s && !str_cmp(&recordcall, "yes")) {
call->record_call = 1;
} else {
call->record_call = 0;
}
if (!call->created_from && addr) {
call->created_from = call_strdup(call, addr);
call->created_from_addr = sin->address;
@ -733,6 +740,13 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
bencode_dictionary_add_iovec(output, "sdp", &g_array_index(chopper->iov, struct iovec, 0),
chopper->iov_num, chopper->str_len);
bencode_dictionary_add_string(output, "result", "ok");
bencode_item_t *recordings = bencode_dictionary_add_list(output, "recordings");
GList *l;
char *recording_path;
for (l = call->recording_pcaps; l; l = l->next) {
ilog(LOG_INFO, "xxegreen: Recording path %s", l->data);
bencode_list_add_string(recordings, l->data);
}
errstr = NULL;
out:

@ -237,7 +237,7 @@ static int has_free_ports_loc(struct local_intf *loc, unsigned int num_ports) {
ilog(LOG_ERR, "has_free_ports_loc - NULL local interface");
return 0;
}
if (num_ports > loc->spec->port_pool.free_ports) {
ilog(LOG_ERR, "Didn't found %d ports available for %.*s/%s",
num_ports, loc->logical->name.len, loc->logical->name.s,
@ -297,7 +297,7 @@ static int has_free_ports_log_all(struct logical_intf *log, unsigned int num_por
return 1;
}
/* run round-robin-calls algorithm */
/* run round-robin-calls algorithm */
static struct logical_intf* run_round_robin_calls(GQueue *q, unsigned int num_ports) {
struct logical_intf *log = NULL;
volatile unsigned int nr_tries = 0;
@ -340,7 +340,7 @@ select_log:
// 2 streams => 4 x get_logical_interface calls at offer
selection_count ++;
if (selection_count % (num_ports / 2) == 0) {
selection_count = 0;
selection_count = 0;
selection_index ++;
selection_index = selection_index % nr_logs;
}
@ -834,6 +834,8 @@ void kernelize(struct packet_stream *stream) {
if (PS_ISSET(stream, KERNELIZED))
return;
if (PS_ISSET(stream, FORCE_DAEMON_MODE))
return;
if (cm->conf.kernelid < 0)
goto no_kernel;
nk_warn_msg = "interface to kernel module not open";
@ -938,6 +940,8 @@ void __unkernelize(struct packet_stream *p) {
return;
if (PS_ISSET(p, NO_KERNEL_SUPPORT))
return;
if (PS_ISSET(p, FORCE_DAEMON_MODE))
return;
if (p->call->callmaster->conf.kernelfd >= 0) {
__re_address_translate_ep(&rea, &p->selected_sfd->socket.local);
@ -1023,6 +1027,8 @@ static int stream_packet(struct stream_fd *sfd, str *s, const endpoint_t *fsin,
int ret = 0, update = 0, stun_ret = 0, handler_ret = 0, muxed_rtcp = 0, rtcp = 0,
unk = 0;
int i;
// XXEGREEN... This makes me nervous.
int recording_fd = sfd->stream->media->monologue->recording_fd;
struct call *call;
struct callmaster *cm;
/*unsigned char cc;*/
@ -1142,7 +1148,6 @@ loop_ok:
}
}
/* do we have somewhere to forward it to? */
if (!sink || !sink->selected_sfd || !out_srtp->selected_sfd || !in_srtp->selected_sfd) {
@ -1172,8 +1177,24 @@ loop_ok:
/* return values are: 0 = forward packet, -1 = error/dont forward,
* 1 = forward and push update to redis */
if (rwf_in)
if (rwf_in) {
handler_ret = rwf_in(s, in_srtp);
ilog(LOG_INFO, "xxegreen peer address as %s", endpoint_print_buf(fsin));
}
// This might be the hook that rfuchs might be referring to
// ilog(LOG_WARNING, "xxegreen0: %s", s->s);
// EGREEN: This is working pretty nicely but we need to remove the first 12 bytes from each packet that it is dumping
if (recording_fd && recording_fd != -1) {
// I am aware that we need to do better and that this is a naive approach
int writelen = (s->len)-12;
char towrite[writelen];
memcpy(towrite, &s->s[12], writelen);
write(recording_fd, towrite, writelen);
// EGREEN: This is going to happen for every packet. We need to do better
PS_SET(stream, FORCE_DAEMON_MODE);
}
if (handler_ret >= 0) {
if (rtcp)
parse_and_log_rtcp_report(sfd, s, fsin, tv);
@ -1296,6 +1317,7 @@ kernel_check:
kernelize(stream);
forward:
// ilog(LOG_INFO, "XXEGREENSTREAM: %s", s->s);
if (sink)
mutex_lock(&sink->out_lock);
@ -1306,6 +1328,8 @@ forward:
|| stun_ret || handler_ret < 0)
goto drop;
// s is my packet?
ilog(LOG_INFO, "XXEGREEN NOT");
ret = socket_sendto(&sink->selected_sfd->socket, s->s, s->len, &sink->endpoint);
__C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&sink->endpoint.address), sink->endpoint.port);
@ -1367,6 +1391,20 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
log_info_stream_fd(sfd);
/*
* I should be able to create a filedescriptor here for each stream and pass it
* to stream_packet. Each file descriptor should then receive one side of the
* rtp for each call. I fully expect some packets to get dropped using this
* naive method. If we are seeing problems in testing we could use a RAM disk.
*/
// var egreentmp1[15] = "/tmp" + rand();
// ilog(LOG_INFO, "XXEGREEN: Creting new file descriptor for recording %s", egreentmp1);
// int egreenFD = open(egreentmp1, O_WRONLY | O_CREAT | O_TRUNC);
// char egreentmp[15];
// sprintf(egreentmp, "%d", egreenFD);
// ilog(LOG_INFO, "XXEGREEN: FD created: %s", egreentmp);
for (iters = 0; ; iters++) {
#if MAX_RECV_ITERS
if (iters >= MAX_RECV_ITERS) {
@ -1434,7 +1472,7 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo
sfd->call = obj_get(call);
sfd->local_intf = lif;
g_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */
g_slice_free1(sizeof(*fd), fd); /* moved into sfd, thus free */
//sfd->recording_fd = recording_fd;
__C_DBG("stream_fd_new localport=%d", sfd->socket.local.port);

@ -64,6 +64,7 @@ struct stream_fd {
struct packet_stream *stream; /* LOCK: call->master_lock */
struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */
struct dtls_connection dtls; /* LOCK: stream->in_lock */
int recording_fd; /* XXEGREEN file descriptor to record rtp to */
};

Loading…
Cancel
Save