MT#55283 use one timerthread context per thread

This should prevent multiple threads from fighting over the same shared
structures (GTree + mutex + condition). The downside is that load may not
be perfectly balanced across the threads.

Change-Id: I8127b98dcfbeafd692d74e60cdf6d60e3e572ba7
Branch: pull/1802/head
Author: Richard Fuchs
Parent: da7b8f30ff
Commit: bc504e2aef
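
For context, the pattern this commit moves to can be shown with a small standalone program: each worker thread owns its own lock, condition variable and pending-work container, and producers touch only the state of the one worker they picked, so threads never contend on a single shared structure. The sketch below is not rtpengine code; it uses pthreads and a plain array where the real code uses GLib and a GTree, and all names (worker_ctx, pick_worker, ...) are illustrative.

/* Minimal sketch: one context per worker thread instead of one shared one. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NUM_THREADS 4
#define MAX_ITEMS 64

struct worker_ctx {                 /* analogous to struct timerthread_thread */
	pthread_mutex_t lock;       /* protects only this thread's items/count */
	pthread_cond_t cond;
	int items[MAX_ITEMS];
	int count;
};

static struct worker_ctx workers[NUM_THREADS];
static atomic_uint next_idx;        /* analogous to timerthread.thread_idx */

/* round-robin pick of a per-thread context */
static struct worker_ctx *pick_worker(void) {
	unsigned int idx = atomic_fetch_add(&next_idx, 1);
	return &workers[idx % NUM_THREADS];
}

static void *worker_run(void *p) {
	struct worker_ctx *w = p;
	pthread_mutex_lock(&w->lock);
	while (w->count == 0)
		pthread_cond_wait(&w->cond, &w->lock);
	printf("worker got %d item(s)\n", w->count);
	pthread_mutex_unlock(&w->lock);
	return NULL;
}

int main(void) {
	pthread_t th[NUM_THREADS];
	for (int i = 0; i < NUM_THREADS; i++) {
		pthread_mutex_init(&workers[i].lock, NULL);
		pthread_cond_init(&workers[i].cond, NULL);
		pthread_create(&th[i], NULL, worker_run, &workers[i]);
	}
	/* each scheduled item goes to exactly one per-thread context */
	for (int i = 0; i < NUM_THREADS; i++) {
		struct worker_ctx *w = pick_worker();
		pthread_mutex_lock(&w->lock);
		w->items[w->count++] = i;
		pthread_cond_signal(&w->cond);
		pthread_mutex_unlock(&w->lock);
	}
	for (int i = 0; i < NUM_THREADS; i++)
		pthread_join(th[i], NULL);
	return 0;
}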

@@ -439,6 +439,7 @@ static struct ice_agent *__ice_agent_new(struct call_media *media) {
 	ag = obj_alloc0("ice_agent", sizeof(*ag), __ice_agent_free);
 	ag->tt_obj.tt = &ice_agents_timer_thread;
+	ag->tt_obj.thread = &ice_agents_timer_thread.threads[0]; // there's only one thread
 	ag->call = obj_get(call);
 	ag->media = media;
 	mutex_init(&ag->lock);
@@ -732,7 +733,9 @@ static void __agent_schedule_abs(struct ice_agent *ag, const struct timeval *tv)
 	nxt = *tv;
-	mutex_lock(&ice_agents_timer_thread.lock);
+	struct timerthread_thread *tt = ag->tt_obj.thread;
+	mutex_lock(&tt->lock);
 	if (ag->tt_obj.last_run.tv_sec) {
 		/* make sure we don't run more often than we should */
 		diff = timeval_diff(&nxt, &ag->tt_obj.last_run);
@@ -740,7 +743,7 @@ static void __agent_schedule_abs(struct ice_agent *ag, const struct timeval *tv)
 		timeval_add_usec(&nxt, TIMER_RUN_INTERVAL * 1000 - diff);
 	}
 	timerthread_obj_schedule_abs_nl(&ag->tt_obj, &nxt);
-	mutex_unlock(&ice_agents_timer_thread.lock);
+	mutex_unlock(&tt->lock);
 }
 static void __agent_deschedule(struct ice_agent *ag) {
 	if (ag)

@@ -9,12 +9,19 @@ static int tt_obj_cmp(const void *a, const void *b) {
 	return timeval_cmp_ptr(&A->next_check, &B->next_check);
 }
 
-void timerthread_init(struct timerthread *tt, unsigned int num, void (*func)(void *)) {
+static void timerthread_thread_init(struct timerthread_thread *tt, struct timerthread *parent) {
 	tt->tree = g_tree_new(tt_obj_cmp);
 	mutex_init(&tt->lock);
 	cond_init(&tt->cond);
+	tt->parent = parent;
+}
+
+void timerthread_init(struct timerthread *tt, unsigned int num, void (*func)(void *)) {
 	tt->func = func;
 	tt->num_threads = num;
+	tt->threads = g_malloc(sizeof(*tt->threads) * num);
+	for (unsigned int i = 0; i < num; i++)
+		timerthread_thread_init(&tt->threads[i], tt);
 }
 
 static int __tt_put_all(void *k, void *d, void *p) {
@@ -24,14 +31,21 @@ static int __tt_put_all(void *k, void *d, void *p) {
 	return FALSE;
 }
 
-void timerthread_free(struct timerthread *tt) {
+static void timerthread_thread_destroy(struct timerthread_thread *tt) {
 	g_tree_foreach(tt->tree, __tt_put_all, tt);
 	g_tree_destroy(tt->tree);
 	mutex_destroy(&tt->lock);
 }
+
+void timerthread_free(struct timerthread *tt) {
+	for (unsigned int i = 0; i < tt->num_threads; i++)
+		timerthread_thread_destroy(&tt->threads[i]);
+	g_free(tt->threads);
+}
 
 static void timerthread_run(void *p) {
-	struct timerthread *tt = p;
+	struct timerthread_thread *tt = p;
+	struct timerthread *parent = tt->parent;
 	struct thread_waker waker = { .lock = &tt->lock, .cond = &tt->cond };
 	thread_waker_add(&waker);
@@ -61,7 +75,7 @@ static void timerthread_run(void *p) {
 		mutex_unlock(&tt->lock);
 
 		// run and release
-		tt->func(tt_obj);
+		parent->func(tt_obj);
 		obj_put(tt_obj);
 
 		log_info_reset();
@@ -83,17 +97,17 @@ sleep:;
 void timerthread_launch(struct timerthread *tt, const char *scheduler, int prio, const char *name) {
 	for (unsigned int i = 0; i < tt->num_threads; i++)
-		thread_create_detach_prio(timerthread_run, tt, scheduler, prio, name);
+		thread_create_detach_prio(timerthread_run, &tt->threads[i], scheduler, prio, name);
 }
 
 void timerthread_obj_schedule_abs_nl(struct timerthread_obj *tt_obj, const struct timeval *tv) {
 	if (!tt_obj)
 		return;
+	struct timerthread_thread *tt = tt_obj->thread;
 	//ilog(LOG_DEBUG, "scheduling timer object at %llu.%06lu", (unsigned long long) tv->tv_sec,
 			//(unsigned long) tv->tv_usec);
-	struct timerthread *tt = tt_obj->tt;
 	if (tt_obj->next_check.tv_sec && timeval_cmp(&tt_obj->next_check, tv) <= 0)
 		return; /* already scheduled sooner */
 	if (!g_tree_remove(tt->tree, tt_obj))
@@ -107,11 +121,14 @@ void timerthread_obj_deschedule(struct timerthread_obj *tt_obj) {
 	if (!tt_obj)
 		return;
-	struct timerthread *tt = tt_obj->tt;
+	struct timerthread_thread *tt = tt_obj->thread;
+	if (!tt)
+		return;
 	mutex_lock(&tt->lock);
 	if (!tt_obj->next_check.tv_sec)
 		goto nope; /* already descheduled */
-	int ret = g_tree_remove(tt->tree, tt_obj);
+	gboolean ret = g_tree_remove(tt->tree, tt_obj);
 	ZERO(tt_obj->next_check);
 	if (ret)
 		obj_put(tt_obj);

@@ -7,11 +7,19 @@
 #include "auxlib.h"
 #include "obj.h"
 
-struct timerthread {
-	unsigned int num_threads;
-	GTree *tree;
+struct timerthread;
+
+struct timerthread_thread {
+	struct timerthread *parent;
+	GTree *tree; // XXX investigate other structures
 	mutex_t lock;
 	cond_t cond;
+};
+
+struct timerthread {
+	unsigned int num_threads;
+	struct timerthread_thread *threads;
+	unsigned int thread_idx;
 	void (*func)(void *);
 };
@@ -19,6 +27,7 @@ struct timerthread_obj {
 	struct obj obj;
 	struct timerthread *tt;
+	struct timerthread_thread *thread; // set once and then static
 	struct timeval next_check; /* protected by ->lock */
 	struct timeval last_run; /* ditto */
 };
@@ -62,12 +71,24 @@ void timerthread_queue_flush_data(void *ptr);
 void timerthread_queue_push(struct timerthread_queue *, struct timerthread_queue_entry *);
 unsigned int timerthread_queue_flush(struct timerthread_queue *, void *);
 
+INLINE struct timerthread_thread *timerthread_get_next(struct timerthread *tt) {
+	unsigned int idx = g_atomic_int_add(&tt->thread_idx, 1);
+	idx = idx % tt->num_threads; // XXX check perf without %
+	return &tt->threads[idx];
+}
+
 INLINE void timerthread_obj_schedule_abs(struct timerthread_obj *tt_obj, const struct timeval *tv) {
 	if (!tt_obj)
 		return;
-	mutex_lock(&tt_obj->tt->lock);
+	struct timerthread_thread *tt = tt_obj->thread;
+	if (!tt) {
+		tt = timerthread_get_next(tt_obj->tt);
+		g_atomic_pointer_compare_and_exchange(&tt_obj->thread, NULL, tt);
+	}
+	tt = tt_obj->thread;
+	mutex_lock(&tt->lock);
 	timerthread_obj_schedule_abs_nl(tt_obj, tv);
-	mutex_unlock(&tt_obj->tt->lock);
+	mutex_unlock(&tt->lock);
 }
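
The new timerthread_get_next()/timerthread_obj_schedule_abs() pair assigns each object to a thread lazily: an atomic counter picks a thread round-robin, and a pointer compare-and-exchange publishes the choice so that only the first scheduler wins and the object stays pinned to that thread's context afterwards. Below is a standalone sketch of that assignment logic, using C11 atomics in place of GLib's g_atomic_* helpers; the names (thread_ctx, assign_thread, ...) are illustrative, not part of the patch.

/* Sketch of lazy round-robin thread assignment with a CAS-published pointer. */
#include <stdatomic.h>
#include <stddef.h>
#include <stdio.h>

#define NUM_THREADS 4

struct thread_ctx { int id; };

static struct thread_ctx threads[NUM_THREADS] = { {0}, {1}, {2}, {3} };
static atomic_uint thread_idx;              /* round-robin counter */

struct timer_obj {
	struct thread_ctx *_Atomic thread;  /* NULL until first scheduled */
};

static struct thread_ctx *get_next_thread(void) {
	unsigned int idx = atomic_fetch_add(&thread_idx, 1);
	return &threads[idx % NUM_THREADS];
}

/* returns the per-thread context the object is (now) pinned to */
static struct thread_ctx *assign_thread(struct timer_obj *o) {
	struct thread_ctx *t = atomic_load(&o->thread);
	if (!t) {
		struct thread_ctx *expected = NULL;
		t = get_next_thread();
		/* only the first scheduler wins; losers adopt the winner's pick */
		if (!atomic_compare_exchange_strong(&o->thread, &expected, t))
			t = expected;
	}
	return t;
}

int main(void) {
	struct timer_obj a = { NULL }, b = { NULL };
	printf("a -> thread %d\n", assign_thread(&a)->id);
	printf("a -> thread %d (unchanged)\n", assign_thread(&a)->id);
	printf("b -> thread %d\n", assign_thread(&b)->id);
	return 0;
}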
