@ -33,6 +33,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
# include "asterisk/astobj2.h"
# include "asterisk/stasis_message_router.h"
# include "asterisk/vector.h"
/*! \internal */
struct stasis_message_route {
@ -44,19 +45,13 @@ struct stasis_message_route {
void * data ;
} ;
struct route_table {
/*! Current number of entries in the route table */
size_t current_size ;
/*! Allocated number of entires in the route table */
size_t max_size ;
/*! The route table itself */
struct stasis_message_route routes [ ] ;
} ;
ast_vector ( route_table , struct stasis_message_route ) ;
static struct stasis_message_route * table_find_route ( struct route_table * table ,
static struct stasis_message_route * route_table_find ( struct route_table * table ,
struct stasis_message_type * message_type )
{
size_t idx ;
struct stasis_message_route * route ;
/* While a linear search for routes may seem very inefficient, most
* route tables have six routes or less . For such small data , it ' s
@ -64,59 +59,74 @@ static struct stasis_message_route *table_find_route(struct route_table *table,
* tables , then we can look into containers with more efficient
* lookups .
*/
for ( idx = 0 ; idx < table - > current_size ; + + idx ) {
if ( table - > routes [ idx ] . message_type = = message_type ) {
return & table - > routes [ idx ] ;
for ( idx = 0 ; idx < ast_vector_size ( table ) ; + + idx ) {
route = ast_vector_get_addr ( table , idx ) ;
if ( route - > message_type = = message_type ) {
return route ;
}
}
return NULL ;
}
static int table_add_route ( struct route_table * * table_ptr ,
/*!
* \ brief route_table comparator for ast_vector_remove_cmp_unordered ( )
*
* \ param elem Element to compare against
* \ param value Value to compare with the vector element .
*
* \ return 0 if element does not match .
* \ return Non - zero if element matches .
*/
# define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
/*!
* \ brief route_table vector element cleanup .
*
* \ param elem Element to cleanup
*
* \ return Nothing
*/
# define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
static int route_table_remove ( struct route_table * table ,
struct stasis_message_type * message_type )
{
return ast_vector_remove_cmp_unordered ( table , message_type , ROUTE_TABLE_ELEM_CMP ,
ROUTE_TABLE_ELEM_CLEANUP ) ;
}
static int route_table_add ( struct route_table * table ,
struct stasis_message_type * message_type ,
stasis_subscription_cb callback , void * data )
{
struct route_table * table = * table_ptr ;
struct stasis_message_route * route ;
ast_assert ( table_find_route ( table , message_type ) = = NULL ) ;
if ( table - > current_size + 1 > table - > max_size ) {
size_t new_max_size = table - > max_size ? table - > max_size * 2 : 1 ;
struct route_table * new_table = ast_realloc ( table ,
sizeof ( * new_table ) +
sizeof ( new_table - > routes [ 0 ] ) * new_max_size ) ;
if ( ! new_table ) {
return - 1 ;
}
* table_ptr = table = new_table ;
table - > max_size = new_max_size ;
}
struct stasis_message_route route ;
int res ;
route = & table - > routes [ table - > current_size + + ] ;
ast_assert ( callback ! = NULL ) ;
ast_assert ( route_table_find ( table , message_type ) = = NULL ) ;
route - > message_type = ao2_bump ( message_type ) ;
route - > callback = callback ;
route - > data = data ;
route . message_type = ao2_bump ( message_type ) ;
route . callback = callback ;
route . data = data ;
return 0 ;
res = ast_vector_append ( table , route ) ;
if ( res ) {
ROUTE_TABLE_ELEM_CLEANUP ( route ) ;
}
return res ;
}
static int table_remove_route ( struct route_table * table ,
struct stasis_message_type * message_type )
static void route_table_dtor ( struct route_table * table )
{
size_t idx ;
struct stasis_message_route * route ;
for ( idx = 0 ; idx < table - > current_size ; + + idx ) {
if ( table - > routes [ idx ] . message_type = = message_type ) {
ao2_cleanup ( message_type ) ;
table - > routes [ idx ] =
table - > routes [ - - table - > current_size ] ;
return 0 ;
}
for ( idx = 0 ; idx < ast_vector_size ( table ) ; + + idx ) {
route = ast_vector_get_addr ( table , idx ) ;
ROUTE_TABLE_ELEM_CLEANUP ( * route ) ;
}
return - 1 ;
ast_vector_free ( table ) ;
}
/*! \internal */
@ -124,9 +134,9 @@ struct stasis_message_router {
/*! Subscription to the upstream topic */
struct stasis_subscription * subscription ;
/*! Subscribed routes */
struct route_table * routes ;
struct route_table routes ;
/*! Subscribed routes for \ref stasis_cache_update messages */
struct route_table * cache_routes ;
struct route_table cache_routes ;
/*! Route of last resort */
struct stasis_message_route default_route ;
} ;
@ -137,13 +147,11 @@ static void router_dtor(void *obj)
ast_assert ( ! stasis_subscription_is_subscribed ( router - > subscription ) ) ;
ast_assert ( stasis_subscription_is_done ( router - > subscription ) ) ;
router - > subscription = NULL ;
ast_free ( router - > routes ) ;
router - > routes = NULL ;
router - > subscription = NULL ;
ast_free( router - > cache_ routes) ;
router - > cache_routes = NULL ;
route_table_dtor( & router - > routes) ;
route_table_dtor ( & router - > cache_routes ) ;
}
static int find_route (
@ -161,12 +169,12 @@ static int find_route(
/* Find a cache route */
struct stasis_cache_update * update =
stasis_message_data ( message ) ;
route = table_find_route ( router - > cache_routes , update - > type ) ;
route = route_ table_find( & router - > cache_routes , update - > type ) ;
}
if ( route = = NULL ) {
/* Find a regular route */
route = table_find_route ( router - > routes , type ) ;
route = route_ table_find( & router - > routes , type ) ;
}
if ( route = = NULL & & router - > default_route . callback ) {
@ -201,6 +209,7 @@ static void router_dispatch(void *data,
struct stasis_message_router * stasis_message_router_create (
struct stasis_topic * topic )
{
int res ;
RAII_VAR ( struct stasis_message_router * , router , NULL , ao2_cleanup ) ;
router = ao2_alloc ( sizeof ( * router ) , router_dtor ) ;
@ -208,13 +217,10 @@ struct stasis_message_router *stasis_message_router_create(
return NULL ;
}
router - > routes = ast_calloc ( 1 , sizeof ( * router - > routes ) ) ;
if ( ! router - > routes ) {
return NULL ;
}
router - > cache_routes = ast_calloc ( 1 , sizeof ( * router - > cache_routes ) ) ;
if ( ! router - > cache_routes ) {
res = 0 ;
res | = ast_vector_init ( & router - > routes , 0 ) ;
res | = ast_vector_init ( & router - > cache_routes , 0 ) ;
if ( res ) {
return NULL ;
}
@ -259,40 +265,78 @@ int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type * message_type ,
stasis_subscription_cb callback , void * data )
{
SCOPED_AO2LOCK ( lock , router ) ;
return table_add_route ( & router - > routes , message_type , callback , data ) ;
int res ;
ast_assert ( router ! = NULL ) ;
if ( ! message_type ) {
/* Cannot route to NULL type. */
return - 1 ;
}
ao2_lock ( router ) ;
res = route_table_add ( & router - > routes , message_type , callback , data ) ;
ao2_unlock ( router ) ;
return res ;
}
int stasis_message_router_add_cache_update ( struct stasis_message_router * router ,
struct stasis_message_type * message_type ,
stasis_subscription_cb callback , void * data )
{
SCOPED_AO2LOCK ( lock , router ) ;
return table_add_route ( & router - > cache_routes , message_type , callback , data ) ;
int res ;
ast_assert ( router ! = NULL ) ;
if ( ! message_type ) {
/* Cannot cache a route to NULL type. */
return - 1 ;
}
ao2_lock ( router ) ;
res = route_table_add ( & router - > cache_routes , message_type , callback , data ) ;
ao2_unlock ( router ) ;
return res ;
}
void stasis_message_router_remove ( struct stasis_message_router * router ,
struct stasis_message_type * message_type )
{
SCOPED_AO2LOCK ( lock , router ) ;
table_remove_route ( router - > routes , message_type ) ;
ast_assert ( router ! = NULL ) ;
if ( ! message_type ) {
/* Cannot remove a NULL type. */
return ;
}
ao2_lock ( router ) ;
route_table_remove ( & router - > routes , message_type ) ;
ao2_unlock ( router ) ;
}
void stasis_message_router_remove_cache_update (
struct stasis_message_router * router ,
struct stasis_message_type * message_type )
{
SCOPED_AO2LOCK ( lock , router ) ;
table_remove_route ( router - > cache_routes , message_type ) ;
ast_assert ( router ! = NULL ) ;
if ( ! message_type ) {
/* Cannot remove a NULL type. */
return ;
}
ao2_lock ( router ) ;
route_table_remove ( & router - > cache_routes , message_type ) ;
ao2_unlock ( router ) ;
}
int stasis_message_router_set_default ( struct stasis_message_router * router ,
stasis_subscription_cb callback ,
void * data )
stasis_subscription_cb callback ,
void * data )
{
SCOPED_AO2LOCK ( lock , router ) ;
ast_assert ( router ! = NULL ) ;
ast_assert ( callback ! = NULL ) ;
ao2_lock ( router ) ;
router - > default_route . callback = callback ;
router - > default_route . data = data ;
ao2_unlock ( router ) ;
/* While this implementation can never fail, it used to be able to */
return 0 ;
}