@ -61,7 +61,6 @@ struct stasis_topic {
} ;
} ;
/* Forward declarations for the tightly-coupled subscription object */
/* Forward declarations for the tightly-coupled subscription object */
struct stasis_subscription ;
static int topic_add_subscription ( struct stasis_topic * topic , struct stasis_subscription * sub ) ;
static int topic_add_subscription ( struct stasis_topic * topic , struct stasis_subscription * sub ) ;
static void topic_dtor ( void * obj )
static void topic_dtor ( void * obj )
@ -127,9 +126,30 @@ static void subscription_dtor(void *obj)
sub - > mailbox = NULL ;
sub - > mailbox = NULL ;
}
}
/*!
* \ brief Invoke the subscription ' s callback .
* \ param sub Subscription to invoke .
* \ param topic Topic message was published to .
* \ param message Message to send .
*/
static void subscription_invoke ( struct stasis_subscription * sub ,
struct stasis_topic * topic ,
struct stasis_message * message )
{
/* Since sub->topic doesn't change, no need to lock sub */
sub - > callback ( sub - > data ,
sub ,
topic ,
message ) ;
}
static void send_subscription_change_message ( struct stasis_topic * topic , char * uniqueid , char * description ) ;
static void send_subscription_change_message ( struct stasis_topic * topic , char * uniqueid , char * description ) ;
struct stasis_subscription * stasis_subscribe ( struct stasis_topic * topic , stasis_subscription_cb callback , void * data )
static struct stasis_subscription * __stasis_subscribe (
struct stasis_topic * topic ,
stasis_subscription_cb callback ,
void * data ,
int needs_mailbox )
{
{
RAII_VAR ( struct stasis_subscription * , sub , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_subscription * , sub , NULL , ao2_cleanup ) ;
@ -140,10 +160,12 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_
ast_uuid_generate_str ( sub - > uniqueid , sizeof ( sub - > uniqueid ) ) ;
ast_uuid_generate_str ( sub - > uniqueid , sizeof ( sub - > uniqueid ) ) ;
if ( needs_mailbox ) {
sub - > mailbox = ast_threadpool_serializer ( sub - > uniqueid , pool ) ;
sub - > mailbox = ast_threadpool_serializer ( sub - > uniqueid , pool ) ;
if ( ! sub - > mailbox ) {
if ( ! sub - > mailbox ) {
return NULL ;
return NULL ;
}
}
}
ao2_ref ( topic , + 1 ) ;
ao2_ref ( topic , + 1 ) ;
sub - > topic = topic ;
sub - > topic = topic ;
@ -159,6 +181,14 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_
return sub ;
return sub ;
}
}
struct stasis_subscription * stasis_subscribe (
struct stasis_topic * topic ,
stasis_subscription_cb callback ,
void * data )
{
return __stasis_subscribe ( topic , callback , data , 1 ) ;
}
struct stasis_subscription * stasis_unsubscribe ( struct stasis_subscription * sub )
struct stasis_subscription * stasis_unsubscribe ( struct stasis_subscription * sub )
{
{
if ( sub ) {
if ( sub ) {
@ -305,17 +335,8 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
static int dispatch_exec ( void * data )
static int dispatch_exec ( void * data )
{
{
RAII_VAR ( struct dispatch * , dispatch , data , ao2_cleanup ) ;
RAII_VAR ( struct dispatch * , dispatch , data , ao2_cleanup ) ;
RAII_VAR ( struct stasis_topic * , sub_topic , NULL , ao2_cleanup ) ;
/* Since sub->topic doesn't change, no need to lock sub */
ast_assert ( dispatch - > sub - > topic ! = NULL ) ;
ao2_ref ( dispatch - > sub - > topic , + 1 ) ;
sub_topic = dispatch - > sub - > topic ;
dispatch - > sub - > callback ( dispatch - > sub - > data ,
subscription_invoke ( dispatch - > sub , dispatch - > topic , dispatch - > message ) ;
dispatch - > sub ,
sub_topic ,
dispatch - > message ) ;
return 0 ;
return 0 ;
}
}
@ -331,10 +352,12 @@ void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *pub
for ( i = 0 ; i < topic - > num_subscribers_current ; + + i ) {
for ( i = 0 ; i < topic - > num_subscribers_current ; + + i ) {
struct stasis_subscription * sub = topic - > subscribers [ i ] ;
struct stasis_subscription * sub = topic - > subscribers [ i ] ;
RAII_VAR ( struct dispatch * , dispatch , NULL , ao2_cleanup ) ;
ast_assert ( sub ! = NULL ) ;
ast_assert ( sub ! = NULL ) ;
if ( sub - > mailbox ) {
RAII_VAR ( struct dispatch * , dispatch , NULL , ao2_cleanup ) ;
dispatch = dispatch_create ( publisher_topic , message , sub ) ;
dispatch = dispatch_create ( publisher_topic , message , sub ) ;
if ( ! dispatch ) {
if ( ! dispatch ) {
ast_log ( LOG_DEBUG , " Dropping dispatch \n " ) ;
ast_log ( LOG_DEBUG , " Dropping dispatch \n " ) ;
@ -342,7 +365,15 @@ void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *pub
}
}
if ( ast_taskprocessor_push ( sub - > mailbox , dispatch_exec , dispatch ) = = 0 ) {
if ( ast_taskprocessor_push ( sub - > mailbox , dispatch_exec , dispatch ) = = 0 ) {
dispatch = NULL ; /* Ownership transferred to mailbox */
/* Ownership transferred to mailbox.
* Don ' t increment ref , b / c the task processor
* may have already gotten rid of the object .
*/
dispatch = NULL ;
}
} else {
/* Dispatch directly */
subscription_invoke ( sub , publisher_topic , message ) ;
}
}
}
}
}
}
@ -370,7 +401,11 @@ struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
return NULL ;
return NULL ;
}
}
sub = stasis_subscribe ( from_topic , stasis_forward_cb , to_topic ) ;
/* Forwarding subscriptions should dispatch directly instead of having a
* mailbox . Otherwise , messages forwarded to the same topic from
* different topics may get reordered . Which is bad .
*/
sub = __stasis_subscribe ( from_topic , stasis_forward_cb , to_topic , 0 ) ;
if ( sub ) {
if ( sub ) {
/* hold a ref to to_topic for this forwarding subscription */
/* hold a ref to to_topic for this forwarding subscription */
ao2_ref ( to_topic , + 1 ) ;
ao2_ref ( to_topic , + 1 ) ;