Merge branch 'dev-fmetz'

pull/69/head
Frederic-Philippe Metz 10 years ago
commit 7e6a2b809a

@ -184,6 +184,8 @@ option and which are reproduced below:
-d, --delete-delay Delay for deleting a session from memory. -d, --delete-delay Delay for deleting a session from memory.
--sip-source Use SIP source address by default --sip-source Use SIP source address by default
--dtls-passive Always prefer DTLS passive role --dtls-passive Always prefer DTLS passive role
-g, --graphite=[IP46:]PORT TCP address of graphite statistics server
-w, --graphite-interval=INT Graphite data statistics send interval
Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local
IP address through `--interface`, and at least one of the `--listen-...` options must be given. IP address through `--interface`, and at least one of the `--listen-...` options must be given.
@ -354,6 +356,14 @@ The options are described in more detail below.
NGCP-specific options NGCP-specific options
* -g, --graphite
Address of the graphite statistics server.
* -w, --graphite-interval
Interval of the time when information is sent to the graphite server.
A typical command line (enabling both UDP and NG protocols) thus may look like: A typical command line (enabling both UDP and NG protocols) thus may look like:
/usr/sbin/rtpengine --table=0 --interface=10.64.73.31 --interface=2001:db8::4f3:3d \ /usr/sbin/rtpengine --table=0 --interface=10.64.73.31 --interface=2001:db8::4f3:3d \

@ -63,7 +63,7 @@ endif
SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c
OBJS= $(SRCS:.c=.o) OBJS= $(SRCS:.c=.o)

@ -2438,6 +2438,8 @@ void call_destroy(struct call *c) {
/* CDRs and statistics */ /* CDRs and statistics */
cdrbufcur += sprintf(cdrbufcur,"ci=%s, ",c->callid.s); cdrbufcur += sprintf(cdrbufcur,"ci=%s, ",c->callid.s);
cdrbufcur += sprintf(cdrbufcur,"created_from=%s, ", c->created_from); cdrbufcur += sprintf(cdrbufcur,"created_from=%s, ", c->created_from);
cdrbufcur += sprintf(cdrbufcur,"last_signal=%llu, ", (unsigned long long)c->last_signal);
cdrbufcur += sprintf(cdrbufcur,"tos=%u, ", (unsigned int)c->tos);
for (l = c->monologues; l; l = l->next) { for (l = c->monologues; l; l = l->next) {
ml = l->data; ml = l->data;
if (_log_facility_cdr) { if (_log_facility_cdr) {
@ -2486,26 +2488,31 @@ void call_destroy(struct call *c) {
"ml%i_midx%u_%s_local_relay_port=%u, " "ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets=%llu, " "ml%i_midx%u_%s_relayed_packets=%llu, "
"ml%i_midx%u_%s_relayed_bytes=%llu, " "ml%i_midx%u_%s_relayed_bytes=%llu, "
"ml%i_midx%u_%s_relayed_errors=%llu, ", "ml%i_midx%u_%s_relayed_errors=%llu, "
"ml%i_midx%u_%s_last_packet=%llu, ",
cdrlinecnt, md->index, protocol, buf, cdrlinecnt, md->index, protocol, buf,
cdrlinecnt, md->index, protocol, ps->endpoint.port, cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets, cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes, cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors); cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet);
} }
ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, " ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e", "%llu p, %llu b, %llu e, %llu last_packet",
md->index, md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
buf, ps->endpoint.port, buf, ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets, (unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes, (unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors); (unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets; m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets;
m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets;
m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors; m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors;
m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors;
} }
} }
if (_log_facility_cdr) if (_log_facility_cdr)
@ -2514,6 +2521,7 @@ void call_destroy(struct call *c) {
// --- for statistics getting one way stream or no relay at all // --- for statistics getting one way stream or no relay at all
m->totalstats.total_nopacket_relayed_sess *= 2; m->totalstats.total_nopacket_relayed_sess *= 2;
m->totalstats_interval.total_nopacket_relayed_sess *= 2;
for (l = c->monologues; l; l = l->next) { for (l = c->monologues; l; l = l->next) {
ml = l->data; ml = l->data;
@ -2550,32 +2558,46 @@ void call_destroy(struct call *c) {
} }
} }
if (ps && ps2 && ps->stats.packets!=0 && ps2->stats.packets==0) if (ps && ps2 && ps->stats.packets!=0 && ps2->stats.packets==0) {
m->totalstats.total_oneway_stream_sess++; m->totalstats.total_oneway_stream_sess++;
m->totalstats_interval.total_oneway_stream_sess++;
}
if (ps && ps2 && ps->stats.packets==0 && ps2->stats.packets==0) if (ps && ps2 && ps->stats.packets==0 && ps2->stats.packets==0) {
m->totalstats.total_nopacket_relayed_sess++; m->totalstats.total_nopacket_relayed_sess++;
m->totalstats_interval.total_nopacket_relayed_sess++;
}
} }
m->totalstats.total_nopacket_relayed_sess /= 2; m->totalstats.total_nopacket_relayed_sess /= 2;
m->totalstats_interval.total_nopacket_relayed_sess /= 2;
m->totalstats.total_managed_sess += 1; m->totalstats.total_managed_sess += 1;
m->totalstats_interval.total_managed_sess += 1;
ml = c->monologues->data; ml = c->monologues->data;
if (ml->term_reason==TIMEOUT) { if (ml->term_reason==TIMEOUT) {
m->totalstats.total_timeout_sess++; m->totalstats.total_timeout_sess++;
m->totalstats_interval.total_timeout_sess++;
} else if (ml->term_reason==SILENT_TIMEOUT) { } else if (ml->term_reason==SILENT_TIMEOUT) {
m->totalstats.total_silent_timeout_sess++; m->totalstats.total_silent_timeout_sess++;
m->totalstats_interval.total_silent_timeout_sess++;
} else if (ml->term_reason==REGULAR) { } else if (ml->term_reason==REGULAR) {
m->totalstats.total_regular_term_sess++; m->totalstats.total_regular_term_sess++;
m->totalstats_interval.total_regular_term_sess++;
} else if (ml->term_reason==FORCED) { } else if (ml->term_reason==FORCED) {
m->totalstats.total_forced_term_sess++; m->totalstats.total_forced_term_sess++;
m->totalstats_interval.total_forced_term_sess++;
} }
timeval_multiply(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess-1); timeval_multiply(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess-1);
timeval_add(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,&tim_result_duration); timeval_add(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,&tim_result_duration);
timeval_devide(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess); timeval_devide(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess);
timeval_multiply(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess-1);
timeval_add(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,&tim_result_duration);
timeval_devide(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess);
if (_log_facility_cdr) if (_log_facility_cdr)
/* log it */ /* log it */
cdrlog(cdrbuffer); cdrlog(cdrbuffer);

