diff --git a/daemon/Makefile b/daemon/Makefile index 7a4293e6f..ac925b0f1 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -98,7 +98,7 @@ SRCS= main.c kernel.c helpers.c control_tcp.c call.c control_udp.c redis.c \ crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.strhash.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c t38.c websocket.c \ - mqtt.c janus.strhash.c audio_player.c arena.c + mqtt.c janus.strhash.c audio_player.c arena.c ng_client.c ifneq ($(without_nftables),yes) SRCS+= nftables.c endif diff --git a/daemon/main.c b/daemon/main.c index b9d418a04..3baa623d0 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -60,6 +60,7 @@ #include "bufferpool.h" #include "log_funcs.h" #include "uring.h" +#include "ng_client.h" @@ -122,6 +123,8 @@ struct rtpengine_config rtpe_config = { .max_recv_iters = MAX_RECV_ITERS, .kernel_player_media = 128, .timer_accuracy = 500, + .ng_client_timeout = 50, // ms, will be scaled to us by *1000 + .ng_client_retries = 5, }; struct interface_config_callback_arg { @@ -804,6 +807,8 @@ static void options(int *argc, char ***argv, charp_ht templates) { { "vsc-pause-resume-rec",0,0,G_OPTION_ARG_STRING,&rtpe_config.vsc_pause_resume_rec.s,"DTMF VSC to pause/resume recording.", "STRING"}, { "vsc-start-pause-resume-rec",0,0,G_OPTION_ARG_STRING,&rtpe_config.vsc_start_pause_resume_rec.s,"DTMF VSC to start/pause/resume recording.", "STRING"}, + { "ng-client-timeout",0,0,G_OPTION_ARG_INT, &rtpe_config.ng_client_timeout,"Timeout in milliseconds for outgoing NG requests","INT"}, + { "ng-client-retries",0,0,G_OPTION_ARG_INT, &rtpe_config.ng_client_retries,"How often to retry a timed-out NG request","INT"}, { NULL, } }; @@ -1285,6 +1290,12 @@ static void options(int *argc, char ***argv, charp_ht templates) { if (rtpe_config.timer_accuracy < 0) die("Invalid --timer-accuracy value (%d)", rtpe_config.timer_accuracy); + if (rtpe_config.ng_client_timeout <= 0) + die("Invalid value for 'ng-client-timeout'"); + rtpe_config.ng_client_timeout *= 1000; // from ms to us + if (rtpe_config.ng_client_retries <= 0) + die("Invalid value for 'ng-client-retries'"); + // everything OK, do post-processing } @@ -1634,6 +1645,8 @@ static void create_everything(void) { die("Failed to preload media from database into cache"); } + + ng_client_init(); } @@ -1863,6 +1876,7 @@ int main(int argc, char **argv) { nftables_shutdown(rtpe_config.nftables_chain, rtpe_config.nftables_base_chain, (nftables_args){.family = rtpe_config.nftables_family}); #endif + ng_client_cleanup(); bufferpool_destroy(shm_bufferpool); kernel_shutdown_table(); options_free(); diff --git a/daemon/ng_client.c b/daemon/ng_client.c new file mode 100644 index 000000000..32b2d0369 --- /dev/null +++ b/daemon/ng_client.c @@ -0,0 +1,194 @@ +#include "ng_client.h" +#include "media_socket.h" + +struct endpoint_sockets { + endpoint_t dst; + socket_slist *sockets; +}; + +static void socket_free(socket_t *s) { + close_socket(s); + g_free(s); +} + +static void endpoint_socket_free(struct endpoint_sockets *es) { + t_slist_free_full(es->sockets, socket_free); + g_free(es); +} + +TYPED_GHASHTABLE(endpoint_socket_ht, endpoint_t, struct endpoint_sockets, + endpoint_hash, endpoint_eq, + NULL, endpoint_socket_free) + + +static endpoint_socket_ht ng_client_endpoints; +static rwlock_t ng_client_endpoints_lock; + + +void ng_client_init(void) { + ng_client_endpoints = endpoint_socket_ht_new(); +} + +void ng_client_cleanup(void) { + t_hash_table_destroy(ng_client_endpoints); +} + + +static struct endpoint_sockets *ng_client_get_entry(const endpoint_t *dst) { + struct endpoint_sockets *es; + + // quick check for existing entry + { + RWLOCK_R(&ng_client_endpoints_lock); + es = t_hash_table_lookup(ng_client_endpoints, dst); + } + + if (es) + return es; + + // we have to create one + es = g_new0(__typeof(*es), 1); + es->dst = *dst; + + RWLOCK_W(&ng_client_endpoints_lock); + // ... but someone may have beaten us to it + __auto_type es2 = t_hash_table_lookup(ng_client_endpoints, dst); + if (es2) { + // lost the race + g_free(es); + es = es2; + } + else + t_hash_table_insert(ng_client_endpoints, &es->dst, es); + + return es; +} + +static socket_slist *ng_client_get_socket(struct endpoint_sockets *es) { + // see if we can grab a socket + socket_slist *link = __atomic_load_n(&es->sockets, __ATOMIC_SEQ_CST); + bool success = false; + while (link && !success) + success = __atomic_compare_exchange_n(&es->sockets, &link, link->next, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); + + if (link) { + link->next = NULL; + return link; + } + + // no socket available, we need to create one + link = g_new0(socket_slist, 1); + __auto_type sock = link->data = g_new0(socket_t, 1); + if (!connect_socket(sock, SOCK_DGRAM, &es->dst)) { + // oops... + ilog(LOG_ERR, "Failed to create or connect socket to remote NG peer (%s): %s", + endpoint_print_buf(&es->dst), + strerror(errno)); + g_free(link->data); + g_free(link); + return NULL; + } + socket_getsockname(sock); + interfaces_exclude_port(&sock->local); + socket_rcvtimeout(sock, rtpe_config.ng_client_timeout); + + return link; +} + +static void ng_client_put_socket(struct endpoint_sockets *es, socket_slist *link) { + link->next = __atomic_load_n(&es->sockets, __ATOMIC_SEQ_CST); + bool success; + do + success = __atomic_compare_exchange_n(&es->sockets, &link->next, link, + false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); + while (!success); +} + +bencode_item_t *ng_client_request(const endpoint_t *dst, const str *req, bencode_buffer_t *rbuf) { + __auto_type es = ng_client_get_entry(dst); + __auto_type link = ng_client_get_socket(es); + if (!link) + return NULL; + + __auto_type sock = link->data; + + // construct message + char cookie[17]; + rand_hex_str(cookie, 8); + cookie[16] = ' '; + + struct iovec iov[2] = { + { + .iov_base = cookie, + .iov_len = sizeof(cookie), + }, + { + .iov_base = req->s, + .iov_len = req->len, + }, + }; + + ilog(LOG_DEBUG, "Sending NG request to %s: '" STR_FORMAT "'", + endpoint_print_buf(dst), + STR_FMT(req)); + + static const size_t buflen = 4096; + char *buf = bencode_buffer_alloc(rbuf, buflen); + ssize_t len = 0; + + for (unsigned int try = 0; try < rtpe_config.ng_client_retries; try++) { + ilog(LOG_DEBUG, "Attempt #%u sending NG request", try + 1); + + ssize_t ret = socket_sendiov(sock, iov, G_N_ELEMENTS(iov), NULL, NULL); + if (ret <= 0) + goto err; + + // receive the response + len = socket_recvfrom(sock, buf, buflen, NULL); + if (len > 0) + ilog(LOG_DEBUG, "Received response from NG peer (%s): '%.*s'", + endpoint_print_buf(dst), + (int) len, buf); + if (len == buflen) + ilog(LOG_WARN, "Possibly truncated response from remote NG peer (%s)", + endpoint_print_buf(dst)); + if (len > sizeof(cookie) && memcmp(buf, cookie, sizeof(cookie)) == 0) + break; + + if (len < 0) + ilog(LOG_WARN, "Error reading from socket from remote NG peer (%s): %s", + endpoint_print_buf(dst), + strerror(errno)); + else if (len == 0) + ilog(LOG_WARN, "EOF while reading from socket from remote NG peer (%s)", + endpoint_print_buf(dst)); + else + ilog(LOG_WARN, "Short packet or mismatched cookie while reading from socket " + "from remote NG peer (%s)", + endpoint_print_buf(dst)); + } + + if (len <= 0) + goto err; + + ilog(LOG_DEBUG, "Received valid NG response: '%.*s'", (int) len, buf); + + bencode_item_t *ret = bencode_decode_expect(rbuf, buf + sizeof(cookie), len - sizeof(cookie), + BENCODE_DICTIONARY); + if (!ret) { + errno = EIO; + goto err; + } + + ng_client_put_socket(es, link); + return ret; + +err: + ilog(LOG_ERR, "Error communicating with remote NG peer (%s): %s", + endpoint_print_buf(&sock->remote), + strerror(errno)); + + ng_client_put_socket(es, link); + return NULL; +} diff --git a/docs/rtpengine.md b/docs/rtpengine.md index 90f9560f9..be72cee9c 100644 --- a/docs/rtpengine.md +++ b/docs/rtpengine.md @@ -1496,6 +1496,14 @@ call to inject-DTMF won't be sent to __\-\-dtmf-log-dest=__ or __\-\-listen-tcp- affinity). If this option is set to a negative number, then the number of available CPU cores will be used. +- __\-\ng-client-timeout=__*INT* +- __\-\ng-client-retries=__*INT* + + For requests sent to another NG peer, configure timeout and maximum number + of retries. The timeout is in milliseconds, must be larger than zero, and + defaults to 50. The number of retries must be larger than zero and defaults + to 5. + ## INTERFACES This section describes the legacy syntax for configuring interfaces, which can diff --git a/include/main.h b/include/main.h index 515255d24..1bfc4cc1b 100644 --- a/include/main.h +++ b/include/main.h @@ -100,6 +100,8 @@ enum endpoint_learning { X(db_expire) \ X(cache_expire) \ X(timer_accuracy) \ + X(ng_client_timeout) \ + X(ng_client_retries) \ #define RTPE_CONFIG_UINT64_PARAMS \ X(bw_limit) diff --git a/include/ng_client.h b/include/ng_client.h new file mode 100644 index 000000000..0754df861 --- /dev/null +++ b/include/ng_client.h @@ -0,0 +1,12 @@ +#ifndef _NG_CLIENT_H_ +#define _NG_CLIENT_H_ + +#include "types.h" +#include "bencode.h" + +void ng_client_init(void); +void ng_client_cleanup(void); + +bencode_item_t *ng_client_request(const endpoint_t *dst, const str *req, bencode_buffer_t *); + +#endif diff --git a/t/.gitignore b/t/.gitignore index 81eb775cc..828d533b6 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -89,3 +89,4 @@ bufferpool.c uring.c aead-decrypt arena.c +ng_client.c