res_stasis: Reduce RAII_VAR usage.

In addition to being a micro-optimization (RAII_VAR has overhead), this
change improves output of REF_DEBUG.  Unfortunately when RAII_VAR calls
ao2_cleanup it does so from a generated _dtor_varname function.  For
example this caused _dtor_app to release a reference instead of
__stasis_app_unregister.

Change-Id: I4ce67120583a446babf9adeec678b71d37fcd9e5
pull/9/head
Corey Farrell 7 years ago
parent 26e5edd043
commit 55a540272f

@ -498,7 +498,8 @@ static void moh_after_bridge_cb(struct ast_channel *chan, void *data)
/*! Request a bridge MOH channel */ /*! Request a bridge MOH channel */
static struct ast_channel *prepare_bridge_moh_channel(void) static struct ast_channel *prepare_bridge_moh_channel(void)
{ {
RAII_VAR(struct ast_format_cap *, cap, NULL, ao2_cleanup); struct ast_channel *chan;
struct ast_format_cap *cap;
cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT); cap = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
if (!cap) { if (!cap) {
@ -507,7 +508,10 @@ static struct ast_channel *prepare_bridge_moh_channel(void)
ast_format_cap_append(cap, ast_format_slin, 0); ast_format_cap_append(cap, ast_format_slin, 0);
return ast_request("Announcer", cap, NULL, NULL, "ARI_MOH", NULL); chan = ast_request("Announcer", cap, NULL, NULL, "ARI_MOH", NULL);
ao2_ref(cap, -1);
return chan;
} }
/*! Provides the moh channel with a thread so it can actually play its music */ /*! Provides the moh channel with a thread so it can actually play its music */
@ -599,23 +603,27 @@ static struct ast_channel *bridge_moh_create(struct ast_bridge *bridge)
struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge) struct ast_channel *stasis_app_bridge_moh_channel(struct ast_bridge *bridge)
{ {
RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup); struct ast_channel *chan;
struct stasis_app_bridge_channel_wrapper *moh_wrapper;
{ ao2_lock(app_bridges_moh);
SCOPED_AO2LOCK(lock, app_bridges_moh); moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!moh_wrapper) {
chan = bridge_moh_create(bridge);
}
ao2_unlock(app_bridges_moh);
moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (moh_wrapper) {
if (!moh_wrapper) { chan = ast_channel_get_by_name(moh_wrapper->channel_id);
return bridge_moh_create(bridge); ao2_ref(moh_wrapper, -1);
}
} }
return ast_channel_get_by_name(moh_wrapper->channel_id); return chan;
} }
int stasis_app_bridge_moh_stop(struct ast_bridge *bridge) int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
{ {
RAII_VAR(struct stasis_app_bridge_channel_wrapper *, moh_wrapper, NULL, ao2_cleanup); struct stasis_app_bridge_channel_wrapper *moh_wrapper;
struct ast_channel *chan; struct ast_channel *chan;
moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK); moh_wrapper = ao2_find(app_bridges_moh, bridge->uniqueid, OBJ_SEARCH_KEY | OBJ_UNLINK);
@ -624,6 +632,7 @@ int stasis_app_bridge_moh_stop(struct ast_bridge *bridge)
} }
chan = ast_channel_get_by_name(moh_wrapper->channel_id); chan = ast_channel_get_by_name(moh_wrapper->channel_id);
ao2_ref(moh_wrapper, -1);
if (!chan) { if (!chan) {
return -1; return -1;
} }
@ -862,25 +871,30 @@ static const struct ast_datastore_info replace_channel_store_info = {
static struct replace_channel_store *get_replace_channel_store(struct ast_channel *chan, int no_create) static struct replace_channel_store *get_replace_channel_store(struct ast_channel *chan, int no_create)
{ {
struct ast_datastore *datastore; struct ast_datastore *datastore;
struct replace_channel_store *ret;
SCOPED_CHANNELLOCK(lock, chan); ast_channel_lock(chan);
datastore = ast_channel_datastore_find(chan, &replace_channel_store_info, NULL); datastore = ast_channel_datastore_find(chan, &replace_channel_store_info, NULL);
if (!datastore) { if (!datastore && !no_create) {
if (no_create) {
return NULL;
}
datastore = ast_datastore_alloc(&replace_channel_store_info, NULL); datastore = ast_datastore_alloc(&replace_channel_store_info, NULL);
if (!datastore) { if (datastore) {
return NULL; ast_channel_datastore_add(chan, datastore);
} }
ast_channel_datastore_add(chan, datastore); }
if (!datastore) {
ast_channel_unlock(chan);
return NULL;
} }
if (!datastore->data) { if (!datastore->data) {
datastore->data = ast_calloc(1, sizeof(struct replace_channel_store)); datastore->data = ast_calloc(1, sizeof(struct replace_channel_store));
} }
return datastore->data;
ret = datastore->data;
ast_channel_unlock(chan);
return ret;
} }
int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot) int app_set_replace_channel_snapshot(struct ast_channel *chan, struct ast_channel_snapshot *replace_snapshot)
@ -959,9 +973,9 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
int argc, char *argv[], struct ast_channel_snapshot *snapshot, int argc, char *argv[], struct ast_channel_snapshot *snapshot,
struct ast_channel_snapshot *replace_channel_snapshot) struct ast_channel_snapshot *replace_channel_snapshot)
{ {
RAII_VAR(struct ast_json *, json_blob, NULL, ast_json_unref); struct ast_json *json_blob;
struct ast_json *json_args; struct ast_json *json_args;
RAII_VAR(struct start_message_blob *, payload, NULL, ao2_cleanup); struct start_message_blob *payload;
struct stasis_message *msg; struct stasis_message *msg;
int i; int i;
@ -986,8 +1000,11 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
"args"); "args");
if (!json_blob) { if (!json_blob) {
ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n"); ast_log(LOG_ERROR, "Error packing JSON for StasisStart message\n");
ao2_ref(payload, -1);
return -1; return -1;
} }
payload->blob = json_blob;
/* Append arguments to args array */ /* Append arguments to args array */
json_args = ast_json_object_get(json_blob, "args"); json_args = ast_json_object_get(json_blob, "args");
@ -997,13 +1014,14 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
ast_json_string_create(argv[i])); ast_json_string_create(argv[i]));
if (r != 0) { if (r != 0) {
ast_log(LOG_ERROR, "Error appending to StasisStart message\n"); ast_log(LOG_ERROR, "Error appending to StasisStart message\n");
ao2_ref(payload, -1);
return -1; return -1;
} }
} }
payload->blob = ast_json_ref(json_blob);
msg = stasis_message_create(start_message_type(), payload); msg = stasis_message_create(start_message_type(), payload);
ao2_ref(payload, -1);
if (!msg) { if (!msg) {
ast_log(LOG_ERROR, "Error sending StasisStart message\n"); ast_log(LOG_ERROR, "Error sending StasisStart message\n");
return -1; return -1;
@ -1020,9 +1038,9 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
static int send_start_msg(struct stasis_app *app, struct ast_channel *chan, static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
int argc, char *argv[]) int argc, char *argv[])
{ {
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); int ret = -1;
RAII_VAR(struct ast_channel_snapshot *, replace_channel_snapshot, struct ast_channel_snapshot *snapshot;
NULL, ao2_cleanup); struct ast_channel_snapshot *replace_channel_snapshot;
ast_assert(chan != NULL); ast_assert(chan != NULL);
@ -1032,10 +1050,13 @@ static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
ast_channel_lock(chan); ast_channel_lock(chan);
snapshot = ast_channel_snapshot_create(chan); snapshot = ast_channel_snapshot_create(chan);
ast_channel_unlock(chan); ast_channel_unlock(chan);
if (!snapshot) { if (snapshot) {
return -1; ret = send_start_msg_snapshots(chan, app, argc, argv, snapshot, replace_channel_snapshot);
ao2_ref(snapshot, -1);
} }
return send_start_msg_snapshots(chan, app, argc, argv, snapshot, replace_channel_snapshot); ao2_cleanup(replace_channel_snapshot);
return ret;
} }
static void remove_masquerade_store(struct ast_channel *chan); static void remove_masquerade_store(struct ast_channel *chan);
@ -1478,7 +1499,7 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
int stasis_app_send(const char *app_name, struct ast_json *message) int stasis_app_send(const char *app_name, struct ast_json *message)
{ {
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); struct stasis_app *app;
if (!apps_registry) { if (!apps_registry) {
return -1; return -1;
@ -1494,6 +1515,8 @@ int stasis_app_send(const char *app_name, struct ast_json *message)
return -1; return -1;
} }
app_send(app, message); app_send(app, message);
ao2_ref(app, -1);
return 0; return 0;
} }
@ -1528,7 +1551,7 @@ static int append_name(void *obj, void *arg, int flags)
struct ao2_container *stasis_app_get_all(void) struct ao2_container *stasis_app_get_all(void)
{ {
RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup); struct ao2_container *apps;
if (!apps_registry) { if (!apps_registry) {
return NULL; return NULL;
@ -1541,12 +1564,12 @@ struct ao2_container *stasis_app_get_all(void)
ao2_callback(apps_registry, OBJ_NODATA, append_name, apps); ao2_callback(apps_registry, OBJ_NODATA, append_name, apps);
return ao2_bump(apps); return apps;
} }
static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events) static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
{ {
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); struct stasis_app *app;
if (!apps_registry) { if (!apps_registry) {
return -1; return -1;
@ -1558,24 +1581,25 @@ static int __stasis_app_register(const char *app_name, stasis_app_cb handler, vo
app_update(app, handler, data); app_update(app, handler, data);
} else { } else {
app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL); app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
if (app) { if (!app) {
if (all_events) { ao2_unlock(apps_registry);
struct stasis_app_event_source *source; return -1;
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); }
AST_LIST_TRAVERSE(&event_sources, source, next) { if (all_events) {
if (!source->subscribe) { struct stasis_app_event_source *source;
continue;
}
source->subscribe(app, NULL); AST_RWLIST_RDLOCK(&event_sources);
AST_LIST_TRAVERSE(&event_sources, source, next) {
if (!source->subscribe) {
continue;
} }
source->subscribe(app, NULL);
} }
ao2_link_flags(apps_registry, app, OBJ_NOLOCK); AST_RWLIST_UNLOCK(&event_sources);
} else {
ao2_unlock(apps_registry);
return -1;
} }
ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
} }
/* We lazily clean up the apps_registry, because it's good enough to /* We lazily clean up the apps_registry, because it's good enough to
@ -1583,6 +1607,7 @@ static int __stasis_app_register(const char *app_name, stasis_app_cb handler, vo
*/ */
cleanup(); cleanup();
ao2_unlock(apps_registry); ao2_unlock(apps_registry);
ao2_ref(app, -1);
return 0; return 0;
} }
@ -1598,7 +1623,7 @@ int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *d
void stasis_app_unregister(const char *app_name) void stasis_app_unregister(const char *app_name)
{ {
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); struct stasis_app *app;
if (!app_name) { if (!app_name) {
return; return;
@ -1621,23 +1646,27 @@ void stasis_app_unregister(const char *app_name)
* and clean up, just in case * and clean up, just in case
*/ */
cleanup(); cleanup();
ao2_ref(app, -1);
} }
void stasis_app_register_event_source(struct stasis_app_event_source *obj) void stasis_app_register_event_source(struct stasis_app_event_source *obj)
{ {
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); AST_RWLIST_WRLOCK(&event_sources);
AST_LIST_INSERT_TAIL(&event_sources, obj, next); AST_LIST_INSERT_TAIL(&event_sources, obj, next);
/* only need to bump the module ref on non-core sources because the /* only need to bump the module ref on non-core sources because the
core ones are [un]registered by this module. */ core ones are [un]registered by this module. */
if (!stasis_app_is_core_event_source(obj)) { if (!stasis_app_is_core_event_source(obj)) {
ast_module_ref(ast_module_info->self); ast_module_ref(ast_module_info->self);
} }
AST_RWLIST_UNLOCK(&event_sources);
} }
void stasis_app_unregister_event_source(struct stasis_app_event_source *obj) void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
{ {
struct stasis_app_event_source *source; struct stasis_app_event_source *source;
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
AST_RWLIST_WRLOCK(&event_sources);
AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) { AST_RWLIST_TRAVERSE_SAFE_BEGIN(&event_sources, source, next) {
if (source == obj) { if (source == obj) {
AST_RWLIST_REMOVE_CURRENT(next); AST_RWLIST_REMOVE_CURRENT(next);
@ -1648,6 +1677,7 @@ void stasis_app_unregister_event_source(struct stasis_app_event_source *obj)
} }
} }
AST_RWLIST_TRAVERSE_SAFE_END; AST_RWLIST_TRAVERSE_SAFE_END;
AST_RWLIST_UNLOCK(&event_sources);
} }
/*! /*!
@ -1666,12 +1696,15 @@ static struct ast_json *app_event_sources_to_json(
const struct stasis_app *app, struct ast_json *json) const struct stasis_app *app, struct ast_json *json)
{ {
struct stasis_app_event_source *source; struct stasis_app_event_source *source;
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
AST_RWLIST_RDLOCK(&event_sources);
AST_LIST_TRAVERSE(&event_sources, source, next) { AST_LIST_TRAVERSE(&event_sources, source, next) {
if (source->to_json) { if (source->to_json) {
source->to_json(app, json); source->to_json(app, json);
} }
} }
AST_RWLIST_UNLOCK(&event_sources);
return json; return json;
} }
@ -1686,9 +1719,12 @@ static struct ast_json *stasis_app_object_to_json(struct stasis_app *app)
struct ast_json *stasis_app_to_json(const char *app_name) struct ast_json *stasis_app_to_json(const char *app_name)
{ {
RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup); struct stasis_app *app = find_app_by_name(app_name);
struct ast_json *json = stasis_app_object_to_json(app);
return stasis_app_object_to_json(app); ao2_cleanup(app);
return json;
} }
/*! /*!
@ -1705,13 +1741,16 @@ struct ast_json *stasis_app_to_json(const char *app_name)
static struct stasis_app_event_source *app_event_source_find(const char *uri) static struct stasis_app_event_source *app_event_source_find(const char *uri)
{ {
struct stasis_app_event_source *source; struct stasis_app_event_source *source;
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
AST_RWLIST_RDLOCK(&event_sources);
AST_LIST_TRAVERSE(&event_sources, source, next) { AST_LIST_TRAVERSE(&event_sources, source, next) {
if (ast_begins_with(uri, source->scheme)) { if (ast_begins_with(uri, source->scheme)) {
return source; break;
} }
} }
return NULL; AST_RWLIST_UNLOCK(&event_sources);
return source;
} }
/*! /*!
@ -1746,25 +1785,32 @@ static enum stasis_app_subscribe_res app_handle_subscriptions(
int event_sources_count, struct ast_json **json, int event_sources_count, struct ast_json **json,
app_subscription_handler handler) app_subscription_handler handler)
{ {
RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup); struct stasis_app *app = find_app_by_name(app_name);
int i; int i;
ast_assert(handler != NULL);
if (!app) { if (!app) {
return STASIS_ASR_APP_NOT_FOUND; return STASIS_ASR_APP_NOT_FOUND;
} }
for (i = 0; i < event_sources_count; ++i) { for (i = 0; i < event_sources_count; ++i) {
const char *uri = event_source_uris[i]; const char *uri = event_source_uris[i];
enum stasis_app_subscribe_res res = STASIS_ASR_INTERNAL_ERROR;
struct stasis_app_event_source *event_source; struct stasis_app_event_source *event_source;
enum stasis_app_subscribe_res res;
if (!(event_source = app_event_source_find(uri))) { event_source = app_event_source_find(uri);
if (!event_source) {
ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri); ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
ao2_ref(app, -1);
return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME; return STASIS_ASR_EVENT_SOURCE_BAD_SCHEME;
} }
if (handler && res = handler(app, uri, event_source);
((res = handler(app, uri, event_source)))) { if (res != STASIS_ASR_OK) {
ao2_ref(app, -1);
return res; return res;
} }
} }
@ -1774,13 +1820,15 @@ static enum stasis_app_subscribe_res app_handle_subscriptions(
*json = stasis_app_object_to_json(app); *json = stasis_app_object_to_json(app);
} }
ao2_ref(app, -1);
return STASIS_ASR_OK; return STASIS_ASR_OK;
} }
enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name, enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
struct ast_channel *chan) struct ast_channel *chan)
{ {
RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup); struct stasis_app *app = find_app_by_name(app_name);
int res; int res;
if (!app) { if (!app) {
@ -1790,6 +1838,8 @@ enum stasis_app_subscribe_res stasis_app_subscribe_channel(const char *app_name,
ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan)); ast_debug(3, "%s: Subscribing to %s\n", app_name, ast_channel_uniqueid(chan));
res = app_subscribe_channel(app, chan); res = app_subscribe_channel(app, chan);
ao2_ref(app, -1);
if (res != 0) { if (res != 0) {
ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n", ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n",
app_name, ast_channel_uniqueid(chan)); app_name, ast_channel_uniqueid(chan));
@ -1892,12 +1942,10 @@ enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
struct ast_json *json_variables) struct ast_json *json_variables)
{ {
RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup); RAII_VAR(struct stasis_app *, app, find_app_by_name(app_name), ao2_cleanup);
RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); struct ast_json *blob = NULL;
RAII_VAR(struct ast_multi_object_blob *, multi, NULL, ao2_cleanup); struct ast_multi_object_blob *multi;
RAII_VAR(void *, obj, NULL, ao2_cleanup); struct stasis_message *message;
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
enum stasis_app_user_event_res res = STASIS_APP_USER_INTERNAL_ERROR; enum stasis_app_user_event_res res = STASIS_APP_USER_INTERNAL_ERROR;
struct ast_json *json_value;
int have_channel = 0; int have_channel = 0;
int i; int i;
@ -1910,23 +1958,29 @@ enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
return res; return res;
} }
blob = json_variables; if (json_variables) {
if (!blob) { struct ast_json *json_value = ast_json_string_create(event_name);
blob = ast_json_pack("{}");
if (json_value && !ast_json_object_set(json_variables, "eventname", json_value)) {
blob = ast_json_ref(json_variables);
}
} else { } else {
ast_json_ref(blob); blob = ast_json_pack("{s: s}", "eventname", event_name);
} }
json_value = ast_json_string_create(event_name);
if (!json_value) { if (!blob) {
ast_log(LOG_ERROR, "unable to create json string\n"); ast_log(LOG_ERROR, "Failed to initialize blob\n");
return res;
}
if (ast_json_object_set(blob, "eventname", json_value)) {
ast_log(LOG_ERROR, "unable to set eventname to blob\n");
return res; return res;
} }
multi = ast_multi_object_blob_create(blob); multi = ast_multi_object_blob_create(blob);
ast_json_unref(blob);
if (!multi) {
ast_log(LOG_ERROR, "Failed to initialize multi\n");
return res;
}
for (i = 0; i < sources_count; ++i) { for (i = 0; i < sources_count; ++i) {
const char *uri = source_uris[i]; const char *uri = source_uris[i];
@ -1945,16 +1999,22 @@ enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL); snapshot = ast_endpoint_latest_snapshot(uri + 9, NULL);
} else { } else {
ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri); ast_log(LOG_WARNING, "Invalid scheme: %s\n", uri);
ao2_ref(multi, -1);
return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME; return STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME;
} }
if (!snapshot) { if (!snapshot) {
ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri); ast_log(LOG_ERROR, "Unable to get snapshot for %s\n", uri);
ao2_ref(multi, -1);
return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND; return STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND;
} }
ast_multi_object_blob_add(multi, type, snapshot); ast_multi_object_blob_add(multi, type, snapshot);
} }
message = stasis_message_create(ast_multi_user_event_type(), multi); message = stasis_message_create(ast_multi_user_event_type(), multi);
ao2_ref(multi, -1);
if (!message) { if (!message) {
ast_log(LOG_ERROR, "Unable to create stasis user event message\n"); ast_log(LOG_ERROR, "Unable to create stasis user event message\n");
return res; return res;
@ -1971,6 +2031,7 @@ enum stasis_app_user_event_res stasis_app_user_event(const char *app_name,
if (have_channel) { if (have_channel) {
stasis_publish(ast_manager_get_topic(), message); stasis_publish(ast_manager_get_topic(), message);
} }
ao2_ref(message, -1);
return STASIS_APP_USER_OK; return STASIS_APP_USER_OK;
} }
@ -2036,9 +2097,14 @@ static int channel_sanitizer(const struct ast_channel *chan)
/* \brief Sanitization callback for channel unique IDs */ /* \brief Sanitization callback for channel unique IDs */
static int channel_id_sanitizer(const char *id) static int channel_id_sanitizer(const char *id)
{ {
RAII_VAR(struct ast_channel_snapshot *, snapshot, ast_channel_snapshot_get_latest(id), ao2_cleanup); struct ast_channel_snapshot *snapshot;
int ret;
snapshot = ast_channel_snapshot_get_latest(id);
ret = channel_snapshot_sanitizer(snapshot);
ao2_cleanup(snapshot);
return channel_snapshot_sanitizer(snapshot); return ret;
} }
/* \brief Sanitization callbacks for communication to Stasis applications */ /* \brief Sanitization callbacks for communication to Stasis applications */

