good bye poll(), hello epoll_*()

git.mgm/mediaproxy-ng/2.1
Richard Fuchs 14 years ago
parent b7c8ddf1a0
commit 8d50923508

@ -851,9 +851,9 @@ static void steal_peer(struct peer *dest, struct peer *src) {
if (sr->fd != -1) { if (sr->fd != -1) {
mylog(LOG_DEBUG, LOG_PREFIX_CI "Closing port %u in favor of re-use", mylog(LOG_DEBUG, LOG_PREFIX_CI "Closing port %u in favor of re-use",
LOG_PARAMS_CI(c), sr->localport); LOG_PARAMS_CI(c), sr->localport);
poller_del_item(po, sr->fd);
close(sr->fd); close(sr->fd);
bit_array_clear(ports_used, sr->localport); bit_array_clear(ports_used, sr->localport);
poller_del_item(po, sr->fd);
} }
sr->fd = srs->fd; sr->fd = srs->fd;
@ -1143,10 +1143,10 @@ static void kill_callstream(struct callstream *s) {
r = &p->rtps[j]; r = &p->rtps[j];
if (r->fd != -1) { if (r->fd != -1) {
poller_del_item(s->call->callmaster->poller, r->fd);
close(r->fd); close(r->fd);
bit_array_clear(ports_used, r->localport); bit_array_clear(ports_used, r->localport);
} }
poller_del_item(s->call->callmaster->poller, r->fd);
} }
} }

