MT#57337 Move kernel stats update to a separate thread

To do the work more efficiently, and to stop depending on when
the poller runs `call_timer`, move the ports iterations (stats
update from the kernel) into a separate thread. The kernel stats
update has nothing to do with the call timers, so it should not
be tied to whatever happens in `call_timer` at all.

As an additional benefit, this unloads the `call_timer` runner.

Change-Id: I511529ce504ef3d29f4e9d6d731ffd470d78d27a
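In essence, the kernel-list iteration that used to run inside `call_timer()` now lives in its own detached thread, polling the kernel once per second. A condensed sketch of the new entry point and its wiring (taken from the diff below, not a verbatim excerpt):

void kernel_stats_updater_iterator(void *dummy) {
	while (!rtpe_shutdown) {
		kernel_stats_updater();	/* one pass over kernel_list(), same per-port work as before */
		thread_cancel_enable();
		usleep(1000000);	/* poll the kernel once per second */
		thread_cancel_disable();
	}
}

/* started from main() alongside the other stats threads: */
thread_create_detach_prio(kernel_stats_updater_iterator, NULL, rtpe_config.idle_scheduling,
		rtpe_config.idle_priority, "kernel stats updater");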

@@ -499,55 +499,8 @@ destroy:
free(url_suffix);
}
// reverse of count_stream_stats_userspace()
static void count_stream_stats_kernel(struct packet_stream *ps) {
if (!PS_ISSET(ps, RTP))
return;
if (bf_set(&ps->stats_flags, PS_STATS_KERNEL))
return; // flag was already set, nothing to do
if (bf_isset(&ps->stats_flags, PS_STATS_USERSPACE)) {
// mixed stream. count as only mixed stream.
if (bf_clear(&ps->stats_flags, PS_STATS_KERNEL_COUNTED))
RTPE_GAUGE_DEC(kernel_only_streams);
if (bf_clear(&ps->stats_flags, PS_STATS_USERSPACE_COUNTED))
RTPE_GAUGE_DEC(userspace_streams);
if (!bf_set(&ps->stats_flags, PS_STATS_MIXED_COUNTED))
RTPE_GAUGE_INC(kernel_user_streams);
}
else {
// kernel-only (for now). count it.
if (!bf_set(&ps->stats_flags, PS_STATS_KERNEL_COUNTED))
RTPE_GAUGE_INC(kernel_only_streams);
}
}
#define DS_io(x, ps, ke, io) do { \
uint64_t ks_val; \
ks_val = atomic64_get(&ps->kernel_stats_ ## io.x); \
if ((ke)->x < ks_val) \
diff_ ## x ## _ ## io = 0; \
else \
diff_ ## x ## _ ## io = (ke)->x - ks_val; \
atomic64_add(&ps->stats_ ## io.x, diff_ ## x ## _ ## io); \
atomic64_add(&ps->selected_sfd->local_intf->stats.io.x, diff_ ## x ## _ ## io); \
RTPE_STATS_ADD(x ## _kernel, diff_ ## x ## _ ## io); \
} while (0)
#define DS(x) DS_io(x, ps, &ke->stats_in, in)
#define DSo(x) DS_io(x, sink, stats_o, out)
void call_timer(void *ptr) {
struct iterator_helper hlp;
GList *i;
struct rtpengine_list_entry *ke;
struct packet_stream *ps;
int j;
struct rtp_stats *rs;
unsigned int pt;
endpoint_t ep;
struct timeval tv_start;
long long run_diff_us;
@@ -575,166 +528,6 @@ void call_timer(void *ptr) {
// stats derived while iterating calls
RTPE_GAUGE_SET(transcoded_media, hlp.transcoded_media);
// TODO: eliminate/split out most of what this single central timer does
i = hlp.count ? kernel_list() : NULL;
while (i) {
ke = i->data;
kernel2endpoint(&ep, &ke->target.local);
AUTO_CLEANUP(struct stream_fd *sfd, stream_fd_auto_cleanup) = stream_fd_lookup(&ep);
if (!sfd)
goto next;
log_info_stream_fd(sfd);
rwlock_lock_r(&sfd->call->master_lock);
ps = sfd->stream;
if (!ps || ps->selected_sfd != sfd) {
rwlock_unlock_r(&sfd->call->master_lock);
goto next;
}
uint64_t diff_packets_in, diff_bytes_in, diff_errors_in;
uint64_t diff_packets_out, diff_bytes_out, diff_errors_out;
DS(packets);
DS(bytes);
DS(errors);
if (ke->stats_in.packets != atomic64_get(&ps->kernel_stats_in.packets)) {
atomic64_set(&ps->last_packet, rtpe_now.tv_sec);
count_stream_stats_kernel(ps);
}
ps->in_tos_tclass = ke->stats_in.tos;
#if (RE_HAS_MEASUREDELAY)
/* XXX fix atomicity */
ps->stats_in.delay_min = ke->stats_in.delay_min;
ps->stats_in.delay_avg = ke->stats_in.delay_avg;
ps->stats_in.delay_max = ke->stats_in.delay_max;
#endif
atomic64_set(&ps->kernel_stats_in.bytes, ke->stats_in.bytes);
atomic64_set(&ps->kernel_stats_in.packets, ke->stats_in.packets);
atomic64_set(&ps->kernel_stats_in.errors, ke->stats_in.errors);
uint64_t max_diff = 0;
int max_pt = -1;
for (j = 0; j < ke->target.num_payload_types; j++) {
pt = ke->target.pt_input[j].pt_num;
rs = g_hash_table_lookup(ps->rtp_stats, GINT_TO_POINTER(pt));
if (!rs)
continue;
if (ke->rtp_stats[j].packets > atomic64_get(&rs->packets)) {
uint64_t diff = ke->rtp_stats[j].packets - atomic64_get(&rs->packets);
atomic64_add(&rs->packets, diff);
if (diff > max_diff) {
max_diff = diff;
max_pt = pt;
}
}
if (ke->rtp_stats[j].bytes > atomic64_get(&rs->bytes))
atomic64_add(&rs->bytes,
ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes));
atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets);
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
}
bool update = false;
if (diff_packets_in)
sfd->call->foreign_media = 0;
if (!ke->target.non_forwarding && diff_packets_in) {
for (GList *l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
if (sh->kernel_output_idx < 0
|| sh->kernel_output_idx >= ke->target.num_destinations)
continue;
struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx];
struct rtpengine_stats *stats_o = &ke->stats_out[sh->kernel_output_idx];
DSo(bytes);
DSo(packets);
DSo(errors);
atomic64_set(&sink->kernel_stats_out.bytes, stats_o->bytes);
atomic64_set(&sink->kernel_stats_out.packets, stats_o->packets);
atomic64_set(&sink->kernel_stats_out.errors, stats_o->errors);
mutex_lock(&sink->out_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
break;
uint32_t out_ssrc = o->ssrc_out[u];
if (!out_ssrc)
out_ssrc = ke->target.ssrc[u];
struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(out_ssrc),
sink->ssrc_out, 0);
if (!ctx)
continue;
if (max_pt != -1)
payload_tracker_add(&ctx->tracker, max_pt);
if (sink->crypto.params.crypto_suite
&& o->encrypt.last_index[u] - ctx->srtp_index > 0x4000)
{
ilog(LOG_DEBUG, "Updating SRTP encryption index from %" PRIu64
" to %" PRIu64,
ctx->srtp_index,
o->encrypt.last_index[u]);
ctx->srtp_index = o->encrypt.last_index[u];
update = true;
}
}
mutex_unlock(&sink->out_lock);
}
mutex_lock(&ps->in_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
break;
struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(ke->target.ssrc[u]),
ps->ssrc_in, 0);
if (!ctx)
continue;
// TODO: add in SSRC stats similar to __stream_update_stats
atomic64_set(&ctx->last_seq, ke->target.decrypt.last_index[u]);
if (max_pt != -1)
payload_tracker_add(&ctx->tracker, max_pt);
if (sfd->crypto.params.crypto_suite
&& ke->target.decrypt.last_index[u]
- ctx->srtp_index > 0x4000) {
ilog(LOG_DEBUG, "Updating SRTP decryption index from %" PRIu64
" to %" PRIu64,
ctx->srtp_index,
ke->target.decrypt.last_index[u]);
ctx->srtp_index = ke->target.decrypt.last_index[u];
update = true;
}
}
mutex_unlock(&ps->in_lock);
}
rwlock_unlock_r(&sfd->call->master_lock);
if (update)
redis_update_onekey(ps->call, rtpe_redis_write);
next:
g_slice_free1(sizeof(*ke), ke);
i = g_list_delete_link(i, i);
log_info_pop();
}
kill_calls_timer(hlp.del_scheduled, NULL);
kill_calls_timer(hlp.del_timeout, rtpe_config.b2b_url);

@@ -1354,6 +1354,10 @@ int main(int argc, char **argv) {
thread_create_detach_prio(call_rate_stats_updater, NULL, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "call rate stats");
/* separate thread for ports iterations (stats update from the kernel) functionality */
thread_create_detach_prio(kernel_stats_updater_iterator, NULL, rtpe_config.idle_scheduling,
rtpe_config.idle_priority, "kernel stats updater");
if (!is_addr_unspecified(&rtpe_config.redis_ep.address) && initial_rtpe_config.redis_delete_async)
thread_create_detach(redis_delete_async_loop, NULL, "redis async");

@@ -45,6 +45,20 @@
#define MAX_RECV_LOOP_STRIKES 5
#endif
#define DS_io(x, ps, ke, io) do { \
uint64_t ks_val; \
ks_val = atomic64_get(&ps->kernel_stats_ ## io.x); \
if ((ke)->x < ks_val) \
diff_ ## x ## _ ## io = 0; \
else \
diff_ ## x ## _ ## io = (ke)->x - ks_val; \
atomic64_add(&ps->stats_ ## io.x, diff_ ## x ## _ ## io); \
atomic64_add(&ps->selected_sfd->local_intf->stats.io.x, diff_ ## x ## _ ## io); \
RTPE_STATS_ADD(x ## _kernel, diff_ ## x ## _ ## io); \
} while (0)
#define DS(x) DS_io(x, ps, &ke->stats_in, in)
#define DSo(x) DS_io(x, sink, stats_o, out)
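For readers following the token pasting: after preprocessing, `DS(packets)` expands roughly to the block below (illustrative expansion only, not part of the patch; it relies on `diff_packets_in` being declared in the enclosing scope, as it is in `kernel_stats_updater()` further down):

do {
	uint64_t ks_val;
	ks_val = atomic64_get(&ps->kernel_stats_in.packets);
	if (ke->stats_in.packets < ks_val)
		diff_packets_in = 0;	/* kernel counter went backwards: avoid underflow */
	else
		diff_packets_in = ke->stats_in.packets - ks_val;
	atomic64_add(&ps->stats_in.packets, diff_packets_in);
	atomic64_add(&ps->selected_sfd->local_intf->stats.in.packets, diff_packets_in);
	RTPE_STATS_ADD(packets_kernel, diff_packets_in);
} while (0);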
struct intf_rr {
@@ -2603,7 +2617,9 @@ static int media_packet_queue_dup(GQueue *q) {
return 0;
}
// reverse of count_stream_stats_kernel()
/**
* reverse of count_stream_stats_kernel()
*/
static void count_stream_stats_userspace(struct packet_stream *ps) {
if (!PS_ISSET(ps, RTP))
return;
@@ -2625,7 +2641,30 @@ static void count_stream_stats_userspace(struct packet_stream *ps) {
RTPE_GAUGE_INC(userspace_streams);
}
}
/**
* reverse of count_stream_stats_userspace()
*/
static void count_stream_stats_kernel(struct packet_stream *ps) {
if (!PS_ISSET(ps, RTP))
return;
if (bf_set(&ps->stats_flags, PS_STATS_KERNEL))
return; // flag was already set, nothing to do
if (bf_isset(&ps->stats_flags, PS_STATS_USERSPACE)) {
// mixed stream. count as only mixed stream.
if (bf_clear(&ps->stats_flags, PS_STATS_KERNEL_COUNTED))
RTPE_GAUGE_DEC(kernel_only_streams);
if (bf_clear(&ps->stats_flags, PS_STATS_USERSPACE_COUNTED))
RTPE_GAUGE_DEC(userspace_streams);
if (!bf_set(&ps->stats_flags, PS_STATS_MIXED_COUNTED))
RTPE_GAUGE_INC(kernel_user_streams);
}
else {
// kernel-only (for now). count it.
if (!bf_set(&ps->stats_flags, PS_STATS_KERNEL_COUNTED))
RTPE_GAUGE_INC(kernel_only_streams);
}
}
/**
* Packet handling starts in stream_packet().
@@ -3297,3 +3336,184 @@ struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_
ret->last_run = rtpe_now;
return &ret->stats;
}
/**
* Ports iterations (stats update from the kernel) functionality.
*/
static void kernel_stats_updater(void) {
struct rtpengine_list_entry *ke;
struct packet_stream *ps;
int j;
struct rtp_stats *rs;
unsigned int pt;
endpoint_t ep;
/* TODO: should we really check the count of call timers? `call_timer_iterator()` */
GList * kl = kernel_list();
while (kl) {
ke = kl->data;
kernel2endpoint(&ep, &ke->target.local);
AUTO_CLEANUP(struct stream_fd *sfd, stream_fd_auto_cleanup) = stream_fd_lookup(&ep);
if (!sfd)
goto next;
log_info_stream_fd(sfd);
rwlock_lock_r(&sfd->call->master_lock);
ps = sfd->stream;
if (!ps || ps->selected_sfd != sfd) {
rwlock_unlock_r(&sfd->call->master_lock);
goto next;
}
uint64_t diff_packets_in, diff_bytes_in, diff_errors_in;
uint64_t diff_packets_out, diff_bytes_out, diff_errors_out;
DS(packets);
DS(bytes);
DS(errors);
if (ke->stats_in.packets != atomic64_get(&ps->kernel_stats_in.packets)) {
atomic64_set(&ps->last_packet, rtpe_now.tv_sec);
count_stream_stats_kernel(ps);
}
ps->in_tos_tclass = ke->stats_in.tos;
#if (RE_HAS_MEASUREDELAY)
/* XXX fix atomicity */
ps->stats_in.delay_min = ke->stats_in.delay_min;
ps->stats_in.delay_avg = ke->stats_in.delay_avg;
ps->stats_in.delay_max = ke->stats_in.delay_max;
#endif
atomic64_set(&ps->kernel_stats_in.bytes, ke->stats_in.bytes);
atomic64_set(&ps->kernel_stats_in.packets, ke->stats_in.packets);
atomic64_set(&ps->kernel_stats_in.errors, ke->stats_in.errors);
uint64_t max_diff = 0;
int max_pt = -1;
for (j = 0; j < ke->target.num_payload_types; j++) {
pt = ke->target.pt_input[j].pt_num;
rs = g_hash_table_lookup(ps->rtp_stats, GINT_TO_POINTER(pt));
if (!rs)
continue;
if (ke->rtp_stats[j].packets > atomic64_get(&rs->packets)) {
uint64_t diff = ke->rtp_stats[j].packets - atomic64_get(&rs->packets);
atomic64_add(&rs->packets, diff);
if (diff > max_diff) {
max_diff = diff;
max_pt = pt;
}
}
if (ke->rtp_stats[j].bytes > atomic64_get(&rs->bytes))
atomic64_add(&rs->bytes,
ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes));
atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets);
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
}
bool update = false;
if (diff_packets_in)
sfd->call->foreign_media = 0;
if (!ke->target.non_forwarding && diff_packets_in) {
for (GList *l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
if (sh->kernel_output_idx < 0
|| sh->kernel_output_idx >= ke->target.num_destinations)
continue;
struct rtpengine_output_info *o = &ke->outputs[sh->kernel_output_idx];
struct rtpengine_stats *stats_o = &ke->stats_out[sh->kernel_output_idx];
DSo(bytes);
DSo(packets);
DSo(errors);
atomic64_set(&sink->kernel_stats_out.bytes, stats_o->bytes);
atomic64_set(&sink->kernel_stats_out.packets, stats_o->packets);
atomic64_set(&sink->kernel_stats_out.errors, stats_o->errors);
mutex_lock(&sink->out_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
break;
uint32_t out_ssrc = o->ssrc_out[u];
if (!out_ssrc)
out_ssrc = ke->target.ssrc[u];
struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(out_ssrc),
sink->ssrc_out, 0);
if (!ctx)
continue;
if (max_pt != -1)
payload_tracker_add(&ctx->tracker, max_pt);
if (sink->crypto.params.crypto_suite
&& o->encrypt.last_index[u] - ctx->srtp_index > 0x4000)
{
ilog(LOG_DEBUG, "Updating SRTP encryption index from %" PRIu64
" to %" PRIu64,
ctx->srtp_index,
o->encrypt.last_index[u]);
ctx->srtp_index = o->encrypt.last_index[u];
update = true;
}
}
mutex_unlock(&sink->out_lock);
}
mutex_lock(&ps->in_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
break;
struct ssrc_ctx *ctx = __hunt_ssrc_ctx(ntohl(ke->target.ssrc[u]),
ps->ssrc_in, 0);
if (!ctx)
continue;
// TODO: add in SSRC stats similar to __stream_update_stats
atomic64_set(&ctx->last_seq, ke->target.decrypt.last_index[u]);
if (max_pt != -1)
payload_tracker_add(&ctx->tracker, max_pt);
if (sfd->crypto.params.crypto_suite
&& ke->target.decrypt.last_index[u]
- ctx->srtp_index > 0x4000) {
ilog(LOG_DEBUG, "Updating SRTP decryption index from %" PRIu64
" to %" PRIu64,
ctx->srtp_index,
ke->target.decrypt.last_index[u]);
ctx->srtp_index = ke->target.decrypt.last_index[u];
update = true;
}
}
mutex_unlock(&ps->in_lock);
}
rwlock_unlock_r(&sfd->call->master_lock);
if (update)
redis_update_onekey(ps->call, rtpe_redis_write);
next:
g_slice_free1(sizeof(*ke), ke);
kl = g_list_delete_link(kl, kl);
log_info_pop();
}
}
void kernel_stats_updater_iterator(void * dummy) {
while (!rtpe_shutdown) {
kernel_stats_updater();
thread_cancel_enable();
usleep(1000000); /* sleep for 1 second in each iteration */
thread_cancel_disable();
}
}

@@ -334,6 +334,8 @@ const struct transport_protocol *transport_protocol(const str *s);
//void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered);
void play_buffered(struct jb_packet *cp);
void kernel_stats_updater_iterator(void * dummy);
INLINE int proto_is_rtp(const struct transport_protocol *protocol) {
// known to be RTP? therefore unknown is not RTP
if (!protocol)
