diff --git a/README.md b/README.md index 03b50bd20..2dc4ce742 100644 --- a/README.md +++ b/README.md @@ -1961,6 +1961,13 @@ command. Sample return dictionary: "result": "ok" } +The *tcp-ng* Control Protocol +========================= + +*rtpengine* also has support for *ng* control protocol where transport is +TCP (If enabled in the config via the --listen-tcp-ng option). Everything +said for UDP based *ng* protocol counts for TCP variant too. + HTTP/WebSocket support ====================== diff --git a/daemon/bencode.c b/daemon/bencode.c index 4b4eb12e5..e6d1ae2be 100644 --- a/daemon/bencode.c +++ b/daemon/bencode.c @@ -5,7 +5,7 @@ #include #include #include - +#include /* set to 0 for alloc debugging, e.g. through valgrind */ #define BENCODE_MIN_BUFFER_PIECE_LEN 512 @@ -703,3 +703,112 @@ void bencode_buffer_destroy_add(bencode_buffer_t *buf, free_func_t func, void *p li->next = buf->free_list; buf->free_list = li; } + +static int __bencode_string(const char *s, int offset, int len) { + int pos; + unsigned long long sl; + char *end; + + for (pos = offset + 1; s[pos] != ':' && isdigit(s[pos]) && pos < len; ++pos); + if (pos == len) + return -1; + + sl = strtoul(s + offset + 1, &end, 10); + if (s + offset + 1 == end || end != s + pos) + return -2; + + if (pos + sl > len) + return -1; + + return pos + sl + 1; +} + +static int __bencode_integer(const char *s, int offset, int len) { + int pos; + + if (s[offset + 1] == '-') { + if (offset + 3 < len && s[offset + 2] == '0' && s[offset + 3] == 'e') { + return -2; + } + ++offset; + } + + if (s[offset + 1] == 'e') + return -2; + + if (s[offset + 1] == '0') { + if (offset + 2 < len && s[offset + 2] == 'e') + return offset + 3; + return -2; + } + + for (pos = offset + 1; s[pos] != 'e' && pos < len; ++pos) { + if (s[pos] < '0' || s[pos] > '9') + return -2; + } + + if (pos == len) + return -1; + + return pos + 1; +} + +static int __bencode_next(const char *s, int offset, int len); + +static int __bencode_list(const char *s, int offset, int len) { + for (++offset; s[offset] != 'e' && offset < len;) { + offset = __bencode_next(s, offset, len); + if (offset < 0) + return offset; + } + + if (offset == len) + return -1; + + return offset + 1; +} + +static int __bencode_dictionary(const char *s, int offset, int len) { + for (++offset; s[offset] != 'e' && offset < len;) { + offset = __bencode_string(s, offset - 1, len); + if (offset < 0) + return offset; + offset = __bencode_next(s, offset, len); + if (offset < 0) + return offset; + } + + if (offset == len) + return -1; + + return offset + 1; +} + +static int __bencode_next(const char *s, int offset, int len) { + if (offset >= len) + return -1; + switch(s[offset]) { + case 'i': + return __bencode_integer(s, offset, len); + case 'l': + return __bencode_list(s, offset, len); + case 'd': + return __bencode_dictionary(s, offset, len); + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + return __bencode_string(s, offset - 1, len); + } + return -2; +} + +int bencode_valid(const char *s, int len) { + return __bencode_next(s, 0, len); +} diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 91a7cbfa2..b6807bf39 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -1,5 +1,6 @@ #include "control_ng.h" +#include #include #include #include @@ -16,10 +17,14 @@ #include "log_funcs.h" #include "main.h" #include "statistics.h" - +#include "streambuf.h" +#include "str.h" +#include "tcp_listener.h" mutex_t rtpe_cngs_lock; +mutex_t tcp_connections_lock; GHashTable *rtpe_cngs_hash; +GHashTable *tcp_connections_hash; struct control_ng *rtpe_control_ng; static struct cookie_cache ng_cookie_cache; @@ -42,7 +47,6 @@ const char *ng_command_strings_short[NGC_COUNT] = { "PlayDTMF", "Stats", }; - static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) { // lock offers mutex_lock(&request->lock); @@ -390,7 +394,6 @@ out: return funcret; } - static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void *p1) { socket_t *ul = p1; struct iovec iov[3]; @@ -408,13 +411,88 @@ static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void socket_sendiov(ul, iov, iovlen, sin); } - static void control_ng_incoming(struct obj *obj, struct udp_buffer *udp_buf) { control_ng_process(&udp_buf->str, &udp_buf->sin, udp_buf->addr, control_ng_send, udp_buf->listener, &udp_buf->obj); } +static void control_incoming(struct streambuf_stream *s) { + ilog(LOG_INFO, "New TCP control ng connection from %s", s->addr); + mutex_lock(&tcp_connections_lock); + g_hash_table_insert(tcp_connections_hash, s->addr, s); + mutex_unlock(&tcp_connections_lock); + ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash)); +} + +static void control_closed(struct streambuf_stream *s) { + ilog(LOG_INFO, "TCP control ng connection from %s is closing", s->addr); + mutex_lock(&tcp_connections_lock); + g_hash_table_remove(tcp_connections_hash, s->addr); + mutex_unlock(&tcp_connections_lock); + ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash)); +} + +static str *chunk_message(struct streambuf *b) { + char *p = NULL; + int len, to_del, bsize; + str *ret = NULL; + + mutex_lock(&b->lock); + + for (;;) { + if (b->eof) + break; + + p = memchr(b->buf->str, ' ', b->buf->len); + if (!p) + break; + + len = p - b->buf->str; + if (len == b->buf->len) + break; + + ++p; /* bencode dictionary here */ + bsize = bencode_valid(p, b->buf->str + b->buf->len - p); + if (bsize < 0) + break; /* not enough data to parse bencoded dictionary */ + + p += bsize; + len = p - b->buf->str; + to_del = len; + + ret = str_alloc(len); + memcpy(ret->s, b->buf->str, len); + ret->len = len; + g_string_erase(b->buf, 0, to_del); + + break; + } + + mutex_unlock(&b->lock); + return ret; +} + +static void control_stream_readable(struct streambuf_stream *s) { + str *data; + + ilog(LOG_DEBUG, "Got %ld bytes from %s", s->inbuf->buf->len, s->addr); + while ((data = chunk_message(s->inbuf))) { + ilog(LOG_DEBUG, "Got control ng message from %s", s->addr); + control_ng_process(data, &s->sock.remote, s->addr, control_ng_send, &s->sock, s->parent); + free(data); + } + + if (streambuf_bufsize(s->inbuf) > 1024) { + ilog(LOG_WARNING, "Buffer length exceeded in control connection from %s", s->addr); + goto close; + } + + return; + + close: + streambuf_stream_close(s); +} void control_ng_free(void *p) { struct control_ng *c = p; @@ -433,9 +511,12 @@ void control_ng_free(void *p) { poller_del_item(c->poller, c->udp_listeners[1].fd); close_socket(&c->udp_listeners[0]); close_socket(&c->udp_listeners[1]); + streambuf_listener_shutdown(&c->tcp_listeners[0]); + streambuf_listener_shutdown(&c->tcp_listeners[1]); + if (tcp_connections_hash) + g_hash_table_destroy(tcp_connections_hash); } - struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) { struct control_ng *c; @@ -463,9 +544,63 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned cha fail2: obj_put(c); return NULL; +} + +struct control_ng *control_ng_tcp_new(struct poller *p, endpoint_t *ep, struct control_ng *ctrl_ng) { + if (!p) + return NULL; + + if (!ctrl_ng) { + ctrl_ng = obj_alloc0("control_ng", sizeof(*ctrl_ng), NULL); + ctrl_ng->udp_listeners[0].fd = -1; + ctrl_ng->udp_listeners[1].fd = -1; + } + + ctrl_ng->poller = p; + + if (streambuf_listener_init(&ctrl_ng->tcp_listeners[0], p, ep, + control_incoming, control_stream_readable, + control_closed, + NULL, + &ctrl_ng->obj)) { + ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); + goto fail; + } + if (ipv46_any_convert(ep)) { + if (streambuf_listener_init(&ctrl_ng->tcp_listeners[1], p, ep, + control_incoming, control_stream_readable, + control_closed, + NULL, + &ctrl_ng->obj)) { + ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); + goto fail; + } + } + + tcp_connections_hash = g_hash_table_new(g_str_hash, g_str_equal); + mutex_init(&tcp_connections_lock); + return ctrl_ng; + +fail: + obj_put(ctrl_ng); + return NULL; +} + +static void notify_tcp_client(gpointer key, gpointer value, gpointer user_data) { + struct streambuf_stream *s = (struct streambuf_stream *)value; + str *to_send = (str *)user_data; + char cookie_buf[16]; + str cookie = STR_CONST_INIT_LEN(cookie_buf, sizeof(cookie_buf)); + rand_hex_str(cookie_buf, sizeof(cookie_buf) / 2); + control_ng_send(&cookie, to_send, &s->sock.remote, &s->sock); } +void notify_ng_tcp_clients(str *data) { + mutex_lock(&tcp_connections_lock); + g_hash_table_foreach(tcp_connections_hash, notify_tcp_client, data); + mutex_unlock(&tcp_connections_lock); +} void control_ng_init() { mutex_init(&rtpe_cngs_lock); diff --git a/daemon/main.c b/daemon/main.c index db3ba905f..bec3d985f 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -368,6 +368,7 @@ static void options(int *argc, char ***argv) { AUTO_CLEANUP_GBUF(listenps); AUTO_CLEANUP_GBUF(listenudps); AUTO_CLEANUP_GBUF(listenngs); + AUTO_CLEANUP_GBUF(listenngtcps); AUTO_CLEANUP_GBUF(listencli); AUTO_CLEANUP_GBUF(graphitep); AUTO_CLEANUP_GBUF(graphite_prefix_s); @@ -400,6 +401,7 @@ static void options(int *argc, char ***argv) { { "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" }, { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46|HOSTNAME:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" }, + { "listen-tcp-ng", 'N', 0, G_OPTION_ARG_STRING, &listenngtcps, "TCP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46|HOSTNAME:]PORT" }, { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "IP46|HOSTNAME:PORT" }, { "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &rtpe_config.graphite_interval, "Graphite send interval in seconds", "INT" }, @@ -504,8 +506,8 @@ static void options(int *argc, char ***argv) { if (!if_a) die("Missing option --interface"); - if (!listenps && !listenudps && !listenngs) - die("Missing option --listen-tcp, --listen-udp or --listen-ng"); + if (!listenps && !listenudps && !listenngs && !listenngtcps) + die("Missing option --listen-tcp, --listen-udp or --listen-ng or --listen-tcp-ng"); struct ifaddrs *ifas; if (getifaddrs(&ifas)) { @@ -552,6 +554,10 @@ static void options(int *argc, char ***argv) { if (endpoint_parse_any_getaddrinfo(&rtpe_config.ng_listen_ep, listenngs)) die("Invalid IP or port '%s' (--listen-ng)", listenngs); } + if (listenngtcps) { + if (endpoint_parse_any_getaddrinfo(&rtpe_config.ng_tcp_listen_ep, listenngtcps)) + die("Invalid IP or port '%s' (--listen-tcp-ng)", listenngtcps); + } if (listencli) {if (endpoint_parse_any_getaddrinfo(&rtpe_config.cli_listen_ep, listencli)) die("Invalid IP or port '%s' (--listen-cli)", listencli); @@ -787,6 +793,7 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { ini_rtpe_cfg->tcp_listen_ep = rtpe_config.tcp_listen_ep; ini_rtpe_cfg->udp_listen_ep = rtpe_config.udp_listen_ep; ini_rtpe_cfg->ng_listen_ep = rtpe_config.ng_listen_ep; + ini_rtpe_cfg->ng_tcp_listen_ep = rtpe_config.ng_tcp_listen_ep; ini_rtpe_cfg->cli_listen_ep = rtpe_config.cli_listen_ep; ini_rtpe_cfg->redis_ep = rtpe_config.redis_ep; ini_rtpe_cfg->redis_write_ep = rtpe_config.redis_write_ep; @@ -957,6 +964,12 @@ no_kernel: die("Failed to open UDP control connection port"); } + if (rtpe_config.ng_tcp_listen_ep.port) { + rtpe_control_ng = control_ng_tcp_new(rtpe_poller, &rtpe_config.ng_tcp_listen_ep, rtpe_control_ng); + if (!rtpe_control_ng) + die("Failed to open TCP control connection port"); + } + rtpe_cli = NULL; if (rtpe_config.cli_listen_ep.port) { interfaces_exclude_port(rtpe_config.cli_listen_ep.port); diff --git a/daemon/rtpengine.pod b/daemon/rtpengine.pod index de0ac489b..8460a3dd0 100644 --- a/daemon/rtpengine.pod +++ b/daemon/rtpengine.pod @@ -4,7 +4,7 @@ rtpengine - NGCP proxy for RTP and other UDP based media traffic =head1 SYNOPSIS -B B<--interface>=I... B<--listen-tcp>|B<--listen-udp>|B<--listen-ng>=I... [I