You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rtpengine/daemon/redis.c

1325 lines
33 KiB

#include <stdio.h>
#include <hiredis/hiredis.h>
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>
#include <glib.h>
#include "redis.h"
#include "compat.h"
#include "aux.h"
#include "call.h"
#include "log.h"
#include "str.h"
#include "crypto.h"
#include "dtls.h"
/*
 * Type filter for replies: hands `r` back unchanged when it is non-NULL and
 * its type equals `type`. A reply of any other type is freed and NULL is
 * returned; a NULL input is passed through as NULL.
 */
INLINE redisReply *redis_expect(int type, redisReply *r) {
	if (r && r->type == type)
		return r;
	if (r)
		freeReplyObject(r);
	return NULL;
}
#if __YCM
/* format checking in YCM editor */
INLINE void redis_pipe(struct redis *r, const char *fmt, ...)
__attribute__((format(printf,2,3)));
INLINE redisReply *redis_get(struct redis *r, int type, const char *fmt, ...)
__attribute__((format(printf,3,4)));
static int redisCommandNR(redisContext *r, const char *fmt, ...)
__attribute__((format(printf,2,3)));
#define PB "%.*s"
#define STR(x) (int) (x)->len, (x)->s
#define STR_R(x) (int) (x)->len, (x)->str
#define S_LEN(s,l) (int) (l), (s)
#else
#define PB "%b"
#define STR(x) (x)->s, (size_t) (x)->len
#define STR_R(x) (x)->str, (size_t) (x)->len
#define S_LEN(s,l) (s), (size_t) (l)
#endif
static void redis_pipe(struct redis *r, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
redisvAppendCommand(r->ctx, fmt, ap);
va_end(ap);
r->pipeline++;
}
/*
 * Runs one formatted command synchronously and returns its reply only if
 * the reply has the requested `type`. On a missing reply or a reply of a
 * different type, NULL is returned (redis_expect() frees the mismatching
 * reply). The caller owns the returned reply.
 */
static redisReply *redis_get(struct redis *r, int type, const char *fmt, ...) {
	va_list args;
	redisReply *reply;

	va_start(args, fmt);
	reply = redisvCommand(r->ctx, fmt, args);
	va_end(args);

	return redis_expect(type, reply);
}
/*
 * "No reply" variant of a synchronous command: runs the formatted command,
 * discards whatever reply comes back and only reports whether a reply was
 * received at all.
 *
 * Returns 0 if a reply object was received (regardless of its type), -1 if
 * no reply was obtained.
 */
static int redisCommandNR(redisContext *r, const char *fmt, ...) {
	va_list ap;
	redisReply *ret;

	va_start(ap, fmt);
	ret = redisvCommand(r, fmt, ap);
	va_end(ap);

	/* decide before freeing: the pointer value must not be inspected
	 * once the reply has been released */
	if (!ret)
		return -1;

	freeReplyObject(ret);
	return 0;
}
/*
 * Makes sure the key "<key><suffix>" either does not exist or has the Redis
 * type named by `type`; a key of any other type is deleted. `suffix` may be
 * NULL, which is treated as an empty string.
 *
 * Returns 0 when the TYPE query produced a status reply, -1 otherwise.
 *
 * called with r->lock held
 */
static int redis_check_type(struct redis *r, char *key, char *suffix, char *type) {
	const char *sfx = suffix ? suffix : "";
	redisReply *reply;
	int rc = -1;

	reply = redisCommand(r->ctx, "TYPE %s%s", key, sfx);
	if (!reply)
		return rc;

	if (reply->type == REDIS_REPLY_STATUS) {
		int acceptable = !strcmp(reply->str, type) || !strcmp(reply->str, "none");

		if (!acceptable)
			redisCommandNR(r->ctx, "DEL %s%s", key, sfx);
		rc = 0;
	}

	freeReplyObject(reply);
	return rc;
}
/*
 * Reads back and discards one reply for every command counted in
 * r->pipeline, leaving the counter at zero. A failed read still decrements
 * the counter.
 *
 * called with r->lock held
 */
static void redis_consume(struct redis *r) {
	redisReply *reply;

	for (; r->pipeline; r->pipeline--) {
		if (redisGetReply(r->ctx, (void **) &reply) != REDIS_OK)
			continue;
		freeReplyObject(reply);
	}
}
/*
 * (Re)establishes the connection stored in `r`: drops any existing context,
 * connects to r->host:r->port with a one second timeout, sends PING and
 * SELECTs r->db, then polls INFO until the server reports the master role.
 *
 * `wait` bounds the polling: the loop runs wait + 1 times, sleeping one
 * second after every answer that reports the slave role.
 *
 * On success the "calls" key is type-checked and 0 is returned. On failure
 * r->ctx is left NULL and -1 is returned.
 *
 * called with r->lock held if necessary
 */
static int redis_connect(struct redis *r, int wait) {
	struct timeval tv;
	redisReply *rp;
	char *s;

	if (r->ctx)
		redisFree(r->ctx);
	r->ctx = NULL;

	tv.tv_sec = 1;
	tv.tv_usec = 0;
	r->ctx = redisConnectWithTimeout(r->host, r->port, tv);

	if (!r->ctx)
		goto err;
	if (r->ctx->err)
		goto err2;

	if (redisCommandNR(r->ctx, "PING"))
		goto err2;

	if (redisCommandNR(r->ctx, "SELECT %i", r->db))
		goto err2;

	while (wait-- >= 0) {
		ilog(LOG_INFO, "Asking Redis whether it's master or slave...");
		rp = redisCommand(r->ctx, "INFO");
		if (!rp)
			goto err2;

		/* a reply without string payload cannot be searched */
		if (!rp->str)
			goto err3;

		s = strstr(rp->str, "role:");
		if (!s)
			goto err3;
		/* strncmp() stops at the terminating NUL, so a reply that ends
		 * right after "role:" is not read past its end */
		if (!strncmp(s, "role:master", 9))
			goto done;
		else if (!strncmp(s, "role:slave", 8))
			goto next;
		else
			goto err3;

next:
		freeReplyObject(rp);
		rlog(LOG_INFO, "Connected to Redis, but it's in slave mode");
		usleep(1000000);
	}

	/* still not master after all attempts */
	goto err2;

done:
	freeReplyObject(rp);
	redis_check_type(r, "calls", NULL, "set");
	ilog(LOG_INFO, "Connected to Redis");
	return 0;

err3:
	freeReplyObject(rp);
err2:
	if (r->ctx->err)
		rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr);
	redisFree(r->ctx);
	r->ctx = NULL;
err:
	rlog(LOG_ERR, "Failed to connect to master Redis database");
	return -1;
}
/*
 * Allocates a new connection object for the server at `ip`:`port`, using
 * database number `db`, and connects it (polling for the master role, see
 * redis_connect()).
 *
 * Returns the new object, or NULL if the connection could not be
 * established; nothing is left allocated in that case.
 */
struct redis *redis_new(u_int32_t ip, u_int16_t port, int db) {
	struct redis *r;

	r = g_slice_alloc0(sizeof(*r));

