@ -44,20 +44,125 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
# include "asterisk/event.h"
# include "asterisk/cli.h"
# include "asterisk/devicestate.h"
# include "asterisk/app.h"
# include "asterisk/stasis.h"
# include "asterisk/stasis_message_router.h"
AST_RWLOCK_DEFINE_STATIC ( event_types_lock ) ;
static void publish_mwi_to_stasis ( struct ast_event * event ) ;
static void publish_device_state_to_stasis ( struct ast_event * event ) ;
/*! \brief The internal topic used for message forwarding and pings */
static struct stasis_topic * corosync_aggregate_topic ;
/*! \brief Our \ref stasis message router */
static struct stasis_message_router * stasis_router ;
/*! \brief Internal accessor for our topic */
static struct stasis_topic * corosync_topic ( void )
{
return corosync_aggregate_topic ;
}
/*! \brief A payload wrapper around a corosync ping event */
struct corosync_ping_payload {
/*! The corosync ping event being passed over \ref stasis */
struct ast_event * event ;
} ;
/*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
static void corosync_ping_payload_dtor ( void * obj )
{
struct corosync_ping_payload * payload = obj ;
ast_free ( payload - > event ) ;
}
/*! \brief Convert a Corosync PING to a \ref ast_event */
static struct ast_event * corosync_ping_to_event ( struct stasis_message * message )
{
struct corosync_ping_payload * payload ;
struct ast_event * event ;
struct ast_eid * event_eid ;
if ( ! message ) {
return NULL ;
}
payload = stasis_message_data ( message ) ;
if ( ! payload - > event ) {
return NULL ;
}
event_eid = ( struct ast_eid * ) ast_event_get_ie_raw ( payload - > event , AST_EVENT_IE_EID ) ;
event = ast_event_new ( AST_EVENT_PING ,
AST_EVENT_IE_EID , AST_EVENT_IE_PLTYPE_RAW , event_eid , sizeof ( * event_eid ) ,
AST_EVENT_IE_END ) ;
return event ;
}
STASIS_MESSAGE_TYPE_DEFN_LOCAL ( corosync_ping_message_type ,
. to_event = corosync_ping_to_event , ) ;
/*! \brief Publish a Corosync ping to \ref stasis */
static void publish_corosync_ping_to_stasis ( struct ast_event * event )
{
struct corosync_ping_payload * payload ;
struct stasis_message * message ;
ast_assert ( ast_event_get_type ( event ) = = AST_EVENT_PING ) ;
ast_assert ( event ! = NULL ) ;
payload = ao2_t_alloc ( sizeof ( * payload ) , corosync_ping_payload_dtor , " Create ping payload " ) ;
if ( ! payload ) {
return ;
}
payload - > event = event ;
message = stasis_message_create ( corosync_ping_message_type ( ) , payload ) ;
if ( ! message ) {
ao2_t_ref ( payload , - 1 , " Destroy payload on off nominal " ) ;
return ;
}
stasis_publish ( corosync_topic ( ) , message ) ;
ao2_t_ref ( payload , - 1 , " Hand ref to stasis " ) ;
ao2_t_ref ( message , - 1 , " Hand ref to stasis " ) ;
}
static struct {
const char * name ;
struct ast_event_sub * sub ;
struct stasis_forward * sub ;
unsigned char publish ;
unsigned char publish_default ;
unsigned char subscribe ;
unsigned char subscribe_default ;
struct stasis_topic * ( * topic_fn ) ( void ) ;
struct stasis_cache * ( * cache_fn ) ( void ) ;
struct stasis_message_type * ( * message_type_fn ) ( void ) ;
void ( * publish_to_stasis ) ( struct ast_event * ) ;
} event_types [ ] = {
[ AST_EVENT_MWI ] = { . name = " mwi " , } ,
[ AST_EVENT_DEVICE_STATE_CHANGE ] = { . name = " device_state " , } ,
[ AST_EVENT_PING ] = { . name = " ping " , . publish_default = 1 , . subscribe_default = 1 } ,
[ AST_EVENT_MWI ] = { . name = " mwi " ,
. topic_fn = ast_mwi_topic_all ,
. cache_fn = ast_mwi_state_cache ,
. message_type_fn = ast_mwi_state_type ,
. publish_to_stasis = publish_mwi_to_stasis , } ,
[ AST_EVENT_DEVICE_STATE_CHANGE ] = { . name = " device_state " ,
. topic_fn = ast_device_state_topic_all ,
. cache_fn = ast_device_state_cache ,
. message_type_fn = ast_device_state_message_type ,
. publish_to_stasis = publish_device_state_to_stasis , } ,
[ AST_EVENT_PING ] = { . name = " ping " ,
. publish_default = 1 ,
. subscribe_default = 1 ,
. topic_fn = corosync_topic ,
. message_type_fn = corosync_ping_message_type ,
. publish_to_stasis = publish_corosync_ping_to_stasis , } ,
} ;
static struct {
@ -88,6 +193,71 @@ static corosync_cfg_callbacks_t cfg_callbacks = {
. corosync_cfg_shutdown_callback = cfg_shutdown_cb ,
} ;
/*! \brief Publish a received MWI \ref ast_event to \ref stasis */
static void publish_mwi_to_stasis ( struct ast_event * event )
{
const char * mailbox ;
const char * context ;
unsigned int new_msgs ;
unsigned int old_msgs ;
struct ast_eid * event_eid ;
ast_assert ( ast_event_get_type ( event ) = = AST_EVENT_MWI ) ;
mailbox = ast_event_get_ie_str ( event , AST_EVENT_IE_MAILBOX ) ;
context = ast_event_get_ie_str ( event , AST_EVENT_IE_CONTEXT ) ;
new_msgs = ast_event_get_ie_uint ( event , AST_EVENT_IE_NEWMSGS ) ;
old_msgs = ast_event_get_ie_uint ( event , AST_EVENT_IE_OLDMSGS ) ;
event_eid = ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
if ( ast_strlen_zero ( mailbox ) | | ast_strlen_zero ( context ) ) {
return ;
}
if ( new_msgs > INT_MAX ) {
new_msgs = INT_MAX ;
}
if ( old_msgs > INT_MAX ) {
old_msgs = INT_MAX ;
}
if ( ast_publish_mwi_state_full ( mailbox , context , ( int ) new_msgs ,
( int ) old_msgs , NULL , event_eid ) ) {
char eid [ 16 ] ;
ast_eid_to_str ( eid , sizeof ( eid ) , event_eid ) ;
ast_log ( LOG_WARNING , " Failed to publish MWI message for %s@%s from %s \n " ,
mailbox , context , eid ) ;
}
}
/*! \brief Publish a received device state \ref ast_event to \ref stasis */
static void publish_device_state_to_stasis ( struct ast_event * event )
{
const char * device ;
enum ast_device_state state ;
unsigned int cachable ;
struct ast_eid * event_eid ;
ast_assert ( ast_event_get_type ( event ) = = AST_EVENT_DEVICE_STATE_CHANGE ) ;
device = ast_event_get_ie_str ( event , AST_EVENT_IE_DEVICE ) ;
state = ast_event_get_ie_uint ( event , AST_EVENT_IE_STATE ) ;
cachable = ast_event_get_ie_uint ( event , AST_EVENT_IE_CACHABLE ) ;
event_eid = ( struct ast_eid * ) ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
if ( ast_strlen_zero ( device ) ) {
return ;
}
if ( ast_publish_device_state_full ( device , state , cachable , event_eid ) ) {
char eid [ 16 ] ;
ast_eid_to_str ( eid , sizeof ( eid ) , event_eid ) ;
ast_log ( LOG_WARNING , " Failed to publish device state message for %s from %s \n " ,
device , eid ) ;
}
}
static void cpg_deliver_cb ( cpg_handle_t handle , const struct cpg_name * group_name ,
uint32_t nodeid , uint32_t pid , void * msg , size_t msg_len ) ;
@ -101,8 +271,6 @@ static cpg_callbacks_t cpg_callbacks = {
. cpg_confchg_fn = cpg_confchg_cb ,
} ;
static void ast_event_cb ( const struct ast_event * event , void * data ) ;
# ifdef HAVE_COROSYNC_CFG_STATE_TRACK
static void cfg_state_track_cb (
corosync_cfg_state_notification_buffer_t * notification_buffer ,
@ -120,6 +288,8 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
uint32_t nodeid , uint32_t pid , void * msg , size_t msg_len )
{
struct ast_event * event ;
void ( * publish_handler ) ( struct ast_event * ) = NULL ;
enum ast_event_type event_type ;
if ( msg_len < ast_event_minimum_length ( ) ) {
ast_debug ( 1 , " Ignoring event that's too small. %u < %u \n " ,
@ -133,9 +303,17 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
return ;
}
event_type = ast_event_get_type ( msg ) ;
if ( event_type > AST_EVENT_TOTAL ) {
/* Egads, we don't support this */
return ;
}
ast_rwlock_rdlock ( & event_types_lock ) ;
if ( ! event_types [ ast_event_get_type ( msg ) ] . subscribe ) {
/* We are not configured to subscribe to these events. */
publish_handler = event_types [ event_type ] . publish_to_stasis ;
if ( ! event_types [ event_type ] . subscribe | | ! publish_handler ) {
/* We are not configured to subscribe to these events or
we have no way to publish it internally . */
ast_rwlock_unlock ( & event_types_lock ) ;
return ;
}
@ -147,20 +325,80 @@ static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_nam
memcpy ( event , msg , msg_len ) ;
if ( event_type = = AST_EVENT_PING ) {
const struct ast_eid * eid ;
char buf [ 128 ] = " " ;
eid = ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
ast_eid_to_str ( buf , sizeof ( buf ) , ( struct ast_eid * ) eid ) ;
ast_log ( LOG_NOTICE , " Got event PING from server with EID: '%s' \n " , buf ) ;
}
ast_debug ( 5 , " Publishing event %s (%d) to stasis \n " ,
ast_event_get_type_name ( event ) , event_type ) ;
publish_handler ( event ) ;
}
static void publish_to_corosync ( struct stasis_message * message )
{
cs_error_t cs_err ;
struct iovec iov ;
struct ast_event * event ;
event = stasis_message_to_event ( message ) ;
if ( ! event ) {
return ;
}
if ( ast_eid_cmp ( & ast_eid_default , ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ) ) {
/* If the event didn't originate from this server, don't send it back out. */
ast_event_destroy ( event ) ;
return ;
}
if ( ast_event_get_type ( event ) = = AST_EVENT_PING ) {
const struct ast_eid * eid ;
char buf [ 128 ] = " " ;
eid = ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
ast_eid_to_str ( buf , sizeof ( buf ) , ( struct ast_eid * ) eid ) ;
ast_log ( LOG_NOTICE , " (cpg_deliver_cb) Got event PING from server with EID: '%s' \n " , buf ) ;
ast_log ( LOG_NOTICE , " Sending event PING from this server with EID: '%s' \n " , buf ) ;
}
iov . iov_base = ( void * ) event ;
iov . iov_len = ast_event_get_size ( event ) ;
ast_debug ( 5 , " Publishing event %s (%d) to corosync \n " ,
ast_event_get_type_name ( event ) , ast_event_get_type ( event ) ) ;
ast_event_queue ( event ) ;
} else {
ast_event_queue_and_cache ( event ) ;
/* The stasis subscription will only exist if we are configured to publish
* these events , so just send away . */
if ( ( cs_err = cpg_mcast_joined ( cpg_handle , CPG_TYPE_FIFO , & iov , 1 ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " CPG mcast failed (%d) \n " , cs_err ) ;
}
}
static void stasis_message_cb ( void * data , struct stasis_subscription * sub , struct stasis_message * message )
{
if ( ! message ) {
return ;
}
publish_to_corosync ( message ) ;
}
static int dump_cache_cb ( void * obj , void * arg , int flags )
{
struct stasis_message * message = obj ;
if ( ! message ) {
return 0 ;
}
publish_to_corosync ( message ) ;
return 0 ;
}
static void cpg_confchg_cb ( cpg_handle_t handle , const struct cpg_name * group_name ,
const struct cpg_address * member_list , size_t member_list_entries ,
const struct cpg_address * left_list , size_t left_list_entries ,
@ -176,20 +414,27 @@ static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_nam
}
for ( i = 0 ; i < ARRAY_LEN ( event_types ) ; i + + ) {
struct a st_event_sub * event_sub ;
struct a o2_container * messages ;
ast_rwlock_rdlock ( & event_types_lock ) ;
if ( ! event_types [ i ] . publish ) {
ast_rwlock_unlock ( & event_types_lock ) ;
continue ;
}
if ( ! event_types [ i ] . cache_fn | | ! event_types [ i ] . message_type_fn ) {
ast_rwlock_unlock ( & event_types_lock ) ;
continue ;
}
messages = stasis_cache_dump_by_eid ( event_types [ i ] . cache_fn ( ) ,
event_types [ i ] . message_type_fn ( ) ,
& ast_eid_default ) ;
ast_rwlock_unlock ( & event_types_lock ) ;
event_sub = ast_event_subscribe_new ( i , ast_event_cb , NULL ) ;
ast_event_sub_append_ie_raw ( event_sub , AST_EVENT_IE_EID ,
& ast_eid_default , sizeof ( ast_eid_default ) ) ;
ast_event_dump_cache ( event_sub ) ;
ast_event_sub_destroy ( event_sub ) ;
ao2_callback ( messages , OBJ_NODATA , dump_cache_cb , NULL ) ;
ao2_t_ref ( messages , - 1 , " Dispose of dumped cache " ) ;
}
}
@ -231,13 +476,13 @@ static void *dispatch_thread_handler(void *data)
if ( pfd [ 0 ] . revents & POLLIN ) {
if ( ( cs_err = cpg_dispatch ( cpg_handle , CS_DISPATCH_ALL ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed CPG dispatch: % u \n " , cs_err ) ;
ast_log ( LOG_WARNING , " Failed CPG dispatch: % d \n " , cs_err ) ;
}
}
if ( pfd [ 1 ] . revents & POLLIN ) {
if ( ( cs_err = corosync_cfg_dispatch ( cfg_handle , CS_DISPATCH_ALL ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " Failed CFG dispatch: % u \n " , cs_err ) ;
ast_log ( LOG_WARNING , " Failed CFG dispatch: % d \n " , cs_err ) ;
}
}
@ -287,37 +532,6 @@ static void *dispatch_thread_handler(void *data)
return NULL ;
}
static void ast_event_cb ( const struct ast_event * event , void * data )
{
cs_error_t cs_err ;
struct iovec iov = {
. iov_base = ( void * ) event ,
. iov_len = ast_event_get_size ( event ) ,
} ;
if ( ast_event_get_type ( event ) = = AST_EVENT_PING ) {
const struct ast_eid * eid ;
char buf [ 128 ] = " " ;
eid = ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ;
ast_eid_to_str ( buf , sizeof ( buf ) , ( struct ast_eid * ) eid ) ;
ast_log ( LOG_NOTICE , " (ast_event_cb) Got event PING from server with EID: '%s' \n " , buf ) ;
}
if ( ast_eid_cmp ( & ast_eid_default ,
ast_event_get_ie_raw ( event , AST_EVENT_IE_EID ) ) ) {
/* If the event didn't originate from this server, don't send it back out. */
return ;
}
/* The ast_event subscription will only exist if we are configured to publish
* these events , so just send away . */
if ( ( cs_err = cpg_mcast_joined ( cpg_handle , CPG_TYPE_FIFO , & iov , 1 ) ) ! = CS_OK ) {
ast_log ( LOG_WARNING , " CPG mcast failed (%u) \n " , cs_err ) ;
}
}
static char * corosync_show_members ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
cs_error_t cs_err ;
@ -368,7 +582,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
continue ;
}
ast_cli ( a - > fd , " === Node % u \n " , i ) ;
ast_cli ( a - > fd , " === Node % d \n " , i ) ;
ast_cli ( a - > fd , " === --> Group: %s \n " , cpg_desc . group . value ) ;
for ( j = 0 ; j < num_addrs ; j + + ) {
@ -378,7 +592,7 @@ static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_
getnameinfo ( sa , sa_len , buf , sizeof ( buf ) , NULL , 0 , NI_NUMERICHOST ) ;
ast_cli ( a - > fd , " === --> Address % u : %s\n " , j + 1 , buf ) ;
ast_cli ( a - > fd , " === --> Address % d : %s\n " , j + 1 , buf ) ;
}
}
@ -421,7 +635,9 @@ static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
return CLI_FAILURE ;
}
ast_event_queue ( event ) ;
ast_rwlock_rdlock ( & event_types_lock ) ;
event_types [ AST_EVENT_PING ] . publish_to_stasis ( event ) ;
ast_rwlock_unlock ( & event_types_lock ) ;
return CLI_SUCCESS ;
}
@ -532,11 +748,16 @@ static int load_general_config(struct ast_config *cfg)
for ( i = 0 ; i < ARRAY_LEN ( event_types ) ; i + + ) {
if ( event_types [ i ] . publish & & ! event_types [ i ] . sub ) {
event_types [ i ] . sub = ast_event_subscribe ( i ,
ast_event_cb , " Corosync " , NULL ,
AST_EVENT_IE_END ) ;
event_types [ i ] . sub = stasis_forward_all ( event_types [ i ] . topic_fn ( ) ,
corosync_topic ( ) ) ;
stasis_message_router_add ( stasis_router ,
event_types [ i ] . message_type_fn ( ) ,
stasis_message_cb ,
NULL ) ;
} else if ( ! event_types [ i ] . publish & & event_types [ i ] . sub ) {
event_types [ i ] . sub = ast_event_unsubscribe ( event_types [ i ] . sub ) ;
event_types [ i ] . sub = stasis_forward_cancel ( event_types [ i ] . sub ) ;
stasis_message_router_remove ( stasis_router ,
event_types [ i ] . message_type_fn ( ) ) ;
}
}
@ -577,14 +798,32 @@ static void cleanup_module(void)
cs_error_t cs_err ;
unsigned int i ;
for ( i = 0 ; i < ARRAY_LEN ( event_types ) ; i + + ) {
if ( event_types [ i ] . sub ) {
event_types [ i ] . sub = ast_event_unsubscribe ( event_types [ i ] . sub ) ;
if ( stasis_router ) {
/* Unsubscribe all topic forwards and cancel all message routes */
ast_rwlock_wrlock ( & event_types_lock ) ;
for ( i = 0 ; i < ARRAY_LEN ( event_types ) ; i + + ) {
if ( event_types [ i ] . sub ) {
event_types [ i ] . sub = stasis_forward_cancel ( event_types [ i ] . sub ) ;
stasis_message_router_remove ( stasis_router ,
event_types [ i ] . message_type_fn ( ) ) ;
}
event_types [ i ] . publish = 0 ;
event_types [ i ] . subscribe = 0 ;
}
event_types [ i ] . publish = 0 ;
event_types [ i ] . subscribe = 0 ;
ast_rwlock_unlock ( & event_types_lock ) ;
stasis_message_router_unsubscribe_and_join ( stasis_router ) ;
stasis_router = NULL ;
}
if ( corosync_aggregate_topic ) {
ao2_t_ref ( corosync_aggregate_topic , - 1 , " Dispose of topic on cleanup " ) ;
corosync_aggregate_topic = NULL ;
}
STASIS_MESSAGE_TYPE_CLEANUP ( corosync_ping_message_type ) ;
if ( dispatch_thread . id ! = AST_PTHREADT_NULL ) {
char meepmeep = ' x ' ;
dispatch_thread . stop = 1 ;
@ -623,13 +862,30 @@ static int load_module(void)
enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE ;
struct cpg_name name ;
corosync_aggregate_topic = stasis_topic_create ( " corosync_aggregate_topic " ) ;
if ( ! corosync_aggregate_topic ) {
ast_log ( AST_LOG_ERROR , " Failed to create stasis topic for corosync \n " ) ;
goto failed ;
}
stasis_router = stasis_message_router_create ( corosync_aggregate_topic ) ;
if ( ! stasis_router ) {
ast_log ( AST_LOG_ERROR , " Failed to create message router for corosync topic \n " ) ;
goto failed ;
}
if ( STASIS_MESSAGE_TYPE_INIT ( corosync_ping_message_type ) ! = 0 ) {
ast_log ( AST_LOG_ERROR , " Failed to initialize corosync ping message type \n " ) ;
goto failed ;
}
if ( ( cs_err = corosync_cfg_initialize ( & cfg_handle , & cfg_callbacks ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to initialize cfg (%d) \n " , ( int ) cs_err ) ;
return AST_MODULE_LOAD_DECLINE ;
ast_log ( LOG_ERROR , " Failed to initialize cfg : (%d)\n " , ( int ) cs_err ) ;
goto failed ;
}
if ( ( cs_err = cpg_initialize ( & cpg_handle , & cpg_callbacks ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to initialize cpg (%d) \n " , ( int ) cs_err ) ;
ast_log ( LOG_ERROR , " Failed to initialize cpg : (%d)\n " , ( int ) cs_err ) ;
goto failed ;
}
@ -637,7 +893,7 @@ static int load_module(void)
name . length = strlen ( name . value ) ;
if ( ( cs_err = cpg_join ( cpg_handle , & name ) ) ! = CS_OK ) {
ast_log ( LOG_ERROR , " Failed to join (%d)\n " , ( int ) cs_err ) ;
ast_log ( LOG_ERROR , " Failed to join : (%d)\n " , ( int ) cs_err ) ;
goto failed ;
}