#include "control_ng.h" #include #include #include #include #include #include "poller.h" #include "log_d.h" #include "cookie_cache.h" #include "call.h" #include "sdp.h" #include "call_interfaces.h" #include "main.h" #include "statistics.h" #include "streambuf.h" #include "homer.h" #include "cli.h" mutex_t rtpe_cngs_lock; mutex_t tcp_connections_lock; GHashTable *rtpe_cngs_hash; GHashTable *tcp_connections_hash; static struct cookie_cache ng_cookie_cache; static bool trace_ng = false; const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64] = { [LOAD_LIMIT_MAX_SESSIONS] = "Parallel session limit reached", [LOAD_LIMIT_CPU] = "CPU usage limit exceeded", [LOAD_LIMIT_LOAD] = "Load limit exceeded", [LOAD_LIMIT_BW] = "Bandwidth limit exceeded", }; const char *ng_command_strings[OP_COUNT] = { #define X(op, name, esc, short_name) [op] = name, NG_COMMANDS(X) #undef X }; const char *ng_command_strings_esc[OP_COUNT] = { #define X(op, name, esc, short_name) [op] = esc, NG_COMMANDS(X) #undef X }; const char *ng_command_strings_short[OP_COUNT] = { #define X(op, name, esc, short_name) [op] = short_name, NG_COMMANDS(X) #undef X }; typedef struct ng_ctx { str callid; enum ng_opmode command; str cookie; bool should_trace; const endpoint_t *sin_ep; const endpoint_t *local_ep; } ng_ctx; #define CH(func, ...) do { \ if (trace_ng) \ func( __VA_ARGS__); \ } while (0) static const char *bencode_dict_iter(const ng_parser_t *parser, bencode_item_t *input, const char *(*callback)(const ng_parser_t *, str *key, bencode_item_t *value, helper_arg), helper_arg arg) { if (input->type != BENCODE_DICTIONARY) return NULL; bencode_item_t *value = NULL; for (bencode_item_t *key = input->child; key; key = value->sibling) { value = key->sibling; if (!value) break; str k; if (!bencode_get_str(key, &k)) continue; const char *err = callback(parser, &k, value, arg); if (err) return err; } return NULL; } static bool bencode_is_dict(bencode_item_t *arg) { return arg->type == BENCODE_DICTIONARY; } static bool bencode_is_list(bencode_item_t *arg) { return arg->type == BENCODE_LIST; } static bool bencode_is_int(bencode_item_t *arg) { return arg->type == BENCODE_INTEGER; } static const char *bencode_list_iter(const ng_parser_t *parser, bencode_item_t *list, const char *(*str_callback)(str *key, unsigned int, helper_arg), const char *(*item_callback)(const ng_parser_t *, bencode_item_t *, helper_arg), helper_arg arg) { if (list->type != BENCODE_LIST) return NULL; // return error? str s; unsigned int idx = 0; for (bencode_item_t *it = list->child; it; it = it->sibling) { const char *err = NULL; if (bencode_get_str(it, &s) && str_callback) err = str_callback(&s, idx, arg); else if (item_callback) err = item_callback(parser, it, arg); else ilog(LOG_DEBUG, "Ignoring non-string value in list"); if (err) return err; idx++; } return NULL; } static long long bencode_get_int(bencode_item_t *arg) { return arg->value; } static parser_arg __bencode_dict(ng_parser_ctx_t *ctx) { return (parser_arg) bencode_dictionary(ctx->buffer); } static parser_arg __bencode_list(ng_parser_ctx_t *ctx) { return (parser_arg) bencode_list(ctx->buffer); } static void bencode_pretty_print(bencode_item_t *el, GString *s); static parser_arg __bencode_dictionary_get_expect(bencode_item_t *arg, const char *ele, bencode_type_t type) { return (parser_arg) bencode_dictionary_get_expect(arg, ele, type); } static bool __bencode_dictionary_contains(bencode_item_t *d, const char *ele) { return bencode_dictionary_get(d, ele) != NULL; } static parser_arg __bencode_dictionary_add(bencode_item_t *n, const char *e, bencode_item_t *v) { return (parser_arg) bencode_dictionary_add(n, e, v); } static parser_arg __bencode_dictionary_add_dictionary(bencode_item_t *n, const char *e) { return (parser_arg) bencode_dictionary_add_dictionary(n, e); } static parser_arg __bencode_dictionary_add_dictionary_dup(bencode_item_t *n, const char *e) { size_t len = strlen(e) + 1; char *s = bencode_buffer_alloc(n->buffer, len); memcpy(s, e, len); return (parser_arg) bencode_dictionary_add_dictionary(n, s); } static parser_arg __bencode_dictionary_add_list(bencode_item_t *n, const char *e) { return (parser_arg) bencode_dictionary_add_list(n, e); } static parser_arg __bencode_dictionary_add_list_dup(bencode_item_t *n, const char *e) { size_t len = strlen(e) + 1; char *s = bencode_buffer_alloc(n->buffer, len); memcpy(s, e, len); return (parser_arg) bencode_dictionary_add_list(n, s); } static parser_arg __bencode_list_add(bencode_item_t *l, bencode_item_t *e) { return (parser_arg) bencode_list_add(l, e); } static parser_arg __bencode_list_add_dictionary(bencode_item_t *l) { return (parser_arg) bencode_list_add_dictionary(l); } static str __bencode_collapse_str(ng_parser_ctx_t *ctx, bencode_item_t *a, void **to_free) { return bencode_collapse_str(a); } static const char *__bencode_strdup(ng_parser_ctx_t *ctx, const char *s) { return bencode_strdup(ctx->buffer, s); } static void __bencode_ctx_init(ng_parser_ctx_t *ctx, bencode_buffer_t *buf) { bencode_buffer_init(buf); *ctx = (ng_parser_ctx_t) { .parser = &ng_parser_native, .buffer = buf }; } static bool json_is_dict(JsonNode *n) { return json_node_get_node_type(n) == JSON_NODE_OBJECT; } static bool json_is_list(JsonNode *n) { return json_node_get_node_type(n) == JSON_NODE_ARRAY; } static bool json_is_int(JsonNode *n) { if (json_node_get_node_type(n) != JSON_NODE_VALUE) return false; GType type = json_node_get_value_type(n); switch (type) { case G_TYPE_INT: case G_TYPE_UINT: case G_TYPE_LONG: case G_TYPE_ULONG: case G_TYPE_INT64: case G_TYPE_UINT64: case G_TYPE_BOOLEAN: return true; } return false; } static str json_dict_get_str(JsonNode *dict, const char *entry) { JsonObject *o = json_node_get_object(dict); if (!o) goto out; JsonNode *n = json_object_get_member(o, entry); if (!n) goto out; const char *s = json_node_get_string(n); if (!s) goto out; return STR(s); out: return STR_NULL; } static void json_pretty_print(JsonNode *a, GString *out) { JsonGenerator *g = json_generator_new(); json_generator_set_root(g, a); json_generator_to_gstring(g, out); g_object_unref(g); } static long long json_get_int_str(JsonNode *n, long long def) { if (json_node_get_node_type(n) != JSON_NODE_VALUE) return def; GType type = json_node_get_value_type(n); switch (type) { case G_TYPE_INT: case G_TYPE_UINT: case G_TYPE_LONG: case G_TYPE_ULONG: case G_TYPE_INT64: case G_TYPE_UINT64: case G_TYPE_BOOLEAN: return json_node_get_int(n); case G_TYPE_STRING:; const char *s = json_node_get_string(n); char *ep; long long r = strtoll(s, &ep, 0); if (ep == s) return def; return r; default: return def; } } static long long json_get_int(JsonNode *n) { if (json_node_get_node_type(n) != JSON_NODE_VALUE) return 0; if (!json_is_int(n)) return 0; return json_node_get_int(n); } static long long json_dict_get_int_str(JsonNode *dict, const char *entry, long long def) { JsonObject *o = json_node_get_object(dict); if (!o) return def; JsonNode *n = json_object_get_member(o, entry); if (!n) return def; if (json_node_get_node_type(n) != JSON_NODE_VALUE) return def; return json_get_int_str(n, def); } static parser_arg json_dict_get_expect(JsonNode *dict, const char *entry, bencode_type_t type) { JsonObject *o = json_node_get_object(dict); if (!o) return (parser_arg) NULL; JsonNode *n = json_object_get_member(o, entry); if (!n) return (parser_arg) NULL; switch (type) { case BENCODE_LIST: if (json_node_get_node_type(n) != JSON_NODE_ARRAY) return (parser_arg) NULL; return (parser_arg) n; case BENCODE_DICTIONARY: if (json_node_get_node_type(n) != JSON_NODE_OBJECT) return (parser_arg) NULL; return (parser_arg) n; default: abort(); } } static bool json_dict_contains(JsonNode *on, const char *ele) { JsonObject *o = json_node_get_object(on); if (!o) return false; JsonNode *n = json_object_get_member(o, ele); return n != NULL; } static void json_dict_iter_fn(JsonObject *o, const char *key, JsonNode *val, void *arg) { void **ptrs = arg; const char *(*callback)(const ng_parser_t *, str *key, JsonNode *value, helper_arg) = ptrs[1]; const char *err = callback(ptrs[0], STR_PTR(key), val, ptrs[2]); if (err) ptrs[3] = (void *) err; } static const char *json_dict_iter(const ng_parser_t *parser, JsonNode *input, const char *(*callback)(const ng_parser_t *, str *key, JsonNode *value, helper_arg), helper_arg arg) { if (json_node_get_node_type(input) != JSON_NODE_OBJECT) return NULL; JsonObject *o = json_node_get_object(input); if (!o) return false; const void *ptrs[4] = { parser, callback, arg.generic }; json_object_foreach_member(o, json_dict_iter_fn, ptrs); if (ptrs[3]) return ptrs[3]; return NULL; } static const char *json_list_iter(const ng_parser_t *parser, JsonNode *list, const char *(*str_callback)(str *key, unsigned int, helper_arg), const char *(*item_callback)(const ng_parser_t *parser, JsonNode *, helper_arg), helper_arg arg) { if (json_node_get_node_type(list) != JSON_NODE_ARRAY) return NULL; // return error? JsonArray *a = json_node_get_array(list); if (!a) return NULL; // return error? unsigned int l = json_array_get_length(a); for (unsigned int i = 0; i < l; i++) { JsonNode *n = json_array_get_element(a, i); const char *err = NULL; if (json_node_get_node_type(n) == JSON_NODE_VALUE && json_node_get_value_type(n) == G_TYPE_STRING) { const char *s = json_node_get_string(n); if (s) err = str_callback(STR_PTR(s), i, arg); } else if (item_callback) err = item_callback(parser, n, arg); else ilog(LOG_DEBUG, "Ignoring non-string value in list"); if (err) return err; } return NULL; } static str *json_get_str(JsonNode *a, str *out) { const char *s = json_node_get_string(a); if (!s) return NULL; *out = STR(s); return out; } static int json_strcmp(JsonNode *n, const char *b) { if (json_node_get_node_type(n) != JSON_NODE_VALUE) return 2; if (json_node_get_value_type(n) != G_TYPE_STRING) return 1; const char *s = json_node_get_string(n); return strcmp(s, b); } static const char *__json_strdup(ng_parser_ctx_t *ctx, const char *s) { return s; } static parser_arg json_dict(ng_parser_ctx_t *c) { JsonObject *o = json_object_new(); JsonNode *n = json_node_init_object(json_node_new(JSON_NODE_OBJECT), o); json_object_unref(o); return (parser_arg) n; } static void json_dict_add_string(JsonNode *n, const char *k, const char *v) { json_object_set_string_member(json_node_get_object(n), k, v); } static void json_dict_add_str(JsonNode *n, const char *k, const str *v) { g_autoptr(char) s = g_malloc(v->len + 1); memcpy(s, v->s, v->len); s[v->len] = 0; json_object_set_string_member(json_node_get_object(n), k, s); } static void json_dict_add_int(JsonNode *n, const char *k, long long i) { json_object_set_int_member(json_node_get_object(n), k, i); } static parser_arg json_dict_add(JsonNode *n, const char *k, JsonNode *v) { json_object_set_member(json_node_get_object(n), k, v); return (parser_arg) v; } static parser_arg json_dict_add_list(JsonNode *n, const char *e) { JsonArray *a = json_array_new(); JsonNode *an = json_node_init_array(json_node_new(JSON_NODE_ARRAY), a); json_object_set_member(json_node_get_object(n), e, an); json_array_unref(a); return (parser_arg) an; } static parser_arg json_list(ng_parser_ctx_t *c) { JsonArray *a = json_array_new(); JsonNode *n = json_node_init_array(json_node_new(JSON_NODE_ARRAY), a); json_array_unref(a); return (parser_arg) n; } static parser_arg json_list_add(JsonNode *n, JsonNode *e) { json_array_add_element(json_node_get_array(n), e); return (parser_arg) e; } static parser_arg json_list_add_dict(JsonNode *n) { JsonObject *o = json_object_new(); JsonNode *on = json_node_init_object(json_node_new(JSON_NODE_OBJECT), o); json_array_add_element(json_node_get_array(n), on); json_object_unref(o); return (parser_arg) on; } static parser_arg json_dict_add_dict(JsonNode *n, const char *e) { JsonObject *o = json_object_new(); JsonNode *on = json_node_init_object(json_node_new(JSON_NODE_OBJECT), o); json_object_set_member(json_node_get_object(n), e, on); json_object_unref(o); return (parser_arg) on; } static void json_list_add_str(JsonNode *n, const str *v) { g_autoptr(char) s = g_malloc(v->len + 1); memcpy(s, v->s, v->len); s[v->len] = 0; json_array_add_string_element(json_node_get_array(n), s); } static void json_list_add_string(JsonNode *n, const char *s) { json_array_add_string_element(json_node_get_array(n), s); } static str json_collapse(ng_parser_ctx_t *ctx, JsonNode *a, void **to_free) { JsonGenerator *g = json_generator_new(); json_generator_set_root(g, a); size_t len; char *s = json_generator_to_data(g, &len); *to_free = s; g_object_unref(g); str out = STR_LEN(s, len); json_node_unref(a); return out; } static void json_ctx_init(ng_parser_ctx_t *ctx, bencode_buffer_t *buf) { *ctx = (ng_parser_ctx_t) { .parser = &ng_parser_json }; } static str dummy_encode_len(char *out, const char *in, size_t in_len) { return STR_LEN(in, in_len); } static str *dummy_decode_len(const char *in, size_t len) { str *r = str_alloc(len); memcpy(r->s, in, len); r->len = len; r->s[len] = '\0'; return r; } const ng_parser_t ng_parser_native = { .init = __bencode_ctx_init, .collapse = __bencode_collapse_str, .dict_iter = bencode_dict_iter, .is_list = bencode_is_list, .list_iter = bencode_list_iter, .get_str = bencode_get_str, .strcmp = bencode_strcmp, .strdup = __bencode_strdup, .get_int_str = bencode_get_integer_str, .is_int = bencode_is_int, .get_int = bencode_get_int, .is_dict = bencode_is_dict, .dict = __bencode_dict, .dict_get_str = bencode_dictionary_get_str, .dict_get_int_str = bencode_dictionary_get_int_str, .dict_get_expect = __bencode_dictionary_get_expect, .dict_contains = __bencode_dictionary_contains, .dict_add = __bencode_dictionary_add, .dict_add_string = bencode_dictionary_add_string, .dict_add_str = bencode_dictionary_add_str, .dict_add_str_dup = bencode_dictionary_add_str_dup, .dict_add_str_dup_dup = bencode_dictionary_add_str_dup_dup, .dict_add_int = bencode_dictionary_add_integer, .dict_add_dict = __bencode_dictionary_add_dictionary, .dict_add_dict_dup = __bencode_dictionary_add_dictionary_dup, .dict_add_list = __bencode_dictionary_add_list, .dict_add_list_dup = __bencode_dictionary_add_list_dup, .list = __bencode_list, .list_add = __bencode_list_add, .list_add_dict = __bencode_list_add_dictionary, .list_add_string = bencode_list_add_string, .list_add_str_dup = bencode_list_add_str_dup, .pretty_print = bencode_pretty_print, .escape = dummy_encode_len, .unescape = dummy_decode_len, }; const ng_parser_t ng_parser_json = { .init = json_ctx_init, .collapse = json_collapse, .dict_iter = json_dict_iter, .is_list = json_is_list, .list_iter = json_list_iter, .get_str = json_get_str, .strcmp = json_strcmp, .strdup = __json_strdup, .get_int_str = json_get_int_str, .is_int = json_is_int, .get_int = json_get_int, .is_dict = json_is_dict, .dict = json_dict, .dict_get_str = json_dict_get_str, .dict_get_int_str = json_dict_get_int_str, .dict_get_expect = json_dict_get_expect, .dict_contains = json_dict_contains, .dict_add = json_dict_add, .dict_add_string = json_dict_add_string, .dict_add_str = json_dict_add_str, .dict_add_str_dup = json_dict_add_str, .dict_add_str_dup_dup = json_dict_add_str, .dict_add_int = json_dict_add_int, .dict_add_dict = json_dict_add_dict, .dict_add_dict_dup = json_dict_add_dict, .dict_add_list = json_dict_add_list, .dict_add_list_dup = json_dict_add_list, .list = json_list, .list_add = json_list_add, .list_add_dict = json_list_add_dict, .list_add_string = json_list_add_string, .list_add_str_dup = json_list_add_str, .pretty_print = json_pretty_print, .escape = str_uri_encode_len, .unescape = str_uri_decode_len, }; void init_ng_tracing(void) { if (rtpe_config.homer_ng_on && has_homer()) trace_ng = true; } static GString *create_homer_msg(str *cookie, str *data) { GString *msg = g_string_sized_new(cookie->len + 1 + data->len); g_string_append_printf(msg, "%.*s %.*s", STR_FMT(cookie), STR_FMT(data)); return msg; } static bool should_trace_msg(enum ng_opmode command) { switch (command) { case OP_PING: return false; default: return true; } } static void homer_fill_values(ng_ctx *hctx, str *callid, enum ng_opmode command) { if (hctx) { hctx->command = command; hctx->callid = *callid; } } static void homer_trace_msg_in(ng_ctx *hctx, str *data) { if (hctx && hctx->local_ep) { hctx->should_trace = should_trace_msg(hctx->command); if (hctx->should_trace) { GString *msg = create_homer_msg(&hctx->cookie, data); homer_send(msg, &hctx->callid, hctx->sin_ep, hctx->local_ep, now_us(), rtpe_config.homer_ng_capt_proto); } } } static void homer_trace_msg_out(ng_ctx *hctx, str *data) { if (hctx && hctx->should_trace) { GString *msg = create_homer_msg(&hctx->cookie, data); homer_send(msg, &hctx->callid, hctx->local_ep, hctx->sin_ep, now_us(), rtpe_config.homer_ng_capt_proto); } } static void bencode_pretty_print(bencode_item_t *el, GString *s) { bencode_item_t *chld; const char *sep; switch (el->type) { case BENCODE_STRING: g_string_append(s, "\""); g_string_append_len(s, el->iov[1].iov_base, el->iov[1].iov_len); g_string_append(s, "\""); break; case BENCODE_INTEGER: g_string_append_printf(s, "%lli", el->value); break; case BENCODE_LIST: g_string_append(s, "[ "); sep = ""; for (chld = el->child; chld; chld = chld->sibling) { g_string_append(s, sep); bencode_pretty_print(chld, s); sep = ", "; } g_string_append(s, " ]"); break; case BENCODE_DICTIONARY: g_string_append(s, "{ "); sep = ""; for (chld = el->child; chld; chld = chld->sibling) { g_string_append(s, sep); bencode_pretty_print(chld, s); g_string_append(s, ": "); if (!chld->sibling) break; chld = chld->sibling; bencode_pretty_print(chld, s); sep = ", "; } g_string_append(s, " }"); break; default: abort(); } } struct control_ng_stats* get_control_ng_stats(const sockaddr_t *addr) { struct control_ng_stats* cur; mutex_lock(&rtpe_cngs_lock); cur = g_hash_table_lookup(rtpe_cngs_hash, addr); if (!cur) { cur = g_new0(__typeof(*cur), 1); cur->proxy = *addr; ilogs(control, LOG_DEBUG, "Adding a proxy for control ng stats:%s", sockaddr_print_buf(addr)); for (int i = 0; i < OP_COUNT; i++) { struct ng_command_stats *c = &cur->cmd[i]; mutex_init(&c->lock); } g_hash_table_insert(rtpe_cngs_hash, &cur->proxy, cur); } mutex_unlock(&rtpe_cngs_lock); return cur; } static void __ng_buffer_free(ng_buffer *ngbuf) { bencode_buffer_free(&ngbuf->buffer); if (ngbuf->ref) obj_put_o(ngbuf->ref); if (ngbuf->json) g_object_unref(ngbuf->json); g_free(ngbuf->sdp_out); if (ngbuf->call) obj_put(ngbuf->call); g_free(ngbuf->collapsed); } ng_buffer *ng_buffer_new(struct obj *ref) { __auto_type ngbuf = obj_alloc0(ng_buffer, __ng_buffer_free); if (ref) ngbuf->ref = obj_get_o(ref); // hold until we're done return ngbuf; } /** * Initialize resp context. */ static void prepare_resp_ctx(ng_command_ctx_t *command_ctx, const ng_parser_t *parser) { if (!command_ctx->parser_ctx.parser) parser->init(&command_ctx->parser_ctx, &command_ctx->ngbuf->buffer); /* TODO: JSON-like structured data probably needs to have own `parser_arg` * because otherwise resp is always added as dictionary */ command_ctx->resp = command_ctx->parser_ctx.parser->dict(&command_ctx->parser_ctx); assert(command_ctx->resp.gen != NULL); } static void control_ng_process_payload(ng_ctx *hctx, str *reply, str *data, const endpoint_t *sin, char *addr, struct obj *ref, struct ng_buffer **ngbufp) { str cmd = STR_NULL; const char *errstr, *resultstr; GString *log_str; int64_t cmd_start, cmd_stop, cmd_process_time = {0}; struct control_ng_stats* cur = get_control_ng_stats(&sin->address); ng_command_ctx_t command_ctx = {.opmode = -1}; const ng_parser_t *parser = &ng_parser_native; const ng_parser_t *json_parser = &ng_parser_json; command_ctx.ngbuf = *ngbufp = ng_buffer_new(ref); errstr = "Invalid data (no payload)"; if (data->len <= 0) { prepare_resp_ctx(&command_ctx, parser); goto err_send; } /* Bencode dictionary */ if (data->s[0] == 'd') { parser->init(&command_ctx.parser_ctx, &command_ctx.ngbuf->buffer); command_ctx.req.benc = bencode_decode_expect_str(&command_ctx.ngbuf->buffer, data, BENCODE_DICTIONARY); errstr = "Could not decode bencode dictionary"; if (!command_ctx.req.benc) { prepare_resp_ctx(&command_ctx, parser); goto err_send; } } /* JSON */ else if (data->s[0] == '{') { json_parser->init(&command_ctx.parser_ctx, &command_ctx.ngbuf->buffer); command_ctx.ngbuf->json = json_parser_new(); errstr = "Failed to parse JSON document"; if (!json_parser_load_from_data(command_ctx.ngbuf->json, data->s, data->len, NULL)) { prepare_resp_ctx(&command_ctx, json_parser); goto err_send; } command_ctx.req.json = json_parser_get_root(command_ctx.ngbuf->json); errstr = "Could not decode JSON dictionary"; if (!command_ctx.req.json || !json_parser->is_dict(command_ctx.req)) { prepare_resp_ctx(&command_ctx, json_parser); goto err_send; } } else { prepare_resp_ctx(&command_ctx, parser); errstr = "Invalid NG data format"; goto err_send; } parser = command_ctx.parser_ctx.parser; /* TODO: JSON-like structured data probably needs to have own `parser_arg` * because otherwise resp is always added as dictionary */ command_ctx.resp = parser->dict(&command_ctx.parser_ctx); assert(command_ctx.resp.gen != NULL); cmd = parser->dict_get_str(command_ctx.req, "command"); errstr = "Dictionary contains no key \"command\""; if (!cmd.s) goto err_send; str callid = parser->dict_get_str(command_ctx.req, "call-id"); log_info_str(&callid); ilogs(control, LOG_INFO, "Received command '"STR_FORMAT"' from %s", STR_FMT(&cmd), addr); if (get_log_level(control) >= LOG_DEBUG) { log_str = g_string_sized_new(256); g_string_append_printf(log_str, "Dump for '"STR_FORMAT"' from %s: %s", STR_FMT(&cmd), addr, rtpe_config.common.log_mark_prefix); parser->pretty_print(command_ctx.req, log_str); g_string_append(log_str, rtpe_config.common.log_mark_suffix); ilogs(control, LOG_DEBUG, "%.*s", (int) log_str->len, log_str->str); g_string_free(log_str, TRUE); } errstr = NULL; resultstr = "ok"; // start command timer cmd_start = now_us(); switch (__csh_lookup(&cmd)) { case CSH_LOOKUP("ping"): resultstr = "pong"; command_ctx.opmode = OP_PING; break; case CSH_LOOKUP("offer"): command_ctx.opmode = OP_OFFER; errstr = call_offer_ng(&command_ctx, addr); break; case CSH_LOOKUP("answer"): command_ctx.opmode = OP_ANSWER; errstr = call_answer_ng(&command_ctx); break; case CSH_LOOKUP("delete"): command_ctx.opmode = OP_DELETE; errstr = call_delete_ng(&command_ctx); break; case CSH_LOOKUP("query"): command_ctx.opmode = OP_QUERY; errstr = call_query_ng(&command_ctx); break; case CSH_LOOKUP("list"): command_ctx.opmode = OP_LIST; errstr = call_list_ng(&command_ctx); break; case CSH_LOOKUP("start recording"): command_ctx.opmode = OP_START_RECORDING; errstr = call_start_recording_ng(&command_ctx); break; case CSH_LOOKUP("stop recording"): command_ctx.opmode = OP_STOP_RECORDING; errstr = call_stop_recording_ng(&command_ctx); break; case CSH_LOOKUP("pause recording"): command_ctx.opmode = OP_PAUSE_RECORDING; errstr = call_pause_recording_ng(&command_ctx); break; case CSH_LOOKUP("start forwarding"): command_ctx.opmode = OP_START_FORWARDING; errstr = call_start_forwarding_ng(&command_ctx); break; case CSH_LOOKUP("stop forwarding"): command_ctx.opmode = OP_STOP_FORWARDING; errstr = call_stop_forwarding_ng(&command_ctx); break; case CSH_LOOKUP("block DTMF"): command_ctx.opmode = OP_BLOCK_DTMF; errstr = call_block_dtmf_ng(&command_ctx); break; case CSH_LOOKUP("unblock DTMF"): command_ctx.opmode = OP_UNBLOCK_DTMF; errstr = call_unblock_dtmf_ng(&command_ctx); break; case CSH_LOOKUP("block media"): command_ctx.opmode = OP_BLOCK_MEDIA; errstr = call_block_media_ng(&command_ctx); break; case CSH_LOOKUP("unblock media"): command_ctx.opmode = OP_UNBLOCK_MEDIA; errstr = call_unblock_media_ng(&command_ctx); break; case CSH_LOOKUP("silence media"): command_ctx.opmode = OP_SILENCE_MEDIA; errstr = call_silence_media_ng(&command_ctx); break; case CSH_LOOKUP("unsilence media"): command_ctx.opmode = OP_UNSILENCE_MEDIA; errstr = call_unsilence_media_ng(&command_ctx); break; case CSH_LOOKUP("play media"): command_ctx.opmode = OP_PLAY_MEDIA; errstr = call_play_media_ng(&command_ctx); break; case CSH_LOOKUP("stop media"): command_ctx.opmode = OP_STOP_MEDIA; errstr = call_stop_media_ng(&command_ctx); break; case CSH_LOOKUP("play DTMF"): command_ctx.opmode = OP_PLAY_DTMF; errstr = call_play_dtmf_ng(&command_ctx); break; case CSH_LOOKUP("statistics"): command_ctx.opmode = OP_STATISTICS; errstr = statistics_ng(&command_ctx); break; case CSH_LOOKUP("publish"): command_ctx.opmode = OP_PUBLISH; errstr = call_publish_ng(&command_ctx, addr); break; case CSH_LOOKUP("subscribe request"): command_ctx.opmode = OP_SUBSCRIBE_REQ; errstr = call_subscribe_request_ng(&command_ctx); break; case CSH_LOOKUP("subscribe answer"): command_ctx.opmode = OP_SUBSCRIBE_ANS; errstr = call_subscribe_answer_ng(&command_ctx); break; case CSH_LOOKUP("unsubscribe"): command_ctx.opmode = OP_UNSUBSCRIBE; errstr = call_unsubscribe_ng(&command_ctx); break; case CSH_LOOKUP("inject start"): command_ctx.opmode = OP_INJECT_START; errstr = call_inject_start_ng(&command_ctx); break; case CSH_LOOKUP("inject stop"): command_ctx.opmode = OP_INJECT_STOP; errstr = call_inject_stop_ng(&command_ctx); break; case CSH_LOOKUP("connect"): command_ctx.opmode = OP_CONNECT; errstr = call_connect_ng(&command_ctx); break; case CSH_LOOKUP("cli"): case CSH_LOOKUP("CLI"): command_ctx.opmode = OP_CLI; errstr = cli_ng(&command_ctx); break; case CSH_LOOKUP("transform"): command_ctx.opmode = OP_TRANSFORM; errstr = call_transform_ng(&command_ctx); break; case CSH_LOOKUP("create"): command_ctx.opmode = OP_CREATE; errstr = call_create_ng(&command_ctx); break; case CSH_LOOKUP("create answer"): command_ctx.opmode = OP_CREATE_ANSWER; errstr = call_create_answer_ng(&command_ctx); break; case CSH_LOOKUP("mesh"): command_ctx.opmode = OP_MESH; errstr = call_mesh_ng(&command_ctx); break; default: errstr = "Unrecognized command"; } CH(homer_fill_values, hctx, &callid, command_ctx.opmode); CH(homer_trace_msg_in, hctx, data); // stop command timer cmd_stop = now_us(); //print command duration cmd_process_time = cmd_stop - cmd_start; if (command_ctx.opmode >= 0 && command_ctx.opmode < OP_COUNT) { mutex_lock(&cur->cmd[command_ctx.opmode].lock); cur->cmd[command_ctx.opmode].count++; cur->cmd[command_ctx.opmode].time += cmd_process_time; mutex_unlock(&cur->cmd[command_ctx.opmode].lock); } if (errstr) goto err_send; parser->dict_add_string(command_ctx.resp, "result", resultstr); // update interval statistics RTPE_STATS_INC(ng_commands[command_ctx.opmode]); RTPE_STATS_SAMPLE(ng_command_times[command_ctx.opmode], cmd_process_time); goto send_resp; err_send: if (errstr < magic_load_limit_strings[0] || errstr > magic_load_limit_strings[__LOAD_LIMIT_MAX-1]) { ilogs(control, LOG_WARNING, "Protocol error in packet from %s: %s [" STR_FORMAT_M "]", addr, errstr, STR_FMT_M(data)); parser->dict_add_string(command_ctx.resp, "result", "error"); parser->dict_add_string(command_ctx.resp, "error-reason", errstr); g_atomic_int_inc(&cur->errors); cmd = STR_NULL; } else { parser->dict_add_string(command_ctx.resp, "result", "load limit"); parser->dict_add_string(command_ctx.resp, "message", errstr); } send_resp: if (cmd.s) { ilogs(control, LOG_INFO, "Replying to '" STR_FORMAT "' " "from %s (elapsed time %" PRId64 ".%06" PRId64 " sec)", STR_FMT(&cmd), addr, cmd_process_time / 1000000, cmd_process_time % 1000000); if (get_log_level(control) >= LOG_DEBUG) { log_str = g_string_sized_new(256); g_string_append_printf(log_str, "Response dump for '" STR_FORMAT "' to %s: %s", STR_FMT(&cmd), addr, rtpe_config.common.log_mark_prefix); parser->pretty_print(command_ctx.resp, log_str); g_string_append(log_str, rtpe_config.common.log_mark_suffix); ilogs(control, LOG_DEBUG, "%.*s", (int) log_str->len, log_str->str); g_string_free(log_str, TRUE); } } *reply = parser->collapse(&command_ctx.parser_ctx, command_ctx.resp, &command_ctx.ngbuf->collapsed); release_closed_sockets(); log_info_pop_until(&callid); CH(homer_trace_msg_out, hctx, reply); } int control_ng_process(str *buf, const endpoint_t *sin, char *addr, const sockaddr_t *local, void (*cb)(str *, str *, const endpoint_t *, const sockaddr_t *, void *), void *p1, struct obj *ref) { str data; str_chr_str(&data, buf, ' '); if (!data.s || data.s == buf->s) { ilogs(control, LOG_WARNING, "Received invalid NG data (no cookie) from %s: " STR_FORMAT_M, addr, STR_FMT_M(buf)); return -1; } str cookie = *buf; cookie.len -= data.len; *data.s++ = '\0'; data.len--; cache_entry *cached = cookie_cache_lookup(&ng_cookie_cache, &cookie); if (cached) { ilogs(control, LOG_INFO, "Detected command from %s as a duplicate", addr); ng_ctx hctx = {.sin_ep = sin, .local_ep = p1 ? &(((socket_t*)p1)->local) : NULL, .cookie = cookie, .command = cached->command, .callid = cached->callid, .should_trace = should_trace_msg(cached->command)}; CH(homer_trace_msg_in, &hctx, &data); cb(&cookie, &cached->reply, sin, local, p1); CH(homer_trace_msg_out, &hctx, &cached->reply); cache_entry_free(cached); return 0; } str reply; g_autoptr(ng_buffer) ngbuf = NULL; ng_ctx hctx = {.sin_ep = sin, .local_ep = p1 ? &(((socket_t*)p1)->local) : NULL, .cookie = cookie, .command = -1}; control_ng_process_payload(trace_ng ? &hctx : NULL, &reply, &data, sin, addr, ref, &ngbuf); cb(&cookie, &reply, sin, local, p1); cache_entry ce = {.reply = reply, .command = hctx.command, .callid = hctx.callid}; cookie_cache_insert(&ng_cookie_cache, &cookie, &ce); return 0; } int control_ng_process_plain(str *data, const endpoint_t *sin, char *addr, const sockaddr_t *local, void (*cb)(str *, str *, const endpoint_t *, const sockaddr_t *, void *), void *p1, struct obj *ref) { g_autoptr(ng_buffer) ngbuf = NULL; str reply; control_ng_process_payload(NULL, &reply, data, sin, addr, ref, &ngbuf); cb(NULL, &reply, sin, local, p1); return 0; } INLINE void control_ng_send_generic(str *cookie, str *body, const endpoint_t *sin, const sockaddr_t *from, void *p1) { socket_t *ul = p1; struct iovec iov[3]; unsigned int iovlen; iovlen = 3; iov[0].iov_base = cookie->s; iov[0].iov_len = cookie->len; iov[1].iov_base = " "; iov[1].iov_len = 1; iov[2].iov_base = body->s; iov[2].iov_len = body->len; socket_sendiov(ul, iov, iovlen, sin, from); } static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, const sockaddr_t *from, void *p1) { control_ng_send_generic(cookie, body, sin, NULL, p1); } static void control_ng_send_from(str *cookie, str *body, const endpoint_t *sin, const sockaddr_t *from, void *p1) { control_ng_send_generic(cookie, body, sin, from, p1); } static void control_ng_incoming(struct obj *obj, struct udp_buffer *udp_buf) { control_ng_process(&udp_buf->str, &udp_buf->sin, udp_buf->addr, &udp_buf->local_addr, control_ng_send_from, udp_buf->listener, &udp_buf->obj); } static void control_incoming(struct streambuf_stream *s) { ilog(LOG_INFO, "New TCP control ng connection from %s", s->addr); mutex_lock(&tcp_connections_lock); g_hash_table_insert(tcp_connections_hash, s->addr, s); mutex_unlock(&tcp_connections_lock); ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash)); } static void control_closed(struct streambuf_stream *s) { ilog(LOG_INFO, "TCP control ng connection from %s is closing", s->addr); mutex_lock(&tcp_connections_lock); g_hash_table_remove(tcp_connections_hash, s->addr); mutex_unlock(&tcp_connections_lock); ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash)); } static str *chunk_message(struct streambuf *b) { char *p = NULL; int len, to_del, bsize; str *ret = NULL; mutex_lock(&b->lock); for (;;) { if (b->eof) break; p = memchr(b->buf->str, ' ', b->buf->len); if (!p) break; len = p - b->buf->str; if (len == b->buf->len) break; ++p; /* bencode dictionary here */ bsize = bencode_valid(p, b->buf->str + b->buf->len - p); if (bsize < 0) break; /* not enough data to parse bencoded dictionary */ p += bsize; len = p - b->buf->str; to_del = len; ret = str_alloc(len); memcpy(ret->s, b->buf->str, len); ret->len = len; g_string_erase(b->buf, 0, to_del); break; } mutex_unlock(&b->lock); return ret; } static void control_stream_readable(struct streambuf_stream *s) { str *data; ilog(LOG_DEBUG, "Got %zu bytes from %s", s->inbuf->buf->len, s->addr); while ((data = chunk_message(s->inbuf))) { ilog(LOG_DEBUG, "Got control ng message from %s", s->addr); control_ng_process(data, &s->sock.remote, s->addr, NULL, control_ng_send, &s->sock, s->parent); free(data); } if (streambuf_bufsize(s->inbuf) > 1024) { ilog(LOG_WARNING, "Buffer length exceeded in control connection from %s", s->addr); goto close; } return; close: streambuf_stream_close(s); } void control_ng_free(struct control_ng *c) { // XXX this should go elsewhere if (rtpe_cngs_hash) { GList *ll = g_hash_table_get_values(rtpe_cngs_hash); for (GList *l = ll; l; l = l->next) { struct control_ng_stats *s = l->data; g_free(s); } g_list_free(ll); g_hash_table_destroy(rtpe_cngs_hash); rtpe_cngs_hash = NULL; } rtpe_poller_del_item(rtpe_control_poller, c->udp_listener.fd); reset_socket(&c->udp_listener); streambuf_listener_shutdown(&c->tcp_listener); if (tcp_connections_hash) g_hash_table_destroy(tcp_connections_hash); } struct control_ng *control_ng_new(const endpoint_t *ep) { struct control_ng *c; c = obj_alloc0(struct control_ng, control_ng_free); c->udp_listener.fd = -1; if (udp_listener_init(&c->udp_listener, ep, control_ng_incoming, &c->obj)) goto fail2; if (rtpe_config.control_tos) set_tos(&c->udp_listener, rtpe_config.control_tos); if (rtpe_config.control_pmtu) set_pmtu_disc(&c->udp_listener, rtpe_config.control_pmtu == PMTU_DISC_WANT ? IP_PMTUDISC_WANT : IP_PMTUDISC_DONT); return c; fail2: obj_put(c); return NULL; } struct control_ng *control_ng_tcp_new(const endpoint_t *ep) { struct control_ng *ctrl_ng = obj_alloc0(struct control_ng, NULL); ctrl_ng->udp_listener.fd = -1; if (streambuf_listener_init(&ctrl_ng->tcp_listener, ep, control_incoming, control_stream_readable, control_closed, &ctrl_ng->obj)) { ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); goto fail; } tcp_connections_hash = g_hash_table_new(g_str_hash, g_str_equal); mutex_init(&tcp_connections_lock); return ctrl_ng; fail: obj_put(ctrl_ng); return NULL; } static void notify_tcp_client(gpointer key, gpointer value, gpointer user_data) { struct streambuf_stream *s = (struct streambuf_stream *)value; str *to_send = (str *)user_data; char cookie_buf[17]; str cookie = STR_CONST(cookie_buf); rand_hex_str(cookie_buf, cookie.len / 2); control_ng_send(&cookie, to_send, &s->sock.remote, NULL, &s->sock); } void notify_ng_tcp_clients(str *data) { mutex_lock(&tcp_connections_lock); g_hash_table_foreach(tcp_connections_hash, notify_tcp_client, data); mutex_unlock(&tcp_connections_lock); } void control_ng_init(void) { mutex_init(&rtpe_cngs_lock); rtpe_cngs_hash = g_hash_table_new(sockaddr_t_hash, sockaddr_t_eq); cookie_cache_init(&ng_cookie_cache); } void control_ng_cleanup(void) { cookie_cache_cleanup(&ng_cookie_cache); }