redis updates

git.mgm/mediaproxy-ng/github/master
Richard Fuchs 12 years ago
parent 1b1bb16117
commit a780cf7891

@ -517,6 +517,7 @@ static int stream_packet(struct streamrelay *sr_incoming, str *s, struct sockadd
struct callmaster *m;
unsigned char cc;
char addr[64];
struct stream s_copy;
p_incoming = sr_incoming->up;
cs_incoming = p_incoming->up;
@ -566,6 +567,7 @@ use_cand:
LOG_PARAMS_C(c), sr_incoming->fd.localport, addr);
p_incoming->confirmed = 1;
update = 1;
peerinfo:
if (!stun_ret && !p_incoming->codec && s->len >= 2) {
@ -578,12 +580,14 @@ peerinfo:
}
sr_out_rtcp = &p_outgoing->rtps[1]; /* sr_incoming->idx == 0 */
s_copy = sr_outgoing->peer;
sr_outgoing->peer.ip46 = fsin->sin6_addr;
sr_outgoing->peer.port = ntohs(fsin->sin6_port);
sr_out_rtcp->peer.ip46 = sr_outgoing->peer.ip46;
sr_out_rtcp->peer.port = sr_outgoing->peer.port + 1; /* sr_out_rtcp->idx == 1 */
update = 1;
if (memcmp(&s_copy, &sr_outgoing->peer, sizeof(s_copy))) {
sr_out_rtcp->peer.ip46 = sr_outgoing->peer.ip46;
sr_out_rtcp->peer.port = sr_outgoing->peer.port + 1; /* sr_out_rtcp->idx == 1 */
update = 1;
}
if (sr_incoming->no_kernel_support)
goto forward;
@ -744,7 +748,7 @@ out:
ca = cs->call;
mutex_unlock(&cs->lock);
if (update && redis_update)
if (update)
redis_update(ca, ca->callmaster->conf.redis);
}
@ -1026,7 +1030,7 @@ static void callmaster_timer(void *ptr) {
u_int64_t d;
struct stats tmpstats;
struct callstream *cs;
int j;
int j, update;
ZERO(hlp);
@ -1064,13 +1068,24 @@ static void callmaster_timer(void *ptr) {
sr->kstats.bytes = ke->stats.bytes;
sr->kstats.errors = ke->stats.errors;
if (sr->other->crypto.out.crypto_suite)
update = 0;
if (sr->other->crypto.out.crypto_suite
&& ke->target.encrypt.last_index - sr->other->crypto.out.s_l > 0x4000) {
sr->other->crypto.out.s_l = ke->target.encrypt.last_index;
if (sr->crypto.in.crypto_suite)
update = 1;
}
if (sr->crypto.in.crypto_suite
&& ke->target.decrypt.last_index - sr->crypto.in.s_l > 0x4000) {
sr->crypto.in.s_l = ke->target.decrypt.last_index;
update = 1;
}
mutex_unlock(&cs->lock);
if (update)
redis_update(cs->call, m->conf.redis);
next:
hlp.ports[ke->target.target_port] = NULL;
g_slice_free1(sizeof(*ke), ke);
@ -1765,8 +1780,7 @@ static void call_destroy(struct call *c) {
obj_put(c);
if (redis_delete)
redis_delete(c, m->conf.redis);
redis_delete(c, m->conf.redis);
mutex_lock(&c->lock);
/* at this point, no more callstreams can be added */
@ -2129,8 +2143,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o
ret = streams_print(c->callstreams, num, opmode, out[RE_UDP_COOKIE], SAF_UDP);
mutex_unlock(&c->lock);
if (redis_update)
redis_update(c, m->conf.redis);
redis_update(c, m->conf.redis);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %.*s", LOG_PARAMS_CI(c), STR_FMT(ret));
goto out;
@ -2177,8 +2190,7 @@ static str *call_request_lookup_tcp(char **out, struct callmaster *m, enum call_
streams_free(&s);
if (redis_update)
redis_update(c, m->conf.redis);
redis_update(c, m->conf.redis);
mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %.*s", LOG_PARAMS_CI(c), STR_FMT(ret));
obj_put(c);
@ -2487,8 +2499,7 @@ static void calls_dump_iterator(void *key, void *val, void *ptr) {
struct call *c = val;
struct callmaster *m = c->callmaster;
if (redis_update)
redis_update(c, m->conf.redis);
redis_update(c, m->conf.redis);
}
void calls_dump_redis(struct callmaster *m) {
@ -2496,7 +2507,7 @@ void calls_dump_redis(struct callmaster *m) {
return;
mylog(LOG_DEBUG, "Start dumping all call data to Redis...\n");
redis_wipe(m->conf.redis);
redis_wipe_mod(m->conf.redis);
g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL);
mylog(LOG_DEBUG, "Finished dumping all call data to Redis\n");
}
@ -2652,6 +2663,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
ret = sdp_replace(chopper, &parsed, call, (num >= 0) ? opmode : (opmode ^ 1), &flags, streamhash);
mutex_unlock(&call->lock);
redis_update(call, m->conf.redis);
obj_put(call);
errstr = "Error rewriting SDP";

@ -214,6 +214,11 @@ void relays_cache_init(struct relays_cache *c);
int relays_cache_want_ports(struct relays_cache *c, int portA, int portB, struct call *call);
void relays_cache_cleanup(struct relays_cache *c, struct callmaster *m);
enum transport_protocol transport_protocol(const str *s);
static inline char *call_strdup(struct call *c, const char *s) {
char *r;
if (!s)
@ -255,7 +260,4 @@ static inline str *call_str_init_dup(struct call *c, char *s) {
enum transport_protocol transport_protocol(const str *s);
#endif

@ -59,7 +59,6 @@ struct crypto_context {
u_int32_t roc;
u_int64_t s_l;
/* XXX replay list */
u_int64_t num_packets;
/* <from, to>? */
char session_key[16]; /* k_e */

@ -27,8 +27,8 @@
#define die(x...) do { fprintf(stderr, x); exit(-1); } while(0)
#define dlresolve(n) do { \
n = dlsym(dlh, "mod_" #n); \
if (!n) \
n ## _mod = dlsym(dlh, "mod_" #n); \
if (!n ## _mod) \
die("Failed to resolve symbol from plugin: %s\n", "mod_" #n); \
} while(0)
#define check_struct_size(x) do { \
@ -460,7 +460,7 @@ void create_everything(struct main_context *ctx) {
if (!strp || !*strp || strcmp(*strp, "redis/3"))
die("Incorrect redis module version: %s\n", *strp);
redis_mod_verify(dlh);
mc.redis = redis_new(redis_ip, redis_port, redis_db);
mc.redis = redis_new_mod(redis_ip, redis_port, redis_db);
if (!mc.redis)
die("Cannot start up without Redis database\n");
}
@ -471,10 +471,8 @@ void create_everything(struct main_context *ctx) {
daemonize();
wpidfile();
if (mc.redis) {
if (redis_restore(ctx->m, mc.redis))
die("Refusing to continue without working Redis database\n");
}
if (redis_restore(ctx->m, mc.redis))
die("Refusing to continue without working Redis database\n");
}
static void timer_loop(void *d) {

@ -1,7 +1,7 @@
#include "redis.h"
struct redis *(*redis_new)(u_int32_t, u_int16_t, int);
int (*redis_restore)(struct callmaster *, struct redis *);
void (*redis_update)(struct call *, struct redis *);
void (*redis_delete)(struct call *, struct redis *);
void (*redis_wipe)(struct redis *);
struct redis *(*redis_new_mod)(u_int32_t, u_int16_t, int);
int (*redis_restore_mod)(struct callmaster *, struct redis *);
void (*redis_update_mod)(struct call *, struct redis *);
void (*redis_delete_mod)(struct call *, struct redis *);
void (*redis_wipe_mod)(struct redis *);

@ -15,11 +15,30 @@ struct redis;
extern struct redis *(*redis_new)(u_int32_t, u_int16_t, int);
extern int (*redis_restore)(struct callmaster *, struct redis *);
extern void (*redis_update)(struct call *, struct redis *);
extern void (*redis_delete)(struct call *, struct redis *);
extern void (*redis_wipe)(struct redis *);
extern struct redis *(*redis_new_mod)(u_int32_t, u_int16_t, int);
extern int (*redis_restore_mod)(struct callmaster *, struct redis *);
extern void (*redis_update_mod)(struct call *, struct redis *);
extern void (*redis_delete_mod)(struct call *, struct redis *);
extern void (*redis_wipe_mod)(struct redis *);
static inline void redis_update(struct call *c, struct redis *r) {
if (!redis_update_mod)
return;
redis_update_mod(c, r);
}
static inline void redis_delete(struct call *c, struct redis *r) {
if (!redis_delete_mod)
return;
redis_delete_mod(c, r);
}
static inline int redis_restore(struct callmaster *m, struct redis *r) {
if (!redis_restore_mod)
return 0;
return redis_restore_mod(m, r);
}

@ -395,11 +395,11 @@ int rtcp_avp2savp(str *s, struct crypto_context *c) {
if (check_session_keys(c))
return -1;
if (crypto_encrypt_rtcp(c, rtcp, &payload, c->num_packets))
if (crypto_encrypt_rtcp(c, rtcp, &payload, c->s_l))
return -1;
idx = (void *) s->s + s->len;
*idx = htonl(0x80000000ULL | c->num_packets++);
*idx = htonl(0x80000000ULL | c->s_l++);
s->len += sizeof(*idx);
to_auth = *s;

@ -41,10 +41,10 @@ static inline int str_cmp_len(const str *a, const char *b, int len);
static inline int str_cmp_str(const str *a, const str *b);
/* compares two str objects, allows either to be NULL */
static inline int str_cmp_str0(const str *a, const str *b);
/* inits a str object from a regular string */
static inline void str_init(str *out, char *s);
/* inits a str object from any binary string */
static inline void str_init_len(str *out, char *s, int len);
/* inits a str object from a regular string. returns out */
static inline str *str_init(str *out, char *s);
/* inits a str object from any binary string. returns out */
static inline str *str_init_len(str *out, char *s, int len);
/* returns new str object allocated with malloc, including buffer */
static inline str *str_dup(const str *s);
/* returns new str object allocated from chunk, including buffer */
@ -131,13 +131,15 @@ static inline int str_cmp_str0(const str *a, const str *b) {
}
return str_cmp_str(a, b);
}
static inline void str_init(str *out, char *s) {
static inline str *str_init(str *out, char *s) {
out->s = s;
out->len = s ? strlen(s) : 0;
return out;
}
static inline void str_init_len(str *out, char *s, int len) {
static inline str *str_init_len(str *out, char *s, int len) {
out->s = s;
out->len = len;
return out;
}
static inline str *str_dup(const str *s) {
str *r;

@ -505,6 +505,7 @@ sub do_rtp {
$KEEPGOING or undef($c);
}
$NOENC and $repl = $expect;
!$repl && $KEEPGOING and next;
$repl eq $expect or die hexdump($repl, $expect) . " $$trans[$a]{name} > $$trans[$b]{name}, ports $$outputs[$b][$j][0] and $$outputs[$a][$j][0]";
$rtcp or next;
@ -524,6 +525,7 @@ sub do_rtp {
$dst = $$pr{sockaddr}($dstport, $addr);
$repl = send_receive($sendfd, $expfd, $payload, $dst);
$NOENC and $repl = $expect;
!$repl && $KEEPGOING and next;
$repl eq $expect or die hexdump($repl, $expect) . " $$trans[$a]{name} > $$trans[$b]{name}";
}
}

Loading…
Cancel
Save