Removes multikey stuff

pull/323/head
Frederic-Philippe Metz 8 years ago
parent f3364d9e7d
commit 14b37ebfe5

@ -595,12 +595,8 @@ static void callmaster_timer(void *ptr) {
rwlock_unlock_r(&sfd->call->master_lock);
if (update) {
if (m->conf.redis_multikey) {
redis_update(ps->call, m->conf.redis_write);
} else {
redis_update_onekey(ps->call, m->conf.redis_write);
}
}
next:
g_hash_table_remove(hlp.addr_sfd, &ep);

@ -463,7 +463,6 @@ struct callmaster_config {
GQueue *redis_subscribed_keyspaces;
struct redisAsyncContext *redis_notify_async_context;
unsigned int redis_expires_secs;
unsigned int redis_multikey;
char *b2b_url;
unsigned char default_tos;
enum xmlrpc_format fmt;

@ -186,11 +186,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o
sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP);
rwlock_unlock_w(&c->master_lock);
if (m->conf.redis_multikey) {
redis_update(c, m->conf.redis_write);
} else {
redis_update_onekey(c, m->conf.redis_write);
}
gettimeofday(&(monologue->started), NULL);
@ -338,11 +334,7 @@ out2:
rwlock_unlock_w(&c->master_lock);
streams_free(&s);
if (m->conf.redis_multikey) {
redis_update(c, m->conf.redis_write);
} else {
redis_update_onekey(c, m->conf.redis_write);
}
ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret));
obj_put(c);
@ -774,11 +766,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
rwlock_unlock_w(&call->master_lock);
if (!flags.no_redis_update) {
if (m->conf.redis_multikey) {
redis_update(call, m->conf.redis_write);
} else {
redis_update_onekey(call,m->conf.redis_write);
}
} else {
ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag");
}

@ -62,7 +62,6 @@ static unsigned int timeout;
static unsigned int silent_timeout;
static unsigned int final_timeout;
static unsigned int redis_expires = 86400;
static unsigned int redis_multikey = 0;
static int port_min = 30000;
static int port_max = 40000;
static int max_sessions = -1;
@ -299,7 +298,6 @@ static void options(int *argc, char ***argv) {
{ "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" },
{ "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" },
{ "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" },
{ "redis-multikey", 0, 0, G_OPTION_ARG_NONE, &redis_multikey, "Use multiple redis keys for storing the call (old behaviour) DEPRECATED", NULL },
{ "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL },
{ "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" },
{ "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"},
@ -628,7 +626,6 @@ no_kernel:
}
mc.redis_expires_secs = redis_expires;
mc.redis_multikey = redis_multikey;
ctx->m->conf = mc;

@ -1472,12 +1472,8 @@ out:
ca = sfd->call ? : NULL;
if (ca && update) {
if (ca->callmaster->conf.redis_multikey) {
redis_update(ca, ca->callmaster->conf.redis_write);
} else {
redis_update_onekey(ca, ca->callmaster->conf.redis_write);
}
}
done:
log_info_clear();
}