	r->ip = ip;
	/* bounded write into the embedded host buffer */
	snprintf(r->host, sizeof(r->host), IPF, IPP(ip));
	r->port = port;
	r->db = db;
	mutex_init(&r->lock);

	if (redis_connect(r, 10))
		goto err;

	return r;

err:
	mutex_destroy(&r->lock);
	g_slice_free1(sizeof(*r), r);
	return NULL;
}
static void redis_close(struct redis *r) {
if (r->ctx)
redisFree(r->ctx);
mutex_destroy(&r->lock);
g_slice_free1(sizeof(*r), r);
}
/*
 * Probes the connection with PING. If no reply is received, one reconnect
 * is attempted via redis_connect(r, 1); the process aborts if that fails.
 *
 * called with r->lock held if necessary
 */
static void redis_check_conn(struct redis *r) {
	int alive = !redisCommandNR(r->ctx, "PING");

	if (!alive) {
		rlog(LOG_INFO, "Lost connection to Redis");
		if (redis_connect(r, 1) != 0)
			abort();
	}
}
/*
 * Queues the removal of everything stored for call `c`: its entry in the
 * "calls" set, the call-level keys named after the call ID, and the keys of
 * every stream fd, stream, monologue, media and endpoint map. Those
 * per-object keys are named after the object's address. All queued commands
 * are flushed with redis_consume() before returning.
 *
 * called with r->lock held and c->master_lock held
 */
static void redis_delete_call(struct call *c, struct redis *r) {
	GSList *node, *map_node;
	GList *med_node;
	unsigned long long id;

	redis_pipe(r, "SREM calls "PB"", STR(&c->callid));
	redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"", STR(&c->callid), STR(&c->callid),
			STR(&c->callid), STR(&c->callid));

	for (node = c->stream_fds; node; node = node->next) {
		id = (unsigned long long) node->data;
		redis_pipe(r, "DEL sfd-%llu", id);
	}

	for (node = c->streams; node; node = node->next) {
		id = (unsigned long long) node->data;
		redis_pipe(r, "DEL stream-%llu", id);
	}

	for (node = c->monologues; node; node = node->next) {
		struct call_monologue *ml = node->data;

		id = (unsigned long long) ml;
		redis_pipe(r, "DEL tag-%llu other_tags-%llu medias-%llu", id, id, id);

		for (med_node = ml->medias.head; med_node; med_node = med_node->next) {
			struct call_media *media = med_node->data;

			id = (unsigned long long) media;
			redis_pipe(r, "DEL media-%llu streams-%llu maps-%llu payload_types-%llu",
					id, id, id, id);

			for (map_node = media->endpoint_maps; map_node; map_node = map_node->next) {
				id = (unsigned long long) map_node->data;
				redis_pipe(r, "DEL map-%llu sfds-%llu", id, id);
			}
		}
	}

	redis_consume(r);
}
/*
 * Loads the Redis hash "<key>-<which>" into `out`: out->rr keeps the raw
 * HGETALL reply and out->ht maps each field name to the reply element that
 * holds its value. Both the table keys and values point into out->rr.
 * Field/value pairs that are not both strings are skipped.
 *
 * Returns 0 on success (release with redis_destroy_hash()), -1 on failure
 * with nothing left allocated.
 */
static int redis_get_hash(struct redis_hash *out, struct redis *r, const char *key, const redisReply *which) {
	redisReply *name, *value;
	int idx;

	out->ht = g_hash_table_new(g_str_hash, g_str_equal);
	if (!out->ht)
		return -1;

	out->rr = redis_get(r, REDIS_REPLY_ARRAY, "HGETALL %s-"PB"", key, STR_R(which));
	if (!out->rr) {
		g_hash_table_destroy(out->ht);
		return -1;
	}

	/* the reply alternates field, value, field, value, ... */
	for (idx = 0; idx + 1 < out->rr->elements; idx += 2) {
		name = out->rr->element[idx];
		value = out->rr->element[idx + 1];

		if (name->type != REDIS_REPLY_STRING || value->type != REDIS_REPLY_STRING)
			continue;
		if (g_hash_table_insert_check(out->ht, name->str, value) == TRUE)
			continue;

		freeReplyObject(out->rr);
		g_hash_table_destroy(out->ht);
		return -1;
	}

	return 0;
}
/*
static struct redis_hash *redis_get_hash_new(struct redis *r, const char *key, const redisReply *which) {
struct redis_hash *out;
out = g_slice_alloc(sizeof(*out));
if (!out)
return NULL;
if (!redis_get_hash(out, r, key, which))
return out;
g_slice_free1(sizeof(*out), out);
return NULL;
}
*/
static void redis_destroy_hash(struct redis_hash *rh) {
freeReplyObject(rh->rr);
g_hash_table_destroy(rh->ht);
}
/*
static void redis_free_hash(struct redis_hash *rh) {
redis_destroy_hash(rh);
g_slice_free1(sizeof(*rh), rh);
}
*/
static void redis_destroy_list(struct redis_list *rl) {
struct list_item *it;
redis_destroy_hash(&rl->rh);
while ((it = g_queue_pop_head(&rl->q)))
g_slice_free1(sizeof(*it), it);
}
/*
 * Loads the Redis list "<key>-<id>" and, for every string element `el` in
 * it, the hash "<sub>-<el>". Each element becomes a list_item that is
 * appended to out->q in list order and is also reachable through out->rh.ht
 * under the element's string. out->rh.rr keeps the raw LRANGE reply, which
 * the item IDs and table keys point into. Every item's `ptr` starts as NULL.
 *
 * Returns 0 on success (release with redis_destroy_list()), -1 on failure
 * with nothing left allocated.
 */
static int redis_get_list_hash(struct redis_list *out, struct redis *r, const char *key, const redisReply *id,
		const char *sub)
{
	redisReply *el;
	int i;
	struct list_item *it;

	g_queue_init(&out->q);
	out->rh.ht = g_hash_table_new(g_str_hash, g_str_equal);
	if (!out->rh.ht)
		return -1;
	out->rh.rr = redis_get(r, REDIS_REPLY_ARRAY, "LRANGE %s-"PB" 0 -1", key, STR_R(id));
	if (!out->rh.rr)
		goto err;

	for (i = 0; i < out->rh.rr->elements; i++) {
		el = out->rh.rr->element[i];
		if (el->type != REDIS_REPLY_STRING)
			continue;

		/* zeroed so that it->ptr is NULL until a caller sets it */
		it = g_slice_alloc0(sizeof(*it));
		if (!it)
			goto err2;
		it->id = el;

		if (redis_get_hash(&it->rh, r, sub, el))
			goto err3;
		if (g_hash_table_insert_check(out->rh.ht, el->str, it) != TRUE)
			goto err4;
		g_queue_push_tail(&out->q, it);
	}

	return 0;

err4:
	redis_destroy_hash(&it->rh);
err3:
	g_slice_free1(sizeof(*it), it);
err2:
	/* release the items that were completed before the failure */
	while ((it = g_queue_pop_head(&out->q))) {
		redis_destroy_hash(&it->rh);
		g_slice_free1(sizeof(*it), it);
	}
	freeReplyObject(out->rh.rr);
err:
	g_hash_table_destroy(out->rh.ht);
	g_queue_clear(&out->q);
	return -1;
}
/*
 * Looks up field `k` in a loaded hash and points `out` at its value. The
 * string is not copied: `out` references memory owned by the hash's reply.
 *
 * Returns 0 if the field exists; otherwise `out` is set to an empty
 * (NULL, 0) string and -1 is returned.
 */
