From 2042020abfc145c8882ece41c18a5a4797805d8a Mon Sep 17 00:00:00 2001 From: Kevin Harwell Date: Tue, 1 Oct 2019 15:54:59 -0500 Subject: [PATCH] res_pjsip/res_pjsip_mwi: use centralized serializer pools Both res_pjsip and res_pjsip_mwi made use of serializer pools. However, they both implemented their own serializer pool functionality that was pretty much identical in each of the source files. This patch removes the duplicated code, and uses the new 'ast_serializer_pool' object instead. Additionally res_pjsip_mwi enables a shutdown group on the pool since if the timing was right the module could be unloaded while taskprocessor threads still needed to execute, thus causing a crash. Change-Id: I959b0805ad024585bbb6276593118be34fbf6e1d --- include/asterisk/res_pjsip.h | 5 ++ res/res_pjsip.c | 83 ++++------------- res/res_pjsip_mwi.c | 169 +++++++++-------------------------- 3 files changed, 64 insertions(+), 193 deletions(-) diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 163059531c..a71e2a1129 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2981,6 +2981,11 @@ const char *ast_sip_get_host_ip_string(int af); */ long ast_sip_threadpool_queue_size(void); +/*! + * \brief Retrieve the SIP threadpool object + */ +struct ast_threadpool *ast_sip_threadpool(void); + /*! * \brief Retrieve transport state * \since 13.7.1 diff --git a/res/res_pjsip.c b/res/res_pjsip.c index ec159d6fa5..e55c157af1 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -34,6 +34,7 @@ #include "asterisk/utils.h" #include "asterisk/astobj2.h" #include "asterisk/module.h" +#include "asterisk/serializer.h" #include "asterisk/threadpool.h" #include "asterisk/taskprocessor.h" #include "asterisk/uuid.h" @@ -2548,7 +2549,7 @@ #define SERIALIZER_POOL_SIZE 8 /*! Pool of serializers to use if not supplied. */ -static struct ast_taskprocessor *serializer_pool[SERIALIZER_POOL_SIZE]; +static struct ast_serializer_pool *sip_serializer_pool; static pjsip_endpoint *ast_pjsip_endpoint; @@ -4330,71 +4331,10 @@ struct ast_taskprocessor *ast_sip_create_serializer(void) return ast_sip_create_serializer_group_named(tps_name, NULL); } -/*! - * \internal - * \brief Shutdown the serializers in the default pool. - * \since 14.0.0 - * - * \return Nothing - */ -static void serializer_pool_shutdown(void) -{ - int idx; - - for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { - ast_taskprocessor_unreference(serializer_pool[idx]); - serializer_pool[idx] = NULL; - } -} - -/*! - * \internal - * \brief Setup the serializers in the default pool. - * \since 14.0.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int serializer_pool_setup(void) -{ - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - int idx; - - for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/default"); - - serializer_pool[idx] = ast_sip_create_serializer_named(tps_name); - if (!serializer_pool[idx]) { - serializer_pool_shutdown(); - return -1; - } - } - return 0; -} - -static struct ast_taskprocessor *serializer_pool_pick(void) -{ - int idx; - int pos = 0; - - if (!serializer_pool[0]) { - return NULL; - } - - for (idx = 1; idx < SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_size(serializer_pool[idx]) < ast_taskprocessor_size(serializer_pool[pos])) { - pos = idx; - } - } - - return serializer_pool[pos]; -} - int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - serializer = serializer_pool_pick(); + serializer = ast_serializer_pool_get(sip_serializer_pool); } return ast_taskprocessor_push(serializer, sip_task, task_data); @@ -4475,7 +4415,7 @@ int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int { if (!serializer) { /* Caller doesn't care which PJSIP serializer the task executes under. */ - serializer = serializer_pool_pick(); + serializer = ast_serializer_pool_get(sip_serializer_pool); if (!serializer) { /* No serializer picked to execute the task */ return -1; @@ -4821,6 +4761,11 @@ long ast_sip_threadpool_queue_size(void) return ast_threadpool_queue_size(sip_threadpool); } +struct ast_threadpool *ast_sip_threadpool(void) +{ + return sip_threadpool; +} + #ifdef TEST_FRAMEWORK AST_TEST_DEFINE(xml_sanitization_end_null) { @@ -4902,7 +4847,7 @@ static int unload_pjsip(void *data) * These calls need the pjsip endpoint and serializer to clean up. * If they're not set, then there's nothing to clean up anyway. */ - if (ast_pjsip_endpoint && serializer_pool[0]) { + if (ast_pjsip_endpoint && sip_serializer_pool) { ast_res_pjsip_cleanup_options_handling(); internal_sip_destroy_outbound_authentication(); ast_res_pjsip_cleanup_message_filter(); @@ -5041,7 +4986,9 @@ static int load_module(void) goto error; } - if (serializer_pool_setup()) { + sip_serializer_pool = ast_serializer_pool_create( + "pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1); + if (!sip_serializer_pool) { ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n"); goto error; } @@ -5118,7 +5065,7 @@ error: /* These functions all check for NULLs and are safe to call at any time */ ast_sip_destroy_scheduler(); - serializer_pool_shutdown(); + ast_serializer_pool_destroy(sip_serializer_pool); ast_threadpool_shutdown(sip_threadpool); return AST_MODULE_LOAD_DECLINE; @@ -5149,7 +5096,7 @@ static int unload_module(void) */ ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL); ast_sip_destroy_scheduler(); - serializer_pool_shutdown(); + ast_serializer_pool_destroy(sip_serializer_pool); ast_threadpool_shutdown(sip_threadpool); ast_pjproject_unref(); diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index 810db5d9cb..9cabe1a08a 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -36,6 +36,7 @@ #include "asterisk/logger.h" #include "asterisk/astobj2.h" #include "asterisk/taskprocessor.h" +#include "asterisk/serializer.h" #include "asterisk/sorcery.h" #include "asterisk/stasis.h" #include "asterisk/mwi.h" @@ -57,8 +58,11 @@ static char *default_voicemail_extension; /*! Number of serializers in pool if one not supplied. */ #define MWI_SERIALIZER_POOL_SIZE 8 +/*! Max timeout for all threads to join during an unload. */ +#define MAX_UNLOAD_TIMEOUT_TIME 10 /* Seconds */ + /*! Pool of serializers to use if not supplied. */ -static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE]; +static struct ast_serializer_pool *mwi_serializer_pool; static void mwi_subscription_shutdown(struct ast_sip_subscription *sub); static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf); @@ -129,116 +133,6 @@ struct mwi_subscription { char id[1]; }; -/*! - * \internal - * \brief Shutdown the serializers in the mwi pool. - * \since 13.12.0 - * - * \return Nothing - */ -static void mwi_serializer_pool_shutdown(void) -{ - int idx; - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - ast_taskprocessor_unreference(mwi_serializer_pool[idx]); - mwi_serializer_pool[idx] = NULL; - } -} - -/*! - * \internal - * \brief Setup the serializers in the mwi pool. - * \since 13.12.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int mwi_serializer_pool_setup(void) -{ - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - int idx; - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi"); - - mwi_serializer_pool[idx] = ast_sip_create_serializer_named(tps_name); - if (!mwi_serializer_pool[idx]) { - mwi_serializer_pool_shutdown(); - return -1; - } - } - return 0; -} - -/*! - * \internal - * \brief Pick a mwi serializer from the pool. - * \since 13.12.0 - * - * \retval least queue size task processor. - */ -static struct ast_taskprocessor *get_mwi_serializer(void) -{ - int idx; - int pos; - - if (!mwi_serializer_pool[0]) { - return NULL; - } - - for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) { - pos = idx; - } - } - - return mwi_serializer_pool[pos]; -} - -/*! - * \internal - * \brief Set taskprocessor alert levels for the serializers in the mwi pool. - * \since 13.12.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int mwi_serializer_set_alert_levels(void) -{ - int idx; - long tps_queue_high; - long tps_queue_low; - - if (!mwi_serializer_pool[0]) { - return -1; - } - - tps_queue_high = ast_sip_get_mwi_tps_queue_high(); - if (tps_queue_high <= 0) { - ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n", - tps_queue_high); - tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; - } - - tps_queue_low = ast_sip_get_mwi_tps_queue_low(); - if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) { - ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n", - tps_queue_low); - tps_queue_low = -1; - } - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) { - ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n", - idx); - } - } - - return 0; -} - static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg); @@ -1218,7 +1112,7 @@ static int send_notify(void *obj, void *arg, int flags) struct mwi_subscription *mwi_sub = obj; struct ast_taskprocessor *serializer = mwi_sub->is_solicited ? ast_sip_subscription_get_serializer(mwi_sub->sip_sub) - : get_mwi_serializer(); + : ast_serializer_pool_get(mwi_serializer_pool); if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); @@ -1233,7 +1127,8 @@ static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct mwi_subscription *mwi_sub = userdata; if (stasis_subscription_final_message(sub, msg)) { - if (ast_sip_push_task(NULL, serialized_cleanup, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + serialized_cleanup, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } return; @@ -1396,7 +1291,8 @@ static int send_contact_notify(void *obj, void *arg, int flags) return 0; } - if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } @@ -1518,7 +1414,8 @@ static void mwi_startup_event_cb(void *data, struct stasis_subscription *sub, st return; } - ast_sip_push_task(NULL, send_initial_notify_all, NULL); + ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + send_initial_notify_all, NULL); stasis_unsubscribe(sub); } @@ -1527,7 +1424,8 @@ static void global_loaded(const char *object_type) { ast_free(default_voicemail_extension); default_voicemail_extension = ast_sip_get_default_voicemail_extension(); - mwi_serializer_set_alert_levels(); + ast_serializer_pool_set_alerts(mwi_serializer_pool, + ast_sip_get_mwi_tps_queue_high(), ast_sip_get_mwi_tps_queue_low()); } static struct ast_sorcery_observer global_observer = { @@ -1550,14 +1448,16 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } - if (mwi_serializer_pool_setup()) { + mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi", + MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME); + if (!mwi_serializer_pool) { ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); } solicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, mwi_sub_hash, NULL, mwi_sub_cmp); if (!solicited_mwi) { - mwi_serializer_pool_shutdown(); + ast_serializer_pool_destroy(mwi_serializer_pool); ast_sip_unregister_subscription_handler(&mwi_handler); return AST_MODULE_LOAD_DECLINE; } @@ -1565,7 +1465,7 @@ static int load_module(void) unsolicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, mwi_sub_hash, NULL, mwi_sub_cmp); if (!unsolicited_mwi) { - mwi_serializer_pool_shutdown(); + ast_serializer_pool_destroy(mwi_serializer_pool); ast_sip_unregister_subscription_handler(&mwi_handler); ao2_ref(solicited_mwi, -1); return AST_MODULE_LOAD_DECLINE; @@ -1578,7 +1478,8 @@ static int load_module(void) if (!ast_sip_get_mwi_disable_initial_unsolicited()) { create_mwi_subscriptions(); if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { - ast_sip_push_task(NULL, send_initial_notify_all, NULL); + ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + send_initial_notify_all, NULL); } else { struct stasis_subscription *sub; @@ -1588,6 +1489,15 @@ static int load_module(void) } } + if (!mwi_serializer_pool) { + /* + * If the mwi serializer pool was unable to be established then the module will + * use the default serializer pool. If this happens prevent manual unloading + * since there would now exist the potential for a crash on unload. + */ + ast_module_shutdown_ref(ast_module_info->self); + } + return AST_MODULE_LOAD_SUCCESS; } @@ -1596,13 +1506,22 @@ static int unload_module(void) ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); - ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); - ao2_ref(unsolicited_mwi, -1); - unsolicited_mwi = NULL; + if (unsolicited_mwi) { + ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); + ao2_ref(unsolicited_mwi, -1); + unsolicited_mwi = NULL; + } - ao2_cleanup(solicited_mwi); + if (solicited_mwi) { + ao2_ref(solicited_mwi, -1); + solicited_mwi = NULL; + } - mwi_serializer_pool_shutdown(); + if (ast_serializer_pool_destroy(mwi_serializer_pool)) { + ast_log(LOG_WARNING, "Unload incomplete. Try again later\n"); + return -1; + } + mwi_serializer_pool = NULL; ast_sip_unregister_subscription_handler(&mwi_handler);