TT#28163 pass RTP packets to media decoder

Change-Id: Ie1cf4ed50a0dae0ee4bbe37918d3d2a86666d998
changes/99/18599/10
Richard Fuchs 8 years ago
parent b9206a49bc
commit cee94a5348

@ -5,30 +5,42 @@
#include "log.h" #include "log.h"
#include "rtplib.h" #include "rtplib.h"
#include "codeclib.h" #include "codeclib.h"
#include "ssrc.h"
static codec_handler_func handler_func_stub; struct codec_ssrc_handler {
struct ssrc_entry h; // must be first
mutex_t lock;
packet_sequencer_t sequencer;
decoder_t *decoder;
};
struct transcode_packet {
seq_packet_t p; // must be first
unsigned long ts;
str *payload;
};
static codec_handler_func handler_func_passthrough;
static codec_handler_func handler_func_transcode; static codec_handler_func handler_func_transcode;
static struct ssrc_entry *__ssrc_handler_new(void *p);
static void __ssrc_handler_free(struct codec_ssrc_handler *p);
static void __transcode_packet_free(struct transcode_packet *);
static struct codec_handler codec_handler_stub = { static struct codec_handler codec_handler_stub = {
.rtp_payload_type = -1, .source_pt.payload_type = -1,
.func = handler_func_stub, .func = handler_func_passthrough,
}; };
static void __handler_shutdown(struct codec_handler *handler) { static void __handler_shutdown(struct codec_handler *handler) {
if (handler->decoder) free_ssrc_hash(&handler->ssrc_hash);
decoder_close(handler->decoder);
handler->decoder = NULL;
}
static void __make_stub(struct codec_handler *handler) {
__handler_shutdown(handler);
handler->func = handler_func_stub;
} }
static void __codec_handler_free(void *pp) { static void __codec_handler_free(void *pp) {
@ -37,26 +49,38 @@ static void __codec_handler_free(void *pp) {
g_slice_free1(sizeof(*h), h); g_slice_free1(sizeof(*h), h);
} }
static struct codec_handler *__handler_new(int pt) {
struct codec_handler *handler = g_slice_alloc0(sizeof(*handler));
handler->source_pt.payload_type = pt;
return handler;
}
static void __make_passthrough(struct codec_handler *handler) {
__handler_shutdown(handler);
handler->func = handler_func_passthrough;
}
static void __make_transcoder(struct codec_handler *handler, struct rtp_payload_type *source, static void __make_transcoder(struct codec_handler *handler, struct rtp_payload_type *source,
struct rtp_payload_type *dest) struct rtp_payload_type *dest)
{ {
assert(source->codec_def != NULL); assert(source->codec_def != NULL);
assert(dest->codec_def != NULL); assert(dest->codec_def != NULL);
assert(source->payload_type == handler->source_pt.payload_type);
__handler_shutdown(handler); __handler_shutdown(handler);
handler->source_pt = *source;
handler->dest_pt = *dest;
handler->func = handler_func_transcode; handler->func = handler_func_transcode;
handler->decoder = decoder_new_fmt(source->codec_def, source->clock_rate, 1, 0);
if (!handler->decoder) handler->ssrc_hash = create_ssrc_hash_full(__ssrc_handler_new, (ssrc_free_func_t) __ssrc_handler_free,
goto err; handler);
ilog(LOG_DEBUG, "Created transcode context for '" STR_FORMAT "' -> '" STR_FORMAT "'", ilog(LOG_DEBUG, "Created transcode context for '" STR_FORMAT "' -> '" STR_FORMAT "'",
STR_FMT(&source->encoding), STR_FMT(&dest->encoding)); STR_FMT(&source->encoding), STR_FMT(&dest->encoding));
return; return;
err:
__make_stub(handler);
} }
// call must be locked in W // call must be locked in W
@ -93,9 +117,8 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink)
handler = g_hash_table_lookup(receiver->codec_handlers, &pt->payload_type); handler = g_hash_table_lookup(receiver->codec_handlers, &pt->payload_type);
if (!handler) { if (!handler) {
ilog(LOG_DEBUG, "Creating codec handler for " STR_FORMAT, STR_FMT(&pt->encoding)); ilog(LOG_DEBUG, "Creating codec handler for " STR_FORMAT, STR_FMT(&pt->encoding));
handler = g_slice_alloc0(sizeof(*handler)); handler = __handler_new(pt->payload_type);
handler->rtp_payload_type = pt->payload_type; g_hash_table_insert(receiver->codec_handlers, &handler->source_pt.payload_type,
g_hash_table_insert(receiver->codec_handlers, &handler->rtp_payload_type,
handler); handler);
} }
@ -105,7 +128,7 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink)
// we default to forwarding without transcoding. // we default to forwarding without transcoding.
if (!pref_dest_codec) { if (!pref_dest_codec) {
ilog(LOG_DEBUG, "No known/supported sink codec for " STR_FORMAT, STR_FMT(&pt->encoding)); ilog(LOG_DEBUG, "No known/supported sink codec for " STR_FORMAT, STR_FMT(&pt->encoding));
__make_stub(handler); __make_passthrough(handler);
continue; continue;
} }
@ -113,7 +136,7 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink)
// the sink supports this codec. forward without transcoding. // the sink supports this codec. forward without transcoding.
// XXX check format parameters as well // XXX check format parameters as well
ilog(LOG_DEBUG, "Sink supports codec " STR_FORMAT, STR_FMT(&pt->encoding)); ilog(LOG_DEBUG, "Sink supports codec " STR_FORMAT, STR_FMT(&pt->encoding));
__make_stub(handler); __make_passthrough(handler);
continue; continue;
} }
@ -132,7 +155,7 @@ struct codec_handler *codec_handler_get(struct call_media *m, int payload_type)
goto out; goto out;
h = g_atomic_pointer_get(&m->codec_handler_cache); h = g_atomic_pointer_get(&m->codec_handler_cache);
if (G_LIKELY(G_LIKELY(h) && G_LIKELY(h->rtp_payload_type == payload_type))) if (G_LIKELY(G_LIKELY(h) && G_LIKELY(h->source_pt.payload_type == payload_type)))
return h; return h;
h = g_hash_table_lookup(m->codec_handlers, &payload_type); h = g_hash_table_lookup(m->codec_handlers, &payload_type);
@ -154,14 +177,102 @@ void codec_handlers_free(struct call_media *m) {
} }
static int handler_func_stub(struct codec_handler *h, struct call_media *media, const str *s, GQueue *out) { static int handler_func_passthrough(struct codec_handler *h, struct call_media *media,
const struct media_packet *mp, GQueue *out)
{
struct codec_packet *p = g_slice_alloc(sizeof(*p)); struct codec_packet *p = g_slice_alloc(sizeof(*p));
p->s = *s; p->s = mp->raw;
p->free_func = NULL; p->free_func = NULL;
g_queue_push_tail(out, p); g_queue_push_tail(out, p);
return 0; return 0;
} }
static int handler_func_transcode(struct codec_handler *h, struct call_media *media, const str *s, GQueue *out) {
static void __transcode_packet_free(struct transcode_packet *p) {
free(p->payload);
g_slice_free1(sizeof(*p), p);
}
static struct ssrc_entry *__ssrc_handler_new(void *p) {
struct codec_handler *h = p;
struct codec_ssrc_handler *ch = g_slice_alloc0(sizeof(*ch));
mutex_init(&ch->lock);
packet_sequencer_init(&ch->sequencer, (GDestroyNotify) __transcode_packet_free);
ch->decoder = decoder_new_fmt(h->source_pt.codec_def, h->source_pt.clock_rate, 1, 0);
if (!ch->decoder)
goto err;
return &ch->h;
err:
__ssrc_handler_free(ch);
return NULL;
}
static void __ssrc_handler_free(struct codec_ssrc_handler *ch) {
packet_sequencer_destroy(&ch->sequencer);
if (ch->decoder)
decoder_close(ch->decoder);
g_slice_free1(sizeof(*ch), ch);
}
int __packet_decoded(decoder_t *decoder, AVFrame *frame, void *u1, void *u2) {
//struct codec_ssrc_handler *ch = u1;
ilog(LOG_DEBUG, "RTP media successfully decoded");
av_frame_free(&frame);
return 0;
}
static int handler_func_transcode(struct codec_handler *h, struct call_media *media,
const struct media_packet *mp, GQueue *out)
{
if (G_UNLIKELY(!mp->rtp || mp->rtcp))
return handler_func_passthrough(h, media, mp, out);
assert((mp->rtp->m_pt & 0x7f) == h->source_pt.payload_type);
// create new packet and insert it into sequencer queue
ilog(LOG_DEBUG, "Received RTP packet: SSRC %u, PT %u, seq %u, TS %u",
ntohl(mp->rtp->ssrc), mp->rtp->m_pt, ntohs(mp->rtp->seq_num),
ntohl(mp->rtp->timestamp));
struct codec_ssrc_handler *ch = get_ssrc(mp->rtp->ssrc, h->ssrc_hash);
if (G_UNLIKELY(!ch))
return 0;
struct transcode_packet *packet = g_slice_alloc0(sizeof(*packet));
packet->p.seq = ntohs(mp->rtp->seq_num);
packet->payload = str_dup(&mp->payload);
packet->ts = ntohl(mp->rtp->timestamp);
mutex_lock(&ch->lock);
if (packet_sequencer_insert(&ch->sequencer, &packet->p)) {
// dupe
mutex_unlock(&ch->lock);
__transcode_packet_free(packet);
ilog(LOG_DEBUG, "Ignoring duplicate RTP packet");
return 0;
}
// got a new packet, run decoder
while (1) {
packet = packet_sequencer_next_packet(&ch->sequencer);
if (G_UNLIKELY(!packet))
break;
ilog(LOG_DEBUG, "Decoding RTP packet: seq %u, TS %lu",
packet->p.seq, packet->ts);
if (decoder_input_data(ch->decoder, packet->payload, packet->ts, __packet_decoded, ch, NULL))
ilog(LOG_WARN, "Decoder error while processing RTP packet");
__transcode_packet_free(packet);
}
mutex_unlock(&ch->lock);
return 0; return 0;
} }

