|
|
|
|
@ -67,7 +67,6 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...)
|
|
|
|
|
|
|
|
|
|
#define REDIS_FMT(x) (x)->len, (x)->str
|
|
|
|
|
|
|
|
|
|
static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type);
|
|
|
|
|
static int redis_check_conn(struct redis *r);
|
|
|
|
|
static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type);
|
|
|
|
|
|
|
|
|
|
@ -310,10 +309,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (cm->conf.redis_multikey && strncmp(rr->element[3]->str,"sadd",4)==0)
|
|
|
|
|
redis_restore_call(r, cm, &callid, CT_FOREIGN_CALL);
|
|
|
|
|
|
|
|
|
|
if (!cm->conf.redis_multikey && strncmp(rr->element[3]->str,"set",3)==0) {
|
|
|
|
|
if (strncmp(rr->element[3]->str,"set",3)==0) {
|
|
|
|
|
c = call_get(&callid, cm);
|
|
|
|
|
if (c) {
|
|
|
|
|
rwlock_unlock_w(&c->master_lock);
|
|
|
|
|
@ -785,8 +781,7 @@ INLINE redisReply *createReplyObject(int type) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int json_get_hash(struct redis_hash *out, struct call* c,
|
|
|
|
|
const char *key,
|
|
|
|
|
unsigned int id)
|
|
|
|
|
const char *key, unsigned int id)
|
|
|
|
|
{
|
|
|
|
|
static unsigned int MAXKEYLENGTH = 512;
|
|
|
|
|
char key_concatted[MAXKEYLENGTH];
|
|
|
|
|
@ -803,7 +798,6 @@ static int json_get_hash(struct redis_hash *out, struct call* c,
|
|
|
|
|
if (rc>=MAXKEYLENGTH)
|
|
|
|
|
rlog(LOG_ERROR,"Json key too long.");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (!json_reader_read_member(c->root_reader, key_concatted)) {
|
|
|
|
|
rlog(LOG_ERROR, "Could not read json member: %s",key_concatted);
|
|
|
|
|
goto err;
|
|
|
|
|
@ -812,7 +806,6 @@ static int json_get_hash(struct redis_hash *out, struct call* c,
|
|
|
|
|
out->ht = g_hash_table_new_full(g_str_hash, g_str_equal, free, freeReplyObject);
|
|
|
|
|
if (!out->ht)
|
|
|
|
|
goto err;
|
|
|
|
|
out->rr = 0;
|
|
|
|
|
|
|
|
|
|
gchar **members = json_reader_list_members(c->root_reader);
|
|
|
|
|
gchar **orig_members = members;
|
|
|
|
|
@ -823,16 +816,11 @@ static int json_get_hash(struct redis_hash *out, struct call* c,
|
|
|
|
|
rlog(LOG_ERROR, "Could not read json member: %s",*members);
|
|
|
|
|
goto err3;
|
|
|
|
|
}
|
|
|
|
|
out->rr = createReplyObject(REDIS_REPLY_STRING);
|
|
|
|
|
if (!out->rr) {
|
|
|
|
|
rlog(LOG_ERROR, "Could not create redis reply object (out of memory?)");
|
|
|
|
|
goto err3;
|
|
|
|
|
}
|
|
|
|
|
out->rr->str = json_reader_get_string_value_uri_enc(c->root_reader, &out->rr->len);
|
|
|
|
|
|
|
|
|
|
str_init_len(&out->s,(char*)json_reader_get_string_value(c->root_reader),strlen((char*)json_reader_get_string_value(c->root_reader)));
|
|
|
|
|
char* tmp = strdup(*members);
|
|
|
|
|
|
|
|
|
|
if (g_hash_table_insert_check(out->ht, tmp, out->rr) != TRUE) {
|
|
|
|
|
if (g_hash_table_insert_check(out->ht, tmp, str_dup(&out->s)) != TRUE) {
|
|
|
|
|
ilog(LOG_WARNING,"Key %s already exists", tmp);
|
|
|
|
|
goto err3;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -847,68 +835,11 @@ static int json_get_hash(struct redis_hash *out, struct call* c,
|
|
|
|
|
|
|
|
|
|
err3:
|
|
|
|
|
g_strfreev(members);
|
|
|
|
|
if (out->rr)
|
|
|
|
|
freeReplyObject(out->rr);
|
|
|
|
|
g_hash_table_destroy(out->ht);
|
|
|
|
|
err:
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const str *which,
|
|
|
|
|
unsigned int id)
|
|
|
|
|
{
|
|
|
|
|
redisReply *k, *v;
|
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
out->ht = g_hash_table_new(g_str_hash, g_str_equal);
|
|
|
|
|
if (!out->ht)
|
|
|
|
|
goto err;
|
|
|
|
|
if (id == -1)
|
|
|
|
|
out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"", key, STR(which));
|
|
|
|
|
else
|
|
|
|
|
out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"-%u", key, STR(which), id);
|
|
|
|
|
if (!out->rr)
|
|
|
|
|
goto err2;
|
|
|
|
|
|
|
|
|
|
for (i = 1; i < out->rr->elements; i += 2) {
|
|
|
|
|
k = out->rr->element[i - 1];
|
|
|
|
|
v = out->rr->element[i];
|
|
|
|
|
if (k->type != REDIS_REPLY_STRING || v->type != REDIS_REPLY_STRING)
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
if (g_hash_table_insert_check(out->ht, k->str, v) != TRUE)
|
|
|
|
|
goto err3;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
err3:
|
|
|
|
|
if (out->rr)
|
|
|
|
|
freeReplyObject(out->rr);
|
|
|
|
|
err2:
|
|
|
|
|
g_hash_table_destroy(out->ht);
|
|
|
|
|
err:
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void redis_destroy_hash(struct redis_hash *rh) {
|
|
|
|
|
|
|
|
|
|
if (rh->rr)
|
|
|
|
|
freeReplyObject(rh->rr);
|
|
|
|
|
g_hash_table_destroy(rh->ht);
|
|
|
|
|
}
|
|
|
|
|
static void redis_destroy_list(struct redis_list *rl) {
|
|
|
|
|
unsigned int i;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < rl->len; i++) {
|
|
|
|
|
redis_destroy_hash(&rl->rh[i]);
|
|
|
|
|
}
|
|
|
|
|
free(rl->rh);
|
|
|
|
|
free(rl->ptrs);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void json_destroy_hash(struct redis_hash *rh) {
|
|
|
|
|
g_hash_table_destroy(rh->ht);
|
|
|
|
|
}
|
|
|
|
|
@ -924,7 +855,7 @@ static void json_destroy_list(struct redis_list *rl) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *k) {
|
|
|
|
|
redisReply *r;
|
|
|
|
|
str *r;
|
|
|
|
|
|
|
|
|
|
r = g_hash_table_lookup(h->ht, k);
|
|
|
|
|
if (!r) {
|
|
|
|
|
@ -932,7 +863,7 @@ static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *
|
|
|
|
|
out->len = 0;
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
out->s = r->str;
|
|
|
|
|
out->s = r->s;
|
|
|
|
|
out->len = r->len;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
@ -1054,34 +985,6 @@ static int json_build_list_cb(GQueue *q, struct call *c, const char *key,
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int redis_build_list_cb(GQueue *q, struct redis *r, const char *key, const str *callid,
|
|
|
|
|
unsigned int idx, struct redis_list *list,
|
|
|
|
|
int (*cb)(str *, GQueue *, struct redis_list *, void *), void *ptr)
|
|
|
|
|
{
|
|
|
|
|
redisReply *rr;
|
|
|
|
|
int i;
|
|
|
|
|
str s;
|
|
|
|
|
|
|
|
|
|
rr = redis_get(r, REDIS_REPLY_ARRAY, "LRANGE %s-"PB"-%u 0 -1", key, STR(callid), idx);
|
|
|
|
|
if (!rr)
|
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < rr->elements; i++) {
|
|
|
|
|
if (rr->element[i]->type != REDIS_REPLY_STRING) {
|
|
|
|
|
freeReplyObject(rr);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
str_init_len(&s, rr->element[i]->str, rr->element[i]->len);
|
|
|
|
|
if (cb(&s, q, list, ptr)) {
|
|
|
|
|
freeReplyObject(rr);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
freeReplyObject(rr);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
static int rbl_cb_simple(str *s, GQueue *q, struct redis_list *list, void *ptr) {
|
|
|
|
|
int j;
|
|
|
|
|
j = str_to_i(s, 0);
|
|
|
|
|
@ -1095,12 +998,6 @@ static int json_build_list(GQueue *q, struct call *c, const char *key, const str
|
|
|
|
|
return json_build_list_cb(q, c, key, idx, list, rbl_cb_simple, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int redis_build_list(GQueue *q, struct redis *r, const char *key, const str *callid,
|
|
|
|
|
unsigned int idx, struct redis_list *list)
|
|
|
|
|
{
|
|
|
|
|
return redis_build_list_cb(q, r, key, callid, idx, list, rbl_cb_simple, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int json_get_list_hash(struct redis_list *out, struct call* c,
|
|
|
|
|
const char *key,
|
|
|
|
|
const struct redis_hash *rh, const char *rh_num_key)
|
|
|
|
|
@ -1134,41 +1031,6 @@ err1:
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const str *id,
|
|
|
|
|
const struct redis_hash *rh, const char *rh_num_key)
|
|
|
|
|
{
|
|
|
|
|
unsigned int i;
|
|
|
|
|
|
|
|
|
|
if (redis_hash_get_unsigned(&out->len, rh, rh_num_key))
|
|
|
|
|
return -1;
|
|
|
|
|
out->rh = malloc(sizeof(*out->rh) * out->len);
|
|
|
|
|
if (!out->rh)
|
|
|
|
|
return -1;
|
|
|
|
|
out->ptrs = malloc(sizeof(*out->ptrs) * out->len);
|
|
|
|
|
if (!out->ptrs)
|
|
|
|
|
goto err1;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < out->len; i++) {
|
|
|
|
|
if (redis_get_hash(&out->rh[i], r, key, id, i))
|
|
|
|
|
goto err2;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
err2:
|
|
|
|
|
free(out->ptrs);
|
|
|
|
|
while (i) {
|
|
|
|
|
i--;
|
|
|
|
|
redis_destroy_hash(&out->rh[i]);
|
|
|
|
|
}
|
|
|
|
|
err1:
|
|
|
|
|
free(out->rh);
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* can return 1, 0 or -1 */
|
|
|
|
|
static int redis_hash_get_crypto_params(struct crypto_params *out, const struct redis_hash *h, const char *k) {
|
|
|
|
|
str s;
|
|
|
|
|
@ -1432,62 +1294,6 @@ static int json_medias(struct redis *r, struct call *c, struct redis_list *media
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
static int redis_medias(struct redis *r, struct call *c, struct redis_list *medias) {
|
|
|
|
|
unsigned int i;
|
|
|
|
|
struct redis_hash *rh;
|
|
|
|
|
struct call_media *med;
|
|
|
|
|
str s;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < medias->len; i++) {
|
|
|
|
|
rh = &medias->rh[i];
|
|
|
|
|
|
|
|
|
|
/* from call.c:__get_media() */
|
|
|
|
|
med = uid_slice_alloc0(med, &c->medias);
|
|
|
|
|
med->call = c;
|
|
|
|
|
med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL,
|
|
|
|
|
__payload_type_free);
|
|
|
|
|
|
|
|
|
|
if (redis_hash_get_unsigned(&med->index, rh, "index"))
|
|
|
|
|
return -1;
|
|
|
|
|
if (redis_hash_get_str(&s, rh, "type"))
|
|
|
|
|
return -1;
|
|
|
|
|
call_str_cpy(c, &med->type, &s);
|
|
|
|
|
|
|
|
|
|
if (redis_hash_get_str(&s, rh, "protocol"))
|
|
|
|
|
return -1;
|
|
|
|
|
med->protocol = transport_protocol(&s);
|
|
|
|
|
|
|
|
|
|
if (redis_hash_get_str(&s, rh, "desired_family"))
|
|
|
|
|
return -1;
|
|
|
|
|
med->desired_family = get_socket_family_rfc(&s);
|
|
|
|
|
|
|
|
|
|
if (redis_hash_get_str(&s, rh, "logical_intf")
|
|
|
|
|
|| !(med->logical_intf = get_logical_interface(&s, med->desired_family, 0)))
|
|
|
|
|
{
|
|
|
|
|
rlog(LOG_ERR, "unable to find specified local interface");
|
|
|
|
|
med->logical_intf = get_logical_interface(NULL, med->desired_family, 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (redis_hash_get_unsigned(&med->sdes_in.tag, rh, "sdes_in_tag"))
|
|
|
|
|
return -1;
|
|
|
|
|
if (redis_hash_get_unsigned(&med->sdes_out.tag, rh, "sdes_out_tag"))
|
|
|
|
|
return -1;
|
|
|
|
|
if (redis_hash_get_unsigned((unsigned int *) &med->media_flags, rh,
|
|
|
|
|
"media_flags"))
|
|
|
|
|
return -1;
|
|
|
|
|
if (redis_hash_get_crypto_params(&med->sdes_in.params, rh, "sdes_in") < 0)
|
|
|
|
|
return -1;
|
|
|
|
|
if (redis_hash_get_crypto_params(&med->sdes_out.params, rh, "sdes_out") < 0)
|
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
|
|
redis_build_list_cb(NULL, r, "payload_types", &c->callid, i, NULL, rbl_cb_plts, med);
|
|
|
|
|
/* XXX dtls */
|
|
|
|
|
|
|
|
|
|
medias->ptrs[i] = med;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int redis_maps(struct call *c, struct redis_list *maps) {
|
|
|
|
|
unsigned int i;
|
|
|
|
|
@ -1570,35 +1376,6 @@ static int json_link_tags(struct call *c, struct redis_list *tags, struct redis_
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int redis_link_tags(struct redis *r, struct call *c, struct redis_list *tags, struct redis_list *medias)
|
|
|
|
|
{
|
|
|
|
|
unsigned int i;
|
|
|
|
|
struct call_monologue *ml, *other_ml;
|
|
|
|
|
GQueue q = G_QUEUE_INIT;
|
|
|
|
|
GList *l;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < tags->len; i++) {
|
|
|
|
|
ml = tags->ptrs[i];
|
|
|
|
|
|
|
|
|
|
ml->active_dialogue = redis_list_get_ptr(tags, &tags->rh[i], "active");
|
|
|
|
|
|
|
|
|
|
if (redis_build_list(&q, r, "other_tags", &c->callid, i, tags))
|
|
|
|
|
return -1;
|
|
|
|
|
for (l = q.head; l; l = l->next) {
|
|
|
|
|
other_ml = l->data;
|
|
|
|
|
if (!other_ml)
|
|
|
|
|
return -1;
|
|
|
|
|
g_hash_table_insert(ml->other_tags, &other_ml->tag, other_ml);
|
|
|
|
|
}
|
|
|
|
|
g_queue_clear(&q);
|
|
|
|
|
|
|
|
|
|
if (redis_build_list(&ml->medias, r, "medias", &c->callid, i, medias))
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int json_link_streams(struct call *c, struct redis_list *streams,
|
|
|
|
|
struct redis_list *sfds, struct redis_list *medias)
|
|
|
|
|
{
|
|
|
|
|
@ -1624,31 +1401,6 @@ static int json_link_streams(struct call *c, struct redis_list *streams,
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int redis_link_streams(struct redis *r, struct call *c, struct redis_list *streams,
|
|
|
|
|
struct redis_list *sfds, struct redis_list *medias)
|
|
|
|
|
{
|
|
|
|
|
unsigned int i;
|
|
|
|
|
struct packet_stream *ps;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < streams->len; i++) {
|
|
|
|
|
ps = streams->ptrs[i];
|
|
|
|
|
|
|
|
|
|
ps->media = redis_list_get_ptr(medias, &streams->rh[i], "media");
|
|
|
|
|
ps->selected_sfd = redis_list_get_ptr(sfds, &streams->rh[i], "sfd");
|
|
|
|
|
ps->rtp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtp_sink");
|
|
|
|
|
ps->rtcp_sink = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sink");
|
|
|
|
|
ps->rtcp_sibling = redis_list_get_ptr(streams, &streams->rh[i], "rtcp_sibling");
|
|
|
|
|
|
|
|
|
|
if (redis_build_list(&ps->sfds, r, "stream_sfds", &c->callid, i, sfds))
|
|
|
|
|
return -1;
|
|
|
|
|
|
|
|
|
|
if (ps->media)
|
|
|
|
|
__rtp_stats_update(ps->rtp_stats, ps->media->rtp_payload_types);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int json_link_medias(struct redis *r, struct call *c, struct redis_list *medias,
|
|
|
|
|
struct redis_list *streams, struct redis_list *maps, struct redis_list *tags)
|
|
|
|
|
{
|
|
|
|
|
@ -1669,26 +1421,6 @@ static int json_link_medias(struct redis *r, struct call *c, struct redis_list *
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int redis_link_medias(struct redis *r, struct call *c, struct redis_list *medias,
|
|
|
|
|
struct redis_list *streams, struct redis_list *maps, struct redis_list *tags)
|
|
|
|
|
{
|
|
|
|
|
unsigned int i;
|
|
|
|
|
struct call_media *med;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < medias->len; i++) {
|
|
|
|
|
med = medias->ptrs[i];
|
|
|
|
|
|
|
|
|
|
med->monologue = redis_list_get_ptr(tags, &medias->rh[i], "tag");
|
|
|
|
|
if (!med->monologue)
|
|
|
|
|
return -1;
|
|
|
|
|
if (redis_build_list(&med->streams, r, "streams", &c->callid, i, streams))
|
|
|
|
|
return -1;
|
|
|
|
|
if (redis_build_list(&med->endpoint_maps, r, "maps", &c->callid, i, maps))
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int rbl_cb_intf_sfds(str *s, GQueue *q, struct redis_list *list, void *ptr) {
|
|
|
|
|
int i;
|
|
|
|
|
struct intf_list *il;
|
|
|
|
|
@ -1734,37 +1466,25 @@ static int json_link_maps(struct redis *r, struct call *c, struct redis_list *ma
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *maps,
|
|
|
|
|
struct redis_list *sfds)
|
|
|
|
|
{
|
|
|
|
|
unsigned int i;
|
|
|
|
|
struct endpoint_map *em;
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < maps->len; i++) {
|
|
|
|
|
em = maps->ptrs[i];
|
|
|
|
|
|
|
|
|
|
if (redis_build_list_cb(&em->intf_sfds, r, "map_sfds", &c->callid, em->unique_id, sfds,
|
|
|
|
|
rbl_cb_intf_sfds, em))
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void json_restore_call(struct redis *r, struct callmaster *m, const str *callid, enum call_type type) {
|
|
|
|
|
static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) {
|
|
|
|
|
redisReply* rr_jsonStr;
|
|
|
|
|
struct redis_hash call;
|
|
|
|
|
struct redis_list tags, sfds, streams, medias, maps;
|
|
|
|
|
struct call *c = NULL;
|
|
|
|
|
str callid;
|
|
|
|
|
|
|
|
|
|
const char *err = 0;
|
|
|
|
|
int i;
|
|
|
|
|
str s;
|
|
|
|
|
JsonReader *root_reader =0;
|
|
|
|
|
JsonParser *parser =0;
|
|
|
|
|
|
|
|
|
|
rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(callid));
|
|
|
|
|
// TODO: Maybe refactor
|
|
|
|
|
str_init_len(&callid, id->s, id->len);
|
|
|
|
|
str_shift(&callid,strlen("json-"));
|
|
|
|
|
|
|
|
|
|
rr_jsonStr = redis_get(r, REDIS_REPLY_STRING, "GET json-" PB, STR(&callid));
|
|
|
|
|
if (!rr_jsonStr) {
|
|
|
|
|
rlog(LOG_ERR, "Could not retrieve json data from redis for callid: " STR_FORMAT,
|
|
|
|
|
STR_FMT(callid));
|
|
|
|
|
rlog(LOG_ERR, "Could not retrieve json data from redis for key: %s", id->s);
|
|
|
|
|
goto err1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -1779,7 +1499,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str *
|
|
|
|
|
goto err1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c = call_get_or_create(callid, m, type);
|
|
|
|
|
c = call_get_or_create(&callid, m, type);
|
|
|
|
|
err = "failed to create call struct";
|
|
|
|
|
if (!c)
|
|
|
|
|
goto err1;
|
|
|
|
|
@ -1790,6 +1510,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str *
|
|
|
|
|
if (c->last_signal)
|
|
|
|
|
goto err2;
|
|
|
|
|
err = "'call' data incomplete";
|
|
|
|
|
|
|
|
|
|
if (json_get_hash(&call, c, "json", -1))
|
|
|
|
|
goto err2;
|
|
|
|
|
err = "'tags' incomplete";
|
|
|
|
|
@ -1820,10 +1541,10 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str *
|
|
|
|
|
c->tos = i;
|
|
|
|
|
redis_hash_get_time_t(&c->deleted, &call, "deleted");
|
|
|
|
|
redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted");
|
|
|
|
|
if (!redis_hash_get_str(&s, &call, "created_from"))
|
|
|
|
|
c->created_from = call_strdup(c, s.s);
|
|
|
|
|
if (!redis_hash_get_str(&s, &call, "created_from_addr"))
|
|
|
|
|
sockaddr_parse_any_str(&c->created_from_addr, &s);
|
|
|
|
|
if (!redis_hash_get_str(&callid, &call, "created_from"))
|
|
|
|
|
c->created_from = call_strdup(c, callid.s);
|
|
|
|
|
if (!redis_hash_get_str(&callid, &call, "created_from_addr"))
|
|
|
|
|
sockaddr_parse_any_str(&c->created_from_addr, &callid);
|
|
|
|
|
|
|
|
|
|
err = "missing 'redis_hosted_db' value";
|
|
|
|
|
if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db"))
|
|
|
|
|
@ -1886,12 +1607,12 @@ err1:
|
|
|
|
|
freeReplyObject(rr_jsonStr);
|
|
|
|
|
log_info_clear();
|
|
|
|
|
if (err) {
|
|
|
|
|
rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(callid),
|
|
|
|
|
rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(&callid),
|
|
|
|
|
err);
|
|
|
|
|
if (c)
|
|
|
|
|
call_destroy(c);
|
|
|
|
|
else
|
|
|
|
|
redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(callid));
|
|
|
|
|
redisCommandNR(m->conf.redis_write->ctx, "DEL json-" PB, STR(&callid));
|
|
|
|
|
}
|
|
|
|
|
if (c)
|
|
|
|
|
obj_put(c);
|
|
|
|
|
@ -1911,127 +1632,6 @@ static void redis_restore_recording(struct call *c, struct redis_hash *call) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void redis_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type) {
|
|
|
|
|
struct redis_hash call;
|
|
|
|
|
struct redis_list tags, sfds, streams, medias, maps;
|
|
|
|
|
struct call *c = NULL;
|
|
|
|
|
str s;
|
|
|
|
|
const char *err;
|
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
c = call_get_or_create(id, m, type);
|
|
|
|
|
err = "failed to create call struct";
|
|
|
|
|
if (!c)
|
|
|
|
|
goto err1;
|
|
|
|
|
err = "call already exists";
|
|
|
|
|
if (c->last_signal)
|
|
|
|
|
goto err2;
|
|
|
|
|
err = "'call' data incomplete";
|
|
|
|
|
if (redis_get_hash(&call, r, "call", id, -1))
|
|
|
|
|
goto err2;
|
|
|
|
|
err = "'tags' incomplete";
|
|
|
|
|
if (redis_get_list_hash(&tags, r, "tag", id, &call, "num_tags"))
|
|
|
|
|
goto err3;
|
|
|
|
|
err = "'sfds' incomplete";
|
|
|
|
|
if (redis_get_list_hash(&sfds, r, "sfd", id, &call, "num_sfds"))
|
|
|
|
|
goto err4;
|
|
|
|
|
err = "'streams' incomplete";
|
|
|
|
|
if (redis_get_list_hash(&streams, r, "stream", id, &call, "num_streams"))
|
|
|
|
|
goto err5;
|
|
|
|
|
err = "'medias' incomplete";
|
|
|
|
|
if (redis_get_list_hash(&medias, r, "media", id, &call, "num_medias"))
|
|
|
|
|
goto err6;
|
|
|
|
|
err = "'maps' incomplete";
|
|
|
|
|
if (redis_get_list_hash(&maps, r, "map", id, &call, "num_maps"))
|
|
|
|
|
goto err7;
|
|
|
|
|
|
|
|
|
|
err = "missing 'created' timestamp";
|
|
|
|
|
if (redis_hash_get_time_t(&c->created, &call, "created"))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "missing 'last signal' timestamp";
|
|
|
|
|
if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal"))
|
|
|
|
|
goto err8;
|
|
|
|
|
if (redis_hash_get_int(&i, &call, "tos"))
|
|
|
|
|
c->tos = 184;
|
|
|
|
|
else
|
|
|
|
|
c->tos = i;
|
|
|
|
|
redis_hash_get_time_t(&c->deleted, &call, "deleted");
|
|
|
|
|
redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted");
|
|
|
|
|
if (!redis_hash_get_str(&s, &call, "created_from"))
|
|
|
|
|
c->created_from = call_strdup(c, s.s);
|
|
|
|
|
if (!redis_hash_get_str(&s, &call, "created_from_addr"))
|
|
|
|
|
sockaddr_parse_any_str(&c->created_from_addr, &s);
|
|
|
|
|
|
|
|
|
|
err = "missing 'redis_hosted_db' value";
|
|
|
|
|
if (redis_hash_get_unsigned((unsigned int *) &c->redis_hosted_db, &call, "redis_hosted_db"))
|
|
|
|
|
goto err8;
|
|
|
|
|
|
|
|
|
|
err = "failed to create sfds";
|
|
|
|
|
if (redis_sfds(c, &sfds))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "failed to create streams";
|
|
|
|
|
if (redis_streams(c, &streams))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "failed to create tags";
|
|
|
|
|
if (redis_tags(c, &tags))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "failed to create medias";
|
|
|
|
|
if (redis_medias(r, c, &medias))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "failed to create maps";
|
|
|
|
|
if (redis_maps(c, &maps))
|
|
|
|
|
goto err8;
|
|
|
|
|
|
|
|
|
|
err = "failed to link sfds";
|
|
|
|
|
if (redis_link_sfds(&sfds, &streams))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "failed to link streams";
|
|
|
|
|
if (redis_link_streams(r, c, &streams, &sfds, &medias))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "failed to link tags";
|
|
|
|
|
if (redis_link_tags(r, c, &tags, &medias))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "failed to link medias";
|
|
|
|
|
if (redis_link_medias(r, c, &medias, &streams, &maps, &tags))
|
|
|
|
|
goto err8;
|
|
|
|
|
err = "failed to link maps";
|
|
|
|
|
if (redis_link_maps(r, c, &maps, &sfds))
|
|
|
|
|
goto err8;
|
|
|
|
|
|
|
|
|
|
redis_restore_recording(c, &call);
|
|
|
|
|
|
|
|
|
|
err = NULL;
|
|
|
|
|
|
|
|
|
|
err8:
|
|
|
|
|
redis_destroy_list(&maps);
|
|
|
|
|
err7:
|
|
|
|
|
redis_destroy_list(&medias);
|
|
|
|
|
err6:
|
|
|
|
|
redis_destroy_list(&streams);
|
|
|
|
|
err5:
|
|
|
|
|
redis_destroy_list(&sfds);
|
|
|
|
|
err4:
|
|
|
|
|
redis_destroy_list(&tags);
|
|
|
|
|
err3:
|
|
|
|
|
redis_destroy_hash(&call);
|
|
|
|
|
err2:
|
|
|
|
|
rwlock_unlock_w(&c->master_lock);
|
|
|
|
|
err1:
|
|
|
|
|
log_info_clear();
|
|
|
|
|
if (err) {
|
|
|
|
|
rlog(LOG_WARNING, "Failed to restore call ID '" STR_FORMAT "' from Redis: %s", STR_FMT(id), err);
|
|
|
|
|
if (c)
|
|
|
|
|
call_destroy(c);
|
|
|
|
|
else
|
|
|
|
|
redisCommandNR(m->conf.redis_write->ctx, "SREM calls "PB"", STR(id));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (c)
|
|
|
|
|
obj_put(c);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct thread_ctx {
|
|
|
|
|
struct callmaster *m;
|
|
|
|
|
GQueue r_q;
|
|
|
|
|
@ -2043,6 +1643,7 @@ static void restore_thread(void *call_p, void *ctx_p) {
|
|
|
|
|
redisReply *call = call_p;
|
|
|
|
|
struct redis *r;
|
|
|
|
|
str callid;
|
|
|
|
|
str_init(&callid,call->str);
|
|
|
|
|
|
|
|
|
|
rlog(LOG_DEBUG, "Processing call ID '%.*s' from Redis", REDIS_FMT(call));
|
|
|
|
|
|
|
|
|
|
@ -2050,14 +1651,7 @@ static void restore_thread(void *call_p, void *ctx_p) {
|
|
|
|
|
r = g_queue_pop_head(&ctx->r_q);
|
|
|
|
|
mutex_unlock(&ctx->r_m);
|
|
|
|
|
|
|
|
|
|
str_init_len(&callid, call->str, call->len);
|
|
|
|
|
|
|
|
|
|
if (ctx->m->conf.redis_multikey) {
|
|
|
|
|
redis_restore_call(r, ctx->m, &callid, CT_OWN_CALL);
|
|
|
|
|
} else {
|
|
|
|
|
if (str_shift_cmp(&callid, "json-") == 0)
|
|
|
|
|
json_restore_call(r, ctx->m, &callid, CT_OWN_CALL);
|
|
|
|
|
}
|
|
|
|
|
json_restore_call(r, ctx->m, &callid, CT_OWN_CALL);
|
|
|
|
|
|
|
|
|
|
mutex_lock(&ctx->r_m);
|
|
|
|
|
g_queue_push_tail(&ctx->r_q, r);
|
|
|
|
|
@ -2085,11 +1679,7 @@ int redis_restore(struct callmaster *m, struct redis *r) {
|
|
|
|
|
}
|
|
|
|
|
mutex_unlock(&r->lock);
|
|
|
|
|
|
|
|
|
|
if (m->conf.redis_multikey) {
|
|
|
|
|
calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls");
|
|
|
|
|
} else {
|
|
|
|
|
calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*");
|
|
|
|
|
}
|
|
|
|
|
calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS json-*");
|
|
|
|
|
|
|
|
|
|
if (!calls) {
|
|
|
|
|
rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr);
|
|
|
|
|
@ -2299,7 +1889,6 @@ char* redis_encode_json(struct call *c) {
|
|
|
|
|
JSON_SET_SIMPLE_CSTR("created_from",c->created_from);
|
|
|
|
|
JSON_SET_SIMPLE_CSTR("created_from_addr",sockaddr_print_buf(&c->created_from_addr));
|
|
|
|
|
JSON_SET_SIMPLE("redis_hosted_db","%u",c->redis_hosted_db);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
json_builder_end_object (builder);
|
|
|
|
|
|