From a780cf78918e0bda7967da3a8ab399ac21df5b52 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 28 Jun 2013 15:01:13 -0400 Subject: [PATCH] redis updates --- daemon/call.c | 46 +++++++++++++++++++++++++++---------------- daemon/call.h | 8 +++++--- daemon/crypto.h | 1 - daemon/main.c | 12 +++++------ daemon/redis.c | 10 +++++----- daemon/redis.h | 29 ++++++++++++++++++++++----- daemon/rtcp.c | 4 ++-- daemon/str.h | 14 +++++++------ tests/simulator-ng.pl | 2 ++ 9 files changed, 80 insertions(+), 46 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index c97207fa8..54de3542d 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -517,6 +517,7 @@ static int stream_packet(struct streamrelay *sr_incoming, str *s, struct sockadd struct callmaster *m; unsigned char cc; char addr[64]; + struct stream s_copy; p_incoming = sr_incoming->up; cs_incoming = p_incoming->up; @@ -566,6 +567,7 @@ use_cand: LOG_PARAMS_C(c), sr_incoming->fd.localport, addr); p_incoming->confirmed = 1; + update = 1; peerinfo: if (!stun_ret && !p_incoming->codec && s->len >= 2) { @@ -578,12 +580,14 @@ peerinfo: } sr_out_rtcp = &p_outgoing->rtps[1]; /* sr_incoming->idx == 0 */ + s_copy = sr_outgoing->peer; sr_outgoing->peer.ip46 = fsin->sin6_addr; sr_outgoing->peer.port = ntohs(fsin->sin6_port); - sr_out_rtcp->peer.ip46 = sr_outgoing->peer.ip46; - sr_out_rtcp->peer.port = sr_outgoing->peer.port + 1; /* sr_out_rtcp->idx == 1 */ - - update = 1; + if (memcmp(&s_copy, &sr_outgoing->peer, sizeof(s_copy))) { + sr_out_rtcp->peer.ip46 = sr_outgoing->peer.ip46; + sr_out_rtcp->peer.port = sr_outgoing->peer.port + 1; /* sr_out_rtcp->idx == 1 */ + update = 1; + } if (sr_incoming->no_kernel_support) goto forward; @@ -744,7 +748,7 @@ out: ca = cs->call; mutex_unlock(&cs->lock); - if (update && redis_update) + if (update) redis_update(ca, ca->callmaster->conf.redis); } @@ -1026,7 +1030,7 @@ static void callmaster_timer(void *ptr) { u_int64_t d; struct stats tmpstats; struct callstream *cs; - int j; + int j, update; ZERO(hlp); @@ -1064,13 +1068,24 @@ static void callmaster_timer(void *ptr) { sr->kstats.bytes = ke->stats.bytes; sr->kstats.errors = ke->stats.errors; - if (sr->other->crypto.out.crypto_suite) + update = 0; + + if (sr->other->crypto.out.crypto_suite + && ke->target.encrypt.last_index - sr->other->crypto.out.s_l > 0x4000) { sr->other->crypto.out.s_l = ke->target.encrypt.last_index; - if (sr->crypto.in.crypto_suite) + update = 1; + } + if (sr->crypto.in.crypto_suite + && ke->target.decrypt.last_index - sr->crypto.in.s_l > 0x4000) { sr->crypto.in.s_l = ke->target.decrypt.last_index; + update = 1; + } mutex_unlock(&cs->lock); + if (update) + redis_update(cs->call, m->conf.redis); + next: hlp.ports[ke->target.target_port] = NULL; g_slice_free1(sizeof(*ke), ke); @@ -1765,8 +1780,7 @@ static void call_destroy(struct call *c) { obj_put(c); - if (redis_delete) - redis_delete(c, m->conf.redis); + redis_delete(c, m->conf.redis); mutex_lock(&c->lock); /* at this point, no more callstreams can be added */ @@ -2129,8 +2143,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o ret = streams_print(c->callstreams, num, opmode, out[RE_UDP_COOKIE], SAF_UDP); mutex_unlock(&c->lock); - if (redis_update) - redis_update(c, m->conf.redis); + redis_update(c, m->conf.redis); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %.*s", LOG_PARAMS_CI(c), STR_FMT(ret)); goto out; @@ -2177,8 +2190,7 @@ static str *call_request_lookup_tcp(char **out, struct callmaster *m, enum call_ streams_free(&s); - if (redis_update) - redis_update(c, m->conf.redis); + redis_update(c, m->conf.redis); mylog(LOG_INFO, LOG_PREFIX_CI "Returning to SIP proxy: %.*s", LOG_PARAMS_CI(c), STR_FMT(ret)); obj_put(c); @@ -2487,8 +2499,7 @@ static void calls_dump_iterator(void *key, void *val, void *ptr) { struct call *c = val; struct callmaster *m = c->callmaster; - if (redis_update) - redis_update(c, m->conf.redis); + redis_update(c, m->conf.redis); } void calls_dump_redis(struct callmaster *m) { @@ -2496,7 +2507,7 @@ void calls_dump_redis(struct callmaster *m) { return; mylog(LOG_DEBUG, "Start dumping all call data to Redis...\n"); - redis_wipe(m->conf.redis); + redis_wipe_mod(m->conf.redis); g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); mylog(LOG_DEBUG, "Finished dumping all call data to Redis\n"); } @@ -2652,6 +2663,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster ret = sdp_replace(chopper, &parsed, call, (num >= 0) ? opmode : (opmode ^ 1), &flags, streamhash); mutex_unlock(&call->lock); + redis_update(call, m->conf.redis); obj_put(call); errstr = "Error rewriting SDP"; diff --git a/daemon/call.h b/daemon/call.h index af6801345..072b74e87 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -214,6 +214,11 @@ void relays_cache_init(struct relays_cache *c); int relays_cache_want_ports(struct relays_cache *c, int portA, int portB, struct call *call); void relays_cache_cleanup(struct relays_cache *c, struct callmaster *m); +enum transport_protocol transport_protocol(const str *s); + + + + static inline char *call_strdup(struct call *c, const char *s) { char *r; if (!s) @@ -255,7 +260,4 @@ static inline str *call_str_init_dup(struct call *c, char *s) { -enum transport_protocol transport_protocol(const str *s); - - #endif diff --git a/daemon/crypto.h b/daemon/crypto.h index 63bd5c85e..4e5a2aef1 100644 --- a/daemon/crypto.h +++ b/daemon/crypto.h @@ -59,7 +59,6 @@ struct crypto_context { u_int32_t roc; u_int64_t s_l; /* XXX replay list */ - u_int64_t num_packets; /* ? */ char session_key[16]; /* k_e */ diff --git a/daemon/main.c b/daemon/main.c index 1a53891be..47abd60e6 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -27,8 +27,8 @@ #define die(x...) do { fprintf(stderr, x); exit(-1); } while(0) #define dlresolve(n) do { \ - n = dlsym(dlh, "mod_" #n); \ - if (!n) \ + n ## _mod = dlsym(dlh, "mod_" #n); \ + if (!n ## _mod) \ die("Failed to resolve symbol from plugin: %s\n", "mod_" #n); \ } while(0) #define check_struct_size(x) do { \ @@ -460,7 +460,7 @@ void create_everything(struct main_context *ctx) { if (!strp || !*strp || strcmp(*strp, "redis/3")) die("Incorrect redis module version: %s\n", *strp); redis_mod_verify(dlh); - mc.redis = redis_new(redis_ip, redis_port, redis_db); + mc.redis = redis_new_mod(redis_ip, redis_port, redis_db); if (!mc.redis) die("Cannot start up without Redis database\n"); } @@ -471,10 +471,8 @@ void create_everything(struct main_context *ctx) { daemonize(); wpidfile(); - if (mc.redis) { - if (redis_restore(ctx->m, mc.redis)) - die("Refusing to continue without working Redis database\n"); - } + if (redis_restore(ctx->m, mc.redis)) + die("Refusing to continue without working Redis database\n"); } static void timer_loop(void *d) { diff --git a/daemon/redis.c b/daemon/redis.c index 0f32e25f1..83349bfcb 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1,7 +1,7 @@ #include "redis.h" -struct redis *(*redis_new)(u_int32_t, u_int16_t, int); -int (*redis_restore)(struct callmaster *, struct redis *); -void (*redis_update)(struct call *, struct redis *); -void (*redis_delete)(struct call *, struct redis *); -void (*redis_wipe)(struct redis *); +struct redis *(*redis_new_mod)(u_int32_t, u_int16_t, int); +int (*redis_restore_mod)(struct callmaster *, struct redis *); +void (*redis_update_mod)(struct call *, struct redis *); +void (*redis_delete_mod)(struct call *, struct redis *); +void (*redis_wipe_mod)(struct redis *); diff --git a/daemon/redis.h b/daemon/redis.h index 17bb81135..e8e7dd27b 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -15,11 +15,30 @@ struct redis; -extern struct redis *(*redis_new)(u_int32_t, u_int16_t, int); -extern int (*redis_restore)(struct callmaster *, struct redis *); -extern void (*redis_update)(struct call *, struct redis *); -extern void (*redis_delete)(struct call *, struct redis *); -extern void (*redis_wipe)(struct redis *); +extern struct redis *(*redis_new_mod)(u_int32_t, u_int16_t, int); +extern int (*redis_restore_mod)(struct callmaster *, struct redis *); +extern void (*redis_update_mod)(struct call *, struct redis *); +extern void (*redis_delete_mod)(struct call *, struct redis *); +extern void (*redis_wipe_mod)(struct redis *); + + + + +static inline void redis_update(struct call *c, struct redis *r) { + if (!redis_update_mod) + return; + redis_update_mod(c, r); +} +static inline void redis_delete(struct call *c, struct redis *r) { + if (!redis_delete_mod) + return; + redis_delete_mod(c, r); +} +static inline int redis_restore(struct callmaster *m, struct redis *r) { + if (!redis_restore_mod) + return 0; + return redis_restore_mod(m, r); +} diff --git a/daemon/rtcp.c b/daemon/rtcp.c index 2a540818c..446c9e442 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -395,11 +395,11 @@ int rtcp_avp2savp(str *s, struct crypto_context *c) { if (check_session_keys(c)) return -1; - if (crypto_encrypt_rtcp(c, rtcp, &payload, c->num_packets)) + if (crypto_encrypt_rtcp(c, rtcp, &payload, c->s_l)) return -1; idx = (void *) s->s + s->len; - *idx = htonl(0x80000000ULL | c->num_packets++); + *idx = htonl(0x80000000ULL | c->s_l++); s->len += sizeof(*idx); to_auth = *s; diff --git a/daemon/str.h b/daemon/str.h index 5ca46cff0..76e8d5cda 100644 --- a/daemon/str.h +++ b/daemon/str.h @@ -41,10 +41,10 @@ static inline int str_cmp_len(const str *a, const char *b, int len); static inline int str_cmp_str(const str *a, const str *b); /* compares two str objects, allows either to be NULL */ static inline int str_cmp_str0(const str *a, const str *b); -/* inits a str object from a regular string */ -static inline void str_init(str *out, char *s); -/* inits a str object from any binary string */ -static inline void str_init_len(str *out, char *s, int len); +/* inits a str object from a regular string. returns out */ +static inline str *str_init(str *out, char *s); +/* inits a str object from any binary string. returns out */ +static inline str *str_init_len(str *out, char *s, int len); /* returns new str object allocated with malloc, including buffer */ static inline str *str_dup(const str *s); /* returns new str object allocated from chunk, including buffer */ @@ -131,13 +131,15 @@ static inline int str_cmp_str0(const str *a, const str *b) { } return str_cmp_str(a, b); } -static inline void str_init(str *out, char *s) { +static inline str *str_init(str *out, char *s) { out->s = s; out->len = s ? strlen(s) : 0; + return out; } -static inline void str_init_len(str *out, char *s, int len) { +static inline str *str_init_len(str *out, char *s, int len) { out->s = s; out->len = len; + return out; } static inline str *str_dup(const str *s) { str *r; diff --git a/tests/simulator-ng.pl b/tests/simulator-ng.pl index 480d32402..eeaa88f5f 100755 --- a/tests/simulator-ng.pl +++ b/tests/simulator-ng.pl @@ -505,6 +505,7 @@ sub do_rtp { $KEEPGOING or undef($c); } $NOENC and $repl = $expect; + !$repl && $KEEPGOING and next; $repl eq $expect or die hexdump($repl, $expect) . " $$trans[$a]{name} > $$trans[$b]{name}, ports $$outputs[$b][$j][0] and $$outputs[$a][$j][0]"; $rtcp or next; @@ -524,6 +525,7 @@ sub do_rtp { $dst = $$pr{sockaddr}($dstport, $addr); $repl = send_receive($sendfd, $expfd, $payload, $dst); $NOENC and $repl = $expect; + !$repl && $KEEPGOING and next; $repl eq $expect or die hexdump($repl, $expect) . " $$trans[$a]{name} > $$trans[$b]{name}"; } }