Fix shutdown assertions in stasis-core

In r388005, macros were introduced to consistently define message
types. This added an assert if a message type was used either before
it was initialized or after it had been cleaned up. It turns out that
this assertion fires during shutdown.

This actually exposed a hidden shutdown ordering problem. Since
unsubscribing is asynchronous, it's possible that the message types
used by the subscription could be freed before the final message of
the subscription was processed.

This patch adds stasis_subscription_join(), which blocks until the
last message has been processed by the subscription. Since joining was
most commonly done right after an unsubscribe, a
stasis_unsubscribe_and_join() convenience function was also added.

Similar functions were also added to the stasis_caching_topic and
stasis_message_router, since they wrap subscriptions and have similar
problems.

Other code in trunk was refactored to join() where appropriate, or at
least verify that the subscription was complete before being
destroyed.

Review: https://reviewboard.asterisk.org/r/2540


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@389011 65c4cc65-6c06-0410-ace0-fbb531ad65f3
changes/78/78/1
David M. Lee 12 years ago
parent 91bab76422
commit b97c71bb11

@ -9866,9 +9866,7 @@ static int unload_module(void)
res |= ast_data_unregister(NULL);
if (device_state_sub) {
device_state_sub = stasis_unsubscribe(device_state_sub);
}
device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
ast_extension_state_del(0, extension_state_cb);

@ -12689,9 +12689,7 @@ static void stop_poll_thread(void)
{
poll_thread_run = 0;
if (mwi_sub_sub) {
mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub);
}
mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub);
ast_mutex_lock(&poll_lock);
ast_cond_signal(&poll_cond);

@ -1334,9 +1334,7 @@ static void network_change_stasis_subscribe(void)
static void network_change_stasis_unsubscribe(void)
{
if (network_change_sub) {
network_change_sub = stasis_unsubscribe(network_change_sub);
}
network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
}
static void acl_change_stasis_subscribe(void)
@ -1349,9 +1347,7 @@ static void acl_change_stasis_subscribe(void)
static void acl_change_stasis_unsubscribe(void)
{
if (acl_change_sub) {
acl_change_sub = stasis_unsubscribe(acl_change_sub);
}
acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
static int network_change_sched_cb(const void *data)
@ -12424,9 +12420,7 @@ static void peer_destructor(void *obj)
if (peer->dnsmgr)
ast_dnsmgr_release(peer->dnsmgr);
if (peer->mwi_event_sub) {
peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
}
peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
ast_string_field_free_memory(peer);
}

@ -16742,25 +16742,21 @@ static void network_change_stasis_subscribe(void)
static void network_change_stasis_unsubscribe(void)
{
if (network_change_sub) {
network_change_sub = stasis_unsubscribe(network_change_sub);
}
network_change_sub = stasis_unsubscribe_and_join(network_change_sub);
}
static void acl_change_stasis_subscribe(void)
{
if (!acl_change_sub) {
acl_change_sub = stasis_subscribe(ast_security_topic(),
acl_change_stasis_cb, NULL);
acl_change_stasis_cb, NULL);
}
}
static void acl_change_event_stasis_unsubscribe(void)
{
if (acl_change_sub) {
acl_change_sub = stasis_unsubscribe(acl_change_sub);
}
acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
static int network_change_sched_cb(const void *data)

@ -706,7 +706,7 @@ AST_TEST_DEFINE(test_presence_state_change)
return AST_TEST_FAIL;
}
test_sub = stasis_unsubscribe(test_sub);
test_sub = stasis_unsubscribe_and_join(test_sub);
ao2_cleanup(cb_data->presence_state);
ast_free((char *)cb_data);

