From ff47e874ea2dbf74f570947aeeee758d6305a029 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 27 Jan 2023 08:01:07 -0500 Subject: [PATCH] MT#56493 add support for HTTP notifications Change-Id: I3ac04968279a5b750f1f7f4d4d06207e05e7d8a8 --- etc/rtpengine-recording.conf | 7 + recording-daemon/Makefile | 4 +- recording-daemon/main.c | 13 + recording-daemon/main.h | 5 + recording-daemon/metafile.c | 10 +- recording-daemon/notify.c | 299 +++++++++++++++++++++++ recording-daemon/notify.h | 13 + recording-daemon/output.c | 7 +- recording-daemon/output.h | 2 +- recording-daemon/packet.c | 2 +- recording-daemon/rtpengine-recording.pod | 25 ++ 11 files changed, 378 insertions(+), 9 deletions(-) create mode 100644 recording-daemon/notify.c create mode 100644 recording-daemon/notify.h diff --git a/etc/rtpengine-recording.conf b/etc/rtpengine-recording.conf index 0c4486142..4d790feb1 100644 --- a/etc/rtpengine-recording.conf +++ b/etc/rtpengine-recording.conf @@ -54,3 +54,10 @@ table = 0 # output-chmod-dir = 0750 # output-chown = rtpengine # output-chgrp = rtpengine + +### HTTP notifications for finished recordings +# notify-uri = https://example.com/rec/finished +# notify-post = false +# notify-no-verify = false +# notify-concurrency = 5 +# notify-retries = 10 diff --git a/recording-daemon/Makefile b/recording-daemon/Makefile index e20b26929..6cc677b62 100644 --- a/recording-daemon/Makefile +++ b/recording-daemon/Makefile @@ -14,6 +14,7 @@ CFLAGS+= $(shell pkg-config --cflags libavfilter) CFLAGS+= $(shell pkg-config --cflags opus) CFLAGS+= $(shell mysql_config --cflags) CFLAGS+= $(shell pkg-config --cflags openssl) +CFLAGS+= $(shell pkg-config --cflags libcurl) LDLIBS= -lm -ldl LDLIBS+= $(shell pkg-config --libs glib-2.0) @@ -26,11 +27,12 @@ LDLIBS+= $(shell pkg-config --libs libavfilter) LDLIBS+= $(shell pkg-config --libs opus) LDLIBS+= $(shell mysql_config --libs) LDLIBS+= $(shell pkg-config --libs openssl) +LDLIBS+= $(shell pkg-config --libs libcurl) include ../lib/g729.Makefile SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \ - decoder.c output.c mix.c db.c log.c forward.c tag.c poller.c + decoder.c output.c mix.c db.c log.c forward.c tag.c poller.c notify.c LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.strhash.c resample.c str.c socket.c streambuf.c ssllib.c \ dtmflib.c OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o) diff --git a/recording-daemon/main.c b/recording-daemon/main.c index f2720a250..0715dfa3d 100644 --- a/recording-daemon/main.c +++ b/recording-daemon/main.c @@ -28,6 +28,7 @@ #include "codeclib.h" #include "socket.h" #include "ssllib.h" +#include "notify.h" @@ -57,6 +58,11 @@ char *forward_to = NULL; static char *tls_send_to = NULL; endpoint_t tls_send_to_ep; int tls_resample = 8000; +char *notify_uri; +int notify_post; +int notify_nverify; +int notify_threads = 5; +int notify_retries = 10; static GQueue threads = G_QUEUE_INIT; // only accessed from main thread @@ -153,6 +159,7 @@ static void wait_for_signal(void) { static void cleanup(void) { + notify_cleanup(); garbage_collect_all(); metafile_cleanup(); inotify_cleanup(); @@ -206,6 +213,11 @@ static void options(int *argc, char ***argv) { { "forward-to", 0, 0, G_OPTION_ARG_STRING, &forward_to, "Where to forward to (unix socket)", "PATH" }, { "tls-send-to", 0, 0, G_OPTION_ARG_STRING, &tls_send_to, "Where to send to (TLS destination)", "IP:PORT" }, { "tls-resample", 0, 0, G_OPTION_ARG_INT, &tls_resample, "Sampling rate for TLS PCM output", "INT" }, + { "notify-uri", 0, 0, G_OPTION_ARG_STRING, ¬ify_uri, "Notify destination for finished outputs","URI" }, + { "notify-post", 0, 0, G_OPTION_ARG_NONE, ¬ify_post, "Use POST instead of GET", NULL }, + { "notify-no-verify", 0, 0, G_OPTION_ARG_NONE, ¬ify_nverify,"Don't verify HTTPS peer certificate", NULL }, + { "notify-concurrency", 0, 0, G_OPTION_ARG_INT, ¬ify_threads,"How many simultaneous requests", "INT" }, + { "notify-retries", 0, 0, G_OPTION_ARG_INT, ¬ify_retries,"How many times to retry failed requesets","INT" }, { NULL, } }; @@ -327,6 +339,7 @@ int main(int argc, char **argv) { setup(); daemonize(); wpidfile(); + notify_setup(); service_notify("READY=1\n"); diff --git a/recording-daemon/main.h b/recording-daemon/main.h index 222a456d7..8768d424b 100644 --- a/recording-daemon/main.h +++ b/recording-daemon/main.h @@ -41,6 +41,11 @@ extern int c_mysql_port; extern char *forward_to; extern endpoint_t tls_send_to_ep; extern int tls_resample; +extern char *notify_uri; +extern int notify_post; +extern int notify_nverify; +extern int notify_threads; +extern int notify_retries; extern volatile int shutdown_flag; diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c index 46a063bf1..c67deba58 100644 --- a/recording-daemon/metafile.c +++ b/recording-daemon/metafile.c @@ -26,23 +26,25 @@ static void meta_free(void *ptr) { metafile_t *mf = ptr; dbg("freeing metafile info for %s%s%s", FMT_M(mf->name)); - output_close(mf, mf->mix_out); + output_close(mf, mf->mix_out, NULL); mix_destroy(mf->mix); db_close_call(mf); g_string_chunk_free(mf->gsc); + // SSRCs first as they have linked outputs which need to be closed first + if (mf->ssrc_hash) + g_hash_table_destroy(mf->ssrc_hash); for (int i = 0; i < mf->streams->len; i++) { stream_t *stream = g_ptr_array_index(mf->streams, i); stream_close(stream); // should be closed already stream_free(stream); } - g_ptr_array_free(mf->streams, TRUE); for (int i = 0; i < mf->tags->len; i++) { tag_t *tag = g_ptr_array_index(mf->tags, i); tag_free(tag); } + g_ptr_array_free(mf->tags, TRUE); - if (mf->ssrc_hash) - g_hash_table_destroy(mf->ssrc_hash); + g_ptr_array_free(mf->streams, TRUE); g_slice_free1(sizeof(*mf), mf); } diff --git a/recording-daemon/notify.c b/recording-daemon/notify.c new file mode 100644 index 000000000..d2037e2a0 --- /dev/null +++ b/recording-daemon/notify.c @@ -0,0 +1,299 @@ +#include "notify.h" +#include +#include +#include "main.h" +#include "log.h" +#include "recaux.h" + + +struct notif_req { + char *name; // just for logging + struct curl_slist *headers; + + time_t retry_time; + unsigned int retries; + unsigned int falloff; +}; + + +static GThreadPool *notify_threadpool; + +static pthread_mutex_t timer_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t timer_cond = PTHREAD_COND_INITIALIZER; +static pthread_t notify_waiter; +static GTree *notify_timers; + + +static size_t dummy_write(char *ptr, size_t size, size_t nmemb, void *userdata) { + return size * nmemb; +} +static size_t dummy_read(char *ptr, size_t size, size_t nmemb, void *userdata) { + return 0; +} + +static void do_notify(void *p, void *u) { + struct notif_req *req = p; + const char *err = NULL; + CURLcode ret; + + ilog(LOG_DEBUG, "Launching HTTP notification for '%s%s%s'", FMT_M(req->name)); + + // set up the CURL request + + CURL *c = curl_easy_init(); + if (!c) + goto fail; + + err = "setting CURLOPT_URL"; + ret = curl_easy_setopt(c, CURLOPT_URL, notify_uri); + if (ret != CURLE_OK) + goto fail; + + // no output + err = "setting CURLOPT_WRITEFUNCTION"; + ret = curl_easy_setopt(c, CURLOPT_WRITEFUNCTION, dummy_write); + if (ret != CURLE_OK) + goto fail; + + // no input + err = "setting CURLOPT_READFUNCTION"; + ret = curl_easy_setopt(c, CURLOPT_READFUNCTION, dummy_read); + if (ret != CURLE_OK) + goto fail; + + // allow redirects + err = "setting CURLOPT_FOLLOWLOCATION"; + ret = curl_easy_setopt(c, CURLOPT_FOLLOWLOCATION, 1); + if (ret != CURLE_OK) + goto fail; + + // max 5 redirects + err = "setting CURLOPT_MAXREDIRS"; + ret = curl_easy_setopt(c, CURLOPT_MAXREDIRS, 5); + if (ret != CURLE_OK) + goto fail; + + // add headers + err = "setting CURLOPT_HTTPHEADER"; + ret = curl_easy_setopt(c, CURLOPT_HTTPHEADER, req->headers); + if (ret != CURLE_OK) + goto fail; + + // POST vs GET + if (notify_post) { + err = "setting CURLOPT_POST"; + ret = curl_easy_setopt(c, CURLOPT_POST, 1); + if (ret != CURLE_OK) + goto fail; + } + + // cert verify (enabled by default) + if (notify_nverify) { + err = "setting CURLOPT_SSL_VERIFYPEER"; + ret = curl_easy_setopt(c, CURLOPT_SSL_VERIFYPEER, 0); + if (ret != CURLE_OK) + goto fail; + } + + err = "performing request"; + ret = curl_easy_perform(c); + if (ret != CURLE_OK) + goto fail; + + long code; + err = "getting CURLINFO_RESPONSE_CODE"; + ret = curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &code); + if (ret != CURLE_OK) + goto fail; + + err = "checking response code (not 2xx)"; + if (code < 200 || code >= 300) + goto fail; + + // success + + ilog(LOG_NOTICE, "HTTP notification for '%s%s%s' was successful", FMT_M(req->name)); + goto cleanup; + +fail: + if (c) + curl_easy_cleanup(c); + + if (notify_retries >= 0 && req->retries < notify_retries) { + // schedule retry + req->retries++; + if (c) + ilog(LOG_DEBUG, "Failed to perform HTTP notification for '%s%s%s': " + "Error while %s: %s. Will retry in %u seconds (#%u)", + FMT_M(req->name), + err, curl_easy_strerror(ret), + req->falloff, req->retries); + else + ilog(LOG_DEBUG, "Failed to perform HTTP notification for '%s%s%s': " + "Failed to create CURL object. Will retry in %u seconds (#%u)", + FMT_M(req->name), + req->falloff, req->retries); + req->retry_time = time(NULL) + req->falloff; + req->falloff *= 2; + + pthread_mutex_lock(&timer_lock); + g_tree_insert(notify_timers, req, req); + pthread_cond_signal(&timer_cond); + pthread_mutex_unlock(&timer_lock); + + return; + } + + if (c) + ilog(LOG_ERR, "Failed to perform HTTP notification for '%s%s%s' after %u retries: " + "Error while %s: %s", + FMT_M(req->name), + req->retries, err, curl_easy_strerror(ret)); + else + ilog(LOG_ERR, "Failed to perform HTTP notification for '%s%s%s' after %u retries: " + "Failed to create CURL object", + FMT_M(req->name), + req->retries); + + c = NULL; + goto cleanup; + +cleanup: + if (c) + curl_easy_cleanup(c); + curl_slist_free_all(req->headers); + g_free(req->name); + g_slice_free1(sizeof(*req), req); +} + + +static void *notify_timer(void *p) { + pthread_mutex_lock(&timer_lock); + + // notify_timers being NULL acts as our shutdown flag + while (notify_timers) { + ilog(LOG_DEBUG, "HTTP notification timer thread looping"); + + // grab first entry in list, check retry time, sleep if it's in the future + + struct notif_req *first = g_tree_find_first(notify_timers, NULL, NULL); + if (!first) { + ilog(LOG_DEBUG, "No scheduled HTTP notification retries, sleeping"); + pthread_cond_wait(&timer_cond, &timer_lock); + continue; + } + struct timeval now; + gettimeofday(&now, NULL); + if (now.tv_sec < first->retry_time) { + ilog(LOG_DEBUG, "Sleeping until next scheduled HTTP notification retry in %lu seconds", + (unsigned long) first->retry_time - now.tv_sec); + struct timespec ts = {.tv_sec = first->retry_time, .tv_nsec = 0}; + pthread_cond_timedwait(&timer_cond, &timer_lock, &ts); + continue; + } + + // first entry is ready to run + + g_tree_remove(notify_timers, first); + ilog(LOG_DEBUG, "HTTP notification retry for '%s%s%s' is scheduled now", FMT_M(first->name)); + g_thread_pool_push(notify_threadpool, first, NULL); + } + + // clean up + + pthread_mutex_unlock(&timer_lock); + pthread_mutex_destroy(&timer_lock); + pthread_cond_destroy(&timer_cond); + + return NULL; +} + + +static int notify_req_cmp(const void *A, const void *B) { + const struct notif_req *a = A, *b = B; + + if (a->retry_time < b->retry_time) + return -1; + if (a->retry_time > b->retry_time) + return 1; + if (a < b) + return -1; + if (a > b) + return 1; + return 0; +} + + +void notify_setup(void) { + if (!notify_uri || notify_threads <= 0) + return; + + notify_threadpool = g_thread_pool_new(do_notify, NULL, notify_threads, false, NULL); + + notify_timers = g_tree_new(notify_req_cmp); + pthread_create(¬ify_waiter, NULL, notify_timer, NULL); +} + +void notify_cleanup(void) { + if (notify_threadpool) + g_thread_pool_free(notify_threadpool, true, false); + if (notify_waiter && notify_timers) { + // get lock, free GTree, signal thread to shut down + pthread_mutex_lock(&timer_lock); + g_tree_destroy(notify_timers); + notify_timers = NULL; + pthread_cond_signal(&timer_cond); + pthread_mutex_unlock(&timer_lock); + } +} + +__attribute__ ((format (printf, 2, 3))) +static void notify_add_header(struct notif_req *req, const char *fmt, ...) { + va_list ap; + va_start(ap, fmt); + char *s = g_strdup_vprintf(fmt, ap); + req->headers = curl_slist_append(req->headers, s); + g_free(s); + va_end(ap); +} + +void notify_push_output(output_t *o, metafile_t *mf, tag_t *tag) { + if (!notify_threadpool) + return; + + struct notif_req *req = g_slice_alloc0(sizeof(*req)); + + req->name = g_strdup(o->file_name); + double now = now_double(); + + notify_add_header(req, "X-Recording-Call-ID: %s", mf->call_id); + notify_add_header(req, "X-Recording-File-Name: %s.%s", o->file_name, o->file_format); + notify_add_header(req, "X-Recording-Full-File-Name: %s.%s", o->full_filename, o->file_format); + notify_add_header(req, "X-Recording-File-Format: %s", o->file_format); + notify_add_header(req, "X-Recording-Kind: %s", o->kind); + notify_add_header(req, "X-Recording-Call-Start-Time: %.06f", mf->start_time); + notify_add_header(req, "X-Recording-Stream-Start-Time: %.06f", o->start_time); + notify_add_header(req, "X-Recording-Call-End-Time: %.06f", now); + notify_add_header(req, "X-Recording-Stream-End-Time: %.06f", now); + + if (mf->db_id) + notify_add_header(req, "X-Recording-Call-DB-ID: %llu", mf->db_id); + if (o->db_id) + notify_add_header(req, "X-Recording-Stream-DB-ID: %llu", o->db_id); + if (mf->metadata) + notify_add_header(req, "X-Recording-Call-Metadata: %s", mf->metadata); + if (mf->metadata_db) + notify_add_header(req, "X-Recording-DB-Metadata: %s", mf->metadata_db); + + if (tag) { + notify_add_header(req, "X-Recording-Tag: %s", tag->name); + if (tag->label) + notify_add_header(req, "X-Recording-Label: %s", tag->label); + if (tag->metadata) + notify_add_header(req, "X-Recording-Tag-Metadata: %s", tag->metadata); + } + + req->falloff = 5; // initial retry time + + g_thread_pool_push(notify_threadpool, req, NULL); +} diff --git a/recording-daemon/notify.h b/recording-daemon/notify.h new file mode 100644 index 000000000..3b465434a --- /dev/null +++ b/recording-daemon/notify.h @@ -0,0 +1,13 @@ +#ifndef _NOTIFY_H_ +#define _NOTIFY_H_ + +#include +#include "types.h" + +void notify_setup(void); +void notify_cleanup(void); + +void notify_push_output(output_t *, metafile_t *, tag_t *); +void notify_push_call(metafile_t *); + +#endif diff --git a/recording-daemon/output.c b/recording-daemon/output.c index 69f833948..9d4ba9707 100644 --- a/recording-daemon/output.c +++ b/recording-daemon/output.c @@ -11,6 +11,7 @@ #include "db.h" #include "main.h" #include "recaux.h" +#include "notify.h" //static int output_codec_id; @@ -336,11 +337,13 @@ static bool output_shutdown(output_t *output) { } -void output_close(metafile_t *mf, output_t *output) { +void output_close(metafile_t *mf, output_t *output, tag_t *tag) { if (!output) return; - if (output_shutdown(output)) + if (output_shutdown(output)) { db_close_stream(output); + notify_push_output(output, mf, tag); + } else db_delete_stream(mf, output); encoder_free(output->encoder); diff --git a/recording-daemon/output.h b/recording-daemon/output.h index becef34bb..f4264b23a 100644 --- a/recording-daemon/output.h +++ b/recording-daemon/output.h @@ -12,7 +12,7 @@ void output_init(const char *format); output_t *output_new(const char *path, const char *call, const char *type, const char *kind, const char *label); output_t *output_new_from_full_path(const char *path, char *name, const char *kind); -void output_close(metafile_t *, output_t *); +void output_close(metafile_t *, output_t *, tag_t *); int output_config(output_t *output, const format_t *requested_format, format_t *actual_format); int output_add(output_t *output, AVFrame *frame); diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c index 7c06cc582..3220482c5 100644 --- a/recording-daemon/packet.c +++ b/recording-daemon/packet.c @@ -145,7 +145,7 @@ void ssrc_tls_state(ssrc_t *ssrc) { void ssrc_close(ssrc_t *s) { - output_close(s->metafile, s->output); + output_close(s->metafile, s->output, tag_get(s->metafile, s->stream->tag)); s->output = NULL; for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++) { decoder_free(s->decoders[i]); diff --git a/recording-daemon/rtpengine-recording.pod b/recording-daemon/rtpengine-recording.pod index bb2f5274e..d60e78d17 100644 --- a/recording-daemon/rtpengine-recording.pod +++ b/recording-daemon/rtpengine-recording.pod @@ -306,6 +306,31 @@ Forward raw RTP packets to a Unix socket. Disabled by default. Send decoded audio over a TCP TLS connection to the specified destination. Audio is sent as raw mono 16-bit PCM in the given sample rate. +=item B<--notify-uri=>I + +Enable HTTP notification about finished recordings to the specified URI, which +must be an HTTP or HTTPS URI. Information about the finished recording is +provided via custom HTTP headers, all of which use a prefix of B. + +=item B<--notify-post> + +Use HTTP POST instead of GET for the HTTP notification requests. The request +body is empty even if POST is used. + +=item B<--notify-no-verify> + +Disable TLS peer certificate verification for HTTPS requests. + +=item B<--notify-concurrency=>I + +The maximum number of HTTP requests to perform simultaneously. + +=item B<--notify-retries=>I + +How many times to retry a failed HTTP notification before giving up. An +exponential falloff time is used for each subsequent attempt, starting with 5 +seconds. + =back =head1 EXIT STATUS