@ -11,7 +11,7 @@
#include <pcre.h> #include <pcre.h>
#include <openssl/x509.h> #include <openssl/x509.h>
#include "compat.h" #include "compat.h"
#include "control_ng.h"
enum termination_reason { enum termination_reason {
UNKNOWN=0, UNKNOWN=0,
@ -411,6 +411,8 @@ struct callmaster {
mutex_t statslock; mutex_t statslock;
struct stats stats; /* copied from statsps once a second */ struct stats stats; /* copied from statsps once a second */
struct totalstats totalstats; struct totalstats totalstats;
struct totalstats totalstats_interval;
struct control_ng_stats* control_ng_stats;
struct poller *poller; struct poller *poller;
pcre *info_re; pcre *info_re;

@ -49,6 +49,35 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec);
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), "Control statistics:\n\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s \n",
"Proxy", "Offer", "Answer", "Delete", "Ping", "List", "Query", "Errors");
ADJUSTLEN(printlen,outbufend,replybuffer);
struct control_ng_stats* cur = m->control_ng_stats;
if (!cur) {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n No proxies have yet tried to send data.");
ADJUSTLEN(printlen,outbufend,replybuffer);
}
while (cur) {
char buf[128]; memset(&buf,0,128);
smart_ntop_p(buf, &(cur->proxy.sin6_addr), sizeof(buf));
printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10u | %10u | %10u | %10u | %10u | %10u | %10u \n",
buf,
cur->offer,
cur->answer,
cur->delete,
cur->ping,
cur->list,
cur->query,
cur->errors);
ADJUSTLEN(printlen,outbufend,replybuffer);
cur = cur->next;
}
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
} }
static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
@ -81,7 +110,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
return; return;
} }
printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %30s | deletionmark:%4s | created:%12i | proxy:%s\n\n", c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from); printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %30s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu\n\n",
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal);
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
for (l = c->monologues; l; l = l->next) { for (l = c->monologues; l; l = l->next) {
@ -113,14 +143,15 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
smart_ntop_p(buf, &ps->endpoint.ip46, sizeof(buf)); smart_ntop_p(buf, &ps->endpoint.ip46, sizeof(buf));
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e\n", "%llu p, %llu b, %llu e, %llu last_packet\n",
md->index, md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
buf, ps->endpoint.port, buf, ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets, (unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes, (unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors); (unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
} }
} }

