diff --git a/README.md b/README.md index 7931add40..9727cef63 100644 --- a/README.md +++ b/README.md @@ -184,6 +184,8 @@ option and which are reproduced below: -d, --delete-delay Delay for deleting a session from memory. --sip-source Use SIP source address by default --dtls-passive Always prefer DTLS passive role + -g, --graphite=[IP46:]PORT TCP address of graphite statistics server + -w, --graphite-interval=INT Graphite data statistics send interval Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local IP address through `--interface`, and at least one of the `--listen-...` options must be given. @@ -354,6 +356,14 @@ The options are described in more detail below. NGCP-specific options +* -g, --graphite + + Address of the graphite statistics server. + +* -w, --graphite-interval + + Interval of the time when information is sent to the graphite server. + A typical command line (enabling both UDP and NG protocols) thus may look like: /usr/sbin/rtpengine --table=0 --interface=10.64.73.31 --interface=2001:db8::4f3:3d \ diff --git a/daemon/Makefile b/daemon/Makefile index 001dd5a47..24d0637b6 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -63,7 +63,7 @@ endif SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ - crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c + crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c OBJS= $(SRCS:.c=.o) diff --git a/daemon/aux.c b/daemon/aux.c index c09d0c918..8856f7e76 100644 --- a/daemon/aux.c +++ b/daemon/aux.c @@ -192,3 +192,13 @@ void thread_create_detach(void (*f)(void *), void *d) { if (thread_create(thread_detach_func, dt, 1, NULL)) abort(); } + +unsigned int in6_addr_hash(const void *p) { + const struct in6_addr *a = p; + return a->s6_addr32[0] ^ a->s6_addr32[3]; +} + +int in6_addr_eq(const void *a, const void *b) { + const struct in6_addr *A = a, *B = b; + return !memcmp(A, B, sizeof(*A)); +} diff --git a/daemon/aux.h b/daemon/aux.h index 0c6b2b2c3..21bb0498b 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -144,7 +144,7 @@ INLINE u_int32_t in6_to_4(const struct in6_addr *a) { return a->s6_addr32[3]; } -INLINE void smart_ntop(char *o, struct in6_addr *a, size_t len) { +INLINE void smart_ntop(char *o, const struct in6_addr *a, size_t len) { const char *r; if (IN6_IS_ADDR_V4MAPPED(a)) @@ -156,7 +156,7 @@ INLINE void smart_ntop(char *o, struct in6_addr *a, size_t len) { *o = '\0'; } -INLINE char *smart_ntop_p(char *o, struct in6_addr *a, size_t len) { +INLINE char *smart_ntop_p(char *o, const struct in6_addr *a, size_t len) { int l; if (IN6_IS_ADDR_V4MAPPED(a)) { @@ -178,7 +178,7 @@ INLINE char *smart_ntop_p(char *o, struct in6_addr *a, size_t len) { } } -INLINE void smart_ntop_port(char *o, struct sockaddr_in6 *a, size_t len) { +INLINE void smart_ntop_port(char *o, const struct sockaddr_in6 *a, size_t len) { char *e; e = smart_ntop_p(o, &a->sin6_addr, len); @@ -483,4 +483,9 @@ INLINE void g_queue_append(GQueue *dst, const GQueue *src) { g_queue_push_tail(dst, l->data); } + + +unsigned int in6_addr_hash(const void *p); +int in6_addr_eq(const void *a, const void *b); + #endif diff --git a/daemon/call.c b/daemon/call.c index 66ad7c393..8c9beb260 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1423,6 +1423,12 @@ struct callmaster *callmaster_new(struct poller *p) { poller_add_timer(p, callmaster_timer, &c->obj); + mutex_init(&c->totalstats_lock); + c->totalstats.started = poller_now; + + mutex_init(&c->cngs_lock); + c->cngs_hash = g_hash_table_new(in6_addr_hash, in6_addr_eq); + return c; fail: @@ -2438,6 +2444,8 @@ void call_destroy(struct call *c) { /* CDRs and statistics */ cdrbufcur += sprintf(cdrbufcur,"ci=%s, ",c->callid.s); cdrbufcur += sprintf(cdrbufcur,"created_from=%s, ", c->created_from); + cdrbufcur += sprintf(cdrbufcur,"last_signal=%llu, ", (unsigned long long)c->last_signal); + cdrbufcur += sprintf(cdrbufcur,"tos=%u, ", (unsigned int)c->tos); for (l = c->monologues; l; l = l->next) { ml = l->data; if (_log_facility_cdr) { @@ -2486,26 +2494,34 @@ void call_destroy(struct call *c) { "ml%i_midx%u_%s_local_relay_port=%u, " "ml%i_midx%u_%s_relayed_packets=%llu, " "ml%i_midx%u_%s_relayed_bytes=%llu, " - "ml%i_midx%u_%s_relayed_errors=%llu, ", + "ml%i_midx%u_%s_relayed_errors=%llu, " + "ml%i_midx%u_%s_last_packet=%llu, ", cdrlinecnt, md->index, protocol, buf, cdrlinecnt, md->index, protocol, ps->endpoint.port, cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets, cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes, - cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors); + cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors, + cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet); } ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, " - "%llu p, %llu b, %llu e", + "%llu p, %llu b, %llu e, %llu last_packet", md->index, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), buf, ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", (unsigned long long) ps->stats.packets, (unsigned long long) ps->stats.bytes, - (unsigned long long) ps->stats.errors); + (unsigned long long) ps->stats.errors, + (unsigned long long) ps->last_packet); + + mutex_lock(&m->totalstats_lock); m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets; + m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets; m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors; + m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors; + mutex_unlock(&m->totalstats_lock); } } if (_log_facility_cdr) @@ -2513,7 +2529,11 @@ void call_destroy(struct call *c) { } // --- for statistics getting one way stream or no relay at all + mutex_lock(&m->totalstats_lock); m->totalstats.total_nopacket_relayed_sess *= 2; + m->totalstats_interval.total_nopacket_relayed_sess *= 2; + mutex_unlock(&m->totalstats_lock); + for (l = c->monologues; l; l = l->next) { ml = l->data; @@ -2550,32 +2570,53 @@ void call_destroy(struct call *c) { } } - if (ps && ps2 && ps->stats.packets!=0 && ps2->stats.packets==0) - m->totalstats.total_oneway_stream_sess++; + if (ps && ps2 && ps2->stats.packets==0) { + mutex_lock(&m->totalstats_lock); + if (ps->stats.packets!=0) { + m->totalstats.total_oneway_stream_sess++; + m->totalstats_interval.total_oneway_stream_sess++; + } + else { + m->totalstats.total_nopacket_relayed_sess++; + m->totalstats_interval.total_nopacket_relayed_sess++; + } + mutex_unlock(&m->totalstats_lock); + } + } - if (ps && ps2 && ps->stats.packets==0 && ps2->stats.packets==0) - m->totalstats.total_nopacket_relayed_sess++; + mutex_lock(&m->totalstats_lock); - } m->totalstats.total_nopacket_relayed_sess /= 2; + m->totalstats_interval.total_nopacket_relayed_sess /= 2; m->totalstats.total_managed_sess += 1; + m->totalstats_interval.total_managed_sess += 1; ml = c->monologues->data; if (ml->term_reason==TIMEOUT) { m->totalstats.total_timeout_sess++; + m->totalstats_interval.total_timeout_sess++; } else if (ml->term_reason==SILENT_TIMEOUT) { m->totalstats.total_silent_timeout_sess++; + m->totalstats_interval.total_silent_timeout_sess++; } else if (ml->term_reason==REGULAR) { m->totalstats.total_regular_term_sess++; + m->totalstats_interval.total_regular_term_sess++; } else if (ml->term_reason==FORCED) { m->totalstats.total_forced_term_sess++; + m->totalstats_interval.total_forced_term_sess++; } timeval_multiply(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess-1); timeval_add(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,&tim_result_duration); timeval_devide(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess); + timeval_multiply(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess-1); + timeval_add(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,&tim_result_duration); + timeval_devide(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess); + + mutex_unlock(&m->totalstats_lock); + if (_log_facility_cdr) /* log it */ cdrlog(cdrbuffer); diff --git a/daemon/call.h b/daemon/call.h index bcf741fd9..8be512487 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -11,7 +11,7 @@ #include #include #include "compat.h" - +#include "control_ng.h" enum termination_reason { UNKNOWN=0, @@ -410,7 +410,12 @@ struct callmaster { struct stats statsps; /* per second stats, running timer */ mutex_t statslock; struct stats stats; /* copied from statsps once a second */ + mutex_t totalstats_lock; /* for both of them */ struct totalstats totalstats; + struct totalstats totalstats_interval; + /* control_ng_stats stuff */ + mutex_t cngs_lock; + GHashTable *cngs_hash; struct poller *poller; pcre *info_re; diff --git a/daemon/cli.c b/daemon/cli.c index 162d6274d..dfa0c6744 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -25,6 +25,9 @@ static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { int printlen=0; + + mutex_lock(&m->totalstats_lock); + printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nTotal statistics (does not include current running sessions):\n\n"); ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Uptime of rtpengine :%llu seconds\n", (unsigned long long)time(NULL)-m->totalstats.started); @@ -49,6 +52,42 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec); ADJUSTLEN(printlen,outbufend,replybuffer); + + mutex_unlock(&m->totalstats_lock); + + printlen = snprintf(replybuffer,(outbufend-replybuffer), "Control statistics:\n\n"); + ADJUSTLEN(printlen,outbufend,replybuffer); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s \n", + "Proxy", "Offer", "Answer", "Delete", "Ping", "List", "Query", "Errors"); + ADJUSTLEN(printlen,outbufend,replybuffer); + + mutex_lock(&m->cngs_lock); + GList *list = g_hash_table_get_values(m->cngs_hash); + + if (!list) { + printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n No proxies have yet tried to send data."); + ADJUSTLEN(printlen,outbufend,replybuffer); + } + for (GList *l = list; l; l = l->next) { + struct control_ng_stats* cur = l->data; + char buf[128]; memset(&buf,0,128); + smart_ntop_p(buf, &(cur->proxy), sizeof(buf)); + + printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10u | %10u | %10u | %10u | %10u | %10u | %10u \n", + buf, + cur->offer, + cur->answer, + cur->delete, + cur->ping, + cur->list, + cur->query, + cur->errors); + ADJUSTLEN(printlen,outbufend,replybuffer); + } + printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n\n"); + ADJUSTLEN(printlen,outbufend,replybuffer); + mutex_unlock(&m->cngs_lock); + g_list_free(list); } static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { @@ -81,7 +120,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m return; } - printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %30s | deletionmark:%4s | created:%12i | proxy:%s\n\n", c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from); + printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %30s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu\n\n", + c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal); ADJUSTLEN(printlen,outbufend,replybuffer); for (l = c->monologues; l; l = l->next) { @@ -113,14 +153,15 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m smart_ntop_p(buf, &ps->endpoint.ip46, sizeof(buf)); printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " - "%llu p, %llu b, %llu e\n", + "%llu p, %llu b, %llu e, %llu last_packet\n", md->index, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), buf, ps->endpoint.port, (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", (unsigned long long) ps->stats.packets, (unsigned long long) ps->stats.bytes, - (unsigned long long) ps->stats.errors); + (unsigned long long) ps->stats.errors, + (unsigned long long) ps->last_packet); ADJUSTLEN(printlen,outbufend,replybuffer); } } diff --git a/daemon/control_ng.c b/daemon/control_ng.c index d568520b0..911479bea 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -56,6 +56,24 @@ static void pretty_print(bencode_item_t *el, GString *s) { } } +struct control_ng_stats* get_control_ng_stats(struct control_ng* c, const struct in6_addr *addr) { + struct callmaster *m = c->callmaster; + struct control_ng_stats* cur; + + mutex_lock(&m->cngs_lock); + cur = g_hash_table_lookup(m->cngs_hash, addr); + if (!cur) { + cur = g_slice_alloc0(sizeof(struct control_ng_stats)); + cur->proxy = *addr; + char buf[128]; memset(&buf,0,128); + smart_ntop_p(buf, addr, sizeof(buf)); + ilog(LOG_DEBUG,"Adding a proxy for control ng stats:%s",buf); + g_hash_table_insert(m->cngs_hash, &cur->proxy, cur); + } + mutex_unlock(&m->cngs_lock); + return cur; +} + static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *sin, char *addr) { struct control_ng *c = (void *) obj; bencode_buffer_t bencbuf; @@ -66,6 +84,8 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 * struct iovec iov[3]; GString *log_str; + struct control_ng_stats* cur = get_control_ng_stats(c,&sin->sin6_addr); + str_chr_str(&data, buf, ' '); if (!data.s || data.s == buf->s) { ilog(LOG_WARNING, "Received invalid data on NG port (no cookie) from %s: "STR_FORMAT, addr, STR_FMT(buf)); @@ -108,19 +128,31 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 * g_string_free(log_str, TRUE); errstr = NULL; - if (!str_cmp(&cmd, "ping")) + if (!str_cmp(&cmd, "ping")) { bencode_dictionary_add_string(resp, "result", "pong"); - else if (!str_cmp(&cmd, "offer")) + g_atomic_int_inc(&cur->ping); + } + else if (!str_cmp(&cmd, "offer")) { errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin); - else if (!str_cmp(&cmd, "answer")) + g_atomic_int_inc(&cur->offer); + } + else if (!str_cmp(&cmd, "answer")) { errstr = call_answer_ng(dict, c->callmaster, resp); - else if (!str_cmp(&cmd, "delete")) + g_atomic_int_inc(&cur->answer); + } + else if (!str_cmp(&cmd, "delete")) { errstr = call_delete_ng(dict, c->callmaster, resp); - else if (!str_cmp(&cmd, "query")) + g_atomic_int_inc(&cur->delete); + } + else if (!str_cmp(&cmd, "query")) { errstr = call_query_ng(dict, c->callmaster, resp); + g_atomic_int_inc(&cur->query); + } #if GLIB_CHECK_VERSION(2,16,0) - else if (!str_cmp(&cmd, "list")) + else if (!str_cmp(&cmd, "list")) { errstr = call_list_ng(dict, c->callmaster, resp); + g_atomic_int_inc(&cur->list); + } #endif else errstr = "Unrecognized command"; @@ -134,6 +166,7 @@ err_send: ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data)); bencode_dictionary_add_string(resp, "result", "error"); bencode_dictionary_add_string(resp, "error-reason", errstr); + g_atomic_int_inc(&cur->errors); goto send_resp; send_resp: diff --git a/daemon/control_ng.h b/daemon/control_ng.h index e332f64b2..611ab51e0 100644 --- a/daemon/control_ng.h +++ b/daemon/control_ng.h @@ -9,6 +9,17 @@ struct poller; struct callmaster; +struct control_ng_stats { + struct in6_addr proxy; + int ping; + int offer; + int answer; + int delete; + int query; + int list; + int errors; +}; + struct control_ng { struct obj obj; struct callmaster *callmaster; @@ -18,5 +29,4 @@ struct control_ng { struct control_ng *control_ng_new(struct poller *, struct in6_addr, u_int16_t, struct callmaster *); - #endif diff --git a/daemon/graphite.c b/daemon/graphite.c new file mode 100644 index 000000000..f886584af --- /dev/null +++ b/daemon/graphite.c @@ -0,0 +1,146 @@ +/* + * graphite.c + * + * Created on: Jan 19, 2015 + * Author: fmetz + */ +#include +#include +#include +#include +#include + +#include "log.h" +#include "call.h" + +static int graphite_sock=-1; +static u_int32_t graphite_ipaddress; +static int graphite_port=0; +static struct callmaster* cm=0; +//struct totalstats totalstats_prev; +static time_t g_now, next_run; + +int connect_to_graphite_server(u_int32_t ipaddress, int port) { + + graphite_sock=-1; + //int reconnect=0; + int rc=0; + struct sockaddr_in sin; + memset(&sin,0,sizeof(sin)); + int val=1; + + graphite_ipaddress = ipaddress; + graphite_port = port; + + rc = graphite_sock = socket(AF_INET, SOCK_STREAM,0); + if(rc<0) { + ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite."); + return -1; + } + + sin.sin_family=AF_INET; + sin.sin_addr.s_addr=graphite_ipaddress; + sin.sin_port=htons(graphite_port); + + rc = setsockopt(graphite_sock,SOL_SOCKET,SO_REUSEADDR, &val,sizeof(val)); + if(rc<0) { + ilog(LOG_ERROR,"Couldn't set sockopt for graphite descriptor."); + goto error; + } + + struct in_addr ip; + ip.s_addr = graphite_ipaddress; + ilog(LOG_INFO, "Connecting to graphite server %s at port:%i with fd:%i",inet_ntoa(ip),graphite_port,graphite_sock); + rc = connect(graphite_sock, (struct sockaddr *)&sin, sizeof(sin)); + if (rc==-1) { + ilog(LOG_ERROR, "Connection could not be established. Trying again next time of graphite-interval."); + goto error; + } + + ilog(LOG_INFO, "Graphite server connected."); + + return graphite_sock; + +error: + close(graphite_sock); + graphite_sock = -1; + return -1; +} + +int send_graphite_data() { + + int rc=0; + + if (graphite_sock < 0) { + ilog(LOG_ERROR,"Graphite socket is not connected."); + return -1; + } + + // format hostname "." totals.subkey SPACE value SPACE timestamp + char hostname[256]; memset(&hostname,0,256); + rc = gethostname(hostname,256); + if (rc<0) { + ilog(LOG_ERROR, "Could not retrieve host name information."); + goto error; + } + + char data_to_send[8192]; memset(&data_to_send,0,8192); + char* ptr = data_to_send; + + mutex_lock(&cm->totalstats_lock); + + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.forced_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_forced_term_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.managed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_managed_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.oneway_stream_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_oneway_stream_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.regular_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_regular_term_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_errors %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_errors,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.relayed_packets %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_packets,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.silent_timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_silent_timeout_sess,(unsigned long long)g_now); ptr += rc; + rc = sprintf(ptr,"%s.totals.timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_timeout_sess,(unsigned long long)g_now); ptr += rc; + + ZERO(cm->totalstats_interval); + + mutex_unlock(&cm->totalstats_lock); + + rc = write(graphite_sock, data_to_send, strlen(data_to_send)); + if (rc<0) { + ilog(LOG_ERROR,"Could not write to graphite socket. Disconnecting graphite server."); + goto error; + } + return 0; + +error: + close(graphite_sock); graphite_sock=-1; + return -1; +} + +void graphite_loop_run(struct callmaster* callmaster, int seconds) { + + int rc=0; + + g_now = time(NULL); + if (g_now < next_run) + goto sleep; + + next_run = g_now + seconds; + + if (!cm) + cm = callmaster; + + if (graphite_sock < 0) { + rc = connect_to_graphite_server(graphite_ipaddress, graphite_port); + } + + if (rc>=0) { + rc = send_graphite_data(); + if (rc<0) { + ilog(LOG_ERROR,"Sending graphite data failed."); + } + } + +sleep: + usleep(100000); +} diff --git a/daemon/graphite.h b/daemon/graphite.h new file mode 100644 index 000000000..0b51e7b9a --- /dev/null +++ b/daemon/graphite.h @@ -0,0 +1,17 @@ +/* + * graphite.h + * + * Created on: Jan 19, 2015 + * Author: fmetz + */ + +#ifndef GRAPHITE_H_ +#define GRAPHITE_H_ + +#include "call.h" + +int connect_to_graphite_server(u_int32_t ipaddress, int port); +int send_graphite_data(); +void graphite_loop_run(struct callmaster* cm, int seconds); + +#endif /* GRAPHITE_H_ */ diff --git a/daemon/main.c b/daemon/main.c index 921b8646c..50da00044 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -26,7 +26,7 @@ #include "dtls.h" #include "call_interfaces.h" #include "cli.h" - +#include "graphite.h" @@ -108,7 +108,9 @@ static char *b2b_url; static enum xmlrpc_format xmlrpc_fmt = XF_SEMS; static int num_threads; static int delete_delay = 30; - +static u_int32_t graphite_ip = 0; +static u_int16_t graphite_port; +static int graphite_interval = 0; static void sighandler(gpointer x) { sigset_t ss; @@ -254,6 +256,7 @@ static void options(int *argc, char ***argv) { char *listenudps = NULL; char *listenngs = NULL; char *listencli = NULL; + char *graphitep = NULL; char *redisps = NULL; char *log_facility_s = NULL; char *log_facility_cdr_s = NULL; @@ -269,6 +272,8 @@ static void options(int *argc, char ***argv) { { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" }, + { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" }, + { "graphite-interval", 'w', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" }, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, @@ -331,6 +336,10 @@ static void options(int *argc, char ***argv) { die("Invalid IP or port (--listen-cli)"); } + if (graphitep) {if (parse_ip_port(&graphite_ip, &graphite_port, graphitep)) + die("Invalid IP or port (--graphite)"); + } + if (tos < 0 || tos > 255) die("Invalid TOS value"); @@ -605,9 +614,6 @@ no_kernel: ctx->m->conf = mc; callmaster_config_init(ctx->m); - ZERO(ctx->m->totalstats); - ctx->m->totalstats.started = time(NULL); - if (!foreground) daemonize(); wpidfile(); @@ -623,6 +629,20 @@ static void timer_loop(void *d) { poller_timers_wait_run(p, 100); } +static void graphite_loop(void *d) { + struct callmaster *cm = d; + + if (!graphite_interval) { + ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second."); + graphite_interval=1; + } + + connect_to_graphite_server(graphite_ip,graphite_port); + + while (!global_shutdown) + graphite_loop_run(cm,graphite_interval); // time in seconds +} + static void poller_loop(void *d) { struct poller *p = d; @@ -642,6 +662,8 @@ int main(int argc, char **argv) { thread_create_detach(sighandler, NULL); thread_create_detach(timer_loop, ctx.p); + if (graphite_ip) + thread_create_detach(graphite_loop, ctx.m); if (num_threads < 1) { #ifdef _SC_NPROCESSORS_ONLN diff --git a/debian/ngcp-rtpengine-daemon.default b/debian/ngcp-rtpengine-daemon.default index 20b659cb5..20b10c3d5 100644 --- a/debian/ngcp-rtpengine-daemon.default +++ b/debian/ngcp-rtpengine-daemon.default @@ -22,4 +22,6 @@ TABLE=0 # LOG_FACILITY=daemon # LOG_FACILITY_CDR=daemon # NUM_THREADS=5 -# DELETE_DELAY=30 \ No newline at end of file +# DELETE_DELAY=30 +# GRAPHITE=9006 +# GRAPHITE_INTERVAL=60 \ No newline at end of file diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index 651e17f6e..1c7e96f5c 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -72,6 +72,8 @@ OPTIONS="$OPTIONS --table=$TABLE" [ -z "$LOG_FACILITY_CDR" ] || OPTIONS="$OPTIONS --log-facility-cdr=$LOG_FACILITY_CDR" [ -z "$NUM_THREADS" ] || OPTIONS="$OPTIONS --num-threads=$NUM_THREADS" [ -z "$DELETE_DELAY" ] || OPTIONS="$OPTIONS --delete-delay=$DELETE_DELAY" +[ -z "$GRAPHITE" ] || OPTIONS="$OPTIONS --graphite=$GRAPHITE" +[ -z "$GRAPHITE_INTERVAL" ] || OPTIONS="$OPTIONS --graphite-interval=$GRAPHITE_INTERVAL" if test "$FORK" = "no" ; then OPTIONS="$OPTIONS --foreground" fi diff --git a/el/rtpengine.init b/el/rtpengine.init index f515d5341..1fb33ce62 100644 --- a/el/rtpengine.init +++ b/el/rtpengine.init @@ -143,6 +143,16 @@ build_opts() { OPTS+=" --delete-delay=$DELETE_DELAY" fi + if [[ -n "$GRAPHITE" ]] + then + OPTS+=" --graphite=$GRAPHITE" + fi + + if [[ -n "$GRAPHITE_INTERVAL" ]] + then + OPTS+=" --graphite-interval=$GRAPHITE_INTERVAL" + fi + if [[ -n "$LOG_FACILITY_CDR" ]] then OPTS+=" --log-facility-cdr=$LOG_FACILITY_CDR"