diff --git a/mediator.c b/mediator.c index 33c9672..b7d7ca5 100644 --- a/mediator.c +++ b/mediator.c @@ -443,6 +443,8 @@ int main(int argc, char **argv) if (medmysql_batch_end(batches)) break; + if (medredis_batch_end()) + break; gettimeofday(&loop_tv_stop, NULL); loop_runtime = mediator_calc_runtime(&loop_tv_start, &loop_tv_stop); diff --git a/medredis.c b/medredis.c index 9f78027..421d614 100644 --- a/medredis.c +++ b/medredis.c @@ -42,13 +42,14 @@ typedef struct { typedef struct { const char *name; + GQueue record_lists; } medredis_table_t; static medredis_con_t *con = NULL; static char medredis_srem_key_lua[41]; // sha-1 hex string -static medredis_table_t medredis_table_trash = { .name = "trash" }; -static medredis_table_t medredis_table_backup = { .name = "backup" }; +static medredis_table_t medredis_table_trash = { .name = "trash", .record_lists = G_QUEUE_INIT }; +static medredis_table_t medredis_table_backup = { .name = "backup", .record_lists = G_QUEUE_INIT }; /**********************************************************************/ static void medredis_free_reply(redisReply **reply) { @@ -810,13 +811,30 @@ err: /**********************************************************************/ static int medredis_cleanup_entries(GQueue *records, medredis_table_t *table) { - char buffer[512]; - if (medmysql_insert_records(records, table->name) != 0) { L_CRITICAL("Failed to cleanup redis records\n"); goto err; } + // take ownership of the contents of the `records` queue and swap out contents with + // an empty queue + GQueue *copy = g_queue_new(); + GQueue tmp = *copy; // empty queue + *copy = *records; // take ownership + *records = tmp; // replace with empty queue + + g_queue_push_tail(&table->record_lists, copy); + + return 0; + +err: + return -1; +} + +/**********************************************************************/ +static int medredis_batch_end_records(GQueue *records) { + char buffer[512]; + for (GList *l = records->head; l; l = l->next) { med_entry_t *e = l->data; if (!e->redis) @@ -859,6 +877,27 @@ err: return -1; } +/**********************************************************************/ +static int medredis_batch_end_table(medredis_table_t *table) { + while (table->record_lists.length) { + GQueue *records = g_queue_pop_head(&table->record_lists); + int ret = medredis_batch_end_records(records); + g_queue_free_full(records, med_entry_free); + if (ret) + return -1; + } + return 0; +} + +/**********************************************************************/ +int medredis_batch_end(void) { + if (medredis_batch_end_table(&medredis_table_trash)) + return -1; + if (medredis_batch_end_table(&medredis_table_backup)) + return -1; + return 0; +} + /**********************************************************************/ int medredis_trash_entries(GQueue *records) { return medredis_cleanup_entries(records, &medredis_table_trash); diff --git a/medredis.h b/medredis.h index 93df908..425cc1b 100644 --- a/medredis.h +++ b/medredis.h @@ -12,5 +12,6 @@ gboolean medredis_fetch_callids(GQueue *output); int medredis_fetch_records(char *callid, GQueue *entries, records_filter_func, void *filter_data); int medredis_trash_entries(GQueue *records); int medredis_backup_entries(GQueue *records); +int medredis_batch_end(void); #endif /* _MED_REDIS_H */