From c2ff5c3fd3821c74395d594c99d2069c651fc3fd Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 12 Feb 2015 12:10:15 -0500 Subject: [PATCH] MT#9935 understand, learn, remember and report RTP payload types (codecs) Squashed commit of the following: commit cca40e8e311e6884204289687ba2a05d0855720b Author: Richard Fuchs Date: Thu Feb 12 10:17:51 2015 -0500 support per-payload-type stats in kernel module commit dcc0dc0002bd552ae7c99aa58311af2f81336a8f Author: Richard Fuchs Date: Wed Feb 11 12:14:44 2015 -0500 count unknown rtp type as error commit 941bde0df59720d1d3ef6660096cf2532a5c7e1c Author: Richard Fuchs Date: Wed Feb 11 12:01:55 2015 -0500 use the list of rtp formats from the m= line This avoids dynamically altering the rtpstats hash table and makes keeping packet stats lock free. commit 9150fed671d8490f4c09fb3050002c7c558391df Author: Richard Fuchs Date: Fri Feb 6 15:06:04 2015 -0500 fix and simplify rtpmap hash table commit 1f73741cbf2ac7d6b8d0a54d9562e9a550678e7c Author: Richard Fuchs Date: Fri Feb 6 13:56:07 2015 -0500 MT#9935 understand, learn, remember and report RTP payload types (codecs) commit b0d690837c02989485cf73927a89ed860a299807 Author: Richard Fuchs Date: Thu Nov 20 13:36:25 2014 -0500 parse a=rtpmap attribute --- daemon/aux.h | 5 + daemon/call.c | 205 +++++++++++++++++++++++++++++++++- daemon/call.h | 18 +++ daemon/call_interfaces.c | 21 ++-- daemon/rtcp.c | 8 +- daemon/rtp.c | 64 ++++++++++- daemon/rtp.h | 11 ++ daemon/sdp.c | 206 ++++++++++++++++++++++++++++------- daemon/str.c | 4 + daemon/str.h | 4 + kernel-module/xt_RTPENGINE.c | 47 +++++++- kernel-module/xt_RTPENGINE.h | 15 +++ tests/simulator-ng.pl | 3 +- 13 files changed, 555 insertions(+), 56 deletions(-) diff --git a/daemon/aux.h b/daemon/aux.h index 80bf36c4b..51aab5bc3 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -86,6 +86,11 @@ INLINE void g_queue_truncate(GQueue *q, unsigned int len) { while (q->length > len) g_queue_pop_tail(q); } +INLINE void g_queue_clear_full(GQueue *q, GDestroyNotify free_func) { + void *p; + while ((p = g_queue_pop_head(q))) + free_func(p); +} INLINE void strmove(char **d, char **s) { diff --git a/daemon/call.c b/daemon/call.c index a26715a4c..1436f61a9 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -77,42 +77,49 @@ const struct transport_protocol transport_protocols[] = { [PROTO_RTP_AVP] = { .index = PROTO_RTP_AVP, .name = "RTP/AVP", + .rtp = 1, .srtp = 0, .avpf = 0, }, [PROTO_RTP_SAVP] = { .index = PROTO_RTP_SAVP, .name = "RTP/SAVP", + .rtp = 1, .srtp = 1, .avpf = 0, }, [PROTO_RTP_AVPF] = { .index = PROTO_RTP_AVPF, .name = "RTP/AVPF", + .rtp = 1, .srtp = 0, .avpf = 1, }, [PROTO_RTP_SAVPF] = { .index = PROTO_RTP_SAVPF, .name = "RTP/SAVPF", + .rtp = 1, .srtp = 1, .avpf = 1, }, [PROTO_UDP_TLS_RTP_SAVP] = { .index = PROTO_UDP_TLS_RTP_SAVP, .name = "UDP/TLS/RTP/SAVP", + .rtp = 1, .srtp = 1, .avpf = 0, }, [PROTO_UDP_TLS_RTP_SAVPF] = { .index = PROTO_UDP_TLS_RTP_SAVPF, .name = "UDP/TLS/RTP/SAVPF", + .rtp = 1, .srtp = 1, .avpf = 1, }, [PROTO_UDPTL] = { .index = PROTO_UDPTL, .name = "udptl", + .rtp = 0, .srtp = 0, .avpf = 0, }, @@ -354,6 +361,16 @@ INLINE void __re_address_translate(struct re_address *o, const struct endpoint * o->port = ep->port; } +static int __rtp_stats_pt_sort(const void *ap, const void *bp) { + const struct rtp_stats *a = ap, *b = bp; + + if (a->payload_type < b->payload_type) + return -1; + if (a->payload_type > b->payload_type) + return 1; + return 0; +} + /* called with in_lock held */ void kernelize(struct packet_stream *stream) { struct rtpengine_target_info reti; @@ -438,6 +455,23 @@ void kernelize(struct packet_stream *stream) { ZERO(stream->kernel_stats); + if (stream->media->protocol && stream->media->protocol->rtp) { + GList *values, *l; + struct rtp_stats *rs; + + reti.rtp = 1; + values = g_hash_table_get_values(stream->rtp_stats); + values = g_list_sort(values, __rtp_stats_pt_sort); + for (l = values; l; l = l->next) { + if (reti.num_payload_types >= G_N_ELEMENTS(reti.payload_types)) { + ilog(LOG_WARNING, "Too many RTP payload types for kernel module"); + break; + } + rs = l->data; + reti.payload_types[reti.num_payload_types++] = rs->payload_type; + } + } + kernel_add_stream(cm->conf.kernelfd, &reti, 0); PS_SET(stream, KERNELIZED); @@ -588,6 +622,7 @@ void stream_msg_mh_src(struct packet_stream *ps, struct msghdr *mh) { } } +/* XXX split this function into pieces */ /* called lock-free */ static int stream_packet(struct stream_fd *sfd, str *s, struct sockaddr_in6 *fsin, struct in6_addr *dst) { struct packet_stream *stream, @@ -608,6 +643,8 @@ static int stream_packet(struct stream_fd *sfd, str *s, struct sockaddr_in6 *fsi struct endpoint endpoint; rewrite_func rwf_in, rwf_out; struct interface_address *loc_addr; + struct rtp_header *rtp_h; + struct rtp_stats *rtp_s; call = sfd->call; cm = call->callmaster; @@ -626,6 +663,9 @@ static int stream_packet(struct stream_fd *sfd, str *s, struct sockaddr_in6 *fsi if (!stream->sfd) goto done; + + /* demux other protocols running on this port */ + if (MEDIA_ISSET(media, DTLS) && is_dtls(s)) { ret = dtls(stream, s, fsin); if (!ret) @@ -672,6 +712,9 @@ loop_ok: mutex_unlock(&stream->in_lock); + + /* demux RTCP */ + in_srtp = stream; sink = stream->rtp_sink; if (!sink && PS_ISSET(stream, RTCP)) { @@ -690,6 +733,29 @@ loop_ok: if (rtcp && sink && sink->rtcp_sibling) out_srtp = sink->rtcp_sibling; + + /* stats per RTP payload type */ + + if (media->protocol && media->protocol->rtp && !rtcp && !rtp_payload(&rtp_h, NULL, s)) { + i = (rtp_h->m_pt & 0x7f); + + rtp_s = g_hash_table_lookup(stream->rtp_stats, &i); + if (!rtp_s) { + ilog(LOG_WARNING | LOG_FLAG_LIMIT, + "RTP packet with unknown payload type %u received", i); + atomic64_inc(&stream->stats.errors); + atomic64_inc(&cm->statsps.errors); + } + + else { + atomic64_inc(&rtp_s->packets); + atomic64_add(&rtp_s->bytes, s->len); + } + } + + + /* do we have somewhere to forward it to? */ + if (!sink || !sink->sfd || !out_srtp->sfd || !in_srtp->sfd) { ilog(LOG_WARNING, "RTP packet from %s discarded", addr); atomic64_inc(&stream->stats.errors); @@ -697,6 +763,9 @@ loop_ok: goto unlock_out; } + + /* transcoding stuff, in and out */ + mutex_lock(&in_srtp->in_lock); determine_handler(in_srtp, sink); @@ -1303,6 +1372,8 @@ static void callmaster_timer(void *ptr) { struct stats tmpstats; int j, update; struct stream_fd *sfd; + struct rtp_stats *rs; + unsigned int pt; ZERO(hlp); @@ -1345,6 +1416,21 @@ static void callmaster_timer(void *ptr) { atomic64_set(&ps->kernel_stats.packets, ke->stats.packets); atomic64_set(&ps->kernel_stats.errors, ke->stats.errors); + for (j = 0; j < ke->target.num_payload_types; j++) { + pt = ke->target.payload_types[j]; + rs = g_hash_table_lookup(ps->rtp_stats, &pt); + if (!rs) + continue; + if (ke->rtp_stats[j].packets > atomic64_get(&rs->packets)) + atomic64_add(&rs->packets, + ke->rtp_stats[j].packets - atomic64_get(&rs->packets)); + if (ke->rtp_stats[j].bytes > atomic64_get(&rs->bytes)) + atomic64_add(&rs->bytes, + ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes)); + atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets); + atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); + } + update = 0; sink = packet_stream_sink(ps); @@ -1588,6 +1674,10 @@ fail: return -1; } +static void __payload_type_free(void *p) { + g_slice_free1(sizeof(struct rtp_payload_type), p); +} + static struct call_media *__get_media(struct call_monologue *ml, GList **it, const struct stream_params *sp) { struct call_media *med; @@ -1613,6 +1703,7 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con med->call = ml->call; med->index = sp->index; call_str_cpy(ml->call, &med->type, &sp->type); + med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free); g_queue_push_tail(&ml->medias, med); *it = ml->medias.tail; @@ -1751,6 +1842,10 @@ static int __wildcard_endpoint_map(struct call_media *media, unsigned int num_po return 0; } +static void __rtp_stats_free(void *p) { + g_slice_free1(sizeof(struct rtp_stats), p); +} + struct packet_stream *__packet_stream_new(struct call *call) { struct packet_stream *stream; @@ -1759,6 +1854,8 @@ struct packet_stream *__packet_stream_new(struct call *call) { mutex_init(&stream->out_lock); stream->call = call; atomic64_set_na(&stream->last_packet, poller_now); + stream->rtp_stats = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __rtp_stats_free); + call->streams = g_slist_prepend(call->streams, stream); return stream; @@ -1822,6 +1919,32 @@ static int __init_stream(struct packet_stream *ps) { return 0; } +static void __rtp_stats_update(GHashTable *dst, GHashTable *src) { + struct rtp_stats *rs; + struct rtp_payload_type *pt; + GList *values, *l; + + /* "src" is a call_media->rtp_payload_types table, while "dst" is a + * packet_stream->rtp_stats table */ + + values = g_hash_table_get_values(src); + + for (l = values; l; l = l->next) { + pt = l->data; + rs = g_hash_table_lookup(dst, &pt->payload_type); + if (rs) + continue; + + rs = g_slice_alloc0(sizeof(*rs)); + rs->payload_type = pt->payload_type; + g_hash_table_insert(dst, &rs->payload_type, rs); + } + + g_list_free(values); + + /* we leave previously added but now removed payload types in place */ +} + static int __init_streams(struct call_media *A, struct call_media *B, const struct stream_params *sp) { GList *la, *lb; struct packet_stream *a, *ax, *b; @@ -1837,7 +1960,9 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru /* RTP */ a->rtp_sink = b; - PS_SET(a, RTP); + PS_SET(a, RTP); /* XXX technically not correct, could be udptl too */ + + __rtp_stats_update(a->rtp_stats, A->rtp_payload_types); if (sp) { __fill_stream(a, &sp->rtp_endpoint, port_off); @@ -2199,6 +2324,19 @@ static void __dtls_logic(const struct sdp_ng_flags *flags, struct call_media *me MEDIA_SET(other_media, DTLS); } +static void __rtp_payload_types(struct call_media *media, GQueue *types) { + struct rtp_payload_type *pt; + struct call *call = media->call; + + /* we steal the entire list to avoid duplicate allocs */ + while ((pt = g_queue_pop_head(types))) { + /* but we must duplicate the contents */ + call_str_cpy(call, &pt->encoding, &pt->encoding); + call_str_cpy(call, &pt->encoding_parameters, &pt->encoding_parameters); + g_hash_table_replace(media->rtp_payload_types, &pt->payload_type, pt); + } +} + /* called with call->master_lock held in W */ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, const struct sdp_ng_flags *flags) @@ -2266,6 +2404,8 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, if (other_media->sdes_in.params.crypto_suite) MEDIA_SET(other_media, SDES); + __rtp_payload_types(media, &sp->rtp_payload_types); + /* send and recv are from our POV */ bf_copy_same(&media->media_flags, &sp->sp_flags, SP_FLAG_SEND | SP_FLAG_RECV); @@ -2422,6 +2562,47 @@ static void timeval_totalstats_average_add(struct totalstats *s, const struct ti mutex_unlock(&s->total_average_lock); } +static int __rtp_stats_sort(const void *ap, const void *bp) { + const struct rtp_stats *a = ap, *b = bp; + + /* descending order */ + if (atomic64_get(&a->packets) > atomic64_get(&b->packets)) + return -1; + if (atomic64_get(&a->packets) < atomic64_get(&b->packets)) + return 1; + return 0; +} + +static const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m) { + struct packet_stream *ps; + GList *values; + struct rtp_stats *rtp_s; + const struct rtp_payload_type *rtp_pt = NULL; + + /* we only use the primary packet stream for the time being */ + if (!m->streams.head) + return NULL; + + ps = m->streams.head->data; + + values = g_hash_table_get_values(ps->rtp_stats); + if (!values) + return NULL; + + values = g_list_sort(values, __rtp_stats_sort); + + /* payload type with the most packets */ + rtp_s = values->data; + if (atomic64_get(&rtp_s->packets) == 0) + goto out; + + rtp_pt = rtp_payload_type(rtp_s->payload_type, m->rtp_payload_types); + +out: + g_list_free(values); + return rtp_pt; /* may be NULL */ +} + /* called lock-free, but must hold a reference to the call */ void call_destroy(struct call *c) { struct callmaster *m = c->callmaster; @@ -2439,6 +2620,7 @@ void call_destroy(struct call *c) { char* cdrbufcur = cdrbuffer; int cdrlinecnt = 0; int found = 0; + const struct rtp_payload_type *rtp_pt; rwlock_lock_w(&m->hashlock); ret = g_hash_table_remove(m->callhash, &c->callid); @@ -2492,6 +2674,22 @@ void call_destroy(struct call *c) { for (k = ml->medias.head; k; k = k->next) { md = k->data; + rtp_pt = __rtp_stats_codec(md); +#define MLL_PREFIX "------ Media #%u ("STR_FORMAT" over %s) using " /* media log line prefix */ +#define MLL_COMMON /* common args */ \ + md->index, \ + STR_FMT(&md->type), \ + md->protocol ? md->protocol->name : "(unknown)" + if (!rtp_pt) + ilog(LOG_INFO, MLL_PREFIX "unknown codec", MLL_COMMON); + else if (!rtp_pt->encoding_parameters.s) + ilog(LOG_INFO, MLL_PREFIX ""STR_FORMAT"/%u", MLL_COMMON, + STR_FMT(&rtp_pt->encoding), rtp_pt->clock_rate); + else + ilog(LOG_INFO, MLL_PREFIX ""STR_FORMAT"/%u/"STR_FORMAT"", MLL_COMMON, + STR_FMT(&rtp_pt->encoding), rtp_pt->clock_rate, + STR_FMT(&rtp_pt->encoding_parameters)); + for (o = md->streams.head; o; o = o->next) { ps = o->data; @@ -2523,9 +2721,8 @@ void call_destroy(struct call *c) { atomic64_get(&ps->last_packet)); } - ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, " + ilog(LOG_INFO, "--------- Port %5u <> %15s:%-5hu%s, " ""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet", - md->index, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), addr, ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", @@ -2751,6 +2948,7 @@ static void __call_free(void *p) { g_queue_clear(&em->sfds); g_slice_free1(sizeof(*em), em); } + g_hash_table_destroy(md->rtp_payload_types); g_slice_free1(sizeof(*md), md); } g_queue_clear(&m->medias); @@ -2763,6 +2961,7 @@ static void __call_free(void *p) { while (c->streams) { ps = c->streams->data; c->streams = g_slist_delete_link(c->streams, c->streams); + g_hash_table_destroy(ps->rtp_stats); g_slice_free1(sizeof(*ps), ps); } diff --git a/daemon/call.h b/daemon/call.h index aa6146505..057d4a2be 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -3,6 +3,9 @@ +/* XXX split everything into call_signalling.[ch] and call_packets.[ch] or w/e */ + + #include #include @@ -66,6 +69,7 @@ struct call_monologue; #include "str.h" #include "crypto.h" #include "dtls.h" +#include "rtp.h" @@ -179,6 +183,7 @@ typedef bencode_buffer_t call_buffer_t; struct transport_protocol { enum transport_protocol_index index; const char *name; + int rtp:1; /* also set to 1 for SRTP */ int srtp:1; int avpf:1; }; @@ -230,6 +235,7 @@ struct stream_params { int desired_family; struct dtls_fingerprint fingerprint; unsigned int sp_flags; + GQueue rtp_payload_types; /* slice-alloc'd */ }; struct stream_fd { @@ -252,6 +258,14 @@ struct loop_protector { unsigned char buf[RTP_LOOP_PROTECT]; }; +struct rtp_stats { + unsigned int payload_type; + atomic64 packets; + atomic64 bytes; + atomic64 kernel_packets; + atomic64 kernel_bytes; +}; + struct packet_stream { mutex_t in_lock, out_lock; @@ -274,6 +288,7 @@ struct packet_stream { struct stats stats; struct stats kernel_stats; atomic64 last_packet; + GHashTable *rtp_stats; /* LOCK: call->master_lock */ #if RTP_LOOP_PROTECT /* LOCK: in_lock: */ @@ -317,6 +332,7 @@ struct call_media { GQueue streams; /* normally RTP + RTCP */ GSList *endpoint_maps; + GHashTable *rtp_payload_types; unsigned int media_flags; }; @@ -483,6 +499,8 @@ INLINE void *call_malloc(struct call *c, size_t l) { INLINE char *call_strdup_len(struct call *c, const char *s, unsigned int len) { char *r; + if (!s) + return NULL; r = call_malloc(c, len + 1); memcpy(r, s, len); r[len] = 0; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index fa6af5243..ff82e153b 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -16,6 +16,7 @@ #include "str.h" #include "control_tcp.h" #include "control_udp.h" +#include "rtp.h" @@ -274,14 +275,20 @@ static void streams_parse(const char *s, struct callmaster *m, GQueue *q) { pcre_multi_match(m->streams_re, m->streams_ree, s, 3, streams_parse_func, &i, q); } -static void streams_free(GQueue *q) { - struct stream_params *s; +/* XXX move these somewhere else */ +static void rtp_pt_free(void *p) { + g_slice_free1(sizeof(struct rtp_payload_type), p); +} +static void sp_free(void *p) { + struct stream_params *s = p; - while ((s = g_queue_pop_head(q))) { - if (s->crypto.mki) - free(s->crypto.mki); - g_slice_free1(sizeof(*s), s); - } + if (s->crypto.mki) + free(s->crypto.mki); + g_queue_clear_full(&s->rtp_payload_types, rtp_pt_free); + g_slice_free1(sizeof(*s), s); +} +static void streams_free(GQueue *q) { + g_queue_clear_full(q, sp_free); } diff --git a/daemon/rtcp.c b/daemon/rtcp.c index aa9bc3e06..793bc39c5 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -240,11 +240,11 @@ static struct rtcp_chain_element *rtcp_psfb(str *s) { return rtcp_generic(s, RTCP_PT_PSFB); } +static void rtcp_ce_free(void *p) { + g_slice_free1(sizeof(struct rtcp_chain_element), p); +} static void rtcp_list_free(GQueue *q) { - struct rtcp_chain_element *el; - - while ((el = g_queue_pop_head(q))) - g_slice_free1(sizeof(*el), el); + g_queue_clear_full(q, rtcp_ce_free); } static int rtcp_parse(GQueue *q, str *_s) { diff --git a/daemon/rtp.c b/daemon/rtp.c index c7fd65d9b..a1ba9ebf4 100644 --- a/daemon/rtp.c +++ b/daemon/rtp.c @@ -20,6 +20,44 @@ struct rtp_extension { +#define RFC_TYPE(type, name, c_rate) \ + [type] = { \ + .payload_type = type, \ + .encoding = STR_CONST_INIT(#name), \ + .clock_rate = c_rate, \ + } + +static const struct rtp_payload_type __rfc_types[] = +{ + RFC_TYPE(0, PCMU, 8000), + RFC_TYPE(3, GSM, 8000), + RFC_TYPE(4, G723, 8000), + RFC_TYPE(5, DVI4, 8000), + RFC_TYPE(6, DVI4, 16000), + RFC_TYPE(7, LPC, 8000), + RFC_TYPE(8, PCMA, 8000), + RFC_TYPE(9, G722, 8000), + RFC_TYPE(10, L16, 44100), + RFC_TYPE(11, L16, 44100), + RFC_TYPE(12, QCELP, 8000), + RFC_TYPE(13, CN, 8000), + RFC_TYPE(14, MPA, 90000), + RFC_TYPE(15, G728, 8000), + RFC_TYPE(16, DVI4, 11025), + RFC_TYPE(17, DVI4, 22050), + RFC_TYPE(18, G729, 8000), + RFC_TYPE(25, CelB, 90000), + RFC_TYPE(26, JPEG, 90000), + RFC_TYPE(28, nv, 90000), + RFC_TYPE(31, H261, 90000), + RFC_TYPE(32, MPV, 90000), + RFC_TYPE(33, MP2T, 90000), + RFC_TYPE(34, H263, 90000), +}; + + + + INLINE int check_session_keys(struct crypto_context *c) { str s; const char *err; @@ -51,7 +89,7 @@ error: return -1; } -static int rtp_payload(struct rtp_header **out, str *p, const str *s) { +int rtp_payload(struct rtp_header **out, str *p, const str *s) { struct rtp_header *rtp; struct rtp_extension *ext; const char *err; @@ -65,6 +103,9 @@ static int rtp_payload(struct rtp_header **out, str *p, const str *s) { if ((rtp->v_p_x_cc & 0xc0) != 0x80) /* version 2 */ goto error; + if (!p) + goto done; + *p = *s; /* fixed header */ str_shift(p, sizeof(*rtp)); @@ -84,6 +125,7 @@ static int rtp_payload(struct rtp_header **out, str *p, const str *s) { goto error; } +done: *out = rtp; return 0; @@ -266,3 +308,23 @@ error: ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Invalid SRTP/SRTCP packet received (short packet)"); return -1; } + +const struct rtp_payload_type *rtp_payload_type(unsigned int type, GHashTable *lookup) { + const struct rtp_payload_type *rtp_pt; + + if (!lookup) + goto rfc_types; + + rtp_pt = g_hash_table_lookup(lookup, &type); + if (rtp_pt) + return rtp_pt; + +rfc_types: + if (type >= G_N_ELEMENTS(__rfc_types)) + return NULL; + rtp_pt = &__rfc_types[type]; + if (!rtp_pt->encoding.s) + return NULL; + return rtp_pt; + +} diff --git a/daemon/rtp.h b/daemon/rtp.h index 1e14c6cc7..d5be29bd9 100644 --- a/daemon/rtp.h +++ b/daemon/rtp.h @@ -4,6 +4,7 @@ #include "str.h" +#include @@ -18,10 +19,20 @@ struct rtp_header { u_int32_t csrc[]; } __attribute__ ((packed)); +struct rtp_payload_type { + unsigned int payload_type; + str encoding; + unsigned int clock_rate; + str encoding_parameters; +}; + +int rtp_payload(struct rtp_header **out, str *p, const str *s); +const struct rtp_payload_type *rtp_payload_type(unsigned int, GHashTable *); + int rtp_avp2savp(str *, struct crypto_context *); int rtp_savp2avp(str *, struct crypto_context *); diff --git a/daemon/sdp.c b/daemon/sdp.c index f0aa3030c..e951ca4a5 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -13,6 +13,7 @@ #include "call.h" #include "crypto.h" #include "dtls.h" +#include "rtp.h" struct network_address { str network_type; @@ -58,7 +59,7 @@ struct sdp_media { str media_type; str port; str transport; - /* ... format list */ + str formats; /* space separated */ long int port_num; int port_count; @@ -66,6 +67,7 @@ struct sdp_media { struct sdp_connection connection; int rr, rs; struct sdp_attributes attributes; + GQueue format_list; /* list of slice-alloc'd str objects */ }; struct attribute_rtcp { @@ -143,6 +145,14 @@ struct attribute_setup { } value; }; +struct attribute_rtpmap { + str payload_type_str; + str encoding_str; + str clock_rate_str; + + struct rtp_payload_type rtp_pt; +}; + struct sdp_attribute { str full_line, /* including a= and \r\n */ line_value, /* without a= and without \r\n */ @@ -169,6 +179,7 @@ struct sdp_attribute { ATTR_MID, ATTR_FINGERPRINT, ATTR_SETUP, + ATTR_RTPMAP, } attr; union { @@ -179,6 +190,7 @@ struct sdp_attribute { struct attribute_group group; struct attribute_fingerprint fingerprint; struct attribute_setup setup; + struct attribute_rtpmap rtpmap; } u; }; @@ -190,13 +202,12 @@ static const char ice_chars[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJK -//static int has_rtcp(struct sdp_media *media); - - - INLINE struct sdp_attribute *attr_get_by_id(struct sdp_attributes *a, int id) { return g_hash_table_lookup(a->id_hash, &id); } +INLINE GQueue *attr_list_get_by_id(struct sdp_attributes *a, int id) { + return g_hash_table_lookup(a->id_lists_hash, &id); +} static struct sdp_attribute *attr_get_by_id_m_s(struct sdp_media *m, int id) { struct sdp_attribute *a; @@ -309,6 +320,9 @@ INLINE int extract_token(char **sp, char *end, str *out) { EXTRACT_NETWORK_ADDRESS_NP(field); \ if (parse_address(&output->field)) output->field.parsed.s6_addr32[0] = 0xfe +#define PARSE_DECL char *end, *start +#define PARSE_INIT start = output->value.s; end = start + output->value.len + static int parse_origin(char *start, char *end, struct sdp_origin *output) { if (output->parsed) return -1; @@ -334,10 +348,12 @@ static int parse_connection(char *start, char *end, struct sdp_connection *outpu static int parse_media(char *start, char *end, struct sdp_media *output) { char *ep; + str s, *sp; EXTRACT_TOKEN(media_type); EXTRACT_TOKEN(port); EXTRACT_TOKEN(transport); + str_init_len(&output->formats, start, end - start); output->port_num = strtol(output->port.s, &ep, 10); if (ep == output->port.s) @@ -355,6 +371,15 @@ static int parse_media(char *start, char *end, struct sdp_media *output) { else output->port_count = 1; + /* to split the "formats" list into tokens, we abuse some vars */ + start = output->formats.s; + end = start + output->formats.len; + while (!extract_token(&start, end, &s)) { + sp = g_slice_alloc(sizeof(*sp)); + *sp = s; + g_queue_push_tail(&output->format_list, sp); + } + return 0; } @@ -379,14 +404,12 @@ static int parse_attribute_group(struct sdp_attribute *output) { } static int parse_attribute_ssrc(struct sdp_attribute *output) { - char *start, *end; + PARSE_DECL; struct attribute_ssrc *s; output->attr = ATTR_SSRC; - start = output->value.s; - end = start + output->value.len; - + PARSE_INIT; EXTRACT_TOKEN(u.ssrc.id_str); EXTRACT_TOKEN(u.ssrc.attr_str); @@ -407,7 +430,8 @@ static int parse_attribute_ssrc(struct sdp_attribute *output) { } static int parse_attribute_crypto(struct sdp_attribute *output) { - char *start, *end, *endp; + PARSE_DECL; + char *endp; struct attribute_crypto *c; int salt_key_len, enc_salt_key_len; int b64_state = 0; @@ -419,9 +443,7 @@ static int parse_attribute_crypto(struct sdp_attribute *output) { output->attr = ATTR_CRYPTO; - start = output->value.s; - end = start + output->value.len; - + PARSE_INIT; EXTRACT_TOKEN(u.crypto.tag_str); EXTRACT_TOKEN(u.crypto.crypto_suite_str); EXTRACT_TOKEN(u.crypto.key_params_str); @@ -551,12 +573,12 @@ static int parse_attribute_rtcp(struct sdp_attribute *output) { } static int parse_attribute_candidate(struct sdp_attribute *output) { - char *end, *start, *ep; + PARSE_DECL; + char *ep; - start = output->value.s; - end = start + output->value.len; output->attr = ATTR_CANDIDATE; + PARSE_INIT; EXTRACT_TOKEN(u.candidate.foundation); EXTRACT_TOKEN(u.candidate.component_str); EXTRACT_TOKEN(u.candidate.transport); @@ -578,14 +600,13 @@ static int parse_attribute_candidate(struct sdp_attribute *output) { } static int parse_attribute_fingerprint(struct sdp_attribute *output) { - char *end, *start; + PARSE_DECL; unsigned char *c; int i; - start = output->value.s; - end = start + output->value.len; output->attr = ATTR_FINGERPRINT; + PARSE_INIT; EXTRACT_TOKEN(u.fingerprint.hash_func_str); EXTRACT_TOKEN(u.fingerprint.fingerprint_str); @@ -647,6 +668,49 @@ static int parse_attribute_setup(struct sdp_attribute *output) { return 0; } +static int parse_attribute_rtpmap(struct sdp_attribute *output) { + PARSE_DECL; + char *ep; + struct attribute_rtpmap *a; + struct rtp_payload_type *pt; + + output->attr = ATTR_RTPMAP; + + PARSE_INIT; + EXTRACT_TOKEN(u.rtpmap.payload_type_str); + EXTRACT_TOKEN(u.rtpmap.encoding_str); + + a = &output->u.rtpmap; + pt = &a->rtp_pt; + + pt->payload_type = strtoul(a->payload_type_str.s, &ep, 10); + if (ep == a->payload_type_str.s) + return -1; + + str_chr_str(&a->clock_rate_str, &a->encoding_str, '/'); + if (!a->clock_rate_str.s) + return -1; + + pt->encoding = a->encoding_str; + pt->encoding.len -= a->clock_rate_str.len; + str_shift(&a->clock_rate_str, 1); + + str_chr_str(&pt->encoding_parameters, &a->clock_rate_str, '/'); + if (pt->encoding_parameters.s) { + a->clock_rate_str.len -= pt->encoding_parameters.len; + str_shift(&pt->encoding_parameters, 1); + } + + if (!a->clock_rate_str.len) + return -1; + + pt->clock_rate = strtoul(a->clock_rate_str.s, &ep, 10); + if (ep && ep != a->clock_rate_str.s + a->clock_rate_str.len) + return -1; + + return 0; +} + static int parse_attribute(struct sdp_attribute *a) { int ret; @@ -696,6 +760,8 @@ static int parse_attribute(struct sdp_attribute *a) { ret = parse_attribute_crypto(a); else if (!str_cmp(&a->name, "extmap")) a->attr = ATTR_EXTMAP; + else if (!str_cmp(&a->name, "rtpmap")) + ret = parse_attribute_rtpmap(a); break; case 7: if (!str_cmp(&a->name, "ice-pwd")) @@ -915,30 +981,30 @@ error: return -1; } +static void attr_free(void *p) { + g_slice_free1(sizeof(struct sdp_attribute), p); +} static void free_attributes(struct sdp_attributes *a) { - struct sdp_attribute *attr; - /* g_hash_table_destroy(a->name_hash); */ g_hash_table_destroy(a->id_hash); /* g_hash_table_destroy(a->name_lists_hash); */ g_hash_table_destroy(a->id_lists_hash); - while ((attr = g_queue_pop_head(&a->list))) { - g_slice_free1(sizeof(*attr), attr); - } + g_queue_clear_full(&a->list, attr_free); +} +static void media_free(void *p) { + struct sdp_media *media = p; + free_attributes(&media->attributes); + g_queue_clear_full(&media->format_list, str_slice_free); + g_slice_free1(sizeof(*media), media); +} +static void session_free(void *p) { + struct sdp_session *session = p; + g_queue_clear_full(&session->media_streams, media_free); + free_attributes(&session->attributes); + g_slice_free1(sizeof(*session), session); } - void sdp_free(GQueue *sessions) { - struct sdp_session *session; - struct sdp_media *media; - - while ((session = g_queue_pop_head(sessions))) { - while ((media = g_queue_pop_head(&session->media_streams))) { - free_attributes(&media->attributes); - g_slice_free1(sizeof(*media), media); - } - free_attributes(&session->attributes); - g_slice_free1(sizeof(*session), session); - } + g_queue_clear_full(sessions, session_free); } static int fill_endpoint(struct endpoint *ep, const struct sdp_media *media, struct sdp_ng_flags *flags, @@ -968,6 +1034,66 @@ static int fill_endpoint(struct endpoint *ep, const struct sdp_media *media, str } + +static int __rtp_payload_types(struct stream_params *sp, struct sdp_media *media) +{ + GHashTable *ht; + GQueue *q; + GList *ql; + struct sdp_attribute *attr; + int ret = 0; + + if (!sp->protocol || !sp->protocol->rtp) + return 0; + + /* first go through a=rtpmap and build a hash table of attrs */ + ht = g_hash_table_new(g_int_hash, g_int_equal); + q = attr_list_get_by_id(&media->attributes, ATTR_RTPMAP); + for (ql = q->head; ql; ql = ql->next) { + struct rtp_payload_type *pt; + attr = ql->data; + pt = &attr->u.rtpmap.rtp_pt; + g_hash_table_insert(ht, &pt->payload_type, pt); + } + /* a=fmtp processing would go here */ + + /* then go through the format list and associate */ + for (ql = media->format_list.head; ql; ql = ql->next) { + char *ep; + str *s; + unsigned int i; + struct rtp_payload_type *pt; + const struct rtp_payload_type *ptl; + + s = ql->data; + i = (unsigned int) strtoul(s->s, &ep, 10); + if (ep == s->s || i > 127) + goto error; + + /* first look in rtpmap for a match, then check RFC types, + * else fall back to an "unknown" type */ + ptl = rtp_payload_type(i, ht); + + pt = g_slice_alloc0(sizeof(*pt)); + if (ptl) + *pt = *ptl; + else + pt->payload_type = i; + g_queue_push_tail(&sp->rtp_payload_types, pt); + } + + goto out; + +error: + ret = -1; + goto out; +out: + g_hash_table_destroy(ht); + return ret; +} + + +/* XXX split this function up */ int sdp_streams(const GQueue *sessions, GQueue *streams, struct sdp_ng_flags *flags) { struct sdp_session *session; struct sdp_media *media; @@ -1000,6 +1126,10 @@ int sdp_streams(const GQueue *sessions, GQueue *streams, struct sdp_ng_flags *fl bf_xset(&sp->sp_flags, SP_FLAG_STRICT_SOURCE, flags->strict_source); bf_xset(&sp->sp_flags, SP_FLAG_MEDIA_HANDOVER, flags->media_handover); + errstr = "Invalid RTP payload types"; + if (__rtp_payload_types(sp, media)) + goto error; + /* a=crypto */ attr = attr_get_by_id(&media->attributes, ATTR_CRYPTO); if (attr) { @@ -1451,7 +1581,6 @@ INLINE unsigned long type_from_prio(unsigned int prio) { } static unsigned long new_priority(struct sdp_media *media, int relay) { - int id; GQueue *cands; int pref; unsigned long prio, tpref; @@ -1468,8 +1597,7 @@ static unsigned long new_priority(struct sdp_media *media, int relay) { if (!media) goto out; - id = ATTR_CANDIDATE; - cands = g_hash_table_lookup(media->attributes.id_lists_hash, &id); + cands = attr_list_get_by_id(&media->attributes, ATTR_CANDIDATE); if (!cands) goto out; diff --git a/daemon/str.c b/daemon/str.c index e00aa01d2..ceada3781 100644 --- a/daemon/str.c +++ b/daemon/str.c @@ -29,3 +29,7 @@ str *__str_sprintf(const char *fmt, ...) { va_end(ap); return ret; } + +void str_slice_free(void *p) { + g_slice_free1(sizeof(str), p); +} diff --git a/daemon/str.h b/daemon/str.h index 099621144..20e1cda5b 100644 --- a/daemon/str.h +++ b/daemon/str.h @@ -25,6 +25,7 @@ typedef struct _str str; #define STR_FMT0(str) ((str) ? (str)->len : 6), ((str) ? (str)->s : "(NULL)") #define STR_NULL ((str) { NULL, 0 }) #define STR_EMPTY ((str) { "", 0 }) +#define STR_CONST_INIT(str) { str, sizeof(str)-1 } @@ -70,6 +71,9 @@ INLINE str *g_string_free_str(GString *gs); guint str_hash(gconstpointer s); gboolean str_equal(gconstpointer a, gconstpointer b); +/* destroy function, frees a slice-alloc'd str */ +void str_slice_free(void *); + diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index bc43b0324..65cd243a7 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -148,6 +149,7 @@ struct rtpengine_target { spinlock_t stats_lock; struct rtpengine_stats stats; + struct rtpengine_rtp_stats rtp_stats[NUM_PAYLOAD_TYPES]; struct re_crypto_context decrypt; struct re_crypto_context encrypt; @@ -843,6 +845,7 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t spin_lock_irqsave(&g->stats_lock, flags); memcpy(&op.stats, &g->stats, sizeof(op.stats)); + memcpy(&op.rtp_stats, &g->rtp_stats, sizeof(op.rtp_stats)); spin_unlock_irqrestore(&g->stats_lock, flags); spin_lock_irqsave(&g->decrypt.lock, flags); @@ -971,6 +974,7 @@ static void proc_list_crypto_print(struct seq_file *f, struct re_crypto_context static int proc_list_show(struct seq_file *f, void *v) { struct rtpengine_target *g = v; unsigned long flags; + int i; seq_printf(f, "port %5u:\n", g->target.target_port); proc_list_addr_print(f, "src", &g->target.src_addr); @@ -982,6 +986,10 @@ static int proc_list_show(struct seq_file *f, void *v) { spin_lock_irqsave(&g->stats_lock, flags); seq_printf(f, " stats: %20llu bytes, %20llu packets, %20llu errors\n", g->stats.bytes, g->stats.packets, g->stats.errors); + for (i = 0; i < g->target.num_payload_types; i++) + seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets\n", + g->target.payload_types[i], + g->rtp_stats[i].bytes, g->rtp_stats[i].packets); spin_unlock_irqrestore(&g->stats_lock, flags); proc_list_crypto_print(f, &g->decrypt, &g->target.decrypt, "decryption (incoming)"); proc_list_crypto_print(f, &g->encrypt, &g->target.encrypt, "encryption (outgoing)"); @@ -1454,6 +1462,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i spin_lock(&og->stats_lock); /* nested lock! irqs are disabled already */ memcpy(&g->stats, &og->stats, sizeof(g->stats)); + memcpy(&g->rtp_stats, &og->rtp_stats, sizeof(g->rtp_stats)); spin_unlock(&og->stats_lock); } else { @@ -2141,11 +2150,30 @@ static inline int is_dtls(struct sk_buff *skb) { return 1; } +static int rtp_payload_match(const void *a, const void *b) { + const unsigned char *A = a, *B = b; + + if (*A < *B) + return -1; + if (*A > *B) + return 1; + return 0; +} +static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rtpengine_target_info *tg) { + unsigned char pt, *match; + + pt = hdr->m_pt & 0x7f; + match = bsearch(&pt, tg->payload_types, tg->num_payload_types, sizeof(pt), rtp_payload_match); + if (!match) + return -1; + return match - tg->payload_types; +} + static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src) { struct udphdr *uh; struct rtpengine_target *g; struct sk_buff *skb2; - int err; + int err, rtp_pt_idx = -2; unsigned int datalen; unsigned long flags; u_int32_t *u32; @@ -2205,14 +2233,23 @@ not_stun: src_check_ok: if (g->target.dtls && is_dtls(skb)) goto skip1; + + rtp.ok = 0; + if (!g->target.rtp) + goto not_rtp; + parse_rtp(&rtp, skb); if (!rtp.ok) { if (g->target.rtp_only) goto skip1; goto not_rtp; } + if (g->target.rtcp_mux && is_muxed_rtcp(&rtp)) goto skip1; + + rtp_pt_idx = rtp_payload_type(rtp.header, &g->target); + pkt_idx_u = pkt_idx = packet_index(&g->decrypt, &g->target.decrypt, rtp.header); if (srtp_auth_validate(&g->decrypt, &g->target.decrypt, &rtp, &pkt_idx)) goto skip_error; @@ -2258,6 +2295,14 @@ out: g->stats.packets++; g->stats.bytes += datalen; } + if (rtp_pt_idx >= 0) { + g->rtp_stats[rtp_pt_idx].packets++; + g->rtp_stats[rtp_pt_idx].bytes += datalen; + } + else if (rtp_pt_idx == -2) + /* not RTP */ ; + else if (rtp_pt_idx == -1) + g->stats.errors++; spin_unlock_irqrestore(&g->stats_lock, flags); target_push(g); diff --git a/kernel-module/xt_RTPENGINE.h b/kernel-module/xt_RTPENGINE.h index 5f419a08c..361a56fb0 100644 --- a/kernel-module/xt_RTPENGINE.h +++ b/kernel-module/xt_RTPENGINE.h @@ -1,6 +1,12 @@ #ifndef XT_RTPPROXY_H #define XT_RTPPROXY_H + + +#define NUM_PAYLOAD_TYPES 16 + + + struct xt_rtpengine_info { u_int32_t id; }; @@ -10,6 +16,10 @@ struct rtpengine_stats { u_int64_t bytes; u_int64_t errors; }; +struct rtpengine_rtp_stats { + u_int64_t packets; + u_int64_t bytes; +}; struct re_address { int family; @@ -73,10 +83,14 @@ struct rtpengine_target_info { struct rtpengine_srtp decrypt; struct rtpengine_srtp encrypt; + unsigned char payload_types[NUM_PAYLOAD_TYPES]; /* must be sorted */ + unsigned int num_payload_types; + unsigned char tos; int rtcp_mux:1, dtls:1, stun:1, + rtp:1, rtp_only:1; }; @@ -94,6 +108,7 @@ struct rtpengine_message { struct rtpengine_list_entry { struct rtpengine_target_info target; struct rtpengine_stats stats; + struct rtpengine_rtp_stats rtp_stats[NUM_PAYLOAD_TYPES]; }; diff --git a/tests/simulator-ng.pl b/tests/simulator-ng.pl index 365d1f802..3c2e98cb9 100755 --- a/tests/simulator-ng.pl +++ b/tests/simulator-ng.pl @@ -558,8 +558,9 @@ t=0 0 and $cp = $p; $sdp .= <<"!"; -m=audio $p $$tr{name} 8 +m=audio $p $$tr{name} 0 8 111 a=rtpmap:8 PCMA/8000 +a=rtpmap:111 opus/48000/2 ! if ($$A{want_rtcpmux} && $op eq 'offer') { $sdp .= "a=rtcp-mux\n";