diff --git a/daemon/main.c b/daemon/main.c index e237974d4..b303a7eec 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1065,8 +1065,6 @@ static void init_everything(void) { static void create_everything(void) { struct timeval tmp_tv; - struct timeval redis_start, redis_stop; - double redis_diff = 0; if (rtpe_config.kernel_table < 0) goto no_kernel; @@ -1227,40 +1225,52 @@ no_kernel: rtcp_init(); // must come after Homer init - if (rtpe_redis) { - // start redis restore timer - gettimeofday(&redis_start, NULL); - - // restore - if (rtpe_redis_notify) { - // active-active mode: the main DB has our own calls, while - // the "notify" DB has the "foreign" calls. "foreign" DB goes - // first as the "owned" DB can do a stray update back to Redis - for (GList *l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) { - int db = GPOINTER_TO_INT(l->data); - if (redis_restore(rtpe_redis_notify, true, db)) - ilog(LOG_WARN, "Unable to restore calls from the active-active peer"); - } - if (redis_restore(rtpe_redis_write, false, -1)) - die("Refusing to continue without working Redis database"); - } - else { - if (redis_restore(rtpe_redis, false, -1)) - die("Refusing to continue without working Redis database"); - } + gettimeofday(&rtpe_latest_graphite_interval_start, NULL); + + timeval_from_us(&tmp_tv, (long long) rtpe_config.graphite_interval*1000000); + set_graphite_interval_tv(&tmp_tv); +} + + +static void do_redis_restore(void) { + if (!rtpe_redis) + return; + + struct timeval redis_start, redis_stop; + double redis_diff = 0; - // stop redis restore timer - gettimeofday(&redis_stop, NULL); + // start redis restore timer + gettimeofday(&redis_start, NULL); - // print redis restore duration - redis_diff += timeval_diff(&redis_stop, &redis_start) / 1000.0; - ilog(LOG_INFO, "Redis restore time = %.0lf ms", redis_diff); + // restore + if (rtpe_redis_notify) { + // active-active mode: the main DB has our own calls, while + // the "notify" DB has the "foreign" calls. "foreign" DB goes + // first as the "owned" DB can do a stray update back to Redis + + // create new connection as notifications are already set up + struct redis *r = redis_dup(rtpe_redis_notify, -1); + + for (GList *l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) { + int db = GPOINTER_TO_INT(l->data); + if (redis_restore(r, true, db)) + ilog(LOG_WARN, "Unable to restore calls from the active-active peer"); + } + redis_close(r); + if (redis_restore(rtpe_redis_write, false, -1)) + die("Refusing to continue without working Redis database"); + } + else { + if (redis_restore(rtpe_redis, false, -1)) + die("Refusing to continue without working Redis database"); } - gettimeofday(&rtpe_latest_graphite_interval_start, NULL); + // stop redis restore timer + gettimeofday(&redis_stop, NULL); - timeval_from_us(&tmp_tv, (long long) rtpe_config.graphite_interval*1000000); - set_graphite_interval_tv(&tmp_tv); + // print redis restore duration + redis_diff += timeval_diff(&redis_stop, &redis_start) / 1000.0; + ilog(LOG_INFO, "Redis restore time = %.0lf ms", redis_diff); } @@ -1286,6 +1296,8 @@ int main(int argc, char **argv) { if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && rtpe_redis_notify) thread_create_detach(redis_notify_loop, NULL, "redis notify"); + do_redis_restore(); + if (!is_addr_unspecified(&rtpe_config.graphite_ep.address)) thread_create_detach(graphite_loop, NULL, "graphite"); diff --git a/daemon/redis.c b/daemon/redis.c index 9d5b9883a..e97bd185a 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -845,6 +845,9 @@ err: return NULL; } +struct redis *redis_dup(const struct redis *r, int db) { + return redis_new(&r->endpoint, db >= 0 ? db : r->db, r->auth, r->role, r->no_redis_required); +} void redis_close(struct redis *r) { if (!r) @@ -2136,7 +2139,7 @@ int redis_restore(struct redis *r, bool foreign, int db) { ctx.foreign = foreign; for (i = 0; i < rtpe_config.redis_num_threads; i++) g_queue_push_tail(&ctx.r_q, - redis_new(&r->endpoint, db, r->auth, r->role, r->no_redis_required)); + redis_dup(r, db)); gtp = g_thread_pool_new(restore_thread, &ctx, rtpe_config.redis_num_threads, TRUE, NULL); for (i = 0; i < calls->elements; i++) { diff --git a/include/redis.h b/include/redis.h index 31f811990..2b4d99379 100644 --- a/include/redis.h +++ b/include/redis.h @@ -107,6 +107,7 @@ void redis_delete_async_loop(void *d); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int); +struct redis *redis_dup(const struct redis *r, int db); void redis_close(struct redis *r); int redis_restore(struct redis *, bool foreign, int db); void redis_update(struct call *, struct redis *);