From 299ba78b0955f17bfac5703b891d0d19466f99c1 Mon Sep 17 00:00:00 2001 From: Kevin Harwell Date: Wed, 2 Oct 2019 12:18:09 -0500 Subject: [PATCH 1/2] res_pjsip/res_pjsip_mwi: use centralized serializer pools Both res_pjsip and res_pjsip_mwi made use of serializer pools. However, they both implemented their own serializer pool functionality that was pretty much identical in each of the source files. This patch removes the duplicated code, and uses the new 'ast_serializer_pool' object instead. Additionally res_pjsip_mwi enables a shutdown group on the pool since if the timing was right the module could be unloaded while taskprocessor threads still needed to execute, thus causing a crash. Change-Id: I959b0805ad024585bbb6276593118be34fbf6e1d --- include/asterisk/res_pjsip.h | 5 ++ res/res_pjsip.c | 83 ++++------------- res/res_pjsip_mwi.c | 170 +++++++++-------------------------- 3 files changed, 64 insertions(+), 194 deletions(-) diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index f38e0b39a6..b26aba90d0 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2980,6 +2980,11 @@ const char *ast_sip_get_host_ip_string(int af); */ long ast_sip_threadpool_queue_size(void); +/*! + * \brief Retrieve the SIP threadpool object + */ +struct ast_threadpool *ast_sip_threadpool(void); + /*! * \brief Retrieve transport state * \since 13.7.1 diff --git a/res/res_pjsip.c b/res/res_pjsip.c index c6594708bb..0dcbcea83b 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -34,6 +34,7 @@ #include "asterisk/utils.h" #include "asterisk/astobj2.h" #include "asterisk/module.h" +#include "asterisk/serializer.h" #include "asterisk/threadpool.h" #include "asterisk/taskprocessor.h" #include "asterisk/uuid.h" @@ -2840,7 +2841,7 @@ #define SERIALIZER_POOL_SIZE 8 /*! Pool of serializers to use if not supplied. */ -static struct ast_taskprocessor *serializer_pool[SERIALIZER_POOL_SIZE]; +static struct ast_serializer_pool *sip_serializer_pool; static pjsip_endpoint *ast_pjsip_endpoint; @@ -4626,71 +4627,10 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name) return ast_sip_create_serializer_group(name, NULL); } -/*! - * \internal - * \brief Shutdown the serializers in the default pool. - * \since 14.0.0 - * - * \return Nothing - */ -static void serializer_pool_shutdown(void) -{ - int idx; - - for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { - ast_taskprocessor_unreference(serializer_pool[idx]); - serializer_pool[idx] = NULL; - } -} - -/*! - * \internal - * \brief Setup the serializers in the default pool. - * \since 14.0.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int serializer_pool_setup(void) -{ - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - int idx; - - for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/default"); - - serializer_pool[idx] = ast_sip_create_serializer(tps_name); - if (!serializer_pool[idx]) { - serializer_pool_shutdown(); - return -1; - } - } - return 0; -} - -static struct ast_taskprocessor *serializer_pool_pick(void) -{ - int idx; - int pos = 0; - - if (!serializer_pool[0]) { - return NULL; - } - - for (idx = 1; idx < SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_size(serializer_pool[idx]) < ast_taskprocessor_size(serializer_pool[pos])) { - pos = idx; - } - } - - return serializer_pool[pos]; -} - int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - serializer = serializer_pool_pick(); + serializer = ast_serializer_pool_get(sip_serializer_pool); } return ast_taskprocessor_push(serializer, sip_task, task_data); @@ -4771,7 +4711,7 @@ int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int { if (!serializer) { /* Caller doesn't care which PJSIP serializer the task executes under. */ - serializer = serializer_pool_pick(); + serializer = ast_serializer_pool_get(sip_serializer_pool); if (!serializer) { /* No serializer picked to execute the task */ return -1; @@ -5133,6 +5073,11 @@ long ast_sip_threadpool_queue_size(void) return ast_threadpool_queue_size(sip_threadpool); } +struct ast_threadpool *ast_sip_threadpool(void) +{ + return sip_threadpool; +} + #ifdef TEST_FRAMEWORK AST_TEST_DEFINE(xml_sanitization_end_null) { @@ -5204,7 +5149,7 @@ static int unload_pjsip(void *data) * These calls need the pjsip endpoint and serializer to clean up. * If they're not set, then there's nothing to clean up anyway. */ - if (ast_pjsip_endpoint && serializer_pool[0]) { + if (ast_pjsip_endpoint && sip_serializer_pool) { ast_res_pjsip_cleanup_options_handling(); ast_res_pjsip_cleanup_message_filter(); ast_sip_destroy_distributor(); @@ -5340,7 +5285,9 @@ static int load_module(void) goto error; } - if (serializer_pool_setup()) { + sip_serializer_pool = ast_serializer_pool_create( + "pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1); + if (!sip_serializer_pool) { ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n"); goto error; } @@ -5413,7 +5360,7 @@ error: /* These functions all check for NULLs and are safe to call at any time */ ast_sip_destroy_scheduler(); - serializer_pool_shutdown(); + ast_serializer_pool_destroy(sip_serializer_pool); ast_threadpool_shutdown(sip_threadpool); return AST_MODULE_LOAD_DECLINE; @@ -5444,7 +5391,7 @@ static int unload_module(void) */ ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL); ast_sip_destroy_scheduler(); - serializer_pool_shutdown(); + ast_serializer_pool_destroy(sip_serializer_pool); ast_threadpool_shutdown(sip_threadpool); return 0; diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index c89d383855..49b5dc56c4 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -36,6 +36,7 @@ #include "asterisk/logger.h" #include "asterisk/astobj2.h" #include "asterisk/taskprocessor.h" +#include "asterisk/serializer.h" #include "asterisk/sorcery.h" #include "asterisk/stasis.h" #include "asterisk/mwi.h" @@ -57,8 +58,11 @@ static char *default_voicemail_extension; /*! Number of serializers in pool if one not supplied. */ #define MWI_SERIALIZER_POOL_SIZE 8 +/*! Max timeout for all threads to join during an unload. */ +#define MAX_UNLOAD_TIMEOUT_TIME 10 /* Seconds */ + /*! Pool of serializers to use if not supplied. */ -static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE]; +static struct ast_serializer_pool *mwi_serializer_pool; static void mwi_subscription_shutdown(struct ast_sip_subscription *sub); static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf); @@ -129,117 +133,6 @@ struct mwi_subscription { char id[1]; }; -/*! - * \internal - * \brief Shutdown the serializers in the mwi pool. - * \since 13.12.0 - * - * \return Nothing - */ -static void mwi_serializer_pool_shutdown(void) -{ - int idx; - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - ast_taskprocessor_unreference(mwi_serializer_pool[idx]); - mwi_serializer_pool[idx] = NULL; - } -} - -/*! - * \internal - * \brief Setup the serializers in the mwi pool. - * \since 13.12.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int mwi_serializer_pool_setup(void) -{ - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - int idx; - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi"); - - mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name); - if (!mwi_serializer_pool[idx]) { - mwi_serializer_pool_shutdown(); - return -1; - } - } - return 0; -} - -/*! - * \internal - * \brief Pick a mwi serializer from the pool. - * \since 13.12.0 - * - * \retval least queue size task processor. - */ -static struct ast_taskprocessor *get_mwi_serializer(void) -{ - int idx; - int pos; - - if (!mwi_serializer_pool[0]) { - return NULL; - } - - for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) { - pos = idx; - } - } - - return mwi_serializer_pool[pos]; -} - -/*! - * \internal - * \brief Set taskprocessor alert levels for the serializers in the mwi pool. - * \since 13.12.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int mwi_serializer_set_alert_levels(void) -{ - int idx; - long tps_queue_high; - long tps_queue_low; - - if (!mwi_serializer_pool[0]) { - return -1; - } - - tps_queue_high = ast_sip_get_mwi_tps_queue_high(); - if (tps_queue_high <= 0) { - ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n", - tps_queue_high); - tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; - } - - tps_queue_low = ast_sip_get_mwi_tps_queue_low(); - if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) { - ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n", - tps_queue_low); - tps_queue_low = -1; - } - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) { - ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n", - idx); - } - } - - return 0; -} - - static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg); @@ -1213,7 +1106,7 @@ static int send_notify(void *obj, void *arg, int flags) struct mwi_subscription *mwi_sub = obj; struct ast_taskprocessor *serializer = mwi_sub->is_solicited ? ast_sip_subscription_get_serializer(mwi_sub->sip_sub) - : get_mwi_serializer(); + : ast_serializer_pool_get(mwi_serializer_pool); if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); @@ -1228,7 +1121,8 @@ static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct mwi_subscription *mwi_sub = userdata; if (stasis_subscription_final_message(sub, msg)) { - if (ast_sip_push_task(NULL, serialized_cleanup, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + serialized_cleanup, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } return; @@ -1391,7 +1285,8 @@ static int send_contact_notify(void *obj, void *arg, int flags) return 0; } - if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } @@ -1513,7 +1408,8 @@ static void mwi_startup_event_cb(void *data, struct stasis_subscription *sub, st return; } - ast_sip_push_task(NULL, send_initial_notify_all, NULL); + ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + send_initial_notify_all, NULL); stasis_unsubscribe(sub); } @@ -1522,7 +1418,8 @@ static void global_loaded(const char *object_type) { ast_free(default_voicemail_extension); default_voicemail_extension = ast_sip_get_default_voicemail_extension(); - mwi_serializer_set_alert_levels(); + ast_serializer_pool_set_alerts(mwi_serializer_pool, + ast_sip_get_mwi_tps_queue_high(), ast_sip_get_mwi_tps_queue_low()); } static struct ast_sorcery_observer global_observer = { @@ -1543,14 +1440,16 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } - if (mwi_serializer_pool_setup()) { + mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi", + MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME); + if (!mwi_serializer_pool) { ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); } solicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, mwi_sub_hash, NULL, mwi_sub_cmp); if (!solicited_mwi) { - mwi_serializer_pool_shutdown(); + ast_serializer_pool_destroy(mwi_serializer_pool); ast_sip_unregister_subscription_handler(&mwi_handler); return AST_MODULE_LOAD_DECLINE; } @@ -1558,7 +1457,7 @@ static int load_module(void) unsolicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, mwi_sub_hash, NULL, mwi_sub_cmp); if (!unsolicited_mwi) { - mwi_serializer_pool_shutdown(); + ast_serializer_pool_destroy(mwi_serializer_pool); ast_sip_unregister_subscription_handler(&mwi_handler); ao2_ref(solicited_mwi, -1); return AST_MODULE_LOAD_DECLINE; @@ -1571,7 +1470,8 @@ static int load_module(void) if (!ast_sip_get_mwi_disable_initial_unsolicited()) { create_mwi_subscriptions(); if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { - ast_sip_push_task(NULL, send_initial_notify_all, NULL); + ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + send_initial_notify_all, NULL); } else { struct stasis_subscription *sub; @@ -1581,6 +1481,15 @@ static int load_module(void) } } + if (!mwi_serializer_pool) { + /* + * If the mwi serializer pool was unable to be established then the module will + * use the default serializer pool. If this happens prevent manual unloading + * since there would now exist the potential for a crash on unload. + */ + ast_module_shutdown_ref(ast_module_info->self); + } + return AST_MODULE_LOAD_SUCCESS; } @@ -1589,13 +1498,22 @@ static int unload_module(void) ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); - ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); - ao2_ref(unsolicited_mwi, -1); - unsolicited_mwi = NULL; + if (unsolicited_mwi) { + ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); + ao2_ref(unsolicited_mwi, -1); + unsolicited_mwi = NULL; + } - ao2_cleanup(solicited_mwi); + if (solicited_mwi) { + ao2_ref(solicited_mwi, -1); + solicited_mwi = NULL; + } - mwi_serializer_pool_shutdown(); + if (ast_serializer_pool_destroy(mwi_serializer_pool)) { + ast_log(LOG_WARNING, "Unload incomplete. Try again later\n"); + return -1; + } + mwi_serializer_pool = NULL; ast_sip_unregister_subscription_handler(&mwi_handler); From 996fc40e2b489c7df13567d2255d8238c0cad1bd Mon Sep 17 00:00:00 2001 From: Kevin Harwell Date: Wed, 2 Oct 2019 12:18:50 -0500 Subject: [PATCH 2/2] res_pjsip_mwi: use an ao2_global object for mwi containers On shutdown it's possible for the unsolicited mwi container to be freed before other dependent threads are done using it. This patch ensures this can no longer happen by wrapping the container in an ao2_global object. The solicited container was also changed too. ASTERISK-28552 Change-Id: I8f812286dc19a34916acacd71ce2ec26e1042047 --- res/res_pjsip_mwi.c | 201 +++++++++++++++++++++++++++++++------------- 1 file changed, 142 insertions(+), 59 deletions(-) diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index 49b5dc56c4..d7749fab64 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -42,8 +42,9 @@ #include "asterisk/mwi.h" struct mwi_subscription; -static struct ao2_container *unsolicited_mwi; -static struct ao2_container *solicited_mwi; + +AO2_GLOBAL_OBJ_STATIC(mwi_unsolicited); +AO2_GLOBAL_OBJ_STATIC(mwi_solicited); static char *default_voicemail_extension; @@ -581,13 +582,15 @@ static int unsubscribe_stasis(void *obj, void *arg, int flags) } static int create_unsolicited_mwi_subscriptions(struct ast_sip_endpoint *endpoint, - int recreate, int send_now); + int recreate, int send_now, struct ao2_container *unsolicited_mwi, struct ao2_container *solicited_mwi); static void mwi_subscription_shutdown(struct ast_sip_subscription *sub) { struct mwi_subscription *mwi_sub; struct ast_datastore *mwi_datastore; struct ast_sip_endpoint *endpoint = NULL; + struct ao2_container *unsolicited_mwi; + struct ao2_container *solicited_mwi; mwi_datastore = ast_sip_subscription_get_datastore(sub, MWI_DATASTORE); if (!mwi_datastore) { @@ -601,18 +604,25 @@ static void mwi_subscription_shutdown(struct ast_sip_subscription *sub) endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", mwi_sub->id); ao2_ref(mwi_datastore, -1); - ao2_unlink(solicited_mwi, mwi_sub); + + solicited_mwi = ao2_global_obj_ref(mwi_solicited); + if (solicited_mwi) { + ao2_unlink(solicited_mwi, mwi_sub); + } /* * When a solicited subscription is removed it's possible an unsolicited one * needs to be [re-]created. Attempt to establish unsolicited MWI. */ + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); if (unsolicited_mwi && endpoint) { ao2_lock(unsolicited_mwi); - create_unsolicited_mwi_subscriptions(endpoint, 1, 1); + create_unsolicited_mwi_subscriptions(endpoint, 1, 1, unsolicited_mwi, solicited_mwi); ao2_unlock(unsolicited_mwi); + ao2_ref(unsolicited_mwi, -1); } + ao2_cleanup(solicited_mwi); ao2_cleanup(endpoint); } @@ -678,6 +688,10 @@ static int has_mwi_subscription(struct ao2_container *container, *mwi_sub = NULL; *mwi_stasis = NULL; + if (!container) { + return 0; + } + mwi_subs = ao2_find(container, ast_sorcery_object_get_id(endpoint), OBJ_SEARCH_KEY | OBJ_MULTIPLE | OBJ_NOLOCK); if (!mwi_subs) { @@ -707,11 +721,13 @@ static int has_mwi_subscription(struct ao2_container *container, * * \param endpoint The endpoint * \param mailbox The mailbox + * \param unsolicited_mwi A container of unsolicited mwi objects * * \retval 1 if a solicited subscription is allowed for the endpoint/mailbox * 0 otherwise */ -static int allow_and_or_replace_unsolicited(struct ast_sip_endpoint *endpoint, const char *mailbox) +static int allow_and_or_replace_unsolicited(struct ast_sip_endpoint *endpoint, const char *mailbox, + struct ao2_container *unsolicited_mwi) { struct mwi_subscription *mwi_sub; struct mwi_stasis_subscription *mwi_stasis; @@ -758,11 +774,14 @@ static int send_notify(void *obj, void *arg, int flags); * * \param endpoint The endpoint * \param mailbox The mailbox + * \param unsolicited_mwi A container of unsolicited mwi objects + * \param solicited_mwi A container of solicited mwi objects * * \retval 1 if an unsolicited subscription is allowed for the endpoint/mailbox * 0 otherwise */ -static int is_unsolicited_allowed(struct ast_sip_endpoint *endpoint, const char *mailbox) +static int is_unsolicited_allowed(struct ast_sip_endpoint *endpoint, const char *mailbox, + struct ao2_container *unsolicited_mwi, struct ao2_container *solicited_mwi) { struct mwi_subscription *mwi_sub; struct mwi_stasis_subscription *mwi_stasis; @@ -828,28 +847,41 @@ static int mwi_validate_for_aor(void *obj, void *arg, int flags) struct ast_sip_endpoint *endpoint = arg; char *mailboxes; char *mailbox; + struct ao2_container *unsolicited_mwi; if (ast_strlen_zero(aor->mailboxes)) { return 0; } /* A reload could be taking place so lock while checking if allowed */ - ao2_lock(unsolicited_mwi); + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + if (unsolicited_mwi) { + ao2_lock(unsolicited_mwi); + } + mailboxes = ast_strdupa(aor->mailboxes); while ((mailbox = ast_strip(strsep(&mailboxes, ",")))) { if (ast_strlen_zero(mailbox)) { continue; } - if (!allow_and_or_replace_unsolicited(endpoint, mailbox)) { + if (!allow_and_or_replace_unsolicited(endpoint, mailbox, unsolicited_mwi)) { ast_debug(1, "Endpoint '%s' already configured for unsolicited MWI for mailbox '%s'. " "Denying MWI subscription to %s\n", ast_sorcery_object_get_id(endpoint), mailbox, ast_sorcery_object_get_id(aor)); - ao2_unlock(unsolicited_mwi); + + if (unsolicited_mwi) { + ao2_unlock(unsolicited_mwi); + ao2_ref(unsolicited_mwi, -1); + } return -1; } } - ao2_unlock(unsolicited_mwi); + + if (unsolicited_mwi) { + ao2_unlock(unsolicited_mwi); + ao2_ref(unsolicited_mwi, -1); + } return 0; } @@ -975,6 +1007,7 @@ static int mwi_subscription_established(struct ast_sip_subscription *sip_sub) const char *resource = ast_sip_subscription_get_resource_name(sip_sub); struct mwi_subscription *sub; struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sip_sub); + struct ao2_container *solicited_mwi; /* no aor in uri? subscribe to all on endpoint */ if (ast_strlen_zero(resource)) { @@ -995,7 +1028,12 @@ static int mwi_subscription_established(struct ast_sip_subscription *sip_sub) ast_sip_subscription_remove_datastore(sip_sub, MWI_DATASTORE); } - ao2_link(solicited_mwi, sub); + solicited_mwi = ao2_global_obj_ref(mwi_solicited); + if (solicited_mwi) { + ao2_link(solicited_mwi, sub); + ao2_ref(solicited_mwi, -1); + } + ao2_cleanup(sub); ao2_cleanup(endpoint); return 0; @@ -1142,11 +1180,13 @@ static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, * \param endpoint An endpoint object * \param recreate Whether or not unsolicited subscriptions are potentially being recreated * \param send_now Whether or not to send a notify once the subscription is created + * \param unsolicited_mwi A container of unsolicited mwi objects + * \param solicited_mwi A container of solicited mwi objects * * \retval 0 */ static int create_unsolicited_mwi_subscriptions(struct ast_sip_endpoint *endpoint, - int recreate, int send_now) + int recreate, int send_now, struct ao2_container *unsolicited_mwi, struct ao2_container *solicited_mwi) { RAII_VAR(struct mwi_subscription *, aggregate_sub, NULL, ao2_cleanup); char *mailboxes; @@ -1181,14 +1221,16 @@ static int create_unsolicited_mwi_subscriptions(struct ast_sip_endpoint *endpoin } /* Lock solicited so we don't potentially add to both containers */ - ao2_lock(solicited_mwi); + if (solicited_mwi) { + ao2_lock(solicited_mwi); + } mailboxes = ast_strdupa(endpoint->subscription.mwi.mailboxes); while ((mailbox = ast_strip(strsep(&mailboxes, ",")))) { struct mwi_subscription *sub; struct mwi_stasis_subscription *mwi_stasis_sub; - if (!is_unsolicited_allowed(endpoint, mailbox)) { + if (!is_unsolicited_allowed(endpoint, mailbox, unsolicited_mwi, solicited_mwi)) { continue; } @@ -1228,13 +1270,16 @@ static int create_unsolicited_mwi_subscriptions(struct ast_sip_endpoint *endpoin } } - ao2_unlock(solicited_mwi); + if (solicited_mwi) { + ao2_unlock(solicited_mwi); + } + return 0; } -static int create_mwi_subscriptions_for_endpoint(void *obj, void *arg, int flags) +static int create_mwi_subscriptions_for_endpoint(void *obj, void *arg, void *data, int flags) { - return create_unsolicited_mwi_subscriptions(obj, 0, 0); + return create_unsolicited_mwi_subscriptions(obj, 0, 0, arg, data); } static int unsubscribe(void *obj, void *arg, int flags) @@ -1248,9 +1293,16 @@ static int unsubscribe(void *obj, void *arg, int flags) static void create_mwi_subscriptions(void) { + struct ao2_container *unsolicited_mwi; + struct ao2_container *solicited_mwi; struct ao2_container *endpoints; struct ast_variable *var; + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + if (!unsolicited_mwi) { + return; + } + var = ast_variable_new("mailboxes !=", "", ""); endpoints = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "endpoint", @@ -1258,9 +1310,12 @@ static void create_mwi_subscriptions(void) ast_variables_destroy(var); if (!endpoints) { + ao2_ref(unsolicited_mwi, -1); return; } + solicited_mwi = ao2_global_obj_ref(mwi_solicited); + /* We remove all the old stasis subscriptions first before applying the new configuration. This * prevents a situation where there might be multiple overlapping stasis subscriptions for an * endpoint for mailboxes. Though there may be mailbox changes during the gap between unsubscribing @@ -1269,10 +1324,12 @@ static void create_mwi_subscriptions(void) */ ao2_lock(unsolicited_mwi); ao2_callback(unsolicited_mwi, OBJ_NOLOCK | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); - ao2_callback(endpoints, OBJ_NODATA, create_mwi_subscriptions_for_endpoint, NULL); + ao2_callback_data(endpoints, OBJ_NODATA, create_mwi_subscriptions_for_endpoint, unsolicited_mwi, solicited_mwi); ao2_unlock(unsolicited_mwi); ao2_ref(endpoints, -1); + ao2_cleanup(solicited_mwi); + ao2_ref(unsolicited_mwi, -1); } /*! \brief Function called to send MWI NOTIFY on any unsolicited mailboxes relating to this AOR */ @@ -1299,6 +1356,8 @@ static void mwi_contact_changed(const struct ast_sip_contact *contact) char *id = ast_strdupa(ast_sorcery_object_get_id(contact)); char *aor = NULL; struct ast_sip_endpoint *endpoint = NULL; + struct ao2_container *unsolicited_mwi; + struct ao2_container *solicited_mwi; if (contact->endpoint) { endpoint = ao2_bump(contact->endpoint); @@ -1313,10 +1372,20 @@ static void mwi_contact_changed(const struct ast_sip_contact *contact) return; } + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + if (!unsolicited_mwi) { + ao2_cleanup(endpoint); + return; + } + + solicited_mwi = ao2_global_obj_ref(mwi_solicited); + ao2_lock(unsolicited_mwi); - create_mwi_subscriptions_for_endpoint(endpoint, NULL, 0); + create_unsolicited_mwi_subscriptions(endpoint, 0, 0, unsolicited_mwi, solicited_mwi); ao2_unlock(unsolicited_mwi); ao2_cleanup(endpoint); + ao2_cleanup(solicited_mwi); + ao2_ref(unsolicited_mwi, -1); aor = strsep(&id, ";@"); ao2_callback(unsolicited_mwi, OBJ_NODATA, send_contact_notify, aor); @@ -1342,6 +1411,7 @@ static void mwi_contact_deleted(const void *object) struct mwi_subscription *mwi_sub; struct ast_sip_endpoint *endpoint = NULL; struct ast_sip_contact *found_contact; + struct ao2_container *unsolicited_mwi; if (contact->endpoint) { endpoint = ao2_bump(contact->endpoint); @@ -1364,6 +1434,11 @@ static void mwi_contact_deleted(const void *object) return; } + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + if (!unsolicited_mwi) { + return; + } + ao2_lock(unsolicited_mwi); mwi_subs = ao2_find(unsolicited_mwi, contact->endpoint_name, OBJ_SEARCH_KEY | OBJ_MULTIPLE | OBJ_NOLOCK | OBJ_UNLINK); @@ -1374,6 +1449,7 @@ static void mwi_contact_deleted(const void *object) ao2_iterator_destroy(mwi_subs); } ao2_unlock(unsolicited_mwi); + ao2_ref(unsolicited_mwi, -1); } /*! \brief Observer for contacts so unsolicited MWI is sent when a contact changes */ @@ -1386,7 +1462,12 @@ static const struct ast_sorcery_observer mwi_contact_observer = { /*! \brief Task invoked to send initial MWI NOTIFY for unsolicited */ static int send_initial_notify_all(void *obj) { - ao2_callback(unsolicited_mwi, OBJ_NODATA, send_notify, NULL); + struct ao2_container *unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + + if (unsolicited_mwi) { + ao2_callback(unsolicited_mwi, OBJ_NODATA, send_notify, NULL); + ao2_ref(unsolicited_mwi, -1); + } return 0; } @@ -1434,8 +1515,38 @@ static int reload(void) return 0; } +static int unload_module(void) +{ + struct ao2_container *unsolicited_mwi; + + ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); + ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); + + unsolicited_mwi = ao2_global_obj_replace(mwi_unsolicited, NULL); + if (unsolicited_mwi) { + ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); + ao2_ref(unsolicited_mwi, -1); + } + + ao2_global_obj_release(mwi_solicited); + + if (ast_serializer_pool_destroy(mwi_serializer_pool)) { + ast_log(LOG_WARNING, "Unload incomplete. Try again later\n"); + return -1; + } + mwi_serializer_pool = NULL; + + ast_sip_unregister_subscription_handler(&mwi_handler); + + ast_free(default_voicemail_extension); + default_voicemail_extension = NULL; + return 0; +} + static int load_module(void) { + struct ao2_container *mwi_container; + if (ast_sip_register_subscription_handler(&mwi_handler)) { return AST_MODULE_LOAD_DECLINE; } @@ -1446,22 +1557,23 @@ static int load_module(void) ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); } - solicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, + mwi_container = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, mwi_sub_hash, NULL, mwi_sub_cmp); - if (!solicited_mwi) { - ast_serializer_pool_destroy(mwi_serializer_pool); - ast_sip_unregister_subscription_handler(&mwi_handler); + if (!mwi_container) { + unload_module(); return AST_MODULE_LOAD_DECLINE; } + ao2_global_obj_replace_unref(mwi_solicited, mwi_container); + ao2_ref(mwi_container, -1); - unsolicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, + mwi_container = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, mwi_sub_hash, NULL, mwi_sub_cmp); - if (!unsolicited_mwi) { - ast_serializer_pool_destroy(mwi_serializer_pool); - ast_sip_unregister_subscription_handler(&mwi_handler); - ao2_ref(solicited_mwi, -1); + if (!mwi_container) { + unload_module(); return AST_MODULE_LOAD_DECLINE; } + ao2_global_obj_replace_unref(mwi_unsolicited, mwi_container); + ao2_ref(mwi_container, -1); ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer); @@ -1493,35 +1605,6 @@ static int load_module(void) return AST_MODULE_LOAD_SUCCESS; } -static int unload_module(void) -{ - ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); - ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); - - if (unsolicited_mwi) { - ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); - ao2_ref(unsolicited_mwi, -1); - unsolicited_mwi = NULL; - } - - if (solicited_mwi) { - ao2_ref(solicited_mwi, -1); - solicited_mwi = NULL; - } - - if (ast_serializer_pool_destroy(mwi_serializer_pool)) { - ast_log(LOG_WARNING, "Unload incomplete. Try again later\n"); - return -1; - } - mwi_serializer_pool = NULL; - - ast_sip_unregister_subscription_handler(&mwi_handler); - - ast_free(default_voicemail_extension); - default_voicemail_extension = NULL; - return 0; -} - AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP MWI resource", .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module,