diff --git a/daemon/bencode.c b/daemon/bencode.c index 4b4eb12e5..934b59cfa 100644 --- a/daemon/bencode.c +++ b/daemon/bencode.c @@ -703,3 +703,112 @@ void bencode_buffer_destroy_add(bencode_buffer_t *buf, free_func_t func, void *p li->next = buf->free_list; buf->free_list = li; } + +static int __bencode_string(const char *s, int offset, int len) { + int pos; + unsigned long long sl; + char *end; + + for (pos = offset + 1; s[pos] != 0x3a && isdigit(s[pos]) && pos < len; ++pos); + if (pos == len) + return -1; + + sl = strtoul(s + offset + 1, &end, 10); + if (s + offset + 1 == end || end != s + pos) + return -2; + + if (pos + sl > len) + return -1; + + return pos + sl + 1; +} + +static int __bencode_integer(const char *s, int offset, int len) { + int pos; + + if (s[offset + 1] == 0x2d) { + if (offset + 3 < len && s[offset + 2] == 0x30 && s[offset + 3] == 0x65) { + return -2; + } + ++offset; + } + + if (s[offset + 1] == 0x65) + return -2; + + if (s[offset + 1] == 0x30) { + if (offset + 2 < len && s[offset + 2] == 0x65) + return offset + 3; + return -2; + } + + for (pos = offset + 1; s[pos] != 0x65 && pos < len; ++pos) { + if (s[pos] < 0x30 || s[pos] > 0x39) + return -2; + } + + if (pos == len) + return -1; + + return pos + 1; +} + +static int __bencode_next(const char *s, int offset, int len); + +static int __bencode_list(const char *s, int offset, int len) { + for (++offset; s[offset] != 0x65 && offset < len;) { + offset = __bencode_next(s, offset, len); + if (offset < 0) + return offset; + } + + if (offset == len) + return -1; + + return offset + 1; +} + +static int __bencode_dictionary(const char *s, int offset, int len) { + for (++offset; s[offset] != 0x65 && offset < len;) { + offset = __bencode_string(s, offset - 1, len); + if (offset < 0) + return offset; + offset = __bencode_next(s, offset, len); + if (offset < 0) + return offset; + } + + if (offset == len) + return -1; + + return offset + 1; +} + +static int __bencode_next(const char *s, int offset, int len) { + if (offset >= len) + return -1; + switch(s[offset]) { + case 0x69: + return __bencode_integer(s, offset, len); + case 0x6c: + return __bencode_list(s, offset, len); + case 0x64: + return __bencode_dictionary(s, offset, len); + case 0x30: + case 0x31: + case 0x32: + case 0x33: + case 0x34: + case 0x35: + case 0x36: + case 0x37: + case 0x38: + case 0x39: + return __bencode_string(s, offset - 1, len); + } + return -2; +} + +int bencode_valid(const char *s, int len) { + return __bencode_next(s, 0, len); +} diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 91a7cbfa2..b6807bf39 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -1,5 +1,6 @@ #include "control_ng.h" +#include #include #include #include @@ -16,10 +17,14 @@ #include "log_funcs.h" #include "main.h" #include "statistics.h" - +#include "streambuf.h" +#include "str.h" +#include "tcp_listener.h" mutex_t rtpe_cngs_lock; +mutex_t tcp_connections_lock; GHashTable *rtpe_cngs_hash; +GHashTable *tcp_connections_hash; struct control_ng *rtpe_control_ng; static struct cookie_cache ng_cookie_cache; @@ -42,7 +47,6 @@ const char *ng_command_strings_short[NGC_COUNT] = { "PlayDTMF", "Stats", }; - static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) { // lock offers mutex_lock(&request->lock); @@ -390,7 +394,6 @@ out: return funcret; } - static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void *p1) { socket_t *ul = p1; struct iovec iov[3]; @@ -408,13 +411,88 @@ static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void socket_sendiov(ul, iov, iovlen, sin); } - static void control_ng_incoming(struct obj *obj, struct udp_buffer *udp_buf) { control_ng_process(&udp_buf->str, &udp_buf->sin, udp_buf->addr, control_ng_send, udp_buf->listener, &udp_buf->obj); } +static void control_incoming(struct streambuf_stream *s) { + ilog(LOG_INFO, "New TCP control ng connection from %s", s->addr); + mutex_lock(&tcp_connections_lock); + g_hash_table_insert(tcp_connections_hash, s->addr, s); + mutex_unlock(&tcp_connections_lock); + ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash)); +} + +static void control_closed(struct streambuf_stream *s) { + ilog(LOG_INFO, "TCP control ng connection from %s is closing", s->addr); + mutex_lock(&tcp_connections_lock); + g_hash_table_remove(tcp_connections_hash, s->addr); + mutex_unlock(&tcp_connections_lock); + ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash)); +} + +static str *chunk_message(struct streambuf *b) { + char *p = NULL; + int len, to_del, bsize; + str *ret = NULL; + + mutex_lock(&b->lock); + + for (;;) { + if (b->eof) + break; + + p = memchr(b->buf->str, ' ', b->buf->len); + if (!p) + break; + + len = p - b->buf->str; + if (len == b->buf->len) + break; + + ++p; /* bencode dictionary here */ + bsize = bencode_valid(p, b->buf->str + b->buf->len - p); + if (bsize < 0) + break; /* not enough data to parse bencoded dictionary */ + + p += bsize; + len = p - b->buf->str; + to_del = len; + + ret = str_alloc(len); + memcpy(ret->s, b->buf->str, len); + ret->len = len; + g_string_erase(b->buf, 0, to_del); + + break; + } + + mutex_unlock(&b->lock); + return ret; +} + +static void control_stream_readable(struct streambuf_stream *s) { + str *data; + + ilog(LOG_DEBUG, "Got %ld bytes from %s", s->inbuf->buf->len, s->addr); + while ((data = chunk_message(s->inbuf))) { + ilog(LOG_DEBUG, "Got control ng message from %s", s->addr); + control_ng_process(data, &s->sock.remote, s->addr, control_ng_send, &s->sock, s->parent); + free(data); + } + + if (streambuf_bufsize(s->inbuf) > 1024) { + ilog(LOG_WARNING, "Buffer length exceeded in control connection from %s", s->addr); + goto close; + } + + return; + + close: + streambuf_stream_close(s); +} void control_ng_free(void *p) { struct control_ng *c = p; @@ -433,9 +511,12 @@ void control_ng_free(void *p) { poller_del_item(c->poller, c->udp_listeners[1].fd); close_socket(&c->udp_listeners[0]); close_socket(&c->udp_listeners[1]); + streambuf_listener_shutdown(&c->tcp_listeners[0]); + streambuf_listener_shutdown(&c->tcp_listeners[1]); + if (tcp_connections_hash) + g_hash_table_destroy(tcp_connections_hash); } - struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) { struct control_ng *c; @@ -463,9 +544,63 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned cha fail2: obj_put(c); return NULL; +} + +struct control_ng *control_ng_tcp_new(struct poller *p, endpoint_t *ep, struct control_ng *ctrl_ng) { + if (!p) + return NULL; + + if (!ctrl_ng) { + ctrl_ng = obj_alloc0("control_ng", sizeof(*ctrl_ng), NULL); + ctrl_ng->udp_listeners[0].fd = -1; + ctrl_ng->udp_listeners[1].fd = -1; + } + + ctrl_ng->poller = p; + + if (streambuf_listener_init(&ctrl_ng->tcp_listeners[0], p, ep, + control_incoming, control_stream_readable, + control_closed, + NULL, + &ctrl_ng->obj)) { + ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); + goto fail; + } + if (ipv46_any_convert(ep)) { + if (streambuf_listener_init(&ctrl_ng->tcp_listeners[1], p, ep, + control_incoming, control_stream_readable, + control_closed, + NULL, + &ctrl_ng->obj)) { + ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); + goto fail; + } + } + + tcp_connections_hash = g_hash_table_new(g_str_hash, g_str_equal); + mutex_init(&tcp_connections_lock); + return ctrl_ng; + +fail: + obj_put(ctrl_ng); + return NULL; +} + +static void notify_tcp_client(gpointer key, gpointer value, gpointer user_data) { + struct streambuf_stream *s = (struct streambuf_stream *)value; + str *to_send = (str *)user_data; + char cookie_buf[16]; + str cookie = STR_CONST_INIT_LEN(cookie_buf, sizeof(cookie_buf)); + rand_hex_str(cookie_buf, sizeof(cookie_buf) / 2); + control_ng_send(&cookie, to_send, &s->sock.remote, &s->sock); } +void notify_ng_tcp_clients(str *data) { + mutex_lock(&tcp_connections_lock); + g_hash_table_foreach(tcp_connections_hash, notify_tcp_client, data); + mutex_unlock(&tcp_connections_lock); +} void control_ng_init() { mutex_init(&rtpe_cngs_lock); diff --git a/daemon/main.c b/daemon/main.c index db3ba905f..bec3d985f 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -368,6 +368,7 @@ static void options(int *argc, char ***argv) { AUTO_CLEANUP_GBUF(listenps); AUTO_CLEANUP_GBUF(listenudps); AUTO_CLEANUP_GBUF(listenngs); + AUTO_CLEANUP_GBUF(listenngtcps); AUTO_CLEANUP_GBUF(listencli); AUTO_CLEANUP_GBUF(graphitep); AUTO_CLEANUP_GBUF(graphite_prefix_s); @@ -400,6 +401,7 @@ static void options(int *argc, char ***argv) { { "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" }, { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46|HOSTNAME:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" }, + { "listen-tcp-ng", 'N', 0, G_OPTION_ARG_STRING, &listenngtcps, "TCP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46|HOSTNAME:]PORT" }, { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "IP46|HOSTNAME:PORT" }, { "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &rtpe_config.graphite_interval, "Graphite send interval in seconds", "INT" }, @@ -504,8 +506,8 @@ static void options(int *argc, char ***argv) { if (!if_a) die("Missing option --interface"); - if (!listenps && !listenudps && !listenngs) - die("Missing option --listen-tcp, --listen-udp or --listen-ng"); + if (!listenps && !listenudps && !listenngs && !listenngtcps) + die("Missing option --listen-tcp, --listen-udp or --listen-ng or --listen-tcp-ng"); struct ifaddrs *ifas; if (getifaddrs(&ifas)) { @@ -552,6 +554,10 @@ static void options(int *argc, char ***argv) { if (endpoint_parse_any_getaddrinfo(&rtpe_config.ng_listen_ep, listenngs)) die("Invalid IP or port '%s' (--listen-ng)", listenngs); } + if (listenngtcps) { + if (endpoint_parse_any_getaddrinfo(&rtpe_config.ng_tcp_listen_ep, listenngtcps)) + die("Invalid IP or port '%s' (--listen-tcp-ng)", listenngtcps); + } if (listencli) {if (endpoint_parse_any_getaddrinfo(&rtpe_config.cli_listen_ep, listencli)) die("Invalid IP or port '%s' (--listen-cli)", listencli); @@ -787,6 +793,7 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { ini_rtpe_cfg->tcp_listen_ep = rtpe_config.tcp_listen_ep; ini_rtpe_cfg->udp_listen_ep = rtpe_config.udp_listen_ep; ini_rtpe_cfg->ng_listen_ep = rtpe_config.ng_listen_ep; + ini_rtpe_cfg->ng_tcp_listen_ep = rtpe_config.ng_tcp_listen_ep; ini_rtpe_cfg->cli_listen_ep = rtpe_config.cli_listen_ep; ini_rtpe_cfg->redis_ep = rtpe_config.redis_ep; ini_rtpe_cfg->redis_write_ep = rtpe_config.redis_write_ep; @@ -957,6 +964,12 @@ no_kernel: die("Failed to open UDP control connection port"); } + if (rtpe_config.ng_tcp_listen_ep.port) { + rtpe_control_ng = control_ng_tcp_new(rtpe_poller, &rtpe_config.ng_tcp_listen_ep, rtpe_control_ng); + if (!rtpe_control_ng) + die("Failed to open TCP control connection port"); + } + rtpe_cli = NULL; if (rtpe_config.cli_listen_ep.port) { interfaces_exclude_port(rtpe_config.cli_listen_ep.port); diff --git a/include/bencode.h b/include/bencode.h index fb7a1efc5..e729c9b70 100644 --- a/include/bencode.h +++ b/include/bencode.h @@ -301,8 +301,8 @@ INLINE bencode_item_t *bencode_decode_expect(bencode_buffer_t *buf, const char * /* Identical to bencode_decode_expect() but takes a "str" argument. */ INLINE bencode_item_t *bencode_decode_expect_str(bencode_buffer_t *buf, const str *s, bencode_type_t expect); - - +/* Returns the number of bytes that could successfully be decoded from 's', -1 if more bytes are needed or -2 on error */ +int bencode_valid(const char *s, int len); /*** DICTIONARY LOOKUP & EXTRACTION ***/ diff --git a/include/control_ng.h b/include/control_ng.h index c5d61f39b..c275ae45a 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -5,9 +5,9 @@ #include "udp_listener.h" #include "socket.h" #include "str.h" +#include "tcp_listener.h" #include "bencode.h" - struct poller; enum ng_command { @@ -48,6 +48,7 @@ struct control_ng_stats { struct control_ng { struct obj obj; socket_t udp_listeners[2]; + struct streambuf_listener tcp_listeners[2]; struct poller *poller; }; @@ -62,6 +63,8 @@ extern const char *ng_command_strings[NGC_COUNT]; extern const char *ng_command_strings_short[NGC_COUNT]; struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char); +struct control_ng *control_ng_tcp_new(struct poller *, endpoint_t *, struct control_ng *); +void notify_ng_tcp_clients(str *); void control_ng_init(void); void control_ng_cleanup(void); int control_ng_process(str *buf, const endpoint_t *sin, char *addr, diff --git a/include/main.h b/include/main.h index cf715df82..067e1e190 100644 --- a/include/main.h +++ b/include/main.h @@ -54,6 +54,7 @@ struct rtpengine_config { endpoint_t tcp_listen_ep; endpoint_t udp_listen_ep; endpoint_t ng_listen_ep; + endpoint_t ng_tcp_listen_ep; endpoint_t cli_listen_ep; endpoint_t redis_ep; endpoint_t redis_write_ep; diff --git a/t/Makefile b/t/Makefile index f184159d5..6b4bf4126 100644 --- a/t/Makefile +++ b/t/Makefile @@ -72,7 +72,7 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \ - media_player.c jitter_buffer.c t38.c + media_player.c jitter_buffer.c t38.c tcp_listener.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c endif @@ -164,7 +164,7 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \ control_ng.strhash.o \ streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ - media_player.o jitter_buffer.o dtmflib.o t38.o + media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \ resample.o dtmflib.o