#ifdef HAVE_MQTT

#include "mqtt.h"

/*
 * NOTE(review): the seven system/library header names below were missing
 * from this copy of the file (bare `#include` directives). They have been
 * reconstructed from the symbols this file uses (mosquitto_*, bool, errno,
 * strerror/strlen, usleep, g_free, JsonBuilder) -- confirm against the build.
 */
#include <mosquitto.h>
#include <stdbool.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <glib.h>
#include <json-glib/json-glib.h>

#include "main.h"
#include "log.h"
#include "log_funcs.h"
#include "call.h"
#include "ssrc.h"
#include "rtplib.h"
#include "media_player.h"


/* the single mosquitto client instance; NULL before mqtt_init() and after mqtt_loop() returns */
static struct mosquitto *mosq;
/* only read and written from within mqtt_loop() */
static bool is_connected = false;
/* sampling state handed to statistics_gather_metrics() for the global stats */
static struct interface_sampled_rate_stats interface_rate_stats;


static void mqtt_ssrc_stats(struct ssrc_entry_call *ssrc, JsonBuilder *json, struct call_media *media);


/*
 * Initialises the sampled interface rate stats and creates the mosquitto
 * client instance using the configured client ID.
 *
 * Returns 0 on success, or -1 (after logging) if the client instance could
 * not be created.
 */
int mqtt_init(void) {
	interface_sampled_rate_stats_init(&interface_rate_stats);
	mosq = mosquitto_new(rtpe_config.mqtt_id, true, NULL);
	if (!mosq) {
		ilog(LOG_ERR, "Failed to create mosquitto client instance: %s", strerror(errno));
		return -1;
	}
	return 0;
}


/*
 * (Re-)establishes the broker connection: disconnects, reinitialises the
 * client instance, applies the configured auth/TLS/ALPN options and then
 * connects to the configured host and port.
 *
 * Returns 0 on success, or -1 (after logging) on the first step that fails.
 */
static int mqtt_connect(void) {
	ilog(LOG_DEBUG, "Connecting to mosquitto...");

	/* return value deliberately ignored: we may not be connected at all */
	mosquitto_disconnect(mosq);

	int ret = mosquitto_reinitialise(mosq, rtpe_config.mqtt_id, true, NULL);
	if (ret) {
		ilog(LOG_ERR, "Failed to initialise mosquitto client instance: %s", mosquitto_strerror(ret));
		return -1;
	}

	mosquitto_threaded_set(mosq, true);

	if (rtpe_config.mqtt_user) {
		ret = mosquitto_username_pw_set(mosq, rtpe_config.mqtt_user, rtpe_config.mqtt_pass);
		if (ret != MOSQ_ERR_SUCCESS) {
			/* mosquitto_strerror() takes the library's return code, not errno */
			ilog(LOG_ERR, "Failed to set mosquitto user/pass auth: %s", mosquitto_strerror(ret));
			return -1;
		}
	}

	if (rtpe_config.mqtt_cafile || rtpe_config.mqtt_capath) {
		ret = mosquitto_tls_set(mosq, rtpe_config.mqtt_cafile, rtpe_config.mqtt_capath,
				rtpe_config.mqtt_certfile, rtpe_config.mqtt_keyfile, NULL);
		if (ret != MOSQ_ERR_SUCCESS) {
			ilog(LOG_ERR, "Failed to set mosquitto TLS options: %s", mosquitto_strerror(ret));
			return -1;
		}
	}

	if (rtpe_config.mqtt_tls_alpn) {
#if LIBMOSQUITTO_VERSION_NUMBER >= 1006000
		ret = mosquitto_string_option(mosq, MOSQ_OPT_TLS_ALPN, rtpe_config.mqtt_tls_alpn);
		if (ret != MOSQ_ERR_SUCCESS) {
			ilog(LOG_ERR, "Failed to set mosquitto TLS ALPN options: %s", mosquitto_strerror(ret));
			return -1;
		}
#else
		ilog(LOG_WARN, "Cannot set mqtt TLS ALPN due to outdated mosquitto library");
#endif
	}

	ret = mosquitto_connect(mosq, rtpe_config.mqtt_host, rtpe_config.mqtt_port,
			rtpe_config.mqtt_keepalive);
	if (ret != MOSQ_ERR_SUCCESS) {
		ilog(LOG_ERR, "Failed to connect to mosquitto broker: %s", mosquitto_strerror(ret));
		return -1;
	}

	ilog(LOG_DEBUG, "Successfully connected to mosquitto");

	return 0;
}


