TT#162850 delete DB entries without matching streams

Keep track of how many streams have been added to the DB for each call.
Then at the end of the call, if there are no corresponding streams in
the DB, delete the entry for the call instead of marking it as complete.
This eliminates DB entries for calls that were cancelled or failed.

Change-Id: Id2eccd555a0801b4a8cc56299fe49ea2899c4826
mr10.4
Richard Fuchs 4 years ago
parent 16583ef76e
commit e46fdf0200

@ -57,6 +57,7 @@ static __thread MYSQL *mysql_conn;
static __thread MYSQL_STMT
*stm_insert_call,
*stm_close_call,
*stm_delete_call,
*stm_insert_stream,
*stm_close_stream,
*stm_delete_stream,
@ -75,6 +76,7 @@ static void my_stmt_close(MYSQL_STMT **st) {
static void reset_conn(void) {
my_stmt_close(&stm_insert_call);
my_stmt_close(&stm_close_call);
my_stmt_close(&stm_delete_call);
my_stmt_close(&stm_insert_stream);
my_stmt_close(&stm_close_stream);
my_stmt_close(&stm_delete_stream);
@ -130,7 +132,10 @@ static int check_conn(void) {
"(?,concat(?,'.',?),concat(?,'.',?),?,?,?,?,?,?)"))
goto err;
if (prep(&stm_close_call, "update recording_calls set " \
"end_timestamp = ?, status = 'completed' where id = ?"))
"end_timestamp = ?, status = 'completed' where id = ? " \
"and status != 'completed'"))
goto err;
if (prep(&stm_delete_call, "delete from recording_calls where id = ?"))
goto err;
if ((output_storage & OUTPUT_STORAGE_DB)) {
if (prep(&stm_close_stream, "update recording_streams set " \
@ -346,6 +351,9 @@ void db_do_stream(metafile_t *mf, output_t *op, const char *type, stream_t *stre
my_d(&b[10], &now);
execute_wrap(&stm_insert_stream, b, &op->db_id);
if (op->db_id > 0)
mf->db_streams++;
}
void db_close_call(metafile_t *mf) {
@ -357,10 +365,17 @@ void db_close_call(metafile_t *mf) {
double now = now_double();
MYSQL_BIND b[2];
my_d(&b[0], &now);
my_ull(&b[1], &mf->db_id);
execute_wrap(&stm_close_call, b, NULL);
if (mf->db_streams > 0) {
my_d(&b[0], &now);
my_ull(&b[1], &mf->db_id);
execute_wrap(&stm_close_call, b, NULL);
}
else {
my_ull(&b[0], &mf->db_id);
execute_wrap(&stm_delete_call, b, NULL);
mf->db_id = 0;
}
}
void db_close_stream(output_t *op) {
@ -427,7 +442,7 @@ file:;
ilog(LOG_ERR, "Failed to delete file '%s': %s", op->filename, strerror(errno));
}
void db_delete_stream(output_t *op) {
void db_delete_stream(metafile_t *mf, output_t *op) {
if (check_conn())
return;
if (op->db_id == 0)
@ -437,6 +452,8 @@ void db_delete_stream(output_t *op) {
my_ull(&b[0], &op->db_id);
execute_wrap(&stm_delete_stream, b, NULL);
mf->db_streams--;
}
void db_config_stream(output_t *op) {

@ -8,7 +8,7 @@ void db_do_call(metafile_t *);
void db_close_call(metafile_t *);
void db_do_stream(metafile_t *mf, output_t *op, const char *type, stream_t *, unsigned long ssrc);
void db_close_stream(output_t *op);
void db_delete_stream(output_t *op);
void db_delete_stream(metafile_t *, output_t *op);
void db_config_stream(output_t *op);
void db_thread_end(void);

@ -26,8 +26,9 @@ static void meta_free(void *ptr) {
metafile_t *mf = ptr;
dbg("freeing metafile info for %s%s%s", FMT_M(mf->name));
output_close(mf->mix_out);
output_close(mf, mf->mix_out);
mix_destroy(mf->mix);
db_close_call(mf);
g_string_chunk_free(mf->gsc);
for (int i = 0; i < mf->streams->len; i++) {
stream_t *stream = g_ptr_array_index(mf->streams, i);

@ -329,13 +329,13 @@ static int output_shutdown(output_t *output) {
}
void output_close(output_t *output) {
void output_close(metafile_t *mf, output_t *output) {
if (!output)
return;
if (output_shutdown(output))
db_close_stream(output);
else
db_delete_stream(output);
db_delete_stream(mf, output);
encoder_free(output->encoder);
g_clear_pointer(&output->full_filename, g_free);
g_clear_pointer(&output->file_path, g_free);

@ -12,7 +12,7 @@ void output_init(const char *format);
output_t *output_new(const char *path, const char *call, const char *type, const char *label);
output_t *output_new_from_full_path(const char *path, char *name);
void output_close(output_t *);
void output_close(metafile_t *, output_t *);
int output_config(output_t *output, const format_t *requested_format, format_t *actual_format);
int output_add(output_t *output, AVFrame *frame);

@ -145,7 +145,7 @@ void ssrc_tls_state(ssrc_t *ssrc) {
void ssrc_free(void *p) {
ssrc_t *s = p;
packet_sequencer_destroy(&s->sequencer);
output_close(s->output);
output_close(s->metafile, s->output);
for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++)
decoder_free(s->decoders[i]);
if (s->tls_fwd_stream)

@ -114,6 +114,7 @@ struct metafile_s {
char *output_dest;
off_t pos;
unsigned long long db_id;
unsigned int db_streams;
GStringChunk *gsc; // XXX limit max size

Loading…
Cancel
Save