Merge branch '1and1-dev-lbalaceanu'

changes/69/2669/1
Richard Fuchs 10 years ago
commit 79b78e6fd4

@ -1522,7 +1522,12 @@ struct callmaster *callmaster_new(struct poller *p) {
mutex_init(&c->totalstats.total_average_lock);
mutex_init(&c->totalstats_interval.total_average_lock);
mutex_init(&c->totalstats_interval.managed_sess_lock);
mutex_init(&c->totalstats_interval.total_calls_duration_lock);
c->totalstats.started = poller_now;
c->totalstats_interval.managed_sess_min = 0;
c->totalstats_interval.managed_sess_max = 0;
mutex_init(&c->cngs_lock);
c->cngs_hash = g_hash_table_new(in6_addr_hash, in6_addr_eq);
@ -2683,6 +2688,23 @@ static void timeval_totalstats_average_add(struct totalstats *s, const struct ti
mutex_unlock(&s->total_average_lock);
}
static void timeval_totalstats_interval_call_duration_add(struct totalstats *s,
const struct timeval *call_start, const struct timeval *call_stop,
const struct timeval *interval_start) {
struct timeval call_duration;
struct timeval const *call_start_in_interval = call_start;
if (timercmp(interval_start, call_start, >))
call_start_in_interval = interval_start;
timeval_subtract(&call_duration, call_stop, call_start_in_interval);
mutex_lock(&s->total_calls_duration_lock);
timeval_add(&s->total_calls_duration_interval,
&s->total_calls_duration_interval, &call_duration);
mutex_unlock(&s->total_calls_duration_lock);
}
static int __rtp_stats_sort(const void *ap, const void *bp) {
const struct rtp_stats *a = ap, *b = bp;
@ -2724,6 +2746,44 @@ out:
return rtp_pt; /* may be NULL */
}
void add_total_calls_duration_in_interval(struct callmaster *cm,
struct timeval *interval_tv) {
struct timeval ongoing_calls_dur = add_ongoing_calls_dur_in_interval(cm,
&cm->latest_graphite_interval_start, interval_tv);
mutex_lock(&cm->totalstats_interval.total_calls_duration_lock);
timeval_add(&cm->totalstats_interval.total_calls_duration_interval,
&cm->totalstats_interval.total_calls_duration_interval,
&ongoing_calls_dur);
mutex_unlock(&cm->totalstats_interval.total_calls_duration_lock);
}
struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
struct timeval *interval_start, struct timeval *interval_duration) {
GHashTableIter iter;
gpointer key, value;
struct timeval call_duration, now, res = {0};
struct call *call;
struct call_monologue *ml;
rwlock_lock_r(&m->hashlock);
g_hash_table_iter_init(&iter, m->callhash);
while (g_hash_table_iter_next(&iter, &key, &value)) {
call = (struct call*) value;
ml = call->monologues->data;
if (timercmp(interval_start, &ml->started, >)) {
timeval_add(&res, &res, interval_duration);
} else {
gettimeofday(&now, NULL);
timeval_subtract(&call_duration, &now, &ml->started);
timeval_add(&res, &res, &call_duration);
}
}
rwlock_unlock_r(&m->hashlock);
return res;
}
/* called lock-free, but must hold a reference to the call */
void call_destroy(struct call *c) {
struct callmaster *m = c->callmaster;
@ -2747,6 +2807,12 @@ void call_destroy(struct call *c) {
ret = g_hash_table_remove(m->callhash, &c->callid);
rwlock_unlock_w(&m->hashlock);
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats.managed_sess_crt--;
m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_crt,
m->totalstats_interval.managed_sess_min);
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
if (!ret)
return;
@ -3020,6 +3086,8 @@ void call_destroy(struct call *c) {
timeval_totalstats_average_add(&m->totalstats, &tim_result_duration);
timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration);
timeval_totalstats_interval_call_duration_add(&m->totalstats_interval,
&ml->started, &g_now, &m->latest_graphite_interval_start);
}
@ -3220,6 +3288,11 @@ restart:
goto restart;
}
g_hash_table_insert(m->callhash, &c->callid, obj_get(c));
mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats.managed_sess_crt++;
m->totalstats_interval.managed_sess_max = MAX(m->totalstats_interval.managed_sess_max,
m->totalstats.managed_sess_crt);
mutex_unlock(&m->totalstats_interval.managed_sess_lock);
rwlock_lock_w(&c->master_lock);
rwlock_unlock_w(&m->hashlock);
}

