@ -36,6 +36,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
# include "asterisk/stasis_internal.h"
# include "asterisk/stasis.h"
# include "asterisk/utils.h"
# include "asterisk/vector.h"
# ifdef LOW_MEMORY
# define NUM_CACHE_BUCKETS 17
@ -47,6 +48,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
struct stasis_cache {
struct ao2_container * entries ;
snapshot_get_id id_fn ;
cache_aggregate_calc_fn aggregate_calc_fn ;
cache_aggregate_publish_fn aggregate_publish_fn ;
} ;
/*! \internal */
@ -124,29 +127,53 @@ struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_c
return NULL ;
}
struct cache_entry {
struct cache_entry _key {
struct stasis_message_type * type ;
char * id ;
struct stasis_message * snapshot ;
const char * id ;
} ;
struct stasis_cache_entry {
struct cache_entry_key key ;
/*! Aggregate snapshot of the stasis cache. */
struct stasis_message * aggregate ;
/*! Local entity snapshot of the stasis event. */
struct stasis_message * local ;
/*! Remote entity snapshots of the stasis event. */
AST_VECTOR ( , struct stasis_message * ) remote ;
} ;
static void cache_entry_dtor ( void * obj )
{
struct cache_entry * entry = obj ;
ao2_cleanup ( entry - > type ) ;
entry - > type = NULL ;
ast_free ( entry - > id ) ;
entry - > id = NULL ;
ao2_cleanup ( entry - > snapshot ) ;
entry - > snapshot = NULL ;
struct stasis_cache_entry * entry = obj ;
size_t idx ;
ao2_cleanup ( entry - > key . type ) ;
entry - > key . type = NULL ;
ast_free ( ( char * ) entry - > key . id ) ;
entry - > key . id = NULL ;
ao2_cleanup ( entry - > aggregate ) ;
entry - > aggregate = NULL ;
ao2_cleanup ( entry - > local ) ;
entry - > local = NULL ;
for ( idx = 0 ; idx < AST_VECTOR_SIZE ( & entry - > remote ) ; + + idx ) {
struct stasis_message * remote ;
remote = AST_VECTOR_GET ( & entry - > remote , idx ) ;
ao2_cleanup ( remote ) ;
}
AST_VECTOR_FREE ( & entry - > remote ) ;
}
static struct cache_entry * cache_entry_create ( struct stasis_message_type * type , const char * id , struct stasis_message * snapshot )
static struct stasis_ cache_entry * cache_entry_create ( struct stasis_message_type * type , const char * id , struct stasis_message * snapshot )
{
RAII_VAR ( struct cache_entry * , entry , NULL , ao2_cleanup ) ;
struct stasis_cache_entry * entry ;
int is_remote ;
ast_assert ( type ! = NULL ) ;
ast_assert ( id ! = NULL ) ;
ast_assert ( snapshot ! = NULL ) ;
entry = ao2_alloc_options ( sizeof ( * entry ) , cache_entry_dtor ,
AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
@ -154,173 +181,528 @@ static struct cache_entry *cache_entry_create(struct stasis_message_type *type,
return NULL ;
}
entry - > id = ast_strdup ( id ) ;
if ( ! entry - > id ) {
entry - > key . id = ast_strdup ( id ) ;
if ( ! entry - > key . id ) {
ao2_cleanup ( entry ) ;
return NULL ;
}
entry - > key . type = ao2_bump ( type ) ;
is_remote = ast_eid_cmp ( & ast_eid_default , stasis_message_eid ( snapshot ) ) ? 1 : 0 ;
if ( AST_VECTOR_INIT ( & entry - > remote , is_remote ) ) {
ao2_cleanup ( entry ) ;
return NULL ;
}
ao2_ref ( type , + 1 ) ;
entry - > type = type ;
if ( snapshot ! = NULL ) {
ao2_ref ( snapshot , + 1 ) ;
entry - > snapshot = snapshot ;
if ( is_remote ) {
if ( AST_VECTOR_APPEND ( & entry - > remote , snapshot ) ) {
ao2_cleanup ( entry ) ;
return NULL ;
}
} else {
entry - > local = snapshot ;
}
ao2_bump ( snapshot ) ;
ao2_ref ( entry , + 1 ) ;
return entry ;
}
static int cache_entry_hash ( const void * obj , int flags )
{
const struct cache_entry * entry = obj ;
const struct stasis_cache_entry * object ;
const struct cache_entry_key * key ;
int hash = 0 ;
ast_assert ( ! ( flags & OBJ_KEY ) ) ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_KEY :
key = obj ;
break ;
case OBJ_SEARCH_OBJECT :
object = obj ;
key = & object - > key ;
break ;
default :
/* Hash can only work on something with a full key. */
ast_assert ( 0 ) ;
return 0 ;
}
hash + = ast_hashtab_hash_string ( stasis_message_type_name ( entry - > type ) ) ;
hash + = ast_hashtab_hash_string ( entry - > id ) ;
hash + = ast_hashtab_hash_string ( stasis_message_type_name ( k ey- > type ) ) ;
hash + = ast_hashtab_hash_string ( k ey- > id ) ;
return hash ;
}
static int cache_entry_cmp ( void * obj , void * arg , int flags )
{
const struct cache_entry * left = obj ;
const struct cache_entry * right = arg ;
const struct stasis_cache_entry * object_left = obj ;
const struct stasis_cache_entry * object_right = arg ;
const struct cache_entry_key * right_key = obj ;
int cmp ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_OBJECT :
right_key = & object_right - > key ;
/* Fall through */
case OBJ_SEARCH_KEY :
cmp = object_left - > key . type ! = right_key - > type
| | strcmp ( object_left - > key . id , right_key - > id ) ;
break ;
case OBJ_SEARCH_PARTIAL_KEY :
/* Not supported by container */
ast_assert ( 0 ) ;
cmp = - 1 ;
break ;
default :
/*
* What arg points to is specific to this traversal callback
* and has no special meaning to astobj2 .
*/
cmp = 0 ;
break ;
}
if ( cmp ) {
return 0 ;
}
/*
* At this point the traversal callback is identical to a sorted
* container .
*/
return CMP_MATCH ;
}
static void cache_dtor ( void * obj )
{
struct stasis_cache * cache = obj ;
ast_assert ( ! ( flags & OBJ_KEY ) ) ;
ao2_cleanup ( cache - > entries ) ;
cache - > entries = NULL ;
}
if ( left - > type = = right - > type & & strcmp ( left - > id , right - > id ) = = 0 ) {
return CMP_MATCH | CMP_STOP ;
struct stasis_cache * stasis_cache_create_full ( snapshot_get_id id_fn ,
cache_aggregate_calc_fn aggregate_calc_fn ,
cache_aggregate_publish_fn aggregate_publish_fn )
{
struct stasis_cache * cache ;
cache = ao2_alloc_options ( sizeof ( * cache ) , cache_dtor ,
AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
if ( ! cache ) {
return NULL ;
}
return 0 ;
cache - > entries = ao2_container_alloc_hash ( AO2_ALLOC_OPT_LOCK_RWLOCK , 0 ,
NUM_CACHE_BUCKETS , cache_entry_hash , NULL , cache_entry_cmp ) ;
if ( ! cache - > entries ) {
ao2_cleanup ( cache ) ;
return NULL ;
}
cache - > id_fn = id_fn ;
cache - > aggregate_calc_fn = aggregate_calc_fn ;
cache - > aggregate_publish_fn = aggregate_publish_fn ;
return cache ;
}
static void cache_dtor ( void * obj )
st ruct stasis_cache * stasis_cache_create ( snapshot_get_id id_fn )
{
struct stasis_cache * cache = obj ;
return stasis_cache_create_full ( id_fn , NULL , NULL ) ;
}
ao2_cleanup ( cache - > entries ) ;
cache - > entries = NULL ;
struct stasis_message * stasis_cache_entry_get_aggregate ( struct stasis_cache_entry * entry )
{
return entry - > aggregate ;
}
struct stasis_cache * stasis_cache_create ( snapshot_get_id id_fn )
struct stasis_ message * stasis_cache_entry_get_local ( struct stasis_cache_entry * entry )
{
RAII_VAR ( struct stasis_cache * , cache , NULL , ao2_cleanup ) ;
return entry - > local ;
}
cache = ao2_alloc_options ( sizeof ( * cache ) , cache_dtor ,
AO2_ALLOC_OPT_LOCK_NOLOCK ) ;
if ( ! cache ) {
return NULL ;
}
struct stasis_message * stasis_cache_entry_get_remote ( struct stasis_cache_entry * entry , int idx )
{
if ( idx < AST_VECTOR_SIZE ( & entry - > remote ) ) {
return AST_VECTOR_GET ( & entry - > remote , idx ) ;
}
return NULL ;
}
cache - > entries = ao2_container_alloc ( NUM_CACHE_BUCKETS , cache_entry_hash ,
cache_entry_cmp ) ;
if ( ! cache - > entries ) {
return NULL ;
}
/*!
* \ internal
* \ brief Find the cache entry in the cache entries container .
*
* \ param entries Container of cached entries .
* \ param type Type of message to retrieve the cache entry .
* \ param id Identity of the snapshot to retrieve the cache entry .
*
* \ note The entries container is already locked .
*
* \ retval Cache - entry on success .
* \ retval NULL Not in cache .
*/
static struct stasis_cache_entry * cache_find ( struct ao2_container * entries , struct stasis_message_type * type , const char * id )
{
struct cache_entry_key search_key ;
search_key . type = type ;
search_key . id = id ;
return ao2_find ( entries , & search_key , OBJ_SEARCH_KEY | OBJ_NOLOCK ) ;
}
/*!
* \ internal
* \ brief Remove the stasis snapshot in the cache entry determined by eid .
*
* \ param entries Container of cached entries .
* \ param cached_entry The entry to remove the snapshot from .
* \ param eid Which snapshot in the cached entry .
*
* \ note The entries container is already locked .
*
* \ return Previous stasis entry snapshot .
*/
static struct stasis_message * cache_remove ( struct ao2_container * entries , struct stasis_cache_entry * cached_entry , const struct ast_eid * eid )
{
struct stasis_message * old_snapshot ;
int is_remote ;
is_remote = ast_eid_cmp ( eid , & ast_eid_default ) ;
if ( ! is_remote ) {
old_snapshot = cached_entry - > local ;
cached_entry - > local = NULL ;
} else {
int idx ;
old_snapshot = NULL ;
for ( idx = 0 ; idx < AST_VECTOR_SIZE ( & cached_entry - > remote ) ; + + idx ) {
struct stasis_message * cur ;
cur = AST_VECTOR_GET ( & cached_entry - > remote , idx ) ;
if ( ! ast_eid_cmp ( eid , stasis_message_eid ( cur ) ) ) {
old_snapshot = AST_VECTOR_REMOVE_UNORDERED ( & cached_entry - > remote , idx ) ;
break ;
}
}
}
if ( ! cached_entry - > local & & ! AST_VECTOR_SIZE ( & cached_entry - > remote ) ) {
ao2_unlink_flags ( entries , cached_entry , OBJ_NOLOCK ) ;
}
return old_snapshot ;
}
cache - > id_fn = id_fn ;
/*!
* \ internal
* \ brief Update the stasis snapshot in the cache entry determined by eid .
*
* \ param cached_entry The entry to remove the snapshot from .
* \ param eid Which snapshot in the cached entry .
* \ param new_snapshot Snapshot to replace the old snapshot .
*
* \ return Previous stasis entry snapshot .
*/
static struct stasis_message * cache_udpate ( struct stasis_cache_entry * cached_entry , const struct ast_eid * eid , struct stasis_message * new_snapshot )
{
struct stasis_message * old_snapshot ;
int is_remote ;
int idx ;
is_remote = ast_eid_cmp ( eid , & ast_eid_default ) ;
if ( ! is_remote ) {
old_snapshot = cached_entry - > local ;
cached_entry - > local = ao2_bump ( new_snapshot ) ;
return old_snapshot ;
}
old_snapshot = NULL ;
for ( idx = 0 ; idx < AST_VECTOR_SIZE ( & cached_entry - > remote ) ; + + idx ) {
struct stasis_message * cur ;
ao2_ref ( cache , + 1 ) ;
return cache ;
cur = AST_VECTOR_GET ( & cached_entry - > remote , idx ) ;
if ( ! ast_eid_cmp ( eid , stasis_message_eid ( cur ) ) ) {
old_snapshot = AST_VECTOR_REMOVE_UNORDERED ( & cached_entry - > remote , idx ) ;
break ;
}
}
if ( ! AST_VECTOR_APPEND ( & cached_entry - > remote , new_snapshot ) ) {
ao2_bump ( new_snapshot ) ;
}
return old_snapshot ;
}
static struct stasis_message * cache_put ( struct stasis_cache * cache ,
struct stasis_message_type * type , const char * id ,
struct cache_put_snapshots {
/*! Old cache eid snapshot. */
struct stasis_message * old ;
/*! Old cache aggregate snapshot. */
struct stasis_message * aggregate_old ;
/*! New cache aggregate snapshot. */
struct stasis_message * aggregate_new ;
} ;
static struct cache_put_snapshots cache_put ( struct stasis_cache * cache ,
struct stasis_message_type * type , const char * id , const struct ast_eid * eid ,
struct stasis_message * new_snapshot )
{
RAII_VAR ( struct cache_entry * , new_entry , NULL , ao2_cleanup ) ;
RAII_VAR ( struct cache_entry * , cached_entry , NULL , ao2_cleanup ) ;
struct stasis_message * old_snapshot = NULL ;
struct stasis_cache_entry * cached_entry ;
struct cache_put_snapshots snapshots ;
ast_assert ( cache - > entries ! = NULL ) ;
ast_assert ( eid ! = NULL ) ; /* Aggregate snapshots not allowed to be put directly. */
ast_assert ( new_snapshot = = NULL | |
type = = stasis_message_type ( new_snapshot ) ) ;
new_entry = cache_entry_create ( type , id , new_snapshot ) ;
memset ( & snapshots , 0 , sizeof ( snapshots ) ) ;
ao2_wrlock ( cache - > entries ) ;
cached_entry = cache_find ( cache - > entries , type , id ) ;
if ( new_snapshot = = NULL ) {
/* Remove entry from cache */
cached_entry = ao2_find ( cache - > entries , new_entry , OBJ_POINTER | OBJ_UNLINK ) ;
/* Update the eid snapshot. */
if ( ! new_snapshot ) {
/* Remove snapshot from cache */
if ( cached_entry ) {
old_snapshot = cached_entry - > snapshot ;
cached_entry - > snapshot = NULL ;
snapshots . old = cache_remove ( cache - > entries , cached_entry , eid ) ;
}
} else if ( cached_entry ) {
/* Update snapshot in cache */
snapshots . old = cache_udpate ( cached_entry , eid , new_snapshot ) ;
} else {
/* Insert/update cache */
SCOPED_AO2LOCK ( lock , cache - > entries ) ;
cached_entry = ao2_find ( cache - > entries , new_entry , OBJ_POINTER | OBJ_NOLOCK ) ;
/* Insert into the cache */
cached_entry = cache_entry_create ( type , id , new_snapshot ) ;
if ( cached_entry ) {
/* Update cache. Because objects are moving, no need to update refcounts. */
old_snapshot = cached_entry - > snapshot ;
cached_entry - > snapshot = new_entry - > snapshot ;
new_entry - > snapshot = NULL ;
} else {
/* Insert into the cache */
ao2_link_flags ( cache - > entries , new_entry , OBJ_NOLOCK ) ;
ao2_link_flags ( cache - > entries , cached_entry , OBJ_NOLOCK ) ;
}
}
/* Update the aggregate snapshot. */
if ( cache - > aggregate_calc_fn & & cached_entry ) {
snapshots . aggregate_new = cache - > aggregate_calc_fn ( cached_entry , new_snapshot ) ;
snapshots . aggregate_old = cached_entry - > aggregate ;
cached_entry - > aggregate = ao2_bump ( snapshots . aggregate_new ) ;
}
return old_snapshot ;
ao2_unlock ( cache - > entries ) ;
ao2_cleanup ( cached_entry ) ;
return snapshots ;
}
struct stasis_message * stasis_cache_get ( struct stasis_cache * cache , struct stasis_message_type * type , const char * id )
/*!
* \ internal
* \ brief Dump all entity snapshots in the cache entry into the given container .
*
* \ param snapshots Container to put all snapshots in the cache entry .
* \ param entry Cache entry to use .
*
* \ retval 0 on success .
* \ retval non - zero on error .
*/
static int cache_entry_dump ( struct ao2_container * snapshots , const struct stasis_cache_entry * entry )
{
int idx ;
int err = 0 ;
ast_assert ( snapshots ! = NULL ) ;
ast_assert ( entry ! = NULL ) ;
/* The aggregate snapshot is not a snapshot from an entity. */
if ( entry - > local ) {
err | = ! ao2_link ( snapshots , entry - > local ) ;
}
for ( idx = 0 ; ! err & & idx < AST_VECTOR_SIZE ( & entry - > remote ) ; + + idx ) {
struct stasis_message * snapshot ;
snapshot = AST_VECTOR_GET ( & entry - > remote , idx ) ;
err | = ! ao2_link ( snapshots , snapshot ) ;
}
return err ;
}
struct ao2_container * stasis_cache_get_all ( struct stasis_cache * cache , struct stasis_message_type * type , const char * id )
{
RAII_VAR ( struct cache_entry * , search_entry , NULL , ao2_cleanup ) ;
RAII_VAR ( struct cache_entry * , cached_entry , NULL , ao2_cleanup ) ;
struct stasis_cache_entry * cached_entry ;
struct ao2_container * found ;
ast_assert ( cache ! = NULL ) ;
ast_assert ( cache - > entries ! = NULL ) ;
ast_assert ( type ! = NULL ) ;
ast_assert ( id ! = NULL ) ;
search_entry = cache_entry_create ( type , id , NULL ) ;
if ( search_entry = = NULL ) {
found = ao2_container_alloc_list ( AO2_ALLOC_OPT_LOCK_NOLOCK , 0 , NULL , NULL ) ;
if ( ! found ) {
return NULL ;
}
cached_entry = ao2_find ( cache - > entries , search_entry , OBJ_POINTER ) ;
if ( cached_entry = = NULL ) {
return NULL ;
ao2_rdlock ( cache - > entries ) ;
cached_entry = cache_find ( cache - > entries , type , id ) ;
if ( cached_entry & & cache_entry_dump ( found , cached_entry ) ) {
ao2_cleanup ( found ) ;
found = NULL ;
}
ao2_unlock ( cache - > entries ) ;
ao2_cleanup ( cached_entry ) ;
return found ;
}
/*!
* \ internal
* \ brief Retrieve an item from the cache entry for a specific eid .
*
* \ param entry Cache entry to use .
* \ param eid Specific entity id to retrieve . NULL for aggregate .
*
* \ note The returned snapshot has not had its reference bumped .
*
* \ retval Snapshot from the cache .
* \ retval \ c NULL if snapshot is not found .
*/
static struct stasis_message * cache_entry_by_eid ( const struct stasis_cache_entry * entry , const struct ast_eid * eid )
{
int is_remote ;
int idx ;
if ( ! eid ) {
/* Get aggregate. */
return entry - > aggregate ;
}
/* Get snapshot with specific eid. */
is_remote = ast_eid_cmp ( eid , & ast_eid_default ) ;
if ( ! is_remote ) {
return entry - > local ;
}
ast_assert ( cached_entry - > snapshot ! = NULL ) ;
ao2_ref ( cached_entry - > snapshot , + 1 ) ;
return cached_entry - > snapshot ;
for ( idx = 0 ; idx < AST_VECTOR_SIZE ( & entry - > remote ) ; + + idx ) {
struct stasis_message * cur ;
cur = AST_VECTOR_GET ( & entry - > remote , idx ) ;
if ( ! ast_eid_cmp ( eid , stasis_message_eid ( cur ) ) ) {
return cur ;
}
}
return NULL ;
}
struct stasis_message * stasis_cache_get_by_eid ( struct stasis_cache * cache , struct stasis_message_type * type , const char * id , const struct ast_eid * eid )
{
struct stasis_cache_entry * cached_entry ;
struct stasis_message * snapshot = NULL ;
ast_assert ( cache ! = NULL ) ;
ast_assert ( cache - > entries ! = NULL ) ;
ast_assert ( type ! = NULL ) ;
ast_assert ( id ! = NULL ) ;
ao2_rdlock ( cache - > entries ) ;
cached_entry = cache_find ( cache - > entries , type , id ) ;
if ( cached_entry ) {
snapshot = cache_entry_by_eid ( cached_entry , eid ) ;
ao2_bump ( snapshot ) ;
}
ao2_unlock ( cache - > entries ) ;
ao2_cleanup ( cached_entry ) ;
return snapshot ;
}
struct stasis_message * stasis_cache_get ( struct stasis_cache * cache , struct stasis_message_type * type , const char * id )
{
return stasis_cache_get_by_eid ( cache , type , id , & ast_eid_default ) ;
}
struct cache_dump_data {
struct ao2_container * cached ;
struct ao2_container * c ontainer ;
struct stasis_message_type * type ;
const struct ast_eid * eid ;
} ;
static int cache_dump_cb ( void * obj , void * arg , int flags )
static int cache_dump_ by_eid_ cb( void * obj , void * arg , int flags )
{
struct cache_dump_data * cache_dump = arg ;
struct cache_entry * entry = obj ;
struct stasis_cache_entry * entry = obj ;
if ( ! cache_dump - > type | | entry - > key . type = = cache_dump - > type ) {
struct stasis_message * snapshot ;
if ( ! cache_dump - > type | | entry - > type = = cache_dump - > type ) {
ao2_link ( cache_dump - > cached , entry - > snapshot ) ;
snapshot = cache_entry_by_eid ( entry , cache_dump - > eid ) ;
if ( snapshot ) {
if ( ! ao2_link ( cache_dump - > container , snapshot ) ) {
ao2_cleanup ( cache_dump - > container ) ;
cache_dump - > container = NULL ;
return CMP_STOP ;
}
}
}
return 0 ;
}
struct ao2_container * stasis_cache_dump_by_eid ( struct stasis_cache * cache , struct stasis_message_type * type , const struct ast_eid * eid )
{
struct cache_dump_data cache_dump ;
ast_assert ( cache ! = NULL ) ;
ast_assert ( cache - > entries ! = NULL ) ;
cache_dump . eid = eid ;
cache_dump . type = type ;
cache_dump . container = ao2_container_alloc_list ( AO2_ALLOC_OPT_LOCK_NOLOCK , 0 , NULL , NULL ) ;
if ( ! cache_dump . container ) {
return NULL ;
}
ao2_callback ( cache - > entries , OBJ_MULTIPLE | OBJ_NODATA , cache_dump_by_eid_cb , & cache_dump ) ;
return cache_dump . container ;
}
struct ao2_container * stasis_cache_dump ( struct stasis_cache * cache , struct stasis_message_type * type )
{
return stasis_cache_dump_by_eid ( cache , type , & ast_eid_default ) ;
}
static int cache_dump_all_cb ( void * obj , void * arg , int flags )
{
struct cache_dump_data * cache_dump = arg ;
struct stasis_cache_entry * entry = obj ;
if ( ! cache_dump - > type | | entry - > key . type = = cache_dump - > type ) {
if ( cache_entry_dump ( cache_dump - > container , entry ) ) {
ao2_cleanup ( cache_dump - > container ) ;
cache_dump - > container = NULL ;
return CMP_STOP ;
}
}
return 0 ;
}
struct ao2_container * stasis_cache_dump_all ( struct stasis_cache * cache , struct stasis_message_type * type )
{
struct cache_dump_data cache_dump ;
ast_assert ( cache ! = NULL ) ;
ast_assert ( cache - > entries ! = NULL ) ;
cache_dump . eid = NULL ;
cache_dump . type = type ;
cache_dump . cached = ao2_container_alloc_options (
AO2_ALLOC_OPT_LOCK_NOLOCK , 1 , NULL , NULL ) ;
if ( ! cache_dump . cached ) {
cache_dump . container = ao2_container_alloc_list ( AO2_ALLOC_OPT_LOCK_NOLOCK , 0 , NULL , NULL ) ;
if ( ! cache_dump . container ) {
return NULL ;
}
ao2_callback ( cache - > entries , OBJ_MULTIPLE | OBJ_NODATA , cache_dump_cb , & cache_dump ) ;
return cache_dump . cached ;
ao2_callback ( cache - > entries , OBJ_MULTIPLE | OBJ_NODATA , cache_dump_ all_ cb, & cache_dump ) ;
return cache_dump . c ontainer ;
}
STASIS_MESSAGE_TYPE_DEFN ( stasis_cache_clear_type ) ;
@ -380,9 +762,13 @@ static struct stasis_message *update_create(struct stasis_message *old_snapshot,
static void caching_topic_exec ( void * data , struct stasis_subscription * sub ,
struct stasis_message * message )
{
RAII_VAR ( struct stasis_caching_topic * , caching_topic_needs_unref , NULL , ao2_cleanup ) ;
struct stasis_caching_topic * caching_topic_needs_unref ;
struct stasis_caching_topic * caching_topic = data ;
const char * id = NULL ;
struct stasis_message * msg ;
struct stasis_message * msg_put ;
struct stasis_message_type * msg_type ;
const struct ast_eid * msg_eid ;
const char * msg_id ;
ast_assert ( caching_topic ! = NULL ) ;
ast_assert ( caching_topic - > topic ! = NULL ) ;
@ -391,50 +777,62 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
if ( stasis_subscription_final_message ( sub , message ) ) {
caching_topic_needs_unref = caching_topic ;
} else {
caching_topic_needs_unref = NULL ;
}
/* Handle cache clear event */
if ( stasis_cache_clear_type ( ) = = stasis_message_type ( message ) ) {
RAII_VAR ( struct stasis_message * , old_snapshot , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , update , NULL , ao2_cleanup ) ;
struct stasis_message * clear_msg = stasis_message_data ( message ) ;
const char * clear_id = caching_topic - > cache - > id_fn ( clear_msg ) ;
struct stasis_message_type * clear_type = stasis_message_type ( clear_msg ) ;
msg_type = stasis_message_type ( message ) ;
if ( stasis_cache_clear_type ( ) = = msg_type ) {
/* Cache clear event. */
msg_put = NULL ;
msg = stasis_message_data ( message ) ;
msg_type = stasis_message_type ( msg ) ;
} else {
/* Normal cache update event. */
msg_put = message ;
msg = message ;
}
ast_assert ( msg_type ! = NULL ) ;
ast_assert ( clear_type ! = NULL ) ;
msg_eid = stasis_message_eid ( msg ) ; /* msg_eid is NULL for aggregate message. */
msg_id = caching_topic - > cache - > id_fn ( msg ) ;
if ( msg_id & & msg_eid ) {
struct stasis_message * update ;
struct cache_put_snapshots snapshots ;
if ( clear_id ) {
old_snapshot = cache_put ( caching_topic - > cache , clear_type , clear_id , NULL ) ;
if ( old_snapshot ) {
update = update_create ( old_snapshot , NULL ) ;
/* Update the cache */
snapshots = cache_put ( caching_topic - > cache , msg_type , msg_id , msg_eid , msg_put ) ;
if ( snapshots . old | | msg_put ) {
update = update_create ( snapshots . old , msg_put ) ;
if ( update ) {
stasis_publish ( caching_topic - > topic , update ) ;
return ;
}
ao2_cleanup ( update ) ;
} else {
ast_log ( LOG_ERROR ,
" Attempting to remove an item from the %s cache that isn't there: %s %s \n " ,
stasis_topic_name ( caching_topic - > topic ) , stasis_message_type_name ( clear_type ) , clear_id ) ;
return ;
stasis_topic_name ( caching_topic - > topic ) ,
stasis_message_type_name ( msg_type ) , msg_id ) ;
}
}
id = caching_topic - > cache - > id_fn ( message ) ;
if ( id = = NULL ) {
/* Object isn't cached; discard */
} else {
/* Update the cache */
RAII_VAR ( struct stasis_message * , old_snapshot , NULL , ao2_cleanup ) ;
RAII_VAR ( struct stasis_message * , update , NULL , ao2_cleanup ) ;
old_snapshot = cache_put ( caching_topic - > cache , stasis_message_type ( message ) , id , message ) ;
update = update_create ( old_snapshot , message ) ;
if ( update = = NULL ) {
return ;
if ( snapshots . aggregate_old ! = snapshots . aggregate_new ) {
if ( snapshots . aggregate_new & & caching_topic - > cache - > aggregate_publish_fn ) {
caching_topic - > cache - > aggregate_publish_fn ( caching_topic - > original_topic ,
snapshots . aggregate_new ) ;
}
update = update_create ( snapshots . aggregate_old , snapshots . aggregate_new ) ;
if ( update ) {
stasis_publish ( caching_topic - > topic , update ) ;
}
ao2_cleanup ( update ) ;
}
stasis_publish ( caching_topic - > topic , update ) ;
ao2_cleanup ( snapshots . old ) ;
ao2_cleanup ( snapshots . aggregate_old ) ;
ao2_cleanup ( snapshots . aggregate_new ) ;
}
ao2_cleanup ( caching_topic_needs_unref ) ;
}
struct stasis_caching_topic * stasis_caching_topic_create ( struct stasis_topic * original_topic , struct stasis_cache * cache )