Implementation of control-ng via TCP.

pull/1180/head
Damir Nedžibović 4 years ago
parent c5ef68897a
commit ffe187f116

@ -703,3 +703,112 @@ void bencode_buffer_destroy_add(bencode_buffer_t *buf, free_func_t func, void *p
li->next = buf->free_list;
buf->free_list = li;
}
static int __bencode_string(const char *s, int offset, int len) {
int pos;
unsigned long long sl;
char *end;
for (pos = offset + 1; s[pos] != 0x3a && isdigit(s[pos]) && pos < len; ++pos);
if (pos == len)
return -1;
sl = strtoul(s + offset + 1, &end, 10);
if (s + offset + 1 == end || end != s + pos)
return -2;
if (pos + sl > len)
return -1;
return pos + sl + 1;
}
static int __bencode_integer(const char *s, int offset, int len) {
int pos;
if (s[offset + 1] == 0x2d) {
if (offset + 3 < len && s[offset + 2] == 0x30 && s[offset + 3] == 0x65) {
return -2;
}
++offset;
}
if (s[offset + 1] == 0x65)
return -2;
if (s[offset + 1] == 0x30) {
if (offset + 2 < len && s[offset + 2] == 0x65)
return offset + 3;
return -2;
}
for (pos = offset + 1; s[pos] != 0x65 && pos < len; ++pos) {
if (s[pos] < 0x30 || s[pos] > 0x39)
return -2;
}
if (pos == len)
return -1;
return pos + 1;
}
static int __bencode_next(const char *s, int offset, int len);
static int __bencode_list(const char *s, int offset, int len) {
for (++offset; s[offset] != 0x65 && offset < len;) {
offset = __bencode_next(s, offset, len);
if (offset < 0)
return offset;
}
if (offset == len)
return -1;
return offset + 1;
}
static int __bencode_dictionary(const char *s, int offset, int len) {
for (++offset; s[offset] != 0x65 && offset < len;) {
offset = __bencode_string(s, offset - 1, len);
if (offset < 0)
return offset;
offset = __bencode_next(s, offset, len);
if (offset < 0)
return offset;
}
if (offset == len)
return -1;
return offset + 1;
}
static int __bencode_next(const char *s, int offset, int len) {
if (offset >= len)
return -1;
switch(s[offset]) {
case 0x69:
return __bencode_integer(s, offset, len);
case 0x6c:
return __bencode_list(s, offset, len);
case 0x64:
return __bencode_dictionary(s, offset, len);
case 0x30:
case 0x31:
case 0x32:
case 0x33:
case 0x34:
case 0x35:
case 0x36:
case 0x37:
case 0x38:
case 0x39:
return __bencode_string(s, offset - 1, len);
}
return -2;
}
int bencode_valid(const char *s, int len) {
return __bencode_next(s, 0, len);
}

