@ -39,6 +39,95 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
# include "asterisk/utils.h"
# include "asterisk/uuid.h"
/*!
* \ page stasis - impl Stasis Implementation Notes
*
* \ par Reference counting
*
* Stasis introduces a number of objects , which are tightly related to one
* another . Because we rely on ref - counting for memory management , understanding
* these relationships is important to understanding this code .
*
* \ code { . txt }
*
* stasis_topic < - - - - > stasis_subscription
* ^ ^
* \ /
* \ /
* dispatch
* |
* |
* v
* stasis_message
* |
* |
* v
* stasis_message_type
*
* \ endcode
*
* The most troubling thing in this chart is the cyclic reference between
* stasis_topic and stasis_subscription . This is both unfortunate , and
* necessary . Topics need the subscription in order to dispatch messages ;
* subscriptions need the topics to unsubscribe and check subscription status .
*
* The cycle is broken by stasis_unsubscribe ( ) . The unsubscribe will remove the
* topic ' s reference to a subscription . When the subcription is destroyed , it
* will remove its reference to the topic .
*
* This means that until a subscription has be explicitly unsubscribed , it will
* not be destroyed . Neither will a topic be destroyed while it has subscribers .
* The destructors of both have assertions regarding this to catch ref - counting
* problems where a subscription or topic has had an extra ao2_cleanup ( ) .
*
* The \ ref dispatch object is a transient object , which is posted to a
* subscription ' s taskprocessor to send a message to the subscriber . They have
* short life cycles , allocated on one thread , destroyed on another .
*
* During shutdown , or the deletion of a domain object , there are a flurry of
* ao2_cleanup ( ) s on subscriptions and topics , as the final in - flight messages
* are processed . Any one of these cleanups could be the one to actually destroy
* a given object , so care must be taken to ensure that an object isn ' t
* referenced after an ao2_cleanup ( ) . This includes the implicit ao2_unlock ( )
* that might happen when a RAII_VAR ( ) goes out of scope .
*
* \ par Typical life cycles
*
* \ li stasis_topic - There are several topics which live for the duration of
* the Asterisk process ( ast_channel_topic_all ( ) , etc . ) but most of these
* are actually fed by shorter - lived topics whose lifetime is associated
* with some domain object ( like ast_channel_topic ( ) for a given
* ast_channel ) .
*
* \ li stasis_subscription - Subscriptions have a similar mix of lifetimes as
* topics , for similar reasons .
*
* \ li dispatch - Very short lived ; just long enough to post a message to a
* subscriber .
*
* \ li stasis_message - Short to intermediate lifetimes , but that is mostly
* irrelevant . Messages are strictly data and have no behavior associated
* with them , so it doesn ' t really matter if / when they are destroyed . By
* design , a component could hold a ref to a message forever without any
* ill consequences ( aside from consuming more memory ) .
*
* \ li stasis_message_type - Long life cycles , typically only destroyed on
* module unloading or _clean_ process exit .
*
* \ par Subscriber shutdown sequencing
*
* Subscribers are sensitive to shutdown sequencing , specifically in how the
* reference message types . This is fully detailed on the wiki at
* https : //wiki.asterisk.org/wiki/x/K4BqAQ.
*
* In short , the lifetime of the \ a data ( and \ a callback , if in a module ) must
* be held until the stasis_subscription_final_message ( ) has been received .
* Depending on the structure of the subscriber code , this can be handled by
* using stasis_subscription_final_message ( ) to free resources on the final
* message , or using stasis_subscription_join ( ) / stasis_unsubscribe_and_join ( ) to
* block until the unsubscribe has completed .
*/
/*! Initial size of the subscribers list. */
# define INITIAL_SUBSCRIBERS_MAX 4
@ -53,7 +142,7 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
struct stasis_topic {
char * name ;
/*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
/*! Variable length array of the subscribers */
struct stasis_subscription * * subscribers ;
/*! Allocated length of the subscribers array */
size_t num_subscribers_max ;
@ -67,6 +156,10 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
static void topic_dtor ( void * obj )
{
struct stasis_topic * topic = obj ;
/* Subscribers hold a reference to topics, so they should all be
* unsubscribed before we get here . */
ast_assert ( topic - > num_subscribers_current = = 0 ) ;
ast_free ( topic - > name ) ;
topic - > name = NULL ;
ast_free ( topic - > subscribers ) ;
@ -116,7 +209,7 @@ struct stasis_subscription {
/*! Data pointer to be handed to the callback. */
void * data ;
/*! Lock for joining with subscription . */
/*! Lock for completion flags \c final_message_{rxed,processed} . */
ast_mutex_t join_lock ;
/*! Condition for joining with subscription. */
ast_cond_t join_cond ;
@ -131,8 +224,14 @@ struct stasis_subscription {
static void subscription_dtor ( void * obj )
{
struct stasis_subscription * sub = obj ;
/* Subscriptions need to be manually unsubscribed before destruction
* b / c there ' s a cyclic reference between topics and subscriptions */
ast_assert ( ! stasis_subscription_is_subscribed ( sub ) ) ;
/* If there are any messages in flight to this subscription; that would
* be bad . */
ast_assert ( stasis_subscription_is_done ( sub ) ) ;
ao2_cleanup ( sub - > topic ) ;
sub - > topic = NULL ;
ast_taskprocessor_unreference ( sub - > mailbox ) ;
@ -221,7 +320,10 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
if ( sub ) {
size_t i ;
struct stasis_topic * topic = sub - > topic ;
/* The subscription may be the last ref to this topic. Hold
* the topic ref open until after the unlock . */
RAII_VAR ( struct stasis_topic * , topic , ao2_bump ( sub - > topic ) ,
ao2_cleanup ) ;
SCOPED_AO2LOCK ( lock_topic , topic ) ;
for ( i = 0 ; i < topic - > num_subscribers_current ; + + i ) {
@ -240,11 +342,6 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
return NULL ;
}
/*!
* \ brief Block until the final message has been received on a subscription .
*
* \ param subscription Subscription to wait on .
*/
void stasis_subscription_join ( struct stasis_subscription * subscription )
{
if ( subscription ) {
@ -347,7 +444,12 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
topic - > num_subscribers_max * = 2 ;
}
/* Don't ref sub here or we'll cause a reference cycle. */
/* The reference from the topic to the subscription is shared with
* the owner of the subscription , which will explicitly unsubscribe
* to release it .
*
* If we bumped the refcount here , the owner would have to unsubscribe
* and cleanup , which is a bit awkward . */
topic - > subscribers [ topic - > num_subscribers_current + + ] = sub ;
return 0 ;
}
@ -413,9 +515,13 @@ static int dispatch_exec(void *data)
return 0 ;
}
void stasis_forward_message ( struct stasis_topic * topic, struct stasis_topic * publisher_topic , struct stasis_message * message )
void stasis_forward_message ( struct stasis_topic * _ topic, struct stasis_topic * publisher_topic , struct stasis_message * message )
{
size_t i ;
/* The topic may be unref'ed by the subscription invocation.
* Make sure we hold onto a reference while dispatching . */
RAII_VAR ( struct stasis_topic * , topic , ao2_bump ( _topic ) ,
ao2_cleanup ) ;
SCOPED_AO2LOCK ( lock , topic ) ;
ast_assert ( topic ! = NULL ) ;
@ -625,7 +731,7 @@ void stasis_log_bad_type_access(const char *name)
ast_log ( LOG_ERROR , " Use of %s() before init/after destruction \n " , name ) ;
}
/*! \brief Cleanup function */
/*! \brief Shutdown function */
static void stasis_exit ( void )
{
ast_threadpool_shutdown ( pool ) ;