@ -47,11 +47,16 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
# include "asterisk/app.h"
# include "asterisk/stasis.h"
# include "asterisk/stasis_message_router.h"
# include "asterisk/stasis_system.h"
AST_RWLOCK_DEFINE_STATIC ( event_types_lock ) ;
static void publish_mwi_to_stasis ( struct ast_event * event ) ;
static void publish_device_state_to_stasis ( struct ast_event * event ) ;
static void publish_cluster_discovery_to_stasis ( struct ast_event * event ) ;
/*! \brief All the nodes that we're aware of */
static struct ao2_container * nodes ;
/*! \brief The internal topic used for message forwarding and pings */
static struct stasis_topic * corosync_aggregate_topic ;
@ -65,6 +70,78 @@ static struct stasis_topic *corosync_topic(void)
return corosync_aggregate_topic ;
}
struct corosync_node {
/*! The corosync ID */
int id ;
/*! The Asterisk EID */
struct ast_eid eid ;
/*! The IP address of the node */
struct ast_sockaddr addr ;
} ;
static struct corosync_node * corosync_node_alloc ( struct ast_event * event )
{
struct corosync_node * node ;
node = ao2_alloc_options ( sizeof ( * node ) , NULL , AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
if ( ! node ) {
return NULL ;
}
memcpy ( & node - > eid , ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) , sizeof ( node - > eid ) ) ;
node - > id = ast_event_get_ie_uint ( event , AST_EVENT_IE_NODE_ID ) ;
ast_sockaddr_parse ( & node - > addr , ast_event_get_ie_str ( event , AST_EVENT_IE_LOCAL_ADDR ) , PARSE_PORT_IGNORE ) ;
return node ;
}
static int corosync_node_hash_fn ( const void * obj , const int flags )
{
const struct corosync_node * node ;
const int * id ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_KEY :
id = obj ;
break ;
case OBJ_SEARCH_OBJECT :
node = obj ;
id = & node - > id ;
break ;
default :
ast_assert ( 0 ) ;
return 0 ;
}
return * id ;
}
static int corosync_node_cmp_fn ( void * obj , void * arg , int flags )
{
struct corosync_node * left = obj ;
struct corosync_node * right = arg ;
const int * id = arg ;
int cmp ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_OBJECT :
id = & right - > id ;
/* Fall through */
case OBJ_SEARCH_KEY :
cmp = ( left - > id = = * id ) ;
break ;
case OBJ_SEARCH_PARTIAL_KEY :
cmp = ( left - > id = = right - > id ) ;
break ;
default :
/* Sort can only work on something with a full or partial key. */
ast_assert ( 0 ) ;
cmp = 1 ;
break ;
}
return cmp ? CMP_MATCH : 0 ;
}
/*! \brief A payload wrapper around a corosync ping event */
struct corosync_ping_payload {
/*! The corosync ping event being passed over \ref stasis */
@ -167,6 +244,12 @@ static struct {
. topic_fn = corosync_topic ,
. message_type_fn = corosync_ping_message_type ,
. publish_to_stasis = publish_corosync_ping_to_stasis , } ,
[ AST_EVENT_CLUSTER_DISCOVERY ] = { . name = " cluster_discovery " ,
. publish_default = 1 ,
. subscribe_default = 1 ,
. topic_fn = ast_system_topic ,
. message_type_fn = ast_cluster_discovery_type ,
. publish_to_stasis = publish_cluster_discovery_to_stasis , } ,
} ;
static struct {
@ -197,6 +280,97 @@ static corosync_cfg_callbacks_t cfg_callbacks = {
. corosync_cfg_shutdown_callback = cfg_shutdown_cb ,
} ;
/*! \brief Publish cluster discovery to \ref stasis */
static void publish_cluster_discovery_to_stasis_full ( struct corosync_node * node , int joined )
{
struct ast_json * json ;
struct ast_json_payload * payload ;
struct stasis_message * message ;
char eid [ 18 ] ;
const char * addr ;
ast_eid_to_str ( eid , sizeof ( eid ) , & node - > eid ) ;
addr = ast_sockaddr_stringify_addr ( & node - > addr ) ;
ast_log ( AST_LOG_NOTICE , " Node %u (%s) at %s %s the cluster \n " ,
node - > id ,
eid ,
addr ,
joined ? " joined " : " left " ) ;
json = ast_json_pack ( " {s: s, s: i, s: s, s: i} " ,
" address " , addr ,
" node_id " , node - > id ,
" eid " , eid ,
" joined " , joined ) ;
if ( ! json ) {
return ;
}
payload = ast_json_payload_create ( json ) ;
if ( ! payload ) {
ast_json_unref ( json ) ;
return ;
}
message = stasis_message_create ( ast_cluster_discovery_type ( ) , payload ) ;
if ( ! message ) {
ast_json_unref ( json ) ;
ao2_ref ( payload , - 1 ) ;
return ;
}
stasis_publish ( ast_system_topic ( ) , message ) ;
ast_json_unref ( json ) ;
ao2_ref ( payload , - 1 ) ;
ao2_ref ( message , - 1 ) ;
}
static void send_cluster_notify ( void ) ;
/*! \brief Publish a received cluster discovery \ref ast_event to \ref stasis */
static void publish_cluster_discovery_to_stasis ( struct ast_event * event )
{
struct corosync_node * node ;
int id = ast_event_get_ie_uint ( event , AST_EVENT_IE_NODE_ID ) ;
struct ast_eid * event_eid ;
ast_assert ( ast_event_get_type ( event ) = = AST_EVENT_CLUSTER_DISCOVERY ) ;
event_eid = ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
if ( ! ast_eid_cmp ( & ast_eid_default , event_eid ) ) {
/* Don't feed events back in that originated locally. */
return ;
}
ao2_lock ( nodes ) ;
node = ao2_find ( nodes , & id , OBJ_SEARCH_KEY | OBJ_NOLOCK ) ;
if ( node ) {
/* We already know about this node */
ao2_unlock ( nodes ) ;
ao2_ref ( node , - 1 ) ;
return ;
}
node = corosync_node_alloc ( event ) ;
if ( ! node ) {
ao2_unlock ( nodes ) ;
return ;
}
ao2_link_flags ( nodes , node , OBJ_NOLOCK ) ;
ao2_unlock ( nodes ) ;
publish_cluster_discovery_to_stasis_full ( node , 1 ) ;
ao2_ref ( node , - 1 ) ;
/*
* When we get news that someone else has joined , we need to let them
* know we exist as well .
*/
send_cluster_notify ( ) ;
}
/*! \brief Publish a received MWI \ref ast_event to \ref stasis */
static void publish_mwi_to_stasis ( struct ast_event * event )
{
@ -228,7 +402,7 @@ static void publish_mwi_to_stasis(struct ast_event *event)
if ( ast_publish_mwi_state_full ( mailbox , context , ( int ) new_msgs ,
( int ) old_msgs , NULL , event_eid ) ) {
char eid [ 1 6 ] ;
char eid [ 1 8 ] ;
ast_eid_to_str ( eid , sizeof ( eid ) , event_eid ) ;
ast_log ( LOG_WARNING , " Failed to publish MWI message for %s@%s from %s \n " ,
mailbox , context , eid ) ;
@ -255,7 +429,7 @@ static void publish_device_state_to_stasis(struct ast_event *event)
}
if ( ast_publish_device_state_full ( device , state , cachable , event_eid ) ) {
char eid [ 1 6 ] ;
char eid [ 1 8 ] ;
ast_eid_to_str ( eid , sizeof ( eid ) , event_eid ) ;
ast_log ( LOG_WARNING , " Failed to publish device state message for %s from %s \n " ,
device , eid ) ;
@ -342,10 +516,27 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
publish_handler ( event ) ;
}
static void publish_ to_corosync( struct stasis_message * message )
static void publish_ event_to_corosync( struct ast_event * event )
{
cs_error_t cs_err ;
struct iovec iov ;
iov . iov_base = ( void * ) event ;
iov . iov_len = ast_event_get_size ( event ) ;
ast_debug ( 5 , " Publishing event %s (%u) to corosync \n " ,
ast_event_get_type_name ( event ) , ast_event_get_type ( event ) ) ;
/* The stasis subscription will only exist if we are configured to publish
* these events , so just send away . */
if ( ( cs_err = cpg_mcast_joined ( cpg_handle , CPG_TYPE_FIFO , & iov , 1 ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " CPG mcast failed (%u) for event %s (%u) \n " ,
cs_err , ast_event_get_type_name ( event ) , ast_event_get_type ( event ) ) ;
}
}
static void publish_to_corosync ( struct stasis_message * message )
{
struct ast_event * event ;
event = stasis_message_to_event ( message ) ;
@ -368,17 +559,7 @@ static void publish_to_corosync(struct stasis_message *message)
ast_log ( LOG_NOTICE , " Sending event PING from this server with EID: '%s' \n " , buf ) ;
}
iov . iov_base = ( void * ) event ;
iov . iov_len = ast_event_get_size ( event ) ;
ast_debug ( 5 , " Publishing event %s (%u) to corosync \n " ,
ast_event_get_type_name ( event ) , ast_event_get_type ( event ) ) ;
/* The stasis subscription will only exist if we are configured to publish
* these events , so just send away . */
if ( ( cs_err = cpg_mcast_joined ( cpg_handle , CPG_TYPE_FIFO , & iov , 1 ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " CPG mcast failed (%u) \n " , cs_err ) ;
}
publish_event_to_corosync ( event ) ;
}
static void stasis_message_cb ( void * data , struct stasis_subscription * sub , struct stasis_message * message )
@ -410,9 +591,22 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
{
unsigned int i ;
for ( i = 0 ; i < left_list_entries ; i + + ) {
const struct cpg_address * cpg_node = & left_list [ i ] ;
struct corosync_node * node ;
node = ao2_find ( nodes , & cpg_node - > nodeid , OBJ_UNLINK | OBJ_SEARCH_KEY ) ;
if ( ! node ) {
continue ;
}
publish_cluster_discovery_to_stasis_full ( node , 0 ) ;
ao2_ref ( node , - 1 ) ;
}
/* If any new nodes have joined, dump our cache of events we are publishing
* that originated from this server . */
if ( ! joined_list_entries ) {
return ;
}
@ -442,6 +636,45 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
}
}
/*! \brief Informs the cluster of our EID and our IP addresses */
static void send_cluster_notify ( void )
{
struct ast_event * event ;
unsigned int node_id ;
cs_error_t cs_err ;
corosync_cfg_node_address_t corosync_addr ;
int num_addrs = 0 ;
struct sockaddr * sa ;
size_t sa_len ;
char buf [ 128 ] ;
int res ;
if ( ( cs_err = corosync_cfg_local_get ( cfg_handle , & node_id ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed to extract Corosync node ID for this node. Not informing cluster of existance. \n " ) ;
return ;
}
if ( ( ( cs_err = corosync_cfg_get_node_addrs ( cfg_handle , node_id , 1 , & num_addrs , & corosync_addr ) ) ! = CS_OK ) | | ( num_addrs < 1 ) ) {
ast_log ( LOG_WARNING , " Failed to get local Corosync address. Not informing cluster of existance. \n " ) ;
return ;
}
sa = ( struct sockaddr * ) corosync_addr . address ;
sa_len = ( size_t ) corosync_addr . address_length ;
if ( ( res = getnameinfo ( sa , sa_len , buf , sizeof ( buf ) , NULL , 0 , NI_NUMERICHOST ) ) ) {
ast_log ( LOG_WARNING , " Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance. \n " ,
gai_strerror ( res ) , res ) ;
return ;
}
event = ast_event_new ( AST_EVENT_CLUSTER_DISCOVERY ,
AST_EVENT_IE_NODE_ID , AST_EVENT_IE_PLTYPE_UINT , node_id ,
AST_EVENT_IE_LOCAL_ADDR , AST_EVENT_IE_PLTYPE_STR , buf ,
AST_EVENT_IE_END ) ;
publish_event_to_corosync ( event ) ;
ast_free ( event ) ;
}
static void * dispatch_thread_handler ( void * data )
{
cs_error_t cs_err ;
@ -463,6 +696,7 @@ static void *dispatch_thread_handler(void *data)
pfd [ 2 ] . fd = dispatch_thread . alert_pipe [ 0 ] ;
send_cluster_notify ( ) ;
while ( ! dispatch_thread . stop ) {
int res ;
@ -530,6 +764,7 @@ static void *dispatch_thread_handler(void *data)
}
ast_log ( LOG_NOTICE , " Corosync recovery complete. \n " ) ;
send_cluster_notify ( ) ;
}
}
@ -858,6 +1093,9 @@ static void cleanup_module(void)
ast_log ( LOG_ERROR , " Failed to finalize cfg (%d) \n " , ( int ) cs_err ) ;
}
cfg_handle = 0 ;
ao2_cleanup ( nodes ) ;
nodes = NULL ;
}
static int load_module ( void )
@ -865,6 +1103,11 @@ static int load_module(void)
cs_error_t cs_err ;
struct cpg_name name ;
nodes = ao2_container_alloc ( 23 , corosync_node_hash_fn , corosync_node_cmp_fn ) ;
if ( ! nodes ) {
goto failed ;
}
corosync_aggregate_topic = stasis_topic_create ( " corosync_aggregate_topic " ) ;
if ( ! corosync_aggregate_topic ) {
ast_log ( AST_LOG_ERROR , " Failed to create stasis topic for corosync \n " ) ;