@ -76,11 +76,49 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...)
# define REDIS_FMT(x) (int) (x)->len, (x)->str
// To protect against a restore race condition: Keyspace notifications are set up
// before existing calls are restored (restore_thread). Therefore the following
// scenario is possible:
// NOTIF THREAD: receives SET, creates call
// RESTORE THREAD: executes KEYS *
// NOTIF THREAD: receives another SET:
// NOTIF THREAD: does call_destroy(), which:
// adds ports to late-release list
// RESTORE THREAD: comes across call ID, does GET
// RESTORE THREAD: creates new call
// RESTORE THREAD: wants to allocate ports, but they're still in use
// NOTIF THREAD: now does release_closed_sockets()
static mutex_t redis_ports_release_lock = MUTEX_STATIC_INIT ;
static cond_t redis_ports_release_cond = COND_STATIC_INIT ;
static int redis_ports_release_balance = 0 ; // negative = releasers, positive = allocators
static int redis_check_conn ( struct redis * r ) ;
static void json_restore_call ( struct redis * r , const str * id , bool foreign ) ;
static int redis_connect ( struct redis * r , int wait ) ;
static int json_build_ssrc ( struct call_monologue * ml , JsonReader * root_reader ) ;
// mutually exclusive multi-A multi-B lock
// careful with deadlocks against redis->lock
static void redis_ports_release_push ( bool inc ) {
LOCK ( & redis_ports_release_lock ) ;
if ( inc ) {
while ( redis_ports_release_balance < 0 )
cond_wait ( & redis_ports_release_cond , & redis_ports_release_lock ) ;
}
else {
while ( redis_ports_release_balance > 0 )
cond_wait ( & redis_ports_release_cond , & redis_ports_release_lock ) ;
}
redis_ports_release_balance + = ( inc ? 1 : - 1 ) ;
}
static void redis_ports_release_pop ( bool inc ) {
LOCK ( & redis_ports_release_lock ) ;
redis_ports_release_balance - = ( inc ? 1 : - 1 ) ;
if ( redis_ports_release_balance = = 0 )
cond_broadcast ( & redis_ports_release_cond ) ;
}
static void redis_pipe ( struct redis * r , const char * fmt , . . . ) {
va_list ap ;
@ -375,8 +413,11 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
rwlock_unlock_w ( & c - > master_lock ) ;
if ( IS_FOREIGN_CALL ( c ) ) {
c - > redis_hosted_db = rtpe_redis_write - > db ; // don't delete from foreign DB
// redis_notify->lock is held
redis_ports_release_push ( true ) ;
call_destroy ( c ) ;
release_closed_sockets ( ) ;
redis_ports_release_pop ( true ) ;
}
else {
rlog ( LOG_WARN , " Redis-Notifier: Ignoring SET received for OWN call: " STR_FORMAT " \n " , STR_FMT ( & callid ) ) ;
@ -404,7 +445,11 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
rlog ( LOG_WARN , " Redis-Notifier: Ignoring DEL received for an OWN call: " STR_FORMAT " \n " , STR_FMT ( & callid ) ) ;
goto err ;
}
// redis_notify->lock is held
redis_ports_release_push ( true ) ;
call_destroy ( c ) ;
release_closed_sockets ( ) ;
redis_ports_release_pop ( true ) ;
}
err :
@ -1898,6 +1943,9 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
rr_jsonStr = redis_get ( r , REDIS_REPLY_STRING , " GET " PB , STR ( callid ) ) ;
mutex_unlock ( & r - > lock ) ;
bool must_release_pop = true ;
redis_ports_release_push ( false ) ;
err = " could not retrieve JSON data from redis " ;
if ( ! rr_jsonStr )
goto err1 ;
@ -1911,6 +1959,7 @@ static void json_restore_call(struct redis *r, const str *callid, bool foreign)
if ( ! root_reader )
goto err1 ;
c = call_get_or_create ( callid , foreign , false ) ;
err = " failed to create call struct " ;
if ( ! c )
@ -2049,8 +2098,12 @@ err1:
STR_FMT_M ( callid ) ,
err ) ;
mutex_unlock ( & r - > lock ) ;
if ( c )
if ( c )
call_destroy ( c ) ;
release_closed_sockets ( ) ;
if ( must_release_pop ) // avoid deadlock with redis_notify->lock below
redis_ports_release_pop ( false ) ;
must_release_pop = false ;
mutex_lock ( & rtpe_redis_write - > lock ) ;
redisCommandNR ( rtpe_redis_write - > ctx , " DEL " PB , STR ( callid ) ) ;
@ -2064,6 +2117,9 @@ err1:
}
if ( c )
obj_put ( c ) ;
release_closed_sockets ( ) ;
if ( must_release_pop )
redis_ports_release_pop ( false ) ;
log_info_reset ( ) ;
}