static int redis_hash_get_str(str *out, const struct redis_hash *h, const char *k) {
	redisReply *val = g_hash_table_lookup(h->ht, k);

	out->s = val ? val->str : NULL;
	out->len = val ? val->len : 0;

	return val ? 0 : -1;
}
/*
 * strtoull()-style parser that yields an atomic64, so that it can be plugged
 * into the integer getter generator like the other conversion functions.
 * The value is stored with the non-atomic setter.
 *
 * we can do this because this happens during startup in a single thread
 */
static atomic64 strtoa64(const char *c, char **endp, int base) {
	atomic64 result;
	u_int64_t parsed = strtoull(c, endp, base);

	atomic64_set_na(&result, parsed);
	return result;
}
/*
 * Instantiations of the getter generators. The macros themselves are not
 * defined in this file. Judging from the calls made further down,
 * define_get_int_type(name, type, conv) provides redis_hash_get_<name>()
 * and define_get_type_format(name, type) provides the printf-style
 * redis_hash_get_<name>_f() and va_list redis_hash_get_<name>_v() variants
 * -- TODO confirm against the macro definitions.
 * NOTE(review): the `unsigned` and `u16` getters are instantiated with the
 * signed strtol as conversion function -- confirm this is intended.
 */
define_get_int_type(time_t, time_t, strtoull);
define_get_int_type(int, int, strtol);
define_get_int_type(unsigned, unsigned int, strtol);
define_get_int_type(u16, u_int16_t, strtol);
define_get_int_type(u64, u_int64_t, strtoull);
define_get_int_type(a64, atomic64, strtoa64);
define_get_type_format(str, str);
define_get_type_format(u16, u_int16_t);
//define_get_type_format(u64, u_int64_t);
define_get_type_format(a64, atomic64);
/*
 * Copies the value of a hash field into the fixed-size buffer `out` of
 * `len` bytes. The field name is built printf-style from `k` and the
 * variadic arguments. Only the value's own bytes are written; the rest of
 * `out` is left as it was.
 *
 * Returns 0 on success, -1 if the field is missing or its value is longer
 * than `len`.
 */
static int redis_hash_get_c_buf_fn(unsigned char *out, size_t len, const struct redis_hash *h,
		const char *k, ...)
{
	va_list args;
	str val;
	int failed;

	va_start(args, k);
	failed = redis_hash_get_str_v(&val, h, k, args);
	va_end(args);

	if (failed || val.len > len)
		return -1;

	memcpy(out, val.s, val.len);
	return 0;
}
/* convenience wrapper for array destinations: passes sizeof(o) as length */
#define redis_hash_get_c_buf_f(o, h, f...) \
		redis_hash_get_c_buf_fn(o, sizeof(o), h, f)
/*
 * Reads field `k` as an integer flag. Returns -1 when the field exists and
 * is non-zero, and 0 when it is zero or cannot be read.
 */
static int redis_hash_get_bool_flag(const struct redis_hash *h, const char *k) {
	int val;

	if (redis_hash_get_int(&val, h, k) != 0)
		return 0;

	return val ? -1 : 0;
}
/*
 * Restores an endpoint from the fields "<k>-addr" (an IPv6 textual address
 * parsed into out->ip46) and "<k>-port" (stored into out->port).
 *
 * Returns 0 on success, -1 if either field is missing or the address does
 * not parse.
 */
static int redis_hash_get_endpoint(struct endpoint *out, const struct redis_hash *h, const char *k) {
	str addr;

	if (redis_hash_get_str_f(&addr, h, "%s-addr", k)
			|| inet_pton(AF_INET6, addr.s, &out->ip46) != 1
			|| redis_hash_get_u16_f(&out->port, h, "%s-port", k))
		return -1;

	return 0;
}
/*
 * Restores the three counters of a stats block from the fields
 * "<k>-packets", "<k>-bytes" and "<k>-errors", in that order, stopping at
 * the first one that cannot be read.
 *
 * Returns 0 if all three were read, -1 otherwise.
 */
static int redis_hash_get_stats(struct stats *out, const struct redis_hash *h, const char *k) {
	int failed = redis_hash_get_a64_f(&out->packets, h, "%s-packets", k)
		|| redis_hash_get_a64_f(&out->bytes, h, "%s-bytes", k)
		|| redis_hash_get_a64_f(&out->errors, h, "%s-errors", k);

	return failed ? -1 : 0;
}
/*
 * Resolves a stored object ID to the object that was restored for it:
 * looks `key` up among the items of `list` and returns that item's `ptr`.
 * The ID "0" and unknown IDs both resolve to NULL.
 */
static void *redis_hash_get_ptr(struct redis_list *list, const char *key) {
	struct list_item *item;

	if (strcmp(key, "0") == 0)
		return NULL;

	item = g_hash_table_lookup(list->rh.ht, key);
	return item ? item->ptr : NULL;
}
/*
 * Same as redis_hash_get_ptr(), with the ID taken from a reply element.
 * Elements that are not strings resolve to NULL.
 */
static void *redis_hash_get_ptr_rr(struct redis_list *list, const redisReply *rr) {
	return rr->type == REDIS_REPLY_STRING
		? redis_hash_get_ptr(list, rr->str)
		: NULL;
}
/*
 * Same as redis_hash_get_ptr(), with the ID read from field `key` of the
 * hash `rh`. A missing field resolves to NULL.
 */
static void *redis_hash_get_ptr_hash(struct redis_list *list, struct redis_hash *rh, const char *key) {
	str id;
	int missing = redis_hash_get_str(&id, rh, key);

	return missing ? NULL : redis_hash_get_ptr(list, id.s);
}
/*
 * Restores crypto parameters from the fields "<k>-crypto_suite",
 * "<k>-master_key", "<k>-master_salt" and the optional "<k>-mki".
 *
 * A non-empty MKI is copied into newly malloc()ed memory and its length is
 * recorded in out->mki_len, matching what redis_update_crypto_params()
 * writes out.
 *
 * can return 1, 0 or -1:
 *   1  no crypto suite is stored under this prefix
 *   0  parameters restored
 *  -1  unknown suite, missing/oversized key or salt, MKI longer than 255
 *      bytes, or out of memory
 */
