diff --git a/daemon/codec.c b/daemon/codec.c index 74300db15..f5973ec7f 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -113,6 +113,25 @@ struct dtx_packet { struct transcode_packet *packet, struct media_packet *mp); }; +typedef int (*encoder_input_func_t)(encoder_t *enc, AVFrame *frame, + int (*callback)(encoder_t *, void *u1, void *u2), void *u1, void *u2); + +struct delay_buffer { + struct codec_timer ct; + struct call *call; + struct codec_handler *handler; + mutex_t lock; + unsigned int delay; + GQueue frames; // in reverse order: newest packet first, oldest last +}; +struct delay_frame { + AVFrame *frame; + struct media_packet mp; + encoder_input_func_t func; + struct codec_handler *handler; + struct codec_ssrc_handler *ch; +}; + struct silence_event { uint64_t start; uint64_t end; @@ -214,6 +233,12 @@ static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *ch, static void __dtx_shutdown(struct dtx_buffer *dtxb); static struct codec_handler *__input_handler(struct codec_handler *h, struct media_packet *mp); +static void __delay_frame_process(struct delay_buffer *, struct delay_frame *dframe); +static void __delay_buffer_setup(struct delay_buffer **dbufp, + struct codec_handler *h, struct call *call, unsigned int delay); +static void __delay_buffer_shutdown(struct delay_buffer *dbuf, bool); +static void delay_buffer_stop(struct delay_buffer **pcmbp); + static struct codec_handler codec_handler_stub_ssrc = { .source_pt.payload_type = -1, @@ -228,6 +253,10 @@ static struct codec_handler codec_handler_stub_ssrc = { static void __handler_shutdown(struct codec_handler *handler) { ssrc_hash_foreach(handler->ssrc_hash, __ssrc_handler_stop, NULL); free_ssrc_hash(&handler->ssrc_hash); + if (handler->delay_buffer) { + __delay_buffer_shutdown(handler->delay_buffer, true); + delay_buffer_stop(&handler->delay_buffer); + } if (handler->ssrc_handler) obj_put(&handler->ssrc_handler->h); @@ -347,7 +376,7 @@ static void __make_transcoder(struct codec_handler *handler, struct rtp_payload_ STR_FMT(&dest->encoding_with_params), dest->payload_type); - goto check_output; + goto no_handler_reset; reset: __handler_shutdown(handler); @@ -396,7 +425,8 @@ reset: ssrc_hash_foreach(handler->media->monologue->ssrc_hash, __reset_sequencer, NULL); -check_output:; +no_handler_reset: + __delay_buffer_setup(&handler->delay_buffer, handler, handler->media->call, handler->media->buffer_delay); // check if we have multiple decoders transcoding to the same output PT struct codec_handler *output_handler = NULL; if (output_transcoders) @@ -2082,6 +2112,59 @@ static int codec_decoder_event(enum codec_event event, void *ptr, void *data) { return 0; } +// must be locked +static void __delay_buffer_schedule(struct delay_buffer *dbuf) { + if (dbuf->ct.next.tv_sec) // already scheduled? + return; + + struct delay_frame *dframe = g_queue_peek_tail(&dbuf->frames); + if (!dframe) + return; + + struct timeval to_run = dframe->mp.tv; + timeval_add_usec(&to_run, dbuf->delay * 1000); + dbuf->ct.next = to_run; + timerthread_obj_schedule_abs(&dbuf->ct.tt_obj, &dbuf->ct.next); +} + +static bool __buffer_delay_do_direct(struct delay_buffer *dbuf) { + if (!dbuf) + return true; + LOCK(&dbuf->lock); + if (dbuf->delay == 0 && dbuf->frames.length == 0) + return true; + return false; +} + +static int delay_frame_cmp(const void *A, const void *B, void *ptr) { + const struct delay_frame *a = A, *b = B; + return -1 * timeval_cmp(&a->mp.tv, &b->mp.tv); +} + +// consumes frame +static void __buffer_delay_frame(struct delay_buffer *dbuf, struct codec_ssrc_handler *ch, + encoder_input_func_t input_func, AVFrame *frame, struct media_packet *mp) +{ + if (__buffer_delay_do_direct(dbuf)) { + // input now + input_func(ch->encoder, frame, ch->handler->packet_encoded, ch, mp); + av_frame_free(&frame); + return; + } + + struct delay_frame *dframe = g_slice_alloc0(sizeof(*dframe)); + dframe->frame = frame; + dframe->func = input_func; + dframe->ch = obj_get(&ch->h); + media_packet_copy(&dframe->mp, mp); + + LOCK(&dbuf->lock); + g_queue_insert_sorted(&dbuf->frames, dframe, delay_frame_cmp, NULL); + + __delay_buffer_schedule(dbuf); + +} + // consumes `packet` if buffered (returns 1) static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *decoder_handler, struct codec_ssrc_handler *input_handler, @@ -2131,6 +2214,37 @@ static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *deco return 1; } +static void delay_frame_free(struct delay_frame *dframe) { + media_packet_release(&dframe->mp); + av_frame_free(&dframe->frame); + if (dframe->ch) + obj_put(&dframe->ch->h); + g_slice_free1(sizeof(*dframe), dframe); +} +static void delay_frame_send(struct delay_frame *dframe) { + // XXX this should be unified with other instances of the same code + struct sink_handler *sh = &dframe->mp.sink; + struct packet_stream *sink = sh->sink; + + if (!sink) + media_socket_dequeue(&dframe->mp, NULL); // just free + else { + if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &dframe->mp)) + ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media"); + + mutex_lock(&sink->out_lock); + if (media_socket_dequeue(&dframe->mp, sink)) + ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, + "Error sending buffered media to RTP sink"); + mutex_unlock(&sink->out_lock); + } +} +static void delay_frame_flush(struct delay_buffer *dbuf, struct delay_frame *dframe) { + // call is locked in W here + __delay_frame_process(dbuf, dframe); + delay_frame_send(dframe); + delay_frame_free(dframe); +} static void dtx_packet_free(struct dtx_packet *dtxp) { if (dtxp->packet) __transcode_packet_free(dtxp->packet); @@ -2141,9 +2255,88 @@ static void dtx_packet_free(struct dtx_packet *dtxp) { obj_put(&dtxp->input_handler->h); g_slice_free1(sizeof(*dtxp), dtxp); } +static void delay_buffer_stop(struct delay_buffer **pcmbp) { + codec_timer_stop((struct codec_timer **) pcmbp); +} static void dtx_buffer_stop(struct dtx_buffer **dtxbp) { codec_timer_stop((struct codec_timer **) dtxbp); } + + +static void __delay_frame_process(struct delay_buffer *dbuf, struct delay_frame *dframe) { + if (dframe->mp.rtp) { + // adjust output seq num. pushing packets into the delay buffer looks as if they + // were consumed and resulted in a decreased seq_diff. we need to adjust the + // perceived seq num forward to compensate for this. each entry in the buffer + // accounts for one packet that was only delayed and not consumed: + //LOCK(&dbuf->lock); // XXX needed? + dframe->mp.rtp->seq_num = htons(ntohs(dframe->mp.rtp->seq_num) + dbuf->frames.length + 1); + } + + struct codec_ssrc_handler *csh = dframe->ch; + + if (csh && csh->handler && csh->encoder) { + dframe->func(csh->encoder, dframe->frame, csh->handler->packet_encoded, + csh, &dframe->mp); + } + else + ilog(LOG_ERR | LOG_FLAG_LIMIT, "Delay buffer bug"); +} +static void __delay_send_later(struct codec_timer *ct) { + struct delay_buffer *dbuf = (void *) ct; + + struct call *call = NULL; + struct delay_frame *dframe = NULL; + + { + // short-term lock - copy out references to all relevant objects + LOCK(&dbuf->lock); + + call = dbuf->call; + if (call) + obj_get(call); + + dframe = g_queue_pop_tail(&dbuf->frames); + } + + if (!call) // do nothing + goto out; + + // we can now do a top-down lock + rwlock_lock_r(&call->master_lock); + log_info_call(call); + + if (!dframe) + goto out; + + __ssrc_lock_both(&dframe->mp); + + __delay_frame_process(dbuf, dframe); + + __ssrc_unlock_both(&dframe->mp); + + delay_frame_send(dframe); + + { + // schedule next run + LOCK(&dbuf->lock); + dbuf->ct.next.tv_sec = 0; + __delay_buffer_schedule(dbuf); + } + +out: + // release all references + if (call) { + rwlock_unlock_r(&call->master_lock); + obj_put(call); + } + if (dframe) + delay_frame_free(dframe); + + log_info_clear(); +} + + static bool __dtx_drift_shift(struct dtx_buffer *dtxb, struct dtx_packet *dtxp, unsigned long ts, long tv_diff, long ts_diff, struct codec_ssrc_handler *ch) @@ -2465,12 +2658,30 @@ static void __dtx_shutdown(struct dtx_buffer *dtxb) { dtxb->call = NULL; g_queue_clear_full(&dtxb->packets, (GDestroyNotify) dtx_packet_free); } +static void __delay_buffer_shutdown(struct delay_buffer *dbuf, bool flush) { + if (flush) { + while (dbuf->frames.length) { + struct delay_frame *dframe = g_queue_pop_tail(&dbuf->frames); + delay_frame_flush(dbuf, dframe); + } + } + else + g_queue_clear_full(&dbuf->frames, (GDestroyNotify) delay_frame_free); + if (dbuf->call) + obj_put(dbuf->call); + dbuf->call = NULL; +} static void __dtx_free(void *p) { struct dtx_buffer *dtxb = p; __dtx_shutdown(dtxb); media_packet_release(&dtxb->last_mp); mutex_destroy(&dtxb->lock); } +static void __delay_buffer_free(void *p) { + struct delay_buffer *dbuf = p; + __delay_buffer_shutdown(dbuf, false); + mutex_destroy(&dbuf->lock); +} static void __dtx_setup(struct codec_ssrc_handler *ch) { if (!decoder_has_dtx(ch->decoder) || ch->dtx_buffer) return; @@ -2495,6 +2706,30 @@ static void __dtx_setup(struct codec_ssrc_handler *ch) { dtx->clockrate = ch->handler->source_pt.clock_rate; dtx->tspp = dtx->ptime * dtx->clockrate / 1000; } +static void __delay_buffer_setup(struct delay_buffer **dbufp, + struct codec_handler *h, struct call *call, unsigned int delay) +{ + if (!dbufp) + return; + + struct delay_buffer *dbuf = *dbufp; + + if (dbuf) { + // update options + dbuf->delay = delay; + return; + } + if (!delay) + return; + + dbuf = obj_alloc0("delay_buffer", sizeof(*dbuf), __delay_buffer_free); + dbuf->ct.tt_obj.tt = &codec_timers_thread; + dbuf->ct.func = __delay_send_later; + dbuf->handler = h; + dbuf->call = obj_get(call); + dbuf->delay = delay; + mutex_init(&dbuf->lock); +} static void __ssrc_handler_stop(void *p, void *dummy) { struct codec_ssrc_handler *ch = p; if (ch->dtx_buffer) { @@ -2508,6 +2743,14 @@ static void __ssrc_handler_stop(void *p, void *dummy) { void codec_handlers_stop(GQueue *q) { for (GList *l = q->head; l; l = l->next) { struct codec_handler *h = l->data; + + if (h->delay_buffer) { + mutex_lock(&h->delay_buffer->lock); + __delay_buffer_shutdown(h->delay_buffer, true); + mutex_unlock(&h->delay_buffer->lock); + + delay_buffer_stop(&h->delay_buffer); + } ssrc_hash_foreach(h->ssrc_hash, __ssrc_handler_stop, NULL); } } @@ -2841,8 +3084,7 @@ static void __dtmf_detect(struct codec_ssrc_handler *ch, AVFrame *frame) { } static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, void *u2, - int (*input_func)(encoder_t *enc, AVFrame *frame, - int (*callback)(encoder_t *, void *u1, void *u2), void *u1, void *u2)) + encoder_input_func_t input_func) { struct codec_ssrc_handler *ch = u1; struct media_packet *mp = u2; @@ -2892,7 +3134,8 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v if (mp->media_out) ch->encoder->codec_options.amr.cmr = mp->media_out->u.amr.cmr; - input_func(ch->encoder, frame, h->packet_encoded, ch, mp); + __buffer_delay_frame(h->delay_buffer, ch, input_func, frame, mp); + frame = NULL; // consumed discard: av_frame_free(&frame); diff --git a/include/call.h b/include/call.h index 73748aa37..67cacec72 100644 --- a/include/call.h +++ b/include/call.h @@ -384,9 +384,10 @@ struct call_media { struct t38_gateway *t38_gateway; struct codec_handler *t38_handler; + unsigned int buffer_delay; + mutex_t dtmf_lock; unsigned long dtmf_ts; // TS of last processed end event - #ifdef WITH_TRANSCODING union { struct { diff --git a/include/codec.h b/include/codec.h index dba803729..6feb27a32 100644 --- a/include/codec.h +++ b/include/codec.h @@ -26,6 +26,7 @@ struct mqtt_timer; struct call; struct codec_store; struct call_monologue; +struct delay_buffer; typedef int codec_handler_func(struct codec_handler *, struct media_packet *); @@ -56,6 +57,7 @@ struct codec_handler { struct codec_ssrc_handler *ssrc_handler; // for DTMF injection struct codec_handler *dtmf_injector; + struct delay_buffer *delay_buffer; // stats entry char *stats_chain;