MT#56493 add support for HTTP notifications

Change-Id: I3ac04968279a5b750f1f7f4d4d06207e05e7d8a8
pull/1611/head
Richard Fuchs 2 years ago
parent f861d984aa
commit ff47e874ea

@ -54,3 +54,10 @@ table = 0
# output-chmod-dir = 0750
# output-chown = rtpengine
# output-chgrp = rtpengine
### HTTP notifications for finished recordings
# notify-uri = https://example.com/rec/finished
# notify-post = false
# notify-no-verify = false
# notify-concurrency = 5
# notify-retries = 10

@ -14,6 +14,7 @@ CFLAGS+= $(shell pkg-config --cflags libavfilter)
CFLAGS+= $(shell pkg-config --cflags opus)
CFLAGS+= $(shell mysql_config --cflags)
CFLAGS+= $(shell pkg-config --cflags openssl)
CFLAGS+= $(shell pkg-config --cflags libcurl)
LDLIBS= -lm -ldl
LDLIBS+= $(shell pkg-config --libs glib-2.0)
@ -26,11 +27,12 @@ LDLIBS+= $(shell pkg-config --libs libavfilter)
LDLIBS+= $(shell pkg-config --libs opus)
LDLIBS+= $(shell mysql_config --libs)
LDLIBS+= $(shell pkg-config --libs openssl)
LDLIBS+= $(shell pkg-config --libs libcurl)
include ../lib/g729.Makefile
SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c packet.c \
decoder.c output.c mix.c db.c log.c forward.c tag.c poller.c
decoder.c output.c mix.c db.c log.c forward.c tag.c poller.c notify.c
LIBSRCS= loglib.c auxlib.c rtplib.c codeclib.strhash.c resample.c str.c socket.c streambuf.c ssllib.c \
dtmflib.c
OBJS= $(SRCS:.c=.o) $(LIBSRCS:.c=.o)

