@ -105,26 +105,6 @@ struct sip_outbound_publish_message {
char body_contents [ 0 ] ;
} ;
/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
struct ast_sip_outbound_publish_client {
/*! \brief Underlying publish client */
pjsip_publishc * client ;
/*! \brief Timer entry for refreshing publish */
pj_timer_entry timer ;
/*! \brief Publisher datastores set up by handlers */
struct ao2_container * datastores ;
/*! \brief The number of auth attempts done */
unsigned int auth_attempts ;
/*! \brief Queue of outgoing publish messages to send*/
AST_LIST_HEAD_NOLOCK ( , sip_outbound_publish_message ) queue ;
/*! \brief The message currently being sent */
struct sip_outbound_publish_message * sending ;
/*! \brief Publish client has been fully started and event type informed */
unsigned int started ;
/*! \brief Publish client should be destroyed */
unsigned int destroy ;
} ;
/*! \brief Outbound publish information */
struct ast_sip_outbound_publish {
/*! \brief Sorcery object details */
@ -148,14 +128,122 @@ struct ast_sip_outbound_publish {
unsigned int max_auth_attempts ;
/*! \brief Configured authentication credentials */
struct ast_sip_auth_vector outbound_auths ;
/*! \brief Outbound publish state */
struct ast_sip_outbound_publish_client * state ;
} ;
AST_RWLIST_HEAD_STATIC ( publisher_handlers , ast_sip_event_publisher_handler ) ;
/*! \brief Outbound publish client state information (persists for lifetime that publish should exist) */
struct ast_sip_outbound_publish_client {
/*! \brief Underlying publish client */
pjsip_publishc * client ;
/*! \brief Timer entry for refreshing publish */
pj_timer_entry timer ;
/*! \brief Publisher datastores set up by handlers */
struct ao2_container * datastores ;
/*! \brief The number of auth attempts done */
unsigned int auth_attempts ;
/*! \brief Queue of outgoing publish messages to send*/
AST_LIST_HEAD_NOLOCK ( , sip_outbound_publish_message ) queue ;
/*! \brief The message currently being sent */
struct sip_outbound_publish_message * sending ;
/*! \brief Publish client has been fully started and event type informed */
unsigned int started ;
/*! \brief Publish client should be destroyed */
unsigned int destroy ;
/*! \brief Outbound publish information */
struct ast_sip_outbound_publish * publish ;
} ;
/*! \brief Outbound publish state information (persists for lifetime of a publish) */
struct ast_sip_outbound_publish_state {
/*! \brief Outbound publish client */
struct ast_sip_outbound_publish_client * client ;
/* publish state id lookup key - same as publish configuration id */
char id [ 0 ] ;
} ;
/*! \brief Container of currently active publish clients */
static AO2_GLOBAL_OBJ_STATIC ( active ) ;
/*! \brief Unloading data */
struct unloading_data {
int is_unloading ;
int count ;
ast_mutex_t lock ;
ast_cond_t cond ;
} unloading ;
/*! \brief Default number of client state container buckets */
# define DEFAULT_STATE_BUCKETS 31
static AO2_GLOBAL_OBJ_STATIC ( current_states ) ;
/*! \brief Used on [re]loads to hold new state data */
static struct ao2_container * new_states ;
/*! \brief hashing function for state objects */
static int outbound_publish_state_hash ( const void * obj , const int flags )
{
const struct ast_sip_outbound_publish_state * object ;
const char * key ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_KEY :
key = obj ;
break ;
case OBJ_SEARCH_OBJECT :
object = obj ;
key = object - > id ;
break ;
default :
ast_assert ( 0 ) ;
return 0 ;
}
return ast_str_hash ( key ) ;
}
/*! \brief comparator function for client objects */
static int outbound_publish_state_cmp ( void * obj , void * arg , int flags )
{
const struct ast_sip_outbound_publish_state * object_left = obj ;
const struct ast_sip_outbound_publish_state * object_right = arg ;
const char * right_key = arg ;
int cmp ;
switch ( flags & OBJ_SEARCH_MASK ) {
case OBJ_SEARCH_OBJECT :
right_key = object_right - > id ;
/* Fall through */
case OBJ_SEARCH_KEY :
cmp = strcmp ( object_left - > id , right_key ) ;
break ;
case OBJ_SEARCH_PARTIAL_KEY :
/* Not supported by container. */
ast_assert ( 0 ) ;
return 0 ;
default :
cmp = 0 ;
break ;
}
if ( cmp ) {
return 0 ;
}
return CMP_MATCH ;
}
static struct ao2_container * get_publishes_and_update_state ( void )
{
struct ao2_container * container ;
container = ast_sorcery_retrieve_by_fields (
ast_sip_get_sorcery ( ) , " outbound-publish " ,
AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL , NULL ) ;
if ( ! new_states ) {
return container ;
}
ao2_global_obj_replace_unref ( current_states , new_states ) ;
ao2_cleanup ( new_states ) ;
new_states = NULL ;
return container ;
}
AST_RWLIST_HEAD_STATIC ( publisher_handlers , ast_sip_event_publisher_handler ) ;
static void sub_add_handler ( struct ast_sip_event_publisher_handler * handler )
{
@ -185,12 +273,13 @@ static void cancel_publish_refresh(struct ast_sip_outbound_publish_client *clien
}
/*! \brief Helper function which sets up the timer to send publication */
static void schedule_publish_refresh ( struct ast_sip_outbound_publish * publish , pjsip_rx_data * rdata )
static void schedule_publish_refresh ( struct ast_sip_outbound_publish _client * client , pjsip_rx_data * rdata )
{
struct ast_sip_outbound_publish * publish = ao2_bump ( client - > publish ) ;
pj_time_val delay = { . sec = 0 , } ;
pjsip_expires_hdr * expires ;
cancel_publish_refresh ( publish- > state ) ;
cancel_publish_refresh ( client ) ;
/* Determine when we should refresh - we favor the Expires header if possible */
expires = pjsip_msg_find_hdr ( rdata - > msg_info . msg , PJSIP_H_EXPIRES , NULL ) ;
@ -204,11 +293,12 @@ static void schedule_publish_refresh(struct ast_sip_outbound_publish *publish, p
delay . sec = PJSIP_PUBLISHC_DELAY_BEFORE_REFRESH ;
}
ao2_ref ( publish- > state , + 1 ) ;
if ( pjsip_endpt_schedule_timer ( ast_sip_get_pjsip_endpoint ( ) , & publish- > state - > timer , & delay ) ! = PJ_SUCCESS ) {
ao2_ref ( client , + 1 ) ;
if ( pjsip_endpt_schedule_timer ( ast_sip_get_pjsip_endpoint ( ) , & client - > timer , & delay ) ! = PJ_SUCCESS ) {
ast_log ( LOG_WARNING , " Failed to pass timed publish refresh to scheduler \n " ) ;
ao2_ref ( publish- > state , - 1 ) ;
ao2_ref ( client , - 1 ) ;
}
ao2_ref ( publish , - 1 ) ;
}
/*! \brief Publish client timer callback function */
@ -229,10 +319,10 @@ static void sip_outbound_publish_timer_cb(pj_timer_heap_t *timer_heap, struct pj
/*! \brief Task for cancelling a refresh timer */
static int cancel_refresh_timer_task ( void * data )
{
struct ast_sip_outbound_publish_client * state = data ;
struct ast_sip_outbound_publish_client * client = data ;
cancel_publish_refresh ( state ) ;
ao2_ref ( state , - 1 ) ;
cancel_publish_refresh ( client ) ;
ao2_ref ( client , - 1 ) ;
return 0 ;
}
@ -240,14 +330,14 @@ static int cancel_refresh_timer_task(void *data)
/*! \brief Task for sending an unpublish */
static int send_unpublish_task ( void * data )
{
struct ast_sip_outbound_publish_client * state = data ;
struct ast_sip_outbound_publish_client * client = data ;
pjsip_tx_data * tdata ;
if ( pjsip_publishc_unpublish ( state - > client , & tdata ) = = PJ_SUCCESS ) {
pjsip_publishc_send ( state - > client , tdata ) ;
if ( pjsip_publishc_unpublish ( client - > client , & tdata ) = = PJ_SUCCESS ) {
pjsip_publishc_send ( client - > client , tdata ) ;
}
ao2_ref ( state , - 1 ) ;
ao2_ref ( client , - 1 ) ;
return 0 ;
}
@ -255,53 +345,70 @@ static int send_unpublish_task(void *data)
/*! \brief Helper function which starts or stops publish clients when applicable */
static void sip_outbound_publish_synchronize ( struct ast_sip_event_publisher_handler * removed )
{
RAII_VAR ( struct ao2_container * , publishes , ast_sorcery_retrieve_by_fields ( ast_sip_get_sorcery ( ) , " outbound-publish " , AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL , NULL ) , ao2_cleanup ) ;
RAII_VAR ( struct ao2_container * , publishes , get_publishes_and_update_state ( ) , ao2_cleanup ) ;
struct ao2_container * states ;
struct ao2_iterator i ;
struct ast_sip_outbound_publish * publish ;
struct ast_sip_outbound_publish _state * state ;
if ( ! publishes ) {
return ;
}
i = ao2_iterator_init ( publishes , 0 ) ;
while ( ( publish = ao2_iterator_next ( & i ) ) ) {
states = ao2_global_obj_ref ( current_states ) ;
if ( ! states ) {
return ;
}
i = ao2_iterator_init ( states , 0 ) ;
while ( ( state = ao2_iterator_next ( & i ) ) ) {
struct ast_sip_outbound_publish * publish = ao2_bump ( state - > client - > publish ) ;
struct ast_sip_event_publisher_handler * handler = find_publisher_handler_for_event_name ( publish - > event ) ;
if ( ! publish - > state - > started ) {
if ( ! state- > client - > started ) {
/* If the publisher client has not yet been started try to start it */
if ( ! handler ) {
ast_debug ( 2 , " Could not find handler for event '%s' for outbound publish client '%s' \n " ,
publish - > event , ast_sorcery_object_get_id ( publish ) ) ;
} else if ( handler - > start_publishing ( publish , publish- > state ) ) {
publish - > event , ast_sorcery_object_get_id ( publish ) ) ;
} else if ( handler - > start_publishing ( publish , state- > client ) ) {
ast_log ( LOG_ERROR , " Failed to start outbound publish with event '%s' for client '%s' \n " ,
publish - > event , ast_sorcery_object_get_id ( publish ) ) ;
} else {
publish- > state - > started = 1 ;
state- > client - > started = 1 ;
}
} else if ( publish- > state - > started & & ! handler & & removed & & ! strcmp ( publish - > event , removed - > event_name ) ) {
} else if ( state- > client - > started & & ! handler & & removed & & ! strcmp ( publish - > event , removed - > event_name ) ) {
/* If the publisher client has been started but it is going away stop it */
removed - > stop_publishing ( publish- > state ) ;
publish- > state - > started = 0 ;
if ( ast_sip_push_task ( NULL , cancel_refresh_timer_task , ao2_bump ( publish- > state ) ) ) {
removed - > stop_publishing ( state- > client ) ;
state- > client - > started = 0 ;
if ( ast_sip_push_task ( NULL , cancel_refresh_timer_task , ao2_bump ( state- > client ) ) ) {
ast_log ( LOG_WARNING , " Could not stop refresh timer on client '%s' \n " ,
ast_sorcery_object_get_id ( publish ) ) ;
ao2_ref ( publish- > state , - 1 ) ;
ao2_ref ( state- > client , - 1 ) ;
}
}
ao2_ref ( publish , - 1 ) ;
ao2_ref ( state , - 1 ) ;
}
ao2_iterator_destroy ( & i ) ;
ao2_ref ( states , - 1 ) ;
}
struct ast_sip_outbound_publish_client * ast_sip_publish_client_get ( const char * name )
{
RAII_VAR ( struct ast_sip_outbound_publish * , publish , ast_sorcery_retrieve_by_id ( ast_sip_get_sorcery ( ) , " outbound-publish " , name ) , ao2_cleanup ) ;
RAII_VAR ( struct ao2_container * , states ,
ao2_global_obj_ref ( current_states ) , ao2_cleanup ) ;
RAII_VAR ( struct ast_sip_outbound_publish_state * , state , NULL , ao2_cleanup ) ;
if ( ! publish ) {
if ( ! states ) {
return NULL ;
}
return ao2_bump ( publish - > state ) ;
state = ao2_find ( states , name , OBJ_SEARCH_KEY ) ;
if ( ! state ) {
return NULL ;
}
ao2_ref ( state - > client , + 1 ) ;
return state - > client ;
}
int ast_sip_register_event_publisher_handler ( struct ast_sip_event_publisher_handler * handler )
@ -351,16 +458,7 @@ void ast_sip_unregister_event_publisher_handler(struct ast_sip_event_publisher_h
static void sip_outbound_publish_destroy ( void * obj )
{
struct ast_sip_outbound_publish * publish = obj ;
SCOPED_LOCK ( lock , & publisher_handlers , AST_RWLIST_RDLOCK , AST_RWLIST_UNLOCK ) ;
struct ast_sip_event_publisher_handler * handler = find_publisher_handler_for_event_name ( publish - > event ) ;
if ( handler ) {
handler - > stop_publishing ( publish - > state ) ;
}
if ( publish - > state ) {
cancel_publish_refresh ( publish - > state ) ;
ao2_ref ( publish - > state , - 1 ) ;
}
ast_sip_auth_vector_destroy ( & publish - > outbound_auths ) ;
ast_string_field_free_memory ( publish ) ;
@ -538,19 +636,72 @@ int ast_sip_publish_client_send(struct ast_sip_outbound_publish_client *client,
return res ;
}
/*! \brief Destructor function for publish state */
/*! \brief Destructor function for publish client */
static void sip_outbound_publish_client_destroy ( void * obj )
{
struct ast_sip_outbound_publish_client * state = obj ;
struct ast_sip_outbound_publish_client * client = obj ;
struct sip_outbound_publish_message * message ;
/* You might be tempted to think "the publish client isn't being destroyed" but it actually is - just elsewhere */
while ( ( message = AST_LIST_REMOVE_HEAD ( & state - > queue , entry ) ) ) {
while ( ( message = AST_LIST_REMOVE_HEAD ( & client - > queue , entry ) ) ) {
ast_free ( message ) ;
}
ao2_cleanup ( state - > datastores ) ;
ao2_cleanup ( client - > datastores ) ;
ao2_cleanup ( client - > publish ) ;
/* if unloading the module and all objects have been unpublished
send the signal to finish unloading */
if ( unloading . is_unloading ) {
ast_mutex_lock ( & unloading . lock ) ;
if ( - - unloading . count = = 0 ) {
ast_cond_signal ( & unloading . cond ) ;
}
ast_mutex_unlock ( & unloading . lock ) ;
}
}
/*! \brief Helper function which cancels and un-publishes a no longer used client */
static int cancel_and_unpublish ( struct ast_sip_outbound_publish_client * client )
{
SCOPED_AO2LOCK ( lock , client ) ;
/* If this publish client is currently publishing stop and terminate any refresh timer */
if ( client - > started ) {
struct ast_sip_event_publisher_handler * handler = find_publisher_handler_for_event_name ( client - > publish - > event ) ;
if ( handler ) {
handler - > stop_publishing ( client ) ;
}
client - > started = 0 ;
if ( ast_sip_push_task ( NULL , cancel_refresh_timer_task , ao2_bump ( client ) ) ) {
ast_log ( LOG_WARNING , " Could not stop refresh timer on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( client - > publish ) ) ;
ao2_ref ( client , - 1 ) ;
}
}
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
if ( ! client - > sending ) {
if ( ast_sip_push_task ( NULL , send_unpublish_task , ao2_bump ( client ) ) ) {
ast_log ( LOG_WARNING , " Could not send unpublish message on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( client - > publish ) ) ;
ao2_ref ( client , - 1 ) ;
}
}
client - > destroy = 1 ;
return 0 ;
}
/*! \brief Destructor function for publish state */
static void sip_outbound_publish_state_destroy ( void * obj )
{
struct ast_sip_outbound_publish_state * state = obj ;
cancel_and_unpublish ( state - > client ) ;
ao2_cleanup ( state - > client ) ;
}
/*!
@ -588,35 +739,38 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param);
/*! \brief Helper function that allocates a pjsip publish client and configures it */
static int sip_outbound_publish_client_alloc ( void * data )
{
struct ast_sip_outbound_publish * publish = data ;
struct ast_sip_outbound_publish_client * client = data ;
RAII_VAR ( struct ast_sip_outbound_publish * , publish , NULL , ao2_cleanup ) ;
pjsip_publishc_opt opt = {
. queue_request = PJ_FALSE ,
} ;
pj_str_t event , server_uri , to_uri , from_uri ;
pj_status_t status ;
if ( publish- > state - > client ) {
if ( client - > client ) {
return 0 ;
} else if ( pjsip_publishc_create ( ast_sip_get_pjsip_endpoint ( ) , & opt , ao2_bump ( publish ) , sip_outbound_publish_callback ,
& publish- > state - > client ) ! = PJ_SUCCESS ) {
ao2_ref ( publish , - 1 ) ;
} else if ( pjsip_publishc_create ( ast_sip_get_pjsip_endpoint ( ) , & opt , ao2_bump ( client ) , sip_outbound_publish_callback ,
& client - > client ) ! = PJ_SUCCESS ) {
ao2_ref ( client , - 1 ) ;
return - 1 ;
}
publish = ao2_bump ( client - > publish ) ;
if ( ! ast_strlen_zero ( publish - > outbound_proxy ) ) {
pjsip_route_hdr route_set , * route ;
static const pj_str_t ROUTE_HNAME = { " Route " , 5 } ;
pj_list_init ( & route_set ) ;
if ( ! ( route = pjsip_parse_hdr ( pjsip_publishc_get_pool ( publish- > state - > client ) , & ROUTE_HNAME ,
if ( ! ( route = pjsip_parse_hdr ( pjsip_publishc_get_pool ( client - > client ) , & ROUTE_HNAME ,
( char * ) publish - > outbound_proxy , strlen ( publish - > outbound_proxy ) , NULL ) ) ) {
pjsip_publishc_destroy ( publish- > state - > client ) ;
pjsip_publishc_destroy ( client - > client ) ;
return - 1 ;
}
pj_list_insert_nodes_before ( & route_set , route ) ;
pjsip_publishc_set_route_set ( publish- > state - > client , & route_set ) ;
pjsip_publishc_set_route_set ( client - > client , & route_set ) ;
}
pj_cstr ( & event , publish - > event ) ;
@ -624,7 +778,7 @@ static int sip_outbound_publish_client_alloc(void *data)
pj_cstr ( & to_uri , S_OR ( publish - > to_uri , publish - > server_uri ) ) ;
pj_cstr ( & from_uri , S_OR ( publish - > from_uri , publish - > server_uri ) ) ;
status = pjsip_publishc_init ( publish- > state - > client , & event , & server_uri , & from_uri , & to_uri ,
status = pjsip_publishc_init ( client - > client , & event , & server_uri , & from_uri , & to_uri ,
publish - > expiration ) ;
if ( status = = PJSIP_EINVALIDURI ) {
pj_pool_t * pool ;
@ -635,7 +789,7 @@ static int sip_outbound_publish_client_alloc(void *data)
if ( ! pool ) {
ast_log ( LOG_ERROR , " Could not create pool for URI validation on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( publish ) ) ;
pjsip_publishc_destroy ( publish- > state - > client ) ;
pjsip_publishc_destroy ( client - > client ) ;
return - 1 ;
}
@ -665,10 +819,10 @@ static int sip_outbound_publish_client_alloc(void *data)
}
pjsip_endpt_release_pool ( ast_sip_get_pjsip_endpoint ( ) , pool ) ;
pjsip_publishc_destroy ( publish- > state - > client ) ;
pjsip_publishc_destroy ( client - > client ) ;
return - 1 ;
} else if ( status ! = PJ_SUCCESS ) {
pjsip_publishc_destroy ( publish- > state - > client ) ;
pjsip_publishc_destroy ( client - > client ) ;
return - 1 ;
}
@ -678,51 +832,52 @@ static int sip_outbound_publish_client_alloc(void *data)
/*! \brief Callback function for publish client responses */
static void sip_outbound_publish_callback ( struct pjsip_publishc_cbparam * param )
{
RAII_VAR ( struct ast_sip_outbound_publish * , publish , ao2_bump ( param - > token ) , ao2_cleanup ) ;
SCOPED_AO2LOCK ( lock , publish - > state ) ;
RAII_VAR ( struct ast_sip_outbound_publish_client * , client , ao2_bump ( param - > token ) , ao2_cleanup ) ;
RAII_VAR ( struct ast_sip_outbound_publish * , publish , ao2_bump ( client - > publish ) , ao2_cleanup ) ;
SCOPED_AO2LOCK ( lock , client ) ;
pjsip_tx_data * tdata ;
if ( publish - > state - > destroy ) {
if ( publish - > state - > sending ) {
publish - > state - > sending = NULL ;
if ( ! ast_sip_push_task ( NULL , send_unpublish_task , ao2_bump ( publish - > state ) ) ) {
if ( client - > destroy ) {
if ( client - > sending ) {
client - > sending = NULL ;
if ( ! ast_sip_push_task ( NULL , send_unpublish_task , ao2_bump ( client ) ) ) {
return ;
}
ast_log ( LOG_WARNING , " Could not send unpublish message on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( publish ) ) ;
ao2_ref ( publish- > state , - 1 ) ;
ao2_ref ( client , - 1 ) ;
}
/* Once the destroy is called this callback will not get called any longer, so drop the publish ref */
pjsip_publishc_destroy ( publish- > state - > client ) ;
ao2_ref ( publish , - 1 ) ;
/* Once the destroy is called this callback will not get called any longer, so drop the client ref */
pjsip_publishc_destroy ( client - > client ) ;
ao2_ref ( client , - 1 ) ;
return ;
}
if ( param - > code = = 401 | | param - > code = = 407 ) {
if ( ! ast_sip_create_request_with_auth ( & publish - > outbound_auths ,
param - > rdata , pjsip_rdata_get_tsx ( param - > rdata ) , & tdata ) ) {
pjsip_publishc_send ( publish- > state - > client , tdata ) ;
pjsip_publishc_send ( client - > client , tdata ) ;
}
publish- > state - > auth_attempts + + ;
client - > auth_attempts + + ;
if ( publish- > state - > auth_attempts = = publish - > max_auth_attempts ) {
pjsip_publishc_destroy ( publish- > state - > client ) ;
publish- > state - > client = NULL ;
if ( client - > auth_attempts = = publish - > max_auth_attempts ) {
pjsip_publishc_destroy ( client - > client ) ;
client - > client = NULL ;
ast_log ( LOG_ERROR , " Reached maximum number of PUBLISH authentication attempts on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( publish ) ) ;
goto end ;
}
return ;
}
publish- > state - > auth_attempts = 0 ;
client - > auth_attempts = 0 ;
if ( param - > code = = 412 ) {
pjsip_publishc_destroy ( publish- > state - > client ) ;
publish- > state - > client = NULL ;
pjsip_publishc_destroy ( client - > client ) ;
client - > client = NULL ;
if ( sip_outbound_publish_client_alloc ( publish ) ) {
ast_log ( LOG_ERROR , " Failed to create a new outbound publish client for '%s' on 412 response \n " ,
@ -731,7 +886,7 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
}
/* Setting this to NULL will cause a new PUBLISH to get created and sent for the same underlying body */
publish- > state - > sending = NULL ;
client - > sending = NULL ;
} else if ( param - > code = = 423 ) {
/* Update the expiration with the new expiration time if available */
pjsip_expires_hdr * expires ;
@ -740,34 +895,34 @@ static void sip_outbound_publish_callback(struct pjsip_publishc_cbparam *param)
if ( ! expires | | ! expires - > ivalue ) {
ast_log ( LOG_ERROR , " Received 423 response on outbound publish '%s' without a Min-Expires header \n " ,
ast_sorcery_object_get_id ( publish ) ) ;
pjsip_publishc_destroy ( publish- > state - > client ) ;
publish- > state - > client = NULL ;
pjsip_publishc_destroy ( client - > client ) ;
client - > client = NULL ;
goto end ;
}
pjsip_publishc_update_expires ( publish- > state - > client , expires - > ivalue ) ;
publish- > state - > sending = NULL ;
} else if ( publish- > state - > sending ) {
pjsip_publishc_update_expires ( client - > client , expires - > ivalue ) ;
client - > sending = NULL ;
} else if ( client - > sending ) {
/* Remove the message currently being sent so that when the queue is serviced another will get sent */
AST_LIST_REMOVE_HEAD ( & publish- > state - > queue , entry ) ;
ast_free ( publish- > state - > sending ) ;
publish- > state - > sending = NULL ;
AST_LIST_REMOVE_HEAD ( & client - > queue , entry ) ;
ast_free ( client - > sending ) ;
client - > sending = NULL ;
}
if ( AST_LIST_EMPTY ( & publish- > state - > queue ) ) {
schedule_publish_refresh ( publish , param - > rdata ) ;
if ( AST_LIST_EMPTY ( & client - > queue ) ) {
schedule_publish_refresh ( client , param - > rdata ) ;
}
end :
if ( ! publish- > state - > client ) {
if ( ! client - > client ) {
struct sip_outbound_publish_message * message ;
while ( ( message = AST_LIST_REMOVE_HEAD ( & publish- > state - > queue , entry ) ) ) {
while ( ( message = AST_LIST_REMOVE_HEAD ( & client - > queue , entry ) ) ) {
ast_free ( message ) ;
}
} else {
if ( ast_sip_push_task ( NULL , sip_publish_client_service_queue , ao2_bump ( publish- > state ) ) ) {
ao2_ref ( publish- > state , - 1 ) ;
if ( ast_sip_push_task ( NULL , sip_publish_client_service_queue , ao2_bump ( client ) ) ) {
ao2_ref ( client , - 1 ) ;
}
}
}
@ -831,31 +986,43 @@ static int datastore_cmp(void *obj, void *arg, int flags)
return CMP_MATCH ;
}
/*! \brief Allocator function for publish client state */
static struct ast_sip_outbound_publish_client * sip_outbound_publish_state_alloc ( void )
/*! \brief Allocator function for publish client */
static struct ast_sip_outbound_publish_state * sip_outbound_publish_state_alloc (
struct ast_sip_outbound_publish * publish )
{
struct ast_sip_outbound_publish_client * state = ao2_alloc ( sizeof ( * state ) , sip_outbound_publish_client_destroy ) ;
const char * id = ast_sorcery_object_get_id ( publish ) ;
struct ast_sip_outbound_publish_state * state =
ao2_alloc ( sizeof ( * state ) + strlen ( id ) + 1 , sip_outbound_publish_state_destroy ) ;
if ( ! state ) {
return NULL ;
}
state - > datastores = ao2_container_alloc ( DATASTORE_BUCKETS , datastore_hash , datastore_cmp ) ;
if ( ! state - > datastores ) {
state - > client = ao2_alloc ( sizeof ( * state - > client ) , sip_outbound_publish_client_destroy ) ;
if ( ! state - > client ) {
ao2_ref ( state , - 1 ) ;
return NULL ;
}
state - > client - > datastores = ao2_container_alloc ( DATASTORE_BUCKETS , datastore_hash , datastore_cmp ) ;
if ( ! state - > client - > datastores ) {
ao2_ref ( state , - 1 ) ;
return NULL ;
}
state - > timer . user_data = state ;
state - > timer . cb = sip_outbound_publish_timer_cb ;
state - > client - > timer . user_data = state - > client ;
state - > client - > timer . cb = sip_outbound_publish_timer_cb ;
state - > client - > publish = ao2_bump ( publish ) ;
strcpy ( state - > id , id ) ;
return state ;
}
/*! \brief Apply function which finds or allocates a state structure */
static int sip_outbound_publish_apply ( const struct ast_sorcery * sorcery , void * obj )
{
RAII_VAR ( struct ast_sip_outbound_publish * , existing , ast_sorcery_retrieve_by_id ( sorcery , " outbound-publish " , ast_sorcery_object_get_id ( obj ) ) , ao2_cleanup ) ;
RAII_VAR ( struct ao2_container * , states , ao2_global_obj_ref ( current_states ) , ao2_cleanup ) ;
RAII_VAR ( struct ast_sip_outbound_publish_state * , state , NULL , ao2_cleanup ) ;
struct ast_sip_outbound_publish * applied = obj ;
if ( ast_strlen_zero ( applied - > server_uri ) ) {
@ -868,24 +1035,47 @@ static int sip_outbound_publish_apply(const struct ast_sorcery *sorcery, void *o
return - 1 ;
}
if ( ! existing ) {
/* If no existing publish exists we can just start fresh easily */
applied - > state = sip_outbound_publish_state_alloc ( ) ;
} else {
/* If there is an existing publish things are more complicated, we can immediately reuse this state if most stuff remains unchanged */
if ( can_reuse_publish ( existing , applied ) ) {
applied - > state = existing - > state ;
ao2_ref ( applied - > state , + 1 ) ;
} else {
applied - > state = sip_outbound_publish_state_alloc ( ) ;
if ( ! new_states ) {
/* make sure new_states has been allocated as we will be adding to it */
new_states = ao2_container_alloc_options (
AO2_ALLOC_OPT_LOCK_NOLOCK , DEFAULT_STATE_BUCKETS ,
outbound_publish_state_hash , outbound_publish_state_cmp ) ;
if ( ! new_states ) {
ast_log ( LOG_ERROR , " Unable to allocate new states container \n " ) ;
return - 1 ;
}
}
if ( states ) {
state = ao2_find ( states , ast_sorcery_object_get_id ( obj ) , OBJ_SEARCH_KEY ) ;
if ( state ) {
if ( can_reuse_publish ( state - > client - > publish , applied ) ) {
ao2_replace ( state - > client - > publish , applied ) ;
} else {
ao2_ref ( state , - 1 ) ;
state = NULL ;
}
}
}
if ( ! applied - > state ) {
if ( ! state ) {
state = sip_outbound_publish_state_alloc ( applied ) ;
if ( ! state ) {
ast_log ( LOG_ERROR , " Unable to create state for outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( applied ) ) ;
return - 1 ;
} ;
}
if ( ast_sip_push_task_synchronous ( NULL , sip_outbound_publish_client_alloc , state - > client ) ) {
ast_log ( LOG_ERROR , " Unable to create client for outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( applied ) ) ;
return - 1 ;
}
return ast_sip_push_task_synchronous ( NULL , sip_outbound_publish_client_alloc , applied ) ;
ao2_link ( new_states , state ) ;
return 0 ;
}
static int outbound_auth_handler ( const struct aco_option * opt , struct ast_variable * var , void * obj )
@ -895,74 +1085,9 @@ static int outbound_auth_handler(const struct aco_option *opt, struct ast_variab
return ast_sip_auth_vector_init ( & publish - > outbound_auths , var - > value ) ;
}
/*! \brief Helper function which prunes old publish clients */
static void prune_publish_clients ( const char * object_type )
{
struct ao2_container * old , * current ;
old = ao2_global_obj_ref ( active ) ;
if ( old ) {
struct ao2_iterator i ;
struct ast_sip_outbound_publish * existing ;
i = ao2_iterator_init ( old , 0 ) ;
for ( ; ( existing = ao2_iterator_next ( & i ) ) ; ao2_ref ( existing , - 1 ) ) {
struct ast_sip_outbound_publish * created ;
created = ast_sorcery_retrieve_by_id ( ast_sip_get_sorcery ( ) , " outbound-publish " ,
ast_sorcery_object_get_id ( existing ) ) ;
if ( created ) {
if ( created - > state = = existing - > state ) {
ao2_ref ( created , - 1 ) ;
continue ;
}
ao2_ref ( created , - 1 ) ;
}
ao2_lock ( existing - > state ) ;
/* If this publish client is currently publishing stop and terminate any refresh timer */
if ( existing - > state - > started ) {
struct ast_sip_event_publisher_handler * handler = find_publisher_handler_for_event_name ( existing - > event ) ;
if ( handler ) {
handler - > stop_publishing ( existing - > state ) ;
}
if ( ast_sip_push_task ( NULL , cancel_refresh_timer_task , ao2_bump ( existing - > state ) ) ) {
ast_log ( LOG_WARNING , " Could not stop refresh timer on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( existing ) ) ;
ao2_ref ( existing - > state , - 1 ) ;
}
}
/* If nothing is being sent right now send the unpublish - the destroy will happen in the subsequent callback */
if ( ! existing - > state - > sending ) {
if ( ast_sip_push_task ( NULL , send_unpublish_task , ao2_bump ( existing - > state ) ) ) {
ast_log ( LOG_WARNING , " Could not send unpublish message on outbound publish '%s' \n " ,
ast_sorcery_object_get_id ( existing ) ) ;
ao2_ref ( existing - > state , - 1 ) ;
}
}
existing - > state - > destroy = 1 ;
ao2_unlock ( existing - > state ) ;
}
ao2_iterator_destroy ( & i ) ;
ao2_ref ( old , - 1 ) ;
}
current = ast_sorcery_retrieve_by_fields ( ast_sip_get_sorcery ( ) , " outbound-publish " , AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL , NULL ) ;
ao2_global_obj_replace_unref ( active , current ) ;
}
static struct ast_sorcery_observer outbound_publish_observer = {
. loaded = prune_publish_clients ,
} ;
static int load_module ( void )
{
ast_sorcery_apply_config ( ast_sip_get_sorcery ( ) , " res_pjsip_outbound_publish " ) ;
ast_sorcery_apply_default ( ast_sip_get_sorcery ( ) , " outbound-publish " , " config " , " pjsip.conf,criteria=type=outbound-publish " ) ;
if ( ast_sorcery_object_register ( ast_sip_get_sorcery ( ) , " outbound-publish " , sip_outbound_publish_alloc , NULL ,
@ -970,8 +1095,6 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE ;
}
ast_sorcery_observer_add ( ast_sip_get_sorcery ( ) , " outbound-publish " , & outbound_publish_observer ) ;
ast_sorcery_object_field_register ( ast_sip_get_sorcery ( ) , " outbound-publish " , " type " , " " , OPT_NOOP_T , 0 , 0 ) ;
ast_sorcery_object_field_register ( ast_sip_get_sorcery ( ) , " outbound-publish " , " server_uri " , " " , OPT_STRINGFIELD_T , 0 , STRFLDSET ( struct ast_sip_outbound_publish , server_uri ) ) ;
ast_sorcery_object_field_register ( ast_sip_get_sorcery ( ) , " outbound-publish " , " from_uri " , " " , OPT_STRINGFIELD_T , 0 , STRFLDSET ( struct ast_sip_outbound_publish , from_uri ) ) ;
@ -981,6 +1104,7 @@ static int load_module(void)
ast_sorcery_object_field_register ( ast_sip_get_sorcery ( ) , " outbound-publish " , " expiration " , " 3600 " , OPT_UINT_T , 0 , FLDSET ( struct ast_sip_outbound_publish , expiration ) ) ;
ast_sorcery_object_field_register ( ast_sip_get_sorcery ( ) , " outbound-publish " , " max_auth_attempts " , " 5 " , OPT_UINT_T , 0 , FLDSET ( struct ast_sip_outbound_publish , max_auth_attempts ) ) ;
ast_sorcery_object_field_register_custom ( ast_sip_get_sorcery ( ) , " outbound-publish " , " outbound_auth " , " " , outbound_auth_handler , NULL , NULL , 0 , 0 ) ;
ast_sorcery_reload_object ( ast_sip_get_sorcery ( ) , " outbound-publish " ) ;
AST_RWLIST_RDLOCK ( & publisher_handlers ) ;
@ -1004,7 +1128,44 @@ static int reload_module(void)
static int unload_module ( void )
{
return 0 ;
struct timeval start = ast_tvnow ( ) ;
struct timespec end = {
. tv_sec = start . tv_sec + 10 ,
. tv_nsec = start . tv_usec * 1000
} ;
int res = 0 ;
struct ao2_container * states = ao2_global_obj_ref ( current_states ) ;
if ( ! states | | ! ( unloading . count = ao2_container_count ( states ) ) ) {
return 0 ;
}
ao2_ref ( states , - 1 ) ;
ast_mutex_init ( & unloading . lock ) ;
ast_cond_init ( & unloading . cond , NULL ) ;
ast_mutex_lock ( & unloading . lock ) ;
unloading . is_unloading = 1 ;
ao2_global_obj_release ( current_states ) ;
/* wait for items to unpublish */
ast_verb ( 5 , " Waiting to complete unpublishing task(s) \n " ) ;
while ( unloading . count ) {
res = ast_cond_timedwait ( & unloading . cond , & unloading . lock , & end ) ;
}
ast_mutex_unlock ( & unloading . lock ) ;
ast_mutex_destroy ( & unloading . lock ) ;
ast_cond_destroy ( & unloading . cond ) ;
if ( res ) {
ast_verb ( 5 , " At least %d items were unable to unpublish "
" in the allowed time \n " , unloading . count ) ;
} else {
ast_verb ( 5 , " All items successfully unpublished \n " ) ;
}
return res ;
}
AST_MODULE_INFO ( ASTERISK_GPL_KEY , AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER , " PJSIP Outbound Publish Support " ,