@ -109,30 +109,68 @@ struct ao2_container *app_bridges_moh;
struct ao2_container * app_bridges_playback ;
static struct ast_json * stasis_end_ json_payload( struct ast_channel_snapshot * snapshot ,
static struct ast_json * stasis_end_ to_json( struct stasis_message * message ,
const struct stasis_message_sanitizer * sanitize )
{
struct ast_channel_blob * payload = stasis_message_data ( message ) ;
if ( sanitize & & sanitize - > channel_snapshot & &
sanitize - > channel_snapshot ( payload - > snapshot ) ) {
return NULL ;
}
return ast_json_pack ( " {s: s, s: o, s: o} " ,
" type " , " StasisEnd " ,
" timestamp " , ast_json_timeval ( ast_tvnow ( ) , NULL ) ,
" channel " , ast_channel_snapshot_to_json ( snapshot , sanitize ) ) ;
" channel " , ast_channel_snapshot_to_json ( payload- > snapshot, sanitize ) ) ;
}
static struct ast_json * stasis_end_to_json ( struct stasis_message * message ,
STASIS_MESSAGE_TYPE_DEFN ( app_end_message_type ,
. to_json = stasis_end_to_json ) ;
struct start_message_blob {
struct ast_channel_snapshot * channel ; /*!< Channel that is entering Stasis() */
struct ast_channel_snapshot * replace_channel ; /*!< Channel that is being replaced (optional) */
struct ast_json * blob ; /*!< JSON blob containing timestamp and args */
} ;
static struct ast_json * stasis_start_to_json ( struct stasis_message * message ,
const struct stasis_message_sanitizer * sanitize )
{
struct ast_channel_blob * payload = stasis_message_data ( message ) ;
struct start_message_blob * payload = stasis_message_data ( message ) ;
struct ast_json * msg ;
if ( sanitize & & sanitize - > channel_snapshot & &
sanitize - > channel_snapshot ( payload - > snapshot ) ) {
sanitize - > channel_snapshot ( payload - > channel ) ) {
return NULL ;
}
msg = ast_json_pack ( " {s: s, s: O, s: O, s: o} " ,
" type " , " StasisStart " ,
" timestamp " , ast_json_object_get ( payload - > blob , " timestamp " ) ,
" args " , ast_json_object_get ( payload - > blob , " args " ) ,
" channel " , ast_channel_snapshot_to_json ( payload - > channel , NULL ) ) ;
if ( ! msg ) {
ast_log ( LOG_ERROR , " Failed to pack JSON for StasisStart message \n " ) ;
return NULL ;
}
return stasis_end_json_payload ( payload - > snapshot , sanitize ) ;
if ( payload - > replace_channel ) {
int res = ast_json_object_set ( msg , " replace_channel " ,
ast_channel_snapshot_to_json ( payload - > replace_channel , NULL ) ) ;
if ( res ) {
ast_json_unref ( msg ) ;
ast_log ( LOG_ERROR , " Failed to append JSON for StasisStart message \n " ) ;
return NULL ;
}
}
STASIS_MESSAGE_TYPE_DEFN ( ast_stasis_end_message_type ,
. to_json = stasis_end_to_json ) ;
return msg ;
}
STASIS_MESSAGE_TYPE_DEFN_LOCAL ( start_message_type ,
. to_json = stasis_start_to_json ) ;
const char * stasis_app_name ( const struct stasis_app * app )
{
@ -862,51 +900,64 @@ char *app_get_replace_channel_app(struct ast_channel *chan)
return replace_channel_app ;
}
static int send_start_msg_snapshots ( struct stasis_app * app ,
static void start_message_blob_dtor ( void * obj )
{
struct start_message_blob * payload = obj ;
ao2_cleanup ( payload - > channel ) ;
ao2_cleanup ( payload - > replace_channel ) ;
ast_json_unref ( payload - > blob ) ;
}
static int send_start_msg_snapshots ( struct ast_channel * chan , struct stasis_app * app ,
int argc , char * argv [ ] , struct ast_channel_snapshot * snapshot ,
struct ast_channel_snapshot * replace_channel_snapshot )
{
RAII_VAR ( struct ast_json * , msg , NULL , ast_json_unref ) ;
RAII_VAR ( struct ast_json * , json_blob , NULL , ast_json_unref ) ;
struct ast_json * json_args ;
struct stasis_message_sanitizer * sanitize = stasis_app_get_sanitizer ( ) ;
RAII_VAR ( struct start_message_blob * , payload , NULL , ao2_cleanup ) ;
struct stasis_message * msg ;
int i ;
if ( sanitize & & sanitize - > channel_snapshot
& & sanitize - > channel_snapshot ( snapshot ) ) {
return 0 ;
}
msg = ast_json_pack ( " {s: s, s: o, s: [], s: o} " ,
" type " , " StasisStart " ,
" timestamp " , ast_json_timeval ( ast_tvnow ( ) , NULL ) ,
" args " ,
" channel " , ast_channel_snapshot_to_json ( snapshot , NULL ) ) ;
if ( ! msg ) {
payload = ao2_alloc ( sizeof ( * payload ) , start_message_blob_dtor ) ;
if ( ! payload ) {
ast_log ( LOG_ERROR , " Error packing JSON for StasisStart message \n " ) ;
return - 1 ;
}
if ( replace_channel_snapshot ) {
int res = ast_json_object_set ( msg , " replace_channel " ,
ast_channel_snapshot_to_json ( replace_channel_snapshot , NULL ) ) ;
payload - > channel = ao2_bump ( snapshot ) ;
payload - > replace_channel = ao2_bump ( replace_channel_snapshot ) ;
if ( res ) {
json_blob = ast_json_pack ( " {s: o, s: []} " ,
" timestamp " , ast_json_timeval ( ast_tvnow ( ) , NULL ) ,
" args " ) ;
if ( ! json_blob ) {
ast_log ( LOG_ERROR , " Error packing JSON for StasisStart message \n " ) ;
return - 1 ;
}
}
/* Append arguments to args array */
json_args = ast_json_object_get ( msg , " args " ) ;
json_args = ast_json_object_get ( json_blob , " args " ) ;
ast_assert ( json_args ! = NULL ) ;
for ( i = 0 ; i < argc ; + + i ) {
int r = ast_json_array_append ( json_args ,
ast_json_string_create ( argv [ i ] ) ) ;
if ( r ! = 0 ) {
ast_log ( LOG_ERROR , " Error appending start message\n " ) ;
ast_log ( LOG_ERROR , " Error appending to Sta sisS tart message\n " ) ;
return - 1 ;
}
}
app_send ( app , msg ) ;
payload - > blob = ast_json_ref ( json_blob ) ;
msg = stasis_message_create ( start_message_type ( ) , payload ) ;
if ( ! msg ) {
ast_log ( LOG_ERROR , " Error sending StasisStart message \n " ) ;
return - 1 ;
}
stasis_publish ( ast_channel_topic ( chan ) , msg ) ;
ao2_ref ( msg , - 1 ) ;
return 0 ;
}
@ -928,31 +979,36 @@ static int send_start_msg(struct stasis_app *app, struct ast_channel *chan,
if ( ! snapshot ) {
return - 1 ;
}
return send_start_msg_snapshots ( app, argc , argv , snapshot , replace_channel_snapshot ) ;
return send_start_msg_snapshots ( chan, app, argc , argv , snapshot , replace_channel_snapshot ) ;
}
static int send_end_msg_snapshot ( struct stasis_app * app , struct ast_channel_snapshot * snapshot )
static void remove_masquerade_store ( struct ast_channel * chan ) ;
int app_send_end_msg ( struct stasis_app * app , struct ast_channel * chan )
{
struct stasis_message_sanitizer * sanitize = stasis_app_get_sanitizer ( ) ;
struct ast_json * msg ;
struct ast_json * blob ;
if ( sanitize & & sanitize - > channel _snapshot
& & sanitize - > channel _snapshot( snapshot ) ) {
if ( sanitize & & sanitize - > channel
& & sanitize - > channel ( chan ) ) {
return 0 ;
}
msg = stasis_end_json_payload ( snapshot , sanitize ) ;
if ( ! msg ) {
blob = ast_json_pack ( " {s: s} " , " app " , app_name ( app ) ) ;
if ( ! blob ) {
ast_log ( LOG_ERROR , " Error packing JSON for StasisEnd message \n " ) ;
return - 1 ;
}
app_send ( app , msg ) ;
ast_json_unref ( msg ) ;
stasis_app_channel_set_stasis_end_published ( chan ) ;
remove_masquerade_store ( chan ) ;
ast_channel_publish_blob ( chan , app_end_message_type ( ) , blob ) ;
ast_json_unref ( blob ) ;
return 0 ;
}
static void remove_masquerade_store ( struct ast_channel * chan ) ;
static int masq_match_cb ( void * obj , void * data , int flags )
{
struct stasis_app_control * control = obj ;
@ -968,32 +1024,22 @@ static int masq_match_cb(void *obj, void *data, int flags)
static void channel_stolen_cb ( void * data , struct ast_channel * old_chan , struct ast_channel * new_chan )
{
struct ast_channel_snapshot * snapshot ;
struct stasis_app_control * control ;
/* grab a snapshot */
snapshot = ast_channel_snapshot_get_latest ( ast_channel_uniqueid ( new_chan ) ) ;
if ( ! snapshot ) {
ast_log ( LOG_ERROR , " Could not get snapshot for masqueraded channel \n " ) ;
return ;
}
/* find control */
control = ao2_callback ( app_controls , 0 , masq_match_cb , old_chan ) ;
if ( ! control ) {
ast_log ( LOG_ERROR , " Could not find control for masqueraded channel \n " ) ;
ao2_cleanup ( snapshot ) ;
return ;
}
/* send the StasisEnd message to the app */
send_end_msg_snapshot ( control_app ( control ) , snapshot ) ;
app_ send_end_msg( control_app ( control ) , new_chan ) ;
/* remove the datastore */
remove_masquerade_store ( old_chan ) ;
ao2_cleanup ( control ) ;
ao2_cleanup ( snapshot ) ;
}
static void channel_replaced_cb ( void * data , struct ast_channel * old_chan , struct ast_channel * new_chan )
@ -1032,10 +1078,10 @@ static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct
/* send the StasisStart with replace_channel to the app */
send_start_msg_snapshots ( control_app ( control ) , 0 , NULL , new_snapshot ,
send_start_msg_snapshots ( new_chan , control_app ( control ) , 0 , NULL , new_snapshot ,
old_snapshot ) ;
/* send the StasisEnd message to the app */
send_end_msg_snapshot ( control_app ( control ) , old_ snapshot ) ;
app_ send_end_msg( control_app ( control ) , old_ chan ) ;
/* fixup channel topic forwards */
if ( app_replace_channel_forwards ( control_app ( control ) , old_snapshot - > uniqueid , new_chan ) ) {
@ -1090,33 +1136,6 @@ static void remove_masquerade_store(struct ast_channel *chan)
ast_datastore_free ( datastore ) ;
}
static int send_end_msg ( struct stasis_app * app , struct ast_channel * chan )
{
struct ast_channel_snapshot * snapshot ;
int res = 0 ;
ast_assert ( chan ! = NULL ) ;
/* A masquerade has occurred and this message will be wrong so it
* has already been sent elsewhere . */
if ( ! has_masquerade_store ( chan ) ) {
return 0 ;
}
/* Set channel info */
snapshot = ast_channel_snapshot_get_latest ( ast_channel_uniqueid ( chan ) ) ;
if ( ! snapshot ) {
return - 1 ;
}
if ( send_end_msg_snapshot ( app , snapshot ) ) {
res = - 1 ;
}
ao2_cleanup ( snapshot ) ;
return res ;
}
void stasis_app_control_execute_until_exhausted ( struct ast_channel * chan , struct stasis_app_control * control )
{
while ( ! control_is_done ( control ) ) {
@ -1232,18 +1251,18 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
return - 1 ;
}
res = send_start_msg( app , chan , argc , argv ) ;
res = app_subscribe_channel( app , chan ) ;
if ( res ! = 0 ) {
ast_log ( LOG_ERROR ,
" Error sending start message to '%s' \n " , app_name ) ;
ast_log ( LOG_ERROR , " Error subscribing app '%s' to channel '%s' \n " ,
app_name , ast_channel_name ( chan ) ) ;
remove_masquerade_store ( chan ) ;
return - 1 ;
}
res = app_subscribe_channel( app , chan ) ;
res = send_start_msg( app , chan , argc , argv ) ;
if ( res ! = 0 ) {
ast_log ( LOG_ERROR , " Error subscribing app '%s' to channel '%s' \n " ,
app_name , ast_channel_name ( chan ) ) ;
ast_log ( LOG_ERROR ,
" Error sending start message to '%s' \n " , app_name ) ;
remove_masquerade_store ( chan ) ;
return - 1 ;
}
@ -1327,9 +1346,9 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
/* Only publish a stasis_end event if it hasn't already been published */
if ( ! stasis_app_channel_is_stasis_end_published ( chan ) ) {
app_unsubscribe_channel ( app , chan ) ;
res = send_end_msg ( app , chan ) ;
remove_masquerade_store ( chan ) ;
/* A masquerade has occurred and this message will be wrong so it
* has already been sent elsewhere . */
res = has_masquerade_store ( chan ) & & app_send_end_msg ( app , chan ) ;
if ( res ! = 0 ) {
ast_log ( LOG_ERROR ,
" Error sending end message to %s \n " , app_name ) ;
@ -1853,15 +1872,8 @@ void stasis_app_unref(void)
ast_module_unref ( ast_module_info - > self ) ;
}
/*!
* \ brief Subscription to StasisEnd events
*/
struct stasis_subscription * stasis_end_sub ;
static int unload_module ( void )
{
stasis_end_sub = stasis_unsubscribe ( stasis_end_sub ) ;
stasis_app_unregister_event_sources ( ) ;
messaging_cleanup ( ) ;
@ -1882,7 +1894,8 @@ static int unload_module(void)
ao2_cleanup ( app_bridges_playback ) ;
app_bridges_playback = NULL ;
STASIS_MESSAGE_TYPE_CLEANUP ( ast_stasis_end_message_type ) ;
STASIS_MESSAGE_TYPE_CLEANUP ( app_end_message_type ) ;
STASIS_MESSAGE_TYPE_CLEANUP ( start_message_type ) ;
return 0 ;
}
@ -1896,6 +1909,15 @@ static int channel_snapshot_sanitizer(const struct ast_channel_snapshot *snapsho
return 1 ;
}
/* \brief Sanitization callback for channels */
static int channel_sanitizer ( const struct ast_channel * chan )
{
if ( ! chan | | ! ( ast_channel_tech ( chan ) - > properties & AST_CHAN_TP_INTERNAL ) ) {
return 0 ;
}
return 1 ;
}
/* \brief Sanitization callback for channel unique IDs */
static int channel_id_sanitizer ( const char * id )
{
@ -1908,6 +1930,7 @@ static int channel_id_sanitizer(const char *id)
struct stasis_message_sanitizer app_sanitizer = {
. channel_id = channel_id_sanitizer ,
. channel_snapshot = channel_snapshot_sanitizer ,
. channel = channel_sanitizer ,
} ;
struct stasis_message_sanitizer * stasis_app_get_sanitizer ( void )
@ -1915,21 +1938,7 @@ struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
return & app_sanitizer ;
}
static void remove_masquerade_store_by_name ( const char * channel_name )
{
struct ast_channel * chan ;
chan = ast_channel_get_by_name ( channel_name ) ;
if ( ! chan ) {
return ;
}
remove_masquerade_store ( chan ) ;
ast_channel_unref ( chan ) ;
}
static void check_for_stasis_end ( void * data , struct stasis_subscription * sub ,
struct stasis_message * message )
void app_end_message_handler ( struct stasis_message * message )
{
struct ast_channel_blob * payload ;
struct ast_channel_snapshot * snapshot ;
@ -1938,10 +1947,6 @@ static void check_for_stasis_end(void *data, struct stasis_subscription *sub,
size_t alloc_size ;
const char * channels [ 1 ] ;
if ( stasis_message_type ( message ) ! = ast_stasis_end_message_type ( ) ) {
return ;
}
payload = stasis_message_data ( message ) ;
snapshot = payload - > snapshot ;
app_name = ast_json_string_get ( ast_json_object_get ( payload - > blob , " app " ) ) ;
@ -1953,8 +1958,6 @@ static void check_for_stasis_end(void *data, struct stasis_subscription *sub,
channels [ 0 ] = channel_uri ;
stasis_app_unsubscribe ( app_name , channels , ARRAY_LEN ( channels ) , NULL ) ;
remove_masquerade_store_by_name ( snapshot - > name ) ;
}
static const struct ast_datastore_info stasis_internal_channel_info = {
@ -2027,7 +2030,10 @@ int stasis_app_channel_is_internal(struct ast_channel *chan)
static int load_module ( void )
{
if ( STASIS_MESSAGE_TYPE_INIT ( ast_stasis_end_message_type ) ! = 0 ) {
if ( STASIS_MESSAGE_TYPE_INIT ( start_message_type ) ! = 0 ) {
return AST_MODULE_LOAD_DECLINE ;
}
if ( STASIS_MESSAGE_TYPE_INIT ( app_end_message_type ) ! = 0 ) {
return AST_MODULE_LOAD_DECLINE ;
}
apps_registry = ao2_container_alloc ( APPS_NUM_BUCKETS , app_hash , app_compare ) ;
@ -2053,12 +2059,6 @@ static int load_module(void)
stasis_app_register_event_sources ( ) ;
stasis_end_sub = stasis_subscribe ( ast_channel_topic_all ( ) , check_for_stasis_end , NULL ) ;
if ( ! stasis_end_sub ) {
unload_module ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
return AST_MODULE_LOAD_SUCCESS ;
}