diff --git a/daemon/mqtt.c b/daemon/mqtt.c index 695f67454..97a11498a 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -161,6 +161,9 @@ static void mqtt_monologue_stats(struct call_monologue *ml, JsonBuilder *json) { static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct call_media *media) { + if (!ssrc || !media) + return; + struct ssrc_entry_call *sc = ssrc->parent; json_builder_set_member_name(json, "SSRC"); @@ -173,7 +176,6 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal mutex_unlock(&ssrc->tracker.lock); unsigned int clockrate = 0; - //struct codec_handler *h = codec_handler_get(ps->media, prim_pt); struct rtp_payload_type *pt = g_hash_table_lookup(media->codecs.codecs, GUINT_TO_POINTER(prim_pt)); if (pt) { json_builder_set_member_name(json, "codec"); @@ -285,6 +287,16 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal } +static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *json) { + json_builder_set_member_name(json, "bytes"); + json_builder_add_int_value(json, atomic64_get(&s->bytes)); + json_builder_set_member_name(json, "packets"); + json_builder_add_int_value(json, atomic64_get(&s->packets)); + json_builder_set_member_name(json, "errors"); + json_builder_add_int_value(json, atomic64_get(&s->errors)); +} + + static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { mutex_lock(&ps->in_lock); @@ -297,23 +309,43 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) { json_builder_add_int_value(json, sfd->socket.local.port); } - if (ps->ssrc_in[0]) { - json_builder_set_member_name(json, "ingress"); + json_builder_set_member_name(json, "ingress"); + json_builder_begin_object(json); + mqtt_stream_stats_dir(&ps->stats_in, json); + + json_builder_set_member_name(json, "SSRC"); + json_builder_begin_array(json); + for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) { + if (!ps->ssrc_in[i]) + break; json_builder_begin_object(json); - mqtt_ssrc_stats(ps->ssrc_in[0], json, ps->media); + mqtt_ssrc_stats(ps->ssrc_in[i], json, ps->media); json_builder_end_object(json); } + json_builder_end_array(json); + + json_builder_end_object(json); mutex_unlock(&ps->in_lock); mutex_lock(&ps->out_lock); - if (ps->ssrc_out[0]) { - json_builder_set_member_name(json, "egress"); + json_builder_set_member_name(json, "egress"); + json_builder_begin_object(json); + mqtt_stream_stats_dir(&ps->stats_out, json); + + json_builder_set_member_name(json, "SSRC"); + json_builder_begin_array(json); + for (int i = 0; i < RTPE_NUM_SSRC_TRACKING; i++) { + if (!ps->ssrc_out[i]) + break; json_builder_begin_object(json); - mqtt_ssrc_stats(ps->ssrc_out[0], json, ps->media); + mqtt_ssrc_stats(ps->ssrc_out[i], json, ps->media); json_builder_end_object(json); } + json_builder_end_array(json); + + json_builder_end_object(json); mutex_unlock(&ps->out_lock); }