From 5d895fb7beed90de9932a7d901e8da10788e16e8 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 23 Nov 2020 13:11:18 -0500 Subject: [PATCH] TT#82405 add Prometheus stats exporter Change-Id: Ic55b42a707e430a61c0422c65827ed2145511586 --- README.md | 5 + daemon/control_ng.c | 78 +++++++++----- daemon/statistics.c | 237 ++++++++++++++++++++++++++----------------- daemon/websocket.c | 51 ++++++++++ include/control_ng.h | 51 ++++++---- include/statistics.h | 4 + 6 files changed, 290 insertions(+), 136 deletions(-) diff --git a/README.md b/README.md index 2b64a233a..f07fbd2d3 100644 --- a/README.md +++ b/README.md @@ -1958,3 +1958,8 @@ the same format, including the unique cookie. For WebSockets, the subprotocol `ng.rtpengine.com` is used and the protocol follows the same format. Messages must consist of a unique cookie and a string in bencode format, and responses will also be in the same format. + +Prometheus Stats Exporter +------------------------- + +Metrics in the Prometheus text exposition format are served in response to an HTTP GET request for the URI `/metrics`; all metric names carry the prefix `rtpengine_`. 
diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 8acce4c7e..090992a47 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -29,6 +29,18 @@ const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64] = { [LOAD_LIMIT_LOAD] = "Load limit exceeded", [LOAD_LIMIT_BW] = "Bandwidth limit exceeded", }; +const char *ng_command_strings[NGC_COUNT] = { + "ping", "offer", "answer", "delete", "query", "list", "start recording", + "stop recording", "start forwarding", "stop forwarding", "block DTMF", + "unblock DTMF", "block media", "unblock media", "play media", "stop media", + "play DTMF", "statistics", +}; +const char *ng_command_strings_short[NGC_COUNT] = { + "Ping", "Offer", "Answer", "Delete", "Query", "List", "StartRec", + "StopRec", "StartFwd", "StopFwd", "BlkDTMF", + "UnblkDTMF", "BlkMedia", "UnblkMedia", "PlayMedia", "StopMedia", + "PlayDTMF", "Stats", +}; static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) { @@ -110,6 +122,12 @@ struct control_ng_stats* get_control_ng_stats(const sockaddr_t *addr) { cur = g_slice_alloc0(sizeof(struct control_ng_stats)); cur->proxy = *addr; ilog(LOG_DEBUG,"Adding a proxy for control ng stats:%s", sockaddr_print_buf(addr)); + + for (int i = 0; i < NGC_COUNT; i++) { + struct ng_command_stats *c = &cur->cmd[i]; + mutex_init(&c->lock); + } + g_hash_table_insert(rtpe_cngs_hash, &cur->proxy, cur); } mutex_unlock(&rtpe_cngs_lock); @@ -127,6 +145,7 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, struct timeval cmd_start, cmd_stop, cmd_process_time; struct control_ng_stats* cur = get_control_ng_stats(&sin->address); int funcret = -1; + enum ng_command command = -1; str_chr_str(&data, buf, ' '); if (!data.s || data.s == buf->s) { @@ -182,87 +201,84 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, g_string_free(log_str, TRUE); } - // XXX do the strcmp's only once errstr = NULL; resultstr = "ok"; // start command timer 
gettimeofday(&cmd_start, NULL); - int cmdcode = __csh_lookup(&cmd); - - switch (cmdcode) { + switch (__csh_lookup(&cmd)) { case CSH_LOOKUP("ping"): resultstr = "pong"; - g_atomic_int_inc(&cur->ping); + command = NGC_PING; break; case CSH_LOOKUP("offer"): errstr = call_offer_ng(dict, resp, addr, sin); - g_atomic_int_inc(&cur->offer); + command = NGC_OFFER; break; case CSH_LOOKUP("answer"): errstr = call_answer_ng(dict, resp); - g_atomic_int_inc(&cur->answer); + command = NGC_ANSWER; break; case CSH_LOOKUP("delete"): errstr = call_delete_ng(dict, resp); - g_atomic_int_inc(&cur->delete); + command = NGC_DELETE; break; case CSH_LOOKUP("query"): errstr = call_query_ng(dict, resp); - g_atomic_int_inc(&cur->query); + command = NGC_QUERY; break; case CSH_LOOKUP("list"): errstr = call_list_ng(dict, resp); - g_atomic_int_inc(&cur->list); + command = NGC_LIST; break; case CSH_LOOKUP("start recording"): errstr = call_start_recording_ng(dict, resp); - g_atomic_int_inc(&cur->start_recording); + command = NGC_START_RECORDING; break; case CSH_LOOKUP("stop recording"): errstr = call_stop_recording_ng(dict, resp); - g_atomic_int_inc(&cur->stop_recording); + command = NGC_STOP_RECORDING; break; case CSH_LOOKUP("start forwarding"): errstr = call_start_forwarding_ng(dict, resp); - g_atomic_int_inc(&cur->start_forwarding); + command = NGC_START_FORWARDING; break; case CSH_LOOKUP("stop forwarding"): errstr = call_stop_forwarding_ng(dict, resp); - g_atomic_int_inc(&cur->stop_forwarding); + command = NGC_STOP_FORWARDING; break; case CSH_LOOKUP("block DTMF"): errstr = call_block_dtmf_ng(dict, resp); - g_atomic_int_inc(&cur->block_dtmf); + command = NGC_BLOCK_DTMF; break; case CSH_LOOKUP("unblock DTMF"): errstr = call_unblock_dtmf_ng(dict, resp); - g_atomic_int_inc(&cur->unblock_dtmf); + command = NGC_UNBLOCK_DTMF; break; case CSH_LOOKUP("block media"): errstr = call_block_media_ng(dict, resp); - g_atomic_int_inc(&cur->block_media); + command = NGC_BLOCK_MEDIA; break; case 
CSH_LOOKUP("unblock media"): errstr = call_unblock_media_ng(dict, resp); - g_atomic_int_inc(&cur->unblock_media); + command = NGC_UNBLOCK_MEDIA; break; case CSH_LOOKUP("play media"): errstr = call_play_media_ng(dict, resp); - g_atomic_int_inc(&cur->play_media); + command = NGC_PLAY_MEDIA; break; case CSH_LOOKUP("stop media"): errstr = call_stop_media_ng(dict, resp); - g_atomic_int_inc(&cur->stop_media); + command = NGC_STOP_MEDIA; break; case CSH_LOOKUP("play DTMF"): errstr = call_play_dtmf_ng(dict, resp); - g_atomic_int_inc(&cur->play_dtmf); + command = NGC_PLAY_DTMF; break; case CSH_LOOKUP("statistics"): errstr = statistics_ng(dict, resp); - g_atomic_int_inc(&cur->statistics); + command = NGC_STATISTICS; break; default: errstr = "Unrecognized command"; @@ -273,25 +289,35 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, //print command duration timeval_from_us(&cmd_process_time, timeval_diff(&cmd_stop, &cmd_start)); + if (command >= 0 && command < NGC_COUNT) { + mutex_lock(&cur->cmd[command].lock); + cur->cmd[command].count++; + timeval_add(&cur->cmd[command].time, &cur->cmd[command].time, &cmd_process_time); + mutex_unlock(&cur->cmd[command].lock); + } + if (errstr) goto err_send; bencode_dictionary_add_string(resp, "result", resultstr); // update interval statistics - switch (cmdcode) { - case CSH_LOOKUP("offer"): + // XXX could generalise these, same as above + switch (command) { + case NGC_OFFER: atomic64_inc(&rtpe_statsps.offers); timeval_update_request_time(&rtpe_totalstats_interval.offer, &cmd_process_time); break; - case CSH_LOOKUP("answer"): + case NGC_ANSWER: atomic64_inc(&rtpe_statsps.answers); timeval_update_request_time(&rtpe_totalstats_interval.answer, &cmd_process_time); break; - case CSH_LOOKUP("delete"): + case NGC_DELETE: atomic64_inc(&rtpe_statsps.deletes); timeval_update_request_time(&rtpe_totalstats_interval.delete, &cmd_process_time); break; + default: + break; } goto send_resp; diff --git a/daemon/statistics.c 
b/daemon/statistics.c index eb2d6d6f9..53f3efc56 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -40,33 +40,37 @@ static void timeval_totalstats_average_add(struct totalstats *s, const struct ti mutex_unlock(&s->total_average_lock); } -static void timeval_totalstats_interval_call_duration_add(struct totalstats *s, +static void timeval_totalstats_call_duration_add(struct totalstats *s, struct timeval *call_start, struct timeval *call_stop, struct timeval *interval_start, int interval_dur_s) { /* work with graphite interval start val which might be changed elsewhere in the code*/ - struct timeval real_iv_start = *interval_start; + struct timeval real_iv_start = {0,}; struct timeval call_duration; struct timeval *call_start_in_iv = call_start; - /* in case graphite interval needs to be the previous one */ - if (timercmp(&real_iv_start, call_stop, >) && interval_dur_s) { - // round up to nearest while interval_dur_s - long long d = timeval_diff(&real_iv_start, call_stop); - d += (interval_dur_s * 1000000) - 1; - d /= 1000000 * interval_dur_s; - d *= interval_dur_s; - struct timeval graph_dur = { .tv_sec = d, .tv_usec = 0LL }; - timeval_subtract(&real_iv_start, interval_start, &graph_dur); - } + if (interval_start) { + real_iv_start = *interval_start; + + /* in case graphite interval needs to be the previous one */ + if (timercmp(&real_iv_start, call_stop, >) && interval_dur_s) { + // round up to nearest while interval_dur_s + long long d = timeval_diff(&real_iv_start, call_stop); + d += (interval_dur_s * 1000000) - 1; + d /= 1000000 * interval_dur_s; + d *= interval_dur_s; + struct timeval graph_dur = { .tv_sec = d, .tv_usec = 0LL }; + timeval_subtract(&real_iv_start, interval_start, &graph_dur); + } - if (timercmp(&real_iv_start, call_start, >)) - call_start_in_iv = &real_iv_start; + if (timercmp(&real_iv_start, call_start, >)) + call_start_in_iv = &real_iv_start; - /* this should never happen and is here for sanitization of output */ - if 
(timercmp(call_start_in_iv, call_stop, >)) { - ilog(LOG_ERR, "Call start seems to exceed call stop"); - return; + /* this should never happen and is here for sanitization of output */ + if (timercmp(call_start_in_iv, call_stop, >)) { + ilog(LOG_ERR, "Call start seems to exceed call stop"); + return; + } } timeval_subtract(&call_duration, call_stop, call_start_in_iv); @@ -87,6 +91,10 @@ void statistics_update_totals(struct packet_stream *ps) { atomic64_get(&ps->stats.errors)); atomic64_add(&rtpe_totalstats_interval.total_relayed_errors, atomic64_get(&ps->stats.errors)); + atomic64_add(&rtpe_totalstats.total_relayed_bytes, + atomic64_get(&ps->stats.bytes)); + atomic64_add(&rtpe_totalstats_interval.total_relayed_bytes, + atomic64_get(&ps->stats.bytes)); } void statistics_update_foreignown_dec(struct call* c) { @@ -214,10 +222,13 @@ void statistics_update_oneway(struct call* c) { timeval_totalstats_average_add(&rtpe_totalstats, &tim_result_duration); timeval_totalstats_average_add(&rtpe_totalstats_interval, &tim_result_duration); - timeval_totalstats_interval_call_duration_add( + timeval_totalstats_call_duration_add( &rtpe_totalstats_interval, &ml->started, &ml->terminated, &rtpe_latest_graphite_interval_start, rtpe_config.graphite_interval); + timeval_totalstats_call_duration_add( + &rtpe_totalstats, &ml->started, &ml->terminated, + NULL, 0); } if (ml->term_reason==FINAL_TIMEOUT) { @@ -248,6 +259,18 @@ void statistics_update_oneway(struct call* c) { g_queue_push_tail(ret, m); \ } while (0) +#define PROM(name, type) \ + do { \ + struct stats_metric *last = g_queue_peek_tail(ret); \ + last->prom_name = name; \ + last->prom_type = type; \ + } while (0) +#define PROMLAB(fmt, ...) \ + do { \ + struct stats_metric *last = g_queue_peek_tail(ret); \ + last->prom_label = g_strdup_printf(fmt, ## __VA_ARGS__); \ + } while (0) + #define METRICva(lb, dsc, fmt1, fmt2, ...) 
\ do { \ struct stats_metric *m = g_slice_alloc0(sizeof(*m)); \ @@ -378,9 +401,15 @@ GQueue *statistics_gather_metrics(void) { rwlock_unlock_r(&rtpe_callhash_lock); METRIC("sessionsown", "Owned sessions", UINT64F, UINT64F, cur_sessions - atomic64_get(&rtpe_stats.foreign_sessions)); + PROM("sessions", "gauge"); + PROMLAB("type=\"own\""); METRIC("sessionsforeign", "Foreign sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats.foreign_sessions)); + PROM("sessions", "gauge"); + PROMLAB("type=\"foreign\""); + METRIC("sessionstotal", "Total sessions", UINT64F, UINT64F, cur_sessions); METRIC("transcodedmedia", "Transcoded media", UINT64F, UINT64F, atomic64_get(&rtpe_stats.transcoded_media)); + PROM("transcoded_media", "gauge"); METRIC("packetrate", "Packets per second", UINT64F, UINT64F, atomic64_get(&rtpe_stats.packets)); METRIC("byterate", "Bytes per second", UINT64F, UINT64F, atomic64_get(&rtpe_stats.bytes)); @@ -396,19 +425,43 @@ GQueue *statistics_gather_metrics(void) { HEADER("{", ""); METRIC("uptime", "Uptime of rtpengine", "%llu", "%llu seconds", (unsigned long long) time(NULL)-rtpe_totalstats.started); + PROM("uptime_seconds", "gauge"); METRIC("managedsessions", "Total managed sessions", UINT64F, UINT64F, num_sessions); + PROM("sessions_total", "counter"); METRIC("rejectedsessions", "Total rejected sessions", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_rejected_sess)); + PROM("closed_sessions_total", "counter"); + PROMLAB("reason=\"rejected\""); METRIC("timeoutsessions", "Total timed-out sessions via TIMEOUT", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_timeout_sess)); + PROM("closed_sessions_total", "counter"); + PROMLAB("reason=\"timeout\""); METRIC("silenttimeoutsessions", "Total timed-out sessions via SILENT_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_totalstats.total_silent_timeout_sess)); + PROM("closed_sessions_total", "counter"); + PROMLAB("reason=\"silent_timeout\""); METRIC("finaltimeoutsessions", "Total timed-out sessions via 
FINAL_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_totalstats.total_final_timeout_sess)); + PROM("closed_sessions_total", "counter"); + PROMLAB("reason=\"final_timeout\""); METRIC("offertimeoutsessions", "Total timed-out sessions via OFFER_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_totalstats.total_offer_timeout_sess)); + PROM("closed_sessions_total", "counter"); + PROMLAB("reason=\"offer_timeout\""); METRIC("regularterminatedsessions", "Total regular terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_regular_term_sess)); + PROM("closed_sessions_total", "counter"); + PROMLAB("reason=\"terminated\""); METRIC("forcedterminatedsessions", "Total forced terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_forced_term_sess)); + PROM("closed_sessions_total", "counter"); + PROMLAB("reason=\"force_terminated\""); + METRIC("relayedpackets", "Total relayed packets", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_relayed_packets)); + PROM("packets_total", "counter"); METRIC("relayedpacketerrors", "Total relayed packet errors", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_relayed_errors)); + PROM("packet_errors_total", "counter"); + METRIC("relayedbytes", "Total relayed bytes", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_relayed_bytes)); + PROM("bytes_total", "counter"); + METRIC("zerowaystreams", "Total number of streams with no relayed packets", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_nopacket_relayed_sess)); + PROM("zero_packet_streams_total", "counter"); METRIC("onewaystreams", "Total number of 1-way streams", UINT64F, UINT64F,atomic64_get(&rtpe_totalstats.total_oneway_stream_sess)); + PROM("one_way_sessions_total", "counter"); METRICva("avgcallduration", "Average call duration", "%ld.%06ld", "%ld.%06ld", avg.tv_sec, avg.tv_usec); mutex_lock(&rtpe_totalstats_lastinterval_lock); @@ -485,8 +538,12 @@ GQueue *statistics_gather_metrics(void) { HEADER("proxies", NULL); HEADER("[", NULL); 
- HEADERl(" %20s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s ", - "Proxy", "Offer", "Answer", "Delete", "Ping", "List", "Query", "StartRec", "StopRec", "Errors", "BlkDTMF", "UnblkDTMF"); + GString *tmp = g_string_new(""); + g_string_append_printf(tmp, " %20s ", "Proxy"); + for (int i = 0; i < NGC_COUNT; i++) + g_string_append_printf(tmp, "| %10s ", ng_command_strings_short[i]); + HEADERl("%s", tmp->str); + g_string_free(tmp, TRUE); struct control_ng_stats total = {0,}; @@ -498,60 +555,47 @@ GQueue *statistics_gather_metrics(void) { } for (GList *l = list; l; l = l->next) { struct control_ng_stats* cur = l->data; - METRICl("", " %20s | %10u | %10u | %10u | %10u | %10u | %10u | %10u | %10u | %10u | %10u | %10u", - sockaddr_print_buf(&cur->proxy), - cur->offer, - cur->answer, - cur->delete, - cur->ping, - cur->list, - cur->query, - cur->start_recording, - cur->stop_recording, - cur->errors, - cur->block_dtmf, - cur->unblock_dtmf); - - total.ping += cur->ping; - total.offer += cur->offer; - total.answer += cur->answer; - total.delete += cur->delete; - total.query += cur->query; - total.list += cur->list; - total.start_recording += cur->start_recording; - total.stop_recording += cur->stop_recording; - total.start_forwarding += cur->start_forwarding; - total.stop_forwarding += cur->stop_forwarding; - total.block_dtmf += cur->block_dtmf; - total.unblock_dtmf += cur->unblock_dtmf; - total.block_media += cur->block_media; - total.unblock_media += cur->unblock_media; - total.play_media += cur->play_media; - total.stop_media += cur->stop_media; - total.play_dtmf += cur->play_dtmf; - total.statistics += cur->statistics; - total.errors += cur->errors; + HEADER("{", NULL); + + GString *tmp = g_string_new(""); METRICsva("proxy", "\"%s\"", sockaddr_print_buf(&cur->proxy)); - METRICs("pingcount", "%u", cur->ping); - METRICs("offercount", "%u", cur->offer); - METRICs("answercount", "%u", cur->answer); - METRICs("deletecount", "%u", cur->delete); - 
METRICs("querycount", "%u", cur->query); - METRICs("listcount", "%u", cur->list); - METRICs("startreccount", "%u", cur->start_recording); - METRICs("stopreccount", "%u", cur->stop_recording); - METRICs("startfwdcount", "%u", cur->start_forwarding); - METRICs("stopfwdcount", "%u", cur->stop_forwarding); - METRICs("blkdtmfcount", "%u", cur->block_dtmf); - METRICs("unblkdtmfcount", "%u", cur->unblock_dtmf); - METRICs("blkmedia", "%u", cur->block_media); - METRICs("unblkmedia", "%u", cur->unblock_media); - METRICs("playmedia", "%u", cur->play_media); - METRICs("stopmedia", "%u", cur->stop_media); - METRICs("playdtmf", "%u", cur->play_dtmf); - METRICs("statistics", "%u", cur->statistics); - METRICs("errorcount", "%u", cur->errors); + g_string_append_printf(tmp, " %20s ", sockaddr_print_buf(&cur->proxy)); + for (int i = 0; i < NGC_COUNT; i++) { + mutex_lock(&cur->cmd[i].lock); + + g_string_append_printf(tmp, "| %10u ", cur->cmd[i].count); + total.cmd[i].count += cur->cmd[i].count; + + char *mn = g_strdup_printf("%scount", ng_command_strings_short[i]); + char *lw = g_ascii_strdown(mn, -1); + METRICs(lw, "%u", cur->cmd[i].count); + PROM("requests_total", "counter"); + PROMLAB("proxy=\"%s\",request=\"%s\"", sockaddr_print_buf(&cur->proxy), + ng_command_strings[i]); + free(mn); + free(lw); + + mn = g_strdup_printf("%sduration", ng_command_strings_short[i]); + lw = g_ascii_strdown(mn, -1); + METRICsva(lw, "%llu.%06llu", (unsigned long long) cur->cmd[i].time.tv_sec, + (unsigned long long) cur->cmd[i].time.tv_usec); + PROM("request_seconds_total", "counter"); + PROMLAB("proxy=\"%s\",request=\"%s\"", sockaddr_print_buf(&cur->proxy), + ng_command_strings[i]); + free(mn); + free(lw); + + mutex_unlock(&cur->cmd[i].lock); + } + METRICl("", "%s", tmp->str); + g_string_free(tmp, TRUE); + + int errors = g_atomic_int_get(&cur->errors); + total.errors += errors; + METRICs("errorcount", "%i", errors); + PROM("errors_total", "counter"); + PROMLAB("proxy=\"%s\"", 
sockaddr_print_buf(&cur->proxy)); HEADER("}", NULL); } @@ -560,25 +604,13 @@ GQueue *statistics_gather_metrics(void) { HEADER("]", ""); - METRICs("totalpingcount", "%u", total.ping); - METRICs("totaloffercount", "%u", total.offer); - METRICs("totalanswercount", "%u", total.answer); - METRICs("totaldeletecount", "%u", total.delete); - METRICs("totalquerycount", "%u", total.query); - METRICs("totallistcount", "%u", total.list); - METRICs("totalstartreccount", "%u", total.start_recording); - METRICs("totalstopreccount", "%u", total.stop_recording); - METRICs("totalstartfwdcount", "%u", total.start_forwarding); - METRICs("totalstopfwdcount", "%u", total.stop_forwarding); - METRICs("totalblkdtmfcount", "%u", total.block_dtmf); - METRICs("totalunblkdtmfcount", "%u", total.unblock_dtmf); - METRICs("totalblkmedia", "%u", total.block_media); - METRICs("totalunblkmedia", "%u", total.unblock_media); - METRICs("totalplaymedia", "%u", total.play_media); - METRICs("totalstopmedia", "%u", total.stop_media); - METRICs("totalplaydtmf", "%u", total.play_dtmf); - METRICs("totalstatistics", "%u", total.statistics); - METRICs("totalerrorcount", "%u", total.errors); + for (int i = 0; i < NGC_COUNT; i++) { + char *mn = g_strdup_printf("total%scount", ng_command_strings_short[i]); + char *lw = g_ascii_strdown(mn, -1); + METRICs(lw, "%u", total.cmd[i].count); + free(mn); + free(lw); + } HEADER("}", ""); @@ -604,9 +636,18 @@ GQueue *statistics_gather_metrics(void) { unsigned int l = g_atomic_int_get(&lif->spec->port_pool.last_used); unsigned int r = lif->spec->port_pool.max - lif->spec->port_pool.min + 1; METRICs("used", "%u", r - f); + PROM("ports_used", "gauge"); + PROMLAB("name=\"%s\",address=\"%s\"", lif->logical->name.s, + sockaddr_print_buf(&lif->spec->local_address.addr)); METRICs("used_pct", "%.2f", (double) (r - f) * 100.0 / r); METRICs("free", "%u", f); + PROM("ports_free", "gauge"); + PROMLAB("name=\"%s\",address=\"%s\"", lif->logical->name.s, + 
sockaddr_print_buf(&lif->spec->local_address.addr)); METRICs("totals", "%u", r); + PROM("ports", "gauge"); + PROMLAB("name=\"%s\",address=\"%s\"", lif->logical->name.s, + sockaddr_print_buf(&lif->spec->local_address.addr)); METRICs("last", "%u", l); HEADER("}", NULL); @@ -627,11 +668,22 @@ GQueue *statistics_gather_metrics(void) { HEADER("{", ""); METRICsva("chain", "\"%s\"", chain); METRICs("num", "%i", g_atomic_int_get(&stats_entry->num_transcoders)); + PROM("transcoders", "gauge"); + PROMLAB("chain=\"%s\"", chain); if (g_atomic_int_get(&stats_entry->last_tv_sec[idx]) == last_tv_sec) { METRICs("packetrate", UINT64F, atomic64_get(&stats_entry->packets_input[idx])); METRICs("byterate", UINT64F, atomic64_get(&stats_entry->bytes_input[idx])); METRICs("samplerate", UINT64F, atomic64_get(&stats_entry->pcm_samples[idx])); } + METRICs("packets", UINT64F, atomic64_get(&stats_entry->packets_input[2])); + PROM("transcode_packets_total", "counter"); + PROMLAB("chain=\"%s\"", chain); + METRICs("bytes", UINT64F, atomic64_get(&stats_entry->bytes_input[2])); + PROM("transcode_bytes_total", "counter"); + PROMLAB("chain=\"%s\"", chain); + METRICs("samples", UINT64F, atomic64_get(&stats_entry->pcm_samples[2])); + PROM("transcode_samples_total", "counter"); + PROMLAB("chain=\"%s\"", chain); HEADER("}", ""); } @@ -651,6 +703,7 @@ static void free_stats_metric(void *p) { g_free(m->label); g_free(m->value_long); g_free(m->value_short); + g_free(m->prom_label); g_slice_free1(sizeof(*m), m); } diff --git a/daemon/websocket.c b/daemon/websocket.c index a90476948..2245860d6 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -7,6 +7,7 @@ #include "str.h" #include "cli.h" #include "control_ng.h" +#include "statistics.h" struct websocket_message; @@ -288,6 +289,54 @@ static const char *websocket_http_ping(struct websocket_message *wm) { } +static void __g_string_free(GString **s) { + g_string_free(*s, TRUE); +} +static void __g_hash_table_destroy(GHashTable **s) { + 
g_hash_table_destroy(*s); +} +static const char *websocket_http_metrics(struct websocket_message *wm) { + ilog(LOG_DEBUG, "Responding to GET /metrics"); + + AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics()); + AUTO_CLEANUP_INIT(GString *outp, __g_string_free, g_string_new("")); + AUTO_CLEANUP_INIT(GHashTable *metric_types, __g_hash_table_destroy, + g_hash_table_new(g_str_hash, g_str_equal)); + + for (GList *l = metrics->head; l; l = l->next) { + struct stats_metric *m = l->data; + if (!m->label) + continue; + if (!m->value_short) + continue; + if (!m->prom_name) + continue; + + if (!g_hash_table_lookup(metric_types, m->prom_name)) { + if (m->descr) + g_string_append_printf(outp, "# HELP rtpengine_%s %s\n", + m->prom_name, m->descr); + if (m->prom_type) + g_string_append_printf(outp, "# TYPE rtpengine_%s %s\n", + m->prom_name, m->prom_type); + g_hash_table_insert(metric_types, (void *) m->prom_name, (void *) 0x1); + } + + g_string_append_printf(outp, "rtpengine_%s", m->prom_name); + if (m->prom_label) + g_string_append_printf(outp, "{%s}", m->prom_label); + g_string_append_printf(outp, " %s\n", m->value_short); + } + + if (websocket_http_response(wm->wc, 200, "text/plain", outp->len)) + return "Failed to write response HTTP headers"; + if (websocket_write_http(wm->wc, outp->str, 1)) + return "Failed to write metrics response"; + + return NULL; +} + + // adds printf string to output buffer without triggering response static void websocket_queue_printf(struct cli_writer *cw, const char *fmt, ...) 
{ va_list va; @@ -404,6 +453,8 @@ static int websocket_http_get(struct websocket_conn *wc) { handler = websocket_http_ping; else if (!strncmp(uri, "/cli/", 5)) handler = websocket_http_cli; + else if (!strcmp(uri, "/metrics")) + handler = websocket_http_metrics; if (!handler) { ilog(LOG_WARN, "Unhandled HTTP GET URI: '%s'", uri); diff --git a/include/control_ng.h b/include/control_ng.h index 8346727f1..4e5c837a2 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -9,26 +9,38 @@ struct poller; +enum ng_command { + NGC_PING = 0, + NGC_OFFER, + NGC_ANSWER, + NGC_DELETE, + NGC_QUERY, + NGC_LIST, + NGC_START_RECORDING, + NGC_STOP_RECORDING, + NGC_START_FORWARDING, + NGC_STOP_FORWARDING, + NGC_BLOCK_DTMF, + NGC_UNBLOCK_DTMF, + NGC_BLOCK_MEDIA, + NGC_UNBLOCK_MEDIA, + NGC_PLAY_MEDIA, + NGC_STOP_MEDIA, + NGC_PLAY_DTMF, + NGC_STATISTICS, + + NGC_COUNT // last, number of elements +}; + +struct ng_command_stats { + mutex_t lock; + unsigned int count; + struct timeval time; +}; + struct control_ng_stats { sockaddr_t proxy; - int ping; - int offer; - int answer; - int delete; - int query; - int list; - int start_recording; - int stop_recording; - int start_forwarding; - int stop_forwarding; - int block_dtmf; - int unblock_dtmf; - int block_media; - int unblock_media; - int play_media; - int stop_media; - int play_dtmf; - int statistics; + struct ng_command_stats cmd[NGC_COUNT]; int errors; }; @@ -38,6 +50,9 @@ struct control_ng { struct poller *poller; }; +extern const char *ng_command_strings[NGC_COUNT]; +extern const char *ng_command_strings_short[NGC_COUNT]; + struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char); void control_ng_init(void); void control_ng_cleanup(void); diff --git a/include/statistics.h b/include/statistics.h index 2c3f6175a..b578a1be4 100644 --- a/include/statistics.h +++ b/include/statistics.h @@ -50,6 +50,7 @@ struct totalstats { atomic64 total_forced_term_sess; atomic64 total_relayed_packets; atomic64 
total_relayed_errors; + atomic64 total_relayed_bytes; atomic64 total_nopacket_relayed_sess; atomic64 total_oneway_stream_sess; @@ -104,6 +105,9 @@ struct stats_metric { int is_brace; int is_follow_up; int is_int; + const char *prom_name; + const char *prom_type; + char *prom_label; };