From 2ebf5a1526c1ce8093b3011a1e23c333b3f99400 Mon Sep 17 00:00:00 2001 From: Stefan Mititelu Date: Fri, 4 Sep 2020 17:59:53 +0300 Subject: [PATCH] Add redis async delete --- daemon/cli.c | 4 +- daemon/main.c | 22 +++- daemon/poller.c | 8 ++ daemon/redis.c | 311 ++++++++++++++++++++++++++++++++++++------------ include/main.h | 2 + include/redis.h | 14 ++- 6 files changed, 278 insertions(+), 83 deletions(-) diff --git a/daemon/cli.c b/daemon/cli.c index 3f1845a96..d81c1369b 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -995,7 +995,7 @@ static void cli_incoming_ksadd(str *instr, struct streambuf *replybuffer) { rwlock_lock_w(&rtpe_config.config_lock); if (!g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) { g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); - redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, uint_keyspace_db); + redis_notify_subscribe_action(rtpe_redis_notify, SUBSCRIBE_KEYSPACE, uint_keyspace_db); streambuf_printf(replybuffer, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db); } else { streambuf_printf(replybuffer, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db); @@ -1024,7 +1024,7 @@ static void cli_incoming_ksrm(str *instr, struct streambuf *replybuffer) { streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s); } else if ((l = g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) { // remove this keyspace - redis_notify_subscribe_action(UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); + redis_notify_subscribe_action(rtpe_redis_notify, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); g_queue_remove(&rtpe_config.redis_subscribed_keyspaces, l->data); streambuf_printf(replybuffer, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db); diff --git a/daemon/main.c b/daemon/main.c index 6ac007bfa..4b44b4c55 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -404,6 +404,8 @@ static void options(int *argc, char ***argv) { { "redis-disable-time", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_disable_time, "Number of seconds redis communication is disabled because of errors", "INT" }, { "redis-cmd-timeout", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_cmd_timeout, "Sets a timeout in milliseconds for redis commands", "INT" }, { "redis-connect-timeout", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_connect_timeout, "Sets a timeout in milliseconds for redis connections", "INT" }, + { "redis-delete-async", 'y', 0, G_OPTION_ARG_INT, &rtpe_config.redis_delete_async, "Enable asynchronous redis delete", NULL }, + { "redis-delete-async-interval", 'y', 0, G_OPTION_ARG_INT, &rtpe_config.redis_delete_async_interval, "Set asynchronous redis delete interval (seconds)", NULL }, { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &rtpe_config.b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"}, { "log-facility-rtcp",0, 0, G_OPTION_ARG_STRING, &log_facility_rtcp_s, "Syslog facility to use for logging RTCP", "daemon|local0|...|local7"}, @@ -717,6 +719,8 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { ini_rtpe_cfg->redis_disable_time = rtpe_config.redis_disable_time; ini_rtpe_cfg->redis_cmd_timeout = rtpe_config.redis_cmd_timeout; ini_rtpe_cfg->redis_connect_timeout = rtpe_config.redis_connect_timeout; + ini_rtpe_cfg->redis_delete_async = rtpe_config.redis_delete_async; + ini_rtpe_cfg->redis_delete_async_interval = rtpe_config.redis_delete_async_interval; ini_rtpe_cfg->common.log_level = rtpe_config.common.log_level; ini_rtpe_cfg->graphite_ep = rtpe_config.graphite_ep; @@ -969,6 +973,9 @@ int main(int argc, char **argv) { thread_create_detach_prio(poller_timer_loop, rtpe_poller, rtpe_config.idle_scheduling, rtpe_config.idle_priority); thread_create_detach_prio(load_thread, NULL, rtpe_config.idle_scheduling, rtpe_config.idle_priority); + if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async) + thread_create_detach(redis_delete_async_loop, NULL); + if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && rtpe_redis_notify) thread_create_detach(redis_notify_loop, NULL); @@ -1008,15 +1015,26 @@ int main(int argc, char **argv) { threads_join_all(0); } + // free libevent +#if LIBEVENT_VERSION_NUMBER >= 0x02010100 + libevent_global_shutdown(); +#endif + service_notify("STOPPING=1\n"); + if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async) + redis_async_event_base_action(rtpe_redis_write, EVENT_BASE_LOOPBREAK); + if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && rtpe_redis_notify) - redis_notify_event_base_action(EVENT_BASE_LOOPBREAK); + redis_async_event_base_action(rtpe_redis_notify, EVENT_BASE_LOOPBREAK); threads_join_all(1); + if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async) + redis_async_event_base_action(rtpe_redis_write, EVENT_BASE_FREE); + if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && rtpe_redis_notify) - redis_notify_event_base_action(EVENT_BASE_FREE); + redis_async_event_base_action(rtpe_redis_notify, EVENT_BASE_FREE); ilog(LOG_INFO, "Version %s shutting down", RTPENGINE_VERSION); diff --git a/daemon/poller.c b/daemon/poller.c index cb1107e6c..6bce14fc1 100644 --- a/daemon/poller.c +++ b/daemon/poller.c @@ -12,6 +12,10 @@ #include #include #include +#include +#include +#include + #include "aux.h" #include "obj.h" @@ -522,6 +526,10 @@ void poller_timer_loop(void *d) { now: gettimeofday(&rtpe_now, NULL); + if (rtpe_redis_write && (rtpe_redis_write->async_last + rtpe_config.redis_delete_async_interval <= rtpe_now.tv_sec)) { + redis_async_event_base_action(rtpe_redis_write, EVENT_BASE_LOOPBREAK); + rtpe_redis_write->async_last = rtpe_now.tv_sec; + } poller_timers_run(p); } } diff --git a/daemon/redis.c b/daemon/redis.c index 11d10ddcc..66ac0c814 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -37,10 +37,6 @@ struct redis *rtpe_redis; struct redis *rtpe_redis_write; struct redis *rtpe_redis_notify; -struct event_base *rtpe_redis_notify_event_base; -struct redisAsyncContext *rtpe_redis_notify_async_context; - - INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) @@ -398,119 +394,168 @@ err: mutex_unlock(&r->lock); } -void redis_async_context_disconnect(const redisAsyncContext *redis_notify_async_context, int status) { +void redis_delete_async_context_connect(const redisAsyncContext *redis_delete_async_context, int status) { + if (status == REDIS_ERR) { + rtpe_redis_write->async_ctx = NULL; + if (redis_delete_async_context->errstr) { + rlog(LOG_ERROR, "redis_delete_async_context_connect error %d: %s", + redis_delete_async_context->err, redis_delete_async_context->errstr); + } else { + rlog(LOG_ERROR, "redis_delete_async_context_connect error %d: no errstr", + redis_delete_async_context->err); + } + } else if (status == REDIS_OK) { + rlog(LOG_NOTICE, "redis_delete_async_context_connect initiated by user"); + } else { + rlog(LOG_ERROR, "redis_delete_async_context_connect invalid status code %d", status); + } +} + +void redis_delete_async_context_disconnect(const redisAsyncContext *redis_delete_async_context, int status) { + rtpe_redis_write->async_ctx = NULL; + if (status == REDIS_ERR) { + if (redis_delete_async_context->errstr) { + rlog(LOG_ERROR, "redis_delete_async_context_disconnect error %d: %s", + redis_delete_async_context->err, redis_delete_async_context->errstr); + } else { + rlog(LOG_ERROR, "redis_delete_async_context_disconnect error %d: no errstr", + redis_delete_async_context->err); + } + } else if (status == REDIS_OK) { + rlog(LOG_NOTICE, "redis_delete_async_context_disconnect initiated by user"); + } else { + rlog(LOG_ERROR, "redis_delete_async_context_disconnect invalid status code %d", status); + } +} + +void redis_notify_async_context_disconnect(const redisAsyncContext *redis_notify_async_context, int status) { if (status == REDIS_ERR) { if (redis_notify_async_context->errstr) { - rlog(LOG_ERROR, "redis_async_context_disconnect error %d on context free: %s", + rlog(LOG_ERROR, "redis_notify_async_context_disconnect error %d on context free: %s", redis_notify_async_context->err, redis_notify_async_context->errstr); } else { - rlog(LOG_ERROR, "redis_async_context_disconnect error %d on context free: no errstr", + rlog(LOG_ERROR, "redis_notify_async_context_disconnect error %d on context free: no errstr", redis_notify_async_context->err); } } else if (status == REDIS_OK) { - rlog(LOG_ERROR, "redis_async_context_disconnect initiated by user"); + rlog(LOG_ERROR, "redis_notify_async_context_disconnect initiated by user"); } else { - rlog(LOG_ERROR, "redis_async_context_disconnect invalid status code %d", status); + rlog(LOG_ERROR, "redis_notify_async_context_disconnect invalid status code %d", status); } } -int redis_async_context_alloc(void) { - struct redis *r = 0; - - if (!rtpe_redis_notify) { - rlog(LOG_INFO, "redis_notify database is NULL."); +// connect_cb = connect callback, disconnect_cb = disconnect callback +int redis_async_context_alloc(struct redis *r, void *connect_cb, void *disconnect_cb) { + // sanity checks + if (!r) { + rlog(LOG_ERROR, "redis_async_context_alloc: NULL r"); return -1; + } else { + rlog(LOG_DEBUG, "redis_async_context_alloc: Use Redis %s", endpoint_print_buf(&r->endpoint)); } - // get redis_notify database - r = rtpe_redis_notify; - rlog(LOG_INFO, "Use Redis %s for notifications", endpoint_print_buf(&r->endpoint)); - // alloc async context - rtpe_redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); - if (!rtpe_redis_notify_async_context) { - rlog(LOG_ERROR, "redis_notify_async_context can't create new"); + r->async_ctx = redisAsyncConnect(r->host, r->endpoint.port); + if (!r->async_ctx) { + rlog(LOG_ERROR, "redis_async_context_alloc: can't create new"); + return -1; + } + + if (r->async_ctx->err) { + rlog(LOG_ERROR, "redis_async_context_alloc: can't create new error: %s", r->async_ctx->errstr); return -1; } - if (rtpe_redis_notify_async_context->err) { - rlog(LOG_ERROR, "redis_notify_async_context can't create new error: %s", rtpe_redis_notify_async_context->errstr); + + // callbacks async context + if (redisAsyncSetConnectCallback(r->async_ctx, connect_cb) != REDIS_OK) { + rlog(LOG_ERROR, "redis_async_context_alloc: can't set connect callback"); return -1; } - if (redisAsyncSetDisconnectCallback(rtpe_redis_notify_async_context, redis_async_context_disconnect) != REDIS_OK) { - rlog(LOG_ERROR, "redis_notify_async_context can't set disconnect callback"); + if (redisAsyncSetDisconnectCallback(r->async_ctx, disconnect_cb) != REDIS_OK) { + rlog(LOG_ERROR, "redis_async_context_alloc: can't set disconnect callback"); return -1; } + rlog(LOG_DEBUG, "redis_async_context_alloc: Success"); + return 0; } -int redis_notify_event_base_action(enum event_base_action action) { - if (!rtpe_redis_notify_event_base && action!=EVENT_BASE_ALLOC) { - rlog(LOG_ERROR, "redis_notify_event_base is NULL on event base action %d", action); +int redis_async_event_base_action(struct redis *r, enum event_base_action action) { + // sanity checks + if (!r) { + rlog(LOG_ERR, "redis_async_event_base_action: NULL r"); + return -1; + } else { + rlog(LOG_DEBUG, "redis_async_event_base_action: Use Redis %s", endpoint_print_buf(&r->endpoint)); + } + + if (!r->async_ev && action != EVENT_BASE_ALLOC) { + rlog(LOG_NOTICE, "redis_async_event_base_action: async_ev is NULL on event base action %d", action); return -1; } // exec event base action switch (action) { case EVENT_BASE_ALLOC: - rtpe_redis_notify_event_base = event_base_new(); - if (!rtpe_redis_notify_event_base) { - rlog(LOG_ERROR, "Fail alloc redis_notify_event_base"); + r->async_ev = event_base_new(); + if (!r->async_ev) { + rlog(LOG_ERROR, "redis_async_event_base_action: Fail alloc async_ev"); return -1; } else { - rlog(LOG_DEBUG, "Success alloc redis_notify_event_base"); + rlog(LOG_DEBUG, "redis_async_event_base_action: Success alloc async_ev"); } break; case EVENT_BASE_FREE: - event_base_free(rtpe_redis_notify_event_base); - rlog(LOG_DEBUG, "Success free redis_notify_event_base"); + event_base_free(r->async_ev); + rlog(LOG_DEBUG, "redis_async_event_base_action: Success free async_ev"); break; case EVENT_BASE_LOOPBREAK: - if (event_base_loopbreak(rtpe_redis_notify_event_base)) { - rlog(LOG_ERROR, "Fail loopbreak redis_notify_event_base"); + if (event_base_loopbreak(r->async_ev)) { + rlog(LOG_ERROR, "redis_async_event_base_action: Fail loopbreak async_ev"); return -1; } else { - rlog(LOG_DEBUG, "Success loopbreak redis_notify_event_base"); + rlog(LOG_DEBUG, "redis_async_event_base_action: Success loopbreak async_ev"); } break; default: - rlog(LOG_ERROR, "No event base action found: %d", action); + rlog(LOG_ERROR, "redis_async_event_base_action: No event base action found: %d", action); return -1; } return 0; } -int redis_notify_subscribe_action(enum subscribe_action action, int keyspace) { - if (!rtpe_redis_notify_async_context) { +int redis_notify_subscribe_action(struct redis *r, enum subscribe_action action, int keyspace) { + if (!r->async_ctx) { rlog(LOG_ERROR, "redis_notify_async_context is NULL on subscribe action"); return -1; } - if (rtpe_redis_notify_async_context->err) { - rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", rtpe_redis_notify_async_context->errstr); + if (r->async_ctx->err) { + rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", r->async_ctx->errstr); return -1; } switch (action) { case SUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(r->async_ctx, on_redis_notification, NULL, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_KEYSPACE: - if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { + if (redisAsyncCommand(r->async_ctx, on_redis_notification, NULL, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); return -1; } break; case UNSUBSCRIBE_ALL: - if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "punsubscribe") != REDIS_OK) { + if (redisAsyncCommand(r->async_ctx, on_redis_notification, NULL, "punsubscribe") != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); return -1; } @@ -523,33 +568,107 @@ int redis_notify_subscribe_action(enum subscribe_action action, int keyspace) { return 0; } -static int redis_notify(void) { - struct redis *r = 0; +static int redis_delete_async(struct redis *r) { + // sanity checks + if (!r) { + rlog(LOG_ERROR, "redis_delete_async: Don't use Redis async deletions beacause no redis/redis_write."); + return -1 ; + } + + // alloc new redis async context + if (r->async_ctx == NULL && redis_async_context_alloc(r, redis_delete_async_context_connect, redis_delete_async_context_disconnect) < 0) { + r->async_ctx = NULL; + rlog(LOG_ERROR, "redis_delete_async: Failed to alloc async_ctx"); + return -1; + } + + // attach event base + if (redisLibeventAttach(r->async_ctx, r->async_ev) == REDIS_ERR) { + if (r->async_ctx->err) { + rlog(LOG_ERROR, "redis_delete_async: redis_delete_async_context can't attach event base error: %s", r->async_ctx->errstr); + } else { + rlog(LOG_ERROR, "redis_delete_async: redis_delete_async_context can't attach event base"); + + } + return -1; + } + + // commands + if (r->auth) { + if (redisAsyncCommand(r->async_ctx, NULL, NULL, "AUTH %s", r->auth) != REDIS_OK) { + rlog(LOG_ERROR, "redis_delete_async: Fail redisAsyncCommand on AUTH"); + return -1; + } + } else { + if (redisAsyncCommand(r->async_ctx, NULL, NULL, "PING") != REDIS_OK) { + rlog(LOG_ERROR, "redis_delete_async: Fail redisAsyncCommand on PING"); + return -1; + } + } + + // delete commands + gchar *redis_command; + gint redis_command_total = 0; + + mutex_lock(&r->async_lock); + while (!g_queue_is_empty(&r->async_queue)) { + redis_command_total++; + redis_command = g_queue_pop_head(&r->async_queue); + + if (redisAsyncCommand(r->async_ctx, NULL, NULL, redis_command) != REDIS_OK) { + rlog(LOG_ERROR, "redis_delete_async: Fail redisAsyncCommand on DELETE"); + } + + g_free(redis_command); + } + mutex_unlock(&r->async_lock); + + rlog(LOG_NOTICE, "redis_delete_async: Queued DELETE redisAsyncCommand total: %d", redis_command_total); + + // dispatch event base => thread blocks here + if (event_base_dispatch(r->async_ev) < 0) { + rlog(LOG_ERROR, "redis_delete_async: Fail event_base_dispatch()"); + return -1; + } + + // loopbreak => NOT NULL context + if (r->async_ctx) { + redisAsyncDisconnect(r->async_ctx); + r->async_ctx = NULL; + + // disconnect => NULL context set in callback + } else { + return 1; + } + + return 0; +} + +static int redis_notify(struct redis *r) { GList *l; - if (!rtpe_redis_notify) { + if (!r) { rlog(LOG_ERROR, "redis_notify database is NULL on redis_notify()"); return -1; } - if (!rtpe_redis_notify_async_context) { + if (!r->async_ctx) { rlog(LOG_ERROR, "redis_notify_async_context is NULL on redis_notify()"); return -1; } - if (!rtpe_redis_notify_event_base) { + if (!r->async_ev) { rlog(LOG_ERROR, "redis_notify_event_base is NULL on redis_notify()"); return -1; } // get redis_notify database - r = rtpe_redis_notify; rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint)); // attach event base - if (redisLibeventAttach(rtpe_redis_notify_async_context, rtpe_redis_notify_event_base) == REDIS_ERR) { - if (rtpe_redis_notify_async_context->err) { - rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", rtpe_redis_notify_async_context->errstr); + if (redisLibeventAttach(r->async_ctx, r->async_ev) == REDIS_ERR) { + if (r->async_ctx->err) { + rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", r->async_ctx->errstr); } else { rlog(LOG_ERROR, "redis_notify_async_context can't attach event base"); @@ -557,8 +676,8 @@ static int redis_notify(void) { return -1; } - if (rtpe_redis_notify->auth) { - if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "AUTH %s", rtpe_redis_notify->auth) != REDIS_OK) { + if (r->auth) { + if (redisAsyncCommand(r->async_ctx, on_redis_notification, NULL, "AUTH %s", r->auth) != REDIS_OK) { rlog(LOG_ERROR, "Fail redisAsyncCommand on AUTH"); return -1; } @@ -567,12 +686,12 @@ static int redis_notify(void) { // subscribe to the values in the configured keyspaces rwlock_lock_r(&rtpe_config.config_lock); for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) { - redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data)); + redis_notify_subscribe_action(r, SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data)); } rwlock_unlock_r(&rtpe_config.config_lock); // dispatch event base => thread blocks here - if (event_base_dispatch(rtpe_redis_notify_event_base) < 0) { + if (event_base_dispatch(r->async_ev) < 0) { rlog(LOG_ERROR, "Fail event_base_dispatch()"); return -1; } @@ -580,8 +699,35 @@ static int redis_notify(void) { return 0; } -static void redis_disconnect(void) { - rtpe_redis_notify_async_context = NULL; +void redis_delete_async_loop(void *d) { + struct redis *r = NULL; + + // sanity checks + r = rtpe_redis_write; + if (!r) { + rlog(LOG_ERROR, "redis_delete_async_loop: Don't use Redis async deletions beacause no redis/redis_write."); + return ; + } + + r->async_last = rtpe_now.tv_sec; + + // init libevent for pthread usage + if (evthread_use_pthreads() < 0) { + ilog(LOG_ERROR, "redis_delete_async_loop: evthread_use_pthreads failed."); + return ; + } + + // alloc libevent base + if (redis_async_event_base_action(r, EVENT_BASE_ALLOC) < 0) { + rlog(LOG_ERROR, "redis_delete_async_loop: Failed to EVENT_BASE_ALLOC."); + return ; + } + + // loop (almost) forever + while (!rtpe_shutdown) { + redis_delete_async(r); + sleep(1); + } } void redis_notify_loop(void *d) { @@ -602,18 +748,18 @@ void redis_notify_loop(void *d) { } // alloc redis async context - if (redis_async_context_alloc() < 0) { + if (redis_async_context_alloc(r, NULL, redis_notify_async_context_disconnect) < 0) { return ; } // alloc event base - if (redis_notify_event_base_action(EVENT_BASE_ALLOC) < 0) { + if (redis_async_event_base_action(r, EVENT_BASE_ALLOC) < 0) { return ; } // initial redis_notify if (redis_check_conn(r) == REDIS_STATE_CONNECTED) { - redis_notify_return = redis_notify(); + redis_notify_return = redis_notify(r); } // loop redis_notify => in case of lost connection @@ -627,28 +773,23 @@ void redis_notify_loop(void *d) { next_run = rtpe_now.tv_sec + seconds; if (redis_check_conn(r) == REDIS_STATE_CONNECTED || redis_notify_return < 0) { - redis_disconnect(); + r->async_ctx = NULL; // alloc new redis async context upon redis breakdown - if (redis_async_context_alloc() < 0) { + if (redis_async_context_alloc(r, NULL, redis_notify_async_context_disconnect) < 0) { continue; } // prepare notifications - redis_notify_return = redis_notify(); + redis_notify_return = redis_notify(r); } } - // free libevent -#if LIBEVENT_VERSION_NUMBER >= 0x02010100 - libevent_global_shutdown(); -#endif - // unsubscribe notifications - redis_notify_subscribe_action(UNSUBSCRIBE_ALL, 0); + redis_notify_subscribe_action(r, UNSUBSCRIBE_ALL, 0); // free async context - redisAsyncDisconnect(rtpe_redis_notify_async_context); - redis_disconnect(); + redisAsyncDisconnect(r->async_ctx); + r->async_ctx = NULL; } struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, @@ -773,6 +914,16 @@ static void redis_delete_call_json(struct call *c, struct redis *r) { redis_consume(r); } +static void redis_delete_async_call_json(struct call *c, struct redis *r) { + gchar *redis_command; + + redis_command = g_strdup_printf("SELECT %i", c->redis_hosted_db); + g_queue_push_tail(&r->async_queue, redis_command); + + redis_command = g_strdup_printf("DEL %.*s", c->callid.len, c->callid.s); + g_queue_push_tail(&r->async_queue, redis_command); +} + INLINE void json_builder_add_string_value_uri_enc(JsonBuilder *builder, const char* tmp, int len) { char enc[len * 3 + 1]; str_uri_encode_len(enc, tmp, len); @@ -2276,9 +2427,21 @@ err: /* must be called lock-free */ void redis_delete(struct call *c, struct redis *r) { + int delete_async = rtpe_config.redis_delete_async; + rlog(LOG_DEBUG, "Redis delete_async=%d", delete_async); + if (!r) return; + if (delete_async) { + mutex_lock(&r->async_lock); + rwlock_lock_r(&c->master_lock); + redis_delete_async_call_json(c, r); + rwlock_unlock_r(&c->master_lock); + mutex_unlock(&r->async_lock); + return; + } + mutex_lock(&r->lock); // coverity[sleep : FALSE] if (redis_check_conn(r) == REDIS_STATE_DISCONNECTED) { diff --git a/include/main.h b/include/main.h index 3790f8082..9c0e443d1 100644 --- a/include/main.h +++ b/include/main.h @@ -70,6 +70,8 @@ struct rtpengine_config { int redis_disable_time; int redis_cmd_timeout; int redis_connect_timeout; + int redis_delete_async; + int redis_delete_async_interval; char *redis_auth; char *redis_write_auth; int num_threads; diff --git a/include/redis.h b/include/redis.h index 633109ac8..b232702d4 100644 --- a/include/redis.h +++ b/include/redis.h @@ -61,6 +61,12 @@ struct redis { int no_redis_required; int consecutive_errors; time_t restore_tick; + + struct event_base *async_ev; + struct redisAsyncContext *async_ctx; + mutex_t async_lock; + GQueue async_queue; + int async_last; }; struct redis_hash { @@ -78,9 +84,6 @@ extern struct redis *rtpe_redis; extern struct redis *rtpe_redis_write; extern struct redis *rtpe_redis_notify; -extern struct event_base *rtpe_redis_notify_event_base; -extern struct redisAsyncContext *rtpe_redis_notify_async_context; - #if !GLIB_CHECK_VERSION(2,40,0) @@ -99,6 +102,7 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) #define rlog(l, x...) ilog(l | LOG_FLAG_RESTORE, x) void redis_notify_loop(void *d); +void redis_delete_async_loop(void *d); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int); @@ -108,8 +112,8 @@ void redis_update(struct call *, struct redis *); void redis_update_onekey(struct call *c, struct redis *r); void redis_delete(struct call *, struct redis *); void redis_wipe(struct redis *); -int redis_notify_event_base_action(enum event_base_action); -int redis_notify_subscribe_action(enum subscribe_action action, int keyspace); +int redis_async_event_base_action(struct redis *r, enum event_base_action); +int redis_notify_subscribe_action(struct redis *r, enum subscribe_action action, int keyspace); int redis_set_timeout(struct redis* r, int timeout); int redis_reconnect(struct redis* r);