Add paramaters to disable redis communication in case of multiple errors

pull/445/head
Claudiu Boriga 8 years ago
parent 8941c827d3
commit c31ffa4639

@ -183,6 +183,8 @@ option and which are reproduced below:
--redis-expires=INT Expire time in seconds for redis keys
--redis-multikey Use multiple redis keys for storing the call (old behaviour) DEPRECATED
-q, --no-redis-required Start even if can't connect to redis databases
--redis-allowed-errors Number of allowed errors before redis is temporarily disabled
--redis-disable-time Number of seconds redis communication is disabled because of errors
-b, --b2b-url=STRING XMLRPC URL of B2B UA
-L, --log-level=INT Mask log priorities above this level
--log-facility=daemon|local0|... Syslog facility to use for logging
@ -444,6 +446,16 @@ The options are described in more detail below.
Be aware that if the -r redis can't be initially connected, sessions are not reloaded upon rtpengine startup,
even though rtpengine still starts.
* --redis-allowed-errors
If this parameter is present and has a positive value, it will configure how many consecutive errors
are allowed when communicating with a redis server before the redis communication will be temporarily disabled
for that server. While the communcation is disabled there will be no attempts to reconnect to redis or send
commands to that server. Default value is -1, meaning that this feature is disabled.
* --redis-disable-time
This parameter configures the number of seconds redis communication is disabled because of errors.
This works together with redis-allowed-errors parameter. The default value is 10.
* -b, --b2b-url
Enables and sets the URI for an XMLRPC callback to be made when a call is torn down due to packet

@ -71,6 +71,8 @@ static int redis_db = -1;
static int redis_write_db = -1;
static int redis_num_threads;
static int no_redis_required;
static int redis_allowed_errors = -1;
static int redis_disable_time = 10;
static char *redis_auth;
static char *redis_write_auth;
static char *b2b_url;
@ -278,6 +280,8 @@ static void options(int *argc, char ***argv) {
{ "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" },
{ "redis-expires", 0, 0, G_OPTION_ARG_INT, &redis_expires, "Expire time in seconds for redis keys", "INT" },
{ "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL },
{ "redis-allowed-errors", 0, 0, G_OPTION_ARG_INT, &redis_allowed_errors, "Number of allowed errors before redis is temporarily disabled", "INT" },
{ "redis-disable-time", 0, 0, G_OPTION_ARG_INT, &redis_disable_time, "Number of seconds redis communication is disabled because of errors", "INT" },
{ "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" },
{ "log-facility-cdr",0, 0, G_OPTION_ARG_STRING, &log_facility_cdr_s, "Syslog facility to use for logging CDRs", "daemon|local0|...|local7"},
{ "log-facility-rtcp",0, 0, G_OPTION_ARG_STRING, &log_facility_rtcp_s, "Syslog facility to use for logging RTCP", "daemon|local0|...|local7"},
@ -576,15 +580,21 @@ no_kernel:
}
if (!is_addr_unspecified(&redis_write_ep.address)) {
mc.redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required);
mc.redis_write = redis_new(&redis_write_ep, redis_write_db,
redis_write_auth, ANY_REDIS_ROLE, no_redis_required,
redis_allowed_errors, redis_disable_time);
if (!mc.redis_write)
die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&redis_write_ep));
}
if (!is_addr_unspecified(&redis_ep.address)) {
mc.redis = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
mc.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
mc.redis = redis_new(&redis_ep, redis_db, redis_auth,
mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE,
no_redis_required, redis_allowed_errors, redis_disable_time);
mc.redis_notify = redis_new(&redis_ep, redis_db, redis_auth,
mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE,
no_redis_required, redis_allowed_errors, redis_disable_time);
if (!mc.redis || !mc.redis_notify)
die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&redis_ep));

@ -628,9 +628,10 @@ void redis_notify_loop(void *d) {
redis_notify_event_base_action(cm, EVENT_BASE_FREE);
}
struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) {
struct redis *redis_new(const endpoint_t *ep, int db, const char *auth,
enum redis_role role, int no_redis_required, int redis_allowed_errors,
int redis_disable_time) {
struct redis *r;
r = g_slice_alloc0(sizeof(*r));
r->endpoint = *ep;
@ -640,6 +641,10 @@ struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum red
r->role = role;
r->state = REDIS_STATE_DISCONNECTED;
r->no_redis_required = no_redis_required;
r->allowed_errors = redis_allowed_errors;
r->disable_time = redis_disable_time;
r->restore_tick = 0;
r->consecutive_errors = 0;
mutex_init(&r->lock);
if (redis_connect(r, 10)) {
@ -671,9 +676,34 @@ static void redis_close(struct redis *r) {
g_slice_free1(sizeof(*r), r);
}
static void redis_count_err_and_disable(struct redis *r)
{
if (r->allowed_errors < 0) {
return;
}
r->consecutive_errors++;
if (r->consecutive_errors > r->allowed_errors) {
r->restore_tick = g_now.tv_sec + r->disable_time;
ilog(LOG_WARNING, "Redis server %s disabled for %d seconds",
endpoint_print_buf(&r->endpoint),
r->disable_time);
}
}
/* must be called with r->lock held */
static int redis_check_conn(struct redis *r) {
if ((r->state == REDIS_STATE_DISCONNECTED) && (r->restore_tick > g_now.tv_sec)) {
ilog(LOG_WARNING, "Redis server %s is disabled. Don't try RE-Establishing for %d seconds",
endpoint_print_buf(&r->endpoint),r->disable_time);
return REDIS_STATE_DISCONNECTED;
}
if (r->state == REDIS_STATE_DISCONNECTED)
ilog(LOG_INFO, "RE-Establishing connection for Redis server %s",endpoint_print_buf(&r->endpoint));
// try redis connection
if (redisCommandNR(r->ctx, "PING") == 0) {
// redis is connected
@ -690,9 +720,12 @@ static int redis_check_conn(struct redis *r) {
// try redis reconnect => will free current r->ctx
if (redis_connect(r, 1)) {
// redis is disconnected
redis_count_err_and_disable(r);
return REDIS_STATE_DISCONNECTED;
}
r->consecutive_errors = 0;
// redis is connected
if (r->state == REDIS_STATE_DISCONNECTED) {
rlog(LOG_INFO, "RE-Established connection to Redis %s",
@ -1650,7 +1683,10 @@ int redis_restore(struct callmaster *m, struct redis *r) {
mutex_init(&ctx.r_m);
g_queue_init(&ctx.r_q);
for (i = 0; i < m->conf.redis_num_threads; i++)
g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, r->auth, r->role, r->no_redis_required));
g_queue_push_tail(&ctx.r_q,
redis_new(&r->endpoint, r->db, r->auth, r->role,
r->no_redis_required, r->allowed_errors,
r->disable_time));
gtp = g_thread_pool_new(restore_thread, &ctx, m->conf.redis_num_threads, TRUE, NULL);
for (i = 0; i < calls->elements; i++) {

@ -61,6 +61,10 @@ struct redis {
int state;
int no_redis_required;
int allowed_errors;
int consecutive_errors;
int disable_time;
time_t restore_tick;
};
struct redis_hash {
@ -92,7 +96,7 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v)
void redis_notify_loop(void *d);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int, int, int);
int redis_restore(struct callmaster *, struct redis *);
void redis_update(struct call *, struct redis *);
void redis_update_onekey(struct call *c, struct redis *r);

Loading…
Cancel
Save