MT#55283 refactor timer thread creation

Use a unified function to launch timer threads.

Functional no-op.

Change-Id: I6debd71220cfe464c48c00e5d194b5cf9b65e16c
pull/1802/head
Richard Fuchs 1 year ago
parent 5f2d5ed72c
commit da7b8f30ff

@ -5662,11 +5662,11 @@ static void codec_timers_run(void *p) {
} }
void codecs_init(void) { void codecs_init(void) {
timerthread_init(&codec_timers_thread, codec_timers_run); timerthread_init(&codec_timers_thread, rtpe_config.media_num_threads, codec_timers_run);
} }
void codecs_cleanup(void) { void codecs_cleanup(void) {
timerthread_free(&codec_timers_thread); timerthread_free(&codec_timers_thread);
} }
void codec_timers_loop(void *p) { void codec_timers_launch(void) {
timerthread_run(&codec_timers_thread); timerthread_launch(&codec_timers_thread, rtpe_config.scheduling, rtpe_config.priority, "codec timer");
} }

@ -749,7 +749,7 @@ static void __agent_deschedule(struct ice_agent *ag) {
void ice_init(void) { void ice_init(void) {
random_string((void *) &tie_breaker, sizeof(tie_breaker)); random_string((void *) &tie_breaker, sizeof(tie_breaker));
timerthread_init(&ice_agents_timer_thread, ice_agents_timer_run); timerthread_init(&ice_agents_timer_thread, 1, ice_agents_timer_run);
sdp_fragments = fragments_ht_new(); sdp_fragments = fragments_ht_new();
mutex_init(&sdp_fragments_lock); mutex_init(&sdp_fragments_lock);
@ -1527,8 +1527,8 @@ err:
void ice_thread_run(void *p) { void ice_thread_launch(void) {
timerthread_run(&ice_agents_timer_thread); timerthread_launch(&ice_agents_timer_thread, NULL, 0, "ICE");
} }
static void ice_agents_timer_run(void *ptr) { static void ice_agents_timer_run(void *ptr) {
struct ice_agent *ag = ptr; struct ice_agent *ag = ptr;

@ -23,7 +23,8 @@ static struct timerthread jitter_buffer_thread;
void jitter_buffer_init(void) { void jitter_buffer_init(void) {
//ilog(LOG_DEBUG, "jitter_buffer_init"); //ilog(LOG_DEBUG, "jitter_buffer_init");
timerthread_init(&jitter_buffer_thread, timerthread_queue_run); unsigned int num_threads = rtpe_config.jb_length > 0 ? rtpe_config.media_num_threads : 0;
timerthread_init(&jitter_buffer_thread, num_threads, timerthread_queue_run);
} }
void jitter_buffer_init_free(void) { void jitter_buffer_init_free(void) {
@ -412,9 +413,8 @@ void __jb_packet_free(void *p) {
jb_packet_free(&jbp); jb_packet_free(&jbp);
} }
void jitter_buffer_loop(void *p) { void jitter_buffer_launch(void) {
ilog(LOG_DEBUG, "jitter_buffer_loop"); timerthread_launch(&jitter_buffer_thread, rtpe_config.scheduling, rtpe_config.priority, "jitter buffer");
timerthread_run(&jitter_buffer_thread);
} }
struct jitter_buffer *jitter_buffer_new(call_t *c) { struct jitter_buffer *jitter_buffer_new(call_t *c) {

@ -1420,7 +1420,7 @@ int main(int argc, char **argv) {
thread_create_detach(mqtt_loop, NULL, "mqtt"); thread_create_detach(mqtt_loop, NULL, "mqtt");
#endif #endif
thread_create_detach(ice_thread_run, NULL, "ICE"); ice_thread_launch();
websocket_start(); websocket_start();
@ -1436,20 +1436,10 @@ int main(int argc, char **argv) {
if (rtpe_config.poller_per_thread) if (rtpe_config.poller_per_thread)
thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller"); thread_create_detach_prio(poller_loop2, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority, "poller");
for (idx = 0; idx < rtpe_config.media_num_threads; ++idx) { media_player_launch();
#ifdef WITH_TRANSCODING send_timer_launch();
thread_create_detach_prio(media_player_loop, NULL, rtpe_config.scheduling, jitter_buffer_launch();
rtpe_config.priority, "media player"); codec_timers_launch();
#endif
thread_create_detach_prio(send_timer_loop, NULL, rtpe_config.scheduling,
rtpe_config.priority, "send timer");
if (rtpe_config.jb_length > 0)
thread_create_detach_prio(jitter_buffer_loop, NULL, rtpe_config.scheduling,
rtpe_config.priority, "jitter buffer");
thread_create_detach_prio(codec_timers_loop, NULL, rtpe_config.scheduling,
rtpe_config.priority, "codec timer");
}
// reap threads as they shut down during run time // reap threads as they shut down during run time
threads_join_all(false); threads_join_all(false);

@ -1301,9 +1301,9 @@ void media_player_init(void) {
mutex_init(&media_player_cache_lock); mutex_init(&media_player_cache_lock);
} }
timerthread_init(&media_player_thread, media_player_run); timerthread_init(&media_player_thread, rtpe_config.media_num_threads, media_player_run);
#endif #endif
timerthread_init(&send_timer_thread, timerthread_queue_run); timerthread_init(&send_timer_thread, rtpe_config.media_num_threads, timerthread_queue_run);
} }
void media_player_free(void) { void media_player_free(void) {
@ -1319,12 +1319,12 @@ void media_player_free(void) {
} }
void media_player_launch(void) {
#ifdef WITH_TRANSCODING #ifdef WITH_TRANSCODING
void media_player_loop(void *p) { timerthread_launch(&media_player_thread, rtpe_config.scheduling, rtpe_config.priority, "media player");
timerthread_run(&media_player_thread);
}
#endif #endif
void send_timer_loop(void *p) { }
void send_timer_launch(void) {
//ilog(LOG_DEBUG, "send_timer_loop"); //ilog(LOG_DEBUG, "send_timer_loop");
timerthread_run(&send_timer_thread); timerthread_launch(&send_timer_thread, rtpe_config.scheduling, rtpe_config.priority, "media player");
} }

@ -9,11 +9,12 @@ static int tt_obj_cmp(const void *a, const void *b) {
return timeval_cmp_ptr(&A->next_check, &B->next_check); return timeval_cmp_ptr(&A->next_check, &B->next_check);
} }
void timerthread_init(struct timerthread *tt, void (*func)(void *)) { void timerthread_init(struct timerthread *tt, unsigned int num, void (*func)(void *)) {
tt->tree = g_tree_new(tt_obj_cmp); tt->tree = g_tree_new(tt_obj_cmp);
mutex_init(&tt->lock); mutex_init(&tt->lock);
cond_init(&tt->cond); cond_init(&tt->cond);
tt->func = func; tt->func = func;
tt->num_threads = num;
} }
static int __tt_put_all(void *k, void *d, void *p) { static int __tt_put_all(void *k, void *d, void *p) {
@ -29,7 +30,7 @@ void timerthread_free(struct timerthread *tt) {
mutex_destroy(&tt->lock); mutex_destroy(&tt->lock);
} }
void timerthread_run(void *p) { static void timerthread_run(void *p) {
struct timerthread *tt = p; struct timerthread *tt = p;
struct thread_waker waker = { .lock = &tt->lock, .cond = &tt->cond }; struct thread_waker waker = { .lock = &tt->lock, .cond = &tt->cond };
@ -80,6 +81,11 @@ sleep:;
thread_waker_del(&waker); thread_waker_del(&waker);
} }
void timerthread_launch(struct timerthread *tt, const char *scheduler, int prio, const char *name) {
for (unsigned int i = 0; i < tt->num_threads; i++)
thread_create_detach_prio(timerthread_run, tt, scheduler, prio, name);
}
void timerthread_obj_schedule_abs_nl(struct timerthread_obj *tt_obj, const struct timeval *tv) { void timerthread_obj_schedule_abs_nl(struct timerthread_obj *tt_obj, const struct timeval *tv) {
if (!tt_obj) if (!tt_obj)
return; return;

@ -96,7 +96,7 @@ typedef union {
void codecs_init(void); void codecs_init(void);
void codecs_cleanup(void); void codecs_cleanup(void);
void codec_timers_loop(void *); void codec_timers_launch(void);
void rtcp_timer_stop(struct rtcp_timer **); void rtcp_timer_stop(struct rtcp_timer **);
void codec_timer_callback(call_t *, void (*)(call_t *, codec_timer_callback_arg_t), void codec_timer_callback(call_t *, void (*)(call_t *, codec_timer_callback_arg_t),
codec_timer_callback_arg_t, uint64_t delay); codec_timer_callback_arg_t, uint64_t delay);

@ -159,7 +159,7 @@ void ice_restart(struct ice_agent *);
void ice_candidates_free(candidate_q *); void ice_candidates_free(candidate_q *);
void ice_remote_candidates(candidate_q *, struct ice_agent *); void ice_remote_candidates(candidate_q *, struct ice_agent *);
void ice_thread_run(void *); void ice_thread_launch(void);
int ice_request(stream_fd *, const endpoint_t *, struct stun_attrs *); int ice_request(stream_fd *, const endpoint_t *, struct stun_attrs *);
int ice_response(stream_fd *, const endpoint_t *src, int ice_response(stream_fd *, const endpoint_t *src,

@ -48,7 +48,7 @@ void jitter_buffer_free(struct jitter_buffer **);
int buffer_packet(struct media_packet *mp, const str *s); int buffer_packet(struct media_packet *mp, const str *s);
void jb_packet_free(struct jb_packet **jbp); void jb_packet_free(struct jb_packet **jbp);
void jitter_buffer_loop(void *p); void jitter_buffer_launch(void);
INLINE void jb_put(struct jitter_buffer **jb) { INLINE void jb_put(struct jitter_buffer **jb) {
if (!*jb) if (!*jb)

@ -121,12 +121,11 @@ void media_player_add_packet(struct media_player *mp, char *buf, size_t len,
void media_player_init(void); void media_player_init(void);
void media_player_free(void); void media_player_free(void);
void media_player_loop(void *); void media_player_launch(void);
struct send_timer *send_timer_new(struct packet_stream *); struct send_timer *send_timer_new(struct packet_stream *);
void send_timer_push(struct send_timer *, struct codec_packet *); void send_timer_push(struct send_timer *, struct codec_packet *);
void send_timer_launch(void);
void send_timer_loop(void *p);

@ -8,6 +8,7 @@
#include "obj.h" #include "obj.h"
struct timerthread { struct timerthread {
unsigned int num_threads;
GTree *tree; GTree *tree;
mutex_t lock; mutex_t lock;
cond_t cond; cond_t cond;
@ -41,9 +42,9 @@ struct timerthread_queue_entry {
}; };
void timerthread_init(struct timerthread *, void (*)(void *)); void timerthread_init(struct timerthread *, unsigned int, void (*)(void *));
void timerthread_free(struct timerthread *); void timerthread_free(struct timerthread *);
void timerthread_run(void *); void timerthread_launch(struct timerthread *, const char *scheduler, int prio, const char *name);
void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct timeval *); void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct timeval *);
void timerthread_obj_deschedule(struct timerthread_obj *); void timerthread_obj_deschedule(struct timerthread_obj *);

Loading…
Cancel
Save