res_pjsip/res_pjsip_mwi: use centralized serializer pools

Both res_pjsip and res_pjsip_mwi made use of serializer pools. However, they
both implemented their own serializer pool functionality that was pretty much
identical in each of the source files. This patch removes the duplicated code,
and uses the new 'ast_serializer_pool' object instead.

Additionally res_pjsip_mwi enables a shutdown group on the pool since if the
timing was right the module could be unloaded while taskprocessor threads still
needed to execute, thus causing a crash.

Change-Id: I959b0805ad024585bbb6276593118be34fbf6e1d
17.1
Kevin Harwell 6 years ago
parent 37ec88c4c8
commit 299ba78b09

@ -2980,6 +2980,11 @@ const char *ast_sip_get_host_ip_string(int af);
*/
long ast_sip_threadpool_queue_size(void);
/*!
* \brief Retrieve the SIP threadpool object
*/
struct ast_threadpool *ast_sip_threadpool(void);
/*!
* \brief Retrieve transport state
* \since 13.7.1

@ -34,6 +34,7 @@
#include "asterisk/utils.h"
#include "asterisk/astobj2.h"
#include "asterisk/module.h"
#include "asterisk/serializer.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/uuid.h"
@ -2840,7 +2841,7 @@
#define SERIALIZER_POOL_SIZE 8
/*! Pool of serializers to use if not supplied. */
static struct ast_taskprocessor *serializer_pool[SERIALIZER_POOL_SIZE];
static struct ast_serializer_pool *sip_serializer_pool;
static pjsip_endpoint *ast_pjsip_endpoint;
@ -4626,71 +4627,10 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name)
return ast_sip_create_serializer_group(name, NULL);
}
/*!
* \internal
* \brief Shutdown the serializers in the default pool.
* \since 14.0.0
*
* \return Nothing
*/
static void serializer_pool_shutdown(void)
{
int idx;
for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) {
ast_taskprocessor_unreference(serializer_pool[idx]);
serializer_pool[idx] = NULL;
}
}
/*!
* \internal
* \brief Setup the serializers in the default pool.
* \since 14.0.0
*
* \retval 0 on success.
* \retval -1 on error.
*/
static int serializer_pool_setup(void)
{
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
int idx;
for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) {
/* Create name with seq number appended. */
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/default");
serializer_pool[idx] = ast_sip_create_serializer(tps_name);
if (!serializer_pool[idx]) {
serializer_pool_shutdown();
return -1;
}
}
return 0;
}
static struct ast_taskprocessor *serializer_pool_pick(void)
{
int idx;
int pos = 0;
if (!serializer_pool[0]) {
return NULL;
}
for (idx = 1; idx < SERIALIZER_POOL_SIZE; ++idx) {
if (ast_taskprocessor_size(serializer_pool[idx]) < ast_taskprocessor_size(serializer_pool[pos])) {
pos = idx;
}
}
return serializer_pool[pos];
}
int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data)
{
if (!serializer) {
serializer = serializer_pool_pick();
serializer = ast_serializer_pool_get(sip_serializer_pool);
}
return ast_taskprocessor_push(serializer, sip_task, task_data);
@ -4771,7 +4711,7 @@ int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int
{
if (!serializer) {
/* Caller doesn't care which PJSIP serializer the task executes under. */
serializer = serializer_pool_pick();
serializer = ast_serializer_pool_get(sip_serializer_pool);
if (!serializer) {
/* No serializer picked to execute the task */
return -1;
@ -5133,6 +5073,11 @@ long ast_sip_threadpool_queue_size(void)
return ast_threadpool_queue_size(sip_threadpool);
}
struct ast_threadpool *ast_sip_threadpool(void)
{
return sip_threadpool;
}
#ifdef TEST_FRAMEWORK
AST_TEST_DEFINE(xml_sanitization_end_null)
{
@ -5204,7 +5149,7 @@ static int unload_pjsip(void *data)
* These calls need the pjsip endpoint and serializer to clean up.
* If they're not set, then there's nothing to clean up anyway.
*/
if (ast_pjsip_endpoint && serializer_pool[0]) {
if (ast_pjsip_endpoint && sip_serializer_pool) {
ast_res_pjsip_cleanup_options_handling();
ast_res_pjsip_cleanup_message_filter();
ast_sip_destroy_distributor();
@ -5340,7 +5285,9 @@ static int load_module(void)
goto error;
}
if (serializer_pool_setup()) {
sip_serializer_pool = ast_serializer_pool_create(
"pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1);
if (!sip_serializer_pool) {
ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n");
goto error;
}
@ -5413,7 +5360,7 @@ error:
/* These functions all check for NULLs and are safe to call at any time */
ast_sip_destroy_scheduler();
serializer_pool_shutdown();
ast_serializer_pool_destroy(sip_serializer_pool);
ast_threadpool_shutdown(sip_threadpool);
return AST_MODULE_LOAD_DECLINE;
@ -5444,7 +5391,7 @@ static int unload_module(void)
*/
ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL);
ast_sip_destroy_scheduler();
serializer_pool_shutdown();
ast_serializer_pool_destroy(sip_serializer_pool);
ast_threadpool_shutdown(sip_threadpool);
return 0;

@ -36,6 +36,7 @@
#include "asterisk/logger.h"
#include "asterisk/astobj2.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/serializer.h"
#include "asterisk/sorcery.h"
#include "asterisk/stasis.h"
#include "asterisk/mwi.h"
@ -57,8 +58,11 @@ static char *default_voicemail_extension;
/*! Number of serializers in pool if one not supplied. */
#define MWI_SERIALIZER_POOL_SIZE 8
/*! Max timeout for all threads to join during an unload. */
#define MAX_UNLOAD_TIMEOUT_TIME 10 /* Seconds */
/*! Pool of serializers to use if not supplied. */
static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE];
static struct ast_serializer_pool *mwi_serializer_pool;
static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
@ -129,117 +133,6 @@ struct mwi_subscription {
char id[1];
};
/*!
* \internal
* \brief Shutdown the serializers in the mwi pool.
* \since 13.12.0
*
* \return Nothing
*/
static void mwi_serializer_pool_shutdown(void)
{
int idx;
for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
ast_taskprocessor_unreference(mwi_serializer_pool[idx]);
mwi_serializer_pool[idx] = NULL;
}
}
/*!
* \internal
* \brief Setup the serializers in the mwi pool.
* \since 13.12.0
*
* \retval 0 on success.
* \retval -1 on error.
*/
static int mwi_serializer_pool_setup(void)
{
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
int idx;
for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
/* Create name with seq number appended. */
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi");
mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name);
if (!mwi_serializer_pool[idx]) {
mwi_serializer_pool_shutdown();
return -1;
}
}
return 0;
}
/*!
* \internal
* \brief Pick a mwi serializer from the pool.
* \since 13.12.0
*
* \retval least queue size task processor.
*/
static struct ast_taskprocessor *get_mwi_serializer(void)
{
int idx;
int pos;
if (!mwi_serializer_pool[0]) {
return NULL;
}
for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) {
pos = idx;
}
}
return mwi_serializer_pool[pos];
}
/*!
* \internal
* \brief Set taskprocessor alert levels for the serializers in the mwi pool.
* \since 13.12.0
*
* \retval 0 on success.
* \retval -1 on error.
*/
static int mwi_serializer_set_alert_levels(void)
{
int idx;
long tps_queue_high;
long tps_queue_low;
if (!mwi_serializer_pool[0]) {
return -1;
}
tps_queue_high = ast_sip_get_mwi_tps_queue_high();
if (tps_queue_high <= 0) {
ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n",
tps_queue_high);
tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
}
tps_queue_low = ast_sip_get_mwi_tps_queue_low();
if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n",
tps_queue_low);
tps_queue_low = -1;
}
for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) {
ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n",
idx);
}
}
return 0;
}
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg);
@ -1213,7 +1106,7 @@ static int send_notify(void *obj, void *arg, int flags)
struct mwi_subscription *mwi_sub = obj;
struct ast_taskprocessor *serializer = mwi_sub->is_solicited
? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
: get_mwi_serializer();
: ast_serializer_pool_get(mwi_serializer_pool);
if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
@ -1228,7 +1121,8 @@ static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
struct mwi_subscription *mwi_sub = userdata;
if (stasis_subscription_final_message(sub, msg)) {
if (ast_sip_push_task(NULL, serialized_cleanup, ao2_bump(mwi_sub))) {
if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool),
serialized_cleanup, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
}
return;
@ -1391,7 +1285,8 @@ static int send_contact_notify(void *obj, void *arg, int flags)
return 0;
}
if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) {
if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool),
serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
}
@ -1513,7 +1408,8 @@ static void mwi_startup_event_cb(void *data, struct stasis_subscription *sub, st
return;
}
ast_sip_push_task(NULL, send_initial_notify_all, NULL);
ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool),
send_initial_notify_all, NULL);
stasis_unsubscribe(sub);
}
@ -1522,7 +1418,8 @@ static void global_loaded(const char *object_type)
{
ast_free(default_voicemail_extension);
default_voicemail_extension = ast_sip_get_default_voicemail_extension();
mwi_serializer_set_alert_levels();
ast_serializer_pool_set_alerts(mwi_serializer_pool,
ast_sip_get_mwi_tps_queue_high(), ast_sip_get_mwi_tps_queue_low());
}
static struct ast_sorcery_observer global_observer = {
@ -1543,14 +1440,16 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
if (mwi_serializer_pool_setup()) {
mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi",
MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME);
if (!mwi_serializer_pool) {
ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
}
solicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS,
mwi_sub_hash, NULL, mwi_sub_cmp);
if (!solicited_mwi) {
mwi_serializer_pool_shutdown();
ast_serializer_pool_destroy(mwi_serializer_pool);
ast_sip_unregister_subscription_handler(&mwi_handler);
return AST_MODULE_LOAD_DECLINE;
}
@ -1558,7 +1457,7 @@ static int load_module(void)
unsolicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS,
mwi_sub_hash, NULL, mwi_sub_cmp);
if (!unsolicited_mwi) {
mwi_serializer_pool_shutdown();
ast_serializer_pool_destroy(mwi_serializer_pool);
ast_sip_unregister_subscription_handler(&mwi_handler);
ao2_ref(solicited_mwi, -1);
return AST_MODULE_LOAD_DECLINE;
@ -1571,7 +1470,8 @@ static int load_module(void)
if (!ast_sip_get_mwi_disable_initial_unsolicited()) {
create_mwi_subscriptions();
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
ast_sip_push_task(NULL, send_initial_notify_all, NULL);
ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool),
send_initial_notify_all, NULL);
} else {
struct stasis_subscription *sub;
@ -1581,6 +1481,15 @@ static int load_module(void)
}
}
if (!mwi_serializer_pool) {
/*
* If the mwi serializer pool was unable to be established then the module will
* use the default serializer pool. If this happens prevent manual unloading
* since there would now exist the potential for a crash on unload.
*/
ast_module_shutdown_ref(ast_module_info->self);
}
return AST_MODULE_LOAD_SUCCESS;
}
@ -1589,13 +1498,22 @@ static int unload_module(void)
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
ao2_ref(unsolicited_mwi, -1);
unsolicited_mwi = NULL;
if (unsolicited_mwi) {
ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
ao2_ref(unsolicited_mwi, -1);
unsolicited_mwi = NULL;
}
ao2_cleanup(solicited_mwi);
if (solicited_mwi) {
ao2_ref(solicited_mwi, -1);
solicited_mwi = NULL;
}
mwi_serializer_pool_shutdown();
if (ast_serializer_pool_destroy(mwi_serializer_pool)) {
ast_log(LOG_WARNING, "Unload incomplete. Try again later\n");
return -1;
}
mwi_serializer_pool = NULL;
ast_sip_unregister_subscription_handler(&mwi_handler);

Loading…
Cancel
Save