@ -112,20 +112,19 @@ static void forwards_unsubscribe(struct app_forwards *forwards)
static struct app_forwards *forwards_create(struct stasis_app *app, static struct app_forwards *forwards_create(struct stasis_app *app,
const char *id) const char *id)
{ {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); struct app_forwards *forwards;
if (!app || ast_strlen_zero(id)) { if (!app || ast_strlen_zero(id)) {
return NULL; return NULL;
} }
forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor); forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
if (!forwards) { if (!forwards) {
return NULL; return NULL;
} }
strcpy(forwards->id, id); strcpy(forwards->id, id); /* SAFE */
ao2_ref(forwards, +1);
return forwards; return forwards;
} }
@ -335,7 +334,7 @@ static void sub_default_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message) struct stasis_message *message)
{ {
struct stasis_app *app = data; struct stasis_app *app = data;
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ast_json *json;
if (stasis_subscription_final_message(sub, message)) { if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(app); ao2_cleanup(app);
@ -352,6 +351,7 @@ static void sub_default_handler(void *data, struct stasis_subscription *sub,
} }
app_send(app, json); app_send(app, json);
ast_json_unref(json);
} }
/*! \brief Typedef for callbacks that get called on channel snapshot updates */ /*! \brief Typedef for callbacks that get called on channel snapshot updates */
@ -554,11 +554,12 @@ static void sub_channel_update_handler(void *data,
stasis_message_timestamp(message); stasis_message_timestamp(message);
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); struct ast_json *msg;
msg = channel_monitors[i](old_snapshot, new_snapshot, tv); msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
if (msg) { if (msg) {
app_send(app, msg); app_send(app, msg);
ast_json_unref(msg);
} }
} }
@ -586,7 +587,7 @@ static struct ast_json *simple_endpoint_event(
static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt) static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
{ {
RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup); struct ast_endpoint_snapshot *snapshot;
struct ast_json *json_endpoint; struct ast_json *json_endpoint;
struct ast_json *message; struct ast_json *message;
struct stasis_app *app = pvt; struct stasis_app *app = pvt;
@ -610,6 +611,7 @@ static int message_received_handler(const char *endpoint_id, struct ast_json *js
} }
json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer()); json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
ao2_ref(snapshot, -1);
if (!json_endpoint) { if (!json_endpoint) {
return -1; return -1;
} }
@ -631,7 +633,6 @@ static void sub_endpoint_update_handler(void *data,
struct stasis_subscription *sub, struct stasis_subscription *sub,
struct stasis_message *message) struct stasis_message *message)
{ {
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct stasis_app *app = data; struct stasis_app *app = data;
struct stasis_cache_update *update; struct stasis_cache_update *update;
struct ast_endpoint_snapshot *new_snapshot; struct ast_endpoint_snapshot *new_snapshot;
@ -648,6 +649,8 @@ static void sub_endpoint_update_handler(void *data,
old_snapshot = stasis_message_data(update->old_snapshot); old_snapshot = stasis_message_data(update->old_snapshot);
if (new_snapshot) { if (new_snapshot) {
struct ast_json *json;
tv = stasis_message_timestamp(update->new_snapshot); tv = stasis_message_timestamp(update->new_snapshot);
json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv); json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
@ -656,6 +659,7 @@ static void sub_endpoint_update_handler(void *data,
} }
app_send(app, json); app_send(app, json);
ast_json_unref(json);
} }
if (!new_snapshot && old_snapshot) { if (!new_snapshot && old_snapshot) {
@ -683,7 +687,7 @@ static void sub_bridge_update_handler(void *data,
struct stasis_subscription *sub, struct stasis_subscription *sub,
struct stasis_message *message) struct stasis_message *message)
{ {
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ast_json *json = NULL;
struct stasis_app *app = data; struct stasis_app *app = data;
struct stasis_cache_update *update; struct stasis_cache_update *update;
struct ast_bridge_snapshot *new_snapshot; struct ast_bridge_snapshot *new_snapshot;
@ -717,6 +721,7 @@ static void sub_bridge_update_handler(void *data,
if (json) { if (json) {
app_send(app, json); app_send(app, json);
ast_json_unref(json);
} }
if (!new_snapshot && old_snapshot) { if (!new_snapshot && old_snapshot) {
@ -1019,7 +1024,7 @@ void app_send(struct stasis_app *app, struct ast_json *message)
{ {
stasis_app_cb handler; stasis_app_cb handler;
char eid[20]; char eid[20];
RAII_VAR(void *, data, NULL, ao2_cleanup); void *data;
if (ast_json_object_set(message, "asterisk_id", ast_json_string_create( if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) { ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
@ -1028,37 +1033,36 @@ void app_send(struct stasis_app *app, struct ast_json *message)
} }
/* Copy off mutable state with lock held */ /* Copy off mutable state with lock held */
{ ao2_lock(app);
SCOPED_AO2LOCK(lock, app); handler = app->handler;
handler = app->handler; data = ao2_bump(app->data);
if (app->data) { ao2_unlock(app);
ao2_ref(app->data, +1); /* Name is immutable; no need to copy */
data = app->data;
} if (handler) {
/* Name is immutable; no need to copy */ handler(data, app->name, message);
} } else {
if (!handler) {
ast_verb(3, ast_verb(3,
"Inactive Stasis app '%s' missed message\n", app->name); "Inactive Stasis app '%s' missed message\n", app->name);
return;
} }
ao2_cleanup(data);
handler(data, app->name, message);
} }
void app_deactivate(struct stasis_app *app) void app_deactivate(struct stasis_app *app)
{ {
SCOPED_AO2LOCK(lock, app); ao2_lock(app);
ast_verb(1, "Deactivating Stasis app '%s'\n", app->name); ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
app->handler = NULL; app->handler = NULL;
ao2_cleanup(app->data); ao2_cleanup(app->data);
app->data = NULL; app->data = NULL;
ao2_unlock(app);
} }
void app_shutdown(struct stasis_app *app) void app_shutdown(struct stasis_app *app)
{ {
SCOPED_AO2LOCK(lock, app); ao2_lock(app);
ast_assert(app_is_finished(app)); ast_assert(app_is_finished(app));
@ -1068,27 +1072,37 @@ void app_shutdown(struct stasis_app *app)
app->bridge_router = NULL; app->bridge_router = NULL;
stasis_message_router_unsubscribe(app->endpoint_router); stasis_message_router_unsubscribe(app->endpoint_router);
app->endpoint_router = NULL; app->endpoint_router = NULL;
ao2_unlock(app);
} }
int app_is_active(struct stasis_app *app) int app_is_active(struct stasis_app *app)
{ {
SCOPED_AO2LOCK(lock, app); int ret;
return app->handler != NULL;
ao2_lock(app);
ret = app->handler != NULL;
ao2_unlock(app);
return ret;
} }
int app_is_finished(struct stasis_app *app) int app_is_finished(struct stasis_app *app)
{ {
SCOPED_AO2LOCK(lock, app); int ret;
return app->handler == NULL && ao2_container_count(app->forwards) == 0; ao2_lock(app);
ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
ao2_unlock(app);
return ret;
} }
void app_update(struct stasis_app *app, stasis_app_cb handler, void *data) void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
{ {
SCOPED_AO2LOCK(lock, app); ao2_lock(app);
if (app->handler && app->data) { if (app->handler && app->data) {
RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); struct ast_json *msg;
ast_verb(1, "Replacing Stasis app '%s'\n", app->name); ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
@ -1097,17 +1111,15 @@ void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
"application", app->name); "application", app->name);
if (msg) { if (msg) {
app_send(app, msg); app_send(app, msg);
ast_json_unref(msg);
} }
} else { } else {
ast_verb(1, "Activating Stasis app '%s'\n", app->name); ast_verb(1, "Activating Stasis app '%s'\n", app->name);
} }
app->handler = handler; app->handler = handler;
ao2_cleanup(app->data); ao2_replace(app->data, data);
if (data) { ao2_unlock(app);
ao2_ref(data, +1);
}
app->data = data;
} }
const char *stasis_app_name(const struct stasis_app *app) const char *stasis_app_name(const struct stasis_app *app)
@ -1184,68 +1196,72 @@ void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
struct ast_json *app_to_json(const struct stasis_app *app) struct ast_json *app_to_json(const struct stasis_app *app)
{ {
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ast_json *json;
struct ast_json *channels; struct ast_json *channels;
struct ast_json *bridges; struct ast_json *bridges;
struct ast_json *endpoints; struct ast_json *endpoints;
struct ao2_iterator i; struct ao2_iterator i;
void *obj; struct app_forwards *forwards;
json = ast_json_pack("{s: s, s: [], s: [], s: []}", json = ast_json_pack("{s: s, s: [], s: [], s: []}",
"name", app->name, "name", app->name,
"channel_ids", "bridge_ids", "endpoint_ids"); "channel_ids", "bridge_ids", "endpoint_ids");
if (!json) {
return NULL;
}
channels = ast_json_object_get(json, "channel_ids"); channels = ast_json_object_get(json, "channel_ids");
bridges = ast_json_object_get(json, "bridge_ids"); bridges = ast_json_object_get(json, "bridge_ids");
endpoints = ast_json_object_get(json, "endpoint_ids"); endpoints = ast_json_object_get(json, "endpoint_ids");
i = ao2_iterator_init(app->forwards, 0); i = ao2_iterator_init(app->forwards, 0);
while ((obj = ao2_iterator_next(&i))) { while ((forwards = ao2_iterator_next(&i))) {
RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup); struct ast_json *array = NULL;
RAII_VAR(struct ast_json *, id, NULL, ast_json_unref); int append_res;
int append_res = -1;
id = ast_json_string_create(forwards->id);
switch (forwards->forward_type) { switch (forwards->forward_type) {
case FORWARD_CHANNEL: case FORWARD_CHANNEL:
append_res = ast_json_array_append(channels, array = channels;
ast_json_ref(id));
break; break;
case FORWARD_BRIDGE: case FORWARD_BRIDGE:
append_res = ast_json_array_append(bridges, array = bridges;
ast_json_ref(id));
break; break;
case FORWARD_ENDPOINT: case FORWARD_ENDPOINT:
append_res = ast_json_array_append(endpoints, array = endpoints;
ast_json_ref(id));
break; break;
} }
/* If forward_type value is unexpected this will safely return an error. */
append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
ao2_ref(forwards, -1);
if (append_res != 0) { if (append_res != 0) {
ast_log(LOG_ERROR, "Error building response\n"); ast_log(LOG_ERROR, "Error building response\n");
ao2_iterator_destroy(&i); ao2_iterator_destroy(&i);
ast_json_unref(json);
return NULL; return NULL;
} }
} }
ao2_iterator_destroy(&i); ao2_iterator_destroy(&i);
return ast_json_ref(json); return json;
} }
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan) int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{ {
struct app_forwards *forwards; struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
int res;
if (!app) { if (!app) {
return -1; return -1;
} }
ao2_lock(app->forwards);
/* If subscribed to all, don't subscribe again */ /* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) { if (forwards) {
ao2_unlock(app->forwards);
ao2_ref(forwards, -1); ao2_ref(forwards, -1);
return 0; return 0;
} }
@ -1253,16 +1269,21 @@ int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK); OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) { if (!forwards) {
int res;
/* Forwards not found, create one */ /* Forwards not found, create one */
forwards = forwards_create_channel(app, chan); forwards = forwards_create_channel(app, chan);
if (!forwards) { if (!forwards) {
ao2_unlock(app->forwards);
return -1; return -1;
} }
res = ao2_link_flags(app->forwards, forwards, res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
OBJ_NOLOCK);
if (!res) { if (!res) {
ao2_unlock(app->forwards);
ao2_ref(forwards, -1); ao2_ref(forwards, -1);
return -1; return -1;
} }
} }
@ -1273,7 +1294,9 @@ int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
forwards->interested, forwards->interested,
app->name); app->name);
ao2_unlock(app->forwards);
ao2_ref(forwards, -1); ao2_ref(forwards, -1);
return 0; return 0;
} }
@ -1284,8 +1307,7 @@ static int subscribe_channel(struct stasis_app *app, void *obj)
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate) static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
{ {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
if (!id) { if (!id) {
if (!strcmp(kind, "bridge")) { if (!strcmp(kind, "bridge")) {
@ -1300,8 +1322,10 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
} }
} }
ao2_lock(app->forwards);
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK); forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) { if (!forwards) {
ao2_unlock(app->forwards);
ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id); ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
return -1; return -1;
} }
@ -1320,6 +1344,8 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
messaging_app_unsubscribe_endpoint(app->name, id); messaging_app_unsubscribe_endpoint(app->name, id);
} }
} }
ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
return 0; return 0;
} }
@ -1344,12 +1370,14 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id) int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{ {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); struct app_forwards *forwards;
if (ast_strlen_zero(channel_id)) { if (ast_strlen_zero(channel_id)) {
channel_id = CHANNEL_ALL; channel_id = CHANNEL_ALL;
} }
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY); forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
ao2_cleanup(forwards);
return forwards != NULL; return forwards != NULL;
} }
@ -1369,28 +1397,42 @@ struct stasis_app_event_source channel_event_source = {
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge) int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{ {
struct app_forwards *forwards; struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
if (!app) { if (!app) {
return -1; return -1;
} }
ao2_lock(app->forwards);
/* If subscribed to all, don't subscribe again */ /* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) { if (forwards) {
ao2_unlock(app->forwards);
ao2_ref(forwards, -1); ao2_ref(forwards, -1);
return 0; return 0;
} }
forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL, forwards = ao2_find(app->forwards,
bridge ? bridge->uniqueid : BRIDGE_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK); OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) { if (!forwards) {
int res;
/* Forwards not found, create one */ /* Forwards not found, create one */
forwards = forwards_create_bridge(app, bridge); forwards = forwards_create_bridge(app, bridge);
if (!forwards) { if (!forwards) {
ao2_unlock(app->forwards);
return -1;
}
res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
if (!res) {
ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
return -1; return -1;
} }
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
} }
++forwards->interested; ++forwards->interested;
@ -1399,7 +1441,9 @@ int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
forwards->interested, forwards->interested,
app->name); app->name);
ao2_unlock(app->forwards);
ao2_ref(forwards, -1); ao2_ref(forwards, -1);
return 0; return 0;
} }
@ -1466,16 +1510,18 @@ struct stasis_app_event_source bridge_event_source = {
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint) int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{ {
struct app_forwards *forwards; struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
if (!app) { if (!app) {
return -1; return -1;
} }
ao2_lock(app->forwards);
/* If subscribed to all, don't subscribe again */ /* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) { if (forwards) {
ao2_unlock(app->forwards);
ao2_ref(forwards, -1); ao2_ref(forwards, -1);
return 0; return 0;
} }
@ -1483,12 +1529,23 @@ int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint
endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK); OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) { if (!forwards) {
int res;
/* Forwards not found, create one */ /* Forwards not found, create one */
forwards = forwards_create_endpoint(app, endpoint); forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) { if (!forwards) {
ao2_unlock(app->forwards);
return -1;
}
res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
if (!res) {
ao2_unlock(app->forwards);
ao2_ref(forwards, -1);
return -1; return -1;
} }
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
/* Subscribe for messages */ /* Subscribe for messages */
messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app); messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
@ -1500,7 +1557,9 @@ int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint
forwards->interested, forwards->interested,
app->name); app->name);
ao2_unlock(app->forwards);
ao2_ref(forwards, -1); ao2_ref(forwards, -1);
return 0; return 0;
} }
@ -1520,12 +1579,14 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id) int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{ {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); struct app_forwards *forwards;
if (ast_strlen_zero(endpoint_id)) { if (ast_strlen_zero(endpoint_id)) {
endpoint_id = ENDPOINT_ALL; endpoint_id = ENDPOINT_ALL;
} }
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY); forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
ao2_cleanup(forwards);
return forwards != NULL; return forwards != NULL;
} }

