MT#55283 use bufferpool for rtpe_stats

Using allocated memory makes it possible to move this to shm

Change-Id: Ie52bb595f824461b295d7fbdaf70127806c8580e
pull/1826/head
Richard Fuchs 1 year ago
parent cdc8407a7c
commit 4cee2c1fa7

@ -92,7 +92,7 @@ GString *print_graphite_data(void) {
long long time_diff_us = timeval_diff(&rtpe_now, &rtpe_latest_graphite_interval_start);
rtpe_latest_graphite_interval_start = rtpe_now;
stats_counters_calc_diff(&rtpe_stats, &rtpe_stats_graphite_intv, &rtpe_stats_graphite_diff);
stats_counters_calc_diff(rtpe_stats, &rtpe_stats_graphite_intv, &rtpe_stats_graphite_diff);
stats_rate_min_max_avg_sample(&rtpe_rate_graphite_min_max, &rtpe_rate_graphite_min_max_avg_sampled,
time_diff_us, &rtpe_stats_graphite_diff);
@ -129,7 +129,7 @@ GString *print_graphite_data(void) {
(double) atomic64_get_na(&rtpe_sampled_graphite_min_max_sampled.max.ng_command_times[i]) / 1000000.0,
(double) atomic64_get_na(&rtpe_sampled_graphite_avg.avg.ng_command_times[i]) / 1000000.0);
GPF("%s_count %" PRIu64, ng_command_strings_esc[i], atomic64_get_na(&rtpe_stats.ng_commands[i]));
GPF("%s_count %" PRIu64, ng_command_strings_esc[i], atomic64_get_na(&rtpe_stats->ng_commands[i]));
}
GPF("call_dur %.6f", (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0);
@ -141,7 +141,7 @@ GString *print_graphite_data(void) {
avg_duration = (struct timeval) {0,0};
GPF("average_call_dur %llu.%06llu",(unsigned long long)avg_duration.tv_sec,(unsigned long long)avg_duration.tv_usec);
GPF("forced_term_sess "UINT64F, atomic64_get_na(&rtpe_stats_graphite_diff.forced_term_sess));
GPF("managed_sess "UINT64F, atomic64_get_na(&rtpe_stats.managed_sess));
GPF("managed_sess "UINT64F, atomic64_get_na(&rtpe_stats->managed_sess));
GPF("managed_sess_min "UINT64F, atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.min.total_sessions));
GPF("managed_sess_max "UINT64F, atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.max.total_sessions));
GPF("current_sessions_total "UINT64F, atomic64_get_na(&rtpe_stats_gauge.total_sessions));

