mirror of https://github.com/sipwise/rtpengine.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
596 lines
16 KiB
596 lines
16 KiB
#ifdef HAVE_MQTT
|
|
|
|
#include "mqtt.h"
|
|
|
|
#include <mosquitto.h>
|
|
#include <errno.h>
|
|
#include <string.h>
|
|
#include <stdbool.h>
|
|
#include <glib.h>
|
|
#include <glib-object.h>
|
|
#include <json-glib/json-glib.h>
|
|
|
|
#include "main.h"
|
|
#include "log.h"
|
|
#include "log_funcs.h"
|
|
#include "call.h"
|
|
#include "ssrc.h"
|
|
#include "rtplib.h"
|
|
#include "media_player.h"
|
|
|
|
|
|
|
|
static struct mosquitto *mosq;
|
|
static bool is_connected = false;
|
|
|
|
static struct interface_sampled_rate_stats interface_rate_stats;
|
|
|
|
|
|
static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media);
|
|
|
|
|
|
|
|
int mqtt_init(void) {
|
|
interface_sampled_rate_stats_init(&interface_rate_stats);
|
|
|
|
mosq = mosquitto_new(rtpe_config.mqtt_id, true, NULL);
|
|
if (!mosq) {
|
|
ilog(LOG_ERR, "Failed to create mosquitto client instance: %s", strerror(errno));
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
static int mqtt_connect(void) {
|
|
ilog(LOG_DEBUG, "Connecting to mosquitto...");
|
|
|
|
mosquitto_disconnect(mosq);
|
|
|
|
int ret = mosquitto_reinitialise(mosq, rtpe_config.mqtt_id, true, NULL);
|
|
if (ret) {
|
|
ilog(LOG_ERR, "Failed to initialise mosquitto client instance: %s", mosquitto_strerror(ret));
|
|
return -1;
|
|
}
|
|
|
|
mosquitto_threaded_set(mosq, true);
|
|
|
|
if (rtpe_config.mqtt_user) {
|
|
ret = mosquitto_username_pw_set(mosq, rtpe_config.mqtt_user, rtpe_config.mqtt_pass);
|
|
if (ret != MOSQ_ERR_SUCCESS) {
|
|
ilog(LOG_ERR, "Failed to set mosquitto user/pass auth: %s", mosquitto_strerror(errno));
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
if (rtpe_config.mqtt_cafile || rtpe_config.mqtt_capath) {
|
|
ret = mosquitto_tls_set(mosq, rtpe_config.mqtt_cafile, rtpe_config.mqtt_capath,
|
|
rtpe_config.mqtt_certfile, rtpe_config.mqtt_keyfile, NULL);
|
|
if (ret != MOSQ_ERR_SUCCESS) {
|
|
ilog(LOG_ERR, "Failed to set mosquitto TLS options: %s", mosquitto_strerror(errno));
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
if (rtpe_config.mqtt_tls_alpn) {
|
|
#if LIBMOSQUITTO_VERSION_NUMBER >= 1006000
|
|
ret = mosquitto_string_option(mosq, MOSQ_OPT_TLS_ALPN, rtpe_config.mqtt_tls_alpn);
|
|
if (ret != MOSQ_ERR_SUCCESS) {
|
|
ilog(LOG_ERR, "Failed to set mosquitto TLS ALPN options: %s", mosquitto_strerror(errno));
|
|
return -1;
|
|
}
|
|
#else
|
|
ilog(LOG_WARN, "Cannot set mqtt TLS ALPN due to outdated mosquitto library");
|
|
#endif
|
|
}
|
|
|
|
ret = mosquitto_connect(mosq, rtpe_config.mqtt_host, rtpe_config.mqtt_port,
|
|
rtpe_config.mqtt_keepalive);
|
|
if (ret != MOSQ_ERR_SUCCESS) {
|
|
ilog(LOG_ERR, "Failed to connect to mosquitto broker: %s", mosquitto_strerror(ret));
|
|
return -1;
|
|
}
|
|
|
|
ilog(LOG_DEBUG, "Successfully connected to mosquitto");
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
void mqtt_loop(void *dummy) {
|
|
while (!rtpe_shutdown) {
|
|
while (!is_connected && !rtpe_shutdown) {
|
|
if (!mqtt_connect()) {
|
|
is_connected = true;
|
|
break;
|
|
}
|
|
usleep(1000000);
|
|
}
|
|
|
|
unsigned int errors = 0;
|
|
while (!rtpe_shutdown) {
|
|
int ret = mosquitto_loop(mosq, 100, 1);
|
|
if (ret == MOSQ_ERR_SUCCESS) {
|
|
errors = 0;
|
|
continue;
|
|
}
|
|
if (ret == MOSQ_ERR_ERRNO)
|
|
ilog(LOG_ERR, "Error from mosquitto: %s", strerror(errno));
|
|
else
|
|
ilog(LOG_ERR, "Error from mosquitto: %s", mosquitto_strerror(ret));
|
|
errors++;
|
|
if (errors >= 5) {
|
|
ilog(LOG_WARN, "Reconnecting to mosquitto");
|
|
break;
|
|
}
|
|
}
|
|
|
|
mosquitto_disconnect(mosq);
|
|
is_connected = false;
|
|
}
|
|
|
|
mosquitto_destroy(mosq);
|
|
mosq = NULL;
|
|
}
|
|
|
|
|
|
int mqtt_publish_scope(void) {
|
|
if (!mosq)
|
|
return MPS_NONE;
|
|
return rtpe_config.mqtt_publish_scope;
|
|
}
|
|
|
|
|
|
void mqtt_publish(char *s) {
|
|
ilog(LOG_DEBUG, "Publishing to mosquitto: %s%s%s", FMT_M(s));
|
|
|
|
int ret = mosquitto_publish(mosq, NULL, rtpe_config.mqtt_publish_topic, strlen(s), s,
|
|
rtpe_config.mqtt_publish_qos,
|
|
false);
|
|
if (ret != MOSQ_ERR_SUCCESS)
|
|
ilog(LOG_WARN | LOG_FLAG_LIMIT, "Error publishing message to mosquitto: %s",
|
|
mosquitto_strerror(ret));
|
|
g_free(s);
|
|
}
|
|
|
|
|
|
/* Add the call-level member ("call_id") to the currently open JSON object. */
static void mqtt_call_stats(call_t *call, JsonBuilder *json) {
	json_builder_set_member_name(json, "call_id");
	glib_json_builder_add_str(json, &call->callid);
}
|
|
|
|
|
|
/*
 * Add per-monologue members to the currently open JSON object: "tag" and
 * "label" when non-empty and, in transcoding builds, a "media_player" object
 * describing the attached media player (if any).
 */
static void mqtt_monologue_stats(struct call_monologue *ml, JsonBuilder *json) {
	if (ml->tag.len) {
		json_builder_set_member_name(json, "tag");
		glib_json_builder_add_str(json, &ml->tag);
	}

	if (ml->label.len) {
		json_builder_set_member_name(json, "label");
		glib_json_builder_add_str(json, &ml->label);
	}

#ifdef WITH_TRANSCODING
	struct media_player *player = ml->player;
	if (!player)
		return;

	// the player's fields are read under its own lock
	mutex_lock(&player->lock);

	json_builder_set_member_name(json, "media_player");
	json_builder_begin_object(json);

	json_builder_set_member_name(json, "duration");
	json_builder_add_int_value(json, player->coder.duration);
	json_builder_set_member_name(json, "repeat");
	json_builder_add_int_value(json, player->opts.repeat);
	json_builder_set_member_name(json, "frame_time");
	json_builder_add_int_value(json, player->last_frame_ts);

	if (player->ssrc_out && player->media) {
		json_builder_set_member_name(json, "SSRC");
		json_builder_begin_object(json);
		mqtt_ssrc_stats(player->ssrc_out, json, player->media);
		json_builder_end_object(json);
	}

	json_builder_end_object(json);

	mutex_unlock(&player->lock);
#endif
}
|
|
|
|
|
|
/*
 * Add the statistics of one SSRC context to the currently open JSON object.
 *
 * Emits the SSRC value, the details of the most frequently seen payload type
 * (if it is known to `media`), and a "metrics" object holding the absolute
 * counters, the per-second rates since the previous sample, and jitter, MOS
 * and RTT values where available.
 *
 * Does nothing if either `ssrc` or `media` is NULL.
 *
 * Side effect: the sample fields in `ssrc` (last_sample, sample_*) are
 * replaced with the current timestamp and counters, so each call reports
 * rates relative to the previous call for the same SSRC.
 */
static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media) {
	if (!ssrc || !media)
		return;

	struct ssrc_entry_call *sc = ssrc->parent;

	json_builder_set_member_name(json, "SSRC");
	json_builder_add_int_value(json, sc->h.ssrc);

	// 255 serves as the "no payload type seen" value
	unsigned char prim_pt = 255;
	mutex_lock(&ssrc->tracker.lock);
	if (ssrc->tracker.most_len > 0)
		prim_pt = ssrc->tracker.most[0];
	mutex_unlock(&ssrc->tracker.lock);

	unsigned int clockrate = 0;
	rtp_payload_type *pt = t_hash_table_lookup(media->codecs.codecs, GUINT_TO_POINTER(prim_pt));
	if (pt) {
		json_builder_set_member_name(json, "codec");
		glib_json_builder_add_str(json, &pt->encoding);

		json_builder_set_member_name(json, "clock_rate");
		json_builder_add_int_value(json, pt->clock_rate);
		clockrate = pt->clock_rate;

		if (pt->encoding_parameters.s) {
			json_builder_set_member_name(json, "codec_params");
			glib_json_builder_add_str(json, &pt->encoding_parameters);
		}

		if (pt->format_parameters.s) {
			json_builder_set_member_name(json, "codec_format");
			glib_json_builder_add_str(json, &pt->format_parameters);
		}
	}

	json_builder_set_member_name(json, "metrics");
	json_builder_begin_object(json);

	// copy out values
	int64_t packets, octets, packets_lost, duplicates;
	packets = atomic64_get_na(&ssrc->stats->packets);
	octets = atomic64_get_na(&ssrc->stats->bytes);
	packets_lost = sc->packets_lost;
	duplicates = sc->duplicates;

	// process per-second stats
	int64_t cur_ts = rtpe_now;
	int64_t last_sample;
	int64_t sample_packets, sample_octets, sample_packets_lost, sample_duplicates;

	// sample values: store the new snapshot and fetch the previous one
	last_sample = atomic64_get_set(&ssrc->last_sample, cur_ts);
	sample_packets = atomic64_get_set(&ssrc->sample_packets, packets);
	sample_octets = atomic64_get_set(&ssrc->sample_octets, octets);
	sample_packets_lost = atomic64_get_set(&ssrc->sample_packets_lost, packets_lost);
	sample_duplicates = atomic64_get_set(&ssrc->sample_duplicates, duplicates);

	json_builder_set_member_name(json, "packets");
	json_builder_add_int_value(json, packets);

	json_builder_set_member_name(json, "bytes");
	json_builder_add_int_value(json, octets);

	json_builder_set_member_name(json, "lost");
	json_builder_add_int_value(json, packets_lost);

	json_builder_set_member_name(json, "duplicates");
	json_builder_add_int_value(json, duplicates);

	if (last_sample && last_sample != cur_ts) {
		// calc sample rates with primitive math.
		// The interval is taken from the same snapshot (cur_ts) that was
		// stored above and checked by the guard, so the divisor is never zero.
		int64_t usecs_diff = cur_ts - last_sample;

		// adjust samples
		packets -= sample_packets;
		octets -= sample_octets;
		packets_lost -= sample_packets_lost;
		duplicates -= sample_duplicates;

		json_builder_set_member_name(json, "packets_per_second");
		json_builder_add_double_value(json, (double) packets * 1000000.0 / usecs_diff);

		json_builder_set_member_name(json, "bytes_per_second");
		json_builder_add_double_value(json, (double) octets * 1000000.0 / usecs_diff);

		json_builder_set_member_name(json, "lost_per_second");
		json_builder_add_double_value(json, (double) packets_lost * 1000000.0 / usecs_diff);

		json_builder_set_member_name(json, "duplicates_per_second");
		json_builder_add_double_value(json, (double) duplicates * 1000000.0 / usecs_diff);
	}

	// jitter and the latest stats block are read under the SSRC entry lock
	mutex_lock(&sc->h.lock);
	uint32_t jitter = sc->jitter;
	int64_t mos = -1, rtt = -1, rtt_leg = -1;
	if (sc->stats_blocks.length) {
		struct ssrc_stats_block *sb = sc->stats_blocks.tail->data;
		mos = sb->mos;
		rtt = sb->rtt;
		rtt_leg = sb->rtt_leg;
	}
	mutex_unlock(&sc->h.lock);

	// without a known clock rate the jitter cannot be scaled, so it is omitted
	if (clockrate) {
		json_builder_set_member_name(json, "jitter");
		json_builder_add_double_value(json, (double) jitter * 1000.0 / (double) clockrate);
	}

	if (mos != -1 && mos != 0) {
		json_builder_set_member_name(json, "MOS");
		json_builder_add_double_value(json, (double) mos / 10.0);
	}
	if (rtt != -1) {
		json_builder_set_member_name(json, "RTT");
		json_builder_add_double_value(json, (double) rtt / 1000.0);
	}
	if (rtt_leg != -1) {
		json_builder_set_member_name(json, "RTT_leg");
		json_builder_add_double_value(json, (double) rtt_leg / 1000.0);
	}

	json_builder_end_object(json);
}
|
|
|
|
|
|
/*
 * Add the byte, packet and error counters of one traffic direction to the
 * currently open JSON object.
 */
static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *json) {
	json_builder_set_member_name(json, "bytes");
	json_builder_add_int_value(json, atomic64_get_na(&s->bytes));

	json_builder_set_member_name(json, "packets");
	json_builder_add_int_value(json, atomic64_get_na(&s->packets));

	json_builder_set_member_name(json, "errors");
	json_builder_add_int_value(json, atomic64_get_na(&s->errors));
}
|
|
|
|
|
|
/*
 * Add the details of one packet stream to the currently open JSON object:
 * local and endpoint address/port (when a socket is selected), the crypto
 * suite name (when set), the transcoding flag, and "ingress"/"egress" objects
 * holding direction counters plus an array of per-SSRC statistics.
 *
 * Everything up to and including "ingress" is written while holding in_lock;
 * "egress" is written while holding out_lock.
 */
static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
	mutex_lock(&ps->in_lock);

	stream_fd *local_fd = ps->selected_sfd;
	if (local_fd) {
		json_builder_set_member_name(json, "address");
		json_builder_add_string_value(json, sockaddr_print_buf(&local_fd->socket.local.address));

		json_builder_set_member_name(json, "port");
		json_builder_add_int_value(json, local_fd->socket.local.port);

		json_builder_set_member_name(json, "endpoint_address");
		json_builder_add_string_value(json, sockaddr_print_buf(&ps->endpoint.address));

		json_builder_set_member_name(json, "endpoint_port");
		json_builder_add_int_value(json, ps->endpoint.port);
	}

	if (ps->crypto.params.crypto_suite) {
		json_builder_set_member_name(json, "crypto_suite");
		json_builder_add_string_value(json, ps->crypto.params.crypto_suite->name);
	}

	json_builder_set_member_name(json, "transcoding");
	json_builder_add_boolean_value(json, MEDIA_ISSET(ps->media, TRANSCODING) ? TRUE : FALSE);

	// incoming direction
	json_builder_set_member_name(json, "ingress");
	json_builder_begin_object(json);
	mqtt_stream_stats_dir(ps->stats_in, json);

	json_builder_set_member_name(json, "SSRC");
	json_builder_begin_array(json);
	// the first empty slot ends the list
	for (int idx = 0; idx < RTPE_NUM_SSRC_TRACKING && ps->ssrc_in[idx]; idx++) {
		json_builder_begin_object(json);
		mqtt_ssrc_stats(ps->ssrc_in[idx], json, ps->media);
		json_builder_end_object(json);
	}
	json_builder_end_array(json);

	json_builder_end_object(json);

	mutex_unlock(&ps->in_lock);

	// outgoing direction
	mutex_lock(&ps->out_lock);

	json_builder_set_member_name(json, "egress");
	json_builder_begin_object(json);
	mqtt_stream_stats_dir(ps->stats_out, json);

	json_builder_set_member_name(json, "SSRC");
	json_builder_begin_array(json);
	for (int idx = 0; idx < RTPE_NUM_SSRC_TRACKING && ps->ssrc_out[idx]; idx++) {
		json_builder_begin_object(json);
		mqtt_ssrc_stats(ps->ssrc_out[idx], json, ps->media);
		json_builder_end_object(json);
	}
	json_builder_end_array(json);

	json_builder_end_object(json);

	mutex_unlock(&ps->out_lock);
}
|
|
|
|
|
|
/*
 * Add the description of one media section to the currently open JSON object:
 * index, type, interface name, protocol name (when set), the direction
 * status derived from the SEND/RECV flags, and the statistics of the first
 * packet stream (when there is one).
 */
static void mqtt_media_stats(struct call_media *media, JsonBuilder *json) {
	json_builder_set_member_name(json, "media_index");
	json_builder_add_int_value(json, media->index);

	json_builder_set_member_name(json, "type");
	glib_json_builder_add_str(json, &media->type);

	json_builder_set_member_name(json, "interface");
	glib_json_builder_add_str(json, &media->logical_intf->name);

	if (media->protocol) {
		json_builder_set_member_name(json, "protocol");
		json_builder_add_string_value(json, media->protocol->name);
	}

	json_builder_set_member_name(json, "status");
	bool sending = MEDIA_ISSET(media, SEND);
	bool receiving = MEDIA_ISSET(media, RECV);
	const char *status;
	if (sending)
		status = receiving ? "sendrecv" : "sendonly";
	else
		status = receiving ? "recvonly" : "inactive";
	json_builder_add_string_value(json, status);

	// only the first stream of the media is reported
	if (media->streams.head) {
		struct packet_stream *first = media->streams.head->data;
		if (first)
			mqtt_stream_stats(first, json);
	}
}
|
|
|
|
|
|
/*
 * Add a complete call description to the currently open JSON object: the
 * call ID plus a "legs" array with one object per monologue, each carrying
 * its own members and a "medias" array of media objects.
 *
 * The call's master lock is held for reading while the call is traversed.
 */
static void mqtt_full_call(call_t *call, JsonBuilder *json) {
	rwlock_lock_r(&call->master_lock);

	log_info_call(call);

	mqtt_call_stats(call, json);

	json_builder_set_member_name(json, "legs");
	json_builder_begin_array(json);

	for (__auto_type leg = call->monologues.head; leg; leg = leg->next) {
		struct call_monologue *monologue = leg->data;

		json_builder_begin_object(json);

		mqtt_monologue_stats(monologue, json);

		json_builder_set_member_name(json, "medias");
		json_builder_begin_array(json);

		for (unsigned int idx = 0; idx < monologue->medias->len; idx++) {
			struct call_media *media = monologue->medias->pdata[idx];
			// empty slots are skipped
			if (media) {
				json_builder_begin_object(json);
				mqtt_media_stats(media, json);
				json_builder_end_object(json);
			}
		}

		json_builder_end_array(json);
		json_builder_end_object(json);
	}

	json_builder_end_array(json);

	rwlock_unlock_r(&call->master_lock);
	log_info_pop();
}
|
|
|
|
|
|
/*
 * Gather the global metrics and replay them into the currently open JSON
 * object.
 *
 * The metrics list is a flat sequence of labelled entries: values, bare
 * member names, and opening/closing brackets. A bracket entry at the very
 * start or very end of the list is dropped because the caller has already
 * opened (and will close) the surrounding object. Entries without a label
 * are ignored.
 */
static void mqtt_global_stats(JsonBuilder *json) {
	g_autoptr(stats_metric_q) metrics = statistics_gather_metrics(&interface_rate_stats);

	for (__auto_type it = metrics->head; it; it = it->next) {
		stats_metric *metric = it->data;

		if (!metric->label)
			continue;

		// skip the initial { and the final }
		if (metric->is_bracket && (it == metrics->head || it == metrics->tail))
			continue;

		if (metric->value_short) {
			// a named value: choose the most specific representation available
			json_builder_set_member_name(json, metric->label);
			if (metric->is_int)
				json_builder_add_int_value(json, metric->int_value);
			else if (metric->is_double)
				json_builder_add_double_value(json, metric->double_value);
			else if (metric->value_raw)
				json_builder_add_string_value(json, metric->value_raw);
			else
				json_builder_add_string_value(json, metric->value_short);
			continue;
		}

		if (!metric->is_bracket) {
			// bare label: the name of the next container
			json_builder_set_member_name(json, metric->label);
			continue;
		}

		if (metric->is_close_bracket) {
			if (metric->is_brace)
				json_builder_end_object(json);
			else
				json_builder_end_array(json);
		}
		else {
			if (metric->is_brace)
				json_builder_begin_object(json);
			else
				json_builder_begin_array(json);
		}
	}
}
|
|
|
|
|
|
/*
 * Create a JSON builder, open the top-level object and add a "timestamp"
 * member holding the current time (rtpe_now) as fractional seconds.
 *
 * Returns the builder with the top-level object still open.
 */
INLINE JsonBuilder *__mqtt_timer_intro(void) {
	JsonBuilder *builder = json_builder_new();

	json_builder_begin_object(builder);

	__auto_type now = timeval_from_us(rtpe_now);
	json_builder_set_member_name(builder, "timestamp");
	json_builder_add_double_value(builder, (double) now.tv_sec + (double) now.tv_usec / 1000000.0); // XXX

	return builder;
}
|
|
/*
 * Close the top-level object, serialise the builder's contents and publish
 * the resulting string.
 */
INLINE void __mqtt_timer_outro(JsonBuilder *json) {
	json_builder_end_object(json);

	char *payload = glib_json_print(json);
	mqtt_publish(payload);
}
|
|
/*
 * Publish one message describing a single media section: call ID, the
 * members of the media's monologue, and the media itself, all in one flat
 * object. The call's master lock is held for reading while data is gathered.
 */
void mqtt_timer_run_media(call_t *call, struct call_media *media) {
	JsonBuilder *builder = __mqtt_timer_intro();

	rwlock_lock_r(&call->master_lock);
	log_info_call(call);

	mqtt_call_stats(call, builder);
	mqtt_monologue_stats(media->monologue, builder);
	mqtt_media_stats(media, builder);

	rwlock_unlock_r(&call->master_lock);
	log_info_pop();

	__mqtt_timer_outro(builder);
}
|
|
/* Publish one message containing the full description of a single call. */
void mqtt_timer_run_call(call_t *call) {
	JsonBuilder *builder = __mqtt_timer_intro();
	mqtt_full_call(call, builder);
	__mqtt_timer_outro(builder);
}
|
|
void mqtt_timer_run_global(void) {
|
|
JsonBuilder *json = __mqtt_timer_intro();
|
|
|
|
mqtt_global_stats(json);
|
|
|
|
json_builder_set_member_name(json, "calls");
|
|
|
|
json_builder_begin_array(json);
|
|
|
|
ITERATE_CALL_LIST_START(CALL_ITERATOR_MQTT, call);
|
|
json_builder_begin_object(json);
|
|
mqtt_full_call(call, json);
|
|
json_builder_end_object(json);
|
|
ITERATE_CALL_LIST_NEXT_END(call);
|
|
|
|
json_builder_end_array(json);
|
|
|
|
__mqtt_timer_outro(json);
|
|
}
|
|
void mqtt_timer_run_summary(void) {
|
|
JsonBuilder *json = __mqtt_timer_intro();
|
|
|
|
mqtt_global_stats(json);
|
|
|
|
__mqtt_timer_outro(json);
|
|
}
|
|
|
|
|
|
|
|
#endif
|