TT#99621 update RTP stats from kernel for RTCP

Change-Id: Iff54c4b8bac9df7c0e58c8dbdc424e6c4faf8780
pull/1134/head
Richard Fuchs 5 years ago
parent 09026c1910
commit a15e3a28c9

@ -815,8 +815,8 @@ Optionally included keys are:
With this flag set, received RTCP packets will not simply be passed through as With this flag set, received RTCP packets will not simply be passed through as
usual, but instead will be consumed, and instead *rtpengine* will generate its own usual, but instead will be consumed, and instead *rtpengine* will generate its own
RTCP packets to send to the RTP peers. This is currently supported only for RTCP packets to send to the RTP peers. This flag will be effective for both
transcoded streams, and the flag will be effective for both sides of a call. sides of a call.
* `replace` * `replace`

@ -1326,10 +1326,6 @@ next:
// we have to translate RTCP packets // we have to translate RTCP packets
receiver->rtcp_handler = rtcp_transcode_handler; receiver->rtcp_handler = rtcp_transcode_handler;
if (MEDIA_ISSET(receiver, RTCP_GEN)) {
receiver->rtcp_handler = rtcp_sink_handler;
__codec_rtcp_timer(receiver);
}
__check_dtmf_injector(flags, receiver, pref_dest_codec, output_transcoders, dtmf_payload_type); __check_dtmf_injector(flags, receiver, pref_dest_codec, output_transcoders, dtmf_payload_type);
@ -1360,6 +1356,11 @@ next:
} }
g_hash_table_destroy(output_transcoders); g_hash_table_destroy(output_transcoders);
if (MEDIA_ISSET(receiver, RTCP_GEN)) {
receiver->rtcp_handler = rtcp_sink_handler;
__codec_rtcp_timer(receiver);
}
} }

@ -240,3 +240,27 @@ unsigned int kernel_add_intercept_stream(unsigned int call_idx, const char *id)
return UNINIT_IDX; return UNINIT_IDX;
return msg.u.stream.stream_idx; return msg.u.stream.stream_idx;
} }
int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out) {
struct rtpengine_message msg;
int ret;
if (!kernel.is_open)
return -1;
ZERO(msg);
msg.cmd = REMG_GET_RESET_STATS;
msg.u.stats.local = *a;
ret = read(kernel.fd, &msg, sizeof(msg));
if (ret <= 0) {
ilog(LOG_ERROR, "Failed to get stream stats from kernel: %s", strerror(errno));
return -1;
}
if (msg.u.stats.ssrc != ssrc)
return -1;
*out = msg.u.stats.ssrc_stats;
return 0;
}

