stasis: Remove silly usage of RAII_VAR.

Change-Id: Ib11193531e797bcb16bba560a408eab155f706d1
pull/9/head
Corey Farrell 7 years ago
parent 92ff6c6598
commit f0eb00d1e7

@ -422,10 +422,10 @@ static void subscription_invoke(struct stasis_subscription *sub,
{ {
/* Notify that the final message has been received */ /* Notify that the final message has been received */
if (stasis_subscription_final_message(sub, message)) { if (stasis_subscription_final_message(sub, message)) {
SCOPED_AO2LOCK(lock, sub); ao2_lock(sub);
sub->final_message_rxed = 1; sub->final_message_rxed = 1;
ast_cond_signal(&sub->join_cond); ast_cond_signal(&sub->join_cond);
ao2_unlock(sub);
} }
/* Since sub is mostly immutable, no need to lock sub */ /* Since sub is mostly immutable, no need to lock sub */
@ -433,10 +433,10 @@ static void subscription_invoke(struct stasis_subscription *sub,
/* Notify that the final message has been processed */ /* Notify that the final message has been processed */
if (stasis_subscription_final_message(sub, message)) { if (stasis_subscription_final_message(sub, message)) {
SCOPED_AO2LOCK(lock, sub); ao2_lock(sub);
sub->final_message_processed = 1; sub->final_message_processed = 1;
ast_cond_signal(&sub->join_cond); ast_cond_signal(&sub->join_cond);
ao2_unlock(sub);
} }
} }
@ -454,7 +454,7 @@ struct stasis_subscription *internal_stasis_subscribe(
int needs_mailbox, int needs_mailbox,
int use_thread_pool) int use_thread_pool)
{ {
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); struct stasis_subscription *sub;
if (!topic) { if (!topic) {
return NULL; return NULL;
@ -486,6 +486,8 @@ struct stasis_subscription *internal_stasis_subscribe(
sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT); sub->mailbox = ast_taskprocessor_get(tps_name, TPS_REF_DEFAULT);
} }
if (!sub->mailbox) { if (!sub->mailbox) {
ao2_ref(sub, -1);
return NULL; return NULL;
} }
ast_taskprocessor_set_local(sub->mailbox, sub); ast_taskprocessor_set_local(sub->mailbox, sub);
@ -500,11 +502,12 @@ struct stasis_subscription *internal_stasis_subscribe(
ast_cond_init(&sub->join_cond, NULL); ast_cond_init(&sub->join_cond, NULL);
if (topic_add_subscription(topic, sub) != 0) { if (topic_add_subscription(topic, sub) != 0) {
ao2_ref(sub, -1);
return NULL; return NULL;
} }
send_subscription_subscribe(topic, sub); send_subscription_subscribe(topic, sub);
ao2_ref(sub, +1);
return sub; return sub;
} }
@ -535,18 +538,21 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{ {
/* The subscription may be the last ref to this topic. Hold /* The subscription may be the last ref to this topic. Hold
* the topic ref open until after the unlock. */ * the topic ref open until after the unlock. */
RAII_VAR(struct stasis_topic *, topic, struct stasis_topic *topic;
ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
if (!sub) { if (!sub) {
return NULL; return NULL;
} }
topic = ao2_bump(sub->topic);
/* We have to remove the subscription first, to ensure the unsubscribe /* We have to remove the subscription first, to ensure the unsubscribe
* is the final message */ * is the final message */
if (topic_remove_subscription(sub->topic, sub) != 0) { if (topic_remove_subscription(sub->topic, sub) != 0) {
ast_log(LOG_ERROR, ast_log(LOG_ERROR,
"Internal error: subscription has invalid topic\n"); "Internal error: subscription has invalid topic\n");
ao2_cleanup(topic);
return NULL; return NULL;
} }
@ -560,6 +566,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
/* Unsubscribing unrefs the subscription */ /* Unsubscribing unrefs the subscription */
ao2_cleanup(sub); ao2_cleanup(sub);
ao2_cleanup(topic);
return NULL; return NULL;
} }
@ -578,22 +586,26 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
void stasis_subscription_join(struct stasis_subscription *subscription) void stasis_subscription_join(struct stasis_subscription *subscription)
{ {
if (subscription) { if (subscription) {
SCOPED_AO2LOCK(lock, subscription); ao2_lock(subscription);
/* Wait until the processed flag has been set */ /* Wait until the processed flag has been set */
while (!subscription->final_message_processed) { while (!subscription->final_message_processed) {
ast_cond_wait(&subscription->join_cond, ast_cond_wait(&subscription->join_cond,
ao2_object_get_lockaddr(subscription)); ao2_object_get_lockaddr(subscription));
} }
ao2_unlock(subscription);
} }
} }
int stasis_subscription_is_done(struct stasis_subscription *subscription) int stasis_subscription_is_done(struct stasis_subscription *subscription)
{ {
if (subscription) { if (subscription) {
SCOPED_AO2LOCK(lock, subscription); int ret;
ao2_lock(subscription);
ret = subscription->final_message_rxed;
ao2_unlock(subscription);
return subscription->final_message_rxed; return ret;
} }
/* Null subscription is about as done as you can get */ /* Null subscription is about as done as you can get */
@ -621,13 +633,15 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
if (sub) { if (sub) {
size_t i; size_t i;
struct stasis_topic *topic = sub->topic; struct stasis_topic *topic = sub->topic;
SCOPED_AO2LOCK(lock_topic, topic);
ao2_lock(topic);
for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) { for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
if (AST_VECTOR_GET(&topic->subscribers, i) == sub) { if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
ao2_unlock(topic);
return 1; return 1;
} }
} }
ao2_unlock(topic);
} }
return 0; return 0;
@ -668,8 +682,8 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub) static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{ {
size_t idx; size_t idx;
SCOPED_AO2LOCK(lock, topic);
ao2_lock(topic);
/* The reference from the topic to the subscription is shared with /* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe * the owner of the subscription, which will explicitly unsubscribe
* to release it. * to release it.
@ -682,6 +696,7 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
topic_add_subscription( topic_add_subscription(
AST_VECTOR_GET(&topic->upstream_topics, idx), sub); AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
} }
ao2_unlock(topic);
return 0; return 0;
} }
@ -689,15 +704,18 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub) static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{ {
size_t idx; size_t idx;
SCOPED_AO2LOCK(lock_topic, topic); int res;
ao2_lock(topic);
for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) { for (idx = 0; idx < AST_VECTOR_SIZE(&topic->upstream_topics); ++idx) {
topic_remove_subscription( topic_remove_subscription(
AST_VECTOR_GET(&topic->upstream_topics, idx), sub); AST_VECTOR_GET(&topic->upstream_topics, idx), sub);
} }
res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
return AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub,
AST_VECTOR_ELEM_CLEANUP_NOOP); AST_VECTOR_ELEM_CLEANUP_NOOP);
ao2_unlock(topic);
return res;
} }
/*! /*!
@ -1214,25 +1232,25 @@ static void multi_object_blob_dtor(void *obj)
struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob) struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob)
{ {
int type; int type;
RAII_VAR(struct ast_multi_object_blob *, multi, struct ast_multi_object_blob *multi;
ao2_alloc(sizeof(*multi), multi_object_blob_dtor),
ao2_cleanup);
ast_assert(blob != NULL); ast_assert(blob != NULL);
multi = ao2_alloc(sizeof(*multi), multi_object_blob_dtor);
if (!multi) { if (!multi) {
return NULL; return NULL;
} }
for (type = 0; type < STASIS_UMOS_MAX; ++type) { for (type = 0; type < STASIS_UMOS_MAX; ++type) {
if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) { if (AST_VECTOR_INIT(&multi->snapshots[type], 0)) {
ao2_ref(multi, -1);
return NULL; return NULL;
} }
} }
multi->blob = ast_json_ref(blob); multi->blob = ast_json_ref(blob);
ao2_ref(multi, +1);
return multi; return multi;
} }
@ -1249,9 +1267,9 @@ void ast_multi_object_blob_add(struct ast_multi_object_blob *multi,
void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
struct stasis_message_type *type, struct ast_json *blob) struct stasis_message_type *type, struct ast_json *blob)
{ {
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); struct stasis_message *message;
RAII_VAR(struct ast_channel_snapshot *, channel_snapshot, NULL, ao2_cleanup); struct ast_channel_snapshot *channel_snapshot;
RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup); struct ast_multi_object_blob *multi;
if (!type) { if (!type) {
return; return;
@ -1263,13 +1281,20 @@ void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan,
} }
channel_snapshot = ast_channel_snapshot_create(chan); channel_snapshot = ast_channel_snapshot_create(chan);
ao2_ref(channel_snapshot, +1); if (!channel_snapshot) {
ao2_ref(multi, -1);
return;
}
/* this call steals the channel_snapshot reference */
ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot); ast_multi_object_blob_add(multi, STASIS_UMOS_CHANNEL, channel_snapshot);
message = stasis_message_create(type, multi); message = stasis_message_create(type, multi);
ao2_ref(multi, -1);
if (message) { if (message) {
/* app_userevent still publishes to channel */ /* app_userevent still publishes to channel */
stasis_publish(ast_channel_topic(chan), message); stasis_publish(ast_channel_topic(chan), message);
ao2_ref(message, -1);
} }
} }
@ -1278,7 +1303,7 @@ static struct ast_json *multi_user_event_to_json(
struct stasis_message *message, struct stasis_message *message,
const struct stasis_message_sanitizer *sanitize) const struct stasis_message_sanitizer *sanitize)
{ {
RAII_VAR(struct ast_json *, out, NULL, ast_json_unref); struct ast_json *out;
struct ast_multi_object_blob *multi = stasis_message_data(message); struct ast_multi_object_blob *multi = stasis_message_data(message);
struct ast_json *blob = multi->blob; struct ast_json *blob = multi->blob;
const struct timeval *tv = stasis_message_timestamp(message); const struct timeval *tv = stasis_message_timestamp(message);
@ -1320,7 +1345,8 @@ static struct ast_json *multi_user_event_to_json(
} }
} }
} }
return ast_json_ref(out);
return out;
} }
/*! \internal \brief convert multi object blob to ami string */ /*! \internal \brief convert multi object blob to ami string */
@ -1513,17 +1539,19 @@ static void *stasis_config_alloc(void)
int stasis_message_type_declined(const char *name) int stasis_message_type_declined(const char *name)
{ {
RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup); struct stasis_config *cfg = ao2_global_obj_ref(globals);
char *name_in_declined; char *name_in_declined;
int res; int res;
if (!cfg || !cfg->declined_message_types) { if (!cfg || !cfg->declined_message_types) {
ao2_cleanup(cfg);
return 0; return 0;
} }
name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY); name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
res = name_in_declined ? 1 : 0; res = name_in_declined ? 1 : 0;
ao2_cleanup(name_in_declined); ao2_cleanup(name_in_declined);
ao2_ref(cfg, -1);
if (res) { if (res) {
ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name); ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
} }
@ -1569,7 +1597,7 @@ static void stasis_cleanup(void)
int stasis_init(void) int stasis_init(void)
{ {
RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup); struct stasis_config *cfg;
int cache_init; int cache_init;
struct ast_threadpool_options threadpool_opts = { 0, }; struct ast_threadpool_options threadpool_opts = { 0, };
@ -1605,11 +1633,14 @@ int stasis_init(void)
if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) { if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) {
ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n"); ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n");
ao2_ref(default_cfg, -1); ao2_ref(default_cfg, -1);
return -1; return -1;
} }
if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) { if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) {
ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n"); ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n");
ao2_ref(default_cfg, -1);
return -1; return -1;
} }
@ -1620,6 +1651,7 @@ int stasis_init(void)
cfg = ao2_global_obj_ref(globals); cfg = ao2_global_obj_ref(globals);
if (!cfg) { if (!cfg) {
ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n"); ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n");
return -1; return -1;
} }
} }
@ -1630,8 +1662,10 @@ int stasis_init(void)
threadpool_opts.max_size = cfg->threadpool_options->max_size; threadpool_opts.max_size = cfg->threadpool_options->max_size;
threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec; threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts); pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
ao2_ref(cfg, -1);
if (!pool) { if (!pool) {
ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n"); ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
return -1; return -1;
} }

Loading…
Cancel
Save