@ -349,6 +349,8 @@ struct stasis_topic_statistics {
int messages_dispatched ;
/*! \brief The ids of the subscribers to this topic */
struct ao2_container * subscribers ;
/*! \brief Pointer to the topic (NOT refcounted, and must NOT be accessed) */
struct stasis_topic * topic ;
/*! \brief Name of the topic */
char name [ 0 ] ;
} ;
@ -366,6 +368,9 @@ struct stasis_topic {
struct stasis_topic_statistics * statistics ;
# endif
/*! Unique incrementing integer for subscriber ids */
int subscriber_id ;
/*! Name of the topic */
char name [ 0 ] ;
} ;
@ -412,11 +417,11 @@ static void topic_statistics_destroy(void *obj)
ao2_cleanup ( statistics - > subscribers ) ;
}
static struct stasis_topic_statistics * stasis_topic_statistics_create ( const char * name )
static struct stasis_topic_statistics * stasis_topic_statistics_create ( struct stasis_topic * topic )
{
struct stasis_topic_statistics * statistics ;
statistics = ao2_alloc ( sizeof ( * statistics ) + strlen ( name) + 1 , topic_statistics_destroy ) ;
statistics = ao2_alloc ( sizeof ( * statistics ) + strlen ( topic- > name) + 1 , topic_statistics_destroy ) ;
if ( ! statistics ) {
return NULL ;
}
@ -427,7 +432,9 @@ static struct stasis_topic_statistics *stasis_topic_statistics_create(const char
return NULL ;
}
strcpy ( statistics - > name , name ) ; /* SAFE */
/* This is strictly used for the pointer address when showing the topic */
statistics - > topic = topic ;
strcpy ( statistics - > name , topic - > name ) ; /* SAFE */
ao2_link ( topic_statistics , statistics ) ;
return statistics ;
@ -448,7 +455,7 @@ struct stasis_topic *stasis_topic_create(const char *name)
res | = AST_VECTOR_INIT ( & topic - > subscribers , INITIAL_SUBSCRIBERS_MAX ) ;
res | = AST_VECTOR_INIT ( & topic - > upstream_topics , 0 ) ;
# ifdef AST_DEVMODE
topic - > statistics = stasis_topic_statistics_create ( name ) ;
topic - > statistics = stasis_topic_statistics_create ( topic ) ;
if ( ! topic - > name | | ! topic - > statistics | | res )
# else
if ( ! topic - > name | | res )
@ -477,8 +484,8 @@ struct stasis_subscription_statistics {
const char * file ;
/*! \brief The function where the subscription originates */
const char * func ;
/*! \brief Name of the topic we subscribed to */
char * topic ;
/*! \brief Name s of the topics we ar e subscribed to */
struct ao2_container * topics ;
/*! \brief The message type that currently took the longest to process */
struct stasis_message_type * highest_time_message_type ;
/*! \brief Highest time spent invoking a message */
@ -495,6 +502,8 @@ struct stasis_subscription_statistics {
int uses_threadpool ;
/*! \brief The line number where the subscription originates */
int lineno ;
/*! \brief Pointer to the subscription (NOT refcounted, and must NOT be accessed) */
struct stasis_subscription * sub ;
/*! \brief Unique ID of the subscription */
char uniqueid [ 0 ] ;
} ;
@ -503,7 +512,7 @@ struct stasis_subscription_statistics {
/*! \internal */
struct stasis_subscription {
/*! Unique ID for this subscription */
char uniqueid [ AST_UUID_STR_LEN ] ;
char * uniqueid ;
/*! Topic subscribed to. */
struct stasis_topic * topic ;
/*! Mailbox for processing incoming messages. */
@ -546,6 +555,7 @@ static void subscription_dtor(void *obj)
* be bad . */
ast_assert ( stasis_subscription_is_done ( sub ) ) ;
ast_free ( sub - > uniqueid ) ;
ao2_cleanup ( sub - > topic ) ;
sub - > topic = NULL ;
ast_taskprocessor_unreference ( sub - > mailbox ) ;
@ -628,26 +638,37 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st
}
# ifdef AST_DEVMODE
static struct stasis_subscription_statistics * stasis_subscription_statistics_create ( const char * uniqueid ,
const char * topic , int needs_mailbox , int use_thread_pool , const char * file , int lineno ,
static void subscription_statistics_destroy ( void * obj )
{
struct stasis_subscription_statistics * statistics = obj ;
ao2_cleanup ( statistics - > topics ) ;
}
static struct stasis_subscription_statistics * stasis_subscription_statistics_create ( struct stasis_subscription * sub ,
int needs_mailbox , int use_thread_pool , const char * file , int lineno ,
const char * func )
{
struct stasis_subscription_statistics * statistics ;
size_t uniqueid_len = strlen ( uniqueid ) + 1 ;
statistics = ao2_alloc ( sizeof ( * statistics ) + uniqueid_len + strlen ( topic ) + 1 , NULL ) ;
statistics = ao2_alloc ( sizeof ( * statistics ) + strlen ( sub - > uniqueid ) + 1 , subscription_statistics_destroy ) ;
if ( ! statistics ) {
return NULL ;
}
statistics - > topics = ast_str_container_alloc ( 1 ) ;
if ( ! statistics - > topics ) {
ao2_ref ( statistics , - 1 ) ;
return NULL ;
}
statistics - > file = file ;
statistics - > lineno = lineno ;
statistics - > func = func ;
statistics - > uses_mailbox = needs_mailbox ;
statistics - > uses_threadpool = use_thread_pool ;
strcpy ( statistics - > uniqueid , uniqueid ) ; /* SAFE */
statistics - > topic = statistics - > uniqueid + uniqueid_len ;
strcpy ( statistics - > topic , topic ) ; /* SAFE */
strcpy ( statistics - > uniqueid , sub - > uniqueid ) ; /* SAFE */
statistics - > sub = sub ;
ao2_link ( subscription_statistics , statistics ) ;
return statistics ;
@ -665,6 +686,7 @@ struct stasis_subscription *internal_stasis_subscribe(
const char * func )
{
struct stasis_subscription * sub ;
int ret ;
if ( ! topic ) {
return NULL ;
@ -675,12 +697,17 @@ struct stasis_subscription *internal_stasis_subscribe(
if ( ! sub ) {
return NULL ;
}
ast_uuid_generate_str ( sub - > uniqueid , sizeof ( sub - > uniqueid ) ) ;
# ifdef AST_DEVMODE
sub - > statistics = stasis_subscription_statistics_create ( sub - > uniqueid , topic - > name , needs_mailbox ,
use_thread_pool , file , lineno , func ) ;
if ( ! sub - > statistics ) {
ret = ast_asprintf ( & sub - > uniqueid , " %s:%s-%d " , file , stasis_topic_name ( topic ) , ast_atomic_fetchadd_int ( & topic - > subscriber_id , + 1 ) ) ;
sub - > statistics = stasis_subscription_statistics_create ( sub , needs_mailbox , use_thread_pool , file , lineno , func ) ;
if ( ret < 0 | | ! sub - > statistics ) {
ao2_ref ( sub , - 1 ) ;
return NULL ;
}
# else
ret = ast_asprintf ( & sub - > uniqueid , " %s-%d " , stasis_topic_name ( topic ) , ast_atomic_fetchadd_int ( & topic - > subscriber_id , + 1 ) ) ;
if ( ret < 0 ) {
ao2_ref ( sub , - 1 ) ;
return NULL ;
}
@ -1012,6 +1039,7 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
# ifdef AST_DEVMODE
ast_str_container_add ( topic - > statistics - > subscribers , stasis_subscription_uniqueid ( sub ) ) ;
ast_str_container_add ( sub - > statistics - > topics , stasis_topic_name ( topic ) ) ;
# endif
ao2_unlock ( topic ) ;
@ -1035,6 +1063,7 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
# ifdef AST_DEVMODE
if ( ! res ) {
ast_str_container_remove ( topic - > statistics - > subscribers , stasis_subscription_uniqueid ( sub ) ) ;
ast_str_container_remove ( sub - > statistics - > topics , stasis_topic_name ( topic ) ) ;
}
# endif
@ -1498,6 +1527,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
struct topic_pool_entry {
struct stasis_forward * forward ;
struct stasis_topic * topic ;
char name [ 0 ] ;
} ;
static void topic_pool_entry_dtor ( void * obj )
@ -1509,10 +1539,19 @@ static void topic_pool_entry_dtor(void *obj)
entry - > topic = NULL ;
}
static struct topic_pool_entry * topic_pool_entry_alloc ( void )
static struct topic_pool_entry * topic_pool_entry_alloc ( const char * topic_name )
{
return ao2_alloc_options ( sizeof ( struct topic_pool_entry ) , topic_pool_entry_dtor ,
AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
struct topic_pool_entry * topic_pool_entry ;
topic_pool_entry = ao2_alloc_options ( sizeof ( * topic_pool_entry ) + strlen ( topic_name ) + 1 ,
topic_pool_entry_dtor , AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
if ( ! topic_pool_entry ) {
return NULL ;
}
strcpy ( topic_pool_entry - > name , topic_name ) ; /* Safe */
return topic_pool_entry ;
}
struct stasis_topic_pool {
@ -1550,7 +1589,7 @@ static int topic_pool_entry_hash(const void *obj, const int flags)
break ;
case OBJ_SEARCH_OBJECT :
object = obj ;
key = stasis_topic_name ( object - > topic ) ;
key = object - > name ;
break ;
default :
/* Hash can only work on something with a full key. */
@ -1569,10 +1608,10 @@ static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_OBJECT :
right_key = stasis_topic_name ( object_right - > topic ) ;
right_key = object_right - > name ;
/* Fall through */
case OBJ_SEARCH_KEY :
cmp = strcasecmp ( stasis_topic_name ( object_left - > topic ) , right_key ) ;
cmp = strcasecmp ( object_left - > name , right_key ) ;
break ;
case OBJ_SEARCH_PARTIAL_KEY :
/* Not supported by container */
@ -1649,18 +1688,29 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
{
RAII_VAR ( struct topic_pool_entry * , topic_pool_entry , NULL , ao2_cleanup ) ;
SCOPED_AO2LOCK ( topic_container_lock , pool - > pool_container ) ;
char * new_topic_name ;
int ret ;
topic_pool_entry = ao2_find ( pool - > pool_container , topic_name , OBJ_SEARCH_KEY | OBJ_NOLOCK ) ;
if ( topic_pool_entry ) {
return topic_pool_entry - > topic ;
}
topic_pool_entry = topic_pool_entry_alloc ( ) ;
topic_pool_entry = topic_pool_entry_alloc ( topic_name ) ;
if ( ! topic_pool_entry ) {
return NULL ;
}
topic_pool_entry - > topic = stasis_topic_create ( topic_name ) ;
/* To provide further detail and to ensure that the topic is unique within the scope of the
* system we prefix it with the pooling topic name , which should itself already be unique .
*/
ret = ast_asprintf ( & new_topic_name , " %s/%s " , stasis_topic_name ( pool - > pool_topic ) , topic_name ) ;
if ( ret < 0 ) {
return NULL ;
}
topic_pool_entry - > topic = stasis_topic_create ( new_topic_name ) ;
ast_free ( new_topic_name ) ;
if ( ! topic_pool_entry - > topic ) {
return NULL ;
}
@ -2082,12 +2132,15 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
# ifdef AST_DEVMODE
AO2_STRING_FIELD_SORT_FN ( stasis_subscription_statistics , uniqueid ) ;
/*!
* \ internal
* \ brief CLI command implementation for ' stasis statistics show subscriptions '
*/
static char * statistics_show_subscriptions ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct ao2_container * sorted_subscriptions ;
struct ao2_iterator iter ;
struct stasis_subscription_statistics * statistics ;
int count = 0 ;
@ -2112,9 +2165,22 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str
return CLI_SHOWUSAGE ;
}
sorted_subscriptions = ao2_container_alloc_rbtree ( AO2_ALLOC_OPT_LOCK_NOLOCK , 0 ,
stasis_subscription_statistics_sort_fn , NULL ) ;
if ( ! sorted_subscriptions ) {
ast_cli ( a - > fd , " Could not create container for sorting subscription statistics \n " ) ;
return CLI_SUCCESS ;
}
if ( ao2_container_dup ( sorted_subscriptions , subscription_statistics , 0 ) ) {
ao2_ref ( sorted_subscriptions , - 1 ) ;
ast_cli ( a - > fd , " Could not sort subscription statistics \n " ) ;
return CLI_SUCCESS ;
}
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Subscription " , " Dropped " , " Passed " , " Lowest Invoke " , " Highest Invoke " ) ;
iter = ao2_iterator_init ( subscription_statistics , 0 ) ;
iter = ao2_iterator_init ( s orted_s ubscriptions, 0 ) ;
while ( ( statistics = ao2_iterator_next ( & iter ) ) ) {
ast_cli ( a - > fd , FMT_FIELDS , statistics - > uniqueid , statistics - > messages_dropped , statistics - > messages_passed ,
statistics - > lowest_time_invoked , statistics - > highest_time_invoked ) ;
@ -2125,6 +2191,8 @@ static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, str
}
ao2_iterator_destroy ( & iter ) ;
ao2_ref ( sorted_subscriptions , - 1 ) ;
ast_cli ( a - > fd , FMT_FIELDS2 , " Total " , dropped , passed ) ;
ast_cli ( a - > fd , " \n %d subscriptions \n \n " , count ) ;
@ -2169,6 +2237,8 @@ static char *subscription_statistics_complete_name(const char *word, int state)
static char * statistics_show_subscription ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct stasis_subscription_statistics * statistics ;
struct ao2_iterator i ;
char * name ;
switch ( cmd ) {
case CLI_INIT :
@ -2196,7 +2266,7 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
}
ast_cli ( a - > fd , " Subscription: %s \n " , statistics - > uniqueid ) ;
ast_cli ( a - > fd , " Topic: %s\n " , statistics - > topic ) ;
ast_cli ( a - > fd , " Pointer Address: %p\n " , statistics - > sub ) ;
ast_cli ( a - > fd , " Source filename: %s \n " , S_OR ( statistics - > file , " <unavailable> " ) ) ;
ast_cli ( a - > fd , " Source line number: %d \n " , statistics - > lineno ) ;
ast_cli ( a - > fd , " Source function: %s \n " , S_OR ( statistics - > func , " <unavailable> " ) ) ;
@ -2213,25 +2283,38 @@ static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, stru
}
ao2_unlock ( statistics ) ;
ast_cli ( a - > fd , " Number of topics: %d \n " , ao2_container_count ( statistics - > topics ) ) ;
ast_cli ( a - > fd , " Subscribed topics: \n " ) ;
i = ao2_iterator_init ( statistics - > topics , 0 ) ;
while ( ( name = ao2_iterator_next ( & i ) ) ) {
ast_cli ( a - > fd , " \t %s \n " , name ) ;
ao2_ref ( name , - 1 ) ;
}
ao2_iterator_destroy ( & i ) ;
ao2_ref ( statistics , - 1 ) ;
return CLI_SUCCESS ;
}
AO2_STRING_FIELD_SORT_FN ( stasis_topic_statistics , name ) ;
/*!
* \ internal
* \ brief CLI command implementation for ' stasis statistics show topics '
*/
static char * statistics_show_topics ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct ao2_container * sorted_topics ;
struct ao2_iterator iter ;
struct stasis_topic_statistics * statistics ;
int count = 0 ;
int not_dispatched = 0 ;
int dispatched = 0 ;
# define FMT_HEADERS "%-64s %10s %10s %1 6s %16s\n"
# define FMT_FIELDS "%-64s %10d %10d %1 6ld %16ld\n"
# define FMT_FIELDS2 "%-64s %10 d %10d\n"
# define FMT_HEADERS "%-64s %10s %10s %1 0s %1 6s %16s\n"
# define FMT_FIELDS "%-64s %10d %10d %1 0d %1 6ld %16ld\n"
# define FMT_FIELDS2 "%-64s %10 s %10 d %10d\n"
switch ( cmd ) {
case CLI_INIT :
@ -2248,11 +2331,25 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast
return CLI_SHOWUSAGE ;
}
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Topic " , " Dropped " , " Dispatched " , " Lowest Dispatch " , " Highest Dispatch " ) ;
sorted_topics = ao2_container_alloc_rbtree ( AO2_ALLOC_OPT_LOCK_NOLOCK , 0 ,
stasis_topic_statistics_sort_fn , NULL ) ;
if ( ! sorted_topics ) {
ast_cli ( a - > fd , " Could not create container for sorting topic statistics \n " ) ;
return CLI_SUCCESS ;
}
if ( ao2_container_dup ( sorted_topics , topic_statistics , 0 ) ) {
ao2_ref ( sorted_topics , - 1 ) ;
ast_cli ( a - > fd , " Could not sort topic statistics \n " ) ;
return CLI_SUCCESS ;
}
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Topic " , " Subscribers " , " Dropped " , " Dispatched " , " Lowest Dispatch " , " Highest Dispatch " ) ;
iter = ao2_iterator_init ( topic_statistics , 0 ) ;
iter = ao2_iterator_init ( sorted_ topics, 0 ) ;
while ( ( statistics = ao2_iterator_next ( & iter ) ) ) {
ast_cli ( a - > fd , FMT_FIELDS , statistics - > name , statistics - > messages_not_dispatched , statistics - > messages_dispatched ,
ast_cli ( a - > fd , FMT_FIELDS , statistics - > name , ao2_container_count ( statistics - > subscribers ) ,
statistics - > messages_not_dispatched , statistics - > messages_dispatched ,
statistics - > lowest_time_dispatched , statistics - > highest_time_dispatched ) ;
not_dispatched + = statistics - > messages_not_dispatched ;
dispatched + = statistics - > messages_dispatched ;
@ -2261,7 +2358,9 @@ static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast
}
ao2_iterator_destroy ( & iter ) ;
ast_cli ( a - > fd , FMT_FIELDS2 , " Total " , not_dispatched , dispatched ) ;
ao2_ref ( sorted_topics , - 1 ) ;
ast_cli ( a - > fd , FMT_FIELDS2 , " Total " , " " , not_dispatched , dispatched ) ;
ast_cli ( a - > fd , " \n %d topics \n \n " , count ) ;
# undef FMT_HEADERS
@ -2334,6 +2433,7 @@ static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_
}
ast_cli ( a - > fd , " Topic: %s \n " , statistics - > name ) ;
ast_cli ( a - > fd , " Pointer Address: %p \n " , statistics - > topic ) ;
ast_cli ( a - > fd , " Number of messages published that went to no subscriber: %d \n " , statistics - > messages_not_dispatched ) ;
ast_cli ( a - > fd , " Number of messages that went to at least one subscriber: %d \n " , statistics - > messages_dispatched ) ;
ast_cli ( a - > fd , " Lowest amount of time (in milliseconds) spent dispatching message: %ld \n " , statistics - > lowest_time_dispatched ) ;