/*
 * Main loop of the MQTT client: runs until rtpe_shutdown is set.
 *
 * Connects (retrying once per second on failure), then services the
 * mosquitto network loop. After five consecutive loop errors the connection
 * is dropped and re-established. On shutdown the client instance is
 * destroyed and `mosq` is reset to NULL.
 *
 * `dummy` is unused.
 */
void mqtt_loop(void *dummy) {
	while (!rtpe_shutdown) {
		/* (re)connect phase */
		while (!is_connected && !rtpe_shutdown) {
			if (!mqtt_connect()) {
				is_connected = true;
				break;
			}
			usleep(1000000);
		}

		/* service phase: count consecutive errors only */
		unsigned int errors = 0;
		while (!rtpe_shutdown) {
			int ret = mosquitto_loop(mosq, 100, 1);
			if (ret == MOSQ_ERR_SUCCESS) {
				errors = 0;
				continue;
			}

			if (ret == MOSQ_ERR_ERRNO)
				ilog(LOG_ERR, "Error from mosquitto: %s", strerror(errno));
			else
				ilog(LOG_ERR, "Error from mosquitto: %s", mosquitto_strerror(ret));

			errors++;
			if (errors >= 5) {
				ilog(LOG_WARN, "Reconnecting to mosquitto");
				break;
			}
		}

		mosquitto_disconnect(mosq);
		is_connected = false;
	}

	mosquitto_destroy(mosq);
	mosq = NULL;
}


/*
 * Returns the configured publish scope, or MPS_NONE if there is no
 * mosquitto client instance.
 */
int mqtt_publish_scope(void) {
	if (!mosq)
		return MPS_NONE;
	return rtpe_config.mqtt_publish_scope;
}


/*
 * Publishes the NUL-terminated string `s` to the configured topic with the
 * configured QoS (not retained). A publish failure is logged as a warning.
 *
 * Takes ownership of `s`: it is released with g_free() in all cases.
 */
void mqtt_publish(char *s) {
	ilog(LOG_DEBUG, "Publishing to mosquitto: %s%s%s", FMT_M(s));

	int ret = mosquitto_publish(mosq, NULL, rtpe_config.mqtt_publish_topic, strlen(s), s,
			rtpe_config.mqtt_publish_qos, false);
	if (ret != MOSQ_ERR_SUCCESS)
		ilog(LOG_WARN | LOG_FLAG_LIMIT, "Error publishing message to mosquitto: %s",
				mosquitto_strerror(ret));

	g_free(s);
}


/* Adds the "call_id" member for `call` to the currently open JSON object. */
static void mqtt_call_stats(call_t *call, JsonBuilder *json) {
	json_builder_set_member_name(json, "call_id");
	glib_json_builder_add_str(json, &call->callid);
}


/*
 * Adds per-monologue members to the currently open JSON object: "tag" and
 * "label" when non-empty, and (with transcoding support compiled in) a
 * "media_player" object if the monologue has a media player attached.
 */
static void mqtt_monologue_stats(struct call_monologue *ml, JsonBuilder *json) {
	if (ml->tag.len) {
		json_builder_set_member_name(json, "tag");
		glib_json_builder_add_str(json, &ml->tag);
	}
	if (ml->label.len) {
		json_builder_set_member_name(json, "label");
		glib_json_builder_add_str(json, &ml->label);
	}

#ifdef WITH_TRANSCODING
	struct media_player *mp = ml->player;
	if (mp) {
		/* the player's fields are read under its own lock */
		mutex_lock(&mp->lock);

		json_builder_set_member_name(json, "media_player");
		json_builder_begin_object(json);

		json_builder_set_member_name(json, "duration");
		json_builder_add_int_value(json, mp->coder.duration);
		json_builder_set_member_name(json, "repeat");
		json_builder_add_int_value(json, mp->opts.repeat);
		json_builder_set_member_name(json, "frame_time");
		json_builder_add_int_value(json, mp->last_frame_ts);

		if (mp->ssrc_out && mp->media) {
			json_builder_set_member_name(json, "SSRC");
			json_builder_begin_object(json);
			mqtt_ssrc_stats(mp->ssrc_out, json, mp->media);
			json_builder_end_object(json);
		}

		json_builder_end_object(json);

		mutex_unlock(&mp->lock);
	}
#endif
}


