@ -83,6 +83,7 @@ struct ast_taskprocessor {
AST_LIST_HEAD_NOLOCK ( tps_queue , tps_task ) tps_queue ;
/*! \brief Taskprocessor singleton list entry */
AST_LIST_ENTRY ( ast_taskprocessor ) list ;
struct ast_taskprocessor_listener * listener ;
} ;
# define TPS_MAX_BUCKETS 7
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
@ -122,6 +123,83 @@ static struct ast_cli_entry taskprocessor_clis[] = {
AST_CLI_DEFINE ( cli_tps_report , " List instantiated task processors and statistics " ) ,
} ;
struct default_taskprocessor_listener_pvt {
pthread_t poll_thread ;
ast_mutex_t lock ;
ast_cond_t cond ;
int wake_up ;
int dead ;
} ;
static void default_tps_wake_up ( struct default_taskprocessor_listener_pvt * pvt , int should_die )
{
SCOPED_MUTEX ( lock , & pvt - > lock ) ;
pvt - > wake_up = 1 ;
pvt - > dead = should_die ;
ast_cond_signal ( & pvt - > cond ) ;
}
static void default_listener_destroy ( void * obj )
{
struct ast_taskprocessor_listener * listener = obj ;
struct default_taskprocessor_listener_pvt * pvt = listener - > private_data ;
default_tps_wake_up ( pvt , 1 ) ;
pthread_join ( pvt - > poll_thread , NULL ) ;
pvt - > poll_thread = AST_PTHREADT_NULL ;
ast_mutex_destroy ( & pvt - > lock ) ;
ast_cond_destroy ( & pvt - > cond ) ;
ast_free ( pvt ) ;
ao2_ref ( listener - > tps , - 1 ) ;
listener - > tps = NULL ;
}
static int default_tps_idle ( struct default_taskprocessor_listener_pvt * pvt )
{
SCOPED_MUTEX ( lock , & pvt - > lock ) ;
while ( ! pvt - > wake_up ) {
ast_cond_wait ( & pvt - > cond , lock ) ;
}
pvt - > wake_up = 0 ;
return pvt - > dead ;
}
/* this is the task processing worker function */
static void * tps_processing_function ( void * data )
{
struct ast_taskprocessor_listener * listener = data ;
struct ast_taskprocessor * tps = listener - > tps ;
struct default_taskprocessor_listener_pvt * pvt = listener - > private_data ;
int dead = 0 ;
while ( ! dead ) {
if ( ! ast_taskprocessor_execute ( tps ) ) {
dead = default_tps_idle ( pvt ) ;
}
}
return NULL ;
}
static void default_task_pushed ( struct ast_taskprocessor_listener * listener , int was_empty )
{
struct default_taskprocessor_listener_pvt * pvt = listener - > private_data ;
if ( was_empty ) {
default_tps_wake_up ( pvt , 0 ) ;
}
}
static void default_emptied ( struct ast_taskprocessor_listener * listener )
{
/* No-op */
}
static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
. task_pushed = default_task_pushed ,
. emptied = default_emptied ,
} ;
/*! \internal \brief Clean up resources on Asterisk shutdown */
static void tps_shutdown ( void )
{
@ -286,75 +364,22 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_SUCCESS ;
}
/* this is the task processing worker function */
static void * tps_processing_function ( void * data )
{
struct ast_taskprocessor * i = data ;
struct tps_task * t ;
int size ;
if ( ! i ) {
ast_log ( LOG_ERROR , " cannot start thread_function loop without a ast_taskprocessor structure. \n " ) ;
return NULL ;
}
while ( i - > poll_thread_run ) {
ast_mutex_lock ( & i - > taskprocessor_lock ) ;
if ( ! i - > poll_thread_run ) {
ast_mutex_unlock ( & i - > taskprocessor_lock ) ;
break ;
}
if ( ! ( size = tps_taskprocessor_depth ( i ) ) ) {
ast_cond_wait ( & i - > poll_cond , & i - > taskprocessor_lock ) ;
if ( ! i - > poll_thread_run ) {
ast_mutex_unlock ( & i - > taskprocessor_lock ) ;
break ;
}
}
ast_mutex_unlock ( & i - > taskprocessor_lock ) ;
/* stuff is in the queue */
if ( ! ( t = tps_taskprocessor_pop ( i ) ) ) {
ast_log ( LOG_ERROR , " Wtf?? %d tasks in the queue, but we're popping blanks! \n " , size ) ;
continue ;
}
if ( ! t - > execute ) {
ast_log ( LOG_WARNING , " Task is missing a function to execute! \n " ) ;
tps_task_free ( t ) ;
continue ;
}
t - > execute ( t - > datap ) ;
ast_mutex_lock ( & i - > taskprocessor_lock ) ;
if ( i - > stats ) {
i - > stats - > _tasks_processed_count + + ;
if ( size > i - > stats - > max_qsize ) {
i - > stats - > max_qsize = size ;
}
}
ast_mutex_unlock ( & i - > taskprocessor_lock ) ;
tps_task_free ( t ) ;
}
while ( ( t = tps_taskprocessor_pop ( i ) ) ) {
tps_task_free ( t ) ;
}
return NULL ;
}
/* hash callback for astobj2 */
static int tps_hash_cb ( const void * obj , const int flags )
{
const struct ast_taskprocessor * tps = obj ;
const char * name = flags & OBJ_KEY ? obj : tps - > name ;
return ast_str_case_hash ( tps- > name) ;
return ast_str_case_hash ( name ) ;
}
/* compare callback for astobj2 */
static int tps_cmp_cb ( void * obj , void * arg , int flags )
{
struct ast_taskprocessor * lhs = obj , * rhs = arg ;
const char * rhsname = flags & OBJ_KEY ? arg : rhs - > name ;
return ! strcasecmp ( lhs - > name , rhs - > name) ? CMP_MATCH | CMP_STOP : 0 ;
return ! strcasecmp ( lhs - > name , rhsname ) ? CMP_MATCH | CMP_STOP : 0 ;
}
/* destroy the taskprocessor */
@ -368,20 +393,21 @@ static void tps_taskprocessor_destroy(void *tps)
}
ast_debug ( 1 , " destroying taskprocessor '%s' \n " , t - > name ) ;
/* kill it */
ast_mutex_lock ( & t - > taskprocessor_lock ) ;
t - > poll_thread_run = 0 ;
ast_cond_signal ( & t - > poll_cond ) ;
ast_mutex_unlock ( & t - > taskprocessor_lock ) ;
pthread_join ( t - > poll_thread , NULL ) ;
t - > poll_thread = AST_PTHREADT_NULL ;
ast_mutex_destroy ( & t - > taskprocessor_lock ) ;
ast_cond_destroy ( & t - > poll_cond ) ;
/* free it */
if ( t - > stats ) {
ast_free ( t - > stats ) ;
t - > stats = NULL ;
}
ast_free ( ( char * ) t - > name ) ;
if ( t - > listener ) {
/* This code should not be reached since the listener
* should have been destroyed before the taskprocessor could
* be destroyed
*/
ao2_ref ( t - > listener , - 1 ) ;
t - > listener = NULL ;
}
}
/* pop the front task and return it */
@ -416,80 +442,120 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
return tps - > name ;
}
static struct ast_taskprocessor_listener * default_listener_alloc ( void )
{
struct ast_taskprocessor_listener * listener ;
struct default_taskprocessor_listener_pvt * pvt ;
listener = ao2_alloc ( sizeof ( * listener ) , default_listener_destroy ) ;
if ( ! listener ) {
return NULL ;
}
pvt = ast_calloc ( 1 , sizeof ( * pvt ) ) ;
if ( ! pvt ) {
ao2_ref ( listener , - 1 ) ;
return NULL ;
}
listener - > callbacks = & default_listener_callbacks ;
listener - > private_data = pvt ;
ast_cond_init ( & pvt - > cond , NULL ) ;
ast_mutex_init ( & pvt - > lock ) ;
pvt - > poll_thread = AST_PTHREADT_NULL ;
if ( ast_pthread_create ( & pvt - > poll_thread , NULL , tps_processing_function , listener ) < 0 ) {
ao2_ref ( listener , - 1 ) ;
return NULL ;
}
return listener ;
}
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
* create the taskprocessor if we were told via ast_tps_options to return a reference only
* if it already exists */
struct ast_taskprocessor * ast_taskprocessor_get ( const char * name , enum ast_tps_options create )
{
struct ast_taskprocessor * p , tmp_tps = {
. name = name ,
} ;
struct ast_taskprocessor * p ;
struct ast_taskprocessor_listener * listener ;
if ( ast_strlen_zero ( name ) ) {
ast_log ( LOG_ERROR , " requesting a nameless taskprocessor!!! \n " ) ;
return NULL ;
}
ao2_lock ( tps_singletons ) ;
p = ao2_find ( tps_singletons , & tmp_tps , OBJ_POINTER ) ;
p = ao2_find ( tps_singletons , name , OBJ_KEY ) ;
if ( p ) {
ao2_unlock ( tps_singletons ) ;
return p ;
}
if ( create & TPS_REF_IF_EXISTS ) {
/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
ao2_unlock ( tps_singletons ) ;
return NULL ;
}
/* create a new taskprocessor */
if ( ! ( p = ao2_alloc ( sizeof ( * p ) , tps_taskprocessor_destroy ) ) ) {
ao2_unlock ( tps_singletons ) ;
/* Create a new taskprocessor. Start by creating a default listener */
listener = default_listener_alloc ( ) ;
p = ast_taskprocessor_create_with_listener ( name , listener ) ;
ao2_ref ( listener , - 1 ) ;
return p ;
}
struct ast_taskprocessor * ast_taskprocessor_create_with_listener ( const char * name , struct ast_taskprocessor_listener * listener )
{
RAII_VAR ( struct ast_taskprocessor * , p ,
ao2_alloc ( sizeof ( * p ) , tps_taskprocessor_destroy ) ,
ao2_cleanup ) ;
if ( ! p ) {
ast_log ( LOG_WARNING , " failed to create taskprocessor '%s' \n " , name ) ;
return NULL ;
}
ast_cond_init ( & p - > poll_cond , NULL ) ;
ast_mutex_init ( & p - > taskprocessor_lock ) ;
if ( ! ( p - > stats = ast_calloc ( 1 , sizeof ( * p - > stats ) ) ) ) {
ao2_unlock ( tps_singletons ) ;
ast_log ( LOG_WARNING , " failed to create taskprocessor stats for '%s' \n " , name ) ;
ao2_ref ( p , - 1 ) ;
return NULL ;
}
if ( ! ( p - > name = ast_strdup ( name ) ) ) {
ao2_unlock ( tps_singletons ) ;
ao2_ref ( p , - 1 ) ;
return NULL ;
}
p - > poll_thread_run = 1 ;
p - > poll_thread = AST_PTHREADT_NULL ;
if ( ast_pthread_create ( & p - > poll_thread , NULL , tps_processing_function , p ) < 0 ) {
ao2_unlock ( tps_singletons ) ;
ast_log ( LOG_ERROR , " Taskprocessor '%s' failed to create the processing thread. \n " , p - > name ) ;
ao2_ref ( p , - 1 ) ;
return NULL ;
}
ao2_ref ( listener , + 1 ) ;
p - > listener = listener ;
ao2_ref ( p , + 1 ) ;
listener - > tps = p ;
if ( ! ( ao2_link ( tps_singletons , p ) ) ) {
ao2_unlock ( tps_singletons ) ;
ast_log ( LOG_ERROR , " Failed to add taskprocessor '%s' to container \n " , p - > name ) ;
ao2_ref ( p , - 1 ) ;
return NULL ;
}
ao2_unlock ( tps_singletons ) ;
/* RAII_VAR will decrement the refcount at the end of the function.
* Since we want to pass back a reference to p , we bump the refcount
*/
ao2_ref ( p , + 1 ) ;
return p ;
}
/* decrement the taskprocessor reference count and unlink from the container if necessary */
void * ast_taskprocessor_unreference ( struct ast_taskprocessor * tps )
{
if ( tps ) {
ao2_lock ( tps_singletons ) ;
ao2_unlink ( tps_singletons , tps ) ;
if ( ao2_ref ( tps , - 1 ) > 1 ) {
ao2_link ( tps_singletons , tps ) ;
}
ao2_unlock ( tps_singletons ) ;
struct ast_taskprocessor_listener * listener ;
if ( ! tps ) {
return NULL ;
}
if ( ao2_ref ( tps , - 1 ) > 3 ) {
return NULL ;
}
/* If we're down to 3 references, then those must be:
* 1. The reference we just got rid of
* 2. The container
* 3. The listener
*/
ao2_unlink ( tps_singletons , tps ) ;
listener = tps - > listener ;
tps - > listener = NULL ;
ao2_ref ( listener , - 1 ) ;
return NULL ;
}
@ -497,6 +563,7 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
int ast_taskprocessor_push ( struct ast_taskprocessor * tps , int ( * task_exe ) ( void * datap ) , void * datap )
{
struct tps_task * t ;
int previous_size ;
if ( ! tps | | ! task_exe ) {
ast_log ( LOG_ERROR , " %s is missing!! \n " , ( tps ) ? " task callback " : " taskprocessor " ) ;
@ -508,9 +575,38 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
}
ast_mutex_lock ( & tps - > taskprocessor_lock ) ;
AST_LIST_INSERT_TAIL ( & tps - > tps_queue , t , list ) ;
tps - > tps_queue_size + + ;
ast_cond_signal ( & tps - > poll_cond ) ;
previous_size = tps - > tps_queue_size + + ;
ast_mutex_unlock ( & tps - > taskprocessor_lock ) ;
tps - > listener - > callbacks - > task_pushed ( tps - > listener , previous_size ? 0 : 1 ) ;
return 0 ;
}
int ast_taskprocessor_execute ( struct ast_taskprocessor * tps )
{
struct tps_task * t ;
int size ;
if ( ! ( t = tps_taskprocessor_pop ( tps ) ) ) {
return 0 ;
}
t - > execute ( t - > datap ) ;
tps_task_free ( t ) ;
ast_mutex_lock ( & tps - > taskprocessor_lock ) ;
size = tps_taskprocessor_depth ( tps ) ;
if ( tps - > stats ) {
tps - > stats - > _tasks_processed_count + + ;
if ( size > tps - > stats - > max_qsize ) {
tps - > stats - > max_qsize = size ;
}
}
ast_mutex_unlock ( & tps - > taskprocessor_lock ) ;
if ( size = = 0 ) {
tps - > listener - > callbacks - > emptied ( tps - > listener ) ;
return 0 ;
}
return 1 ;
}