TT#82405 add Prometheus stats exporter

Change-Id: Ic55b42a707e430a61c0422c65827ed2145511586
pull/1126/head
Richard Fuchs 5 years ago
parent de7807f0f1
commit 5d895fb7be

@ -1958,3 +1958,8 @@ the same format, including the unique cookie.
For WebSockets, the subprotocol `ng.rtpengine.com` is used and the protocol
follows the same format. Messages must consist of a unique cookie and a string
in bencode format, and responses will also be in the same format.
Prometheus Stats Exporter
-------------------------
The Prometheus metrics can be found under the URI `/metrics`.

@ -29,6 +29,18 @@ const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64] = {
[LOAD_LIMIT_LOAD] = "Load limit exceeded",
[LOAD_LIMIT_BW] = "Bandwidth limit exceeded",
};
const char *ng_command_strings[NGC_COUNT] = {
"ping", "offer", "answer", "delete", "query", "list", "start recording",
"stop recording", "start forwarding", "stop forwarding", "block DTMF",
"unblock DTMF", "block media", "unblock media", "play media", "stop media",
"play DTMF", "statistics",
};
const char *ng_command_strings_short[NGC_COUNT] = {
"Ping", "Offer", "Answer", "Delete", "Query", "List", "StartRec",
"StopRec", "StartFwd", "StopFwd", "BlkDTMF",
"UnblkDTMF", "BlkMedia", "UnblkMedia", "PlayMedia", "StopMedia",
"PlayDTMF", "Stats",
};
static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) {
@ -110,6 +122,12 @@ struct control_ng_stats* get_control_ng_stats(const sockaddr_t *addr) {
cur = g_slice_alloc0(sizeof(struct control_ng_stats));
cur->proxy = *addr;
ilog(LOG_DEBUG,"Adding a proxy for control ng stats:%s", sockaddr_print_buf(addr));
for (int i = 0; i < NGC_COUNT; i++) {
struct ng_command_stats *c = &cur->cmd[i];
mutex_init(&c->lock);
}
g_hash_table_insert(rtpe_cngs_hash, &cur->proxy, cur);
}
mutex_unlock(&rtpe_cngs_lock);
@ -127,6 +145,7 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr,
struct timeval cmd_start, cmd_stop, cmd_process_time;
struct control_ng_stats* cur = get_control_ng_stats(&sin->address);
int funcret = -1;
enum ng_command command = -1;
str_chr_str(&data, buf, ' ');
if (!data.s || data.s == buf->s) {
@ -182,87 +201,84 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr,
g_string_free(log_str, TRUE);
}
// XXX do the strcmp's only once
errstr = NULL;
resultstr = "ok";
// start command timer
gettimeofday(&cmd_start, NULL);
int cmdcode = __csh_lookup(&cmd);
switch (cmdcode) {
switch (__csh_lookup(&cmd)) {
case CSH_LOOKUP("ping"):
resultstr = "pong";
g_atomic_int_inc(&cur->ping);
command = NGC_PING;
break;
case CSH_LOOKUP("offer"):
errstr = call_offer_ng(dict, resp, addr, sin);
g_atomic_int_inc(&cur->offer);
command = NGC_OFFER;
break;
case CSH_LOOKUP("answer"):
errstr = call_answer_ng(dict, resp);
g_atomic_int_inc(&cur->answer);
command = NGC_ANSWER;
break;
case CSH_LOOKUP("delete"):
errstr = call_delete_ng(dict, resp);
g_atomic_int_inc(&cur->delete);
command = NGC_DELETE;
break;
case CSH_LOOKUP("query"):
errstr = call_query_ng(dict, resp);
g_atomic_int_inc(&cur->query);
command = NGC_QUERY;
break;
case CSH_LOOKUP("list"):
errstr = call_list_ng(dict, resp);
g_atomic_int_inc(&cur->list);
command = NGC_LIST;
break;
case CSH_LOOKUP("start recording"):
errstr = call_start_recording_ng(dict, resp);
g_atomic_int_inc(&cur->start_recording);
command = NGC_START_RECORDING;
break;
case CSH_LOOKUP("stop recording"):
errstr = call_stop_recording_ng(dict, resp);
g_atomic_int_inc(&cur->stop_recording);
command = NGC_STOP_RECORDING;
break;
case CSH_LOOKUP("start forwarding"):
errstr = call_start_forwarding_ng(dict, resp);
g_atomic_int_inc(&cur->start_forwarding);
command = NGC_START_FORWARDING;
break;
case CSH_LOOKUP("stop forwarding"):
errstr = call_stop_forwarding_ng(dict, resp);
g_atomic_int_inc(&cur->stop_forwarding);
command = NGC_STOP_FORWARDING;
break;
case CSH_LOOKUP("block DTMF"):
errstr = call_block_dtmf_ng(dict, resp);
g_atomic_int_inc(&cur->block_dtmf);
command = NGC_BLOCK_DTMF;
break;
case CSH_LOOKUP("unblock DTMF"):
errstr = call_unblock_dtmf_ng(dict, resp);
g_atomic_int_inc(&cur->unblock_dtmf);
command = NGC_UNBLOCK_DTMF;
break;
case CSH_LOOKUP("block media"):
errstr = call_block_media_ng(dict, resp);
g_atomic_int_inc(&cur->block_media);
command = NGC_BLOCK_MEDIA;
break;
case CSH_LOOKUP("unblock media"):
errstr = call_unblock_media_ng(dict, resp);
g_atomic_int_inc(&cur->unblock_media);
command = NGC_UNBLOCK_MEDIA;
break;
case CSH_LOOKUP("play media"):
errstr = call_play_media_ng(dict, resp);
g_atomic_int_inc(&cur->play_media);
command = NGC_PLAY_MEDIA;
break;
case CSH_LOOKUP("stop media"):
errstr = call_stop_media_ng(dict, resp);
g_atomic_int_inc(&cur->stop_media);
command = NGC_STOP_MEDIA;
break;
case CSH_LOOKUP("play DTMF"):
errstr = call_play_dtmf_ng(dict, resp);
g_atomic_int_inc(&cur->play_dtmf);
command = NGC_PLAY_DTMF;
break;
case CSH_LOOKUP("statistics"):
errstr = statistics_ng(dict, resp);
g_atomic_int_inc(&cur->statistics);
command = NGC_STATISTICS;
break;
default:
errstr = "Unrecognized command";
@ -273,25 +289,35 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr,
//print command duration
timeval_from_us(&cmd_process_time, timeval_diff(&cmd_stop, &cmd_start));
if (command >= 0 && command < NGC_COUNT) {
mutex_lock(&cur->cmd[command].lock);
cur->cmd[command].count++;
timeval_add(&cur->cmd[command].time, &cur->cmd[command].time, &cmd_process_time);
mutex_unlock(&cur->cmd[command].lock);
}
if (errstr)
goto err_send;
bencode_dictionary_add_string(resp, "result", resultstr);
// update interval statistics
switch (cmdcode) {
case CSH_LOOKUP("offer"):
// XXX could generalise these, same as above
switch (command) {
case NGC_OFFER:
atomic64_inc(&rtpe_statsps.offers);
timeval_update_request_time(&rtpe_totalstats_interval.offer, &cmd_process_time);
break;
case CSH_LOOKUP("answer"):
case NGC_ANSWER:
atomic64_inc(&rtpe_statsps.answers);
timeval_update_request_time(&rtpe_totalstats_interval.answer, &cmd_process_time);
break;
case CSH_LOOKUP("delete"):
case NGC_DELETE:
atomic64_inc(&rtpe_statsps.deletes);
timeval_update_request_time(&rtpe_totalstats_interval.delete, &cmd_process_time);
break;
default:
break;
}
goto send_resp;

@ -40,33 +40,37 @@ static void timeval_totalstats_average_add(struct totalstats *s, const struct ti
mutex_unlock(&s->total_average_lock);
}
static void timeval_totalstats_interval_call_duration_add(struct totalstats *s,
static void timeval_totalstats_call_duration_add(struct totalstats *s,
struct timeval *call_start, struct timeval *call_stop,
struct timeval *interval_start, int interval_dur_s) {
/* work with graphite interval start val which might be changed elsewhere in the code*/
struct timeval real_iv_start = *interval_start;
struct timeval real_iv_start = {0,};
struct timeval call_duration;
struct timeval *call_start_in_iv = call_start;
/* in case graphite interval needs to be the previous one */
if (timercmp(&real_iv_start, call_stop, >) && interval_dur_s) {
// round up to nearest while interval_dur_s
long long d = timeval_diff(&real_iv_start, call_stop);
d += (interval_dur_s * 1000000) - 1;
d /= 1000000 * interval_dur_s;
d *= interval_dur_s;
struct timeval graph_dur = { .tv_sec = d, .tv_usec = 0LL };
timeval_subtract(&real_iv_start, interval_start, &graph_dur);
}
if (interval_start) {
real_iv_start = *interval_start;
/* in case graphite interval needs to be the previous one */
if (timercmp(&real_iv_start, call_stop, >) && interval_dur_s) {
// round up to nearest while interval_dur_s
long long d = timeval_diff(&real_iv_start, call_stop);
d += (interval_dur_s * 1000000) - 1;
d /= 1000000 * interval_dur_s;
d *= interval_dur_s;
struct timeval graph_dur = { .tv_sec = d, .tv_usec = 0LL };
timeval_subtract(&real_iv_start, interval_start, &graph_dur);
}
if (timercmp(&real_iv_start, call_start, >))
call_start_in_iv = &real_iv_start;
if (timercmp(&real_iv_start, call_start, >))
call_start_in_iv = &real_iv_start;
/* this should never happen and is here for sanitization of output */
if (timercmp(call_start_in_iv, call_stop, >)) {
ilog(LOG_ERR, "Call start seems to exceed call stop");
return;
/* this should never happen and is here for sanitization of output */
if (timercmp(call_start_in_iv, call_stop, >)) {
ilog(LOG_ERR, "Call start seems to exceed call stop");
return;
}
}
timeval_subtract(&call_duration, call_stop, call_start_in_iv);
@ -87,6 +91,10 @@ void statistics_update_totals(struct packet_stream *ps) {
atomic64_get(&ps->stats.errors));
atomic64_add(&rtpe_totalstats_interval.total_relayed_errors,
atomic64_get(&ps->stats.errors));
atomic64_add(&rtpe_totalstats.total_relayed_bytes,
atomic64_get(&ps->stats.bytes));
atomic64_add(&rtpe_totalstats_interval.total_relayed_bytes,
atomic64_get(&ps->stats.bytes));
}
void statistics_update_foreignown_dec(struct call* c) {
@ -214,10 +222,13 @@ void statistics_update_oneway(struct call* c) {
timeval_totalstats_average_add(&rtpe_totalstats, &tim_result_duration);
timeval_totalstats_average_add(&rtpe_totalstats_interval, &tim_result_duration);
timeval_totalstats_interval_call_duration_add(
timeval_totalstats_call_duration_add(
&rtpe_totalstats_interval, &ml->started, &ml->terminated,
&rtpe_latest_graphite_interval_start,
rtpe_config.graphite_interval);
timeval_totalstats_call_duration_add(
&rtpe_totalstats, &ml->started, &ml->terminated,
NULL, 0);
}
if (ml->term_reason==FINAL_TIMEOUT) {
@ -248,6 +259,18 @@ void statistics_update_oneway(struct call* c) {
g_queue_push_tail(ret, m); \
} while (0)
#define PROM(name, type) \
do { \
struct stats_metric *last = g_queue_peek_tail(ret); \
last->prom_name = name; \
last->prom_type = type; \
} while (0)
#define PROMLAB(fmt, ...) \
do { \
struct stats_metric *last = g_queue_peek_tail(ret); \
last->prom_label = g_strdup_printf(fmt, ## __VA_ARGS__); \
} while (0)
#define METRICva(lb, dsc, fmt1, fmt2, ...) \
do { \
struct stats_metric *m = g_slice_alloc0(sizeof(*m)); \
@ -378,9 +401,15 @@ GQueue *statistics_gather_metrics(void) {
rwlock_unlock_r(&rtpe_callhash_lock);
METRIC("sessionsown", "Owned sessions", UINT64F, UINT64F, cur_sessions - atomic64_get(&rtpe_stats.foreign_sessions));
PROM("sessions", "gauge");
PROMLAB("type=\"own\"");
METRIC("sessionsforeign", "Foreign sessions", UINT64F, UINT64F, atomic64_get(&rtpe_stats.foreign_sessions));
PROM("sessions", "gauge");
PROMLAB("type=\"foreign\"");
METRIC("sessionstotal", "Total sessions", UINT64F, UINT64F, cur_sessions);
METRIC("transcodedmedia", "Transcoded media", UINT64F, UINT64F, atomic64_get(&rtpe_stats.transcoded_media));
PROM("transcoded_media", "gauge");
METRIC("packetrate", "Packets per second", UINT64F, UINT64F, atomic64_get(&rtpe_stats.packets));
METRIC("byterate", "Bytes per second", UINT64F, UINT64F, atomic64_get(&rtpe_stats.bytes));
@ -396,19 +425,43 @@ GQueue *statistics_gather_metrics(void) {
HEADER("{", "");
METRIC("uptime", "Uptime of rtpengine", "%llu", "%llu seconds", (unsigned long long) time(NULL)-rtpe_totalstats.started);
PROM("uptime_seconds", "gauge");
METRIC("managedsessions", "Total managed sessions", UINT64F, UINT64F, num_sessions);
PROM("sessions_total", "counter");
METRIC("rejectedsessions", "Total rejected sessions", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_rejected_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"rejected\"");
METRIC("timeoutsessions", "Total timed-out sessions via TIMEOUT", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"timeout\"");
METRIC("silenttimeoutsessions", "Total timed-out sessions via SILENT_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_totalstats.total_silent_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"silent_timeout\"");
METRIC("finaltimeoutsessions", "Total timed-out sessions via FINAL_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_totalstats.total_final_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"final_timeout\"");
METRIC("offertimeoutsessions", "Total timed-out sessions via OFFER_TIMEOUT", UINT64F, UINT64F,atomic64_get(&rtpe_totalstats.total_offer_timeout_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"offer_timeout\"");
METRIC("regularterminatedsessions", "Total regular terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_regular_term_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"terminated\"");
METRIC("forcedterminatedsessions", "Total forced terminated sessions", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_forced_term_sess));
PROM("closed_sessions_total", "counter");
PROMLAB("reason=\"force_terminated\"");
METRIC("relayedpackets", "Total relayed packets", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_relayed_packets));
PROM("packets_total", "counter");
METRIC("relayedpacketerrors", "Total relayed packet errors", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_relayed_errors));
PROM("packet_errors_total", "counter");
METRIC("relayedbytes", "Total relayed bytes", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_relayed_bytes));
PROM("bytes_total", "counter");
METRIC("zerowaystreams", "Total number of streams with no relayed packets", UINT64F, UINT64F, atomic64_get(&rtpe_totalstats.total_nopacket_relayed_sess));
PROM("zero_packet_streams_total", "counter");
METRIC("onewaystreams", "Total number of 1-way streams", UINT64F, UINT64F,atomic64_get(&rtpe_totalstats.total_oneway_stream_sess));
PROM("one_way_sessions_total", "counter");
METRICva("avgcallduration", "Average call duration", "%ld.%06ld", "%ld.%06ld", avg.tv_sec, avg.tv_usec);
mutex_lock(&rtpe_totalstats_lastinterval_lock);
@ -485,8 +538,12 @@ GQueue *statistics_gather_metrics(void) {
HEADER("proxies", NULL);
HEADER("[", NULL);
HEADERl(" %20s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s ",
"Proxy", "Offer", "Answer", "Delete", "Ping", "List", "Query", "StartRec", "StopRec", "Errors", "BlkDTMF", "UnblkDTMF");
GString *tmp = g_string_new("");
g_string_append_printf(tmp, " %20s ", "Proxy");
for (int i = 0; i < NGC_COUNT; i++)
g_string_append_printf(tmp, "| %10s ", ng_command_strings_short[i]);
HEADERl("%s", tmp->str);
g_string_free(tmp, TRUE);
struct control_ng_stats total = {0,};
@ -498,60 +555,47 @@ GQueue *statistics_gather_metrics(void) {
}
for (GList *l = list; l; l = l->next) {
struct control_ng_stats* cur = l->data;
METRICl("", " %20s | %10u | %10u | %10u | %10u | %10u | %10u | %10u | %10u | %10u | %10u | %10u",
sockaddr_print_buf(&cur->proxy),
cur->offer,
cur->answer,
cur->delete,
cur->ping,
cur->list,
cur->query,
cur->start_recording,
cur->stop_recording,
cur->errors,
cur->block_dtmf,
cur->unblock_dtmf);
total.ping += cur->ping;
total.offer += cur->offer;
total.answer += cur->answer;
total.delete += cur->delete;
total.query += cur->query;
total.list += cur->list;
total.start_recording += cur->start_recording;
total.stop_recording += cur->stop_recording;
total.start_forwarding += cur->start_forwarding;
total.stop_forwarding += cur->stop_forwarding;
total.block_dtmf += cur->block_dtmf;
total.unblock_dtmf += cur->unblock_dtmf;
total.block_media += cur->block_media;
total.unblock_media += cur->unblock_media;
total.play_media += cur->play_media;
total.stop_media += cur->stop_media;
total.play_dtmf += cur->play_dtmf;
total.statistics += cur->statistics;
total.errors += cur->errors;
HEADER("{", NULL);
GString *tmp = g_string_new("");
METRICsva("proxy", "\"%s\"", sockaddr_print_buf(&cur->proxy));
METRICs("pingcount", "%u", cur->ping);
METRICs("offercount", "%u", cur->offer);
METRICs("answercount", "%u", cur->answer);
METRICs("deletecount", "%u", cur->delete);
METRICs("querycount", "%u", cur->query);
METRICs("listcount", "%u", cur->list);
METRICs("startreccount", "%u", cur->start_recording);
METRICs("stopreccount", "%u", cur->stop_recording);
METRICs("startfwdcount", "%u", cur->start_forwarding);
METRICs("stopfwdcount", "%u", cur->stop_forwarding);
METRICs("blkdtmfcount", "%u", cur->block_dtmf);
METRICs("unblkdtmfcount", "%u", cur->unblock_dtmf);
METRICs("blkmedia", "%u", cur->block_media);
METRICs("unblkmedia", "%u", cur->unblock_media);
METRICs("playmedia", "%u", cur->play_media);
METRICs("stopmedia", "%u", cur->stop_media);
METRICs("playdtmf", "%u", cur->play_dtmf);
METRICs("statistics", "%u", cur->statistics);
METRICs("errorcount", "%u", cur->errors);
g_string_append_printf(tmp, " %20s ", sockaddr_print_buf(&cur->proxy));
for (int i = 0; i < NGC_COUNT; i++) {
mutex_lock(&cur->cmd[i].lock);
g_string_append_printf(tmp, "| %10u ", cur->cmd[i].count);
total.cmd[i].count += cur->cmd[i].count;
char *mn = g_strdup_printf("%scount", ng_command_strings_short[i]);
char *lw = g_ascii_strdown(mn, -1);
METRICs(lw, "%u", cur->cmd[i].count);
PROM("requests_total", "counter");
PROMLAB("proxy=\"%s\",request=\"%s\"", sockaddr_print_buf(&cur->proxy),
ng_command_strings[i]);
free(mn);
free(lw);
mn = g_strdup_printf("%sduration", ng_command_strings_short[i]);
lw = g_ascii_strdown(mn, -1);
METRICsva(lw, "%llu.%06llu", (unsigned long long) cur->cmd[i].time.tv_sec,
(unsigned long long) cur->cmd[i].time.tv_usec);
PROM("request_seconds_total", "counter");
PROMLAB("proxy=\"%s\",request=\"%s\"", sockaddr_print_buf(&cur->proxy),
ng_command_strings[i]);
free(mn);
free(lw);
mutex_unlock(&cur->cmd[i].lock);
}
METRICl("", "%s", tmp->str);
g_string_free(tmp, TRUE);
int errors = g_atomic_int_get(&cur->errors);
total.errors += errors;
METRICs("errorcount", "%i", errors);
PROM("errors_total", "counter");
PROMLAB("proxy=\"%s\"", sockaddr_print_buf(&cur->proxy));
HEADER("}", NULL);
}
@ -560,25 +604,13 @@ GQueue *statistics_gather_metrics(void) {
HEADER("]", "");
METRICs("totalpingcount", "%u", total.ping);
METRICs("totaloffercount", "%u", total.offer);
METRICs("totalanswercount", "%u", total.answer);
METRICs("totaldeletecount", "%u", total.delete);
METRICs("totalquerycount", "%u", total.query);
METRICs("totallistcount", "%u", total.list);
METRICs("totalstartreccount", "%u", total.start_recording);
METRICs("totalstopreccount", "%u", total.stop_recording);
METRICs("totalstartfwdcount", "%u", total.start_forwarding);
METRICs("totalstopfwdcount", "%u", total.stop_forwarding);
METRICs("totalblkdtmfcount", "%u", total.block_dtmf);
METRICs("totalunblkdtmfcount", "%u", total.unblock_dtmf);
METRICs("totalblkmedia", "%u", total.block_media);
METRICs("totalunblkmedia", "%u", total.unblock_media);
METRICs("totalplaymedia", "%u", total.play_media);
METRICs("totalstopmedia", "%u", total.stop_media);
METRICs("totalplaydtmf", "%u", total.play_dtmf);
METRICs("totalstatistics", "%u", total.statistics);
METRICs("totalerrorcount", "%u", total.errors);
for (int i = 0; i < NGC_COUNT; i++) {
char *mn = g_strdup_printf("total%scount", ng_command_strings_short[i]);
char *lw = g_ascii_strdown(mn, -1);
METRICs(lw, "%u", total.cmd[i].count);
free(mn);
free(lw);
}
HEADER("}", "");
@ -604,9 +636,18 @@ GQueue *statistics_gather_metrics(void) {
unsigned int l = g_atomic_int_get(&lif->spec->port_pool.last_used);
unsigned int r = lif->spec->port_pool.max - lif->spec->port_pool.min + 1;
METRICs("used", "%u", r - f);
PROM("ports_used", "gauge");
PROMLAB("name=\"%s\",address=\"%s\"", lif->logical->name.s,
sockaddr_print_buf(&lif->spec->local_address.addr));
METRICs("used_pct", "%.2f", (double) (r - f) * 100.0 / r);
METRICs("free", "%u", f);
PROM("ports_free", "gauge");
PROMLAB("name=\"%s\",address=\"%s\"", lif->logical->name.s,
sockaddr_print_buf(&lif->spec->local_address.addr));
METRICs("totals", "%u", r);
PROM("ports", "gauge");
PROMLAB("name=\"%s\",address=\"%s\"", lif->logical->name.s,
sockaddr_print_buf(&lif->spec->local_address.addr));
METRICs("last", "%u", l);
HEADER("}", NULL);
@ -627,11 +668,22 @@ GQueue *statistics_gather_metrics(void) {
HEADER("{", "");
METRICsva("chain", "\"%s\"", chain);
METRICs("num", "%i", g_atomic_int_get(&stats_entry->num_transcoders));
PROM("transcoders", "gauge");
PROMLAB("chain=\"%s\"", chain);
if (g_atomic_int_get(&stats_entry->last_tv_sec[idx]) == last_tv_sec) {
METRICs("packetrate", UINT64F, atomic64_get(&stats_entry->packets_input[idx]));
METRICs("byterate", UINT64F, atomic64_get(&stats_entry->bytes_input[idx]));
METRICs("samplerate", UINT64F, atomic64_get(&stats_entry->pcm_samples[idx]));
}
METRICs("packets", UINT64F, atomic64_get(&stats_entry->packets_input[2]));
PROM("transcode_packets_total", "counter");
PROMLAB("chain=\"%s\"", chain);
METRICs("bytes", UINT64F, atomic64_get(&stats_entry->bytes_input[2]));
PROM("transcode_bytes_total", "counter");
PROMLAB("chain=\"%s\"", chain);
METRICs("samples", UINT64F, atomic64_get(&stats_entry->pcm_samples[2]));
PROM("transcode_samples_total", "counter");
PROMLAB("chain=\"%s\"", chain);
HEADER("}", "");
}
@ -651,6 +703,7 @@ static void free_stats_metric(void *p) {
g_free(m->label);
g_free(m->value_long);
g_free(m->value_short);
g_free(m->prom_label);
g_slice_free1(sizeof(*m), m);
}

@ -7,6 +7,7 @@
#include "str.h"
#include "cli.h"
#include "control_ng.h"
#include "statistics.h"
struct websocket_message;
@ -288,6 +289,54 @@ static const char *websocket_http_ping(struct websocket_message *wm) {
}
static void __g_string_free(GString **s) {
g_string_free(*s, TRUE);
}
static void __g_hash_table_destroy(GHashTable **s) {
g_hash_table_destroy(*s);
}
static const char *websocket_http_metrics(struct websocket_message *wm) {
ilog(LOG_DEBUG, "Respoding to GET /metrics");
AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics());
AUTO_CLEANUP_INIT(GString *outp, __g_string_free, g_string_new(""));
AUTO_CLEANUP_INIT(GHashTable *metric_types, __g_hash_table_destroy,
g_hash_table_new(g_str_hash, g_str_equal));
for (GList *l = metrics->head; l; l = l->next) {
struct stats_metric *m = l->data;
if (!m->label)
continue;
if (!m->value_short)
continue;
if (!m->prom_name)
continue;
if (!g_hash_table_lookup(metric_types, m->prom_name)) {
if (m->descr)
g_string_append_printf(outp, "# HELP rtpengine_%s %s\n",
m->prom_name, m->descr);
if (m->prom_type)
g_string_append_printf(outp, "# TYPE rtpengine_%s %s\n",
m->prom_name, m->prom_type);
g_hash_table_insert(metric_types, (void *) m->prom_name, (void *) 0x1);
}
g_string_append_printf(outp, "rtpengine_%s", m->prom_name);
if (m->prom_label)
g_string_append_printf(outp, "{%s}", m->prom_label);
g_string_append_printf(outp, " %s\n", m->value_short);
}
if (websocket_http_response(wm->wc, 200, "text/plain", outp->len))
return "Failed to write response HTTP headers";
if (websocket_write_http(wm->wc, outp->str, 1))
return "Failed to write metrics response";
return NULL;
}
// adds printf string to output buffer without triggering response
static void websocket_queue_printf(struct cli_writer *cw, const char *fmt, ...) {
va_list va;
@ -404,6 +453,8 @@ static int websocket_http_get(struct websocket_conn *wc) {
handler = websocket_http_ping;
else if (!strncmp(uri, "/cli/", 5))
handler = websocket_http_cli;
else if (!strcmp(uri, "/metrics"))
handler = websocket_http_metrics;
if (!handler) {
ilog(LOG_WARN, "Unhandled HTTP GET URI: '%s'", uri);

@ -9,26 +9,38 @@
struct poller;
enum ng_command {
NGC_PING = 0,
NGC_OFFER,
NGC_ANSWER,
NGC_DELETE,
NGC_QUERY,
NGC_LIST,
NGC_START_RECORDING,
NGC_STOP_RECORDING,
NGC_START_FORWARDING,
NGC_STOP_FORWARDING,
NGC_BLOCK_DTMF,
NGC_UNBLOCK_DTMF,
NGC_BLOCK_MEDIA,
NGC_UNBLOCK_MEDIA,
NGC_PLAY_MEDIA,
NGC_STOP_MEDIA,
NGC_PLAY_DTMF,
NGC_STATISTICS,
NGC_COUNT // last, number of elements
};
struct ng_command_stats {
mutex_t lock;
unsigned int count;
struct timeval time;
};
struct control_ng_stats {
sockaddr_t proxy;
int ping;
int offer;
int answer;
int delete;
int query;
int list;
int start_recording;
int stop_recording;
int start_forwarding;
int stop_forwarding;
int block_dtmf;
int unblock_dtmf;
int block_media;
int unblock_media;
int play_media;
int stop_media;
int play_dtmf;
int statistics;
struct ng_command_stats cmd[NGC_COUNT];
int errors;
};
@ -38,6 +50,9 @@ struct control_ng {
struct poller *poller;
};
extern const char *ng_command_strings[NGC_COUNT];
extern const char *ng_command_strings_short[NGC_COUNT];
struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char);
void control_ng_init(void);
void control_ng_cleanup(void);

@ -50,6 +50,7 @@ struct totalstats {
atomic64 total_forced_term_sess;
atomic64 total_relayed_packets;
atomic64 total_relayed_errors;
atomic64 total_relayed_bytes;
atomic64 total_nopacket_relayed_sess;
atomic64 total_oneway_stream_sess;
@ -104,6 +105,9 @@ struct stats_metric {
int is_brace;
int is_follow_up;
int is_int;
const char *prom_name;
const char *prom_type;
char *prom_label;
};

Loading…
Cancel
Save