MT#55283 overhaul SSRC hash

Remove the actual hash and just keep a linked list. Always move most
recently used entries to the front of the list, which also obsoletes
tracking the last used time stamp.

Change-Id: Id277499228b538dd013a4442e9b5c5a4d247ff15
pull/1126/merge
Richard Fuchs 1 month ago
parent 86aa171fdb
commit 2a141c2f92

@ -4182,14 +4182,13 @@ void call_destroy(call_t *c) {
}
}
k = g_hash_table_get_values(ml->ssrc_hash->nht);
while (k) {
for (k = ml->ssrc_hash->nq.head; k; k = k->next) {
struct ssrc_entry_call *se = k->data;
// stats output only - no cleanups
if (!se->stats_blocks.length || !se->lowest_mos || !se->highest_mos)
goto next_k;
continue;
int mos_samples = (se->stats_blocks.length - se->no_mos_count);
if (mos_samples < 1) mos_samples = 1;
@ -4232,9 +4231,6 @@ void call_destroy(call_t *c) {
se->average_mos.packetloss / mos_samples,
se->lowest_mos->packetloss,
se->highest_mos->packetloss);
next_k:
k = g_list_delete_link(k, k);
}
}

@ -3031,9 +3031,7 @@ static void ng_stats_ssrc_mos_entry_dict_avg(const ng_parser_t *parser, parser_a
}
static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, struct ssrc_hash *ht) {
GList *ll = g_hash_table_get_values(ht->nht);
for (GList *l = ll; l; l = l->next) {
for (GList *l = ht->nq.head; l; l = l->next) {
struct ssrc_entry_call *se = l->data;
char tmp[12];
snprintf(tmp, sizeof(tmp), "%" PRIu32, se->h.ssrc);
@ -3073,8 +3071,6 @@ static void ng_stats_ssrc(const ng_parser_t *parser, parser_arg dict, struct ssr
ng_stats_ssrc_mos_entry(parser, cent, sb);
}
}
g_list_free(ll);
}
/* call must be locked */

@ -2636,11 +2636,10 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) {
}
// SSRC table dump
rwlock_lock_r(&ml->ssrc_hash->lock);
k = g_hash_table_get_values(ml->ssrc_hash->nht);
LOCK(&ml->ssrc_hash->lock);
snprintf(tmp, sizeof(tmp), "ssrc_table-%u", ml->unique_id);
parser_arg list = parser->dict_add_list_dup(root, tmp);
for (GList *m = k; m; m = m->next) {
for (GList *m = ml->ssrc_hash->nq.head; m; m = m->next) {
struct ssrc_entry_call *se = m->data;
inner = parser->list_add_dict(list);
@ -2654,9 +2653,6 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c, void **to_free) {
JSON_SET_SIMPLE("out_payload_type", "%i", se->output_ctx.tracker.most[0]);
// XXX add rest of info
}
g_list_free(k);
rwlock_unlock_r(&ml->ssrc_hash->lock);
} // --- for monologues.head
for (__auto_type l = c->medias.head; l; l = l->next) {

@ -1539,7 +1539,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
}
void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash, struct call_monologue *ml) {
rwlock_lock_r(&hash->lock);
LOCK(&hash->lock);
for (GList *l = hash->nq.head; l; l = l->next) {
struct ssrc_entry_call *e = l->data;
struct ssrc_ctx *i = &e->input_ctx;
@ -1551,7 +1551,6 @@ void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash, struct call_mono
ssrc_ctx_hold(i);
g_queue_push_tail(out, i);
}
rwlock_unlock_r(&hash->lock);
}

