@ -230,7 +230,7 @@ static int consumer_wait_for_completion(struct consumer *consumer)
{
struct timeval start = ast_tvnow ( ) ;
struct timespec end = {
. tv_sec = start . tv_sec + 3 0 ,
. tv_sec = start . tv_sec + 3 ,
. tv_nsec = start . tv_usec * 1000
} ;
@ -867,7 +867,7 @@ AST_TEST_DEFINE(cache_dump)
AST_TEST_DEFINE ( route_conflicts )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_router * , uut , NULL , stasis_message_router_unsubscribe ) ;
RAII_VAR ( struct stasis_message_router * , uut , NULL , stasis_message_router_unsubscribe _and_join ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer2 , NULL , ao2_cleanup ) ;
@ -913,7 +913,7 @@ AST_TEST_DEFINE(route_conflicts)
AST_TEST_DEFINE ( router )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_router * , uut , NULL , stasis_message_router_unsubscribe ) ;
RAII_VAR ( struct stasis_message_router * , uut , NULL , stasis_message_router_unsubscribe _and_join ) ;
RAII_VAR ( char * , test_data , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type2 , NULL , ao2_cleanup ) ;
@ -1006,6 +1006,126 @@ AST_TEST_DEFINE(router)
return AST_TEST_PASS ;
}
static const char * cache_simple ( struct stasis_message * message ) {
const char * type_name =
stasis_message_type_name ( stasis_message_type ( message ) ) ;
if ( ! ast_begins_with ( type_name , " Cache " ) ) {
return NULL ;
}
return " cached " ;
}
AST_TEST_DEFINE ( router_cache_updates )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_caching_topic * , caching_topic , NULL , stasis_caching_unsubscribe ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type2 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type3 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_router * , uut , NULL , stasis_message_router_unsubscribe_and_join ) ;
RAII_VAR ( char * , test_data , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message2 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message3 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer2 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer3 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , message1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , message2 , NULL , ao2_cleanup ) ;
struct stasis_cache_update * update ;
int actual_len , ret ;
struct stasis_message * actual ;
switch ( cmd ) {
case TEST_INIT :
info - > name = __func__ ;
info - > category = test_category ;
info - > summary = " Test special handling cache_update messages " ;
info - > description = " Test special handling cache_update messages " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
topic = stasis_topic_create ( " TestTopic " ) ;
ast_test_validate ( test , NULL ! = topic ) ;
caching_topic = stasis_caching_topic_create ( topic , cache_simple ) ;
ast_test_validate ( test , NULL ! = caching_topic ) ;
consumer1 = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer1 ) ;
consumer2 = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer2 ) ;
consumer3 = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer3 ) ;
test_message_type1 = stasis_message_type_create ( " Cache1 " , NULL ) ;
ast_test_validate ( test , NULL ! = test_message_type1 ) ;
test_message_type2 = stasis_message_type_create ( " Cache2 " , NULL ) ;
ast_test_validate ( test , NULL ! = test_message_type2 ) ;
test_message_type3 = stasis_message_type_create ( " NonCache " , NULL ) ;
ast_test_validate ( test , NULL ! = test_message_type3 ) ;
uut = stasis_message_router_create (
stasis_caching_get_topic ( caching_topic ) ) ;
ast_test_validate ( test , NULL ! = uut ) ;
ret = stasis_message_router_add_cache_update (
uut , test_message_type1 , consumer_exec , consumer1 ) ;
ast_test_validate ( test , 0 = = ret ) ;
ao2_ref ( consumer1 , + 1 ) ;
ret = stasis_message_router_add (
uut , stasis_cache_update_type ( ) , consumer_exec , consumer2 ) ;
ast_test_validate ( test , 0 = = ret ) ;
ao2_ref ( consumer2 , + 1 ) ;
ret = stasis_message_router_set_default ( uut , consumer_exec , consumer3 ) ;
ast_test_validate ( test , 0 = = ret ) ;
ao2_ref ( consumer3 , + 1 ) ;
test_data = ao2_alloc ( 1 , NULL ) ;
ast_test_validate ( test , NULL ! = test_data ) ;
test_message1 = stasis_message_create ( test_message_type1 , test_data ) ;
ast_test_validate ( test , NULL ! = test_message1 ) ;
test_message2 = stasis_message_create ( test_message_type2 , test_data ) ;
ast_test_validate ( test , NULL ! = test_message2 ) ;
test_message3 = stasis_message_create ( test_message_type3 , test_data ) ;
ast_test_validate ( test , NULL ! = test_message3 ) ;
stasis_publish ( topic , test_message1 ) ;
stasis_publish ( topic , test_message2 ) ;
stasis_publish ( topic , test_message3 ) ;
actual_len = consumer_wait_for ( consumer1 , 1 ) ;
ast_test_validate ( test , 1 = = actual_len ) ;
actual_len = consumer_wait_for ( consumer2 , 1 ) ;
ast_test_validate ( test , 1 = = actual_len ) ;
actual_len = consumer_wait_for ( consumer3 , 1 ) ;
ast_test_validate ( test , 1 = = actual_len ) ;
actual = consumer1 - > messages_rxed [ 0 ] ;
ast_test_validate ( test , stasis_cache_update_type ( ) = = stasis_message_type ( actual ) ) ;
update = stasis_message_data ( actual ) ;
ast_test_validate ( test , test_message_type1 = = update - > type ) ;
ast_test_validate ( test , test_message1 = = update - > new_snapshot ) ;
actual = consumer2 - > messages_rxed [ 0 ] ;
ast_test_validate ( test , stasis_cache_update_type ( ) = = stasis_message_type ( actual ) ) ;
update = stasis_message_data ( actual ) ;
ast_test_validate ( test , test_message_type2 = = update - > type ) ;
ast_test_validate ( test , test_message2 = = update - > new_snapshot ) ;
actual = consumer3 - > messages_rxed [ 0 ] ;
ast_test_validate ( test , test_message3 = = actual ) ;
/* consumer1 and consumer2 do not get the final message. */
ao2_cleanup ( consumer1 ) ;
ao2_cleanup ( consumer2 ) ;
return AST_TEST_PASS ;
}
AST_TEST_DEFINE ( no_to_json )
{
RAII_VAR ( struct stasis_message_type * , type , NULL , ao2_cleanup ) ;
@ -1160,6 +1280,7 @@ static int unload_module(void)
AST_TEST_UNREGISTER ( cache_dump ) ;
AST_TEST_UNREGISTER ( route_conflicts ) ;
AST_TEST_UNREGISTER ( router ) ;
AST_TEST_UNREGISTER ( router_cache_updates ) ;
AST_TEST_UNREGISTER ( interleaving ) ;
AST_TEST_UNREGISTER ( no_to_json ) ;
AST_TEST_UNREGISTER ( to_json ) ;
@ -1181,6 +1302,7 @@ static int load_module(void)
AST_TEST_REGISTER ( cache_dump ) ;
AST_TEST_REGISTER ( route_conflicts ) ;
AST_TEST_REGISTER ( router ) ;
AST_TEST_REGISTER ( router_cache_updates ) ;
AST_TEST_REGISTER ( interleaving ) ;
AST_TEST_REGISTER ( no_to_json ) ;
AST_TEST_REGISTER ( to_json ) ;