Merge branch 'master' of https://github.com/smititelu/rtpengine into smititelu-master

Change-Id: I425a188c632e2a6ee3f9109380d9ae114409f62b
pull/1093/head
Richard Fuchs 5 years ago
commit 4902b07ff9

@ -995,7 +995,7 @@ static void cli_incoming_ksadd(str *instr, struct cli_writer *cw) {
rwlock_lock_w(&rtpe_config.config_lock);
if (!g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) {
g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db));
redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, uint_keyspace_db);
redis_notify_subscribe_action(rtpe_redis_notify, SUBSCRIBE_KEYSPACE, uint_keyspace_db);
cw->cw_printf(cw, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db);
} else {
cw->cw_printf(cw, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db);
@ -1024,7 +1024,7 @@ static void cli_incoming_ksrm(str *instr, struct cli_writer *cw) {
cw->cw_printf(cw, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s);
} else if ((l = g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) {
// remove this keyspace
redis_notify_subscribe_action(UNSUBSCRIBE_KEYSPACE, uint_keyspace_db);
redis_notify_subscribe_action(rtpe_redis_notify, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db);
g_queue_remove(&rtpe_config.redis_subscribed_keyspaces, l->data);
cw->cw_printf(cw, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db);

@ -405,6 +405,8 @@ static void options(int *argc, char ***argv) {
{ "redis-disable-time", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_disable_time, "Number of seconds redis communication is disabled because of errors", "INT" },
{ "redis-cmd-timeout", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_cmd_timeout, "Sets a timeout in milliseconds for redis commands", "INT" },
{ "redis-connect-timeout", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_connect_timeout, "Sets a timeout in milliseconds for redis connections", "INT" },
{ "redis-delete-async", 'y', 0, G_OPTION_ARG_INT, &rtpe_config.redis_delete_async, "Enable asynchronous redis delete", NULL },
{ "redis-delete-async-interval", 'y', 0, G_OPTION_ARG_INT, &rtpe_config.redis_delete_async_interval, "Set asynchronous redis delete interval (seconds)", NULL },
{ "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &rtpe_config.b2b_url, "XMLRPC URL of B2B UA" , "STRING" },
{ "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"},
{ "log-facility-rtcp",0, 0, G_OPTION_ARG_STRING, &log_facility_rtcp_s, "Syslog facility to use for logging RTCP", "daemon|local0|...|local7"},
@ -723,6 +725,8 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) {
ini_rtpe_cfg->redis_disable_time = rtpe_config.redis_disable_time;
ini_rtpe_cfg->redis_cmd_timeout = rtpe_config.redis_cmd_timeout;
ini_rtpe_cfg->redis_connect_timeout = rtpe_config.redis_connect_timeout;
ini_rtpe_cfg->redis_delete_async = rtpe_config.redis_delete_async;
ini_rtpe_cfg->redis_delete_async_interval = rtpe_config.redis_delete_async_interval;
ini_rtpe_cfg->common.log_level = rtpe_config.common.log_level;
ini_rtpe_cfg->graphite_ep = rtpe_config.graphite_ep;
@ -990,6 +994,9 @@ int main(int argc, char **argv) {
thread_create_detach_prio(poller_timer_loop, rtpe_poller, rtpe_config.idle_scheduling, rtpe_config.idle_priority);
thread_create_detach_prio(load_thread, NULL, rtpe_config.idle_scheduling, rtpe_config.idle_priority);
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async)
thread_create_detach(redis_delete_async_loop, NULL);
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && rtpe_redis_notify)
thread_create_detach(redis_notify_loop, NULL);
@ -1023,15 +1030,26 @@ int main(int argc, char **argv) {
threads_join_all(0);
}
// free libevent
#if LIBEVENT_VERSION_NUMBER >= 0x02010100
libevent_global_shutdown();
#endif
service_notify("STOPPING=1\n");
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async)
redis_async_event_base_action(rtpe_redis_write, EVENT_BASE_LOOPBREAK);
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && rtpe_redis_notify)
redis_notify_event_base_action(EVENT_BASE_LOOPBREAK);
redis_async_event_base_action(rtpe_redis_notify, EVENT_BASE_LOOPBREAK);
threads_join_all(1);
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async)
redis_async_event_base_action(rtpe_redis_write, EVENT_BASE_FREE);
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && rtpe_redis_notify)
redis_notify_event_base_action(EVENT_BASE_FREE);
redis_async_event_base_action(rtpe_redis_notify, EVENT_BASE_FREE);
ilog(LOG_INFO, "Version %s shutting down", RTPENGINE_VERSION);

