and the tcp controller becomes thread-safe

remotes/origin/2.1
Richard Fuchs 13 years ago
parent 6e474ccfc0
commit 419f6bb230

@ -4,6 +4,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <pcre.h> #include <pcre.h>
#include <glib.h>
#include "control.h" #include "control.h"
#include "poller.h" #include "poller.h"
@ -21,14 +22,20 @@ static pcre_extra *parse_ree;
static void control_stream_closed(int fd, void *p, uintptr_t u) { static void control_stream_closed(int fd, void *p, uintptr_t u) {
struct control_stream *s = p; struct control_stream *s = p;
struct control *c; struct control *c;
GList *l;
mylog(LOG_INFO, "Control connection from " DF " closed", DP(s->inaddr)); mylog(LOG_INFO, "Control connection from " DF " closed", DP(s->inaddr));
c = s->control; c = s->control;
c->streams = g_list_remove(c->streams, s); mutex_lock(&c->lock);
l = g_list_find(c->streams, s);
if (l)
c->streams = g_list_delete_link(c->streams, l);
mutex_unlock(&c->lock);
if (!l)
return;
obj_put(s); obj_put(s);
if (poller_del_item(s->poller, fd)) if (poller_del_item(s->poller, fd))
abort(); abort();
} }
@ -38,32 +45,24 @@ static void control_list(struct control *c, struct control_stream *s) {
struct control_stream *i; struct control_stream *i;
GList *l; GList *l;
mutex_lock(&c->lock);
for (l = c->streams; l; l = l->next) { for (l = c->streams; l; l = l->next) {
i = l->data; i = l->data;
streambuf_printf(s->outbuf, DF "\n", DP(i->inaddr)); streambuf_printf(s->outbuf, DF "\n", DP(i->inaddr));
} }
mutex_unlock(&c->lock);
streambuf_printf(s->outbuf, "End.\n"); streambuf_printf(s->outbuf, "End.\n");
} }
static int control_stream_parse(struct control_stream *s, char *line) { static int control_stream_parse(struct control_stream *s, char *line) {
const char *errptr;
int erroff;
int ovec[60]; int ovec[60];
int ret; int ret;
const char **out; const char **out;
struct control *c = s->control; struct control *c = s->control;
char *output = NULL; char *output = NULL;
if (!parse_re) {
parse_re = pcre_compile(
/* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */
"^(?:(request|lookup)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+info=(\\S*)|(delete)\\s+(\\S+)\\s+info=(\\S*)|(build|version|controls|quit|exit|status))$",
PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
parse_ree = pcre_study(parse_re, 0, &errptr);
}
ret = pcre_exec(parse_re, parse_ree, line, strlen(line), 0, 0, ovec, G_N_ELEMENTS(ovec)); ret = pcre_exec(parse_re, parse_ree, line, strlen(line), 0, 0, ovec, G_N_ELEMENTS(ovec));
if (ret <= 0) { if (ret <= 0) {
mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line); mylog(LOG_WARNING, "Unable to parse command line from " DF ": %s", DP(s->inaddr), line);
@ -91,7 +90,9 @@ static int control_stream_parse(struct control_stream *s, char *line) {
; ;
if (output) { if (output) {
mutex_lock(&s->lock);
streambuf_write(s->outbuf, output, strlen(output)); streambuf_write(s->outbuf, output, strlen(output));
mutex_unlock(&s->lock);
free(output); free(output);
} }
@ -103,8 +104,13 @@ static int control_stream_parse(struct control_stream *s, char *line) {
static void control_stream_timer(int fd, void *p, uintptr_t u) { static void control_stream_timer(int fd, void *p, uintptr_t u) {
struct control_stream *s = p; struct control_stream *s = p;
struct poller *o = s->poller; struct poller *o = s->poller;
int i;
if ((poller_now(o) - s->inbuf->active) >= 60 || (poller_now(o) - s->outbuf->active) >= 60) mutex_lock(&s->lock);
i = (poller_now(o) - s->inbuf->active) >= 60 || (poller_now(o) - s->outbuf->active) >= 60;
mutex_unlock(&s->lock);
if (i)
control_stream_closed(s->fd, s, 0); control_stream_closed(s->fd, s, 0);
} }
@ -114,15 +120,19 @@ static void control_stream_readable(int fd, void *p, uintptr_t u) {
char *line; char *line;
int ret; int ret;
mutex_lock(&s->lock);
if (streambuf_readable(s->inbuf)) if (streambuf_readable(s->inbuf))
goto close; goto close;
while ((line = streambuf_getline(s->inbuf))) { while ((line = streambuf_getline(s->inbuf))) {
mutex_unlock(&s->lock);
mylog(LOG_DEBUG, "Got control line from " DF ": %s", DP(s->inaddr), line); mylog(LOG_DEBUG, "Got control line from " DF ": %s", DP(s->inaddr), line);
ret = control_stream_parse(s, line); ret = control_stream_parse(s, line);
free(line); free(line);
if (ret) if (ret)
goto close; goto close_nolock;
mutex_lock(&s->lock);
} }
if (streambuf_bufsize(s->inbuf) > 1024) { if (streambuf_bufsize(s->inbuf) > 1024) {
@ -130,9 +140,12 @@ static void control_stream_readable(int fd, void *p, uintptr_t u) {
goto close; goto close;
} }
mutex_unlock(&s->lock);
return; return;
close: close:
mutex_unlock(&s->lock);
close_nolock:
control_stream_closed(fd, s, 0); control_stream_closed(fd, s, 0);
} }
@ -179,6 +192,7 @@ static void control_incoming(int fd, void *p, uintptr_t u) {
s->inbuf = streambuf_new(c->poller, nfd); s->inbuf = streambuf_new(c->poller, nfd);
s->outbuf = streambuf_new(c->poller, nfd); s->outbuf = streambuf_new(c->poller, nfd);
memcpy(&s->inaddr, &sin, sizeof(s->inaddr)); memcpy(&s->inaddr, &sin, sizeof(s->inaddr));
mutex_init(&s->lock);
ZERO(i); ZERO(i);
i.fd = nfd; i.fd = nfd;
@ -191,8 +205,10 @@ static void control_incoming(int fd, void *p, uintptr_t u) {
if (poller_add_item(c->poller, &i)) if (poller_add_item(c->poller, &i))
goto fail; goto fail;
mutex_lock(&c->lock);
/* let the list steal our own ref */ /* let the list steal our own ref */
c->streams = g_list_prepend(c->streams, s); c->streams = g_list_prepend(c->streams, s);
mutex_unlock(&c->lock);
return; return;
@ -206,12 +222,22 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru
struct control *c; struct control *c;
struct poller_item i; struct poller_item i;
struct sockaddr_in sin; struct sockaddr_in sin;
const char *errptr;
int erroff;
if (!p) if (!p)
return NULL; return NULL;
if (!m) if (!m)
return NULL; return NULL;
if (!parse_re) {
parse_re = pcre_compile(
/* reqtype callid streams ip fromdom fromtype todom totype agent info |reqtype callid info | reqtype */
"^(?:(request|lookup)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+info=(\\S*)|(delete)\\s+(\\S+)\\s+info=(\\S*)|(build|version|controls|quit|exit|status))$",
PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL);
parse_ree = pcre_study(parse_re, 0, &errptr);
}
fd = socket(AF_INET, SOCK_STREAM, 0); fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) if (fd == -1)
return NULL; return NULL;
@ -235,6 +261,7 @@ struct control *control_new(struct poller *p, u_int32_t ip, u_int16_t port, stru
c->fd = fd; c->fd = fd;
c->poller = p; c->poller = p;
c->callmaster = m; c->callmaster = m;
mutex_init(&c->lock);
ZERO(i); ZERO(i);
i.fd = fd; i.fd = fd;

@ -10,6 +10,7 @@
#include <glib.h> #include <glib.h>
#include "obj.h" #include "obj.h"
#include "aux.h"
#define RE_TCP_RL_CMD 1 #define RE_TCP_RL_CMD 1
@ -39,6 +40,7 @@ struct control_stream {
struct obj obj; struct obj obj;
int fd; int fd;
mutex_t lock;
struct streambuf *inbuf; struct streambuf *inbuf;
struct streambuf *outbuf; struct streambuf *outbuf;
struct sockaddr_in inaddr; struct sockaddr_in inaddr;
@ -53,6 +55,7 @@ struct control {
int fd; int fd;
mutex_t lock;
GList *streams; GList *streams;
struct poller *poller; struct poller *poller;

Loading…
Cancel
Save