|
|
|
|
@ -10,6 +10,7 @@
|
|
|
|
|
#include "tag.h"
|
|
|
|
|
#include "recaux.h"
|
|
|
|
|
#include "output.h"
|
|
|
|
|
#include "notify.h"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -376,39 +377,66 @@ void db_close_call(metafile_t *mf) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool db_close_stream(output_t *op) {
|
|
|
|
|
if (check_conn() || op->db_id == 0)
|
|
|
|
|
return !(output_storage & OUTPUT_STORAGE_DB); // error if DB storage is requested
|
|
|
|
|
|
|
|
|
|
int64_t now = now_us();
|
|
|
|
|
static bool do_notify(notif_req_t *req) {
|
|
|
|
|
if (check_conn())
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
|
|
str stream = STR_NULL;
|
|
|
|
|
MYSQL_BIND b[3];
|
|
|
|
|
bool ok = true;
|
|
|
|
|
|
|
|
|
|
content_t *content = NULL;
|
|
|
|
|
if ((output_storage & OUTPUT_STORAGE_DB)) {
|
|
|
|
|
content = output_get_content(op);
|
|
|
|
|
if (content)
|
|
|
|
|
stream = STR_GS(content->s);
|
|
|
|
|
else
|
|
|
|
|
ok = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int par_idx = 0;
|
|
|
|
|
double ts;
|
|
|
|
|
my_ts(&b[par_idx++], now, &ts);
|
|
|
|
|
if ((output_storage & OUTPUT_STORAGE_DB))
|
|
|
|
|
my_ts(&b[par_idx++], req->end_time, &ts);
|
|
|
|
|
if (req->content) {
|
|
|
|
|
str stream = STR_GS(req->content->s);
|
|
|
|
|
my_str(&b[par_idx++], &stream);
|
|
|
|
|
my_ull(&b[par_idx++], &op->db_id);
|
|
|
|
|
}
|
|
|
|
|
my_ull(&b[par_idx++], &req->db_id);
|
|
|
|
|
|
|
|
|
|
execute_wrap(&stm_close_stream, b, NULL);
|
|
|
|
|
bool ok = execute_wrap(&stm_close_stream, b, NULL);
|
|
|
|
|
|
|
|
|
|
obj_release(content);
|
|
|
|
|
// running in a thread pool, don't leave connection behind
|
|
|
|
|
reset_conn();
|
|
|
|
|
|
|
|
|
|
return ok;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void setup_notify(notif_req_t *req, output_t *o, metafile_t *mf, tag_t *tag) {
|
|
|
|
|
req->end_time = now_us();
|
|
|
|
|
if ((output_storage & OUTPUT_STORAGE_DB))
|
|
|
|
|
req->content = output_get_content(o);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void cleanup_notify(notif_req_t *req) {
|
|
|
|
|
obj_release(req->content);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static const notif_action_t db_action = {
|
|
|
|
|
.name = "DB",
|
|
|
|
|
.setup = setup_notify,
|
|
|
|
|
.perform = do_notify,
|
|
|
|
|
.cleanup = cleanup_notify,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
void db_close_stream(output_t *op) {
|
|
|
|
|
if (!c_mysql_host || !c_mysql_db)
|
|
|
|
|
return;
|
|
|
|
|
|
|
|
|
|
if (op->db_id == 0) {
|
|
|
|
|
if (!(output_storage & OUTPUT_STORAGE_DB))
|
|
|
|
|
return;
|
|
|
|
|
ilog(LOG_ERR, "DB storage requested but no entry exists");
|
|
|
|
|
content_t *content = output_get_content(op);
|
|
|
|
|
if (content)
|
|
|
|
|
output_content_failure(content);
|
|
|
|
|
obj_release(content);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
notify_push_setup(&db_action, op, NULL, NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void db_delete_stream(metafile_t *mf, output_t *op) {
|
|
|
|
|
if (check_conn())
|
|
|
|
|
return;
|
|
|
|
|
|