diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 658acc211..91a7cbfa2 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -137,10 +137,12 @@ struct control_ng_stats* get_control_ng_stats(const sockaddr_t *addr) { static void __ng_buffer_free(void *p) { struct ng_buffer *ngbuf = p; bencode_buffer_free(&ngbuf->buffer); + if (ngbuf->ref) + __obj_put(ngbuf->ref); } int control_ng_process(str *buf, const endpoint_t *sin, char *addr, - void (*cb)(str *, str *, const endpoint_t *, void *), void *p1) + void (*cb)(str *, str *, const endpoint_t *, void *), void *p1, struct obj *ref) { struct ng_buffer *ngbuf; bencode_item_t *dict, *resp; @@ -159,9 +161,12 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, return funcret; } + // init decode buffer object ngbuf = obj_alloc0("ng_buffer", sizeof(*ngbuf), __ng_buffer_free); mutex_init(&ngbuf->lock); mutex_lock(&ngbuf->lock); + if (ref) + ngbuf->ref = __obj_get(ref); // hold until we're done int ret = bencode_buffer_init(&ngbuf->buffer); assert(ret == 0); @@ -404,10 +409,10 @@ static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void } -static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin, char *addr, - socket_t *ul) +static void control_ng_incoming(struct obj *obj, struct udp_buffer *udp_buf) { - control_ng_process(buf, sin, addr, control_ng_send, ul); + control_ng_process(&udp_buf->str, &udp_buf->sin, udp_buf->addr, control_ng_send, udp_buf->listener, + &udp_buf->obj); } diff --git a/daemon/control_udp.c b/daemon/control_udp.c index cfc1f038a..f5a66aa29 100644 --- a/daemon/control_udp.c +++ b/daemon/control_udp.c @@ -21,8 +21,7 @@ #include "log_funcs.h" -static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *sin, char *addr, - socket_t *ul) { +static void control_udp_incoming(struct obj *obj, struct udp_buffer *udp_buf) { struct control_udp *u = (void *) obj; int ret; int ovec[100]; @@ -31,17 +30,17 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si unsigned int iovlen; str cookie, *reply; - ret = pcre_exec(u->parse_re, u->parse_ree, buf->s, buf->len, 0, 0, ovec, G_N_ELEMENTS(ovec)); + ret = pcre_exec(u->parse_re, u->parse_ree, udp_buf->str.s, udp_buf->str.len, 0, 0, ovec, G_N_ELEMENTS(ovec)); if (ret <= 0) { - ret = pcre_exec(u->fallback_re, NULL, buf->s, buf->len, 0, 0, ovec, G_N_ELEMENTS(ovec)); + ret = pcre_exec(u->fallback_re, NULL, udp_buf->str.s, udp_buf->str.len, 0, 0, ovec, G_N_ELEMENTS(ovec)); if (ret <= 0) { - ilogs(control, LOG_WARNING, "Unable to parse command line from udp:%s: %.*s", addr, STR_FMT(buf)); + ilogs(control, LOG_WARNING, "Unable to parse command line from udp:%s: %.*s", udp_buf->addr, STR_FMT(&udp_buf->str)); return; } - ilogs(control, LOG_WARNING, "Failed to properly parse UDP command line '%.*s' from %s, using fallback RE", STR_FMT(buf), addr); + ilogs(control, LOG_WARNING, "Failed to properly parse UDP command line '%.*s' from %s, using fallback RE", STR_FMT(&udp_buf->str), udp_buf->addr); - pcre_get_substring_list(buf->s, ovec, ret, (const char ***) &out); + pcre_get_substring_list(udp_buf->str.s, ovec, ret, (const char ***) &out); iov[0].iov_base = (void *) out[RE_UDP_COOKIE]; iov[0].iov_len = strlen(out[RE_UDP_COOKIE]); @@ -60,22 +59,22 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si iovlen = 2; } - socket_sendiov(ul, iov, iovlen, sin); + socket_sendiov(udp_buf->listener, iov, iovlen, &udp_buf->sin); pcre_free(out); return; } - ilogs(control, LOG_INFO, "Got valid command from udp:%s: %.*s", addr, STR_FMT(buf)); + ilogs(control, LOG_INFO, "Got valid command from udp:%s: %.*s", udp_buf->addr, STR_FMT(&udp_buf->str)); - pcre_get_substring_list(buf->s, ovec, ret, (const char ***) &out); + pcre_get_substring_list(udp_buf->str.s, ovec, ret, (const char ***) &out); str_init(&cookie, (void *) out[RE_UDP_COOKIE]); reply = cookie_cache_lookup(&u->cookie_cache, &cookie); if (reply) { - ilogs(control, LOG_INFO, "Detected command from udp:%s as a duplicate", addr); - socket_sendto(ul, reply->s, reply->len, sin); + ilogs(control, LOG_INFO, "Detected command from udp:%s as a duplicate", udp_buf->addr); + socket_sendto(udp_buf->listener, reply->s, reply->len, &udp_buf->sin); free(reply); goto out; } @@ -86,7 +85,7 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si log_info_c_string(out[RE_UDP_DQ_CALLID]); if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'U') - reply = call_update_udp(out, addr, sin); + reply = call_update_udp(out, udp_buf->addr, &udp_buf->sin); else if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'L') reply = call_lookup_udp(out); else if (chrtoupper(out[RE_UDP_DQ_CMD][0]) == 'D') @@ -118,11 +117,11 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si iov[2].iov_len = 9; iovlen++; } - socket_sendiov(ul, iov, iovlen, sin); + socket_sendiov(udp_buf->listener, iov, iovlen, &udp_buf->sin); } if (reply) { - socket_sendto(ul, reply->s, reply->len, sin); + socket_sendto(udp_buf->listener, reply->s, reply->len, &udp_buf->sin); cookie_cache_insert(&u->cookie_cache, &cookie, reply); free(reply); } diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index 5529c46c0..66f3e6572 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -28,17 +28,17 @@ static void udp_listener_closed(int fd, void *p, uintptr_t x) { static void udp_listener_incoming(int fd, void *p, uintptr_t x) { struct udp_listener_callback *cb = p; int len; - char buf[0x10000]; - char addr[64]; - str str; - socket_t *listener; - endpoint_t sin; - - str.s = buf; - listener = cb->ul; + struct udp_buffer *udp_buf = NULL; for (;;) { - len = socket_recvfrom(listener, buf, sizeof(buf)-1, &sin); + if (!udp_buf) { + // initialise if we need to + udp_buf = obj_alloc0("udp_buffer", sizeof(*udp_buf), NULL); + udp_buf->str.s = udp_buf->buf + RTP_BUFFER_HEAD_ROOM; + udp_buf->listener = cb->ul; + } + + len = socket_recvfrom(udp_buf->listener, udp_buf->str.s, MAX_UDP_LENGTH, &udp_buf->sin); if (len < 0) { if (errno == EINTR) continue; @@ -47,11 +47,19 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) { return; } - buf[len] = '\0'; - endpoint_print(&sin, addr, sizeof(addr)); + udp_buf->str.s[len] = '\0'; + endpoint_print(&udp_buf->sin, udp_buf->addr, sizeof(udp_buf->addr)); - str.len = len; - cb->func(cb->p, &str, &sin, addr, listener); + udp_buf->str.len = len; + cb->func(cb->p, udp_buf); + + // we can re-use the object if only one reference (ours) is left. this is not + // totally race-free, but in the worst case we end up re-allocating another + // new object when we didn't need to. + if (udp_buf->obj.ref != 1) { + obj_put(udp_buf); + udp_buf = NULL; + } } } diff --git a/daemon/websocket.c b/daemon/websocket.c index de6e54ccd..81f359c91 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -37,6 +37,14 @@ struct websocket_conn { GQueue output_q; }; +struct websocket_ng_buf { + struct obj obj; + GString *body; + char addr[64]; + str cmd; + endpoint_t endpoint; +}; + static GQueue websocket_vhost_configs; static struct lws_context *websocket_context; @@ -414,32 +422,48 @@ static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin websocket_queue_raw(wc, body->s, body->len); websocket_write_http(wc, NULL, 1); } + +static void __ng_buf_free(void *p) { + struct websocket_ng_buf *buf = p; + g_string_free(buf->body, TRUE); +} + static const char *websocket_ng_process(struct websocket_message *wm) { - char addr[64]; - endpoint_print(&wm->wc->endpoint, addr, sizeof(addr)); + struct websocket_ng_buf *buf = obj_alloc0("websocket_ng_buf", sizeof(*buf), __ng_buf_free); - ilogs(http, LOG_DEBUG, "Processing websocket NG req from %s", addr); + endpoint_print(&wm->wc->endpoint, buf->addr, sizeof(buf->addr)); - str cmd; - str_init_len(&cmd, wm->body->str, wm->body->len); + ilogs(http, LOG_DEBUG, "Processing websocket NG req from %s", buf->addr); + + // steal body and initialise + buf->body = wm->body; + wm->body = g_string_new(""); + str_init_len(&buf->cmd, buf->body->str, buf->body->len); + buf->endpoint = wm->wc->endpoint; - control_ng_process(&cmd, &wm->wc->endpoint, addr, websocket_ng_send_ws, wm->wc); + control_ng_process(&buf->cmd, &buf->endpoint, buf->addr, websocket_ng_send_ws, wm->wc, &buf->obj); + + obj_put(buf); return NULL; } static const char *websocket_http_ng(struct websocket_message *wm) { - char addr[64]; + struct websocket_ng_buf *buf = obj_alloc0("websocket_ng_buf", sizeof(*buf), __ng_buf_free); - endpoint_print(&wm->wc->endpoint, addr, sizeof(addr)); + endpoint_print(&wm->wc->endpoint, buf->addr, sizeof(buf->addr)); - ilogs(http, LOG_DEBUG, "Respoding to POST /ng from %s", addr); + ilogs(http, LOG_DEBUG, "Respoding to POST /ng from %s", buf->addr); - str cmd; - str_init_len(&cmd, wm->body->str, wm->body->len); + // steal body and initialise + buf->body = wm->body; + wm->body = g_string_new(""); + str_init_len(&buf->cmd, buf->body->str, buf->body->len); + buf->endpoint = wm->wc->endpoint; - if (control_ng_process(&cmd, &wm->wc->endpoint, addr, websocket_ng_send_http, wm->wc)) + if (control_ng_process(&buf->cmd, &buf->endpoint, buf->addr, websocket_ng_send_http, wm->wc, &buf->obj)) websocket_http_complete(wm->wc, 600, "text/plain", 6, "error\n"); + obj_put(buf); return NULL; } diff --git a/include/control_ng.h b/include/control_ng.h index 6b917b22f..c5d61f39b 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -55,6 +55,7 @@ struct ng_buffer { struct obj obj; mutex_t lock; bencode_buffer_t buffer; + struct obj *ref; }; extern const char *ng_command_strings[NGC_COUNT]; @@ -64,7 +65,7 @@ struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char); void control_ng_init(void); void control_ng_cleanup(void); int control_ng_process(str *buf, const endpoint_t *sin, char *addr, - void (*cb)(str *, str *, const endpoint_t *, void *), void *p1); + void (*cb)(str *, str *, const endpoint_t *, void *), void *p1, struct obj *); INLINE void ng_buffer_release(struct ng_buffer *ngbuf) { mutex_unlock(&ngbuf->lock); diff --git a/include/udp_listener.h b/include/udp_listener.h index 5aa44c107..94c039d96 100644 --- a/include/udp_listener.h +++ b/include/udp_listener.h @@ -5,12 +5,23 @@ #include "poller.h" #include "str.h" #include "socket.h" +#include "obj.h" +#include "call.h" +#define MAX_UDP_LENGTH 0xffff struct poller; -struct obj; -typedef void (*udp_listener_callback_t)(struct obj *p, str *buf, const endpoint_t *ep, char *addr, socket_t *); +struct udp_buffer { + struct obj obj; + char buf[MAX_UDP_LENGTH + RTP_BUFFER_TAIL_ROOM + RTP_BUFFER_HEAD_ROOM + 1]; + str str; + endpoint_t sin; + char addr[64]; + socket_t *listener; +}; + +typedef void (*udp_listener_callback_t)(struct obj *p, struct udp_buffer *); int udp_listener_init(socket_t *, struct poller *p, const endpoint_t *, udp_listener_callback_t, struct obj *);