MT#61556 redis: re-resolve when re-connecting

When re-connecting to the remote redis server
try to re-resolve if the redist hostname
was an FQDN and not IP address.

Change-Id: Ie80e1d1a1ea76811c54123201ad4fe8cb64fc748
pull/1880/head
Donat Zenichev 5 months ago
parent f2b5df0a41
commit 1eb0c5e13e

@ -364,9 +364,10 @@ static int if_addr_parse(intf_config_q *q, char *s, struct ifaddrs *ifas) {
static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth_env, char *s) {
char *sl;
static int redis_ep_parse(endpoint_t *ep, int *db, char **hostname, char **auth, const char *auth_env, char *s) {
char *sl, *sp;
long l;
char buf[255]; // max length due to RFC standards
sl = strrchr(s, '@');
if (sl) {
@ -390,6 +391,14 @@ static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth
if (l < 0)
return -1;
*db = l;
/* copy for the case with re-resolve during re-connections */
sp = strrchr(s, ':'); /* make sure to not take port into the value of hostname */
if (sp)
*hostname = g_strdup_printf("%.*s", (int)(sp - s), s);
else
*hostname = g_strdup(s);
if (endpoint_parse_any_getaddrinfo_full(ep, s))
return -1;
return 0;
@ -561,6 +570,7 @@ static void options(int *argc, char ***argv) {
{ "port-max", 'M', 0, G_OPTION_ARG_INT, &rtpe_config.port_max, "Highest port to use for RTP", "INT" },
{ "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "[PW@]IP:PORT/INT" },
{ "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" },
{ "redis-resolve-on-reconnect", 0,0, G_OPTION_ARG_NONE, &rtpe_config.redis_resolve_on_reconnect, "Re-resolve given FQDN on each re-connect to the redis server.", NULL },
{ "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_num_threads, "Number of Redis restore threads", "INT" },
{ "redis-expires", 0, 0, G_OPTION_ARG_INT, &rtpe_config.redis_expires_secs, "Expire time in seconds for redis keys", "INT" },
{ "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &rtpe_config.no_redis_required, "Start no matter of redis connection state", NULL },
@ -866,14 +876,21 @@ static void options(int *argc, char ***argv) {
if (rtpe_config.rtcp_interval <= 0)
rtpe_config.rtcp_interval = 5000;
if (redisps)
if (redis_ep_parse(&rtpe_config.redis_ep, &rtpe_config.redis_db, &rtpe_config.redis_auth, "RTPENGINE_REDIS_AUTH_PW", redisps))
if (redisps) {
if (redis_ep_parse(&rtpe_config.redis_ep, &rtpe_config.redis_db, &rtpe_config.redis_hostname,
&rtpe_config.redis_auth, "RTPENGINE_REDIS_AUTH_PW", redisps))
{
die("Invalid Redis endpoint [IP:PORT/INT] '%s' (--redis)", redisps);
}
}
if (redisps_write)
if (redis_ep_parse(&rtpe_config.redis_write_ep, &rtpe_config.redis_write_db, &rtpe_config.redis_write_auth,
"RTPENGINE_REDIS_WRITE_AUTH_PW", redisps_write))
if (redisps_write) {
if (redis_ep_parse(&rtpe_config.redis_write_ep, &rtpe_config.redis_write_db, &rtpe_config.redis_write_hostname,
&rtpe_config.redis_write_auth, "RTPENGINE_REDIS_WRITE_AUTH_PW", redisps_write))
{
die("Invalid Redis endpoint [IP:PORT/INT] '%s' (--redis-write)", redisps_write);
}
}
if (rtpe_config.fmt > 2)
die("Invalid XMLRPC format");
@ -1358,22 +1375,41 @@ static void create_everything(void) {
if (!is_addr_unspecified(&rtpe_config.redis_write_ep.address)) {
rtpe_redis_write = redis_new(&rtpe_config.redis_write_ep,
rtpe_config.redis_write_db, rtpe_config.redis_write_auth,
ANY_REDIS_ROLE, rtpe_config.no_redis_required);
rtpe_config.redis_write_db,
rtpe_config.redis_write_hostname,
rtpe_config.redis_write_auth,
ANY_REDIS_ROLE,
rtpe_config.no_redis_required,
rtpe_config.redis_resolve_on_reconnect);
if (!rtpe_redis_write)
die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&rtpe_config.redis_write_ep));
}
if (!is_addr_unspecified(&rtpe_config.redis_ep.address)) {
rtpe_redis = redis_new(&rtpe_config.redis_ep, rtpe_config.redis_db, rtpe_config.redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, rtpe_config.no_redis_required);
rtpe_redis = redis_new(&rtpe_config.redis_ep,
rtpe_config.redis_db,
rtpe_config.redis_hostname,
rtpe_config.redis_auth,
(rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE),
rtpe_config.no_redis_required,
rtpe_config.redis_resolve_on_reconnect);
if (!rtpe_redis)
die("Cannot start up without running Redis %s database! "
"See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&rtpe_config.redis_ep));
if (rtpe_config.redis_subscribed_keyspaces.length) {
rtpe_redis_notify = redis_new(&rtpe_config.redis_ep, rtpe_config.redis_db, rtpe_config.redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, rtpe_config.no_redis_required);
rtpe_redis_notify = redis_new(&rtpe_config.redis_ep,
rtpe_config.redis_db,
rtpe_config.redis_hostname,
rtpe_config.redis_auth,
(rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE),
rtpe_config.no_redis_required,
rtpe_config.redis_resolve_on_reconnect);
if (!rtpe_redis_notify)
die("Cannot start up without running notification Redis %s database! "
"See also NO_REDIS_REQUIRED parameter.",

@ -112,7 +112,7 @@ static int redis_ports_release_balance = 0; // negative = releasers, positive =
static int redis_check_conn(struct redis *r);
static void json_restore_call(struct redis *r, const str *id, bool foreign);
static int redis_connect(struct redis *r, int wait);
static int redis_connect(struct redis *r, int wait, bool resolve);
static int json_build_ssrc(struct call_monologue *ml, parser_arg arg);
@ -244,7 +244,8 @@ int redis_set_timeout(struct redis* r, int timeout) {
int redis_reconnect(struct redis* r) {
int rval;
LOCK(&r->lock);
rval = redis_connect(r,1);
rval = redis_connect(r, 1, r->update_resolve);
if (rval)
r->state = REDIS_STATE_DISCONNECTED;
return rval;
@ -261,11 +262,12 @@ static int redis_select_db(struct redis *r, int db) {
}
/* called with r->lock held if necessary */
static int redis_connect(struct redis *r, int wait) {
static int redis_connect(struct redis *r, int wait, bool resolve) {
struct timeval tv;
redisReply *rp;
char *s;
int cmd_timeout, connect_timeout;
sockaddr_t a;
if (r->ctx)
redisFree(r->ctx);
@ -277,6 +279,17 @@ static int redis_connect(struct redis *r, int wait) {
tv.tv_sec = (int) connect_timeout / 1000;
tv.tv_usec = (int) (connect_timeout % 1000) * 1000;
/* re-resolve if asked */
if (resolve && r->hostname) {
if (sockaddr_getaddrinfo(&a, r->hostname))
ilog(LOG_WARN, "Failed to re-resolve remote server hostname: '%s'. Just use older one: '%s'.",
r->hostname, r->host);
else
sockaddr_print(&a, r->host, sizeof(r->host));
r->endpoint.address = a;
}
r->ctx = redisConnectWithTimeout(r->host, r->endpoint.port, tv);
if (!r->ctx)
@ -868,8 +881,8 @@ void redis_notify_loop(void *d) {
r->async_ctx = NULL;
}
struct redis *redis_new(const endpoint_t *ep, int db, const char *auth,
enum redis_role role, int no_redis_required) {
struct redis *redis_new(const endpoint_t *ep, int db, const char *hostname, const char *auth,
enum redis_role role, int no_redis_required, bool update_resolve) {
struct redis *r;
r = g_slice_alloc0(sizeof(*r));
@ -877,14 +890,16 @@ struct redis *redis_new(const endpoint_t *ep, int db, const char *auth,
sockaddr_print(&ep->address, r->host, sizeof(r->host));
r->db = db;
r->auth = auth;
r->hostname = hostname;
r->role = role;
r->state = REDIS_STATE_DISCONNECTED;
r->no_redis_required = no_redis_required;
r->restore_tick = 0;
r->consecutive_errors = 0;
r->update_resolve = update_resolve;
mutex_init(&r->lock);
if (redis_connect(r, 10)) {
if (redis_connect(r, 10, false)) {
if (r->no_redis_required) {
rlog(LOG_WARN, "Starting with no initial connection to Redis %s !",
endpoint_print_buf(&r->endpoint));
@ -906,7 +921,13 @@ err:
}
struct redis *redis_dup(const struct redis *r, int db) {
return redis_new(&r->endpoint, db >= 0 ? db : r->db, r->auth, r->role, r->no_redis_required);
return redis_new(&r->endpoint,
(db >= 0 ? db : r->db),
r->hostname,
r->auth,
r->role,
r->no_redis_required,
r->update_resolve);
}
void redis_close(struct redis *r) {
@ -972,7 +993,7 @@ static int redis_check_conn(struct redis *r) {
}
// try redis reconnect => will free current r->ctx
if (redis_connect(r, 1)) {
if (redis_connect(r, 1, r->update_resolve)) {
// redis is disconnected
redis_count_err_and_disable(r);
return REDIS_STATE_DISCONNECTED;

@ -72,6 +72,7 @@ recording-method = proc
# redis-disable-time = 10
# redis-cmd-timeout = 0
# redis-connect-timeout = 1000
# redis-resolve-on-reconnect = false
# b2b-url = http://127.0.0.1:8090/
# xmlrpc-format = 0

@ -111,12 +111,15 @@ enum endpoint_learning {
X(jb_clock_drift) \
X(player_cache) \
X(poller_per_thread) \
X(redis_resolve_on_reconnect) \
X(measure_rtp)
#define RTPE_CONFIG_CHARP_PARAMS \
X(b2b_url) \
X(redis_auth) \
X(redis_write_auth) \
X(redis_hostname) \
X(redis_write_hostname) \
X(spooldir) \
X(rec_method) \
X(rec_format) \

@ -43,6 +43,7 @@ enum subscribe_action {
struct redis {
endpoint_t endpoint;
char host[64];
const char *hostname; /* can be a hostname or IP address */
enum redis_role role;
redisContext *ctx;
@ -62,6 +63,8 @@ struct redis {
mutex_t async_lock;
GQueue async_queue;
int async_last;
bool update_resolve;
};
struct redis_hash {
@ -88,7 +91,7 @@ void redis_notify_loop(void *d);
void redis_delete_async_loop(void *d);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int);
struct redis *redis_new(const endpoint_t *, int, const char *, const char *, enum redis_role, int, bool);
struct redis *redis_dup(const struct redis *r, int db);
void redis_close(struct redis *r);
int redis_restore(struct redis *, bool foreign, int db);

@ -13,7 +13,6 @@
enum socket_families {
SF_IP4 = 0,
SF_IP6,

Loading…
Cancel
Save