You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rtpengine/daemon/graphite.c

344 lines
13 KiB

#include "graphite.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <string.h>
#include "log.h"
#include "call.h"
#include "socket.h"
#include "statistics.h"
#include "main.h"
// Value of rtpe_now at the most recent print_graphite_data() run; the next run
// subtracts it from rtpe_now to get the length of the elapsed interval.
int64_t rtpe_latest_graphite_interval_start;
// Socket used for the connection to the graphite server.
static socket_t graphite_sock;
// One of STATE_DISCONNECTED, STATE_IN_PROGRESS or STATE_CONNECTED.
static int connection_state = STATE_DISCONNECTED;
//struct totalstats totalstats_prev;
// rtpe_now value before which graphite_loop_run() only sleeps and returns.
static int64_t next_run;
// HEAD: static time_t rtpe_now, next_run;
// Optional string prepended to every metric line; assigned by set_prefix().
static char* graphite_prefix = NULL;
// Assigned by set_graphite_interval_tv(); passed to
// add_total_calls_duration_in_interval() before each send.
static int64_t graphite_interval_tv;
// The objects below are the working buffers handed to the stats_*() sampling
// helpers at the top of print_graphite_data(); the metrics are printed from them.
struct global_stats_counter rtpe_stats_graphite_diff; // per-interval increases
static struct global_stats_counter rtpe_stats_graphite_intv; // copied out when graphite stats run
struct global_gauge_min_max rtpe_gauge_graphite_min_max;
struct global_gauge_min_max rtpe_gauge_graphite_min_max_sampled;
struct global_rate_min_max rtpe_rate_graphite_min_max;
struct global_rate_min_max_avg rtpe_rate_graphite_min_max_avg_sampled;
struct global_sampled_min_max rtpe_sampled_graphite_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled;
static struct global_stats_sampled rtpe_sampled_graphite_min_max_diff;
static struct global_stats_sampled rtpe_sampled_graphite_min_max_intv;
struct global_sampled_avg rtpe_sampled_graphite_avg;
/**
 * Store the interval value that graphite_loop_run() later hands to
 * add_total_calls_duration_in_interval().
 *
 * @param tv  interval value to remember
 */
void set_graphite_interval_tv(int64_t tv)
{
	graphite_interval_tv = tv;
}
void set_prefix(char* prefix) {
graphite_prefix = g_strdup(prefix);
}
void free_prefix(void) {
g_free(graphite_prefix);
}
static int connect_to_graphite_server(const endpoint_t *graphite_ep) {
int rc;
if (!graphite_ep) {
ilog(LOG_ERROR, "NULL graphite_ep");
return -1;
}
ilog(LOG_INFO, "Connecting to graphite server %s", endpoint_print_buf(graphite_ep));
rc = connect_socket_nb(&graphite_sock, SOCK_STREAM, graphite_ep);
if (rtpe_config.graphite_timeout > 0 && !(graphite_sock.fd < 0)) {
usertimeout(&graphite_sock, rtpe_config.graphite_timeout * 1000);
}
if (rc == -1) {
ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite.");
return -1;
}
if (rc == 0)
ilog(LOG_INFO, "Graphite server connected.");
else {
/* EINPROGRESS */
ilog(LOG_INFO, "Connection to graphite is in progress.");
connection_state = STATE_IN_PROGRESS;
}
return 0;
}
/**
 * Sample the statistics for the interval that just ended and render them as
 * graphite plaintext lines.
 *
 * Each line has the form "<prefix><metric> <value> <timestamp>\n", where the
 * prefix is only present if one was set and the timestamp is
 * rtpe_now / 1000000. As a side effect, rtpe_latest_graphite_interval_start
 * is advanced to rtpe_now and the per-interval sampling buffers are updated.
 *
 * @return newly allocated GString holding all lines; the caller frees it
 */
