Merge "ARI: Add the ability to subscribe to all events"

changes/04/1304/1
Matt Jordan 10 years ago committed by Gerrit Code Review
commit 17dd4a476c

@ -91,6 +91,21 @@ struct ao2_container *stasis_app_get_all(void);
*/
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data);
/*!
* \brief Register a new Stasis application that receives all Asterisk events.
*
* If an application is already registered with the given name, the old
* application is sent a 'replaced' message and unregistered.
*
* \param app_name Name of this application.
* \param handler Callback for application messages.
* \param data Data blob to pass to the callback. Must be AO2 managed.
*
* \return 0 for success
* \return -1 for error.
*/
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data);
/*!
* \brief Unregister a Stasis application.
* \param app_name Name of the application to unregister.

@ -280,7 +280,9 @@ static void event_session_cleanup(struct event_session *session)
}
event_session_shutdown(session);
ao2_unlink(event_session_registry, session);
if (event_session_registry) {
ao2_unlink(event_session_registry, session);
}
}
/*!
@ -367,6 +369,7 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
struct ast_ari_events_event_websocket_args *args, const char *session_id)
{
RAII_VAR(struct event_session *, session, NULL, ao2_cleanup);
int (* register_handler)(const char *, stasis_app_cb handler, void *data);
size_t size, i;
/* The request must have at least one [app] parameter */
@ -399,6 +402,12 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
}
/* Register the apps with Stasis */
if (args->subscribe_all) {
register_handler = &stasis_app_register_all;
} else {
register_handler = &stasis_app_register;
}
for (i = 0; i < args->app_count; ++i) {
const char *app = args->app[i];
@ -411,10 +420,10 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
if (stasis_app_register(app, stasis_app_message_handler, session)) {
if (register_handler(app, stasis_app_message_handler, session)) {
ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
return event_session_allocation_error_handler(
session, ERROR_TYPE_STASIS_REGISTRATION, ser);
session, ERROR_TYPE_STASIS_REGISTRATION, ser);
}
}
@ -426,8 +435,17 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser,
return 0;
}
static int event_session_shutdown_cb(void *session, void *arg, int flags)
{
event_session_cleanup(session);
return 0;
}
void ast_ari_websocket_events_event_websocket_dtor(void)
{
ao2_callback(event_session_registry, OBJ_MULTIPLE | OBJ_NODATA, event_session_shutdown_cb, NULL);
ao2_cleanup(event_session_registry);
event_session_registry = NULL;
}
@ -462,7 +480,8 @@ void ast_ari_websocket_events_event_websocket_established(
struct ast_ari_websocket_session *ws_session, struct ast_variable *headers,
struct ast_ari_events_event_websocket_args *args)
{
RAII_VAR(struct event_session *, session, NULL, event_session_cleanup);
struct event_session *session;
struct ast_json *msg;
const char *session_id;
@ -474,7 +493,6 @@ void ast_ari_websocket_events_event_websocket_established(
/* Find the event_session and update its websocket */
session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY);
if (session) {
ao2_unlink(event_session_registry, session);
event_session_update_websocket(session, ws_session);
@ -487,6 +505,9 @@ void ast_ari_websocket_events_event_websocket_established(
while ((msg = ast_ari_websocket_session_read(ws_session))) {
ast_json_unref(msg);
}
event_session_cleanup(session);
ao2_ref(session, -1);
}
void ast_ari_events_user_event(struct ast_variable *headers,

@ -47,6 +47,8 @@ struct ast_ari_events_event_websocket_args {
size_t app_count;
/*! Parsing context for app. */
char *app_parse;
/*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */
int subscribe_all;
};
/*!

@ -111,6 +111,9 @@ static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_sess
args.app[j] = (vals[j]);
}
} else
if (strcmp(i->name, "subscribeAll") == 0) {
args.subscribe_all = ast_true(i->value);
} else
{}
}
@ -209,6 +212,9 @@ static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocke
args.app[j] = (vals[j]);
}
} else
if (strcmp(i->name, "subscribeAll") == 0) {
args.subscribe_all = ast_true(i->value);
} else
{}
}

@ -109,6 +109,11 @@ struct ao2_container *app_bridges_moh;
struct ao2_container *app_bridges_playback;
/*!
* \internal \brief List of registered event sources.
*/
AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
static struct ast_json *stasis_end_to_json(struct stasis_message *message,
const struct stasis_message_sanitizer *sanitize)
{
@ -1469,7 +1474,7 @@ struct ao2_container *stasis_app_get_all(void)
return ao2_bump(apps);
}
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@ -1482,8 +1487,20 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
if (app) {
app_update(app, handler, data);
} else {
app = app_create(app_name, handler, data);
app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL);
if (app) {
if (all_events) {
struct stasis_app_event_source *source;
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
AST_LIST_TRAVERSE(&event_sources, source, next) {
if (!source->subscribe) {
continue;
}
source->subscribe(app, NULL);
}
}
ao2_link_flags(apps_registry, app, OBJ_NOLOCK);
} else {
ao2_unlock(apps_registry);
@ -1499,6 +1516,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
return 0;
}
int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
{
return __stasis_app_register(app_name, handler, data, 0);
}
int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data)
{
return __stasis_app_register(app_name, handler, data, 1);
}
void stasis_app_unregister(const char *app_name)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
@ -1526,11 +1553,6 @@ void stasis_app_unregister(const char *app_name)
cleanup();
}
/*!
* \internal \brief List of registered event sources.
*/
AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source);
void stasis_app_register_event_source(struct stasis_app_event_source *obj)
{
SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
@ -1727,8 +1749,8 @@ static enum stasis_app_subscribe_res app_subscribe(
ast_debug(3, "%s: Checking %s\n", app_name, uri);
if (!event_source->find ||
(!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) {
if (!ast_strlen_zero(uri + strlen(event_source->scheme)) &&
(!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) {
ast_log(LOG_WARNING, "Event source not found: %s\n", uri);
return STASIS_ASR_EVENT_SOURCE_NOT_FOUND;
}
@ -2062,6 +2084,7 @@ static int load_module(void)
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support",
.load_pri = AST_MODPRI_APP_DEPEND,
.support_level = AST_MODULE_SUPPORT_CORE,
.load = load_module,
.unload = unload_module,

@ -38,6 +38,10 @@ ASTERISK_REGISTER_FILE()
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"
#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
struct stasis_app {
@ -47,12 +51,16 @@ struct stasis_app {
struct stasis_message_router *router;
/*! Router for handling messages to the bridge all \a topic. */
struct stasis_message_router *bridge_router;
/*! Optional router for handling endpoint messages in 'all' subscriptions */
struct stasis_message_router *endpoint_router;
/*! Container of the channel forwards to this app's topic. */
struct ao2_container *forwards;
/*! Callback function for this application. */
stasis_app_cb handler;
/*! Opaque data to hand to callback function. */
void *data;
/*! Subscription model for the application */
enum stasis_app_subscription_model subscription_model;
/*! Name of the Stasis application */
char name[];
};
@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app,
static struct app_forwards *forwards_create_channel(struct stasis_app *app,
struct ast_channel *chan)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
struct app_forwards *forwards;
if (!app || !chan) {
if (!app) {
return NULL;
}
forwards = forwards_create(app, ast_channel_uniqueid(chan));
forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_CHANNEL;
forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
app->topic);
if (!forwards->topic_forward) {
return NULL;
if (chan) {
forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
app->topic);
}
forwards->topic_cached_forward = stasis_forward_all(
ast_channel_topic_cached(chan), app->topic);
if (!forwards->topic_cached_forward) {
chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
app->topic);
if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
}
ao2_ref(forwards, +1);
return forwards;
}
@ -156,69 +163,100 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app,
static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
struct ast_bridge *bridge)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
struct app_forwards *forwards;
if (!app || !bridge) {
if (!app) {
return NULL;
}
forwards = forwards_create(app, bridge->uniqueid);
forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_BRIDGE;
forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
app->topic);
if (!forwards->topic_forward) {
return NULL;
if (bridge) {
forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
app->topic);
}
forwards->topic_cached_forward = stasis_forward_all(
ast_bridge_topic_cached(bridge), app->topic);
if (!forwards->topic_cached_forward) {
bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
app->topic);
if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
}
ao2_ref(forwards, +1);
return forwards;
}
static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
stasis_publish(app->topic, message);
}
/*! Forward a endpoint's topics to an app */
static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
struct ast_endpoint *endpoint)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
struct app_forwards *forwards;
int ret = 0;
if (!app || !endpoint) {
if (!app) {
return NULL;
}
forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
if (!forwards) {
return NULL;
}
forwards->forward_type = FORWARD_ENDPOINT;
forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
app->topic);
if (!forwards->topic_forward) {
return NULL;
}
if (endpoint) {
forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
app->topic);
forwards->topic_cached_forward = stasis_forward_all(
ast_endpoint_topic_cached(endpoint), app->topic);
if (!forwards->topic_forward || !forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
}
} else {
/* Since endpoint subscriptions also subscribe to channels, in the case
* of all endpoint subscriptions, we only want messages for the endpoints.
* As such, we route those particular messages and then re-publish them
* on the app's topic.
*/
ast_assert(app->endpoint_router == NULL);
app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
if (!app->endpoint_router) {
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1);
return NULL;
}
forwards->topic_cached_forward = stasis_forward_all(
ast_endpoint_topic_cached(endpoint), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
ret |= stasis_message_router_add(app->endpoint_router,
ast_endpoint_state_type(), endpoint_state_cb, app);
ret |= stasis_message_router_add(app->endpoint_router,
ast_endpoint_contact_state_type(), endpoint_state_cb, app);
if (ret) {
ao2_ref(app->endpoint_router, -1);
app->endpoint_router = NULL;
ao2_ref(forwards, -1);
return NULL;
}
}
ao2_ref(forwards, +1);
return forwards;
}
@ -260,6 +298,7 @@ static void app_dtor(void *obj)
ast_assert(app->router == NULL);
ast_assert(app->bridge_router == NULL);
ast_assert(app->endpoint_router == NULL);
ao2_cleanup(app->topic);
app->topic = NULL;
@ -793,7 +832,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub,
}
}
struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data)
struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
{
RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
size_t size;
@ -806,10 +845,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
size = sizeof(*app) + strlen(name) + 1;
app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
if (!app) {
return NULL;
}
app->subscription_model = subscription_model;
app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
@ -877,7 +916,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
return app;
}
struct stasis_topic *ast_app_get_topic(struct stasis_app *app) {
struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
{
return app->topic;
}
@ -930,6 +970,8 @@ void app_shutdown(struct stasis_app *app)
app->router = NULL;
stasis_message_router_unsubscribe(app->bridge_router);
app->bridge_router = NULL;
stasis_message_router_unsubscribe(app->endpoint_router);
app->endpoint_router = NULL;
}
int app_is_active(struct stasis_app *app)
@ -1029,34 +1071,47 @@ struct ast_json *app_to_json(const struct stasis_app *app)
int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
int res;
if (!app || !chan) {
if (!app) {
return -1;
} else {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
}
forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_channel(app, chan);
if (!forwards) {
return -1;
}
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 0;
}
res = ao2_link_flags(app->forwards, forwards,
OBJ_NOLOCK);
if (!res) {
return -1;
}
forwards = ao2_find(app->forwards,
chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_channel(app, chan);
if (!forwards) {
return -1;
}
++forwards->interested;
ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
return 0;
res = ao2_link_flags(app->forwards, forwards,
OBJ_NOLOCK);
if (!res) {
ao2_ref(forwards, -1);
return -1;
}
}
++forwards->interested;
ast_debug(3, "Channel '%s' is %d interested in %s\n",
chan ? ast_channel_uniqueid(chan) : "ALL",
forwards->interested,
app->name);
ao2_ref(forwards, -1);
return 0;
}
static int subscribe_channel(struct stasis_app *app, void *obj)
@ -1069,6 +1124,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
if (!id) {
if (!strcmp(kind, "bridge")) {
id = BRIDGE_ALL;
} else if (!strcmp(kind, "channel")) {
id = CHANNEL_ALL;
} else if (!strcmp(kind, "endpoint")) {
id = ENDPOINT_ALL;
} else {
ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
return -1;
}
}
forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
@ -1095,16 +1163,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id,
int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
{
if (!app || !chan) {
if (!app) {
return -1;
}
return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
}
int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
{
if (!app || !channel_id) {
if (!app) {
return -1;
}
@ -1114,6 +1182,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
if (ast_strlen_zero(channel_id)) {
channel_id = CHANNEL_ALL;
}
forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}
@ -1133,28 +1205,39 @@ struct stasis_app_event_source channel_event_source = {
int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
if (!app || !bridge) {
struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
if (!app) {
return -1;
} else {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
}
forwards = ao2_find(app->forwards, bridge->uniqueid,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 0;
}
forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_bridge(app, bridge);
if (!forwards) {
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
return -1;
}
++forwards->interested;
ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
return 0;
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
}
++forwards->interested;
ast_debug(3, "Bridge '%s' is %d interested in %s\n",
bridge ? bridge->uniqueid : "ALL",
forwards->interested,
app->name);
ao2_ref(forwards, -1);
return 0;
}
static int subscribe_bridge(struct stasis_app *app, void *obj)
@ -1164,16 +1247,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj)
int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
{
if (!app || !bridge) {
if (!app) {
return -1;
}
return app_unsubscribe_bridge_id(app, bridge->uniqueid);
return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
}
int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
{
if (!app || !bridge_id) {
if (!app) {
return -1;
}
@ -1182,9 +1265,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
return forwards != NULL;
struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 1;
}
if (ast_strlen_zero(bridge_id)) {
bridge_id = BRIDGE_ALL;
}
forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 1;
}
return 0;
}
static void *bridge_find(const struct stasis_app *app, const char *id)
@ -1202,31 +1302,43 @@ struct stasis_app_event_source bridge_event_source = {
int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
{
if (!app || !endpoint) {
struct app_forwards *forwards;
SCOPED_AO2LOCK(lock, app->forwards);
if (!app) {
return -1;
} else {
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, app->forwards);
}
forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
OBJ_SEARCH_KEY | OBJ_NOLOCK);
/* If subscribed to all, don't subscribe again */
forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (forwards) {
ao2_ref(forwards, -1);
return 0;
}
forwards = ao2_find(app->forwards,
endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
/* Forwards not found, create one */
forwards = forwards_create_endpoint(app, endpoint);
if (!forwards) {
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
/* Subscribe for messages */
messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
return -1;
}
ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
++forwards->interested;
ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
return 0;
/* Subscribe for messages */
messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
}
++forwards->interested;
ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
forwards->interested,
app->name);
ao2_ref(forwards, -1);
return 0;
}
static int subscribe_endpoint(struct stasis_app *app, void *obj)
@ -1236,7 +1348,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj)
int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
if (!app || !endpoint_id) {
if (!app) {
return -1;
}
@ -1246,6 +1358,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
{
RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
if (ast_strlen_zero(endpoint_id)) {
endpoint_id = ENDPOINT_ALL;
}
forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
return forwards != NULL;
}

@ -36,6 +36,19 @@
*/
struct stasis_app;
enum stasis_app_subscription_model {
/*
* \brief An application must manually subscribe to each
* resource that it cares about. This is the default approach.
*/
STASIS_APP_SUBSCRIBE_MANUAL,
/*
* \brief An application is automatically subscribed to all
* resources in Asterisk, even if it does not control them.
*/
STASIS_APP_SUBSCRIBE_ALL
};
/*!
* \brief Create a res_stasis application.
*
@ -45,7 +58,7 @@ struct stasis_app;
* \return New \c res_stasis application.
* \return \c NULL on error.
*/
struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data);
struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model);
/*!
* \brief Tears down an application.

@ -37,6 +37,11 @@ ASTERISK_REGISTER_FILE()
#include "asterisk/test.h"
#include "messaging.h"
/*!
* \brief Subscription to all technologies
*/
#define TECH_WILDCARD "__AST_ALL_TECH"
/*!
* \brief Number of buckets for the \ref endpoint_subscriptions container
*/
@ -219,10 +224,14 @@ static int has_destination_cb(const struct ast_msg *msg)
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token))
|| !strncasecmp(sub->token, buf, strlen(sub->token)))) {
if (!sub) {
continue;
}
if (!strcmp(sub->token, TECH_WILDCARD)
|| !strncasecmp(sub->token, buf, strlen(sub->token))
|| !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
sub = NULL; /* No ref bump! */
goto match;
}
@ -231,6 +240,7 @@ static int has_destination_cb(const struct ast_msg *msg)
sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY);
if (sub) {
ao2_ref(sub, -1);
goto match;
}
@ -238,7 +248,6 @@ static int has_destination_cb(const struct ast_msg *msg)
return 0;
match:
ao2_cleanup(sub);
return 1;
}
@ -301,7 +310,8 @@ static int handle_msg_cb(struct ast_msg *msg)
continue;
}
if (!strncasecmp(sub->token, buf, strlen(sub->token))) {
if (!strcmp(sub->token, TECH_WILDCARD)
|| !strncasecmp(sub->token, buf, strlen(sub->token))) {
ast_rwlock_unlock(&tech_subscriptions_lock);
ao2_bump(sub);
endpoint_name = buf;
@ -374,7 +384,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
{
struct message_subscription *sub = NULL;
if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY);
} else {
int i;
@ -383,7 +393,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi
for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) {
sub = AST_VECTOR_GET(&tech_subscriptions, i);
if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) {
if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) {
ao2_bump(sub);
break;
}
@ -400,10 +410,6 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
endpoint = ast_endpoint_find_by_id(endpoint_id);
if (!endpoint) {
return;
}
sub = get_subscription(endpoint);
if (!sub) {
return;
@ -417,11 +423,11 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup);
if (AST_VECTOR_SIZE(&sub->applications) == 0) {
if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_unlink(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint),
AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD,
messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP);
ast_rwlock_unlock(&tech_subscriptions_lock);
}
@ -429,9 +435,9 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi
ao2_unlock(sub);
ao2_ref(sub, -1);
ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n",
app_name, ast_endpoint_get_id(endpoint));
app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
}
static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint)
@ -442,12 +448,12 @@ static struct message_subscription *get_or_create_subscription(struct ast_endpoi
return sub;
}
sub = message_subscription_alloc(ast_endpoint_get_id(endpoint));
sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD);
if (!sub) {
return NULL;
}
if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) {
ao2_link(endpoint_subscriptions, sub);
} else {
ast_rwlock_wrlock(&tech_subscriptions_lock);
@ -482,9 +488,9 @@ int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint *
AST_VECTOR_APPEND(&sub->applications, tuple);
ao2_unlock(sub);
ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint));
ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --");
ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n",
app_name, ast_endpoint_get_id(endpoint));
app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL");
return 0;
}

@ -26,6 +26,14 @@
"required": true,
"allowMultiple": true,
"dataType": "string"
},
{
"name": "subscribeAll",
"description": "Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'.",
"paramType": "query",
"required": false,
"allowMultiple": false,
"dataType": "boolean"
}
]
}

Loading…
Cancel
Save