|
|
|
@ -336,6 +336,22 @@ err:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void redis_notify_event_base_loopbreak(struct callmaster *cm) {
|
|
|
|
void redis_notify_event_base_loopbreak(struct callmaster *cm) {
|
|
|
|
|
|
|
|
// sanity checks
|
|
|
|
|
|
|
|
if (!cm->conf.redis_notify_event_base) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis event_base_new() is NULL on loopbreak");
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (!cm->conf.redis_notify_async_context) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis notify async context is NULL on loopbreak");
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (cm->conf.redis_notify_async_context->err) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis notify async context error on loopbreak: %s", cm->conf.redis_notify_async_context->errstr);
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
event_base_loopbreak(cm->conf.redis_notify_event_base);
|
|
|
|
event_base_loopbreak(cm->conf.redis_notify_event_base);
|
|
|
|
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe");
|
|
|
|
redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -343,6 +359,17 @@ void redis_notify_event_base_loopbreak(struct callmaster *cm) {
|
|
|
|
void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace) {
|
|
|
|
void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace) {
|
|
|
|
char main_db_str[256];
|
|
|
|
char main_db_str[256];
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// sanity checks
|
|
|
|
|
|
|
|
if (!cm->conf.redis_notify_async_context) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis notify async context NULL on subscribe");
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (cm->conf.redis_notify_async_context->err) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis notify async context error on subscribe: %s", cm->conf.redis_notify_async_context->errstr);
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
memset(&main_db_str, 0, 256);
|
|
|
|
memset(&main_db_str, 0, 256);
|
|
|
|
sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace);
|
|
|
|
sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace);
|
|
|
|
|
|
|
|
|
|
|
|
@ -352,6 +379,17 @@ void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace) {
|
|
|
|
void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) {
|
|
|
|
void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) {
|
|
|
|
char main_db_str[256];
|
|
|
|
char main_db_str[256];
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// sanity checks
|
|
|
|
|
|
|
|
if (!cm->conf.redis_notify_async_context) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis notify async context NULL on unsubscribe");
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (cm->conf.redis_notify_async_context->err) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis notify async context error on unsubscribe: %s", cm->conf.redis_notify_async_context->errstr);
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
memset(&main_db_str, 0, 256);
|
|
|
|
memset(&main_db_str, 0, 256);
|
|
|
|
sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace);
|
|
|
|
sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace);
|
|
|
|
|
|
|
|
|
|
|
|
@ -370,11 +408,21 @@ static void redis_notify(struct callmaster *cm) {
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cm->conf.redis_notify_event_base = NULL;
|
|
|
|
cm->conf.redis_notify_event_base = event_base_new();
|
|
|
|
cm->conf.redis_notify_event_base = event_base_new();
|
|
|
|
|
|
|
|
if (!cm->conf.redis_notify_event_base) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis event_base_new() NULL error");
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cm->conf.redis_notify_async_context = NULL;
|
|
|
|
cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port);
|
|
|
|
cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port);
|
|
|
|
|
|
|
|
if (!cm->conf.redis_notify_async_context) {
|
|
|
|
|
|
|
|
rlog(LOG_ERROR, "Redis notify async context NULL error");
|
|
|
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
}
|
|
|
|
if (cm->conf.redis_notify_async_context->err) {
|
|
|
|
if (cm->conf.redis_notify_async_context->err) {
|
|
|
|
rlog(LOG_ERROR, "Redis notification error: %s\n", cm->conf.redis_notify_async_context->errstr);
|
|
|
|
rlog(LOG_ERROR, "Redis notify async context error: %s", cm->conf.redis_notify_async_context->errstr);
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|