TT#156900 fix master/slave race condition with early closed ports

When ports are closed early (while the call is still running), we must
first update a slave rtpengine with this new information (that these
ports are now closed) before actually releasing the ports ourselves. Not
doing so leads to a race condition where the master instance re-uses a
port that was just closed before the slave instance knows about the port
being closed.

We implement this using a thread-local list to keep track of ports that
were released while processing a control message, and process this list
to actually close the ports only after Redis has been updated.

Additional calls to the function to close the ports are placed in
strategic locations to make sure this is triggered in every code path.

closes 

Change-Id: I803f4594f30ca315da0b84c6e76893f54ca3a7c9
(cherry picked from commit 17bda4b1e8)
mr10.5.1
Richard Fuchs 3 years ago
parent af0f6597f2
commit aa00c6ebce

@ -99,6 +99,7 @@ static int call_timer_delete_monologues(struct call *c) {
struct call_monologue *ml;
int ret = 0;
time_t min_deleted = 0;
bool update = false;
/* we need a write lock here */
rwlock_unlock_r(&c->master_lock);
@ -119,12 +120,15 @@ static int call_timer_delete_monologues(struct call *c) {
ret = 1; /* destroy call */
goto out;
}
update = true;
}
out:
c->ml_deleted = min_deleted;
rwlock_unlock_w(&c->master_lock);
if (update)
redis_update_onekey(c, rtpe_redis_write);
rwlock_lock_r(&c->master_lock);
// coverity[missing_unlock : FALSE]
@ -736,6 +740,8 @@ next:
interval /= 2;
ilog(LOG_INFO, "Decreasing timer run interval to %llu seconds", interval / 1000000);
}
release_closed_sockets();
}
#undef DS
@ -4261,6 +4267,7 @@ int call_delete_branch(const str *callid, const str *branch,
int ret;
const str *match_tag;
GList *i;
bool update = false;
if (delete_delay < 0)
delete_delay = rtpe_config.delete_delay;
@ -4339,6 +4346,7 @@ do_delete:
STR_FMT_M(&ml->tag), STR_FMT0_M(branch));
if (monologue_destroy(ml))
goto del_all;
update = true;
}
goto success_unlock;
@ -4375,7 +4383,10 @@ err:
goto out;
out:
if (c)
if (c) {
if (update)
redis_update_onekey(c, rtpe_redis_write);
obj_put(c);
}
return ret;
}

@ -1243,6 +1243,7 @@ static void cli_stream_readable(struct streambuf_stream *s) {
void cli_handle(str *instr, struct cli_writer *cw) {
ilogs(control, LOG_INFO, "Got CLI command: " STR_FORMAT_M, STR_FMT_M(instr));
cli_handler_do(cli_top_handlers, instr, cw);
release_closed_sockets();
}
static void cli_free(void *p) {

@ -398,6 +398,7 @@ send_only:
out:
ng_buffer_release(ngbuf);
release_closed_sockets();
log_info_pop();
return funcret;
}

@ -76,6 +76,13 @@ struct packet_handler_ctx {
// output:
struct media_packet mp; // passed to handlers
};
struct late_port_release {
socket_t socket;
struct intf_spec *spec;
};
static __thread GQueue ports_to_release = G_QUEUE_INIT;
static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *);
@ -817,7 +824,7 @@ static int get_port(socket_t *r, unsigned int port, struct intf_spec *spec, cons
return 0;
}
static void release_port(socket_t *r, struct intf_spec *spec) {
static void release_port_now(socket_t *r, struct intf_spec *spec) {
unsigned int port = r->local.port;
struct port_pool *pp = &spec->port_pool;
@ -840,10 +847,26 @@ static void release_port(socket_t *r, struct intf_spec *spec) {
__C_DBG("port %u is NOT released", port);
}
}
static void release_port(socket_t *r, struct intf_spec *spec) {
if (!r->local.port || r->fd == -1)
return;
__C_DBG("adding port %u to late-release list", r->local.port);
struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr));
move_socket(&lpr->socket, r);
lpr->spec = spec;
g_queue_push_tail(&ports_to_release, lpr);
}
static void free_port(socket_t *r, struct intf_spec *spec) {
release_port(r, spec);
g_slice_free1(sizeof(*r), r);
}
void release_closed_sockets(void) {
struct late_port_release *lpr;
while ((lpr = g_queue_pop_head(&ports_to_release))) {
release_port_now(&lpr->socket, lpr->spec);
g_slice_free1(sizeof(*lpr), lpr);
}
}

@ -376,6 +376,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
if (IS_FOREIGN_CALL(c)) {
c->redis_hosted_db = rtpe_redis_write->db; // don't delete from foreign DB
call_destroy(c);
release_closed_sockets();
}
else {
rlog(LOG_WARN, "Redis-Notifier: Ignoring SET received for OWN call: " STR_FORMAT "\n", STR_FMT(&callid));
@ -411,6 +412,7 @@ err:
obj_put(c);
mutex_unlock(&r->lock);
release_closed_sockets();
log_info_reset();
}
@ -2087,6 +2089,7 @@ static void restore_thread(void *call_p, void *ctx_p) {
mutex_lock(&ctx->r_m);
g_queue_push_tail(&ctx->r_q, r);
mutex_unlock(&ctx->r_m);
release_closed_sockets();
}
int redis_restore(struct redis *r, bool foreign, int db) {

@ -8,6 +8,7 @@
#include "aux.h"
#include "log.h"
#include "streambuf.h"
#include "media_socket.h"
struct tcp_listener_callback {
struct obj obj;
@ -138,6 +139,8 @@ static void streambuf_stream_readable(int fd, void *p, uintptr_t u) {
if (ret == -2)
goto close;
release_closed_sockets();
return;
close:

@ -60,6 +60,8 @@ static void udp_listener_incoming(int fd, void *p, uintptr_t x) {
obj_put(udp_buf);
udp_buf = NULL;
}
release_closed_sockets();
}
obj_put(udp_buf);
}

@ -758,6 +758,8 @@ static int websocket_http(struct lws *wsi, enum lws_callback_reasons reason, voi
break;
}
release_closed_sockets();
return 0;
}
@ -827,6 +829,8 @@ static int websocket_protocol(struct lws *wsi, enum lws_callback_reasons reason,
break;
}
release_closed_sockets();
return 0;
}

@ -179,6 +179,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_
struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif);
struct stream_fd *stream_fd_lookup(const endpoint_t *);
void stream_fd_release(struct stream_fd *);
void release_closed_sockets(void);
void free_intf_list(struct intf_list *il);
void free_release_intf_list(struct intf_list *il);

@ -800,6 +800,17 @@ int close_socket(socket_t *r) {
return 0;
}
// moves the contents of the socket object:
// dst must be initialised
// src will be reset and cleared, as if it was closed
// does not actually close the socket
void move_socket(socket_t *dst, socket_t *src) {
*dst = *src;
src->fd = -1;
ZERO(src->local);
ZERO(src->remote);
}

@ -217,6 +217,7 @@ int connect_socket(socket_t *r, int type, const endpoint_t *ep);
int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); // 1 == in progress
int connect_socket_retry(socket_t *r); // retries connect() while in progress
int close_socket(socket_t *r);
void move_socket(socket_t *dst, socket_t *src);
void dummy_socket(socket_t *r, const sockaddr_t *);
sockfamily_t *get_socket_family_rfc(const str *s);

Loading…
Cancel
Save