TT#106101 also turn UDP/WS receive buffer into refcounted object

Change-Id: I81fa68b07af3a87e26d031a5722dcd103a1e620e
pull/1164/head
Richard Fuchs 4 years ago
parent 28e0620c80
commit 554034eb7e

@ -137,10 +137,12 @@ struct control_ng_stats* get_control_ng_stats(const sockaddr_t *addr) {
static void __ng_buffer_free(void *p) {
struct ng_buffer *ngbuf = p;
bencode_buffer_free(&ngbuf->buffer);
if (ngbuf->ref)
__obj_put(ngbuf->ref);
}
int control_ng_process(str *buf, const endpoint_t *sin, char *addr,
void (*cb)(str *, str *, const endpoint_t *, void *), void *p1)
void (*cb)(str *, str *, const endpoint_t *, void *), void *p1, struct obj *ref)
{
struct ng_buffer *ngbuf;
bencode_item_t *dict, *resp;
@ -159,9 +161,12 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr,
return funcret;
}
// init decode buffer object
ngbuf = obj_alloc0("ng_buffer", sizeof(*ngbuf), __ng_buffer_free);
mutex_init(&ngbuf->lock);
mutex_lock(&ngbuf->lock);
if (ref)
ngbuf->ref = __obj_get(ref); // hold until we're done
int ret = bencode_buffer_init(&ngbuf->buffer);
assert(ret == 0);
@ -404,10 +409,10 @@ static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void
}
static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin, char *addr,
socket_t *ul)
static void control_ng_incoming(struct obj *obj, struct udp_buffer *udp_buf)
{
control_ng_process(buf, sin, addr, control_ng_send, ul);
control_ng_process(&udp_buf->str, &udp_buf->sin, udp_buf->addr, control_ng_send, udp_buf->listener,
&udp_buf->obj);
}

@ -21,8 +21,7 @@
#include "log_funcs.h"
static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *sin, char *addr,
socket_t *ul) {
static void control_udp_incoming(struct obj *obj, struct udp_buffer *udp_buf) {
struct control_udp *u = (void *) obj;
int ret;
int ovec[100];
@ -31,17 +30,17 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si
unsigned int iovlen;
str cookie, *reply;
ret = pcre_exec(u->parse_re, u->parse_ree, buf->s, buf->len, 0, 0, ovec, G_N_ELEMENTS(ovec));
ret = pcre_exec(u->parse_re, u->parse_ree, udp_buf->str.s, udp_buf->str.len, 0, 0, ovec, G_N_ELEMENTS(ovec));
if (ret <= 0) {
ret = pcre_exec(u->fallback_re, NULL, buf->s, buf->len, 0, 0, ovec, G_N_ELEMENTS(ovec));
ret = pcre_exec(u->fallback_re, NULL, udp_buf->str.s, udp_buf->str.len, 0, 0, ovec, G_N_ELEMENTS(ovec));
if (ret <= 0) {
ilogs(control, LOG_WARNING, "Unable to parse command line from udp:%s: %.*s", addr, STR_FMT(buf));
ilogs(control, LOG_WARNING, "Unable to parse command line from udp:%s: %.*s", udp_buf->addr, STR_FMT(&udp_buf->str));
return;
}
ilogs(control, LOG_WARNING, "Failed to properly parse UDP command line '%.*s' from %s, using fallback RE", STR_FMT(buf), addr);
ilogs(control, LOG_WARNING, "Failed to properly parse UDP command line '%.*s' from %s, using fallback RE", STR_FMT(&udp_buf->str), udp_buf->addr);
pcre_get_substring_list(buf->s, ovec, ret, (const char ***) &out);
pcre_get_substring_list(udp_buf->str.s, ovec, ret, (const char ***) &out);
iov[0].iov_base = (void *) out[RE_UDP_COOKIE];
iov[0].iov_len = strlen(out[RE_UDP_COOKIE]);
@ -60,22 +59,22 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si
iovlen = 2;
}
socket_sendiov(ul, iov, iovlen, sin);
socket_sendiov(udp_buf->listener, iov, iovlen, &udp_buf->sin);
pcre_free(out);
return;
}
ilogs(control, LOG_INFO, "Got valid command from udp:%s: %.*s", addr, STR_FMT(buf));
ilogs(control, LOG_INFO, "Got valid command from udp:%s: %.*s", udp_buf->addr, STR_FMT(&udp_buf->str));
pcre_get_substring_list(buf->s, ovec, ret, (const char ***) &out);
pcre_get_substring_list(udp_buf->str.s, ovec, ret, (const char ***) &out);
str_init(&cookie, (void *) out[RE_UDP_COOKIE]);
reply = cookie_cache_lookup(&u->cookie_cache, &cookie);
if (reply) {
ilogs(control, LOG_INFO, "Detected command from udp:%s as a duplicate", addr);
socket_sendto(ul, reply->s, reply->len, sin);
ilogs(control, LOG_INFO, "Detected command from udp:%s as a duplicate", udp_buf->addr);
socket_sendto(udp_buf->listener, reply->s, reply->len, &udp_buf->sin);
free(reply);
goto out;
}
@ -86,7 +85,7 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si
log_info_c_string(out[RE_UDP_DQ_CALLID]);
if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'U')
reply = call_update_udp(out, addr, sin);
reply = call_update_udp(out, udp_buf->addr, &udp_buf->sin);
else if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'L')
reply = call_lookup_udp(out);
else if (chrtoupper(out[RE_UDP_DQ_CMD][0]) == 'D')
@ -118,11 +117,11 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si
iov[2].iov_len = 9;
iovlen++;
}
socket_sendiov(ul, iov, iovlen, sin);
socket_sendiov(udp_buf->listener, iov, iovlen, &udp_buf->sin);
}
if (reply) {
socket_sendto(ul, reply->s, reply->len, sin);
socket_sendto(udp_buf->listener, reply->s, reply->len, &udp_buf->sin);
cookie_cache_insert(&u->cookie_cache, &cookie, reply);
free(reply);
}

