TT#30405 codec matchup and handler functions

Change-Id: I506c0e8bfa4b6e64689f6debeb33fe69fce1722c
changes/13/18513/11
Richard Fuchs 8 years ago
parent fb729e3d12
commit 8715d40242

@ -20,6 +20,7 @@ flags = [
'-I/usr/lib/x86_64-linux-gnu/glib-2.0/include',
'-pthread',
'-I../kernel-module/',
'-I../lib/',
'-D_GNU_SOURCE',
'-D__DEBUG=1',
'-D__YCM=1',

@ -55,7 +55,8 @@ include ../lib/lib.Makefile
SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \
codec.c
LIBSRCS= loglib.c auxlib.c rtplib.c
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o)

@ -158,6 +158,19 @@ INLINE void g_tree_add_all(GTree *t, GQueue *q) {
/* GHASHTABLE */
INLINE GQueue *g_hash_table_lookup_queue_new(GHashTable *ht, void *key) {
GQueue *ret = g_hash_table_lookup(ht, key);
if (ret)
return ret;
ret = g_queue_new();
g_hash_table_insert(ht, key, ret);
return ret;
}
/*** STRING HELPERS ***/
INLINE void strmove(char **d, char **s) {

@ -43,6 +43,7 @@
#include "ssrc.h"
#include "main.h"
#include "graphite.h"
#include "codec.h"
/* also serves as array index for callstream->peers[] */
@ -683,6 +684,7 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con
med->index = sp->index;
call_str_cpy(ml->call, &med->type, &sp->type);
med->codecs = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free);
med->codec_names = g_hash_table_new_full(str_hash, str_equal, NULL, (void (*)(void*)) g_queue_free);
g_queue_push_tail(&ml->medias, med);
@ -1448,27 +1450,57 @@ static void __dtls_logic(const struct sdp_ng_flags *flags,
MEDIA_SET(other_media, DTLS);
}
static void __rtp_payload_type_add(struct call_media *media, struct rtp_payload_type *pt) {
static void __rtp_payload_type_add(struct call_media *media, struct call_media *other_media,
struct rtp_payload_type *pt)
{
struct call *call = media->call;
if (g_hash_table_lookup(media->codecs, &pt->payload_type)) {
// collision/duplicate - ignore
__payload_type_free(pt);
return;
}
/* we must duplicate the contents */
call_str_cpy(call, &pt->encoding_with_params, &pt->encoding_with_params);
call_str_cpy(call, &pt->encoding, &pt->encoding);
call_str_cpy(call, &pt->encoding_parameters, &pt->encoding_parameters);
call_str_cpy(call, &pt->format_parameters, &pt->format_parameters);
g_hash_table_replace(media->codecs, &pt->payload_type, pt);
g_queue_push_tail(&media->codecs_prefs, pt);
GQueue *q = g_hash_table_lookup_queue_new(media->codec_names, &pt->encoding);
g_queue_push_tail(q, GUINT_TO_POINTER(pt->payload_type));
g_queue_push_tail(&media->codecs_prefs_recv, pt);
// for the other side, we need a new 'pt' struct
struct rtp_payload_type *pt_copy = g_slice_alloc(sizeof(*pt));
*pt_copy = *pt; // contents are allocated from the 'call'
g_queue_push_tail(&other_media->codecs_prefs_send, pt_copy);
// make sure we have at least an empty queue here to indicate support for this code.
// don't add anything to the queue as we don't know the reverse RTP payload type.
g_hash_table_lookup_queue_new(other_media->codec_names, &pt->encoding);
}
static void __rtp_payload_types(struct call_media *media, GQueue *types, GHashTable *strip,
static void __payload_queue_free(void *qq) {
GQueue *q = qq;
g_queue_free_full(q, __payload_type_free);
}
static void __rtp_payload_types(struct call_media *media, struct call_media *other_media,
GQueue *types, GHashTable *strip,
const GQueue *offer)
{
// 'media' = receiver of this offer/answer; 'other_media' = sender of this offer/answer
struct rtp_payload_type *pt;
static const str str_all = STR_CONST_INIT("all");
GHashTable *removed = g_hash_table_new_full(str_hash, str_equal, NULL, __payload_type_free);
GHashTable *removed = g_hash_table_new_full(str_hash, str_equal, NULL, __payload_queue_free);
int remove_all = 0;
// start fresh
g_queue_clear(&media->codecs_prefs);
g_queue_clear(&media->codecs_prefs_recv);
g_queue_clear_full(&other_media->codecs_prefs_send, __payload_type_free);
g_hash_table_remove_all(media->codecs);
g_hash_table_remove_all(media->codec_names);
if (strip && g_hash_table_lookup(strip, &str_all))
remove_all = 1;
@ -1478,22 +1510,27 @@ static void __rtp_payload_types(struct call_media *media, GQueue *types, GHashTa
// codec stripping
if (strip) {
if (remove_all || g_hash_table_lookup(strip, &pt->encoding)) {
g_hash_table_replace(removed, &pt->encoding, pt);
GQueue *q = g_hash_table_lookup_queue_new(removed, &pt->encoding);
g_queue_push_tail(q, pt);
continue;
}
}
__rtp_payload_type_add(media, pt);
__rtp_payload_type_add(media, other_media, pt);
}
if (offer) {
// now restore codecs that have been removed, but should be offered
for (GList *l = offer->head; l; l = l->next) {
str *codec = l->data;
pt = g_hash_table_lookup(removed, codec);
if (!pt)
GQueue *q = g_hash_table_lookup(removed, codec);
if (!q)
continue;
g_hash_table_steal(removed, codec);
__rtp_payload_type_add(media, pt);
for (GList *l = q->head; l; l = l->next) {
pt = l->data;
__rtp_payload_type_add(media, other_media, pt);
}
g_queue_free(q);
}
}
@ -1620,7 +1657,10 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
MEDIA_SET(other_media, SDES);
}
__rtp_payload_types(media, &sp->rtp_payload_types, flags->codec_strip, &flags->codec_offer);
// codec and RTP payload types handling
__rtp_payload_types(media, other_media, &sp->rtp_payload_types,
flags->codec_strip, &flags->codec_offer);
codec_handlers_update(media, other_media);
/* send and recv are from our POV */
bf_copy_same(&media->media_flags, &sp->sp_flags,
@ -2030,7 +2070,10 @@ static void __call_free(void *p) {
g_queue_clear(&md->streams);
g_queue_clear(&md->endpoint_maps);
g_hash_table_destroy(md->codecs);
g_queue_clear(&md->codecs_prefs);
g_hash_table_destroy(md->codec_names);
g_queue_clear(&md->codecs_prefs_recv);
g_queue_clear_full(&md->codecs_prefs_send, __payload_type_free);
codec_handlers_free(md);
g_slice_free1(sizeof(*md), md);
}

@ -199,6 +199,7 @@ struct local_interface;
struct call_monologue;
struct ice_agent;
struct ssrc_hash;
struct codec_handler;
typedef bencode_buffer_t call_buffer_t;
@ -323,8 +324,13 @@ struct call_media {
GQueue streams; /* normally RTP + RTCP */
GQueue endpoint_maps;
GHashTable *codecs;
GQueue codecs_prefs;
GHashTable *codecs; // int payload type -> struct rtp_payload_type; storage container
GHashTable *codec_names; // codec name -> GQueue of int payload types; storage container
GQueue codecs_prefs_recv, // preference by order in SDP; values shared with 'codecs'
codecs_prefs_send; // ditto for outgoing media; storage container
GHashTable *codec_handlers; // int payload type -> struct codec_handler
// XXX combine this with 'codecs' hash table?
volatile struct codec_handler *codec_handler_cache;
volatile unsigned int media_flags;
};

@ -0,0 +1,130 @@
#include "codec.h"
#include <glib.h>
#include "call.h"
#include "log.h"
#include "rtplib.h"
static codec_handler_func handler_func_stub;
static struct codec_handler codec_handler_stub = {
.rtp_payload_type = -1,
.func = handler_func_stub,
};
static void __make_stub(struct codec_handler *handler) {
handler->func = handler_func_stub;
}
static void __codec_handler_free(void *pp) {
struct codec_handler *h = pp;
g_slice_free1(sizeof(*h), h);
}
// call must be locked in W
void codec_handlers_update(struct call_media *receiver, struct call_media *sink) {
if (!receiver->codec_handlers)
receiver->codec_handlers = g_hash_table_new_full(g_int_hash, g_int_equal,
NULL, __codec_handler_free);
// we go through the list of codecs that the receiver supports and compare it
// with the list of codecs supported by the sink. if the receiver supports
// a codec that the sink doesn't support, we must transcode.
//
// if we transcode, we transcode to the highest-preference supported codec
// that the sink specified. determine this first.
struct rtp_payload_type *pref_dest_codec = NULL;
for (GList *l = sink->codecs_prefs_send.head; l; l = l->next) {
struct rtp_payload_type *pt = l->data;
// XXX if supported ...
ilog(LOG_DEBUG, "Default sink codec is " STR_FORMAT, STR_FMT(&pt->encoding));
pref_dest_codec = pt;
break;
}
for (GList *l = receiver->codecs_prefs_recv.head; l; l = l->next) {
struct rtp_payload_type *pt = l->data;
// first, make sure we have a codec_handler struct for this
struct codec_handler *handler;
handler = g_hash_table_lookup(receiver->codec_handlers, &pt->payload_type);
if (!handler) {
ilog(LOG_DEBUG, "Creating codec handler for " STR_FORMAT, STR_FMT(&pt->encoding));
handler = g_slice_alloc0(sizeof(*handler));
handler->rtp_payload_type = pt->payload_type;
g_hash_table_insert(receiver->codec_handlers, &handler->rtp_payload_type,
handler);
}
// if the sink's codec preferences are unknown (empty), or there are
// no supported codecs to transcode to, then we have nothing
// to do. most likely this is an initial offer without a received answer.
// we default to forwarding without transcoding.
if (!pref_dest_codec) {
ilog(LOG_DEBUG, "No known/supported sink codec for " STR_FORMAT, STR_FMT(&pt->encoding));
__make_stub(handler);
continue;
}
if (g_hash_table_lookup(sink->codec_names, &pt->encoding)) {
// the sink supports this codec. forward without transcoding.
ilog(LOG_DEBUG, "Sink supports codec " STR_FORMAT, STR_FMT(&pt->encoding));
__make_stub(handler);
continue;
}
// the sink does not support this codec XXX do something
ilog(LOG_DEBUG, "Sink does not support codec " STR_FORMAT, STR_FMT(&pt->encoding));
__make_stub(handler);
}
}
// call must be locked in R
struct codec_handler *codec_handler_get(struct call_media *m, int payload_type) {
struct codec_handler *h;
if (payload_type < 0)
goto out;
h = g_atomic_pointer_get(&m->codec_handler_cache);
if (G_LIKELY(G_LIKELY(h) && G_LIKELY(h->rtp_payload_type == payload_type)))
return h;
h = g_hash_table_lookup(m->codec_handlers, &payload_type);
if (!h)
goto out;
g_atomic_pointer_set(&m->codec_handler_cache, h);
return h;
out:
return &codec_handler_stub;
}
void codec_handlers_free(struct call_media *m) {
g_hash_table_destroy(m->codec_handlers);
m->codec_handlers = NULL;
m->codec_handler_cache = NULL;
}
static int handler_func_stub(struct codec_handler *h, struct call_media *media, const str *s, GQueue *out) {
struct codec_packet *p = g_slice_alloc(sizeof(*p));
p->s = *s;
p->free_func = NULL;
g_queue_push_tail(out, p);
return 0;
}
void codec_packet_free(void *pp) {
struct codec_packet *p = pp;
if (p->free_func)
p->free_func(p->s.s);
g_slice_free1(sizeof(*p), p);
}

@ -0,0 +1,38 @@
#ifndef __CODEC_H__
#define __CODEC_H__
#include <glib.h>
#include "str.h"
struct call_media;
struct codec_handler;
typedef int codec_handler_func(struct codec_handler *, struct call_media *, const str *, GQueue *);
struct codec_handler {
int rtp_payload_type;
codec_handler_func *func;
};
struct codec_packet {
str s;
void (*free_func)(void *);
};
void codec_handlers_update(struct call_media *receiver, struct call_media *sink);
struct codec_handler *codec_handler_get(struct call_media *, int payload_type);
void codec_handlers_free(struct call_media *);
void codec_packet_free(void *);
#endif

@ -24,6 +24,7 @@
#include "ssrc.h"
#include "iptables.h"
#include "main.h"
#include "codec.h"
#ifndef PORT_RANDOM_MIN
@ -68,6 +69,7 @@ struct packet_handler_ctx {
struct packet_stream *sink; // where to send output packets to (forward destination)
rewrite_func decrypt_func, encrypt_func; // handlers for decrypt/encrypt
struct packet_stream *in_srtp, *out_srtp; // SRTP contexts for decrypt/encrypt (relevant for muxed RTCP)
int payload_type; // -1 if unknown or not RTP
struct ssrc_ctx *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp
int rtcp; // true if this is an RTCP packet
@ -75,6 +77,9 @@ struct packet_handler_ctx {
int update; // true if Redis info needs to be updated
int unkernelize; // true if stream ought to be removed from kernel
int kernelize; // true if stream can be kernelized
// output:
GQueue packets_out;
};
@ -1281,6 +1286,8 @@ static void media_packet_rtp(struct packet_handler_ctx *phc)
struct rtp_header *rtp_h;
struct rtcp_packet *rtcp_h;
phc->payload_type = -1;
if (G_UNLIKELY(!phc->media->protocol))
return;
if (G_UNLIKELY(!phc->media->protocol->rtp))
@ -1292,15 +1299,16 @@ static void media_packet_rtp(struct packet_handler_ctx *phc)
&phc->ssrc_out, phc->call->ssrc_hash);
// check the payload type
int i = (rtp_h->m_pt & 0x7f);
// XXX redundant between SSRC handling and codec_handler stuff -> combine
phc->payload_type = (rtp_h->m_pt & 0x7f);
if (G_LIKELY(phc->ssrc_in))
phc->ssrc_in->parent->payload_type = i;
phc->ssrc_in->parent->payload_type = phc->payload_type;
// XXX convert to array? or keep last pointer?
struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &i);
struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &phc->payload_type);
if (!rtp_s) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT,
"RTP packet with unknown payload type %u received", i);
"RTP packet with unknown payload type %u received", phc->payload_type);
atomic64_inc(&phc->stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors);
}
@ -1348,21 +1356,25 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
return ret;
}
static int media_packet_encrypt(struct packet_handler_ctx *phc)
{
static int media_packet_encrypt(struct packet_handler_ctx *phc) {
int ret = 0;
if (!phc->encrypt_func)
return 0;
mutex_lock(&phc->out_srtp->out_lock);
int ret = phc->encrypt_func(&phc->s, phc->out_srtp, NULL, NULL, NULL, phc->ssrc_out);
for (GList *l = phc->packets_out.head; l; l = l->next) {
struct codec_packet *p = l->data;
int encret = phc->encrypt_func(&p->s, phc->out_srtp, NULL, NULL, NULL, phc->ssrc_out);
if (encret == 1)
phc->update = 1;
else if (encret != 0)
ret = -1;
}
mutex_unlock(&phc->out_srtp->out_lock);
if (ret == 1) {
phc->update = 1;
ret = 0;
}
return ret;
}
@ -1558,7 +1570,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
// this sets rtcp, in_srtp, out_srtp, and sink
media_packet_rtcp_demux(phc);
// this set ssrc_in and ssrc_out
// this set payload_type, ssrc_in and ssrc_out
media_packet_rtp(phc);
@ -1580,6 +1592,11 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (phc->call->recording)
dump_packet(phc->call->recording, phc->stream, &phc->s);
// XXX use a handler for RTCP
struct codec_handler *transcoder = codec_handler_get(phc->media, phc->payload_type);
// this transfers the packet from 's' to 'packets_out'
transcoder->func(transcoder, phc->media, &phc->s, &phc->packets_out);
if (G_LIKELY(handler_ret >= 0))
handler_ret = media_packet_encrypt(phc);
@ -1606,9 +1623,19 @@ static int stream_packet(struct packet_handler_ctx *phc) {
goto drop;
}
ret = socket_sendto(&phc->sink->selected_sfd->socket, phc->s.s, phc->s.len, &phc->sink->endpoint);
__C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&phc->sink->endpoint.address),
phc->sink->endpoint.port);
struct codec_packet *p;
ret = 0;
while ((p = g_queue_pop_head(&phc->packets_out))) {
__C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&phc->sink->endpoint.address),
phc->sink->endpoint.port);
ret = socket_sendto(&phc->sink->selected_sfd->socket, p->s.s, p->s.len, &phc->sink->endpoint);
codec_packet_free(p);
if (ret == -1)
break;
}
mutex_unlock(&phc->sink->out_lock);
@ -1622,6 +1649,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
drop:
ret = 0;
// XXX separate stats for received/sent
atomic64_inc(&phc->stream->stats.packets);
atomic64_add(&phc->stream->stats.bytes, phc->s.len);
atomic64_set(&phc->stream->last_packet, rtpe_now.tv_sec);
@ -1637,6 +1665,8 @@ out:
rwlock_unlock_r(&phc->call->master_lock);
g_queue_clear_full(&phc->packets_out, codec_packet_free);
return ret;
}
@ -1662,7 +1692,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
}
#endif
struct packet_handler_ctx phc = { 0, };
struct packet_handler_ctx phc;
ZERO(phc);
phc.sfd = sfd;
ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE,

