diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 8edccd9af..780b98c9f 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -270,7 +270,11 @@ static parser_arg json_dict_get_expect(JsonNode *dict, const char *entry, bencod return (parser_arg) NULL; switch (type) { case BENCODE_LIST: - if (json_node_get_value_type(n) != JSON_NODE_ARRAY) + if (json_node_get_node_type(n) != JSON_NODE_ARRAY) + return (parser_arg) NULL; + return (parser_arg) n; + case BENCODE_DICTIONARY: + if (json_node_get_node_type(n) != JSON_NODE_OBJECT) return (parser_arg) NULL; return (parser_arg) n; default: diff --git a/daemon/redis.c b/daemon/redis.c index 9d59eb785..5148e5c6f 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -50,6 +50,9 @@ struct redis *rtpe_redis_write_disabled; struct redis *rtpe_redis_notify; +static const ng_parser_t *redis_parser = &ng_parser_json; + + INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) return NULL; @@ -106,7 +109,7 @@ static int redis_ports_release_balance = 0; // negative = releasers, positive = static int redis_check_conn(struct redis *r); static void json_restore_call(struct redis *r, const str *id, bool foreign); static int redis_connect(struct redis *r, int wait); -static int json_build_ssrc(struct call_monologue *ml, JsonReader *root_reader); +static int json_build_ssrc(struct call_monologue *ml, parser_arg arg); // mutually exclusive multi-A multi-B lock @@ -1005,31 +1008,32 @@ INLINE void json_builder_add_string_value_uri_enc(JsonBuilder *builder, const ch str_uri_encode_len(enc, tmp, len); json_builder_add_string_value(builder,enc); } -INLINE str *json_reader_get_string_value_uri_enc(JsonReader *root_reader) { - const char *s = json_reader_get_string_value(root_reader); - if (!s) - return NULL; - str *out = str_uri_decode_len(s, strlen(s)); - return out; // must be free'd -} // XXX rework restore procedure to use functions like this everywhere and eliminate the GHashTable -INLINE long long json_reader_get_ll(JsonReader *root_reader, const char *key) { - if (!json_reader_read_member(root_reader, key)) - return -1; - str *ret = json_reader_get_string_value_uri_enc(root_reader); - long long r = strtoll(ret->s, NULL, 10); - free(ret); - json_reader_end_member(root_reader); - return r; +INLINE long long parser_get_ll(parser_arg arg, const char *key) { + return redis_parser->dict_get_int_str(arg, key, -1); +} + +static void json_get_hash_iter(const ng_parser_t *parser, str *key, parser_arg val_a, helper_arg arg) { + str val; + if (!parser->get_str(val_a, &val)) { + rlog(LOG_ERROR, "Could not read json member: " STR_FORMAT, STR_FMT(key)); + return; + } + + // XXX convert to proper str ht + char *tmp = g_memdup2(key->s, key->len + 1); + tmp[key->len] = '\0'; + // XXX eliminate string dup? eliminate URI decode? + if (g_hash_table_insert(arg.ht, tmp, str_uri_decode_len(val.s, val.len)) != TRUE) + rlog(LOG_WARNING,"Key %s already exists", tmp); } static int json_get_hash(struct redis_hash *out, - const char *key, unsigned int id, JsonReader *root_reader) + const char *key, unsigned int id, parser_arg root) { static unsigned int MAXKEYLENGTH = 512; char key_concatted[MAXKEYLENGTH]; int rc=0; - g_autoptr(char_p) orig_members = NULL; if (id == -1) { rc = snprintf(key_concatted, MAXKEYLENGTH, "%s",key); @@ -1038,48 +1042,22 @@ static int json_get_hash(struct redis_hash *out, } if (rc>=MAXKEYLENGTH) { rlog(LOG_ERROR,"Json key too long."); - goto err; + return -1; } - if (!json_reader_read_member(root_reader, key_concatted)) { + parser_arg dict = redis_parser->dict_get_expect(root, key_concatted, BENCODE_DICTIONARY); + if (!dict.gen) { rlog(LOG_ERROR, "Could not read json member: %s",key_concatted); - goto err; + return -1; } out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); if (!out->ht) - goto err; - - gchar **members = json_reader_list_members(root_reader); - orig_members = members; - int nmemb = json_reader_count_members (root_reader); - - for (int i=0; i < nmemb; ++i) { - - if (!json_reader_read_member(root_reader, *members)) { - rlog(LOG_ERROR, "Could not read json member: %s",*members); - goto err3; - } - str *val = json_reader_get_string_value_uri_enc(root_reader); - char* tmp = strdup(*members); - - if (g_hash_table_insert(out->ht, tmp, val) != TRUE) { - rlog(LOG_WARNING,"Key %s already exists", tmp); - goto err3; - } - - json_reader_end_member(root_reader); + return -1; - ++members; - } // for - json_reader_end_member (root_reader); + redis_parser->dict_iter(redis_parser, dict, json_get_hash_iter, out->ht); return 0; - -err3: - g_hash_table_destroy(out->ht); -err: - return -1; } static void json_destroy_hash(struct redis_hash *rh) { @@ -1205,38 +1183,40 @@ static void *redis_list_get_ptr(struct redis_list *list, struct redis_hash *rh, return redis_list_get_idx_ptr(list, idx); } +struct cb_iter_ptrs { // XXX remove this? + int (*cb)(str *, callback_arg_t, struct redis_list *, void *); + callback_arg_t cb_arg; + struct redis_list *list; + void *ptr; +}; + +static void json_build_list_cb_iter(str *val, unsigned int i, helper_arg arg) { + struct cb_iter_ptrs *args = arg.generic; + str *s = str_uri_decode_len(val->s, val->len); + args->cb(s, args->cb_arg, args->list, args->ptr); + g_free(s); +} + static int json_build_list_cb(callback_arg_t q, call_t *c, const char *key, unsigned int idx, struct redis_list *list, - int (*cb)(str *, callback_arg_t, struct redis_list *, void *), void *ptr, JsonReader *root_reader) + int (*cb)(str *, callback_arg_t, struct redis_list *, void *), void *ptr, parser_arg arg) { char key_concatted[256]; snprintf(key_concatted, 256, "%s-%u", key, idx); - if (!json_reader_read_member(root_reader, key_concatted)) { + parser_arg r_list = redis_parser->dict_get_expect(arg, key_concatted, BENCODE_LIST); + if (!r_list.gen) { rlog(LOG_ERROR,"Key in json not found:%s",key_concatted); return -1; } - int nmemb = json_reader_count_elements(root_reader); - for (int jidx=0; jidx < nmemb; ++jidx) { - if (!json_reader_read_element(root_reader,jidx)) { - rlog(LOG_ERROR,"Element in array not found."); - return -1; - } - str *s = json_reader_get_string_value_uri_enc(root_reader); - if (!s) { - rlog(LOG_ERROR,"String in json not found."); - return -1; - } - if (cb(s, q, list, ptr)) { - free(s); - return -1; - } - free(s); - json_reader_end_element(root_reader); - } - json_reader_end_member (root_reader); - + struct cb_iter_ptrs args = { + .cb = cb, + .cb_arg = q, + .list = list, + .ptr = ptr, + }; + redis_parser->list_iter(redis_parser, r_list, json_build_list_cb_iter, NULL, &args); return 0; } @@ -1257,20 +1237,20 @@ static int rbpa_cb_simple(str *s, callback_arg_t pap, struct redis_list *list, v } static int json_build_list(callback_arg_t q, call_t *c, const char *key, - unsigned int idx, struct redis_list *list, JsonReader *root_reader) + unsigned int idx, struct redis_list *list, parser_arg arg) { - return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL, root_reader); + return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL, arg); } static int json_build_ptra(medias_arr *q, call_t *c, const char *key, - unsigned int idx, struct redis_list *list, JsonReader *root_reader) + unsigned int idx, struct redis_list *list, parser_arg arg) { - return json_build_list_cb(q, c, key, idx, list, rbpa_cb_simple, NULL, root_reader); + return json_build_list_cb(q, c, key, idx, list, rbpa_cb_simple, NULL, arg); } static int json_get_list_hash(struct redis_list *out, const char *key, - const struct redis_hash *rh, const char *rh_num_key, JsonReader *root_reader) + const struct redis_hash *rh, const char *rh_num_key, parser_arg arg) { unsigned int i; @@ -1284,7 +1264,7 @@ static int json_get_list_hash(struct redis_list *out, goto err1; for (i = 0; i < out->len; i++) { - if (json_get_hash(&out->rh[i], key, i, root_reader)) + if (json_get_hash(&out->rh[i], key, i, arg)) goto err2; } @@ -1476,7 +1456,7 @@ static int redis_streams(call_t *c, struct redis_list *streams) { return 0; } -static int redis_tags(call_t *c, struct redis_list *tags, JsonReader *root_reader) { +static int redis_tags(call_t *c, struct redis_list *tags, parser_arg arg) { unsigned int i; int ii; atomic64 a64; @@ -1564,7 +1544,7 @@ static int redis_tags(call_t *c, struct redis_list *tags, JsonReader *root_reade ml->logical_intf = get_logical_interface(NULL, ml->desired_family, 0); } - if (json_build_ssrc(ml, root_reader)) + if (json_build_ssrc(ml, arg)) return -1; tags->ptrs[i] = ml; @@ -1594,7 +1574,7 @@ static int rbl_cb_plts_r(str *s, callback_arg_t dummy, struct redis_list *list, return 0; } static int json_medias(call_t *c, struct redis_list *medias, struct redis_list *tags, - JsonReader *root_reader) + parser_arg arg) { unsigned int i; int ii; @@ -1653,7 +1633,7 @@ static int json_medias(call_t *c, struct redis_list *medias, struct redis_list * med->bandwidth_rr = (!redis_hash_get_int(&ii, rh, "bandwidth_rr")) ? ii : -1; med->bandwidth_rs = (!redis_hash_get_int(&ii, rh, "bandwidth_rs")) ? ii : -1; - json_build_list_cb(NULL, c, "payload_types", i, NULL, rbl_cb_plts_r, med, root_reader); + json_build_list_cb(NULL, c, "payload_types", i, NULL, rbl_cb_plts_r, med, arg); /* XXX dtls */ /* link monologue */ @@ -1760,7 +1740,7 @@ static int rbl_subs_cb(str *s, callback_arg_t dummy, struct redis_list *list, vo return 0; } -static int json_link_tags(call_t *c, struct redis_list *tags, struct redis_list *medias, JsonReader *root_reader) +static int json_link_tags(call_t *c, struct redis_list *tags, struct redis_list *medias, parser_arg arg) { unsigned int i; struct call_monologue *ml, *other_ml; @@ -1777,20 +1757,17 @@ static int json_link_tags(call_t *c, struct redis_list *tags, struct redis_list snprintf(key_subscriptions_noa, 256, "subscriptions-noa-%u", i); /* Legacy */ - if (json_reader_read_member(root_reader, key_subscriptions)) + if (redis_parser->dict_contains(arg, key_subscriptions)) rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped."); - json_reader_end_member(root_reader); - if (json_reader_read_member(root_reader, key_subscriptions_oa)) + if (redis_parser->dict_contains(arg, key_subscriptions_oa)) rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped."); - json_reader_end_member(root_reader); - if (json_reader_read_member(root_reader, key_subscriptions_noa)) + if (redis_parser->dict_contains(arg, key_subscriptions_noa)) rlog(LOG_DEBUG, "Outdated format used to restore subscriptions (older rtpengine ver.), will be dropped."); - json_reader_end_member(root_reader); /* associated tags */ - if (json_build_list(&q, c, "associated_tags", i, tags, root_reader)) + if (json_build_list(&q, c, "associated_tags", i, tags, arg)) return -1; for (l = q.head; l; l = l->next) { @@ -1801,7 +1778,7 @@ static int json_link_tags(call_t *c, struct redis_list *tags, struct redis_list } g_queue_clear(&q); - if (json_build_ptra(ml->medias, c, "medias", i, medias, root_reader)) + if (json_build_ptra(ml->medias, c, "medias", i, medias, arg)) return -1; } @@ -1827,7 +1804,7 @@ static struct media_subscription *__find_media_subscriber(struct call_media *med } static int json_link_streams(call_t *c, struct redis_list *streams, - struct redis_list *sfds, struct redis_list *medias, JsonReader *root_reader) + struct redis_list *sfds, struct redis_list *medias, parser_arg arg) { unsigned int i; struct packet_stream *ps; @@ -1842,10 +1819,10 @@ static int json_link_streams(call_t *c, struct redis_list *streams, ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); - if (json_build_list(&ps->sfds, c, "stream_sfds", i, sfds, root_reader)) + if (json_build_list(&ps->sfds, c, "stream_sfds", i, sfds, arg)) return -1; - if (json_build_list(&q, c, "rtp_sinks", i, streams, root_reader)) + if (json_build_list(&q, c, "rtp_sinks", i, streams, arg)) return -1; for (l = q.head; l; l = l->next) { struct packet_stream *sink = l->data; @@ -1866,7 +1843,7 @@ static int json_link_streams(call_t *c, struct redis_list *streams, __add_sink_handler(&ps->rtp_sinks, sink, NULL); } - if (json_build_list(&q, c, "rtcp_sinks", i, streams, root_reader)) + if (json_build_list(&q, c, "rtcp_sinks", i, streams, arg)) return -1; for (l = q.head; l; l = l->next) { struct packet_stream *sink = l->data; @@ -1893,16 +1870,16 @@ static int json_link_streams(call_t *c, struct redis_list *streams, } static int json_link_medias(call_t *c, struct redis_list *medias, - struct redis_list *streams, struct redis_list *maps, JsonReader *root_reader) + struct redis_list *streams, struct redis_list *maps, parser_arg arg) { for (unsigned int i = 0; i < medias->len; i++) { struct call_media *med = medias->ptrs[i]; if (!med || !med->monologue) continue; - if (json_build_list(&med->streams, c, "streams", i, streams, root_reader)) + if (json_build_list(&med->streams, c, "streams", i, streams, arg)) return -1; - if (json_build_list(&med->endpoint_maps, c, "maps", i, maps, root_reader)) + if (json_build_list(&med->endpoint_maps, c, "maps", i, maps, arg)) return -1; if (med->media_id.s) @@ -1910,7 +1887,7 @@ static int json_link_medias(call_t *c, struct redis_list *medias, /* find the pair media to subscribe */ if (!json_build_list_cb(NULL, c, "media-subscriptions", med->unique_id, - medias, rbl_subs_cb, med, root_reader)) + medias, rbl_subs_cb, med, arg)) { rlog(LOG_DEBUG, "Restored media subscriptions for: '" STR_FORMAT_M "'", STR_FMT_M(&med->monologue->tag)); } @@ -1949,7 +1926,7 @@ static int rbl_cb_intf_sfds(str *s, callback_arg_t qp, struct redis_list *list, } static int json_link_maps(call_t *c, struct redis_list *maps, - struct redis_list *sfds, JsonReader *root_reader) + struct redis_list *sfds, parser_arg arg) { unsigned int i; struct endpoint_map *em; @@ -1958,41 +1935,39 @@ static int json_link_maps(call_t *c, struct redis_list *maps, em = maps->ptrs[i]; if (json_build_list_cb(&em->intf_sfds, c, "map_sfds", em->unique_id, sfds, - rbl_cb_intf_sfds, em, root_reader)) + rbl_cb_intf_sfds, em, arg)) return -1; } return 0; } -static int json_build_ssrc(struct call_monologue *ml, JsonReader *root_reader) { +static void json_build_ssrc_iter(const ng_parser_t *parser, parser_arg dict, helper_arg arg) { + struct call_monologue *ml = arg.ml; + + uint32_t ssrc = parser_get_ll(dict, "ssrc"); + struct ssrc_entry_call *se = get_ssrc(ssrc, ml->ssrc_hash); + if (!se) + return; + + atomic_set_na(&se->input_ctx.stats->ext_seq, parser_get_ll(dict, "in_srtp_index")); + atomic_set_na(&se->input_ctx.stats->rtcp_seq, parser_get_ll(dict, "in_srtcp_index")); + payload_tracker_add(&se->input_ctx.tracker, parser_get_ll(dict, "in_payload_type")); + atomic_set_na(&se->output_ctx.stats->ext_seq, parser_get_ll(dict, "out_srtp_index")); + atomic_set_na(&se->output_ctx.stats->rtcp_seq, parser_get_ll(dict, "out_srtcp_index")); + payload_tracker_add(&se->output_ctx.tracker, parser_get_ll(dict, "out_payload_type")); + + obj_put(&se->h); +} + +static int json_build_ssrc(struct call_monologue *ml, parser_arg arg) { char tmp[2048]; snprintf(tmp, sizeof(tmp), "ssrc_table-%u", ml->unique_id); - if (!json_reader_read_member(root_reader, "ssrc_table")) { + parser_arg list = redis_parser->dict_get_expect(arg, tmp, BENCODE_LIST); + if (!list.gen) { // non-fatal for backwards compatibility - json_reader_end_member(root_reader); return 0; } - int nmemb = json_reader_count_elements(root_reader); - for (int jidx=0; jidx < nmemb; ++jidx) { - if (!json_reader_read_element(root_reader, jidx)) - return -1; - - uint32_t ssrc = json_reader_get_ll(root_reader, "ssrc"); - struct ssrc_entry_call *se = get_ssrc(ssrc, ml->ssrc_hash); - if (!se) - goto next; - atomic_set_na(&se->input_ctx.stats->ext_seq, json_reader_get_ll(root_reader, "in_srtp_index")); - atomic_set_na(&se->input_ctx.stats->rtcp_seq, json_reader_get_ll(root_reader, "in_srtcp_index")); - payload_tracker_add(&se->input_ctx.tracker, json_reader_get_ll(root_reader, "in_payload_type")); - atomic_set_na(&se->output_ctx.stats->ext_seq, json_reader_get_ll(root_reader, "out_srtp_index")); - atomic_set_na(&se->output_ctx.stats->rtcp_seq, json_reader_get_ll(root_reader, "out_srtcp_index")); - payload_tracker_add(&se->output_ctx.tracker, json_reader_get_ll(root_reader, "out_payload_type")); - - obj_put(&se->h); -next: - json_reader_end_element(root_reader); - } - json_reader_end_member (root_reader); + redis_parser->list_iter(redis_parser, list, NULL, json_build_ssrc_iter, ml); return 0; } @@ -2007,7 +1982,7 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign) const char *err = 0; int i; atomic64 a64; - JsonReader *root_reader =0; + JsonNode *json_root = NULL; JsonParser *parser =0; mutex_lock(&r->lock); @@ -2025,11 +2000,11 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign) err = "could not parse JSON data"; if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL)) goto err1; - root_reader = json_reader_new (json_parser_get_root (parser)); + json_root = json_parser_get_root(parser); err = "could not read JSON data"; - if (!root_reader) + if (!json_root) goto err1; - + parser_arg root = {.json = json_root}; c = call_get_or_create(callid, false); err = "failed to create call struct"; @@ -2037,7 +2012,7 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign) goto err1; err = "'call' data incomplete"; - if (json_get_hash(&call, "json", -1, root_reader)) + if (json_get_hash(&call, "json", -1, root)) goto err2; err = "missing 'last signal' timestamp"; @@ -2056,19 +2031,19 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign) } err = "'tags' incomplete"; - if (json_get_list_hash(&tags, "tag", &call, "num_tags", root_reader)) + if (json_get_list_hash(&tags, "tag", &call, "num_tags", root)) goto err3; err = "'sfds' incomplete"; - if (json_get_list_hash(&sfds, "sfd", &call, "num_sfds", root_reader)) + if (json_get_list_hash(&sfds, "sfd", &call, "num_sfds", root)) goto err4; err = "'streams' incomplete"; - if (json_get_list_hash(&streams, "stream", &call, "num_streams", root_reader)) + if (json_get_list_hash(&streams, "stream", &call, "num_streams", root)) goto err5; err = "'medias' incomplete"; - if (json_get_list_hash(&medias, "media", &call, "num_medias", root_reader)) + if (json_get_list_hash(&medias, "media", &call, "num_medias", root)) goto err6; err = "'maps' incomplete"; - if (json_get_list_hash(&maps, "map", &call, "num_maps", root_reader)) + if (json_get_list_hash(&maps, "map", &call, "num_maps", root)) goto err7; err = "missing 'created' timestamp"; @@ -2102,10 +2077,10 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign) if (redis_streams(c, &streams)) goto err8; err = "failed to create tags"; - if (redis_tags(c, &tags, root_reader)) + if (redis_tags(c, &tags, root)) goto err8; err = "failed to create medias"; - if (json_medias(c, &medias, &tags, root_reader)) + if (json_medias(c, &medias, &tags, root)) goto err8; err = "failed to create maps"; if (redis_maps(c, &maps)) @@ -2115,16 +2090,16 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign) if (redis_link_sfds(&sfds, &streams)) goto err8; err = "failed to link streams"; - if (json_link_streams(c, &streams, &sfds, &medias, root_reader)) + if (json_link_streams(c, &streams, &sfds, &medias, root)) goto err8; err = "failed to link tags"; - if (json_link_tags(c, &tags, &medias, root_reader)) + if (json_link_tags(c, &tags, &medias, root)) goto err8; err = "failed to link medias"; - if (json_link_medias(c, &medias, &streams, &maps, root_reader)) + if (json_link_medias(c, &medias, &streams, &maps, root)) goto err8; err = "failed to link maps"; - if (json_link_maps(c, &maps, &sfds, root_reader)) + if (json_link_maps(c, &maps, &sfds, root)) goto err8; // presence of this key determines whether we were recording at all @@ -2168,8 +2143,6 @@ err3: err2: rwlock_unlock_w(&c->master_lock); err1: - if (root_reader) - g_object_unref (root_reader); if (parser) g_object_unref (parser); if (rr_jsonStr) diff --git a/include/control_ng.h b/include/control_ng.h index 54a025f8f..e2ab99d15 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -112,6 +112,8 @@ typedef union { str *strs; sdp_ng_flags *flags; void (**call_fn)(call_t *); + GHashTable *ht; + struct call_monologue *ml; void *generic; } helper_arg __attribute__ ((__transparent_union__));