@ -28,17 +28,17 @@ static void udp_listener_closed(int fd, void *p, uintptr_t x) {
static void udp_listener_incoming(int fd, void *p, uintptr_t x) {
struct udp_listener_callback *cb = p;
int len;
char buf[0x10000];
char addr[64];
str str;
socket_t *listener;
endpoint_t sin;
str.s = buf;
listener = cb->ul;
struct udp_buffer *udp_buf = NULL;
for (;;) {
len = socket_recvfrom(listener, buf, sizeof(buf)-1, &sin);
if (!udp_buf) {
// initialise if we need to
udp_buf = obj_alloc0("udp_buffer", sizeof(*udp_buf), NULL);
udp_buf->str.s = udp_buf->buf + RTP_BUFFER_HEAD_ROOM;
udp_buf->listener = cb->ul;
}
len = socket_recvfrom(udp_buf->listener, udp_buf->str.s, MAX_UDP_LENGTH, &udp_buf->sin);
if (len < 0) {
if (errno == EINTR)
continue;
@ -47,11 +47,19 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) {
return;
}
buf[len] = '\0';
endpoint_print(&sin, addr, sizeof(addr));
udp_buf->str.s[len] = '\0';
endpoint_print(&udp_buf->sin, udp_buf->addr, sizeof(udp_buf->addr));
str.len = len;
cb->func(cb->p, &str, &sin, addr, listener);
udp_buf->str.len = len;
cb->func(cb->p, udp_buf);
// we can re-use the object if only one reference (ours) is left. this is not
// totally race-free, but in the worst case we end up re-allocating another
// new object when we didn't need to.
if (udp_buf->obj.ref != 1) {
obj_put(udp_buf);
udp_buf = NULL;
}
}
}

@ -37,6 +37,14 @@ struct websocket_conn {
GQueue output_q;
};
struct websocket_ng_buf {
struct obj obj;
GString *body;
char addr[64];
str cmd;
endpoint_t endpoint;
};
static GQueue websocket_vhost_configs;
static struct lws_context *websocket_context;
@ -414,32 +422,48 @@ static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin
websocket_queue_raw(wc, body->s, body->len);
websocket_write_http(wc, NULL, 1);
}
static void __ng_buf_free(void *p) {
struct websocket_ng_buf *buf = p;
g_string_free(buf->body, TRUE);
}
static const char *websocket_ng_process(struct websocket_message *wm) {
char addr[64];
endpoint_print(&wm->wc->endpoint, addr, sizeof(addr));
struct websocket_ng_buf *buf = obj_alloc0("websocket_ng_buf", sizeof(*buf), __ng_buf_free);
ilogs(http, LOG_DEBUG, "Processing websocket NG req from %s", addr);
endpoint_print(&wm->wc->endpoint, buf->addr, sizeof(buf->addr));
str cmd;
str_init_len(&cmd, wm->body->str, wm->body->len);
ilogs(http, LOG_DEBUG, "Processing websocket NG req from %s", buf->addr);
// steal body and initialise
buf->body = wm->body;
wm->body = g_string_new("");
str_init_len(&buf->cmd, buf->body->str, buf->body->len);
buf->endpoint = wm->wc->endpoint;
control_ng_process(&cmd, &wm->wc->endpoint, addr, websocket_ng_send_ws, wm->wc);
control_ng_process(&buf->cmd, &buf->endpoint, buf->addr, websocket_ng_send_ws, wm->wc, &buf->obj);
obj_put(buf);
return NULL;
}
static const char *websocket_http_ng(struct websocket_message *wm) {
char addr[64];
struct websocket_ng_buf *buf = obj_alloc0("websocket_ng_buf", sizeof(*buf), __ng_buf_free);
endpoint_print(&wm->wc->endpoint, addr, sizeof(addr));
endpoint_print(&wm->wc->endpoint, buf->addr, sizeof(buf->addr));
ilogs(http, LOG_DEBUG, "Respoding to POST /ng from %s", addr);
ilogs(http, LOG_DEBUG, "Respoding to POST /ng from %s", buf->addr);
str cmd;
str_init_len(&cmd, wm->body->str, wm->body->len);
// steal body and initialise
buf->body = wm->body;
wm->body = g_string_new("");
str_init_len(&buf->cmd, buf->body->str, buf->body->len);
buf->endpoint = wm->wc->endpoint;
if (control_ng_process(&cmd, &wm->wc->endpoint, addr, websocket_ng_send_http, wm->wc))
if (control_ng_process(&buf->cmd, &buf->endpoint, buf->addr, websocket_ng_send_http, wm->wc, &buf->obj))
websocket_http_complete(wm->wc, 600, "text/plain", 6, "error\n");
obj_put(buf);
return NULL;
}

