@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages)
return AST_TEST_PASS ;
}
AST_TEST_DEFINE ( subscription_pool_messages )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_subscription * , uut , NULL , stasis_unsubscribe ) ;
RAII_VAR ( char * , test_data , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer , NULL , ao2_cleanup ) ;
RAII_VAR ( char * , expected_uniqueid , NULL , ast_free ) ;
int complete ;
struct stasis_subscription_change * change ;
switch ( cmd ) {
case TEST_INIT :
info - > name = __func__ ;
info - > category = test_category ;
info - > summary = " Test subscribe/unsubscribe messages using a threadpool subscription " ;
info - > description = " Test subscribe/unsubscribe messages using a threadpool subscription " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
topic = stasis_topic_create ( " TestTopic " ) ;
ast_test_validate ( test , NULL ! = topic ) ;
consumer = consumer_create ( 0 ) ;
ast_test_validate ( test , NULL ! = consumer ) ;
uut = stasis_subscribe_pool ( topic , consumer_exec , consumer ) ;
ast_test_validate ( test , NULL ! = uut ) ;
ao2_ref ( consumer , + 1 ) ;
expected_uniqueid = ast_strdup ( stasis_subscription_uniqueid ( uut ) ) ;
uut = stasis_unsubscribe ( uut ) ;
complete = consumer_wait_for_completion ( consumer ) ;
ast_test_validate ( test , 1 = = complete ) ;
ast_test_validate ( test , 2 = = consumer - > messages_rxed_len ) ;
ast_test_validate ( test , stasis_subscription_change_type ( ) = = stasis_message_type ( consumer - > messages_rxed [ 0 ] ) ) ;
ast_test_validate ( test , stasis_subscription_change_type ( ) = = stasis_message_type ( consumer - > messages_rxed [ 1 ] ) ) ;
change = stasis_message_data ( consumer - > messages_rxed [ 0 ] ) ;
ast_test_validate ( test , topic = = change - > topic ) ;
ast_test_validate ( test , 0 = = strcmp ( " Subscribe " , change - > description ) ) ;
ast_test_validate ( test , 0 = = strcmp ( expected_uniqueid , change - > uniqueid ) ) ;
change = stasis_message_data ( consumer - > messages_rxed [ 1 ] ) ;
ast_test_validate ( test , topic = = change - > topic ) ;
ast_test_validate ( test , 0 = = strcmp ( " Unsubscribe " , change - > description ) ) ;
ast_test_validate ( test , 0 = = strcmp ( expected_uniqueid , change - > uniqueid ) ) ;
return AST_TEST_PASS ;
}
AST_TEST_DEFINE ( publish )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync)
return AST_TEST_PASS ;
}
AST_TEST_DEFINE ( publish_pool )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_subscription * , uut , NULL , stasis_unsubscribe ) ;
RAII_VAR ( char * , test_data , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer , NULL , ao2_cleanup ) ;
int actual_len ;
const char * actual ;
switch ( cmd ) {
case TEST_INIT :
info - > name = __func__ ;
info - > category = test_category ;
info - > summary = " Test publishing with a threadpool " ;
info - > description = " Test publishing to a subscriber whose \n "
" subscription dictates messages are received through a \n "
" threadpool. " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
topic = stasis_topic_create ( " TestTopic " ) ;
ast_test_validate ( test , NULL ! = topic ) ;
consumer = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer ) ;
uut = stasis_subscribe_pool ( topic , consumer_exec , consumer ) ;
ast_test_validate ( test , NULL ! = uut ) ;
ao2_ref ( consumer , + 1 ) ;
test_data = ao2_alloc ( 1 , NULL ) ;
ast_test_validate ( test , NULL ! = test_data ) ;
test_message_type = stasis_message_type_create ( " TestMessage " , NULL ) ;
test_message = stasis_message_create ( test_message_type , test_data ) ;
stasis_publish ( topic , test_message ) ;
actual_len = consumer_wait_for ( consumer , 1 ) ;
ast_test_validate ( test , 1 = = actual_len ) ;
actual = stasis_message_data ( consumer - > messages_rxed [ 0 ] ) ;
ast_test_validate ( test , test_data = = actual ) ;
return AST_TEST_PASS ;
}
AST_TEST_DEFINE ( unsubscribe_stops_messages )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving)
return AST_TEST_PASS ;
}
AST_TEST_DEFINE ( subscription_interleaving )
{
RAII_VAR ( struct stasis_topic * , parent_topic , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_topic * , topic1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_topic * , topic2 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type , NULL , ao2_cleanup ) ;
RAII_VAR ( char * , test_data , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message2 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message3 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_forward * , forward_sub1 , NULL , stasis_forward_cancel ) ;
RAII_VAR ( struct stasis_forward * , forward_sub2 , NULL , stasis_forward_cancel ) ;
RAII_VAR ( struct stasis_subscription * , sub1 , NULL , stasis_unsubscribe ) ;
RAII_VAR ( struct stasis_subscription * , sub2 , NULL , stasis_unsubscribe ) ;
RAII_VAR ( struct consumer * , consumer1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer2 , NULL , ao2_cleanup ) ;
int actual_len ;
switch ( cmd ) {
case TEST_INIT :
info - > name = __func__ ;
info - > category = test_category ;
info - > summary = " Test sending interleaved events to a parent topic with different subscribers " ;
info - > description = " Test sending events to a parent topic. \n "
" This test creates three topics (one parent, two children) \n "
" and publishes messages alternately between the children. \n "
" It verifies that the messages are received in the expected \n "
" order, for different subscription types: one with a dedicated \n "
" thread, the other on the Stasis threadpool. \n " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
test_message_type = stasis_message_type_create ( " test " , NULL ) ;
ast_test_validate ( test , NULL ! = test_message_type ) ;
test_data = ao2_alloc ( 1 , NULL ) ;
ast_test_validate ( test , NULL ! = test_data ) ;
test_message1 = stasis_message_create ( test_message_type , test_data ) ;
ast_test_validate ( test , NULL ! = test_message1 ) ;
test_message2 = stasis_message_create ( test_message_type , test_data ) ;
ast_test_validate ( test , NULL ! = test_message2 ) ;
test_message3 = stasis_message_create ( test_message_type , test_data ) ;
ast_test_validate ( test , NULL ! = test_message3 ) ;
parent_topic = stasis_topic_create ( " ParentTestTopic " ) ;
ast_test_validate ( test , NULL ! = parent_topic ) ;
topic1 = stasis_topic_create ( " Topic1 " ) ;
ast_test_validate ( test , NULL ! = topic1 ) ;
topic2 = stasis_topic_create ( " Topic2 " ) ;
ast_test_validate ( test , NULL ! = topic2 ) ;
forward_sub1 = stasis_forward_all ( topic1 , parent_topic ) ;
ast_test_validate ( test , NULL ! = forward_sub1 ) ;
forward_sub2 = stasis_forward_all ( topic2 , parent_topic ) ;
ast_test_validate ( test , NULL ! = forward_sub2 ) ;
consumer1 = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer1 ) ;
consumer2 = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer2 ) ;
sub1 = stasis_subscribe ( parent_topic , consumer_exec , consumer1 ) ;
ast_test_validate ( test , NULL ! = sub1 ) ;
ao2_ref ( consumer1 , + 1 ) ;
sub2 = stasis_subscribe_pool ( parent_topic , consumer_exec , consumer2 ) ;
ast_test_validate ( test , NULL ! = sub2 ) ;
ao2_ref ( consumer2 , + 1 ) ;
stasis_publish ( topic1 , test_message1 ) ;
stasis_publish ( topic2 , test_message2 ) ;
stasis_publish ( topic1 , test_message3 ) ;
actual_len = consumer_wait_for ( consumer1 , 3 ) ;
ast_test_validate ( test , 3 = = actual_len ) ;
actual_len = consumer_wait_for ( consumer2 , 3 ) ;
ast_test_validate ( test , 3 = = actual_len ) ;
ast_test_validate ( test , test_message1 = = consumer1 - > messages_rxed [ 0 ] ) ;
ast_test_validate ( test , test_message2 = = consumer1 - > messages_rxed [ 1 ] ) ;
ast_test_validate ( test , test_message3 = = consumer1 - > messages_rxed [ 2 ] ) ;
ast_test_validate ( test , test_message1 = = consumer2 - > messages_rxed [ 0 ] ) ;
ast_test_validate ( test , test_message2 = = consumer2 - > messages_rxed [ 1 ] ) ;
ast_test_validate ( test , test_message3 = = consumer2 - > messages_rxed [ 2 ] ) ;
return AST_TEST_PASS ;
}
struct cache_test_data {
char * id ;
char * value ;
@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router)
return AST_TEST_PASS ;
}
AST_TEST_DEFINE ( router_pool )
{
RAII_VAR ( struct stasis_topic * , topic , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_router * , uut , NULL , stasis_message_router_unsubscribe_and_join ) ;
RAII_VAR ( char * , test_data , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type2 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message_type * , test_message_type3 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer2 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct consumer * , consumer3 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message1 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message2 , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , test_message3 , NULL , ao2_cleanup ) ;
int actual_len , ret ;
struct stasis_message * actual ;
switch ( cmd ) {
case TEST_INIT :
info - > name = __func__ ;
info - > category = test_category ;
info - > summary = " Test message routing via threadpool " ;
info - > description = " Test simple message routing when \n "
" the subscriptions dictate usage of the Stasis \n "
" threadpool. \n " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
topic = stasis_topic_create ( " TestTopic " ) ;
ast_test_validate ( test , NULL ! = topic ) ;
consumer1 = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer1 ) ;
consumer2 = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer2 ) ;
consumer3 = consumer_create ( 1 ) ;
ast_test_validate ( test , NULL ! = consumer3 ) ;
test_message_type1 = stasis_message_type_create ( " TestMessage1 " , NULL ) ;
ast_test_validate ( test , NULL ! = test_message_type1 ) ;
test_message_type2 = stasis_message_type_create ( " TestMessage2 " , NULL ) ;
ast_test_validate ( test , NULL ! = test_message_type2 ) ;
test_message_type3 = stasis_message_type_create ( " TestMessage3 " , NULL ) ;
ast_test_validate ( test , NULL ! = test_message_type3 ) ;
uut = stasis_message_router_create_pool ( topic ) ;
ast_test_validate ( test , NULL ! = uut ) ;
ret = stasis_message_router_add (
uut , test_message_type1 , consumer_exec , consumer1 ) ;
ast_test_validate ( test , 0 = = ret ) ;
ao2_ref ( consumer1 , + 1 ) ;
ret = stasis_message_router_add (
uut , test_message_type2 , consumer_exec , consumer2 ) ;
ast_test_validate ( test , 0 = = ret ) ;
ao2_ref ( consumer2 , + 1 ) ;
ret = stasis_message_router_set_default ( uut , consumer_exec , consumer3 ) ;
ast_test_validate ( test , 0 = = ret ) ;
ao2_ref ( consumer3 , + 1 ) ;
test_data = ao2_alloc ( 1 , NULL ) ;
ast_test_validate ( test , NULL ! = test_data ) ;
test_message1 = stasis_message_create ( test_message_type1 , test_data ) ;
ast_test_validate ( test , NULL ! = test_message1 ) ;
test_message2 = stasis_message_create ( test_message_type2 , test_data ) ;
ast_test_validate ( test , NULL ! = test_message2 ) ;
test_message3 = stasis_message_create ( test_message_type3 , test_data ) ;
ast_test_validate ( test , NULL ! = test_message3 ) ;
stasis_publish ( topic , test_message1 ) ;
stasis_publish ( topic , test_message2 ) ;
stasis_publish ( topic , test_message3 ) ;
actual_len = consumer_wait_for ( consumer1 , 1 ) ;
ast_test_validate ( test , 1 = = actual_len ) ;
actual_len = consumer_wait_for ( consumer2 , 1 ) ;
ast_test_validate ( test , 1 = = actual_len ) ;
actual_len = consumer_wait_for ( consumer3 , 1 ) ;
ast_test_validate ( test , 1 = = actual_len ) ;
actual = consumer1 - > messages_rxed [ 0 ] ;
ast_test_validate ( test , test_message1 = = actual ) ;
actual = consumer2 - > messages_rxed [ 0 ] ;
ast_test_validate ( test , test_message2 = = actual ) ;
actual = consumer3 - > messages_rxed [ 0 ] ;
ast_test_validate ( test , test_message3 = = actual ) ;
/* consumer1 and consumer2 do not get the final message. */
ao2_cleanup ( consumer1 ) ;
ao2_cleanup ( consumer2 ) ;
return AST_TEST_PASS ;
}
static const char * cache_simple ( struct stasis_message * message )
{
const char * type_name =
@ -1748,8 +2050,10 @@ static int unload_module(void)
AST_TEST_UNREGISTER ( message_type ) ;
AST_TEST_UNREGISTER ( message ) ;
AST_TEST_UNREGISTER ( subscription_messages ) ;
AST_TEST_UNREGISTER ( subscription_pool_messages ) ;
AST_TEST_UNREGISTER ( publish ) ;
AST_TEST_UNREGISTER ( publish_sync ) ;
AST_TEST_UNREGISTER ( publish_pool ) ;
AST_TEST_UNREGISTER ( unsubscribe_stops_messages ) ;
AST_TEST_UNREGISTER ( forward ) ;
AST_TEST_UNREGISTER ( cache_filter ) ;
@ -1757,8 +2061,10 @@ static int unload_module(void)
AST_TEST_UNREGISTER ( cache_dump ) ;
AST_TEST_UNREGISTER ( cache_eid_aggregate ) ;
AST_TEST_UNREGISTER ( router ) ;
AST_TEST_UNREGISTER ( router_pool ) ;
AST_TEST_UNREGISTER ( router_cache_updates ) ;
AST_TEST_UNREGISTER ( interleaving ) ;
AST_TEST_UNREGISTER ( subscription_interleaving ) ;
AST_TEST_UNREGISTER ( no_to_json ) ;
AST_TEST_UNREGISTER ( to_json ) ;
AST_TEST_UNREGISTER ( no_to_ami ) ;
@ -1773,8 +2079,10 @@ static int load_module(void)
AST_TEST_REGISTER ( message_type ) ;
AST_TEST_REGISTER ( message ) ;
AST_TEST_REGISTER ( subscription_messages ) ;
AST_TEST_REGISTER ( subscription_pool_messages ) ;
AST_TEST_REGISTER ( publish ) ;
AST_TEST_REGISTER ( publish_sync ) ;
AST_TEST_REGISTER ( publish_pool ) ;
AST_TEST_REGISTER ( unsubscribe_stops_messages ) ;
AST_TEST_REGISTER ( forward ) ;
AST_TEST_REGISTER ( cache_filter ) ;
@ -1782,8 +2090,10 @@ static int load_module(void)
AST_TEST_REGISTER ( cache_dump ) ;
AST_TEST_REGISTER ( cache_eid_aggregate ) ;
AST_TEST_REGISTER ( router ) ;
AST_TEST_REGISTER ( router_pool ) ;
AST_TEST_REGISTER ( router_cache_updates ) ;
AST_TEST_REGISTER ( interleaving ) ;
AST_TEST_REGISTER ( subscription_interleaving ) ;
AST_TEST_REGISTER ( no_to_json ) ;
AST_TEST_REGISTER ( to_json ) ;
AST_TEST_REGISTER ( no_to_ami ) ;