|
|
|
@ -75,6 +75,10 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str *
|
|
|
|
|
static void redis_pipe(struct redis *r, const char *fmt, ...) {
|
|
|
|
|
va_list ap;
|
|
|
|
|
|
|
|
|
|
if (!r->ctx) {
|
|
|
|
|
ilog(LOG_ERROR, "Unable to pipe redis command. No redis context");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
va_start(ap, fmt);
|
|
|
|
|
redisvAppendCommand(r->ctx, fmt, ap);
|
|
|
|
|
va_end(ap);
|
|
|
|
@ -84,6 +88,10 @@ static redisReply *redis_get(struct redis *r, int type, const char *fmt, ...) {
|
|
|
|
|
va_list ap;
|
|
|
|
|
redisReply *ret;
|
|
|
|
|
|
|
|
|
|
if (!r->ctx) {
|
|
|
|
|
ilog(LOG_ERROR, "Unable to get redis reply. No redis context");
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
va_start(ap, fmt);
|
|
|
|
|
ret = redis_expect(type, redisvCommand(r->ctx, fmt, ap));
|
|
|
|
|
va_end(ap);
|
|
|
|
@ -95,6 +103,10 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) {
|
|
|
|
|
redisReply *ret;
|
|
|
|
|
int i = 0;
|
|
|
|
|
|
|
|
|
|
if (!r) {
|
|
|
|
|
ilog(LOG_ERROR, "Unable to send redis command. No redis context");
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
va_start(ap, fmt);
|
|
|
|
|
ret = redisvCommand(r, fmt, ap);
|
|
|
|
|
va_end(ap);
|
|
|
|
@ -116,6 +128,11 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...) {
|
|
|
|
|
static int redis_check_type(struct redis *r, char *key, char *suffix, char *type) {
|
|
|
|
|
redisReply *rp;
|
|
|
|
|
|
|
|
|
|
if (!r->ctx) {
|
|
|
|
|
ilog(LOG_ERROR, "Unable to check redis reply type. No redis context");
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rp = redisCommand(r->ctx, "TYPE %s%s", key, suffix ? : "");
|
|
|
|
|
if (!rp)
|
|
|
|
|
return -1;
|
|
|
|
@ -134,6 +151,11 @@ static int redis_check_type(struct redis *r, char *key, char *suffix, char *type
|
|
|
|
|
static void redis_consume(struct redis *r) {
|
|
|
|
|
redisReply *rp;
|
|
|
|
|
|
|
|
|
|
if (!r->ctx) {
|
|
|
|
|
ilog(LOG_ERROR, "Unable to consume pipelined replies. No redis context");
|
|
|
|
|
r->pipeline = 0;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
while (r->pipeline) {
|
|
|
|
|
if (redisGetReply(r->ctx, (void **) &rp) == REDIS_OK)
|
|
|
|
|
freeReplyObject(rp);
|
|
|
|
@ -292,7 +314,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
|
|
|
|
|
|
|
|
|
|
// select the right db for restoring the call
|
|
|
|
|
if (redisCommandNR(r->ctx, "SELECT %i", r->db)) {
|
|
|
|
|
if (r->ctx->err)
|
|
|
|
|
if (r->ctx && r->ctx->err)
|
|
|
|
|
rlog(LOG_ERROR, "Redis error: %s", r->ctx->errstr);
|
|
|
|
|
redisFree(r->ctx);
|
|
|
|
|
r->ctx = NULL;
|
|
|
|
@ -1621,7 +1643,8 @@ int redis_restore(struct callmaster *m, struct redis *r) {
|
|
|
|
|
calls = redis_get(r, REDIS_REPLY_ARRAY, "KEYS *");
|
|
|
|
|
|
|
|
|
|
if (!calls) {
|
|
|
|
|
rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr);
|
|
|
|
|
rlog(LOG_ERR, "Could not retrieve call list from Redis: %s",
|
|
|
|
|
r->ctx ? r->ctx->errstr : "No redis context");
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -2077,7 +2100,7 @@ err:
|
|
|
|
|
|
|
|
|
|
mutex_unlock(&r->lock);
|
|
|
|
|
rwlock_unlock_r(&c->master_lock);
|
|
|
|
|
if (r->ctx->err)
|
|
|
|
|
if (r->ctx && r->ctx->err)
|
|
|
|
|
rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr);
|
|
|
|
|
redisFree(r->ctx);
|
|
|
|
|
r->ctx = NULL;
|
|
|
|
@ -2110,7 +2133,7 @@ err:
|
|
|
|
|
rwlock_unlock_r(&c->master_lock);
|
|
|
|
|
mutex_unlock(&r->lock);
|
|
|
|
|
|
|
|
|
|
if (r->ctx->err)
|
|
|
|
|
if (r->ctx && r->ctx->err)
|
|
|
|
|
rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr);
|
|
|
|
|
redisFree(r->ctx);
|
|
|
|
|
r->ctx = NULL;
|
|
|
|
|