eliminate the callmaster struct

Change-Id: I9151dbe8f47b3bb9ab67d6fea2fc3783bdc558da
pull/432/merge
Richard Fuchs 8 years ago
parent f2b93f9ef8
commit 75056a8dd1

@ -128,8 +128,8 @@ GHashTable *rtpe_callhash;
static void __monologue_destroy(struct call_monologue *monologue); static void __monologue_destroy(struct call_monologue *monologue);
static int monologue_destroy(struct call_monologue *ml); static int monologue_destroy(struct call_monologue *ml);
static struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start,
struct timeval *interval_start, struct timeval *interval_duration); struct timeval *interval_duration);
/* called with call->master_lock held in R */ /* called with call->master_lock held in R */
static int call_timer_delete_monologues(struct call *c) { static int call_timer_delete_monologues(struct call *c) {
@ -171,7 +171,7 @@ out:
/* called with callmaster->hashlock held */ /* called with hashlock held */
static void call_timer_iterator(gpointer data, gpointer user_data) { static void call_timer_iterator(gpointer data, gpointer user_data) {
struct call *c = data; struct call *c = data;
struct iterator_helper *hlp = user_data; struct iterator_helper *hlp = user_data;
@ -383,19 +383,18 @@ fault:
g_slice_free1(sizeof(*xh), xh); g_slice_free1(sizeof(*xh), xh);
} }
void kill_calls_timer(GSList *list, struct callmaster *m) { void kill_calls_timer(GSList *list, const char *url) {
struct call *ca; struct call *ca;
GList *csl; GList *csl;
struct call_monologue *cm; struct call_monologue *cm;
const char *url, *url_prefix, *url_suffix; const char *url_prefix, *url_suffix;
struct xmlrpc_helper *xh = NULL; struct xmlrpc_helper *xh = NULL;
char url_buf[128]; char url_buf[128];
if (!list) if (!list)
return; return;
/* if m is NULL, it's the scheduled deletions, otherwise it's the timeouts */ /* if url is NULL, it's the scheduled deletions, otherwise it's the timeouts */
url = m ? rtpe_config.b2b_url : NULL;
if (url) { if (url) {
xh = g_slice_alloc(sizeof(*xh)); xh = g_slice_alloc(sizeof(*xh));
xh->c = g_string_chunk_new(64); xh->c = g_string_chunk_new(64);
@ -467,8 +466,7 @@ destroy:
atomic64_add(&ps->stats.x, d); \ atomic64_add(&ps->stats.x, d); \
atomic64_add(&rtpe_statsps.x, d); \ atomic64_add(&rtpe_statsps.x, d); \
} while (0) } while (0)
static void callmaster_timer(void *ptr) { static void call_timer(void *ptr) {
struct callmaster *m = ptr;
struct iterator_helper hlp; struct iterator_helper hlp;
GList *i, *l, *calls = NULL; GList *i, *l, *calls = NULL;
struct rtpengine_list_entry *ke; struct rtpengine_list_entry *ke;
@ -588,7 +586,7 @@ static void callmaster_timer(void *ptr) {
rwlock_unlock_r(&sfd->call->master_lock); rwlock_unlock_r(&sfd->call->master_lock);
if (update) { if (update) {
redis_update_onekey(ps->call, m->conf.redis_write); redis_update_onekey(ps->call, rtpe_redis_write);
} }
next: next:
@ -606,28 +604,20 @@ next:
g_hash_table_destroy(hlp.addr_sfd); g_hash_table_destroy(hlp.addr_sfd);
kill_calls_timer(hlp.del_scheduled, NULL); kill_calls_timer(hlp.del_scheduled, NULL);
kill_calls_timer(hlp.del_timeout, m); kill_calls_timer(hlp.del_timeout, rtpe_config.b2b_url);
} }
#undef DS #undef DS
struct callmaster *callmaster_new() { int call_init() {
struct callmaster *c;
c = obj_alloc0("callmaster", sizeof(*c), NULL);
rtpe_callhash = g_hash_table_new(str_hash, str_equal); rtpe_callhash = g_hash_table_new(str_hash, str_equal);
if (!rtpe_callhash) if (!rtpe_callhash)
goto fail; return -1;
rwlock_init(&rtpe_callhash_lock); rwlock_init(&rtpe_callhash_lock);
poller_add_timer(rtpe_poller, callmaster_timer, &c->obj); poller_add_timer(rtpe_poller, call_timer, NULL);
return c;
fail: return 0;
obj_put(c);
return NULL;
} }
@ -1716,9 +1706,8 @@ out:
return rtp_pt; /* may be NULL */ return rtp_pt; /* may be NULL */
} }
void add_total_calls_duration_in_interval(struct callmaster *cm, void add_total_calls_duration_in_interval(struct timeval *interval_tv) {
struct timeval *interval_tv) { struct timeval ongoing_calls_dur = add_ongoing_calls_dur_in_interval(
struct timeval ongoing_calls_dur = add_ongoing_calls_dur_in_interval(cm,
&rtpe_latest_graphite_interval_start, interval_tv); &rtpe_latest_graphite_interval_start, interval_tv);
mutex_lock(&rtpe_totalstats_interval.total_calls_duration_lock); mutex_lock(&rtpe_totalstats_interval.total_calls_duration_lock);
@ -1728,8 +1717,9 @@ void add_total_calls_duration_in_interval(struct callmaster *cm,
mutex_unlock(&rtpe_totalstats_interval.total_calls_duration_lock); mutex_unlock(&rtpe_totalstats_interval.total_calls_duration_lock);
} }
static struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, static struct timeval add_ongoing_calls_dur_in_interval(struct timeval *interval_start,
struct timeval *interval_start, struct timeval *interval_duration) { struct timeval *interval_duration)
{
GHashTableIter iter; GHashTableIter iter;
gpointer key, value; gpointer key, value;
struct timeval call_duration, res = {0}; struct timeval call_duration, res = {0};
@ -1757,7 +1747,6 @@ static struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m,
/* called lock-free, but must hold a reference to the call */ /* called lock-free, but must hold a reference to the call */
void call_destroy(struct call *c) { void call_destroy(struct call *c) {
struct callmaster *m;
struct packet_stream *ps=0; struct packet_stream *ps=0;
struct stream_fd *sfd; struct stream_fd *sfd;
GList *l; GList *l;
@ -1771,8 +1760,6 @@ void call_destroy(struct call *c) {
return; return;
} }
m = c->callmaster;
rwlock_lock_w(&rtpe_callhash_lock); rwlock_lock_w(&rtpe_callhash_lock);
ret = (g_hash_table_lookup(rtpe_callhash, &c->callid) == c); ret = (g_hash_table_lookup(rtpe_callhash, &c->callid) == c);
if (ret) if (ret)
@ -1789,7 +1776,7 @@ void call_destroy(struct call *c) {
statistics_update_foreignown_dec(c); statistics_update_foreignown_dec(c);
if (IS_OWN_CALL(c)) { if (IS_OWN_CALL(c)) {
redis_delete(c, m->conf.redis_write); redis_delete(c, rtpe_redis_write);
} }
rwlock_lock_w(&c->master_lock); rwlock_lock_w(&c->master_lock);
@ -2001,12 +1988,11 @@ static void __call_free(void *p) {
assert(c->stream_fds.head == NULL); assert(c->stream_fds.head == NULL);
} }
static struct call *call_create(const str *callid, struct callmaster *m) { static struct call *call_create(const str *callid) {
struct call *c; struct call *c;
ilog(LOG_NOTICE, "Creating new call"); ilog(LOG_NOTICE, "Creating new call");
c = obj_alloc0("call", sizeof(*c), __call_free); c = obj_alloc0("call", sizeof(*c), __call_free);
c->callmaster = m;
mutex_init(&c->buffer_lock); mutex_init(&c->buffer_lock);
call_buffer_init(&c->buffer); call_buffer_init(&c->buffer);
rwlock_init(&c->master_lock); rwlock_init(&c->master_lock);
@ -2022,7 +2008,7 @@ static struct call *call_create(const str *callid, struct callmaster *m) {
} }
/* returns call with master_lock held in W */ /* returns call with master_lock held in W */
struct call *call_get_or_create(const str *callid, struct callmaster *m, enum call_type type) { struct call *call_get_or_create(const str *callid, enum call_type type) {
struct call *c; struct call *c;
restart: restart:
@ -2031,7 +2017,7 @@ restart:
if (!c) { if (!c) {
rwlock_unlock_r(&rtpe_callhash_lock); rwlock_unlock_r(&rtpe_callhash_lock);
/* completely new call-id, create call */ /* completely new call-id, create call */
c = call_create(callid, m); c = call_create(callid);
rwlock_lock_w(&rtpe_callhash_lock); rwlock_lock_w(&rtpe_callhash_lock);
if (g_hash_table_lookup(rtpe_callhash, callid)) { if (g_hash_table_lookup(rtpe_callhash, callid)) {
/* preempted */ /* preempted */
@ -2060,7 +2046,7 @@ restart:
} }
/* returns call with master_lock held in W, or NULL if not found */ /* returns call with master_lock held in W, or NULL if not found */
struct call *call_get(const str *callid, struct callmaster *m) { struct call *call_get(const str *callid) {
struct call *ret; struct call *ret;
rwlock_lock_r(&rtpe_callhash_lock); rwlock_lock_r(&rtpe_callhash_lock);
@ -2079,10 +2065,10 @@ struct call *call_get(const str *callid, struct callmaster *m) {
} }
/* returns call with master_lock held in W, or possibly NULL iff opmode == OP_ANSWER */ /* returns call with master_lock held in W, or possibly NULL iff opmode == OP_ANSWER */
struct call *call_get_opmode(const str *callid, struct callmaster *m, enum call_opmode opmode) { struct call *call_get_opmode(const str *callid, enum call_opmode opmode) {
if (opmode == OP_OFFER) if (opmode == OP_OFFER)
return call_get_or_create(callid, m, CT_OWN_CALL); return call_get_or_create(callid, CT_OWN_CALL);
return call_get(callid, m); return call_get(callid);
} }
/* must be called with call->master_lock held in W */ /* must be called with call->master_lock held in W */
@ -2351,7 +2337,7 @@ struct call_monologue *call_get_mono_dialogue(struct call *call, const str *from
} }
int call_delete_branch(struct callmaster *m, const str *callid, const str *branch, int call_delete_branch(const str *callid, const str *branch,
const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay) const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay)
{ {
struct call *c; struct call *c;
@ -2363,7 +2349,7 @@ int call_delete_branch(struct callmaster *m, const str *callid, const str *branc
if (delete_delay < 0) if (delete_delay < 0)
delete_delay = rtpe_config.delete_delay; delete_delay = rtpe_config.delete_delay;
c = call_get(callid, m); c = call_get(callid);
if (!c) { if (!c) {
ilog(LOG_INFO, "Call-ID to delete not found"); ilog(LOG_INFO, "Call-ID to delete not found");
goto err; goto err;
@ -2461,14 +2447,14 @@ out:
} }
static void callmaster_get_all_calls_interator(void *key, void *val, void *ptr) { static void call_get_all_calls_interator(void *key, void *val, void *ptr) {
GQueue *q = ptr; GQueue *q = ptr;
g_queue_push_tail(q, obj_get_o(val)); g_queue_push_tail(q, obj_get_o(val));
} }
void callmaster_get_all_calls(struct callmaster *m, GQueue *q) { void call_get_all_calls(GQueue *q) {
rwlock_lock_r(&rtpe_callhash_lock); rwlock_lock_r(&rtpe_callhash_lock);
g_hash_table_foreach(rtpe_callhash, callmaster_get_all_calls_interator, q); g_hash_table_foreach(rtpe_callhash, call_get_all_calls_interator, q);
rwlock_unlock_r(&rtpe_callhash_lock); rwlock_unlock_r(&rtpe_callhash_lock);
} }

@ -350,8 +350,6 @@ struct call_monologue {
struct call { struct call {
struct obj obj; struct obj obj;
struct callmaster *callmaster; /* RO */
mutex_t buffer_lock; mutex_t buffer_lock;
call_buffer_t buffer; call_buffer_t buffer;
@ -382,20 +380,6 @@ struct call {
struct recording *recording; struct recording *recording;
}; };
struct callmaster_config {
struct redis *redis;
struct redis *redis_write;
struct redis *redis_notify;
struct event_base *redis_notify_event_base;
struct redisAsyncContext *redis_notify_async_context;
};
struct callmaster {
struct obj obj;
struct callmaster_config conf;
};
extern rwlock_t rtpe_callhash_lock; extern rwlock_t rtpe_callhash_lock;
@ -405,25 +389,22 @@ extern struct stats rtpe_statsps; /* per second stats, running timer */
extern struct stats rtpe_stats; /* copied from statsps once a second */ extern struct stats rtpe_stats; /* copied from statsps once a second */
struct callmaster *callmaster_new(void); int call_init(void);
void callmaster_get_all_calls(struct callmaster *m, GQueue *q); void call_get_all_calls(GQueue *q);
//void calls_dump_redis(struct callmaster *);
//void calls_dump_redis_read(struct callmaster *);
//void calls_dump_redis_write(struct callmaster *);
struct call_monologue *__monologue_create(struct call *call); struct call_monologue *__monologue_create(struct call *call);
void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_tag(struct call_monologue *ml, const str *tag);
void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch);
struct packet_stream *__packet_stream_new(struct call *call); struct packet_stream *__packet_stream_new(struct call *call);
struct call *call_get_or_create(const str *callid, struct callmaster *m, enum call_type); struct call *call_get_or_create(const str *callid, enum call_type);
struct call *call_get_opmode(const str *callid, struct callmaster *m, enum call_opmode opmode); struct call *call_get_opmode(const str *callid, enum call_opmode opmode);
struct call_monologue *call_get_mono_dialogue(struct call *call, const str *fromtag, const str *totag, struct call_monologue *call_get_mono_dialogue(struct call *call, const str *fromtag, const str *totag,
const str *viabranch); const str *viabranch);
struct call *call_get(const str *callid, struct callmaster *m); struct call *call_get(const str *callid);
int monologue_offer_answer(struct call_monologue *monologue, GQueue *streams, const struct sdp_ng_flags *flags); int monologue_offer_answer(struct call_monologue *monologue, GQueue *streams, const struct sdp_ng_flags *flags);
int call_delete_branch(struct callmaster *m, const str *callid, const str *branch, int call_delete_branch(const str *callid, const str *branch,
const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay); const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay);
void call_destroy(struct call *); void call_destroy(struct call *);
enum call_stream_state call_stream_state_machine(struct packet_stream *); enum call_stream_state call_stream_state_machine(struct packet_stream *);
@ -434,7 +415,7 @@ int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address
int *len, const struct local_intf *ifa, int keep_unspec); int *len, const struct local_intf *ifa, int keep_unspec);
const struct transport_protocol *transport_protocol(const str *s); const struct transport_protocol *transport_protocol(const str *s);
void add_total_calls_duration_in_interval(struct callmaster *cm, struct timeval *interval_tv); void add_total_calls_duration_in_interval(struct timeval *interval_tv);
void __payload_type_free(void *p); void __payload_type_free(void *p);
void __rtp_stats_update(GHashTable *dst, GHashTable *src); void __rtp_stats_update(GHashTable *dst, GHashTable *src);

@ -142,7 +142,7 @@ fail:
return -1; return -1;
} }
static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_opmode opmode, const char* addr, static str *call_update_lookup_udp(char **out, enum call_opmode opmode, const char* addr,
const endpoint_t *sin) const endpoint_t *sin)
{ {
struct call *c; struct call *c;
@ -159,7 +159,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o
if (opmode == OP_ANSWER) if (opmode == OP_ANSWER)
str_swap(&fromtag, &totag); str_swap(&fromtag, &totag);
c = call_get_opmode(&callid, m, opmode); c = call_get_opmode(&callid, opmode);
if (!c) { if (!c) {
ilog(LOG_WARNING, "["STR_FORMAT"] Got UDP LOOKUP for unknown call-id", ilog(LOG_WARNING, "["STR_FORMAT"] Got UDP LOOKUP for unknown call-id",
STR_FMT(&callid)); STR_FMT(&callid));
@ -195,7 +195,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o
sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP);
rwlock_unlock_w(&c->master_lock); rwlock_unlock_w(&c->master_lock);
redis_update_onekey(c, m->conf.redis_write); redis_update_onekey(c, rtpe_redis_write);
gettimeofday(&(monologue->started), NULL); gettimeofday(&(monologue->started), NULL);
@ -219,11 +219,11 @@ out:
return ret; return ret;
} }
str *call_update_udp(char **out, struct callmaster *m, const char* addr, const endpoint_t *sin) { str *call_update_udp(char **out, const char* addr, const endpoint_t *sin) {
return call_update_lookup_udp(out, m, OP_OFFER, addr, sin); return call_update_lookup_udp(out, OP_OFFER, addr, sin);
} }
str *call_lookup_udp(char **out, struct callmaster *m) { str *call_lookup_udp(char **out) {
return call_update_lookup_udp(out, m, OP_ANSWER, NULL, NULL); return call_update_lookup_udp(out, OP_ANSWER, NULL, NULL);
} }
@ -235,7 +235,7 @@ static int info_parse_func(char **a, void **ret, void *p) {
return -1; return -1;
} }
static void info_parse(const char *s, GHashTable *ih, struct callmaster *m) { static void info_parse(const char *s, GHashTable *ih) {
pcre_multi_match(info_re, info_ree, s, 2, info_parse_func, ih, NULL); pcre_multi_match(info_re, info_ree, s, 2, info_parse_func, ih, NULL);
} }
@ -273,7 +273,7 @@ fail:
} }
static void streams_parse(const char *s, struct callmaster *m, GQueue *q) { static void streams_parse(const char *s, GQueue *q) {
int i; int i;
i = 0; i = 0;
pcre_multi_match(streams_re, streams_ree, s, 3, streams_parse_func, &i, q); pcre_multi_match(streams_re, streams_ree, s, 3, streams_parse_func, &i, q);
@ -298,7 +298,7 @@ static void streams_free(GQueue *q) {
static str *call_request_lookup_tcp(char **out, struct callmaster *m, enum call_opmode opmode) { static str *call_request_lookup_tcp(char **out, enum call_opmode opmode) {
struct call *c; struct call *c;
struct call_monologue *monologue; struct call_monologue *monologue;
GQueue s = G_QUEUE_INIT; GQueue s = G_QUEUE_INIT;
@ -307,14 +307,14 @@ static str *call_request_lookup_tcp(char **out, struct callmaster *m, enum call_
str_init(&callid, out[RE_TCP_RL_CALLID]); str_init(&callid, out[RE_TCP_RL_CALLID]);
infohash = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); infohash = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
c = call_get_opmode(&callid, m, opmode); c = call_get_opmode(&callid, opmode);
if (!c) { if (!c) {
ilog(LOG_WARNING, "["STR_FORMAT"] Got LOOKUP for unknown call-id", STR_FMT(&callid)); ilog(LOG_WARNING, "["STR_FORMAT"] Got LOOKUP for unknown call-id", STR_FMT(&callid));
goto out; goto out;
} }
info_parse(out[RE_TCP_RL_INFO], infohash, m); info_parse(out[RE_TCP_RL_INFO], infohash);
streams_parse(out[RE_TCP_RL_STREAMS], m, &s); streams_parse(out[RE_TCP_RL_STREAMS], &s);
str_init(&fromtag, g_hash_table_lookup(infohash, "fromtag")); str_init(&fromtag, g_hash_table_lookup(infohash, "fromtag"));
if (!fromtag.s) { if (!fromtag.s) {
ilog(LOG_WARNING, "No from-tag in message"); ilog(LOG_WARNING, "No from-tag in message");
@ -343,7 +343,7 @@ out2:
rwlock_unlock_w(&c->master_lock); rwlock_unlock_w(&c->master_lock);
streams_free(&s); streams_free(&s);
redis_update_onekey(c, m->conf.redis_write); redis_update_onekey(c, rtpe_redis_write);
ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret));
obj_put(c); obj_put(c);
@ -353,14 +353,14 @@ out:
return ret; return ret;
} }
str *call_request_tcp(char **out, struct callmaster *m) { str *call_request_tcp(char **out) {
return call_request_lookup_tcp(out, m, OP_OFFER); return call_request_lookup_tcp(out, OP_OFFER);
} }
str *call_lookup_tcp(char **out, struct callmaster *m) { str *call_lookup_tcp(char **out) {
return call_request_lookup_tcp(out, m, OP_ANSWER); return call_request_lookup_tcp(out, OP_ANSWER);
} }
str *call_delete_udp(char **out, struct callmaster *m) { str *call_delete_udp(char **out) {
str callid, branch, fromtag, totag; str callid, branch, fromtag, totag;
__C_DBG("got delete for callid '%s' and viabranch '%s'", __C_DBG("got delete for callid '%s' and viabranch '%s'",
@ -371,12 +371,12 @@ str *call_delete_udp(char **out, struct callmaster *m) {
str_init(&fromtag, out[RE_UDP_DQ_FROMTAG]); str_init(&fromtag, out[RE_UDP_DQ_FROMTAG]);
str_init(&totag, out[RE_UDP_DQ_TOTAG]); str_init(&totag, out[RE_UDP_DQ_TOTAG]);
if (call_delete_branch(m, &callid, &branch, &fromtag, &totag, NULL, -1)) if (call_delete_branch(&callid, &branch, &fromtag, &totag, NULL, -1))
return str_sprintf("%s E8\n", out[RE_UDP_COOKIE]); return str_sprintf("%s E8\n", out[RE_UDP_COOKIE]);
return str_sprintf("%s 0\n", out[RE_UDP_COOKIE]); return str_sprintf("%s 0\n", out[RE_UDP_COOKIE]);
} }
str *call_query_udp(char **out, struct callmaster *m) { str *call_query_udp(char **out) {
struct call *c; struct call *c;
str *ret, callid, fromtag, totag; str *ret, callid, fromtag, totag;
struct call_stats stats; struct call_stats stats;
@ -387,7 +387,7 @@ str *call_query_udp(char **out, struct callmaster *m) {
str_init(&fromtag, out[RE_UDP_DQ_FROMTAG]); str_init(&fromtag, out[RE_UDP_DQ_FROMTAG]);
str_init(&totag, out[RE_UDP_DQ_TOTAG]); str_init(&totag, out[RE_UDP_DQ_TOTAG]);
c = call_get_opmode(&callid, m, OP_OTHER); c = call_get_opmode(&callid, OP_OTHER);
if (!c) { if (!c) {
ilog(LOG_INFO, "["STR_FORMAT"] Call-ID to query not found", STR_FMT(&callid)); ilog(LOG_INFO, "["STR_FORMAT"] Call-ID to query not found", STR_FMT(&callid));
goto err; goto err;
@ -417,11 +417,11 @@ out:
return ret; return ret;
} }
void call_delete_tcp(char **out, struct callmaster *m) { void call_delete_tcp(char **out) {
str callid; str callid;
str_init(&callid, out[RE_TCP_D_CALLID]); str_init(&callid, out[RE_TCP_D_CALLID]);
call_delete_branch(m, &callid, NULL, NULL, NULL, NULL, -1); call_delete_branch(&callid, NULL, NULL, NULL, NULL, -1);
} }
static void call_status_iterator(struct call *c, struct streambuf_stream *s) { static void call_status_iterator(struct call *c, struct streambuf_stream *s) {
@ -430,10 +430,8 @@ static void call_status_iterator(struct call *c, struct streambuf_stream *s) {
// struct peer *p; // struct peer *p;
// struct streamrelay *r1, *r2; // struct streamrelay *r1, *r2;
// struct streamrelay *rx1, *rx2; // struct streamrelay *rx1, *rx2;
// struct callmaster *m;
// char addr1[64], addr2[64], addr3[64]; // char addr1[64], addr2[64], addr3[64];
// m = c->callmaster;
// mutex_lock(&c->master_lock); // mutex_lock(&c->master_lock);
streambuf_printf(s->outbuf, "session "STR_FORMAT" - - - - %lli\n", streambuf_printf(s->outbuf, "session "STR_FORMAT" - - - - %lli\n",
@ -445,11 +443,11 @@ static void call_status_iterator(struct call *c, struct streambuf_stream *s) {
// mutex_unlock(&c->master_lock); // mutex_unlock(&c->master_lock);
} }
void calls_status_tcp(struct callmaster *m, struct streambuf_stream *s) { void calls_status_tcp(struct streambuf_stream *s) {
GQueue q = G_QUEUE_INIT; GQueue q = G_QUEUE_INIT;
struct call *c; struct call *c;
callmaster_get_all_calls(m, &q); call_get_all_calls(&q);
streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n", streambuf_printf(s->outbuf, "proxy %u "UINT64F"/%i/%i\n",
g_queue_get_length(&q), g_queue_get_length(&q),
@ -661,7 +659,7 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu
bencode_dictionary_get_str(input, "metadata", &out->metadata); bencode_dictionary_get_str(input, "metadata", &out->metadata);
} }
static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster *m, static const char *call_offer_answer_ng(bencode_item_t *input,
bencode_item_t *output, enum call_opmode opmode, const char* addr, bencode_item_t *output, enum call_opmode opmode, const char* addr,
const endpoint_t *sin) const endpoint_t *sin)
{ {
@ -702,7 +700,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
goto out; goto out;
/* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */ /* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */
call = call_get(&callid, m); call = call_get(&callid);
/* Failover scenario because of timeout on offer response: siprouter tries /* Failover scenario because of timeout on offer response: siprouter tries
* to establish session with another rtpengine2 even though rtpengine1 * to establish session with another rtpengine2 even though rtpengine1
@ -715,12 +713,12 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
rwlock_unlock_w(&call->master_lock); rwlock_unlock_w(&call->master_lock);
call_destroy(call); call_destroy(call);
obj_put(call); obj_put(call);
call = call_get_or_create(&callid, m, CT_OWN_CALL); call = call_get_or_create(&callid, CT_OWN_CALL);
} }
} }
else { else {
/* call == NULL, should create call */ /* call == NULL, should create call */
call = call_get_or_create(&callid, m, CT_OWN_CALL); call = call_get_or_create(&callid, CT_OWN_CALL);
} }
} }
@ -783,7 +781,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster
rwlock_unlock_w(&call->master_lock); rwlock_unlock_w(&call->master_lock);
if (!flags.no_redis_update) { if (!flags.no_redis_update) {
redis_update_onekey(call,m->conf.redis_write); redis_update_onekey(call, rtpe_redis_write);
} else { } else {
ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag"); ilog(LOG_DEBUG, "Not updating Redis due to present no-redis-update flag");
} }
@ -812,7 +810,7 @@ out:
return errstr; return errstr;
} }
const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output, const char* addr, const char *call_offer_ng(bencode_item_t *input, bencode_item_t *output, const char* addr,
const endpoint_t *sin) const endpoint_t *sin)
{ {
rwlock_lock_r(&rtpe_config.config_lock); rwlock_lock_r(&rtpe_config.config_lock);
@ -834,14 +832,14 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i
} }
rwlock_unlock_r(&rtpe_config.config_lock); rwlock_unlock_r(&rtpe_config.config_lock);
return call_offer_answer_ng(input, m, output, OP_OFFER, addr, sin); return call_offer_answer_ng(input, output, OP_OFFER, addr, sin);
} }
const char *call_answer_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { const char *call_answer_ng(bencode_item_t *input, bencode_item_t *output) {
return call_offer_answer_ng(input, m, output, OP_ANSWER, NULL, NULL); return call_offer_answer_ng(input, output, OP_ANSWER, NULL, NULL);
} }
const char *call_delete_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { const char *call_delete_ng(bencode_item_t *input, bencode_item_t *output) {
str fromtag, totag, viabranch, callid; str fromtag, totag, viabranch, callid;
bencode_item_t *flags, *it; bencode_item_t *flags, *it;
int fatal = 0, delete_delay; int fatal = 0, delete_delay;
@ -871,7 +869,7 @@ const char *call_delete_ng(bencode_item_t *input, struct callmaster *m, bencode_
} }
} }
if (call_delete_branch(m, &callid, &viabranch, &fromtag, &totag, output, delete_delay)) { if (call_delete_branch(&callid, &viabranch, &fromtag, &totag, output, delete_delay)) {
if (fatal) if (fatal)
return "Call-ID not found or tags didn't match"; return "Call-ID not found or tags didn't match";
bencode_dictionary_add_string(output, "warning", "Call-ID not found or tags didn't match"); bencode_dictionary_add_string(output, "warning", "Call-ID not found or tags didn't match");
@ -1145,7 +1143,7 @@ stats:
ng_stats(bencode_dictionary_add_dictionary(dict, "RTCP"), &totals->totals[1], NULL); ng_stats(bencode_dictionary_add_dictionary(dict, "RTCP"), &totals->totals[1], NULL);
} }
static void ng_list_calls( struct callmaster *m, bencode_item_t *output, long long int limit) { static void ng_list_calls(bencode_item_t *output, long long int limit) {
GHashTableIter iter; GHashTableIter iter;
gpointer key, value; gpointer key, value;
@ -1161,13 +1159,13 @@ static void ng_list_calls( struct callmaster *m, bencode_item_t *output, long lo
const char *call_query_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { const char *call_query_ng(bencode_item_t *input, bencode_item_t *output) {
str callid, fromtag, totag; str callid, fromtag, totag;
struct call *call; struct call *call;
if (!bencode_dictionary_get_str(input, "call-id", &callid)) if (!bencode_dictionary_get_str(input, "call-id", &callid))
return "No call-id in message"; return "No call-id in message";
call = call_get_opmode(&callid, m, OP_OTHER); call = call_get_opmode(&callid, OP_OTHER);
if (!call) if (!call)
return "Unknown call-id"; return "Unknown call-id";
bencode_dictionary_get_str(input, "from-tag", &fromtag); bencode_dictionary_get_str(input, "from-tag", &fromtag);
@ -1181,7 +1179,7 @@ const char *call_query_ng(bencode_item_t *input, struct callmaster *m, bencode_i
} }
const char *call_list_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { const char *call_list_ng(bencode_item_t *input, bencode_item_t *output) {
bencode_item_t *calls = NULL; bencode_item_t *calls = NULL;
long long int limit; long long int limit;
@ -1192,13 +1190,13 @@ const char *call_list_ng(bencode_item_t *input, struct callmaster *m, bencode_it
} }
calls = bencode_dictionary_add_list(output, "calls"); calls = bencode_dictionary_add_list(output, "calls");
ng_list_calls(m, calls, limit); ng_list_calls(calls, limit);
return NULL; return NULL;
} }
const char *call_start_recording_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { const char *call_start_recording_ng(bencode_item_t *input, bencode_item_t *output) {
str callid; str callid;
struct call *call; struct call *call;
str metadata; str metadata;
@ -1206,7 +1204,7 @@ const char *call_start_recording_ng(bencode_item_t *input, struct callmaster *m,
if (!bencode_dictionary_get_str(input, "call-id", &callid)) if (!bencode_dictionary_get_str(input, "call-id", &callid))
return "No call-id in message"; return "No call-id in message";
bencode_dictionary_get_str(input, "metadata", &metadata); bencode_dictionary_get_str(input, "metadata", &metadata);
call = call_get_opmode(&callid, m, OP_OTHER); call = call_get_opmode(&callid, OP_OTHER);
if (!call) if (!call)
return "Unknown call-id"; return "Unknown call-id";
@ -1218,13 +1216,13 @@ const char *call_start_recording_ng(bencode_item_t *input, struct callmaster *m,
return NULL; return NULL;
} }
const char *call_stop_recording_ng(bencode_item_t *input, struct callmaster *m, bencode_item_t *output) { const char *call_stop_recording_ng(bencode_item_t *input, bencode_item_t *output) {
str callid; str callid;
struct call *call; struct call *call;
if (!bencode_dictionary_get_str(input, "call-id", &callid)) if (!bencode_dictionary_get_str(input, "call-id", &callid))
return "No call-id in message"; return "No call-id in message";
call = call_get_opmode(&callid, m, OP_OTHER); call = call_get_opmode(&callid, OP_OTHER);
if (!call) if (!call)
return "Unknown call-id"; return "Unknown call-id";

@ -13,7 +13,6 @@
struct call; struct call;
struct call_stats; struct call_stats;
struct callmaster;
struct streambuf_stream; struct streambuf_stream;
struct sockaddr_in6; struct sockaddr_in6;
@ -67,24 +66,24 @@ extern int trust_address_def;
extern int dtls_passive_def; extern int dtls_passive_def;
str *call_request_tcp(char **, struct callmaster *); str *call_request_tcp(char **);
str *call_lookup_tcp(char **, struct callmaster *); str *call_lookup_tcp(char **);
void call_delete_tcp(char **, struct callmaster *); void call_delete_tcp(char **);
void calls_status_tcp(struct callmaster *, struct streambuf_stream *); void calls_status_tcp(struct streambuf_stream *);
str *call_update_udp(char **, struct callmaster *, const char*, const endpoint_t *); str *call_update_udp(char **, const char*, const endpoint_t *);
str *call_lookup_udp(char **, struct callmaster *); str *call_lookup_udp(char **);
str *call_delete_udp(char **, struct callmaster *); str *call_delete_udp(char **);
str *call_query_udp(char **, struct callmaster *); str *call_query_udp(char **);
const char *call_offer_ng(bencode_item_t *, struct callmaster *, bencode_item_t *, const char*, const char *call_offer_ng(bencode_item_t *, bencode_item_t *, const char*,
const endpoint_t *); const endpoint_t *);
const char *call_answer_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); const char *call_answer_ng(bencode_item_t *, bencode_item_t *);
const char *call_delete_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); const char *call_delete_ng(bencode_item_t *, bencode_item_t *);
const char *call_query_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); const char *call_query_ng(bencode_item_t *, bencode_item_t *);
const char *call_list_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); const char *call_list_ng(bencode_item_t *, bencode_item_t *);
const char *call_start_recording_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); const char *call_start_recording_ng(bencode_item_t *, bencode_item_t *);
const char *call_stop_recording_ng(bencode_item_t *, struct callmaster *, bencode_item_t *); const char *call_stop_recording_ng(bencode_item_t *, bencode_item_t *);
void ng_call_stats(struct call *call, const str *fromtag, const str *totag, bencode_item_t *output, void ng_call_stats(struct call *call, const str *fromtag, const str *totag, bencode_item_t *output,
struct call_stats *totals); struct call_stats *totals);

@ -28,33 +28,33 @@
#include "rtpengine_config.h" #include "rtpengine_config.h"
typedef void (*cli_handler_func)(str *, struct callmaster *, struct streambuf *); typedef void (*cli_handler_func)(str *, struct streambuf *);
typedef struct { typedef struct {
const char *cmd; const char *cmd;
cli_handler_func handler; cli_handler_func handler;
} cli_handler_t; } cli_handler_t;
static void cli_incoming_list(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_list(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_set(str *instr, struct streambuf *replybuffer);
static void cli_incoming_terminate(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer);
static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_ksadd(str *instr, struct streambuf *replybuffer);
static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_ksrm(str *instr, struct streambuf *replybuffer);
static void cli_incoming_kslist(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_kslist(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_maxopenfiles(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_set_timeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_silenttimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_set_silenttimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_finaltimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_set_finaltimeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_set_loglevel(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_set_loglevel(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_list_numsessions(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_maxopenfiles(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_list_maxopenfiles(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_totals(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_list_totals(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_sessions(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_list_timeout(str *instr, struct streambuf *replybuffer);
static void cli_incoming_list_loglevel(str *instr, struct callmaster* m, struct streambuf *replybuffer); static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer);
static const cli_handler_t cli_top_handlers[] = { static const cli_handler_t cli_top_handlers[] = {
{ "list", cli_incoming_list }, { "list", cli_incoming_list },
@ -86,7 +86,7 @@ static const cli_handler_t cli_list_handlers[] = {
}; };
static void cli_handler_do(const cli_handler_t *handlers, str *instr, struct callmaster *m, static void cli_handler_do(const cli_handler_t *handlers, str *instr,
struct streambuf *replybuffer) struct streambuf *replybuffer)
{ {
const cli_handler_t *h; const cli_handler_t *h;
@ -94,14 +94,14 @@ static void cli_handler_do(const cli_handler_t *handlers, str *instr, struct cal
for (h = handlers; h->cmd; h++) { for (h = handlers; h->cmd; h++) {
if (str_shift_cmp(instr, h->cmd)) if (str_shift_cmp(instr, h->cmd))
continue; continue;
h->handler(instr, m, replybuffer); h->handler(instr, replybuffer);
return; return;
} }
streambuf_printf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", instr->s); streambuf_printf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", instr->s);
} }
static void destroy_own_foreign_calls(struct callmaster *m, unsigned int foreign_call, unsigned int uint_keyspace_db) { static void destroy_own_foreign_calls(unsigned int foreign_call, unsigned int uint_keyspace_db) {
struct call *c = NULL; struct call *c = NULL;
struct call_monologue *ml = NULL; struct call_monologue *ml = NULL;
GQueue call_list = G_QUEUE_INIT; GQueue call_list = G_QUEUE_INIT;
@ -155,19 +155,19 @@ static void destroy_own_foreign_calls(struct callmaster *m, unsigned int foreign
} }
} }
static void destroy_all_foreign_calls(struct callmaster *m) { static void destroy_all_foreign_calls(void) {
destroy_own_foreign_calls(m, CT_FOREIGN_CALL, UNDEFINED); destroy_own_foreign_calls(CT_FOREIGN_CALL, UNDEFINED);
} }
static void destroy_all_own_calls(struct callmaster *m) { static void destroy_all_own_calls(void) {
destroy_own_foreign_calls(m, CT_OWN_CALL, UNDEFINED); destroy_own_foreign_calls(CT_OWN_CALL, UNDEFINED);
} }
static void destroy_keyspace_foreign_calls(struct callmaster *m, unsigned int uint_keyspace_db) { static void destroy_keyspace_foreign_calls(unsigned int uint_keyspace_db) {
destroy_own_foreign_calls(m, CT_FOREIGN_CALL, uint_keyspace_db); destroy_own_foreign_calls(CT_FOREIGN_CALL, uint_keyspace_db);
} }
static void cli_incoming_list_totals(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list_totals(str *instr, struct streambuf *replybuffer) {
struct timeval avg, calls_dur_iv; struct timeval avg, calls_dur_iv;
u_int64_t num_sessions, min_sess_iv, max_sess_iv; u_int64_t num_sessions, min_sess_iv, max_sess_iv;
struct request_time offer_iv, answer_iv, delete_iv; struct request_time offer_iv, answer_iv, delete_iv;
@ -254,7 +254,7 @@ static void cli_incoming_list_totals(str *instr, struct callmaster* m, struct st
g_list_free(list); g_list_free(list);
} }
static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list_numsessions(str *instr, struct streambuf *replybuffer) {
rwlock_lock_r(&rtpe_callhash_lock); rwlock_lock_r(&rtpe_callhash_lock);
streambuf_printf(replybuffer, "Current sessions own: "UINT64F"\n", g_hash_table_size(rtpe_callhash) - atomic64_get(&rtpe_stats.foreign_sessions)); streambuf_printf(replybuffer, "Current sessions own: "UINT64F"\n", g_hash_table_size(rtpe_callhash) - atomic64_get(&rtpe_stats.foreign_sessions));
streambuf_printf(replybuffer, "Current sessions foreign: "UINT64F"\n", atomic64_get(&rtpe_stats.foreign_sessions)); streambuf_printf(replybuffer, "Current sessions foreign: "UINT64F"\n", atomic64_get(&rtpe_stats.foreign_sessions));
@ -262,14 +262,14 @@ static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, stru
rwlock_unlock_r(&rtpe_callhash_lock); rwlock_unlock_r(&rtpe_callhash_lock);
} }
static void cli_incoming_list_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuffer) {
/* don't lock anything while reading the value */ /* don't lock anything while reading the value */
streambuf_printf(replybuffer, "Maximum sessions configured on rtpengine: %d\n", rtpe_config.max_sessions); streambuf_printf(replybuffer, "Maximum sessions configured on rtpengine: %d\n", rtpe_config.max_sessions);
return ; return ;
} }
static void cli_incoming_list_maxopenfiles(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list_maxopenfiles(str *instr, struct streambuf *replybuffer) {
struct rlimit rlim; struct rlimit rlim;
pid_t pid = getpid(); pid_t pid = getpid();
@ -287,7 +287,7 @@ static void cli_incoming_list_maxopenfiles(str *instr, struct callmaster* m, str
return ; return ;
} }
static void cli_incoming_list_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list_timeout(str *instr, struct streambuf *replybuffer) {
rwlock_lock_r(&rtpe_config.config_lock); rwlock_lock_r(&rtpe_config.config_lock);
/* don't lock anything while reading the value */ /* don't lock anything while reading the value */
@ -300,7 +300,7 @@ static void cli_incoming_list_timeout(str *instr, struct callmaster* m, struct s
return ; return ;
} }
static void cli_incoming_list_callid(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) {
struct call* c=0; struct call* c=0;
struct call_monologue *ml; struct call_monologue *ml;
struct call_media *md; struct call_media *md;
@ -316,7 +316,7 @@ static void cli_incoming_list_callid(str *instr, struct callmaster* m, struct st
return; return;
} }
c = call_get(instr, m); c = call_get(instr);
if (!c) { if (!c) {
streambuf_printf(replybuffer, "\nCall Id not found (%s).\n\n",instr->s); streambuf_printf(replybuffer, "\nCall Id not found (%s).\n\n",instr->s);
@ -400,7 +400,7 @@ static void cli_incoming_list_callid(str *instr, struct callmaster* m, struct st
obj_put(c); obj_put(c);
} }
static void cli_incoming_list_sessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer) {
GHashTableIter iter; GHashTableIter iter;
gpointer key, value; gpointer key, value;
str *ptrkey; str *ptrkey;
@ -466,13 +466,13 @@ static void cli_incoming_list_sessions(str *instr, struct callmaster* m, struct
} }
} else { } else {
// list session for callid // list session for callid
cli_incoming_list_callid(instr, m, replybuffer); cli_incoming_list_callid(instr, replybuffer);
} }
return; return;
} }
static void cli_incoming_set_maxopenfiles(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuffer) {
unsigned long open_files_num; unsigned long open_files_num;
pid_t pid; pid_t pid;
char *endptr; char *endptr;
@ -505,7 +505,7 @@ static void cli_incoming_set_maxopenfiles(str *instr, struct callmaster* m, stru
} }
} }
static void cli_incoming_set_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuffer) {
long maxsessions_num; long maxsessions_num;
int disabled = -1; int disabled = -1;
char *endptr; char *endptr;
@ -540,7 +540,7 @@ static void cli_incoming_set_maxsessions(str *instr, struct callmaster* m, struc
return; return;
} }
static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer, int *conf_timeout) { static void cli_incoming_set_gentimeout(str *instr, struct streambuf *replybuffer, int *conf_timeout) {
long timeout_num; long timeout_num;
char *endptr; char *endptr;
@ -565,35 +565,35 @@ static void cli_incoming_set_gentimeout(str *instr, struct callmaster* m, struct
} }
} }
static void cli_incoming_set_timeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_set_timeout(str *instr, struct streambuf *replybuffer) {
cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.timeout); cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.timeout);
} }
static void cli_incoming_set_silenttimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_set_silenttimeout(str *instr, struct streambuf *replybuffer) {
cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.silent_timeout); cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.silent_timeout);
} }
static void cli_incoming_set_finaltimeout(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_set_finaltimeout(str *instr, struct streambuf *replybuffer) {
cli_incoming_set_gentimeout(instr, m, replybuffer, &rtpe_config.final_timeout); cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.final_timeout);
} }
static void cli_incoming_list(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list(str *instr, struct streambuf *replybuffer) {
if (str_shift(instr, 1)) { if (str_shift(instr, 1)) {
streambuf_printf(replybuffer, "%s\n", "More parameters required."); streambuf_printf(replybuffer, "%s\n", "More parameters required.");
return; return;
} }
cli_handler_do(cli_list_handlers, instr, m, replybuffer); cli_handler_do(cli_list_handlers, instr, replybuffer);
} }
static void cli_incoming_set(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_set(str *instr, struct streambuf *replybuffer) {
if (str_shift(instr, 1)) { if (str_shift(instr, 1)) {
streambuf_printf(replybuffer, "%s\n", "More parameters required."); streambuf_printf(replybuffer, "%s\n", "More parameters required.");
return; return;
} }
cli_handler_do(cli_set_handlers, instr, m, replybuffer); cli_handler_do(cli_set_handlers, instr, replybuffer);
} }
static void cli_incoming_terminate(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) {
struct call* c=0; struct call* c=0;
struct call_monologue *ml; struct call_monologue *ml;
GList *i; GList *i;
@ -606,10 +606,10 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre
// --- terminate all calls // --- terminate all calls
if (!str_memcmp(instr,"all")) { if (!str_memcmp(instr,"all")) {
// destroy own calls // destroy own calls
destroy_all_own_calls(m); destroy_all_own_calls();
// destroy foreign calls // destroy foreign calls
destroy_all_foreign_calls(m); destroy_all_foreign_calls();
// update cli // update cli
ilog(LOG_INFO,"All calls terminated by operator."); ilog(LOG_INFO,"All calls terminated by operator.");
@ -620,7 +620,7 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre
// --- terminate own calls // --- terminate own calls
} else if (!str_memcmp(instr,"own")) { } else if (!str_memcmp(instr,"own")) {
// destroy own calls // destroy own calls
destroy_all_own_calls(m); destroy_all_own_calls();
// update cli // update cli
ilog(LOG_INFO,"All own calls terminated by operator."); ilog(LOG_INFO,"All own calls terminated by operator.");
@ -631,7 +631,7 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre
// --- terminate foreign calls // --- terminate foreign calls
} else if (!str_memcmp(instr,"foreign")) { } else if (!str_memcmp(instr,"foreign")) {
// destroy foreign calls // destroy foreign calls
destroy_all_foreign_calls(m); destroy_all_foreign_calls();
// update cli // update cli
ilog(LOG_INFO,"All foreign calls terminated by operator."); ilog(LOG_INFO,"All foreign calls terminated by operator.");
@ -641,7 +641,7 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre
} }
// --- terminate a dedicated call id // --- terminate a dedicated call id
c = call_get(instr, m); c = call_get(instr);
if (!c) { if (!c) {
streambuf_printf(replybuffer, "\nCall Id not found (%s).\n\n",instr->s); streambuf_printf(replybuffer, "\nCall Id not found (%s).\n\n",instr->s);
@ -665,7 +665,7 @@ static void cli_incoming_terminate(str *instr, struct callmaster* m, struct stre
obj_put(c); obj_put(c);
} }
static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_ksadd(str *instr, struct streambuf *replybuffer) {
unsigned long uint_keyspace_db; unsigned long uint_keyspace_db;
char *endptr; char *endptr;
@ -684,7 +684,7 @@ static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambu
rwlock_lock_w(&rtpe_config.config_lock); rwlock_lock_w(&rtpe_config.config_lock);
if (!g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) { if (!g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) {
g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db));
redis_notify_subscribe_action(m, SUBSCRIBE_KEYSPACE, uint_keyspace_db); redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, uint_keyspace_db);
streambuf_printf(replybuffer, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db); streambuf_printf(replybuffer, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db);
} else { } else {
streambuf_printf(replybuffer, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db); streambuf_printf(replybuffer, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db);
@ -693,7 +693,7 @@ static void cli_incoming_ksadd(str *instr, struct callmaster* m, struct streambu
} }
} }
static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_ksrm(str *instr, struct streambuf *replybuffer) {
GList *l; GList *l;
unsigned long uint_keyspace_db; unsigned long uint_keyspace_db;
char *endptr; char *endptr;
@ -712,12 +712,12 @@ static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf
streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s); streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s);
} else if ((l = g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) { } else if ((l = g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) {
// remove this keyspace // remove this keyspace
redis_notify_subscribe_action(m, UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); redis_notify_subscribe_action(UNSUBSCRIBE_KEYSPACE, uint_keyspace_db);
g_queue_remove(&rtpe_config.redis_subscribed_keyspaces, l->data); g_queue_remove(&rtpe_config.redis_subscribed_keyspaces, l->data);
streambuf_printf(replybuffer, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db); streambuf_printf(replybuffer, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db);
// destroy foreign calls for this keyspace // destroy foreign calls for this keyspace
destroy_keyspace_foreign_calls(m, uint_keyspace_db); destroy_keyspace_foreign_calls(uint_keyspace_db);
// update cli // update cli
streambuf_printf(replybuffer, "Successfully removed all foreign calls for keyspace %lu.\n", uint_keyspace_db); streambuf_printf(replybuffer, "Successfully removed all foreign calls for keyspace %lu.\n", uint_keyspace_db);
@ -728,7 +728,7 @@ static void cli_incoming_ksrm(str *instr, struct callmaster* m, struct streambuf
} }
static void cli_incoming_kslist(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_kslist(str *instr, struct streambuf *replybuffer) {
GList *l; GList *l;
streambuf_printf(replybuffer, "\nSubscribed-on keyspaces:\n"); streambuf_printf(replybuffer, "\nSubscribed-on keyspaces:\n");
@ -747,7 +747,6 @@ static void cli_incoming(struct streambuf_stream *s) {
} }
static void cli_stream_readable(struct streambuf_stream *s) { static void cli_stream_readable(struct streambuf_stream *s) {
struct cli *cli = (void *) s->parent;
static const int MAXINPUT = 1024; static const int MAXINPUT = 1024;
char *inbuf; char *inbuf;
str instr; str instr;
@ -764,17 +763,17 @@ static void cli_stream_readable(struct streambuf_stream *s) {
ilog(LOG_INFO, "Got CLI command:%s",inbuf); ilog(LOG_INFO, "Got CLI command:%s",inbuf);
str_init(&instr, inbuf); str_init(&instr, inbuf);
cli_handler_do(cli_top_handlers, &instr, cli->callmaster, s->outbuf); cli_handler_do(cli_top_handlers, &instr, s->outbuf);
free(inbuf); free(inbuf);
streambuf_stream_shutdown(s); streambuf_stream_shutdown(s);
log_info_clear(); log_info_clear();
} }
struct cli *cli_new(struct poller *p, endpoint_t *ep, struct callmaster *m) { struct cli *cli_new(struct poller *p, endpoint_t *ep) {
struct cli *c; struct cli *c;
if (!p || !m) if (!p)
return NULL; return NULL;
c = obj_alloc0("cli", sizeof(*c), NULL); c = obj_alloc0("cli", sizeof(*c), NULL);
@ -801,7 +800,6 @@ struct cli *cli_new(struct poller *p, endpoint_t *ep, struct callmaster *m) {
} }
c->poller = p; c->poller = p;
c->callmaster = m;
obj_put(c); obj_put(c);
return c; return c;
@ -812,10 +810,10 @@ fail:
return NULL; return NULL;
} }
static void cli_incoming_list_loglevel(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer) {
streambuf_printf(replybuffer, "%i\n", g_atomic_int_get(&log_level)); streambuf_printf(replybuffer, "%i\n", g_atomic_int_get(&log_level));
} }
static void cli_incoming_set_loglevel(str *instr, struct callmaster* m, struct streambuf *replybuffer) { static void cli_incoming_set_loglevel(str *instr, struct streambuf *replybuffer) {
int nl; int nl;
if (str_shift(instr, 1)) { if (str_shift(instr, 1)) {

@ -8,12 +8,11 @@
struct cli { struct cli {
struct obj obj; struct obj obj;
struct callmaster *callmaster;
struct poller *poller; struct poller *poller;
struct streambuf_listener listeners[2]; struct streambuf_listener listeners[2];
}; };
struct cli *cli_new(struct poller *p, endpoint_t *, struct callmaster *m); struct cli *cli_new(struct poller *p, endpoint_t *);
#endif /* CLI_UDP_H_ */ #endif /* CLI_UDP_H_ */

@ -183,7 +183,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
// start offer timer // start offer timer
gettimeofday(&offer_start, NULL); gettimeofday(&offer_start, NULL);
errstr = call_offer_ng(dict, c->callmaster, resp, addr, sin); errstr = call_offer_ng(dict, resp, addr, sin);
g_atomic_int_inc(&cur->offer); g_atomic_int_inc(&cur->offer);
// stop offer timer // stop offer timer
@ -197,7 +197,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
// start answer timer // start answer timer
gettimeofday(&answer_start, NULL); gettimeofday(&answer_start, NULL);
errstr = call_answer_ng(dict, c->callmaster, resp); errstr = call_answer_ng(dict, resp);
g_atomic_int_inc(&cur->answer); g_atomic_int_inc(&cur->answer);
// stop answer timer // stop answer timer
@ -211,7 +211,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
// start delete timer // start delete timer
gettimeofday(&delete_start, NULL); gettimeofday(&delete_start, NULL);
errstr = call_delete_ng(dict, c->callmaster, resp); errstr = call_delete_ng(dict, resp);
g_atomic_int_inc(&cur->delete); g_atomic_int_inc(&cur->delete);
// stop delete timer // stop delete timer
@ -222,19 +222,19 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
ilog(LOG_INFO, "delete time = %llu.%06llu sec", (unsigned long long)delete_stop.tv_sec, (unsigned long long)delete_stop.tv_usec); ilog(LOG_INFO, "delete time = %llu.%06llu sec", (unsigned long long)delete_stop.tv_sec, (unsigned long long)delete_stop.tv_usec);
} }
else if (!str_cmp(&cmd, "query")) { else if (!str_cmp(&cmd, "query")) {
errstr = call_query_ng(dict, c->callmaster, resp); errstr = call_query_ng(dict, resp);
g_atomic_int_inc(&cur->query); g_atomic_int_inc(&cur->query);
} }
else if (!str_cmp(&cmd, "list")) { else if (!str_cmp(&cmd, "list")) {
errstr = call_list_ng(dict, c->callmaster, resp); errstr = call_list_ng(dict, resp);
g_atomic_int_inc(&cur->list); g_atomic_int_inc(&cur->list);
} }
else if (!str_cmp(&cmd, "start recording")) { else if (!str_cmp(&cmd, "start recording")) {
errstr = call_start_recording_ng(dict, c->callmaster, resp); errstr = call_start_recording_ng(dict, resp);
g_atomic_int_inc(&cur->start_recording); g_atomic_int_inc(&cur->start_recording);
} }
else if (!str_cmp(&cmd, "stop recording")) { else if (!str_cmp(&cmd, "stop recording")) {
errstr = call_stop_recording_ng(dict, c->callmaster, resp); errstr = call_stop_recording_ng(dict, resp);
g_atomic_int_inc(&cur->stop_recording); g_atomic_int_inc(&cur->stop_recording);
} }
else else
@ -309,15 +309,14 @@ out:
struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, struct callmaster *m, unsigned char tos) { struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) {
struct control_ng *c; struct control_ng *c;
if (!p || !m) if (!p)
return NULL; return NULL;
c = obj_alloc0("control_ng", sizeof(*c), NULL); c = obj_alloc0("control_ng", sizeof(*c), NULL);
c->callmaster = m;
cookie_cache_init(&c->cookie_cache); cookie_cache_init(&c->cookie_cache);
if (udp_listener_init(&c->udp_listeners[0], p, ep, control_ng_incoming, &c->obj)) if (udp_listener_init(&c->udp_listeners[0], p, ep, control_ng_incoming, &c->obj))

@ -8,7 +8,6 @@
struct poller; struct poller;
struct callmaster;
struct control_ng_stats { struct control_ng_stats {
sockaddr_t proxy; sockaddr_t proxy;
@ -25,12 +24,11 @@ struct control_ng_stats {
struct control_ng { struct control_ng {
struct obj obj; struct obj obj;
struct callmaster *callmaster;
struct cookie_cache cookie_cache; struct cookie_cache cookie_cache;
socket_t udp_listeners[2]; socket_t udp_listeners[2];
}; };
struct control_ng *control_ng_new(struct poller *, endpoint_t *, struct callmaster *, unsigned char); struct control_ng *control_ng_new(struct poller *, endpoint_t *, unsigned char);
void control_ng_init(void); void control_ng_init(void);
extern mutex_t rtpe_cngs_lock; extern mutex_t rtpe_cngs_lock;

@ -32,7 +32,6 @@ struct control_tcp {
pcre_extra *parse_ree; pcre_extra *parse_ree;
struct poller *poller; struct poller *poller;
struct callmaster *callmaster;
}; };
@ -91,13 +90,13 @@ static int control_stream_parse(struct streambuf_stream *s, char *line) {
if (!strcmp(out[RE_TCP_RL_CMD], "request")) if (!strcmp(out[RE_TCP_RL_CMD], "request"))
output = call_request_tcp(out, c->callmaster); output = call_request_tcp(out);
else if (!strcmp(out[RE_TCP_RL_CMD], "lookup")) else if (!strcmp(out[RE_TCP_RL_CMD], "lookup"))
output = call_lookup_tcp(out, c->callmaster); output = call_lookup_tcp(out);
else if (!strcmp(out[RE_TCP_D_CMD], "delete")) else if (!strcmp(out[RE_TCP_D_CMD], "delete"))
call_delete_tcp(out, c->callmaster); call_delete_tcp(out);
else if (!strcmp(out[RE_TCP_DIV_CMD], "status")) else if (!strcmp(out[RE_TCP_DIV_CMD], "status"))
calls_status_tcp(c->callmaster, s); calls_status_tcp(s);
else if (!strcmp(out[RE_TCP_DIV_CMD], "build") || !strcmp(out[RE_TCP_DIV_CMD], "version")) else if (!strcmp(out[RE_TCP_DIV_CMD], "build") || !strcmp(out[RE_TCP_DIV_CMD], "version"))
streambuf_printf(s->outbuf, "Version: %s\n", RTPENGINE_VERSION); streambuf_printf(s->outbuf, "Version: %s\n", RTPENGINE_VERSION);
else if (!strcmp(out[RE_TCP_DIV_CMD], "controls")) else if (!strcmp(out[RE_TCP_DIV_CMD], "controls"))
@ -155,15 +154,13 @@ static void control_incoming(struct streambuf_stream *s) {
} }
struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep, struct callmaster *m) { struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep) {
struct control_tcp *c; struct control_tcp *c;
const char *errptr; const char *errptr;
int erroff; int erroff;
if (!p) if (!p)
return NULL; return NULL;
if (!m)
return NULL;
c = obj_alloc0("control", sizeof(*c), NULL); c = obj_alloc0("control", sizeof(*c), NULL);
@ -195,7 +192,6 @@ struct control_tcp *control_tcp_new(struct poller *p, endpoint_t *ep, struct cal
c->parse_ree = pcre_study(c->parse_re, 0, &errptr); c->parse_ree = pcre_study(c->parse_re, 0, &errptr);
c->poller = p; c->poller = p;
c->callmaster = m;
obj_put(c); obj_put(c);
return c; return c;

@ -30,13 +30,12 @@
#define RE_TCP_DIV_CMD 14 #define RE_TCP_DIV_CMD 14
struct poller; struct poller;
struct callmaster;
struct control_tcp; struct control_tcp;
struct streambuf_stream; struct streambuf_stream;
struct control_tcp *control_tcp_new(struct poller *, endpoint_t *, struct callmaster *); struct control_tcp *control_tcp_new(struct poller *, endpoint_t *);

@ -86,13 +86,13 @@ static void control_udp_incoming(struct obj *obj, str *buf, const endpoint_t *si
log_info_c_string(out[RE_UDP_DQ_CALLID]); log_info_c_string(out[RE_UDP_DQ_CALLID]);
if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'U') if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'U')
reply = call_update_udp(out, u->callmaster, addr, sin); reply = call_update_udp(out, addr, sin);
else if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'L') else if (chrtoupper(out[RE_UDP_UL_CMD][0]) == 'L')
reply = call_lookup_udp(out, u->callmaster); reply = call_lookup_udp(out);
else if (chrtoupper(out[RE_UDP_DQ_CMD][0]) == 'D') else if (chrtoupper(out[RE_UDP_DQ_CMD][0]) == 'D')
reply = call_delete_udp(out, u->callmaster); reply = call_delete_udp(out);
else if (chrtoupper(out[RE_UDP_DQ_CMD][0]) == 'Q') else if (chrtoupper(out[RE_UDP_DQ_CMD][0]) == 'Q')
reply = call_query_udp(out, u->callmaster); reply = call_query_udp(out);
else if (chrtoupper(out[RE_UDP_V_CMD][0]) == 'V') { else if (chrtoupper(out[RE_UDP_V_CMD][0]) == 'V') {
iovlen = 2; iovlen = 2;
@ -134,17 +134,16 @@ out:
log_info_clear(); log_info_clear();
} }
struct control_udp *control_udp_new(struct poller *p, endpoint_t *ep, struct callmaster *m) { struct control_udp *control_udp_new(struct poller *p, endpoint_t *ep) {
struct control_udp *c; struct control_udp *c;
const char *errptr; const char *errptr;
int erroff; int erroff;
if (!p || !m) if (!p)
return NULL; return NULL;
c = obj_alloc0("control_udp", sizeof(*c), NULL); c = obj_alloc0("control_udp", sizeof(*c), NULL);
c->callmaster = m;
c->parse_re = pcre_compile( c->parse_re = pcre_compile(
/* cookie cmd flags callid viabranch:5 */ /* cookie cmd flags callid viabranch:5 */
"^(\\S+)\\s+(?:([ul])(\\S*)\\s+([^;]+)(?:;(\\S+))?\\s+" \ "^(\\S+)\\s+(?:([ul])(\\S*)\\s+([^;]+)(?:;(\\S+))?\\s+" \

@ -39,7 +39,6 @@
#define RE_UDP_V_PARMS 20 #define RE_UDP_V_PARMS 20
struct poller; struct poller;
struct callmaster;
@ -48,7 +47,6 @@ struct callmaster;
struct control_udp { struct control_udp {
struct obj obj; struct obj obj;
struct callmaster *callmaster;
struct cookie_cache cookie_cache; struct cookie_cache cookie_cache;
socket_t udp_listeners[2]; socket_t udp_listeners[2];
@ -61,7 +59,7 @@ struct control_udp {
struct control_udp *control_udp_new(struct poller *, endpoint_t *, struct callmaster *); struct control_udp *control_udp_new(struct poller *, endpoint_t *);

@ -85,16 +85,10 @@ int connect_to_graphite_server(const endpoint_t *graphite_ep) {
return 0; return 0;
} }
int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data) { int send_graphite_data(struct totalstats *sent_data) {
int rc=0; int rc=0;
// sanity checks
if (!cm) {
ilog(LOG_ERROR, "NULL callmaster when trying to send data");
return -1;
}
if (graphite_sock.fd < 0) { if (graphite_sock.fd < 0) {
ilog(LOG_ERROR,"Graphite socket is not connected."); ilog(LOG_ERROR,"Graphite socket is not connected.");
return -1; return -1;
@ -248,17 +242,11 @@ static inline void copy_with_lock(struct totalstats *ts_dst, struct totalstats *
mutex_unlock(ts_lock); mutex_unlock(ts_lock);
} }
void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int seconds) { void graphite_loop_run(endpoint_t *graphite_ep, int seconds) {
int rc=0; int rc=0;
struct pollfd wfds[1]; struct pollfd wfds[1];
// sanity checks
if (!cm) {
ilog(LOG_ERROR, "NULL callmaster");
return ;
}
if (!graphite_ep) { if (!graphite_ep) {
ilog(LOG_ERROR, "NULL graphite_ep"); ilog(LOG_ERROR, "NULL graphite_ep");
return ; return ;
@ -311,9 +299,9 @@ void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int secon
} }
if (graphite_sock.fd >= 0 && connection_state == STATE_CONNECTED) { if (graphite_sock.fd >= 0 && connection_state == STATE_CONNECTED) {
add_total_calls_duration_in_interval(cm, &graphite_interval_tv); add_total_calls_duration_in_interval(&graphite_interval_tv);
rc = send_graphite_data(cm, &graphite_stats); rc = send_graphite_data(&graphite_stats);
gettimeofday(&rtpe_latest_graphite_interval_start, NULL); gettimeofday(&rtpe_latest_graphite_interval_start, NULL);
if (rc < 0) { if (rc < 0) {
ilog(LOG_ERROR,"Sending graphite data failed."); ilog(LOG_ERROR,"Sending graphite data failed.");
@ -327,14 +315,6 @@ void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int secon
} }
void graphite_loop(void *d) { void graphite_loop(void *d) {
struct callmaster *cm = d;
// sanity checks
if (!cm) {
ilog(LOG_ERROR, "NULL callmaster");
return ;
}
if (rtpe_config.graphite_interval <= 0) { if (rtpe_config.graphite_interval <= 0) {
ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second."); ilog(LOG_WARNING,"Graphite send interval was not set. Setting it to 1 second.");
rtpe_config.graphite_interval=1; rtpe_config.graphite_interval=1;
@ -343,5 +323,5 @@ void graphite_loop(void *d) {
connect_to_graphite_server(&rtpe_config.graphite_ep); connect_to_graphite_server(&rtpe_config.graphite_ep);
while (!rtpe_shutdown) while (!rtpe_shutdown)
graphite_loop_run(cm, &rtpe_config.graphite_ep, rtpe_config.graphite_interval); // time in seconds graphite_loop_run(&rtpe_config.graphite_ep, rtpe_config.graphite_interval); // time in seconds
} }

@ -19,8 +19,8 @@ enum connection_state {
extern struct timeval rtpe_latest_graphite_interval_start; extern struct timeval rtpe_latest_graphite_interval_start;
int connect_to_graphite_server(const endpoint_t *ep); int connect_to_graphite_server(const endpoint_t *ep);
int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data); int send_graphite_data(struct totalstats *sent_data);
void graphite_loop_run(struct callmaster *cm, endpoint_t *graphite_ep, int seconds); void graphite_loop_run(endpoint_t *graphite_ep, int seconds);
void set_prefix(char* prefix); void set_prefix(char* prefix);
void graphite_loop(void *d); void graphite_loop(void *d);
void set_latest_graphite_interval_start(struct timeval *tv); void set_latest_graphite_interval_start(struct timeval *tv);

@ -41,11 +41,6 @@
struct main_context {
struct callmaster *m;
};
struct poller *rtpe_poller; struct poller *rtpe_poller;
@ -497,10 +492,12 @@ static void init_everything() {
if (call_interfaces_init()) if (call_interfaces_init())
abort(); abort();
statistics_init(); statistics_init();
if (call_init())
abort();
} }
static void create_everything(struct main_context *ctx) { static void create_everything(void) {
struct control_tcp *ct; struct control_tcp *ct;
struct control_udp *cu; struct control_udp *cu;
struct control_ng *cn; struct control_ng *cn;
@ -524,10 +521,6 @@ no_kernel:
if (!rtpe_poller) if (!rtpe_poller)
die("poller creation failed"); die("poller creation failed");
ctx->m = callmaster_new();
if (!ctx->m)
die("callmaster creation failed");
dtls_timer(rtpe_poller); dtls_timer(rtpe_poller);
rwlock_init(&rtpe_config.config_lock); rwlock_init(&rtpe_config.config_lock);
@ -546,7 +539,7 @@ no_kernel:
ct = NULL; ct = NULL;
if (tcp_listen_ep.port) { if (tcp_listen_ep.port) {
ct = control_tcp_new(rtpe_poller, &tcp_listen_ep, ctx->m); ct = control_tcp_new(rtpe_poller, &tcp_listen_ep);
if (!ct) if (!ct)
die("Failed to open TCP control connection port"); die("Failed to open TCP control connection port");
} }
@ -554,7 +547,7 @@ no_kernel:
cu = NULL; cu = NULL;
if (udp_listen_ep.port) { if (udp_listen_ep.port) {
interfaces_exclude_port(udp_listen_ep.port); interfaces_exclude_port(udp_listen_ep.port);
cu = control_udp_new(rtpe_poller, &udp_listen_ep, ctx->m); cu = control_udp_new(rtpe_poller, &udp_listen_ep);
if (!cu) if (!cu)
die("Failed to open UDP control connection port"); die("Failed to open UDP control connection port");
} }
@ -562,7 +555,7 @@ no_kernel:
cn = NULL; cn = NULL;
if (ng_listen_ep.port) { if (ng_listen_ep.port) {
interfaces_exclude_port(ng_listen_ep.port); interfaces_exclude_port(ng_listen_ep.port);
cn = control_ng_new(rtpe_poller, &ng_listen_ep, ctx->m, rtpe_config.control_tos); cn = control_ng_new(rtpe_poller, &ng_listen_ep, rtpe_config.control_tos);
if (!cn) if (!cn)
die("Failed to open UDP control connection port"); die("Failed to open UDP control connection port");
} }
@ -570,27 +563,27 @@ no_kernel:
cl = NULL; cl = NULL;
if (cli_listen_ep.port) { if (cli_listen_ep.port) {
interfaces_exclude_port(cli_listen_ep.port); interfaces_exclude_port(cli_listen_ep.port);
cl = cli_new(rtpe_poller, &cli_listen_ep, ctx->m); cl = cli_new(rtpe_poller, &cli_listen_ep);
if (!cl) if (!cl)
die("Failed to open UDP CLI connection port"); die("Failed to open UDP CLI connection port");
} }
if (!is_addr_unspecified(&redis_write_ep.address)) { if (!is_addr_unspecified(&redis_write_ep.address)) {
ctx->m->conf.redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required); rtpe_redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required);
if (!ctx->m->conf.redis_write) if (!rtpe_redis_write)
die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.", die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&redis_write_ep)); endpoint_print_buf(&redis_write_ep));
} }
if (!is_addr_unspecified(&redis_ep.address)) { if (!is_addr_unspecified(&redis_ep.address)) {
ctx->m->conf.redis = redis_new(&redis_ep, redis_db, redis_auth, ctx->m->conf.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); rtpe_redis = redis_new(&redis_ep, redis_db, redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
ctx->m->conf.redis_notify = redis_new(&redis_ep, redis_db, redis_auth, ctx->m->conf.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); rtpe_redis_notify = redis_new(&redis_ep, redis_db, redis_auth, rtpe_redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required);
if (!ctx->m->conf.redis || !ctx->m->conf.redis_notify) if (!rtpe_redis || !rtpe_redis_notify)
die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED parameter.", die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED parameter.",
endpoint_print_buf(&redis_ep)); endpoint_print_buf(&redis_ep));
if (!ctx->m->conf.redis_write) if (!rtpe_redis_write)
ctx->m->conf.redis_write = ctx->m->conf.redis; rtpe_redis_write = rtpe_redis;
} }
daemonize(); daemonize();
@ -600,12 +593,12 @@ no_kernel:
rtcp_init(); // must come after Homer init rtcp_init(); // must come after Homer init
if (ctx->m->conf.redis) { if (rtpe_redis) {
// start redis restore timer // start redis restore timer
gettimeofday(&redis_start, NULL); gettimeofday(&redis_start, NULL);
// restore // restore
if (redis_restore(ctx->m, ctx->m->conf.redis)) if (redis_restore(rtpe_redis))
die("Refusing to continue without working Redis database"); die("Refusing to continue without working Redis database");
// stop redis restore timer // stop redis restore timer
@ -624,13 +617,12 @@ no_kernel:
int main(int argc, char **argv) { int main(int argc, char **argv) {
struct main_context ctx;
int idx=0; int idx=0;
early_init(); early_init();
options(&argc, &argv); options(&argc, &argv);
init_everything(); init_everything();
create_everything(&ctx); create_everything();
ilog(LOG_INFO, "Startup complete, version %s", RTPENGINE_VERSION); ilog(LOG_INFO, "Startup complete, version %s", RTPENGINE_VERSION);
@ -638,10 +630,10 @@ int main(int argc, char **argv) {
thread_create_detach(poller_timer_loop, rtpe_poller); thread_create_detach(poller_timer_loop, rtpe_poller);
if (!is_addr_unspecified(&redis_ep.address)) if (!is_addr_unspecified(&redis_ep.address))
thread_create_detach(redis_notify_loop, ctx.m); thread_create_detach(redis_notify_loop, NULL);
if (!is_addr_unspecified(&rtpe_config.graphite_ep.address)) if (!is_addr_unspecified(&rtpe_config.graphite_ep.address))
thread_create_detach(graphite_loop, ctx.m); thread_create_detach(graphite_loop, NULL);
thread_create_detach(ice_thread_run, NULL); thread_create_detach(ice_thread_run, NULL);
@ -663,7 +655,7 @@ int main(int argc, char **argv) {
} }
if (!is_addr_unspecified(&redis_ep.address)) if (!is_addr_unspecified(&redis_ep.address))
redis_notify_event_base_action(ctx.m, EVENT_BASE_LOOPBREAK); redis_notify_event_base_action(EVENT_BASE_LOOPBREAK);
threads_join_all(1); threads_join_all(1);

@ -1596,7 +1596,7 @@ out:
ca = sfd->call ? : NULL; ca = sfd->call ? : NULL;
if (ca && update) { if (ca && update) {
redis_update_onekey(ca, ca->callmaster->conf.redis_write); redis_update_onekey(ca, rtpe_redis_write);
} }
done: done:
log_info_clear(); log_info_clear();

@ -32,6 +32,14 @@
#include "ssrc.h" #include "ssrc.h"
#include "main.h" #include "main.h"
struct redis *rtpe_redis;
struct redis *rtpe_redis_write;
struct redis *rtpe_redis_notify;
struct event_base *rtpe_redis_notify_event_base;
struct redisAsyncContext *rtpe_redis_notify_async_context;
INLINE redisReply *redis_expect(int type, redisReply *r) { INLINE redisReply *redis_expect(int type, redisReply *r) {
if (!r) if (!r)
@ -71,7 +79,7 @@ static int redisCommandNR(redisContext *r, const char *fmt, ...)
#define REDIS_FMT(x) (x)->len, (x)->str #define REDIS_FMT(x) (x)->len, (x)->str
static int redis_check_conn(struct redis *r); static int redis_check_conn(struct redis *r);
static void json_restore_call(struct redis *r, struct callmaster *m, const str *id, enum call_type type); static void json_restore_call(struct redis *r, const str *id, enum call_type type);
static void redis_pipe(struct redis *r, const char *fmt, ...) { static void redis_pipe(struct redis *r, const char *fmt, ...) {
va_list ap; va_list ap;
@ -260,25 +268,17 @@ err:
void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) { void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata) {
struct callmaster *cm = privdata;
struct redis *r = 0; struct redis *r = 0;
struct call *c = NULL; struct call *c = NULL;
str callid; str callid;
str keyspace_id; str keyspace_id;
// sanity checks if (!rtpe_redis_notify) {
if (!cm) {
rlog(LOG_ERROR, "Struct callmaster is NULL on on_redis_notification");
return;
}
if (!cm->conf.redis_notify) {
rlog(LOG_ERROR, "A redis notification has been received but no redis_notify database found"); rlog(LOG_ERROR, "A redis notification has been received but no redis_notify database found");
return; return;
} }
r = cm->conf.redis_notify; r = rtpe_redis_notify;
mutex_lock(&r->lock); mutex_lock(&r->lock);
@ -323,7 +323,7 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
} }
if (strncmp(rr->element[3]->str,"set",3)==0) { if (strncmp(rr->element[3]->str,"set",3)==0) {
c = call_get(&callid, cm); c = call_get(&callid);
if (c) { if (c) {
rwlock_unlock_w(&c->master_lock); rwlock_unlock_w(&c->master_lock);
if (IS_FOREIGN_CALL(c)) if (IS_FOREIGN_CALL(c))
@ -333,11 +333,11 @@ void on_redis_notification(redisAsyncContext *actx, void *reply, void *privdata)
goto err; goto err;
} }
} }
json_restore_call(r, cm, &callid, CT_FOREIGN_CALL); json_restore_call(r, &callid, CT_FOREIGN_CALL);
} }
if (strncmp(rr->element[3]->str,"del",3)==0) { if (strncmp(rr->element[3]->str,"del",3)==0) {
c = call_get(&callid, cm); c = call_get(&callid);
if (!c) { if (!c) {
rlog(LOG_NOTICE, "Redis-Notifier: DEL did not find call with callid: %s\n", rr->element[2]->str); rlog(LOG_NOTICE, "Redis-Notifier: DEL did not find call with callid: %s\n", rr->element[2]->str);
goto err; goto err;
@ -376,36 +376,30 @@ void redis_async_context_disconnect(const redisAsyncContext *redis_notify_async_
} }
} }
int redis_async_context_alloc(struct callmaster *cm) { int redis_async_context_alloc() {
struct redis *r = 0; struct redis *r = 0;
// sanity checks if (!rtpe_redis_notify) {
if (!cm) {
rlog(LOG_ERROR, "Struct callmaster is NULL on context free");
return -1;
}
if (!cm->conf.redis_notify) {
rlog(LOG_INFO, "redis_notify database is NULL."); rlog(LOG_INFO, "redis_notify database is NULL.");
return -1; return -1;
} }
// get redis_notify database // get redis_notify database
r = cm->conf.redis_notify; r = rtpe_redis_notify;
rlog(LOG_INFO, "Use Redis %s for notifications", endpoint_print_buf(&r->endpoint)); rlog(LOG_INFO, "Use Redis %s for notifications", endpoint_print_buf(&r->endpoint));
// alloc async context // alloc async context
cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); rtpe_redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port);
if (!cm->conf.redis_notify_async_context) { if (!rtpe_redis_notify_async_context) {
rlog(LOG_ERROR, "redis_notify_async_context can't create new"); rlog(LOG_ERROR, "redis_notify_async_context can't create new");
return -1; return -1;
} }
if (cm->conf.redis_notify_async_context->err) { if (rtpe_redis_notify_async_context->err) {
rlog(LOG_ERROR, "redis_notify_async_context can't create new error: %s", cm->conf.redis_notify_async_context->errstr); rlog(LOG_ERROR, "redis_notify_async_context can't create new error: %s", rtpe_redis_notify_async_context->errstr);
return -1; return -1;
} }
if (redisAsyncSetDisconnectCallback(cm->conf.redis_notify_async_context, redis_async_context_disconnect) != REDIS_OK) { if (redisAsyncSetDisconnectCallback(rtpe_redis_notify_async_context, redis_async_context_disconnect) != REDIS_OK) {
rlog(LOG_ERROR, "redis_notify_async_context can't set disconnect callback"); rlog(LOG_ERROR, "redis_notify_async_context can't set disconnect callback");
return -1; return -1;
} }
@ -413,14 +407,8 @@ int redis_async_context_alloc(struct callmaster *cm) {
return 0; return 0;
} }
int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action action) { int redis_notify_event_base_action(enum event_base_action action) {
// sanity checks if (!rtpe_redis_notify_event_base && action!=EVENT_BASE_ALLOC) {
if (!cm) {
rlog(LOG_ERROR, "Struct callmaster is NULL on event base action %d", action);
return -1;
}
if (!cm->conf.redis_notify_event_base && action!=EVENT_BASE_ALLOC) {
rlog(LOG_ERROR, "redis_notify_event_base is NULL on event base action %d", action); rlog(LOG_ERROR, "redis_notify_event_base is NULL on event base action %d", action);
return -1; return -1;
} }
@ -428,8 +416,8 @@ int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action
// exec event base action // exec event base action
switch (action) { switch (action) {
case EVENT_BASE_ALLOC: case EVENT_BASE_ALLOC:
cm->conf.redis_notify_event_base = event_base_new(); rtpe_redis_notify_event_base = event_base_new();
if (!cm->conf.redis_notify_event_base) { if (!rtpe_redis_notify_event_base) {
rlog(LOG_ERROR, "Fail alloc redis_notify_event_base"); rlog(LOG_ERROR, "Fail alloc redis_notify_event_base");
return -1; return -1;
} else { } else {
@ -438,12 +426,12 @@ int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action
break; break;
case EVENT_BASE_FREE: case EVENT_BASE_FREE:
event_base_free(cm->conf.redis_notify_event_base); event_base_free(rtpe_redis_notify_event_base);
rlog(LOG_DEBUG, "Success free redis_notify_event_base"); rlog(LOG_DEBUG, "Success free redis_notify_event_base");
break; break;
case EVENT_BASE_LOOPBREAK: case EVENT_BASE_LOOPBREAK:
if (event_base_loopbreak(cm->conf.redis_notify_event_base)) { if (event_base_loopbreak(rtpe_redis_notify_event_base)) {
rlog(LOG_ERROR, "Fail loopbreak redis_notify_event_base"); rlog(LOG_ERROR, "Fail loopbreak redis_notify_event_base");
return -1; return -1;
} else { } else {
@ -459,38 +447,32 @@ int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action
return 0; return 0;
} }
int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action action, int keyspace) { int redis_notify_subscribe_action(enum subscribe_action action, int keyspace) {
// sanity checks if (!rtpe_redis_notify_async_context) {
if (!cm) {
rlog(LOG_ERROR, "Struct callmaster is NULL on subscribe action");
return -1;
}
if (!cm->conf.redis_notify_async_context) {
rlog(LOG_ERROR, "redis_notify_async_context is NULL on subscribe action"); rlog(LOG_ERROR, "redis_notify_async_context is NULL on subscribe action");
return -1; return -1;
} }
if (cm->conf.redis_notify_async_context->err) { if (rtpe_redis_notify_async_context->err) {
rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", cm->conf.redis_notify_async_context->errstr); rlog(LOG_ERROR, "redis_notify_async_context error on subscribe action: %s", rtpe_redis_notify_async_context->errstr);
return -1; return -1;
} }
switch (action) { switch (action) {
case SUBSCRIBE_KEYSPACE: case SUBSCRIBE_KEYSPACE:
if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "psubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE"); rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON SUBSCRIBE_KEYSPACE");
return -1; return -1;
} }
break; break;
case UNSUBSCRIBE_KEYSPACE: case UNSUBSCRIBE_KEYSPACE:
if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void*)cm, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) { if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "punsubscribe __keyspace@%i__:*", keyspace) != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE"); rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_KEYSPACE");
return -1; return -1;
} }
break; break;
case UNSUBSCRIBE_ALL: case UNSUBSCRIBE_ALL:
if (redisAsyncCommand(cm->conf.redis_notify_async_context, on_redis_notification, (void *) cm, "punsubscribe") != REDIS_OK) { if (redisAsyncCommand(rtpe_redis_notify_async_context, on_redis_notification, NULL, "punsubscribe") != REDIS_OK) {
rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL"); rlog(LOG_ERROR, "Fail redisAsyncCommand on JSON UNSUBSCRIBE_ALL");
return -1; return -1;
} }
@ -503,39 +485,33 @@ int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action a
return 0; return 0;
} }
static int redis_notify(struct callmaster *cm) { static int redis_notify() {
struct redis *r = 0; struct redis *r = 0;
GList *l; GList *l;
// sanity checks if (!rtpe_redis_notify) {
if (!cm) {
rlog(LOG_ERROR, "Struct callmaster is NULL on redis_notify()");
return -1;
}
if (!cm->conf.redis_notify) {
rlog(LOG_ERROR, "redis_notify database is NULL on redis_notify()"); rlog(LOG_ERROR, "redis_notify database is NULL on redis_notify()");
return -1; return -1;
} }
if (!cm->conf.redis_notify_async_context) { if (!rtpe_redis_notify_async_context) {
rlog(LOG_ERROR, "redis_notify_async_context is NULL on redis_notify()"); rlog(LOG_ERROR, "redis_notify_async_context is NULL on redis_notify()");
return -1; return -1;
} }
if (!cm->conf.redis_notify_event_base) { if (!rtpe_redis_notify_event_base) {
rlog(LOG_ERROR, "redis_notify_event_base is NULL on redis_notify()"); rlog(LOG_ERROR, "redis_notify_event_base is NULL on redis_notify()");
return -1; return -1;
} }
// get redis_notify database // get redis_notify database
r = cm->conf.redis_notify; r = rtpe_redis_notify;
rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint)); rlog(LOG_INFO, "Use Redis %s to subscribe to notifications", endpoint_print_buf(&r->endpoint));
// attach event base // attach event base
if (redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base) == REDIS_ERR) { if (redisLibeventAttach(rtpe_redis_notify_async_context, rtpe_redis_notify_event_base) == REDIS_ERR) {
if (cm->conf.redis_notify_async_context->err) { if (rtpe_redis_notify_async_context->err) {
rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", cm->conf.redis_notify_async_context->errstr); rlog(LOG_ERROR, "redis_notify_async_context can't attach event base error: %s", rtpe_redis_notify_async_context->errstr);
} else { } else {
rlog(LOG_ERROR, "redis_notify_async_context can't attach event base"); rlog(LOG_ERROR, "redis_notify_async_context can't attach event base");
@ -546,12 +522,12 @@ static int redis_notify(struct callmaster *cm) {
// subscribe to the values in the configured keyspaces // subscribe to the values in the configured keyspaces
rwlock_lock_r(&rtpe_config.config_lock); rwlock_lock_r(&rtpe_config.config_lock);
for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) { for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) {
redis_notify_subscribe_action(cm, SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data)); redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, GPOINTER_TO_UINT(l->data));
} }
rwlock_unlock_r(&rtpe_config.config_lock); rwlock_unlock_r(&rtpe_config.config_lock);
// dispatch event base => thread blocks here // dispatch event base => thread blocks here
if (event_base_dispatch(cm->conf.redis_notify_event_base) < 0) { if (event_base_dispatch(rtpe_redis_notify_event_base) < 0) {
rlog(LOG_ERROR, "Fail event_base_dispatch()"); rlog(LOG_ERROR, "Fail event_base_dispatch()");
return -1; return -1;
} }
@ -562,16 +538,9 @@ static int redis_notify(struct callmaster *cm) {
void redis_notify_loop(void *d) { void redis_notify_loop(void *d) {
int seconds = 1, redis_notify_return = 0; int seconds = 1, redis_notify_return = 0;
time_t next_run = rtpe_now.tv_sec; time_t next_run = rtpe_now.tv_sec;
struct callmaster *cm = (struct callmaster *)d;
struct redis *r; struct redis *r;
// sanity checks r = rtpe_redis_notify;
if (!cm) {
ilog(LOG_ERROR, "NULL callmaster");
return ;
}
r = cm->conf.redis_notify;
if (!r) { if (!r) {
rlog(LOG_ERROR, "Don't use Redis notifications. See --redis-notifications parameter."); rlog(LOG_ERROR, "Don't use Redis notifications. See --redis-notifications parameter.");
return ; return ;
@ -584,18 +553,18 @@ void redis_notify_loop(void *d) {
} }
// alloc redis async context // alloc redis async context
if (redis_async_context_alloc(cm) < 0) { if (redis_async_context_alloc() < 0) {
return ; return ;
} }
// alloc event base // alloc event base
if (redis_notify_event_base_action(cm, EVENT_BASE_ALLOC) < 0) { if (redis_notify_event_base_action(EVENT_BASE_ALLOC) < 0) {
return ; return ;
} }
// initial redis_notify // initial redis_notify
if (redis_check_conn(r) == REDIS_STATE_CONNECTED) { if (redis_check_conn(r) == REDIS_STATE_CONNECTED) {
redis_notify_return = redis_notify(cm); redis_notify_return = redis_notify();
} }
// loop redis_notify => in case of lost connection // loop redis_notify => in case of lost connection
@ -610,23 +579,23 @@ void redis_notify_loop(void *d) {
if (redis_check_conn(r) == REDIS_STATE_RECONNECTED || redis_notify_return < 0) { if (redis_check_conn(r) == REDIS_STATE_RECONNECTED || redis_notify_return < 0) {
// alloc new redis async context upon redis breakdown // alloc new redis async context upon redis breakdown
if (redis_async_context_alloc(cm) < 0) { if (redis_async_context_alloc() < 0) {
continue; continue;
} }
// prepare notifications // prepare notifications
redis_notify_return = redis_notify(cm); redis_notify_return = redis_notify();
} }
} }
// unsubscribe notifications // unsubscribe notifications
redis_notify_subscribe_action(cm, UNSUBSCRIBE_ALL, 0); redis_notify_subscribe_action(UNSUBSCRIBE_ALL, 0);
// free async context // free async context
redisAsyncDisconnect(cm->conf.redis_notify_async_context); redisAsyncDisconnect(rtpe_redis_notify_async_context);
// free event base // free event base
redis_notify_event_base_action(cm, EVENT_BASE_FREE); redis_notify_event_base_action(EVENT_BASE_FREE);
} }
struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) { struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) {
@ -1436,7 +1405,7 @@ static int json_build_ssrc(struct call *c, JsonReader *root_reader) {
return 0; return 0;
} }
static void json_restore_call(struct redis *r, struct callmaster *m, const str *callid, enum call_type type) { static void json_restore_call(struct redis *r, const str *callid, enum call_type type) {
redisReply* rr_jsonStr; redisReply* rr_jsonStr;
struct redis_hash call; struct redis_hash call;
struct redis_list tags, sfds, streams, medias, maps; struct redis_list tags, sfds, streams, medias, maps;
@ -1462,7 +1431,7 @@ static void json_restore_call(struct redis *r, struct callmaster *m, const str *
if (!root_reader) if (!root_reader)
goto err1; goto err1;
c = call_get_or_create(callid, m, type); c = call_get_or_create(callid, type);
err = "failed to create call struct"; err = "failed to create call struct";
if (!c) if (!c)
goto err1; goto err1;
@ -1582,9 +1551,9 @@ err1:
if (c) if (c)
call_destroy(c); call_destroy(c);
else { else {
mutex_lock(&m->conf.redis_write->lock); mutex_lock(&rtpe_redis_write->lock);
redisCommandNR(m->conf.redis_write->ctx, "DEL " PB, STR(callid)); redisCommandNR(rtpe_redis_write->ctx, "DEL " PB, STR(callid));
mutex_unlock(&m->conf.redis_write->lock); mutex_unlock(&rtpe_redis_write->lock);
} }
} }
if (c) if (c)
@ -1592,7 +1561,6 @@ err1:
} }
struct thread_ctx { struct thread_ctx {
struct callmaster *m;
GQueue r_q; GQueue r_q;
mutex_t r_m; mutex_t r_m;
}; };
@ -1610,14 +1578,14 @@ static void restore_thread(void *call_p, void *ctx_p) {
r = g_queue_pop_head(&ctx->r_q); r = g_queue_pop_head(&ctx->r_q);
mutex_unlock(&ctx->r_m); mutex_unlock(&ctx->r_m);
json_restore_call(r, ctx->m, &callid, CT_OWN_CALL); json_restore_call(r, &callid, CT_OWN_CALL);
mutex_lock(&ctx->r_m); mutex_lock(&ctx->r_m);
g_queue_push_tail(&ctx->r_q, r); g_queue_push_tail(&ctx->r_q, r);
mutex_unlock(&ctx->r_m); mutex_unlock(&ctx->r_m);
} }
int redis_restore(struct callmaster *m, struct redis *r) { int redis_restore(struct redis *r) {
redisReply *calls = NULL, *call; redisReply *calls = NULL, *call;
int i, ret = -1; int i, ret = -1;
GThreadPool *gtp; GThreadPool *gtp;
@ -1647,7 +1615,6 @@ int redis_restore(struct callmaster *m, struct redis *r) {
goto err; goto err;
} }
ctx.m = m;
mutex_init(&ctx.r_m); mutex_init(&ctx.r_m);
g_queue_init(&ctx.r_q); g_queue_init(&ctx.r_q);
for (i = 0; i < rtpe_config.redis_num_threads; i++) for (i = 0; i < rtpe_config.redis_num_threads; i++)

@ -43,7 +43,6 @@ enum subscribe_action {
UNSUBSCRIBE_ALL, UNSUBSCRIBE_ALL,
}; };
struct callmaster;
struct call; struct call;
@ -74,6 +73,15 @@ struct redis_list {
}; };
extern struct redis *rtpe_redis;
extern struct redis *rtpe_redis_write;
extern struct redis *rtpe_redis_notify;
extern struct event_base *rtpe_redis_notify_event_base;
extern struct redisAsyncContext *rtpe_redis_notify_async_context;
#if !GLIB_CHECK_VERSION(2,40,0) #if !GLIB_CHECK_VERSION(2,40,0)
INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) { INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) {
gboolean ret = TRUE; gboolean ret = TRUE;
@ -93,13 +101,13 @@ void redis_notify_loop(void *d);
struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required); struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required);
int redis_restore(struct callmaster *, struct redis *); int redis_restore(struct redis *);
void redis_update(struct call *, struct redis *); void redis_update(struct call *, struct redis *);
void redis_update_onekey(struct call *c, struct redis *r); void redis_update_onekey(struct call *c, struct redis *r);
void redis_delete(struct call *, struct redis *); void redis_delete(struct call *, struct redis *);
void redis_wipe(struct redis *); void redis_wipe(struct redis *);
int redis_notify_event_base_action(struct callmaster *cm, enum event_base_action); int redis_notify_event_base_action(enum event_base_action);
int redis_notify_subscribe_action(struct callmaster *cm, enum subscribe_action action, int keyspace); int redis_notify_subscribe_action(enum subscribe_action action, int keyspace);

Loading…
Cancel
Save