MT#64939 rework SQL connection into struct/object

Change-Id: I578c625e97e2c8628cce62b89604e5dd9026ccc2
pull/1126/merge
Richard Fuchs 1 week ago
parent 4326969440
commit ab6a5886f9

@ -63,16 +63,23 @@ CREATE TABLE `recording_metakeys` (
static __thread MYSQL *mysql_conn;
static __thread MYSQL_STMT
*stm_insert_call,
*stm_close_call,
*stm_delete_call,
*stm_insert_stream,
*stm_close_stream,
*stm_delete_stream,
*stm_config_stream,
*stm_insert_metadata;
struct db_conn {
MYSQL *mysql_conn;
MYSQL_STMT
*stm_insert_call,
*stm_close_call,
*stm_delete_call,
*stm_insert_stream,
*stm_close_stream,
*stm_delete_stream,
*stm_config_stream,
*stm_insert_metadata;
};
typedef struct db_conn db_conn_t;
static __thread db_conn_t *db_conn;
static void my_stmt_close(MYSQL_STMT **st) {
@ -83,22 +90,28 @@ static void my_stmt_close(MYSQL_STMT **st) {
}
static void reset_conn(void) {
my_stmt_close(&stm_insert_call);
my_stmt_close(&stm_close_call);
my_stmt_close(&stm_delete_call);
my_stmt_close(&stm_insert_stream);
my_stmt_close(&stm_close_stream);
my_stmt_close(&stm_delete_stream);
my_stmt_close(&stm_config_stream);
my_stmt_close(&stm_insert_metadata);
mysql_close(mysql_conn);
mysql_conn = NULL;
static void reset_conn(db_conn_t *dbc) {
if (!dbc)
return;
if (dbc->mysql_conn) {
my_stmt_close(&dbc->stm_insert_call);
my_stmt_close(&dbc->stm_close_call);
my_stmt_close(&dbc->stm_delete_call);
my_stmt_close(&dbc->stm_insert_stream);
my_stmt_close(&dbc->stm_close_stream);
my_stmt_close(&dbc->stm_delete_stream);
my_stmt_close(&dbc->stm_config_stream);
my_stmt_close(&dbc->stm_insert_metadata);
mysql_close(dbc->mysql_conn);
}
g_free(dbc);
}
INLINE int prep(MYSQL_STMT **st, const char *s) {
*st = mysql_stmt_init(mysql_conn);
INLINE int prep(db_conn_t *dbc, MYSQL_STMT **st, const char *s) {
*st = mysql_stmt_init(dbc->mysql_conn);
if (!*st)
return -1;
if (mysql_stmt_prepare(*st, s, strlen(s))) {
@ -109,31 +122,33 @@ INLINE int prep(MYSQL_STMT **st, const char *s) {
}
static int check_conn(void) {
if (mysql_conn)
return 0;
static db_conn_t *check_conn(void) {
if (db_conn)
return db_conn;
if (!c_mysql_host || !c_mysql_db)
return -1;
return NULL;
dbg("connecting to MySQL");
mysql_conn = mysql_init(NULL);
if (!mysql_conn)
db_conn_t *dbc = g_new0(db_conn_t, 1);
dbc->mysql_conn = mysql_init(NULL);
if (!dbc->mysql_conn)
goto err;
if (!mysql_real_connect(mysql_conn, c_mysql_host, c_mysql_user, c_mysql_pass, c_mysql_db, c_mysql_port,
if (!mysql_real_connect(dbc->mysql_conn, c_mysql_host, c_mysql_user, c_mysql_pass, c_mysql_db, c_mysql_port,
NULL, CLIENT_IGNORE_SIGPIPE))
goto err;
if (mysql_select_db(mysql_conn, c_mysql_db))
if (mysql_select_db(dbc->mysql_conn, c_mysql_db))
goto err;
if (mysql_autocommit(mysql_conn, 0))
if (mysql_autocommit(dbc->mysql_conn, 0))
goto err;
if (prep(&stm_insert_call, "insert into recording_calls (call_id, start_timestamp, " \
if (prep(dbc, &dbc->stm_insert_call, "insert into recording_calls (call_id, start_timestamp, " \
"`status`) " \
"values " \
"(?,?,'recording')"))
goto err;
if (prep(&stm_insert_stream, "insert into recording_streams (`call`, local_filename, full_filename, " \
if (prep(dbc, &dbc->stm_insert_stream, "insert into recording_streams (`call`, local_filename, full_filename, " \
"file_format, " \
"output_type, " \
"stream_id, ssrc, " \
@ -141,43 +156,45 @@ static int check_conn(void) {
"start_timestamp) values " \
"(?,concat(?,'.',?),concat(?,'.',?),?,?,?,?,?,?)"))
goto err;
if (prep(&stm_close_call, "update recording_calls set " \
if (prep(dbc, &dbc->stm_close_call, "update recording_calls set " \
"end_timestamp = ?, status = 'completed' where id = ? " \
"and status != 'completed'"))
goto err;
if (prep(&stm_delete_call, "delete from recording_calls where id = ?"))
if (prep(dbc, &dbc->stm_delete_call, "delete from recording_calls where id = ?"))
goto err;
if ((output_storage & OUTPUT_STORAGE_DB)) {
if (prep(&stm_close_stream, "update recording_streams set " \
if (prep(dbc, &dbc->stm_close_stream, "update recording_streams set " \
"end_timestamp = ?, stream = ? where id = ?"))
goto err;
}
else {
if (prep(&stm_close_stream, "update recording_streams set " \
if (prep(dbc, &dbc->stm_close_stream, "update recording_streams set " \
"end_timestamp = ? where id = ?"))
goto err;
}
if (prep(&stm_delete_stream, "delete from recording_streams where id = ?"))
if (prep(dbc, &dbc->stm_delete_stream, "delete from recording_streams where id = ?"))
goto err;
if (prep(&stm_config_stream, "update recording_streams set channels = ?, sample_rate = ? where id = ?"))
if (prep(dbc, &dbc->stm_config_stream, "update recording_streams set channels = ?, sample_rate = ? where id = ?"))
goto err;
if (prep(&stm_insert_metadata, "insert into recording_metakeys (`call`, `key`, `value`) values " \
if (prep(dbc, &dbc->stm_insert_metadata, "insert into recording_metakeys (`call`, `key`, `value`) values " \
"(?,?,?)"))
goto err;
dbg("Connection to MySQL established");
return 0;
db_conn = dbc;
return dbc;
err:
if (mysql_conn) {
ilog(LOG_ERR, "Failed to connect to MySQL: %s", mysql_error(mysql_conn));
reset_conn();
}
if (dbc->mysql_conn)
ilog(LOG_ERR, "Failed to connect to MySQL: %s", mysql_error(dbc->mysql_conn));
else
ilog(LOG_ERR, "Failed to connect to MySQL: out of memory");
return -1;
reset_conn(dbc);
return NULL;
}
@ -225,18 +242,19 @@ INLINE void my_ts(MYSQL_BIND *b, int64_t ts, double *d) {
static bool execute_wrap(MYSQL_STMT **stmt, MYSQL_BIND *binds, unsigned long long *auto_id) {
int retr = 0;
while (1) {
if (check_conn())
db_conn_t *dbc;
if (!(dbc = check_conn()))
goto err;
if (mysql_stmt_bind_param(*stmt, binds))
goto err;
if (mysql_stmt_execute(*stmt))
goto err;
if (auto_id) {
*auto_id = mysql_insert_id(mysql_conn);
*auto_id = mysql_insert_id(dbc->mysql_conn);
if (*auto_id == 0)
goto err;
}
if (mysql_commit(mysql_conn))
if (mysql_commit(dbc->mysql_conn))
goto err;
return true;
@ -246,12 +264,12 @@ err:
// fatal
ilog(LOG_ERR, "Failed to bind or execute prepared statement: %s",
mysql_stmt_error(*stmt));
reset_conn();
reset_conn(dbc);
return false;
}
if (retr > 2) {
reset_conn();
if (check_conn())
reset_conn(dbc);
if (!(dbc = check_conn()))
return false;
}
@ -260,7 +278,7 @@ err:
}
static void db_do_call_id(metafile_t *mf) {
static void db_do_call_id(db_conn_t *dbc, metafile_t *mf) {
if (mf->db_id > 0)
return;
if (!mf->call_id)
@ -275,9 +293,9 @@ static void db_do_call_id(metafile_t *mf) {
double ts;
my_ts(&b[1], mf->start_time_us, &ts);
execute_wrap(&stm_insert_call, b, &mf->db_id);
execute_wrap(&dbc->stm_insert_call, b, &mf->db_id);
}
static void db_do_call_metadata(metafile_t *mf) {
static void db_do_call_metadata(db_conn_t *dbc, metafile_t *mf) {
if (mf->db_metadata_done)
return;
if (mf->db_id == 0)
@ -296,7 +314,7 @@ static void db_do_call_metadata(metafile_t *mf) {
my_str(&b[1], key);
my_str(&b[2], l->data);
execute_wrap(&stm_insert_metadata, b, NULL);
execute_wrap(&dbc->stm_insert_metadata, b, NULL);
}
}
@ -304,17 +322,19 @@ static void db_do_call_metadata(metafile_t *mf) {
}
void db_do_call(metafile_t *mf) {
if (check_conn())
db_conn_t *dbc;
if (!(dbc = check_conn()))
return;
db_do_call_id(mf);
db_do_call_metadata(mf);
db_do_call_id(dbc, mf);
db_do_call_metadata(dbc, mf);
}
// mf is locked
void db_do_stream(metafile_t *mf, output_t *op, stream_t *stream, unsigned long ssrc) {
if (check_conn())
db_conn_t *dbc;
if (!(dbc = check_conn()))
return;
if (mf->db_id == 0)
return;
@ -354,14 +374,15 @@ void db_do_stream(metafile_t *mf, output_t *op, stream_t *stream, unsigned long
double ts;
my_ts(&b[10], op->start_time_us, &ts);
execute_wrap(&stm_insert_stream, b, &op->db_id);
execute_wrap(&dbc->stm_insert_stream, b, &op->db_id);
if (op->db_id > 0)
mf->db_streams++;
}
void db_close_call(metafile_t *mf) {
if (check_conn())
db_conn_t *dbc;
if (!(dbc = check_conn()))
return;
if (mf->db_id == 0)
return;
@ -374,18 +395,19 @@ void db_close_call(metafile_t *mf) {
double ts;
my_ts(&b[0], now, &ts);
my_ull(&b[1], &mf->db_id);
execute_wrap(&stm_close_call, b, NULL);
execute_wrap(&dbc->stm_close_call, b, NULL);
}
else {
my_ull(&b[0], &mf->db_id);
execute_wrap(&stm_delete_call, b, NULL);
execute_wrap(&dbc->stm_delete_call, b, NULL);
mf->db_id = 0;
}
}
static bool do_notify(notif_req_t *req) {
if (check_conn())
db_conn_t *dbc;
if (!(dbc = check_conn()))
return false;
MYSQL_BIND b[3];
@ -399,10 +421,10 @@ static bool do_notify(notif_req_t *req) {
}
my_ull(&b[par_idx++], &req->db_id);
bool ok = execute_wrap(&stm_close_stream, b, NULL);
bool ok = execute_wrap(&dbc->stm_close_stream, b, NULL);
// running in a thread pool, don't leave connection behind
reset_conn();
reset_conn(dbc);
return ok;
}
@ -444,7 +466,8 @@ void db_close_stream(output_t *op) {
void db_delete_stream(metafile_t *mf, output_t *op) {
if (check_conn())
db_conn_t *dbc;
if (!(dbc = check_conn()))
return;
if (op->db_id == 0)
return;
@ -452,13 +475,14 @@ void db_delete_stream(metafile_t *mf, output_t *op) {
MYSQL_BIND b[1];
my_ull(&b[0], &op->db_id);
execute_wrap(&stm_delete_stream, b, NULL);
execute_wrap(&dbc->stm_delete_stream, b, NULL);
mf->db_streams--;
}
void db_config_stream(output_t *op) {
if (check_conn())
db_conn_t *dbc;
if (!(dbc = check_conn()))
return;
if (op->db_id == 0)
return;
@ -468,9 +492,10 @@ void db_config_stream(output_t *op) {
my_i(&b[1], &op->encoder->actual_format.clockrate);
my_ull(&b[2], &op->db_id);
execute_wrap(&stm_config_stream, b, NULL);
execute_wrap(&dbc->stm_config_stream, b, NULL);
}
void db_thread_end(void) {
reset_conn();
reset_conn(db_conn);
db_conn = NULL;
}

Loading…
Cancel
Save