Merge branch '1and1-master'

pull/101/head
Richard Fuchs 11 years ago
commit 9e44c16d37

@ -184,6 +184,8 @@ option and which are reproduced below:
-d, --delete-delay Delay for deleting a session from memory. -d, --delete-delay Delay for deleting a session from memory.
--sip-source Use SIP source address by default --sip-source Use SIP source address by default
--dtls-passive Always prefer DTLS passive role --dtls-passive Always prefer DTLS passive role
-g, --graphite=[IP46:]PORT TCP address of graphite statistics server
-w, --graphite-interval=INT Graphite data statistics send interval
Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local Most of these options are indeed optional, with two exceptions. It's mandatory to specify at least one local
IP address through `--interface`, and at least one of the `--listen-...` options must be given. IP address through `--interface`, and at least one of the `--listen-...` options must be given.
@ -354,6 +356,14 @@ The options are described in more detail below.
NGCP-specific options NGCP-specific options
* -g, --graphite
Address of the graphite statistics server.
* -w, --graphite-interval
Interval of the time when information is sent to the graphite server.
A typical command line (enabling both UDP and NG protocols) thus may look like: A typical command line (enabling both UDP and NG protocols) thus may look like:
/usr/sbin/rtpengine --table=0 --interface=10.64.73.31 --interface=2001:db8::4f3:3d \ /usr/sbin/rtpengine --table=0 --interface=10.64.73.31 --interface=2001:db8::4f3:3d \

@ -63,7 +63,7 @@ endif
SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c
OBJS= $(SRCS:.c=.o) OBJS= $(SRCS:.c=.o)

@ -192,3 +192,13 @@ void thread_create_detach(void (*f)(void *), void *d) {
if (thread_create(thread_detach_func, dt, 1, NULL)) if (thread_create(thread_detach_func, dt, 1, NULL))
abort(); abort();
} }
unsigned int in6_addr_hash(const void *p) {
const struct in6_addr *a = p;
return a->s6_addr32[0] ^ a->s6_addr32[3];
}
int in6_addr_eq(const void *a, const void *b) {
const struct in6_addr *A = a, *B = b;
return !memcmp(A, B, sizeof(*A));
}

