TT#37553 always combine acc records from mysql and redis

Change-Id: I0d1fe416fc0e9168a2fe8b25fd40177d6e8b5d3b
changes/58/22758/7
Richard Fuchs 7 years ago
parent 3e80aca320
commit 094ef0b6e8

17
cdr.c

@ -52,7 +52,7 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
uint16_t invite_200 = 0;
char *callid = records[0].callid;
uint8_t redis = records[0].redis;
int has_redis = 0, has_mysql = 0;
cdr_entry_t *cdrs = NULL;
@ -85,7 +85,12 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
}
if (check_shutdown())
return -1;
return 1;
if (e->redis)
has_redis = 1;
else
has_mysql = 1;
}
L_DEBUG("%d INVITEs, %d BYEs, %d unrecognized", msg_invites, msg_byes, msg_unknowns);
@ -120,12 +125,12 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
}
else
{
if (redis)
if (has_redis)
{
if(medredis_backup_entries(records, count) != 0)
goto error;
}
else
if (has_mysql)
{
if(medmysql_backup_entries(callid, batches) != 0)
goto error;
@ -158,12 +163,12 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
if(trash)
{
if (redis)
if (has_redis)
{
if(medredis_trash_entries(records, count) != 0)
goto error;
}
else
if (has_mysql)
{
if(medmysql_trash_entries(callid, batches) != 0)
goto error;

@ -17,6 +17,7 @@
#include "medmysql.h"
#include "medredis.h"
#include "cdr.h"
#include "records.h"
sig_atomic_t mediator_shutdown = 0;
int mediator_lockfd = -1;
@ -161,8 +162,8 @@ int main(int argc, char **argv)
{
med_callid_t *mysql_callids;
med_callid_t *redis_callids;
med_entry_t *records;
uint64_t mysql_id_count, redis_id_count, rec_count, i;
med_entry_t *mysql_records, *redis_records;
uint64_t mysql_id_count, redis_id_count, mysql_rec_count, redis_rec_count, i;
uint64_t cdr_count, last_count;
int maprefresh;
struct medmysql_batches *batches;
@ -267,7 +268,7 @@ int main(int argc, char **argv)
if (0)
mediator_print_maps();
mysql_id_count = redis_id_count = rec_count = cdr_count = 0;
mysql_id_count = redis_id_count = mysql_rec_count = redis_rec_count = cdr_count = 0;
last_count = mediator_count;
mysql_callids = medmysql_fetch_callids(&mysql_id_count);
@ -302,15 +303,39 @@ int main(int argc, char **argv)
gettimeofday(&tv_start, NULL);
#endif
if(medmysql_fetch_records(&(mysql_callids[i]), &records, &rec_count) != 0)
if(medmysql_fetch_records(&(mysql_callids[i]), &mysql_records, &mysql_rec_count) != 0)
goto out;
if(cdr_process_records(records, rec_count, &cdr_count, batches) != 0)
if(medredis_fetch_records(&(mysql_callids[i]), &redis_records, &redis_rec_count) == 0
&& redis_rec_count)
{
mysql_records = realloc(mysql_records, (mysql_rec_count + redis_rec_count) * sizeof(med_entry_t));
if (!mysql_records)
{
L_ERROR("Failed to realloc mysql_records\n");
break;
}
memcpy(&mysql_records[mysql_rec_count], redis_records, redis_rec_count * sizeof(med_entry_t));
free(redis_records);
mysql_rec_count += redis_rec_count;
// only re-sort if records from Redis were added, as MySQL already does the sorting
records_sort(mysql_records, mysql_rec_count);
}
if (!records_complete(mysql_records, mysql_rec_count))
{
L_DEBUG("Found incomplete call with cid '%s', skipping...\n", mysql_callids[i].value);
free(mysql_records);
continue;
}
if(cdr_process_records(mysql_records, mysql_rec_count, &cdr_count, batches) != 0)
goto out;
if(rec_count > 0)
if(mysql_rec_count > 0)
{
free(records);
free(mysql_records);
}
mediator_count += cdr_count;
@ -336,17 +361,41 @@ int main(int argc, char **argv)
gettimeofday(&tv_start, NULL);
#endif
if(medredis_fetch_records(&(redis_callids[i]), &records, &rec_count) != 0)
if(medredis_fetch_records(&(redis_callids[i]), &redis_records, &redis_rec_count) != 0)
goto out;
L_DEBUG("process cdr with cid '%s' and %"PRIu64" records\n", redis_callids[i].value, rec_count);
if(medmysql_fetch_records(&(redis_callids[i]), &mysql_records, &mysql_rec_count) == 0
&& mysql_rec_count)
{
redis_records = realloc(redis_records, (mysql_rec_count + redis_rec_count) * sizeof(med_entry_t));
if (!redis_records)
{
L_ERROR("Failed to realloc redis_records\n");
break;
}
memcpy(&redis_records[redis_rec_count], mysql_records, mysql_rec_count * sizeof(med_entry_t));
free(mysql_records);
redis_rec_count += mysql_rec_count;
}
// always sort records from Redis, regardless of whether records from MySQL were merged
records_sort(redis_records, redis_rec_count);
if (!records_complete(redis_records, redis_rec_count))
{
L_DEBUG("Found incomplete call with cid '%s', skipping...\n", redis_callids[i].value);
free(redis_records);
continue;
}
L_DEBUG("process cdr with cid '%s' and %"PRIu64" records\n", redis_callids[i].value, redis_rec_count);
if (rec_count) {
if(cdr_process_records(records, rec_count, &cdr_count, batches) != 0) {
free(records);
if (redis_rec_count) {
if(cdr_process_records(redis_records, redis_rec_count, &cdr_count, batches) != 0) {
free(redis_records);
goto out;
}
free(records);
free(redis_records);
mediator_count += cdr_count;
}

@ -12,23 +12,6 @@
#define MED_CALLID_QUERY "select a.callid from acc a" \
" where a.method = 'INVITE' " \
" and (a.sip_code != '200' " \
" OR EXISTS " \
" (select b.id from acc b " \
" where b.callid = a.callid " \
" and b.method = 'BYE' " \
" limit 1) " \
" OR EXISTS " \
" (select b.id from acc b " \
" where b.callid = concat(a.callid, '"PBXSUFFIX"') " \
" and b.method = 'BYE' " \
" limit 1) " \
" OR EXISTS " \
" (select b.id from acc b " \
" where b.callid = concat(a.callid, '"XFERSUFFIX"') " \
" and b.method = 'BYE' " \
" limit 1) " \
" ) " \
" group by a.callid limit 0,200000"
#define MED_FETCH_QUERY "(select distinct sip_code, sip_reason, method, callid, time, time_hires, " \
@ -450,6 +433,10 @@ int medmysql_insert_records(med_entry_t *records, uint64_t count, const char *ta
for (uint64_t i = 0; i < count; ++i) {
med_entry_t *e = &(records[i]);
// this is only used for inserting redis entries into mysql
if (!e->redis)
continue;
snprintf(entry_buffer, sizeof(entry_buffer), "('%s','%s','%s','%s','%s','%f','%s','%s'),",
e->sip_code, e->sip_reason, e->sip_method, e->callid, e->timestamp, e->unix_timestamp, e->src_leg, e->dst_leg);
sql_buffer = strcat(sql_buffer, entry_buffer);

@ -493,19 +493,6 @@ static void medredis_append_key(gpointer data, gpointer user_data) {
free(key);
}
/**********************************************************************/
static gint medredis_sort_entry(gconstpointer a, gconstpointer b) {
med_entry_t *aa = (med_entry_t*)a;
med_entry_t *bb = (med_entry_t*)b;
if (aa->unix_timestamp == bb->unix_timestamp)
return 0;
else if (aa->unix_timestamp < bb->unix_timestamp)
return -1;
else
return 1;
}
/**********************************************************************/
int medredis_fetch_records(med_callid_t *callid,
med_entry_t **entries, uint64_t *count) {
@ -531,8 +518,6 @@ int medredis_fetch_records(med_callid_t *callid,
GList *keys;
size_t i;
uint8_t has_bye = 0;
uint8_t has_inv_200 = 0;
cid_set_argc = 2;
cid_set_argv[0] = "SMEMBERS";
@ -629,11 +614,11 @@ int medredis_fetch_records(med_callid_t *callid,
goto err;
}
medredis_free_reply(&reply);
records = g_list_insert_sorted(records, e, medredis_sort_entry);
records = g_list_prepend(records, e);
(*count)++;
} while(1);
*count = g_list_length(records);
*entries = (med_entry_t*)malloc(*count * sizeof(med_entry_t));
if (!*entries) {
L_ERROR("Failed to allocate memory for entries\n");
@ -647,26 +632,12 @@ int medredis_fetch_records(med_callid_t *callid,
L_DEBUG("Copying record with cid='%s', method='%s', code='%s'",
s->callid, s->sip_method, s->sip_code);
if (s->sip_method[0] == 'I' && s->sip_method[1] == 'N' && s->sip_method[2] == 'V' &&
s->sip_code[0] == '2') {
has_inv_200 |= 1;
} else if (s->sip_method[0] == 'B' && s->sip_method[1] == 'Y' && s->sip_method[2] == 'E') {
has_bye |= 1;
}
memcpy(d, s, sizeof(med_entry_t));
free(s);
L_DEBUG("Added entry with cid '%s' and method '%s'\n", d->callid, d->sip_method);
}
if (has_inv_200 && !has_bye) {
L_DEBUG("Found incomplete call with cid '%s', skipping...\n", callid->value);
free(*entries);
*entries = NULL;
*count = 0;
}
g_list_free(records);
g_list_free(keys);
@ -703,6 +674,8 @@ static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const
for (uint64_t i = 0; i < count; ++i) {
med_entry_t *e = &(records[i]);
if (!e->redis)
continue;
L_DEBUG("Cleaning up redis entry for %s:%f\n", e->callid, e->unix_timestamp);

@ -0,0 +1,47 @@
#include "records.h"
#include "config.h"
#define comp_ret(a_var, b_var) \
do { \
__auto_type x_a = a_var; \
__auto_type x_b = b_var; \
if (x_a < x_b) \
return -1; \
if (x_a > x_b) \
return 1; \
} while (0)
static int records_sort_func(const void *aa, const void *bb)
{
const med_entry_t *a = aa;
const med_entry_t *b = bb;
comp_ret(strlen(a->callid), strlen(b->callid));
comp_ret(a->unix_timestamp, b->unix_timestamp);
return 0;
}
void records_sort(med_entry_t *records, uint64_t count)
{
qsort(records, count, sizeof(med_entry_t), records_sort_func);
}
int records_complete(med_entry_t *records, uint64_t count)
{
uint8_t has_bye = 0;
uint8_t has_inv_200 = 0;
for (uint64_t i = 0; i < count; i++)
{
med_entry_t *s = &records[i];
if (s->sip_method[0] == 'I' && s->sip_method[1] == 'N' && s->sip_method[2] == 'V' &&
s->sip_code[0] == '2') {
has_inv_200 |= 1;
} else if (s->sip_method[0] == 'B' && s->sip_method[1] == 'Y' && s->sip_method[2] == 'E') {
has_bye |= 1;
}
}
if (has_inv_200 && !has_bye)
return 0;
return 1;
}

@ -0,0 +1,9 @@
#ifndef _RECORDS_H
#define _RECORDS_H
#include "mediator.h"
void records_sort(med_entry_t *records, uint64_t count);
int records_complete(med_entry_t *records, uint64_t count);
#endif
Loading…
Cancel
Save