making timers thread-safe

remotes/origin/2.1
Richard Fuchs 13 years ago
parent 4b74e6f476
commit 7dc00bd258

@ -612,7 +612,7 @@ struct callmaster *callmaster_new(struct poller *p) {
c->poller = p;
rwlock_init(&c->lock);
poller_timer(p, callmaster_timer, &c->obj);
poller_add_timer(p, callmaster_timer, &c->obj);
obj_put(c);
return c;

@ -36,7 +36,12 @@ struct poller {
mutex_t lock;
struct poller_item_int **items;
unsigned int items_size;
GList *timers;
mutex_t timers_lock;
GSList *timers;
mutex_t timers_add_del_lock; /* nested below timers_lock */
GSList *timers_add;
GSList *timers_del;
time_t now;
};
@ -57,6 +62,8 @@ struct poller *poller_new(void) {
if (p->fd == -1)
abort();
mutex_init(&p->lock);
mutex_init(&p->timers_lock);
mutex_init(&p->timers_add_del_lock);
return p;
}
@ -127,7 +134,7 @@ static int __poller_add_item(struct poller *p, struct poller_item *i, int has_lo
mutex_unlock(&p->lock);
if (i->timer)
poller_timer(p, poller_fd_timer, &ip->obj);
poller_add_timer(p, poller_fd_timer, &ip->obj);
obj_put(ip);
@ -148,20 +155,8 @@ int poller_add_item(struct poller *p, struct poller_item *i) {
}
int poller_find_timer(gconstpointer a, gconstpointer b) {
const struct timer_item *it = a;
const struct obj *x = b;
if (it->obj_ptr == x)
return 0;
return 1;
}
int poller_del_item(struct poller *p, int fd) {
struct poller_item_int *it;
GList *l;
struct timer_item *ti;
if (!p || fd < 0)
return -1;
@ -180,18 +175,8 @@ int poller_del_item(struct poller *p, int fd) {
mutex_unlock(&p->lock);
if (it->item.timer) {
while (1) {
/* rare but possible race with poller_add_item above */
l = g_list_find_custom(p->timers, &it->obj, poller_find_timer);
if (l)
break;
}
p->timers = g_list_remove_link(p->timers, l);
ti = l->data;
obj_put(ti);
g_list_free_1(l);
}
if (it->item.timer)
poller_del_timer(p, poller_fd_timer, &it->obj);
obj_put(it);
@ -235,12 +220,75 @@ int poller_update_item(struct poller *p, struct poller_item *i) {
}
/* timers_lock and timers_add_del_lock must be held */
static void poller_timers_mod(struct poller *p) {
GSList *l, **ll, **kk;
struct timer_item *ti, *tj;
ll = &p->timers_add;
while (*ll) {
l = *ll;
*ll = l->next;
l->next = p->timers;
p->timers = l;
}
ll = &p->timers_del;
while (*ll) {
ti = (*ll)->data;
kk = &p->timers;
while (*kk) {
tj = (*kk)->data;
if (tj->func != ti->func)
goto next;
if (tj->obj_ptr != ti->obj_ptr)
goto next;
goto found;
next:
kk = &(*kk)->next;
}
/* deleted a timer that wasn't added yet. possible race, otherwise bug */
ll = &(*ll)->next;
continue;
found:
l = *ll;
*ll = (*ll)->next;
obj_put(l->data);
g_slist_free_1(l);
l = *kk;
*kk = (*kk)->next;
obj_put(l->data);
g_slist_free_1(l);
}
}
static void poller_timers_run(struct poller *p) {
GSList *l;
struct timer_item *ti;
mutex_lock(&p->timers_lock);
mutex_lock(&p->timers_add_del_lock);
poller_timers_mod(p);
mutex_unlock(&p->timers_add_del_lock);
for (l = p->timers; l; l = l->next) {
ti = l->data;
ti->func(ti->obj_ptr);
}
mutex_lock(&p->timers_add_del_lock);
poller_timers_mod(p);
mutex_unlock(&p->timers_add_del_lock);
mutex_unlock(&p->timers_lock);
}
int poller_poll(struct poller *p, int timeout) {
int ret, i;
struct poller_item_int *it;
time_t last;
GList *li, *ne;
struct timer_item *ti;
struct epoll_event evs[128], *ev, e;
if (!p)
@ -255,16 +303,10 @@ int poller_poll(struct poller *p, int timeout) {
last = p->now;
p->now = time(NULL);
if (last != p->now) {
for (li = p->timers; li; li = ne) {
ne = li->next;
ti = li->data;
/* XXX not safe */
mutex_unlock(&p->lock);
ti->func(ti->obj_ptr);
mutex_lock(&p->lock);
}
mutex_unlock(&p->lock);
poller_timers_run(p);
ret = p->items_size;
goto out;
goto out_lock;
}
mutex_unlock(&p->lock);
@ -325,6 +367,7 @@ next:
out:
mutex_unlock(&p->lock);
out_lock:
return ret;
}
@ -406,7 +449,7 @@ static void timer_item_free(void *p) {
obj_put(i->obj_ptr);
}
int poller_timer(struct poller *p, void (*f)(void *), struct obj *o) {
static int poller_timer_link(struct poller *p, GSList **lp, void (*f)(void *), struct obj *o) {
struct timer_item *i;
if (!o || !f)
@ -417,11 +460,27 @@ int poller_timer(struct poller *p, void (*f)(void *), struct obj *o) {
i->func = f;
i->obj_ptr = obj_hold(o);
p->timers = g_list_prepend(p->timers, i);
mutex_lock(&p->timers_add_del_lock);
*lp = g_slist_prepend(*lp, i);
if (mutex_trylock(&p->timers_lock)) {
poller_timers_mod(p);
mutex_unlock(&p->timers_lock);
}
mutex_unlock(&p->timers_add_del_lock);
return 0;
}
int poller_del_timer(struct poller *p, void (*f)(void *), struct obj *o) {
return poller_timer_link(p, &p->timers_del, f, o);
}
int poller_add_timer(struct poller *p, void (*f)(void *), struct obj *o) {
return poller_timer_link(p, &p->timers_add, f, o);
}
time_t poller_now(struct poller *p) {
return p->now;
}

@ -37,7 +37,8 @@ int poller_isblocked(struct poller *, int);
void poller_error(struct poller *, int);
time_t poller_now(struct poller *);
int poller_timer(struct poller *, void (*)(void *), struct obj *);
int poller_add_timer(struct poller *, void (*)(void *), struct obj *);
int poller_del_timer(struct poller *, void (*)(void *), struct obj *);
#endif

Loading…
Cancel
Save