preliminary attempt to make the udp controller thread-safe

remotes/origin/2.1
Richard Fuchs 13 years ago
parent 5f5e158d76
commit 26824993e2

@ -150,6 +150,7 @@ static inline int smart_pton(int af, char *src, void *dst) {
typedef GStaticMutex mutex_t;
typedef GStaticRWLock rwlock_t;
typedef GCond *cond_t;
#define mutex_init(m) g_static_mutex_init(m)
#define mutex_lock(m) g_static_mutex_lock(m)
@ -162,10 +163,16 @@ typedef GStaticRWLock rwlock_t;
#define rwlock_lock_w(l) g_static_rw_lock_writer_lock(l)
#define rwlock_unlock_w(l) g_static_rw_lock_writer_unlock(l)
#define cond_init(c) *(c) = g_cond_new()
#define cond_wait(c,m) g_cond_wait(*(c),m)
#define cond_signal(c) g_cond_signal(*(c))
#define cond_broadcast(c) g_cond_broadcast(*(c))
#else
typedef GMutex mutex_t;
typedef GRWLock rwlock_t;
typedef GCond cond_t;
#define mutex_init(m) g_mutex_init(m)
#define mutex_lock(m) g_mutex_lock(m)
@ -178,6 +185,11 @@ typedef GRWLock rwlock_t;
#define rwlock_lock_w(l) g_rw_lock_writer_lock(l)
#define rwlock_unlock_w(l) g_rw_lock_writer_unlock(l)
#define cond_init(c) g_cond_init(c)
#define cond_wait(c,m) g_cond_wait(c,m)
#define cond_signal(c) g_cond_signal(c)
#define cond_broadcast(c) g_cond_broadcast(c)
#endif

@ -7,6 +7,7 @@
#include <glib.h>
#include <time.h>
#include <netinet/in.h>
#include <errno.h>
#include "control_udp.h"
#include "poller.h"
@ -15,6 +16,9 @@
#include "call.h"
static const char *cookie_in_use = "MAGIC";
static void control_udp_closed(int fd, void *p, uintptr_t x) {
abort();
}
@ -34,8 +38,9 @@ static void control_udp_incoming(int fd, void *p, uintptr_t x) {
sin_len = sizeof(sin);
len = recvfrom(fd, buf, sizeof(buf) - 1, 0, (struct sockaddr *) &sin, &sin_len);
if (len <= 0) {
mylog(LOG_WARNING, "Error reading from UDP socket");
if (len < 0) {
if (errno != EWOULDBLOCK)
mylog(LOG_WARNING, "Error reading from UDP socket");
return;
}
@ -87,6 +92,7 @@ static void control_udp_incoming(int fd, void *p, uintptr_t x) {
pcre_get_substring_list(buf, ovec, ret, &out);
mutex_lock(&u->lock);
if (poller_now(u->poller) - u->oven_time >= 30) {
g_hash_table_remove_all(u->stale_cookies);
#if GLIB_CHECK_VERSION(2,14,0)
@ -101,16 +107,26 @@ static void control_udp_incoming(int fd, void *p, uintptr_t x) {
u->oven_time = poller_now(u->poller); /* baked new cookies! */
}
restart:
/* XXX better hashing */
reply = g_hash_table_lookup(u->fresh_cookies, out[RE_UDP_COOKIE]);
if (!reply)
reply = g_hash_table_lookup(u->stale_cookies, out[RE_UDP_COOKIE]);
if (reply) {
if (reply == cookie_in_use) {
/* another thread is working on this right now */
cond_wait(&u->cond, &u->lock);
goto restart;
}
mutex_unlock(&u->lock);
mylog(LOG_INFO, "Detected command from udp:%s:%u as a duplicate", addr, ntohs(sin.sin6_port));
sendto(fd, reply, strlen(reply), 0, (struct sockaddr *) &sin, sin_len);
goto out;
}
g_hash_table_replace(u->fresh_cookies, (void *) out[RE_UDP_COOKIE], (void *) cookie_in_use);
mutex_unlock(&u->lock);
if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'U')
reply = call_update_udp(out, u->callmaster);
else if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'L')
@ -151,8 +167,11 @@ static void control_udp_incoming(int fd, void *p, uintptr_t x) {
if (reply) {
sendto(fd, reply, strlen(reply), 0, (struct sockaddr *) &sin, sin_len);
g_hash_table_insert(u->fresh_cookies, g_string_chunk_insert(u->fresh_chunks, out[RE_UDP_COOKIE]),
mutex_lock(&u->lock);
g_hash_table_replace(u->fresh_cookies, g_string_chunk_insert(u->fresh_chunks, out[RE_UDP_COOKIE]),
g_string_chunk_insert(u->fresh_chunks, reply));
cond_broadcast(&u->cond);
mutex_unlock(&u->lock);
free(reply);
}
@ -197,6 +216,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1
c->fresh_chunks = g_string_chunk_new(4 * 1024);
c->stale_chunks = g_string_chunk_new(4 * 1024);
c->oven_time = poller_now(p);
mutex_init(&c->lock);
c->parse_re = pcre_compile(
/* cookie cmd flags callid viabranch:5 */
"^(\\S+)\\s+(?:([ul])(\\S*)\\s+([^;]+)(?:;(\\S+))?\\s+" \

@ -10,6 +10,7 @@
#include <time.h>
#include <netinet/in.h>
#include "obj.h"
#include "aux.h"
@ -52,6 +53,9 @@ struct control_udp {
pcre *parse_re;
pcre_extra *parse_ree;
pcre *fallback_re;
mutex_t lock;
cond_t cond;
GHashTable *fresh_cookies, *stale_cookies;
GStringChunk *fresh_chunks, *stale_chunks;
time_t oven_time;

@ -481,6 +481,7 @@ int poller_add_timer(struct poller *p, void (*f)(void *), struct obj *o) {
return poller_timer_link(p, &p->timers_add, f, o);
}
/* XXX not thread safe */
time_t poller_now(struct poller *p) {
return p->now;
}

Loading…
Cancel
Save