TT#52651 generalise streambuf interface

Change-Id: I7d5ab8ffe13e52d4dbb1901531cc13fcc173d60d
changes/68/27468/3
Richard Fuchs 6 years ago
parent cb2dbd2a92
commit b28e718ee4

@ -366,7 +366,8 @@ out:
}
void poller_blocked(struct poller *p, int fd) {
void poller_blocked(struct poller *p, void *fdp) {
int fd = GPOINTER_TO_INT(fdp);
struct epoll_event e;
if (!p || fd < 0)
@ -392,7 +393,8 @@ fail:
mutex_unlock(&p->lock);
}
void poller_error(struct poller *p, int fd) {
void poller_error(struct poller *p, void *fdp) {
int fd = GPOINTER_TO_INT(fdp);
if (!p || fd < 0)
return;
@ -412,7 +414,8 @@ fail:
mutex_unlock(&p->lock);
}
int poller_isblocked(struct poller *p, int fd) {
int poller_isblocked(struct poller *p, void *fdp) {
int fd = GPOINTER_TO_INT(fdp);
int ret;
if (!p || fd < 0)

@ -34,9 +34,10 @@ struct poller *poller_new(void);
int poller_add_item(struct poller *, struct poller_item *);
int poller_update_item(struct poller *, struct poller_item *);
int poller_del_item(struct poller *, int);
void poller_blocked(struct poller *, int);
int poller_isblocked(struct poller *, int);
void poller_error(struct poller *, int);
void poller_blocked(struct poller *, void *);
int poller_isblocked(struct poller *, void *);
void poller_error(struct poller *, void *);
int poller_poll(struct poller *, int);
void poller_timer_loop(void *);

@ -14,20 +14,38 @@
static ssize_t __fd_write(void *, const void *, size_t);
static ssize_t __fd_read(void *, void *, size_t);
struct streambuf *streambuf_new(struct poller *p, int fd) {
static const struct streambuf_funcs __fd_funcs = {
.write = __fd_write,
.read = __fd_read,
};
static ssize_t __fd_write(void *fd, const void *b, size_t s) {
return write(GPOINTER_TO_INT(fd), b, s);
}
static ssize_t __fd_read(void *fd, void *b, size_t s) {
return read(GPOINTER_TO_INT(fd), b, s);
}
struct streambuf *streambuf_new_ptr(struct poller *p, void *fd_ptr, const struct streambuf_funcs *funcs) {
struct streambuf *b;
b = g_slice_alloc0(sizeof(*b));
mutex_init(&b->lock);
b->buf = g_string_new("");
b->fd = fd;
b->fd_ptr = fd_ptr;
b->poller = p;
b->active = rtpe_now.tv_sec;
b->funcs = funcs;
return b;
}
struct streambuf *streambuf_new(struct poller *p, int fd) {
return streambuf_new_ptr(p, GINT_TO_POINTER(fd), &__fd_funcs);
}
void streambuf_destroy(struct streambuf *b) {
@ -47,7 +65,7 @@ int streambuf_writeable(struct streambuf *b) {
break;
out = (b->buf->len > 1024) ? 1024 : b->buf->len;
ret = write(b->fd, b->buf->str, out);
ret = b->funcs->write(b->fd_ptr, b->buf->str, out);
if (ret < 0) {
if (errno == EINTR)
@ -65,7 +83,7 @@ int streambuf_writeable(struct streambuf *b) {
}
if (ret != out) {
poller_blocked(b->poller, b->fd);
poller_blocked(b->poller, b->fd_ptr);
break;
}
}
@ -81,7 +99,7 @@ int streambuf_readable(struct streambuf *b) {
mutex_lock(&b->lock);
for (;;) {
ret = read(b->fd, buf, 1024);
ret = b->funcs->read(b->fd_ptr, buf, 1024);
if (ret == 0) {
// don't discard already read data in the buffer
@ -188,18 +206,18 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
mutex_lock(&b->lock);
while (len && !poller_isblocked(b->poller, b->fd)) {
while (len && !poller_isblocked(b->poller, b->fd_ptr)) {
out = (len > 1024) ? 1024 : len;
ret = write(b->fd, s, out);
ret = b->funcs->write(b->fd_ptr, s, out);
if (ret < 0) {
if (errno == EINTR)
continue;
if (errno != EAGAIN && errno != EWOULDBLOCK) {
poller_error(b->poller, b->fd);
poller_error(b->poller, b->fd_ptr);
break;
}
poller_blocked(b->poller, b->fd);
poller_blocked(b->poller, b->fd_ptr);
break;
}
if (ret == 0)
@ -211,7 +229,7 @@ void streambuf_write(struct streambuf *b, const char *s, unsigned int len) {
}
if (b->buf->len > 5242880)
poller_error(b->poller, b->fd);
poller_error(b->poller, b->fd_ptr);
else if (len)
g_string_append_len(b->buf, s, len);

@ -18,18 +18,25 @@ struct poller;
struct streambuf_funcs {
ssize_t (*write)(void *, const void *, size_t);
ssize_t (*read)(void *, void *, size_t);
};
struct streambuf {
mutex_t lock;
GString *buf;
int fd;
void *fd_ptr;
struct poller *poller;
time_t active;
int eof;
const struct streambuf_funcs
*funcs;
};
struct streambuf *streambuf_new(struct poller *, int);
struct streambuf *streambuf_new_ptr(struct poller *, void *, const struct streambuf_funcs *);
void streambuf_destroy(struct streambuf *);
int streambuf_writeable(struct streambuf *);
int streambuf_readable(struct streambuf *);

@ -1,10 +1,10 @@
#include "poller.h"
void poller_blocked(struct poller *p, int fd) {
void poller_blocked(struct poller *p, void *fdp) {
p->blocked = 1;
}
int poller_isblocked(struct poller *p, int fd) {
int poller_isblocked(struct poller *p, void *fdp) {
return p->blocked ? 1 : 0;
}
void poller_error(struct poller *p, int fd) {
void poller_error(struct poller *p, void *fdp) {
}

@ -10,9 +10,9 @@ struct poller {
int intro:1;
};
void poller_blocked(struct poller *, int);
int poller_isblocked(struct poller *, int);
void poller_error(struct poller *, int);
void poller_blocked(struct poller *, void *);
int poller_isblocked(struct poller *, void *);
void poller_error(struct poller *, void *);
#endif

Loading…
Cancel
Save