From 145bbd1f7c52346aa29071c7a7ea578bf75e115f Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 27 Jul 2012 19:29:21 +0000 Subject: [PATCH] Prepare for multi-threaded operation Introduce generic "objects" with reference counting primitives and automatic resource deallocation when no refs are left ("garbage collection"). Overhaul poller framework to make extensive use of these objects to allow for future thread-concurrent operation. No locking added anywhere yet except in poller. Poller is still not 100% thread safe, but close. Valgrind reports no errors or memleaks. --- daemon/call.c | 143 +++++++++++++++-------- daemon/call.h | 6 + daemon/control.c | 79 +++++++------ daemon/control.h | 11 +- daemon/control_udp.c | 15 ++- daemon/control_udp.h | 5 + daemon/obj.h | 157 +++++++++++++++++++++++++ daemon/poller.c | 271 ++++++++++++++++++++++++++++++++----------- daemon/poller.h | 28 ++--- daemon/streambuf.c | 8 +- 10 files changed, 541 insertions(+), 182 deletions(-) create mode 100644 daemon/obj.h diff --git a/daemon/call.c b/daemon/call.c index 8dd1e3ca2..3023c3068 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -83,13 +83,16 @@ static void unkernelize(struct peer *); -static void stream_closed(int fd, void *p) { - struct streamrelay *r = p; +static void stream_closed(int fd, void *p, uintptr_t u) { + struct callstream *cs = p; + struct streamrelay *r; struct call *c; int i; socklen_t j; - c = r->up->up->call; + r = &cs->peers[u >> 1].rtps[u & 1]; + assert(r->fd == fd); + c = cs->call; j = sizeof(i); getsockopt(fd, SOL_SOCKET, SO_ERROR, &i, &j); @@ -196,7 +199,7 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_ if (pe->confirmed || !pe->filled || r->idx != 0) goto forward; - if (!c->lookup_done || m->poller->now <= c->lookup_done + 3) + if (!c->lookup_done || poller_now(m->poller) <= c->lookup_done + 3) goto peerinfo; mylog(LOG_DEBUG, LOG_PREFIX_C "Confirmed peer information for port %u - %s:%u", @@ -302,7 +305,7 @@ drop: r->stats.bytes += l; m->statsps.packets++; m->statsps.bytes += l; - r->last = m->poller->now; + r->last = poller_now(m->poller); return 0; } @@ -310,8 +313,9 @@ drop: -static void stream_readable(int fd, void *p) { - struct streamrelay *r = p; +static void stream_readable(int fd, void *p, uintptr_t u) { + struct callstream *cs = p; + struct streamrelay *r; char buf[8192]; int ret; struct sockaddr_storage ss; @@ -320,6 +324,9 @@ static void stream_readable(int fd, void *p) { unsigned int sinlen; void *sinp; + r = &cs->peers[u >> 1].rtps[u & 1]; + assert(r->fd == fd); + for (;;) { sinlen = sizeof(ss); ret = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *) &ss, &sinlen); @@ -327,7 +334,7 @@ static void stream_readable(int fd, void *p) { if (ret < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) break; - stream_closed(fd, r); + stream_closed(fd, r, 0); break; } if (ret >= sizeof(buf)) @@ -467,7 +474,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) { else if (IN6_IS_ADDR_UNSPECIFIED(&sr->peer.ip46)) check = cm->silent_timeout; - if (po->now - sr->last < check) + if (poller_now(po) - sr->last < check) goto good; } } @@ -568,7 +575,7 @@ static void callmaster_timer(void *ptr) { DS(errors); if (ke->stats.packets != sr->kstats.packets) - sr->last = po->now; + sr->last = poller_now(po); sr->kstats.packets = ke->stats.packets; sr->kstats.bytes = ke->stats.bytes; @@ -595,19 +602,20 @@ next: struct callmaster *callmaster_new(struct poller *p) { struct callmaster *c; - c = g_slice_alloc0(sizeof(*c)); + c = obj_alloc0("callmaster", sizeof(*c), NULL); c->callhash = g_hash_table_new(g_str_hash, g_str_equal); if (!c->callhash) goto fail; c->poller = p; - poller_timer(p, callmaster_timer, c); + poller_timer(p, callmaster_timer, &c->obj); + obj_put(&c->obj); return c; fail: - g_slice_free1(sizeof(*c), c); + obj_put(&c->obj); return NULL; } @@ -876,7 +884,8 @@ static void steal_peer(struct peer *dest, struct peer *src) { srs->peer_advertised.port = 0; pi.fd = sr->fd; - pi.ptr = sr; + pi.obj = &sr->up->up->obj; + pi.uintp = i | (dest->idx << 1); pi.readable = stream_readable; pi.closed = stream_closed; @@ -894,10 +903,9 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 po = ca->callmaster->poller; - ZERO(*s); ZERO(pi); - s->call = ca; + s->call = obj_get(&ca->obj); DBG("setting new callstream num to %i", num); s->num = num; @@ -915,7 +923,7 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 r->fd = -1; r->idx = j; r->up = p; - r->last = po->now; + r->last = poller_now(po); } tport = (i == 0) ? port1 : port2; @@ -927,7 +935,8 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 r = &p->rtps[j]; pi.fd = r->fd; - pi.ptr = r; + pi.obj = &s->obj; + pi.uintp = (i << 1) | j; pi.readable = stream_readable; pi.closed = stream_closed; @@ -939,6 +948,31 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2 +static void callstream_free(void *ptr) { + struct callstream *s = ptr; + int i, j; + struct peer *p; + struct streamrelay *r; + + for (i = 0; i < 2; i++) { + p = &s->peers[i]; + + free(p->tag); + free(p->mediatype); + + for (j = 0; j < 2; j++) { + r = &p->rtps[j]; + + if (r->fd != -1) { + close(r->fd); + bit_array_clear(ports_used, r->localport); + r->fd = -1; + } + } + } + obj_put(&s->call->obj); +} + static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) { GQueue *q; GList *i, *l; @@ -989,7 +1023,7 @@ found: if (!opmode) { /* request */ DBG("creating new callstream"); - cs = g_slice_alloc(sizeof(*cs)); + cs = obj_alloc0("callstream", sizeof(*cs), callstream_free); if (!r) { /* nothing found to re-use, open new ports */ @@ -1012,7 +1046,7 @@ found: } } - g_queue_push_tail(q, cs); + g_queue_push_tail(q, cs); /* hand over the ref */ ZERO(c->lookup_done); continue; } @@ -1070,12 +1104,12 @@ got_cs: /* need a new call stream after all */ DBG("case 4"); cs_o = cs; - cs = g_slice_alloc(sizeof(*cs)); + cs = obj_alloc0("callstream", sizeof(*cs), callstream_free); callstream_init(cs, c, 0, 0, t->num); steal_peer(&cs->peers[0], &cs_o->peers[0]); p = &cs->peers[1]; setup_peer(p, t, tag); - g_queue_push_tail(c->callstreams, cs_o); + g_queue_push_tail(c->callstreams, cs_o); /* hand over ref XXX? */ } time(&c->lookup_done); @@ -1136,39 +1170,24 @@ static void kill_callstream(struct callstream *s) { unkernelize(p); - free(p->tag); - free(p->mediatype); - for (j = 0; j < 2; j++) { r = &p->rtps[j]; - if (r->fd != -1) { + if (r->fd != -1) poller_del_item(s->call->callmaster->poller, r->fd); - close(r->fd); - bit_array_clear(ports_used, r->localport); - } } } - - g_slice_free1(sizeof(*s), s); } static void call_destroy(struct call *c) { struct callmaster *m = c->callmaster; struct callstream *s; - g_hash_table_remove(m->callhash, c->callid); + g_hash_table_remove(m->callhash, c->callid); /* steal this ref */ if (redis_delete) redis_delete(c); - g_hash_table_destroy(c->infohash); - g_hash_table_destroy(c->branches); - if (c->calling_agent) - free(c->calling_agent); - if (c->called_agent) - free(c->called_agent); - mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid); while (c->callstreams->head) { s = g_queue_pop_head(c->callstreams); @@ -1190,11 +1209,10 @@ static void call_destroy(struct call *c) { s->peers[1].rtps[1].localport, s->peers[1].rtps[1].stats.packets, s->peers[1].rtps[1].stats.bytes, s->peers[1].rtps[1].stats.errors); kill_callstream(s); + obj_put(&s->obj); } - g_queue_free(c->callstreams); - free(c->callid); - g_slice_free1(sizeof(*c), c); + obj_put(&c->obj); } @@ -1280,16 +1298,29 @@ static guint g_str_hash0(gconstpointer v) { return g_str_hash(v); } +static void call_free(void *p) { + struct call *c = p; + + g_hash_table_destroy(c->infohash); + g_hash_table_destroy(c->branches); + if (c->calling_agent) + free(c->calling_agent); + if (c->called_agent) + free(c->called_agent); + g_queue_free(c->callstreams); + free(c->callid); +} + static struct call *call_create(const char *callid, struct callmaster *m) { struct call *c; mylog(LOG_NOTICE, LOG_PREFIX_C "Creating new call", callid); /* XXX will spam syslog on recovery from DB */ - c = g_slice_alloc0(sizeof(*c)); + c = obj_alloc0("call", sizeof(*c), call_free); c->callmaster = m; c->callid = strdup(callid); c->callstreams = g_queue_new(); - c->created = m->poller->now; + c->created = poller_now(m->poller); c->infohash = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); c->branches = g_hash_table_new_full(g_str_hash0, g_str_equal0, free, NULL); return c; @@ -1302,8 +1333,10 @@ struct call *call_get_or_create(const char *callid, const char *viabranch, struc if (!c) { /* completely new call-id, create call */ c = call_create(callid, m); - g_hash_table_insert(m->callhash, c->callid, c); + g_hash_table_insert(m->callhash, c->callid, obj_get(&c->obj)); } + else + obj_hold(&c->obj); if (viabranch && !g_hash_table_lookup(c->branches, viabranch)) g_hash_table_insert(c->branches, strdup(viabranch), (void *) 0x1); @@ -1382,12 +1415,14 @@ char *call_update_udp(const char **out, struct callmaster *m) { ret = streams_print(c->callstreams, 1, (num >= 0) ? 0 : 1, out[RE_UDP_COOKIE], 1); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); c->log_info = NULL; + obj_put(&c->obj); return ret; fail: mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]); asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]); c->log_info = NULL; + obj_put(&c->obj); return ret; } @@ -1405,6 +1440,7 @@ char *call_lookup_udp(const char **out, struct callmaster *m) { asprintf(&ret, "%s 0 " IPF "\n", out[RE_UDP_COOKIE], IPP(m->ipv4)); return ret; } + obj_hold(&c->obj); c->log_info = out[RE_UDP_UL_CALLID]; strdupfree(&c->called_agent, "UNKNOWN(udp)"); @@ -1423,12 +1459,14 @@ char *call_lookup_udp(const char **out, struct callmaster *m) { ret = streams_print(c->callstreams, 1, (num >= 0) ? 1 : 0, out[RE_UDP_COOKIE], 1); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); c->log_info = NULL; + obj_put(&c->obj); return ret; fail: mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]); asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]); c->log_info = NULL; + obj_put(&c->obj); return ret; } @@ -1451,6 +1489,7 @@ char *call_request(const char **out, struct callmaster *m) { ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 0 : 1, NULL, 0); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); + obj_put(&c->obj); return ret; } @@ -1466,6 +1505,8 @@ char *call_lookup(const char **out, struct callmaster *m) { return NULL; } + obj_hold(&c->obj); + strdupfree(&c->called_agent, out[RE_TCP_RL_AGENT] ? : "UNKNOWN"); info_parse(out[RE_TCP_RL_INFO], &c->infohash); s = streams_parse(out[RE_TCP_RL_STREAMS]); @@ -1477,6 +1518,7 @@ char *call_lookup(const char **out, struct callmaster *m) { ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 1 : 0, NULL, 0); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret); + obj_put(&c->obj); return ret; } @@ -1496,6 +1538,7 @@ char *call_delete_udp(const char **out, struct callmaster *m) { mylog(LOG_INFO, LOG_PREFIX_C "Call-ID to delete not found", out[RE_UDP_D_CALLID]); goto err; } + obj_hold(&c->obj); c->log_info = out[RE_UDP_D_VIABRANCH]; if (out[RE_UDP_D_FROMTAG] && *out[RE_UDP_D_FROMTAG]) { @@ -1552,8 +1595,10 @@ err: goto out; out: - if (c) + if (c) { c->log_info = NULL; + obj_put(&c->obj); + } return ret; } @@ -1564,8 +1609,10 @@ void call_delete(const char **out, struct callmaster *m) { if (!c) return; + obj_hold(&c->obj); /* delete whole list, as we don't have branches in tcp controller */ call_destroy(c); + obj_put(&c->obj); } @@ -1590,7 +1637,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) { (char *) g_hash_table_lookup(c->infohash, "from"), (char *) g_hash_table_lookup(c->infohash, "to"), c->calling_agent, c->called_agent, - (int) (m->poller->now - c->created)); + (int) (poller_now(m->poller) - c->created)); for (l = c->callstreams->head; l; l = l->next) { cs = l->data; @@ -1620,7 +1667,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) { (long long unsigned int) r1->stats.bytes + rx1->stats.bytes + r2->stats.bytes + rx2->stats.bytes, "active", p->codec ? : "unknown", - p->mediatype, (int) (m->poller->now - r1->last)); + p->mediatype, (int) (poller_now(m->poller) - r1->last)); } } diff --git a/daemon/call.h b/daemon/call.h index c41dff302..d1f7c4e07 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -10,6 +10,7 @@ #include "control.h" #include "control_udp.h" +#include "obj.h" struct poller; struct control_stream; @@ -67,12 +68,15 @@ struct peer { int confirmed:1; }; struct callstream { + struct obj obj; struct peer peers[2]; struct call *call; int num; }; struct call { + struct obj obj; + struct callmaster *callmaster; GQueue *callstreams; @@ -90,6 +94,8 @@ struct call { }; struct callmaster { + struct obj obj; + GHashTable *callhash; u_int16_t lastport; struct stats statsps; diff --git a/daemon/control.c b/daemon/control.c index eb819bad0..c61aef78a 100644 --- a/daemon/control.c +++ b/daemon/control.c @@ -12,10 +12,13 @@ #include "log.h" #include "call.h" + + + static pcre *parse_re; static pcre_extra *parse_ree; -static void control_stream_closed(int fd, void *p) { +static void control_stream_closed(int fd, void *p, uintptr_t u) { struct control_stream *s = p; struct control *c; @@ -23,23 +26,22 @@ static void control_stream_closed(int fd, void *p) { c = s->control; - c->stream_head = g_list_remove_link(c->stream_head, &s->link); + c->streams = g_list_remove(c->streams, s); + obj_put(&s->obj); if (poller_del_item(s->poller, fd)) abort(); - close(fd); - - streambuf_destroy(s->inbuf); - streambuf_destroy(s->outbuf); - free(s); } static void control_list(struct control *c, struct control_stream *s) { struct control_stream *i; + GList *l; - for (i = (void *) c->stream_head; i; i = (void *) i->link.next) - streambuf_printf(s->outbuf, DF "\n", DP(s->inaddr)); + for (l = c->streams; l; l = l->next) { + i = l->data; + streambuf_printf(s->outbuf, DF "\n", DP(i->inaddr)); + } streambuf_printf(s->outbuf, "End.\n"); } @@ -98,16 +100,16 @@ static int control_stream_parse(struct control_stream *s, char *line) { } -static void control_stream_timer(int fd, void *p) { +static void control_stream_timer(int fd, void *p, uintptr_t u) { struct control_stream *s = p; struct poller *o = s->poller; - if ((o->now - s->inbuf->active) >= 60 || (o->now - s->outbuf->active) >= 60) - control_stream_closed(s->fd, s); + if ((poller_now(o) - s->inbuf->active) >= 60 || (poller_now(o) - s->outbuf->active) >= 60) + control_stream_closed(s->fd, s, 0); } -static void control_stream_readable(int fd, void *p) { +static void control_stream_readable(int fd, void *p, uintptr_t u) { struct control_stream *s = p; char *line; int ret; @@ -131,21 +133,29 @@ static void control_stream_readable(int fd, void *p) { return; close: - control_stream_closed(fd, s); + control_stream_closed(fd, s, 0); } -static void control_stream_writeable(int fd, void *p) { +static void control_stream_writeable(int fd, void *p, uintptr_t u) { struct control_stream *s = p; if (streambuf_writeable(s->outbuf)) - control_stream_closed(fd, s); + control_stream_closed(fd, s, 0); } -static void control_closed(int fd, void *p) { +static void control_closed(int fd, void *p, uintptr_t u) { abort(); } -static void control_incoming(int fd, void *p) { +static void control_stream_free(void *p) { + struct control_stream *s = p; + + close(s->fd); + streambuf_destroy(s->inbuf); + streambuf_destroy(s->outbuf); +} + +static void control_incoming(int fd, void *p, uintptr_t u) { int nfd; struct control *c = p; struct control_stream *s; @@ -161,8 +171,14 @@ static void control_incoming(int fd, void *p) { mylog(LOG_INFO, "New control connection from " DF, DP(sin)); - s = malloc(sizeof(*s)); - ZERO(*s); + s = obj_alloc0("control_stream", sizeof(*s), control_stream_free); + + s->fd = nfd; + s->control = c; + s->poller = c->poller; + s->inbuf = streambuf_new(c->poller, nfd); + s->outbuf = streambuf_new(c->poller, nfd); + memcpy(&s->inaddr, &sin, sizeof(s->inaddr)); ZERO(i); i.fd = nfd; @@ -170,23 +186,18 @@ static void control_incoming(int fd, void *p) { i.readable = control_stream_readable; i.writeable = control_stream_writeable; i.timer = control_stream_timer; - i.ptr = s; + i.obj = &s->obj; + if (poller_add_item(c->poller, &i)) goto fail; - s->fd = nfd; - s->control = c; - s->poller = c->poller; - s->inbuf = streambuf_new(c->poller, nfd); - s->outbuf = streambuf_new(c->poller, nfd); - memcpy(&s->inaddr, &sin, sizeof(s->inaddr)); - c->stream_head = g_list_link(c->stream_head, &s->link); + /* let the list steal our own ref */ + c->streams = g_list_prepend(c->streams, s); return; fail: - free(s); - close(nfd); + obj_put(&s->obj); } @@ -219,8 +230,7 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru goto fail; - c = malloc(sizeof(*c)); - ZERO(*c); + c = obj_alloc0("control", sizeof(*c), NULL); c->fd = fd; c->poller = p; @@ -230,14 +240,15 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru i.fd = fd; i.closed = control_closed; i.readable = control_incoming; - i.ptr = c; + i.obj = &c->obj; if (poller_add_item(p, &i)) goto fail2; + obj_put(&c->obj); return c; fail2: - free(c); + obj_put(&c->obj); fail: close(fd); return NULL; diff --git a/daemon/control.h b/daemon/control.h index 0f83119b8..623ac04a4 100644 --- a/daemon/control.h +++ b/daemon/control.h @@ -9,6 +9,9 @@ #include #include +#include "obj.h" + + #define RE_TCP_RL_CMD 1 #define RE_TCP_RL_CALLID 2 #define RE_TCP_RL_STREAMS 3 @@ -31,8 +34,9 @@ struct callmaster; + struct control_stream { - GList link; /* must be first */ + struct obj obj; int fd; struct streambuf *inbuf; @@ -45,15 +49,18 @@ struct control_stream { struct control { + struct obj obj; + int fd; - GList *stream_head; + GList *streams; struct poller *poller; struct callmaster *callmaster; }; + struct control *control_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *); diff --git a/daemon/control_udp.c b/daemon/control_udp.c index 31ec331ad..6f6d5b3b5 100644 --- a/daemon/control_udp.c +++ b/daemon/control_udp.c @@ -15,11 +15,11 @@ #include "call.h" -static void control_udp_closed(int fd, void *p) { +static void control_udp_closed(int fd, void *p, uintptr_t x) { abort(); } -static void control_udp_incoming(int fd, void *p) { +static void control_udp_incoming(int fd, void *p, uintptr_t x) { struct control_udp *u = p; int ret, len; char buf[8192]; @@ -87,7 +87,7 @@ static void control_udp_incoming(int fd, void *p) { pcre_get_substring_list(buf, ovec, ret, &out); - if (u->poller->now - u->oven_time >= 30) { + if (poller_now(u->poller) - u->oven_time >= 30) { g_hash_table_remove_all(u->stale_cookies); #if GLIB_CHECK_VERSION(2,14,0) g_string_chunk_clear(u->stale_chunks); @@ -98,7 +98,7 @@ static void control_udp_incoming(int fd, void *p) { u->fresh_chunks = g_string_chunk_new(4 * 1024); #endif swap_ptrs(&u->stale_cookies, &u->fresh_cookies); - u->oven_time = u->poller->now; /* baked new cookies! */ + u->oven_time = poller_now(u->poller); /* baked new cookies! */ } /* XXX better hashing */ @@ -187,8 +187,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1 goto fail; - c = malloc(sizeof(*c)); - ZERO(*c); + c = obj_alloc0("control_udp", sizeof(*c), NULL); c->fd = fd; c->poller = p; @@ -197,7 +196,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1 c->stale_cookies = g_hash_table_new(g_str_hash, g_str_equal); c->fresh_chunks = g_string_chunk_new(4 * 1024); c->stale_chunks = g_string_chunk_new(4 * 1024); - c->oven_time = p->now; + c->oven_time = poller_now(p); c->parse_re = pcre_compile( /* cookie cmd flags callid viabranch:5 */ "^(\\S+)\\s+(?:([ul])(\\S*)\\s+([^;]+)(?:;(\\S+))?\\s+" \ @@ -221,7 +220,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1 i.fd = fd; i.closed = control_udp_closed; i.readable = control_udp_incoming; - i.ptr = c; + i.obj = &c->obj; if (poller_add_item(p, &i)) goto fail2; diff --git a/daemon/control_udp.h b/daemon/control_udp.h index 815121872..5aae4183a 100644 --- a/daemon/control_udp.h +++ b/daemon/control_udp.h @@ -9,6 +9,9 @@ #include #include #include +#include "obj.h" + + #define RE_UDP_COOKIE 1 #define RE_UDP_UL_CMD 2 @@ -39,6 +42,8 @@ struct callmaster; struct control_udp { + struct obj obj; + int fd; struct poller *poller; diff --git a/daemon/obj.h b/daemon/obj.h new file mode 100644 index 000000000..37fd45172 --- /dev/null +++ b/daemon/obj.h @@ -0,0 +1,157 @@ +#ifndef _OBJ_H_ +#define _OBJ_H_ + + + +#include +#include +#include +#include +#include + +#include "log.h" + + + + +#define OBJ_DEBUG 1 + + + + +struct obj { +#if OBJ_DEBUG + u_int32_t magic; + char *type; +#endif + volatile gint ref; + void (*free_func)(void *); + unsigned int size; +}; + + + + + +#if OBJ_DEBUG + +#define OBJ_MAGIC 0xf1eef1ee + +#define obj_alloc(t,a,b) __obj_alloc(a,b,t,__FILE__,__LINE__) +#define obj_alloc0(t,a,b) __obj_alloc0(a,b,t,__FILE__,__LINE__) +#define obj_hold(a) __obj_hold(a,__FILE__,__LINE__) +#define obj_get(a) __obj_get(a,__FILE__,__LINE__) +#define obj_put(a) __obj_put(a,__FILE__,__LINE__) + +#else + +#define obj_alloc(t,a,b) __obj_alloc(a,b) +#define obj_alloc0(t,a,b) __obj_alloc0(a,b) +#define obj_hold(a) __obj_hold(a) +#define obj_get(a) __obj_get(a) +#define obj_put(a) __obj_put(a) + +#endif + +static inline void __obj_init(struct obj *o, unsigned int size, void (*free_func)(void *) +#if OBJ_DEBUG +, const char *type, const char *file, unsigned int line +#endif +) { +#if OBJ_DEBUG + o->magic = OBJ_MAGIC; + o->type = strdup(type); + mylog(LOG_DEBUG, "obj_allocX(\"%s\", size %u) -> %p [%s:%u]", type, size, o, file, line); +#endif + o->ref = 1; + o->free_func = free_func; + o->size = size; +} + +static inline void *__obj_alloc(unsigned int size, void (*free_func)(void *) +#if OBJ_DEBUG +, const char *type, const char *file, unsigned int line +#endif +) { + struct obj *r; + + r = g_slice_alloc(size); + __obj_init(r, size, free_func +#if OBJ_DEBUG + , type, file, line +#endif + ); + return r; +} + +static inline void *__obj_alloc0(unsigned int size, void (*free_func)(void *) +#if OBJ_DEBUG +, const char *type, const char *file, unsigned int line +#endif +) { + struct obj *r; + + r = g_slice_alloc0(size); + __obj_init(r, size, free_func +#if OBJ_DEBUG + , type, file, line +#endif + ); + return r; +} + +static inline struct obj *__obj_hold(struct obj *o +#if OBJ_DEBUG +, const char *file, unsigned int line +#endif +) { +#if OBJ_DEBUG + assert(o->magic == OBJ_MAGIC); + mylog(LOG_DEBUG, "obj_hold(%p, \"%s\", size %u), refcnt before %u [%s:%u]", + o, o->type, o->size, g_atomic_int_get(&o->ref), file, line); +#endif + g_atomic_int_inc(&o->ref); +#if OBJ_DEBUG + mylog(LOG_DEBUG, "obj_hold(%p, \"%s\", size %u), refcnt after %u [%s:%u]", + o, o->type, o->size, g_atomic_int_get(&o->ref), file, line); +#endif + return o; +} + +static inline void *__obj_get(struct obj *o +#if OBJ_DEBUG +, const char *file, unsigned int line +#endif +) { + return __obj_hold(o +#if OBJ_DEBUG + , file, line +#endif + ); +} + +static inline void __obj_put(struct obj *o +#if OBJ_DEBUG +, const char *file, unsigned int line +#endif +) { +#if OBJ_DEBUG + assert(o->magic == OBJ_MAGIC); + mylog(LOG_DEBUG, "obj_put(%p, \"%s\", size %u), refcnt before %u [%s:%u]", + o, o->type, o->size, g_atomic_int_get(&o->ref), file, line); +#endif + if (!g_atomic_int_dec_and_test(&o->ref)) + return; +#if OBJ_DEBUG + mylog(LOG_DEBUG, "obj_put(%p, \"%s\", size %u), refcnt after %u [%s:%u]", + o, o->type, o->size, g_atomic_int_get(&o->ref), file, line); + free(o->type); +#endif + if (o->free_func) + o->free_func(o); + g_slice_free1(o->size, o); +} + + + +#endif diff --git a/daemon/poller.c b/daemon/poller.c index b5525e672..fcd0d4e9e 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -12,15 +12,37 @@ #include "poller.h" #include "aux.h" +#include "obj.h" struct timer_item { - void (*func)(void *); - void *ptr; + struct obj obj; + void (*func)(void *); + struct obj *obj_ptr; }; +struct poller_item_int { + struct obj obj; + struct poller_item item; + + int blocked:1; + int error:1; +}; + +struct poller { + int fd; + GMutex lock; + struct poller_item_int **items; + unsigned int items_size; + GList *timers; + + time_t now; +}; + + + @@ -34,42 +56,58 @@ struct poller *poller_new(void) { p->fd = epoll_create1(0); if (p->fd == -1) abort(); + g_mutex_init(&p->lock); return p; } -static int epoll_events(struct poller_item *i) { - return EPOLLHUP | EPOLLERR | ((i->writeable && i->blocked) ? EPOLLOUT : 0) | (i->readable ? EPOLLIN : 0); +static int epoll_events(struct poller_item *it, struct poller_item_int *ii) { + if (!it) + it = &ii->item; + return EPOLLHUP | EPOLLERR | + ((it->writeable && ii && ii->blocked) ? EPOLLOUT : 0) | + (it->readable ? EPOLLIN : 0); } static void poller_fd_timer(void *p) { - struct poller_item *it = p; + struct poller_item_int *it = p; - if (it->timer) - it->timer(it->fd, it->ptr); + if (it->item.timer) + it->item.timer(it->item.fd, it->item.obj, it->item.uintp); } -int poller_add_item(struct poller *p, struct poller_item *i) { - struct poller_item *ip; +static void poller_item_free(void *p) { + struct poller_item_int *i = p; + obj_put(i->item.obj); +} + + +/* unlocks on return */ +static int __poller_add_item(struct poller *p, struct poller_item *i, int has_lock) { + struct poller_item_int *ip; unsigned int u; struct epoll_event e; if (!p || !i) - return -1; + goto fail_lock; if (i->fd < 0) - return -1; + goto fail_lock; if (!i->readable && !i->writeable) - return -1; + goto fail_lock; if (!i->closed) - return -1; + goto fail_lock; + + if (!has_lock) + g_mutex_lock(&p->lock); if (i->fd < p->items_size && p->items[i->fd]) - return -1; + goto fail; - e.events = epoll_events(i); + ZERO(e); + e.events = epoll_events(i, NULL); e.data.fd = i->fd; if (epoll_ctl(p->fd, EPOLL_CTL_ADD, i->fd, &e)) abort(); @@ -81,58 +119,92 @@ int poller_add_item(struct poller *p, struct poller_item *i) { memset(p->items + u, 0, sizeof(*p->items) * (p->items_size - u - 1)); } - ip = g_slice_alloc(sizeof(*ip)); - memcpy(ip, i, sizeof(*ip)); - p->items[i->fd] = ip; + ip = obj_alloc0("poller_item_int", sizeof(*ip), poller_item_free); + memcpy(&ip->item, i, sizeof(*i)); + obj_hold(ip->item.obj); /* new ref in *ip */ + p->items[i->fd] = obj_get(&ip->obj); + + g_mutex_unlock(&p->lock); if (i->timer) - poller_timer(p, poller_fd_timer, ip); + poller_timer(p, poller_fd_timer, &ip->obj); + + obj_put(&ip->obj); return 0; + +fail: + g_mutex_unlock(&p->lock); + return -1; +fail_lock: + if (has_lock) + g_mutex_unlock(&p->lock); + return -1; +} + + +int poller_add_item(struct poller *p, struct poller_item *i) { + return __poller_add_item(p, i, 0); } int poller_find_timer(gconstpointer a, gconstpointer b) { const struct timer_item *it = a; - const struct poller_item *x = b; + const struct obj *x = b; - if (it->ptr == x) + if (it->obj_ptr == x) return 0; return 1; } int poller_del_item(struct poller *p, int fd) { - struct poller_item *it; + struct poller_item_int *it; GList *l; + struct timer_item *ti; if (!p || fd < 0) return -1; + + g_mutex_lock(&p->lock); + if (fd >= p->items_size) - return -1; + goto fail; if (!p->items || !(it = p->items[fd])) - return -1; + goto fail; if (epoll_ctl(p->fd, EPOLL_CTL_DEL, fd, NULL)) abort(); - if (it->timer) { - l = g_list_find_custom(p->timers, it, poller_find_timer); - if (!l) - abort(); - g_slice_free1(sizeof(struct timer_item), l->data); - p->timers = g_list_delete_link(p->timers, l); + p->items[fd] = NULL; /* stealing the ref */ + + g_mutex_unlock(&p->lock); + + if (it->item.timer) { + while (1) { + /* rare but possible race with poller_add_item above */ + l = g_list_find_custom(p->timers, &it->obj, poller_find_timer); + if (l) + break; + } + p->timers = g_list_remove_link(p->timers, l); + ti = l->data; + obj_put(&ti->obj); + g_list_free_1(l); } - g_slice_free1(sizeof(*it), it); - p->items[fd] = NULL; + obj_put(&it->obj); return 0; + +fail: + g_mutex_unlock(&p->lock); + return -1; } int poller_update_item(struct poller *p, struct poller_item *i) { - struct poller_item *np; + struct poller_item_int *np; if (!p || !i) return -1; @@ -143,14 +215,21 @@ int poller_update_item(struct poller *p, struct poller_item *i) { if (!i->closed) return -1; + g_mutex_lock(&p->lock); + if (i->fd >= p->items_size || !(np = p->items[i->fd])) - return poller_add_item(p, i); + return __poller_add_item(p, i, 1); + + obj_hold(i->obj); + obj_put(np->item.obj); + np->item.obj = i->obj; + np->item.uintp = i->uintp; + np->item.readable = i->readable; + np->item.writeable = i->writeable; + np->item.closed = i->closed; + /* updating timer is not supported */ - np->ptr = i->ptr; - np->readable = i->readable; - np->writeable = i->writeable; - np->closed = i->closed; - np->timer = i->timer; + g_mutex_unlock(&p->lock); return 0; } @@ -158,7 +237,7 @@ int poller_update_item(struct poller *p, struct poller_item *i) { int poller_poll(struct poller *p, int timeout) { int ret, i; - struct poller_item *it; + struct poller_item_int *it; time_t last; GList *li, *ne; struct timer_item *ti; @@ -166,8 +245,12 @@ int poller_poll(struct poller *p, int timeout) { if (!p) return -1; + + g_mutex_lock(&p->lock); + + ret = -1; if (!p->items || !p->items_size) - return -1; + goto out; last = p->now; p->now = time(NULL); @@ -175,13 +258,19 @@ int poller_poll(struct poller *p, int timeout) { for (li = p->timers; li; li = ne) { ne = li->next; ti = li->data; - ti->func(ti->ptr); + /* XXX not safe */ + g_mutex_unlock(&p->lock); + ti->func(ti->obj_ptr); + g_mutex_lock(&p->lock); } - return p->items_size; + ret = p->items_size; + goto out; } + g_mutex_unlock(&p->lock); errno = 0; ret = epoll_wait(p->fd, evs, sizeof(evs) / sizeof(*evs), timeout); + g_mutex_lock(&p->lock); if (errno == EINTR) ret = 0; @@ -198,33 +287,44 @@ int poller_poll(struct poller *p, int timeout) { if (!it) continue; + obj_hold(&it->obj); + g_mutex_unlock(&p->lock); + if (it->error) { - it->closed(it->fd, it->ptr); - continue; + it->item.closed(it->item.fd, it->item.obj, it->item.uintp); + goto next; } if ((ev->events & (POLLERR | POLLHUP))) - it->closed(it->fd, it->ptr); + it->item.closed(it->item.fd, it->item.obj, it->item.uintp); else if ((ev->events & POLLOUT)) { + g_mutex_lock(&p->lock); it->blocked = 0; - e.events = epoll_events(it); - e.data.fd = it->fd; - if (epoll_ctl(p->fd, EPOLL_CTL_MOD, it->fd, &e)) + ZERO(e); + e.events = epoll_events(NULL, it); + e.data.fd = it->item.fd; + if (epoll_ctl(p->fd, EPOLL_CTL_MOD, it->item.fd, &e)) abort(); - it->writeable(it->fd, it->ptr); + g_mutex_unlock(&p->lock); + it->item.writeable(it->item.fd, it->item.obj, it->item.uintp); } else if ((ev->events & POLLIN)) - it->readable(it->fd, it->ptr); + it->item.readable(it->item.fd, it->item.obj, it->item.uintp); else if (!ev->events) - continue; + goto next; else abort(); + +next: + obj_put(&it->obj); + g_mutex_lock(&p->lock); } out: + g_mutex_unlock(&p->lock); return ret; } @@ -234,63 +334,94 @@ void poller_blocked(struct poller *p, int fd) { if (!p || fd < 0) return; + + g_mutex_lock(&p->lock); + if (fd >= p->items_size) - return; + goto fail; if (!p->items || !p->items[fd]) - return; - if (!p->items[fd]->writeable) - return; + goto fail; + if (!p->items[fd]->item.writeable) + goto fail; p->items[fd]->blocked = 1; - e.events = epoll_events(p->items[fd]); + ZERO(e); + e.events = epoll_events(NULL, p->items[fd]); e.data.fd = fd; if (epoll_ctl(p->fd, EPOLL_CTL_MOD, fd, &e)) abort(); + +fail: + g_mutex_unlock(&p->lock); } void poller_error(struct poller *p, int fd) { if (!p || fd < 0) return; + + g_mutex_lock(&p->lock); + if (fd >= p->items_size) - return; + goto fail; if (!p->items || !p->items[fd]) - return; - if (!p->items[fd]->writeable) - return; + goto fail; + if (!p->items[fd]->item.writeable) + goto fail; p->items[fd]->error = 1; p->items[fd]->blocked = 1; + +fail: + g_mutex_unlock(&p->lock); } int poller_isblocked(struct poller *p, int fd) { + int ret; + if (!p || fd < 0) return -1; + + g_mutex_lock(&p->lock); + + ret = -1; if (fd >= p->items_size) - return -1; + goto out; if (!p->items || !p->items[fd]) - return -1; - if (!p->items[fd]->writeable) - return -1; + goto out; + if (!p->items[fd]->item.writeable) + goto out; - return p->items[fd]->blocked; + ret = p->items[fd]->blocked ? 1 : 0; + +out: + g_mutex_unlock(&p->lock); + return ret; } +static void timer_item_free(void *p) { + struct timer_item *i = p; + obj_put(i->obj_ptr); +} -int poller_timer(struct poller *p, void (*f)(void *), void *ptr) { +int poller_timer(struct poller *p, void (*f)(void *), struct obj *o) { struct timer_item *i; - if (!p || !f) + if (!o || !f) return -1; - i = g_slice_alloc0(sizeof(*i)); + i = obj_alloc0("timer_item", sizeof(*i), timer_item_free); i->func = f; - i->ptr = ptr; + i->obj_ptr = obj_hold(o); p->timers = g_list_prepend(p->timers, i); return 0; } + +time_t poller_now(struct poller *p) { + return p->now; +} diff --git a/daemon/poller.h b/daemon/poller.h index 01b79d359..ca0386b63 100644 --- a/daemon/poller.h +++ b/daemon/poller.h @@ -4,32 +4,27 @@ #include +#include #include #include +#include "obj.h" struct poller_item { int fd; - void *ptr; + struct obj *obj; + uintptr_t uintp; - void (*readable)(int, void *); - void (*writeable)(int, void *); - void (*closed)(int, void *); - void (*timer)(int, void *); - - int blocked:1; - int error:1; + void (*readable)(int, void *, uintptr_t); + void (*writeable)(int, void *, uintptr_t); + void (*closed)(int, void *, uintptr_t); + void (*timer)(int, void *, uintptr_t); }; -struct poller { - int fd; - struct poller_item **items; - unsigned int items_size; - GList *timers; +struct poller; + - time_t now; -}; struct poller *poller_new(void); @@ -40,8 +35,9 @@ int poller_poll(struct poller *, int); void poller_blocked(struct poller *, int); int poller_isblocked(struct poller *, int); void poller_error(struct poller *, int); +time_t poller_now(struct poller *); -int poller_timer(struct poller *, void (*)(void *), void *); +int poller_timer(struct poller *, void (*)(void *), struct obj *); #endif diff --git a/daemon/streambuf.c b/daemon/streambuf.c index ccb55c0fe..4d8f86787 100644 --- a/daemon/streambuf.c +++ b/daemon/streambuf.c @@ -21,7 +21,7 @@ struct streambuf *streambuf_new(struct poller *p, int fd) { b->buf = g_string_new(""); b->fd = fd; b->poller = p; - b->active = p->now; + b->active = poller_now(p); return b; } @@ -52,7 +52,7 @@ int streambuf_writeable(struct streambuf *b) { if (ret > 0) { g_string_erase(b->buf, 0, ret); - b->active = b->poller->now; + b->active = poller_now(b->poller); } if (ret != out) { @@ -80,7 +80,7 @@ int streambuf_readable(struct streambuf *b) { } g_string_append_len(b->buf, buf, ret); - b->active = b->poller->now; + b->active = poller_now(b->poller); } return 0; @@ -162,7 +162,7 @@ void streambuf_write(struct streambuf *b, char *s, unsigned int len) { s += ret; len -= ret; - b->active = b->poller->now; + b->active = poller_now(b->poller); } if (b->buf->len > 5242880)