MT#55283 delegate closing sockets to poller

To support asynchronous pollers which may hold references on underlying
sockets, let the poller close the socket after it has released its
references. This prevents cases of file descriptor re-use while an
underlying socket is still open.

Add reset_socket() to be used in place of close_socket() which does the
same thing except the actual closing of the socket.

Add poller_del_item_callback() for cases where more action than just
closing the file descriptor is needed.

Change-Id: Iefda1487ecb89263729120ecb964436dd79b2a0e
pull/1826/head
Richard Fuchs 2 years ago
parent aab5d66c31
commit 58cbd2f21c

@ -648,7 +648,7 @@ void control_ng_free(void *p) {
rtpe_cngs_hash = NULL;
}
poller_del_item(rtpe_control_poller, c->udp_listener.fd);
close_socket(&c->udp_listener);
reset_socket(&c->udp_listener);
streambuf_listener_shutdown(&c->tcp_listener);
if (tcp_connections_hash)
g_hash_table_destroy(tcp_connections_hash);

@ -934,14 +934,26 @@ static int add_socket(socket_t *r, unsigned int port, struct intf_spec *spec, co
/**
* Pushing ports into the `ports_to_release` queue.
*/
static void release_port(socket_t *r, struct intf_spec *spec) {
static void release_port_push(void *p) {
struct late_port_release *lpr = p;
__C_DBG("Adding the port '%u' to late-release list", lpr->socket.local.port);
t_queue_push_tail(&ports_to_release, lpr);
}
static void release_port_poller(socket_t *r, struct intf_spec *spec, struct poller *poller) {
if (!r->local.port || r->fd == -1)
return;
__C_DBG("Adding the port '%u' to late-release list", r->local.port);
struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr));
move_socket(&lpr->socket, r);
lpr->spec = spec;
t_queue_push_tail(&ports_to_release, lpr);
if (!poller)
release_port_push(lpr);
else {
__C_DBG("Adding late-release callback for port '%u'", lpr->socket.local.port);
poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr);
}
}
static void release_port(socket_t *r, struct intf_spec *spec) {
release_port_poller(r, spec, NULL);
}
static void free_port(socket_t *r, struct intf_spec *spec) {
release_port(r, spec);
@ -3306,11 +3318,7 @@ void stream_fd_release(stream_fd *sfd) {
&sfd->socket.local); // releases reference
}
if (sfd->poller)
poller_del_item(sfd->poller, sfd->socket.fd);
sfd->poller = NULL;
release_port(&sfd->socket, sfd->local_intf->spec);
release_port_poller(&sfd->socket, sfd->local_intf->spec, sfd->poller);
}

@ -101,7 +101,6 @@ fail:
static void streambuf_stream_free(void *p) {
struct streambuf_stream *s = p;
close_socket(&s->sock);
streambuf_destroy(s->inbuf);
streambuf_destroy(s->outbuf);
obj_put(s->cb);
@ -123,6 +122,7 @@ static void streambuf_stream_closed(int fd, void *p, uintptr_t u) {
bool ret = t_hash_table_remove(l->streams, s);
mutex_unlock(&l->lock);
poller_del_item(rtpe_control_poller, s->sock.fd);
reset_socket(&s->sock);
if (ret)
obj_put(s);
}
@ -243,7 +243,7 @@ void streambuf_listener_shutdown(struct streambuf_listener *listener) {
if (!listener)
return;
poller_del_item(rtpe_control_poller, listener->listener.fd);
close_socket(&listener->listener);
reset_socket(&listener->listener);
t_hash_table_destroy_ptr(&listener->streams);
}

@ -132,7 +132,7 @@ bool poller_add_item(struct poller *p, struct poller_item *i) {
}
bool poller_del_item(struct poller *p, int fd) {
bool poller_del_item_callback(struct poller *p, int fd, void (*callback)(void *), void *arg) {
struct poller_item_int *it;
if (!p || fd < 0)
@ -158,8 +158,16 @@ bool poller_del_item(struct poller *p, int fd) {
obj_put(it);
if (callback)
callback(arg);
else
close(fd);
return true;
}
bool poller_del_item(struct poller *p, int fd) {
return poller_del_item_callback(p, fd, NULL, NULL);
}
static int poller_poll(struct poller *p, int timeout, struct epoll_event *evs, int poller_size) {

@ -32,6 +32,7 @@ struct poller *poller_new(void);
void poller_free(struct poller **);
bool poller_add_item(struct poller *, struct poller_item *);
bool poller_del_item(struct poller *, int);
bool poller_del_item_callback(struct poller *, int, void (*)(void *), void *);
void poller_blocked(struct poller *, void *);
int poller_isblocked(struct poller *, void *);

@ -874,6 +874,16 @@ int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep) {
return connect_socket_retry(r);
}
int reset_socket(socket_t *r) {
if (!r)
return -1;
r->fd = -1;
ZERO(r->local);
ZERO(r->remote);
return 0;
}
int close_socket(socket_t *r) {
if (!r) {
__C_DBG("close() syscall not called, no socket");
@ -891,9 +901,7 @@ int close_socket(socket_t *r) {
__C_DBG("close() syscall success, fd=%d", r->fd);
r->fd = -1;
ZERO(r->local);
ZERO(r->remote);
reset_socket(r);
return 0;
}
@ -904,9 +912,7 @@ int close_socket(socket_t *r) {
// does not actually close the socket
void move_socket(socket_t *dst, socket_t *src) {
*dst = *src;
src->fd = -1;
ZERO(src->local);
ZERO(src->remote);
reset_socket(src);
}

@ -247,6 +247,7 @@ int connect_socket(socket_t *r, int type, const endpoint_t *ep);
int connect_socket_nb(socket_t *r, int type, const endpoint_t *ep); // 1 == in progress
int connect_socket_retry(socket_t *r); // retries connect() while in progress
int close_socket(socket_t *r);
int reset_socket(socket_t *r);
void move_socket(socket_t *dst, socket_t *src);
void dummy_socket(socket_t *r, const sockaddr_t *);

@ -434,7 +434,6 @@ static void kill_threads(uint num) {
static void stream_free(void *p) {
struct stream *s = p;
close(s->timer_fd);
close(s->output_fd);
dump_close(s);
if (s->encoder)
@ -587,6 +586,7 @@ static void del_stream(void) {
return;
poller_del_item(rtpe_poller, s->timer_fd);
s->timer_fd = -1;
obj_put(s);
}

Loading…
Cancel
Save