@ -5,19 +5,26 @@
#include <glib.h> #include <glib.h>
#include "str.h" #include "str.h"
#include "codeclib.h" #include "codeclib.h"
#include "aux.h"
#include "rtplib.h"
struct call_media; struct call_media;
struct codec_handler; struct codec_handler;
struct media_packet;
struct ssrc_hash;
typedef int codec_handler_func(struct codec_handler *, struct call_media *, const str *, GQueue *); typedef int codec_handler_func(struct codec_handler *, struct call_media *, const struct media_packet *,
GQueue *);
struct codec_handler { struct codec_handler {
int rtp_payload_type; struct rtp_payload_type source_pt; // source_pt.payload_type = hashtable index
struct rtp_payload_type dest_pt;
codec_handler_func *func; codec_handler_func *func;
decoder_t *decoder;
struct ssrc_hash *ssrc_hash;
}; };
struct codec_packet { struct codec_packet {

@ -79,6 +79,7 @@ struct packet_handler_ctx {
int kernelize; // true if stream can be kernelized int kernelize; // true if stream can be kernelized
// output: // output:
struct media_packet mp; // passed to handlers
GQueue packets_out; GQueue packets_out;
}; };
@ -1283,9 +1284,6 @@ static void media_packet_rtcp_demux(struct packet_handler_ctx *phc)
static void media_packet_rtp(struct packet_handler_ctx *phc) static void media_packet_rtp(struct packet_handler_ctx *phc)
{ {
struct rtp_header *rtp_h;
struct rtcp_packet *rtcp_h;
phc->payload_type = -1; phc->payload_type = -1;
if (G_UNLIKELY(!phc->media->protocol)) if (G_UNLIKELY(!phc->media->protocol))
@ -1293,18 +1291,21 @@ static void media_packet_rtp(struct packet_handler_ctx *phc)
if (G_UNLIKELY(!phc->media->protocol->rtp)) if (G_UNLIKELY(!phc->media->protocol->rtp))
return; return;
if (G_LIKELY(!phc->rtcp && !rtp_payload(&rtp_h, NULL, &phc->s))) { if (G_LIKELY(!phc->rtcp && !rtp_payload(&phc->mp.rtp, &phc->mp.payload, &phc->s))) {
rtp_padding(phc->mp.rtp, &phc->mp.payload);
if (G_LIKELY(phc->out_srtp != NULL)) if (G_LIKELY(phc->out_srtp != NULL))
__stream_ssrc(phc->in_srtp, phc->out_srtp, rtp_h->ssrc, &phc->ssrc_in, __stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtp->ssrc, &phc->ssrc_in,
&phc->ssrc_out, phc->call->ssrc_hash); &phc->ssrc_out, phc->call->ssrc_hash);
// check the payload type // check the payload type
// XXX redundant between SSRC handling and codec_handler stuff -> combine // XXX redundant between SSRC handling and codec_handler stuff -> combine
phc->payload_type = (rtp_h->m_pt & 0x7f); phc->payload_type = (phc->mp.rtp->m_pt & 0x7f);
if (G_LIKELY(phc->ssrc_in)) if (G_LIKELY(phc->ssrc_in))
phc->ssrc_in->parent->payload_type = phc->payload_type; phc->ssrc_in->parent->payload_type = phc->payload_type;
// XXX convert to array? or keep last pointer? // XXX convert to array? or keep last pointer?
// XXX yet another hash table per payload type -> combine
struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &phc->payload_type); struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &phc->payload_type);
if (!rtp_s) { if (!rtp_s) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT, ilog(LOG_WARNING | LOG_FLAG_LIMIT,
@ -1318,9 +1319,9 @@ static void media_packet_rtp(struct packet_handler_ctx *phc)
atomic64_add(&rtp_s->bytes, phc->s.len); atomic64_add(&rtp_s->bytes, phc->s.len);
} }
} }
else if (phc->rtcp && !rtcp_payload(&rtcp_h, NULL, &phc->s)) { else if (phc->rtcp && !rtcp_payload(&phc->mp.rtcp, NULL, &phc->s)) {
if (G_LIKELY(phc->out_srtp != NULL)) if (G_LIKELY(phc->out_srtp != NULL))
__stream_ssrc(phc->in_srtp, phc->out_srtp, rtcp_h->ssrc, &phc->ssrc_in, __stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtcp->ssrc, &phc->ssrc_in,
&phc->ssrc_out, phc->call->ssrc_hash); &phc->ssrc_out, phc->call->ssrc_hash);
} }
} }
@ -1570,7 +1571,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
// this sets rtcp, in_srtp, out_srtp, and sink // this sets rtcp, in_srtp, out_srtp, and sink
media_packet_rtcp_demux(phc); media_packet_rtcp_demux(phc);
// this set payload_type, ssrc_in and ssrc_out // this set payload_type, ssrc_in, ssrc_out and mp
media_packet_rtp(phc); media_packet_rtp(phc);
@ -1595,7 +1596,9 @@ static int stream_packet(struct packet_handler_ctx *phc) {
// XXX use a handler for RTCP // XXX use a handler for RTCP
struct codec_handler *transcoder = codec_handler_get(phc->media, phc->payload_type); struct codec_handler *transcoder = codec_handler_get(phc->media, phc->payload_type);
// this transfers the packet from 's' to 'packets_out' // this transfers the packet from 's' to 'packets_out'
transcoder->func(transcoder, phc->media, &phc->s, &phc->packets_out); phc->mp.raw = phc->s;
if (transcoder->func(transcoder, phc->media, &phc->mp, &phc->packets_out))
goto drop;
if (G_LIKELY(handler_ret >= 0)) if (G_LIKELY(handler_ret >= 0))
handler_ret = media_packet_encrypt(phc); handler_ret = media_packet_encrypt(phc);

@ -68,6 +68,12 @@ struct stream_fd {
struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */
struct dtls_connection dtls; /* LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */
}; };
struct media_packet {
str raw;
struct rtp_header *rtp;
struct rtcp_packet *rtcp;
str payload;
};

