From 403b63571c53719bc050f4aded696cd933995d24 Mon Sep 17 00:00:00 2001 From: Alexei Gradinari Date: Mon, 8 Aug 2016 13:53:32 -0400 Subject: [PATCH] res_pjsip_mwi: fix unsolicited mwi blocks PJSIP stack The PJSIP taskprocessors could be overflowed on startup if there are many (thousands) realtime endpoints configured with unsolicited mwi. The PJSIP stack could be totally unresponsive for a few minutes after boot completed. This patch creates a separate PJSIP serializers pool for mwi and makes unsolicited mwi use serializers from this pool. This patch also adds 2 new global options to tune taskprocessor alert levels: 'mwi_tps_queue_high' and 'mwi_tps_queue_low'. This patch also adds new global option 'mwi_disable_initial_unsolicited' to disable sending unsolicited mwi to all endpoints on startup. If disabled then unsolicited mwi will start processing on next endpoint's contact update. ASTERISK-26230 #close Change-Id: I4c8ecb82c249eb887930980a800c9f87f28f861a --- CHANGES | 9 ++ configs/samples/pjsip.conf.sample | 21 +++ ...a44a5a0851_pjsip_add_global_mwi_options.py | 35 +++++ include/asterisk/res_pjsip.h | 26 ++++ res/res_pjsip.c | 42 ++++++ res/res_pjsip/config_global.c | 68 +++++++++ res/res_pjsip_mwi.c | 138 +++++++++++++++++- 7 files changed, 333 insertions(+), 6 deletions(-) create mode 100644 contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py diff --git a/CHANGES b/CHANGES index 3eadadf092..b353e96a3a 100644 --- a/CHANGES +++ b/CHANGES @@ -389,6 +389,15 @@ app_voicemail Additional information can be found in the sample configuration file at config/samples/voicemail.conf.sample. +res_pjsip_mwi +------------------ + * Added "mwi_tps_queue_high" and "mwi_tps_queue_low" global configuration + options to tune taskprocessor alert levels. + * Added "mwi_disable_initial_unsolicited" global configuration option + to disable sending unsolicited MWI to all endpoints on startup. + Additional information can be found in the sample configuration file at + config/samples/pjsip.conf.sample. + ------------------------------------------------------------------------------ --- Functionality changes from Asterisk 13.10.0 to Asterisk 13.11.0 ---------- ------------------------------------------------------------------------------ diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample index 99bdfb99d0..eac0549049 100644 --- a/configs/samples/pjsip.conf.sample +++ b/configs/samples/pjsip.conf.sample @@ -964,6 +964,27 @@ ; set to this value if there is no better option (such as ; auth/realm) to be used + ; Asterisk Task Processor Queue Size + ; On heavy loaded system with DB storage you may need to increase + ; taskprocessor queue. + ; If the taskprocessor queue size reached high water level, + ; the alert is triggered. + ; If the alert is set the pjsip distibutor stops processing incoming + ; requests until the alert is cleared. + ; The alert is cleared when taskprocessor queue size drops to the + ; low water clear level. + ; The next options set taskprocessor queue levels for MWI. +;mwi_tps_queue_high=500 ; Taskprocessor high water alert trigger level. +;mwi_tps_queue_low=450 ; Taskprocessor low water clear alert level. + ; The default is -1 for 90% of high water level. + + ; Unsolicited MWI + ; If there are endpoints configured with unsolicited MWI + ; then res_pjsip_mwi module tries to send MWI to all endpoints on startup. +;mwi_disable_initial_unsolicited=no ; Disable sending unsolicited mwi to all endpoints on startup. + ; If disabled then unsolicited mwi will start processing + ; on the endpoint's next contact update. + ; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl ;==========================ACL SECTION OPTIONS========================= ;[acl] diff --git a/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py b/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py new file mode 100644 index 0000000000..d3efa2278a --- /dev/null +++ b/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py @@ -0,0 +1,35 @@ +"""pjsip: add global MWI options + +Revision ID: c7a44a5a0851 +Revises: 4a6c67fa9b7a +Create Date: 2016-08-03 15:08:22.524727 + +""" + +# revision identifiers, used by Alembic. +revision = 'c7a44a5a0851' +down_revision = '4a6c67fa9b7a' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import ENUM + +YESNO_NAME = 'yesno_values' +YESNO_VALUES = ['yes', 'no'] + + +def upgrade(): + ############################# Enums ############################## + + # yesno_values have already been created, so use postgres enum object + # type to get around "already created" issue - works okay with mysql + yesno_values = ENUM(*YESNO_VALUES, name=YESNO_NAME, create_type=False) + + op.add_column('ps_globals', sa.Column('mwi_tps_queue_high', sa.Integer)) + op.add_column('ps_globals', sa.Column('mwi_tps_queue_low', sa.Integer)) + op.add_column('ps_globals', sa.Column('mwi_disable_initial_unsolicited', yesno_values)) + +def downgrade(): + op.drop_column('ps_globals', 'mwi_tps_queue_high') + op.drop_column('ps_globals', 'mwi_tps_queue_low') + op.drop_column('ps_globals', 'mwi_disable_initial_unsolicited') diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 9bb2a82c5d..cd6b33db47 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2416,6 +2416,32 @@ int ast_sip_register_supplement(struct ast_sip_supplement *supplement); */ void ast_sip_unregister_supplement(struct ast_sip_supplement *supplement); +/*! + * \brief Retrieve the global MWI taskprocessor high water alert trigger level. + * + * \since 13.12.0 + * + * \retval the system MWI taskprocessor high water alert trigger level + */ +unsigned int ast_sip_get_mwi_tps_queue_high(void); + +/*! + * \brief Retrieve the global MWI taskprocessor low water clear alert level. + * + * \since 13.12.0 + * + * \retval the system MWI taskprocessor low water clear alert level + */ +int ast_sip_get_mwi_tps_queue_low(void); + +/*! + * \brief Retrieve the global setting 'disable sending unsolicited mwi on startup'. + * \since 13.12.0 + * + * \retval non zero if disable. + */ +unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void); + /*! * \brief Retrieve the system debug setting (yes|no|host). * diff --git a/res/res_pjsip.c b/res/res_pjsip.c index ffbf880b11..8a9a19db89 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -1499,6 +1499,48 @@ set to this value if there is no better option (such as auth/realm) to be used. + + MWI taskprocessor high water alert trigger level. + + On a heavily loaded system you may need to adjust the + taskprocessor queue limits. If any taskprocessor queue size + reaches its high water level then pjsip will stop processing + new requests until the alert is cleared. The alert clears + when all alerting taskprocessor queues have dropped to their + low water clear level. + + + + + MWI taskprocessor low water clear alert level. + + On a heavily loaded system you may need to adjust the + taskprocessor queue limits. If any taskprocessor queue size + reaches its high water level then pjsip will stop processing + new requests until the alert is cleared. The alert clears + when all alerting taskprocessor queues have dropped to their + low water clear level. + + Set to -1 for the low water level to be 90% of + the high water level. + + + + Enable/Disable sending unsolicited MWI to all endpoints on startup. + + When the initial unsolicited MWI notification are + enabled on startup then the initial notifications + get sent at startup. If you have a lot of endpoints + (thousands) that use unsolicited MWI then you may + want to consider disabling the initial startup + notifications. + + When the initial unsolicited MWI notifications are + disabled on startup then the notifications will start + on the endpoint's next contact update. + + + diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c index 6bb6888049..8a1b0d449f 100644 --- a/res/res_pjsip/config_global.c +++ b/res/res_pjsip/config_global.c @@ -24,6 +24,7 @@ #include "asterisk/res_pjsip.h" #include "include/res_pjsip_private.h" #include "asterisk/sorcery.h" +#include "asterisk/taskprocessor.h" #include "asterisk/ast_version.h" #include "asterisk/res_pjsip_cli.h" @@ -43,6 +44,9 @@ #define DEFAULT_UNIDENTIFIED_REQUEST_COUNT 5 #define DEFAULT_UNIDENTIFIED_REQUEST_PERIOD 5 #define DEFAULT_UNIDENTIFIED_REQUEST_PRUNE_INTERVAL 30 +#define DEFAULT_MWI_TPS_QUEUE_HIGH AST_TASKPROCESSOR_HIGH_WATER_LEVEL +#define DEFAULT_MWI_TPS_QUEUE_LOW -1 +#define DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED 0 static char default_useragent[256]; @@ -79,6 +83,14 @@ struct global_config { unsigned int unidentified_request_period; /* Interval at which expired unidentifed requests will be pruned */ unsigned int unidentified_request_prune_interval; + struct { + /*! Taskprocessor high water alert trigger level */ + unsigned int tps_queue_high; + /*! Taskprocessor low water clear alert level. */ + int tps_queue_low; + /*! Nonzero to disable sending unsolicited mwi to all endpoints on startup */ + unsigned int disable_initial_unsolicited; + } mwi; }; static void global_destructor(void *obj) @@ -314,6 +326,53 @@ void ast_sip_get_default_from_user(char *from_user, size_t size) } } + +unsigned int ast_sip_get_mwi_tps_queue_high(void) +{ + unsigned int tps_queue_high; + struct global_config *cfg; + + cfg = get_global_cfg(); + if (!cfg) { + return DEFAULT_MWI_TPS_QUEUE_HIGH; + } + + tps_queue_high = cfg->mwi.tps_queue_high; + ao2_ref(cfg, -1); + return tps_queue_high; +} + +int ast_sip_get_mwi_tps_queue_low(void) +{ + int tps_queue_low; + struct global_config *cfg; + + cfg = get_global_cfg(); + if (!cfg) { + return DEFAULT_MWI_TPS_QUEUE_LOW; + } + + tps_queue_low = cfg->mwi.tps_queue_low; + ao2_ref(cfg, -1); + return tps_queue_low; +} + +unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void) +{ + unsigned int disable_initial_unsolicited; + struct global_config *cfg; + + cfg = get_global_cfg(); + if (!cfg) { + return DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED; + } + + disable_initial_unsolicited = cfg->mwi.disable_initial_unsolicited; + ao2_ref(cfg, -1); + return disable_initial_unsolicited; +} + + /*! * \internal * \brief Observer to set default global object if none exist. @@ -450,6 +509,15 @@ int ast_sip_initialize_sorcery_global(void) OPT_UINT_T, 0, FLDSET(struct global_config, unidentified_request_prune_interval)); ast_sorcery_object_field_register(sorcery, "global", "default_realm", DEFAULT_REALM, OPT_STRINGFIELD_T, 0, STRFLDSET(struct global_config, default_realm)); + ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_high", + __stringify(DEFAULT_MWI_TPS_QUEUE_HIGH), + OPT_UINT_T, 0, FLDSET(struct global_config, mwi.tps_queue_high)); + ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_low", + __stringify(DEFAULT_MWI_TPS_QUEUE_LOW), + OPT_INT_T, 0, FLDSET(struct global_config, mwi.tps_queue_low)); + ast_sorcery_object_field_register(sorcery, "global", "mwi_disable_initial_unsolicited", + DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED ? "yes" : "no", + OPT_BOOL_T, 1, FLDSET(struct global_config, mwi.disable_initial_unsolicited)); if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) { return -1; diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index d86c96c744..bf7f042760 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -35,6 +35,7 @@ #include "asterisk/module.h" #include "asterisk/logger.h" #include "asterisk/astobj2.h" +#include "asterisk/taskprocessor.h" #include "asterisk/sorcery.h" #include "asterisk/stasis.h" #include "asterisk/app.h" @@ -52,6 +53,12 @@ static char *default_voicemail_extension; #define MWI_DATASTORE "MWI datastore" +/*! Number of serializers in pool if one not supplied. */ +#define MWI_SERIALIZER_POOL_SIZE 8 + +/*! Pool of serializers to use if not supplied. */ +static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE]; + static void mwi_subscription_shutdown(struct ast_sip_subscription *sub); static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf); static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint, @@ -119,6 +126,117 @@ struct mwi_subscription { char id[1]; }; +/*! + * \internal + * \brief Shutdown the serializers in the mwi pool. + * \since 13.12.0 + * + * \return Nothing + */ +static void mwi_serializer_pool_shutdown(void) +{ + int idx; + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + ast_taskprocessor_unreference(mwi_serializer_pool[idx]); + mwi_serializer_pool[idx] = NULL; + } +} + +/*! + * \internal + * \brief Setup the serializers in the mwi pool. + * \since 13.12.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int mwi_serializer_pool_setup(void) +{ + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + int idx; + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi"); + + mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name); + if (!mwi_serializer_pool[idx]) { + mwi_serializer_pool_shutdown(); + return -1; + } + } + return 0; +} + +/*! + * \internal + * \brief Pick a mwi serializer from the pool. + * \since 13.12.0 + * + * \retval least queue size task processor. + */ +static struct ast_taskprocessor *get_mwi_serializer(void) +{ + int idx; + int pos; + + if (!mwi_serializer_pool[0]) { + return NULL; + } + + for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) { + pos = idx; + } + } + + return mwi_serializer_pool[pos]; +} + +/*! + * \internal + * \brief Set taskprocessor alert levels for the serializers in the mwi pool. + * \since 13.12.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int mwi_serializer_set_alert_levels(void) +{ + int idx; + long tps_queue_high; + long tps_queue_low; + + if (!mwi_serializer_pool[0]) { + return -1; + } + + tps_queue_high = ast_sip_get_mwi_tps_queue_high(); + if (tps_queue_high <= 0) { + ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n", + tps_queue_high); + tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; + } + + tps_queue_low = ast_sip_get_mwi_tps_queue_low(); + if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) { + ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n", + tps_queue_low); + tps_queue_low = -1; + } + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) { + ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n", + idx); + } + } + + return 0; +} + + static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg); @@ -945,7 +1063,7 @@ static int send_notify(void *obj, void *arg, int flags) struct mwi_subscription *mwi_sub = obj; struct ast_taskprocessor *serializer = mwi_sub->is_solicited ? ast_sip_subscription_get_serializer(mwi_sub->sip_sub) - : NULL; + : get_mwi_serializer(); if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); @@ -1063,7 +1181,7 @@ static int send_contact_notify(void *obj, void *arg, int flags) return 0; } - if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } @@ -1149,6 +1267,7 @@ static void global_loaded(const char *object_type) { ast_free(default_voicemail_extension); default_voicemail_extension = ast_sip_get_default_voicemail_extension(); + mwi_serializer_set_alert_levels(); } static struct ast_sorcery_observer global_observer = { @@ -1175,15 +1294,21 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } + if (mwi_serializer_pool_setup()) { + ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); + } + create_mwi_subscriptions(); ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer); ast_sorcery_reload_object(ast_sip_get_sorcery(), "global"); - if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { - ast_sip_push_task(NULL, send_initial_notify_all, NULL); - } else { - stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + if (!ast_sip_get_mwi_disable_initial_unsolicited()) { + if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { + ast_sip_push_task(NULL, send_initial_notify_all, NULL); + } else { + stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + } } return AST_MODULE_LOAD_SUCCESS; @@ -1193,6 +1318,7 @@ static int unload_module(void) { ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); ao2_ref(unsolicited_mwi, -1); + mwi_serializer_pool_shutdown(); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); ast_sip_unregister_subscription_handler(&mwi_handler);