diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index e18ba6c696..15093175a9 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -177,4 +177,33 @@ int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), vo * \param pool The pool to shut down */ void ast_threadpool_shutdown(struct ast_threadpool *pool); + +/*! + * \brief Serialized execution of tasks within a \ref ast_threadpool. + * + * \since 12.0.0 + * + * A \ref ast_taskprocessor with the same contract as a default taskprocessor + * (tasks execute serially) except instead of executing out of a dedicated + * thread, execution occurs in a thread from a \ref ast_threadpool. Think of it + * as a lightweight thread. + * + * While it guarantees that each task will complete before executing the next, + * there is no guarantee as to which thread from the \c pool individual tasks + * will execute. This normally only matters if your code relys on thread + * specific information, such as thread locals. + * + * Use ast_taskprocessor_unreference() to dispose of the returned \ref + * ast_taskprocessor. + * + * Only a single taskprocessor with a given name may exist. This function will fail + * if a taskprocessor with the given name already exists. + * + * \param name Name of the serializer. (must be unique) + * \param pool \ref ast_threadpool for execution. + * \return \ref ast_taskprocessor for enqueuing work. + * \return \c NULL on error. + */ +struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool); + #endif /* ASTERISK_THREADPOOL_H */ diff --git a/main/taskprocessor.c b/main/taskprocessor.c index b603e57384..35076b06e1 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -75,8 +75,8 @@ struct ast_taskprocessor { /*! \brief Taskprocessor singleton list entry */ AST_LIST_ENTRY(ast_taskprocessor) list; struct ast_taskprocessor_listener *listener; - /*! Indicates if the taskprocessor is in the process of shuting down */ - unsigned int shutting_down:1; + /*! Indicates if the taskprocessor is currently executing a task */ + unsigned int executing:1; }; /*! @@ -197,6 +197,8 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int { struct default_taskprocessor_listener_pvt *pvt = listener->user_data; + ast_assert(!pvt->dead); + if (was_empty) { default_tps_wake_up(pvt, 0); } @@ -447,10 +449,6 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) struct tps_task *task; SCOPED_AO2LOCK(lock, tps); - if (tps->shutting_down) { - return NULL; - } - if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) { tps->tps_queue_size--; } @@ -643,6 +641,7 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void * { struct tps_task *t; int previous_size; + int was_empty; if (!tps || !task_exe) { ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor"); @@ -655,8 +654,10 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void * ao2_lock(tps); AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); previous_size = tps->tps_queue_size++; + /* The currently executing task counts as still in queue */ + was_empty = tps->executing ? 0 : previous_size == 0; ao2_unlock(tps); - tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1); + tps->listener->callbacks->task_pushed(tps->listener, was_empty); return 0; } @@ -665,17 +666,26 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) struct tps_task *t; int size; - if (!(t = tps_taskprocessor_pop(tps))) { - return 0; - } + ao2_lock(tps); + tps->executing = 1; + ao2_unlock(tps); - t->execute(t->datap); + t = tps_taskprocessor_pop(tps); - tps_task_free(t); + if (t) { + t->execute(t->datap); + tps_task_free(t); + } ao2_lock(tps); + /* We need to check size in the same critical section where we reset the + * executing bit. Avoids a race condition where a task is pushed right + * after we pop an empty stack. + */ + tps->executing = 0; size = tps_taskprocessor_depth(tps); - if (tps->stats) { + /* If we executed a task, bump the stats */ + if (t && tps->stats) { tps->stats->_tasks_processed_count++; if (size > tps->stats->max_qsize) { tps->stats->max_qsize = size; @@ -683,9 +693,9 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) } ao2_unlock(tps); - if (size == 0 && tps->listener->callbacks->emptied) { + /* If we executed a task, check for the transition to empty */ + if (t && size == 0 && tps->listener->callbacks->emptied) { tps->listener->callbacks->emptied(tps->listener); - return 0; } - return 1; + return size > 0; } diff --git a/main/threadpool.c b/main/threadpool.c index adaf8a5543..ab390e9d82 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -866,7 +866,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name, if (!pool) { return NULL; } - + tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool); if (!tps_listener) { return NULL; @@ -1103,3 +1103,88 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta ast_cond_signal(&worker->cond); } +struct serializer { + struct ast_threadpool *pool; +}; + +static void serializer_dtor(void *obj) +{ + struct serializer *ser = obj; + ao2_cleanup(ser->pool); + ser->pool = NULL; +} + +static struct serializer *serializer_create(struct ast_threadpool *pool) +{ + struct serializer *ser = ao2_alloc(sizeof(*ser), serializer_dtor); + if (!ser) { + return NULL; + } + ao2_ref(pool, +1); + ser->pool = pool; + return ser; +} + +static int execute_tasks(void *data) +{ + struct ast_taskprocessor *tps = data; + + while (ast_taskprocessor_execute(tps)) { + /* No-op */ + } + + ast_taskprocessor_unreference(tps); + return 0; +} + +static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { + if (was_empty) { + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener); + ast_threadpool_push(ser->pool, execute_tasks, tps); + } +}; + +static int serializer_start(struct ast_taskprocessor_listener *listener) +{ + /* No-op */ + return 0; +} + +static void serializer_shutdown(struct ast_taskprocessor_listener *listener) +{ + struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + ao2_cleanup(ser); +} + +static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = { + .task_pushed = serializer_task_pushed, + .start = serializer_start, + .shutdown = serializer_shutdown, +}; + +struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool) +{ + RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup); + RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup); + struct ast_taskprocessor *tps = NULL; + + ser = serializer_create(pool); + if (!ser) { + return NULL; + } + + listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser); + if (!listener) { + return NULL; + } + ser = NULL; /* ownership transferred to listener */ + + tps = ast_taskprocessor_create_with_listener(name, listener); + if (!tps) { + return NULL; + } + listener = NULL; /* ownership transferred to tps */ + + return tps; +} diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index e370dd78f3..70400a9ec7 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -450,11 +450,193 @@ test_exit: return res; } +struct shutdown_data { + ast_cond_t in; + ast_cond_t out; + ast_mutex_t lock; + int task_complete; + int task_started; + int task_stop_waiting; +}; + +static void shutdown_data_dtor(void *data) +{ + struct shutdown_data *shutdown_data = data; + ast_mutex_destroy(&shutdown_data->lock); + ast_cond_destroy(&shutdown_data->in); + ast_cond_destroy(&shutdown_data->out); +} + +static struct shutdown_data *shutdown_data_create(int dont_wait) +{ + RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup); + + shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor); + if (!shutdown_data) { + return NULL; + } + + ast_mutex_init(&shutdown_data->lock); + ast_cond_init(&shutdown_data->in, NULL); + ast_cond_init(&shutdown_data->out, NULL); + shutdown_data->task_stop_waiting = dont_wait; + ao2_ref(shutdown_data, +1); + return shutdown_data; +} + +static int shutdown_task_exec(void *data) +{ + struct shutdown_data *shutdown_data = data; + SCOPED_MUTEX(lock, &shutdown_data->lock); + shutdown_data->task_started = 1; + ast_cond_signal(&shutdown_data->out); + while (!shutdown_data->task_stop_waiting) { + ast_cond_wait(&shutdown_data->in, &shutdown_data->lock); + } + shutdown_data->task_complete = 1; + ast_cond_signal(&shutdown_data->out); + return 0; +} + +static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &shutdown_data->lock); + + while (!shutdown_data->task_complete) { + if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) { + break; + } + } + + return shutdown_data->task_complete; +} + +static int shutdown_has_completed(struct shutdown_data *shutdown_data) +{ + SCOPED_MUTEX(lock, &shutdown_data->lock); + return shutdown_data->task_complete; +} + +static int shutdown_waitfor_start(struct shutdown_data *shutdown_data) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &shutdown_data->lock); + + while (!shutdown_data->task_started) { + if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) { + break; + } + } + + return shutdown_data->task_started; +} + +static void shutdown_poke(struct shutdown_data *shutdown_data) +{ + SCOPED_MUTEX(lock, &shutdown_data->lock); + shutdown_data->task_stop_waiting = 1; + ast_cond_signal(&shutdown_data->in); +} + +static void *tps_shutdown_thread(void *data) +{ + struct ast_taskprocessor *tps = data; + ast_taskprocessor_unreference(tps); + return NULL; +} + +AST_TEST_DEFINE(taskprocessor_shutdown) +{ + RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference); + RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup); + RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup); + int push_res; + int wait_res; + int pthread_res; + pthread_t shutdown_thread; + + switch (cmd) { + case TEST_INIT: + info->name = "taskprocessor_shutdown"; + info->category = "/main/taskprocessor/"; + info->summary = "Test of taskproccesor shutdown sequence"; + info->description = + "Ensures that all tasks run to completion after the taskprocessor has been unref'ed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT); + task1 = shutdown_data_create(0); /* task1 waits to be poked */ + task2 = shutdown_data_create(1); /* task2 waits for nothing */ + + if (!tps || !task1 || !task2) { + ast_test_status_update(test, "Allocation error\n"); + return AST_TEST_FAIL; + } + + push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1); + if (push_res != 0) { + ast_test_status_update(test, "Could not push task1\n"); + return AST_TEST_FAIL; + } + + push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2); + if (push_res != 0) { + ast_test_status_update(test, "Could not push task2\n"); + return AST_TEST_FAIL; + } + + wait_res = shutdown_waitfor_start(task1); + if (!wait_res) { + ast_test_status_update(test, "Task1 didn't start\n"); + return AST_TEST_FAIL; + } + + pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps); + if (pthread_res != 0) { + ast_test_status_update(test, "Failed to create shutdown thread\n"); + return AST_TEST_FAIL; + } + tps = NULL; + + /* Wakeup task1; it should complete */ + shutdown_poke(task1); + wait_res = shutdown_waitfor_completion(task1); + if (!wait_res) { + ast_test_status_update(test, "Task1 didn't complete\n"); + return AST_TEST_FAIL; + } + + /* Wait for shutdown to complete */ + pthread_join(shutdown_thread, NULL); + + /* Should have also also completed task2 */ + wait_res = shutdown_has_completed(task2); + if (!wait_res) { + ast_test_status_update(test, "Task2 didn't finish\n"); + return AST_TEST_FAIL; + } + + return AST_TEST_PASS; +} + static int unload_module(void) { ast_test_unregister(default_taskprocessor); ast_test_unregister(default_taskprocessor_load); ast_test_unregister(taskprocessor_listener); + ast_test_unregister(taskprocessor_shutdown); return 0; } @@ -463,6 +645,7 @@ static int load_module(void) ast_test_register(default_taskprocessor); ast_test_register(default_taskprocessor_load); ast_test_register(taskprocessor_listener); + ast_test_register(taskprocessor_shutdown); return AST_MODULE_LOAD_SUCCESS; } diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 712b8581b5..79b369d941 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -31,12 +31,13 @@ #include "asterisk.h" -#include "asterisk/test.h" -#include "asterisk/threadpool.h" -#include "asterisk/module.h" -#include "asterisk/lock.h" #include "asterisk/astobj2.h" +#include "asterisk/lock.h" #include "asterisk/logger.h" +#include "asterisk/module.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/test.h" +#include "asterisk/threadpool.h" struct test_listener_data { int num_active; @@ -1124,11 +1125,12 @@ end: } struct complex_task_data { + int task_started; int task_executed; int continue_task; ast_mutex_t lock; ast_cond_t stall_cond; - ast_cond_t done_cond; + ast_cond_t notify_cond; }; static struct complex_task_data *complex_task_data_alloc(void) @@ -1140,7 +1142,7 @@ static struct complex_task_data *complex_task_data_alloc(void) } ast_mutex_init(&ctd->lock); ast_cond_init(&ctd->stall_cond, NULL); - ast_cond_init(&ctd->done_cond, NULL); + ast_cond_init(&ctd->notify_cond, NULL); return ctd; } @@ -1148,12 +1150,15 @@ static int complex_task(void *data) { struct complex_task_data *ctd = data; SCOPED_MUTEX(lock, &ctd->lock); + /* Notify that we started */ + ctd->task_started = 1; + ast_cond_signal(&ctd->notify_cond); while (!ctd->continue_task) { ast_cond_wait(&ctd->stall_cond, lock); } /* We got poked. Finish up */ ctd->task_executed = 1; - ast_cond_signal(&ctd->done_cond); + ast_cond_signal(&ctd->notify_cond); return 0; } @@ -1164,6 +1169,42 @@ static void poke_worker(struct complex_task_data *ctd) ast_cond_signal(&ctd->stall_cond); } +static int wait_for_complex_start(struct complex_task_data *ctd) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &ctd->lock); + + while (!ctd->task_started) { + if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + return ctd->task_started; +} + +static int has_complex_started(struct complex_task_data *ctd) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 1, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &ctd->lock); + + while (!ctd->task_started) { + if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + return ctd->task_started; +} + static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd) { struct timeval start = ast_tvnow(); @@ -1175,7 +1216,7 @@ static enum ast_test_result_state wait_for_complex_completion(struct complex_tas SCOPED_MUTEX(lock, &ctd->lock); while (!ctd->task_executed) { - if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) { + if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) { break; } } @@ -1391,6 +1432,177 @@ end: return res; } +AST_TEST_DEFINE(threadpool_serializer) +{ + int started = 0; + int finished = 0; + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool *pool = NULL; + struct ast_taskprocessor *uut = NULL; + struct complex_task_data *data1 = NULL; + struct complex_task_data *data2 = NULL; + struct complex_task_data *data3 = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 2, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "threadpool_serializer"; + info->category = "/main/threadpool/"; + info->summary = "Test that serializers"; + info->description = + "Ensures that tasks enqueued to a serialize execute in sequence.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + pool = ast_threadpool_create("threadpool_serializer", NULL, &options); + if (!pool) { + ast_test_status_update(test, "Could not create threadpool\n"); + goto end; + } + uut = ast_threadpool_serializer("ser1", pool); + data1 = complex_task_data_alloc(); + data2 = complex_task_data_alloc(); + data3 = complex_task_data_alloc(); + if (!uut || !data1 || !data2 || !data3) { + ast_test_status_update(test, "Allocation failed\n"); + goto end; + } + + /* This should start right away */ + if (ast_taskprocessor_push(uut, complex_task, data1)) { + ast_test_status_update(test, "Failed to enqueue data1\n"); + goto end; + } + started = wait_for_complex_start(data1); + if (!started) { + ast_test_status_update(test, "Failed to start data1\n"); + goto end; + } + + /* This should not start until data 1 is complete */ + if (ast_taskprocessor_push(uut, complex_task, data2)) { + ast_test_status_update(test, "Failed to enqueue data2\n"); + goto end; + } + started = has_complex_started(data2); + if (started) { + ast_test_status_update(test, "data2 started out of order\n"); + goto end; + } + + /* But the free thread in the pool can still run */ + if (ast_threadpool_push(pool, complex_task, data3)) { + ast_test_status_update(test, "Failed to enqueue data3\n"); + } + started = wait_for_complex_start(data3); + if (!started) { + ast_test_status_update(test, "Failed to start data3\n"); + goto end; + } + + /* Finishing data1 should allow data2 to start */ + poke_worker(data1); + finished = wait_for_complex_completion(data1) == AST_TEST_PASS; + if (!finished) { + ast_test_status_update(test, "data1 couldn't finish\n"); + goto end; + } + started = wait_for_complex_start(data2); + if (!started) { + ast_test_status_update(test, "Failed to start data2\n"); + goto end; + } + + /* Finish up */ + poke_worker(data2); + finished = wait_for_complex_completion(data2) == AST_TEST_PASS; + if (!finished) { + ast_test_status_update(test, "data2 couldn't finish\n"); + goto end; + } + poke_worker(data3); + finished = wait_for_complex_completion(data3) == AST_TEST_PASS; + if (!finished) { + ast_test_status_update(test, "data3 couldn't finish\n"); + goto end; + } + + res = AST_TEST_PASS; + +end: + poke_worker(data1); + poke_worker(data2); + poke_worker(data3); + ast_taskprocessor_unreference(uut); + ast_threadpool_shutdown(pool); + ast_free(data1); + ast_free(data2); + ast_free(data3); + return res; +} + +AST_TEST_DEFINE(threadpool_serializer_dupe) +{ + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool *pool = NULL; + struct ast_taskprocessor *uut = NULL; + struct ast_taskprocessor *there_can_be_only_one = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 2, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "threadpool_serializer_dupe"; + info->category = "/main/threadpool/"; + info->summary = "Test that serializers are uniquely named"; + info->description = + "Creating two serializers with the same name should\n" + "result in error.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + pool = ast_threadpool_create("threadpool_serializer", NULL, &options); + if (!pool) { + ast_test_status_update(test, "Could not create threadpool\n"); + goto end; + } + + uut = ast_threadpool_serializer("highlander", pool); + if (!uut) { + ast_test_status_update(test, "Allocation failed\n"); + goto end; + } + + there_can_be_only_one = ast_threadpool_serializer("highlander", pool); + if (there_can_be_only_one) { + ast_taskprocessor_unreference(there_can_be_only_one); + ast_test_status_update(test, "Duplicate name error\n"); + goto end; + } + + res = AST_TEST_PASS; + +end: + ast_taskprocessor_unreference(uut); + ast_threadpool_shutdown(pool); + return res; +} + static int unload_module(void) { ast_test_unregister(threadpool_push); @@ -1406,6 +1618,8 @@ static int unload_module(void) ast_test_unregister(threadpool_reactivation); ast_test_unregister(threadpool_task_distribution); ast_test_unregister(threadpool_more_destruction); + ast_test_unregister(threadpool_serializer); + ast_test_unregister(threadpool_serializer_dupe); return 0; } @@ -1424,6 +1638,8 @@ static int load_module(void) ast_test_register(threadpool_reactivation); ast_test_register(threadpool_task_distribution); ast_test_register(threadpool_more_destruction); + ast_test_register(threadpool_serializer); + ast_test_register(threadpool_serializer_dupe); return AST_MODULE_LOAD_SUCCESS; }