Prepare for multi-threaded operation

Introduce generic "objects" with reference counting primitives and automatic
resource deallocation when no refs are left ("garbage collection"). Overhaul
poller framework to make extensive use of these objects to allow for future
thread-concurrent operation.

No locking added anywhere yet except in poller. Poller is still not 100%
thread safe, but close.

Valgrind reports no errors or memleaks.
git.mgm/mediaproxy-ng/2.1
Richard Fuchs 13 years ago
parent 5d1410c61a
commit 145bbd1f7c

@ -83,13 +83,16 @@ static void unkernelize(struct peer *);
static void stream_closed(int fd, void *p) {
struct streamrelay *r = p;
static void stream_closed(int fd, void *p, uintptr_t u) {
struct callstream *cs = p;
struct streamrelay *r;
struct call *c;
int i;
socklen_t j;
c = r->up->up->call;
r = &cs->peers[u >> 1].rtps[u & 1];
assert(r->fd == fd);
c = cs->call;
j = sizeof(i);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &i, &j);
@ -196,7 +199,7 @@ static int stream_packet(struct streamrelay *r, char *b, int l, struct sockaddr_
if (pe->confirmed || !pe->filled || r->idx != 0)
goto forward;
if (!c->lookup_done || m->poller->now <= c->lookup_done + 3)
if (!c->lookup_done || poller_now(m->poller) <= c->lookup_done + 3)
goto peerinfo;
mylog(LOG_DEBUG, LOG_PREFIX_C "Confirmed peer information for port %u - %s:%u",
@ -302,7 +305,7 @@ drop:
r->stats.bytes += l;
m->statsps.packets++;
m->statsps.bytes += l;
r->last = m->poller->now;
r->last = poller_now(m->poller);
return 0;
}
@ -310,8 +313,9 @@ drop:
static void stream_readable(int fd, void *p) {
struct streamrelay *r = p;
static void stream_readable(int fd, void *p, uintptr_t u) {
struct callstream *cs = p;
struct streamrelay *r;
char buf[8192];
int ret;
struct sockaddr_storage ss;
@ -320,6 +324,9 @@ static void stream_readable(int fd, void *p) {
unsigned int sinlen;
void *sinp;
r = &cs->peers[u >> 1].rtps[u & 1];
assert(r->fd == fd);
for (;;) {
sinlen = sizeof(ss);
ret = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *) &ss, &sinlen);
@ -327,7 +334,7 @@ static void stream_readable(int fd, void *p) {
if (ret < 0) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
break;
stream_closed(fd, r);
stream_closed(fd, r, 0);
break;
}
if (ret >= sizeof(buf))
@ -467,7 +474,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
else if (IN6_IS_ADDR_UNSPECIFIED(&sr->peer.ip46))
check = cm->silent_timeout;
if (po->now - sr->last < check)
if (poller_now(po) - sr->last < check)
goto good;
}
}
@ -568,7 +575,7 @@ static void callmaster_timer(void *ptr) {
DS(errors);
if (ke->stats.packets != sr->kstats.packets)
sr->last = po->now;
sr->last = poller_now(po);
sr->kstats.packets = ke->stats.packets;
sr->kstats.bytes = ke->stats.bytes;
@ -595,19 +602,20 @@ next:
struct callmaster *callmaster_new(struct poller *p) {
struct callmaster *c;
c = g_slice_alloc0(sizeof(*c));
c = obj_alloc0("callmaster", sizeof(*c), NULL);
c->callhash = g_hash_table_new(g_str_hash, g_str_equal);
if (!c->callhash)
goto fail;
c->poller = p;
poller_timer(p, callmaster_timer, c);
poller_timer(p, callmaster_timer, &c->obj);
obj_put(&c->obj);
return c;
fail:
g_slice_free1(sizeof(*c), c);
obj_put(&c->obj);
return NULL;
}
@ -876,7 +884,8 @@ static void steal_peer(struct peer *dest, struct peer *src) {
srs->peer_advertised.port = 0;
pi.fd = sr->fd;
pi.ptr = sr;
pi.obj = &sr->up->up->obj;
pi.uintp = i | (dest->idx << 1);
pi.readable = stream_readable;
pi.closed = stream_closed;
@ -894,10 +903,9 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2
po = ca->callmaster->poller;
ZERO(*s);
ZERO(pi);
s->call = ca;
s->call = obj_get(&ca->obj);
DBG("setting new callstream num to %i", num);
s->num = num;
@ -915,7 +923,7 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2
r->fd = -1;
r->idx = j;
r->up = p;
r->last = po->now;
r->last = poller_now(po);
}
tport = (i == 0) ? port1 : port2;
@ -927,7 +935,8 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2
r = &p->rtps[j];
pi.fd = r->fd;
pi.ptr = r;
pi.obj = &s->obj;
pi.uintp = (i << 1) | j;
pi.readable = stream_readable;
pi.closed = stream_closed;
@ -939,6 +948,31 @@ void callstream_init(struct callstream *s, struct call *ca, int port1, int port2
static void callstream_free(void *ptr) {
struct callstream *s = ptr;
int i, j;
struct peer *p;
struct streamrelay *r;
for (i = 0; i < 2; i++) {
p = &s->peers[i];
free(p->tag);
free(p->mediatype);
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
if (r->fd != -1) {
close(r->fd);
bit_array_clear(ports_used, r->localport);
r->fd = -1;
}
}
}
obj_put(&s->call->obj);
}
static int call_streams(struct call *c, GQueue *s, const char *tag, int opmode) {
GQueue *q;
GList *i, *l;
@ -989,7 +1023,7 @@ found:
if (!opmode) { /* request */
DBG("creating new callstream");
cs = g_slice_alloc(sizeof(*cs));
cs = obj_alloc0("callstream", sizeof(*cs), callstream_free);
if (!r) {
/* nothing found to re-use, open new ports */
@ -1012,7 +1046,7 @@ found:
}
}
g_queue_push_tail(q, cs);
g_queue_push_tail(q, cs); /* hand over the ref */
ZERO(c->lookup_done);
continue;
}
@ -1070,12 +1104,12 @@ got_cs:
/* need a new call stream after all */
DBG("case 4");
cs_o = cs;
cs = g_slice_alloc(sizeof(*cs));
cs = obj_alloc0("callstream", sizeof(*cs), callstream_free);
callstream_init(cs, c, 0, 0, t->num);
steal_peer(&cs->peers[0], &cs_o->peers[0]);
p = &cs->peers[1];
setup_peer(p, t, tag);
g_queue_push_tail(c->callstreams, cs_o);
g_queue_push_tail(c->callstreams, cs_o); /* hand over ref XXX? */
}
time(&c->lookup_done);
@ -1136,39 +1170,24 @@ static void kill_callstream(struct callstream *s) {
unkernelize(p);
free(p->tag);
free(p->mediatype);
for (j = 0; j < 2; j++) {
r = &p->rtps[j];
if (r->fd != -1) {
if (r->fd != -1)
poller_del_item(s->call->callmaster->poller, r->fd);
close(r->fd);
bit_array_clear(ports_used, r->localport);
}
}
}
g_slice_free1(sizeof(*s), s);
}
static void call_destroy(struct call *c) {
struct callmaster *m = c->callmaster;
struct callstream *s;
g_hash_table_remove(m->callhash, c->callid);
g_hash_table_remove(m->callhash, c->callid); /* steal this ref */
if (redis_delete)
redis_delete(c);
g_hash_table_destroy(c->infohash);
g_hash_table_destroy(c->branches);
if (c->calling_agent)
free(c->calling_agent);
if (c->called_agent)
free(c->called_agent);
mylog(LOG_INFO, LOG_PREFIX_C "Final packet stats:", c->callid);
while (c->callstreams->head) {
s = g_queue_pop_head(c->callstreams);
@ -1190,11 +1209,10 @@ static void call_destroy(struct call *c) {
s->peers[1].rtps[1].localport, s->peers[1].rtps[1].stats.packets,
s->peers[1].rtps[1].stats.bytes, s->peers[1].rtps[1].stats.errors);
kill_callstream(s);
obj_put(&s->obj);
}
g_queue_free(c->callstreams);
free(c->callid);
g_slice_free1(sizeof(*c), c);
obj_put(&c->obj);
}
@ -1280,16 +1298,29 @@ static guint g_str_hash0(gconstpointer v) {
return g_str_hash(v);
}
static void call_free(void *p) {
struct call *c = p;
g_hash_table_destroy(c->infohash);
g_hash_table_destroy(c->branches);
if (c->calling_agent)
free(c->calling_agent);
if (c->called_agent)
free(c->called_agent);
g_queue_free(c->callstreams);
free(c->callid);
}
static struct call *call_create(const char *callid, struct callmaster *m) {
struct call *c;
mylog(LOG_NOTICE, LOG_PREFIX_C "Creating new call",
callid); /* XXX will spam syslog on recovery from DB */
c = g_slice_alloc0(sizeof(*c));
c = obj_alloc0("call", sizeof(*c), call_free);
c->callmaster = m;
c->callid = strdup(callid);
c->callstreams = g_queue_new();
c->created = m->poller->now;
c->created = poller_now(m->poller);
c->infohash = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
c->branches = g_hash_table_new_full(g_str_hash0, g_str_equal0, free, NULL);
return c;
@ -1302,8 +1333,10 @@ struct call *call_get_or_create(const char *callid, const char *viabranch, struc
if (!c) {
/* completely new call-id, create call */
c = call_create(callid, m);
g_hash_table_insert(m->callhash, c->callid, c);
g_hash_table_insert(m->callhash, c->callid, obj_get(&c->obj));
}
else
obj_hold(&c->obj);
if (viabranch && !g_hash_table_lookup(c->branches, viabranch))
g_hash_table_insert(c->branches, strdup(viabranch), (void *) 0x1);
@ -1382,12 +1415,14 @@ char *call_update_udp(const char **out, struct callmaster *m) {
ret = streams_print(c->callstreams, 1, (num >= 0) ? 0 : 1, out[RE_UDP_COOKIE], 1);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
c->log_info = NULL;
obj_put(&c->obj);
return ret;
fail:
mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]);
asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]);
c->log_info = NULL;
obj_put(&c->obj);
return ret;
}
@ -1405,6 +1440,7 @@ char *call_lookup_udp(const char **out, struct callmaster *m) {
asprintf(&ret, "%s 0 " IPF "\n", out[RE_UDP_COOKIE], IPP(m->ipv4));
return ret;
}
obj_hold(&c->obj);
c->log_info = out[RE_UDP_UL_CALLID];
strdupfree(&c->called_agent, "UNKNOWN(udp)");
@ -1423,12 +1459,14 @@ char *call_lookup_udp(const char **out, struct callmaster *m) {
ret = streams_print(c->callstreams, 1, (num >= 0) ? 1 : 0, out[RE_UDP_COOKIE], 1);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
c->log_info = NULL;
obj_put(&c->obj);
return ret;
fail:
mylog(LOG_WARNING, "Failed to parse a media stream: %s/%s:%s", out[RE_UDP_UL_ADDR4], out[RE_UDP_UL_ADDR6], out[RE_UDP_UL_PORT]);
asprintf(&ret, "%s E8\n", out[RE_UDP_COOKIE]);
c->log_info = NULL;
obj_put(&c->obj);
return ret;
}
@ -1451,6 +1489,7 @@ char *call_request(const char **out, struct callmaster *m) {
ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 0 : 1, NULL, 0);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
obj_put(&c->obj);
return ret;
}
@ -1466,6 +1505,8 @@ char *call_lookup(const char **out, struct callmaster *m) {
return NULL;
}
obj_hold(&c->obj);
strdupfree(&c->called_agent, out[RE_TCP_RL_AGENT] ? : "UNKNOWN");
info_parse(out[RE_TCP_RL_INFO], &c->infohash);
s = streams_parse(out[RE_TCP_RL_STREAMS]);
@ -1477,6 +1518,7 @@ char *call_lookup(const char **out, struct callmaster *m) {
ret = streams_print(c->callstreams, abs(num), (num >= 0) ? 1 : 0, NULL, 0);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %s", LOG_PARAMS_CI(c), ret);
obj_put(&c->obj);
return ret;
}
@ -1496,6 +1538,7 @@ char *call_delete_udp(const char **out, struct callmaster *m) {
mylog(LOG_INFO, LOG_PREFIX_C "Call-ID to delete not found", out[RE_UDP_D_CALLID]);
goto err;
}
obj_hold(&c->obj);
c->log_info = out[RE_UDP_D_VIABRANCH];
if (out[RE_UDP_D_FROMTAG] && *out[RE_UDP_D_FROMTAG]) {
@ -1552,8 +1595,10 @@ err:
goto out;
out:
if (c)
if (c) {
c->log_info = NULL;
obj_put(&c->obj);
}
return ret;
}
@ -1564,8 +1609,10 @@ void call_delete(const char **out, struct callmaster *m) {
if (!c)
return;
obj_hold(&c->obj);
/* delete whole list, as we don't have branches in tcp controller */
call_destroy(c);
obj_put(&c->obj);
}
@ -1590,7 +1637,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
(char *) g_hash_table_lookup(c->infohash, "from"),
(char *) g_hash_table_lookup(c->infohash, "to"),
c->calling_agent, c->called_agent,
(int) (m->poller->now - c->created));
(int) (poller_now(m->poller) - c->created));
for (l = c->callstreams->head; l; l = l->next) {
cs = l->data;
@ -1620,7 +1667,7 @@ static void call_status_iterator(void *key, void *val, void *ptr) {
(long long unsigned int) r1->stats.bytes + rx1->stats.bytes + r2->stats.bytes + rx2->stats.bytes,
"active",
p->codec ? : "unknown",
p->mediatype, (int) (m->poller->now - r1->last));
p->mediatype, (int) (poller_now(m->poller) - r1->last));
}
}