GString *print_graphite_data(void) {
	int64_t time_diff_us = rtpe_now - rtpe_latest_graphite_interval_start;
	rtpe_latest_graphite_interval_start = rtpe_now;

	// refresh the per-interval buffers before anything is printed from them
	stats_counters_calc_diff(rtpe_stats, &rtpe_stats_graphite_intv, &rtpe_stats_graphite_diff);
	stats_rate_min_max_avg_sample(&rtpe_rate_graphite_min_max, &rtpe_rate_graphite_min_max_avg_sampled,
			time_diff_us, &rtpe_stats_graphite_diff);
	stats_gauge_min_max_sample(&rtpe_gauge_graphite_min_max_sampled, &rtpe_gauge_graphite_min_max,
			&rtpe_stats_gauge);
	stats_sampled_calc_diff(&rtpe_stats_sampled, &rtpe_sampled_graphite_min_max_intv,
			&rtpe_sampled_graphite_min_max_diff);
	stats_sampled_min_max_sample(&rtpe_sampled_graphite_min_max, &rtpe_sampled_graphite_min_max_sampled);
	stats_sampled_avg(&rtpe_sampled_graphite_avg, &rtpe_sampled_graphite_min_max_diff);

	GString *graph_str = g_string_new("");

	// Append one metric line. Wrapped in do/while(0) so the two statements
	// always act as a single statement, whatever context the macro is used in.
#define GPF(fmt, ...) \
	do { \
		if (graphite_prefix) \
			g_string_append(graph_str, graphite_prefix); \
		g_string_append_printf(graph_str, fmt " %" PRId64 "\n", ##__VA_ARGS__, rtpe_now / 1000000L); \
	} while (0)

	// per-command timing, rate and count metrics
	for (int i = 0; i < OP_COUNT; i++) {
		GPF("%s_time_min %.6f", ng_command_strings_esc[i],
				(double) atomic64_get_na(&rtpe_sampled_graphite_min_max_sampled.min.ng_command_times[i]) / 1000000.0);
		GPF("%s_time_max %.6f", ng_command_strings_esc[i],
				(double) atomic64_get_na(&rtpe_sampled_graphite_min_max_sampled.max.ng_command_times[i]) / 1000000.0);
		GPF("%s_time_avg %.6f", ng_command_strings_esc[i],
				(double) atomic64_get_na(&rtpe_sampled_graphite_avg.avg.ng_command_times[i]) / 1000000.0);

		GPF("%ss_ps_min %" PRIu64, ng_command_strings_esc[i], atomic64_get_na(&rtpe_rate_graphite_min_max_avg_sampled.min.ng_commands[i]));
		GPF("%ss_ps_max %" PRIu64, ng_command_strings_esc[i], atomic64_get_na(&rtpe_rate_graphite_min_max_avg_sampled.max.ng_commands[i]));
		GPF("%ss_ps_avg %" PRIu64, ng_command_strings_esc[i], atomic64_get_na(&rtpe_rate_graphite_min_max_avg_sampled.avg.ng_commands[i]));

		ilog(LOG_DEBUG, "Min/Max/Avg %s processing delay: %.6f/%.6f/%.6f sec",
				ng_command_strings[i],
				(double) atomic64_get_na(&rtpe_sampled_graphite_min_max_sampled.min.ng_command_times[i]) / 1000000.0,
				(double) atomic64_get_na(&rtpe_sampled_graphite_min_max_sampled.max.ng_command_times[i]) / 1000000.0,
				(double) atomic64_get_na(&rtpe_sampled_graphite_avg.avg.ng_command_times[i]) / 1000000.0);

		GPF("%s_count %" PRIu64, ng_command_strings_esc[i], atomic64_get_na(&rtpe_stats->ng_commands[i]));
	}

	GPF("call_dur %.6f", (double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0);

	// average call duration over the sessions counted in this interval;
	// reported as 0 when there were none, to avoid dividing by zero
	int64_t avg_duration;
	uint64_t managed_sess = atomic64_get_na(&rtpe_stats_graphite_diff.managed_sess);
	if (managed_sess)
		avg_duration = atomic64_get_na(&rtpe_stats_graphite_diff.call_duration) / managed_sess;
	else
		avg_duration = 0;
	GPF("average_call_dur %" PRId64 ".%06" PRId64, avg_duration / 1000000, avg_duration % 1000000);

	GPF("forced_term_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.forced_term_sess));
	GPF("managed_sess %" PRIu64, atomic64_get_na(&rtpe_stats->managed_sess));
	GPF("managed_sess_min %" PRIu64, atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.min.total_sessions));
	GPF("managed_sess_max %" PRIu64, atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.max.total_sessions));
	GPF("current_sessions_total %" PRIu64, atomic64_get_na(&rtpe_stats_gauge.total_sessions));
	GPF("current_sessions_own %" PRIu64, atomic64_get_na(&rtpe_stats_gauge.total_sessions) - atomic64_get_na(&rtpe_stats_gauge.foreign_sessions));
	GPF("current_sessions_foreign %" PRIu64, atomic64_get_na(&rtpe_stats_gauge.foreign_sessions));
	GPF("current_transcoded_media %" PRIu64, atomic64_get_na(&rtpe_stats_gauge.transcoded_media));
	GPF("current_sessions_ipv4 %" PRIu64, atomic64_get_na(&rtpe_stats_gauge.ipv4_sessions));
	GPF("current_sessions_ipv6 %" PRIu64, atomic64_get_na(&rtpe_stats_gauge.ipv6_sessions));
	GPF("current_sessions_mixed %" PRIu64, atomic64_get_na(&rtpe_stats_gauge.mixed_sessions));
	GPF("nopacket_relayed_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.nopacket_relayed_sess));
	GPF("oneway_stream_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.oneway_stream_sess));
	GPF("regular_term_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.regular_term_sess));
	GPF("relayed_errors_user %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.errors_user));
	GPF("relayed_packets_user %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.packets_user));
	GPF("relayed_bytes_user %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_user));
	GPF("relayed_errors_kernel %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.errors_kernel));
	GPF("relayed_packets_kernel %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.packets_kernel));
	GPF("relayed_bytes_kernel %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_kernel));
	GPF("relayed_errors %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.errors_user) +
			atomic64_get_na(&rtpe_stats_graphite_diff.errors_kernel));
	GPF("relayed_packets %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.packets_user) +
			atomic64_get_na(&rtpe_stats_graphite_diff.packets_kernel));
	GPF("relayed_bytes %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.bytes_user) +
			atomic64_get_na(&rtpe_stats_graphite_diff.bytes_kernel));
	GPF("silent_timeout_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.silent_timeout_sess));
	GPF("final_timeout_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.final_timeout_sess));
	GPF("offer_timeout_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.offer_timeout_sess));
	GPF("timeout_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.timeout_sess));
	GPF("reject_sess %" PRIu64, atomic64_get_na(&rtpe_stats_graphite_diff.rejected_sess));

	// free/used port counts per local interface
	for (__auto_type l = all_local_interfaces.head; l; l = l->next) {
		struct local_intf *lif = l->data;
		// only show first-order interface entries: socket families must match
		if (lif->logical->preferred_family != lif->spec->local_address.addr.family)
			continue;
		int num_ports = lif->spec->port_pool.max - lif->spec->port_pool.min + 1;
		GPF("ports_free_%s_%s %i", lif->logical->name.s,
				sockaddr_print_buf(&lif->spec->local_address.addr),
				lif->spec->port_pool.free_ports_q.length);
		GPF("ports_used_%s_%s %i", lif->logical->name.s,
				sockaddr_print_buf(&lif->spec->local_address.addr),
				num_ports - lif->spec->port_pool.free_ports_q.length);
	}

	// transcoder statistics; the codec stats table is walked under its lock
	mutex_lock(&rtpe_codec_stats_lock);

	// the per-entry arrays have two slots selected by the low bit of the
	// second; report the slot belonging to the previous second
	int last_tv_sec = rtpe_now / 1000000 - 1;
	unsigned int idx = last_tv_sec & 1;

	codec_stats_ht_iter iter;
	t_hash_table_iter_init(&iter, rtpe_codec_stats);
	char *chain;
	struct codec_stats *stats_entry;
	while (t_hash_table_iter_next(&iter, &chain, &stats_entry)) {
		GPF("transcoder_%s %i", stats_entry->chain_brief,
				g_atomic_int_get(&stats_entry->num_transcoders));
		// skip the counters if this slot was not written during that second
		if (g_atomic_int_get(&stats_entry->last_tv_sec[idx]) != last_tv_sec)
			continue;
		GPF("transcoder_%s_packets %llu", stats_entry->chain_brief,
				(unsigned long long) atomic64_get(&stats_entry->packets_input[idx]));
		GPF("transcoder_%s_bytes %llu", stats_entry->chain_brief,
				(unsigned long long) atomic64_get(&stats_entry->bytes_input[idx]));
		GPF("transcoder_%s_samples %llu", stats_entry->chain_brief,
				(unsigned long long) atomic64_get(&stats_entry->pcm_samples[idx]));
	}

	mutex_unlock(&rtpe_codec_stats_lock);

	ilog(LOG_DEBUG, "min_sessions:%llu max_sessions:%llu, call_dur_per_interval:%.6f at time %" PRId64 "\n",
			(unsigned long long) atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.min.total_sessions),
			(unsigned long long) atomic64_get_na(&rtpe_gauge_graphite_min_max_sampled.max.total_sessions),
			(double) atomic64_get_na(&rtpe_stats_graphite_diff.total_calls_duration_intv) / 1000000.0,
			rtpe_now / 1000000);

	return graph_str;
}
/**
 * Render the current statistics and write them to the graphite socket.
 *
 * The socket may refuse data temporarily; up to 10 such stalls are tolerated,
 * each followed by a 0.5 s pause. A write that returns 0 counts as a stall as
 * well, so the retry budget bounds the loop in every case.
 *
 * @return 0 when everything was written, -1 on failure. On a write failure
 *         the socket is closed here; the caller updates connection_state.
 */
static int send_graphite_data(void) {
	if (graphite_sock.fd < 0) {
		ilog(LOG_ERROR,"Graphite socket is not connected.");
		return -1;
	}

	GString *graph_str = print_graphite_data();

	size_t sent = 0;
	int blockings = 10; // let it block that many times

	while (sent < graph_str->len) {
		ssize_t rc = write(graphite_sock.fd, graph_str->str + sent, graph_str->len - sent);
		if (rc > 0) {
			sent += rc;
			continue;
		}

		// No progress. Capture errno right away; a zero-length write carries
		// no errno of its own and is handled like "would block".
		int err = (rc < 0) ? errno : EAGAIN;
		int transient = (err == EWOULDBLOCK || err == EAGAIN || err == EINTR);

		if (blockings <= 0 || !transient) {
			ilog(LOG_ERROR,"Could not write to graphite socket (%s). " \
					"Disconnecting graphite server.", strerror(err));
			goto error;
		}

		// poor man's blocking handling
		blockings--;
		usleep(500000);
	}

	g_string_free(graph_str, TRUE);
	return 0;

error:
	g_string_free(graph_str, TRUE);
	close_socket(&graphite_sock);
	return -1;
}
/**
 * One pass of the graphite reporting loop.
 *
 * 1. If a connect is pending, wait up to one second for it to finish and
 *    record the outcome in connection_state.
 * 2. If the next reporting time has not been reached, sleep briefly and return.
 * 3. Otherwise schedule the next run, reconnect if disconnected, and send the
 *    statistics if connected.
 *
 * @param graphite_ep   endpoint of the graphite server; must not be NULL
 * @param microseconds  reporting interval, added to rtpe_now to get next_run
 */
static void graphite_loop_run(endpoint_t *graphite_ep, int64_t microseconds) {
	int rc=0;
	struct pollfd wfds[1];

	if (!graphite_ep) {
		ilog(LOG_ERROR, "NULL graphite_ep");
		return ;
	}

	if (connection_state == STATE_IN_PROGRESS && graphite_sock.fd >= 0) {
		wfds[0].fd=graphite_sock.fd;
		wfds[0].events = POLLOUT | POLLERR | POLLHUP | POLLNVAL;

		rc = poll(wfds,1,1000);
		if (rc == -1) {
			// a signal interrupting poll() says nothing about the socket;
			// keep the pending connect and look again on the next pass
			if (errno == EINTR)
				return;
			ilog(LOG_ERROR,"Error on the socket.");
			close_socket(&graphite_sock);
			connection_state = STATE_DISCONNECTED;
			return;
		}
		if (rc==0) {
			// timeout
			return;
		}

		if (!(wfds[0].revents & POLLOUT)) {
			ilog(LOG_WARN,"fd is active but not ready for writing, poll events=%x",wfds[0].revents);
			close_socket(&graphite_sock);
			connection_state = STATE_DISCONNECTED;
			return;
		}

		rc = socket_error(&graphite_sock);
		if (rc < 0)
			ilog(LOG_ERROR,"getsockopt failure.");
		if (rc != 0) {
			// only a positive value is an errno code that strerror() can describe
			ilog(LOG_ERROR,"Socket connect failed. fd: %i, Reason: %s\n",graphite_sock.fd,
					rc > 0 ? strerror(rc) : "unknown");
			close_socket(&graphite_sock);
			connection_state = STATE_DISCONNECTED;
			return;
		}

		ilog(LOG_INFO, "Graphite server connected.");
		connection_state = STATE_CONNECTED;
		next_run=0; // fake next run to skip sleep after reconnect
	}

	rtpe_now = now_us();
	if (rtpe_now < next_run) {
		usleep(100000); // XXX refactor using proper sleep time
		return;
	}

	next_run = rtpe_now + microseconds;

	if (graphite_sock.fd < 0 && connection_state == STATE_DISCONNECTED) {
		connect_to_graphite_server(graphite_ep);
	}

	if (graphite_sock.fd >= 0 && connection_state == STATE_CONNECTED) {
		add_total_calls_duration_in_interval(graphite_interval_tv);

		// take a fresh timestamp for the metric lines
		rtpe_now = now_us();
		rc = send_graphite_data();
		if (rc < 0) {
			ilog(LOG_ERROR,"Sending graphite data failed.");
			close_socket(&graphite_sock);
			connection_state = STATE_DISCONNECTED;
		}
	}
}
/**
 * Entry point of the graphite reporting loop.
 *
 * Falls back to a 1 second interval if none is configured, starts the first
 * connection attempt and then keeps calling graphite_loop_run() until
 * rtpe_shutdown is set.
 *
 * @param d  not used
 */
void graphite_loop(void *d)
{
	if (rtpe_config.graphite_interval <= 0) {
		ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second.");
		rtpe_config.graphite_interval=1;
	}

	connect_to_graphite_server(&rtpe_config.graphite_ep);

	for (;;) {
		if (rtpe_shutdown)
			break;

		// the interval is read from the config on every pass
		graphite_loop_run(&rtpe_config.graphite_ep,
				rtpe_config.graphite_interval * 1000000LL);
	}
}