Merge "stasis / manager / ari: Better filter messages." into 16

16.2
Joshua C. Colp 6 years ago committed by Gerrit Code Review
commit 7d1736b59b

@ -238,7 +238,7 @@ void stasis_message_router_remove_cache_update(
* \brief Sets the default route of a router.
*
* \param router Router to set the default route of.
* \param callback Callback to forard messages which otherwise have no home.
* \param callback Callback to forward messages which otherwise have no home.
* \param data Data pointer to pass to \a callback.
*
* \retval 0 on success
@ -254,6 +254,27 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
void *data);
/*!
* \brief Sets the default route of a router with formatters.
*
* \param router Router to set the default route of.
* \param callback Callback to forward messages which otherwise have no home.
* \param data Data pointer to pass to \a callback.
* \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
*
* \since 13.26.0
* \since 16.3.0
*
* \note If formatters are specified then the message router will remain in a selective
* filtering state. Any explicit routes will receive messages of their message type and
* the default callback will only receive messages that have one of the given formatters.
* Explicit routes will not be filtered according to the given formatters.
*/
void stasis_message_router_set_formatters_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
void *data,
enum stasis_subscription_message_formatters formatters);
/*!
* \brief Indicate to a message router that we are interested in messages with one or more formatters.
*

@ -8904,8 +8904,8 @@ static int manager_subscriptions_init(void)
stasis_message_router_set_congestion_limits(stasis_router, -1,
6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
res |= stasis_message_router_set_default(stasis_router,
manager_default_msg_cb, NULL);
stasis_message_router_set_formatters_default(stasis_router,
manager_default_msg_cb, NULL, STASIS_SUBSCRIPTION_FORMATTER_AMI);
res |= stasis_message_router_add(stasis_router,
ast_manager_get_generic_type(), manager_generic_msg_cb, NULL);

@ -1272,6 +1272,10 @@ int manager_channels_init(void)
ast_register_cleanup(manager_channels_shutdown);
/* The snapshot type has a special handler as it can result in multiple
* manager events being queued due to aspects of the snapshot itself
* changing.
*/
ret |= stasis_message_router_add_cache_update(message_router,
ast_channel_snapshot_type(), channel_snapshot_update, NULL);

@ -387,19 +387,34 @@ void stasis_message_router_remove_cache_update(
int stasis_message_router_set_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
void *data)
{
stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE);
/* While this implementation can never fail, it used to be able to */
return 0;
}
void stasis_message_router_set_formatters_default(struct stasis_message_router *router,
stasis_subscription_cb callback,
void *data,
enum stasis_subscription_message_formatters formatters)
{
ast_assert(router != NULL);
ast_assert(callback != NULL);
stasis_subscription_accept_formatters(router->subscription, formatters);
ao2_lock(router);
router->default_route.callback = callback;
router->default_route.data = data;
ao2_unlock(router);
stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
/* While this implementation can never fail, it used to be able to */
return 0;
if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
/* Formatters govern what messages the default callback get, so it is only if none is
* specified that we accept all messages regardless.
*/
stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
}
}
void stasis_message_router_accept_formatters(struct stasis_message_router *router,

@ -330,16 +330,25 @@ static void call_forwarded_handler(struct stasis_app *app, struct stasis_message
ast_channel_unref(chan);
}
static void sub_default_handler(void *data, struct stasis_subscription *sub,
static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
struct ast_json *json;
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(app);
}
}
static void sub_default_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
struct ast_json *json;
/* The dial type can be converted to JSON so it will always be passed
* here.
*/
if (stasis_message_type(message) == ast_channel_dial_type()) {
call_forwarded_handler(app, message);
}
@ -843,7 +852,7 @@ static void bridge_attended_transfer_handler(void *data, struct stasis_subscript
}
}
static void bridge_default_handler(void *data, struct stasis_subscription *sub,
static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_app *app = data;
@ -970,8 +979,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
res |= stasis_message_router_add(app->bridge_router,
ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
res |= stasis_message_router_set_default(app->bridge_router,
bridge_default_handler, app);
res |= stasis_message_router_add(app->bridge_router,
stasis_subscription_change_type(), bridge_subscription_change_handler, app);
if (res != 0) {
return NULL;
@ -993,8 +1002,11 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
res |= stasis_message_router_add_cache_update(app->router,
ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
res |= stasis_message_router_set_default(app->router,
sub_default_handler, app);
res |= stasis_message_router_add(app->router,
stasis_subscription_change_type(), sub_subscription_change_handler, app);
stasis_message_router_set_formatters_default(app->router,
sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
if (res != 0) {
return NULL;

Loading…
Cancel
Save