static int redis_hash_get_crypto_params(struct crypto_params *out, const struct redis_hash *h, const char *k) {
	str s;

	if (redis_hash_get_str_f(&s, h, "%s-crypto_suite", k))
		return 1;
	out->crypto_suite = crypto_find_suite(&s);
	if (!out->crypto_suite)
		return -1;

	if (redis_hash_get_c_buf_f(out->master_key, h, "%s-master_key", k))
		return -1;
	if (redis_hash_get_c_buf_f(out->master_salt, h, "%s-master_salt", k))
		return -1;

	if (!redis_hash_get_str_f(&s, h, "%s-mki", k)) {
		if (s.len > 255)
			return -1;
		if (s.len > 0) {
			out->mki = malloc(s.len);
			if (!out->mki)
				return -1;
			memcpy(out->mki, s.s, s.len);
			/* without the length the restored MKI would be unusable */
			out->mki_len = s.len;
		}
	}

	return 0;
}
/*
 * Restores a crypto context: the parameters stored under the empty prefix
 * plus the "last_index" field. A hash without a crypto suite is not an
 * error; the context is then left untouched.
 *
 * Returns 0 on success or when no crypto is stored, -1 on failure.
 */
static int redis_hash_get_crypto_context(struct crypto_context *out, const struct redis_hash *h) {
	switch (redis_hash_get_crypto_params(&out->params, h, "")) {
		case 1:
			return 0;
		case 0:
			break;
		default:
			return -1;
	}

	return redis_hash_get_u64(&out->last_index, h, "last_index") ? -1 : 0;
}
/*
 * Walks the Redis list "<key>-<tag>", resolves every element to the object
 * restored for it in `list` and calls func(up, object) for each, in list
 * order.
 *
 * Returns 0 if every element resolved and every callback returned zero.
 * Returns -1 if the list cannot be fetched, or as soon as an element does
 * not resolve or a callback returns non-zero.
 */
static int redis_hash_build_list(struct redis *r, const char *key, redisReply *tag, struct redis_list *list,
		int (*func)(void *, void *), void *up) {
	redisReply *members;
	void *target;
	int idx, ret = 0;

	members = redis_get(r, REDIS_REPLY_ARRAY, "LRANGE %s-"PB" 0 -1", key, STR_R(tag));
	if (!members)
		return -1;

	for (idx = 0; ret == 0 && idx < members->elements; idx++) {
		target = redis_hash_get_ptr_rr(list, members->element[idx]);
		if (!target || func(up, target))
			ret = -1;
	}

	freeReplyObject(members);
	return ret;
}
/*
 * redis_hash_build_list() callback: registers monologue `b` in the
 * other_tags table of monologue `a`, keyed by b's tag. Always returns 0.
 */
static int redis_build_other_tags(void *a, void *b) {
	struct call_monologue *owner = a;
	struct call_monologue *peer = b;

	g_hash_table_insert(owner->other_tags, &peer->tag, peer);
	return 0;
}
/*
 * redis_hash_build_list() callback: appends packet stream `b` to the
 * streams of media `a` and points the stream back at that media.
 * Always returns 0.
 */
static int redis_build_streams(void *a, void *b) {
	struct packet_stream *stream = b;
	struct call_media *media = a;

	g_queue_push_tail(&media->streams, stream);
	stream->media = media;
	return 0;
}
/*
 * redis_hash_build_list() callback: appends object `b` to the sfds queue of
 * endpoint map `a`. Always returns 0.
 */
static int redis_build_em_sfds(void *a, void *b) {
	struct endpoint_map *map = a;

	g_queue_push_tail(&map->sfds, b);
	return 0;
}
/*
 * Recreates one stream fd for every item of `sfds`: reads the stored
 * "localport", obtains that port through __get_consecutive_ports(), creates
 * the stream fd for call `c`, restores its crypto context and records the
 * new object in the item's `ptr`.
 *
 * Returns 0 on success, -1 at the first item that fails.
 */
static int redis_sfds(struct call *c, struct redis_list *sfds) {
	struct udp_fd fd;
	int port;
	GList *node;

	for (node = sfds->q.head; node; node = node->next) {
		struct list_item *item = node->data;
		struct stream_fd *sfd;

		if (redis_hash_get_int(&port, &item->rh, "localport")
				|| __get_consecutive_ports(&fd, 1, port, c))
			return -1;

		sfd = __stream_fd_new(&fd, c);

		if (redis_hash_get_crypto_context(&sfd->crypto, &item->rh))
			return -1;

		item->ptr = sfd;
	}

	return 0;
}
/*
 * Recreates one packet stream for every item of `streams`: allocates it for
 * call `c`, stamps last_packet with the current time and restores flags,
 * both endpoints, statistics and the crypto context. The new object is
 * recorded in the item's `ptr` and its KERNELIZED flag is cleared.
 *
 * Returns 0 on success, -1 at the first item that fails.
 */
static int redis_streams(struct call *c, struct redis_list *streams) {
	GList *node;

	for (node = streams->q.head; node; node = node->next) {
		struct list_item *item = node->data;
		struct packet_stream *ps = __packet_stream_new(c);

		if (!ps)
			return -1;

		atomic64_set_na(&ps->last_packet, time(NULL));

		if (redis_hash_get_unsigned((unsigned int *) &ps->ps_flags, &item->rh, "ps_flags")
				|| redis_hash_get_endpoint(&ps->endpoint, &item->rh, "endpoint")
				|| redis_hash_get_endpoint(&ps->advertised_endpoint, &item->rh, "advertised_endpoint")
				|| redis_hash_get_stats(&ps->stats, &item->rh, "stats")
				|| redis_hash_get_crypto_context(&ps->crypto, &item->rh))
			return -1;

		item->ptr = ps;

		PS_CLEAR(ps, KERNELIZED);
	}

	return 0;
}
/*
 * Recreates one monologue for every item of `tags`. The "created" timestamp
 * is mandatory; "tag", "via-branch" and "deleted" are applied when present.
 * The new object is recorded in the item's `ptr`.
 *
 * Returns 0 on success, -1 at the first item that fails.
 */
static int redis_tags(struct call *c, struct redis_list *tags) {
	GList *node;
	str val;

	for (node = tags->q.head; node; node = node->next) {
		struct list_item *item = node->data;
		struct call_monologue *ml = __monologue_create(c);

		if (!ml)
			return -1;

		if (redis_hash_get_time_t(&ml->created, &item->rh, "created"))
			return -1;

		if (redis_hash_get_str(&val, &item->rh, "tag") == 0)
			__monologue_tag(ml, &val);
		if (redis_hash_get_str(&val, &item->rh, "via-branch") == 0)
			__monologue_viabranch(ml, &val);

		/* optional: the result is deliberately not checked */
		redis_hash_get_time_t(&ml->deleted, &item->rh, "deleted");

		item->ptr = ml;
	}

	return 0;
}
/*
 * Points every restored stream fd at its packet stream, resolved through
 * the stored "stream" ID. Every stream fd must resolve to a stream.
 *
 * Returns 0 on success, -1 at the first stream fd without a stream.
 */