@ -13,6 +13,7 @@
#include <sys/time.h>
#include <pcre.h>
#include <openssl/x509.h>
#include <limits.h>
#include "compat.h"
#include "control_ng.h"
#include "aux.h"
@ -66,10 +67,6 @@ enum call_stream_state {
CSS_RUNNING,
};
#include "obj.h"
#include "aux.h"
#include "bencode.h"
@ -223,7 +220,7 @@ struct stats {
struct totalstats {
time_t started;
atomic64 total_timeout_sess;
atomic64 total_rejected_sess;
atomic64 total_rejected_sess;
atomic64 total_silent_timeout_sess;
atomic64 total_regular_term_sess;
atomic64 total_forced_term_sess;
@ -234,7 +231,15 @@ struct totalstats {
mutex_t total_average_lock; /* for these two below */
u_int64_t total_managed_sess;
struct timeval total_average_call_dur;
struct timeval total_average_call_dur;
mutex_t managed_sess_lock; /* for these below */
u_int64_t managed_sess_crt;
u_int64_t managed_sess_max; /* per graphite interval statistic */
u_int64_t managed_sess_min; /* per graphite interval statistic */
mutex_t total_calls_duration_lock; /* for these two below */
struct timeval total_calls_duration_interval;
};
struct udp_fd {
@ -385,7 +390,7 @@ struct call {
struct callmaster *callmaster; /* RO */
mutex_t buffer_lock;
call_buffer_t buffer;
call_buffer_t buffer;
GQueue rtp_bridge_ports;
/* everything below protected by master_lock */
@ -402,7 +407,7 @@ struct call {
time_t last_signal;
time_t deleted;
time_t ml_deleted;
unsigned char tos;
unsigned char tos;
char *created_from;
struct sockaddr_in6 created_from_addr;
};
@ -426,7 +431,7 @@ struct interface_address {
struct callmaster_config {
int kernelfd;
int kernelid;
GQueue *interfaces; /* struct interface_address */
GQueue *interfaces; /* struct interface_address */
int port_min;
int port_max;
int max_sessions;
@ -471,6 +476,7 @@ struct callmaster {
pcre_extra *streams_ree;
struct callmaster_config conf;
struct timeval latest_graphite_interval_start;
};
struct call_stats {
@ -484,7 +490,8 @@ struct callmaster *callmaster_new(struct poller *);
void callmaster_config_init(struct callmaster *);
void stream_msg_mh_src(struct packet_stream *, struct msghdr *);
void callmaster_get_all_calls(struct callmaster *m, GQueue *q);
struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
struct timeval *iv_start, struct timeval *iv_duration);
void calls_dump_redis(struct callmaster *);
struct call_monologue *__monologue_create(struct call *call);
@ -520,7 +527,7 @@ INLINE struct interface_address *get_interface_from_address(struct local_interfa
struct interface_address *get_any_interface_address(struct local_interface *lif, int family);
const struct transport_protocol *transport_protocol(const str *s);
void add_total_calls_duration_in_interval(struct callmaster *cm, struct timeval *interval_tv);
INLINE void *call_malloc(struct call *c, size_t l) {

@ -28,6 +28,11 @@ static struct callmaster* cm=0;
static time_t next_run;
// HEAD: static time_t g_now, next_run;
static char* graphite_prefix = NULL;
static struct timeval graphite_interval_tv;
void set_graphite_interval_tv(struct timeval *tv) {
graphite_interval_tv = *tv;
}
void set_prefix(char* prefix) {
graphite_prefix = prefix;
@ -126,13 +131,36 @@ int send_graphite_data() {
ZERO(cm->totalstats_interval.total_managed_sess);
mutex_unlock(&cm->totalstats_interval.total_average_lock);
mutex_lock(&cm->totalstats_interval.total_calls_duration_lock);
ts.total_calls_duration_interval = cm->totalstats_interval.total_calls_duration_interval;
cm->totalstats_interval.total_calls_duration_interval.tv_sec = 0;
cm->totalstats_interval.total_calls_duration_interval.tv_usec = 0;
//ZERO(cm->totalstats_interval.total_calls_duration_interval);
mutex_unlock(&cm->totalstats_interval.total_calls_duration_lock);
rwlock_lock_r(&cm->hashlock);
mutex_lock(&cm->totalstats_interval.managed_sess_lock);
ts.managed_sess_max = cm->totalstats_interval.managed_sess_max;
ts.managed_sess_min = cm->totalstats_interval.managed_sess_min;
cm->totalstats_interval.managed_sess_max = cm->totalstats.managed_sess_crt;
cm->totalstats_interval.managed_sess_min = cm->totalstats.managed_sess_crt;
mutex_unlock(&cm->totalstats_interval.managed_sess_lock);
rwlock_unlock_r(&cm->hashlock);
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr, "%s.totals.call_dur %llu.%06llu %llu\n",hostname,(unsigned long long)ts.total_calls_duration_interval.tv_sec,(unsigned long long)ts.total_calls_duration_interval.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.average_call_dur %llu.%06llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc;
rc = sprintf(ptr,"%s.totals.average_call_dur %llu.%06llu %llu\n",hostname,(unsigned long long)ts.total_average_call_dur.tv_sec,(unsigned long long)ts.total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.forced_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_forced_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.managed_sess "UINT64F" %llu\n",hostname, ts.total_managed_sess,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.managed_sess_min "UINT64F" %llu\n",hostname, ts.managed_sess_min,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.managed_sess_max "UINT64F" %llu\n",hostname, ts.managed_sess_max,(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_nopacket_relayed_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.oneway_stream_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_oneway_stream_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
@ -149,6 +177,12 @@ int send_graphite_data() {
if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; }
rc = sprintf(ptr,"%s.totals.reject_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_rejected_sess),(unsigned long long)g_now.tv_sec); ptr += rc;
ilog(LOG_DEBUG, "min_sessions:%u max_sessions:%u, call_dur_per_interval:%llu.%06llu at time %llu\n",
ts.managed_sess_min, ts.managed_sess_max,
(unsigned long long ) ts.total_calls_duration_interval.tv_sec,
(unsigned long long ) ts.total_calls_duration_interval.tv_usec,
(unsigned long long ) g_now.tv_sec);
rc = write(graphite_sock, data_to_send, ptr - data_to_send);
if (rc<0) {
ilog(LOG_ERROR,"Could not write to graphite socket. Disconnecting graphite server.");
@ -225,7 +259,9 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) {
}
if (graphite_sock>0 && !connectinprogress) {
add_total_calls_duration_in_interval(cm, &graphite_interval_tv);
rc = send_graphite_data();
gettimeofday(&cm->latest_graphite_interval_start, NULL);
if (rc<0) {
ilog(LOG_ERROR,"Sending graphite data failed.");
}

@ -15,5 +15,6 @@ int send_graphite_data();
void graphite_loop_run(struct callmaster* cm, int seconds);
void set_prefix(char* prefix);
void graphite_loop(void *d);
void set_latest_graphite_interval_start(struct timeval *tv);
void set_graphite_interval_tv(struct timeval *tv);
#endif /* GRAPHITE_H_ */

@ -535,6 +535,7 @@ void create_everything(struct main_context *ctx) {
int kfd = -1;
void *dlh;
const char **strp;
struct timeval tmp_tv;
if (table < 0)
goto no_kernel;
@ -638,6 +639,11 @@ no_kernel:
if (redis_restore(ctx->m, mc.redis))
die("Refusing to continue without working Redis database");
gettimeofday(&ctx->m->latest_graphite_interval_start, NULL);
timeval_from_ms(&tmp_tv, graphite_interval*1000000);
set_graphite_interval_tv(&tmp_tv);
}
int main(int argc, char **argv) {

Loading…
Cancel
Save