/*
 * Adds the stats of one SSRC to the currently open JSON object: the SSRC
 * itself, codec details of its most-used payload type (if that payload type
 * is known to `media`), and a "metrics" object holding absolute counters,
 * per-second rates since the previous call for this SSRC, and
 * jitter/MOS/RTT where available.
 *
 * Side effect: the SSRC's sample_* fields and last_sample are updated to the
 * current counters/time, so rates always cover the interval between two
 * successive calls.
 *
 * Does nothing if `ssrc` or `media` is NULL.
 */
static void mqtt_ssrc_stats(struct ssrc_entry_call *ssrc, JsonBuilder *json, struct call_media *media) {
	if (!ssrc || !media)
		return;

	json_builder_set_member_name(json, "SSRC");
	json_builder_add_int_value(json, ssrc->h.ssrc);

	/* 255 = no payload type tracked yet */
	unsigned char prim_pt = 255;

	mutex_lock(&ssrc->tracker.lock);
	if (ssrc->tracker.most_len > 0)
		prim_pt = ssrc->tracker.most[0];
	mutex_unlock(&ssrc->tracker.lock);

	unsigned int clockrate = 0;
	rtp_payload_type *pt = t_hash_table_lookup(media->codecs.codecs, GUINT_TO_POINTER(prim_pt));
	if (pt) {
		json_builder_set_member_name(json, "codec");
		glib_json_builder_add_str(json, &pt->encoding);
		json_builder_set_member_name(json, "clock_rate");
		json_builder_add_int_value(json, pt->clock_rate);
		clockrate = pt->clock_rate;
		if (pt->encoding_parameters.s) {
			json_builder_set_member_name(json, "codec_params");
			glib_json_builder_add_str(json, &pt->encoding_parameters);
		}
		if (pt->format_parameters.s) {
			json_builder_set_member_name(json, "codec_format");
			glib_json_builder_add_str(json, &pt->format_parameters);
		}
	}

	json_builder_set_member_name(json, "metrics");
	json_builder_begin_object(json);

	// copy out values
	int64_t packets, octets, packets_lost, duplicates;
	packets = atomic64_get_na(&ssrc->stats->packets);
	octets = atomic64_get_na(&ssrc->stats->bytes);
	packets_lost = ssrc->packets_lost;
	duplicates = ssrc->duplicates;

	// process per-second stats
	int64_t cur_ts = rtpe_now;
	int64_t last_sample;
	int64_t sample_packets, sample_octets, sample_packets_lost, sample_duplicates;

	// sample values
	last_sample = atomic64_get_set(&ssrc->last_sample, cur_ts);
	sample_packets = atomic64_get_set(&ssrc->sample_packets, packets);
	sample_octets = atomic64_get_set(&ssrc->sample_octets, octets);
	sample_packets_lost = atomic64_get_set(&ssrc->sample_packets_lost, packets_lost);
	sample_duplicates = atomic64_get_set(&ssrc->sample_duplicates, duplicates);

	json_builder_set_member_name(json, "packets");
	json_builder_add_int_value(json, packets);
	json_builder_set_member_name(json, "bytes");
	json_builder_add_int_value(json, octets);
	json_builder_set_member_name(json, "lost");
	json_builder_add_int_value(json, packets_lost);
	json_builder_set_member_name(json, "duplicates");
	json_builder_add_int_value(json, duplicates);

	if (last_sample && last_sample != cur_ts) {
		// calc sample rates with primitive math
		// use the same timestamp snapshot that the guard above checked, so
		// the divisor is guaranteed to be non-zero
		int64_t usecs_diff = cur_ts - last_sample;

		// adjust samples
		packets -= sample_packets;
		octets -= sample_octets;
		packets_lost -= sample_packets_lost;
		duplicates -= sample_duplicates;

		json_builder_set_member_name(json, "packets_per_second");
		json_builder_add_double_value(json, (double) packets * 1000000.0 / usecs_diff);
		json_builder_set_member_name(json, "bytes_per_second");
		json_builder_add_double_value(json, (double) octets * 1000000.0 / usecs_diff);
		json_builder_set_member_name(json, "lost_per_second");
		json_builder_add_double_value(json, (double) packets_lost * 1000000.0 / usecs_diff);
		json_builder_set_member_name(json, "duplicates_per_second");
		json_builder_add_double_value(json, (double) duplicates * 1000000.0 / usecs_diff);
	}

	/* -1 = no stats block available */
	mutex_lock(&ssrc->h.lock);
	uint32_t jitter = ssrc->jitter;
	int64_t mos = -1, rtt = -1, rtt_leg = -1;
	if (ssrc->stats_blocks.length) {
		/* most recent block is at the tail */
		struct ssrc_stats_block *sb = ssrc->stats_blocks.tail->data;
		mos = sb->mos;
		rtt = sb->rtt;
		rtt_leg = sb->rtt_leg;
	}
	mutex_unlock(&ssrc->h.lock);

	/* jitter can only be scaled when the clock rate is known */
	if (clockrate) {
		json_builder_set_member_name(json, "jitter");
		json_builder_add_double_value(json, (double) jitter * 1000.0 / (double) clockrate);
	}
	if (mos != -1 && mos != 0) {
		json_builder_set_member_name(json, "MOS");
		json_builder_add_double_value(json, (double) mos / 10.0);
	}
	if (rtt != -1) {
		json_builder_set_member_name(json, "RTT");
		json_builder_add_double_value(json, (double) rtt / 1000.0);
	}
	if (rtt_leg != -1) {
		json_builder_set_member_name(json, "RTT_leg");
		json_builder_add_double_value(json, (double) rtt_leg / 1000.0);
	}

	json_builder_end_object(json);
}