@ -12,6 +12,10 @@
#include <sys/epoll.h>
#include <glib.h>
#include <sys/time.h>
#include <main.h>
#include <redis.h>
#include <hiredis/adapters/libevent.h>
#include "aux.h"
#include "obj.h"
@ -522,6 +526,10 @@ void poller_timer_loop(void *d) {
now:
gettimeofday(&rtpe_now, NULL);
if (rtpe_redis_write && (rtpe_redis_write->async_last + rtpe_config.redis_delete_async_interval <= rtpe_now.tv_sec)) {
redis_async_event_base_action(rtpe_redis_write, EVENT_BASE_LOOPBREAK);
rtpe_redis_write->async_last = rtpe_now.tv_sec;
}
poller_timers_run(p);
}
}

@ -37,10 +37,6 @@ struct redis *rtpe_redis;
struct redis *rtpe_redis_write;
struct redis *rtpe_redis_notify;
struct event_base *rtpe_redis_notify_event_base;
struct redisAsyncContext *rtpe_redis_notify_async_context;
INLINE redisReply *redis_expect(int type, redisReply *r) {
if (!r)
@ -398,119 +394,168 @@ err:
mutex_unlock(&r->lock);
}
void redis_async_context_disconnect(const redisAsyncContext *redis_notify_async_context, int status) {
void redis_delete_async_context_connect(const redisAsyncContext *redis_delete_async_context, int status) {
if (status == REDIS_ERR) {
rtpe_redis_write->async_ctx = NULL;
if (redis_delete_async_context->errstr) {
rlog(LOG_ERROR, "redis_delete_async_context_connect error %d: %s",
redis_delete_async_context->err, redis_delete_async_context->errstr);
} else {
rlog(LOG_ERROR, "redis_delete_async_context_connect error %d: no errstr",
redis_delete_async_context->err);
}
} else if (status == REDIS_OK) {
rlog(LOG_NOTICE, "redis_delete_async_context_connect initiated by user");
} else {
rlog(LOG_ERROR, "redis_delete_async_context_connect invalid status code %d", status);
}
}
void redis_delete_async_context_disconnect(const redisAsyncContext *redis_delete_async_context, int status) {
rtpe_redis_write->async_ctx = NULL;
if (status == REDIS_ERR) {
if (redis_delete_async_context->errstr) {
rlog(LOG_ERROR, "redis_delete_async_context_disconnect error %d: %s",
redis_delete_async_context->err, redis_delete_async_context->errstr);
} else {
rlog(LOG_ERROR, "redis_delete_async_context_disconnect error %d: no errstr",
redis_delete_async_context->err);
}
} else if (status == REDIS_OK) {
rlog(LOG_NOTICE, "redis_delete_async_context_disconnect initiated by user");
} else {
rlog(LOG_ERROR, "redis_delete_async_context_disconnect invalid status code %d", status);
}
}
void redis_notify_async_context_disconnect(const redisAsyncContext *redis_notify_async_context, int status) {
if (status == REDIS_ERR) {
if (redis_notify_async_context->errstr) {
rlog(LOG_ERROR, "redis_async_context_disconnect error %d on context free: %s",
rlog(LOG_ERROR, "redis_notify_async_context_disconnect error %d on context free: %s",
redis_notify_async_context->err, redis_notify_async_context->errstr);
} else {
rlog(LOG_ERROR, "redis_async_context_disconnect error %d on context free: no errstr",
rlog(LOG_ERROR, "redis_notify_async_context_disconnect error %d on context free: no errstr",
redis_notify_async_context->err);
}
} else if (status == REDIS_OK) {
rlog(LOG_ERROR, "redis_async_context_disconnect initiated by user");
rlog(LOG_ERROR, "redis_notify_async_context_disconnect initiated by user");
} else {
rlog(LOG_ERROR, "redis_async_context_disconnect invalid status code %d", status);
rlog(LOG_ERROR, "redis_notify_async_context_disconnect invalid status code %d", status);
}
}
int redis_async_context_alloc(void) {
struct redis *r = 0;
if (!rtpe_redis_notify) {
rlog(LOG_INFO, "redis_notify database is NULL.");
// connect_cb = connect callback, disconnect_cb = disconnect callback
int redis_async_context_alloc(struct redis *r, void *connect_cb, void *disconnect_cb) {
// sanity checks
if (!r) {
rlog(LOG_ERROR, "redis_async_context_alloc: NULL r");
return -1;
} else {
rlog(LOG_DEBUG, "redis_async_context_alloc: Use Redis %s", endpoint_print_buf(&r->endpoint));
}
// get redis_notify database
r = rtpe_redis_notify;
rlog(LOG_INFO, "Use Redis %s for notifications", endpoint_print_buf(&r->endpoint));
// alloc async context
rtpe_redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port);
if (!rtpe_redis_notify_async_context) {
rlog(LOG_ERROR, "redis_notify_async_context can't create new");
r->async_ctx = redisAsyncConnect(r->host, r->endpoint.port);
if (!r->async_ctx) {
rlog(LOG_ERROR, "redis_async_context_alloc: can't create new");
return -1;
}
if (r->async_ctx->err) {
rlog(LOG_ERROR, "redis_async_context_alloc: can't create new error: %s", r->async_ctx->errstr);
return -1;
}
if (rtpe_redis_notify_async_context->err) {
rlog(LOG_ERROR, "redis_notify_async_context can't create new error: %s", rtpe_redis_notify_async_context->errstr);
// callbacks async context
if (redisAsyncSetConnectCallback(r->async_ctx, connect_cb) != REDIS_OK) {
rlog(LOG_ERROR, "redis_async_context_alloc: can't set connect callback");
return -1;
}
if (redisAsyncSetDisconnectCallback(rtpe_redis_notify_async_context, redis_async_context_disconnect) != REDIS_OK) {
rlog(LOG_ERROR, "redis_notify_async_context can't set disconnect callback");
if (redisAsyncSetDisconnectCallback(r->async_ctx, disconnect_cb) != REDIS_OK) {
rlog(LOG_ERROR, "redis_async_context_alloc: can't set disconnect callback");
return -1;
}
rlog(LOG_DEBUG, "redis_async_context_alloc: Success");
return 0;
}
int redis_notify_event_base_action(enum event_base_action action) {
if (!rtpe_redis_notify_event_base && action!=EVENT_BASE_ALLOC) {
rlog(LOG_ERROR, "redis_notify_event_base is NULL on event base action %d", action);
int redis_async_event_base_action(struct redis *r, enum event_base_action action) {
// sanity checks
if (!r) {
rlog(LOG_ERR, "redis_async_event_base_action: NULL r");
return -1;
} else {
rlog(LOG_DEBUG, "redis_async_event_base_action: Use Redis %s", endpoint_print_buf(&r->endpoint));
}
if (!r->async_ev && action != EVENT_BASE_ALLOC) {
rlog(LOG_NOTICE, "redis_async_event_base_action: async_ev is NULL on event base action %d", action);
return -1;
}
// exec event base action
switch (action) {
case EVENT_BASE_ALLOC:
rtpe_redis_notify_event_base = event_base_new();
if (!rtpe_redis_notify_event_base) {
rlog(LOG_ERROR, "Fail alloc redis_notify_event_base");
r->async_ev = event_base_new();
if (!r->async_ev) {
rlog(LOG_ERROR, "redis_async_event_base_action: Fail alloc async_ev");
return -1;
} else {
rlog(LOG_DEBUG, "Success alloc redis_notify_event_base");
rlog(LOG_DEBUG, "redis_async_event_base_action: Success alloc async_ev");
}
break;
case EVENT_BASE_FREE:
event_base_free(rtpe_redis_notify_event_base);
rlog(LOG_DEBUG, "Success free redis_notify_event_base");
event_base_free(r->async_ev);
rlog(LOG_DEBUG, "redis_async_event_base_action: Success free async_ev");
break;
case EVENT_BASE_LOOPBREAK:
if (event_base_loopbreak(rtpe_redis_notify_event_base)) {
rlog(LOG_ERROR, "Fail loopbreak redis_notify_event_base");
if (event_base_loopbreak(r->async_ev)) {
rlog(LOG_ERROR, "redis_async_event_base_action: Fail loopbreak async_ev");
return -1;
} else {
rlog(LOG_DEBUG, "Success loopbreak redis_notify_event_base");
rlog(LOG_DEBUG, "redis_async_event_base_action: Success loopbreak async_ev");
}
break;
default:
rlog(LOG_ERROR, "No event base action found: %d", action);
rlog(LOG_ERROR, "redis_async_event_base_action: No event base action found: %d", action);
return -1;
}
return 0;
}
int redis_notify_subscribe_action(enum subscribe_action action, int keyspace) {
if (!rtpe_redis_notify_async_context) {
int redis_notify_subscribe_action(struct redis *r, enum subscribe_action action, int keyspace) {
if (!r->async_ctx) {
rlog(LOG_ERROR, "redis_notify_async_context is NULL on subscribe action");
return -1;
}
if (rtpe_redis_notify_async_context->err) {
rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", rtpe_redis_notify_async_context->errstr);
if (r->async_ctx->err) {
rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", r->async_ctx->errstr);
return -1;
}
switch (action) {
case SUBSCRIBE_KEYSPACE:
if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) {
if (redisAsyncCommand(r->async_ctx, on_redis_notification, NULL, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE");
return -1;
}
break;
case UNSUBSCRIBE_KEYSPACE:
if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) {
if (redisAsyncCommand(r->async_ctx, on_redis_notification, NULL, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE");
return -1;
}
break;
case UNSUBSCRIBE_ALL:
if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "punsubscribe") != REDIS_OK) {
if (redisAsyncCommand(r->async_ctx, on_redis_notification, NULL, "punsubscribe") != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL");
return -1;
}
@ -523,33 +568,107 @@ int redis_notify_subscribe_action(enum subscribe_action action, int keyspace) {
return 0;
}
static int redis_notify(void) {
struct redis *r = 0;
static int redis_delete_async(struct redis *r) {
// sanity checks
if (!r) {
rlog(LOG_ERROR, "redis_delete_async: Don't use Redis async deletions beacause no redis/redis_write.");
return -1 ;
}
// alloc new redis async context
if (r->async_ctx == NULL && redis_async_context_alloc(r, redis_delete_async_context_connect, redis_delete_async_context_disconnect) < 0) {
r->async_ctx = NULL;
rlog(LOG_ERROR, "redis_delete_async: Failed to alloc async_ctx");
return -1;
}
// attach event base
if (redisLibeventAttach(r->async_ctx, r->async_ev) == REDIS_ERR) {
if (r->async_ctx->err) {
rlog(LOG_ERROR, "redis_delete_async: redis_delete_async_context can't attach event base error: %s", r->async_ctx->errstr);
} else {
rlog(LOG_ERROR, "redis_delete_async: redis_delete_async_context can't attach event base");
}
return -1;
}
// commands
if (r->auth) {
if (redisAsyncCommand(r->async_ctx, NULL, NULL, "AUTH %s", r->auth) != REDIS_OK) {
rlog(LOG_ERROR, "redis_delete_async: Fail redisAsyncCommand on AUTH");
return -1;
}
} else {
if (redisAsyncCommand(r->async_ctx, NULL, NULL, "PING") != REDIS_OK) {
rlog(LOG_ERROR, "redis_delete_async: Fail redisAsyncCommand on PING");
return -1;
}
}
// delete commands
gchar *redis_command;
gint redis_command_total = 0;
mutex_lock(&r->async_lock);
while (!g_queue_is_empty(&r->async_queue)) {
redis_command_total++;
redis_command = g_queue_pop_head(&r->async_queue);
if (redisAsyncCommand(r->async_ctx, NULL, NULL, redis_command) != REDIS_OK) {
rlog(LOG_ERROR, "redis_delete_async: Fail redisAsyncCommand on DELETE");
}
g_free(redis_command);
}
mutex_unlock(&r->async_lock);
rlog(LOG_NOTICE, "redis_delete_async: Queued DELETE redisAsyncCommand total: %d", redis_command_total);
// dispatch event base => thread blocks here
if (event_base_dispatch(r->async_ev) < 0) {
rlog(LOG_ERROR, "redis_delete_async: Fail event_base_dispatch()");
return -1;
}
// loopbreak => NOT NULL context
if (r->async_ctx) {
redisAsyncDisconnect(r->async_ctx);
r->async_ctx = NULL;
// disconnect => NULL context set in callback
} else {
return 1;
}
return 0;
}
static int redis_notify(struct redis *r) {
GList *l;
if (!rtpe_redis_notify) {
if (!r) {
rlog(LOG_ERROR, "redis_notify database is NULL on redis_notify()");
return -1;
}
if (!rtpe_redis_notify_async_context) {
if (!r->async_ctx) {
rlog(LOG_ERROR, "redis_notify_async_context is NULL on redis_notify()");
return -1;
}
if (!rtpe_redis_notify_event_base) {
if (!r->async_ev) {
rlog(LOG_ERROR, "redis_notify_event_base is NULL on redis_notify()");
return -1;
}
// get redis_notify database
r = rtpe_redis_notify;
rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint));
// attach event base
if (redisLibeventAttach(rtpe_redis_notify_async_context, rtpe_redis_notify_event_base) == REDIS_ERR) {
if (rtpe_redis_notify_async_context->err) {
rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", rtpe_redis_notify_async_context->errstr);
if (redisLibeventAttach(r->async_ctx, r->async_ev) == REDIS_ERR) {
if (r->async_ctx->err) {
rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", r->async_ctx->errstr);
} else {
rlog(LOG_ERROR, "redis_notify_async_context can't attach event base");
@ -557,8 +676,8 @@ static int redis_notify(void) {
return -1;
}
if (rtpe_redis_notify->auth) {
if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "AUTH %s", rtpe_redis_notify->auth) != REDIS_OK) {
if (r->auth) {
if (redisAsyncCommand(r->async_ctx, on_redis_notification, NULL, "AUTH %s", r->auth) != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on AUTH");
return -1;
}
@ -567,12 +686,12 @@ static int redis_notify(void) {
// subscribe to the values in the configured keyspaces
rwlock_lock_r(&rtpe_config.config_lock);
for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) {
redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data));
redis_notify_subscribe_action(r, SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data));
}
rwlock_unlock_r(&rtpe_config.config_lock);
// dispatch event base => thread blocks here
if (event_base_dispatch(rtpe_redis_notify_event_base) < 0) {
if (event_base_dispatch(r->async_ev) < 0) {
rlog(LOG_ERROR, "Fail event_base_dispatch()");
return -1;
}
@ -580,8 +699,35 @@ static int redis_notify(void) {
return 0;
}
static void redis_disconnect(void) {
rtpe_redis_notify_async_context = NULL;
void redis_delete_async_loop(void *d) {
struct redis *r = NULL;
// sanity checks
r = rtpe_redis_write;
if (!r) {
rlog(LOG_ERROR, "redis_delete_async_loop: Don't use Redis async deletions beacause no redis/redis_write.");
return ;
}
r->async_last = rtpe_now.tv_sec;
// init libevent for pthread usage
if (evthread_use_pthreads() < 0) {
ilog(LOG_ERROR, "redis_delete_async_loop: evthread_use_pthreads failed.");
return ;
}
// alloc libevent base
if (redis_async_event_base_action(r, EVENT_BASE_ALLOC) < 0) {
rlog(LOG_ERROR, "redis_delete_async_loop: Failed to EVENT_BASE_ALLOC.");
return ;
}
// loop (almost) forever
while (!rtpe_shutdown) {
redis_delete_async(r);
sleep(1);
}
}
void redis_notify_loop(void *d) {
@ -602,18 +748,18 @@ void redis_notify_loop(void *d) {
}
// alloc redis async context
if (redis_async_context_alloc() < 0) {
if (redis_async_context_alloc(r, NULL, redis_notify_async_context_disconnect) < 0) {
return ;
}
// alloc event base
if (redis_notify_event_base_action(EVENT_BASE_ALLOC) < 0) {
if (redis_async_event_base_action(r, EVENT_BASE_ALLOC) < 0) {
return ;
}
// initial redis_notify
if (redis_check_conn(r) == REDIS_STATE_CONNECTED) {
redis_notify_return = redis_notify();
redis_notify_return = redis_notify(r);
}
// loop redis_notify => in case of lost connection
@ -627,28 +773,23 @@ void redis_notify_loop(void *d) {
next_run = rtpe_now.tv_sec + seconds;
if (redis_check_conn(r) == REDIS_STATE_CONNECTED || redis_notify_return < 0) {
redis_disconnect();
r->async_ctx = NULL;
// alloc new redis async context upon redis breakdown
if (redis_async_context_alloc() < 0) {
if (redis_async_context_alloc(r, NULL, redis_notify_async_context_disconnect) < 0) {
continue;
}
// prepare notifications
redis_notify_return = redis_notify();
redis_notify_return = redis_notify(r);
}
}
// free libevent
#if LIBEVENT_VERSION_NUMBER >= 0x02010100
libevent_global_shutdown();
#endif
// unsubscribe notifications
redis_notify_subscribe_action(UNSUBSCRIBE_ALL, 0);
redis_notify_subscribe_action(r, UNSUBSCRIBE_ALL, 0);
// free async context
redisAsyncDisconnect(rtpe_redis_notify_async_context);
redis_disconnect();
redisAsyncDisconnect(r->async_ctx);
r->async_ctx = NULL;
}
struct redis *redis_new(const endpoint_t *ep, int db, const char *auth,
@ -773,6 +914,16 @@ static void redis_delete_call_json(struct call *c, struct redis *r) {
redis_consume(r);
}
static void redis_delete_async_call_json(struct call *c, struct redis *r) {
gchar *redis_command;
redis_command = g_strdup_printf("SELECT %i", c->redis_hosted_db);
g_queue_push_tail(&r->async_queue, redis_command);
redis_command = g_strdup_printf("DEL %.*s", c->callid.len, c->callid.s);
g_queue_push_tail(&r->async_queue, redis_command);
}
INLINE void json_builder_add_string_value_uri_enc(JsonBuilder *builder, const char* tmp, int len) {
char enc[len * 3 + 1];
str_uri_encode_len(enc, tmp, len);
@ -2276,9 +2427,21 @@ err:
/* must be called lock-free */
void redis_delete(struct call *c, struct redis *r) {
int delete_async = rtpe_config.redis_delete_async;
rlog(LOG_DEBUG, "Redis delete_async=%d", delete_async);
if (!r)
return;
if (delete_async) {
mutex_lock(&r->async_lock);
rwlock_lock_r(&c->master_lock);
redis_delete_async_call_json(c, r);
rwlock_unlock_r(&c->master_lock);
mutex_unlock(&r->async_lock);
return;
}
mutex_lock(&r->lock);
// coverity[sleep : FALSE]
if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) {

@ -70,6 +70,8 @@ struct rtpengine_config {
int redis_disable_time;
int redis_cmd_timeout;
int redis_connect_timeout;
int redis_delete_async;
int redis_delete_async_interval;
char *redis_auth;
char *redis_write_auth;
int num_threads;

@ -61,6 +61,12 @@ struct redis {
int no_redis_required;
int consecutive_errors;
time_t restore_tick;
struct event_base *async_ev;
struct redisAsyncContext *async_ctx;
mutex_t async_lock;
GQueue async_queue;
int async_last;
};
struct redis_hash {
@ -78,9 +84,6 @@ extern struct redis *rtpe_redis;
extern struct redis *rtpe_redis_write;
extern struct redis *rtpe_redis_notify;
extern struct event_base *rtpe_redis_notify_event_base;
extern struct redisAsyncContext *rtpe_redis_notify_async_context;
#if !GLIB_CHECK_VERSION(2,40,0)
@ -99,6 +102,7 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v)
#define rlog(l, x...) ilog(l | LOG_FLAG_RESTORE, x)
void redis_notify_loop(void *d);
void redis_delete_async_loop(void *d);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int);
@ -108,8 +112,8 @@ void redis_update(struct call *, struct redis *);
void redis_update_onekey(struct call *c, struct redis *r);
void redis_delete(struct call *, struct redis *);
void redis_wipe(struct redis *);
int redis_notify_event_base_action(enum event_base_action);
int redis_notify_subscribe_action(enum subscribe_action action, int keyspace);
int redis_async_event_base_action(struct redis *r, enum event_base_action);
int redis_notify_subscribe_action(struct redis *r, enum subscribe_action action, int keyspace);
int redis_set_timeout(struct redis* r, int timeout);
int redis_reconnect(struct redis* r);

Loading…
Cancel
Save