diff --git a/daemon/redis.c b/daemon/redis.c index a51eb3e8e..405083aca 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -67,7 +67,6 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) #define REDIS_FMT(x) (x)->len, (x)->str -static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type); static int redis_check_conn(struct redis *r); static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type); @@ -310,10 +309,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) goto err; } - if (cm->conf.redis_multikey && strncmp(rr->element[3]->str,"sadd",4)==0) - redis_restore_call(r, cm, &callid, CT_FOREIGN_CALL); - - if (!cm->conf.redis_multikey && strncmp(rr->element[3]->str,"set",3)==0) { + if (strncmp(rr->element[3]->str,"set",3)==0) { c = call_get(&callid, cm); if (c) { rwlock_unlock_w(&c->master_lock); @@ -785,8 +781,7 @@ INLINE redisReply *createReplyObject(int type) { } static int json_get_hash(struct redis_hash *out, struct call* c, - const char *key, - unsigned int id) + const char *key, unsigned int id) { static unsigned int MAXKEYLENGTH = 512; char key_concatted[MAXKEYLENGTH]; @@ -803,7 +798,6 @@ static int json_get_hash(struct redis_hash *out, struct call* c, if (rc>=MAXKEYLENGTH) rlog(LOG_ERROR,"Json key too long."); - if (!json_reader_read_member(c->root_reader, key_concatted)) { rlog(LOG_ERROR, "Could not read json member: %s",key_concatted); goto err; @@ -812,7 +806,6 @@ static int json_get_hash(struct redis_hash *out, struct call* c, out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, freeReplyObject); if (!out->ht) goto err; - out->rr = 0; gchar **members = json_reader_list_members(c->root_reader); gchar **orig_members = members; @@ -823,16 +816,11 @@ static int json_get_hash(struct redis_hash *out, struct call* c, rlog(LOG_ERROR, "Could not read json member: %s",*members); goto err3; } - out->rr = createReplyObject(REDIS_REPLY_STRING); - if (!out->rr) { - rlog(LOG_ERROR, "Could not create redis reply object (out of memory?)"); - goto err3; - } - out->rr->str = json_reader_get_string_value_uri_enc(c->root_reader, &out->rr->len); - + str_init_len(&out->s,(char*)json_reader_get_string_value(c->root_reader),strlen((char*)json_reader_get_string_value(c->root_reader))); char* tmp = strdup(*members); - if (g_hash_table_insert_check(out->ht, tmp, out->rr) != TRUE) { + if (g_hash_table_insert_check(out->ht, tmp, str_dup(&out->s)) != TRUE) { + ilog(LOG_WARNING,"Key %s already exists", tmp); goto err3; } @@ -847,68 +835,11 @@ static int json_get_hash(struct redis_hash *out, struct call* c, err3: g_strfreev(members); - if (out->rr) - freeReplyObject(out->rr); g_hash_table_destroy(out->ht); err: return -1; } - -static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const str *which, - unsigned int id) -{ - redisReply *k, *v; - int i; - - out->ht = g_hash_table_new(g_str_hash, g_str_equal); - if (!out->ht) - goto err; - if (id == -1) - out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"", key, STR(which)); - else - out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"-%u", key, STR(which), id); - if (!out->rr) - goto err2; - - for (i = 1; i < out->rr->elements; i += 2) { - k = out->rr->element[i - 1]; - v = out->rr->element[i]; - if (k->type != REDIS_REPLY_STRING || v->type != REDIS_REPLY_STRING) - continue; - - if (g_hash_table_insert_check(out->ht, k->str, v) != TRUE) - goto err3; - } - - return 0; - -err3: - if (out->rr) - freeReplyObject(out->rr); -err2: - g_hash_table_destroy(out->ht); -err: - return -1; -} - - -static void redis_destroy_hash(struct redis_hash *rh) { - - if (rh->rr) - freeReplyObject(rh->rr); - g_hash_table_destroy(rh->ht); -} -static void redis_destroy_list(struct redis_list *rl) { - unsigned int i; - - for (i = 0; i < rl->len; i++) { - redis_destroy_hash(&rl->rh[i]); - } - free(rl->rh); - free(rl->ptrs); -} - static void json_destroy_hash(struct redis_hash *rh) { g_hash_table_destroy(rh->ht); } @@ -924,7 +855,7 @@ static void json_destroy_list(struct redis_list *rl) { } static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *k) { - redisReply *r; + str *r; r = g_hash_table_lookup(h->ht, k); if (!r) { @@ -932,7 +863,7 @@ static int redis_hash_get_str(str *out, const struct redis_hash *h, const char * out->len = 0; return -1; } - out->s = r->str; + out->s = r->s; out->len = r->len; return 0; } @@ -1054,34 +985,6 @@ static int json_build_list_cb(GQueue *q, struct call *c, const char *key, return 0; } - -static int redis_build_list_cb(GQueue *q, struct redis *r, const char *key, const str *callid, - unsigned int idx, struct redis_list *list, - int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr) -{ - redisReply *rr; - int i; - str s; - - rr = redis_get(r, REDIS_REPLY_ARRAY, "LRANGE %s-"PB"-%u 0 -1", key, STR(callid), idx); - if (!rr) - return -1; - - for (i = 0; i < rr->elements; i++) { - if (rr->element[i]->type != REDIS_REPLY_STRING) { - freeReplyObject(rr); - return -1; - } - str_init_len(&s, rr->element[i]->str, rr->element[i]->len); - if (cb(&s, q, list, ptr)) { - freeReplyObject(rr); - return -1; - } - } - - freeReplyObject(rr); - return 0; -} static int rbl_cb_simple(str *s, GQueue *q, struct redis_list *list, void *ptr) { int j; j = str_to_i(s, 0); @@ -1095,12 +998,6 @@ static int json_build_list(GQueue *q, struct call *c, const char *key, const str return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL); } -static int redis_build_list(GQueue *q, struct redis *r, const char *key, const str *callid, - unsigned int idx, struct redis_list *list) -{ - return redis_build_list_cb(q, r, key, callid, idx, list, rbl_cb_simple, NULL); -} - static int json_get_list_hash(struct redis_list *out, struct call* c, const char *key, const struct redis_hash *rh, const char *rh_num_key) @@ -1134,41 +1031,6 @@ err1: return -1; } -static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const str *id, - const struct redis_hash *rh, const char *rh_num_key) -{ - unsigned int i; - - if (redis_hash_get_unsigned(&out->len, rh, rh_num_key)) - return -1; - out->rh = malloc(sizeof(*out->rh) * out->len); - if (!out->rh) - return -1; - out->ptrs = malloc(sizeof(*out->ptrs) * out->len); - if (!out->ptrs) - goto err1; - - for (i = 0; i < out->len; i++) { - if (redis_get_hash(&out->rh[i], r, key, id, i)) - goto err2; - } - - return 0; - -err2: - free(out->ptrs); - while (i) { - i--; - redis_destroy_hash(&out->rh[i]); - } -err1: - free(out->rh); - return -1; -} - - - - /* can return 1, 0 or -1 */ static int redis_hash_get_crypto_params(struct crypto_params *out, const struct redis_hash *h, const char *k) { str s; @@ -1432,62 +1294,6 @@ static int json_medias(struct redis *r, struct call *c, struct redis_list *media return 0; } -static int redis_medias(struct redis *r, struct call *c, struct redis_list *medias) { - unsigned int i; - struct redis_hash *rh; - struct call_media *med; - str s; - - for (i = 0; i < medias->len; i++) { - rh = &medias->rh[i]; - - /* from call.c:__get_media() */ - med = uid_slice_alloc0(med, &c->medias); - med->call = c; - med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, - __payload_type_free); - - if (redis_hash_get_unsigned(&med->index, rh, "index")) - return -1; - if (redis_hash_get_str(&s, rh, "type")) - return -1; - call_str_cpy(c, &med->type, &s); - - if (redis_hash_get_str(&s, rh, "protocol")) - return -1; - med->protocol = transport_protocol(&s); - - if (redis_hash_get_str(&s, rh, "desired_family")) - return -1; - med->desired_family = get_socket_family_rfc(&s); - - if (redis_hash_get_str(&s, rh, "logical_intf") - || !(med->logical_intf = get_logical_interface(&s, med->desired_family, 0))) - { - rlog(LOG_ERR, "unable to find specified local interface"); - med->logical_intf = get_logical_interface(NULL, med->desired_family, 0); - } - - if (redis_hash_get_unsigned(&med->sdes_in.tag, rh, "sdes_in_tag")) - return -1; - if (redis_hash_get_unsigned(&med->sdes_out.tag, rh, "sdes_out_tag")) - return -1; - if (redis_hash_get_unsigned((unsigned int *) &med->media_flags, rh, - "media_flags")) - return -1; - if (redis_hash_get_crypto_params(&med->sdes_in.params, rh, "sdes_in") < 0) - return -1; - if (redis_hash_get_crypto_params(&med->sdes_out.params, rh, "sdes_out") < 0) - return -1; - - redis_build_list_cb(NULL, r, "payload_types", &c->callid, i, NULL, rbl_cb_plts, med); - /* XXX dtls */ - - medias->ptrs[i] = med; - } - - return 0; -} static int redis_maps(struct call *c, struct redis_list *maps) { unsigned int i; @@ -1570,35 +1376,6 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_ return 0; } -static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *tags, struct redis_list *medias) -{ - unsigned int i; - struct call_monologue *ml, *other_ml; - GQueue q = G_QUEUE_INIT; - GList *l; - - for (i = 0; i < tags->len; i++) { - ml = tags->ptrs[i]; - - ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active"); - - if (redis_build_list(&q, r, "other_tags", &c->callid, i, tags)) - return -1; - for (l = q.head; l; l = l->next) { - other_ml = l->data; - if (!other_ml) - return -1; - g_hash_table_insert(ml->other_tags, &other_ml->tag, other_ml); - } - g_queue_clear(&q); - - if (redis_build_list(&ml->medias, r, "medias", &c->callid, i, medias)) - return -1; - } - - return 0; -} - static int json_link_streams(struct call *c, struct redis_list *streams, struct redis_list *sfds, struct redis_list *medias) { @@ -1624,31 +1401,6 @@ static int json_link_streams(struct call *c, struct redis_list *streams, return 0; } -static int redis_link_streams(struct redis *r, struct call *c, struct redis_list *streams, - struct redis_list *sfds, struct redis_list *medias) -{ - unsigned int i; - struct packet_stream *ps; - - for (i = 0; i < streams->len; i++) { - ps = streams->ptrs[i]; - - ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media"); - ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd"); - ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink"); - ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink"); - ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling"); - - if (redis_build_list(&ps->sfds, r, "stream_sfds", &c->callid, i, sfds)) - return -1; - - if (ps->media) - __rtp_stats_update(ps->rtp_stats, ps->media->rtp_payload_types); - } - - return 0; -} - static int json_link_medias(struct redis *r, struct call *c, struct redis_list *medias, struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) { @@ -1669,26 +1421,6 @@ static int json_link_medias(struct redis *r, struct call *c, struct redis_list * return 0; } -static int redis_link_medias(struct redis *r, struct call *c, struct redis_list *medias, - struct redis_list *streams, struct redis_list *maps, struct redis_list *tags) -{ - unsigned int i; - struct call_media *med; - - for (i = 0; i < medias->len; i++) { - med = medias->ptrs[i]; - - med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag"); - if (!med->monologue) - return -1; - if (redis_build_list(&med->streams, r, "streams", &c->callid, i, streams)) - return -1; - if (redis_build_list(&med->endpoint_maps, r, "maps", &c->callid, i, maps)) - return -1; - } - return 0; -} - static int rbl_cb_intf_sfds(str *s, GQueue *q, struct redis_list *list, void *ptr) { int i; struct intf_list *il; @@ -1734,37 +1466,25 @@ static int json_link_maps(struct redis *r, struct call *c, struct redis_list *ma return 0; } -static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *maps, - struct redis_list *sfds) -{ - unsigned int i; - struct endpoint_map *em; - - for (i = 0; i < maps->len; i++) { - em = maps->ptrs[i]; - - if (redis_build_list_cb(&em->intf_sfds, r, "map_sfds", &c->callid, em->unique_id, sfds, - rbl_cb_intf_sfds, em)) - return -1; - } - return 0; -} - -static void json_restore_call(struct redis *r, struct callmaster *m, const str *callid, enum call_type type) { +static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) { redisReply* rr_jsonStr; struct redis_hash call; struct redis_list tags, sfds, streams, medias, maps; struct call *c = NULL; + str callid; + const char *err = 0; int i; - str s; JsonReader *root_reader =0; JsonParser *parser =0; - rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(callid)); + // TODO: Maybe refactor + str_init_len(&callid, id->s, id->len); + str_shift(&callid,strlen("json-")); + + rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(&callid)); if (!rr_jsonStr) { - rlog(LOG_ERR, "Could not retrieve json data from redis for callid: " STR_FORMAT, - STR_FMT(callid)); + rlog(LOG_ERR, "Could not retrieve json data from redis for key: %s", id->s); goto err1; } @@ -1779,7 +1499,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * goto err1; } - c = call_get_or_create(callid, m, type); + c = call_get_or_create(&callid, m, type); err = "failed to create call struct"; if (!c) goto err1; @@ -1790,6 +1510,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * if (c->last_signal) goto err2; err = "'call' data incomplete"; + if (json_get_hash(&call, c, "json", -1)) goto err2; err = "'tags' incomplete"; @@ -1820,10 +1541,10 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str * c->tos = i; redis_hash_get_time_t(&c->deleted, &call, "deleted"); redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); - if (!redis_hash_get_str(&s, &call, "created_from")) - c->created_from = call_strdup(c, s.s); - if (!redis_hash_get_str(&s, &call, "created_from_addr")) - sockaddr_parse_any_str(&c->created_from_addr, &s); + if (!redis_hash_get_str(&callid, &call, "created_from")) + c->created_from = call_strdup(c, callid.s); + if (!redis_hash_get_str(&callid, &call, "created_from_addr")) + sockaddr_parse_any_str(&c->created_from_addr, &callid); err = "missing 'redis_hosted_db' value"; if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) @@ -1886,12 +1607,12 @@ err1: freeReplyObject(rr_jsonStr); log_info_clear(); if (err) { - rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(callid), + rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(&callid), err); if (c) call_destroy(c); else - redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(callid)); + redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(&callid)); } if (c) obj_put(c); @@ -1911,127 +1632,6 @@ static void redis_restore_recording(struct call *c, struct redis_hash *call) { } -static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) { - struct redis_hash call; - struct redis_list tags, sfds, streams, medias, maps; - struct call *c = NULL; - str s; - const char *err; - int i; - - c = call_get_or_create(id, m, type); - err = "failed to create call struct"; - if (!c) - goto err1; - err = "call already exists"; - if (c->last_signal) - goto err2; - err = "'call' data incomplete"; - if (redis_get_hash(&call, r, "call", id, -1)) - goto err2; - err = "'tags' incomplete"; - if (redis_get_list_hash(&tags, r, "tag", id, &call, "num_tags")) - goto err3; - err = "'sfds' incomplete"; - if (redis_get_list_hash(&sfds, r, "sfd", id, &call, "num_sfds")) - goto err4; - err = "'streams' incomplete"; - if (redis_get_list_hash(&streams, r, "stream", id, &call, "num_streams")) - goto err5; - err = "'medias' incomplete"; - if (redis_get_list_hash(&medias, r, "media", id, &call, "num_medias")) - goto err6; - err = "'maps' incomplete"; - if (redis_get_list_hash(&maps, r, "map", id, &call, "num_maps")) - goto err7; - - err = "missing 'created' timestamp"; - if (redis_hash_get_time_t(&c->created, &call, "created")) - goto err8; - err = "missing 'last signal' timestamp"; - if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal")) - goto err8; - if (redis_hash_get_int(&i, &call, "tos")) - c->tos = 184; - else - c->tos = i; - redis_hash_get_time_t(&c->deleted, &call, "deleted"); - redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted"); - if (!redis_hash_get_str(&s, &call, "created_from")) - c->created_from = call_strdup(c, s.s); - if (!redis_hash_get_str(&s, &call, "created_from_addr")) - sockaddr_parse_any_str(&c->created_from_addr, &s); - - err = "missing 'redis_hosted_db' value"; - if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db")) - goto err8; - - err = "failed to create sfds"; - if (redis_sfds(c, &sfds)) - goto err8; - err = "failed to create streams"; - if (redis_streams(c, &streams)) - goto err8; - err = "failed to create tags"; - if (redis_tags(c, &tags)) - goto err8; - err = "failed to create medias"; - if (redis_medias(r, c, &medias)) - goto err8; - err = "failed to create maps"; - if (redis_maps(c, &maps)) - goto err8; - - err = "failed to link sfds"; - if (redis_link_sfds(&sfds, &streams)) - goto err8; - err = "failed to link streams"; - if (redis_link_streams(r, c, &streams, &sfds, &medias)) - goto err8; - err = "failed to link tags"; - if (redis_link_tags(r, c, &tags, &medias)) - goto err8; - err = "failed to link medias"; - if (redis_link_medias(r, c, &medias, &streams, &maps, &tags)) - goto err8; - err = "failed to link maps"; - if (redis_link_maps(r, c, &maps, &sfds)) - goto err8; - - redis_restore_recording(c, &call); - - err = NULL; - -err8: - redis_destroy_list(&maps); -err7: - redis_destroy_list(&medias); -err6: - redis_destroy_list(&streams); -err5: - redis_destroy_list(&sfds); -err4: - redis_destroy_list(&tags); -err3: - redis_destroy_hash(&call); -err2: - rwlock_unlock_w(&c->master_lock); -err1: - log_info_clear(); - if (err) { - rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(id), err); - if (c) - call_destroy(c); - else - redisCommandNR(m->conf.redis_write->ctx, "SREM calls "PB"", STR(id)); - } - - if (c) - obj_put(c); -} - - - struct thread_ctx { struct callmaster *m; GQueue r_q; @@ -2043,6 +1643,7 @@ static void restore_thread(void *call_p, void *ctx_p) { redisReply *call = call_p; struct redis *r; str callid; + str_init(&callid,call->str); rlog(LOG_DEBUG, "Processing call ID '%.*s' from Redis", REDIS_FMT(call)); @@ -2050,14 +1651,7 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - str_init_len(&callid, call->str, call->len); - - if (ctx->m->conf.redis_multikey) { - redis_restore_call(r, ctx->m, &callid, CT_OWN_CALL); - } else { - if (str_shift_cmp(&callid, "json-") == 0) - json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); - } + json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); @@ -2085,11 +1679,7 @@ int redis_restore(struct callmaster *m, struct redis *r) { } mutex_unlock(&r->lock); - if (m->conf.redis_multikey) { - calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); - } else { - calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); - } + calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*"); if (!calls) { rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr); @@ -2299,7 +1889,6 @@ char* redis_encode_json(struct call *c) { JSON_SET_SIMPLE_CSTR("created_from",c->created_from); JSON_SET_SIMPLE_CSTR("created_from_addr",sockaddr_print_buf(&c->created_from_addr)); JSON_SET_SIMPLE("redis_hosted_db","%u",c->redis_hosted_db); - } json_builder_end_object (builder); diff --git a/daemon/redis.h b/daemon/redis.h index ce4eddf67..6cc0f9bc4 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -13,6 +13,7 @@ #include #include #include "call.h" +#include "str.h" #define REDIS_RESTORE_NUM_THREADS 4 @@ -62,7 +63,7 @@ struct redis { int no_redis_required; }; struct redis_hash { - redisReply *rr; + str s; GHashTable *ht; }; struct redis_list { @@ -72,12 +73,6 @@ struct redis_list { }; - - - - - - #if !GLIB_CHECK_VERSION(2,40,0) INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) { gboolean ret = TRUE; @@ -129,12 +124,12 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a #define define_get_int_type(name, type, func) \ static int redis_hash_get_ ## name(type *out, const struct redis_hash *h, const char *k) { \ - redisReply *r; \ + str* s; \ \ - r = g_hash_table_lookup(h->ht, k); \ - if (!r) \ + s = g_hash_table_lookup(h->ht, k); \ + if (!s) \ return -1; \ - *out = func(r->str, NULL, 10); \ + *out = func(s->s, NULL, 10); \ return 0; \ }