@ -1137,6 +1137,7 @@ void kernelize(struct packet_stream *stream) {
reti.dtls = MEDIA_ISSET(media, DTLS); reti.dtls = MEDIA_ISSET(media, DTLS);
reti.stun = media->ice_agent ? 1 : 0; reti.stun = media->ice_agent ? 1 : 0;
reti.non_forwarding = non_forwarding; reti.non_forwarding = non_forwarding;
reti.rtp_stats = MEDIA_ISSET(media, RTCP_GEN) ? 1 : 0;
__re_address_translate_ep(&reti.dst_addr, &sink->endpoint); __re_address_translate_ep(&reti.dst_addr, &sink->endpoint);
__re_address_translate_ep(&reti.src_addr, &sink->selected_sfd->socket.local); __re_address_translate_ep(&reti.src_addr, &sink->selected_sfd->socket.local);
@ -1179,7 +1180,9 @@ void kernelize(struct packet_stream *stream) {
struct codec_handler *ch = codec_handler_get(media, rs->payload_type); struct codec_handler *ch = codec_handler_get(media, rs->payload_type);
if (!ch->kernelize) if (!ch->kernelize)
continue; continue;
reti.payload_types[reti.num_payload_types++] = rs->payload_type; reti.payload_types[reti.num_payload_types] = rs->payload_type;
reti.clock_rates[reti.num_payload_types] = ch->source_pt.clock_rate;
reti.num_payload_types++;
} }
g_list_free(values); g_list_free(values);
} }
@ -1202,6 +1205,60 @@ no_kernel:
PS_SET(stream, NO_KERNEL_SUPPORT); PS_SET(stream, NO_KERNEL_SUPPORT);
} }
// must be called with appropriate locks (master lock and/or in_lock)
static void __stream_update_stats(struct packet_stream *ps, int have_in_lock) {
struct re_address local;
if (!have_in_lock)
mutex_lock(&ps->in_lock);
struct ssrc_ctx *ssrc_ctx = ps->ssrc_in;
struct ssrc_entry_call *parent = ssrc_ctx->parent;
__re_address_translate_ep(&local, &ps->selected_sfd->socket.local);
struct rtpengine_ssrc_stats stats;
if (kernel_update_stats(&local, htonl(parent->h.ssrc), &stats)) {
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
return;
}
if (!stats.basic_stats.packets) {
// no change
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
return;
}
atomic64_add(&ssrc_ctx->packets, stats.basic_stats.packets);
atomic64_add(&ssrc_ctx->octets, stats.basic_stats.bytes);
atomic64_add(&ssrc_ctx->packets_lost, stats.total_lost);
atomic64_set(&ssrc_ctx->last_seq, stats.ext_seq);
atomic64_set(&ssrc_ctx->last_ts, stats.timestamp);
parent->jitter = stats.jitter;
uint32_t ssrc_map_out = ssrc_ctx->ssrc_map_out;
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
// update opposite outgoing SSRC
if (!have_in_lock)
mutex_lock(&ps->out_lock);
else {
if (mutex_trylock(&ps->out_lock))
return; // will have to skip this
}
ssrc_ctx = ps->ssrc_out;
parent = ssrc_ctx->parent;
if (parent->h.ssrc == ssrc_map_out) {
atomic64_add(&ssrc_ctx->packets, stats.basic_stats.packets);
atomic64_add(&ssrc_ctx->octets, stats.basic_stats.bytes);
}
mutex_unlock(&ps->out_lock);
}
/* must be called with in_lock held or call->master_lock held in W */ /* must be called with in_lock held or call->master_lock held in W */
void __unkernelize(struct packet_stream *p) { void __unkernelize(struct packet_stream *p) {
struct re_address rea; struct re_address rea;
@ -1212,6 +1269,7 @@ void __unkernelize(struct packet_stream *p) {
return; return;
if (kernel.is_open) { if (kernel.is_open) {
__stream_update_stats(p, 1);
__re_address_translate_ep(&rea, &p->selected_sfd->socket.local); __re_address_translate_ep(&rea, &p->selected_sfd->socket.local);
kernel_del_stream(&rea); kernel_del_stream(&rea);
} }
@ -1241,6 +1299,24 @@ void unkernelize(struct packet_stream *ps) {
mutex_unlock(&ps->in_lock); mutex_unlock(&ps->in_lock);
} }
// master lock held in R
void media_update_stats(struct call_media *m) {
if (!proto_is_rtp(m->protocol))
return;
if (!kernel.is_open)
return;
for (GList *l = m->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
if (!PS_ISSET(ps, RTP))
continue;
if (!PS_ISSET(ps, KERNELIZED))
continue;
__stream_update_stats(ps, 0);
}
}
const struct streamhandler *determine_handler(const struct transport_protocol *in_proto, const struct streamhandler *determine_handler(const struct transport_protocol *in_proto,

@ -17,6 +17,7 @@
struct rtpengine_target_info; struct rtpengine_target_info;
struct re_address; struct re_address;
struct rtpengine_ssrc_stats;
@ -35,6 +36,7 @@ int kernel_setup_table(unsigned int);
int kernel_add_stream(struct rtpengine_target_info *, int); int kernel_add_stream(struct rtpengine_target_info *, int);
int kernel_del_stream(const struct re_address *); int kernel_del_stream(const struct re_address *);
GList *kernel_list(void); GList *kernel_list(void);
int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out);
unsigned int kernel_add_call(const char *id); unsigned int kernel_add_call(const char *id);
int kernel_del_call(unsigned int); int kernel_del_call(unsigned int);

@ -176,6 +176,8 @@ void __unkernelize(struct packet_stream *);
void unkernelize(struct packet_stream *); void unkernelize(struct packet_stream *);
void __stream_unconfirm(struct packet_stream *); void __stream_unconfirm(struct packet_stream *);
void media_update_stats(struct call_media *m);
void media_packet_copy(struct media_packet *, const struct media_packet *); void media_packet_copy(struct media_packet *, const struct media_packet *);
void media_packet_release(struct media_packet *); void media_packet_release(struct media_packet *);
int media_socket_dequeue(struct media_packet *mp, struct packet_stream *sink); int media_socket_dequeue(struct media_packet *mp, struct packet_stream *sink);

@ -1575,6 +1575,8 @@ static int proc_list_show(struct seq_file *f, void *v) {
seq_printf(f, " option: transcoding\n"); seq_printf(f, " option: transcoding\n");
if (g->target.non_forwarding) if (g->target.non_forwarding)
seq_printf(f, " option: non forwarding\n"); seq_printf(f, " option: non forwarding\n");
if (g->target.rtp_stats)
seq_printf(f, " option: RTP stats\n");
target_put(g); target_put(g);

Loading…
Cancel
Save