TT#91151 split input from output legs in kernel module

Change-Id: Ic8269f5a506c560d4460f562fb545efe0ea6a809
pull/1295/head
Richard Fuchs 5 years ago
parent ecb0da8e4a
commit 01a10c0379

@ -642,13 +642,15 @@ static void call_timer(void *ptr) {
sink = packet_stream_sink(ps);
if (!ke->target.non_forwarding && diff_packets) {
if (sink) {
// only check the first
struct rtpengine_output_info *o = &ke->outputs[0];
if (sink && o->src_addr.family) {
mutex_lock(&sink->out_lock);
if (sink->crypto.params.crypto_suite && sink->ssrc_out
&& ntohl(ke->target.ssrc) == sink->ssrc_out->parent->h.ssrc
&& ke->target.encrypt.last_index - sink->ssrc_out->srtp_index > 0x4000)
&& o->encrypt.last_index - sink->ssrc_out->srtp_index > 0x4000)
{
sink->ssrc_out->srtp_index = ke->target.encrypt.last_index;
sink->ssrc_out->srtp_index = o->encrypt.last_index;
update = 1;
}
mutex_unlock(&sink->out_lock);

@ -129,7 +129,7 @@ int kernel_add_stream(struct rtpengine_target_info *mti) {
if (!kernel.is_open)
return -1;
msg.cmd = REMG_ADD;
msg.cmd = REMG_ADD_TARGET;
msg.u.target = *mti;
// coverity[uninit_use_in_call : FALSE]
@ -141,6 +141,25 @@ int kernel_add_stream(struct rtpengine_target_info *mti) {
return -1;
}
int kernel_add_destination(struct rtpengine_destination_info *mdi) {
struct rtpengine_message msg;
int ret;
if (!kernel.is_open)
return -1;
msg.cmd = REMG_ADD_DESTINATION;
msg.u.destination = *mdi;
// coverity[uninit_use_in_call : FALSE]
ret = write(kernel.fd, &msg, sizeof(msg));
if (ret > 0)
return 0;
ilog(LOG_ERROR, "Failed to push relay stream destination to kernel: %s", strerror(errno));
return -1;
}
int kernel_del_stream(const struct re_address *a) {
struct rtpengine_message msg;
@ -150,7 +169,7 @@ int kernel_del_stream(const struct re_address *a) {
return -1;
ZERO(msg);
msg.cmd = REMG_DEL;
msg.cmd = REMG_DEL_TARGET;
msg.u.target.local = *a;
ret = write(kernel.fd, &msg, sizeof(msg));

@ -1113,6 +1113,7 @@ static int __rtp_stats_pt_sort(const void *ap, const void *bp) {
/* called with in_lock held */
void kernelize(struct packet_stream *stream) {
struct rtpengine_target_info reti;
struct rtpengine_destination_info redi;
struct call *call = stream->call;
struct packet_stream *sink = NULL;
const char *nk_warn_msg;
@ -1167,6 +1168,7 @@ void kernelize(struct packet_stream *stream) {
goto no_kernel_warn;
ZERO(reti);
ZERO(redi);
if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
mutex_lock(&stream->out_lock);
@ -1181,7 +1183,8 @@ void kernelize(struct packet_stream *stream) {
mutex_lock(&sink->out_lock);
__re_address_translate_ep(&reti.local, &stream->selected_sfd->socket.local);
reti.tos = call->tos;
redi.local = reti.local;
redi.output.tos = call->tos;
reti.rtcp_mux = MEDIA_ISSET(media, RTCP_MUX);
reti.dtls = MEDIA_ISSET(media, DTLS);
reti.stun = media->ice_agent ? 1 : 0;
@ -1189,23 +1192,26 @@ void kernelize(struct packet_stream *stream) {
reti.blackhole = MEDIA_ISSET(media, BLACKHOLE) ? 1 : 0;
reti.rtp_stats = (MEDIA_ISSET(media, RTCP_GEN) || (mqtt_publish_scope() != MPS_NONE)) ? 1 : 0;
__re_address_translate_ep(&reti.dst_addr, &sink->endpoint);
__re_address_translate_ep(&reti.src_addr, &sink->selected_sfd->socket.local);
reti.num_destinations = 1;
redi.num = 0;
__re_address_translate_ep(&redi.output.dst_addr, &sink->endpoint);
__re_address_translate_ep(&redi.output.src_addr, &sink->selected_sfd->socket.local);
if (stream->ssrc_in) {
reti.ssrc = htonl(stream->ssrc_in->parent->h.ssrc);
if (MEDIA_ISSET(media, TRANSCODE) || MEDIA_ISSET(media, ECHO)) {
reti.ssrc_out = htonl(stream->ssrc_in->ssrc_map_out);
redi.output.ssrc_out = htonl(stream->ssrc_in->ssrc_map_out);
reti.transcoding = 1;
}
}
stream->handler->in->kernel(&reti.decrypt, stream);
stream->handler->out->kernel(&reti.encrypt, sink);
stream->handler->out->kernel(&redi.output.encrypt, sink);
mutex_unlock(&sink->out_lock);
nk_warn_msg = "encryption cipher or HMAC not supported by kernel module";
if (!reti.encrypt.cipher || !reti.encrypt.hmac)
if (!redi.output.encrypt.cipher || !redi.output.encrypt.hmac)
goto no_kernel_warn;
nk_warn_msg = "decryption cipher or HMAC not supported by kernel module";
if (!reti.decrypt.cipher || !reti.decrypt.hmac)
@ -1244,6 +1250,7 @@ void kernelize(struct packet_stream *stream) {
recording_stream_kernel_info(stream, &reti);
kernel_add_stream(&reti);
kernel_add_destination(&redi);
PS_SET(stream, KERNELIZED);
return;

@ -16,6 +16,7 @@
struct rtpengine_target_info;
struct rtpengine_destination_info;
struct re_address;
struct rtpengine_ssrc_stats;
@ -34,6 +35,7 @@ extern struct kernel_interface kernel;
int kernel_setup_table(unsigned int);
int kernel_add_stream(struct rtpengine_target_info *);
int kernel_add_destination(struct rtpengine_destination_info *);
int kernel_del_stream(const struct re_address *);
GList *kernel_list(void);
int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out);

@ -285,6 +285,10 @@ struct rtpengine_rtp_stats_a {
atomic64_t packets;
atomic64_t bytes;
};
struct rtpengine_output {
struct rtpengine_output_info output;
struct re_crypto_context encrypt;
};
struct rtpengine_target {
atomic_t refcnt;
uint32_t table;
@ -296,7 +300,10 @@ struct rtpengine_target {
struct rtpengine_ssrc_stats ssrc_stats;
struct re_crypto_context decrypt;
struct re_crypto_context encrypt;
rwlock_t outputs_lock;
struct rtpengine_output *outputs;
unsigned int outputs_unfilled; // only ever decreases
};
struct re_bitfield {
@ -853,6 +860,8 @@ static void free_crypto_context(struct re_crypto_context *c) {
}
static void target_put(struct rtpengine_target *t) {
unsigned int i;
if (!t)
return;
@ -862,8 +871,12 @@ static void target_put(struct rtpengine_target *t) {
DBG("Freeing target\n");
free_crypto_context(&t->decrypt);
free_crypto_context(&t->encrypt);
if (t->outputs) {
for (i = 0; i < t->target.num_destinations; i++)
free_crypto_context(&t->outputs[i].encrypt);
kfree(t->outputs);
}
kfree(t);
}
@ -1364,7 +1377,8 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
uint32_t id;
struct rtpengine_table *t;
struct rtpengine_list_entry *opp;
int err, port, addr_bucket, i;
int err, port, addr_bucket;
unsigned int i;
struct rtpengine_target *g;
unsigned long flags;
@ -1409,9 +1423,18 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
opp->target.decrypt.last_index = g->target.decrypt.last_index;
spin_unlock_irqrestore(&g->decrypt.lock, flags);
spin_lock_irqsave(&g->encrypt.lock, flags);
opp->target.encrypt.last_index = g->target.encrypt.last_index;
spin_unlock_irqrestore(&g->encrypt.lock, flags);
_r_lock(&g->outputs_lock, flags);
if (!g->outputs_unfilled) {
_r_unlock(&g->outputs_lock, flags);
for (i = 0; i < g->target.num_destinations; i++) {
struct rtpengine_output *o = &g->outputs[i];
spin_lock_irqsave(&o->encrypt.lock, flags);
opp->outputs[i] = o->output;
spin_unlock_irqrestore(&o->encrypt.lock, flags);
}
}
else
_r_unlock(&g->outputs_lock, flags);
target_put(g);
@ -1568,24 +1591,30 @@ static void proc_list_crypto_print(struct seq_file *f, struct re_crypto_context
}
if (c->hmac && c->hmac->id != REH_NULL) {
if (!hdr++)
seq_printf(f, " SRTP %s parameters:\n", label);
seq_printf(f, " HMAC: %s\n", c->hmac->name ? : "<invalid>");
seq_printf(f, " SRTP %s parameters:\n", label);
seq_printf(f, " HMAC: %s\n", c->hmac->name ? : "<invalid>");
seq_printf(f, " auth tag length: %u\n", s->auth_tag_len);
}
}
static int proc_list_show(struct seq_file *f, void *v) {
struct rtpengine_target *g = v;
int i;
unsigned int i;
unsigned long flags;
seq_printf(f, "local ");
seq_addr_print(f, &g->target.local);
seq_printf(f, "\n");
if (!g->target.non_forwarding) {
proc_list_addr_print(f, "src", &g->target.src_addr);
proc_list_addr_print(f, "dst", &g->target.dst_addr);
// all outputs filled?
_r_lock(&g->outputs_lock, flags);
if (g->outputs_unfilled) {
seq_printf(f, " outputs not fully filled (%u missing)\n", g->outputs_unfilled);
_r_unlock(&g->outputs_lock, flags);
goto out;
}
proc_list_addr_print(f, "mirror", &g->target.mirror_addr);
_r_unlock(&g->outputs_lock, flags);
proc_list_addr_print(f, "expect", &g->target.expected_src);
if (g->target.src_mismatch > 0 && g->target.src_mismatch <= ARRAY_SIZE(re_msm_strings))
seq_printf(f, " src mismatch action: %s\n", re_msm_strings[g->target.src_mismatch]);
@ -1599,11 +1628,8 @@ static int proc_list_show(struct seq_file *f, void *v) {
(unsigned long long) atomic64_read(&g->rtp_stats[i].bytes),
(unsigned long long) atomic64_read(&g->rtp_stats[i].packets));
if (g->target.ssrc)
seq_printf(f, " SSRC in: %lx\n", (unsigned long) ntohl(g->target.ssrc));
if (g->target.ssrc_out)
seq_printf(f, " SSRC out: %lx\n", (unsigned long) ntohl(g->target.ssrc_out));
proc_list_crypto_print(f, &g->decrypt, &g->target.decrypt, "decryption (incoming)");
proc_list_crypto_print(f, &g->encrypt, &g->target.encrypt, "encryption (outgoing)");
seq_printf(f, " SSRC in: %lx\n", (unsigned long) ntohl(g->target.ssrc));
proc_list_crypto_print(f, &g->decrypt, &g->target.decrypt, "decryption");
if (g->target.rtcp_mux)
seq_printf(f, " option: rtcp-mux\n");
if (g->target.dtls)
@ -1619,8 +1645,18 @@ static int proc_list_show(struct seq_file *f, void *v) {
if (g->target.rtp_stats)
seq_printf(f, " option: RTP stats\n");
target_put(g);
for (i = 0; i < g->target.num_destinations; i++) {
struct rtpengine_output *o = &g->outputs[i];
seq_printf(f, " output #%u\n", i);
proc_list_addr_print(f, "src", &o->output.src_addr);
proc_list_addr_print(f, "dst", &o->output.dst_addr);
if (o->output.ssrc_out)
seq_printf(f, " SSRC out: %lx\n", (unsigned long) ntohl(o->output.ssrc_out));
proc_list_crypto_print(f, &o->encrypt, &o->output.encrypt, "encryption");
}
out:
target_put(g);
return 0;
}
@ -2200,24 +2236,18 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
if (!is_valid_address(&i->local))
return -EINVAL;
if (i->num_destinations > MAX_FORWARD_DESTINATIONS)
return -EINVAL;
if (!i->non_forwarding) {
if (!is_valid_address(&i->src_addr))
return -EINVAL;
if (!is_valid_address(&i->dst_addr))
return -EINVAL;
if (i->src_addr.family != i->dst_addr.family)
if (!i->num_destinations)
return -EINVAL;
}
if (i->mirror_addr.family) {
if (!is_valid_address(&i->mirror_addr))
return -EINVAL;
if (i->mirror_addr.family != i->src_addr.family)
else {
if (i->num_destinations)
return -EINVAL;
}
if (validate_srtp(&i->decrypt))
return -EINVAL;
if (validate_srtp(&i->encrypt))
return -EINVAL;
DBG("Creating new target\n");
@ -2231,17 +2261,21 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
g->table = t->id;
atomic_set(&g->refcnt, 1);
spin_lock_init(&g->decrypt.lock);
spin_lock_init(&g->encrypt.lock);
memcpy(&g->target, i, sizeof(*i));
crypto_context_init(&g->decrypt, &g->target.decrypt);
crypto_context_init(&g->encrypt, &g->target.encrypt);
spin_lock_init(&g->ssrc_stats_lock);
g->ssrc_stats.lost_bits = -1;
rwlock_init(&g->outputs_lock);
if (i->num_destinations) {
err = -ENOMEM;
g->outputs = kzalloc(sizeof(*g->outputs) * i->num_destinations, GFP_KERNEL);
if (!g->outputs)
goto fail2;
g->outputs_unfilled = i->num_destinations;
}
err = gen_session_keys(&g->decrypt, &g->target.decrypt);
if (err)
goto fail2;
err = gen_session_keys(&g->encrypt, &g->target.encrypt);
if (err)
goto fail2;
@ -2335,11 +2369,67 @@ fail4:
if (ba)
kfree(ba);
fail2:
if (g->outputs)
kfree(g->outputs);
kfree(g);
fail1:
return err;
}
static int table_add_destination(struct rtpengine_table *t, struct rtpengine_destination_info *i) {
unsigned long flags;
int err;
struct rtpengine_target *g;
// validate input
if (!is_valid_address(&i->output.src_addr))
return -EINVAL;
if (!is_valid_address(&i->output.dst_addr))
return -EINVAL;
if (i->output.src_addr.family != i->output.dst_addr.family)
return -EINVAL;
if (validate_srtp(&i->output.encrypt))
return -EINVAL;
g = get_target(t, &i->local);
if (!g)
return -ENOENT;
// ready to fill in
_w_lock(&g->outputs_lock, flags);
if (!g->outputs_unfilled)
panic("BUG num of unfilled outputs %u", g->outputs_unfilled);
// out of range entry?
err = -ERANGE;
if (i->num >= g->target.num_destinations)
goto out;
// already filled?
err = -EEXIST;
if (g->outputs[i->num].output.src_addr.family)
goto out;
spin_lock_init(&g->outputs[i->num].encrypt.lock);
crypto_context_init(&g->outputs[i->num].encrypt, &i->output.encrypt);
err = gen_session_keys(&g->outputs[i->num].encrypt, &i->output.encrypt);
if (err)
goto out;
g->outputs[i->num].output = i->output;
g->outputs_unfilled--;
err = 0;
out:
_w_unlock(&g->outputs_lock, flags);
target_put(g);
return err;
}
@ -3312,16 +3402,16 @@ static inline ssize_t proc_control_read_write(struct file *file, char __user *ub
err = -ERANGE;
break;
case REMG_ADD:
case REMG_ADD_TARGET:
err = table_new_target(t, &msg->u.target);
break;
case REMG_DEL:
case REMG_DEL_TARGET:
err = table_del_target(t, &msg->u.target.local);
break;
case REMG_UPDATE:
err = -EOPNOTSUPP;
case REMG_ADD_DESTINATION:
err = table_add_destination(t, &msg->u.destination);
break;
case REMG_GET_STATS:
@ -4220,11 +4310,14 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t,
int rtp_pt_idx = -2;
unsigned int datalen, pllen;
uint32_t *u32;
struct rtp_parsed rtp;
struct rtp_parsed rtp, rtp2;
ssize_t offset;
uint64_t pkt_idx;
struct re_stream *stream;
struct re_stream_packet *packet;
const char *errstr = NULL;
unsigned long flags;
unsigned int i;
#if (RE_HAS_MEASUREDELAY)
uint64_t starttime, endtime, delay;
@ -4248,6 +4341,15 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t,
if (!g)
goto skip2;
// all our outputs filled?
_r_lock(&g->outputs_lock, flags);
if (g->outputs_unfilled) {
// pass to application
_r_unlock(&g->outputs_lock, flags);
goto skip1;
}
_r_unlock(&g->outputs_lock, flags);
DBG("target found, src "MIPF" -> dst "MIPF"\n", MIPP(g->target.src_addr), MIPP(g->target.dst_addr));
DBG("target decrypt hmac and cipher are %s and %s", g->decrypt.hmac->name,
g->decrypt.cipher->name);
@ -4338,15 +4440,6 @@ src_check_ok:
rtp.payload[16], rtp.payload[17], rtp.payload[18], rtp.payload[19]);
not_rtp:
if (g->target.mirror_addr.family) {
DBG("sending mirror packet to dst "MIPF"\n", MIPP(g->target.mirror_addr));
skb2 = skb_copy_expand(skb, MAX_HEADER, MAX_SKB_TAIL_ROOM, GFP_ATOMIC);
err = send_proxy_packet(skb2, &g->target.src_addr, &g->target.mirror_addr, g->target.tos,
par);
if (err)
atomic64_inc(&g->stats.errors);
}
if (g->target.do_intercept) {
DBG("do_intercept is set\n");
stream = get_stream_lock(NULL, g->target.intercept_stream_idx);
@ -4368,29 +4461,49 @@ intercept_done:
}
no_intercept:
if (rtp.ok) {
// SSRC substitution
if (g->target.transcoding && g->target.ssrc_out)
rtp.header->ssrc = g->target.ssrc_out;
pkt_idx = packet_index(&g->encrypt, &g->target.encrypt, rtp.header);
pllen = rtp.payload_len;
srtp_encrypt(&g->encrypt, &g->target.encrypt, &rtp, pkt_idx);
srtp_authenticate(&g->encrypt, &g->target.encrypt, &rtp, pkt_idx);
skb_put(skb, rtp.payload_len - pllen);
}
// output
for (i = 0; i < g->target.num_destinations; i++) {
struct rtpengine_output *o = &g->outputs[i];
// do we need a copy?
if (i == (g->target.num_destinations - 1))
skb2 = skb; // last iteration - use original
else {
// make copy
skb2 = skb_copy_expand(skb, MAX_HEADER, MAX_SKB_TAIL_ROOM, GFP_ATOMIC);
if (!skb2) {
log_err("out of memory while creating skb copy");
atomic64_inc(&g->stats.errors);
continue;
}
}
// adjust RTP pointers
offset = skb2->data - skb->data;
rtp2 = rtp;
rtp2.header = (void *) (((char *) rtp2.header) + offset);
rtp2.payload = (void *) (((char *) rtp2.payload) + offset);
if (rtp2.ok) {
// SSRC substitution
if (g->target.transcoding && o->output.ssrc_out)
rtp2.header->ssrc = o->output.ssrc_out;
pkt_idx = packet_index(&o->encrypt, &o->output.encrypt, rtp2.header);
pllen = rtp2.payload_len;
srtp_encrypt(&o->encrypt, &o->output.encrypt, &rtp2, pkt_idx);
srtp_authenticate(&o->encrypt, &o->output.encrypt, &rtp2, pkt_idx);
skb_put(skb2, rtp2.payload_len - pllen);
}
err = send_proxy_packet(skb, &g->target.src_addr, &g->target.dst_addr, g->target.tos, par);
err = send_proxy_packet(skb2, &o->output.src_addr, &o->output.dst_addr, o->output.tos, par);
if (err)
atomic64_inc(&g->stats.errors);
}
if (atomic64_read(&g->stats.packets)==0)
atomic_set(&g->stats.in_tos,in_tos);
if (err)
atomic64_inc(&g->stats.errors);
else {
atomic64_inc(&g->stats.packets);
atomic64_add(datalen, &g->stats.bytes);
}
atomic64_inc(&g->stats.packets);
atomic64_add(datalen, &g->stats.bytes);
if (rtp_pt_idx >= 0) {
atomic64_inc(&g->rtp_stats[rtp_pt_idx].packets);

@ -4,6 +4,7 @@
#define NUM_PAYLOAD_TYPES 16
#define MAX_FORWARD_DESTINATIONS 32
@ -95,23 +96,16 @@ struct rtpengine_target_info {
struct re_address local;
struct re_address expected_src; /* for incoming packets */
enum rtpengine_src_mismatch src_mismatch;
struct re_address src_addr; /* for outgoing packets */
struct re_address dst_addr;
struct re_address mirror_addr;
unsigned int num_destinations;
unsigned int intercept_stream_idx;
struct rtpengine_srtp decrypt;
struct rtpengine_srtp encrypt;
uint32_t ssrc; // Expose the SSRC to userspace when we resync.
uint32_t ssrc_out; // Rewrite SSRC
unsigned char payload_types[NUM_PAYLOAD_TYPES]; /* must be sorted */
uint32_t clock_rates[NUM_PAYLOAD_TYPES];
unsigned int num_payload_types;
unsigned char tos;
unsigned int rtcp_mux:1,
dtls:1,
stun:1,
@ -124,6 +118,22 @@ struct rtpengine_target_info {
rtp_stats:1; // requires SSRC and clock_rates to be set
};
struct rtpengine_output_info {
struct re_address src_addr; /* for outgoing packets */
struct re_address dst_addr;
struct rtpengine_srtp encrypt;
uint32_t ssrc_out; // Rewrite SSRC
unsigned char tos;
};
struct rtpengine_destination_info {
struct re_address local;
unsigned int num;
struct rtpengine_output_info output;
};
struct rtpengine_call_info {
unsigned int call_idx;
char call_id[256];
@ -158,9 +168,11 @@ struct rtpengine_message {
REMG_NOOP = 1,
/* target_info: */
REMG_ADD,
REMG_DEL,
REMG_UPDATE, // obsolete - not supported
REMG_ADD_TARGET,
REMG_DEL_TARGET,
/* destination_info: */
REMG_ADD_DESTINATION,
/* call_info: */
REMG_ADD_CALL,
@ -183,6 +195,7 @@ struct rtpengine_message {
union {
struct rtpengine_noop_info noop;
struct rtpengine_target_info target;
struct rtpengine_destination_info destination;
struct rtpengine_call_info call;
struct rtpengine_stream_info stream;
struct rtpengine_packet_info packet;
@ -196,6 +209,7 @@ struct rtpengine_list_entry {
struct rtpengine_target_info target;
struct rtpengine_stats stats;
struct rtpengine_rtp_stats rtp_stats[NUM_PAYLOAD_TYPES];
struct rtpengine_output_info outputs[MAX_FORWARD_DESTINATIONS];
};

Loading…
Cancel
Save