MT#9935 understand, learn, remember and report RTP payload types (codecs)

Squashed commit of the following:

commit cca40e8e311e6884204289687ba2a05d0855720b
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Thu Feb 12 10:17:51 2015 -0500

    support per-payload-type stats in kernel module

commit dcc0dc0002bd552ae7c99aa58311af2f81336a8f
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Wed Feb 11 12:14:44 2015 -0500

    count unknown rtp type as error

commit 941bde0df59720d1d3ef6660096cf2532a5c7e1c
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Wed Feb 11 12:01:55 2015 -0500

    use the list of rtp formats from the m= line

    This avoids dynamically altering the rtpstats hash table and makes
    keeping packet stats lock free.

commit 9150fed671d8490f4c09fb3050002c7c558391df
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Fri Feb 6 15:06:04 2015 -0500

    fix and simplify rtpmap hash table

commit 1f73741cbf2ac7d6b8d0a54d9562e9a550678e7c
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Fri Feb 6 13:56:07 2015 -0500

    MT#9935 understand, learn, remember and report RTP payload types (codecs)

commit b0d690837c02989485cf73927a89ed860a299807
Author: Richard Fuchs <rfuchs@sipwise.com>
Date:   Thu Nov 20 13:36:25 2014 -0500

    parse a=rtpmap attribute
pull/81/head
Richard Fuchs 10 years ago
parent c21193a329
commit c2ff5c3fd3

