#include #include #include #include "medredis.h" #include "medmysql.h" #include "config.h" #include "records.h" #define PBXSUFFIX "_pbx-1" #define XFERSUFFIX "_xfer-1" #define medredis_check_reply(reply, err, fmt, ...) do { \ if (!(reply) && (!(con) || !(con)->ctx)) { \ L_ERROR("Failed to perform redis query '" fmt "': no connection to server\n", \ ##__VA_ARGS__); \ goto err; \ } \ if (!(reply)) { \ L_ERROR("Failed to perform redis query '" fmt "': %s\n", \ ##__VA_ARGS__, (con)->ctx->errstr); \ goto err; \ } \ if ((reply)->type == REDIS_REPLY_ERROR) { \ L_ERROR("Failed to perform redis query '" fmt "': %s\n", \ ##__VA_ARGS__, (reply)->str); \ goto err; \ } \ } while(0) #define SREM_KEY_LUA "redis.call('SREM', KEYS[1], KEYS[3]); if redis.call('SCARD', KEYS[1]) == 0 then redis.call('SREM', KEYS[2], KEYS[1]) end" typedef struct { char **argv; size_t argc; } medredis_command_t; typedef struct { redisContext *ctx; GQueue *command_queue; unsigned int append_counter; } medredis_con_t; typedef struct { const char *name; GQueue record_lists; } medredis_table_t; static medredis_con_t *con = NULL; static char medredis_srem_key_lua[41]; // sha-1 hex string static medredis_table_t medredis_table_trash = { .name = "trash", .record_lists = G_QUEUE_INIT }; static medredis_table_t medredis_table_backup = { .name = "backup", .record_lists = G_QUEUE_INIT }; /**********************************************************************/ static void medredis_free_reply(redisReply **reply) { if (reply && *reply) { freeReplyObject(*reply); *reply = NULL; } } /**********************************************************************/ static void medredis_dump_reply(redisReply *reply) { size_t i; if (reply->type == REDIS_REPLY_STRING) { L_DEBUG(" %s\n", reply->str); } else if (reply->type == REDIS_REPLY_INTEGER) { L_DEBUG(" %lld\n", reply->integer); } else if (reply->type == REDIS_REPLY_NIL) { L_DEBUG(" \n"); } else if (reply->type == REDIS_REPLY_ARRAY) { L_DEBUG("printing %lu elements in array reply\n", reply->elements); for(i = 0; i < reply->elements; ++i) { medredis_dump_reply(reply->element[i]); } } else { L_DEBUG(" not printing invalid reply type\n"); } } /**********************************************************************/ static void medredis_dump_command(size_t argc, char **argv) { L_DEBUG("Dumping command:\n"); for (size_t i = 0; i < argc; ++i) { L_DEBUG(" %lu: %s", i, argv[i]); } } /**********************************************************************/ static void medredis_free_command(gpointer data) { medredis_command_t *cmd = (medredis_command_t*)data; if (!cmd) return; if (cmd->argv) { for (size_t i = 0; i < cmd->argc; ++i) free(cmd->argv[i]); free(cmd->argv); } free(cmd); } /**********************************************************************/ static int medredis_push_command(size_t argc, char **argv) { medredis_command_t *cmd = NULL; if (!argv) return 0; cmd = (medredis_command_t*)malloc(sizeof(medredis_command_t)); if (!cmd) { L_ERROR("Failed to allocate memory for redis command\n"); goto err; } cmd->argc = 0; cmd->argv = (char**)malloc(argc * sizeof(char*)); if (!cmd->argv) { L_ERROR("Failed to allocate memory for redis query\n"); goto err; } for (size_t i = 0; i < argc; ++i) { cmd->argv[i] = (char*)malloc(strlen(argv[i]) + 1); strcpy(cmd->argv[i], argv[i]); } cmd->argc = argc; g_queue_push_tail(con->command_queue, cmd); return 0; err: medredis_free_command(cmd); return -1; } /**********************************************************************/ MEDIATOR_ATTR_PRINTF(1, 2) static redisReply *medredis_command(const char* cmd, ...) { va_list args; va_start(args, cmd); L_DEBUG("Performing redis query '%s'\n", cmd); redisReply *reply = redisvCommand(con->ctx, cmd, args); if (con->ctx->err == REDIS_ERR_EOF) { if (medredis_init() != 0) { va_end(args); L_ERROR("Failed to reconnect to redis db\n"); medredis_cleanup(); return NULL; } reply = redisvCommand(con->ctx, cmd, args); } va_end(args); return reply; } /**********************************************************************/ static int medredis_append_command_argv(size_t argc, char **argv, int queue) { int ret; if (queue > 0 && medredis_push_command(argc, argv) != 0) { L_ERROR("Failed to queue redis command\n"); return -1; } medredis_dump_command(argc, argv); ret = redisAppendCommandArgv(con->ctx, argc, (const char**)argv, NULL); // this should actually never happen, because if all replies // are properly consumed for the previous command, it won't send // out a new query until redisGetReply is called if (con->ctx->err == REDIS_ERR_EOF) { if (medredis_init() != 0) { L_ERROR("Failed to reconnect to redis db\n"); medredis_cleanup(); return -1; } ret = redisAppendCommandArgv(con->ctx, argc, (const char**)argv, NULL); } if (!con->ctx->err) { con->append_counter++; } return ret; } /**********************************************************************/ static int medredis_get_reply(redisReply **reply) { int ret; medredis_command_t *cmd; if (con->append_counter == 0) { *reply = NULL; return 0; } *reply = NULL; ret = redisGetReply(con->ctx, (void**)reply); if (con->ctx->err == REDIS_ERR_EOF) { L_DEBUG("Redis connection is gone, try reconnect\n"); con->append_counter = 0; if (medredis_init() != 0) { L_ERROR("Failed to reconnect to redis db\n"); medredis_cleanup(); return -1; } // take commands from oldest to newest and re-do again, // but don't queue them once again in retry-mode while ((cmd = g_queue_pop_head(con->command_queue))) { L_DEBUG("re-queueing appended command\n"); if (medredis_append_command_argv(cmd->argc, cmd->argv, 0) != 0) { L_ERROR("Failed to re-queue redis command"); return -1; } medredis_free_command(cmd); } ret = redisGetReply(con->ctx, (void**)reply); if (con->ctx->err != REDIS_ERR_EOF) { con->append_counter--; } } else { L_DEBUG("Redis get_reply successful, un-queueing command\n"); cmd = g_queue_pop_head(con->command_queue); medredis_free_command(cmd); con->append_counter--; } return ret; } /**********************************************************************/ static void medredis_consume_replies(void) { medredis_command_t *cmd; redisReply *reply = NULL; while (con->append_counter > 0) { int ret = redisGetReply(con->ctx, (void**)&reply); if (ret == REDIS_OK) { con->append_counter--; medredis_free_reply(&reply); } else { con->append_counter = 0; } } while ((cmd = g_queue_pop_head(con->command_queue))) { medredis_free_command(cmd); } } /**********************************************************************/ static int medredis_remove_entry(char* key) { char *argv[2]; // delete acc:entry::$cid:$timestamp argv[0] = "DEL"; argv[1] = key; if (medredis_append_command_argv(2, argv, 1) != 0) { L_ERROR("Failed to append redis command to remove key '%s'\n", key); goto err; } medredis_consume_replies(); return 0; err: medredis_consume_replies(); return -1; } /**********************************************************************/ static int medredis_remove_mappings(const char* cid, char* key, const char *method) { char *argv[6]; char buffer[512]; char *method_key = NULL; snprintf(buffer, sizeof(buffer), "acc:cid::%s", cid); // delete cid from acc:cid::$cid mapping and handle acc::index::cid mapping argv[0] = "EVALSHA"; argv[1] = medredis_srem_key_lua; argv[2] = "3"; argv[3] = buffer; argv[4] = "acc::index::cid"; argv[5] = key; if (medredis_append_command_argv(6, argv, 1) != 0) { L_ERROR("Failed to append redis command to remove mapping key '%s' from '%s'\n", key, argv[1]); goto err; } // delete cid from acc:meth::$METHOD and handle acc::index::meth mappings char *methods[3]; int num_methods; if (!method || !method[0]) { methods[0] = "acc:meth::INVITE"; methods[1] = "acc:meth::BYE"; methods[2] = "acc:meth::REFER"; num_methods = 3; } else { method_key = g_strdup_printf("acc:meth::%s", method); methods[0] = method_key; num_methods = 1; } argv[0] = "EVALSHA"; argv[1] = medredis_srem_key_lua; argv[2] = "3"; argv[4] = "acc::index::meth"; argv[5] = key; for (int i = 0; i < num_methods; i++) { argv[3] = methods[i]; if (medredis_append_command_argv(6, argv, 1) != 0) { L_ERROR("Failed to append redis command to remove mapping key '%s' from '%s'\n", key, argv[1]); goto err; } } g_free(method_key); medredis_consume_replies(); return 0; err: g_free(method_key); medredis_consume_replies(); return -1; } /**********************************************************************/ static int medredis_check_reply_string(med_entry_t *entry, redisReply *reply, const char* element_name, int index, const char* cid, const char* key) { if (reply->element[index]->type != REDIS_REPLY_STRING) { L_WARNING("Received Redis reply type %i instead of %i (string) for %s field of cid '%s' using key '%s'\n", reply->element[index]->type, REDIS_REPLY_STRING, element_name, cid, key); entry->valid = 0; return -1; } return 0; } /**********************************************************************/ static int medredis_check_reply_strcpy(med_entry_t *entry, redisReply *reply, char* element, const char* element_name, size_t element_size, int index, const char* cid, const char* key) { if (medredis_check_reply_string(entry, reply, element_name, index, cid, key) == 0) { g_strlcpy(element, reply->element[index]->str, element_size); return 0; } return -1; } /**********************************************************************/ static void medredis_check_reply_strdup(med_entry_t *entry, redisReply *reply, char** element, const char* element_name, int index, const char* cid, const char* key) { if (medredis_check_reply_string(entry, reply, element_name, index, cid, key) == 0) *element = g_strdup(reply->element[index]->str); else *element = g_strdup(""); } /**********************************************************************/ static med_entry_t *medredis_reply_to_entry(redisReply *reply, const char* cid, char* key) { med_entry_t *entry; uint8_t all_null = 1; if (reply->elements != 9) { L_ERROR("Invalid number of redis reply elements for acc record with cid '%s' and key '%s', expected 9, got %lu, trashing record\n", cid, key, reply->elements); medredis_remove_mappings(cid, key, NULL); medredis_remove_entry(key); return NULL; } // check for all fields being null, as this is a special case where the entry // doesn't exist in redis at all (compared to cases where only a certain field might be null) for (int i = 0; i < (int) reply->elements; i++) { if (reply->element[i]->type != REDIS_REPLY_NIL) { all_null = 0; break; } } if (all_null) { L_WARNING("Redis entry does not exist for key '%s', trashing mappings", key); medredis_remove_mappings(cid, key, NULL); return NULL; } // verify call-id and trash if invalid if (reply->element[3]->type != REDIS_REPLY_STRING) { L_WARNING("Received Redis reply type %i instead of %i (string) for call-id field of cid '%s' using key '%s', trashing record\n", reply->element[3]->type, REDIS_REPLY_STRING, cid, key); medredis_remove_mappings(cid, key, NULL); medredis_remove_entry(key); return NULL; } entry = g_slice_alloc0(sizeof(med_entry_t)); entry->valid = 1; entry->redis = 1; medredis_check_reply_strcpy(entry, reply, entry->sip_code, "sip_code", sizeof(entry->sip_code), 0, cid, key); medredis_check_reply_strcpy(entry, reply, entry->sip_reason, "sip_reason", sizeof(entry->sip_reason), 1, cid, key); medredis_check_reply_strcpy(entry, reply, entry->sip_method, "sip_method", sizeof(entry->sip_method), 2, cid, key); medredis_check_reply_strdup(entry, reply, &entry->callid, "callid", 3, cid, key); if (medredis_check_reply_strcpy(entry, reply, entry->timestamp, "timestamp", sizeof(entry->timestamp), 4, cid, key) != 0) { g_strlcpy(entry->timestamp, "0000-00-00 00:00:00", sizeof(entry->timestamp)); } if (reply->element[5]->type != REDIS_REPLY_STRING) { L_WARNING("Received Redis reply type %i instead of %i (string) for unix_timestamp field of cid '%s' using key '%s'\n", reply->element[5]->type, REDIS_REPLY_STRING, cid, key); entry->valid = 0; } else { entry->unix_timestamp = atof(reply->element[5]->str); } medredis_check_reply_strdup(entry, reply, &entry->src_leg, "src_leg", 6, cid, key); medredis_check_reply_strdup(entry, reply, &entry->dst_leg, "dst_leg", 7, cid, key); if (reply->element[8]->type != REDIS_REPLY_STRING) { L_DEBUG("Received Redis reply of cid '%s' using key '%s' doesn't have branch_id >>> old acc record version\n", cid, key); g_strlcpy(entry->branch_id, "", sizeof(entry->branch_id)); } else { medredis_check_reply_strcpy(entry, reply, entry->branch_id, "branch_id", sizeof(entry->branch_id), 8, cid, key); } entry->acc_ref = g_strdup(key); cdr_parse_entry(entry); L_DEBUG("Converted record with cid '%s' and method '%s'\n", entry->callid, entry->sip_method); return entry; } /**********************************************************************/ static int medredis_init_one(void) { struct timeval tv; redisReply *reply; tv.tv_sec = 1; tv.tv_usec = 0; reply = NULL; medredis_cleanup(); L_DEBUG("Connecting to redis %s:%d/%d\n", config_redis_host, config_redis_port, config_redis_db); con = (medredis_con_t*)malloc(sizeof(medredis_con_t)); if (!con) { L_ERROR("Failed to allocate memory for redis connection\n"); goto err; } con->command_queue = g_queue_new(); con->append_counter = 0; con->ctx = redisConnectWithTimeout(config_redis_host, config_redis_port, tv); if (!con->ctx) { L_ERROR("Cannot open redis connection to %s:%d\n", config_redis_host, config_redis_port); goto err; } if (con->ctx->err) { L_ERROR("Cannot open redis connection to %s:%d: %s\n", config_redis_host, config_redis_port, con->ctx->errstr); goto err; } if (config_redis_pass) { reply = redisCommand(con->ctx, "AUTH %s", config_redis_pass); medredis_check_reply(reply, err, "AUTH"); medredis_free_reply(&reply); } reply = redisCommand(con->ctx, "PING"); medredis_check_reply(reply, err, "PING"); medredis_free_reply(&reply); reply = redisCommand(con->ctx, "SELECT %i", config_redis_db); medredis_check_reply(reply, err, "SELECT %i", config_redis_db); medredis_free_reply(&reply); reply = redisCommand(con->ctx, "SCRIPT LOAD %s", SREM_KEY_LUA); medredis_check_reply(reply, err, "SCRIPT LOAD %s", SREM_KEY_LUA); if (reply->type != REDIS_REPLY_STRING || reply->len >= sizeof(medredis_srem_key_lua)) { L_ERROR("Invalid reply from SCRIPT LOAD: %i/%lu\n", reply->type, (unsigned long) reply->len); goto err; } g_strlcpy(medredis_srem_key_lua, reply->str, sizeof(medredis_srem_key_lua)); medredis_free_reply(&reply); L_DEBUG("Redis connection opened to %s:%d/%d\n", config_redis_host, config_redis_port, config_redis_db); return 0; err: medredis_free_reply(&reply); if (con) { if (con->ctx) { redisFree(con->ctx); con->ctx = NULL; } free(con); con = NULL; } return -1; } int medredis_init() { int i = 0; static const int max_retries = 10; while (1) { if (medredis_init_one() == 0) return 0; if (i++ >= max_retries) return -1; L_ERROR("Redis connection init failed, retry attempt #%i/%i\n", i, max_retries); usleep(200000); } } /**********************************************************************/ void medredis_cleanup() { if (con) { if (con->ctx) { redisFree(con->ctx); con->ctx = NULL; } g_queue_free_full(con->command_queue, medredis_free_command); free(con); con = NULL; } } /**********************************************************************/ gboolean medredis_fetch_callids(GQueue *output) { unsigned int cursor = 0; size_t i = 0; redisReply *reply = NULL; #define SCAN_CMD "SSCAN acc:meth::INVITE %u COUNT 1000" #define GET_CLEAN_CMD "HGET %s dont_clean_suffix" GHashTable *cid_table; char *tmp; cid_table = g_hash_table_new(g_str_hash, g_str_equal); do { reply = medredis_command(SCAN_CMD, cursor); medredis_check_reply(reply, err, SCAN_CMD, cursor); if (reply->type != REDIS_REPLY_ARRAY) { L_ERROR("Invalid reply type for scan, expected array\n"); goto err; } if (reply->elements != 2) { L_ERROR("Invalid number of reply elements for scan, expected 2, got %lu\n", reply->elements); goto err; } if (reply->element[0]->type == REDIS_REPLY_STRING) { cursor = atol(reply->element[0]->str); } else if (reply->element[0]->type == REDIS_REPLY_INTEGER) { // should not happen, but play it safe cursor = reply->element[0]->integer; } else { L_ERROR("Invalid cursor type for scan, expected string or integer\n"); goto err; } if (reply->element[1]->type != REDIS_REPLY_ARRAY) { L_ERROR("Invalid content type for scan, expected array\n"); goto err; } if (reply->element[1]->elements == 0 && cursor == 0) { L_DEBUG("No matching entries found for scan\n"); medredis_free_reply(&reply); break; } for (i = 0; i < reply->element[1]->elements; ++i) { char *cid; redisReply *entry = reply->element[1]->element[i]; if (!entry) { L_ERROR("Invalid null entry at cursor result index %lu while scanning table\n", i); goto err; } if (entry->type != REDIS_REPLY_STRING) { L_ERROR("Invalid entry type at cursor result index %lu while scanning table, expected string\n", i); goto err; } L_DEBUG("Got entry '%s'\n", entry->str); // strip leading "acc:entry::" and trailing "::" cid = g_strdup(entry->str + strlen("acc:entry::")); tmp = strrchr(cid, ':'); if (tmp) { *tmp = '\0'; } tmp = strrchr(cid, ':'); if (tmp) { *tmp = '\0'; } int truncate_callid = 1; redisReply *reply_get_dont_clean_suffix = medredis_command(GET_CLEAN_CMD, entry->str); medredis_check_reply(reply_get_dont_clean_suffix, err, GET_CLEAN_CMD, entry->str); if (reply_get_dont_clean_suffix->type == REDIS_REPLY_STRING && reply_get_dont_clean_suffix->str) { if (!strcmp(reply_get_dont_clean_suffix->str,"1")) { truncate_callid = 0; L_WARNING("Preventing callid to be truncated since the original CALLID coming from endpoint has already more _pbx-1 suffixes\n"); } } medredis_free_reply(&reply_get_dont_clean_suffix); if (truncate_callid) cdr_truncate_call_id_suffix(cid); if (g_hash_table_insert(cid_table, cid, cid)) { g_queue_push_tail(output, cid); } else { free(cid); } } medredis_free_reply(&reply); } while (cursor && output->length < 200000); g_hash_table_destroy(cid_table); return TRUE; err: if (reply) freeReplyObject(reply); g_hash_table_destroy(cid_table); return FALSE; } /**********************************************************************/ static void medredis_append_key(gpointer data, gpointer user_data) { char *key = (char*)data; (void)user_data; L_DEBUG("Appending key '%s' to keys list\n", key); size_t entry_argc = 11; char *entry_argv[11]; entry_argv[0] = "HMGET"; entry_argv[1] = key; entry_argv[2] = "sip_code"; entry_argv[3] = "sip_reason"; entry_argv[4] = "method"; entry_argv[5] = "callid"; entry_argv[6] = "time"; entry_argv[7] = "time_hires"; entry_argv[8] = "src_leg"; entry_argv[9] = "dst_leg"; entry_argv[10] = "branch_id"; if (medredis_append_command_argv(entry_argc, entry_argv, 1) != 0) { L_ERROR("Failed to append command to fetch key\n"); } } /**********************************************************************/ static void medredis_free_keys_list(gpointer data) { char *key = (char*)data; free(key); } /**********************************************************************/ int medredis_fetch_records(char *callid, GQueue *entries, records_filter_func filter, void *filter_data) { /* 1. fetch from acc:cid::$cid 2. fetch from acc:cid::$cidPBXSUFFIX 3. fetch from acc:cid::$cidXFERSUFFIX 4. fetch from acc:cid::$cidPBXSUFFIXXFERSUFFIX 5. combine all in list sorted by time_hires 6. skip if INVITE/200 and no BYE */ uint8_t cid_set_argc; char *cid_set_argv[2]; redisReply *reply = NULL; char *cids[4]; GList *keys; size_t i; cid_set_argc = 2; cid_set_argv[0] = "SMEMBERS"; keys = NULL; memset(cids, 0, sizeof(cids)); cids[0] = g_strdup_printf("acc:cid::%s", callid); cids[1] = g_strdup_printf("acc:cid::%s%s", callid, PBXSUFFIX); cids[2] = g_strdup_printf("acc:cid::%s%s", callid, XFERSUFFIX); cids[3] = g_strdup_printf("acc:cid::%s%s%s", callid, PBXSUFFIX, XFERSUFFIX); L_DEBUG("Fetching records from redis\n"); for (i = 0; i < G_N_ELEMENTS(cids); ++i) { char *cid = cids[i]; cid_set_argv[1] = cid; if (medredis_append_command_argv(cid_set_argc, cid_set_argv, 1) != 0) { L_ERROR("Failed to append redis command to fetch entries for cid '%s'\n", callid); goto err; } } for (i = 0; i < G_N_ELEMENTS(cids); ++i) { if (medredis_get_reply(&reply) != 0) { L_ERROR("Failed to get redis reply for command to fetch entries for cid '%s'\n", callid); goto err; } medredis_check_reply(reply, err, "smembers for cid"); medredis_dump_reply(reply); if (reply->type != REDIS_REPLY_ARRAY) { medredis_free_reply(&reply); L_ERROR("Invalid reply type for scan, expected array\n"); goto err; } if (!reply->elements) { medredis_free_reply(&reply); L_DEBUG("No matching entries for cid '%s' at suffix idx %lu\n", callid, i); continue; } for (size_t j = 0; j < reply->elements; ++j) { char *key = strdup(reply->element[j]->str); if (!key) { L_ERROR("Failed to allocate memory for redis key (cid '%s')\n", callid); goto err; } L_DEBUG("Putting key '%s' to keys list\n", key); keys = g_list_prepend(keys, key); } medredis_free_reply(&reply); } L_DEBUG("Appending all keys to redis command\n"); g_list_foreach(keys, medredis_append_key, NULL); int ret = 0; GList *entries_end = entries->tail; for (GList *l = keys; l; l = l->next) { med_entry_t *e; char *key = (char*)l->data; L_DEBUG("Fetching next reply record, query key was '%s'\n", key); if (medredis_get_reply(&reply) != 0) { L_ERROR("Failed to get reply from redis (cid '%s')\n", callid); goto err; } if (!reply) break; medredis_check_reply(reply, err, "get reply"); medredis_dump_reply(reply); e = medredis_reply_to_entry(reply, callid, key); if (!e) { L_WARNING("Failed to convert redis reply to entry (cid '%s')\n", callid); medredis_free_reply(&reply); continue; } medredis_free_reply(&reply); if (!filter || filter(e, filter_data)) { g_queue_push_tail(entries, e); ret = 1; } else med_entry_free(e); } // Second pass over the newly added entries (starting at `entries_end`). // It's necessary to do this as a second pass so the replies from Redis // are read and consumed in order. for (GList *l = entries_end ? entries_end->next : entries->head; l; l = l->next) { med_entry_t *e = l->data; records_handle_refer(entries, e, callid); } g_list_free_full(keys, medredis_free_keys_list); for (i = 0; i < G_N_ELEMENTS(cids); ++i) g_free(cids[i]); medredis_consume_replies(); return ret; err: if (reply) freeReplyObject(reply); for (i = 0; i < G_N_ELEMENTS(cids); ++i) { g_free(cids[i]); } medredis_consume_replies(); return -1; } /**********************************************************************/ static int medredis_cleanup_entries(GQueue *records, medredis_table_t *table) { if (medmysql_insert_records(records, table->name) != 0) { L_CRITICAL("Failed to cleanup redis records\n"); goto err; } // take ownership of the contents of the `records` queue and swap out contents with // an empty queue GQueue *copy = g_queue_new(); GQueue tmp = *copy; // empty queue *copy = *records; // take ownership *records = tmp; // replace with empty queue g_queue_push_tail(&table->record_lists, copy); return 0; err: return -1; } /**********************************************************************/ static int medredis_batch_end_records(GQueue *records) { char buffer[512]; for (GList *l = records->head; l; l = l->next) { med_entry_t *e = l->data; if (!e->redis) continue; const char *branch_id = e->branch_id; // do this at most twice: once with an empty branch ID, and once without a branch ID at all do { if (branch_id == NULL) { // Old ACC record version: it doesn't have branch_id snprintf(buffer, sizeof(buffer), "acc:entry::%s:%f", e->callid, e->unix_timestamp); } else { snprintf(buffer, sizeof(buffer), "acc:entry::%s:%f:%s", e->callid, e->unix_timestamp, branch_id); } L_DEBUG("Cleaning up redis entry for %s\n", buffer); if (medredis_remove_mappings(e->callid, buffer, e->sip_method) != 0) { goto err; } if (medredis_remove_entry(buffer) != 0) { goto err; } // exit condition: if the branch ID is non-NULL but an empty string, try again with // a NULL branch ID. // otherwise (branch ID is NULL or a non-empty string) we break. if (branch_id == NULL || strlen(branch_id) != 0) break; branch_id = NULL; } while (1); } return 0; err: return -1; } /**********************************************************************/ static int medredis_batch_end_table(medredis_table_t *table) { while (table->record_lists.length) { GQueue *records = g_queue_pop_head(&table->record_lists); int ret = medredis_batch_end_records(records); g_queue_free_full(records, med_entry_free); if (ret) return -1; } return 0; } /**********************************************************************/ int medredis_batch_end(void) { if (medredis_batch_end_table(&medredis_table_trash)) return -1; if (medredis_batch_end_table(&medredis_table_backup)) return -1; return 0; } /**********************************************************************/ int medredis_trash_entries(GQueue *records) { return medredis_cleanup_entries(records, &medredis_table_trash); } /**********************************************************************/ int medredis_backup_entries(GQueue *records) { return medredis_cleanup_entries(records, &medredis_table_backup); }