Implemented redis notification according to RTPENGINE-64

Thoughts on that topic so far:

There's one thing to keep also in mind. What do we do if the call
changes (streams) and the backup node is notified ? Currently we only
know (by subscribing to the 'notifier-' prefix that in fact it has
changed, but we don't know what has changed in detail. Subscribing to
everything would lead to the problem that we have to take care about
synchronising the the new streams with the old ones. Without having a
look at the code that might be a lot of effort and ... I guess that's
why richard likes to ... have clean states of the calls. Synchronizing
is always a mess. Easier to delete and setup new.

I thought about the following solution for makes things more abstract
and easier to understand:

1. Whenever that call on a backup node (foreign call) has seen a packet
(also timers are started then), we do not process any notification by a
redis notification for this call (caus' the RTP-IP has already been
switched). More precise and abstract that means: If a node has taken
over the responsibility for that call (by having seen a packet), it
assumes that notifications from the former node to be dropped. The call
will be deleted when either the timer fires or (for other companies) the
control channel address has also been switched and via that channel the
call is deleted.

2. If a call has not seen a packet (inactive call on the backup node,
seen as not responsible for that call but could become so), we accept
all commands for that call on the backup node in the same way as on the
original node. That means also deletions in-between and so on. I mean if
the original node does it, why not accepting to do the same way on the
backup node ?
pull/225/head
Frederic-Philippe Metz 10 years ago
parent bf38f151ba
commit fd3e2342c1

@ -259,6 +259,11 @@ next:
}
}
if (!(c->redis_call_responsible)) {
ilog(LOG_INFO, "Timeout did not lead to close the call since I am not responisble");
goto out;
}
ilog(LOG_INFO, "Closing call due to timeout");
drop:
@ -531,6 +536,10 @@ static void callmaster_timer(void *ptr) {
ke->rtp_stats[j].bytes - atomic64_get(&rs->bytes));
atomic64_set(&rs->kernel_packets, ke->rtp_stats[j].packets);
atomic64_set(&rs->kernel_bytes, ke->rtp_stats[j].bytes);
if (ps && !(ps->call->redis_call_responsible) && ke->rtp_stats[j].packets >0) {
ilog(LOG_DEBUG, "Taking over resposibility now for that call since I saw packets.");
ps->call->redis_call_responsible = 1;
}
}
update = 0;
@ -1832,12 +1841,13 @@ void call_destroy(struct call *c) {
obj_put(c);
if (m->conf.redis_write) {
redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE);
} else if (m->conf.redis) {
redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE);
if (c->redis_call_responsible) {
if (m->conf.redis_write) {
redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE);
} else if (m->conf.redis) {
redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE);
}
}
rwlock_lock_w(&c->master_lock);
/* at this point, no more packet streams can be added */

@ -415,6 +415,9 @@ struct call {
char *created_from;
sockaddr_t created_from_addr;
int redis_hosted_db;
// following flag servers as a mark that this
// call is either created by me or where I took over the responsibility (a packet has seen)
int redis_call_responsible;
};
struct callmaster_config {

@ -292,6 +292,11 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
c = g_hash_table_lookup(cm->callhash, &callid);
if (c && c->redis_call_responsible) {
rlog(LOG_DEBUG,"I am responsible for that call so I ignore redis notifications.");
return;
}
if (strncmp(rr->element[3]->str,"sadd",4)==0) {
if (c) {
rlog(LOG_INFO, "Redis-Notifier: Call already exists with this callid:%s\n", rr->element[2]->str);
@ -1586,10 +1591,11 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op
redis_pipe(r, "EXPIRE call-"PB" 86400", STR(&c->callid));
redis_pipe(r, "SADD calls "PB"", STR(&c->callid));
if (opmode==OP_ANSWER) {
// if (opmode==OP_ANSWER) {
redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid));
}
// }
c->redis_hosted_db = r->db;
c->redis_call_responsible = 1;
redis_consume(r);
mutex_unlock(&r->lock);

Loading…
Cancel
Save