@ -48,13 +48,21 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
# include "asterisk/stasis.h"
# include "asterisk/stasis_message_router.h"
# include "asterisk/stasis_system.h"
# include "asterisk/taskprocessor.h"
AST_RWLOCK_DEFINE_STATIC ( event_types_lock ) ;
AST_RWLOCK_DEFINE_STATIC ( init_cpg_lock ) ;
/*! \brief Timeout for Corosync's poll process */
# define COROSYNC_POLL_TIMEOUT (10 * 1000)
static void publish_mwi_to_stasis ( struct ast_event * event ) ;
static void publish_device_state_to_stasis ( struct ast_event * event ) ;
static void publish_cluster_discovery_to_stasis ( struct ast_event * event ) ;
/*! \brief Join to corosync */
static int corosync_node_joined = 0 ;
/*! \brief All the nodes that we're aware of */
static struct ao2_container * nodes ;
@ -199,6 +207,7 @@ static void publish_corosync_ping_to_stasis(struct ast_event *event)
{
struct corosync_ping_payload * payload ;
struct stasis_message * message ;
struct ast_eid * event_eid ;
ast_assert ( ast_event_get_type ( event ) = = AST_EVENT_PING ) ;
ast_assert ( event ! = NULL ) ;
@ -211,7 +220,10 @@ static void publish_corosync_ping_to_stasis(struct ast_event *event)
if ( ! payload ) {
return ;
}
payload - > event = event ;
event_eid = ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
payload - > event = ast_event_new ( AST_EVENT_PING ,
AST_EVENT_IE_EID , AST_EVENT_IE_PLTYPE_RAW , event_eid , sizeof ( * event_eid ) ,
AST_EVENT_IE_END ) ;
message = stasis_message_create ( corosync_ping_message_type ( ) , payload ) ;
if ( ! message ) {
@ -347,7 +359,7 @@ static void publish_cluster_discovery_to_stasis(struct ast_event *event)
ast_assert ( ast_event_get_type ( event ) = = AST_EVENT_CLUSTER_DISCOVERY ) ;
event_eid = ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
if ( ! ast_eid_cmp ( & ast_eid_default , event_eid ) ) {
if ( ! event_eid | | ! ast_eid_cmp ( & ast_eid_default , event_eid ) ) {
/* Don't feed events back in that originated locally. */
return ;
}
@ -477,6 +489,7 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
struct ast_event * event ;
void ( * publish_handler ) ( struct ast_event * ) = NULL ;
enum ast_event_type event_type ;
struct ast_eid * event_eid ;
if ( msg_len < ast_event_minimum_length ( ) ) {
ast_debug ( 1 , " Ignoring event that's too small. %u < %u \n " ,
@ -485,7 +498,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
return ;
}
if ( ! ast_eid_cmp ( & ast_eid_default , ast_event_get_ie_raw ( msg , AST_EVENT_IE_EID ) ) ) {
event_eid = ( struct ast_eid * ) ast_event_get_ie_raw ( msg , AST_EVENT_IE_EID ) ;
if ( ! event_eid | | ! ast_eid_cmp ( & ast_eid_default , event_eid ) ) {
/* Don't feed events back in that originated locally. */
return ;
}
@ -497,14 +511,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
}
ast_rwlock_rdlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_deliver_cb rdlock \n " ) ;
publish_handler = event_types [ event_type ] . publish_to_stasis ;
if ( ! event_types [ event_type ] . subscribe | | ! publish_handler ) {
/* We are not configured to subscribe to these events or
we have no way to publish it internally . */
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_deliver_cb unlock \n " ) ;
return ;
}
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_deliver_cb unlock \n " ) ;
if ( ! ( event = ast_malloc ( msg_len ) ) ) {
return ;
@ -516,13 +533,14 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
const struct ast_eid * eid ;
char buf [ 128 ] = " " ;
eid = ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
eid = ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
ast_eid_to_str ( buf , sizeof ( buf ) , ( struct ast_eid * ) eid ) ;
ast_log ( LOG_NOTICE , " Got event PING from server with EID: '%s' \n " , buf ) ;
}
ast_debug ( 5 , " Publishing event %s (%u) to stasis \n " ,
ast_event_get_type_name ( event ) , event_type ) ;
publish_handler ( event ) ;
ast_free ( event ) ;
}
static void publish_event_to_corosync ( struct ast_event * event )
@ -538,22 +556,32 @@ static void publish_event_to_corosync(struct ast_event *event)
/* The stasis subscription will only exist if we are configured to publish
* these events , so just send away . */
if ( ( cs_err = cpg_mcast_joined ( cpg_handle , CPG_TYPE_FIFO , & iov , 1 ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " CPG mcast failed (%u) for event %s (%u) \n " ,
cs_err , ast_event_get_type_name ( event ) , ast_event_get_type ( event ) ) ;
if ( corosync_node_joined & & ! ast_rwlock_tryrdlock ( & init_cpg_lock ) ) {
ast_debug ( 5 , " publish_event_to_corosync rdlock \n " ) ;
if ( ( cs_err = cpg_mcast_joined ( cpg_handle , CPG_TYPE_FIFO , & iov , 1 ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " CPG mcast failed (%u) for event %s (%u) \n " ,
cs_err , ast_event_get_type_name ( event ) , ast_event_get_type ( event ) ) ;
}
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " publish_event_to_corosync unlock \n " ) ;
} else {
ast_log ( LOG_WARNING , " CPG mcast not executed for event %s (%u): initializing CPG. \n " ,
ast_event_get_type_name ( event ) , ast_event_get_type ( event ) ) ;
}
}
static void publish_to_corosync ( struct stasis_message * message )
{
struct ast_event * event ;
struct ast_eid * event_eid ;
event = stasis_message_to_event ( message ) ;
if ( ! event ) {
return ;
}
if ( ast_eid_cmp ( & ast_eid_default , ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ) ) {
event_eid = ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
if ( ! event_eid | | ast_eid_cmp ( & ast_eid_default , event_eid ) ) {
/* If the event didn't originate from this server, don't send it back out. */
ast_event_destroy ( event ) ;
return ;
@ -563,12 +591,13 @@ static void publish_to_corosync(struct stasis_message *message)
const struct ast_eid * eid ;
char buf [ 128 ] = " " ;
eid = ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
eid = ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
ast_eid_to_str ( buf , sizeof ( buf ) , ( struct ast_eid * ) eid ) ;
ast_log ( LOG_NOTICE , " Sending event PING from this server with EID: '%s' \n " , buf ) ;
}
publish_event_to_corosync ( event ) ;
ast_event_destroy ( event ) ;
}
static void stasis_message_cb ( void * data , struct stasis_subscription * sub , struct stasis_message * message )
@ -593,6 +622,30 @@ static int dump_cache_cb(void *obj, void *arg, int flags)
return 0 ;
}
static int clear_node_cache ( void * obj , void * arg , int flags )
{
struct stasis_message * cached_msg = obj ;
struct stasis_topic * topic = arg ;
struct stasis_message * msg ;
struct ast_eid * msg_eid ;
if ( ! cached_msg ) {
return 0 ;
}
msg_eid = ( struct ast_eid * ) stasis_message_eid ( cached_msg ) ;
if ( msg_eid & & ast_eid_cmp ( & ast_eid_default , msg_eid ) )
{
msg = stasis_cache_clear_create ( cached_msg ) ;
if ( msg ) {
stasis_publish ( topic , msg ) ;
ao2_cleanup ( msg ) ;
}
}
return 0 ;
}
static void cpg_confchg_cb ( cpg_handle_t handle , const struct cpg_name * group_name ,
const struct cpg_address * member_list , size_t member_list_entries ,
const struct cpg_address * left_list , size_t left_list_entries ,
@ -604,12 +657,43 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
for ( i = 0 ; i < left_list_entries ; i + + ) {
const struct cpg_address * cpg_node = & left_list [ i ] ;
struct corosync_node * node ;
unsigned int j ;
node = ao2_find ( nodes , & cpg_node - > nodeid , OBJ_UNLINK | OBJ_SEARCH_KEY ) ;
if ( ! node ) {
continue ;
}
for ( j = 0 ; j < ARRAY_LEN ( event_types ) ; j + + ) {
struct ao2_container * messages ;
int messages_count ;
ast_rwlock_rdlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_confchg_cb rdlock \n " ) ;
if ( ! event_types [ j ] . subscribe ) {
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_confchg_cb unlock \n " ) ;
continue ;
}
if ( ! event_types [ j ] . cache_fn | | ! event_types [ j ] . message_type_fn ) {
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_confchg_cb unlock \n " ) ;
continue ;
}
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_confchg_cb unlock \n " ) ;
messages = stasis_cache_dump_by_eid ( event_types [ j ] . cache_fn ( ) , event_types [ j ] . message_type_fn ( ) , & node - > eid ) ;
messages_count = ao2_container_count ( messages ) ;
ast_log ( LOG_NOTICE , " Clearing %i events of type %s of node %i from stasis cache. \n " , messages_count , event_types [ j ] . name , node - > id ) ;
ao2_callback ( messages , OBJ_NODATA , clear_node_cache , event_types [ j ] . topic_fn ( ) ) ;
ast_log ( LOG_NOTICE , " Cleared events of type %s from stasis cache. \n " , event_types [ j ] . name ) ;
ao2_t_ref ( messages , - 1 , " Dispose of flushed cache " ) ;
}
publish_cluster_discovery_to_stasis_full ( node , 0 ) ;
ao2_ref ( node , - 1 ) ;
}
@ -622,24 +706,30 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
for ( i = 0 ; i < ARRAY_LEN ( event_types ) ; i + + ) {
struct ao2_container * messages ;
int messages_count ;
ast_rwlock_rdlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_confchg_cb rdlock \n " ) ;
if ( ! event_types [ i ] . publish ) {
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_confchg_cb unlock \n " ) ;
continue ;
}
if ( ! event_types [ i ] . cache_fn | | ! event_types [ i ] . message_type_fn ) {
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_confchg_cb unlock \n " ) ;
continue ;
}
messages = stasis_cache_dump_by_eid ( event_types [ i ] . cache_fn ( ) ,
event_types [ i ] . message_type_fn ( ) ,
& ast_eid_default ) ;
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cpg_confchg_cb unlock \n " ) ;
messages = stasis_cache_dump_by_eid ( event_types [ i ] . cache_fn ( ) , event_types [ i ] . message_type_fn ( ) , & ast_eid_default ) ;
messages_count = ao2_container_count ( messages ) ;
ast_log ( LOG_NOTICE , " Sending %i events of type %s to corosync. \n " , messages_count , event_types [ i ] . name ) ;
ao2_callback ( messages , OBJ_NODATA , dump_cache_cb , NULL ) ;
ast_log ( LOG_NOTICE , " Sent events of type %s to corosync. \n " , event_types [ i ] . name ) ;
ao2_t_ref ( messages , - 1 , " Dispose of dumped cache " ) ;
}
@ -658,14 +748,21 @@ static void send_cluster_notify(void)
char buf [ 128 ] ;
int res ;
if ( ( cs_err = corosync_cfg_local_get ( cfg_handle , & node_id ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed to extract Corosync node ID for this node. Not informing cluster of existance. \n " ) ;
return ;
}
if ( ! ast_rwlock_tryrdlock ( & init_cpg_lock ) ) {
ast_debug ( 5 , " send_cluster_notify rdlock \n " ) ;
if ( ( ( cs_err = corosync_cfg_get_node_addrs ( cfg_handle , node_id , 1 , & num_addrs , & corosync_addr ) ) ! = CS_OK ) | | ( num_addrs < 1 ) ) {
ast_log ( LOG_WARNING , " Failed to get local Corosync address. Not informing cluster of existance. \n " ) ;
return ;
if ( ( cs_err = corosync_cfg_local_get ( cfg_handle , & node_id ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed to extract Corosync node ID for this node. Not informing cluster of existance. \n " ) ;
return ;
}
if ( ( ( cs_err = corosync_cfg_get_node_addrs ( cfg_handle , node_id , 1 , & num_addrs , & corosync_addr ) ) ! = CS_OK ) | | ( num_addrs < 1 ) ) {
ast_log ( LOG_WARNING , " Failed to get local Corosync address. Not informing cluster of existance. \n " ) ;
return ;
}
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " send_cluster_notify unlock \n " ) ;
}
sa = ( struct sockaddr * ) corosync_addr . address ;
@ -681,7 +778,7 @@ static void send_cluster_notify(void)
AST_EVENT_IE_LOCAL_ADDR , AST_EVENT_IE_PLTYPE_STR , buf ,
AST_EVENT_IE_END ) ;
publish_event_to_corosync ( event ) ;
ast_ fr ee( event ) ;
ast_ ev ent_destroy ( event ) ;
}
static void * dispatch_thread_handler ( void * data )
@ -693,18 +790,29 @@ static void *dispatch_thread_handler(void *data)
{ . events = POLLIN , } ,
} ;
if ( ( cs_err = cpg_fd_get ( cpg_handle , & pfd [ 0 ] . fd ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to get CPG fd. This module is now broken. \n " ) ;
return NULL ;
}
if ( ! ast_rwlock_tryrdlock ( & init_cpg_lock ) ) {
ast_debug ( 5 , " dispatch_thread_handler rdlock \n " ) ;
if ( ( cs_err = cpg_fd_get ( cpg_handle , & pfd [ 0 ] . fd ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to get CPG fd. This module is now broken. \n " ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
return NULL ;
}
if ( ( cs_err = corosync_cfg_fd_get ( cfg_handle , & pfd [ 1 ] . fd ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to get CFG fd. This module is now broken. \n " ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
return NULL ;
}
if ( ( cs_err = corosync_cfg_fd_get ( cfg_handle , & pfd [ 1 ] . fd ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to get CFG fd. This module is now broken. \n " ) ;
pfd [ 2 ] . fd = dispatch_thread . alert_pipe [ 0 ] ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
} else {
ast_log ( LOG_ERROR , " Failed to get fd: initiliazing CPG. This module is now broken. \n " ) ;
return NULL ;
}
pfd [ 2 ] . fd = dispatch_thread . alert_pipe [ 0 ] ;
send_cluster_notify ( ) ;
while ( ! dispatch_thread . stop ) {
int res ;
@ -715,65 +823,141 @@ static void *dispatch_thread_handler(void *data)
pfd [ 1 ] . revents = 0 ;
pfd [ 2 ] . revents = 0 ;
res = ast_poll ( pfd , ARRAY_LEN ( pfd ) , - 1 ) ;
res = ast_poll ( pfd , ARRAY_LEN ( pfd ) , COROSYNC_POLL_TIMEOUT ) ;
if ( res = = - 1 & & errno ! = EINTR & & errno ! = EAGAIN ) {
ast_log ( LOG_ERROR , " poll() error: %s (%d) \n " , strerror ( errno ) , errno ) ;
continue ;
}
if ( pfd [ 0 ] . revents & POLLIN ) {
if ( ( cs_err = cpg_dispatch ( cpg_handle , CS_DISPATCH_ALL ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed CPG dispatch: %u \n " , cs_err ) ;
cs_err = CS_ERR_BAD_HANDLE ;
} else if ( res = = 0 ) {
unsigned int local_nodeid ;
if ( ! ast_rwlock_tryrdlock ( & init_cpg_lock ) ) {
ast_debug ( 5 , " dispatch_thread_handler rdlock \n " ) ;
if ( ( cs_err = cpg_local_get ( cpg_handle , & local_nodeid ) ) = = CS_OK ) {
struct cpg_name name ;
struct cpg_address address [ CPG_MEMBERS_MAX ] ;
int entries = CPG_MEMBERS_MAX ;
ast_copy_string ( name . value , " asterisk " , sizeof ( name . value ) ) ;
name . length = strlen ( name . value ) ;
if ( ( cs_err = cpg_membership_get ( cpg_handle , & name , address , & entries ) ) = = CS_OK ) {
int i ;
int found = 0 ;
ast_debug ( 1 , " CPG group has %i node membership \n " , entries ) ;
for ( i = 0 ; ( i < entries ) & & ! found ; i + + ) {
if ( address [ i ] . nodeid = = local_nodeid )
found = 1 ;
}
if ( ! found ) {
ast_log ( LOG_WARNING , " Failed to check CPG node membership \n " ) ;
corosync_node_joined = 0 ;
cs_err = CS_ERR_BAD_HANDLE ;
}
} else {
ast_log ( LOG_WARNING , " Failed to get CPG node membership: %u \n " , cs_err ) ;
corosync_node_joined = 0 ;
cs_err = CS_ERR_BAD_HANDLE ;
}
} else {
ast_log ( LOG_WARNING , " Failed to get CPG local node id: %u \n " , cs_err ) ;
corosync_node_joined = 0 ;
cs_err = CS_ERR_BAD_HANDLE ;
}
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
} else {
ast_log ( LOG_WARNING , " Failed to check CPG node membership: initializing CPG. \n " ) ;
corosync_node_joined = 0 ;
cs_err = CS_ERR_BAD_HANDLE ;
}
}
if ( pfd [ 1 ] . revents & POLLIN ) {
if ( ( cs_err = corosync_cfg_dispatch ( cfg_handle , CS_DISPATCH_ALL ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed CFG dispatch: %u \n " , cs_err ) ;
} else {
if ( ! ast_rwlock_tryrdlock ( & init_cpg_lock ) ) {
ast_debug ( 5 , " dispatch_thread_handler rdlock \n " ) ;
if ( pfd [ 0 ] . revents & POLLIN ) {
if ( ( cs_err = cpg_dispatch ( cpg_handle , CS_DISPATCH_ALL ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed CPG dispatch: %u \n " , cs_err ) ;
}
}
if ( pfd [ 1 ] . revents & POLLIN ) {
if ( ( cs_err = corosync_cfg_dispatch ( cfg_handle , CS_DISPATCH_ALL ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed CFG dispatch: %u \n " , cs_err ) ;
}
}
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
} else {
ast_log ( LOG_WARNING , " Failed to dispatch: initializing CPG. \n " ) ;
}
}
if ( cs_err = = CS_ERR_LIBRARY | | cs_err = = CS_ERR_BAD_HANDLE ) {
struct cpg_name name ;
/* If corosync gets restarted out from under Asterisk, try to recover. */
ast_log ( LOG_NOTICE , " Attempting to recover from corosync failure. \n " ) ;
if ( ( cs_err = corosync_cfg_initialize ( & cfg_handle , & cfg_callbacks ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to initialize cfg (%d) \n " , ( int ) cs_err ) ;
sleep ( 5 ) ;
continue ;
if ( ! ast_rwlock_trywrlock ( & init_cpg_lock ) ) {
struct cpg_name name ;
ast_debug ( 5 , " dispatch_thread_handler wrlock \n " ) ;
corosync_node_joined = 0 ;
if ( cpg_handle & & ( cs_err = cpg_finalize ( cpg_handle ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to finalize cpg (%d) \n " , ( int ) cs_err ) ;
}
if ( cfg_handle & & ( cs_err = corosync_cfg_finalize ( cfg_handle ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to finalize cfg (%d) \n " , ( int ) cs_err ) ;
}
if ( ( cs_err = corosync_cfg_initialize ( & cfg_handle , & cfg_callbacks ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to initialize cfg (%d) \n " , ( int ) cs_err ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
sleep ( 5 ) ;
continue ;
}
if ( ( cs_err = cpg_initialize ( & cpg_handle , & cpg_callbacks ) ! = CS_OK ) ) {
ast_log ( LOG_ERROR , " Failed to initialize cpg (%d) \n " , ( int ) cs_err ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
sleep ( 5 ) ;
continue ;
}
if ( ( cs_err = cpg_fd_get ( cpg_handle , & pfd [ 0 ] . fd ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to get CPG fd. \n " ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
sleep ( 5 ) ;
continue ;
}
if ( ( cs_err = corosync_cfg_fd_get ( cfg_handle , & pfd [ 1 ] . fd ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to get CFG fd. \n " ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
sleep ( 5 ) ;
continue ;
}
ast_copy_string ( name . value , " asterisk " , sizeof ( name . value ) ) ;
name . length = strlen ( name . value ) ;
if ( ( cs_err = cpg_join ( cpg_handle , & name ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to join cpg (%d) \n " , ( int ) cs_err ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
sleep ( 5 ) ;
continue ;
}
corosync_node_joined = 1 ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " dispatch_thread_handler unlock \n " ) ;
ast_log ( LOG_NOTICE , " Corosync recovery complete. \n " ) ;
send_cluster_notify ( ) ;
} else {
ast_log ( LOG_NOTICE , " Failed to recover from corosync failure: initializing CPG. \n " ) ;
}
if ( ( cs_err = cpg_initialize ( & cpg_handle , & cpg_callbacks ) ! = CS_OK ) ) {
ast_log ( LOG_ERROR , " Failed to initialize cpg (%d) \n " , ( int ) cs_err ) ;
sleep ( 5 ) ;
continue ;
}
if ( ( cs_err = cpg_fd_get ( cpg_handle , & pfd [ 0 ] . fd ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to get CPG fd. \n " ) ;
sleep ( 5 ) ;
continue ;
}
if ( ( cs_err = corosync_cfg_fd_get ( cfg_handle , & pfd [ 1 ] . fd ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to get CFG fd. \n " ) ;
sleep ( 5 ) ;
continue ;
}
ast_copy_string ( name . value , " asterisk " , sizeof ( name . value ) ) ;
name . length = strlen ( name . value ) ;
if ( ( cs_err = cpg_join ( cpg_handle , & name ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to join cpg (%d) \n " , ( int ) cs_err ) ;
sleep ( 5 ) ;
continue ;
}
ast_log ( LOG_NOTICE , " Corosync recovery complete. \n " ) ;
send_cluster_notify ( ) ;
}
}
@ -803,63 +987,73 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
return CLI_SHOWUSAGE ;
}
cs_err = cpg_iteration_initialize ( cpg_handle , CPG_ITERATION_ALL , NULL , & cpg_iter ) ;
if ( cs_err ! = CS_OK ) {
ast_cli ( a - > fd , " Failed to initialize CPG iterator. \n " ) ;
return CLI_FAILURE ;
}
if ( ! ast_rwlock_tryrdlock ( & init_cpg_lock ) ) {
ast_debug ( 5 , " corosync_show_members rdlock \n " ) ;
cs_err = cpg_iteration_initialize ( cpg_handle , CPG_ITERATION_ALL , NULL , & cpg_iter ) ;
ast_cli ( a - > fd , " \n "
" ============================================================= \n "
" === Cluster members ========================================= \n "
" ============================================================= \n "
" === \n " ) ;
for ( i = 1 , cs_err = cpg_iteration_next ( cpg_iter , & cpg_desc ) ;
cs_err = = CS_OK ;
cs_err = cpg_iteration_next ( cpg_iter , & cpg_desc ) , i + + ) {
# ifdef HAVE_COROSYNC_CFG_STATE_TRACK
corosync_cfg_node_address_t addrs [ 8 ] ;
int num_addrs = 0 ;
unsigned int j ;
# endif
ast_cli ( a - > fd , " === Node %u \n " , i ) ;
ast_cli ( a - > fd , " === --> Group: %s \n " , cpg_desc . group . value ) ;
# ifdef HAVE_COROSYNC_CFG_STATE_TRACK
/*
* Corosync 2. x cfg lib needs to allocate 1 M on stack after calling
* corosync_cfg_get_node_addrs . netconsole thread has allocated only 0.5 M
* resulting in crash .
*/
cs_err = corosync_cfg_get_node_addrs ( cfg_handle , cpg_desc . nodeid ,
ARRAY_LEN ( addrs ) , & num_addrs , addrs ) ;
if ( cs_err ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed to get node addresses \n " ) ;
continue ;
ast_cli ( a - > fd , " Failed to initialize CPG iterator: %u. \n " , cs_err ) ;
cpg_iteration_finalize ( cpg_iter ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " corosync_show_members unlock \n " ) ;
return CLI_FAILURE ;
}
for ( j = 0 ; j < num_addrs ; j + + ) {
struct sockaddr * sa = ( struct sockaddr * ) addrs [ j ] . address ;
size_t sa_len = ( size_t ) addrs [ j ] . address_length ;
char buf [ 128 ] ;
ast_cli ( a - > fd , " \n "
" ============================================================= \n "
" === Cluster members ========================================= \n "
" ============================================================= \n "
" === \n " ) ;
for ( i = 1 , cs_err = cpg_iteration_next ( cpg_iter , & cpg_desc ) ;
cs_err = = CS_OK ;
cs_err = cpg_iteration_next ( cpg_iter , & cpg_desc ) , i + + ) {
# ifdef HAVE_COROSYNC_CFG_STATE_TRACK
corosync_cfg_node_address_t addrs [ 8 ] ;
int num_addrs = 0 ;
unsigned int j ;
# endif
ast_cli ( a - > fd , " === Node %u \n " , i ) ;
ast_cli ( a - > fd , " === --> Group: %s \n " , cpg_desc . group . value ) ;
# ifdef HAVE_COROSYNC_CFG_STATE_TRACK
/*
* Corosync 2. x cfg lib needs to allocate 1 M on stack after calling
* corosync_cfg_get_node_addrs . netconsole thread has allocated only 0.5 M
* resulting in crash .
*/
cs_err = corosync_cfg_get_node_addrs ( cfg_handle , cpg_desc . nodeid ,
ARRAY_LEN ( addrs ) , & num_addrs , addrs ) ;
if ( cs_err ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed to get node addresses \n " ) ;
continue ;
}
for ( j = 0 ; j < num_addrs ; j + + ) {
struct sockaddr * sa = ( struct sockaddr * ) addrs [ j ] . address ;
size_t sa_len = ( size_t ) addrs [ j ] . address_length ;
char buf [ 128 ] ;
getnameinfo ( sa , sa_len , buf , sizeof ( buf ) , NULL , 0 , NI_NUMERICHOST ) ;
getnameinfo ( sa , sa_len , buf , sizeof ( buf ) , NULL , 0 , NI_NUMERICHOST ) ;
ast_cli ( a - > fd , " === --> Address %u: %s \n " , j + 1 , buf ) ;
ast_cli ( a - > fd , " === --> Address %u: %s \n " , j + 1 , buf ) ;
}
# else
ast_cli ( a - > fd , " === --> Nodeid: % " PRIu32 " \n " , cpg_desc . nodeid ) ;
# endif
}
# else
ast_cli ( a - > fd , " === --> Nodeid: % " PRIu32 " \n " , cpg_desc . nodeid ) ;
# endif
}
ast_cli ( a - > fd , " === \n "
" ============================================================= \n "
" \n " ) ;
ast_cli ( a - > fd , " === \n "
" ============================================================= \n "
" \n " ) ;
cpg_iteration_finalize ( cpg_iter ) ;
cpg_iteration_finalize ( cpg_iter ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " corosync_show_members unlock \n " ) ;
} else {
ast_cli ( a - > fd , " Failed to initialize CPG iterator: initializing CPG. \n " ) ;
}
return CLI_SUCCESS ;
}
@ -893,10 +1087,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_FAILURE ;
}
ast_rwlock_rdlock ( & event_types_lock ) ;
event_types [ AST_EVENT_PING ] . publish_to_stasis ( event ) ;
ast_rwlock_unlock ( & event_types_lock ) ;
ast_event_destroy ( event ) ;
return CLI_SUCCESS ;
}
@ -927,6 +1120,7 @@ static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_c
" === \n " ) ;
ast_rwlock_rdlock ( & event_types_lock ) ;
ast_debug ( 5 , " corosync_show_config rdlock \n " ) ;
for ( i = 0 ; i < ARRAY_LEN ( event_types ) ; i + + ) {
if ( event_types [ i ] . publish ) {
ast_cli ( a - > fd , " === ==> Publishing Event Type: %s \n " ,
@ -938,6 +1132,7 @@ static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_c
}
}
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " corosync_show_config unlock \n " ) ;
ast_cli ( a - > fd , " === \n "
" ============================================================= \n "
@ -988,6 +1183,7 @@ static int load_general_config(struct ast_config *cfg)
unsigned int i ;
ast_rwlock_wrlock ( & event_types_lock ) ;
ast_debug ( 5 , " load_general_config wrlock \n " ) ;
for ( i = 0 ; i < ARRAY_LEN ( event_types ) ; i + + ) {
event_types [ i ] . publish = event_types [ i ] . publish_default ;
@ -1020,6 +1216,7 @@ static int load_general_config(struct ast_config *cfg)
}
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " load_general_config unlock \n " ) ;
return res ;
}
@ -1059,17 +1256,33 @@ static void cleanup_module(void)
if ( stasis_router ) {
/* Unsubscribe all topic forwards and cancel all message routes */
ast_rwlock_wrlock ( & event_types_lock ) ;
for ( i = 0 ; i < ARRAY_LEN ( event_types ) ; i + + ) {
struct ao2_container * messages = NULL ;
int messages_count ;
unsigned char subscribe = 0 ;
ast_rwlock_wrlock ( & event_types_lock ) ;
ast_debug ( 5 , " cleanup_module wrlock \n " ) ;
subscribe = event_types [ i ] . subscribe ;
if ( event_types [ i ] . sub ) {
event_types [ i ] . sub = stasis_forward_cancel ( event_types [ i ] . sub ) ;
stasis_message_router_remove ( stasis_router ,
event_types [ i ] . message_type_fn ( ) ) ;
stasis_message_router_remove ( stasis_router , event_types [ i ] . message_type_fn ( ) ) ;
}
event_types [ i ] . publish = 0 ;
event_types [ i ] . subscribe = 0 ;
ast_rwlock_unlock ( & event_types_lock ) ;
ast_debug ( 5 , " cleanup_module unlock \n " ) ;
if ( subscribe & & event_types [ i ] . cache_fn & & event_types [ i ] . message_type_fn ) {
messages = stasis_cache_dump_all ( event_types [ i ] . cache_fn ( ) , event_types [ i ] . message_type_fn ( ) ) ;
messages_count = ao2_container_count ( messages ) ;
ast_log ( LOG_NOTICE , " Clearing %i events of type %s of other nodes from stasis cache. \n " , messages_count , event_types [ i ] . name ) ;
ao2_callback ( messages , OBJ_NODATA , clear_node_cache , event_types [ i ] . topic_fn ( ) ) ;
ast_log ( LOG_NOTICE , " Cleared events of type %s from stasis cache. \n " , event_types [ i ] . name ) ;
ao2_t_ref ( messages , - 1 , " Dispose of flushed cache " ) ;
}
}
ast_rwlock_unlock ( & event_types_lock ) ;
stasis_message_router_unsubscribe_and_join ( stasis_router ) ;
stasis_router = NULL ;
@ -1103,16 +1316,21 @@ static void cleanup_module(void)
dispatch_thread . alert_pipe [ 1 ] = - 1 ;
}
if ( cpg_handle & & ( cs_err = cpg_finalize ( cpg_handle ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to finalize cpg (%d) \n " , ( int ) cs_err ) ;
}
cpg_handle = 0 ;
if ( ! ast_rwlock_trywrlock ( & init_cpg_lock ) ) {
ast_debug ( 5 , " cleanup_module wrlock \n " ) ;
if ( cpg_handle & & ( cs_err = cpg_finalize ( cpg_handle ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to finalize cpg (%d) \n " , ( int ) cs_err ) ;
}
cpg_handle = 0 ;
if ( cfg_handle & & ( cs_err = corosync_cfg_finalize ( cfg_handle ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to finalize cfg (%d) \n " , ( int ) cs_err ) ;
if ( cfg_handle & & ( cs_err = corosync_cfg_finalize ( cfg_handle ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to finalize cfg (%d) \n " , ( int ) cs_err ) ;
}
cfg_handle = 0 ;
corosync_node_joined = 0 ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " cleanup_module unlock \n " ) ;
}
cfg_handle = 0 ;
ao2_cleanup ( nodes ) ;
nodes = NULL ;
}
@ -1144,6 +1362,8 @@ static int load_module(void)
ast_log ( AST_LOG_ERROR , " Failed to create message router for corosync topic \n " ) ;
goto failed ;
}
stasis_message_router_set_congestion_limits ( stasis_router , - 1 ,
10 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL ) ;
if ( STASIS_MESSAGE_TYPE_INIT ( corosync_ping_message_type ) ! = 0 ) {
ast_log ( AST_LOG_ERROR , " Failed to initialize corosync ping message type \n " ) ;
@ -1155,39 +1375,55 @@ static int load_module(void)
goto failed ;
}
if ( ( cs_err = corosync_cfg_initialize ( & cfg_handle , & cfg_callbacks ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to initialize cfg: (%d) \n " , ( int ) cs_err ) ;
goto failed ;
}
if ( ! ast_rwlock_trywrlock ( & init_cpg_lock ) ) {
corosync_node_joined = 0 ;
ast_debug ( 5 , " load_module wrlock \n " ) ;
if ( ( cs_err = corosync_cfg_initialize ( & cfg_handle , & cfg_callbacks ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to initialize cfg: (%d) \n " , ( int ) cs_err ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " load_module unlock \n " ) ;
goto failed ;
}
if ( ( cs_err = cpg_initialize ( & cpg_handle , & cpg_callbacks ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to initialize cpg: (%d) \n " , ( int ) cs_err ) ;
goto failed ;
}
if ( ( cs_err = cpg_initialize ( & cpg_handle , & cpg_callbacks ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to initialize cpg: (%d) \n " , ( int ) cs_err ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " load_module unlock \n " ) ;
goto failed ;
}
ast_copy_string ( name . value , " asterisk " , sizeof ( name . value ) ) ;
name . length = strlen ( name . value ) ;
ast_copy_string ( name . value , " asterisk " , sizeof ( name . value ) ) ;
name . length = strlen ( name . value ) ;
if ( ( cs_err = cpg_join ( cpg_handle , & name ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to join: (%d) \n " , ( int ) cs_err ) ;
goto failed ;
}
if ( ( cs_err = cpg_join ( cpg_handle , & name ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to join: (%d) \n " , ( int ) cs_err ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " load_module unlock \n " ) ;
goto failed ;
}
if ( pipe ( dispatch_thread . alert_pipe ) = = - 1 ) {
ast_log ( LOG_ERROR , " Failed to create alert pipe: %s (%d) \n " ,
strerror ( errno ) , errno ) ;
goto failed ;
}
if ( pipe ( dispatch_thread . alert_pipe ) = = - 1 ) {
ast_log ( LOG_ERROR , " Failed to create alert pipe: %s (%d) \n " ,
strerror ( errno ) , errno ) ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " load_module unlock \n " ) ;
goto failed ;
}
corosync_node_joined = 1 ;
ast_rwlock_unlock ( & init_cpg_lock ) ;
ast_debug ( 5 , " load_module unlock \n " ) ;
if ( corosync_pthread_create_background ( & dispatch_thread . id , NULL ,
dispatch_thread_handler , NULL ) ) {
ast_log ( LOG_ERROR , " Error starting CPG dispatch thread. \n " ) ;
goto failed ;
}
if ( corosync_pthread_create_background ( & dispatch_thread . id , NULL ,
dispatch_thread_handler , NULL ) ) {
ast_log ( LOG_ERROR , " Error starting CPG dispatch thread. \n " ) ;
ast_cli_register_multiple ( corosync_cli , ARRAY_LEN ( corosync_cli ) ) ;
} else {
goto failed ;
}
ast_cli_register_multiple ( corosync_cli , ARRAY_LEN ( corosync_cli ) ) ;
return AST_MODULE_LOAD_SUCCESS ;
failed :