From 2240921ab3b9e8c60bcf24a4ab3185690a385e53 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 23 Feb 2022 14:09:11 -0500 Subject: [PATCH] TT#136957 use separate sequencer per sink/destination Change-Id: Ib960108003d5aa13ba1732d7a5e8c6720feca5c0 --- daemon/codec.c | 32 ++++-- daemon/dtmf.c | 7 +- daemon/ssrc.c | 3 +- include/ssrc.h | 2 +- t/auto-daemon-tests-pubsub.pl | 177 +++++++++++++++++++++++++++++++++- 5 files changed, 208 insertions(+), 13 deletions(-) diff --git a/daemon/codec.c b/daemon/codec.c index 7fe795830..75f69477e 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -398,7 +398,9 @@ static void __make_passthrough_ssrc(struct codec_handler *handler) { static void __reset_sequencer(void *p, void *dummy) { struct ssrc_entry_call *s = p; - s->sequencer.seq = -1; + if (s->sequencers) + g_hash_table_destroy(s->sequencers); + s->sequencers = NULL; } static void __make_transcoder(struct codec_handler *handler, struct rtp_payload_type *dest, GHashTable *output_transcoders, int dtmf_payload_type, bool pcm_dtmf_detect, @@ -1535,6 +1537,12 @@ static void __ssrc_unlock_both(struct media_packet *mp) { mutex_unlock(&ssrc_out_p->h.lock); } +static void __seq_free(void *p) { + packet_sequencer_t *seq = p; + packet_sequencer_destroy(seq); + g_slice_free1(sizeof(*seq), seq); +} + static int __handler_func_sequencer(struct media_packet *mp, struct transcode_packet *packet) { struct codec_handler *h = packet->handler; @@ -1588,10 +1596,18 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa __ssrc_lock_both(mp); - packet_sequencer_init(&ssrc_in_p->sequencer, (GDestroyNotify) __transcode_packet_free); + // get sequencer appropriate for our output + if (!ssrc_in_p->sequencers) + ssrc_in_p->sequencers = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __seq_free); + packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in_p->sequencers, mp->media_out); + if (!seq) { + seq = g_slice_alloc0(sizeof(*seq)); + packet_sequencer_init(seq, (GDestroyNotify) __transcode_packet_free); + g_hash_table_insert(ssrc_in_p->sequencers, mp->media_out, seq); + } - uint16_t seq_ori = ssrc_in_p->sequencer.seq; - int seq_ret = packet_sequencer_insert(&ssrc_in_p->sequencer, &packet->p); + uint16_t seq_ori = seq->seq; + int seq_ret = packet_sequencer_insert(seq, &packet->p); if (seq_ret < 0) { // dupe int func_ret = 0; @@ -1610,7 +1626,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa while (1) { int func_ret = 0; - packet = packet_sequencer_next_packet(&ssrc_in_p->sequencer); + packet = packet_sequencer_next_packet(seq); if (G_UNLIKELY(!packet)) { if (!ch || !h->dest_pt.clock_rate || !ch->handler || !h->dest_pt.codec_def) @@ -1627,7 +1643,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa unsigned long long ts_diff_us = (unsigned long long) ts_diff * 1000000 / h->dest_pt.clock_rate; if (ts_diff_us >= 60000) { // arbitrary value - packet = packet_sequencer_force_next_packet(&ssrc_in_p->sequencer); + packet = packet_sequencer_force_next_packet(seq); if (!packet) break; ilogs(transcoding, LOG_DEBUG, "Timestamp difference too large (%llu ms) after lost packet, " @@ -1672,8 +1688,8 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa goto next; } - ssrc_in_p->packets_lost = ssrc_in_p->sequencer.lost_count; - atomic64_set(&ssrc_in->last_seq, ssrc_in_p->sequencer.ext_seq); + ssrc_in_p->packets_lost = seq->lost_count; + atomic64_set(&ssrc_in->last_seq, seq->ext_seq); ilogs(transcoding, LOG_DEBUG, "Processing RTP packet: seq %u, TS %lu", packet->p.seq, packet->ts); diff --git a/daemon/dtmf.c b/daemon/dtmf.c index 4f881fbad..a2af90706 100644 --- a/daemon/dtmf.c +++ b/daemon/dtmf.c @@ -490,6 +490,9 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media * struct sink_handler *sh = l->data; struct packet_stream *sink_ps = sh->sink; struct call_monologue *sink_ml = sink_ps->media->monologue; + packet_sequencer_t *seq = g_hash_table_lookup(ssrc_in->parent->sequencers, sink_ps->media); + if (!seq) + continue; struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out, sink_ml->ssrc_hash, SSRC_DIR_OUTPUT, @@ -511,7 +514,7 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media * struct rtp_header rtp = { .m_pt = 0xff, .timestamp = 0, - .seq_num = htons(ssrc_in->parent->sequencer.seq), + .seq_num = htons(seq->seq), .ssrc = htonl(ssrc_in->parent->h.ssrc), }; struct media_packet packet = { @@ -535,7 +538,7 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media * // insert pause tep.event = 0xff; tep.duration = htons(pause_samples); - rtp.seq_num = htons(ssrc_in->parent->sequencer.seq); + rtp.seq_num = htons(seq->seq); ch->dtmf_injector->handler_func(ch->dtmf_injector, &packet); diff --git a/daemon/ssrc.c b/daemon/ssrc.c index b00bd7e71..501ddf38b 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -53,7 +53,8 @@ static void __free_ssrc_entry_call(void *ep) { g_queue_clear_full(&e->sender_reports, (GDestroyNotify) free_sender_report); g_queue_clear_full(&e->rr_time_reports, (GDestroyNotify) free_rr_time); g_queue_clear_full(&e->stats_blocks, (GDestroyNotify) free_stats_block); - packet_sequencer_destroy(&e->sequencer); + if (e->sequencers) + g_hash_table_destroy(e->sequencers); } static void ssrc_entry_put(void *ep) { struct ssrc_entry_call *e = ep; diff --git a/include/ssrc.h b/include/ssrc.h index e6f02298d..96314baa7 100644 --- a/include/ssrc.h +++ b/include/ssrc.h @@ -118,7 +118,7 @@ struct ssrc_entry_call { // for transcoding // input only - packet_sequencer_t sequencer; + GHashTable *sequencers; uint32_t jitter, transit; // output only uint16_t seq_diff; diff --git a/t/auto-daemon-tests-pubsub.pl b/t/auto-daemon-tests-pubsub.pl index 06183cb57..cd25c23da 100755 --- a/t/auto-daemon-tests-pubsub.pl +++ b/t/auto-daemon-tests-pubsub.pl @@ -16,7 +16,7 @@ autotest_start(qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1 my ($sock_a, $sock_b, $sock_c, $sock_d, $port_a, $port_b, $port_c, $ssrc_a, $ssrc_b, $resp, - $sock_ax, $sock_bx, $port_ax, $port_bx, $port_d, + $sock_ax, $sock_bx, $port_ax, $port_bx, $port_d, $sock_e, $port_e, $srtp_ctx_a, $srtp_ctx_b, $srtp_ctx_a_rev, $srtp_ctx_b_rev, $ufrag_a, $ufrag_b, @ret1, @ret2, @ret3, @ret4, $srtp_key_a, $srtp_key_b, $ts, $seq, $tag_medias, $media_labels, $ftr, $ttr, $fts, $ttr2); @@ -1622,6 +1622,181 @@ rcv($sock_c, $port_c, rtpm(0, 4001, 7160, $ssrc_b, "\x00" x 160)); +($sock_a, $sock_b, $sock_c, $sock_d, $sock_e) = + new_call([qw(198.51.100.14 6132)], + [qw(198.51.100.14 6134)], + [qw(198.51.100.14 6136)], + [qw(198.51.100.14 6138)], + [qw(198.51.100.14 6140)]); + +($port_a) = offer('multi subs w diff codecs', + { }, < ft(), codec => {transcode => ['PCMA', 'G722', 'G723'] } }, < $ttr }, < ft(), codec => {transcode => ['PCMA', 'G722', 'G723'] } }, < $ttr, flags => ['allow transcoding'] }, < ft(), codec => {transcode => ['PCMA', 'G722', 'G723'] } }, < $ttr, flags => ['allow transcoding'] }, <