static int redis_link_sfds(struct redis_list *sfds, struct redis_list *streams) {
	GList *node = sfds->q.head;

	while (node) {
		struct list_item *item = node->data;
		struct stream_fd *sfd = item->ptr;

		sfd->stream = redis_hash_get_ptr_hash(streams, &item->rh, "stream");
		if (sfd->stream == NULL)
			return -1;

		node = node->next;
	}

	return 0;
}
static int redis_tags_populate(struct redis *r, struct redis_list *tags, struct redis_list *streams,
struct redis_list *sfds)
{
GList *l_tags, *l_medias, *l_ems, *l_streams;
struct list_item *it_tag, *it_media, *it_em;
struct packet_stream *it_stream;
struct call_monologue *ml;
struct redis_list rl_medias, rl_ems;
int i;
struct call_media *med;
str s;
struct endpoint_map *em = NULL;
struct callmaster *cm;
struct in6_addr in6a;
struct rtp_payload_type *pt = NULL;
struct redis_hash pt_hash;
unsigned int pt_index, pt_totals;
char hash_key[32];
for (l_tags = tags->q.head; l_tags; l_tags = l_tags->next) {
it_tag = l_tags->data;
ml = it_tag->ptr;
cm = ml->call->callmaster;
if (redis_hash_build_list(r, "other_tags", it_tag->id, tags, redis_build_other_tags, ml))
return -1;
ml->active_dialogue = redis_hash_get_ptr_hash(tags, &it_tag->rh, "active");
if (redis_get_list_hash(&rl_medias, r, "medias", it_tag->id, "media"))
return -1;
for (i = 1, l_medias = rl_medias.q.head; l_medias; i++, l_medias = l_medias->next) {
it_media = l_medias->data;
/* from call.c:__get_media() */
med = g_slice_alloc0(sizeof(*med));
med->monologue = ml;
med->call = ml->call;
med->index = i;
med->rtp_payload_types = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __payload_type_free);
g_queue_push_tail(&ml->medias, med);
if (redis_hash_get_str(&s, &it_media->rh, "type"))
goto free1;
call_str_cpy(ml->call, &med->type, &s);
if (redis_hash_get_str(&s, &it_media->rh, "protocol"))
goto free1;
med->protocol = transport_protocol(&s);
if (redis_hash_get_int(&med->desired_family, &it_media->rh, "desired_family"))
goto free1;
if (redis_hash_get_str(&s, &it_media->rh, "interface")
|| !(med->interface = get_local_interface(cm, &s, med->desired_family)))
{
rlog(LOG_ERR, "unable to find specified local interface");
med->interface = get_local_interface(cm, NULL, med->desired_family);
}
if (redis_hash_get_str(&s, &it_media->rh, "local_address")
|| inet_pton(AF_INET6, s.s, &in6a) != 1
|| !(med->local_address = get_interface_from_address(med->interface,
&in6a)))
{
rlog(LOG_ERR, "unable to find specified local address");
med->local_address = get_any_interface_address(med->interface,
med->desired_family);
}
if (redis_hash_get_unsigned(&med->sdes_in.tag, &it_media->rh, "sdes_in_tag"))
goto free1;
if (redis_hash_get_unsigned(&med->sdes_out.tag, &it_media->rh, "sdes_out_tag"))
goto free1;
if (redis_hash_get_unsigned((unsigned int *) &med->media_flags, &it_media->rh,
"media_flags"))
goto free1;
if (redis_hash_get_crypto_params(&med->sdes_in.params, &it_media->rh, "sdes_in") < 0)
goto free1;
if (redis_hash_get_crypto_params(&med->sdes_out.params, &it_media->rh, "sdes_out") < 0)
goto free1;
/* XXX dtls */
if (redis_hash_build_list(r, "streams", it_media->id, streams, redis_build_streams, med))
goto free1;
if (redis_get_list_hash(&rl_ems, r, "maps", it_media->id, "map"))
goto free1;
for (l_ems = rl_ems.q.head; l_ems; l_ems = l_ems->next) {
it_em = l_ems->data;
/* from call.c:__get_endpoint_map() */
em = g_slice_alloc0(sizeof(*em));
g_queue_init(&em->sfds);
med->endpoint_maps = g_slist_prepend(med->endpoint_maps, em);
if (redis_hash_get_endpoint(&em->endpoint, &it_em->rh, "endpoint"))
goto free2;
if (redis_hash_build_list(r, "sfds", it_em->id, sfds, redis_build_em_sfds, em))
goto free2;
em->wildcard = redis_hash_get_bool_flag(&it_em->rh, "wildcard");
}
// get payload_types hash content (e.g 0:pt0, 1:pt1, ...)
if (redis_get_hash(&pt_hash, r, "payload_types", it_media->id))
goto free3;
// fill media rtp_payload_types
pt_totals = (unsigned int)g_hash_table_size(pt_hash.ht);
for (pt_index = 0; pt_index < pt_totals; pt_index++) {
pt = g_slice_alloc0(sizeof(*pt));
if (!pt) {
goto free3;
}
sprintf(hash_key,"%u", pt_index);
if (redis_hash_get_unsigned(&pt->payload_type, &pt_hash, hash_key)) {
goto free3;
}
g_hash_table_insert(med->rtp_payload_types, &pt->payload_type, pt);
}
// fill streams rtp_stats
for (l_streams = med->streams.head; l_streams; l_streams = l_streams->next) {
it_stream = l_streams->data;
__rtp_stats_update(it_stream->rtp_stats, med->rtp_payload_types);
}
}
}
return 0;
free3:
redis_destroy_hash(&pt_hash);
if (pt) {
g_slice_free1(sizeof(*pt), pt);
}
free2:
med->endpoint_maps = g_slist_delete_link(med->endpoint_maps, med->endpoint_maps);
if (em) {
g_slice_free1(sizeof(*em), em);
}
free1:
g_queue_pop_tail(&ml->medias);
g_queue_clear(&med->streams);
g_slice_free1(sizeof(*med), med);
return -1;
}
/*
 * Resolves the cross references of every restored packet stream: its stream
 * fd ("sfd") and its "rtp_sink", "rtcp_sink" and "rtcp_sibling" streams.
 * References that do not resolve are left NULL. Always returns 0.
 */
static int redis_link_streams(struct redis_list *streams, struct redis_list *sfds) {
	GList *node = streams->q.head;

	while (node) {
		struct list_item *item = node->data;
		struct packet_stream *ps = item->ptr;
		struct redis_hash *fields = &item->rh;

		ps->sfd = redis_hash_get_ptr_hash(sfds, fields, "sfd");
		ps->rtp_sink = redis_hash_get_ptr_hash(streams, fields, "rtp_sink");
		ps->rtcp_sink = redis_hash_get_ptr_hash(streams, fields, "rtcp_sink");
		ps->rtcp_sibling = redis_hash_get_ptr_hash(streams, fields, "rtcp_sibling");

		node = node->next;
	}

	return 0;
}
/*
 * Restores a single call, identified by the reply element `id`, from Redis
 * into callmaster `m`.
 *
 * The call hash and the tags/sfds/streams lists are loaded first, then the
 * call object is created and filled in, and finally the sub-objects are
 * recreated and linked to each other. `err` always names the step that is
 * about to be attempted; it is reset to NULL only after the last step, so a
 * non-NULL `err` at the end means failure and doubles as the log message.
 * The errN labels release the temporary Redis data in reverse order of
 * acquisition and are also passed through on success.
 */
