@ -32,6 +32,7 @@
# include "asterisk/res_pjsip_outbound_publish.h"
# include "asterisk/module.h"
# include "asterisk/taskprocessor.h"
# include "asterisk/threadpool.h"
# include "asterisk/datastore.h"
/*** DOCUMENTATION
@ -204,6 +205,8 @@ struct sip_outbound_publisher {
struct sip_outbound_publish_message * sending ;
/*! \brief Publish client should be destroyed */
unsigned int destroy ;
/*! \brief Serializer for stuff and things */
struct ast_taskprocessor * serializer ;
/*! \brief User, if any, associated with the publisher */
char user [ 0 ] ;
} ;
@ -242,13 +245,11 @@ AST_RWLOCK_DEFINE_STATIC(load_lock);
AO2_STRING_FIELD_HASH_FN ( sip_outbound_publisher , user ) ;
AO2_STRING_FIELD_CMP_FN ( sip_outbound_publisher , user ) ;
/*! \brief Unloading data */
struct unloading_data {
int is_unloading ;
int count ;
ast_mutex_t lock ;
ast_cond_t cond ;
} unloading ;
/*! Time needs to be long enough for a transaction to timeout if nothing replies. */
# define MAX_UNLOAD_TIMEOUT_TIME 35 /* Seconds */
/*! Shutdown group to monitor sip_outbound_registration_client_state serializers. */
static struct ast_serializer_shutdown_group * shutdown_group ;
/*! \brief Default number of client state container buckets */
# define DEFAULT_STATE_BUCKETS 31
@ -772,7 +773,7 @@ fatal:
ast_free ( message ) ;
service :
if ( ast_sip_push_task ( NULL , sip_publisher_service_queue , ao2_bump ( publisher ) ) ) {
if ( ast_sip_push_task ( publisher - > serializer , sip_publisher_service_queue , ao2_bump ( publisher ) ) ) {
ao2_ref ( publisher , - 1 ) ;
}
return - 1 ;
@ -815,7 +816,7 @@ static int publisher_client_send(void *obj, void *arg, void *data, int flags)
AST_LIST_INSERT_TAIL ( & publisher - > queue , message , entry ) ;
* res = ast_sip_push_task ( NULL , sip_publisher_service_queue , ao2_bump ( publisher ) ) ;
* res = ast_sip_push_task ( publisher - > serializer , sip_publisher_service_queue , ao2_bump ( publisher ) ) ;
if ( * res ) {
ao2_ref ( publisher , - 1 ) ;
}
@ -1008,25 +1009,17 @@ static void sip_outbound_publisher_destroy(void *obj)
}
ao2_cleanup ( publisher - > owner ) ;
ast_free ( publisher - > from_uri ) ;
ast_free ( publisher - > to_uri ) ;
/* if unloading the module and all objects have been unpublished
send the signal to finish unloading */
if ( unloading . is_unloading ) {
ast_mutex_lock ( & unloading . lock ) ;
if ( - - unloading . count = = 0 ) {
ast_cond_signal ( & unloading . cond ) ;
}
ast_mutex_unlock ( & unloading . lock ) ;
}
ast_taskprocessor_unreference ( publisher - > serializer ) ;
}
static struct sip_outbound_publisher * sip_outbound_publisher_alloc (
struct ast_sip_outbound_publish_client * client , const char * user )
{
struct sip_outbound_publisher * publisher ;
char tps_name [ AST_TASKPROCESSOR_MAX_NAME + 1 ] ;
publisher = ao2_alloc ( sizeof ( * publisher ) + ( user ? strlen ( user ) : 0 ) + 1 ,
sip_outbound_publisher_destroy ) ;
@ -1054,6 +1047,16 @@ static struct sip_outbound_publisher *sip_outbound_publisher_alloc(
* publisher - > user = ' \0 ' ;
}
ast_taskprocessor_build_name ( tps_name , sizeof ( tps_name ) , " pjsip/outpub/%s " ,
ast_sorcery_object_get_id ( client - > publish ) ) ;
publisher - > serializer = ast_sip_create_serializer_group ( tps_name ,
shutdown_group ) ;
if ( ! publisher - > serializer ) {
ao2_ref ( publisher , - 1 ) ;
return NULL ;
}
if ( ast_sip_push_task_synchronous ( NULL , sip_outbound_publisher_init , publisher ) ) {
ast_log ( LOG_ERROR , " Unable to create publisher for outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( client - > publish ) ) ;
@ -1079,7 +1082,7 @@ static struct sip_outbound_publisher *sip_outbound_publish_client_add_publisher(
* No need to bump the reference here . The task will take care of
* removing the reference .
*/
if ( ast_sip_push_task ( NULL , cancel_refresh_timer_task , publisher ) ) {
if ( ast_sip_push_task ( publisher - > serializer , cancel_refresh_timer_task , publisher ) ) {
ao2_ref ( publisher , - 1 ) ;
}
return NULL ;
@ -1142,13 +1145,13 @@ static int cancel_and_unpublish(void *obj, void *arg, int flags)
/* If the publisher was never started, there's nothing to unpublish, so just
* destroy the publication and remove its reference to the publisher .
*/
if ( ast_sip_push_task ( NULL , explicit_publish_destroy , ao2_bump ( publisher ) ) ) {
if ( ast_sip_push_task ( publisher - > serializer , explicit_publish_destroy , ao2_bump ( publisher ) ) ) {
ao2_ref ( publisher , - 1 ) ;
}
return 0 ;
}
if ( ast_sip_push_task ( NULL , cancel_refresh_timer_task , ao2_bump ( publisher ) ) ) {
if ( ast_sip_push_task ( publisher - > serializer , cancel_refresh_timer_task , ao2_bump ( publisher ) ) ) {
ast_log ( LOG_WARNING , " Could not stop refresh timer on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( client - > publish ) ) ;
ao2_ref ( publisher , - 1 ) ;
@ -1156,7 +1159,7 @@ static int cancel_and_unpublish(void *obj, void *arg, int flags)
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
if ( ! publisher - > sending ) {
if ( ast_sip_push_task ( NULL , send_unpublish_task , ao2_bump ( publisher ) ) ) {
if ( ast_sip_push_task ( publisher - > serializer , send_unpublish_task , ao2_bump ( publisher ) ) ) {
ast_log ( LOG_WARNING , " Could not send unpublish message on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( client - > publish ) ) ;
ao2_ref ( publisher , - 1 ) ;
@ -1255,7 +1258,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
if ( publisher - > sending ) {
publisher - > sending = NULL ;
if ( ! ast_sip_push_task ( NULL , send_unpublish_task , ao2_bump ( publisher ) ) ) {
if ( ! ast_sip_push_task ( publisher - > serializer , send_unpublish_task , ao2_bump ( publisher ) ) ) {
return ;
}
ast_log ( LOG_WARNING , " Could not send unpublish message on outbound publish '%s' \n " ,
@ -1336,7 +1339,7 @@ end:
ast_free ( message ) ;
}
} else {
if ( ast_sip_push_task ( NULL , sip_publisher_service_queue , ao2_bump ( publisher ) ) ) {
if ( ast_sip_push_task ( publisher - > serializer , sip_publisher_service_queue , ao2_bump ( publisher ) ) ) {
ao2_ref ( publisher , - 1 ) ;
}
}
@ -1431,6 +1434,7 @@ static struct ast_sip_outbound_publish_state *sip_outbound_publish_state_alloc(
ao2_ref ( state , - 1 ) ;
return NULL ;
}
state - > client - > publish = ao2_bump ( publish ) ;
strcpy ( state - > id , id ) ;
@ -1580,6 +1584,32 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab
return ast_sip_auth_vector_init ( & publish - > outbound_auths , var - > value ) ;
}
static int unload_module ( void )
{
int remaining ;
ast_sorcery_object_unregister ( ast_sip_get_sorcery ( ) , " outbound-publish " ) ;
ao2_global_obj_release ( current_states ) ;
/* Wait for publication serializers to get destroyed. */
ast_debug ( 2 , " Waiting for publication to complete for unload. \n " ) ;
remaining = ast_serializer_shutdown_group_join ( shutdown_group , MAX_UNLOAD_TIMEOUT_TIME ) ;
if ( remaining ) {
ast_log ( LOG_WARNING , " Unload incomplete. Could not stop %d outbound publications. Try again later. \n " ,
remaining ) ;
return - 1 ;
}
ast_debug ( 2 , " Successful shutdown. \n " ) ;
ao2_cleanup ( shutdown_group ) ;
shutdown_group = NULL ;
return 0 ;
}
static int load_module ( void )
{
CHECK_PJSIP_MODULE_LOADED ( ) ;
@ -1587,12 +1617,18 @@ static int load_module(void)
/* As of pjproject 2.4.5, PJSIP_MAX_URL_SIZE isn't exposed yet but we try anyway. */
ast_pjproject_get_buildopt ( " PJSIP_MAX_URL_SIZE " , " %d " , & pjsip_max_url_size ) ;
shutdown_group = ast_serializer_shutdown_group_alloc ( ) ;
if ( ! shutdown_group ) {
return AST_MODULE_LOAD_FAILURE ;
}
ast_sorcery_apply_config ( ast_sip_get_sorcery ( ) , " res_pjsip_outbound_publish " ) ;
ast_sorcery_apply_default ( ast_sip_get_sorcery ( ) , " outbound-publish " , " config " , " pjsip.conf,criteria=type=outbound-publish " ) ;
if ( ast_sorcery_object_register ( ast_sip_get_sorcery ( ) , " outbound-publish " , sip_outbound_publish_alloc , NULL ,
sip_outbound_publish_apply ) ) {
ast_log ( LOG_ERROR , " Unable to register 'outbound-publish' type with sorcery \n " ) ;
unload_module ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
@ -1629,63 +1665,6 @@ static int reload_module(void)
return 0 ;
}
static int current_publishing_count ( void * obj , void * arg , int flags )
{
struct ast_sip_outbound_publish_state * state = obj ;
unloading . count + = ao2_container_count ( state - > client - > publishers ) ;
return 0 ;
}
static int unload_module ( void )
{
struct timeval start = ast_tvnow ( ) ;
struct timespec end = {
. tv_sec = start . tv_sec + 10 ,
. tv_nsec = start . tv_usec * 1000
} ;
int res = 0 ;
struct ao2_container * states = ao2_global_obj_ref ( current_states ) ;
if ( ! states ) {
return 0 ;
}
unloading . count = 0 ;
ao2_callback ( states , OBJ_NODATA , current_publishing_count , NULL ) ;
ao2_ref ( states , - 1 ) ;
if ( ! unloading . count ) {
return 0 ;
}
ast_mutex_init ( & unloading . lock ) ;
ast_cond_init ( & unloading . cond , NULL ) ;
ast_mutex_lock ( & unloading . lock ) ;
unloading . is_unloading = 1 ;
ao2_global_obj_release ( current_states ) ;
/* wait for items to unpublish */
ast_verb ( 5 , " Waiting to complete unpublishing task(s) \n " ) ;
while ( unloading . count & & ! res ) {
res = ast_cond_timedwait ( & unloading . cond , & unloading . lock , & end ) ;
}
ast_mutex_unlock ( & unloading . lock ) ;
ast_mutex_destroy ( & unloading . lock ) ;
ast_cond_destroy ( & unloading . cond ) ;
if ( res ) {
ast_verb ( 5 , " At least %d items were unable to unpublish "
" in the allowed time \n " , unloading . count ) ;
} else {
ast_verb ( 5 , " All items successfully unpublished \n " ) ;
ast_sorcery_object_unregister ( ast_sip_get_sorcery ( ) , " outbound-publish " ) ;
}
return res ;
}
AST_MODULE_INFO ( ASTERISK_GPL_KEY , AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER , " PJSIP Outbound Publish Support " ,
. load = load_module ,
. reload = reload_module ,