From eeeb2d864108741a3c539e3f2c49584444aed76a Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 14 Feb 2025 08:13:10 -0400 Subject: [PATCH] MT#55283 move socket_port_link into stream_fd This allows us to simplify some function signatures Change-Id: I58f65735ba84ec7a536b1b170d1ef90e266308f5 --- daemon/call.c | 8 ++++---- daemon/media_socket.c | 26 ++++++++++++-------------- daemon/redis.c | 21 +++++++-------------- include/media_socket.h | 8 +++++--- 4 files changed, 28 insertions(+), 35 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index baaef2df7..fb188563f 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -878,7 +878,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU " "affinity: %s", strerror(errno)); } - sfd = stream_fd_new(&spl->socket, &spl->links, media->call, il->local_intf); + sfd = stream_fd_new(spl, media->call, il->local_intf); t_queue_push_tail(&em_il->list, sfd); // not referenced g_free(spl); } @@ -912,9 +912,9 @@ static void __assign_stream_fds(struct call_media *media, sfd_intf_list_q *intf_ if (!sfd) { // create a dummy sfd. needed to hold RTCP crypto context when // RTCP-mux is in use - socket_t sock; - dummy_socket(&sock, &il->local_intf->spec->local_address.addr); - sfd = stream_fd_new(&sock, NULL, media->call, il->local_intf); + struct socket_port_link spl = {0}; + dummy_socket(&spl.socket, &il->local_intf->spec->local_address.addr); + sfd = stream_fd_new(&spl, media->call, il->local_intf); } sfd->stream = ps; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 62ab7e6f4..7ba263cac 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1064,13 +1064,13 @@ static void release_port_push(void *p) { __C_DBG("Adding the port '%u' to late-release list", lpr->socket.local.port); t_queue_push_tail(&ports_to_release, lpr); } -static void release_port_poller(socket_t *r, ports_q *links, struct port_pool *pp, struct poller *poller) { - if (!r->local.port || r->fd == -1) +static void release_port_poller(struct socket_port_link *spl, struct poller *poller) { + if (!spl->socket.local.port || spl->socket.fd == -1) return; struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr)); - move_socket(&lpr->socket, r); - lpr->pp = pp; - lpr->pp_links = *links; + move_socket(&lpr->socket, &spl->socket); + lpr->pp = spl->pp; + lpr->pp_links = spl->links; if (!poller) release_port_push(lpr); else { @@ -1078,11 +1078,11 @@ static void release_port_poller(socket_t *r, ports_q *links, struct port_pool *p rtpe_poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr); } } -static void release_port(socket_t *r, ports_q *links, struct port_pool *pp) { - release_port_poller(r, links, pp, NULL); +static void release_port(struct socket_port_link *spl) { + release_port_poller(spl, NULL); } static void free_port(struct socket_port_link *spl) { - release_port(&spl->socket, &spl->links, spl->pp); + release_port(spl); g_free(spl); } /** @@ -3198,24 +3198,22 @@ out: static void stream_fd_free(stream_fd *f) { - release_port(&f->socket, &f->port_pool_links, &f->local_intf->spec->port_pool); + release_port(&f->spl); crypto_cleanup(&f->crypto); dtls_connection_cleanup(&f->dtls); obj_put(f->call); } -stream_fd *stream_fd_new(socket_t *fd, ports_q *links, call_t *call, struct local_intf *lif) { +stream_fd *stream_fd_new(struct socket_port_link *spl, call_t *call, struct local_intf *lif) { stream_fd *sfd; struct poller_item pi; sfd = obj_alloc0(stream_fd, stream_fd_free); sfd->unique_id = t_queue_get_length(&call->stream_fds); - sfd->socket = *fd; sfd->call = obj_get(call); sfd->local_intf = lif; - if (links) - sfd->port_pool_links = *links; + sfd->spl = *spl; t_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */ __C_DBG("stream_fd_new localport=%d", sfd->socket.local.port); @@ -3264,7 +3262,7 @@ void stream_fd_release(stream_fd *sfd) { &sfd->socket.local); // releases reference } - release_port_poller(&sfd->socket, &sfd->port_pool_links, &sfd->local_intf->spec->port_pool, sfd->poller); + release_port_poller(&sfd->spl, sfd->poller); } diff --git a/daemon/redis.c b/daemon/redis.c index 127d8d80c..bb527d201 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1382,8 +1382,6 @@ static int redis_sfds(call_t *c, struct redis_list *sfds) { socket_port_q q = TYPED_GQUEUE_INIT; unsigned int loc_uid; stream_fd *sfd; - socket_t *sock; - socket_t local_sock; int port, fd; const char *err; @@ -1418,28 +1416,23 @@ static int redis_sfds(call_t *c, struct redis_list *sfds) { if (!loc) goto err; - struct socket_port_link *spl = NULL; - ports_q *links = NULL; - if (fd != -1) { err = "failed to open ports"; if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid)) goto err; err = "no port returned"; - spl = t_queue_pop_head(&q); + struct socket_port_link *spl = t_queue_pop_head(&q); if (!spl) goto err; - sock = &spl->socket; - links = &spl->links; - set_tos(sock, c->tos); + set_tos(&spl->socket, c->tos); + sfd = stream_fd_new(spl, c, loc); + g_free(spl); } else { - sock = &local_sock; - dummy_socket(sock, &loc->spec->local_address.addr); + struct socket_port_link spl = {0}; + dummy_socket(&spl.socket, &loc->spec->local_address.addr); + sfd = stream_fd_new(&spl, c, loc); } - sfd = stream_fd_new(sock, links, c, loc); - if (spl) - g_free(spl); if (redis_hash_get_sdes_params1(&sfd->crypto.params, rh, "") == -1) return -1; diff --git a/include/media_socket.h b/include/media_socket.h index 5afb2b973..c1bf5db19 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -215,9 +215,11 @@ struct stream_fd { struct obj obj; unsigned int unique_id; /* RO */ - socket_t socket; /* RO */ + union { + socket_t socket; /* RO - alias */ + struct socket_port_link spl; /* RO */ + }; struct local_intf *local_intf; /* RO */ - ports_q port_pool_links; /* RO */ /* stream_fd object holds a reference to the call it belongs to. * Which in turn holds references to all stream_fd objects it contains, @@ -296,7 +298,7 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port); int __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigned int wanted_start_port, struct intf_spec *spec, const str *); int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media); -stream_fd *stream_fd_new(socket_t *fd, ports_q *links, call_t *call, struct local_intf *lif); +stream_fd *stream_fd_new(struct socket_port_link *, call_t *call, struct local_intf *lif); stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(stream_fd *); enum thread_looper_action release_closed_sockets(void);