TT#14008 add mqtt support

Change-Id: Ica4a3e4ef58eadf3feca44ea63f2308103b3146f
pull/1287/head
Richard Fuchs 4 years ago
parent e32d128bd8
commit 369e64f9a7

@ -43,6 +43,7 @@ flags = [
'-DWITH_IPTABLES_OPTION',
'-DWITH_TRANSCODING',
'-DHAVE_BCG729',
'-DHAVE_MQTT',
'-D__csh_lookup(x)=str_hash(x)',
'-DCSH_LOOKUP(x)=' + csh_lookup_str,
'-O2',

@ -76,11 +76,14 @@ ifeq ($(with_transcoding),yes)
include ../lib/g729.Makefile
endif
include ../lib/mqtt.Makefile
SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c \
bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \
codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c t38.c websocket.c
codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c t38.c websocket.c \
mqtt.c
LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c
ifeq ($(with_transcoding),yes)
LIBSRCS+= codeclib.c resample.c

@ -47,6 +47,7 @@
#include "media_player.h"
#include "jitter_buffer.h"
#include "t38.h"
#include "mqtt.h"
struct iterator_helper {
@ -69,6 +70,7 @@ struct stats rtpe_stats_cumulative;
rwlock_t rtpe_callhash_lock;
GHashTable *rtpe_callhash;
struct call_iterator_list rtpe_call_iterators[NUM_CALL_ITERATORS];
static struct mqtt_timer *global_mqtt_timer;
/* ********** */
@ -723,10 +725,13 @@ int call_init() {
poller_add_timer(rtpe_poller, call_timer, NULL);
mqtt_timer_start(&global_mqtt_timer, NULL, NULL);
return 0;
}
void call_free(void) {
mqtt_timer_stop(&global_mqtt_timer);
GList *ll = g_hash_table_get_values(rtpe_callhash);
for (GList *l = ll; l; l = l->next) {
struct call *c = l->data;
@ -2406,6 +2411,11 @@ init:
recording_setup_media(media);
t38_gateway_start(media->t38_gateway);
if (mqtt_publish_scope() == MPS_MEDIA) {
mqtt_timer_start(&media->mqtt_timer, call, media);
mqtt_timer_start(&other_media->mqtt_timer, call, other_media);
}
}
// set ipv4/ipv6/mixed media stats
@ -2613,6 +2623,8 @@ void call_destroy(struct call *c) {
rwlock_lock_w(&c->master_lock);
/* at this point, no more packet streams can be added */
mqtt_timer_stop(&c->mqtt_timer);
if (!IS_OWN_CALL(c))
goto no_stats_output;
@ -2790,6 +2802,7 @@ static void __call_free(void *p) {
//ilog(LOG_DEBUG, "freeing main call struct");
obj_put(c->dtls_cert);
mqtt_timer_stop(&c->mqtt_timer);
while (c->monologues.head) {
m = g_queue_pop_head(&c->monologues);
@ -2912,6 +2925,9 @@ restart:
mutex_unlock(&first_call->iterator[i].lock);
mutex_unlock(&rtpe_call_iterators[i].lock);
}
if (mqtt_publish_scope() == MPS_CALL)
mqtt_timer_start(&c->mqtt_timer, c, NULL);
}
else {
obj_hold(c);
@ -3249,6 +3265,7 @@ static void media_stop(struct call_media *m) {
t38_gateway_stop(m->t38_gateway);
codec_handlers_stop(&m->codec_handlers_store);
rtcp_timer_stop(&m->rtcp_timer);
mqtt_timer_stop(&m->mqtt_timer);
}
static void __monologue_stop(struct call_monologue *ml) {
media_player_stop(ml->player);

@ -16,6 +16,7 @@
#include "media_player.h"
#include "timerthread.h"
#include "log_funcs.h"
#include "mqtt.h"
@ -25,6 +26,12 @@ struct codec_timer {
struct timeval next;
void (*func)(struct codec_timer *);
};
struct mqtt_timer {
struct codec_timer ct;
struct mqtt_timer **self;
struct call *call;
struct call_media *media;
};
static codec_handler_func handler_func_passthrough;
@ -1248,6 +1255,7 @@ static void __codec_rtcp_timer(struct call_media *receiver) {
}
// returns: 0 = supp codec not present; 1 = sink has codec but receiver does not, 2 = both have codec
int __supp_codec_match(struct call_media *receiver, struct call_media *sink, int pt,
struct rtp_payload_type **sink_pt, struct rtp_payload_type **recv_pt)
@ -1647,6 +1655,55 @@ static struct codec_handler *codec_handler_get_udptl(struct call_media *m) {
#endif
static void __mqtt_timer_free(void *p) {
struct mqtt_timer *mqt = p;
if (mqt->call)
obj_put(mqt->call);
}
static void __codec_mqtt_timer_schedule(struct mqtt_timer *mqt);
static void __mqtt_timer_run(struct codec_timer *ct) {
struct mqtt_timer *mqt = (struct mqtt_timer *) ct;
struct call *call = mqt->call;
if (call)
rwlock_lock_w(&call->master_lock);
if (!*mqt->self) {
if (call)
rwlock_unlock_w(&call->master_lock);
goto out;
}
__codec_mqtt_timer_schedule(mqt);
if (call)
rwlock_unlock_w(&call->master_lock);
mqtt_timer_run(call, mqt->media);
out:
log_info_clear();
}
static void __codec_mqtt_timer_schedule(struct mqtt_timer *mqt) {
timeval_add_usec(&mqt->ct.next, rtpe_config.mqtt_publish_interval * 1000);
timerthread_obj_schedule_abs(&mqt->ct.tt_obj, &mqt->ct.next);
}
// master lock held in W
void mqtt_timer_start(struct mqtt_timer **mqtp, struct call *call, struct call_media *media) {
if (*mqtp) // already scheduled
return;
struct mqtt_timer *mqt = *mqtp = obj_alloc0("mqtt_timer", sizeof(*mqt), __mqtt_timer_free);
mqt->ct.tt_obj.tt = &codec_timers_thread;
mqt->call = call ? obj_get(call) : NULL;
mqt->self = mqtp;
mqt->media = media;
mqt->ct.next = rtpe_now;
mqt->ct.func = __mqtt_timer_run;
__codec_mqtt_timer_schedule(mqt);
}
// master lock held in W
static void codec_timer_stop(struct codec_timer **ctp) {
@ -1659,6 +1716,11 @@ static void codec_timer_stop(struct codec_timer **ctp) {
void rtcp_timer_stop(struct rtcp_timer **rtp) {
codec_timer_stop((struct codec_timer **) rtp);
}
void mqtt_timer_stop(struct mqtt_timer **mqtp) {
codec_timer_stop((struct codec_timer **) mqtp);
}
// call must be locked in R

@ -16,6 +16,9 @@
#include <ifaddrs.h>
#include <net/if.h>
#include <netdb.h>
#ifdef HAVE_MQTT
#include <mosquitto.h>
#endif
#include "poller.h"
#include "control_tcp.h"
@ -49,6 +52,7 @@
#include "jitter_buffer.h"
#include "websocket.h"
#include "codec.h"
#include "mqtt.h"
@ -85,6 +89,9 @@ struct rtpengine_config rtpe_config = {
.dtx_shift = 5,
.dtx_buffer = 10,
.dtx_lag = 100,
.mqtt_port = 1883,
.mqtt_keepalive = 30,
.mqtt_publish_interval = 5000,
.common = {
.log_levels = {
[log_level_index_internals] = -1,
@ -424,6 +431,9 @@ static void options(int *argc, char ***argv) {
AUTO_CLEANUP_GVBUF(dtx_cn_params);
int debug_srtp = 0;
AUTO_CLEANUP_GBUF(amr_dtx);
#ifdef HAVE_MQTT
AUTO_CLEANUP_GBUF(mqtt_publish_scope);
#endif
rwlock_lock_w(&rtpe_config.config_lock);
@ -528,6 +538,22 @@ static void options(int *argc, char ***argv) {
{ "cn-payload",0,0, G_OPTION_ARG_STRING_ARRAY,&cn_payload, "Comfort noise parameters to replace silence with","INT INT INT ..."},
{ "reorder-codecs",0,0, G_OPTION_ARG_NONE, &rtpe_config.reorder_codecs,"Reorder answer codecs based on sender preference",NULL},
#endif
#ifdef HAVE_MQTT
{ "mqtt-host",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_host, "Mosquitto broker host or address", "HOST|IP"},
{ "mqtt-port",0,0, G_OPTION_ARG_INT, &rtpe_config.mqtt_port, "Mosquitto broker port number", "INT"},
{ "mqtt-id",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_id, "Mosquitto client ID", "STRING"},
{ "mqtt-keepalive",0,0, G_OPTION_ARG_INT, &rtpe_config.mqtt_keepalive,"Seconds between mosquitto keepalives","INT"},
{ "mqtt-user",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_user, "Username for mosquitto auth", "USERNAME"},
{ "mqtt-pass",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_pass, "Password for mosquitto auth", "PASSWORD"},
{ "mqtt-cafile",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_cafile,"CA file for mosquitto auth", "FILE"},
{ "mqtt-capath",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_capath,"CA path for mosquitto auth", "PATH"},
{ "mqtt-certfile",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_certfile,"Certificate file for mosquitto auth","FILE"},
{ "mqtt-keyfile",0,0, G_OPTION_ARG_STRING, &rtpe_config.mqtt_keyfile,"Key file for mosquitto auth", "FILE"},
{ "mqtt-publish-qos",0,0,G_OPTION_ARG_INT, &rtpe_config.mqtt_publish_qos,"Mosquitto publish QoS", "0|1|2"},
{ "mqtt-publish-topic",0,0,G_OPTION_ARG_STRING, &rtpe_config.mqtt_publish_topic,"Mosquitto publish topic", "STRING"},
{ "mqtt-publish-interval",0,0,G_OPTION_ARG_INT, &rtpe_config.mqtt_publish_interval,"Publish timer interval", "MILLISECONDS"},
{ "mqtt-publish-scope",0,0,G_OPTION_ARG_STRING, &mqtt_publish_scope, "Scope for published mosquitto messages","global|call|media"},
#endif
{ NULL, }
};
@ -782,6 +808,19 @@ static void options(int *argc, char ***argv) {
rtpe_config.software_id = g_strdup_printf("rtpengine-%s", RTPENGINE_VERSION);
g_strcanon(rtpe_config.software_id, "QWERTYUIOPASDFGHJKLZXCVBNMqwertyuiopasdfghjklzxcvbnm1234567890-", '-');
#ifdef HAVE_MQTT
if (mqtt_publish_scope) {
if (!strcmp(mqtt_publish_scope, "global"))
rtpe_config.mqtt_publish_scope = MPS_GLOBAL;
else if (!strcmp(mqtt_publish_scope, "call"))
rtpe_config.mqtt_publish_scope = MPS_CALL;
else if (!strcmp(mqtt_publish_scope, "media"))
rtpe_config.mqtt_publish_scope = MPS_MEDIA;
else
die("Invalid --mqtt-publish-scope option ('%s')", mqtt_publish_scope);
}
#endif
rwlock_unlock_w(&rtpe_config.config_lock);
}
@ -917,6 +956,12 @@ static void options_free(void) {
free(rtpe_config.cn_payload.s);
if (rtpe_config.dtx_cn_params.s)
free(rtpe_config.dtx_cn_params.s);
g_free(rtpe_config.mqtt_user);
g_free(rtpe_config.mqtt_pass);
g_free(rtpe_config.mqtt_cafile);
g_free(rtpe_config.mqtt_certfile);
g_free(rtpe_config.mqtt_keyfile);
g_free(rtpe_config.mqtt_publish_topic);
// free common config options
config_load_free(&rtpe_config.common);
@ -940,6 +985,11 @@ static void init_everything(void) {
g_type_init();
#endif
#ifdef HAVE_MQTT
if (mosquitto_lib_init() != MOSQ_ERR_SUCCESS)
die("failed to init libmosquitto");
#endif
signals();
resources();
sdp_init();
@ -957,6 +1007,8 @@ static void init_everything(void) {
dtmf_init();
jitter_buffer_init();
t38_init();
if (rtpe_config.mqtt_host && mqtt_init())
abort();
codecs_init();
}
@ -1138,6 +1190,11 @@ int main(int argc, char **argv) {
if (!is_addr_unspecified(&rtpe_config.graphite_ep.address))
thread_create_detach(graphite_loop, NULL, "graphite");
#ifdef HAVE_MQTT
if (mqtt_publish_scope() != MPS_NONE)
thread_create_detach(mqtt_loop, NULL, "mqtt");
#endif
thread_create_detach(ice_thread_run, NULL, "ICE");
websocket_start();

@ -28,6 +28,7 @@
#include "media_player.h"
#include "jitter_buffer.h"
#include "dtmf.h"
#include "mqtt.h"
#ifndef PORT_RANDOM_MIN
@ -1186,7 +1187,7 @@ void kernelize(struct packet_stream *stream) {
reti.stun = media->ice_agent ? 1 : 0;
reti.non_forwarding = non_forwarding;
reti.blackhole = MEDIA_ISSET(media, BLACKHOLE) ? 1 : 0;
reti.rtp_stats = MEDIA_ISSET(media, RTCP_GEN) ? 1 : 0;
reti.rtp_stats = (MEDIA_ISSET(media, RTCP_GEN) || (mqtt_publish_scope() != MPS_NONE)) ? 1 : 0;
__re_address_translate_ep(&reti.dst_addr, &sink->endpoint);
__re_address_translate_ep(&reti.src_addr, &sink->selected_sfd->socket.local);

@ -0,0 +1,473 @@
#ifdef HAVE_MQTT
#include "mqtt.h"
#include <mosquitto.h>
#include <errno.h>
#include <string.h>
#include <stdbool.h>
#include <glib.h>
#include <glib-object.h>
#include <json-glib/json-glib.h>
#include "main.h"
#include "log.h"
#include "log_funcs.h"
#include "call.h"
#include "ssrc.h"
#include "rtplib.h"
static struct mosquitto *mosq;
static bool is_connected = false;
int mqtt_init(void) {
mosq = mosquitto_new(rtpe_config.mqtt_id, true, NULL);
if (!mosq) {
ilog(LOG_ERR, "Failed to create mosquitto client instance: %s", strerror(errno));
return -1;
}
return 0;
}
static int mqtt_connect(void) {
ilog(LOG_DEBUG, "Connecting to mosquitto...");
mosquitto_disconnect(mosq);
int ret = mosquitto_reinitialise(mosq, rtpe_config.mqtt_id, true, NULL);
if (ret) {
ilog(LOG_ERR, "Failed to initialise mosquitto client instance: %s", mosquitto_strerror(ret));
return -1;
}
mosquitto_threaded_set(mosq, true);
if (rtpe_config.mqtt_user) {
int ret = mosquitto_username_pw_set(mosq, rtpe_config.mqtt_user, rtpe_config.mqtt_pass);
if (ret != MOSQ_ERR_SUCCESS) {
ilog(LOG_ERR, "Failed to set mosquitto user/pass auth: %s", mosquitto_strerror(errno));
return -1;
}
}
if (rtpe_config.mqtt_cafile || rtpe_config.mqtt_capath) {
int ret = mosquitto_tls_set(mosq, rtpe_config.mqtt_cafile, rtpe_config.mqtt_capath,
rtpe_config.mqtt_certfile, rtpe_config.mqtt_keyfile, NULL);
if (ret != MOSQ_ERR_SUCCESS) {
ilog(LOG_ERR, "Failed to set mosquitto TLS options: %s", mosquitto_strerror(errno));
return -1;
}
}
ret = mosquitto_connect(mosq, rtpe_config.mqtt_host, rtpe_config.mqtt_port,
rtpe_config.mqtt_keepalive);
if (ret != MOSQ_ERR_SUCCESS) {
ilog(LOG_ERR, "Failed to connect to mosquitto broker: %s", mosquitto_strerror(ret));
return -1;
}
ilog(LOG_DEBUG, "Successfully connected to mosquitto");
return 0;
}
void mqtt_loop(void *dummy) {
while (!rtpe_shutdown) {
while (!is_connected && !rtpe_shutdown) {
if (!mqtt_connect()) {
is_connected = true;
break;
}
usleep(1000000);
}
unsigned int errors = 0;
while (!rtpe_shutdown) {
int ret = mosquitto_loop(mosq, 100, 1);
if (ret == MOSQ_ERR_SUCCESS) {
errors = 0;
continue;
}
if (ret == MOSQ_ERR_ERRNO)
ilog(LOG_ERR, "Error from mosquitto: %s", strerror(errno));
else
ilog(LOG_ERR, "Error from mosquitto: %s", mosquitto_strerror(ret));
errors++;
if (errors >= 5) {
ilog(LOG_WARN, "Reconnecting to mosquitto");
break;
}
}
mosquitto_disconnect(mosq);
is_connected = false;
}
mosquitto_destroy(mosq);
mosq = NULL;
}
int mqtt_publish_scope(void) {
if (!mosq)
return MPS_NONE;
return rtpe_config.mqtt_publish_scope;
}
void mqtt_publish(char *s) {
ilog(LOG_DEBUG, "Publishing to mosquitto: %s%s%s", FMT_M(s));
int ret = mosquitto_publish(mosq, NULL, rtpe_config.mqtt_publish_topic, strlen(s), s,
rtpe_config.mqtt_publish_qos,
false);
if (ret != MOSQ_ERR_SUCCESS)
ilog(LOG_WARN | LOG_FLAG_LIMIT, "Error publishing message to mosquitto: %s",
mosquitto_strerror(ret));
g_free(s);
}
static void mqtt_call_stats(struct call *call, JsonBuilder *json) {
json_builder_set_member_name(json, "call_id");
json_builder_add_string_value(json, call->callid.s);
}
static void mqtt_monologue_stats(struct call_monologue *ml, JsonBuilder *json) {
json_builder_set_member_name(json, "tag");
json_builder_add_string_value(json, ml->tag.s);
if (ml->label.len) {
json_builder_set_member_name(json, "label");
json_builder_add_string_value(json, ml->label.s);
}
}
static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media) {
struct ssrc_entry_call *sc = ssrc->parent;
json_builder_set_member_name(json, "SSRC");
json_builder_add_int_value(json, sc->h.ssrc);
unsigned char prim_pt = 255;
mutex_lock(&ssrc->tracker.lock);
if (ssrc->tracker.most_len > 0)
prim_pt = ssrc->tracker.most[0];
mutex_unlock(&ssrc->tracker.lock);
unsigned int clockrate = 0;
//struct codec_handler *h = codec_handler_get(ps->media, prim_pt);
struct rtp_payload_type *pt = g_hash_table_lookup(media->codecs_recv, GUINT_TO_POINTER(prim_pt));
if (pt) {
json_builder_set_member_name(json, "codec");
json_builder_add_string_value(json, pt->encoding.s);
json_builder_set_member_name(json, "clock_rate");
json_builder_add_int_value(json, pt->clock_rate);
clockrate = pt->clock_rate;
if (pt->encoding_parameters.s) {
json_builder_set_member_name(json, "codec_params");
json_builder_add_string_value(json, pt->encoding_parameters.s);
}
if (pt->format_parameters.s) {
json_builder_set_member_name(json, "codec_format");
json_builder_add_string_value(json, pt->format_parameters.s);
}
}
json_builder_set_member_name(json, "metrics");
json_builder_begin_object(json);
// copy out values
uint64_t packets, octets, packets_lost, duplicates;
packets = atomic64_get(&ssrc->packets);
octets = atomic64_get(&ssrc->octets);
packets_lost = atomic64_get(&ssrc->packets_lost);
duplicates = atomic64_get(&ssrc->duplicates);
// process per-second stats
uint64_t cur_ts = ssrc_timeval_to_ts(&rtpe_now);
uint64_t last_sample, sample_packets, sample_octets, sample_packets_lost, sample_duplicates;
// sample values
last_sample = atomic64_get_set(&ssrc->last_sample, cur_ts);
sample_packets = atomic64_get_set(&ssrc->sample_packets, packets);
sample_octets = atomic64_get_set(&ssrc->sample_octets, octets);
sample_packets_lost = atomic64_get_set(&ssrc->sample_packets_lost, packets_lost);
sample_duplicates = atomic64_get_set(&ssrc->sample_duplicates, duplicates);
json_builder_set_member_name(json, "packets");
json_builder_add_int_value(json, packets);
json_builder_set_member_name(json, "bytes");
json_builder_add_int_value(json, octets);
json_builder_set_member_name(json, "lost");
json_builder_add_int_value(json, packets_lost);
json_builder_set_member_name(json, "duplicates");
json_builder_add_int_value(json, duplicates);
if (last_sample && last_sample != cur_ts) {
// calc sample rates with primitive math
struct timeval last_sample_ts = ssrc_ts_to_timeval(last_sample);
uint64_t usecs_diff = timeval_diff(&rtpe_now, &last_sample_ts);
// adjust samples
packets -= sample_packets;
octets -= sample_octets;
packets_lost -= sample_packets_lost;
duplicates -= sample_duplicates;
json_builder_set_member_name(json, "packets_per_second");
json_builder_add_double_value(json, (double) packets * 1000000.0 / (double) usecs_diff);
json_builder_set_member_name(json, "bytes_per_second");
json_builder_add_double_value(json, (double) octets * 1000000.0 / (double) usecs_diff);
json_builder_set_member_name(json, "lost_per_second");
json_builder_add_double_value(json, (double) packets_lost * 1000000.0 / (double) usecs_diff);
json_builder_set_member_name(json, "duplicates_per_second");
json_builder_add_double_value(json, (double) duplicates * 1000000.0 / (double) usecs_diff);
}
mutex_lock(&sc->h.lock);
uint32_t jitter = sc->jitter;
int64_t mos = -1, rtt = -1, rtt_leg = -1;
if (sc->stats_blocks.length) {
struct ssrc_stats_block *sb = sc->stats_blocks.tail->data;
mos = sb->mos;
rtt = sb->rtt;
rtt_leg = sb->rtt_leg;
}
mutex_unlock(&sc->h.lock);
if (clockrate) {
json_builder_set_member_name(json, "jitter");
json_builder_add_double_value(json, (double) jitter * 1000.0 / (double) clockrate);
}
if (mos != -1 && mos != 0) {
json_builder_set_member_name(json, "MOS");
json_builder_add_double_value(json, (double) mos / 10.0);
}
if (rtt != -1) {
json_builder_set_member_name(json, "RTT");
json_builder_add_double_value(json, (double) rtt / 1000.0);
}
if (rtt_leg != -1) {
json_builder_set_member_name(json, "RTT_leg");
json_builder_add_double_value(json, (double) rtt_leg / 1000.0);
}
json_builder_end_object(json);
}
static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
mutex_lock(&ps->in_lock);
struct stream_fd *sfd = ps->selected_sfd;
if (sfd) {
json_builder_set_member_name(json, "address");
json_builder_add_string_value(json, sockaddr_print_buf(&sfd->socket.local.address));
json_builder_set_member_name(json, "port");
json_builder_add_int_value(json, sfd->socket.local.port);
}
if (ps->ssrc_in) {
json_builder_set_member_name(json, "ingress");
json_builder_begin_object(json);
mqtt_ssrc_stats(ps->ssrc_in, json, ps->media);
json_builder_end_object(json);
}
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
if (ps->ssrc_out) {
json_builder_set_member_name(json, "egress");
json_builder_begin_object(json);
mqtt_ssrc_stats(ps->ssrc_out, json, ps->media);
json_builder_end_object(json);
}
mutex_unlock(&ps->out_lock);
}
static void mqtt_media_stats(struct call_media *media, JsonBuilder *json) {
media_update_stats(media);
json_builder_set_member_name(json, "media_index");
json_builder_add_int_value(json, media->index);
json_builder_set_member_name(json, "type");
json_builder_add_string_value(json, media->type.s);
json_builder_set_member_name(json, "interface");
json_builder_add_string_value(json, media->logical_intf->name.s);
if (media->protocol) {
json_builder_set_member_name(json, "protocol");
json_builder_add_string_value(json, media->protocol->name);
}
json_builder_set_member_name(json, "status");
if (MEDIA_ISSET(media, SEND)) {
if (MEDIA_ISSET(media, RECV))
json_builder_add_string_value(json, "sendrecv");
else
json_builder_add_string_value(json, "sendonly");
}
else {
if (MEDIA_ISSET(media, RECV))
json_builder_add_string_value(json, "recvonly");
else
json_builder_add_string_value(json, "inactive");
}
struct packet_stream *ps = media->streams.head ? media->streams.head->data : NULL;
if (ps)
mqtt_stream_stats(ps, json);
}
static void mqtt_full_call(struct call *call, JsonBuilder *json) {
rwlock_lock_r(&call->master_lock);
log_info_call(call);
mqtt_call_stats(call, json);
json_builder_set_member_name(json, "legs");
json_builder_begin_array(json);
for (GList *l = call->monologues.head; l; l = l->next) {
struct call_monologue *ml = l->data;
json_builder_begin_object(json);
mqtt_monologue_stats(ml, json);
json_builder_set_member_name(json, "medias");
json_builder_begin_array(json);
for (GList *k = ml->medias.head; k; k = k->next) {
struct call_media *media = k->data;
json_builder_begin_object(json);
mqtt_media_stats(media, json);
json_builder_end_object(json);
}
json_builder_end_array(json);
json_builder_end_object(json);
}
json_builder_end_array(json);
rwlock_unlock_r(&call->master_lock);
}
static void mqtt_global_stats(JsonBuilder *json) {
AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics());
for (GList *l = metrics->head; l; l = l->next) {
struct stats_metric *m = l->data;
if (!m->label)
continue;
if (m->value_short) {
json_builder_set_member_name(json, m->label);
if (m->is_int)
json_builder_add_int_value(json, m->int_value);
else
json_builder_add_string_value(json, m->value_short);
}
else if (m->is_bracket) {
if (m->is_close_bracket) {
if (m->is_brace)
json_builder_end_object(json);
else
json_builder_end_array(json);
}
else {
if (m->is_brace)
json_builder_begin_object(json);
else
json_builder_begin_array(json);
}
}
else
json_builder_set_member_name(json, m->label);
}
}
void mqtt_timer_run(struct call *call, struct call_media *media) {
JsonBuilder *json = json_builder_new();
json_builder_begin_object(json);
json_builder_set_member_name(json, "timestamp");
json_builder_add_double_value(json, (double) rtpe_now.tv_sec + (double) rtpe_now.tv_usec / 1000000.0);
if (!call) {
mqtt_global_stats(json);
if (mqtt_publish_scope() == MPS_GLOBAL) {
json_builder_set_member_name(json, "calls");
json_builder_begin_array(json);
ITERATE_CALL_LIST_START(CALL_ITERATOR_MQTT, call);
json_builder_begin_object(json);
mqtt_full_call(call, json);
json_builder_end_object(json);
ITERATE_CALL_LIST_NEXT_END(call);
json_builder_end_array(json);
}
}
else if (!media)
mqtt_full_call(call, json);
else {
rwlock_lock_r(&call->master_lock);
log_info_call(call);
mqtt_call_stats(call, json);
mqtt_monologue_stats(media->monologue, json);
mqtt_media_stats(media, json);
rwlock_unlock_r(&call->master_lock);
}
json_builder_end_object(json);
JsonGenerator *gen = json_generator_new();
JsonNode *root = json_builder_get_root(json);
json_generator_set_root(gen, root);
char *result = json_generator_to_data(gen, NULL);
mqtt_publish(result);
json_node_free(root);
g_object_unref(gen);
g_object_unref(json);
}
#endif

@ -1572,6 +1572,9 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
else if (ps->rtcp_sink)
other_media = ps->rtcp_sink->media;
media_update_stats(media);
media_update_stats(other_media);
log_info_stream_fd(ps->selected_sfd);
GQueue rrs = G_QUEUE_INIT;

@ -915,6 +915,69 @@ process IP datagrams of at least 576 bytes (for IPv4) or 1280 bytes (for IPv6).
This does not preclude link layers with an MTU smaller than this minimum MTU from
conveying IP data. Internet IPv4 path MTU is 68 bytes.
=item B<--mqtt-host=>I<HOST>|I<IP>
Host or IP address of the Mosquitto broker to connect to. Must be set to enable
exporting stats to Mosquitto.
=item B<--mqtt-port=>I<INT>
Port of the Mosquitto broker. Defaults to 1883.
=item B<--mqtt-id=>I<STRING>
Client ID to use for Mosquitto. Default is a generated random string.
=item B<--mqtt-keepalive=>I<INT>
Keepalive interval in seconds. Defaults to 30.
=item B<--mqtt-user=>I<USERNAME>
=item B<--mqtt-pass=>I<PASSWORD>
Credentials to connect to Mosquitto broker. At least a username must be given
to enable authentication.
=item B<--mqtt-cafile=>I<FILE>
=item B<--mqtt-capath=>I<PATH>
=item B<--mqtt-certfile=>I<FILE>
=item B<--mqtt-keyfile=>I<FILE>
Enable TLS to connect to Mosquitto broker, optionally with client certificate
authentication. At least B<cafile> or B<capath> must be given to enable TLS. To
enable client certificate authentication, both B<certfile> and B<keyfile> must
be set. All files must be in PEM format. Password-proteted files are not
supported.
=item B<--mqtt-publish-qos=>B<0>|B<1>|B<2>
QoS value to use for publishing to Mosquitto. See Mosquitto docs for details.
=item B<--mqtt-publish-topic=>I<STRING>
Topic string to use for publishing to Mosquitto. Must be set to a non-empty
string.
=item B<--mqtt-publish-interval=>I<MILLISECONDS>
Interval in milliseconds to publish to Mosquitto. Defaults to 5000 (5 seconds).
=item B<--mqtt-publish-scope=>B<global>|B<call>|B<media>
When set to B<global>, one message will be published to Mosquitto every
I<interval> milliseconds containing global stats plus a list of all running
calls with stats for each call. When set to B<call>, one message per call will
be published to Mosquitto with stats for that call every I<interval>
milliseconds, plus one message every I<interval> milliseconds with global
stats. When set to B<media>, one message per call media (usually one media per
call participant, so usually 2 media per call) will be published to Mosquitto
with stats for that call media every I<interval> milliseconds, plus one message
every I<interval> milliseconds with global stats.
=back
=head1 INTERFACES

@ -15,6 +15,7 @@ static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) {
payload_tracker_init(&c->tracker);
while (!c->ssrc_map_out)
c->ssrc_map_out = ssl_random();
atomic64_set_na(&c->last_sample, ssrc_timeval_to_ts(&rtpe_now));
}
static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) {
ent->ssrc = ssrc;

1
debian/control vendored

@ -27,6 +27,7 @@ Build-Depends:
libio-socket-ip-perl,
libiptc-dev,
libjson-glib-dev,
libmosquitto-dev,
libnet-interface-perl,
libpcap0.8-dev,
libpcre3-dev,

@ -16,6 +16,7 @@ BuildRequires: gcc make pkgconfig redhat-rpm-config
BuildRequires: glib2-devel libcurl-devel openssl-devel pcre-devel
BuildRequires: xmlrpc-c-devel zlib-devel hiredis-devel
BuildRequires: libpcap-devel libevent-devel json-glib-devel
BuildRequires: mosquitto-devel
BuildRequires: gperf perl-IPC-Cmd
BuildRequires: perl-podlators
BuildRequires: pkgconfig(libwebsockets)

@ -69,6 +69,7 @@ enum {
CALL_ITERATOR_MAIN = 0,
CALL_ITERATOR_TIMER,
CALL_ITERATOR_GRAPHITE,
CALL_ITERATOR_MQTT,
NUM_CALL_ITERATORS
};
@ -213,6 +214,7 @@ struct transport_protocol;
struct jitter_buffer;
struct codec_tracker;
struct rtcp_timer;
struct mqtt_timer;
typedef bencode_buffer_t call_buffer_t;
@ -353,6 +355,7 @@ struct call_media {
struct codec_handler *codec_handler_cache;
struct rtcp_handler *rtcp_handler;
struct rtcp_timer *rtcp_timer; // master lock for scheduling purposes
struct mqtt_timer *mqtt_timer; // master lock for scheduling purposes
struct codec_handler *dtmf_injector;
struct t38_gateway *t38_gateway;
struct codec_handler *t38_handler;
@ -450,6 +453,7 @@ struct call {
GQueue endpoint_maps;
struct dtls_cert *dtls_cert; /* for outgoing */
struct ssrc_hash *ssrc_hash;
struct mqtt_timer *mqtt_timer;
str callid;
struct timeval created;

@ -21,6 +21,8 @@ struct rtp_header;
struct stream_params;
struct supp_codec_tracker;
struct rtcp_timer;
struct mqtt_timer;
struct call;
typedef int codec_handler_func(struct codec_handler *, struct media_packet *);
@ -68,6 +70,9 @@ void codecs_cleanup(void);
void codec_timers_loop(void *);
void rtcp_timer_stop(struct rtcp_timer **);
void mqtt_timer_stop(struct mqtt_timer **);
void mqtt_timer_start(struct mqtt_timer **mqtp, struct call *call, struct call_media *media);
struct codec_handler *codec_handler_get(struct call_media *, int payload_type);
void codec_handlers_free(struct call_media *);
struct codec_handler *codec_handler_make_playback(const struct rtp_payload_type *src_pt,

@ -124,6 +124,25 @@ struct rtpengine_config {
int reorder_codecs;
char *software_id;
int poller_per_thread;
char *mqtt_host;
int mqtt_port;
char *mqtt_id;
int mqtt_keepalive;
char *mqtt_user;
char *mqtt_pass;
char *mqtt_cafile;
char *mqtt_capath;
char *mqtt_certfile;
char *mqtt_keyfile;
int mqtt_publish_qos;
char *mqtt_publish_topic;
int mqtt_publish_interval;
enum {
MPS_NONE = -1,
MPS_GLOBAL = 0,
MPS_CALL,
MPS_MEDIA,
} mqtt_publish_scope;
};

@ -0,0 +1,31 @@
#ifndef _MQTT_H_
#define _MQTT_H_
#include <stdbool.h>
#include "main.h"
struct call;
struct call_media;
#ifdef HAVE_MQTT
int mqtt_init(void);
void mqtt_loop(void *);
int mqtt_publish_scope(void);
void mqtt_publish(char *);
void mqtt_timer_run(struct call *, struct call_media *);
#else
#include "compat.h"
INLINE int mqtt_init(void) { return 0; }
INLINE void mqtt_publish(char *s) { }
INLINE int mqtt_publish_scope(void) { return MPS_NONE; };
INLINE void mqtt_timer_run(struct call *c, struct call_media *m) { }
#endif
#endif

@ -64,9 +64,24 @@ struct ssrc_ctx {
last_seq, // XXX dup with srtp_index?
last_ts;
// for per-second stats:
atomic64 last_sample,
sample_packets,
sample_octets,
sample_packets_lost,
sample_duplicates;
struct timeval next_rtcp; // for self-generated RTCP reports
};
INLINE uint64_t ssrc_timeval_to_ts(const struct timeval *tv) {
return (tv->tv_sec << 20) | tv->tv_usec;
}
INLINE struct timeval ssrc_ts_to_timeval(uint64_t ts) {
return (struct timeval) { .tv_sec = ts >> 20, .tv_usec = ts & 0xfffff };
}
struct ssrc_stats_block {
struct timeval reported;
uint64_t jitter; // ms

@ -0,0 +1,13 @@
ifeq ($(shell pkg-config --exists libmosquitto && echo yes),yes)
have_mqtt := yes
mqtt_inc := $(shell pkg-config --cflags libmosquitto)
mqtt_lib := $(shell pkg-config --libs libmosquitto)
endif
ifeq ($(have_mqtt),yes)
CFLAGS+= -DHAVE_MQTT
CFLAGS+= $(mqtt_inc)
endif
ifeq ($(have_mqtt),yes)
LDLIBS+= $(mqtt_lib)
endif

1
t/.gitignore vendored

@ -67,3 +67,4 @@ aead-aes-crypt
tcp_listener.c
test-kernel-module
test-resample
mqtt.c

@ -74,7 +74,7 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c
DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \
dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \
media_player.c jitter_buffer.c t38.c tcp_listener.c
media_player.c jitter_buffer.c t38.c tcp_listener.c mqtt.c
HASHSRCS+= call_interfaces.c control_ng.c sdp.c
endif
@ -180,7 +180,7 @@ test-transcode: test-transcode.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr
rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \
control_ng.strhash.o \
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o
test-resample: test-resample.o $(COMMONOBJS) codeclib.o resample.o dtmflib.o

Loading…
Cancel
Save