diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 81a636dcd5..b14868b4a5 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -95,6 +95,11 @@ int stasis_message_router_is_done(struct stasis_message_router *router); /*! * \brief Add a route to a message router. * + * A particular \a message_type may have at most one route per \a router. If + * you route \ref stasis_cache_update messages, the callback will only receive + * updates for types not handled by routes added with + * stasis_message_router_add_cache_update(). + * * \param router Router to add the route to. * \param message_type Type of message to route. * \param callback Callback to forard messages of \a message_type to. @@ -106,9 +111,29 @@ int stasis_message_router_is_done(struct stasis_message_router *router); * \since 12 */ int stasis_message_router_add(struct stasis_message_router *router, - struct stasis_message_type *message_type, - stasis_subscription_cb callback, - void *data); + struct stasis_message_type *message_type, + stasis_subscription_cb callback, void *data); + +/*! + * \brief Add a route for \ref stasis_cache_update messages to a message router. + * + * A particular \a message_type may have at most one cache route per \a router. + * These are distinct from regular routes, so one could have both a regular + * route and a cache route for the same \a message_type. + * + * \param router Router to add the route to. + * \param message_type Subtype of cache update to route. + * \param callback Callback to forard messages of \a message_type to. + * \param data Data pointer to pass to \a callback. + * + * \retval 0 on success + * \retval -1 on failure + * + * \since 12 + */ +int stasis_message_router_add_cache_update(struct stasis_message_router *router, + struct stasis_message_type *message_type, + stasis_subscription_cb callback, void *data); /*! * \brief Remove a route from a message router. @@ -119,7 +144,19 @@ int stasis_message_router_add(struct stasis_message_router *router, * \since 12 */ void stasis_message_router_remove(struct stasis_message_router *router, - struct stasis_message_type *message_type); + struct stasis_message_type *message_type); + +/*! + * \brief Remove a cache route from a message router. + * + * \param router Router to remove the route from. + * \param message_type Type of message to route. + * + * \since 12 + */ +void stasis_message_router_remove_cache_update( + struct stasis_message_router *router, + struct stasis_message_type *message_type); /*! * \brief Sets the default route of a router. diff --git a/main/cdr.c b/main/cdr.c index 2e6209551d..633324206d 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -1967,9 +1967,7 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription struct cdr_object *it_cdr; ast_assert(update != NULL); - if (ast_channel_snapshot_type() != update->type) { - return; - } + ast_assert(ast_channel_snapshot_type() == update->type); old_snapshot = stasis_message_data(update->old_snapshot); new_snapshot = stasis_message_data(update->new_snapshot); @@ -4024,7 +4022,7 @@ int ast_cdr_engine_init(void) if (!stasis_router) { return -1; } - stasis_message_router_add(stasis_router, stasis_cache_update_type(), handle_channel_cache_message, NULL); + stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL); stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL); stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL); stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL); diff --git a/main/manager_bridging.c b/main/manager_bridging.c index ccad94785d..c24567eaa3 100644 --- a/main/manager_bridging.c +++ b/main/manager_bridging.c @@ -196,9 +196,7 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub, update = stasis_message_data(message); - if (ast_bridge_snapshot_type() != update->type) { - return; - } + ast_assert(ast_bridge_snapshot_type() == update->type); old_snapshot = stasis_message_data(update->old_snapshot); new_snapshot = stasis_message_data(update->new_snapshot); @@ -495,35 +493,22 @@ int manager_bridging_init(void) return -1; } - /* BUGBUG - This should really route off of the manager_router, but - * can't b/c manager_channels is already routing the - * stasis_cache_update_type() messages. Having a separate router can - * cause some message ordering issues with bridge and channel messages. - */ - bridge_state_router = stasis_message_router_create(bridge_topic); + bridge_state_router = ast_manager_get_message_router(); if (!bridge_state_router) { return -1; } - ret |= stasis_message_router_add(bridge_state_router, - stasis_cache_update_type(), - bridge_snapshot_update, - NULL); + ret |= stasis_message_router_add_cache_update(bridge_state_router, + ast_bridge_snapshot_type(), bridge_snapshot_update, NULL); ret |= stasis_message_router_add(bridge_state_router, - ast_bridge_merge_message_type(), - bridge_merge_cb, - NULL); + ast_bridge_merge_message_type(), bridge_merge_cb, NULL); ret |= stasis_message_router_add(bridge_state_router, - ast_channel_entered_bridge_type(), - channel_enter_cb, - NULL); + ast_channel_entered_bridge_type(), channel_enter_cb, NULL); ret |= stasis_message_router_add(bridge_state_router, - ast_channel_left_bridge_type(), - channel_leave_cb, - NULL); + ast_channel_left_bridge_type(), channel_leave_cb, NULL); ret |= ast_manager_register_xml_core("BridgeList", 0, manager_bridges_list); ret |= ast_manager_register_xml_core("BridgeInfo", 0, manager_bridge_info); diff --git a/main/manager_channels.c b/main/manager_channels.c index 6e8621973a..d26f0be06f 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -726,9 +726,7 @@ static void channel_snapshot_update(void *data, struct stasis_subscription *sub, update = stasis_message_data(message); - if (ast_channel_snapshot_type() != update->type) { - return; - } + ast_assert(ast_channel_snapshot_type() == update->type); old_snapshot = stasis_message_data(update->old_snapshot); new_snapshot = stasis_message_data(update->new_snapshot); @@ -1283,85 +1281,57 @@ int manager_channels_init(void) ast_register_atexit(manager_channels_shutdown); - ret |= stasis_message_router_add(message_router, - stasis_cache_update_type(), - channel_snapshot_update, - NULL); + ret |= stasis_message_router_add_cache_update(message_router, + ast_channel_snapshot_type(), channel_snapshot_update, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_user_event_type(), - channel_user_event_cb, - NULL); + ast_channel_user_event_type(), channel_user_event_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_dtmf_begin_type(), - channel_dtmf_begin_cb, - NULL); + ast_channel_dtmf_begin_type(), channel_dtmf_begin_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_dtmf_end_type(), - channel_dtmf_end_cb, - NULL); + ast_channel_dtmf_end_type(), channel_dtmf_end_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_hangup_request_type(), - channel_hangup_request_cb, - NULL); + ast_channel_hangup_request_type(), channel_hangup_request_cb, + NULL); ret |= stasis_message_router_add(message_router, - ast_channel_dial_type(), - channel_dial_cb, - NULL); + ast_channel_dial_type(), channel_dial_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_hold_type(), - channel_hold_cb, - NULL); + ast_channel_hold_type(), channel_hold_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_unhold_type(), - channel_unhold_cb, - NULL); + ast_channel_unhold_type(), channel_unhold_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_fax_type(), - channel_fax_cb, - NULL); + ast_channel_fax_type(), channel_fax_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_chanspy_start_type(), - channel_chanspy_start_cb, - NULL); + ast_channel_chanspy_start_type(), channel_chanspy_start_cb, + NULL); ret |= stasis_message_router_add(message_router, - ast_channel_chanspy_stop_type(), - channel_chanspy_stop_cb, - NULL); + ast_channel_chanspy_stop_type(), channel_chanspy_stop_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_hangup_handler_type(), - channel_hangup_handler_cb, - NULL); + ast_channel_hangup_handler_type(), channel_hangup_handler_cb, + NULL); ret |= stasis_message_router_add(message_router, - ast_channel_moh_start_type(), - channel_moh_start_cb, - NULL); + ast_channel_moh_start_type(), channel_moh_start_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_moh_stop_type(), - channel_moh_stop_cb, - NULL); + ast_channel_moh_stop_type(), channel_moh_stop_cb, NULL); ret |= stasis_message_router_add(message_router, - ast_channel_monitor_start_type(), - channel_monitor_start_cb, - NULL); + ast_channel_monitor_start_type(), channel_monitor_start_cb, + NULL); ret |= stasis_message_router_add(message_router, - ast_channel_monitor_stop_type(), - channel_monitor_stop_cb, - NULL); + ast_channel_monitor_stop_type(), channel_monitor_stop_cb, NULL); /* If somehow we failed to add any routes, just shut down the whole * thing and fail it. diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 4409d22265..26d2f2c0c7 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -34,6 +34,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis_message_router.h" +/*! Number of hash buckets for the route table. Keep it prime! */ +#define ROUTE_TABLE_BUCKETS 7 + /*! \internal */ struct stasis_message_route { /*! Message type handle by this route. */ @@ -75,6 +78,8 @@ struct stasis_message_router { struct stasis_subscription *subscription; /*! Subscribed routes */ struct ao2_container *routes; + /*! Subscribed routes for \ref stasi_cache_update messages */ + struct ao2_container *cache_routes; /*! Route of last resort */ struct stasis_message_route *default_route; }; @@ -90,13 +95,46 @@ static void router_dtor(void *obj) ao2_cleanup(router->routes); router->routes = NULL; + ao2_cleanup(router->cache_routes); + router->cache_routes = NULL; + ao2_cleanup(router->default_route); router->default_route = NULL; } -static struct stasis_message_route *find_route(struct stasis_message_router *router, struct stasis_message_type *message_type) +static struct stasis_message_route *find_route( + struct stasis_message_router *router, + struct stasis_message *message) { - return ao2_find(router->routes, message_type, OBJ_KEY); + RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); + struct stasis_message_type *type = stasis_message_type(message); + SCOPED_AO2LOCK(lock, router); + + if (type == stasis_cache_update_type()) { + /* Find a cache route */ + struct stasis_cache_update *update = + stasis_message_data(message); + route = ao2_find(router->cache_routes, update->type, OBJ_KEY); + } + + if (route == NULL) { + /* Find a regular route */ + route = ao2_find(router->routes, type, OBJ_KEY); + } + + if (route == NULL) { + /* Maybe the default route, then? */ + if ((route = router->default_route)) { + ao2_ref(route, +1); + } + } + + if (route == NULL) { + return NULL; + } + + ao2_ref(route, +1); + return route; } static void router_dispatch(void *data, @@ -105,29 +143,18 @@ static void router_dispatch(void *data, struct stasis_message *message) { struct stasis_message_router *router = data; - RAII_VAR(struct stasis_message_router *, router_needs_cleanup, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - struct stasis_message_type *type = stasis_message_type(message); - { - SCOPED_AO2LOCK(lock, router); - - if (!(route = find_route(router, type))) { - if ((route = router->default_route)) { - ao2_ref(route, +1); - } - } - } + route = find_route(router, message); if (route) { route->callback(route->data, sub, topic, message); } + if (stasis_subscription_final_message(sub, message)) { - router_needs_cleanup = router; - return; + ao2_cleanup(router); } - } struct stasis_message_router *stasis_message_router_create( @@ -140,7 +167,15 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - if (!(router->routes = ao2_container_alloc(7, route_hash, route_cmp))) { + router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash, + route_cmp); + if (!router->routes) { + return NULL; + } + + router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, + route_hash, route_cmp); + if (!router->cache_routes) { return NULL; } @@ -211,7 +246,10 @@ static int add_route(struct stasis_message_router *router, RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); SCOPED_AO2LOCK(lock, router); - if ((existing_route = find_route(router, route->message_type))) { + existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY); + + if (existing_route) { + ast_log(LOG_ERROR, "Cannot add route; route exists\n"); return -1; } @@ -219,10 +257,27 @@ static int add_route(struct stasis_message_router *router, return 0; } +static int add_cache_route(struct stasis_message_router *router, + struct stasis_message_route *route) +{ + RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, router); + + existing_route = ao2_find(router->cache_routes, route->message_type, + OBJ_KEY); + + if (existing_route) { + ast_log(LOG_ERROR, "Cannot add route; route exists\n"); + return -1; + } + + ao2_link(router->cache_routes, route); + return 0; +} + int stasis_message_router_add(struct stasis_message_router *router, - struct stasis_message_type *message_type, - stasis_subscription_cb callback, - void *data) + struct stasis_message_type *message_type, + stasis_subscription_cb callback, void *data) { RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); @@ -234,12 +289,37 @@ int stasis_message_router_add(struct stasis_message_router *router, return add_route(router, route); } +int stasis_message_router_add_cache_update(struct stasis_message_router *router, + struct stasis_message_type *message_type, + stasis_subscription_cb callback, void *data) +{ + RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); + + route = route_create(message_type, callback, data); + if (!route) { + return -1; + } + + return add_cache_route(router, route); +} + void stasis_message_router_remove(struct stasis_message_router *router, - struct stasis_message_type *message_type) + struct stasis_message_type *message_type) +{ + SCOPED_AO2LOCK(lock, router); + + ao2_find(router->routes, message_type, + OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); +} + +void stasis_message_router_remove_cache_update( + struct stasis_message_router *router, + struct stasis_message_type *message_type) { SCOPED_AO2LOCK(lock, router); - ao2_find(router->routes, message_type, OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); + ao2_find(router->cache_routes, message_type, + OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); } int stasis_message_router_set_default(struct stasis_message_router *router, diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 0dc9182a33..6636633555 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -230,7 +230,7 @@ static int consumer_wait_for_completion(struct consumer *consumer) { struct timeval start = ast_tvnow(); struct timespec end = { - .tv_sec = start.tv_sec + 30, + .tv_sec = start.tv_sec + 3, .tv_nsec = start.tv_usec * 1000 }; @@ -867,7 +867,7 @@ AST_TEST_DEFINE(cache_dump) AST_TEST_DEFINE(route_conflicts) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); @@ -913,7 +913,7 @@ AST_TEST_DEFINE(route_conflicts) AST_TEST_DEFINE(router) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); RAII_VAR(char *, test_data, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); @@ -1006,6 +1006,126 @@ AST_TEST_DEFINE(router) return AST_TEST_PASS; } +static const char *cache_simple(struct stasis_message *message) { + const char *type_name = + stasis_message_type_name(stasis_message_type(message)); + if (!ast_begins_with(type_name, "Cache")) { + return NULL; + } + + return "cached"; +} + +AST_TEST_DEFINE(router_cache_updates) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe); + RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup); + struct stasis_cache_update *update; + int actual_len, ret; + struct stasis_message *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test special handling cache_update messages"; + info->description = "Test special handling cache_update messages"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + caching_topic = stasis_caching_topic_create(topic, cache_simple); + ast_test_validate(test, NULL != caching_topic); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + consumer3 = consumer_create(1); + ast_test_validate(test, NULL != consumer3); + + test_message_type1 = stasis_message_type_create("Cache1", NULL); + ast_test_validate(test, NULL != test_message_type1); + test_message_type2 = stasis_message_type_create("Cache2", NULL); + ast_test_validate(test, NULL != test_message_type2); + test_message_type3 = stasis_message_type_create("NonCache", NULL); + ast_test_validate(test, NULL != test_message_type3); + + uut = stasis_message_router_create( + stasis_caching_get_topic(caching_topic)); + ast_test_validate(test, NULL != uut); + + ret = stasis_message_router_add_cache_update( + uut, test_message_type1, consumer_exec, consumer1); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer1, +1); + ret = stasis_message_router_add( + uut, stasis_cache_update_type(), consumer_exec, consumer2); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer2, +1); + ret = stasis_message_router_set_default(uut, consumer_exec, consumer3); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer3, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message1 = stasis_message_create(test_message_type1, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type2, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type3, test_data); + ast_test_validate(test, NULL != test_message3); + + stasis_publish(topic, test_message1); + stasis_publish(topic, test_message2); + stasis_publish(topic, test_message3); + + actual_len = consumer_wait_for(consumer1, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer2, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer3, 1); + ast_test_validate(test, 1 == actual_len); + + actual = consumer1->messages_rxed[0]; + ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual)); + update = stasis_message_data(actual); + ast_test_validate(test, test_message_type1 == update->type); + ast_test_validate(test, test_message1 == update->new_snapshot); + + actual = consumer2->messages_rxed[0]; + ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual)); + update = stasis_message_data(actual); + ast_test_validate(test, test_message_type2 == update->type); + ast_test_validate(test, test_message2 == update->new_snapshot); + + actual = consumer3->messages_rxed[0]; + ast_test_validate(test, test_message3 == actual); + + /* consumer1 and consumer2 do not get the final message. */ + ao2_cleanup(consumer1); + ao2_cleanup(consumer2); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(no_to_json) { RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup); @@ -1160,6 +1280,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache_dump); AST_TEST_UNREGISTER(route_conflicts); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(router_cache_updates); AST_TEST_UNREGISTER(interleaving); AST_TEST_UNREGISTER(no_to_json); AST_TEST_UNREGISTER(to_json); @@ -1181,6 +1302,7 @@ static int load_module(void) AST_TEST_REGISTER(cache_dump); AST_TEST_REGISTER(route_conflicts); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(router_cache_updates); AST_TEST_REGISTER(interleaving); AST_TEST_REGISTER(no_to_json); AST_TEST_REGISTER(to_json);