TT#78307 generate RTCP for media players

Change-Id: Ic6dd65211ca253491a7ef3acd35499155b60fe4c
changes/47/38947/16
Richard Fuchs 5 years ago
parent 92bcedc08a
commit a168a3c024

@ -642,6 +642,7 @@ static void call_timer(void *ptr) {
if (ps->ssrc_in && ntohl(ke->target.ssrc) == ps->ssrc_in->parent->h.ssrc) {
atomic64_add(&ps->ssrc_in->octets, diff_bytes);
atomic64_add(&ps->ssrc_in->packets, diff_packets);
atomic64_set(&ps->ssrc_in->last_seq, ke->target.decrypt.last_index);
ps->ssrc_in->srtp_index = ke->target.decrypt.last_index;
if (sfd->crypto.params.crypto_suite

@ -15,6 +15,7 @@
#include "ssrc.h"
#include "log_funcs.h"
#include "main.h"
#include "rtcp.h"
@ -108,6 +109,7 @@ struct media_player *media_player_new(struct call_monologue *ml) {
while (ssrc == 0)
ssrc = random();
struct ssrc_ctx *ssrc_ctx = get_ssrc_ctx(ssrc, ml->call->ssrc_hash, SSRC_DIR_OUTPUT, ml);
ssrc_ctx->next_rtcp = rtpe_now;
struct media_player *mp = obj_alloc0("media_player", sizeof(*mp), __media_player_free);
@ -162,6 +164,53 @@ struct send_timer *send_timer_new(struct packet_stream *ps) {
}
// call is locked in R
static void send_timer_send_rtcp(struct ssrc_ctx *ssrc_out, struct call *call, struct packet_stream *ps) {
GQueue rrs = G_QUEUE_INIT;
rtcp_receiver_reports(&rrs, call->ssrc_hash, ps->media->monologue);
ilog(LOG_DEBUG, "Generating and sending RTCP SR for %x and up to %i source(s)",
ssrc_out->parent->h.ssrc, rrs.length);
GString *sr = rtcp_sender_report(ssrc_out->parent->h.ssrc,
atomic64_get(&ssrc_out->last_ts),
atomic64_get(&ssrc_out->packets),
atomic64_get(&ssrc_out->octets),
&rrs);
socket_sendto(&ps->selected_sfd->socket, sr->str, sr->len, &ps->endpoint);
g_string_free(sr, TRUE);
}
// call is locked in R
static void send_timer_rtcp(struct send_timer *st, struct ssrc_ctx *ssrc_out) {
struct call_media *media = st->sink ? st->sink->media : NULL;
if (!media)
return;
struct call *call = media->call;
// figure out where to send it
struct packet_stream *ps = media->streams.head->data;
if (MEDIA_ISSET(media, RTCP_MUX))
;
else if (!media->streams.head->next)
;
else {
struct packet_stream *next_ps = media->streams.head->next->data;
if (PS_ISSET(next_ps, RTCP))
ps = next_ps;
}
log_info_stream_fd(ps->selected_sfd);
send_timer_send_rtcp(ssrc_out, call, ps);
// XXX missing locking?
ssrc_out->next_rtcp = rtpe_now;
timeval_add_usec(&ssrc_out->next_rtcp, 5000000);
}
static void __send_timer_send_common(struct send_timer *st, struct codec_packet *cp) {
if (!st->sink->selected_sfd)
goto out;
@ -191,6 +240,13 @@ static void __send_timer_send_common(struct send_timer *st, struct codec_packet
payload_tracker_add(&cp->ssrc_out->tracker, cp->rtp->m_pt & 0x7f);
}
// do we send RTCP?
struct ssrc_ctx *ssrc_out = cp->ssrc_out;
if (ssrc_out && ssrc_out->next_rtcp.tv_sec) {
if (timeval_diff(&ssrc_out->next_rtcp, &rtpe_now) < 0)
send_timer_rtcp(st, ssrc_out);
}
out:
codec_packet_free(cp);
}

@ -15,6 +15,7 @@
#include "media_socket.h"
#include "rtcplib.h"
#include "ssrc.h"
#include "sdp.h"
@ -1376,3 +1377,115 @@ void rtcp_init() {
rtcp_handlers.logging = _log_facility_rtcp ? &log_handlers : &dummy_handlers;
rtcp_handlers.homer = has_homer() ? &homer_handlers : &dummy_handlers;
}
GString *rtcp_sender_report(uint32_t ssrc, uint32_t ts, uint32_t packets, uint32_t octets, GQueue *rrs) {
GString *ret = g_string_sized_new(128);
g_string_set_size(ret, sizeof(struct sender_report_packet));
struct sender_report_packet *sr = (void *) ret->str;
*sr = (struct sender_report_packet) {
.rtcp.header.version = 2,
.rtcp.header.pt = RTCP_PT_SR,
.rtcp.ssrc = htonl(ssrc),
.ntp_msw = htonl(rtpe_now.tv_sec + 2208988800),
.ntp_lsw = htonl((4294967295ULL * rtpe_now.tv_usec) / 1000000ULL),
.timestamp = htonl(ts), // XXX calculate from rtpe_now instead
.packet_count = htonl(packets),
.octet_count = htonl(octets),
};
// receiver reports
int i = 0, n = 0;
for (GList *l = rrs->head; l; l = l->next) {
struct ssrc_ctx *s = l->data;
if (i < 30) {
struct report_block *rr = (void *) ret->str + ret->len;
g_string_set_size(ret, ret->len + sizeof(*rr));
// XXX unify with transcode_rr
// last received SR?
struct ssrc_entry_call *se = s->parent;
long long tv_diff = 0;
uint32_t ntp_middle_bits = 0;
mutex_lock(&se->h.lock);
if (se->sender_reports.length) {
struct ssrc_time_item *si = se->sender_reports.tail->data;
tv_diff = timeval_diff(&rtpe_now, &si->received);
ntp_middle_bits = si->ntp_middle_bits;
}
mutex_unlock(&se->h.lock);
uint64_t lost = atomic64_get(&s->packets_lost);
uint64_t tot = atomic64_get(&s->packets);
*rr = (struct report_block) {
.ssrc = htonl(s->parent->h.ssrc),
.fraction_lost = lost * 256 / (tot + lost),
.number_lost[0] = (lost >> 16) & 0xff,
.number_lost[1] = (lost >> 8) & 0xff,
.number_lost[2] = lost & 0xff,
.high_seq_received = htonl(atomic64_get(&s->last_seq)),
.lsr = htonl(ntp_middle_bits),
.dlsr = htonl(tv_diff * 65536 / 1000000),
};
// XXX jitter
n++;
}
ssrc_ctx_put(&s);
i++;
}
sr->rtcp.header.count = n;
sr->rtcp.header.length = htons((ret->len >> 2) - 1);
// sdes
assert(rtpe_instance_id.len == 12);
struct {
struct source_description_packet sdes;
struct sdes_chunk chunk;
struct sdes_item cname;
char str[12];
char nul;
char pad;
} __attribute__ ((packed)) *sdes;
assert(sizeof(*sdes) == 24);
sdes = (void *) ret->str + ret->len;
g_string_set_size(ret, ret->len + sizeof(*sdes));
*sdes = (__typeof(*sdes)) {
.sdes.header.version = 2,
.sdes.header.pt = RTCP_PT_SDES,
.sdes.header.count = 1,
.sdes.header.length = htons((sizeof(*sdes) >> 2) - 1),
.chunk.ssrc = htonl(ssrc),
.cname.type = SDES_TYPE_CNAME,
.cname.length = rtpe_instance_id.len,
.nul = 0,
.pad = 0,
};
memcpy(sdes->str, rtpe_instance_id.s, rtpe_instance_id.len);
return ret;
}
void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash, struct call_monologue *ml) {
rwlock_lock_r(&hash->lock);
for (GList *l = hash->q.head; l; l = l->next) {
struct ssrc_entry_call *e = l->data;
ilog(LOG_DEBUG, "xxxxx %x %i %i %p %p %p", e->h.ssrc, (int) atomic64_get(&e->input_ctx.packets), (int) atomic64_get(&e->output_ctx.packets), ml, e->input_ctx.ref, e->output_ctx.ref);
struct ssrc_ctx *i = &e->input_ctx;
if (i->ref != ml)
continue;
if (!atomic64_get(&i->packets))
continue;
g_queue_push_tail(out, ssrc_ctx_get(i));
}
rwlock_unlock_r(&hash->lock);
}

@ -264,7 +264,7 @@ struct sdp_attribute { /* example: a=rtpmap:8 PCMA/8000 */
static char __id_buf[6*2 + 1]; // 6 hex encoded characters
static const str instance_id = STR_CONST_INIT(__id_buf);
const str rtpe_instance_id = STR_CONST_INIT(__id_buf);
@ -2333,7 +2333,7 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call_monologu
if (flags->loop_protect) {
chopper_append_c(chop, "a=rtpengine:");
chopper_append_str(chop, &instance_id);
chopper_append_str(chop, &rtpe_instance_id);
chopper_append_c(chop, "\r\n");
}
@ -2481,7 +2481,7 @@ int sdp_is_duplicate(GQueue *sessions) {
return 0;
for (GList *ql = attr_list->head; ql; ql = ql->next) {
struct sdp_attribute *attr = ql->data;
if (!str_cmp_str(&attr->value, &instance_id))
if (!str_cmp_str(&attr->value, &rtpe_instance_id))
goto next;
}
return 0;
@ -2492,5 +2492,5 @@ next:
}
void sdp_init() {
rand_hex_str(instance_id.s, instance_id.len / 2);
rand_hex_str(rtpe_instance_id.s, rtpe_instance_id.len / 2);
}

@ -36,4 +36,8 @@ rtcp_filter_func rtcp_avpf2avp_filter;
void rtcp_init(void);
GString *rtcp_sender_report(uint32_t ssrc, uint32_t ts, uint32_t packets, uint32_t octets, GQueue *rrs);
void rtcp_receiver_reports(GQueue *out, struct ssrc_hash *hash, struct call_monologue *ml);
#endif

@ -17,6 +17,8 @@ struct sdp_chopper {
// int str_len;
};
extern const str rtpe_instance_id;
void sdp_init(void);
int sdp_parse(str *body, GQueue *sessions, const struct sdp_ng_flags *);

@ -63,6 +63,8 @@ struct ssrc_ctx {
duplicates,
last_seq, // XXX dup with srtp_index?
last_ts;
struct timeval next_rtcp; // for self-generated RTCP reports
};
struct ssrc_stats_block {

File diff suppressed because one or more lines are too long

@ -24,9 +24,9 @@ my ($playsrc, $playsink);
open($playsrc, '|-', qw(play -q -c 1 -e a-law -r 8000 -t raw -)) or die;
open($playsink, '|-', qw(play -q -c 1 -e a-law -r 8000 -t raw -)) or die;
my $lseq = rand();
my $lssrc = rand();
my $lts = rand();
my $lseq = rand(65536);
my $lssrc = rand(65536);
my $lts = rand(2*32);
my $lpt = 8; # PCMA
my $lmark = 0x80;
my $rseq = -1;

Loading…
Cancel
Save