|
|
|
|
@ -78,8 +78,19 @@ struct db_conn {
|
|
|
|
|
};
|
|
|
|
|
typedef struct db_conn db_conn_t;
|
|
|
|
|
|
|
|
|
|
TYPED_GQUEUE(db_conn, db_conn_t)
|
|
|
|
|
|
|
|
|
|
static __thread db_conn_t *db_conn;
|
|
|
|
|
static db_conn_q db_conns;
|
|
|
|
|
static pthread_mutex_t db_conns_lock = PTHREAD_MUTEX_INITIALIZER;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void return_conn(db_conn_t *dbc) {
|
|
|
|
|
pthread_mutex_lock(&db_conns_lock);
|
|
|
|
|
t_queue_push_head(&db_conns, dbc);
|
|
|
|
|
pthread_mutex_unlock(&db_conns_lock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
G_DEFINE_AUTOPTR_CLEANUP_FUNC(db_conn_t, return_conn);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void my_stmt_close(MYSQL_STMT **st) {
|
|
|
|
|
@ -91,9 +102,6 @@ static void my_stmt_close(MYSQL_STMT **st) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void reset_conn(db_conn_t *dbc) {
|
|
|
|
|
if (!dbc)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
if (dbc->mysql_conn) {
|
|
|
|
|
my_stmt_close(&dbc->stm_insert_call);
|
|
|
|
|
my_stmt_close(&dbc->stm_close_call);
|
|
|
|
|
@ -109,6 +117,13 @@ static void reset_conn(db_conn_t *dbc) {
|
|
|
|
|
g_free(dbc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void clear_conn(db_conn_t **dbc) {
|
|
|
|
|
if (!*dbc)
|
|
|
|
|
return;
|
|
|
|
|
reset_conn(*dbc);
|
|
|
|
|
*dbc = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
INLINE int prep(db_conn_t *dbc, MYSQL_STMT **st, const char *s) {
|
|
|
|
|
*st = mysql_stmt_init(dbc->mysql_conn);
|
|
|
|
|
@ -128,8 +143,14 @@ static bool db_wanted(void) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static db_conn_t *check_conn(void) {
|
|
|
|
|
if (db_conn)
|
|
|
|
|
return db_conn;
|
|
|
|
|
pthread_mutex_lock(&db_conns_lock);
|
|
|
|
|
if (db_conns.head) {
|
|
|
|
|
db_conn_t *ret = t_queue_pop_head(&db_conns);
|
|
|
|
|
pthread_mutex_unlock(&db_conns_lock);
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
pthread_mutex_unlock(&db_conns_lock);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
dbg("connecting to MySQL");
|
|
|
|
|
|
|
|
|
|
@ -185,8 +206,6 @@ static db_conn_t *check_conn(void) {
|
|
|
|
|
|
|
|
|
|
dbg("Connection to MySQL established");
|
|
|
|
|
|
|
|
|
|
db_conn = dbc;
|
|
|
|
|
|
|
|
|
|
return dbc;
|
|
|
|
|
|
|
|
|
|
err:
|
|
|
|
|
@ -245,8 +264,8 @@ INLINE void my_ts(MYSQL_BIND *b, int64_t ts, double *d) {
|
|
|
|
|
static bool __execute_wrap(size_t stmt_offset, MYSQL_BIND *binds, unsigned long long *auto_id) {
|
|
|
|
|
int retr = 0;
|
|
|
|
|
while (1) {
|
|
|
|
|
db_conn_t *dbc;
|
|
|
|
|
MYSQL_STMT *stmt = NULL;
|
|
|
|
|
g_autoptr(db_conn_t) dbc;
|
|
|
|
|
if (!(dbc = check_conn()))
|
|
|
|
|
goto err;
|
|
|
|
|
stmt = G_STRUCT_MEMBER(MYSQL_STMT *, dbc, stmt_offset);
|
|
|
|
|
@ -270,11 +289,11 @@ err:
|
|
|
|
|
if (stmt)
|
|
|
|
|
ilog(LOG_ERR, "Failed to bind or execute prepared statement: %s",
|
|
|
|
|
mysql_stmt_error(stmt));
|
|
|
|
|
reset_conn(dbc);
|
|
|
|
|
clear_conn(&dbc);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
if (retr > 2) {
|
|
|
|
|
reset_conn(dbc);
|
|
|
|
|
clear_conn(&dbc);
|
|
|
|
|
if (!(dbc = check_conn()))
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
@ -429,9 +448,6 @@ static bool do_notify(notif_req_t *req) {
|
|
|
|
|
|
|
|
|
|
bool ok = execute_wrap(stm_close_stream, b, NULL);
|
|
|
|
|
|
|
|
|
|
// running in a thread pool, don't leave connection behind
|
|
|
|
|
reset_conn(db_conn);
|
|
|
|
|
|
|
|
|
|
return ok;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -498,8 +514,3 @@ void db_config_stream(output_t *op) {
|
|
|
|
|
|
|
|
|
|
execute_wrap(stm_config_stream, b, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void db_thread_end(void) {
|
|
|
|
|
reset_conn(db_conn);
|
|
|
|
|
db_conn = NULL;
|
|
|
|
|
}
|
|
|
|
|
|