@ -76,21 +76,26 @@ struct stasis_app_command *command_create(
void command_complete(struct stasis_app_command *command, int retval) void command_complete(struct stasis_app_command *command, int retval)
{ {
SCOPED_MUTEX(lock, &command->lock); ast_mutex_lock(&command->lock);
command->is_done = 1; command->is_done = 1;
command->retval = retval; command->retval = retval;
ast_cond_signal(&command->condition); ast_cond_signal(&command->condition);
ast_mutex_unlock(&command->lock);
} }
int command_join(struct stasis_app_command *command) int command_join(struct stasis_app_command *command)
{ {
SCOPED_MUTEX(lock, &command->lock); int ret;
ast_mutex_lock(&command->lock);
while (!command->is_done) { while (!command->is_done) {
ast_cond_wait(&command->condition, &command->lock); ast_cond_wait(&command->condition, &command->lock);
} }
return command->retval; ret = command->retval;
ast_mutex_unlock(&command->lock);
return ret;
} }
void command_invoke(struct stasis_app_command *command, void command_invoke(struct stasis_app_command *command,

@ -148,8 +148,9 @@ static void app_control_register_rule(
struct stasis_app_control *control, struct stasis_app_control *control,
struct app_control_rules *list, struct stasis_app_control_rule *obj) struct app_control_rules *list, struct stasis_app_control_rule *obj)
{ {
SCOPED_AO2LOCK(lock, control->command_queue); ao2_lock(control->command_queue);
AST_LIST_INSERT_TAIL(list, obj, next); AST_LIST_INSERT_TAIL(list, obj, next);
ao2_unlock(control->command_queue);
} }
static void app_control_unregister_rule( static void app_control_unregister_rule(
@ -157,7 +158,8 @@ static void app_control_unregister_rule(
struct app_control_rules *list, struct stasis_app_control_rule *obj) struct app_control_rules *list, struct stasis_app_control_rule *obj)
{ {
struct stasis_app_control_rule *rule; struct stasis_app_control_rule *rule;
SCOPED_AO2LOCK(lock, control->command_queue);
ao2_lock(control->command_queue);
AST_RWLIST_TRAVERSE_SAFE_BEGIN(list, rule, next) { AST_RWLIST_TRAVERSE_SAFE_BEGIN(list, rule, next) {
if (rule == obj) { if (rule == obj) {
AST_RWLIST_REMOVE_CURRENT(next); AST_RWLIST_REMOVE_CURRENT(next);
@ -165,6 +167,7 @@ static void app_control_unregister_rule(
} }
} }
AST_RWLIST_TRAVERSE_SAFE_END; AST_RWLIST_TRAVERSE_SAFE_END;
ao2_unlock(control->command_queue);
} }
/*! /*!
@ -508,9 +511,10 @@ static int app_control_mute(struct stasis_app_control *control,
struct ast_channel *chan, void *data) struct ast_channel *chan, void *data)
{ {
struct stasis_app_control_mute_data *mute_data = data; struct stasis_app_control_mute_data *mute_data = data;
SCOPED_CHANNELLOCK(lockvar, chan);
ast_channel_lock(chan);
ast_channel_suppress(control->channel, mute_data->direction, mute_data->frametype); ast_channel_suppress(control->channel, mute_data->direction, mute_data->frametype);
ast_channel_unlock(chan);
return 0; return 0;
} }
@ -535,9 +539,10 @@ static int app_control_unmute(struct stasis_app_control *control,
struct ast_channel *chan, void *data) struct ast_channel *chan, void *data)
{ {
struct stasis_app_control_mute_data *mute_data = data; struct stasis_app_control_mute_data *mute_data = data;
SCOPED_CHANNELLOCK(lockvar, chan);
ast_channel_lock(chan);
ast_channel_unsuppress(control->channel, mute_data->direction, mute_data->frametype); ast_channel_unsuppress(control->channel, mute_data->direction, mute_data->frametype);
ast_channel_unlock(chan);
return 0; return 0;
} }
@ -746,7 +751,7 @@ void stasis_app_control_silence_stop(struct stasis_app_control *control)
struct ast_channel_snapshot *stasis_app_control_get_snapshot( struct ast_channel_snapshot *stasis_app_control_get_snapshot(
const struct stasis_app_control *control) const struct stasis_app_control *control)
{ {
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct stasis_message *msg;
struct ast_channel_snapshot *snapshot; struct ast_channel_snapshot *snapshot;
msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
@ -759,6 +764,8 @@ struct ast_channel_snapshot *stasis_app_control_get_snapshot(
ast_assert(snapshot != NULL); ast_assert(snapshot != NULL);
ao2_ref(snapshot, +1); ao2_ref(snapshot, +1);
ao2_ref(msg, -1);
return snapshot; return snapshot;
} }
@ -767,7 +774,8 @@ static int app_send_command_on_condition(struct stasis_app_control *control,
command_data_destructor_fn data_destructor, command_data_destructor_fn data_destructor,
app_command_can_exec_cb can_exec_fn) app_command_can_exec_cb can_exec_fn)
{ {
RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup); int ret;
struct stasis_app_command *command;
if (control == NULL || control->is_done) { if (control == NULL || control->is_done) {
/* If exec_command_on_condition fails, it calls the data_destructor. /* If exec_command_on_condition fails, it calls the data_destructor.
@ -787,7 +795,10 @@ static int app_send_command_on_condition(struct stasis_app_control *control,
return -1; return -1;
} }
return command_join(command); ret = command_join(command);
ao2_ref(command, -1);
return ret;
} }
int stasis_app_send_command(struct stasis_app_control *control, int stasis_app_send_command(struct stasis_app_control *control,
@ -800,7 +811,7 @@ int stasis_app_send_command_async(struct stasis_app_control *control,
stasis_app_command_cb command_fn, void *data, stasis_app_command_cb command_fn, void *data,
command_data_destructor_fn data_destructor) command_data_destructor_fn data_destructor)
{ {
RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup); struct stasis_app_command *command;
if (control == NULL || control->is_done) { if (control == NULL || control->is_done) {
/* If exec_command fails, it calls the data_destructor. In order to /* If exec_command fails, it calls the data_destructor. In order to
@ -818,18 +829,24 @@ int stasis_app_send_command_async(struct stasis_app_control *control,
if (!command) { if (!command) {
return -1; return -1;
} }
ao2_ref(command, -1);
return 0; return 0;
} }
struct ast_bridge *stasis_app_get_bridge(struct stasis_app_control *control) struct ast_bridge *stasis_app_get_bridge(struct stasis_app_control *control)
{ {
struct ast_bridge *ret;
if (!control) { if (!control) {
return NULL; return NULL;
} else {
SCOPED_AO2LOCK(lock, control);
return control->bridge;
} }
ao2_lock(control);
ret = control->bridge;
ao2_unlock(control);
return ret;
} }
/*! /*!
@ -970,16 +987,16 @@ static int depart_channel(struct stasis_app_control *control, struct ast_channel
static int bridge_channel_depart(struct stasis_app_control *control, static int bridge_channel_depart(struct stasis_app_control *control,
struct ast_channel *chan, void *data) struct ast_channel *chan, void *data)
{ {
struct ast_bridge_channel *bridge_channel = data; struct ast_bridge_channel *bridge_channel;
{ ast_channel_lock(chan);
SCOPED_CHANNELLOCK(lock, chan); bridge_channel = ast_channel_internal_bridge_channel(chan);
ast_channel_unlock(chan);
if (bridge_channel != ast_channel_internal_bridge_channel(chan)) { if (bridge_channel != data) {
ast_debug(3, "%s: Channel is no longer in departable state\n", ast_debug(3, "%s: Channel is no longer in departable state\n",
ast_channel_uniqueid(chan)); ast_channel_uniqueid(chan));
return -1; return -1;
}
} }
ast_debug(3, "%s: Channel departing bridge\n", ast_debug(3, "%s: Channel departing bridge\n",
@ -994,9 +1011,9 @@ static void internal_bridge_after_cb(struct ast_channel *chan, void *data,
enum ast_bridge_after_cb_reason reason) enum ast_bridge_after_cb_reason reason)
{ {
struct stasis_app_control *control = data; struct stasis_app_control *control = data;
SCOPED_AO2LOCK(lock, control);
struct ast_bridge_channel *bridge_channel; struct ast_bridge_channel *bridge_channel;
ao2_lock(control);
ast_debug(3, "%s, %s: %s\n", ast_debug(3, "%s, %s: %s\n",
ast_channel_uniqueid(chan), control->bridge ? control->bridge->uniqueid : "unknown", ast_channel_uniqueid(chan), control->bridge ? control->bridge->uniqueid : "unknown",
ast_bridge_after_cb_reason_string(reason)); ast_bridge_after_cb_reason_string(reason));
@ -1042,6 +1059,7 @@ static void internal_bridge_after_cb(struct ast_channel *chan, void *data,
ast_softhangup_nolock(chan, hangup_flag); ast_softhangup_nolock(chan, hangup_flag);
ast_channel_unlock(chan); ast_channel_unlock(chan);
} }
ao2_unlock(control);
} }
static void bridge_after_cb(struct ast_channel *chan, void *data) static void bridge_after_cb(struct ast_channel *chan, void *data)

@ -250,7 +250,7 @@ static int bridge_stasis_moving(struct ast_bridge_channel *bridge_channel, void
{ {
if (src->v_table == &bridge_stasis_v_table && if (src->v_table == &bridge_stasis_v_table &&
dst->v_table != &bridge_stasis_v_table) { dst->v_table != &bridge_stasis_v_table) {
RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup); struct stasis_app_control *control;
struct ast_channel *chan; struct ast_channel *chan;
chan = bridge_channel->chan; chan = bridge_channel->chan;
@ -263,6 +263,7 @@ static int bridge_stasis_moving(struct ast_bridge_channel *bridge_channel, void
stasis_app_channel_set_stasis_end_published(chan); stasis_app_channel_set_stasis_end_published(chan);
app_send_end_msg(control_app(control), chan); app_send_end_msg(control_app(control), chan);
ao2_ref(control, -1);
} }
return -1; return -1;

Loading…
Cancel
Save