/* Adds the "bytes", "packets" and "errors" counters of `s` to the currently open JSON object. */
static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *json) {
	json_builder_set_member_name(json, "bytes");
	json_builder_add_int_value(json, atomic64_get_na(&s->bytes));
	json_builder_set_member_name(json, "packets");
	json_builder_add_int_value(json, atomic64_get_na(&s->packets));
	json_builder_set_member_name(json, "errors");
	json_builder_add_int_value(json, atomic64_get_na(&s->errors));
}


/*
 * Adds packet stream details to the currently open JSON object: local and
 * endpoint address/port (if a socket is selected), crypto suite (if set),
 * the media's transcoding flag, and "ingress"/"egress" counter objects.
 *
 * Everything up to and including "ingress" is emitted under the stream's
 * in_lock; "egress" is emitted under its out_lock.
 */
static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
	mutex_lock(&ps->in_lock);

	stream_fd *sfd = ps->selected_sfd;
	if (sfd) {
		json_builder_set_member_name(json, "address");
		json_builder_add_string_value(json, sockaddr_print_buf(&sfd->socket.local.address));
		json_builder_set_member_name(json, "port");
		json_builder_add_int_value(json, sfd->socket.local.port);

		json_builder_set_member_name(json, "endpoint_address");
		json_builder_add_string_value(json, sockaddr_print_buf(&ps->endpoint.address));
		json_builder_set_member_name(json, "endpoint_port");
		json_builder_add_int_value(json, ps->endpoint.port);
	}

	if (ps->crypto.params.crypto_suite) {
		json_builder_set_member_name(json, "crypto_suite");
		json_builder_add_string_value(json, ps->crypto.params.crypto_suite->name);
	}

	json_builder_set_member_name(json, "transcoding");
	json_builder_add_boolean_value(json, MEDIA_ISSET(ps->media, TRANSCODING) ? TRUE : FALSE);

	json_builder_set_member_name(json, "ingress");
	json_builder_begin_object(json);
	mqtt_stream_stats_dir(ps->stats_in, json);
	json_builder_end_object(json);

	mutex_unlock(&ps->in_lock);

	mutex_lock(&ps->out_lock);

	json_builder_set_member_name(json, "egress");
	json_builder_begin_object(json);
	mqtt_stream_stats_dir(ps->stats_out, json);
	json_builder_end_object(json);

	mutex_unlock(&ps->out_lock);
}