@ -144,7 +144,7 @@ INLINE u_int32_t in6_to_4(const struct in6_addr *a) {
return a->s6_addr32[3]; return a->s6_addr32[3];
} }
INLINE void smart_ntop(char *o, struct in6_addr *a, size_t len) { INLINE void smart_ntop(char *o, const struct in6_addr *a, size_t len) {
const char *r; const char *r;
if (IN6_IS_ADDR_V4MAPPED(a)) if (IN6_IS_ADDR_V4MAPPED(a))
@ -156,7 +156,7 @@ INLINE void smart_ntop(char *o, struct in6_addr *a, size_t len) {
*o = '\0'; *o = '\0';
} }
INLINE char *smart_ntop_p(char *o, struct in6_addr *a, size_t len) { INLINE char *smart_ntop_p(char *o, const struct in6_addr *a, size_t len) {
int l; int l;
if (IN6_IS_ADDR_V4MAPPED(a)) { if (IN6_IS_ADDR_V4MAPPED(a)) {
@ -178,7 +178,7 @@ INLINE char *smart_ntop_p(char *o, struct in6_addr *a, size_t len) {
} }
} }
INLINE void smart_ntop_port(char *o, struct sockaddr_in6 *a, size_t len) { INLINE void smart_ntop_port(char *o, const struct sockaddr_in6 *a, size_t len) {
char *e; char *e;
e = smart_ntop_p(o, &a->sin6_addr, len); e = smart_ntop_p(o, &a->sin6_addr, len);
@ -483,4 +483,9 @@ INLINE void g_queue_append(GQueue *dst, const GQueue *src) {
g_queue_push_tail(dst, l->data); g_queue_push_tail(dst, l->data);
} }
unsigned int in6_addr_hash(const void *p);
int in6_addr_eq(const void *a, const void *b);
#endif #endif

@ -1423,6 +1423,12 @@ struct callmaster *callmaster_new(struct poller *p) {
poller_add_timer(p, callmaster_timer, &c->obj); poller_add_timer(p, callmaster_timer, &c->obj);
mutex_init(&c->totalstats_lock);
c->totalstats.started = poller_now;
mutex_init(&c->cngs_lock);
c->cngs_hash = g_hash_table_new(in6_addr_hash, in6_addr_eq);
return c; return c;
fail: fail:
@ -2438,6 +2444,8 @@ void call_destroy(struct call *c) {
/* CDRs and statistics */ /* CDRs and statistics */
cdrbufcur += sprintf(cdrbufcur,"ci=%s, ",c->callid.s); cdrbufcur += sprintf(cdrbufcur,"ci=%s, ",c->callid.s);
cdrbufcur += sprintf(cdrbufcur,"created_from=%s, ", c->created_from); cdrbufcur += sprintf(cdrbufcur,"created_from=%s, ", c->created_from);
cdrbufcur += sprintf(cdrbufcur,"last_signal=%llu, ", (unsigned long long)c->last_signal);
cdrbufcur += sprintf(cdrbufcur,"tos=%u, ", (unsigned int)c->tos);
for (l = c->monologues; l; l = l->next) { for (l = c->monologues; l; l = l->next) {
ml = l->data; ml = l->data;
if (_log_facility_cdr) { if (_log_facility_cdr) {
@ -2486,26 +2494,34 @@ void call_destroy(struct call *c) {
"ml%i_midx%u_%s_local_relay_port=%u, " "ml%i_midx%u_%s_local_relay_port=%u, "
"ml%i_midx%u_%s_relayed_packets=%llu, " "ml%i_midx%u_%s_relayed_packets=%llu, "
"ml%i_midx%u_%s_relayed_bytes=%llu, " "ml%i_midx%u_%s_relayed_bytes=%llu, "
"ml%i_midx%u_%s_relayed_errors=%llu, ", "ml%i_midx%u_%s_relayed_errors=%llu, "
"ml%i_midx%u_%s_last_packet=%llu, ",
cdrlinecnt, md->index, protocol, buf, cdrlinecnt, md->index, protocol, buf,
cdrlinecnt, md->index, protocol, ps->endpoint.port, cdrlinecnt, md->index, protocol, ps->endpoint.port,
cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), cdrlinecnt, md->index, protocol, (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets, cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.packets,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes, cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.bytes,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors); cdrlinecnt, md->index, protocol, (unsigned long long) ps->stats.errors,
cdrlinecnt, md->index, protocol, (unsigned long long) ps->last_packet);
} }
ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, " ilog(LOG_INFO, "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e", "%llu p, %llu b, %llu e, %llu last_packet",
md->index, md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
buf, ps->endpoint.port, buf, ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets, (unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes, (unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors); (unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
mutex_lock(&m->totalstats_lock);
m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets; m->totalstats.total_relayed_packets += (unsigned long long) ps->stats.packets;
m->totalstats_interval.total_relayed_packets += (unsigned long long) ps->stats.packets;
m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors; m->totalstats.total_relayed_errors += (unsigned long long) ps->stats.errors;
m->totalstats_interval.total_relayed_errors += (unsigned long long) ps->stats.errors;
mutex_unlock(&m->totalstats_lock);
} }
} }
if (_log_facility_cdr) if (_log_facility_cdr)
@ -2513,7 +2529,11 @@ void call_destroy(struct call *c) {
} }
// --- for statistics getting one way stream or no relay at all // --- for statistics getting one way stream or no relay at all
mutex_lock(&m->totalstats_lock);
m->totalstats.total_nopacket_relayed_sess *= 2; m->totalstats.total_nopacket_relayed_sess *= 2;
m->totalstats_interval.total_nopacket_relayed_sess *= 2;
mutex_unlock(&m->totalstats_lock);
for (l = c->monologues; l; l = l->next) { for (l = c->monologues; l; l = l->next) {
ml = l->data; ml = l->data;
@ -2550,32 +2570,53 @@ void call_destroy(struct call *c) {
} }
} }
if (ps && ps2 && ps->stats.packets!=0 && ps2->stats.packets==0) if (ps && ps2 && ps2->stats.packets==0) {
m->totalstats.total_oneway_stream_sess++; mutex_lock(&m->totalstats_lock);
if (ps->stats.packets!=0) {
m->totalstats.total_oneway_stream_sess++;
m->totalstats_interval.total_oneway_stream_sess++;
}
else {
m->totalstats.total_nopacket_relayed_sess++;
m->totalstats_interval.total_nopacket_relayed_sess++;
}
mutex_unlock(&m->totalstats_lock);
}
}
if (ps && ps2 && ps->stats.packets==0 && ps2->stats.packets==0) mutex_lock(&m->totalstats_lock);
m->totalstats.total_nopacket_relayed_sess++;
}
m->totalstats.total_nopacket_relayed_sess /= 2; m->totalstats.total_nopacket_relayed_sess /= 2;
m->totalstats_interval.total_nopacket_relayed_sess /= 2;
m->totalstats.total_managed_sess += 1; m->totalstats.total_managed_sess += 1;
m->totalstats_interval.total_managed_sess += 1;
ml = c->monologues->data; ml = c->monologues->data;
if (ml->term_reason==TIMEOUT) { if (ml->term_reason==TIMEOUT) {
m->totalstats.total_timeout_sess++; m->totalstats.total_timeout_sess++;
m->totalstats_interval.total_timeout_sess++;
} else if (ml->term_reason==SILENT_TIMEOUT) { } else if (ml->term_reason==SILENT_TIMEOUT) {
m->totalstats.total_silent_timeout_sess++; m->totalstats.total_silent_timeout_sess++;
m->totalstats_interval.total_silent_timeout_sess++;
} else if (ml->term_reason==REGULAR) { } else if (ml->term_reason==REGULAR) {
m->totalstats.total_regular_term_sess++; m->totalstats.total_regular_term_sess++;
m->totalstats_interval.total_regular_term_sess++;
} else if (ml->term_reason==FORCED) { } else if (ml->term_reason==FORCED) {
m->totalstats.total_forced_term_sess++; m->totalstats.total_forced_term_sess++;
m->totalstats_interval.total_forced_term_sess++;
} }
timeval_multiply(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess-1); timeval_multiply(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess-1);
timeval_add(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,&tim_result_duration); timeval_add(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,&tim_result_duration);
timeval_devide(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess); timeval_devide(&m->totalstats.total_average_call_dur,&m->totalstats.total_average_call_dur,m->totalstats.total_managed_sess);
timeval_multiply(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess-1);
timeval_add(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,&tim_result_duration);
timeval_devide(&m->totalstats_interval.total_average_call_dur,&m->totalstats_interval.total_average_call_dur,m->totalstats_interval.total_managed_sess);
mutex_unlock(&m->totalstats_lock);
if (_log_facility_cdr) if (_log_facility_cdr)
/* log it */ /* log it */
cdrlog(cdrbuffer); cdrlog(cdrbuffer);

