From 9c2bccde8442c323954168a4cf4b3af91bfb3c90 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 18 Apr 2024 15:14:29 -0400 Subject: [PATCH] MT#55283 io_uring poller implementation Change-Id: I4ead8cfd709e5b55c9174c0a0542eb9019b49dd0 --- daemon/main.c | 65 ++++- daemon/udp_listener.c | 1 + include/main.h | 7 - lib/codeclib.c | 3 +- lib/codeclib.h | 1 + lib/poller.c | 7 + lib/poller.h | 10 +- lib/streambuf.c | 8 +- lib/uring.c | 485 ++++++++++++++++++++++++++++++++++++++ lib/uring.h | 14 ++ recording-daemon/poller.h | 3 +- t/test-mix-buffer.c | 1 + t/test-payload-tracker.c | 1 + t/test-stats.c | 6 +- t/test-transcode.c | 6 +- 15 files changed, 585 insertions(+), 33 deletions(-) diff --git a/daemon/main.c b/daemon/main.c index b56b1a87d..3564d741a 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -58,6 +58,8 @@ #include "janus.h" #include "nftables.h" #include "bufferpool.h" +#include "log_funcs.h" +#include "uring.h" @@ -68,12 +70,6 @@ static unsigned int num_poller_threads; unsigned int num_media_pollers; unsigned int rtpe_poller_rr_iter; -bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item; -bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item; -bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback; -void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked; -void (*rtpe_poller_error)(struct poller *, void *) = poller_error; - struct rtpengine_config initial_rtpe_config; static GQueue rtpe_tcp = G_QUEUE_INIT; @@ -1225,9 +1221,21 @@ static void early_init(void) { #ifdef WITH_TRANSCODING static void clib_init(void) { media_bufferpool = bufferpool_new(g_malloc, g_free, 64 * 65536); +#ifdef HAVE_LIBURING + if (rtpe_config.common.io_uring) + uring_thread_init(); +#endif } static void clib_cleanup(void) { bufferpool_destroy(media_bufferpool); +#ifdef HAVE_LIBURING + if (rtpe_config.common.io_uring) + uring_thread_cleanup(); +#endif +} +static void clib_loop(void) { + uring_thread_loop(); + append_thread_lpr_to_glob_lpr(); } #endif @@ -1259,6 +1267,7 @@ static void init_everything(void) { #ifdef WITH_TRANSCODING codeclib_thread_init = clib_init; codeclib_thread_cleanup = clib_cleanup; + codeclib_thread_loop = clib_loop; #endif codeclib_init(0); media_player_init(); @@ -1296,6 +1305,17 @@ static void create_everything(void) { kernel_setup(); // either one global poller, or one per thread for media sockets plus one for control sockets +#ifdef HAVE_LIBURING + if (rtpe_config.common.io_uring) { + rtpe_config.poller_per_thread = true; + rtpe_poller_add_item = uring_poller_add_item; + rtpe_poller_del_item = uring_poller_del_item; + rtpe_poller_del_item_callback = uring_poller_del_item_callback; + rtpe_poller_blocked = uring_poller_blocked; + rtpe_poller_error = uring_poller_error; + } +#endif + if (!rtpe_config.poller_per_thread) { num_media_pollers = num_rtpe_pollers = 1; num_poller_threads = rtpe_config.num_threads; @@ -1307,7 +1327,7 @@ static void create_everything(void) { } rtpe_pollers = g_malloc(sizeof(*rtpe_pollers) * num_rtpe_pollers); for (unsigned int i = 0; i < num_rtpe_pollers; i++) { - rtpe_pollers[i] = poller_new(); + rtpe_pollers[i] = rtpe_config.common.io_uring ? uring_poller_new() : poller_new(); if (!rtpe_pollers[i]) die("poller creation failed"); } @@ -1412,6 +1432,29 @@ static void do_redis_restore(void) { } +static void uring_thread_waker(struct thread_waker *wk) { + struct poller *p = wk->arg; + uring_poller_wake(p); +} +static void uring_poller_loop(void *ptr) { + struct poller *p = ptr; + + uring_poller_add_waker(p); + + struct thread_waker wk = {.func = uring_thread_waker, .arg = p}; + thread_waker_add_generic(&wk); + + while (!rtpe_shutdown) { + gettimeofday(&rtpe_now, NULL); + uring_poller_poll(p); + append_thread_lpr_to_glob_lpr(); + log_info_reset(); + } + thread_waker_del(&wk); + uring_poller_clear(p); +} + + int main(int argc, char **argv) { early_init(); options(&argc, &argv); @@ -1473,7 +1516,8 @@ int main(int argc, char **argv) { service_notify("READY=1\n"); for (unsigned int idx = 0; idx < num_poller_threads; ++idx) - thread_create_detach_prio(poller_loop, rtpe_pollers[idx % num_rtpe_pollers], + thread_create_detach_prio(rtpe_config.common.io_uring ? uring_poller_loop : poller_loop, + rtpe_pollers[idx % num_rtpe_pollers], rtpe_config.scheduling, rtpe_config.priority, idx < rtpe_config.num_threads ? "poller" : "cpoller"); @@ -1542,7 +1586,10 @@ int main(int argc, char **argv) { release_listeners(&rtpe_control_ng); release_listeners(&rtpe_control_ng_tcp); for (unsigned int idx = 0; idx < num_rtpe_pollers; ++idx) - poller_free(&rtpe_pollers[idx]); + if (rtpe_config.common.io_uring) + uring_poller_free(&rtpe_pollers[idx]); + else + poller_free(&rtpe_pollers[idx]); g_free(rtpe_pollers); interfaces_free(); #ifndef WITHOUT_NFTABLES diff --git a/daemon/udp_listener.c b/daemon/udp_listener.c index 8cad47f69..d190c0ef8 100644 --- a/daemon/udp_listener.c +++ b/daemon/udp_listener.c @@ -14,6 +14,7 @@ #include "obj.h" #include "socket.h" #include "log_funcs.h" +#include "uring.h" struct udp_listener_callback { struct obj obj; diff --git a/include/main.h b/include/main.h index dfcbedbf6..8f31f60da 100644 --- a/include/main.h +++ b/include/main.h @@ -213,13 +213,6 @@ extern struct poller *rtpe_control_poller; // poller for control sockets (maybe extern unsigned int num_media_pollers; // for media sockets, >= 1 extern unsigned int rtpe_poller_rr_iter; // round-robin assignment of pollers to each thread -struct poller_item; -extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *); -extern bool (*rtpe_poller_del_item)(struct poller *, int); -extern bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *); -extern void (*rtpe_poller_blocked)(struct poller *, void *); -extern void (*rtpe_poller_error)(struct poller *, void *); - INLINE struct poller *rtpe_get_poller(void) { // XXX optimise this for num_media_pollers == 1 ? diff --git a/lib/codeclib.c b/lib/codeclib.c index 1fcda717d..67e6f0a76 100644 --- a/lib/codeclib.c +++ b/lib/codeclib.c @@ -850,6 +850,7 @@ static codec_def_t *codec_def_cn; void (*codeclib_thread_init)(void); void (*codeclib_thread_cleanup)(void); +void (*codeclib_thread_loop)(void); static GHashTable *codecs_ht; @@ -1531,7 +1532,7 @@ static void cc_init(void) { cc_dlsym_resolve(rtpe_common_config_ptr->codec_chain_lib_path); - cc_set_thread_funcs(codeclib_thread_init, codeclib_thread_cleanup, NULL); + cc_set_thread_funcs(codeclib_thread_init, codeclib_thread_cleanup, codeclib_thread_loop); cc_client = cc_client_connect(4); if (!cc_client) diff --git a/lib/codeclib.h b/lib/codeclib.h index 1e21f6356..dfb37df69 100644 --- a/lib/codeclib.h +++ b/lib/codeclib.h @@ -368,6 +368,7 @@ extern const GQueue * const codec_supplemental_codecs; // must be set before calling codeclib_init extern void (*codeclib_thread_init)(void); extern void (*codeclib_thread_cleanup)(void); +extern void (*codeclib_thread_loop)(void); void codeclib_init(int); void codeclib_free(void); diff --git a/lib/poller.c b/lib/poller.c index c3ff630cc..ee58e4c50 100644 --- a/lib/poller.c +++ b/lib/poller.c @@ -35,6 +35,13 @@ struct poller { GPtrArray *items; }; +bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item; +bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item; +bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback; +void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked; +void (*rtpe_poller_error)(struct poller *, void *) = poller_error; + + static void poller_free_item(struct poller_item_int *ele) { if (ele) obj_put(ele); diff --git a/lib/poller.h b/lib/poller.h index a2d3c89fa..ef2c0c7e0 100644 --- a/lib/poller.h +++ b/lib/poller.h @@ -11,7 +11,7 @@ struct obj; - +struct sockaddr; typedef void (*poller_func_t)(int, void *); @@ -21,6 +21,8 @@ struct poller_item { struct obj *obj; poller_func_t readable; + void (*recv)(struct obj *, char *b, size_t len, struct sockaddr *, + struct timeval *); poller_func_t writeable; poller_func_t closed; }; @@ -38,6 +40,12 @@ void poller_error(struct poller *, void *); void poller_loop(void *); +extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *); +extern bool (*rtpe_poller_del_item)(struct poller *, int); +extern bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *); +extern void (*rtpe_poller_blocked)(struct poller *, void *); +extern void (*rtpe_poller_error)(struct poller *, void *); + #ifdef HAVE_LIBURING extern __thread unsigned int (*uring_thread_loop)(void); diff --git a/lib/streambuf.c b/lib/streambuf.c index 77bf0fbf7..61a092583 100644 --- a/lib/streambuf.c +++ b/lib/streambuf.c @@ -83,7 +83,7 @@ int streambuf_writeable(struct streambuf *b) { } if (ret != out) { - poller_blocked(b->poller, b->fd_ptr); + rtpe_poller_blocked(b->poller, b->fd_ptr); break; } } @@ -217,10 +217,10 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { if (errno == EINTR) continue; if (errno != EAGAIN && errno != EWOULDBLOCK) { - poller_error(b->poller, b->fd_ptr); + rtpe_poller_error(b->poller, b->fd_ptr); break; } - poller_blocked(b->poller, b->fd_ptr); + rtpe_poller_blocked(b->poller, b->fd_ptr); break; } if (ret == 0) @@ -232,7 +232,7 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) { } if (b->buf->len > 5242880) - poller_error(b->poller, b->fd_ptr); + rtpe_poller_error(b->poller, b->fd_ptr); else if (len) g_string_append_len(b->buf, s, len); diff --git a/lib/uring.c b/lib/uring.c index b9b0d350a..977115b6b 100644 --- a/lib/uring.c +++ b/lib/uring.c @@ -1,12 +1,52 @@ #include "uring.h" #include #include +#include #include "log.h" #include "loglib.h" #include "socket.h" #include "poller.h" +#include "bufferpool.h" +#include "call.h" +#define BUFFER_SIZE RTP_BUFFER_SIZE // size of individual buffer +#define BUFFERS_COUNT 1024 // number of buffers allocated in one pool, should be 2^n +#define BUFFER_POOLS 8 // number of pools to keep alive + +static_assert(BUFFERS_COUNT * BUFFER_POOLS < (1<<16), "too many buffers (>= 2^16)"); + +struct uring_buffer { + void *buf; + struct poller *poller; + unsigned int num; +}; + +struct poller { + mutex_t lock; + GQueue reqs; + int waker_fds[2]; + GPtrArray *evs; // holds uring_poll_event by fd + struct bufferpool *bufferpool; + struct uring_buffer *buffers[BUFFER_POOLS]; +}; + +struct poller_req { + enum { ADD, BLOCKED, ERROR, DEL, BUFFERS, RECV } type; + union { + struct poller_item it; + struct { + int fd; + void (*callback)(void *); + void *arg; + }; + struct { + void *buf; + unsigned int num; + }; + }; +}; + static ssize_t __socket_sendmsg(socket_t *s, struct msghdr *m, const endpoint_t *e, struct sockaddr_storage *ss, struct uring_req *r) { @@ -71,4 +111,449 @@ void uring_thread_cleanup(void) { io_uring_queue_exit(&rtpe_uring); } + +static void uring_submit_buffer(struct poller *p, void *b, unsigned int num) { + struct poller_req *req = g_new0(__typeof(*req), 1); + req->type = BUFFERS; + req->buf = b; + + LOCK(&p->lock); + + g_queue_push_tail(&p->reqs, req); + uring_poller_wake(p); + req->num = num; +} +static unsigned int uring_buffer_recycle(void *p) { + //ilog(LOG_INFO, "uring buffer recycle"); + struct uring_buffer *b = p; + struct poller *poller = b->poller; + uring_submit_buffer(poller, b->buf, b->num); + return BUFFERS_COUNT; +} +struct poller *uring_poller_new(void) { + struct poller *ret = g_new0(__typeof(*ret), 1); + + mutex_init(&ret->lock); + g_queue_init(&ret->reqs); + int ok = socketpair(AF_UNIX, SOCK_STREAM, 0, ret->waker_fds); + if (ok != 0) + return false; + nonblock(ret->waker_fds[0]); + nonblock(ret->waker_fds[1]); + ret->evs = g_ptr_array_new(); + + ret->bufferpool = bufferpool_new(g_malloc, g_free, BUFFER_SIZE * BUFFERS_COUNT); + for (int i = 0; i < BUFFER_POOLS; i++) { + ret->buffers[i] = g_new0(__typeof(*ret->buffers[i]), 1); + ret->buffers[i]->buf = bufferpool_reserve(ret->bufferpool, BUFFERS_COUNT, + uring_buffer_recycle, ret->buffers[i]); + ret->buffers[i]->buf += RTP_BUFFER_HEAD_ROOM; + ret->buffers[i]->num = i; + ret->buffers[i]->poller = ret; + uring_submit_buffer(ret, ret->buffers[i]->buf, i); + } + + return ret; +} + +void uring_poller_free(struct poller **pp) { + // XXX cleanup of reqs + close((*pp)->waker_fds[0]); + close((*pp)->waker_fds[1]); + g_ptr_array_free((*pp)->evs, true); + for (int i = 0; i < BUFFER_POOLS; i++) { + bufferpool_release((*pp)->buffers[i]->buf); + g_free((*pp)->buffers[i]); + } + bufferpool_destroy((*pp)->bufferpool); + g_free(*pp); + *pp = NULL; +} + +void uring_poller_wake(struct poller *p) { + ssize_t ret = write(p->waker_fds[0], "", 1); + (void)ret; // ignore return value +} + +bool uring_poller_add_item(struct poller *p, struct poller_item *i) { + if (!p) + return false; + if (!i) + return false; + if (i->fd < 0) + return false; + if (!i->readable) + return false; + if (!i->closed) + return false; + + struct poller_req *req = g_new0(__typeof(*req), 1); + if (i->recv) + req->type = RECV; + else + req->type = ADD; + req->it = *i; + + if (req->it.obj) + obj_hold_o(req->it.obj); + + LOCK(&p->lock); + + g_queue_push_tail(&p->reqs, req); + uring_poller_wake(p); + + return true; +} +void uring_poller_blocked(struct poller *p, void *fdp) { + struct poller_req *req = g_new0(__typeof(*req), 1); + req->type = ERROR; + req->fd = GPOINTER_TO_INT(fdp); + + LOCK(&p->lock); + + g_queue_push_tail(&p->reqs, req); + uring_poller_wake(p); +} +void uring_poller_error(struct poller *p, void *fdp) { + struct poller_req *req = g_new0(__typeof(*req), 1); + req->type = BLOCKED; + req->fd = GPOINTER_TO_INT(fdp); + + LOCK(&p->lock); + + g_queue_push_tail(&p->reqs, req); + uring_poller_wake(p); +} +bool uring_poller_del_item_callback(struct poller *p, int fd, void (*callback)(void *), void *arg) { + if (rtpe_shutdown) + return true; + + struct poller_req *req = g_new0(__typeof(*req), 1); + req->type = DEL; + req->fd = fd; + req->callback = callback; + req->arg = arg; + + LOCK(&p->lock); + + g_queue_push_tail(&p->reqs, req); + uring_poller_wake(p); + + return true; +} +bool uring_poller_del_item(struct poller *p, int fd) { + return uring_poller_del_item_callback(p, fd, NULL, NULL); +} + +struct uring_poll_event { + struct uring_req req; // must be first + struct poller_item it; + struct poller *poller; + bool closed:1; +}; +static void uring_poll_event(struct uring_req *req, int32_t res, uint32_t flags) { + struct uring_poll_event *ereq = (__typeof(ereq)) req; + bool closed = false; + + //ilog(LOG_INFO, "uring poll event %i %i %i", ereq->it.fd, res, flags); + + if (res < 0) { + if (res != -ECANCELED) + ilog(LOG_WARNING | LOG_FLAG_LIMIT, "io_uring poll error on fd %i: %s", + ereq->it.fd, strerror(-res)); + closed = true; + } + else if ((res & (POLLERR | POLLHUP))) { + //ilog(LOG_INFO, "uring poll fd error %i %i", ereq->it.fd, res); + closed = true; + } + else if ((res & POLLIN)) + ereq->it.readable(ereq->it.fd, ereq->it.obj); + else { + ilog(LOG_WARNING | LOG_FLAG_LIMIT, "unhandled io_uring poll event mask on fd %i: %i", + ereq->it.fd, res); + closed = true; + } + + if (closed) { + if (!ereq->closed) + ereq->it.closed(ereq->it.fd, ereq->it.obj); + ereq->closed = true; + } + + if (!(flags & IORING_CQE_F_MORE)) { + if (ereq->it.obj) + obj_put_o(ereq->it.obj); + struct poller *p = ereq->poller; + { + LOCK(&p->lock); + if (p->evs->len > ereq->it.fd && p->evs->pdata[ereq->it.fd] == ereq) + p->evs->pdata[ereq->it.fd] = NULL; + } + uring_req_free(&ereq->req); + } +} + +struct uring_poll_removed { + struct uring_req req; // must be first + int fd; + void (*callback)(void *); + void *arg; + struct poller *poller; +}; +static void uring_poll_removed(struct uring_req *req, int32_t res, uint32_t flags) { + struct uring_poll_removed *rreq = (__typeof(rreq)) req; + //ilog(LOG_INFO, "poll removed fd %i with cb %p/%p", rreq->fd, rreq->callback, rreq->arg); + if (rreq->callback) + rreq->callback(rreq->arg); + else + close(rreq->fd); + uring_req_free(req); +} + +struct uring_poll_unblocked { + struct uring_req req; // must be first + struct poller_item it; +}; +static void uring_poll_unblocked(struct uring_req *req, int32_t res, uint32_t flags) { + struct uring_poll_unblocked *ureq = (__typeof(ureq)) req; + bool closed = false; + if (res < 0) { + ilog(LOG_WARNING | LOG_FLAG_LIMIT, "io_uring poll write error on fd %i: %s", + ureq->it.fd, strerror(-res)); + closed = true; + } + else if (!(res & (POLLOUT))) { + ilog(LOG_WARNING | LOG_FLAG_LIMIT, "unhandled io_uring poll event write mask on fd %i: %i", + ureq->it.fd, res); + closed = true; + } + else + ureq->it.writeable(ureq->it.fd, ureq->it.obj); + + assert((flags & IORING_CQE_F_MORE) == 0); + + if (closed) + ureq->it.closed(ureq->it.fd, ureq->it.obj); + + if (ureq->it.obj) + obj_put_o(ureq->it.obj); + uring_req_free(req); +} + +struct uring_poll_recv { + struct uring_req req; // must be first + struct poller_item it; + struct msghdr msg; + struct iovec iov; + struct poller *poller; + bool closed:1; +}; +INLINE void uring_recvmsg_parse_cmsg(struct timeval *tv, + sockaddr_t *to, bool (*parse)(struct cmsghdr *, sockaddr_t *), + struct io_uring_recvmsg_out *out, struct msghdr *mh) +{ + socket_recvfrom_parse_cmsg(&tv, &to, parse, mh, + io_uring_recvmsg_cmsg_firsthdr(out, mh), + io_uring_recvmsg_cmsg_nexthdr(out, mh, cm)); +} +static void uring_poll_recv(struct uring_req *req, int32_t res, uint32_t flags) { + struct uring_poll_recv *rreq = (__typeof(rreq)) req; + struct poller *p = rreq->poller; + bool closed = false; + + //ilog(LOG_INFO, "uring recvmsg event %i %i %i", rreq->it.fd, res, flags); + + if (res < 0) { + if (res != -ECANCELED) + ilog(LOG_WARNING | LOG_FLAG_LIMIT, "io_uring recvmsg error on fd %i: %s", + rreq->it.fd, strerror(-res)); + closed = true; + } + else { + assert((flags & IORING_CQE_F_BUFFER) != 0); + unsigned int buffer_id = flags >> IORING_CQE_BUFFER_SHIFT; + unsigned int pool_id = buffer_id / BUFFERS_COUNT; + unsigned int pool_offset = buffer_id % BUFFERS_COUNT; + //ilog(LOG_INFO, "pool id %u buf id %u", pool_id, pool_offset); + assert(pool_id < BUFFER_POOLS); + struct uring_buffer *ubuf = p->buffers[pool_id]; + void *buf = ubuf->buf + BUFFER_SIZE * pool_offset; + + struct io_uring_recvmsg_out *out = io_uring_recvmsg_validate(buf, BUFFER_SIZE, &rreq->msg); + assert(out != NULL); + void *payload = io_uring_recvmsg_payload(out, &rreq->msg); + struct sockaddr *sa = io_uring_recvmsg_name(out); + + struct timeval tv = {0}; + uring_recvmsg_parse_cmsg(&tv, NULL, NULL, out, &rreq->msg); + + rreq->it.recv(rreq->it.obj, payload, out->payloadlen, sa, &tv); + } + + if (!(flags & IORING_CQE_F_MORE)) + closed = true; + + if (closed) { + if (!rreq->closed) + rreq->it.closed(rreq->it.fd, rreq->it.obj); + rreq->closed = true; + } + + if (!(flags & IORING_CQE_F_MORE)) { + //ilog(LOG_INFO, "last uring recv event for fd %i for %p (%i)", rreq->it.fd, rreq->it.obj, rreq->it.obj->ref); + if (rreq->it.obj) + obj_put_o(rreq->it.obj); + uring_req_free(&rreq->req); + } +} + +static void uring_poller_do_add(struct poller *p, struct poller_req *preq) { + // don't allow duplicates + if (p->evs->len > preq->it.fd && p->evs->pdata[preq->it.fd]) + abort(); // XXX handle gracefully? + struct uring_poll_event *ereq + = uring_alloc_req(sizeof(*ereq), uring_poll_event); + ereq->it = preq->it; + ereq->poller = p; + struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); + io_uring_prep_poll_multishot(sqe, ereq->it.fd, POLLHUP | POLLERR | POLLIN); + io_uring_sqe_set_data(sqe, ereq); + // save ereq for write blocks. no extra obj reference + if (p->evs->len <= ereq->it.fd) + g_ptr_array_set_size(p->evs, ereq->it.fd + 1); + p->evs->pdata[ereq->it.fd] = ereq; +} +static void uring_poller_do_blocked(struct poller *p, struct poller_req *preq) { + // valid fd? + if (p->evs->len <= preq->fd || !p->evs->pdata[preq->fd]) + abort(); // XXX handle gracefully? + struct uring_poll_event *ereq = p->evs->pdata[preq->fd]; + struct uring_poll_unblocked *ureq + = uring_alloc_req(sizeof(*ureq), uring_poll_unblocked); + ureq->it = ereq->it; + if (ureq->it.obj) + obj_hold_o(ureq->it.obj); + struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); + io_uring_prep_poll_add(sqe, ureq->it.fd, POLLOUT); + io_uring_sqe_set_data(sqe, ureq); +} +static void uring_poller_do_error(struct poller *p, struct poller_req *preq) { + // do nothing? +} +static void uring_poller_do_del(struct poller *p, struct poller_req *preq) { + //ilog(LOG_INFO, "del fd %i on %p", preq->fd, p); + struct uring_poll_removed *rreq + = uring_alloc_req(sizeof(*rreq), uring_poll_removed); + rreq->fd = preq->fd; + rreq->poller = p; + rreq->callback = preq->callback; + rreq->arg = preq->arg; + struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); + io_uring_prep_cancel_fd(sqe, rreq->fd, IORING_ASYNC_CANCEL_ALL); + io_uring_sqe_set_data(sqe, rreq); +} +static void uring_poller_do_buffers(struct poller *p, struct poller_req *preq) { + //ilog(LOG_INFO, "XXXXXXXXX adding buffers %p %u", p, preq->num); + struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); + io_uring_prep_provide_buffers(sqe, preq->buf, BUFFER_SIZE, BUFFERS_COUNT, 0, + preq->num * BUFFERS_COUNT); + struct uring_req *breq = uring_alloc_buffer_req(sizeof(*breq)); + io_uring_sqe_set_data(sqe, breq); // XXX no content? not needed? +} +static void uring_poller_do_recv(struct poller *p, struct poller_req *preq) { + //ilog(LOG_INFO, "adding recv fd %i on %p for %p", preq->it.fd, p, preq->it.obj); + struct uring_poll_recv *rreq + = uring_alloc_req(sizeof(*rreq), uring_poll_recv); + rreq->it = preq->it; + rreq->poller = p; + struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); + rreq->iov = (__typeof(rreq->iov)) { + .iov_len = MAX_RTP_PACKET_SIZE, + }; + rreq->msg = (__typeof(rreq->msg)) { + .msg_iov = &rreq->iov, + .msg_iovlen = 1, + .msg_namelen = sizeof(struct sockaddr_storage), + .msg_controllen = 64, + }; + io_uring_prep_recvmsg_multishot(sqe, rreq->it.fd, &rreq->msg, 0); + sqe->ioprio |= IORING_RECVSEND_POLL_FIRST; + io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT); + sqe->buf_group = 0; + io_uring_sqe_set_data(sqe, rreq); +} + +static void uring_poller_do_reqs(struct poller *p) { + LOCK(&p->lock); + + while (p->reqs.length) { + struct poller_req *preq = g_queue_pop_head(&p->reqs); + + switch (preq->type) { + case ADD: + uring_poller_do_add(p, preq); + break; + case BLOCKED: + uring_poller_do_blocked(p, preq); + break; + case ERROR: + uring_poller_do_error(p, preq); + break; + case DEL: + uring_poller_do_del(p, preq); + break; + case BUFFERS: + uring_poller_do_buffers(p, preq); + break; + case RECV: + uring_poller_do_recv(p, preq); + break; + default: + abort(); + } + + g_free(preq); + } +} + +void uring_poller_waker_read(int fd, void *p) { + char buf[32]; + while (read(fd, buf, sizeof(buf)) > 0) { } +} +void uring_poller_waker_closed(int fd, void *p) { + if (!rtpe_shutdown) + abort(); +} + +void uring_poller_add_waker(struct poller *p) { + uring_poller_add_item(p, + &(struct poller_item) { + .readable = uring_poller_waker_read, + .closed = uring_poller_waker_closed, + .fd = p->waker_fds[1], + }); +} + +void uring_poller_poll(struct poller *p) { + uring_poller_do_reqs(p); + + unsigned int events = __uring_thread_loop(); + + if (events == 0) { + struct io_uring_cqe *cqe; // ignored + thread_cancel_enable(); + io_uring_wait_cqe(&rtpe_uring, &cqe); // maybe not a cancellation point + thread_cancel_disable(); + } +} + +void uring_poller_clear(struct poller *p) { + struct uring_req *req = uring_alloc_buffer_req(sizeof(*req)); + struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring); + io_uring_prep_cancel(sqe, 0, IORING_ASYNC_CANCEL_ANY); + io_uring_sqe_set_data(sqe, req); + while (__uring_thread_loop() != 0) { } +} + #endif diff --git a/lib/uring.h b/lib/uring.h index 08a192eae..43e8cfa01 100644 --- a/lib/uring.h +++ b/lib/uring.h @@ -38,6 +38,20 @@ INLINE void *uring_alloc_buffer_req(size_t len) { void uring_thread_init(void); void uring_thread_cleanup(void); +struct poller_item; +struct poller *uring_poller_new(void); +void uring_poller_free(struct poller **pp); +void uring_poller_add_waker(struct poller *p); +void uring_poller_wake(struct poller *p); +void uring_poller_poll(struct poller *); +void uring_poller_clear(struct poller *); + +bool uring_poller_add_item(struct poller *p, struct poller_item *i); +bool uring_poller_del_item(struct poller *p, int fd); +void uring_poller_blocked(struct poller *p, void *fdp); +void uring_poller_error(struct poller *p, void *fdp); +bool uring_poller_del_item_callback(struct poller *p, int fd, void (*callback)(void *), void *arg); + #endif #endif diff --git a/recording-daemon/poller.h b/recording-daemon/poller.h index 887a52c16..c4d5f2c29 100644 --- a/recording-daemon/poller.h +++ b/recording-daemon/poller.h @@ -15,8 +15,9 @@ struct poller { }; void poller_blocked(struct poller *, void *); -int poller_isblocked(struct poller *, void *); void poller_error(struct poller *, void *); +#define rtpe_poller_blocked poller_blocked +#define rtpe_poller_error poller_error #endif diff --git a/t/test-mix-buffer.c b/t/test-mix-buffer.c index c85763c9e..ca174dd72 100644 --- a/t/test-mix-buffer.c +++ b/t/test-mix-buffer.c @@ -16,6 +16,7 @@ struct global_sampled_min_max rtpe_sampled_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; __thread struct bufferpool *media_bufferpool; +void append_thread_lpr_to_glob_lpr(void) {} int get_local_log_level(unsigned int u) { return -1; diff --git a/t/test-payload-tracker.c b/t/test-payload-tracker.c index 853e1d16e..51dcb2bd2 100644 --- a/t/test-payload-tracker.c +++ b/t/test-payload-tracker.c @@ -16,6 +16,7 @@ struct global_sampled_min_max rtpe_sampled_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; __thread struct bufferpool *media_bufferpool; +void append_thread_lpr_to_glob_lpr(void) {} static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) { char buf[1024] = ""; diff --git a/t/test-stats.c b/t/test-stats.c index de566d442..d0f8c0f4b 100644 --- a/t/test-stats.c +++ b/t/test-stats.c @@ -17,13 +17,9 @@ struct rtpengine_config rtpe_config = { struct rtpengine_config initial_rtpe_config; struct poller **rtpe_pollers = (struct poller *[]) {NULL}; struct poller *rtpe_control_poller; +struct poller *uring_poller; unsigned int num_media_pollers = 1; unsigned int rtpe_poller_rr_iter; -bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item; -bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item; -bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback; -void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked; -void (*rtpe_poller_error)(struct poller *, void *) = poller_error; GString *dtmf_logs; GQueue rtpe_control_ng = G_QUEUE_INIT; diff --git a/t/test-transcode.c b/t/test-transcode.c index 5483367b3..9f4773c0c 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -14,13 +14,9 @@ struct rtpengine_config rtpe_config; struct rtpengine_config initial_rtpe_config; struct poller **rtpe_pollers; struct poller *rtpe_control_poller; +struct poller *uring_poller; unsigned int num_media_pollers; unsigned int rtpe_poller_rr_iter; -bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item; -bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item; -bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback; -void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked; -void (*rtpe_poller_error)(struct poller *, void *) = poller_error; GString *dtmf_logs; GQueue rtpe_control_ng = G_QUEUE_INIT;