@ -510,7 +510,7 @@ static void grow(struct ast_threadpool *pool, int delta)
if ( ! worker ) {
return ;
}
if ( ao2_link ( pool - > activ e_threads, worker ) ) {
if ( ao2_link ( pool - > idl e_threads, worker ) ) {
if ( worker_thread_start ( worker ) ) {
ast_log ( LOG_ERROR , " Unable to start worker thread %d. Destroying. \n " , worker - > id ) ;
ao2_unlink ( pool - > active_threads , worker ) ;
@ -536,24 +536,21 @@ static int queued_task_pushed(void *data)
struct task_pushed_data * tpd = data ;
struct ast_threadpool * pool = tpd - > pool ;
int was_empty = tpd - > was_empty ;
int state_changed ;
if ( pool - > listener & & pool - > listener - > callbacks - > task_pushed ) {
pool - > listener - > callbacks - > task_pushed ( pool , pool - > listener , was_empty ) ;
}
if ( ao2_container_count ( pool - > idle_threads ) = = 0 ) {
if ( pool - > options . auto_increment > 0 ) {
grow ( pool , pool - > options . auto_increment ) ;
state_changed = 1 ;
if ( ! pool - > options . auto_increment ) {
return 0 ;
}
} else {
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA ,
activate_thread , pool ) ;
state_changed = 1 ;
}
if ( state_changed ) {
threadpool_send_state_changed ( pool ) ;
grow ( pool , pool - > options . auto_increment ) ;
}
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA ,
activate_thread , pool ) ;
threadpool_send_state_changed ( pool ) ;
ao2_ref ( tpd , - 1 ) ;
return 0 ;
}
@ -808,6 +805,8 @@ static int queued_set_size(void *data)
if ( current_size < num_threads ) {
grow ( pool , num_threads - current_size ) ;
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE ,
activate_thread , pool ) ;
} else {
shrink ( pool , current_size - num_threads ) ;
}
@ -986,7 +985,31 @@ static void *worker_start(void *arg)
if ( worker - > options . thread_start ) {
worker - > options . thread_start ( ) ;
}
worker_active ( worker ) ;
ast_mutex_lock ( & worker - > lock ) ;
while ( worker_idle ( worker ) ) {
ast_mutex_unlock ( & worker - > lock ) ;
worker_active ( worker ) ;
ast_mutex_lock ( & worker - > lock ) ;
if ( worker - > state ! = ALIVE ) {
break ;
}
threadpool_active_thread_idle ( worker - > pool , worker ) ;
}
ast_mutex_unlock ( & worker - > lock ) ;
/* Reaching this portion means the thread is
* on death ' s door . It may have been killed while
* it was idle , in which case it can just die
* peacefully . If it ' s a zombie , though , then
* it needs to let the pool know so
* that the thread can be removed from the
* list of zombie threads .
*/
if ( worker - > state = = ZOMBIE ) {
threadpool_zombie_thread_dead ( worker - > pool , worker ) ;
}
if ( worker - > options . thread_end ) {
worker - > options . thread_end ( ) ;
}
@ -1035,24 +1058,19 @@ static int worker_thread_start(struct worker_thread *worker)
*/
static void worker_active ( struct worker_thread * worker )
{
int alive = 1 ;
while ( alive ) {
if ( ! threadpool_execute ( worker - > pool ) ) {
alive = worker_idle ( worker ) ;
}
}
int alive ;
/* Reaching this portion means the thread is
* on death ' s door . It may have been killed while
* it was idle , in which case it can just die
* peacefully . If it ' s a zombie , though , then
* it needs to let the pool know so
* that the thread can be removed from the
* list of zombie threads .
/* The following is equivalent to
*
* while ( threadpool_execute ( worker - > pool ) ) ;
*
* However, reviewers have suggested in the past
* doing that can cause optimizers to ( wrongly )
* optimize the code away .
*/
if ( worker - > state = = ZOMBIE ) {
threadpool_zombie_thread_dead ( worker - > pool , worker ) ;
}
do {
alive = threadpool_execute ( worker - > pool ) ;
} while ( alive ) ;
}
/*!
@ -1061,6 +1079,8 @@ static void worker_active(struct worker_thread *worker)
* The worker waits here until it gets told by the threadpool
* to wake up .
*
* worker is locked before entering this function .
*
* \ param worker The idle worker
* \ retval 0 The thread is being woken up so that it can conclude .
* \ retval non - zero The thread is being woken up to do more work .
@ -1072,15 +1092,10 @@ static int worker_idle(struct worker_thread *worker)
. tv_sec = start . tv_sec + worker - > options . idle_timeout ,
. tv_nsec = start . tv_usec * 1000 ,
} ;
SCOPED_MUTEX ( lock , & worker - > lock ) ;
if ( worker - > state ! = ALIVE ) {
return 0 ;
}
threadpool_active_thread_idle ( worker - > pool , worker ) ;
while ( ! worker - > wake_up ) {
if ( worker - > options . idle_timeout < = 0 ) {
ast_cond_wait ( & worker - > cond , lock ) ;
} else if ( ast_cond_timedwait ( & worker - > cond , lock , & end ) = = ETIMEDOUT ) {
ast_cond_wait ( & worker - > cond , & worker - > lock ) ;
} else if ( ast_cond_timedwait ( & worker - > cond , & worker - > lock , & end ) = = ETIMEDOUT ) {
break ;
}
}