MT#55283 move SSRC stats into shm

Change-Id: I87c3a88ba8acca79cb4594c55706620ac496581d
pull/1826/head
Richard Fuchs 1 year ago
parent a1a1e42b02
commit e8982c3cf9

@ -2364,10 +2364,10 @@ static void ng_stats_stream_ssrc(bencode_item_t *dict, struct ssrc_ctx *const ss
bencode_item_t *ssrc = bencode_list_add_dictionary(list);
bencode_dictionary_add_integer(ssrc, "SSRC", ssrcs[i]->parent->h.ssrc);
bencode_dictionary_add_integer(ssrc, "bytes", atomic64_get_na(&c->octets));
bencode_dictionary_add_integer(ssrc, "packets", atomic64_get_na(&c->packets));
bencode_dictionary_add_integer(ssrc, "last RTP timestamp", atomic64_get_na(&c->last_ts));
bencode_dictionary_add_integer(ssrc, "last RTP seq", atomic64_get_na(&c->last_seq));
bencode_dictionary_add_integer(ssrc, "bytes", atomic64_get_na(&c->stats->bytes));
bencode_dictionary_add_integer(ssrc, "packets", atomic64_get_na(&c->stats->packets));
bencode_dictionary_add_integer(ssrc, "last RTP timestamp", atomic_get_na(&c->stats->timestamp));
bencode_dictionary_add_integer(ssrc, "last RTP seq", atomic_get_na(&c->stats->ext_seq));
}
}