@ -11,7 +11,7 @@
#include <pcre.h> #include <pcre.h>
#include <openssl/x509.h> #include <openssl/x509.h>
#include "compat.h" #include "compat.h"
#include "control_ng.h"
enum termination_reason { enum termination_reason {
UNKNOWN=0, UNKNOWN=0,
@ -410,7 +410,12 @@ struct callmaster {
struct stats statsps; /* per second stats, running timer */ struct stats statsps; /* per second stats, running timer */
mutex_t statslock; mutex_t statslock;
struct stats stats; /* copied from statsps once a second */ struct stats stats; /* copied from statsps once a second */
mutex_t totalstats_lock; /* for both of them */
struct totalstats totalstats; struct totalstats totalstats;
struct totalstats totalstats_interval;
/* control_ng_stats stuff */
mutex_t cngs_lock;
GHashTable *cngs_hash;
struct poller *poller; struct poller *poller;
pcre *info_re; pcre *info_re;

@ -25,6 +25,9 @@ static const char* TRUNCATED = " ... Output truncated. Increase Output Buffer
static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen=0; int printlen=0;
mutex_lock(&m->totalstats_lock);
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nTotal statistics (does not include current running sessions):\n\n"); printlen = snprintf(replybuffer,(outbufend-replybuffer), "\nTotal statistics (does not include current running sessions):\n\n");
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Uptime of rtpengine :%llu seconds\n", (unsigned long long)time(NULL)-m->totalstats.started); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Uptime of rtpengine :%llu seconds\n", (unsigned long long)time(NULL)-m->totalstats.started);
@ -49,6 +52,42 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Average call duration :%ld.%06ld\n\n",m->totalstats.total_average_call_dur.tv_sec,m->totalstats.total_average_call_dur.tv_usec);
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
mutex_unlock(&m->totalstats_lock);
printlen = snprintf(replybuffer,(outbufend-replybuffer), "Control statistics:\n\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s \n",
"Proxy", "Offer", "Answer", "Delete", "Ping", "List", "Query", "Errors");
ADJUSTLEN(printlen,outbufend,replybuffer);
mutex_lock(&m->cngs_lock);
GList *list = g_hash_table_get_values(m->cngs_hash);
if (!list) {
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n No proxies have yet tried to send data.");
ADJUSTLEN(printlen,outbufend,replybuffer);
}
for (GList *l = list; l; l = l->next) {
struct control_ng_stats* cur = l->data;
char buf[128]; memset(&buf,0,128);
smart_ntop_p(buf, &(cur->proxy), sizeof(buf));
printlen = snprintf(replybuffer,(outbufend-replybuffer), " %10s | %10u | %10u | %10u | %10u | %10u | %10u | %10u \n",
buf,
cur->offer,
cur->answer,
cur->delete,
cur->ping,
cur->list,
cur->query,
cur->errors);
ADJUSTLEN(printlen,outbufend,replybuffer);
}
printlen = snprintf(replybuffer,(outbufend-replybuffer), "\n\n");
ADJUSTLEN(printlen,outbufend,replybuffer);
mutex_unlock(&m->cngs_lock);
g_list_free(list);
} }
static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
@ -81,7 +120,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
return; return;
} }
printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %30s | deletionmark:%4s | created:%12i | proxy:%s\n\n", c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from); printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %30s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu\n\n",
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal);
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
for (l = c->monologues; l; l = l->next) { for (l = c->monologues; l; l = l->next) {
@ -113,14 +153,15 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
smart_ntop_p(buf, &ps->endpoint.ip46, sizeof(buf)); smart_ntop_p(buf, &ps->endpoint.ip46, sizeof(buf));
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, " printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5hu%s, "
"%llu p, %llu b, %llu e\n", "%llu p, %llu b, %llu e, %llu last_packet\n",
md->index, md->index,
(unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0), (unsigned int) (ps->sfd ? ps->sfd->fd.localport : 0),
buf, ps->endpoint.port, buf, ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "", (!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
(unsigned long long) ps->stats.packets, (unsigned long long) ps->stats.packets,
(unsigned long long) ps->stats.bytes, (unsigned long long) ps->stats.bytes,
(unsigned long long) ps->stats.errors); (unsigned long long) ps->stats.errors,
(unsigned long long) ps->last_packet);
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
} }
} }