@ -124,6 +124,34 @@
* stasis_subscription. Due to cyclic references, the \ref
* stasis_subscription will not be freed until after it has been unsubscribed,
* and all other ao2_ref()'s have been cleaned up.
*
* \par Shutdown
*
* Subscriptions have two options for unsubscribing, depending upon the context
* in which you need to unsubscribe.
*
* If your subscription is owned by a module, and you must unsubscribe from the
* module_unload() function, then you'll want to use the
* stasis_unsubscribe_and_join() function. This will block until the final
* message has been received on the subscription. Otherwise, there's the danger
* of invoking the callback function after it has been unloaded.
*
* If your subscription is owned by an object, then your object should have an
* explicit shutdown() function, which calls stasis_unsubscribe(). In your
* subscription handler, when the stasis_subscription_final_message() has been
* received, decrement the refcount on your object. In your object's destructor,
* you may assert that stasis_subscription_is_done() to validate that the
* subscription's callback will no longer be invoked.
*
* \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
* an object's destructor. While code that does this may work most of the time,
* it's got one big downside. There's a general assumption that object
* destruction is non-blocking. If you block the destruction waiting for the
* subscription to complete, there's the danger that the subscription may
* process a message which will bump the refcount up by one. Then it does
* whatever it does, decrements the refcount, which then proceeds to re-destroy
* the object. Now you've got hard to reproduce bugs that only show up under
* certain loads.
*/
#include "asterisk/utils.h"
@ -292,8 +320,7 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s
* \since 12
*/
struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
stasis_subscription_cb callback,
void *data);
stasis_subscription_cb callback, void *data);
/*!
* \brief Cancel a subscription.
@ -304,10 +331,52 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
* delivery of the final message.
*
* \param subscription Subscription to cancel.
* \retval NULL for convenience
* \return \c NULL for convenience
* \since 12
*/
struct stasis_subscription *stasis_unsubscribe(
struct stasis_subscription *subscription);
/*!
* \brief Block until the last message is processed on a subscription.
*
* This function will not return until the \a subscription's callback for the
* stasis_subscription_final_message() completes. This allows cleanup routines
* to run before unblocking the joining thread.
*
* \param subscription Subscription to block on.
* \since 12
*/
void stasis_subscription_join(struct stasis_subscription *subscription);
/*!
* \brief Returns whether \a subscription has received its final message.
*
* Note that a subscription is considered done even while the
* stasis_subscription_final_message() is being processed. This allows cleanup
* routines to check the status of the subscription.
*
* \param subscription Subscription.
* \return True (non-zero) if stasis_subscription_final_message() has been
* received.
* \return False (zero) if waiting for the end.
*/
int stasis_subscription_is_done(struct stasis_subscription *subscription);
/*!
* \brief Cancel a subscription, blocking until the last message is processed.
*
* While normally it's recommended to stasis_unsubscribe() and wait for
* stasis_subscription_final_message(), there are times (like during a module
* unload) where you have to wait for the final message (otherwise you'll call
* a function in a shared module that no longer exists).
*
* \param subscription Subscription to cancel.
* \return \c NULL for convenience
* \since 12
*/
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription);
struct stasis_subscription *stasis_unsubscribe_and_join(
struct stasis_subscription *subscription);
/*!
* \brief Create a subscription which forwards all messages from one topic to
@ -322,7 +391,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subsc
* \return \c NULL on error.
* \since 12
*/
struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic);
struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic);
/*!
* \brief Get the unique ID for the subscription.
@ -389,7 +459,8 @@ struct stasis_topic_pool;
/*!
* \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
* \param pooled_topic Topic to which messages will be routed
* \retval the new stasis_topic_pool or NULL on failure
* \return the new stasis_topic_pool
* \return \c NULL on failure
*/
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
@ -397,8 +468,8 @@ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_t
* \brief Find or create a topic in the pool
* \param pool Pool for which to get the topic
* \param topic_name Name of the topic to get
* \retval The already stored or newly allocated topic
* \retval NULL if the topic was not found and could not be allocated
* \return The already stored or newly allocated topic
* \return \c NULL if the topic was not found and could not be allocated
*/
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
@ -496,12 +567,31 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message);
struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn);
/*!
* Unsubscribes a caching topic from its upstream topic.
* \brief Unsubscribes a caching topic from its upstream topic.
*
* This function returns immediately, so be sure to cleanup when
* stasis_subscription_final_message() is received.
*
* \param caching_topic Caching topic to unsubscribe
* \return \c NULL for convenience
* \since 12
*/
struct stasis_caching_topic *stasis_caching_unsubscribe(
struct stasis_caching_topic *caching_topic);
/*!
* \brief Unsubscribes a caching topic from its upstream topic, blocking until
* all messages have been forwarded.
*
* See stasis_unsubscriben_and_join() for more info on when to use this as
* opposed to stasis_caching_unsubscribe().
*
* \param caching_topic Caching topic to unsubscribe
* \retval NULL for convenience
* \return \c NULL for convenience
* \since 12
*/
struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic);
struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
struct stasis_caching_topic *caching_topic);
/*!
* \brief Returns the topic of cached events from a caching topics.
@ -530,9 +620,9 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top
/*!
* \brief Dump cached items to a subscription
* \param caching_topic The topic returned from stasis_caching_topic_create().
* \param type Type of message to dump (any type if NULL).
* \param type Type of message to dump (any type if \c NULL).
* \return ao2_container containing all matches (must be unreffed by caller)
* \return NULL on allocation error
* \return \c NULL on allocation error
* \since 12
*/
struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,

