From 05b6da43f10038a8effe4e025e05ebfe85f8bb03 Mon Sep 17 00:00:00 2001 From: root Date: Sun, 10 May 2020 13:37:45 +0000 Subject: [PATCH] JB marker fix --- daemon/jitter_buffer.c | 155 +++++++++++++++++++++++++++------------- daemon/timerthread.c | 20 ++++++ include/jitter_buffer.h | 4 +- include/timerthread.h | 1 + 4 files changed, 130 insertions(+), 50 deletions(-) diff --git a/daemon/jitter_buffer.c b/daemon/jitter_buffer.c index 8efda0ad1..75fa3e5cc 100644 --- a/daemon/jitter_buffer.c +++ b/daemon/jitter_buffer.c @@ -10,8 +10,9 @@ #define INITIAL_PACKETS 0x1E #define CONT_SEQ_COUNT 0x1F4 #define CONT_MISS_COUNT 0x0A -#define CLOCK_DRIFT_MULT 0x14 -#define ONE_MS 0x64 +#define CLOCK_DRIFT_MULT 0x28 +#define DELAY_FACTOR 0x64 +#define COMFORT_NOISE 0x0D static struct timerthread jitter_buffer_thread; @@ -22,6 +23,13 @@ void jitter_buffer_init(void) { timerthread_init(&jitter_buffer_thread, timerthread_queue_run); } +static void jitter_buffer_flush(struct jitter_buffer *jb) { + mutex_unlock(&jb->lock); + timerthread_queue_flush_data(&jb->ttq); + mutex_lock(&jb->lock); +} + + // jb is locked static void reset_jitter_buffer(struct jitter_buffer *jb) { ilog(LOG_INFO, "reset_jitter_buffer"); @@ -37,14 +45,16 @@ static void reset_jitter_buffer(struct jitter_buffer *jb) { jb->next_exp_seq = 0; jb->clock_rate = 0; jb->payload_type = 0; - jb->drift_mult_factor = 0; - jb->buf_decremented = 0; jb->clock_drift_val = 0; + jb->prev_seq_ts = rtpe_now; + jb->prev_seq = 0; jb->num_resets++; + if(g_tree_nnodes(jb->ttq.entries) > 0) + jitter_buffer_flush(jb); //disable jitter buffer in case of more than 2 resets - if(jb->num_resets > 2 && jb->call) + if(jb->num_resets >= 2) jb->disabled = 1; } @@ -108,7 +118,7 @@ static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { // jb is locked static void check_buffered_packets(struct jitter_buffer *jb) { - if (g_tree_nnodes(jb->ttq.entries) >= (2* rtpe_config.jb_length)) { + if (g_tree_nnodes(jb->ttq.entries) >= (3* rtpe_config.jb_length)) { ilog(LOG_DEBUG, "Jitter reset due to buffer overflow"); reset_jitter_buffer(jb); } @@ -120,6 +130,7 @@ static int queue_packet(struct media_packet *mp, struct jb_packet *p) { unsigned long ts = ntohl(mp->rtp->timestamp); int payload_type = (mp->rtp->m_pt & 0x7f); int clockrate = get_clock_rate(mp, payload_type); + int curr_seq = ntohs(mp->rtp->seq_num); if(!clockrate || !jb->first_send.tv_sec) { ilog(LOG_DEBUG, "Jitter reset due to clockrate"); @@ -127,21 +138,23 @@ static int queue_packet(struct media_packet *mp, struct jb_packet *p) { return 1; } long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; - int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; + int seq_diff = curr_seq - jb->first_seq; + if(seq_diff < 0) { + jb->first_send.tv_sec = 0; + return 1; + } + if(!jb->rtptime_delta && seq_diff) { jb->rtptime_delta = ts_diff/seq_diff; } + p->ttq_entry.when = jb->first_send; long long ts_diff_us = (long long) (ts_diff + (jb->rtptime_delta * jb->buffer_len))* 1000000 / clockrate; ts_diff_us += (jb->clock_drift_val * seq_diff); - ts_diff_us += (jb->dtmf_mult_factor * ONE_MS); + ts_diff_us += (jb->dtmf_mult_factor * DELAY_FACTOR); - if(jb->buf_decremented) { - ts_diff_us += 5000; //add 5ms delta when 2 packets are scheduled around same time - jb->buf_decremented = 0; - } timeval_add_usec(&p->ttq_entry.when, ts_diff_us); ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); @@ -149,29 +162,43 @@ static int queue_packet(struct media_packet *mp, struct jb_packet *p) { if (ts_diff_us > 1000000) { // more than one second, can't be right ilog(LOG_DEBUG, "Partial reset due to timestamp"); jb->first_send.tv_sec = 0; - return 1; + p->ttq_entry.when = rtpe_now; } + if(jb->prev_seq_ts.tv_sec == 0) + jb->prev_seq_ts = rtpe_now; + + if((timeval_diff(&p->ttq_entry.when, &jb->prev_seq_ts) < 0) && (curr_seq > jb->prev_seq)) { + p->ttq_entry.when = jb->prev_seq_ts; + timeval_add_usec(&p->ttq_entry.when, DELAY_FACTOR); + } + + if(timeval_diff(&p->ttq_entry.when, &jb->prev_seq_ts) > 0) { + jb->prev_seq_ts = p->ttq_entry.when; + jb->prev_seq = curr_seq; + } + + if(seq_diff > 3000) //readjust after 3k packets + jb->first_send.tv_sec = 0; + timerthread_queue_push(&jb->ttq, &p->ttq_entry); return 0; } -static void handle_clock_drift(struct media_packet *mp) { +static int handle_clock_drift(struct media_packet *mp) { ilog(LOG_DEBUG, "handle_clock_drift"); struct jitter_buffer *jb = mp->stream->jb; int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; - int mult_factor = pow(2, jb->drift_mult_factor); - - if(seq_diff < (mult_factor * CLOCK_DRIFT_MULT)) - return; + if((seq_diff % CLOCK_DRIFT_MULT) != 0) + return 0; unsigned long ts = ntohl(mp->rtp->timestamp); int payload_type = (mp->rtp->m_pt & 0x7f); int clockrate = get_clock_rate(mp, payload_type); if(!clockrate) { - return; + return 0; } long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; long long ts_diff_us = @@ -181,7 +208,13 @@ static void handle_clock_drift(struct media_packet *mp) { long long time_diff = timeval_diff(&rtpe_now, &to_send); jb->clock_drift_val = time_diff/seq_diff; - jb->drift_mult_factor++; + if(jb->clock_drift_val < -10000 || jb->clock_drift_val > 10000) { //disable jb if clock drift greater than 10 ms + jb->disabled = 1; + jitter_buffer_flush(jb); + ilog(LOG_DEBUG, "JB disabled due to clock drift"); + return 1; + } + return 0; } int buffer_packet(struct media_packet *mp, const str *s) { @@ -199,6 +232,11 @@ int buffer_packet(struct media_packet *mp, const str *s) { if (!jb || jb->disabled || PS_ISSET(mp->sfd->stream, RTCP)) goto end; + if(jb->initial_pkts < INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any + jb->initial_pkts++; + goto end; + } + p = get_jb_packet(mp, s); if (!p) goto end; @@ -208,26 +246,40 @@ int buffer_packet(struct media_packet *mp, const str *s) { mp = &p->mp; + mutex_lock(&jb->lock); + int payload_type = (mp->rtp->m_pt & 0x7f); + int seq = ntohs(mp->rtp->seq_num); + int marker = (mp->rtp->m_pt & 0x80) ? 1 : 0; + int dtmf = 0; + const struct rtp_payload_type *rtp_pt = get_rtp_payload_type(mp, payload_type); + if(rtp_pt) { + if(rtp_pt->codec_def && rtp_pt->codec_def->dtmf) + dtmf = 1; + } - mutex_lock(&jb->lock); + if(marker || (jb->ssrc != ntohl(mp->rtp->ssrc)) || seq == 0 ) { //marker or ssrc change or sequence wrap + jb->first_send.tv_sec = 0; + } - if((jb->clock_rate && jb->payload_type != payload_type) || - (jb->first_send.tv_sec && jb->ssrc != ntohl(mp->rtp->ssrc))) { //reset in case of payload change or ssrc change - const struct rtp_payload_type *rtp_pt = get_rtp_payload_type(mp, payload_type); - if(rtp_pt) { - if(rtp_pt->codec_def && !rtp_pt->codec_def->dtmf) + if(jb->clock_rate && jb->payload_type != payload_type) { //reset in case of payload change + if(!dtmf) jb->first_send.tv_sec = 0; - if(rtp_pt->codec_def->dtmf) - jb->dtmf_mult_factor++; else - jb->dtmf_mult_factor=0; - } + jb->dtmf_mult_factor++; + } + + if(!dtmf && jb->dtmf_mult_factor) { //reset after DTMF ends + jb->first_send.tv_sec = 0; + jb->dtmf_mult_factor=0; } + if (jb->first_send.tv_sec) { - if(rtpe_config.jb_clock_drift) - handle_clock_drift(mp); + if(rtpe_config.jb_clock_drift) { + if(handle_clock_drift(mp)) + goto end_unlock; + } ret = queue_packet(mp,p); } else { @@ -236,22 +288,20 @@ int buffer_packet(struct media_packet *mp, const str *s) { int payload_type = (mp->rtp->m_pt & 0x7f); int clockrate = get_clock_rate(mp, payload_type); if(!clockrate){ - jb->initial_pkts++; - if(jb->initial_pkts > INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any + if(jb->rtptime_delta && payload_type != COMFORT_NOISE) { //ignore CN + ilog(LOG_INFO, "Jitter reset due to unknown payload = %d", payload_type); reset_jitter_buffer(jb); } goto end_unlock; } - p->ttq_entry.when = jb->first_send = rtpe_now; jb->first_send_ts = ts; jb->first_seq = ntohs(mp->rtp->seq_num); jb->ssrc = ntohl(mp->rtp->ssrc); - if(jb->buffer_len > 0) + if(jb->rtptime_delta) ret = queue_packet(mp,p); - jb->rtptime_delta = 0; - jb->next_exp_seq = 0; - jb->drift_mult_factor = 0; + if(!dtmf) + jb->rtptime_delta = 0; } // packet consumed? @@ -276,10 +326,8 @@ static void increment_buffer(struct jitter_buffer *jb) { } static void decrement_buffer(struct jitter_buffer *jb) { - if(jb->buffer_len > 0) { + if(jb->buffer_len > 0) jb->buffer_len--; - jb->buf_decremented = 1; - } } static void set_jitter_values(struct media_packet *mp) { @@ -287,16 +335,28 @@ static void set_jitter_values(struct media_packet *mp) { if(!jb || !mp->rtp) return; int curr_seq = ntohs(mp->rtp->seq_num); - if(jb->next_exp_seq) { + int payload_type = (mp->rtp->m_pt & 0x7f); + int dtmf = 0; + const struct rtp_payload_type *rtp_pt = get_rtp_payload_type(mp, payload_type); + if(rtp_pt) { + if(rtp_pt->codec_def && rtp_pt->codec_def->dtmf) + dtmf = 1; + } + if(jb->next_exp_seq && !dtmf) { mutex_lock(&jb->lock); if(curr_seq > jb->next_exp_seq) { - ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq); - increment_buffer(jb); - jb->cont_frames = 0; - jb->cont_miss++; + int marker = (mp->rtp->m_pt & 0x80) ? 1 : 0; + if(!marker) { + ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq); + increment_buffer(jb); + jb->cont_frames = 0; + jb->cont_miss++; + } } else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed jb->cont_frames = 0; + if((curr_seq == 0) || (jb->next_exp_seq - curr_seq) > 65500) //sequence wrap + jb->next_exp_seq = 0; } else { jb->cont_frames++; @@ -304,7 +364,6 @@ static void set_jitter_values(struct media_packet *mp) { if(jb->cont_frames >= CONT_SEQ_COUNT) { decrement_buffer(jb); jb->cont_frames = 0; - ilog(LOG_DEBUG, "Received continous frames Buffer len=%d", jb->buffer_len); } } diff --git a/daemon/timerthread.c b/daemon/timerthread.c index 2e57833ad..473b15daa 100644 --- a/daemon/timerthread.c +++ b/daemon/timerthread.c @@ -235,3 +235,23 @@ unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) { return num; } + +void timerthread_queue_flush_data(void *ptr) { + struct timerthread_queue *ttq = ptr; + + ilog(LOG_DEBUG, "timerthread_queue_flush_data"); + + mutex_lock(&ttq->lock); + while (g_tree_nnodes(ttq->entries)) { + struct timerthread_queue_entry *ttqe = g_tree_find_first(ttq->entries, NULL, NULL); + assert(ttqe != NULL); + g_tree_remove(ttq->entries, ttqe); + + mutex_unlock(&ttq->lock); + + ttq->run_later_func(ttq, ttqe); + + mutex_lock(&ttq->lock); + } + mutex_unlock(&ttq->lock); +} diff --git a/include/jitter_buffer.h b/include/jitter_buffer.h index 9fe3260c9..d715941ac 100644 --- a/include/jitter_buffer.h +++ b/include/jitter_buffer.h @@ -22,7 +22,9 @@ struct jitter_buffer { mutex_t lock; unsigned long first_send_ts; struct timeval first_send; + struct timeval prev_seq_ts; unsigned int first_seq; + unsigned int prev_seq; unsigned int rtptime_delta; unsigned int next_exp_seq; unsigned int cont_frames; @@ -31,12 +33,10 @@ struct jitter_buffer { unsigned int payload_type; unsigned int num_resets; unsigned int initial_pkts; - unsigned int drift_mult_factor; unsigned int ssrc; unsigned int dtmf_mult_factor; int buffer_len; int clock_drift_val; - int buf_decremented; struct call *call; int disabled; }; diff --git a/include/timerthread.h b/include/timerthread.h index 708ccc54c..9f344ce63 100644 --- a/include/timerthread.h +++ b/include/timerthread.h @@ -55,6 +55,7 @@ void *timerthread_queue_new(const char *type, size_t size, void (*free_func)(void *), void (*entry_free_func)(void *)); void timerthread_queue_run(void *ptr); +void timerthread_queue_flush_data(void *ptr); void timerthread_queue_push(struct timerthread_queue *, struct timerthread_queue_entry *); unsigned int timerthread_queue_flush(struct timerthread_queue *, void *);