@ -125,7 +125,7 @@ static struct ast_json *stasis_end_to_json(struct stasis_message *message,
" channel " , ast_channel_snapshot_to_json ( payload - > snapshot , sanitize ) ) ;
" channel " , ast_channel_snapshot_to_json ( payload - > snapshot , sanitize ) ) ;
}
}
STASIS_MESSAGE_TYPE_DEFN ( app_ end_message_type,
STASIS_MESSAGE_TYPE_DEFN _LOCAL ( end_message_type,
. to_json = stasis_end_to_json ) ;
. to_json = stasis_end_to_json ) ;
struct start_message_blob {
struct start_message_blob {
@ -919,6 +919,12 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
struct stasis_message * msg ;
struct stasis_message * msg ;
int i ;
int i ;
if ( app_subscribe_channel ( app , chan ) ) {
ast_log ( LOG_ERROR , " Error subscribing app '%s' to channel '%s' \n " ,
app_name ( app ) , ast_channel_name ( chan ) ) ;
return - 1 ;
}
payload = ao2_alloc ( sizeof ( * payload ) , start_message_blob_dtor ) ;
payload = ao2_alloc ( sizeof ( * payload ) , start_message_blob_dtor ) ;
if ( ! payload ) {
if ( ! payload ) {
ast_log ( LOG_ERROR , " Error packing JSON for StasisStart message \n " ) ;
ast_log ( LOG_ERROR , " Error packing JSON for StasisStart message \n " ) ;
@ -928,7 +934,8 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
payload - > channel = ao2_bump ( snapshot ) ;
payload - > channel = ao2_bump ( snapshot ) ;
payload - > replace_channel = ao2_bump ( replace_channel_snapshot ) ;
payload - > replace_channel = ao2_bump ( replace_channel_snapshot ) ;
json_blob = ast_json_pack ( " {s: o, s: []} " ,
json_blob = ast_json_pack ( " {s: s, s: o, s: []} " ,
" app " , app_name ( app ) ,
" timestamp " , ast_json_timeval ( ast_tvnow ( ) , NULL ) ,
" timestamp " , ast_json_timeval ( ast_tvnow ( ) , NULL ) ,
" args " ) ;
" args " ) ;
if ( ! json_blob ) {
if ( ! json_blob ) {
@ -956,7 +963,10 @@ static int send_start_msg_snapshots(struct ast_channel *chan, struct stasis_app
return - 1 ;
return - 1 ;
}
}
stasis_publish ( ast_channel_topic ( chan ) , msg ) ;
if ( replace_channel_snapshot ) {
app_unsubscribe_channel_id ( app , replace_channel_snapshot - > uniqueid ) ;
}
stasis_publish ( ast_app_get_topic ( app ) , msg ) ;
ao2_ref ( msg , - 1 ) ;
ao2_ref ( msg , - 1 ) ;
return 0 ;
return 0 ;
}
}
@ -988,6 +998,7 @@ int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
{
{
struct stasis_message_sanitizer * sanitize = stasis_app_get_sanitizer ( ) ;
struct stasis_message_sanitizer * sanitize = stasis_app_get_sanitizer ( ) ;
struct ast_json * blob ;
struct ast_json * blob ;
struct stasis_message * msg ;
if ( sanitize & & sanitize - > channel
if ( sanitize & & sanitize - > channel
& & sanitize - > channel ( chan ) ) {
& & sanitize - > channel ( chan ) ) {
@ -1000,10 +1011,13 @@ int app_send_end_msg(struct stasis_app *app, struct ast_channel *chan)
return - 1 ;
return - 1 ;
}
}
stasis_app_channel_set_stasis_end_published ( chan ) ;
remove_masquerade_store ( chan ) ;
remove_masquerade_store ( chan ) ;
ast_channel_publish_blob ( chan , app_end_message_type ( ) , blob ) ;
app_unsubscribe_channel ( app , chan ) ;
msg = ast_channel_blob_create ( chan , end_message_type ( ) , blob ) ;
if ( msg ) {
stasis_publish ( ast_app_get_topic ( app ) , msg ) ;
}
ao2_cleanup ( msg ) ;
ast_json_unref ( blob ) ;
ast_json_unref ( blob ) ;
return 0 ;
return 0 ;
@ -1034,6 +1048,7 @@ static void channel_stolen_cb(void *data, struct ast_channel *old_chan, struct a
}
}
/* send the StasisEnd message to the app */
/* send the StasisEnd message to the app */
stasis_app_channel_set_stasis_end_published ( new_chan ) ;
app_send_end_msg ( control_app ( control ) , new_chan ) ;
app_send_end_msg ( control_app ( control ) , new_chan ) ;
/* remove the datastore */
/* remove the datastore */
@ -1083,11 +1098,6 @@ static void channel_replaced_cb(void *data, struct ast_channel *old_chan, struct
/* send the StasisEnd message to the app */
/* send the StasisEnd message to the app */
app_send_end_msg ( control_app ( control ) , old_chan ) ;
app_send_end_msg ( control_app ( control ) , old_chan ) ;
/* fixup channel topic forwards */
if ( app_replace_channel_forwards ( control_app ( control ) , old_snapshot - > uniqueid , new_chan ) ) {
ast_log ( LOG_ERROR , " Failed to fixup channel topic forwards for %s(%s) owned by %s \n " ,
old_snapshot - > name , old_snapshot - > uniqueid , app_name ( control_app ( control ) ) ) ;
}
ao2_cleanup ( control ) ;
ao2_cleanup ( control ) ;
}
}
@ -1251,14 +1261,6 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
return - 1 ;
return - 1 ;
}
}
res = app_subscribe_channel ( app , chan ) ;
if ( res ! = 0 ) {
ast_log ( LOG_ERROR , " Error subscribing app '%s' to channel '%s' \n " ,
app_name , ast_channel_name ( chan ) ) ;
remove_masquerade_store ( chan ) ;
return - 1 ;
}
res = send_start_msg ( app , chan , argc , argv ) ;
res = send_start_msg ( app , chan , argc , argv ) ;
if ( res ! = 0 ) {
if ( res ! = 0 ) {
ast_log ( LOG_ERROR ,
ast_log ( LOG_ERROR ,
@ -1894,7 +1896,7 @@ static int unload_module(void)
ao2_cleanup ( app_bridges_playback ) ;
ao2_cleanup ( app_bridges_playback ) ;
app_bridges_playback = NULL ;
app_bridges_playback = NULL ;
STASIS_MESSAGE_TYPE_CLEANUP ( app_ end_message_type) ;
STASIS_MESSAGE_TYPE_CLEANUP ( end_message_type) ;
STASIS_MESSAGE_TYPE_CLEANUP ( start_message_type ) ;
STASIS_MESSAGE_TYPE_CLEANUP ( start_message_type ) ;
return 0 ;
return 0 ;
@ -1938,28 +1940,6 @@ struct stasis_message_sanitizer *stasis_app_get_sanitizer(void)
return & app_sanitizer ;
return & app_sanitizer ;
}
}
void app_end_message_handler ( struct stasis_message * message )
{
struct ast_channel_blob * payload ;
struct ast_channel_snapshot * snapshot ;
const char * app_name ;
char * channel_uri ;
size_t alloc_size ;
const char * channels [ 1 ] ;
payload = stasis_message_data ( message ) ;
snapshot = payload - > snapshot ;
app_name = ast_json_string_get ( ast_json_object_get ( payload - > blob , " app " ) ) ;
/* +8 is for the length of "channel:" */
alloc_size = AST_MAX_UNIQUEID + 8 ;
channel_uri = ast_alloca ( alloc_size ) ;
snprintf ( channel_uri , alloc_size , " channel:%s " , snapshot - > uniqueid ) ;
channels [ 0 ] = channel_uri ;
stasis_app_unsubscribe ( app_name , channels , ARRAY_LEN ( channels ) , NULL ) ;
}
static const struct ast_datastore_info stasis_internal_channel_info = {
static const struct ast_datastore_info stasis_internal_channel_info = {
. type = " stasis-internal-channel " ,
. type = " stasis-internal-channel " ,
} ;
} ;
@ -2033,7 +2013,7 @@ static int load_module(void)
if ( STASIS_MESSAGE_TYPE_INIT ( start_message_type ) ! = 0 ) {
if ( STASIS_MESSAGE_TYPE_INIT ( start_message_type ) ! = 0 ) {
return AST_MODULE_LOAD_DECLINE ;
return AST_MODULE_LOAD_DECLINE ;
}
}
if ( STASIS_MESSAGE_TYPE_INIT ( app_ end_message_type) ! = 0 ) {
if ( STASIS_MESSAGE_TYPE_INIT ( end_message_type) ! = 0 ) {
return AST_MODULE_LOAD_DECLINE ;
return AST_MODULE_LOAD_DECLINE ;
}
}
apps_registry = ao2_container_alloc ( APPS_NUM_BUCKETS , app_hash , app_compare ) ;
apps_registry = ao2_container_alloc ( APPS_NUM_BUCKETS , app_hash , app_compare ) ;