@ -28,6 +28,7 @@
#include "codeclib.h"
#include "socket.h"
#include "ssllib.h"
#include "notify.h"
@ -57,6 +58,11 @@ char *forward_to = NULL;
static char *tls_send_to = NULL;
endpoint_t tls_send_to_ep;
int tls_resample = 8000;
char *notify_uri;
int notify_post;
int notify_nverify;
int notify_threads = 5;
int notify_retries = 10;
static GQueue threads = G_QUEUE_INIT; // only accessed from main thread
@ -153,6 +159,7 @@ static void wait_for_signal(void) {
static void cleanup(void) {
notify_cleanup();
garbage_collect_all();
metafile_cleanup();
inotify_cleanup();
@ -206,6 +213,11 @@ static void options(int *argc, char ***argv) {
{ "forward-to", 0, 0, G_OPTION_ARG_STRING, &forward_to, "Where to forward to (unix socket)", "PATH" },
{ "tls-send-to", 0, 0, G_OPTION_ARG_STRING, &tls_send_to, "Where to send to (TLS destination)", "IP:PORT" },
{ "tls-resample", 0, 0, G_OPTION_ARG_INT, &tls_resample, "Sampling rate for TLS PCM output", "INT" },
{ "notify-uri", 0, 0, G_OPTION_ARG_STRING, &notify_uri, "Notify destination for finished outputs","URI" },
{ "notify-post", 0, 0, G_OPTION_ARG_NONE, &notify_post, "Use POST instead of GET", NULL },
{ "notify-no-verify", 0, 0, G_OPTION_ARG_NONE, &notify_nverify,"Don't verify HTTPS peer certificate", NULL },
{ "notify-concurrency", 0, 0, G_OPTION_ARG_INT, &notify_threads,"How many simultaneous requests", "INT" },
{ "notify-retries", 0, 0, G_OPTION_ARG_INT, &notify_retries,"How many times to retry failed requesets","INT" },
{ NULL, }
};
@ -327,6 +339,7 @@ int main(int argc, char **argv) {
setup();
daemonize();
wpidfile();
notify_setup();
service_notify("READY=1\n");

@ -41,6 +41,11 @@ extern int c_mysql_port;
extern char *forward_to;
extern endpoint_t tls_send_to_ep;
extern int tls_resample;
extern char *notify_uri;
extern int notify_post;
extern int notify_nverify;
extern int notify_threads;
extern int notify_retries;
extern volatile int shutdown_flag;

@ -26,23 +26,25 @@ static void meta_free(void *ptr) {
metafile_t *mf = ptr;
dbg("freeing metafile info for %s%s%s", FMT_M(mf->name));
output_close(mf, mf->mix_out);
output_close(mf, mf->mix_out, NULL);
mix_destroy(mf->mix);
db_close_call(mf);
g_string_chunk_free(mf->gsc);
// SSRCs first as they have linked outputs which need to be closed first
if (mf->ssrc_hash)
g_hash_table_destroy(mf->ssrc_hash);
for (int i = 0; i < mf->streams->len; i++) {
stream_t *stream = g_ptr_array_index(mf->streams, i);
stream_close(stream); // should be closed already
stream_free(stream);
}
g_ptr_array_free(mf->streams, TRUE);
for (int i = 0; i < mf->tags->len; i++) {
tag_t *tag = g_ptr_array_index(mf->tags, i);
tag_free(tag);
}
g_ptr_array_free(mf->tags, TRUE);
if (mf->ssrc_hash)
g_hash_table_destroy(mf->ssrc_hash);
g_ptr_array_free(mf->streams, TRUE);
g_slice_free1(sizeof(*mf), mf);
}

@ -0,0 +1,299 @@
#include "notify.h"
#include <stdbool.h>
#include <curl/curl.h>
#include "main.h"
#include "log.h"
#include "recaux.h"
struct notif_req {
char *name; // just for logging
struct curl_slist *headers;
time_t retry_time;
unsigned int retries;
unsigned int falloff;
};
static GThreadPool *notify_threadpool;
static pthread_mutex_t timer_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t timer_cond = PTHREAD_COND_INITIALIZER;
static pthread_t notify_waiter;
static GTree *notify_timers;
static size_t dummy_write(char *ptr, size_t size, size_t nmemb, void *userdata) {
return size * nmemb;
}
static size_t dummy_read(char *ptr, size_t size, size_t nmemb, void *userdata) {
return 0;
}
static void do_notify(void *p, void *u) {
struct notif_req *req = p;
const char *err = NULL;
CURLcode ret;
ilog(LOG_DEBUG, "Launching HTTP notification for '%s%s%s'", FMT_M(req->name));
// set up the CURL request
CURL *c = curl_easy_init();
if (!c)
goto fail;
err = "setting CURLOPT_URL";
ret = curl_easy_setopt(c, CURLOPT_URL, notify_uri);
if (ret != CURLE_OK)
goto fail;
// no output
err = "setting CURLOPT_WRITEFUNCTION";
ret = curl_easy_setopt(c, CURLOPT_WRITEFUNCTION, dummy_write);
if (ret != CURLE_OK)
goto fail;
// no input
err = "setting CURLOPT_READFUNCTION";
ret = curl_easy_setopt(c, CURLOPT_READFUNCTION, dummy_read);
if (ret != CURLE_OK)
goto fail;
// allow redirects
err = "setting CURLOPT_FOLLOWLOCATION";
ret = curl_easy_setopt(c, CURLOPT_FOLLOWLOCATION, 1);
if (ret != CURLE_OK)
goto fail;
// max 5 redirects
err = "setting CURLOPT_MAXREDIRS";
ret = curl_easy_setopt(c, CURLOPT_MAXREDIRS, 5);
if (ret != CURLE_OK)
goto fail;
// add headers
err = "setting CURLOPT_HTTPHEADER";
ret = curl_easy_setopt(c, CURLOPT_HTTPHEADER, req->headers);
if (ret != CURLE_OK)
goto fail;
// POST vs GET
if (notify_post) {
err = "setting CURLOPT_POST";
ret = curl_easy_setopt(c, CURLOPT_POST, 1);
if (ret != CURLE_OK)
goto fail;
}
// cert verify (enabled by default)
if (notify_nverify) {
err = "setting CURLOPT_SSL_VERIFYPEER";
ret = curl_easy_setopt(c, CURLOPT_SSL_VERIFYPEER, 0);
if (ret != CURLE_OK)
goto fail;
}
err = "performing request";
ret = curl_easy_perform(c);
if (ret != CURLE_OK)
goto fail;
long code;
err = "getting CURLINFO_RESPONSE_CODE";
ret = curl_easy_getinfo(c, CURLINFO_RESPONSE_CODE, &code);
if (ret != CURLE_OK)
goto fail;
err = "checking response code (not 2xx)";
if (code < 200 || code >= 300)
goto fail;
// success
ilog(LOG_NOTICE, "HTTP notification for '%s%s%s' was successful", FMT_M(req->name));
goto cleanup;
fail:
if (c)
curl_easy_cleanup(c);
if (notify_retries >= 0 && req->retries < notify_retries) {
// schedule retry
req->retries++;
if (c)
ilog(LOG_DEBUG, "Failed to perform HTTP notification for '%s%s%s': "
"Error while %s: %s. Will retry in %u seconds (#%u)",
FMT_M(req->name),
err, curl_easy_strerror(ret),
req->falloff, req->retries);
else
ilog(LOG_DEBUG, "Failed to perform HTTP notification for '%s%s%s': "
"Failed to create CURL object. Will retry in %u seconds (#%u)",
FMT_M(req->name),
req->falloff, req->retries);
req->retry_time = time(NULL) + req->falloff;
req->falloff *= 2;
pthread_mutex_lock(&timer_lock);
g_tree_insert(notify_timers, req, req);
pthread_cond_signal(&timer_cond);
pthread_mutex_unlock(&timer_lock);
return;
}
if (c)
ilog(LOG_ERR, "Failed to perform HTTP notification for '%s%s%s' after %u retries: "
"Error while %s: %s",
FMT_M(req->name),
req->retries, err, curl_easy_strerror(ret));
else
ilog(LOG_ERR, "Failed to perform HTTP notification for '%s%s%s' after %u retries: "
"Failed to create CURL object",
FMT_M(req->name),
req->retries);
c = NULL;
goto cleanup;
cleanup:
if (c)
curl_easy_cleanup(c);
curl_slist_free_all(req->headers);
g_free(req->name);
g_slice_free1(sizeof(*req), req);
}
static void *notify_timer(void *p) {
pthread_mutex_lock(&timer_lock);
// notify_timers being NULL acts as our shutdown flag
while (notify_timers) {
ilog(LOG_DEBUG, "HTTP notification timer thread looping");
// grab first entry in list, check retry time, sleep if it's in the future
struct notif_req *first = g_tree_find_first(notify_timers, NULL, NULL);
if (!first) {
ilog(LOG_DEBUG, "No scheduled HTTP notification retries, sleeping");
pthread_cond_wait(&timer_cond, &timer_lock);
continue;
}
struct timeval now;
gettimeofday(&now, NULL);
if (now.tv_sec < first->retry_time) {
ilog(LOG_DEBUG, "Sleeping until next scheduled HTTP notification retry in %lu seconds",
(unsigned long) first->retry_time - now.tv_sec);
struct timespec ts = {.tv_sec = first->retry_time, .tv_nsec = 0};
pthread_cond_timedwait(&timer_cond, &timer_lock, &ts);
continue;
}
// first entry is ready to run
g_tree_remove(notify_timers, first);
ilog(LOG_DEBUG, "HTTP notification retry for '%s%s%s' is scheduled now", FMT_M(first->name));
g_thread_pool_push(notify_threadpool, first, NULL);
}
// clean up
pthread_mutex_unlock(&timer_lock);
pthread_mutex_destroy(&timer_lock);
pthread_cond_destroy(&timer_cond);
return NULL;
}
static int notify_req_cmp(const void *A, const void *B) {
const struct notif_req *a = A, *b = B;
if (a->retry_time < b->retry_time)
return -1;
if (a->retry_time > b->retry_time)
return 1;
if (a < b)
return -1;
if (a > b)
return 1;
return 0;
}
void notify_setup(void) {
if (!notify_uri || notify_threads <= 0)
return;
notify_threadpool = g_thread_pool_new(do_notify, NULL, notify_threads, false, NULL);
notify_timers = g_tree_new(notify_req_cmp);
pthread_create(&notify_waiter, NULL, notify_timer, NULL);
}
void notify_cleanup(void) {
if (notify_threadpool)
g_thread_pool_free(notify_threadpool, true, false);
if (notify_waiter && notify_timers) {
// get lock, free GTree, signal thread to shut down
pthread_mutex_lock(&timer_lock);
g_tree_destroy(notify_timers);
notify_timers = NULL;
pthread_cond_signal(&timer_cond);
pthread_mutex_unlock(&timer_lock);
}
}
__attribute__ ((format (printf, 2, 3)))
static void notify_add_header(struct notif_req *req, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
char *s = g_strdup_vprintf(fmt, ap);
req->headers = curl_slist_append(req->headers, s);
g_free(s);
va_end(ap);
}
void notify_push_output(output_t *o, metafile_t *mf, tag_t *tag) {
if (!notify_threadpool)
return;
struct notif_req *req = g_slice_alloc0(sizeof(*req));
req->name = g_strdup(o->file_name);
double now = now_double();
notify_add_header(req, "X-Recording-Call-ID: %s", mf->call_id);
notify_add_header(req, "X-Recording-File-Name: %s.%s", o->file_name, o->file_format);
notify_add_header(req, "X-Recording-Full-File-Name: %s.%s", o->full_filename, o->file_format);
notify_add_header(req, "X-Recording-File-Format: %s", o->file_format);
notify_add_header(req, "X-Recording-Kind: %s", o->kind);
notify_add_header(req, "X-Recording-Call-Start-Time: %.06f", mf->start_time);
notify_add_header(req, "X-Recording-Stream-Start-Time: %.06f", o->start_time);
notify_add_header(req, "X-Recording-Call-End-Time: %.06f", now);
notify_add_header(req, "X-Recording-Stream-End-Time: %.06f", now);
if (mf->db_id)
notify_add_header(req, "X-Recording-Call-DB-ID: %llu", mf->db_id);
if (o->db_id)
notify_add_header(req, "X-Recording-Stream-DB-ID: %llu", o->db_id);
if (mf->metadata)
notify_add_header(req, "X-Recording-Call-Metadata: %s", mf->metadata);
if (mf->metadata_db)
notify_add_header(req, "X-Recording-DB-Metadata: %s", mf->metadata_db);
if (tag) {
notify_add_header(req, "X-Recording-Tag: %s", tag->name);
if (tag->label)
notify_add_header(req, "X-Recording-Label: %s", tag->label);
if (tag->metadata)
notify_add_header(req, "X-Recording-Tag-Metadata: %s", tag->metadata);
}
req->falloff = 5; // initial retry time
g_thread_pool_push(notify_threadpool, req, NULL);
}

@ -0,0 +1,13 @@
#ifndef _NOTIFY_H_
#define _NOTIFY_H_
#include <glib.h>
#include "types.h"
void notify_setup(void);
void notify_cleanup(void);
void notify_push_output(output_t *, metafile_t *, tag_t *);
void notify_push_call(metafile_t *);
#endif

@ -11,6 +11,7 @@
#include "db.h"
#include "main.h"
#include "recaux.h"
#include "notify.h"
//static int output_codec_id;
@ -336,11 +337,13 @@ static bool output_shutdown(output_t *output) {
}
void output_close(metafile_t *mf, output_t *output) {
void output_close(metafile_t *mf, output_t *output, tag_t *tag) {
if (!output)
return;
if (output_shutdown(output))
if (output_shutdown(output)) {
db_close_stream(output);
notify_push_output(output, mf, tag);
}
else
db_delete_stream(mf, output);
encoder_free(output->encoder);

@ -12,7 +12,7 @@ void output_init(const char *format);
output_t *output_new(const char *path, const char *call, const char *type, const char *kind, const char *label);
output_t *output_new_from_full_path(const char *path, char *name, const char *kind);
void output_close(metafile_t *, output_t *);
void output_close(metafile_t *, output_t *, tag_t *);
int output_config(output_t *output, const format_t *requested_format, format_t *actual_format);
int output_add(output_t *output, AVFrame *frame);

@ -145,7 +145,7 @@ void ssrc_tls_state(ssrc_t *ssrc) {
void ssrc_close(ssrc_t *s) {
output_close(s->metafile, s->output);
output_close(s->metafile, s->output, tag_get(s->metafile, s->stream->tag));
s->output = NULL;
for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++) {
decoder_free(s->decoders[i]);

@ -306,6 +306,31 @@ Forward raw RTP packets to a Unix socket. Disabled by default.
Send decoded audio over a TCP TLS connection to the specified destination.
Audio is sent as raw mono 16-bit PCM in the given sample rate.
=item B<--notify-uri=>I<URI>
Enable HTTP notification about finished recordings to the specified URI, which
must be an HTTP or HTTPS URI. Information about the finished recording is
provided via custom HTTP headers, all of which use a prefix of B<X-Recording->.
=item B<--notify-post>
Use HTTP POST instead of GET for the HTTP notification requests. The request
body is empty even if POST is used.
=item B<--notify-no-verify>
Disable TLS peer certificate verification for HTTPS requests.
=item B<--notify-concurrency=>I<INT>
The maximum number of HTTP requests to perform simultaneously.
=item B<--notify-retries=>I<INT>
How many times to retry a failed HTTP notification before giving up. An
exponential falloff time is used for each subsequent attempt, starting with 5
seconds.
=back
=head1 EXIT STATUS

Loading…
Cancel
Save