diff --git a/include/asterisk/rtp_engine.h b/include/asterisk/rtp_engine.h index e5f38eecc8..db5cd34ed7 100644 --- a/include/asterisk/rtp_engine.h +++ b/include/asterisk/rtp_engine.h @@ -429,6 +429,10 @@ struct ast_rtp_engine_ice { void (*ice_lite)(struct ast_rtp_instance *instance); /*! Callback for changing our role in negotiation */ void (*set_role)(struct ast_rtp_instance *instance, enum ast_rtp_ice_role role); + /*! Callback for requesting a TURN session */ + void (*turn_request)(struct ast_rtp_instance *instance, enum ast_rtp_ice_component_type component, + enum ast_transport transport, const char *server, unsigned int port, + const char *username, const char *password); }; /*! \brief DTLS setup types */ diff --git a/res/res_rtp_asterisk.c b/res/res_rtp_asterisk.c index f192bacb8e..739ae17d4a 100644 --- a/res/res_rtp_asterisk.c +++ b/res/res_rtp_asterisk.c @@ -83,9 +83,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #define MINIMUM_RTP_PORT 1024 /*!< Minimum port number to accept */ #define MAXIMUM_RTP_PORT 65535 /*!< Maximum port number to accept */ -#define DEFAULT_TURN_PORT 34780 +#define DEFAULT_TURN_PORT 3478 -#define TURN_ALLOCATION_WAIT_TIME 2000 +#define TURN_STATE_WAIT_TIME 2000 #define RTCP_PT_FUR 192 #define RTCP_PT_SR AST_RTP_RTCP_SR @@ -149,20 +149,39 @@ static pj_str_t turnpassword; /*! \brief Pool factory used by pjlib to allocate memory. */ static pj_caching_pool cachingpool; -/*! \brief Pool used by pjlib functions which require memory allocation. */ +/*! \brief Global memory pool for configuration and timers */ static pj_pool_t *pool; -/*! \brief I/O queue for TURN relay traffic */ -static pj_ioqueue_t *ioqueue; +/*! \brief Global timer heap */ +static pj_timer_heap_t *timer_heap; -/*! \brief Timer heap for ICE and TURN stuff */ -static pj_timer_heap_t *timerheap; +/*! \brief Thread executing the timer heap */ +static pj_thread_t *timer_thread; -/*! \brief Worker thread for ICE/TURN */ -static pj_thread_t *thread; +/*! \brief Used to tell the timer thread to terminate */ +static int timer_terminate; + +/*! \brief Structure which contains ioqueue thread information */ +struct ast_rtp_ioqueue_thread { + /*! \brief Pool used by the thread */ + pj_pool_t *pool; + /*! \brief The thread handling the queue and timer heap */ + pj_thread_t *thread; + /*! \brief Ioqueue which polls on sockets */ + pj_ioqueue_t *ioqueue; + /*! \brief Timer heap for scheduled items */ + pj_timer_heap_t *timerheap; + /*! \brief Termination request */ + int terminate; + /*! \brief Current number of descriptors being waited on */ + unsigned int count; + /*! \brief Linked list information */ + AST_LIST_ENTRY(ast_rtp_ioqueue_thread) next; +}; + +/*! \brief List of ioqueue threads */ +static AST_LIST_HEAD_STATIC(ioqueues, ast_rtp_ioqueue_thread); -/*! \brief Notification that the ICE/TURN worker thread should stop */ -static int worker_terminate; #endif #define FLAG_3389_WARNING (1 << 0) @@ -172,10 +191,10 @@ static int worker_terminate; #define FLAG_NEED_MARKER_BIT (1 << 3) #define FLAG_DTMF_COMPENSATE (1 << 4) -#define TRANSPORT_SOCKET_RTP 1 -#define TRANSPORT_SOCKET_RTCP 2 -#define TRANSPORT_TURN_RTP 3 -#define TRANSPORT_TURN_RTCP 4 +#define TRANSPORT_SOCKET_RTP 0 +#define TRANSPORT_SOCKET_RTCP 1 +#define TRANSPORT_TURN_RTP 2 +#define TRANSPORT_TURN_RTCP 3 /*! \brief RTP learning mode tracking information */ struct rtp_learning_info { @@ -276,7 +295,13 @@ struct ast_rtp { pj_turn_sock *turn_rtcp; /*!< RTCP TURN relay */ pj_turn_state_t turn_state; /*!< Current state of the TURN relay session */ unsigned int passthrough:1; /*!< Bit to indicate that the received packet should be passed through */ + unsigned int rtp_passthrough:1; /*!< Bit to indicate that TURN RTP should be passed through */ + unsigned int rtcp_passthrough:1; /*!< Bit to indicate that TURN RTCP should be passed through */ unsigned int ice_port; /*!< Port that ICE was started with if it was previously started */ + struct ast_sockaddr rtp_loop; /*!< Loopback address for forwarding RTP from TURN */ + struct ast_sockaddr rtcp_loop; /*!< Loopback address for forwarding RTCP from TURN */ + + struct ast_rtp_ioqueue_thread *ioqueue; /*!< The ioqueue thread handling us */ char remote_ufrag[256]; /*!< The remote ICE username */ char remote_passwd[256]; /*!< The remote ICE password */ @@ -423,10 +448,11 @@ static void dtls_srtp_check_pending(struct ast_rtp_instance *instance, struct as static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp, int *ice, int use_srtp); +#ifdef HAVE_PJPROJECT /*! \brief Helper function which updates an ast_sockaddr with the candidate used for the component */ -static void update_address_with_ice_candidate(struct ast_rtp *rtp, int component, struct ast_sockaddr *cand_address) +static void update_address_with_ice_candidate(struct ast_rtp *rtp, enum ast_rtp_ice_component_type component, + struct ast_sockaddr *cand_address) { -#ifdef HAVE_PJPROJECT char address[PJ_INET6_ADDRSTRLEN]; if (!rtp->ice || (component < 1) || !rtp->ice->comp[component - 1].valid_check) { @@ -435,10 +461,20 @@ static void update_address_with_ice_candidate(struct ast_rtp *rtp, int component ast_sockaddr_parse(cand_address, pj_sockaddr_print(&rtp->ice->comp[component - 1].valid_check->rcand->addr, address, sizeof(address), 0), 0); ast_sockaddr_set_port(cand_address, pj_sockaddr_get_port(&rtp->ice->comp[component - 1].valid_check->rcand->addr)); -#endif } -#ifdef HAVE_PJPROJECT +/*! \brief Helper function which sets up channel binding on a TURN session if applicable */ +static void turn_enable_bind_channel(struct ast_rtp *rtp, pj_turn_sock *turn, int component, int transport) +{ + if (!rtp->ice || !turn || (component < 1) || !rtp->ice->comp[component - 1].valid_check || + (rtp->ice->comp[component - 1].valid_check->lcand->transport_id != transport)) { + return; + } + + pj_turn_sock_bind_channel(turn, &rtp->ice->comp[component - 1].valid_check->rcand->addr, + sizeof(rtp->ice->comp[component - 1].valid_check->rcand->addr)); +} + /*! \brief Destructor for locally created ICE candidates */ static void ast_rtp_ice_candidate_destroy(void *obj) { @@ -669,7 +705,7 @@ static void ast_rtp_ice_start(struct ast_rtp_instance *instance) if (pj_ice_sess_create_check_list(rtp->ice, &ufrag, &passwd, ao2_container_count(rtp->ice_active_remote_candidates), &candidates[0]) == PJ_SUCCESS) { ast_test_suite_event_notify("ICECHECKLISTCREATE", "Result: SUCCESS"); pj_ice_sess_start_check(rtp->ice); - pj_timer_heap_poll(timerheap, NULL); + pj_timer_heap_poll(timer_heap, NULL); rtp->strict_rtp_state = STRICT_RTP_OPEN; return; } @@ -795,6 +831,335 @@ static void ast_rtp_ice_add_cand(struct ast_rtp *rtp, unsigned comp_id, unsigned ao2_ref(candidate, -1); } +static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len) +{ + struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock); + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + pj_status_t status; + + status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP, pkt, pkt_len, peer_addr, + addr_len); + if (status != PJ_SUCCESS) { + char buf[100]; + + pj_strerror(status, buf, sizeof(buf)); + ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n", + (int)status, buf); + return; + } + if (!rtp->rtp_passthrough) { + return; + } + rtp->rtp_passthrough = 0; + + ast_sendto(rtp->s, pkt, pkt_len, 0, &rtp->rtp_loop); +} + +static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state) +{ + struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock); + struct ast_rtp *rtp; + + /* If this is a leftover from an already notified RTP instance just ignore the state change */ + if (!instance) { + return; + } + + rtp = ast_rtp_instance_get_data(instance); + + /* We store the new state so the other thread can actually handle it */ + ast_mutex_lock(&rtp->lock); + rtp->turn_state = new_state; + ast_cond_signal(&rtp->cond); + + if (new_state == PJ_TURN_STATE_DESTROYING) { + pj_turn_sock_set_user_data(rtp->turn_rtp, NULL); + rtp->turn_rtp = NULL; + } + + ast_mutex_unlock(&rtp->lock); +} + +/* RTP TURN Socket interface declaration */ +static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = { + .on_rx_data = ast_rtp_on_turn_rx_rtp_data, + .on_state = ast_rtp_on_turn_rtp_state, +}; + +static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len) +{ + struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock); + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + pj_status_t status; + + status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP, pkt, pkt_len, peer_addr, + addr_len); + if (status != PJ_SUCCESS) { + char buf[100]; + + pj_strerror(status, buf, sizeof(buf)); + ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n", + (int)status, buf); + return; + } + if (!rtp->rtcp_passthrough) { + return; + } + rtp->rtcp_passthrough = 0; + + ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp_loop); +} + +static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state) +{ + struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock); + struct ast_rtp *rtp = NULL; + + /* If this is a leftover from an already destroyed RTP instance just ignore the state change */ + if (!instance) { + return; + } + + rtp = ast_rtp_instance_get_data(instance); + + /* We store the new state so the other thread can actually handle it */ + ast_mutex_lock(&rtp->lock); + rtp->turn_state = new_state; + ast_cond_signal(&rtp->cond); + + if (new_state == PJ_TURN_STATE_DESTROYING) { + pj_turn_sock_set_user_data(rtp->turn_rtcp, NULL); + rtp->turn_rtcp = NULL; + } + + ast_mutex_unlock(&rtp->lock); +} + +/* RTCP TURN Socket interface declaration */ +static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = { + .on_rx_data = ast_rtp_on_turn_rx_rtcp_data, + .on_state = ast_rtp_on_turn_rtcp_state, +}; + +/*! \brief Worker thread for ioqueue and timerheap */ +static int ioqueue_worker_thread(void *data) +{ + struct ast_rtp_ioqueue_thread *ioqueue = data; + + while (!ioqueue->terminate) { + const pj_time_val delay = {0, 10}; + + pj_ioqueue_poll(ioqueue->ioqueue, &delay); + + pj_timer_heap_poll(ioqueue->timerheap, NULL); + } + + return 0; +} + +/*! \brief Destroyer for ioqueue thread */ +static void rtp_ioqueue_thread_destroy(struct ast_rtp_ioqueue_thread *ioqueue) +{ + if (ioqueue->thread) { + ioqueue->terminate = 1; + pj_thread_join(ioqueue->thread); + pj_thread_destroy(ioqueue->thread); + } + + pj_pool_release(ioqueue->pool); + ast_free(ioqueue); +} + +/*! \brief Removal function for ioqueue thread, determines if it should be terminated and destroyed */ +static void rtp_ioqueue_thread_remove(struct ast_rtp_ioqueue_thread *ioqueue) +{ + int destroy = 0; + + /* If nothing is using this ioqueue thread destroy it */ + AST_LIST_LOCK(&ioqueues); + if ((ioqueue->count - 2) == 0) { + destroy = 1; + AST_LIST_REMOVE(&ioqueues, ioqueue, next); + } + AST_LIST_UNLOCK(&ioqueues); + + if (!destroy) { + return; + } + + rtp_ioqueue_thread_destroy(ioqueue); +} + +/*! \brief Finder and allocator for an ioqueue thread */ +static struct ast_rtp_ioqueue_thread *rtp_ioqueue_thread_get_or_create(void) +{ + struct ast_rtp_ioqueue_thread *ioqueue; + pj_lock_t *lock; + + AST_LIST_LOCK(&ioqueues); + + /* See if an ioqueue thread exists that can handle more */ + AST_LIST_TRAVERSE(&ioqueues, ioqueue, next) { + if ((ioqueue->count + 2) < PJ_IOQUEUE_MAX_HANDLES) { + break; + } + } + + /* If we found one bump it up and return it */ + if (ioqueue) { + ioqueue->count += 2; + goto end; + } + + ioqueue = ast_calloc(1, sizeof(*ioqueue)); + if (!ioqueue) { + goto end; + } + + ioqueue->pool = pj_pool_create(&cachingpool.factory, "rtp", 512, 512, NULL); + + /* We use a timer on the ioqueue thread for TURN so that two threads aren't operating + * on a session at the same time + */ + if (pj_timer_heap_create(ioqueue->pool, 4, &ioqueue->timerheap) != PJ_SUCCESS) { + goto fatal; + } + + if (pj_lock_create_recursive_mutex(ioqueue->pool, "rtp%p", &lock) != PJ_SUCCESS) { + goto fatal; + } + + pj_timer_heap_set_lock(ioqueue->timerheap, lock, PJ_TRUE); + + if (pj_ioqueue_create(ioqueue->pool, 16, &ioqueue->ioqueue) != PJ_SUCCESS) { + goto fatal; + } + + if (pj_thread_create(ioqueue->pool, "ice", &ioqueue_worker_thread, ioqueue, 0, 0, &ioqueue->thread) != PJ_SUCCESS) { + goto fatal; + } + + AST_LIST_INSERT_HEAD(&ioqueues, ioqueue, next); + + /* Since this is being returned to an active session the count always starts at 2 */ + ioqueue->count = 2; + + goto end; + +fatal: + rtp_ioqueue_thread_destroy(ioqueue); + ioqueue = NULL; + +end: + AST_LIST_UNLOCK(&ioqueues); + return ioqueue; +} + +static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast_rtp_ice_component_type component, + enum ast_transport transport, const char *server, unsigned int port, const char *username, const char *password) +{ + struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + pj_turn_sock **turn_sock; + const pj_turn_sock_cb *turn_cb; + pj_turn_tp_type conn_type; + int conn_transport; + pj_stun_auth_cred cred = { 0, }; + pj_str_t turn_addr; + struct ast_sockaddr addr = { { 0, } }; + pj_stun_config stun_config; + struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000)); + struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, }; + pj_turn_session_info info; + struct ast_sockaddr local, loop; + + ast_rtp_instance_get_local_address(instance, &local); + if (ast_sockaddr_is_ipv4(&local)) { + ast_sockaddr_parse(&loop, "127.0.0.1", PARSE_PORT_FORBID); + } else { + ast_sockaddr_parse(&loop, "::1", PARSE_PORT_FORBID); + } + + /* Determine what component we are requesting a TURN session for */ + if (component == AST_RTP_ICE_COMPONENT_RTP) { + turn_sock = &rtp->turn_rtp; + turn_cb = &ast_rtp_turn_rtp_sock_cb; + conn_transport = TRANSPORT_TURN_RTP; + ast_sockaddr_set_port(&loop, ast_sockaddr_port(&local)); + } else if (component == AST_RTP_ICE_COMPONENT_RTCP) { + turn_sock = &rtp->turn_rtcp; + turn_cb = &ast_rtp_turn_rtcp_sock_cb; + conn_transport = TRANSPORT_TURN_RTCP; + ast_sockaddr_set_port(&loop, ast_sockaddr_port(&rtp->rtcp->us)); + } else { + return; + } + + if (transport == AST_TRANSPORT_UDP) { + conn_type = PJ_TURN_TP_UDP; + } else if (transport == AST_TRANSPORT_TCP) { + conn_type = PJ_TURN_TP_TCP; + } else { + ast_assert(0); + return; + } + + ast_sockaddr_parse(&addr, server, PARSE_PORT_FORBID); + + ast_mutex_lock(&rtp->lock); + if (*turn_sock) { + pj_turn_sock_destroy(*turn_sock); + rtp->turn_state = PJ_TURN_STATE_NULL; + while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) { + ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts); + } + } + ast_mutex_unlock(&rtp->lock); + + if (component == AST_RTP_ICE_COMPONENT_RTP && !rtp->ioqueue) { + rtp->ioqueue = rtp_ioqueue_thread_get_or_create(); + if (!rtp->ioqueue) { + return; + } + } + + pj_stun_config_init(&stun_config, &cachingpool.factory, 0, rtp->ioqueue->ioqueue, rtp->ioqueue->timerheap); + + if (pj_turn_sock_create(&stun_config, ast_sockaddr_is_ipv4(&addr) ? pj_AF_INET() : pj_AF_INET6(), conn_type, + turn_cb, NULL, instance, turn_sock) != PJ_SUCCESS) { + ast_log(LOG_WARNING, "Could not create a TURN client socket\n"); + return; + } + + cred.type = PJ_STUN_AUTH_CRED_STATIC; + pj_strset2(&cred.data.static_cred.username, (char*)username); + cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; + pj_strset2(&cred.data.static_cred.data, (char*)password); + + /* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */ + ast_mutex_lock(&rtp->lock); + pj_turn_sock_alloc(*turn_sock, pj_cstr(&turn_addr, server), port, NULL, &cred, NULL); + while (rtp->turn_state < PJ_TURN_STATE_READY) { + ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts); + } + ast_mutex_unlock(&rtp->lock); + + /* If a TURN session was allocated add it as a candidate */ + if (rtp->turn_state != PJ_TURN_STATE_READY) { + return; + } + + pj_turn_sock_get_info(*turn_sock, &info); + + ast_rtp_ice_add_cand(rtp, component, conn_transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr, + &info.relay_addr, NULL, pj_sockaddr_get_len(&info.relay_addr)); + + if (component == AST_RTP_ICE_COMPONENT_RTP) { + ast_sockaddr_copy(&rtp->rtp_loop, &loop); + } else if (component == AST_RTP_ICE_COMPONENT_RTCP) { + ast_sockaddr_copy(&rtp->rtcp_loop, &loop); + } +} + static char *generate_random_string(char *buf, size_t size) { long val[4]; @@ -819,6 +1184,7 @@ static struct ast_rtp_engine_ice ast_rtp_ice = { .get_local_candidates = ast_rtp_ice_get_local_candidates, .ice_lite = ast_rtp_ice_lite, .set_role = ast_rtp_ice_set_role, + .turn_request = ast_rtp_ice_turn_request, }; #endif @@ -1239,6 +1605,22 @@ static void ast_rtp_on_ice_complete(pj_ice_sess *ice, pj_status_t status) { struct ast_rtp_instance *instance = ice->user_data; struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + + if (status == PJ_SUCCESS) { + struct ast_sockaddr remote_address; + + /* Symmetric RTP must be disabled for the remote address to not get overwritten */ + ast_rtp_instance_set_prop(instance, AST_RTP_PROPERTY_NAT, 0); + + update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address); + ast_rtp_instance_set_remote_address(instance, &remote_address); + turn_enable_bind_channel(rtp, rtp->turn_rtp, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP); + + if (rtp->rtcp) { + update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &rtp->rtcp->them); + turn_enable_bind_channel(rtp, rtp->turn_rtcp, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP); + } + } #ifdef HAVE_OPENSSL_SRTP dtls_perform_handshake(instance, &rtp->dtls, 0); @@ -1263,7 +1645,13 @@ static void ast_rtp_on_ice_rx_data(pj_ice_sess *ice, unsigned comp_id, unsigned /* Instead of handling the packet here (which really doesn't work with our architecture) we set a bit to indicate that it should be handled after pj_ice_sess_on_rx_pkt * returns */ - rtp->passthrough = 1; + if (transport_id == TRANSPORT_SOCKET_RTP || transport_id == TRANSPORT_SOCKET_RTCP) { + rtp->passthrough = 1; + } else if (transport_id == TRANSPORT_TURN_RTP) { + rtp->rtp_passthrough = 1; + } else if (transport_id == TRANSPORT_TURN_RTCP) { + rtp->rtcp_passthrough = 1; + } } static pj_status_t ast_rtp_on_ice_tx_pkt(pj_ice_sess *ice, unsigned comp_id, unsigned transport_id, const void *pkt, pj_size_t size, const pj_sockaddr_t *dst_addr, unsigned dst_addr_len) @@ -1309,106 +1697,11 @@ static pj_ice_sess_cb ast_rtp_ice_sess_cb = { .on_tx_pkt = ast_rtp_on_ice_tx_pkt, }; -static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len) -{ - struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock); - struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); - struct ast_sockaddr dest = { { 0, }, }; - - ast_rtp_instance_get_local_address(instance, &dest); - - ast_sendto(rtp->s, pkt, pkt_len, 0, &dest); -} - -static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state) -{ - struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock); - struct ast_rtp *rtp = NULL; - - /* If this is a leftover from an already destroyed RTP instance just ignore the state change */ - if (!instance) { - return; - } - - rtp = ast_rtp_instance_get_data(instance); - - /* If the TURN session is being destroyed we need to remove it from the RTP instance */ - if (new_state == PJ_TURN_STATE_DESTROYING) { - rtp->turn_rtp = NULL; - return; - } - - /* We store the new state so the other thread can actually handle it */ - ast_mutex_lock(&rtp->lock); - rtp->turn_state = new_state; - - /* If this is a state that the main thread should be notified about do so */ - if (new_state == PJ_TURN_STATE_READY || new_state == PJ_TURN_STATE_DEALLOCATING || new_state == PJ_TURN_STATE_DEALLOCATED) { - ast_cond_signal(&rtp->cond); - } - - ast_mutex_unlock(&rtp->lock); -} - -/* RTP TURN Socket interface declaration */ -static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = { - .on_rx_data = ast_rtp_on_turn_rx_rtp_data, - .on_state = ast_rtp_on_turn_rtp_state, -}; - -static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len) -{ - struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock); - struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); - - ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp->us); -} - -static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state) -{ - struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock); - struct ast_rtp *rtp = NULL; - - /* If this is a leftover from an already destroyed RTP instance just ignore the state change */ - if (!instance) { - return; - } - - rtp = ast_rtp_instance_get_data(instance); - - /* If the TURN session is being destroyed we need to remove it from the RTP instance */ - if (new_state == PJ_TURN_STATE_DESTROYING) { - rtp->turn_rtcp = NULL; - return; - } - - /* We store the new state so the other thread can actually handle it */ - ast_mutex_lock(&rtp->lock); - rtp->turn_state = new_state; - - /* If this is a state that the main thread should be notified about do so */ - if (new_state == PJ_TURN_STATE_READY || new_state == PJ_TURN_STATE_DEALLOCATING || new_state == PJ_TURN_STATE_DEALLOCATED) { - ast_cond_signal(&rtp->cond); - } - - ast_mutex_unlock(&rtp->lock); -} - -/* RTCP TURN Socket interface declaration */ -static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = { - .on_rx_data = ast_rtp_on_turn_rx_rtcp_data, - .on_state = ast_rtp_on_turn_rtcp_state, -}; - -/*! \brief Worker thread for I/O queue and timerheap */ -static int ice_worker_thread(void *data) +/*! \brief Worker thread for timerheap */ +static int timer_worker_thread(void *data) { - while (!worker_terminate) { - const pj_time_val delay = {0, 10}; - - pj_ioqueue_poll(ioqueue, &delay); - - pj_timer_heap_poll(timerheap, NULL); + while (!timer_terminate) { + pj_timer_heap_poll(timer_heap, NULL); } return 0; @@ -1697,6 +1990,9 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); struct ast_srtp *srtp = ast_rtp_instance_get_srtp(instance); char *in = buf; +#ifdef HAVE_PJPROJECT + struct ast_sockaddr *loop = rtcp ? &rtp->rtcp_loop : &rtp->rtp_loop; +#endif if ((len = ast_recvfrom(rtcp ? rtp->rtcp->s : rtp->s, buf, size, flags, sa)) < 0) { return len; @@ -1752,7 +2048,16 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s #endif #ifdef HAVE_PJPROJECT - if (rtp->ice) { + if (!ast_sockaddr_isnull(loop) && !ast_sockaddr_cmp(loop, sa)) { + /* ICE traffic will have been handled in the TURN callback, so skip it but update the address + * so it reflects the actual source and not the loopback + */ + if (rtcp) { + ast_sockaddr_copy(sa, &rtp->rtcp->them); + } else { + ast_rtp_instance_get_remote_address(instance, sa); + } + } else if (rtp->ice) { pj_str_t combined = pj_str(ast_sockaddr_stringify(sa)); pj_sockaddr address; pj_status_t status; @@ -1769,7 +2074,7 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s pj_strerror(status, buf, sizeof(buf)); ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n", - (int) status, buf); + (int)status, buf); return -1; } if (!rtp->passthrough) { @@ -1942,7 +2247,7 @@ static int rtp_learning_rtp_seq_update(struct rtp_learning_info *info, uint16_t #ifdef HAVE_PJPROJECT static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct ast_rtp *rtp, struct ast_sockaddr *addr, int port, int component, - int transport, const pj_turn_sock_cb *turn_cb, pj_turn_sock **turn_sock) + int transport) { pj_sockaddr address[16]; unsigned int count = PJ_ARRAY_SIZE(address), pos = 0; @@ -1981,38 +2286,9 @@ static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct } /* If configured to use a TURN relay create a session and allocate */ - if (pj_strlen(&turnaddr) && pj_turn_sock_create(&rtp->ice->stun_cfg, ast_sockaddr_is_ipv4(addr) ? pj_AF_INET() : pj_AF_INET6(), PJ_TURN_TP_TCP, - turn_cb, NULL, instance, turn_sock) == PJ_SUCCESS) { - pj_stun_auth_cred cred = { 0, }; - struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_ALLOCATION_WAIT_TIME, 1000)); - struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, }; - - cred.type = PJ_STUN_AUTH_CRED_STATIC; - cred.data.static_cred.username = turnusername; - cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN; - cred.data.static_cred.data = turnpassword; - - /* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */ - ast_mutex_lock(&rtp->lock); - pj_turn_sock_alloc(*turn_sock, &turnaddr, turnport, NULL, &cred, NULL); - ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts); - ast_mutex_unlock(&rtp->lock); - - /* If a TURN session was allocated add it as a candidate */ - if (rtp->turn_state == PJ_TURN_STATE_READY) { - pj_turn_session_info info; - - pj_turn_sock_get_info(*turn_sock, &info); - - if (transport == TRANSPORT_SOCKET_RTP) { - transport = TRANSPORT_TURN_RTP; - } else if (transport == TRANSPORT_SOCKET_RTCP) { - transport = TRANSPORT_TURN_RTCP; - } - - ast_rtp_ice_add_cand(rtp, component, transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr, &info.relay_addr, - NULL, pj_sockaddr_get_len(&info.relay_addr)); - } + if (pj_strlen(&turnaddr)) { + ast_rtp_ice_turn_request(instance, component, AST_TRANSPORT_TCP, pj_strbuf(&turnaddr), turnport, + pj_strbuf(&turnusername), pj_strbuf(&turnpassword)); } } #endif @@ -2071,7 +2347,7 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad pj_thread_register_check(); - pj_stun_config_init(&stun_config, &cachingpool.factory, 0, ioqueue, timerheap); + pj_stun_config_init(&stun_config, &cachingpool.factory, 0, NULL, timer_heap); ufrag = pj_str(rtp->local_ufrag); passwd = pj_str(rtp->local_passwd); @@ -2084,14 +2360,14 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad /* Add all of the available candidates to the ICE session */ rtp_add_candidates_to_ice(instance, rtp, addr, port, AST_RTP_ICE_COMPONENT_RTP, - TRANSPORT_SOCKET_RTP, &ast_rtp_turn_rtp_sock_cb, &rtp->turn_rtp); + TRANSPORT_SOCKET_RTP); /* Only add the RTCP candidates to ICE when replacing the session. New sessions * handle this in a separate part of the setup phase */ if (replace && rtp->rtcp) { rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, - TRANSPORT_SOCKET_RTCP, &ast_rtp_turn_rtcp_sock_cb, &rtp->turn_rtcp); + TRANSPORT_SOCKET_RTCP); } return 0; @@ -2200,6 +2476,8 @@ static int ast_rtp_new(struct ast_rtp_instance *instance, static int ast_rtp_destroy(struct ast_rtp_instance *instance) { struct ast_rtp *rtp = ast_rtp_instance_get_data(instance); + struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000)); + struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, }; /* Destroy the smoother that was smoothing out audio if present */ if (rtp->smoother) { @@ -2236,21 +2514,33 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance) #ifdef HAVE_PJPROJECT pj_thread_register_check(); - /* Destroy the ICE session if being used */ - if (rtp->ice) { - pj_ice_sess_destroy(rtp->ice); - } - /* Destroy the RTP TURN relay if being used */ + ast_mutex_lock(&rtp->lock); if (rtp->turn_rtp) { - pj_turn_sock_set_user_data(rtp->turn_rtp, NULL); pj_turn_sock_destroy(rtp->turn_rtp); + rtp->turn_state = PJ_TURN_STATE_NULL; + while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) { + ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts); + } } /* Destroy the RTCP TURN relay if being used */ if (rtp->turn_rtcp) { - pj_turn_sock_set_user_data(rtp->turn_rtcp, NULL); pj_turn_sock_destroy(rtp->turn_rtcp); + rtp->turn_state = PJ_TURN_STATE_NULL; + while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) { + ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts); + } + } + ast_mutex_unlock(&rtp->lock); + + if (rtp->ioqueue) { + rtp_ioqueue_thread_remove(rtp->ioqueue); + } + + /* Destroy the ICE session if being used */ + if (rtp->ice) { + pj_ice_sess_destroy(rtp->ice); } /* Destroy any candidates */ @@ -2357,7 +2647,6 @@ static int ast_rtp_dtmf_begin(struct ast_rtp_instance *instance, char digit) ast_sockaddr_stringify(&remote_address), strerror(errno)); } - update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address); if (rtp_debug_test_addr(&remote_address)) { ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n", ast_sockaddr_stringify(&remote_address), @@ -2407,8 +2696,6 @@ static int ast_rtp_dtmf_continuation(struct ast_rtp_instance *instance) strerror(errno)); } - update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address); - if (rtp_debug_test_addr(&remote_address)) { ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n", ast_sockaddr_stringify(&remote_address), @@ -2483,8 +2770,6 @@ static int ast_rtp_dtmf_end_with_duration(struct ast_rtp_instance *instance, cha strerror(errno)); } - update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address); - if (rtp_debug_test_addr(&remote_address)) { ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n", ast_sockaddr_stringify(&remote_address), @@ -2745,8 +3030,6 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr) rtp->rtcp->rr_count++; } - update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &remote_address); - if (rtcp_debug_test_addr(&rtp->rtcp->them)) { ast_verbose("* Sent RTCP %s to %s%s\n", sr ? "SR" : "RR", ast_sockaddr_stringify(&remote_address), ice ? " (via ICE)" : ""); @@ -2940,8 +3223,6 @@ static int ast_rtp_raw_write(struct ast_rtp_instance *instance, struct ast_frame } } - update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address); - if (rtp_debug_test_addr(&remote_address)) { ast_verbose("Sent RTP packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n", ast_sockaddr_stringify(&remote_address), @@ -3910,8 +4191,6 @@ static int bridge_p2p_rtp_write(struct ast_rtp_instance *instance, unsigned int return 0; } - update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address); - if (rtp_debug_test_addr(&remote_address)) { ast_verbose("Sent RTP P2P packet to %s%s (type %-2.2d, len %-6.6d)\n", ast_sockaddr_stringify(&remote_address), @@ -4388,8 +4667,7 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro #ifdef HAVE_PJPROJECT if (rtp->ice) { - rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_SOCKET_RTCP, - &ast_rtp_turn_rtcp_sock_cb, &rtp->turn_rtcp); + rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_SOCKET_RTCP); } #endif @@ -4681,8 +4959,6 @@ static int ast_rtp_sendcng(struct ast_rtp_instance *instance, int level) return res; } - update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address); - if (rtp_debug_test_addr(&remote_address)) { ast_verbose("Sent Comfort Noise RTP packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n", ast_sockaddr_stringify(&remote_address), @@ -4949,17 +5225,17 @@ static int rtp_reload(int reload) if (ast_parse_arg(s, PARSE_INADDR, &addr)) { ast_log(LOG_WARNING, "Invalid TURN server address: %s\n", s); } else { - pj_strdup2(pool, &turnaddr, ast_inet_ntoa(addr.sin_addr)); + pj_strdup2_with_null(pool, &turnaddr, ast_inet_ntoa(addr.sin_addr)); /* ntohs() is not a bug here. The port number is used in host byte order with * a pjnat API. */ turnport = ntohs(addr.sin_port); } } if ((s = ast_variable_retrieve(cfg, "general", "turnusername"))) { - pj_strdup2(pool, &turnusername, s); + pj_strdup2_with_null(pool, &turnusername, s); } if ((s = ast_variable_retrieve(cfg, "general", "turnpassword"))) { - pj_strdup2(pool, &turnpassword, s); + pj_strdup2_with_null(pool, &turnpassword, s); } #endif ast_config_destroy(cfg); @@ -4979,6 +5255,20 @@ static int reload_module(void) return 0; } +#ifdef HAVE_PJPROJECT +static void rtp_terminate_pjproject(void) +{ + if (timer_thread) { + timer_terminate = 1; + pj_thread_join(timer_thread); + pj_thread_destroy(timer_thread); + } + + pj_caching_pool_destroy(&cachingpool); + pj_shutdown(); +} +#endif + static int load_module(void) { #ifdef HAVE_PJPROJECT @@ -4989,65 +5279,49 @@ static int load_module(void) } if (pjlib_util_init() != PJ_SUCCESS) { - pj_shutdown(); + rtp_terminate_pjproject(); return AST_MODULE_LOAD_DECLINE; } if (pjnath_init() != PJ_SUCCESS) { - pj_shutdown(); + rtp_terminate_pjproject(); return AST_MODULE_LOAD_DECLINE; } pj_caching_pool_init(&cachingpool, &pj_pool_factory_default_policy, 0); - pool = pj_pool_create(&cachingpool.factory, "rtp", 512, 512, NULL); + pool = pj_pool_create(&cachingpool.factory, "timer", 512, 512, NULL); - if (pj_timer_heap_create(pool, 100, &timerheap) != PJ_SUCCESS) { - pj_caching_pool_destroy(&cachingpool); - pj_shutdown(); + if (pj_timer_heap_create(pool, 100, &timer_heap) != PJ_SUCCESS) { + rtp_terminate_pjproject(); return AST_MODULE_LOAD_DECLINE; } if (pj_lock_create_recursive_mutex(pool, "rtp%p", &lock) != PJ_SUCCESS) { - pj_caching_pool_destroy(&cachingpool); - pj_shutdown(); + rtp_terminate_pjproject(); return AST_MODULE_LOAD_DECLINE; } - pj_timer_heap_set_lock(timerheap, lock, PJ_TRUE); + pj_timer_heap_set_lock(timer_heap, lock, PJ_TRUE); - if (pj_ioqueue_create(pool, 16, &ioqueue) != PJ_SUCCESS) { - pj_caching_pool_destroy(&cachingpool); - pj_shutdown(); + if (pj_thread_create(pool, "timer", &timer_worker_thread, NULL, 0, 0, &timer_thread) != PJ_SUCCESS) { + rtp_terminate_pjproject(); return AST_MODULE_LOAD_DECLINE; } - if (pj_thread_create(pool, "ice", &ice_worker_thread, NULL, 0, 0, &thread) != PJ_SUCCESS) { - pj_caching_pool_destroy(&cachingpool); - pj_shutdown(); - return AST_MODULE_LOAD_DECLINE; - } #endif if (ast_rtp_engine_register(&asterisk_rtp_engine)) { #ifdef HAVE_PJPROJECT - worker_terminate = 1; - pj_thread_join(thread); - pj_thread_destroy(thread); - pj_caching_pool_destroy(&cachingpool); - pj_shutdown(); + rtp_terminate_pjproject(); #endif return AST_MODULE_LOAD_DECLINE; } if (ast_cli_register_multiple(cli_rtp, ARRAY_LEN(cli_rtp))) { #ifdef HAVE_PJPROJECT - worker_terminate = 1; - pj_thread_join(thread); - pj_thread_destroy(thread); ast_rtp_engine_unregister(&asterisk_rtp_engine); - pj_caching_pool_destroy(&cachingpool); - pj_shutdown(); + rtp_terminate_pjproject(); #endif return AST_MODULE_LOAD_DECLINE; } @@ -5063,15 +5337,8 @@ static int unload_module(void) ast_cli_unregister_multiple(cli_rtp, ARRAY_LEN(cli_rtp)); #ifdef HAVE_PJPROJECT - worker_terminate = 1; - pj_thread_register_check(); - - pj_thread_join(thread); - pj_thread_destroy(thread); - - pj_caching_pool_destroy(&cachingpool); - pj_shutdown(); + rtp_terminate_pjproject(); #endif return 0;