diff --git a/CHANGES b/CHANGES index 3eadadf092..b353e96a3a 100644 --- a/CHANGES +++ b/CHANGES @@ -389,6 +389,15 @@ app_voicemail Additional information can be found in the sample configuration file at config/samples/voicemail.conf.sample. +res_pjsip_mwi +------------------ + * Added "mwi_tps_queue_high" and "mwi_tps_queue_low" global configuration + options to tune taskprocessor alert levels. + * Added "mwi_disable_initial_unsolicited" global configuration option + to disable sending unsolicited MWI to all endpoints on startup. + Additional information can be found in the sample configuration file at + config/samples/pjsip.conf.sample. + ------------------------------------------------------------------------------ --- Functionality changes from Asterisk 13.10.0 to Asterisk 13.11.0 ---------- ------------------------------------------------------------------------------ diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample index 99bdfb99d0..eac0549049 100644 --- a/configs/samples/pjsip.conf.sample +++ b/configs/samples/pjsip.conf.sample @@ -964,6 +964,27 @@ ; set to this value if there is no better option (such as ; auth/realm) to be used + ; Asterisk Task Processor Queue Size + ; On heavy loaded system with DB storage you may need to increase + ; taskprocessor queue. + ; If the taskprocessor queue size reached high water level, + ; the alert is triggered. + ; If the alert is set the pjsip distibutor stops processing incoming + ; requests until the alert is cleared. + ; The alert is cleared when taskprocessor queue size drops to the + ; low water clear level. + ; The next options set taskprocessor queue levels for MWI. +;mwi_tps_queue_high=500 ; Taskprocessor high water alert trigger level. +;mwi_tps_queue_low=450 ; Taskprocessor low water clear alert level. + ; The default is -1 for 90% of high water level. + + ; Unsolicited MWI + ; If there are endpoints configured with unsolicited MWI + ; then res_pjsip_mwi module tries to send MWI to all endpoints on startup. +;mwi_disable_initial_unsolicited=no ; Disable sending unsolicited mwi to all endpoints on startup. + ; If disabled then unsolicited mwi will start processing + ; on the endpoint's next contact update. + ; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl ;==========================ACL SECTION OPTIONS========================= ;[acl] diff --git a/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py b/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py new file mode 100644 index 0000000000..d3efa2278a --- /dev/null +++ b/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py @@ -0,0 +1,35 @@ +"""pjsip: add global MWI options + +Revision ID: c7a44a5a0851 +Revises: 4a6c67fa9b7a +Create Date: 2016-08-03 15:08:22.524727 + +""" + +# revision identifiers, used by Alembic. +revision = 'c7a44a5a0851' +down_revision = '4a6c67fa9b7a' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import ENUM + +YESNO_NAME = 'yesno_values' +YESNO_VALUES = ['yes', 'no'] + + +def upgrade(): + ############################# Enums ############################## + + # yesno_values have already been created, so use postgres enum object + # type to get around "already created" issue - works okay with mysql + yesno_values = ENUM(*YESNO_VALUES, name=YESNO_NAME, create_type=False) + + op.add_column('ps_globals', sa.Column('mwi_tps_queue_high', sa.Integer)) + op.add_column('ps_globals', sa.Column('mwi_tps_queue_low', sa.Integer)) + op.add_column('ps_globals', sa.Column('mwi_disable_initial_unsolicited', yesno_values)) + +def downgrade(): + op.drop_column('ps_globals', 'mwi_tps_queue_high') + op.drop_column('ps_globals', 'mwi_tps_queue_low') + op.drop_column('ps_globals', 'mwi_disable_initial_unsolicited') diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 9bb2a82c5d..cd6b33db47 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2416,6 +2416,32 @@ int ast_sip_register_supplement(struct ast_sip_supplement *supplement); */ void ast_sip_unregister_supplement(struct ast_sip_supplement *supplement); +/*! + * \brief Retrieve the global MWI taskprocessor high water alert trigger level. + * + * \since 13.12.0 + * + * \retval the system MWI taskprocessor high water alert trigger level + */ +unsigned int ast_sip_get_mwi_tps_queue_high(void); + +/*! + * \brief Retrieve the global MWI taskprocessor low water clear alert level. + * + * \since 13.12.0 + * + * \retval the system MWI taskprocessor low water clear alert level + */ +int ast_sip_get_mwi_tps_queue_low(void); + +/*! + * \brief Retrieve the global setting 'disable sending unsolicited mwi on startup'. + * \since 13.12.0 + * + * \retval non zero if disable. + */ +unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void); + /*! * \brief Retrieve the system debug setting (yes|no|host). * diff --git a/res/res_pjsip.c b/res/res_pjsip.c index ffbf880b11..8a9a19db89 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -1499,6 +1499,48 @@ set to this value if there is no better option (such as auth/realm) to be used. + + MWI taskprocessor high water alert trigger level. + + On a heavily loaded system you may need to adjust the + taskprocessor queue limits. If any taskprocessor queue size + reaches its high water level then pjsip will stop processing + new requests until the alert is cleared. The alert clears + when all alerting taskprocessor queues have dropped to their + low water clear level. + + + + + MWI taskprocessor low water clear alert level. + + On a heavily loaded system you may need to adjust the + taskprocessor queue limits. If any taskprocessor queue size + reaches its high water level then pjsip will stop processing + new requests until the alert is cleared. The alert clears + when all alerting taskprocessor queues have dropped to their + low water clear level. + + Set to -1 for the low water level to be 90% of + the high water level. + + + + Enable/Disable sending unsolicited MWI to all endpoints on startup. + + When the initial unsolicited MWI notification are + enabled on startup then the initial notifications + get sent at startup. If you have a lot of endpoints + (thousands) that use unsolicited MWI then you may + want to consider disabling the initial startup + notifications. + + When the initial unsolicited MWI notifications are + disabled on startup then the notifications will start + on the endpoint's next contact update. + + + diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c index 6bb6888049..8a1b0d449f 100644 --- a/res/res_pjsip/config_global.c +++ b/res/res_pjsip/config_global.c @@ -24,6 +24,7 @@ #include "asterisk/res_pjsip.h" #include "include/res_pjsip_private.h" #include "asterisk/sorcery.h" +#include "asterisk/taskprocessor.h" #include "asterisk/ast_version.h" #include "asterisk/res_pjsip_cli.h" @@ -43,6 +44,9 @@ #define DEFAULT_UNIDENTIFIED_REQUEST_COUNT 5 #define DEFAULT_UNIDENTIFIED_REQUEST_PERIOD 5 #define DEFAULT_UNIDENTIFIED_REQUEST_PRUNE_INTERVAL 30 +#define DEFAULT_MWI_TPS_QUEUE_HIGH AST_TASKPROCESSOR_HIGH_WATER_LEVEL +#define DEFAULT_MWI_TPS_QUEUE_LOW -1 +#define DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED 0 static char default_useragent[256]; @@ -79,6 +83,14 @@ struct global_config { unsigned int unidentified_request_period; /* Interval at which expired unidentifed requests will be pruned */ unsigned int unidentified_request_prune_interval; + struct { + /*! Taskprocessor high water alert trigger level */ + unsigned int tps_queue_high; + /*! Taskprocessor low water clear alert level. */ + int tps_queue_low; + /*! Nonzero to disable sending unsolicited mwi to all endpoints on startup */ + unsigned int disable_initial_unsolicited; + } mwi; }; static void global_destructor(void *obj) @@ -314,6 +326,53 @@ void ast_sip_get_default_from_user(char *from_user, size_t size) } } + +unsigned int ast_sip_get_mwi_tps_queue_high(void) +{ + unsigned int tps_queue_high; + struct global_config *cfg; + + cfg = get_global_cfg(); + if (!cfg) { + return DEFAULT_MWI_TPS_QUEUE_HIGH; + } + + tps_queue_high = cfg->mwi.tps_queue_high; + ao2_ref(cfg, -1); + return tps_queue_high; +} + +int ast_sip_get_mwi_tps_queue_low(void) +{ + int tps_queue_low; + struct global_config *cfg; + + cfg = get_global_cfg(); + if (!cfg) { + return DEFAULT_MWI_TPS_QUEUE_LOW; + } + + tps_queue_low = cfg->mwi.tps_queue_low; + ao2_ref(cfg, -1); + return tps_queue_low; +} + +unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void) +{ + unsigned int disable_initial_unsolicited; + struct global_config *cfg; + + cfg = get_global_cfg(); + if (!cfg) { + return DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED; + } + + disable_initial_unsolicited = cfg->mwi.disable_initial_unsolicited; + ao2_ref(cfg, -1); + return disable_initial_unsolicited; +} + + /*! * \internal * \brief Observer to set default global object if none exist. @@ -450,6 +509,15 @@ int ast_sip_initialize_sorcery_global(void) OPT_UINT_T, 0, FLDSET(struct global_config, unidentified_request_prune_interval)); ast_sorcery_object_field_register(sorcery, "global", "default_realm", DEFAULT_REALM, OPT_STRINGFIELD_T, 0, STRFLDSET(struct global_config, default_realm)); + ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_high", + __stringify(DEFAULT_MWI_TPS_QUEUE_HIGH), + OPT_UINT_T, 0, FLDSET(struct global_config, mwi.tps_queue_high)); + ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_low", + __stringify(DEFAULT_MWI_TPS_QUEUE_LOW), + OPT_INT_T, 0, FLDSET(struct global_config, mwi.tps_queue_low)); + ast_sorcery_object_field_register(sorcery, "global", "mwi_disable_initial_unsolicited", + DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED ? "yes" : "no", + OPT_BOOL_T, 1, FLDSET(struct global_config, mwi.disable_initial_unsolicited)); if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) { return -1; diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index d86c96c744..bf7f042760 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -35,6 +35,7 @@ #include "asterisk/module.h" #include "asterisk/logger.h" #include "asterisk/astobj2.h" +#include "asterisk/taskprocessor.h" #include "asterisk/sorcery.h" #include "asterisk/stasis.h" #include "asterisk/app.h" @@ -52,6 +53,12 @@ static char *default_voicemail_extension; #define MWI_DATASTORE "MWI datastore" +/*! Number of serializers in pool if one not supplied. */ +#define MWI_SERIALIZER_POOL_SIZE 8 + +/*! Pool of serializers to use if not supplied. */ +static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE]; + static void mwi_subscription_shutdown(struct ast_sip_subscription *sub); static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf); static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint, @@ -119,6 +126,117 @@ struct mwi_subscription { char id[1]; }; +/*! + * \internal + * \brief Shutdown the serializers in the mwi pool. + * \since 13.12.0 + * + * \return Nothing + */ +static void mwi_serializer_pool_shutdown(void) +{ + int idx; + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + ast_taskprocessor_unreference(mwi_serializer_pool[idx]); + mwi_serializer_pool[idx] = NULL; + } +} + +/*! + * \internal + * \brief Setup the serializers in the mwi pool. + * \since 13.12.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int mwi_serializer_pool_setup(void) +{ + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + int idx; + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi"); + + mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name); + if (!mwi_serializer_pool[idx]) { + mwi_serializer_pool_shutdown(); + return -1; + } + } + return 0; +} + +/*! + * \internal + * \brief Pick a mwi serializer from the pool. + * \since 13.12.0 + * + * \retval least queue size task processor. + */ +static struct ast_taskprocessor *get_mwi_serializer(void) +{ + int idx; + int pos; + + if (!mwi_serializer_pool[0]) { + return NULL; + } + + for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) { + pos = idx; + } + } + + return mwi_serializer_pool[pos]; +} + +/*! + * \internal + * \brief Set taskprocessor alert levels for the serializers in the mwi pool. + * \since 13.12.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int mwi_serializer_set_alert_levels(void) +{ + int idx; + long tps_queue_high; + long tps_queue_low; + + if (!mwi_serializer_pool[0]) { + return -1; + } + + tps_queue_high = ast_sip_get_mwi_tps_queue_high(); + if (tps_queue_high <= 0) { + ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n", + tps_queue_high); + tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; + } + + tps_queue_low = ast_sip_get_mwi_tps_queue_low(); + if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) { + ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n", + tps_queue_low); + tps_queue_low = -1; + } + + for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { + if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) { + ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n", + idx); + } + } + + return 0; +} + + static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg); @@ -945,7 +1063,7 @@ static int send_notify(void *obj, void *arg, int flags) struct mwi_subscription *mwi_sub = obj; struct ast_taskprocessor *serializer = mwi_sub->is_solicited ? ast_sip_subscription_get_serializer(mwi_sub->sip_sub) - : NULL; + : get_mwi_serializer(); if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); @@ -1063,7 +1181,7 @@ static int send_contact_notify(void *obj, void *arg, int flags) return 0; } - if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } @@ -1149,6 +1267,7 @@ static void global_loaded(const char *object_type) { ast_free(default_voicemail_extension); default_voicemail_extension = ast_sip_get_default_voicemail_extension(); + mwi_serializer_set_alert_levels(); } static struct ast_sorcery_observer global_observer = { @@ -1175,15 +1294,21 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } + if (mwi_serializer_pool_setup()) { + ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); + } + create_mwi_subscriptions(); ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer); ast_sorcery_reload_object(ast_sip_get_sorcery(), "global"); - if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { - ast_sip_push_task(NULL, send_initial_notify_all, NULL); - } else { - stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + if (!ast_sip_get_mwi_disable_initial_unsolicited()) { + if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { + ast_sip_push_task(NULL, send_initial_notify_all, NULL); + } else { + stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + } } return AST_MODULE_LOAD_SUCCESS; @@ -1193,6 +1318,7 @@ static int unload_module(void) { ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); ao2_ref(unsolicited_mwi, -1); + mwi_serializer_pool_shutdown(); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); ast_sip_unregister_subscription_handler(&mwi_handler);