@ -41,9 +41,7 @@
# include "asterisk/stasis_bridges.h"
# include "asterisk/stasis_endpoints.h"
# include "asterisk/config_options.h"
# ifdef AST_DEVMODE
# include "asterisk/cli.h"
# endif
/*** DOCUMENTATION
< managerEvent language = " en_US " name = " UserEvent " >
@ -307,6 +305,16 @@ static struct ast_threadpool *pool;
STASIS_MESSAGE_TYPE_DEFN ( stasis_subscription_change_type ) ;
# if defined(LOW_MEMORY)
# define TOPIC_ALL_BUCKETS 257
# else
# define TOPIC_ALL_BUCKETS 997
# endif
# ifdef AST_DEVMODE
/*! The number of buckets to use for topic statistics */
@ -372,9 +380,37 @@ struct stasis_topic {
int subscriber_id ;
/*! Name of the topic */
char name [ 0 ] ;
char * name ;
/*! Detail of the topic */
char * detail ;
/*! Creation time */
struct timeval * creationtime ;
} ;
struct ao2_container * topic_all ;
struct topic_proxy {
AO2_WEAKPROXY ( ) ;
char * name ;
char * detail ;
struct timeval creationtime ;
char buf [ 0 ] ;
} ;
AO2_STRING_FIELD_HASH_FN ( topic_proxy , name ) ;
AO2_STRING_FIELD_CMP_FN ( topic_proxy , name ) ;
AO2_STRING_FIELD_CASE_SORT_FN ( topic_proxy , name ) ;
static void proxy_dtor ( void * weakproxy , void * data )
{
ao2_unlink ( topic_all , weakproxy ) ;
}
/* Forward declarations for the tightly-coupled subscription object */
static int topic_add_subscription ( struct stasis_topic * topic ,
struct stasis_subscription * sub ) ;
@ -394,6 +430,9 @@ static void topic_dtor(void *obj)
{
struct stasis_topic * topic = obj ;
ast_debug ( 2 , " Destroying topic. name: %s, detail: %s \n " ,
topic - > name , topic - > detail ) ;
/* Subscribers hold a reference to topics, so they should all be
* unsubscribed before we get here . */
ast_assert ( AST_VECTOR_SIZE ( & topic - > subscribers ) = = 0 ) ;
@ -442,40 +481,145 @@ static struct stasis_topic_statistics *stasis_topic_statistics_create(struct sta
}
# endif
struct stasis_topic * stasis_topic_create ( const char * name )
static int link_topic_proxy ( struct stasis_topic * topic , const char * name , const char * detail )
{
struct topic_proxy * proxy ;
struct stasis_topic * topic_tmp ;
if ( ! topic | | ! name | | ! strlen ( name ) | | ! detail ) {
return - 1 ;
}
ao2_wrlock ( topic_all ) ;
topic_tmp = stasis_topic_get ( name ) ;
if ( topic_tmp ) {
ast_log ( LOG_ERROR , " The same topic is already exist. name: %s \n " , name ) ;
ao2_ref ( topic_tmp , - 1 ) ;
ao2_unlock ( topic_all ) ;
return - 1 ;
}
proxy = ao2_t_weakproxy_alloc (
sizeof ( * proxy ) + strlen ( name ) + 1 + strlen ( detail ) + 1 , NULL , topic - > name ) ;
if ( ! proxy ) {
ao2_unlock ( topic_all ) ;
return - 1 ;
}
/* set the proxy info */
proxy - > name = proxy - > buf ;
proxy - > detail = proxy - > name + strlen ( name ) + 1 ;
strcpy ( proxy - > name , name ) ; /* SAFE */
strcpy ( proxy - > detail , detail ) ; /* SAFE */
proxy - > creationtime = ast_tvnow ( ) ;
/* We have exclusive access to proxy, no need for locking here. */
if ( ao2_t_weakproxy_set_object ( proxy , topic , OBJ_NOLOCK , " weakproxy link " ) ) {
ao2_cleanup ( proxy ) ;
ao2_unlock ( topic_all ) ;
return - 1 ;
}
if ( ao2_weakproxy_subscribe ( proxy , proxy_dtor , NULL , OBJ_NOLOCK ) ) {
ao2_cleanup ( proxy ) ;
ao2_unlock ( topic_all ) ;
return - 1 ;
}
/* setting the topic point to the proxy */
topic - > name = proxy - > name ;
topic - > detail = proxy - > detail ;
topic - > creationtime = & ( proxy - > creationtime ) ;
ao2_link_flags ( topic_all , proxy , OBJ_NOLOCK ) ;
ao2_ref ( proxy , - 1 ) ;
ao2_unlock ( topic_all ) ;
return 0 ;
}
struct stasis_topic * stasis_topic_create_with_detail (
const char * name , const char * detail
)
{
struct stasis_topic * topic ;
int res = 0 ;
topic = ao2_t_alloc ( sizeof ( * topic ) + strlen ( name ) + 1 , topic_dtor , name ) ;
if ( ! name | | ! strlen ( name ) | | ! detail ) {
return NULL ;
}
ast_debug ( 2 , " Creating topic. name: %s, detail: %s \n " , name , detail ) ;
topic = stasis_topic_get ( name ) ;
if ( topic ) {
ast_debug ( 2 , " Topic is already exist. name: %s, detail: %s \n " ,
name , detail ) ;
return topic ;
}
topic = ao2_t_alloc ( sizeof ( * topic ) , topic_dtor , name ) ;
if ( ! topic ) {
return NULL ;
}
strcpy ( topic - > name , name ) ; /* SAFE */
res | = AST_VECTOR_INIT ( & topic - > subscribers , INITIAL_SUBSCRIBERS_MAX ) ;
res | = AST_VECTOR_INIT ( & topic - > upstream_topics , 0 ) ;
ast_debug ( 1 , " Topic '%s': %p created \n " , topic - > name , topic ) ;
if ( res ) {
ao2_ref ( topic , - 1 ) ;
return NULL ;
}
/* link to the proxy */
if ( link_topic_proxy ( topic , name , detail ) ) {
ao2_ref ( topic , - 1 ) ;
return NULL ;
}
# ifdef AST_DEVMODE
topic - > statistics = stasis_topic_statistics_create ( topic ) ;
if ( ! topic - > name | | ! topic - > statistics | | res )
# else
if ( ! topic - > name | | res )
# endif
{
if ( ! topic - > statistics ) {
ao2_ref ( topic , - 1 ) ;
return NULL ;
}
# endif
ast_debug ( 1 , " Topic '%s': %p created \n " , topic - > name , topic ) ;
return topic ;
}
struct stasis_topic * stasis_topic_create ( const char * name )
{
return stasis_topic_create_with_detail ( name , " " ) ;
}
struct stasis_topic * stasis_topic_get ( const char * name )
{
return ao2_weakproxy_find ( topic_all , name , OBJ_SEARCH_KEY , " " ) ;
}
const char * stasis_topic_name ( const struct stasis_topic * topic )
{
if ( ! topic ) {
return NULL ;
}
return topic - > name ;
}
const char * stasis_topic_detail ( const struct stasis_topic * topic )
{
if ( ! topic ) {
return NULL ;
}
return topic - > detail ;
}
size_t stasis_topic_subscribers ( const struct stasis_topic * topic )
{
return AST_VECTOR_SIZE ( & topic - > subscribers ) ;
@ -2134,6 +2278,142 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
/*! @} */
/*!
* \ internal
* \ brief CLI command implementation for ' stasis show topics '
*/
static char * stasis_show_topics ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct ao2_iterator iter ;
struct topic_proxy * topic ;
struct ao2_container * tmp_container ;
int count = 0 ;
# define FMT_HEADERS "%-64s %-64s\n"
# define FMT_FIELDS "%-64s %-64s\n"
switch ( cmd ) {
case CLI_INIT :
e - > command = " stasis show topics " ;
e - > usage =
" Usage: stasis show topics \n "
" Shows a list of topics \n " ;
return NULL ;
case CLI_GENERATE :
return NULL ;
}
if ( a - > argc ! = e - > args ) {
return CLI_SHOWUSAGE ;
}
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Name " , " Detail " ) ;
tmp_container = ao2_container_alloc_list ( AO2_ALLOC_OPT_LOCK_NOLOCK , 0 ,
topic_proxy_sort_fn , NULL ) ;
if ( ! tmp_container | | ao2_container_dup ( tmp_container , topic_all , OBJ_SEARCH_OBJECT ) ) {
ao2_cleanup ( tmp_container ) ;
return NULL ;
}
/* getting all topic in order */
iter = ao2_iterator_init ( tmp_container , AO2_ITERATOR_UNLINK ) ;
while ( ( topic = ao2_iterator_next ( & iter ) ) ) {
ast_cli ( a - > fd , FMT_FIELDS , topic - > name , topic - > detail ) ;
ao2_ref ( topic , - 1 ) ;
+ + count ;
}
ao2_iterator_destroy ( & iter ) ;
ao2_cleanup ( tmp_container ) ;
ast_cli ( a - > fd , " \n %d Total topics \n \n " , count ) ;
# undef FMT_HEADERS
# undef FMT_FIELDS
return CLI_SUCCESS ;
}
/*!
* \ internal
* \ brief CLI tab completion for topic names
*/
static char * topic_complete_name ( const char * word )
{
struct topic_proxy * topic ;
struct ao2_iterator it ;
int wordlen = strlen ( word ) ;
int ret ;
it = ao2_iterator_init ( topic_all , 0 ) ;
while ( ( topic = ao2_iterator_next ( & it ) ) ) {
if ( ! strncasecmp ( word , topic - > name , wordlen ) ) {
ret = ast_cli_completion_add ( ast_strdup ( topic - > name ) ) ;
if ( ret ) {
ao2_ref ( topic , - 1 ) ;
break ;
}
}
ao2_ref ( topic , - 1 ) ;
}
ao2_iterator_destroy ( & it ) ;
return NULL ;
}
/*!
* \ internal
* \ brief CLI command implementation for ' stasis show topic '
*/
static char * stasis_show_topic ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a )
{
struct stasis_topic * topic ;
char print_time [ 32 ] ;
switch ( cmd ) {
case CLI_INIT :
e - > command = " stasis show topic " ;
e - > usage =
" Usage: stasis show topic <name> \n "
" Show stasis topic detail info. \n " ;
return NULL ;
case CLI_GENERATE :
if ( a - > pos = = 3 ) {
return topic_complete_name ( a - > word ) ;
} else {
return NULL ;
}
}
if ( a - > argc ! = 4 ) {
return CLI_SHOWUSAGE ;
}
topic = stasis_topic_get ( a - > argv [ 3 ] ) ;
if ( ! topic ) {
ast_cli ( a - > fd , " Specified topic '%s' does not exist \n " , a - > argv [ 3 ] ) ;
return CLI_FAILURE ;
}
ast_cli ( a - > fd , " Name: %s \n " , topic - > name ) ;
ast_cli ( a - > fd , " Detail: %s \n " , topic - > detail ) ;
ast_cli ( a - > fd , " Subscribers count: %lu \n " , AST_VECTOR_SIZE ( & topic - > subscribers ) ) ;
ast_cli ( a - > fd , " Forwarding topic count: %lu \n " , AST_VECTOR_SIZE ( & topic - > upstream_topics ) ) ;
ast_format_duration_hh_mm_ss ( ast_tvnow ( ) . tv_sec - topic - > creationtime - > tv_sec , print_time , sizeof ( print_time ) ) ;
ast_cli ( a - > fd , " Duration time: %s \n " , print_time ) ;
ao2_ref ( topic , - 1 ) ;
return CLI_SUCCESS ;
}
static struct ast_cli_entry cli_stasis [ ] = {
AST_CLI_DEFINE ( stasis_show_topics , " Show all topics " ) ,
AST_CLI_DEFINE ( stasis_show_topic , " Show topic " ) ,
} ;
# ifdef AST_DEVMODE
AO2_STRING_FIELD_SORT_FN ( stasis_subscription_statistics , uniqueid ) ;
@ -2646,6 +2926,9 @@ static void stasis_cleanup(void)
ao2_cleanup ( subscription_statistics ) ;
ao2_cleanup ( topic_statistics ) ;
# endif
ast_cli_unregister_multiple ( cli_stasis , ARRAY_LEN ( cli_stasis ) ) ;
ao2_cleanup ( topic_all ) ;
topic_all = NULL ;
ast_threadpool_shutdown ( pool ) ;
pool = NULL ;
STASIS_MESSAGE_TYPE_CLEANUP ( stasis_subscription_change_type ) ;
@ -2740,6 +3023,16 @@ int stasis_init(void)
return - 1 ;
}
topic_all = ao2_container_alloc_hash ( AO2_ALLOC_OPT_LOCK_MUTEX , 0 , TOPIC_ALL_BUCKETS ,
topic_proxy_hash_fn , 0 , topic_proxy_cmp_fn ) ;
if ( ! topic_all ) {
return - 1 ;
}
if ( ast_cli_register_multiple ( cli_stasis , ARRAY_LEN ( cli_stasis ) ) ) {
return - 1 ;
}
# ifdef AST_DEVMODE
/* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
* topic or subscripton .