MT#55283 support bencode format to Redis

Change-Id: Ib66b740558f16f59890cc1ce0f1fb444dee9ad41
pull/1853/head
Richard Fuchs 8 months ago
parent 81a7d177ef
commit 7bf431a1fa

@ -516,6 +516,7 @@ static void options(int *argc, char ***argv) {
bool nftables_status = false; bool nftables_status = false;
g_autoptr(char) nftables_family = NULL; g_autoptr(char) nftables_family = NULL;
#endif #endif
g_autoptr(char) redis_format = NULL;
GOptionEntry e[] = { GOptionEntry e[] = {
{ "table", 't', 0, G_OPTION_ARG_INT, &rtpe_config.kernel_table, "Kernel table to use", "INT" }, { "table", 't', 0, G_OPTION_ARG_INT, &rtpe_config.kernel_table, "Kernel table to use", "INT" },
@ -559,6 +560,7 @@ static void options(int *argc, char ***argv) {
{ "redis-disable-time", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_disable_time, "Number of seconds redis communication is disabled because of errors", "INT" }, { "redis-disable-time", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_disable_time, "Number of seconds redis communication is disabled because of errors", "INT" },
{ "redis-cmd-timeout", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_cmd_timeout, "Sets a timeout in milliseconds for redis commands", "INT" }, { "redis-cmd-timeout", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_cmd_timeout, "Sets a timeout in milliseconds for redis commands", "INT" },
{ "redis-connect-timeout", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_connect_timeout, "Sets a timeout in milliseconds for redis connections", "INT" }, { "redis-connect-timeout", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_connect_timeout, "Sets a timeout in milliseconds for redis connections", "INT" },
{ "redis-format", 0, 0, G_OPTION_ARG_STRING, &redis_format, "Format for persistent storage in Redis/KeyDB", "native|bencode|JSON" },
#if 0 #if 0
// temporarily disabled, see discussion on https://github.com/sipwise/rtpengine/commit/2ebf5a1526c1ce8093b3011a1e23c333b3f99400 // temporarily disabled, see discussion on https://github.com/sipwise/rtpengine/commit/2ebf5a1526c1ce8093b3011a1e23c333b3f99400
// related to Change-Id: I83d9b9a844f4f494ad37b44f5d1312f272beff3f // related to Change-Id: I83d9b9a844f4f494ad37b44f5d1312f272beff3f
@ -787,6 +789,17 @@ static void options(int *argc, char ***argv) {
} }
} }
if (redis_format) {
if (!strcasecmp(redis_format, "native"))
rtpe_config.redis_format = 0;
else if (!strcasecmp(redis_format, "bencode"))
rtpe_config.redis_format = REDIS_FORMAT_BENCODE;
else if (!strcasecmp(redis_format, "JSON"))
rtpe_config.redis_format = REDIS_FORMAT_JSON;
else
die("Invalid --redis-format value given");
}
parse_listen_list(&rtpe_config.tcp_listen_ep, listenps, "listen-tcp"); parse_listen_list(&rtpe_config.tcp_listen_ep, listenps, "listen-tcp");
parse_listen_list(&rtpe_config.udp_listen_ep, listenudps, "listen-udp"); parse_listen_list(&rtpe_config.udp_listen_ep, listenudps, "listen-udp");
parse_listen_list(&rtpe_config.ng_listen_ep, listenngs, "listen-ng"); parse_listen_list(&rtpe_config.ng_listen_ep, listenngs, "listen-ng");

@ -50,7 +50,11 @@ struct redis *rtpe_redis_write_disabled;
struct redis *rtpe_redis_notify; struct redis *rtpe_redis_notify;
static const ng_parser_t *redis_parser = &ng_parser_json; static __thread const ng_parser_t *redis_parser = &ng_parser_json;
static const ng_parser_t *const redis_format_parsers[__REDIS_FORMAT_MAX] = {
&ng_parser_native,
&ng_parser_json,
};
INLINE redisReply *redis_expect(int type, redisReply *r) { INLINE redisReply *redis_expect(int type, redisReply *r) {
@ -1983,7 +1987,9 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
int i; int i;
atomic64 a64; atomic64 a64;
JsonNode *json_root = NULL; JsonNode *json_root = NULL;
JsonParser *parser =0; JsonParser *parser = NULL;
bencode_item_t *benc_root = NULL;
bencode_buffer_t buf = {0};
mutex_lock(&r->lock); mutex_lock(&r->lock);
rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET " PB, PBSTR(callid)); rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET " PB, PBSTR(callid));
@ -1996,16 +2002,34 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
if (!rr_jsonStr) if (!rr_jsonStr)
goto err1; goto err1;
parser = json_parser_new();
err = "could not parse JSON data";
if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL))
goto err1;
json_root = json_parser_get_root(parser);
err = "could not read JSON data";
if (!json_root)
goto err1;
parser_arg root = {.json = json_root}; parser_arg root = {.json = json_root};
if (rr_jsonStr->str[0] == '{') {
parser = json_parser_new();
err = "could not parse JSON data";
if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL))
goto err1;
json_root = json_parser_get_root(parser);
err = "could not read JSON data";
if (!json_root)
goto err1;
root.json = json_root;
redis_parser = &ng_parser_json;
}
else if (rr_jsonStr->str[0] == 'd') {
int ret = bencode_buffer_init(&buf);
err = "failed to initialise bencode buffer";
if (ret)
goto err1;
err = "failed to decode bencode dictionary";
benc_root = bencode_decode_expect_str(&buf, &STR_LEN(rr_jsonStr->str, rr_jsonStr->len),
BENCODE_DICTIONARY);
if (!benc_root)
goto err1;
redis_parser = &ng_parser_native;
root.benc = benc_root;
}
c = call_get_or_create(callid, false); c = call_get_or_create(callid, false);
err = "failed to create call struct"; err = "failed to create call struct";
if (!c) if (!c)
@ -2147,6 +2171,7 @@ err1:
g_object_unref (parser); g_object_unref (parser);
if (rr_jsonStr) if (rr_jsonStr)
freeReplyObject(rr_jsonStr); freeReplyObject(rr_jsonStr);
bencode_buffer_free(&buf);
if (err) { if (err) {
mutex_lock(&r->lock); mutex_lock(&r->lock);
if (r->ctx && r->ctx->err) if (r->ctx && r->ctx->err)
@ -2291,37 +2316,37 @@ err:
int len = snprintf(tmp,sizeof(tmp), f, __VA_ARGS__); \ int len = snprintf(tmp,sizeof(tmp), f, __VA_ARGS__); \
char enc[len * 3 + 1]; \ char enc[len * 3 + 1]; \
str_uri_encode_len(enc, tmp, len); \ str_uri_encode_len(enc, tmp, len); \
redis_parser->list_add_str_dup(inner, &STR_NC(enc)); \ parser->list_add_str_dup(inner, &STR_NC(enc)); \
} while (0) } while (0)
#define JSON_SET_NSTRING(a,b,c,...) do { \ #define JSON_SET_NSTRING(a,b,c,...) do { \
int len = snprintf(tmp,sizeof(tmp), c, __VA_ARGS__); \ int len = snprintf(tmp,sizeof(tmp), c, __VA_ARGS__); \
char enc[len * 3 + 1]; \ char enc[len * 3 + 1]; \
str_uri_encode_len(enc, tmp, len); \ str_uri_encode_len(enc, tmp, len); \
snprintf(tmp,sizeof(tmp), a,b); \ snprintf(tmp,sizeof(tmp), a,b); \
redis_parser->dict_add_str_dup(inner, tmp, &STR_NC(enc)); \ parser->dict_add_str_dup(inner, tmp, &STR_NC(enc)); \
} while (0) } while (0)
#define JSON_SET_NSTRING_CSTR(a,b,d) JSON_SET_NSTRING_LEN(a, b, strlen(d), d) #define JSON_SET_NSTRING_CSTR(a,b,d) JSON_SET_NSTRING_LEN(a, b, strlen(d), d)
#define JSON_SET_NSTRING_LEN(a,b,l,d) do { \ #define JSON_SET_NSTRING_LEN(a,b,l,d) do { \
char enc[l * 3 + 1]; \ char enc[l * 3 + 1]; \
str_uri_encode_len(enc, d, l); \ str_uri_encode_len(enc, d, l); \
snprintf(tmp,sizeof(tmp), a,b); \ snprintf(tmp,sizeof(tmp), a,b); \
redis_parser->dict_add_str_dup(inner, tmp, &STR_NC(enc)); \ parser->dict_add_str_dup(inner, tmp, &STR_NC(enc)); \
} while (0) } while (0)
#define JSON_SET_SIMPLE(a,c,...) do { \ #define JSON_SET_SIMPLE(a,c,...) do { \
int len = snprintf(tmp,sizeof(tmp), c, __VA_ARGS__); \ int len = snprintf(tmp,sizeof(tmp), c, __VA_ARGS__); \
char enc[len * 3 + 1]; \ char enc[len * 3 + 1]; \
str_uri_encode_len(enc, tmp, len); \ str_uri_encode_len(enc, tmp, len); \
redis_parser->dict_add_str_dup(inner, a, &STR_NC(enc)); \ parser->dict_add_str_dup(inner, a, &STR_NC(enc)); \
} while (0) } while (0)
#define JSON_SET_SIMPLE_LEN(a,l,d) do { \ #define JSON_SET_SIMPLE_LEN(a,l,d) do { \
char enc[l * 3 + 1]; \ char enc[l * 3 + 1]; \
str_uri_encode_len(enc, d, l); \ str_uri_encode_len(enc, d, l); \
redis_parser->dict_add_str_dup(inner, a, &STR_NC(enc)); \ parser->dict_add_str_dup(inner, a, &STR_NC(enc)); \
} while (0) } while (0)
#define JSON_SET_SIMPLE_CSTR(a,d) redis_parser->dict_add_str_dup(inner, a, &STR(d)) #define JSON_SET_SIMPLE_CSTR(a,d) parser->dict_add_str_dup(inner, a, &STR(d))
#define JSON_SET_SIMPLE_STR(a,d) redis_parser->dict_add_str_dup(inner, a, d) #define JSON_SET_SIMPLE_STR(a,d) parser->dict_add_str_dup(inner, a, d)
static void json_update_crypto_params(parser_arg inner, const char *key, struct crypto_params *p) { static void json_update_crypto_params(const ng_parser_t *parser, parser_arg inner, const char *key, struct crypto_params *p) {
char tmp[2048]; char tmp[2048];
if (!p->crypto_suite) if (!p->crypto_suite)
@ -2339,7 +2364,7 @@ static void json_update_crypto_params(parser_arg inner, const char *key, struct
JSON_SET_NSTRING_LEN("%s-mki", key, p->mki_len, (char *) p->mki); JSON_SET_NSTRING_LEN("%s-mki", key, p->mki_len, (char *) p->mki);
} }
static int json_update_sdes_params(parser_arg inner, const char *pref, static int json_update_sdes_params(const ng_parser_t *parser, parser_arg inner, const char *pref,
unsigned int unique_id, unsigned int unique_id,
const char *k, sdes_q *q) const char *k, sdes_q *q)
{ {
@ -2356,7 +2381,7 @@ static int json_update_sdes_params(parser_arg inner, const char *pref,
return -1; return -1;
JSON_SET_NSTRING("%s_tag", key, "%u", cps->tag); JSON_SET_NSTRING("%s_tag", key, "%u", cps->tag);
json_update_crypto_params(inner, key, p); json_update_crypto_params(parser, inner, key, p);
snprintf(keybuf, sizeof(keybuf), "%s-%u", k, iter++); snprintf(keybuf, sizeof(keybuf), "%s-%u", k, iter++);
key = keybuf; key = keybuf;
@ -2365,7 +2390,7 @@ static int json_update_sdes_params(parser_arg inner, const char *pref,
return 0; return 0;
} }
static void json_update_dtls_fingerprint(parser_arg inner, const char *pref, static void json_update_dtls_fingerprint(const ng_parser_t *parser, parser_arg inner, const char *pref,
unsigned int unique_id, unsigned int unique_id,
const struct dtls_fingerprint *f) const struct dtls_fingerprint *f)
{ {
@ -2383,11 +2408,12 @@ static void json_update_dtls_fingerprint(parser_arg inner, const char *pref,
static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) { static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
char tmp[2048]; char tmp[2048];
const ng_parser_t *parser = ctx->parser;
parser_arg root = redis_parser->dict(ctx); parser_arg root = parser->dict(ctx);
{ {
parser_arg inner = redis_parser->dict_add_dict(root, "json"); parser_arg inner = parser->dict_add_dict(root, "json");
{ {
JSON_SET_SIMPLE("created","%lli", timeval_us(&c->created)); JSON_SET_SIMPLE("created","%lli", timeval_us(&c->created));
@ -2424,7 +2450,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
stream_fd *sfd = l->data; stream_fd *sfd = l->data;
snprintf(tmp, sizeof(tmp), "sfd-%u", sfd->unique_id); snprintf(tmp, sizeof(tmp), "sfd-%u", sfd->unique_id);
inner = redis_parser->dict_add_dict_dup(root, tmp); inner = parser->dict_add_dict_dup(root, tmp);
{ {
JSON_SET_SIMPLE_CSTR("pref_family", sfd->local_intf->logical->preferred_family->rfc_name); JSON_SET_SIMPLE_CSTR("pref_family", sfd->local_intf->logical->preferred_family->rfc_name);
@ -2434,7 +2460,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
JSON_SET_SIMPLE("local_intf_uid","%u", sfd->local_intf->unique_id); JSON_SET_SIMPLE("local_intf_uid","%u", sfd->local_intf->unique_id);
JSON_SET_SIMPLE("stream","%u", sfd->stream->unique_id); JSON_SET_SIMPLE("stream","%u", sfd->stream->unique_id);
json_update_crypto_params(inner, "", &sfd->crypto.params); json_update_crypto_params(parser, inner, "", &sfd->crypto.params);
} }
} // --- for } // --- for
@ -2446,7 +2472,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
LOCK(&ps->out_lock); LOCK(&ps->out_lock);
snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id); snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id);
inner = redis_parser->dict_add_dict_dup(root, tmp); inner = parser->dict_add_dict_dup(root, tmp);
{ {
JSON_SET_SIMPLE("media","%u",ps->media->unique_id); JSON_SET_SIMPLE("media","%u",ps->media->unique_id);
@ -2461,7 +2487,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
JSON_SET_SIMPLE("stats-bytes","%" PRIu64, atomic64_get_na(&ps->stats_in->bytes)); JSON_SET_SIMPLE("stats-bytes","%" PRIu64, atomic64_get_na(&ps->stats_in->bytes));
JSON_SET_SIMPLE("stats-errors","%" PRIu64, atomic64_get_na(&ps->stats_in->errors)); JSON_SET_SIMPLE("stats-errors","%" PRIu64, atomic64_get_na(&ps->stats_in->errors));
json_update_crypto_params(inner, "", &ps->crypto.params); json_update_crypto_params(parser, inner, "", &ps->crypto.params);
} }
// stream_sfds was here before // stream_sfds was here before
@ -2477,14 +2503,14 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
LOCK(&ps->out_lock); LOCK(&ps->out_lock);
snprintf(tmp, sizeof(tmp), "stream_sfds-%u", ps->unique_id); snprintf(tmp, sizeof(tmp), "stream_sfds-%u", ps->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (__auto_type k = ps->sfds.head; k; k = k->next) { for (__auto_type k = ps->sfds.head; k; k = k->next) {
stream_fd *sfd = k->data; stream_fd *sfd = k->data;
JSON_ADD_LIST_STRING("%u", sfd->unique_id); JSON_ADD_LIST_STRING("%u", sfd->unique_id);
} }
snprintf(tmp, sizeof(tmp), "rtp_sinks-%u", ps->unique_id); snprintf(tmp, sizeof(tmp), "rtp_sinks-%u", ps->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (__auto_type k = ps->rtp_sinks.head; k; k = k->next) { for (__auto_type k = ps->rtp_sinks.head; k; k = k->next) {
struct sink_handler *sh = k->data; struct sink_handler *sh = k->data;
struct packet_stream *sink = sh->sink; struct packet_stream *sink = sh->sink;
@ -2492,7 +2518,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
} }
snprintf(tmp, sizeof(tmp), "rtcp_sinks-%u", ps->unique_id); snprintf(tmp, sizeof(tmp), "rtcp_sinks-%u", ps->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (__auto_type k = ps->rtcp_sinks.head; k; k = k->next) { for (__auto_type k = ps->rtcp_sinks.head; k; k = k->next) {
struct sink_handler *sh = k->data; struct sink_handler *sh = k->data;
struct packet_stream *sink = sh->sink; struct packet_stream *sink = sh->sink;
@ -2505,7 +2531,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
struct call_monologue *ml = l->data; struct call_monologue *ml = l->data;
snprintf(tmp, sizeof(tmp), "tag-%u", ml->unique_id); snprintf(tmp, sizeof(tmp), "tag-%u", ml->unique_id);
inner = redis_parser->dict_add_dict_dup(root, tmp); inner = parser->dict_add_dict_dup(root, tmp);
{ {
@ -2566,7 +2592,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
// XXX these should all go into the above loop // XXX these should all go into the above loop
GList *k = g_hash_table_get_values(ml->associated_tags); GList *k = g_hash_table_get_values(ml->associated_tags);
snprintf(tmp, sizeof(tmp), "associated_tags-%u", ml->unique_id); snprintf(tmp, sizeof(tmp), "associated_tags-%u", ml->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (GList *m = k; m; m = m->next) { for (GList *m = k; m; m = m->next) {
struct call_monologue *ml2 = m->data; struct call_monologue *ml2 = m->data;
JSON_ADD_LIST_STRING("%u", ml2->unique_id); JSON_ADD_LIST_STRING("%u", ml2->unique_id);
@ -2575,7 +2601,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
g_list_free(k); g_list_free(k);
snprintf(tmp, sizeof(tmp), "medias-%u", ml->unique_id); snprintf(tmp, sizeof(tmp), "medias-%u", ml->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (unsigned int j = 0; j < ml->medias->len; j++) { for (unsigned int j = 0; j < ml->medias->len; j++) {
struct call_media *media = ml->medias->pdata[j]; struct call_media *media = ml->medias->pdata[j];
JSON_ADD_LIST_STRING("%u", media ? media->unique_id : -1); JSON_ADD_LIST_STRING("%u", media ? media->unique_id : -1);
@ -2585,10 +2611,10 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
rwlock_lock_r(&ml->ssrc_hash->lock); rwlock_lock_r(&ml->ssrc_hash->lock);
k = g_hash_table_get_values(ml->ssrc_hash->ht); k = g_hash_table_get_values(ml->ssrc_hash->ht);
snprintf(tmp, sizeof(tmp), "ssrc_table-%u", ml->unique_id); snprintf(tmp, sizeof(tmp), "ssrc_table-%u", ml->unique_id);
parser_arg list = redis_parser->dict_add_list_dup(root, tmp); parser_arg list = parser->dict_add_list_dup(root, tmp);
for (GList *m = k; m; m = m->next) { for (GList *m = k; m; m = m->next) {
struct ssrc_entry_call *se = m->data; struct ssrc_entry_call *se = m->data;
inner = redis_parser->list_add_dict(list); inner = parser->list_add_dict(list);
JSON_SET_SIMPLE("ssrc", "%" PRIu32, se->h.ssrc); JSON_SET_SIMPLE("ssrc", "%" PRIu32, se->h.ssrc);
// XXX use function for in/out // XXX use function for in/out
@ -2613,7 +2639,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
/* store media subscriptions */ /* store media subscriptions */
snprintf(tmp, sizeof(tmp), "media-subscriptions-%u", media->unique_id); snprintf(tmp, sizeof(tmp), "media-subscriptions-%u", media->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (__auto_type sub = media->media_subscriptions.head; sub; sub = sub->next) for (__auto_type sub = media->media_subscriptions.head; sub; sub = sub->next)
{ {
@ -2626,7 +2652,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
} }
snprintf(tmp, sizeof(tmp), "media-%u", media->unique_id); snprintf(tmp, sizeof(tmp), "media-%u", media->unique_id);
inner = redis_parser->dict_add_dict_dup(root, tmp); inner = parser->dict_add_dict_dup(root, tmp);
{ {
JSON_SET_SIMPLE("tag","%u", media->monologue->unique_id); JSON_SET_SIMPLE("tag","%u", media->monologue->unique_id);
@ -2650,11 +2676,11 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
if (media->bandwidth_rs >= 0) if (media->bandwidth_rs >= 0)
JSON_SET_SIMPLE("bandwidth_rs","%i", media->bandwidth_rs); JSON_SET_SIMPLE("bandwidth_rs","%i", media->bandwidth_rs);
json_update_sdes_params(inner, "media", media->unique_id, "sdes_in", json_update_sdes_params(parser, inner, "media", media->unique_id, "sdes_in",
&media->sdes_in); &media->sdes_in);
json_update_sdes_params(inner, "media", media->unique_id, "sdes_out", json_update_sdes_params(parser, inner, "media", media->unique_id, "sdes_out",
&media->sdes_out); &media->sdes_out);
json_update_dtls_fingerprint(inner, "media", media->unique_id, &media->fingerprint); json_update_dtls_fingerprint(parser, inner, "media", media->unique_id, &media->fingerprint);
} }
} // --- for medias.head } // --- for medias.head
@ -2664,21 +2690,21 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
struct call_media *media = l->data; struct call_media *media = l->data;
snprintf(tmp, sizeof(tmp), "streams-%u", media->unique_id); snprintf(tmp, sizeof(tmp), "streams-%u", media->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (__auto_type m = media->streams.head; m; m = m->next) { for (__auto_type m = media->streams.head; m; m = m->next) {
struct packet_stream *ps = m->data; struct packet_stream *ps = m->data;
JSON_ADD_LIST_STRING("%u", ps->unique_id); JSON_ADD_LIST_STRING("%u", ps->unique_id);
} }
snprintf(tmp, sizeof(tmp), "maps-%u", media->unique_id); snprintf(tmp, sizeof(tmp), "maps-%u", media->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (__auto_type m = media->endpoint_maps.head; m; m = m->next) { for (__auto_type m = media->endpoint_maps.head; m; m = m->next) {
struct endpoint_map *ep = m->data; struct endpoint_map *ep = m->data;
JSON_ADD_LIST_STRING("%u", ep->unique_id); JSON_ADD_LIST_STRING("%u", ep->unique_id);
} }
snprintf(tmp, sizeof(tmp), "payload_types-%u", media->unique_id); snprintf(tmp, sizeof(tmp), "payload_types-%u", media->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (__auto_type m = media->codecs.codec_prefs.head; m; m = m->next) { for (__auto_type m = media->codecs.codec_prefs.head; m; m = m->next) {
rtp_payload_type *pt = m->data; rtp_payload_type *pt = m->data;
JSON_ADD_LIST_STRING("%u/" STR_FORMAT "/%u/" STR_FORMAT "/" STR_FORMAT "/%i/%i", JSON_ADD_LIST_STRING("%u/" STR_FORMAT "/%u/" STR_FORMAT "/" STR_FORMAT "/%i/%i",
@ -2692,7 +2718,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
struct endpoint_map *ep = l->data; struct endpoint_map *ep = l->data;
snprintf(tmp, sizeof(tmp), "map-%u", ep->unique_id); snprintf(tmp, sizeof(tmp), "map-%u", ep->unique_id);
inner = redis_parser->dict_add_dict_dup(root, tmp); inner = parser->dict_add_dict_dup(root, tmp);
{ {
JSON_SET_SIMPLE("wildcard","%i", ep->wildcard); JSON_SET_SIMPLE("wildcard","%i", ep->wildcard);
@ -2710,7 +2736,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
struct endpoint_map *ep = l->data; struct endpoint_map *ep = l->data;
snprintf(tmp, sizeof(tmp), "map_sfds-%u", ep->unique_id); snprintf(tmp, sizeof(tmp), "map_sfds-%u", ep->unique_id);
inner = redis_parser->dict_add_list_dup(root, tmp); inner = parser->dict_add_list_dup(root, tmp);
for (__auto_type m = ep->intf_sfds.head; m; m = m->next) { for (__auto_type m = ep->intf_sfds.head; m; m = m->next) {
struct sfd_intf_list *il = m->data; struct sfd_intf_list *il = m->data;
JSON_ADD_LIST_STRING("loc-%u", il->local_intf->unique_id); JSON_ADD_LIST_STRING("loc-%u", il->local_intf->unique_id);
@ -2724,7 +2750,7 @@ static str redis_encode_json(ng_parser_ctx_t *ctx, call_t *c) {
} }
str ret; str ret;
redis_parser->collapse(ctx, root, &ret); parser->collapse(ctx, root, &ret);
return ret; return ret;
} }
@ -2754,8 +2780,11 @@ void redis_update_onekey(call_t *c, struct redis *r) {
goto err; goto err;
} }
ng_parser_ctx_t ctx = {.parser = redis_parser}; ng_parser_ctx_t ctx = {.parser = redis_format_parsers[rtpe_config.redis_format]};
ctx.ngbuf = ng_buffer_new(NULL); ctx.ngbuf = ng_buffer_new(NULL); // XXX make conditional
int ret = bencode_buffer_init(&ctx.ngbuf->buffer); // XXX make conditional and/or optimise
if (ret)
goto err;
str result = redis_encode_json(&ctx, c); str result = redis_encode_json(&ctx, c);
if (!result.len) if (!result.len)

@ -599,6 +599,16 @@ call to inject-DTMF won't be sent to __\-\-dtmf-log-dest=__ or __\-\-listen-tcp-
The default value for the connection timeout is 1000ms. The default value for the connection timeout is 1000ms.
This parameter can also be set or listed via __rtpengine-ctl__. This parameter can also be set or listed via __rtpengine-ctl__.
- __\-\-redis-format=bencode__\|__JSON__
Selects the format for serialised call data written to Redis or KeyDB. The
old default (and previously only option) was as a JSON object. The new
default is using *bencode* formatting. Using *bencode* has the benefit of
yielding better performance and lower CPU usage, while making the data less
human readable.
Both formats can be restored from, regardless of this setting.
- __-b__, __\-\-b2b-url=__*STRING* - __-b__, __\-\-b2b-url=__*STRING*
Enables and sets the URI for an XMLRPC callback to be made when a call is Enables and sets the URI for an XMLRPC callback to be made when a call is

@ -90,6 +90,12 @@ struct rtpengine_config {
int redis_delete_async_interval; int redis_delete_async_interval;
char *redis_auth; char *redis_auth;
char *redis_write_auth; char *redis_write_auth;
enum {
REDIS_FORMAT_BENCODE = 0,
REDIS_FORMAT_JSON,
__REDIS_FORMAT_MAX
} redis_format;
gboolean active_switchover; gboolean active_switchover;
int num_threads; int num_threads;
int media_num_threads; int media_num_threads;

@ -100,7 +100,7 @@ include ../lib/common.Makefile
.PHONY: all-tests unit-tests daemon-tests daemon-tests \ .PHONY: all-tests unit-tests daemon-tests daemon-tests \
daemon-tests-main daemon-tests-jb daemon-tests-dtx daemon-tests-dtx-cn daemon-tests-pubsub \ daemon-tests-main daemon-tests-jb daemon-tests-dtx daemon-tests-dtx-cn daemon-tests-pubsub \
daemon-tests-intfs daemon-tests-stats daemon-tests-delay-buffer daemon-tests-delay-timing \ daemon-tests-intfs daemon-tests-stats daemon-tests-delay-buffer daemon-tests-delay-timing \
daemon-tests-evs daemon-tests-player-cache daemon-tests-redis daemon-tests-evs daemon-tests-player-cache daemon-tests-redis daemon-tests-redis-json
TESTS= test-bitstr aes-crypt aead-aes-crypt test-const_str_hash.strhash TESTS= test-bitstr aes-crypt aead-aes-crypt test-const_str_hash.strhash
ifeq ($(with_transcoding),yes) ifeq ($(with_transcoding),yes)
@ -138,7 +138,7 @@ daemon-tests: daemon-tests-main daemon-tests-jb daemon-tests-pubsub daemon-tests
daemon-tests-evs daemon-tests-async-tc \ daemon-tests-evs daemon-tests-async-tc \
daemon-tests-audio-player daemon-tests-audio-player-play-media \ daemon-tests-audio-player daemon-tests-audio-player-play-media \
daemon-tests-intfs daemon-tests-stats daemon-tests-player-cache daemon-tests-redis \ daemon-tests-intfs daemon-tests-stats daemon-tests-player-cache daemon-tests-redis \
daemon-tests-rtpp-flags daemon-tests-rtpp-flags daemon-tests-redis-json
daemon-test-deps: tests-preload.so daemon-test-deps: tests-preload.so
$(MAKE) -C ../daemon $(MAKE) -C ../daemon
@ -182,6 +182,9 @@ daemon-tests-player-cache: daemon-test-deps
daemon-tests-redis: daemon-test-deps daemon-tests-redis: daemon-test-deps
./auto-test-helper "$@" perl -I../perl auto-daemon-tests-redis.pl ./auto-test-helper "$@" perl -I../perl auto-daemon-tests-redis.pl
daemon-tests-redis-json: daemon-test-deps
./auto-test-helper "$@" perl -I../perl auto-daemon-tests-redis-json.pl
daemon-tests-audio-player: daemon-test-deps daemon-tests-audio-player: daemon-test-deps
./auto-test-helper "$@" perl -I../perl auto-daemon-tests-audio-player.pl ./auto-test-helper "$@" perl -I../perl auto-daemon-tests-audio-player.pl

File diff suppressed because it is too large Load Diff

@ -8,7 +8,7 @@ use NGCP::Rtpengine::AutoTest;
use Test::More; use Test::More;
use Test2::Tools::Compare qw(like); use Test2::Tools::Compare qw(like);
use Socket qw(AF_INET SOCK_STREAM sockaddr_in pack_sockaddr_in inet_aton); use Socket qw(AF_INET SOCK_STREAM sockaddr_in pack_sockaddr_in inet_aton);
use JSON; use Bencode;
use Data::Dumper; use Data::Dumper;
$Data::Dumper::Sortkeys = 1; $Data::Dumper::Sortkeys = 1;
@ -74,7 +74,7 @@ $NGCP::Rtpengine::req_cb = sub {
alarm(1); alarm(1);
recv($redis_fd, $buf, $len, 0) or die; recv($redis_fd, $buf, $len, 0) or die;
alarm(0); alarm(0);
my $json = decode_json($buf); my $json = Bencode::bdecode($buf, 1);
#print Dumper($json); #print Dumper($json);
like($json, $json_exp, "JSON"); like($json, $json_exp, "JSON");
redis_io("\r\n*3\r\n\$6\r\nEXPIRE\r\n\$" . length(cid()) . "\r\n" . cid() . "\r\n\$5\r\n86400\r\n", redis_io("\r\n*3\r\n\$6\r\nEXPIRE\r\n\$" . length(cid()) . "\r\n" . cid() . "\r\n\$5\r\n86400\r\n",

@ -21,16 +21,44 @@ def conv(e):
return e return e
def benc_enc(msg):
return fastbencode.bencode(conv(msg))
def json_enc(msg):
return bytes(json.dumps(msg), "ASCII")
addr = "127.0.0.1" addr = "127.0.0.1"
port = 2223 port = 2223
fmt = "bencode" fmt = "bencode"
iters = 200000 iters = 200
cmd = "statistics" requests = 5000
cmd = "offer"
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if cmd == "answer": if cmd == "offer":
msg = {
"command": "offer",
"from-tag": "bar",
"to-tag": "meh",
"sdp": """
v=0
o=- 1695296331 1695296331 IN IP4 192.168.1.202
s=-
t=0 0
c=IN IP4 192.168.1.202
m=audio 45825 RTP/AVP 0 8 101
a=rtpmap:0 PCMU/8000/1
a=rtpmap:8 PCMA/8000/1
a=rtpmap:101 telephone-event/8000
a=sendrecv
""",
"replace": ["origin"],
}
elif cmd == "answer":
msg = { msg = {
"command": "answer", "command": "answer",
"call-id": "foo", "call-id": "foo",
@ -66,16 +94,42 @@ if cmd == "answer":
elif cmd == "statistics": elif cmd == "statistics":
msg = {"command": "statistics"} msg = {"command": "statistics"}
enc = None
if fmt == "json": if fmt == "json":
enc = bytes(json.dumps(msg), "ASCII") fn = json_enc
elif fmt == "bencode": elif fmt == "bencode":
enc = fastbencode.bencode(conv(msg)) fn = benc_enc
else: else:
raise raise
if "call-id" in msg:
enc = fn(msg)
for _ in range(iters): for _ in range(iters):
packet = base64.b64encode(random.randbytes(6)) + b" " + enc call_ids = []
sock.sendto(packet, (addr, port))
sock.recvfrom(4096) for _ in range(requests):
if enc:
packet = base64.b64encode(random.randbytes(6)) + b" " + enc
else:
call_id = str(base64.b64encode(random.randbytes(6)))
call_ids.append(call_id)
packet = (
base64.b64encode(random.randbytes(6))
+ b" "
+ fn({**msg, "call-id": call_id})
)
sock.sendto(packet, (addr, port))
sock.recvfrom(4096)
for call_id in call_ids:
packet = (
base64.b64encode(random.randbytes(6))
+ b" "
+ fn({"command": "delete", "call-id": call_id, "delete-delay": 0})
)
sock.sendto(packet, (addr, port))
sock.recvfrom(4096)
print("done") print("done")

Loading…
Cancel
Save