taskprocessors: Improve logging and add new cli options

This change makes some small changes to improve log readability in
addition to the following changes:

Modified 'core show taskprocessors' to now show Low time and High time
for task execution.

New command 'core show taskprocessor name <taskprocessor-name>' to dump
taskprocessor info and current queue.

Addionally, a new test was added to demonstrate the 'show taskprocessor
name' functionality:
test execute category /main/taskprocessor/ name taskprocessor_cli_show

Setting 'core set debug 3 taskprocessor.c' will now log pushed tasks.
(Warning this is will cause extremely high levels of logging at even
low traffic levels.)

Resolves: #1566

UserNote: New CLI command has been added -
core show taskprocessor name <taskprocessor-name>
20
Mike Bradeen 2 months ago
parent e1d49be69d
commit e016ba689a

@ -2076,7 +2076,11 @@ struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg);
* \retval 0 Success
* \retval -1 Failure
*/
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
int __ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function);
#define ast_sip_push_task(serializer, sip_task, task_data) \
__ast_sip_push_task(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Push a task to SIP servants and wait for it to complete.
@ -2112,13 +2116,19 @@ int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void
* \return sip_task() return value on success.
* \retval -1 Failure to push the task.
*/
int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
int __ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function);
#define ast_sip_push_task_wait_servant(serializer, sip_task, task_data) \
__ast_sip_push_task_wait_servant(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Push a task to SIP servants and wait for it to complete.
* \deprecated Replaced with ast_sip_push_task_wait_servant().
*/
int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
int __ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function);
#define ast_sip_push_task_synchronous(serializer, sip_task, task_data) \
__ast_sip_push_task_synchronous(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Push a task to the serializer and wait for it to complete.
@ -2162,7 +2172,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si
* \return sip_task() return value on success.
* \retval -1 Failure to push the task.
*/
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
int __ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function);
#define ast_sip_push_task_wait_serializer(serializer, sip_task, task_data) \
__ast_sip_push_task_wait_serializer(serializer, sip_task, task_data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Determine if the current thread is a SIP servant thread

@ -197,8 +197,11 @@ long ast_taskpool_queue_size(struct ast_taskpool *pool);
* \retval 0 success
* \retval -1 failure
*/
int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
int __ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data,
const char *file, int line, const char *function)
attribute_warn_unused_result;
#define ast_taskpool_push(pool, task, data) \
__ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Push a task to the taskpool, and wait for completion
@ -214,8 +217,11 @@ int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *
* \retval 0 success
* \retval -1 failure
*/
int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
int __ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data,
const char *file, int line, const char *function)
attribute_warn_unused_result;
#define ast_taskpool_push_wait(pool, task, data) \
__ast_taskpool_push_wait(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Shut down a taskpool and remove the underlying taskprocessors

@ -214,8 +214,10 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
* \retval -1 failure
* \since 1.6.1
*/
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
attribute_warn_unused_result;
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap,
const char *file, int line, const char *function) attribute_warn_unused_result;
#define ast_taskprocessor_push(tps, task_exe, datap) \
__ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*! \brief Local data parameter */
struct ast_taskprocessor_local {
@ -240,9 +242,11 @@ struct ast_taskprocessor_local {
* \retval -1 failure
* \since 12.0.0
*/
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
attribute_warn_unused_result;
int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
int (*task_exe)(struct ast_taskprocessor_local *local), void *datap,
const char *file, int line, const char *function) attribute_warn_unused_result;
#define ast_taskprocessor_push_local(tps, task_exe, datap) \
__ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Indicate the taskprocessor is suspended.

@ -188,8 +188,11 @@ void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int siz
* \retval 0 success
* \retval -1 failure
*/
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
attribute_warn_unused_result;
int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data,
const char *file, int line, const char *function) attribute_warn_unused_result;
#define ast_threadpool_push(pool, task, data) \
__ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
/*!
* \brief Shut down a threadpool and destroy it

@ -519,7 +519,13 @@ static void taskpool_dynamic_pool_grow(struct ast_taskpool *pool, struct taskpoo
}
}
int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
#undef ast_taskpool_push
#define ast_taskpool_push_internal(pool, task, data) \
__ast_taskpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data);
int __ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data,
const char *file, int line, const char *function)
{
RAII_VAR(struct taskpool_taskprocessor *, taskprocessor, NULL, ao2_cleanup);
@ -555,13 +561,19 @@ int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *
return -1;
}
if (ast_taskprocessor_push(taskprocessor->taskprocessor, task, data)) {
if (__ast_taskprocessor_push(taskprocessor->taskprocessor, task, data, file, line, function)) {
return -1;
}
return 0;
}
/* ABI compatibility: Provide actual function symbol for external modules */
int ast_taskpool_push(struct ast_taskpool *pool, int (*task)(void *data), void *data)
{
return __ast_taskpool_push(pool, task, data, NULL, 0, NULL);
}
/*!
* \internal Structure used for synchronous task
*/
@ -620,7 +632,8 @@ static int taskpool_sync_task(void *data)
return ret;
}
int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
int __ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data,
const char *file, int line, const char *function)
{
struct taskpool_sync_task sync_task;
@ -635,7 +648,7 @@ int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), v
return -1;
}
if (ast_taskpool_push(pool, taskpool_sync_task, &sync_task)) {
if (__ast_taskpool_push(pool, taskpool_sync_task, &sync_task, file, line, function)) {
taskpool_sync_task_cleanup(&sync_task);
return -1;
}
@ -650,6 +663,15 @@ int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), v
return sync_task.fail;
}
/* ABI compatibility: Provide actual function symbol for external modules */
#undef ast_taskpool_push_wait
int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data);
int ast_taskpool_push_wait(struct ast_taskpool *pool, int (*task)(void *data), void *data)
{
return __ast_taskpool_push_wait(pool, task, data, NULL, 0, NULL);
}
void ast_taskpool_shutdown(struct ast_taskpool *pool)
{
if (!pool) {

@ -55,6 +55,10 @@ struct tps_task {
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
unsigned int wants_local:1;
/*! \brief Debug information about where the task was pushed from */
const char *file;
int line;
const char *function;
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@ -63,6 +67,16 @@ struct tps_taskprocessor_stats {
unsigned long max_qsize;
/*! \brief This is the current number of tasks processed */
unsigned long _tasks_processed_count;
/*! \brief Highest time (in microseconds) spent processing a task */
long highest_time_processed;
/*! \brief Lowest time (in microseconds) spent processing a task */
long lowest_time_processed;
/*! \brief File where the highest time task was pushed from */
const char *highest_time_task_file;
/*! \brief Line where the highest time task was pushed from */
int highest_time_task_line;
/*! \brief Function where the highest time task was pushed from */
const char *highest_time_task_function;
};
/*! \brief A ast_taskprocessor structure is a singleton by name */
@ -155,6 +169,7 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_reset_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_reset_stats_all(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags);
@ -162,6 +177,7 @@ static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags);
static struct ast_cli_entry taskprocessor_clis[] = {
AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
AST_CLI_DEFINE(cli_tps_show_taskprocessor, "Display detailed info about a taskprocessor"),
AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
AST_CLI_DEFINE(cli_tps_reset_stats, "Reset a named task processor's stats"),
AST_CLI_DEFINE(cli_tps_reset_stats_all, "Reset all task processors' stats"),
@ -189,6 +205,17 @@ static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listene
listener->user_data = NULL;
}
/* Keeping the old symbols for ABI compatibility */
#undef ast_taskprocessor_push
#define ast_taskprocessor_push_internal(tps, task_exe, datap) \
__ast_taskprocessor_push(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
#undef ast_taskprocessor_push_local
#define ast_taskprocessor_push_local_internal(tps, task_exe, datap) \
__ast_taskprocessor_push_local(tps, task_exe, datap, __FILE__, __LINE__, __PRETTY_FUNCTION__)
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap);
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
@ -204,8 +231,8 @@ static void *default_tps_processing_function(void *data)
while (!pvt->dead) {
res = ast_sem_wait(&pvt->sem);
if (res != 0 && errno != EINTR) {
ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
strerror(errno));
ast_log(LOG_ERROR, "Taskprocessor '%s': Semaphore wait failed: %s\n",
tps->name, strerror(errno));
/* Just give up */
break;
}
@ -238,8 +265,8 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
if (ast_sem_post(&pvt->sem) != 0) {
ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
strerror(errno));
ast_log(LOG_ERROR, "Taskprocessor '%s': Failed to signal task enqueue: %s\n",
listener->tps->name, strerror(errno));
}
}
@ -258,7 +285,7 @@ static void default_listener_shutdown(struct ast_taskprocessor_listener *listene
/* Hold a reference during shutdown */
ao2_t_ref(listener->tps, +1, "tps-shutdown");
if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
if (ast_taskprocessor_push_internal(listener->tps, default_listener_die, pvt)) {
/* This will cause the thread to exit early without completing tasks already
* in the queue. This is probably the least bad option in this situation. */
default_listener_die(pvt);
@ -312,7 +339,7 @@ static void tps_shutdown(void)
objcount = ao2_container_count(tps_singletons);
if (objcount > 0) {
ast_log(LOG_DEBUG,
"waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
"Taskprocessor shutdown: Waiting for %d taskprocessor(s) to complete.\n",
objcount);
/* give the running taskprocessors a chance to finish, up to
@ -327,8 +354,8 @@ static void tps_shutdown(void)
delay.tv_sec = 1;
delay.tv_nsec = 0;
ast_log(LOG_DEBUG,
"waiting for taskprocessor shutdown, %d tps object(s) still allocated.\n",
objcount);
"Taskprocessor shutdown: Still waiting for %d taskprocessor(s) after %d second(s).\n",
objcount, tries + 1);
}
}
@ -336,17 +363,19 @@ static void tps_shutdown(void)
* a taskprocessor was not cleaned up somewhere */
if (objcount > 0) {
ast_log(LOG_ERROR,
"Assertion may occur, the following taskprocessors are still running:\n");
"Taskprocessor shutdown: %d taskprocessor(s) still running after %d seconds. Assertion may occur:\n",
objcount, AST_TASKPROCESSOR_SHUTDOWN_MAX_WAIT);
sorted_tps = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, tps_sort_cb,
NULL);
if (!sorted_tps || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
ast_log(LOG_ERROR, "unable to get sorted list of taskprocessors");
ast_log(LOG_ERROR, "Unable to get sorted list of taskprocessors for shutdown report\n");
}
else {
iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
while ((tps = ao2_iterator_next(&iter))) {
ast_log(LOG_ERROR, "taskprocessor '%s'\n", tps->name);
ast_log(LOG_ERROR, " - Taskprocessor '%s' (queue size: %ld)\n",
tps->name, tps->tps_queue_size);
}
}
@ -354,7 +383,7 @@ static void tps_shutdown(void)
}
else {
ast_log(LOG_DEBUG,
"All waiting taskprocessors cleared!\n");
"Taskprocessor shutdown: All taskprocessors completed successfully.\n");
}
ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@ -370,13 +399,13 @@ int ast_tps_init(void)
tps_singletons = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
TPS_MAX_BUCKETS, tps_hash_cb, NULL, tps_cmp_cb);
if (!tps_singletons) {
ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
ast_log(LOG_ERROR, "Failed to initialize taskprocessor container!\n");
return -1;
}
if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
ao2_ref(tps_singletons, -1);
ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
ast_log(LOG_ERROR, "Failed to initialize taskprocessor subsystems tracking vector!\n");
return -1;
}
@ -390,43 +419,51 @@ int ast_tps_init(void)
}
/* allocate resources for the task */
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap,
const char *file, int line, const char *function)
{
struct tps_task *t;
if (!task_exe) {
ast_log(LOG_ERROR, "task_exe is NULL!\n");
ast_log(LOG_ERROR, "Task callback function is NULL!\n");
return NULL;
}
t = ast_calloc(1, sizeof(*t));
if (!t) {
ast_log(LOG_ERROR, "failed to allocate task!\n");
ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
return NULL;
}
t->callback.execute = task_exe;
t->datap = datap;
t->file = file;
t->line = line;
t->function = function;
return t;
}
static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap,
const char *file, int line, const char *function)
{
struct tps_task *t;
if (!task_exe) {
ast_log(LOG_ERROR, "task_exe is NULL!\n");
ast_log(LOG_ERROR, "Task callback function is NULL!\n");
return NULL;
}
t = ast_calloc(1, sizeof(*t));
if (!t) {
ast_log(LOG_ERROR, "failed to allocate task!\n");
ast_log(LOG_ERROR, "Failed to allocate memory for task!\n");
return NULL;
}
t->callback.execute_local = task_exe;
t->datap = datap;
t->wants_local = 1;
t->file = file;
t->line = line;
t->function = function;
return t;
}
@ -520,7 +557,7 @@ static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args
ts.tv_nsec = when.tv_usec * 1000;
ast_mutex_lock(&cli_ping_cond_lock);
if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
if (ast_taskprocessor_push_internal(tps, tps_ping_handler, 0) < 0) {
ast_mutex_unlock(&cli_ping_cond_lock);
ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
ast_taskprocessor_unreference(tps);
@ -574,8 +611,8 @@ static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
return cmp;
}
#define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s\n"
#define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu\n"
#define FMT_HEADERS "%-70s %10s %10s %10s %10s %10s %10s %10s\n"
#define FMT_FIELDS "%-70s %10lu %10lu %10lu %10lu %10lu %10ld %10ld\n"
/*!
* \internal
@ -589,7 +626,8 @@ static void tps_report_taskprocessor_list_helper(int fd, struct ast_taskprocesso
{
ast_cli(fd, FMT_FIELDS, tps->name, tps->stats._tasks_processed_count,
tps->tps_queue_size, tps->stats.max_qsize, tps->tps_queue_low,
tps->tps_queue_high);
tps->tps_queue_high, tps->stats.lowest_time_processed,
tps->stats.highest_time_processed);
}
/*!
@ -667,12 +705,94 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
return CLI_SHOWUSAGE;
}
ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water", "Low time(us)", "High time(us)");
ast_cli(a->fd, "\n%d taskprocessors\n\n", tps_report_taskprocessor_list(a->fd, like));
return CLI_SUCCESS;
}
static char *cli_tps_show_taskprocessor(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
const char *name;
struct ast_taskprocessor *tps;
struct tps_task *task;
int task_count = 0;
switch (cmd) {
case CLI_INIT:
e->command = "core show taskprocessor name";
e->usage =
"Usage: core show taskprocessor name <taskprocessor>\n"
" Displays detailed information about a specific taskprocessor,\n"
" including all queued tasks and their origins (DEVMODE only).\n";
return NULL;
case CLI_GENERATE:
if (a->pos == 4) {
return tps_taskprocessor_tab_complete(a);
}
return NULL;
}
if (a->argc != 5) {
return CLI_SHOWUSAGE;
}
name = a->argv[4];
tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS);
if (!tps) {
ast_cli(a->fd, "\nTaskprocessor '%s' not found\n\n", name);
return CLI_SUCCESS;
}
ao2_lock(tps);
ast_cli(a->fd, "\nTaskprocessor: %s\n", tps->name);
ast_cli(a->fd, "===========================================\n");
ast_cli(a->fd, "Subsystem: %s\n", tps->subsystem[0] ? tps->subsystem : "(none)");
ast_cli(a->fd, "Tasks processed: %lu\n", tps->stats._tasks_processed_count);
ast_cli(a->fd, "Current queue size: %ld\n", tps->tps_queue_size);
ast_cli(a->fd, "Max queue depth: %lu\n", tps->stats.max_qsize);
ast_cli(a->fd, "Low water mark: %ld\n", tps->tps_queue_low);
ast_cli(a->fd, "High water mark: %ld\n", tps->tps_queue_high);
ast_cli(a->fd, "High water alert: %s\n", tps->high_water_alert ? "Yes" : "No");
ast_cli(a->fd, "Suspended: %s\n", tps->suspended ? "Yes" : "No");
ast_cli(a->fd, "Currently executing: %s\n", tps->executing ? "Yes" : "No");
ast_cli(a->fd, "Highest time (us): %ld\n", tps->stats.highest_time_processed);
if (tps->stats.highest_time_task_file) {
ast_cli(a->fd, " Highest task origin: %s:%d (%s)\n",
tps->stats.highest_time_task_file,
tps->stats.highest_time_task_line,
tps->stats.highest_time_task_function);
}
ast_cli(a->fd, "Lowest time (us): %ld\n", tps->stats.lowest_time_processed);
if (tps->tps_queue_size > 0) {
ast_cli(a->fd, "\nQueued Tasks:\n");
ast_cli(a->fd, "-------------------------------------------\n");
AST_LIST_TRAVERSE(&tps->tps_queue, task, list) {
task_count++;
if (task->file) {
ast_cli(a->fd, " Task #%d:\n", task_count);
ast_cli(a->fd, " Origin: %s:%d\n", task->file, task->line);
ast_cli(a->fd, " Function: %s\n", task->function);
ast_cli(a->fd, " Type: %s\n", task->wants_local ? "Local" : "Standard");
} else {
ast_cli(a->fd, " Task #%d: (origin not available)\n", task_count);
}
}
ast_cli(a->fd, "\nTotal queued tasks: %d\n", task_count);
} else {
ast_cli(a->fd, "\nNo tasks currently queued.\n");
}
ao2_unlock(tps);
ast_taskprocessor_unreference(tps);
ast_cli(a->fd, "\n");
return CLI_SUCCESS;
}
/* hash callback for astobj2 */
static int tps_hash_cb(const void *obj, const int flags)
{
@ -866,8 +986,8 @@ static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
if (DEBUG_ATLEAST(3)
/* and tps_alert_count becomes zero or non-zero */
&& !old != !tps_alert_count) {
ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
tps->name, tps_alert_count ? "triggered" : "cleared");
ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert (total alerts: %u).\n",
tps->name, tps_alert_count ? "triggered" : "cleared", tps_alert_count);
}
if (tps->subsystem[0] != '\0') {
@ -1028,11 +1148,12 @@ static void *default_listener_pvt_alloc(void)
pvt = ast_calloc(1, sizeof(*pvt));
if (!pvt) {
ast_log(LOG_ERROR, "Failed to allocate memory for taskprocessor listener\n");
return NULL;
}
pvt->poll_thread = AST_PTHREADT_NULL;
if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
ast_log(LOG_ERROR, "Failed to initialize taskprocessor semaphore: %s\n", strerror(errno));
ast_free(pvt);
return NULL;
}
@ -1098,7 +1219,7 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
static struct ast_taskprocessor *__start_taskprocessor(struct ast_taskprocessor *p)
{
if (p && p->listener->callbacks->start(p->listener)) {
ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
ast_log(LOG_ERROR, "Failed to start taskprocessor listener for '%s'\n",
p->name);
ast_taskprocessor_unreference(p);
@ -1118,7 +1239,7 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
struct default_taskprocessor_listener_pvt *pvt;
if (ast_strlen_zero(name)) {
ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
ast_log(LOG_ERROR, "Cannot get taskprocessor with empty name!\n");
return NULL;
}
ao2_lock(tps_singletons);
@ -1212,23 +1333,28 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
int was_empty;
if (!tps) {
ast_log(LOG_ERROR, "tps is NULL!\n");
ast_log(LOG_ERROR, "Taskprocessor is NULL!\n");
return -1;
}
if (!t) {
ast_log(LOG_ERROR, "t is NULL!\n");
ast_log(LOG_ERROR, "Task is NULL!\n");
return -1;
}
if (t->file) {
ast_debug(3, "Taskprocessor '%s': Task pushed from %s:%d (%s)\n",
tps->name, t->file, t->line, t->function);
}
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
if (tps->tps_queue_high <= tps->tps_queue_size) {
if (!tps->high_water_alert) {
ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
ast_log(LOG_WARNING, "Taskprocessor '%s' queue reached %ld scheduled tasks (high water mark: %ld)%s.\n",
tps->name, tps->tps_queue_size, tps->tps_queue_high, tps->high_water_warned ? " again" : "");
tps->high_water_warned = 1;
tps->high_water_alert = 1;
tps_alert_add(tps, +1);
@ -1242,14 +1368,26 @@ static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
return 0;
}
int __ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap,
const char *file, int line, const char *function)
{
return taskprocessor_push(tps, tps_task_alloc(task_exe, datap, file, line, function));
}
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
{
return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
return __ast_taskprocessor_push(tps, task_exe, datap, NULL, 0, NULL);
}
int __ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap,
const char *file, int line, const char *function)
{
return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap, file, line, function));
}
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
{
return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
return __ast_taskprocessor_push_local(tps, task_exe, datap, NULL, 0, NULL);
}
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
@ -1284,6 +1422,11 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
struct ast_taskprocessor_local local;
struct tps_task *t;
long size;
struct timeval start;
long elapsed;
const char *task_file = NULL;
int task_line = 0;
const char *task_function = NULL;
ao2_lock(tps);
t = tps_taskprocessor_pop(tps);
@ -1299,8 +1442,15 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
local.local_data = tps->local_data;
local.data = t->datap;
}
/* Save task origin info before we free the task */
task_file = t->file;
task_line = t->line;
task_function = t->function;
ao2_unlock(tps);
start = ast_tvnow();
if (t->wants_local) {
t->callback.execute_local(&local);
} else {
@ -1324,6 +1474,18 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
if (size >= tps->stats.max_qsize) {
tps->stats.max_qsize = size + 1;
}
elapsed = ast_tvdiff_us(ast_tvnow(), start);
if (elapsed > tps->stats.highest_time_processed) {
tps->stats.highest_time_processed = elapsed;
tps->stats.highest_time_task_file = task_file;
tps->stats.highest_time_task_line = task_line;
tps->stats.highest_time_task_function = task_function;
}
if (elapsed < tps->stats.lowest_time_processed) {
tps->stats.lowest_time_processed = elapsed;
}
ao2_unlock(tps);
/* If we executed a task, check for the transition to empty */
@ -1393,6 +1555,11 @@ static void tps_reset_stats(struct ast_taskprocessor *tps)
ao2_lock(tps);
tps->stats._tasks_processed_count = 0;
tps->stats.max_qsize = 0;
tps->stats.highest_time_processed = 0;
tps->stats.lowest_time_processed = 0;
tps->stats.highest_time_task_file = NULL;
tps->stats.highest_time_task_line = 0;
tps->stats.highest_time_task_function = NULL;
ao2_unlock(tps);
}