@ -86,6 +86,11 @@ INLINE void g_queue_truncate(GQueue *q, unsigned int len) {
while (q->length > len)
g_queue_pop_tail(q);
}
INLINE void g_queue_clear_full(GQueue *q, GDestroyNotify free_func) {
void *p;
while ((p = g_queue_pop_head(q)))
free_func(p);
}
INLINE void strmove(char **d, char **s) {

@ -77,42 +77,49 @@ const struct transport_protocol transport_protocols[] = {
[PROTO_RTP_AVP] = {
.index = PROTO_RTP_AVP,
.name = "RTP/AVP",
.rtp = 1,
.srtp = 0,
.avpf = 0,
},
[PROTO_RTP_SAVP] = {
.index = PROTO_RTP_SAVP,
.name = "RTP/SAVP",
.rtp = 1,
.srtp = 1,
.avpf = 0,
},
[PROTO_RTP_AVPF] = {
.index = PROTO_RTP_AVPF,
.name = "RTP/AVPF",
.rtp = 1,
.srtp = 0,
.avpf = 1,
},
[PROTO_RTP_SAVPF] = {
.index = PROTO_RTP_SAVPF,
.name = "RTP/SAVPF",
.rtp = 1,
.srtp = 1,
.avpf = 1,
},
[PROTO_UDP_TLS_RTP_SAVP] = {
.index = PROTO_UDP_TLS_RTP_SAVP,
.name = "UDP/TLS/RTP/SAVP",
.rtp = 1,
.srtp = 1,
.avpf = 0,
},
[PROTO_UDP_TLS_RTP_SAVPF] = {
.index = PROTO_UDP_TLS_RTP_SAVPF,
.name = "UDP/TLS/RTP/SAVPF",
.rtp = 1,
.srtp = 1,
.avpf = 1,
},
[PROTO_UDPTL] = {
.index = PROTO_UDPTL,
.name = "udptl",
.rtp = 0,
.srtp = 0,
.avpf = 0,
},
@ -354,6 +361,16 @@ INLINE void __re_address_translate(struct re_address *o, const struct endpoint *
o->port = ep->port;
}
static int __rtp_stats_pt_sort(const void *ap, const void *bp) {
const struct rtp_stats *a = ap, *b = bp;
if (a->payload_type < b->payload_type)
return -1;
if (a->payload_type > b->payload_type)
return 1;
return 0;
}
/* called with in_lock held */
void kernelize(struct packet_stream *stream) {
struct rtpengine_target_info reti;
@ -438,6 +455,23 @@ void kernelize(struct packet_stream *stream) {
ZERO(stream->kernel_stats);
if (stream->media->protocol && stream->media->protocol->rtp) {
GList *values, *l;
struct rtp_stats *rs;
reti.rtp = 1;
values = g_hash_table_get_values(stream->rtp_stats);
values = g_list_sort(values, __rtp_stats_pt_sort);
for (l = values; l; l = l->next) {
if (reti.num_payload_types >= G_N_ELEMENTS(reti.payload_types)) {
ilog(LOG_WARNING, "Too many RTP payload types for kernel module");
break;
}
rs = l->data;
reti.payload_types[reti.num_payload_types++] = rs->payload_type;
}
}
kernel_add_stream(cm->conf.kernelfd, &reti, 0);
PS_SET(stream, KERNELIZED);
@ -588,6 +622,7 @@ void stream_msg_mh_src(struct packet_stream *ps, struct msghdr *mh) {
}
}
/* XXX split this function into pieces */
/* called lock-free */
static int stream_packet(struct stream_fd *sfd, str *s, struct sockaddr_in6 *fsin, struct in6_addr *dst) {
struct packet_stream *stream,
@ -608,6 +643,8 @@ static int stream_packet(struct stream_fd *sfd, str *s, struct sockaddr_in6 *fsi
struct endpoint endpoint;
rewrite_func rwf_in, rwf_out;
struct interface_address *loc_addr;
struct rtp_header *rtp_h;
struct rtp_stats *rtp_s;
call = sfd->call;
cm = call->callmaster;
@ -626,6 +663,9 @@ static int stream_packet(struct stream_fd *sfd, str *s, struct sockaddr_in6 *fsi
if (!stream->sfd)
goto done;
/* demux other protocols running on this port */
if (MEDIA_ISSET(media, DTLS) && is_dtls(s)) {
ret = dtls(stream, s, fsin);
if (!ret)
@ -672,6 +712,9 @@ loop_ok:
mutex_unlock(&stream->in_lock);
/* demux RTCP */
in_srtp = stream;
sink = stream->rtp_sink;
if (!sink && PS_ISSET(stream, RTCP)) {
@ -690,6 +733,29 @@ loop_ok:
if (rtcp && sink && sink->rtcp_sibling)
out_srtp = sink->rtcp_sibling;
/* stats per RTP payload type */
if (media->protocol && media->protocol->rtp && !rtcp && !rtp_payload(&rtp_h, NULL, s)) {
i = (rtp_h->m_pt & 0x7f);
rtp_s = g_hash_table_lookup(stream->rtp_stats, &i);
if (!rtp_s) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT,
"RTP packet with unknown payload type %u received", i);
atomic64_inc(&stream->stats.errors);
atomic64_inc(&cm->statsps.errors);
}
else {
atomic64_inc(&rtp_s->packets);
atomic64_add(&rtp_s->bytes, s->len);
}
}
/* do we have somewhere to forward it to? */
if (!sink || !sink->sfd || !out_srtp->sfd || !in_srtp->sfd) {
ilog(LOG_WARNING, "RTP packet from %s discarded", addr);
atomic64_inc(&stream->stats.errors);
@ -697,6 +763,9 @@ loop_ok:
goto unlock_out;
}
/* transcoding stuff, in and out */
mutex_lock(&in_srtp->in_lock);
determine_handler(in_srtp, sink);
@ -1303,6 +1372,8 @@ static void callmaster_timer(void *ptr) {
struct stats tmpstats;
int j, update;
struct stream_fd *sfd;
struct rtp_stats *rs;
unsigned int pt;
ZERO(hlp);
@ -1345,6 +1416,21 @@ static void callmaster_timer(void *ptr) {
atomic64_set(&ps->kernel_stats.packets, ke->stats.packets);
atomic64_set(&ps->kernel_stats.errors, ke->stats.errors);
for (j = 0; j < ke->target.num_payload_types; j++) {
pt = ke->target.payload_types[j];
rs = g_hash_table_lookup(ps->rtp_stats, &pt);
if (!rs)
continue;
if (ke->rtp_stats[j].packets > atomic64_get(&rs->packets))
atomic64_add(&rs->packets,
ke->rtp_stats[j].packets - atomic64_get(&rs->packets));
if (ke->rtp_stats[j].bytes > atomic64_get(&rs->bytes))
atomic64_add(&rs->bytes,
ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes));
atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets);
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
}
update = 0;
sink = packet_stream_sink(ps);
@ -1588,6 +1674,10 @@ fail:
return -1;
}
static void __payload_type_free(void *p) {
g_slice_free1(sizeof(struct rtp_payload_type), p);
}
static struct call_media *__get_media(struct call_monologue *ml, GList **it, const struct stream_params *sp) {
struct call_media *med;
@ -1613,6 +1703,7 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con
med->call = ml->call;
med->index = sp->index;
call_str_cpy(ml->call, &med->type, &sp->type);
med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free);
g_queue_push_tail(&ml->medias, med);
*it = ml->medias.tail;
@ -1751,6 +1842,10 @@ static int __wildcard_endpoint_map(struct call_media *media, unsigned int num_po
return 0;
}
static void __rtp_stats_free(void *p) {
g_slice_free1(sizeof(struct rtp_stats), p);
}
struct packet_stream *__packet_stream_new(struct call *call) {
struct packet_stream *stream;
@ -1759,6 +1854,8 @@ struct packet_stream *__packet_stream_new(struct call *call) {
mutex_init(&stream->out_lock);
stream->call = call;
atomic64_set_na(&stream->last_packet, poller_now);
stream->rtp_stats = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __rtp_stats_free);
call->streams = g_slist_prepend(call->streams, stream);
return stream;
@ -1822,6 +1919,32 @@ static int __init_stream(struct packet_stream *ps) {
return 0;
}
static void __rtp_stats_update(GHashTable *dst, GHashTable *src) {
struct rtp_stats *rs;
struct rtp_payload_type *pt;
GList *values, *l;
/* "src" is a call_media->rtp_payload_types table, while "dst" is a
* packet_stream->rtp_stats table */
values = g_hash_table_get_values(src);
for (l = values; l; l = l->next) {
pt = l->data;
rs = g_hash_table_lookup(dst, &pt->payload_type);
if (rs)
continue;
rs = g_slice_alloc0(sizeof(*rs));
rs->payload_type = pt->payload_type;
g_hash_table_insert(dst, &rs->payload_type, rs);
}
g_list_free(values);
/* we leave previously added but now removed payload types in place */
}
static int __init_streams(struct call_media *A, struct call_media *B, const struct stream_params *sp) {
GList *la, *lb;
struct packet_stream *a, *ax, *b;
@ -1837,7 +1960,9 @@ static int __init_streams(struct call_media *A, struct call_media *B, const stru
/* RTP */
a->rtp_sink = b;
PS_SET(a, RTP);
PS_SET(a, RTP); /* XXX technically not correct, could be udptl too */
__rtp_stats_update(a->rtp_stats, A->rtp_payload_types);
if (sp) {
__fill_stream(a, &sp->rtp_endpoint, port_off);
@ -2199,6 +2324,19 @@ static void __dtls_logic(const struct sdp_ng_flags *flags, struct call_media *me
MEDIA_SET(other_media, DTLS);
}
static void __rtp_payload_types(struct call_media *media, GQueue *types) {
struct rtp_payload_type *pt;
struct call *call = media->call;
/* we steal the entire list to avoid duplicate allocs */
while ((pt = g_queue_pop_head(types))) {
/* but we must duplicate the contents */
call_str_cpy(call, &pt->encoding, &pt->encoding);
call_str_cpy(call, &pt->encoding_parameters, &pt->encoding_parameters);
g_hash_table_replace(media->rtp_payload_types, &pt->payload_type, pt);
}
}
/* called with call->master_lock held in W */
int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
const struct sdp_ng_flags *flags)
@ -2266,6 +2404,8 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
if (other_media->sdes_in.params.crypto_suite)
MEDIA_SET(other_media, SDES);
__rtp_payload_types(media, &sp->rtp_payload_types);
/* send and recv are from our POV */
bf_copy_same(&media->media_flags, &sp->sp_flags,
SP_FLAG_SEND | SP_FLAG_RECV);
@ -2422,6 +2562,47 @@ static void timeval_totalstats_average_add(struct totalstats *s, const struct ti
mutex_unlock(&s->total_average_lock);
}
static int __rtp_stats_sort(const void *ap, const void *bp) {
const struct rtp_stats *a = ap, *b = bp;
/* descending order */
if (atomic64_get(&a->packets) > atomic64_get(&b->packets))
return -1;
if (atomic64_get(&a->packets) < atomic64_get(&b->packets))
return 1;
return 0;
}
static const struct rtp_payload_type *__rtp_stats_codec(struct call_media *m) {
struct packet_stream *ps;
GList *values;
struct rtp_stats *rtp_s;
const struct rtp_payload_type *rtp_pt = NULL;
/* we only use the primary packet stream for the time being */
if (!m->streams.head)
return NULL;
ps = m->streams.head->data;
values = g_hash_table_get_values(ps->rtp_stats);
if (!values)
return NULL;
values = g_list_sort(values, __rtp_stats_sort);
/* payload type with the most packets */
rtp_s = values->data;
if (atomic64_get(&rtp_s->packets) == 0)
goto out;
rtp_pt = rtp_payload_type(rtp_s->payload_type, m->rtp_payload_types);
out:
g_list_free(values);
return rtp_pt; /* may be NULL */
}
/* called lock-free, but must hold a reference to the call */
void call_destroy(struct call *c) {
struct callmaster *m = c->callmaster;
@ -2439,6 +2620,7 @@ void call_destroy(struct call *c) {
char* cdrbufcur = cdrbuffer;
int cdrlinecnt = 0;
int found = 0;
const struct rtp_payload_type *rtp_pt;
rwlock_lock_w(&m->hashlock);
ret = g_hash_table_remove(m->callhash, &c->callid);
@ -2492,6 +2674,22 @@ void call_destroy(struct call *c) {
for (k = ml->medias.head; k; k = k->next) {
md = k->data;
rtp_pt = __rtp_stats_codec(md);
#define MLL_PREFIX "------ Media #%u ("STR_FORMAT" over %s) using " /* media log line prefix */
#define MLL_COMMON /* common args */ \
md->index, \
STR_FMT(&md->type), \
md->protocol ? md->protocol->name : "(unknown)"
if (!rtp_pt)
ilog(LOG_INFO, MLL_PREFIX "unknown codec", MLL_COMMON);
else if (!rtp_pt->encoding_parameters.s)
ilog(LOG_INFO, MLL_PREFIX ""STR_FORMAT"/%u", MLL_COMMON,
STR_FMT(&rtp_pt->encoding), rtp_pt->clock_rate);
else
ilog(LOG_INFO, MLL_PREFIX ""STR_FORMAT"/%u/"STR_FORMAT"", MLL_COMMON,
STR_FMT(&rtp_pt->encoding), rtp_pt->clock_rate,
STR_FMT(&rtp_pt->encoding_parameters));
for (o = md->streams.head; o; o = o->next) {
ps = o->data;
@ -2523,9 +2721,8 @@ void call_destroy(struct call *c) {
atomic64_get(&ps->last_packet));
}
ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, "
ilog(LOG_INFO, "--------- Port %5u <> %15s:%-5hu%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
addr, ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
@ -2751,6 +2948,7 @@ static void __call_free(void *p) {
g_queue_clear(&em->sfds);
g_slice_free1(sizeof(*em), em);
}
g_hash_table_destroy(md->rtp_payload_types);
g_slice_free1(sizeof(*md), md);
}
g_queue_clear(&m->medias);
@ -2763,6 +2961,7 @@ static void __call_free(void *p) {
while (c->streams) {
ps = c->streams->data;
c->streams = g_slist_delete_link(c->streams, c->streams);
g_hash_table_destroy(ps->rtp_stats);
g_slice_free1(sizeof(*ps), ps);
}

@ -3,6 +3,9 @@
/* XXX split everything into call_signalling.[ch] and call_packets.[ch] or w/e */
#include <sys/types.h>
#include <glib.h>
@ -66,6 +69,7 @@ struct call_monologue;
#include "str.h"
#include "crypto.h"
#include "dtls.h"
#include "rtp.h"
@ -179,6 +183,7 @@ typedef bencode_buffer_t call_buffer_t;
struct transport_protocol {
enum transport_protocol_index index;
const char *name;
int rtp:1; /* also set to 1 for SRTP */
int srtp:1;
int avpf:1;
};
@ -230,6 +235,7 @@ struct stream_params {
int desired_family;
struct dtls_fingerprint fingerprint;
unsigned int sp_flags;
GQueue rtp_payload_types; /* slice-alloc'd */
};
struct stream_fd {
@ -252,6 +258,14 @@ struct loop_protector {
unsigned char buf[RTP_LOOP_PROTECT];
};
struct rtp_stats {
unsigned int payload_type;
atomic64 packets;
atomic64 bytes;
atomic64 kernel_packets;
atomic64 kernel_bytes;
};
struct packet_stream {
mutex_t in_lock,
out_lock;
@ -274,6 +288,7 @@ struct packet_stream {
struct stats stats;
struct stats kernel_stats;
atomic64 last_packet;
GHashTable *rtp_stats; /* LOCK: call->master_lock */
#if RTP_LOOP_PROTECT
/* LOCK: in_lock: */
@ -317,6 +332,7 @@ struct call_media {
GQueue streams; /* normally RTP + RTCP */
GSList *endpoint_maps;
GHashTable *rtp_payload_types;
unsigned int media_flags;
};
@ -483,6 +499,8 @@ INLINE void *call_malloc(struct call *c, size_t l) {
INLINE char *call_strdup_len(struct call *c, const char *s, unsigned int len) {
char *r;
if (!s)
return NULL;
r = call_malloc(c, len + 1);
memcpy(r, s, len);
r[len] = 0;

@ -16,6 +16,7 @@
#include "str.h"
#include "control_tcp.h"
#include "control_udp.h"
#include "rtp.h"
@ -274,14 +275,20 @@ static void streams_parse(const char *s, struct callmaster *m, GQueue *q) {
pcre_multi_match(m->streams_re, m->streams_ree, s, 3, streams_parse_func, &i, q);
}
static void streams_free(GQueue *q) {
struct stream_params *s;
/* XXX move these somewhere else */
static void rtp_pt_free(void *p) {
g_slice_free1(sizeof(struct rtp_payload_type), p);
}
static void sp_free(void *p) {
struct stream_params *s = p;
while ((s = g_queue_pop_head(q))) {
if (s->crypto.mki)
free(s->crypto.mki);
g_slice_free1(sizeof(*s), s);
}
if (s->crypto.mki)
free(s->crypto.mki);
g_queue_clear_full(&s->rtp_payload_types, rtp_pt_free);
g_slice_free1(sizeof(*s), s);
}
static void streams_free(GQueue *q) {
g_queue_clear_full(q, sp_free);
}

@ -240,11 +240,11 @@ static struct rtcp_chain_element *rtcp_psfb(str *s) {
return rtcp_generic(s, RTCP_PT_PSFB);
}
static void rtcp_ce_free(void *p) {
g_slice_free1(sizeof(struct rtcp_chain_element), p);
}
static void rtcp_list_free(GQueue *q) {
struct rtcp_chain_element *el;
while ((el = g_queue_pop_head(q)))
g_slice_free1(sizeof(*el), el);
g_queue_clear_full(q, rtcp_ce_free);
}
static int rtcp_parse(GQueue *q, str *_s) {

@ -20,6 +20,44 @@ struct rtp_extension {
#define RFC_TYPE(type, name, c_rate) \
[type] = { \
.payload_type = type, \
.encoding = STR_CONST_INIT(#name), \
.clock_rate = c_rate, \
}
static const struct rtp_payload_type __rfc_types[] =
{
RFC_TYPE(0, PCMU, 8000),
RFC_TYPE(3, GSM, 8000),
RFC_TYPE(4, G723, 8000),
RFC_TYPE(5, DVI4, 8000),
RFC_TYPE(6, DVI4, 16000),
RFC_TYPE(7, LPC, 8000),
RFC_TYPE(8, PCMA, 8000),
RFC_TYPE(9, G722, 8000),
RFC_TYPE(10, L16, 44100),
RFC_TYPE(11, L16, 44100),
RFC_TYPE(12, QCELP, 8000),
RFC_TYPE(13, CN, 8000),
RFC_TYPE(14, MPA, 90000),
RFC_TYPE(15, G728, 8000),
RFC_TYPE(16, DVI4, 11025),
RFC_TYPE(17, DVI4, 22050),
RFC_TYPE(18, G729, 8000),
RFC_TYPE(25, CelB, 90000),
RFC_TYPE(26, JPEG, 90000),
RFC_TYPE(28, nv, 90000),
RFC_TYPE(31, H261, 90000),
RFC_TYPE(32, MPV, 90000),
RFC_TYPE(33, MP2T, 90000),
RFC_TYPE(34, H263, 90000),
};
INLINE int check_session_keys(struct crypto_context *c) {
str s;
const char *err;
@ -51,7 +89,7 @@ error:
return -1;
}
static int rtp_payload(struct rtp_header **out, str *p, const str *s) {
int rtp_payload(struct rtp_header **out, str *p, const str *s) {
struct rtp_header *rtp;
struct rtp_extension *ext;
const char *err;
@ -65,6 +103,9 @@ static int rtp_payload(struct rtp_header **out, str *p, const str *s) {
if ((rtp->v_p_x_cc & 0xc0) != 0x80) /* version 2 */
goto error;
if (!p)
goto done;
*p = *s;
/* fixed header */
str_shift(p, sizeof(*rtp));
@ -84,6 +125,7 @@ static int rtp_payload(struct rtp_header **out, str *p, const str *s) {
goto error;
}
done:
*out = rtp;
return 0;
@ -266,3 +308,23 @@ error:
ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Invalid SRTP/SRTCP packet received (short packet)");
return -1;
}
const struct rtp_payload_type *rtp_payload_type(unsigned int type, GHashTable *lookup) {
const struct rtp_payload_type *rtp_pt;
if (!lookup)
goto rfc_types;
rtp_pt = g_hash_table_lookup(lookup, &type);
if (rtp_pt)
return rtp_pt;
rfc_types:
if (type >= G_N_ELEMENTS(__rfc_types))
return NULL;
rtp_pt = &__rfc_types[type];
if (!rtp_pt->encoding.s)
return NULL;
return rtp_pt;
}

@ -4,6 +4,7 @@
#include "str.h"
#include <glib.h>
@ -18,10 +19,20 @@ struct rtp_header {
u_int32_t csrc[];
} __attribute__ ((packed));
struct rtp_payload_type {
unsigned int payload_type;
str encoding;
unsigned int clock_rate;
str encoding_parameters;
};
int rtp_payload(struct rtp_header **out, str *p, const str *s);
const struct rtp_payload_type *rtp_payload_type(unsigned int, GHashTable *);
int rtp_avp2savp(str *, struct crypto_context *);
int rtp_savp2avp(str *, struct crypto_context *);

@ -13,6 +13,7 @@
#include "call.h"
#include "crypto.h"
#include "dtls.h"
#include "rtp.h"
struct network_address {
str network_type;
@ -58,7 +59,7 @@ struct sdp_media {
str media_type;
str port;
str transport;
/* ... format list */
str formats; /* space separated */
long int port_num;
int port_count;
@ -66,6 +67,7 @@ struct sdp_media {
struct sdp_connection connection;
int rr, rs;
struct sdp_attributes attributes;
GQueue format_list; /* list of slice-alloc'd str objects */
};
struct attribute_rtcp {
@ -143,6 +145,14 @@ struct attribute_setup {
} value;
};
struct attribute_rtpmap {
str payload_type_str;
str encoding_str;
str clock_rate_str;
struct rtp_payload_type rtp_pt;
};
struct sdp_attribute {
str full_line, /* including a= and \r\n */
line_value, /* without a= and without \r\n */
@ -169,6 +179,7 @@ struct sdp_attribute {
ATTR_MID,
ATTR_FINGERPRINT,
ATTR_SETUP,
ATTR_RTPMAP,
} attr;
union {
@ -179,6 +190,7 @@ struct sdp_attribute {
struct attribute_group group;
struct attribute_fingerprint fingerprint;
struct attribute_setup setup;
struct attribute_rtpmap rtpmap;
} u;
};
@ -190,13 +202,12 @@ static const char ice_chars[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJK
//static int has_rtcp(struct sdp_media *media);
INLINE struct sdp_attribute *attr_get_by_id(struct sdp_attributes *a, int id) {
return g_hash_table_lookup(a->id_hash, &id);
}
INLINE GQueue *attr_list_get_by_id(struct sdp_attributes *a, int id) {
return g_hash_table_lookup(a->id_lists_hash, &id);
}
static struct sdp_attribute *attr_get_by_id_m_s(struct sdp_media *m, int id) {
struct sdp_attribute *a;
@ -309,6 +320,9 @@ INLINE int extract_token(char **sp, char *end, str *out) {
EXTRACT_NETWORK_ADDRESS_NP(field); \
if (parse_address(&output->field)) output->field.parsed.s6_addr32[0] = 0xfe
#define PARSE_DECL char *end, *start
#define PARSE_INIT start = output->value.s; end = start + output->value.len
static int parse_origin(char *start, char *end, struct sdp_origin *output) {
if (output->parsed)
return -1;
@ -334,10 +348,12 @@ static int parse_connection(char *start, char *end, struct sdp_connection *outpu
static int parse_media(char *start, char *end, struct sdp_media *output) {
char *ep;
str s, *sp;
EXTRACT_TOKEN(media_type);
EXTRACT_TOKEN(port);
EXTRACT_TOKEN(transport);
str_init_len(&output->formats, start, end - start);
output->port_num = strtol(output->port.s, &ep, 10);
if (ep == output->port.s)
@ -355,6 +371,15 @@ static int parse_media(char *start, char *end, struct sdp_media *output) {
else
output->port_count = 1;
/* to split the "formats" list into tokens, we abuse some vars */
start = output->formats.s;
end = start + output->formats.len;
while (!extract_token(&start, end, &s)) {
sp = g_slice_alloc(sizeof(*sp));
*sp = s;
g_queue_push_tail(&output->format_list, sp);
}
return 0;
}
@ -379,14 +404,12 @@ static int parse_attribute_group(struct sdp_attribute *output) {
}
static int parse_attribute_ssrc(struct sdp_attribute *output) {
char *start, *end;
PARSE_DECL;
struct attribute_ssrc *s;
output->attr = ATTR_SSRC;
start = output->value.s;
end = start + output->value.len;
PARSE_INIT;
EXTRACT_TOKEN(u.ssrc.id_str);
EXTRACT_TOKEN(u.ssrc.attr_str);
@ -407,7 +430,8 @@ static int parse_attribute_ssrc(struct sdp_attribute *output) {
}
static int parse_attribute_crypto(struct sdp_attribute *output) {
char *start, *end, *endp;
PARSE_DECL;
char *endp;
struct attribute_crypto *c;
int salt_key_len, enc_salt_key_len;
int b64_state = 0;
@ -419,9 +443,7 @@ static int parse_attribute_crypto(struct sdp_attribute *output) {
output->attr = ATTR_CRYPTO;
start = output->value.s;
end = start + output->value.len;
PARSE_INIT;
EXTRACT_TOKEN(u.crypto.tag_str);
EXTRACT_TOKEN(u.crypto.crypto_suite_str);
EXTRACT_TOKEN(u.crypto.key_params_str);
@ -551,12 +573,12 @@ static int parse_attribute_rtcp(struct sdp_attribute *output) {
}
static int parse_attribute_candidate(struct sdp_attribute *output) {
char *end, *start, *ep;
PARSE_DECL;
char *ep;
start = output->value.s;
end = start + output->value.len;
output->attr = ATTR_CANDIDATE;
PARSE_INIT;
EXTRACT_TOKEN(u.candidate.foundation);
EXTRACT_TOKEN(u.candidate.component_str);
EXTRACT_TOKEN(u.candidate.transport);
@ -578,14 +600,13 @@ static int parse_attribute_candidate(struct sdp_attribute *output) {
}
static int parse_attribute_fingerprint(struct sdp_attribute *output) {
char *end, *start;
PARSE_DECL;
unsigned char *c;
int i;
start = output->value.s;
end = start + output->value.len;
output->attr = ATTR_FINGERPRINT;
PARSE_INIT;
EXTRACT_TOKEN(u.fingerprint.hash_func_str);
EXTRACT_TOKEN(u.fingerprint.fingerprint_str);
@ -647,6 +668,49 @@ static int parse_attribute_setup(struct sdp_attribute *output) {
return 0;
}
static int parse_attribute_rtpmap(struct sdp_attribute *output) {
PARSE_DECL;
char *ep;
struct attribute_rtpmap *a;
struct rtp_payload_type *pt;
output->attr = ATTR_RTPMAP;
PARSE_INIT;
EXTRACT_TOKEN(u.rtpmap.payload_type_str);
EXTRACT_TOKEN(u.rtpmap.encoding_str);
a = &output->u.rtpmap;
pt = &a->rtp_pt;
pt->payload_type = strtoul(a->payload_type_str.s, &ep, 10);
if (ep == a->payload_type_str.s)
return -1;
str_chr_str(&a->clock_rate_str, &a->encoding_str, '/');
if (!a->clock_rate_str.s)
return -1;
pt->encoding = a->encoding_str;
pt->encoding.len -= a->clock_rate_str.len;
str_shift(&a->clock_rate_str, 1);
str_chr_str(&pt->encoding_parameters, &a->clock_rate_str, '/');
if (pt->encoding_parameters.s) {
a->clock_rate_str.len -= pt->encoding_parameters.len;
str_shift(&pt->encoding_parameters, 1);
}
if (!a->clock_rate_str.len)
return -1;
pt->clock_rate = strtoul(a->clock_rate_str.s, &ep, 10);
if (ep && ep != a->clock_rate_str.s + a->clock_rate_str.len)
return -1;
return 0;
}
static int parse_attribute(struct sdp_attribute *a) {
int ret;
@ -696,6 +760,8 @@ static int parse_attribute(struct sdp_attribute *a) {
ret = parse_attribute_crypto(a);
else if (!str_cmp(&a->name, "extmap"))
a->attr = ATTR_EXTMAP;
else if (!str_cmp(&a->name, "rtpmap"))
ret = parse_attribute_rtpmap(a);
break;
case 7:
if (!str_cmp(&a->name, "ice-pwd"))
@ -915,30 +981,30 @@ error:
return -1;
}
static void attr_free(void *p) {
g_slice_free1(sizeof(struct sdp_attribute), p);
}
static void free_attributes(struct sdp_attributes *a) {
struct sdp_attribute *attr;
/* g_hash_table_destroy(a->name_hash); */
g_hash_table_destroy(a->id_hash);
/* g_hash_table_destroy(a->name_lists_hash); */
g_hash_table_destroy(a->id_lists_hash);
while ((attr = g_queue_pop_head(&a->list))) {
g_slice_free1(sizeof(*attr), attr);
}
g_queue_clear_full(&a->list, attr_free);
}
static void media_free(void *p) {
struct sdp_media *media = p;
free_attributes(&media->attributes);
g_queue_clear_full(&media->format_list, str_slice_free);
g_slice_free1(sizeof(*media), media);
}
static void session_free(void *p) {
struct sdp_session *session = p;
g_queue_clear_full(&session->media_streams, media_free);
free_attributes(&session->attributes);
g_slice_free1(sizeof(*session), session);
}
void sdp_free(GQueue *sessions) {
struct sdp_session *session;
struct sdp_media *media;
while ((session = g_queue_pop_head(sessions))) {
while ((media = g_queue_pop_head(&session->media_streams))) {
free_attributes(&media->attributes);
g_slice_free1(sizeof(*media), media);
}
free_attributes(&session->attributes);
g_slice_free1(sizeof(*session), session);
}
g_queue_clear_full(sessions, session_free);
}
static int fill_endpoint(struct endpoint *ep, const struct sdp_media *media, struct sdp_ng_flags *flags,
@ -968,6 +1034,66 @@ static int fill_endpoint(struct endpoint *ep, const struct sdp_media *media, str
}
static int __rtp_payload_types(struct stream_params *sp, struct sdp_media *media)
{
GHashTable *ht;
GQueue *q;
GList *ql;
struct sdp_attribute *attr;
int ret = 0;
if (!sp->protocol || !sp->protocol->rtp)
return 0;
/* first go through a=rtpmap and build a hash table of attrs */
ht = g_hash_table_new(g_int_hash, g_int_equal);
q = attr_list_get_by_id(&media->attributes, ATTR_RTPMAP);
for (ql = q->head; ql; ql = ql->next) {
struct rtp_payload_type *pt;
attr = ql->data;
pt = &attr->u.rtpmap.rtp_pt;
g_hash_table_insert(ht, &pt->payload_type, pt);
}
/* a=fmtp processing would go here */
/* then go through the format list and associate */
for (ql = media->format_list.head; ql; ql = ql->next) {
char *ep;
str *s;
unsigned int i;
struct rtp_payload_type *pt;
const struct rtp_payload_type *ptl;
s = ql->data;
i = (unsigned int) strtoul(s->s, &ep, 10);
if (ep == s->s || i > 127)
goto error;
/* first look in rtpmap for a match, then check RFC types,
* else fall back to an "unknown" type */
ptl = rtp_payload_type(i, ht);
pt = g_slice_alloc0(sizeof(*pt));
if (ptl)
*pt = *ptl;
else
pt->payload_type = i;
g_queue_push_tail(&sp->rtp_payload_types, pt);
}
goto out;
error:
ret = -1;
goto out;
out:
g_hash_table_destroy(ht);
return ret;
}
/* XXX split this function up */
int sdp_streams(const GQueue *sessions, GQueue *streams, struct sdp_ng_flags *flags) {
struct sdp_session *session;
struct sdp_media *media;
@ -1000,6 +1126,10 @@ int sdp_streams(const GQueue *sessions, GQueue *streams, struct sdp_ng_flags *fl
bf_xset(&sp->sp_flags, SP_FLAG_STRICT_SOURCE, flags->strict_source);
bf_xset(&sp->sp_flags, SP_FLAG_MEDIA_HANDOVER, flags->media_handover);
errstr = "Invalid RTP payload types";
if (__rtp_payload_types(sp, media))
goto error;
/* a=crypto */
attr = attr_get_by_id(&media->attributes, ATTR_CRYPTO);
if (attr) {
@ -1451,7 +1581,6 @@ INLINE unsigned long type_from_prio(unsigned int prio) {
}
static unsigned long new_priority(struct sdp_media *media, int relay) {
int id;
GQueue *cands;
int pref;
unsigned long prio, tpref;
@ -1468,8 +1597,7 @@ static unsigned long new_priority(struct sdp_media *media, int relay) {
if (!media)
goto out;
id = ATTR_CANDIDATE;
cands = g_hash_table_lookup(media->attributes.id_lists_hash, &id);
cands = attr_list_get_by_id(&media->attributes, ATTR_CANDIDATE);
if (!cands)
goto out;

@ -29,3 +29,7 @@ str *__str_sprintf(const char *fmt, ...) {
va_end(ap);
return ret;
}
void str_slice_free(void *p) {
g_slice_free1(sizeof(str), p);
}

@ -25,6 +25,7 @@ typedef struct _str str;
#define STR_FMT0(str) ((str) ? (str)->len : 6), ((str) ? (str)->s : "(NULL)")
#define STR_NULL ((str) { NULL, 0 })
#define STR_EMPTY ((str) { "", 0 })
#define STR_CONST_INIT(str) { str, sizeof(str)-1 }
@ -70,6 +71,9 @@ INLINE str *g_string_free_str(GString *gs);
guint str_hash(gconstpointer s);
gboolean str_equal(gconstpointer a, gconstpointer b);
/* destroy function, frees a slice-alloc'd str */
void str_slice_free(void *);

@ -17,6 +17,7 @@
#include <net/dst.h>
#include <linux/proc_fs.h>
#include <linux/spinlock.h>
#include <linux/bsearch.h>
#include <linux/netfilter_ipv4/ip_tables.h>
#include <linux/netfilter_ipv4.h>
#include <linux/netfilter_ipv6.h>
@ -148,6 +149,7 @@ struct rtpengine_target {
spinlock_t stats_lock;
struct rtpengine_stats stats;
struct rtpengine_rtp_stats rtp_stats[NUM_PAYLOAD_TYPES];
struct re_crypto_context decrypt;
struct re_crypto_context encrypt;
@ -843,6 +845,7 @@ static ssize_t proc_blist_read(struct file *f, char __user *b, size_t l, loff_t
spin_lock_irqsave(&g->stats_lock, flags);
memcpy(&op.stats, &g->stats, sizeof(op.stats));
memcpy(&op.rtp_stats, &g->rtp_stats, sizeof(op.rtp_stats));
spin_unlock_irqrestore(&g->stats_lock, flags);
spin_lock_irqsave(&g->decrypt.lock, flags);
@ -971,6 +974,7 @@ static void proc_list_crypto_print(struct seq_file *f, struct re_crypto_context
static int proc_list_show(struct seq_file *f, void *v) {
struct rtpengine_target *g = v;
unsigned long flags;
int i;
seq_printf(f, "port %5u:\n", g->target.target_port);
proc_list_addr_print(f, "src", &g->target.src_addr);
@ -982,6 +986,10 @@ static int proc_list_show(struct seq_file *f, void *v) {
spin_lock_irqsave(&g->stats_lock, flags);
seq_printf(f, " stats: %20llu bytes, %20llu packets, %20llu errors\n",
g->stats.bytes, g->stats.packets, g->stats.errors);
for (i = 0; i < g->target.num_payload_types; i++)
seq_printf(f, " RTP payload type %3u: %20llu bytes, %20llu packets\n",
g->target.payload_types[i],
g->rtp_stats[i].bytes, g->rtp_stats[i].packets);
spin_unlock_irqrestore(&g->stats_lock, flags);
proc_list_crypto_print(f, &g->decrypt, &g->target.decrypt, "decryption (incoming)");
proc_list_crypto_print(f, &g->encrypt, &g->target.encrypt, "encryption (outgoing)");
@ -1454,6 +1462,7 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
spin_lock(&og->stats_lock); /* nested lock! irqs are disabled already */
memcpy(&g->stats, &og->stats, sizeof(g->stats));
memcpy(&g->rtp_stats, &og->rtp_stats, sizeof(g->rtp_stats));
spin_unlock(&og->stats_lock);
}
else {
@ -2141,11 +2150,30 @@ static inline int is_dtls(struct sk_buff *skb) {
return 1;
}
static int rtp_payload_match(const void *a, const void *b) {
const unsigned char *A = a, *B = b;
if (*A < *B)
return -1;
if (*A > *B)
return 1;
return 0;
}
static inline int rtp_payload_type(const struct rtp_header *hdr, const struct rtpengine_target_info *tg) {
unsigned char pt, *match;
pt = hdr->m_pt & 0x7f;
match = bsearch(&pt, tg->payload_types, tg->num_payload_types, sizeof(pt), rtp_payload_match);
if (!match)
return -1;
return match - tg->payload_types;
}
static unsigned int rtpengine46(struct sk_buff *skb, struct rtpengine_table *t, struct re_address *src) {
struct udphdr *uh;
struct rtpengine_target *g;
struct sk_buff *skb2;
int err;
int err, rtp_pt_idx = -2;
unsigned int datalen;
unsigned long flags;
u_int32_t *u32;
@ -2205,14 +2233,23 @@ not_stun:
src_check_ok:
if (g->target.dtls && is_dtls(skb))
goto skip1;
rtp.ok = 0;
if (!g->target.rtp)
goto not_rtp;
parse_rtp(&rtp, skb);
if (!rtp.ok) {
if (g->target.rtp_only)
goto skip1;
goto not_rtp;
}
if (g->target.rtcp_mux && is_muxed_rtcp(&rtp))
goto skip1;
rtp_pt_idx = rtp_payload_type(rtp.header, &g->target);
pkt_idx_u = pkt_idx = packet_index(&g->decrypt, &g->target.decrypt, rtp.header);
if (srtp_auth_validate(&g->decrypt, &g->target.decrypt, &rtp, &pkt_idx))
goto skip_error;
@ -2258,6 +2295,14 @@ out:
g->stats.packets++;
g->stats.bytes += datalen;
}
if (rtp_pt_idx >= 0) {
g->rtp_stats[rtp_pt_idx].packets++;
g->rtp_stats[rtp_pt_idx].bytes += datalen;
}
else if (rtp_pt_idx == -2)
/* not RTP */ ;
else if (rtp_pt_idx == -1)
g->stats.errors++;
spin_unlock_irqrestore(&g->stats_lock, flags);
target_push(g);

@ -1,6 +1,12 @@
#ifndef XT_RTPPROXY_H
#define XT_RTPPROXY_H
#define NUM_PAYLOAD_TYPES 16
struct xt_rtpengine_info {
u_int32_t id;
};
@ -10,6 +16,10 @@ struct rtpengine_stats {
u_int64_t bytes;
u_int64_t errors;
};
struct rtpengine_rtp_stats {
u_int64_t packets;
u_int64_t bytes;
};
struct re_address {
int family;
@ -73,10 +83,14 @@ struct rtpengine_target_info {
struct rtpengine_srtp decrypt;
struct rtpengine_srtp encrypt;
unsigned char payload_types[NUM_PAYLOAD_TYPES]; /* must be sorted */
unsigned int num_payload_types;
unsigned char tos;
int rtcp_mux:1,
dtls:1,
stun:1,
rtp:1,
rtp_only:1;
};
@ -94,6 +108,7 @@ struct rtpengine_message {
struct rtpengine_list_entry {
struct rtpengine_target_info target;
struct rtpengine_stats stats;
struct rtpengine_rtp_stats rtp_stats[NUM_PAYLOAD_TYPES];
};

@ -558,8 +558,9 @@ t=0 0
and $cp = $p;
$sdp .= <<"!";
m=audio $p $$tr{name} 8
m=audio $p $$tr{name} 0 8 111
a=rtpmap:8 PCMA/8000
a=rtpmap:111 opus/48000/2
!
if ($$A{want_rtcpmux} && $op eq 'offer') {
$sdp .= "a=rtcp-mux\n";

Loading…
Cancel
Save