MT#55283 io_uring poller implementation

Change-Id: I4ead8cfd709e5b55c9174c0a0542eb9019b49dd0
pull/1826/head
Richard Fuchs 1 year ago
parent ebaca8a4e0
commit 9c2bccde84

@ -58,6 +58,8 @@
#include "janus.h" #include "janus.h"
#include "nftables.h" #include "nftables.h"
#include "bufferpool.h" #include "bufferpool.h"
#include "log_funcs.h"
#include "uring.h"
@ -68,12 +70,6 @@ static unsigned int num_poller_threads;
unsigned int num_media_pollers; unsigned int num_media_pollers;
unsigned int rtpe_poller_rr_iter; unsigned int rtpe_poller_rr_iter;
bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item;
bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item;
bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback;
void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked;
void (*rtpe_poller_error)(struct poller *, void *) = poller_error;
struct rtpengine_config initial_rtpe_config; struct rtpengine_config initial_rtpe_config;
static GQueue rtpe_tcp = G_QUEUE_INIT; static GQueue rtpe_tcp = G_QUEUE_INIT;
@ -1225,9 +1221,21 @@ static void early_init(void) {
#ifdef WITH_TRANSCODING #ifdef WITH_TRANSCODING
static void clib_init(void) { static void clib_init(void) {
media_bufferpool = bufferpool_new(g_malloc, g_free, 64 * 65536); media_bufferpool = bufferpool_new(g_malloc, g_free, 64 * 65536);
#ifdef HAVE_LIBURING
if (rtpe_config.common.io_uring)
uring_thread_init();
#endif
} }
static void clib_cleanup(void) { static void clib_cleanup(void) {
bufferpool_destroy(media_bufferpool); bufferpool_destroy(media_bufferpool);
#ifdef HAVE_LIBURING
if (rtpe_config.common.io_uring)
uring_thread_cleanup();
#endif
}
static void clib_loop(void) {
uring_thread_loop();
append_thread_lpr_to_glob_lpr();
} }
#endif #endif
@ -1259,6 +1267,7 @@ static void init_everything(void) {
#ifdef WITH_TRANSCODING #ifdef WITH_TRANSCODING
codeclib_thread_init = clib_init; codeclib_thread_init = clib_init;
codeclib_thread_cleanup = clib_cleanup; codeclib_thread_cleanup = clib_cleanup;
codeclib_thread_loop = clib_loop;
#endif #endif
codeclib_init(0); codeclib_init(0);
media_player_init(); media_player_init();
@ -1296,6 +1305,17 @@ static void create_everything(void) {
kernel_setup(); kernel_setup();
// either one global poller, or one per thread for media sockets plus one for control sockets // either one global poller, or one per thread for media sockets plus one for control sockets
#ifdef HAVE_LIBURING
if (rtpe_config.common.io_uring) {
rtpe_config.poller_per_thread = true;
rtpe_poller_add_item = uring_poller_add_item;
rtpe_poller_del_item = uring_poller_del_item;
rtpe_poller_del_item_callback = uring_poller_del_item_callback;
rtpe_poller_blocked = uring_poller_blocked;
rtpe_poller_error = uring_poller_error;
}
#endif
if (!rtpe_config.poller_per_thread) { if (!rtpe_config.poller_per_thread) {
num_media_pollers = num_rtpe_pollers = 1; num_media_pollers = num_rtpe_pollers = 1;
num_poller_threads = rtpe_config.num_threads; num_poller_threads = rtpe_config.num_threads;
@ -1307,7 +1327,7 @@ static void create_everything(void) {
} }
rtpe_pollers = g_malloc(sizeof(*rtpe_pollers) * num_rtpe_pollers); rtpe_pollers = g_malloc(sizeof(*rtpe_pollers) * num_rtpe_pollers);
for (unsigned int i = 0; i < num_rtpe_pollers; i++) { for (unsigned int i = 0; i < num_rtpe_pollers; i++) {
rtpe_pollers[i] = poller_new(); rtpe_pollers[i] = rtpe_config.common.io_uring ? uring_poller_new() : poller_new();
if (!rtpe_pollers[i]) if (!rtpe_pollers[i])
die("poller creation failed"); die("poller creation failed");
} }
@ -1412,6 +1432,29 @@ static void do_redis_restore(void) {
} }
// Callback for the generic thread-waker mechanism: wk->arg is the poller
// owned by this thread; poke its waker fd so a blocking wait returns.
static void uring_thread_waker(struct thread_waker *wk) {
	struct poller *p = wk->arg;
	uring_poller_wake(p);
}

// Per-thread main loop for io_uring-based pollers (the io_uring counterpart
// of poller_loop()): register a self-wake fd, then service the ring until
// global shutdown is requested.
static void uring_poller_loop(void *ptr) {
	struct poller *p = ptr;

	// watch the poller's own waker fd so other threads can interrupt waits
	uring_poller_add_waker(p);
	struct thread_waker wk = {.func = uring_thread_waker, .arg = p};
	thread_waker_add_generic(&wk);

	while (!rtpe_shutdown) {
		gettimeofday(&rtpe_now, NULL); // refresh cached timestamp once per iteration
		uring_poller_poll(p);
		append_thread_lpr_to_glob_lpr();
		log_info_reset(); // drop per-event logging context
	}

	thread_waker_del(&wk);
	uring_poller_clear(p); // cancel outstanding SQEs and drain completions
}
int main(int argc, char **argv) { int main(int argc, char **argv) {
early_init(); early_init();
options(&argc, &argv); options(&argc, &argv);
@ -1473,7 +1516,8 @@ int main(int argc, char **argv) {
service_notify("READY=1\n"); service_notify("READY=1\n");
for (unsigned int idx = 0; idx < num_poller_threads; ++idx) for (unsigned int idx = 0; idx < num_poller_threads; ++idx)
thread_create_detach_prio(poller_loop, rtpe_pollers[idx % num_rtpe_pollers], thread_create_detach_prio(rtpe_config.common.io_uring ? uring_poller_loop : poller_loop,
rtpe_pollers[idx % num_rtpe_pollers],
rtpe_config.scheduling, rtpe_config.priority, rtpe_config.scheduling, rtpe_config.priority,
idx < rtpe_config.num_threads ? "poller" : "cpoller"); idx < rtpe_config.num_threads ? "poller" : "cpoller");
@ -1542,7 +1586,10 @@ int main(int argc, char **argv) {
release_listeners(&rtpe_control_ng); release_listeners(&rtpe_control_ng);
release_listeners(&rtpe_control_ng_tcp); release_listeners(&rtpe_control_ng_tcp);
for (unsigned int idx = 0; idx < num_rtpe_pollers; ++idx) for (unsigned int idx = 0; idx < num_rtpe_pollers; ++idx)
poller_free(&rtpe_pollers[idx]); if (rtpe_config.common.io_uring)
uring_poller_free(&rtpe_pollers[idx]);
else
poller_free(&rtpe_pollers[idx]);
g_free(rtpe_pollers); g_free(rtpe_pollers);
interfaces_free(); interfaces_free();
#ifndef WITHOUT_NFTABLES #ifndef WITHOUT_NFTABLES

@ -14,6 +14,7 @@
#include "obj.h" #include "obj.h"
#include "socket.h" #include "socket.h"
#include "log_funcs.h" #include "log_funcs.h"
#include "uring.h"
struct udp_listener_callback { struct udp_listener_callback {
struct obj obj; struct obj obj;

@ -213,13 +213,6 @@ extern struct poller *rtpe_control_poller; // poller for control sockets (maybe
extern unsigned int num_media_pollers; // for media sockets, >= 1 extern unsigned int num_media_pollers; // for media sockets, >= 1
extern unsigned int rtpe_poller_rr_iter; // round-robin assignment of pollers to each thread extern unsigned int rtpe_poller_rr_iter; // round-robin assignment of pollers to each thread
struct poller_item;
extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *);
extern bool (*rtpe_poller_del_item)(struct poller *, int);
extern bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *);
extern void (*rtpe_poller_blocked)(struct poller *, void *);
extern void (*rtpe_poller_error)(struct poller *, void *);
INLINE struct poller *rtpe_get_poller(void) { INLINE struct poller *rtpe_get_poller(void) {
// XXX optimise this for num_media_pollers == 1 ? // XXX optimise this for num_media_pollers == 1 ?

@ -850,6 +850,7 @@ static codec_def_t *codec_def_cn;
void (*codeclib_thread_init)(void); void (*codeclib_thread_init)(void);
void (*codeclib_thread_cleanup)(void); void (*codeclib_thread_cleanup)(void);
void (*codeclib_thread_loop)(void);
static GHashTable *codecs_ht; static GHashTable *codecs_ht;
@ -1531,7 +1532,7 @@ static void cc_init(void) {
cc_dlsym_resolve(rtpe_common_config_ptr->codec_chain_lib_path); cc_dlsym_resolve(rtpe_common_config_ptr->codec_chain_lib_path);
cc_set_thread_funcs(codeclib_thread_init, codeclib_thread_cleanup, NULL); cc_set_thread_funcs(codeclib_thread_init, codeclib_thread_cleanup, codeclib_thread_loop);
cc_client = cc_client_connect(4); cc_client = cc_client_connect(4);
if (!cc_client) if (!cc_client)

@ -368,6 +368,7 @@ extern const GQueue * const codec_supplemental_codecs;
// must be set before calling codeclib_init // must be set before calling codeclib_init
extern void (*codeclib_thread_init)(void); extern void (*codeclib_thread_init)(void);
extern void (*codeclib_thread_cleanup)(void); extern void (*codeclib_thread_cleanup)(void);
extern void (*codeclib_thread_loop)(void);
void codeclib_init(int); void codeclib_init(int);
void codeclib_free(void); void codeclib_free(void);

@ -35,6 +35,13 @@ struct poller {
GPtrArray *items; GPtrArray *items;
}; };
bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item;
bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item;
bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback;
void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked;
void (*rtpe_poller_error)(struct poller *, void *) = poller_error;
static void poller_free_item(struct poller_item_int *ele) { static void poller_free_item(struct poller_item_int *ele) {
if (ele) if (ele)
obj_put(ele); obj_put(ele);

@ -11,7 +11,7 @@
struct obj; struct obj;
struct sockaddr;
typedef void (*poller_func_t)(int, void *); typedef void (*poller_func_t)(int, void *);
@ -21,6 +21,8 @@ struct poller_item {
struct obj *obj; struct obj *obj;
poller_func_t readable; poller_func_t readable;
void (*recv)(struct obj *, char *b, size_t len, struct sockaddr *,
struct timeval *);
poller_func_t writeable; poller_func_t writeable;
poller_func_t closed; poller_func_t closed;
}; };
@ -38,6 +40,12 @@ void poller_error(struct poller *, void *);
void poller_loop(void *); void poller_loop(void *);
extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *);
extern bool (*rtpe_poller_del_item)(struct poller *, int);
extern bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *);
extern void (*rtpe_poller_blocked)(struct poller *, void *);
extern void (*rtpe_poller_error)(struct poller *, void *);
#ifdef HAVE_LIBURING #ifdef HAVE_LIBURING
extern __thread unsigned int (*uring_thread_loop)(void); extern __thread unsigned int (*uring_thread_loop)(void);

@ -83,7 +83,7 @@ int streambuf_writeable(struct streambuf *b) {
} }
if (ret != out) { if (ret != out) {
poller_blocked(b->poller, b->fd_ptr); rtpe_poller_blocked(b->poller, b->fd_ptr);
break; break;
} }
} }
@ -217,10 +217,10 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
if (errno == EINTR) if (errno == EINTR)
continue; continue;
if (errno != EAGAIN && errno != EWOULDBLOCK) { if (errno != EAGAIN && errno != EWOULDBLOCK) {
poller_error(b->poller, b->fd_ptr); rtpe_poller_error(b->poller, b->fd_ptr);
break; break;
} }
poller_blocked(b->poller, b->fd_ptr); rtpe_poller_blocked(b->poller, b->fd_ptr);
break; break;
} }
if (ret == 0) if (ret == 0)
@ -232,7 +232,7 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
} }
if (b->buf->len > 5242880) if (b->buf->len > 5242880)
poller_error(b->poller, b->fd_ptr); rtpe_poller_error(b->poller, b->fd_ptr);
else if (len) else if (len)
g_string_append_len(b->buf, s, len); g_string_append_len(b->buf, s, len);

@ -1,12 +1,52 @@
#include "uring.h" #include "uring.h"
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include <poll.h>
#include "log.h" #include "log.h"
#include "loglib.h" #include "loglib.h"
#include "socket.h" #include "socket.h"
#include "poller.h" #include "poller.h"
#include "bufferpool.h"
#include "call.h"
#define BUFFER_SIZE RTP_BUFFER_SIZE // size of individual buffer
#define BUFFERS_COUNT 1024 // number of buffers allocated in one pool, should be 2^n
#define BUFFER_POOLS 8 // number of pools to keep alive
static_assert(BUFFERS_COUNT * BUFFER_POOLS < (1<<16), "too many buffers (>= 2^16)");
struct uring_buffer {
void *buf;
struct poller *poller;
unsigned int num;
};
struct poller {
mutex_t lock;
GQueue reqs;
int waker_fds[2];
GPtrArray *evs; // holds uring_poll_event by fd
struct bufferpool *bufferpool;
struct uring_buffer *buffers[BUFFER_POOLS];
};
struct poller_req {
enum { ADD, BLOCKED, ERROR, DEL, BUFFERS, RECV } type;
union {
struct poller_item it;
struct {
int fd;
void (*callback)(void *);
void *arg;
};
struct {
void *buf;
unsigned int num;
};
};
};
static ssize_t __socket_sendmsg(socket_t *s, struct msghdr *m, const endpoint_t *e, static ssize_t __socket_sendmsg(socket_t *s, struct msghdr *m, const endpoint_t *e,
struct sockaddr_storage *ss, struct uring_req *r) struct sockaddr_storage *ss, struct uring_req *r)
{ {
@ -71,4 +111,449 @@ void uring_thread_cleanup(void) {
io_uring_queue_exit(&rtpe_uring); io_uring_queue_exit(&rtpe_uring);
} }
// Queue a request to (re)register one pool of receive buffers with the
// kernel ring; `num` is the pool index. May be called from any thread: the
// actual io_uring_prep_provide_buffers() runs on the poller thread in
// uring_poller_do_buffers().
static void uring_submit_buffer(struct poller *p, void *b, unsigned int num) {
	struct poller_req *req = g_new0(__typeof(*req), 1);
	req->type = BUFFERS;
	req->buf = b;
	// Fully initialise the request before publishing it on the queue. The
	// original assigned req->num after push + wake, which was only safe
	// because the lock was still held at that point.
	req->num = num;
	LOCK(&p->lock);
	g_queue_push_tail(&p->reqs, req);
	uring_poller_wake(p);
}
// bufferpool recycle callback: invoked once an entire buffer pool has been
// consumed and released. Re-submits the pool's memory to the kernel and
// reports BUFFERS_COUNT reclaimed slots back to the bufferpool.
static unsigned int uring_buffer_recycle(void *p) {
	//ilog(LOG_INFO, "uring buffer recycle");
	struct uring_buffer *b = p;
	struct poller *poller = b->poller;
	uring_submit_buffer(poller, b->buf, b->num);
	return BUFFERS_COUNT;
}
// Create an io_uring-based poller: cross-thread request queue, self-wake
// socket pair, fd-indexed event table, and BUFFER_POOLS pools of receive
// buffers, each queued for registration with the kernel.
// Returns NULL on failure.
struct poller *uring_poller_new(void) {
	struct poller *ret = g_new0(__typeof(*ret), 1);
	mutex_init(&ret->lock);
	g_queue_init(&ret->reqs);
	int ok = socketpair(AF_UNIX, SOCK_STREAM, 0, ret->waker_fds);
	if (ok != 0) {
		// Original returned `false` from a pointer-returning function and
		// leaked `ret`; free the half-constructed object and return NULL.
		g_free(ret);
		return NULL;
	}
	nonblock(ret->waker_fds[0]);
	nonblock(ret->waker_fds[1]);
	ret->evs = g_ptr_array_new();
	ret->bufferpool = bufferpool_new(g_malloc, g_free, BUFFER_SIZE * BUFFERS_COUNT);
	for (int i = 0; i < BUFFER_POOLS; i++) {
		ret->buffers[i] = g_new0(__typeof(*ret->buffers[i]), 1);
		ret->buffers[i]->buf = bufferpool_reserve(ret->bufferpool, BUFFERS_COUNT,
				uring_buffer_recycle, ret->buffers[i]);
		// leave head room in front of each packet for in-place rewriting
		ret->buffers[i]->buf += RTP_BUFFER_HEAD_ROOM;
		ret->buffers[i]->num = i;
		ret->buffers[i]->poller = ret;
		uring_submit_buffer(ret, ret->buffers[i]->buf, i);
	}
	return ret;
}
// Destroy a poller created by uring_poller_new() and NULL out the caller's
// pointer. Must only run after uring_poller_clear() has drained the ring.
void uring_poller_free(struct poller **pp) {
	// XXX cleanup of reqs
	close((*pp)->waker_fds[0]);
	close((*pp)->waker_fds[1]);
	g_ptr_array_free((*pp)->evs, true);
	for (int i = 0; i < BUFFER_POOLS; i++) {
		// NOTE(review): buf was advanced by RTP_BUFFER_HEAD_ROOM at reserve
		// time; confirm bufferpool_release() accepts this interior pointer.
		bufferpool_release((*pp)->buffers[i]->buf);
		g_free((*pp)->buffers[i]);
	}
	bufferpool_destroy((*pp)->bufferpool);
	g_free(*pp);
	*pp = NULL;
}
// Wake the poller thread by writing one byte to its (non-blocking) waker
// socket. A failed/short write is ignored: if the pipe is already full, a
// wake-up is pending anyway.
void uring_poller_wake(struct poller *p) {
	ssize_t ret = write(p->waker_fds[0], "", 1);
	(void)ret; // ignore return value
}
// Queue an fd for addition to this poller. Items providing a .recv handler
// are serviced via multishot recvmsg (RECV); all others via multishot poll
// (ADD). Returns false for invalid/incomplete items, true once queued.
bool uring_poller_add_item(struct poller *p, struct poller_item *i) {
	// reject invalid or incomplete items up front
	if (!p || !i || i->fd < 0 || !i->readable || !i->closed)
		return false;

	struct poller_req *req = g_new0(__typeof(*req), 1);
	req->type = i->recv ? RECV : ADD;
	req->it = *i;
	if (req->it.obj)
		obj_hold_o(req->it.obj); // reference released when the event retires

	LOCK(&p->lock);
	g_queue_push_tail(&p->reqs, req);
	uring_poller_wake(p);

	return true;
}
void uring_poller_blocked(struct poller *p, void *fdp) {
struct poller_req *req = g_new0(__typeof(*req), 1);
req->type = ERROR;
req->fd = GPOINTER_TO_INT(fdp);
LOCK(&p->lock);
g_queue_push_tail(&p->reqs, req);
uring_poller_wake(p);
}
void uring_poller_error(struct poller *p, void *fdp) {
struct poller_req *req = g_new0(__typeof(*req), 1);
req->type = BLOCKED;
req->fd = GPOINTER_TO_INT(fdp);
LOCK(&p->lock);
g_queue_push_tail(&p->reqs, req);
uring_poller_wake(p);
}
// Queue removal of an fd from the poller. Once all in-flight requests for
// the fd are cancelled, `callback(arg)` is invoked if given, otherwise the
// fd is closed (see uring_poll_removed()).
// NOTE(review): during shutdown this returns early without queueing, so the
// callback is never invoked and the fd not closed here — presumably
// uring_poller_clear() handles teardown; confirm.
bool uring_poller_del_item_callback(struct poller *p, int fd, void (*callback)(void *), void *arg) {
	if (rtpe_shutdown)
		return true;
	struct poller_req *req = g_new0(__typeof(*req), 1);
	req->type = DEL;
	req->fd = fd;
	req->callback = callback;
	req->arg = arg;
	LOCK(&p->lock);
	g_queue_push_tail(&p->reqs, req);
	uring_poller_wake(p);
	return true;
}
// Convenience wrapper: remove an fd with the default disposal (the fd is
// closed once cancellation completes).
bool uring_poller_del_item(struct poller *p, int fd) {
	return uring_poller_del_item_callback(p, fd, NULL, NULL);
}
// Per-fd state for a multishot poll watch; lives until the final CQE
// (without IORING_CQE_F_MORE) retires it.
struct uring_poll_event {
	struct uring_req req; // must be first
	struct poller_item it; // copy of the registered item (callbacks, obj ref)
	struct poller *poller;
	bool closed:1; // ensures .closed fires at most once
};
// CQE handler for multishot poll events (armed in uring_poller_do_add()).
// `res` is the poll event mask or a negative errno; `flags` the CQE flags.
static void uring_poll_event(struct uring_req *req, int32_t res, uint32_t flags) {
	struct uring_poll_event *ereq = (__typeof(ereq)) req;
	bool closed = false;
	//ilog(LOG_INFO, "uring poll event %i %i %i", ereq->it.fd, res, flags);
	if (res < 0) {
		// -ECANCELED is expected after explicit cancellation; anything else
		// is a genuine poll failure
		if (res != -ECANCELED)
			ilog(LOG_WARNING | LOG_FLAG_LIMIT, "io_uring poll error on fd %i: %s",
					ereq->it.fd, strerror(-res));
		closed = true;
	}
	else if ((res & (POLLERR | POLLHUP))) {
		//ilog(LOG_INFO, "uring poll fd error %i %i", ereq->it.fd, res);
		closed = true;
	}
	else if ((res & POLLIN))
		ereq->it.readable(ereq->it.fd, ereq->it.obj);
	else {
		ilog(LOG_WARNING | LOG_FLAG_LIMIT, "unhandled io_uring poll event mask on fd %i: %i",
				ereq->it.fd, res);
		closed = true;
	}
	if (closed) {
		// fire the .closed callback only on the first close condition
		if (!ereq->closed)
			ereq->it.closed(ereq->it.fd, ereq->it.obj);
		ereq->closed = true;
	}
	// final CQE for this watch: drop obj reference, unlink from the fd table
	// (only if still pointing at us) and free the request
	if (!(flags & IORING_CQE_F_MORE)) {
		if (ereq->it.obj)
			obj_put_o(ereq->it.obj);
		struct poller *p = ereq->poller;
		{
			LOCK(&p->lock);
			if (p->evs->len > ereq->it.fd && p->evs->pdata[ereq->it.fd] == ereq)
				p->evs->pdata[ereq->it.fd] = NULL;
		}
		uring_req_free(&ereq->req);
	}
}
// State for a pending fd-cancellation (DEL request): remembers what to do
// with the fd once all its in-flight requests are cancelled.
struct uring_poll_removed {
	struct uring_req req; // must be first
	int fd;
	void (*callback)(void *); // optional custom disposal
	void *arg;
	struct poller *poller;
};
// CQE handler for the cancel-fd request issued by uring_poller_do_del():
// run the caller-supplied disposal callback, or close the fd by default.
static void uring_poll_removed(struct uring_req *req, int32_t res, uint32_t flags) {
	struct uring_poll_removed *rreq = (__typeof(rreq)) req;
	//ilog(LOG_INFO, "poll removed fd %i with cb %p/%p", rreq->fd, rreq->callback, rreq->arg);
	if (rreq->callback)
		rreq->callback(rreq->arg);
	else
		close(rreq->fd);
	uring_req_free(req);
}
// State for a one-shot POLLOUT watch armed after a write blocked
// (uring_poller_do_blocked()).
struct uring_poll_unblocked {
	struct uring_req req; // must be first
	struct poller_item it; // copy of the fd's registered item, with own obj ref
};
// CQE handler for the one-shot POLLOUT watch: the fd became writeable again,
// so invoke .writeable; on error or unexpected mask invoke .closed instead.
static void uring_poll_unblocked(struct uring_req *req, int32_t res, uint32_t flags) {
	struct uring_poll_unblocked *ureq = (__typeof(ureq)) req;
	bool closed = false;
	if (res < 0) {
		ilog(LOG_WARNING | LOG_FLAG_LIMIT, "io_uring poll write error on fd %i: %s",
				ureq->it.fd, strerror(-res));
		closed = true;
	}
	else if (!(res & (POLLOUT))) {
		ilog(LOG_WARNING | LOG_FLAG_LIMIT, "unhandled io_uring poll event write mask on fd %i: %i",
				ureq->it.fd, res);
		closed = true;
	}
	else
		ureq->it.writeable(ureq->it.fd, ureq->it.obj);
	// one-shot poll: there must never be a follow-up CQE
	assert((flags & IORING_CQE_F_MORE) == 0);
	if (closed)
		ureq->it.closed(ureq->it.fd, ureq->it.obj);
	if (ureq->it.obj)
		obj_put_o(ureq->it.obj); // reference taken in uring_poller_do_blocked()
	uring_req_free(req);
}
// Per-fd state for a multishot recvmsg; msg/iov must stay valid for the
// lifetime of the multishot request, hence they live in this struct.
struct uring_poll_recv {
	struct uring_req req; // must be first
	struct poller_item it;
	struct msghdr msg;
	struct iovec iov;
	struct poller *poller;
	bool closed:1; // ensures .closed fires at most once
};
// Extract timestamp (and optionally destination address) control messages
// from a multishot-recvmsg payload, using liburing's cmsg iteration helpers.
// NOTE(review): `cm` is expected to be the iteration variable declared inside
// the socket_recvfrom_parse_cmsg macro, and tv/to are passed by address even
// though they are already pointers — confirm against the macro's definition.
INLINE void uring_recvmsg_parse_cmsg(struct timeval *tv,
		sockaddr_t *to, bool (*parse)(struct cmsghdr *, sockaddr_t *),
		struct io_uring_recvmsg_out *out, struct msghdr *mh)
{
	socket_recvfrom_parse_cmsg(&tv, &to, parse, mh,
			io_uring_recvmsg_cmsg_firsthdr(out, mh),
			io_uring_recvmsg_cmsg_nexthdr(out, mh, cm));
}
// CQE handler for multishot recvmsg (armed in uring_poller_do_recv()).
// `res` is the received length or a negative errno; on success the kernel
// picked a buffer from our provided-buffer ring, identified via `flags`.
static void uring_poll_recv(struct uring_req *req, int32_t res, uint32_t flags) {
	struct uring_poll_recv *rreq = (__typeof(rreq)) req;
	struct poller *p = rreq->poller;
	bool closed = false;
	//ilog(LOG_INFO, "uring recvmsg event %i %i %i", rreq->it.fd, res, flags);
	if (res < 0) {
		// -ECANCELED is expected after explicit cancellation
		if (res != -ECANCELED)
			ilog(LOG_WARNING | LOG_FLAG_LIMIT, "io_uring recvmsg error on fd %i: %s",
					rreq->it.fd, strerror(-res));
		closed = true;
	}
	else {
		// locate the kernel-selected buffer: buffer id encodes pool index
		// and offset within the pool (ids assigned in uring_poller_do_buffers())
		assert((flags & IORING_CQE_F_BUFFER) != 0);
		unsigned int buffer_id = flags >> IORING_CQE_BUFFER_SHIFT;
		unsigned int pool_id = buffer_id / BUFFERS_COUNT;
		unsigned int pool_offset = buffer_id % BUFFERS_COUNT;
		//ilog(LOG_INFO, "pool id %u buf id %u", pool_id, pool_offset);
		assert(pool_id < BUFFER_POOLS);
		struct uring_buffer *ubuf = p->buffers[pool_id];
		void *buf = ubuf->buf + BUFFER_SIZE * pool_offset;
		// decode the multishot-recvmsg framing (name/cmsg/payload) in place
		struct io_uring_recvmsg_out *out = io_uring_recvmsg_validate(buf, BUFFER_SIZE, &rreq->msg);
		assert(out != NULL);
		void *payload = io_uring_recvmsg_payload(out, &rreq->msg);
		struct sockaddr *sa = io_uring_recvmsg_name(out);
		struct timeval tv = {0};
		uring_recvmsg_parse_cmsg(&tv, NULL, NULL, out, &rreq->msg);
		rreq->it.recv(rreq->it.obj, payload, out->payloadlen, sa, &tv);
	}
	// no F_MORE means the multishot request has terminated
	if (!(flags & IORING_CQE_F_MORE))
		closed = true;
	if (closed) {
		// fire .closed only once
		if (!rreq->closed)
			rreq->it.closed(rreq->it.fd, rreq->it.obj);
		rreq->closed = true;
	}
	if (!(flags & IORING_CQE_F_MORE)) {
		//ilog(LOG_INFO, "last uring recv event for fd %i for %p (%i)", rreq->it.fd, rreq->it.obj, rreq->it.obj->ref);
		if (rreq->it.obj)
			obj_put_o(rreq->it.obj);
		uring_req_free(&rreq->req);
	}
}
// Poller-thread executor for ADD requests: arm a multishot poll watch for
// the fd and record it in the fd-indexed event table (used later by
// uring_poller_do_blocked()). Caller holds p->lock.
static void uring_poller_do_add(struct poller *p, struct poller_req *preq) {
	// don't allow duplicates
	if (p->evs->len > preq->it.fd && p->evs->pdata[preq->it.fd])
		abort(); // XXX handle gracefully?
	struct uring_poll_event *ereq
			= uring_alloc_req(sizeof(*ereq), uring_poll_event);
	ereq->it = preq->it; // takes over the obj reference held by the request
	ereq->poller = p;
	struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring);
	io_uring_prep_poll_multishot(sqe, ereq->it.fd, POLLHUP | POLLERR | POLLIN);
	io_uring_sqe_set_data(sqe, ereq);
	// save ereq for write blocks. no extra obj reference
	if (p->evs->len <= ereq->it.fd)
		g_ptr_array_set_size(p->evs, ereq->it.fd + 1);
	p->evs->pdata[ereq->it.fd] = ereq;
}
// Poller-thread executor for BLOCKED requests: look up the fd's registered
// item and arm a one-shot POLLOUT watch so .writeable fires when the fd
// unblocks. Caller holds p->lock.
static void uring_poller_do_blocked(struct poller *p, struct poller_req *preq) {
	// valid fd?
	if (p->evs->len <= preq->fd || !p->evs->pdata[preq->fd])
		abort(); // XXX handle gracefully?
	struct uring_poll_event *ereq = p->evs->pdata[preq->fd];
	struct uring_poll_unblocked *ureq
			= uring_alloc_req(sizeof(*ureq), uring_poll_unblocked);
	ureq->it = ereq->it;
	if (ureq->it.obj)
		obj_hold_o(ureq->it.obj); // released in uring_poll_unblocked()
	struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring);
	io_uring_prep_poll_add(sqe, ureq->it.fd, POLLOUT);
	io_uring_sqe_set_data(sqe, ureq);
}
// Poller-thread executor for ERROR requests: currently nothing to do.
static void uring_poller_do_error(struct poller *p, struct poller_req *preq) {
	// do nothing?
}
// Poller-thread executor for DEL requests: cancel all in-flight io_uring
// requests for the fd; uring_poll_removed() then runs the disposal callback
// or closes the fd.
static void uring_poller_do_del(struct poller *p, struct poller_req *preq) {
	//ilog(LOG_INFO, "del fd %i on %p", preq->fd, p);
	struct uring_poll_removed *rreq
			= uring_alloc_req(sizeof(*rreq), uring_poll_removed);
	rreq->fd = preq->fd;
	rreq->poller = p;
	rreq->callback = preq->callback;
	rreq->arg = preq->arg;
	struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring);
	io_uring_prep_cancel_fd(sqe, rreq->fd, IORING_ASYNC_CANCEL_ALL);
	io_uring_sqe_set_data(sqe, rreq);
}
// Poller-thread executor for BUFFERS requests: hand one pool of packet
// buffers to the kernel's provided-buffer ring; buffer ids start at
// num * BUFFERS_COUNT so uring_poll_recv() can map an id back to its pool.
static void uring_poller_do_buffers(struct poller *p, struct poller_req *preq) {
	//ilog(LOG_INFO, "XXXXXXXXX adding buffers %p %u", p, preq->num);
	struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring);
	io_uring_prep_provide_buffers(sqe, preq->buf, BUFFER_SIZE, BUFFERS_COUNT, 0,
			preq->num * BUFFERS_COUNT);
	struct uring_req *breq = uring_alloc_buffer_req(sizeof(*breq));
	io_uring_sqe_set_data(sqe, breq); // XXX no content? not needed?
}
// Poller-thread executor for RECV requests: arm a multishot recvmsg with
// kernel-selected buffers (group 0) for the fd. The msghdr/iovec live in
// the request struct since they must outlive the multishot submission.
static void uring_poller_do_recv(struct poller *p, struct poller_req *preq) {
	//ilog(LOG_INFO, "adding recv fd %i on %p for %p", preq->it.fd, p, preq->it.obj);
	struct uring_poll_recv *rreq
			= uring_alloc_req(sizeof(*rreq), uring_poll_recv);
	rreq->it = preq->it; // takes over the obj reference held by the request
	rreq->poller = p;
	struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring);
	rreq->iov = (__typeof(rreq->iov)) {
		.iov_len = MAX_RTP_PACKET_SIZE,
	};
	rreq->msg = (__typeof(rreq->msg)) {
		.msg_iov = &rreq->iov,
		.msg_iovlen = 1,
		.msg_namelen = sizeof(struct sockaddr_storage),
		.msg_controllen = 64, // room for timestamp/pktinfo cmsgs
	};
	io_uring_prep_recvmsg_multishot(sqe, rreq->it.fd, &rreq->msg, 0);
	// only consume a buffer once data is actually available
	sqe->ioprio |= IORING_RECVSEND_POLL_FIRST;
	io_uring_sqe_set_flags(sqe, IOSQE_BUFFER_SELECT);
	sqe->buf_group = 0;
	io_uring_sqe_set_data(sqe, rreq);
}
// Drain the cross-thread request queue on the poller thread, converting each
// queued request into the corresponding io_uring submission. Holds p->lock
// for the entire drain.
static void uring_poller_do_reqs(struct poller *p) {
	LOCK(&p->lock);
	while (p->reqs.length) {
		struct poller_req *preq = g_queue_pop_head(&p->reqs);
		switch (preq->type) {
			case ADD:
				uring_poller_do_add(p, preq);
				break;
			case BLOCKED:
				uring_poller_do_blocked(p, preq);
				break;
			case ERROR:
				uring_poller_do_error(p, preq);
				break;
			case DEL:
				uring_poller_do_del(p, preq);
				break;
			case BUFFERS:
				uring_poller_do_buffers(p, preq);
				break;
			case RECV:
				uring_poller_do_recv(p, preq);
				break;
			default:
				abort();
		}
		g_free(preq);
	}
}
// Readable-callback for the waker fd: drain all pending wake-up bytes so the
// (non-blocking) socket's readiness is cleared.
void uring_poller_waker_read(int fd, void *p) {
	char scratch[32];
	for (;;) {
		if (read(fd, scratch, sizeof(scratch)) <= 0)
			break;
	}
}
// Closed-callback for the waker fd: during normal operation the waker must
// never close, so anything but shutdown is a fatal logic error.
void uring_poller_waker_closed(int fd, void *p) {
	if (rtpe_shutdown)
		return;
	abort();
}
// Register the poller's own waker socket (read side) as a regular poll item
// so uring_poller_wake() can interrupt a blocking io_uring_wait_cqe().
void uring_poller_add_waker(struct poller *p) {
	uring_poller_add_item(p,
			&(struct poller_item) {
				.readable = uring_poller_waker_read,
				.closed = uring_poller_waker_closed,
				.fd = p->waker_fds[1],
			});
}
// One iteration of the poller: turn queued cross-thread requests into SQEs,
// run the submission/completion loop, and if nothing completed, block until
// at least one CQE arrives.
void uring_poller_poll(struct poller *p) {
	uring_poller_do_reqs(p);
	unsigned int events = __uring_thread_loop();
	if (events == 0) {
		struct io_uring_cqe *cqe; // ignored
		// allow thread cancellation only while (possibly) blocking
		thread_cancel_enable();
		io_uring_wait_cqe(&rtpe_uring, &cqe); // maybe not a cancellation point
		thread_cancel_disable();
	}
}
// Shutdown path: cancel every outstanding request on the ring, then run the
// completion loop until all CQEs (including cancellations) have drained.
void uring_poller_clear(struct poller *p) {
	struct uring_req *req = uring_alloc_buffer_req(sizeof(*req));
	struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring);
	io_uring_prep_cancel(sqe, 0, IORING_ASYNC_CANCEL_ANY);
	io_uring_sqe_set_data(sqe, req);
	while (__uring_thread_loop() != 0) { }
}
#endif #endif

