@ -24,6 +24,18 @@
# include "asterisk/stasis_state.h"
/*!
* \ internal
* \ brief Used to link a stasis_state to it ' s manager
*/
struct stasis_state_proxy {
AO2_WEAKPROXY ( ) ;
/*! The manager that owns and handles this state */
struct stasis_state_manager * manager ;
/*! A unique id for this state object. */
char id [ 0 ] ;
} ;
/*!
* \ internal
* \ brief Associates a stasis topic to its last known published message
@ -38,7 +50,10 @@
struct stasis_state {
/*! The number of state subscribers */
unsigned int num_subscribers ;
/*! The manager that owns and handles this state */
/*!
* \ brief The manager that owns and handles this state
* \ note This reference is owned by stasis_state_proxy
*/
struct stasis_state_manager * manager ;
/*! Forwarding information, i.e. this topic to manager's topic */
struct stasis_forward * forward ;
@ -52,11 +67,11 @@ struct stasis_state {
*/
AST_VECTOR ( , struct ast_eid ) eids ;
/*! A unique id for this state object. */
char id [ 0 ] ;
char * id ;
} ;
AO2_STRING_FIELD_HASH_FN ( stasis_state , id ) ;
AO2_STRING_FIELD_CMP_FN ( stasis_state , id ) ;
AO2_STRING_FIELD_HASH_FN ( stasis_state _proxy , id ) ;
AO2_STRING_FIELD_CMP_FN ( stasis_state _proxy , id ) ;
/*! The number of buckets to use for managed states */
# define STATE_BUCKETS 57
@ -112,17 +127,28 @@ static void state_dtor(void *obj)
state - > topic = NULL ;
ao2_cleanup ( state - > msg ) ;
state - > msg = NULL ;
ao2_cleanup ( state - > manager ) ;
state - > manager = NULL ;
/* All eids should have been removed */
ast_assert ( AST_VECTOR_SIZE ( & state - > eids ) = = 0 ) ;
AST_VECTOR_FREE ( & state - > eids ) ;
}
static void state_proxy_dtor ( void * obj ) {
struct stasis_state_proxy * proxy = obj ;
ao2_cleanup ( proxy - > manager ) ;
}
static void state_proxy_sub_cb ( void * obj , void * data )
{
struct stasis_state_proxy * proxy = obj ;
ao2_unlink ( proxy - > manager - > states , proxy ) ;
}
/*!
* \ internal
* \ brief Allocate a stasis state object .
* \ brief Allocate a stasis state object and add it to the manager .
*
* Create and initialize a state structure . It ' s required that either a state
* topic , or an id is specified . If a state topic is not given then one will be
@ -134,37 +160,48 @@ static void state_dtor(void *obj)
*
* \ return A stasis_state object or NULL
* \ return NULL on error
*
* \ pre manager - > states must be locked .
* \ pre manager - > states does not contain an object matching key \ a id .
*/
static struct stasis_state * state_alloc ( struct stasis_state_manager * manager ,
struct stasis_topic * state_topic , const char * id )
struct stasis_topic * state_topic , const char * id ,
const char * file , int line , const char * func )
{
struct stasis_state * state ;
struct stasis_state_proxy * proxy = NULL ;
struct stasis_state * state = NULL ;
if ( ! id ) {
/* If not given an id, then a state topic is required */
ast_assert ( state_topic ! = NULL ) ;
/* Get the id we'll key off of from the state topic */
id = state_id_by_topic ( manager - > all_topic , state_topic ) ;
}
state = __ao2_alloc ( sizeof ( * state ) , state_dtor , AO2_ALLOC_OPT_LOCK_MUTEX , id , file , line , func ) ;
if ( ! state ) {
goto error_return ;
}
if ( ! state_topic ) {
char * name ;
/* If not given a state topic, then an id is required */
ast_assert ( id ! = NULL ) ;
/*
* To provide further detail and to ensure that the topic is unique within the
* scope of the system we prefix it with the manager ' s topic name , which should
* itself already be unique .
*/
if ( ast_asprintf ( & name , " %s/%s " , stasis_topic_name ( manager - > all_topic ) , id ) < 0 ) {
ast_log ( LOG_ERROR , " Unable to create state topic name '%s/%s' \n " ,
stasis_topic_name ( manager - > all_topic ) , id ) ;
return NULL ;
goto error_return ;
}
state _ topic = stasis_topic_create ( name ) ;
state - > topic = stasis_topic_create ( name ) ;
if ( ! state_topic ) {
ast_log ( LOG_ERROR , " Unable to create state topic '%s' \n " , name ) ;
ast_free ( name ) ;
return NULL ;
if ( ! state - > topic ) {
goto error_return ;
}
ast_free ( name ) ;
} else {
/*
* Since the state topic was passed in , go ahead and bump its reference .
@ -172,87 +209,57 @@ static struct stasis_state *state_alloc(struct stasis_state_manager *manager,
* state allocation error .
*/
ao2_ref ( state_topic , + 1 ) ;
state - > topic = state_topic ;
}
if ( ! id ) {
/* If not given an id, then a state topic is required */
ast_assert ( state_topic ! = NULL ) ;
/* Get the id we'll key off of from the state topic */
id = state_id_by_topic ( manager - > all_topic , state_topic ) ;
proxy = ao2_t_weakproxy_alloc ( sizeof ( * proxy ) + strlen ( id ) + 1 , state_proxy_dtor , id ) ;
if ( ! proxy ) {
goto error_return ;
}
state = ao2_alloc ( sizeof ( * state ) + strlen ( id ) + 1 , state_dtor ) ;
if ( ! state ) {
ast_log ( LOG_ERROR , " Unable to allocate state '%s' in manager '%s' \n " ,
id , stasis_topic_name ( manager - > all_topic ) ) ;
ao2_ref ( state_topic , - 1 ) ;
return NULL ;
}
strcpy ( proxy - > id , id ) ; /* Safe */
strcpy ( state - > id , id ) ; /* Safe */
state - > topic = state_topic ; /* ref already bumped above */
state - > id = proxy - > id ;
proxy - > manager = ao2_bump ( manager ) ;
state - > manager = proxy - > manager ; /* state->manager is owned by the proxy */
state - > forward = stasis_forward_all ( state - > topic , manager - > all_topic ) ;
if ( ! state - > forward ) {
ast_log ( LOG_ERROR , " Unable to add state '%s' forward in manager '%s' \n " ,
id , stasis_topic_name ( manager - > all_topic ) ) ;
ao2_ref ( state , - 1 ) ;
return NULL ;
goto error_return ;
}
if ( AST_VECTOR_INIT ( & state - > eids , 2 ) ) {
ast_log ( LOG_ERROR , " Unable to initialize eids for state '%s' in manager '%s' \n " ,
id , stasis_topic_name ( manager - > all_topic ) ) ;
ao2_ref ( state , - 1 ) ;
return NULL ;
goto error_return ;
}
state - > manager = ao2_bump ( manager ) ;
return state ;
if ( ao2_t_weakproxy_set_object ( proxy , state , OBJ_NOLOCK , " weakproxy link " ) ) {
goto error_return ;
}
/*!
* \ internal
* \ brief Create a state object , and add it to the manager .
*
* \ note Locking on the states container is specifically not done here , thus
* appropriate locks should be applied prior to this function being called .
*
* \ param manager The manager to be added to
* \ param state_topic A state topic to be managed ( if NULL id is required )
* \ param id The unique id for the state ( if NULL state_topic is required )
*
* \ return The added state object
* \ return NULL on error
*/
static struct stasis_state * state_add ( struct stasis_state_manager * manager ,
struct stasis_topic * state_topic , const char * id )
{
struct stasis_state * state = state_alloc ( manager , state_topic , id ) ;
if ( ! state ) {
return NULL ;
if ( ao2_weakproxy_subscribe ( proxy , state_proxy_sub_cb , NULL , OBJ_NOLOCK ) ) {
goto error_return ;
}
if ( ! ao2_link_flags ( manager - > states , state , OBJ_NOLOCK ) ) {
ast_log ( LOG_ERROR , " Unable to add state '%s' to manager '%s' \n " ,
state - > id ? state - > id : " " , stasis_topic_name ( manager - > all_topic ) ) ;
ao2_ref ( state , - 1 ) ;
return NULL ;
if ( ! ao2_link_flags ( manager - > states , proxy , OBJ_NOLOCK ) ) {
goto error_return ;
}
ao2_ref ( proxy , - 1 ) ;
return state ;
error_return :
ast_log ( LOG_ERROR , " Unable to allocate state '%s' in manager '%s' \n " ,
id , stasis_topic_name ( manager - > all_topic ) ) ;
ao2_cleanup ( state ) ;
ao2_cleanup ( proxy ) ;
return NULL ;
}
/*!
* \ internal
* \ brief Find a state by id , or create one if not found and add it to the manager .
*
* \ note Locking on the states container is specifically not done here , thus
* appropriate locks should be applied prior to this function being called .
*
* \ param manager The manager to be added to
* \ param state_topic A state topic to be managed ( if NULL id is required )
* \ param id The unique id for the state ( if NULL state_topic is required )
@ -260,18 +267,26 @@ static struct stasis_state *state_add(struct stasis_state_manager *manager,
* \ return The added state object
* \ return NULL on error
*/
static struct stasis_state * state_find_or_add ( struct stasis_state_manager * manager ,
struct stasis_topic * state_topic , const char * id )
# define state_find_or_add(mgr, top, id) __state_find_or_add(mgr, top, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
static struct stasis_state * __state_find_or_add ( struct stasis_state_manager * manager ,
struct stasis_topic * state_topic , const char * id ,
const char * file , int line , const char * func )
{
struct stasis_state * state ;
ao2_lock ( manager - > states ) ;
if ( ast_strlen_zero ( id ) ) {
id = state_id_by_topic ( manager - > all_topic , state_topic ) ;
}
state = ao2_find ( manager - > states , id , OBJ_SEARCH_KEY | OBJ_NOLOCK ) ;
state = ao2_weakproxy_find ( manager - > states , id , OBJ_SEARCH_KEY | OBJ_NOLOCK , " " ) ;
if ( ! state ) {
state = state_alloc ( manager , state_topic , id , file , line , func ) ;
}
return state ? state : state_add ( manager , state_topic , id ) ;
ao2_unlock ( manager - > states ) ;
return state ;
}
static void state_manager_dtor ( void * obj )
@ -317,7 +332,7 @@ struct stasis_state_manager *stasis_state_manager_create(const char *topic_name)
}
manager - > states = ao2_container_alloc_hash ( AO2_ALLOC_OPT_LOCK_MUTEX , 0 ,
STATE_BUCKETS , stasis_state_ hash_fn, NULL , stasis_state _cmp_fn) ;
STATE_BUCKETS , stasis_state_ proxy_ hash_fn, NULL , stasis_state _proxy _cmp_fn) ;
if ( ! manager - > states ) {
ao2_ref ( manager , - 1 ) ;
return NULL ;
@ -356,10 +371,7 @@ struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, co
struct stasis_topic * topic ;
struct stasis_state * state ;
ao2_lock ( manager - > states ) ;
state = state_find_or_add ( manager , NULL , id ) ;
ao2_unlock ( manager - > states ) ;
if ( ! state ) {
return NULL ;
}
@ -369,53 +381,6 @@ struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, co
return topic ;
}
/*!
* \ internal
* \ brief Remove a state from the stasis manager
*
* State should only be removed from the manager under the following conditions :
*
* There are no more subscribers to it
* There are no more explicit publishers publishing to it
* There are no more implicit publishers publishing to it
*
* Subscribers and explicit publishers hold a reference to the state object itself , so
* once a state ' s reference count drops to 2 ( 1 for the manager , 1 passed in ) then we
* know there are no more subscribers or explicit publishers . Implicit publishers are
* tracked by eids , so once that container is empty no more implicit publishers exist
* for the state either . Only then can a state be removed .
*
* \ param state The state to remove
*/
static void state_remove ( struct stasis_state * state )
{
ao2_lock ( state ) ;
/*
* The manager ' s state container also needs to be locked here prior to checking
* the state ' s reference count , and potentially removing since we don ' t want its
* count to possibly increase between the check and unlinking .
*/
ao2_lock ( state - > manager - > states ) ;
/*
* If there are only 2 references left then it ' s the one owned by the manager ,
* and the one passed in to this function . However , before removing it from the
* manager we need to also check that no eid is associated with the given state .
* If an eid still remains then this means that an implicit publisher is still
* publishing to this state .
*/
if ( ao2_ref ( state , 0 ) = = 2 & & AST_VECTOR_SIZE ( & state - > eids ) = = 0 ) {
ao2_unlink_flags ( state - > manager - > states , state , 0 ) ;
}
ao2_unlock ( state - > manager - > states ) ;
ao2_unlock ( state ) ;
/* Now it's safe to remove the reference that is held on the given state */
ao2_ref ( state , - 1 ) ;
}
struct stasis_state_subscriber {
/*! The stasis state subscribed to */
struct stasis_state * state ;
@ -441,7 +406,7 @@ static void subscriber_dtor(void *obj)
- - sub - > state - > num_subscribers ;
ao2_unlock ( sub - > state ) ;
state_remove ( sub - > state ) ;
ao2_ref ( sub - > state , - 1 ) ;
}
struct stasis_state_subscriber * stasis_state_add_subscriber (
@ -457,14 +422,11 @@ struct stasis_state_subscriber *stasis_state_add_subscriber(
return NULL ;
}
ao2_lock ( manager - > states ) ;
sub - > state = state_find_or_add ( manager , NULL , id ) ;
if ( ! sub - > state ) {
ao2_unlock ( manager - > states ) ;
ao2_ref ( sub , - 1 ) ;
return NULL ;
}
ao2_unlock ( manager - > states ) ;
ao2_lock ( sub - > state ) ;
+ + sub - > state - > num_subscribers ;
@ -563,7 +525,7 @@ static void publisher_dtor(void *obj)
{
struct stasis_state_publisher * pub = obj ;
state_remove ( pub - > state ) ;
ao2_ref ( pub - > state , - 1 ) ;
}
struct stasis_state_publisher * stasis_state_add_publisher (
@ -578,14 +540,11 @@ struct stasis_state_publisher *stasis_state_add_publisher(
return NULL ;
}
ao2_lock ( manager - > states ) ;
pub - > state = state_find_or_add ( manager , NULL , id ) ;
if ( ! pub - > state ) {
ao2_unlock ( manager - > states ) ;
ao2_ref ( pub , - 1 ) ;
return NULL ;
}
ao2_unlock ( manager - > states ) ;
return pub ;
}
@ -639,7 +598,10 @@ static void state_find_or_add_eid(struct stasis_state *state, const struct ast_e
}
if ( i = = AST_VECTOR_SIZE ( & state - > eids ) ) {
AST_VECTOR_APPEND ( & state - > eids , * eid ) ;
if ( ! AST_VECTOR_APPEND ( & state - > eids , * eid ) ) {
/* This ensures state cannot be freed if it has any eids */
ao2_ref ( state , + 1 ) ;
}
}
}
@ -666,6 +628,8 @@ static void state_find_and_remove_eid(struct stasis_state *state, const struct a
for ( i = 0 ; i < AST_VECTOR_SIZE ( & state - > eids ) ; + + i ) {
if ( ! ast_eid_cmp ( AST_VECTOR_GET_ADDR ( & state - > eids , i ) , eid ) ) {
AST_VECTOR_REMOVE_UNORDERED ( & state - > eids , i ) ;
/* Balance the reference from state_find_or_add_eid */
ao2_ref ( state , - 1 ) ;
return ;
}
}
@ -676,10 +640,7 @@ void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char
{
struct stasis_state * state ;
ao2_lock ( manager - > states ) ;
state = state_find_or_add ( manager , NULL , id ) ;
ao2_unlock ( manager - > states ) ;
if ( ! state ) {
return ;
}
@ -697,7 +658,7 @@ void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char
void stasis_state_remove_publish_by_id ( struct stasis_state_manager * manager ,
const char * id , const struct ast_eid * eid , struct stasis_message * msg )
{
struct stasis_state * state = ao2_ find( manager - > states , id , OBJ_SEARCH_KEY ) ;
struct stasis_state * state = ao2_ weakproxy_ find( manager - > states , id , OBJ_SEARCH_KEY , " " ) ;
if ( ! state ) {
/*
@ -721,7 +682,7 @@ void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager,
state_find_and_remove_eid ( state , eid ) ;
ao2_unlock ( state ) ;
state_remove( state ) ;
ao2_ref( state , - 1 ) ;
}
int stasis_state_add_observer ( struct stasis_state_manager * manager ,
@ -744,10 +705,8 @@ void stasis_state_remove_observer(struct stasis_state_manager *manager,
AST_VECTOR_RW_UNLOCK ( & manager - > observers ) ;
}
static int handle_stasis_state ( void * obj , void * arg , void * data , int flags )
static int handle_stasis_state ( struct stasis_state * state , on_stasis_state handler , void * data )
{
struct stasis_state * state = obj ;
on_stasis_state handler = arg ;
struct stasis_message * msg ;
int res ;
@ -764,24 +723,41 @@ static int handle_stasis_state(void *obj, void *arg, void *data, int flags)
return res ;
}
static int handle_stasis_state_proxy ( void * obj , void * arg , void * data , int flags )
{
struct stasis_state * state = ao2_weakproxy_get_object ( obj , 0 ) ;
if ( state ) {
int res ;
res = handle_stasis_state ( state , arg , data ) ;
ao2_ref ( state , - 1 ) ;
return res ;
}
return 0 ;
}
void stasis_state_callback_all ( struct stasis_state_manager * manager , on_stasis_state handler ,
void * data )
{
ast_assert ( handler ! = NULL ) ;
ao2_callback_data ( manager - > states , OBJ_MULTIPLE | OBJ_NODATA ,
handle_stasis_state , handler , data ) ;
handle_stasis_state _proxy , handler , data ) ;
}
static int handle_stasis_state_subscribed ( void * obj , void * arg , void * data , int flags )
{
struct stasis_state * state = obj ;
struct stasis_state * state = ao2_weakproxy_get_object ( obj , 0 ) ;
int res = 0 ;
if ( state - > num_subscribers ) {
return handle_stasis_state ( obj , arg , data , flags ) ;
if ( state & & state - > num_subscribers ) {
res = handle_stasis_state ( state , arg , data ) ;
}
return 0 ;
ao2_cleanup ( state ) ;
return res ;
}
void stasis_state_callback_subscribed ( struct stasis_state_manager * manager , on_stasis_state handler ,