TT#43558 Move invalid redis acc records to trash

Change-Id: I9298b0cedded32b6932ddaa2bf4e11a2a5e1aa47
changes/57/23157/11
Andreas Granig 7 years ago
parent 4ff9eda645
commit 15eb7bbfa3

@ -63,7 +63,12 @@ int cdr_process_records(med_entry_t *records, uint64_t count, uint64_t *ext_coun
for(i = 0; i < count; ++i)
{
med_entry_t *e = &(records[i]);
if(strcmp(e->sip_method, MSG_INVITE) == 0)
if(!e->valid)
{
++msg_unknowns;
e->method = MED_UNRECOGNIZED;
}
else if(strcmp(e->sip_method, MSG_INVITE) == 0)
{
++msg_invites;
e->method = MED_INVITE;

@ -446,6 +446,11 @@ int medmysql_insert_records(med_entry_t *records, uint64_t count, const char *ta
L_DEBUG("Issuing record insert query: %s\n", sql_buffer);
ret = medmysql_query_wrapper(med_handler, sql_buffer, strlen(sql_buffer));
if (ret != 0)
{
L_ERROR("Error executing query '%s': %s",
sql_buffer, mysql_error(med_handler->m));
}
free(sql_buffer);
return ret;

@ -44,47 +44,6 @@ typedef struct {
static medredis_con_t *con = NULL;
/**********************************************************************/
static med_entry_t *medredis_reply_to_entry(redisReply *reply) {
med_entry_t *entry;
if (reply->elements != 8) {
L_ERROR("Invalid number of redis reply elements for acc record, expected 8, got %lu\n",
reply->elements);
return NULL;
}
// verify types
for (int i = 0; i < (int) reply->elements; i++) {
if (reply->element[i]->type != REDIS_REPLY_STRING) {
L_ERROR("Received Redis reply type %i instead of %i (string) for element %i\n",
reply->element[i]->type, REDIS_REPLY_STRING, i);
return NULL;
}
if (reply->element[i]->str == NULL) {
L_ERROR("Received NULL string from Redis for element %i\n",
i);
return NULL;
}
}
entry = (med_entry_t*)malloc(sizeof(med_entry_t));
memset(entry, 0, sizeof(med_entry_t));
g_strlcpy(entry->sip_code, reply->element[0]->str, sizeof(entry->sip_code));
g_strlcpy(entry->sip_reason, reply->element[1]->str, sizeof(entry->sip_reason));
g_strlcpy(entry->sip_method, reply->element[2]->str, sizeof(entry->sip_method));
g_strlcpy(entry->callid, reply->element[3]->str, sizeof(entry->callid));
g_strlcpy(entry->timestamp, reply->element[4]->str, sizeof(entry->timestamp));
entry->unix_timestamp = atof(reply->element[5]->str);
g_strlcpy(entry->src_leg, reply->element[6]->str, sizeof(entry->src_leg));
g_strlcpy(entry->dst_leg, reply->element[7]->str, sizeof(entry->dst_leg));
entry->valid = 1;
entry->redis = 1;
L_DEBUG("Converted record with cid '%s' and method '%s'\n", entry->callid, entry->sip_method);
return entry;
}
/**********************************************************************/
static void medredis_free_reply(redisReply **reply) {
if (reply && *reply) {
@ -275,6 +234,146 @@ static void medredis_consume_replies(void) {
}
}
/**********************************************************************/
static int medredis_remove_entry(char* key) {
char *argv[2];
// delete acc:entry::$cid:$timestamp
argv[0] = "DEL";
argv[1] = key;
if (medredis_append_command_argv(2, argv, 1) != 0) {
L_ERROR("Failed to append redis command to remove key '%s'\n", key);
goto err;
}
medredis_consume_replies();
return 0;
err:
medredis_consume_replies();
return -1;
}
/**********************************************************************/
static int medredis_remove_mappings(const char* cid, char* key) {
char *argv[3];
char buffer[512];
snprintf(buffer, sizeof(buffer), "acc:cid::%s", cid);
// delete cid from acc:cid::$cid mapping
argv[0] = "SREM";
argv[1] = buffer;
argv[2] = key;
if (medredis_append_command_argv(3, argv, 1) != 0) {
L_ERROR("Failed to append redis command to remove mapping key '%s' from '%s'\n", key, argv[1]);
goto err;
}
// delete cid from acc:meth::INVITE and acc:meth::BYE
argv[0] = "SREM";
argv[1] = "acc:meth::INVITE";
argv[2] = key;
if (medredis_append_command_argv(3, argv, 1) != 0) {
L_ERROR("Failed to append redis command to remove mapping key '%s' from '%s'\n", key, argv[1]);
goto err;
}
argv[1] = "acc:meth::BYE";
if (medredis_append_command_argv(3, argv, 1) != 0) {
L_ERROR("Failed to append redis command to remove mapping key '%s' from '%s'\n", key, argv[1]);
goto err;
}
medredis_consume_replies();
return 0;
err:
medredis_consume_replies();
return -1;
}
/**********************************************************************/
static int medredis_check_reply_string(med_entry_t *entry, redisReply *reply,
const char* element, const char* element_name, size_t element_size,
int index,
const char* cid, const char* key) {
int ret;
if (reply->element[index]->type != REDIS_REPLY_STRING) {
L_WARNING("Received Redis reply type %i instead of %i (string) for %s field of cid '%s' using key '%s'\n",
reply->element[index]->type, REDIS_REPLY_STRING, cid, key);
entry->valid = 0;
ret = -1;
} else {
g_strlcpy(element, reply->element[index]->str, element_size);
ret = 0;
}
return ret;
}
/**********************************************************************/
static med_entry_t *medredis_reply_to_entry(redisReply *reply, const char* cid, const char* key) {
med_entry_t *entry;
uint8_t all_null = 1;
if (reply->elements != 8) {
L_ERROR("Invalid number of redis reply elements for acc record with cid '%s' and key '%s', expected 8, got %lu, trashing record\n",
cid, key, reply->elements);
medredis_remove_mappings(cid, key);
medredis_remove_entry(key);
return NULL;
}
// check for all fields being null, as this is a special case where the entry
// doesn't exist in redis at all (compared to cases where only a certain field might be null)
for (int i = 0; i < (int) reply->elements; i++) {
if (reply->element[i]->type != REDIS_REPLY_NIL) {
all_null = 0;
break;
}
}
if (all_null) {
L_WARNING("Redis entry does not exist for key '%s', trashing mappings", key);
medredis_remove_mappings(cid, key);
return NULL;
}
// verify call-id and trash if invalid
if (reply->element[3]->type != REDIS_REPLY_STRING) {
L_WARNING("Received Redis reply type %i instead of %i (string) for call-id field of cid '%s' using key '%s', trashing record\n",
reply->element[3]->type, REDIS_REPLY_STRING, cid, key);
medredis_remove_mappings(cid, key);
medredis_remove_entry(key);
return NULL;
}
entry = (med_entry_t*)malloc(sizeof(med_entry_t));
memset(entry, 0, sizeof(med_entry_t));
entry->valid = 1;
entry->redis = 1;
medredis_check_reply_string(entry, reply, entry->sip_code, "sip_code", sizeof(entry->sip_code), 0, cid, key);
medredis_check_reply_string(entry, reply, entry->sip_reason, "sip_reason", sizeof(entry->sip_reason), 1, cid, key);
medredis_check_reply_string(entry, reply, entry->sip_method, "sip_method", sizeof(entry->sip_method), 2, cid, key);
medredis_check_reply_string(entry, reply, entry->callid, "callid", sizeof(entry->callid), 3, cid, key);
if (medredis_check_reply_string(entry, reply, entry->timestamp, "timestamp", sizeof(entry->timestamp), 4, cid, key) != 0) {
g_strlcpy(entry->timestamp, "0000-00-00 00:00:00", sizeof(entry->timestamp));
}
if (reply->element[5]->type != REDIS_REPLY_STRING) {
L_WARNING("Received Redis reply type %i instead of %i (string) for unix_timestamp field of cid '%s' using key '%s'\n",
reply->element[5]->type, REDIS_REPLY_STRING, cid, key);
entry->valid = 0;
} else {
entry->unix_timestamp = atof(reply->element[5]->str);
}
medredis_check_reply_string(entry, reply, entry->src_leg, "src_leg", sizeof(entry->src_leg), 6, cid, key);
medredis_check_reply_string(entry, reply, entry->dst_leg, "dst_leg", sizeof(entry->dst_leg), 7, cid, key);
L_DEBUG("Converted record with cid '%s' and method '%s'\n", entry->callid, entry->sip_method);
return entry;
}
/**********************************************************************/
int medredis_init() {
struct timeval tv;
@ -503,6 +602,11 @@ static void medredis_append_key(gpointer data, gpointer user_data) {
if (medredis_append_command_argv(entry_argc, entry_argv, 1) != 0) {
L_ERROR("Failed to append command to fetch key\n");
}
}
/**********************************************************************/
static void medredis_free_keys_list(gpointer data) {
char *key = (char*)data;
free(key);
}
@ -609,9 +713,12 @@ int medredis_fetch_records(med_callid_t *callid,
L_DEBUG("Appending all keys to redis command\n");
g_list_foreach(keys, medredis_append_key, NULL);
do {
for (GList *l = keys; l; l = l->next) {
med_entry_t *e;
L_DEBUG("Fetching next reply record\n");
char *key = (char*)l->data;
L_DEBUG("Fetching next reply record, query key was '%s'\n", key);
if (medredis_get_reply(&reply) != 0) {
L_ERROR("Failed to get reply from redis (cid '%s')\n", callid->value);
goto err;
@ -621,17 +728,16 @@ int medredis_fetch_records(med_callid_t *callid,
medredis_check_reply("get reply", reply, err);
medredis_dump_reply(reply);
e = medredis_reply_to_entry(reply);
e = medredis_reply_to_entry(reply, callid->value, key);
if (!e) {
L_ERROR("Failed to convert redis reply to entry (cid '%s')\n", callid->value);
L_WARNING("Failed to convert redis reply to entry (cid '%s')\n", callid->value);
medredis_free_reply(&reply);
goto err;
continue;
}
medredis_free_reply(&reply);
records = g_list_prepend(records, e);
(*count)++;
} while(1);
(*count)++;
}
*entries = (med_entry_t*)malloc(*count * sizeof(med_entry_t));
if (!*entries) {
@ -653,7 +759,7 @@ int medredis_fetch_records(med_callid_t *callid,
}
g_list_free(records);
g_list_free(keys);
g_list_free_full(keys, medredis_free_keys_list);
medredis_consume_replies();
@ -678,7 +784,6 @@ err:
/**********************************************************************/
static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const char *table) {
char *argv[3];
char buffer[512];
if (medmysql_insert_records(records, count, table) != 0) {
@ -691,46 +796,20 @@ static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const
if (!e->redis)
continue;
L_DEBUG("Cleaning up redis entry for %s:%f\n", e->callid, e->unix_timestamp);
// delete acc:cid mapping
snprintf(buffer, sizeof(buffer), "acc:cid::%s", e->callid);
argv[0] = "DEL";
argv[1] = buffer;
if (medredis_append_command_argv(2, argv, 1) != 0) {
L_ERROR("Failed to append redis command to remove key '%s'\n", buffer);
goto err;
}
// delete cid from acc:meth::INVITE and acc:meth::BYE
argv[0] = "SREM";
argv[1] = "acc:meth::INVITE";
snprintf(buffer, sizeof(buffer), "acc:entry::%s:%f", e->callid, e->unix_timestamp);
argv[2] = buffer;
if (medredis_append_command_argv(3, argv, 1) != 0) {
L_ERROR("Failed to append redis command to remove key '%s' from '%s'\n", buffer, argv[1]);
L_DEBUG("Cleaning up redis entry for %s\n", buffer);
if (medredis_remove_mappings(e->callid, buffer) != 0) {
goto err;
}
argv[1] = "acc:meth::BYE";
if (medredis_append_command_argv(3, argv, 1) != 0) {
L_ERROR("Failed to append redis command to remove key '%s' from '%s'\n", buffer, argv[1]);
goto err;
}
// delete acc:entry::$cid:$timestamp
argv[0] = "DEL";
argv[1] = buffer;
if (medredis_append_command_argv(2, argv, 1) != 0) {
L_ERROR("Failed to append redis command to remove key '%s'\n", buffer);
if (medredis_remove_entry(buffer) != 0) {
goto err;
}
}
medredis_consume_replies();
return 0;
err:
medredis_consume_replies();
return -1;
}

Loading…
Cancel
Save