mirror of http://gerrit.asterisk.org/asterisk
In working with res_stasis, I discovered a significant limitation to the current structure of stasis_caching_topics: you cannot subscribe to cache updates for a single channel/bridge/endpoint/etc. To address this, this patch splits the cache away from the stasis_caching_topic, making it a first class object. The stasis_cache object is shared amongst individual stasis_caching_topics that are created per channel/endpoint/etc. These are still forwarded to global whatever_all_cached topics, so their use from most of the code does not change. In making these changes, I noticed that we frequently used a similar pattern for bridges, endpoints and channels: single_topic ----------------> all_topic ^ | single_topic_cached ----+----> all_topic_cached | +----> cache This pattern was extracted as the 'Stasis Caching Pattern', defined in stasis_caching_pattern.h. This avoids a lot of duplicate code between the different domain objects. Since the cache is now disassociated from its upstream caching topics, this also necessitated a change to how the 'guaranteed' flag worked for retrieving from a cache. The code for handling the caching guarantee was extracted into a 'stasis_topic_wait' function, which works for any stasis_topic. (closes issue ASTERISK-22002) Review: https://reviewboard.asterisk.org/r/2672/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@395954 65c4cc65-6c06-0410-ace0-fbb531ad65f3changes/78/78/1
parent
5c13969469
commit
e1b959ccbb
@ -0,0 +1,153 @@
|
||||
/*
|
||||
* Asterisk -- An open source telephony toolkit.
|
||||
*
|
||||
* Copyright (C) 2013, Digium, Inc.
|
||||
*
|
||||
* David M. Lee, II <dlee@digium.com>
|
||||
*
|
||||
* See http://www.asterisk.org for more information about
|
||||
* the Asterisk project. Please do not directly contact
|
||||
* any of the maintainers of this project for assistance;
|
||||
* the project provides a web site, mailing lists and IRC
|
||||
* channels for your use.
|
||||
*
|
||||
* This program is free software, distributed under the terms of
|
||||
* the GNU General Public License Version 2. See the LICENSE file
|
||||
* at the top of the source tree.
|
||||
*/
|
||||
|
||||
#ifndef _ASTERISK_STASIS_CACHE_PATTERN_H
|
||||
#define _ASTERISK_STASIS_CACHE_PATTERN_H
|
||||
|
||||
/*! \file
|
||||
*
|
||||
* \brief Caching pattern for \ref stasis topics.
|
||||
*
|
||||
* A typical pattern for Stasis objects is to have individual objects, which
|
||||
* have their own topic and caching topic. These individual topics feed an
|
||||
* upstream aggregate topics, and a shared cache.
|
||||
*
|
||||
* The \ref stasis_cp_all object contains the aggregate topics and shared cache.
|
||||
* This is built with the base name for the topics, and the identity function to
|
||||
* identify messages in the cache.
|
||||
*
|
||||
* The \ref stasis_cp_single object contains the \ref stasis_topic for a single
|
||||
* instance, and the corresponding \ref stasis_caching_topic.
|
||||
*
|
||||
* Since the \ref stasis_cp_single object has subscriptions for forwarding
|
||||
* and caching, it must be disposed of using stasis_cp_single_unsubscribe()
|
||||
* instead of simply ao2_cleanup().
|
||||
*/
|
||||
|
||||
#include "asterisk/stasis.h"
|
||||
|
||||
/*!
|
||||
* \brief The 'all' side of the cache pattern. These are typically built as
|
||||
* global objects for specific modules.
|
||||
*/
|
||||
struct stasis_cp_all;
|
||||
|
||||
/*!
|
||||
* \brief Create an all instance of the cache pattern.
|
||||
*
|
||||
* This object is AO2 managed, so dispose of it with ao2_cleanup().
|
||||
*
|
||||
* \param name Base name of the topics.
|
||||
* \param id_fn Identity function for the cache.
|
||||
* \return All side instance.
|
||||
* \return \c NULL on error.
|
||||
*/
|
||||
struct stasis_cp_all *stasis_cp_all_create(const char *name,
|
||||
snapshot_get_id id_fn);
|
||||
|
||||
/*!
|
||||
* \brief Get the aggregate topic.
|
||||
*
|
||||
* This topic aggregates all messages published to corresponding
|
||||
* stasis_cp_single_topic() topics.
|
||||
*
|
||||
* \param all All side caching pattern object.
|
||||
* \return The aggregate topic.
|
||||
* \return \c NULL if \a all is \c NULL
|
||||
*/
|
||||
struct stasis_topic *stasis_cp_all_topic(struct stasis_cp_all *all);
|
||||
|
||||
/*!
|
||||
* \brief Get the caching topic.
|
||||
*
|
||||
* This topic aggregates all messages from the corresponding
|
||||
* stasis_cp_single_topic_cached() topics.
|
||||
*
|
||||
* Note that one normally only subscribes to the caching topic, since data
|
||||
* is fed to it from its upstream topic.
|
||||
*
|
||||
* \param all All side caching pattern object.
|
||||
* \return The aggregate caching topic.
|
||||
* \return \c NULL if \a all is \c NULL
|
||||
*/
|
||||
struct stasis_topic *stasis_cp_all_topic_cached(
|
||||
struct stasis_cp_all *all);
|
||||
|
||||
/*!
|
||||
* \brief Get the cache.
|
||||
*
|
||||
* This is the shared cache for all corresponding \ref stasis_cp_single objects.
|
||||
*
|
||||
* \param all All side caching pattern object.
|
||||
* \return The cache.
|
||||
* \return \c NULL if \a all is \c NULL
|
||||
*/
|
||||
struct stasis_cache *stasis_cp_all_cache(struct stasis_cp_all *all);
|
||||
|
||||
/*!
|
||||
* \brief The 'one' side of the cache pattern. These are built per-instance for
|
||||
* some corresponding object, and must be explicitly disposed of using
|
||||
* stasis_cp_single_unsubscribe().
|
||||
*/
|
||||
struct stasis_cp_single;
|
||||
|
||||
/*!
|
||||
* \brief Create the 'one' side of the cache pattern.
|
||||
*
|
||||
* Dispose of using stasis_cp_single_unsubscribe().
|
||||
*
|
||||
* \param all Corresponding all side.
|
||||
* \param name Base name for the topics.
|
||||
* \return One side instance
|
||||
*/
|
||||
struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
|
||||
const char *name);
|
||||
|
||||
/*!
|
||||
* \brief Stops caching and forwarding messages.
|
||||
*
|
||||
* \param one One side of the cache pattern.
|
||||
*/
|
||||
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one);
|
||||
|
||||
/*!
|
||||
* \brief Get the topic for this instance.
|
||||
*
|
||||
* This is the topic to which one would post instance-specific messages, or
|
||||
* subscribe for single-instance, uncached messages.
|
||||
*
|
||||
* \param one One side of the cache pattern.
|
||||
* \return The main topic.
|
||||
* \return \c NULL if \a one is \c NULL
|
||||
*/
|
||||
struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one);
|
||||
|
||||
/*!
|
||||
* \brief Get the caching topic for this instance.
|
||||
*
|
||||
* Note that one normally only subscribes to the caching topic, since data
|
||||
* is fed to it from its upstream topic.
|
||||
*
|
||||
* \param one One side of the cache pattern.
|
||||
* \return The caching topic.
|
||||
* \return \c NULL if \a one is \c NULL
|
||||
*/
|
||||
struct stasis_topic *stasis_cp_single_topic_cached(
|
||||
struct stasis_cp_single *one);
|
||||
|
||||
#endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */
|
@ -0,0 +1,189 @@
|
||||
/*
|
||||
* Asterisk -- An open source telephony toolkit.
|
||||
*
|
||||
* Copyright (C) 2013, Digium, Inc.
|
||||
*
|
||||
* David M. Lee, II <dlee@digium.com>
|
||||
*
|
||||
* See http://www.asterisk.org for more information about
|
||||
* the Asterisk project. Please do not directly contact
|
||||
* any of the maintainers of this project for assistance;
|
||||
* the project provides a web site, mailing lists and IRC
|
||||
* channels for your use.
|
||||
*
|
||||
* This program is free software, distributed under the terms of
|
||||
* the GNU General Public License Version 2. See the LICENSE file
|
||||
* at the top of the source tree.
|
||||
*/
|
||||
|
||||
/*! \file
|
||||
*
|
||||
* \brief Typical cache pattern for Stasis topics.
|
||||
*
|
||||
* \author David M. Lee, II <dlee@digium.com>
|
||||
*/
|
||||
|
||||
/*** MODULEINFO
|
||||
<support_level>core</support_level>
|
||||
***/
|
||||
|
||||
#include "asterisk.h"
|
||||
|
||||
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
|
||||
|
||||
#include "asterisk/astobj2.h"
|
||||
#include "asterisk/stasis_cache_pattern.h"
|
||||
|
||||
struct stasis_cp_all {
|
||||
struct stasis_topic *topic;
|
||||
struct stasis_topic *topic_cached;
|
||||
struct stasis_cache *cache;
|
||||
};
|
||||
|
||||
struct stasis_cp_single {
|
||||
struct stasis_topic *topic;
|
||||
struct stasis_caching_topic *topic_cached;
|
||||
|
||||
struct stasis_subscription *forward;
|
||||
struct stasis_subscription *forward_cached;
|
||||
};
|
||||
|
||||
static void all_dtor(void *obj)
|
||||
{
|
||||
struct stasis_cp_all *all = obj;
|
||||
|
||||
ao2_cleanup(all->topic);
|
||||
ao2_cleanup(all->topic_cached);
|
||||
ao2_cleanup(all->cache);
|
||||
}
|
||||
|
||||
struct stasis_cp_all *stasis_cp_all_create(const char *name,
|
||||
snapshot_get_id id_fn)
|
||||
{
|
||||
RAII_VAR(char *, cached_name, NULL, ast_free);
|
||||
RAII_VAR(struct stasis_cp_all *, all, NULL, ao2_cleanup);
|
||||
|
||||
all = ao2_alloc(sizeof(*all), all_dtor);
|
||||
if (!all) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ast_asprintf(&cached_name, "%s-cached", name);
|
||||
if (!cached_name) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
all->topic = stasis_topic_create(name);
|
||||
all->topic_cached = stasis_topic_create(cached_name);
|
||||
all->cache = stasis_cache_create(id_fn);
|
||||
|
||||
if (!all->topic || !all->topic_cached || !all->cache) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ao2_ref(all, +1);
|
||||
return all;
|
||||
}
|
||||
|
||||
struct stasis_topic *stasis_cp_all_topic(struct stasis_cp_all *all)
|
||||
{
|
||||
if (!all) {
|
||||
return NULL;
|
||||
}
|
||||
return all->topic;
|
||||
}
|
||||
|
||||
struct stasis_topic *stasis_cp_all_topic_cached(
|
||||
struct stasis_cp_all *all)
|
||||
{
|
||||
if (!all) {
|
||||
return NULL;
|
||||
}
|
||||
return all->topic_cached;
|
||||
}
|
||||
|
||||
struct stasis_cache *stasis_cp_all_cache(struct stasis_cp_all *all)
|
||||
{
|
||||
if (!all) {
|
||||
return NULL;
|
||||
}
|
||||
return all->cache;
|
||||
}
|
||||
|
||||
static void one_dtor(void *obj)
|
||||
{
|
||||
struct stasis_cp_single *one = obj;
|
||||
|
||||
/* Should already be unsubscribed */
|
||||
ast_assert(one->topic_cached == NULL);
|
||||
ast_assert(one->forward == NULL);
|
||||
ast_assert(one->forward_cached == NULL);
|
||||
|
||||
ao2_cleanup(one->topic);
|
||||
one->topic = NULL;
|
||||
}
|
||||
|
||||
struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
|
||||
const char *name)
|
||||
{
|
||||
RAII_VAR(struct stasis_cp_single *, one, NULL, ao2_cleanup);
|
||||
|
||||
one = ao2_alloc(sizeof(*one), one_dtor);
|
||||
if (!one) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
one->topic = stasis_topic_create(name);
|
||||
if (!one->topic) {
|
||||
return NULL;
|
||||
}
|
||||
one->topic_cached = stasis_caching_topic_create(one->topic, all->cache);
|
||||
if (!one->topic_cached) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
one->forward = stasis_forward_all(one->topic, all->topic);
|
||||
if (!one->forward) {
|
||||
return NULL;
|
||||
}
|
||||
one->forward_cached = stasis_forward_all(
|
||||
stasis_caching_get_topic(one->topic_cached), all->topic_cached);
|
||||
if (!one->forward_cached) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ao2_ref(one, +1);
|
||||
return one;
|
||||
}
|
||||
|
||||
void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
|
||||
{
|
||||
if (!one) {
|
||||
return;
|
||||
}
|
||||
|
||||
stasis_caching_unsubscribe(one->topic_cached);
|
||||
one->topic_cached = NULL;
|
||||
stasis_unsubscribe(one->forward);
|
||||
one->forward = NULL;
|
||||
stasis_unsubscribe(one->forward_cached);
|
||||
one->forward_cached = NULL;
|
||||
}
|
||||
|
||||
struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one)
|
||||
{
|
||||
if (!one) {
|
||||
return NULL;
|
||||
}
|
||||
return one->topic;
|
||||
}
|
||||
|
||||
struct stasis_topic *stasis_cp_single_topic_cached(
|
||||
struct stasis_cp_single *one)
|
||||
{
|
||||
if (!one) {
|
||||
return NULL;
|
||||
}
|
||||
return stasis_caching_get_topic(one->topic_cached);
|
||||
}
|
||||
|
@ -0,0 +1,133 @@
|
||||
/*
|
||||
* Asterisk -- An open source telephony toolkit.
|
||||
*
|
||||
* Copyright (C) 2013, Digium, Inc.
|
||||
*
|
||||
* Joshua Colp <jcolp@digium.com>
|
||||
*
|
||||
* See http://www.asterisk.org for more information about
|
||||
* the Asterisk project. Please do not directly contact
|
||||
* any of the maintainers of this project for assistance;
|
||||
* the project provides a web site, mailing lists and IRC
|
||||
* channels for your use.
|
||||
*
|
||||
* This program is free software, distributed under the terms of
|
||||
* the GNU General Public License Version 2. See the LICENSE file
|
||||
* at the top of the source tree.
|
||||
*/
|
||||
|
||||
/*! \file
|
||||
*
|
||||
* \brief Wait support for Stasis topics.
|
||||
*
|
||||
* \author Joshua Colp <jcolp@digium.com>
|
||||
*/
|
||||
|
||||
/*** MODULEINFO
|
||||
<support_level>core</support_level>
|
||||
***/
|
||||
|
||||
#include "asterisk.h"
|
||||
|
||||
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
|
||||
|
||||
#include "asterisk/astobj2.h"
|
||||
#include "asterisk/stasis.h"
|
||||
|
||||
static struct stasis_message_type *cache_guarantee_type(void);
|
||||
STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
|
||||
|
||||
/*! \internal */
|
||||
struct caching_guarantee {
|
||||
ast_mutex_t lock;
|
||||
ast_cond_t cond;
|
||||
unsigned int done:1;
|
||||
};
|
||||
|
||||
static void caching_guarantee_dtor(void *obj)
|
||||
{
|
||||
struct caching_guarantee *guarantee = obj;
|
||||
|
||||
ast_assert(guarantee->done == 1);
|
||||
|
||||
ast_mutex_destroy(&guarantee->lock);
|
||||
ast_cond_destroy(&guarantee->cond);
|
||||
}
|
||||
|
||||
static void guarantee_handler(void *data, struct stasis_subscription *sub,
|
||||
struct stasis_topic *topic, struct stasis_message *message)
|
||||
{
|
||||
/* Wait for our particular message */
|
||||
if (data == message) {
|
||||
struct caching_guarantee *guarantee;
|
||||
ast_assert(cache_guarantee_type() == stasis_message_type(message));
|
||||
guarantee = stasis_message_data(message);
|
||||
|
||||
ast_mutex_lock(&guarantee->lock);
|
||||
guarantee->done = 1;
|
||||
ast_cond_signal(&guarantee->cond);
|
||||
ast_mutex_unlock(&guarantee->lock);
|
||||
}
|
||||
}
|
||||
|
||||
static struct stasis_message *caching_guarantee_create(void)
|
||||
{
|
||||
RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
||||
|
||||
if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ast_mutex_init(&guarantee->lock);
|
||||
ast_cond_init(&guarantee->cond, NULL);
|
||||
|
||||
if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ao2_ref(msg, +1);
|
||||
return msg;
|
||||
}
|
||||
|
||||
int stasis_topic_wait(struct stasis_topic *topic)
|
||||
{
|
||||
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
|
||||
struct caching_guarantee *guarantee;
|
||||
|
||||
msg = caching_guarantee_create();
|
||||
if (!msg) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
sub = stasis_subscribe(topic, guarantee_handler, msg);
|
||||
if (!sub) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
guarantee = stasis_message_data(msg);
|
||||
|
||||
ast_mutex_lock(&guarantee->lock);
|
||||
stasis_publish(topic, msg);
|
||||
while (!guarantee->done) {
|
||||
ast_cond_wait(&guarantee->cond, &guarantee->lock);
|
||||
}
|
||||
ast_mutex_unlock(&guarantee->lock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void wait_cleanup(void)
|
||||
{
|
||||
STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
|
||||
}
|
||||
|
||||
int stasis_wait_init(void)
|
||||
{
|
||||
ast_register_cleanup(wait_cleanup);
|
||||
|
||||
if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
Loading…
Reference in new issue