@ -45,6 +45,20 @@
# define MAX_RECV_LOOP_STRIKES 5
# endif
# define DS_io(x, ps, ke, io) do { \
uint64_t ks_val ; \
ks_val = atomic64_get ( & ps - > kernel_stats_ # # io . x ) ; \
if ( ( ke ) - > x < ks_val ) \
diff_ # # x # # _ # # io = 0 ; \
else \
diff_ # # x # # _ # # io = ( ke ) - > x - ks_val ; \
atomic64_add ( & ps - > stats_ # # io . x , diff_ # # x # # _ # # io ) ; \
atomic64_add ( & ps - > selected_sfd - > local_intf - > stats . io . x , diff_ # # x # # _ # # io ) ; \
RTPE_STATS_ADD ( x # # _kernel , diff_ # # x # # _ # # io ) ; \
} while ( 0 )
# define DS(x) DS_io(x, ps, &ke->stats_in, in)
# define DSo(x) DS_io(x, sink, stats_o, out)
struct intf_rr {
@ -2603,7 +2617,9 @@ static int media_packet_queue_dup(GQueue *q) {
return 0 ;
}
// reverse of count_stream_stats_kernel()
/**
* reverse of count_stream_stats_kernel ( )
*/
static void count_stream_stats_userspace ( struct packet_stream * ps ) {
if ( ! PS_ISSET ( ps , RTP ) )
return ;
@ -2625,7 +2641,30 @@ static void count_stream_stats_userspace(struct packet_stream *ps) {
RTPE_GAUGE_INC ( userspace_streams ) ;
}
}
/**
* reverse of count_stream_stats_userspace ( )
*/
static void count_stream_stats_kernel ( struct packet_stream * ps ) {
if ( ! PS_ISSET ( ps , RTP ) )
return ;
if ( bf_set ( & ps - > stats_flags , PS_STATS_KERNEL ) )
return ; // flag was already set, nothing to do
if ( bf_isset ( & ps - > stats_flags , PS_STATS_USERSPACE ) ) {
// mixed stream. count as only mixed stream.
if ( bf_clear ( & ps - > stats_flags , PS_STATS_KERNEL_COUNTED ) )
RTPE_GAUGE_DEC ( kernel_only_streams ) ;
if ( bf_clear ( & ps - > stats_flags , PS_STATS_USERSPACE_COUNTED ) )
RTPE_GAUGE_DEC ( userspace_streams ) ;
if ( ! bf_set ( & ps - > stats_flags , PS_STATS_MIXED_COUNTED ) )
RTPE_GAUGE_INC ( kernel_user_streams ) ;
}
else {
// kernel-only (for now). count it.
if ( ! bf_set ( & ps - > stats_flags , PS_STATS_KERNEL_COUNTED ) )
RTPE_GAUGE_INC ( kernel_only_streams ) ;
}
}
/**
* Packet handling starts in stream_packet ( ) .
@ -3297,3 +3336,184 @@ struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_
ret - > last_run = rtpe_now ;
return & ret - > stats ;
}
/**
* Ports iterations ( stats update from the kernel ) functionality .
*/
static void kernel_stats_updater ( void ) {
struct rtpengine_list_entry * ke ;
struct packet_stream * ps ;
int j ;
struct rtp_stats * rs ;
unsigned int pt ;
endpoint_t ep ;
/* TODO: should we realy check the count of call timers? `call_timer_iterator()` */
GList * kl = kernel_list ( ) ;
while ( kl ) {
ke = kl - > data ;
kernel2endpoint ( & ep , & ke - > target . local ) ;
AUTO_CLEANUP ( struct stream_fd * sfd , stream_fd_auto_cleanup ) = stream_fd_lookup ( & ep ) ;
if ( ! sfd )
goto next ;
log_info_stream_fd ( sfd ) ;
rwlock_lock_r ( & sfd - > call - > master_lock ) ;
ps = sfd - > stream ;
if ( ! ps | | ps - > selected_sfd ! = sfd ) {
rwlock_unlock_r ( & sfd - > call - > master_lock ) ;
goto next ;
}
uint64_t diff_packets_in , diff_bytes_in , diff_errors_in ;
uint64_t diff_packets_out , diff_bytes_out , diff_errors_out ;
DS ( packets ) ;
DS ( bytes ) ;
DS ( errors ) ;
if ( ke - > stats_in . packets ! = atomic64_get ( & ps - > kernel_stats_in . packets ) ) {
atomic64_set ( & ps - > last_packet , rtpe_now . tv_sec ) ;
count_stream_stats_kernel ( ps ) ;
}
ps - > in_tos_tclass = ke - > stats_in . tos ;
# if (RE_HAS_MEASUREDELAY)
/* XXX fix atomicity */
ps - > stats_in . delay_min = ke - > stats_in . delay_min ;
ps - > stats_in . delay_avg = ke - > stats_in . delay_avg ;
ps - > stats_in . delay_max = ke - > stats_in . delay_max ;
# endif
atomic64_set ( & ps - > kernel_stats_in . bytes , ke - > stats_in . bytes ) ;
atomic64_set ( & ps - > kernel_stats_in . packets , ke - > stats_in . packets ) ;
atomic64_set ( & ps - > kernel_stats_in . errors , ke - > stats_in . errors ) ;
uint64_t max_diff = 0 ;
int max_pt = - 1 ;
for ( j = 0 ; j < ke - > target . num_payload_types ; j + + ) {
pt = ke - > target . pt_input [ j ] . pt_num ;
rs = g_hash_table_lookup ( ps - > rtp_stats , GINT_TO_POINTER ( pt ) ) ;
if ( ! rs )
continue ;
if ( ke - > rtp_stats [ j ] . packets > atomic64_get ( & rs - > packets ) ) {
uint64_t diff = ke - > rtp_stats [ j ] . packets - atomic64_get ( & rs - > packets ) ;
atomic64_add ( & rs - > packets , diff ) ;
if ( diff > max_diff ) {
max_diff = diff ;
max_pt = pt ;
}
}
if ( ke - > rtp_stats [ j ] . bytes > atomic64_get ( & rs - > bytes ) )
atomic64_add ( & rs - > bytes ,
ke - > rtp_stats [ j ] . bytes - atomic64_get ( & rs - > bytes ) ) ;
atomic64_set ( & rs - > kernel_packets , ke - > rtp_stats [ j ] . packets ) ;
atomic64_set ( & rs - > kernel_bytes , ke - > rtp_stats [ j ] . bytes ) ;
}
bool update = false ;
if ( diff_packets_in )
sfd - > call - > foreign_media = 0 ;
if ( ! ke - > target . non_forwarding & & diff_packets_in ) {
for ( GList * l = ps - > rtp_sinks . head ; l ; l = l - > next ) {
struct sink_handler * sh = l - > data ;
struct packet_stream * sink = sh - > sink ;
if ( sh - > kernel_output_idx < 0
| | sh - > kernel_output_idx > = ke - > target . num_destinations )
continue ;
struct rtpengine_output_info * o = & ke - > outputs [ sh - > kernel_output_idx ] ;
struct rtpengine_stats * stats_o = & ke - > stats_out [ sh - > kernel_output_idx ] ;
DSo ( bytes ) ;
DSo ( packets ) ;
DSo ( errors ) ;
atomic64_set ( & sink - > kernel_stats_out . bytes , stats_o - > bytes ) ;
atomic64_set ( & sink - > kernel_stats_out . packets , stats_o - > packets ) ;
atomic64_set ( & sink - > kernel_stats_out . errors , stats_o - > errors ) ;
mutex_lock ( & sink - > out_lock ) ;
for ( unsigned int u = 0 ; u < G_N_ELEMENTS ( ke - > target . ssrc ) ; u + + ) {
if ( ! ke - > target . ssrc [ u ] ) // end of list
break ;
uint32_t out_ssrc = o - > ssrc_out [ u ] ;
if ( ! out_ssrc )
out_ssrc = ke - > target . ssrc [ u ] ;
struct ssrc_ctx * ctx = __hunt_ssrc_ctx ( ntohl ( out_ssrc ) ,
sink - > ssrc_out , 0 ) ;
if ( ! ctx )
continue ;
if ( max_pt ! = - 1 )
payload_tracker_add ( & ctx - > tracker , max_pt ) ;
if ( sink - > crypto . params . crypto_suite
& & o - > encrypt . last_index [ u ] - ctx - > srtp_index > 0x4000 )
{
ilog ( LOG_DEBUG , " Updating SRTP encryption index from % " PRIu64
" to % " PRIu64 ,
ctx - > srtp_index ,
o - > encrypt . last_index [ u ] ) ;
ctx - > srtp_index = o - > encrypt . last_index [ u ] ;
update = true ;
}
}
mutex_unlock ( & sink - > out_lock ) ;
}
mutex_lock ( & ps - > in_lock ) ;
for ( unsigned int u = 0 ; u < G_N_ELEMENTS ( ke - > target . ssrc ) ; u + + ) {
if ( ! ke - > target . ssrc [ u ] ) // end of list
break ;
struct ssrc_ctx * ctx = __hunt_ssrc_ctx ( ntohl ( ke - > target . ssrc [ u ] ) ,
ps - > ssrc_in , 0 ) ;
if ( ! ctx )
continue ;
// TODO: add in SSRC stats similar to __stream_update_stats
atomic64_set ( & ctx - > last_seq , ke - > target . decrypt . last_index [ u ] ) ;
if ( max_pt ! = - 1 )
payload_tracker_add ( & ctx - > tracker , max_pt ) ;
if ( sfd - > crypto . params . crypto_suite
& & ke - > target . decrypt . last_index [ u ]
- ctx - > srtp_index > 0x4000 ) {
ilog ( LOG_DEBUG , " Updating SRTP decryption index from % " PRIu64
" to % " PRIu64 ,
ctx - > srtp_index ,
ke - > target . decrypt . last_index [ u ] ) ;
ctx - > srtp_index = ke - > target . decrypt . last_index [ u ] ;
update = true ;
}
}
mutex_unlock ( & ps - > in_lock ) ;
}
rwlock_unlock_r ( & sfd - > call - > master_lock ) ;
if ( update )
redis_update_onekey ( ps - > call , rtpe_redis_write ) ;
next :
g_slice_free1 ( sizeof ( * ke ) , ke ) ;
kl = g_list_delete_link ( kl , kl ) ;
log_info_pop ( ) ;
}
}
void kernel_stats_updater_iterator ( void * dummy ) {
while ( ! rtpe_shutdown ) {
kernel_stats_updater ( ) ;
thread_cancel_enable ( ) ;
usleep ( 1000000 ) ; /* sleep for 1 second in each iteration */
thread_cancel_disable ( ) ;
}
}