@ -56,6 +56,44 @@ static void pretty_print(bencode_item_t *el, GString *s) {
} }
} }
struct control_ng_stats* get_control_ng_stats(struct control_ng* c, struct sockaddr_in6* sin) {
// seems to be the first address
if (!c->callmaster->control_ng_stats) {
c->callmaster->control_ng_stats = malloc(sizeof(struct control_ng_stats));
memset(c->callmaster->control_ng_stats,0,sizeof(struct control_ng_stats));
memcpy(&(c->callmaster->control_ng_stats->proxy),sin,sizeof(struct sockaddr_in6));
char buf[128]; memset(&buf,0,128);
smart_ntop_p(buf, &sin->sin6_addr, sizeof(buf));
ilog(LOG_INFO,"Adding a first proxy for control ng stats:%s\n",buf);
return c->callmaster->control_ng_stats;
}
struct control_ng_stats* cur = c->callmaster->control_ng_stats;
struct control_ng_stats* last;
while (cur) {
last = cur;
if (memcmp((const void*)(&(cur->proxy.sin6_addr)),(const void*)(&(sin->sin6_addr)),sizeof(struct in6_addr))==0) {
ilog(LOG_DEBUG,"Already found proxy for control ng stats.\n");
return cur;
}
cur = cur->next;
}
// add a new one
char buf[128]; memset(&buf,0,128);
smart_ntop_p(buf, &sin->sin6_addr, sizeof(buf));
ilog(LOG_INFO,"Adding a new proxy for control ng stats:%s\n",buf);
cur = malloc(sizeof(struct control_ng_stats));
memset(cur,0,sizeof(struct control_ng_stats));
memcpy(cur,sin,sizeof(struct sockaddr_in6));
last->next = cur;
return cur;
}
static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *sin, char *addr) { static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *sin, char *addr) {
struct control_ng *c = (void *) obj; struct control_ng *c = (void *) obj;
bencode_buffer_t bencbuf; bencode_buffer_t bencbuf;
@ -66,6 +104,8 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *
struct iovec iov[3]; struct iovec iov[3];
GString *log_str; GString *log_str;
struct control_ng_stats* cur = get_control_ng_stats(c,sin);
str_chr_str(&data, buf, ' '); str_chr_str(&data, buf, ' ');
if (!data.s || data.s == buf->s) { if (!data.s || data.s == buf->s) {
ilog(LOG_WARNING, "Received invalid data on NG port (no cookie) from %s: "STR_FORMAT, addr, STR_FMT(buf)); ilog(LOG_WARNING, "Received invalid data on NG port (no cookie) from %s: "STR_FORMAT, addr, STR_FMT(buf));
@ -108,19 +148,31 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *
g_string_free(log_str, TRUE); g_string_free(log_str, TRUE);
errstr = NULL; errstr = NULL;
if (!str_cmp(&cmd, "ping")) if (!str_cmp(&cmd, "ping")) {
bencode_dictionary_add_string(resp, "result", "pong"); bencode_dictionary_add_string(resp, "result", "pong");
else if (!str_cmp(&cmd, "offer")) cur->ping++;
}
else if (!str_cmp(&cmd, "offer")) {
errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin); errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin);
else if (!str_cmp(&cmd, "answer")) cur->offer++;
}
else if (!str_cmp(&cmd, "answer")) {
errstr = call_answer_ng(dict, c->callmaster, resp); errstr = call_answer_ng(dict, c->callmaster, resp);
else if (!str_cmp(&cmd, "delete")) cur->answer++;
}
else if (!str_cmp(&cmd, "delete")) {
errstr = call_delete_ng(dict, c->callmaster, resp); errstr = call_delete_ng(dict, c->callmaster, resp);
else if (!str_cmp(&cmd, "query")) cur->delete++;
}
else if (!str_cmp(&cmd, "query")) {
errstr = call_query_ng(dict, c->callmaster, resp); errstr = call_query_ng(dict, c->callmaster, resp);
cur->query++;
}
#if GLIB_CHECK_VERSION(2,16,0) #if GLIB_CHECK_VERSION(2,16,0)
else if (!str_cmp(&cmd, "list")) else if (!str_cmp(&cmd, "list")) {
errstr = call_list_ng(dict, c->callmaster, resp); errstr = call_list_ng(dict, c->callmaster, resp);
cur->list++;
}
#endif #endif
else else
errstr = "Unrecognized command"; errstr = "Unrecognized command";
@ -134,6 +186,7 @@ err_send:
ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data)); ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data));
bencode_dictionary_add_string(resp, "result", "error"); bencode_dictionary_add_string(resp, "result", "error");
bencode_dictionary_add_string(resp, "error-reason", errstr); bencode_dictionary_add_string(resp, "error-reason", errstr);
cur->errors++;
goto send_resp; goto send_resp;
send_resp: send_resp:

