@ -48,6 +48,7 @@ struct stasis_cache {
snapshot_get_id id_fn ;
cache_aggregate_calc_fn aggregate_calc_fn ;
cache_aggregate_publish_fn aggregate_publish_fn ;
int registered ;
} ;
/*! \internal */
@ -69,6 +70,8 @@ static void stasis_caching_topic_dtor(void *obj)
* be bad . */
ast_assert ( stasis_subscription_is_done ( caching_topic - > sub ) ) ;
ao2_container_unregister ( stasis_topic_name ( caching_topic - > topic ) ) ;
ao2_cleanup ( caching_topic - > sub ) ;
caching_topic - > sub = NULL ;
ao2_cleanup ( caching_topic - > cache ) ;
@ -813,7 +816,31 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
}
msg_type = stasis_message_type ( message ) ;
if ( stasis_cache_clear_type ( ) = = msg_type ) {
if ( stasis_subscription_change_type ( ) = = msg_type ) {
struct stasis_subscription_change * change = stasis_message_data ( message ) ;
/*
* If this change type is an unsubscribe , we need to find the original
* subscribe and remove it from the cache otherwise the cache will
* continue to grow unabated .
*/
if ( strcmp ( change - > description , " Unsubscribe " ) = = 0 ) {
struct stasis_cache_entry * sub ;
ao2_wrlock ( caching_topic - > cache - > entries ) ;
sub = cache_find ( caching_topic - > cache - > entries , stasis_subscription_change_type ( ) , change - > uniqueid ) ;
if ( sub ) {
cache_remove ( caching_topic - > cache - > entries , sub , stasis_message_eid ( message ) ) ;
ao2_cleanup ( sub ) ;
}
ao2_unlock ( caching_topic - > cache - > entries ) ;
ao2_cleanup ( caching_topic_needs_unref ) ;
return ;
}
msg_put = message ;
msg = message ;
} else if ( stasis_cache_clear_type ( ) = = msg_type ) {
/* Cache clear event. */
msg_put = NULL ;
msg = stasis_message_data ( message ) ;
@ -866,6 +893,17 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
ao2_cleanup ( caching_topic_needs_unref ) ;
}
static void print_cache_entry ( void * v_obj , void * where , ao2_prnt_fn * prnt )
{
struct stasis_cache_entry * entry = v_obj ;
if ( ! entry ) {
return ;
}
prnt ( where , " Type: %s ID: %s Hash: %u " , stasis_message_type_name ( entry - > key . type ) ,
entry - > key . id , entry - > key . hash ) ;
}
struct stasis_caching_topic * stasis_caching_topic_create ( struct stasis_topic * original_topic , struct stasis_cache * cache )
{
struct stasis_caching_topic * caching_topic ;
@ -886,15 +924,24 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
}
caching_topic - > topic = stasis_topic_create ( new_name ) ;
ast_free ( new_name ) ;
if ( caching_topic - > topic = = NULL ) {
ao2_ref ( caching_topic , - 1 ) ;
ast_free ( new_name ) ;
return NULL ;
}
ao2_ref ( cache , + 1 ) ;
caching_topic - > cache = cache ;
if ( ! cache - > registered ) {
if ( ao2_container_register ( new_name , cache - > entries , print_cache_entry ) ) {
ast_log ( LOG_ERROR , " Stasis cache container '%p' for '%s' did not register \n " ,
cache - > entries , new_name ) ;
} else {
cache - > registered = 1 ;
}
}
ast_free ( new_name ) ;
caching_topic - > sub = internal_stasis_subscribe ( original_topic , caching_topic_exec , caching_topic , 0 , 0 ) ;
if ( caching_topic - > sub = = NULL ) {