diff --git a/README.md b/README.md index bbed85197..73d7c7c02 100644 --- a/README.md +++ b/README.md @@ -175,7 +175,8 @@ option and which are reproduced below: -w, --redis-write=[PW@]IP:PORT/INT Connect to Redis write database -k, --subscribe-keyspace Subscription keyspace list --redis-num-threads=INT Number of Redis restore threads - --redis-expires=INT Expire time in seconds for redis keys + --redis-expires=INT Expire time in seconds for redis keys + --redis-multikey=INT Use multiple redis keys for storing the call (old behaviour) DEPRECATED -q, --no-redis-required Start even if can't connect to redis databases -b, --b2b-url=STRING XMLRPC URL of B2B UA -L, --log-level=INT Mask log priorities above this level @@ -426,6 +427,10 @@ The options are described in more detail below. Expire time in seconds for redis keys. Default is 86400. +* --redis-multikey + + Use multiple redis keys for storing the call (old behaviour) DEPRECATED + * -q, --no-redis-required When this paramter is present or NO_REDIS_REQUIRED='yes' or '1' in config file, rtpengine starts even if there is no initial connection to redis databases(either to -r or to -w or to both redis). diff --git a/daemon/Makefile b/daemon/Makefile index 7eca49501..9843099a7 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -8,6 +8,7 @@ CFLAGS+= `pkg-config --cflags zlib` CFLAGS+= `pkg-config --cflags openssl` CFLAGS+= `pkg-config --cflags libevent_pthreads` CFLAGS+= `pcre-config --cflags` +CFLAGS+= `pkg-config --cflags json-glib-1.0` CFLAGS+= -I. -I../kernel-module/ -I../lib/ CFLAGS+= -D_GNU_SOURCE @@ -30,6 +31,7 @@ LDFLAGS+= -lpcap LDFLAGS+= `pcre-config --libs` LDFLAGS+= `xmlrpc-c-config client --libs` LDFLAGS+= -lhiredis +LDFLAGS+= `pkg-config --libs json-glib-1.0` include ../lib/lib.Makefile diff --git a/daemon/call.c b/daemon/call.c index c3b0a10d9..fc08a4390 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -594,8 +594,13 @@ static void callmaster_timer(void *ptr) { rwlock_unlock_r(&sfd->call->master_lock); - if (update) - redis_update(ps->call, m->conf.redis_write); + if (update) { + if (m->conf.redis_multikey) { + redis_update(ps->call, m->conf.redis_write); + } else { + redis_update_onekey(ps->call, m->conf.redis_write); + } + } next: g_hash_table_remove(hlp.addr_sfd, &ep); diff --git a/daemon/call.h b/daemon/call.h index d8f19c7aa..2dc84b3b9 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -5,7 +5,8 @@ /* XXX split everything into call_signalling.[ch] and call_packets.[ch] or w/e */ - +#include +#include #include #include @@ -443,6 +444,7 @@ struct call { unsigned int foreign_call; // created_via_redis_notify call struct recording *recording; + JsonReader *root_reader; }; struct callmaster_config { @@ -460,7 +462,8 @@ struct callmaster_config { struct event_base *redis_notify_event_base; GQueue *redis_subscribed_keyspaces; struct redisAsyncContext *redis_notify_async_context; - unsigned int redis_expires_secs; + unsigned int redis_expires_secs; + unsigned int redis_multikey; char *b2b_url; unsigned char default_tos; enum xmlrpc_format fmt; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index e08e708f4..8bd2fc992 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -186,7 +186,11 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); - redis_update(c, m->conf.redis_write); + if (m->conf.redis_multikey) { + redis_update(c, m->conf.redis_write); + } else { + redis_update_onekey(c, m->conf.redis_write); + } gettimeofday(&(monologue->started), NULL); @@ -334,7 +338,11 @@ out2: rwlock_unlock_w(&c->master_lock); streams_free(&s); - redis_update(c, m->conf.redis_write); + if (m->conf.redis_multikey) { + redis_update(c, m->conf.redis_write); + } else { + redis_update_onekey(c, m->conf.redis_write); + } ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); obj_put(c); @@ -764,10 +772,16 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster } rwlock_unlock_w(&call->master_lock); - if (!flags.no_redis_update) - redis_update(call, m->conf.redis_write); - else + + if (!flags.no_redis_update) { + if (m->conf.redis_multikey) { + redis_update(call, m->conf.redis_write); + } else { + redis_update_onekey(call,m->conf.redis_write); + } + } else { ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag"); + } obj_put(call); gettimeofday(&(monologue->started), NULL); diff --git a/daemon/main.c b/daemon/main.c index 32698170d..586310f43 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -62,6 +62,7 @@ static unsigned int timeout; static unsigned int silent_timeout; static unsigned int final_timeout; static unsigned int redis_expires = 86400; +static unsigned int redis_multikey = 0; static int port_min = 30000; static int port_max = 40000; static int max_sessions = -1; @@ -298,6 +299,7 @@ static void options(int *argc, char ***argv) { { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" }, { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" }, { "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" }, + { "redis-multikey", 0, 0, G_OPTION_ARG_NONE, &redis_multikey, "Use multiple redis keys for storing the call (old behaviour) DEPRECATED", "INT" }, { "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL }, { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"}, @@ -502,6 +504,11 @@ static void init_everything() { #if !GLIB_CHECK_VERSION(2,32,0) g_thread_init(NULL); #endif + +#if !(GLIB_CHECK_VERSION(2,36,0)) + g_type_init(); +#endif + if (!_log_stderr) openlog("rtpengine", LOG_PID | LOG_NDELAY, _log_facility); signals(); @@ -621,6 +628,7 @@ no_kernel: } mc.redis_expires_secs = redis_expires; + mc.redis_multikey = redis_multikey; ctx->m->conf = mc; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 5195c6a6c..ffd482029 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1471,8 +1471,13 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { out: ca = sfd->call ? : NULL; - if (ca && update) - redis_update(ca, ca->callmaster->conf.redis_write); + if (ca && update) { + if (ca->callmaster->conf.redis_multikey) { + redis_update(ca, ca->callmaster->conf.redis_write); + } else { + redis_update_onekey(ca, ca->callmaster->conf.redis_write); + } + } done: log_info_clear(); } diff --git a/daemon/redis.c b/daemon/redis.c index 49fb976fc..a6c7ffd78 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -24,11 +24,11 @@ #include "dtls.h" #include "recording.h" #include "rtplib.h" +#include "str.h" - - - - +#include +#include +#include INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) @@ -62,9 +62,23 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) #define STR(x) (x)->s, (size_t) (x)->len #define STR_R(x) (x)->str, (size_t) (x)->len #define S_LEN(s,l) (s), (size_t) (l) +#define STRSTR(x) (x)->s #endif +static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id, enum call_type); +static int redis_check_conn(struct redis *r); +static void json_restore_call(struct redis *r, struct callmaster *m, redisReply *id, enum call_type type); + +static int str_cut(char *str, int begin, int len) { + int l = strlen(str); + + if (len < 0) len = l - begin; + if (begin + len > l) len = l - begin; + memmove(str + begin, str + begin + len, l - len + 1); + + return len; +} static void redis_pipe(struct redis *r, const char *fmt, ...) { va_list ap; @@ -105,7 +119,6 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) { } - /* called with r->lock held */ static int redis_check_type(struct redis *r, char *key, char *suffix, char *type) { redisReply *rp; @@ -124,8 +137,6 @@ static int redis_check_type(struct redis *r, char *key, char *suffix, char *type } - - /* called with r->lock held */ static void redis_consume(struct redis *r) { redisReply *rp; @@ -138,8 +149,6 @@ static void redis_consume(struct redis *r) { } - - /* called with r->lock held if necessary */ static int redis_connect(struct redis *r, int wait) { struct timeval tv; @@ -233,20 +242,8 @@ err: return -1; } -int str_cut(char *str, int begin, int len) { - int l = strlen(str); - - if (len < 0) len = l - begin; - if (begin + len > l) len = l - begin; - memmove(str + begin, str + begin + len, l - len + 1); - - return len; -} - -static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id, enum call_type); -static int redis_check_conn(struct redis *r); -void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { +void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) { struct callmaster *cm = privdata; struct redis *r = 0; @@ -258,7 +255,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { // sanity checks if (!cm) { - rlog(LOG_ERROR, "Struct callmaster is NULL on onRedisNotification"); + rlog(LOG_ERROR, "Struct callmaster is NULL on on_redis_notification"); return; } @@ -283,13 +280,22 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { if (rr->elements != 4) goto err; - char *pch = strstr(rr->element[2]->str, "notifier-"); - if (pch == NULL) { - rlog(LOG_ERROR,"Redis-Notifier: The substring 'notifier-' has not been found in the redis notification !\n"); - goto err; + char *pch =0; + if (cm->conf.redis_multikey) { + pch = strstr(rr->element[2]->str, "notifier-"); + if (pch == NULL) { + rlog(LOG_ERROR,"Redis-Notifier: The prefix 'notifier-' to determine the redis key has not been found in the redis notification !\n"); + goto err; + } + pch += strlen("notifier-"); + } else { + pch = strstr(rr->element[2]->str, "json-"); + if (pch == NULL) { + rlog(LOG_ERROR,"Redis-Notifier: The prefix 'json-' to determine the redis key has not been found in the redis notification !\n"); + goto err; + } } - // extract from __keyspace@__ prefix p = strstr(rr->element[2]->str, "@"); ++p; @@ -312,16 +318,34 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } - pch += strlen("notifier-"); str_cut(rr->element[2]->str,0,pch-rr->element[2]->str); rr->element[2]->len = strlen(rr->element[2]->str); rlog(LOG_DEBUG,"Redis-Notifier:%s:%d: Processing call with callid: %s\n", rr->element[3]->str, r->db, rr->element[2]->str); - str_init(&callid,rr->element[2]->str); + // strip off prefix from callid + if (cm->conf.redis_multikey) { + str_init_len(&callid,rr->element[2]->str+strlen("notifier-"),strlen(rr->element[2]->str)-strlen("notifier-")); + } else { + str_init_len(&callid,rr->element[2]->str+strlen("json-"),strlen(rr->element[2]->str)-strlen("json-")); + } - if (strncmp(rr->element[3]->str,"sadd",4)==0) + if (cm->conf.redis_multikey && strncmp(rr->element[3]->str,"sadd",4)==0) redis_restore_call(r, cm, rr->element[2], CT_FOREIGN_CALL); + if (!cm->conf.redis_multikey && strncmp(rr->element[3]->str,"set",3)==0) { + c = call_get(&callid, cm); + if (c) { + rwlock_unlock_w(&c->master_lock); + if (IS_FOREIGN_CALL(c)) + call_destroy(c); + else { + rlog(LOG_WARN, "Redis-Notifier: Ignoring SET received for OWN call: %s\n", rr->element[2]->str); + goto err; + } + } + json_restore_call(r, cm, rr->element[2], CT_FOREIGN_CALL); + } + if (strncmp(rr->element[3]->str,"del",3)==0) { c = call_get(&callid, cm); if (!c) { @@ -329,6 +353,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } rwlock_unlock_w(&c->master_lock); + if (!IS_FOREIGN_CALL(c)) { + rlog(LOG_WARN, "Redis-Notifier: Ignoring DEL received for an OWN call: %s\n", rr->element[2]->str); + goto err; + } call_destroy(c); } @@ -458,21 +486,22 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a return -1; } - switch (action) { + if (cm->conf.redis_multikey) { + switch (action) { case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, "psubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on SUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, "punsubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void *) cm, "punsubscribe") != REDIS_OK) { + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_ALL"); return -1; } @@ -480,6 +509,31 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a default: rlog(LOG_ERROR, "No subscribe action found: %d", action); return -1; + } + } else { + switch (action) { + case SUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i*:json-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i*:json-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_ALL: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); + return -1; + } + break; + default: + rlog(LOG_ERROR, "No subscribe action found: %d", action); + return -1; + } } return 0; @@ -647,7 +701,6 @@ err: } - static void redis_close(struct redis *r) { if (r->ctx) redisFree(r->ctx); @@ -656,7 +709,6 @@ static void redis_close(struct redis *r) { } - /* must be called with r->lock held */ static int redis_check_conn(struct redis *r) { // try redis connection @@ -698,6 +750,12 @@ static void redis_delete_list(struct redis *r, const str *callid, const char *pr redis_pipe(r, "DEL %s-"PB"-%u", prefix, STR(callid), i); } +/* called with r->lock held and c->master_lock held */ +static void redis_delete_call_json(struct call *c, struct redis *r) { + redis_pipe(r, "DEL json-"PB"", STR(&c->callid)); + redis_consume(r); +} + /* called with r->lock held and c->master_lock held */ static void redis_delete_call(struct call *c, struct redis *r) { redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); @@ -719,7 +777,101 @@ static void redis_delete_call(struct call *c, struct redis *r) { redis_consume(r); } +static void json_builder_add_string_value_uri_enc(JsonBuilder *builder, char* tmp) { + json_builder_add_string_value(builder,g_uri_escape_string(tmp,NULL,0)); +} +static void json_builder_set_member_name_uri_enc(JsonBuilder *builder, char* tmp) { + json_builder_set_member_name(builder,g_uri_escape_string(tmp,NULL,0)); +} +static gboolean json_reader_read_member_uri_enc(JsonReader *root_reader, char* tmp) { + return json_reader_read_member(root_reader, g_uri_unescape_string(tmp,NULL)); +} +static char* json_reader_get_string_value_uri_enc(JsonReader *root_reader) { + return (char*)json_reader_get_string_value(root_reader); +} + +// stolen from libhiredis +/* Create a reply object */ +INLINE redisReply *createReplyObject(int type) { + redisReply *r = calloc(1,sizeof(*r)); + + if (r == NULL) + return NULL; + + r->type = type; + return r; +} + +static int json_get_hash(struct redis_hash *out, struct call* c, + const char *key, const redisReply *which, + unsigned int id) +{ + static unsigned int MAXKEYLENGTH = 512; + char key_concatted[MAXKEYLENGTH]; + int rc=0; + + if (!c) + goto err; + + if (id == -1) { + rc = snprintf(key_concatted, MAXKEYLENGTH, "%s-%s",key,which->str); + } else { + rc = snprintf(key_concatted, MAXKEYLENGTH, "%s-%s-%u",key,which->str,id); + } + if (rc>=MAXKEYLENGTH) + rlog(LOG_ERROR,"Json key too long."); + + + if (!json_reader_read_member_uri_enc(c->root_reader, key_concatted)) { + rlog(LOG_ERROR, "Could not read json member: %s",key_concatted); + goto err; + } + + out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); + if (!out->ht) + goto err; + out->rr = 0; + + gchar **members = json_reader_list_members(c->root_reader); + gchar **orig_members = members; + + for (int i=0; i < json_reader_count_members (c->root_reader); ++i) { + + if (!json_reader_read_member_uri_enc (c->root_reader, *members)) { + rlog(LOG_ERROR, "Could not read json member: %s",*members); + goto err3; + } + out->rr = createReplyObject(REDIS_REPLY_STRING); + if (!out->rr) { + rlog(LOG_ERROR, "Could not create redis reply object (out of memory?)"); + goto err3; + } + out->rr->str = json_reader_get_string_value_uri_enc(c->root_reader); + out->rr->len = strlen(out->rr->str); + + char* tmp = strdup(*members); + + if (g_hash_table_insert_check(out->ht, tmp, out->rr) != TRUE) { + goto err3; + } + + json_reader_end_member(c->root_reader); + + ++members; + } // for + g_strfreev(orig_members); + json_reader_end_member (c->root_reader); + + return 0; +err3: + g_strfreev(members); + if (out->rr) + free(out->rr); + g_hash_table_destroy(out->ht); +err: + return -1; +} static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const redisReply *which, @@ -751,7 +903,8 @@ static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *k return 0; err3: - freeReplyObject(out->rr); + if (out->rr) + freeReplyObject(out->rr); err2: g_hash_table_destroy(out->ht); err: @@ -760,7 +913,9 @@ err: static void redis_destroy_hash(struct redis_hash *rh) { - freeReplyObject(rh->rr); + + if (rh->rr) + freeReplyObject(rh->rr); g_hash_table_destroy(rh->ht); } static void redis_destroy_list(struct redis_list *rl) { @@ -773,7 +928,19 @@ static void redis_destroy_list(struct redis_list *rl) { free(rl->ptrs); } +static void json_destroy_hash(struct redis_hash *rh) { + g_hash_table_destroy(rh->ht); +} + +static void json_destroy_list(struct redis_list *rl) { + unsigned int i; + for (i = 0; i < rl->len; i++) { + json_destroy_hash(&rl->rh[i]); + } + free(rl->rh); + free(rl->ptrs); +} static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *k) { redisReply *r; @@ -876,6 +1043,36 @@ static void *redis_list_get_ptr(struct redis_list *list, struct redis_hash *rh, return NULL; return redis_list_get_idx_ptr(list, idx); } + +static int json_build_list_cb(GQueue *q, struct call *c, const char *key, const str *callid, + unsigned int idx, struct redis_list *list, + int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) +{ + str s; + char key_concatted[256]; + + snprintf(key_concatted, 256, "%s-%s-%u",key,callid->s,idx); + + if (!json_reader_read_member_uri_enc (c->root_reader, key_concatted)) + rlog(LOG_ERROR,"Key in json not found:%s",key_concatted); + for (int jidx=0; jidx < json_reader_count_elements(c->root_reader); ++jidx) { + if (!json_reader_read_element(c->root_reader,jidx)) + rlog(LOG_ERROR,"Element in array not found."); + const char* value = json_reader_get_string_value_uri_enc(c->root_reader); + if (!value) + rlog(LOG_ERROR,"String in json not found."); + str_init_len(&s, (char*)value , strlen(value)); + if (cb(&s, q, list, ptr)) { + return -1; + } + json_reader_end_element(c->root_reader); + } + json_reader_end_member (c->root_reader); + + return 0; +} + + static int redis_build_list_cb(GQueue *q, struct redis *r, const char *key, const str *callid, unsigned int idx, struct redis_list *list, int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) @@ -909,11 +1106,53 @@ static int rbl_cb_simple(str *s, GQueue *q, struct redis_list *list, void *ptr) g_queue_push_tail(q, redis_list_get_idx_ptr(list, (unsigned) j)); return 0; } + +static int json_build_list(GQueue *q, struct call *c, const char *key, const str *callid, + unsigned int idx, struct redis_list *list) +{ + return json_build_list_cb(q, c, key, callid, idx, list, rbl_cb_simple, NULL); +} + static int redis_build_list(GQueue *q, struct redis *r, const char *key, const str *callid, unsigned int idx, struct redis_list *list) { return redis_build_list_cb(q, r, key, callid, idx, list, rbl_cb_simple, NULL); } + +static int json_get_list_hash(struct redis_list *out, struct call* c, + const char *key, const redisReply *id, + const struct redis_hash *rh, const char *rh_num_key) +{ + unsigned int i; + + if (redis_hash_get_unsigned(&out->len, rh, rh_num_key)) + return -1; + out->rh = malloc(sizeof(*out->rh) * out->len); + memset(out->rh,0,sizeof(*out->rh) * out->len); + if (!out->rh) + return -1; + out->ptrs = malloc(sizeof(*out->ptrs) * out->len); + if (!out->ptrs) + goto err1; + + for (i = 0; i < out->len; i++) { + if (json_get_hash(&out->rh[i], c, key, id, i)) + goto err2; + } + + return 0; + +err2: + free(out->ptrs); + while (i) { + i--; + json_destroy_hash(&out->rh[i]); + } +err1: + free(out->rh); + return -1; +} + static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const redisReply *id, const struct redis_hash *rh, const char *rh_num_key) { @@ -998,8 +1237,6 @@ static int redis_hash_get_crypto_context(struct crypto_context *out, const struc return 0; } - - static int redis_sfds(struct call *c, struct redis_list *sfds) { unsigned int i; str family, intf_name; @@ -1243,6 +1480,35 @@ static int redis_link_sfds(struct redis_list *sfds, struct redis_list *streams) return 0; } +static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_list *medias) +{ + unsigned int i; + struct call_monologue *ml, *other_ml; + GQueue q = G_QUEUE_INIT; + GList *l; + + for (i = 0; i < tags->len; i++) { + ml = tags->ptrs[i]; + + ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); + + if (json_build_list(&q, c, "other_tags", &c->callid, i, tags)) + return -1; + for (l = q.head; l; l = l->next) { + other_ml = l->data; + if (!other_ml) + return -1; + g_hash_table_insert(ml->other_tags, &other_ml->tag, other_ml); + } + g_queue_clear(&q); + + if (json_build_list(&ml->medias, c, "medias", &c->callid, i, medias)) + return -1; + } + + return 0; +} + static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *tags, struct redis_list *medias) { unsigned int i; @@ -1272,6 +1538,31 @@ static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *t return 0; } +static int json_link_streams(struct call *c, struct redis_list *streams, + struct redis_list *sfds, struct redis_list *medias) +{ + unsigned int i; + struct packet_stream *ps; + + for (i = 0; i < streams->len; i++) { + ps = streams->ptrs[i]; + + ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); + ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); + ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); + ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); + ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); + + if (json_build_list(&ps->sfds, c, "stream_sfds", &c->callid, i, sfds)) + return -1; + + if (ps->media) + __rtp_stats_update(ps->rtp_stats, ps->media->rtp_payload_types); + } + + return 0; +} + static int redis_link_streams(struct redis *r, struct call *c, struct redis_list *streams, struct redis_list *sfds, struct redis_list *medias) { @@ -1297,6 +1588,26 @@ static int redis_link_streams(struct redis *r, struct call *c, struct redis_list return 0; } +static int json_link_medias(struct redis *r, struct call *c, struct redis_list *medias, + struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) +{ + unsigned int i; + struct call_media *med; + + for (i = 0; i < medias->len; i++) { + med = medias->ptrs[i]; + + med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); + if (!med->monologue) + return -1; + if (json_build_list(&med->streams, c, "streams", &c->callid, i, streams)) + return -1; + if (json_build_list(&med->endpoint_maps, c, "maps", &c->callid, i, maps)) + return -1; + } + return 0; +} + static int redis_link_medias(struct redis *r, struct call *c, struct redis_list *medias, struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) { @@ -1346,6 +1657,21 @@ static int rbl_cb_intf_sfds(str *s, GQueue *q, struct redis_list *list, void *pt return 0; } +static int json_link_maps(struct redis *r, struct call *c, struct redis_list *maps, + struct redis_list *sfds) +{ + unsigned int i; + struct endpoint_map *em; + + for (i = 0; i < maps->len; i++) { + em = maps->ptrs[i]; + + if (json_build_list_cb(&em->intf_sfds, c, "map_sfds", &c->callid, em->unique_id, sfds, + rbl_cb_intf_sfds, em)) + return -1; + } + return 0; +} static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *maps, struct redis_list *sfds) @@ -1363,9 +1689,161 @@ static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *m return 0; } +static void json_restore_call(struct redis *r, struct callmaster *m, redisReply *id, enum call_type type) { + redisReply* rr_jsonStr; + struct redis_hash call; + struct redis_list tags, sfds, streams, medias, maps; + struct call *c = NULL; + str s ; + const char *err = 0; + int i; -static void redis_restore_recording(struct call *c, struct redis_hash *call) { - str s; + JsonReader *root_reader =0; + JsonParser *parser =0; + char orig_json_callid[512]; memset(orig_json_callid,0,512); + memcpy(orig_json_callid,id->str,strlen(id->str)); + + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET %s",id->str); + if (!rr_jsonStr) { + rlog(LOG_ERR, "Could not retrieve json data from redis for callid: %s", id->str); + goto err1; + } + + // strip off json- prefix from callid + str_cut(id->str, 0, strlen("json-")); + id->len = strlen(id->str); + str_init_len(&s, id->str, id->len); + + parser = json_parser_new(); + if (!json_parser_load_from_data (parser, rr_jsonStr->str, -1, NULL)) { + rlog(LOG_DEBUG, "Could not parse json data !"); + goto err1; + } + root_reader = json_reader_new (json_parser_get_root (parser)); + if (!root_reader) { + rlog(LOG_DEBUG, "Could not read json data !"); + goto err1; + } + + c = call_get_or_create(&s, m, type); + err = "failed to create call struct"; + if (!c) + goto err1; + + c->root_reader = root_reader; // attach the json to the call in order to restore data from there + + err = "call already exists"; + if (c->last_signal) + goto err2; + err = "'call' data incomplete"; + if (json_get_hash(&call, c, "json", id, -1)) + goto err2; + err = "'tags' incomplete"; + if (json_get_list_hash(&tags, c, "tag", id, &call, "num_tags")) + goto err3; + err = "'sfds' incomplete"; + if (json_get_list_hash(&sfds, c, "sfd", id, &call, "num_sfds")) + goto err4; + err = "'streams' incomplete"; + if (json_get_list_hash(&streams, c, "stream", id, &call, "num_streams")) + goto err5; + err = "'medias' incomplete"; + if (json_get_list_hash(&medias, c, "media", id, &call, "num_medias")) + goto err6; + err = "'maps' incomplete"; + if (json_get_list_hash(&maps, c, "map", id, &call, "num_maps")) + goto err7; + + err = "missing 'created' timestamp"; + if (redis_hash_get_time_t(&c->created, &call, "created")) + goto err8; + err = "missing 'last signal' timestamp"; + if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal")) + goto err8; + if (redis_hash_get_int(&i, &call, "tos")) + c->tos = 184; + else + c->tos = i; + redis_hash_get_time_t(&c->deleted, &call, "deleted"); + redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); + if (!redis_hash_get_str(&s, &call, "created_from")) + c->created_from = call_strdup(c, s.s); + if (!redis_hash_get_str(&s, &call, "created_from_addr")) + sockaddr_parse_any_str(&c->created_from_addr, &s); + + err = "missing 'redis_hosted_db' value"; + if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) + goto err8; + + err = "failed to create sfds"; + if (redis_sfds(c, &sfds)) + goto err8; + err = "failed to create streams"; + if (redis_streams(c, &streams)) + goto err8; + err = "failed to create tags"; + if (redis_tags(c, &tags)) + goto err8; + err = "failed to create medias"; + if (redis_medias(r, c, &medias)) + goto err8; + err = "failed to create maps"; + if (redis_maps(c, &maps)) + goto err8; + + err = "failed to link sfds"; + if (redis_link_sfds(&sfds, &streams)) + goto err8; + err = "failed to link streams"; + if (json_link_streams(c, &streams, &sfds, &medias)) + goto err8; + err = "failed to link tags"; + if (json_link_tags(c, &tags, &medias)) + goto err8; + err = "failed to link medias"; + if (json_link_medias(r, c, &medias, &streams, &maps, &tags)) + goto err8; + err = "failed to link maps"; + if (json_link_maps(r, c, &maps, &sfds)) + goto err8; + + err = NULL; + +err8: + json_destroy_list(&maps); +err7: + json_destroy_list(&medias); +err6: + json_destroy_list(&streams); +err5: + json_destroy_list(&sfds); +err4: + json_destroy_list(&tags); +err3: + json_destroy_hash(&call); +err2: + rwlock_unlock_w(&c->master_lock); +err1: + if (root_reader) + g_object_unref (root_reader); + if (parser) + g_object_unref (parser); + if (rr_jsonStr) + freeReplyObject(rr_jsonStr); + log_info_clear(); + if (err) { + rlog(LOG_WARNING, "Failed to restore call ID '%.*s' from Redis: %s", REDIS_FMT(id), err); + if (c) + call_destroy(c); + else + redisCommandNR(m->conf.redis_write->ctx, "DEL %s", orig_json_callid); + } + if (c) + obj_put(c); +} + +static void redis_restore_recording(struct call *c, struct redis_hash *call) { + str s; // presence of this key determines whether we were recording at all if (redis_hash_get_str(&s, call, "recording_meta_prefix")) @@ -1519,7 +1997,11 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - redis_restore_call(r, ctx->m, call, CT_OWN_CALL); + if (ctx->m->conf.redis_multikey) { + redis_restore_call(r, ctx->m, call, CT_OWN_CALL); + } else { + json_restore_call(r, ctx->m, call, CT_OWN_CALL); + } mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); @@ -1547,7 +2029,11 @@ int redis_restore(struct callmaster *m, struct redis *r) { } mutex_unlock(&r->lock); - calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); + if (m->conf.redis_multikey) { + calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); + } else { + calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); + } if (!calls) { rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr); @@ -1581,8 +2067,38 @@ err: return ret; } +#define JSON_SET_NSTRING(a,b,c,d) do { \ + sprintf(tmp,a,b); \ + json_builder_set_member_name_uri_enc (builder, tmp); \ + ZERO(tmp); \ + snprintf(tmp,sizeof(tmp), c,d); \ + json_builder_add_string_value_uri_enc(builder, tmp); \ + ZERO(tmp); \ + } while (0) +static int json_update_crypto_params(JsonBuilder *builder, const char *pref, const str *callid, + unsigned int unique_id, + const char *key, const struct crypto_params *p) +{ + char tmp[2048]; + + if (!p->crypto_suite) + return -1; + + JSON_SET_NSTRING("%s-crypto_suite",key,"%s",p->crypto_suite->name); + JSON_SET_NSTRING("%s-master_key",key,"%s",p->master_key); + JSON_SET_NSTRING("%s-master_salt",key,"%s",p->master_salt); + JSON_SET_NSTRING("%s-unenc-srtp",key,"%i",p->session_params.unencrypted_srtp); + JSON_SET_NSTRING("%s-unenc-srtcp",key,"%i",p->session_params.unencrypted_srtcp); + JSON_SET_NSTRING("%s-unauth-srtp",key,"%i",p->session_params.unauthenticated_srtp); + + if (p->mki) { + JSON_SET_NSTRING("%s-mki",key,"%s",p->mki); + } + + return 0; +} static int redis_update_crypto_params(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, @@ -1607,6 +2123,21 @@ static int redis_update_crypto_params(struct redis *r, const char *pref, const s return 0; } + +static void json_update_crypto_context(JsonBuilder *builder, const char *pref, const str *callid, + unsigned int unique_id, + const struct crypto_context *c) +{ + char tmp[2048]; ZERO(tmp); + + if (json_update_crypto_params(builder, pref, callid, unique_id, "", &c->params)) + return; + + JSON_SET_NSTRING("%s","last_index","%lu",c->last_index); + JSON_SET_NSTRING("%s","ssrc","%u",(unsigned) c->ssrc); + +} + static void redis_update_crypto_context(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, const struct crypto_context *c) @@ -1634,6 +2165,20 @@ static void redis_update_stats(struct redis *r, const char *pref, const str *cal key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes), key, atomic64_get(&s->errors)); } + +static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref, const str *callid, + unsigned int unique_id, + const struct dtls_fingerprint *f) +{ + char tmp[2048]; ZERO(tmp); + + if (!f->hash_func) + return; + + JSON_SET_NSTRING("%s","hash_func","%s",f->hash_func->name); + JSON_SET_NSTRING("%s","fingerprint","%s",f->digest); +} + static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, const str *callid, unsigned int unique_id, const struct dtls_fingerprint *f) @@ -1645,6 +2190,380 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, con f->hash_func->name, S_LEN(f->digest, sizeof(f->digest))); } +/** + * encodes the few (k,v) pairs for one call under one json structure + */ + +char* redis_encode_json(struct call *c) { + + GList *l=0,*k=0, *m=0, *n=0; + struct endpoint_map *ep; + struct call_media *media; + struct rtp_payload_type *pt; + struct stream_fd *sfd; + struct packet_stream *ps; + struct intf_list *il; + struct call_monologue *ml, *ml2; + JsonBuilder *builder = json_builder_new (); + + char tmp[2048]; ZERO(tmp); + + json_builder_begin_object (builder); + { + sprintf(tmp,"json-" STR_FORMAT, STR_FMT(&c->callid)); + json_builder_set_member_name_uri_enc (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + + { + JSON_SET_NSTRING("%s","created","%ld",(long int) c->created); + JSON_SET_NSTRING("%s","last_signal","%ld",(long int) c->last_signal); + JSON_SET_NSTRING("%s","tos","%u",(int) c->tos); + JSON_SET_NSTRING("%s","deleted","%ld",(long int) c->deleted); + JSON_SET_NSTRING("%s","num_sfds","%u",g_queue_get_length(&c->stream_fds)); + JSON_SET_NSTRING("%s","num_streams","%u",g_queue_get_length(&c->streams)); + JSON_SET_NSTRING("%s","num_medias","%u",g_queue_get_length(&c->medias)); + JSON_SET_NSTRING("%s","num_tags","%u",g_queue_get_length(&c->monologues)); + JSON_SET_NSTRING("%s","num_maps","%u",g_queue_get_length(&c->endpoint_maps)); + JSON_SET_NSTRING("%s","ml_deleted","%ld",(long int) c->ml_deleted); + JSON_SET_NSTRING("%s","created_from","%s",c->created_from); + JSON_SET_NSTRING("%s","created_from_addr","%s",sockaddr_print_buf(&c->created_from_addr)); + JSON_SET_NSTRING("%s","redis_hosted_db","%u",c->redis_hosted_db); + + } + + json_builder_end_object (builder); + + for (l = c->stream_fds.head; l; l = l->next) { + sfd = l->data; + + sprintf(tmp,"sfd-%s-%u",STRSTR(&c->callid),sfd->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + + { + JSON_SET_NSTRING("%s","pref_family","%s",sfd->local_intf->logical->preferred_family->rfc_name); + JSON_SET_NSTRING("%s","localport","%u",sfd->socket.local.port); + JSON_SET_NSTRING("%s","logical_intf","%s",STRSTR(&sfd->local_intf->logical->name)); + JSON_SET_NSTRING("%s","local_intf_uid","%u",sfd->local_intf->unique_id); + JSON_SET_NSTRING("%s","stream","%u",sfd->stream->unique_id); + + json_update_crypto_context(builder, "sfd", &c->callid, sfd->unique_id, &sfd->crypto); + + } + json_builder_end_object (builder); + + } // --- for + + for (l = c->streams.head; l; l = l->next) { + ps = l->data; + + mutex_lock(&ps->in_lock); + mutex_lock(&ps->out_lock); + + sprintf(tmp,"stream-%s-%u",STRSTR(&c->callid),ps->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + + { + JSON_SET_NSTRING("%s","media","%u",ps->media->unique_id); + JSON_SET_NSTRING("%s","sfd","%u",ps->selected_sfd ? ps->selected_sfd->unique_id : -1); + JSON_SET_NSTRING("%s","rtp_sink","%u",ps->rtp_sink ? ps->rtp_sink->unique_id : -1); + JSON_SET_NSTRING("%s","rtcp_sink","%u",ps->rtcp_sink ? ps->rtcp_sink->unique_id : -1); + JSON_SET_NSTRING("%s","rtcp_sibling","%u",ps->rtcp_sibling ? ps->rtcp_sibling->unique_id : -1); + JSON_SET_NSTRING("%s","last_packet",UINT64F,atomic64_get(&ps->last_packet)); + JSON_SET_NSTRING("%s","ps_flags","%u",ps->ps_flags); + JSON_SET_NSTRING("%s","component","%u",ps->component); + JSON_SET_NSTRING("%s","endpoint","%s",endpoint_print_buf(&ps->endpoint)); + JSON_SET_NSTRING("%s","advertised_endpoint","%s",endpoint_print_buf(&ps->advertised_endpoint)); + JSON_SET_NSTRING("%s","stats-packets","%ld",atomic64_get(&ps->stats.packets)); + JSON_SET_NSTRING("%s","stats-bytes","%ld",atomic64_get(&ps->stats.bytes)); + JSON_SET_NSTRING("%s","stats-errors","%ld",atomic64_get(&ps->stats.errors)); + + json_update_crypto_context(builder, "stream", &c->callid, ps->unique_id, &ps->crypto); + + } + + json_builder_end_object (builder); + + // stream_sfds was here before + mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->out_lock); + + } // --- for streams.head + + + for (l = c->streams.head; l; l = l->next) { + ps = l->data; + + mutex_lock(&ps->in_lock); + mutex_lock(&ps->out_lock); + + sprintf(tmp,"stream_sfds-%s-%u",STRSTR(&c->callid),ps->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (k = ps->sfds.head; k; k = k->next) { + sfd = k->data; + sprintf(tmp,"%u",sfd->unique_id); + json_builder_add_string_value_uri_enc(builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + + mutex_unlock(&ps->in_lock); + mutex_unlock(&ps->out_lock); + } + + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + + sprintf(tmp,"tag-%s-%u",STRSTR(&c->callid),ml->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + { + + JSON_SET_NSTRING("%s","created","%llu",(long long unsigned) ml->created); + JSON_SET_NSTRING("%s","active","%u",ml->active_dialogue ? ml->active_dialogue->unique_id : -1); + JSON_SET_NSTRING("%s","deleted","%llu",(long long unsigned) ml->deleted); + + if (ml->tag.s) { + JSON_SET_NSTRING("%s","tag","%s",STRSTR(&ml->tag)); + } + if (ml->viabranch.s) { + JSON_SET_NSTRING("%s","via-branch","%s",STRSTR(&ml->viabranch)); + } + } + json_builder_end_object (builder); + + // other_tags and medias- was here before + + } // --- for monologues.head + + for (l = c->monologues.head; l; l = l->next) { + ml = l->data; + // -- we do it again here since the jsonbuilder is linear straight forward + k = g_hash_table_get_values(ml->other_tags); + sprintf(tmp,"other_tags-%s-%u",STRSTR(&c->callid),ml->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = k; m; m = m->next) { + ml2 = m->data; + sprintf(tmp,"%u",ml2->unique_id); + json_builder_add_string_value_uri_enc(builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + + g_list_free(k); + + sprintf(tmp,"medias-%s-%u",STRSTR(&c->callid),ml->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (k = ml->medias.head; k; k = k->next) { + media = k->data; + sprintf(tmp,"%u",media->unique_id); + json_builder_add_string_value_uri_enc (builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + } + + + for (l = c->medias.head; l; l = l->next) { + media = l->data; + + sprintf(tmp,"media-%s-%u",STRSTR(&c->callid),media->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + { + JSON_SET_NSTRING("%s","tag","%u",media->monologue->unique_id); + JSON_SET_NSTRING("%s","index","%u",media->index); + JSON_SET_NSTRING("%s","type","%s",STRSTR(&media->type)); + JSON_SET_NSTRING("%s","protocol","%s",media->protocol ? media->protocol->name : ""); + JSON_SET_NSTRING("%s","desired_family","%s",media->desired_family ? media->desired_family->rfc_name : ""); + JSON_SET_NSTRING("%s","sdes_in_tag","%u",media->sdes_in.tag); + JSON_SET_NSTRING("%s","sdes_out_tag","%u",media->sdes_out.tag); + JSON_SET_NSTRING("%s","logical_intf","%s",STRSTR(&media->logical_intf->name)); + JSON_SET_NSTRING("%s","media_flags","%u",media->media_flags); + + json_update_crypto_params(builder, "media", &c->callid, media->unique_id, "sdes_in", + &media->sdes_in.params); + json_update_crypto_params(builder, "media", &c->callid, media->unique_id, "sdes_out", + &media->sdes_out.params); + json_update_dtls_fingerprint(builder, "media", &c->callid, media->unique_id, &media->fingerprint); + + // streams and maps- and payload_types- was here before + + } + json_builder_end_object (builder); + + } // --- for medias.head + + // -- we do it again here since the jsonbuilder is linear straight forward + for (l = c->medias.head; l; l = l->next) { + media = l->data; + + sprintf(tmp,"streams-%s-%u",STRSTR(&c->callid),media->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = media->streams.head; m; m = m->next) { + ps = m->data; + sprintf(tmp,"%u",ps->unique_id); + json_builder_add_string_value_uri_enc (builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + + sprintf(tmp,"maps-%s-%u",STRSTR(&c->callid),media->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = media->endpoint_maps.head; m; m = m->next) { + ep = m->data; + sprintf(tmp,"%u",ep->unique_id); + json_builder_add_string_value_uri_enc (builder, tmp); + ZERO(tmp); + } + json_builder_end_array (builder); + + k = g_hash_table_get_values(media->rtp_payload_types); + sprintf(tmp,"payload_types-%s-%u",STRSTR(&c->callid),media->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = k; m; m = m->next) { + pt = m->data; + sprintf(tmp,"%u/%s/%u/%s", + pt->payload_type, STRSTR(&pt->encoding), + pt->clock_rate, STRSTR(&pt->encoding_parameters)); + json_builder_add_string_value_uri_enc(builder, tmp); + } + json_builder_end_array (builder); + + g_list_free(k); + + } + + for (l = c->endpoint_maps.head; l; l = l->next) { + ep = l->data; + + sprintf(tmp,"map-%s-%u",STRSTR(&c->callid),ep->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + ZERO(tmp); + + json_builder_begin_object (builder); + { + JSON_SET_NSTRING("%s","wildcard","%i",ep->wildcard); + JSON_SET_NSTRING("%s","num_ports","%u",ep->num_ports); + JSON_SET_NSTRING("%s","intf_preferred_family","%s",ep->logical_intf->preferred_family->rfc_name); + JSON_SET_NSTRING("%s","logical_intf","%s",STRSTR(&ep->logical_intf->name)); + JSON_SET_NSTRING("%s","endpoint","%s",endpoint_print_buf(&ep->endpoint)); + + } + json_builder_end_object (builder); + + } // --- for c->endpoint_maps.head + + // -- we do it again here since the jsonbuilder is linear straight forward + for (l = c->endpoint_maps.head; l; l = l->next) { + ep = l->data; + + sprintf(tmp,"map_sfds-%s-%u",STRSTR(&c->callid),ep->unique_id); + json_builder_set_member_name_uri_enc (builder, tmp); + json_builder_begin_array (builder); + ZERO(tmp); + for (m = ep->intf_sfds.head; m; m = m->next) { + il = m->data; + sprintf(tmp,"loc-%u",il->local_intf->unique_id); + json_builder_add_string_value_uri_enc(builder, tmp); + ZERO(tmp); + for (n = il->list.head; n; n = n->next) { + sfd = n->data; + sprintf(tmp,"%u",sfd->unique_id); + json_builder_add_string_value_uri_enc (builder, tmp); + ZERO(tmp); + } + } + json_builder_end_array (builder); + } + } + json_builder_end_object (builder); + + JsonGenerator *gen = json_generator_new (); + JsonNode * root = json_builder_get_root (builder); + json_generator_set_root (gen, root); + char* result = json_generator_to_data (gen, NULL); + + json_node_free (root); + g_object_unref (gen); + g_object_unref (builder); + + return result; + +} + + +void redis_update_onekey(struct call *c, struct redis *r) { + unsigned int redis_expires_s; + + if (!r) + return; + + mutex_lock(&r->lock); + if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { + mutex_unlock(&r->lock); + return ; + } + + rwlock_lock_r(&c->master_lock); + + redis_expires_s = c->callmaster->conf.redis_expires_secs; + + c->redis_hosted_db = r->db; + if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { + rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error."); + goto err; + } + + char* result = redis_encode_json(c); + if (!result) + goto err; + + redis_pipe(r, "SET json-"PB" %s", STR(&c->callid), result); + redis_pipe(r, "EXPIRE json-"PB" %i", STR(&c->callid), redis_expires_s); + + redis_consume(r); + + if (result) + free(result); + mutex_unlock(&r->lock); + rwlock_unlock_r(&c->master_lock); + + return; +err: + + mutex_unlock(&r->lock); + rwlock_unlock_r(&c->master_lock); + if (r->ctx->err) + rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr); + redisFree(r->ctx); + r->ctx = NULL; + +} static void redis_update_recording(struct redis *r, struct call *c) { struct recording *rec; @@ -1658,7 +2577,6 @@ static void redis_update_recording(struct redis *r, struct call *c) { } - /* * Redis data structure: * @@ -1984,7 +2902,11 @@ void redis_delete(struct call *c, struct redis *r) { if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) goto err; - redis_delete_call(c, r); + if (c->callmaster->conf.redis_multikey) { + redis_delete_call(c, r); + } else { + redis_delete_call_json(c, r); + } rwlock_unlock_r(&c->master_lock); mutex_unlock(&r->lock); diff --git a/daemon/redis.h b/daemon/redis.h index 0321abd77..a1be63291 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -91,23 +91,17 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) #endif - - - - #define rlog(l, x...) ilog(l | LOG_FLAG_RESTORE, x) - - #define REDIS_FMT(x) (x)->len, (x)->str - void redis_notify_loop(void *d); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required); int redis_restore(struct callmaster *, struct redis *); void redis_update(struct call *, struct redis *); +void redis_update_onekey(struct call *c, struct redis *r); void redis_delete(struct call *, struct redis *); void redis_wipe(struct redis *); int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action); diff --git a/debian/control b/debian/control index fa2988706..ad42acc8b 100644 --- a/debian/control +++ b/debian/control @@ -13,6 +13,7 @@ Build-Depends: debhelper (>= 5), libevent-dev (>= 2.0), libglib2.0-dev (>= 2.30), libhiredis-dev, + libjson-glib-dev, libpcap0.8-dev | libpcap-dev, libpcre3-dev, libssl-dev (>= 1.0.1), diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index 432a06082..7b770ce52 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -74,6 +74,7 @@ fi [ -z "$REDIS_WRITE_AUTH_PW" ] || export RTPENGINE_REDIS_WRITE_AUTH_PW="$REDIS_WRITE_AUTH_PW" [ -z "$REDIS_NUM_THREADS" ] || OPTIONS="$OPTIONS --redis-num-threads=$REDIS_NUM_THREADS" [ -z "$REDIS_EXPIRES" ] || OPTIONS="$OPTIONS --redis-expires=$REDIS_EXPIRES" +[ -z "$REDIS_MULTIKEY" ] || OPTIONS="$OPTIONS --redis-multikey=$REDIS_MULTIKEY" [ -z "$NO_REDIS_REQUIRED" -o \( "$NO_REDIS_REQUIRED" != "1" -a "$NO_REDIS_REQUIRED" != "yes" \) ] || OPTIONS="$OPTIONS --no-redis-required" [ -z "$B2B_URL" ] || OPTIONS="$OPTIONS --b2b-url=$B2B_URL" [ -z "$NO_FALLBACK" -o \( "$NO_FALLBACK" != "1" -a "$NO_FALLBACK" != "yes" \) ] || OPTIONS="$OPTIONS --no-fallback"