Merge "serializer: move/add asterisk serializer pool functionality" into 16

16.7
George Joseph 6 years ago committed by Gerrit Code Review
commit 4e44ffc623

@ -0,0 +1,85 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2019, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef _AST_SERIALIZER_H
#define _AST_SERIALIZER_H
struct ast_threadpool;
/*!
* Maintains a named pool of thread pooled taskprocessors. Also if configured
* a shutdown group can be enabled that will ensure all serializers have
* completed any assigned task before destruction.
*/
struct ast_serializer_pool;
/*!
* \brief Destroy the serializer pool.
*
* Attempt to destroy the serializer pool. If a shutdown group has been enabled,
* and times out waiting for threads to complete, then this function will return
* the number of remaining threads, and the pool will not be destroyed.
*
* \param pool The pool to destroy
*/
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool);
/*!
* \brief Create a serializer pool.
*
* Create a serializer pool with an optional shutdown group. If a timeout greater
* than -1 is specified then a shutdown group is enabled on the pool.
*
* \param name The base name for the pool, and used when building taskprocessor(s)
* \param size The size of the pool
* \param threadpool The backing threadpool to use
* \param timeout The timeout used if using a shutdown group (-1 = disabled)
*
* \retval A newly allocated serializer pool object, or NULL on error
*/
struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
unsigned int size, struct ast_threadpool *threadpool, int timeout);
/*!
* \brief Retrieve the base name of the serializer pool.
*
* \param pool The pool object
*
* \retval The base name given to the pool
*/
const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool);
/*!
* \brief Retrieve a serializer from the pool.
*
* \param pool The pool object
*
* \retval A serializer/taskprocessor
*/
struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool);
/*!
* \brief Set taskprocessor alert levels for the serializers in the pool.
*
* \param pool The pool to destroy
*
* \retval 0 on success, or -1 on error.
*/
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low);
#endif /* _AST_SERIALIZER_H */

@ -304,6 +304,15 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps);
*/
unsigned int ast_taskprocessor_seq_num(void);
/*!
* \brief Append the next sequence number to the given string, and copy into the buffer.
*
* \param buf Where to copy the appended taskprocessor name.
* \param size How large is buf including null terminator.
* \param name A name to append the sequence number to.
*/
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name);
/*!
* \brief Build a taskprocessor name with a sequence number on the end.
* \since 13.8.0

@ -0,0 +1,189 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2019, Sangoma Technologies Corporation
*
* Kevin Harwell <kharwell@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/vector.h"
struct ast_serializer_pool {
/*! Shutdown group to monitor serializers. */
struct ast_serializer_shutdown_group *shutdown_group;
/*! Time to wait if using a shutdown group. */
int shutdown_group_timeout;
/*! A pool of taskprocessor(s) */
AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;
/*! Base name for the pool */
char name[];
};
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
{
if (!pool) {
return 0;
}
/* Clear out the serializers */
AST_VECTOR_RW_WRLOCK(&pool->serializers);
AST_VECTOR_RESET(&pool->serializers, ast_taskprocessor_unreference);
AST_VECTOR_RW_UNLOCK(&pool->serializers);
/* If using a shutdown group then wait for all threads to complete */
if (pool->shutdown_group) {
int remaining;
ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);
remaining = ast_serializer_shutdown_group_join(
pool->shutdown_group, pool->shutdown_group_timeout);
if (remaining) {
/* If we've timed out don't fully cleanup yet */
ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
"'%d' dependencies still processing.\n", pool->name, remaining);
return remaining;
}
ao2_ref(pool->shutdown_group, -1);
pool->shutdown_group = NULL;
}
AST_VECTOR_RW_FREE(&pool->serializers);
ast_free(pool);
return 0;
}
struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
unsigned int size, struct ast_threadpool *threadpool, int timeout)
{
struct ast_serializer_pool *pool;
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
size_t idx;
ast_assert(size > 0);
pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
if (!pool) {
return NULL;
}
strcpy(pool->name, name); /* safe */
pool->shutdown_group_timeout = timeout;
pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL;
AST_VECTOR_RW_INIT(&pool->serializers, size);
for (idx = 0; idx < size; ++idx) {
struct ast_taskprocessor *tps;
/* Create name with seq number appended. */
ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);
tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);
if (!tps) {
ast_serializer_pool_destroy(pool);
ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
tps_name);
return NULL;
}
if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
ast_serializer_pool_destroy(pool);
ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
tps_name);
return NULL;
}
}
return pool;
}
const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
{
return pool->name;
}
struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool)
{
struct ast_taskprocessor *res;
size_t idx;
if (!pool) {
return NULL;
}
AST_VECTOR_RW_RDLOCK(&pool->serializers);
if (AST_VECTOR_SIZE(&pool->serializers) == 0) {
AST_VECTOR_RW_UNLOCK(&pool->serializers);
return NULL;
}
res = AST_VECTOR_GET(&pool->serializers, 0);
/* Choose the taskprocessor with the smallest queue */
for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
if (ast_taskprocessor_size(cur) < ast_taskprocessor_size(res)) {
res = cur;
}
}
AST_VECTOR_RW_UNLOCK(&pool->serializers);
return res;
}
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
{
size_t idx;
long tps_queue_high;
long tps_queue_low;
if (!pool) {
return 0;
}
tps_queue_high = high;
if (tps_queue_high <= 0) {
ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
"trigger level '%ld'\n", pool->name, tps_queue_high);
tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
}
tps_queue_low = low;
if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
"level '%ld'\n", pool->name, tps_queue_low);
tps_queue_low = -1;
}
for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {
ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
ast_taskprocessor_name(cur));
}
}
return 0;
}

