Intermediate State: Notifications enabled by cli

pull/225/head
Frederic-Philippe Metz 10 years ago
parent a9b27c7e57
commit 48543b4c4e

@ -414,6 +414,7 @@ struct call {
unsigned char tos;
char *created_from;
sockaddr_t created_from_addr;
int redis_hosted_db;
};
struct callmaster_config {

@ -164,8 +164,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
return;
}
printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu\n\n",
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal);
printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i\n\n",
c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db);
ADJUSTLEN(printlen,outbufend,replybuffer);
for (l = c->monologues.head; l; l = l->next) {
@ -360,7 +360,7 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char*
while (g_hash_table_iter_next (&iter, &key, &value)) {
ptrkey = (str*)key;
call = (struct call*)value;
printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from);
printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db);
ADJUSTLEN(printlen,outbufend,replybuffer);
}
rwlock_unlock_r(&m->hashlock);
@ -467,6 +467,52 @@ static void cli_incoming_terminate(char* buffer, int len, struct callmaster* m,
obj_put(c);
}
static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen=0;
unsigned int keyspace_db;
str str_keyspace_db;
if (len<=1) {
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "More parameters required.");
ADJUSTLEN(printlen,outbufend,replybuffer);
return;
}
++buffer; --len; // one space
str_keyspace_db.s = buffer;
str_keyspace_db.len = len;
keyspace_db = str_to_i(&str_keyspace_db, -1);
redis_notify_subscribe_keyspace(m,keyspace_db);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully added keyspace %i to redis notifications.\n", keyspace_db);
ADJUSTLEN(printlen,outbufend,replybuffer);
}
static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) {
int printlen=0;
unsigned int keyspace_db;
str str_keyspace_db;
if (len<=1) {
printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "More parameters required.");
ADJUSTLEN(printlen,outbufend,replybuffer);
return;
}
++buffer; --len; // one space
str_keyspace_db.s = buffer;
str_keyspace_db.len = len;
keyspace_db = str_to_i(&str_keyspace_db, -1);
redis_notify_unsubscribe_keyspace(m,keyspace_db);
printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed keyspace %i to redis notifications.\n", keyspace_db);
ADJUSTLEN(printlen,outbufend,replybuffer);
}
static void cli_incoming(int fd, void *p, uintptr_t u) {
int nfd;
struct sockaddr_in sin;
@ -514,6 +560,8 @@ next:
static const char* LIST = "list";
static const char* TERMINATE = "terminate";
static const char* SET = "set";
static const char* KSADD = "ksadd";
static const char* KSRM = "ksrm";
if (strncmp(inbuf,LIST,strlen(LIST)) == 0) {
cli_incoming_list(inbuf+strlen(LIST), inlen-strlen(LIST), cli->callmaster, outbuf, outbufend);
@ -521,6 +569,10 @@ next:
cli_incoming_terminate(inbuf+strlen(TERMINATE), inlen-strlen(TERMINATE), cli->callmaster, outbuf, outbufend);
} else if (strncmp(inbuf,SET,strlen(SET)) == 0) {
cli_incoming_set(inbuf+strlen(SET), inlen-strlen(SET), cli->callmaster, outbuf, outbufend);
} else if (strncmp(inbuf,KSADD,strlen(KSADD)) == 0) {
cli_incoming_ksadd(inbuf+strlen(KSADD), inlen-strlen(KSADD), cli->callmaster, outbuf, outbufend);
} else if (strncmp(inbuf,KSRM,strlen(KSRM)) == 0) {
cli_incoming_ksrm(inbuf+strlen(KSRM), inlen-strlen(KSRM), cli->callmaster, outbuf, outbufend);
} else {
sprintf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", inbuf);
}

@ -221,6 +221,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
struct redis *r = 0;
struct call* c;
str callid;
char db_str[16]; memset(&db_str, 0, 8);
char* pdbstr = db_str;
unsigned char* p = 0;
int dbno;
if (cm->conf.redis_read) {
r = cm->conf.redis_read;
@ -249,6 +253,20 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
return;
}
// extract <db> from __keyspace@<db>__ prefix
p = strstr(rr->element[2]->str, "@");
++p;
while (isdigit(*p)) {
*pdbstr = *p;
++pdbstr; ++p;
if (pdbstr-db_str>15) {
rlog(LOG_ERROR, "Could not extract keyspace db from notification.");
return;
}
}
dbno = atoi(db_str);
pch += strlen("notifier-");
str_cut(rr->element[2]->str,0,pch-rr->element[2]->str);
rr->element[2]->len = strlen(rr->element[2]->str);
@ -264,6 +282,12 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) {
return;
}
redis_restore_call(r, cm, rr->element[2]);
// we lookup again to retrieve the call to insert the kayspace db id
c = g_hash_table_lookup(cm->callhash, &callid);
if (c) {
c->redis_hosted_db = dbno;
}
}
if (strncmp(rr->element[3]->str,"del",3)==0) {
@ -277,6 +301,20 @@ void redis_notify_event_base_loopbreak(struct callmaster *cm) {
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe");
}
void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace) {
char* main_db_str[256];
sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace);
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str);
}
void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) {
char* main_db_str[256];
sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace);
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str);
}
void redis_notify(void *d) {
struct callmaster *cm = d;
struct redis *r = 0;
@ -299,8 +337,9 @@ void redis_notify(void *d) {
redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base);
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, d, "psubscribe __key*__:notifier-*");
redis_notify_subscribe_keyspace(cm,r->db);
event_base_dispatch(cm->conf.redis_notify_event_base);
}
struct redis *redis_new(const endpoint_t *ep, int db, int role) {
@ -1001,10 +1040,6 @@ static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *m
}
static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id) {
struct redis_hash call;
struct redis_list tags, sfds, streams, medias, maps;
@ -1090,6 +1125,7 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi
goto err6;
err = NULL;
c->redis_hosted_db = r->db;
obj_put(c);
err6:
@ -1537,6 +1573,7 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op
if (opmode==OP_ANSWER) {
redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid));
}
c->redis_hosted_db = r->db;
redis_consume(r);
mutex_unlock(&r->lock);

@ -83,7 +83,8 @@ void redis_update(struct call *, struct redis *, int, enum call_opmode);
void redis_delete(struct call *, struct redis *, int);
void redis_wipe(struct redis *, int);
void redis_notify_event_base_loopbreak(struct callmaster *cm);
void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace);
void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace);

Loading…
Cancel
Save