TT#170203 use linked list to manage acc records

Using an array in this context (to return a list of acc records) is
mostly pointless as it wastes memory and incurs the additional overhead
of having to initialise the array and an extra layer of copying strings
around. This also ultimately allows us to dynamically append to the list
of acc records without having to reallocate the array.

Change-Id: I1039f01861f8d3f82fdc3a80377fd7535fa24bab
mr11.0
Richard Fuchs 3 years ago
parent ca8cb2533c
commit 8537ef8c68

33
cdr.c

@ -7,7 +7,7 @@
#include "config.h"
#include "mediator.h"
static int cdr_create_cdrs(med_entry_t *records, uint64_t count,
static int cdr_create_cdrs(GQueue *records,
cdr_entry_t **cdrs, uint64_t *cdr_count, uint64_t *alloc_size, uint8_t *trash, int do_intermediate);
static const char* cdr_map_status(const char *sip_status)
@ -62,12 +62,11 @@ static void free_cdrs(cdr_entry_t **cdrs, uint64_t cdr_count) {
}
int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_count,
int cdr_process_records(GQueue *records, uint64_t *ext_count,
struct medmysql_batches *batches, int do_intermediate)
{
int ret = 0;
uint8_t trash = 0;
uint64_t i;
int timed_out = 0;
uint16_t msg_invites = 0;
@ -75,7 +74,8 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
uint16_t msg_unknowns = 0;
uint16_t invite_200 = 0;
char *callid = records[0].callid;
med_entry_t *first = g_queue_peek_head(records);
char *callid = first->callid;
int has_redis = 0, has_mysql = 0;
@ -84,9 +84,9 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
*ext_count = 0;
for(i = 0; i < count; ++i)
for (GList *l = records->head; l; l = l->next)
{
med_entry_t *e = &(records[i]);
med_entry_t *e = l->data;
if (e->timed_out)
timed_out = 1;
@ -140,7 +140,7 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
}
else
{
if(cdr_create_cdrs(records, count, &cdrs, &cdr_count, &alloc_size, &trash, do_intermediate) != 0)
if(cdr_create_cdrs(records, &cdrs, &cdr_count, &alloc_size, &trash, do_intermediate) != 0)
goto error;
else
{
@ -162,7 +162,7 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
{
if (has_redis)
{
if(medredis_backup_entries(records, count) != 0)
if(medredis_backup_entries(records) != 0)
goto error;
}
if (has_mysql)
@ -210,7 +210,7 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
{
if (has_redis)
{
if(medredis_trash_entries(records, count) != 0)
if(medredis_trash_entries(records) != 0)
goto error;
}
if (has_mysql)
@ -1387,7 +1387,7 @@ static cdr_entry_t *alloc_cdrs(uint64_t cdr_count) {
}
static int cdr_create_cdrs(med_entry_t *records, uint64_t count,
static int cdr_create_cdrs(GQueue *records,
cdr_entry_t **cdrs, uint64_t *cdr_count, uint64_t *alloc_size, uint8_t *trash, int do_intermediate)
{
uint64_t i = 0, cdr_index = 0;
@ -1404,9 +1404,9 @@ static int cdr_create_cdrs(med_entry_t *records, uint64_t count,
/* get end time from BYE's timestamp */
for(i = 0; i < count; ++i)
for (GList *l = records->head; l; l = l->next)
{
med_entry_t *e = &(records[i]);
med_entry_t *e = l->data;
if (e->timed_out)
timed_out = 1;
@ -1430,8 +1430,9 @@ static int cdr_create_cdrs(med_entry_t *records, uint64_t count,
if(invites == 0)
{
med_entry_t *e = g_queue_peek_head(records);
L_CRITICAL("No valid INVITEs for creating a cdr, internal error, callid='%s'",
records[0].callid);
e->callid);
return -1;
}
@ -1444,19 +1445,19 @@ static int cdr_create_cdrs(med_entry_t *records, uint64_t count,
}
*alloc_size = invites;
for(i = 0; i < count; ++i)
for (GList *l = records->head; l; l = l->next)
{
med_entry_t *e = NULL;
cdr_entry_t *cdr = NULL;
cdr = &(*cdrs)[cdr_index];
e = &(records[i]);
e = l->data;
if (!e->valid)
continue;
L_DEBUG("create cdr %lu of %lu in batch\n", i, count);
L_DEBUG("create cdr %" PRIu64 " of %u in batch\n", ++i, records->length);
call_status = cdr_map_status(e->sip_code);
if (timed_out)

@ -59,9 +59,8 @@ typedef struct {
#undef F
#undef FA
int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *cdr_count, struct medmysql_batches *,
int cdr_process_records(GQueue *records, uint64_t *cdr_count, struct medmysql_batches *,
int do_intermediate);
void cdr_fix_accids(med_entry_t *records, uint64_t count);
int cdr_fill_record(cdr_entry_t *cdr);
void cdr_set_provider(cdr_entry_t *cdr);

@ -39,6 +39,9 @@ static void med_free_cache_entry(void *p)
g_free(e->str_value);
g_slice_free1(sizeof(*e), e);
}
void med_entry_free(void *p) {
g_slice_free1(sizeof(med_entry_t), p);
}
/**********************************************************************/
static void mediator_create_caches(void)
@ -175,13 +178,29 @@ static uint64_t mediator_calc_runtime(struct timeval *tv_start, struct timeval *
(tv_start->tv_sec * 1000000 + tv_start->tv_usec)));
}
/**********************************************************************/
// appends the entire `two` list to the end of `one`, resetting `two` to empty
static void mediator_splice_gqueue(GQueue *one, GQueue *two) {
if (!two->head)
return;
if (!one->tail) {
*one = *two;
g_queue_init(two);
return;
}
one->tail->next = two->head;
two->head->prev = one->tail;
one->length += two->length;
g_queue_init(two);
}
/**********************************************************************/
int main(int argc, char **argv)
{
GQueue mysql_callids = G_QUEUE_INIT;
GQueue redis_callids = G_QUEUE_INIT;
med_entry_t *mysql_records, *redis_records;
uint64_t mysql_rec_count, redis_rec_count;
GQueue mysql_records = G_QUEUE_INIT;
GQueue redis_records = G_QUEUE_INIT;
uint64_t cdr_count, last_count;
int maprefresh;
struct medmysql_batches *batches;
@ -306,7 +325,9 @@ int main(int argc, char **argv)
g_queue_clear_full(&mysql_callids, g_free);
g_queue_clear_full(&redis_callids, g_free);
mysql_rec_count = redis_rec_count = cdr_count = 0;
g_queue_clear_full(&mysql_records, med_entry_free);
g_queue_clear_full(&redis_records, med_entry_free);
cdr_count = 0;
last_count = mediator_count;
success = medmysql_fetch_callids(&mysql_callids);
@ -342,45 +363,30 @@ int main(int argc, char **argv)
gettimeofday(&tv_start, NULL);
#endif
if(medmysql_fetch_records(mysql_callid, &mysql_records, &mysql_rec_count, 1) != 0)
if(medmysql_fetch_records(mysql_callid, &mysql_records, 1) != 0)
goto out;
if(medredis_fetch_records(mysql_callid, &redis_records, &redis_rec_count) == 0
&& redis_rec_count)
if(medredis_fetch_records(mysql_callid, &redis_records) == 0)
{
med_entry_t *mysql_new_records;
mysql_new_records = realloc(mysql_records, (mysql_rec_count + redis_rec_count) * sizeof(med_entry_t));
if (!mysql_new_records)
{
L_ERROR("Failed to realloc mysql_records\n");
break;
}
mysql_records = mysql_new_records;
memcpy(&mysql_records[mysql_rec_count], redis_records, redis_rec_count * sizeof(med_entry_t));
free(redis_records);
mysql_rec_count += redis_rec_count;
mediator_splice_gqueue(&mysql_records, &redis_records);
// only re-sort if records from Redis were added, as MySQL already does the sorting
records_sort(mysql_records, mysql_rec_count);
records_sort(&mysql_records);
}
int are_records_complete = records_complete(mysql_records, mysql_rec_count);
int are_records_complete = records_complete(&mysql_records);
if (!are_records_complete && !do_intermediate)
{
L_DEBUG("Found incomplete call with cid '%s', skipping...\n", mysql_callid);
free(mysql_records);
g_queue_clear_full(&mysql_records, med_entry_free);
continue;
}
if(cdr_process_records(mysql_records, mysql_rec_count, &cdr_count, batches, do_intermediate) != 0)
if(cdr_process_records(&mysql_records, &cdr_count, batches, do_intermediate) != 0)
goto out;
if(mysql_rec_count > 0)
{
free(mysql_records);
}
g_queue_clear_full(&mysql_records, med_entry_free);
mediator_count += cdr_count;
@ -403,46 +409,34 @@ int main(int argc, char **argv)
gettimeofday(&tv_start, NULL);
#endif
if(medredis_fetch_records(redis_callid, &redis_records, &redis_rec_count) != 0)
if(medredis_fetch_records(redis_callid, &redis_records) != 0)
goto out;
if(medmysql_fetch_records(redis_callid, &mysql_records, &mysql_rec_count, 0) == 0
&& mysql_rec_count)
if(medmysql_fetch_records(redis_callid, &mysql_records, 0) == 0)
{
med_entry_t *redis_new_records;
redis_new_records = realloc(redis_records, (mysql_rec_count + redis_rec_count) * sizeof(med_entry_t));
if (!redis_new_records)
{
L_ERROR("Failed to realloc redis_records\n");
break;
}
redis_records = redis_new_records;
memcpy(&redis_records[redis_rec_count], mysql_records, mysql_rec_count * sizeof(med_entry_t));
free(mysql_records);
redis_rec_count += mysql_rec_count;
mediator_splice_gqueue(&redis_records, &mysql_records);
}
// always sort records from Redis, regardless of whether records from MySQL were merged
records_sort(redis_records, redis_rec_count);
records_sort(&redis_records);
int are_records_complete = records_complete(redis_records, redis_rec_count);
int are_records_complete = records_complete(&redis_records);
if (!are_records_complete && !do_intermediate)
{
L_DEBUG("Found incomplete call with cid '%s', skipping...\n", redis_callid);
free(redis_records);
g_queue_clear_full(&redis_records, med_entry_free);
continue;
}
L_DEBUG("process cdr with cid '%s' and %"PRIu64" records\n", redis_callid, redis_rec_count);
L_DEBUG("process cdr with cid '%s' and %u records\n", redis_callid, redis_records.length);
if (redis_rec_count) {
if(cdr_process_records(redis_records, redis_rec_count, &cdr_count, batches, do_intermediate) != 0) {
free(redis_records);
if (redis_records.length) {
if(cdr_process_records(&redis_records, &cdr_count, batches, do_intermediate) != 0) {
g_queue_clear_full(&redis_records, med_entry_free);
goto out;
}
free(redis_records);
g_queue_clear_full(&redis_records, med_entry_free);
mediator_count += cdr_count;
}

@ -106,6 +106,8 @@ extern GHashTable *med_cdr_tag_table;
void critical(const char *);
void med_entry_free(void *p);
static inline int check_shutdown(void) {
if (mediator_shutdown) {
syslog(LOG_INFO, "Shutdown detected, aborting work in progress");

@ -561,14 +561,14 @@ static inline void medmysql_buf_escape_c(MYSQL *m, size_t *buflen, const char *s
#define BUFESCAPE(x) medmysql_buf_escape_c(med_handler->m, &buflen, x, sql_buffer + buflen, sql_buffer_size)
/**********************************************************************/
int medmysql_insert_records(med_entry_t *records, uint64_t count, const char *table)
int medmysql_insert_records(GQueue *records, const char *table)
{
char *sql_buffer = NULL;
size_t sql_buffer_size = (count + 1) * (sizeof(med_entry_t) + 64) + 256;
size_t sql_buffer_size = (records->length + 1) * (sizeof(med_entry_t) + 64) + 256;
size_t buflen = 0;
int ret = 0, entries = 0;
if (!count)
if (!records->length)
return 0;
sql_buffer = (char*)malloc(sql_buffer_size);
@ -580,8 +580,8 @@ int medmysql_insert_records(med_entry_t *records, uint64_t count, const char *ta
"(sip_code,sip_reason,method,callid,time,time_hires,src_leg,dst_leg,branch_id) VALUES ",
table);
for (uint64_t i = 0; i < count; ++i) {
med_entry_t *e = &(records[i]);
for (GList *l = records->head; l; l = l->next) {
med_entry_t *e = l->data;
// this is only used for inserting redis entries into mysql
if (!e->redis)
@ -670,21 +670,18 @@ gboolean medmysql_fetch_callids(GQueue *output)
/**********************************************************************/
int medmysql_fetch_records(char *callid,
med_entry_t **entries, uint64_t *count, int warn_empty)
GQueue *entries, int warn_empty)
{
MYSQL_RES *res;
MYSQL_ROW row;
size_t callid_len = strlen(callid);
char query[strlen(MED_FETCH_QUERY) + callid_len * 7 + 1];
size_t entry_size;
uint64_t i = 0;
int ret = 0;
int len;
unsigned long long count = 0;
char esc_callid[callid_len*2+1];
*count = 0;
mysql_real_escape_string(med_handler->m, esc_callid, callid, callid_len);
len = snprintf(query, sizeof(query), MED_FETCH_QUERY,
@ -705,8 +702,8 @@ int medmysql_fetch_records(char *callid,
}
res = mysql_store_result(med_handler->m);
*count = mysql_num_rows(res);
if(*count == 0)
count = mysql_num_rows(res);
if(count == 0)
{
if (warn_empty)
L_CRITICAL("No records found for callid '%s'!",
@ -715,21 +712,9 @@ int medmysql_fetch_records(char *callid,
goto out;
}
entry_size = (*count) * sizeof(med_entry_t);
*entries = (med_entry_t*)malloc(entry_size);
if(*entries == NULL)
{
L_CRITICAL("Error allocating memory for record entries: %s",
strerror(errno));
ret = -1;
goto out;
}
memset(*entries, 0, entry_size);
while((row = mysql_fetch_row(res)) != NULL)
{
med_entry_t *e = &(*entries)[i++];
med_entry_t *e = g_slice_alloc0(sizeof(*e));
g_strlcpy(e->sip_code, row[0], sizeof(e->sip_code));
g_strlcpy(e->sip_reason, row[1], sizeof(e->sip_reason));
@ -743,6 +728,8 @@ int medmysql_fetch_records(char *callid,
g_strlcpy(e->acc_ref, row[9], sizeof(e->acc_ref));
e->valid = 1;
g_queue_push_tail(entries, e);
if (check_shutdown())
return -1;
}

@ -53,7 +53,7 @@ struct medmysql_call_stat_info_t {
int medmysql_init(void);
void medmysql_cleanup(void);
gboolean medmysql_fetch_callids(GQueue *output);
int medmysql_fetch_records(char *callid, med_entry_t **entries, uint64_t *count, int warn_empty);
int medmysql_fetch_records(char *callid, GQueue *entries, int warn_empty);
int medmysql_trash_entries(const char *callid, struct medmysql_batches *);
int medmysql_backup_entries(const char *callid, struct medmysql_batches *);
int medmysql_delete_entries(const char *callid, struct medmysql_batches *);
@ -66,7 +66,7 @@ int medmysql_load_cdr_tag_ids(GHashTable *cdr_tag_table);
int medmysql_batch_start(struct medmysql_batches *);
int medmysql_batch_end(struct medmysql_batches *);
int medmysql_update_call_stat_info(const char *call_code, const double start_time);
int medmysql_insert_records(med_entry_t *records, uint64_t count, const char *table);
int medmysql_insert_records(GQueue *records, const char *table);
void medmysql_cache_cleanup(GHashTable *);
#endif /* _MED_MYSQL_H */

@ -350,8 +350,7 @@ static med_entry_t *medredis_reply_to_entry(redisReply *reply, const char* cid,
return NULL;
}
entry = (med_entry_t*)malloc(sizeof(med_entry_t));
memset(entry, 0, sizeof(med_entry_t));
entry = g_slice_alloc0(sizeof(med_entry_t));
entry->valid = 1;
entry->redis = 1;
@ -612,7 +611,7 @@ static void medredis_free_keys_list(gpointer data) {
/**********************************************************************/
int medredis_fetch_records(char *callid,
med_entry_t **entries, uint64_t *count) {
GQueue *entries) {
/*
@ -632,7 +631,6 @@ int medredis_fetch_records(char *callid,
redisReply *reply = NULL;
char *cids[4];
GList *records;
GList *keys;
size_t i;
@ -640,7 +638,6 @@ int medredis_fetch_records(char *callid,
cid_set_argc = 2;
cid_set_argv[0] = "SMEMBERS";
records = NULL;
keys = NULL;
@ -669,8 +666,7 @@ int medredis_fetch_records(char *callid,
goto err;
}
*count = 0;
*entries = NULL;
g_queue_clear_full(entries, med_entry_free);
L_DEBUG("Fetching records from redis\n");
@ -742,34 +738,9 @@ int medredis_fetch_records(char *callid,
continue;
}
medredis_free_reply(&reply);
records = g_list_prepend(records, e);
(*count)++;
g_queue_push_head(entries, e);
}
if (!*count)
goto no_entries;
*entries = (med_entry_t*)malloc(*count * sizeof(med_entry_t));
if (!*entries) {
L_ERROR("Failed to allocate memory for entries (cid '%s')\n", callid);
goto err;
}
i = 0;
for (GList *l = records; l; l = l->next) {
med_entry_t *s = (med_entry_t*)l->data;
med_entry_t *d = &(*entries)[i++];
L_DEBUG("Copying record with cid='%s', method='%s', code='%s'",
s->callid, s->sip_method, s->sip_code);
memcpy(d, s, sizeof(med_entry_t));
free(s);
L_DEBUG("Added entry with cid '%s' and method '%s'\n", d->callid, d->sip_method);
}
no_entries:
g_list_free(records);
g_list_free_full(keys, medredis_free_keys_list);
medredis_consume_replies();
@ -780,13 +751,11 @@ no_entries:
err:
if (reply)
freeReplyObject(reply);
*count = (uint64_t) -1;
for (i = 0; i < 4; ++i) {
if (cids[i])
free(cids[i]);
}
if (*entries)
free(*entries);
g_queue_clear_full(entries, med_entry_free);
medredis_consume_replies();
return -1;
@ -794,16 +763,16 @@ err:
}
/**********************************************************************/
static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const char *table) {
static int medredis_cleanup_entries(GQueue *records, const char *table) {
char buffer[512];
if (medmysql_insert_records(records, count, table) != 0) {
if (medmysql_insert_records(records, table) != 0) {
L_CRITICAL("Failed to cleanup redis records\n");
goto err;
}
for (uint64_t i = 0; i < count; ++i) {
med_entry_t *e = &(records[i]);
for (GList *l = records->head; l; l = l->next) {
med_entry_t *e = l->data;
if (!e->redis)
continue;
@ -845,11 +814,11 @@ err:
}
/**********************************************************************/
int medredis_trash_entries(med_entry_t *records, uint64_t count) {
return medredis_cleanup_entries(records, count, "trash");
int medredis_trash_entries(GQueue *records) {
return medredis_cleanup_entries(records, "trash");
}
/**********************************************************************/
int medredis_backup_entries(med_entry_t *records, uint64_t count) {
return medredis_cleanup_entries(records, count, "backup");
int medredis_backup_entries(GQueue *records) {
return medredis_cleanup_entries(records, "backup");
}

@ -9,8 +9,8 @@
int medredis_init(void);
void medredis_cleanup(void);
gboolean medredis_fetch_callids(GQueue *output);
int medredis_fetch_records(char *callid, med_entry_t **entries, uint64_t *count);
int medredis_trash_entries(med_entry_t *records, uint64_t count);
int medredis_backup_entries(med_entry_t *records, uint64_t count);
int medredis_fetch_records(char *callid, GQueue *entries);
int medredis_trash_entries(GQueue *records);
int medredis_backup_entries(GQueue *records);
#endif /* _MED_REDIS_H */

@ -1,5 +1,6 @@
#include "records.h"
#include "config.h"
#include <glib.h>
#include <time.h>
#define comp_ret(a_var, b_var) \
@ -12,7 +13,7 @@
return 1; \
} while (0)
static int records_sort_func(const void *aa, const void *bb)
static int records_sort_func(const void *aa, const void *bb, __attribute__ ((unused)) void *dummy)
{
const med_entry_t *a = aa;
const med_entry_t *b = bb;
@ -22,28 +23,29 @@ static int records_sort_func(const void *aa, const void *bb)
return 0;
}
void records_sort(med_entry_t *records, uint64_t count)
void records_sort(GQueue *records)
{
qsort(records, count, sizeof(med_entry_t), records_sort_func);
g_queue_sort(records, records_sort_func, NULL);
}
int records_complete(med_entry_t *records, uint64_t count)
int records_complete(GQueue *records)
{
uint8_t has_bye = 0;
uint8_t has_inv_200 = 0;
// if our records are old enough, we always consider them complete
if (count && config_max_acc_age)
if (records->head && config_max_acc_age)
{
if (time(NULL) - records[0].unix_timestamp > config_max_acc_age) {
records[0].timed_out = 1;
med_entry_t *r = g_queue_peek_head(records);
if (time(NULL) - r->unix_timestamp > config_max_acc_age) {
r->timed_out = 1;
return 1;
}
}
for (uint64_t i = 0; i < count; i++)
for (GList *l = records->head; l; l = l->next)
{
med_entry_t *s = &records[i];
med_entry_t *s = l->data;
if (s->sip_method[0] == 'I' && s->sip_method[1] == 'N' && s->sip_method[2] == 'V' &&
s->sip_code[0] == '2') {
has_inv_200 |= 1;

@ -2,8 +2,9 @@
#define _RECORDS_H
#include "mediator.h"
#include <glib.h>
void records_sort(med_entry_t *records, uint64_t count);
int records_complete(med_entry_t *records, uint64_t count);
void records_sort(GQueue *records);
int records_complete(GQueue *records);
#endif

Loading…
Cancel
Save