@ -91,7 +91,6 @@ int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
struct rtp_header *rtp;
str payload, to_auth;
u_int64_t index;
int ret = 0;
if (G_UNLIKELY(!ssrc_ctx))
return -1;
@ -115,7 +114,7 @@ int rtp_avp2savp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
s->len += c->params.crypto_suite->srtp_auth_tag;
}
return ret;
return 0;
}
/* rfc 3711, section 3.3 */
@ -124,7 +123,6 @@ int rtp_savp2avp(str *s, struct crypto_context *c, struct ssrc_ctx *ssrc_ctx) {
u_int64_t index;
str payload, to_auth, to_decrypt, auth_tag;
char hmac[20];
int ret = 0;
if (G_UNLIKELY(!ssrc_ctx))
return -1;
@ -176,7 +174,7 @@ decrypt:
*s = to_auth;
return ret;
return 0;
error:
ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Discarded invalid SRTP packet: authentication failed");

@ -1003,15 +1003,9 @@ int sdp_parse(str *body, GQueue *sessions) {
/* if (attr->key.s)
g_hash_table_insert(attrs->name_hash, &attr->key, attr); */
/* attr_queue = g_hash_table_lookup(attrs->name_lists_hash, &attr->name);
if (!attr_queue)
g_hash_table_insert(attrs->name_lists_hash, &attr->name,
(attr_queue = g_queue_new()));
/* attr_queue = g_hash_table_lookup_queue_new(attrs->name_lists_hash, &attr->name);
g_queue_push_tail(attr_queue, attr); */
attr_queue = g_hash_table_lookup(attrs->id_lists_hash, &attr->attr);
if (!attr_queue)
g_hash_table_insert(attrs->id_lists_hash, &attr->attr,
(attr_queue = g_queue_new()));
attr_queue = g_hash_table_lookup_queue_new(attrs->id_lists_hash, &attr->attr);
g_queue_push_tail(attr_queue, attr);
break;
@ -1470,10 +1464,10 @@ static int replace_transport_protocol(struct sdp_chopper *chop,
static int replace_codec_list(struct sdp_chopper *chop,
struct sdp_media *media, struct call_media *cm)
{
if (cm->codecs_prefs.length == 0)
if (cm->codecs_prefs_recv.length == 0)
return 0; // legacy protocol or usage error
for (GList *l = cm->codecs_prefs.head; l; l = l->next) {
for (GList *l = cm->codecs_prefs_recv.head; l; l = l->next) {
struct rtp_payload_type *pt = l->data;
chopper_append_printf(chop, " %u", pt->payload_type);
}
@ -1718,7 +1712,7 @@ static int process_media_attributes(struct sdp_chopper *chop, struct sdp_media *
break;
case ATTR_RTPMAP:
if (media->codecs_prefs.length == 0)
if (media->codecs_prefs_recv.length == 0)
break; // legacy protocol or usage error
if (!g_hash_table_lookup(media->codecs,
&attr->u.rtpmap.rtp_pt.payload_type))
@ -1726,7 +1720,7 @@ static int process_media_attributes(struct sdp_chopper *chop, struct sdp_media *
break;
case ATTR_FMTP:
if (media->codecs_prefs.length == 0)
if (media->codecs_prefs_recv.length == 0)
break; // legacy protocol or usage error
if (!g_hash_table_lookup(media->codecs,
&attr->u.fmtp.payload_type))

Loading…
Cancel
Save