diff --git a/README.md b/README.md index 2d3471990..cd176c4c2 100644 --- a/README.md +++ b/README.md @@ -815,8 +815,8 @@ Optionally included keys are: With this flag set, received RTCP packets will not simply be passed through as usual, but instead will be consumed, and instead *rtpengine* will generate its own - RTCP packets to send to the RTP peers. This is currently supported only for - transcoded streams, and the flag will be effective for both sides of a call. + RTCP packets to send to the RTP peers. This flag will be effective for both + sides of a call. * `replace` diff --git a/daemon/codec.c b/daemon/codec.c index 2ad173789..7d37af04e 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -1326,10 +1326,6 @@ next: // we have to translate RTCP packets receiver->rtcp_handler = rtcp_transcode_handler; - if (MEDIA_ISSET(receiver, RTCP_GEN)) { - receiver->rtcp_handler = rtcp_sink_handler; - __codec_rtcp_timer(receiver); - } __check_dtmf_injector(flags, receiver, pref_dest_codec, output_transcoders, dtmf_payload_type); @@ -1360,6 +1356,11 @@ next: } g_hash_table_destroy(output_transcoders); + + if (MEDIA_ISSET(receiver, RTCP_GEN)) { + receiver->rtcp_handler = rtcp_sink_handler; + __codec_rtcp_timer(receiver); + } } diff --git a/daemon/kernel.c b/daemon/kernel.c index 7ce3fefca..07d849d5a 100644 --- a/daemon/kernel.c +++ b/daemon/kernel.c @@ -240,3 +240,27 @@ unsigned int kernel_add_intercept_stream(unsigned int call_idx, const char *id) return UNINIT_IDX; return msg.u.stream.stream_idx; } + +int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out) { + struct rtpengine_message msg; + int ret; + + if (!kernel.is_open) + return -1; + + ZERO(msg); + msg.cmd = REMG_GET_RESET_STATS; + msg.u.stats.local = *a; + + ret = read(kernel.fd, &msg, sizeof(msg)); + if (ret <= 0) { + ilog(LOG_ERROR, "Failed to get stream stats from kernel: %s", strerror(errno)); + return -1; + } + if (msg.u.stats.ssrc != ssrc) + return -1; + + *out = msg.u.stats.ssrc_stats; + + return 0; +} diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 32fd89a37..b991fc9dd 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1137,6 +1137,7 @@ void kernelize(struct packet_stream *stream) { reti.dtls = MEDIA_ISSET(media, DTLS); reti.stun = media->ice_agent ? 1 : 0; reti.non_forwarding = non_forwarding; + reti.rtp_stats = MEDIA_ISSET(media, RTCP_GEN) ? 1 : 0; __re_address_translate_ep(&reti.dst_addr, &sink->endpoint); __re_address_translate_ep(&reti.src_addr, &sink->selected_sfd->socket.local); @@ -1179,7 +1180,9 @@ void kernelize(struct packet_stream *stream) { struct codec_handler *ch = codec_handler_get(media, rs->payload_type); if (!ch->kernelize) continue; - reti.payload_types[reti.num_payload_types++] = rs->payload_type; + reti.payload_types[reti.num_payload_types] = rs->payload_type; + reti.clock_rates[reti.num_payload_types] = ch->source_pt.clock_rate; + reti.num_payload_types++; } g_list_free(values); } @@ -1202,6 +1205,60 @@ no_kernel: PS_SET(stream, NO_KERNEL_SUPPORT); } +// must be called with appropriate locks (master lock and/or in_lock) +static void __stream_update_stats(struct packet_stream *ps, int have_in_lock) { + struct re_address local; + + if (!have_in_lock) + mutex_lock(&ps->in_lock); + + struct ssrc_ctx *ssrc_ctx = ps->ssrc_in; + struct ssrc_entry_call *parent = ssrc_ctx->parent; + + __re_address_translate_ep(&local, &ps->selected_sfd->socket.local); + struct rtpengine_ssrc_stats stats; + if (kernel_update_stats(&local, htonl(parent->h.ssrc), &stats)) { + if (!have_in_lock) + mutex_unlock(&ps->in_lock); + return; + } + + if (!stats.basic_stats.packets) { + // no change + if (!have_in_lock) + mutex_unlock(&ps->in_lock); + return; + } + + atomic64_add(&ssrc_ctx->packets, stats.basic_stats.packets); + atomic64_add(&ssrc_ctx->octets, stats.basic_stats.bytes); + atomic64_add(&ssrc_ctx->packets_lost, stats.total_lost); + atomic64_set(&ssrc_ctx->last_seq, stats.ext_seq); + atomic64_set(&ssrc_ctx->last_ts, stats.timestamp); + parent->jitter = stats.jitter; + + uint32_t ssrc_map_out = ssrc_ctx->ssrc_map_out; + + if (!have_in_lock) + mutex_unlock(&ps->in_lock); + + // update opposite outgoing SSRC + if (!have_in_lock) + mutex_lock(&ps->out_lock); + else { + if (mutex_trylock(&ps->out_lock)) + return; // will have to skip this + } + ssrc_ctx = ps->ssrc_out; + parent = ssrc_ctx->parent; + if (parent->h.ssrc == ssrc_map_out) { + atomic64_add(&ssrc_ctx->packets, stats.basic_stats.packets); + atomic64_add(&ssrc_ctx->octets, stats.basic_stats.bytes); + } + mutex_unlock(&ps->out_lock); +} + + /* must be called with in_lock held or call->master_lock held in W */ void __unkernelize(struct packet_stream *p) { struct re_address rea; @@ -1212,6 +1269,7 @@ void __unkernelize(struct packet_stream *p) { return; if (kernel.is_open) { + __stream_update_stats(p, 1); __re_address_translate_ep(&rea, &p->selected_sfd->socket.local); kernel_del_stream(&rea); } @@ -1241,6 +1299,24 @@ void unkernelize(struct packet_stream *ps) { mutex_unlock(&ps->in_lock); } +// master lock held in R +void media_update_stats(struct call_media *m) { + if (!proto_is_rtp(m->protocol)) + return; + if (!kernel.is_open) + return; + + for (GList *l = m->streams.head; l; l = l->next) { + struct packet_stream *ps = l->data; + if (!PS_ISSET(ps, RTP)) + continue; + if (!PS_ISSET(ps, KERNELIZED)) + continue; + + __stream_update_stats(ps, 0); + } +} + const struct streamhandler *determine_handler(const struct transport_protocol *in_proto, diff --git a/include/kernel.h b/include/kernel.h index 96f123d70..6dc7b54d4 100644 --- a/include/kernel.h +++ b/include/kernel.h @@ -17,6 +17,7 @@ struct rtpengine_target_info; struct re_address; +struct rtpengine_ssrc_stats; @@ -35,6 +36,7 @@ int kernel_setup_table(unsigned int); int kernel_add_stream(struct rtpengine_target_info *, int); int kernel_del_stream(const struct re_address *); GList *kernel_list(void); +int kernel_update_stats(const struct re_address *a, uint32_t ssrc, struct rtpengine_ssrc_stats *out); unsigned int kernel_add_call(const char *id); int kernel_del_call(unsigned int); diff --git a/include/media_socket.h b/include/media_socket.h index 36439cf49..94c93af47 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -176,6 +176,8 @@ void __unkernelize(struct packet_stream *); void unkernelize(struct packet_stream *); void __stream_unconfirm(struct packet_stream *); +void media_update_stats(struct call_media *m); + void media_packet_copy(struct media_packet *, const struct media_packet *); void media_packet_release(struct media_packet *); int media_socket_dequeue(struct media_packet *mp, struct packet_stream *sink); diff --git a/kernel-module/xt_RTPENGINE.c b/kernel-module/xt_RTPENGINE.c index 97a63c864..77106a0d9 100644 --- a/kernel-module/xt_RTPENGINE.c +++ b/kernel-module/xt_RTPENGINE.c @@ -1575,6 +1575,8 @@ static int proc_list_show(struct seq_file *f, void *v) { seq_printf(f, " option: transcoding\n"); if (g->target.non_forwarding) seq_printf(f, " option: non forwarding\n"); + if (g->target.rtp_stats) + seq_printf(f, " option: RTP stats\n"); target_put(g);