From 1ca0cc5a52bd98b9428f2e19a008c36b5f2e73e6 Mon Sep 17 00:00:00 2001 From: Frederic-Philippe Metz Date: Fri, 12 Feb 2016 11:27:50 +0100 Subject: [PATCH] Implemented session limitation logic The session limit is only for calls an rtpengine is responsible for. Foreign calls (coming in via redis notification) are not counted as long as the rtpengine is not responsible for those calls. At least that means that the limit may exceed if the calls the rtpengine is responsible for plus the former foreign calls are greater than the limit. This will happen suddenly when the rtpengine becomes responsible for the foreign calls. --- daemon/aux.h | 3 +++ daemon/call.c | 1 + daemon/call.h | 6 ++++-- daemon/call_interfaces.c | 3 ++- daemon/cli.c | 12 ++++++++---- daemon/redis.c | 20 +++++++++++++++++--- 6 files changed, 35 insertions(+), 10 deletions(-) diff --git a/daemon/aux.h b/daemon/aux.h index afbae0deb..ea7576eb0 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -539,6 +539,9 @@ INLINE u_int64_t atomic64_get_set(atomic64 *u, u_int64_t a) { INLINE void atomic64_inc(atomic64 *u) { atomic64_add(u, 1); } +INLINE void atomic64_dec(atomic64 *u) { + atomic64_add(u, -1); +} INLINE void atomic64_local_copy_zero(atomic64 *dst, atomic64 *src) { atomic64_set_na(dst, atomic64_get_set(src, 0)); } diff --git a/daemon/call.c b/daemon/call.c index 0c5f5bc96..0a690f95c 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -540,6 +540,7 @@ static void callmaster_timer(void *ptr) { if (ps && !(ps->call->redis_call_responsible) && ke->rtp_stats[j].packets >0) { ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets."); ps->call->redis_call_responsible = 1; + atomic64_dec(&m->stats.foreign_sessions); } } diff --git a/daemon/call.h b/daemon/call.h index 29324d25c..ec97008c6 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -229,12 +229,14 @@ struct stats { u_int64_t delay_avg; u_int64_t delay_max; u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */ + atomic64 foreign_sessions; // unresponsible via redis notification }; struct totalstats { time_t started; atomic64 total_timeout_sess; - atomic64 total_rejected_sess; + atomic64 total_foreign_sessions; + atomic64 total_rejected_sess; atomic64 total_silent_timeout_sess; atomic64 total_regular_term_sess; atomic64 total_forced_term_sess; @@ -245,7 +247,7 @@ struct totalstats { mutex_t total_average_lock; /* for these two below */ u_int64_t total_managed_sess; - struct timeval total_average_call_dur; + struct timeval total_average_call_dur; mutex_t managed_sess_lock; /* for these below */ u_int64_t managed_sess_crt; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 38a029e8e..76bac7c53 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -753,7 +753,8 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i { if (m->conf.max_sessions>=0) { rwlock_lock_r(&m->hashlock); - if (g_hash_table_size(m->callhash) >= m->conf.max_sessions) { + if (g_hash_table_size(m->callhash) - + atomic64_get(&m->stats.foreign_sessions) >= m->conf.max_sessions) { rwlock_unlock_r(&m->hashlock); atomic64_inc(&m->totalstats.total_rejected_sess); atomic64_inc(&m->totalstats_interval.total_rejected_sess); diff --git a/daemon/cli.c b/daemon/cli.c index ba88dd053..a831502a1 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -33,6 +33,8 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :"UINT64F"\n", num_sessions); ADJUSTLEN(printlen,outbufend,replybuffer); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total foreign sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_foreign_sessions)); + ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total rejected sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_rejected_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_timeout_sess)); @@ -164,8 +166,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m return; } - printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i\n\n", - c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db); + printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i | foreign:%s\n\n", + c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, c->redis_call_responsible?"no":"yes"); ADJUSTLEN(printlen,outbufend,replybuffer); for (l = c->monologues.head; l; l = l->next) { @@ -345,7 +347,9 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char* if (len>=strlen(LIST_NUMSESSIONS) && strncmp(buffer,LIST_NUMSESSIONS,strlen(LIST_NUMSESSIONS)) == 0) { rwlock_lock_r(&m->hashlock); - printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions running on rtpengine: %i\n", g_hash_table_size(m->callhash)); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions (own and foreign) running on rtpengine: %i\n", g_hash_table_size(m->callhash)); + ADJUSTLEN(printlen,outbufend,replybuffer); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Current foreign sessions on rtpengine: %i\n", atomic64_get(&m->stats.foreign_sessions)); ADJUSTLEN(printlen,outbufend,replybuffer); rwlock_unlock_r(&m->hashlock); } else if (len>=strlen(LIST_SESSIONS) && strncmp(buffer,LIST_SESSIONS,strlen(LIST_SESSIONS)) == 0) { @@ -360,7 +364,7 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char* while (g_hash_table_iter_next (&iter, &key, &value)) { ptrkey = (str*)key; call = (struct call*)value; - printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db); + printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, call->redis_call_responsible?"no":"yes"); ADJUSTLEN(printlen,outbufend,replybuffer); } rwlock_unlock_r(&m->hashlock); diff --git a/daemon/redis.c b/daemon/redis.c index 9079ed98b..68a3baa80 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -231,10 +231,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { return; } - mutex_lock(&r->lock); - r = cm->conf.redis_read_notify; + mutex_lock(&r->lock); + redisReply *rr = (redisReply*)reply; if (reply == NULL || rr->type != REDIS_REPLY_ARRAY) @@ -296,6 +296,8 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } redis_restore_call(r, cm, rr->element[2]); + atomic64_inc(&cm->stats.foreign_sessions); + atomic64_inc(&cm->totalstats.total_foreign_sessions); // we lookup again to retrieve the call to insert the kayspace db id c = g_hash_table_lookup(cm->callhash, &callid); @@ -306,6 +308,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { if (strncmp(rr->element[3]->str,"del",3)==0) { call_destroy(c); + atomic64_dec(&cm->stats.foreign_sessions); } err: @@ -1142,6 +1145,7 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi err = NULL; c->redis_hosted_db = r->db; + c->redis_call_responsible = 1; obj_put(c); err6: @@ -1355,8 +1359,10 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op rwlock_lock_r(&c->master_lock); - if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) + if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { + rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error."); goto err; + } redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); @@ -1596,6 +1602,10 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op redis_consume(r); + mutex_unlock(&r->lock); + rwlock_unlock_r(&c->master_lock); + + return; err: mutex_unlock(&r->lock); @@ -1620,6 +1630,10 @@ void redis_delete(struct call *c, struct redis *r, int role) { redis_delete_call(c, r); + rwlock_unlock_r(&c->master_lock); + mutex_unlock(&r->lock); + return; + err: rwlock_unlock_r(&c->master_lock); mutex_unlock(&r->lock);