From 2ae0e35de4dfb040cc74d15e6ecb12dffab813b0 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Tue, 3 Feb 2015 12:23:33 +0100 Subject: [PATCH 01/12] Added functionality to report statistics to graphite --- README.md | 10 ++++++++++ daemon/Makefile | 2 +- daemon/call.c | 21 +++++++++++++++++-- daemon/call.h | 1 + daemon/main.c | 30 ++++++++++++++++++++++++++-- debian/ngcp-rtpengine-daemon.default | 4 +++- debian/ngcp-rtpengine-daemon.init | 2 ++ el/rtpengine.init | 10 ++++++++++ 8 files changed, 74 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 7931add40..9727cef63 100644 --- a/README.md +++ b/README.md @@ -184,6 +184,8 @@ option and which are reproduced below: -d, --delete-delay Delay for deleting a session from memory. --sip-source Use SIP source address by default --dtls-passive Always prefer DTLS passive role + -g, --graphite=[IP46:]PORT TCP address of graphite statistics server + -w, --graphite-interval=INT Graphite data statistics send interval Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local IP address through `--interface`, and at least one of the `--listen-...` options must be given. @@ -354,6 +356,14 @@ The options are described in more detail below. NGCP-specific options +* -g, --graphite + + Address of the graphite statistics server. + +* -w, --graphite-interval + + Interval of the time when information is sent to the graphite server. + A typical command line (enabling both UDP and NG protocols) thus may look like: /usr/sbin/rtpengine --table=0 --interface=10.64.73.31 --interface=2001:db8::4f3:3d \ diff --git a/daemon/Makefile b/daemon/Makefile index 001dd5a47..24d0637b6 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -63,7 +63,7 @@ endif SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ - crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c + crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c OBJS= $(SRCS:.c=.o) diff --git a/daemon/call.c b/daemon/call.c index 66ad7c393..6d0895fcc 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -2505,7 +2505,9 @@ void call_destroy(struct call *c) { (unsigned long long) ps->stats.bytes, (unsigned long long) ps->stats.errors); m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets; + m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets; m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors; + m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors; } } if (_log_facility_cdr) @@ -2514,6 +2516,7 @@ void call_destroy(struct call *c) { // --- for statistics getting one way stream or no relay at all m->totalstats.total_nopacket_relayed_sess *= 2; + m->totalstats_interval.total_nopacket_relayed_sess *= 2; for (l = c->monologues; l; l = l->next) { ml = l->data; @@ -2550,32 +2553,46 @@ void call_destroy(struct call *c) { } } - if (ps && ps2 && ps->stats.packets!=0 && ps2->stats.packets==0) + if (ps && ps2 && ps->stats.packets!=0 && ps2->stats.packets==0) { m->totalstats.total_oneway_stream_sess++; + m->totalstats_interval.total_oneway_stream_sess++; + } - if (ps && ps2 && ps->stats.packets==0 && ps2->stats.packets==0) + if (ps && ps2 && ps->stats.packets==0 && ps2->stats.packets==0) { m->totalstats.total_nopacket_relayed_sess++; + m->totalstats_interval.total_nopacket_relayed_sess++; + } } m->totalstats.total_nopacket_relayed_sess /= 2; + m->totalstats_interval.total_nopacket_relayed_sess /= 2; m->totalstats.total_managed_sess += 1; + m->totalstats_interval.total_managed_sess += 1; ml = c->monologues->data; if (ml->term_reason==TIMEOUT) { m->totalstats.total_timeout_sess++; + m->totalstats_interval.total_timeout_sess++; } else if (ml->term_reason==SILENT_TIMEOUT) { m->totalstats.total_silent_timeout_sess++; + m->totalstats_interval.total_silent_timeout_sess++; } else if (ml->term_reason==REGULAR) { m->totalstats.total_regular_term_sess++; + m->totalstats_interval.total_regular_term_sess++; } else if (ml->term_reason==FORCED) { m->totalstats.total_forced_term_sess++; + m->totalstats_interval.total_forced_term_sess++; } timeval_multiply(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess-1); timeval_add(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,&tim_result_duration); timeval_devide(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess); + timeval_multiply(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess-1); + timeval_add(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,&tim_result_duration); + timeval_devide(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess); + if (_log_facility_cdr) /* log it */ cdrlog(cdrbuffer); diff --git a/daemon/call.h b/daemon/call.h index bcf741fd9..fe9308fe5 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -411,6 +411,7 @@ struct callmaster { mutex_t statslock; struct stats stats; /* copied from statsps once a second */ struct totalstats totalstats; + struct totalstats totalstats_interval; struct poller *poller; pcre *info_re; diff --git a/daemon/main.c b/daemon/main.c index afc0e44dc..429fb8430 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -26,7 +26,7 @@ #include "dtls.h" #include "call_interfaces.h" #include "cli.h" - +#include "graphite.h" @@ -108,7 +108,9 @@ static char *b2b_url; static enum xmlrpc_format xmlrpc_fmt = XF_SEMS; static int num_threads; static int delete_delay = 30; - +static u_int32_t graphite_ip = NULL; +static u_int16_t graphite_port; +static int graphite_interval = 0; static void sighandler(gpointer x) { sigset_t ss; @@ -254,6 +256,7 @@ static void options(int *argc, char ***argv) { char *listenudps = NULL; char *listenngs = NULL; char *listencli = NULL; + char *graphitep = NULL; char *redisps = NULL; char *log_facility_s = NULL; char *log_facility_cdr_s = NULL; @@ -269,6 +272,8 @@ static void options(int *argc, char ***argv) { { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" }, + { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" }, + { "graphite-interval", 'w', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" }, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, @@ -331,6 +336,10 @@ static void options(int *argc, char ***argv) { die("Invalid IP or port (--listen-cli)"); } + if (graphitep) {if (parse_ip_port(&graphite_ip, &graphite_port, graphitep)) + die("Invalid IP or port (--graphite)"); + } + if (tos < 0 || tos > 255) die("Invalid TOS value"); @@ -606,6 +615,7 @@ no_kernel: callmaster_config_init(ctx->m); ZERO(ctx->m->totalstats); + ZERO(ctx->m->totalstats_interval); ctx->m->totalstats.started = time(NULL); if (!foreground) @@ -623,6 +633,20 @@ static void timer_loop(void *d) { poller_timers_wait_run(p, 100); } +static void graphite_loop(void *d) { + struct callmaster *cm = d; + + if (!graphite_interval) { + ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second."); + graphite_interval=1; + } + + connect_to_graphite_server(graphite_ip,graphite_port); + + while (!global_shutdown) + graphite_loop_run(cm,graphite_interval); // time in seconds +} + static void poller_loop(void *d) { struct poller *p = d; @@ -642,6 +666,8 @@ int main(int argc, char **argv) { thread_create_detach(sighandler, NULL); thread_create_detach(timer_loop, ctx.p); + if (graphite_ip) + thread_create_detach(graphite_loop, ctx.m); if (num_threads < 1) { #ifdef _SC_NPROCESSORS_ONLN diff --git a/debian/ngcp-rtpengine-daemon.default b/debian/ngcp-rtpengine-daemon.default index 20b659cb5..20b10c3d5 100644 --- a/debian/ngcp-rtpengine-daemon.default +++ b/debian/ngcp-rtpengine-daemon.default @@ -22,4 +22,6 @@ TABLE=0 # LOG_FACILITY=daemon # LOG_FACILITY_CDR=daemon # NUM_THREADS=5 -# DELETE_DELAY=30 \ No newline at end of file +# DELETE_DELAY=30 +# GRAPHITE=9006 +# GRAPHITE_INTERVAL=60 \ No newline at end of file diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index 651e17f6e..1c7e96f5c 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -72,6 +72,8 @@ OPTIONS="$OPTIONS --table=$TABLE" [ -z "$LOG_FACILITY_CDR" ] || OPTIONS="$OPTIONS --log-facility-cdr=$LOG_FACILITY_CDR" [ -z "$NUM_THREADS" ] || OPTIONS="$OPTIONS --num-threads=$NUM_THREADS" [ -z "$DELETE_DELAY" ] || OPTIONS="$OPTIONS --delete-delay=$DELETE_DELAY" +[ -z "$GRAPHITE" ] || OPTIONS="$OPTIONS --graphite=$GRAPHITE" +[ -z "$GRAPHITE_INTERVAL" ] || OPTIONS="$OPTIONS --graphite-interval=$GRAPHITE_INTERVAL" if test "$FORK" = "no" ; then OPTIONS="$OPTIONS --foreground" fi diff --git a/el/rtpengine.init b/el/rtpengine.init index f515d5341..1fb33ce62 100644 --- a/el/rtpengine.init +++ b/el/rtpengine.init @@ -143,6 +143,16 @@ build_opts() { OPTS+=" --delete-delay=$DELETE_DELAY" fi + if [[ -n "$GRAPHITE" ]] + then + OPTS+=" --graphite=$GRAPHITE" + fi + + if [[ -n "$GRAPHITE_INTERVAL" ]] + then + OPTS+=" --graphite-interval=$GRAPHITE_INTERVAL" + fi + if [[ -n "$LOG_FACILITY_CDR" ]] then OPTS+=" --log-facility-cdr=$LOG_FACILITY_CDR" From 7446822df6439e25d04eb456ff085ca2719b6fb1 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Tue, 3 Feb 2015 13:20:27 +0100 Subject: [PATCH 02/12] Forgot the files .. --- daemon/graphite.c | 129 ++++++++++++++++++++++++++++++++++++++++++++++ daemon/graphite.h | 17 ++++++ 2 files changed, 146 insertions(+) create mode 100644 daemon/graphite.c create mode 100644 daemon/graphite.h diff --git a/daemon/graphite.c b/daemon/graphite.c new file mode 100644 index 000000000..496ed1f3d --- /dev/null +++ b/daemon/graphite.c @@ -0,0 +1,129 @@ +/* + * graphite.c + * + * Created on: Jan 19, 2015 + * Author: fmetz + */ +#include +#include +#include +#include +#include + +#include "log.h" +#include "call.h" + +int graphite_sock=0; +u_int32_t graphite_ipaddress; +int graphite_port=0; +struct callmaster* cm=0; +struct totalstats totalstats_prev; + +int connect_to_graphite_server(u_int32_t ipaddress, int port) { + + graphite_sock=0; + int reconnect=0, MAXRECONNECTS=5; + int rc=0; + struct sockaddr_in sin; + memset(&sin,0,sizeof(sin)); + int val=1; + + graphite_ipaddress = ipaddress; + graphite_port = port; + + rc = graphite_sock = socket(AF_INET, SOCK_STREAM,0); + if(rc<0) { + ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite."); + return -1; + } + + sin.sin_family=AF_INET; + sin.sin_addr.s_addr=graphite_ipaddress; + sin.sin_port=htons(graphite_port); + + rc = setsockopt(graphite_sock,SOL_SOCKET,SO_REUSEADDR, &val,sizeof(val)); + if(rc<0) { + ilog(LOG_ERROR,"Couldn't set sockopt for graphite descriptor."); + return -1; + } + + while (reconnecttotalstats_interval.total_average_call_dur.tv_sec,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %i %u\n",hostname, cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.forced_term_sess %i %u\n",hostname, cm->totalstats_interval.total_forced_term_sess,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.managed_sess %i %u\n",hostname, cm->totalstats_interval.total_managed_sess,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %i %u\n",hostname, cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.oneway_stream_sess %i %u\n",hostname, cm->totalstats_interval.total_oneway_stream_sess,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.regular_term_sess %i %u\n",hostname, cm->totalstats_interval.total_regular_term_sess,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_errors %i %u\n",hostname, cm->totalstats_interval.total_relayed_errors,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_packets %i %u\n",hostname, cm->totalstats_interval.total_relayed_packets,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.silent_timeout_sess %i %u\n",hostname, cm->totalstats_interval.total_silent_timeout_sess,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.timeout_sess %i %u\n",hostname, cm->totalstats_interval.total_timeout_sess,(unsigned)time(NULL)); ptr += rc; + + ZERO(cm->totalstats_interval); + + rc = write(graphite_sock, data_to_send, strlen(data_to_send)); + if (rc<0) { + ilog(LOG_ERROR,"Could not write to graphite socket. Disconnecting graphite server."); + close(graphite_sock); graphite_sock=0; + return -1; + } + return 0; +} + +void graphite_loop_run(struct callmaster* callmaster, int seconds) { + + int rc=0; + + if (!cm) + cm = callmaster; + + rc = send_graphite_data(); + if (rc<0) { + rc = connect_to_graphite_server(graphite_ipaddress, graphite_port); + if (rc<0) { + return; + } + } + sleep(seconds); +} diff --git a/daemon/graphite.h b/daemon/graphite.h new file mode 100644 index 000000000..0b51e7b9a --- /dev/null +++ b/daemon/graphite.h @@ -0,0 +1,17 @@ +/* + * graphite.h + * + * Created on: Jan 19, 2015 + * Author: fmetz + */ + +#ifndef GRAPHITE_H_ +#define GRAPHITE_H_ + +#include "call.h" + +int connect_to_graphite_server(u_int32_t ipaddress, int port); +int send_graphite_data(); +void graphite_loop_run(struct callmaster* cm, int seconds); + +#endif /* GRAPHITE_H_ */ From 61a72b190f3067324cd7d1a5c9f031afa8626428 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Tue, 3 Feb 2015 14:20:23 +0100 Subject: [PATCH 03/12] Added some more statistics. RTPENGINE-3. RTCP omitted since it has to be parsed manually --- daemon/call.c | 13 +++++++++---- daemon/cli.c | 8 +++++--- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 6d0895fcc..6b4d5f712 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -2438,6 +2438,8 @@ void call_destroy(struct call *c) { /* CDRs and statistics */ cdrbufcur += sprintf(cdrbufcur,"ci=%s, ",c->callid.s); cdrbufcur += sprintf(cdrbufcur,"created_from=%s, ", c->created_from); + cdrbufcur += sprintf(cdrbufcur,"last_signal=%llu, ", (unsigned long long)c->last_signal); + cdrbufcur += sprintf(cdrbufcur,"tos=%u, ", (unsigned int)c->tos); for (l = c->monologues; l; l = l->next) { ml = l->data; if (_log_facility_cdr) { @@ -2486,24 +2488,27 @@ void call_destroy(struct call *c) { "ml%i_midx%u_%s_local_relay_port=%u, " "ml%i_midx%u_%s_relayed_packets=%llu, " "ml%i_midx%u_%s_relayed_bytes=%llu, " - "ml%i_midx%u_%s_relayed_errors=%llu, ", + "ml%i_midx%u_%s_relayed_errors=%llu, " + "ml%i_midx%u_%s_last_packet=%llu, ", cdrlinecnt, md->index, protocol, buf, cdrlinecnt, md->index, protocol, ps->endpoint.port, cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets, cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes, - cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors); + cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors, + cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet); } ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, " - "%llu p, %llu b, %llu e", + "%llu p, %llu b, %llu e, %llu last_packet", md->index, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), buf, ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", (unsigned long long) ps->stats.packets, (unsigned long long) ps->stats.bytes, - (unsigned long long) ps->stats.errors); + (unsigned long long) ps->stats.errors, + (unsigned long long) ps->last_packet); m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets; m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets; m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors; diff --git a/daemon/cli.c b/daemon/cli.c index 162d6274d..fb32996bb 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -81,7 +81,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m return; } - printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %30s | deletionmark:%4s | created:%12i | proxy:%s\n\n", c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from); + printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %30s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu\n\n", + c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal); ADJUSTLEN(printlen,outbufend,replybuffer); for (l = c->monologues; l; l = l->next) { @@ -113,14 +114,15 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m smart_ntop_p(buf, &ps->endpoint.ip46, sizeof(buf)); printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " - "%llu p, %llu b, %llu e\n", + "%llu p, %llu b, %llu e, %llu last_packet\n", md->index, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), buf, ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", (unsigned long long) ps->stats.packets, (unsigned long long) ps->stats.bytes, - (unsigned long long) ps->stats.errors); + (unsigned long long) ps->stats.errors, + (unsigned long long) ps->last_packet); ADJUSTLEN(printlen,outbufend,replybuffer); } } From 0ccb11a18d328a209a53f1606e5bee55c145aac5 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Tue, 3 Feb 2015 19:08:13 +0100 Subject: [PATCH 04/12] Added statistics on control ng interface. Seen by using 'rtpengine-ctl list totals' --- daemon/call.h | 3 ++- daemon/cli.c | 29 ++++++++++++++++++++ daemon/control_ng.c | 65 ++++++++++++++++++++++++++++++++++++++++----- daemon/control_ng.h | 12 +++++++++ daemon/main.c | 2 ++ 5 files changed, 104 insertions(+), 7 deletions(-) diff --git a/daemon/call.h b/daemon/call.h index fe9308fe5..2c946f062 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -11,7 +11,7 @@ #include #include #include "compat.h" - +#include "control_ng.h" enum termination_reason { UNKNOWN=0, @@ -412,6 +412,7 @@ struct callmaster { struct stats stats; /* copied from statsps once a second */ struct totalstats totalstats; struct totalstats totalstats_interval; + struct control_ng_stats* control_ng_stats; struct poller *poller; pcre *info_re; diff --git a/daemon/cli.c b/daemon/cli.c index fb32996bb..f7547914b 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -49,6 +49,35 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec); ADJUSTLEN(printlen,outbufend,replybuffer); + printlen = snprintf(replybuffer,(outbufend-replybuffer), "Control statistics:\n\n"); + ADJUSTLEN(printlen,outbufend,replybuffer); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s \n", + "Proxy", "Offer", "Answer", "Delete", "Ping", "List", "Query", "Errors"); + ADJUSTLEN(printlen,outbufend,replybuffer); + struct control_ng_stats* cur = m->control_ng_stats; + + if (!cur) { + printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n No proxies have yet tried to send data."); + ADJUSTLEN(printlen,outbufend,replybuffer); + } + while (cur) { + char buf[128]; memset(&buf,0,128); + smart_ntop_p(buf, &(cur->proxy.sin6_addr), sizeof(buf)); + + printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10u | %10u | %10u | %10u | %10u | %10u | %10u \n", + buf, + cur->offer, + cur->answer, + cur->delete, + cur->ping, + cur->list, + cur->query, + cur->errors); + ADJUSTLEN(printlen,outbufend,replybuffer); + cur = cur->next; + } + printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n\n"); + ADJUSTLEN(printlen,outbufend,replybuffer); } static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { diff --git a/daemon/control_ng.c b/daemon/control_ng.c index d568520b0..ca42fc534 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -56,6 +56,44 @@ static void pretty_print(bencode_item_t *el, GString *s) { } } +struct control_ng_stats* get_control_ng_stats(struct control_ng* c, struct sockaddr_in6* sin) { + + // seems to be the first address + if (!c->callmaster->control_ng_stats) { + c->callmaster->control_ng_stats = malloc(sizeof(struct control_ng_stats)); + memset(c->callmaster->control_ng_stats,0,sizeof(struct control_ng_stats)); + memcpy(&(c->callmaster->control_ng_stats->proxy),sin,sizeof(struct sockaddr_in6)); + char buf[128]; memset(&buf,0,128); + smart_ntop_p(buf, &sin->sin6_addr, sizeof(buf)); + ilog(LOG_INFO,"Adding a first proxy for control ng stats:%s\n",buf); + return c->callmaster->control_ng_stats; + } + + struct control_ng_stats* cur = c->callmaster->control_ng_stats; + struct control_ng_stats* last; + + while (cur) { + last = cur; + if (memcmp((const void*)(&(cur->proxy.sin6_addr)),(const void*)(&(sin->sin6_addr)),sizeof(struct in6_addr))==0) { + ilog(LOG_DEBUG,"Already found proxy for control ng stats.\n"); + return cur; + } + cur = cur->next; + } + + // add a new one + char buf[128]; memset(&buf,0,128); + smart_ntop_p(buf, &sin->sin6_addr, sizeof(buf)); + ilog(LOG_INFO,"Adding a new proxy for control ng stats:%s\n",buf); + + cur = malloc(sizeof(struct control_ng_stats)); + memset(cur,0,sizeof(struct control_ng_stats)); + memcpy(cur,sin,sizeof(struct sockaddr_in6)); + last->next = cur; + + return cur; +} + static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *sin, char *addr) { struct control_ng *c = (void *) obj; bencode_buffer_t bencbuf; @@ -66,6 +104,8 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 * struct iovec iov[3]; GString *log_str; + struct control_ng_stats* cur = get_control_ng_stats(c,sin); + str_chr_str(&data, buf, ' '); if (!data.s || data.s == buf->s) { ilog(LOG_WARNING, "Received invalid data on NG port (no cookie) from %s: "STR_FORMAT, addr, STR_FMT(buf)); @@ -108,19 +148,31 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 * g_string_free(log_str, TRUE); errstr = NULL; - if (!str_cmp(&cmd, "ping")) + if (!str_cmp(&cmd, "ping")) { bencode_dictionary_add_string(resp, "result", "pong"); - else if (!str_cmp(&cmd, "offer")) + cur->ping++; + } + else if (!str_cmp(&cmd, "offer")) { errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin); - else if (!str_cmp(&cmd, "answer")) + cur->offer++; + } + else if (!str_cmp(&cmd, "answer")) { errstr = call_answer_ng(dict, c->callmaster, resp); - else if (!str_cmp(&cmd, "delete")) + cur->answer++; + } + else if (!str_cmp(&cmd, "delete")) { errstr = call_delete_ng(dict, c->callmaster, resp); - else if (!str_cmp(&cmd, "query")) + cur->delete++; + } + else if (!str_cmp(&cmd, "query")) { errstr = call_query_ng(dict, c->callmaster, resp); + cur->query++; + } #if GLIB_CHECK_VERSION(2,16,0) - else if (!str_cmp(&cmd, "list")) + else if (!str_cmp(&cmd, "list")) { errstr = call_list_ng(dict, c->callmaster, resp); + cur->list++; + } #endif else errstr = "Unrecognized command"; @@ -134,6 +186,7 @@ err_send: ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data)); bencode_dictionary_add_string(resp, "result", "error"); bencode_dictionary_add_string(resp, "error-reason", errstr); + cur->errors++; goto send_resp; send_resp: diff --git a/daemon/control_ng.h b/daemon/control_ng.h index e332f64b2..9851ac48a 100644 --- a/daemon/control_ng.h +++ b/daemon/control_ng.h @@ -9,6 +9,18 @@ struct poller; struct callmaster; +struct control_ng_stats { + struct sockaddr_in6 proxy; + u_int32_t ping; + u_int32_t offer; + u_int32_t answer; + u_int32_t delete; + u_int32_t query; + u_int32_t list; + u_int32_t errors; + struct control_ng_stats* next; +}; + struct control_ng { struct obj obj; struct callmaster *callmaster; diff --git a/daemon/main.c b/daemon/main.c index 429fb8430..560615ee2 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -624,6 +624,8 @@ no_kernel: if (redis_restore(ctx->m, mc.redis)) die("Refusing to continue without working Redis database"); + + ZERO(ctx->m->control_ng_stats); } static void timer_loop(void *d) { From 794709f71aa7f59ee1a382b9f1d4187bd1251f4e Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Tue, 3 Feb 2015 19:34:59 +0100 Subject: [PATCH 05/12] Changed retry behaviour for connecting to graphite server. --- daemon/graphite.c | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/daemon/graphite.c b/daemon/graphite.c index 496ed1f3d..74108fc7e 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -22,7 +22,7 @@ struct totalstats totalstats_prev; int connect_to_graphite_server(u_int32_t ipaddress, int port) { graphite_sock=0; - int reconnect=0, MAXRECONNECTS=5; + int reconnect=0; int rc=0; struct sockaddr_in sin; memset(&sin,0,sizeof(sin)); @@ -47,25 +47,18 @@ int connect_to_graphite_server(u_int32_t ipaddress, int port) { return -1; } - while (reconnect=0) { + rc = send_graphite_data(); if (rc<0) { - return; + ilog(LOG_ERROR,"Sending graphite data failed."); + graphite_sock=0; } } + sleep(seconds); } From 57c0a84d8122a65e1244ab7a67e4a6d083b6ef56 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sun, 8 Feb 2015 10:12:19 -0500 Subject: [PATCH 06/12] add locking to totalstats --- daemon/call.c | 32 ++++++++++++++++++++++++-------- daemon/call.h | 1 + daemon/cli.c | 6 ++++++ daemon/graphite.c | 4 ++++ daemon/main.c | 4 ---- 5 files changed, 35 insertions(+), 12 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 6b4d5f712..f484dfa4a 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1423,6 +1423,9 @@ struct callmaster *callmaster_new(struct poller *p) { poller_add_timer(p, callmaster_timer, &c->obj); + mutex_init(&c->totalstats_lock); + c->totalstats.started = poller_now; + return c; fail: @@ -2509,10 +2512,13 @@ void call_destroy(struct call *c) { (unsigned long long) ps->stats.bytes, (unsigned long long) ps->stats.errors, (unsigned long long) ps->last_packet); + + mutex_lock(&m->totalstats_lock); m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets; m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets; m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors; m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors; + mutex_unlock(&m->totalstats_lock); } } if (_log_facility_cdr) @@ -2520,8 +2526,11 @@ void call_destroy(struct call *c) { } // --- for statistics getting one way stream or no relay at all + mutex_lock(&m->totalstats_lock); m->totalstats.total_nopacket_relayed_sess *= 2; m->totalstats_interval.total_nopacket_relayed_sess *= 2; + mutex_unlock(&m->totalstats_lock); + for (l = c->monologues; l; l = l->next) { ml = l->data; @@ -2558,17 +2567,22 @@ void call_destroy(struct call *c) { } } - if (ps && ps2 && ps->stats.packets!=0 && ps2->stats.packets==0) { - m->totalstats.total_oneway_stream_sess++; - m->totalstats_interval.total_oneway_stream_sess++; + if (ps && ps2 && ps2->stats.packets==0) { + mutex_lock(&m->totalstats_lock); + if (ps->stats.packets!=0) { + m->totalstats.total_oneway_stream_sess++; + m->totalstats_interval.total_oneway_stream_sess++; + } + else { + m->totalstats.total_nopacket_relayed_sess++; + m->totalstats_interval.total_nopacket_relayed_sess++; + } + mutex_unlock(&m->totalstats_lock); } + } - if (ps && ps2 && ps->stats.packets==0 && ps2->stats.packets==0) { - m->totalstats.total_nopacket_relayed_sess++; - m->totalstats_interval.total_nopacket_relayed_sess++; - } + mutex_lock(&m->totalstats_lock); - } m->totalstats.total_nopacket_relayed_sess /= 2; m->totalstats_interval.total_nopacket_relayed_sess /= 2; @@ -2598,6 +2612,8 @@ void call_destroy(struct call *c) { timeval_add(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,&tim_result_duration); timeval_devide(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess); + mutex_unlock(&m->totalstats_lock); + if (_log_facility_cdr) /* log it */ cdrlog(cdrbuffer); diff --git a/daemon/call.h b/daemon/call.h index 2c946f062..ddd9e8778 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -410,6 +410,7 @@ struct callmaster { struct stats statsps; /* per second stats, running timer */ mutex_t statslock; struct stats stats; /* copied from statsps once a second */ + mutex_t totalstats_lock; /* for both of them */ struct totalstats totalstats; struct totalstats totalstats_interval; struct control_ng_stats* control_ng_stats; diff --git a/daemon/cli.c b/daemon/cli.c index f7547914b..3b72cd6e7 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -25,6 +25,9 @@ static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { int printlen=0; + + mutex_lock(&m->totalstats_lock); + printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nTotal statistics (does not include current running sessions):\n\n"); ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Uptime of rtpengine :%llu seconds\n", (unsigned long long)time(NULL)-m->totalstats.started); @@ -49,6 +52,9 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec); ADJUSTLEN(printlen,outbufend,replybuffer); + + mutex_unlock(&m->totalstats_lock); + printlen = snprintf(replybuffer,(outbufend-replybuffer), "Control statistics:\n\n"); ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s \n", diff --git a/daemon/graphite.c b/daemon/graphite.c index 74108fc7e..702c2ce71 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -81,6 +81,8 @@ int send_graphite_data() { char data_to_send[8192]; memset(&data_to_send,0,8192); char* ptr = data_to_send; + mutex_lock(&cm->totalstats_lock); + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %i %u\n",hostname, cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned)time(NULL)); ptr += rc; rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %i %u\n",hostname, cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned)time(NULL)); ptr += rc; rc = sprintf(ptr,"%s.totals.forced_term_sess %i %u\n",hostname, cm->totalstats_interval.total_forced_term_sess,(unsigned)time(NULL)); ptr += rc; @@ -95,6 +97,8 @@ int send_graphite_data() { ZERO(cm->totalstats_interval); + mutex_unlock(&cm->totalstats_lock); + rc = write(graphite_sock, data_to_send, strlen(data_to_send)); if (rc<0) { ilog(LOG_ERROR,"Could not write to graphite socket. Disconnecting graphite server."); diff --git a/daemon/main.c b/daemon/main.c index 560615ee2..d46667597 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -614,10 +614,6 @@ no_kernel: ctx->m->conf = mc; callmaster_config_init(ctx->m); - ZERO(ctx->m->totalstats); - ZERO(ctx->m->totalstats_interval); - ctx->m->totalstats.started = time(NULL); - if (!foreground) daemonize(); wpidfile(); From f5444718d4f003e19f2924e2d8ec5a43f84f7e49 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sun, 8 Feb 2015 10:19:30 -0500 Subject: [PATCH 07/12] fix compiler warnings --- daemon/graphite.c | 24 ++++++++++++------------ daemon/main.c | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/daemon/graphite.c b/daemon/graphite.c index 702c2ce71..ad6f3a768 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -22,7 +22,7 @@ struct totalstats totalstats_prev; int connect_to_graphite_server(u_int32_t ipaddress, int port) { graphite_sock=0; - int reconnect=0; + //int reconnect=0; int rc=0; struct sockaddr_in sin; memset(&sin,0,sizeof(sin)); @@ -83,17 +83,17 @@ int send_graphite_data() { mutex_lock(&cm->totalstats_lock); - rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %i %u\n",hostname, cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %i %u\n",hostname, cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.forced_term_sess %i %u\n",hostname, cm->totalstats_interval.total_forced_term_sess,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.managed_sess %i %u\n",hostname, cm->totalstats_interval.total_managed_sess,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %i %u\n",hostname, cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.oneway_stream_sess %i %u\n",hostname, cm->totalstats_interval.total_oneway_stream_sess,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.regular_term_sess %i %u\n",hostname, cm->totalstats_interval.total_regular_term_sess,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.relayed_errors %i %u\n",hostname, cm->totalstats_interval.total_relayed_errors,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.relayed_packets %i %u\n",hostname, cm->totalstats_interval.total_relayed_packets,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.silent_timeout_sess %i %u\n",hostname, cm->totalstats_interval.total_silent_timeout_sess,(unsigned)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.timeout_sess %i %u\n",hostname, cm->totalstats_interval.total_timeout_sess,(unsigned)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.forced_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_forced_term_sess,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.managed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_managed_sess,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.oneway_stream_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_oneway_stream_sess,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.regular_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_regular_term_sess,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_errors %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_errors,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_packets %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_packets,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.silent_timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_silent_timeout_sess,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_timeout_sess,(unsigned long long)time(NULL)); ptr += rc; ZERO(cm->totalstats_interval); diff --git a/daemon/main.c b/daemon/main.c index d46667597..b03328e07 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -108,7 +108,7 @@ static char *b2b_url; static enum xmlrpc_format xmlrpc_fmt = XF_SEMS; static int num_threads; static int delete_delay = 30; -static u_int32_t graphite_ip = NULL; +static u_int32_t graphite_ip = 0; static u_int16_t graphite_port; static int graphite_interval = 0; From 36c7141d53e96d9e6d93ef8c1c90a382842c6225 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sun, 8 Feb 2015 11:17:20 -0500 Subject: [PATCH 08/12] change control_ng_stats into a hash and use locking and atomic ops --- daemon/aux.c | 10 ++++++++ daemon/aux.h | 11 +++++--- daemon/call.c | 3 +++ daemon/call.h | 4 ++- daemon/cli.c | 14 ++++++---- daemon/control_ng.c | 62 +++++++++++++++------------------------------ daemon/control_ng.h | 18 ++++++------- daemon/main.c | 2 -- 8 files changed, 62 insertions(+), 62 deletions(-) diff --git a/daemon/aux.c b/daemon/aux.c index c09d0c918..8856f7e76 100644 --- a/daemon/aux.c +++ b/daemon/aux.c @@ -192,3 +192,13 @@ void thread_create_detach(void (*f)(void *), void *d) { if (thread_create(thread_detach_func, dt, 1, NULL)) abort(); } + +unsigned int in6_addr_hash(const void *p) { + const struct in6_addr *a = p; + return a->s6_addr32[0] ^ a->s6_addr32[3]; +} + +int in6_addr_eq(const void *a, const void *b) { + const struct in6_addr *A = a, *B = b; + return !memcmp(A, B, sizeof(*A)); +} diff --git a/daemon/aux.h b/daemon/aux.h index 0c6b2b2c3..21bb0498b 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -144,7 +144,7 @@ INLINE u_int32_t in6_to_4(const struct in6_addr *a) { return a->s6_addr32[3]; } -INLINE void smart_ntop(char *o, struct in6_addr *a, size_t len) { +INLINE void smart_ntop(char *o, const struct in6_addr *a, size_t len) { const char *r; if (IN6_IS_ADDR_V4MAPPED(a)) @@ -156,7 +156,7 @@ INLINE void smart_ntop(char *o, struct in6_addr *a, size_t len) { *o = '\0'; } -INLINE char *smart_ntop_p(char *o, struct in6_addr *a, size_t len) { +INLINE char *smart_ntop_p(char *o, const struct in6_addr *a, size_t len) { int l; if (IN6_IS_ADDR_V4MAPPED(a)) { @@ -178,7 +178,7 @@ INLINE char *smart_ntop_p(char *o, struct in6_addr *a, size_t len) { } } -INLINE void smart_ntop_port(char *o, struct sockaddr_in6 *a, size_t len) { +INLINE void smart_ntop_port(char *o, const struct sockaddr_in6 *a, size_t len) { char *e; e = smart_ntop_p(o, &a->sin6_addr, len); @@ -483,4 +483,9 @@ INLINE void g_queue_append(GQueue *dst, const GQueue *src) { g_queue_push_tail(dst, l->data); } + + +unsigned int in6_addr_hash(const void *p); +int in6_addr_eq(const void *a, const void *b); + #endif diff --git a/daemon/call.c b/daemon/call.c index f484dfa4a..8c9beb260 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1426,6 +1426,9 @@ struct callmaster *callmaster_new(struct poller *p) { mutex_init(&c->totalstats_lock); c->totalstats.started = poller_now; + mutex_init(&c->cngs_lock); + c->cngs_hash = g_hash_table_new(in6_addr_hash, in6_addr_eq); + return c; fail: diff --git a/daemon/call.h b/daemon/call.h index ddd9e8778..8be512487 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -413,7 +413,9 @@ struct callmaster { mutex_t totalstats_lock; /* for both of them */ struct totalstats totalstats; struct totalstats totalstats_interval; - struct control_ng_stats* control_ng_stats; + /* control_ng_stats stuff */ + mutex_t cngs_lock; + GHashTable *cngs_hash; struct poller *poller; pcre *info_re; diff --git a/daemon/cli.c b/daemon/cli.c index 3b72cd6e7..dfa0c6744 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -60,15 +60,18 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s \n", "Proxy", "Offer", "Answer", "Delete", "Ping", "List", "Query", "Errors"); ADJUSTLEN(printlen,outbufend,replybuffer); - struct control_ng_stats* cur = m->control_ng_stats; - if (!cur) { + mutex_lock(&m->cngs_lock); + GList *list = g_hash_table_get_values(m->cngs_hash); + + if (!list) { printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n No proxies have yet tried to send data."); ADJUSTLEN(printlen,outbufend,replybuffer); } - while (cur) { + for (GList *l = list; l; l = l->next) { + struct control_ng_stats* cur = l->data; char buf[128]; memset(&buf,0,128); - smart_ntop_p(buf, &(cur->proxy.sin6_addr), sizeof(buf)); + smart_ntop_p(buf, &(cur->proxy), sizeof(buf)); printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10u | %10u | %10u | %10u | %10u | %10u | %10u \n", buf, @@ -80,10 +83,11 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m cur->query, cur->errors); ADJUSTLEN(printlen,outbufend,replybuffer); - cur = cur->next; } printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n\n"); ADJUSTLEN(printlen,outbufend,replybuffer); + mutex_unlock(&m->cngs_lock); + g_list_free(list); } static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { diff --git a/daemon/control_ng.c b/daemon/control_ng.c index ca42fc534..911479bea 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -56,41 +56,21 @@ static void pretty_print(bencode_item_t *el, GString *s) { } } -struct control_ng_stats* get_control_ng_stats(struct control_ng* c, struct sockaddr_in6* sin) { - - // seems to be the first address - if (!c->callmaster->control_ng_stats) { - c->callmaster->control_ng_stats = malloc(sizeof(struct control_ng_stats)); - memset(c->callmaster->control_ng_stats,0,sizeof(struct control_ng_stats)); - memcpy(&(c->callmaster->control_ng_stats->proxy),sin,sizeof(struct sockaddr_in6)); +struct control_ng_stats* get_control_ng_stats(struct control_ng* c, const struct in6_addr *addr) { + struct callmaster *m = c->callmaster; + struct control_ng_stats* cur; + + mutex_lock(&m->cngs_lock); + cur = g_hash_table_lookup(m->cngs_hash, addr); + if (!cur) { + cur = g_slice_alloc0(sizeof(struct control_ng_stats)); + cur->proxy = *addr; char buf[128]; memset(&buf,0,128); - smart_ntop_p(buf, &sin->sin6_addr, sizeof(buf)); - ilog(LOG_INFO,"Adding a first proxy for control ng stats:%s\n",buf); - return c->callmaster->control_ng_stats; - } - - struct control_ng_stats* cur = c->callmaster->control_ng_stats; - struct control_ng_stats* last; - - while (cur) { - last = cur; - if (memcmp((const void*)(&(cur->proxy.sin6_addr)),(const void*)(&(sin->sin6_addr)),sizeof(struct in6_addr))==0) { - ilog(LOG_DEBUG,"Already found proxy for control ng stats.\n"); - return cur; - } - cur = cur->next; + smart_ntop_p(buf, addr, sizeof(buf)); + ilog(LOG_DEBUG,"Adding a proxy for control ng stats:%s",buf); + g_hash_table_insert(m->cngs_hash, &cur->proxy, cur); } - - // add a new one - char buf[128]; memset(&buf,0,128); - smart_ntop_p(buf, &sin->sin6_addr, sizeof(buf)); - ilog(LOG_INFO,"Adding a new proxy for control ng stats:%s\n",buf); - - cur = malloc(sizeof(struct control_ng_stats)); - memset(cur,0,sizeof(struct control_ng_stats)); - memcpy(cur,sin,sizeof(struct sockaddr_in6)); - last->next = cur; - + mutex_unlock(&m->cngs_lock); return cur; } @@ -104,7 +84,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 * struct iovec iov[3]; GString *log_str; - struct control_ng_stats* cur = get_control_ng_stats(c,sin); + struct control_ng_stats* cur = get_control_ng_stats(c,&sin->sin6_addr); str_chr_str(&data, buf, ' '); if (!data.s || data.s == buf->s) { @@ -150,28 +130,28 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 * errstr = NULL; if (!str_cmp(&cmd, "ping")) { bencode_dictionary_add_string(resp, "result", "pong"); - cur->ping++; + g_atomic_int_inc(&cur->ping); } else if (!str_cmp(&cmd, "offer")) { errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin); - cur->offer++; + g_atomic_int_inc(&cur->offer); } else if (!str_cmp(&cmd, "answer")) { errstr = call_answer_ng(dict, c->callmaster, resp); - cur->answer++; + g_atomic_int_inc(&cur->answer); } else if (!str_cmp(&cmd, "delete")) { errstr = call_delete_ng(dict, c->callmaster, resp); - cur->delete++; + g_atomic_int_inc(&cur->delete); } else if (!str_cmp(&cmd, "query")) { errstr = call_query_ng(dict, c->callmaster, resp); - cur->query++; + g_atomic_int_inc(&cur->query); } #if GLIB_CHECK_VERSION(2,16,0) else if (!str_cmp(&cmd, "list")) { errstr = call_list_ng(dict, c->callmaster, resp); - cur->list++; + g_atomic_int_inc(&cur->list); } #endif else @@ -186,7 +166,7 @@ err_send: ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data)); bencode_dictionary_add_string(resp, "result", "error"); bencode_dictionary_add_string(resp, "error-reason", errstr); - cur->errors++; + g_atomic_int_inc(&cur->errors); goto send_resp; send_resp: diff --git a/daemon/control_ng.h b/daemon/control_ng.h index 9851ac48a..611ab51e0 100644 --- a/daemon/control_ng.h +++ b/daemon/control_ng.h @@ -10,15 +10,14 @@ struct poller; struct callmaster; struct control_ng_stats { - struct sockaddr_in6 proxy; - u_int32_t ping; - u_int32_t offer; - u_int32_t answer; - u_int32_t delete; - u_int32_t query; - u_int32_t list; - u_int32_t errors; - struct control_ng_stats* next; + struct in6_addr proxy; + int ping; + int offer; + int answer; + int delete; + int query; + int list; + int errors; }; struct control_ng { @@ -30,5 +29,4 @@ struct control_ng { struct control_ng *control_ng_new(struct poller *, struct in6_addr, u_int16_t, struct callmaster *); - #endif diff --git a/daemon/main.c b/daemon/main.c index b03328e07..6958cf3e1 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -620,8 +620,6 @@ no_kernel: if (redis_restore(ctx->m, mc.redis)) die("Refusing to continue without working Redis database"); - - ZERO(ctx->m->control_ng_stats); } static void timer_loop(void *d) { From c0b2f3debd5effb3883f2d283a260c5300c20cbb Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sun, 8 Feb 2015 15:34:01 -0500 Subject: [PATCH 09/12] fix graphite code not to leak fds in error cases --- daemon/graphite.c | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/daemon/graphite.c b/daemon/graphite.c index ad6f3a768..b3637d269 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -13,7 +13,7 @@ #include "log.h" #include "call.h" -int graphite_sock=0; +int graphite_sock=-1; u_int32_t graphite_ipaddress; int graphite_port=0; struct callmaster* cm=0; @@ -21,7 +21,7 @@ struct totalstats totalstats_prev; int connect_to_graphite_server(u_int32_t ipaddress, int port) { - graphite_sock=0; + graphite_sock=-1; //int reconnect=0; int rc=0; struct sockaddr_in sin; @@ -44,7 +44,7 @@ int connect_to_graphite_server(u_int32_t ipaddress, int port) { rc = setsockopt(graphite_sock,SOL_SOCKET,SO_REUSEADDR, &val,sizeof(val)); if(rc<0) { ilog(LOG_ERROR,"Couldn't set sockopt for graphite descriptor."); - return -1; + goto error; } struct in_addr ip; @@ -53,19 +53,24 @@ int connect_to_graphite_server(u_int32_t ipaddress, int port) { rc = connect(graphite_sock, (struct sockaddr *)&sin, sizeof(sin)); if (rc==-1) { ilog(LOG_ERROR, "Connection could not be established. Trying again next time of graphite-interval."); - return -1; + goto error; } ilog(LOG_INFO, "Graphite server connected."); return graphite_sock; + +error: + close(graphite_sock); + graphite_sock = -1; + return -1; } int send_graphite_data() { int rc=0; - if (!graphite_sock) { + if (graphite_sock < 0) { ilog(LOG_ERROR,"Graphite socket is not connected."); return -1; } @@ -75,7 +80,7 @@ int send_graphite_data() { rc = gethostname(hostname,256); if (rc<0) { ilog(LOG_ERROR, "Could not retrieve host name information."); - return -1; + goto error; } char data_to_send[8192]; memset(&data_to_send,0,8192); @@ -102,10 +107,13 @@ int send_graphite_data() { rc = write(graphite_sock, data_to_send, strlen(data_to_send)); if (rc<0) { ilog(LOG_ERROR,"Could not write to graphite socket. Disconnecting graphite server."); - close(graphite_sock); graphite_sock=0; - return -1; + goto error; } return 0; + +error: + close(graphite_sock); graphite_sock=-1; + return -1; } void graphite_loop_run(struct callmaster* callmaster, int seconds) { @@ -115,7 +123,7 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { if (!cm) cm = callmaster; - if (!graphite_sock) { + if (graphite_sock < 0) { rc = connect_to_graphite_server(graphite_ipaddress, graphite_port); } @@ -123,7 +131,6 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { rc = send_graphite_data(); if (rc<0) { ilog(LOG_ERROR,"Sending graphite data failed."); - graphite_sock=0; } } From 965d989c93763e20ea85c12b074456808796eb49 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sun, 8 Feb 2015 15:35:15 -0500 Subject: [PATCH 10/12] static'ize graphite global vars --- daemon/graphite.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/daemon/graphite.c b/daemon/graphite.c index b3637d269..5ef134387 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -13,11 +13,11 @@ #include "log.h" #include "call.h" -int graphite_sock=-1; -u_int32_t graphite_ipaddress; -int graphite_port=0; -struct callmaster* cm=0; -struct totalstats totalstats_prev; +static int graphite_sock=-1; +static u_int32_t graphite_ipaddress; +static int graphite_port=0; +static struct callmaster* cm=0; +//struct totalstats totalstats_prev; int connect_to_graphite_server(u_int32_t ipaddress, int port) { From 7175a261ca86fee7143a2a2766923117d5c64e5d Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sun, 8 Feb 2015 15:45:16 -0500 Subject: [PATCH 11/12] decrease sleep time of graphite loop --- daemon/graphite.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/daemon/graphite.c b/daemon/graphite.c index 5ef134387..38a9278e1 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -18,6 +18,7 @@ static u_int32_t graphite_ipaddress; static int graphite_port=0; static struct callmaster* cm=0; //struct totalstats totalstats_prev; +static time_t g_now, next_run; int connect_to_graphite_server(u_int32_t ipaddress, int port) { @@ -120,6 +121,12 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { int rc=0; + g_now = time(NULL); + if (g_now < next_run) + goto sleep; + + next_run = g_now + seconds; + if (!cm) cm = callmaster; @@ -134,5 +141,6 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { } } - sleep(seconds); +sleep: + usleep(100000); } From 38822852c348f92e3032730d4c5c72b75b18cf7e Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Sun, 8 Feb 2015 15:46:02 -0500 Subject: [PATCH 12/12] decrease calls to time() in graphite code --- daemon/graphite.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/daemon/graphite.c b/daemon/graphite.c index 38a9278e1..f886584af 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -89,17 +89,17 @@ int send_graphite_data() { mutex_lock(&cm->totalstats_lock); - rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.forced_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_forced_term_sess,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.managed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_managed_sess,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.oneway_stream_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_oneway_stream_sess,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.regular_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_regular_term_sess,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.relayed_errors %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_errors,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.relayed_packets %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_packets,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.silent_timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_silent_timeout_sess,(unsigned long long)time(NULL)); ptr += rc; - rc = sprintf(ptr,"%s.totals.timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_timeout_sess,(unsigned long long)time(NULL)); ptr += rc; + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.forced_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_forced_term_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.managed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_managed_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.oneway_stream_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_oneway_stream_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.regular_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_regular_term_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_errors %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_errors,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_packets %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_packets,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.silent_timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_silent_timeout_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_timeout_sess,(unsigned long long)g_now); ptr += rc; ZERO(cm->totalstats_interval);