|
|
|
|
@ -108,8 +108,10 @@ void timerthread_queue_run(void *ptr) {
|
|
|
|
|
|
|
|
|
|
mutex_lock(&ttq->lock);
|
|
|
|
|
|
|
|
|
|
while (ttq->entries.length) {
|
|
|
|
|
struct timerthread_queue_entry *ttqe = g_queue_pop_head(&ttq->entries);
|
|
|
|
|
while (g_tree_nnodes(ttq->entries)) {
|
|
|
|
|
struct timerthread_queue_entry *ttqe = g_tree_find_first(ttq->entries, NULL, NULL);
|
|
|
|
|
assert(ttqe != NULL);
|
|
|
|
|
g_tree_remove(ttq->entries, ttqe);
|
|
|
|
|
|
|
|
|
|
mutex_unlock(&ttq->lock);
|
|
|
|
|
|
|
|
|
|
@ -120,8 +122,7 @@ void timerthread_queue_run(void *ptr) {
|
|
|
|
|
if (!ret)
|
|
|
|
|
continue;
|
|
|
|
|
// couldn't send the last one. remember time to schedule
|
|
|
|
|
g_queue_push_head(&ttq->entries, ttqe);
|
|
|
|
|
// XXX sort queue?
|
|
|
|
|
g_tree_insert(ttq->entries, ttqe, ttqe);
|
|
|
|
|
next_send = ttqe->when;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@ -129,17 +130,30 @@ void timerthread_queue_run(void *ptr) {
|
|
|
|
|
mutex_unlock(&ttq->lock);
|
|
|
|
|
|
|
|
|
|
if (next_send.tv_sec)
|
|
|
|
|
timerthread_obj_schedule_abs(&ttq->tt_obj, &next_send); // XXX does this work if already scheduled earlier?
|
|
|
|
|
timerthread_obj_schedule_abs(&ttq->tt_obj, &next_send);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int ttqe_free_all(void *k, void *v, void *d) {
|
|
|
|
|
struct timerthread_queue *ttq = d;
|
|
|
|
|
ttq->entry_free_func(k);
|
|
|
|
|
return FALSE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void __timerthread_queue_free(void *p) {
|
|
|
|
|
struct timerthread_queue *ttq = p;
|
|
|
|
|
g_queue_clear_full(&ttq->entries, ttq->entry_free_func);
|
|
|
|
|
g_tree_foreach(ttq->entries, ttqe_free_all, ttq);
|
|
|
|
|
g_tree_destroy(ttq->entries);
|
|
|
|
|
mutex_destroy(&ttq->lock);
|
|
|
|
|
if (ttq->free_func)
|
|
|
|
|
ttq->free_func(p);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int ttqe_compare(const void *a, const void *b) {
|
|
|
|
|
const struct timerthread_queue_entry *t1 = a;
|
|
|
|
|
const struct timerthread_queue_entry *t2 = b;
|
|
|
|
|
return timeval_cmp_ptr(&t1->when, &t2->when);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void *timerthread_queue_new(const char *type, size_t size,
|
|
|
|
|
struct timerthread *tt,
|
|
|
|
|
void (*run_now_func)(struct timerthread_queue *, void *),
|
|
|
|
|
@ -158,7 +172,7 @@ void *timerthread_queue_new(const char *type, size_t size,
|
|
|
|
|
ttq->free_func = free_func;
|
|
|
|
|
ttq->entry_free_func = entry_free_func;
|
|
|
|
|
mutex_init(&ttq->lock);
|
|
|
|
|
g_queue_init(&ttq->entries);
|
|
|
|
|
ttq->entries = g_tree_new(ttqe_compare);
|
|
|
|
|
return ttq;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -183,34 +197,33 @@ void timerthread_queue_push(struct timerthread_queue *ttq, struct timerthread_qu
|
|
|
|
|
// ntohl(rh->timestamp));
|
|
|
|
|
|
|
|
|
|
mutex_lock(&ttq->lock);
|
|
|
|
|
unsigned int qlen = ttq->entries.length;
|
|
|
|
|
// this hands over ownership of cp, so we must copy the timeval out
|
|
|
|
|
struct timeval tv_send = ttqe->when;
|
|
|
|
|
g_queue_push_tail(&ttq->entries, ttqe);
|
|
|
|
|
g_tree_insert(ttq->entries, ttqe, ttqe);
|
|
|
|
|
struct timerthread_queue_entry *first_ttqe = g_tree_find_first(ttq->entries, NULL, NULL);
|
|
|
|
|
mutex_unlock(&ttq->lock);
|
|
|
|
|
|
|
|
|
|
// first packet in? we're probably not scheduled yet
|
|
|
|
|
if (!qlen)
|
|
|
|
|
if (first_ttqe == ttqe)
|
|
|
|
|
timerthread_obj_schedule_abs(&ttq->tt_obj, &tv_send);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int ttqe_ptr_match(const void *ent, const void *ptr) {
|
|
|
|
|
const struct timerthread_queue_entry *ttqe = ent;
|
|
|
|
|
return ttqe->source == ptr;
|
|
|
|
|
}
|
|
|
|
|
unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) {
|
|
|
|
|
if (!ttq)
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
unsigned int num = 0;
|
|
|
|
|
GList *l = ttq->entries.head;
|
|
|
|
|
while (l) {
|
|
|
|
|
GList *next = l->next;
|
|
|
|
|
struct timerthread_queue_entry *ttqe = l->data;
|
|
|
|
|
if (ttqe->source != ptr)
|
|
|
|
|
goto next;
|
|
|
|
|
g_queue_delete_link(&ttq->entries, l);
|
|
|
|
|
GQueue matches = G_QUEUE_INIT;
|
|
|
|
|
g_tree_find_all(&matches, ttq->entries, ttqe_ptr_match, ptr);
|
|
|
|
|
|
|
|
|
|
while (matches.length) {
|
|
|
|
|
struct timerthread_queue_entry *ttqe = g_queue_pop_head(&matches);
|
|
|
|
|
ttq->entry_free_func(ttqe);
|
|
|
|
|
num++;
|
|
|
|
|
|
|
|
|
|
next:
|
|
|
|
|
l = next;
|
|
|
|
|
}
|
|
|
|
|
return num;
|
|
|
|
|
}
|
|
|
|
|
|