@ -168,7 +168,7 @@ static void *worker_start(void *arg);
static struct worker_thread * worker_thread_alloc ( struct ast_threadpool * pool ) ;
static int worker_thread_start ( struct worker_thread * worker ) ;
static int worker_idle ( struct worker_thread * worker ) ;
static void worker_set_state ( struct worker_thread * worker , enum worker_state state ) ;
static int worker_set_state ( struct worker_thread * worker , enum worker_state state ) ;
static void worker_shutdown ( struct worker_thread * worker ) ;
/*!
@ -482,7 +482,16 @@ static int activate_thread(void *obj, void *arg, int flags)
worker - > id ) ;
return 0 ;
}
worker_set_state ( worker , ALIVE ) ;
if ( worker_set_state ( worker , ALIVE ) ) {
ast_debug ( 1 , " Failed to activate thread %d. It is dead \n " ,
worker - > id ) ;
/* The worker thread will no longer exist in the active threads or
* idle threads container after this .
*/
ao2_unlink ( pool - > active_threads , worker ) ;
}
return CMP_MATCH ;
}
@ -538,20 +547,33 @@ static int queued_task_pushed(void *data)
struct task_pushed_data * tpd = data ;
struct ast_threadpool * pool = tpd - > pool ;
int was_empty = tpd - > was_empty ;
unsigned int existing_active ;
if ( pool - > listener & & pool - > listener - > callbacks - > task_pushed ) {
pool - > listener - > callbacks - > task_pushed ( pool , pool - > listener , was_empty ) ;
}
if ( ao2_container_count ( pool - > idle_threads ) = = 0 ) {
existing_active = ao2_container_count ( pool - > active_threads ) ;
/* The first pass transitions any existing idle threads to be active, and
* will also remove any worker threads that have recently entered the dead
* state .
*/
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA ,
activate_thread , pool ) ;
/* If no idle threads could be transitioned to active grow the pool as permitted. */
if ( ao2_container_count ( pool - > active_threads ) = = existing_active ) {
if ( ! pool - > options . auto_increment ) {
ao2_ref ( tpd , - 1 ) ;
return 0 ;
}
grow ( pool , pool - > options . auto_increment ) ;
/* An optional second pass transitions any newly added threads. */
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA ,
activate_thread , pool ) ;
}
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA ,
activate_thread , pool ) ;
threadpool_send_state_changed ( pool ) ;
ao2_ref ( tpd , - 1 ) ;
return 0 ;
@ -797,7 +819,7 @@ static int queued_set_size(void *data)
/* We don't count zombie threads as being "live" when potentially resizing */
unsigned int current_size = ao2_container_count ( pool - > active_threads ) +
ao2_container_count ( pool - > idle_threads ) ;
ao2_container_count ( pool - > idle_threads ) ;
if ( current_size = = num_threads ) {
ast_debug ( 3 , " Not changing threadpool size since new size %u is the same as current %u \n " ,
@ -806,6 +828,12 @@ static int queued_set_size(void *data)
}
if ( current_size < num_threads ) {
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE ,
activate_thread , pool ) ;
/* As the above may have altered the number of current threads update it */
current_size = ao2_container_count ( pool - > active_threads ) +
ao2_container_count ( pool - > idle_threads ) ;
grow ( pool , num_threads - current_size ) ;
ao2_callback ( pool - > idle_threads , OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE ,
activate_thread , pool ) ;
@ -1117,13 +1145,36 @@ static int worker_idle(struct worker_thread *worker)
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed .
*
* \ retval - 1 failure ( state transition not permitted )
* \ retval 0 success
*/
static void worker_set_state ( struct worker_thread * worker , enum worker_state state )
static int worker_set_state ( struct worker_thread * worker , enum worker_state state )
{
SCOPED_MUTEX ( lock , & worker - > lock ) ;
switch ( state ) {
case ALIVE :
/* This can occur due to a race condition between being told to go active
* and an idle timeout happening .
*/
if ( worker - > state = = DEAD ) {
return - 1 ;
}
ast_assert ( worker - > state ! = ZOMBIE ) ;
break ;
case DEAD :
break ;
case ZOMBIE :
ast_assert ( worker - > state ! = DEAD ) ;
break ;
}
worker - > state = state ;
worker - > wake_up = 1 ;
ast_cond_signal ( & worker - > cond ) ;
return 0 ;
}
struct serializer {