@ -1,5 +1,6 @@
#include "control_ng.h"
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <assert.h>
@ -16,10 +17,14 @@
#include "log_funcs.h"
#include "main.h"
#include "statistics.h"
#include "streambuf.h"
#include "str.h"
#include "tcp_listener.h"
mutex_t rtpe_cngs_lock;
mutex_t tcp_connections_lock;
GHashTable *rtpe_cngs_hash;
GHashTable *tcp_connections_hash;
struct control_ng *rtpe_control_ng;
static struct cookie_cache ng_cookie_cache;
@ -42,7 +47,6 @@ const char *ng_command_strings_short[NGC_COUNT] = {
"PlayDTMF", "Stats",
};
static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) {
// lock offers
mutex_lock(&request->lock);
@ -390,7 +394,6 @@ out:
return funcret;
}
static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void *p1) {
socket_t *ul = p1;
struct iovec iov[3];
@ -408,13 +411,88 @@ static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void
socket_sendiov(ul, iov, iovlen, sin);
}
static void control_ng_incoming(struct obj *obj, struct udp_buffer *udp_buf)
{
control_ng_process(&udp_buf->str, &udp_buf->sin, udp_buf->addr, control_ng_send, udp_buf->listener,
&udp_buf->obj);
}
static void control_incoming(struct streambuf_stream *s) {
ilog(LOG_INFO, "New TCP control ng connection from %s", s->addr);
mutex_lock(&tcp_connections_lock);
g_hash_table_insert(tcp_connections_hash, s->addr, s);
mutex_unlock(&tcp_connections_lock);
ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash));
}
static void control_closed(struct streambuf_stream *s) {
ilog(LOG_INFO, "TCP control ng connection from %s is closing", s->addr);
mutex_lock(&tcp_connections_lock);
g_hash_table_remove(tcp_connections_hash, s->addr);
mutex_unlock(&tcp_connections_lock);
ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash));
}
static str *chunk_message(struct streambuf *b) {
char *p = NULL;
int len, to_del, bsize;
str *ret = NULL;
mutex_lock(&b->lock);
for (;;) {
if (b->eof)
break;
p = memchr(b->buf->str, ' ', b->buf->len);
if (!p)
break;
len = p - b->buf->str;
if (len == b->buf->len)
break;
++p; /* bencode dictionary here */
bsize = bencode_valid(p, b->buf->str + b->buf->len - p);
if (bsize < 0)
break; /* not enough data to parse bencoded dictionary */
p += bsize;
len = p - b->buf->str;
to_del = len;
ret = str_alloc(len);
memcpy(ret->s, b->buf->str, len);
ret->len = len;
g_string_erase(b->buf, 0, to_del);
break;
}
mutex_unlock(&b->lock);
return ret;
}
static void control_stream_readable(struct streambuf_stream *s) {
str *data;
ilog(LOG_DEBUG, "Got %ld bytes from %s", s->inbuf->buf->len, s->addr);
while ((data = chunk_message(s->inbuf))) {
ilog(LOG_DEBUG, "Got control ng message from %s", s->addr);
control_ng_process(data, &s->sock.remote, s->addr, control_ng_send, &s->sock, s->parent);
free(data);
}
if (streambuf_bufsize(s->inbuf) > 1024) {
ilog(LOG_WARNING, "Buffer length exceeded in control connection from %s", s->addr);
goto close;
}
return;
close:
streambuf_stream_close(s);
}
void control_ng_free(void *p) {
struct control_ng *c = p;
@ -433,9 +511,12 @@ void control_ng_free(void *p) {
poller_del_item(c->poller, c->udp_listeners[1].fd);
close_socket(&c->udp_listeners[0]);
close_socket(&c->udp_listeners[1]);
streambuf_listener_shutdown(&c->tcp_listeners[0]);
streambuf_listener_shutdown(&c->tcp_listeners[1]);
if (tcp_connections_hash)
g_hash_table_destroy(tcp_connections_hash);
}
struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) {
struct control_ng *c;
@ -463,9 +544,63 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned cha
fail2:
obj_put(c);
return NULL;
}
struct control_ng *control_ng_tcp_new(struct poller *p, endpoint_t *ep, struct control_ng *ctrl_ng) {
if (!p)
return NULL;
if (!ctrl_ng) {
ctrl_ng = obj_alloc0("control_ng", sizeof(*ctrl_ng), NULL);
ctrl_ng->udp_listeners[0].fd = -1;
ctrl_ng->udp_listeners[1].fd = -1;
}
ctrl_ng->poller = p;
if (streambuf_listener_init(&ctrl_ng->tcp_listeners[0], p, ep,
control_incoming, control_stream_readable,
control_closed,
NULL,
&ctrl_ng->obj)) {
ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));
goto fail;
}
if (ipv46_any_convert(ep)) {
if (streambuf_listener_init(&ctrl_ng->tcp_listeners[1], p, ep,
control_incoming, control_stream_readable,
control_closed,
NULL,
&ctrl_ng->obj)) {
ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno));
goto fail;
}
}
tcp_connections_hash = g_hash_table_new(g_str_hash, g_str_equal);
mutex_init(&tcp_connections_lock);
return ctrl_ng;
fail:
obj_put(ctrl_ng);
return NULL;
}
static void notify_tcp_client(gpointer key, gpointer value, gpointer user_data) {
struct streambuf_stream *s = (struct streambuf_stream *)value;
str *to_send = (str *)user_data;
char cookie_buf[16];
str cookie = STR_CONST_INIT_LEN(cookie_buf, sizeof(cookie_buf));
rand_hex_str(cookie_buf, sizeof(cookie_buf) / 2);
control_ng_send(&cookie, to_send, &s->sock.remote, &s->sock);
}
void notify_ng_tcp_clients(str *data) {
mutex_lock(&tcp_connections_lock);
g_hash_table_foreach(tcp_connections_hash, notify_tcp_client, data);
mutex_unlock(&tcp_connections_lock);
}
void control_ng_init() {
mutex_init(&rtpe_cngs_lock);

@ -368,6 +368,7 @@ static void options(int *argc, char ***argv) {
AUTO_CLEANUP_GBUF(listenps);
AUTO_CLEANUP_GBUF(listenudps);
AUTO_CLEANUP_GBUF(listenngs);
AUTO_CLEANUP_GBUF(listenngtcps);
AUTO_CLEANUP_GBUF(listencli);
AUTO_CLEANUP_GBUF(graphitep);
AUTO_CLEANUP_GBUF(graphite_prefix_s);
@ -400,6 +401,7 @@ static void options(int *argc, char ***argv) {
{ "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" },
{ "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46|HOSTNAME:]PORT" },
{ "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" },
{ "listen-tcp-ng", 'N', 0, G_OPTION_ARG_STRING, &listenngtcps, "TCP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" },
{ "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46|HOSTNAME:]PORT" },
{ "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "IP46|HOSTNAME:PORT" },
{ "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &rtpe_config.graphite_interval, "Graphite send interval in seconds", "INT" },
@ -504,8 +506,8 @@ static void options(int *argc, char ***argv) {
if (!if_a)
die("Missing option --interface");
if (!listenps && !listenudps && !listenngs)
die("Missing option --listen-tcp, --listen-udp or --listen-ng");
if (!listenps && !listenudps && !listenngs && !listenngtcps)
die("Missing option --listen-tcp, --listen-udp or --listen-ng or --listen-tcp-ng");
struct ifaddrs *ifas;
if (getifaddrs(&ifas)) {
@ -552,6 +554,10 @@ static void options(int *argc, char ***argv) {
if (endpoint_parse_any_getaddrinfo(&rtpe_config.ng_listen_ep, listenngs))
die("Invalid IP or port '%s' (--listen-ng)", listenngs);
}
if (listenngtcps) {
if (endpoint_parse_any_getaddrinfo(&rtpe_config.ng_tcp_listen_ep, listenngtcps))
die("Invalid IP or port '%s' (--listen-tcp-ng)", listenngtcps);
}
if (listencli) {if (endpoint_parse_any_getaddrinfo(&rtpe_config.cli_listen_ep, listencli))
die("Invalid IP or port '%s' (--listen-cli)", listencli);
@ -787,6 +793,7 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) {
ini_rtpe_cfg->tcp_listen_ep = rtpe_config.tcp_listen_ep;
ini_rtpe_cfg->udp_listen_ep = rtpe_config.udp_listen_ep;
ini_rtpe_cfg->ng_listen_ep = rtpe_config.ng_listen_ep;
ini_rtpe_cfg->ng_tcp_listen_ep = rtpe_config.ng_tcp_listen_ep;
ini_rtpe_cfg->cli_listen_ep = rtpe_config.cli_listen_ep;
ini_rtpe_cfg->redis_ep = rtpe_config.redis_ep;
ini_rtpe_cfg->redis_write_ep = rtpe_config.redis_write_ep;
@ -957,6 +964,12 @@ no_kernel:
die("Failed to open UDP control connection port");
}
if (rtpe_config.ng_tcp_listen_ep.port) {
rtpe_control_ng = control_ng_tcp_new(rtpe_poller, &rtpe_config.ng_tcp_listen_ep, rtpe_control_ng);
if (!rtpe_control_ng)
die("Failed to open TCP control connection port");
}
rtpe_cli = NULL;
if (rtpe_config.cli_listen_ep.port) {
interfaces_exclude_port(rtpe_config.cli_listen_ep.port);

@ -301,8 +301,8 @@ INLINE bencode_item_t *bencode_decode_expect(bencode_buffer_t *buf, const char *
/* Identical to bencode_decode_expect() but takes a "str" argument. */
INLINE bencode_item_t *bencode_decode_expect_str(bencode_buffer_t *buf, const str *s, bencode_type_t expect);
/* Returns the number of bytes that could successfully be decoded from 's', -1 if more bytes are needed or -2 on error */
int bencode_valid(const char *s, int len);
/*** DICTIONARY LOOKUP & EXTRACTION ***/

@ -5,9 +5,9 @@
#include "udp_listener.h"
#include "socket.h"
#include "str.h"
#include "tcp_listener.h"
#include "bencode.h"
struct poller;
enum ng_command {
@ -48,6 +48,7 @@ struct control_ng_stats {
struct control_ng {
struct obj obj;
socket_t udp_listeners[2];
struct streambuf_listener tcp_listeners[2];
struct poller *poller;
};
@ -62,6 +63,8 @@ extern const char *ng_command_strings[NGC_COUNT];
extern const char *ng_command_strings_short[NGC_COUNT];
struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char);
struct control_ng *control_ng_tcp_new(struct poller *, endpoint_t *, struct control_ng *);
void notify_ng_tcp_clients(str *);
void control_ng_init(void);
void control_ng_cleanup(void);
int control_ng_process(str *buf, const endpoint_t *sin, char *addr,

@ -54,6 +54,7 @@ struct rtpengine_config {
endpoint_t tcp_listen_ep;
endpoint_t udp_listen_ep;
endpoint_t ng_listen_ep;
endpoint_t ng_tcp_listen_ep;
endpoint_t cli_listen_ep;
endpoint_t redis_ep;
endpoint_t redis_write_ep;

@ -72,7 +72,7 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c
DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \
dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \
media_player.c jitter_buffer.c t38.c
media_player.c jitter_buffer.c t38.c tcp_listener.c
HASHSRCS+= call_interfaces.c control_ng.c sdp.c
endif
@ -164,7 +164,7 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr
rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \
control_ng.strhash.o \
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o
payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \
resample.o dtmflib.o

Loading…
Cancel
Save