|
|
|
@ -44,47 +44,6 @@ typedef struct {
|
|
|
|
|
|
|
|
|
|
static medredis_con_t *con = NULL;
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
static med_entry_t *medredis_reply_to_entry(redisReply *reply) {
|
|
|
|
|
med_entry_t *entry;
|
|
|
|
|
|
|
|
|
|
if (reply->elements != 8) {
|
|
|
|
|
L_ERROR("Invalid number of redis reply elements for acc record, expected 8, got %lu\n",
|
|
|
|
|
reply->elements);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
// verify types
|
|
|
|
|
for (int i = 0; i < (int) reply->elements; i++) {
|
|
|
|
|
if (reply->element[i]->type != REDIS_REPLY_STRING) {
|
|
|
|
|
L_ERROR("Received Redis reply type %i instead of %i (string) for element %i\n",
|
|
|
|
|
reply->element[i]->type, REDIS_REPLY_STRING, i);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
if (reply->element[i]->str == NULL) {
|
|
|
|
|
L_ERROR("Received NULL string from Redis for element %i\n",
|
|
|
|
|
i);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
entry = (med_entry_t*)malloc(sizeof(med_entry_t));
|
|
|
|
|
memset(entry, 0, sizeof(med_entry_t));
|
|
|
|
|
|
|
|
|
|
g_strlcpy(entry->sip_code, reply->element[0]->str, sizeof(entry->sip_code));
|
|
|
|
|
g_strlcpy(entry->sip_reason, reply->element[1]->str, sizeof(entry->sip_reason));
|
|
|
|
|
g_strlcpy(entry->sip_method, reply->element[2]->str, sizeof(entry->sip_method));
|
|
|
|
|
g_strlcpy(entry->callid, reply->element[3]->str, sizeof(entry->callid));
|
|
|
|
|
g_strlcpy(entry->timestamp, reply->element[4]->str, sizeof(entry->timestamp));
|
|
|
|
|
entry->unix_timestamp = atof(reply->element[5]->str);
|
|
|
|
|
g_strlcpy(entry->src_leg, reply->element[6]->str, sizeof(entry->src_leg));
|
|
|
|
|
g_strlcpy(entry->dst_leg, reply->element[7]->str, sizeof(entry->dst_leg));
|
|
|
|
|
entry->valid = 1;
|
|
|
|
|
entry->redis = 1;
|
|
|
|
|
|
|
|
|
|
L_DEBUG("Converted record with cid '%s' and method '%s'\n", entry->callid, entry->sip_method);
|
|
|
|
|
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
static void medredis_free_reply(redisReply **reply) {
|
|
|
|
|
if (reply && *reply) {
|
|
|
|
@ -275,6 +234,146 @@ static void medredis_consume_replies(void) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
static int medredis_remove_entry(char* key) {
|
|
|
|
|
char *argv[2];
|
|
|
|
|
|
|
|
|
|
// delete acc:entry::$cid:$timestamp
|
|
|
|
|
argv[0] = "DEL";
|
|
|
|
|
argv[1] = key;
|
|
|
|
|
if (medredis_append_command_argv(2, argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append redis command to remove key '%s'\n", key);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
medredis_consume_replies();
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
err:
|
|
|
|
|
medredis_consume_replies();
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
static int medredis_remove_mappings(const char* cid, char* key) {
|
|
|
|
|
char *argv[3];
|
|
|
|
|
char buffer[512];
|
|
|
|
|
|
|
|
|
|
snprintf(buffer, sizeof(buffer), "acc:cid::%s", cid);
|
|
|
|
|
|
|
|
|
|
// delete cid from acc:cid::$cid mapping
|
|
|
|
|
argv[0] = "SREM";
|
|
|
|
|
argv[1] = buffer;
|
|
|
|
|
argv[2] = key;
|
|
|
|
|
if (medredis_append_command_argv(3, argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append redis command to remove mapping key '%s' from '%s'\n", key, argv[1]);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// delete cid from acc:meth::INVITE and acc:meth::BYE
|
|
|
|
|
argv[0] = "SREM";
|
|
|
|
|
argv[1] = "acc:meth::INVITE";
|
|
|
|
|
argv[2] = key;
|
|
|
|
|
if (medredis_append_command_argv(3, argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append redis command to remove mapping key '%s' from '%s'\n", key, argv[1]);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
argv[1] = "acc:meth::BYE";
|
|
|
|
|
if (medredis_append_command_argv(3, argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append redis command to remove mapping key '%s' from '%s'\n", key, argv[1]);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
medredis_consume_replies();
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
err:
|
|
|
|
|
medredis_consume_replies();
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
static int medredis_check_reply_string(med_entry_t *entry, redisReply *reply,
|
|
|
|
|
const char* element, const char* element_name, size_t element_size,
|
|
|
|
|
int index,
|
|
|
|
|
const char* cid, const char* key) {
|
|
|
|
|
int ret;
|
|
|
|
|
if (reply->element[index]->type != REDIS_REPLY_STRING) {
|
|
|
|
|
L_WARNING("Received Redis reply type %i instead of %i (string) for %s field of cid '%s' using key '%s'\n",
|
|
|
|
|
reply->element[index]->type, REDIS_REPLY_STRING, cid, key);
|
|
|
|
|
entry->valid = 0;
|
|
|
|
|
ret = -1;
|
|
|
|
|
} else {
|
|
|
|
|
g_strlcpy(element, reply->element[index]->str, element_size);
|
|
|
|
|
ret = 0;
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
static med_entry_t *medredis_reply_to_entry(redisReply *reply, const char* cid, const char* key) {
|
|
|
|
|
med_entry_t *entry;
|
|
|
|
|
uint8_t all_null = 1;
|
|
|
|
|
|
|
|
|
|
if (reply->elements != 8) {
|
|
|
|
|
L_ERROR("Invalid number of redis reply elements for acc record with cid '%s' and key '%s', expected 8, got %lu, trashing record\n",
|
|
|
|
|
cid, key, reply->elements);
|
|
|
|
|
medredis_remove_mappings(cid, key);
|
|
|
|
|
medredis_remove_entry(key);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// check for all fields being null, as this is a special case where the entry
|
|
|
|
|
// doesn't exist in redis at all (compared to cases where only a certain field might be null)
|
|
|
|
|
for (int i = 0; i < (int) reply->elements; i++) {
|
|
|
|
|
if (reply->element[i]->type != REDIS_REPLY_NIL) {
|
|
|
|
|
all_null = 0;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (all_null) {
|
|
|
|
|
L_WARNING("Redis entry does not exist for key '%s', trashing mappings", key);
|
|
|
|
|
medredis_remove_mappings(cid, key);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// verify call-id and trash if invalid
|
|
|
|
|
if (reply->element[3]->type != REDIS_REPLY_STRING) {
|
|
|
|
|
L_WARNING("Received Redis reply type %i instead of %i (string) for call-id field of cid '%s' using key '%s', trashing record\n",
|
|
|
|
|
reply->element[3]->type, REDIS_REPLY_STRING, cid, key);
|
|
|
|
|
medredis_remove_mappings(cid, key);
|
|
|
|
|
medredis_remove_entry(key);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
entry = (med_entry_t*)malloc(sizeof(med_entry_t));
|
|
|
|
|
memset(entry, 0, sizeof(med_entry_t));
|
|
|
|
|
entry->valid = 1;
|
|
|
|
|
entry->redis = 1;
|
|
|
|
|
|
|
|
|
|
medredis_check_reply_string(entry, reply, entry->sip_code, "sip_code", sizeof(entry->sip_code), 0, cid, key);
|
|
|
|
|
medredis_check_reply_string(entry, reply, entry->sip_reason, "sip_reason", sizeof(entry->sip_reason), 1, cid, key);
|
|
|
|
|
medredis_check_reply_string(entry, reply, entry->sip_method, "sip_method", sizeof(entry->sip_method), 2, cid, key);
|
|
|
|
|
medredis_check_reply_string(entry, reply, entry->callid, "callid", sizeof(entry->callid), 3, cid, key);
|
|
|
|
|
if (medredis_check_reply_string(entry, reply, entry->timestamp, "timestamp", sizeof(entry->timestamp), 4, cid, key) != 0) {
|
|
|
|
|
g_strlcpy(entry->timestamp, "0000-00-00 00:00:00", sizeof(entry->timestamp));
|
|
|
|
|
}
|
|
|
|
|
if (reply->element[5]->type != REDIS_REPLY_STRING) {
|
|
|
|
|
L_WARNING("Received Redis reply type %i instead of %i (string) for unix_timestamp field of cid '%s' using key '%s'\n",
|
|
|
|
|
reply->element[5]->type, REDIS_REPLY_STRING, cid, key);
|
|
|
|
|
entry->valid = 0;
|
|
|
|
|
} else {
|
|
|
|
|
entry->unix_timestamp = atof(reply->element[5]->str);
|
|
|
|
|
}
|
|
|
|
|
medredis_check_reply_string(entry, reply, entry->src_leg, "src_leg", sizeof(entry->src_leg), 6, cid, key);
|
|
|
|
|
medredis_check_reply_string(entry, reply, entry->dst_leg, "dst_leg", sizeof(entry->dst_leg), 7, cid, key);
|
|
|
|
|
|
|
|
|
|
L_DEBUG("Converted record with cid '%s' and method '%s'\n", entry->callid, entry->sip_method);
|
|
|
|
|
|
|
|
|
|
return entry;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
int medredis_init() {
|
|
|
|
|
struct timeval tv;
|
|
|
|
@ -503,6 +602,11 @@ static void medredis_append_key(gpointer data, gpointer user_data) {
|
|
|
|
|
if (medredis_append_command_argv(entry_argc, entry_argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append command to fetch key\n");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
static void medredis_free_keys_list(gpointer data) {
|
|
|
|
|
char *key = (char*)data;
|
|
|
|
|
free(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -609,9 +713,12 @@ int medredis_fetch_records(med_callid_t *callid,
|
|
|
|
|
|
|
|
|
|
L_DEBUG("Appending all keys to redis command\n");
|
|
|
|
|
g_list_foreach(keys, medredis_append_key, NULL);
|
|
|
|
|
do {
|
|
|
|
|
|
|
|
|
|
for (GList *l = keys; l; l = l->next) {
|
|
|
|
|
med_entry_t *e;
|
|
|
|
|
L_DEBUG("Fetching next reply record\n");
|
|
|
|
|
char *key = (char*)l->data;
|
|
|
|
|
|
|
|
|
|
L_DEBUG("Fetching next reply record, query key was '%s'\n", key);
|
|
|
|
|
if (medredis_get_reply(&reply) != 0) {
|
|
|
|
|
L_ERROR("Failed to get reply from redis (cid '%s')\n", callid->value);
|
|
|
|
|
goto err;
|
|
|
|
@ -621,17 +728,16 @@ int medredis_fetch_records(med_callid_t *callid,
|
|
|
|
|
medredis_check_reply("get reply", reply, err);
|
|
|
|
|
medredis_dump_reply(reply);
|
|
|
|
|
|
|
|
|
|
e = medredis_reply_to_entry(reply);
|
|
|
|
|
e = medredis_reply_to_entry(reply, callid->value, key);
|
|
|
|
|
if (!e) {
|
|
|
|
|
L_ERROR("Failed to convert redis reply to entry (cid '%s')\n", callid->value);
|
|
|
|
|
L_WARNING("Failed to convert redis reply to entry (cid '%s')\n", callid->value);
|
|
|
|
|
medredis_free_reply(&reply);
|
|
|
|
|
goto err;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
medredis_free_reply(&reply);
|
|
|
|
|
records = g_list_prepend(records, e);
|
|
|
|
|
(*count)++;
|
|
|
|
|
|
|
|
|
|
} while(1);
|
|
|
|
|
(*count)++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*entries = (med_entry_t*)malloc(*count * sizeof(med_entry_t));
|
|
|
|
|
if (!*entries) {
|
|
|
|
@ -653,7 +759,7 @@ int medredis_fetch_records(med_callid_t *callid,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
g_list_free(records);
|
|
|
|
|
g_list_free(keys);
|
|
|
|
|
g_list_free_full(keys, medredis_free_keys_list);
|
|
|
|
|
|
|
|
|
|
medredis_consume_replies();
|
|
|
|
|
|
|
|
|
@ -678,7 +784,6 @@ err:
|
|
|
|
|
|
|
|
|
|
/**********************************************************************/
|
|
|
|
|
static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const char *table) {
|
|
|
|
|
char *argv[3];
|
|
|
|
|
char buffer[512];
|
|
|
|
|
|
|
|
|
|
if (medmysql_insert_records(records, count, table) != 0) {
|
|
|
|
@ -691,46 +796,20 @@ static int medredis_cleanup_entries(med_entry_t *records, uint64_t count, const
|
|
|
|
|
if (!e->redis)
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
L_DEBUG("Cleaning up redis entry for %s:%f\n", e->callid, e->unix_timestamp);
|
|
|
|
|
|
|
|
|
|
// delete acc:cid mapping
|
|
|
|
|
snprintf(buffer, sizeof(buffer), "acc:cid::%s", e->callid);
|
|
|
|
|
argv[0] = "DEL";
|
|
|
|
|
argv[1] = buffer;
|
|
|
|
|
if (medredis_append_command_argv(2, argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append redis command to remove key '%s'\n", buffer);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// delete cid from acc:meth::INVITE and acc:meth::BYE
|
|
|
|
|
argv[0] = "SREM";
|
|
|
|
|
argv[1] = "acc:meth::INVITE";
|
|
|
|
|
snprintf(buffer, sizeof(buffer), "acc:entry::%s:%f", e->callid, e->unix_timestamp);
|
|
|
|
|
argv[2] = buffer;
|
|
|
|
|
if (medredis_append_command_argv(3, argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append redis command to remove key '%s' from '%s'\n", buffer, argv[1]);
|
|
|
|
|
|
|
|
|
|
L_DEBUG("Cleaning up redis entry for %s\n", buffer);
|
|
|
|
|
if (medredis_remove_mappings(e->callid, buffer) != 0) {
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
argv[1] = "acc:meth::BYE";
|
|
|
|
|
if (medredis_append_command_argv(3, argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append redis command to remove key '%s' from '%s'\n", buffer, argv[1]);
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// delete acc:entry::$cid:$timestamp
|
|
|
|
|
argv[0] = "DEL";
|
|
|
|
|
argv[1] = buffer;
|
|
|
|
|
if (medredis_append_command_argv(2, argv, 1) != 0) {
|
|
|
|
|
L_ERROR("Failed to append redis command to remove key '%s'\n", buffer);
|
|
|
|
|
if (medredis_remove_entry(buffer) != 0) {
|
|
|
|
|
goto err;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
medredis_consume_replies();
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
err:
|
|
|
|
|
medredis_consume_replies();
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|