MT#57719 redis: Use media subscriptions approach

Start using the media subscriptions model
(based on newly introduced `media_subscription` objects)
in scope of:

- `redis_encode_json()`
- `json_link_medias()`
- `rbl_subs_cb()`

Change-Id: I3f7267ab156b361d7e7bec4ff91a8976a7be02ee
pull/1729/head
Donat Zenichev 3 years ago
parent ecc5420300
commit f984c8e5e3

@ -3077,7 +3077,7 @@ static void __unsubscribe_medias_from_all(struct call_monologue *ml) {
} }
} }
} }
static void __add_media_subscription(struct call_media * which, struct call_media * to, void __add_media_subscription(struct call_media * which, struct call_media * to,
const struct sink_attrs *attrs) const struct sink_attrs *attrs)
{ {
if (g_hash_table_lookup(which->media_subscriptions_ht, to)) { if (g_hash_table_lookup(which->media_subscriptions_ht, to)) {

@ -1653,41 +1653,45 @@ static int redis_link_sfds(struct redis_list *sfds, struct redis_list *streams)
return 0; return 0;
} }
/**
* Supports only `media-subscriptions-*` structures.
* Restores media subscriptions based on:
* `unique_id`, `offer_answer`, `rtcp_only`, `egress`
*/
static int rbl_subs_cb(str *s, callback_arg_t dummy, struct redis_list *list, void *ptr) { static int rbl_subs_cb(str *s, callback_arg_t dummy, struct redis_list *list, void *ptr) {
str token; str token;
if (str_token_sep(&token, s, '/')) if (str_token_sep(&token, s, '/'))
return -1; return -1;
unsigned int idx = str_to_i(&token, 0); unsigned int media_unique_id = str_to_i(&token, 0);
unsigned int media_offset = 0;
bool offer_answer = false; bool offer_answer = false;
bool rtcp_only = false; bool rtcp_only = false;
bool egress = false; bool egress = false;
if (!str_token_sep(&token, s, '/')) { if (!str_token_sep(&token, s, '/')) {
media_offset = str_to_i(&token, 0); offer_answer = str_to_i(&token, 0) ? true : false;
if (!str_token_sep(&token, s, '/')) { if (!str_token_sep(&token, s, '/')) {
offer_answer = str_to_i(&token, 0) ? true : false; rtcp_only = str_to_i(&token, 0) ? true : false;
if (!str_token_sep(&token, s, '/')) { if (!str_token_sep(&token, s, '/'))
rtcp_only = str_to_i(&token, 0) ? true : false; egress = str_to_i(&token, 0) ? true : false;
if (!str_token_sep(&token, s, '/'))
egress = str_to_i(&token, 0) ? true : false;
}
} }
} }
struct call_monologue *ml = ptr; struct call_media *media = ptr;
struct call_monologue *other_ml = redis_list_get_idx_ptr(list, idx); struct call_media *other_media = redis_list_get_idx_ptr(list, media_unique_id);
if (!other_ml) if (!other_media)
return -1; return -1;
__add_subscription(ml, other_ml, media_offset, &(struct sink_attrs) { __add_media_subscription(media, other_media,
.offer_answer = offer_answer, &(struct sink_attrs) {
.rtcp_only = rtcp_only, .offer_answer = offer_answer,
.egress = egress, .rtcp_only = rtcp_only,
}); .egress = egress,
});
codec_handlers_update(other_media, media, .reset_transcoding = true);
return 0; return 0;
} }
@ -1699,46 +1703,33 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_
GQueue q = G_QUEUE_INIT; GQueue q = G_QUEUE_INIT;
GList *l; GList *l;
for (i = 0; i < tags->len; i++) { for (i = 0; i < tags->len; i++)
{
ml = tags->ptrs[i]; ml = tags->ptrs[i];
if (!json_build_list_cb((callback_arg_t) NULL, c, "subscriptions", i, tags, rbl_subs_cb, ml, root_reader)) { char key_subscriptions[256], key_subscriptions_oa[256], key_subscriptions_noa[256];
// new format, ok snprintf(key_subscriptions, 256, "subscriptions-%u", i);
; snprintf(key_subscriptions, 256, "subscriptions-oa-%u", i);
} snprintf(key_subscriptions, 256, "subscriptions-noa-%u", i);
else if (!json_build_list(&q, c, "subscriptions-oa", i, tags, root_reader)) {
// legacy format
for (l = q.head; l; l = l->next) {
other_ml = l->data;
if (!other_ml)
return -1;
__add_subscription(ml, other_ml, 0,
&(struct sink_attrs) { .offer_answer = true });
}
g_queue_clear(&q);
if (json_build_list(&q, c, "subscriptions-noa", i, tags, root_reader)) /* Legacy */
return -1; if (json_reader_read_member(root_reader, key_subscriptions))
for (l = q.head; l; l = l->next) { rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped.");
other_ml = l->data; json_reader_end_member(root_reader);
if (!other_ml)
return -1;
__add_subscription(ml, other_ml, 0, NULL);
}
g_queue_clear(&q);
}
// backwards compatibility if (json_reader_read_member(root_reader, key_subscriptions_oa))
if (!ml->subscriptions.length) { rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped.");
other_ml = redis_list_get_ptr(tags, &tags->rh[i], "active"); json_reader_end_member(root_reader);
if (other_ml)
__add_subscription(ml, other_ml, 0,
&(struct sink_attrs) { .offer_answer = true });
}
if (json_reader_read_member(root_reader, key_subscriptions_noa))
rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped.");
json_reader_end_member(root_reader);
/* associated tags */
if (json_build_list(&q, c, "associated_tags", i, tags, root_reader)) if (json_build_list(&q, c, "associated_tags", i, tags, root_reader))
return -1; return -1;
for (l = q.head; l; l = l->next) { for (l = q.head; l; l = l->next)
{
other_ml = l->data; other_ml = l->data;
if (!other_ml) if (!other_ml)
return -1; return -1;
@ -1840,8 +1831,11 @@ static int json_link_streams(struct call *c, struct redis_list *streams,
static int json_link_medias(struct call *c, struct redis_list *medias, static int json_link_medias(struct call *c, struct redis_list *medias,
struct redis_list *streams, struct redis_list *maps, struct redis_list *tags, JsonReader *root_reader) struct redis_list *streams, struct redis_list *maps, struct redis_list *tags, JsonReader *root_reader)
{ {
for (unsigned int i = 0; i < medias->len; i++) { for (unsigned int i = 0; i < medias->len; i++)
{
struct call_media *med = medias->ptrs[i]; struct call_media *med = medias->ptrs[i];
if (!med)
continue;
med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag");
if (!med->monologue) if (!med->monologue)
@ -1854,21 +1848,11 @@ static int json_link_medias(struct call *c, struct redis_list *medias,
if (med->media_id.s) if (med->media_id.s)
g_hash_table_insert(med->monologue->media_ids, &med->media_id, med); g_hash_table_insert(med->monologue->media_ids, &med->media_id, med);
// find the pair media /* find the pair media to subscribe */
struct call_monologue *ml = med->monologue; if (!json_build_list_cb((callback_arg_t) NULL, c, "media-subscriptions", med->unique_id,
for (GList *sub = ml->subscribers.head; sub; sub = sub->next) { medias, rbl_subs_cb, med, root_reader))
struct call_subscription *cs = sub->data; {
struct call_monologue *other_ml = cs->monologue; rlog(LOG_DEBUG, "Restored media subscriptions for: '" STR_FORMAT_M "'", STR_FMT_M(&med->monologue->tag));
for (unsigned int j = 0; j < other_ml->medias->len; j++) {
struct call_media *other_m = other_ml->medias->pdata[j];
if (!other_m)
continue;
other_m->monologue = other_ml;
if (other_m->index == med->index) {
codec_handlers_update(other_m, med, .reset_transcoding = true);
break;
}
}
} }
} }
return 0; return 0;
@ -2563,25 +2547,29 @@ char* redis_encode_json(struct call *c) {
g_list_free(k); g_list_free(k);
rwlock_unlock_r(&ml->ssrc_hash->lock); rwlock_unlock_r(&ml->ssrc_hash->lock);
}
for (l = c->medias.head; l; l = l->next) {
media = l->data;
snprintf(tmp, sizeof(tmp), "subscriptions-%u", ml->unique_id); if (!media)
continue;
/* store media subscriptions */
snprintf(tmp, sizeof(tmp), "media-subscriptions-%u", media->unique_id);
json_builder_set_member_name(builder, tmp); json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder); json_builder_begin_array(builder);
for (k = ml->subscriptions.head; k; k = k->next) {
struct call_subscription *cs = k->data; for (GList * sub = media->media_subscriptions.head; sub; sub = sub->next)
JSON_ADD_STRING("%u/%u/%u/%u/%u", {
cs->monologue->unique_id, struct media_subscription * ms = sub->data;
cs->media_offset, JSON_ADD_STRING("%u/%u/%u/%u",
cs->attrs.offer_answer, ms->media->unique_id,
cs->attrs.rtcp_only, ms->attrs.offer_answer,
cs->attrs.egress); ms->attrs.rtcp_only,
ms->attrs.egress);
} }
json_builder_end_array(builder); json_builder_end_array(builder);
}
for (l = c->medias.head; l; l = l->next) {
media = l->data;
snprintf(tmp, sizeof(tmp), "media-%u", media->unique_id); snprintf(tmp, sizeof(tmp), "media-%u", media->unique_id);
json_builder_set_member_name(builder, tmp); json_builder_set_member_name(builder, tmp);

@ -723,6 +723,8 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch);
struct packet_stream *__packet_stream_new(struct call *call); struct packet_stream *__packet_stream_new(struct call *call);
void __add_subscription(struct call_monologue *ml, struct call_monologue *other, void __add_subscription(struct call_monologue *ml, struct call_monologue *other,
unsigned int media_offset, const struct sink_attrs *); unsigned int media_offset, const struct sink_attrs *);
void __add_media_subscription(struct call_media * which, struct call_media * to,
const struct sink_attrs *attrs);
struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm); struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm);
struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml); struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml);
void free_sink_handler(void *); void free_sink_handler(void *);

@ -300,11 +300,11 @@ $json_exp = {
'2', '2',
'3' '3'
], ],
'subscriptions-0' => [ 'media-subscriptions-0' => [
'1/0/1/0/0' '1/1/0/0'
], ],
'subscriptions-1' => [ 'media-subscriptions-1' => [
'0/0/1/0/0' '0/1/0/0'
], ],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',
@ -556,11 +556,11 @@ $json_exp = {
'2', '2',
'3' '3'
], ],
'subscriptions-0' => [ 'media-subscriptions-0' => [
'1/0/1/0/0' '1/1/0/0'
], ],
'subscriptions-1' => [ 'media-subscriptions-1' => [
'0/0/1/0/0' '0/1/0/0'
], ],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',
@ -821,11 +821,11 @@ $json_exp = {
'2', '2',
'3' '3'
], ],
'subscriptions-0' => [ 'media-subscriptions-0' => [
'1/0/1/0/0' '1/1/0/0'
], ],
'subscriptions-1' => [ 'media-subscriptions-1' => [
'0/0/1/0/0' '0/1/0/0'
], ],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',
@ -1080,11 +1080,11 @@ $json_exp = {
'2', '2',
'3' '3'
], ],
'subscriptions-0' => [ 'media-subscriptions-0' => [
'1/0/1/0/0' '1/1/0/0'
], ],
'subscriptions-1' => [ 'media-subscriptions-1' => [
'0/0/1/0/0' '0/1/0/0'
], ],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',
@ -1522,15 +1522,14 @@ $json_exp = {
'6', '6',
'7' '7'
], ],
'subscriptions-0' => [ 'media-subscriptions-0' => [
'1/0/1/0/0' '1/1/0/0'
], ],
'subscriptions-1' => [ 'media-subscriptions-1' => [
'0/0/1/0/0' '0/1/0/0'
], ],
'subscriptions-2' => [ 'media-subscriptions-2' => [
'0/0/0/0/0', '1/0/0/0'
'1/1/0/0/0'
], ],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',
@ -1735,7 +1734,7 @@ $json_exp = {
'0', '0',
'1' '1'
], ],
'subscriptions-0' => [], 'media-subscriptions-0' => [],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',
'block_media' => '0', 'block_media' => '0',
@ -1971,9 +1970,9 @@ $json_exp = {
'2', '2',
'3' '3'
], ],
'subscriptions-0' => [], 'media-subscriptions-0' => [],
'subscriptions-1' => [ 'media-subscriptions-1' => [
'0/0/0/0/0' '0/0/0/0'
], ],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',
@ -2210,9 +2209,9 @@ $json_exp = {
'2', '2',
'3' '3'
], ],
'subscriptions-0' => [], 'media-subscriptions-0' => [],
'subscriptions-1' => [ 'media-subscriptions-1' => [
'0/0/0/0/0' '0/0/0/0'
], ],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',
@ -2539,12 +2538,12 @@ $json_exp = {
'4', '4',
'5' '5'
], ],
'subscriptions-0' => [], 'media-subscriptions-0' => [],
'subscriptions-1' => [ 'media-subscriptions-1' => [
'0/0/0/0/0' '0/0/0/0'
], ],
'subscriptions-2' => [ 'media-subscriptions-2' => [
'0/0/0/0/0' '0/0/0/0'
], ],
'tag-0' => { 'tag-0' => {
'block_dtmf' => '0', 'block_dtmf' => '0',

Loading…
Cancel
Save