Implemented session limitation logic

The session limit is only for calls an rtpengine is responsible for.
Foreign calls (coming in via redis notification) are not counted as
long as the rtpengine is not responsible for those calls.
At least that means that the limit may exceed if the calls the rtpengine
is responsible for plus the former foreign calls are greater than the limit.
This will happen suddenly when the rtpengine becomes responsible for the
foreign calls.
pull/225/head
Frederic-Philippe Metz 9 years ago
parent 637d1f4cce
commit 1ca0cc5a52

@ -539,6 +539,9 @@ INLINE u_int64_t atomic64_get_set(atomic64 *u, u_int64_t a) {
INLINE void atomic64_inc(atomic64 *u) {
atomic64_add(u, 1);
}
INLINE void atomic64_dec(atomic64 *u) {
atomic64_add(u, -1);
}
INLINE void atomic64_local_copy_zero(atomic64 *dst, atomic64 *src) {
atomic64_set_na(dst, atomic64_get_set(src, 0));
}

@ -540,6 +540,7 @@ static void callmaster_timer(void *ptr) {
if (ps && !(ps->call->redis_call_responsible) && ke->rtp_stats[j].packets >0) {
ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets.");
ps->call->redis_call_responsible = 1;
atomic64_dec(&m->stats.foreign_sessions);
}
}

@ -229,12 +229,14 @@ struct stats {
u_int64_t delay_avg;
u_int64_t delay_max;
u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */
atomic64 foreign_sessions; // unresponsible via redis notification
};
struct totalstats {
time_t started;
atomic64 total_timeout_sess;
atomic64 total_rejected_sess;
atomic64 total_foreign_sessions;
atomic64 total_rejected_sess;
atomic64 total_silent_timeout_sess;
atomic64 total_regular_term_sess;
atomic64 total_forced_term_sess;
@ -245,7 +247,7 @@ struct totalstats {
mutex_t total_average_lock; /* for these two below */
u_int64_t total_managed_sess;
struct timeval total_average_call_dur;
struct timeval total_average_call_dur;
mutex_t managed_sess_lock; /* for these below */
u_int64_t managed_sess_crt;

@ -753,7 +753,8 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i
{
if (m->conf.max_sessions>=0) {
rwlock_lock_r(&m->hashlock);
if (g_hash_table_size(m->callhash) >= m->conf.max_sessions) {
if (g_hash_table_size(m->callhash) -
atomic64_get(&m->stats.foreign_sessions) >= m->conf.max_sessions) {
rwlock_unlock_r(&m->hashlock);
atomic64_inc(&m->totalstats.total_rejected_sess);
atomic64_inc(&m->totalstats_interval.total_rejected_sess);

@ -33,6 +33,8 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :"UINT64F"\n", num_sessions);
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total foreign sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_foreign_sessions));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total rejected sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_rejected_sess));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_timeout_sess));
@ -164,8 +166,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
return;
}
printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i\n\n",
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db);
printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i | foreign:%s\n\n",
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, c->redis_call_responsible?"no":"yes");
ADJUSTLEN(printlen,outbufend,replybuffer);
for (l = c->monologues.head; l; l = l->next) {
@ -345,7 +347,9 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char*
if (len>=strlen(LIST_NUMSESSIONS) && strncmp(buffer,LIST_NUMSESSIONS,strlen(LIST_NUMSESSIONS)) == 0) {
rwlock_lock_r(&m->hashlock);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions running on rtpengine: %i\n", g_hash_table_size(m->callhash));
printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions (own and foreign) running on rtpengine: %i\n", g_hash_table_size(m->callhash));
ADJUSTLEN(printlen,outbufend,replybuffer);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Current foreign sessions on rtpengine: %i\n", atomic64_get(&m->stats.foreign_sessions));
ADJUSTLEN(printlen,outbufend,replybuffer);
rwlock_unlock_r(&m->hashlock);
} else if (len>=strlen(LIST_SESSIONS) && strncmp(buffer,LIST_SESSIONS,strlen(LIST_SESSIONS)) == 0) {
@ -360,7 +364,7 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char*
while (g_hash_table_iter_next (&iter, &key, &value)) {
ptrkey = (str*)key;
call = (struct call*)value;
printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db);
printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, call->redis_call_responsible?"no":"yes");
ADJUSTLEN(printlen,outbufend,replybuffer);
}
rwlock_unlock_r(&m->hashlock);

@ -231,10 +231,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
return;
}
mutex_lock(&r->lock);
r = cm->conf.redis_read_notify;
mutex_lock(&r->lock);
redisReply *rr = (redisReply*)reply;
if (reply == NULL || rr->type != REDIS_REPLY_ARRAY)
@ -296,6 +296,8 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
goto err;
}
redis_restore_call(r, cm, rr->element[2]);
atomic64_inc(&cm->stats.foreign_sessions);
atomic64_inc(&cm->totalstats.total_foreign_sessions);
// we lookup again to retrieve the call to insert the kayspace db id
c = g_hash_table_lookup(cm->callhash, &callid);
@ -306,6 +308,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
if (strncmp(rr->element[3]->str,"del",3)==0) {
call_destroy(c);
atomic64_dec(&cm->stats.foreign_sessions);
}
err:
@ -1142,6 +1145,7 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi
err = NULL;
c->redis_hosted_db = r->db;
c->redis_call_responsible = 1;
obj_put(c);
err6:
@ -1355,8 +1359,10 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op
rwlock_lock_r(&c->master_lock);
if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db))
if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) {
rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error.");
goto err;
}
redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid));
redis_pipe(r, "SREM calls "PB"", STR(&c->callid));
@ -1596,6 +1602,10 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op
redis_consume(r);
mutex_unlock(&r->lock);
rwlock_unlock_r(&c->master_lock);
return;
err:
mutex_unlock(&r->lock);
@ -1620,6 +1630,10 @@ void redis_delete(struct call *c, struct redis *r, int role) {
redis_delete_call(c, r);
rwlock_unlock_r(&c->master_lock);
mutex_unlock(&r->lock);
return;
err:
rwlock_unlock_r(&c->master_lock);
mutex_unlock(&r->lock);

Loading…
Cancel
Save