From b956303748ab5c8a4bf91a79ae7582f63f11adf4 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 3 Feb 2023 12:27:00 -0500 Subject: [PATCH] MT#55283 support reporting per-interval metrics Change-Id: Ia94594f5742dec4a47849a09cca71ab731e3a0e7 --- daemon/cli.c | 4 +- daemon/media_socket.c | 34 +++++++++++ daemon/mqtt.c | 8 ++- daemon/statistics.c | 128 ++++++++++++++++++++++++++++++++++++++++- daemon/websocket.c | 2 +- include/aux.h | 5 +- include/media_socket.h | 33 +++++++++++ include/statistics.h | 2 +- t/test-stats.c | 14 ++--- 9 files changed, 215 insertions(+), 15 deletions(-) diff --git a/daemon/cli.c b/daemon/cli.c index 281ef1d8d..cc4ebba48 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -473,7 +473,7 @@ static void cli_incoming_list_counters(str *instr, struct cli_writer *cw) { } static void cli_incoming_list_totals(str *instr, struct cli_writer *cw) { - AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics()); + AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics(NULL)); for (GList *l = metrics->head; l; l = l->next) { struct stats_metric *m = l->data; @@ -1661,7 +1661,7 @@ static void cli_incoming_list_interfaces(str *instr, struct cli_writer *cw) { } static void cli_incoming_list_jsonstats(str *instr, struct cli_writer *cw) { - AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics()); + AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics(NULL)); for (GList *l = metrics->head; l; l = l->next) { struct stats_metric *m = l->data; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index f589a2b1f..f77a1a040 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -80,6 +80,10 @@ struct late_port_release { socket_t socket; struct intf_spec *spec; }; +struct interface_stats_interval { + struct interface_stats_block stats; + struct timeval last_run; +}; static __thread GQueue ports_to_release = G_QUEUE_INIT; @@ -2978,3 +2982,33 @@ void interfaces_free(void) { local_media_socket_endpoints = NULL; rwlock_destroy(&local_media_socket_endpoints_lock); } + + + +static void interface_stats_block_free(void *p) { + g_slice_free1(sizeof(struct interface_stats_interval), p); +} +void interface_sampled_rate_stats_init(struct interface_sampled_rate_stats *s) { + s->ht = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, + interface_stats_block_free); +} +void interface_sampled_rate_stats_destroy(struct interface_sampled_rate_stats *s) { + g_hash_table_destroy(s->ht); +} +struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_sampled_rate_stats *s, + struct local_intf *lif, long long *time_diff_us) +{ + if (!s) + return NULL; + struct interface_stats_interval *ret = g_hash_table_lookup(s->ht, lif); + if (!ret) { + ret = g_slice_alloc0(sizeof(*ret)); + g_hash_table_insert(s->ht, lif, ret); + } + if (ret->last_run.tv_sec) + *time_diff_us = timeval_diff(&rtpe_now, &ret->last_run); + else + *time_diff_us = 0; + ret->last_run = rtpe_now; + return &ret->stats; +} diff --git a/daemon/mqtt.c b/daemon/mqtt.c index f706a166e..8fe218280 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -21,11 +21,16 @@ static struct mosquitto *mosq; static bool is_connected = false; +static struct interface_sampled_rate_stats interface_rate_stats; + static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media); + int mqtt_init(void) { + interface_sampled_rate_stats_init(&interface_rate_stats); + mosq = mosquitto_new(rtpe_config.mqtt_id, true, NULL); if (!mosq) { ilog(LOG_ERR, "Failed to create mosquitto client instance: %s", strerror(errno)); @@ -460,7 +465,8 @@ static void mqtt_full_call(struct call *call, JsonBuilder *json) { static void mqtt_global_stats(JsonBuilder *json) { - AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics()); + AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, + statistics_gather_metrics(&interface_rate_stats)); for (GList *l = metrics->head; l; l = l->next) { struct stats_metric *m = l->data; diff --git a/daemon/statistics.c b/daemon/statistics.c index c27b2ca24..2a34496cf 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -310,7 +310,7 @@ static void add_header(GQueue *ret, const char *fmt1, const char *fmt2, ...) { #define HEADERl(fmt2, ...) add_header(ret, NULL, fmt2, ##__VA_ARGS__) -GQueue *statistics_gather_metrics(void) { +GQueue *statistics_gather_metrics(struct interface_sampled_rate_stats *interface_rate_stats) { GQueue *ret = g_queue_new(); double calls_dur_iv; @@ -688,6 +688,39 @@ GQueue *statistics_gather_metrics(void) { #include "interface_counter_stats_fields.inc" #undef F + // expected to be single thread only, so no locking + long long time_diff_us; + struct interface_stats_block *intv_stats + = interface_sampled_rate_stats_get(interface_rate_stats, lif, &time_diff_us); + + if (intv_stats) { + HEADER("interval", NULL); + HEADER("{", NULL); + + struct interface_counter_stats diff; + interface_counter_calc_diff(&lif->stats.s, &intv_stats->s, &diff); + +#define F(f) METRICs(#f, UINT64F, atomic64_get(&diff.f)); +#include "interface_counter_stats_fields.inc" +#undef F + + HEADER("}", NULL); + + if (time_diff_us) { + HEADER("rate", NULL); + HEADER("{", NULL); + + struct interface_counter_stats rate; + interface_counter_calc_rate_from_diff(time_diff_us, &diff, &rate); + +#define F(f) METRICs(#f, UINT64F, atomic64_get(&rate.f)); +#include "interface_counter_stats_fields.inc" +#undef F + + HEADER("}", NULL); + } + } + HEADER("voip_metrics", NULL); HEADER("{", NULL); @@ -718,6 +751,44 @@ GQueue *statistics_gather_metrics(void) { HEADER("}", NULL); + if (intv_stats) { + + HEADER("voip_metrics_interval", NULL); + HEADER("{", NULL); + + struct interface_sampled_stats diff; + interface_sampled_calc_diff(&lif->stats.sampled, &intv_stats->sampled, &diff); + struct interface_sampled_stats_avg avg; + interface_sampled_avg(&avg, &diff); + + METRIC("mos", "Average interval MOS", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.avg.mos) / 10.0); \ + METRIC("mos_stddev", "Standard deviation interval MOS", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.stddev.mos) / 100.0); \ + METRIC("jitter", "Average interval jitter (reported)", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.avg.jitter)); \ + METRIC("jitter_stddev", "Standard deviation interval jitter (reported)", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.stddev.jitter)); \ + METRIC("rtt_e2e", "Average interval end-to-end round-trip time", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.avg.rtt_e2e)); \ + METRIC("rtt_e2e_stddev", "Standard deviation interval end-to-end round-trip time", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.stddev.rtt_e2e)); \ + METRIC("rtt_dsct", "Average interval discrete round-trip time", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.avg.rtt_dsct)); \ + METRIC("rtt_dsct_stddev", "Standard deviation interval discrete round-trip time", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.stddev.rtt_dsct)); \ + METRIC("packetloss", "Average interval packet loss", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.avg.packetloss)); \ + METRIC("packetloss_stddev", "Standard deviation interval packet loss", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.stddev.packetloss)); \ + METRIC("jitter_measured", "Average interval jitter (measured)", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.avg.jitter_measured)); \ + METRIC("jitter_measured_stddev", "Standard deviation interval jitter (measured)", "%.6f", "%.6f", \ + (double) atomic64_get(&avg.stddev.jitter_measured)); \ + + HEADER("}", NULL); + } + HEADER("ingress", NULL); HEADER("{", NULL); #define F(f) \ @@ -740,6 +811,59 @@ GQueue *statistics_gather_metrics(void) { #undef F HEADER("}", NULL); + if (intv_stats) { + HEADER("ingress_interval", NULL); + HEADER("{", NULL); + + struct interface_counter_stats_dir diff_in; + interface_counter_calc_diff_dir(&lif->stats.in, &intv_stats->in, &diff_in); + +#define F(f) METRICs(#f, UINT64F, atomic64_get(&diff_in.f)); +#include "interface_counter_stats_fields_dir.inc" +#undef F + + HEADER("}", NULL); + + HEADER("egress_interval", NULL); + HEADER("{", NULL); + + struct interface_counter_stats_dir diff_out; + interface_counter_calc_diff_dir(&lif->stats.out, &intv_stats->out, &diff_out); + +#define F(f) METRICs(#f, UINT64F, atomic64_get(&diff_out.f)); +#include "interface_counter_stats_fields_dir.inc" +#undef F + + HEADER("}", NULL); + + if (time_diff_us) { + HEADER("ingress_rate", NULL); + HEADER("{", NULL); + + struct interface_counter_stats_dir rate; + interface_counter_calc_rate_from_diff_dir(time_diff_us, &diff_in, + &rate); + +#define F(f) METRICs(#f, UINT64F, atomic64_get(&rate.f)); +#include "interface_counter_stats_fields_dir.inc" +#undef F + + HEADER("}", NULL); + + HEADER("egress_rate", NULL); + HEADER("{", NULL); + + interface_counter_calc_rate_from_diff_dir(time_diff_us, &diff_out, + &rate); + +#define F(f) METRICs(#f, UINT64F, atomic64_get(&rate.f)); +#include "interface_counter_stats_fields_dir.inc" +#undef F + + HEADER("}", NULL); + } + } + HEADER("}", NULL); } HEADER("]", NULL); @@ -824,7 +948,7 @@ void statistics_init() { } const char *statistics_ng(bencode_item_t *input, bencode_item_t *output) { - AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics()); + AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics(NULL)); AUTO_CLEANUP_INIT(GQueue bstack, g_queue_clear, G_QUEUE_INIT); bencode_item_t *dict = output; diff --git a/daemon/websocket.c b/daemon/websocket.c index ebb368cd6..c9510eda4 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -338,7 +338,7 @@ static const char *websocket_http_ping(struct websocket_message *wm) { static const char *websocket_http_metrics(struct websocket_message *wm) { ilogs(http, LOG_DEBUG, "Respoding to GET /metrics"); - AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics()); + AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics(NULL)); AUTO_CLEANUP_INIT(GString *outp, __g_string_free, g_string_new("")); AUTO_CLEANUP_INIT(GHashTable *metric_types, __g_hash_table_destroy, g_hash_table_new(g_str_hash, g_str_equal)); diff --git a/include/aux.h b/include/aux.h index a075fe704..dd3bc5365 100644 --- a/include/aux.h +++ b/include/aux.h @@ -523,13 +523,16 @@ INLINE void atomic64_local_copy_zero(atomic64 *dst, atomic64 *src) { } while (1); \ } while (0) +INLINE void atomic64_calc_rate_from_diff(long long run_diff_us, uint64_t diff, atomic64 *rate_var) { + atomic64_set(rate_var, run_diff_us ? diff * 1000000LL / run_diff_us : 0); +} INLINE void atomic64_calc_rate(const atomic64 *ax_var, long long run_diff_us, atomic64 *intv_var, atomic64 *rate_var) { uint64_t ax = atomic64_get(ax_var); uint64_t old_intv = atomic64_get(intv_var); atomic64_set(intv_var, ax); - atomic64_set(rate_var, run_diff_us ? (ax - old_intv) * 1000000LL / run_diff_us : 0); + atomic64_calc_rate_from_diff(run_diff_us, ax - old_intv, rate_var); } INLINE void atomic64_calc_diff(const atomic64 *ax_var, atomic64 *intv_var, atomic64 *diff_var) { uint64_t ax = atomic64_get(ax_var); diff --git a/include/media_socket.h b/include/media_socket.h index 881a5d3fe..0ba80b7ce 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -135,6 +135,10 @@ struct interface_stats_block { struct interface_counter_stats s; struct interface_sampled_stats sampled; }; +struct interface_sampled_rate_stats { + GHashTable *ht; + struct interface_stats_block intv; +}; INLINE void interface_sampled_calc_diff(const struct interface_sampled_stats *stats, struct interface_sampled_stats *intv, struct interface_sampled_stats *diff) { @@ -148,6 +152,35 @@ INLINE void interface_sampled_avg(struct interface_sampled_stats_avg *loc, #include "interface_sampled_stats_fields.inc" #undef F } +INLINE void interface_counter_calc_diff(const struct interface_counter_stats *stats, + struct interface_counter_stats *intv, struct interface_counter_stats *diff) { +#define F(x) atomic64_calc_diff(&stats->x, &intv->x, &diff->x); +#include "interface_counter_stats_fields.inc" +#undef F +} +INLINE void interface_counter_calc_diff_dir(const struct interface_counter_stats_dir *stats, + struct interface_counter_stats_dir *intv, struct interface_counter_stats_dir *diff) { +#define F(x) atomic64_calc_diff(&stats->x, &intv->x, &diff->x); +#include "interface_counter_stats_fields_dir.inc" +#undef F +} +INLINE void interface_counter_calc_rate_from_diff(long long run_diff_us, + struct interface_counter_stats *diff, struct interface_counter_stats *rate) { +#define F(x) atomic64_calc_rate_from_diff(run_diff_us, atomic64_get(&diff->x), &rate->x); +#include "interface_counter_stats_fields.inc" +#undef F +} +INLINE void interface_counter_calc_rate_from_diff_dir(long long run_diff_us, + struct interface_counter_stats_dir *diff, struct interface_counter_stats_dir *rate) { +#define F(x) atomic64_calc_rate_from_diff(run_diff_us, atomic64_get(&diff->x), &rate->x); +#include "interface_counter_stats_fields_dir.inc" +#undef F +} +void interface_sampled_rate_stats_init(struct interface_sampled_rate_stats *); +void interface_sampled_rate_stats_destroy(struct interface_sampled_rate_stats *); +struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_sampled_rate_stats *s, + struct local_intf *lif, long long *time_diff_us); + struct local_intf { struct intf_spec *spec; struct intf_address advertised_address; diff --git a/include/statistics.h b/include/statistics.h index a02b4ce84..f4e410a79 100644 --- a/include/statistics.h +++ b/include/statistics.h @@ -193,7 +193,7 @@ void statistics_update_ip46_inc_dec(struct call *, int op); void statistics_update_foreignown_dec(struct call *); void statistics_update_foreignown_inc(struct call* c); -GQueue *statistics_gather_metrics(void); +GQueue *statistics_gather_metrics(struct interface_sampled_rate_stats *); void statistics_free_metrics(GQueue **); const char *statistics_ng(bencode_item_t *input, bencode_item_t *output); diff --git a/t/test-stats.c b/t/test-stats.c index 2ee2d0974..0d25c63f6 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -282,7 +282,7 @@ int main(void) { "timeout_sess 0 150\n" "reject_sess 0 150\n"); - GQueue *stats = statistics_gather_metrics(); + GQueue *stats = statistics_gather_metrics(NULL); assert_metrics_eq(stats, "\n" "{\n" @@ -1335,7 +1335,7 @@ int main(void) { "timeout_sess 0 150\n" "reject_sess 0 150\n"); - stats = statistics_gather_metrics(); + stats = statistics_gather_metrics(NULL); assert_metrics_eq(stats, "\n" "{\n" @@ -2385,7 +2385,7 @@ int main(void) { "timeout_sess 0 150\n" "reject_sess 0 150\n"); - stats = statistics_gather_metrics(); + stats = statistics_gather_metrics(NULL); assert_metrics_eq(stats, "\n" "{\n" @@ -3448,7 +3448,7 @@ int main(void) { "timeout_sess 0 157\n" "reject_sess 0 157\n"); - stats = statistics_gather_metrics(); + stats = statistics_gather_metrics(NULL); assert_metrics_eq(stats, "\n" "{\n" @@ -4506,7 +4506,7 @@ int main(void) { "timeout_sess 0 157\n" "reject_sess 0 157\n"); - stats = statistics_gather_metrics(); + stats = statistics_gather_metrics(NULL); assert_metrics_eq(stats, "\n" "{\n" @@ -5559,7 +5559,7 @@ int main(void) { "timeout_sess 0 200\n" "reject_sess 0 200\n"); - stats = statistics_gather_metrics(); + stats = statistics_gather_metrics(NULL); assert_metrics_eq(stats, "\n" "{\n" @@ -6614,7 +6614,7 @@ int main(void) { "timeout_sess 0 200\n" "reject_sess 0 200\n"); - stats = statistics_gather_metrics(); + stats = statistics_gather_metrics(NULL); assert_metrics_eq(stats, "\n" "{\n"