@ -76,6 +76,10 @@ struct ast_taskprocessor {
void * local_data ;
/*! \brief Taskprocessor current queue size */
long tps_queue_size ;
/*! \brief Taskprocessor low water clear alert level */
long tps_queue_low ;
/*! \brief Taskprocessor high water alert trigger level */
long tps_queue_high ;
/*! \brief Taskprocessor queue */
AST_LIST_HEAD_NOLOCK ( tps_queue , tps_task ) tps_queue ;
struct ast_taskprocessor_listener * listener ;
@ -85,6 +89,8 @@ struct ast_taskprocessor {
unsigned int executing : 1 ;
/*! Indicates that a high water warning has been issued on this task processor */
unsigned int high_water_warned : 1 ;
/*! Indicates that a high water alert is active on this taskprocessor */
unsigned int high_water_alert : 1 ;
} ;
/*!
@ -121,15 +127,9 @@ static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb ( void * obj , void * arg , int flags ) ;
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
static void tps_taskprocessor_destroy ( void * tps ) ;
/*! \brief CLI <example>taskprocessor ping <blah></example> handler function */
static int tps_ping_handler ( void * datap ) ;
/*! \brief Remove the front task off the taskprocessor queue */
static struct tps_task * tps_taskprocessor_pop ( struct ast_taskprocessor * tps ) ;
static char * cli_tps_ping ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a ) ;
static char * cli_tps_report ( struct ast_cli_entry * e , int cmd , struct ast_cli_args * a ) ;
@ -472,8 +472,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
struct ao2_container * sorted_tps ;
struct ast_taskprocessor * tps ;
struct ao2_iterator iter ;
# define FMT_HEADERS "%-45s %10s %10s %10s \n"
# define FMT_FIELDS "%-45s %10lu %10lu %10lu \n"
# define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s \n"
# define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu \n"
switch ( cmd ) {
case CLI_INIT :
@ -498,7 +498,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_FAILURE ;
}
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Processor " , " Processed " , " In Queue " , " Max Depth " );
ast_cli ( a - > fd , " \n " FMT_HEADERS , " Processor " , " Processed " , " In Queue " , " Max Depth " , " Low water " , " High water " );
tcount = 0 ;
iter = ao2_iterator_init ( sorted_tps , AO2_ITERATOR_UNLINK ) ;
while ( ( tps = ao2_iterator_next ( & iter ) ) ) {
@ -511,7 +511,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
maxqsize = 0 ;
processed = 0 ;
}
ast_cli ( a - > fd , FMT_FIELDS , name , processed , qsize , maxqsize ) ;
ast_cli ( a - > fd , FMT_FIELDS , name , processed , qsize , maxqsize ,
tps - > tps_queue_low , tps - > tps_queue_high ) ;
ast_taskprocessor_unreference ( tps ) ;
+ + tcount ;
}
@ -539,28 +540,106 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
return ! strcasecmp ( lhs - > name , rhsname ) ? CMP_MATCH | CMP_STOP : 0 ;
}
/*! Count of the number of taskprocessors in high water alert. */
static unsigned int tps_alert_count ;
/*! Access protection for tps_alert_count */
AST_RWLOCK_DEFINE_STATIC ( tps_alert_lock ) ;
/*!
* \ internal
* \ brief Add a delta to tps_alert_count with protection .
* \ since 13.10 .0
*
* \ param tps Taskprocessor updating queue water mark alert trigger .
* \ param delta The amount to add to tps_alert_count .
*
* \ return Nothing
*/
static void tps_alert_add ( struct ast_taskprocessor * tps , int delta )
{
unsigned int old ;
ast_rwlock_wrlock ( & tps_alert_lock ) ;
old = tps_alert_count ;
tps_alert_count + = delta ;
if ( DEBUG_ATLEAST ( 3 )
/* and tps_alert_count becomes zero or non-zero */
& & ! old ! = ! tps_alert_count ) {
ast_log ( LOG_DEBUG , " Taskprocessor '%s' %s the high water alert. \n " ,
tps - > name , tps_alert_count ? " triggered " : " cleared " ) ;
}
ast_rwlock_unlock ( & tps_alert_lock ) ;
}
unsigned int ast_taskprocessor_alert_get ( void )
{
unsigned int count ;
ast_rwlock_rdlock ( & tps_alert_lock ) ;
count = tps_alert_count ;
ast_rwlock_unlock ( & tps_alert_lock ) ;
return count ;
}
int ast_taskprocessor_alert_set_levels ( struct ast_taskprocessor * tps , long low_water , long high_water )
{
if ( ! tps | | high_water < 0 | | high_water < low_water ) {
return - 1 ;
}
if ( low_water < 0 ) {
/* Set low water level to 90% of high water level */
low_water = ( high_water * 9 ) / 10 ;
}
ao2_lock ( tps ) ;
tps - > tps_queue_low = low_water ;
tps - > tps_queue_high = high_water ;
if ( tps - > high_water_alert ) {
if ( ! tps - > tps_queue_size | | tps - > tps_queue_size < low_water ) {
/* Update water mark alert immediately */
tps - > high_water_alert = 0 ;
tps_alert_add ( tps , - 1 ) ;
}
} else {
if ( high_water < = tps - > tps_queue_size ) {
/* Update water mark alert immediately */
tps - > high_water_alert = 1 ;
tps_alert_add ( tps , + 1 ) ;
}
}
ao2_unlock ( tps ) ;
return 0 ;
}
/* destroy the taskprocessor */
static void tps_taskprocessor_destroy ( void * tps )
static void tps_taskprocessor_d to r( void * tps )
{
struct ast_taskprocessor * t = tps ;
struct tps_task * task ;
if ( ! tps ) {
ast_log ( LOG_ERROR , " missing taskprocessor \n " ) ;
return ;
while ( ( task = AST_LIST_REMOVE_HEAD ( & t - > tps_queue , list ) ) ) {
tps_task_free ( task ) ;
}
ast_debug ( 1 , " destroying taskprocessor '%s' \n " , t - > name ) ;
/* free it */
t - > tps_queue_size = 0 ;
if ( t - > high_water_alert ) {
t - > high_water_alert = 0 ;
tps_alert_add ( t , - 1 ) ;
}
ast_free ( t - > stats ) ;
t - > stats = NULL ;
ast_free ( ( char * ) t - > name ) ;
if ( t - > listener ) {
ao2_ref ( t - > listener , - 1 ) ;
t - > listener = NULL ;
}
while ( ( task = AST_LIST_REMOVE_HEAD ( & t - > tps_queue , list ) ) ) {
tps_task_free ( task ) ;
}
t - > name = NULL ;
ao2_cleanup ( t - > listener ) ;
t - > listener = NULL ;
}
/* pop the front task and return it */
@ -569,7 +648,11 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
struct tps_task * task ;
if ( ( task = AST_LIST_REMOVE_HEAD ( & tps - > tps_queue , list ) ) ) {
tps - > tps_queue_size - - ;
- - tps - > tps_queue_size ;
if ( tps - > high_water_alert & & tps - > tps_queue_size < = tps - > tps_queue_low ) {
tps - > high_water_alert = 0 ;
tps_alert_add ( tps , - 1 ) ;
}
}
return task ;
}
@ -648,19 +731,22 @@ static void *default_listener_pvt_alloc(void)
static struct ast_taskprocessor * __allocate_taskprocessor ( const char * name , struct ast_taskprocessor_listener * listener )
{
RAII_VAR ( struct ast_taskprocessor * , p ,
ao2_alloc ( sizeof ( * p ) , tps_taskprocessor_destroy ) , ao2_cleanup ) ;
struct ast_taskprocessor * p ;
p = ao2_alloc ( sizeof ( * p ) , tps_taskprocessor_dtor ) ;
if ( ! p ) {
ast_log ( LOG_WARNING , " failed to create taskprocessor '%s' \n " , name ) ;
return NULL ;
}
if ( ! ( p - > stats = ast_calloc ( 1 , sizeof ( * p - > stats ) ) ) ) {
ast_log ( LOG_WARNING , " failed to create taskprocessor stats for '%s' \n " , name ) ;
return NULL ;
}
if ( ! ( p - > name = ast_strdup ( name ) ) ) {
/* Set default congestion water level alert triggers. */
p - > tps_queue_low = ( AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9 ) / 10 ;
p - > tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL ;
p - > stats = ast_calloc ( 1 , sizeof ( * p - > stats ) ) ;
p - > name = ast_strdup ( name ) ;
if ( ! p - > stats | | ! p - > name ) {
ao2_ref ( p , - 1 ) ;
return NULL ;
}
@ -675,22 +761,18 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
if ( ! ( ao2_link ( tps_singletons , p ) ) ) {
ast_log ( LOG_ERROR , " Failed to add taskprocessor '%s' to container \n " , p - > name ) ;
listener - > tps = NULL ;
ao2_ref ( p , - 1 ) ;
ao2_ref ( p , - 2 ) ;
return NULL ;
}
if ( p - > listener - > callbacks - > start ( p - > listener ) ) {
ast_log ( LOG_ERROR , " Unable to start taskprocessor listener for taskprocessor %s \n " , p - > name ) ;
ast_log ( LOG_ERROR , " Unable to start taskprocessor listener for taskprocessor %s \n " ,
p - > name ) ;
ast_taskprocessor_unreference ( p ) ;
return NULL ;
}
/* RAII_VAR will decrement the refcount at the end of the function.
* Since we want to pass back a reference to p , we bump the refcount
*/
ao2_ref ( p , + 1 ) ;
return p ;
}
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
@ -799,10 +881,16 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
AST_LIST_INSERT_TAIL ( & tps - > tps_queue , t , list ) ;
previous_size = tps - > tps_queue_size + + ;
if ( previous_size > = AST_TASKPROCESSOR_HIGH_WATER_LEVEL & & ! tps - > high_water_warned ) {
ast_log ( LOG_WARNING , " The '%s' task processor queue reached %d scheduled tasks. \n " ,
tps - > name , previous_size ) ;
tps - > high_water_warned = 1 ;
if ( previous_size > = tps - > tps_queue_high ) {
if ( ! tps - > high_water_warned ) {
tps - > high_water_warned = 1 ;
ast_log ( LOG_WARNING , " The '%s' task processor queue reached %d scheduled tasks. \n " ,
tps - > name , previous_size ) ;
}
if ( ! tps - > high_water_alert ) {
tps - > high_water_alert = 1 ;
tps_alert_add ( tps , + 1 ) ;
}
}
/* The currently executing task counts as still in queue */