static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id) {
struct redis_hash call;
struct redis_list tags, sfds, streams;
struct call *c = NULL;
str s;
const char *err;
int i;
err = "'call' data incomplete";
if (redis_get_hash(&call, r, "call", id))
goto err1;
err = "'tags' incomplete";
if (redis_get_list_hash(&tags, r, "tags", id, "tag"))
goto err2;
err = "'sfds' incomplete";
if (redis_get_list_hash(&sfds, r, "sfds", id, "sfd"))
goto err3;
err = "'streams' incomplete";
if (redis_get_list_hash(&streams, r, "streams", id, "stream"))
goto err4;
s.s = id->str;
s.len = id->len;
/* presumably returns the call with master_lock write-locked and a
 * reference held, given the unlock and obj_put() below -- TODO confirm */
c = call_get_or_create(&s, m);
err = "failed to create call struct";
if (!c)
goto err5;
err = "missing 'created' timestamp";
if (redis_hash_get_time_t(&c->created, &call, "created"))
goto err6;
err = "missing 'last signal' timestamp";
if (redis_hash_get_time_t(&c->last_signal, &call, "last_signal"))
goto err6;
/* "tos" is optional; 184 is used when it is not stored */
if (redis_hash_get_int(&i, &call, "tos"))
c->tos = 184;
else
c->tos = i;
/* optional timestamps: results deliberately not checked */
redis_hash_get_time_t(&c->deleted, &call, "deleted");
redis_hash_get_time_t(&c->ml_deleted, &call, "ml_deleted");
if (!redis_hash_get_str(&s, &call, "created_from"))
c->created_from = call_strdup(c, s.s);
if (!redis_hash_get_str(&s, &call, "created_from_addr")) {
parse_ip6_port(&c->created_from_addr.sin6_addr, &c->created_from_addr.sin6_port, s.s);
c->created_from_addr.sin6_port = htons(c->created_from_addr.sin6_port);
c->created_from_addr.sin6_family = AF_INET6;
}
/* create the objects first, then resolve the references between them */
err = "failed to create sfds";
if (redis_sfds(c, &sfds))
goto err6;
err = "failed to create streams";
if (redis_streams(c, &streams))
goto err6;
err = "failed to create tags";
if (redis_tags(c, &tags))
goto err6;
err = "failed to link sfds";
if (redis_link_sfds(&sfds, &streams))
goto err6;
err = "failed to populate tags";
if (redis_tags_populate(r, &tags, &streams, &sfds))
goto err6;
err = "failed to link streams";
if (redis_link_streams(&streams, &sfds))
goto err6;
err = NULL;
/* NOTE(review): this drops a reference before c->master_lock is unlocked
 * just below; safe only if something else still references the call --
 * confirm */
obj_put(c);
err6:
rwlock_unlock_w(&c->master_lock);
err5:
redis_destroy_list(&streams);
err4:
redis_destroy_list(&sfds);
err3:
redis_destroy_list(&tags);
err2:
redis_destroy_hash(&call);
err1:
log_info_clear();
if (err) {
rlog(LOG_WARNING, "Failed to restore call ID '%.*s' from Redis: %s", REDIS_FMT(id), err);
/* c is only non-NULL here if call creation succeeded and a later step
 * failed */
if (c) {
call_destroy(c);
obj_put(c);
}
}
}
/* Shared state handed to the restore worker threads. */
struct thread_ctx {
/* callmaster the calls are restored into */
struct callmaster *m;
/* pool of Redis connections; a worker takes one per call and returns it */
GQueue r_q;
/* protects r_q */
mutex_t r_m;
};
/* number of Redis connections opened (and worker threads used) for restore */
#define RESTORE_NUM_THREADS 4
/*
 * Thread pool worker: restores the call whose ID is the reply element
 * `call_p`. A Redis connection is borrowed from the shared pool in `ctx_p`
 * for the duration of the restore and returned afterwards; the pool is only
 * touched with its mutex held.
 */
static void restore_thread(void *call_p, void *ctx_p) {
	struct thread_ctx *shared = ctx_p;
	redisReply *call_id = call_p;
	struct redis *conn;

	rlog(LOG_DEBUG, "Processing call ID '%.*s' from Redis", REDIS_FMT(call_id));

	/* borrow a connection */
	mutex_lock(&shared->r_m);
	conn = g_queue_pop_head(&shared->r_q);
	mutex_unlock(&shared->r_m);

	redis_restore_call(conn, shared->m, call_id);

	/* hand it back */
	mutex_lock(&shared->r_m);
	g_queue_push_tail(&shared->r_q, conn);
	mutex_unlock(&shared->r_m);
}
/*
 * Restores all calls listed in the Redis set "calls" into callmaster `m`.
 *
 * Up to RESTORE_NUM_THREADS extra connections are opened and the calls are
 * restored in parallel by a thread pool with one thread per connection, so
 * a worker always finds a connection in the pool. Connections that cannot
 * be opened are skipped; if none can be opened the restore fails.
 *
 * Returns 0 on success or when `r` is NULL (Redis not in use), -1 if the
 * call list cannot be retrieved or no connection is available.
 */
int redis_restore(struct callmaster *m, struct redis *r) {
	redisReply *calls, *call;
	int i, ret = -1;
	int num_conns = 0;
	GThreadPool *gtp;
	struct thread_ctx ctx;
	struct redis *conn;

	if (!r)
		return 0;

	log_level |= LOG_FLAG_RESTORE;

	rlog(LOG_DEBUG, "Restoring calls from Redis...");
	redis_check_conn(r);

	calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls");

	if (!calls) {
		rlog(LOG_ERR, "Could not retrieve call list from Redis: %s", r->ctx->errstr);
		goto err;
	}

	ctx.m = m;
	mutex_init(&ctx.r_m);
	g_queue_init(&ctx.r_q);
	for (i = 0; i < RESTORE_NUM_THREADS; i++) {
		conn = redis_new(r->ip, r->port, r->db);
		/* never queue a failed connection: a worker would use it */
		if (!conn)
			continue;
		g_queue_push_tail(&ctx.r_q, conn);
		num_conns++;
	}
	if (!num_conns) {
		rlog(LOG_ERR, "Could not open any Redis connection for restoring calls");
		goto err2;
	}

	/* one thread per available connection */
	gtp = g_thread_pool_new(restore_thread, &ctx, num_conns, TRUE, NULL);

	for (i = 0; i < calls->elements; i++) {
		call = calls->element[i];
		if (call->type != REDIS_REPLY_STRING)
			continue;

		g_thread_pool_push(gtp, call, NULL);
	}

	/* waits for all queued calls to be processed */
	g_thread_pool_free(gtp, FALSE, TRUE);
	while ((conn = g_queue_pop_head(&ctx.r_q)))
		redis_close(conn);
	ret = 0;

err2:
	mutex_destroy(&ctx.r_m);
	/* the workers are done, so the call IDs are no longer referenced */
	freeReplyObject(calls);
err:
	log_level &= ~LOG_FLAG_RESTORE;
	return ret;
}
/*
 * Queues the crypto parameters `p` as fields of the hash "<pref>-<suff>",
 * with `key` as field name prefix: suite name, master key and master salt,
 * plus the MKI when one is set. `suff` is the address of the owning object
 * and is used as its numeric ID.
 *
 * Returns 0 if parameters were queued, -1 if `p` has no crypto suite (in
 * which case nothing is queued).
 */