@ -38,6 +38,20 @@ INLINE void *uring_alloc_buffer_req(size_t len) {
void uring_thread_init(void); void uring_thread_init(void);
void uring_thread_cleanup(void); void uring_thread_cleanup(void);
struct poller_item;

// io_uring-based poller: drop-in counterparts of the generic poller
// functions (see the rtpe_poller_* function pointers).
struct poller *uring_poller_new(void);
void uring_poller_free(struct poller **pp);
void uring_poller_add_waker(struct poller *p);
void uring_poller_wake(struct poller *p);
void uring_poller_poll(struct poller *);
void uring_poller_clear(struct poller *);
bool uring_poller_add_item(struct poller *p, struct poller_item *i);
bool uring_poller_del_item(struct poller *p, int fd);
void uring_poller_blocked(struct poller *p, void *fdp);
void uring_poller_error(struct poller *p, void *fdp);
bool uring_poller_del_item_callback(struct poller *p, int fd, void (*callback)(void *), void *arg);
#endif #endif
#endif #endif

@ -15,8 +15,9 @@ struct poller {
}; };
void poller_blocked(struct poller *, void *); void poller_blocked(struct poller *, void *);
int poller_isblocked(struct poller *, void *);
void poller_error(struct poller *, void *); void poller_error(struct poller *, void *);
#define rtpe_poller_blocked poller_blocked
#define rtpe_poller_error poller_error
#endif #endif

