TT#150050 remove full subscriber/UUID mapping

As the number of subscribers grows, the current approach of doing a full
table dump of the subscribers DB and caching it in memory becomes less
and less feasible. The new approach is to simply do a straight DB query
for each subscriber as records are processed, and then cache the result
in memory for a little while.

Change-Id: I19a6271d779bd0abccc29e3548e7bcdb2e00baa3
mr10.3
Richard Fuchs 3 years ago
parent 0d3c2c2eed
commit 22dadc8002

@ -1556,7 +1556,7 @@ void cdr_set_provider(cdr_entry_t *cdr)
if(strcmp("0", cdr->source_user_id->str) != 0) if(strcmp("0", cdr->source_user_id->str) != 0)
{ {
if((val = g_hash_table_lookup(med_uuid_table, cdr->source_user_id->str)) != NULL) if((val = medmysql_lookup_uuid(cdr->source_user_id->str)) != NULL)
{ {
g_string_assign(cdr->source_provider_id, val); g_string_assign(cdr->source_provider_id, val);
} }
@ -1586,7 +1586,7 @@ void cdr_set_provider(cdr_entry_t *cdr)
if(strcmp("0", cdr->destination_user_id->str) != 0) if(strcmp("0", cdr->destination_user_id->str) != 0)
{ {
if((val = g_hash_table_lookup(med_uuid_table, cdr->destination_user_id->str)) != NULL) if((val = medmysql_lookup_uuid(cdr->destination_user_id->str)) != NULL)
{ {
g_string_assign(cdr->destination_provider_id, val); g_string_assign(cdr->destination_provider_id, val);
} }

@ -28,23 +28,33 @@ static time_t next_intermediate_run = 0;
GHashTable *med_peer_ip_table = NULL; GHashTable *med_peer_ip_table = NULL;
GHashTable *med_peer_host_table = NULL; GHashTable *med_peer_host_table = NULL;
GHashTable *med_peer_id_table = NULL; GHashTable *med_peer_id_table = NULL;
GHashTable *med_uuid_table = NULL; GHashTable *med_uuid_cache = NULL;
GHashTable *med_call_stat_info_table = NULL; GHashTable *med_call_stat_info_table = NULL;
GHashTable *med_cdr_tag_table = NULL; GHashTable *med_cdr_tag_table = NULL;
static void med_free_cache_entry(void *p)
{
med_cache_entry_t *e = p;
g_free(e->str_value);
g_slice_free1(sizeof(*e), e);
}
/**********************************************************************/ /**********************************************************************/
static void mediator_create_caches(void)
{
med_uuid_cache = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, med_free_cache_entry);
}
static int mediator_load_maps(void) static int mediator_load_maps(void)
{ {
med_peer_ip_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); med_peer_ip_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
med_peer_host_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); med_peer_host_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
med_peer_id_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free); med_peer_id_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
med_uuid_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, free);
med_cdr_tag_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, NULL); med_cdr_tag_table = g_hash_table_new_full(g_str_hash, g_str_equal, free, NULL);
if(medmysql_load_maps(med_peer_ip_table, med_peer_host_table, med_peer_id_table)) if(medmysql_load_maps(med_peer_ip_table, med_peer_host_table, med_peer_id_table))
return -1; return -1;
if(medmysql_load_uuids(med_uuid_table))
return -1;
if (medmysql_load_db_ids()) if (medmysql_load_db_ids())
return -1; return -1;
if (medmysql_load_cdr_tag_ids(med_cdr_tag_table)) if (medmysql_load_cdr_tag_ids(med_cdr_tag_table))
@ -60,6 +70,17 @@ static void mediator_print_mapentry(gpointer key, gpointer value, gpointer d __a
} }
/**********************************************************************/ /**********************************************************************/
static void mediator_destroy_caches(void)
{
g_hash_table_destroy(med_uuid_cache);
med_uuid_cache = NULL;
}
static void mediator_cleanup_caches(void)
{
medmysql_cache_cleanup(med_uuid_cache);
}
static void mediator_destroy_maps(void) static void mediator_destroy_maps(void)
{ {
if(med_peer_ip_table) if(med_peer_ip_table)
@ -68,8 +89,6 @@ static void mediator_destroy_maps(void)
g_hash_table_destroy(med_peer_host_table); g_hash_table_destroy(med_peer_host_table);
if(med_peer_id_table) if(med_peer_id_table)
g_hash_table_destroy(med_peer_id_table); g_hash_table_destroy(med_peer_id_table);
if(med_uuid_table)
g_hash_table_destroy(med_uuid_table);
if(med_call_stat_info_table) if(med_call_stat_info_table)
g_hash_table_destroy(med_call_stat_info_table); g_hash_table_destroy(med_call_stat_info_table);
if(med_cdr_tag_table) if(med_cdr_tag_table)
@ -78,7 +97,6 @@ static void mediator_destroy_maps(void)
med_peer_ip_table = NULL; med_peer_ip_table = NULL;
med_peer_host_table = NULL; med_peer_host_table = NULL;
med_peer_id_table = NULL; med_peer_id_table = NULL;
med_uuid_table = NULL;
med_call_stat_info_table = NULL; med_call_stat_info_table = NULL;
med_cdr_tag_table = NULL; med_cdr_tag_table = NULL;
} }
@ -92,8 +110,6 @@ static void mediator_print_maps(void)
g_hash_table_foreach(med_peer_host_table, mediator_print_mapentry, NULL); g_hash_table_foreach(med_peer_host_table, mediator_print_mapentry, NULL);
L_DEBUG("Peer ID map:"); L_DEBUG("Peer ID map:");
g_hash_table_foreach(med_peer_id_table, mediator_print_mapentry, NULL); g_hash_table_foreach(med_peer_id_table, mediator_print_mapentry, NULL);
L_DEBUG("UUID map:");
g_hash_table_foreach(med_uuid_table, mediator_print_mapentry, NULL);
L_DEBUG("TAGS map:"); L_DEBUG("TAGS map:");
g_hash_table_foreach(med_cdr_tag_table, mediator_print_mapentry, NULL); g_hash_table_foreach(med_cdr_tag_table, mediator_print_mapentry, NULL);
} }
@ -254,6 +270,8 @@ int main(int argc, char **argv)
return -1; return -1;
} }
mediator_create_caches();
while(!mediator_shutdown) while(!mediator_shutdown)
{ {
L_DEBUG("Starting mediation loop\n"); L_DEBUG("Starting mediation loop\n");
@ -265,6 +283,7 @@ int main(int argc, char **argv)
{ {
break; break;
} }
mediator_cleanup_caches();
maprefresh = 10; maprefresh = 10;
} }
--maprefresh; --maprefresh;
@ -466,6 +485,7 @@ out:
sd_notify(0, "STOPPING=1\n"); sd_notify(0, "STOPPING=1\n");
mediator_destroy_maps(); mediator_destroy_maps();
mediator_destroy_caches();
medmysql_cleanup(); medmysql_cleanup();
medredis_cleanup(); medredis_cleanup();
free(batches); free(batches);

