@ -31,6 +31,7 @@
# include "asterisk/res_pjsip_outbound_publish.h"
# include "asterisk/module.h"
# include "asterisk/taskprocessor.h"
# include "asterisk/threadpool.h"
# include "asterisk/datastore.h"
/*** DOCUMENTATION
@ -161,6 +162,8 @@ struct ast_sip_outbound_publish_client {
struct ast_sip_outbound_publish * publish ;
/*! \brief The name of the transport to be used for the publish */
char * transport_name ;
/*! \brief Serializer for stuff and things */
struct ast_taskprocessor * serializer ;
} ;
/*! \brief Outbound publish state information (persists for lifetime of a publish) */
@ -171,13 +174,11 @@ struct ast_sip_outbound_publish_state {
char id [ 0 ] ;
} ;
/*! \brief Unloading data */
struct unloading_data {
int is_unloading ;
int count ;
ast_mutex_t lock ;
ast_cond_t cond ;
} unloading ;
/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
# define MAX_UNLOAD_TIMEOUT_TIME 35 /* Seconds */
/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
static struct ast_serializer_shutdown_group * shutdown_group ;
/*! \brief Default number of client state container buckets */
# define DEFAULT_STATE_BUCKETS 31
@ -393,7 +394,7 @@ static void sip_outbound_publish_synchronize(struct ast_sip_event_publisher_hand
/* If the publisher client has been started but it is going away stop it */
removed - > stop_publishing ( state - > client ) ;
state - > client - > started = 0 ;
if ( ast_sip_push_task ( NULL , cancel_refresh_timer_task , ao2_bump ( state - > client ) ) ) {
if ( ast_sip_push_task ( state - > client - > serializer , cancel_refresh_timer_task , ao2_bump ( state - > client ) ) ) {
ast_log ( LOG_WARNING , " Could not stop refresh timer on client '%s' \n " ,
ast_sorcery_object_get_id ( publish ) ) ;
ao2_ref ( state - > client , - 1 ) ;
@ -614,7 +615,7 @@ fatal:
ast_free ( message ) ;
service :
if ( ast_sip_push_task ( NULL , sip_publish_client_service_queue , ao2_bump ( client ) ) ) {
if ( ast_sip_push_task ( client - > serializer , sip_publish_client_service_queue , ao2_bump ( client ) ) ) {
ao2_ref ( client , - 1 ) ;
}
return - 1 ;
@ -656,7 +657,7 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
AST_LIST_INSERT_TAIL ( & client - > queue , message , entry ) ;
res = ast_sip_push_task ( NULL , sip_publish_client_service_queue , ao2_bump ( client ) ) ;
res = ast_sip_push_task ( client - > serializer , sip_publish_client_service_queue , ao2_bump ( client ) ) ;
if ( res ) {
ao2_ref ( client , - 1 ) ;
}
@ -679,16 +680,7 @@ static void sip_outbound_publish_client_destroy(void *obj)
ao2_cleanup ( client - > datastores ) ;
ao2_cleanup ( client - > publish ) ;
ast_free ( client - > transport_name ) ;
/* if unloading the module and all objects have been unpublished
send the signal to finish unloading */
if ( unloading . is_unloading ) {
ast_mutex_lock ( & unloading . lock ) ;
if ( - - unloading . count = = 0 ) {
ast_cond_signal ( & unloading . cond ) ;
}
ast_mutex_unlock ( & unloading . lock ) ;
}
ast_taskprocessor_unreference ( client - > serializer ) ;
}
static int explicit_publish_destroy ( void * data )
@ -718,7 +710,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
/* If the client was never started, there's nothing to unpublish, so just
* destroy the publication and remove its reference to the client .
*/
ast_sip_push_task ( NULL , explicit_publish_destroy , client ) ;
ast_sip_push_task ( client - > serializer , explicit_publish_destroy , client ) ;
return 0 ;
}
@ -728,7 +720,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
}
client - > started = 0 ;
if ( ast_sip_push_task ( NULL , cancel_refresh_timer_task , ao2_bump ( client ) ) ) {
if ( ast_sip_push_task ( client - > serializer , cancel_refresh_timer_task , ao2_bump ( client ) ) ) {
ast_log ( LOG_WARNING , " Could not stop refresh timer on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( client - > publish ) ) ;
ao2_ref ( client , - 1 ) ;
@ -736,7 +728,7 @@ static int cancel_and_unpublish(struct ast_sip_outbound_publish_client *client)
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
if ( ! client - > sending ) {
if ( ast_sip_push_task ( NULL , send_unpublish_task , ao2_bump ( client ) ) ) {
if ( ast_sip_push_task ( client - > serializer , send_unpublish_task , ao2_bump ( client ) ) ) {
ast_log ( LOG_WARNING , " Could not send unpublish message on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( client - > publish ) ) ;
ao2_ref ( client , - 1 ) ;
@ -897,7 +889,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
if ( client - > sending ) {
client - > sending = NULL ;
if ( ! ast_sip_push_task ( NULL , send_unpublish_task , ao2_bump ( client ) ) ) {
if ( ! ast_sip_push_task ( client - > serializer , send_unpublish_task , ao2_bump ( client ) ) ) {
return ;
}
ast_log ( LOG_WARNING , " Could not send unpublish message on outbound publish '%s' \n " ,
@ -981,7 +973,7 @@ end:
ast_free ( message ) ;
}
} else {
if ( ast_sip_push_task ( NULL , sip_publish_client_service_queue , ao2_bump ( client ) ) ) {
if ( ast_sip_push_task ( client - > serializer , sip_publish_client_service_queue , ao2_bump ( client ) ) ) {
ao2_ref ( client , - 1 ) ;
}
}
@ -1053,6 +1045,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
const char * id = ast_sorcery_object_get_id ( publish ) ;
struct ast_sip_outbound_publish_state * state =
ao2_alloc ( sizeof ( * state ) + strlen ( id ) + 1 , sip_outbound_publish_state_destroy ) ;
char tps_name [ AST_TASKPROCESSOR_MAX_NAME + 1 ] ;
if ( ! state ) {
return NULL ;
@ -1070,6 +1063,17 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
return NULL ;
}
/* Create name with seq number appended. */
ast_taskprocessor_build_name ( tps_name , sizeof ( tps_name ) , " pjsip/outpub/%s " ,
ast_sorcery_object_get_id ( publish ) ) ;
state - > client - > serializer = ast_sip_create_serializer_group_named ( tps_name ,
shutdown_group ) ;
if ( ! state - > client - > serializer ) {
ao2_ref ( state , - 1 ) ;
return NULL ;
}
state - > client - > timer . user_data = state - > client ;
state - > client - > timer . cb = sip_outbound_publish_timer_cb ;
state - > client - > transport_name = ast_strdup ( publish - > transport ) ;
@ -1082,7 +1086,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
static int initialize_publish_client ( struct ast_sip_outbound_publish * publish ,
struct ast_sip_outbound_publish_state * state )
{
if ( ast_sip_push_task_synchronous ( NULL , sip_outbound_publish_client_alloc , state - > client ) ) {
if ( ast_sip_push_task_synchronous ( state - > client - > serializer , sip_outbound_publish_client_alloc , state - > client ) ) {
ast_log ( LOG_ERROR , " Unable to create client for outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( publish ) ) ;
return - 1 ;
@ -1219,16 +1223,48 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab
return ast_sip_auth_vector_init ( & publish - > outbound_auths , var - > value ) ;
}
static int unload_module ( void )
{
int remaining ;
ast_sorcery_object_unregister ( ast_sip_get_sorcery ( ) , " outbound-publish " ) ;
ao2_global_obj_release ( current_states ) ;
/* Wait for publication serializers to get destroyed. */
ast_debug ( 2 , " Waiting for publication to complete for unload. \n " ) ;
remaining = ast_serializer_shutdown_group_join ( shutdown_group , MAX_UNLOAD_TIMEOUT_TIME ) ;
if ( remaining ) {
ast_log ( LOG_WARNING , " Unload incomplete. Could not stop %d outbound publications. Try again later. \n " ,
remaining ) ;
return - 1 ;
}
ast_debug ( 2 , " Successful shutdown. \n " ) ;
ao2_cleanup ( shutdown_group ) ;
shutdown_group = NULL ;
return 0 ;
}
static int load_module ( void )
{
CHECK_PJSIP_MODULE_LOADED ( ) ;
shutdown_group = ast_serializer_shutdown_group_alloc ( ) ;
if ( ! shutdown_group ) {
return AST_MODULE_LOAD_FAILURE ;
}
ast_sorcery_apply_config ( ast_sip_get_sorcery ( ) , " res_pjsip_outbound_publish " ) ;
ast_sorcery_apply_default ( ast_sip_get_sorcery ( ) , " outbound-publish " , " config " , " pjsip.conf,criteria=type=outbound-publish " ) ;
if ( ast_sorcery_object_register ( ast_sip_get_sorcery ( ) , " outbound-publish " , sip_outbound_publish_alloc , NULL ,
sip_outbound_publish_apply ) ) {
ast_log ( LOG_ERROR , " Unable to register 'outbound-publish' type with sorcery \n " ) ;
unload_module ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
@ -1264,49 +1300,6 @@ static int reload_module(void)
return 0 ;
}
static int unload_module ( void )
{
struct timeval start = ast_tvnow ( ) ;
struct timespec end = {
. tv_sec = start . tv_sec + 10 ,
. tv_nsec = start . tv_usec * 1000
} ;
int res = 0 ;
struct ao2_container * states = ao2_global_obj_ref ( current_states ) ;
if ( ! states | | ! ( unloading . count = ao2_container_count ( states ) ) ) {
return 0 ;
}
ao2_ref ( states , - 1 ) ;
ast_mutex_init ( & unloading . lock ) ;
ast_cond_init ( & unloading . cond , NULL ) ;
ast_mutex_lock ( & unloading . lock ) ;
unloading . is_unloading = 1 ;
ao2_global_obj_release ( current_states ) ;
/* wait for items to unpublish */
ast_verb ( 5 , " Waiting to complete unpublishing task(s) \n " ) ;
while ( unloading . count & & ! res ) {
res = ast_cond_timedwait ( & unloading . cond , & unloading . lock , & end ) ;
}
ast_mutex_unlock ( & unloading . lock ) ;
ast_mutex_destroy ( & unloading . lock ) ;
ast_cond_destroy ( & unloading . cond ) ;
if ( res ) {
ast_verb ( 5 , " At least %d items were unable to unpublish "
" in the allowed time \n " , unloading . count ) ;
} else {
ast_verb ( 5 , " All items successfully unpublished \n " ) ;
ast_sorcery_object_unregister ( ast_sip_get_sorcery ( ) , " outbound-publish " ) ;
}
return res ;
}
AST_MODULE_INFO ( ASTERISK_GPL_KEY , AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER , " PJSIP Outbound Publish Support " ,
. load = load_module ,
. reload = reload_module ,