diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h index 27761351ad..2ea643e192 100644 --- a/include/asterisk/stasis_cache_pattern.h +++ b/include/asterisk/stasis_cache_pattern.h @@ -109,8 +109,6 @@ struct stasis_cp_single; /*! * \brief Create the 'one' side of the cache pattern. * - * Create the 'one' and forward to all's topic and topic_cached. - * * Dispose of using stasis_cp_single_unsubscribe(). * * \param all Corresponding all side. @@ -120,32 +118,6 @@ struct stasis_cp_single; struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, const char *name); -/*! - * \brief Create the 'one' side of the cache pattern. - * - * Create the 'one' but do not automatically forward. - * - * Dispose of using stasis_cp_single_unsubscribe(). - * - * \param all Corresponding all side. - * \param name Base name for the topics. - * \return One side instance - */ -struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all, - const char *name); - -/*! - * \brief Set up a topic and topic cache forward. - * - * Forward 'from' to 'to'. - * - * \param from Source 'one' side instance. - * \param to Destination 'one' side instance. - * \retval 0 Success - * \retval -1 Failure - */ -int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to); - /*! * \brief Stops caching and forwarding messages. * diff --git a/main/endpoints.c b/main/endpoints.c index 979d43e431..df9d289c7f 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -74,6 +74,8 @@ struct ast_endpoint { struct stasis_message_router *router; /*! ast_str_container of channels associated with this endpoint */ struct ao2_container *channel_ids; + /*! Forwarding subscription from an endpoint to its tech endpoint */ + struct stasis_forward *tech_forward; }; static int endpoint_hash(const void *obj, int flags) @@ -301,14 +303,13 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha return NULL; } - if (!ast_strlen_zero(resource)) { - - endpoint->topics = stasis_cp_single_create_only(ast_endpoint_cache_all(), - endpoint->id); - if (!endpoint->topics) { - return NULL; - } + endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), + endpoint->id); + if (!endpoint->topics) { + return NULL; + } + if (!ast_strlen_zero(resource)) { endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); if (!endpoint->router) { return NULL; @@ -322,19 +323,11 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha return NULL; } - if (stasis_cp_single_forward(endpoint->topics, tech_endpoint->topics)) { - return NULL; - } - + endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics), + stasis_cp_single_topic(tech_endpoint->topics)); endpoint_publish_snapshot(endpoint); ao2_link(endpoints, endpoint); } else { - endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(), - endpoint->id); - if (!endpoint->topics) { - return NULL; - } - ao2_link(tech_endpoints, endpoint); } @@ -382,6 +375,7 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) } ao2_unlink(endpoints, endpoint); + endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward); clear_msg = create_endpoint_snapshot_message(endpoint); if (clear_msg) { diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index d916a48b10..bbe63ba1de 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -138,30 +138,6 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, { RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup); - one = stasis_cp_single_create_only(all, name); - if (!one) { - return NULL; - } - - one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic); - if (!one->forward_topic_to_all) { - return NULL; - } - one->forward_cached_to_all = stasis_forward_all( - stasis_caching_get_topic(one->topic_cached), all->topic_cached); - if (!one->forward_cached_to_all) { - return NULL; - } - - ao2_ref(one, +1); - return one; -} - -struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all, - const char *name) -{ - RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup); - one = ao2_t_alloc(sizeof(*one), one_dtor, name); if (!one) { return NULL; @@ -176,25 +152,18 @@ struct stasis_cp_single *stasis_cp_single_create_only(struct stasis_cp_all *all, return NULL; } - ao2_ref(one, +1); - return one; -} - -int stasis_cp_single_forward(struct stasis_cp_single *from, struct stasis_cp_single *to) -{ - from->forward_topic_to_all = stasis_forward_all(from->topic, to->topic); - if (!from->forward_topic_to_all) { - return -1;; + one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic); + if (!one->forward_topic_to_all) { + return NULL; } - - from->forward_cached_to_all = stasis_forward_all( - stasis_caching_get_topic(from->topic_cached), - stasis_caching_get_topic(to->topic_cached)); - if (!from->forward_cached_to_all) { - return -1; + one->forward_cached_to_all = stasis_forward_all( + stasis_caching_get_topic(one->topic_cached), all->topic_cached); + if (!one->forward_cached_to_all) { + return NULL; } - return 0; + ao2_ref(one, +1); + return one; } void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)