/*
 * Adds the description of one media section to the currently open JSON
 * object: index, type, interface name, protocol (if set), send/receive
 * status, the ingress and egress SSRC lists, and the stats of the first
 * packet stream (if any).
 *
 * Note that mqtt_stream_stats() adds further "ingress"/"egress" members to
 * the same object as the SSRC lists emitted here.
 */
static void mqtt_media_stats(struct call_media *media, JsonBuilder *json) {
	json_builder_set_member_name(json, "media_index");
	json_builder_add_int_value(json, media->index);

	json_builder_set_member_name(json, "type");
	glib_json_builder_add_str(json, &media->type);

	json_builder_set_member_name(json, "interface");
	glib_json_builder_add_str(json, &media->logical_intf->name);

	if (media->protocol) {
		json_builder_set_member_name(json, "protocol");
		json_builder_add_string_value(json, media->protocol->name);
	}

	json_builder_set_member_name(json, "status");
	if (MEDIA_ISSET(media, SEND)) {
		if (MEDIA_ISSET(media, RECV))
			json_builder_add_string_value(json, "sendrecv");
		else
			json_builder_add_string_value(json, "sendonly");
	}
	else {
		if (MEDIA_ISSET(media, RECV))
			json_builder_add_string_value(json, "recvonly");
		else
			json_builder_add_string_value(json, "inactive");
	}

	/* incoming SSRCs, walked under the hash's lock */
	mutex_lock(&media->ssrc_hash_in.lock);

	json_builder_set_member_name(json, "ingress");
	json_builder_begin_object(json);
	json_builder_set_member_name(json, "SSRC");
	json_builder_begin_array(json);
	for (GList *l = media->ssrc_hash_in.nq.head; l; l = l->next) {
		struct ssrc_entry_call *se = l->data;
		json_builder_begin_object(json);
		mqtt_ssrc_stats(se, json, media);
		json_builder_end_object(json);
	}
	json_builder_end_array(json);
	json_builder_end_object(json);

	mutex_unlock(&media->ssrc_hash_in.lock);

	/* outgoing SSRCs, same layout */
	mutex_lock(&media->ssrc_hash_out.lock);

	json_builder_set_member_name(json, "egress");
	json_builder_begin_object(json);
	json_builder_set_member_name(json, "SSRC");
	json_builder_begin_array(json);
	for (GList *l = media->ssrc_hash_out.nq.head; l; l = l->next) {
		struct ssrc_entry_call *se = l->data;
		json_builder_begin_object(json);
		mqtt_ssrc_stats(se, json, media);
		json_builder_end_object(json);
	}
	json_builder_end_array(json);
	json_builder_end_object(json);

	mutex_unlock(&media->ssrc_hash_out.lock);

	/* only the first packet stream of the media is reported */
	struct packet_stream *ps = media->streams.head ? media->streams.head->data : NULL;
	if (ps)
		mqtt_stream_stats(ps, json);
}


/*
 * Adds the complete description of `call` to the currently open JSON
 * object: the call ID plus a "legs" array with one object per monologue,
 * each containing a "medias" array with one object per non-NULL media.
 *
 * The call's master lock is held for reading and the call is set as the
 * logging context for the duration.
 */
static void mqtt_full_call(call_t *call, JsonBuilder *json) {
	rwlock_lock_r(&call->master_lock);
	log_info_call(call);

	mqtt_call_stats(call, json);

	json_builder_set_member_name(json, "legs");
	json_builder_begin_array(json);

	for (__auto_type l = call->monologues.head; l; l = l->next) {
		struct call_monologue *ml = l->data;

		json_builder_begin_object(json);

		mqtt_monologue_stats(ml, json);

		json_builder_set_member_name(json, "medias");
		json_builder_begin_array(json);

		for (unsigned int k = 0; k < ml->medias->len; k++) {
			struct call_media *media = ml->medias->pdata[k];
			if (!media)
				continue;

			json_builder_begin_object(json);
			mqtt_media_stats(media, json);
			json_builder_end_object(json);
		}

		json_builder_end_array(json);

		json_builder_end_object(json);
	}

	json_builder_end_array(json);

	rwlock_unlock_r(&call->master_lock);
	log_info_pop();
}


