@ -178,28 +178,22 @@ static void topic_dtor(void *obj)
struct stasis_topic * stasis_topic_create ( const char * name )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
struct stasis_topic * topic ;
int res = 0 ;
topic = ao2_alloc ( sizeof ( * topic ) , topic_dtor ) ;
if ( ! topic ) {
return NULL ;
}
topic - > name = ast_strdup ( name ) ;
if ( ! topic - > name ) {
return NULL ;
}
res | = AST_VECTOR_INIT ( & topic - > subscribers , INITIAL_SUBSCRIBERS_MAX ) ;
res | = AST_VECTOR_INIT ( & topic - > upstream_topics , 0 ) ;
if ( res ! = 0 ) {
if ( ! topic - > name | | res ) {
ao2_cleanup ( topic ) ;
return NULL ;
}
ao2_ref ( topic , + 1 ) ;
return topic ;
}
@ -221,8 +215,6 @@ struct stasis_subscription {
/*! Data pointer to be handed to the callback. */
void * data ;
/*! Lock for completion flags \c final_message_{rxed,processed}. */
ast_mutex_t join_lock ;
/*! Condition for joining with subscription. */
ast_cond_t join_cond ;
/*! Flag set when final message for sub has been received.
@ -248,7 +240,6 @@ static void subscription_dtor(void *obj)
sub - > topic = NULL ;
ast_taskprocessor_unreference ( sub - > mailbox ) ;
sub - > mailbox = NULL ;
ast_mutex_destroy ( & sub - > join_lock ) ;
ast_cond_destroy ( & sub - > join_cond ) ;
}
@ -263,7 +254,8 @@ static void subscription_invoke(struct stasis_subscription *sub,
{
/* Notify that the final message has been received */
if ( stasis_subscription_final_message ( sub , message ) ) {
SCOPED_MUTEX ( lock , & sub - > join_lock ) ;
SCOPED_AO2LOCK ( lock , sub ) ;
sub - > final_message_rxed = 1 ;
ast_cond_signal ( & sub - > join_cond ) ;
}
@ -273,7 +265,8 @@ static void subscription_invoke(struct stasis_subscription *sub,
/* Notify that the final message has been processed */
if ( stasis_subscription_final_message ( sub , message ) ) {
SCOPED_MUTEX ( lock , & sub - > join_lock ) ;
SCOPED_AO2LOCK ( lock , sub ) ;
sub - > final_message_processed = 1 ;
ast_cond_signal ( & sub - > join_cond ) ;
}
@ -294,6 +287,7 @@ struct stasis_subscription *internal_stasis_subscribe(
return NULL ;
}
/* The ao2 lock is used for join_cond. */
sub = ao2_alloc ( sizeof ( * sub ) , subscription_dtor ) ;
if ( ! sub ) {
return NULL ;
@ -323,7 +317,6 @@ struct stasis_subscription *internal_stasis_subscribe(
sub - > topic = topic ;
sub - > callback = callback ;
sub - > data = data ;
ast_mutex_init ( & sub - > join_lock ) ;
ast_cond_init ( & sub - > join_cond , NULL ) ;
if ( topic_add_subscription ( topic , sub ) ! = 0 ) {
@ -385,11 +378,12 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
void stasis_subscription_join ( struct stasis_subscription * subscription )
{
if ( subscription ) {
SCOPED_MUTEX ( lock , & subscription - > join_lock ) ;
SCOPED_AO2LOCK ( lock , subscription ) ;
/* Wait until the processed flag has been set */
while ( ! subscription - > final_message_processed ) {
ast_cond_wait ( & subscription - > join_cond ,
& subscription - > join_lock ) ;
ao2_object_get_lockaddr ( subscription ) ) ;
}
}
}
@ -397,7 +391,8 @@ void stasis_subscription_join(struct stasis_subscription *subscription)
int stasis_subscription_is_done ( struct stasis_subscription * subscription )
{
if ( subscription ) {
SCOPED_MUTEX ( lock , & subscription - > join_lock ) ;
SCOPED_AO2LOCK ( lock , subscription ) ;
return subscription - > final_message_rxed ;
}
@ -446,6 +441,7 @@ const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
int stasis_subscription_final_message ( struct stasis_subscription * sub , struct stasis_message * msg )
{
struct stasis_subscription_change * change ;
if ( stasis_message_type ( msg ) ! = stasis_subscription_change_type ( ) ) {
return 0 ;
}
@ -575,9 +571,7 @@ static void dispatch_message(struct stasis_subscription *sub,
*/
ao2_bump ( message ) ;
if ( ! synchronous ) {
if ( ast_taskprocessor_push_local ( sub - > mailbox ,
dispatch_exec_async ,
message ) ! = 0 ) {
if ( ast_taskprocessor_push_local ( sub - > mailbox , dispatch_exec_async , message ) ) {
/* Push failed; ugh. */
ast_log ( LOG_ERROR , " Dropping async dispatch \n " ) ;
ao2_cleanup ( message ) ;
@ -590,12 +584,12 @@ static void dispatch_message(struct stasis_subscription *sub,
std . complete = 0 ;
std . task_data = message ;
if ( ast_taskprocessor_push_local ( sub - > mailbox ,
dispatch_exec_sync ,
& std ) ) {
if ( ast_taskprocessor_push_local ( sub - > mailbox , dispatch_exec_sync , & std ) ) {
/* Push failed; ugh. */
ast_log ( LOG_ERROR , " Dropping sync dispatch \n " ) ;
ao2_cleanup ( message ) ;
ast_mutex_destroy ( & std . lock ) ;
ast_cond_destroy ( & std . cond ) ;
return ;
}
@ -718,7 +712,7 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
return NULL ;
}
forward = ao2_alloc ( sizeof ( * forward ) , forward_dtor ) ;
forward = ao2_alloc _options ( sizeof ( * forward ) , forward_dtor , AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
if ( ! forward ) {
return NULL ;
}
@ -746,16 +740,18 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
static void subscription_change_dtor ( void * obj )
{
struct stasis_subscription_change * change = obj ;
ast_string_field_free_memory ( change ) ;
ao2_cleanup ( change - > topic ) ;
}
static struct stasis_subscription_change * subscription_change_alloc ( struct stasis_topic * topic , const char * uniqueid , const char * description )
{
RAII_VAR ( struct stasis_subscription_change * , change , NULL , ao2_cleanup ) ;
struct stasis_subscription_change * change ;
change = ao2_alloc ( sizeof ( struct stasis_subscription_change ) , subscription_change_dtor ) ;
if ( ast_string_field_init ( change , 128 ) ) {
if ( ! change | | ast_string_field_init ( change , 128 ) ) {
ao2_cleanup ( change ) ;
return NULL ;
}
@ -764,51 +760,50 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi
ao2_ref ( topic , + 1 ) ;
change - > topic = topic ;
ao2_ref ( change , + 1 ) ;
return change ;
}
static void send_subscription_subscribe ( struct stasis_topic * topic , struct stasis_subscription * sub )
{
RAII_VAR ( struct stasis_subscription_change * , change , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , msg , NULL , ao2_cleanup ) ;
struct stasis_subscription_change * change ;
struct stasis_message * msg ;
/* This assumes that we have already unsubscribed */
ast_assert ( stasis_subscription_is_subscribed ( sub ) ) ;
change = subscription_change_alloc ( topic , sub - > uniqueid , " Subscribe " ) ;
if ( ! change ) {
return ;
}
msg = stasis_message_create ( stasis_subscription_change_type ( ) , change ) ;
if ( ! msg ) {
ao2_cleanup ( change ) ;
return ;
}
stasis_publish ( topic , msg ) ;
ao2_cleanup ( msg ) ;
ao2_cleanup ( change ) ;
}
static void send_subscription_unsubscribe ( struct stasis_topic * topic ,
struct stasis_subscription * sub )
{
RAII_VAR ( struct stasis_subscription_change * , change , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , msg , NULL , ao2_cleanup ) ;
struct stasis_subscription_change * change ;
struct stasis_message * msg ;
/* This assumes that we have already unsubscribed */
ast_assert ( ! stasis_subscription_is_subscribed ( sub ) ) ;
change = subscription_change_alloc ( topic , sub - > uniqueid , " Unsubscribe " ) ;
if ( ! change ) {
return ;
}
msg = stasis_message_create ( stasis_subscription_change_type ( ) , change ) ;
if ( ! msg ) {
ao2_cleanup ( change ) ;
return ;
}
@ -816,6 +811,9 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
/* Now we have to dispatch to the subscription itself */
dispatch_message ( sub , msg , 0 ) ;
ao2_cleanup ( msg ) ;
ao2_cleanup ( change ) ;
}
struct topic_pool_entry {
@ -826,6 +824,7 @@ struct topic_pool_entry {
static void topic_pool_entry_dtor ( void * obj )
{
struct topic_pool_entry * entry = obj ;
entry - > forward = stasis_forward_cancel ( entry - > forward ) ;
ao2_cleanup ( entry - > topic ) ;
entry - > topic = NULL ;
@ -833,7 +832,8 @@ static void topic_pool_entry_dtor(void *obj)
static struct topic_pool_entry * topic_pool_entry_alloc ( void )
{
return ao2_alloc ( sizeof ( struct topic_pool_entry ) , topic_pool_entry_dtor ) ;
return ao2_alloc_options ( sizeof ( struct topic_pool_entry ) , topic_pool_entry_dtor ,
AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
}
struct stasis_topic_pool {
@ -844,6 +844,7 @@ struct stasis_topic_pool {
static void topic_pool_dtor ( void * obj )
{
struct stasis_topic_pool * pool = obj ;
ao2_cleanup ( pool - > pool_container ) ;
pool - > pool_container = NULL ;
ao2_cleanup ( pool - > pool_topic ) ;
@ -852,28 +853,80 @@ static void topic_pool_dtor(void *obj)
static int topic_pool_entry_hash ( const void * obj , const int flags )
{
const char * topic_name = ( flags & OBJ_KEY ) ? obj : stasis_topic_name ( ( ( struct topic_pool_entry * ) obj ) - > topic ) ;
return ast_str_case_hash ( topic_name ) ;
const struct topic_pool_entry * object ;
const char * key ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_KEY :
key = obj ;
break ;
case OBJ_SEARCH_OBJECT :
object = obj ;
key = stasis_topic_name ( object - > topic ) ;
break ;
default :
/* Hash can only work on something with a full key. */
ast_assert ( 0 ) ;
return 0 ;
}
return ast_str_case_hash ( key ) ;
}
static int topic_pool_entry_cmp ( void * obj , void * arg , int flags )
{
struct topic_pool_entry * opt1 = obj , * opt2 = arg ;
const char * topic_name = ( flags & OBJ_KEY ) ? arg : stasis_topic_name ( opt2 - > topic ) ;
return strcasecmp ( stasis_topic_name ( opt1 - > topic ) , topic_name ) ? 0 : CMP_MATCH | CMP_STOP ;
const struct topic_pool_entry * object_left = obj ;
const struct topic_pool_entry * object_right = arg ;
const char * right_key = arg ;
int cmp ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_OBJECT :
right_key = stasis_topic_name ( object_right - > topic ) ;
/* Fall through */
case OBJ_SEARCH_KEY :
cmp = strcasecmp ( stasis_topic_name ( object_left - > topic ) , right_key ) ;
break ;
case OBJ_SEARCH_PARTIAL_KEY :
/* Not supported by container */
ast_assert ( 0 ) ;
cmp = - 1 ;
break ;
default :
/*
* What arg points to is specific to this traversal callback
* and has no special meaning to astobj2 .
*/
cmp = 0 ;
break ;
}
if ( cmp ) {
return 0 ;
}
/*
* At this point the traversal callback is identical to a sorted
* container .
*/
return CMP_MATCH ;
}
struct stasis_topic_pool * stasis_topic_pool_create ( struct stasis_topic * pooled_topic )
{
RAII_VAR ( struct stasis_topic_pool * , pool , ao2_alloc ( sizeof ( * pool ) , topic_pool_dtor ) , ao2_cleanup ) ;
struct stasis_topic_pool * pool ;
pool = ao2_alloc_options ( sizeof ( * pool ) , topic_pool_dtor , AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
if ( ! pool ) {
return NULL ;
}
pool - > pool_container = ao2_container_alloc ( TOPIC_POOL_BUCKETS , topic_pool_entry_hash , topic_pool_entry_cmp ) ;
pool - > pool_container = ao2_container_alloc ( TOPIC_POOL_BUCKETS ,
topic_pool_entry_hash , topic_pool_entry_cmp ) ;
if ( ! pool - > pool_container ) {
ao2_cleanup ( pool ) ;
return NULL ;
}
ao2_ref ( pooled_topic , + 1 ) ;
pool - > pool_topic = pooled_topic ;
ao2_ref ( pool , + 1 ) ;
return pool ;
}
@ -881,14 +934,13 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
{
RAII_VAR ( struct topic_pool_entry * , topic_pool_entry , NULL , ao2_cleanup ) ;
SCOPED_AO2LOCK ( topic_container_lock , pool - > pool_container ) ;
topic_pool_entry = ao2_find ( pool - > pool_container , topic_name , OBJ_KEY | OBJ_NOLOCK ) ;
topic_pool_entry = ao2_find ( pool - > pool_container , topic_name , OBJ_SEARCH_KEY | OBJ_NOLOCK ) ;
if ( topic_pool_entry ) {
return topic_pool_entry - > topic ;
}
topic_pool_entry = topic_pool_entry_alloc ( ) ;
if ( ! topic_pool_entry ) {
return NULL ;
}
@ -903,7 +955,9 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
return NULL ;
}
ao2_link_flags ( pool - > pool_container , topic_pool_entry , OBJ_NOLOCK ) ;
if ( ! ao2_link_flags ( pool - > pool_container , topic_pool_entry , OBJ_NOLOCK ) ) {
return NULL ;
}
return topic_pool_entry - > topic ;
}