diff --git a/daemon/call.c b/daemon/call.c index 05bee2942..6410f555d 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -3887,6 +3887,11 @@ void call_destroy(call_t *c) { rwlock_lock_w(&rtpe_callhash_lock); bool removed = __remove_call_id_from_hash(&c->callid, c); + for (auto_iter(l, c->callid_aliases.head); l; l = l->next) { + __auto_type alias = l->data; + if (__remove_call_id_from_hash(alias, c)) + obj_put(c); + } rwlock_unlock_w(&rtpe_callhash_lock); // if call not found in callhash => previously deleted @@ -4192,6 +4197,7 @@ static void __call_free(void *p) { t_hash_table_destroy(c->tags); t_hash_table_destroy(c->viabranches); t_hash_table_destroy(c->labels); + t_queue_clear(&c->callid_aliases); while (c->streams.head) { ps = t_queue_pop_head(&c->streams); @@ -4341,6 +4347,121 @@ call_t *call_get(const str *callid) { return ret; } +static gboolean fragment_move(str *key, fragment_q *q, void *c) { + call_t *call = c; + t_hash_table_insert(call->sdp_fragments, key, q); + return TRUE; +} + +// both calls must be locked and a reference held. call2 will be released and set to NULL upon return +bool call_merge(call_t *call, call_t **call2p) { + call_t *call2 = *call2p; + + // chcek for tag collisions: duplicate tags are a failure + for (auto_iter(l, call2->monologues.head); l; l = l->next) { + if (t_hash_table_lookup(call->tags, &l->data->tag)) + return false; + } + + ilog(LOG_DEBUG, "Merging call " STR_FORMAT_M " into " STR_FORMAT_M, + STR_FMT_M(&call2->callid), STR_FMT_M(&call->callid)); + + // move buffers + bencode_buffer_merge(&call->buffer, &call2->buffer); + + // move all contained objects: we have to renumber all unique IDs, and redirect any + // `call` pointers + + unsigned int last_id = call->monologues.head->data->unique_id; + while (call2->monologues.head) { + __auto_type ml = t_queue_pop_head(&call2->monologues); + ml->unique_id = ++last_id; + ml->call = call; + t_queue_push_tail(&call->monologues, ml); + t_hash_table_insert(call->tags, &ml->tag, ml); + for (auto_iter(l, ml->tag_aliases.head); l; l = l->next) + t_hash_table_insert(call->tags, l->data, ml); + if (ml->viabranch.len) + t_hash_table_insert(call->viabranches, &ml->viabranch, ml); + if (ml->label.len) + t_hash_table_insert(call->labels, &ml->label, ml); + } + + last_id = call->medias.head->data->unique_id; + while (call2->medias.head) { + __auto_type media = t_queue_pop_head(&call2->medias); + media->unique_id = ++last_id; + media->call = call; + t_queue_push_tail(&call->medias, media); + } + + t_hash_table_foreach_remove(call2->sdp_fragments, fragment_move, call); + + last_id = call->streams.head->data->unique_id; + while (call2->streams.head) { + __auto_type stream = t_queue_pop_head(&call2->streams); + stream->unique_id = ++last_id; + stream->call = call; + t_queue_push_tail(&call->streams, stream); + } + + last_id = call->stream_fds.head->data->unique_id; + while (call2->stream_fds.head) { + __auto_type sfd = t_queue_pop_head(&call2->stream_fds); + sfd->unique_id = ++last_id; + // call objects are held by reference here + if (sfd->call) { + obj_put(sfd->call); + sfd->call = obj_get(call); + } + t_queue_push_tail(&call->stream_fds, sfd); + } + + last_id = call->endpoint_maps.head->data->unique_id; + while (call2->endpoint_maps.head) { + __auto_type endpoint_map = t_queue_pop_head(&call2->endpoint_maps); + endpoint_map->unique_id = ++last_id; + t_queue_push_tail(&call->endpoint_maps, endpoint_map); + } + + // redirect hash table entry for old ID. store old ID in new call + + str *old_id = call_str_dup(&call2->callid); + t_queue_push_tail(&call->callid_aliases, old_id); + + rwlock_lock_w(&rtpe_callhash_lock); + + call_t *call_ht = NULL; + t_hash_table_steal_extended(rtpe_callhash, &call2->callid, NULL, &call_ht); + if (call_ht) { + if (call_ht != call2) { + // already deleted and replace by a different call + t_hash_table_insert(rtpe_callhash, &call_ht->callid, call_ht); + call_ht = NULL; + } + else { + // insert a new reference under the old call ID + t_hash_table_insert(rtpe_callhash, old_id, obj_get(call)); + RTPE_GAUGE_DEC(total_sessions); + } + } // else: already deleted + + rwlock_unlock_w(&rtpe_callhash_lock); + + if (call_ht) + obj_put(call_ht); + + __call_iterator_remove(call2); + mqtt_timer_stop(&call2->mqtt_timer); + __call_cleanup(call2); + + rwlock_unlock_w(&call2->master_lock); + obj_put(call2); + *call2p = NULL; + + return true; +} + /* returns call with master_lock held in W, or possibly NULL iff opmode == OP_ANSWER */ call_t *call_get_opmode(const str *callid, enum ng_opmode opmode) { if (opmode == OP_OFFER) diff --git a/include/call.h b/include/call.h index 2726be840..a3821b134 100644 --- a/include/call.h +++ b/include/call.h @@ -752,6 +752,7 @@ struct call { struct mqtt_timer *mqtt_timer; str callid; + str_q callid_aliases; struct timeval created; struct timeval destroyed; time_t last_signal; @@ -832,6 +833,8 @@ struct call_monologue *call_get_monologue(call_t *call, const str *fromtag); struct call_monologue *call_get_or_create_monologue(call_t *call, const str *fromtag); __attribute__((nonnull(1))) call_t *call_get(const str *callid); +__attribute__((nonnull(1, 2))) +bool call_merge(call_t *, call_t **); __attribute__((nonnull(2, 3))) int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q *streams, sdp_ng_flags *flags); __attribute__((nonnull(1, 2, 3, 4)))