From f984c8e5e36709bd0fc3d00ca6aea3da16c89bc6 Mon Sep 17 00:00:00 2001 From: Donat Zenichev Date: Mon, 28 Aug 2023 14:44:10 +0200 Subject: [PATCH] MT#57719 redis: Use media subscriptions approach Start using the media subscriptions model (based on newly introduced `media_subscription` objects) in scope of: - `redis_encode_json()` - `json_link_medias()` - `rbl_subs_cb()` Change-Id: I3f7267ab156b361d7e7bec4ff91a8976a7be02ee --- daemon/call.c | 2 +- daemon/redis.c | 148 ++++++++++++++++------------------- include/call.h | 2 + t/auto-daemon-tests-redis.pl | 69 ++++++++-------- 4 files changed, 105 insertions(+), 116 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index ce682cd89..b4d8af1b3 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -3077,7 +3077,7 @@ static void __unsubscribe_medias_from_all(struct call_monologue *ml) { } } } -static void __add_media_subscription(struct call_media * which, struct call_media * to, +void __add_media_subscription(struct call_media * which, struct call_media * to, const struct sink_attrs *attrs) { if (g_hash_table_lookup(which->media_subscriptions_ht, to)) { diff --git a/daemon/redis.c b/daemon/redis.c index e721fd14f..d65cd464d 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1653,41 +1653,45 @@ static int redis_link_sfds(struct redis_list *sfds, struct redis_list *streams) return 0; } +/** + * Supports only `media-subscriptions-*` structures. + * Restores media subscriptions based on: + * `unique_id`, `offer_answer`, `rtcp_only`, `egress` + */ static int rbl_subs_cb(str *s, callback_arg_t dummy, struct redis_list *list, void *ptr) { str token; if (str_token_sep(&token, s, '/')) return -1; - unsigned int idx = str_to_i(&token, 0); + unsigned int media_unique_id = str_to_i(&token, 0); - unsigned int media_offset = 0; bool offer_answer = false; bool rtcp_only = false; bool egress = false; if (!str_token_sep(&token, s, '/')) { - media_offset = str_to_i(&token, 0); + offer_answer = str_to_i(&token, 0) ? true : false; if (!str_token_sep(&token, s, '/')) { - offer_answer = str_to_i(&token, 0) ? true : false; - if (!str_token_sep(&token, s, '/')) { - rtcp_only = str_to_i(&token, 0) ? true : false; - if (!str_token_sep(&token, s, '/')) - egress = str_to_i(&token, 0) ? true : false; - } + rtcp_only = str_to_i(&token, 0) ? true : false; + if (!str_token_sep(&token, s, '/')) + egress = str_to_i(&token, 0) ? true : false; } } - struct call_monologue *ml = ptr; - struct call_monologue *other_ml = redis_list_get_idx_ptr(list, idx); - if (!other_ml) + struct call_media *media = ptr; + struct call_media *other_media = redis_list_get_idx_ptr(list, media_unique_id); + if (!other_media) return -1; - __add_subscription(ml, other_ml, media_offset, &(struct sink_attrs) { - .offer_answer = offer_answer, - .rtcp_only = rtcp_only, - .egress = egress, - }); + __add_media_subscription(media, other_media, + &(struct sink_attrs) { + .offer_answer = offer_answer, + .rtcp_only = rtcp_only, + .egress = egress, + }); + + codec_handlers_update(other_media, media, .reset_transcoding = true); return 0; } @@ -1699,46 +1703,33 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ GQueue q = G_QUEUE_INIT; GList *l; - for (i = 0; i < tags->len; i++) { + for (i = 0; i < tags->len; i++) + { ml = tags->ptrs[i]; - if (!json_build_list_cb((callback_arg_t) NULL, c, "subscriptions", i, tags, rbl_subs_cb, ml, root_reader)) { - // new format, ok - ; - } - else if (!json_build_list(&q, c, "subscriptions-oa", i, tags, root_reader)) { - // legacy format - for (l = q.head; l; l = l->next) { - other_ml = l->data; - if (!other_ml) - return -1; - __add_subscription(ml, other_ml, 0, - &(struct sink_attrs) { .offer_answer = true }); - } - g_queue_clear(&q); + char key_subscriptions[256], key_subscriptions_oa[256], key_subscriptions_noa[256]; + snprintf(key_subscriptions, 256, "subscriptions-%u", i); + snprintf(key_subscriptions, 256, "subscriptions-oa-%u", i); + snprintf(key_subscriptions, 256, "subscriptions-noa-%u", i); - if (json_build_list(&q, c, "subscriptions-noa", i, tags, root_reader)) - return -1; - for (l = q.head; l; l = l->next) { - other_ml = l->data; - if (!other_ml) - return -1; - __add_subscription(ml, other_ml, 0, NULL); - } - g_queue_clear(&q); - } + /* Legacy */ + if (json_reader_read_member(root_reader, key_subscriptions)) + rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped."); + json_reader_end_member(root_reader); - // backwards compatibility - if (!ml->subscriptions.length) { - other_ml = redis_list_get_ptr(tags, &tags->rh[i], "active"); - if (other_ml) - __add_subscription(ml, other_ml, 0, - &(struct sink_attrs) { .offer_answer = true }); - } + if (json_reader_read_member(root_reader, key_subscriptions_oa)) + rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped."); + json_reader_end_member(root_reader); + if (json_reader_read_member(root_reader, key_subscriptions_noa)) + rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped."); + json_reader_end_member(root_reader); + + /* associated tags */ if (json_build_list(&q, c, "associated_tags", i, tags, root_reader)) return -1; - for (l = q.head; l; l = l->next) { + for (l = q.head; l; l = l->next) + { other_ml = l->data; if (!other_ml) return -1; @@ -1840,8 +1831,11 @@ static int json_link_streams(struct call *c, struct redis_list *streams, static int json_link_medias(struct call *c, struct redis_list *medias, struct redis_list *streams, struct redis_list *maps, struct redis_list *tags, JsonReader *root_reader) { - for (unsigned int i = 0; i < medias->len; i++) { + for (unsigned int i = 0; i < medias->len; i++) + { struct call_media *med = medias->ptrs[i]; + if (!med) + continue; med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); if (!med->monologue) @@ -1854,21 +1848,11 @@ static int json_link_medias(struct call *c, struct redis_list *medias, if (med->media_id.s) g_hash_table_insert(med->monologue->media_ids, &med->media_id, med); - // find the pair media - struct call_monologue *ml = med->monologue; - for (GList *sub = ml->subscribers.head; sub; sub = sub->next) { - struct call_subscription *cs = sub->data; - struct call_monologue *other_ml = cs->monologue; - for (unsigned int j = 0; j < other_ml->medias->len; j++) { - struct call_media *other_m = other_ml->medias->pdata[j]; - if (!other_m) - continue; - other_m->monologue = other_ml; - if (other_m->index == med->index) { - codec_handlers_update(other_m, med, .reset_transcoding = true); - break; - } - } + /* find the pair media to subscribe */ + if (!json_build_list_cb((callback_arg_t) NULL, c, "media-subscriptions", med->unique_id, + medias, rbl_subs_cb, med, root_reader)) + { + rlog(LOG_DEBUG, "Restored media subscriptions for: '" STR_FORMAT_M "'", STR_FMT_M(&med->monologue->tag)); } } return 0; @@ -2563,25 +2547,29 @@ char* redis_encode_json(struct call *c) { g_list_free(k); rwlock_unlock_r(&ml->ssrc_hash->lock); + } + + for (l = c->medias.head; l; l = l->next) { + media = l->data; - snprintf(tmp, sizeof(tmp), "subscriptions-%u", ml->unique_id); + if (!media) + continue; + + /* store media subscriptions */ + snprintf(tmp, sizeof(tmp), "media-subscriptions-%u", media->unique_id); json_builder_set_member_name(builder, tmp); json_builder_begin_array(builder); - for (k = ml->subscriptions.head; k; k = k->next) { - struct call_subscription *cs = k->data; - JSON_ADD_STRING("%u/%u/%u/%u/%u", - cs->monologue->unique_id, - cs->media_offset, - cs->attrs.offer_answer, - cs->attrs.rtcp_only, - cs->attrs.egress); + + for (GList * sub = media->media_subscriptions.head; sub; sub = sub->next) + { + struct media_subscription * ms = sub->data; + JSON_ADD_STRING("%u/%u/%u/%u", + ms->media->unique_id, + ms->attrs.offer_answer, + ms->attrs.rtcp_only, + ms->attrs.egress); } json_builder_end_array(builder); - } - - - for (l = c->medias.head; l; l = l->next) { - media = l->data; snprintf(tmp, sizeof(tmp), "media-%u", media->unique_id); json_builder_set_member_name(builder, tmp); diff --git a/include/call.h b/include/call.h index 78751e928..aab4aa569 100644 --- a/include/call.h +++ b/include/call.h @@ -723,6 +723,8 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); struct packet_stream *__packet_stream_new(struct call *call); void __add_subscription(struct call_monologue *ml, struct call_monologue *other, unsigned int media_offset, const struct sink_attrs *); +void __add_media_subscription(struct call_media * which, struct call_media * to, + const struct sink_attrs *attrs); struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm); struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml); void free_sink_handler(void *); diff --git a/t/auto-daemon-tests-redis.pl b/t/auto-daemon-tests-redis.pl index da1cfad9d..9f90b04a4 100755 --- a/t/auto-daemon-tests-redis.pl +++ b/t/auto-daemon-tests-redis.pl @@ -300,11 +300,11 @@ $json_exp = { '2', '3' ], - 'subscriptions-0' => [ - '1/0/1/0/0' + 'media-subscriptions-0' => [ + '1/1/0/0' ], - 'subscriptions-1' => [ - '0/0/1/0/0' + 'media-subscriptions-1' => [ + '0/1/0/0' ], 'tag-0' => { 'block_dtmf' => '0', @@ -556,11 +556,11 @@ $json_exp = { '2', '3' ], - 'subscriptions-0' => [ - '1/0/1/0/0' + 'media-subscriptions-0' => [ + '1/1/0/0' ], - 'subscriptions-1' => [ - '0/0/1/0/0' + 'media-subscriptions-1' => [ + '0/1/0/0' ], 'tag-0' => { 'block_dtmf' => '0', @@ -821,11 +821,11 @@ $json_exp = { '2', '3' ], - 'subscriptions-0' => [ - '1/0/1/0/0' + 'media-subscriptions-0' => [ + '1/1/0/0' ], - 'subscriptions-1' => [ - '0/0/1/0/0' + 'media-subscriptions-1' => [ + '0/1/0/0' ], 'tag-0' => { 'block_dtmf' => '0', @@ -1080,11 +1080,11 @@ $json_exp = { '2', '3' ], - 'subscriptions-0' => [ - '1/0/1/0/0' + 'media-subscriptions-0' => [ + '1/1/0/0' ], - 'subscriptions-1' => [ - '0/0/1/0/0' + 'media-subscriptions-1' => [ + '0/1/0/0' ], 'tag-0' => { 'block_dtmf' => '0', @@ -1522,15 +1522,14 @@ $json_exp = { '6', '7' ], - 'subscriptions-0' => [ - '1/0/1/0/0' + 'media-subscriptions-0' => [ + '1/1/0/0' ], - 'subscriptions-1' => [ - '0/0/1/0/0' + 'media-subscriptions-1' => [ + '0/1/0/0' ], - 'subscriptions-2' => [ - '0/0/0/0/0', - '1/1/0/0/0' + 'media-subscriptions-2' => [ + '1/0/0/0' ], 'tag-0' => { 'block_dtmf' => '0', @@ -1735,7 +1734,7 @@ $json_exp = { '0', '1' ], - 'subscriptions-0' => [], + 'media-subscriptions-0' => [], 'tag-0' => { 'block_dtmf' => '0', 'block_media' => '0', @@ -1971,9 +1970,9 @@ $json_exp = { '2', '3' ], - 'subscriptions-0' => [], - 'subscriptions-1' => [ - '0/0/0/0/0' + 'media-subscriptions-0' => [], + 'media-subscriptions-1' => [ + '0/0/0/0' ], 'tag-0' => { 'block_dtmf' => '0', @@ -2210,9 +2209,9 @@ $json_exp = { '2', '3' ], - 'subscriptions-0' => [], - 'subscriptions-1' => [ - '0/0/0/0/0' + 'media-subscriptions-0' => [], + 'media-subscriptions-1' => [ + '0/0/0/0' ], 'tag-0' => { 'block_dtmf' => '0', @@ -2539,12 +2538,12 @@ $json_exp = { '4', '5' ], - 'subscriptions-0' => [], - 'subscriptions-1' => [ - '0/0/0/0/0' + 'media-subscriptions-0' => [], + 'media-subscriptions-1' => [ + '0/0/0/0' ], - 'subscriptions-2' => [ - '0/0/0/0/0' + 'media-subscriptions-2' => [ + '0/0/0/0' ], 'tag-0' => { 'block_dtmf' => '0',