From 41fa252d6b597e05175b5884205bd0ac98db559b Mon Sep 17 00:00:00 2001 From: Lucian Balaceanu Date: Wed, 21 Oct 2015 12:56:02 +0200 Subject: [PATCH] Replacing redis hashkeys with -CALLID_IDX instead of -POINTER --- daemon/call.c | 18 +++- daemon/call.h | 24 ++++- daemon/graphite.c | 2 +- daemon/redis.c | 246 ++++++++++++++++++++++++---------------------- daemon/redis.h | 2 +- 5 files changed, 165 insertions(+), 127 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 1fa0ad4d0..b02a03558 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -353,6 +353,14 @@ static void stream_fd_closed(int fd, void *p, uintptr_t u) { +INLINE int _redis_id_generate(struct call *c, rc_type id_type) { + return c->rc.val[id_type]++; +} + +INLINE void redis_hkey_generate(char *out, struct call *c, rc_type rc_kind) { + snprintf(out, MAX_REDIS_HKEY_SIZE, "%.*s_%d", c->callid.len, c->callid.s, + _redis_id_generate(c, rc_kind)); +} INLINE void __re_address_translate(struct re_address *o, const struct endpoint *ep) { o->family = family_from_address(&ep->ip46); @@ -1726,6 +1734,7 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con med->monologue = ml; med->call = ml->call; med->index = sp->index; + redis_hkey_generate(med->redis_hkey, ml->call, RC_MEDIA); call_str_cpy(ml->call, &med->type, &sp->type); med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free); @@ -1814,6 +1823,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne em->endpoint = *ep; else em->wildcard = 1; + redis_hkey_generate(em->redis_hkey, call, RC_EM); g_queue_init(&em->sfds); media->endpoint_maps = g_slist_prepend(media->endpoint_maps, em); @@ -1826,6 +1836,7 @@ alloc: __C_DBG("allocating stream_fds for %u ports", num_ports); for (i = 0; i < num_ports; i++) { sfd = __stream_fd_new(&fd_arr[i], call); + redis_hkey_generate(sfd->redis_hkey, call, RC_SFD); g_queue_push_tail(&em->sfds, sfd); /* not referenced */ } @@ -1898,6 +1909,7 @@ static int __num_media_streams(struct call_media *media, unsigned int num_ports) __C_DBG("allocating %i new packet_streams", num_ports - media->streams.length); while (media->streams.length < num_ports) { stream = __packet_stream_new(call); + redis_hkey_generate(stream->redis_hkey, call, RC_PS); stream->media = media; g_queue_push_tail(&media->streams, stream); stream->component = media->streams.length; @@ -3545,12 +3557,14 @@ static struct call_monologue *call_get_monologue(struct call *call, const str *f __C_DBG("creating new monologue"); ret = __monologue_create(call); + redis_hkey_generate(ret->redis_hkey, call, RC_MONO); __monologue_tag(ret, fromtag); /* we need both sides of the dialogue even in the initial offer, so create * another monologue without to-tag (to be filled in later) */ new_branch: __C_DBG("create new \"other side\" monologue for viabranch "STR_FORMAT, STR_FMT0(viabranch)); os = __monologue_create(call); + redis_hkey_generate(os->redis_hkey, call, RC_MONO); ret->active_dialogue = os; os->active_dialogue = ret; __monologue_viabranch(os, viabranch); @@ -3595,8 +3609,10 @@ static struct call_monologue *call_get_dialogue(struct call *call, const str *fr /* if we don't have a fromtag monologue yet, we can use a half-complete dialogue * from the totag if there is one. otherwise we have to create a new one. */ ft = tt->active_dialogue; - if (ft->tag.s) + if (ft->tag.s) { ft = __monologue_create(call); + redis_hkey_generate(ft->redis_hkey, call, RC_MONO); + } } /* the fromtag monologue may be newly created, or half-complete from the totag, or diff --git a/daemon/call.h b/daemon/call.h index 2f2868a3d..d4e0c4d60 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -92,6 +92,8 @@ enum call_stream_state { #define RTP_BUFFER_TAIL_ROOM 512 #define RTP_BUFFER_SIZE (MAX_RTP_PACKET_SIZE + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM) +#define MAX_REDIS_HKEY_SIZE 150 + #ifndef RTP_LOOP_PROTECT #define RTP_LOOP_PROTECT 28 /* number of bytes */ #define RTP_LOOP_PACKETS 2 /* number of packets */ @@ -184,8 +186,8 @@ enum call_stream_state { #define MEDIA_SET(p, f) bf_set(&(p)->media_flags, MEDIA_FLAG_ ## f) #define MEDIA_CLEAR(p, f) bf_clear(&(p)->media_flags, MEDIA_FLAG_ ## f) - - +typedef enum { RC_SFD, RC_EM, RC_PS, RC_MEDIA, RC_MONO, RC_LIMIT} rc_type; +typedef struct redis_hkey_counters { unsigned char val[RC_LIMIT]; } redis_hkey_counters; struct poller; struct control_stream; @@ -282,12 +284,14 @@ struct stream_fd { struct packet_stream *stream; /* LOCK: call->master_lock */ struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */ + char redis_hkey[MAX_REDIS_HKEY_SIZE]; }; struct endpoint_map { struct endpoint endpoint; GQueue sfds; int wildcard:1; + char redis_hkey[MAX_REDIS_HKEY_SIZE]; }; struct loop_protector { @@ -296,7 +300,7 @@ struct loop_protector { }; struct rtp_stats { - unsigned int payload_type; + unsigned int payload_type; atomic64 packets; atomic64 bytes; atomic64 kernel_packets; @@ -340,6 +344,7 @@ struct packet_stream { /* in_lock must be held for SETTING these: */ volatile unsigned int ps_flags; + char redis_hkey[MAX_REDIS_HKEY_SIZE]; }; /* protected by call->master_lock, except the RO elements */ @@ -373,6 +378,7 @@ struct call_media { GHashTable *rtp_payload_types; volatile unsigned int media_flags; + char redis_hkey[MAX_REDIS_HKEY_SIZE]; }; /* half a dialogue */ @@ -392,6 +398,7 @@ struct call_monologue { struct call_monologue *active_dialogue; GQueue medias; + char redis_hkey[MAX_REDIS_HKEY_SIZE]; }; struct call { @@ -412,14 +419,16 @@ struct call { GSList *stream_fds; struct dtls_cert *dtls_cert; /* for outgoing */ - str callid; + str callid; time_t created; time_t last_signal; time_t deleted; time_t ml_deleted; - unsigned char tos; + unsigned char tos; char *created_from; struct sockaddr_in6 created_from_addr; + + struct redis_hkey_counters rc; }; struct local_interface { @@ -610,6 +619,11 @@ INLINE struct packet_stream *packet_stream_sink(struct packet_stream *ps) { return ret; } +INLINE void redis_hkey_cpy(char *dst, char *src) { + strncpy(dst, src, MAX_REDIS_HKEY_SIZE); + dst[MAX_REDIS_HKEY_SIZE-1] = '\0'; +} + const char * get_tag_type_text(enum tag_type t); #endif diff --git a/daemon/graphite.c b/daemon/graphite.c index 2a3b65586..00ed46a90 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -276,7 +276,7 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) { ilog(LOG_ERROR,"Sending graphite data failed."); } - copy_with_lock(&cm->totalstats_lastinterval, &graphite_stats, &cm->totalstats_lastinterval.total_average_lock); + copy_with_lock(&cm->totalstats_lastinterval, &graphite_stats, &cm->totalstats_lastinterval_lock); } } diff --git a/daemon/redis.c b/daemon/redis.c index bca61fea7..1f091db4d 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -247,8 +247,17 @@ static void redis_check_conn(struct redis *r, int role) { abort(); } +/* +static void _redis_pipe(struct redis *r, const char *fmt, ...) { + va_list ap; + char interm[200]; - + va_start(ap, fmt); + sprintf(interm, fmt, ap); + va_end(ap); + ilog(LOG_ERR, "<< %s >>> \n", interm); +} +*/ /* called with r->lock held and c->master_lock held */ static void redis_delete_call(struct call *c, struct redis *r) { @@ -256,36 +265,43 @@ static void redis_delete_call(struct call *c, struct redis *r) { GList *k; struct call_monologue *ml; struct call_media *media; + struct stream_fd *sfd; + struct packet_stream *ps; + struct endpoint_map *em; + char *mono_key, *media_key, *em_key; redis_pipe(r, "SREM calls "PB"", STR(&c->callid)); - redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"", STR(&c->callid), STR(&c->callid), - STR(&c->callid), STR(&c->callid)); + redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"", + STR(&c->callid), STR(&c->callid), STR(&c->callid), STR(&c->callid)); - for (l = c->stream_fds; l; l = l->next) - redis_pipe(r, "DEL sfd-%llu", (long long unsigned) l->data); - - for (l = c->streams; l; l = l->next) - redis_pipe(r, "DEL stream-%llu", (long long unsigned) l->data); + for (l = c->stream_fds; l; l = l->next) { + sfd = l->data; + redis_pipe(r, "DEL sfd-%s", HKEY(sfd)); + } + for (l = c->streams; l; l = l->next) { + ps = l->data; + redis_pipe(r, "DEL stream-%s", HKEY(ps)); + } for (l = c->monologues; l; l = l->next) { ml = l->data; + mono_key = HKEY(ml); - redis_pipe(r, "DEL tag-%llu other_tags-%llu medias-%llu", - (long long unsigned) ml, - (long long unsigned) ml, - (long long unsigned) ml); + redis_pipe(r, "DEL tag-%s other_tags-%s medias-%s", mono_key, mono_key, + mono_key); for (k = ml->medias.head; k; k = k->next) { media = k->data; + media_key = HKEY(media); - redis_pipe(r, "DEL media-%llu streams-%llu maps-%llu payload_types-%llu", - (long long unsigned) k->data, (long long unsigned) k->data, - (long long unsigned) k->data, (long long unsigned) k->data); + redis_pipe(r, "DEL media-%s streams-%s maps-%s payload_types-%s", + media_key, media_key, media_key, media_key); - for (n = media->endpoint_maps; n; n = n->next) - redis_pipe(r, "DEL map-%llu sfds-%llu", - (long long unsigned) n->data, - (long long unsigned) n->data); + for (n = media->endpoint_maps; n; n = n->next) { + em = n->data; + em_key = HKEY(em); + redis_pipe(r, "DEL map-%s sfds-%s", em_key, em_key); + } } } @@ -626,6 +642,8 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) { if (__get_consecutive_ports(&fd, 1, port, c)) return -1; sfd = __stream_fd_new(&fd, c); + redis_hkey_cpy( sfd->redis_hkey, it->id->str); + if (redis_hash_get_crypto_context(&sfd->crypto, &it->rh)) return -1; it->ptr = sfd; @@ -657,6 +675,7 @@ static int redis_streams(struct call *c, struct redis_list *streams) { if (redis_hash_get_crypto_context(&ps->crypto, &it->rh)) return -1; it->ptr = ps; + redis_hkey_cpy(ps->redis_hkey, it->id->str); PS_CLEAR(ps, KERNELIZED); } @@ -676,6 +695,8 @@ static int redis_tags(struct call *c, struct redis_list *tags) { if (!ml) return -1; + redis_hkey_cpy(ml->redis_hkey, it->id->str); + if (redis_hash_get_time_t(&ml->created, &it->rh, "created")) return -1; if (!redis_hash_get_str(&s, &it->rh, "tag")) @@ -746,6 +767,9 @@ static int redis_tags_populate(struct redis *r, struct redis_list *tags, struct med->call = ml->call; med->index = i; med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free); + + redis_hkey_cpy(med->redis_hkey, it_media->id->str); + g_queue_push_tail(&ml->medias, med); if (redis_hash_get_str(&s, &it_media->rh, "type")) @@ -803,6 +827,8 @@ static int redis_tags_populate(struct redis *r, struct redis_list *tags, struct g_queue_init(&em->sfds); med->endpoint_maps = g_slist_prepend(med->endpoint_maps, em); + redis_hkey_cpy(em->redis_hkey, it_em->id->str); + if (redis_hash_get_endpoint(&em->endpoint, &it_em->rh, "endpoint")) goto free2; if (redis_hash_build_list(r, "sfds", it_em->id, sfds, redis_build_em_sfds, em)) @@ -1052,18 +1078,13 @@ static int redis_update_crypto_params(struct redis *r, const char *pref, void *s { if (!p->crypto_suite) return -1; - redis_pipe(r, "HMSET %s-%llu %s-crypto_suite %s %s-master_key "PB" %s-master_salt "PB"", - pref, - (long long unsigned) suff, + redis_pipe(r, "HMSET %s-%s %s-crypto_suite %s %s-master_key "PB" %s-master_salt "PB"", + pref, suff, key, p->crypto_suite->name, key, S_LEN(p->master_key, sizeof(p->master_key)), key, S_LEN(p->master_salt, sizeof(p->master_salt))); if (p->mki) - redis_pipe(r, "HMSET %s-%llu %s-mki "PB"", - pref, - (long long unsigned) suff, - key, - S_LEN(p->mki, p->mki_len)); + redis_pipe(r, "HMSET %s-%s %s-mki "PB"", pref, suff, key, S_LEN(p->mki, p->mki_len)); return 0; } @@ -1072,10 +1093,8 @@ static void redis_update_crypto_context(struct redis *r, const char *pref, void { if (redis_update_crypto_params(r, pref, suff, "", &c->params)) return; - redis_pipe(r, "HMSET %s-%llu last_index "UINT64F"", - pref, - (long long unsigned) suff, - c->last_index); + redis_pipe(r, "HMSET %s-%s last_index "UINT64F"", + pref, suff, c->last_index); } static void redis_update_endpoint(struct redis *r, const char *pref, void *suff, const char *key, const struct endpoint *e) @@ -1083,17 +1102,14 @@ static void redis_update_endpoint(struct redis *r, const char *pref, void *suff, char a[64]; inet_ntop(AF_INET6, &e->ip46, a, sizeof(a)); - redis_pipe(r, "HMSET %s-%llu %s-addr %s %s-port %hu", - pref, - (long long unsigned) suff, - key, a, key, (short unsigned) e->port); + redis_pipe(r, "HMSET %s-%s %s-addr %s %s-port %hu", + pref, suff, key, a, key, (short unsigned) e->port); } static void redis_update_stats(struct redis *r, const char *pref, void *suff, const char *key, const struct stats *s) { - redis_pipe(r, "HMSET %s-%llu %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"", - pref, - (long long unsigned) suff, + redis_pipe(r, "HMSET %s-%s %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"", + pref, suff, key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes), key, atomic64_get(&s->errors)); } @@ -1102,9 +1118,8 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, voi { if (!f->hash_func) return; - redis_pipe(r, "HMSET %s-%llu hash_func %s fingerprint "PB"", - pref, - (long long unsigned) suff, + redis_pipe(r, "HMSET %s-%s hash_func %s fingerprint "PB"", + pref, suff, f->hash_func->name, S_LEN(f->digest, sizeof(f->digest))); } @@ -1125,6 +1140,9 @@ void redis_update(struct call *c, struct redis *r, int role) { struct rtp_payload_type *pt; char a[64]; unsigned int pt_index; + char *sfd_key, *ps_key, *mono_key, *active_mono_key, *other_mono_key, + *media_key, *em_key, *ps_rtp_sink_key, *ps_rtcp_sink_key, + *ps_rtcp_sibling_key; if (!r) return; @@ -1147,15 +1165,16 @@ void redis_update(struct call *c, struct redis *r, int role) { for (l = c->stream_fds; l; l = l->next) { sfd = l->data; + sfd_key = HKEY(sfd) ; + ps_key = HKEY(sfd->stream); - redis_pipe(r, "DEL sfd-%llu", (long long unsigned) sfd); - redis_pipe(r, "HMSET sfd-%llu localport %hu stream %llu", - (long long unsigned) sfd, (short unsigned) sfd->fd.localport, - (long long unsigned) sfd->stream); + redis_pipe(r, "DEL sfd-%s", sfd_key); + redis_pipe(r, "HMSET sfd-%s localport %hu stream %s", + sfd_key, (short unsigned) sfd->fd.localport, ps_key); redis_update_crypto_context(r, "sfd", sfd, &sfd->crypto); /* XXX DTLS?? */ - redis_pipe(r, "EXPIRE sfd-%llu 86400", (long long unsigned) sfd); - redis_pipe(r, "LPUSH sfds-"PB" %llu", STR(&c->callid), (long long unsigned) sfd); + redis_pipe(r, "EXPIRE sfd-%s 86400", sfd_key); + redis_pipe(r, "LPUSH sfds-"PB" %s", STR(&c->callid), sfd_key); } for (l = c->streams; l; l = l->next) { @@ -1164,133 +1183,122 @@ void redis_update(struct call *c, struct redis *r, int role) { mutex_lock(&ps->in_lock); mutex_lock(&ps->out_lock); - redis_pipe(r, "DEL stream-%llu", (long long unsigned) ps); - redis_pipe(r, "HMSET stream-%llu media %llu sfd %llu rtp_sink %llu " - "rtcp_sink %llu rtcp_sibling %llu last_packet "UINT64F" " + ps_key = HKEY(ps); + ps_rtp_sink_key = HKEY(ps->rtp_sink); + ps_rtcp_sink_key = HKEY(ps->rtcp_sink); + ps_rtcp_sibling_key = HKEY(ps->rtcp_sibling); + media_key = HKEY(ps->media); + sfd_key = HKEY(ps->sfd); + + redis_pipe(r, "DEL stream-%s", ps_key); + redis_pipe(r, "HMSET stream-%s media %s sfd %s rtp_sink %s " + "rtcp_sink %s rtcp_sibling %s last_packet "UINT64F" " "ps_flags %u", - (long long unsigned) ps, - (long long unsigned) ps->media, - (long long unsigned) ps->sfd, - (long long unsigned) ps->rtp_sink, - (long long unsigned) ps->rtcp_sink, - (long long unsigned) ps->rtcp_sibling, - atomic64_get(&ps->last_packet), + ps_key, media_key, sfd_key, ps_rtp_sink_key, + ps_rtcp_sink_key, ps_rtcp_sibling_key, atomic64_get(&ps->last_packet), ps->ps_flags); - redis_update_endpoint(r, "stream", ps, "endpoint", &ps->endpoint); - redis_update_endpoint(r, "stream", ps, "advertised_endpoint", &ps->advertised_endpoint); - redis_update_stats(r, "stream", ps, "stats", &ps->stats); - redis_update_crypto_context(r, "stream", ps, &ps->crypto); + redis_update_endpoint(r, "stream", ps_key, "endpoint", &ps->endpoint); + redis_update_endpoint(r, "stream", ps_key, "advertised_endpoint", &ps->advertised_endpoint); + redis_update_stats(r, "stream", ps_key, "stats", &ps->stats); + redis_update_crypto_context(r, "stream", ps_key, &ps->crypto); /* XXX DTLS?? */ mutex_unlock(&ps->in_lock); mutex_unlock(&ps->out_lock); - redis_pipe(r, "EXPIRE stream-%llu 86400", (long long unsigned) ps); - redis_pipe(r, "LPUSH streams-"PB" %llu", STR(&c->callid), (long long unsigned) ps); + redis_pipe(r, "EXPIRE stream-%s 86400", ps_key); + redis_pipe(r, "LPUSH streams-"PB" %s", STR(&c->callid), ps_key); } for (l = c->monologues; l; l = l->next) { ml = l->data; - - redis_pipe(r, "DEL tag-%llu other_tags-%llu medias-%llu", - (long long unsigned) ml, - (long long unsigned) ml, - (long long unsigned) ml); - redis_pipe(r, "HMSET tag-%llu created %llu active %llu deleted %llu", - (long long unsigned) ml, - (long long unsigned) ml->created, - (long long unsigned) ml->active_dialogue, - (long long unsigned) ml->deleted); + mono_key = HKEY(ml); + active_mono_key = HKEY(ml->active_dialogue); + + redis_pipe(r, "DEL tag-%s other_tags-%s medias-%s", + mono_key, mono_key, mono_key); + redis_pipe(r, "HMSET tag-%s created %llu active %s deleted %llu", + mono_key, (long long unsigned) ml->created, + active_mono_key, (long long unsigned) ml->deleted); if (ml->tag.s) - redis_pipe(r, "HMSET tag-%llu tag "PB"", - (long long unsigned) ml, - STR(&ml->tag)); + redis_pipe(r, "HMSET tag-%s tag "PB"", mono_key, STR(&ml->tag)); if (ml->viabranch.s) - redis_pipe(r, "HMSET tag-%llu via-branch "PB"", - (long long unsigned) ml, - STR(&ml->viabranch)); + redis_pipe(r, "HMSET tag-%s via-branch "PB"", mono_key, STR(&ml->viabranch)); k = g_hash_table_get_values(ml->other_tags); for (m = k; m; m = m->next) { - redis_pipe(r, "RPUSH other_tags-%llu %llu", - (long long unsigned) ml, - (long long unsigned) m->data); + other_mono_key = HKEY(((struct call_monologue *) m->data)); + redis_pipe(r, "RPUSH other_tags-%s %s", mono_key, other_mono_key); } g_list_free(k); for (k = ml->medias.head; k; k = k->next) { media = k->data; + media_key = HKEY(media); - redis_pipe(r, "DEL media-%llu streams-%llu maps-%llu payload_types-%llu", - (long long unsigned) media, (long long unsigned) media, - (long long unsigned) media, (long long unsigned) media); - redis_pipe(r, "HMSET media-%llu " + redis_pipe(r, "DEL media-%s streams-%s maps-%s payload_types-%s", + media_key, media_key, + media_key, media_key); + redis_pipe(r, "HMSET media-%s " "type "PB" protocol %s desired_family %i " "sdes_in_tag %u sdes_out_tag %u interface "PB" local_address "IP6F" " "media_flags %u", - (long long unsigned) media, + media_key, STR(&media->type), media->protocol ? media->protocol->name : "", media->desired_family, media->sdes_in.tag, media->sdes_out.tag, STR(&media->interface->name), IP6P(&media->local_address->addr.s6_addr), media->media_flags); - redis_update_crypto_params(r, "media", media, "sdes_in", &media->sdes_in.params); - redis_update_crypto_params(r, "media", media, "sdes_out", &media->sdes_out.params); - redis_update_dtls_fingerprint(r, "media", media, &media->fingerprint); + redis_update_crypto_params(r, "media", media_key, "sdes_in", &media->sdes_in.params); + redis_update_crypto_params(r, "media", media_key, "sdes_out", &media->sdes_out.params); + redis_update_dtls_fingerprint(r, "media", media_key, &media->fingerprint); for (m = media->streams.head; m; m = m->next) { - redis_pipe(r, "RPUSH streams-%llu %llu", - (long long unsigned) media, - (long long unsigned) m->data); + ps_key = HKEY(((struct packet_stream *) m->data)); + redis_pipe(r, "RPUSH streams-%s %s", media_key, ps_key); } for (n = media->endpoint_maps; n; n = n->next) { ep = n->data; + em_key = HKEY(ep); - redis_pipe(r, "DEL map-%llu sfds-%llu", - (long long unsigned) ep, - (long long unsigned) ep); - redis_pipe(r, "HMSET map-%llu wildcard %i", - (long long unsigned) ep, - ep->wildcard); - redis_update_endpoint(r, "map", ep, "endpoint", &ep->endpoint); + redis_pipe(r, "DEL map-%s sfds-%s", em_key, em_key); + redis_pipe(r, "HMSET map-%s wildcard %i", em_key, ep->wildcard); + redis_update_endpoint(r, "map", em_key, "endpoint", &ep->endpoint); for (m = ep->sfds.head; m; m = m->next) { - redis_pipe(r, "RPUSH sfds-%llu %llu", - (long long unsigned) ep, - (long long unsigned) m->data); + sfd_key = HKEY(((struct stream_fd *) m->data)); + redis_pipe(r, "RPUSH sfds-%s %s", em_key, sfd_key); } - redis_pipe(r, "EXPIRE map-%llu 86400", (long long unsigned) ep); - redis_pipe(r, "EXPIRE sfds-%llu 86400", (long long unsigned) ep); - redis_pipe(r, "LPUSH maps-%llu %llu", - (long long unsigned) media, (long long unsigned) ep); + redis_pipe(r, "EXPIRE map-%s 86400", em_key); + redis_pipe(r, "EXPIRE sfds-%s 86400", em_key); + redis_pipe(r, "LPUSH maps-%s %s", media_key, em_key); } pt_list = g_hash_table_get_values(media->rtp_payload_types); pt_index = 0; for (pt_iter = pt_list; pt_iter; pt_iter = pt_iter->next) { pt = pt_iter->data; - redis_pipe(r, "HSET payload_types-%llu %u %u", - (long long unsigned) media, + redis_pipe(r, "HSET payload_types-%s %u %u", + media_key, (unsigned int) pt_index, (unsigned int) pt->payload_type); pt_index++; } g_list_free(pt_list); - redis_pipe(r, "EXPIRE media-%llu 86400", (long long unsigned) media); - redis_pipe(r, "EXPIRE streams-%llu 86400", (long long unsigned) media); - redis_pipe(r, "EXPIRE maps-%llu 86400", (long long unsigned) media); - redis_pipe(r, "EXPIRE payload_types-%llu 86400", (long long unsigned) media); - redis_pipe(r, "LPUSH medias-%llu %llu", - (long long unsigned) ml, (long long unsigned) media); + redis_pipe(r, "EXPIRE media-%s 86400", media_key); + redis_pipe(r, "EXPIRE streams-%s 86400", media_key); + redis_pipe(r, "EXPIRE maps-%s 86400", media_key); + redis_pipe(r, "EXPIRE payload_types-%s 86400", media_key); + redis_pipe(r, "LPUSH medias-%s %s", mono_key, media_key); } - redis_pipe(r, "EXPIRE tag-%llu 86400", (long long unsigned) ml); - redis_pipe(r, "EXPIRE other_tags-%llu 86400", (long long unsigned) ml); - redis_pipe(r, "EXPIRE medias-%llu 86400", (long long unsigned) ml); - redis_pipe(r, "LPUSH tags-"PB" %llu", STR(&c->callid), (long long unsigned) ml); + redis_pipe(r, "EXPIRE tag-%s 86400", mono_key); + redis_pipe(r, "EXPIRE other_tags-%s 86400", mono_key); + redis_pipe(r, "EXPIRE medias-%s 86400", mono_key); + redis_pipe(r, "LPUSH tags-"PB" %s", STR(&c->callid), mono_key); } redis_pipe(r, "EXPIRE call-"PB" 86400", STR(&c->callid)); diff --git a/daemon/redis.h b/daemon/redis.h index 0ab1f710b..92c0c87e4 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -45,7 +45,7 @@ struct list_item { }; - +#define HKEY(ptr) ptr ? ptr->redis_hkey : "0"