From fd275acc5c98be80404770d6beceb8e4c1955153 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 25 Jan 2021 15:29:49 -0500 Subject: [PATCH] TT#98901 use DTX buffer for related DTMF events Change-Id: Ia9b83bf7a0989ec2e20ac0d8ea9a1024a8f5d417 --- daemon/codec.c | 183 +++++++++++++++++++++++++++++++++---------------- daemon/ssrc.c | 3 + 2 files changed, 126 insertions(+), 60 deletions(-) diff --git a/daemon/codec.c b/daemon/codec.c index cc4e94d30..a18535934 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -75,6 +75,7 @@ struct dtx_buffer { int tspp; // timestamp increment per packet struct call *call; unsigned long ts; + unsigned int ts_seq; // for subsequent packets with same TS, e.g. DTMF time_t start; }; struct dtx_entry { @@ -83,6 +84,8 @@ struct dtx_entry { struct media_packet mp; unsigned long ts; void *ssrc_ptr; // opaque pointer, doesn't hold a reference + struct codec_ssrc_handler *decoder_handler; // holds reference + int (*func)(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp); }; struct silence_event { @@ -177,6 +180,11 @@ static int packet_decoded_direct(decoder_t *decoder, AVFrame *frame, void *u1, v static void codec_touched(struct rtp_payload_type *pt, struct call_media *media); +static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *ch, + struct transcode_packet *packet, struct media_packet *mp, + int (*func)(struct codec_ssrc_handler *ch, struct transcode_packet *packet, + struct media_packet *mp)); + static struct codec_handler codec_handler_stub_ssrc = { .source_pt.payload_type = -1, @@ -1923,8 +1931,8 @@ static struct codec_ssrc_handler *__output_ssrc_handler(struct codec_ssrc_handle return new_ch; } -static void packet_dtmf_fwd(struct codec_ssrc_handler *ch, struct transcode_packet *packet, - struct media_packet *mp, int seq_inc) +static int packet_dtmf_fwd(struct codec_ssrc_handler *ch, struct transcode_packet *packet, + struct media_packet *mp) { int payload_type = -1; // take from handler's output config @@ -1986,10 +1994,46 @@ skip:; packet->marker, packet->p.seq, -1, payload_type); else // use our own sequencing __output_rtp(mp, ch, packet->handler ? : ch->handler, buf, packet->payload->len, packet->ts, - packet->marker, -1, seq_inc, payload_type); + packet->marker, -1, 0, payload_type); + + return 0; +} + +// returns the codec handler for the primary payload type - mostly determined by guessing +static struct codec_handler *__decoder_handler(struct codec_handler *h, struct media_packet *mp) { + if (!mp->ssrc_in) + return h; + + for (int i = 0; i < mp->ssrc_in->tracker.most_len; i++) { + int prim_pt = mp->ssrc_in->tracker.most[i]; + if (prim_pt == 255) + continue; + + struct codec_handler *sequencer_h = codec_handler_get(mp->media, prim_pt); + if (sequencer_h == h) + continue; + if (sequencer_h->source_pt.codec_def && sequencer_h->source_pt.codec_def->supplemental) + continue; + ilogs(transcoding, LOG_DEBUG, "Primary RTP payload type for handling %s is %i", + h->source_pt.codec_def->rtpname, + prim_pt); + return sequencer_h; + } + return h; } + static int packet_dtmf(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp) { + struct codec_ssrc_handler *decoder_ch = NULL; + + if (mp->ssrc_in) { + // find our decoder handler for the appropriate DTX buffer + struct codec_handler *handler = ch->handler; + struct codec_handler *decoder_handler = __decoder_handler(handler, mp); + decoder_ch = get_ssrc(mp->ssrc_in->parent->h.ssrc, + decoder_handler->ssrc_hash); + } + if (ch->ts_in != packet->ts) { // ignore already processed events int ret = dtmf_event(mp, packet->payload, ch->encoder_format.clockrate); if (G_UNLIKELY(ret == -1)) // error @@ -2000,15 +2044,25 @@ static int packet_dtmf(struct codec_ssrc_handler *ch, struct transcode_packet *p } } - if (!mp->call->block_dtmf && !mp->media->monologue->block_dtmf) - packet_dtmf_fwd(ch, packet, mp, 0); - return 0; + int ret = 0; + + if (!mp->call->block_dtmf && !mp->media->monologue->block_dtmf) { + if (decoder_ch && __buffer_dtx(decoder_ch->dtx_buffer, ch, packet, mp, packet_dtmf_fwd)) + ret = 1; // consumed + else + packet_dtmf_fwd(ch, packet, mp); + } + + if (decoder_ch) + obj_put(&decoder_ch->h); + + return ret; } static int packet_dtmf_dup(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp) { if (!mp->call->block_dtmf && !mp->media->monologue->block_dtmf) - packet_dtmf_fwd(ch, packet, mp, 0); + packet_dtmf_fwd(ch, packet, mp); return 0; } @@ -2031,24 +2085,7 @@ static int __handler_func_supplemental(struct codec_handler *h, struct media_pac // determine the primary audio codec used by this SSRC, as the sequence numbers // and timing info is shared with it. we'll need to use the same sequencer - struct codec_handler *sequencer_h = h; // handler that contains the appropriate sequencer - if (mp->ssrc_in) { - for (int i = 0; i < mp->ssrc_in->tracker.most_len; i++) { - int prim_pt = mp->ssrc_in->tracker.most[i]; - if (prim_pt == 255) - continue; - - sequencer_h = codec_handler_get(mp->media, prim_pt); - if (sequencer_h == h) - continue; - if (sequencer_h->source_pt.codec_def && sequencer_h->source_pt.codec_def->supplemental) - continue; - ilogs(transcoding, LOG_DEBUG, "Primary RTP payload type for handling %s is %i", - h->source_pt.codec_def->rtpname, - prim_pt); - break; - } - } + struct codec_handler *sequencer_h = __decoder_handler(h, mp); // XXX ? h->output_handler = sequencer_h->output_handler; // XXX locking? @@ -2295,12 +2332,55 @@ static void __dtx_add_callback(struct dtx_buffer *dtxb, const struct timeval *ba dtxe->ssrc_ptr = ssrc_ptr; timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry); } +// consumes `packet` if buffered (returns 1) +static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *decoder_handler, + struct transcode_packet *packet, struct media_packet *mp, + int (*func)(struct codec_ssrc_handler *ch, struct transcode_packet *packet, + struct media_packet *mp)) +{ + if (!dtxb || !mp->sfd || !mp->ssrc_in || !mp->ssrc_out) + return 0; + + ilogs(transcoding, LOG_DEBUG, "Adding packet to DTX buffer"); + + unsigned long ts = packet->ts; + + mutex_lock(&dtxb->lock); + if (ts != dtxb->ts) { + dtxb->ts = ts; + dtxb->ts_seq = 0; + } + else + dtxb->ts_seq++; + unsigned int ts_seq = dtxb->ts_seq; + dtxb->start = rtpe_now.tv_sec; + mutex_unlock(&dtxb->lock); + + struct dtx_entry *dtxe = g_slice_alloc0(sizeof(*dtxe)); + dtxe->ttq_entry.when = rtpe_now; + timeval_add_usec(&dtxe->ttq_entry.when, rtpe_config.dtx_delay * 1000); + dtxe->packet = packet; + dtxe->func = func; + if (decoder_handler) + dtxe->decoder_handler = obj_get(&decoder_handler->h); + media_packet_copy(&dtxe->mp, mp); + timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry); + // packet now consumed + packet = NULL; + + __dtx_add_callback(dtxb, &rtpe_now, (rtpe_config.dtx_delay + dtxb->ptime) * 1000, mp, ts + ts_seq, 1, + mp->stream->ssrc_in); + + return 1; +} static void __dtx_entry_free(void *p) { struct dtx_entry *dtxe = p; if (dtxe->packet) __transcode_packet_free(dtxe->packet); media_packet_release(&dtxe->mp); + if (dtxe->decoder_handler) + obj_put(&dtxe->decoder_handler->h); g_slice_free1(sizeof(*dtxe), dtxe); } static void __dtx_send_later(struct timerthread_queue *ttq, void *p) { @@ -2312,11 +2392,13 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) { int ret = 0; mutex_lock(&dtxb->lock); - struct codec_ssrc_handler *ch = dtxb->csh ? obj_get(&dtxb->csh->h) : NULL; + struct codec_ssrc_handler *ch = dtxe->decoder_handler ? obj_get(&dtxe->decoder_handler->h) : NULL; + if (!ch && dtxb->csh) + ch = obj_get(&dtxb->csh->h); struct call *call = dtxb->call ? obj_get(dtxb->call) : NULL; mutex_unlock(&dtxb->lock); - if (!call) + if (!call || !ch) goto out; // shut down log_info_stream_fd(mp->sfd); @@ -2327,9 +2409,7 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) { if (packet) { ilogs(transcoding, LOG_DEBUG, "Decoding DTX-buffered RTP packet (TS %lu) now", packet->ts); - ret = decoder_input_data(ch->decoder, packet->payload, packet->ts, - ch->handler->packet_decoded, ch, &dtxe->mp); - mp->ssrc_out->parent->seq_diff--; + ret = dtxe->func(ch, packet, &dtxe->mp); if (ret) ilogs(transcoding, LOG_WARN | LOG_FLAG_LIMIT, "Decoder error while processing buffered RTP packet"); } @@ -2338,7 +2418,7 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) { mutex_lock(&dtxb->lock); unsigned int diff = rtpe_now.tv_sec - dtxb->start; - unsigned long dtxb_ts = dtxb->ts; + unsigned long dtxb_ts = dtxb->ts + dtxb->ts_seq; void *ssrc_ptr = dtxe->mp.stream->ssrc_in; if (dtxe_ts == dtxb_ts @@ -2349,8 +2429,10 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) { dtxe_ts); dtxb_ts += dtxb->tspp; + dtxb_ts -= dtxb->ts_seq; dtxe_ts = dtxb_ts; dtxb->ts = dtxb_ts; + dtxb->ts_seq = 0; mutex_unlock(&dtxb->lock); ret = decoder_lost_packet(ch->decoder, dtxe_ts, @@ -2837,6 +2919,13 @@ static int packet_decoded_direct(decoder_t *decoder, AVFrame *frame, void *u1, v return packet_decoded_common(decoder, frame, u1, u2, encoder_input_data); } +static int __rtp_decode(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp) +{ + int ret = decoder_input_data(ch->decoder, packet->payload, packet->ts, ch->handler->packet_decoded, + ch, mp); + mp->ssrc_out->parent->seq_diff--; + return ret; +} static int packet_decode(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp) { int ret = 0; @@ -2845,38 +2934,12 @@ static int packet_decode(struct codec_ssrc_handler *ch, struct transcode_packet ch->first_ts = packet->ts; ch->last_ts = packet->ts; - if (ch->dtx_buffer && mp->sfd && mp->ssrc_in && mp->ssrc_out) { - ilogs(transcoding, LOG_DEBUG, "Adding packet to DTX buffer"); - - struct dtx_buffer *dtxb = ch->dtx_buffer; - unsigned long ts = packet->ts; - - mutex_lock(&dtxb->lock); - if (ts != dtxb->ts) - dtxb->ts = ts; - dtxb->start = rtpe_now.tv_sec; - mutex_unlock(&dtxb->lock); - - struct dtx_entry *dtxe = g_slice_alloc0(sizeof(*dtxe)); - dtxe->ttq_entry.when = rtpe_now; - timeval_add_usec(&dtxe->ttq_entry.when, rtpe_config.dtx_delay * 1000); - dtxe->packet = packet; - media_packet_copy(&dtxe->mp, mp); - timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry); - // packet now consumed - packet = NULL; - - __dtx_add_callback(dtxb, &rtpe_now, (rtpe_config.dtx_delay + dtxb->ptime) * 1000, mp, ts, 1, - mp->stream->ssrc_in); - - ret = 1; - } + if (__buffer_dtx(ch->dtx_buffer, ch, packet, mp, __rtp_decode)) + ret = 1; // consumed else { ilogs(transcoding, LOG_DEBUG, "Decoding RTP packet now"); - ret = decoder_input_data(ch->decoder, packet->payload, packet->ts, ch->handler->packet_decoded, - ch, mp); + ret = __rtp_decode(ch, packet, mp); ret = ret ? -1 : 0; - mp->ssrc_out->parent->seq_diff--; } return ret; diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 94d6252b3..22e52c249 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -110,6 +110,9 @@ static int ssrc_time_cmp(const void *aa, const void *bb, void *pp) { void *get_ssrc(u_int32_t ssrc, struct ssrc_hash *ht /* , int *created */) { struct ssrc_entry *ent; + if (!ht) + return NULL; + restart: ent = find_ssrc(ssrc, ht); if (G_LIKELY(ent)) {