MT#55283 revamp calls_merge once again

Maybe this makes sense now

Change-Id: I94c668969aaffd3582d94f8872106f073bb939ba
pull/2097/head
Richard Fuchs 2 weeks ago
parent cf38ee00af
commit 88f0483c4e

@ -5544,43 +5544,56 @@ call_t *call_get(const str *callid) {
return ret;
}
// obtain a reference to a second call while holding a lock to another call
// intermittently releases lock on the first call!
// return return another reference to the same call
call_t *call_get2(call_t *call1, const str *callid2) {
call_t *ret = NULL;
rwlock_unlock_w(&call1->master_lock);
// list of call IDs must be non empty
// returns all calls with lock and reference held
// returns empty list on failure
// returned list might be shorter if duplicate objects were found
// log info and memory arena point to the first call in the list
call_q calls_get(const str_q *call_ids) {
call_q ret = TYPED_GQUEUE_INIT;
if (!call_ids->length)
return ret; // empty queue
while (true) {
RWLOCK_R(&rtpe_callhash_lock);
ret = t_hash_table_lookup(rtpe_callhash, callid2);
if (!ret)
goto out;
for (auto_iter(l, call_ids->head); l; l = l->next) {
const str *callid = l->data;
if (ret == call1) {
obj_hold(ret);
goto out;
}
__auto_type call = t_hash_table_lookup(rtpe_callhash, callid);
if (!call)
goto err;
rwlock_lock_w(&call1->master_lock);
// O(n^2)
if (t_queue_find(&ret, call))
continue;
if (!rwlock_trylock_w(&ret->master_lock)) {
obj_hold(ret);
return ret;
if (rwlock_trylock_w(&call->master_lock))
goto retry;
obj_hold(call);
t_queue_push_tail(&ret, call);
}
rwlock_unlock_w(&call1->master_lock);
// try again
// success
break;
retry:
call_q_unlock_release(&ret);
}
out:
rwlock_lock_w(&call1->master_lock);
log_info_call(ret.head->data);
return ret;
err:
call_q_unlock_release(&ret);
return ret; // empty queue
}
static gboolean fragment_move(str *key, fragment_q *q, void *c) {
call_t *call = c;
t_hash_table_insert(call->sdp_fragments, key, q);
@ -5588,15 +5601,8 @@ static gboolean fragment_move(str *key, fragment_q *q, void *c) {
}
// both calls must be locked and a reference held. call2 reference will be released if successful
bool call_merge(call_t *call, call_t *call2) {
if (!call2)
return true;
if (call == call2) {
obj_put(call2);
return true;
}
__attribute__((nonnull(1, 2)))
static bool call_merge(call_t *call, call_t *call2) {
// chcek for tag collisions: duplicate tags are a failure
for (auto_iter(l, call2->monologues.head); l; l = l->next) {
if (t_hash_table_lookup(call->tags, &l->data->tag))
@ -5702,6 +5708,29 @@ bool call_merge(call_t *call, call_t *call2) {
return true;
}
// all calls must be unique, locked, with a reference held
// returns a single merged call on success, with the list having been cleared out
call_t *calls_merge(call_q *calls) {
__auto_type ret = t_queue_pop_head(calls);
if (!ret)
return NULL;
while (calls->length) {
__auto_type call = t_queue_pop_head(calls);
bool ok = call_merge(ret, call);
if (!ok) {
// return to list and bail
t_queue_push_head(calls, call);
t_queue_push_head(calls, ret);
return NULL;
}
}
return ret;
}
/* returns call with master_lock held in W, or possibly NULL iff opmode == OP_ANSWER */
call_t *call_get_opmode(const str *callid, enum ng_opmode opmode) {
if (opmode == OP_OFFER)
@ -6652,3 +6681,13 @@ void call_unlock_release(call_t *c) {
rwlock_unlock_w(&c->master_lock);
obj_put(c);
}
void call_q_unlock_release(call_q *calls) {
while (calls->length) {
__auto_type call = t_queue_pop_head(calls);
if (!call)
continue;
call_unlock_release(call);
}
}

@ -1451,19 +1451,29 @@ static const char *media_block_match(call_t **call, struct call_monologue **mono
return NULL;
}
static const char *medias_match(call_t **call, medias_q *medias,
static const char *medias_match(call_q *calls, medias_q *medias,
sdp_ng_flags *flags, ng_command_ctx_t *ctx)
{
call_ng_process_flags(flags, ctx);
if (!flags->call_id.s)
return "No call-id in message";
*call = call_get(&flags->call_id);
if (!*call)
return "Unknown call-ID";
g_auto(str_q) call_ids = TYPED_GQUEUE_INIT;
t_queue_push_tail(&call_ids, &flags->call_id);
if (flags->to_call_id.len)
t_queue_push_tail(&call_ids, &flags->to_call_id);
*calls = calls_get(&call_ids);
if (!calls->length)
return "Unknown call-ID(s)";
call_t *call = calls->head->data;
if (flags->all == ALL_ALL) {
for (__auto_type l = (*call)->medias.head; l; l = l->next) {
for (__auto_type l = call->medias.head; l; l = l->next) {
struct call_media *media = l->data;
if (!media || (media->monologue->tagtype != FROM_TAG &&
media->monologue->tagtype != TO_TAG))
@ -1478,7 +1488,7 @@ static const char *medias_match(call_t **call, medias_q *medias,
/* is a single ml given? */
struct call_monologue *ml = NULL;
const char *err = media_match(*call, &ml, flags);
const char *err = media_match(call, &ml, flags);
if (err)
return err;
if (ml) {
@ -1495,7 +1505,7 @@ static const char *medias_match(call_t **call, medias_q *medias,
/* handle from-tag list */
for (__auto_type l = flags->from_tags.head; l; l = l->next) {
str *s = l->data;
struct call_monologue *mlf = call_get_monologue(*call, s);
struct call_monologue *mlf = call_get_monologue(call, s);
if (!mlf) {
ilog(LOG_WARN, "Given from-tag " STR_FORMAT_M " not found", STR_FMT_M(s));
} else {
@ -2177,14 +2187,14 @@ const char *call_subscribe_request_ng(ng_command_ctx_t *ctx) {
const char *err = NULL;
g_auto(sdp_ng_flags) flags;
char rand_buf[65];
g_autoptr(call_t) call = NULL;
g_auto(call_q) calls = TYPED_GQUEUE_INIT;
g_auto(medias_q) mq = TYPED_GQUEUE_INIT;
g_auto(str) sdp_out = STR_NULL;
parser_arg output = ctx->resp;
const ng_parser_t *parser = ctx->parser_ctx.parser;
/* get source monologue */
err = medias_match(&call, &mq, &flags, ctx);
err = medias_match(&calls, &mq, &flags, ctx);
if (err)
return err;
@ -2206,6 +2216,8 @@ const char *call_subscribe_request_ng(ng_command_ctx_t *ctx) {
rand_hex_str(flags.to_tag.s, flags.to_tag.len / 2);
}
g_autoptr(call_t) call = t_queue_pop_head(&calls);
struct call_monologue *dest_ml = call_get_or_create_monologue(call, &flags.to_tag);
int ret = monologue_subscribe_request(&mq, dest_ml, &flags);
@ -2372,8 +2384,6 @@ const char *call_unsubscribe_ng(ng_command_ctx_t *ctx) {
static const char *call_inject_ng(ng_command_ctx_t *ctx, bool start) {
g_auto(sdp_ng_flags) flags;
g_autoptr(call_t) call = NULL;
g_autoptr(call_t) call2 = NULL;
parser_arg input = ctx->req;
const ng_parser_t *parser = ctx->parser_ctx.parser;
@ -2394,29 +2404,29 @@ static const char *call_inject_ng(ng_command_ctx_t *ctx, bool start) {
if (!parser->dict_get_str(input, "source-call-id", &source_call_id))
source_call_id = flags.call_id;
call = call_get(&flags.call_id);
if (!call)
return "Unknown call-ID";
g_auto(str_q) call_ids = TYPED_GQUEUE_INIT;
t_queue_push_tail(&call_ids, &flags.call_id);
t_queue_push_tail(&call_ids, &source_call_id);
call2 = call_get2(call, &source_call_id);
if (!call2)
return "Unknown source call-ID";
g_auto(call_q) calls = calls_get(&call_ids);
if (!calls.length)
return "Unknown call-ID(s)";
g_autoptr(call_t) call = calls_merge(&calls);
if (!call)
return "Failed to merge two calls into one";
struct call_monologue *dst_ml = call_get_monologue(call, &flags.to_tag);
if (!dst_ml)
return "To-tag not found";
struct call_monologue *src_ml = call_get_monologue(call2, &source_tag);
struct call_monologue *src_ml = call_get_monologue(call, &source_tag);
if (!src_ml)
return "Source-tag not found";
if (src_ml == dst_ml)
return "Trying to inject to self";
if (!call_merge(call, call2))
return "Failed to merge two calls into one";
call2 = NULL;
int ret = start
? monologue_inject_start(src_ml, dst_ml, &flags)
: monologue_inject_stop(src_ml, dst_ml, &flags);
@ -2438,33 +2448,24 @@ const char *call_inject_stop_ng(ng_command_ctx_t *ctx) {
const char *call_connect_ng(ng_command_ctx_t *ctx) {
g_auto(sdp_ng_flags) flags;
g_autoptr(call_t) call = NULL;
g_autoptr(call_t) call2 = NULL;
g_auto(call_q) calls = TYPED_GQUEUE_INIT;
g_auto(medias_q) medias = TYPED_GQUEUE_INIT;
const char *err = medias_match(&call, &medias, &flags, ctx);
const char *err = medias_match(&calls, &medias, &flags, ctx);
if (err)
return err;
if (!flags.to_tag.s)
return "No to-tag in message";
if (flags.to_call_id.len) {
call2 = call_get2(call, &flags.to_call_id);
if (!call2)
return "Unknown to-tag call-ID";
}
else
call2 = obj_get(call);
g_autoptr(call_t) call = calls_merge(&calls);
if (!call)
return "Failed to merge two calls into one (tag collision)";
struct call_monologue *dest_ml = call_get_or_create_monologue(call2, &flags.to_tag);
struct call_monologue *dest_ml = call_get_or_create_monologue(call, &flags.to_tag);
if (!dest_ml)
return "To-tag not found";
if (!call_merge(call, call2))
return "Failed to merge two calls into one (tag collision)";
call2 = NULL; // reference released
dialogue_connect(&medias, dest_ml, &flags);
call_unlock_release_update(&call);

@ -881,10 +881,10 @@ struct call_media *call_make_transform_media(struct call_monologue *ml, const st
const str *media_id, const endpoint_t *remote, const str *interface);
__attribute__((nonnull(1)))
call_t *call_get(const str *callid);
__attribute__((nonnull(1, 2)))
call_t *call_get2(call_t *, const str *);
__attribute__((nonnull(1)))
bool call_merge(call_t *, call_t *);
call_q calls_get(const str_q *);
__attribute__((nonnull(1)))
call_t *calls_merge(call_q *);
__attribute__((nonnull(2, 3)))
int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q *streams, sdp_ng_flags *flags);
__attribute__((nonnull(1, 2, 3, 4)))
@ -943,9 +943,14 @@ TYPED_GHASHTABLE(subscription_store_ht, struct call_media, struct media_subscrip
media_direct_hash, media_direct_eq, NULL, media_subscription_free)
__attribute__((nonnull(1)))
void call_unlock_release(call_t *c);
G_DEFINE_AUTOPTR_CLEANUP_FUNC(call_t, call_unlock_release)
__attribute__((nonnull(1)))
void call_q_unlock_release(call_q *);
G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(call_q, call_q_unlock_release);
#include "str.h"

@ -22,6 +22,7 @@ typedef struct _str str;
TYPED_GQUEUE(charp, char)
TYPED_GQUEUE(str, str)
G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC(str_q, str_q_clear)

Loading…
Cancel
Save