TT#74301 refactor send_timer into generic timer

Change-Id: I81dae7ae8bb1bfe0324f9a8ce256cf9d1c377840
changes/80/36980/5
Richard Fuchs 6 years ago
parent 4267e1ca09
commit 18634c4202

@ -969,35 +969,35 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
p->s.len = payload_len + sizeof(struct rtp_header); p->s.len = payload_len + sizeof(struct rtp_header);
payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type); payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type);
p->free_func = free; p->free_func = free;
p->source = handler; p->ttq_entry.source = handler;
p->rtp = rh; p->rtp = rh;
// this packet is dynamically allocated, so we're able to schedule it. // this packet is dynamically allocated, so we're able to schedule it.
// determine scheduled time to send // determine scheduled time to send
if (ch->first_send.tv_sec && ch->encoder_format.clockrate) { if (ch->first_send.tv_sec && ch->encoder_format.clockrate) {
// scale first_send from first_send_ts to ts // scale first_send from first_send_ts to ts
p->to_send = ch->first_send; p->ttq_entry.when = ch->first_send;
uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around
unsigned long long ts_diff_us = unsigned long long ts_diff_us =
(unsigned long long) ts_diff * 1000000 / ch->encoder_format.clockrate (unsigned long long) ts_diff * 1000000 / ch->encoder_format.clockrate
* ch->handler->dest_pt.codec_def->clockrate_mult; * ch->handler->dest_pt.codec_def->clockrate_mult;
timeval_add_usec(&p->to_send, ts_diff_us); timeval_add_usec(&p->ttq_entry.when, ts_diff_us);
// how far in the future is this? // how far in the future is this?
ts_diff_us = timeval_diff(&p->to_send, &rtpe_now); // negative wrap-around to positive OK ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); // negative wrap-around to positive OK
if (ts_diff_us > 1000000) // more than one second, can't be right if (ts_diff_us > 1000000) // more than one second, can't be right
ch->first_send.tv_sec = 0; // fix it up below ch->first_send.tv_sec = 0; // fix it up below
} }
if (!ch->first_send.tv_sec) { if (!ch->first_send.tv_sec) {
p->to_send = ch->first_send = rtpe_now; p->ttq_entry.when = ch->first_send = rtpe_now;
ch->first_send_ts = ts; ch->first_send_ts = ts;
} }
ilog(LOG_DEBUG, "Scheduling to send RTP packet (seq %u TS %lu) at %lu.%06lu", ilog(LOG_DEBUG, "Scheduling to send RTP packet (seq %u TS %lu) at %lu.%06lu",
ntohs(rh->seq_num), ntohs(rh->seq_num),
ts, ts,
(long unsigned) p->to_send.tv_sec, (long unsigned) p->ttq_entry.when.tv_sec,
(long unsigned) p->to_send.tv_usec); (long unsigned) p->ttq_entry.when.tv_usec);
g_queue_push_tail(&mp->packets_out, p); g_queue_push_tail(&mp->packets_out, p);