static int redis_update_crypto_params(struct redis *r, const char *pref, void *suff,
		const char *key, const struct crypto_params *p)
{
	unsigned long long id = (unsigned long long) suff;

	if (!p->crypto_suite)
		return -1;

	redis_pipe(r, "HMSET %s-%llu %s-crypto_suite %s %s-master_key "PB" %s-master_salt "PB"",
			pref, id,
			key, p->crypto_suite->name,
			key, S_LEN(p->master_key, sizeof(p->master_key)),
			key, S_LEN(p->master_salt, sizeof(p->master_salt)));

	if (!p->mki)
		return 0;

	redis_pipe(r, "HMSET %s-%llu %s-mki "PB"",
			pref, id,
			key,
			S_LEN(p->mki, p->mki_len));

	return 0;
}
/*
 * Queues a crypto context into the hash "<pref>-<suff>": its parameters
 * under the empty field prefix and, only if parameters were written, the
 * "last_index" field.
 */
static void redis_update_crypto_context(struct redis *r, const char *pref, void *suff,
		const struct crypto_context *c)
{
	int have_params = redis_update_crypto_params(r, pref, suff, "", &c->params) == 0;

	if (have_params)
		redis_pipe(r, "HMSET %s-%llu last_index "UINT64F"",
				pref,
				(long long unsigned) suff,
				c->last_index);
}
/*
 * Queues endpoint `e` into the hash "<pref>-<suff>" as the fields
 * "<key>-addr" (IPv6 textual form) and "<key>-port".
 */
static void redis_update_endpoint(struct redis *r, const char *pref, void *suff,
		const char *key, const struct endpoint *e)
{
	char addr_text[64];
	unsigned long long id = (unsigned long long) suff;

	inet_ntop(AF_INET6, &e->ip46, addr_text, sizeof(addr_text));

	redis_pipe(r, "HMSET %s-%llu %s-addr %s %s-port %hu",
			pref, id,
			key, addr_text,
			key, (short unsigned) e->port);
}
/*
 * Queues the three counters of `s` into the hash "<pref>-<suff>" as the
 * fields "<key>-packets", "<key>-bytes" and "<key>-errors".
 */
static void redis_update_stats(struct redis *r, const char *pref, void *suff,
		const char *key, const struct stats *s)
{
	unsigned long long id = (unsigned long long) suff;

	redis_pipe(r, "HMSET %s-%llu %s-packets "UINT64F" %s-bytes "UINT64F" %s-errors "UINT64F"",
			pref, id,
			key, atomic64_get(&s->packets),
			key, atomic64_get(&s->bytes),
			key, atomic64_get(&s->errors));
}
/*
 * Queues a DTLS fingerprint into the hash "<pref>-<suff>" as the fields
 * "hash_func" (name of the hash function) and "fingerprint" (the full
 * digest buffer). Nothing is queued when no hash function is set.
 */
static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, void *suff,
		const struct dtls_fingerprint *f)
{
	if (f->hash_func) {
		unsigned long long id = (unsigned long long) suff;

		redis_pipe(r, "HMSET %s-%llu hash_func %s fingerprint "PB"",
				pref, id,
				f->hash_func->name,
				S_LEN(f->digest, sizeof(f->digest)));
	}
}
/*
 * Writes the complete state of call `c` to Redis, replacing whatever was
 * stored for it before. Does nothing when `r` is NULL.
 *
 * Layout: the call hash and the "tags-", "sfds-" and "streams-" lists are
 * keyed by call ID. Every stream fd, packet stream, monologue, media and
 * endpoint map gets its own keys named after the object's address, which
 * also serves as the ID other objects refer to it by. Each key is deleted
 * before it is rewritten and gets a 24 hour expiry. The call ID is added to
 * the "calls" set last.
 *
 * All commands are pipelined and flushed at the end. Takes r->lock and
 * read-locks c->master_lock for the duration; each stream's in_lock and
 * out_lock are held while that stream is serialized.
 *
 * must be called lock-free
 */