@ -101,10 +101,15 @@ typedef struct {
char value[256]; char value[256];
} med_callid_t; } med_callid_t;
typedef struct {
char *str_value;
time_t created;
} med_cache_entry_t;
extern GHashTable *med_peer_host_table; extern GHashTable *med_peer_host_table;
extern GHashTable *med_peer_ip_table; extern GHashTable *med_peer_ip_table;
extern GHashTable *med_peer_id_table; extern GHashTable *med_peer_id_table;
extern GHashTable *med_uuid_table; extern GHashTable *med_uuid_cache;
extern GHashTable *med_call_stat_info_table; extern GHashTable *med_call_stat_info_table;
extern GHashTable *med_cdr_tag_table; extern GHashTable *med_cdr_tag_table;

@ -14,6 +14,8 @@
#define _TEST_SIMULATE_SQL_ERRORS 0 #define _TEST_SIMULATE_SQL_ERRORS 0
#define MED_CACHE_DURATION 30
#define MED_SQL_BUF_LEN(fixed_string_len, escaped_string_len) \ #define MED_SQL_BUF_LEN(fixed_string_len, escaped_string_len) \
(fixed_string_len + 7 + 2 + (escaped_string_len) * 2 + 1) // _latin1'STRING'\0 (fixed_string_len + 7 + 2 + (escaped_string_len) * 2 + 1) // _latin1'STRING'\0
@ -43,9 +45,6 @@
#define MED_LOAD_PEER_QUERY "select h.ip, h.host, g.peering_contract_id, h.id " \ #define MED_LOAD_PEER_QUERY "select h.ip, h.host, g.peering_contract_id, h.id " \
"from provisioning.voip_peer_hosts h, provisioning.voip_peer_groups g " \ "from provisioning.voip_peer_hosts h, provisioning.voip_peer_groups g " \
"where g.id = h.group_id" "where g.id = h.group_id"
#define MED_LOAD_UUID_QUERY "select vs.uuid, r.contract_id from billing.voip_subscribers vs, " \
"billing.contracts c, billing.contacts ct, billing.resellers r where c.id = vs.contract_id and " \
"c.contact_id = ct.id and ct.reseller_id = r.id"
#define MED_LOAD_CDR_TAG_IDS_QUERY "select id, type from accounting.cdr_tag" #define MED_LOAD_CDR_TAG_IDS_QUERY "select id, type from accounting.cdr_tag"
@ -1297,50 +1296,111 @@ out:
} }
/**********************************************************************/ /**********************************************************************/
int medmysql_load_uuids(GHashTable *uuid_table) static char *medmysql_lookup_cache_str(GHashTable *table, const char *key)
{ {
MYSQL_RES *res; med_cache_entry_t *e = g_hash_table_lookup(table, key);
MYSQL_ROW row; if (!e)
int ret = 0; return NULL;
/* char query[1024] = ""; */ if (time(NULL) - e->created > MED_CACHE_DURATION) {
gpointer key; g_hash_table_remove(table, key);
char *provider_id; return NULL;
}
L_DEBUG("Returning cached map entry: '%s' -> '%s'", key, e->str_value);
return e->str_value;
}
/* snprintf(query, sizeof(query), MED_LOAD_UUID_QUERY); */
/* L_DEBUG("q='%s'", query); */ void medmysql_cache_cleanup(GHashTable *table)
if(medmysql_query_wrapper(prov_handler, MED_LOAD_UUID_QUERY, strlen(MED_LOAD_UUID_QUERY)) != 0)
{ {
L_CRITICAL("Error loading uuids: %s", GHashTableIter iter;
mysql_error(prov_handler->m)); g_hash_table_iter_init(&iter, table);
return -1; gpointer value;
time_t expiry = time(NULL) - MED_CACHE_DURATION;
while (g_hash_table_iter_next(&iter, NULL, &value)) {
med_cache_entry_t *e = value;
if (e->created > expiry)
continue;
g_hash_table_iter_remove(&iter);
}
} }
res = mysql_store_result(prov_handler->m);
while((row = mysql_fetch_row(res)) != NULL) static void medmysql_insert_cache_str(GHashTable *table, const char *key, char *val)
{
if(row[0] == NULL || row[1] == NULL)
{ {
L_CRITICAL("Error loading uuids, a column is NULL"); med_cache_entry_t *e = g_slice_alloc0(sizeof(*e));
ret = -1; e->created = time(NULL);
goto out; e->str_value = val; // allocated per g_strdup
g_hash_table_replace(table, g_strdup(key), e);
} }
provider_id = strdup(row[1]);
if(provider_id == NULL) static char *medmysql_select_one(medmysql_handler *mysql, const char *query, size_t query_len)
{ {
L_CRITICAL("Error allocating provider id memory: %s", strerror(errno)); int err = medmysql_query_wrapper(mysql, query, query_len);
ret = -1; if (err != 0) {
goto out; L_ERROR("Error executing query '%.*s': %s",
(int) query_len, query, mysql_error(mysql->m));
return NULL;
} }
key = (gpointer)g_strdup(row[0]); char *ret = NULL;
g_hash_table_insert(uuid_table, key, provider_id);
MYSQL_RES *res = mysql_store_result(mysql->m);
unsigned long long rows = mysql_num_rows(res);
if (rows > 0) {
if (rows > 1)
L_WARNING("More than one row returned from query '%.*s' (%llu rows)",
(int) query_len, query, rows);
MYSQL_ROW row = mysql_fetch_row(res);
if (!row) {
L_ERROR("Failed to fetch row from query '%.*s': %s",
(int) query_len, query, mysql_error(mysql->m));
}
else {
const char *val = row[0];
if (!val)
L_ERROR("NULL value returned from query '%.*s'",
(int) query_len, query);
else
ret = g_strdup(val);
}
} }
out:
mysql_free_result(res); mysql_free_result(res);
return ret;
}
char *medmysql_lookup_uuid(const char *uuid)
{
char *ret = medmysql_lookup_cache_str(med_uuid_cache, uuid);
if (ret)
return ret;
// query DB
static const char *query_prefix = "SELECT r.contract_id FROM billing.voip_subscribers vs, " \
"billing.contracts c, billing.contacts ct, billing.resellers r WHERE c.id = vs.contract_id AND " \
"c.contact_id = ct.id AND ct.reseller_id = r.id AND vs.uuid = ";
size_t query_len = strlen(query_prefix); // compile-time constant
char query[MED_SQL_BUF_LEN(query_len, strlen(uuid))];
strcpy(query, query_prefix);
char *buf_ptr = query + query_len;
medmysql_buf_escape_c(prov_handler->m, &query_len, uuid, buf_ptr, sizeof(query));
L_DEBUG("UUID lookup with query: '%.*s'\n", (int) query_len, query);
ret = medmysql_select_one(prov_handler, query, query_len);
if (!ret)
return NULL;
L_DEBUG("UUID query returned: '%.*s' -> '%s'\n", (int) query_len, query, ret);
medmysql_insert_cache_str(med_uuid_cache, uuid, ret);
return ret; return ret;
} }

