TT#106101 handle out-of-order ICE fragments

Change-Id: I6b16474d505a40055b06d215e4cc6c5391214613
pull/1164/head
Richard Fuchs 4 years ago
parent 554034eb7e
commit 97a1c218fa

@ -697,6 +697,8 @@ next:
kill_calls_timer(hlp.del_scheduled, NULL);
kill_calls_timer(hlp.del_timeout, rtpe_config.b2b_url);
call_interfaces_timer();
struct timeval tv_stop;
gettimeofday(&tv_stop, NULL);
long long duration = timeval_diff(&tv_stop, &tv_start);
@ -2109,6 +2111,10 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
if (flags && flags->fragment) {
// trickle ICE SDP fragment. don't do anything other than update
// the ICE stuff.
if (!MEDIA_ISSET(other_media, TRICKLE_ICE))
return ERROR_NO_ICE_AGENT;
if (!other_media->ice_agent)
return ERROR_NO_ICE_AGENT;
ice_update(other_media->ice_agent, sp);
continue;
}

@ -31,6 +31,17 @@
#include "dtmf.h"
struct fragment_key {
str call_id;
str from_tag;
};
struct sdp_fragment {
struct ng_buffer *ngbuf;
struct timeval received;
GQueue streams;
struct sdp_ng_flags flags;
};
static pcre *info_re;
static pcre_extra *info_ree;
static pcre *streams_re;
@ -39,6 +50,9 @@ static pcre_extra *streams_ree;
int trust_address_def;
int dtls_passive_def;
static mutex_t sdp_fragments_lock;
static GHashTable *sdp_fragments;
INLINE int call_ng_flags_prefix(struct sdp_ng_flags *out, str *s_ori, const char *prefix,
void (*cb)(struct sdp_ng_flags *, str *, void *), void *ptr);
@ -1169,6 +1183,98 @@ static enum load_limit_reasons call_offer_session_limit(void) {
return ret;
}
static void fragment_free(struct sdp_fragment *frag) {
streams_free(&frag->streams);
call_ng_free_flags(&frag->flags);
obj_put(frag->ngbuf);
g_slice_free1(sizeof(*frag), frag);
}
static void fragment_key_free(void *p) {
struct fragment_key *k = p;
free(k->call_id.s);
free(k->from_tag.s);
g_slice_free1(sizeof(*k), k);
}
static void queue_sdp_fragment(struct ng_buffer *ngbuf, GQueue *streams, struct sdp_ng_flags *flags) {
ilog(LOG_DEBUG, "Queuing up SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M,
STR_FMT_M(&flags->call_id), STR_FMT_M(&flags->from_tag));
struct fragment_key *k = g_slice_alloc0(sizeof(*k));
str_init_dup_str(&k->call_id, &flags->call_id);
str_init_dup_str(&k->from_tag, &flags->from_tag);
struct sdp_fragment *frag = g_slice_alloc0(sizeof(*frag));
frag->received = rtpe_now;
frag->ngbuf = obj_get(ngbuf);
frag->streams = *streams;
frag->flags = *flags;
g_queue_init(streams);
ZERO(*flags);
mutex_lock(&sdp_fragments_lock);
GQueue *frags = g_hash_table_lookup_queue_new(sdp_fragments, k, fragment_key_free);
g_queue_push_tail(frags, frag);
mutex_unlock(&sdp_fragments_lock);
}
#define MAX_FRAG_AGE 3000000
static void dequeue_sdp_fragments(struct call_monologue *monologue) {
struct fragment_key k;
ZERO(k);
k.call_id = monologue->call->callid;
k.from_tag = monologue->tag;
mutex_lock(&sdp_fragments_lock);
GQueue *frags = g_hash_table_lookup(sdp_fragments, &k);
if (!frags) {
mutex_unlock(&sdp_fragments_lock);
return;
}
g_hash_table_remove(sdp_fragments, &k);
// we own the queue now
mutex_unlock(&sdp_fragments_lock);
struct sdp_fragment *frag;
while ((frag = g_queue_pop_head(frags))) {
if (timeval_diff(&rtpe_now, &frag->received) > MAX_FRAG_AGE)
goto next;
ilog(LOG_DEBUG, "Dequeuing SDP fragment for " STR_FORMAT_M "/" STR_FORMAT_M,
STR_FMT_M(&k.call_id), STR_FMT_M(&k.from_tag));
monologue_offer_answer(monologue, &frag->streams, &frag->flags);
next:
fragment_free(frag);
}
g_queue_free(frags);
}
static gboolean fragment_check_cleanup(void *k, void *v, void *p) {
int all = GPOINTER_TO_INT(p);
struct fragment_key *key = k;
GQueue *frags = v;
if (!key || !frags)
return TRUE;
while (frags->length) {
struct sdp_fragment *frag = frags->head->data;
if (!all && timeval_diff(&rtpe_now, &frag->received) <= MAX_FRAG_AGE)
break;
g_queue_pop_head(frags);
fragment_free(frag);
}
if (!frags->length) {
g_queue_free(frags);
return TRUE;
}
return FALSE;
}
static void fragments_cleanup(int all) {
mutex_lock(&sdp_fragments_lock);
g_hash_table_foreach_remove(sdp_fragments, fragment_check_cleanup, GINT_TO_POINTER(all));
mutex_unlock(&sdp_fragments_lock);
}
static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t *input,
bencode_item_t *output, enum call_opmode opmode, const char* addr,
const endpoint_t *sin)
@ -1227,18 +1333,16 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t
/* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */
call = call_get(&flags.call_id);
/* Failover scenario because of timeout on offer response: siprouter tries
* to establish session with another rtpengine2 even though rtpengine1
* might have persisted part of the session. rtpengine2 deletes previous
* call in memory and recreates an OWN call in redis */
// SDP fragments for trickle ICE must always operate on an existing call
if (opmode == OP_OFFER && !flags.fragment) {
if (!call) {
/* call == NULL, should create call */
call = call_get_or_create(&flags.call_id, 0);
}
if (!call && opmode == OP_OFFER && flags.fragment) {
queue_sdp_fragment(ngbuf, &streams, &flags);
errstr = NULL;
goto out;
}
if (opmode == OP_OFFER && !call)
call = call_get_or_create(&flags.call_id, 0);
errstr = "Unknown call-id";
if (!call)
goto out;
@ -1289,12 +1393,21 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t
call->drop_traffic = 0;
}
int do_dequeue = 1;
ret = monologue_offer_answer(monologue, &streams, &flags);
if (!ret) {
// SDP fragments for trickle ICE are consumed with no replacement returned
if (!flags.fragment)
ret = sdp_replace(chopper, &parsed, monologue->active_dialogue, &flags);
}
else if (ret == ERROR_NO_ICE_AGENT && flags.fragment) {
queue_sdp_fragment(ngbuf, &streams, &flags);
ret = 0;
do_dequeue = 0;
}
// streams and flags are invalid after here
struct recording *recording = call->recording;
if (recording != NULL) {
@ -1305,6 +1418,9 @@ static const char *call_offer_answer_ng(struct ng_buffer *ngbuf, bencode_item_t
recording_response(recording, output);
}
if (do_dequeue)
dequeue_sdp_fragments(monologue);
rwlock_unlock_w(&call->master_lock);
if (!flags.no_redis_update) {
@ -2252,6 +2368,26 @@ void call_interfaces_free() {
pcre_free_study(streams_ree);
streams_ree = NULL;
}
fragments_cleanup(1);
g_hash_table_destroy(sdp_fragments);
sdp_fragments = NULL;
mutex_destroy(&sdp_fragments_lock);
}
void call_interfaces_timer() {
fragments_cleanup(0);
}
unsigned static int frag_key_hash(const void *A) {
const struct fragment_key *a = A;
return str_hash(&a->call_id) ^ str_hash(&a->from_tag);
}
static int frag_key_eq(const void *A, const void *B) {
const struct fragment_key *a = A;
const struct fragment_key *b = B;
return str_equal(&a->call_id, &b->call_id)
&& str_equal(&a->from_tag, &b->from_tag);
}
int call_interfaces_init() {
@ -2268,5 +2404,8 @@ int call_interfaces_init() {
return -1;
streams_ree = pcre_study(streams_re, 0, &errptr);
sdp_fragments = g_hash_table_new_full(frag_key_hash, frag_key_eq, fragment_key_free, NULL);
mutex_init(&sdp_fragments_lock);
return 0;
}

@ -62,6 +62,7 @@ enum call_stream_state {
#define ERROR_NO_FREE_PORTS -100
#define ERROR_NO_FREE_LOGS -101
#define ERROR_NO_ICE_AGENT -102
#define MAX_RTP_PACKET_SIZE 8192
#define RTP_BUFFER_HEAD_ROOM 128

@ -160,6 +160,7 @@ void ng_call_stats(struct call *call, const str *fromtag, const str *totag, benc
int call_interfaces_init(void);
void call_interfaces_free(void);
void call_interfaces_timer(void);
#endif

Loading…
Cancel
Save