/*
 * Replays the gathered global statistics metrics into the currently open
 * JSON object. The outermost opening and closing brackets of the metrics
 * list are skipped because the caller has already opened (and will close)
 * the enclosing object.
 */
static void mqtt_global_stats(JsonBuilder *json) {
	g_autoptr(stats_metric_q) metrics = statistics_gather_metrics(&interface_rate_stats);

	for (__auto_type l = metrics->head; l; l = l->next) {
		stats_metric *m = l->data;
		if (!m->label)
			continue;

		if (m->is_bracket && l == metrics->head) // skip initial {
			continue;
		if (m->is_bracket && l == metrics->tail) // skip final }
			continue;

		if (m->value_short) {
			/* a labelled value: pick the most specific representation */
			json_builder_set_member_name(json, m->label);
			if (m->is_int)
				json_builder_add_int_value(json, m->int_value);
			else if (m->is_double)
				json_builder_add_double_value(json, m->double_value);
			else if (m->value_raw)
				json_builder_add_string_value(json, m->value_raw);
			else
				json_builder_add_string_value(json, m->value_short);
		}
		else if (m->is_bracket) {
			if (m->is_close_bracket) {
				if (m->is_brace)
					json_builder_end_object(json);
				else
					json_builder_end_array(json);
			}
			else {
				if (m->is_brace)
					json_builder_begin_object(json);
				else
					json_builder_begin_array(json);
			}
		}
		else
			/* a bare label: names the object/array that follows */
			json_builder_set_member_name(json, m->label);
	}
}


/*
 * Creates a new JSON builder with the top-level object opened and a
 * "timestamp" member (rtpe_now divided by 1000000) already added.
 * To be finished with __mqtt_timer_outro().
 */
INLINE JsonBuilder *__mqtt_timer_intro(void) {
	JsonBuilder *json = json_builder_new();

	json_builder_begin_object(json);

	json_builder_set_member_name(json, "timestamp");
	json_builder_add_double_value(json, (double) rtpe_now / 1000000.);

	return json;
}


/*
 * Closes the top-level object opened by __mqtt_timer_intro(), serialises the
 * builder via glib_json_print() and publishes the result.
 */
INLINE void __mqtt_timer_outro(JsonBuilder *json) {
	json_builder_end_object(json);
	mqtt_publish(glib_json_print(json));
}


/*
 * Publishes one message describing a single media section of `call`: call
 * ID, the owning monologue's details and the media's stats, all in one flat
 * object. The call's master lock is held for reading while collecting.
 */
void mqtt_timer_run_media(call_t *call, struct call_media *media) {
	JsonBuilder *json = __mqtt_timer_intro();

	rwlock_lock_r(&call->master_lock);
	log_info_call(call);

	mqtt_call_stats(call, json);
	mqtt_monologue_stats(media->monologue, json);
	mqtt_media_stats(media, json);

	rwlock_unlock_r(&call->master_lock);
	log_info_pop();

	__mqtt_timer_outro(json);
}


/* Publishes one message with the complete description of `call`. */
void mqtt_timer_run_call(call_t *call) {
	JsonBuilder *json = __mqtt_timer_intro();

	mqtt_full_call(call, json);

	__mqtt_timer_outro(json);
}


/*
 * Publishes one message containing the global statistics plus a "calls"
 * array with the complete description of every call in the MQTT call
 * iterator list.
 */
void mqtt_timer_run_global(void) {
	JsonBuilder *json = __mqtt_timer_intro();

	mqtt_global_stats(json);

	json_builder_set_member_name(json, "calls");
	json_builder_begin_array(json);

	ITERATE_CALL_LIST_START(CALL_ITERATOR_MQTT, call);
		json_builder_begin_object(json);
		mqtt_full_call(call, json);
		json_builder_end_object(json);
	ITERATE_CALL_LIST_NEXT_END(call);

	json_builder_end_array(json);

	__mqtt_timer_outro(json);
}


/* Publishes one message containing only the global statistics. */
void mqtt_timer_run_summary(void) {
	JsonBuilder *json = __mqtt_timer_intro();

	mqtt_global_stats(json);

	__mqtt_timer_outro(json);
}

#endif