diff --git a/daemon/call.h b/daemon/call.h index f035a826c..95c9328ac 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -426,6 +426,7 @@ struct callmaster_config { struct redis *redis; struct redis *redis_read; struct redis *redis_write; + struct event_base *redis_notify_event_base; char *b2b_url; unsigned char default_tos; enum xmlrpc_format fmt; diff --git a/daemon/main.c b/daemon/main.c index a240532fc..39c1cd817 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -645,6 +645,8 @@ int main(int argc, char **argv) { threads_join_all(0); } + redis_notify_event_base_loopbreak(ctx.m); + threads_join_all(1); ilog(LOG_INFO, "Version %s shutting down", RTPENGINE_VERSION); diff --git a/daemon/redis.c b/daemon/redis.c index ff1e357cf..3caa31408 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -263,6 +263,14 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { } +void redis_notify_event_base_loopbreak(struct callmaster *cm) { + if (cm->conf.redis_notify_event_base) { + event_base_loopbreak(cm->conf.redis_notify_event_base); + free(cm->conf.redis_notify_event_base); + cm->conf.redis_notify_event_base = 0; + } +} + void redis_notify(void *d) { struct callmaster *cm = d; struct redis *r = 0; @@ -275,7 +283,7 @@ void redis_notify(void *d) { return; } - struct event_base *base = event_base_new(); + cm->conf.redis_notify_event_base = event_base_new(); redisAsyncContext *c = redisAsyncConnect(r->host, r->endpoint.port); if (c->err) { @@ -283,10 +291,10 @@ void redis_notify(void *d) { return; } - redisLibeventAttach(c, base); - // redisAsyncCommand(c, onRedisNotification, d, "SUBSCRIBE testtopic"); + redisLibeventAttach(c, cm->conf.redis_notify_event_base); + redisAsyncCommand(c, onRedisNotification, d, "psubscribe __key*__:notifier-*"); - event_base_dispatch(base); + event_base_dispatch(cm->conf.redis_notify_event_base); } struct redis *redis_new(const endpoint_t *ep, int db, int role) { diff --git a/daemon/redis.h b/daemon/redis.h index 43b00c4ed..953de2096 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -82,7 +82,7 @@ int redis_restore(struct callmaster *, struct redis *, int); void redis_update(struct call *, struct redis *, int, enum call_opmode); void redis_delete(struct call *, struct redis *, int); void redis_wipe(struct redis *, int); - +void redis_notify_event_base_loopbreak(struct callmaster *cm);