From 26824993e2c4ace2413f6b70dfaba9b20fc6322c Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 31 Jul 2012 21:34:30 +0000 Subject: [PATCH] preliminary attempt to make the udp controller thread-safe --- daemon/aux.h | 12 ++++++++++++ daemon/control_udp.c | 26 +++++++++++++++++++++++--- daemon/control_udp.h | 4 ++++ daemon/poller.c | 1 + 4 files changed, 40 insertions(+), 3 deletions(-) diff --git a/daemon/aux.h b/daemon/aux.h index 2e71e89d1..d66cce035 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -150,6 +150,7 @@ static inline int smart_pton(int af, char *src, void *dst) { typedef GStaticMutex mutex_t; typedef GStaticRWLock rwlock_t; +typedef GCond *cond_t; #define mutex_init(m) g_static_mutex_init(m) #define mutex_lock(m) g_static_mutex_lock(m) @@ -162,10 +163,16 @@ typedef GStaticRWLock rwlock_t; #define rwlock_lock_w(l) g_static_rw_lock_writer_lock(l) #define rwlock_unlock_w(l) g_static_rw_lock_writer_unlock(l) +#define cond_init(c) *(c) = g_cond_new() +#define cond_wait(c,m) g_cond_wait(*(c),m) +#define cond_signal(c) g_cond_signal(*(c)) +#define cond_broadcast(c) g_cond_broadcast(*(c)) + #else typedef GMutex mutex_t; typedef GRWLock rwlock_t; +typedef GCond cond_t; #define mutex_init(m) g_mutex_init(m) #define mutex_lock(m) g_mutex_lock(m) @@ -178,6 +185,11 @@ typedef GRWLock rwlock_t; #define rwlock_lock_w(l) g_rw_lock_writer_lock(l) #define rwlock_unlock_w(l) g_rw_lock_writer_unlock(l) +#define cond_init(c) g_cond_init(c) +#define cond_wait(c,m) g_cond_wait(c,m) +#define cond_signal(c) g_cond_signal(c) +#define cond_broadcast(c) g_cond_broadcast(c) + #endif diff --git a/daemon/control_udp.c b/daemon/control_udp.c index 6f6d5b3b5..3e1a5b1b5 100644 --- a/daemon/control_udp.c +++ b/daemon/control_udp.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "control_udp.h" #include "poller.h" @@ -15,6 +16,9 @@ #include "call.h" +static const char *cookie_in_use = "MAGIC"; + + static void control_udp_closed(int fd, void *p, uintptr_t x) { abort(); } @@ -34,8 +38,9 @@ static void control_udp_incoming(int fd, void *p, uintptr_t x) { sin_len = sizeof(sin); len = recvfrom(fd, buf, sizeof(buf) - 1, 0, (struct sockaddr *) &sin, &sin_len); - if (len <= 0) { - mylog(LOG_WARNING, "Error reading from UDP socket"); + if (len < 0) { + if (errno != EWOULDBLOCK) + mylog(LOG_WARNING, "Error reading from UDP socket"); return; } @@ -87,6 +92,7 @@ static void control_udp_incoming(int fd, void *p, uintptr_t x) { pcre_get_substring_list(buf, ovec, ret, &out); + mutex_lock(&u->lock); if (poller_now(u->poller) - u->oven_time >= 30) { g_hash_table_remove_all(u->stale_cookies); #if GLIB_CHECK_VERSION(2,14,0) @@ -101,16 +107,26 @@ static void control_udp_incoming(int fd, void *p, uintptr_t x) { u->oven_time = poller_now(u->poller); /* baked new cookies! */ } +restart: /* XXX better hashing */ reply = g_hash_table_lookup(u->fresh_cookies, out[RE_UDP_COOKIE]); if (!reply) reply = g_hash_table_lookup(u->stale_cookies, out[RE_UDP_COOKIE]); if (reply) { + if (reply == cookie_in_use) { + /* another thread is working on this right now */ + cond_wait(&u->cond, &u->lock); + goto restart; + } + mutex_unlock(&u->lock); mylog(LOG_INFO, "Detected command from udp:%s:%u as a duplicate", addr, ntohs(sin.sin6_port)); sendto(fd, reply, strlen(reply), 0, (struct sockaddr *) &sin, sin_len); goto out; } + g_hash_table_replace(u->fresh_cookies, (void *) out[RE_UDP_COOKIE], (void *) cookie_in_use); + mutex_unlock(&u->lock); + if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'U') reply = call_update_udp(out, u->callmaster); else if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'L') @@ -151,8 +167,11 @@ static void control_udp_incoming(int fd, void *p, uintptr_t x) { if (reply) { sendto(fd, reply, strlen(reply), 0, (struct sockaddr *) &sin, sin_len); - g_hash_table_insert(u->fresh_cookies, g_string_chunk_insert(u->fresh_chunks, out[RE_UDP_COOKIE]), + mutex_lock(&u->lock); + g_hash_table_replace(u->fresh_cookies, g_string_chunk_insert(u->fresh_chunks, out[RE_UDP_COOKIE]), g_string_chunk_insert(u->fresh_chunks, reply)); + cond_broadcast(&u->cond); + mutex_unlock(&u->lock); free(reply); } @@ -197,6 +216,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1 c->fresh_chunks = g_string_chunk_new(4 * 1024); c->stale_chunks = g_string_chunk_new(4 * 1024); c->oven_time = poller_now(p); + mutex_init(&c->lock); c->parse_re = pcre_compile( /* cookie cmd flags callid viabranch:5 */ "^(\\S+)\\s+(?:([ul])(\\S*)\\s+([^;]+)(?:;(\\S+))?\\s+" \ diff --git a/daemon/control_udp.h b/daemon/control_udp.h index 5aae4183a..ca0d8e100 100644 --- a/daemon/control_udp.h +++ b/daemon/control_udp.h @@ -10,6 +10,7 @@ #include #include #include "obj.h" +#include "aux.h" @@ -52,6 +53,9 @@ struct control_udp { pcre *parse_re; pcre_extra *parse_ree; pcre *fallback_re; + + mutex_t lock; + cond_t cond; GHashTable *fresh_cookies, *stale_cookies; GStringChunk *fresh_chunks, *stale_chunks; time_t oven_time; diff --git a/daemon/poller.c b/daemon/poller.c index 5d60cafa3..4a83c56d8 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -481,6 +481,7 @@ int poller_add_timer(struct poller *p, void (*f)(void *), struct obj *o) { return poller_timer_link(p, &p->timers_add, f, o); } +/* XXX not thread safe */ time_t poller_now(struct poller *p) { return p->now; }