diff --git a/daemon/control.c b/daemon/control.c index 47d98f4..7688b2f 100644 --- a/daemon/control.c +++ b/daemon/control.c @@ -4,6 +4,7 @@ #include #include #include +#include #include "control.h" #include "poller.h" @@ -21,14 +22,20 @@ static pcre_extra *parse_ree; static void control_stream_closed(int fd, void *p, uintptr_t u) { struct control_stream *s = p; struct control *c; + GList *l; mylog(LOG_INFO, "Control connection from " DF " closed", DP(s->inaddr)); c = s->control; - c->streams = g_list_remove(c->streams, s); + mutex_lock(&c->lock); + l = g_list_find(c->streams, s); + if (l) + c->streams = g_list_delete_link(c->streams, l); + mutex_unlock(&c->lock); + if (!l) + return; obj_put(s); - if (poller_del_item(s->poller, fd)) abort(); } @@ -38,32 +45,24 @@ static void control_list(struct control *c, struct control_stream *s) { struct control_stream *i; GList *l; + mutex_lock(&c->lock); for (l = c->streams; l; l = l->next) { i = l->data; streambuf_printf(s->outbuf, DF "\n", DP(i->inaddr)); } + mutex_unlock(&c->lock); streambuf_printf(s->outbuf, "End.\n"); } static int control_stream_parse(struct control_stream *s, char *line) { - const char *errptr; - int erroff; int ovec[60]; int ret; const char **out; struct control *c = s->control; char *output = NULL; - if (!parse_re) { - parse_re = pcre_compile( - /* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */ - "^(?:(request|lookup)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+info=(\\S*)|(delete)\\s+(\\S+)\\s+info=(\\S*)|(build|version|controls|quit|exit|status))$", - PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); - parse_ree = pcre_study(parse_re, 0, &errptr); - } - ret = pcre_exec(parse_re, parse_ree, line, strlen(line), 0, 0, ovec, G_N_ELEMENTS(ovec)); if (ret <= 0) { mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line); @@ -91,7 +90,9 @@ static int control_stream_parse(struct control_stream *s, char *line) { ; if (output) { + mutex_lock(&s->lock); streambuf_write(s->outbuf, output, strlen(output)); + mutex_unlock(&s->lock); free(output); } @@ -103,8 +104,13 @@ static int control_stream_parse(struct control_stream *s, char *line) { static void control_stream_timer(int fd, void *p, uintptr_t u) { struct control_stream *s = p; struct poller *o = s->poller; + int i; - if ((poller_now(o) - s->inbuf->active) >= 60 || (poller_now(o) - s->outbuf->active) >= 60) + mutex_lock(&s->lock); + i = (poller_now(o) - s->inbuf->active) >= 60 || (poller_now(o) - s->outbuf->active) >= 60; + mutex_unlock(&s->lock); + + if (i) control_stream_closed(s->fd, s, 0); } @@ -114,15 +120,19 @@ static void control_stream_readable(int fd, void *p, uintptr_t u) { char *line; int ret; + mutex_lock(&s->lock); + if (streambuf_readable(s->inbuf)) goto close; while ((line = streambuf_getline(s->inbuf))) { + mutex_unlock(&s->lock); mylog(LOG_DEBUG, "Got control line from " DF ": %s", DP(s->inaddr), line); ret = control_stream_parse(s, line); free(line); if (ret) - goto close; + goto close_nolock; + mutex_lock(&s->lock); } if (streambuf_bufsize(s->inbuf) > 1024) { @@ -130,9 +140,12 @@ static void control_stream_readable(int fd, void *p, uintptr_t u) { goto close; } + mutex_unlock(&s->lock); return; close: + mutex_unlock(&s->lock); +close_nolock: control_stream_closed(fd, s, 0); } @@ -179,6 +192,7 @@ static void control_incoming(int fd, void *p, uintptr_t u) { s->inbuf = streambuf_new(c->poller, nfd); s->outbuf = streambuf_new(c->poller, nfd); memcpy(&s->inaddr, &sin, sizeof(s->inaddr)); + mutex_init(&s->lock); ZERO(i); i.fd = nfd; @@ -191,8 +205,10 @@ static void control_incoming(int fd, void *p, uintptr_t u) { if (poller_add_item(c->poller, &i)) goto fail; + mutex_lock(&c->lock); /* let the list steal our own ref */ c->streams = g_list_prepend(c->streams, s); + mutex_unlock(&c->lock); return; @@ -206,12 +222,22 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru struct control *c; struct poller_item i; struct sockaddr_in sin; + const char *errptr; + int erroff; if (!p) return NULL; if (!m) return NULL; + if (!parse_re) { + parse_re = pcre_compile( + /* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */ + "^(?:(request|lookup)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+info=(\\S*)|(delete)\\s+(\\S+)\\s+info=(\\S*)|(build|version|controls|quit|exit|status))$", + PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); + parse_ree = pcre_study(parse_re, 0, &errptr); + } + fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == -1) return NULL; @@ -235,6 +261,7 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru c->fd = fd; c->poller = p; c->callmaster = m; + mutex_init(&c->lock); ZERO(i); i.fd = fd; diff --git a/daemon/control.h b/daemon/control.h index 623ac04..8631558 100644 --- a/daemon/control.h +++ b/daemon/control.h @@ -10,6 +10,7 @@ #include #include "obj.h" +#include "aux.h" #define RE_TCP_RL_CMD 1 @@ -39,6 +40,7 @@ struct control_stream { struct obj obj; int fd; + mutex_t lock; struct streambuf *inbuf; struct streambuf *outbuf; struct sockaddr_in inaddr; @@ -53,6 +55,7 @@ struct control { int fd; + mutex_t lock; GList *streams; struct poller *poller;