|
|
|
@ -76,6 +76,10 @@ struct ast_taskprocessor {
|
|
|
|
|
void *local_data;
|
|
|
|
|
/*! \brief Taskprocessor current queue size */
|
|
|
|
|
long tps_queue_size;
|
|
|
|
|
/*! \brief Taskprocessor low water clear alert level */
|
|
|
|
|
long tps_queue_low;
|
|
|
|
|
/*! \brief Taskprocessor high water alert trigger level */
|
|
|
|
|
long tps_queue_high;
|
|
|
|
|
/*! \brief Taskprocessor queue */
|
|
|
|
|
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
|
|
|
|
|
struct ast_taskprocessor_listener *listener;
|
|
|
|
@ -85,6 +89,8 @@ struct ast_taskprocessor {
|
|
|
|
|
unsigned int executing:1;
|
|
|
|
|
/*! Indicates that a high water warning has been issued on this task processor */
|
|
|
|
|
unsigned int high_water_warned:1;
|
|
|
|
|
/*! Indicates that a high water alert is active on this taskprocessor */
|
|
|
|
|
unsigned int high_water_alert:1;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
@ -121,15 +127,9 @@ static int tps_hash_cb(const void *obj, const int flags);
|
|
|
|
|
/*! \brief The astobj2 compare callback for taskprocessors */
|
|
|
|
|
static int tps_cmp_cb(void *obj, void *arg, int flags);
|
|
|
|
|
|
|
|
|
|
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
|
|
|
|
|
static void tps_taskprocessor_destroy(void *tps);
|
|
|
|
|
|
|
|
|
|
/*! \brief CLI <example>taskprocessor ping <blah></example> handler function */
|
|
|
|
|
static int tps_ping_handler(void *datap);
|
|
|
|
|
|
|
|
|
|
/*! \brief Remove the front task off the taskprocessor queue */
|
|
|
|
|
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
|
|
|
|
|
|
|
|
|
|
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
|
|
|
|
|
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
|
|
|
|
|
|
|
|
|
@ -472,8 +472,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
|
|
|
|
|
struct ao2_container *sorted_tps;
|
|
|
|
|
struct ast_taskprocessor *tps;
|
|
|
|
|
struct ao2_iterator iter;
|
|
|
|
|
#define FMT_HEADERS "%-45s %10s %10s %10s\n"
|
|
|
|
|
#define FMT_FIELDS "%-45s %10lu %10lu %10lu\n"
|
|
|
|
|
#define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n"
|
|
|
|
|
#define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n"
|
|
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
|
case CLI_INIT:
|
|
|
|
@ -498,7 +498,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
|
|
|
|
|
return CLI_FAILURE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth");
|
|
|
|
|
ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
|
|
|
|
|
tcount = 0;
|
|
|
|
|
iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
|
|
|
|
|
while ((tps = ao2_iterator_next(&iter))) {
|
|
|
|
@ -511,7 +511,8 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
|
|
|
|
|
maxqsize = 0;
|
|
|
|
|
processed = 0;
|
|
|
|
|
}
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize);
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
|
|
|
|
|
tps->tps_queue_low, tps->tps_queue_high);
|
|
|
|
|
ast_taskprocessor_unreference(tps);
|
|
|
|
|
++tcount;
|
|
|
|
|
}
|
|
|
|
@ -539,28 +540,106 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
|
|
|
|
|
return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! Count of the number of taskprocessors in high water alert. */
|
|
|
|
|
static unsigned int tps_alert_count;
|
|
|
|
|
|
|
|
|
|
/*! Access protection for tps_alert_count */
|
|
|
|
|
AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief Add a delta to tps_alert_count with protection.
|
|
|
|
|
* \since 13.10.0
|
|
|
|
|
*
|
|
|
|
|
* \param tps Taskprocessor updating queue water mark alert trigger.
|
|
|
|
|
* \param delta The amount to add to tps_alert_count.
|
|
|
|
|
*
|
|
|
|
|
* \return Nothing
|
|
|
|
|
*/
|
|
|
|
|
static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
|
|
|
|
|
{
|
|
|
|
|
unsigned int old;
|
|
|
|
|
|
|
|
|
|
ast_rwlock_wrlock(&tps_alert_lock);
|
|
|
|
|
old = tps_alert_count;
|
|
|
|
|
tps_alert_count += delta;
|
|
|
|
|
if (DEBUG_ATLEAST(3)
|
|
|
|
|
/* and tps_alert_count becomes zero or non-zero */
|
|
|
|
|
&& !old != !tps_alert_count) {
|
|
|
|
|
ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
|
|
|
|
|
tps->name, tps_alert_count ? "triggered" : "cleared");
|
|
|
|
|
}
|
|
|
|
|
ast_rwlock_unlock(&tps_alert_lock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
unsigned int ast_taskprocessor_alert_get(void)
|
|
|
|
|
{
|
|
|
|
|
unsigned int count;
|
|
|
|
|
|
|
|
|
|
ast_rwlock_rdlock(&tps_alert_lock);
|
|
|
|
|
count = tps_alert_count;
|
|
|
|
|
ast_rwlock_unlock(&tps_alert_lock);
|
|
|
|
|
|
|
|
|
|
return count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
|
|
|
|
|
{
|
|
|
|
|
if (!tps || high_water < 0 || high_water < low_water) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (low_water < 0) {
|
|
|
|
|
/* Set low water level to 90% of high water level */
|
|
|
|
|
low_water = (high_water * 9) / 10;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ao2_lock(tps);
|
|
|
|
|
|
|
|
|
|
tps->tps_queue_low = low_water;
|
|
|
|
|
tps->tps_queue_high = high_water;
|
|
|
|
|
|
|
|
|
|
if (tps->high_water_alert) {
|
|
|
|
|
if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
|
|
|
|
|
/* Update water mark alert immediately */
|
|
|
|
|
tps->high_water_alert = 0;
|
|
|
|
|
tps_alert_add(tps, -1);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if (high_water <= tps->tps_queue_size) {
|
|
|
|
|
/* Update water mark alert immediately */
|
|
|
|
|
tps->high_water_alert = 1;
|
|
|
|
|
tps_alert_add(tps, +1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ao2_unlock(tps);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* destroy the taskprocessor */
|
|
|
|
|
static void tps_taskprocessor_destroy(void *tps)
|
|
|
|
|
static void tps_taskprocessor_dtor(void *tps)
|
|
|
|
|
{
|
|
|
|
|
struct ast_taskprocessor *t = tps;
|
|
|
|
|
struct tps_task *task;
|
|
|
|
|
|
|
|
|
|
if (!tps) {
|
|
|
|
|
ast_log(LOG_ERROR, "missing taskprocessor\n");
|
|
|
|
|
return;
|
|
|
|
|
while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
|
|
|
|
|
tps_task_free(task);
|
|
|
|
|
}
|
|
|
|
|
t->tps_queue_size = 0;
|
|
|
|
|
|
|
|
|
|
if (t->high_water_alert) {
|
|
|
|
|
t->high_water_alert = 0;
|
|
|
|
|
tps_alert_add(t, -1);
|
|
|
|
|
}
|
|
|
|
|
ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
|
|
|
|
|
/* free it */
|
|
|
|
|
|
|
|
|
|
ast_free(t->stats);
|
|
|
|
|
t->stats = NULL;
|
|
|
|
|
ast_free((char *) t->name);
|
|
|
|
|
if (t->listener) {
|
|
|
|
|
ao2_ref(t->listener, -1);
|
|
|
|
|
t->name = NULL;
|
|
|
|
|
ao2_cleanup(t->listener);
|
|
|
|
|
t->listener = NULL;
|
|
|
|
|
}
|
|
|
|
|
while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
|
|
|
|
|
tps_task_free(task);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* pop the front task and return it */
|
|
|
|
@ -569,7 +648,11 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
|
|
|
|
|
struct tps_task *task;
|
|
|
|
|
|
|
|
|
|
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
|
|
|
|
|
tps->tps_queue_size--;
|
|
|
|
|
--tps->tps_queue_size;
|
|
|
|
|
if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
|
|
|
|
|
tps->high_water_alert = 0;
|
|
|
|
|
tps_alert_add(tps, -1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return task;
|
|
|
|
|
}
|
|
|
|
@ -648,19 +731,22 @@ static void *default_listener_pvt_alloc(void)
|
|
|
|
|
|
|
|
|
|
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
|
|
|
|
|
{
|
|
|
|
|
RAII_VAR(struct ast_taskprocessor *, p,
|
|
|
|
|
ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
|
|
|
|
|
struct ast_taskprocessor *p;
|
|
|
|
|
|
|
|
|
|
p = ao2_alloc(sizeof(*p), tps_taskprocessor_dtor);
|
|
|
|
|
if (!p) {
|
|
|
|
|
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
|
|
|
|
|
ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
if (!(p->name = ast_strdup(name))) {
|
|
|
|
|
/* Set default congestion water level alert triggers. */
|
|
|
|
|
p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
|
|
|
|
|
p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
|
|
|
|
|
|
|
|
|
|
p->stats = ast_calloc(1, sizeof(*p->stats));
|
|
|
|
|
p->name = ast_strdup(name);
|
|
|
|
|
if (!p->stats || !p->name) {
|
|
|
|
|
ao2_ref(p, -1);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -675,22 +761,18 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
|
|
|
|
|
if (!(ao2_link(tps_singletons, p))) {
|
|
|
|
|
ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
|
|
|
|
|
listener->tps = NULL;
|
|
|
|
|
ao2_ref(p, -1);
|
|
|
|
|
ao2_ref(p, -2);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (p->listener->callbacks->start(p->listener)) {
|
|
|
|
|
ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
|
|
|
|
|
ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
|
|
|
|
|
p->name);
|
|
|
|
|
ast_taskprocessor_unreference(p);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* RAII_VAR will decrement the refcount at the end of the function.
|
|
|
|
|
* Since we want to pass back a reference to p, we bump the refcount
|
|
|
|
|
*/
|
|
|
|
|
ao2_ref(p, +1);
|
|
|
|
|
return p;
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
|
|
|
|
@ -799,10 +881,16 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
|
|
|
|
|
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
|
|
|
|
|
previous_size = tps->tps_queue_size++;
|
|
|
|
|
|
|
|
|
|
if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
|
|
|
|
|
if (previous_size >= tps->tps_queue_high) {
|
|
|
|
|
if (!tps->high_water_warned) {
|
|
|
|
|
tps->high_water_warned = 1;
|
|
|
|
|
ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
|
|
|
|
|
tps->name, previous_size);
|
|
|
|
|
tps->high_water_warned = 1;
|
|
|
|
|
}
|
|
|
|
|
if (!tps->high_water_alert) {
|
|
|
|
|
tps->high_water_alert = 1;
|
|
|
|
|
tps_alert_add(tps, +1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* The currently executing task counts as still in queue */
|
|
|
|
|