MT#57719 call: funcs introduction for media subscriptions handling

Introduction of the basic functions required for
a handling of the media subscriptions:
- `__unsubscribe_media_link()`
- `__unsubscribe_media()`
- `__unsubscribe_all_medias()`
- `__add_media_subscription()`
- `__subscribe_medias_both_ways()`
- `__subscribe_matched_medias()`
- `call_get_media_subscription()`
- `call_totag_subscribed_to_monologue()`
- `call_viabranch_intact_monologue()`

Change-Id: Iad2af5323b2ea8a10a83064d9ee72106c1d8f9c0
pull/1722/head
Donat Zenichev 2 years ago
parent 259b3a7088
commit 2a9564983b

@ -70,14 +70,16 @@ static struct mqtt_timer *global_mqtt_timer;
unsigned int call_socket_cpu_affinity = 0;
/* ********** */
/**
* locally needed static declarations
*/
static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start,
struct timeval *interval_duration);
static void __call_free(void *p);
static void __call_cleanup(struct call *c);
static void __monologue_stop(struct call_monologue *ml);
static void media_stop(struct call_media *m);
static void __subscribe_medias_both_ways(struct call_media * a, struct call_media * b);
/* called with call->master_lock held in R */
static int call_timer_delete_monologues(struct call *c) {
@ -2932,6 +2934,26 @@ void call_subscriptions_clear(GQueue *q) {
g_queue_clear_full(q, call_subscription_free);
}
static void __unsubscribe_media_link(struct call_media * which, GList * which_cm_link)
{
struct media_subscription * ms = which_cm_link->data;
struct media_subscription * rev_ms = ms->link->data;
struct call_media * from = ms->media;
ilog(LOG_DEBUG, "Unsubscribing media with monologue tag '" STR_FORMAT_M "' (index: %d)"
"from media with monologue tag '" STR_FORMAT_M "' (index: %d)",
STR_FMT_M(&which->monologue->tag), which->index,
STR_FMT_M(&from->monologue->tag), from->index);
g_queue_delete_link(&from->media_subscribers, ms->link);
g_queue_delete_link(&which->media_subscriptions, which_cm_link);
g_hash_table_remove(which->media_subscriptions_ht, ms->media);
g_hash_table_remove(from->media_subscribers_ht, rev_ms->media);
g_slice_free1(sizeof(*ms), ms);
g_slice_free1(sizeof(*rev_ms), rev_ms);
}
static void __unsubscribe_one_link(struct call_monologue *which, GList *which_cs_link) {
struct call_subscription *cs = which_cs_link->data;
struct call_subscription *rev_cs = cs->link->data;
@ -2946,6 +2968,26 @@ static void __unsubscribe_one_link(struct call_monologue *which, GList *which_cs
g_slice_free1(sizeof(*cs), cs);
g_slice_free1(sizeof(*rev_cs), rev_cs);
}
/**
* Unsubscribe one particular media subscriber from this call media.
*/
static bool __unsubscribe_media(struct call_media * which, struct call_media * from)
{
GList * l = g_hash_table_lookup(which->media_subscriptions_ht, from);
if (!l) {
ilog(LOG_DEBUG, "Media with monologue tag '" STR_FORMAT_M "' (index: %d) "
"is not subscribed to media with monologue tag '" STR_FORMAT_M "' "
"(index: %d). Cannot remove this media subscriber.",
STR_FMT_M(&which->monologue->tag), which->index,
STR_FMT_M(&from->monologue->tag), from->index);
return false;
}
__unsubscribe_media_link(which, l);
return true;
}
static bool __unsubscribe_one(struct call_monologue *which, struct call_monologue *from) {
GList *l = g_hash_table_lookup(which->subscriptions_ht, from);
if (!l) {
@ -2971,6 +3013,27 @@ static void __unsubscribe_all_offer_answer_subscribers(struct call_monologue *ml
l = next;
}
}
/**
* Deletes all offer/answer media subscriptions.
*/
static void __unsubscribe_all_offer_answer_medias(struct call_media * cm) {
for (GList *l = cm->media_subscribers.head; l; )
{
struct media_subscription * ms = l->data;
if (!ms->attrs.offer_answer) {
l = l->next;
continue;
}
GList * next = l->next;
struct call_media * other_cm = ms->media;
__unsubscribe_media(other_cm, cm);
__unsubscribe_media(cm, other_cm);
l = next;
}
}
static void __unsubscribe_from_all(struct call_monologue *ml) {
for (GList *l = ml->subscriptions.head; l; ) {
GList *next = l->next;
@ -2978,6 +3041,68 @@ static void __unsubscribe_from_all(struct call_monologue *ml) {
l = next;
}
}
static void __unsubscribe_medias_from_all(struct call_monologue *ml) {
for (int i = 0; i < ml->medias->len; i++)
{
struct call_media * media = ml->medias->pdata[i];
if (!media)
continue;
for (GList * subcription = media->media_subscriptions.head; subcription; )
{
GList *next = subcription->next;
__unsubscribe_media_link(media, subcription);
subcription = next;
}
}
}
static void __add_media_subscription(struct call_media * which, struct call_media * to,
const struct sink_attrs *attrs)
{
if (g_hash_table_lookup(which->media_subscriptions_ht, to)) {
ilog(LOG_DEBUG, "Media with monologue tag '" STR_FORMAT_M "' (index: %d) is already subscribed"
" to media with monologue tag '" STR_FORMAT_M "' (index: %d)",
STR_FMT_M(&which->monologue->tag), which->index,
STR_FMT_M(&to->monologue->tag), to->index);
return;
}
ilog(LOG_DEBUG, "Subscribing media with monologue tag '" STR_FORMAT_M "' (index: %d) "
"to media with monologue tag '" STR_FORMAT_M "' (index: %d)",
STR_FMT_M(&which->monologue->tag), which->index,
STR_FMT_M(&to->monologue->tag), to->index);
struct media_subscription *which_ms = g_slice_alloc0(sizeof(*which_ms));
struct media_subscription *to_rev_ms = g_slice_alloc0(sizeof(*to_rev_ms));
which_ms->media = to;
to_rev_ms->media = which;
which_ms->monologue = to->monologue;
to_rev_ms->monologue = which->monologue;
/* preserve attributes if they were present previously */
if (attrs) {
which_ms->attrs = * attrs;
to_rev_ms->attrs = * attrs;
}
/* keep offer-answer subscriptions first in the list */
if (!attrs || !attrs->offer_answer) {
g_queue_push_tail(&which->media_subscriptions, which_ms);
g_queue_push_tail(&to->media_subscribers, to_rev_ms);
which_ms->link = to->media_subscribers.tail;
to_rev_ms->link = which->media_subscriptions.tail;
} else {
g_queue_push_head(&which->media_subscriptions, which_ms);
g_queue_push_head(&to->media_subscribers, to_rev_ms);
which_ms->link = to->media_subscribers.head;
to_rev_ms->link = which->media_subscriptions.head;
}
g_hash_table_insert(which->media_subscriptions_ht, to, to_rev_ms->link);
g_hash_table_insert(to->media_subscribers_ht, which, which_ms->link);
}
void __add_subscription(struct call_monologue *which, struct call_monologue *to,
unsigned int offset, const struct sink_attrs *attrs)
{
@ -3017,6 +3142,40 @@ void __add_subscription(struct call_monologue *which, struct call_monologue *to,
g_hash_table_insert(which->subscriptions_ht, to, to_rev_cs->link);
g_hash_table_insert(to->subscribers_ht, which, which_cs->link);
}
/**
* Subscribe medias to each other.
*/
static void __subscribe_medias_both_ways(struct call_media * a, struct call_media * b)
{
if (!a || !b)
return;
/* retrieve previous subscriptions to retain attributes */
struct media_subscription *a_ms = call_get_media_subscription(a->media_subscriptions_ht, b);
struct media_subscription *b_ms = call_get_media_subscription(b->media_subscriptions_ht, a);
/* copy out attributes */
struct sink_attrs a_attrs = {0,};
struct sink_attrs b_attrs = {0,};
if (a_ms)
a_attrs = a_ms->attrs;
if (b_ms)
b_attrs = b_ms->attrs;
/* override/reset some attributes */
a_attrs.offer_answer = b_attrs.offer_answer = true;
a_attrs.egress = b_attrs.egress = false;
a_attrs.rtcp_only = b_attrs.rtcp_only = false;
/* release existing subscriptions both ways */
__unsubscribe_all_offer_answer_medias(a);
__unsubscribe_all_offer_answer_medias(b);
/* (re)create, preserving existing attributes if there have been any */
__add_media_subscription(a, b, &a_attrs);
__add_media_subscription(b, a, &b_attrs);
}
static void __subscribe_offer_answer_both_ways(struct call_monologue *a, struct call_monologue *b) {
// retrieve previous subscriptions to retain attributes
struct call_subscription *a_cs = call_get_call_subscription(a->subscriptions_ht, b);
@ -3040,6 +3199,52 @@ static void __subscribe_offer_answer_both_ways(struct call_monologue *a, struct
__add_subscription(b, a, 0, &b_attrs);
}
/**
* Subscribe media lines to each other respecting the given order in the SDP offer/answer.
* If there are `media_id` (mid) presented, then use a mid ordering instead.
*/
static void __subscribe_matched_medias(struct call_monologue * a_ml, struct call_monologue * b_ml)
{
GPtrArray * a_medias = a_ml->medias;
GPtrArray * b_medias = b_ml->medias;
/* A properly formed answer SDP has the same number of m= lines as the offer SDP,
* and in the same order. Media types must match up. */
if (a_medias->len != b_medias->len) {
ilog(LOG_WARN, "Non-matching amount of media sections in monologues, cannot subscribe them!");
return;
}
for (int i = 0; i < a_medias->len; i++)
{
struct call_media * a_media = a_medias->pdata[i];
struct call_media * b_media;
if (!a_media)
continue;
/* first try matching based on media_id */
if (a_media->media_id.s) {
b_media = g_hash_table_lookup(b_ml->media_ids, &a_media->media_id);
if (b_media) {
__subscribe_medias_both_ways(a_media, b_media);
continue; /* we found a matched one, go ahead to another one */
}
}
/* then a matching based on usual ordering */
b_media = b_medias->pdata[i];
if (!b_media)
continue;
if (a_media->type_id != b_media->type_id) {
ilog(LOG_WARN, "Wrong ordering of media sections in monologues, skip the '%d' media section.", i);
continue;
}
__subscribe_medias_both_ways(a_media, b_media);
}
}
// return subscription objects, valid only immediately after __subscribe_offer_answer_both_ways
static void __offer_answer_get_subscriptions(struct call_monologue *a, struct call_monologue *b,
struct call_subscription *rets[2])
@ -3048,8 +3253,15 @@ static void __offer_answer_get_subscriptions(struct call_monologue *a, struct ca
rets[1] = a->subscribers.head->data;
}
/**
* Retrieve exsisting media subscriptions for a call monologue.
*/
struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm) {
GList * l = g_hash_table_lookup(ht, cm);
if (!l)
return NULL;
return l->data;
}
struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml) {
GList *l = g_hash_table_lookup(ht, ml);
if (!l)
@ -3057,8 +3269,6 @@ struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call
return l->data;
}
/* called with call->master_lock held in W */
__attribute__((nonnull(1, 2, 3)))
int monologue_publish(struct call_monologue *ml, GQueue *streams, struct sdp_ng_flags *flags) {
@ -4179,6 +4389,75 @@ static bool call_monologues_associations_left(struct call * c) {
return false;
}
/**
* Check whether given totag is already subscribed to the given monologue medias.
* Returns: true - subscribed, false - not subscribed.
*/
static bool call_totag_subscribed_to_monologue(const str * totag, const struct call_monologue * monologue)
{
if (!totag && !totag->s)
return false;
for (int i = 0; i < monologue->medias->len; i++)
{
struct call_media * media = monologue->medias->pdata[i];
if (!media)
continue;
for (GList * subscriber = media->media_subscribers.head;
subscriber;
subscriber = subscriber->next)
{
struct media_subscription * ms = subscriber->data;
if (!ms->attrs.offer_answer) /* is this really needed? */
continue;
struct call_monologue * subscriber_monologue = ms->monologue;
if (!str_cmp_str(&subscriber_monologue->tag, totag)) /* subscriber found */
return true;
}
}
return false;
}
/**
* Check whether given viabranch is intact with a monologue, which owns
* existing media subscribriptions to it.
*
* It tags a monologue, media of which is subscribed to given monologue, using given viabranch,
* in case previous other side hasn't been tagged with the via-branch
*
* Returns: true - intact, false - not intact.
*/
static bool call_viabranch_intact_monologue(const str * viabranch, struct call_monologue * monologue)
{
for (int i = 0; i < monologue->medias->len; i++)
{
struct call_media * media = monologue->medias->pdata[i];
if (!media)
continue;
for (GList * subscriber = media->media_subscribers.head;
subscriber;
subscriber = subscriber->next)
{
struct media_subscription * ms = subscriber->data;
struct call_monologue * subscriber_monologue = ms->monologue;
/* check the viabranch. if it's not known, then this is a branched offer and we need
* to create a new "other side" for this branch. */
if (!subscriber_monologue->viabranch.s) {
/* previous "other side" hasn't been tagged with the via-branch, so we'll just
* use this one and tag it */
__monologue_viabranch(subscriber_monologue, viabranch);
return true;
}
if (!str_cmp_str(&subscriber_monologue->viabranch, viabranch))
return true; /* dialogue still intact */
}
}
return false;
}
/**
* Based on given From-tag create a new monologue for this dialog,
* if given tag wasn't present in 'tags' of this call.

@ -722,6 +722,7 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch);
struct packet_stream *__packet_stream_new(struct call *call);
void __add_subscription(struct call_monologue *ml, struct call_monologue *other,
unsigned int media_offset, const struct sink_attrs *);
struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm);
struct call_subscription *call_get_call_subscription(GHashTable *ht, struct call_monologue *ml);
void free_sink_handler(void *);
void __add_sink_handler(GQueue *, struct packet_stream *, const struct sink_attrs *);

Loading…
Cancel
Save