void redis_update(struct call *c, struct redis *r) {
	GSList *l, *n;
	GList *pt_list, *pt_iter;
	GList *k, *m;
	struct call_monologue *ml;
	struct call_media *media;
	struct packet_stream *ps;
	struct stream_fd *sfd;
	struct endpoint_map *ep;
	struct rtp_payload_type *pt;
	char a[64];
	unsigned int pt_index;

	if (!r)
		return;

	mutex_lock(&r->lock);
	redis_check_conn(r);

	rwlock_lock_r(&c->master_lock);

	/* call-level keys */
	redis_pipe(r, "SREM calls "PB"", STR(&c->callid));
	redis_pipe(r, "DEL call-"PB" tags-"PB" sfds-"PB" streams-"PB"", STR(&c->callid), STR(&c->callid),
			STR(&c->callid), STR(&c->callid));
	smart_ntop_port(a, &c->created_from_addr, sizeof(a));
	redis_pipe(r, "HMSET call-"PB" created %llu last_signal %llu tos %i deleted %llu "
			"ml_deleted %llu created_from %s created_from_addr %s",
			STR(&c->callid), (long long unsigned) c->created, (long long unsigned) c->last_signal,
			(int) c->tos, (long long unsigned) c->deleted, (long long unsigned) c->ml_deleted,
			c->created_from, a);
	/* XXX DTLS cert?? */

	/* stream fds */
	for (l = c->stream_fds; l; l = l->next) {
		sfd = l->data;

		redis_pipe(r, "DEL sfd-%llu", (long long unsigned) sfd);
		redis_pipe(r, "HMSET sfd-%llu localport %hu stream %llu",
				(long long unsigned) sfd, (short unsigned) sfd->fd.localport,
				(long long unsigned) sfd->stream);
		redis_update_crypto_context(r, "sfd", sfd, &sfd->crypto);
		/* XXX DTLS?? */
		redis_pipe(r, "EXPIRE sfd-%llu 86400", (long long unsigned) sfd);

		redis_pipe(r, "LPUSH sfds-"PB" %llu", STR(&c->callid), (long long unsigned) sfd);
	}

	/* packet streams */
	for (l = c->streams; l; l = l->next) {
		ps = l->data;

		mutex_lock(&ps->in_lock);
		mutex_lock(&ps->out_lock);

		redis_pipe(r, "DEL stream-%llu", (long long unsigned) ps);
		redis_pipe(r, "HMSET stream-%llu media %llu sfd %llu rtp_sink %llu "
				"rtcp_sink %llu rtcp_sibling %llu last_packet "UINT64F" "
				"ps_flags %u",
				(long long unsigned) ps,
				(long long unsigned) ps->media,
				(long long unsigned) ps->sfd,
				(long long unsigned) ps->rtp_sink,
				(long long unsigned) ps->rtcp_sink,
				(long long unsigned) ps->rtcp_sibling,
				atomic64_get(&ps->last_packet),
				ps->ps_flags);
		redis_update_endpoint(r, "stream", ps, "endpoint", &ps->endpoint);
		redis_update_endpoint(r, "stream", ps, "advertised_endpoint", &ps->advertised_endpoint);
		redis_update_stats(r, "stream", ps, "stats", &ps->stats);
		redis_update_crypto_context(r, "stream", ps, &ps->crypto);
		/* XXX DTLS?? */

		mutex_unlock(&ps->in_lock);
		mutex_unlock(&ps->out_lock);

		redis_pipe(r, "EXPIRE stream-%llu 86400", (long long unsigned) ps);
		redis_pipe(r, "LPUSH streams-"PB" %llu", STR(&c->callid), (long long unsigned) ps);
	}

	/* monologues with their medias, endpoint maps and payload types */
	for (l = c->monologues; l; l = l->next) {
		ml = l->data;

		redis_pipe(r, "DEL tag-%llu other_tags-%llu medias-%llu",
				(long long unsigned) ml,
				(long long unsigned) ml,
				(long long unsigned) ml);
		redis_pipe(r, "HMSET tag-%llu created %llu active %llu deleted %llu",
				(long long unsigned) ml,
				(long long unsigned) ml->created,
				(long long unsigned) ml->active_dialogue,
				(long long unsigned) ml->deleted);
		if (ml->tag.s)
			redis_pipe(r, "HMSET tag-%llu tag "PB"",
					(long long unsigned) ml,
					STR(&ml->tag));
		if (ml->viabranch.s)
			redis_pipe(r, "HMSET tag-%llu via-branch "PB"",
					(long long unsigned) ml,
					STR(&ml->viabranch));

		k = g_hash_table_get_values(ml->other_tags);
		for (m = k; m; m = m->next) {
			redis_pipe(r, "RPUSH other_tags-%llu %llu",
					(long long unsigned) ml,
					(long long unsigned) m->data);
		}
		g_list_free(k);

		for (k = ml->medias.head; k; k = k->next) {
			media = k->data;

			/* payload_types is included so that entries of payload
			 * types no longer in use do not survive the rewrite */
			redis_pipe(r, "DEL media-%llu streams-%llu maps-%llu payload_types-%llu",
					(long long unsigned) media, (long long unsigned) media,
					(long long unsigned) media, (long long unsigned) media);
			redis_pipe(r, "HMSET media-%llu "
					"type "PB" protocol %s desired_family %i "
					"sdes_in_tag %u sdes_out_tag %u interface "PB" local_address "IP6F" "
					"media_flags %u",
					(long long unsigned) media,
					STR(&media->type), media->protocol ? media->protocol->name : "",
					media->desired_family,
					media->sdes_in.tag, media->sdes_out.tag,
					STR(&media->interface->name), IP6P(&media->local_address->addr.s6_addr),
					media->media_flags);
			redis_update_crypto_params(r, "media", media, "sdes_in", &media->sdes_in.params);
			redis_update_crypto_params(r, "media", media, "sdes_out", &media->sdes_out.params);
			redis_update_dtls_fingerprint(r, "media", media, &media->fingerprint);

			for (m = media->streams.head; m; m = m->next) {
				redis_pipe(r, "RPUSH streams-%llu %llu",
						(long long unsigned) media,
						(long long unsigned) m->data);
			}

			for (n = media->endpoint_maps; n; n = n->next) {
				ep = n->data;

				redis_pipe(r, "DEL map-%llu sfds-%llu",
						(long long unsigned) ep,
						(long long unsigned) ep);
				redis_pipe(r, "HMSET map-%llu wildcard %i",
						(long long unsigned) ep,
						ep->wildcard);
				redis_update_endpoint(r, "map", ep, "endpoint", &ep->endpoint);
				for (m = ep->sfds.head; m; m = m->next) {
					redis_pipe(r, "RPUSH sfds-%llu %llu",
							(long long unsigned) ep,
							(long long unsigned) m->data);
				}

				redis_pipe(r, "EXPIRE map-%llu 86400", (long long unsigned) ep);
				redis_pipe(r, "EXPIRE sfds-%llu 86400", (long long unsigned) ep);
				redis_pipe(r, "LPUSH maps-%llu %llu",
						(long long unsigned) media, (long long unsigned) ep);
			}

			/* payload types are stored as index -> payload type number */
			pt_list = g_hash_table_get_values(media->rtp_payload_types);
			pt_index = 0;
			for (pt_iter = pt_list; pt_iter; pt_iter = pt_iter->next) {
				pt = pt_iter->data;
				redis_pipe(r, "HSET payload_types-%llu %u %u",
						(long long unsigned) media,
						(unsigned int) pt_index,
						(unsigned int) pt->payload_type);
				pt_index++;
			}
			g_list_free(pt_list);

			redis_pipe(r, "EXPIRE media-%llu 86400", (long long unsigned) media);
			redis_pipe(r, "EXPIRE streams-%llu 86400", (long long unsigned) media);
			redis_pipe(r, "EXPIRE maps-%llu 86400", (long long unsigned) media);
			redis_pipe(r, "EXPIRE payload_types-%llu 86400", (long long unsigned) media);
			redis_pipe(r, "LPUSH medias-%llu %llu",
					(long long unsigned) ml, (long long unsigned) media);
		}

		redis_pipe(r, "EXPIRE tag-%llu 86400", (long long unsigned) ml);
		redis_pipe(r, "EXPIRE other_tags-%llu 86400", (long long unsigned) ml);
		redis_pipe(r, "EXPIRE medias-%llu 86400", (long long unsigned) ml);
		redis_pipe(r, "LPUSH tags-"PB" %llu", STR(&c->callid), (long long unsigned) ml);
	}

	redis_pipe(r, "EXPIRE call-"PB" 86400", STR(&c->callid));
	redis_pipe(r, "EXPIRE tags-"PB" 86400", STR(&c->callid));
	redis_pipe(r, "EXPIRE sfds-"PB" 86400", STR(&c->callid));
	redis_pipe(r, "EXPIRE streams-"PB" 86400", STR(&c->callid));
	/* the call becomes visible to a restore only once it is in the set */
	redis_pipe(r, "SADD calls "PB"", STR(&c->callid));

	redis_consume(r);

	mutex_unlock(&r->lock);
	rwlock_unlock_r(&c->master_lock);
}
/*
 * Removes everything stored in Redis for call `c`. Does nothing when `r` is
 * NULL. Takes r->lock, checks the connection and read-locks c->master_lock
 * around the actual deletion.
 *
 * must be called lock-free
 */
void redis_delete(struct call *c, struct redis *r) {
	if (r == NULL)
		return;

	mutex_lock(&r->lock);
	redis_check_conn(r);

	rwlock_lock_r(&c->master_lock);
	redis_delete_call(c, r);
	rwlock_unlock_r(&c->master_lock);

	mutex_unlock(&r->lock);
}
/*
 * Deletes the "calls" set, i.e. the index of stored calls. Does nothing
 * when `r` is NULL. Takes r->lock and checks the connection first.
 */
void redis_wipe(struct redis *r) {
	if (r == NULL)
		return;

	mutex_lock(&r->lock);

	redis_check_conn(r);
	redisCommandNR(r->ctx, "DEL calls");

	mutex_unlock(&r->lock);
}