@ -60,12 +60,13 @@ int medmysql_delete_entries(const char *callid, struct medmysql_batches *);
int medmysql_insert_cdrs(cdr_entry_t *records, uint64_t count, struct medmysql_batches *); int medmysql_insert_cdrs(cdr_entry_t *records, uint64_t count, struct medmysql_batches *);
int medmysql_delete_intermediate(cdr_entry_t *records, uint64_t count, struct medmysql_batches *); int medmysql_delete_intermediate(cdr_entry_t *records, uint64_t count, struct medmysql_batches *);
int medmysql_load_maps(GHashTable *ip_table, GHashTable *host_table, GHashTable *id_table); int medmysql_load_maps(GHashTable *ip_table, GHashTable *host_table, GHashTable *id_table);
int medmysql_load_uuids(GHashTable *uuid_table); char *medmysql_lookup_uuid(const char *uuid);
int medmysql_load_db_ids(void); int medmysql_load_db_ids(void);
int medmysql_load_cdr_tag_ids(GHashTable *cdr_tag_table); int medmysql_load_cdr_tag_ids(GHashTable *cdr_tag_table);
int medmysql_batch_start(struct medmysql_batches *); int medmysql_batch_start(struct medmysql_batches *);
int medmysql_batch_end(struct medmysql_batches *); int medmysql_batch_end(struct medmysql_batches *);
int medmysql_update_call_stat_info(const char *call_code, const double start_time); int medmysql_update_call_stat_info(const char *call_code, const double start_time);
int medmysql_insert_records(med_entry_t *records, uint64_t count, const char *table); int medmysql_insert_records(med_entry_t *records, uint64_t count, const char *table);
void medmysql_cache_cleanup(GHashTable *);
#endif /* _MED_MYSQL_H */ #endif /* _MED_MYSQL_H */

Loading…
Cancel
Save