diff --git a/daemon/aux.h b/daemon/aux.h index afbae0deb..ea7576eb0 100644 --- a/daemon/aux.h +++ b/daemon/aux.h @@ -539,6 +539,9 @@ INLINE u_int64_t atomic64_get_set(atomic64 *u, u_int64_t a) { INLINE void atomic64_inc(atomic64 *u) { atomic64_add(u, 1); } +INLINE void atomic64_dec(atomic64 *u) { + atomic64_add(u, -1); +} INLINE void atomic64_local_copy_zero(atomic64 *dst, atomic64 *src) { atomic64_set_na(dst, atomic64_get_set(src, 0)); } diff --git a/daemon/call.c b/daemon/call.c index 0c5f5bc96..0a690f95c 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -540,6 +540,7 @@ static void callmaster_timer(void *ptr) { if (ps && !(ps->call->redis_call_responsible) && ke->rtp_stats[j].packets >0) { ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets."); ps->call->redis_call_responsible = 1; + atomic64_dec(&m->stats.foreign_sessions); } } diff --git a/daemon/call.h b/daemon/call.h index 29324d25c..ec97008c6 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -229,12 +229,14 @@ struct stats { u_int64_t delay_avg; u_int64_t delay_max; u_int8_t in_tos_tclass; /* XXX shouldn't be here - not stats */ + atomic64 foreign_sessions; // unresponsible via redis notification }; struct totalstats { time_t started; atomic64 total_timeout_sess; - atomic64 total_rejected_sess; + atomic64 total_foreign_sessions; + atomic64 total_rejected_sess; atomic64 total_silent_timeout_sess; atomic64 total_regular_term_sess; atomic64 total_forced_term_sess; @@ -245,7 +247,7 @@ struct totalstats { mutex_t total_average_lock; /* for these two below */ u_int64_t total_managed_sess; - struct timeval total_average_call_dur; + struct timeval total_average_call_dur; mutex_t managed_sess_lock; /* for these below */ u_int64_t managed_sess_crt; diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 38a029e8e..76bac7c53 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -753,7 +753,8 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i { if (m->conf.max_sessions>=0) { rwlock_lock_r(&m->hashlock); - if (g_hash_table_size(m->callhash) >= m->conf.max_sessions) { + if (g_hash_table_size(m->callhash) - + atomic64_get(&m->stats.foreign_sessions) >= m->conf.max_sessions) { rwlock_unlock_r(&m->hashlock); atomic64_inc(&m->totalstats.total_rejected_sess); atomic64_inc(&m->totalstats_interval.total_rejected_sess); diff --git a/daemon/cli.c b/daemon/cli.c index ba88dd053..a831502a1 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -33,6 +33,8 @@ static void cli_incoming_list_totals(char* buffer, int len, struct callmaster* m ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total managed sessions :"UINT64F"\n", num_sessions); ADJUSTLEN(printlen,outbufend,replybuffer); + printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total foreign sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_foreign_sessions)); + ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total rejected sessions :"UINT64F"\n", atomic64_get(&m->totalstats.total_rejected_sess)); ADJUSTLEN(printlen,outbufend,replybuffer); printlen = snprintf(replybuffer,(outbufend-replybuffer), " Total timed-out sessions via TIMEOUT :"UINT64F"\n",atomic64_get(&m->totalstats.total_timeout_sess)); @@ -164,8 +166,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m return; } - printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i\n\n", - c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db); + printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i | foreign:%s\n\n", + c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, c->redis_call_responsible?"no":"yes"); ADJUSTLEN(printlen,outbufend,replybuffer); for (l = c->monologues.head; l; l = l->next) { @@ -345,7 +347,9 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char* if (len>=strlen(LIST_NUMSESSIONS) && strncmp(buffer,LIST_NUMSESSIONS,strlen(LIST_NUMSESSIONS)) == 0) { rwlock_lock_r(&m->hashlock); - printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions running on rtpengine: %i\n", g_hash_table_size(m->callhash)); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions (own and foreign) running on rtpengine: %i\n", g_hash_table_size(m->callhash)); + ADJUSTLEN(printlen,outbufend,replybuffer); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Current foreign sessions on rtpengine: %i\n", atomic64_get(&m->stats.foreign_sessions)); ADJUSTLEN(printlen,outbufend,replybuffer); rwlock_unlock_r(&m->hashlock); } else if (len>=strlen(LIST_SESSIONS) && strncmp(buffer,LIST_SESSIONS,strlen(LIST_SESSIONS)) == 0) { @@ -360,7 +364,7 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char* while (g_hash_table_iter_next (&iter, &key, &value)) { ptrkey = (str*)key; call = (struct call*)value; - printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db); + printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, call->redis_call_responsible?"no":"yes"); ADJUSTLEN(printlen,outbufend,replybuffer); } rwlock_unlock_r(&m->hashlock); diff --git a/daemon/redis.c b/daemon/redis.c index 9079ed98b..68a3baa80 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -231,10 +231,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { return; } - mutex_lock(&r->lock); - r = cm->conf.redis_read_notify; + mutex_lock(&r->lock); + redisReply *rr = (redisReply*)reply; if (reply == NULL || rr->type != REDIS_REPLY_ARRAY) @@ -296,6 +296,8 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { goto err; } redis_restore_call(r, cm, rr->element[2]); + atomic64_inc(&cm->stats.foreign_sessions); + atomic64_inc(&cm->totalstats.total_foreign_sessions); // we lookup again to retrieve the call to insert the kayspace db id c = g_hash_table_lookup(cm->callhash, &callid); @@ -306,6 +308,7 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { if (strncmp(rr->element[3]->str,"del",3)==0) { call_destroy(c); + atomic64_dec(&cm->stats.foreign_sessions); } err: @@ -1142,6 +1145,7 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi err = NULL; c->redis_hosted_db = r->db; + c->redis_call_responsible = 1; obj_put(c); err6: @@ -1355,8 +1359,10 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op rwlock_lock_r(&c->master_lock); - if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) + if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) { + rlog(LOG_ERR, " >>>>>>>>>>>>>>>>> Redis error."); goto err; + } redis_pipe(r, "DEL notifier-"PB"", STR(&c->callid)); redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); @@ -1596,6 +1602,10 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op redis_consume(r); + mutex_unlock(&r->lock); + rwlock_unlock_r(&c->master_lock); + + return; err: mutex_unlock(&r->lock); @@ -1620,6 +1630,10 @@ void redis_delete(struct call *c, struct redis *r, int role) { redis_delete_call(c, r); + rwlock_unlock_r(&c->master_lock); + mutex_unlock(&r->lock); + return; + err: rwlock_unlock_r(&c->master_lock); mutex_unlock(&r->lock);