res_pjsip_mwi: fix unsolicited mwi blocks PJSIP stack

The PJSIP taskprocessors could be overflowed on startup
if there are many (thousands) realtime endpoints
configured with unsolicited mwi.
The PJSIP stack could be totally unresponsive for a few minutes
after boot completed.

This patch creates a separate PJSIP serializers pool for mwi
and makes unsolicited mwi use serializers from this pool.
This patch also adds 2 new global options to tune taskprocessor
alert levels: 'mwi_tps_queue_high' and 'mwi_tps_queue_low'.

This patch also adds new global option 'mwi_disable_initial_unsolicited'
to disable sending unsolicited mwi to all endpoints on startup.
If disabled then unsolicited mwi will start processing
on next endpoint's contact update.

ASTERISK-26230 #close

Change-Id: I4c8ecb82c249eb887930980a800c9f87f28f861a
changes/28/3428/2
Alexei Gradinari 9 years ago
parent 9042ad40f2
commit 403b63571c

@ -389,6 +389,15 @@ app_voicemail
Additional information can be found in the sample configuration file at
config/samples/voicemail.conf.sample.
res_pjsip_mwi
------------------
* Added "mwi_tps_queue_high" and "mwi_tps_queue_low" global configuration
options to tune taskprocessor alert levels.
* Added "mwi_disable_initial_unsolicited" global configuration option
to disable sending unsolicited MWI to all endpoints on startup.
Additional information can be found in the sample configuration file at
config/samples/pjsip.conf.sample.
------------------------------------------------------------------------------
--- Functionality changes from Asterisk 13.10.0 to Asterisk 13.11.0 ----------
------------------------------------------------------------------------------

@ -964,6 +964,27 @@
; set to this value if there is no better option (such as
; auth/realm) to be used
; Asterisk Task Processor Queue Size
; On heavy loaded system with DB storage you may need to increase
; taskprocessor queue.
; If the taskprocessor queue size reached high water level,
; the alert is triggered.
; If the alert is set the pjsip distibutor stops processing incoming
; requests until the alert is cleared.
; The alert is cleared when taskprocessor queue size drops to the
; low water clear level.
; The next options set taskprocessor queue levels for MWI.
;mwi_tps_queue_high=500 ; Taskprocessor high water alert trigger level.
;mwi_tps_queue_low=450 ; Taskprocessor low water clear alert level.
; The default is -1 for 90% of high water level.
; Unsolicited MWI
; If there are endpoints configured with unsolicited MWI
; then res_pjsip_mwi module tries to send MWI to all endpoints on startup.
;mwi_disable_initial_unsolicited=no ; Disable sending unsolicited mwi to all endpoints on startup.
; If disabled then unsolicited mwi will start processing
; on the endpoint's next contact update.
; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl
;==========================ACL SECTION OPTIONS=========================
;[acl]

@ -0,0 +1,35 @@
"""pjsip: add global MWI options
Revision ID: c7a44a5a0851
Revises: 4a6c67fa9b7a
Create Date: 2016-08-03 15:08:22.524727
"""
# revision identifiers, used by Alembic.
revision = 'c7a44a5a0851'
down_revision = '4a6c67fa9b7a'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ENUM
YESNO_NAME = 'yesno_values'
YESNO_VALUES = ['yes', 'no']
def upgrade():
############################# Enums ##############################
# yesno_values have already been created, so use postgres enum object
# type to get around "already created" issue - works okay with mysql
yesno_values = ENUM(*YESNO_VALUES, name=YESNO_NAME, create_type=False)
op.add_column('ps_globals', sa.Column('mwi_tps_queue_high', sa.Integer))
op.add_column('ps_globals', sa.Column('mwi_tps_queue_low', sa.Integer))
op.add_column('ps_globals', sa.Column('mwi_disable_initial_unsolicited', yesno_values))
def downgrade():
op.drop_column('ps_globals', 'mwi_tps_queue_high')
op.drop_column('ps_globals', 'mwi_tps_queue_low')
op.drop_column('ps_globals', 'mwi_disable_initial_unsolicited')

