MT#55283 track SSRC stats in kernel via shm

This eliminates the need to periodically update the stats via syscalls.

Change-Id: I12ce84e541c7a95b323fb52297efd1983fde9dd3
pull/1826/head
Richard Fuchs 1 year ago
parent 9ba3297d44
commit af96fc777f

@ -1610,8 +1610,11 @@ output:
if (reti->track_ssrc) {
for (unsigned int u = 0; u < G_N_ELEMENTS(stream->ssrc_in); u++) {
if (sink->ssrc_out[u])
if (sink->ssrc_out[u]) {
// XXX order can be different from ingress?
redi->output.seq_offset[u] = sink->ssrc_out[u]->parent->seq_diff;
redi->output.ssrc_stats[u] = sink->ssrc_out[u]->stats;
}
if (redi->output.ssrc_subst && stream->ssrc_in[u])
redi->output.ssrc_out[u] = htonl(stream->ssrc_in[u]->ssrc_map_out);
@ -1776,50 +1779,9 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
struct ssrc_ctx *ssrc_ctx = __hunt_ssrc_ctx(ssrc, ps->ssrc_in, u);
if (!ssrc_ctx)
continue;
struct ssrc_entry_call *parent = ssrc_ctx->parent;
if (!atomic64_get_na(&stats_info->ssrc_stats[u].packets)) // no change
continue;
atomic64_add_na(&ssrc_ctx->stats->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets));
atomic64_add_na(&ssrc_ctx->stats->bytes, atomic64_get_na(&stats_info->ssrc_stats[u].bytes));
parent->packets_lost += stats_info->ssrc_stats[u].total_lost; // XXX should be atomic?
atomic_set_na(&ssrc_ctx->stats->ext_seq, stats_info->ssrc_stats[u].ext_seq);
parent->jitter = stats_info->ssrc_stats[u].jitter;
// update TS only if ahead or very different
uint32_t ts = atomic_get_na(&ssrc_ctx->stats->timestamp);
uint64_t diff = ts - stats_info->ssrc_stats[u].timestamp;
if (diff > 1000000)
atomic_set_na(&ssrc_ctx->stats->timestamp, stats_info->ssrc_stats[u].timestamp);
RTPE_STATS_ADD(packets_lost, stats_info->ssrc_stats[u].total_lost);
atomic64_add_na(&ps->selected_sfd->local_intf->stats->s.packets_lost,
stats_info->ssrc_stats[u].total_lost);
uint32_t ssrc_map_out = ssrc_ctx->ssrc_map_out;
// update opposite outgoing SSRC
for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
if (mutex_trylock(&sink->out_lock))
continue; // will have to skip this
ssrc_ctx = __hunt_ssrc_ctx(ssrc, sink->ssrc_out, u);
if (!ssrc_ctx)
ssrc_ctx = __hunt_ssrc_ctx(ssrc_map_out, sink->ssrc_out, u);
if (ssrc_ctx) {
parent = ssrc_ctx->parent;
atomic64_add_na(&ssrc_ctx->stats->packets, atomic64_get_na(&stats_info->ssrc_stats[u].packets));
atomic64_add_na(&ssrc_ctx->stats->bytes, atomic64_get_na(&stats_info->ssrc_stats[u].bytes));
}
mutex_unlock(&sink->out_lock);
}
for (__auto_type l = ps->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;

@ -321,7 +321,6 @@ struct rtpengine_target {
unsigned int last_pt; // index into pt_input[] and pt_output[]
spinlock_t ssrc_stats_lock;
struct ssrc_stats ssrc_stats[RTPE_NUM_SSRC_TRACKING];
struct re_crypto_context decrypt_rtp;
struct re_crypto_context decrypt_rtcp;
@ -1860,15 +1859,6 @@ static void target_retrieve_stats(struct rtpengine_target *g, struct rtpengine_s
spin_lock_irqsave(&g->ssrc_stats_lock, flags);
for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
i->ssrc[u] = g->target.ssrc[u];
i->ssrc_stats[u] = g->ssrc_stats[u];
atomic64_set(&g->ssrc_stats[u].packets, 0);
atomic64_set(&g->ssrc_stats[u].bytes, 0);
atomic_set(&g->ssrc_stats[u].total_lost, 0);
}
for (u = 0; u < g->target.num_destinations; u++) {
for (v = 0; v < RTPE_NUM_SSRC_TRACKING; v++)
i->last_rtcp_index[u][v] = g->outputs[u].output.encrypt.last_rtcp_index[v];
@ -2472,10 +2462,8 @@ static int table_new_target(struct rtpengine_table *t, struct rtpengine_target_i
crypto_context_init(&g->decrypt_rtp, &g->target.decrypt);
crypto_context_init(&g->decrypt_rtcp, &g->target.decrypt);
spin_lock_init(&g->ssrc_stats_lock);
for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
g->ssrc_stats[u].lost_bits = -1;
for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++)
g->target.ssrc_stats[u] = ssrc_stats[u];
}
rwlock_init(&g->outputs_lock);
g->target.iface_stats = iface_stats;
g->target.stats = stats;
@ -2601,6 +2589,8 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des
struct rtpengine_target *g;
struct interface_stats_block *iface_stats;
struct stream_stats *stats;
struct ssrc_stats *ssrc_stats[RTPE_NUM_SSRC_TRACKING];
unsigned int u;
// validate input
@ -2619,6 +2609,15 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des
stats = shm_map_resolve(i->output.stats, sizeof(*stats));
if (!stats)
return -EFAULT;
for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
// XXX order expected to be the same as input target
// XXX validate if target->ssrc[u] is set?
if (!i->output.ssrc_stats[u])
break;
ssrc_stats[u] = shm_map_resolve(i->output.ssrc_stats[u], sizeof(*ssrc_stats[u]));
if (!ssrc_stats[u])
return -EFAULT;
}
g = get_target(t, &i->local);
@ -2646,6 +2645,8 @@ static int table_add_destination(struct rtpengine_table *t, struct rtpengine_des
g->outputs[i->num].output = i->output;
g->outputs[i->num].output.iface_stats = iface_stats;
g->outputs[i->num].output.stats = stats;
for (u = 0; u < RTPE_NUM_SSRC_TRACKING; u++)
g->outputs[i->num].output.ssrc_stats[u] = ssrc_stats[u];
// init crypto stuff lock free: the "output" is already filled so we
// know it's there, but outputs_unfilled hasn't been decreased yet, so
@ -5200,6 +5201,13 @@ static bool proxy_packet_output_rtXp(struct sk_buff *skb, struct rtpengine_outpu
srtp_authenticate(&o->encrypt_rtp, &o->output.encrypt, rtp, pkt_idx);
skb_put(skb, rtp->payload_len - pllen);
if (ssrc_idx >= 0 && o->output.ssrc_stats[ssrc_idx]) {
atomic64_inc(&o->output.ssrc_stats[ssrc_idx]->packets);
atomic64_add(rtp->payload_len, &o->output.ssrc_stats[ssrc_idx]->bytes);
atomic_set(&o->output.ssrc_stats[ssrc_idx]->ext_seq, pkt_idx);
atomic_set(&o->output.ssrc_stats[ssrc_idx]->timestamp, ntohl(rtp->rtp_header->timestamp));
}
return true;
}
@ -5221,10 +5229,10 @@ static int send_proxy_packet_output(struct sk_buff *skb, struct rtpengine_target
static void rtp_stats(struct rtpengine_target *g, struct rtp_parsed *rtp, s64 arrival_time, int pt_idx,
int ssrc_idx)
int ssrc_idx, struct global_stats_counter *rtpe_stats)
{
unsigned long flags;
struct ssrc_stats *s = &g->ssrc_stats[ssrc_idx];
struct ssrc_stats *s = g->target.ssrc_stats[ssrc_idx];
uint16_t old_seq_trunc;
uint32_t last_seq;
uint16_t seq_diff;
@ -5273,13 +5281,18 @@ static void rtp_stats(struct rtpengine_target *g, struct rtp_parsed *rtp, s64 ar
if (seq_diff >= (sizeof(s->lost_bits) * 8)) {
// complete loss
atomic_add(sizeof(s->lost_bits) * 8, &s->total_lost);
atomic64_add(sizeof(s->lost_bits) * 8, &g->target.iface_stats->s.packets_lost);
atomic64_add(sizeof(s->lost_bits) * 8, &rtpe_stats->packets_lost);
s->lost_bits = -1;
}
else {
while (seq_diff) {
// shift out one bit and see if we lost it
if ((s->lost_bits & 0x80000000) == 0)
if ((s->lost_bits & 0x80000000) == 0) {
atomic_inc(&s->total_lost);
atomic64_inc(&g->target.iface_stats->s.packets_lost);
atomic64_inc(&rtpe_stats->packets_lost);
}
s->lost_bits <<= 1;
seq_diff--;
}
@ -5458,7 +5471,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
skb_trim(skb, rtp.header_len + rtp.payload_len);
if (g->target.rtp_stats && ssrc_idx != -1 && rtp_pt_idx >= 0)
rtp_stats(g, &rtp, ktime_to_us(skb->tstamp), rtp_pt_idx, ssrc_idx);
rtp_stats(g, &rtp, ktime_to_us(skb->tstamp), rtp_pt_idx, ssrc_idx, t->rtpe_stats);
DBG("packet payload decrypted as %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x...\n",
rtp.payload[0], rtp.payload[1], rtp.payload[2], rtp.payload[3],

@ -127,6 +127,7 @@ struct rtpengine_output_info {
struct interface_stats_block *iface_stats; // for egress stats
struct stream_stats *stats; // for egress stats
struct ssrc_stats *ssrc_stats[RTPE_NUM_SSRC_TRACKING];
unsigned char tos;
unsigned int ssrc_subst:1;

Loading…
Cancel
Save