TT#31403 map and replace RTCP SSRCs when transcoding

Change-Id: I765f1972e4b4d10d48e10a9e14e451edd48ca836
changes/00/18900/7
Richard Fuchs 7 years ago
parent 26a356ccae
commit 613f7afdb6

@ -342,6 +342,7 @@ struct call_media {
GHashTable *codec_handlers; // int payload type -> struct codec_handler
// XXX combine this with 'codecs_recv' hash table?
volatile struct codec_handler *codec_handler_cache;
struct rtcp_handler *rtcp_handler;
int ptime; // either from SDP or overridden

@ -6,6 +6,7 @@
#include "rtplib.h"
#include "codeclib.h"
#include "ssrc.h"
#include "rtcp.h"
@ -138,6 +139,7 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink)
NULL, __codec_handler_free);
MEDIA_CLEAR(receiver, TRANSCODE);
receiver->rtcp_handler = NULL;
// we go through the list of codecs that the receiver supports and compare it
// with the list of codecs supported by the sink. if the receiver supports
@ -335,6 +337,8 @@ next:
STR_FMT(&pt->encoding));
l = __delete_receiver_codec(receiver, l);
}
receiver->rtcp_handler = rtcp_transcode_handler;
}
}
@ -368,16 +372,16 @@ void codec_handlers_free(struct call_media *m) {
}
void codec_add_raw_packet(struct media_packet *mp, const str *raw) {
void codec_add_raw_packet(struct media_packet *mp) {
struct codec_packet *p = g_slice_alloc(sizeof(*p));
p->s = *raw;
p->s = mp->raw;
p->free_func = NULL;
g_queue_push_tail(&mp->packets_out, p);
}
static int handler_func_passthrough(struct codec_handler *h, struct call_media *media,
struct media_packet *mp)
{
codec_add_raw_packet(mp, &mp->raw);
codec_add_raw_packet(mp);
return 0;
}
@ -500,7 +504,7 @@ static int __packet_encoded(encoder_t *enc, void *u1, void *u2) {
rh->m_pt = ch->handler->dest_pt.payload_type;
rh->seq_num = htons(ch->seq_out++);
rh->timestamp = htonl(enc->avpkt.pts + ch->ts_out);
rh->ssrc = mp->ssrc_out->ssrc_map_out;
rh->ssrc = htonl(mp->ssrc_in->ssrc_map_out);
// add to output queue
struct codec_packet *p = g_slice_alloc(sizeof(*p));

@ -36,7 +36,7 @@ void codec_handlers_update(struct call_media *receiver, struct call_media *sink)
struct codec_handler *codec_handler_get(struct call_media *, int payload_type);
void codec_handlers_free(struct call_media *);
void codec_add_raw_packet(struct media_packet *mp, const str *);
void codec_add_raw_packet(struct media_packet *mp);
void codec_packet_free(void *);
void codec_rtp_payload_types(struct call_media *media, struct call_media *other_media,

@ -60,13 +60,7 @@ struct intf_rr {
struct packet_handler_ctx {
// inputs:
str s; // raw input packet
endpoint_t fsin; // source address of received packet
struct timeval tv; // timestamp when packet was received
struct stream_fd *sfd; // fd which received the packet
struct call *call; // sfd->call
struct packet_stream *stream; // sfd->stream
struct call_media *media; // stream->media
struct packet_stream *sink; // where to send output packets to (forward destination)
rewrite_func decrypt_func, encrypt_func; // handlers for decrypt/encrypt
rtcp_filter_func *rtcp_filter;
@ -1136,31 +1130,38 @@ noop:
static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *out_srtp, u_int32_t ssrc_bs,
struct ssrc_ctx **ssrc_in_p, struct ssrc_ctx **ssrc_out_p, struct ssrc_hash *ssrc_hash)
{
u_int32_t ssrc = ntohl(ssrc_bs);
u_int32_t in_ssrc = ntohl(ssrc_bs);
u_int32_t out_ssrc;
// input direction
mutex_lock(&in_srtp->in_lock);
(*ssrc_in_p) = in_srtp->ssrc_in;
if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != ssrc)) {
if (G_UNLIKELY(!(*ssrc_in_p) || (*ssrc_in_p)->parent->h.ssrc != in_ssrc)) {
// SSRC mismatch - get the new entry
(*ssrc_in_p) = in_srtp->ssrc_in =
get_ssrc_ctx(ssrc, ssrc_hash, SSRC_DIR_INPUT);
get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT);
// might have created a new entry, which would have a new random
// ssrc_map_out. we don't need this if we're not transcoding
if (!MEDIA_ISSET(in_srtp->media, TRANSCODE))
(*ssrc_in_p)->ssrc_map_out = in_ssrc;
}
mutex_unlock(&in_srtp->in_lock);
if (MEDIA_ISSET(in_srtp->media, TRANSCODE))
ssrc = (*ssrc_in_p)->ssrc_map_out;
// out direction
out_ssrc = (*ssrc_in_p)->ssrc_map_out;
mutex_lock(&out_srtp->out_lock);
(*ssrc_out_p) = out_srtp->ssrc_out;
if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != ssrc)) {
if (G_UNLIKELY(!(*ssrc_out_p) || (*ssrc_out_p)->parent->h.ssrc != out_ssrc)) {
// SSRC mismatch - get the new entry
(*ssrc_out_p) = out_srtp->ssrc_out =
get_ssrc_ctx(ssrc, ssrc_hash, SSRC_DIR_OUTPUT);
get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT);
// reverse SSRC mapping
(*ssrc_out_p)->ssrc_map_out = in_ssrc;
}
mutex_unlock(&out_srtp->out_lock);
@ -1170,20 +1171,20 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o
// returns: 0 = packet processed by other protocol hander; -1 = packet not handled, proceed;
// 1 = same as 0, but stream can be kernelized
static int media_demux_protocols(struct packet_handler_ctx *phc) {
if (MEDIA_ISSET(phc->media, DTLS) && is_dtls(&phc->s)) {
mutex_lock(&phc->stream->in_lock);
int ret = dtls(phc->stream, &phc->s, &phc->fsin);
mutex_unlock(&phc->stream->in_lock);
if (MEDIA_ISSET(phc->mp.media, DTLS) && is_dtls(&phc->s)) {
mutex_lock(&phc->mp.stream->in_lock);
int ret = dtls(phc->mp.stream, &phc->s, &phc->mp.fsin);
mutex_unlock(&phc->mp.stream->in_lock);
if (!ret)
return 0;
}
if (phc->media->ice_agent && is_stun(&phc->s)) {
int stun_ret = stun(&phc->s, phc->sfd, &phc->fsin);
if (phc->mp.media->ice_agent && is_stun(&phc->s)) {
int stun_ret = stun(&phc->s, phc->mp.sfd, &phc->mp.fsin);
if (!stun_ret)
return 0;
if (stun_ret == 1) {
call_media_state_machine(phc->media);
call_media_state_machine(phc->mp.media);
return 1;
}
else /* not an stun packet */
@ -1197,33 +1198,33 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
#if RTP_LOOP_PROTECT
// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet
static int media_loop_detect(struct packet_handler_ctx *phc) {
mutex_lock(&phc->stream->in_lock);
mutex_lock(&phc->mp.stream->in_lock);
for (int i = 0; i < RTP_LOOP_PACKETS; i++) {
if (phc->stream->lp_buf[i].len != phc->s.len)
if (phc->mp.stream->lp_buf[i].len != phc->s.len)
continue;
if (memcmp(phc->stream->lp_buf[i].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT)))
if (memcmp(phc->mp.stream->lp_buf[i].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT)))
continue;
__C_DBG("packet dupe");
if (phc->stream->lp_count >= RTP_LOOP_MAX_COUNT) {
if (phc->mp.stream->lp_count >= RTP_LOOP_MAX_COUNT) {
ilog(LOG_WARNING, "More than %d duplicate packets detected, dropping packet "
"to avoid potential loop", RTP_LOOP_MAX_COUNT);
mutex_unlock(&phc->stream->in_lock);
mutex_unlock(&phc->mp.stream->in_lock);
return -1;
}
phc->stream->lp_count++;
phc->mp.stream->lp_count++;
goto loop_ok;
}
/* not a dupe */
phc->stream->lp_count = 0;
phc->stream->lp_buf[phc->stream->lp_idx].len = phc->s.len;
memcpy(phc->stream->lp_buf[phc->stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT));
phc->stream->lp_idx = (phc->stream->lp_idx + 1) % RTP_LOOP_PACKETS;
phc->mp.stream->lp_count = 0;
phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].len = phc->s.len;
memcpy(phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT));
phc->mp.stream->lp_idx = (phc->mp.stream->lp_idx + 1) % RTP_LOOP_PACKETS;
loop_ok:
mutex_unlock(&phc->stream->in_lock);
mutex_unlock(&phc->mp.stream->in_lock);
return 0;
}
@ -1235,18 +1236,18 @@ loop_ok:
// sink is set to where to forward the packet to
static void media_packet_rtcp_demux(struct packet_handler_ctx *phc)
{
phc->in_srtp = phc->stream;
phc->sink = phc->stream->rtp_sink;
if (!phc->sink && PS_ISSET(phc->stream, RTCP)) {
phc->sink = phc->stream->rtcp_sink;
phc->in_srtp = phc->mp.stream;
phc->sink = phc->mp.stream->rtp_sink;
if (!phc->sink && PS_ISSET(phc->mp.stream, RTCP)) {
phc->sink = phc->mp.stream->rtcp_sink;
phc->rtcp = 1;
}
else if (phc->stream->rtcp_sink) {
int muxed_rtcp = rtcp_demux(&phc->s, phc->media);
else if (phc->mp.stream->rtcp_sink) {
int muxed_rtcp = rtcp_demux(&phc->s, phc->mp.media);
if (muxed_rtcp == 2) {
phc->sink = phc->stream->rtcp_sink;
phc->sink = phc->mp.stream->rtcp_sink;
phc->rtcp = 1;
phc->in_srtp = phc->stream->rtcp_sibling; // use RTCP SRTP context
phc->in_srtp = phc->mp.stream->rtcp_sibling; // use RTCP SRTP context
}
}
phc->out_srtp = phc->sink;
@ -1259,9 +1260,9 @@ static void media_packet_rtp(struct packet_handler_ctx *phc)
{
phc->payload_type = -1;
if (G_UNLIKELY(!phc->media->protocol))
if (G_UNLIKELY(!phc->mp.media->protocol))
return;
if (G_UNLIKELY(!phc->media->protocol->rtp))
if (G_UNLIKELY(!phc->mp.media->protocol->rtp))
return;
if (G_LIKELY(!phc->rtcp && !rtp_payload(&phc->mp.rtp, &phc->mp.payload, &phc->s))) {
@ -1269,7 +1270,7 @@ static void media_packet_rtp(struct packet_handler_ctx *phc)
if (G_LIKELY(phc->out_srtp != NULL))
__stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtp->ssrc, &phc->mp.ssrc_in,
&phc->mp.ssrc_out, phc->call->ssrc_hash);
&phc->mp.ssrc_out, phc->mp.call->ssrc_hash);
// check the payload type
// XXX redundant between SSRC handling and codec_handler stuff -> combine
@ -1279,11 +1280,11 @@ static void media_packet_rtp(struct packet_handler_ctx *phc)
// XXX convert to array? or keep last pointer?
// XXX yet another hash table per payload type -> combine
struct rtp_stats *rtp_s = g_hash_table_lookup(phc->stream->rtp_stats, &phc->payload_type);
struct rtp_stats *rtp_s = g_hash_table_lookup(phc->mp.stream->rtp_stats, &phc->payload_type);
if (!rtp_s) {
ilog(LOG_WARNING | LOG_FLAG_LIMIT,
"RTP packet with unknown payload type %u received", phc->payload_type);
atomic64_inc(&phc->stream->stats.errors);
atomic64_inc(&phc->mp.stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors);
}
@ -1295,7 +1296,7 @@ static void media_packet_rtp(struct packet_handler_ctx *phc)
else if (phc->rtcp && !rtcp_payload(&phc->mp.rtcp, NULL, &phc->s)) {
if (G_LIKELY(phc->out_srtp != NULL))
__stream_ssrc(phc->in_srtp, phc->out_srtp, phc->mp.rtcp->ssrc, &phc->mp.ssrc_in,
&phc->mp.ssrc_out, phc->call->ssrc_hash);
&phc->mp.ssrc_out, phc->mp.call->ssrc_hash);
}
}
@ -1320,7 +1321,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
* 1 = forward and push update to redis */
int ret = 0;
if (phc->decrypt_func)
ret = phc->decrypt_func(&phc->s, phc->in_srtp, phc->sfd, &phc->fsin, &phc->tv, phc->mp.ssrc_in);
ret = phc->decrypt_func(&phc->s, phc->in_srtp, phc->mp.sfd, &phc->mp.fsin, &phc->mp.tv, phc->mp.ssrc_in);
mutex_unlock(&phc->in_srtp->in_lock);
@ -1361,50 +1362,50 @@ static int media_packet_address_check(struct packet_handler_ctx *phc)
struct endpoint endpoint;
int ret = 0;
mutex_lock(&phc->stream->in_lock);
mutex_lock(&phc->mp.stream->in_lock);
/* we're OK to (potentially) use the source address of this packet as destination
* in the other direction. */
/* if the other side hasn't been signalled yet, just forward the packet */
if (!PS_ISSET(phc->stream, FILLED)) {
__C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
if (!PS_ISSET(phc->mp.stream, FILLED)) {
__C_DBG("stream %s:%d not FILLED", sockaddr_print_buf(&phc->mp.stream->endpoint.address),
phc->mp.stream->endpoint.port);
goto out;
}
/* do not pay attention to source addresses of incoming packets for asymmetric streams */
if (MEDIA_ISSET(phc->media, ASYMMETRIC))
PS_SET(phc->stream, CONFIRMED);
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC))
PS_SET(phc->mp.stream, CONFIRMED);
/* confirm sink for unidirectional streams in order to kernelize */
if (MEDIA_ISSET(phc->media, UNIDIRECTIONAL))
if (MEDIA_ISSET(phc->mp.media, UNIDIRECTIONAL))
PS_SET(phc->sink, CONFIRMED);
/* if we have already updated the endpoint in the past ... */
if (PS_ISSET(phc->stream, CONFIRMED)) {
if (PS_ISSET(phc->mp.stream, CONFIRMED)) {
/* see if we need to compare the source address with the known endpoint */
if (PS_ISSET2(phc->stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint = phc->fsin;
mutex_lock(&phc->stream->out_lock);
if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint = phc->mp.fsin;
mutex_lock(&phc->mp.stream->out_lock);
int tmp = memcmp(&endpoint, &phc->stream->endpoint, sizeof(endpoint));
if (tmp && PS_ISSET(phc->stream, MEDIA_HANDOVER)) {
int tmp = memcmp(&endpoint, &phc->mp.stream->endpoint, sizeof(endpoint));
if (tmp && PS_ISSET(phc->mp.stream, MEDIA_HANDOVER)) {
/* out_lock remains locked */
ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(&phc->fsin));
ilog(LOG_INFO, "Peer address changed to %s", endpoint_print_buf(&phc->mp.fsin));
phc->unkernelize = 1;
phc->update = 1;
phc->stream->endpoint = phc->fsin;
phc->mp.stream->endpoint = phc->mp.fsin;
goto update_addr;
}
mutex_unlock(&phc->stream->out_lock);
mutex_unlock(&phc->mp.stream->out_lock);
if (tmp && PS_ISSET(phc->stream, STRICT_SOURCE)) {
if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) {
ilog(LOG_INFO, "Drop due to strict-source attribute; got %s:%d, expected %s:%d",
sockaddr_print_buf(&endpoint.address), endpoint.port,
sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
atomic64_inc(&phc->stream->stats.errors);
sockaddr_print_buf(&phc->mp.stream->endpoint.address),
phc->mp.stream->endpoint.port);
atomic64_inc(&phc->mp.stream->stats.errors);
ret = -1;
goto out;
}
@ -1415,72 +1416,72 @@ static int media_packet_address_check(struct packet_handler_ctx *phc)
/* wait at least 3 seconds after last signal before committing to a particular
* endpoint address */
if (!phc->call->last_signal || rtpe_now.tv_sec <= phc->call->last_signal + 3)
if (!phc->mp.call->last_signal || rtpe_now.tv_sec <= phc->mp.call->last_signal + 3)
goto update_peerinfo;
phc->kernelize = 1;
phc->update = 1;
ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(&phc->fsin));
ilog(LOG_INFO, "Confirmed peer address as %s", endpoint_print_buf(&phc->mp.fsin));
PS_SET(phc->stream, CONFIRMED);
PS_SET(phc->mp.stream, CONFIRMED);
update_peerinfo:
mutex_lock(&phc->stream->out_lock);
endpoint = phc->stream->endpoint;
phc->stream->endpoint = phc->fsin;
if (memcmp(&endpoint, &phc->stream->endpoint, sizeof(endpoint)))
mutex_lock(&phc->mp.stream->out_lock);
endpoint = phc->mp.stream->endpoint;
phc->mp.stream->endpoint = phc->mp.fsin;
if (memcmp(&endpoint, &phc->mp.stream->endpoint, sizeof(endpoint)))
phc->update = 1;
update_addr:
mutex_unlock(&phc->stream->out_lock);
mutex_unlock(&phc->mp.stream->out_lock);
/* check the destination address of the received packet against what we think our
* local interface to use is */
if (phc->stream->selected_sfd && phc->sfd != phc->stream->selected_sfd) {
ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&phc->sfd->socket.local));
phc->stream->selected_sfd = phc->sfd;
if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) {
ilog(LOG_INFO, "Switching local interface to %s", endpoint_print_buf(&phc->mp.sfd->socket.local));
phc->mp.stream->selected_sfd = phc->mp.sfd;
phc->update = 1;
}
out:
mutex_unlock(&phc->stream->in_lock);
mutex_unlock(&phc->mp.stream->in_lock);
return ret;
}
static void media_packet_kernel_check(struct packet_handler_ctx *phc) {
if (PS_ISSET(phc->stream, NO_KERNEL_SUPPORT)) {
__C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&phc->stream->endpoint.address), phc->stream->endpoint.port);
if (PS_ISSET(phc->mp.stream, NO_KERNEL_SUPPORT)) {
__C_DBG("stream %s:%d NO_KERNEL_SUPPORT", sockaddr_print_buf(&phc->mp.stream->endpoint.address), phc->mp.stream->endpoint.port);
return;
}
if (!PS_ISSET(phc->stream, CONFIRMED)) {
__C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
if (!PS_ISSET(phc->mp.stream, CONFIRMED)) {
__C_DBG("stream %s:%d not CONFIRMED", sockaddr_print_buf(&phc->mp.stream->endpoint.address),
phc->mp.stream->endpoint.port);
return;
}
if (!phc->sink) {
__C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
__C_DBG("sink is NULL for stream %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address),
phc->mp.stream->endpoint.port);
return;
}
if (!PS_ISSET(phc->sink, CONFIRMED)) {
__C_DBG("sink not CONFIRMED for stream %s:%d",
sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
sockaddr_print_buf(&phc->mp.stream->endpoint.address),
phc->mp.stream->endpoint.port);
return;
}
if (!PS_ISSET(phc->sink, FILLED)) {
__C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
__C_DBG("sink not FILLED for stream %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address),
phc->mp.stream->endpoint.port);
return;
}
kernelize(phc->stream);
kernelize(phc->mp.stream);
}
@ -1490,13 +1491,14 @@ static int do_rtcp(struct packet_handler_ctx *phc) {
// XXX rewrite/consume for transcoding
GQueue rtcp_list = G_QUEUE_INIT;
if (rtcp_parse(&rtcp_list, &phc->s, phc->sfd, &phc->fsin, &phc->tv))
if (rtcp_parse(&rtcp_list, &phc->mp))
goto out;
if (phc->rtcp_filter)
if (phc->rtcp_filter(&phc->s, &rtcp_list))
if (phc->rtcp_filter(&phc->mp, &rtcp_list))
goto out;
codec_add_raw_packet(&phc->mp, &phc->s);
// queue for output
codec_add_raw_packet(&phc->mp);
ret = 0;
out:
@ -1529,20 +1531,20 @@ static int stream_packet(struct packet_handler_ctx *phc) {
* always holds true */
int ret = 0, handler_ret = 0;
phc->call = phc->sfd->call;
phc->mp.call = phc->mp.sfd->call;
rwlock_lock_r(&phc->call->master_lock);
rwlock_lock_r(&phc->mp.call->master_lock);
phc->stream = phc->sfd->stream;
if (G_UNLIKELY(!phc->stream))
phc->mp.stream = phc->mp.sfd->stream;
if (G_UNLIKELY(!phc->mp.stream))
goto out;
__C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&phc->stream->endpoint.address),
phc->stream->endpoint.port);
__C_DBG("Handling packet on: %s:%d", sockaddr_print_buf(&phc->mp.stream->endpoint.address),
phc->mp.stream->endpoint.port);
phc->media = phc->stream->media;
phc->mp.media = phc->mp.stream->media;
if (!phc->stream->selected_sfd)
if (!phc->mp.stream->selected_sfd)
goto out;
@ -1556,7 +1558,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
#if RTP_LOOP_PROTECT
if (MEDIA_ISSET(phc->media, LOOP_CHECK)) {
if (MEDIA_ISSET(phc->mp.media, LOOP_CHECK)) {
if (media_loop_detect(phc))
goto out;
}
@ -1575,8 +1577,8 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (G_UNLIKELY(!phc->sink || !phc->sink->selected_sfd || !phc->out_srtp
|| !phc->out_srtp->selected_sfd || !phc->in_srtp->selected_sfd))
{
ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(&phc->fsin));
atomic64_inc(&phc->stream->stats.errors);
ilog(LOG_WARNING, "RTP packet from %s discarded", endpoint_print_buf(&phc->mp.fsin));
atomic64_inc(&phc->mp.stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors);
goto out;
}
@ -1585,19 +1587,21 @@ static int stream_packet(struct packet_handler_ctx *phc) {
handler_ret = media_packet_decrypt(phc);
// If recording pcap dumper is set, then we record the call.
if (phc->call->recording)
dump_packet(phc->call->recording, phc->stream, &phc->s);
if (phc->mp.call->recording)
dump_packet(phc->mp.call->recording, phc->mp.stream, &phc->s);
// ready to process
phc->mp.raw = phc->s;
// RTCP detection is further up, so we could make this determination there
if (phc->mp.rtcp) {
if (phc->rtcp) {
if (do_rtcp(phc))
goto drop;
}
else {
struct codec_handler *transcoder = codec_handler_get(phc->media, phc->payload_type);
struct codec_handler *transcoder = codec_handler_get(phc->mp.media, phc->payload_type);
// this transfers the packet from 's' to 'packets_out'
phc->mp.raw = phc->s;
if (transcoder->func(transcoder, phc->media, &phc->mp))
if (transcoder->func(transcoder, phc->mp.media, &phc->mp))
goto drop;
}
@ -1605,7 +1609,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
handler_ret = media_packet_encrypt(phc);
if (phc->update) // for RTCP packet index updates
unkernelize(phc->stream);
unkernelize(phc->mp.stream);
int address_check = media_packet_address_check(phc);
@ -1646,7 +1650,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (ret == -1) {
ret = -errno;
ilog(LOG_DEBUG,"Error when sending message. Error: %s",strerror(errno));
atomic64_inc(&phc->stream->stats.errors);
atomic64_inc(&phc->mp.stream->stats.errors);
atomic64_inc(&rtpe_statsps.errors);
goto out;
}
@ -1654,20 +1658,20 @@ static int stream_packet(struct packet_handler_ctx *phc) {
drop:
ret = 0;
// XXX separate stats for received/sent
atomic64_inc(&phc->stream->stats.packets);
atomic64_add(&phc->stream->stats.bytes, phc->s.len);
atomic64_set(&phc->stream->last_packet, rtpe_now.tv_sec);
atomic64_inc(&phc->mp.stream->stats.packets);
atomic64_add(&phc->mp.stream->stats.bytes, phc->s.len);
atomic64_set(&phc->mp.stream->last_packet, rtpe_now.tv_sec);
atomic64_inc(&rtpe_statsps.packets);
atomic64_add(&rtpe_statsps.bytes, phc->s.len);
out:
if (phc->unkernelize) {
stream_unconfirm(phc->stream);
stream_unconfirm(phc->stream->rtp_sink);
stream_unconfirm(phc->stream->rtcp_sink);
stream_unconfirm(phc->mp.stream);
stream_unconfirm(phc->mp.stream->rtp_sink);
stream_unconfirm(phc->mp.stream->rtcp_sink);
}
rwlock_unlock_r(&phc->call->master_lock);
rwlock_unlock_r(&phc->mp.call->master_lock);
g_queue_clear_full(&phc->mp.packets_out, codec_packet_free);
@ -1698,10 +1702,10 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
struct packet_handler_ctx phc;
ZERO(phc);
phc.sfd = sfd;
phc.mp.sfd = sfd;
ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE,
&phc.fsin, &phc.tv);
&phc.mp.fsin, &phc.mp.tv);
if (ret < 0) {
if (errno == EINTR)

@ -15,7 +15,9 @@
typedef int rtcp_filter_func(str *, GQueue *);
struct media_packet;
typedef int rtcp_filter_func(struct media_packet *, GQueue *);
@ -73,6 +75,14 @@ struct stream_fd {
};
struct media_packet {
str raw;
endpoint_t fsin; // source address of received packet
struct timeval tv; // timestamp when packet was received
struct stream_fd *sfd; // fd which received the packet
struct call *call; // sfd->call
struct packet_stream *stream; // sfd->stream
struct call_media *media; // stream->media
struct rtp_header *rtp;
struct rtcp_packet *rtcp;
struct ssrc_ctx *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp

@ -226,9 +226,7 @@ struct rtcp_chain_element {
// context to hold state variables
struct rtcp_process_ctx {
// input
struct call *call;
struct call_media *media;
const struct timeval *received;
struct media_packet *mp;
// handler vars
union {
@ -252,10 +250,10 @@ struct rtcp_process_ctx {
struct rtcp_handler {
void (*init)(struct rtcp_process_ctx *);
void (*start)(struct rtcp_process_ctx *, struct call *);
void (*common)(struct rtcp_process_ctx *, const struct rtcp_packet *);
void (*sr)(struct rtcp_process_ctx *, const struct sender_report_packet *);
void (*common)(struct rtcp_process_ctx *, struct rtcp_packet *);
void (*sr)(struct rtcp_process_ctx *, struct sender_report_packet *);
void (*rr_list_start)(struct rtcp_process_ctx *, const struct rtcp_packet *);
void (*rr)(struct rtcp_process_ctx *, const struct report_block *);
void (*rr)(struct rtcp_process_ctx *, struct report_block *);
void (*rr_list_end)(struct rtcp_process_ctx *);
//void (*xr)(struct rtcp_process_ctx *, const struct rtcp_packet *, str *);
void (*sdes_list_start)(struct rtcp_process_ctx *, const struct source_description_packet *);
@ -276,6 +274,7 @@ struct rtcp_handlers {
const struct rtcp_handler
*scratch,
*mos,
*transcode,
*logging,
*homer;
};
@ -283,25 +282,35 @@ struct rtcp_handlers {
// log handler function prototypes
// scratch area (prepare/parse packet)
static void scratch_common(struct rtcp_process_ctx *, const struct rtcp_packet *);
static void scratch_sr(struct rtcp_process_ctx *, const struct sender_report_packet *);
static void scratch_rr(struct rtcp_process_ctx *, const struct report_block *);
static void scratch_common(struct rtcp_process_ctx *, struct rtcp_packet *);
static void scratch_sr(struct rtcp_process_ctx *, struct sender_report_packet *);
static void scratch_rr(struct rtcp_process_ctx *, struct report_block *);
static void scratch_xr_rr_time(struct rtcp_process_ctx *, const struct xr_rb_rr_time *);
static void scratch_xr_dlrr(struct rtcp_process_ctx *, const struct xr_rb_dlrr *);
static void scratch_xr_voip_metrics(struct rtcp_process_ctx *, const struct xr_rb_voip_metrics *);
// MOS calculation / stats
static void mos_sr(struct rtcp_process_ctx *, const struct sender_report_packet *);
static void mos_rr(struct rtcp_process_ctx *, const struct report_block *);
static void mos_sr(struct rtcp_process_ctx *, struct sender_report_packet *);
static void mos_rr(struct rtcp_process_ctx *, struct report_block *);
static void mos_xr_rr_time(struct rtcp_process_ctx *, const struct xr_rb_rr_time *);
static void mos_xr_dlrr(struct rtcp_process_ctx *, const struct xr_rb_dlrr *);
static void mos_xr_voip_metrics(struct rtcp_process_ctx *, const struct xr_rb_voip_metrics *);
// RTCP translation for transcoding
static void transcode_common(struct rtcp_process_ctx *, struct rtcp_packet *);
static void transcode_rr(struct rtcp_process_ctx *, struct report_block *);
static void transcode_sr(struct rtcp_process_ctx *, struct sender_report_packet *);
// wrappers to enable dynamic transcoding
static void transcode_common_wrap(struct rtcp_process_ctx *, struct rtcp_packet *);
static void transcode_rr_wrap(struct rtcp_process_ctx *, struct report_block *);
static void transcode_sr_wrap(struct rtcp_process_ctx *, struct sender_report_packet *);
// homer functions
static void homer_init(struct rtcp_process_ctx *);
static void homer_sr(struct rtcp_process_ctx *, const struct sender_report_packet *);
static void homer_sr(struct rtcp_process_ctx *, struct sender_report_packet *);
static void homer_rr_list_start(struct rtcp_process_ctx *, const struct rtcp_packet *);
static void homer_rr(struct rtcp_process_ctx *, const struct report_block *);
static void homer_rr(struct rtcp_process_ctx *, struct report_block *);
static void homer_rr_list_end(struct rtcp_process_ctx *);
static void homer_sdes_list_start(struct rtcp_process_ctx *, const struct source_description_packet *);
static void homer_sdes_item(struct rtcp_process_ctx *, const struct sdes_chunk *, const struct sdes_item *,
@ -313,10 +322,10 @@ static void homer_finish(struct rtcp_process_ctx *, struct call *, const endpoin
// syslog functions
static void logging_init(struct rtcp_process_ctx *);
static void logging_start(struct rtcp_process_ctx *, struct call *);
static void logging_common(struct rtcp_process_ctx *, const struct rtcp_packet *);
static void logging_common(struct rtcp_process_ctx *, struct rtcp_packet *);
static void logging_sdes_list_start(struct rtcp_process_ctx *, const struct source_description_packet *);
static void logging_sr(struct rtcp_process_ctx *, const struct sender_report_packet *);
static void logging_rr(struct rtcp_process_ctx *, const struct report_block *);
static void logging_sr(struct rtcp_process_ctx *, struct sender_report_packet *);
static void logging_rr(struct rtcp_process_ctx *, struct report_block *);
static void logging_xr_rb(struct rtcp_process_ctx *, const struct xr_report_block *);
static void logging_xr_rr_time(struct rtcp_process_ctx *, const struct xr_rb_rr_time *);
static void logging_xr_dlrr(struct rtcp_process_ctx *, const struct xr_rb_dlrr *);
@ -343,6 +352,16 @@ static struct rtcp_handler mos_handlers = {
.xr_dlrr = mos_xr_dlrr,
.xr_voip_metrics = mos_xr_voip_metrics,
};
static struct rtcp_handler transcode_handlers = {
.common = transcode_common,
.rr = transcode_rr,
.sr = transcode_sr,
};
static struct rtcp_handler transcode_handlers_wrap = {
.common = transcode_common_wrap,
.rr = transcode_rr_wrap,
.sr = transcode_sr_wrap,
};
static struct rtcp_handler log_handlers = {
.init = logging_init,
.start = logging_start,
@ -374,6 +393,7 @@ static struct rtcp_handler homer_handlers = {
static struct rtcp_handlers rtcp_handlers = {
.scratch = &scratch_handlers,
.mos = &mos_handlers,
.transcode = &transcode_handlers_wrap,
// remainder is variable
};
@ -383,11 +403,13 @@ static struct rtcp_handlers rtcp_handlers = {
rtcp_handlers.type->func(log_ctx, ##__VA_ARGS__); \
} while (0)
// macro to call all function handlers in one go
// order is important
#define CAH(func, ...) do { \
CH(func, scratch, ##__VA_ARGS__); \
CH(func, mos, ##__VA_ARGS__); \
CH(func, logging, ##__VA_ARGS__); \
CH(func, homer, ##__VA_ARGS__); \
CH(func, scratch, ##__VA_ARGS__); /* first parse out the values into scratch area */ \
CH(func, mos, ##__VA_ARGS__); /* process for MOS calculation */ \
CH(func, logging, ##__VA_ARGS__); /* log packets to syslog */ \
CH(func, homer, ##__VA_ARGS__); /* send contents to homer */ \
CH(func, transcode, ##__VA_ARGS__); /* translate for transcoding */ \
} while (0)
@ -448,6 +470,11 @@ static const int min_xr_packet_sizes[] = {
struct rtcp_handler *rtcp_transcode_handler = &transcode_handlers;
static struct rtcp_header *rtcp_length_check(str *s, size_t min_len, unsigned int *len_p) {
@ -617,15 +644,12 @@ void rtcp_list_free(GQueue *q) {
int rtcp_parse(GQueue *q, const str *_s, struct stream_fd *sfd, const endpoint_t *src,
const struct timeval *tv)
{
int rtcp_parse(GQueue *q, struct media_packet *mp) {
struct rtcp_header *hdr;
struct rtcp_chain_element *el;
rtcp_handler_func func;
str s = *_s;
struct call *c = sfd->call;
struct call_media *m = sfd->stream->media;
str s = mp->raw;
struct call *c = mp->call;
struct rtcp_process_ctx log_ctx_s,
*log_ctx;
unsigned int len;
@ -633,9 +657,7 @@ int rtcp_parse(GQueue *q, const str *_s, struct stream_fd *sfd, const endpoint_t
int min_packet_size;
ZERO(log_ctx_s);
log_ctx_s.call = c;
log_ctx_s.media = m;
log_ctx_s.received = tv;
log_ctx_s.mp = mp;
log_ctx = &log_ctx_s;
@ -687,25 +709,25 @@ next:
abort();
}
CAH(finish, c, src, &sfd->socket.local, tv);
CAH(finish, c, &mp->fsin, &mp->sfd->socket.local, &mp->tv);
CAH(destroy);
return 0;
error:
CAH(finish, c, src, &sfd->socket.local, tv);
CAH(finish, c, &mp->fsin, &mp->sfd->socket.local, &mp->tv);
CAH(destroy);
rtcp_list_free(q);
return -1;
}
int rtcp_avpf2avp_filter(str *s, GQueue *rtcp_list) {
int rtcp_avpf2avp_filter(struct media_packet *mp, GQueue *rtcp_list) {
GList *l;
struct rtcp_chain_element *el;
void *start;
unsigned int removed, left;
left = s->len;
left = mp->raw.len;
removed = 0;
for (l = rtcp_list->head; l; l = l->next) {
el = l->data;
@ -724,8 +746,8 @@ int rtcp_avpf2avp_filter(str *s, GQueue *rtcp_list) {
}
}
s->len -= removed;
if (!s->len)
mp->raw.len -= removed;
if (!mp->raw.len)
return -1;
return 0;
@ -884,10 +906,10 @@ static void str_sanitize(GString *s) {
static void scratch_common(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common) {
static void scratch_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) {
ctx->scratch_common_ssrc = htonl(common->ssrc);
}
static void scratch_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) {
static void scratch_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) {
ctx->scratch.rr = (struct ssrc_receiver_report) {
.from = ctx->scratch_common_ssrc,
.ssrc = htonl(rr->ssrc),
@ -899,7 +921,7 @@ static void scratch_rr(struct rtcp_process_ctx *ctx, const struct report_block *
};
ctx->scratch.rr.packets_lost = (rr->number_lost[0] << 16) | (rr->number_lost[1] << 8) | rr->number_lost[2];
}
static void scratch_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) {
static void scratch_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) {
ctx->scratch.sr = (struct ssrc_sender_report) {
.ssrc = ctx->scratch_common_ssrc,
.ntp_msw = ntohl(sr->ntp_msw),
@ -958,7 +980,7 @@ static void homer_init(struct rtcp_process_ctx *ctx) {
ctx->json = g_string_new("{ ");
ctx->json_init_len = ctx->json->len;
}
static void homer_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) {
static void homer_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) {
g_string_append_printf(ctx->json, "\"sender_information\":{\"ntp_timestamp_sec\":%u,"
"\"ntp_timestamp_usec\":%u,\"octets\":%u,\"rtp_timestamp\":%u, \"packets\":%u},",
ctx->scratch.sr.ntp_msw,
@ -973,7 +995,7 @@ static void homer_rr_list_start(struct rtcp_process_ctx *ctx, const struct rtcp_
common->header.pt,
common->header.count);
}
static void homer_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) {
static void homer_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) {
g_string_append_printf(ctx->json, "{\"source_ssrc\":%u,"
"\"highest_seq_no\":%u,\"fraction_lost\":%u,\"ia_jitter\":%u,"
"\"packets_lost\":%u,\"lsr\":%u,\"dlsr\":%u},",
@ -1056,7 +1078,7 @@ static void logging_start(struct rtcp_process_ctx *ctx, struct call *c) {
g_string_append_printf(ctx->log, "["STR_FORMAT"] ", STR_FMT(&c->callid));
ctx->log_init_len = ctx->log->len;
}
static void logging_common(struct rtcp_process_ctx *ctx, const struct rtcp_packet *common) {
static void logging_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) {
g_string_append_printf(ctx->log,"version=%u, padding=%u, count=%u, payloadtype=%u, length=%u, ssrc=%u, ",
common->header.version,
common->header.p,
@ -1073,7 +1095,7 @@ static void logging_sdes_list_start(struct rtcp_process_ctx *ctx, const struct s
sdes->header.pt,
ntohs(sdes->header.length));
}
static void logging_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) {
static void logging_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) {
g_string_append_printf(ctx->log,"ntp_sec=%u, ntp_fractions=%u, rtp_ts=%u, sender_packets=%u, " \
"sender_bytes=%u, ",
ctx->scratch.sr.ntp_msw,
@ -1082,7 +1104,7 @@ static void logging_sr(struct rtcp_process_ctx *ctx, const struct sender_report_
ctx->scratch.sr.packet_count,
ctx->scratch.sr.octet_count);
}
static void logging_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) {
static void logging_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) {
g_string_append_printf(ctx->log,"ssrc=%u, fraction_lost=%u, packet_loss=%u, last_seq=%u, jitter=%u, last_sr=%u, delay_since_last_sr=%u, ",
ctx->scratch.rr.ssrc,
rr->fraction_lost,
@ -1179,20 +1201,80 @@ static void logging_destroy(struct rtcp_process_ctx *ctx) {
static void mos_sr(struct rtcp_process_ctx *ctx, const struct sender_report_packet *sr) {
ssrc_sender_report(ctx->media, &ctx->scratch.sr, ctx->received);
static void mos_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) {
ssrc_sender_report(ctx->mp->media, &ctx->scratch.sr, &ctx->mp->tv);
}
static void mos_rr(struct rtcp_process_ctx *ctx, const struct report_block *rr) {
ssrc_receiver_report(ctx->media, &ctx->scratch.rr, ctx->received);
static void mos_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) {
ssrc_receiver_report(ctx->mp->media, &ctx->scratch.rr, &ctx->mp->tv);
}
static void mos_xr_rr_time(struct rtcp_process_ctx *ctx, const struct xr_rb_rr_time *rr) {
ssrc_receiver_rr_time(ctx->media, &ctx->scratch.xr_rr, ctx->received);
ssrc_receiver_rr_time(ctx->mp->media, &ctx->scratch.xr_rr, &ctx->mp->tv);
}
static void mos_xr_dlrr(struct rtcp_process_ctx *ctx, const struct xr_rb_dlrr *dlrr) {
ssrc_receiver_dlrr(ctx->media, &ctx->scratch.xr_dlrr, ctx->received);
ssrc_receiver_dlrr(ctx->mp->media, &ctx->scratch.xr_dlrr, &ctx->mp->tv);
}
static void mos_xr_voip_metrics(struct rtcp_process_ctx *ctx, const struct xr_rb_voip_metrics *rb_voip_mtc) {
ssrc_voip_metrics(ctx->media, &ctx->scratch.xr_vm, ctx->received);
ssrc_voip_metrics(ctx->mp->media, &ctx->scratch.xr_vm, &ctx->mp->tv);
}
static void transcode_common(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) {
assert(ctx->scratch_common_ssrc == ctx->mp->ssrc_in->parent->h.ssrc);
common->ssrc = htonl(ctx->mp->ssrc_in->ssrc_map_out);
}
static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) {
assert(ctx->scratch.rr.from == ctx->mp->ssrc_in->parent->h.ssrc);
// reverse SSRC mapping
struct ssrc_ctx *map_ctx = get_ssrc_ctx(ctx->scratch.rr.ssrc, ctx->mp->call->ssrc_hash,
SSRC_DIR_OUTPUT);
rr->ssrc = htonl(map_ctx->ssrc_map_out);
// ilog(LOG_DEBUG, "transcode_rr: from ssrc %x about %x "
// "ssrc_in %x ssrc_in-map %x ssrc_out %x ssrc_out-map %x "
// "map_ctx %x map_ctx-map %x",
// ctx->scratch.rr.from,
// ctx->scratch.rr.ssrc,
// ctx->mp->ssrc_in->parent->h.ssrc,
// ctx->mp->ssrc_in->ssrc_map_out,
// ctx->mp->ssrc_out->parent->h.ssrc,
// ctx->mp->ssrc_out->ssrc_map_out,
// map_ctx->parent->h.ssrc,
// map_ctx->ssrc_map_out);
// translate ctx->scratch.rr.from to ctx->mp->ssrc_in->ssrc_map_out - done by transcode_common
}
static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) {
assert(ctx->scratch.sr.ssrc == ctx->mp->ssrc_in->parent->h.ssrc);
// ilog(LOG_DEBUG, "transcode_sr: ssrc %x ssrc_in %x ssrc_in-map %x ssrc_out %x ssrc_out-map %x",
// ctx->scratch.sr.ssrc,
// ctx->mp->ssrc_in->parent->h.ssrc,
// ctx->mp->ssrc_in->ssrc_map_out,
// ctx->mp->ssrc_out->parent->h.ssrc,
// ctx->mp->ssrc_out->ssrc_map_out);
//
// translate ctx->scratch.sr.ssrc to ctx->mp->ssrc_in->ssrc_map_out - done by transcode_common
}
static void transcode_common_wrap(struct rtcp_process_ctx *ctx, struct rtcp_packet *common) {
if (!ctx->mp->media->rtcp_handler)
return;
ctx->mp->media->rtcp_handler->common(ctx, common);
}
static void transcode_rr_wrap(struct rtcp_process_ctx *ctx, struct report_block *rr) {
if (!ctx->mp->media->rtcp_handler)
return;
ctx->mp->media->rtcp_handler->rr(ctx, rr);
}
static void transcode_sr_wrap(struct rtcp_process_ctx *ctx, struct sender_report_packet *sr) {
if (!ctx->mp->media->rtcp_handler)
return;
ctx->mp->media->rtcp_handler->sr(ctx, sr);
}

@ -10,15 +10,25 @@
struct crypto_context;
struct rtcp_packet;
struct ssrc_ctx;
struct rtcp_handler;
struct rtcp_parse_ctx {
struct call *call;
struct call_media *media;
const struct timeval *received;
};
extern struct rtcp_handler *rtcp_transcode_handler;
int rtcp_avp2savp(str *, struct crypto_context *, struct ssrc_ctx *);
int rtcp_savp2avp(str *, struct crypto_context *, struct ssrc_ctx *);
int rtcp_payload(struct rtcp_packet **out, str *p, const str *s);
int rtcp_parse(GQueue *q, const str *, struct stream_fd *sfd, const endpoint_t *, const struct timeval *);
int rtcp_parse(GQueue *q, struct media_packet *);
void rtcp_list_free(GQueue *q);
rtcp_filter_func rtcp_avpf2avp_filter;

@ -102,7 +102,7 @@ restart:
while (G_UNLIKELY(ht->q.length > 20)) { // arbitrary limit
struct ssrc_entry *old_ent = g_queue_pop_head(&ht->q);
ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %u) - deleting SSRC %u",
ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %x) - deleting SSRC %x",
ssrc, old_ent->ssrc);
g_hash_table_remove(ht->ht, &old_ent->ssrc);
g_atomic_pointer_set(&ht->cache, NULL);
@ -236,7 +236,7 @@ found:;
mutex_unlock(&e->h.lock);
rtt -= (long long) delay * 1000000LL / 65536LL;
ilog(LOG_DEBUG, "Calculated round-trip time for %u is %lli us", ssrc, rtt);
ilog(LOG_DEBUG, "Calculated round-trip time for %x is %lli us", ssrc, rtt);
if (rtt <= 0 || rtt > 10000000) {
ilog(LOG_DEBUG, "Invalid RTT - discarding");
@ -260,7 +260,7 @@ void ssrc_sender_report(struct call_media *m, const struct ssrc_sender_report *s
seri->report = *sr;
ilog(LOG_DEBUG, "SR from %u: RTP TS %u PC %u OC %u NTP TS %u/%u=%f",
ilog(LOG_DEBUG, "SR from %x: RTP TS %u PC %u OC %u NTP TS %u/%u=%f",
sr->ssrc, sr->timestamp, sr->packet_count, sr->octet_count,
sr->ntp_msw, sr->ntp_lsw, seri->time_item.ntp_ts);
@ -271,7 +271,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor
{
struct call *c = m->call;
ilog(LOG_DEBUG, "RR from %u about %u: FL %u TL %u HSR %u J %u LSR %u DLSR %u",
ilog(LOG_DEBUG, "RR from %x about %x: FL %u TL %u HSR %u J %u LSR %u DLSR %u",
rr->from, rr->ssrc, rr->fraction_lost, rr->packets_lost,
rr->high_seq_received, rr->jitter, rr->lsr, rr->dlsr);
@ -299,7 +299,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor
goto out_nl;
}
unsigned int jitter = rpt->clock_rate ? (rr->jitter * 1000 / rpt->clock_rate) : rr->jitter;
ilog(LOG_DEBUG, "Calculated jitter for %u is %u ms", rr->ssrc, jitter);
ilog(LOG_DEBUG, "Calculated jitter for %x is %u ms", rr->ssrc, jitter);
ilog(LOG_DEBUG, "Adding opposide side RTT of %u us", other_e->last_rtt);
@ -312,7 +312,7 @@ void ssrc_receiver_report(struct call_media *m, const struct ssrc_receiver_repor
};
mos_calc(ssb);
ilog(LOG_DEBUG, "Calculated MOS from RR for %u is %.1f", rr->from, (double) ssb->mos / 10.0);
ilog(LOG_DEBUG, "Calculated MOS from RR for %x is %.1f", rr->from, (double) ssb->mos / 10.0);
// got a new stats block, add it to reporting ssrc
mutex_lock(&other_e->h.lock);
@ -358,7 +358,7 @@ void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *r
if (!srti)
return;
ilog(LOG_DEBUG, "XR RR TIME from %u: NTP TS %u/%u=%f",
ilog(LOG_DEBUG, "XR RR TIME from %x: NTP TS %u/%u=%f",
rr->ssrc,
rr->ntp_msw, rr->ntp_lsw, srti->time_item.ntp_ts);
@ -368,7 +368,7 @@ void ssrc_receiver_rr_time(struct call_media *m, const struct ssrc_xr_rr_time *r
void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr,
const struct timeval *tv)
{
ilog(LOG_DEBUG, "XR DLRR from %u about %u: LRR %u DLRR %u",
ilog(LOG_DEBUG, "XR DLRR from %x about %x: LRR %u DLRR %u",
dlrr->from, dlrr->ssrc,
dlrr->lrr, dlrr->dlrr);
@ -379,7 +379,7 @@ void ssrc_receiver_dlrr(struct call_media *m, const struct ssrc_xr_dlrr *dlrr,
void ssrc_voip_metrics(struct call_media *m, const struct ssrc_xr_voip_metrics *vm,
const struct timeval *tv)
{
ilog(LOG_DEBUG, "XR VM from %u about %u: LR %u DR %u BD %u GD %u BDu %u GDu %u RTD %u "
ilog(LOG_DEBUG, "XR VM from %x about %x: LR %u DR %u BD %u GD %u BDu %u GDu %u RTD %u "
"ESD %u SL %u NL %u RERL %u GMin %u R %u eR %u MOSL %u MOSC %u RX %u "
"JBn %u JBm %u JBam %u",
vm->from, vm->ssrc,

@ -66,7 +66,7 @@ sub input {
my ($vpxcc, $pt, $seq, $ts, $ssrc, $payload) = unpack("CCnNN a*", $packet);
$vpxcc == 0x80 or die;
$pt == 0 or die;
#$pt == 0 or die;
my $remote = ($self->{other_ssrcs}->{$ssrc} //= {
ssrc => $ssrc,

@ -372,10 +372,11 @@ sub _input {
$$input eq $exp or die;
}
$self->{media_packets_received}->[$component]++;
$$input = '';
$self->{client_components}->[$component] and
$self->{client_components}->[$component]->input($exp);
$self->{client_components}->[$component]->input($$input);
$$input = '';
}
sub _timer {

Loading…
Cancel
Save