diff --git a/daemon/call.c b/daemon/call.c index 723810320..dc90318c3 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -959,6 +959,7 @@ struct packet_stream *__packet_stream_new(struct call *call) { atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec); stream->rtp_stats = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __rtp_stats_free); recording_init_stream(stream); + stream->send_timer = send_timer_new(stream); return stream; } @@ -2220,6 +2221,7 @@ void call_destroy(struct call *c) { rtpe_now.tv_sec - atomic64_get(&ps->last_packet)); statistics_update_totals(ps); + send_timer_put(&ps->send_timer); } diff --git a/daemon/codec.c b/daemon/codec.c index 81907c994..20982eb47 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -62,6 +62,8 @@ struct codec_ssrc_handler { int bytes_per_packet; unsigned long first_ts; // for output TS scaling unsigned long ts_in; // for DTMF dupe detection + struct timeval first_send; + unsigned long first_send_ts; GString *sample_buffer; }; struct transcode_packet { @@ -563,7 +565,7 @@ void codec_handlers_free(struct call_media *m) { void codec_add_raw_packet(struct media_packet *mp) { - struct codec_packet *p = g_slice_alloc(sizeof(*p)); + struct codec_packet *p = g_slice_alloc0(sizeof(*p)); p->s = mp->raw; p->free_func = NULL; if (mp->rtp && mp->ssrc_out) @@ -682,14 +684,14 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, struct codec_handler *handler, // normally == ch->handler except for DTMF char *buf, // malloc'd, room for rtp_header + filled-in payload unsigned int payload_len, - unsigned int payload_ts, + unsigned long payload_ts, int marker, int seq, int seq_inc) { struct rtp_header *rh = (void *) buf; struct ssrc_ctx *ssrc_out = mp->ssrc_out; struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent; // reconstruct RTP header - unsigned int ts = payload_ts; + unsigned long ts = payload_ts; ZERO(*rh); rh->v_p_x_cc = 0x80; rh->m_pt = handler->dest_pt.payload_type | (marker ? 0x80 : 0); @@ -701,11 +703,39 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, rh->ssrc = htonl(ssrc_out_p->h.ssrc); // add to output queue - struct codec_packet *p = g_slice_alloc(sizeof(*p)); + struct codec_packet *p = g_slice_alloc0(sizeof(*p)); p->s.s = buf; p->s.len = payload_len + sizeof(struct rtp_header); payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type); p->free_func = free; + + // this packet is dynamically allocated, so we're able to schedule it. + // determine scheduled time to send + if (ch->first_send.tv_sec) { + // scale first_send from first_send_ts to ts + p->to_send = ch->first_send; + uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around + unsigned long long ts_diff_us = + (unsigned long long) ts_diff * 1000000 / ch->encoder_format.clockrate + * ch->handler->dest_pt.codec_def->clockrate_mult; + timeval_add_usec(&p->to_send, ts_diff_us); + + // how far in the future is this? + ts_diff_us = timeval_diff(&p->to_send, &rtpe_now); // negative wrap-around to positive OK + + if (ts_diff_us > 1000000) // more than one second, can't be right + ch->first_send.tv_sec = 0; // fix it up below + } + if (!ch->first_send.tv_sec) { + p->to_send = ch->first_send = rtpe_now; + ch->first_send_ts = ts; + } + ilog(LOG_DEBUG, "Scheduling to send RTP packet (seq %u TS %lu) at %lu.%06lu", + ntohs(rh->seq_num), + ts, + (long unsigned) p->to_send.tv_sec, + (long unsigned) p->to_send.tv_usec); + g_queue_push_tail(&mp->packets_out, p); atomic64_inc(&ssrc_out->packets); diff --git a/daemon/main.c b/daemon/main.c index 378c579fd..a944eac88 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -786,8 +786,10 @@ int main(int argc, char **argv) { if (rtpe_config.media_num_threads < 0) rtpe_config.media_num_threads = rtpe_config.num_threads; - for (idx = 0; idx < rtpe_config.media_num_threads; ++idx) + for (idx = 0; idx < rtpe_config.media_num_threads; ++idx) { thread_create_detach_prio(media_player_loop, NULL, rtpe_config.scheduling, rtpe_config.priority); + thread_create_detach_prio(send_timer_loop, NULL, rtpe_config.scheduling, rtpe_config.priority); + } while (!rtpe_shutdown) { diff --git a/daemon/media_player.c b/daemon/media_player.c index 1f8819ebd..5beba21fe 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -20,6 +20,7 @@ static struct timerthread media_player_thread; +static struct timerthread send_timer_thread; @@ -83,6 +84,85 @@ struct media_player *media_player_new(struct call_monologue *ml) { } +static void __send_timer_free(void *p) { + struct send_timer *st = p; + + ilog(LOG_DEBUG, "freeing send_timer"); + + g_queue_clear_full(&st->packets, codec_packet_free); + mutex_destroy(&st->lock); + obj_put(st->call); +} + + +// call->master_lock held in W +struct send_timer *send_timer_new(struct packet_stream *ps) { + ilog(LOG_DEBUG, "creating send_timer"); + + struct send_timer *st = obj_alloc0("send_timer", sizeof(*st), __send_timer_free); + st->tt_obj.tt = &send_timer_thread; + mutex_init(&st->lock); + st->call = obj_get(ps->call); + st->sink = ps; + g_queue_init(&st->packets); + + return st; +} + + +// st->stream->out_lock (or call->master_lock/W) must be held already +static int send_timer_send(struct send_timer *st, struct codec_packet *cp) { + if (cp->to_send.tv_sec && timeval_cmp(&cp->to_send, &rtpe_now) > 0) + return -1; // not yet + + if (!st->sink->selected_sfd) + goto out; + + struct rtp_header *rh = (void *) cp->s.s; + ilog(LOG_DEBUG, "Forward to sink endpoint: %s:%d (RTP seq %u TS %u)", + sockaddr_print_buf(&st->sink->endpoint.address), + st->sink->endpoint.port, + ntohs(rh->seq_num), + ntohl(rh->timestamp)); + + socket_sendto(&st->sink->selected_sfd->socket, + cp->s.s, cp->s.len, &st->sink->endpoint); + +out: + codec_packet_free(cp); + + return 0; +} + + +// st->stream->out_lock (or call->master_lock/W) must be held already +void send_timer_push(struct send_timer *st, struct codec_packet *cp) { + // can we send immediately? + if (!send_timer_send(st, cp)) + return; + + // queue for sending + + struct rtp_header *rh = (void *) cp->s.s; + ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)", + (unsigned long) cp->to_send.tv_sec, + (unsigned int) cp->to_send.tv_usec, + ntohs(rh->seq_num), + ntohl(rh->timestamp)); + + mutex_lock(&st->lock); + unsigned int qlen = st->packets.length; + // this hands over ownership of cp, so we must copy the timeval out + struct timeval tv_send = cp->to_send; + g_queue_push_tail(&st->packets, cp); + mutex_unlock(&st->lock); + + // first packet in? we're probably not scheduled yet + if (!qlen) + timerthread_obj_schedule_abs(&st->tt_obj, &tv_send); +} + + static int __ensure_codec_handler(struct media_player *mp, AVStream *avs) { if (mp->handler) return 0; @@ -189,6 +269,10 @@ static void media_player_read_packet(struct media_player *mp) { mp->handler->func(mp->handler, &packet); + // as this is timing sensitive and we may have spent some time decoding, + // update our global "now" timestamp + gettimeofday(&rtpe_now, NULL); + mutex_lock(&mp->sink->out_lock); if (media_socket_dequeue(&packet, mp->sink)) ilog(LOG_ERR, "Error sending playback media to RTP sink"); @@ -234,6 +318,8 @@ found: // call->master_lock held in W static void media_player_play_start(struct media_player *mp) { mp->next_run = rtpe_now; + // give ourselves a bit of a head start with decoding + timeval_add_usec(&mp->next_run, -50000); media_player_read_packet(mp); } @@ -365,8 +451,44 @@ static void media_player_run(void *ptr) { } +static void send_timer_run(void *ptr) { + struct send_timer *st = ptr; + struct call *call = st->call; + + log_info_call(call); + + ilog(LOG_DEBUG, "running scheduled send_timer"); + + struct timeval next_send = {0,}; + + rwlock_lock_r(&call->master_lock); + mutex_lock(&st->lock); + + while (st->packets.length) { + struct codec_packet *cp = st->packets.head->data; + // XXX this could be made lock-free + if (!send_timer_send(st, cp)) { + g_queue_pop_head(&st->packets); + continue; + } + // couldn't send the last one. remember time to schedule + next_send = cp->to_send; + break; + } + + mutex_unlock(&st->lock); + rwlock_unlock_r(&call->master_lock); + + if (next_send.tv_sec) + timerthread_obj_schedule_abs(&st->tt_obj, &next_send); + + log_info_clear(); +} + + void media_player_init(void) { timerthread_init(&media_player_thread, media_player_run); + timerthread_init(&send_timer_thread, send_timer_run); } @@ -374,3 +496,7 @@ void media_player_loop(void *p) { ilog(LOG_DEBUG, "media_player_loop"); timerthread_run(&media_player_thread); } +void send_timer_loop(void *p) { + ilog(LOG_DEBUG, "send_timer_loop"); + timerthread_run(&send_timer_thread); +} diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 312db95bc..8579df685 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -25,6 +25,7 @@ #include "iptables.h" #include "main.h" #include "codec.h" +#include "media_player.h" #ifndef PORT_RANDOM_MIN @@ -1575,18 +1576,8 @@ out: // appropriate locks must be held int media_socket_dequeue(struct media_packet *mp, struct packet_stream *sink) { struct codec_packet *p; - while ((p = g_queue_pop_head(&mp->packets_out))) { - __C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&sink->endpoint.address), - sink->endpoint.port); - - int ret = socket_sendto(&sink->selected_sfd->socket, - p->s.s, p->s.len, &sink->endpoint); - - codec_packet_free(p); - - if (ret == -1) - return -1; - } + while ((p = g_queue_pop_head(&mp->packets_out))) + send_timer_push(sink->send_timer, p); return 0; } diff --git a/include/call.h b/include/call.h index 2acc90944..f16440e69 100644 --- a/include/call.h +++ b/include/call.h @@ -199,6 +199,7 @@ struct ssrc_hash; struct codec_handler; struct rtp_payload_type; struct media_player; +struct send_timer; typedef bencode_buffer_t call_buffer_t; @@ -281,6 +282,7 @@ struct packet_stream { struct crypto_context crypto; /* OUT direction, LOCK: out_lock */ struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ // XXX eliminate these *ssrc_out; /* LOCK: out_lock */ + struct send_timer *send_timer; /* RO */ struct stats stats; struct stats kernel_stats; diff --git a/include/codec.h b/include/codec.h index 7d4b6887a..449d42cb0 100644 --- a/include/codec.h +++ b/include/codec.h @@ -3,6 +3,7 @@ #include +#include #include "str.h" #include "codeclib.h" #include "aux.h" @@ -36,6 +37,7 @@ struct codec_handler { struct codec_packet { str s; void (*free_func)(void *); + struct timeval to_send; }; diff --git a/include/media_player.h b/include/media_player.h index d43a7b41d..6fc77388c 100644 --- a/include/media_player.h +++ b/include/media_player.h @@ -15,6 +15,7 @@ struct call_monologue; struct codec_handler; struct ssrc_ctx; struct packet_stream; +struct codec_packet; struct media_player { @@ -38,6 +39,14 @@ struct media_player { str read_pos; }; +struct send_timer { + struct timerthread_obj tt_obj; + mutex_t lock; + struct call *call; // main reference that keeps this alive + struct packet_stream *sink; + GQueue packets; +}; + struct media_player *media_player_new(struct call_monologue *); int media_player_play_file(struct media_player *, const str *); @@ -47,6 +56,11 @@ void media_player_stop(struct media_player *); void media_player_init(void); void media_player_loop(void *); +struct send_timer *send_timer_new(struct packet_stream *); +void send_timer_push(struct send_timer *, struct codec_packet *); + +void send_timer_loop(void *p); + INLINE void media_player_put(struct media_player **mp) { @@ -55,6 +69,12 @@ INLINE void media_player_put(struct media_player **mp) { obj_put(&(*mp)->tt_obj); *mp = NULL; } +INLINE void send_timer_put(struct send_timer **st) { + if (!*st) + return; + obj_put(&(*st)->tt_obj); + *st = NULL; +} #endif