@ -19,6 +19,7 @@
# include "hiredis/hiredis.h"
# include "hiredis/async.h"
# include "hiredis/adapters/libevent.h"
# include "event2/thread.h"
INLINE redisReply * redis_expect ( int type , redisReply * r ) {
if ( ! r )
@ -246,8 +247,14 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
char * pdbstr = db_str ;
char * p = 0 ;
if ( ! ( cm - > conf . redis ) ) {
rlog ( LOG_ERROR , " A redis notification has been there but role was not 'master' or 'read' " ) ;
// sanity checks
if ( ! cm ) {
rlog ( LOG_ERROR , " Struct callmaster is NULL on onRedisNotification " ) ;
return ;
}
if ( ! cm - > conf . redis_notify ) {
rlog ( LOG_ERROR , " A redis notification has been received but no redis_notify database found " ) ;
return ;
}
@ -303,7 +310,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
str_init ( & callid , rr - > element [ 2 ] - > str ) ;
c = g_hash_table_lookup ( cm - > callhash , & callid ) ;
c = call_get ( & callid , cm ) ;
if ( c ) {
rwlock_unlock_w ( & c - > master_lock ) ; // because of call_get(..)
}
if ( c & & ! c - > redis_foreign_call ) {
rlog ( LOG_DEBUG , " I am responsible for that call so I ignore redis notifications. " ) ;
@ -326,110 +336,205 @@ err:
mutex_unlock ( & r - > lock ) ;
}
void redis_notify_event_base_loopbreak ( struct callmaster * cm ) {
void redis_async_context_disconnect ( const redisAsyncContext * redis_notify_async_context , int status ) {
if ( status = = REDIS_ERR ) {
if ( redis_notify_async_context - > errstr ) {
rlog ( LOG_ERROR , " redis_async_context_disconnect error %d on context free: %s " ,
redis_notify_async_context - > err , redis_notify_async_context - > errstr ) ;
} else {
rlog ( LOG_ERROR , " redis_async_context_disconnect error %d on context free: no errstr " ,
redis_notify_async_context - > err ) ;
}
} else if ( status = = REDIS_OK ) {
rlog ( LOG_ERROR , " redis_async_context_disconnect initiated by user " ) ;
} else {
rlog ( LOG_ERROR , " redis_async_context_disconnect invalid status code %d " , status ) ;
}
}
int redis_async_context_alloc ( struct callmaster * cm ) {
struct redis * r = 0 ;
// sanity checks
if ( ! cm - > conf . redis_notify_event_base ) {
rlog ( LOG_ERROR , " Redis event_base_new() is NULL on loopbreak " ) ;
return ;
if ( ! cm ) {
rlog ( LOG_ERROR , " Struct callmaster is NULL on context free " ) ;
return - 1 ;
}
if ( ! cm - > conf . redis_notify_async_context ) {
rlog ( LOG_ERROR , " Redis notify async context is NULL on loopbreak " ) ;
return ;
if ( ! cm - > conf . redis_notify ) {
rlog ( LOG_ INFO, " redis_notify database is NULL. " ) ;
return - 1 ;
}
// get redis_notify database
r = cm - > conf . redis_notify ;
rlog ( LOG_INFO , " Use Redis %s for notifications " , endpoint_print_buf ( & r - > endpoint ) ) ;
// alloc async context
cm - > conf . redis_notify_async_context = redisAsyncConnect ( r - > host , r - > endpoint . port ) ;
if ( ! cm - > conf . redis_notify_async_context ) {
rlog ( LOG_ERROR , " redis_notify_async_context can't create new " ) ;
return - 1 ;
}
if ( cm - > conf . redis_notify_async_context - > err ) {
rlog ( LOG_ERROR , " Redis notify async context error on loopbreak: %s " , cm - > conf . redis_notify_async_context - > errstr ) ;
return ;
rlog ( LOG_ERROR , " redis_notify_async_context can't create new error : %s" , cm - > conf . redis_notify_async_context - > errstr ) ;
return - 1 ;
}
event_base_loopbreak ( cm - > conf . redis_notify_event_base ) ;
redisAsyncCommand ( cm - > conf . redis_notify_async_context , onRedisNotification , NULL , " punsubscribe " ) ;
}
if ( redisAsyncSetDisconnectCallback ( cm - > conf . redis_notify_async_context , redis_async_context_disconnect ) ! = REDIS_OK ) {
rlog ( LOG_ERROR , " redis_notify_async_context can't set disconnect callback " ) ;
return - 1 ;
}
void redis_notify_subscribe_keyspace ( struct callmaster * cm , int keyspace ) {
char main_db_str [ 256 ] ;
return 0 ;
}
int redis_notify_event_base_action ( struct callmaster * cm , enum event_base_action action ) {
// sanity checks
if ( ! cm - > conf . redis_notify_async_context ) {
rlog ( LOG_ERROR , " Redis notify async context NULL on subscribe" ) ;
return ;
if ( ! cm ) {
rlog ( LOG_ERROR , " Struct callmaster is NULL on event base action %d" , action ) ;
return - 1 ;
}
if ( cm - > conf . redis_notify_ async_context- > err ) {
rlog ( LOG_ERROR , " Redis notify async context error on subscribe: %s" , cm - > conf . redis_notify_async_context - > errstr ) ;
return ;
if ( ! cm - > conf . redis_notify_ event_base & & action ! = EVENT_BASE_ALLOC ) {
rlog ( LOG_ERROR , " redis_notify_event_base is NULL on event base action %d" , action ) ;
return - 1 ;
}
memset ( & main_db_str , 0 , 256 ) ;
sprintf ( main_db_str , " psubscribe __keyspace@%i*:notifier-* " , keyspace ) ;
// exec event base action
switch ( action ) {
case EVENT_BASE_ALLOC :
cm - > conf . redis_notify_event_base = event_base_new ( ) ;
if ( ! cm - > conf . redis_notify_event_base ) {
rlog ( LOG_ERROR , " Fail alloc redis_notify_event_base " ) ;
return - 1 ;
} else {
rlog ( LOG_DEBUG , " Success alloc redis_notify_event_base " ) ;
}
break ;
case EVENT_BASE_FREE :
event_base_free ( cm - > conf . redis_notify_event_base ) ;
rlog ( LOG_DEBUG , " Success free redis_notify_event_base " ) ;
break ;
case EVENT_BASE_LOOPBREAK :
if ( event_base_loopbreak ( cm - > conf . redis_notify_event_base ) ) {
rlog ( LOG_ERROR , " Fail loopbreak redis_notify_event_base " ) ;
return - 1 ;
} else {
rlog ( LOG_DEBUG , " Success loopbreak redis_notify_event_base " ) ;
}
break ;
redisAsyncCommand ( cm - > conf . redis_notify_async_context , onRedisNotification , ( void * ) cm , main_db_str ) ;
}
default :
rlog ( LOG_ERROR , " No event base action found: %d " , action ) ;
return - 1 ;
}
void redis_notify_unsubscribe_keyspace ( struct callmaster * cm , int keyspace ) {
char main_db_str [ 256 ] ;
return 0 ;
}
int redis_notify_subscribe_action ( struct callmaster * cm , enum subscribe_action action , int keyspace ) {
// sanity checks
if ( ! cm ) {
rlog ( LOG_ERROR , " Struct callmaster is NULL on subscribe action " ) ;
return - 1 ;
}
if ( ! cm - > conf . redis_notify_async_context ) {
rlog ( LOG_ERROR , " Redis notify async context NULL on unsubscribe " ) ;
return ;
rlog ( LOG_ERROR , " redis_notify_async_context is NULL on subscribe action " ) ;
return - 1 ;
}
if ( cm - > conf . redis_notify_async_context - > err ) {
rlog ( LOG_ERROR , " Redis notify async context error on unsubscribe: %s " , cm - > conf . redis_notify_async_context - > errstr ) ;
return ;
rlog ( LOG_ERROR , " redis_notify_async_context error on subscribe action : %s" , cm - > conf . redis_notify_async_context - > errstr ) ;
return - 1 ;
}
memset ( & main_db_str , 0 , 256 ) ;
sprintf ( main_db_str , " punsubscribe __keyspace@%i*:notifier-* " , keyspace ) ;
switch ( action ) {
case SUBSCRIBE_KEYSPACE :
if ( redisAsyncCommand ( cm - > conf . redis_notify_async_context , onRedisNotification , ( void * ) cm , " psubscribe __keyspace@%i*:notifier-* " , keyspace ) ! = REDIS_OK ) {
rlog ( LOG_ERROR , " Fail redisAsyncCommand on SUBSCRIBE_KEYSPACE " ) ;
return - 1 ;
}
break ;
case UNSUBSCRIBE_KEYSPACE :
if ( redisAsyncCommand ( cm - > conf . redis_notify_async_context , onRedisNotification , ( void * ) cm , " punsubscribe __keyspace@%i*:notifier-* " , keyspace ) ! = REDIS_OK ) {
rlog ( LOG_ERROR , " Fail redisAsyncCommand on UNSUBSCRIBE_KEYSPACE " ) ;
return - 1 ;
}
break ;
case UNSUBSCRIBE_ALL :
if ( redisAsyncCommand ( cm - > conf . redis_notify_async_context , onRedisNotification , ( void * ) cm , " punsubscribe " ) ! = REDIS_OK ) {
rlog ( LOG_ERROR , " Fail redisAsyncCommand on UNSUBSCRIBE_ALL " ) ;
return - 1 ;
}
break ;
default :
rlog ( LOG_ERROR , " No subscribe action found: %d " , action ) ;
return - 1 ;
}
redisAsyncCommand ( cm - > conf . redis_notify_async_context , onRedisNotification , ( void * ) cm , main_db_str ) ;
return 0 ;
}
static void redis_notify ( struct callmaster * cm ) {
static int redis_notify ( struct callmaster * cm ) {
struct redis * r = 0 ;
GList * l ;
if ( cm - > conf . redis_notify ) {
r = cm - > conf . redis_notify ;
rlog ( LOG_INFO , " Use Redis %s to subscribe to notifications " , endpoint_print_buf ( & r - > endpoint ) ) ;
} else {
rlog ( LOG_INFO , " Don't use Redis notifications. See --redis-notifications parameter. " ) ;
return ;
// sanity checks
if ( ! cm ) {
rlog ( LOG_ERROR , " Struct callmaster is NULL on redis_notify() " ) ;
return - 1 ;
}
cm - > conf . redis_notify_event_base = NULL ;
cm - > conf . redis_notify_event_base = event_base_new ( ) ;
if ( ! cm - > conf . redis_notify_event_base ) {
rlog ( LOG_ERROR , " Redis event_base_new() NULL error " ) ;
return ;
if ( ! cm - > conf . redis_notify ) {
rlog ( LOG_ERROR , " redis_notify database is NULL on redis_notify() " ) ;
return - 1 ;
}
cm - > conf . redis_notify_async_context = NULL ;
cm - > conf . redis_notify_async_context = redisAsyncConnect ( r - > host , r - > endpoint . port ) ;
if ( ! cm - > conf . redis_notify_async_context ) {
rlog ( LOG_ERROR , " Redis notify async context NULL error " ) ;
return ;
rlog ( LOG_ERROR , " redis_notify_async_context is NULL on redis_notify() " ) ;
return - 1 ;
}
if ( cm - > conf . redis_notify_async_context - > err ) {
rlog ( LOG_ERROR , " Redis notify async context error: %s " , cm - > conf . redis_notify_async_context - > errstr ) ;
return ;
if ( ! cm - > conf . redis_notify_event_base ) {
rlog ( LOG_ERROR , " redis_notify_event_base is NULL on redis_notify() " ) ;
return - 1 ;
}
redisLibeventAttach ( cm - > conf . redis_notify_async_context , cm - > conf . redis_notify_event_base ) ;
// get redis_notify database
r = cm - > conf . redis_notify ;
rlog ( LOG_INFO , " Use Redis %s to subscribe to notifications " , endpoint_print_buf ( & r - > endpoint ) ) ;
// attach event base
if ( redisLibeventAttach ( cm - > conf . redis_notify_async_context , cm - > conf . redis_notify_event_base ) = = REDIS_ERR ) {
if ( cm - > conf . redis_notify_async_context - > err ) {
rlog ( LOG_ERROR , " redis_notify_async_context can't attach event base error: %s " , cm - > conf . redis_notify_async_context - > errstr ) ;
} else {
rlog ( LOG_ERROR , " redis_notify_async_context can't attach event base " ) ;
/* Subscribing to the values in the configured keyspaces */
}
return - 1 ;
}
// subscribe to the values in the configured keyspaces
for ( l = cm - > conf . redis_subscribed_keyspaces - > head ; l ; l = l - > next ) {
redis_notify_subscribe_keyspace ( cm , * ( int * ) ( l - > data ) ) ;
redis_notify_subscribe_ action( cm , SUBSCRIBE_KEYSPACE , GPOINTER_TO_UINT ( l - > data ) ) ;
}
event_base_dispatch ( cm - > conf . redis_notify_event_base ) ;
// dispatch event base => thread blocks here
if ( event_base_dispatch ( cm - > conf . redis_notify_event_base ) < 0 ) {
rlog ( LOG_ERROR , " Fail event_base_dispatch() " ) ;
return - 1 ;
}
return 0 ;
}
void redis_notify_loop ( void * d ) {
int seconds = 1 ;
int seconds = 1 , redis_notify_return = 0 ;
time_t next_run = g_now . tv_sec ;
struct callmaster * cm = ( struct callmaster * ) d ;
struct redis * r ;
@ -442,12 +547,29 @@ void redis_notify_loop(void *d) {
r = cm - > conf . redis_notify ;
if ( ! r ) {
rlog ( LOG_ERROR , " Don't use Redis notifications. See --redis-notifications parameter. " ) ;
return ;
}
// init libevent for pthread usage
if ( evthread_use_pthreads ( ) < 0 ) {
ilog ( LOG_ERROR , " evthread_use_pthreads failed " ) ;
return ;
}
// alloc redis async context
if ( redis_async_context_alloc ( cm ) < 0 ) {
return ;
}
// alloc event base
if ( redis_notify_event_base_action ( cm , EVENT_BASE_ALLOC ) < 0 ) {
return ;
}
// initial redis_notify
if ( redis_check_conn ( r ) = = REDIS_STATE_CONNECTED ) {
redis_notify ( cm ) ;
redis_notify_return = redis_notify ( cm ) ;
}
// loop redis_notify => in case of lost connection
@ -460,10 +582,25 @@ void redis_notify_loop(void *d) {
next_run = g_now . tv_sec + seconds ;
if ( redis_check_conn ( r ) = = REDIS_STATE_RECONNECTED ) {
redis_notify ( cm ) ;
if ( redis_check_conn ( r ) = = REDIS_STATE_RECONNECTED | | redis_notify_return < 0 ) {
// alloc new redis async context upon redis breakdown
if ( redis_async_context_alloc ( cm ) < 0 ) {
continue ;
}
// prepare notifications
redis_notify_return = redis_notify ( cm ) ;
}
}
// unsubscribe notifications
redis_notify_subscribe_action ( cm , UNSUBSCRIBE_ALL , 0 ) ;
// free async context
redisAsyncDisconnect ( cm - > conf . redis_notify_async_context ) ;
// free event base
redis_notify_event_base_action ( cm , EVENT_BASE_FREE ) ;
}
struct redis * redis_new ( const endpoint_t * ep , int db , const char * auth , enum redis_role role , int no_redis_required ) {