From f864da77c5f82a951fd884862690a02329ddf48b Mon Sep 17 00:00:00 2001 From: Lucian Balaceanu Date: Fri, 28 Aug 2015 14:34:24 +0300 Subject: [PATCH] Adding per graphite interval statistics: min/max concurrent calls, total call time per interval --- daemon/call.c | 73 +++++++++++++++++++++++++++++++++++++++++++++++ daemon/call.h | 29 ++++++++++++------- daemon/graphite.c | 38 +++++++++++++++++++++++- daemon/graphite.h | 3 +- daemon/main.c | 6 ++++ 5 files changed, 136 insertions(+), 13 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index c1f2a62f1..b951061c8 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -1522,7 +1522,12 @@ struct callmaster *callmaster_new(struct poller *p) { mutex_init(&c->totalstats.total_average_lock); mutex_init(&c->totalstats_interval.total_average_lock); + mutex_init(&c->totalstats_interval.managed_sess_lock); + mutex_init(&c->totalstats_interval.total_calls_duration_lock); + c->totalstats.started = poller_now; + c->totalstats_interval.managed_sess_min = 0; + c->totalstats_interval.managed_sess_max = 0; mutex_init(&c->cngs_lock); c->cngs_hash = g_hash_table_new(in6_addr_hash, in6_addr_eq); @@ -2683,6 +2688,23 @@ static void timeval_totalstats_average_add(struct totalstats *s, const struct ti mutex_unlock(&s->total_average_lock); } +static void timeval_totalstats_interval_call_duration_add(struct totalstats *s, + const struct timeval *call_start, const struct timeval *call_stop, + const struct timeval *interval_start) { /* adds call_stop minus the later of call_start/interval_start to s->total_calls_duration_interval */ + struct timeval call_duration; + struct timeval const *call_start_in_interval = call_start; + + if (timercmp(interval_start, call_start, >)) /* call began before this interval: count only the part inside it */ + call_start_in_interval = interval_start; + + timeval_subtract(&call_duration, call_stop, call_start_in_interval); + + mutex_lock(&s->total_calls_duration_lock); /* held while the running total is updated */ + timeval_add(&s->total_calls_duration_interval, + &s->total_calls_duration_interval, &call_duration); + mutex_unlock(&s->total_calls_duration_lock); +} + static int __rtp_stats_sort(const void *ap, const void *bp) { 
const struct rtp_stats *a = ap, *b = bp; @@ -2724,6 +2746,44 @@ out: return rtp_pt; /* may be NULL */ } +void add_total_calls_duration_in_interval(struct callmaster *cm, + struct timeval *interval_tv) { /* folds the in-interval time of the calls still in callhash into the interval total */ + struct timeval ongoing_calls_dur = add_ongoing_calls_dur_in_interval(cm, + &cm->latest_graphite_interval_start, interval_tv); + + mutex_lock(&cm->totalstats_interval.total_calls_duration_lock); + timeval_add(&cm->totalstats_interval.total_calls_duration_interval, + &cm->totalstats_interval.total_calls_duration_interval, + &ongoing_calls_dur); + mutex_unlock(&cm->totalstats_interval.total_calls_duration_lock); +} + +struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, + struct timeval *interval_start, struct timeval *interval_duration) { /* returns the summed in-interval time of every call in m->callhash */ + GHashTableIter iter; + gpointer key, value; + struct timeval call_duration, now, res = {0}; + struct call *call; + struct call_monologue *ml; + + rwlock_lock_r(&m->hashlock); + g_hash_table_iter_init(&iter, m->callhash); + while (g_hash_table_iter_next(&iter, &key, &value)) { + call = (struct call*) value; + if (!call->monologues) continue; /* empty list is NULL: no monologue means no start time to measure from */ + ml = call->monologues->data; + if (timercmp(interval_start, &ml->started, >)) { /* started before the interval: counts for the whole interval */ + timeval_add(&res, &res, interval_duration); + } else { + gettimeofday(&now, NULL); + timeval_subtract(&call_duration, &now, &ml->started); + timeval_add(&res, &res, &call_duration); + } + } + rwlock_unlock_r(&m->hashlock); + return res; +} + /* called lock-free, but must hold a reference to the call */ void call_destroy(struct call *c) { struct callmaster *m = c->callmaster; @@ -2747,6 +2807,12 @@ void call_destroy(struct call *c) { ret = g_hash_table_remove(m->callhash, &c->callid); rwlock_unlock_w(&m->hashlock); + if (ret) { /* only a call actually removed from the hash changes the session count */ + mutex_lock(&m->totalstats_interval.managed_sess_lock); + m->totalstats.managed_sess_crt--; + m->totalstats_interval.managed_sess_min = MIN(m->totalstats.managed_sess_crt, m->totalstats_interval.managed_sess_min); /* live counter is kept in totalstats, not totalstats_interval */ + mutex_unlock(&m->totalstats_interval.managed_sess_lock); + } if (!ret) return; @@ -3020,6 
+3086,8 @@ void call_destroy(struct call *c) { timeval_totalstats_average_add(&m->totalstats, &tim_result_duration); timeval_totalstats_average_add(&m->totalstats_interval, &tim_result_duration); + timeval_totalstats_interval_call_duration_add(&m->totalstats_interval, + &ml->started, &g_now, &m->latest_graphite_interval_start); } @@ -3220,6 +3288,11 @@ restart: goto restart; } g_hash_table_insert(m->callhash, &c->callid, obj_get(c)); + mutex_lock(&m->totalstats_interval.managed_sess_lock); + m->totalstats.managed_sess_crt++; + m->totalstats_interval.managed_sess_max = MAX(m->totalstats_interval.managed_sess_max, + m->totalstats.managed_sess_crt); + mutex_unlock(&m->totalstats_interval.managed_sess_lock); rwlock_lock_w(&c->master_lock); rwlock_unlock_w(&m->hashlock); } diff --git a/daemon/call.h b/daemon/call.h index 64123eebc..53fef6156 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -13,6 +13,7 @@ #include #include #include +#include #include "compat.h" #include "control_ng.h" #include "aux.h" @@ -66,10 +67,6 @@ enum call_stream_state { CSS_RUNNING, }; - - - - #include "obj.h" #include "aux.h" #include "bencode.h" @@ -223,7 +220,7 @@ struct stats { struct totalstats { time_t started; atomic64 total_timeout_sess; - atomic64 total_rejected_sess; + atomic64 total_rejected_sess; atomic64 total_silent_timeout_sess; atomic64 total_regular_term_sess; atomic64 total_forced_term_sess; @@ -234,7 +231,15 @@ struct totalstats { mutex_t total_average_lock; /* for these two below */ u_int64_t total_managed_sess; - struct timeval total_average_call_dur; + struct timeval total_average_call_dur; + + mutex_t managed_sess_lock; /* for the three counters below */ + u_int64_t managed_sess_crt; /* live count: ++ on callhash insert, -- in call_destroy */ + u_int64_t managed_sess_max; /* per graphite interval statistic */ + u_int64_t managed_sess_min; /* per graphite interval statistic */ + + mutex_t total_calls_duration_lock; /* for the one field below */ + struct timeval total_calls_duration_interval; }; struct udp_fd { @@ -385,7 +390,7 @@ struct call { 
struct callmaster *callmaster; /* RO */ mutex_t buffer_lock; - call_buffer_t buffer; + call_buffer_t buffer; GQueue rtp_bridge_ports; /* everything below protected by master_lock */ @@ -402,7 +407,7 @@ struct call { time_t last_signal; time_t deleted; time_t ml_deleted; - unsigned char tos; + unsigned char tos; char *created_from; struct sockaddr_in6 created_from_addr; }; @@ -426,7 +431,7 @@ struct interface_address { struct callmaster_config { int kernelfd; int kernelid; - GQueue *interfaces; /* struct interface_address */ + GQueue *interfaces; /* struct interface_address */ int port_min; int port_max; int max_sessions; @@ -471,6 +476,7 @@ struct callmaster { pcre_extra *streams_ree; struct callmaster_config conf; + struct timeval latest_graphite_interval_start; /* stamped with gettimeofday() at startup and after each graphite send */ }; struct call_stats { @@ -484,7 +490,8 @@ struct callmaster *callmaster_new(struct poller *); void callmaster_config_init(struct callmaster *); void stream_msg_mh_src(struct packet_stream *, struct msghdr *); void callmaster_get_all_calls(struct callmaster *m, GQueue *q); - +struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, + struct timeval *iv_start, struct timeval *iv_duration); /* sums in-interval time over the calls in m->callhash */ void calls_dump_redis(struct callmaster *); struct call_monologue *__monologue_create(struct call *call); @@ -520,7 +527,7 @@ INLINE struct interface_address *get_interface_from_address(struct local_interfa struct interface_address *get_any_interface_address(struct local_interface *lif, int family); const struct transport_protocol *transport_protocol(const str *s); - +void add_total_calls_duration_in_interval(struct callmaster *cm, struct timeval *interval_tv); INLINE void *call_malloc(struct call *c, size_t l) { diff --git a/daemon/graphite.c b/daemon/graphite.c index 1b505f95f..b3f86972b 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -28,6 +28,11 @@ static struct callmaster* cm=0; static time_t next_run; // HEAD: static time_t g_now, next_run; static char* graphite_prefix = NULL; +static struct 
timeval graphite_interval_tv; + +void set_graphite_interval_tv(struct timeval *tv) { /* stores a copy of *tv as the reporting interval length */ + graphite_interval_tv = *tv; +} void set_prefix(char* prefix) { graphite_prefix = prefix; @@ -126,13 +131,36 @@ int send_graphite_data() { ZERO(cm->totalstats_interval.total_managed_sess); mutex_unlock(&cm->totalstats_interval.total_average_lock); + mutex_lock(&cm->totalstats_interval.total_calls_duration_lock); + ts.total_calls_duration_interval = cm->totalstats_interval.total_calls_duration_interval; /* snapshot for sending, then zero for the next interval */ + cm->totalstats_interval.total_calls_duration_interval.tv_sec = 0; + cm->totalstats_interval.total_calls_duration_interval.tv_usec = 0; + + //ZERO(cm->totalstats_interval.total_calls_duration_interval); + mutex_unlock(&cm->totalstats_interval.total_calls_duration_lock); + + rwlock_lock_r(&cm->hashlock); + mutex_lock(&cm->totalstats_interval.managed_sess_lock); + ts.managed_sess_max = cm->totalstats_interval.managed_sess_max; + ts.managed_sess_min = cm->totalstats_interval.managed_sess_min; + cm->totalstats_interval.managed_sess_max = cm->totalstats.managed_sess_crt; /* next interval's min and max both start from the current count */ + cm->totalstats_interval.managed_sess_min = cm->totalstats.managed_sess_crt; + mutex_unlock(&cm->totalstats_interval.managed_sess_lock); + rwlock_unlock_r(&cm->hashlock); + + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } + rc = sprintf(ptr, "%s.totals.call_dur %llu.%06llu %llu\n",hostname,(unsigned long long)ts.total_calls_duration_interval.tv_sec,(unsigned long long)ts.total_calls_duration_interval.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc; if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } - rc = sprintf(ptr,"%s.totals.average_call_dur %llu.%06llu %llu\n",hostname, (unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_sec,(unsigned long long) cm->totalstats_interval.total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc; + rc = sprintf(ptr,"%s.totals.average_call_dur %llu.%06llu 
%llu\n",hostname,(unsigned long long)ts.total_average_call_dur.tv_sec,(unsigned long long)ts.total_average_call_dur.tv_usec,(unsigned long long)g_now.tv_sec); ptr += rc; if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } rc = sprintf(ptr,"%s.totals.forced_term_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_forced_term_sess),(unsigned long long)g_now.tv_sec); ptr += rc; if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } rc = sprintf(ptr,"%s.totals.managed_sess "UINT64F" %llu\n",hostname, ts.total_managed_sess,(unsigned long long)g_now.tv_sec); ptr += rc; if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"%s.totals.managed_sess_min "UINT64F" %llu\n",hostname, ts.managed_sess_min,(unsigned long long)g_now.tv_sec); ptr += rc; /* lowest concurrent-session count seen in the interval */ + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } + rc = sprintf(ptr,"%s.totals.managed_sess_max "UINT64F" %llu\n",hostname, ts.managed_sess_max,(unsigned long long)g_now.tv_sec); ptr += rc; /* highest concurrent-session count seen in the interval */ + if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } rc = sprintf(ptr,"%s.totals.nopacket_relayed_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_nopacket_relayed_sess),(unsigned long long)g_now.tv_sec); ptr += rc; if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } rc = sprintf(ptr,"%s.totals.oneway_stream_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_oneway_stream_sess),(unsigned long long)g_now.tv_sec); ptr += rc; @@ -149,6 +177,12 @@ int send_graphite_data() { if (graphite_prefix!=NULL) { rc = sprintf(ptr,"%s.",graphite_prefix); ptr += rc; } rc = sprintf(ptr,"%s.totals.reject_sess "UINT64F" %llu\n",hostname, atomic64_get_na(&ts.total_rejected_sess),(unsigned long long)g_now.tv_sec); ptr += rc; + ilog(LOG_DEBUG, "min_sessions:"UINT64F" max_sessions:"UINT64F", call_dur_per_interval:%llu.%06llu at time %llu\n", /* UINT64F matches the u_int64_t session counters; %u did not */ + 
ts.managed_sess_min, ts.managed_sess_max, + (unsigned long long ) ts.total_calls_duration_interval.tv_sec, + (unsigned long long ) ts.total_calls_duration_interval.tv_usec, + (unsigned long long ) g_now.tv_sec); + rc = write(graphite_sock, data_to_send, ptr - data_to_send); if (rc<0) { ilog(LOG_ERROR,"Could not write to graphite socket. Disconnecting graphite server."); @@ -225,7 +259,9 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { } if (graphite_sock>0 && !connectinprogress) { + add_total_calls_duration_in_interval(cm, &graphite_interval_tv); /* account for calls still running before the totals are sent */ rc = send_graphite_data(); + gettimeofday(&cm->latest_graphite_interval_start, NULL); /* the next interval starts now */ if (rc<0) { ilog(LOG_ERROR,"Sending graphite data failed."); } diff --git a/daemon/graphite.h b/daemon/graphite.h index 67cf1b575..784bce274 100644 --- a/daemon/graphite.h +++ b/daemon/graphite.h @@ -15,5 +15,6 @@ int send_graphite_data(); void graphite_loop_run(struct callmaster* cm, int seconds); void set_prefix(char* prefix); void graphite_loop(void *d); - +void set_latest_graphite_interval_start(struct timeval *tv); +void set_graphite_interval_tv(struct timeval *tv); #endif /* GRAPHITE_H_ */ diff --git a/daemon/main.c b/daemon/main.c index ca2961118..7453c2a41 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -535,6 +535,7 @@ void create_everything(struct main_context *ctx) { int kfd = -1; void *dlh; const char **strp; + struct timeval tmp_tv; if (table < 0) goto no_kernel; @@ -638,6 +639,11 @@ no_kernel: if (redis_restore(ctx->m, mc.redis)) die("Refusing to continue without working Redis database"); + + gettimeofday(&ctx->m->latest_graphite_interval_start, NULL); + + timeval_from_ms(&tmp_tv, graphite_interval*1000000LL); /* 64-bit multiply; NOTE(review): assumes graphite_interval is a plain int, whose product would overflow above ~2147 s - confirm */ + set_graphite_interval_tv(&tmp_tv); } int main(int argc, char **argv) {