MT#55283 simplify/clarify stats gathering

Perform accumulation of stats only once (i.e. increasing an actual
counter) and report stats based on differences to previous values,
instead of carrying multiple stats counters for each metric and
resetting each counter to zero whenever stats are reported.

`rtpe_stats` is the global master accumulator.

`_intv` variables are intermediate and local storage for values sampled
from `rtpe_stats` at regular intervals.

`_rate` and `_diff` variables hold stats calculated from `rtpe_stats`
and the respective `_intv` variable whenever the sampling and reporting
occurs.

`stats_counters_calc_diff` is used to calculate stats as differences
between `rtpe_stats` and the last sampled `_intv` value.

`stats_counters_calc_rate` does the same but calculates a per-second
rate, based on a microsecond duration.

Eliminate the now-unused struct global_stats_ax.

Change-Id: Ic4ca630161787025219b67e49b41995204d60573
pull/1614/head
Richard Fuchs 2 years ago
parent b5a20bdb91
commit dddaa60afb

@ -69,11 +69,9 @@ struct global_stats_gauge_min_max rtpe_stats_gauge_cumulative;
struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max;
struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_interval;
struct global_stats_ax rtpe_stats;
struct global_stats_counter rtpe_stats_interval;
struct global_stats_counter rtpe_stats_cumulative;
struct global_stats_ax rtpe_stats_graphite;
struct global_stats_counter rtpe_stats_graphite_interval;
struct global_stats_counter rtpe_stats; // total, cumulative, master
static struct global_stats_counter rtpe_stats_intv; // copied out once per timer run
struct global_stats_counter rtpe_stats_rate; // per-second, calculated once per timer run
struct global_stats_min_max rtpe_stats_graphite_min_max;
struct global_stats_min_max rtpe_stats_graphite_min_max_interval;
@ -578,9 +576,9 @@ void call_timer(void *ptr) {
call_timer_iterator(c, &hlp);
ITERATE_CALL_LIST_NEXT_END(c);
stats_counters_ax_calc_avg(&rtpe_stats, run_diff_us, &rtpe_stats_interval);
stats_counters_calc_rate(&rtpe_stats, run_diff_us, &rtpe_stats_intv, &rtpe_stats_rate);
stats_counters_min_max(&rtpe_stats_graphite_min_max, &rtpe_stats.intv);
stats_counters_min_max(&rtpe_stats_graphite_min_max, &rtpe_stats_rate);
// stats derived while iterating calls
RTPE_GAUGE_SET(transcoded_media, hlp.transcoded_media);

@ -463,7 +463,7 @@ void calls_status_tcp(struct streambuf_stream *s) {
rwlock_lock_r(&rtpe_callhash_lock);
streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n",
g_hash_table_size(rtpe_callhash),
atomic64_get(&rtpe_stats.intv.bytes_user) + atomic64_get(&rtpe_stats.intv.bytes_kernel), 0, 0);
atomic64_get(&rtpe_stats_rate.bytes_user) + atomic64_get(&rtpe_stats_rate.bytes_kernel), 0, 0);
rwlock_unlock_r(&rtpe_callhash_lock);
ITERATE_CALL_LIST_START(CALL_ITERATOR_MAIN, c);
@ -1658,8 +1658,8 @@ static enum load_limit_reasons call_offer_session_limit(void) {
}
if (ret == LOAD_LIMIT_NONE && rtpe_config.bw_limit) {
uint64_t bw = atomic64_get(&rtpe_stats.intv.bytes_user) +
atomic64_get(&rtpe_stats.intv.bytes_kernel);
uint64_t bw = atomic64_get(&rtpe_stats_rate.bytes_user) +
atomic64_get(&rtpe_stats_rate.bytes_kernel);
if (bw >= rtpe_config.bw_limit) {
ilog(LOG_WARN, "Bandwidth limit exceeded (%" PRIu64 " > %" PRIu64 ")",
bw, rtpe_config.bw_limit);

@ -450,26 +450,26 @@ static void cli_incoming_params_revert(str *instr, struct cli_writer *cw) {
static void cli_incoming_list_counters(str *instr, struct cli_writer *cw) {
cw->cw_printf(cw, "\nCurrent per-second counters:\n\n");
cw->cw_printf(cw, " Packets per second (userspace) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.packets_user));
atomic64_get(&rtpe_stats_rate.packets_user));
cw->cw_printf(cw, " Bytes per second (userspace) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.bytes_user));
atomic64_get(&rtpe_stats_rate.bytes_user));
cw->cw_printf(cw, " Errors per second (userspace) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.errors_user));
atomic64_get(&rtpe_stats_rate.errors_user));
cw->cw_printf(cw, " Packets per second (kernel) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.packets_kernel));
atomic64_get(&rtpe_stats_rate.packets_kernel));
cw->cw_printf(cw, " Bytes per second (kernel) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.bytes_kernel));
atomic64_get(&rtpe_stats_rate.bytes_kernel));
cw->cw_printf(cw, " Errors per second (kernel) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.errors_kernel));
atomic64_get(&rtpe_stats_rate.errors_kernel));
cw->cw_printf(cw, " Packets per second (total) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.packets_user) +
atomic64_get(&rtpe_stats.intv.packets_kernel));
atomic64_get(&rtpe_stats_rate.packets_user) +
atomic64_get(&rtpe_stats_rate.packets_kernel));
cw->cw_printf(cw, " Bytes per second (total) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.bytes_user) +
atomic64_get(&rtpe_stats.intv.bytes_kernel));
atomic64_get(&rtpe_stats_rate.bytes_user) +
atomic64_get(&rtpe_stats_rate.bytes_kernel));
cw->cw_printf(cw, " Errors per second (total) :%" PRIu64 "\n",
atomic64_get(&rtpe_stats.intv.errors_user) +
atomic64_get(&rtpe_stats.intv.errors_kernel));
atomic64_get(&rtpe_stats_rate.errors_user) +
atomic64_get(&rtpe_stats_rate.errors_kernel));
}
static void cli_incoming_list_totals(str *instr, struct cli_writer *cw) {

@ -33,6 +33,9 @@ static time_t next_run;
static char* graphite_prefix = NULL;
static struct timeval graphite_interval_tv;
struct global_stats_counter rtpe_stats_graphite_diff; // per-interval increases
static struct global_stats_counter rtpe_stats_graphite_intv; // copied out when graphite stats run
void set_graphite_interval_tv(struct timeval *tv) {
graphite_interval_tv = *tv;
}
@ -78,8 +81,7 @@ static int connect_to_graphite_server(const endpoint_t *graphite_ep) {
GString *print_graphite_data(void) {
long long time_diff_us = timeval_diff(&rtpe_now, &rtpe_latest_graphite_interval_start);
stats_counters_ax_calc_avg(&rtpe_stats_graphite, time_diff_us, &rtpe_stats_graphite_interval);
stats_counters_calc_diff(&rtpe_stats, &rtpe_stats_graphite_intv, &rtpe_stats_graphite_diff);
stats_counters_min_max_reset(&rtpe_stats_graphite_min_max, &rtpe_stats_graphite_min_max_interval);
stats_gauge_calc_avg_reset(&rtpe_stats_gauge_graphite_min_max_interval, &rtpe_stats_gauge_graphite_min_max);
@ -108,19 +110,19 @@ GString *print_graphite_data(void) {
(double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.max.ng_command_times[i]) / 1000000.0,
(double) atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.avg.ng_command_times[i]) / 1000000.0);
GPF("%s_count %" PRIu64, ng_command_strings[i], atomic64_get(&rtpe_stats_cumulative.ng_commands[i]));
GPF("%s_count %" PRIu64, ng_command_strings[i], atomic64_get(&rtpe_stats.ng_commands[i]));
}
GPF("call_dur %.6f", (double) atomic64_get_na(&rtpe_stats_graphite_interval.total_calls_duration_intv) / 1000000.0);
GPF("call_dur %.6f", (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0);
struct timeval avg_duration;
uint64_t managed_sess = atomic64_get_na(&rtpe_stats_graphite_interval.managed_sess);
uint64_t managed_sess = atomic64_get_na(&rtpe_stats_graphite_diff.managed_sess);
if (managed_sess)
timeval_from_us(&avg_duration, atomic64_get_na(&rtpe_stats_graphite_interval.call_duration) / managed_sess);
timeval_from_us(&avg_duration, atomic64_get_na(&rtpe_stats_graphite_diff.call_duration) / managed_sess);
else
avg_duration = (struct timeval) {0,0};
GPF("average_call_dur %llu.%06llu",(unsigned long long)avg_duration.tv_sec,(unsigned long long)avg_duration.tv_usec);
GPF("forced_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.forced_term_sess));
GPF("managed_sess "UINT64F, atomic64_get(&rtpe_stats.ax.managed_sess));
GPF("forced_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.forced_term_sess));
GPF("managed_sess "UINT64F, atomic64_get(&rtpe_stats.managed_sess));
GPF("managed_sess_min "UINT64F, atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions));
GPF("managed_sess_max "UINT64F, atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions));
GPF("current_sessions_total "UINT64F, atomic64_get(&rtpe_stats_gauge.total_sessions));
@ -130,26 +132,26 @@ GString *print_graphite_data(void) {
GPF("current_sessions_ipv4 "UINT64F, atomic64_get(&rtpe_stats_gauge.ipv4_sessions));
GPF("current_sessions_ipv6 "UINT64F, atomic64_get(&rtpe_stats_gauge.ipv6_sessions));
GPF("current_sessions_mixed "UINT64F, atomic64_get(&rtpe_stats_gauge.mixed_sessions));
GPF("nopacket_relayed_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.nopacket_relayed_sess));
GPF("oneway_stream_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.oneway_stream_sess));
GPF("regular_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.regular_term_sess));
GPF("relayed_errors_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.errors_user));
GPF("relayed_packets_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.packets_user));
GPF("relayed_bytes_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.bytes_user));
GPF("relayed_errors_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.errors_kernel));
GPF("relayed_packets_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.packets_kernel));
GPF("relayed_bytes_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.bytes_kernel));
GPF("relayed_errors "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.errors_user) +
atomic64_get_na(&rtpe_stats_graphite_interval.errors_kernel));
GPF("relayed_packets "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.packets_user) +
atomic64_get_na(&rtpe_stats_graphite_interval.packets_kernel));
GPF("relayed_bytes "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.bytes_user) +
atomic64_get_na(&rtpe_stats_graphite_interval.bytes_kernel));
GPF("silent_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.silent_timeout_sess));
GPF("final_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.final_timeout_sess));
GPF("offer_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.offer_timeout_sess));
GPF("timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.timeout_sess));
GPF("reject_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_interval.rejected_sess));
GPF("nopacket_relayed_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.nopacket_relayed_sess));
GPF("oneway_stream_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.oneway_stream_sess));
GPF("regular_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.regular_term_sess));
GPF("relayed_errors_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.errors_user));
GPF("relayed_packets_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.packets_user));
GPF("relayed_bytes_user "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_user));
GPF("relayed_errors_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.errors_kernel));
GPF("relayed_packets_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.packets_kernel));
GPF("relayed_bytes_kernel "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_kernel));
GPF("relayed_errors "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.errors_user) +
atomic64_get_na(&rtpe_stats_graphite_diff.errors_kernel));
GPF("relayed_packets "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.packets_user) +
atomic64_get_na(&rtpe_stats_graphite_diff.packets_kernel));
GPF("relayed_bytes "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_user) +
atomic64_get_na(&rtpe_stats_graphite_diff.bytes_kernel));
GPF("silent_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.silent_timeout_sess));
GPF("final_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.final_timeout_sess));
GPF("offer_timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.offer_timeout_sess));
GPF("timeout_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.timeout_sess));
GPF("reject_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.rejected_sess));
for (GList *l = all_local_interfaces.head; l; l = l->next) {
struct local_intf *lif = l->data;
@ -193,7 +195,7 @@ GString *print_graphite_data(void) {
ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%.6f at time %llu\n",
(unsigned long long) atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions),
(unsigned long long) atomic64_get_na(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions),
(double) atomic64_get_na(&rtpe_stats_graphite_interval.total_calls_duration_intv) / 1000000.0,
(double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0,
(unsigned long long ) rtpe_now.tv_sec);
return graph_str;

@ -324,26 +324,26 @@ GQueue *statistics_gather_metrics(void) {
PROM("transcoded_media", "gauge");
METRIC("packetrate_user", "Packets per second (userspace)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.packets_user));
atomic64_get(&rtpe_stats_rate.packets_user));
METRIC("byterate_user", "Bytes per second (userspace)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.bytes_user));
atomic64_get(&rtpe_stats_rate.bytes_user));
METRIC("errorrate_user", "Errors per second (userspace)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.errors_user));
atomic64_get(&rtpe_stats_rate.errors_user));
METRIC("packetrate_kernel", "Packets per second (kernel)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.packets_kernel));
atomic64_get(&rtpe_stats_rate.packets_kernel));
METRIC("byterate_kernel", "Bytes per second (kernel)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.bytes_kernel));
atomic64_get(&rtpe_stats_rate.bytes_kernel));
METRIC("errorrate_kernel", "Errors per second (kernel)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.errors_kernel));
atomic64_get(&rtpe_stats_rate.errors_kernel));
METRIC("packetrate", "Packets per second (total)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.packets_user) +
atomic64_get(&rtpe_stats.intv.packets_kernel));
atomic64_get(&rtpe_stats_rate.packets_user) +
atomic64_get(&rtpe_stats_rate.packets_kernel));
METRIC("byterate", "Bytes per second (total)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.bytes_user) +
atomic64_get(&rtpe_stats.intv.bytes_kernel));
atomic64_get(&rtpe_stats_rate.bytes_user) +
atomic64_get(&rtpe_stats_rate.bytes_kernel));
METRIC("errorrate", "Errors per second (total)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats.intv.errors_user) +
atomic64_get(&rtpe_stats.intv.errors_kernel));
atomic64_get(&rtpe_stats_rate.errors_user) +
atomic64_get(&rtpe_stats_rate.errors_kernel));
METRIC("media_userspace", "Userspace-only media streams", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_gauge.userspace_streams));
@ -360,8 +360,8 @@ GQueue *statistics_gather_metrics(void) {
PROM("mediastreams", "gauge");
PROMLAB("type=\"mixed\"");
num_sessions = atomic64_get(&rtpe_stats_cumulative.managed_sess);
uint64_t total_duration = atomic64_get(&rtpe_stats_cumulative.call_duration);
num_sessions = atomic64_get(&rtpe_stats.managed_sess);
uint64_t total_duration = atomic64_get(&rtpe_stats.call_duration);
uint64_t avg_us = num_sessions ? total_duration / num_sessions : 0;
HEADER("}", "");
@ -373,67 +373,67 @@ GQueue *statistics_gather_metrics(void) {
METRIC("managedsessions", "Total managed sessions", UINT64F, UINT64F, num_sessions);
PROM("sessions_total", "counter");
METRIC("rejectedsessions", "Total rejected sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.rejected_sess));
METRIC("rejectedsessions", "Total rejected sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats.rejected_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"rejected\"");
METRIC("timeoutsessions", "Total timed-out sessions via TIMEOUT", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.timeout_sess));
METRIC("timeoutsessions", "Total timed-out sessions via TIMEOUT", UINT64F, UINT64F, atomic64_get(&rtpe_stats.timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"timeout\"");
METRIC("silenttimeoutsessions", "Total timed-out sessions via SILENT_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats_cumulative.silent_timeout_sess));
METRIC("silenttimeoutsessions", "Total timed-out sessions via SILENT_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats.silent_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"silent_timeout\"");
METRIC("finaltimeoutsessions", "Total timed-out sessions via FINAL_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats_cumulative.final_timeout_sess));
METRIC("finaltimeoutsessions", "Total timed-out sessions via FINAL_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats.final_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"final_timeout\"");
METRIC("offertimeoutsessions", "Total timed-out sessions via OFFER_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats_cumulative.offer_timeout_sess));
METRIC("offertimeoutsessions", "Total timed-out sessions via OFFER_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_stats.offer_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"offer_timeout\"");
METRIC("regularterminatedsessions", "Total regular terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.regular_term_sess));
METRIC("regularterminatedsessions", "Total regular terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats.regular_term_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"terminated\"");
METRIC("forcedterminatedsessions", "Total forced terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.forced_term_sess));
METRIC("forcedterminatedsessions", "Total forced terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats.forced_term_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"force_terminated\"");
METRIC("relayedpackets_user", "Total relayed packets (userspace)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.packets_user));
atomic64_get(&rtpe_stats.packets_user));
PROM("packets_total", "counter");
PROMLAB("type=\"userspace\"");
METRIC("relayedpacketerrors_user", "Total relayed packet errors (userspace)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.errors_user));
atomic64_get(&rtpe_stats.errors_user));
PROM("packet_errors_total", "counter");
PROMLAB("type=\"userspace\"");
METRIC("relayedbytes_user", "Total relayed bytes (userspace)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.bytes_user));
atomic64_get(&rtpe_stats.bytes_user));
PROM("bytes_total", "counter");
PROMLAB("type=\"userspace\"");
METRIC("relayedpackets_kernel", "Total relayed packets (kernel)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.packets_kernel));
atomic64_get(&rtpe_stats.packets_kernel));
PROM("packets_total", "counter");
PROMLAB("type=\"kernel\"");
METRIC("relayedpacketerrors_kernel", "Total relayed packet errors (kernel)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.errors_kernel));
atomic64_get(&rtpe_stats.errors_kernel));
PROM("packet_errors_total", "counter");
PROMLAB("type=\"kernel\"");
METRIC("relayedbytes_kernel", "Total relayed bytes (kernel)", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.bytes_kernel));
atomic64_get(&rtpe_stats.bytes_kernel));
PROM("bytes_total", "counter");
PROMLAB("type=\"kernel\"");
METRIC("relayedpackets", "Total relayed packets", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.packets_kernel) +
atomic64_get(&rtpe_stats_cumulative.packets_user));
atomic64_get(&rtpe_stats.packets_kernel) +
atomic64_get(&rtpe_stats.packets_user));
METRIC("relayedpacketerrors", "Total relayed packet errors", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.errors_kernel) +
atomic64_get(&rtpe_stats_cumulative.errors_user));
atomic64_get(&rtpe_stats.errors_kernel) +
atomic64_get(&rtpe_stats.errors_user));
METRIC("relayedbytes", "Total relayed bytes", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.bytes_kernel) +
atomic64_get(&rtpe_stats_cumulative.bytes_user));
atomic64_get(&rtpe_stats.bytes_kernel) +
atomic64_get(&rtpe_stats.bytes_user));
METRIC("zerowaystreams", "Total number of streams with no relayed packets", UINT64F, UINT64F, atomic64_get(&rtpe_stats_cumulative.nopacket_relayed_sess));
METRIC("zerowaystreams", "Total number of streams with no relayed packets", UINT64F, UINT64F, atomic64_get(&rtpe_stats.nopacket_relayed_sess));
PROM("zero_packet_streams_total", "counter");
METRIC("onewaystreams", "Total number of 1-way streams", UINT64F, UINT64F,atomic64_get(&rtpe_stats_cumulative.oneway_stream_sess));
METRIC("onewaystreams", "Total number of 1-way streams", UINT64F, UINT64F,atomic64_get(&rtpe_stats.oneway_stream_sess));
PROM("one_way_sessions_total", "counter");
METRICva("avgcallduration", "Average call duration", "%.6f", "%.6f seconds", (double) avg_us / 1000000.0);
PROM("call_duration_avg", "gauge");
@ -441,14 +441,14 @@ GQueue *statistics_gather_metrics(void) {
METRICva("totalcallsduration", "Total calls duration", "%.6f", "%.6f seconds", (double) total_duration / 1000000.0);
PROM("call_duration_total", "counter");
total_duration = atomic64_get(&rtpe_stats_cumulative.call_duration2);
total_duration = atomic64_get(&rtpe_stats.call_duration2);
METRICva("totalcallsduration2", "Total calls duration squared", "%.6f", "%.6f seconds squared", (double) total_duration / 1000000.0);
PROM("call_duration2_total", "counter");
double variance = num_sessions ? fabs((double) total_duration / (double) num_sessions - ((double) avg_us / 1000.0) * ((double) avg_us / 1000.0)) : 0.0;
METRICva("totalcallsduration_stddev", "Total calls duration standard deviation", "%.6f", "%.6f seconds", sqrt(variance) / 1000.0);
calls_dur_iv = (double) atomic64_get_na(&rtpe_stats_graphite_interval.total_calls_duration_intv) / 1000000.0;
calls_dur_iv = (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0;
min_sess_iv = atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.min.total_sessions);
max_sess_iv = atomic64_get(&rtpe_stats_gauge_graphite_min_max_interval.max.total_sessions);
@ -530,19 +530,19 @@ GQueue *statistics_gather_metrics(void) {
STAT_GET_PRINT(packetloss, "packet loss", 1.0);
STAT_GET_PRINT(jitter_measured, "jitter (measured)", 1.0);
METRIC("packets_lost", "Packets lost", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.packets_lost));
atomic64_get(&rtpe_stats.packets_lost));
PROM("packets_lost", "counter");
METRIC("rtp_duplicates", "Duplicate RTP packets", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.rtp_duplicates));
atomic64_get(&rtpe_stats.rtp_duplicates));
PROM("rtp_duplicates", "counter");
METRIC("rtp_skips", "RTP sequence skips", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.rtp_skips));
atomic64_get(&rtpe_stats.rtp_skips));
PROM("rtp_skips", "counter");
METRIC("rtp_seq_resets", "RTP sequence resets", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.rtp_seq_resets));
atomic64_get(&rtpe_stats.rtp_seq_resets));
PROM("rtp_seq_resets", "counter");
METRIC("rtp_reordered", "Out-of-order RTP packets", UINT64F, UINT64F,
atomic64_get(&rtpe_stats_cumulative.rtp_reordered));
atomic64_get(&rtpe_stats.rtp_reordered));
PROM("rtp_reordered", "counter");
HEADER(NULL, "");
HEADER("}", "");

@ -523,6 +523,26 @@ INLINE void atomic64_local_copy_zero(atomic64 *dst, atomic64 *src) {
} while (1); \
} while (0)
// Sample the master accumulator `ax_var`, remember the sampled value in
// `intv_var`, and store into `rate_var` the per-second rate of increase
// since the previous sample, computed over a duration of `run_diff_us`
// microseconds.
// Precondition: run_diff_us > 0 — this helper performs no guard of its
// own; the visible caller (stats_counters_calc_rate) returns early on
// run_diff_us <= 0, avoiding division by zero here.
// NOTE(review): if the counter were ever reset externally, `ax - old_intv`
// would wrap (unsigned), producing a huge bogus rate — assumes counters
// are monotonically increasing.
INLINE void atomic64_calc_rate(const atomic64 *ax_var, long long run_diff_us,
atomic64 *intv_var, atomic64 *rate_var)
{
uint64_t ax = atomic64_get(ax_var);
uint64_t old_intv = atomic64_get(intv_var);
// Record the current total so the next invocation diffs against it.
atomic64_set(intv_var, ax);
// Difference scaled from per-interval to per-second.
atomic64_set(rate_var, (ax - old_intv) * 1000000LL / run_diff_us);
}
// Sample the master accumulator `ax_var`, remember the sampled value in
// `intv_var`, and store into `diff_var` the absolute increase since the
// previous sample (no time scaling — contrast with atomic64_calc_rate).
// Assumes the counter only ever increases; a reset would make the
// unsigned subtraction wrap.
INLINE void atomic64_calc_diff(const atomic64 *ax_var, atomic64 *intv_var, atomic64 *diff_var) {
uint64_t ax = atomic64_get(ax_var);
uint64_t old_intv = atomic64_get(intv_var);
// Record the current total so the next invocation diffs against it.
atomic64_set(intv_var, ax);
atomic64_set(diff_var, ax - old_intv);
}
// Lower `min` to the current value of `inp` if `inp` is smaller
// (atomic variant of min(*min, *inp) taking both operands by pointer).
INLINE void atomic64_mina(atomic64 *min, atomic64 *inp) {
atomic64_min(min, atomic64_get(inp));
}
// Raise `max` to the current value of `inp` if `inp` is larger
// (atomic variant of max(*max, *inp) taking both operands by pointer).
INLINE void atomic64_maxa(atomic64 *max, atomic64 *inp) {
atomic64_max(max, atomic64_get(inp));
}

@ -704,20 +704,13 @@ extern struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_inter
#define RTPE_GAUGE_INC(field) RTPE_GAUGE_ADD(field, 1)
#define RTPE_GAUGE_DEC(field) RTPE_GAUGE_ADD(field, -1)
extern struct global_stats_ax rtpe_stats;
extern struct global_stats_counter rtpe_stats_interval; // accumulators copied out once per interval
extern struct global_stats_counter rtpe_stats_cumulative; // total, cumulative
extern struct global_stats_ax rtpe_stats_graphite;
extern struct global_stats_counter rtpe_stats_graphite_interval; // copied out when graphite stats run
extern struct global_stats_counter rtpe_stats; // total, cumulative, master
extern struct global_stats_counter rtpe_stats_rate; // per-second, calculated once per timer run
extern struct global_stats_counter rtpe_stats_graphite_diff; // per-interval increases
extern struct global_stats_min_max rtpe_stats_graphite_min_max; // running min/max
extern struct global_stats_min_max rtpe_stats_graphite_min_max_interval; // updated once per graphite run
#define RTPE_STATS_ADD(field, num) \
do { \
atomic64_add(&rtpe_stats.ax.field, num); \
atomic64_add(&rtpe_stats_cumulative.field, num); \
atomic64_add(&rtpe_stats_graphite.ax.field, num); \
} while (0)
#define RTPE_STATS_ADD(field, num) atomic64_add(&rtpe_stats.field, num)
#define RTPE_STATS_INC(field) RTPE_STATS_ADD(field, 1)

@ -55,10 +55,6 @@ struct global_stats_counter {
#undef FA
};
struct global_stats_ax {
struct global_stats_counter ax; // running accumulator
struct global_stats_counter intv; // last per-interval values
};
struct global_stats_min_max {
struct global_stats_counter min;
struct global_stats_counter max;
@ -126,37 +122,31 @@ GQueue *statistics_gather_metrics(void);
void statistics_free_metrics(GQueue **);
const char *statistics_ng(bencode_item_t *input, bencode_item_t *output);
INLINE void stats_counters_ax_calc_avg1(atomic64 *ax_var, atomic64 *intv_var, atomic64 *loc_var,
long long run_diff_us)
{
uint64_t tmp = atomic64_get_set(ax_var, 0);
if (loc_var)
atomic64_set(loc_var, tmp);
atomic64_set(intv_var, tmp * 1000000LL / run_diff_us);
}
INLINE void stats_counters_ax_calc_avg(struct global_stats_ax *stats, long long run_diff_us,
struct global_stats_counter *loc)
INLINE void stats_counters_calc_rate(const struct global_stats_counter *stats, long long run_diff_us,
struct global_stats_counter *intv, struct global_stats_counter *rate)
{
if (run_diff_us <= 0)
return;
#define F(x) stats_counters_ax_calc_avg1(&stats->ax.x, &stats->intv.x, loc ? &loc->x : NULL, run_diff_us);
#define F(x) atomic64_calc_rate(&stats->x, run_diff_us, &intv->x, &rate->x);
#define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) }
#include "counter_stats_fields.inc"
#undef F
}
INLINE void stats_counters_min1(atomic64 *min, atomic64 *inp) {
atomic64_min(min, atomic64_get(inp));
}
INLINE void stats_counters_max1(atomic64 *max, atomic64 *inp) {
atomic64_max(max, atomic64_get(inp));
INLINE void stats_counters_calc_diff(const struct global_stats_counter *stats,
struct global_stats_counter *intv, struct global_stats_counter *diff)
{
#define F(x) atomic64_calc_diff(&stats->x, &intv->x, &diff->x);
#define FA(x, n) for (int i = 0; i < n; i++) { F(x[i]) }
#include "counter_stats_fields.inc"
#undef F
}
INLINE void stats_counters_min_max(struct global_stats_min_max *mm, struct global_stats_counter *inp) {
#define F(x) \
stats_counters_min1(&mm->min.x, &inp->x); \
stats_counters_max1(&mm->max.x, &inp->x); \
atomic64_mina(&mm->min.x, &inp->x); \
atomic64_maxa(&mm->max.x, &inp->x); \
atomic64_add(&mm->avg.x, atomic64_get(&inp->x));
#include "counter_stats_fields.inc"
#undef F

@ -10,11 +10,9 @@ struct global_stats_gauge rtpe_stats_gauge;
struct global_stats_gauge_min_max rtpe_stats_gauge_cumulative;
struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max;
struct global_stats_gauge_min_max rtpe_stats_gauge_graphite_min_max_interval;
struct global_stats_ax rtpe_stats;
struct global_stats_counter rtpe_stats_interval;
struct global_stats_counter rtpe_stats_cumulative;
struct global_stats_ax rtpe_stats_graphite;
struct global_stats_counter rtpe_stats_graphite_interval;
struct global_stats_counter rtpe_stats;
struct global_stats_counter rtpe_stats_rate;
struct global_stats_counter rtpe_stats_graphite_diff;
struct global_stats_min_max rtpe_stats_graphite_min_max;
struct global_stats_min_max rtpe_stats_graphite_min_max_interval;

Loading…
Cancel
Save