diff --git a/recording-daemon/db.c b/recording-daemon/db.c index 8199dd34f..f15f5f1ff 100644 --- a/recording-daemon/db.c +++ b/recording-daemon/db.c @@ -63,16 +63,23 @@ CREATE TABLE `recording_metakeys` ( -static __thread MYSQL *mysql_conn; -static __thread MYSQL_STMT - *stm_insert_call, - *stm_close_call, - *stm_delete_call, - *stm_insert_stream, - *stm_close_stream, - *stm_delete_stream, - *stm_config_stream, - *stm_insert_metadata; +struct db_conn { + MYSQL *mysql_conn; + + MYSQL_STMT + *stm_insert_call, + *stm_close_call, + *stm_delete_call, + *stm_insert_stream, + *stm_close_stream, + *stm_delete_stream, + *stm_config_stream, + *stm_insert_metadata; +}; +typedef struct db_conn db_conn_t; + + +static __thread db_conn_t *db_conn; static void my_stmt_close(MYSQL_STMT **st) { @@ -83,22 +90,28 @@ static void my_stmt_close(MYSQL_STMT **st) { } -static void reset_conn(void) { - my_stmt_close(&stm_insert_call); - my_stmt_close(&stm_close_call); - my_stmt_close(&stm_delete_call); - my_stmt_close(&stm_insert_stream); - my_stmt_close(&stm_close_stream); - my_stmt_close(&stm_delete_stream); - my_stmt_close(&stm_config_stream); - my_stmt_close(&stm_insert_metadata); - mysql_close(mysql_conn); - mysql_conn = NULL; +static void reset_conn(db_conn_t *dbc) { + if (!dbc) + return; + + if (dbc->mysql_conn) { + my_stmt_close(&dbc->stm_insert_call); + my_stmt_close(&dbc->stm_close_call); + my_stmt_close(&dbc->stm_delete_call); + my_stmt_close(&dbc->stm_insert_stream); + my_stmt_close(&dbc->stm_close_stream); + my_stmt_close(&dbc->stm_delete_stream); + my_stmt_close(&dbc->stm_config_stream); + my_stmt_close(&dbc->stm_insert_metadata); + mysql_close(dbc->mysql_conn); + } + + g_free(dbc); } -INLINE int prep(MYSQL_STMT **st, const char *s) { - *st = mysql_stmt_init(mysql_conn); +INLINE int prep(db_conn_t *dbc, MYSQL_STMT **st, const char *s) { + *st = mysql_stmt_init(dbc->mysql_conn); if (!*st) return -1; if (mysql_stmt_prepare(*st, s, strlen(s))) { @@ -109,31 +122,33 @@ INLINE int prep(MYSQL_STMT **st, const char *s) { } -static int check_conn(void) { - if (mysql_conn) - return 0; +static db_conn_t *check_conn(void) { + if (db_conn) + return db_conn; if (!c_mysql_host || !c_mysql_db) - return -1; + return NULL; dbg("connecting to MySQL"); - mysql_conn = mysql_init(NULL); - if (!mysql_conn) + db_conn_t *dbc = g_new0(db_conn_t, 1); + + dbc->mysql_conn = mysql_init(NULL); + if (!dbc->mysql_conn) goto err; - if (!mysql_real_connect(mysql_conn, c_mysql_host, c_mysql_user, c_mysql_pass, c_mysql_db, c_mysql_port, + if (!mysql_real_connect(dbc->mysql_conn, c_mysql_host, c_mysql_user, c_mysql_pass, c_mysql_db, c_mysql_port, NULL, CLIENT_IGNORE_SIGPIPE)) goto err; - if (mysql_select_db(mysql_conn, c_mysql_db)) + if (mysql_select_db(dbc->mysql_conn, c_mysql_db)) goto err; - if (mysql_autocommit(mysql_conn, 0)) + if (mysql_autocommit(dbc->mysql_conn, 0)) goto err; - if (prep(&stm_insert_call, "insert into recording_calls (call_id, start_timestamp, " \ + if (prep(dbc, &dbc->stm_insert_call, "insert into recording_calls (call_id, start_timestamp, " \ "`status`) " \ "values " \ "(?,?,'recording')")) goto err; - if (prep(&stm_insert_stream, "insert into recording_streams (`call`, local_filename, full_filename, " \ + if (prep(dbc, &dbc->stm_insert_stream, "insert into recording_streams (`call`, local_filename, full_filename, " \ "file_format, " \ "output_type, " \ "stream_id, ssrc, " \ @@ -141,43 +156,45 @@ static int check_conn(void) { "start_timestamp) values " \ "(?,concat(?,'.',?),concat(?,'.',?),?,?,?,?,?,?)")) goto err; - if (prep(&stm_close_call, "update recording_calls set " \ + if (prep(dbc, &dbc->stm_close_call, "update recording_calls set " \ "end_timestamp = ?, status = 'completed' where id = ? " \ "and status != 'completed'")) goto err; - if (prep(&stm_delete_call, "delete from recording_calls where id = ?")) + if (prep(dbc, &dbc->stm_delete_call, "delete from recording_calls where id = ?")) goto err; if ((output_storage & OUTPUT_STORAGE_DB)) { - if (prep(&stm_close_stream, "update recording_streams set " \ + if (prep(dbc, &dbc->stm_close_stream, "update recording_streams set " \ "end_timestamp = ?, stream = ? where id = ?")) goto err; } else { - if (prep(&stm_close_stream, "update recording_streams set " \ + if (prep(dbc, &dbc->stm_close_stream, "update recording_streams set " \ "end_timestamp = ? where id = ?")) goto err; } - if (prep(&stm_delete_stream, "delete from recording_streams where id = ?")) + if (prep(dbc, &dbc->stm_delete_stream, "delete from recording_streams where id = ?")) goto err; - if (prep(&stm_config_stream, "update recording_streams set channels = ?, sample_rate = ? where id = ?")) + if (prep(dbc, &dbc->stm_config_stream, "update recording_streams set channels = ?, sample_rate = ? where id = ?")) goto err; - if (prep(&stm_insert_metadata, "insert into recording_metakeys (`call`, `key`, `value`) values " \ + if (prep(dbc, &dbc->stm_insert_metadata, "insert into recording_metakeys (`call`, `key`, `value`) values " \ "(?,?,?)")) goto err; dbg("Connection to MySQL established"); - return 0; + db_conn = dbc; + + return dbc; err: - if (mysql_conn) { - ilog(LOG_ERR, "Failed to connect to MySQL: %s", mysql_error(mysql_conn)); - reset_conn(); - } + if (dbc->mysql_conn) + ilog(LOG_ERR, "Failed to connect to MySQL: %s", mysql_error(dbc->mysql_conn)); else ilog(LOG_ERR, "Failed to connect to MySQL: out of memory"); - return -1; + reset_conn(dbc); + + return NULL; } @@ -225,18 +242,19 @@ INLINE void my_ts(MYSQL_BIND *b, int64_t ts, double *d) { static bool execute_wrap(MYSQL_STMT **stmt, MYSQL_BIND *binds, unsigned long long *auto_id) { int retr = 0; while (1) { - if (check_conn()) + db_conn_t *dbc; + if (!(dbc = check_conn())) goto err; if (mysql_stmt_bind_param(*stmt, binds)) goto err; if (mysql_stmt_execute(*stmt)) goto err; if (auto_id) { - *auto_id = mysql_insert_id(mysql_conn); + *auto_id = mysql_insert_id(dbc->mysql_conn); if (*auto_id == 0) goto err; } - if (mysql_commit(mysql_conn)) + if (mysql_commit(dbc->mysql_conn)) goto err; return true; @@ -246,12 +264,12 @@ err: // fatal ilog(LOG_ERR, "Failed to bind or execute prepared statement: %s", mysql_stmt_error(*stmt)); - reset_conn(); + reset_conn(dbc); return false; } if (retr > 2) { - reset_conn(); - if (check_conn()) + reset_conn(dbc); + if (!(dbc = check_conn())) return false; } @@ -260,7 +278,7 @@ err: } -static void db_do_call_id(metafile_t *mf) { +static void db_do_call_id(db_conn_t *dbc, metafile_t *mf) { if (mf->db_id > 0) return; if (!mf->call_id) @@ -275,9 +293,9 @@ static void db_do_call_id(metafile_t *mf) { double ts; my_ts(&b[1], mf->start_time_us, &ts); - execute_wrap(&stm_insert_call, b, &mf->db_id); + execute_wrap(&dbc->stm_insert_call, b, &mf->db_id); } -static void db_do_call_metadata(metafile_t *mf) { +static void db_do_call_metadata(db_conn_t *dbc, metafile_t *mf) { if (mf->db_metadata_done) return; if (mf->db_id == 0) @@ -296,7 +314,7 @@ static void db_do_call_metadata(metafile_t *mf) { my_str(&b[1], key); my_str(&b[2], l->data); - execute_wrap(&stm_insert_metadata, b, NULL); + execute_wrap(&dbc->stm_insert_metadata, b, NULL); } } @@ -304,17 +322,19 @@ static void db_do_call_metadata(metafile_t *mf) { } void db_do_call(metafile_t *mf) { - if (check_conn()) + db_conn_t *dbc; + if (!(dbc = check_conn())) return; - db_do_call_id(mf); - db_do_call_metadata(mf); + db_do_call_id(dbc, mf); + db_do_call_metadata(dbc, mf); } // mf is locked void db_do_stream(metafile_t *mf, output_t *op, stream_t *stream, unsigned long ssrc) { - if (check_conn()) + db_conn_t *dbc; + if (!(dbc = check_conn())) return; if (mf->db_id == 0) return; @@ -354,14 +374,15 @@ void db_do_stream(metafile_t *mf, output_t *op, stream_t *stream, unsigned long double ts; my_ts(&b[10], op->start_time_us, &ts); - execute_wrap(&stm_insert_stream, b, &op->db_id); + execute_wrap(&dbc->stm_insert_stream, b, &op->db_id); if (op->db_id > 0) mf->db_streams++; } void db_close_call(metafile_t *mf) { - if (check_conn()) + db_conn_t *dbc; + if (!(dbc = check_conn())) return; if (mf->db_id == 0) return; @@ -374,18 +395,19 @@ void db_close_call(metafile_t *mf) { double ts; my_ts(&b[0], now, &ts); my_ull(&b[1], &mf->db_id); - execute_wrap(&stm_close_call, b, NULL); + execute_wrap(&dbc->stm_close_call, b, NULL); } else { my_ull(&b[0], &mf->db_id); - execute_wrap(&stm_delete_call, b, NULL); + execute_wrap(&dbc->stm_delete_call, b, NULL); mf->db_id = 0; } } static bool do_notify(notif_req_t *req) { - if (check_conn()) + db_conn_t *dbc; + if (!(dbc = check_conn())) return false; MYSQL_BIND b[3]; @@ -399,10 +421,10 @@ static bool do_notify(notif_req_t *req) { } my_ull(&b[par_idx++], &req->db_id); - bool ok = execute_wrap(&stm_close_stream, b, NULL); + bool ok = execute_wrap(&dbc->stm_close_stream, b, NULL); // running in a thread pool, don't leave connection behind - reset_conn(); + reset_conn(dbc); return ok; } @@ -444,7 +466,8 @@ void db_close_stream(output_t *op) { void db_delete_stream(metafile_t *mf, output_t *op) { - if (check_conn()) + db_conn_t *dbc; + if (!(dbc = check_conn())) return; if (op->db_id == 0) return; @@ -452,13 +475,14 @@ void db_delete_stream(metafile_t *mf, output_t *op) { MYSQL_BIND b[1]; my_ull(&b[0], &op->db_id); - execute_wrap(&stm_delete_stream, b, NULL); + execute_wrap(&dbc->stm_delete_stream, b, NULL); mf->db_streams--; } void db_config_stream(output_t *op) { - if (check_conn()) + db_conn_t *dbc; + if (!(dbc = check_conn())) return; if (op->db_id == 0) return; @@ -468,9 +492,10 @@ void db_config_stream(output_t *op) { my_i(&b[1], &op->encoder->actual_format.clockrate); my_ull(&b[2], &op->db_id); - execute_wrap(&stm_config_stream, b, NULL); + execute_wrap(&dbc->stm_config_stream, b, NULL); } void db_thread_end(void) { - reset_conn(); + reset_conn(db_conn); + db_conn = NULL; }