@ -41,6 +41,9 @@
# include "asterisk/stasis_bridges.h"
# include "asterisk/stasis_endpoints.h"
# include "asterisk/config_options.h"
# ifdef AST_DEVMODE
# include "asterisk/cli.h"
# endif
/*** DOCUMENTATION
< managerEvent language = " en_US " name = " UserEvent " >
@ -304,14 +307,67 @@ static struct ast_threadpool *pool;
STASIS_MESSAGE_TYPE_DEFN ( stasis_subscription_change_type ) ;
# ifdef AST_DEVMODE
/*! The number of buckets to use for topic statistics */
# define TOPIC_STATISTICS_BUCKETS 57
/*! The number of buckets to use for subscription statistics */
# define SUBSCRIPTION_STATISTICS_BUCKETS 57
/*! Container which stores statistics for topics */
static struct ao2_container * topic_statistics ;
/*! Container which stores statistics for subscriptions */
static struct ao2_container * subscription_statistics ;
/*! \internal */
struct stasis_message_type_statistics {
/*! \brief The number of messages of this published */
int published ;
/*! \brief The number of messages of this that did not reach a subscriber */
int unused ;
/*! \brief The stasis message type */
struct stasis_message_type * message_type ;
} ;
/*! Lock to protect the message types vector */
AST_MUTEX_DEFINE_STATIC ( message_type_statistics_lock ) ;
/*! Vector containing message type information */
static AST_VECTOR ( , struct stasis_message_type_statistics ) message_type_statistics ;
/*! \internal */
struct stasis_topic_statistics {
/*! \brief The number of messages that were not dispatched to any subscriber */
int messages_not_dispatched ;
/*! \brief The number of messages that were dispatched to at least 1 subscriber */
int messages_dispatched ;
/*! \brief Highest time spent dispatching messages to subscribers */
int64_t highest_time_dispatched ;
/*! \brief Lowest time spent dispatching messages to subscribers */
int64_t lowest_time_dispatched ;
/*! \brief The number of subscribers to this topic */
int subscriber_count ;
/*! \brief Name of the topic */
char name [ 0 ] ;
} ;
# endif
/*! \internal */
struct stasis_topic {
char * name ;
/*! Variable length array of the subscribers */
AST_VECTOR ( , struct stasis_subscription * ) subscribers ;
/*! Topics forwarding into this topic */
AST_VECTOR ( , struct stasis_topic * ) upstream_topics ;
# ifdef AST_DEVMODE
struct stasis_topic_statistics * statistics ;
# endif
/*! Name of the topic */
char name [ 0 ] ;
} ;
/* Forward declarations for the tightly-coupled subscription object */
@ -337,28 +393,54 @@ static void topic_dtor(void *obj)
* unsubscribed before we get here . */
ast_assert ( AST_VECTOR_SIZE ( & topic - > subscribers ) = = 0 ) ;
ast_free ( topic - > name ) ;
topic - > name = NULL ;
AST_VECTOR_FREE ( & topic - > subscribers ) ;
AST_VECTOR_FREE ( & topic - > upstream_topics ) ;
# ifdef AST_DEVMODE
if ( topic - > statistics ) {
ao2_unlink ( topic_statistics , topic - > statistics ) ;
ao2_ref ( topic - > statistics , - 1 ) ;
}
# endif
}
# ifdef AST_DEVMODE
static struct stasis_topic_statistics * stasis_topic_statistics_create ( const char * name )
{
struct stasis_topic_statistics * statistics ;
statistics = ao2_alloc ( sizeof ( * statistics ) + strlen ( name ) + 1 , NULL ) ;
if ( ! statistics ) {
return NULL ;
}
strcpy ( statistics - > name , name ) ; /* SAFE */
ao2_link ( topic_statistics , statistics ) ;
return statistics ;
}
# endif
struct stasis_topic * stasis_topic_create ( const char * name )
{
struct stasis_topic * topic ;
int res = 0 ;
topic = ao2_t_alloc ( sizeof ( * topic ) , topic_dtor , name ) ;
topic = ao2_t_alloc ( sizeof ( * topic ) + strlen ( name ) + 1 , topic_dtor , name ) ;
if ( ! topic ) {
return NULL ;
}
topic - > name = ast_strdup ( name ) ;
strcpy ( topic - > name , name ) ; /* SAFE */
res | = AST_VECTOR_INIT ( & topic - > subscribers , INITIAL_SUBSCRIBERS_MAX ) ;
res | = AST_VECTOR_INIT ( & topic - > upstream_topics , 0 ) ;
# ifdef AST_DEVMODE
topic - > statistics = stasis_topic_statistics_create ( name ) ;
if ( ! topic - > name | | ! topic - > statistics | | res ) {
# else
if ( ! topic - > name | | res ) {
ao2_cleanup ( topic ) ;
# endif
ao2_ref ( topic , - 1 ) ;
return NULL ;
}
@ -375,6 +457,35 @@ size_t stasis_topic_subscribers(const struct stasis_topic *topic)
return AST_VECTOR_SIZE ( & topic - > subscribers ) ;
}
# ifdef AST_DEVMODE
struct stasis_subscription_statistics {
/*! \brief The filename where the subscription originates */
const char * file ;
/*! \brief The line number where the subscription originates */
int lineno ;
/*! \brief The function where the subscription originates */
const char * func ;
/*! \brief The number of messages that were filtered out */
int messages_dropped ;
/*! \brief The number of messages that passed filtering */
int messages_passed ;
/*! \brief Highest time spent invoking a message */
int64_t highest_time_invoked ;
/*! \brief The message type that currently took the longest to process */
struct stasis_message_type * highest_time_message_type ;
/*! \brief Lowest time spent invoking a message */
int64_t lowest_time_invoked ;
/*! \brief Using a mailbox to queue messages */
int uses_mailbox ;
/*! \brief Using stasis threadpool for handling messages */
int uses_threadpool ;
/*! \brief Name of the topic we subscribed to */
char * topic ;
/*! \brief Unique ID of the subscription */
char uniqueid [ 0 ] ;
} ;
# endif
/*! \internal */
struct stasis_subscription {
/*! Unique ID for this subscription */
@ -403,6 +514,11 @@ struct stasis_subscription {
enum stasis_subscription_message_formatters accepted_formatters ;
/*! The message filter currently in use */
enum stasis_subscription_message_filter filter ;
# ifdef AST_DEVMODE
/*! Statistics information */
struct stasis_subscription_statistics * statistics ;
# endif
} ;
static void subscription_dtor ( void * obj )
@ -423,6 +539,13 @@ static void subscription_dtor(void *obj)
ast_cond_destroy ( & sub - > join_cond ) ;
AST_VECTOR_FREE ( & sub - > accepted_message_types ) ;
# ifdef AST_DEVMODE
if ( sub - > statistics ) {
ao2_unlink ( subscription_statistics , sub - > statistics ) ;
ao2_ref ( sub - > statistics , - 1 ) ;
}
# endif
}
/*!
@ -436,6 +559,12 @@ static void subscription_invoke(struct stasis_subscription *sub,
{
unsigned int final = stasis_subscription_final_message ( sub , message ) ;
int message_type_id = stasis_message_type_id ( stasis_subscription_change_type ( ) ) ;
# ifdef AST_DEVMODE
struct timeval start ;
int elapsed ;
start = ast_tvnow ( ) ;
# endif
/* Notify that the final message has been received */
if ( final ) {
@ -462,6 +591,19 @@ static void subscription_invoke(struct stasis_subscription *sub,
ast_cond_signal ( & sub - > join_cond ) ;
ao2_unlock ( sub ) ;
}
# ifdef AST_DEVMODE
elapsed = ast_tvdiff_ms ( ast_tvnow ( ) , start ) ;
if ( elapsed > sub - > statistics - > highest_time_invoked ) {
sub - > statistics - > highest_time_invoked = elapsed ;
ao2_lock ( sub - > statistics ) ;
sub - > statistics - > highest_time_message_type = stasis_message_type ( message ) ;
ao2_unlock ( sub - > statistics ) ;
}
if ( elapsed < sub - > statistics - > lowest_time_invoked ) {
sub - > statistics - > lowest_time_invoked = elapsed ;
}
# endif
}
static void send_subscription_subscribe ( struct stasis_topic * topic , struct stasis_subscription * sub ) ;
@ -471,12 +613,51 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
{
}
# ifdef AST_DEVMODE
static struct stasis_subscription_statistics * stasis_subscription_statistics_create ( const char * uniqueid ,
const char * topic , int needs_mailbox , int use_thread_pool , const char * file , int lineno ,
const char * func )
{
struct stasis_subscription_statistics * statistics ;
size_t uniqueid_len = strlen ( uniqueid ) + 1 ;
statistics = ao2_alloc ( sizeof ( * statistics ) + uniqueid_len + strlen ( topic ) + 1 , NULL ) ;
if ( ! statistics ) {
return NULL ;
}
statistics - > file = file ;
statistics - > lineno = lineno ;
statistics - > func = func ;
statistics - > uses_mailbox = needs_mailbox ;
statistics - > uses_threadpool = use_thread_pool ;
strcpy ( statistics - > uniqueid , uniqueid ) ; /* SAFE */
statistics - > topic = statistics - > uniqueid + uniqueid_len ;
strcpy ( statistics - > topic , topic ) ; /* SAFE */
ao2_link ( subscription_statistics , statistics ) ;
return statistics ;
}
# endif
# ifdef AST_DEVMODE
struct stasis_subscription * internal_stasis_subscribe (
struct stasis_topic * topic ,
stasis_subscription_cb callback ,
void * data ,
int needs_mailbox ,
int use_thread_pool ,
const char * file ,
int lineno ,
const char * func )
# else
struct stasis_subscription * internal_stasis_subscribe (
struct stasis_topic * topic ,
stasis_subscription_cb callback ,
void * data ,
int needs_mailbox ,
int use_thread_pool )
# endif
{
struct stasis_subscription * sub ;
@ -491,6 +672,15 @@ struct stasis_subscription *internal_stasis_subscribe(
}
ast_uuid_generate_str ( sub - > uniqueid , sizeof ( sub - > uniqueid ) ) ;
# ifdef AST_DEVMODE
sub - > statistics = stasis_subscription_statistics_create ( sub - > uniqueid , topic - > name , needs_mailbox ,
use_thread_pool , file , lineno , func ) ;
if ( ! sub - > statistics ) {
ao2_ref ( sub , - 1 ) ;
return NULL ;
}
# endif
if ( needs_mailbox ) {
char tps_name [ AST_TASKPROCESSOR_MAX_NAME + 1 ] ;
@ -538,6 +728,18 @@ struct stasis_subscription *internal_stasis_subscribe(
return sub ;
}
# ifdef AST_DEVMODE
struct stasis_subscription * __stasis_subscribe (
struct stasis_topic * topic ,
stasis_subscription_cb callback ,
void * data ,
const char * file ,
int lineno ,
const char * func )
{
return internal_stasis_subscribe ( topic , callback , data , 1 , 0 , file , lineno , func ) ;
}
# else
struct stasis_subscription * stasis_subscribe (
struct stasis_topic * topic ,
stasis_subscription_cb callback ,
@ -545,7 +747,20 @@ struct stasis_subscription *stasis_subscribe(
{
return internal_stasis_subscribe ( topic , callback , data , 1 , 0 ) ;
}
# endif
# ifdef AST_DEVMODE
struct stasis_subscription * __stasis_subscribe_pool (
struct stasis_topic * topic ,
stasis_subscription_cb callback ,
void * data ,
const char * file ,
int lineno ,
const char * func )
{
return internal_stasis_subscribe ( topic , callback , data , 1 , 1 , file , lineno , func ) ;
}
# else
struct stasis_subscription * stasis_subscribe_pool (
struct stasis_topic * topic ,
stasis_subscription_cb callback ,
@ -553,6 +768,7 @@ struct stasis_subscription *stasis_subscribe_pool(
{
return internal_stasis_subscribe ( topic , callback , data , 1 , 1 ) ;
}
# endif
static int sub_cleanup ( void * data )
{
@ -808,6 +1024,11 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
topic_add_subscription (
AST_VECTOR_GET ( & topic - > upstream_topics , idx ) , sub ) ;
}
# ifdef AST_DEVMODE
topic - > statistics - > subscriber_count + = 1 ;
# endif
ao2_unlock ( topic ) ;
return 0 ;
@ -825,6 +1046,13 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
}
res = AST_VECTOR_REMOVE_ELEM_UNORDERED ( & topic - > subscribers , sub ,
AST_VECTOR_ELEM_CLEANUP_NOOP ) ;
# ifdef AST_DEVMODE
if ( ! res ) {
topic - > statistics - > subscriber_count - = 1 ;
}
# endif
ao2_unlock ( topic ) ;
return res ;
@ -885,8 +1113,10 @@ static int dispatch_exec_sync(struct ast_taskprocessor_local *local)
* \ param message The message to send
* \ param synchronous If non - zero , synchronize on the subscriber receiving
* the message
* \ retval 0 if message was not dispatched
* \ retval 1 if message was dispatched
*/
static void dispatch_message ( struct stasis_subscription * sub ,
static unsigned int dispatch_message ( struct stasis_subscription * sub ,
struct stasis_message * message ,
int synchronous )
{
@ -938,14 +1168,22 @@ static void dispatch_message(struct stasis_subscription *sub,
break ;
}
return ;
# ifdef AST_DEVMODE
ast_atomic_fetchadd_int ( & sub - > statistics - > messages_dropped , + 1 ) ;
# endif
return 0 ;
} while ( 0 ) ;
# ifdef AST_DEVMODE
ast_atomic_fetchadd_int ( & sub - > statistics - > messages_passed , + 1 ) ;
# endif
if ( ! sub - > mailbox ) {
/* Dispatch directly */
subscription_invoke ( sub , message ) ;
return ;
return 1 ;
}
/* Bump the message for the taskprocessor push. This will get de-ref'd
@ -957,6 +1195,7 @@ static void dispatch_message(struct stasis_subscription *sub,
/* Push failed; ugh. */
ast_log ( LOG_ERROR , " Dropping async dispatch \n " ) ;
ao2_cleanup ( message ) ;
return 0 ;
}
} else {
struct sync_task_data std ;
@ -972,7 +1211,7 @@ static void dispatch_message(struct stasis_subscription *sub,
ao2_cleanup ( message ) ;
ast_mutex_destroy ( & std . lock ) ;
ast_cond_destroy ( & std . cond ) ;
return ;
return 0 ;
}
ast_mutex_lock ( & std . lock ) ;
@ -984,6 +1223,8 @@ static void dispatch_message(struct stasis_subscription *sub,
ast_mutex_destroy ( & std . lock ) ;
ast_cond_destroy ( & std . cond ) ;
}
return 1 ;
}
/*!
@ -997,12 +1238,41 @@ static void publish_msg(struct stasis_topic *topic,
struct stasis_message * message , struct stasis_subscription * sync_sub )
{
size_t i ;
unsigned int dispatched = 0 ;
# ifdef AST_DEVMODE
int message_type_id = stasis_message_type_id ( stasis_message_type ( message ) ) ;
struct stasis_message_type_statistics * statistics ;
struct timeval start ;
int elapsed ;
# endif
ast_assert ( topic ! = NULL ) ;
ast_assert ( message ! = NULL ) ;
# ifdef AST_DEVMODE
ast_mutex_lock ( & message_type_statistics_lock ) ;
if ( message_type_id > = AST_VECTOR_SIZE ( & message_type_statistics ) ) {
struct stasis_message_type_statistics new_statistics = {
. published = 0 ,
} ;
if ( AST_VECTOR_REPLACE ( & message_type_statistics , message_type_id , new_statistics ) ) {
ast_mutex_unlock ( & message_type_statistics_lock ) ;
return ;
}
}
statistics = AST_VECTOR_GET_ADDR ( & message_type_statistics , message_type_id ) ;
statistics - > message_type = stasis_message_type ( message ) ;
ast_mutex_unlock ( & message_type_statistics_lock ) ;
ast_atomic_fetchadd_int ( & statistics - > published , + 1 ) ;
# endif
/* If there are no subscribers don't bother */
if ( ! stasis_topic_subscribers ( topic ) ) {
# ifdef AST_DEVMODE
ast_atomic_fetchadd_int ( & statistics - > unused , + 1 ) ;
ast_atomic_fetchadd_int ( & topic - > statistics - > messages_not_dispatched , + 1 ) ;
# endif
return ;
}
@ -1011,15 +1281,35 @@ static void publish_msg(struct stasis_topic *topic,
* Make sure we hold onto a reference while dispatching .
*/
ao2_ref ( topic , + 1 ) ;
# ifdef AST_DEVMODE
start = ast_tvnow ( ) ;
# endif
ao2_lock ( topic ) ;
for ( i = 0 ; i < AST_VECTOR_SIZE ( & topic - > subscribers ) ; + + i ) {
struct stasis_subscription * sub = AST_VECTOR_GET ( & topic - > subscribers , i ) ;
ast_assert ( sub ! = NULL ) ;
dispatch_message ( sub , message , ( sub = = sync_sub ) ) ;
dispatched + = dispatch_message ( sub , message , ( sub = = sync_sub ) ) ;
}
ao2_unlock ( topic ) ;
# ifdef AST_DEVMODE
elapsed = ast_tvdiff_ms ( ast_tvnow ( ) , start ) ;
if ( elapsed > topic - > statistics - > highest_time_dispatched ) {
topic - > statistics - > highest_time_dispatched = elapsed ;
}
if ( elapsed < topic - > statistics - > lowest_time_dispatched ) {
topic - > statistics - > lowest_time_dispatched = elapsed ;
}
if ( dispatched ) {
ast_atomic_fetchadd_int ( & topic - > statistics - > messages_dispatched , + 1 ) ;
} else {
ast_atomic_fetchadd_int ( & statistics - > unused , + 1 ) ;
ast_atomic_fetchadd_int ( & topic - > statistics - > messages_not_dispatched , + 1 ) ;
}
# endif
ao2_ref ( topic , - 1 ) ;
}
@ -1805,9 +2095,458 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
/*! @} */
# ifdef AST_DEVMODE
/*!
* \ internal
* \ brief CLI command implementation for ' stasis statistics show subscriptions '
*/
static char * statistics_show_subscriptions ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct ao2_iterator iter ;
struct stasis_subscription_statistics * statistics ;
int count = 0 ;
int dropped = 0 ;
int passed = 0 ;
# define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
# define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
# define FMT_FIELDS2 "%-64s %10d %10d\n"
switch ( cmd ) {
case CLI_INIT :
e - > command = " stasis statistics show subscriptions " ;
e - > usage =
" Usage: stasis statistics show subscriptions \n "
" Shows a list of subscriptions and their general statistics \n " ;
return NULL ;
case CLI_GENERATE :
return NULL ;
}
if ( a - > argc ! = e - > args ) {
return CLI_SHOWUSAGE ;
}
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Subscription " , " Dropped " , " Passed " , " Lowest Invoke " , " Highest Invoke " ) ;
iter = ao2_iterator_init ( subscription_statistics , 0 ) ;
while ( ( statistics = ao2_iterator_next ( & iter ) ) ) {
ast_cli ( a - > fd , FMT_FIELDS , statistics - > uniqueid , statistics - > messages_dropped , statistics - > messages_passed ,
statistics - > lowest_time_invoked , statistics - > highest_time_invoked ) ;
dropped + = statistics - > messages_dropped ;
passed + = statistics - > messages_passed ;
ao2_ref ( statistics , - 1 ) ;
+ + count ;
}
ao2_iterator_destroy ( & iter ) ;
ast_cli ( a - > fd , FMT_FIELDS2 , " Total " , dropped , passed ) ;
ast_cli ( a - > fd , " \n %d subscriptions \n \n " , count ) ;
# undef FMT_HEADERS
# undef FMT_FIELDS
# undef FMT_FIELDS2
return CLI_SUCCESS ;
}
/*!
* \ internal
* \ brief CLI tab completion for subscription statistics names
*/
static char * subscription_statistics_complete_name ( const char * word , int state )
{
struct stasis_subscription_statistics * statistics ;
struct ao2_iterator it_statistics ;
int wordlen = strlen ( word ) ;
int which = 0 ;
char * result = NULL ;
it_statistics = ao2_iterator_init ( subscription_statistics , 0 ) ;
while ( ( statistics = ao2_iterator_next ( & it_statistics ) ) ) {
if ( ! strncasecmp ( word , statistics - > uniqueid , wordlen )
& & + + which > state ) {
result = ast_strdup ( statistics - > uniqueid ) ;
}
ao2_ref ( statistics , - 1 ) ;
if ( result ) {
break ;
}
}
ao2_iterator_destroy ( & it_statistics ) ;
return result ;
}
/*!
* \ internal
* \ brief CLI command implementation for ' stasis statistics show subscription '
*/
static char * statistics_show_subscription ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct stasis_subscription_statistics * statistics ;
switch ( cmd ) {
case CLI_INIT :
e - > command = " stasis statistics show subscription " ;
e - > usage =
" Usage: stasis statistics show subscription <uniqueid> \n "
" Show stasis subscription statistics. \n " ;
return NULL ;
case CLI_GENERATE :
if ( a - > pos = = 4 ) {
return subscription_statistics_complete_name ( a - > word , a - > n ) ;
} else {
return NULL ;
}
}
if ( a - > argc ! = 5 ) {
return CLI_SHOWUSAGE ;
}
statistics = ao2_find ( subscription_statistics , a - > argv [ 4 ] , OBJ_SEARCH_KEY ) ;
if ( ! statistics ) {
ast_cli ( a - > fd , " Specified subscription '%s' does not exist \n " , a - > argv [ 4 ] ) ;
return CLI_FAILURE ;
}
ast_cli ( a - > fd , " Subscription: %s \n " , statistics - > uniqueid ) ;
ast_cli ( a - > fd , " Topic: %s \n " , statistics - > topic ) ;
ast_cli ( a - > fd , " Source filename: %s \n " , S_OR ( statistics - > file , " <unavailable> " ) ) ;
ast_cli ( a - > fd , " Source line number: %d \n " , statistics - > lineno ) ;
ast_cli ( a - > fd , " Source function: %s \n " , S_OR ( statistics - > func , " <unavailable> " ) ) ;
ast_cli ( a - > fd , " Number of messages dropped due to filtering: %d \n " , statistics - > messages_dropped ) ;
ast_cli ( a - > fd , " Number of messages passed to subscriber callback: %d \n " , statistics - > messages_passed ) ;
ast_cli ( a - > fd , " Using mailbox to queue messages: %s \n " , statistics - > uses_mailbox ? " Yes " : " No " ) ;
ast_cli ( a - > fd , " Using stasis threadpool for handling messages: %s \n " , statistics - > uses_threadpool ? " Yes " : " No " ) ;
ast_cli ( a - > fd , " Lowest amount of time (in milliseconds) spent invoking message: %ld \n " , statistics - > lowest_time_invoked ) ;
ast_cli ( a - > fd , " Highest amount of time (in milliseconds) spent invoking message: %ld \n " , statistics - > highest_time_invoked ) ;
ao2_lock ( statistics ) ;
if ( statistics - > highest_time_message_type ) {
ast_cli ( a - > fd , " Offender message type for highest invoking time: %s \n " , stasis_message_type_name ( statistics - > highest_time_message_type ) ) ;
}
ao2_unlock ( statistics ) ;
ao2_ref ( statistics , - 1 ) ;
return CLI_SUCCESS ;
}
/*!
* \ internal
* \ brief CLI command implementation for ' stasis statistics show topics '
*/
static char * statistics_show_topics ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct ao2_iterator iter ;
struct stasis_topic_statistics * statistics ;
int count = 0 ;
int not_dispatched = 0 ;
int dispatched = 0 ;
# define FMT_HEADERS "%-64s %10s %10s %16s %16s\n"
# define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n"
# define FMT_FIELDS2 "%-64s %10d %10d\n"
switch ( cmd ) {
case CLI_INIT :
e - > command = " stasis statistics show topics " ;
e - > usage =
" Usage: stasis statistics show topics \n "
" Shows a list of topics and their general statistics \n " ;
return NULL ;
case CLI_GENERATE :
return NULL ;
}
if ( a - > argc ! = e - > args ) {
return CLI_SHOWUSAGE ;
}
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Topic " , " Dropped " , " Dispatched " , " Lowest Dispatch " , " Highest Dispatch " ) ;
iter = ao2_iterator_init ( topic_statistics , 0 ) ;
while ( ( statistics = ao2_iterator_next ( & iter ) ) ) {
ast_cli ( a - > fd , FMT_FIELDS , statistics - > name , statistics - > messages_not_dispatched , statistics - > messages_dispatched ,
statistics - > lowest_time_dispatched , statistics - > highest_time_dispatched ) ;
not_dispatched + = statistics - > messages_not_dispatched ;
dispatched + = statistics - > messages_dispatched ;
ao2_ref ( statistics , - 1 ) ;
+ + count ;
}
ao2_iterator_destroy ( & iter ) ;
ast_cli ( a - > fd , FMT_FIELDS2 , " Total " , not_dispatched , dispatched ) ;
ast_cli ( a - > fd , " \n %d topics \n \n " , count ) ;
# undef FMT_HEADERS
# undef FMT_FIELDS
# undef FMT_FIELDS2
return CLI_SUCCESS ;
}
/*!
* \ internal
* \ brief CLI tab completion for topic statistics names
*/
static char * topic_statistics_complete_name ( const char * word , int state )
{
struct stasis_topic_statistics * statistics ;
struct ao2_iterator it_statistics ;
int wordlen = strlen ( word ) ;
int which = 0 ;
char * result = NULL ;
it_statistics = ao2_iterator_init ( topic_statistics , 0 ) ;
while ( ( statistics = ao2_iterator_next ( & it_statistics ) ) ) {
if ( ! strncasecmp ( word , statistics - > name , wordlen )
& & + + which > state ) {
result = ast_strdup ( statistics - > name ) ;
}
ao2_ref ( statistics , - 1 ) ;
if ( result ) {
break ;
}
}
ao2_iterator_destroy ( & it_statistics ) ;
return result ;
}
/*!
* \ internal
* \ brief CLI command implementation for ' stasis statistics show topic '
*/
static char * statistics_show_topic ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct stasis_topic_statistics * statistics ;
switch ( cmd ) {
case CLI_INIT :
e - > command = " stasis statistics show topic " ;
e - > usage =
" Usage: stasis statistics show topic <name> \n "
" Show stasis topic statistics. \n " ;
return NULL ;
case CLI_GENERATE :
if ( a - > pos = = 4 ) {
return topic_statistics_complete_name ( a - > word , a - > n ) ;
} else {
return NULL ;
}
}
if ( a - > argc ! = 5 ) {
return CLI_SHOWUSAGE ;
}
statistics = ao2_find ( topic_statistics , a - > argv [ 4 ] , OBJ_SEARCH_KEY ) ;
if ( ! statistics ) {
ast_cli ( a - > fd , " Specified topic '%s' does not exist \n " , a - > argv [ 4 ] ) ;
return CLI_FAILURE ;
}
ast_cli ( a - > fd , " Topic: %s \n " , statistics - > name ) ;
ast_cli ( a - > fd , " Number of messages published that went to no subscriber: %d \n " , statistics - > messages_not_dispatched ) ;
ast_cli ( a - > fd , " Number of messages that went to at least one subscriber: %d \n " , statistics - > messages_dispatched ) ;
ast_cli ( a - > fd , " Lowest amount of time (in milliseconds) spent dispatching message: %ld \n " , statistics - > lowest_time_dispatched ) ;
ast_cli ( a - > fd , " Highest amount of time (in milliseconds) spent dispatching messages: %ld \n " , statistics - > highest_time_dispatched ) ;
ast_cli ( a - > fd , " Number of subscribers: %d \n " , statistics - > subscriber_count ) ;
ao2_ref ( statistics , - 1 ) ;
return CLI_SUCCESS ;
}
/*!
* \ internal
* \ brief CLI command implementation for ' stasis statistics show messages '
*/
static char * statistics_show_messages ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
int i ;
int count = 0 ;
int published = 0 ;
int unused = 0 ;
# define FMT_HEADERS "%-64s %10s %10s\n"
# define FMT_FIELDS "%-64s %10d %10d\n"
switch ( cmd ) {
case CLI_INIT :
e - > command = " stasis statistics show messages " ;
e - > usage =
" Usage: stasis statistics show messages \n "
" Shows a list of message types and their general statistics \n " ;
return NULL ;
case CLI_GENERATE :
return NULL ;
}
if ( a - > argc ! = e - > args ) {
return CLI_SHOWUSAGE ;
}
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Message Type " , " Published " , " Unused " ) ;
ast_mutex_lock ( & message_type_statistics_lock ) ;
for ( i = 0 ; i < AST_VECTOR_SIZE ( & message_type_statistics ) ; + + i ) {
struct stasis_message_type_statistics * statistics = AST_VECTOR_GET_ADDR ( & message_type_statistics , i ) ;
if ( ! statistics - > message_type ) {
continue ;
}
ast_cli ( a - > fd , FMT_FIELDS , stasis_message_type_name ( statistics - > message_type ) , statistics - > published ,
statistics - > unused ) ;
published + = statistics - > published ;
unused + = statistics - > unused ;
+ + count ;
}
ast_mutex_unlock ( & message_type_statistics_lock ) ;
ast_cli ( a - > fd , FMT_FIELDS , " Total " , published , unused ) ;
ast_cli ( a - > fd , " \n %d seen message types \n \n " , count ) ;
# undef FMT_HEADERS
# undef FMT_FIELDS
return CLI_SUCCESS ;
}
static struct ast_cli_entry cli_stasis_statistics [ ] = {
AST_CLI_DEFINE ( statistics_show_subscriptions , " Show subscriptions with general statistics " ) ,
AST_CLI_DEFINE ( statistics_show_subscription , " Show subscription statistics " ) ,
AST_CLI_DEFINE ( statistics_show_topics , " Show topics with general statistics " ) ,
AST_CLI_DEFINE ( statistics_show_topic , " Show topic statistics " ) ,
AST_CLI_DEFINE ( statistics_show_messages , " Show message types with general statistics " ) ,
} ;
static int subscription_statistics_hash ( const void * obj , const int flags )
{
const struct stasis_subscription_statistics * object ;
const char * key ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_KEY :
key = obj ;
break ;
case OBJ_SEARCH_OBJECT :
object = obj ;
key = object - > uniqueid ;
break ;
default :
/* Hash can only work on something with a full key. */
ast_assert ( 0 ) ;
return 0 ;
}
return ast_str_case_hash ( key ) ;
}
static int subscription_statistics_cmp ( void * obj , void * arg , int flags )
{
const struct stasis_subscription_statistics * object_left = obj ;
const struct stasis_subscription_statistics * object_right = arg ;
const char * right_key = arg ;
int cmp ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_OBJECT :
right_key = object_right - > uniqueid ;
/* Fall through */
case OBJ_SEARCH_KEY :
cmp = strcasecmp ( object_left - > uniqueid , right_key ) ;
break ;
case OBJ_SEARCH_PARTIAL_KEY :
/* Not supported by container */
ast_assert ( 0 ) ;
cmp = - 1 ;
break ;
default :
/*
* What arg points to is specific to this traversal callback
* and has no special meaning to astobj2 .
*/
cmp = 0 ;
break ;
}
if ( cmp ) {
return 0 ;
}
/*
* At this point the traversal callback is identical to a sorted
* container .
*/
return CMP_MATCH ;
}
static int topic_statistics_hash ( const void * obj , const int flags )
{
const struct stasis_topic_statistics * object ;
const char * key ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_KEY :
key = obj ;
break ;
case OBJ_SEARCH_OBJECT :
object = obj ;
key = object - > name ;
break ;
default :
/* Hash can only work on something with a full key. */
ast_assert ( 0 ) ;
return 0 ;
}
return ast_str_case_hash ( key ) ;
}
static int topic_statistics_cmp ( void * obj , void * arg , int flags )
{
const struct stasis_topic_statistics * object_left = obj ;
const struct stasis_topic_statistics * object_right = arg ;
const char * right_key = arg ;
int cmp ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_OBJECT :
right_key = object_right - > name ;
/* Fall through */
case OBJ_SEARCH_KEY :
cmp = strcasecmp ( object_left - > name , right_key ) ;
break ;
case OBJ_SEARCH_PARTIAL_KEY :
/* Not supported by container */
ast_assert ( 0 ) ;
cmp = - 1 ;
break ;
default :
/*
* What arg points to is specific to this traversal callback
* and has no special meaning to astobj2 .
*/
cmp = 0 ;
break ;
}
if ( cmp ) {
return 0 ;
}
/*
* At this point the traversal callback is identical to a sorted
* container .
*/
return CMP_MATCH ;
}
# endif
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup ( void )
{
# ifdef AST_DEVMODE
ast_cli_unregister_multiple ( cli_stasis_statistics , ARRAY_LEN ( cli_stasis_statistics ) ) ;
AST_VECTOR_FREE ( & message_type_statistics ) ;
ao2_cleanup ( subscription_statistics ) ;
ao2_cleanup ( topic_statistics ) ;
# endif
ast_threadpool_shutdown ( pool ) ;
pool = NULL ;
STASIS_MESSAGE_TYPE_CLEANUP ( stasis_subscription_change_type ) ;
@ -1902,5 +2641,28 @@ int stasis_init(void)
return - 1 ;
}
# ifdef AST_DEVMODE
/* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
* topic or subscripton .
*/
subscription_statistics = ao2_container_alloc_hash ( AO2_ALLOC_OPT_LOCK_MUTEX , 0 , SUBSCRIPTION_STATISTICS_BUCKETS ,
subscription_statistics_hash , 0 , subscription_statistics_cmp ) ;
if ( ! subscription_statistics ) {
return - 1 ;
}
topic_statistics = ao2_container_alloc_hash ( AO2_ALLOC_OPT_LOCK_MUTEX , 0 , TOPIC_STATISTICS_BUCKETS ,
topic_statistics_hash , 0 , topic_statistics_cmp ) ;
if ( ! topic_statistics ) {
return - 1 ;
}
AST_VECTOR_INIT ( & message_type_statistics , 0 ) ;
if ( ast_cli_register_multiple ( cli_stasis_statistics , ARRAY_LEN ( cli_stasis_statistics ) ) ) {
return - 1 ;
}
# endif
return 0 ;
}