implement support for RFC 5761

git.mgm/mediaproxy-ng/github/master
Richard Fuchs 12 years ago
parent e87787af11
commit 43764b46d4

@ -89,6 +89,12 @@ struct call_stats {
struct stats totals[4]; /* rtp in, rtcp in, rtp out, rtcp out */
};
struct streamhandler {
int (*rewrite)(str *, struct streamrelay *);
int (*kernel_decrypt)(struct mediaproxy_srtp *, struct streamrelay *);
int (*kernel_encrypt)(struct mediaproxy_srtp *, struct streamrelay *);
};
static char *rtp_codecs[] = {
[0] = "G711u",
[1] = "1016",
@ -136,7 +142,7 @@ static int call_savp2avp_rtcp(str *s, struct streamrelay *r);
static int call_avpf2avp_rtcp(str *s, struct streamrelay *r);
static int call_avpf2savp_rtcp(str *s, struct streamrelay *r);
static int call_savpf2avp_rtcp(str *s, struct streamrelay *r);
static int call_savpf2savp_rtcp(str *s, struct streamrelay *r);
static int call_savpf2savp_rtcp(str *s, struct streamrelay *t);
static const struct streamhandler __sh_noop = {
.kernel_decrypt = __k_null,
@ -251,6 +257,7 @@ void kernelize(struct callstream *c) {
mpt.tos = cm->conf.tos;
mpt.src_addr.port = rp->fd.localport;
mpt.dst_addr.port = r->peer.port;
mpt.rtcp_mux = r->rtcp_mux;
if (IN6_IS_ADDR_V4MAPPED(&r->peer.ip46)) {
mpt.src_addr.family = AF_INET;
@ -290,6 +297,15 @@ no_kernel_stream:
/* returns: 0 = not a muxed stream, 1 = muxed, RTP, 2 = muxed, RTCP */
static int rtcp_demux(str *s, struct streamrelay *r) {
if (r->idx != 0)
return 0;
if (!r->rtcp_mux)
return 0;
return rtcp_demux_is_rtcp(s) ? 2 : 1;
}
static int call_avpf2avp_rtcp(str *s, struct streamrelay *r) {
return rtcp_avpf2avp(s);
}
@ -484,10 +500,10 @@ dummy:
/* called with r->up (== cs) locked */
static int stream_packet(struct streamrelay *sr_incoming, str *s, struct sockaddr_in6 *fsin) {
struct streamrelay *sr_outgoing, *sr_out_rtcp;
struct streamrelay *sr_outgoing, *sr_out_rtcp, *sr_in_rtcp;
struct peer *p_incoming, *p_outgoing;
struct callstream *cs_incoming;
int ret, update = 0, stun_ret = 0, handler_ret = 0;
int ret, update = 0, stun_ret = 0, handler_ret = 0, muxed_rtcp = 0;
struct sockaddr_in sin;
struct sockaddr_in6 sin6;
struct msghdr mh;
@ -529,9 +545,14 @@ static int stream_packet(struct streamrelay *sr_incoming, str *s, struct sockadd
return 0;
}
determine_handler(sr_incoming);
if (sr_incoming->handler->rewrite)
handler_ret = sr_incoming->handler->rewrite(s, sr_incoming);
sr_in_rtcp = sr_incoming;
muxed_rtcp = rtcp_demux(s, sr_incoming);
if (muxed_rtcp == 2)
sr_in_rtcp = &p_incoming->rtps[1];
determine_handler(sr_in_rtcp);
if (sr_in_rtcp->handler->rewrite)
handler_ret = sr_in_rtcp->handler->rewrite(s, sr_in_rtcp);
use_cand:
if (p_incoming->confirmed || !p_incoming->filled || sr_incoming->idx != 0)
@ -575,6 +596,17 @@ forward:
|| stun_ret || handler_ret)
goto drop;
if (muxed_rtcp == 2) {
/* demux */
sr_incoming = sr_in_rtcp;
sr_outgoing = sr_incoming->other;
}
else if (sr_incoming->idx == 1 && sr_outgoing->rtcp_mux) {
/* mux */
sr_incoming = &p_incoming->rtps[0];
sr_outgoing = sr_incoming->other;
}
ZERO(mh);
mh.msg_control = buf;
mh.msg_controllen = sizeof(buf);
@ -1307,6 +1339,7 @@ static int setup_peer(struct peer *p, struct stream_input *s, const str *tag) {
b->peer_advertised = b->peer;
a->rtcp = s->is_rtcp;
b->rtcp = 1;
a->other->rtcp_mux = s->rtcp_mux;
a->other->crypto.in = s->crypto;
b->other->crypto.in = s->crypto;

@ -85,6 +85,7 @@ struct stream_input {
struct crypto_context crypto;
int has_rtcp:1;
int is_rtcp:1;
int rtcp_mux:1;
};
struct udp_fd {
int fd;
@ -94,11 +95,7 @@ struct udp_fd {
struct streamrelay;
struct mediaproxy_srtp;
struct streamhandler {
int (*rewrite)(str *, struct streamrelay *);
int (*kernel_decrypt)(struct mediaproxy_srtp *, struct streamrelay *);
int (*kernel_encrypt)(struct mediaproxy_srtp *, struct streamrelay *);
};
struct streamhandler;
struct streamrelay {
struct udp_fd fd;
@ -114,6 +111,7 @@ struct streamrelay {
struct crypto_context_pair crypto;
int stun:1;
int rtcp:1;
int rtcp_mux:1;
int no_kernel_support:1;
};
struct relays_cache {

@ -458,3 +458,20 @@ error:
mylog(LOG_WARNING, "Discarded invalid SRTCP packet: %s", err);
return -1;
}
/* RFC 5761 section 4 */
int rtcp_demux_is_rtcp(const str *s) {
struct rtcp_packet *rtcp;
if (s->len < sizeof(*rtcp))
return 0;
rtcp = (void *) s->s;
if (rtcp->header.pt < 194)
return 0;
if (rtcp->header.pt > 223)
return 0;
return 1;
}

@ -25,6 +25,8 @@ int rtcp_avpf2avp(str *);
int rtcp_avp2savp(str *, struct crypto_context *);
int rtcp_savp2avp(str *, struct crypto_context *);
int rtcp_demux_is_rtcp(const str *);
#endif

@ -132,6 +132,7 @@ struct sdp_attribute {
ATTR_SENDRECV,
ATTR_SENDONLY,
ATTR_RECVONLY,
ATTR_RTCP_MUX,
} attr;
union {
@ -154,6 +155,10 @@ static str ice_foundation_str_alt;
static int has_rtcp(struct sdp_media *media);
static inline struct sdp_attribute *attr_get_by_id(struct sdp_attributes *a, int id) {
return g_hash_table_lookup(a->id_hash, &id);
}
@ -522,6 +527,8 @@ static int parse_attribute(struct sdp_attribute *a) {
case 'r':
if (!str_cmp(&a->name, "recvonly"))
a->attr = ATTR_RECVONLY;
if (!str_cmp(&a->name, "rtcp-mux"))
a->attr = ATTR_RTCP_MUX;
break;
}
break;
@ -831,9 +838,19 @@ int sdp_streams(const GQueue *sessions, GQueue *streams, GHashTable *streamhash,
if (!si || media->port_count != 1)
continue;
if (attr_get_by_id(&media->attributes, ATTR_RTCP_MUX)) {
si->rtcp_mux = 1;
continue;
}
attr = attr_get_by_id(&media->attributes, ATTR_RTCP);
if (!attr || !attr->u.rtcp.port_num)
continue;
if (attr->u.rtcp.port_num == si->stream.port) {
si->rtcp_mux = 1;
continue;
}
if (attr->u.rtcp.port_num == si->stream.port + 1)
continue;
@ -949,16 +966,24 @@ static int skip_over(struct sdp_chopper *chop, str *where) {
return 0;
}
static void fill_relays(struct streamrelay **rtp, struct streamrelay **rtcp, GList *m,
int off, struct stream_input *sip)
static int fill_relays(struct streamrelay **rtp, struct streamrelay **rtcp, GList *m,
int off, struct stream_input *sip, struct sdp_media *media)
{
*rtp = &((struct callstream *) m->data)->peers[off].rtps[0];
if (rtcp) {
*rtcp = &((struct callstream *) m->data)->peers[off].rtps[1];
if (sip && sip->has_rtcp && m->next)
*rtcp = &((struct callstream *) m->next->data)->peers[off].rtps[0];
}
if (!rtcp)
return 1;
*rtcp = &((struct callstream *) m->data)->peers[off].rtps[1];
if (sip && sip->has_rtcp && m->next)
*rtcp = &((struct callstream *) m->next->data)->peers[off].rtps[0];
if ((*rtp)->rtcp_mux)
return 2;
if (!has_rtcp(media))
return 3;
return 0;
}
static int replace_transport_protocol(struct sdp_chopper *chop,
@ -1009,7 +1034,7 @@ static int replace_consecutive_port_count(struct sdp_chopper *chop, struct sdp_m
m = m->next;
if (!m)
goto warn;
fill_relays(&sr, NULL, m, off, NULL);
fill_relays(&sr, NULL, m, off, NULL, media);
if (sr->fd.localport != rtp->fd.localport + cons * 2) {
warn:
mylog(LOG_WARN, "Failed to handle consecutive ports");
@ -1150,6 +1175,7 @@ static int process_media_attributes(struct sdp_chopper *chop, struct sdp_attribu
goto strip;
case ATTR_RTCP:
case ATTR_RTCP_MUX:
goto strip;
case ATTR_CRYPTO:
@ -1188,7 +1214,12 @@ static GList *find_stream_num(GList *m, int num) {
}
static int has_rtcp(struct sdp_media *media) {
struct sdp_session *session = media->session;
struct sdp_session *session;
if (!media)
return 0;
session = media->session;
if ((media->rr == -1 ? session->rr : media->rr) != 0
&& (media->rs == -1 ? session->rs : media->rs) != 0)
@ -1240,7 +1271,8 @@ static void insert_candidates(struct sdp_chopper *chop, struct streamrelay *rtp,
insert_ice_address(chop, rtp);
chopper_append_c(chop, " typ host\r\n");
if (has_rtcp(media)) {
if (rtcp) {
/* rtcp-mux only possible in answer */
chopper_append_c(chop, "a=candidate:");
chopper_append_str(chop, &ice_foundation_str);
chopper_append_printf(chop, " 2 UDP %lu ", priority - 1);
@ -1259,7 +1291,7 @@ static void insert_candidates_alt(struct sdp_chopper *chop, struct streamrelay *
insert_ice_address_alt(chop, rtp);
chopper_append_c(chop, " typ host\r\n");
if (has_rtcp(media)) {
if (rtcp) {
chopper_append_c(chop, "a=candidate:");
chopper_append_str(chop, &ice_foundation_str_alt);
chopper_append_printf(chop, " 2 UDP %lu ", priority - 1);
@ -1315,11 +1347,13 @@ static int generate_crypto(struct sdp_media *media, struct sdp_ng_flags *flags,
*c = *src;
mutex_unlock(&rtp->up->up->lock);
mutex_lock(&rtcp->up->up->lock);
c = &rtcp->crypto.out;
if (!c->crypto_suite)
*c = *src;
mutex_unlock(&rtcp->up->up->lock);
if (rtcp) {
mutex_lock(&rtcp->up->up->lock);
c = &rtcp->crypto.out;
if (!c->crypto_suite)
*c = *src;
mutex_unlock(&rtcp->up->up->lock);
}
return 0;
}
@ -1356,19 +1390,21 @@ static int generate_crypto(struct sdp_media *media, struct sdp_ng_flags *flags,
p, &state, &save);
p += g_base64_encode_close(0, p, &state, &save);
mutex_lock(&rtcp->up->up->lock);
if (rtcp) {
mutex_lock(&rtcp->up->up->lock);
src = c;
c = &rtcp->crypto.out;
src = c;
c = &rtcp->crypto.out;
c->crypto_suite = src->crypto_suite;
c->tag = src->tag;
memcpy(c->master_key, src->master_key,
c->crypto_suite->master_key_len);
memcpy(c->master_salt, src->master_salt,
c->crypto_suite->master_salt_len);
c->crypto_suite = src->crypto_suite;
c->tag = src->tag;
memcpy(c->master_key, src->master_key,
c->crypto_suite->master_key_len);
memcpy(c->master_salt, src->master_salt,
c->crypto_suite->master_salt_len);
mutex_unlock(&rtcp->up->up->lock);
mutex_unlock(&rtcp->up->up->lock);
}
chopper_append_c(chop, "a=crypto:");
chopper_append_printf(chop, "%u ", c->tag);
@ -1387,7 +1423,7 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call *call,
struct sdp_session *session;
struct sdp_media *media;
GList *l, *k, *m;
int off, do_ice;
int off, do_ice, r_flags;
struct stream_input si, *sip;
struct streamrelay *rtp, *rtcp;
unsigned long priority;
@ -1399,7 +1435,7 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call *call,
for (l = sessions->head; l; l = l->next) {
session = l->data;
fill_relays(&rtp, &rtcp, m, off, NULL);
fill_relays(&rtp, &rtcp, m, off, NULL, NULL);
if (session->origin.parsed && flags->replace_origin) {
if (replace_network_address(chop, &session->origin.address, rtp))
@ -1430,7 +1466,7 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call *call,
m = find_stream_num(m, sip->stream.num);
if (!m)
goto error;
fill_relays(&rtp, &rtcp, m, off, sip);
r_flags = fill_relays(&rtp, &rtcp, m, off, sip, media);
rtp->peer.protocol = flags->transport_protocol;
rtcp->peer.protocol = rtp->peer.protocol;
@ -1458,11 +1494,16 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call *call,
continue;
}
if (has_rtcp(media)) {
if (r_flags == 0) {
chopper_append_c(chop, "a=rtcp:");
chopper_append_printf(chop, "%hu", rtcp->fd.localport);
chopper_append_c(chop, "\r\n");
}
else if (r_flags == 2) {
chopper_append_c(chop, "a=rtcp:");
chopper_append_printf(chop, "%hu", rtp->fd.localport);
chopper_append_c(chop, "\r\na=rtcp-mux\r\n");
}
generate_crypto(media, flags, rtp, rtcp, chop);
@ -1494,11 +1535,13 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call *call,
if (!flags->ice_remove) {
priority = new_priority(flags->ice_force ? NULL : media);
insert_candidates(chop, rtp, rtcp, priority, media);
insert_candidates(chop, rtp, (r_flags == 2) ? NULL : rtcp,
priority, media);
if (callmaster_has_ipv6(rtp->up->up->call->callmaster)) {
priority -= 256;
insert_candidates_alt(chop, rtp, rtcp, priority, media);
insert_candidates_alt(chop, rtp, (r_flags == 2) ? NULL : rtcp,
priority, media);
}
}
}

@ -1951,6 +1951,14 @@ static inline int srtp_decrypt(struct mp_crypto_context *c,
return c->cipher->decrypt(c, s, r, pkt_idx);
}
static inline int is_muxed_rtcp(struct rtp_parsed *r) {
if (r->header->m_pt < 194)
return 0;
if (r->header->m_pt > 223)
return 0;
return 1;
}
static unsigned int mediaproxy46(struct sk_buff *skb, struct mediaproxy_table *t) {
struct udphdr *uh;
struct mediaproxy_target *g;
@ -1999,7 +2007,9 @@ not_stun:
g->decrypt.cipher->name);
if (parse_rtp(&rtp, skb))
goto not_rtp;
goto skip1;
if (g->target.rtcp_mux && is_muxed_rtcp(&rtp))
goto skip1;
pkt_idx = packet_index(&g->decrypt, &g->target.decrypt, rtp.header);
if (srtp_auth_validate(&g->decrypt, &g->target.decrypt, &rtp, pkt_idx))
goto skip_error;
@ -2015,7 +2025,6 @@ not_stun:
rtp.payload[12], rtp.payload[13], rtp.payload[14], rtp.payload[15],
rtp.payload[16], rtp.payload[17], rtp.payload[18], rtp.payload[19]);
not_rtp:
if (g->target.mirror_addr.family) {
DBG("sending mirror packet to dst "MIPF"\n", MIPP(g->target.mirror_addr));
skb2 = skb_copy(skb, GFP_ATOMIC);
@ -2051,6 +2060,7 @@ skip_error:
spin_lock_irqsave(&g->stats_lock, flags);
g->stats.errors++;
spin_unlock_irqrestore(&g->stats_lock, flags);
skip1:
target_push(g);
skip2:
kfree_skb(skb);

@ -65,6 +65,7 @@ struct mediaproxy_target_info {
struct mediaproxy_srtp encrypt;
unsigned char tos;
int rtcp_mux:1;
};
struct mediaproxy_message {

@ -15,7 +15,7 @@ use MIME::Base64;
my ($NUM, $RUNTIME, $STREAMS, $PAYLOAD, $INTERVAL, $RTCP_INTERVAL, $STATS_INTERVAL)
= (1000, 30, 1, 160, 20, 5, 5);
my ($NODEL, $IP, $IPV6, $KEEPGOING, $REINVITES, $BRANCHES, $PROTOS, $DEST, $SUITES, $NOENC);
my ($NODEL, $IP, $IPV6, $KEEPGOING, $REINVITES, $BRANCHES, $PROTOS, $DEST, $SUITES, $NOENC, $RTCPMUX);
GetOptions(
'no-delete' => \$NODEL,
'num-calls=i' => \$NUM,
@ -34,6 +34,7 @@ GetOptions(
'stats-interval=i'=>\$STATS_INTERVAL,
'suites=s' => \$SUITES,
'no-encrypt' => \$NOENC,
'rtcp-mux' => \$RTCPMUX,
) or die;
($IP || $IPV6) or die("at least one of --local-ip or --local-ipv6 must be given");
@ -508,8 +509,20 @@ sub do_rtp {
$rtcp or next;
($payload, $expect) = $$trans[$a]{rtcp_func}($$trans[$b], $tcx, $tcx_o);
$dst = $$pr{sockaddr}($$outputs[$b][$j][0] + 1, $addr);
$repl = send_receive($$cfds[$a][$j], $$cfds[$b][$j], $payload, $dst);
my $dstport = $$outputs[$b][$j][0] + 1;
my $sendfd = $$cfds[$a][$j];
my $expfd = $$cfds[$b][$j];
if ($RTCPMUX && !$a) {
if (!$a) {
$dstport--;
$sendfd = $$fds[$a][$j];
}
else {
$expfd = $$fds[$b][$j];
}
}
$dst = $$pr{sockaddr}($dstport, $addr);
$repl = send_receive($sendfd, $expfd, $payload, $dst);
$NOENC and $repl = $expect;
$repl eq $expect or die hexdump($repl, $expect) . " $$trans[$a]{name} > $$trans[$b]{name}";
}
@ -663,6 +676,13 @@ m=audio $p $$tr{name} 8
a=rtpmap:8 PCMA/8000
a=rtcp:$cp
!
if ($RTCPMUX && !$i) {
$sdp .= "a=rtcp-mux\n";
rand() >= .5 and $sdp .= "a=rtcp:$p\n";
}
else {
$sdp .= "a=rtcp:$cp\n";
}
$$tr{sdp_media_params} and $sdp .= $$tr{sdp_media_params}($tcx);
}
$i or print("transport is $$tr{name} -> $$tr_o{name}\n");
@ -682,6 +702,7 @@ a=rtcp:$cp
$$o{result} eq 'ok' or die;
my ($rp_af, $rp_add) = $$o{sdp} =~ /c=IN IP([46]) (\S+)/s or die;
$RTCPMUX && $i and ($$o{sdp} =~ /a=rtcp-mux/s or die);
my @rp_ports = $$o{sdp} =~ /m=audio (\d+) \Q$$tr_o{name}\E /gs or die;
$rp_af ne $$pr_o{reply} and die "incorrect address family reply code";
my $rpl_a = $$c{outputs} || ($$c{outputs} = []);

Loading…
Cancel
Save