@ -140,7 +140,6 @@ AST_TEST_DEFINE(message)
}
struct consumer {
ast_mutex_t lock ;
ast_cond_t out ;
struct stasis_message * * messages_rxed ;
size_t messages_rxed_len ;
@ -148,10 +147,10 @@ struct consumer {
int complete ;
} ;
static void consumer_dtor ( void * obj ) {
static void consumer_dtor ( void * obj )
{
struct consumer * consumer = obj ;
ast_mutex_destroy ( & consumer - > lock ) ;
ast_cond_destroy ( & consumer - > out ) ;
while ( consumer - > messages_rxed_len > 0 ) {
@ -161,25 +160,24 @@ static void consumer_dtor(void *obj) {
consumer - > messages_rxed = NULL ;
}
static struct consumer * consumer_create ( int ignore_subscriptions ) {
RAII_VAR ( struct consumer * , consumer , NULL , ao2_cleanup ) ;
static struct consumer * consumer_create ( int ignore_subscriptions )
{
struct consumer * consumer ;
consumer = ao2_alloc ( sizeof ( * consumer ) , consumer_dtor ) ;
if ( ! consumer ) {
return NULL ;
}
consumer - > ignore_subscriptions = ignore_subscriptions ;
consumer - > messages_rxed = ast_malloc ( 0 ) ;
consumer - > messages_rxed = ast_malloc ( sizeof ( * consumer - > messages_rxed ) ) ;
if ( ! consumer - > messages_rxed ) {
ao2_cleanup ( consumer ) ;
return NULL ;
}
ast_mutex_init ( & consumer - > lock ) ;
ast_cond_init ( & consumer - > out , NULL ) ;
ao2_ref ( consumer , + 1 ) ;
return consumer ;
}
@ -187,10 +185,9 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
{
struct consumer * consumer = data ;
RAII_VAR ( struct consumer * , consumer_needs_cleanup , NULL , ao2_cleanup ) ;
SCOPED_ MUTEX( lock , & consumer - > lock ) ;
SCOPED_ AO2LOCK( lock , consumer ) ;
if ( ! consumer - > ignore_subscriptions | | stasis_message_type ( message ) ! = stasis_subscription_change_type ( ) ) {
+ + consumer - > messages_rxed_len ;
consumer - > messages_rxed = ast_realloc ( consumer - > messages_rxed , sizeof ( * consumer - > messages_rxed ) * consumer - > messages_rxed_len ) ;
ast_assert ( consumer - > messages_rxed ! = NULL ) ;
@ -210,10 +207,9 @@ static void consumer_exec_sync(void *data, struct stasis_subscription *sub, stru
{
struct consumer * consumer = data ;
RAII_VAR ( struct consumer * , consumer_needs_cleanup , NULL , ao2_cleanup ) ;
SCOPED_ MUTEX( lock , & consumer - > lock ) ;
SCOPED_ AO2LOCK( lock , consumer ) ;
if ( ! consumer - > ignore_subscriptions | | stasis_message_type ( message ) ! = stasis_subscription_change_type ( ) ) {
+ + consumer - > messages_rxed_len ;
consumer - > messages_rxed = ast_realloc ( consumer - > messages_rxed , sizeof ( * consumer - > messages_rxed ) * consumer - > messages_rxed_len ) ;
ast_assert ( consumer - > messages_rxed ! = NULL ) ;
@ -235,10 +231,11 @@ static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
. tv_nsec = start . tv_usec * 1000
} ;
SCOPED_ MUTEX( lock , & consumer - > lock ) ;
SCOPED_ AO2LOCK( lock , consumer ) ;
while ( consumer - > messages_rxed_len < expected_len ) {
int r = ast_cond_timedwait ( & consumer - > out , & consumer - > lock , & end ) ;
int r = ast_cond_timedwait ( & consumer - > out , ao2_object_get_lockaddr ( consumer ) , & end ) ;
if ( r = = ETIMEDOUT ) {
break ;
}
@ -255,10 +252,11 @@ static int consumer_wait_for_completion(struct consumer *consumer)
. tv_nsec = start . tv_usec * 1000
} ;
SCOPED_ MUTEX( lock , & consumer - > lock ) ;
SCOPED_ AO2LOCK( lock , consumer ) ;
while ( ! consumer - > complete ) {
int r = ast_cond_timedwait ( & consumer - > out , & consumer - > lock , & end ) ;
int r = ast_cond_timedwait ( & consumer - > out , ao2_object_get_lockaddr ( consumer ) , & end ) ;
if ( r = = ETIMEDOUT ) {
break ;
}
@ -280,10 +278,11 @@ static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
. tv_nsec = end_tv . tv_usec * 1000
} ;
SCOPED_ MUTEX( lock , & consumer - > lock ) ;
SCOPED_ AO2LOCK( lock , consumer ) ;
while ( consumer - > messages_rxed_len = = expected_len ) {
int r = ast_cond_timedwait ( & consumer - > out , & consumer - > lock , & end ) ;
int r = ast_cond_timedwait ( & consumer - > out , ao2_object_get_lockaddr ( consumer ) , & end ) ;
if ( r = = ETIMEDOUT ) {
break ;
}
@ -669,7 +668,8 @@ static struct stasis_message *cache_test_message_create(struct stasis_message_ty
return stasis_message_create ( type , data ) ;
}
static const char * cache_test_data_id ( struct stasis_message * message ) {
static const char * cache_test_data_id ( struct stasis_message * message )
{
struct cache_test_data * cachable = stasis_message_data ( message ) ;
if ( 0 ! = strcmp ( " Cacheable " , stasis_message_type_name ( stasis_message_type ( message ) ) ) ) {
@ -1033,7 +1033,8 @@ AST_TEST_DEFINE(router)
return AST_TEST_PASS ;
}
static const char * cache_simple ( struct stasis_message * message ) {
static const char * cache_simple ( struct stasis_message * message )
{
const char * type_name =
stasis_message_type_name ( stasis_message_type ( message ) ) ;
if ( ! ast_begins_with ( type_name , " Cache " ) ) {