@ -25,6 +25,7 @@ sub new {
$self->{octet_count} = 0; $self->{octet_count} = 0;
$self->{other_ssrcs} = {}; $self->{other_ssrcs} = {};
$self->{args} = \%args; $self->{args} = \%args;
$self->{payload_type} = $args{payload_type} // 0;
return $self; return $self;
} }
@ -34,7 +35,8 @@ sub timer {
time() < $self->{next_send} and return; time() < $self->{next_send} and return;
my $hdr = pack("CCnNN", 0x80, 0x00, $self->{seq}, $self->{timestamp}->bstr(), $self->{ssrc}); my $hdr = pack("CCnNN", 0x80, $self->{payload_type}, $self->{seq}, $self->{timestamp}->bstr(),
$self->{ssrc});
my $payload = chr(rand(256)) x $self->{payload}; # XXX adapt to codec my $payload = chr(rand(256)) x $self->{payload}; # XXX adapt to codec
my $lost = 0; my $lost = 0;

@ -123,17 +123,29 @@ use Socket;
use Socket6; use Socket6;
use IO::Socket; use IO::Socket;
my %codec_map = (
PCMA => { payload_type => 8 },
PCMU => { payload_type => 0 },
);
my %payload_type_map = map {$codec_map{$_}{payload_type} => $_} keys(%codec_map);
sub _codec_list_to_hash {
my ($list) = @_;
return { map { $_ => { %{$codec_map{$_}} } } @{$list} };
}
sub new { sub new {
my ($class, $rtp, $rtcp, $protocol, $type) = @_; my ($class, $rtp, $rtcp, %args) = @_;
my $self = {}; my $self = {};
bless $self, $class; bless $self, $class;
$self->{rtp} = $rtp; # main transport $self->{rtp} = $rtp; # main transport
$self->{rtcp} = $rtcp; # optional $self->{rtcp} = $rtcp; # optional
$self->{protocol} = $protocol // 'RTP/AVP'; $self->{protocol} = $args{protocol} // 'RTP/AVP';
$self->{type} = $type // 'audio'; $self->{type} = $args{type} // 'audio';
$self->{payload_types} = [0]; $self->{codec_list} = $args{codecs};
$self->{codecs} = _codec_list_to_hash(@{$self->{codecs}});
$self->{additional_attributes} = []; $self->{additional_attributes} = [];
@ -149,7 +161,9 @@ sub new_remote {
$self->{protocol} = $protocol; $self->{protocol} = $protocol;
$self->{port} = $port; $self->{port} = $port;
$self->{type} = $type; $self->{type} = $type;
$self->{payload_types} = [split(/ /, $payload_types)]; my @payload_types = [split(/ /, $payload_types)];
$self->{codec_list} = [ map {$payload_type_map{$_}} @payload_types ];
$self->{codecs} = _codec_list_to_hash(@{$self->{codecs}});
return $self; return $self;
}; };
@ -165,8 +179,10 @@ sub encode {
my $pconn = $parent_connection ? NGCP::Rtpclient::SDP::encode_address($parent_connection) : ''; my $pconn = $parent_connection ? NGCP::Rtpclient::SDP::encode_address($parent_connection) : '';
my @out; my @out;
my @payload_types = map {$codec_map{$_}{payload_type}} @{$self->{codec_list}};
push(@out, "m=$self->{type} " . $self->{rtp}->sockport() . ' ' . $self->{protocol} . ' ' push(@out, "m=$self->{type} " . $self->{rtp}->sockport() . ' ' . $self->{protocol} . ' '
. join(' ', @{$self->{payload_types}})); . join(' ', @payload_types));
my $rtpconn = NGCP::Rtpclient::SDP::encode_address($self->{rtp}); my $rtpconn = NGCP::Rtpclient::SDP::encode_address($self->{rtp});
$rtpconn eq $pconn or push(@out, "c=$rtpconn"); $rtpconn eq $pconn or push(@out, "c=$rtpconn");

@ -138,6 +138,7 @@ sub _new {
$self->{parent} = $parent; $self->{parent} = $parent;
$self->{tag} = rand(); $self->{tag} = rand();
$self->{codecs} = $args{codecs} // [qw(PCMU)];
# create media sockets # create media sockets
my @addresses = @{$parent->{all_addresses}}; my @addresses = @{$parent->{all_addresses}};
@ -181,7 +182,10 @@ sub _new {
$args{protocol} and $proto = $args{protocol}; $args{protocol} and $proto = $args{protocol};
$self->{local_media} = $self->{local_sdp}->add_media(NGCP::Rtpclient::SDP::Media->new( $self->{local_media} = $self->{local_sdp}->add_media(NGCP::Rtpclient::SDP::Media->new(
$self->{main_sockets}->[0], $self->{main_sockets}->[1], $proto)); # main rtp and rtcp $self->{main_sockets}->[0], $self->{main_sockets}->[1], # main rtp and rtcp
protocol => $proto,
codecs => $self->{codecs},
));
# XXX support multiple medias # XXX support multiple medias
if ($args{dtls}) { if ($args{dtls}) {
@ -273,7 +277,7 @@ sub _default_req_args {
my $req = { command => $cmd, 'call-id' => $self->{parent}->{callid} }; my $req = { command => $cmd, 'call-id' => $self->{parent}->{callid} };
for my $cp (qw(sdp from-tag to-tag ICE transport-protocol address-family label direction)) { for my $cp (qw(sdp from-tag to-tag ICE transport-protocol address-family label direction codec)) {
$args{$cp} and $req->{$cp} = $args{$cp}; $args{$cp} and $req->{$cp} = $args{$cp};
} }
for my $cp (@{$args{flags}}) { for my $cp (@{$args{flags}}) {

@ -0,0 +1,27 @@
#!/usr/bin/perl
use strict;
use warnings;
use NGCP::Rtpengine::Test;
use IO::Socket;
my $r = NGCP::Rtpengine::Test->new();
my ($a, $b) = $r->client_pair(
{sockdomain => &Socket::AF_INET, codecs => [qw(PCMA)]},
{sockdomain => &Socket::AF_INET, codecs => [qw(PCMU)]}
);
$r->timer_once(3, sub {
$b->answer($a, ICE => 'remove', label => "callee");
$a->start_rtp();
$a->start_rtcp();
});
$r->timer_once(10, sub { $r->stop(); });
$a->offer($b, ICE => 'remove', label => "caller", codec => { transcode => ['PCMU']});
$b->start_rtp();
$b->start_rtcp();
$r->run();
$a->teardown(dump => 1);
Loading…
Cancel
Save