@ -57,11 +57,35 @@ struct stasis_message_router *stasis_message_router_create(
/*!
* \brief Unsubscribe the router from the upstream topic.
*
* \param router Router to unsubscribe.
* \since 12
*/
void stasis_message_router_unsubscribe(struct stasis_message_router *router);
/*!
* \brief Unsubscribe the router from the upstream topic, blocking until the
* final message has been processed.
*
* See stasis_unsubscribe_and_join() for info on when to use this
* vs. stasis_message_router_unsubscribe().
*
* \param router Router to unsubscribe.
* \since 12
*/
void stasis_message_router_unsubscribe_and_join(
struct stasis_message_router *router);
/*!
* \brief Returns whether \a router has received its final message.
*
* \param router Router.
* \return True (non-zero) if stasis_subscription_final_message() has been
* received.
* \return False (zero) if waiting for the end.
*/
int stasis_message_router_is_done(struct stasis_message_router *router);
/*!
* \brief Add a route to a message router.
* \param router Router to add the route to.

@ -2727,7 +2727,7 @@ static void app_exit(void)
{
ao2_cleanup(mwi_topic_all);
mwi_topic_all = NULL;
mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached);
mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type);
ao2_cleanup(mwi_topic_pool);
mwi_topic_pool = NULL;

@ -776,7 +776,7 @@ static void devstate_exit(void)
{
ao2_cleanup(device_state_topic_all);
device_state_topic_all = NULL;
device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached);
device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached);
STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type);
ao2_cleanup(device_state_topic_pool);
device_state_topic_pool = NULL;

@ -106,7 +106,9 @@ static void endpoint_dtor(void *obj)
struct ast_endpoint *endpoint = obj;
/* The router should be shut down already */
ast_assert(endpoint->router == NULL);
ast_assert(stasis_message_router_is_done(endpoint->router));
ao2_cleanup(endpoint->router);
endpoint->router = NULL;
stasis_unsubscribe(endpoint->forward);
endpoint->forward = NULL;
@ -258,8 +260,9 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
stasis_publish(endpoint->topic, message);
}
/* Bump refcount to hold on to the router */
ao2_ref(endpoint->router, +1);
stasis_message_router_unsubscribe(endpoint->router);
endpoint->router = NULL;
}
const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)