@ -608,8 +608,7 @@ static int queued_task_pushed(void *data)
* \param listener The taskprocessor listener. The threadpool is the listener's private data
* \param was_empty True if the taskprocessor was empty prior to the task being pushed
*/
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
int was_empty)
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
{
struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
struct task_pushed_data *tpd;
@ -954,15 +953,27 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
return pool;
}
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
#undef ast_threadpool_push
#define ast_threadpool_push_internal(pool, task, data) \
__ast_threadpool_push(pool, task, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data);
int __ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data,
const char *file, int line, const char *function)
{
SCOPED_AO2LOCK(lock, pool);
if (!pool->shutting_down) {
return ast_taskprocessor_push(pool->tps, task, data);
return __ast_taskprocessor_push(pool->tps, task, data, file, line, function);
}
return -1;
}
/* ABI compatibility: Provide actual function symbol for external modules */
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
return __ast_threadpool_push(pool, task, data, NULL, 0, NULL);
}
void ast_threadpool_shutdown(struct ast_threadpool *pool)
{
if (!pool) {

@ -2096,13 +2096,48 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
return ast_sip_create_serializer_group(name, NULL);
}
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
#undef ast_sip_push_task
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
int __ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function)
{
if (!serializer) {
serializer = ast_serializer_pool_get(sip_serializer_pool);
}
return ast_taskprocessor_push(serializer, sip_task, task_data);
return __ast_taskprocessor_push(serializer, sip_task, task_data, file, line, function);
}
/* ABI compatibility: Provide actual function symbol for external modules */
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
return __ast_sip_push_task(serializer, sip_task, task_data, NULL, 0, NULL);
}
/* ABI compatibility: Provide actual function symbols for wait functions */
#undef ast_sip_push_task_wait_servant
int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
return __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, NULL, 0, NULL);
}
#undef ast_sip_push_task_synchronous
int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
return __ast_sip_push_task_synchronous(serializer, sip_task, task_data, NULL, 0, NULL);
}
#undef ast_sip_push_task_wait_serializer
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data);
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
return __ast_sip_push_task_wait_serializer(serializer, sip_task, task_data, NULL, 0, NULL);
}
struct sync_task_data {
@ -2134,7 +2169,8 @@ static int sync_task(void *data)
return ret;
}
static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
static int __ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function)
{
/* This method is an onion */
struct sync_task_data std;
@ -2145,7 +2181,7 @@ static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*si
std.task = sip_task;
std.task_data = task_data;
if (ast_sip_push_task(serializer, sync_task, &std)) {
if (__ast_sip_push_task(serializer, sync_task, &std, file, line, function)) {
ast_mutex_destroy(&std.lock);
ast_cond_destroy(&std.cond);
return -1;
@ -2162,21 +2198,24 @@ static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*si
return std.fail;
}
int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
int __ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function)
{
if (ast_sip_thread_is_servant()) {
return sip_task(task_data);
}
return ast_sip_push_task_wait(serializer, sip_task, task_data);
return __ast_sip_push_task_wait(serializer, sip_task, task_data, file, line, function);
}
int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
int __ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function)
{
return ast_sip_push_task_wait_servant(serializer, sip_task, task_data);
return __ast_sip_push_task_wait_servant(serializer, sip_task, task_data, file, line, function);
}
int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
int __ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data,
const char *file, int line, const char *function)
{
if (!serializer) {
/* Caller doesn't care which PJSIP serializer the task executes under. */
@ -2195,7 +2234,7 @@ int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int
return sip_task(task_data);
}
return ast_sip_push_task_wait(serializer, sip_task, task_data);
return __ast_sip_push_task_wait(serializer, sip_task, task_data, file, line, function);
}
void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size)

