diff --git a/daemon/Makefile b/daemon/Makefile index 3b28d19cb..941a520e5 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -5,7 +5,7 @@ CFLAGS+= `pkg-config --cflags glib-2.0` CFLAGS+= `pkg-config --cflags gthread-2.0` CFLAGS+= `pkg-config --cflags zlib` CFLAGS+= `pkg-config --cflags openssl` -CFLAGS+= `pkg-config --cflags libevent` +CFLAGS+= `pkg-config --cflags libevent_pthreads` CFLAGS+= `pcre-config --cflags` CFLAGS+= -I../kernel-module/ CFLAGS+= -D_GNU_SOURCE @@ -49,7 +49,7 @@ LDFLAGS+= `pkg-config --libs zlib` LDFLAGS+= `pkg-config --libs libpcre` LDFLAGS+= `pkg-config --libs libcrypto` LDFLAGS+= `pkg-config --libs openssl` -LDFLAGS+= `pkg-config --libs libevent` +LDFLAGS+= `pkg-config --libs libevent_pthreads` LDFLAGS+= `pcre-config --libs` LDFLAGS+= `xmlrpc-c-config client --libs` LDFLAGS+= -lhiredis diff --git a/daemon/aux.c b/daemon/aux.c index 286ee044a..e26ef4320 100644 --- a/daemon/aux.c +++ b/daemon/aux.c @@ -220,7 +220,8 @@ int uint32_eq(const void *a, const void *b) { const u_int32_t *A = a, *B = b; return (*A == *B) ? TRUE : FALSE; } -int uint_cmp(const void *a, const void *b) { - const unsigned int *A = a, *B = b; - return (int) (*A - *B); + +int guint_cmp(gconstpointer a, gconstpointer b) { + const guint A = GPOINTER_TO_UINT(a), B = GPOINTER_TO_UINT(b); + return (int) (A - B); } diff --git a/daemon/aux.h b/daemon/aux.h index f3468a180..15a3fc261 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -82,7 +82,7 @@ unsigned int in6_addr_hash(const void *p); int in6_addr_eq(const void *a, const void *b); unsigned int uint32_hash(const void *p); int uint32_eq(const void *a, const void *b); -int uint_cmp(const void *a, const void *b); +int guint_cmp(gconstpointer a, gconstpointer b); /*** GLIB HELPERS ***/ diff --git a/daemon/cli.c b/daemon/cli.c index 29581ce33..c82b985d3 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -670,9 +670,9 @@ static void cli_incoming_terminate(char* buffer, int len, struct callmaster* m, static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { int printlen=0; - int *pint; - int keyspace_db; + unsigned int uint_keyspace_db; str str_keyspace_db; + char *endptr; if (len<=1) { printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "More parameters required."); @@ -683,19 +683,20 @@ static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char str_keyspace_db.s = buffer; str_keyspace_db.len = len; - keyspace_db = str_to_i(&str_keyspace_db, -1); - - if (keyspace_db != -1) { - redis_notify_subscribe_keyspace(m,keyspace_db); - if (!g_queue_find_custom(m->conf.redis_subscribed_keyspaces, &keyspace_db, uint_cmp)) { - pint = (int*)malloc(sizeof(int)); - *pint = keyspace_db; - g_queue_push_tail(m->conf.redis_subscribed_keyspaces, pint); + uint_keyspace_db = strtol(str_keyspace_db.s, &endptr, 10); + + if ((errno == ERANGE && (uint_keyspace_db == LONG_MAX || uint_keyspace_db == LONG_MIN)) || (errno != 0 && uint_keyspace_db == 0)) { + printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail adding keyspace %.*s to redis notifications; errono=%d\n", str_keyspace_db.len, str_keyspace_db.s, errno); + } else if (endptr == str_keyspace_db.s) { + printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail adding keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s); + } else { + if (!g_queue_find_custom(m->conf.redis_subscribed_keyspaces, uint_keyspace_db, guint_cmp)) { + g_queue_push_tail(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); + redis_notify_subscribe_action(m, SUBSCRIBE_KEYSPACE, uint_keyspace_db); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Success adding keyspace %u to redis notifications.\n", uint_keyspace_db); + } else { + printlen = snprintf(replybuffer, outbufend-replybuffer, "Keyspace %u is already among redis notifications.\n", uint_keyspace_db); } - printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully added keyspace %i to redis notifications.\n", keyspace_db); - } - else { - printlen = snprintf(replybuffer, outbufend-replybuffer, "Could not add keyspace %i to redis notifications.\n", keyspace_db); } ADJUSTLEN(printlen,outbufend,replybuffer); } @@ -707,8 +708,9 @@ static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* gpointer key, value; struct call_monologue *ml = NULL; GList *l, *i; - int keyspace_db; + unsigned int uint_keyspace_db; str str_keyspace_db; + char *endptr; if (len <= 1) { printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "More parameters required."); @@ -719,19 +721,23 @@ static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* str_keyspace_db.s = buffer; str_keyspace_db.len = len; - keyspace_db = str_to_i(&str_keyspace_db, -1); + uint_keyspace_db = strtol(str_keyspace_db.s, &endptr, 10); - if ((l = g_queue_find_custom(m->conf.redis_subscribed_keyspaces, &keyspace_db, uint_cmp))) { + if ((errno == ERANGE && (uint_keyspace_db == LONG_MAX || uint_keyspace_db == LONG_MIN)) || (errno != 0 && uint_keyspace_db == 0)) { + printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail removing keyspace %.*s to redis notifications; errono=%d\n", str_keyspace_db.len, str_keyspace_db.s, errno); + } else if (endptr == str_keyspace_db.s) { + printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail removing keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s); + } else if ((l = g_queue_find_custom(m->conf.redis_subscribed_keyspaces, uint_keyspace_db, guint_cmp))) { // remove this keyspace - redis_notify_unsubscribe_keyspace(m,keyspace_db); + redis_notify_subscribe_action(m, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); g_queue_remove(m->conf.redis_subscribed_keyspaces, l->data); - printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully unsubscribed from keyspace %i.\n", keyspace_db); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully unsubscribed from keyspace %u.\n", uint_keyspace_db); // remove all current foreign calls for this keyspace g_hash_table_iter_init(&iter, m->callhash); while (g_hash_table_iter_next(&iter, &key, &value)) { c = (struct call*)value; - if (!c || !c->is_backup_call || !(c->redis_hosted_db == keyspace_db)) { + if (!c || !c->is_backup_call|| !(c->redis_hosted_db == uint_keyspace_db)) { continue; } if (!c->ml_deleted) { @@ -744,9 +750,9 @@ static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* call_destroy(c); g_hash_table_iter_init(&iter, m->callhash); } - printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed all foreign calls for keyspace %i.\n", keyspace_db); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed all foreign calls for keyspace %u.\n", uint_keyspace_db); } else { - printlen = snprintf(replybuffer, outbufend-replybuffer, "Keyspace %i was not among redis notifications.\n", keyspace_db); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Keyspace %u is not among redis notifications.\n", uint_keyspace_db); } ADJUSTLEN(printlen,outbufend,replybuffer); } @@ -759,7 +765,7 @@ static void cli_incoming_kslist(char* buffer, int len, struct callmaster* m, cha ADJUSTLEN(printlen,outbufend,replybuffer); for (l = m->conf.redis_subscribed_keyspaces->head; l; l = l->next) { - printlen = snprintf(replybuffer,(outbufend-replybuffer), "%d ", *((unsigned int *)(l->data))); + printlen = snprintf(replybuffer,(outbufend-replybuffer), "%u ", GPOINTER_TO_UINT(l->data)); ADJUSTLEN(printlen,outbufend,replybuffer); } @@ -831,7 +837,7 @@ next: } else if (strncmp(inbuf,KSRM,strlen(KSRM)) == 0) { cli_incoming_ksrm(inbuf+strlen(KSRM), inlen-strlen(KSRM), cli->callmaster, outbuf, outbufend); } else if (strncmp(inbuf,KSLIST,strlen(KSLIST)) == 0) { - cli_incoming_kslist(inbuf+strlen(KSRM), inlen-strlen(KSRM), cli->callmaster, outbuf, outbufend); + cli_incoming_kslist(inbuf+strlen(KSLIST), inlen-strlen(KSLIST), cli->callmaster, outbuf, outbufend); } else { sprintf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", inbuf); } diff --git a/daemon/main.c b/daemon/main.c index b2a055221..110a2b69e 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -261,7 +261,7 @@ static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth static void options(int *argc, char ***argv) { char **if_a = NULL; char **ks_a = NULL; - int *pint; + unsigned int uint_keyspace_db; str str_keyspace_db; char **iter; struct intf_config *ifa; @@ -274,12 +274,13 @@ static void options(int *argc, char ***argv) { char *redisps = NULL; char *redisps_write = NULL; char *log_facility_s = NULL; - char *log_facility_cdr_s = NULL; + char *log_facility_cdr_s = NULL; char *log_facility_rtcp_s = NULL; int version = 0; int sip_source = 0; char *homerp = NULL; char *homerproto = NULL; + char *endptr; GOptionEntry e[] = { { "version", 'v', 0, G_OPTION_ARG_NONE, &version, "Print build time and exit", NULL }, @@ -349,14 +350,19 @@ static void options(int *argc, char ***argv) { if (ks_a) { for (iter = ks_a; *iter; iter++) { - pint = (int *)malloc(sizeof(int)); str_keyspace_db.s = *iter; - str_keyspace_db.len = strlen(*iter); - *pint = str_to_i(&str_keyspace_db, -1); - - if (*pint != -1) - g_queue_push_tail(&keyspaces, pint); - } + str_keyspace_db.len = strlen(*iter); + uint_keyspace_db = strtol(str_keyspace_db.s, &endptr, 10); + + if ((errno == ERANGE && (uint_keyspace_db == LONG_MAX || uint_keyspace_db == LONG_MIN)) || + (errno != 0 && uint_keyspace_db == 0)) { + ilog(LOG_ERR, "Fail adding keyspace %.*s to redis notifications; errono=%d\n", str_keyspace_db.len, str_keyspace_db.s, errno); + } else if (endptr == str_keyspace_db.s) { + ilog(LOG_ERR, "Fail adding keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s); + } else { + g_queue_push_tail(&keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); + } + } } if (listenps) { @@ -718,7 +724,7 @@ int main(int argc, char **argv) { threads_join_all(0); } - redis_notify_event_base_loopbreak(ctx.m); + redis_notify_event_base_action(ctx.m, EVENT_BASE_LOOPBREAK); threads_join_all(1); diff --git a/daemon/redis.c b/daemon/redis.c index b47c39aff..8a13c2e8b 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -19,6 +19,7 @@ #include "hiredis/hiredis.h" #include "hiredis/async.h" #include "hiredis/adapters/libevent.h" +#include "event2/thread.h" INLINE redisReply *redis_expect(int type, redisReply *r) { if (!r) @@ -246,8 +247,14 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { char *pdbstr = db_str; char *p = 0; - if (!(cm->conf.redis)) { - rlog(LOG_ERROR, "A redis notification has been there but role was not 'master' or 'read'"); + // sanity checks + if (!cm) { + rlog(LOG_ERROR, "Struct callmaster is NULL on onRedisNotification"); + return; + } + + if (!cm->conf.redis_notify) { + rlog(LOG_ERROR, "A redis notification has been received but no redis_notify database found"); return; } @@ -303,7 +310,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { str_init(&callid,rr->element[2]->str); - c = g_hash_table_lookup(cm->callhash, &callid); + c = call_get(&callid, cm); + if (c) { + rwlock_unlock_w(&c->master_lock); // because of call_get(..) + } if (c && !c->redis_foreign_call) { rlog(LOG_DEBUG,"I am responsible for that call so I ignore redis notifications."); @@ -326,110 +336,205 @@ err: mutex_unlock(&r->lock); } -void redis_notify_event_base_loopbreak(struct callmaster *cm) { +void redis_async_context_disconnect(const redisAsyncContext *redis_notify_async_context, int status) { + if (status == REDIS_ERR) { + if (redis_notify_async_context->errstr) { + rlog(LOG_ERROR, "redis_async_context_disconnect error %d on context free: %s", + redis_notify_async_context->err, redis_notify_async_context->errstr); + } else { + rlog(LOG_ERROR, "redis_async_context_disconnect error %d on context free: no errstr", + redis_notify_async_context->err); + } + } else if (status == REDIS_OK) { + rlog(LOG_ERROR, "redis_async_context_disconnect initiated by user"); + } else { + rlog(LOG_ERROR, "redis_async_context_disconnect invalid status code %d", status); + } +} + +int redis_async_context_alloc(struct callmaster *cm) { + struct redis *r = 0; + // sanity checks - if (!cm->conf.redis_notify_event_base) { - rlog(LOG_ERROR, "Redis event_base_new() is NULL on loopbreak"); - return; + if (!cm) { + rlog(LOG_ERROR, "Struct callmaster is NULL on context free"); + return -1; } - if (!cm->conf.redis_notify_async_context) { - rlog(LOG_ERROR, "Redis notify async context is NULL on loopbreak"); - return; + if (!cm->conf.redis_notify) { + rlog(LOG_INFO, "redis_notify database is NULL."); + return -1; } + // get redis_notify database + r = cm->conf.redis_notify; + rlog(LOG_INFO, "Use Redis %s for notifications", endpoint_print_buf(&r->endpoint)); + + // alloc async context + cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); + if (!cm->conf.redis_notify_async_context) { + rlog(LOG_ERROR, "redis_notify_async_context can't create new"); + return -1; + } if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "Redis notify async context error on loopbreak: %s", cm->conf.redis_notify_async_context->errstr); - return; + rlog(LOG_ERROR, "redis_notify_async_context can't create new error: %s", cm->conf.redis_notify_async_context->errstr); + return -1; } - event_base_loopbreak(cm->conf.redis_notify_event_base); - redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe"); -} + if (redisAsyncSetDisconnectCallback(cm->conf.redis_notify_async_context, redis_async_context_disconnect) != REDIS_OK) { + rlog(LOG_ERROR, "redis_notify_async_context can't set disconnect callback"); + return -1; + } -void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace) { - char main_db_str[256]; + return 0; +} +int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action action) { // sanity checks - if (!cm->conf.redis_notify_async_context) { - rlog(LOG_ERROR, "Redis notify async context NULL on subscribe"); - return; + if (!cm) { + rlog(LOG_ERROR, "Struct callmaster is NULL on event base action %d", action); + return -1; } - if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "Redis notify async context error on subscribe: %s", cm->conf.redis_notify_async_context->errstr); - return; + if (!cm->conf.redis_notify_event_base && action!=EVENT_BASE_ALLOC) { + rlog(LOG_ERROR, "redis_notify_event_base is NULL on event base action %d", action); + return -1; } - memset(&main_db_str, 0, 256); - sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace); + // exec event base action + switch (action) { + case EVENT_BASE_ALLOC: + cm->conf.redis_notify_event_base = event_base_new(); + if (!cm->conf.redis_notify_event_base) { + rlog(LOG_ERROR, "Fail alloc redis_notify_event_base"); + return -1; + } else { + rlog(LOG_DEBUG, "Success alloc redis_notify_event_base"); + } + break; + + case EVENT_BASE_FREE: + event_base_free(cm->conf.redis_notify_event_base); + rlog(LOG_DEBUG, "Success free redis_notify_event_base"); + break; + + case EVENT_BASE_LOOPBREAK: + if (event_base_loopbreak(cm->conf.redis_notify_event_base)) { + rlog(LOG_ERROR, "Fail loopbreak redis_notify_event_base"); + return -1; + } else { + rlog(LOG_DEBUG, "Success loopbreak redis_notify_event_base"); + } + break; - redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); -} + default: + rlog(LOG_ERROR, "No event base action found: %d", action); + return -1; + } -void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) { - char main_db_str[256]; + return 0; +} +int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action action, int keyspace) { // sanity checks + if (!cm) { + rlog(LOG_ERROR, "Struct callmaster is NULL on subscribe action"); + return -1; + } + if (!cm->conf.redis_notify_async_context) { - rlog(LOG_ERROR, "Redis notify async context NULL on unsubscribe"); - return; + rlog(LOG_ERROR, "redis_notify_async_context is NULL on subscribe action"); + return -1; } if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "Redis notify async context error on unsubscribe: %s", cm->conf.redis_notify_async_context->errstr); - return; + rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", cm->conf.redis_notify_async_context->errstr); + return -1; } - memset(&main_db_str, 0, 256); - sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace); + switch (action) { + case SUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, "psubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on SUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_KEYSPACE: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, "punsubscribe __keyspace@%i*:notifier-*", keyspace) != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_KEYSPACE"); + return -1; + } + break; + case UNSUBSCRIBE_ALL: + if (redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void *) cm, "punsubscribe") != REDIS_OK) { + rlog(LOG_ERROR, "Fail redisAsyncCommand on UNSUBSCRIBE_ALL"); + return -1; + } + break; + default: + rlog(LOG_ERROR, "No subscribe action found: %d", action); + return -1; + } - redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); + return 0; } -static void redis_notify(struct callmaster *cm) { +static int redis_notify(struct callmaster *cm) { struct redis *r = 0; GList *l; - if (cm->conf.redis_notify) { - r = cm->conf.redis_notify; - rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint)); - } else { - rlog(LOG_INFO, "Don't use Redis notifications. See --redis-notifications parameter."); - return; + // sanity checks + if (!cm) { + rlog(LOG_ERROR, "Struct callmaster is NULL on redis_notify()"); + return -1; } - cm->conf.redis_notify_event_base = NULL; - cm->conf.redis_notify_event_base = event_base_new(); - if (!cm->conf.redis_notify_event_base) { - rlog(LOG_ERROR, "Redis event_base_new() NULL error"); - return; + if (!cm->conf.redis_notify) { + rlog(LOG_ERROR, "redis_notify database is NULL on redis_notify()"); + return -1; } - cm->conf.redis_notify_async_context = NULL; - cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); if (!cm->conf.redis_notify_async_context) { - rlog(LOG_ERROR, "Redis notify async context NULL error"); - return; + rlog(LOG_ERROR, "redis_notify_async_context is NULL on redis_notify()"); + return -1; } - if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "Redis notify async context error: %s", cm->conf.redis_notify_async_context->errstr); - return; + + if (!cm->conf.redis_notify_event_base) { + rlog(LOG_ERROR, "redis_notify_event_base is NULL on redis_notify()"); + return -1; } - redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base); + // get redis_notify database + r = cm->conf.redis_notify; + rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint)); + + // attach event base + if (redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base) == REDIS_ERR) { + if (cm->conf.redis_notify_async_context->err) { + rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", cm->conf.redis_notify_async_context->errstr); + } else { + rlog(LOG_ERROR, "redis_notify_async_context can't attach event base"); - /* Subscribing to the values in the configured keyspaces */ + } + return -1; + } + + // subscribe to the values in the configured keyspaces for (l = cm->conf.redis_subscribed_keyspaces->head; l; l = l->next) { - redis_notify_subscribe_keyspace(cm,*(int *)(l->data)); + redis_notify_subscribe_action(cm, SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data)); } - event_base_dispatch(cm->conf.redis_notify_event_base); + // dispatch event base => thread blocks here + if (event_base_dispatch(cm->conf.redis_notify_event_base) < 0) { + rlog(LOG_ERROR, "Fail event_base_dispatch()"); + return -1; + } + return 0; } void redis_notify_loop(void *d) { - int seconds = 1; + int seconds = 1, redis_notify_return = 0; time_t next_run = g_now.tv_sec; struct callmaster *cm = (struct callmaster *)d; struct redis *r; @@ -442,12 +547,29 @@ void redis_notify_loop(void *d) { r = cm->conf.redis_notify; if (!r) { + rlog(LOG_ERROR, "Don't use Redis notifications. See --redis-notifications parameter."); + return ; + } + + // init libevent for pthread usage + if (evthread_use_pthreads() < 0) { + ilog(LOG_ERROR, "evthread_use_pthreads failed"); + return ; + } + + // alloc redis async context + if (redis_async_context_alloc(cm) < 0) { + return ; + } + + // alloc event base + if (redis_notify_event_base_action(cm, EVENT_BASE_ALLOC) < 0) { return ; } // initial redis_notify if (redis_check_conn(r) == REDIS_STATE_CONNECTED) { - redis_notify(cm); + redis_notify_return = redis_notify(cm); } // loop redis_notify => in case of lost connection @@ -460,10 +582,25 @@ void redis_notify_loop(void *d) { next_run = g_now.tv_sec + seconds; - if (redis_check_conn(r) == REDIS_STATE_RECONNECTED) { - redis_notify(cm); + if (redis_check_conn(r) == REDIS_STATE_RECONNECTED || redis_notify_return < 0) { + // alloc new redis async context upon redis breakdown + if (redis_async_context_alloc(cm) < 0) { + continue; + } + + // prepare notifications + redis_notify_return = redis_notify(cm); } } + + // unsubscribe notifications + redis_notify_subscribe_action(cm, UNSUBSCRIBE_ALL, 0); + + // free async context + redisAsyncDisconnect(cm->conf.redis_notify_async_context); + + // free event base + redis_notify_event_base_action(cm, EVENT_BASE_FREE); } struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) { diff --git a/daemon/redis.h b/daemon/redis.h index 9d3553c0d..0321abd77 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -30,6 +30,18 @@ enum redis_state { REDIS_STATE_RECONNECTED, // DISCONNECTED -> CONNECTED }; +enum event_base_action { + EVENT_BASE_ALLOC = 0, + EVENT_BASE_FREE, + EVENT_BASE_LOOPBREAK, +}; + +enum subscribe_action { + SUBSCRIBE_KEYSPACE = 0, + UNSUBSCRIBE_KEYSPACE, + UNSUBSCRIBE_ALL, +}; + struct callmaster; struct call; @@ -98,9 +110,8 @@ int redis_restore(struct callmaster *, struct redis *); void redis_update(struct call *, struct redis *); void redis_delete(struct call *, struct redis *); void redis_wipe(struct redis *); -void redis_notify_event_base_loopbreak(struct callmaster *cm); -void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace); -void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace); +int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action); +int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action action, int keyspace); diff --git a/debian/control b/debian/control index c4ee1d931..a08ebb3d0 100644 --- a/debian/control +++ b/debian/control @@ -7,8 +7,8 @@ Build-Depends: debhelper (>= 5), libcurl4-openssl-dev | libcurl4-gnutls-dev | libcurl3-openssl-dev | libcurl3-gnutls-dev, libglib2.0-dev (>= 2.30), libhiredis-dev, - libevent-2.0-5, - libevent-dev, + libevent-dev (>= 2.0), + libevent-pthreads-2.0-5 (>= 2.0), libpcre3-dev, libssl-dev (>= 1.0.1), libxmlrpc-c3-dev (>= 1.16.07) | libxmlrpc-core-c3-dev (>= 1.16.07),