TT#136952 add new delay_buffer struct

Change-Id: I8b3b663d3688f1872b059791381fc8e2034c7293
pull/1430/head
Richard Fuchs 4 years ago
parent 85ec0faf9e
commit 8e8e78afb1

@ -113,6 +113,25 @@ struct dtx_packet {
struct transcode_packet *packet, struct media_packet *mp);
};
typedef int (*encoder_input_func_t)(encoder_t *enc, AVFrame *frame,
int (*callback)(encoder_t *, void *u1, void *u2), void *u1, void *u2);
struct delay_buffer {
struct codec_timer ct;
struct call *call;
struct codec_handler *handler;
mutex_t lock;
unsigned int delay;
GQueue frames; // in reverse order: newest packet first, oldest last
};
struct delay_frame {
AVFrame *frame;
struct media_packet mp;
encoder_input_func_t func;
struct codec_handler *handler;
struct codec_ssrc_handler *ch;
};
struct silence_event {
uint64_t start;
uint64_t end;
@ -214,6 +233,12 @@ static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *ch,
static void __dtx_shutdown(struct dtx_buffer *dtxb);
static struct codec_handler *__input_handler(struct codec_handler *h, struct media_packet *mp);
static void __delay_frame_process(struct delay_buffer *, struct delay_frame *dframe);
static void __delay_buffer_setup(struct delay_buffer **dbufp,
struct codec_handler *h, struct call *call, unsigned int delay);
static void __delay_buffer_shutdown(struct delay_buffer *dbuf, bool);
static void delay_buffer_stop(struct delay_buffer **pcmbp);
static struct codec_handler codec_handler_stub_ssrc = {
.source_pt.payload_type = -1,
@ -228,6 +253,10 @@ static struct codec_handler codec_handler_stub_ssrc = {
static void __handler_shutdown(struct codec_handler *handler) {
ssrc_hash_foreach(handler->ssrc_hash, __ssrc_handler_stop, NULL);
free_ssrc_hash(&handler->ssrc_hash);
if (handler->delay_buffer) {
__delay_buffer_shutdown(handler->delay_buffer, true);
delay_buffer_stop(&handler->delay_buffer);
}
if (handler->ssrc_handler)
obj_put(&handler->ssrc_handler->h);
@ -347,7 +376,7 @@ static void __make_transcoder(struct codec_handler *handler, struct rtp_payload_
STR_FMT(&dest->encoding_with_params),
dest->payload_type);
goto check_output;
goto no_handler_reset;
reset:
__handler_shutdown(handler);
@ -396,7 +425,8 @@ reset:
ssrc_hash_foreach(handler->media->monologue->ssrc_hash, __reset_sequencer, NULL);
check_output:;
no_handler_reset:
__delay_buffer_setup(&handler->delay_buffer, handler, handler->media->call, handler->media->buffer_delay);
// check if we have multiple decoders transcoding to the same output PT
struct codec_handler *output_handler = NULL;
if (output_transcoders)
@ -2082,6 +2112,59 @@ static int codec_decoder_event(enum codec_event event, void *ptr, void *data) {
return 0;
}
// must be locked
static void __delay_buffer_schedule(struct delay_buffer *dbuf) {
if (dbuf->ct.next.tv_sec) // already scheduled?
return;
struct delay_frame *dframe = g_queue_peek_tail(&dbuf->frames);
if (!dframe)
return;
struct timeval to_run = dframe->mp.tv;
timeval_add_usec(&to_run, dbuf->delay * 1000);
dbuf->ct.next = to_run;
timerthread_obj_schedule_abs(&dbuf->ct.tt_obj, &dbuf->ct.next);
}
static bool __buffer_delay_do_direct(struct delay_buffer *dbuf) {
if (!dbuf)
return true;
LOCK(&dbuf->lock);
if (dbuf->delay == 0 && dbuf->frames.length == 0)
return true;
return false;
}
static int delay_frame_cmp(const void *A, const void *B, void *ptr) {
const struct delay_frame *a = A, *b = B;
return -1 * timeval_cmp(&a->mp.tv, &b->mp.tv);
}
// consumes frame
static void __buffer_delay_frame(struct delay_buffer *dbuf, struct codec_ssrc_handler *ch,
encoder_input_func_t input_func, AVFrame *frame, struct media_packet *mp)
{
if (__buffer_delay_do_direct(dbuf)) {
// input now
input_func(ch->encoder, frame, ch->handler->packet_encoded, ch, mp);
av_frame_free(&frame);
return;
}
struct delay_frame *dframe = g_slice_alloc0(sizeof(*dframe));
dframe->frame = frame;
dframe->func = input_func;
dframe->ch = obj_get(&ch->h);
media_packet_copy(&dframe->mp, mp);
LOCK(&dbuf->lock);
g_queue_insert_sorted(&dbuf->frames, dframe, delay_frame_cmp, NULL);
__delay_buffer_schedule(dbuf);
}
// consumes `packet` if buffered (returns 1)
static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *decoder_handler,
struct codec_ssrc_handler *input_handler,
@ -2131,6 +2214,37 @@ static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *deco
return 1;
}
static void delay_frame_free(struct delay_frame *dframe) {
media_packet_release(&dframe->mp);
av_frame_free(&dframe->frame);
if (dframe->ch)
obj_put(&dframe->ch->h);
g_slice_free1(sizeof(*dframe), dframe);
}
static void delay_frame_send(struct delay_frame *dframe) {
// XXX this should be unified with other instances of the same code
struct sink_handler *sh = &dframe->mp.sink;
struct packet_stream *sink = sh->sink;
if (!sink)
media_socket_dequeue(&dframe->mp, NULL); // just free
else {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &dframe->mp))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
if (media_socket_dequeue(&dframe->mp, sink))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
}
static void delay_frame_flush(struct delay_buffer *dbuf, struct delay_frame *dframe) {
// call is locked in W here
__delay_frame_process(dbuf, dframe);
delay_frame_send(dframe);
delay_frame_free(dframe);
}
static void dtx_packet_free(struct dtx_packet *dtxp) {
if (dtxp->packet)
__transcode_packet_free(dtxp->packet);
@ -2141,9 +2255,88 @@ static void dtx_packet_free(struct dtx_packet *dtxp) {
obj_put(&dtxp->input_handler->h);
g_slice_free1(sizeof(*dtxp), dtxp);
}
static void delay_buffer_stop(struct delay_buffer **pcmbp) {
codec_timer_stop((struct codec_timer **) pcmbp);
}
static void dtx_buffer_stop(struct dtx_buffer **dtxbp) {
codec_timer_stop((struct codec_timer **) dtxbp);
}
static void __delay_frame_process(struct delay_buffer *dbuf, struct delay_frame *dframe) {
if (dframe->mp.rtp) {
// adjust output seq num. pushing packets into the delay buffer looks as if they
// were consumed and resulted in a decreased seq_diff. we need to adjust the
// perceived seq num forward to compensate for this. each entry in the buffer
// accounts for one packet that was only delayed and not consumed:
//LOCK(&dbuf->lock); // XXX needed?
dframe->mp.rtp->seq_num = htons(ntohs(dframe->mp.rtp->seq_num) + dbuf->frames.length + 1);
}
struct codec_ssrc_handler *csh = dframe->ch;
if (csh && csh->handler && csh->encoder) {
dframe->func(csh->encoder, dframe->frame, csh->handler->packet_encoded,
csh, &dframe->mp);
}
else
ilog(LOG_ERR | LOG_FLAG_LIMIT, "Delay buffer bug");
}
static void __delay_send_later(struct codec_timer *ct) {
struct delay_buffer *dbuf = (void *) ct;
struct call *call = NULL;
struct delay_frame *dframe = NULL;
{
// short-term lock - copy out references to all relevant objects
LOCK(&dbuf->lock);
call = dbuf->call;
if (call)
obj_get(call);
dframe = g_queue_pop_tail(&dbuf->frames);
}
if (!call) // do nothing
goto out;
// we can now do a top-down lock
rwlock_lock_r(&call->master_lock);
log_info_call(call);
if (!dframe)
goto out;
__ssrc_lock_both(&dframe->mp);
__delay_frame_process(dbuf, dframe);
__ssrc_unlock_both(&dframe->mp);
delay_frame_send(dframe);
{
// schedule next run
LOCK(&dbuf->lock);
dbuf->ct.next.tv_sec = 0;
__delay_buffer_schedule(dbuf);
}
out:
// release all references
if (call) {
rwlock_unlock_r(&call->master_lock);
obj_put(call);
}
if (dframe)
delay_frame_free(dframe);
log_info_clear();
}
static bool __dtx_drift_shift(struct dtx_buffer *dtxb, struct dtx_packet *dtxp, unsigned long ts,
long tv_diff, long ts_diff,
struct codec_ssrc_handler *ch)
@ -2465,12 +2658,30 @@ static void __dtx_shutdown(struct dtx_buffer *dtxb) {
dtxb->call = NULL;
g_queue_clear_full(&dtxb->packets, (GDestroyNotify) dtx_packet_free);
}
static void __delay_buffer_shutdown(struct delay_buffer *dbuf, bool flush) {
if (flush) {
while (dbuf->frames.length) {
struct delay_frame *dframe = g_queue_pop_tail(&dbuf->frames);
delay_frame_flush(dbuf, dframe);
}
}
else
g_queue_clear_full(&dbuf->frames, (GDestroyNotify) delay_frame_free);
if (dbuf->call)
obj_put(dbuf->call);
dbuf->call = NULL;
}
static void __dtx_free(void *p) {
struct dtx_buffer *dtxb = p;
__dtx_shutdown(dtxb);
media_packet_release(&dtxb->last_mp);
mutex_destroy(&dtxb->lock);
}
static void __delay_buffer_free(void *p) {
struct delay_buffer *dbuf = p;
__delay_buffer_shutdown(dbuf, false);
mutex_destroy(&dbuf->lock);
}
static void __dtx_setup(struct codec_ssrc_handler *ch) {
if (!decoder_has_dtx(ch->decoder) || ch->dtx_buffer)
return;
@ -2495,6 +2706,30 @@ static void __dtx_setup(struct codec_ssrc_handler *ch) {
dtx->clockrate = ch->handler->source_pt.clock_rate;
dtx->tspp = dtx->ptime * dtx->clockrate / 1000;
}
static void __delay_buffer_setup(struct delay_buffer **dbufp,
struct codec_handler *h, struct call *call, unsigned int delay)
{
if (!dbufp)
return;
struct delay_buffer *dbuf = *dbufp;
if (dbuf) {
// update options
dbuf->delay = delay;
return;
}
if (!delay)
return;
dbuf = obj_alloc0("delay_buffer", sizeof(*dbuf), __delay_buffer_free);
dbuf->ct.tt_obj.tt = &codec_timers_thread;
dbuf->ct.func = __delay_send_later;
dbuf->handler = h;
dbuf->call = obj_get(call);
dbuf->delay = delay;
mutex_init(&dbuf->lock);
}
static void __ssrc_handler_stop(void *p, void *dummy) {
struct codec_ssrc_handler *ch = p;
if (ch->dtx_buffer) {
@ -2508,6 +2743,14 @@ static void __ssrc_handler_stop(void *p, void *dummy) {
void codec_handlers_stop(GQueue *q) {
for (GList *l = q->head; l; l = l->next) {
struct codec_handler *h = l->data;
if (h->delay_buffer) {
mutex_lock(&h->delay_buffer->lock);
__delay_buffer_shutdown(h->delay_buffer, true);
mutex_unlock(&h->delay_buffer->lock);
delay_buffer_stop(&h->delay_buffer);
}
ssrc_hash_foreach(h->ssrc_hash, __ssrc_handler_stop, NULL);
}
}
@ -2841,8 +3084,7 @@ static void __dtmf_detect(struct codec_ssrc_handler *ch, AVFrame *frame) {
}
static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, void *u2,
int (*input_func)(encoder_t *enc, AVFrame *frame,
int (*callback)(encoder_t *, void *u1, void *u2), void *u1, void *u2))
encoder_input_func_t input_func)
{
struct codec_ssrc_handler *ch = u1;
struct media_packet *mp = u2;
@ -2892,7 +3134,8 @@ static int packet_decoded_common(decoder_t *decoder, AVFrame *frame, void *u1, v
if (mp->media_out)
ch->encoder->codec_options.amr.cmr = mp->media_out->u.amr.cmr;
input_func(ch->encoder, frame, h->packet_encoded, ch, mp);
__buffer_delay_frame(h->delay_buffer, ch, input_func, frame, mp);
frame = NULL; // consumed
discard:
av_frame_free(&frame);

@ -384,9 +384,10 @@ struct call_media {
struct t38_gateway *t38_gateway;
struct codec_handler *t38_handler;
unsigned int buffer_delay;
mutex_t dtmf_lock;
unsigned long dtmf_ts; // TS of last processed end event
#ifdef WITH_TRANSCODING
union {
struct {

@ -26,6 +26,7 @@ struct mqtt_timer;
struct call;
struct codec_store;
struct call_monologue;
struct delay_buffer;
typedef int codec_handler_func(struct codec_handler *, struct media_packet *);
@ -56,6 +57,7 @@ struct codec_handler {
struct codec_ssrc_handler *ssrc_handler;
// for DTMF injection
struct codec_handler *dtmf_injector;
struct delay_buffer *delay_buffer;
// stats entry
char *stats_chain;

Loading…
Cancel
Save