@ -10,6 +10,7 @@
#include "control.h"
#include "control_udp.h"
#include "obj.h"
struct poller;
struct control_stream;
@ -67,12 +68,15 @@ struct peer {
int confirmed:1;
};
struct callstream {
struct obj obj;
struct peer peers[2];
struct call *call;
int num;
};
struct call {
struct obj obj;
struct callmaster *callmaster;
GQueue *callstreams;
@ -90,6 +94,8 @@ struct call {
};
struct callmaster {
struct obj obj;
GHashTable *callhash;
u_int16_t lastport;
struct stats statsps;

@ -12,10 +12,13 @@
#include "log.h"
#include "call.h"
static pcre *parse_re;
static pcre_extra *parse_ree;
static void control_stream_closed(int fd, void *p) {
static void control_stream_closed(int fd, void *p, uintptr_t u) {
struct control_stream *s = p;
struct control *c;
@ -23,23 +26,22 @@ static void control_stream_closed(int fd, void *p) {
c = s->control;
c->stream_head = g_list_remove_link(c->stream_head, &s->link);
c->streams = g_list_remove(c->streams, s);
obj_put(&s->obj);
if (poller_del_item(s->poller, fd))
abort();
close(fd);
streambuf_destroy(s->inbuf);
streambuf_destroy(s->outbuf);
free(s);
}
static void control_list(struct control *c, struct control_stream *s) {
struct control_stream *i;
GList *l;
for (i = (void *) c->stream_head; i; i = (void *) i->link.next)
streambuf_printf(s->outbuf, DF "\n", DP(s->inaddr));
for (l = c->streams; l; l = l->next) {
i = l->data;
streambuf_printf(s->outbuf, DF "\n", DP(i->inaddr));
}
streambuf_printf(s->outbuf, "End.\n");
}
@ -98,16 +100,16 @@ static int control_stream_parse(struct control_stream *s, char *line) {
}
static void control_stream_timer(int fd, void *p) {
static void control_stream_timer(int fd, void *p, uintptr_t u) {
struct control_stream *s = p;
struct poller *o = s->poller;
if ((o->now - s->inbuf->active) >= 60 || (o->now - s->outbuf->active) >= 60)
control_stream_closed(s->fd, s);
if ((poller_now(o) - s->inbuf->active) >= 60 || (poller_now(o) - s->outbuf->active) >= 60)
control_stream_closed(s->fd, s, 0);
}
static void control_stream_readable(int fd, void *p) {
static void control_stream_readable(int fd, void *p, uintptr_t u) {
struct control_stream *s = p;
char *line;
int ret;
@ -131,21 +133,29 @@ static void control_stream_readable(int fd, void *p) {
return;
close:
control_stream_closed(fd, s);
control_stream_closed(fd, s, 0);
}
static void control_stream_writeable(int fd, void *p) {
static void control_stream_writeable(int fd, void *p, uintptr_t u) {
struct control_stream *s = p;
if (streambuf_writeable(s->outbuf))
control_stream_closed(fd, s);
control_stream_closed(fd, s, 0);
}
static void control_closed(int fd, void *p) {
static void control_closed(int fd, void *p, uintptr_t u) {
abort();
}
static void control_incoming(int fd, void *p) {
static void control_stream_free(void *p) {
struct control_stream *s = p;
close(s->fd);
streambuf_destroy(s->inbuf);
streambuf_destroy(s->outbuf);
}
static void control_incoming(int fd, void *p, uintptr_t u) {
int nfd;
struct control *c = p;
struct control_stream *s;
@ -161,8 +171,14 @@ static void control_incoming(int fd, void *p) {
mylog(LOG_INFO, "New control connection from " DF, DP(sin));
s = malloc(sizeof(*s));
ZERO(*s);
s = obj_alloc0("control_stream", sizeof(*s), control_stream_free);
s->fd = nfd;
s->control = c;
s->poller = c->poller;
s->inbuf = streambuf_new(c->poller, nfd);
s->outbuf = streambuf_new(c->poller, nfd);
memcpy(&s->inaddr, &sin, sizeof(s->inaddr));
ZERO(i);
i.fd = nfd;
@ -170,23 +186,18 @@ static void control_incoming(int fd, void *p) {
i.readable = control_stream_readable;
i.writeable = control_stream_writeable;
i.timer = control_stream_timer;
i.ptr = s;
i.obj = &s->obj;
if (poller_add_item(c->poller, &i))
goto fail;
s->fd = nfd;
s->control = c;
s->poller = c->poller;
s->inbuf = streambuf_new(c->poller, nfd);
s->outbuf = streambuf_new(c->poller, nfd);
memcpy(&s->inaddr, &sin, sizeof(s->inaddr));
c->stream_head = g_list_link(c->stream_head, &s->link);
/* let the list steal our own ref */
c->streams = g_list_prepend(c->streams, s);
return;
fail:
free(s);
close(nfd);
obj_put(&s->obj);
}
@ -219,8 +230,7 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru
goto fail;
c = malloc(sizeof(*c));
ZERO(*c);
c = obj_alloc0("control", sizeof(*c), NULL);
c->fd = fd;
c->poller = p;
@ -230,14 +240,15 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru
i.fd = fd;
i.closed = control_closed;
i.readable = control_incoming;
i.ptr = c;
i.obj = &c->obj;
if (poller_add_item(p, &i))
goto fail2;
obj_put(&c->obj);
return c;
fail2:
free(c);
obj_put(&c->obj);
fail:
close(fd);
return NULL;

@ -9,6 +9,9 @@
#include <netinet/ip.h>
#include <glib.h>
#include "obj.h"
#define RE_TCP_RL_CMD 1
#define RE_TCP_RL_CALLID 2
#define RE_TCP_RL_STREAMS 3
@ -31,8 +34,9 @@ struct callmaster;
struct control_stream {
GList link; /* must be first */
struct obj obj;
int fd;
struct streambuf *inbuf;
@ -45,15 +49,18 @@ struct control_stream {
struct control {
struct obj obj;
int fd;
GList *stream_head;
GList *streams;
struct poller *poller;
struct callmaster *callmaster;
};
struct control *control_new(struct poller *, u_int32_t, u_int16_t, struct callmaster *);

@ -15,11 +15,11 @@
#include "call.h"
static void control_udp_closed(int fd, void *p) {
static void control_udp_closed(int fd, void *p, uintptr_t x) {
abort();
}
static void control_udp_incoming(int fd, void *p) {
static void control_udp_incoming(int fd, void *p, uintptr_t x) {
struct control_udp *u = p;
int ret, len;
char buf[8192];
@ -87,7 +87,7 @@ static void control_udp_incoming(int fd, void *p) {
pcre_get_substring_list(buf, ovec, ret, &out);
if (u->poller->now - u->oven_time >= 30) {
if (poller_now(u->poller) - u->oven_time >= 30) {
g_hash_table_remove_all(u->stale_cookies);
#if GLIB_CHECK_VERSION(2,14,0)
g_string_chunk_clear(u->stale_chunks);
@ -98,7 +98,7 @@ static void control_udp_incoming(int fd, void *p) {
u->fresh_chunks = g_string_chunk_new(4 * 1024);
#endif
swap_ptrs(&u->stale_cookies, &u->fresh_cookies);
u->oven_time = u->poller->now; /* baked new cookies! */
u->oven_time = poller_now(u->poller); /* baked new cookies! */
}
/* XXX better hashing */
@ -187,8 +187,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1
goto fail;
c = malloc(sizeof(*c));
ZERO(*c);
c = obj_alloc0("control_udp", sizeof(*c), NULL);
c->fd = fd;
c->poller = p;
@ -197,7 +196,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1
c->stale_cookies = g_hash_table_new(g_str_hash, g_str_equal);
c->fresh_chunks = g_string_chunk_new(4 * 1024);
c->stale_chunks = g_string_chunk_new(4 * 1024);
c->oven_time = p->now;
c->oven_time = poller_now(p);
c->parse_re = pcre_compile(
/* cookie cmd flags callid viabranch:5 */
"^(\\S+)\\s+(?:([ul])(\\S*)\\s+([^;]+)(?:;(\\S+))?\\s+" \
@ -221,7 +220,7 @@ struct control_udp *control_udp_new(struct poller *p, struct in6_addr ip, u_int1
i.fd = fd;
i.closed = control_udp_closed;
i.readable = control_udp_incoming;
i.ptr = c;
i.obj = &c->obj;
if (poller_add_item(p, &i))
goto fail2;

@ -9,6 +9,9 @@
#include <glib.h>
#include <time.h>
#include <netinet/in.h>
#include "obj.h"
#define RE_UDP_COOKIE 1
#define RE_UDP_UL_CMD 2
@ -39,6 +42,8 @@ struct callmaster;
struct control_udp {
struct obj obj;
int fd;
struct poller *poller;

@ -0,0 +1,157 @@
#ifndef _OBJ_H_
#define _OBJ_H_
#include <glib.h>
#include <sys/types.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include "log.h"
#define OBJ_DEBUG 1
struct obj {
#if OBJ_DEBUG
u_int32_t magic;
char *type;
#endif
volatile gint ref;
void (*free_func)(void *);
unsigned int size;
};
#if OBJ_DEBUG
#define OBJ_MAGIC 0xf1eef1ee
#define obj_alloc(t,a,b) __obj_alloc(a,b,t,__FILE__,__LINE__)
#define obj_alloc0(t,a,b) __obj_alloc0(a,b,t,__FILE__,__LINE__)
#define obj_hold(a) __obj_hold(a,__FILE__,__LINE__)
#define obj_get(a) __obj_get(a,__FILE__,__LINE__)
#define obj_put(a) __obj_put(a,__FILE__,__LINE__)
#else
#define obj_alloc(t,a,b) __obj_alloc(a,b)
#define obj_alloc0(t,a,b) __obj_alloc0(a,b)
#define obj_hold(a) __obj_hold(a)
#define obj_get(a) __obj_get(a)
#define obj_put(a) __obj_put(a)
#endif
static inline void __obj_init(struct obj *o, unsigned int size, void (*free_func)(void *)
#if OBJ_DEBUG
, const char *type, const char *file, unsigned int line
#endif
) {
#if OBJ_DEBUG
o->magic = OBJ_MAGIC;
o->type = strdup(type);
mylog(LOG_DEBUG, "obj_allocX(\"%s\", size %u) -> %p [%s:%u]", type, size, o, file, line);
#endif
o->ref = 1;
o->free_func = free_func;
o->size = size;
}
static inline void *__obj_alloc(unsigned int size, void (*free_func)(void *)
#if OBJ_DEBUG
, const char *type, const char *file, unsigned int line
#endif
) {
struct obj *r;
r = g_slice_alloc(size);
__obj_init(r, size, free_func
#if OBJ_DEBUG
, type, file, line
#endif
);
return r;
}
static inline void *__obj_alloc0(unsigned int size, void (*free_func)(void *)
#if OBJ_DEBUG
, const char *type, const char *file, unsigned int line
#endif
) {
struct obj *r;
r = g_slice_alloc0(size);
__obj_init(r, size, free_func
#if OBJ_DEBUG
, type, file, line
#endif
);
return r;
}
static inline struct obj *__obj_hold(struct obj *o
#if OBJ_DEBUG
, const char *file, unsigned int line
#endif
) {
#if OBJ_DEBUG
assert(o->magic == OBJ_MAGIC);
mylog(LOG_DEBUG, "obj_hold(%p, \"%s\", size %u), refcnt before %u [%s:%u]",
o, o->type, o->size, g_atomic_int_get(&o->ref), file, line);
#endif
g_atomic_int_inc(&o->ref);
#if OBJ_DEBUG
mylog(LOG_DEBUG, "obj_hold(%p, \"%s\", size %u), refcnt after %u [%s:%u]",
o, o->type, o->size, g_atomic_int_get(&o->ref), file, line);
#endif
return o;
}
static inline void *__obj_get(struct obj *o
#if OBJ_DEBUG
, const char *file, unsigned int line
#endif
) {
return __obj_hold(o
#if OBJ_DEBUG
, file, line
#endif
);
}
static inline void __obj_put(struct obj *o
#if OBJ_DEBUG
, const char *file, unsigned int line
#endif
) {
#if OBJ_DEBUG
assert(o->magic == OBJ_MAGIC);
mylog(LOG_DEBUG, "obj_put(%p, \"%s\", size %u), refcnt before %u [%s:%u]",
o, o->type, o->size, g_atomic_int_get(&o->ref), file, line);
#endif
if (!g_atomic_int_dec_and_test(&o->ref))
return;
#if OBJ_DEBUG
mylog(LOG_DEBUG, "obj_put(%p, \"%s\", size %u), refcnt after %u [%s:%u]",
o, o->type, o->size, g_atomic_int_get(&o->ref), file, line);
free(o->type);
#endif
if (o->free_func)
o->free_func(o);
g_slice_free1(o->size, o);
}
#endif

@ -12,15 +12,37 @@
#include "poller.h"
#include "aux.h"
#include "obj.h"
struct timer_item {
void (*func)(void *);
void *ptr;
struct obj obj;
void (*func)(void *);
struct obj *obj_ptr;
};
struct poller_item_int {
struct obj obj;
struct poller_item item;
int blocked:1;
int error:1;
};
struct poller {
int fd;
GMutex lock;
struct poller_item_int **items;
unsigned int items_size;
GList *timers;
time_t now;
};
@ -34,42 +56,58 @@ struct poller *poller_new(void) {
p->fd = epoll_create1(0);
if (p->fd == -1)
abort();
g_mutex_init(&p->lock);
return p;
}
static int epoll_events(struct poller_item *i) {
return EPOLLHUP | EPOLLERR | ((i->writeable && i->blocked) ? EPOLLOUT : 0) | (i->readable ? EPOLLIN : 0);
static int epoll_events(struct poller_item *it, struct poller_item_int *ii) {
if (!it)
it = &ii->item;
return EPOLLHUP | EPOLLERR |
((it->writeable && ii && ii->blocked) ? EPOLLOUT : 0) |
(it->readable ? EPOLLIN : 0);
}
static void poller_fd_timer(void *p) {
struct poller_item *it = p;
struct poller_item_int *it = p;
if (it->timer)
it->timer(it->fd, it->ptr);
if (it->item.timer)
it->item.timer(it->item.fd, it->item.obj, it->item.uintp);
}
int poller_add_item(struct poller *p, struct poller_item *i) {
struct poller_item *ip;
static void poller_item_free(void *p) {
struct poller_item_int *i = p;
obj_put(i->item.obj);
}
/* unlocks on return */
static int __poller_add_item(struct poller *p, struct poller_item *i, int has_lock) {
struct poller_item_int *ip;
unsigned int u;
struct epoll_event e;
if (!p || !i)
return -1;
goto fail_lock;
if (i->fd < 0)
return -1;
goto fail_lock;
if (!i->readable && !i->writeable)
return -1;
goto fail_lock;
if (!i->closed)
return -1;
goto fail_lock;
if (!has_lock)
g_mutex_lock(&p->lock);
if (i->fd < p->items_size && p->items[i->fd])
return -1;
goto fail;
e.events = epoll_events(i);
ZERO(e);
e.events = epoll_events(i, NULL);
e.data.fd = i->fd;
if (epoll_ctl(p->fd, EPOLL_CTL_ADD, i->fd, &e))
abort();
@ -81,58 +119,92 @@ int poller_add_item(struct poller *p, struct poller_item *i) {
memset(p->items + u, 0, sizeof(*p->items) * (p->items_size - u - 1));
}
ip = g_slice_alloc(sizeof(*ip));
memcpy(ip, i, sizeof(*ip));
p->items[i->fd] = ip;
ip = obj_alloc0("poller_item_int", sizeof(*ip), poller_item_free);
memcpy(&ip->item, i, sizeof(*i));
obj_hold(ip->item.obj); /* new ref in *ip */
p->items[i->fd] = obj_get(&ip->obj);
g_mutex_unlock(&p->lock);
if (i->timer)
poller_timer(p, poller_fd_timer, ip);
poller_timer(p, poller_fd_timer, &ip->obj);
obj_put(&ip->obj);
return 0;
fail:
g_mutex_unlock(&p->lock);
return -1;
fail_lock:
if (has_lock)
g_mutex_unlock(&p->lock);
return -1;
}
int poller_add_item(struct poller *p, struct poller_item *i) {
return __poller_add_item(p, i, 0);
}
int poller_find_timer(gconstpointer a, gconstpointer b) {
const struct timer_item *it = a;
const struct poller_item *x = b;
const struct obj *x = b;
if (it->ptr == x)
if (it->obj_ptr == x)
return 0;
return 1;
}
int poller_del_item(struct poller *p, int fd) {
struct poller_item *it;
struct poller_item_int *it;
GList *l;
struct timer_item *ti;
if (!p || fd < 0)
return -1;
g_mutex_lock(&p->lock);
if (fd >= p->items_size)
return -1;
goto fail;
if (!p->items || !(it = p->items[fd]))
return -1;
goto fail;
if (epoll_ctl(p->fd, EPOLL_CTL_DEL, fd, NULL))
abort();
if (it->timer) {
l = g_list_find_custom(p->timers, it, poller_find_timer);
if (!l)
abort();
g_slice_free1(sizeof(struct timer_item), l->data);
p->timers = g_list_delete_link(p->timers, l);
p->items[fd] = NULL; /* stealing the ref */
g_mutex_unlock(&p->lock);
if (it->item.timer) {
while (1) {
/* rare but possible race with poller_add_item above */
l = g_list_find_custom(p->timers, &it->obj, poller_find_timer);
if (l)
break;
}
p->timers = g_list_remove_link(p->timers, l);
ti = l->data;
obj_put(&ti->obj);
g_list_free_1(l);
}
g_slice_free1(sizeof(*it), it);
p->items[fd] = NULL;
obj_put(&it->obj);
return 0;
fail:
g_mutex_unlock(&p->lock);
return -1;
}
int poller_update_item(struct poller *p, struct poller_item *i) {
struct poller_item *np;
struct poller_item_int *np;
if (!p || !i)
return -1;
@ -143,14 +215,21 @@ int poller_update_item(struct poller *p, struct poller_item *i) {
if (!i->closed)
return -1;
g_mutex_lock(&p->lock);
if (i->fd >= p->items_size || !(np = p->items[i->fd]))
return poller_add_item(p, i);
return __poller_add_item(p, i, 1);
obj_hold(i->obj);
obj_put(np->item.obj);
np->item.obj = i->obj;
np->item.uintp = i->uintp;
np->item.readable = i->readable;
np->item.writeable = i->writeable;
np->item.closed = i->closed;
/* updating timer is not supported */
np->ptr = i->ptr;
np->readable = i->readable;
np->writeable = i->writeable;
np->closed = i->closed;
np->timer = i->timer;
g_mutex_unlock(&p->lock);
return 0;
}
@ -158,7 +237,7 @@ int poller_update_item(struct poller *p, struct poller_item *i) {
int poller_poll(struct poller *p, int timeout) {
int ret, i;
struct poller_item *it;
struct poller_item_int *it;
time_t last;
GList *li, *ne;
struct timer_item *ti;
@ -166,8 +245,12 @@ int poller_poll(struct poller *p, int timeout) {
if (!p)
return -1;
g_mutex_lock(&p->lock);
ret = -1;
if (!p->items || !p->items_size)
return -1;
goto out;
last = p->now;
p->now = time(NULL);
@ -175,13 +258,19 @@ int poller_poll(struct poller *p, int timeout) {
for (li = p->timers; li; li = ne) {
ne = li->next;
ti = li->data;
ti->func(ti->ptr);
/* XXX not safe */
g_mutex_unlock(&p->lock);
ti->func(ti->obj_ptr);
g_mutex_lock(&p->lock);
}
return p->items_size;
ret = p->items_size;
goto out;
}
g_mutex_unlock(&p->lock);
errno = 0;
ret = epoll_wait(p->fd, evs, sizeof(evs) / sizeof(*evs), timeout);
g_mutex_lock(&p->lock);
if (errno == EINTR)
ret = 0;
@ -198,33 +287,44 @@ int poller_poll(struct poller *p, int timeout) {
if (!it)
continue;
obj_hold(&it->obj);
g_mutex_unlock(&p->lock);
if (it->error) {
it->closed(it->fd, it->ptr);
continue;
it->item.closed(it->item.fd, it->item.obj, it->item.uintp);
goto next;
}
if ((ev->events & (POLLERR | POLLHUP)))
it->closed(it->fd, it->ptr);
it->item.closed(it->item.fd, it->item.obj, it->item.uintp);
else if ((ev->events & POLLOUT)) {
g_mutex_lock(&p->lock);
it->blocked = 0;
e.events = epoll_events(it);
e.data.fd = it->fd;
if (epoll_ctl(p->fd, EPOLL_CTL_MOD, it->fd, &e))
ZERO(e);
e.events = epoll_events(NULL, it);
e.data.fd = it->item.fd;
if (epoll_ctl(p->fd, EPOLL_CTL_MOD, it->item.fd, &e))
abort();
it->writeable(it->fd, it->ptr);
g_mutex_unlock(&p->lock);
it->item.writeable(it->item.fd, it->item.obj, it->item.uintp);
}
else if ((ev->events & POLLIN))
it->readable(it->fd, it->ptr);
it->item.readable(it->item.fd, it->item.obj, it->item.uintp);
else if (!ev->events)
continue;
goto next;
else
abort();
next:
obj_put(&it->obj);
g_mutex_lock(&p->lock);
}
out:
g_mutex_unlock(&p->lock);
return ret;
}
@ -234,63 +334,94 @@ void poller_blocked(struct poller *p, int fd) {
if (!p || fd < 0)
return;
g_mutex_lock(&p->lock);
if (fd >= p->items_size)
return;
goto fail;
if (!p->items || !p->items[fd])
return;
if (!p->items[fd]->writeable)
return;
goto fail;
if (!p->items[fd]->item.writeable)
goto fail;
p->items[fd]->blocked = 1;
e.events = epoll_events(p->items[fd]);
ZERO(e);
e.events = epoll_events(NULL, p->items[fd]);
e.data.fd = fd;
if (epoll_ctl(p->fd, EPOLL_CTL_MOD, fd, &e))
abort();
fail:
g_mutex_unlock(&p->lock);
}
void poller_error(struct poller *p, int fd) {
if (!p || fd < 0)
return;
g_mutex_lock(&p->lock);
if (fd >= p->items_size)
return;
goto fail;
if (!p->items || !p->items[fd])
return;
if (!p->items[fd]->writeable)
return;
goto fail;
if (!p->items[fd]->item.writeable)
goto fail;
p->items[fd]->error = 1;
p->items[fd]->blocked = 1;
fail:
g_mutex_unlock(&p->lock);
}
int poller_isblocked(struct poller *p, int fd) {
int ret;
if (!p || fd < 0)
return -1;
g_mutex_lock(&p->lock);
ret = -1;
if (fd >= p->items_size)
return -1;
goto out;
if (!p->items || !p->items[fd])
return -1;
if (!p->items[fd]->writeable)
return -1;
goto out;
if (!p->items[fd]->item.writeable)
goto out;
return p->items[fd]->blocked;
ret = p->items[fd]->blocked ? 1 : 0;
out:
g_mutex_unlock(&p->lock);
return ret;
}
static void timer_item_free(void *p) {
struct timer_item *i = p;
obj_put(i->obj_ptr);
}
int poller_timer(struct poller *p, void (*f)(void *), void *ptr) {
int poller_timer(struct poller *p, void (*f)(void *), struct obj *o) {
struct timer_item *i;
if (!p || !f)
if (!o || !f)
return -1;
i = g_slice_alloc0(sizeof(*i));
i = obj_alloc0("timer_item", sizeof(*i), timer_item_free);
i->func = f;
i->ptr = ptr;
i->obj_ptr = obj_hold(o);
p->timers = g_list_prepend(p->timers, i);
return 0;
}
time_t poller_now(struct poller *p) {
return p->now;
}

@ -4,32 +4,27 @@
#include <sys/types.h>
#include <stdint.h>
#include <time.h>
#include <glib.h>
#include "obj.h"
struct poller_item {
int fd;
void *ptr;
struct obj *obj;
uintptr_t uintp;
void (*readable)(int, void *);
void (*writeable)(int, void *);
void (*closed)(int, void *);
void (*timer)(int, void *);
int blocked:1;
int error:1;
void (*readable)(int, void *, uintptr_t);
void (*writeable)(int, void *, uintptr_t);
void (*closed)(int, void *, uintptr_t);
void (*timer)(int, void *, uintptr_t);
};
struct poller {
int fd;
struct poller_item **items;
unsigned int items_size;
GList *timers;
struct poller;
time_t now;
};
struct poller *poller_new(void);
@ -40,8 +35,9 @@ int poller_poll(struct poller *, int);
void poller_blocked(struct poller *, int);
int poller_isblocked(struct poller *, int);
void poller_error(struct poller *, int);
time_t poller_now(struct poller *);
int poller_timer(struct poller *, void (*)(void *), void *);
int poller_timer(struct poller *, void (*)(void *), struct obj *);
#endif

@ -21,7 +21,7 @@ struct streambuf *streambuf_new(struct poller *p, int fd) {
b->buf = g_string_new("");
b->fd = fd;
b->poller = p;
b->active = p->now;
b->active = poller_now(p);
return b;
}
@ -52,7 +52,7 @@ int streambuf_writeable(struct streambuf *b) {
if (ret > 0) {
g_string_erase(b->buf, 0, ret);
b->active = b->poller->now;
b->active = poller_now(b->poller);
}
if (ret != out) {
@ -80,7 +80,7 @@ int streambuf_readable(struct streambuf *b) {
}
g_string_append_len(b->buf, buf, ret);
b->active = b->poller->now;
b->active = poller_now(b->poller);
}
return 0;
@ -162,7 +162,7 @@ void streambuf_write(struct streambuf *b, char *s, unsigned int len) {
s += ret;
len -= ret;
b->active = b->poller->now;
b->active = poller_now(b->poller);
}
if (b->buf->len > 5242880)

Loading…
Cancel
Save