|
|
|
@ -1,5 +1,6 @@
|
|
|
|
|
#include "control_ng.h"
|
|
|
|
|
|
|
|
|
|
#include <errno.h>
|
|
|
|
|
#include <sys/types.h>
|
|
|
|
|
#include <sys/socket.h>
|
|
|
|
|
#include <assert.h>
|
|
|
|
@ -16,10 +17,14 @@
|
|
|
|
|
#include "log_funcs.h"
|
|
|
|
|
#include "main.h"
|
|
|
|
|
#include "statistics.h"
|
|
|
|
|
|
|
|
|
|
#include "streambuf.h"
|
|
|
|
|
#include "str.h"
|
|
|
|
|
#include "tcp_listener.h"
|
|
|
|
|
|
|
|
|
|
mutex_t rtpe_cngs_lock;
|
|
|
|
|
mutex_t tcp_connections_lock;
|
|
|
|
|
GHashTable *rtpe_cngs_hash;
|
|
|
|
|
GHashTable *tcp_connections_hash;
|
|
|
|
|
struct control_ng *rtpe_control_ng;
|
|
|
|
|
static struct cookie_cache ng_cookie_cache;
|
|
|
|
|
|
|
|
|
@ -42,7 +47,6 @@ const char *ng_command_strings_short[NGC_COUNT] = {
|
|
|
|
|
"PlayDTMF", "Stats",
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) {
|
|
|
|
|
// lock offers
|
|
|
|
|
mutex_lock(&request->lock);
|
|
|
|
@ -390,7 +394,6 @@ out:
|
|
|
|
|
return funcret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void *p1) {
|
|
|
|
|
socket_t *ul = p1;
|
|
|
|
|
struct iovec iov[3];
|
|
|
|
@ -408,13 +411,88 @@ static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void
|
|
|
|
|
socket_sendiov(ul, iov, iovlen, sin);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void control_ng_incoming(struct obj *obj, struct udp_buffer *udp_buf)
|
|
|
|
|
{
|
|
|
|
|
control_ng_process(&udp_buf->str, &udp_buf->sin, udp_buf->addr, control_ng_send, udp_buf->listener,
|
|
|
|
|
&udp_buf->obj);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void control_incoming(struct streambuf_stream *s) {
|
|
|
|
|
ilog(LOG_INFO, "New TCP control ng connection from %s", s->addr);
|
|
|
|
|
mutex_lock(&tcp_connections_lock);
|
|
|
|
|
g_hash_table_insert(tcp_connections_hash, s->addr, s);
|
|
|
|
|
mutex_unlock(&tcp_connections_lock);
|
|
|
|
|
ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void control_closed(struct streambuf_stream *s) {
|
|
|
|
|
ilog(LOG_INFO, "TCP control ng connection from %s is closing", s->addr);
|
|
|
|
|
mutex_lock(&tcp_connections_lock);
|
|
|
|
|
g_hash_table_remove(tcp_connections_hash, s->addr);
|
|
|
|
|
mutex_unlock(&tcp_connections_lock);
|
|
|
|
|
ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static str *chunk_message(struct streambuf *b) {
|
|
|
|
|
char *p = NULL;
|
|
|
|
|
int len, to_del, bsize;
|
|
|
|
|
str *ret = NULL;
|
|
|
|
|
|
|
|
|
|
mutex_lock(&b->lock);
|
|
|
|
|
|
|
|
|
|
for (;;) {
|
|
|
|
|
if (b->eof)
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
p = memchr(b->buf->str, ' ', b->buf->len);
|
|
|
|
|
if (!p)
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
len = p - b->buf->str;
|
|
|
|
|
if (len == b->buf->len)
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
++p; /* bencode dictionary here */
|
|
|
|
|
bsize = bencode_valid(p, b->buf->str + b->buf->len - p);
|
|
|
|
|
if (bsize < 0)
|
|
|
|
|
break; /* not enough data to parse bencoded dictionary */
|
|
|
|
|
|
|
|
|
|
p += bsize;
|
|
|
|
|
len = p - b->buf->str;
|
|
|
|
|
to_del = len;
|
|
|
|
|
|
|
|
|
|
ret = str_alloc(len);
|
|
|
|
|
memcpy(ret->s, b->buf->str, len);
|
|
|
|
|
ret->len = len;
|
|
|
|
|
g_string_erase(b->buf, 0, to_del);
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mutex_unlock(&b->lock);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void control_stream_readable(struct streambuf_stream *s) {
|
|
|
|
|
str *data;
|
|
|
|
|
|
|
|
|
|
ilog(LOG_DEBUG, "Got %ld bytes from %s", s->inbuf->buf->len, s->addr);
|
|
|
|
|
while ((data = chunk_message(s->inbuf))) {
|
|
|
|
|
ilog(LOG_DEBUG, "Got control ng message from %s", s->addr);
|
|
|
|
|
control_ng_process(data, &s->sock.remote, s->addr, control_ng_send, &s->sock, s->parent);
|
|
|
|
|
free(data);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (streambuf_bufsize(s->inbuf) > 1024) {
|
|
|
|
|
ilog(LOG_WARNING, "Buffer length exceeded in control connection from %s", s->addr);
|
|
|
|
|
goto close;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
close:
|
|
|
|
|
streambuf_stream_close(s);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void control_ng_free(void *p) {
|
|
|
|
|
struct control_ng *c = p;
|
|
|
|
@ -433,9 +511,12 @@ void control_ng_free(void *p) {
|
|
|
|
|
poller_del_item(c->poller, c->udp_listeners[1].fd);
|
|
|
|
|
close_socket(&c->udp_listeners[0]);
|
|
|
|
|
close_socket(&c->udp_listeners[1]);
|
|
|
|
|
streambuf_listener_shutdown(&c->tcp_listeners[0]);
|
|
|
|
|
streambuf_listener_shutdown(&c->tcp_listeners[1]);
|
|
|
|
|
if (tcp_connections_hash)
|
|
|
|
|
g_hash_table_destroy(tcp_connections_hash);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) {
|
|
|
|
|
struct control_ng *c;
|
|
|
|
|
|
|
|
|
@ -463,9 +544,63 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned cha
|
|
|
|
|
fail2:
|
|
|
|
|
obj_put(c);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct control_ng *control_ng_tcp_new(struct poller *p, endpoint_t *ep, struct control_ng *ctrl_ng) {
|
|
|
|
|
if (!p)
|
|
|
|
|
return NULL;
|
|
|
|
|
|
|
|
|
|
if (!ctrl_ng) {
|
|
|
|
|
ctrl_ng = obj_alloc0("control_ng", sizeof(*ctrl_ng), NULL);
|
|
|
|
|
ctrl_ng->udp_listeners[0].fd = -1;
|
|
|
|
|
ctrl_ng->udp_listeners[1].fd = -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctrl_ng->poller = p;
|
|
|
|
|
|
|
|
|
|
if (streambuf_listener_init(&ctrl_ng->tcp_listeners[0], p, ep,
|
|
|
|
|
control_incoming, control_stream_readable,
|
|
|
|
|
control_closed,
|
|
|
|
|
NULL,
|
|
|
|
|
&ctrl_ng->obj)) {
|
|
|
|
|
ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));
|
|
|
|
|
goto fail;
|
|
|
|
|
}
|
|
|
|
|
if (ipv46_any_convert(ep)) {
|
|
|
|
|
if (streambuf_listener_init(&ctrl_ng->tcp_listeners[1], p, ep,
|
|
|
|
|
control_incoming, control_stream_readable,
|
|
|
|
|
control_closed,
|
|
|
|
|
NULL,
|
|
|
|
|
&ctrl_ng->obj)) {
|
|
|
|
|
ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));
|
|
|
|
|
goto fail;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tcp_connections_hash = g_hash_table_new(g_str_hash, g_str_equal);
|
|
|
|
|
mutex_init(&tcp_connections_lock);
|
|
|
|
|
return ctrl_ng;
|
|
|
|
|
|
|
|
|
|
fail:
|
|
|
|
|
obj_put(ctrl_ng);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void notify_tcp_client(gpointer key, gpointer value, gpointer user_data) {
|
|
|
|
|
struct streambuf_stream *s = (struct streambuf_stream *)value;
|
|
|
|
|
str *to_send = (str *)user_data;
|
|
|
|
|
char cookie_buf[16];
|
|
|
|
|
str cookie = STR_CONST_INIT_LEN(cookie_buf, sizeof(cookie_buf));
|
|
|
|
|
|
|
|
|
|
rand_hex_str(cookie_buf, sizeof(cookie_buf) / 2);
|
|
|
|
|
control_ng_send(&cookie, to_send, &s->sock.remote, &s->sock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void notify_ng_tcp_clients(str *data) {
|
|
|
|
|
mutex_lock(&tcp_connections_lock);
|
|
|
|
|
g_hash_table_foreach(tcp_connections_hash, notify_tcp_client, data);
|
|
|
|
|
mutex_unlock(&tcp_connections_lock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void control_ng_init() {
|
|
|
|
|
mutex_init(&rtpe_cngs_lock);
|
|
|
|
|