@ -31,12 +31,13 @@
# include "asterisk.h"
# include "asterisk/test.h"
# include "asterisk/threadpool.h"
# include "asterisk/module.h"
# include "asterisk/lock.h"
# include "asterisk/astobj2.h"
# include "asterisk/lock.h"
# include "asterisk/logger.h"
# include "asterisk/module.h"
# include "asterisk/taskprocessor.h"
# include "asterisk/test.h"
# include "asterisk/threadpool.h"
struct test_listener_data {
int num_active ;
@ -1124,11 +1125,12 @@ end:
}
struct complex_task_data {
int task_started ;
int task_executed ;
int continue_task ;
ast_mutex_t lock ;
ast_cond_t stall_cond ;
ast_cond_t done _cond;
ast_cond_t notify _cond;
} ;
static struct complex_task_data * complex_task_data_alloc ( void )
@ -1140,7 +1142,7 @@ static struct complex_task_data *complex_task_data_alloc(void)
}
ast_mutex_init ( & ctd - > lock ) ;
ast_cond_init ( & ctd - > stall_cond , NULL ) ;
ast_cond_init ( & ctd - > done _cond, NULL ) ;
ast_cond_init ( & ctd - > notify _cond, NULL ) ;
return ctd ;
}
@ -1148,12 +1150,15 @@ static int complex_task(void *data)
{
struct complex_task_data * ctd = data ;
SCOPED_MUTEX ( lock , & ctd - > lock ) ;
/* Notify that we started */
ctd - > task_started = 1 ;
ast_cond_signal ( & ctd - > notify_cond ) ;
while ( ! ctd - > continue_task ) {
ast_cond_wait ( & ctd - > stall_cond , lock ) ;
}
/* We got poked. Finish up */
ctd - > task_executed = 1 ;
ast_cond_signal ( & ctd - > done _cond) ;
ast_cond_signal ( & ctd - > notify _cond) ;
return 0 ;
}
@ -1164,6 +1169,42 @@ static void poke_worker(struct complex_task_data *ctd)
ast_cond_signal ( & ctd - > stall_cond ) ;
}
static int wait_for_complex_start ( struct complex_task_data * ctd )
{
struct timeval start = ast_tvnow ( ) ;
struct timespec end = {
. tv_sec = start . tv_sec + 5 ,
. tv_nsec = start . tv_usec * 1000
} ;
SCOPED_MUTEX ( lock , & ctd - > lock ) ;
while ( ! ctd - > task_started ) {
if ( ast_cond_timedwait ( & ctd - > notify_cond , lock , & end ) = = ETIMEDOUT ) {
break ;
}
}
return ctd - > task_started ;
}
static int has_complex_started ( struct complex_task_data * ctd )
{
struct timeval start = ast_tvnow ( ) ;
struct timespec end = {
. tv_sec = start . tv_sec + 1 ,
. tv_nsec = start . tv_usec * 1000
} ;
SCOPED_MUTEX ( lock , & ctd - > lock ) ;
while ( ! ctd - > task_started ) {
if ( ast_cond_timedwait ( & ctd - > notify_cond , lock , & end ) = = ETIMEDOUT ) {
break ;
}
}
return ctd - > task_started ;
}
static enum ast_test_result_state wait_for_complex_completion ( struct complex_task_data * ctd )
{
struct timeval start = ast_tvnow ( ) ;
@ -1175,7 +1216,7 @@ static enum ast_test_result_state wait_for_complex_completion(struct complex_tas
SCOPED_MUTEX ( lock , & ctd - > lock ) ;
while ( ! ctd - > task_executed ) {
if ( ast_cond_timedwait ( & ctd - > done _cond, lock , & end ) = = ETIMEDOUT ) {
if ( ast_cond_timedwait ( & ctd - > notify _cond, lock , & end ) = = ETIMEDOUT ) {
break ;
}
}
@ -1391,6 +1432,177 @@ end:
return res ;
}
AST_TEST_DEFINE ( threadpool_serializer )
{
int started = 0 ;
int finished = 0 ;
enum ast_test_result_state res = AST_TEST_FAIL ;
struct ast_threadpool * pool = NULL ;
struct ast_taskprocessor * uut = NULL ;
struct complex_task_data * data1 = NULL ;
struct complex_task_data * data2 = NULL ;
struct complex_task_data * data3 = NULL ;
struct ast_threadpool_options options = {
. version = AST_THREADPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. initial_size = 2 ,
. max_size = 0 ,
} ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " threadpool_serializer " ;
info - > category = " /main/threadpool/ " ;
info - > summary = " Test that serializers " ;
info - > description =
" Ensures that tasks enqueued to a serialize execute in sequence. \n " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
pool = ast_threadpool_create ( " threadpool_serializer " , NULL , & options ) ;
if ( ! pool ) {
ast_test_status_update ( test , " Could not create threadpool \n " ) ;
goto end ;
}
uut = ast_threadpool_serializer ( " ser1 " , pool ) ;
data1 = complex_task_data_alloc ( ) ;
data2 = complex_task_data_alloc ( ) ;
data3 = complex_task_data_alloc ( ) ;
if ( ! uut | | ! data1 | | ! data2 | | ! data3 ) {
ast_test_status_update ( test , " Allocation failed \n " ) ;
goto end ;
}
/* This should start right away */
if ( ast_taskprocessor_push ( uut , complex_task , data1 ) ) {
ast_test_status_update ( test , " Failed to enqueue data1 \n " ) ;
goto end ;
}
started = wait_for_complex_start ( data1 ) ;
if ( ! started ) {
ast_test_status_update ( test , " Failed to start data1 \n " ) ;
goto end ;
}
/* This should not start until data 1 is complete */
if ( ast_taskprocessor_push ( uut , complex_task , data2 ) ) {
ast_test_status_update ( test , " Failed to enqueue data2 \n " ) ;
goto end ;
}
started = has_complex_started ( data2 ) ;
if ( started ) {
ast_test_status_update ( test , " data2 started out of order \n " ) ;
goto end ;
}
/* But the free thread in the pool can still run */
if ( ast_threadpool_push ( pool , complex_task , data3 ) ) {
ast_test_status_update ( test , " Failed to enqueue data3 \n " ) ;
}
started = wait_for_complex_start ( data3 ) ;
if ( ! started ) {
ast_test_status_update ( test , " Failed to start data3 \n " ) ;
goto end ;
}
/* Finishing data1 should allow data2 to start */
poke_worker ( data1 ) ;
finished = wait_for_complex_completion ( data1 ) = = AST_TEST_PASS ;
if ( ! finished ) {
ast_test_status_update ( test , " data1 couldn't finish \n " ) ;
goto end ;
}
started = wait_for_complex_start ( data2 ) ;
if ( ! started ) {
ast_test_status_update ( test , " Failed to start data2 \n " ) ;
goto end ;
}
/* Finish up */
poke_worker ( data2 ) ;
finished = wait_for_complex_completion ( data2 ) = = AST_TEST_PASS ;
if ( ! finished ) {
ast_test_status_update ( test , " data2 couldn't finish \n " ) ;
goto end ;
}
poke_worker ( data3 ) ;
finished = wait_for_complex_completion ( data3 ) = = AST_TEST_PASS ;
if ( ! finished ) {
ast_test_status_update ( test , " data3 couldn't finish \n " ) ;
goto end ;
}
res = AST_TEST_PASS ;
end :
poke_worker ( data1 ) ;
poke_worker ( data2 ) ;
poke_worker ( data3 ) ;
ast_taskprocessor_unreference ( uut ) ;
ast_threadpool_shutdown ( pool ) ;
ast_free ( data1 ) ;
ast_free ( data2 ) ;
ast_free ( data3 ) ;
return res ;
}
AST_TEST_DEFINE ( threadpool_serializer_dupe )
{
enum ast_test_result_state res = AST_TEST_FAIL ;
struct ast_threadpool * pool = NULL ;
struct ast_taskprocessor * uut = NULL ;
struct ast_taskprocessor * there_can_be_only_one = NULL ;
struct ast_threadpool_options options = {
. version = AST_THREADPOOL_OPTIONS_VERSION ,
. idle_timeout = 0 ,
. auto_increment = 0 ,
. initial_size = 2 ,
. max_size = 0 ,
} ;
switch ( cmd ) {
case TEST_INIT :
info - > name = " threadpool_serializer_dupe " ;
info - > category = " /main/threadpool/ " ;
info - > summary = " Test that serializers are uniquely named " ;
info - > description =
" Creating two serializers with the same name should \n "
" result in error. \n " ;
return AST_TEST_NOT_RUN ;
case TEST_EXECUTE :
break ;
}
pool = ast_threadpool_create ( " threadpool_serializer " , NULL , & options ) ;
if ( ! pool ) {
ast_test_status_update ( test , " Could not create threadpool \n " ) ;
goto end ;
}
uut = ast_threadpool_serializer ( " highlander " , pool ) ;
if ( ! uut ) {
ast_test_status_update ( test , " Allocation failed \n " ) ;
goto end ;
}
there_can_be_only_one = ast_threadpool_serializer ( " highlander " , pool ) ;
if ( there_can_be_only_one ) {
ast_taskprocessor_unreference ( there_can_be_only_one ) ;
ast_test_status_update ( test , " Duplicate name error \n " ) ;
goto end ;
}
res = AST_TEST_PASS ;
end :
ast_taskprocessor_unreference ( uut ) ;
ast_threadpool_shutdown ( pool ) ;
return res ;
}
static int unload_module ( void )
{
ast_test_unregister ( threadpool_push ) ;
@ -1406,6 +1618,8 @@ static int unload_module(void)
ast_test_unregister ( threadpool_reactivation ) ;
ast_test_unregister ( threadpool_task_distribution ) ;
ast_test_unregister ( threadpool_more_destruction ) ;
ast_test_unregister ( threadpool_serializer ) ;
ast_test_unregister ( threadpool_serializer_dupe ) ;
return 0 ;
}
@ -1424,6 +1638,8 @@ static int load_module(void)
ast_test_register ( threadpool_reactivation ) ;
ast_test_register ( threadpool_task_distribution ) ;
ast_test_register ( threadpool_more_destruction ) ;
ast_test_register ( threadpool_serializer ) ;
ast_test_register ( threadpool_serializer_dupe ) ;
return AST_MODULE_LOAD_SUCCESS ;
}