TT#14008 change Redis restore order

To prevent a race condition that might miss updates about call info, set
up the Redis keyspace notifications first, and only then run the loop that
restores calls from the existing data.

closes #1503

Change-Id: I6afa4c50fe0a34c602063fc2f45b2ee38133cf1e
mr10.5.2
Richard Fuchs 3 years ago
parent 97d32c86d1
commit 3d49d0437e

@ -1065,8 +1065,6 @@ static void init_everything(void) {
static void create_everything(void) {
struct timeval tmp_tv;
struct timeval redis_start, redis_stop;
double redis_diff = 0;
if (rtpe_config.kernel_table < 0)
goto no_kernel;
@ -1227,40 +1225,52 @@ no_kernel:
rtcp_init(); // must come after Homer init
if (rtpe_redis) {
// start redis restore timer
gettimeofday(&redis_start, NULL);
// restore
if (rtpe_redis_notify) {
// active-active mode: the main DB has our own calls, while
// the "notify" DB has the "foreign" calls. "foreign" DB goes
// first as the "owned" DB can do a stray update back to Redis
for (GList *l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) {
int db = GPOINTER_TO_INT(l->data);
if (redis_restore(rtpe_redis_notify, true, db))
ilog(LOG_WARN, "Unable to restore calls from the active-active peer");
}
if (redis_restore(rtpe_redis_write, false, -1))
die("Refusing to continue without working Redis database");
}
else {
if (redis_restore(rtpe_redis, false, -1))
die("Refusing to continue without working Redis database");
}
gettimeofday(&rtpe_latest_graphite_interval_start, NULL);
timeval_from_us(&tmp_tv, (long long) rtpe_config.graphite_interval*1000000);
set_graphite_interval_tv(&tmp_tv);
}
// Restores calls from Redis and logs how long the restore took.
// Returns immediately if rtpe_redis is not set. A failed restore from the
// "foreign" (notify) keyspaces is only logged as a warning, while a failed
// restore from the own database ends in die().
// In the visible main() hunk this is called after the (conditional) start of
// the "redis notify" thread.
// NOTE(review): this span looks like a flattened diff in which removed and
// added lines are interleaved - the first stop/print sequence below runs
// before any restore, and tmp_tv is not declared in this function. Confirm
// against the real source file before relying on the statement order here.
static void do_redis_restore(void) {
if (!rtpe_redis)
return;
struct timeval redis_start, redis_stop;
double redis_diff = 0;
// stop redis restore timer
// NOTE(review): redis_start has not been assigned yet at this point -
// presumably residue of the removed code in create_everything(); verify
gettimeofday(&redis_stop, NULL);
// start redis restore timer
gettimeofday(&redis_start, NULL);
// print redis restore duration
redis_diff += timeval_diff(&redis_stop, &redis_start) / 1000.0;
ilog(LOG_INFO, "Redis restore time = %.0lf ms", redis_diff);
// restore
if (rtpe_redis_notify) {
// active-active mode: the main DB has our own calls, while
// the "notify" DB has the "foreign" calls. "foreign" DB goes
// first as the "owned" DB can do a stray update back to Redis
// create new connection as notifications are already set up
struct redis *r = redis_dup(rtpe_redis_notify, -1);
// one restore pass per subscribed keyspace, each flagged as foreign
for (GList *l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) {
int db = GPOINTER_TO_INT(l->data);
if (redis_restore(r, true, db))
ilog(LOG_WARN, "Unable to restore calls from the active-active peer");
}
// the duplicate was only needed for the foreign restore passes
redis_close(r);
if (redis_restore(rtpe_redis_write, false, -1))
die("Refusing to continue without working Redis database");
}
else {
// no notify DB: restore our own calls straight from rtpe_redis
if (redis_restore(rtpe_redis, false, -1))
die("Refusing to continue without working Redis database");
}
gettimeofday(&rtpe_latest_graphite_interval_start, NULL);
// stop redis restore timer
gettimeofday(&redis_stop, NULL);
// NOTE(review): tmp_tv is declared in create_everything(), not here -
// this and the next statement presumably belong to that function; verify
timeval_from_us(&tmp_tv, (long long) rtpe_config.graphite_interval*1000000);
set_graphite_interval_tv(&tmp_tv);
// print redis restore duration
redis_diff += timeval_diff(&redis_stop, &redis_start) / 1000.0;
ilog(LOG_INFO, "Redis restore time = %.0lf ms", redis_diff);
}
@ -1286,6 +1296,8 @@ int main(int argc, char **argv) {
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && rtpe_redis_notify)
thread_create_detach(redis_notify_loop, NULL, "redis notify");
do_redis_restore();
if (!is_addr_unspecified(&rtpe_config.graphite_ep.address))
thread_create_detach(graphite_loop, NULL, "graphite");

@ -845,6 +845,9 @@ err:
return NULL;
}
// Creates a new struct redis through redis_new(), reusing the endpoint, auth,
// role and no_redis_required settings of `r`.
// `db` selects the database for the new object when it is >= 0; a negative
// value means "use the same database as `r`".
// Returns whatever redis_new() returns.
struct redis *redis_dup(const struct redis *r, int db) {
	int target_db;

	if (db < 0)
		target_db = r->db; // negative: inherit the source's database
	else
		target_db = db;

	return redis_new(&r->endpoint, target_db, r->auth, r->role, r->no_redis_required);
}
void redis_close(struct redis *r) {
if (!r)
@ -2136,7 +2139,7 @@ int redis_restore(struct redis *r, bool foreign, int db) {
ctx.foreign = foreign;
for (i = 0; i < rtpe_config.redis_num_threads; i++)
g_queue_push_tail(&ctx.r_q,
redis_new(&r->endpoint, db, r->auth, r->role, r->no_redis_required));
redis_dup(r, db));
gtp = g_thread_pool_new(restore_thread, &ctx, rtpe_config.redis_num_threads, TRUE, NULL);
for (i = 0; i < calls->elements; i++) {

@ -107,6 +107,7 @@ void redis_delete_async_loop(void *d);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int);
// Creates a new struct redis via redis_new() with the endpoint, auth, role and
// no_redis_required settings of `r`; `db` >= 0 selects that database,
// a negative `db` reuses r->db.
struct redis *redis_dup(const struct redis *r, int db);
void redis_close(struct redis *r);
// `foreign` and `db` are passed by callers as (true, <subscribed keyspace>)
// for the notify DB and (false, -1) for the own DB.
int redis_restore(struct redis *, bool foreign, int db);
void redis_update(struct call *, struct redis *);

Loading…
Cancel
Save