@ -45,6 +45,11 @@ struct ast_threadpool {
* Idle threads are those that are currenly waiting to run tasks
*/
struct ao2_container * idle_threads ;
/*!
* \ brief The container of zombie threads .
* Zombie threads may be running tasks , but they are scheduled to die soon
*/
struct ao2_container * zombie_threads ;
/*!
* \ brief The main taskprocessor
*
@ -79,7 +84,7 @@ struct ast_threadpool {
* This is done for three main reasons
* 1 ) It ensures that listeners are given an accurate portrayal
* of the threadpool ' s current state . In other words , when a listener
* gets told a count of active and idl e threads , it does not
* gets told a count of active , idle and zomb ie threads , it does not
* need to worry that internal state of the threadpool might be different
* from what it has been told .
* 2 ) It minimizes the locking required in both the threadpool and in
@ -96,7 +101,20 @@ struct ast_threadpool {
enum worker_state {
/*! The worker is either active or idle */
ALIVE ,
/*! The worker has been asked to shut down. */
/*!
* The worker has been asked to shut down but
* may still be in the process of executing tasks .
* This transition happens when the threadpool needs
* to shrink and needs to kill active threads in order
* to do so .
*/
ZOMBIE ,
/*!
* The worker has been asked to shut down . Typically
* only idle threads go to this state directly , but
* active threads may go straight to this state when
* the threadpool is shut down .
*/
DEAD ,
} ;
@ -201,6 +219,41 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool,
ast_taskprocessor_push ( pool - > control_tps , queued_active_thread_idle , pair ) ;
}
/*!
* \ brief Kill a zombie thread
*
* This runs from the threadpool ' s control taskprocessor thread .
*
* \ param data A thread_worker_pair containing the threadpool and the zombie thread
* \ return 0
*/
static int queued_zombie_thread_dead ( void * data )
{
struct thread_worker_pair * pair = data ;
ao2_unlink ( pair - > pool - > zombie_threads , pair - > worker ) ;
threadpool_send_state_changed ( pair - > pool ) ;
ao2_ref ( pair , - 1 ) ;
return 0 ;
}
/*!
* \ brief Queue a task to kill a zombie thread
*
* This is called by a worker thread when it acknowledges that it is time for
* it to die .
*/
static void threadpool_zombie_thread_dead ( struct ast_threadpool * pool ,
struct worker_thread * worker )
{
struct thread_worker_pair * pair = thread_worker_pair_alloc ( pool , worker ) ;
if ( ! pair ) {
return ;
}
ast_taskprocessor_push ( pool - > control_tps , queued_zombie_thread_dead , pair ) ;
}
/*!
* \ brief Execute a task in the threadpool
*
@ -263,6 +316,10 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
if ( ! pool - > idle_threads ) {
return NULL ;
}
pool - > zombie_threads = ao2_container_alloc ( THREAD_BUCKETS , worker_thread_hash , worker_thread_cmp ) ;
if ( ! pool - > zombie_threads ) {
return NULL ;
}
ao2_ref ( pool , + 1 ) ;
return pool ;
@ -413,6 +470,7 @@ static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
ao2_cleanup ( pool - > active_threads ) ;
ao2_cleanup ( pool - > idle_threads ) ;
ao2_cleanup ( pool - > zombie_threads ) ;
}
/*!
@ -459,6 +517,7 @@ static void grow(struct ast_threadpool *pool, int delta)
return ;
}
ao2_link ( pool - > active_threads , worker ) ;
ao2_ref ( worker , - 1 ) ;
}
}
@ -478,7 +537,46 @@ static int kill_threads(void *obj, void *arg, int flags)
{
int * num_to_kill = arg ;
if ( ( * num_to_kill ) - - > 0 ) {
ast_log ( LOG_NOTICE , " num to kill is %d \n " , * num_to_kill ) ;
if ( * num_to_kill > 0 ) {
- - ( * num_to_kill ) ;
ast_log ( LOG_NOTICE , " Should be killing a thread \n " ) ;
return CMP_MATCH ;
} else {
return CMP_STOP ;
}
}
/*!
* \ brief ao2 callback to zombify a set number of threads .
*
* Threads will be zombified as long as as the counter has not reached
* zero . The counter is decremented with each thread that is zombified .
*
* Zombifying a thread involves removing it from its current container ,
* adding it to the zombie container , and changing the state of the
* worker to a zombie
*
* This callback is called from the threadpool control taskprocessor thread .
*
* \ param obj The worker thread that may be zombified
* \ param arg The pool to which the worker belongs
* \ param data The counter
* \ param flags Unused
* \ retval CMP_MATCH The zombified thread should be removed from its current container
* \ retval CMP_STOP Stop attempting to zombify threads
*/
static int zombify_threads ( void * obj , void * arg , void * data , int flags )
{
struct worker_thread * worker = obj ;
struct ast_threadpool * pool = arg ;
int * num_to_zombify = data ;
if ( ( * num_to_zombify ) - - > 0 ) {
ast_log ( LOG_NOTICE , " Should be zombifying a thread \n " ) ;
ao2_link ( pool - > zombie_threads , worker ) ;
worker_set_state ( worker , ZOMBIE ) ;
return CMP_MATCH ;
} else {
return CMP_STOP ;
@ -490,7 +588,7 @@ static int kill_threads(void *obj, void *arg, int flags)
*
* The preference is to kill idle threads . However , if there are
* more threads to remove than there are idle threads , then active
* threads will be removed too .
* threads will be zombified instead .
*
* This function is called from the threadpool control taskprocessor thread .
*
@ -499,15 +597,21 @@ static int kill_threads(void *obj, void *arg, int flags)
*/
static void shrink ( struct ast_threadpool * pool , int delta )
{
/*
* Preference is to kill idle threads , but
* we ' ll move on to deactivating active threads
* if we have to
*/
int idle_threads = ao2_container_count ( pool - > idle_threads ) ;
int idle_threads_to_kill = MIN ( delta , idle_threads ) ;
int active_threads_to_kill = delta - idle_threads_to_kill ;
int active_threads_to_ zombify = delta - idle_threads_to_kill ;
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK ,
ast_log ( LOG_NOTICE , " Going to kill off %d idle threads \n " , idle_threads_to_kill ) ;
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE ,
kill_threads , & idle_threads_to_kill ) ;
ao2_callback ( pool - > active_threads , OBJ_UNLINK | OBJ_NOLOCK ,
kill_threads, & active_threads_to_kill ) ;
ao2_callback _data ( pool - > active_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE ,
zombify_threads, pool , & active_threads_to_zombify ) ;
}
/*!
@ -553,20 +657,22 @@ static int queued_set_size(void *data)
{
struct set_size_data * ssd = data ;
struct ast_threadpool * pool = ssd - > pool ;
unsigned int new_size = ssd - > size ;
unsigned int num_threads = ssd - > size ;
/* We don't count zombie threads as being "live when potentially resizing */
unsigned int current_size = ao2_container_count ( pool - > active_threads ) +
ao2_container_count ( pool - > idle_threads ) ;
if ( current_size = = n ew_size ) {
if ( current_size = = n um_threads ) {
ast_log ( LOG_NOTICE , " Not changing threadpool size since new size %u is the same as current %u \n " ,
n ew_size , current_size ) ;
n um_threads , current_size ) ;
return 0 ;
}
if ( current_size < n ew_size ) {
grow ( pool , n ew_size - current_size ) ;
if ( current_size < n um_threads ) {
grow ( pool , n um_threads - current_size ) ;
} else {
shrink ( pool , current_size - n ew_size ) ;
shrink ( pool , current_size - n um_threads ) ;
}
threadpool_send_state_changed ( pool ) ;
@ -788,6 +894,18 @@ static void worker_active(struct worker_thread *worker)
alive = worker_idle ( worker ) ;
}
}
/* Reaching this portion means the thread is
* on death ' s door . It may have been killed while
* it was idle , in which case it can just die
* peacefully . If it ' s a zombie , though , then
* it needs to let the pool know so
* that the thread can be removed from the
* list of zombie threads .
*/
if ( worker - > state = = ZOMBIE ) {
threadpool_zombie_thread_dead ( worker - > pool , worker ) ;
}
}
/*!