@ -37,7 +37,6 @@ static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) {
}
static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) {
ent->ssrc = ssrc;
ent->last_used = rtpe_now.tv_sec;
mutex_init(&ent->lock);
ent->link.data = ent;
}
@ -53,9 +52,7 @@ static struct ssrc_entry *create_ssrc_entry_call(void *uptr) {
}
static void add_ssrc_entry(uint32_t ssrc, struct ssrc_entry *ent, struct ssrc_hash *ht) {
init_ssrc_entry(ent, ssrc);
g_hash_table_replace(ht->nht, GUINT_TO_POINTER(ent->ssrc), ent);
obj_hold(ent); // HT entry
g_queue_push_tail_link(&ht->nq, &ent->link);
g_queue_push_head_link(&ht->nq, &ent->link);
obj_hold(ent); // queue entry
}
static void free_sender_report(struct ssrc_sender_report_item *i) {
@ -187,33 +184,21 @@ static void mos_calc_legacy(struct ssrc_stats_block *ssb) {
ssb->mos = mos_from_rx(r / 10); // e5
}
static void *find_ssrc(uint32_t ssrc, struct ssrc_hash *ht) {
rwlock_lock_r(&ht->lock);
struct ssrc_entry *ret = atomic_get_na(&ht->cache);
if (!ret || ret->ssrc != ssrc) {
ret = g_hash_table_lookup(ht->nht, GUINT_TO_POINTER(ssrc));
if (ret) {
static void *find_ssrc(uint32_t ssrc, struct ssrc_hash *ht, unsigned int *iters) {
LOCK(&ht->lock);
for (GList *l = ht->nq.head; l; l = l->next) {
struct ssrc_entry *ret = l->data;
if (ret->ssrc == ssrc) {
obj_hold(ret);
// cache shares the reference from ht
atomic_set_na(&ht->cache, ret);
ret->last_used = rtpe_now.tv_sec;
// move to front
g_queue_unlink(&ht->nq, &ret->link);
g_queue_push_head_link(&ht->nq, &ret->link);
return ret;
}
}
else {
obj_hold(ret);
ret->last_used = rtpe_now.tv_sec;
}
rwlock_unlock_r(&ht->lock);
return ret;
}
static int ssrc_time_cmp(const void *aa, const void *bb, void *pp) {
const struct ssrc_entry *a = aa, *b = bb;
if (a->last_used < b->last_used)
return -1;
if (a->last_used > b->last_used)
return 1;
return 0;
if (iters)
*iters = ht->iters;
return NULL;
}
// returns a new reference
@ -223,63 +208,58 @@ void *get_ssrc_full(uint32_t ssrc, struct ssrc_hash *ht, bool *created) {
if (!ht)
return NULL;
restart:
ent = find_ssrc(ssrc, ht);
if (G_LIKELY(ent)) {
if (created)
*created = false;
return ent;
}
while (true) {
unsigned int iters;
ent = find_ssrc(ssrc, ht, &iters);
if (G_LIKELY(ent)) {
if (created)
*created = false;
return ent;
}
// use precreated entry if possible
ent = atomic_get_na(&ht->precreat);
while (1) {
if (!ent)
break; // create one ourselves
if (atomic_compare_exchange(&ht->precreat, &ent, NULL))
break;
// something got in the way - retry
}
if (G_UNLIKELY(!ent))
ent = ht->create_func(ht->uptr);
if (G_UNLIKELY(!ent))
return NULL;
// use precreated entry if possible
ent = atomic_get_na(&ht->precreat);
while (1) {
if (!ent)
break; // create one ourselves
if (atomic_compare_exchange(&ht->precreat, &ent, NULL))
break;
// something got in the way - retry
}
if (G_UNLIKELY(!ent))
ent = ht->create_func(ht->uptr);
if (G_UNLIKELY(!ent))
return NULL;
LOCK(&ht->lock);
while (G_UNLIKELY(ht->nq.length > MAX_SSRC_ENTRIES)) {
GList *link = g_queue_pop_tail_link(&ht->nq);
struct ssrc_entry *old_ent = link->data;
ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %s%x%s) - "
"deleting SSRC %s%x%s",
FMT_M(ssrc), FMT_M(old_ent->ssrc));
obj_put(old_ent); // for the queue entry
}
rwlock_lock_w(&ht->lock);
while (G_UNLIKELY(ht->nq.length > 20)) { // arbitrary limit
g_queue_sort(&ht->nq, ssrc_time_cmp, NULL);
GList *link = g_queue_pop_head_link(&ht->nq);
struct ssrc_entry *old_ent = link->data;
ilog(LOG_DEBUG, "SSRC hash table exceeded size limit (trying to add %s%x%s) - "
"deleting SSRC %s%x%s",
FMT_M(ssrc), FMT_M(old_ent->ssrc));
atomic_set(&ht->cache, NULL);
g_hash_table_remove(ht->nht, GUINT_TO_POINTER(old_ent->ssrc)); // does obj_put
obj_put(old_ent); // for the queue entry
}
if (ht->iters != iters) {
// preempted, something else created an entry
// return created entry if slot is still empty
struct ssrc_entry *null_entry = NULL;
if (!atomic_compare_exchange(&ht->precreat, &null_entry, ent))
obj_put(ent);
continue;
}
add_ssrc_entry(ssrc, ent, ht);
if (created)
*created = true;
if (g_hash_table_lookup(ht->nht, GUINT_TO_POINTER(ssrc))) {
// preempted
rwlock_unlock_w(&ht->lock);
// return created entry if slot is still empty
struct ssrc_entry *null_entry = NULL;
if (!atomic_compare_exchange(&ht->precreat, &null_entry, ent))
obj_put(ent);
goto restart;
return ent;
}
add_ssrc_entry(ssrc, ent, ht);
atomic_set(&ht->cache, ent);
rwlock_unlock_w(&ht->lock);
if (created)
*created = true;
return ent;
}
void free_ssrc_hash(struct ssrc_hash **ht) {
if (!*ht)
return;
g_hash_table_destroy((*ht)->nht);
for (GList *l = (*ht)->nq.head; l;) {
GList *next = l->next;
ssrc_entry_put(l->data);
@ -294,22 +274,19 @@ void ssrc_hash_foreach(struct ssrc_hash *sh, void (*f)(void *, void *), void *pt
if (!sh)
return;
rwlock_lock_w(&sh->lock);
LOCK(&sh->lock);
for (GList *k = sh->nq.head; k; k = k->next)
f(k->data, ptr);
if (sh->precreat)
f(sh->precreat, ptr);
rwlock_unlock_w(&sh->lock);
}
struct ssrc_hash *create_ssrc_hash_full_fast(ssrc_create_func_t cfunc, void *uptr) {
struct ssrc_hash *ret;
ret = g_new0(__typeof(*ret), 1);
ret->nht = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_entry_put);
rwlock_init(&ret->lock);
mutex_init(&ret->lock);
ret->create_func = cfunc;
ret->uptr = uptr;
return ret;
@ -373,7 +350,7 @@ static struct ssrc_entry_call *hunt_ssrc(struct call_media *media, uint32_t ssrc
for (__auto_type sub = media->media_subscriptions.head; sub; sub = sub->next)
{
struct media_subscription * ms = sub->data;
struct ssrc_entry_call *e = find_ssrc(ssrc, ms->monologue->ssrc_hash);
struct ssrc_entry_call *e = find_ssrc(ssrc, ms->monologue->ssrc_hash, NULL);
if (e)
return e;
}
@ -392,7 +369,7 @@ static long long __calc_rtt(struct call_media *m, struct crtt_args a)
if (!a.ntp_middle_bits || !a.delay)
return 0;
struct ssrc_entry_call *e = find_ssrc(a.ssrc, a.ht);
struct ssrc_entry_call *e = find_ssrc(a.ssrc, a.ht, NULL);
if (G_UNLIKELY(!e))
return 0;

@ -10,6 +10,8 @@
#include "codeclib.h"
#include "types.h"
#define MAX_SSRC_ENTRIES 20
struct call_media;
struct timeval;
struct ssrc_entry;
@ -19,13 +21,12 @@ enum ssrc_dir;
typedef struct ssrc_entry *(*ssrc_create_func_t)(void *uptr);
struct ssrc_hash {
GHashTable *nht;
GQueue nq;
rwlock_t lock;
mutex_t lock;
ssrc_create_func_t create_func;
void *uptr;
struct ssrc_entry *cache; // last used entry
struct ssrc_entry *precreat; // next used entry
unsigned int iters; // tracks changes
};
struct payload_tracker {
mutex_t lock;
@ -86,7 +87,6 @@ struct ssrc_entry {
GList link;
mutex_t lock;
uint32_t ssrc;
time_t last_used;
};
struct ssrc_entry_call {

Loading…
Cancel
Save