diff --git a/daemon/call.h b/daemon/call.h index 22c38594e..dcb1034a3 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -414,6 +414,7 @@ struct call { unsigned char tos; char *created_from; sockaddr_t created_from_addr; + int redis_hosted_db; }; struct callmaster_config { diff --git a/daemon/cli.c b/daemon/cli.c index ad831ffe8..ba88dd053 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -164,8 +164,8 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m return; } - printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu\n\n", - c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal); + printlen = snprintf (replybuffer,(outbufend-replybuffer), "\ncallid: %60s | deletionmark:%4s | created:%12i | proxy:%s | tos:%u | last_signal:%llu | redis_keyspace:%i\n\n", + c->callid.s , c->ml_deleted?"yes":"no", (int)c->created, c->created_from, (unsigned int)c->tos, (unsigned long long)c->last_signal, c->redis_hosted_db); ADJUSTLEN(printlen,outbufend,replybuffer); for (l = c->monologues.head; l; l = l->next) { @@ -360,7 +360,7 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char* while (g_hash_table_iter_next (&iter, &key, &value)) { ptrkey = (str*)key; call = (struct call*)value; - printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from); + printlen = snprintf(replybuffer, outbufend-replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created, call->created_from, call->redis_hosted_db); ADJUSTLEN(printlen,outbufend,replybuffer); } rwlock_unlock_r(&m->hashlock); @@ -467,6 +467,52 @@ static void cli_incoming_terminate(char* buffer, int len, struct callmaster* m, obj_put(c); } +static void cli_incoming_ksadd(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { + int printlen=0; + + unsigned int keyspace_db; + str str_keyspace_db; + + if (len<=1) { + printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "More parameters required."); + ADJUSTLEN(printlen,outbufend,replybuffer); + return; + } + ++buffer; --len; // one space + + str_keyspace_db.s = buffer; + str_keyspace_db.len = len; + keyspace_db = str_to_i(&str_keyspace_db, -1); + + redis_notify_subscribe_keyspace(m,keyspace_db); + + printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully added keyspace %i to redis notifications.\n", keyspace_db); + ADJUSTLEN(printlen,outbufend,replybuffer); +} + +static void cli_incoming_ksrm(char* buffer, int len, struct callmaster* m, char* replybuffer, const char* outbufend) { + int printlen=0; + + unsigned int keyspace_db; + str str_keyspace_db; + + if (len<=1) { + printlen = snprintf(replybuffer, outbufend-replybuffer, "%s\n", "More parameters required."); + ADJUSTLEN(printlen,outbufend,replybuffer); + return; + } + ++buffer; --len; // one space + + str_keyspace_db.s = buffer; + str_keyspace_db.len = len; + keyspace_db = str_to_i(&str_keyspace_db, -1); + + redis_notify_unsubscribe_keyspace(m,keyspace_db); + + printlen = snprintf(replybuffer, outbufend-replybuffer, "Successfully removed keyspace %i to redis notifications.\n", keyspace_db); + ADJUSTLEN(printlen,outbufend,replybuffer); +} + static void cli_incoming(int fd, void *p, uintptr_t u) { int nfd; struct sockaddr_in sin; @@ -514,6 +560,8 @@ next: static const char* LIST = "list"; static const char* TERMINATE = "terminate"; static const char* SET = "set"; + static const char* KSADD = "ksadd"; + static const char* KSRM = "ksrm"; if (strncmp(inbuf,LIST,strlen(LIST)) == 0) { cli_incoming_list(inbuf+strlen(LIST), inlen-strlen(LIST), cli->callmaster, outbuf, outbufend); @@ -521,6 +569,10 @@ next: cli_incoming_terminate(inbuf+strlen(TERMINATE), inlen-strlen(TERMINATE), cli->callmaster, outbuf, outbufend); } else if (strncmp(inbuf,SET,strlen(SET)) == 0) { cli_incoming_set(inbuf+strlen(SET), inlen-strlen(SET), cli->callmaster, outbuf, outbufend); + } else if (strncmp(inbuf,KSADD,strlen(KSADD)) == 0) { + cli_incoming_ksadd(inbuf+strlen(KSADD), inlen-strlen(KSADD), cli->callmaster, outbuf, outbufend); + } else if (strncmp(inbuf,KSRM,strlen(KSRM)) == 0) { + cli_incoming_ksrm(inbuf+strlen(KSRM), inlen-strlen(KSRM), cli->callmaster, outbuf, outbufend); } else { sprintf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", inbuf); } diff --git a/daemon/redis.c b/daemon/redis.c index 9103533c0..1df5a846b 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -221,6 +221,10 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { struct redis *r = 0; struct call* c; str callid; + char db_str[16]; memset(&db_str, 0, 8); + char* pdbstr = db_str; + unsigned char* p = 0; + int dbno; if (cm->conf.redis_read) { r = cm->conf.redis_read; @@ -249,6 +253,20 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { return; } + + // extract from __keyspace@__ prefix + p = strstr(rr->element[2]->str, "@"); + ++p; + while (isdigit(*p)) { + *pdbstr = *p; + ++pdbstr; ++p; + if (pdbstr-db_str>15) { + rlog(LOG_ERROR, "Could not extract keyspace db from notification."); + return; + } + } + dbno = atoi(db_str); + pch += strlen("notifier-"); str_cut(rr->element[2]->str,0,pch-rr->element[2]->str); rr->element[2]->len = strlen(rr->element[2]->str); @@ -264,6 +282,12 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { return; } redis_restore_call(r, cm, rr->element[2]); + + // we lookup again to retrieve the call to insert the kayspace db id + c = g_hash_table_lookup(cm->callhash, &callid); + if (c) { + c->redis_hosted_db = dbno; + } } if (strncmp(rr->element[3]->str,"del",3)==0) { @@ -277,6 +301,20 @@ void redis_notify_event_base_loopbreak(struct callmaster *cm) { redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe"); } +void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace) { + char* main_db_str[256]; + sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace); + + redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); +} + +void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) { + char* main_db_str[256]; + sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace); + + redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); +} + void redis_notify(void *d) { struct callmaster *cm = d; struct redis *r = 0; @@ -299,8 +337,9 @@ void redis_notify(void *d) { redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base); - redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, d, "psubscribe __key*__:notifier-*"); + redis_notify_subscribe_keyspace(cm,r->db); event_base_dispatch(cm->conf.redis_notify_event_base); + } struct redis *redis_new(const endpoint_t *ep, int db, int role) { @@ -1001,10 +1040,6 @@ static int redis_link_maps(struct redis *r, struct call *c, struct redis_list *m } - - - - static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id) { struct redis_hash call; struct redis_list tags, sfds, streams, medias, maps; @@ -1090,6 +1125,7 @@ static void redis_restore_call(struct redis *r, struct callmaster *m, const redi goto err6; err = NULL; + c->redis_hosted_db = r->db; obj_put(c); err6: @@ -1537,6 +1573,7 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op if (opmode==OP_ANSWER) { redis_pipe(r, "SADD notifier-"PB" "PB"", STR(&c->callid), STR(&c->callid)); } + c->redis_hosted_db = r->db; redis_consume(r); mutex_unlock(&r->lock); diff --git a/daemon/redis.h b/daemon/redis.h index 953de2096..462bc7556 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -83,7 +83,8 @@ void redis_update(struct call *, struct redis *, int, enum call_opmode); void redis_delete(struct call *, struct redis *, int); void redis_wipe(struct redis *, int); void redis_notify_event_base_loopbreak(struct callmaster *cm); - +void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace); +void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace);