From e8982c3cf9f183ae9134db3b8c3cdd5e98b75748 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 9 Apr 2024 15:36:14 -0400 Subject: [PATCH] MT#55283 move SSRC stats into shm Change-Id: I87c3a88ba8acca79cb4594c55706620ac496581d --- daemon/call_interfaces.c | 8 ++++---- daemon/codec.c | 10 +++++----- daemon/media_player.c | 8 ++++---- daemon/media_socket.c | 26 +++++++++++++------------- daemon/mqtt.c | 4 ++-- daemon/rtcp.c | 24 ++++++++++++------------ daemon/ssrc.c | 4 ++++ include/ssrc.h | 5 +---- t/test-mix-buffer.c | 1 + t/test-payload-tracker.c | 1 + 10 files changed, 47 insertions(+), 44 deletions(-) diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 1e611f914..da9c0de77 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -2364,10 +2364,10 @@ static void ng_stats_stream_ssrc(bencode_item_t *dict, struct ssrc_ctx *const ss bencode_item_t *ssrc = bencode_list_add_dictionary(list); bencode_dictionary_add_integer(ssrc, "SSRC", ssrcs[i]->parent->h.ssrc); - bencode_dictionary_add_integer(ssrc, "bytes", atomic64_get_na(&c->octets)); - bencode_dictionary_add_integer(ssrc, "packets", atomic64_get_na(&c->packets)); - bencode_dictionary_add_integer(ssrc, "last RTP timestamp", atomic64_get_na(&c->last_ts)); - bencode_dictionary_add_integer(ssrc, "last RTP seq", atomic64_get_na(&c->last_seq)); + bencode_dictionary_add_integer(ssrc, "bytes", atomic64_get_na(&c->stats->bytes)); + bencode_dictionary_add_integer(ssrc, "packets", atomic64_get_na(&c->stats->packets)); + bencode_dictionary_add_integer(ssrc, "last RTP timestamp", atomic_get_na(&c->stats->timestamp)); + bencode_dictionary_add_integer(ssrc, "last RTP seq", atomic_get_na(&c->stats->ext_seq)); } } diff --git a/daemon/codec.c b/daemon/codec.c index 33964f4dc..09ee9fea0 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -1845,8 +1845,8 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa packet->ts = packet_ts; packet->marker = (mp->rtp->m_pt & 0x80) ? 1 : 0; - atomic64_inc(&ssrc_in->packets); - atomic64_add(&ssrc_in->octets, mp->payload.len); + atomic64_inc_na(&ssrc_in->stats->packets); + atomic64_add_na(&ssrc_in->stats->bytes, mp->payload.len); atomic64_inc_na(&mp->sfd->local_intf->stats->in.packets); atomic64_add_na(&mp->sfd->local_intf->stats->in.bytes, mp->payload.len); @@ -1969,7 +1969,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa } ssrc_in_p->packets_lost = seq->lost_count; - atomic64_set(&ssrc_in->last_seq, seq->ext_seq); + atomic_set_na(&ssrc_in->stats->ext_seq, seq->ext_seq); ilogs(transcoding, LOG_DEBUG, "Processing RTP packet: seq %u, TS %lu", packet->p.seq, packet->ts); @@ -2287,7 +2287,7 @@ static tc_code packet_dtmf(struct codec_ssrc_handler *ch, struct codec_ssrc_hand uint64_t ts = packet->ts + ntohs(dtmf->duration) - duration; // remember this as last "encoder" TS - atomic64_set(&mp->ssrc_in->last_ts, ts); + atomic_set_na(&mp->ssrc_in->stats->timestamp, ts); // provide an uninitialised buffer as potential output storage for DTMF char buf[sizeof(struct telephone_event_payload)]; @@ -2702,7 +2702,7 @@ uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_ctx *ssrc_ if (!ch || !ch->encoder) { if (!ssrc_in) return 0; - uint64_t cur = atomic64_get(&ssrc_in->last_ts); + uint64_t cur = atomic_get_na(&ssrc_in->stats->timestamp); // return the TS of the next expected packet if (ch) cur += (uint64_t) ch->ptime * ch->handler->source_pt.clock_rate / 1000; diff --git a/daemon/media_player.c b/daemon/media_player.c index 4444f0745..26f8d6ee9 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -318,12 +318,12 @@ static void __send_timer_send_common(struct send_timer *st, struct codec_packet goto out; if (cp->ssrc_out && cp->rtp) { - atomic64_inc(&cp->ssrc_out->packets); - atomic64_add(&cp->ssrc_out->octets, cp->s.len); + atomic64_inc_na(&cp->ssrc_out->stats->packets); + atomic64_add_na(&cp->ssrc_out->stats->bytes, cp->s.len); if (cp->ts) - atomic64_set(&cp->ssrc_out->last_ts, cp->ts); + atomic_set_na(&cp->ssrc_out->stats->timestamp, cp->ts); else - atomic64_set(&cp->ssrc_out->last_ts, ntohl(cp->rtp->timestamp)); + atomic_set_na(&cp->ssrc_out->stats->timestamp, ntohl(cp->rtp->timestamp)); payload_tracker_add(&cp->ssrc_out->tracker, cp->rtp->m_pt & 0x7f); } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index a9a2a3463..4b3d9c4fd 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1779,17 +1779,17 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng if (!atomic64_get_na(&stats_info->ssrc_stats[u].packets)) // no change continue; - atomic64_add(&ssrc_ctx->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets)); - atomic64_add(&ssrc_ctx->octets, atomic64_get_na(&stats_info->ssrc_stats[u].bytes)); + atomic64_add_na(&ssrc_ctx->stats->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets)); + atomic64_add_na(&ssrc_ctx->stats->bytes, atomic64_get_na(&stats_info->ssrc_stats[u].bytes)); parent->packets_lost += stats_info->ssrc_stats[u].total_lost; // XXX should be atomic? - atomic64_set(&ssrc_ctx->last_seq, stats_info->ssrc_stats[u].ext_seq); + atomic_set_na(&ssrc_ctx->stats->ext_seq, stats_info->ssrc_stats[u].ext_seq); parent->jitter = stats_info->ssrc_stats[u].jitter; // update TS only if ahead or very different - uint64_t ts = atomic64_get(&ssrc_ctx->last_ts); + uint32_t ts = atomic_get_na(&ssrc_ctx->stats->timestamp); uint64_t diff = ts - stats_info->ssrc_stats[u].timestamp; if (diff > 1000000) - atomic64_set(&ssrc_ctx->last_ts, stats_info->ssrc_stats[u].timestamp); + atomic_set_na(&ssrc_ctx->stats->timestamp, stats_info->ssrc_stats[u].timestamp); RTPE_STATS_ADD(packets_lost, stats_info->ssrc_stats[u].total_lost); atomic64_add_na(&ps->selected_sfd->local_intf->stats->s.packets_lost, @@ -1811,8 +1811,8 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng if (ssrc_ctx) { parent = ssrc_ctx->parent; - atomic64_add(&ssrc_ctx->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets)); - atomic64_add(&ssrc_ctx->octets, atomic64_get_na(&stats_info->ssrc_stats[u].bytes)); + atomic64_add_na(&ssrc_ctx->stats->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets)); + atomic64_add_na(&ssrc_ctx->stats->bytes, atomic64_get_na(&stats_info->ssrc_stats[u].bytes)); } mutex_unlock(&sink->out_lock); @@ -2841,10 +2841,10 @@ static int stream_packet(struct packet_handler_ctx *phc) { // SSRC receive stats if (phc->mp.ssrc_in && phc->mp.rtp) { - atomic64_inc(&phc->mp.ssrc_in->packets); - atomic64_add(&phc->mp.ssrc_in->octets, phc->s.len); + atomic64_inc_na(&phc->mp.ssrc_in->stats->packets); + atomic64_add_na(&phc->mp.ssrc_in->stats->bytes, phc->s.len); // no real sequencing, so this is rudimentary - uint64_t old_seq = atomic64_get(&phc->mp.ssrc_in->last_seq); + uint64_t old_seq = atomic_get_na(&phc->mp.ssrc_in->stats->ext_seq); uint64_t new_seq = ntohs(phc->mp.rtp->seq_num) | (old_seq & 0xffff0000UL); // XXX combine this with similar code elsewhere long seq_diff = new_seq - old_seq; @@ -2853,8 +2853,8 @@ static int stream_packet(struct packet_handler_ctx *phc) { seq_diff += 0x10000; } if (seq_diff > 0 || seq_diff < -10) { - atomic64_set(&phc->mp.ssrc_in->last_seq, new_seq); - atomic64_set(&phc->mp.ssrc_in->last_ts, ntohl(phc->mp.rtp->timestamp)); + atomic_set_na(&phc->mp.ssrc_in->stats->ext_seq, new_seq); + atomic_set_na(&phc->mp.ssrc_in->stats->timestamp, ntohl(phc->mp.rtp->timestamp)); } } @@ -3563,7 +3563,7 @@ enum thread_looper_action kernel_stats_updater(void) { if (!ctx) continue; // TODO: add in SSRC stats similar to __stream_update_stats - atomic64_set(&ctx->last_seq, ke->target.decrypt.last_rtp_index[u]); + atomic_set_na(&ctx->stats->ext_seq, ke->target.decrypt.last_rtp_index[u]); if (rtpe_now.tv_sec - atomic64_get_na(&ps->stats_in->last_packet) < 2) payload_tracker_add(&ctx->tracker, diff --git a/daemon/mqtt.c b/daemon/mqtt.c index c1e4ca00f..7505b2587 100644 --- a/daemon/mqtt.c +++ b/daemon/mqtt.c @@ -241,8 +241,8 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal // copy out values int64_t packets, octets, packets_lost, duplicates; - packets = atomic64_get(&ssrc->packets); - octets = atomic64_get(&ssrc->octets); + packets = atomic64_get_na(&ssrc->stats->packets); + octets = atomic64_get_na(&ssrc->stats->bytes); packets_lost = sc->packets_lost; duplicates = sc->duplicates; diff --git a/daemon/rtcp.c b/daemon/rtcp.c index 1331bb19a..20a06daf2 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -1322,7 +1322,7 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) // substitute our own values - unsigned int packets = atomic64_get(&input_ctx->packets); + unsigned int packets = atomic64_get(&input_ctx->stats->packets); // we might not be keeping track of stats for this SSRC (handler_func_passthrough_ssrc). // just leave the values in place. @@ -1352,7 +1352,7 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) else rr->fraction_lost = tot_lost * 256 / (packets + lost); - rr->high_seq_received = htonl(atomic64_get(&input_ctx->last_seq)); + rr->high_seq_received = htonl(atomic_get_na(&input_ctx->stats->ext_seq)); // XXX jitter, last SR out: @@ -1367,7 +1367,7 @@ static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_pack return; if (!ctx->mp->ssrc_out) return; - unsigned int packets = atomic64_get(&ctx->mp->ssrc_out->packets); + unsigned int packets = atomic64_get(&ctx->mp->ssrc_out->stats->packets); // we might not be keeping track of stats for this SSRC (handler_func_passthrough_ssrc). // just leave the values in place. @@ -1375,9 +1375,9 @@ static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_pack return; // substitute our own values - sr->octet_count = htonl(atomic64_get(&ctx->mp->ssrc_out->octets)); + sr->octet_count = htonl(atomic64_get(&ctx->mp->ssrc_out->stats->bytes)); sr->packet_count = htonl(packets); - sr->timestamp = htonl(atomic64_get(&ctx->mp->ssrc_out->last_ts)); + sr->timestamp = htonl(atomic_get_na(&ctx->mp->ssrc_out->stats->timestamp)); // XXX NTP timestamp } @@ -1465,7 +1465,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, mutex_unlock(&se->h.lock); uint64_t lost = se->packets_lost; - uint64_t tot = atomic64_get(&s->packets); + uint64_t tot = atomic64_get(&s->stats->packets); *rr = (struct report_block) { .ssrc = htonl(s->parent->h.ssrc), @@ -1473,7 +1473,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, .number_lost[0] = (lost >> 16) & 0xff, .number_lost[1] = (lost >> 8) & 0xff, .number_lost[2] = lost & 0xff, - .high_seq_received = htonl(atomic64_get(&s->last_seq)), + .high_seq_received = htonl(atomic_get_na(&s->stats->ext_seq)), .lsr = htonl(ntp_middle_bits), .dlsr = htonl(tv_diff * 65536 / 1000000), .jitter = htonl(jitter >> 4), @@ -1486,7 +1486,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr, .ssrc = s->parent->h.ssrc, .fraction_lost = lost * 256 / (tot + lost), .packets_lost = lost, - .high_seq_received = atomic64_get(&s->last_seq), + .high_seq_received = atomic_get_na(&s->stats->ext_seq), .lsr = ntp_middle_bits, .dlsr = tv_diff * 65536 / 1000000, .jitter = jitter >> 4, @@ -1544,7 +1544,7 @@ void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash, struct call_mono struct ssrc_ctx *i = &e->input_ctx; if (i->ref != ml) continue; - if (!atomic64_get(&i->packets)) + if (!atomic64_get_na(&i->stats->packets)) continue; ssrc_ctx_hold(i); @@ -1590,9 +1590,9 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) { GString *sr = rtcp_sender_report(&ssr, ssrc_out->parent->h.ssrc, ssrc_out->ssrc_map_out ? : ssrc_out->parent->h.ssrc, - atomic64_get(&ssrc_out->last_ts), - atomic64_get(&ssrc_out->packets), - atomic64_get(&ssrc_out->octets), + atomic_get_na(&ssrc_out->stats->timestamp), + atomic64_get_na(&ssrc_out->stats->packets), + atomic64_get(&ssrc_out->stats->bytes), &rrs, &srrs); // handle crypto diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 42c8fd0ce..1089adda7 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -6,6 +6,7 @@ #include "call.h" #include "rtplib.h" #include "codeclib.h" +#include "bufferpool.h" static void __free_ssrc_entry_call(void *e); @@ -17,6 +18,7 @@ static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) { c->ssrc_map_out = ssl_random(); c->seq_out = ssl_random(); atomic64_set_na(&c->last_sample, ssrc_timeval_to_ts(&rtpe_now)); + c->stats = bufferpool_alloc0(shm_bufferpool, sizeof(*c->stats)); } static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) { ent->ssrc = ssrc; @@ -56,6 +58,8 @@ static void __free_ssrc_entry_call(void *ep) { g_queue_clear_full(&e->stats_blocks, (GDestroyNotify) free_stats_block); if (e->sequencers) g_hash_table_destroy(e->sequencers); + bufferpool_unref(e->input_ctx.stats); + bufferpool_unref(e->output_ctx.stats); } static void ssrc_entry_put(void *ep) { struct ssrc_entry_call *e = ep; diff --git a/include/ssrc.h b/include/ssrc.h index 3f8ba08c8..1778ca527 100644 --- a/include/ssrc.h +++ b/include/ssrc.h @@ -51,10 +51,7 @@ struct ssrc_ctx { uint16_t seq_out; // RTCP stats - atomic64 packets, - octets, - last_seq, // XXX dup with srtp_index? - last_ts; + struct ssrc_stats *stats; // for per-second stats: atomic64 last_sample, diff --git a/t/test-mix-buffer.c b/t/test-mix-buffer.c index bad796ffb..1bd9cb129 100644 --- a/t/test-mix-buffer.c +++ b/t/test-mix-buffer.c @@ -17,6 +17,7 @@ struct global_sampled_min_max rtpe_sampled_graphite_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; __thread struct bufferpool *media_bufferpool; void append_thread_lpr_to_glob_lpr(void) {} +struct bufferpool *shm_bufferpool; int get_local_log_level(unsigned int u) { return -1; diff --git a/t/test-payload-tracker.c b/t/test-payload-tracker.c index 688c42b7a..8271de769 100644 --- a/t/test-payload-tracker.c +++ b/t/test-payload-tracker.c @@ -17,6 +17,7 @@ struct global_sampled_min_max rtpe_sampled_graphite_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; __thread struct bufferpool *media_bufferpool; void append_thread_lpr_to_glob_lpr(void) {} +struct bufferpool *shm_bufferpool; static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) { char buf[1024] = "";