@ -7,6 +7,7 @@
#include "graphite.h"
#include "main.h"
#include "control_ng.h"
#include "bufferpool.h"
struct timeval rtpe_started;
@ -21,7 +22,7 @@ struct global_gauge_min_max rtpe_gauge_min_max; // master lifetime min/max
struct global_stats_sampled rtpe_stats_sampled; // master cumulative values
struct global_sampled_min_max rtpe_sampled_min_max; // master lifetime min/max
struct global_stats_counter rtpe_stats; // total, cumulative, master
struct global_stats_counter *rtpe_stats; // total, cumulative, master
struct global_stats_counter rtpe_stats_rate; // per-second, calculated once per timer run
struct global_stats_counter rtpe_stats_intv; // calculated once per sec by `call_rate_stats_updater()`
@ -374,8 +375,8 @@ stats_metric_q *statistics_gather_metrics(struct interface_sampled_rate_stats *i
PROM("mediastreams", "gauge");
PROMLAB("type=\"mixed\"");
num_sessions = atomic64_get_na(&rtpe_stats.managed_sess);
uint64_t total_duration = atomic64_get_na(&rtpe_stats.call_duration);
num_sessions = atomic64_get_na(&rtpe_stats->managed_sess);
uint64_t total_duration = atomic64_get_na(&rtpe_stats->call_duration);
uint64_t avg_us = num_sessions ? total_duration / num_sessions : 0;
HEADER("}", "");
@ -387,67 +388,67 @@ stats_metric_q *statistics_gather_metrics(struct interface_sampled_rate_stats *i
METRIC("managedsessions", "Total managed sessions", UINT64F, UINT64F, num_sessions);
PROM("sessions_total", "counter");
METRIC("rejectedsessions", "Total rejected sessions", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats.rejected_sess));
METRIC("rejectedsessions", "Total rejected sessions", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats->rejected_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"rejected\"");
METRIC("timeoutsessions", "Total timed-out sessions via TIMEOUT", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats.timeout_sess));
METRIC("timeoutsessions", "Total timed-out sessions via TIMEOUT", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats->timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"timeout\"");
METRIC("silenttimeoutsessions", "Total timed-out sessions via SILENT_TIMEOUT", UINT64F, UINT64F,atomic64_get_na(&rtpe_stats.silent_timeout_sess));
METRIC("silenttimeoutsessions", "Total timed-out sessions via SILENT_TIMEOUT", UINT64F, UINT64F,atomic64_get_na(&rtpe_stats->silent_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"silent_timeout\"");
METRIC("finaltimeoutsessions", "Total timed-out sessions via FINAL_TIMEOUT", UINT64F, UINT64F,atomic64_get_na(&rtpe_stats.final_timeout_sess));
METRIC("finaltimeoutsessions", "Total timed-out sessions via FINAL_TIMEOUT", UINT64F, UINT64F,atomic64_get_na(&rtpe_stats->final_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"final_timeout\"");
METRIC("offertimeoutsessions", "Total timed-out sessions via OFFER_TIMEOUT", UINT64F, UINT64F,atomic64_get_na(&rtpe_stats.offer_timeout_sess));
METRIC("offertimeoutsessions", "Total timed-out sessions via OFFER_TIMEOUT", UINT64F, UINT64F,atomic64_get_na(&rtpe_stats->offer_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"offer_timeout\"");
METRIC("regularterminatedsessions", "Total regular terminated sessions", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats.regular_term_sess));
METRIC("regularterminatedsessions", "Total regular terminated sessions", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats->regular_term_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"terminated\"");
METRIC("forcedterminatedsessions", "Total forced terminated sessions", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats.forced_term_sess));
METRIC("forcedterminatedsessions", "Total forced terminated sessions", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats->forced_term_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"force_terminated\"");
METRIC("relayedpackets_user", "Total relayed packets (userspace)", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.packets_user));
atomic64_get_na(&rtpe_stats->packets_user));
PROM("packets_total", "counter");
PROMLAB("type=\"userspace\"");
METRIC("relayedpacketerrors_user", "Total relayed packet errors (userspace)", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.errors_user));
atomic64_get_na(&rtpe_stats->errors_user));
PROM("packet_errors_total", "counter");
PROMLAB("type=\"userspace\"");
METRIC("relayedbytes_user", "Total relayed bytes (userspace)", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.bytes_user));
atomic64_get_na(&rtpe_stats->bytes_user));
PROM("bytes_total", "counter");
PROMLAB("type=\"userspace\"");
METRIC("relayedpackets_kernel", "Total relayed packets (kernel)", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.packets_kernel));
atomic64_get_na(&rtpe_stats->packets_kernel));
PROM("packets_total", "counter");
PROMLAB("type=\"kernel\"");
METRIC("relayedpacketerrors_kernel", "Total relayed packet errors (kernel)", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.errors_kernel));
atomic64_get_na(&rtpe_stats->errors_kernel));
PROM("packet_errors_total", "counter");
PROMLAB("type=\"kernel\"");
METRIC("relayedbytes_kernel", "Total relayed bytes (kernel)", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.bytes_kernel));
atomic64_get_na(&rtpe_stats->bytes_kernel));
PROM("bytes_total", "counter");
PROMLAB("type=\"kernel\"");
METRIC("relayedpackets", "Total relayed packets", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.packets_kernel) +
atomic64_get_na(&rtpe_stats.packets_user));
atomic64_get_na(&rtpe_stats->packets_kernel) +
atomic64_get_na(&rtpe_stats->packets_user));
METRIC("relayedpacketerrors", "Total relayed packet errors", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.errors_kernel) +
atomic64_get_na(&rtpe_stats.errors_user));
atomic64_get_na(&rtpe_stats->errors_kernel) +
atomic64_get_na(&rtpe_stats->errors_user));
METRIC("relayedbytes", "Total relayed bytes", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.bytes_kernel) +
atomic64_get_na(&rtpe_stats.bytes_user));
atomic64_get_na(&rtpe_stats->bytes_kernel) +
atomic64_get_na(&rtpe_stats->bytes_user));
METRIC("zerowaystreams", "Total number of streams with no relayed packets", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats.nopacket_relayed_sess));
METRIC("zerowaystreams", "Total number of streams with no relayed packets", UINT64F, UINT64F, atomic64_get_na(&rtpe_stats->nopacket_relayed_sess));
PROM("zero_packet_streams_total", "counter");
METRIC("onewaystreams", "Total number of 1-way streams", UINT64F, UINT64F,atomic64_get_na(&rtpe_stats.oneway_stream_sess));
METRIC("onewaystreams", "Total number of 1-way streams", UINT64F, UINT64F,atomic64_get_na(&rtpe_stats->oneway_stream_sess));
PROM("one_way_sessions_total", "counter");
METRICva("avgcallduration", "Average call duration", "%.6f", "%.6f seconds", (double) avg_us / 1000000.0);
PROM("call_duration_avg", "gauge");
@ -455,7 +456,7 @@ stats_metric_q *statistics_gather_metrics(struct interface_sampled_rate_stats *i
METRICva("totalcallsduration", "Total calls duration", "%.6f", "%.6f seconds", (double) total_duration / 1000000.0);
PROM("call_duration_total", "counter");
total_duration = atomic64_get_na(&rtpe_stats.call_duration2);
total_duration = atomic64_get_na(&rtpe_stats->call_duration2);
METRICva("totalcallsduration2", "Total calls duration squared", "%.6f", "%.6f seconds squared", (double) total_duration / 1000000.0);
PROM("call_duration2_total", "counter");
@ -549,19 +550,19 @@ stats_metric_q *statistics_gather_metrics(struct interface_sampled_rate_stats *i
STAT_GET_PRINT(packetloss, "packet loss", 1.0);
STAT_GET_PRINT(jitter_measured, "jitter (measured)", 1.0);
METRIC("packets_lost", "Packets lost", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.packets_lost));
atomic64_get_na(&rtpe_stats->packets_lost));
PROM("packets_lost", "counter");
METRIC("rtp_duplicates", "Duplicate RTP packets", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.rtp_duplicates));
atomic64_get_na(&rtpe_stats->rtp_duplicates));
PROM("rtp_duplicates", "counter");
METRIC("rtp_skips", "RTP sequence skips", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.rtp_skips));
atomic64_get_na(&rtpe_stats->rtp_skips));
PROM("rtp_skips", "counter");
METRIC("rtp_seq_resets", "RTP sequence resets", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.rtp_seq_resets));
atomic64_get_na(&rtpe_stats->rtp_seq_resets));
PROM("rtp_seq_resets", "counter");
METRIC("rtp_reordered", "Out-of-order RTP packets", UINT64F, UINT64F,
atomic64_get_na(&rtpe_stats.rtp_reordered));
atomic64_get_na(&rtpe_stats->rtp_reordered));
PROM("rtp_reordered", "counter");
HEADER(NULL, "");
HEADER("}", "");
@ -931,6 +932,8 @@ void statistics_free_metrics(stats_metric_q *q) {
void statistics_free(void) {
mutex_destroy(&rtpe_codec_stats_lock);
t_hash_table_destroy(rtpe_codec_stats);
bufferpool_unref(rtpe_stats);
rtpe_stats = NULL;
}
static void codec_stats_free(struct codec_stats *stats_entry) {
@ -943,8 +946,7 @@ TYPED_GHASHTABLE_IMPL(codec_stats_ht, g_str_hash, g_str_equal, NULL, codec_stats
void statistics_init(void) {
gettimeofday(&rtpe_started, NULL);
//rtpe_totalstats_interval.managed_sess_min = 0; // already zeroed
//rtpe_totalstats_interval.managed_sess_max = 0;
rtpe_stats = bufferpool_alloc0(shm_bufferpool, sizeof(*rtpe_stats));
mutex_init(&rtpe_codec_stats_lock);
rtpe_codec_stats = codec_stats_ht_new();
@ -1028,7 +1030,7 @@ enum thread_looper_action call_rate_stats_updater(void) {
if (last_run.tv_sec) { /* `stats_counters_calc_rate()` shouldn't be called on the very first cycle */
long long run_diff_us = timeval_diff(&rtpe_now, &last_run);
stats_counters_calc_rate(&rtpe_stats, run_diff_us, &rtpe_stats_intv, &rtpe_stats_rate);
stats_counters_calc_rate(rtpe_stats, run_diff_us, &rtpe_stats_intv, &rtpe_stats_rate);
}
last_run = rtpe_now;

@ -164,11 +164,11 @@ extern struct global_sampled_min_max rtpe_sampled_min_max; // master lifetime m
} \
} while (0)
extern struct global_stats_counter rtpe_stats; // total, cumulative, master
extern struct global_stats_counter *rtpe_stats; // total, cumulative, master
extern struct global_stats_counter rtpe_stats_rate; // per-second, calculated once per timer run
extern struct global_stats_counter rtpe_stats_intv; // per-second, calculated once per timer run
#define RTPE_STATS_ADD(field, num) atomic64_add_na(&rtpe_stats.field, num)
#define RTPE_STATS_ADD(field, num) atomic64_add_na(&rtpe_stats->field, num)
#define RTPE_STATS_INC(field) RTPE_STATS_ADD(field, 1)

@ -8,7 +8,7 @@
struct rtpengine_config rtpe_config;
struct global_stats_gauge rtpe_stats_gauge;
struct global_gauge_min_max rtpe_gauge_min_max;
struct global_stats_counter rtpe_stats;
struct global_stats_counter *rtpe_stats;
struct global_stats_counter rtpe_stats_rate;
struct global_stats_counter rtpe_stats_intv;
struct global_stats_sampled rtpe_stats_sampled;

@ -8,7 +8,7 @@
struct rtpengine_config rtpe_config;
struct global_stats_gauge rtpe_stats_gauge;
struct global_gauge_min_max rtpe_gauge_min_max;
struct global_stats_counter rtpe_stats;
struct global_stats_counter *rtpe_stats;
struct global_stats_counter rtpe_stats_rate;
struct global_stats_counter rtpe_stats_intv;
struct global_stats_sampled rtpe_stats_sampled;

@ -7,6 +7,7 @@
#include "call_interfaces.h"
#include "ssllib.h"
#include "ice.h"
#include "bufferpool.h"
int _log_facility_rtcp;
int _log_facility_cdr;
@ -64,6 +65,8 @@ static void __assert_metrics_eq(stats_metric_q *q, const char *b, unsigned int l
int main(void) {
rtpe_common_config_ptr = &rtpe_config.common;
bufferpool_init();
shm_bufferpool = bufferpool_new(g_malloc, g_free, 4096);
endpoint_parse_any(&rtpe_config.graphite_ep, "1.2.3.4:4567");
@ -3234,7 +3237,7 @@ int main(void) {
// test cmd_ps_min/max/avg
call_timer();
stats_counters_calc_rate(&rtpe_stats, 150000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_counters_calc_rate(rtpe_stats, 150000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate);
ice_slow_timer();
@ -3243,7 +3246,7 @@ int main(void) {
RTPE_STATS_ADD(ng_commands[NGC_OFFER], 20);
call_timer();
stats_counters_calc_rate(&rtpe_stats, 2000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_counters_calc_rate(rtpe_stats, 2000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate);
ice_slow_timer();
@ -3252,7 +3255,7 @@ int main(void) {
RTPE_STATS_ADD(ng_commands[NGC_OFFER], 200);
call_timer();
stats_counters_calc_rate(&rtpe_stats, 5000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_counters_calc_rate(rtpe_stats, 5000000, &rtpe_stats_intv, &rtpe_stats_rate);
stats_rate_min_max(&rtpe_rate_graphite_min_max, &rtpe_stats_rate);
ice_slow_timer();
@ -7478,6 +7481,8 @@ int main(void) {
control_ng_cleanup();
dtls_cert_free();
ice_free();
bufferpool_destroy(shm_bufferpool);
bufferpool_cleanup();
return 0;
}

@ -417,6 +417,7 @@ int main(void) {
rtpe_common_config_ptr = &rtpe_config.common;
bufferpool_init();
media_bufferpool = bufferpool_new(g_malloc, g_free, 4096);
shm_bufferpool = bufferpool_new(g_malloc, g_free, 4096);
unsigned long random_seed = 0;
@ -1722,7 +1723,9 @@ int main(void) {
expect(B, "8/PCMA/8000");
end();
statistics_free();
bufferpool_destroy(media_bufferpool);
bufferpool_destroy(shm_bufferpool);
bufferpool_cleanup();
return 0;

Loading…
Cancel
Save