MT#55283 restore poller_isblocked

Revert of 128e84e and implement for io_uring poller

Turns out that this function is necessary to prevent out-of-order writes
to a streambuf once the connection is open.

Use a simple array to keep track of blocked fds under uring.

Change-Id: I4af2a64071030fd4892dde88547705230aec59fd
rfuchs/dataport
Richard Fuchs 2 years ago
parent 48e9628d12
commit 76363a0569

@ -1320,6 +1320,7 @@ static void create_everything(void) {
rtpe_poller_del_item = uring_poller_del_item;
rtpe_poller_del_item_callback = uring_poller_del_item_callback;
rtpe_poller_blocked = uring_poller_blocked;
rtpe_poller_isblocked = uring_poller_isblocked;
rtpe_poller_error = uring_poller_error;
}
#endif

@ -39,6 +39,7 @@ bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *) = poller_add
bool (*rtpe_poller_del_item)(struct poller *, int) = poller_del_item;
bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *) = poller_del_item_callback;
void (*rtpe_poller_blocked)(struct poller *, void *) = poller_blocked;
bool (*rtpe_poller_isblocked)(struct poller *, void *) = poller_isblocked;
void (*rtpe_poller_error)(struct poller *, void *) = poller_error;
@ -307,6 +308,30 @@ __thread unsigned int (*uring_thread_loop)(void) = __uring_thread_loop_dummy;
#endif
bool poller_isblocked(struct poller *p, void *fdp) {
int fd = GPOINTER_TO_INT(fdp);
int ret;
if (!p || fd < 0)
return false;
LOCK(&p->lock);
ret = -1;
if (fd >= p->items->len)
goto out;
struct poller_item_int *it;
if (!(it = p->items->pdata[fd]))
goto out;
if (!it->item.writeable)
goto out;
ret = !!it->blocked;
out:
return ret;
}
void poller_loop(void *d) {
struct poller *p = d;
int poller_size = rtpe_common_config_ptr->poller_size;

@ -37,6 +37,7 @@ bool poller_del_item(struct poller *, int);
bool poller_del_item_callback(struct poller *, int, void (*)(void *), void *);
void poller_blocked(struct poller *, void *);
bool poller_isblocked(struct poller *, void *);
void poller_error(struct poller *, void *);
void poller_loop(void *);
@ -45,6 +46,7 @@ extern bool (*rtpe_poller_add_item)(struct poller *, struct poller_item *);
extern bool (*rtpe_poller_del_item)(struct poller *, int);
extern bool (*rtpe_poller_del_item_callback)(struct poller *, int, void (*)(void *), void *);
extern void (*rtpe_poller_blocked)(struct poller *, void *);
extern bool (*rtpe_poller_isblocked)(struct poller *, void *);
extern void (*rtpe_poller_error)(struct poller *, void *);

@ -209,7 +209,7 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
mutex_lock(&b->lock);
while (len) {
while (len && !rtpe_poller_isblocked(b->poller, b->fd_ptr)) {
out = (len > 1024) ? 1024 : len;
ret = b->funcs->write(b->fd_ptr, s, out);

@ -29,6 +29,7 @@ struct poller {
GPtrArray *evs; // holds uring_poll_event by fd
struct bufferpool *bufferpool;
struct uring_buffer *buffers[BUFFER_POOLS];
GArray *blocked;
};
struct poller_req {
@ -141,6 +142,7 @@ struct poller *uring_poller_new(void) {
nonblock(ret->waker_fds[0]);
nonblock(ret->waker_fds[1]);
ret->evs = g_ptr_array_new();
ret->blocked = g_array_new(false, true, sizeof(char));
ret->bufferpool = bufferpool_new(g_malloc, g_free, BUFFER_SIZE * BUFFERS_COUNT);
for (int i = 0; i < BUFFER_POOLS; i++) {
@ -161,6 +163,7 @@ void uring_poller_free(struct poller **pp) {
close((*pp)->waker_fds[0]);
close((*pp)->waker_fds[1]);
g_ptr_array_free((*pp)->evs, true);
g_array_free((*pp)->blocked, true);
for (int i = 0; i < BUFFER_POOLS; i++) {
bufferpool_release((*pp)->buffers[i]->buf);
g_free((*pp)->buffers[i]);
@ -211,9 +214,24 @@ void uring_poller_blocked(struct poller *p, void *fdp) {
LOCK(&p->lock);
if (p->blocked->len <= req->fd)
g_array_set_size(p->blocked, req->fd + 1);
g_array_index(p->blocked, char, req->fd) = 1;
g_queue_push_tail(&p->reqs, req);
uring_poller_wake(p);
}
bool uring_poller_isblocked(struct poller *p, void *fdp) {
int fd = GPOINTER_TO_INT(fdp);
if (fd < 0)
return false;
LOCK(&p->lock);
if (p->blocked->len <= fd)
return false;
return !!g_array_index(p->blocked, char, fd);
}
void uring_poller_error(struct poller *p, void *fdp) {
struct poller_req *req = g_new0(__typeof(*req), 1);
req->type = ERROR;
@ -314,6 +332,7 @@ static void uring_poll_removed(struct uring_req *req, int32_t res, uint32_t flag
struct uring_poll_unblocked {
struct uring_req req; // must be first
struct poller_item it;
struct poller *poller;
};
static void uring_poll_unblocked(struct uring_req *req, int32_t res, uint32_t flags) {
struct uring_poll_unblocked *ureq = (__typeof(ureq)) req;
@ -328,8 +347,12 @@ static void uring_poll_unblocked(struct uring_req *req, int32_t res, uint32_t fl
ureq->it.fd, res);
closed = true;
}
else
else {
struct poller *p = ureq->poller;
if (p->blocked->len > ureq->it.fd)
g_array_index(p->blocked, char, ureq->it.fd) = 0;
ureq->it.writeable(ureq->it.fd, ureq->it.obj);
}
assert((flags & IORING_CQE_F_MORE) == 0);
@ -432,6 +455,7 @@ static void uring_poller_do_blocked(struct poller *p, struct poller_req *preq) {
struct uring_poll_unblocked *ureq
= uring_alloc_req(sizeof(*ureq), uring_poll_unblocked);
ureq->it = ereq->it;
ureq->poller = p;
if (ureq->it.obj)
obj_hold_o(ureq->it.obj);
struct io_uring_sqe *sqe = io_uring_get_sqe(&rtpe_uring);

@ -49,6 +49,7 @@ void uring_poller_clear(struct poller *);
bool uring_poller_add_item(struct poller *p, struct poller_item *i);
bool uring_poller_del_item(struct poller *p, int fd);
void uring_poller_blocked(struct poller *p, void *fdp);
bool uring_poller_isblocked(struct poller *p, void *fdp);
void uring_poller_error(struct poller *p, void *fdp);
bool uring_poller_del_item_callback(struct poller *p, int fd, void (*callback)(void *), void *arg);

@ -3,7 +3,7 @@
void poller_blocked(struct poller *p, void *fdp) {
p->state = PS_WRITE_BLOCKED;
}
int poller_isblocked(struct poller *p, void *fdp) {
bool poller_isblocked(struct poller *p, void *fdp) {
return p->state != PS_OPEN;
}
void poller_error(struct poller *p, void *fdp) {

@ -1,6 +1,7 @@
#ifndef __POLLER_H__
#define __POLLER_H__
#include <stdbool.h>
// dummy poller
struct poller {
@ -16,7 +17,9 @@ struct poller {
void poller_blocked(struct poller *, void *);
void poller_error(struct poller *, void *);
bool poller_isblocked(struct poller *, void *);
#define rtpe_poller_isblocked poller_isblocked
#define rtpe_poller_blocked poller_blocked
#define rtpe_poller_error poller_error

Loading…
Cancel
Save