From a8d5076065db8131ea332a957167fc0c3523e557 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 16 Apr 2021 14:05:48 -0400 Subject: [PATCH] TT#119502 correctly restore calls from both Redis instances Change-Id: I713d7e8ba0a7d14f5ef9016d33619df91ce6ec32 --- daemon/call.c | 13 ++++++++---- daemon/cli.c | 8 ++++++++ daemon/main.c | 15 ++++++++++++-- daemon/media_socket.c | 2 ++ daemon/redis.c | 48 ++++++++++++++++++++++++++++++------------- include/call.h | 1 + include/redis.h | 2 +- 7 files changed, 68 insertions(+), 21 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 0de526a32..bce708a92 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -174,6 +174,10 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) { if (!c->streams.head) goto drop; + // ignore media timeout if call was recently taken over + if (c->foreign_media && rtpe_now.tv_sec - c->last_signal <= rtpe_config.timeout) + goto out; + for (it = c->streams.head; it; it = it->next) { ps = it->data; @@ -644,6 +648,9 @@ static void call_timer(void *ptr) { update = 0; + if (diff_packets) + sfd->call->foreign_media = 0; + sink = packet_stream_sink(ps); if (!ke->target.non_forwarding && diff_packets) { @@ -2119,7 +2126,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, monologue = other_ml->active_dialogue; call = monologue->call; - call->last_signal = rtpe_now.tv_sec; + call->last_signal = MAX(call->last_signal, rtpe_now.tv_sec); call->deleted = 0; __C_DBG("this="STR_FORMAT" other="STR_FORMAT, STR_FMT(&monologue->tag), STR_FMT(&other_ml->tag)); @@ -2524,9 +2531,7 @@ void call_destroy(struct call *c) { statistics_update_ip46_inc_dec(c, CMC_DECREMENT); statistics_update_foreignown_dec(c); - if (IS_OWN_CALL(c)) { - redis_delete(c, rtpe_redis_write); - } + redis_delete(c, rtpe_redis_write); rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ diff --git a/daemon/cli.c b/daemon/cli.c index 8e1a543e9..fe1f591c7 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -1080,7 +1080,15 @@ static void cli_incoming_active_standby(struct cli_writer *cw, int foreign) { g_hash_table_iter_init(&iter, rtpe_callhash); while (g_hash_table_iter_next(&iter, &key, &value)) { struct call *c = value; + rwlock_lock_w(&c->master_lock); call_make_own_foreign(c, foreign); + c->last_signal = MAX(c->last_signal, rtpe_now.tv_sec); + if (!foreign) { + c->foreign_media = 1; // ignore timeout until we have media + c->last_signal++; // we are authoritative now + } + rwlock_unlock_w(&c->master_lock); + redis_update_onekey(c, rtpe_redis_write); } rwlock_unlock_r(&rtpe_callhash_lock); diff --git a/daemon/main.c b/daemon/main.c index 5535073ea..65e3bb7d5 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1050,8 +1050,19 @@ no_kernel: gettimeofday(&redis_start, NULL); // restore - if (redis_restore(rtpe_redis)) - die("Refusing to continue without working Redis database"); + if (rtpe_redis_notify) { + // active-active mode: the main DB has our own calls, while + // the "notify" DB has the "foreign" calls. "foreign" DB goes + // first as the "owned" DB can do a stray update back to Redis + if (redis_restore(rtpe_redis_notify, 1)) + ilog(LOG_WARN, "Unable to restore calls from the active-active peer"); + if (redis_restore(rtpe_redis_write, 0)) + die("Refusing to continue without working Redis database"); + } + else { + if (redis_restore(rtpe_redis, 0)) + die("Refusing to continue without working Redis database"); + } // stop redis restore timer gettimeofday(&redis_stop, NULL); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 326c672fa..5365870cb 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -2008,6 +2008,8 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (!phc->mp.stream->selected_sfd) goto out; + phc->mp.call->foreign_media = 0; + if (phc->mp.call->drop_traffic) { goto drop; } diff --git a/daemon/redis.c b/daemon/redis.c index b45b029c6..235ed7637 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -438,7 +438,7 @@ void redis_notify_async_context_disconnect(const redisAsyncContext *redis_notify redis_notify_async_context->err); } } else if (status == REDIS_OK) { - rlog(LOG_ERROR, "redis_notify_async_context_disconnect initiated by user"); + rlog(LOG_NOTICE, "redis_notify_async_context_disconnect initiated by user"); } else { rlog(LOG_ERROR, "redis_notify_async_context_disconnect invalid status code %d", status); } @@ -1719,6 +1719,7 @@ static void json_restore_call(struct redis *r, const str *callid, int foreign) { struct redis_list tags, sfds, streams, medias, maps; struct call *c = NULL; str s, id, meta; + time_t last_signal; const char *err = 0; int i; @@ -1744,13 +1745,25 @@ static void json_restore_call(struct redis *r, const str *callid, int foreign) { if (!c) goto err1; - err = "call already exists"; - if (c->last_signal) - goto err2; err = "'call' data incomplete"; - if (json_get_hash(&call, "json", -1, root_reader)) goto err2; + + err = "missing 'last signal' timestamp"; + if (redis_hash_get_time_t(&last_signal, &call, "last_signal")) + goto err3; + + if (c->last_signal) { + err = NULL; + // is the call we're loading newer than the one we have? + if (last_signal > c->last_signal) { + // switch ownership + call_make_own_foreign(c, foreign); + c->last_signal = last_signal; + } + goto err3; // no error, just bail + } + err = "'tags' incomplete"; if (json_get_list_hash(&tags, "tag", &call, "num_tags", root_reader)) goto err3; @@ -1770,9 +1783,7 @@ static void json_restore_call(struct redis *r, const str *callid, int foreign) { err = "missing 'created' timestamp"; if (redis_hash_get_timeval(&c->created, &call, "created")) goto err8; - err = "missing 'last signal' timestamp"; - if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal")) - goto err8; + c->last_signal = last_signal; if (redis_hash_get_int(&i, &call, "tos")) c->tos = 184; else @@ -1863,10 +1874,15 @@ err1: err); if (c) call_destroy(c); - else { - mutex_lock(&rtpe_redis_write->lock); - redisCommandNR(rtpe_redis_write->ctx, "DEL " PB, STR(callid)); - mutex_unlock(&rtpe_redis_write->lock); + + mutex_lock(&rtpe_redis_write->lock); + redisCommandNR(rtpe_redis_write->ctx, "DEL " PB, STR(callid)); + mutex_unlock(&rtpe_redis_write->lock); + + if (rtpe_redis_notify) { + mutex_lock(&rtpe_redis_notify->lock); + redisCommandNR(rtpe_redis_notify->ctx, "DEL " PB, STR(callid)); + mutex_unlock(&rtpe_redis_notify->lock); } } if (c) @@ -1876,6 +1892,7 @@ err1: struct thread_ctx { GQueue r_q; mutex_t r_m; + int foreign; }; static void restore_thread(void *call_p, void *ctx_p) { @@ -1891,14 +1908,14 @@ static void restore_thread(void *call_p, void *ctx_p) { r = g_queue_pop_head(&ctx->r_q); mutex_unlock(&ctx->r_m); - json_restore_call(r, &callid, 0); + json_restore_call(r, &callid, ctx->foreign); mutex_lock(&ctx->r_m); g_queue_push_tail(&ctx->r_q, r); mutex_unlock(&ctx->r_m); } -int redis_restore(struct redis *r) { +int redis_restore(struct redis *r, int foreign) { redisReply *calls = NULL, *call; int i, ret = -1; GThreadPool *gtp; @@ -1931,6 +1948,7 @@ int redis_restore(struct redis *r) { mutex_init(&ctx.r_m); g_queue_init(&ctx.r_q); + ctx.foreign = foreign; for (i = 0; i < rtpe_config.redis_num_threads; i++) g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, r->auth, r->role, r->no_redis_required)); @@ -2401,6 +2419,8 @@ void redis_update_onekey(struct call *c, struct redis *r) { if (!r) return; + if (c->foreign_call) + return; mutex_lock(&r->lock); // coverity[sleep : FALSE] diff --git a/include/call.h b/include/call.h index e62f29ae2..32cdd88b7 100644 --- a/include/call.h +++ b/include/call.h @@ -435,6 +435,7 @@ struct call { unsigned int rec_forwarding:1; unsigned int drop_traffic:1; unsigned int foreign_call:1; // created_via_redis_notify call + unsigned int foreign_media:1; // for calls taken over, tracks whether we have media unsigned int disable_jb:1; unsigned int debug:1; }; diff --git a/include/redis.h b/include/redis.h index b232702d4..1daffbb58 100644 --- a/include/redis.h +++ b/include/redis.h @@ -107,7 +107,7 @@ void redis_delete_async_loop(void *d); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int); void redis_close(struct redis *r); -int redis_restore(struct redis *); +int redis_restore(struct redis *, int foreign); void redis_update(struct call *, struct redis *); void redis_update_onekey(struct call *c, struct redis *r); void redis_delete(struct call *, struct redis *);