Make 'redis-allowed-errors', 'redis-disable-time', 'redis-cmd-timeout'

and 'redis-connect-timeout' configurable via rtpengine-ctl
pull/456/head
Claudiu Boriga 8 years ago
parent f2ce4d3a25
commit 0f9151e9ea

@ -445,23 +445,26 @@ The options are described in more detail below.
If this parameter is present and has a value >= 0, it will configure how many consecutive errors are allowed
when communicating with a redis server before the redis communication will be temporarily disabled for that
server. While the communcation is disabled there will be no attempts to reconnect to redis or send commands
to that server. Default value is -1, meaning that this feature is disabled.
to that server. Default value is -1, meaning that this feature is disabled. This parameter can also be set or
listed via rtpengine-ctl.
* --redis-disable-time
This parameter configures the number of seconds redis communication is disabled because of errors.
This works together with redis-allowed-errors parameter. The default value is 10.
This works together with redis-allowed-errors parameter. The default value is 10. This parameter can also be
set or listed via rtpengine-ctl.
* --redis-cmd-timeout
If this parameter is set to a non-zero value it will set the timeout, in milliseconds, for each command to the redis server.
If redis does not reply within the specified timeout the command will fail. The default value is 0, meaning that the commands
will have no timeout
will be blocking without timeout. This parameter can also be set or listed via rtpengine-ctl; note that setting the parameter
to 0 will require a reconnect on all configured redis servers.
* --redis-connect-timeout
This parameter sets the timeout value, in milliseconds, when connecting to a redis server. If the connection cannot be made
within the specified timeout the connection will fail. Note that in case of failure, when reconnecting to redis, a PING command
is issued before attempting to connect so the `--redis-cmd-timeout` value will also be added to the total waiting time.
This is useful if using `--redis-allowed-errors`, when attempting to estimate the total lost time in case of redis failures.
The default value for the connection timeout is 1000ms.
The default value for the connection timeout is 1000ms. This parameter can also be set or listed via rtpengine-ctl.
* -b, --b2b-url

