MT#55283 move last_packet timestamp to shm

Keep two separate timestamps, one updated by userspace code only and the
other updated by kernel only. This way we can tell where the packet
processing happens. For code that wants to report only the last
timestamp regardless of which one of the two it is, we add a convenience
function that just returns the newer one.

Change-Id: Ib3af7aa55006d8b32e2bc3db4f8bfa5514c57e40
pull/1826/head
Richard Fuchs 1 year ago
parent 065270ba49
commit 311f5bc31c

@ -138,7 +138,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
int tmp_t_reason = UNKNOWN;
struct call_monologue *ml;
enum call_stream_state css;
atomic64 *timestamp;
uint64_t timestamp;
hlp->count++;
@ -185,7 +185,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
for (__auto_type it = c->streams.head; it; it = it->next) {
ps = it->data;
timestamp = &ps->last_packet;
timestamp = packet_stream_last_packet(ps);
if (!ps->media)
goto next;
@ -198,7 +198,7 @@ static void call_timer_iterator(call_t *c, struct iterator_helper *hlp) {
css = call_stream_state_machine(ps);
if (css == CSS_ICE)
timestamp = &ps->media->ice_agent->last_activity;
timestamp = atomic64_get_na(&ps->media->ice_agent->last_activity);
no_sfd:
if (good)
@ -215,7 +215,7 @@ no_sfd:
tmp_t_reason = OFFER_TIMEOUT;
}
if (rtpe_now.tv_sec - atomic64_get(timestamp) < check)
if (rtpe_now.tv_sec - timestamp < check)
good = true;
next:
@ -3747,7 +3747,7 @@ void call_destroy(call_t *c) {
atomic64_get_na(&ps->stats_in->packets),
atomic64_get_na(&ps->stats_in->bytes),
atomic64_get_na(&ps->stats_in->errors),
rtpe_now.tv_sec - atomic64_get(&ps->last_packet),
rtpe_now.tv_sec - packet_stream_last_packet(ps),
atomic64_get_na(&ps->stats_out->packets),
atomic64_get_na(&ps->stats_out->bytes),
atomic64_get_na(&ps->stats_out->errors));

@ -2396,7 +2396,7 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps
if (ps->crypto.params.crypto_suite)
bencode_dictionary_add_string(dict, "crypto suite",
ps->crypto.params.crypto_suite->name);
bencode_dictionary_add_integer(dict, "last packet", atomic64_get(&ps->last_packet));
bencode_dictionary_add_integer(dict, "last packet", packet_stream_last_packet(ps));
flags = bencode_dictionary_add_list(dict, "flags");
@ -2416,8 +2416,8 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps
ng_stats_stream_ssrc(dict, ps->ssrc_out, "egress SSRCs");
stats:
if (totals->last_packet < atomic64_get(&ps->last_packet))
totals->last_packet = atomic64_get(&ps->last_packet);
if (totals->last_packet < packet_stream_last_packet(ps))
totals->last_packet = packet_stream_last_packet(ps);
/* XXX distinguish between input and output */
s = &totals->totals[0];

@ -148,7 +148,7 @@ void cdr_update_entry(call_t * c) {
cdrlinecnt, md->index, protocol,
atomic64_get_na(&ps->stats_in->errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
packet_stream_last_packet(ps),
cdrlinecnt, md->index, protocol,
ps->in_tos_tclass);
} else {
@ -173,7 +173,7 @@ void cdr_update_entry(call_t * c) {
cdrlinecnt, md->index, protocol,
atomic64_get_na(&ps->stats_in->errors),
cdrlinecnt, md->index, protocol,
atomic64_get(&ps->last_packet),
packet_stream_last_packet(ps),
cdrlinecnt, md->index, protocol,
ps->in_tos_tclass);
}

@ -717,7 +717,7 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml)
atomic64_get_na(&ps->stats_in->packets),
atomic64_get_na(&ps->stats_in->bytes),
atomic64_get_na(&ps->stats_in->errors),
atomic64_get_na(&ps->last_packet));
packet_stream_last_packet(ps));
cw->cw_printf(cw, "\n");
}
}

@ -3527,8 +3527,9 @@ enum thread_looper_action kernel_stats_updater(void) {
DS(bytes);
DS(errors);
if (diff_packets_in != 0) {
atomic64_set(&ps->last_packet, rtpe_now.tv_sec);
// stats_in->last_packet is updated by the kernel only, so we can use it
// to count kernel streams
if (rtpe_now.tv_sec - atomic64_get_na(&ps->stats_in->last_packet) < 2) {
count_stream_stats_kernel(ps);
}

@ -470,7 +470,7 @@ struct packet_stream {
struct stream_stats kernel_stats_in;
struct stream_stats kernel_stats_out;
unsigned char in_tos_tclass;
atomic64 last_packet;
atomic64 last_packet; // userspace only
GHashTable *rtp_stats; /* LOCK: call->master_lock */
struct rtp_stats *rtp_stats_cache;
atomic64 stats_flags;
@ -489,6 +489,12 @@ struct packet_stream {
atomic64 ps_flags;
};
INLINE uint64_t packet_stream_last_packet(const struct packet_stream *ps) {
uint64_t lp1 = atomic64_get_na(&ps->last_packet);
uint64_t lp2 = atomic64_get_na(&ps->stats_in->last_packet);
return MAX(lp1, lp2);
}
/**
* Protected by call->master_lock, except the RO elements.
*

@ -43,6 +43,7 @@ struct stream_stats {
atomic64 packets;
atomic64 bytes;
atomic64 errors;
atomic64 last_packet;
};
#endif

@ -5300,6 +5300,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
unsigned int i;
unsigned int start_idx, end_idx;
enum {NOT_RTCP = 0, RTCP, RTCP_FORWARD} is_rtcp;
ktime_t packet_ts;
skb_reset_transport_header(skb);
uh = udp_hdr(skb);
@ -5349,6 +5350,8 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
goto out_error;
}
packet_ts = ktime_divns(skb->tstamp, 1000000000LL);
if (g->target.dtls && is_dtls(skb))
goto out;
if (g->target.non_forwarding && !g->target.do_intercept) {
@ -5517,6 +5520,7 @@ static unsigned int rtpengine46(struct sk_buff *skb, struct sk_buff *oskb,
do_stats:
atomic_set(&g->tos, in_tos);
atomic64_set(&g->target.stats->last_packet, packet_ts);
atomic64_inc(&g->target.stats->packets);
atomic64_add(datalen, &g->target.stats->bytes);

Loading…
Cancel
Save