diff --git a/daemon/codec.c b/daemon/codec.c index 2e1b00803..aeff30a29 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -969,35 +969,35 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, p->s.len = payload_len + sizeof(struct rtp_header); payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type); p->free_func = free; - p->source = handler; + p->ttq_entry.source = handler; p->rtp = rh; // this packet is dynamically allocated, so we're able to schedule it. // determine scheduled time to send if (ch->first_send.tv_sec && ch->encoder_format.clockrate) { // scale first_send from first_send_ts to ts - p->to_send = ch->first_send; + p->ttq_entry.when = ch->first_send; uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around unsigned long long ts_diff_us = (unsigned long long) ts_diff * 1000000 / ch->encoder_format.clockrate * ch->handler->dest_pt.codec_def->clockrate_mult; - timeval_add_usec(&p->to_send, ts_diff_us); + timeval_add_usec(&p->ttq_entry.when, ts_diff_us); // how far in the future is this? - ts_diff_us = timeval_diff(&p->to_send, &rtpe_now); // negative wrap-around to positive OK + ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); // negative wrap-around to positive OK if (ts_diff_us > 1000000) // more than one second, can't be right ch->first_send.tv_sec = 0; // fix it up below } if (!ch->first_send.tv_sec) { - p->to_send = ch->first_send = rtpe_now; + p->ttq_entry.when = ch->first_send = rtpe_now; ch->first_send_ts = ts; } ilog(LOG_DEBUG, "Scheduling to send RTP packet (seq %u TS %lu) at %lu.%06lu", ntohs(rh->seq_num), ts, - (long unsigned) p->to_send.tv_sec, - (long unsigned) p->to_send.tv_usec); + (long unsigned) p->ttq_entry.when.tv_sec, + (long unsigned) p->ttq_entry.when.tv_usec); g_queue_push_tail(&mp->packets_out, p); diff --git a/daemon/media_player.c b/daemon/media_player.c index 22d0cb000..e21810e7f 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -30,27 +30,15 @@ static struct timerthread send_timer_thread; +static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp); +static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp); + + + #ifdef WITH_TRANSCODING // called with call->master lock in W static unsigned int send_timer_flush(struct send_timer *st, void *ptr) { - if (!st) - return 0; - - unsigned int num = 0; - GList *l = st->packets.head; - while (l) { - GList *next = l->next; - struct codec_packet *p = l->data; - if (p->source != ptr) - goto next; - g_queue_delete_link(&st->packets, l); - codec_packet_free(p); - num++; - -next: - l = next; - } - return num; + return timerthread_queue_flush(&st->ttq, ptr); } @@ -145,32 +133,34 @@ static void __send_timer_free(void *p) { ilog(LOG_DEBUG, "freeing send_timer"); - g_queue_clear_full(&st->packets, codec_packet_free); - mutex_destroy(&st->lock); obj_put(st->call); } +static void __send_timer_send_now(struct timerthread_queue *ttq, void *p) { + send_timer_send_nolock((void *) ttq, p); +}; +static void __send_timer_send_later(struct timerthread_queue *ttq, void *p) { + send_timer_send_lock((void *) ttq, p); +}; + // call->master_lock held in W struct send_timer *send_timer_new(struct packet_stream *ps) { ilog(LOG_DEBUG, "creating send_timer"); - struct send_timer *st = obj_alloc0("send_timer", sizeof(*st), __send_timer_free); - st->tt_obj.tt = &send_timer_thread; - mutex_init(&st->lock); + struct send_timer *st = timerthread_queue_new("send_timer", sizeof(*st), + &send_timer_thread, + __send_timer_send_now, + __send_timer_send_later, + __send_timer_free, codec_packet_free); st->call = obj_get(ps->call); st->sink = ps; - g_queue_init(&st->packets); return st; } -// st->stream->out_lock (or call->master_lock/W) must be held already -static int send_timer_send(struct send_timer *st, struct codec_packet *cp) { - if (cp->to_send.tv_sec && timeval_cmp(&cp->to_send, &rtpe_now) > 0) - return -1; // not yet - +static void __send_timer_send_common(struct send_timer *st, struct codec_packet *cp) { if (!st->sink->selected_sfd) goto out; @@ -186,36 +176,36 @@ static int send_timer_send(struct send_timer *st, struct codec_packet *cp) { out: codec_packet_free(cp); - - return 0; } +static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp) { + struct call *call = st->call; + if (!call) + return; + + log_info_call(call); + rwlock_lock_r(&call->master_lock); + + __send_timer_send_common(st, cp); + + rwlock_unlock_r(&call->master_lock); +} // st->stream->out_lock (or call->master_lock/W) must be held already -void send_timer_push(struct send_timer *st, struct codec_packet *cp) { - // can we send immediately? - if (!send_timer_send(st, cp)) +static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp) { + struct call *call = st->call; + if (!call) return; - // queue for sending + log_info_call(call); - struct rtp_header *rh = (void *) cp->s.s; - ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)", - (unsigned long) cp->to_send.tv_sec, - (unsigned int) cp->to_send.tv_usec, - ntohs(rh->seq_num), - ntohl(rh->timestamp)); + __send_timer_send_common(st, cp); +} - mutex_lock(&st->lock); - unsigned int qlen = st->packets.length; - // this hands over ownership of cp, so we must copy the timeval out - struct timeval tv_send = cp->to_send; - g_queue_push_tail(&st->packets, cp); - mutex_unlock(&st->lock); - // first packet in? we're probably not scheduled yet - if (!qlen) - timerthread_obj_schedule_abs(&st->tt_obj, &tv_send); +// st->stream->out_lock (or call->master_lock/W) must be held already +void send_timer_push(struct send_timer *st, struct codec_packet *cp) { + timerthread_queue_push(&st->ttq, &cp->ttq_entry); } @@ -352,7 +342,7 @@ static void media_player_read_packet(struct media_player *mp) { struct codec_packet *p = packet.packets_out.head->data; if (p->rtp) { mp->sync_ts = ntohl(p->rtp->timestamp); - mp->sync_ts_tv = p->to_send; + mp->sync_ts_tv = p->ttq_entry.when; } } @@ -646,46 +636,11 @@ static void media_player_run(void *ptr) { #endif -static void send_timer_run(void *ptr) { - struct send_timer *st = ptr; - struct call *call = st->call; - - log_info_call(call); - - ilog(LOG_DEBUG, "running scheduled send_timer"); - - struct timeval next_send = {0,}; - - rwlock_lock_r(&call->master_lock); - mutex_lock(&st->lock); - - while (st->packets.length) { - struct codec_packet *cp = st->packets.head->data; - // XXX this could be made lock-free - if (!send_timer_send(st, cp)) { - g_queue_pop_head(&st->packets); - continue; - } - // couldn't send the last one. remember time to schedule - next_send = cp->to_send; - break; - } - - mutex_unlock(&st->lock); - rwlock_unlock_r(&call->master_lock); - - if (next_send.tv_sec) - timerthread_obj_schedule_abs(&st->tt_obj, &next_send); - - log_info_clear(); -} - - void media_player_init(void) { #ifdef WITH_TRANSCODING timerthread_init(&media_player_thread, media_player_run); #endif - timerthread_init(&send_timer_thread, send_timer_run); + timerthread_init(&send_timer_thread, timerthread_queue_run); } diff --git a/daemon/timerthread.c b/daemon/timerthread.c index 0b02ceb90..0de77c7ef 100644 --- a/daemon/timerthread.c +++ b/daemon/timerthread.c @@ -88,3 +88,129 @@ void timerthread_obj_deschedule(struct timerthread_obj *tt_obj) { nope: mutex_unlock(&tt->lock); } + +static int timerthread_queue_run_one(struct timerthread_queue *ttq, + struct timerthread_queue_entry *ttqe, + void (*run_func)(struct timerthread_queue *, void *)) { + if (ttqe->when.tv_sec && timeval_cmp(&ttqe->when, &rtpe_now) > 0) + return -1; // not yet + run_func(ttq, ttqe); + return 0; +} + + +void timerthread_queue_run(void *ptr) { + struct timerthread_queue *ttq = ptr; + + ilog(LOG_DEBUG, "running timerthread_queue"); + + struct timeval next_send = {0,}; + + mutex_lock(&ttq->lock); + + while (ttq->entries.length) { + struct timerthread_queue_entry *ttqe = g_queue_pop_head(&ttq->entries); + + mutex_unlock(&ttq->lock); + + int ret = timerthread_queue_run_one(ttq, ttqe, ttq->run_later_func); + + mutex_lock(&ttq->lock); + + if (!ret) + continue; + // couldn't send the last one. remember time to schedule + g_queue_push_head(&ttq->entries, ttqe); + // XXX sort queue? + next_send = ttqe->when; + break; + } + + mutex_unlock(&ttq->lock); + + if (next_send.tv_sec) + timerthread_obj_schedule_abs(&ttq->tt_obj, &next_send); // XXX does this work if already scheduled earlier? +} + +static void __timerthread_queue_free(void *p) { + struct timerthread_queue *ttq = p; + g_queue_clear_full(&ttq->entries, ttq->entry_free_func); + mutex_destroy(&ttq->lock); + if (ttq->free_func) + ttq->free_func(p); +} + +void *timerthread_queue_new(const char *type, size_t size, + struct timerthread *tt, + void (*run_now_func)(struct timerthread_queue *, void *), + void (*run_later_func)(struct timerthread_queue *, void *), + void (*free_func)(void *), + void (*entry_free_func)(void *)) +{ + struct timerthread_queue *ttq = obj_alloc0(type, size, __timerthread_queue_free); + ttq->type = type; + ttq->tt_obj.tt = tt; + assert(tt->func == timerthread_queue_run); + ttq->run_now_func = run_now_func; + ttq->run_later_func = run_later_func; + if (!ttq->run_later_func) + ttq->run_later_func = run_now_func; + ttq->free_func = free_func; + ttq->entry_free_func = entry_free_func; + mutex_init(&ttq->lock); + g_queue_init(&ttq->entries); + return ttq; +} + +void timerthread_queue_push(struct timerthread_queue *ttq, struct timerthread_queue_entry *ttqe) { + // can we send immediately? + if (!timerthread_queue_run_one(ttq, ttqe, ttq->run_now_func)) + return; + + // queue for sending + + ilog(LOG_DEBUG, "queuing up %s object for processing at %lu.%06u", + ttq->type, + (unsigned long) ttqe->when.tv_sec, + (unsigned int) ttqe->when.tv_usec); + + // XXX recover log line fields +// struct rtp_header *rh = (void *) cp->s.s; +// ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)", +// (unsigned long) cp->to_send.tv_sec, +// (unsigned int) cp->to_send.tv_usec, +// ntohs(rh->seq_num), +// ntohl(rh->timestamp)); + + mutex_lock(&ttq->lock); + unsigned int qlen = ttq->entries.length; + // this hands over ownership of cp, so we must copy the timeval out + struct timeval tv_send = ttqe->when; + g_queue_push_tail(&ttq->entries, ttqe); + mutex_unlock(&ttq->lock); + + // first packet in? we're probably not scheduled yet + if (!qlen) + timerthread_obj_schedule_abs(&ttq->tt_obj, &tv_send); +} + +unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) { + if (!ttq) + return 0; + + unsigned int num = 0; + GList *l = ttq->entries.head; + while (l) { + GList *next = l->next; + struct timerthread_queue_entry *ttqe = l->data; + if (ttqe->source != ptr) + goto next; + g_queue_delete_link(&ttq->entries, l); + ttq->entry_free_func(ttqe); + num++; + +next: + l = next; + } + return num; +} diff --git a/include/codec.h b/include/codec.h index 17e0bc019..d11aa9599 100644 --- a/include/codec.h +++ b/include/codec.h @@ -8,6 +8,7 @@ #include "codeclib.h" #include "aux.h" #include "rtplib.h" +#include "timerthread.h" struct call_media; @@ -40,11 +41,10 @@ struct codec_handler { }; struct codec_packet { + struct timerthread_queue_entry ttq_entry; str s; struct rtp_header *rtp; - void *source; // opaque void (*free_func)(void *); - struct timeval to_send; }; diff --git a/include/media_player.h b/include/media_player.h index 6ec4e7673..813ab9245 100644 --- a/include/media_player.h +++ b/include/media_player.h @@ -62,11 +62,9 @@ INLINE void media_player_put(struct media_player **mp) { #endif struct send_timer { - struct timerthread_obj tt_obj; - mutex_t lock; + struct timerthread_queue ttq; struct call *call; // main reference that keeps this alive struct packet_stream *sink; - GQueue packets; }; @@ -89,7 +87,7 @@ void send_timer_loop(void *p); INLINE void send_timer_put(struct send_timer **st) { if (!*st) return; - obj_put(&(*st)->tt_obj); + obj_put(&(*st)->ttq.tt_obj); *st = NULL; } diff --git a/include/timerthread.h b/include/timerthread.h index 3f19d9bb6..8fd9a0162 100644 --- a/include/timerthread.h +++ b/include/timerthread.h @@ -22,6 +22,23 @@ struct timerthread_obj { struct timeval last_run; /* ditto */ }; +struct timerthread_queue { + struct timerthread_obj tt_obj; + const char *type; + mutex_t lock; + GQueue entries; + void (*run_now_func)(struct timerthread_queue *, void *); + void (*run_later_func)(struct timerthread_queue *, void *); + void (*free_func)(void *); + void (*entry_free_func)(void *); +}; + +struct timerthread_queue_entry { + struct timeval when; + void *source; // opaque + char __rest[0]; +}; + void timerthread_init(struct timerthread *, void (*)(void *)); void timerthread_run(void *); @@ -29,6 +46,17 @@ void timerthread_run(void *); void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct timeval *); void timerthread_obj_deschedule(struct timerthread_obj *); +// run_now_func = called if newly inserted object can be processed immediately by timerthread_queue_push within its calling context +// run_later_func = called from the separate timer thread +void *timerthread_queue_new(const char *type, size_t size, + struct timerthread *tt, + void (*run_now_func)(struct timerthread_queue *, void *), + void (*run_later_func)(struct timerthread_queue *, void *), // optional + void (*free_func)(void *), + void (*entry_free_func)(void *)); +void timerthread_queue_run(void *ptr); +void timerthread_queue_push(struct timerthread_queue *, struct timerthread_queue_entry *); +unsigned int timerthread_queue_flush(struct timerthread_queue *, void *); INLINE void timerthread_obj_schedule_abs(struct timerthread_obj *tt_obj, const struct timeval *tv) { if (!tt_obj)