TT#78307 move SSRC send stats to actual point of sending

Change-Id: I54ff4afeb8b3a205a678e9102fb0d8cedee78bae
changes/57/39057/6
Richard Fuchs 5 years ago
parent dfc76d23b5
commit 2e4617ff86

@ -1148,8 +1148,10 @@ void codec_add_raw_packet(struct media_packet *mp) {
struct codec_packet *p = g_slice_alloc0(sizeof(*p));
p->s = mp->raw;
p->free_func = NULL;
if (mp->rtp && mp->ssrc_out)
payload_tracker_add(&mp->ssrc_out->tracker, mp->rtp->m_pt & 0x7f);
if (mp->rtp && mp->ssrc_out) {
p->ssrc_out = ssrc_ctx_get(mp->ssrc_out);
p->rtp = mp->rtp;
}
g_queue_push_tail(&mp->packets_out, p);
}
static int handler_func_passthrough(struct codec_handler *h, struct media_packet *mp) {
@ -1300,6 +1302,8 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
p->free_func = free;
p->ttq_entry.source = handler;
p->rtp = rh;
p->ts = ts;
p->ssrc_out = ssrc_ctx_get(ssrc_out);
// this packet is dynamically allocated, so we're able to schedule it.
// determine scheduled time to send
@ -1329,10 +1333,6 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
(long unsigned) p->ttq_entry.when.tv_usec);
g_queue_push_tail(&mp->packets_out, p);
atomic64_inc(&ssrc_out->packets);
atomic64_add(&ssrc_out->octets, payload_len);
atomic64_set(&ssrc_out->last_ts, ts);
}
// returns new reference
@ -1502,6 +1502,7 @@ void codec_packet_free(void *pp) {
struct codec_packet *p = pp;
if (p->free_func)
p->free_func(p->s.s);
ssrc_ctx_put(&p->ssrc_out);
g_slice_free1(sizeof(*p), p);
}

@ -184,7 +184,8 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_monolog
{
struct call *call = monologue->call;
struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out, call->ssrc_hash, SSRC_DIR_OUTPUT);
struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out, call->ssrc_hash, SSRC_DIR_OUTPUT,
monologue);
if (!ssrc_out)
return "No output SSRC context present"; // XXX generate stream

@ -107,7 +107,7 @@ struct media_player *media_player_new(struct call_monologue *ml) {
uint32_t ssrc = 0;
while (ssrc == 0)
ssrc = random();
struct ssrc_ctx *ssrc_ctx = get_ssrc_ctx(ssrc, ml->call->ssrc_hash, SSRC_DIR_OUTPUT);
struct ssrc_ctx *ssrc_ctx = get_ssrc_ctx(ssrc, ml->call->ssrc_hash, SSRC_DIR_OUTPUT, ml);
struct media_player *mp = obj_alloc0("media_player", sizeof(*mp), __media_player_free);
@ -166,16 +166,31 @@ static void __send_timer_send_common(struct send_timer *st, struct codec_packet
if (!st->sink->selected_sfd)
goto out;
struct rtp_header *rh = (void *) cp->s.s;
ilog(LOG_DEBUG, "Forward to sink endpoint: %s%s:%d%s (RTP seq %u TS %u)",
FMT_M(sockaddr_print_buf(&st->sink->endpoint.address),
st->sink->endpoint.port),
ntohs(rh->seq_num),
ntohl(rh->timestamp));
struct rtp_header *rh = cp->rtp;
if (rh)
ilog(LOG_DEBUG, "Forward to sink endpoint: %s%s:%d%s (RTP seq %u TS %u)",
FMT_M(sockaddr_print_buf(&st->sink->endpoint.address),
st->sink->endpoint.port),
ntohs(rh->seq_num),
ntohl(rh->timestamp));
else
ilog(LOG_DEBUG, "Forward to sink endpoint: %s%s:%d%s",
FMT_M(sockaddr_print_buf(&st->sink->endpoint.address),
st->sink->endpoint.port));
socket_sendto(&st->sink->selected_sfd->socket,
cp->s.s, cp->s.len, &st->sink->endpoint);
if (cp->ssrc_out && cp->rtp) {
atomic64_inc(&cp->ssrc_out->packets);
atomic64_add(&cp->ssrc_out->octets, cp->s.len);
if (cp->ts)
atomic64_set(&cp->ssrc_out->last_ts, cp->ts);
else
atomic64_set(&cp->ssrc_out->last_ts, ntohl(cp->rtp->timestamp));
payload_tracker_add(&cp->ssrc_out->tracker, cp->rtp->m_pt & 0x7f);
}
out:
codec_packet_free(cp);
}

@ -1330,7 +1330,7 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o
ssrc_ctx_put(ssrc_in_p);
ssrc_ctx_put(&in_srtp->ssrc_in);
(*ssrc_in_p) = in_srtp->ssrc_in =
get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT);
get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT, in_srtp->media->monologue);
ssrc_ctx_hold(in_srtp->ssrc_in);
// might have created a new entry, which would have a new random
@ -1352,7 +1352,7 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o
ssrc_ctx_put(ssrc_out_p);
ssrc_ctx_put(&out_srtp->ssrc_out);
(*ssrc_out_p) = out_srtp->ssrc_out =
get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT);
get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT, out_srtp->media->monologue);
ssrc_ctx_hold(out_srtp->ssrc_out);
// reverse SSRC mapping
@ -1847,6 +1847,12 @@ static int stream_packet(struct packet_handler_ctx *phc) {
// this set payload_type, ssrc_in, ssrc_out and mp
media_packet_rtp(phc);
// SSRC receive stats
if (phc->mp.ssrc_in) {
atomic64_inc(&phc->mp.ssrc_in->packets);
atomic64_add(&phc->mp.ssrc_in->packets, phc->mp.raw.len);
}
/* do we have somewhere to forward it to? */

@ -1282,12 +1282,12 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr)
// reverse SSRC mapping
struct ssrc_ctx *map_ctx = get_ssrc_ctx(ctx->scratch.rr.ssrc, ctx->mp->call->ssrc_hash,
SSRC_DIR_OUTPUT);
SSRC_DIR_OUTPUT, ctx->mp->media->monologue);
rr->ssrc = htonl(map_ctx->ssrc_map_out);
// for reception stats
struct ssrc_ctx *input_ctx = get_ssrc_ctx(map_ctx->ssrc_map_out, ctx->mp->call->ssrc_hash,
SSRC_DIR_INPUT);
SSRC_DIR_INPUT, NULL);
// substitute our own values

