JB marker fix

pull/988/head
root 5 years ago
parent e1b2532fe8
commit 05b6da43f1

@ -10,8 +10,9 @@
#define INITIAL_PACKETS 0x1E
#define CONT_SEQ_COUNT 0x1F4
#define CONT_MISS_COUNT 0x0A
#define CLOCK_DRIFT_MULT 0x14
#define ONE_MS 0x64
#define CLOCK_DRIFT_MULT 0x28
#define DELAY_FACTOR 0x64
#define COMFORT_NOISE 0x0D
static struct timerthread jitter_buffer_thread;
@ -22,6 +23,13 @@ void jitter_buffer_init(void) {
timerthread_init(&jitter_buffer_thread, timerthread_queue_run);
}
static void jitter_buffer_flush(struct jitter_buffer *jb) {
mutex_unlock(&jb->lock);
timerthread_queue_flush_data(&jb->ttq);
mutex_lock(&jb->lock);
}
// jb is locked
static void reset_jitter_buffer(struct jitter_buffer *jb) {
ilog(LOG_INFO, "reset_jitter_buffer");
@ -37,14 +45,16 @@ static void reset_jitter_buffer(struct jitter_buffer *jb) {
jb->next_exp_seq = 0;
jb->clock_rate = 0;
jb->payload_type = 0;
jb->drift_mult_factor = 0;
jb->buf_decremented = 0;
jb->clock_drift_val = 0;
jb->prev_seq_ts = rtpe_now;
jb->prev_seq = 0;
jb->num_resets++;
if(g_tree_nnodes(jb->ttq.entries) > 0)
jitter_buffer_flush(jb);
//disable jitter buffer in case of more than 2 resets
if(jb->num_resets > 2 && jb->call)
if(jb->num_resets >= 2)
jb->disabled = 1;
}
@ -108,7 +118,7 @@ static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) {
// jb is locked
static void check_buffered_packets(struct jitter_buffer *jb) {
if (g_tree_nnodes(jb->ttq.entries) >= (2* rtpe_config.jb_length)) {
if (g_tree_nnodes(jb->ttq.entries) >= (3* rtpe_config.jb_length)) {
ilog(LOG_DEBUG, "Jitter reset due to buffer overflow");
reset_jitter_buffer(jb);
}
@ -120,6 +130,7 @@ static int queue_packet(struct media_packet *mp, struct jb_packet *p) {
unsigned long ts = ntohl(mp->rtp->timestamp);
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
int curr_seq = ntohs(mp->rtp->seq_num);
if(!clockrate || !jb->first_send.tv_sec) {
ilog(LOG_DEBUG, "Jitter reset due to clockrate");
@ -127,21 +138,23 @@ static int queue_packet(struct media_packet *mp, struct jb_packet *p) {
return 1;
}
long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts;
int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq;
int seq_diff = curr_seq - jb->first_seq;
if(seq_diff < 0) {
jb->first_send.tv_sec = 0;
return 1;
}
if(!jb->rtptime_delta && seq_diff) {
jb->rtptime_delta = ts_diff/seq_diff;
}
p->ttq_entry.when = jb->first_send;
long long ts_diff_us =
(long long) (ts_diff + (jb->rtptime_delta * jb->buffer_len))* 1000000 / clockrate;
ts_diff_us += (jb->clock_drift_val * seq_diff);
ts_diff_us += (jb->dtmf_mult_factor * ONE_MS);
ts_diff_us += (jb->dtmf_mult_factor * DELAY_FACTOR);
if(jb->buf_decremented) {
ts_diff_us += 5000; //add 5ms delta when 2 packets are scheduled around same time
jb->buf_decremented = 0;
}
timeval_add_usec(&p->ttq_entry.when, ts_diff_us);
ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now);
@ -149,29 +162,43 @@ static int queue_packet(struct media_packet *mp, struct jb_packet *p) {
if (ts_diff_us > 1000000) { // more than one second, can't be right
ilog(LOG_DEBUG, "Partial reset due to timestamp");
jb->first_send.tv_sec = 0;
return 1;
p->ttq_entry.when = rtpe_now;
}
if(jb->prev_seq_ts.tv_sec == 0)
jb->prev_seq_ts = rtpe_now;
if((timeval_diff(&p->ttq_entry.when, &jb->prev_seq_ts) < 0) && (curr_seq > jb->prev_seq)) {
p->ttq_entry.when = jb->prev_seq_ts;
timeval_add_usec(&p->ttq_entry.when, DELAY_FACTOR);
}
if(timeval_diff(&p->ttq_entry.when, &jb->prev_seq_ts) > 0) {
jb->prev_seq_ts = p->ttq_entry.when;
jb->prev_seq = curr_seq;
}
if(seq_diff > 3000) //readjust after 3k packets
jb->first_send.tv_sec = 0;
timerthread_queue_push(&jb->ttq, &p->ttq_entry);
return 0;
}
static void handle_clock_drift(struct media_packet *mp) {
static int handle_clock_drift(struct media_packet *mp) {
ilog(LOG_DEBUG, "handle_clock_drift");
struct jitter_buffer *jb = mp->stream->jb;
int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq;
int mult_factor = pow(2, jb->drift_mult_factor);
if(seq_diff < (mult_factor * CLOCK_DRIFT_MULT))
return;
if((seq_diff % CLOCK_DRIFT_MULT) != 0)
return 0;
unsigned long ts = ntohl(mp->rtp->timestamp);
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
if(!clockrate) {
return;
return 0;
}
long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts;
long long ts_diff_us =
@ -181,7 +208,13 @@ static void handle_clock_drift(struct media_packet *mp) {
long long time_diff = timeval_diff(&rtpe_now, &to_send);
jb->clock_drift_val = time_diff/seq_diff;
jb->drift_mult_factor++;
if(jb->clock_drift_val < -10000 || jb->clock_drift_val > 10000) { //disable jb if clock drift greater than 10 ms
jb->disabled = 1;
jitter_buffer_flush(jb);
ilog(LOG_DEBUG, "JB disabled due to clock drift");
return 1;
}
return 0;
}
int buffer_packet(struct media_packet *mp, const str *s) {
@ -199,6 +232,11 @@ int buffer_packet(struct media_packet *mp, const str *s) {
if (!jb || jb->disabled || PS_ISSET(mp->sfd->stream, RTCP))
goto end;
if(jb->initial_pkts < INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any
jb->initial_pkts++;
goto end;
}
p = get_jb_packet(mp, s);
if (!p)
goto end;
@ -208,26 +246,40 @@ int buffer_packet(struct media_packet *mp, const str *s) {
mp = &p->mp;
mutex_lock(&jb->lock);
int payload_type = (mp->rtp->m_pt & 0x7f);
int seq = ntohs(mp->rtp->seq_num);
int marker = (mp->rtp->m_pt & 0x80) ? 1 : 0;
int dtmf = 0;
const struct rtp_payload_type *rtp_pt = get_rtp_payload_type(mp, payload_type);
if(rtp_pt) {
if(rtp_pt->codec_def && rtp_pt->codec_def->dtmf)
dtmf = 1;
}
mutex_lock(&jb->lock);
if(marker || (jb->ssrc != ntohl(mp->rtp->ssrc)) || seq == 0 ) { //marker or ssrc change or sequence wrap
jb->first_send.tv_sec = 0;
}
if((jb->clock_rate && jb->payload_type != payload_type) ||
(jb->first_send.tv_sec && jb->ssrc != ntohl(mp->rtp->ssrc))) { //reset in case of payload change or ssrc change
const struct rtp_payload_type *rtp_pt = get_rtp_payload_type(mp, payload_type);
if(rtp_pt) {
if(rtp_pt->codec_def && !rtp_pt->codec_def->dtmf)
if(jb->clock_rate && jb->payload_type != payload_type) { //reset in case of payload change
if(!dtmf)
jb->first_send.tv_sec = 0;
if(rtp_pt->codec_def->dtmf)
jb->dtmf_mult_factor++;
else
jb->dtmf_mult_factor=0;
}
jb->dtmf_mult_factor++;
}
if(!dtmf && jb->dtmf_mult_factor) { //reset after DTMF ends
jb->first_send.tv_sec = 0;
jb->dtmf_mult_factor=0;
}
if (jb->first_send.tv_sec) {
if(rtpe_config.jb_clock_drift)
handle_clock_drift(mp);
if(rtpe_config.jb_clock_drift) {
if(handle_clock_drift(mp))
goto end_unlock;
}
ret = queue_packet(mp,p);
}
else {
@ -236,22 +288,20 @@ int buffer_packet(struct media_packet *mp, const str *s) {
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
if(!clockrate){
jb->initial_pkts++;
if(jb->initial_pkts > INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any
if(jb->rtptime_delta && payload_type != COMFORT_NOISE) { //ignore CN
ilog(LOG_INFO, "Jitter reset due to unknown payload = %d", payload_type);
reset_jitter_buffer(jb);
}
goto end_unlock;
}
p->ttq_entry.when = jb->first_send = rtpe_now;
jb->first_send_ts = ts;
jb->first_seq = ntohs(mp->rtp->seq_num);
jb->ssrc = ntohl(mp->rtp->ssrc);
if(jb->buffer_len > 0)
if(jb->rtptime_delta)
ret = queue_packet(mp,p);
jb->rtptime_delta = 0;
jb->next_exp_seq = 0;
jb->drift_mult_factor = 0;
if(!dtmf)
jb->rtptime_delta = 0;
}
// packet consumed?
@ -276,10 +326,8 @@ static void increment_buffer(struct jitter_buffer *jb) {
}
static void decrement_buffer(struct jitter_buffer *jb) {
if(jb->buffer_len > 0) {
if(jb->buffer_len > 0)
jb->buffer_len--;
jb->buf_decremented = 1;
}
}
static void set_jitter_values(struct media_packet *mp) {
@ -287,16 +335,28 @@ static void set_jitter_values(struct media_packet *mp) {
if(!jb || !mp->rtp)
return;
int curr_seq = ntohs(mp->rtp->seq_num);
if(jb->next_exp_seq) {
int payload_type = (mp->rtp->m_pt & 0x7f);
int dtmf = 0;
const struct rtp_payload_type *rtp_pt = get_rtp_payload_type(mp, payload_type);
if(rtp_pt) {
if(rtp_pt->codec_def && rtp_pt->codec_def->dtmf)
dtmf = 1;
}
if(jb->next_exp_seq && !dtmf) {
mutex_lock(&jb->lock);
if(curr_seq > jb->next_exp_seq) {
ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq);
increment_buffer(jb);
jb->cont_frames = 0;
jb->cont_miss++;
int marker = (mp->rtp->m_pt & 0x80) ? 1 : 0;
if(!marker) {
ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq);
increment_buffer(jb);
jb->cont_frames = 0;
jb->cont_miss++;
}
}
else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed
jb->cont_frames = 0;
if((curr_seq == 0) || (jb->next_exp_seq - curr_seq) > 65500) //sequence wrap
jb->next_exp_seq = 0;
}
else {
jb->cont_frames++;
@ -304,7 +364,6 @@ static void set_jitter_values(struct media_packet *mp) {
if(jb->cont_frames >= CONT_SEQ_COUNT) {
decrement_buffer(jb);
jb->cont_frames = 0;
ilog(LOG_DEBUG, "Received continous frames Buffer len=%d", jb->buffer_len);
}
}

@ -235,3 +235,23 @@ unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) {
return num;
}
void timerthread_queue_flush_data(void *ptr) {
struct timerthread_queue *ttq = ptr;
ilog(LOG_DEBUG, "timerthread_queue_flush_data");
mutex_lock(&ttq->lock);
while (g_tree_nnodes(ttq->entries)) {
struct timerthread_queue_entry *ttqe = g_tree_find_first(ttq->entries, NULL, NULL);
assert(ttqe != NULL);
g_tree_remove(ttq->entries, ttqe);
mutex_unlock(&ttq->lock);
ttq->run_later_func(ttq, ttqe);
mutex_lock(&ttq->lock);
}
mutex_unlock(&ttq->lock);
}

@ -22,7 +22,9 @@ struct jitter_buffer {
mutex_t lock;
unsigned long first_send_ts;
struct timeval first_send;
struct timeval prev_seq_ts;
unsigned int first_seq;
unsigned int prev_seq;
unsigned int rtptime_delta;
unsigned int next_exp_seq;
unsigned int cont_frames;
@ -31,12 +33,10 @@ struct jitter_buffer {
unsigned int payload_type;
unsigned int num_resets;
unsigned int initial_pkts;
unsigned int drift_mult_factor;
unsigned int ssrc;
unsigned int dtmf_mult_factor;
int buffer_len;
int clock_drift_val;
int buf_decremented;
struct call *call;
int disabled;
};

@ -55,6 +55,7 @@ void *timerthread_queue_new(const char *type, size_t size,
void (*free_func)(void *),
void (*entry_free_func)(void *));
void timerthread_queue_run(void *ptr);
void timerthread_queue_flush_data(void *ptr);
void timerthread_queue_push(struct timerthread_queue *, struct timerthread_queue_entry *);
unsigned int timerthread_queue_flush(struct timerthread_queue *, void *);

Loading…
Cancel
Save