@ -61,6 +61,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
# include "asterisk/module.h"
# include "asterisk/stasis_app_impl.h"
# include "asterisk/stasis_channels.h"
# include "asterisk/stasis_bridging.h"
# include "asterisk/stasis_message_router.h"
# include "asterisk/strings.h"
# include "stasis/app.h"
@ -89,9 +90,14 @@ struct ao2_container *apps_registry;
struct ao2_container * app_controls ;
struct ao2_container * app_bridges ;
/*! \brief Message router for the channel caching topic */
struct stasis_message_router * channel_router ;
/*! \brief Message router for the bridge caching topic */
struct stasis_message_router * bridge_router ;
/*! AO2 hash function for \ref app */
static int app_hash ( const void * obj , const int flags )
{
@ -159,9 +165,42 @@ struct stasis_app_control *stasis_app_control_find_by_channel_id(
return ao2_find ( app_controls , channel_id , OBJ_KEY ) ;
}
/*! AO2 hash function for bridges container */
static int bridges_hash ( const void * obj , const int flags )
{
const struct ast_bridge * bridge = obj ;
const char * id = flags & OBJ_KEY ?
obj : bridge - > uniqueid ;
return ast_str_hash ( id ) ;
}
/*! AO2 comparison function for bridges container */
static int bridges_compare ( void * lhs , void * rhs , int flags )
{
const struct ast_bridge * lhs_bridge = lhs ;
const struct ast_bridge * rhs_bridge = rhs ;
const char * lhs_id = lhs_bridge - > uniqueid ;
const char * rhs_id = flags & OBJ_KEY ?
rhs : rhs_bridge - > uniqueid ;
if ( strcmp ( lhs_id , rhs_id ) = = 0 ) {
return CMP_MATCH | CMP_STOP ;
} else {
return 0 ;
}
}
struct ast_bridge * stasis_app_bridge_find_by_id (
const char * bridge_id )
{
return ao2_find ( app_bridges , bridge_id , OBJ_KEY ) ;
}
/*! \brief Typedef for blob handler callbacks */
typedef struct ast_json * ( * channel_blob_handler_cb ) ( struct ast_channel_blob * ) ;
/*! \brief Callback to check whether an app is watching a given channel */
static int app_watching_channel_cb ( void * obj , void * arg , int flags )
{
struct app * app = obj ;
@ -170,7 +209,8 @@ static int app_watching_channel_cb(void *obj, void *arg, int flags)
return app_is_watching_channel ( app , uniqueid ) ? CMP_MATCH : 0 ;
}
static struct ao2_container * get_watching_apps ( const char * uniqueid )
/*! \brief Get a container full of apps that are interested in the specified channel */
static struct ao2_container * get_apps_watching_channel ( const char * uniqueid )
{
struct ao2_container * watching_apps ;
char * uniqueid_dup ;
@ -302,7 +342,7 @@ static int app_send_cb(void *obj, void *arg, int flags)
return 0 ;
}
static void sub_ snapshot_handler( void * data ,
static void sub_ channel_ snapshot_handler( void * data ,
struct stasis_subscription * sub ,
struct stasis_topic * topic ,
struct stasis_message * message )
@ -313,7 +353,7 @@ static void sub_snapshot_handler(void *data,
struct ast_channel_snapshot * old_snapshot = stasis_message_data ( update - > old_snapshot ) ;
int i ;
watching_apps = get_ watching_apps ( new_snapshot ? new_snapshot - > uniqueid : old_snapshot - > uniqueid ) ;
watching_apps = get_ apps_watching_channel ( new_snapshot ? new_snapshot - > uniqueid : old_snapshot - > uniqueid ) ;
if ( ! watching_apps ) {
return ;
}
@ -342,7 +382,7 @@ static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_hand
return ;
}
watching_apps = get_ watching_apps ( obj - > snapshot - > uniqueid ) ;
watching_apps = get_ apps_watching_channel ( obj - > snapshot - > uniqueid ) ;
if ( ! watching_apps ) {
return ;
}
@ -357,7 +397,7 @@ static void generic_blob_handler(struct ast_channel_blob *obj, channel_blob_hand
/*!
* \ brief In addition to running ao2_cleanup ( ) , this function also removes the
* object from the app_controls ( ) container .
* object from the app_controls container .
*/
static void control_unlink ( struct stasis_app_control * control )
{
@ -370,6 +410,38 @@ static void control_unlink(struct stasis_app_control *control)
ao2_cleanup ( control ) ;
}
struct ast_bridge * stasis_app_bridge_create ( const char * type )
{
struct ast_bridge * bridge ;
int capabilities , flags = 0 ;
if ( ast_strlen_zero ( type ) | | ! strcmp ( type , " mixing " ) ) {
capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX |
AST_BRIDGE_CAPABILITY_MULTIMIX |
AST_BRIDGE_CAPABILITY_NATIVE ;
flags = AST_BRIDGE_FLAG_SMART ;
} else if ( ! strcmp ( type , " holding " ) ) {
capabilities = AST_BRIDGE_CAPABILITY_HOLDING ;
} else {
return NULL ;
}
bridge = ast_bridge_base_new ( capabilities , flags ) ;
if ( bridge ) {
ao2_link ( app_bridges , bridge ) ;
}
return bridge ;
}
void stasis_app_bridge_destroy ( const char * bridge_id )
{
struct ast_bridge * bridge = stasis_app_bridge_find_by_id ( bridge_id ) ;
if ( ! bridge ) {
return ;
}
ao2_unlink ( app_bridges , bridge ) ;
ast_bridge_destroy ( bridge ) ;
}
int app_send_start_msg ( struct app * app , struct ast_channel * chan ,
int argc , char * argv [ ] )
{
@ -679,6 +751,199 @@ void stasis_app_unref(void)
ast_module_unref ( ast_module_info - > self ) ;
}
/*! \brief Callback to check whether an app is watching a given bridge */
static int app_watching_bridge_cb ( void * obj , void * arg , int flags )
{
struct app * app = obj ;
char * uniqueid = arg ;
return app_is_watching_bridge ( app , uniqueid ) ? CMP_MATCH : 0 ;
}
/*! \brief Get a container full of apps that are interested in the specified bridge */
static struct ao2_container * get_apps_watching_bridge ( const char * uniqueid )
{
struct ao2_container * watching_apps ;
char * uniqueid_dup ;
RAII_VAR ( struct ao2_iterator * , watching_apps_iter , NULL , ao2_iterator_destroy ) ;
ast_assert ( uniqueid ! = NULL ) ;
uniqueid_dup = ast_strdupa ( uniqueid ) ;
watching_apps_iter = ao2_callback ( apps_registry , OBJ_MULTIPLE , app_watching_bridge_cb , uniqueid_dup ) ;
watching_apps = watching_apps_iter - > c ;
if ( ! ao2_container_count ( watching_apps ) ) {
return NULL ;
}
ao2_ref ( watching_apps , + 1 ) ;
return watching_apps_iter - > c ;
}
/*! Callback used to remove an app's interest in a bridge */
static int remove_bridge_cb ( void * obj , void * arg , int flags )
{
app_remove_bridge ( obj , arg ) ;
return 0 ;
}
static void sub_bridge_snapshot_handler ( void * data ,
struct stasis_subscription * sub ,
struct stasis_topic * topic ,
struct stasis_message * message )
{
RAII_VAR ( struct ao2_container * , watching_apps , NULL , ao2_cleanup ) ;
struct stasis_cache_update * update = stasis_message_data ( message ) ;
struct ast_bridge_snapshot * new_snapshot = stasis_message_data ( update - > new_snapshot ) ;
struct ast_bridge_snapshot * old_snapshot = stasis_message_data ( update - > old_snapshot ) ;
RAII_VAR ( struct ast_json * , msg , NULL , ast_json_unref ) ;
watching_apps = get_apps_watching_bridge ( new_snapshot ? new_snapshot - > uniqueid : old_snapshot - > uniqueid ) ;
if ( ! watching_apps | | ! ao2_container_count ( watching_apps ) ) {
return ;
}
if ( ! new_snapshot ) {
RAII_VAR ( char * , bridge_id , ast_strdup ( old_snapshot - > uniqueid ) , ast_free ) ;
/* The bridge has gone away. Create the message, make sure no apps are
* watching this bridge anymore , and destroy the bridge ' s control
* structure */
msg = stasis_json_event_bridge_destroyed_create ( old_snapshot ) ;
ao2_callback ( watching_apps , OBJ_NODATA , remove_bridge_cb , bridge_id ) ;
stasis_app_bridge_destroy ( old_snapshot - > uniqueid ) ;
} else if ( ! old_snapshot ) {
msg = stasis_json_event_bridge_created_create ( old_snapshot ) ;
}
if ( ! msg ) {
return ;
}
distribute_message ( watching_apps , msg ) ;
}
/*! \brief Callback used to merge two containers of applications */
static int list_merge_cb ( void * obj , void * arg , int flags )
{
/* remove any current entries for this app */
ao2_find ( arg , obj , OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE ) ;
/* relink as the only entry */
ao2_link ( arg , obj ) ;
return 0 ;
}
/*! \brief Merge container src into container dst without modifying src */
static void update_apps_list ( struct ao2_container * dst , struct ao2_container * src )
{
ao2_callback ( src , OBJ_NODATA , list_merge_cb , dst ) ;
}
/*! \brief Callback for adding to an app's bridges of interest */
static int app_add_bridge_cb ( void * obj , void * arg , int flags )
{
app_add_bridge ( obj , arg ) ;
return 0 ;
}
/*! \brief Add interest in the given bridge to all apps in the container */
static void update_bridge_interest ( struct ao2_container * apps , const char * bridge_id )
{
RAII_VAR ( char * , bridge_id_dup , ast_strdup ( bridge_id ) , ast_free ) ;
ao2_callback ( apps , OBJ_NODATA , app_add_bridge_cb , bridge_id_dup ) ;
}
static void sub_bridge_merge_handler ( void * data ,
struct stasis_subscription * sub ,
struct stasis_topic * topic ,
struct stasis_message * message )
{
RAII_VAR ( struct ao2_container * , watching_apps_to , NULL , ao2_cleanup ) ;
RAII_VAR ( struct ao2_container * , watching_apps_from , NULL , ao2_cleanup ) ;
RAII_VAR ( struct ao2_container * , watching_apps_all , ao2_container_alloc ( 1 , NULL , NULL ) , ao2_cleanup ) ;
struct ast_bridge_merge_message * merge = stasis_message_data ( message ) ;
RAII_VAR ( struct ast_json * , msg , NULL , ast_json_unref ) ;
RAII_VAR ( struct ast_json * , blob , NULL , ast_json_unref ) ;
watching_apps_to = get_apps_watching_bridge ( merge - > to - > uniqueid ) ;
if ( watching_apps_to ) {
update_apps_list ( watching_apps_all , watching_apps_to ) ;
}
watching_apps_from = get_apps_watching_bridge ( merge - > from - > uniqueid ) ;
if ( watching_apps_from ) {
update_bridge_interest ( watching_apps_from , merge - > to - > uniqueid ) ;
update_apps_list ( watching_apps_all , watching_apps_from ) ;
}
if ( ! ao2_container_count ( watching_apps_all ) ) {
return ;
}
/* The secondary bridge has to be packed into JSON by hand because the auto-generated
* JSON event generator can only handle one instance of a given snapshot type in an
* elegant way */
blob = ast_json_pack ( " {s: o} " , " bridge_from " , ast_bridge_snapshot_to_json ( merge - > from ) ) ;
if ( ! blob ) {
return ;
}
msg = stasis_json_event_bridge_merged_create ( merge - > to , blob ) ;
distribute_message ( watching_apps_all , msg ) ;
}
static void sub_bridge_enter_handler ( void * data ,
struct stasis_subscription * sub ,
struct stasis_topic * topic ,
struct stasis_message * message )
{
RAII_VAR ( struct ao2_container * , watching_apps_channel , NULL , ao2_cleanup ) ;
RAII_VAR ( struct ao2_container * , watching_apps_bridge , NULL , ao2_cleanup ) ;
RAII_VAR ( struct ao2_container * , watching_apps_all , ao2_container_alloc ( 1 , NULL , NULL ) , ao2_cleanup ) ;
struct ast_bridge_blob * obj = stasis_message_data ( message ) ;
RAII_VAR ( struct ast_json * , msg , NULL , ast_json_unref ) ;
watching_apps_bridge = get_apps_watching_bridge ( obj - > bridge - > uniqueid ) ;
if ( watching_apps_bridge ) {
update_apps_list ( watching_apps_all , watching_apps_bridge ) ;
}
watching_apps_channel = get_apps_watching_channel ( obj - > channel - > uniqueid ) ;
if ( watching_apps_channel ) {
update_bridge_interest ( watching_apps_channel , obj - > bridge - > uniqueid ) ;
update_apps_list ( watching_apps_all , watching_apps_channel ) ;
}
if ( ! ao2_container_count ( watching_apps_all ) ) {
return ;
}
msg = stasis_json_event_channel_entered_bridge_create ( obj - > bridge , obj - > channel ) ;
distribute_message ( watching_apps_all , msg ) ;
}
static void sub_bridge_leave_handler ( void * data ,
struct stasis_subscription * sub ,
struct stasis_topic * topic ,
struct stasis_message * message )
{
RAII_VAR ( struct ao2_container * , watching_apps_bridge , NULL , ao2_cleanup ) ;
struct ast_bridge_blob * obj = stasis_message_data ( message ) ;
RAII_VAR ( struct ast_json * , msg , NULL , ast_json_unref ) ;
watching_apps_bridge = get_apps_watching_bridge ( obj - > bridge - > uniqueid ) ;
if ( ! watching_apps_bridge ) {
return ;
}
msg = stasis_json_event_channel_left_bridge_create ( obj - > bridge , obj - > channel ) ;
distribute_message ( watching_apps_bridge , msg ) ;
}
static int load_module ( void )
{
int r = 0 ;
@ -695,12 +960,18 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE ;
}
app_bridges = ao2_container_alloc ( CONTROLS_NUM_BUCKETS ,
bridges_hash , bridges_compare ) ;
if ( app_bridges = = NULL ) {
return AST_MODULE_LOAD_FAILURE ;
}
channel_router = stasis_message_router_create ( stasis_caching_get_topic ( ast_channel_topic_all_cached ( ) ) ) ;
if ( ! channel_router ) {
return AST_MODULE_LOAD_FAILURE ;
}
r | = stasis_message_router_add ( channel_router , stasis_cache_update_type ( ) , sub_snapshot_handler , NULL ) ;
r | = stasis_message_router_add ( channel_router , stasis_cache_update_type ( ) , sub_ channel_ snapshot_handler, NULL ) ;
r | = stasis_message_router_add ( channel_router , ast_channel_user_event_type ( ) , sub_userevent_handler , NULL ) ;
r | = stasis_message_router_add ( channel_router , ast_channel_varset_type ( ) , sub_varset_handler , NULL ) ;
r | = stasis_message_router_add ( channel_router , ast_channel_dtmf_begin_type ( ) , sub_dtmf_handler , NULL ) ;
@ -709,6 +980,19 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE ;
}
bridge_router = stasis_message_router_create ( stasis_caching_get_topic ( ast_bridge_topic_all_cached ( ) ) ) ;
if ( ! bridge_router ) {
return AST_MODULE_LOAD_FAILURE ;
}
r | = stasis_message_router_add ( bridge_router , stasis_cache_update_type ( ) , sub_bridge_snapshot_handler , NULL ) ;
r | = stasis_message_router_add ( bridge_router , ast_bridge_merge_message_type ( ) , sub_bridge_merge_handler , NULL ) ;
r | = stasis_message_router_add ( bridge_router , ast_channel_entered_bridge_type ( ) , sub_bridge_enter_handler , NULL ) ;
r | = stasis_message_router_add ( bridge_router , ast_channel_left_bridge_type ( ) , sub_bridge_leave_handler , NULL ) ;
if ( r ) {
return AST_MODULE_LOAD_FAILURE ;
}
return AST_MODULE_LOAD_SUCCESS ;
}
@ -719,12 +1003,18 @@ static int unload_module(void)
stasis_message_router_unsubscribe_and_join ( channel_router ) ;
channel_router = NULL ;
stasis_message_router_unsubscribe_and_join ( bridge_router ) ;
bridge_router = NULL ;
ao2_cleanup ( apps_registry ) ;
apps_registry = NULL ;
ao2_cleanup ( app_controls ) ;
app_controls = NULL ;
ao2_cleanup ( app_bridges ) ;
app_bridges = NULL ;
return r ;
}