@ -30,27 +30,15 @@ static struct timerthread send_timer_thread;
static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp);
static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp);
#ifdef WITH_TRANSCODING #ifdef WITH_TRANSCODING
// called with call->master lock in W // called with call->master lock in W
static unsigned int send_timer_flush(struct send_timer *st, void *ptr) { static unsigned int send_timer_flush(struct send_timer *st, void *ptr) {
if (!st) return timerthread_queue_flush(&st->ttq, ptr);
return 0;
unsigned int num = 0;
GList *l = st->packets.head;
while (l) {
GList *next = l->next;
struct codec_packet *p = l->data;
if (p->source != ptr)
goto next;
g_queue_delete_link(&st->packets, l);
codec_packet_free(p);
num++;
next:
l = next;
}
return num;
} }
@ -145,32 +133,34 @@ static void __send_timer_free(void *p) {
ilog(LOG_DEBUG, "freeing send_timer"); ilog(LOG_DEBUG, "freeing send_timer");
g_queue_clear_full(&st->packets, codec_packet_free);
mutex_destroy(&st->lock);
obj_put(st->call); obj_put(st->call);
} }
static void __send_timer_send_now(struct timerthread_queue *ttq, void *p) {
send_timer_send_nolock((void *) ttq, p);
};
static void __send_timer_send_later(struct timerthread_queue *ttq, void *p) {
send_timer_send_lock((void *) ttq, p);
};
// call->master_lock held in W // call->master_lock held in W
struct send_timer *send_timer_new(struct packet_stream *ps) { struct send_timer *send_timer_new(struct packet_stream *ps) {
ilog(LOG_DEBUG, "creating send_timer"); ilog(LOG_DEBUG, "creating send_timer");
struct send_timer *st = obj_alloc0("send_timer", sizeof(*st), __send_timer_free); struct send_timer *st = timerthread_queue_new("send_timer", sizeof(*st),
st->tt_obj.tt = &send_timer_thread; &send_timer_thread,
mutex_init(&st->lock); __send_timer_send_now,
__send_timer_send_later,
__send_timer_free, codec_packet_free);
st->call = obj_get(ps->call); st->call = obj_get(ps->call);
st->sink = ps; st->sink = ps;
g_queue_init(&st->packets);
return st; return st;
} }
// st->stream->out_lock (or call->master_lock/W) must be held already static void __send_timer_send_common(struct send_timer *st, struct codec_packet *cp) {
static int send_timer_send(struct send_timer *st, struct codec_packet *cp) {
if (cp->to_send.tv_sec && timeval_cmp(&cp->to_send, &rtpe_now) > 0)
return -1; // not yet
if (!st->sink->selected_sfd) if (!st->sink->selected_sfd)
goto out; goto out;
@ -186,36 +176,36 @@ static int send_timer_send(struct send_timer *st, struct codec_packet *cp) {
out: out:
codec_packet_free(cp); codec_packet_free(cp);
return 0;
} }
static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp) {
struct call *call = st->call;
if (!call)
return;
log_info_call(call);
rwlock_lock_r(&call->master_lock);
__send_timer_send_common(st, cp);
rwlock_unlock_r(&call->master_lock);
}
// st->stream->out_lock (or call->master_lock/W) must be held already // st->stream->out_lock (or call->master_lock/W) must be held already
void send_timer_push(struct send_timer *st, struct codec_packet *cp) { static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp) {
// can we send immediately? struct call *call = st->call;
if (!send_timer_send(st, cp)) if (!call)
return; return;
// queue for sending log_info_call(call);
struct rtp_header *rh = (void *) cp->s.s; __send_timer_send_common(st, cp);
ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)", }
(unsigned long) cp->to_send.tv_sec,
(unsigned int) cp->to_send.tv_usec,
ntohs(rh->seq_num),
ntohl(rh->timestamp));
mutex_lock(&st->lock);
unsigned int qlen = st->packets.length;
// this hands over ownership of cp, so we must copy the timeval out
struct timeval tv_send = cp->to_send;
g_queue_push_tail(&st->packets, cp);
mutex_unlock(&st->lock);
// first packet in? we're probably not scheduled yet // st->stream->out_lock (or call->master_lock/W) must be held already
if (!qlen) void send_timer_push(struct send_timer *st, struct codec_packet *cp) {
timerthread_obj_schedule_abs(&st->tt_obj, &tv_send); timerthread_queue_push(&st->ttq, &cp->ttq_entry);
} }
@ -352,7 +342,7 @@ static void media_player_read_packet(struct media_player *mp) {
struct codec_packet *p = packet.packets_out.head->data; struct codec_packet *p = packet.packets_out.head->data;
if (p->rtp) { if (p->rtp) {
mp->sync_ts = ntohl(p->rtp->timestamp); mp->sync_ts = ntohl(p->rtp->timestamp);
mp->sync_ts_tv = p->to_send; mp->sync_ts_tv = p->ttq_entry.when;
} }
} }
@ -646,46 +636,11 @@ static void media_player_run(void *ptr) {
#endif #endif
static void send_timer_run(void *ptr) {
struct send_timer *st = ptr;
struct call *call = st->call;
log_info_call(call);
ilog(LOG_DEBUG, "running scheduled send_timer");
struct timeval next_send = {0,};
rwlock_lock_r(&call->master_lock);
mutex_lock(&st->lock);
while (st->packets.length) {
struct codec_packet *cp = st->packets.head->data;
// XXX this could be made lock-free
if (!send_timer_send(st, cp)) {
g_queue_pop_head(&st->packets);
continue;
}
// couldn't send the last one. remember time to schedule
next_send = cp->to_send;
break;
}
mutex_unlock(&st->lock);
rwlock_unlock_r(&call->master_lock);
if (next_send.tv_sec)
timerthread_obj_schedule_abs(&st->tt_obj, &next_send);
log_info_clear();
}
void media_player_init(void) { void media_player_init(void) {
#ifdef WITH_TRANSCODING #ifdef WITH_TRANSCODING
timerthread_init(&media_player_thread, media_player_run); timerthread_init(&media_player_thread, media_player_run);
#endif #endif
timerthread_init(&send_timer_thread, send_timer_run); timerthread_init(&send_timer_thread, timerthread_queue_run);
} }

@ -88,3 +88,129 @@ void timerthread_obj_deschedule(struct timerthread_obj *tt_obj) {
nope: nope:
mutex_unlock(&tt->lock); mutex_unlock(&tt->lock);
} }
static int timerthread_queue_run_one(struct timerthread_queue *ttq,
struct timerthread_queue_entry *ttqe,
void (*run_func)(struct timerthread_queue *, void *)) {
if (ttqe->when.tv_sec && timeval_cmp(&ttqe->when, &rtpe_now) > 0)
return -1; // not yet
run_func(ttq, ttqe);
return 0;
}
void timerthread_queue_run(void *ptr) {
struct timerthread_queue *ttq = ptr;
ilog(LOG_DEBUG, "running timerthread_queue");
struct timeval next_send = {0,};
mutex_lock(&ttq->lock);
while (ttq->entries.length) {
struct timerthread_queue_entry *ttqe = g_queue_pop_head(&ttq->entries);
mutex_unlock(&ttq->lock);
int ret = timerthread_queue_run_one(ttq, ttqe, ttq->run_later_func);
mutex_lock(&ttq->lock);
if (!ret)
continue;
// couldn't send the last one. remember time to schedule
g_queue_push_head(&ttq->entries, ttqe);
// XXX sort queue?
next_send = ttqe->when;
break;
}
mutex_unlock(&ttq->lock);
if (next_send.tv_sec)
timerthread_obj_schedule_abs(&ttq->tt_obj, &next_send); // XXX does this work if already scheduled earlier?
}
static void __timerthread_queue_free(void *p) {
struct timerthread_queue *ttq = p;
g_queue_clear_full(&ttq->entries, ttq->entry_free_func);
mutex_destroy(&ttq->lock);
if (ttq->free_func)
ttq->free_func(p);
}
void *timerthread_queue_new(const char *type, size_t size,
struct timerthread *tt,
void (*run_now_func)(struct timerthread_queue *, void *),
void (*run_later_func)(struct timerthread_queue *, void *),
void (*free_func)(void *),
void (*entry_free_func)(void *))
{
struct timerthread_queue *ttq = obj_alloc0(type, size, __timerthread_queue_free);
ttq->type = type;
ttq->tt_obj.tt = tt;
assert(tt->func == timerthread_queue_run);
ttq->run_now_func = run_now_func;
ttq->run_later_func = run_later_func;
if (!ttq->run_later_func)
ttq->run_later_func = run_now_func;
ttq->free_func = free_func;
ttq->entry_free_func = entry_free_func;
mutex_init(&ttq->lock);
g_queue_init(&ttq->entries);
return ttq;
}
void timerthread_queue_push(struct timerthread_queue *ttq, struct timerthread_queue_entry *ttqe) {
// can we send immediately?
if (!timerthread_queue_run_one(ttq, ttqe, ttq->run_now_func))
return;
// queue for sending
ilog(LOG_DEBUG, "queuing up %s object for processing at %lu.%06u",
ttq->type,
(unsigned long) ttqe->when.tv_sec,
(unsigned int) ttqe->when.tv_usec);
// XXX recover log line fields
// struct rtp_header *rh = (void *) cp->s.s;
// ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)",
// (unsigned long) cp->to_send.tv_sec,
// (unsigned int) cp->to_send.tv_usec,
// ntohs(rh->seq_num),
// ntohl(rh->timestamp));
mutex_lock(&ttq->lock);
unsigned int qlen = ttq->entries.length;
// this hands over ownership of cp, so we must copy the timeval out
struct timeval tv_send = ttqe->when;
g_queue_push_tail(&ttq->entries, ttqe);
mutex_unlock(&ttq->lock);
// first packet in? we're probably not scheduled yet
if (!qlen)
timerthread_obj_schedule_abs(&ttq->tt_obj, &tv_send);
}
unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) {
if (!ttq)
return 0;
unsigned int num = 0;
GList *l = ttq->entries.head;
while (l) {
GList *next = l->next;
struct timerthread_queue_entry *ttqe = l->data;
if (ttqe->source != ptr)
goto next;
g_queue_delete_link(&ttq->entries, l);
ttq->entry_free_func(ttqe);
num++;
next:
l = next;
}
return num;
}

