From 399e15b39aff7e2336aa0860ca3de2b22cc32113 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 3 Jan 2018 10:19:54 -0500 Subject: [PATCH] make the callhash global Change-Id: Ifa1fbb5d1b1f623dbc6a1bfac556342735b40161 --- daemon/call.c | 107 ++++++++++++--------------------------- daemon/call.h | 8 +-- daemon/call_interfaces.c | 14 ++--- daemon/cli.c | 24 ++++----- daemon/graphite.c | 6 +-- daemon/statistics.c | 4 +- 6 files changed, 62 insertions(+), 101 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 8e992965a..17d5132bf 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -115,6 +115,10 @@ const struct transport_protocol transport_protocols[] = { }; const int num_transport_protocols = G_N_ELEMENTS(transport_protocols); + +rwlock_t rtpe_callhash_lock; +GHashTable *rtpe_callhash; + /* ********** */ static void __monologue_destroy(struct call_monologue *monologue); @@ -477,14 +481,14 @@ static void callmaster_timer(void *ptr) { hlp.addr_sfd = g_hash_table_new(g_endpoint_hash, g_endpoint_eq); /* obtain the call list and make a copy from it so not to hold the lock */ - rwlock_lock_r(&m->hashlock); - l = g_hash_table_get_values(m->callhash); + rwlock_lock_r(&rtpe_callhash_lock); + l = g_hash_table_get_values(rtpe_callhash); if (l) { calls = g_list_copy(l); g_list_free(l); g_list_foreach(calls, call_obj_get, NULL); } - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); if (calls) { g_list_foreach(calls, call_timer_iterator, &hlp); @@ -611,11 +615,11 @@ struct callmaster *callmaster_new(struct poller *p) { c = obj_alloc0("callmaster", sizeof(*c), NULL); - c->callhash = g_hash_table_new(str_hash, str_equal); - if (!c->callhash) + rtpe_callhash = g_hash_table_new(str_hash, str_equal); + if (!rtpe_callhash) goto fail; c->poller = p; - rwlock_init(&c->hashlock); + rwlock_init(&rtpe_callhash_lock); c->info_re = pcre_compile("^([^:,]+)(?::(.*?))?(?:$|,)", PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); if (!c->info_re) @@ -1755,8 +1759,8 @@ static struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, struct call *call; struct call_monologue *ml; - rwlock_lock_r(&m->hashlock); - g_hash_table_iter_init(&iter, m->callhash); + rwlock_lock_r(&rtpe_callhash_lock); + g_hash_table_iter_init(&iter, rtpe_callhash); while (g_hash_table_iter_next(&iter, &key, &value)) { call = (struct call*) value; @@ -1770,7 +1774,7 @@ static struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, timeval_add(&res, &res, &call_duration); } } - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); return res; } @@ -1794,11 +1798,11 @@ void call_destroy(struct call *c) { m = c->callmaster; p = m->poller; - rwlock_lock_w(&m->hashlock); - ret = (g_hash_table_lookup(m->callhash, &c->callid) == c); + rwlock_lock_w(&rtpe_callhash_lock); + ret = (g_hash_table_lookup(rtpe_callhash, &c->callid) == c); if (ret) - g_hash_table_remove(m->callhash, &c->callid); - rwlock_unlock_w(&m->hashlock); + g_hash_table_remove(rtpe_callhash, &c->callid); + rwlock_unlock_w(&rtpe_callhash_lock); // if call not found in callhash => previously deleted if (!ret) @@ -2047,20 +2051,20 @@ struct call *call_get_or_create(const str *callid, struct callmaster *m, enum ca struct call *c; restart: - rwlock_lock_r(&m->hashlock); - c = g_hash_table_lookup(m->callhash, callid); + rwlock_lock_r(&rtpe_callhash_lock); + c = g_hash_table_lookup(rtpe_callhash, callid); if (!c) { - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); /* completely new call-id, create call */ c = call_create(callid, m); - rwlock_lock_w(&m->hashlock); - if (g_hash_table_lookup(m->callhash, callid)) { + rwlock_lock_w(&rtpe_callhash_lock); + if (g_hash_table_lookup(rtpe_callhash, callid)) { /* preempted */ - rwlock_unlock_w(&m->hashlock); + rwlock_unlock_w(&rtpe_callhash_lock); obj_put(c); goto restart; } - g_hash_table_insert(m->callhash, &c->callid, obj_get(c)); + g_hash_table_insert(rtpe_callhash, &c->callid, obj_get(c)); if (type == CT_FOREIGN_CALL) /* foreign call*/ c->foreign_call = 1; @@ -2068,12 +2072,12 @@ restart: statistics_update_foreignown_inc(m,c); rwlock_lock_w(&c->master_lock); - rwlock_unlock_w(&m->hashlock); + rwlock_unlock_w(&rtpe_callhash_lock); } else { obj_hold(c); rwlock_lock_w(&c->master_lock); - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); } log_info_call(c); @@ -2084,16 +2088,16 @@ restart: struct call *call_get(const str *callid, struct callmaster *m) { struct call *ret; - rwlock_lock_r(&m->hashlock); - ret = g_hash_table_lookup(m->callhash, callid); + rwlock_lock_r(&rtpe_callhash_lock); + ret = g_hash_table_lookup(rtpe_callhash, callid); if (!ret) { - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); return NULL; } rwlock_lock_w(&ret->master_lock); obj_hold(ret); - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); log_info_call(ret); return ret; @@ -2488,57 +2492,12 @@ static void callmaster_get_all_calls_interator(void *key, void *val, void *ptr) } void callmaster_get_all_calls(struct callmaster *m, GQueue *q) { - rwlock_lock_r(&m->hashlock); - g_hash_table_foreach(m->callhash, callmaster_get_all_calls_interator, q); - rwlock_unlock_r(&m->hashlock); - -} - + rwlock_lock_r(&rtpe_callhash_lock); + g_hash_table_foreach(rtpe_callhash, callmaster_get_all_calls_interator, q); + rwlock_unlock_r(&rtpe_callhash_lock); -#if 0 -// unused -// simplifty redis_write <> redis if put back into use -static void calls_dump_iterator(void *key, void *val, void *ptr) { - struct call *c = val; - struct callmaster *m = c->callmaster; - - if (m->conf.redis_write) { - redis_update(c, m->conf.redis_write); - } else if (m->conf.redis) { - redis_update(c, m->conf.redis); - } } -void calls_dump_redis(struct callmaster *m) { - if (!m->conf.redis) - return; - - ilog(LOG_DEBUG, "Start dumping all call data to Redis...\n"); - redis_wipe(m->conf.redis); - g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); - ilog(LOG_DEBUG, "Finished dumping all call data to Redis\n"); -} - -void calls_dump_redis_read(struct callmaster *m) { - if (!m->conf.redis_read) - return; - - ilog(LOG_DEBUG, "Start dumping all call data to read Redis...\n"); - redis_wipe(m->conf.redis_read); - g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); - ilog(LOG_DEBUG, "Finished dumping all call data to read Redis\n"); -} - -void calls_dump_redis_write(struct callmaster *m) { - if (!m->conf.redis_write) - return; - - ilog(LOG_DEBUG, "Start dumping all call data to write Redis...\n"); - redis_wipe(m->conf.redis_write); - g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); - ilog(LOG_DEBUG, "Finished dumping all call data to write Redis\n"); -} -#endif const struct transport_protocol *transport_protocol(const str *s) { int i; diff --git a/daemon/call.h b/daemon/call.h index af6fbd331..05b7e4afe 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -416,9 +416,6 @@ struct callmaster_config { struct callmaster { struct obj obj; - rwlock_t hashlock; - GHashTable *callhash; - /* XXX rework these */ struct stats statsps; /* per second stats, running timer */ struct stats stats; /* copied from statsps once a second */ @@ -441,6 +438,11 @@ struct callmaster { struct timeval latest_graphite_interval_start; }; + +extern rwlock_t rtpe_callhash_lock; +extern GHashTable *rtpe_callhash; + + struct callmaster *callmaster_new(struct poller *); void callmaster_get_all_calls(struct callmaster *m, GQueue *q); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 4814d38c5..34ffb033e 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -812,10 +812,10 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i { rwlock_lock_r(&m->conf.config_lock); if (m->conf.max_sessions>=0) { - rwlock_lock_r(&m->hashlock); - if (g_hash_table_size(m->callhash) - + rwlock_lock_r(&rtpe_callhash_lock); + if (g_hash_table_size(rtpe_callhash) - atomic64_get(&m->stats.foreign_sessions) >= m->conf.max_sessions) { - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); /* foreign calls can't get rejected * total_rejected_sess applies only to "own" sessions */ atomic64_inc(&m->totalstats.total_rejected_sess); @@ -825,7 +825,7 @@ const char *call_offer_ng(bencode_item_t *input, struct callmaster *m, bencode_i rwlock_unlock_r(&m->conf.config_lock); return "Parallel session limit reached"; } - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); } rwlock_unlock_r(&m->conf.config_lock); @@ -1144,14 +1144,14 @@ static void ng_list_calls( struct callmaster *m, bencode_item_t *output, long lo GHashTableIter iter; gpointer key, value; - rwlock_lock_r(&m->hashlock); + rwlock_lock_r(&rtpe_callhash_lock); - g_hash_table_iter_init (&iter, m->callhash); + g_hash_table_iter_init (&iter, rtpe_callhash); while (limit-- && g_hash_table_iter_next (&iter, &key, &value)) { bencode_list_add_str_dup(output, key); } - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); } diff --git a/daemon/cli.c b/daemon/cli.c index ff7beb9f9..91047f67a 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -108,9 +108,9 @@ static void destroy_own_foreign_calls(struct callmaster *m, unsigned int foreign GList *i; // lock read - rwlock_lock_r(&m->hashlock); + rwlock_lock_r(&rtpe_callhash_lock); - g_hash_table_iter_init(&iter, m->callhash); + g_hash_table_iter_init(&iter, rtpe_callhash); while (g_hash_table_iter_next(&iter, &key, &value)) { c = (struct call*)value; if (!c) { @@ -135,7 +135,7 @@ static void destroy_own_foreign_calls(struct callmaster *m, unsigned int foreign } // unlock read - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); // destroy calls while ((c = g_queue_pop_head(&call_list))) { @@ -253,11 +253,11 @@ static void cli_incoming_list_totals(str *instr, struct callmaster* m, struct st } static void cli_incoming_list_numsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { - rwlock_lock_r(&m->hashlock); - streambuf_printf(replybuffer, "Current sessions own: "UINT64F"\n", g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); + rwlock_lock_r(&rtpe_callhash_lock); + streambuf_printf(replybuffer, "Current sessions own: "UINT64F"\n", g_hash_table_size(rtpe_callhash) - atomic64_get(&m->stats.foreign_sessions)); streambuf_printf(replybuffer, "Current sessions foreign: "UINT64F"\n", atomic64_get(&m->stats.foreign_sessions)); - streambuf_printf(replybuffer, "Current sessions total: %i\n", g_hash_table_size(m->callhash)); - rwlock_unlock_r(&m->hashlock); + streambuf_printf(replybuffer, "Current sessions total: %i\n", g_hash_table_size(rtpe_callhash)); + rwlock_unlock_r(&rtpe_callhash_lock); } static void cli_incoming_list_maxsessions(str *instr, struct callmaster* m, struct streambuf *replybuffer) { @@ -414,15 +414,15 @@ static void cli_incoming_list_sessions(str *instr, struct callmaster* m, struct return; } - rwlock_lock_r(&m->hashlock); + rwlock_lock_r(&rtpe_callhash_lock); - if (g_hash_table_size(m->callhash)==0) { + if (g_hash_table_size(rtpe_callhash)==0) { streambuf_printf(replybuffer, "No sessions on this media relay.\n"); - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); return; } - g_hash_table_iter_init (&iter, m->callhash); + g_hash_table_iter_init (&iter, rtpe_callhash); while (g_hash_table_iter_next(&iter, &key, &value)) { ptrkey = (str*)key; call = (struct call*)value; @@ -450,7 +450,7 @@ static void cli_incoming_list_sessions(str *instr, struct callmaster* m, struct streambuf_printf(replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created.tv_sec, call->created_from, call->redis_hosted_db, IS_FOREIGN_CALL(call)?"yes":"no"); } - rwlock_unlock_r(&m->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); if (str_cmp(instr, LIST_ALL) == 0) { ; diff --git a/daemon/graphite.c b/daemon/graphite.c index 4003d5eb0..d634fa831 100644 --- a/daemon/graphite.c +++ b/daemon/graphite.c @@ -131,17 +131,17 @@ int send_graphite_data(struct callmaster *cm, struct totalstats *sent_data) { ts->answer = timeval_clear_request_time(&cm->totalstats_interval.answer); ts->delete = timeval_clear_request_time(&cm->totalstats_interval.delete); - rwlock_lock_r(&cm->hashlock); + rwlock_lock_r(&rtpe_callhash_lock); mutex_lock(&cm->totalstats_interval.managed_sess_lock); ts->managed_sess_max = cm->totalstats_interval.managed_sess_max; ts->managed_sess_min = cm->totalstats_interval.managed_sess_min; - ts->total_sessions = g_hash_table_size(cm->callhash); + ts->total_sessions = g_hash_table_size(rtpe_callhash); ts->foreign_sessions = atomic64_get(&cm->stats.foreign_sessions); ts->own_sessions = ts->total_sessions - ts->foreign_sessions; cm->totalstats_interval.managed_sess_max = ts->own_sessions;; cm->totalstats_interval.managed_sess_min = ts->own_sessions; mutex_unlock(&cm->totalstats_interval.managed_sess_lock); - rwlock_unlock_r(&cm->hashlock); + rwlock_unlock_r(&rtpe_callhash_lock); // compute average offer/answer/delete time timeval_divide(&ts->offer.time_avg, &ts->offer.time_avg, ts->offer.count); diff --git a/daemon/statistics.c b/daemon/statistics.c index a785fd06b..af536550c 100644 --- a/daemon/statistics.c +++ b/daemon/statistics.c @@ -82,7 +82,7 @@ void statistics_update_foreignown_dec(struct call* c) { if(IS_OWN_CALL(c)) { mutex_lock(&m->totalstats_interval.managed_sess_lock); m->totalstats_interval.managed_sess_min = MIN(m->totalstats_interval.managed_sess_min, - g_hash_table_size(m->callhash) - atomic64_get(&m->stats.foreign_sessions)); + g_hash_table_size(rtpe_callhash) - atomic64_get(&m->stats.foreign_sessions)); mutex_unlock(&m->totalstats_interval.managed_sess_lock); } @@ -93,7 +93,7 @@ void statistics_update_foreignown_inc(struct callmaster *m, struct call* c) { mutex_lock(&m->totalstats_interval.managed_sess_lock); m->totalstats_interval.managed_sess_max = MAX( m->totalstats_interval.managed_sess_max, - g_hash_table_size(m->callhash) + g_hash_table_size(rtpe_callhash) - atomic64_get(&m->stats.foreign_sessions)); mutex_unlock(&m->totalstats_interval.managed_sess_lock); }