@ -56,6 +56,24 @@ static void pretty_print(bencode_item_t *el, GString *s) {
} }
} }
struct control_ng_stats* get_control_ng_stats(struct control_ng* c, const struct in6_addr *addr) {
struct callmaster *m = c->callmaster;
struct control_ng_stats* cur;
mutex_lock(&m->cngs_lock);
cur = g_hash_table_lookup(m->cngs_hash, addr);
if (!cur) {
cur = g_slice_alloc0(sizeof(struct control_ng_stats));
cur->proxy = *addr;
char buf[128]; memset(&buf,0,128);
smart_ntop_p(buf, addr, sizeof(buf));
ilog(LOG_DEBUG,"Adding a proxy for control ng stats:%s",buf);
g_hash_table_insert(m->cngs_hash, &cur->proxy, cur);
}
mutex_unlock(&m->cngs_lock);
return cur;
}
static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *sin, char *addr) { static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *sin, char *addr) {
struct control_ng *c = (void *) obj; struct control_ng *c = (void *) obj;
bencode_buffer_t bencbuf; bencode_buffer_t bencbuf;
@ -66,6 +84,8 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *
struct iovec iov[3]; struct iovec iov[3];
GString *log_str; GString *log_str;
struct control_ng_stats* cur = get_control_ng_stats(c,&sin->sin6_addr);
str_chr_str(&data, buf, ' '); str_chr_str(&data, buf, ' ');
if (!data.s || data.s == buf->s) { if (!data.s || data.s == buf->s) {
ilog(LOG_WARNING, "Received invalid data on NG port (no cookie) from %s: "STR_FORMAT, addr, STR_FMT(buf)); ilog(LOG_WARNING, "Received invalid data on NG port (no cookie) from %s: "STR_FORMAT, addr, STR_FMT(buf));
@ -108,19 +128,31 @@ static void control_ng_incoming(struct obj *obj, str *buf, struct sockaddr_in6 *
g_string_free(log_str, TRUE); g_string_free(log_str, TRUE);
errstr = NULL; errstr = NULL;
if (!str_cmp(&cmd, "ping")) if (!str_cmp(&cmd, "ping")) {
bencode_dictionary_add_string(resp, "result", "pong"); bencode_dictionary_add_string(resp, "result", "pong");
else if (!str_cmp(&cmd, "offer")) g_atomic_int_inc(&cur->ping);
}
else if (!str_cmp(&cmd, "offer")) {
errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin); errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin);
else if (!str_cmp(&cmd, "answer")) g_atomic_int_inc(&cur->offer);
}
else if (!str_cmp(&cmd, "answer")) {
errstr = call_answer_ng(dict, c->callmaster, resp); errstr = call_answer_ng(dict, c->callmaster, resp);
else if (!str_cmp(&cmd, "delete")) g_atomic_int_inc(&cur->answer);
}
else if (!str_cmp(&cmd, "delete")) {
errstr = call_delete_ng(dict, c->callmaster, resp); errstr = call_delete_ng(dict, c->callmaster, resp);
else if (!str_cmp(&cmd, "query")) g_atomic_int_inc(&cur->delete);
}
else if (!str_cmp(&cmd, "query")) {
errstr = call_query_ng(dict, c->callmaster, resp); errstr = call_query_ng(dict, c->callmaster, resp);
g_atomic_int_inc(&cur->query);
}
#if GLIB_CHECK_VERSION(2,16,0) #if GLIB_CHECK_VERSION(2,16,0)
else if (!str_cmp(&cmd, "list")) else if (!str_cmp(&cmd, "list")) {
errstr = call_list_ng(dict, c->callmaster, resp); errstr = call_list_ng(dict, c->callmaster, resp);
g_atomic_int_inc(&cur->list);
}
#endif #endif
else else
errstr = "Unrecognized command"; errstr = "Unrecognized command";
@ -134,6 +166,7 @@ err_send:
ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data)); ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data));
bencode_dictionary_add_string(resp, "result", "error"); bencode_dictionary_add_string(resp, "result", "error");
bencode_dictionary_add_string(resp, "error-reason", errstr); bencode_dictionary_add_string(resp, "error-reason", errstr);
g_atomic_int_inc(&cur->errors);
goto send_resp; goto send_resp;
send_resp: send_resp:

@ -9,6 +9,17 @@
struct poller; struct poller;
struct callmaster; struct callmaster;
struct control_ng_stats {
struct in6_addr proxy;
int ping;
int offer;
int answer;
int delete;
int query;
int list;
int errors;
};
struct control_ng { struct control_ng {
struct obj obj; struct obj obj;
struct callmaster *callmaster; struct callmaster *callmaster;
@ -18,5 +29,4 @@ struct control_ng {
struct control_ng *control_ng_new(struct poller *, struct in6_addr, u_int16_t, struct callmaster *); struct control_ng *control_ng_new(struct poller *, struct in6_addr, u_int16_t, struct callmaster *);
#endif #endif

@ -0,0 +1,146 @@
/*
* graphite.c
*
* Created on: Jan 19, 2015
* Author: fmetz
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "log.h"
#include "call.h"
static int graphite_sock=-1;
static u_int32_t graphite_ipaddress;
static int graphite_port=0;
static struct callmaster* cm=0;
//struct totalstats totalstats_prev;
static time_t g_now, next_run;
int connect_to_graphite_server(u_int32_t ipaddress, int port) {
graphite_sock=-1;
//int reconnect=0;
int rc=0;
struct sockaddr_in sin;
memset(&sin,0,sizeof(sin));
int val=1;
graphite_ipaddress = ipaddress;
graphite_port = port;
rc = graphite_sock = socket(AF_INET, SOCK_STREAM,0);
if(rc<0) {
ilog(LOG_ERROR,"Couldn't make socket for connecting to graphite.");
return -1;
}
sin.sin_family=AF_INET;
sin.sin_addr.s_addr=graphite_ipaddress;
sin.sin_port=htons(graphite_port);
rc = setsockopt(graphite_sock,SOL_SOCKET,SO_REUSEADDR, &val,sizeof(val));
if(rc<0) {
ilog(LOG_ERROR,"Couldn't set sockopt for graphite descriptor.");
goto error;
}
struct in_addr ip;
ip.s_addr = graphite_ipaddress;
ilog(LOG_INFO, "Connecting to graphite server %s at port:%i with fd:%i",inet_ntoa(ip),graphite_port,graphite_sock);
rc = connect(graphite_sock, (struct sockaddr *)&sin, sizeof(sin));
if (rc==-1) {
ilog(LOG_ERROR, "Connection could not be established. Trying again next time of graphite-interval.");
goto error;
}
ilog(LOG_INFO, "Graphite server connected.");
return graphite_sock;
error:
close(graphite_sock);
graphite_sock = -1;
return -1;
}
int send_graphite_data() {
int rc=0;
if (graphite_sock < 0) {
ilog(LOG_ERROR,"Graphite socket is not connected.");
return -1;
}
// format hostname "." totals.subkey SPACE value SPACE timestamp
char hostname[256]; memset(&hostname,0,256);
rc = gethostname(hostname,256);
if (rc<0) {
ilog(LOG_ERROR, "Could not retrieve host name information.");
goto error;
}
char data_to_send[8192]; memset(&data_to_send,0,8192);
char* ptr = data_to_send;
mutex_lock(&cm->totalstats_lock);
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_sec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.average_call_dur.tv_usec %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.forced_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_forced_term_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.managed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_managed_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_nopacket_relayed_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.oneway_stream_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_oneway_stream_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.regular_term_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_regular_term_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.relayed_errors %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_errors,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.relayed_packets %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_relayed_packets,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.silent_timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_silent_timeout_sess,(unsigned long long)g_now); ptr += rc;
rc = sprintf(ptr,"%s.totals.timeout_sess %llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_timeout_sess,(unsigned long long)g_now); ptr += rc;
ZERO(cm->totalstats_interval);
mutex_unlock(&cm->totalstats_lock);
rc = write(graphite_sock, data_to_send, strlen(data_to_send));
if (rc<0) {
ilog(LOG_ERROR,"Could not write to graphite socket. Disconnecting graphite server.");
goto error;
}
return 0;
error:
close(graphite_sock); graphite_sock=-1;
return -1;
}
void graphite_loop_run(struct callmaster* callmaster, int seconds) {
int rc=0;
g_now = time(NULL);
if (g_now < next_run)
goto sleep;
next_run = g_now + seconds;
if (!cm)
cm = callmaster;
if (graphite_sock < 0) {
rc = connect_to_graphite_server(graphite_ipaddress, graphite_port);
}
if (rc>=0) {
rc = send_graphite_data();
if (rc<0) {
ilog(LOG_ERROR,"Sending graphite data failed.");
}
}
sleep:
usleep(100000);
}

@ -0,0 +1,17 @@
/*
* graphite.h
*
* Created on: Jan 19, 2015
* Author: fmetz
*/
#ifndef GRAPHITE_H_
#define GRAPHITE_H_
#include "call.h"
int connect_to_graphite_server(u_int32_t ipaddress, int port);
int send_graphite_data();
void graphite_loop_run(struct callmaster* cm, int seconds);
#endif /* GRAPHITE_H_ */

@ -26,7 +26,7 @@
#include "dtls.h" #include "dtls.h"
#include "call_interfaces.h" #include "call_interfaces.h"
#include "cli.h" #include "cli.h"
#include "graphite.h"
@ -108,7 +108,9 @@ static char *b2b_url;
static enum xmlrpc_format xmlrpc_fmt = XF_SEMS; static enum xmlrpc_format xmlrpc_fmt = XF_SEMS;
static int num_threads; static int num_threads;
static int delete_delay = 30; static int delete_delay = 30;
static u_int32_t graphite_ip = 0;
static u_int16_t graphite_port;
static int graphite_interval = 0;
static void sighandler(gpointer x) { static void sighandler(gpointer x) {
sigset_t ss; sigset_t ss;
@ -254,6 +256,7 @@ static void options(int *argc, char ***argv) {
char *listenudps = NULL; char *listenudps = NULL;
char *listenngs = NULL; char *listenngs = NULL;
char *listencli = NULL; char *listencli = NULL;
char *graphitep = NULL;
char *redisps = NULL; char *redisps = NULL;
char *log_facility_s = NULL; char *log_facility_s = NULL;
char *log_facility_cdr_s = NULL; char *log_facility_cdr_s = NULL;
@ -269,6 +272,8 @@ static void options(int *argc, char ***argv) {
{ "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" }, { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46:]PORT" },
{ "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46:]PORT" },
{ "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46:]PORT" },
{ "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "[IP46:]PORT" },
{ "graphite-interval", 'w', 0, G_OPTION_ARG_INT, &graphite_interval, "Graphite send interval in seconds", "INT" },
{ "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" }, { "tos", 'T', 0, G_OPTION_ARG_INT, &tos, "Default TOS value to set on streams", "INT" },
{ "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" }, { "timeout", 'o', 0, G_OPTION_ARG_INT, &timeout, "RTP timeout", "SECS" },
{ "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" }, { "silent-timeout",'s',0,G_OPTION_ARG_INT, &silent_timeout,"RTP timeout for muted", "SECS" },
@ -331,6 +336,10 @@ static void options(int *argc, char ***argv) {
die("Invalid IP or port (--listen-cli)"); die("Invalid IP or port (--listen-cli)");
} }
if (graphitep) {if (parse_ip_port(&graphite_ip, &graphite_port, graphitep))
die("Invalid IP or port (--graphite)");
}
if (tos < 0 || tos > 255) if (tos < 0 || tos > 255)
die("Invalid TOS value"); die("Invalid TOS value");
@ -605,9 +614,6 @@ no_kernel:
ctx->m->conf = mc; ctx->m->conf = mc;
callmaster_config_init(ctx->m); callmaster_config_init(ctx->m);
ZERO(ctx->m->totalstats);
ctx->m->totalstats.started = time(NULL);
if (!foreground) if (!foreground)
daemonize(); daemonize();
wpidfile(); wpidfile();
@ -623,6 +629,20 @@ static void timer_loop(void *d) {
poller_timers_wait_run(p, 100); poller_timers_wait_run(p, 100);
} }
static void graphite_loop(void *d) {
struct callmaster *cm = d;
if (!graphite_interval) {
ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second.");
graphite_interval=1;
}
connect_to_graphite_server(graphite_ip,graphite_port);
while (!global_shutdown)
graphite_loop_run(cm,graphite_interval); // time in seconds
}
static void poller_loop(void *d) { static void poller_loop(void *d) {
struct poller *p = d; struct poller *p = d;
@ -642,6 +662,8 @@ int main(int argc, char **argv) {
thread_create_detach(sighandler, NULL); thread_create_detach(sighandler, NULL);
thread_create_detach(timer_loop, ctx.p); thread_create_detach(timer_loop, ctx.p);
if (graphite_ip)
thread_create_detach(graphite_loop, ctx.m);
if (num_threads < 1) { if (num_threads < 1) {
#ifdef _SC_NPROCESSORS_ONLN #ifdef _SC_NPROCESSORS_ONLN

@ -22,4 +22,6 @@ TABLE=0
# LOG_FACILITY=daemon # LOG_FACILITY=daemon
# LOG_FACILITY_CDR=daemon # LOG_FACILITY_CDR=daemon
# NUM_THREADS=5 # NUM_THREADS=5
# DELETE_DELAY=30 # DELETE_DELAY=30
# GRAPHITE=9006
# GRAPHITE_INTERVAL=60

@ -72,6 +72,8 @@ OPTIONS="$OPTIONS --table=$TABLE"
[ -z "$LOG_FACILITY_CDR" ] || OPTIONS="$OPTIONS --log-facility-cdr=$LOG_FACILITY_CDR" [ -z "$LOG_FACILITY_CDR" ] || OPTIONS="$OPTIONS --log-facility-cdr=$LOG_FACILITY_CDR"
[ -z "$NUM_THREADS" ] || OPTIONS="$OPTIONS --num-threads=$NUM_THREADS" [ -z "$NUM_THREADS" ] || OPTIONS="$OPTIONS --num-threads=$NUM_THREADS"
[ -z "$DELETE_DELAY" ] || OPTIONS="$OPTIONS --delete-delay=$DELETE_DELAY" [ -z "$DELETE_DELAY" ] || OPTIONS="$OPTIONS --delete-delay=$DELETE_DELAY"
[ -z "$GRAPHITE" ] || OPTIONS="$OPTIONS --graphite=$GRAPHITE"
[ -z "$GRAPHITE_INTERVAL" ] || OPTIONS="$OPTIONS --graphite-interval=$GRAPHITE_INTERVAL"
if test "$FORK" = "no" ; then if test "$FORK" = "no" ; then
OPTIONS="$OPTIONS --foreground" OPTIONS="$OPTIONS --foreground"
fi fi

@ -143,6 +143,16 @@ build_opts() {
OPTS+=" --delete-delay=$DELETE_DELAY" OPTS+=" --delete-delay=$DELETE_DELAY"
fi fi
if [[ -n "$GRAPHITE" ]]
then
OPTS+=" --graphite=$GRAPHITE"
fi
if [[ -n "$GRAPHITE_INTERVAL" ]]
then
OPTS+=" --graphite-interval=$GRAPHITE_INTERVAL"
fi
if [[ -n "$LOG_FACILITY_CDR" ]] if [[ -n "$LOG_FACILITY_CDR" ]]
then then
OPTS+=" --log-facility-cdr=$LOG_FACILITY_CDR" OPTS+=" --log-facility-cdr=$LOG_FACILITY_CDR"

Loading…
Cancel
Save