diff --git a/main/threadpool.c b/main/threadpool.c index 6b412d27f9..229528c030 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -168,7 +168,7 @@ static void *worker_start(void *arg); static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool); static int worker_thread_start(struct worker_thread *worker); static int worker_idle(struct worker_thread *worker); -static void worker_set_state(struct worker_thread *worker, enum worker_state state); +static int worker_set_state(struct worker_thread *worker, enum worker_state state); static void worker_shutdown(struct worker_thread *worker); /*! @@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags) worker->id); return 0; } - worker_set_state(worker, ALIVE); + + if (worker_set_state(worker, ALIVE)) { + ast_debug(1, "Failed to activate thread %d. It is dead\n", + worker->id); + /* The worker thread will no longer exist in the active threads or + * idle threads container after this. + */ + ao2_unlink(pool->active_threads, worker); + } + return CMP_MATCH; } @@ -538,20 +547,33 @@ static int queued_task_pushed(void *data) struct task_pushed_data *tpd = data; struct ast_threadpool *pool = tpd->pool; int was_empty = tpd->was_empty; + unsigned int existing_active; if (pool->listener && pool->listener->callbacks->task_pushed) { pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); } - if (ao2_container_count(pool->idle_threads) == 0) { + + existing_active = ao2_container_count(pool->active_threads); + + /* The first pass transitions any existing idle threads to be active, and + * will also remove any worker threads that have recently entered the dead + * state. + */ + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); + + /* If no idle threads could be transitioned to active grow the pool as permitted. */ + if (ao2_container_count(pool->active_threads) == existing_active) { if (!pool->options.auto_increment) { + ao2_ref(tpd, -1); return 0; } grow(pool, pool->options.auto_increment); + /* An optional second pass transitions any newly added threads. */ + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); } - ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, - activate_thread, pool); - threadpool_send_state_changed(pool); ao2_ref(tpd, -1); return 0; @@ -797,7 +819,7 @@ static int queued_set_size(void *data) /* We don't count zombie threads as being "live" when potentially resizing */ unsigned int current_size = ao2_container_count(pool->active_threads) + - ao2_container_count(pool->idle_threads); + ao2_container_count(pool->idle_threads); if (current_size == num_threads) { ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n", @@ -806,6 +828,12 @@ static int queued_set_size(void *data) } if (current_size < num_threads) { + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, + activate_thread, pool); + + /* As the above may have altered the number of current threads update it */ + current_size = ao2_container_count(pool->active_threads) + + ao2_container_count(pool->idle_threads); grow(pool, num_threads - current_size); ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, activate_thread, pool); @@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker) * * The threadpool calls into this function in order to let a worker know * how it should proceed. + * + * \retval -1 failure (state transition not permitted) + * \retval 0 success */ -static void worker_set_state(struct worker_thread *worker, enum worker_state state) +static int worker_set_state(struct worker_thread *worker, enum worker_state state) { SCOPED_MUTEX(lock, &worker->lock); + + switch (state) { + case ALIVE: + /* This can occur due to a race condition between being told to go active + * and an idle timeout happening. + */ + if (worker->state == DEAD) { + return -1; + } + ast_assert(worker->state != ZOMBIE); + break; + case DEAD: + break; + case ZOMBIE: + ast_assert(worker->state != DEAD); + break; + } + worker->state = state; worker->wake_up = 1; ast_cond_signal(&worker->cond); + + return 0; } struct serializer { diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 79b369d941..6937c477b4 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -571,6 +571,87 @@ end: return res; } +AST_TEST_DEFINE(threadpool_thread_timeout_thrash) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 1, + .auto_increment = 1, + .initial_size = 0, + .max_size = 1, + }; + int iteration; + + switch (cmd) { + case TEST_INIT: + info->name = "thread_timeout_thrash"; + info->category = "/main/threadpool/"; + info->summary = "Thrash threadpool thread timeout"; + info->description = + "Repeatedly queue a task when a threadpool thread should timeout."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + ast_threadpool_set_size(pool, 1); + + for (iteration = 0; iteration < 30; ++iteration) { + struct simple_task_data *std = NULL; + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + options.idle_timeout, + .tv_nsec = start.tv_usec * 1000 + }; + + std = simple_task_data_alloc(); + if (!std) { + goto end; + } + + /* Wait until the threadpool thread should timeout due to being idle */ + ast_mutex_lock(&tld->lock); + while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) { + /* This purposely left empty as we want to loop waiting for a time out */ + } + ast_mutex_unlock(&tld->lock); + + ast_threadpool_push(pool, simple_task, std); + } + + res = wait_until_thread_state(test, tld, 0, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 1, 30, 0, 0, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return res; +} + AST_TEST_DEFINE(threadpool_one_task_one_thread) { struct ast_threadpool *pool = NULL; @@ -1610,6 +1691,7 @@ static int unload_module(void) ast_test_unregister(threadpool_thread_creation); ast_test_unregister(threadpool_thread_destruction); ast_test_unregister(threadpool_thread_timeout); + ast_test_unregister(threadpool_thread_timeout_thrash); ast_test_unregister(threadpool_one_task_one_thread); ast_test_unregister(threadpool_one_thread_one_task); ast_test_unregister(threadpool_one_thread_multiple_tasks); @@ -1630,6 +1712,7 @@ static int load_module(void) ast_test_register(threadpool_thread_creation); ast_test_register(threadpool_thread_destruction); ast_test_register(threadpool_thread_timeout); + ast_test_register(threadpool_thread_timeout_thrash); ast_test_register(threadpool_one_task_one_thread); ast_test_register(threadpool_one_thread_one_task); ast_test_register(threadpool_one_thread_multiple_tasks);