@ -55,6 +55,7 @@ struct ng_buffer {
struct obj obj;
mutex_t lock;
bencode_buffer_t buffer;
struct obj *ref;
};
extern const char *ng_command_strings[NGC_COUNT];
@ -64,7 +65,7 @@ struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char);
void control_ng_init(void);
void control_ng_cleanup(void);
int control_ng_process(str *buf, const endpoint_t *sin, char *addr,
void (*cb)(str *, str *, const endpoint_t *, void *), void *p1);
void (*cb)(str *, str *, const endpoint_t *, void *), void *p1, struct obj *);
INLINE void ng_buffer_release(struct ng_buffer *ngbuf) {
mutex_unlock(&ngbuf->lock);

@ -5,12 +5,23 @@
#include "poller.h"
#include "str.h"
#include "socket.h"
#include "obj.h"
#include "call.h"
#define MAX_UDP_LENGTH 0xffff
struct poller;
struct obj;
typedef void (*udp_listener_callback_t)(struct obj *p, str *buf, const endpoint_t *ep, char *addr, socket_t *);
struct udp_buffer {
struct obj obj;
char buf[MAX_UDP_LENGTH + RTP_BUFFER_TAIL_ROOM + RTP_BUFFER_HEAD_ROOM + 1];
str str;
endpoint_t sin;
char addr[64];
socket_t *listener;
};
typedef void (*udp_listener_callback_t)(struct obj *p, struct udp_buffer *);
int udp_listener_init(socket_t *, struct poller *p, const endpoint_t *, udp_listener_callback_t, struct obj *);

Loading…
Cancel
Save