TT#182200 delay Redis entry deletion until after MySQL transaction

The MySQL INSERT statements to move processed Redis acc records from
Redis to the respective backup/trash MySQL tables are always issued
within a MySQL transaction (med_handler via medmysql_batch_start), but
the deletions from Redis were done immediately. Therefore if mediator
were to abort within a processing loop, the MySQL transaction would be
rolled back after the entries had already been deleted from Redis,
therefore losing the acc entries.

Solve this by using an internal queue for Redis entries to hold the
lists of entries to be deleted until the MySQL transaction is commited.

Change-Id: Ib41d0e2ca722c66f9e078ca31f7e5ca2b9d9fe2d
mr11.0
Richard Fuchs 3 years ago
parent 66d9cccb98
commit 04a534bb67

@ -443,6 +443,8 @@ int main(int argc, char **argv)
if (medmysql_batch_end(batches))
break;
if (medredis_batch_end())
break;
gettimeofday(&loop_tv_stop, NULL);
loop_runtime = mediator_calc_runtime(&loop_tv_start, &loop_tv_stop);

@ -42,13 +42,14 @@ typedef struct {
typedef struct {
const char *name;
GQueue record_lists;
} medredis_table_t;
static medredis_con_t *con = NULL;
static char medredis_srem_key_lua[41]; // sha-1 hex string
static medredis_table_t medredis_table_trash = { .name = "trash" };
static medredis_table_t medredis_table_backup = { .name = "backup" };
static medredis_table_t medredis_table_trash = { .name = "trash", .record_lists = G_QUEUE_INIT };
static medredis_table_t medredis_table_backup = { .name = "backup", .record_lists = G_QUEUE_INIT };
/**********************************************************************/
static void medredis_free_reply(redisReply **reply) {
@ -810,13 +811,30 @@ err:
/**********************************************************************/
static int medredis_cleanup_entries(GQueue *records, medredis_table_t *table) {
char buffer[512];
if (medmysql_insert_records(records, table->name) != 0) {
L_CRITICAL("Failed to cleanup redis records\n");
goto err;
}
// take ownership of the contents of the `records` queue and swap out contents with
// an empty queue
GQueue *copy = g_queue_new();
GQueue tmp = *copy; // empty queue
*copy = *records; // take ownership
*records = tmp; // replace with empty queue
g_queue_push_tail(&table->record_lists, copy);
return 0;
err:
return -1;
}
/**********************************************************************/
static int medredis_batch_end_records(GQueue *records) {
char buffer[512];
for (GList *l = records->head; l; l = l->next) {
med_entry_t *e = l->data;
if (!e->redis)
@ -859,6 +877,27 @@ err:
return -1;
}
/**********************************************************************/
static int medredis_batch_end_table(medredis_table_t *table) {
while (table->record_lists.length) {
GQueue *records = g_queue_pop_head(&table->record_lists);
int ret = medredis_batch_end_records(records);
g_queue_free_full(records, med_entry_free);
if (ret)
return -1;
}
return 0;
}
/**********************************************************************/
int medredis_batch_end(void) {
if (medredis_batch_end_table(&medredis_table_trash))
return -1;
if (medredis_batch_end_table(&medredis_table_backup))
return -1;
return 0;
}
/**********************************************************************/
int medredis_trash_entries(GQueue *records) {
return medredis_cleanup_entries(records, &medredis_table_trash);

@ -12,5 +12,6 @@ gboolean medredis_fetch_callids(GQueue *output);
int medredis_fetch_records(char *callid, GQueue *entries, records_filter_func, void *filter_data);
int medredis_trash_entries(GQueue *records);
int medredis_backup_entries(GQueue *records);
int medredis_batch_end(void);
#endif /* _MED_REDIS_H */

Loading…
Cancel
Save