@ -83,9 +83,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
# define MINIMUM_RTP_PORT 1024 /*!< Minimum port number to accept */
# define MAXIMUM_RTP_PORT 65535 /*!< Maximum port number to accept */
# define DEFAULT_TURN_PORT 3478 0
# define DEFAULT_TURN_PORT 3478
# define TURN_ ALLOCATION _WAIT_TIME 2000
# define TURN_ STATE _WAIT_TIME 2000
# define RTCP_PT_FUR 192
# define RTCP_PT_SR AST_RTP_RTCP_SR
@ -149,20 +149,39 @@ static pj_str_t turnpassword;
/*! \brief Pool factory used by pjlib to allocate memory. */
static pj_caching_pool cachingpool ;
/*! \brief Pool used by pjlib functions which require memory allocation. */
/*! \brief Global memory pool for configuration and timers */
static pj_pool_t * pool ;
/*! \brief I/O queue for TURN relay traffic */
static pj_ ioqueue_t * ioqueue ;
/*! \brief Global timer heap */
static pj_ timer_heap_t * timer_heap ;
/*! \brief T imer heap for ICE and TURN stuff */
static pj_t imer_heap_t * timerheap ;
/*! \brief T hread executing the t imer heap */
static pj_t hread_t * timer_thread ;
/*! \brief Worker thread for ICE/TURN */
static pj_thread_t * thread ;
/*! \brief Used to tell the timer thread to terminate */
static int timer_terminate ;
/*! \brief Structure which contains ioqueue thread information */
struct ast_rtp_ioqueue_thread {
/*! \brief Pool used by the thread */
pj_pool_t * pool ;
/*! \brief The thread handling the queue and timer heap */
pj_thread_t * thread ;
/*! \brief Ioqueue which polls on sockets */
pj_ioqueue_t * ioqueue ;
/*! \brief Timer heap for scheduled items */
pj_timer_heap_t * timerheap ;
/*! \brief Termination request */
int terminate ;
/*! \brief Current number of descriptors being waited on */
unsigned int count ;
/*! \brief Linked list information */
AST_LIST_ENTRY ( ast_rtp_ioqueue_thread ) next ;
} ;
/*! \brief List of ioqueue threads */
static AST_LIST_HEAD_STATIC ( ioqueues , ast_rtp_ioqueue_thread ) ;
/*! \brief Notification that the ICE/TURN worker thread should stop */
static int worker_terminate ;
# endif
# define FLAG_3389_WARNING (1 << 0)
@ -172,10 +191,10 @@ static int worker_terminate;
# define FLAG_NEED_MARKER_BIT (1 << 3)
# define FLAG_DTMF_COMPENSATE (1 << 4)
# define TRANSPORT_SOCKET_RTP 1
# define TRANSPORT_SOCKET_RTCP 2
# define TRANSPORT_TURN_RTP 3
# define TRANSPORT_TURN_RTCP 4
# define TRANSPORT_SOCKET_RTP 0
# define TRANSPORT_SOCKET_RTCP 1
# define TRANSPORT_TURN_RTP 2
# define TRANSPORT_TURN_RTCP 3
/*! \brief RTP learning mode tracking information */
struct rtp_learning_info {
@ -276,7 +295,13 @@ struct ast_rtp {
pj_turn_sock * turn_rtcp ; /*!< RTCP TURN relay */
pj_turn_state_t turn_state ; /*!< Current state of the TURN relay session */
unsigned int passthrough : 1 ; /*!< Bit to indicate that the received packet should be passed through */
unsigned int rtp_passthrough : 1 ; /*!< Bit to indicate that TURN RTP should be passed through */
unsigned int rtcp_passthrough : 1 ; /*!< Bit to indicate that TURN RTCP should be passed through */
unsigned int ice_port ; /*!< Port that ICE was started with if it was previously started */
struct ast_sockaddr rtp_loop ; /*!< Loopback address for forwarding RTP from TURN */
struct ast_sockaddr rtcp_loop ; /*!< Loopback address for forwarding RTCP from TURN */
struct ast_rtp_ioqueue_thread * ioqueue ; /*!< The ioqueue thread handling us */
char remote_ufrag [ 256 ] ; /*!< The remote ICE username */
char remote_passwd [ 256 ] ; /*!< The remote ICE password */
@ -423,10 +448,11 @@ static void dtls_srtp_check_pending(struct ast_rtp_instance *instance, struct as
static int __rtp_sendto ( struct ast_rtp_instance * instance , void * buf , size_t size , int flags , struct ast_sockaddr * sa , int rtcp , int * ice , int use_srtp ) ;
# ifdef HAVE_PJPROJECT
/*! \brief Helper function which updates an ast_sockaddr with the candidate used for the component */
static void update_address_with_ice_candidate ( struct ast_rtp * rtp , int component , struct ast_sockaddr * cand_address )
static void update_address_with_ice_candidate ( struct ast_rtp * rtp , enum ast_rtp_ice_component_type component ,
struct ast_sockaddr * cand_address )
{
# ifdef HAVE_PJPROJECT
char address [ PJ_INET6_ADDRSTRLEN ] ;
if ( ! rtp - > ice | | ( component < 1 ) | | ! rtp - > ice - > comp [ component - 1 ] . valid_check ) {
@ -435,10 +461,20 @@ static void update_address_with_ice_candidate(struct ast_rtp *rtp, int component
ast_sockaddr_parse ( cand_address , pj_sockaddr_print ( & rtp - > ice - > comp [ component - 1 ] . valid_check - > rcand - > addr , address , sizeof ( address ) , 0 ) , 0 ) ;
ast_sockaddr_set_port ( cand_address , pj_sockaddr_get_port ( & rtp - > ice - > comp [ component - 1 ] . valid_check - > rcand - > addr ) ) ;
# endif
}
# ifdef HAVE_PJPROJECT
/*! \brief Helper function which sets up channel binding on a TURN session if applicable */
static void turn_enable_bind_channel ( struct ast_rtp * rtp , pj_turn_sock * turn , int component , int transport )
{
if ( ! rtp - > ice | | ! turn | | ( component < 1 ) | | ! rtp - > ice - > comp [ component - 1 ] . valid_check | |
( rtp - > ice - > comp [ component - 1 ] . valid_check - > lcand - > transport_id ! = transport ) ) {
return ;
}
pj_turn_sock_bind_channel ( turn , & rtp - > ice - > comp [ component - 1 ] . valid_check - > rcand - > addr ,
sizeof ( rtp - > ice - > comp [ component - 1 ] . valid_check - > rcand - > addr ) ) ;
}
/*! \brief Destructor for locally created ICE candidates */
static void ast_rtp_ice_candidate_destroy ( void * obj )
{
@ -669,7 +705,7 @@ static void ast_rtp_ice_start(struct ast_rtp_instance *instance)
if ( pj_ice_sess_create_check_list ( rtp - > ice , & ufrag , & passwd , ao2_container_count ( rtp - > ice_active_remote_candidates ) , & candidates [ 0 ] ) = = PJ_SUCCESS ) {
ast_test_suite_event_notify ( " ICECHECKLISTCREATE " , " Result: SUCCESS " ) ;
pj_ice_sess_start_check ( rtp - > ice ) ;
pj_timer_heap_poll ( timer heap, NULL ) ;
pj_timer_heap_poll ( timer _ heap, NULL ) ;
rtp - > strict_rtp_state = STRICT_RTP_OPEN ;
return ;
}
@ -795,6 +831,335 @@ static void ast_rtp_ice_add_cand(struct ast_rtp *rtp, unsigned comp_id, unsigned
ao2_ref ( candidate , - 1 ) ;
}
static void ast_rtp_on_turn_rx_rtp_data ( pj_turn_sock * turn_sock , void * pkt , unsigned pkt_len , const pj_sockaddr_t * peer_addr , unsigned addr_len )
{
struct ast_rtp_instance * instance = pj_turn_sock_get_user_data ( turn_sock ) ;
struct ast_rtp * rtp = ast_rtp_instance_get_data ( instance ) ;
pj_status_t status ;
status = pj_ice_sess_on_rx_pkt ( rtp - > ice , AST_RTP_ICE_COMPONENT_RTP , TRANSPORT_TURN_RTP , pkt , pkt_len , peer_addr ,
addr_len ) ;
if ( status ! = PJ_SUCCESS ) {
char buf [ 100 ] ;
pj_strerror ( status , buf , sizeof ( buf ) ) ;
ast_log ( LOG_WARNING , " PJ ICE Rx error status code: %d '%s'. \n " ,
( int ) status , buf ) ;
return ;
}
if ( ! rtp - > rtp_passthrough ) {
return ;
}
rtp - > rtp_passthrough = 0 ;
ast_sendto ( rtp - > s , pkt , pkt_len , 0 , & rtp - > rtp_loop ) ;
}
static void ast_rtp_on_turn_rtp_state ( pj_turn_sock * turn_sock , pj_turn_state_t old_state , pj_turn_state_t new_state )
{
struct ast_rtp_instance * instance = pj_turn_sock_get_user_data ( turn_sock ) ;
struct ast_rtp * rtp ;
/* If this is a leftover from an already notified RTP instance just ignore the state change */
if ( ! instance ) {
return ;
}
rtp = ast_rtp_instance_get_data ( instance ) ;
/* We store the new state so the other thread can actually handle it */
ast_mutex_lock ( & rtp - > lock ) ;
rtp - > turn_state = new_state ;
ast_cond_signal ( & rtp - > cond ) ;
if ( new_state = = PJ_TURN_STATE_DESTROYING ) {
pj_turn_sock_set_user_data ( rtp - > turn_rtp , NULL ) ;
rtp - > turn_rtp = NULL ;
}
ast_mutex_unlock ( & rtp - > lock ) ;
}
/* RTP TURN Socket interface declaration */
static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
. on_rx_data = ast_rtp_on_turn_rx_rtp_data ,
. on_state = ast_rtp_on_turn_rtp_state ,
} ;
static void ast_rtp_on_turn_rx_rtcp_data ( pj_turn_sock * turn_sock , void * pkt , unsigned pkt_len , const pj_sockaddr_t * peer_addr , unsigned addr_len )
{
struct ast_rtp_instance * instance = pj_turn_sock_get_user_data ( turn_sock ) ;
struct ast_rtp * rtp = ast_rtp_instance_get_data ( instance ) ;
pj_status_t status ;
status = pj_ice_sess_on_rx_pkt ( rtp - > ice , AST_RTP_ICE_COMPONENT_RTCP , TRANSPORT_TURN_RTCP , pkt , pkt_len , peer_addr ,
addr_len ) ;
if ( status ! = PJ_SUCCESS ) {
char buf [ 100 ] ;
pj_strerror ( status , buf , sizeof ( buf ) ) ;
ast_log ( LOG_WARNING , " PJ ICE Rx error status code: %d '%s'. \n " ,
( int ) status , buf ) ;
return ;
}
if ( ! rtp - > rtcp_passthrough ) {
return ;
}
rtp - > rtcp_passthrough = 0 ;
ast_sendto ( rtp - > rtcp - > s , pkt , pkt_len , 0 , & rtp - > rtcp_loop ) ;
}
static void ast_rtp_on_turn_rtcp_state ( pj_turn_sock * turn_sock , pj_turn_state_t old_state , pj_turn_state_t new_state )
{
struct ast_rtp_instance * instance = pj_turn_sock_get_user_data ( turn_sock ) ;
struct ast_rtp * rtp = NULL ;
/* If this is a leftover from an already destroyed RTP instance just ignore the state change */
if ( ! instance ) {
return ;
}
rtp = ast_rtp_instance_get_data ( instance ) ;
/* We store the new state so the other thread can actually handle it */
ast_mutex_lock ( & rtp - > lock ) ;
rtp - > turn_state = new_state ;
ast_cond_signal ( & rtp - > cond ) ;
if ( new_state = = PJ_TURN_STATE_DESTROYING ) {
pj_turn_sock_set_user_data ( rtp - > turn_rtcp , NULL ) ;
rtp - > turn_rtcp = NULL ;
}
ast_mutex_unlock ( & rtp - > lock ) ;
}
/* RTCP TURN Socket interface declaration */
static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = {
. on_rx_data = ast_rtp_on_turn_rx_rtcp_data ,
. on_state = ast_rtp_on_turn_rtcp_state ,
} ;
/*! \brief Worker thread for ioqueue and timerheap */
static int ioqueue_worker_thread ( void * data )
{
struct ast_rtp_ioqueue_thread * ioqueue = data ;
while ( ! ioqueue - > terminate ) {
const pj_time_val delay = { 0 , 10 } ;
pj_ioqueue_poll ( ioqueue - > ioqueue , & delay ) ;
pj_timer_heap_poll ( ioqueue - > timerheap , NULL ) ;
}
return 0 ;
}
/*! \brief Destroyer for ioqueue thread */
static void rtp_ioqueue_thread_destroy ( struct ast_rtp_ioqueue_thread * ioqueue )
{
if ( ioqueue - > thread ) {
ioqueue - > terminate = 1 ;
pj_thread_join ( ioqueue - > thread ) ;
pj_thread_destroy ( ioqueue - > thread ) ;
}
pj_pool_release ( ioqueue - > pool ) ;
ast_free ( ioqueue ) ;
}
/*! \brief Removal function for ioqueue thread, determines if it should be terminated and destroyed */
static void rtp_ioqueue_thread_remove ( struct ast_rtp_ioqueue_thread * ioqueue )
{
int destroy = 0 ;
/* If nothing is using this ioqueue thread destroy it */
AST_LIST_LOCK ( & ioqueues ) ;
if ( ( ioqueue - > count - 2 ) = = 0 ) {
destroy = 1 ;
AST_LIST_REMOVE ( & ioqueues , ioqueue , next ) ;
}
AST_LIST_UNLOCK ( & ioqueues ) ;
if ( ! destroy ) {
return ;
}
rtp_ioqueue_thread_destroy ( ioqueue ) ;
}
/*! \brief Finder and allocator for an ioqueue thread */
static struct ast_rtp_ioqueue_thread * rtp_ioqueue_thread_get_or_create ( void )
{
struct ast_rtp_ioqueue_thread * ioqueue ;
pj_lock_t * lock ;
AST_LIST_LOCK ( & ioqueues ) ;
/* See if an ioqueue thread exists that can handle more */
AST_LIST_TRAVERSE ( & ioqueues , ioqueue , next ) {
if ( ( ioqueue - > count + 2 ) < PJ_IOQUEUE_MAX_HANDLES ) {
break ;
}
}
/* If we found one bump it up and return it */
if ( ioqueue ) {
ioqueue - > count + = 2 ;
goto end ;
}
ioqueue = ast_calloc ( 1 , sizeof ( * ioqueue ) ) ;
if ( ! ioqueue ) {
goto end ;
}
ioqueue - > pool = pj_pool_create ( & cachingpool . factory , " rtp " , 512 , 512 , NULL ) ;
/* We use a timer on the ioqueue thread for TURN so that two threads aren't operating
* on a session at the same time
*/
if ( pj_timer_heap_create ( ioqueue - > pool , 4 , & ioqueue - > timerheap ) ! = PJ_SUCCESS ) {
goto fatal ;
}
if ( pj_lock_create_recursive_mutex ( ioqueue - > pool , " rtp%p " , & lock ) ! = PJ_SUCCESS ) {
goto fatal ;
}
pj_timer_heap_set_lock ( ioqueue - > timerheap , lock , PJ_TRUE ) ;
if ( pj_ioqueue_create ( ioqueue - > pool , 16 , & ioqueue - > ioqueue ) ! = PJ_SUCCESS ) {
goto fatal ;
}
if ( pj_thread_create ( ioqueue - > pool , " ice " , & ioqueue_worker_thread , ioqueue , 0 , 0 , & ioqueue - > thread ) ! = PJ_SUCCESS ) {
goto fatal ;
}
AST_LIST_INSERT_HEAD ( & ioqueues , ioqueue , next ) ;
/* Since this is being returned to an active session the count always starts at 2 */
ioqueue - > count = 2 ;
goto end ;
fatal :
rtp_ioqueue_thread_destroy ( ioqueue ) ;
ioqueue = NULL ;
end :
AST_LIST_UNLOCK ( & ioqueues ) ;
return ioqueue ;
}
static void ast_rtp_ice_turn_request ( struct ast_rtp_instance * instance , enum ast_rtp_ice_component_type component ,
enum ast_transport transport , const char * server , unsigned int port , const char * username , const char * password )
{
struct ast_rtp * rtp = ast_rtp_instance_get_data ( instance ) ;
pj_turn_sock * * turn_sock ;
const pj_turn_sock_cb * turn_cb ;
pj_turn_tp_type conn_type ;
int conn_transport ;
pj_stun_auth_cred cred = { 0 , } ;
pj_str_t turn_addr ;
struct ast_sockaddr addr = { { 0 , } } ;
pj_stun_config stun_config ;
struct timeval wait = ast_tvadd ( ast_tvnow ( ) , ast_samp2tv ( TURN_STATE_WAIT_TIME , 1000 ) ) ;
struct timespec ts = { . tv_sec = wait . tv_sec , . tv_nsec = wait . tv_usec * 1000 , } ;
pj_turn_session_info info ;
struct ast_sockaddr local , loop ;
ast_rtp_instance_get_local_address ( instance , & local ) ;
if ( ast_sockaddr_is_ipv4 ( & local ) ) {
ast_sockaddr_parse ( & loop , " 127.0.0.1 " , PARSE_PORT_FORBID ) ;
} else {
ast_sockaddr_parse ( & loop , " ::1 " , PARSE_PORT_FORBID ) ;
}
/* Determine what component we are requesting a TURN session for */
if ( component = = AST_RTP_ICE_COMPONENT_RTP ) {
turn_sock = & rtp - > turn_rtp ;
turn_cb = & ast_rtp_turn_rtp_sock_cb ;
conn_transport = TRANSPORT_TURN_RTP ;
ast_sockaddr_set_port ( & loop , ast_sockaddr_port ( & local ) ) ;
} else if ( component = = AST_RTP_ICE_COMPONENT_RTCP ) {
turn_sock = & rtp - > turn_rtcp ;
turn_cb = & ast_rtp_turn_rtcp_sock_cb ;
conn_transport = TRANSPORT_TURN_RTCP ;
ast_sockaddr_set_port ( & loop , ast_sockaddr_port ( & rtp - > rtcp - > us ) ) ;
} else {
return ;
}
if ( transport = = AST_TRANSPORT_UDP ) {
conn_type = PJ_TURN_TP_UDP ;
} else if ( transport = = AST_TRANSPORT_TCP ) {
conn_type = PJ_TURN_TP_TCP ;
} else {
ast_assert ( 0 ) ;
return ;
}
ast_sockaddr_parse ( & addr , server , PARSE_PORT_FORBID ) ;
ast_mutex_lock ( & rtp - > lock ) ;
if ( * turn_sock ) {
pj_turn_sock_destroy ( * turn_sock ) ;
rtp - > turn_state = PJ_TURN_STATE_NULL ;
while ( rtp - > turn_state ! = PJ_TURN_STATE_DESTROYING ) {
ast_cond_timedwait ( & rtp - > cond , & rtp - > lock , & ts ) ;
}
}
ast_mutex_unlock ( & rtp - > lock ) ;
if ( component = = AST_RTP_ICE_COMPONENT_RTP & & ! rtp - > ioqueue ) {
rtp - > ioqueue = rtp_ioqueue_thread_get_or_create ( ) ;
if ( ! rtp - > ioqueue ) {
return ;
}
}
pj_stun_config_init ( & stun_config , & cachingpool . factory , 0 , rtp - > ioqueue - > ioqueue , rtp - > ioqueue - > timerheap ) ;
if ( pj_turn_sock_create ( & stun_config , ast_sockaddr_is_ipv4 ( & addr ) ? pj_AF_INET ( ) : pj_AF_INET6 ( ) , conn_type ,
turn_cb , NULL , instance , turn_sock ) ! = PJ_SUCCESS ) {
ast_log ( LOG_WARNING , " Could not create a TURN client socket \n " ) ;
return ;
}
cred . type = PJ_STUN_AUTH_CRED_STATIC ;
pj_strset2 ( & cred . data . static_cred . username , ( char * ) username ) ;
cred . data . static_cred . data_type = PJ_STUN_PASSWD_PLAIN ;
pj_strset2 ( & cred . data . static_cred . data , ( char * ) password ) ;
/* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
ast_mutex_lock ( & rtp - > lock ) ;
pj_turn_sock_alloc ( * turn_sock , pj_cstr ( & turn_addr , server ) , port , NULL , & cred , NULL ) ;
while ( rtp - > turn_state < PJ_TURN_STATE_READY ) {
ast_cond_timedwait ( & rtp - > cond , & rtp - > lock , & ts ) ;
}
ast_mutex_unlock ( & rtp - > lock ) ;
/* If a TURN session was allocated add it as a candidate */
if ( rtp - > turn_state ! = PJ_TURN_STATE_READY ) {
return ;
}
pj_turn_sock_get_info ( * turn_sock , & info ) ;
ast_rtp_ice_add_cand ( rtp , component , conn_transport , PJ_ICE_CAND_TYPE_RELAYED , 65535 , & info . relay_addr ,
& info . relay_addr , NULL , pj_sockaddr_get_len ( & info . relay_addr ) ) ;
if ( component = = AST_RTP_ICE_COMPONENT_RTP ) {
ast_sockaddr_copy ( & rtp - > rtp_loop , & loop ) ;
} else if ( component = = AST_RTP_ICE_COMPONENT_RTCP ) {
ast_sockaddr_copy ( & rtp - > rtcp_loop , & loop ) ;
}
}
static char * generate_random_string ( char * buf , size_t size )
{
long val [ 4 ] ;
@ -819,6 +1184,7 @@ static struct ast_rtp_engine_ice ast_rtp_ice = {
. get_local_candidates = ast_rtp_ice_get_local_candidates ,
. ice_lite = ast_rtp_ice_lite ,
. set_role = ast_rtp_ice_set_role ,
. turn_request = ast_rtp_ice_turn_request ,
} ;
# endif
@ -1239,6 +1605,22 @@ static void ast_rtp_on_ice_complete(pj_ice_sess *ice, pj_status_t status)
{
struct ast_rtp_instance * instance = ice - > user_data ;
struct ast_rtp * rtp = ast_rtp_instance_get_data ( instance ) ;
if ( status = = PJ_SUCCESS ) {
struct ast_sockaddr remote_address ;
/* Symmetric RTP must be disabled for the remote address to not get overwritten */
ast_rtp_instance_set_prop ( instance , AST_RTP_PROPERTY_NAT , 0 ) ;
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTP , & remote_address ) ;
ast_rtp_instance_set_remote_address ( instance , & remote_address ) ;
turn_enable_bind_channel ( rtp , rtp - > turn_rtp , AST_RTP_ICE_COMPONENT_RTP , TRANSPORT_TURN_RTP ) ;
if ( rtp - > rtcp ) {
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTCP , & rtp - > rtcp - > them ) ;
turn_enable_bind_channel ( rtp , rtp - > turn_rtcp , AST_RTP_ICE_COMPONENT_RTCP , TRANSPORT_TURN_RTCP ) ;
}
}
# ifdef HAVE_OPENSSL_SRTP
dtls_perform_handshake ( instance , & rtp - > dtls , 0 ) ;
@ -1263,7 +1645,13 @@ static void ast_rtp_on_ice_rx_data(pj_ice_sess *ice, unsigned comp_id, unsigned
/* Instead of handling the packet here (which really doesn't work with our architecture) we set a bit to indicate that it should be handled after pj_ice_sess_on_rx_pkt
* returns */
rtp - > passthrough = 1 ;
if ( transport_id = = TRANSPORT_SOCKET_RTP | | transport_id = = TRANSPORT_SOCKET_RTCP ) {
rtp - > passthrough = 1 ;
} else if ( transport_id = = TRANSPORT_TURN_RTP ) {
rtp - > rtp_passthrough = 1 ;
} else if ( transport_id = = TRANSPORT_TURN_RTCP ) {
rtp - > rtcp_passthrough = 1 ;
}
}
static pj_status_t ast_rtp_on_ice_tx_pkt ( pj_ice_sess * ice , unsigned comp_id , unsigned transport_id , const void * pkt , pj_size_t size , const pj_sockaddr_t * dst_addr , unsigned dst_addr_len )
@ -1309,106 +1697,11 @@ static pj_ice_sess_cb ast_rtp_ice_sess_cb = {
. on_tx_pkt = ast_rtp_on_ice_tx_pkt ,
} ;
static void ast_rtp_on_turn_rx_rtp_data ( pj_turn_sock * turn_sock , void * pkt , unsigned pkt_len , const pj_sockaddr_t * peer_addr , unsigned addr_len )
{
struct ast_rtp_instance * instance = pj_turn_sock_get_user_data ( turn_sock ) ;
struct ast_rtp * rtp = ast_rtp_instance_get_data ( instance ) ;
struct ast_sockaddr dest = { { 0 , } , } ;
ast_rtp_instance_get_local_address ( instance , & dest ) ;
ast_sendto ( rtp - > s , pkt , pkt_len , 0 , & dest ) ;
}
static void ast_rtp_on_turn_rtp_state ( pj_turn_sock * turn_sock , pj_turn_state_t old_state , pj_turn_state_t new_state )
{
struct ast_rtp_instance * instance = pj_turn_sock_get_user_data ( turn_sock ) ;
struct ast_rtp * rtp = NULL ;
/* If this is a leftover from an already destroyed RTP instance just ignore the state change */
if ( ! instance ) {
return ;
}
rtp = ast_rtp_instance_get_data ( instance ) ;
/* If the TURN session is being destroyed we need to remove it from the RTP instance */
if ( new_state = = PJ_TURN_STATE_DESTROYING ) {
rtp - > turn_rtp = NULL ;
return ;
}
/* We store the new state so the other thread can actually handle it */
ast_mutex_lock ( & rtp - > lock ) ;
rtp - > turn_state = new_state ;
/* If this is a state that the main thread should be notified about do so */
if ( new_state = = PJ_TURN_STATE_READY | | new_state = = PJ_TURN_STATE_DEALLOCATING | | new_state = = PJ_TURN_STATE_DEALLOCATED ) {
ast_cond_signal ( & rtp - > cond ) ;
}
ast_mutex_unlock ( & rtp - > lock ) ;
}
/* RTP TURN Socket interface declaration */
static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
. on_rx_data = ast_rtp_on_turn_rx_rtp_data ,
. on_state = ast_rtp_on_turn_rtp_state ,
} ;
static void ast_rtp_on_turn_rx_rtcp_data ( pj_turn_sock * turn_sock , void * pkt , unsigned pkt_len , const pj_sockaddr_t * peer_addr , unsigned addr_len )
{
struct ast_rtp_instance * instance = pj_turn_sock_get_user_data ( turn_sock ) ;
struct ast_rtp * rtp = ast_rtp_instance_get_data ( instance ) ;
ast_sendto ( rtp - > rtcp - > s , pkt , pkt_len , 0 , & rtp - > rtcp - > us ) ;
}
static void ast_rtp_on_turn_rtcp_state ( pj_turn_sock * turn_sock , pj_turn_state_t old_state , pj_turn_state_t new_state )
{
struct ast_rtp_instance * instance = pj_turn_sock_get_user_data ( turn_sock ) ;
struct ast_rtp * rtp = NULL ;
/* If this is a leftover from an already destroyed RTP instance just ignore the state change */
if ( ! instance ) {
return ;
}
rtp = ast_rtp_instance_get_data ( instance ) ;
/* If the TURN session is being destroyed we need to remove it from the RTP instance */
if ( new_state = = PJ_TURN_STATE_DESTROYING ) {
rtp - > turn_rtcp = NULL ;
return ;
}
/* We store the new state so the other thread can actually handle it */
ast_mutex_lock ( & rtp - > lock ) ;
rtp - > turn_state = new_state ;
/* If this is a state that the main thread should be notified about do so */
if ( new_state = = PJ_TURN_STATE_READY | | new_state = = PJ_TURN_STATE_DEALLOCATING | | new_state = = PJ_TURN_STATE_DEALLOCATED ) {
ast_cond_signal ( & rtp - > cond ) ;
}
ast_mutex_unlock ( & rtp - > lock ) ;
}
/* RTCP TURN Socket interface declaration */
static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = {
. on_rx_data = ast_rtp_on_turn_rx_rtcp_data ,
. on_state = ast_rtp_on_turn_rtcp_state ,
} ;
/*! \brief Worker thread for I/O queue and timerheap */
static int ice_worker_thread ( void * data )
/*! \brief Worker thread for timerheap */
static int timer_worker_thread ( void * data )
{
while ( ! worker_terminate ) {
const pj_time_val delay = { 0 , 10 } ;
pj_ioqueue_poll ( ioqueue , & delay ) ;
pj_timer_heap_poll ( timerheap , NULL ) ;
while ( ! timer_terminate ) {
pj_timer_heap_poll ( timer_heap , NULL ) ;
}
return 0 ;
@ -1697,6 +1990,9 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
struct ast_rtp * rtp = ast_rtp_instance_get_data ( instance ) ;
struct ast_srtp * srtp = ast_rtp_instance_get_srtp ( instance ) ;
char * in = buf ;
# ifdef HAVE_PJPROJECT
struct ast_sockaddr * loop = rtcp ? & rtp - > rtcp_loop : & rtp - > rtp_loop ;
# endif
if ( ( len = ast_recvfrom ( rtcp ? rtp - > rtcp - > s : rtp - > s , buf , size , flags , sa ) ) < 0 ) {
return len ;
@ -1752,7 +2048,16 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
# endif
# ifdef HAVE_PJPROJECT
if ( rtp - > ice ) {
if ( ! ast_sockaddr_isnull ( loop ) & & ! ast_sockaddr_cmp ( loop , sa ) ) {
/* ICE traffic will have been handled in the TURN callback, so skip it but update the address
* so it reflects the actual source and not the loopback
*/
if ( rtcp ) {
ast_sockaddr_copy ( sa , & rtp - > rtcp - > them ) ;
} else {
ast_rtp_instance_get_remote_address ( instance , sa ) ;
}
} else if ( rtp - > ice ) {
pj_str_t combined = pj_str ( ast_sockaddr_stringify ( sa ) ) ;
pj_sockaddr address ;
pj_status_t status ;
@ -1769,7 +2074,7 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
pj_strerror ( status , buf , sizeof ( buf ) ) ;
ast_log ( LOG_WARNING , " PJ ICE Rx error status code: %d '%s'. \n " ,
( int ) status , buf ) ;
( int ) status , buf ) ;
return - 1 ;
}
if ( ! rtp - > passthrough ) {
@ -1942,7 +2247,7 @@ static int rtp_learning_rtp_seq_update(struct rtp_learning_info *info, uint16_t
# ifdef HAVE_PJPROJECT
static void rtp_add_candidates_to_ice ( struct ast_rtp_instance * instance , struct ast_rtp * rtp , struct ast_sockaddr * addr , int port , int component ,
int transport , const pj_turn_sock_cb * turn_cb , pj_turn_sock * * turn_sock )
int transport )
{
pj_sockaddr address [ 16 ] ;
unsigned int count = PJ_ARRAY_SIZE ( address ) , pos = 0 ;
@ -1981,38 +2286,9 @@ static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct
}
/* If configured to use a TURN relay create a session and allocate */
if ( pj_strlen ( & turnaddr ) & & pj_turn_sock_create ( & rtp - > ice - > stun_cfg , ast_sockaddr_is_ipv4 ( addr ) ? pj_AF_INET ( ) : pj_AF_INET6 ( ) , PJ_TURN_TP_TCP ,
turn_cb , NULL , instance , turn_sock ) = = PJ_SUCCESS ) {
pj_stun_auth_cred cred = { 0 , } ;
struct timeval wait = ast_tvadd ( ast_tvnow ( ) , ast_samp2tv ( TURN_ALLOCATION_WAIT_TIME , 1000 ) ) ;
struct timespec ts = { . tv_sec = wait . tv_sec , . tv_nsec = wait . tv_usec * 1000 , } ;
cred . type = PJ_STUN_AUTH_CRED_STATIC ;
cred . data . static_cred . username = turnusername ;
cred . data . static_cred . data_type = PJ_STUN_PASSWD_PLAIN ;
cred . data . static_cred . data = turnpassword ;
/* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
ast_mutex_lock ( & rtp - > lock ) ;
pj_turn_sock_alloc ( * turn_sock , & turnaddr , turnport , NULL , & cred , NULL ) ;
ast_cond_timedwait ( & rtp - > cond , & rtp - > lock , & ts ) ;
ast_mutex_unlock ( & rtp - > lock ) ;
/* If a TURN session was allocated add it as a candidate */
if ( rtp - > turn_state = = PJ_TURN_STATE_READY ) {
pj_turn_session_info info ;
pj_turn_sock_get_info ( * turn_sock , & info ) ;
if ( transport = = TRANSPORT_SOCKET_RTP ) {
transport = TRANSPORT_TURN_RTP ;
} else if ( transport = = TRANSPORT_SOCKET_RTCP ) {
transport = TRANSPORT_TURN_RTCP ;
}
ast_rtp_ice_add_cand ( rtp , component , transport , PJ_ICE_CAND_TYPE_RELAYED , 65535 , & info . relay_addr , & info . relay_addr ,
NULL , pj_sockaddr_get_len ( & info . relay_addr ) ) ;
}
if ( pj_strlen ( & turnaddr ) ) {
ast_rtp_ice_turn_request ( instance , component , AST_TRANSPORT_TCP , pj_strbuf ( & turnaddr ) , turnport ,
pj_strbuf ( & turnusername ) , pj_strbuf ( & turnpassword ) ) ;
}
}
# endif
@ -2071,7 +2347,7 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad
pj_thread_register_check ( ) ;
pj_stun_config_init ( & stun_config , & cachingpool . factory , 0 , ioqueue , timer heap) ;
pj_stun_config_init ( & stun_config , & cachingpool . factory , 0 , NULL , timer_ heap) ;
ufrag = pj_str ( rtp - > local_ufrag ) ;
passwd = pj_str ( rtp - > local_passwd ) ;
@ -2084,14 +2360,14 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad
/* Add all of the available candidates to the ICE session */
rtp_add_candidates_to_ice ( instance , rtp , addr , port , AST_RTP_ICE_COMPONENT_RTP ,
TRANSPORT_SOCKET_RTP , & ast_rtp_turn_rtp_sock_cb , & rtp - > turn_rtp );
TRANSPORT_SOCKET_RTP );
/* Only add the RTCP candidates to ICE when replacing the session. New sessions
* handle this in a separate part of the setup phase */
if ( replace & & rtp - > rtcp ) {
rtp_add_candidates_to_ice ( instance , rtp , & rtp - > rtcp - > us ,
ast_sockaddr_port ( & rtp - > rtcp - > us ) , AST_RTP_ICE_COMPONENT_RTCP ,
TRANSPORT_SOCKET_RTCP , & ast_rtp_turn_rtcp_sock_cb , & rtp - > turn_rtcp );
TRANSPORT_SOCKET_RTCP );
}
return 0 ;
@ -2200,6 +2476,8 @@ static int ast_rtp_new(struct ast_rtp_instance *instance,
static int ast_rtp_destroy ( struct ast_rtp_instance * instance )
{
struct ast_rtp * rtp = ast_rtp_instance_get_data ( instance ) ;
struct timeval wait = ast_tvadd ( ast_tvnow ( ) , ast_samp2tv ( TURN_STATE_WAIT_TIME , 1000 ) ) ;
struct timespec ts = { . tv_sec = wait . tv_sec , . tv_nsec = wait . tv_usec * 1000 , } ;
/* Destroy the smoother that was smoothing out audio if present */
if ( rtp - > smoother ) {
@ -2236,21 +2514,33 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance)
# ifdef HAVE_PJPROJECT
pj_thread_register_check ( ) ;
/* Destroy the ICE session if being used */
if ( rtp - > ice ) {
pj_ice_sess_destroy ( rtp - > ice ) ;
}
/* Destroy the RTP TURN relay if being used */
ast_mutex_lock ( & rtp - > lock ) ;
if ( rtp - > turn_rtp ) {
pj_turn_sock_set_user_data ( rtp - > turn_rtp , NULL ) ;
pj_turn_sock_destroy ( rtp - > turn_rtp ) ;
rtp - > turn_state = PJ_TURN_STATE_NULL ;
while ( rtp - > turn_state ! = PJ_TURN_STATE_DESTROYING ) {
ast_cond_timedwait ( & rtp - > cond , & rtp - > lock , & ts ) ;
}
}
/* Destroy the RTCP TURN relay if being used */
if ( rtp - > turn_rtcp ) {
pj_turn_sock_set_user_data ( rtp - > turn_rtcp , NULL ) ;
pj_turn_sock_destroy ( rtp - > turn_rtcp ) ;
rtp - > turn_state = PJ_TURN_STATE_NULL ;
while ( rtp - > turn_state ! = PJ_TURN_STATE_DESTROYING ) {
ast_cond_timedwait ( & rtp - > cond , & rtp - > lock , & ts ) ;
}
}
ast_mutex_unlock ( & rtp - > lock ) ;
if ( rtp - > ioqueue ) {
rtp_ioqueue_thread_remove ( rtp - > ioqueue ) ;
}
/* Destroy the ICE session if being used */
if ( rtp - > ice ) {
pj_ice_sess_destroy ( rtp - > ice ) ;
}
/* Destroy any candidates */
@ -2357,7 +2647,6 @@ static int ast_rtp_dtmf_begin(struct ast_rtp_instance *instance, char digit)
ast_sockaddr_stringify ( & remote_address ) ,
strerror ( errno ) ) ;
}
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTP , & remote_address ) ;
if ( rtp_debug_test_addr ( & remote_address ) ) {
ast_verbose ( " Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d) \n " ,
ast_sockaddr_stringify ( & remote_address ) ,
@ -2407,8 +2696,6 @@ static int ast_rtp_dtmf_continuation(struct ast_rtp_instance *instance)
strerror ( errno ) ) ;
}
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTP , & remote_address ) ;
if ( rtp_debug_test_addr ( & remote_address ) ) {
ast_verbose ( " Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d) \n " ,
ast_sockaddr_stringify ( & remote_address ) ,
@ -2483,8 +2770,6 @@ static int ast_rtp_dtmf_end_with_duration(struct ast_rtp_instance *instance, cha
strerror ( errno ) ) ;
}
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTP , & remote_address ) ;
if ( rtp_debug_test_addr ( & remote_address ) ) {
ast_verbose ( " Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d) \n " ,
ast_sockaddr_stringify ( & remote_address ) ,
@ -2745,8 +3030,6 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr)
rtp - > rtcp - > rr_count + + ;
}
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTCP , & remote_address ) ;
if ( rtcp_debug_test_addr ( & rtp - > rtcp - > them ) ) {
ast_verbose ( " * Sent RTCP %s to %s%s \n " , sr ? " SR " : " RR " ,
ast_sockaddr_stringify ( & remote_address ) , ice ? " (via ICE) " : " " ) ;
@ -2940,8 +3223,6 @@ static int ast_rtp_raw_write(struct ast_rtp_instance *instance, struct ast_frame
}
}
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTP , & remote_address ) ;
if ( rtp_debug_test_addr ( & remote_address ) ) {
ast_verbose ( " Sent RTP packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d) \n " ,
ast_sockaddr_stringify ( & remote_address ) ,
@ -3910,8 +4191,6 @@ static int bridge_p2p_rtp_write(struct ast_rtp_instance *instance, unsigned int
return 0 ;
}
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTP , & remote_address ) ;
if ( rtp_debug_test_addr ( & remote_address ) ) {
ast_verbose ( " Sent RTP P2P packet to %s%s (type %-2.2d, len %-6.6d) \n " ,
ast_sockaddr_stringify ( & remote_address ) ,
@ -4388,8 +4667,7 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro
# ifdef HAVE_PJPROJECT
if ( rtp - > ice ) {
rtp_add_candidates_to_ice ( instance , rtp , & rtp - > rtcp - > us , ast_sockaddr_port ( & rtp - > rtcp - > us ) , AST_RTP_ICE_COMPONENT_RTCP , TRANSPORT_SOCKET_RTCP ,
& ast_rtp_turn_rtcp_sock_cb , & rtp - > turn_rtcp ) ;
rtp_add_candidates_to_ice ( instance , rtp , & rtp - > rtcp - > us , ast_sockaddr_port ( & rtp - > rtcp - > us ) , AST_RTP_ICE_COMPONENT_RTCP , TRANSPORT_SOCKET_RTCP ) ;
}
# endif
@ -4681,8 +4959,6 @@ static int ast_rtp_sendcng(struct ast_rtp_instance *instance, int level)
return res ;
}
update_address_with_ice_candidate ( rtp , AST_RTP_ICE_COMPONENT_RTP , & remote_address ) ;
if ( rtp_debug_test_addr ( & remote_address ) ) {
ast_verbose ( " Sent Comfort Noise RTP packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d) \n " ,
ast_sockaddr_stringify ( & remote_address ) ,
@ -4949,17 +5225,17 @@ static int rtp_reload(int reload)
if ( ast_parse_arg ( s , PARSE_INADDR , & addr ) ) {
ast_log ( LOG_WARNING , " Invalid TURN server address: %s \n " , s ) ;
} else {
pj_strdup2 ( pool , & turnaddr , ast_inet_ntoa ( addr . sin_addr ) ) ;
pj_strdup2 _with_null ( pool , & turnaddr , ast_inet_ntoa ( addr . sin_addr ) ) ;
/* ntohs() is not a bug here. The port number is used in host byte order with
* a pjnat API . */
turnport = ntohs ( addr . sin_port ) ;
}
}
if ( ( s = ast_variable_retrieve ( cfg , " general " , " turnusername " ) ) ) {
pj_strdup2 ( pool , & turnusername , s ) ;
pj_strdup2 _with_null ( pool , & turnusername , s ) ;
}
if ( ( s = ast_variable_retrieve ( cfg , " general " , " turnpassword " ) ) ) {
pj_strdup2 ( pool , & turnpassword , s ) ;
pj_strdup2 _with_null ( pool , & turnpassword , s ) ;
}
# endif
ast_config_destroy ( cfg ) ;
@ -4979,6 +5255,20 @@ static int reload_module(void)
return 0 ;
}
# ifdef HAVE_PJPROJECT
static void rtp_terminate_pjproject ( void )
{
if ( timer_thread ) {
timer_terminate = 1 ;
pj_thread_join ( timer_thread ) ;
pj_thread_destroy ( timer_thread ) ;
}
pj_caching_pool_destroy ( & cachingpool ) ;
pj_shutdown ( ) ;
}
# endif
static int load_module ( void )
{
# ifdef HAVE_PJPROJECT
@ -4989,65 +5279,49 @@ static int load_module(void)
}
if ( pjlib_util_init ( ) ! = PJ_SUCCESS ) {
pj_shutdown ( ) ;
rtp_terminate_pjproject ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
if ( pjnath_init ( ) ! = PJ_SUCCESS ) {
pj_shutdown ( ) ;
rtp_terminate_pjproject ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
pj_caching_pool_init ( & cachingpool , & pj_pool_factory_default_policy , 0 ) ;
pool = pj_pool_create ( & cachingpool . factory , " rtp " , 512 , 512 , NULL ) ;
pool = pj_pool_create ( & cachingpool . factory , " timer " , 512 , 512 , NULL ) ;
if ( pj_timer_heap_create ( pool , 100 , & timerheap ) ! = PJ_SUCCESS ) {
pj_caching_pool_destroy ( & cachingpool ) ;
pj_shutdown ( ) ;
if ( pj_timer_heap_create ( pool , 100 , & timer_heap ) ! = PJ_SUCCESS ) {
rtp_terminate_pjproject ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
if ( pj_lock_create_recursive_mutex ( pool , " rtp%p " , & lock ) ! = PJ_SUCCESS ) {
pj_caching_pool_destroy ( & cachingpool ) ;
pj_shutdown ( ) ;
rtp_terminate_pjproject ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
pj_timer_heap_set_lock ( timer heap, lock , PJ_TRUE ) ;
pj_timer_heap_set_lock ( timer _ heap, lock , PJ_TRUE ) ;
if ( pj_ioqueue_create ( pool , 16 , & ioqueue ) ! = PJ_SUCCESS ) {
pj_caching_pool_destroy ( & cachingpool ) ;
pj_shutdown ( ) ;
if ( pj_thread_create ( pool , " timer " , & timer_worker_thread , NULL , 0 , 0 , & timer_thread ) ! = PJ_SUCCESS ) {
rtp_terminate_pjproject ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
if ( pj_thread_create ( pool , " ice " , & ice_worker_thread , NULL , 0 , 0 , & thread ) ! = PJ_SUCCESS ) {
pj_caching_pool_destroy ( & cachingpool ) ;
pj_shutdown ( ) ;
return AST_MODULE_LOAD_DECLINE ;
}
# endif
if ( ast_rtp_engine_register ( & asterisk_rtp_engine ) ) {
# ifdef HAVE_PJPROJECT
worker_terminate = 1 ;
pj_thread_join ( thread ) ;
pj_thread_destroy ( thread ) ;
pj_caching_pool_destroy ( & cachingpool ) ;
pj_shutdown ( ) ;
rtp_terminate_pjproject ( ) ;
# endif
return AST_MODULE_LOAD_DECLINE ;
}
if ( ast_cli_register_multiple ( cli_rtp , ARRAY_LEN ( cli_rtp ) ) ) {
# ifdef HAVE_PJPROJECT
worker_terminate = 1 ;
pj_thread_join ( thread ) ;
pj_thread_destroy ( thread ) ;
ast_rtp_engine_unregister ( & asterisk_rtp_engine ) ;
pj_caching_pool_destroy ( & cachingpool ) ;
pj_shutdown ( ) ;
rtp_terminate_pjproject ( ) ;
# endif
return AST_MODULE_LOAD_DECLINE ;
}
@ -5063,15 +5337,8 @@ static int unload_module(void)
ast_cli_unregister_multiple ( cli_rtp , ARRAY_LEN ( cli_rtp ) ) ;
# ifdef HAVE_PJPROJECT
worker_terminate = 1 ;
pj_thread_register_check ( ) ;
pj_thread_join ( thread ) ;
pj_thread_destroy ( thread ) ;
pj_caching_pool_destroy ( & cachingpool ) ;
pj_shutdown ( ) ;
rtp_terminate_pjproject ( ) ;
# endif
return 0 ;