From bec997590b71798129a42bf6566e4cb2d5fe51ce Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 20 Dec 2022 13:49:22 -0500 Subject: [PATCH] MT#55283 abstract RTP sending and scheduling Change-Id: Ieed08617b2c61b0909c7ff2e4fef692451a263b9 --- daemon/codec.c | 199 +++++++++++++++++++++++++----------------------- include/codec.h | 20 +++++ 2 files changed, 125 insertions(+), 94 deletions(-) diff --git a/daemon/codec.c b/daemon/codec.c index a5e842384..d4090ffe4 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -174,11 +174,7 @@ struct codec_ssrc_handler { int bitrate; int ptime; int bytes_per_packet; - unsigned long first_ts; // for output TS scaling - unsigned long last_ts; // to detect input lag and handle lost packets - struct timeval first_send; - unsigned long first_send_ts; - long output_skew; + struct codec_scheduler csch; GString *sample_buffer; struct dtx_buffer *dtx_buffer; @@ -513,9 +509,9 @@ struct codec_handler *codec_handler_make_playback(const struct rtp_payload_type rtp_payload_type_copy(&handler->dest_pt, dst_pt); handler->handler_func = handler_func_playback; handler->ssrc_handler = (void *) __ssrc_handler_transcode_new(handler); - handler->ssrc_handler->first_ts = last_ts; - while (handler->ssrc_handler->first_ts == 0) - handler->ssrc_handler->first_ts = ssl_random(); + handler->ssrc_handler->csch.first_ts = last_ts; + while (handler->ssrc_handler->csch.first_ts == 0) + handler->ssrc_handler->csch.first_ts = ssl_random(); handler->ssrc_handler->rtp_mark = 1; ilogs(codec, LOG_DEBUG, "Created media playback context for " STR_FORMAT " -> " STR_FORMAT "", @@ -1658,7 +1654,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa || !h->dest_pt.codec_def) break; - uint32_t ts_diff = packet_ts - ch->last_ts; + uint32_t ts_diff = packet_ts - ch->csch.last_ts; // if packet TS is larger than last tracked TS, we can force the next packet if packets were lost and the TS // difference is too large. if packet TS is the same or lower (can happen for supplement codecs) we can wait @@ -1681,20 +1677,20 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa } if (ch) { - uint32_t ts_diff = ch->last_ts - packet->ts; + uint32_t ts_diff = ch->csch.last_ts - packet->ts; if (ts_diff < 0x80000000) { // ch->last_ts >= packet->ts // multiple consecutive packets with same TS: this could be a compound packet, e.g. a large video frame, or // it could be a supplemental audio codec with static timestamps, in which case we adjust the TS forward // by one frame length. This is needed so that the next real audio packet (with real TS) is not mistakenly // seen as overdue if (h->source_pt.codec_def && h->source_pt.codec_def->supplemental) - ch->last_ts += h->source_pt.clock_rate * (ch->ptime ?: 20) / 1000; + ch->csch.last_ts += h->source_pt.clock_rate * (ch->ptime ?: 20) / 1000; } else - ch->last_ts = packet->ts; + ch->csch.last_ts = packet->ts; if (input_ch) - input_ch->last_ts = ch->last_ts; + input_ch->csch.last_ts = ch->csch.last_ts; } @@ -1751,8 +1747,8 @@ out_ch: return 0; } -static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, - struct codec_handler *handler, // normally == ch->handler except for DTMF +void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch, + struct codec_handler *handler, char *buf, // malloc'd, room for rtp_header + filled-in payload unsigned int payload_len, unsigned long payload_ts, @@ -1791,10 +1787,10 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, // this packet is dynamically allocated, so we're able to schedule it. // determine scheduled time to send - if (ch->first_send.tv_sec && handler->dest_pt.clock_rate) { + if (csch->first_send.tv_sec && handler->dest_pt.clock_rate) { // scale first_send from first_send_ts to ts - p->ttq_entry.when = ch->first_send; - uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around + p->ttq_entry.when = csch->first_send; + uint32_t ts_diff = (uint32_t) ts - (uint32_t) csch->first_send_ts; // allow for wrap-around ts_diff += ts_delay; long long ts_diff_us = (unsigned long long) ts_diff * 1000000 / handler->dest_pt.clock_rate; @@ -1803,24 +1799,24 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, // how far in the future is this? ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); if (ts_diff_us > 1000000 || ts_diff_us < -1000000) // more than one second, can't be right - ch->first_send.tv_sec = 0; // fix it up below + csch->first_send.tv_sec = 0; // fix it up below } - if (!ch->first_send.tv_sec || !p->ttq_entry.when.tv_sec) { - p->ttq_entry.when = ch->first_send = rtpe_now; - ch->first_send_ts = ts; + if (!csch->first_send.tv_sec || !p->ttq_entry.when.tv_sec) { + p->ttq_entry.when = csch->first_send = rtpe_now; + csch->first_send_ts = ts; } long long ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); - ch->output_skew = ch->output_skew * 15 / 16 + ts_diff_us / 16; - if (ch->output_skew > 50000 && ts_diff_us > 10000) { // arbitrary value, 50 ms, 10 ms shift + csch->output_skew = csch->output_skew * 15 / 16 + ts_diff_us / 16; + if (csch->output_skew > 50000 && ts_diff_us > 10000) { // arbitrary value, 50 ms, 10 ms shift ilogs(transcoding, LOG_DEBUG, "Steady clock skew of %li.%01li ms detected, shifting send timer back by 10 ms", - ch->output_skew / 1000, - (ch->output_skew % 1000) / 100); + csch->output_skew / 1000, + (csch->output_skew % 1000) / 100); timeval_add_usec(&p->ttq_entry.when, -10000); - ch->output_skew -= 10000; - ch->first_send_ts += ch->encoder_format.clockrate / 100; + csch->output_skew -= 10000; + csch->first_send_ts += handler->dest_pt.clock_rate / 100; ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); } else if (ts_diff_us < 0) { @@ -1829,8 +1825,8 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, ts_diff_us / 1000, (ts_diff_us % 1000) / 100); timeval_add_usec(&p->ttq_entry.when, ts_diff_us); - ch->output_skew += ts_diff_us; - ch->first_send_ts -= (long long) ch->encoder_format.clockrate * ts_diff_us / 1000000; + csch->output_skew += ts_diff_us; + csch->first_send_ts -= (long long) handler->dest_pt.clock_rate * ts_diff_us / 1000000; ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); // should be 0 now } @@ -1882,12 +1878,7 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr if (G_UNLIKELY(!output_ch->encoder)) goto skip; - // init some vars - ch->first_ts = output_ch->first_ts; - ch->first_send_ts = output_ch->first_send_ts; - ch->output_skew = output_ch->output_skew; - ch->first_send = output_ch->first_send; - + ch->csch = output_ch->csch; // the correct output TS is the encoder's FIFO PTS at the start of the DTMF // event. however, we must shift the FIFO PTS forward as the DTMF event goes on @@ -1903,7 +1894,7 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr // roll back TS to start of event ts -= ch->last_dtmf_event_ts; // adjust to output RTP TS - unsigned long packet_ts = ts + output_ch->first_ts; + unsigned long packet_ts = ts + output_ch->csch.first_ts; ilogs(transcoding, LOG_DEBUG, "Scaling DTMF packet timestamp and duration: TS %lu -> %lu " "(%u -> %u)", @@ -1937,10 +1928,10 @@ skip: char *buf = malloc(packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM); memcpy(buf + sizeof(struct rtp_header), packet->payload->s, packet->payload->len); if (packet->bypass_seq) // inject original seq - __output_rtp(mp, ch, packet->handler ? : h, buf, packet->payload->len, packet->ts, + codec_output_rtp(mp, &ch->csch, packet->handler ? : h, buf, packet->payload->len, packet->ts, packet->marker, packet->p.seq, -1, payload_type, ts_delay); else // use our own sequencing - __output_rtp(mp, ch, packet->handler ? : h, buf, packet->payload->len, packet->ts, + codec_output_rtp(mp, &ch->csch, packet->handler ? : h, buf, packet->payload->len, packet->ts, packet->marker, -1, 0, payload_type, ts_delay); mp->ssrc_out->parent->seq_diff++; @@ -2293,7 +2284,7 @@ void codec_add_dtmf_event(struct codec_ssrc_handler *ch, int code, int level, ui ilogs(transcoding, LOG_DEBUG, "DTMF event state change: code %i, volume %i, TS %lu", new_ev.code, new_ev.volume, (unsigned long) ts); dtmf_dsp_event(&new_ev, &ch->dtmf_state, ch->handler->media, ch->handler->source_pt.clock_rate, - ts + ch->first_ts); + ts + ch->csch.first_ts); // add to queue if we're doing PCM -> DTMF event conversion // this does not capture events when doing DTMF delay (dtmf_payload_type == -1) @@ -3506,17 +3497,14 @@ static void __free_ssrc_handler(void *chp) { dtx_buffer_stop(&ch->dtx_buffer); } -static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { - struct codec_ssrc_handler *ch = u1; - struct media_packet *mp = u2; - ilogs(transcoding, LOG_DEBUG, "RTP media successfully encoded: TS %llu, len %i", - (unsigned long long) enc->avpkt->pts, enc->avpkt->size); - - // run this through our packetizer +void packet_encoded_packetize(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp, + void (*fn)(encoder_t *, struct codec_ssrc_handler *, struct media_packet *, str *, + char *, unsigned int)) +{ AVPacket *in_pkt = enc->avpkt; - while (1) { + while (true) { // figure out how big of a buffer we need unsigned int payload_len = MAX(MAX(enc->avpkt->size, ch->bytes_per_packet), sizeof(struct telephone_event_payload)); @@ -3541,46 +3529,7 @@ static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { ilogs(transcoding, LOG_DEBUG, "Received packet of %zu bytes from packetizer", inout.len); - // check special payloads - - unsigned int repeats = 0; - int payload_type = -1; - int dtmf_pt = ch->handler->dtmf_payload_type; - if (dtmf_pt == -1) - dtmf_pt = ch->handler->real_dtmf_payload_type; - int is_dtmf = 0; - - if (dtmf_pt != -1) - is_dtmf = dtmf_event_payload(&inout, (uint64_t *) &enc->avpkt->pts, enc->avpkt->duration, - &ch->dtmf_event, &ch->dtmf_events); - if (is_dtmf) { - payload_type = dtmf_pt; - if (is_dtmf == 1) - ch->rtp_mark = 1; // DTMF start event - else if (is_dtmf == 3) - repeats = 2; // DTMF end event - } - else { - if (is_silence_event(&inout, &ch->silence_events, enc->avpkt->pts, enc->avpkt->duration)) - payload_type = ch->handler->cn_payload_type; - } - - // ready to send - - do { - char *send_buf = buf; - if (repeats > 0) { - // need to duplicate the payload as __output_rtp consumes it - send_buf = malloc(pkt_len); - memcpy(send_buf, buf, pkt_len); - } - __output_rtp(mp, ch, ch->handler, send_buf, inout.len, ch->first_ts - + fraction_divl(enc->avpkt->pts, &enc->clockrate_fact), - ch->rtp_mark ? 1 : 0, -1, 0, - payload_type, 0); - mp->ssrc_out->parent->seq_diff++; - ch->rtp_mark = 0; - } while (repeats--); + fn(enc, ch, mp, &inout, buf, pkt_len); if (ret == 0) { // no more to go @@ -3590,10 +3539,72 @@ static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { // loop around and get more in_pkt = NULL; } +} + + +static void packet_encoded_tx(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp, + str *inout, char *buf, unsigned int pkt_len); + +static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { + struct codec_ssrc_handler *ch = u1; + struct media_packet *mp = u2; + + ilogs(transcoding, LOG_DEBUG, "RTP media successfully encoded: TS %llu, len %i", + (unsigned long long) enc->avpkt->pts, enc->avpkt->size); + + // run this through our packetizer + packet_encoded_packetize(enc, ch, mp, packet_encoded_tx); return 0; } +static void packet_encoded_tx(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp, + str *inout, char *buf, unsigned int pkt_len) +{ + // check special payloads + + unsigned int repeats = 0; + int payload_type = -1; + int dtmf_pt = ch->handler->dtmf_payload_type; + if (dtmf_pt == -1) + dtmf_pt = ch->handler->real_dtmf_payload_type; + int is_dtmf = 0; + + if (dtmf_pt != -1) + is_dtmf = dtmf_event_payload(inout, (uint64_t *) &enc->avpkt->pts, enc->avpkt->duration, + &ch->dtmf_event, &ch->dtmf_events); + if (is_dtmf) { + payload_type = dtmf_pt; + if (is_dtmf == 1) + ch->rtp_mark = 1; // DTMF start event + else if (is_dtmf == 3) + repeats = 2; // DTMF end event + } + else { + if (is_silence_event(inout, &ch->silence_events, enc->avpkt->pts, enc->avpkt->duration)) + payload_type = ch->handler->cn_payload_type; + } + + // ready to send + + do { + char *send_buf = buf; + if (repeats > 0) { + // need to duplicate the payload as codec_output_rtp consumes it + send_buf = malloc(pkt_len); + memcpy(send_buf, buf, pkt_len); + } + codec_output_rtp(mp, &ch->csch, ch->handler, send_buf, inout->len, ch->csch.first_ts + + fraction_divl(enc->avpkt->pts, &enc->clockrate_fact), + ch->rtp_mark ? 1 : 0, -1, 0, + payload_type, 0); + mp->ssrc_out->parent->seq_diff++; + ch->rtp_mark = 0; + } while (repeats--); +} + + + static void __dtmf_detect(struct codec_ssrc_handler *ch, AVFrame *frame) { if (!ch->dtmf_dsp) return; @@ -3649,13 +3660,13 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v struct codec_ssrc_handler *new_ch = __output_ssrc_handler(ch, mp); if (new_ch != ch) { // copy some essential parameters - if (!new_ch->first_ts) - new_ch->first_ts = ch->first_ts; + if (!new_ch->csch.first_ts) + new_ch->csch.first_ts = ch->csch.first_ts; if (decoder->def->supplemental) { // supp codecs return bogus timestamps. Adjust the frame's TS to be in // line with the primary decoder - frame->pts -= new_ch->first_ts; + frame->pts -= new_ch->csch.first_ts; } ch = new_ch; @@ -3693,7 +3704,7 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v if (mp->media_out) ch->encoder->callback = mp->media_out->encoder_callback; - uint32_t ts = frame->pts + ch->first_ts; + uint32_t ts = frame->pts + ch->csch.first_ts; __buffer_delay_frame(h->input_handler ? h->input_handler->delay_buffer : h->delay_buffer, ch, input_func, frame, mp, ts); frame = NULL; // consumed @@ -3728,8 +3739,8 @@ static int packet_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handle { int ret = 0; - if (!ch->first_ts) - ch->first_ts = packet->ts; + if (!ch->csch.first_ts) + ch->csch.first_ts = packet->ts; if (ch->decoder->def->dtmf) { if (packet_dtmf_event(ch, input_ch, packet, mp)) diff --git a/include/codec.h b/include/codec.h index 1c9abf61d..0d41e3845 100644 --- a/include/codec.h +++ b/include/codec.h @@ -76,6 +76,14 @@ struct codec_packet { void (*free_func)(void *); }; +struct codec_scheduler { + unsigned long first_ts; // for output TS scaling + unsigned long last_ts; // to detect input lag and handle lost packets + struct timeval first_send; + unsigned long first_send_ts; + long output_skew; +}; + void codecs_init(void); void codecs_cleanup(void); @@ -142,6 +150,18 @@ uint64_t codec_decoder_unskip_pts(struct codec_ssrc_handler *ch); void codec_tracker_update(struct codec_store *); void codec_handlers_stop(GQueue *); + +void packet_encoded_packetize(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp, + void (*fn)(encoder_t *, struct codec_ssrc_handler *, struct media_packet *, str *, + char *, unsigned int)); +void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *, + struct codec_handler *handler, // normally == ch->handler except for DTMF + char *buf, // malloc'd, room for rtp_header + filled-in payload + unsigned int payload_len, + unsigned long payload_ts, + int marker, int seq, int seq_inc, int payload_type, + unsigned long ts_delay); + #else INLINE bool codec_handlers_update(struct call_media *receiver, struct call_media *sink,