@ -194,11 +194,14 @@ struct ssrc_hash *create_ssrc_hash_call(void) {
return create_ssrc_hash_full(create_ssrc_entry_call, NULL);
}
struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir) {
struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir, void *ref) {
struct ssrc_entry *s = get_ssrc(ssrc, ht /* , NULL */);
if (G_UNLIKELY(!s))
return NULL;
return ((void *) s) + dir;
struct ssrc_ctx *ret = ((void *) s) + dir;
if (ref)
ret->ref = ref;
return ret;
}

@ -49,6 +49,8 @@ struct codec_packet {
struct timerthread_queue_entry ttq_entry;
str s;
struct rtp_header *rtp;
unsigned long ts;
struct ssrc_ctx *ssrc_out;
void (*free_func)(void *);
};

@ -11,6 +11,7 @@ struct crypto_context;
struct rtcp_packet;
struct ssrc_ctx;
struct rtcp_handler;
struct call_monologue;
struct rtcp_parse_ctx {

@ -46,6 +46,7 @@ struct payload_tracker {
struct ssrc_ctx {
struct ssrc_entry_call *parent;
struct payload_tracker tracker;
void *ref; // points to the call_monologue but is opaque
// XXX lock this?
u_int64_t srtp_index,
@ -186,7 +187,7 @@ struct ssrc_hash *create_ssrc_hash_call(void);
void *get_ssrc(u_int32_t, struct ssrc_hash * /* , int *created */); // creates new entry if not found
struct ssrc_ctx *get_ssrc_ctx(u_int32_t, struct ssrc_hash *, enum ssrc_dir); // creates new entry if not found
struct ssrc_ctx *get_ssrc_ctx(u_int32_t, struct ssrc_hash *, enum ssrc_dir, void *ref); // creates new entry if not found
void ssrc_sender_report(struct call_media *, const struct ssrc_sender_report *, const struct timeval *);

@ -179,12 +179,12 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media
struct media_packet mp = {
.call = &call,
.media = media,
.ssrc_in = get_ssrc_ctx(ssrc, call.ssrc_hash, SSRC_DIR_INPUT),
.ssrc_in = get_ssrc_ctx(ssrc, call.ssrc_hash, SSRC_DIR_INPUT, NULL),
};
// from __stream_ssrc()
if (!MEDIA_ISSET(media, TRANSCODE))
mp.ssrc_in->ssrc_map_out = ntohl(ssrc);
mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, call.ssrc_hash, SSRC_DIR_OUTPUT);
mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, call.ssrc_hash, SSRC_DIR_OUTPUT, NULL);
payload_tracker_add(&mp.ssrc_in->tracker, pt_in & 0x7f);
int packet_len = sizeof(struct rtp_header) + pl.len;

Loading…
Cancel
Save