@ -1077,9 +1077,7 @@ static void acl_change_stasis_subscribe(void)
static void acl_change_stasis_unsubscribe(void)
{
if (acl_change_sub) {
acl_change_sub = stasis_unsubscribe(acl_change_sub);
}
acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub);
}
/* In order to understand what the heck is going on with the

@ -805,7 +805,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
static void manager_channels_shutdown(void)
{
stasis_message_router_unsubscribe(channel_state_router);
stasis_message_router_unsubscribe_and_join(channel_state_router);
channel_state_router = NULL;
}

@ -11700,12 +11700,8 @@ static void unload_pbx(void)
{
int x;
if (presence_state_sub) {
presence_state_sub = stasis_unsubscribe(presence_state_sub);
}
if (device_state_sub) {
device_state_sub = stasis_unsubscribe(device_state_sub);
}
presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub);
device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
/* Unregister builtin applications */
for (x = 0; x < ARRAY_LEN(builtins); x++) {

@ -114,16 +114,30 @@ struct stasis_subscription {
stasis_subscription_cb callback;
/*! Data pointer to be handed to the callback. */
void *data;
/*! Lock for joining with subscription. */
ast_mutex_t join_lock;
/*! Condition for joining with subscription. */
ast_cond_t join_cond;
/*! Flag set when final message for sub has been received.
* Be sure join_lock is held before reading/setting. */
int final_message_rxed;
/*! Flag set when final message for sub has been processed.
* Be sure join_lock is held before reading/setting. */
int final_message_processed;
};
static void subscription_dtor(void *obj)
{
struct stasis_subscription *sub = obj;
ast_assert(!stasis_subscription_is_subscribed(sub));
ast_assert(stasis_subscription_is_done(sub));
ao2_cleanup(sub->topic);
sub->topic = NULL;
ast_taskprocessor_unreference(sub->mailbox);
sub->mailbox = NULL;
ast_mutex_destroy(&sub->join_lock);
ast_cond_destroy(&sub->join_cond);
}
/*!
@ -136,11 +150,22 @@ static void subscription_invoke(struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
/* Since sub->topic doesn't change, no need to lock sub */
sub->callback(sub->data,
sub,
topic,
message);
/* Notify that the final message has been received */
if (stasis_subscription_final_message(sub, message)) {
SCOPED_MUTEX(lock, &sub->join_lock);
sub->final_message_rxed = 1;
ast_cond_signal(&sub->join_cond);
}
/* Since sub is mostly immutable, no need to lock sub */
sub->callback(sub->data, sub, topic, message);
/* Notify that the final message has been processed */
if (stasis_subscription_final_message(sub, message)) {
SCOPED_MUTEX(lock, &sub->join_lock);
sub->final_message_processed = 1;
ast_cond_signal(&sub->join_cond);
}
}
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
@ -171,6 +196,8 @@ static struct stasis_subscription *__stasis_subscribe(
sub->topic = topic;
sub->callback = callback;
sub->data = data;
ast_mutex_init(&sub->join_lock);
ast_cond_init(&sub->join_cond, NULL);
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
@ -212,6 +239,50 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
return NULL;
}
/*!
* \brief Block until the final message has been received on a subscription.
*
* \param subscription Subscription to wait on.
*/
void stasis_subscription_join(struct stasis_subscription *subscription)
{
if (subscription) {
SCOPED_MUTEX(lock, &subscription->join_lock);
/* Wait until the processed flag has been set */
while (!subscription->final_message_processed) {
ast_cond_wait(&subscription->join_cond,
&subscription->join_lock);
}
}
}
int stasis_subscription_is_done(struct stasis_subscription *subscription)
{
if (subscription) {
SCOPED_MUTEX(lock, &subscription->join_lock);
return subscription->final_message_rxed;
}
/* Null subscription is about as done as you can get */
return 1;
}
struct stasis_subscription *stasis_unsubscribe_and_join(
struct stasis_subscription *subscription)
{
if (!subscription) {
return NULL;
}
/* Bump refcount to hold it past the unsubscribe */
ao2_ref(subscription, +1);
stasis_unsubscribe(subscription);
stasis_subscription_join(subscription);
/* Now decrement the refcount back */
ao2_cleanup(subscription);
return NULL;
}
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
{
if (sub) {

@ -53,6 +53,8 @@ struct stasis_caching_topic {
static void stasis_caching_topic_dtor(void *obj) {
struct stasis_caching_topic *caching_topic = obj;
ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
ast_assert(stasis_subscription_is_done(caching_topic->sub));
ao2_cleanup(caching_topic->sub);
caching_topic->sub = NULL;
ao2_cleanup(caching_topic->cache);
caching_topic->cache = NULL;
@ -69,6 +71,9 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to
{
if (caching_topic) {
if (stasis_subscription_is_subscribed(caching_topic->sub)) {
/* Increment the reference to hold on to it past the
* unsubscribe */
ao2_ref(caching_topic->sub, +1);
stasis_unsubscribe(caching_topic->sub);
} else {
ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
@ -77,6 +82,20 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to
return NULL;
}
struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
{
if (!caching_topic) {
return NULL;
}
/* Hold a ref past the unsubscribe */
ao2_ref(caching_topic, +1);
stasis_caching_unsubscribe(caching_topic);
stasis_subscription_join(caching_topic->sub);
ao2_cleanup(caching_topic);
return NULL;
}
struct cache_entry {
struct stasis_message_type *type;
char *id;

@ -505,6 +505,9 @@ int ast_channel_snapshot_caller_id_equal(
void ast_stasis_channels_shutdown(void)
{
channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
ao2_cleanup(channel_topic_all);
channel_topic_all = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
@ -512,9 +515,6 @@ void ast_stasis_channels_shutdown(void)
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
ao2_cleanup(channel_topic_all);
channel_topic_all = NULL;
channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached);
}
void ast_stasis_channels_init(void)

@ -101,15 +101,6 @@ static const char *endpoint_snapshot_get_id(struct stasis_message *message)
}
static void endpoints_stasis_shutdown(void)
{
ao2_cleanup(endpoint_topic_all);
endpoint_topic_all = NULL;
stasis_caching_unsubscribe(endpoint_topic_all_cached);
endpoint_topic_all_cached = NULL;
}
struct ast_json *ast_endpoint_snapshot_to_json(
const struct ast_endpoint_snapshot *snapshot)
{
@ -149,6 +140,15 @@ struct ast_json *ast_endpoint_snapshot_to_json(
return ast_json_ref(json);
}
static void endpoints_stasis_shutdown(void)
{
stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached);
endpoint_topic_all_cached = NULL;
ao2_cleanup(endpoint_topic_all);
endpoint_topic_all = NULL;
}
int ast_endpoint_stasis_init(void)
{
ast_register_atexit(endpoints_stasis_shutdown);

@ -74,6 +74,7 @@ static void router_dtor(void *obj)
size_t i;
ast_assert(!stasis_subscription_is_subscribed(router->subscription));
ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
for (i = 0; i < router->num_routes_current; ++i) {
ao2_cleanup(router->routes[i]);
@ -165,6 +166,26 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router)
stasis_unsubscribe(router->subscription);
}
void stasis_message_router_unsubscribe_and_join(
struct stasis_message_router *router)
{
if (!router) {
return;
}
stasis_unsubscribe_and_join(router->subscription);
}
int stasis_message_router_is_done(struct stasis_message_router *router)
{
if (!router) {
/* Null router is about as done as you can get */
return 1;
}
return stasis_subscription_is_done(router->subscription);
}
static struct stasis_message_route *route_create(
struct stasis_message_type *message_type,
stasis_subscription_cb callback,

@ -172,9 +172,9 @@ static int load_module(void)
static int unload_module(void)
{
stasis_unsubscribe(sub);
stasis_unsubscribe_and_join(sub);
sub = NULL;
stasis_message_router_unsubscribe(router);
stasis_message_router_unsubscribe_and_join(router);
router = NULL;
return 0;
}

@ -4770,12 +4770,8 @@ static int unload_module(void)
ast_unregister_application(app_ajileave);
ast_manager_unregister("JabberSend");
ast_custom_function_unregister(&jabberstatus_function);
if (mwi_sub) {
mwi_sub = stasis_unsubscribe(mwi_sub);
}
if (device_state_sub) {
device_state_sub = stasis_unsubscribe(device_state_sub);
}
mwi_sub = stasis_unsubscribe_and_join(mwi_sub);
device_state_sub = stasis_unsubscribe_and_join(device_state_sub);
ast_custom_function_unregister(&jabberreceive_function);
ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {

@ -722,7 +722,7 @@ static int unload_module(void)
{
int r = 0;
stasis_message_router_unsubscribe(channel_router);
stasis_message_router_unsubscribe_and_join(channel_router);
channel_router = NULL;
ao2_cleanup(apps_registry);

Loading…
Cancel
Save