From 04a534bb6742c32e10f2cb469b76816b5eeb4582 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Mon, 27 Jun 2022 13:41:46 -0400 Subject: [PATCH] TT#182200 delay Redis entry deletion until after MySQL transaction The MySQL INSERT statements to move processed Redis acc records from Redis to the respective backup/trash MySQL tables are always issued within a MySQL transaction (med_handler via medmysql_batch_start), but the deletions from Redis were done immediately. Therefore if mediator were to abort within a processing loop, the MySQL transaction would be rolled back after the entries had already been deleted from Redis, therefore losing the acc entries. Solve this by using an internal queue for Redis entries to hold the lists of entries to be deleted until the MySQL transaction is commited. Change-Id: Ib41d0e2ca722c66f9e078ca31f7e5ca2b9d9fe2d --- mediator.c | 2 ++ medredis.c | 47 +++++++++++++++++++++++++++++++++++++++++++---- medredis.h | 1 + 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/mediator.c b/mediator.c index 33c9672..b7d7ca5 100644 --- a/mediator.c +++ b/mediator.c @@ -443,6 +443,8 @@ int main(int argc, char **argv) if (medmysql_batch_end(batches)) break; + if (medredis_batch_end()) + break; gettimeofday(&loop_tv_stop, NULL); loop_runtime = mediator_calc_runtime(&loop_tv_start, &loop_tv_stop); diff --git a/medredis.c b/medredis.c index 9f78027..421d614 100644 --- a/medredis.c +++ b/medredis.c @@ -42,13 +42,14 @@ typedef struct { typedef struct { const char *name; + GQueue record_lists; } medredis_table_t; static medredis_con_t *con = NULL; static char medredis_srem_key_lua[41]; // sha-1 hex string -static medredis_table_t medredis_table_trash = { .name = "trash" }; -static medredis_table_t medredis_table_backup = { .name = "backup" }; +static medredis_table_t medredis_table_trash = { .name = "trash", .record_lists = G_QUEUE_INIT }; +static medredis_table_t medredis_table_backup = { .name = "backup", .record_lists = G_QUEUE_INIT }; /**********************************************************************/ static void medredis_free_reply(redisReply **reply) { @@ -810,13 +811,30 @@ err: /**********************************************************************/ static int medredis_cleanup_entries(GQueue *records, medredis_table_t *table) { - char buffer[512]; - if (medmysql_insert_records(records, table->name) != 0) { L_CRITICAL("Failed to cleanup redis records\n"); goto err; } + // take ownership of the contents of the `records` queue and swap out contents with + // an empty queue + GQueue *copy = g_queue_new(); + GQueue tmp = *copy; // empty queue + *copy = *records; // take ownership + *records = tmp; // replace with empty queue + + g_queue_push_tail(&table->record_lists, copy); + + return 0; + +err: + return -1; +} + +/**********************************************************************/ +static int medredis_batch_end_records(GQueue *records) { + char buffer[512]; + for (GList *l = records->head; l; l = l->next) { med_entry_t *e = l->data; if (!e->redis) @@ -859,6 +877,27 @@ err: return -1; } +/**********************************************************************/ +static int medredis_batch_end_table(medredis_table_t *table) { + while (table->record_lists.length) { + GQueue *records = g_queue_pop_head(&table->record_lists); + int ret = medredis_batch_end_records(records); + g_queue_free_full(records, med_entry_free); + if (ret) + return -1; + } + return 0; +} + +/**********************************************************************/ +int medredis_batch_end(void) { + if (medredis_batch_end_table(&medredis_table_trash)) + return -1; + if (medredis_batch_end_table(&medredis_table_backup)) + return -1; + return 0; +} + /**********************************************************************/ int medredis_trash_entries(GQueue *records) { return medredis_cleanup_entries(records, &medredis_table_trash); diff --git a/medredis.h b/medredis.h index 93df908..425cc1b 100644 --- a/medredis.h +++ b/medredis.h @@ -12,5 +12,6 @@ gboolean medredis_fetch_callids(GQueue *output); int medredis_fetch_records(char *callid, GQueue *entries, records_filter_func, void *filter_data); int medredis_trash_entries(GQueue *records); int medredis_backup_entries(GQueue *records); +int medredis_batch_end(void); #endif /* _MED_REDIS_H */