@ -286,17 +286,10 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
goto err;
// now at <key>
if (cm->conf.redis_multikey) {
if (str_shift_cmp(&keyspace_id, "notifier-")) {
rlog(LOG_ERROR,"Redis-Notifier: The prefix 'notifier-' to determine the redis key has not been found in the redis notification !\n");
goto err;
}
} else {
if (str_shift_cmp(&keyspace_id, "json-")) {
rlog(LOG_ERROR,"Redis-Notifier: The prefix 'json-' to determine the redis key has not been found in the redis notification !\n");
goto err;
}
}
callid = keyspace_id;
@ -463,31 +456,6 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a
return -1;
}
if (cm->conf.redis_multikey) {
switch (action) {
case SUBSCRIBE_KEYSPACE:
if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:notifier-*", keyspace) != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on SUBSCRIBE_KEYSPACE");
return -1;
}
break;
case UNSUBSCRIBE_KEYSPACE:
if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:notifier-*", keyspace) != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_KEYSPACE");
return -1;
}
break;
case UNSUBSCRIBE_ALL:
if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_ALL");
return -1;
}
break;
default:
rlog(LOG_ERROR, "No subscribe action found: %d", action);
return -1;
}
} else {
switch (action) {
case SUBSCRIBE_KEYSPACE:
if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:json-*", keyspace) != REDIS_OK) {
@ -511,7 +479,6 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a
rlog(LOG_ERROR, "No subscribe action found: %d", action);
return -1;
}
}
return 0;
}
@ -718,42 +685,12 @@ static int redis_check_conn(struct redis *r) {
return REDIS_STATE_RECONNECTED;
}
static void redis_delete_list(struct redis *r, const str *callid, const char *prefix, GQueue *q) {
unsigned int i;
for (i = 0; i < g_queue_get_length(q); i++)
redis_pipe(r, "DEL %s-"PB"-%u", prefix, STR(callid), i);
}
/* called with r->lock held and c->master_lock held */
static void redis_delete_call_json(struct call *c, struct redis *r) {
redis_pipe(r, "DEL json-"PB"", STR(&c->callid));
redis_consume(r);
}
/* called with r->lock held and c->master_lock held */
static void redis_delete_call(struct call *c, struct redis *r) {
redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid));
redis_pipe(r, "SREM calls "PB"", STR(&c->callid));
redis_pipe(r, "DEL call-"PB"", STR(&c->callid));
redis_delete_list(r, &c->callid, "sfd", &c->stream_fds);
redis_delete_list(r, &c->callid, "stream", &c->streams);
redis_delete_list(r, &c->callid, "stream_sfds", &c->streams);
redis_delete_list(r, &c->callid, "tag", &c->monologues);
redis_delete_list(r, &c->callid, "other_tags", &c->monologues);
redis_delete_list(r, &c->callid, "medias", &c->monologues);
redis_delete_list(r, &c->callid, "media", &c->medias);
redis_delete_list(r, &c->callid, "streams", &c->medias);
redis_delete_list(r, &c->callid, "maps", &c->medias);
redis_delete_list(r, &c->callid, "payload_types", &c->medias);
redis_delete_list(r, &c->callid, "map", &c->endpoint_maps);
redis_delete_list(r, &c->callid, "map_sfds", &c->endpoint_maps);
redis_consume(r);
}
INLINE void json_builder_add_string_value_uri_enc(JsonBuilder *builder, const char* tmp, int len) {
char enc[len * 3 + 1];
str_uri_encode_len(enc, tmp, len);
@ -768,18 +705,6 @@ INLINE char* json_reader_get_string_value_uri_enc(JsonReader *root_reader, int *
return ret; // must be free'd
}
// stolen from libhiredis
/* Create a reply object */
INLINE redisReply *createReplyObject(int type) {
redisReply *r = calloc(1,sizeof(*r));
if (r == NULL)
return NULL;
r->type = type;
return r;
}
static int json_get_hash(struct redis_hash *out, struct call* c,
const char *key, unsigned int id)
{
@ -1763,30 +1688,6 @@ static int json_update_crypto_params(JsonBuilder *builder, const char *pref,
return 0;
}
static int redis_update_crypto_params(struct redis *r, const char *pref, const str *callid,
unsigned int unique_id,
const char *key, const struct crypto_params *p)
{
if (!p->crypto_suite)
return -1;
redis_pipe(r, "HMSET %s-"PB"-%u %s-crypto_suite %s %s-master_key "PB" %s-master_salt "PB" "
"%s-unenc-srtp %i %s-unenc-srtcp %i %s-unauth-srtp %i",
pref, STR(callid), unique_id,
key, p->crypto_suite->name,
key, S_LEN(p->master_key, sizeof(p->master_key)),
key, S_LEN(p->master_salt, sizeof(p->master_salt)),
key, p->session_params.unencrypted_srtp,
key, p->session_params.unencrypted_srtcp,
key, p->session_params.unauthenticated_srtp);
if (p->mki)
redis_pipe(r, "HMSET %s-"PB"-%u %s-mki "PB"",
pref, STR(callid), unique_id,
key,
S_LEN(p->mki, p->mki_len));
return 0;
}
static void json_update_crypto_context(JsonBuilder *builder, const char *pref,
unsigned int unique_id,
const struct crypto_context *c)
@ -1801,34 +1702,6 @@ static void json_update_crypto_context(JsonBuilder *builder, const char *pref,
}
static void redis_update_crypto_context(struct redis *r, const char *pref, const str *callid,
unsigned int unique_id,
const struct crypto_context *c)
{
if (redis_update_crypto_params(r, pref, callid, unique_id, "", &c->params))
return;
redis_pipe(r, "HMSET %s-"PB"-%u last_index "UINT64F" ssrc %u",
pref, STR(callid), unique_id,
c->last_index, (unsigned) c->ssrc);
}
static void redis_update_endpoint(struct redis *r, const char *pref, const str *callid,
unsigned int unique_id,
const char *key, const struct endpoint *e)
{
redis_pipe(r, "HMSET %s-"PB"-%u %s %s",
pref, STR(callid), unique_id,
key, endpoint_print_buf(e));
}
static void redis_update_stats(struct redis *r, const char *pref, const str *callid,
unsigned int unique_id,
const char *key, const struct stats *s)
{
redis_pipe(r, "HMSET %s-"PB"-%u %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"",
pref, STR(callid), unique_id,
key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes),
key, atomic64_get(&s->errors));
}
static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref,
unsigned int unique_id,
const struct dtls_fingerprint *f)
@ -1840,17 +1713,6 @@ static void json_update_dtls_fingerprint(JsonBuilder *builder, const char *pref,
JSON_SET_SIMPLE_LEN("fingerprint", sizeof(f->digest), (char *) f->digest);
}
static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, const str *callid,
unsigned int unique_id,
const struct dtls_fingerprint *f)
{
if (!f->hash_func)
return;
redis_pipe(r, "HMSET %s-"PB"-%u hash_func %s fingerprint "PB"",
pref, STR(callid), unique_id,
f->hash_func->name,
S_LEN(f->digest, sizeof(f->digest)));
}
/**
* encodes the few (k,v) pairs for one call under one json structure
*/
@ -2208,317 +2070,6 @@ static void redis_update_recording(struct redis *r, struct call *c) {
STR(&rec->metadata), rec->meta_prefix);
}
/*
* Redis data structure:
*
* SET: calls %s %s %s ...
*
* HASH: call-$callid num_sfds %u num_streams %u num_medias %u num_tags %u num_maps %u
*
* HASH: sfd-$callid-$num stream %u
*
* HASH: stream-$callid-$num media %u sfd %u rtp_sink %u rtcp_sink %u rtcp_sibling %u
* LIST: stream_sfds-$callid-$num %u %u ...
*
* HASH: tag-$callid-$num
* LIST: other_tags-$callid-$num %u %u ...
* LIST: medias-$callid-$num %u %u ...
*
* HASH: media-$callid-$num tag %u
* LIST: streams-$callid-$num %u %u ...
* LIST: maps-$callid-$num %u %u ...
*
* HASH: map-$callid-$num
* LIST: map_sfds-$callid-$num %u %u ...
*/
/* must be called lock-free */
void redis_update(struct call *c, struct redis *r) {
GList *l, *n, *k, *m;
struct call_monologue *ml, *ml2;
struct call_media *media;
struct packet_stream *ps;
struct stream_fd *sfd;
struct intf_list *il;
struct endpoint_map *ep;
struct rtp_payload_type *pt;
unsigned int redis_expires_s;
if (!r)
return;
mutex_lock(&r->lock);
if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) {
mutex_unlock(&r->lock);
return ;
}
rwlock_lock_r(&c->master_lock);
redis_expires_s = c->callmaster->conf.redis_expires_secs;
c->redis_hosted_db = r->db;
if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) {
rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error.");
goto err;
}
redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid));
redis_pipe(r, "SREM calls "PB"", STR(&c->callid));
redis_pipe(r, "DEL call-"PB"", STR(&c->callid));
redis_pipe(r, "HMSET call-"PB" created %llu last_signal %llu tos %i deleted %llu "
"num_sfds %u num_streams %u num_medias %u num_tags %u "
"num_maps %u "
"ml_deleted %llu created_from %s created_from_addr %s redis_hosted_db %u",
STR(&c->callid), (long long unsigned) c->created, (long long unsigned) c->last_signal,
(int) c->tos, (long long unsigned) c->deleted,
g_queue_get_length(&c->stream_fds), g_queue_get_length(&c->streams),
g_queue_get_length(&c->medias), g_queue_get_length(&c->monologues),
g_queue_get_length(&c->endpoint_maps),
(long long unsigned) c->ml_deleted,
c->created_from, sockaddr_print_buf(&c->created_from_addr),
c->redis_hosted_db);
/* XXX DTLS cert?? */
redis_update_recording(r, c);
redis_pipe(r, "DEL sfd-"PB"-0", STR(&c->callid));
for (l = c->stream_fds.head; l; l = l->next) {
sfd = l->data;
redis_pipe(r, "HMSET sfd-"PB"-%u pref_family %s localport %u logical_intf "PB" "
"local_intf_uid %u "
"stream %u",
STR(&c->callid), sfd->unique_id,
sfd->local_intf->logical->preferred_family->rfc_name,
sfd->socket.local.port,
STR(&sfd->local_intf->logical->name),
sfd->local_intf->unique_id,
sfd->stream->unique_id);
redis_update_crypto_context(r, "sfd", &c->callid, sfd->unique_id, &sfd->crypto);
/* XXX DTLS?? */
redis_pipe(r, "EXPIRE sfd-"PB"-%u %u", STR(&c->callid), sfd->unique_id, redis_expires_s);
redis_pipe(r, "DEL sfd-"PB"-%u", STR(&c->callid), sfd->unique_id + 1);
}
redis_pipe(r, "DEL stream-"PB"-0 stream_sfds-"PB"-0", STR(&c->callid), STR(&c->callid));
for (l = c->streams.head; l; l = l->next) {
ps = l->data;
mutex_lock(&ps->in_lock);
mutex_lock(&ps->out_lock);
redis_pipe(r, "HMSET stream-"PB"-%u media %u sfd %u rtp_sink %u "
"rtcp_sink %u rtcp_sibling %u last_packet "UINT64F" "
"ps_flags %u component %u",
STR(&c->callid), ps->unique_id,
ps->media->unique_id,
ps->selected_sfd ? ps->selected_sfd->unique_id : -1,
ps->rtp_sink ? ps->rtp_sink->unique_id : -1,
ps->rtcp_sink ? ps->rtcp_sink->unique_id : -1,
ps->rtcp_sibling ? ps->rtcp_sibling->unique_id : -1,
atomic64_get(&ps->last_packet),
ps->ps_flags,
ps->component);
redis_update_endpoint(r, "stream", &c->callid, ps->unique_id, "endpoint", &ps->endpoint);
redis_update_endpoint(r, "stream", &c->callid, ps->unique_id, "advertised_endpoint",
&ps->advertised_endpoint);
redis_update_stats(r, "stream", &c->callid, ps->unique_id, "stats", &ps->stats);
redis_update_crypto_context(r, "stream", &c->callid, ps->unique_id, &ps->crypto);
/* XXX DTLS?? */
for (k = ps->sfds.head; k; k = k->next) {
sfd = k->data;
redis_pipe(r, "RPUSH stream_sfds-"PB"-%u %u",
STR(&c->callid), ps->unique_id,
sfd->unique_id);
}
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->out_lock);
redis_pipe(r, "EXPIRE stream-"PB"-%u %u", STR(&c->callid), ps->unique_id, redis_expires_s);
redis_pipe(r, "EXPIRE stream_sfds-"PB"-%u %u", STR(&c->callid), ps->unique_id, redis_expires_s);
redis_pipe(r, "DEL stream-"PB"-%u stream_sfds-"PB"-%u",
STR(&c->callid), ps->unique_id + 1,
STR(&c->callid), ps->unique_id + 1);
}
redis_pipe(r, "DEL tag-"PB"-0 other_tags-"PB"-0 medias-"PB"-0",
STR(&c->callid), STR(&c->callid), STR(&c->callid));
for (l = c->monologues.head; l; l = l->next) {
ml = l->data;
redis_pipe(r, "HMSET tag-"PB"-%u created %llu active %u deleted %llu",
STR(&c->callid), ml->unique_id,
(long long unsigned) ml->created,
ml->active_dialogue ? ml->active_dialogue->unique_id : -1,
(long long unsigned) ml->deleted);
if (ml->tag.s)
redis_pipe(r, "HMSET tag-"PB"-%u tag "PB"",
STR(&c->callid), ml->unique_id,
STR(&ml->tag));
if (ml->viabranch.s)
redis_pipe(r, "HMSET tag-"PB"-%u via-branch "PB"",
STR(&c->callid), ml->unique_id,
STR(&ml->viabranch));
k = g_hash_table_get_values(ml->other_tags);
for (m = k; m; m = m->next) {
ml2 = m->data;
redis_pipe(r, "RPUSH other_tags-"PB"-%u %u",
STR(&c->callid), ml->unique_id,
ml2->unique_id);
}
g_list_free(k);
for (k = ml->medias.head; k; k = k->next) {
media = k->data;
redis_pipe(r, "RPUSH medias-"PB"-%u %u",
STR(&c->callid), ml->unique_id,
media->unique_id);
}
redis_pipe(r, "EXPIRE tag-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s);
redis_pipe(r, "EXPIRE other_tags-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s);
redis_pipe(r, "EXPIRE medias-"PB"-%u %u", STR(&c->callid), ml->unique_id, redis_expires_s);
redis_pipe(r, "DEL tag-"PB"-%u other_tags-"PB"-%u medias-"PB"-%u",
STR(&c->callid), ml->unique_id + 1,
STR(&c->callid), ml->unique_id + 1,
STR(&c->callid), ml->unique_id + 1);
}
redis_pipe(r, "DEL media-"PB"-0 streams-"PB"-0 maps-"PB"-0 payload_types-"PB"-0",
STR(&c->callid), STR(&c->callid), STR(&c->callid), STR(&c->callid));
for (l = c->medias.head; l; l = l->next) {
media = l->data;
redis_pipe(r, "HMSET media-"PB"-%u "
"tag %u "
"index %u "
"type "PB" protocol %s desired_family %s "
"sdes_in_tag %u sdes_out_tag %u logical_intf "PB" "
"media_flags %u",
STR(&c->callid), media->unique_id,
media->monologue->unique_id,
media->index,
STR(&media->type), media->protocol ? media->protocol->name : "",
media->desired_family ? media->desired_family->rfc_name : "",
media->sdes_in.tag, media->sdes_out.tag,
STR(&media->logical_intf->name),
media->media_flags);
redis_update_crypto_params(r, "media", &c->callid, media->unique_id, "sdes_in",
&media->sdes_in.params);
redis_update_crypto_params(r, "media", &c->callid, media->unique_id, "sdes_out",
&media->sdes_out.params);
redis_update_dtls_fingerprint(r, "media", &c->callid, media->unique_id, &media->fingerprint);
for (m = media->streams.head; m; m = m->next) {
ps = m->data;
redis_pipe(r, "RPUSH streams-"PB"-%u %u",
STR(&c->callid), media->unique_id,
ps->unique_id);
}
for (m = media->endpoint_maps.head; m; m = m->next) {
ep = m->data;
redis_pipe(r, "RPUSH maps-"PB"-%u %u",
STR(&c->callid), media->unique_id,
ep->unique_id);
}
k = g_hash_table_get_values(media->rtp_payload_types);
for (m = k; m; m = m->next) {
pt = m->data;
redis_pipe(r, "RPUSH payload_types-"PB"-%u %u/"PB"/%u/"PB"",
STR(&c->callid), media->unique_id,
pt->payload_type, STR(&pt->encoding),
pt->clock_rate, STR(&pt->encoding_parameters));
}
g_list_free(k);
redis_pipe(r, "EXPIRE media-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s);
redis_pipe(r, "EXPIRE streams-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s);
redis_pipe(r, "EXPIRE maps-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s);
redis_pipe(r, "EXPIRE payload_types-"PB"-%u %u", STR(&c->callid), media->unique_id, redis_expires_s);
redis_pipe(r, "DEL media-"PB"-%u streams-"PB"-%u maps-"PB"-%u payload_types-"PB"-%u",
STR(&c->callid), media->unique_id + 1,
STR(&c->callid), media->unique_id + 1,
STR(&c->callid), media->unique_id + 1,
STR(&c->callid), media->unique_id + 1);
}
redis_pipe(r, "DEL map-"PB"-0 map_sfds-"PB"-0",
STR(&c->callid), STR(&c->callid));
for (l = c->endpoint_maps.head; l; l = l->next) {
ep = l->data;
redis_pipe(r, "HMSET map-"PB"-%u wildcard %i num_ports %u intf_preferred_family %s "
"logical_intf "PB"",
STR(&c->callid), ep->unique_id,
ep->wildcard,
ep->num_ports,
ep->logical_intf->preferred_family->rfc_name,
STR(&ep->logical_intf->name));
redis_update_endpoint(r, "map", &c->callid, ep->unique_id, "endpoint", &ep->endpoint);
for (m = ep->intf_sfds.head; m; m = m->next) {
il = m->data;
redis_pipe(r, "RPUSH map_sfds-"PB"-%u loc-%u",
STR(&c->callid), ep->unique_id,
il->local_intf->unique_id);
for (n = il->list.head; n; n = n->next) {
sfd = n->data;
redis_pipe(r, "RPUSH map_sfds-"PB"-%u %u",
STR(&c->callid), ep->unique_id,
sfd->unique_id);
}
}
redis_pipe(r, "EXPIRE map-"PB"-%u %u", STR(&c->callid), ep->unique_id, redis_expires_s);
redis_pipe(r, "EXPIRE map_sfds-"PB"-%u %u", STR(&c->callid), ep->unique_id, redis_expires_s);
redis_pipe(r, "DEL map-"PB"-%u map_sfds-"PB"-%u",
STR(&c->callid), ep->unique_id + 1,
STR(&c->callid), ep->unique_id + 1);
}
redis_pipe(r, "EXPIRE call-"PB" %u", STR(&c->callid), redis_expires_s);
redis_pipe(r, "SADD calls "PB"", STR(&c->callid));
redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid));
redis_pipe(r, "EXPIRE notifier-"PB" %u", STR(&c->callid), redis_expires_s);
redis_consume(r);
mutex_unlock(&r->lock);
rwlock_unlock_r(&c->master_lock);
return;
err:
mutex_unlock(&r->lock);
rwlock_unlock_r(&c->master_lock);
if (r->ctx->err)
rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr);
redisFree(r->ctx);
r->ctx = NULL;
}
/* must be called lock-free */
void redis_delete(struct call *c, struct redis *r) {
if (!r)
@ -2534,11 +2085,7 @@ void redis_delete(struct call *c, struct redis *r) {
if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db))
goto err;
if (c->callmaster->conf.redis_multikey) {
redis_delete_call(c, r);
} else {
redis_delete_call_json(c, r);
}
rwlock_unlock_r(&c->master_lock);
mutex_unlock(&r->lock);

Loading…
Cancel
Save