TT#24140 add transaction replay capability to work around temporary errors

Change-Id: Ic20a40a5eafc6f5f8d59724a40ff3cfce36ce41f
changes/65/16665/7
Richard Fuchs 8 years ago
parent c94f9dffb1
commit 266c624ddd

@ -2,6 +2,7 @@
#include <m_string.h>
#include <mysql.h>
#include <mysql/errmsg.h>
#include <mysql/mysqld_error.h>
#include <assert.h>
#include "medmysql.h"
@ -56,7 +57,12 @@ typedef struct _medmysql_handler {
const char *name;
MYSQL *m;
int is_transaction;
GQueue transaction_statements;
} medmysql_handler;
typedef struct _statement_str {
char *str;
unsigned long len;
} statement_str;
static medmysql_handler *cdr_handler;
static medmysql_handler *med_handler;
@ -67,6 +73,19 @@ static int medmysql_flush_cdr(struct medmysql_batches *);
static int medmysql_flush_medlist(struct medmysql_str *);
static int medmysql_flush_call_stat_info();
static void medmysql_handler_close(medmysql_handler **h);
static int medmysql_handler_transaction(medmysql_handler *h);
static void statement_free(void *stm_p) {
statement_str *stm = stm_p;
free(stm->str);
free(stm);
}
static void __g_queue_clear_full(GQueue *q, GDestroyNotify free_func) {
void *p;
while ((p = g_queue_pop_head(q)))
free_func(p);
}
static int medmysql_query_wrapper(medmysql_handler *mysql, const char *stmt_str, unsigned long length) {
@ -77,9 +96,11 @@ static int medmysql_query_wrapper(medmysql_handler *mysql, const char *stmt_str,
for (i = 0; i < 10; i++) {
ret = mysql_real_query(mysql->m, stmt_str, length);
if (!ret)
return ret;
break;
err = mysql_errno(mysql->m);
if (err == CR_SERVER_GONE_ERROR || err == CR_SERVER_LOST || err == CR_CONN_HOST_ERROR || err == CR_CONNECTION_ERROR) {
if (err == CR_SERVER_GONE_ERROR || err == CR_SERVER_LOST || err == CR_CONN_HOST_ERROR
|| err == CR_CONNECTION_ERROR)
{
syslog(LOG_WARNING, "Lost connection to SQL server during query, retrying...");
sleep(10);
continue;
@ -89,6 +110,80 @@ static int medmysql_query_wrapper(medmysql_handler *mysql, const char *stmt_str,
return ret;
}
static int medmysql_query_wrapper_tx(medmysql_handler *mysql, const char *stmt_str, unsigned long length) {
int ret;
int i;
unsigned int err;
if (!mysql->is_transaction) {
syslog(LOG_CRIT, "SQL mode is not in transaction");
return -1;
}
for (i = 0; i < 10; i++) {
ret = mysql_real_query(mysql->m, stmt_str, length);
if (!ret)
break;
err = mysql_errno(mysql->m);
if (err == CR_SERVER_GONE_ERROR || err == CR_SERVER_LOST || err == CR_CONN_HOST_ERROR
|| err == CR_CONNECTION_ERROR || err == ER_LOCK_WAIT_TIMEOUT
|| err == ER_LOCK_DEADLOCK)
{
// rollback, cancel transaction, restart transaction, replay all statements,
// and then try again
syslog(LOG_WARNING, "Got error %u from SQL server during transaction, retrying...",
err);
ret = mysql_real_query(mysql->m, "rollback", 8);
if (ret) {
syslog(LOG_CRIT, "Got error %u from SQL during rollback",
mysql_errno(mysql->m));
return -1;
}
mysql->is_transaction = 0;
sleep(10);
if (medmysql_handler_transaction(mysql))
return -1;
// steal the statement queue and recursively replay them into a new empty queue
GQueue replay = mysql->transaction_statements;
g_queue_init(&mysql->transaction_statements);
statement_str *stm;
while ((stm = g_queue_pop_head(&replay))) {
if (medmysql_query_wrapper_tx(mysql, stm->str, stm->len)) {
__g_queue_clear_full(&mysql->transaction_statements, statement_free);
statement_free(stm);
return -1;
}
statement_free(stm);
}
continue;
}
break;
}
if (!ret) {
// append statement to queue for possible replaying
statement_str *stm = malloc(sizeof(*stm));
if (!stm) {
syslog(LOG_CRIT, "Out of memory (malloc statement_str)");
return -1;
}
stm->str = malloc(length);
if (!stm->str) {
syslog(LOG_CRIT, "Out of memory (malloc statement_str body)");
free(stm);
return -1;
}
memcpy(stm->str, stmt_str, length);
stm->len = length;
g_queue_push_tail(&mysql->transaction_statements, stm);
}
return ret;
}
static medmysql_handler *medmysql_handler_init(const char *name, const char *host, const char *user,
const char *pass, const char *db, unsigned int port)
{
@ -102,6 +197,7 @@ static medmysql_handler *medmysql_handler_init(const char *name, const char *hos
}
memset(ret, 0, sizeof(*ret));
ret->name = name;
g_queue_init(&ret->transaction_statements);
ret->m = mysql_init(NULL);
if (!ret->m) {
syslog(LOG_CRIT, "Out of memory (mysql_init)");
@ -177,7 +273,12 @@ static void medmysql_handler_close(medmysql_handler **h) {
if ((*h)->m)
mysql_close((*h)->m);
free(h);
if ((*h)->transaction_statements.length)
syslog(LOG_WARNING, "Closing DB handle with still %u statements in queue",
(*h)->transaction_statements.length);
__g_queue_clear_full(&(*h)->transaction_statements, statement_free);
free(*h);
*h = NULL;
}
@ -756,6 +857,7 @@ static int medmysql_handler_commit(medmysql_handler *h) {
if (medmysql_query_wrapper(h, "commit", 6))
return -1;
h->is_transaction = 0;
__g_queue_clear_full(&h->transaction_statements, statement_free);
return 0;
}
@ -789,7 +891,7 @@ static int medmysql_flush_cdr(struct medmysql_batches *batches) {
batches->cdrs.len--;
batches->cdrs.str[batches->cdrs.len] = '\0';
if(medmysql_query_wrapper(cdr_handler, batches->cdrs.str, batches->cdrs.len) != 0)
if(medmysql_query_wrapper_tx(cdr_handler, batches->cdrs.str, batches->cdrs.len) != 0)
{
batches->cdrs.len = 0;
syslog(LOG_CRIT, "Error inserting cdrs: %s",
@ -825,7 +927,7 @@ static int medmysql_flush_medlist(struct medmysql_str *str) {
str->str[str->len - 1] = ')';
if(medmysql_query_wrapper(med_handler, str->str, str->len) != 0)
if(medmysql_query_wrapper_tx(med_handler, str->str, str->len) != 0)
{
str->len = 0;
syslog(LOG_CRIT, "Error executing query: %s",

Loading…
Cancel
Save