@ -47,6 +47,10 @@ static void cli_incoming_set_timeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_silenttimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_finaltimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_loglevel(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_redisallowederrors(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_redisdisabletime(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_redisconnecttimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_rediscmdtimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_numsessions(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuffer);
@ -57,6 +61,11 @@ static void cli_incoming_list_timeout(str *instr, struct streambuf *replybuffer)
static void cli_incoming_list_silenttimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_finaltimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_redisallowederrors(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_redisdisabletime(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_redisconnecttimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_rediscmdtimeout(str *instr, struct streambuf *replybuffer);
static const cli_handler_t cli_top_handlers[] = {
{ "list", cli_incoming_list },
@ -69,24 +78,32 @@ static const cli_handler_t cli_top_handlers[] = {
{ NULL, },
};
static const cli_handler_t cli_set_handlers[] = {
{ "maxopenfiles", cli_incoming_set_maxopenfiles },
{ "maxsessions", cli_incoming_set_maxsessions },
{ "timeout", cli_incoming_set_timeout },
{ "silenttimeout", cli_incoming_set_silenttimeout },
{ "finaltimeout", cli_incoming_set_finaltimeout },
{ "loglevel", cli_incoming_set_loglevel },
{ "maxopenfiles", cli_incoming_set_maxopenfiles },
{ "maxsessions", cli_incoming_set_maxsessions },
{ "timeout", cli_incoming_set_timeout },
{ "silenttimeout", cli_incoming_set_silenttimeout },
{ "finaltimeout", cli_incoming_set_finaltimeout },
{ "loglevel", cli_incoming_set_loglevel },
{ "redisallowederrors", cli_incoming_set_redisallowederrors },
{ "redisdisabletime", cli_incoming_set_redisdisabletime },
{ "redisconnecttimeout", cli_incoming_set_redisconnecttimeout },
{ "rediscmdtimeout", cli_incoming_set_rediscmdtimeout },
{ NULL, },
};
static const cli_handler_t cli_list_handlers[] = {
{ "numsessions", cli_incoming_list_numsessions },
{ "sessions", cli_incoming_list_sessions },
{ "totals", cli_incoming_list_totals },
{ "maxopenfiles", cli_incoming_list_maxopenfiles },
{ "maxsessions", cli_incoming_list_maxsessions },
{ "timeout", cli_incoming_list_timeout },
{ "silenttimeout", cli_incoming_list_silenttimeout },
{ "finaltimeout", cli_incoming_list_finaltimeout },
{ "loglevel", cli_incoming_list_loglevel },
{ "numsessions", cli_incoming_list_numsessions },
{ "sessions", cli_incoming_list_sessions },
{ "totals", cli_incoming_list_totals },
{ "maxopenfiles", cli_incoming_list_maxopenfiles },
{ "maxsessions", cli_incoming_list_maxsessions },
{ "timeout", cli_incoming_list_timeout },
{ "silenttimeout", cli_incoming_list_silenttimeout },
{ "finaltimeout", cli_incoming_list_finaltimeout },
{ "loglevel", cli_incoming_list_loglevel },
{ "redisallowederrors", cli_incoming_list_redisallowederrors },
{ "redisdisabletime", cli_incoming_list_redisdisabletime },
{ "redisconnecttimeout", cli_incoming_list_redisconnecttimeout },
{ "rediscmdtimeout", cli_incoming_list_rediscmdtimeout },
{ NULL, },
};
@ -865,3 +882,147 @@ static void cli_incoming_set_loglevel(str *instr, struct streambuf *replybuffer)
g_atomic_int_set(&rtpe_config.common.log_level, nl);
streambuf_printf(replybuffer, "Success setting loglevel to %i\n", nl);
}
static void cli_incoming_list_redisallowederrors(str *instr, struct streambuf *replybuffer) {
rwlock_lock_r(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "%d\n", rtpe_config.redis_allowed_errors);
rwlock_unlock_r(&rtpe_config.config_lock);
}
static void cli_incoming_set_redisallowederrors(str *instr, struct streambuf *replybuffer) {
long allowed_errors;
char *endptr;
if (str_shift(instr, 1)) {
streambuf_printf(replybuffer, "%s\n", "More parameters required.");
return;
}
allowed_errors = strtol(instr->s, &endptr, 10);
rwlock_lock_w(&rtpe_config.config_lock);
rtpe_config.redis_allowed_errors = allowed_errors;
rwlock_unlock_w(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "Success setting redis-allowed-errors to %ld\n", allowed_errors);
}
static void cli_incoming_list_redisdisabletime(str *instr, struct streambuf *replybuffer) {
rwlock_lock_r(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "%d\n", rtpe_config.redis_disable_time);
rwlock_unlock_r(&rtpe_config.config_lock);
}
static void cli_incoming_set_redisdisabletime(str *instr, struct streambuf *replybuffer) {
long seconds;
char *endptr;
if (str_shift(instr, 1)) {
streambuf_printf(replybuffer, "%s\n", "More parameters required.");
return;
}
seconds = strtol(instr->s, &endptr, 10);
if (seconds < 0) {
streambuf_printf(replybuffer, "Invalid redis-disable-time value %ld, must be >= 0\n", seconds);
return;
}
rwlock_lock_w(&rtpe_config.config_lock);
rtpe_config.redis_disable_time = seconds;
rwlock_unlock_w(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "Success setting redis-disable-time to %ld\n", seconds);
}
static void cli_incoming_list_redisconnecttimeout(str *instr, struct streambuf *replybuffer) {
rwlock_lock_r(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "%d\n", rtpe_config.redis_connect_timeout);
rwlock_unlock_r(&rtpe_config.config_lock);
}
static void cli_incoming_set_redisconnecttimeout(str *instr, struct streambuf *replybuffer) {
long timeout;
char *endptr;
if (str_shift(instr, 1)) {
streambuf_printf(replybuffer, "%s\n", "More parameters required.");
return ;
}
timeout = strtol(instr->s, &endptr, 10);
if (timeout <= 0) {
streambuf_printf(replybuffer, "Invalid redis-connect-timeout value %ld, must be > 0\n", timeout);
return;
}
rwlock_lock_w(&rtpe_config.config_lock);
rtpe_config.redis_connect_timeout = timeout;
rwlock_unlock_w(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "Success setting redis-connect-timeout to %ld\n", timeout);
}
static void cli_incoming_list_rediscmdtimeout(str *instr, struct streambuf *replybuffer) {
rwlock_lock_r(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "%d\n", rtpe_config.redis_cmd_timeout);
rwlock_unlock_r(&rtpe_config.config_lock);
}
static void cli_incoming_set_rediscmdtimeout(str *instr, struct streambuf *replybuffer) {
long timeout;
char *endptr;
int fail = 0;
if (str_shift(instr, 1)) {
streambuf_printf(replybuffer, "%s\n", "More parameters required.");
return;
}
timeout = strtol(instr->s, &endptr, 10);
if (timeout < 0) {
streambuf_printf(replybuffer, "Invalid redis-cmd-timeout value %ld, must be >= 0\n", timeout);
return;
}
rwlock_lock_w(&rtpe_config.config_lock);
if (rtpe_config.redis_cmd_timeout == timeout) {
rwlock_unlock_w(&rtpe_config.config_lock);
streambuf_printf(replybuffer, "Success setting redis-cmd-timeout to %ld\n", timeout);
return;
}
rtpe_config.redis_cmd_timeout = timeout;
rwlock_unlock_w(&rtpe_config.config_lock);
if (timeout == 0) {
streambuf_printf(replybuffer, "Warning: Setting redis-cmd-timeout to 0 (no timeout) will require a redis reconnect\n");
if (rtpe_redis && redis_reconnect(rtpe_redis)) {
streambuf_printf(replybuffer, "Failed reconnecting to redis\n");
fail = 1;
}
if (rtpe_redis && redis_reconnect(rtpe_redis_write)) {
streambuf_printf(replybuffer, "Failed reconnecting to redis-write\n");
fail = 1;
}
if (rtpe_redis && redis_reconnect(rtpe_redis_notify)) {
streambuf_printf(replybuffer, "Failed reconnecting to redis-notify\n");
fail = 1;
}
} else {
if (rtpe_redis && redis_set_timeout(rtpe_redis, timeout)) {
streambuf_printf(replybuffer, "Failed setting redis-cmd-timeout for redis %ld\n", timeout);
fail = 1;
}
if (rtpe_redis_write && redis_set_timeout(rtpe_redis_write, timeout)) {
streambuf_printf(replybuffer, "Failed setting redis-cmd-timeout for redis-write %ld\n", timeout);
fail = 1;
}
if (rtpe_redis_notify && redis_set_timeout(rtpe_redis_notify, timeout)) {
streambuf_printf(replybuffer, "Failed setting redis-cmd-timeout for redis-notify %ld\n", timeout);
fail = 1;
}
}
if (!fail)
streambuf_printf(replybuffer, "Success setting redis-cmd-timeout to %ld\n", timeout);
}

@ -575,24 +575,15 @@ no_kernel:
if (!is_addr_unspecified(&rtpe_config.redis_write_ep.address)) {
rtpe_redis_write = redis_new(&rtpe_config.redis_write_ep,
rtpe_config.redis_write_db, rtpe_config.redis_write_auth,
ANY_REDIS_ROLE, rtpe_config.no_redis_required,
rtpe_config.redis_allowed_errors,
rtpe_config.redis_disable_time, rtpe_config.redis_cmd_timeout,
rtpe_config.redis_connect_timeout);
ANY_REDIS_ROLE, rtpe_config.no_redis_required);
if (!rtpe_redis_write)
die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&rtpe_config.redis_write_ep));
}
if (!is_addr_unspecified(&rtpe_config.redis_ep.address)) {
rtpe_redis = redis_new(&rtpe_config.redis_ep, rtpe_config.redis_db, rtpe_config.redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, rtpe_config.no_redis_required,
rtpe_config.redis_allowed_errors,
rtpe_config.redis_disable_time, rtpe_config.redis_cmd_timeout,
rtpe_config.redis_connect_timeout);
rtpe_redis_notify = redis_new(&rtpe_config.redis_ep, rtpe_config.redis_db, rtpe_config.redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, rtpe_config.no_redis_required,
rtpe_config.redis_allowed_errors,
rtpe_config.redis_disable_time, rtpe_config.redis_cmd_timeout,
rtpe_config.redis_connect_timeout);
rtpe_redis = redis_new(&rtpe_config.redis_ep, rtpe_config.redis_db, rtpe_config.redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, rtpe_config.no_redis_required);
rtpe_redis_notify = redis_new(&rtpe_config.redis_ep, rtpe_config.redis_db, rtpe_config.redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, rtpe_config.no_redis_required);
if (!rtpe_redis || !rtpe_redis_notify)
die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&rtpe_config.redis_ep));

@ -56,10 +56,10 @@ struct rtpengine_config {
int redis_db;
int redis_write_db;
int no_redis_required;
int redis_allowed_errors;
int redis_disable_time;
int redis_cmd_timeout;
int redis_connect_timeout;
int redis_allowed_errors;
int redis_disable_time;
int redis_cmd_timeout;
int redis_connect_timeout;
char *redis_auth;
char *redis_write_auth;
int num_threads;

@ -80,6 +80,7 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...)
static int redis_check_conn(struct redis *r);
static void json_restore_call(struct redis *r, const str *id, enum call_type type);
static int redis_connect(struct redis *r, int wait);
static void redis_pipe(struct redis *r, const char *fmt, ...) {
va_list ap;
@ -172,19 +173,47 @@ static void redis_consume(struct redis *r) {
}
}
int redis_set_timeout(struct redis* r, int timeout) {
struct timeval tv_cmd;
if (!timeout)
return 0;
tv_cmd.tv_sec = (int) timeout / 1000;
tv_cmd.tv_usec = (int) (timeout % 1000) * 1000;
if (redisSetTimeout(r->ctx, tv_cmd))
return -1;
ilog(LOG_INFO, "Setting timeout for Redis commands to %d milliseconds",timeout);
return 0;
}
int redis_reconnect(struct redis* r) {
int rval;
mutex_lock(&r->lock);
rval = redis_connect(r,1);
if (rval)
r->state = REDIS_STATE_DISCONNECTED;
mutex_unlock(&r->lock);
return rval;
}
/* called with r->lock held if necessary */
static int redis_connect(struct redis *r, int wait) {
struct timeval tv;
redisReply *rp;
char *s;
int cmd_timeout, connect_timeout;
if (r->ctx)
redisFree(r->ctx);
r->ctx = NULL;
tv.tv_sec = (int) r->connect_timeout / 1000;
tv.tv_usec = (int) (r->connect_timeout % 1000) * 1000;
rwlock_lock_r(&rtpe_config.config_lock);
connect_timeout = rtpe_config.redis_connect_timeout;
cmd_timeout = rtpe_config.redis_cmd_timeout;
rwlock_unlock_r(&rtpe_config.config_lock);
tv.tv_sec = (int) connect_timeout / 1000;
tv.tv_usec = (int) (connect_timeout % 1000) * 1000;
r->ctx = redisConnectWithTimeout(r->host, r->endpoint.port, tv);
if (!r->ctx)
@ -192,14 +221,8 @@ static int redis_connect(struct redis *r, int wait) {
if (r->ctx->err)
goto err2;
if (r->cmd_timeout) {
struct timeval tv_cmd;
tv_cmd.tv_sec = (int) r->cmd_timeout / 1000;
tv_cmd.tv_usec = (int) (r->cmd_timeout % 1000) * 1000;
if (redisSetTimeout(r->ctx, tv_cmd))
goto err2;
ilog(LOG_INFO, "Setting timeout for Redis commands to %d milliseconds",r->cmd_timeout);
}
if (redis_set_timeout(r,cmd_timeout))
goto err2;
if (r->auth) {
if (redisCommandNR(r->ctx, "AUTH %s", r->auth))
@ -608,8 +631,7 @@ void redis_notify_loop(void *d) {
}
struct redis *redis_new(const endpoint_t *ep, int db, const char *auth,
enum redis_role role, int no_redis_required, int redis_allowed_errors,
int redis_disable_time, int redis_cmd_timeout,int redis_connect_timeout) {
enum redis_role role, int no_redis_required) {
struct redis *r;
r = g_slice_alloc0(sizeof(*r));
@ -620,12 +642,8 @@ struct redis *redis_new(const endpoint_t *ep, int db, const char *auth,
r->role = role;
r->state = REDIS_STATE_DISCONNECTED;
r->no_redis_required = no_redis_required;
r->allowed_errors = redis_allowed_errors;
r->disable_time = redis_disable_time;
r->restore_tick = 0;
r->consecutive_errors = 0;
r->cmd_timeout = redis_cmd_timeout;
r->connect_timeout = redis_connect_timeout;
mutex_init(&r->lock);
if (redis_connect(r, 10)) {
@ -659,17 +677,24 @@ static void redis_close(struct redis *r) {
static void redis_count_err_and_disable(struct redis *r)
{
int allowed_errors;
int disable_time;
rwlock_lock_r(&rtpe_config.config_lock);
allowed_errors = rtpe_config.redis_allowed_errors;
disable_time = rtpe_config.redis_disable_time;
rwlock_unlock_r(&rtpe_config.config_lock);
if (r->allowed_errors < 0) {
if (allowed_errors < 0) {
return;
}
r->consecutive_errors++;
if (r->consecutive_errors > r->allowed_errors) {
r->restore_tick = rtpe_now.tv_sec + r->disable_time;
if (r->consecutive_errors > allowed_errors) {
r->restore_tick = rtpe_now.tv_sec + disable_time;
ilog(LOG_WARNING, "Redis server %s disabled for %d seconds",
endpoint_print_buf(&r->endpoint),
r->disable_time);
disable_time);
}
}
@ -677,8 +702,8 @@ static void redis_count_err_and_disable(struct redis *r)
static int redis_check_conn(struct redis *r) {
if ((r->state == REDIS_STATE_DISCONNECTED) && (r->restore_tick > rtpe_now.tv_sec)) {
ilog(LOG_WARNING, "Redis server %s is disabled. Don't try RE-Establishing for %d seconds",
endpoint_print_buf(&r->endpoint),r->disable_time);
ilog(LOG_WARNING, "Redis server %s is disabled. Don't try RE-Establishing for %ld more seconds",
endpoint_print_buf(&r->endpoint),r->restore_tick - rtpe_now.tv_sec);
return REDIS_STATE_DISCONNECTED;
}
@ -1669,9 +1694,7 @@ int redis_restore(struct redis *r) {
g_queue_init(&ctx.r_q);
for (i = 0; i < rtpe_config.redis_num_threads; i++)
g_queue_push_tail(&ctx.r_q,
redis_new(&r->endpoint, r->db, r->auth, r->role,
r->no_redis_required, r->allowed_errors,
r->disable_time, r->cmd_timeout, r->connect_timeout));
redis_new(&r->endpoint, r->db, r->auth, r->role, r->no_redis_required));
gtp = g_thread_pool_new(restore_thread, &ctx, rtpe_config.redis_num_threads, TRUE, NULL);
for (i = 0; i < calls->elements; i++) {

@ -60,12 +60,8 @@ struct redis {
int state;
int no_redis_required;
int allowed_errors;
int consecutive_errors;
int disable_time;
time_t restore_tick;
int cmd_timeout;
int connect_timeout;
};
struct redis_hash {
@ -106,7 +102,7 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v)
void redis_notify_loop(void *d);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int, int, int, int, int);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int);
int redis_restore(struct redis *);
void redis_update(struct call *, struct redis *);
void redis_update_onekey(struct call *c, struct redis *r);
@ -114,7 +110,8 @@ void redis_delete(struct call *, struct redis *);
void redis_wipe(struct redis *);
int redis_notify_event_base_action(enum event_base_action);
int redis_notify_subscribe_action(enum subscribe_action action, int keyspace);
int redis_set_timeout(struct redis* r, int timeout);
int redis_reconnect(struct redis* r);

Loading…
Cancel
Save