diff --git a/daemon/codec.c b/daemon/codec.c index 6b1260b43..02457e66a 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -1148,8 +1148,10 @@ void codec_add_raw_packet(struct media_packet *mp) { struct codec_packet *p = g_slice_alloc0(sizeof(*p)); p->s = mp->raw; p->free_func = NULL; - if (mp->rtp && mp->ssrc_out) - payload_tracker_add(&mp->ssrc_out->tracker, mp->rtp->m_pt & 0x7f); + if (mp->rtp && mp->ssrc_out) { + p->ssrc_out = ssrc_ctx_get(mp->ssrc_out); + p->rtp = mp->rtp; + } g_queue_push_tail(&mp->packets_out, p); } static int handler_func_passthrough(struct codec_handler *h, struct media_packet *mp) { @@ -1300,6 +1302,8 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, p->free_func = free; p->ttq_entry.source = handler; p->rtp = rh; + p->ts = ts; + p->ssrc_out = ssrc_ctx_get(ssrc_out); // this packet is dynamically allocated, so we're able to schedule it. // determine scheduled time to send @@ -1329,10 +1333,6 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, (long unsigned) p->ttq_entry.when.tv_usec); g_queue_push_tail(&mp->packets_out, p); - - atomic64_inc(&ssrc_out->packets); - atomic64_add(&ssrc_out->octets, payload_len); - atomic64_set(&ssrc_out->last_ts, ts); } // returns new reference @@ -1502,6 +1502,7 @@ void codec_packet_free(void *pp) { struct codec_packet *p = pp; if (p->free_func) p->free_func(p->s.s); + ssrc_ctx_put(&p->ssrc_out); g_slice_free1(sizeof(*p), p); } diff --git a/daemon/dtmf.c b/daemon/dtmf.c index 6586a48bc..4816baf12 100644 --- a/daemon/dtmf.c +++ b/daemon/dtmf.c @@ -184,7 +184,8 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_monolog { struct call *call = monologue->call; - struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out, call->ssrc_hash, SSRC_DIR_OUTPUT); + struct ssrc_ctx *ssrc_out = get_ssrc_ctx(ssrc_in->ssrc_map_out, call->ssrc_hash, SSRC_DIR_OUTPUT, + monologue); if (!ssrc_out) return "No output SSRC context present"; // XXX generate stream diff --git a/daemon/media_player.c b/daemon/media_player.c index 7f43dfcaf..7f2b6c3e7 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -107,7 +107,7 @@ struct media_player *media_player_new(struct call_monologue *ml) { uint32_t ssrc = 0; while (ssrc == 0) ssrc = random(); - struct ssrc_ctx *ssrc_ctx = get_ssrc_ctx(ssrc, ml->call->ssrc_hash, SSRC_DIR_OUTPUT); + struct ssrc_ctx *ssrc_ctx = get_ssrc_ctx(ssrc, ml->call->ssrc_hash, SSRC_DIR_OUTPUT, ml); struct media_player *mp = obj_alloc0("media_player", sizeof(*mp), __media_player_free); @@ -166,16 +166,31 @@ static void __send_timer_send_common(struct send_timer *st, struct codec_packet if (!st->sink->selected_sfd) goto out; - struct rtp_header *rh = (void *) cp->s.s; - ilog(LOG_DEBUG, "Forward to sink endpoint: %s%s:%d%s (RTP seq %u TS %u)", - FMT_M(sockaddr_print_buf(&st->sink->endpoint.address), - st->sink->endpoint.port), - ntohs(rh->seq_num), - ntohl(rh->timestamp)); + struct rtp_header *rh = cp->rtp; + if (rh) + ilog(LOG_DEBUG, "Forward to sink endpoint: %s%s:%d%s (RTP seq %u TS %u)", + FMT_M(sockaddr_print_buf(&st->sink->endpoint.address), + st->sink->endpoint.port), + ntohs(rh->seq_num), + ntohl(rh->timestamp)); + else + ilog(LOG_DEBUG, "Forward to sink endpoint: %s%s:%d%s", + FMT_M(sockaddr_print_buf(&st->sink->endpoint.address), + st->sink->endpoint.port)); socket_sendto(&st->sink->selected_sfd->socket, cp->s.s, cp->s.len, &st->sink->endpoint); + if (cp->ssrc_out && cp->rtp) { + atomic64_inc(&cp->ssrc_out->packets); + atomic64_add(&cp->ssrc_out->octets, cp->s.len); + if (cp->ts) + atomic64_set(&cp->ssrc_out->last_ts, cp->ts); + else + atomic64_set(&cp->ssrc_out->last_ts, ntohl(cp->rtp->timestamp)); + payload_tracker_add(&cp->ssrc_out->tracker, cp->rtp->m_pt & 0x7f); + } + out: codec_packet_free(cp); } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index a0b09802d..471888878 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1330,7 +1330,7 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o ssrc_ctx_put(ssrc_in_p); ssrc_ctx_put(&in_srtp->ssrc_in); (*ssrc_in_p) = in_srtp->ssrc_in = - get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT); + get_ssrc_ctx(in_ssrc, ssrc_hash, SSRC_DIR_INPUT, in_srtp->media->monologue); ssrc_ctx_hold(in_srtp->ssrc_in); // might have created a new entry, which would have a new random @@ -1352,7 +1352,7 @@ static void __stream_ssrc(struct packet_stream *in_srtp, struct packet_stream *o ssrc_ctx_put(ssrc_out_p); ssrc_ctx_put(&out_srtp->ssrc_out); (*ssrc_out_p) = out_srtp->ssrc_out = - get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT); + get_ssrc_ctx(out_ssrc, ssrc_hash, SSRC_DIR_OUTPUT, out_srtp->media->monologue); ssrc_ctx_hold(out_srtp->ssrc_out); // reverse SSRC mapping @@ -1847,6 +1847,12 @@ static int stream_packet(struct packet_handler_ctx *phc) { // this set payload_type, ssrc_in, ssrc_out and mp media_packet_rtp(phc); + // SSRC receive stats + if (phc->mp.ssrc_in) { + atomic64_inc(&phc->mp.ssrc_in->packets); + atomic64_add(&phc->mp.ssrc_in->packets, phc->mp.raw.len); + } + /* do we have somewhere to forward it to? */ diff --git a/daemon/rtcp.c b/daemon/rtcp.c index af3a55898..730a4d928 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -1282,12 +1282,12 @@ static void transcode_rr(struct rtcp_process_ctx *ctx, struct report_block *rr) // reverse SSRC mapping struct ssrc_ctx *map_ctx = get_ssrc_ctx(ctx->scratch.rr.ssrc, ctx->mp->call->ssrc_hash, - SSRC_DIR_OUTPUT); + SSRC_DIR_OUTPUT, ctx->mp->media->monologue); rr->ssrc = htonl(map_ctx->ssrc_map_out); // for reception stats struct ssrc_ctx *input_ctx = get_ssrc_ctx(map_ctx->ssrc_map_out, ctx->mp->call->ssrc_hash, - SSRC_DIR_INPUT); + SSRC_DIR_INPUT, NULL); // substitute our own values diff --git a/daemon/ssrc.c b/daemon/ssrc.c index 10fa185cf..2c3874dca 100644 --- a/daemon/ssrc.c +++ b/daemon/ssrc.c @@ -194,11 +194,14 @@ struct ssrc_hash *create_ssrc_hash_call(void) { return create_ssrc_hash_full(create_ssrc_entry_call, NULL); } -struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir) { +struct ssrc_ctx *get_ssrc_ctx(u_int32_t ssrc, struct ssrc_hash *ht, enum ssrc_dir dir, void *ref) { struct ssrc_entry *s = get_ssrc(ssrc, ht /* , NULL */); if (G_UNLIKELY(!s)) return NULL; - return ((void *) s) + dir; + struct ssrc_ctx *ret = ((void *) s) + dir; + if (ref) + ret->ref = ref; + return ret; } diff --git a/include/codec.h b/include/codec.h index 6f205eb07..d97a7ca8c 100644 --- a/include/codec.h +++ b/include/codec.h @@ -49,6 +49,8 @@ struct codec_packet { struct timerthread_queue_entry ttq_entry; str s; struct rtp_header *rtp; + unsigned long ts; + struct ssrc_ctx *ssrc_out; void (*free_func)(void *); }; diff --git a/include/rtcp.h b/include/rtcp.h index dfd06ce76..177789bf2 100644 --- a/include/rtcp.h +++ b/include/rtcp.h @@ -11,6 +11,7 @@ struct crypto_context; struct rtcp_packet; struct ssrc_ctx; struct rtcp_handler; +struct call_monologue; struct rtcp_parse_ctx { diff --git a/include/ssrc.h b/include/ssrc.h index a87b8889d..66ecdba9c 100644 --- a/include/ssrc.h +++ b/include/ssrc.h @@ -46,6 +46,7 @@ struct payload_tracker { struct ssrc_ctx { struct ssrc_entry_call *parent; struct payload_tracker tracker; + void *ref; // points to the call_monologue but is opaque // XXX lock this? u_int64_t srtp_index, @@ -186,7 +187,7 @@ struct ssrc_hash *create_ssrc_hash_call(void); void *get_ssrc(u_int32_t, struct ssrc_hash * /* , int *created */); // creates new entry if not found -struct ssrc_ctx *get_ssrc_ctx(u_int32_t, struct ssrc_hash *, enum ssrc_dir); // creates new entry if not found +struct ssrc_ctx *get_ssrc_ctx(u_int32_t, struct ssrc_hash *, enum ssrc_dir, void *ref); // creates new entry if not found void ssrc_sender_report(struct call_media *, const struct ssrc_sender_report *, const struct timeval *); diff --git a/t/transcode-test.c b/t/transcode-test.c index d5c2e66d5..ec5679d11 100644 --- a/t/transcode-test.c +++ b/t/transcode-test.c @@ -179,12 +179,12 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media struct media_packet mp = { .call = &call, .media = media, - .ssrc_in = get_ssrc_ctx(ssrc, call.ssrc_hash, SSRC_DIR_INPUT), + .ssrc_in = get_ssrc_ctx(ssrc, call.ssrc_hash, SSRC_DIR_INPUT, NULL), }; // from __stream_ssrc() if (!MEDIA_ISSET(media, TRANSCODE)) mp.ssrc_in->ssrc_map_out = ntohl(ssrc); - mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, call.ssrc_hash, SSRC_DIR_OUTPUT); + mp.ssrc_out = get_ssrc_ctx(mp.ssrc_in->ssrc_map_out, call.ssrc_hash, SSRC_DIR_OUTPUT, NULL); payload_tracker_add(&mp.ssrc_in->tracker, pt_in & 0x7f); int packet_len = sizeof(struct rtp_header) + pl.len;