@ -1845,8 +1845,8 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
packet->ts = packet_ts;
packet->marker = (mp->rtp->m_pt & 0x80) ? 1 : 0;
atomic64_inc(&ssrc_in->packets);
atomic64_add(&ssrc_in->octets, mp->payload.len);
atomic64_inc_na(&ssrc_in->stats->packets);
atomic64_add_na(&ssrc_in->stats->bytes, mp->payload.len);
atomic64_inc_na(&mp->sfd->local_intf->stats->in.packets);
atomic64_add_na(&mp->sfd->local_intf->stats->in.bytes, mp->payload.len);
@ -1969,7 +1969,7 @@ static int __handler_func_sequencer(struct media_packet *mp, struct transcode_pa
}
ssrc_in_p->packets_lost = seq->lost_count;
atomic64_set(&ssrc_in->last_seq, seq->ext_seq);
atomic_set_na(&ssrc_in->stats->ext_seq, seq->ext_seq);
ilogs(transcoding, LOG_DEBUG, "Processing RTP packet: seq %u, TS %lu",
packet->p.seq, packet->ts);
@ -2287,7 +2287,7 @@ static tc_code packet_dtmf(struct codec_ssrc_handler *ch, struct codec_ssrc_hand
uint64_t ts = packet->ts + ntohs(dtmf->duration) - duration;
// remember this as last "encoder" TS
atomic64_set(&mp->ssrc_in->last_ts, ts);
atomic_set_na(&mp->ssrc_in->stats->timestamp, ts);
// provide an uninitialised buffer as potential output storage for DTMF
char buf[sizeof(struct telephone_event_payload)];
@ -2702,7 +2702,7 @@ uint64_t codec_encoder_pts(struct codec_ssrc_handler *ch, struct ssrc_ctx *ssrc_
if (!ch || !ch->encoder) {
if (!ssrc_in)
return 0;
uint64_t cur = atomic64_get(&ssrc_in->last_ts);
uint64_t cur = atomic_get_na(&ssrc_in->stats->timestamp);
// return the TS of the next expected packet
if (ch)
cur += (uint64_t) ch->ptime * ch->handler->source_pt.clock_rate / 1000;

@ -318,12 +318,12 @@ static void __send_timer_send_common(struct send_timer *st, struct codec_packet
goto out;
if (cp->ssrc_out && cp->rtp) {
atomic64_inc(&cp->ssrc_out->packets);
atomic64_add(&cp->ssrc_out->octets, cp->s.len);
atomic64_inc_na(&cp->ssrc_out->stats->packets);
atomic64_add_na(&cp->ssrc_out->stats->bytes, cp->s.len);
if (cp->ts)
atomic64_set(&cp->ssrc_out->last_ts, cp->ts);
atomic_set_na(&cp->ssrc_out->stats->timestamp, cp->ts);
else
atomic64_set(&cp->ssrc_out->last_ts, ntohl(cp->rtp->timestamp));
atomic_set_na(&cp->ssrc_out->stats->timestamp, ntohl(cp->rtp->timestamp));
payload_tracker_add(&cp->ssrc_out->tracker, cp->rtp->m_pt & 0x7f);
}

@ -1779,17 +1779,17 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
if (!atomic64_get_na(&stats_info->ssrc_stats[u].packets)) // no change
continue;
atomic64_add(&ssrc_ctx->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets));
atomic64_add(&ssrc_ctx->octets, atomic64_get_na(&stats_info->ssrc_stats[u].bytes));
atomic64_add_na(&ssrc_ctx->stats->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets));
atomic64_add_na(&ssrc_ctx->stats->bytes, atomic64_get_na(&stats_info->ssrc_stats[u].bytes));
parent->packets_lost += stats_info->ssrc_stats[u].total_lost; // XXX should be atomic?
atomic64_set(&ssrc_ctx->last_seq, stats_info->ssrc_stats[u].ext_seq);
atomic_set_na(&ssrc_ctx->stats->ext_seq, stats_info->ssrc_stats[u].ext_seq);
parent->jitter = stats_info->ssrc_stats[u].jitter;
// update TS only if ahead or very different
uint64_t ts = atomic64_get(&ssrc_ctx->last_ts);
uint32_t ts = atomic_get_na(&ssrc_ctx->stats->timestamp);
uint64_t diff = ts - stats_info->ssrc_stats[u].timestamp;
if (diff > 1000000)
atomic64_set(&ssrc_ctx->last_ts, stats_info->ssrc_stats[u].timestamp);
atomic_set_na(&ssrc_ctx->stats->timestamp, stats_info->ssrc_stats[u].timestamp);
RTPE_STATS_ADD(packets_lost, stats_info->ssrc_stats[u].total_lost);
atomic64_add_na(&ps->selected_sfd->local_intf->stats->s.packets_lost,
@ -1811,8 +1811,8 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
if (ssrc_ctx) {
parent = ssrc_ctx->parent;
atomic64_add(&ssrc_ctx->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets));
atomic64_add(&ssrc_ctx->octets, atomic64_get_na(&stats_info->ssrc_stats[u].bytes));
atomic64_add_na(&ssrc_ctx->stats->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets));
atomic64_add_na(&ssrc_ctx->stats->bytes, atomic64_get_na(&stats_info->ssrc_stats[u].bytes));
}
mutex_unlock(&sink->out_lock);
@ -2841,10 +2841,10 @@ static int stream_packet(struct packet_handler_ctx *phc) {
// SSRC receive stats
if (phc->mp.ssrc_in && phc->mp.rtp) {
atomic64_inc(&phc->mp.ssrc_in->packets);
atomic64_add(&phc->mp.ssrc_in->octets, phc->s.len);
atomic64_inc_na(&phc->mp.ssrc_in->stats->packets);
atomic64_add_na(&phc->mp.ssrc_in->stats->bytes, phc->s.len);
// no real sequencing, so this is rudimentary
uint64_t old_seq = atomic64_get(&phc->mp.ssrc_in->last_seq);
uint64_t old_seq = atomic_get_na(&phc->mp.ssrc_in->stats->ext_seq);
uint64_t new_seq = ntohs(phc->mp.rtp->seq_num) | (old_seq & 0xffff0000UL);
// XXX combine this with similar code elsewhere
long seq_diff = new_seq - old_seq;
@ -2853,8 +2853,8 @@ static int stream_packet(struct packet_handler_ctx *phc) {
seq_diff += 0x10000;
}
if (seq_diff > 0 || seq_diff < -10) {
atomic64_set(&phc->mp.ssrc_in->last_seq, new_seq);
atomic64_set(&phc->mp.ssrc_in->last_ts, ntohl(phc->mp.rtp->timestamp));
atomic_set_na(&phc->mp.ssrc_in->stats->ext_seq, new_seq);
atomic_set_na(&phc->mp.ssrc_in->stats->timestamp, ntohl(phc->mp.rtp->timestamp));
}
}
@ -3563,7 +3563,7 @@ enum thread_looper_action kernel_stats_updater(void) {
if (!ctx)
continue;
// TODO: add in SSRC stats similar to __stream_update_stats
atomic64_set(&ctx->last_seq, ke->target.decrypt.last_rtp_index[u]);
atomic_set_na(&ctx->stats->ext_seq, ke->target.decrypt.last_rtp_index[u]);
if (rtpe_now.tv_sec - atomic64_get_na(&ps->stats_in->last_packet) < 2)
payload_tracker_add(&ctx->tracker,

@ -241,8 +241,8 @@ static void mqtt_ssrc_stats(struct ssrc_ctx *ssrc, JsonBuilder *json, struct cal
// copy out values
int64_t packets, octets, packets_lost, duplicates;
packets = atomic64_get(&ssrc->packets);
octets = atomic64_get(&ssrc->octets);
packets = atomic64_get_na(&ssrc->stats->packets);
octets = atomic64_get_na(&ssrc->stats->bytes);
packets_lost = sc->packets_lost;
duplicates = sc->duplicates;

@ -1322,7 +1322,7 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr)
// substitute our own values
unsigned int packets = atomic64_get(&input_ctx->packets);
unsigned int packets = atomic64_get(&input_ctx->stats->packets);
// we might not be keeping track of stats for this SSRC (handler_func_passthrough_ssrc).
// just leave the values in place.
@ -1352,7 +1352,7 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr)
else
rr->fraction_lost = tot_lost * 256 / (packets + lost);
rr->high_seq_received = htonl(atomic64_get(&input_ctx->last_seq));
rr->high_seq_received = htonl(atomic_get_na(&input_ctx->stats->ext_seq));
// XXX jitter, last SR
out:
@ -1367,7 +1367,7 @@ static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_pack
return;
if (!ctx->mp->ssrc_out)
return;
unsigned int packets = atomic64_get(&ctx->mp->ssrc_out->packets);
unsigned int packets = atomic64_get(&ctx->mp->ssrc_out->stats->packets);
// we might not be keeping track of stats for this SSRC (handler_func_passthrough_ssrc).
// just leave the values in place.
@ -1375,9 +1375,9 @@ static void transcode_sr(struct rtcp_process_ctx *ctx, struct sender_report_pack
return;
// substitute our own values
sr->octet_count = htonl(atomic64_get(&ctx->mp->ssrc_out->octets));
sr->octet_count = htonl(atomic64_get(&ctx->mp->ssrc_out->stats->bytes));
sr->packet_count = htonl(packets);
sr->timestamp = htonl(atomic64_get(&ctx->mp->ssrc_out->last_ts));
sr->timestamp = htonl(atomic_get_na(&ctx->mp->ssrc_out->stats->timestamp));
// XXX NTP timestamp
}
@ -1465,7 +1465,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
mutex_unlock(&se->h.lock);
uint64_t lost = se->packets_lost;
uint64_t tot = atomic64_get(&s->packets);
uint64_t tot = atomic64_get(&s->stats->packets);
*rr = (struct report_block) {
.ssrc = htonl(s->parent->h.ssrc),
@ -1473,7 +1473,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
.number_lost[0] = (lost >> 16) & 0xff,
.number_lost[1] = (lost >> 8) & 0xff,
.number_lost[2] = lost & 0xff,
.high_seq_received = htonl(atomic64_get(&s->last_seq)),
.high_seq_received = htonl(atomic_get_na(&s->stats->ext_seq)),
.lsr = htonl(ntp_middle_bits),
.dlsr = htonl(tv_diff * 65536 / 1000000),
.jitter = htonl(jitter >> 4),
@ -1486,7 +1486,7 @@ static GString *rtcp_sender_report(struct ssrc_sender_report *ssr,
.ssrc = s->parent->h.ssrc,
.fraction_lost = lost * 256 / (tot + lost),
.packets_lost = lost,
.high_seq_received = atomic64_get(&s->last_seq),
.high_seq_received = atomic_get_na(&s->stats->ext_seq),
.lsr = ntp_middle_bits,
.dlsr = tv_diff * 65536 / 1000000,
.jitter = jitter >> 4,
@ -1544,7 +1544,7 @@ void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash, struct call_mono
struct ssrc_ctx *i = &e->input_ctx;
if (i->ref != ml)
continue;
if (!atomic64_get(&i->packets))
if (!atomic64_get_na(&i->stats->packets))
continue;
ssrc_ctx_hold(i);
@ -1590,9 +1590,9 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
GString *sr = rtcp_sender_report(&ssr, ssrc_out->parent->h.ssrc,
ssrc_out->ssrc_map_out ? : ssrc_out->parent->h.ssrc,
atomic64_get(&ssrc_out->last_ts),
atomic64_get(&ssrc_out->packets),
atomic64_get(&ssrc_out->octets),
atomic_get_na(&ssrc_out->stats->timestamp),
atomic64_get_na(&ssrc_out->stats->packets),
atomic64_get(&ssrc_out->stats->bytes),
&rrs, &srrs);
// handle crypto

@ -6,6 +6,7 @@
#include "call.h"
#include "rtplib.h"
#include "codeclib.h"
#include "bufferpool.h"
static void __free_ssrc_entry_call(void *e);
@ -17,6 +18,7 @@ static void init_ssrc_ctx(struct ssrc_ctx *c, struct ssrc_entry_call *parent) {
c->ssrc_map_out = ssl_random();
c->seq_out = ssl_random();
atomic64_set_na(&c->last_sample, ssrc_timeval_to_ts(&rtpe_now));
c->stats = bufferpool_alloc0(shm_bufferpool, sizeof(*c->stats));
}
static void init_ssrc_entry(struct ssrc_entry *ent, uint32_t ssrc) {
ent->ssrc = ssrc;
@ -56,6 +58,8 @@ static void __free_ssrc_entry_call(void *ep) {
g_queue_clear_full(&e->stats_blocks, (GDestroyNotify) free_stats_block);
if (e->sequencers)
g_hash_table_destroy(e->sequencers);
bufferpool_unref(e->input_ctx.stats);
bufferpool_unref(e->output_ctx.stats);
}
static void ssrc_entry_put(void *ep) {
struct ssrc_entry_call *e = ep;

@ -51,10 +51,7 @@ struct ssrc_ctx {
uint16_t seq_out;
// RTCP stats
atomic64 packets,
octets,
last_seq, // XXX dup with srtp_index?
last_ts;
struct ssrc_stats *stats;
// for per-second stats:
atomic64 last_sample,

@ -17,6 +17,7 @@ struct global_sampled_min_max rtpe_sampled_graphite_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled;
__thread struct bufferpool *media_bufferpool;
void append_thread_lpr_to_glob_lpr(void) {}
struct bufferpool *shm_bufferpool;
int get_local_log_level(unsigned int u) {
return -1;

@ -17,6 +17,7 @@ struct global_sampled_min_max rtpe_sampled_graphite_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled;
__thread struct bufferpool *media_bufferpool;
void append_thread_lpr_to_glob_lpr(void) {}
struct bufferpool *shm_bufferpool;
static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) {
char buf[1024] = "";

Loading…
Cancel
Save