@ -2416,6 +2416,32 @@ int ast_sip_register_supplement(struct ast_sip_supplement *supplement);
*/
void ast_sip_unregister_supplement(struct ast_sip_supplement *supplement);
/*!
* \brief Retrieve the global MWI taskprocessor high water alert trigger level.
*
* \since 13.12.0
*
* \retval the system MWI taskprocessor high water alert trigger level
*/
unsigned int ast_sip_get_mwi_tps_queue_high(void);
/*!
* \brief Retrieve the global MWI taskprocessor low water clear alert level.
*
* \since 13.12.0
*
* \retval the system MWI taskprocessor low water clear alert level
*/
int ast_sip_get_mwi_tps_queue_low(void);
/*!
* \brief Retrieve the global setting 'disable sending unsolicited mwi on startup'.
* \since 13.12.0
*
* \retval non zero if disable.
*/
unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void);
/*!
* \brief Retrieve the system debug setting (yes|no|host).
*

@ -1499,6 +1499,48 @@
set to this value if there is no better option (such as auth/realm) to be
used.</synopsis>
</configOption>
<configOption name="mwi_tps_queue_high" default="500">
<synopsis>MWI taskprocessor high water alert trigger level.</synopsis>
<description>
<para>On a heavily loaded system you may need to adjust the
taskprocessor queue limits. If any taskprocessor queue size
reaches its high water level then pjsip will stop processing
new requests until the alert is cleared. The alert clears
when all alerting taskprocessor queues have dropped to their
low water clear level.
</para>
</description>
</configOption>
<configOption name="mwi_tps_queue_low" default="-1">
<synopsis>MWI taskprocessor low water clear alert level.</synopsis>
<description>
<para>On a heavily loaded system you may need to adjust the
taskprocessor queue limits. If any taskprocessor queue size
reaches its high water level then pjsip will stop processing
new requests until the alert is cleared. The alert clears
when all alerting taskprocessor queues have dropped to their
low water clear level.
</para>
<note><para>Set to -1 for the low water level to be 90% of
the high water level.</para></note>
</description>
</configOption>
<configOption name="mwi_disable_initial_unsolicited" default="no">
<synopsis>Enable/Disable sending unsolicited MWI to all endpoints on startup.</synopsis>
<description>
<para>When the initial unsolicited MWI notification are
enabled on startup then the initial notifications
get sent at startup. If you have a lot of endpoints
(thousands) that use unsolicited MWI then you may
want to consider disabling the initial startup
notifications.
</para>
<para>When the initial unsolicited MWI notifications are
disabled on startup then the notifications will start
on the endpoint's next contact update.
</para>
</description>
</configOption>
</configObject>
</configFile>
</configInfo>

@ -24,6 +24,7 @@
#include "asterisk/res_pjsip.h"
#include "include/res_pjsip_private.h"
#include "asterisk/sorcery.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/ast_version.h"
#include "asterisk/res_pjsip_cli.h"
@ -43,6 +44,9 @@
#define DEFAULT_UNIDENTIFIED_REQUEST_COUNT 5
#define DEFAULT_UNIDENTIFIED_REQUEST_PERIOD 5
#define DEFAULT_UNIDENTIFIED_REQUEST_PRUNE_INTERVAL 30
#define DEFAULT_MWI_TPS_QUEUE_HIGH AST_TASKPROCESSOR_HIGH_WATER_LEVEL
#define DEFAULT_MWI_TPS_QUEUE_LOW -1
#define DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED 0
static char default_useragent[256];
@ -79,6 +83,14 @@ struct global_config {
unsigned int unidentified_request_period;
/* Interval at which expired unidentifed requests will be pruned */
unsigned int unidentified_request_prune_interval;
struct {
/*! Taskprocessor high water alert trigger level */
unsigned int tps_queue_high;
/*! Taskprocessor low water clear alert level. */
int tps_queue_low;
/*! Nonzero to disable sending unsolicited mwi to all endpoints on startup */
unsigned int disable_initial_unsolicited;
} mwi;
};
static void global_destructor(void *obj)
@ -314,6 +326,53 @@ void ast_sip_get_default_from_user(char *from_user, size_t size)
}
}
unsigned int ast_sip_get_mwi_tps_queue_high(void)
{
unsigned int tps_queue_high;
struct global_config *cfg;
cfg = get_global_cfg();
if (!cfg) {
return DEFAULT_MWI_TPS_QUEUE_HIGH;
}
tps_queue_high = cfg->mwi.tps_queue_high;
ao2_ref(cfg, -1);
return tps_queue_high;
}
int ast_sip_get_mwi_tps_queue_low(void)
{
int tps_queue_low;
struct global_config *cfg;
cfg = get_global_cfg();
if (!cfg) {
return DEFAULT_MWI_TPS_QUEUE_LOW;
}
tps_queue_low = cfg->mwi.tps_queue_low;
ao2_ref(cfg, -1);
return tps_queue_low;
}
unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void)
{
unsigned int disable_initial_unsolicited;
struct global_config *cfg;
cfg = get_global_cfg();
if (!cfg) {
return DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED;
}
disable_initial_unsolicited = cfg->mwi.disable_initial_unsolicited;
ao2_ref(cfg, -1);
return disable_initial_unsolicited;
}
/*!
* \internal
* \brief Observer to set default global object if none exist.
@ -450,6 +509,15 @@ int ast_sip_initialize_sorcery_global(void)
OPT_UINT_T, 0, FLDSET(struct global_config, unidentified_request_prune_interval));
ast_sorcery_object_field_register(sorcery, "global", "default_realm", DEFAULT_REALM,
OPT_STRINGFIELD_T, 0, STRFLDSET(struct global_config, default_realm));
ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_high",
__stringify(DEFAULT_MWI_TPS_QUEUE_HIGH),
OPT_UINT_T, 0, FLDSET(struct global_config, mwi.tps_queue_high));
ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_low",
__stringify(DEFAULT_MWI_TPS_QUEUE_LOW),
OPT_INT_T, 0, FLDSET(struct global_config, mwi.tps_queue_low));
ast_sorcery_object_field_register(sorcery, "global", "mwi_disable_initial_unsolicited",
DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED ? "yes" : "no",
OPT_BOOL_T, 1, FLDSET(struct global_config, mwi.disable_initial_unsolicited));
if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
return -1;

@ -35,6 +35,7 @@
#include "asterisk/module.h"
#include "asterisk/logger.h"
#include "asterisk/astobj2.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/sorcery.h"
#include "asterisk/stasis.h"
#include "asterisk/app.h"
@ -52,6 +53,12 @@ static char *default_voicemail_extension;
#define MWI_DATASTORE "MWI datastore"
/*! Number of serializers in pool if one not supplied. */
#define MWI_SERIALIZER_POOL_SIZE 8
/*! Pool of serializers to use if not supplied. */
static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE];
static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint,
@ -119,6 +126,117 @@ struct mwi_subscription {
char id[1];
};
/*!
* \internal
* \brief Shutdown the serializers in the mwi pool.
* \since 13.12.0
*
* \return Nothing
*/
static void mwi_serializer_pool_shutdown(void)
{
int idx;
for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
ast_taskprocessor_unreference(mwi_serializer_pool[idx]);
mwi_serializer_pool[idx] = NULL;
}
}
/*!
* \internal
* \brief Setup the serializers in the mwi pool.
* \since 13.12.0
*
* \retval 0 on success.
* \retval -1 on error.
*/
static int mwi_serializer_pool_setup(void)
{
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
int idx;
for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
/* Create name with seq number appended. */
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi");
mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name);
if (!mwi_serializer_pool[idx]) {
mwi_serializer_pool_shutdown();
return -1;
}
}
return 0;
}
/*!
* \internal
* \brief Pick a mwi serializer from the pool.
* \since 13.12.0
*
* \retval least queue size task processor.
*/
static struct ast_taskprocessor *get_mwi_serializer(void)
{
int idx;
int pos;
if (!mwi_serializer_pool[0]) {
return NULL;
}
for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) {
pos = idx;
}
}
return mwi_serializer_pool[pos];
}
/*!
* \internal
* \brief Set taskprocessor alert levels for the serializers in the mwi pool.
* \since 13.12.0
*
* \retval 0 on success.
* \retval -1 on error.
*/
static int mwi_serializer_set_alert_levels(void)
{
int idx;
long tps_queue_high;
long tps_queue_low;
if (!mwi_serializer_pool[0]) {
return -1;
}
tps_queue_high = ast_sip_get_mwi_tps_queue_high();
if (tps_queue_high <= 0) {
ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n",
tps_queue_high);
tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
}
tps_queue_low = ast_sip_get_mwi_tps_queue_low();
if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n",
tps_queue_low);
tps_queue_low = -1;
}
for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) {
ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n",
idx);
}
}
return 0;
}
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_message *msg);
@ -945,7 +1063,7 @@ static int send_notify(void *obj, void *arg, int flags)
struct mwi_subscription *mwi_sub = obj;
struct ast_taskprocessor *serializer = mwi_sub->is_solicited
? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
: NULL;
: get_mwi_serializer();
if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
@ -1063,7 +1181,7 @@ static int send_contact_notify(void *obj, void *arg, int flags)
return 0;
}
if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) {
if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) {
ao2_ref(mwi_sub, -1);
}
@ -1149,6 +1267,7 @@ static void global_loaded(const char *object_type)
{
ast_free(default_voicemail_extension);
default_voicemail_extension = ast_sip_get_default_voicemail_extension();
mwi_serializer_set_alert_levels();
}
static struct ast_sorcery_observer global_observer = {
@ -1175,15 +1294,21 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
if (mwi_serializer_pool_setup()) {
ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
}
create_mwi_subscriptions();
ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
ast_sip_push_task(NULL, send_initial_notify_all, NULL);
} else {
stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
if (!ast_sip_get_mwi_disable_initial_unsolicited()) {
if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
ast_sip_push_task(NULL, send_initial_notify_all, NULL);
} else {
stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
}
}
return AST_MODULE_LOAD_SUCCESS;
@ -1193,6 +1318,7 @@ static int unload_module(void)
{
ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
ao2_ref(unsolicited_mwi, -1);
mwi_serializer_pool_shutdown();
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
ast_sip_unregister_subscription_handler(&mwi_handler);

Loading…
Cancel
Save