diff --git a/daemon/call.c b/daemon/call.c index 962c593d9..8189d80ce 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -4182,14 +4182,13 @@ void call_destroy(call_t *c) { } } - k = g_hash_table_get_values(ml->ssrc_hash->nht); - while (k) { + for (k = ml->ssrc_hash->nq.head; k; k = k->next) { struct ssrc_entry_call *se = k->data; // stats output only - no cleanups if (!se->stats_blocks.length || !se->lowest_mos || !se->highest_mos) - goto next_k; + continue; int mos_samples = (se->stats_blocks.length - se->no_mos_count); if (mos_samples < 1) mos_samples = 1; @@ -4232,9 +4231,6 @@ void call_destroy(call_t *c) { se->average_mos.packetloss / mos_samples, se->lowest_mos->packetloss, se->highest_mos->packetloss); - -next_k: - k = g_list_delete_link(k, k); } } diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 6472a23a1..851a15adc 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -3031,9 +3031,7 @@ static void ng_stats_ssrc_mos_entry_dict_avg(const ng_parser_t *parser, parser_a } static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, struct ssrc_hash *ht) { - GList *ll = g_hash_table_get_values(ht->nht); - - for (GList *l = ll; l; l = l->next) { + for (GList *l = ht->nq.head; l; l = l->next) { struct ssrc_entry_call *se = l->data; char tmp[12]; snprintf(tmp, sizeof(tmp), "%" PRIu32, se->h.ssrc); @@ -3073,8 +3071,6 @@ static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, struct ssr ng_stats_ssrc_mos_entry(parser, cent, sb); } } - - g_list_free(ll); } /* call must be locked */ diff --git a/daemon/redis.c b/daemon/redis.c index a4411c544..4bfac29d1 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -2636,11 +2636,10 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) { } // SSRC table dump - rwlock_lock_r(&ml->ssrc_hash->lock); - k = g_hash_table_get_values(ml->ssrc_hash->nht); + LOCK(&ml->ssrc_hash->lock); snprintf(tmp, sizeof(tmp), "ssrc_table-%u", ml->unique_id); parser_arg list = parser->dict_add_list_dup(root, tmp); - for (GList *m = k; m; m = m->next) { + for (GList *m = ml->ssrc_hash->nq.head; m; m = m->next) { struct ssrc_entry_call *se = m->data; inner = parser->list_add_dict(list); @@ -2654,9 +2653,6 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) { JSON_SET_SIMPLE("out_payload_type", "%i", se->output_ctx.tracker.most[0]); // XXX add rest of info } - - g_list_free(k); - rwlock_unlock_r(&ml->ssrc_hash->lock); } // --- for monologues.head for (__auto_type l = c->medias.head; l; l = l->next) { diff --git a/daemon/rtcp.c b/daemon/rtcp.c index b5c520476..bc4175fc4 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -1539,7 +1539,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, } void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash, struct call_monologue *ml) { - rwlock_lock_r(&hash->lock); + LOCK(&hash->lock); for (GList *l = hash->nq.head; l; l = l->next) { struct ssrc_entry_call *e = l->data; struct ssrc_ctx *i = &e->input_ctx; @@ -1551,7 +1551,6 @@ void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash, struct call_mono ssrc_ctx_hold(i); g_queue_push_tail(out, i); } - rwlock_unlock_r(&hash->lock); } diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 5e1614d7a..8a469b388 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -37,7 +37,6 @@ static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) { } static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) { ent->ssrc = ssrc; - ent->last_used = rtpe_now.tv_sec; mutex_init(&ent->lock); ent->link.data = ent; } @@ -53,9 +52,7 @@ static struct ssrc_entry *create_ssrc_entry_call(void *uptr) { } static void add_ssrc_entry(uint32_t ssrc, struct ssrc_entry *ent, struct ssrc_hash *ht) { init_ssrc_entry(ent, ssrc); - g_hash_table_replace(ht->nht, GUINT_TO_POINTER(ent->ssrc), ent); - obj_hold(ent); // HT entry - g_queue_push_tail_link(&ht->nq, &ent->link); + g_queue_push_head_link(&ht->nq, &ent->link); obj_hold(ent); // queue entry } static void free_sender_report(struct ssrc_sender_report_item *i) { @@ -187,33 +184,21 @@ static void mos_calc_legacy(struct ssrc_stats_block *ssb) { ssb->mos = mos_from_rx(r / 10); // e5 } -static void *find_ssrc(uint32_t ssrc, struct ssrc_hash *ht) { - rwlock_lock_r(&ht->lock); - struct ssrc_entry *ret = atomic_get_na(&ht->cache); - if (!ret || ret->ssrc != ssrc) { - ret = g_hash_table_lookup(ht->nht, GUINT_TO_POINTER(ssrc)); - if (ret) { +static void *find_ssrc(uint32_t ssrc, struct ssrc_hash *ht, unsigned int *iters) { + LOCK(&ht->lock); + for (GList *l = ht->nq.head; l; l = l->next) { + struct ssrc_entry *ret = l->data; + if (ret->ssrc == ssrc) { obj_hold(ret); - // cache shares the reference from ht - atomic_set_na(&ht->cache, ret); - ret->last_used = rtpe_now.tv_sec; + // move to front + g_queue_unlink(&ht->nq, &ret->link); + g_queue_push_head_link(&ht->nq, &ret->link); + return ret; } } - else { - obj_hold(ret); - ret->last_used = rtpe_now.tv_sec; - } - rwlock_unlock_r(&ht->lock); - return ret; -} - -static int ssrc_time_cmp(const void *aa, const void *bb, void *pp) { - const struct ssrc_entry *a = aa, *b = bb; - if (a->last_used < b->last_used) - return -1; - if (a->last_used > b->last_used) - return 1; - return 0; + if (iters) + *iters = ht->iters; + return NULL; } // returns a new reference @@ -223,63 +208,58 @@ void *get_ssrc_full(uint32_t ssrc, struct ssrc_hash *ht, bool *created) { if (!ht) return NULL; -restart: - ent = find_ssrc(ssrc, ht); - if (G_LIKELY(ent)) { - if (created) - *created = false; - return ent; - } + while (true) { + unsigned int iters; + ent = find_ssrc(ssrc, ht, &iters); + if (G_LIKELY(ent)) { + if (created) + *created = false; + return ent; + } - // use precreated entry if possible - ent = atomic_get_na(&ht->precreat); - while (1) { - if (!ent) - break; // create one ourselves - if (atomic_compare_exchange(&ht->precreat, &ent, NULL)) - break; - // something got in the way - retry - } - if (G_UNLIKELY(!ent)) - ent = ht->create_func(ht->uptr); - if (G_UNLIKELY(!ent)) - return NULL; + // use precreated entry if possible + ent = atomic_get_na(&ht->precreat); + while (1) { + if (!ent) + break; // create one ourselves + if (atomic_compare_exchange(&ht->precreat, &ent, NULL)) + break; + // something got in the way - retry + } + if (G_UNLIKELY(!ent)) + ent = ht->create_func(ht->uptr); + if (G_UNLIKELY(!ent)) + return NULL; + + LOCK(&ht->lock); + + while (G_UNLIKELY(ht->nq.length > MAX_SSRC_ENTRIES)) { + GList *link = g_queue_pop_tail_link(&ht->nq); + struct ssrc_entry *old_ent = link->data; + ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %s%x%s) - " + "deleting SSRC %s%x%s", + FMT_M(ssrc), FMT_M(old_ent->ssrc)); + obj_put(old_ent); // for the queue entry + } - rwlock_lock_w(&ht->lock); - - while (G_UNLIKELY(ht->nq.length > 20)) { // arbitrary limit - g_queue_sort(&ht->nq, ssrc_time_cmp, NULL); - GList *link = g_queue_pop_head_link(&ht->nq); - struct ssrc_entry *old_ent = link->data; - ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %s%x%s) - " - "deleting SSRC %s%x%s", - FMT_M(ssrc), FMT_M(old_ent->ssrc)); - atomic_set(&ht->cache, NULL); - g_hash_table_remove(ht->nht, GUINT_TO_POINTER(old_ent->ssrc)); // does obj_put - obj_put(old_ent); // for the queue entry - } + if (ht->iters != iters) { + // preempted, something else created an entry + // return created entry if slot is still empty + struct ssrc_entry *null_entry = NULL; + if (!atomic_compare_exchange(&ht->precreat, &null_entry, ent)) + obj_put(ent); + continue; + } + add_ssrc_entry(ssrc, ent, ht); + if (created) + *created = true; - if (g_hash_table_lookup(ht->nht, GUINT_TO_POINTER(ssrc))) { - // preempted - rwlock_unlock_w(&ht->lock); - // return created entry if slot is still empty - struct ssrc_entry *null_entry = NULL; - if (!atomic_compare_exchange(&ht->precreat, &null_entry, ent)) - obj_put(ent); - goto restart; + return ent; } - add_ssrc_entry(ssrc, ent, ht); - atomic_set(&ht->cache, ent); - rwlock_unlock_w(&ht->lock); - if (created) - *created = true; - - return ent; } void free_ssrc_hash(struct ssrc_hash **ht) { if (!*ht) return; - g_hash_table_destroy((*ht)->nht); for (GList *l = (*ht)->nq.head; l;) { GList *next = l->next; ssrc_entry_put(l->data); @@ -294,22 +274,19 @@ void ssrc_hash_foreach(struct ssrc_hash *sh, void (*f)(void *, void *), void *pt if (!sh) return; - rwlock_lock_w(&sh->lock); + LOCK(&sh->lock); for (GList *k = sh->nq.head; k; k = k->next) f(k->data, ptr); if (sh->precreat) f(sh->precreat, ptr); - - rwlock_unlock_w(&sh->lock); } struct ssrc_hash *create_ssrc_hash_full_fast(ssrc_create_func_t cfunc, void *uptr) { struct ssrc_hash *ret; ret = g_new0(__typeof(*ret), 1); - ret->nht = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_entry_put); - rwlock_init(&ret->lock); + mutex_init(&ret->lock); ret->create_func = cfunc; ret->uptr = uptr; return ret; @@ -373,7 +350,7 @@ static struct ssrc_entry_call *hunt_ssrc(struct call_media *media, uint32_t ssrc for (__auto_type sub = media->media_subscriptions.head; sub; sub = sub->next) { struct media_subscription * ms = sub->data; - struct ssrc_entry_call *e = find_ssrc(ssrc, ms->monologue->ssrc_hash); + struct ssrc_entry_call *e = find_ssrc(ssrc, ms->monologue->ssrc_hash, NULL); if (e) return e; } @@ -392,7 +369,7 @@ static long long __calc_rtt(struct call_media *m, struct crtt_args a) if (!a.ntp_middle_bits || !a.delay) return 0; - struct ssrc_entry_call *e = find_ssrc(a.ssrc, a.ht); + struct ssrc_entry_call *e = find_ssrc(a.ssrc, a.ht, NULL); if (G_UNLIKELY(!e)) return 0; diff --git a/include/ssrc.h b/include/ssrc.h index 74cf01223..eaa849f86 100644 --- a/include/ssrc.h +++ b/include/ssrc.h @@ -10,6 +10,8 @@ #include "codeclib.h" #include "types.h" +#define MAX_SSRC_ENTRIES 20 + struct call_media; struct timeval; struct ssrc_entry; @@ -19,13 +21,12 @@ enum ssrc_dir; typedef struct ssrc_entry *(*ssrc_create_func_t)(void *uptr); struct ssrc_hash { - GHashTable *nht; GQueue nq; - rwlock_t lock; + mutex_t lock; ssrc_create_func_t create_func; void *uptr; - struct ssrc_entry *cache; // last used entry struct ssrc_entry *precreat; // next used entry + unsigned int iters; // tracks changes }; struct payload_tracker { mutex_t lock; @@ -86,7 +87,6 @@ struct ssrc_entry { GList link; mutex_t lock; uint32_t ssrc; - time_t last_used; }; struct ssrc_entry_call {