@ -370,6 +370,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
return topic - > name ;
return topic - > name ;
}
}
size_t stasis_topic_subscribers ( const struct stasis_topic * topic )
{
return AST_VECTOR_SIZE ( & topic - > subscribers ) ;
}
/*! \internal */
/*! \internal */
struct stasis_subscription {
struct stasis_subscription {
/*! Unique ID for this subscription */
/*! Unique ID for this subscription */
@ -391,6 +396,11 @@ struct stasis_subscription {
/*! Flag set when final message for sub has been processed.
/*! Flag set when final message for sub has been processed.
* Be sure join_lock is held before reading / setting . */
* Be sure join_lock is held before reading / setting . */
int final_message_processed ;
int final_message_processed ;
/*! The message types this subscription is accepting */
AST_VECTOR ( , char ) accepted_message_types ;
/*! The message filter currently in use */
enum stasis_subscription_message_filter filter ;
} ;
} ;
static void subscription_dtor ( void * obj )
static void subscription_dtor ( void * obj )
@ -409,6 +419,8 @@ static void subscription_dtor(void *obj)
ast_taskprocessor_unreference ( sub - > mailbox ) ;
ast_taskprocessor_unreference ( sub - > mailbox ) ;
sub - > mailbox = NULL ;
sub - > mailbox = NULL ;
ast_cond_destroy ( & sub - > join_cond ) ;
ast_cond_destroy ( & sub - > join_cond ) ;
AST_VECTOR_FREE ( & sub - > accepted_message_types ) ;
}
}
/*!
/*!
@ -420,19 +432,25 @@ static void subscription_dtor(void *obj)
static void subscription_invoke ( struct stasis_subscription * sub ,
static void subscription_invoke ( struct stasis_subscription * sub ,
struct stasis_message * message )
struct stasis_message * message )
{
{
unsigned int final = stasis_subscription_final_message ( sub , message ) ;
int message_type_id = stasis_message_type_id ( stasis_subscription_change_type ( ) ) ;
/* Notify that the final message has been received */
/* Notify that the final message has been received */
if ( stasis_subscription_final_message ( sub , message ) ) {
if ( final ) {
ao2_lock ( sub ) ;
ao2_lock ( sub ) ;
sub - > final_message_rxed = 1 ;
sub - > final_message_rxed = 1 ;
ast_cond_signal ( & sub - > join_cond ) ;
ast_cond_signal ( & sub - > join_cond ) ;
ao2_unlock ( sub ) ;
ao2_unlock ( sub ) ;
}
}
/* Since sub is mostly immutable, no need to lock sub */
if ( ! final | | sub - > filter ! = STASIS_SUBSCRIPTION_FILTER_SELECTIVE | |
sub - > callback ( sub - > data , sub , message ) ;
( message_type_id < AST_VECTOR_SIZE ( & sub - > accepted_message_types ) & & AST_VECTOR_GET ( & sub - > accepted_message_types , message_type_id ) ) ) {
/* Since sub is mostly immutable, no need to lock sub */
sub - > callback ( sub - > data , sub , message ) ;
}
/* Notify that the final message has been processed */
/* Notify that the final message has been processed */
if ( stasis_subscription_final_message ( sub , message ) ) {
if ( final ) {
ao2_lock ( sub ) ;
ao2_lock ( sub ) ;
sub - > final_message_processed = 1 ;
sub - > final_message_processed = 1 ;
ast_cond_signal ( & sub - > join_cond ) ;
ast_cond_signal ( & sub - > join_cond ) ;
@ -500,6 +518,8 @@ struct stasis_subscription *internal_stasis_subscribe(
sub - > callback = callback ;
sub - > callback = callback ;
sub - > data = data ;
sub - > data = data ;
ast_cond_init ( & sub - > join_cond , NULL ) ;
ast_cond_init ( & sub - > join_cond , NULL ) ;
sub - > filter = STASIS_SUBSCRIPTION_FILTER_NONE ;
AST_VECTOR_INIT ( & sub - > accepted_message_types , 0 ) ;
if ( topic_add_subscription ( topic , sub ) ! = 0 ) {
if ( topic_add_subscription ( topic , sub ) ! = 0 ) {
ao2_ref ( sub , - 1 ) ;
ao2_ref ( sub , - 1 ) ;
@ -586,6 +606,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
return res ;
return res ;
}
}
int stasis_subscription_accept_message_type ( struct stasis_subscription * subscription ,
const struct stasis_message_type * type )
{
if ( ! subscription ) {
return - 1 ;
}
ast_assert ( type ! = NULL ) ;
ast_assert ( stasis_message_type_name ( type ) ! = NULL ) ;
if ( ! type | | ! stasis_message_type_name ( type ) ) {
/* Filtering is unreliable as this message type is not yet initialized
* so force all messages through .
*/
subscription - > filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE ;
return 0 ;
}
ao2_lock ( subscription - > topic ) ;
if ( AST_VECTOR_REPLACE ( & subscription - > accepted_message_types , stasis_message_type_id ( type ) , 1 ) ) {
/* We do this for the same reason as above. The subscription can still operate, so allow
* it to do so by forcing all messages through .
*/
subscription - > filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE ;
}
ao2_unlock ( subscription - > topic ) ;
return 0 ;
}
int stasis_subscription_decline_message_type ( struct stasis_subscription * subscription ,
const struct stasis_message_type * type )
{
if ( ! subscription ) {
return - 1 ;
}
ast_assert ( type ! = NULL ) ;
ast_assert ( stasis_message_type_name ( type ) ! = NULL ) ;
if ( ! type | | ! stasis_message_type_name ( type ) ) {
return 0 ;
}
ao2_lock ( subscription - > topic ) ;
if ( stasis_message_type_id ( type ) < AST_VECTOR_SIZE ( & subscription - > accepted_message_types ) ) {
/* The memory is already allocated so this can't fail */
AST_VECTOR_REPLACE ( & subscription - > accepted_message_types , stasis_message_type_id ( type ) , 0 ) ;
}
ao2_unlock ( subscription - > topic ) ;
return 0 ;
}
int stasis_subscription_set_filter ( struct stasis_subscription * subscription ,
enum stasis_subscription_message_filter filter )
{
if ( ! subscription ) {
return - 1 ;
}
ao2_lock ( subscription - > topic ) ;
if ( subscription - > filter ! = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE ) {
subscription - > filter = filter ;
}
ao2_unlock ( subscription - > topic ) ;
return 0 ;
}
void stasis_subscription_join ( struct stasis_subscription * subscription )
void stasis_subscription_join ( struct stasis_subscription * subscription )
{
{
if ( subscription ) {
if ( subscription ) {
@ -781,6 +871,18 @@ static void dispatch_message(struct stasis_subscription *sub,
struct stasis_message * message ,
struct stasis_message * message ,
int synchronous )
int synchronous )
{
{
/* Determine if this subscription is interested in this message. Note that final
* messages are special and are always invoked on the subscription .
*/
if ( sub - > filter = = STASIS_SUBSCRIPTION_FILTER_SELECTIVE ) {
int message_type_id = stasis_message_type_id ( stasis_message_type ( message ) ) ;
if ( ( message_type_id > = AST_VECTOR_SIZE ( & sub - > accepted_message_types ) | |
! AST_VECTOR_GET ( & sub - > accepted_message_types , message_type_id ) ) & &
! stasis_subscription_final_message ( sub , message ) ) {
return ;
}
}
if ( ! sub - > mailbox ) {
if ( ! sub - > mailbox ) {
/* Dispatch directly */
/* Dispatch directly */
subscription_invoke ( sub , message ) ;
subscription_invoke ( sub , message ) ;
@ -840,6 +942,11 @@ static void publish_msg(struct stasis_topic *topic,
ast_assert ( topic ! = NULL ) ;
ast_assert ( topic ! = NULL ) ;
ast_assert ( message ! = NULL ) ;
ast_assert ( message ! = NULL ) ;
/* If there are no subscribers don't bother */
if ( ! stasis_topic_subscribers ( topic ) ) {
return ;
}
/*
/*
* The topic may be unref ' ed by the subscription invocation .
* The topic may be unref ' ed by the subscription invocation .
* Make sure we hold onto a reference while dispatching .
* Make sure we hold onto a reference while dispatching .