diff --git a/medmysql.c b/medmysql.c index 290c1e6..e7a4d00 100644 --- a/medmysql.c +++ b/medmysql.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #include "medmysql.h" @@ -56,7 +57,12 @@ typedef struct _medmysql_handler { const char *name; MYSQL *m; int is_transaction; + GQueue transaction_statements; } medmysql_handler; +typedef struct _statement_str { + char *str; + unsigned long len; +} statement_str; static medmysql_handler *cdr_handler; static medmysql_handler *med_handler; @@ -67,6 +73,19 @@ static int medmysql_flush_cdr(struct medmysql_batches *); static int medmysql_flush_medlist(struct medmysql_str *); static int medmysql_flush_call_stat_info(); static void medmysql_handler_close(medmysql_handler **h); +static int medmysql_handler_transaction(medmysql_handler *h); + + +static void statement_free(void *stm_p) { + statement_str *stm = stm_p; + free(stm->str); + free(stm); +} +static void __g_queue_clear_full(GQueue *q, GDestroyNotify free_func) { + void *p; + while ((p = g_queue_pop_head(q))) + free_func(p); +} static int medmysql_query_wrapper(medmysql_handler *mysql, const char *stmt_str, unsigned long length) { @@ -77,9 +96,11 @@ static int medmysql_query_wrapper(medmysql_handler *mysql, const char *stmt_str, for (i = 0; i < 10; i++) { ret = mysql_real_query(mysql->m, stmt_str, length); if (!ret) - return ret; + break; err = mysql_errno(mysql->m); - if (err == CR_SERVER_GONE_ERROR || err == CR_SERVER_LOST || err == CR_CONN_HOST_ERROR || err == CR_CONNECTION_ERROR) { + if (err == CR_SERVER_GONE_ERROR || err == CR_SERVER_LOST || err == CR_CONN_HOST_ERROR + || err == CR_CONNECTION_ERROR) + { syslog(LOG_WARNING, "Lost connection to SQL server during query, retrying..."); sleep(10); continue; @@ -89,6 +110,80 @@ static int medmysql_query_wrapper(medmysql_handler *mysql, const char *stmt_str, return ret; } + +static int medmysql_query_wrapper_tx(medmysql_handler *mysql, const char *stmt_str, unsigned long length) { + int ret; + int i; + unsigned int err; + + if (!mysql->is_transaction) { + syslog(LOG_CRIT, "SQL mode is not in transaction"); + return -1; + } + + for (i = 0; i < 10; i++) { + ret = mysql_real_query(mysql->m, stmt_str, length); + if (!ret) + break; + err = mysql_errno(mysql->m); + if (err == CR_SERVER_GONE_ERROR || err == CR_SERVER_LOST || err == CR_CONN_HOST_ERROR + || err == CR_CONNECTION_ERROR || err == ER_LOCK_WAIT_TIMEOUT + || err == ER_LOCK_DEADLOCK) + { + // rollback, cancel transaction, restart transaction, replay all statements, + // and then try again + syslog(LOG_WARNING, "Got error %u from SQL server during transaction, retrying...", + err); + ret = mysql_real_query(mysql->m, "rollback", 8); + if (ret) { + syslog(LOG_CRIT, "Got error %u from SQL during rollback", + mysql_errno(mysql->m)); + return -1; + } + mysql->is_transaction = 0; + + sleep(10); + + if (medmysql_handler_transaction(mysql)) + return -1; + + // steal the statement queue and recursively replay them into a new empty queue + GQueue replay = mysql->transaction_statements; + g_queue_init(&mysql->transaction_statements); + statement_str *stm; + while ((stm = g_queue_pop_head(&replay))) { + if (medmysql_query_wrapper_tx(mysql, stm->str, stm->len)) { + __g_queue_clear_full(&mysql->transaction_statements, statement_free); + statement_free(stm); + return -1; + } + statement_free(stm); + } + + continue; + } + break; + } + if (!ret) { + // append statement to queue for possible replaying + statement_str *stm = malloc(sizeof(*stm)); + if (!stm) { + syslog(LOG_CRIT, "Out of memory (malloc statement_str)"); + return -1; + } + stm->str = malloc(length); + if (!stm->str) { + syslog(LOG_CRIT, "Out of memory (malloc statement_str body)"); + free(stm); + return -1; + } + memcpy(stm->str, stmt_str, length); + stm->len = length; + g_queue_push_tail(&mysql->transaction_statements, stm); + } + return ret; +} + static medmysql_handler *medmysql_handler_init(const char *name, const char *host, const char *user, const char *pass, const char *db, unsigned int port) { @@ -102,6 +197,7 @@ static medmysql_handler *medmysql_handler_init(const char *name, const char *hos } memset(ret, 0, sizeof(*ret)); ret->name = name; + g_queue_init(&ret->transaction_statements); ret->m = mysql_init(NULL); if (!ret->m) { syslog(LOG_CRIT, "Out of memory (mysql_init)"); @@ -177,7 +273,12 @@ static void medmysql_handler_close(medmysql_handler **h) { if ((*h)->m) mysql_close((*h)->m); - free(h); + if ((*h)->transaction_statements.length) + syslog(LOG_WARNING, "Closing DB handle with still %u statements in queue", + (*h)->transaction_statements.length); + __g_queue_clear_full(&(*h)->transaction_statements, statement_free); + + free(*h); *h = NULL; } @@ -756,6 +857,7 @@ static int medmysql_handler_commit(medmysql_handler *h) { if (medmysql_query_wrapper(h, "commit", 6)) return -1; h->is_transaction = 0; + __g_queue_clear_full(&h->transaction_statements, statement_free); return 0; } @@ -789,7 +891,7 @@ static int medmysql_flush_cdr(struct medmysql_batches *batches) { batches->cdrs.len--; batches->cdrs.str[batches->cdrs.len] = '\0'; - if(medmysql_query_wrapper(cdr_handler, batches->cdrs.str, batches->cdrs.len) != 0) + if(medmysql_query_wrapper_tx(cdr_handler, batches->cdrs.str, batches->cdrs.len) != 0) { batches->cdrs.len = 0; syslog(LOG_CRIT, "Error inserting cdrs: %s", @@ -825,7 +927,7 @@ static int medmysql_flush_medlist(struct medmysql_str *str) { str->str[str->len - 1] = ')'; - if(medmysql_query_wrapper(med_handler, str->str, str->len) != 0) + if(medmysql_query_wrapper_tx(med_handler, str->str, str->len) != 0) { str->len = 0; syslog(LOG_CRIT, "Error executing query: %s",