@ -16,6 +16,7 @@ struct global_sampled_min_max rtpe_sampled_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled;
__thread struct bufferpool *media_bufferpool; __thread struct bufferpool *media_bufferpool;
void append_thread_lpr_to_glob_lpr(void) {}
int get_local_log_level(unsigned int u) { int get_local_log_level(unsigned int u) {
return -1; return -1;

@ -16,6 +16,7 @@ struct global_sampled_min_max rtpe_sampled_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled;
__thread struct bufferpool *media_bufferpool; __thread struct bufferpool *media_bufferpool;
void append_thread_lpr_to_glob_lpr(void) {}
static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) { static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) {
char buf[1024] = ""; char buf[1024] = "";

@ -17,13 +17,9 @@ struct rtpengine_config rtpe_config = {
struct rtpengine_config initial_rtpe_config; struct rtpengine_config initial_rtpe_config;
struct poller **rtpe_pollers = (struct poller *[]) {NULL}; struct poller **rtpe_pollers = (struct poller *[]) {NULL};
struct poller *rtpe_control_poller; struct poller *rtpe_control_poller;
struct poller *uring_poller;
unsigned int num_media_pollers = 1; unsigned int num_media_pollers = 1;
unsigned int rtpe_poller_rr_iter; unsigned int rtpe_poller_rr_iter;
bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item;
bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item;
bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback;
void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked;
void (*rtpe_poller_error)(struct poller *, void *) = poller_error;
GString *dtmf_logs; GString *dtmf_logs;
GQueue rtpe_control_ng = G_QUEUE_INIT; GQueue rtpe_control_ng = G_QUEUE_INIT;

@ -14,13 +14,9 @@ struct rtpengine_config rtpe_config;
struct rtpengine_config initial_rtpe_config; struct rtpengine_config initial_rtpe_config;
struct poller **rtpe_pollers; struct poller **rtpe_pollers;
struct poller *rtpe_control_poller; struct poller *rtpe_control_poller;
struct poller *uring_poller;
unsigned int num_media_pollers; unsigned int num_media_pollers;
unsigned int rtpe_poller_rr_iter; unsigned int rtpe_poller_rr_iter;
bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add_item;
bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item;
bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback;
void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked;
void (*rtpe_poller_error)(struct poller *, void *) = poller_error;
GString *dtmf_logs; GString *dtmf_logs;
GQueue rtpe_control_ng = G_QUEUE_INIT; GQueue rtpe_control_ng = G_QUEUE_INIT;

Loading…
Cancel
Save