TT#98901 convert from static DTX delay to dynamic buffer queue

Change-Id: Ib6bfdfb9d1967263dc8acb48a812d7e75f9c239b
pull/1232/head
Richard Fuchs 4 years ago
parent 42dd21d1ec
commit 7dc55e6378

@ -75,16 +75,16 @@ struct dtx_buffer {
int ptime; // ms per packet
int tspp; // timestamp increment per packet
struct call *call;
unsigned long ts;
unsigned int ts_seq; // for subsequent packets with same TS, e.g. DTMF
GQueue packets;
struct media_packet last_mp;
unsigned long head_ts;
uint32_t ssrc;
struct timerthread_queue_entry ttq_entry;
time_t start;
};
struct dtx_entry {
struct timerthread_queue_entry ttq_entry;
struct dtx_packet {
struct transcode_packet *packet;
struct media_packet mp;
unsigned long ts;
void *ssrc_ptr; // opaque pointer, doesn't hold a reference
struct codec_ssrc_handler *decoder_handler; // holds reference
int (*func)(struct codec_ssrc_handler *ch, struct transcode_packet *packet, struct media_packet *mp);
};
@ -2377,18 +2377,6 @@ static int codec_decoder_event(enum codec_event event, void *ptr, void *data) {
return 0;
}
/*
 * Schedule a DTX timer entry on the buffer's timer queue.
 *
 * A zero-filled dtx_entry is allocated, set to fire `offset` microseconds
 * after `*base`, given its own copy of `mp`, and pushed onto dtxb->ttq.
 *
 * dtxb     - DTX buffer whose timer queue receives the new entry
 * base     - reference time that `offset` is added to
 * offset   - delay relative to `base`, in microseconds
 * mp       - media packet that is copied into the entry
 * ts       - timestamp recorded in the entry
 * seq_add  - amount by which the copied packet's RTP sequence number is advanced
 * ssrc_ptr - opaque pointer stored verbatim in the entry
 */
static void __dtx_add_callback(struct dtx_buffer *dtxb, const struct timeval *base, unsigned int offset,
		const struct media_packet *mp, unsigned long ts, int seq_add, void *ssrc_ptr)
{
	struct dtx_entry *dtxe = g_slice_alloc0(sizeof(*dtxe));
	dtxe->ttq_entry.when = *base;
	timeval_add_usec(&dtxe->ttq_entry.when, offset);
	dtxe->ts = ts;
	media_packet_copy(&dtxe->mp, mp);
	// The previous code added htons(seq_add) directly to seq_num, i.e. it
	// treated seq_num as a network-byte-order field. Adding in network order
	// loses the carry out of the low sequence byte on little-endian hosts
	// (0x12FF + 1 ended up as 0x1200), so convert to host order, add, and
	// convert back.
	dtxe->mp.rtp->seq_num = htons((uint16_t) (ntohs(dtxe->mp.rtp->seq_num) + seq_add));
	dtxe->ssrc_ptr = ssrc_ptr;
	timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry);
}
// consumes `packet` if buffered (returns 1)
static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *decoder_handler,
struct transcode_packet *packet, struct media_packet *mp,
@ -2398,135 +2386,227 @@ static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *deco
if (!dtxb || !mp->sfd || !mp->ssrc_in || !mp->ssrc_out)
return 0;
ilogs(transcoding, LOG_DEBUG, "Adding packet to DTX buffer");
unsigned long ts = packet->ts;
// allocate packet object
struct dtx_packet *dtxp = g_slice_alloc0(sizeof(*dtxp));
dtxp->packet = packet;
dtxp->func = func;
if (decoder_handler)
dtxp->decoder_handler = obj_get(&decoder_handler->h);
media_packet_copy(&dtxp->mp, mp);
// add to processing queue
mutex_lock(&dtxb->lock);
if (ts != dtxb->ts) {
dtxb->ts = ts;
dtxb->ts_seq = 0;
}
else
dtxb->ts_seq++;
unsigned int ts_seq = dtxb->ts_seq;
dtxb->start = rtpe_now.tv_sec;
mutex_unlock(&dtxb->lock);
g_queue_push_tail(&dtxb->packets, dtxp);
ilogs(dtx, LOG_DEBUG, "Adding packet (TS %lu) to DTX buffer; now %i packets in DTX queue",
ts, dtxb->packets.length);
struct dtx_entry *dtxe = g_slice_alloc0(sizeof(*dtxe));
dtxe->ttq_entry.when = rtpe_now;
timeval_add_usec(&dtxe->ttq_entry.when, rtpe_config.dtx_delay * 1000);
dtxe->packet = packet;
dtxe->func = func;
if (decoder_handler)
dtxe->decoder_handler = obj_get(&decoder_handler->h);
media_packet_copy(&dtxe->mp, mp);
timerthread_queue_push(&dtxb->ttq, &dtxe->ttq_entry);
// packet now consumed
packet = NULL;
// schedule timer if not running yet
if (!dtxb->ttq_entry.when.tv_sec) {
if (!dtxb->ssrc)
dtxb->ssrc = mp->ssrc_in->parent->h.ssrc;
dtxb->ttq_entry.when = mp->tv;
timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_delay * 1000);
timerthread_queue_push(&dtxb->ttq, &dtxb->ttq_entry);
}
__dtx_add_callback(dtxb, &rtpe_now, (rtpe_config.dtx_delay + dtxb->ptime) * 1000, mp, ts + ts_seq, 1,
mp->stream->ssrc_in);
mutex_unlock(&dtxb->lock);
return 1;
}
static void __dtx_entry_free(void *p) {
struct dtx_entry *dtxe = p;
if (dtxe->packet)
__transcode_packet_free(dtxe->packet);
media_packet_release(&dtxe->mp);
if (dtxe->decoder_handler)
obj_put(&dtxe->decoder_handler->h);
g_slice_free1(sizeof(*dtxe), dtxe);
static void dtx_packet_free(struct dtx_packet *dtxp) {
if (dtxp->packet)
__transcode_packet_free(dtxp->packet);
media_packet_release(&dtxp->mp);
if (dtxp->decoder_handler)
obj_put(&dtxp->decoder_handler->h);
g_slice_free1(sizeof(*dtxp), dtxp);
}
static void __dtx_send_later(struct timerthread_queue *ttq, void *p) {
struct dtx_buffer *dtxb = (void *) ttq;
struct dtx_entry *dtxe = p;
struct transcode_packet *packet = dtxe->packet;
struct media_packet *mp = &dtxe->mp;
struct packet_stream *ps = mp->stream;
int ret = 0;
struct media_packet mp_copy = {0,};
int ret = 0, discard = 0;
unsigned long ts;
int p_left = 0;
long tv_diff = -1, ts_diff = 0;
mutex_lock(&dtxb->lock);
struct codec_ssrc_handler *ch = dtxe->decoder_handler ? obj_get(&dtxe->decoder_handler->h) : NULL;
// do we have a packet?
struct dtx_packet *dtxp = g_queue_peek_head(&dtxb->packets);
if (dtxp) {
// inspect head packet and check TS, see if it's ready to be decoded
ts = dtxp->packet->ts;
ts_diff = ts - dtxb->head_ts;
if (!dtxb->head_ts)
; // first packet
else if (ts_diff < 0)
ilogs(dtx, LOG_DEBUG, "DTX timestamp reset (from %lu to %lu)", dtxb->head_ts, ts);
else if (ts_diff > 100000) // arbitrary value
ilogs(dtx, LOG_DEBUG, "DTX timestamp reset (from %lu to %lu)", dtxb->head_ts, ts);
else if (ts_diff > dtxb->tspp) {
ilogs(dtx, LOG_DEBUG, "First packet in DTX buffer not ready yet (packet TS %lu, "
"DTX TS %lu, diff %li)",
ts, dtxb->head_ts, ts_diff);
dtxp = NULL;
}
// go or no go?
if (dtxp)
g_queue_pop_head(&dtxb->packets);
}
p_left = dtxb->packets.length;
if (dtxp) {
// save the `mp` for possible future DTX
media_packet_release(&dtxb->last_mp);
media_packet_copy(&dtxb->last_mp, &dtxp->mp);
media_packet_copy(&mp_copy, &dtxp->mp);
ts_diff = dtxp->packet->ts - dtxb->head_ts;
ts = dtxb->head_ts = dtxp->packet->ts;
tv_diff = timeval_diff(&rtpe_now, &mp_copy.tv);
}
else {
// no packet ready to decode: DTX
media_packet_copy(&mp_copy, &dtxb->last_mp);
// shift forward TS
dtxb->head_ts += dtxb->tspp;
ts = dtxb->head_ts;
}
struct packet_stream *ps = mp_copy.stream;
log_info_stream_fd(mp_copy.sfd);
// copy out other fields so we can unlock
struct codec_ssrc_handler *ch = (dtxp && dtxp->decoder_handler) ? obj_get(&dtxp->decoder_handler->h)
: NULL;
if (!ch && dtxb->csh)
ch = obj_get(&dtxb->csh->h);
struct call *call = dtxb->call ? obj_get(dtxb->call) : NULL;
mutex_unlock(&dtxb->lock);
if (!call || !ch)
if (!call || !ch || !ps || !ps->ssrc_in
|| dtxb->ssrc != ps->ssrc_in->parent->h.ssrc
|| dtxb->ttq_entry.when.tv_sec == 0) {
// shut down or SSRC change
ilogs(dtx, LOG_DEBUG, "DTX buffer for %lx has been shut down", (unsigned long) dtxb->ssrc);
dtxb->ttq_entry.when.tv_sec = 0;
mutex_unlock(&dtxb->lock);
goto out; // shut down
}
// schedule next run
timeval_add_usec(&dtxb->ttq_entry.when, dtxb->ptime * 1000);
// handle timer drifts
if (dtxp && tv_diff < rtpe_config.dtx_delay * 1000) {
// timer underflow
ilogs(dtx, LOG_DEBUG, "Packet reception time has caught up with DTX timer "
"(%li ms < %i ms), "
"pushing DTX timer forward my %i ms",
tv_diff / 1000, rtpe_config.dtx_delay, rtpe_config.dtx_shift);
timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * 1000);
}
else if (dtxp && ts_diff < dtxb->tspp) {
// TS underflow
// special case: DTMF timestamps are static
if (ts_diff == 0 && ch->handler->source_pt.codec_def->dtmf) {
;
}
else {
ilogs(dtx, LOG_DEBUG, "Packet timestamps have caught up with DTX timer "
"(TS %lu, diff %li), "
"pushing DTX timer forward by %i ms and discarding packet",
ts, ts_diff, rtpe_config.dtx_shift);
timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * 1000);
discard = 1;
}
}
else if (dtxp && dtxb->packets.length >= rtpe_config.dtx_buffer) {
// inspect TS is most recent packet
struct dtx_packet *dtxp_last = g_queue_peek_tail(&dtxb->packets);
ts_diff = dtxp_last->packet->ts - ts;
long long ts_diff_us = (long long) ts_diff * 1000000 / ch->handler->source_pt.clock_rate;
if (ts_diff_us >= rtpe_config.dtx_lag * 1000) {
// overflow
ilogs(dtx, LOG_DEBUG, "DTX timer queue overflowing (%i packets in queue, "
"%lli ms delay), speeding up DTX timer by %i ms",
dtxb->packets.length, ts_diff_us / 1000, rtpe_config.dtx_shift);
timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * -1000);
}
}
log_info_stream_fd(mp->sfd);
timerthread_queue_push(&dtxb->ttq, &dtxb->ttq_entry);
mutex_unlock(&dtxb->lock);
rwlock_lock_r(&call->master_lock);
__ssrc_lock_both(mp);
__ssrc_lock_both(&mp_copy);
if (packet) {
ilogs(transcoding, LOG_DEBUG, "Decoding DTX-buffered RTP packet (TS %lu) now", packet->ts);
if (dtxp) {
if (!discard) {
ilogs(dtx, LOG_DEBUG, "Decoding DTX-buffered RTP packet (TS %lu) now; "
"%i packets left in queue", ts, p_left);
ret = dtxe->func(ch, packet, &dtxe->mp);
if (ret)
ilogs(transcoding, LOG_WARN | LOG_FLAG_LIMIT, "Decoder error while processing buffered RTP packet");
ret = dtxp->func(ch, dtxp->packet, &mp_copy);
if (ret)
ilogs(dtx, LOG_WARN | LOG_FLAG_LIMIT,
"Decoder error while processing buffered RTP packet");
}
}
else {
unsigned long dtxe_ts = dtxe->ts;
mutex_lock(&dtxb->lock);
unsigned int diff = rtpe_now.tv_sec - dtxb->start;
unsigned long dtxb_ts = dtxb->ts + dtxb->ts_seq;
void *ssrc_ptr = dtxe->mp.stream->ssrc_in;
if (dtxe_ts == dtxb_ts
&& ssrc_ptr == dtxe->ssrc_ptr
&& (rtpe_config.max_dtx <= 0 || diff < rtpe_config.max_dtx))
{
ilogs(transcoding, LOG_DEBUG, "RTP media for TS %lu+ missing, triggering DTX",
dtxe_ts);
dtxb_ts += dtxb->tspp;
dtxb_ts -= dtxb->ts_seq;
dtxe_ts = dtxb_ts;
dtxb->ts = dtxb_ts;
dtxb->ts_seq = 0;
mutex_unlock(&dtxb->lock);
ret = decoder_lost_packet(ch->decoder, dtxe_ts,
ch->handler->packet_decoded, ch, &dtxe->mp);
if (ret)
ilogs(transcoding, LOG_WARN | LOG_FLAG_LIMIT, "Decoder error handling DTX/lost packet");
__dtx_add_callback(dtxb, &dtxe->ttq_entry.when, dtxb->ptime * 1000, mp, dtxe_ts, 0, ssrc_ptr);
if (rtpe_config.max_dtx <= 0 || diff < rtpe_config.max_dtx) {
ilogs(dtx, LOG_DEBUG, "RTP media for TS %lu missing, triggering DTX", ts);
// synthetic packet
mp_copy.rtp->seq_num += htons(1);
ret = decoder_lost_packet(ch->decoder, ts,
ch->handler->packet_decoded, ch, &mp_copy);
if (ret)
ilogs(dtx, LOG_WARN | LOG_FLAG_LIMIT,
"Decoder error handling DTX/lost packet");
}
else
mutex_unlock(&dtxb->lock);
ilogs(dtx, LOG_DEBUG, "Stopping DTX at TS %lu", ts);
}
__ssrc_unlock_both(mp);
__ssrc_unlock_both(&mp_copy);
if (mp->packets_out.length && ret == 0) {
if (mp_copy.packets_out.length && ret == 0) {
struct packet_stream *sink = ps->rtp_sink;
if (!sink)
media_socket_dequeue(mp, NULL); // just free
media_socket_dequeue(&mp_copy, NULL); // just free
else {
if (ps->handler && media_packet_encrypt(ps->handler->out->rtp_crypt, sink, mp))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
if (ps->handler && media_packet_encrypt(ps->handler->out->rtp_crypt, sink, &mp_copy))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
if (media_socket_dequeue(mp, sink))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error sending buffered media to RTP sink");
if (media_socket_dequeue(&mp_copy, sink))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
}
rwlock_unlock_r(&call->master_lock);
obj_put(call);
obj_put(&ch->h);
out:
__dtx_entry_free(dtxe);
if (call)
obj_put(call);
if (ch)
obj_put(&ch->h);
if (dtxp)
dtx_packet_free(dtxp);
media_packet_release(&mp_copy);
log_info_clear();
}
static void __dtx_shutdown(struct dtx_buffer *dtxb) {
@ -2536,10 +2616,12 @@ static void __dtx_shutdown(struct dtx_buffer *dtxb) {
if (dtxb->call)
obj_put(dtxb->call);
dtxb->call = NULL;
g_queue_clear_full(&dtxb->packets, (GDestroyNotify) dtx_packet_free);
}
/*
 * Free callback for a DTX buffer; it is handed to timerthread_queue_new()
 * when the buffer is created.
 *
 * `p` is the dtx_buffer being torn down. The buffer is shut down first, then
 * the retained last media packet is released and the buffer lock destroyed.
 */
static void __dtx_free(void *p) {
	struct dtx_buffer *buffer = p;

	__dtx_shutdown(buffer);
	media_packet_release(&buffer->last_mp);
	mutex_destroy(&buffer->lock);
}
static void __dtx_setup(struct codec_ssrc_handler *ch) {
@ -2551,14 +2633,14 @@ static void __dtx_setup(struct codec_ssrc_handler *ch) {
struct dtx_buffer *dtx =
ch->dtx_buffer = timerthread_queue_new("dtx_buffer", sizeof(*ch->dtx_buffer),
&codec_timers_thread, NULL, __dtx_send_later, __dtx_free, __dtx_entry_free);
&codec_timers_thread, NULL, __dtx_send_later, __dtx_free, NULL);
dtx->csh = obj_get(&ch->h);
dtx->call = obj_get(ch->handler->media->call);
mutex_init(&dtx->lock);
dtx->ptime = ch->ptime;
if (!dtx->ptime)
dtx->ptime = 20; // XXX ?
dtx->tspp = dtx->ptime * ch->handler->source_pt.clock_rate / 1000;
dtx->ptime = 20; // XXX should be replaced with length of actual decoded packet
dtx->tspp = dtx->ptime * ch->handler->source_pt.clock_rate / 1000; // XXX ditto
}
static void __ssrc_handler_stop(void *p) {
struct codec_ssrc_handler *ch = p;
@ -3862,6 +3944,7 @@ void codec_rtp_payload_types(struct call_media *media, struct call_media *other_
void codecs_init(void) {
#ifdef WITH_TRANSCODING
// XXX not real queue timer - unify to simple timerthread
timerthread_init(&codec_timers_thread, timerthread_queue_run);
rtcp_timer_queue = timerthread_queue_new("rtcp_timer_queue", sizeof(*rtcp_timer_queue),
&codec_timers_thread, NULL, __rtcp_timer_run, NULL, __rtcp_timer_free);
@ -3869,6 +3952,7 @@ void codecs_init(void) {
}
// Tear down the codec timer infrastructure. Does nothing unless the build
// defines WITH_TRANSCODING.
void codecs_cleanup(void) {
#ifdef WITH_TRANSCODING
// drop the reference held on the RTCP timer queue object
obj_put(&rtcp_timer_queue->ttq.tt_obj);
// then free the timer thread set up for codec timers
timerthread_free(&codec_timers_thread);
#endif
}

@ -10,3 +10,4 @@ ll(srtp, "SRTP encryption and decryption")
ll(internals, "Noisy low-level internals")
ll(http, "HTTP, HTTPS, Websockets")
ll(control, "Control protocols including SDP exchanges, CLI")
ll(dtx, "DTX timer/buffer")

@ -81,6 +81,9 @@ struct rtpengine_config rtpe_config = {
.dtls_rsa_key_size = 2048,
.dtls_signature = 256,
.max_dtx = 30,
.dtx_shift = 5,
.dtx_buffer = 10,
.dtx_lag = 100,
.common = {
.log_levels = {
[log_level_index_internals] = -1,
@ -483,6 +486,9 @@ static void options(int *argc, char ***argv) {
#ifdef WITH_TRANSCODING
{ "dtx-delay", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_delay, "Delay in milliseconds to trigger DTX handling","INT"},
{ "max-dtx", 0,0, G_OPTION_ARG_INT, &rtpe_config.max_dtx, "Maximum duration of DTX handling", "INT"},
{ "dtx-buffer", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_buffer,"Maxmium number of packets held in DTX buffer", "INT"},
{ "dtx-lag", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_lag, "Maxmium time span in milliseconds held in DTX buffer", "INT"},
{ "dtx-shift", 0,0, G_OPTION_ARG_INT, &rtpe_config.dtx_shift, "Length of time (in ms) to shift DTX buffer after over/underflow", "INT"},
{ "silence-detect",0,0, G_OPTION_ARG_DOUBLE, &silence_detect, "Audio level threshold in percent for silence detection","FLOAT"},
{ "cn-payload",0,0, G_OPTION_ARG_STRING_ARRAY,&cn_payload, "Comfort noise parameters to replace silence with","INT INT INT ..."},
{ "reorder-codecs",0,0, G_OPTION_ARG_NONE, &rtpe_config.reorder_codecs,"Reorder answer codecs based on sender preference",NULL},

@ -788,8 +788,7 @@ only. When enabled, delays processing of received packets for the specified
time (much like a jitter buffer) in order to trigger DTX handling when a
transmission gap occurs. The decoder is then instructed to fill in the missing
time during a transmission gap, for example by generating comfort noise. The
delay should be configured to just slightly more than the expected incoming
jitter.
delay should be configured to be higher than the expected incoming jitter.
=item B<--max-dtx=>I<INT>
@ -798,6 +797,26 @@ received within this time frame, then DTX processing will stop. Can be set to
zero or negative to disable and keep DTX processing on indefinitely. Defaults
to 30 seconds.
=item B<--dtx-buffer=>I<INT>
=item B<--dtx-lag=>I<INT>
These two options together control the maximum number of packets and amount of
audio that is allowed to be held in the DTX buffer. The B<dtx-buffer> option
limits the number of packets held in the DTX buffer, while the B<dtx-lag>
option limits the amount of audio (in milliseconds) to be held in the DTX
buffer. A DTX buffer overflow is declared when both limits are exceeded, in
which case DTX processing is sped up by B<dtx-shift> milliseconds.
The defaults are 10 packets and 100 milliseconds.
=item B<--dtx-shift=>I<INT>
Amount of time in milliseconds that DTX processing is shifted forward (sped up)
or backwards (delayed) in case of a DTX buffer overflow or underflow. An
underflow occurs when RTP packets are received slower than expected, while an
overflow occurs when packets are received faster than expected.
=item B<--silence-detect=>I<FLOAT>
Enable silence detection and specify threshold in percent. This option is

@ -111,6 +111,9 @@ struct rtpengine_config {
int http_threads;
int dtx_delay;
int max_dtx;
int dtx_buffer;
int dtx_lag;
int dtx_shift;
double silence_detect_double;
uint32_t silence_detect_int;
str cn_payload;

@ -10,3 +10,4 @@ ll(srtp, "SRTP encryption and decryption")
ll(internals, "Noisy low-level internals")
ll(http, "HTTP, HTTPS, Websockets")
ll(control, "Control protocols including SDP exchanges, CLI")
ll(dtx, "DTX timer/buffer")

Loading…
Cancel
Save