@ -1280,11 +1280,22 @@ unsigned int ast_taskprocessor_seq_num(void)
return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
}
#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */
void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name)
{
int final_size = strlen(name) + SEQ_STR_SIZE;
ast_assert(buf != NULL && name != NULL);
ast_assert(final_size <= size);
snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num());
}
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
{
va_list ap;
int user_size;
#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */
ast_assert(buf != NULL);
ast_assert(SEQ_STR_SIZE <= size);

@ -35,6 +35,8 @@
#include "asterisk/taskprocessor.h"
#include "asterisk/module.h"
#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/threadpool.h"
/*!
* \brief userdata associated with baseline taskprocessor test
@ -889,6 +891,78 @@ AST_TEST_DEFINE(taskprocessor_push_local)
return AST_TEST_PASS;
}
/*!
* \brief Baseline test for a serializer pool
*
* This test ensures that when a task is added to a taskprocessor that
* has been allocated with a default listener that the task gets executed
* as expected
*/
AST_TEST_DEFINE(serializer_pool)
{
RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown);
RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy);
RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
.auto_increment = 0,
.initial_size = 1,
.max_size = 0,
};
/* struct ast_taskprocessor *tps; */
switch (cmd) {
case TEST_INIT:
info->name = "serializer_pool";
info->category = "/main/taskprocessor/";
info->summary = "Test using a serializer pool";
info->description =
"Ensures that a queued task gets executed.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options));
ast_test_validate(test, serializer_pool = ast_serializer_pool_create(
"test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */
ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test"));
ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0));
ast_test_validate(test, task_data = task_data_create());
task_data->wait_time = 4000; /* task takes 4 seconds */
ast_test_validate(test, !ast_taskprocessor_push(
ast_serializer_pool_get(serializer_pool), task, task_data));
if (!ast_serializer_pool_destroy(serializer_pool)) {
ast_test_status_update(test, "Unexpected pool destruction!\n");
/*
* The pool should have timed out, so if it destruction reports success
* we need to fail.
*/
serializer_pool = NULL;
return AST_TEST_FAIL;
}
ast_test_validate(test, !task_wait(task_data));
/* The first attempt should have failed. Second try should destroy successfully */
if (ast_serializer_pool_destroy(serializer_pool)) {
ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n");
/*
* If this fails we'll try again on return to hopefully avoid a memory leak.
* If it again times out a third time, well not much we can do.
*/
return AST_TEST_FAIL;
}
/* Test passed, so set pool to NULL to avoid "re-running" destroy */
serializer_pool = NULL;
return AST_TEST_PASS;
}
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
@ -897,6 +971,7 @@ static int unload_module(void)
ast_test_unregister(taskprocessor_listener);
ast_test_unregister(taskprocessor_shutdown);
ast_test_unregister(taskprocessor_push_local);
ast_test_unregister(serializer_pool);
return 0;
}
@ -908,6 +983,7 @@ static int load_module(void)
ast_test_register(taskprocessor_listener);
ast_test_register(taskprocessor_shutdown);
ast_test_register(taskprocessor_push_local);
ast_test_register(serializer_pool);
return AST_MODULE_LOAD_SUCCESS;
}

Loading…
Cancel
Save