@ -9,6 +9,18 @@
struct poller; struct poller;
struct callmaster; struct callmaster;
struct control_ng_stats {
struct sockaddr_in6 proxy;
u_int32_t ping;
u_int32_t offer;
u_int32_t answer;
u_int32_t delete;
u_int32_t query;
u_int32_t list;
u_int32_t errors;
struct control_ng_stats* next;
};
struct control_ng { struct control_ng {
struct obj obj; struct obj obj;
struct callmaster *callmaster; struct callmaster *callmaster;

@ -0,0 +1,127 @@
/*
* graphite.c
*
* Created on: Jan 19, 2015
* Author: fmetz
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "log.h"
#include "call.h"
int graphite_sock=0;
u_int32_t graphite_ipaddress;
int graphite_port=0;
struct callmaster* cm=0;
struct totalstats totalstats_prev;
int connect_to_graphite_server(u_int32_t ipaddress, int port) {
graphite_sock=0;
int reconnect=0;
int rc=0;
struct sockaddr_in sin;
memset(&sin,0,sizeof(sin));
int val=1;
graphite_ipaddress = ipaddress;
graphite_port = port;
rc = graphite_sock = socket(AF_INET, SOCK_STREAM,0);
if(rc<0) {
ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite.");
return -1;
}
sin.sin_family=AF_INET;
sin.sin_addr.s_addr=graphite_ipaddress;
sin.sin_port=htons(graphite_port);
rc = setsockopt(graphite_sock,SOL_SOCKET,SO_REUSEADDR, &val,sizeof(val));
if(rc<0) {
ilog(LOG_ERROR,"Couldn't set sockopt for graphite descriptor.");
return -1;
}
struct in_addr ip;
ip.s_addr = graphite_ipaddress;
ilog(LOG_INFO, "Connecting to graphite server %s at port:%i with fd:%i",inet_ntoa(ip),graphite_port,graphite_sock);
rc = connect(graphite_sock, (struct sockaddr *)&sin, sizeof(sin));
if (rc==-1) {
ilog(LOG_ERROR, "Connection could not be established. Trying again next time of graphite-interval.");
return -1;
}
ilog(LOG_INFO, "Graphite server connected.");
return graphite_sock;
}
int send_graphite_data() {
int rc=0;
if (!graphite_sock) {
ilog(LOG_ERROR,"Graphite socket is not connected.");
return -1;
}
// format hostname "." totals.subkey SPACE value SPACE timestamp
char hostname[256]; memset(&hostname,0,256);
rc = gethostname(hostname,256);
if (rc<0) {
ilog(LOG_ERROR, "Could not retrieve host name information.");
return -1;
}
char data_to_send[8192]; memset(&data_to_send,0,8192);
char* ptr = data_to_send;
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %i %u\n",hostname, cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %i %u\n",hostname, cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.forced_term_sess %i %u\n",hostname, cm->totalstats_interval.total_forced_term_sess,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.managed_sess %i %u\n",hostname, cm->totalstats_interval.total_managed_sess,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %i %u\n",hostname, cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.oneway_stream_sess %i %u\n",hostname, cm->totalstats_interval.total_oneway_stream_sess,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.regular_term_sess %i %u\n",hostname, cm->totalstats_interval.total_regular_term_sess,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.relayed_errors %i %u\n",hostname, cm->totalstats_interval.total_relayed_errors,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.relayed_packets %i %u\n",hostname, cm->totalstats_interval.total_relayed_packets,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.silent_timeout_sess %i %u\n",hostname, cm->totalstats_interval.total_silent_timeout_sess,(unsigned)time(NULL)); ptr += rc;
rc = sprintf(ptr,"%s.totals.timeout_sess %i %u\n",hostname, cm->totalstats_interval.total_timeout_sess,(unsigned)time(NULL)); ptr += rc;
ZERO(cm->totalstats_interval);
rc = write(graphite_sock, data_to_send, strlen(data_to_send));
if (rc<0) {
ilog(LOG_ERROR,"Could not write to graphite socket. Disconnecting graphite server.");
close(graphite_sock); graphite_sock=0;
return -1;
}
return 0;
}
void graphite_loop_run(struct callmaster* callmaster, int seconds) {
int rc=0;
if (!cm)
cm = callmaster;
if (!graphite_sock) {
rc = connect_to_graphite_server(graphite_ipaddress, graphite_port);
}
if (rc>=0) {
rc = send_graphite_data();
if (rc<0) {
ilog(LOG_ERROR,"Sending graphite data failed.");
graphite_sock=0;
}
}
sleep(seconds);
}

@ -0,0 +1,17 @@
/*
* graphite.h
*
* Created on: Jan 19, 2015
* Author: fmetz
*/
#ifndef GRAPHITE_H_
#define GRAPHITE_H_
#include "call.h"
int connect_to_graphite_server(u_int32_t ipaddress, int port);
int send_graphite_data();
void graphite_loop_run(struct callmaster* cm, int seconds);
#endif /* GRAPHITE_H_ */

@ -26,7 +26,7 @@
#include "dtls.h" #include "dtls.h"
#include "call_interfaces.h" #include "call_interfaces.h"
#include "cli.h" #include "cli.h"
#include "graphite.h"
@ -108,7 +108,9 @@ static char *b2b_url;
static enum xmlrpc_format xmlrpc_fmt = XF_SEMS; static enum xmlrpc_format xmlrpc_fmt = XF_SEMS;
static int num_threads; static int num_threads;
static int delete_delay = 30; static int delete_delay = 30;
static u_int32_t graphite_ip = NULL;
static u_int16_t graphite_port;
static int graphite_interval = 0;
static void sighandler(gpointer x) { static void sighandler(gpointer x) {
sigset_t ss; sigset_t ss;
@ -254,6 +256,7 @@ static void options(int *argc, char ***argv) {
char *listenudps = NULL; char *listenudps = NULL;
char *listenngs = NULL; char *listenngs = NULL;
char *listencli = NULL; char *listencli = NULL;
char *graphitep = NULL;
char *redisps = NULL; char *redisps = NULL;
char *log_facility_s = NULL; char *log_facility_s = NULL;
char *log_facility_cdr_s = NULL; char *log_facility_cdr_s = NULL;
@ -269,6 +272,8 @@ static void options(int *argc, char ***argv) {
{ "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" }, { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" },
{ "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" },
{ "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" },
{ "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" },
{ "graphite-interval", 'w', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" },
{ "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" }, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" },
{ "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" },
@ -331,6 +336,10 @@ static void options(int *argc, char ***argv) {
die("Invalid IP or port (--listen-cli)"); die("Invalid IP or port (--listen-cli)");
} }
if (graphitep) {if (parse_ip_port(&graphite_ip, &graphite_port, graphitep))
die("Invalid IP or port (--graphite)");
}
if (tos < 0 || tos > 255) if (tos < 0 || tos > 255)
die("Invalid TOS value"); die("Invalid TOS value");
@ -606,6 +615,7 @@ no_kernel:
callmaster_config_init(ctx->m); callmaster_config_init(ctx->m);
ZERO(ctx->m->totalstats); ZERO(ctx->m->totalstats);
ZERO(ctx->m->totalstats_interval);
ctx->m->totalstats.started = time(NULL); ctx->m->totalstats.started = time(NULL);
if (!foreground) if (!foreground)
@ -614,6 +624,8 @@ no_kernel:
if (redis_restore(ctx->m, mc.redis)) if (redis_restore(ctx->m, mc.redis))
die("Refusing to continue without working Redis database"); die("Refusing to continue without working Redis database");
ZERO(ctx->m->control_ng_stats);
} }
static void timer_loop(void *d) { static void timer_loop(void *d) {
@ -623,6 +635,20 @@ static void timer_loop(void *d) {
poller_timers_wait_run(p, 100); poller_timers_wait_run(p, 100);
} }
static void graphite_loop(void *d) {
struct callmaster *cm = d;
if (!graphite_interval) {
ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second.");
graphite_interval=1;
}
connect_to_graphite_server(graphite_ip,graphite_port);
while (!global_shutdown)
graphite_loop_run(cm,graphite_interval); // time in seconds
}
static void poller_loop(void *d) { static void poller_loop(void *d) {
struct poller *p = d; struct poller *p = d;
@ -642,6 +668,8 @@ int main(int argc, char **argv) {
thread_create_detach(sighandler, NULL); thread_create_detach(sighandler, NULL);
thread_create_detach(timer_loop, ctx.p); thread_create_detach(timer_loop, ctx.p);
if (graphite_ip)
thread_create_detach(graphite_loop, ctx.m);
if (num_threads < 1) { if (num_threads < 1) {
#ifdef _SC_NPROCESSORS_ONLN #ifdef _SC_NPROCESSORS_ONLN

@ -23,3 +23,5 @@ TABLE=0
# LOG_FACILITY_CDR=daemon # LOG_FACILITY_CDR=daemon
# NUM_THREADS=5 # NUM_THREADS=5
# DELETE_DELAY=30 # DELETE_DELAY=30
# GRAPHITE=9006
# GRAPHITE_INTERVAL=60

@ -72,6 +72,8 @@ OPTIONS="$OPTIONS --table=$TABLE"
[ -z "$LOG_FACILITY_CDR" ] || OPTIONS="$OPTIONS --log-facility-cdr=$LOG_FACILITY_CDR" [ -z "$LOG_FACILITY_CDR" ] || OPTIONS="$OPTIONS --log-facility-cdr=$LOG_FACILITY_CDR"
[ -z "$NUM_THREADS" ] || OPTIONS="$OPTIONS --num-threads=$NUM_THREADS" [ -z "$NUM_THREADS" ] || OPTIONS="$OPTIONS --num-threads=$NUM_THREADS"
[ -z "$DELETE_DELAY" ] || OPTIONS="$OPTIONS --delete-delay=$DELETE_DELAY" [ -z "$DELETE_DELAY" ] || OPTIONS="$OPTIONS --delete-delay=$DELETE_DELAY"
[ -z "$GRAPHITE" ] || OPTIONS="$OPTIONS --graphite=$GRAPHITE"
[ -z "$GRAPHITE_INTERVAL" ] || OPTIONS="$OPTIONS --graphite-interval=$GRAPHITE_INTERVAL"
if test "$FORK" = "no" ; then if test "$FORK" = "no" ; then
OPTIONS="$OPTIONS --foreground" OPTIONS="$OPTIONS --foreground"
fi fi

@ -143,6 +143,16 @@ build_opts() {
OPTS+=" --delete-delay=$DELETE_DELAY" OPTS+=" --delete-delay=$DELETE_DELAY"
fi fi
if [[ -n "$GRAPHITE" ]]
then
OPTS+=" --graphite=$GRAPHITE"
fi
if [[ -n "$GRAPHITE_INTERVAL" ]]
then
OPTS+=" --graphite-interval=$GRAPHITE_INTERVAL"
fi
if [[ -n "$LOG_FACILITY_CDR" ]] if [[ -n "$LOG_FACILITY_CDR" ]]
then then
OPTS+=" --log-facility-cdr=$LOG_FACILITY_CDR" OPTS+=" --log-facility-cdr=$LOG_FACILITY_CDR"

Loading…
Cancel
Save