Replacing redis hashkeys with -CALLID_IDX instead of -POINTER

pull/183/head
Lucian Balaceanu 10 years ago
parent 59a69f066f
commit 41fa252d6b

@ -353,6 +353,14 @@ static void stream_fd_closed(int fd, void *p, uintptr_t u) {
INLINE int _redis_id_generate(struct call *c, rc_type id_type) {
return c->rc.val[id_type]++;
}
INLINE void redis_hkey_generate(char *out, struct call *c, rc_type rc_kind) {
snprintf(out, MAX_REDIS_HKEY_SIZE, "%.*s_%d", c->callid.len, c->callid.s,
_redis_id_generate(c, rc_kind));
}
INLINE void __re_address_translate(struct re_address *o, const struct endpoint *ep) {
o->family = family_from_address(&ep->ip46);
@ -1726,6 +1734,7 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con
med->monologue = ml;
med->call = ml->call;
med->index = sp->index;
redis_hkey_generate(med->redis_hkey, ml->call, RC_MEDIA);
call_str_cpy(ml->call, &med->type, &sp->type);
med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free);
@ -1814,6 +1823,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne
em->endpoint = *ep;
else
em->wildcard = 1;
redis_hkey_generate(em->redis_hkey, call, RC_EM);
g_queue_init(&em->sfds);
media->endpoint_maps = g_slist_prepend(media->endpoint_maps, em);
@ -1826,6 +1836,7 @@ alloc:
__C_DBG("allocating stream_fds for %u ports", num_ports);
for (i = 0; i < num_ports; i++) {
sfd = __stream_fd_new(&fd_arr[i], call);
redis_hkey_generate(sfd->redis_hkey, call, RC_SFD);
g_queue_push_tail(&em->sfds, sfd); /* not referenced */
}
@ -1898,6 +1909,7 @@ static int __num_media_streams(struct call_media *media, unsigned int num_ports)
__C_DBG("allocating %i new packet_streams", num_ports - media->streams.length);
while (media->streams.length < num_ports) {
stream = __packet_stream_new(call);
redis_hkey_generate(stream->redis_hkey, call, RC_PS);
stream->media = media;
g_queue_push_tail(&media->streams, stream);
stream->component = media->streams.length;
@ -3545,12 +3557,14 @@ static struct call_monologue *call_get_monologue(struct call *call, const str *f
__C_DBG("creating new monologue");
ret = __monologue_create(call);
redis_hkey_generate(ret->redis_hkey, call, RC_MONO);
__monologue_tag(ret, fromtag);
/* we need both sides of the dialogue even in the initial offer, so create
* another monologue without to-tag (to be filled in later) */
new_branch:
__C_DBG("create new \"other side\" monologue for viabranch "STR_FORMAT, STR_FMT0(viabranch));
os = __monologue_create(call);
redis_hkey_generate(os->redis_hkey, call, RC_MONO);
ret->active_dialogue = os;
os->active_dialogue = ret;
__monologue_viabranch(os, viabranch);
@ -3595,8 +3609,10 @@ static struct call_monologue *call_get_dialogue(struct call *call, const str *fr
/* if we don't have a fromtag monologue yet, we can use a half-complete dialogue
* from the totag if there is one. otherwise we have to create a new one. */
ft = tt->active_dialogue;
if (ft->tag.s)
if (ft->tag.s) {
ft = __monologue_create(call);
redis_hkey_generate(ft->redis_hkey, call, RC_MONO);
}
}
/* the fromtag monologue may be newly created, or half-complete from the totag, or

@ -92,6 +92,8 @@ enum call_stream_state {
#define RTP_BUFFER_TAIL_ROOM 512
#define RTP_BUFFER_SIZE (MAX_RTP_PACKET_SIZE + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM)
#define MAX_REDIS_HKEY_SIZE 150
#ifndef RTP_LOOP_PROTECT
#define RTP_LOOP_PROTECT 28 /* number of bytes */
#define RTP_LOOP_PACKETS 2 /* number of packets */
@ -184,8 +186,8 @@ enum call_stream_state {
#define MEDIA_SET(p, f) bf_set(&(p)->media_flags, MEDIA_FLAG_ ## f)
#define MEDIA_CLEAR(p, f) bf_clear(&(p)->media_flags, MEDIA_FLAG_ ## f)
typedef enum { RC_SFD, RC_EM, RC_PS, RC_MEDIA, RC_MONO, RC_LIMIT} rc_type;
typedef struct redis_hkey_counters { unsigned char val[RC_LIMIT]; } redis_hkey_counters;
struct poller;
struct control_stream;
@ -282,12 +284,14 @@ struct stream_fd {
struct packet_stream *stream; /* LOCK: call->master_lock */
struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */
struct dtls_connection dtls; /* LOCK: stream->in_lock */
char redis_hkey[MAX_REDIS_HKEY_SIZE];
};
struct endpoint_map {
struct endpoint endpoint;
GQueue sfds;
int wildcard:1;
char redis_hkey[MAX_REDIS_HKEY_SIZE];
};
struct loop_protector {
@ -296,7 +300,7 @@ struct loop_protector {
};
struct rtp_stats {
unsigned int payload_type;
unsigned int payload_type;
atomic64 packets;
atomic64 bytes;
atomic64 kernel_packets;
@ -340,6 +344,7 @@ struct packet_stream {
/* in_lock must be held for SETTING these: */
volatile unsigned int ps_flags;
char redis_hkey[MAX_REDIS_HKEY_SIZE];
};
/* protected by call->master_lock, except the RO elements */
@ -373,6 +378,7 @@ struct call_media {
GHashTable *rtp_payload_types;
volatile unsigned int media_flags;
char redis_hkey[MAX_REDIS_HKEY_SIZE];
};
/* half a dialogue */
@ -392,6 +398,7 @@ struct call_monologue {
struct call_monologue *active_dialogue;
GQueue medias;
char redis_hkey[MAX_REDIS_HKEY_SIZE];
};
struct call {
@ -412,14 +419,16 @@ struct call {
GSList *stream_fds;
struct dtls_cert *dtls_cert; /* for outgoing */
str callid;
str callid;
time_t created;
time_t last_signal;
time_t deleted;
time_t ml_deleted;
unsigned char tos;
unsigned char tos;
char *created_from;
struct sockaddr_in6 created_from_addr;
struct redis_hkey_counters rc;
};
struct local_interface {
@ -610,6 +619,11 @@ INLINE struct packet_stream *packet_stream_sink(struct packet_stream *ps) {
return ret;
}
INLINE void redis_hkey_cpy(char *dst, char *src) {
strncpy(dst, src, MAX_REDIS_HKEY_SIZE);
dst[MAX_REDIS_HKEY_SIZE-1] = '\0';
}
const char * get_tag_type_text(enum tag_type t);
#endif

@ -276,7 +276,7 @@ void graphite_loop_run(struct callmaster* callmaster, int seconds) {
ilog(LOG_ERROR,"Sending graphite data failed.");
}
copy_with_lock(&cm->totalstats_lastinterval, &graphite_stats, &cm->totalstats_lastinterval.total_average_lock);
copy_with_lock(&cm->totalstats_lastinterval, &graphite_stats, &cm->totalstats_lastinterval_lock);
}
}

@ -247,8 +247,17 @@ static void redis_check_conn(struct redis *r, int role) {
abort();
}
/*
static void _redis_pipe(struct redis *r, const char *fmt, ...) {
va_list ap;
char interm[200];
va_start(ap, fmt);
sprintf(interm, fmt, ap);
va_end(ap);
ilog(LOG_ERR, "<< %s >>> \n", interm);
}
*/
/* called with r->lock held and c->master_lock held */
static void redis_delete_call(struct call *c, struct redis *r) {
@ -256,36 +265,43 @@ static void redis_delete_call(struct call *c, struct redis *r) {
GList *k;
struct call_monologue *ml;
struct call_media *media;
struct stream_fd *sfd;
struct packet_stream *ps;
struct endpoint_map *em;
char *mono_key, *media_key, *em_key;
redis_pipe(r, "SREM calls "PB"", STR(&c->callid));
redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"", STR(&c->callid), STR(&c->callid),
STR(&c->callid), STR(&c->callid));
redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"",
STR(&c->callid), STR(&c->callid), STR(&c->callid), STR(&c->callid));
for (l = c->stream_fds; l; l = l->next)
redis_pipe(r, "DEL sfd-%llu", (long long unsigned) l->data);
for (l = c->streams; l; l = l->next)
redis_pipe(r, "DEL stream-%llu", (long long unsigned) l->data);
for (l = c->stream_fds; l; l = l->next) {
sfd = l->data;
redis_pipe(r, "DEL sfd-%s", HKEY(sfd));
}
for (l = c->streams; l; l = l->next) {
ps = l->data;
redis_pipe(r, "DEL stream-%s", HKEY(ps));
}
for (l = c->monologues; l; l = l->next) {
ml = l->data;
mono_key = HKEY(ml);
redis_pipe(r, "DEL tag-%llu other_tags-%llu medias-%llu",
(long long unsigned) ml,
(long long unsigned) ml,
(long long unsigned) ml);
redis_pipe(r, "DEL tag-%s other_tags-%s medias-%s", mono_key, mono_key,
mono_key);
for (k = ml->medias.head; k; k = k->next) {
media = k->data;
media_key = HKEY(media);
redis_pipe(r, "DEL media-%llu streams-%llu maps-%llu payload_types-%llu",
(long long unsigned) k->data, (long long unsigned) k->data,
(long long unsigned) k->data, (long long unsigned) k->data);
redis_pipe(r, "DEL media-%s streams-%s maps-%s payload_types-%s",
media_key, media_key, media_key, media_key);
for (n = media->endpoint_maps; n; n = n->next)
redis_pipe(r, "DEL map-%llu sfds-%llu",
(long long unsigned) n->data,
(long long unsigned) n->data);
for (n = media->endpoint_maps; n; n = n->next) {
em = n->data;
em_key = HKEY(em);
redis_pipe(r, "DEL map-%s sfds-%s", em_key, em_key);
}
}
}
@ -626,6 +642,8 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) {
if (__get_consecutive_ports(&fd, 1, port, c))
return -1;
sfd = __stream_fd_new(&fd, c);
redis_hkey_cpy( sfd->redis_hkey, it->id->str);
if (redis_hash_get_crypto_context(&sfd->crypto, &it->rh))
return -1;
it->ptr = sfd;
@ -657,6 +675,7 @@ static int redis_streams(struct call *c, struct redis_list *streams) {
if (redis_hash_get_crypto_context(&ps->crypto, &it->rh))
return -1;
it->ptr = ps;
redis_hkey_cpy(ps->redis_hkey, it->id->str);
PS_CLEAR(ps, KERNELIZED);
}
@ -676,6 +695,8 @@ static int redis_tags(struct call *c, struct redis_list *tags) {
if (!ml)
return -1;
redis_hkey_cpy(ml->redis_hkey, it->id->str);
if (redis_hash_get_time_t(&ml->created, &it->rh, "created"))
return -1;
if (!redis_hash_get_str(&s, &it->rh, "tag"))
@ -746,6 +767,9 @@ static int redis_tags_populate(struct redis *r, struct redis_list *tags, struct
med->call = ml->call;
med->index = i;
med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free);
redis_hkey_cpy(med->redis_hkey, it_media->id->str);
g_queue_push_tail(&ml->medias, med);
if (redis_hash_get_str(&s, &it_media->rh, "type"))
@ -803,6 +827,8 @@ static int redis_tags_populate(struct redis *r, struct redis_list *tags, struct
g_queue_init(&em->sfds);
med->endpoint_maps = g_slist_prepend(med->endpoint_maps, em);
redis_hkey_cpy(em->redis_hkey, it_em->id->str);
if (redis_hash_get_endpoint(&em->endpoint, &it_em->rh, "endpoint"))
goto free2;
if (redis_hash_build_list(r, "sfds", it_em->id, sfds, redis_build_em_sfds, em))
@ -1052,18 +1078,13 @@ static int redis_update_crypto_params(struct redis *r, const char *pref, void *s
{
if (!p->crypto_suite)
return -1;
redis_pipe(r, "HMSET %s-%llu %s-crypto_suite %s %s-master_key "PB" %s-master_salt "PB"",
pref,
(long long unsigned) suff,
redis_pipe(r, "HMSET %s-%s %s-crypto_suite %s %s-master_key "PB" %s-master_salt "PB"",
pref, suff,
key, p->crypto_suite->name,
key, S_LEN(p->master_key, sizeof(p->master_key)),
key, S_LEN(p->master_salt, sizeof(p->master_salt)));
if (p->mki)
redis_pipe(r, "HMSET %s-%llu %s-mki "PB"",
pref,
(long long unsigned) suff,
key,
S_LEN(p->mki, p->mki_len));
redis_pipe(r, "HMSET %s-%s %s-mki "PB"", pref, suff, key, S_LEN(p->mki, p->mki_len));
return 0;
}
@ -1072,10 +1093,8 @@ static void redis_update_crypto_context(struct redis *r, const char *pref, void
{
if (redis_update_crypto_params(r, pref, suff, "", &c->params))
return;
redis_pipe(r, "HMSET %s-%llu last_index "UINT64F"",
pref,
(long long unsigned) suff,
c->last_index);
redis_pipe(r, "HMSET %s-%s last_index "UINT64F"",
pref, suff, c->last_index);
}
static void redis_update_endpoint(struct redis *r, const char *pref, void *suff,
const char *key, const struct endpoint *e)
@ -1083,17 +1102,14 @@ static void redis_update_endpoint(struct redis *r, const char *pref, void *suff,
char a[64];
inet_ntop(AF_INET6, &e->ip46, a, sizeof(a));
redis_pipe(r, "HMSET %s-%llu %s-addr %s %s-port %hu",
pref,
(long long unsigned) suff,
key, a, key, (short unsigned) e->port);
redis_pipe(r, "HMSET %s-%s %s-addr %s %s-port %hu",
pref, suff, key, a, key, (short unsigned) e->port);
}
static void redis_update_stats(struct redis *r, const char *pref, void *suff,
const char *key, const struct stats *s)
{
redis_pipe(r, "HMSET %s-%llu %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"",
pref,
(long long unsigned) suff,
redis_pipe(r, "HMSET %s-%s %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"",
pref, suff,
key, atomic64_get(&s->packets), key, atomic64_get(&s->bytes),
key, atomic64_get(&s->errors));
}
@ -1102,9 +1118,8 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, voi
{
if (!f->hash_func)
return;
redis_pipe(r, "HMSET %s-%llu hash_func %s fingerprint "PB"",
pref,
(long long unsigned) suff,
redis_pipe(r, "HMSET %s-%s hash_func %s fingerprint "PB"",
pref, suff,
f->hash_func->name,
S_LEN(f->digest, sizeof(f->digest)));
}
@ -1125,6 +1140,9 @@ void redis_update(struct call *c, struct redis *r, int role) {
struct rtp_payload_type *pt;
char a[64];
unsigned int pt_index;
char *sfd_key, *ps_key, *mono_key, *active_mono_key, *other_mono_key,
*media_key, *em_key, *ps_rtp_sink_key, *ps_rtcp_sink_key,
*ps_rtcp_sibling_key;
if (!r)
return;
@ -1147,15 +1165,16 @@ void redis_update(struct call *c, struct redis *r, int role) {
for (l = c->stream_fds; l; l = l->next) {
sfd = l->data;
sfd_key = HKEY(sfd) ;
ps_key = HKEY(sfd->stream);
redis_pipe(r, "DEL sfd-%llu", (long long unsigned) sfd);
redis_pipe(r, "HMSET sfd-%llu localport %hu stream %llu",
(long long unsigned) sfd, (short unsigned) sfd->fd.localport,
(long long unsigned) sfd->stream);
redis_pipe(r, "DEL sfd-%s", sfd_key);
redis_pipe(r, "HMSET sfd-%s localport %hu stream %s",
sfd_key, (short unsigned) sfd->fd.localport, ps_key);
redis_update_crypto_context(r, "sfd", sfd, &sfd->crypto);
/* XXX DTLS?? */
redis_pipe(r, "EXPIRE sfd-%llu 86400", (long long unsigned) sfd);
redis_pipe(r, "LPUSH sfds-"PB" %llu", STR(&c->callid), (long long unsigned) sfd);
redis_pipe(r, "EXPIRE sfd-%s 86400", sfd_key);
redis_pipe(r, "LPUSH sfds-"PB" %s", STR(&c->callid), sfd_key);
}
for (l = c->streams; l; l = l->next) {
@ -1164,133 +1183,122 @@ void redis_update(struct call *c, struct redis *r, int role) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->out_lock);
redis_pipe(r, "DEL stream-%llu", (long long unsigned) ps);
redis_pipe(r, "HMSET stream-%llu media %llu sfd %llu rtp_sink %llu "
"rtcp_sink %llu rtcp_sibling %llu last_packet "UINT64F" "
ps_key = HKEY(ps);
ps_rtp_sink_key = HKEY(ps->rtp_sink);
ps_rtcp_sink_key = HKEY(ps->rtcp_sink);
ps_rtcp_sibling_key = HKEY(ps->rtcp_sibling);
media_key = HKEY(ps->media);
sfd_key = HKEY(ps->sfd);
redis_pipe(r, "DEL stream-%s", ps_key);
redis_pipe(r, "HMSET stream-%s media %s sfd %s rtp_sink %s "
"rtcp_sink %s rtcp_sibling %s last_packet "UINT64F" "
"ps_flags %u",
(long long unsigned) ps,
(long long unsigned) ps->media,
(long long unsigned) ps->sfd,
(long long unsigned) ps->rtp_sink,
(long long unsigned) ps->rtcp_sink,
(long long unsigned) ps->rtcp_sibling,
atomic64_get(&ps->last_packet),
ps_key, media_key, sfd_key, ps_rtp_sink_key,
ps_rtcp_sink_key, ps_rtcp_sibling_key, atomic64_get(&ps->last_packet),
ps->ps_flags);
redis_update_endpoint(r, "stream", ps, "endpoint", &ps->endpoint);
redis_update_endpoint(r, "stream", ps, "advertised_endpoint", &ps->advertised_endpoint);
redis_update_stats(r, "stream", ps, "stats", &ps->stats);
redis_update_crypto_context(r, "stream", ps, &ps->crypto);
redis_update_endpoint(r, "stream", ps_key, "endpoint", &ps->endpoint);
redis_update_endpoint(r, "stream", ps_key, "advertised_endpoint", &ps->advertised_endpoint);
redis_update_stats(r, "stream", ps_key, "stats", &ps->stats);
redis_update_crypto_context(r, "stream", ps_key, &ps->crypto);
/* XXX DTLS?? */
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->out_lock);
redis_pipe(r, "EXPIRE stream-%llu 86400", (long long unsigned) ps);
redis_pipe(r, "LPUSH streams-"PB" %llu", STR(&c->callid), (long long unsigned) ps);
redis_pipe(r, "EXPIRE stream-%s 86400", ps_key);
redis_pipe(r, "LPUSH streams-"PB" %s", STR(&c->callid), ps_key);
}
for (l = c->monologues; l; l = l->next) {
ml = l->data;
redis_pipe(r, "DEL tag-%llu other_tags-%llu medias-%llu",
(long long unsigned) ml,
(long long unsigned) ml,
(long long unsigned) ml);
redis_pipe(r, "HMSET tag-%llu created %llu active %llu deleted %llu",
(long long unsigned) ml,
(long long unsigned) ml->created,
(long long unsigned) ml->active_dialogue,
(long long unsigned) ml->deleted);
mono_key = HKEY(ml);
active_mono_key = HKEY(ml->active_dialogue);
redis_pipe(r, "DEL tag-%s other_tags-%s medias-%s",
mono_key, mono_key, mono_key);
redis_pipe(r, "HMSET tag-%s created %llu active %s deleted %llu",
mono_key, (long long unsigned) ml->created,
active_mono_key, (long long unsigned) ml->deleted);
if (ml->tag.s)
redis_pipe(r, "HMSET tag-%llu tag "PB"",
(long long unsigned) ml,
STR(&ml->tag));
redis_pipe(r, "HMSET tag-%s tag "PB"", mono_key, STR(&ml->tag));
if (ml->viabranch.s)
redis_pipe(r, "HMSET tag-%llu via-branch "PB"",
(long long unsigned) ml,
STR(&ml->viabranch));
redis_pipe(r, "HMSET tag-%s via-branch "PB"", mono_key, STR(&ml->viabranch));
k = g_hash_table_get_values(ml->other_tags);
for (m = k; m; m = m->next) {
redis_pipe(r, "RPUSH other_tags-%llu %llu",
(long long unsigned) ml,
(long long unsigned) m->data);
other_mono_key = HKEY(((struct call_monologue *) m->data));
redis_pipe(r, "RPUSH other_tags-%s %s", mono_key, other_mono_key);
}
g_list_free(k);
for (k = ml->medias.head; k; k = k->next) {
media = k->data;
media_key = HKEY(media);
redis_pipe(r, "DEL media-%llu streams-%llu maps-%llu payload_types-%llu",
(long long unsigned) media, (long long unsigned) media,
(long long unsigned) media, (long long unsigned) media);
redis_pipe(r, "HMSET media-%llu "
redis_pipe(r, "DEL media-%s streams-%s maps-%s payload_types-%s",
media_key, media_key,
media_key, media_key);
redis_pipe(r, "HMSET media-%s "
"type "PB" protocol %s desired_family %i "
"sdes_in_tag %u sdes_out_tag %u interface "PB" local_address "IP6F" "
"media_flags %u",
(long long unsigned) media,
media_key,
STR(&media->type), media->protocol ? media->protocol->name : "",
media->desired_family,
media->sdes_in.tag, media->sdes_out.tag,
STR(&media->interface->name), IP6P(&media->local_address->addr.s6_addr),
media->media_flags);
redis_update_crypto_params(r, "media", media, "sdes_in", &media->sdes_in.params);
redis_update_crypto_params(r, "media", media, "sdes_out", &media->sdes_out.params);
redis_update_dtls_fingerprint(r, "media", media, &media->fingerprint);
redis_update_crypto_params(r, "media", media_key, "sdes_in", &media->sdes_in.params);
redis_update_crypto_params(r, "media", media_key, "sdes_out", &media->sdes_out.params);
redis_update_dtls_fingerprint(r, "media", media_key, &media->fingerprint);
for (m = media->streams.head; m; m = m->next) {
redis_pipe(r, "RPUSH streams-%llu %llu",
(long long unsigned) media,
(long long unsigned) m->data);
ps_key = HKEY(((struct packet_stream *) m->data));
redis_pipe(r, "RPUSH streams-%s %s", media_key, ps_key);
}
for (n = media->endpoint_maps; n; n = n->next) {
ep = n->data;
em_key = HKEY(ep);
redis_pipe(r, "DEL map-%llu sfds-%llu",
(long long unsigned) ep,
(long long unsigned) ep);
redis_pipe(r, "HMSET map-%llu wildcard %i",
(long long unsigned) ep,
ep->wildcard);
redis_update_endpoint(r, "map", ep, "endpoint", &ep->endpoint);
redis_pipe(r, "DEL map-%s sfds-%s", em_key, em_key);
redis_pipe(r, "HMSET map-%s wildcard %i", em_key, ep->wildcard);
redis_update_endpoint(r, "map", em_key, "endpoint", &ep->endpoint);
for (m = ep->sfds.head; m; m = m->next) {
redis_pipe(r, "RPUSH sfds-%llu %llu",
(long long unsigned) ep,
(long long unsigned) m->data);
sfd_key = HKEY(((struct stream_fd *) m->data));
redis_pipe(r, "RPUSH sfds-%s %s", em_key, sfd_key);
}
redis_pipe(r, "EXPIRE map-%llu 86400", (long long unsigned) ep);
redis_pipe(r, "EXPIRE sfds-%llu 86400", (long long unsigned) ep);
redis_pipe(r, "LPUSH maps-%llu %llu",
(long long unsigned) media, (long long unsigned) ep);
redis_pipe(r, "EXPIRE map-%s 86400", em_key);
redis_pipe(r, "EXPIRE sfds-%s 86400", em_key);
redis_pipe(r, "LPUSH maps-%s %s", media_key, em_key);
}
pt_list = g_hash_table_get_values(media->rtp_payload_types);
pt_index = 0;
for (pt_iter = pt_list; pt_iter; pt_iter = pt_iter->next) {
pt = pt_iter->data;
redis_pipe(r, "HSET payload_types-%llu %u %u",
(long long unsigned) media,
redis_pipe(r, "HSET payload_types-%s %u %u",
media_key,
(unsigned int) pt_index,
(unsigned int) pt->payload_type);
pt_index++;
}
g_list_free(pt_list);
redis_pipe(r, "EXPIRE media-%llu 86400", (long long unsigned) media);
redis_pipe(r, "EXPIRE streams-%llu 86400", (long long unsigned) media);
redis_pipe(r, "EXPIRE maps-%llu 86400", (long long unsigned) media);
redis_pipe(r, "EXPIRE payload_types-%llu 86400", (long long unsigned) media);
redis_pipe(r, "LPUSH medias-%llu %llu",
(long long unsigned) ml, (long long unsigned) media);
redis_pipe(r, "EXPIRE media-%s 86400", media_key);
redis_pipe(r, "EXPIRE streams-%s 86400", media_key);
redis_pipe(r, "EXPIRE maps-%s 86400", media_key);
redis_pipe(r, "EXPIRE payload_types-%s 86400", media_key);
redis_pipe(r, "LPUSH medias-%s %s", mono_key, media_key);
}
redis_pipe(r, "EXPIRE tag-%llu 86400", (long long unsigned) ml);
redis_pipe(r, "EXPIRE other_tags-%llu 86400", (long long unsigned) ml);
redis_pipe(r, "EXPIRE medias-%llu 86400", (long long unsigned) ml);
redis_pipe(r, "LPUSH tags-"PB" %llu", STR(&c->callid), (long long unsigned) ml);
redis_pipe(r, "EXPIRE tag-%s 86400", mono_key);
redis_pipe(r, "EXPIRE other_tags-%s 86400", mono_key);
redis_pipe(r, "EXPIRE medias-%s 86400", mono_key);
redis_pipe(r, "LPUSH tags-"PB" %s", STR(&c->callid), mono_key);
}
redis_pipe(r, "EXPIRE call-"PB" 86400", STR(&c->callid));

@ -45,7 +45,7 @@ struct list_item {
};
#define HKEY(ptr) ptr ? ptr->redis_hkey : "0"

Loading…
Cancel
Save