Update notifications flag logic

-> is_backup_call becomes foreign_call (established via redis notifications)
-> remove active_foreign_call logic

Add only libevent-dev dependency.
pull/225/head
Stefan Mititelu 10 years ago
parent 1648757aa7
commit f392d9b768

@ -191,6 +191,7 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
cm = c->callmaster; cm = c->callmaster;
rwlock_lock_r(&cm->conf.config_lock); rwlock_lock_r(&cm->conf.config_lock);
// final timeout applicable to all calls (own and foreign)
if (cm->conf.final_timeout && poller_now >= (c->created + cm->conf.final_timeout)) { if (cm->conf.final_timeout && poller_now >= (c->created + cm->conf.final_timeout)) {
ilog(LOG_INFO, "Closing call due to final timeout"); ilog(LOG_INFO, "Closing call due to final timeout");
tmp_t_reason = FINAL_TIMEOUT; tmp_t_reason = FINAL_TIMEOUT;
@ -203,8 +204,8 @@ static void call_timer_iterator(void *key, void *val, void *ptr) {
goto delete; goto delete;
} }
if (c->redis_foreign_call) { // other timeouts not applicable to foreign calls
ilog(LOG_DEBUG, "Redis-Notification: Timeout resets the deletion timers for a call where I am not responsible."); if (IS_FOREIGN_CALL(c)) {
c->deleted = c->ml_deleted = poller_now + cm->conf.delete_delay; c->deleted = c->ml_deleted = poller_now + cm->conf.delete_delay;
goto out; goto out;
} }
@ -261,7 +262,7 @@ next:
; ;
} }
if (good || c->redis_foreign_call) { if (good || IS_FOREIGN_CALL(c)) {
goto out; goto out;
} }
@ -547,11 +548,6 @@ static void callmaster_timer(void *ptr) {
ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes)); ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes));
atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets); atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets);
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes); atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
if (ps && ps->call->redis_foreign_call && ke->rtp_stats[j].packets > 0) {
ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets.");
ps->call->redis_foreign_call = 0;
//atomic64_dec(&m->stats.foreign_sessions); /* this doesn't decrease when call becomes active */
}
} }
update = 0; update = 0;
@ -1821,7 +1817,7 @@ struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
while (g_hash_table_iter_next(&iter, &key, &value)) { while (g_hash_table_iter_next(&iter, &key, &value)) {
call = (struct call*) value; call = (struct call*) value;
if (!call->monologues.head || IS_BACKUP_CALL(call)) if (!call->monologues.head || IS_FOREIGN_CALL(call))
continue; continue;
ml = call->monologues.head->data; ml = call->monologues.head->data;
if (timercmp(interval_start, &ml->started, >)) { if (timercmp(interval_start, &ml->started, >)) {
@ -1868,10 +1864,10 @@ void call_destroy(struct call *c) {
rwlock_lock_w(&m->hashlock); rwlock_lock_w(&m->hashlock);
ret = g_hash_table_remove(m->callhash, &c->callid); ret = g_hash_table_remove(m->callhash, &c->callid);
if (IS_BACKUP_CALL(c)) { if (IS_FOREIGN_CALL(c)) {
atomic64_dec(&m->stats.foreign_sessions); atomic64_dec(&m->stats.foreign_sessions);
} }
if(!IS_BACKUP_CALL(c)) { if(!IS_FOREIGN_CALL(c)) {
mutex_lock(&m->totalstats_interval.managed_sess_lock); mutex_lock(&m->totalstats_interval.managed_sess_lock);
m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min, m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min,
g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions));
@ -1884,7 +1880,7 @@ void call_destroy(struct call *c) {
obj_put(c); obj_put(c);
if (!c->redis_foreign_call) { if (!IS_FOREIGN_CALL(c)) {
redis_delete(c, m->conf.redis_write); redis_delete(c, m->conf.redis_write);
} }
@ -2135,7 +2131,7 @@ void call_destroy(struct call *c) {
} }
if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) { if (ps && ps2 && atomic64_get(&ps2->stats.packets)==0) {
if (atomic64_get(&ps->stats.packets)!=0 && !IS_BACKUP_CALL(c)){ if (atomic64_get(&ps->stats.packets)!=0 && !IS_FOREIGN_CALL(c)){
if (atomic64_get(&ps->stats.packets)!=0) { if (atomic64_get(&ps->stats.packets)!=0) {
atomic64_inc(&m->totalstats.total_oneway_stream_sess); atomic64_inc(&m->totalstats.total_oneway_stream_sess);
atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess); atomic64_inc(&m->totalstats_interval.total_oneway_stream_sess);
@ -2147,7 +2143,7 @@ void call_destroy(struct call *c) {
} }
} }
if (!IS_BACKUP_CALL(c)) { if (!IS_FOREIGN_CALL(c)) {
atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); atomic64_add(&m->totalstats.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2); atomic64_add(&m->totalstats_interval.total_nopacket_relayed_sess, total_nopacket_relayed_sess / 2);
} }
@ -2155,7 +2151,7 @@ void call_destroy(struct call *c) {
if (c->monologues.head) { if (c->monologues.head) {
ml = c->monologues.head->data; ml = c->monologues.head->data;
if (!IS_BACKUP_CALL(c)) { if (!IS_FOREIGN_CALL(c)) {
if (ml->term_reason==TIMEOUT) { if (ml->term_reason==TIMEOUT) {
atomic64_inc(&m->totalstats.total_timeout_sess); atomic64_inc(&m->totalstats.total_timeout_sess);
atomic64_inc(&m->totalstats_interval.total_timeout_sess); atomic64_inc(&m->totalstats_interval.total_timeout_sess);
@ -2345,8 +2341,7 @@ restart:
mutex_unlock(&m->totalstats_interval.managed_sess_lock); mutex_unlock(&m->totalstats_interval.managed_sess_lock);
} }
else if (type == CT_FOREIGN_CALL) { /* foreign call*/ else if (type == CT_FOREIGN_CALL) { /* foreign call*/
c->redis_foreign_call = 1; c->foreign_call = 1;
c->is_backup_call = 1;
atomic64_inc(&m->stats.foreign_sessions); atomic64_inc(&m->stats.foreign_sessions);
atomic64_inc(&m->totalstats.total_foreign_sessions); atomic64_inc(&m->totalstats.total_foreign_sessions);
} }

@ -114,7 +114,7 @@ enum call_type {
#define __C_DBG(x...) ((void)0) #define __C_DBG(x...) ((void)0)
#endif #endif
#define IS_BACKUP_CALL(c) (c->is_backup_call) #define IS_FOREIGN_CALL(c) (c->foreign_call)
/* flags shared by several of the structs below */ /* flags shared by several of the structs below */
#define SHARED_FLAG_IMPLICIT_RTCP 0x00000001 #define SHARED_FLAG_IMPLICIT_RTCP 0x00000001
@ -435,8 +435,7 @@ struct call {
sockaddr_t created_from_addr; sockaddr_t created_from_addr;
unsigned int redis_hosted_db; unsigned int redis_hosted_db;
unsigned int redis_foreign_call; unsigned int foreign_call; // created_via_redis_notify call
unsigned int is_backup_call; // created_via_redis_notify call
}; };
struct callmaster_config { struct callmaster_config {

@ -211,7 +211,7 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
} }
printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i | foreign:%s\n\n", printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i | foreign:%s\n\n",
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, c->is_backup_call?"yes":"no"); c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db, IS_FOREIGN_CALL(c)?"yes":"no");
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
for (l = c->monologues.head; l; l = l->next) { for (l = c->monologues.head; l; l = l->next) {
@ -328,13 +328,13 @@ static void cli_incoming_list_sessions(char* buffer, int len, struct callmaster*
continue; continue;
} }
} else if (len>=strlen(LIST_OWN) && strncmp(buffer,LIST_OWN,strlen(LIST_OWN)) == 0) { } else if (len>=strlen(LIST_OWN) && strncmp(buffer,LIST_OWN,strlen(LIST_OWN)) == 0) {
if (!call || call->is_backup_call) { if (!call || IS_FOREIGN_CALL(call)) {
continue; continue;
} else { } else {
found_own = 1; found_own = 1;
} }
} else if (len>=strlen(LIST_FOREIGN) && strncmp(buffer,LIST_FOREIGN,strlen(LIST_FOREIGN)) == 0) { } else if (len>=strlen(LIST_FOREIGN) && strncmp(buffer,LIST_FOREIGN,strlen(LIST_FOREIGN)) == 0) {
if (!call || !call->is_backup_call) { if (!call || !IS_FOREIGN_CALL(call)) {
continue; continue;
} else { } else {
found_foreign = 1; found_foreign = 1;
@ -344,7 +344,7 @@ static void cli_incoming_list_sessions(char* buffer, int len, struct callmaster*
break; break;
} }
printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, call->is_backup_call?"yes":"no"); printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db, IS_FOREIGN_CALL(call)?"yes":"no");
ADJUSTLEN(printlen,outbufend,replybuffer); ADJUSTLEN(printlen,outbufend,replybuffer);
} }
rwlock_unlock_r(&m->hashlock); rwlock_unlock_r(&m->hashlock);
@ -614,9 +614,9 @@ static void cli_incoming_terminate(char* buffer, int len, struct callmaster* m,
while (g_hash_table_iter_next(&iter, &key, &value)) { while (g_hash_table_iter_next(&iter, &key, &value)) {
c = (struct call*)value; c = (struct call*)value;
if (!c) continue; if (!c) continue;
if (!str_memcmp(&termparam,"own") && c->is_backup_call) { if (!str_memcmp(&termparam,"own") && IS_FOREIGN_CALL(c)) {
continue; continue;
} else if (!str_memcmp(&termparam,"foreign") && !c->is_backup_call) { } else if (!str_memcmp(&termparam,"foreign") && !IS_FOREIGN_CALL(c)) {
continue; continue;
} }
if (!c->ml_deleted) { if (!c->ml_deleted) {
@ -690,7 +690,7 @@ static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char
} else if (endptr == str_keyspace_db.s) { } else if (endptr == str_keyspace_db.s) {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail adding keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s); printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail adding keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s);
} else { } else {
if (!g_queue_find_custom(m->conf.redis_subscribed_keyspaces, uint_keyspace_db, guint_cmp)) { if (!g_queue_find_custom(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db), guint_cmp)) {
g_queue_push_tail(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); g_queue_push_tail(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db));
redis_notify_subscribe_action(m, SUBSCRIBE_KEYSPACE, uint_keyspace_db); redis_notify_subscribe_action(m, SUBSCRIBE_KEYSPACE, uint_keyspace_db);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Success adding keyspace %u to redis notifications.\n", uint_keyspace_db); printlen = snprintf(replybuffer, outbufend-replybuffer, "Success adding keyspace %u to redis notifications.\n", uint_keyspace_db);
@ -727,7 +727,7 @@ static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char*
printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail removing keyspace %.*s to redis notifications; errono=%d\n", str_keyspace_db.len, str_keyspace_db.s, errno); printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail removing keyspace %.*s to redis notifications; errono=%d\n", str_keyspace_db.len, str_keyspace_db.s, errno);
} else if (endptr == str_keyspace_db.s) { } else if (endptr == str_keyspace_db.s) {
printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail removing keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s); printlen = snprintf(replybuffer, outbufend-replybuffer, "Fail removing keyspace %.*s to redis notifications; no digists found\n", str_keyspace_db.len, str_keyspace_db.s);
} else if ((l = g_queue_find_custom(m->conf.redis_subscribed_keyspaces, uint_keyspace_db, guint_cmp))) { } else if ((l = g_queue_find_custom(m->conf.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db), guint_cmp))) {
// remove this keyspace // remove this keyspace
redis_notify_subscribe_action(m, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); redis_notify_subscribe_action(m, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db);
g_queue_remove(m->conf.redis_subscribed_keyspaces, l->data); g_queue_remove(m->conf.redis_subscribed_keyspaces, l->data);
@ -737,7 +737,7 @@ static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char*
g_hash_table_iter_init(&iter, m->callhash); g_hash_table_iter_init(&iter, m->callhash);
while (g_hash_table_iter_next(&iter, &key, &value)) { while (g_hash_table_iter_next(&iter, &key, &value)) {
c = (struct call*)value; c = (struct call*)value;
if (!c || !c->is_backup_call|| !(c->redis_hosted_db == uint_keyspace_db)) { if (!c || !IS_FOREIGN_CALL(c)|| !(c->redis_hosted_db == uint_keyspace_db)) {
continue; continue;
} }
if (!c->ml_deleted) { if (!c->ml_deleted) {

@ -315,11 +315,6 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
rwlock_unlock_w(&c->master_lock); // because of call_get(..) rwlock_unlock_w(&c->master_lock); // because of call_get(..)
} }
if (c && !c->redis_foreign_call) {
rlog(LOG_DEBUG,"I am responsible for that call so I ignore redis notifications.");
goto err;
}
if (strncmp(rr->element[3]->str,"sadd",4)==0) { if (strncmp(rr->element[3]->str,"sadd",4)==0) {
if (c) { if (c) {
rlog(LOG_INFO, "Redis-Notifier: Call already exists with this callid:%s\n", rr->element[2]->str); rlog(LOG_INFO, "Redis-Notifier: Call already exists with this callid:%s\n", rr->element[2]->str);

1
debian/control vendored

@ -8,7 +8,6 @@ Build-Depends: debhelper (>= 5),
libglib2.0-dev (>= 2.30), libglib2.0-dev (>= 2.30),
libhiredis-dev, libhiredis-dev,
libevent-dev (>= 2.0), libevent-dev (>= 2.0),
libevent-pthreads-2.0-5 (>= 2.0),
libpcre3-dev, libpcre3-dev,
libssl-dev (>= 1.0.1), libssl-dev (>= 1.0.1),
libxmlrpc-c3-dev (>= 1.16.07) | libxmlrpc-core-c3-dev (>= 1.16.07), libxmlrpc-c3-dev (>= 1.16.07) | libxmlrpc-core-c3-dev (>= 1.16.07),

Loading…
Cancel
Save