@ -8,6 +8,7 @@
#include "codeclib.h" #include "codeclib.h"
#include "aux.h" #include "aux.h"
#include "rtplib.h" #include "rtplib.h"
#include "timerthread.h"
struct call_media; struct call_media;
@ -40,11 +41,10 @@ struct codec_handler {
}; };
struct codec_packet { struct codec_packet {
struct timerthread_queue_entry ttq_entry;
str s; str s;
struct rtp_header *rtp; struct rtp_header *rtp;
void *source; // opaque
void (*free_func)(void *); void (*free_func)(void *);
struct timeval to_send;
}; };

@ -62,11 +62,9 @@ INLINE void media_player_put(struct media_player **mp) {
#endif #endif
struct send_timer { struct send_timer {
struct timerthread_obj tt_obj; struct timerthread_queue ttq;
mutex_t lock;
struct call *call; // main reference that keeps this alive struct call *call; // main reference that keeps this alive
struct packet_stream *sink; struct packet_stream *sink;
GQueue packets;
}; };
@ -89,7 +87,7 @@ void send_timer_loop(void *p);
INLINE void send_timer_put(struct send_timer **st) { INLINE void send_timer_put(struct send_timer **st) {
if (!*st) if (!*st)
return; return;
obj_put(&(*st)->tt_obj); obj_put(&(*st)->ttq.tt_obj);
*st = NULL; *st = NULL;
} }

@ -22,6 +22,23 @@ struct timerthread_obj {
struct timeval last_run; /* ditto */ struct timeval last_run; /* ditto */
}; };
struct timerthread_queue {
struct timerthread_obj tt_obj;
const char *type;
mutex_t lock;
GQueue entries;
void (*run_now_func)(struct timerthread_queue *, void *);
void (*run_later_func)(struct timerthread_queue *, void *);
void (*free_func)(void *);
void (*entry_free_func)(void *);
};
struct timerthread_queue_entry {
struct timeval when;
void *source; // opaque
char __rest[0];
};
void timerthread_init(struct timerthread *, void (*)(void *)); void timerthread_init(struct timerthread *, void (*)(void *));
void timerthread_run(void *); void timerthread_run(void *);
@ -29,6 +46,17 @@ void timerthread_run(void *);
void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct timeval *); void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct timeval *);
void timerthread_obj_deschedule(struct timerthread_obj *); void timerthread_obj_deschedule(struct timerthread_obj *);
// run_now_func = called if newly inserted object can be processed immediately by timerthread_queue_push within its calling context
// run_later_func = called from the separate timer thread
void *timerthread_queue_new(const char *type, size_t size,
struct timerthread *tt,
void (*run_now_func)(struct timerthread_queue *, void *),
void (*run_later_func)(struct timerthread_queue *, void *), // optional
void (*free_func)(void *),
void (*entry_free_func)(void *));
void timerthread_queue_run(void *ptr);
void timerthread_queue_push(struct timerthread_queue *, struct timerthread_queue_entry *);
unsigned int timerthread_queue_flush(struct timerthread_queue *, void *);
INLINE void timerthread_obj_schedule_abs(struct timerthread_obj *tt_obj, const struct timeval *tv) { INLINE void timerthread_obj_schedule_abs(struct timerthread_obj *tt_obj, const struct timeval *tv) {
if (!tt_obj) if (!tt_obj)

Loading…
Cancel
Save