|
|
|
|
@ -124,7 +124,7 @@ static void redis_consume(struct redis *r) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* called with r->lock held if necessary */
|
|
|
|
|
static int redis_connect(struct redis *r, int wait, int role) {
|
|
|
|
|
static int redis_connect(struct redis *r, int wait) {
|
|
|
|
|
struct timeval tv;
|
|
|
|
|
redisReply *rp;
|
|
|
|
|
char *s;
|
|
|
|
|
@ -161,18 +161,18 @@ static int redis_connect(struct redis *r, int wait, int role) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!memcmp(s, "role:master", 9)) {
|
|
|
|
|
if (role == MASTER_REDIS_ROLE || role == ANY_REDIS_ROLE) {
|
|
|
|
|
if (r->role == MASTER_REDIS_ROLE || r->role == ANY_REDIS_ROLE) {
|
|
|
|
|
ilog(LOG_INFO, "Connected to Redis in master mode");
|
|
|
|
|
goto done;
|
|
|
|
|
} else if (role == SLAVE_REDIS_ROLE) {
|
|
|
|
|
} else if (r->role == SLAVE_REDIS_ROLE) {
|
|
|
|
|
ilog(LOG_INFO, "Connected to Redis in master mode, but wanted mode is slave; retrying...");
|
|
|
|
|
goto next;
|
|
|
|
|
}
|
|
|
|
|
} else if (!memcmp(s, "role:slave", 8)) {
|
|
|
|
|
if (role == SLAVE_REDIS_ROLE || role == ANY_REDIS_ROLE) {
|
|
|
|
|
if (r->role == SLAVE_REDIS_ROLE || r->role == ANY_REDIS_ROLE) {
|
|
|
|
|
ilog(LOG_INFO, "Connected to Redis in slave mode");
|
|
|
|
|
goto done;
|
|
|
|
|
} else if (role == MASTER_REDIS_ROLE) {
|
|
|
|
|
} else if (r->role == MASTER_REDIS_ROLE) {
|
|
|
|
|
ilog(LOG_INFO, "Connected to Redis in slave mode, but wanted mode is master; retrying...");
|
|
|
|
|
goto next;
|
|
|
|
|
}
|
|
|
|
|
@ -206,7 +206,7 @@ err:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct redis *redis_new(const endpoint_t *ep, int db, int role) {
|
|
|
|
|
struct redis *redis_new(const endpoint_t *ep, int db, enum redis_role role) {
|
|
|
|
|
struct redis *r;
|
|
|
|
|
|
|
|
|
|
r = g_slice_alloc0(sizeof(*r));
|
|
|
|
|
@ -214,9 +214,10 @@ struct redis *redis_new(const endpoint_t *ep, int db, int role) {
|
|
|
|
|
r->endpoint = *ep;
|
|
|
|
|
sockaddr_print(&ep->address, r->host, sizeof(r->host));
|
|
|
|
|
r->db = db;
|
|
|
|
|
r->role = role;
|
|
|
|
|
mutex_init(&r->lock);
|
|
|
|
|
|
|
|
|
|
if (redis_connect(r, 10, role))
|
|
|
|
|
if (redis_connect(r, 10))
|
|
|
|
|
goto err;
|
|
|
|
|
|
|
|
|
|
return r;
|
|
|
|
|
@ -239,11 +240,11 @@ static void redis_close(struct redis *r) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* called with r->lock held if necessary */
|
|
|
|
|
static void redis_check_conn(struct redis *r, int role) {
|
|
|
|
|
static void redis_check_conn(struct redis *r) {
|
|
|
|
|
if (redisCommandNR(r->ctx, "PING") == 0)
|
|
|
|
|
return;
|
|
|
|
|
rlog(LOG_INFO, "Lost connection to Redis");
|
|
|
|
|
if (redis_connect(r, 1, role))
|
|
|
|
|
if (redis_connect(r, 1))
|
|
|
|
|
abort();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -1048,7 +1049,7 @@ static void restore_thread(void *call_p, void *ctx_p) {
|
|
|
|
|
mutex_unlock(&ctx->r_m);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int redis_restore(struct callmaster *m, struct redis *r, int role) {
|
|
|
|
|
int redis_restore(struct callmaster *m, struct redis *r) {
|
|
|
|
|
redisReply *calls, *call;
|
|
|
|
|
int i, ret = -1;
|
|
|
|
|
GThreadPool *gtp;
|
|
|
|
|
@ -1060,7 +1061,7 @@ int redis_restore(struct callmaster *m, struct redis *r, int role) {
|
|
|
|
|
log_level |= LOG_FLAG_RESTORE;
|
|
|
|
|
|
|
|
|
|
rlog(LOG_DEBUG, "Restoring calls from Redis...");
|
|
|
|
|
redis_check_conn(r, role);
|
|
|
|
|
redis_check_conn(r);
|
|
|
|
|
|
|
|
|
|
calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls");
|
|
|
|
|
|
|
|
|
|
@ -1073,7 +1074,7 @@ int redis_restore(struct callmaster *m, struct redis *r, int role) {
|
|
|
|
|
mutex_init(&ctx.r_m);
|
|
|
|
|
g_queue_init(&ctx.r_q);
|
|
|
|
|
for (i = 0; i < RESTORE_NUM_THREADS; i++)
|
|
|
|
|
g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, role));
|
|
|
|
|
g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, r->role));
|
|
|
|
|
gtp = g_thread_pool_new(restore_thread, &ctx, RESTORE_NUM_THREADS, TRUE, NULL);
|
|
|
|
|
|
|
|
|
|
for (i = 0; i < calls->elements; i++) {
|
|
|
|
|
@ -1185,7 +1186,7 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, con
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
/* must be called lock-free */
|
|
|
|
|
void redis_update(struct call *c, struct redis *r, int role) {
|
|
|
|
|
void redis_update(struct call *c, struct redis *r) {
|
|
|
|
|
GList *l, *n, *k, *m;
|
|
|
|
|
struct call_monologue *ml, *ml2;
|
|
|
|
|
struct call_media *media;
|
|
|
|
|
@ -1199,7 +1200,7 @@ void redis_update(struct call *c, struct redis *r, int role) {
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
mutex_lock(&r->lock);
|
|
|
|
|
redis_check_conn(r, role);
|
|
|
|
|
redis_check_conn(r);
|
|
|
|
|
|
|
|
|
|
rwlock_lock_r(&c->master_lock);
|
|
|
|
|
|
|
|
|
|
@ -1444,12 +1445,12 @@ void redis_update(struct call *c, struct redis *r, int role) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* must be called lock-free */
|
|
|
|
|
void redis_delete(struct call *c, struct redis *r, int role) {
|
|
|
|
|
void redis_delete(struct call *c, struct redis *r) {
|
|
|
|
|
if (!r)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
mutex_lock(&r->lock);
|
|
|
|
|
redis_check_conn(r, role);
|
|
|
|
|
redis_check_conn(r);
|
|
|
|
|
rwlock_lock_r(&c->master_lock);
|
|
|
|
|
|
|
|
|
|
redis_delete_call(c, r);
|
|
|
|
|
@ -1462,12 +1463,12 @@ void redis_delete(struct call *c, struct redis *r, int role) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void redis_wipe(struct redis *r, int role) {
|
|
|
|
|
void redis_wipe(struct redis *r) {
|
|
|
|
|
if (!r)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
mutex_lock(&r->lock);
|
|
|
|
|
redis_check_conn(r, role);
|
|
|
|
|
redis_check_conn(r);
|
|
|
|
|
redisCommandNR(r->ctx, "DEL calls");
|
|
|
|
|
mutex_unlock(&r->lock);
|
|
|
|
|
}
|
|
|
|
|
|