@ -25,9 +25,9 @@ static void control_stream_closed(int fd, void *p) {
c->stream_head = g_list_remove_link(c->stream_head, &s->link); c->stream_head = g_list_remove_link(c->stream_head, &s->link);
close(fd);
if (poller_del_item(s->poller, fd)) if (poller_del_item(s->poller, fd))
abort(); abort();
close(fd);
streambuf_destroy(s->inbuf); streambuf_destroy(s->inbuf);
streambuf_destroy(s->outbuf); streambuf_destroy(s->outbuf);

@ -1,60 +0,0 @@
#include <stdio.h>
#include <assert.h>
#include "poller.h"
void dummy(int a, void *b) {
}
int main() {
struct poller *p;
struct poller_item i;
p = poller_new();
if (!p) {
fprintf(stderr, "poller creation failed\n");
return -1;
}
assert(p->items_size == 0);
assert(p->pollfds_size == 0);
i.readable = dummy;
i.writeable = dummy;
i.closed = dummy;
i.fd = 3;
assert(poller_add_item(p, &i) == 0);
i.fd = 4;
assert(poller_add_item(p, &i) == 0);
i.fd = 2;
assert(poller_add_item(p, &i) == 0);
i.fd = 6;
assert(poller_add_item(p, &i) == 0);
i.fd = 0;
assert(poller_add_item(p, &i) == 0);
i.fd = 1;
assert(poller_add_item(p, &i) == 0);
i.fd = 5;
assert(poller_add_item(p, &i) == 0);
i.fd = 7;
assert(poller_add_item(p, &i) == 0);
i.fd = 9;
assert(poller_add_item(p, &i) == 0);
assert(poller_del_item(p, 10) == -1);
assert(poller_del_item(p, 6) == 0);
assert(poller_del_item(p, 8) == -1);
assert(poller_del_item(p, 0) == 0);
assert(poller_del_item(p, 3) == 0);
assert(poller_del_item(p, 11) == -1);
assert(poller_del_item(p, 9) == 0);
assert(poller_del_item(p, 11) == -1);
assert(poller_del_item(p, 4) == 0);
return 0;
}

@ -7,6 +7,7 @@
#include <time.h> #include <time.h>
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <sys/epoll.h>
#include "poller.h" #include "poller.h"
#include "aux.h" #include "aux.h"
@ -34,16 +35,23 @@ struct poller *poller_new(void) {
p = malloc(sizeof(*p)); p = malloc(sizeof(*p));
memset(p, 0, sizeof(*p)); memset(p, 0, sizeof(*p));
p->now = time(NULL); p->now = time(NULL);
p->fd = epoll_create1(0);
if (p->fd == -1)
abort();
return p; return p;
} }
static int epoll_events(struct poller_item *i) {
return EPOLLHUP | EPOLLERR | ((i->writeable && i->blocked) ? EPOLLOUT : 0) | (i->readable ? EPOLLIN : 0);
}
int poller_add_item(struct poller *p, struct poller_item *i) { int poller_add_item(struct poller *p, struct poller_item *i) {
struct poller_item *ip; struct poller_item *ip;
struct pollfd *pf;
int idx;
unsigned int u; unsigned int u;
struct epoll_event e;
if (!p || !i) if (!p || !i)
return -1; return -1;
@ -57,20 +65,10 @@ int poller_add_item(struct poller *p, struct poller_item *i) {
if (i->fd < p->items_size && p->items[i->fd]) if (i->fd < p->items_size && p->items[i->fd])
return -1; return -1;
idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &i->fd, 0); e.events = epoll_events(i);
assert(idx < 0); e.data.fd = i->fd;
if (epoll_ctl(p->fd, EPOLL_CTL_ADD, i->fd, &e))
idx *= -1; abort();
idx--;
p->pollfds_size++;
p->pollfds = realloc(p->pollfds, p->pollfds_size * sizeof(*p->pollfds));
memmove(p->pollfds + idx + 1, p->pollfds + idx, (p->pollfds_size - idx - 1) * sizeof(*p->pollfds));
pf = &p->pollfds[idx];
pf->fd = i->fd;
pf->events = POLLHUP | POLLERR | ((i->writeable && i->blocked) ? POLLOUT : 0) | (i->readable ? POLLIN : 0);
pf->revents = 0;
if (i->fd >= p->items_size) { if (i->fd >= p->items_size) {
u = p->items_size; u = p->items_size;
@ -88,30 +86,15 @@ int poller_add_item(struct poller *p, struct poller_item *i) {
int poller_del_item(struct poller *p, int fd) { int poller_del_item(struct poller *p, int fd) {
int idx;
if (!p || fd < 0) if (!p || fd < 0)
return -1; return -1;
if (fd >= p->items_size) if (fd >= p->items_size)
return -1; return -1;
if (!p->items || !p->items[fd]) if (!p->items || !p->items[fd])
return -1; return -1;
if (!p->pollfds || !p->pollfds_size)
return -1;
idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &fd, 1);
assert(idx != -1);
memmove(p->pollfds + idx, p->pollfds + idx + 1, (p->pollfds_size - idx - 1) * sizeof(*p->pollfds)); if (epoll_ctl(p->fd, EPOLL_CTL_DEL, fd, NULL))
p->pollfds_size--; abort();
p->pollfds = realloc(p->pollfds, p->pollfds_size * sizeof(*p->pollfds));
if (p->pollfds_work) {
idx = POLLER_BSEARCH(p->pollfds_work, p->pollfds_work_size, &fd, 1);
if (idx != -1)
p->pollfds_work[idx].fd = -1;
}
free(p->items[fd]); free(p->items[fd]);
p->items[fd] = NULL; p->items[fd] = NULL;
@ -146,83 +129,75 @@ int poller_update_item(struct poller *p, struct poller_item *i) {
int poller_poll(struct poller *p, int timeout) { int poller_poll(struct poller *p, int timeout) {
struct pollfd *pfd, *pf;
int ret, i; int ret, i;
struct poller_item *it; struct poller_item *it;
int idx;
time_t last; time_t last;
int do_timer;
GList *li; GList *li;
struct timer_item *ti; struct timer_item *ti;
struct epoll_event evs[128], *ev, e;
if (!p) if (!p)
return -1; return -1;
if (!p->pollfds || !p->pollfds_size)
return -1;
if (!p->items || !p->items_size) if (!p->items || !p->items_size)
return -1; return -1;
p->pollfds_work_size = i = p->pollfds_size;
p->pollfds_work = pfd = malloc(sizeof(*pfd) * i);
memcpy(pfd, p->pollfds, sizeof(*pfd) * i);
do_timer = 0;
last = p->now; last = p->now;
p->now = time(NULL); p->now = time(NULL);
if (last != p->now) { if (last != p->now) {
do_timer = 1;
ret = i;
for (li = p->timers; li; li = li->next) { for (li = p->timers; li; li = li->next) {
ti = li->data; ti = li->data;
ti->func(ti->ptr); ti->func(ti->ptr);
} }
}
else { for (i = 0; i < p->items_size; i++) {
ret = poll(pfd, i, timeout); it = p->items[i];
if (errno == EINTR) if (!it)
ret = 0; continue;
if (ret < 0) if (!it->timer)
goto out; continue;
it->timer(it->fd, it->ptr);
}
return p->items_size;
} }
pf = pfd; errno = 0;
for (pf = pfd; i; pf++) { ret = epoll_wait(p->fd, evs, sizeof(evs) / sizeof(*evs), timeout);
i--;
if (pf->fd < 0) if (errno == EINTR)
continue; ret = 0;
if (ret < 0)
goto out;
it = (pf->fd < p->items_size) ? p->items[pf->fd] : NULL; for (i = 0; i < ret; i++) {
if (!it) ev = &evs[i];
if (ev->data.fd < 0)
continue; continue;
if (do_timer) { it = (ev->data.fd < p->items_size) ? p->items[ev->data.fd] : NULL;
if (it->timer) if (!it)
it->timer(it->fd, it->ptr);
continue; continue;
}
if (it->error) { if (it->error) {
it->closed(it->fd, it->ptr); it->closed(it->fd, it->ptr);
continue; continue;
} }
if ((pf->revents & (POLLERR | POLLHUP))) if ((ev->events & (POLLERR | POLLHUP)))
it->closed(it->fd, it->ptr); it->closed(it->fd, it->ptr);
else if ((pf->revents & POLLOUT)) { else if ((ev->events & POLLOUT)) {
it->blocked = 0; it->blocked = 0;
idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &it->fd, 1); e.events = epoll_events(it);
assert(idx != -1); e.data.fd = it->fd;
if (epoll_ctl(p->fd, EPOLL_CTL_MOD, it->fd, &e))
p->pollfds[idx].events &= ~POLLOUT; abort();
it->writeable(it->fd, it->ptr); it->writeable(it->fd, it->ptr);
} }
else if ((pf->revents & POLLIN)) else if ((ev->events & POLLIN))
it->readable(it->fd, it->ptr); it->readable(it->fd, it->ptr);
else if (!pf->revents) else if (!ev->events)
continue; continue;
else else
abort(); abort();
@ -230,15 +205,12 @@ int poller_poll(struct poller *p, int timeout) {
out: out:
free(pfd);
p->pollfds_work = NULL;
p->pollfds_work_size = 0;
return ret; return ret;
} }
void poller_blocked(struct poller *p, int fd) { void poller_blocked(struct poller *p, int fd) {
int idx; struct epoll_event e;
if (!p || fd < 0) if (!p || fd < 0)
return; return;
@ -246,17 +218,15 @@ void poller_blocked(struct poller *p, int fd) {
return; return;
if (!p->items || !p->items[fd]) if (!p->items || !p->items[fd])
return; return;
if (!p->pollfds || !p->pollfds_size)
return;
if (!p->items[fd]->writeable) if (!p->items[fd]->writeable)
return; return;
p->items[fd]->blocked = 1; p->items[fd]->blocked = 1;
idx = POLLER_BSEARCH(p->pollfds, p->pollfds_size, &fd, 1); e.events = epoll_events(p->items[fd]);
assert(idx != -1); e.data.fd = fd;
if (epoll_ctl(p->fd, EPOLL_CTL_MOD, fd, &e))
p->pollfds[idx].events |= POLLOUT; abort();
} }
void poller_error(struct poller *p, int fd) { void poller_error(struct poller *p, int fd) {
@ -266,8 +236,6 @@ void poller_error(struct poller *p, int fd) {
return; return;
if (!p->items || !p->items[fd]) if (!p->items || !p->items[fd])
return; return;
if (!p->pollfds || !p->pollfds_size)
return;
if (!p->items[fd]->writeable) if (!p->items[fd]->writeable)
return; return;
@ -282,8 +250,6 @@ int poller_isblocked(struct poller *p, int fd) {
return -1; return -1;
if (!p->items || !p->items[fd]) if (!p->items || !p->items[fd])
return -1; return -1;
if (!p->pollfds || !p->pollfds_size)
return -1;
if (!p->items[fd]->writeable) if (!p->items[fd]->writeable)
return -1; return -1;

@ -23,16 +23,12 @@ struct poller_item {
}; };
struct poller { struct poller {
int fd;
struct poller_item **items; struct poller_item **items;
unsigned int items_size; unsigned int items_size;
struct pollfd *pollfds;
unsigned int pollfds_size;
GList *timers; GList *timers;
time_t now; time_t now;
struct pollfd *pollfds_work;
unsigned int pollfds_work_size;
}; };

Loading…
Cancel
Save