MT#55283 abstract RTP sending and scheduling

Change-Id: Ieed08617b2c61b0909c7ff2e4fef692451a263b9
pull/1592/head
Richard Fuchs 3 years ago
parent fe578109f0
commit bec997590b

@ -174,11 +174,7 @@ struct codec_ssrc_handler {
int bitrate; int bitrate;
int ptime; int ptime;
int bytes_per_packet; int bytes_per_packet;
unsigned long first_ts; // for output TS scaling struct codec_scheduler csch;
unsigned long last_ts; // to detect input lag and handle lost packets
struct timeval first_send;
unsigned long first_send_ts;
long output_skew;
GString *sample_buffer; GString *sample_buffer;
struct dtx_buffer *dtx_buffer; struct dtx_buffer *dtx_buffer;
@ -513,9 +509,9 @@ struct codec_handler *codec_handler_make_playback(const struct rtp_payload_type
rtp_payload_type_copy(&handler->dest_pt, dst_pt); rtp_payload_type_copy(&handler->dest_pt, dst_pt);
handler->handler_func = handler_func_playback; handler->handler_func = handler_func_playback;
handler->ssrc_handler = (void *) __ssrc_handler_transcode_new(handler); handler->ssrc_handler = (void *) __ssrc_handler_transcode_new(handler);
handler->ssrc_handler->first_ts = last_ts; handler->ssrc_handler->csch.first_ts = last_ts;
while (handler->ssrc_handler->first_ts == 0) while (handler->ssrc_handler->csch.first_ts == 0)
handler->ssrc_handler->first_ts = ssl_random(); handler->ssrc_handler->csch.first_ts = ssl_random();
handler->ssrc_handler->rtp_mark = 1; handler->ssrc_handler->rtp_mark = 1;
ilogs(codec, LOG_DEBUG, "Created media playback context for " STR_FORMAT " -> " STR_FORMAT "", ilogs(codec, LOG_DEBUG, "Created media playback context for " STR_FORMAT " -> " STR_FORMAT "",
@ -1658,7 +1654,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
|| !h->dest_pt.codec_def) || !h->dest_pt.codec_def)
break; break;
uint32_t ts_diff = packet_ts - ch->last_ts; uint32_t ts_diff = packet_ts - ch->csch.last_ts;
// if packet TS is larger than last tracked TS, we can force the next packet if packets were lost and the TS // if packet TS is larger than last tracked TS, we can force the next packet if packets were lost and the TS
// difference is too large. if packet TS is the same or lower (can happen for supplement codecs) we can wait // difference is too large. if packet TS is the same or lower (can happen for supplement codecs) we can wait
@ -1681,20 +1677,20 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
} }
if (ch) { if (ch) {
uint32_t ts_diff = ch->last_ts - packet->ts; uint32_t ts_diff = ch->csch.last_ts - packet->ts;
if (ts_diff < 0x80000000) { // ch->last_ts >= packet->ts if (ts_diff < 0x80000000) { // ch->last_ts >= packet->ts
// multiple consecutive packets with same TS: this could be a compound packet, e.g. a large video frame, or // multiple consecutive packets with same TS: this could be a compound packet, e.g. a large video frame, or
// it could be a supplemental audio codec with static timestamps, in which case we adjust the TS forward // it could be a supplemental audio codec with static timestamps, in which case we adjust the TS forward
// by one frame length. This is needed so that the next real audio packet (with real TS) is not mistakenly // by one frame length. This is needed so that the next real audio packet (with real TS) is not mistakenly
// seen as overdue // seen as overdue
if (h->source_pt.codec_def && h->source_pt.codec_def->supplemental) if (h->source_pt.codec_def && h->source_pt.codec_def->supplemental)
ch->last_ts += h->source_pt.clock_rate * (ch->ptime ?: 20) / 1000; ch->csch.last_ts += h->source_pt.clock_rate * (ch->ptime ?: 20) / 1000;
} }
else else
ch->last_ts = packet->ts; ch->csch.last_ts = packet->ts;
if (input_ch) if (input_ch)
input_ch->last_ts = ch->last_ts; input_ch->csch.last_ts = ch->csch.last_ts;
} }
@ -1751,8 +1747,8 @@ out_ch:
return 0; return 0;
} }
static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch,
struct codec_handler *handler, // normally == ch->handler except for DTMF struct codec_handler *handler,
char *buf, // malloc'd, room for rtp_header + filled-in payload char *buf, // malloc'd, room for rtp_header + filled-in payload
unsigned int payload_len, unsigned int payload_len,
unsigned long payload_ts, unsigned long payload_ts,
@ -1791,10 +1787,10 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
// this packet is dynamically allocated, so we're able to schedule it. // this packet is dynamically allocated, so we're able to schedule it.
// determine scheduled time to send // determine scheduled time to send
if (ch->first_send.tv_sec && handler->dest_pt.clock_rate) { if (csch->first_send.tv_sec && handler->dest_pt.clock_rate) {
// scale first_send from first_send_ts to ts // scale first_send from first_send_ts to ts
p->ttq_entry.when = ch->first_send; p->ttq_entry.when = csch->first_send;
uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around uint32_t ts_diff = (uint32_t) ts - (uint32_t) csch->first_send_ts; // allow for wrap-around
ts_diff += ts_delay; ts_diff += ts_delay;
long long ts_diff_us = long long ts_diff_us =
(unsigned long long) ts_diff * 1000000 / handler->dest_pt.clock_rate; (unsigned long long) ts_diff * 1000000 / handler->dest_pt.clock_rate;
@ -1803,24 +1799,24 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
// how far in the future is this? // how far in the future is this?
ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now);
if (ts_diff_us > 1000000 || ts_diff_us < -1000000) // more than one second, can't be right if (ts_diff_us > 1000000 || ts_diff_us < -1000000) // more than one second, can't be right
ch->first_send.tv_sec = 0; // fix it up below csch->first_send.tv_sec = 0; // fix it up below
} }
if (!ch->first_send.tv_sec || !p->ttq_entry.when.tv_sec) { if (!csch->first_send.tv_sec || !p->ttq_entry.when.tv_sec) {
p->ttq_entry.when = ch->first_send = rtpe_now; p->ttq_entry.when = csch->first_send = rtpe_now;
ch->first_send_ts = ts; csch->first_send_ts = ts;
} }
long long ts_diff_us long long ts_diff_us
= timeval_diff(&p->ttq_entry.when, &rtpe_now); = timeval_diff(&p->ttq_entry.when, &rtpe_now);
ch->output_skew = ch->output_skew * 15 / 16 + ts_diff_us / 16; csch->output_skew = csch->output_skew * 15 / 16 + ts_diff_us / 16;
if (ch->output_skew > 50000 && ts_diff_us > 10000) { // arbitrary value, 50 ms, 10 ms shift if (csch->output_skew > 50000 && ts_diff_us > 10000) { // arbitrary value, 50 ms, 10 ms shift
ilogs(transcoding, LOG_DEBUG, "Steady clock skew of %li.%01li ms detected, shifting send timer back by 10 ms", ilogs(transcoding, LOG_DEBUG, "Steady clock skew of %li.%01li ms detected, shifting send timer back by 10 ms",
ch->output_skew / 1000, csch->output_skew / 1000,
(ch->output_skew % 1000) / 100); (csch->output_skew % 1000) / 100);
timeval_add_usec(&p->ttq_entry.when, -10000); timeval_add_usec(&p->ttq_entry.when, -10000);
ch->output_skew -= 10000; csch->output_skew -= 10000;
ch->first_send_ts += ch->encoder_format.clockrate / 100; csch->first_send_ts += handler->dest_pt.clock_rate / 100;
ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now);
} }
else if (ts_diff_us < 0) { else if (ts_diff_us < 0) {
@ -1829,8 +1825,8 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
ts_diff_us / 1000, ts_diff_us / 1000,
(ts_diff_us % 1000) / 100); (ts_diff_us % 1000) / 100);
timeval_add_usec(&p->ttq_entry.when, ts_diff_us); timeval_add_usec(&p->ttq_entry.when, ts_diff_us);
ch->output_skew += ts_diff_us; csch->output_skew += ts_diff_us;
ch->first_send_ts -= (long long) ch->encoder_format.clockrate * ts_diff_us / 1000000; csch->first_send_ts -= (long long) handler->dest_pt.clock_rate * ts_diff_us / 1000000;
ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); // should be 0 now ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); // should be 0 now
} }
@ -1882,12 +1878,7 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr
if (G_UNLIKELY(!output_ch->encoder)) if (G_UNLIKELY(!output_ch->encoder))
goto skip; goto skip;
// init some vars ch->csch = output_ch->csch;
ch->first_ts = output_ch->first_ts;
ch->first_send_ts = output_ch->first_send_ts;
ch->output_skew = output_ch->output_skew;
ch->first_send = output_ch->first_send;
// the correct output TS is the encoder's FIFO PTS at the start of the DTMF // the correct output TS is the encoder's FIFO PTS at the start of the DTMF
// event. however, we must shift the FIFO PTS forward as the DTMF event goes on // event. however, we must shift the FIFO PTS forward as the DTMF event goes on
@ -1903,7 +1894,7 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr
// roll back TS to start of event // roll back TS to start of event
ts -= ch->last_dtmf_event_ts; ts -= ch->last_dtmf_event_ts;
// adjust to output RTP TS // adjust to output RTP TS
unsigned long packet_ts = ts + output_ch->first_ts; unsigned long packet_ts = ts + output_ch->csch.first_ts;
ilogs(transcoding, LOG_DEBUG, "Scaling DTMF packet timestamp and duration: TS %lu -> %lu " ilogs(transcoding, LOG_DEBUG, "Scaling DTMF packet timestamp and duration: TS %lu -> %lu "
"(%u -> %u)", "(%u -> %u)",
@ -1937,10 +1928,10 @@ skip:
char *buf = malloc(packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM); char *buf = malloc(packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM);
memcpy(buf + sizeof(struct rtp_header), packet->payload->s, packet->payload->len); memcpy(buf + sizeof(struct rtp_header), packet->payload->s, packet->payload->len);
if (packet->bypass_seq) // inject original seq if (packet->bypass_seq) // inject original seq
__output_rtp(mp, ch, packet->handler ? : h, buf, packet->payload->len, packet->ts, codec_output_rtp(mp, &ch->csch, packet->handler ? : h, buf, packet->payload->len, packet->ts,
packet->marker, packet->p.seq, -1, payload_type, ts_delay); packet->marker, packet->p.seq, -1, payload_type, ts_delay);
else // use our own sequencing else // use our own sequencing
__output_rtp(mp, ch, packet->handler ? : h, buf, packet->payload->len, packet->ts, codec_output_rtp(mp, &ch->csch, packet->handler ? : h, buf, packet->payload->len, packet->ts,
packet->marker, -1, 0, payload_type, ts_delay); packet->marker, -1, 0, payload_type, ts_delay);
mp->ssrc_out->parent->seq_diff++; mp->ssrc_out->parent->seq_diff++;
@ -2293,7 +2284,7 @@ void codec_add_dtmf_event(struct codec_ssrc_handler *ch, int code, int level, ui
ilogs(transcoding, LOG_DEBUG, "DTMF event state change: code %i, volume %i, TS %lu", ilogs(transcoding, LOG_DEBUG, "DTMF event state change: code %i, volume %i, TS %lu",
new_ev.code, new_ev.volume, (unsigned long) ts); new_ev.code, new_ev.volume, (unsigned long) ts);
dtmf_dsp_event(&new_ev, &ch->dtmf_state, ch->handler->media, ch->handler->source_pt.clock_rate, dtmf_dsp_event(&new_ev, &ch->dtmf_state, ch->handler->media, ch->handler->source_pt.clock_rate,
ts + ch->first_ts); ts + ch->csch.first_ts);
// add to queue if we're doing PCM -> DTMF event conversion // add to queue if we're doing PCM -> DTMF event conversion
// this does not capture events when doing DTMF delay (dtmf_payload_type == -1) // this does not capture events when doing DTMF delay (dtmf_payload_type == -1)
@ -3506,17 +3497,14 @@ static void __free_ssrc_handler(void *chp) {
dtx_buffer_stop(&ch->dtx_buffer); dtx_buffer_stop(&ch->dtx_buffer);
} }
static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) {
struct codec_ssrc_handler *ch = u1;
struct media_packet *mp = u2;
ilogs(transcoding, LOG_DEBUG, "RTP media successfully encoded: TS %llu, len %i", void packet_encoded_packetize(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp,
(unsigned long long) enc->avpkt->pts, enc->avpkt->size); void (*fn)(encoder_t *, struct codec_ssrc_handler *, struct media_packet *, str *,
char *, unsigned int))
// run this through our packetizer {
AVPacket *in_pkt = enc->avpkt; AVPacket *in_pkt = enc->avpkt;
while (1) { while (true) {
// figure out how big of a buffer we need // figure out how big of a buffer we need
unsigned int payload_len = MAX(MAX(enc->avpkt->size, ch->bytes_per_packet), unsigned int payload_len = MAX(MAX(enc->avpkt->size, ch->bytes_per_packet),
sizeof(struct telephone_event_payload)); sizeof(struct telephone_event_payload));
@ -3541,46 +3529,7 @@ static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) {
ilogs(transcoding, LOG_DEBUG, "Received packet of %zu bytes from packetizer", inout.len); ilogs(transcoding, LOG_DEBUG, "Received packet of %zu bytes from packetizer", inout.len);
// check special payloads fn(enc, ch, mp, &inout, buf, pkt_len);
unsigned int repeats = 0;
int payload_type = -1;
int dtmf_pt = ch->handler->dtmf_payload_type;
if (dtmf_pt == -1)
dtmf_pt = ch->handler->real_dtmf_payload_type;
int is_dtmf = 0;
if (dtmf_pt != -1)
is_dtmf = dtmf_event_payload(&inout, (uint64_t *) &enc->avpkt->pts, enc->avpkt->duration,
&ch->dtmf_event, &ch->dtmf_events);
if (is_dtmf) {
payload_type = dtmf_pt;
if (is_dtmf == 1)
ch->rtp_mark = 1; // DTMF start event
else if (is_dtmf == 3)
repeats = 2; // DTMF end event
}
else {
if (is_silence_event(&inout, &ch->silence_events, enc->avpkt->pts, enc->avpkt->duration))
payload_type = ch->handler->cn_payload_type;
}
// ready to send
do {
char *send_buf = buf;
if (repeats > 0) {
// need to duplicate the payload as __output_rtp consumes it
send_buf = malloc(pkt_len);
memcpy(send_buf, buf, pkt_len);
}
__output_rtp(mp, ch, ch->handler, send_buf, inout.len, ch->first_ts
+ fraction_divl(enc->avpkt->pts, &enc->clockrate_fact),
ch->rtp_mark ? 1 : 0, -1, 0,
payload_type, 0);
mp->ssrc_out->parent->seq_diff++;
ch->rtp_mark = 0;
} while (repeats--);
if (ret == 0) { if (ret == 0) {
// no more to go // no more to go
@ -3590,10 +3539,72 @@ static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) {
// loop around and get more // loop around and get more
in_pkt = NULL; in_pkt = NULL;
} }
}
static void packet_encoded_tx(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp,
str *inout, char *buf, unsigned int pkt_len);
static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) {
struct codec_ssrc_handler *ch = u1;
struct media_packet *mp = u2;
ilogs(transcoding, LOG_DEBUG, "RTP media successfully encoded: TS %llu, len %i",
(unsigned long long) enc->avpkt->pts, enc->avpkt->size);
// run this through our packetizer
packet_encoded_packetize(enc, ch, mp, packet_encoded_tx);
return 0; return 0;
} }
static void packet_encoded_tx(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp,
str *inout, char *buf, unsigned int pkt_len)
{
// check special payloads
unsigned int repeats = 0;
int payload_type = -1;
int dtmf_pt = ch->handler->dtmf_payload_type;
if (dtmf_pt == -1)
dtmf_pt = ch->handler->real_dtmf_payload_type;
int is_dtmf = 0;
if (dtmf_pt != -1)
is_dtmf = dtmf_event_payload(inout, (uint64_t *) &enc->avpkt->pts, enc->avpkt->duration,
&ch->dtmf_event, &ch->dtmf_events);
if (is_dtmf) {
payload_type = dtmf_pt;
if (is_dtmf == 1)
ch->rtp_mark = 1; // DTMF start event
else if (is_dtmf == 3)
repeats = 2; // DTMF end event
}
else {
if (is_silence_event(inout, &ch->silence_events, enc->avpkt->pts, enc->avpkt->duration))
payload_type = ch->handler->cn_payload_type;
}
// ready to send
do {
char *send_buf = buf;
if (repeats > 0) {
// need to duplicate the payload as codec_output_rtp consumes it
send_buf = malloc(pkt_len);
memcpy(send_buf, buf, pkt_len);
}
codec_output_rtp(mp, &ch->csch, ch->handler, send_buf, inout->len, ch->csch.first_ts
+ fraction_divl(enc->avpkt->pts, &enc->clockrate_fact),
ch->rtp_mark ? 1 : 0, -1, 0,
payload_type, 0);
mp->ssrc_out->parent->seq_diff++;
ch->rtp_mark = 0;
} while (repeats--);
}
static void __dtmf_detect(struct codec_ssrc_handler *ch, AVFrame *frame) { static void __dtmf_detect(struct codec_ssrc_handler *ch, AVFrame *frame) {
if (!ch->dtmf_dsp) if (!ch->dtmf_dsp)
return; return;
@ -3649,13 +3660,13 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v
struct codec_ssrc_handler *new_ch = __output_ssrc_handler(ch, mp); struct codec_ssrc_handler *new_ch = __output_ssrc_handler(ch, mp);
if (new_ch != ch) { if (new_ch != ch) {
// copy some essential parameters // copy some essential parameters
if (!new_ch->first_ts) if (!new_ch->csch.first_ts)
new_ch->first_ts = ch->first_ts; new_ch->csch.first_ts = ch->csch.first_ts;
if (decoder->def->supplemental) { if (decoder->def->supplemental) {
// supp codecs return bogus timestamps. Adjust the frame's TS to be in // supp codecs return bogus timestamps. Adjust the frame's TS to be in
// line with the primary decoder // line with the primary decoder
frame->pts -= new_ch->first_ts; frame->pts -= new_ch->csch.first_ts;
} }
ch = new_ch; ch = new_ch;
@ -3693,7 +3704,7 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v
if (mp->media_out) if (mp->media_out)
ch->encoder->callback = mp->media_out->encoder_callback; ch->encoder->callback = mp->media_out->encoder_callback;
uint32_t ts = frame->pts + ch->first_ts; uint32_t ts = frame->pts + ch->csch.first_ts;
__buffer_delay_frame(h->input_handler ? h->input_handler->delay_buffer : h->delay_buffer, __buffer_delay_frame(h->input_handler ? h->input_handler->delay_buffer : h->delay_buffer,
ch, input_func, frame, mp, ts); ch, input_func, frame, mp, ts);
frame = NULL; // consumed frame = NULL; // consumed
@ -3728,8 +3739,8 @@ static int packet_decode(struct codec_ssrc_handler *ch, struct codec_ssrc_handle
{ {
int ret = 0; int ret = 0;
if (!ch->first_ts) if (!ch->csch.first_ts)
ch->first_ts = packet->ts; ch->csch.first_ts = packet->ts;
if (ch->decoder->def->dtmf) { if (ch->decoder->def->dtmf) {
if (packet_dtmf_event(ch, input_ch, packet, mp)) if (packet_dtmf_event(ch, input_ch, packet, mp))

@ -76,6 +76,14 @@ struct codec_packet {
void (*free_func)(void *); void (*free_func)(void *);
}; };
struct codec_scheduler {
unsigned long first_ts; // for output TS scaling
unsigned long last_ts; // to detect input lag and handle lost packets
struct timeval first_send;
unsigned long first_send_ts;
long output_skew;
};
void codecs_init(void); void codecs_init(void);
void codecs_cleanup(void); void codecs_cleanup(void);
@ -142,6 +150,18 @@ uint64_t codec_decoder_unskip_pts(struct codec_ssrc_handler *ch);
void codec_tracker_update(struct codec_store *); void codec_tracker_update(struct codec_store *);
void codec_handlers_stop(GQueue *); void codec_handlers_stop(GQueue *);
void packet_encoded_packetize(encoder_t *enc, struct codec_ssrc_handler *ch, struct media_packet *mp,
void (*fn)(encoder_t *, struct codec_ssrc_handler *, struct media_packet *, str *,
char *, unsigned int));
void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *,
struct codec_handler *handler, // normally == ch->handler except for DTMF
char *buf, // malloc'd, room for rtp_header + filled-in payload
unsigned int payload_len,
unsigned long payload_ts,
int marker, int seq, int seq_inc, int payload_type,
unsigned long ts_delay);
#else #else
INLINE bool codec_handlers_update(struct call_media *receiver, struct call_media *sink, INLINE bool codec_handlers_update(struct call_media *receiver, struct call_media *sink,

Loading…
Cancel
Save