@ -31,12 +31,15 @@
#include "asterisk.h"
#include <unistd.h>
#include "asterisk/test.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/module.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/threadpool.h"
#include "asterisk/cli.h"
/*!
* \brief userdata associated with baseline taskprocessor test
@ -963,6 +966,170 @@ AST_TEST_DEFINE(serializer_pool)
return AST_TEST_PASS;
}
/*!
* \brief Test for CLI command "core show taskprocessor <name>"
*
* This test creates a taskprocessor, queues tasks with controlled execution,
* and verifies that the CLI command displays the queued tasks correctly.
*/
AST_TEST_DEFINE(taskprocessor_cli_show)
{
RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
struct task_data *task_data1 = NULL;
struct task_data *task_data2 = NULL;
struct task_data *task_data3 = NULL;
int task_queued1 = 0, task_queued2 = 0, task_queued3 = 0;
char cli_command[128];
int cli_output_fd[2];
char output_buffer[4096] = {0};
ssize_t bytes_read;
int res = AST_TEST_FAIL;
switch (cmd) {
case TEST_INIT:
info->name = "taskprocessor_cli_show";
info->category = "/main/taskprocessor/";
info->summary = "Test CLI command 'core show taskprocessor'";
info->description =
"Verifies that the 'core show taskprocessor <name>' CLI command\n"
"displays taskprocessor information and queued tasks correctly.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
/* Create a pipe to capture CLI output */
if (pipe(cli_output_fd) != 0) {
ast_test_status_update(test, "Failed to create pipe for CLI output\n");
return AST_TEST_FAIL;
}
/* Create taskprocessor */
tps = ast_taskprocessor_get("test_cli_taskprocessor", TPS_REF_DEFAULT);
if (!tps) {
ast_test_status_update(test, "Unable to create test taskprocessor\n");
close(cli_output_fd[0]);
close(cli_output_fd[1]);
return AST_TEST_FAIL;
}
/* Create tasks that will wait so they stay in the queue */
task_data1 = task_data_create();
task_data2 = task_data_create();
task_data3 = task_data_create();
if (!task_data1 || !task_data2 || !task_data3) {
ast_test_status_update(test, "Unable to create task_data\n");
goto cleanup;
}
/* Set a long wait time so tasks stay queued */
task_data1->wait_time = 2000; /* 2 seconds */
task_data2->wait_time = 2000;
task_data3->wait_time = 2000;
/* Queue the tasks */
if (ast_taskprocessor_push(tps, task, task_data1)) {
ast_test_status_update(test, "Failed to queue task 1\n");
goto cleanup;
}
task_queued1 = 1;
if (ast_taskprocessor_push(tps, task, task_data2)) {
ast_test_status_update(test, "Failed to queue task 2\n");
goto cleanup;
}
task_queued2 = 1;
if (ast_taskprocessor_push(tps, task, task_data3)) {
ast_test_status_update(test, "Failed to queue task 3\n");
goto cleanup;
}
task_queued3 = 1;
/* Execute the CLI command */
snprintf(cli_command, sizeof(cli_command), "core show taskprocessor name test_cli_taskprocessor");
if (ast_cli_command(cli_output_fd[1], cli_command) != 0) {
ast_test_status_update(test, "CLI command execution failed\n");
goto cleanup;
}
/* Close write end and read the output */
close(cli_output_fd[1]);
cli_output_fd[1] = -1;
bytes_read = read(cli_output_fd[0], output_buffer, sizeof(output_buffer) - 1);
if (bytes_read <= 0) {
ast_test_status_update(test, "Failed to read CLI output\n");
goto cleanup;
}
output_buffer[bytes_read] = '\0';
/* Log the output for inspection */
ast_test_status_update(test, "CLI Output:\n%s\n", output_buffer);
/* Verify the output contains expected information */
if (!strstr(output_buffer, "test_cli_taskprocessor")) {
ast_test_status_update(test, "Output missing taskprocessor name\n");
goto cleanup;
}
if (!strstr(output_buffer, "Current queue size")) {
ast_test_status_update(test, "Output missing queue size information\n");
goto cleanup;
}
/* Check for queued tasks section (at least one task should be shown) */
if (!strstr(output_buffer, "Queued Tasks") && !strstr(output_buffer, "Currently executing")) {
ast_test_status_update(test, "Output missing queued tasks or execution status\n");
goto cleanup;
}
/* Verify we see task information */
if (!strstr(output_buffer, "Task #")) {
ast_test_status_update(test, "Output missing task information\n");
goto cleanup;
}
ast_test_status_update(test, "CLI command output validated successfully\n");
res = AST_TEST_PASS;
cleanup:
ast_test_status_update(test, "Waiting for tasks to complete\n");
/* Wait for tasks to complete */
if (task_data1) {
if (task_queued1) {
task_wait(task_data1);
}
ao2_cleanup(task_data1);
}
if (task_data2) {
if (task_queued2) {
task_wait(task_data2);
}
ao2_cleanup(task_data2);
}
if (task_data3) {
if (task_queued3) {
task_wait(task_data3);
}
ao2_cleanup(task_data3);
}
if (cli_output_fd[0] >= 0) {
close(cli_output_fd[0]);
}
if (cli_output_fd[1] >= 0) {
close(cli_output_fd[1]);
}
ast_test_status_update(test, "Tasks complete\n");
return res;
}
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
@ -972,6 +1139,7 @@ static int unload_module(void)
ast_test_unregister(taskprocessor_shutdown);
ast_test_unregister(taskprocessor_push_local);
ast_test_unregister(serializer_pool);
ast_test_unregister(taskprocessor_cli_show);
return 0;
}
@ -984,6 +1152,7 @@ static int load_module(void)
ast_test_register(taskprocessor_shutdown);
ast_test_register(taskprocessor_push_local);
ast_test_register(serializer_pool);
ast_test_register(taskprocessor_cli_show);
return AST_MODULE_LOAD_SUCCESS;
}

Loading…
Cancel
Save