MT#55283 use list pointer ops for port pool

Avoid repeated calls to the memory allocated for port pool list
management by picking out the list elements without freeing them,
storing them in the stream_fd object, and then returning them to the
list when the port is released.

Change-Id: I67cd5039e62e4d2965e85d7ba7f0454f08f40494
pull/1910/head
Richard Fuchs 10 months ago
parent bdadee5dff
commit 6276b37cd2

@ -871,16 +871,17 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne
em_il->local_intf = il->local_intf;
t_queue_push_tail(&em->intf_sfds, em_il);
socket_t *sock;
while ((sock = t_queue_pop_head(&il->list))) {
set_tos(sock, media->call->tos);
struct socket_port_link *spl;
while ((spl = t_queue_pop_head(&il->list))) {
set_tos(&spl->socket, media->call->tos);
if (media->call->cpu_affinity >= 0) {
if (socket_cpu_affinity(sock, media->call->cpu_affinity))
if (socket_cpu_affinity(&spl->socket, media->call->cpu_affinity))
ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU "
"affinity: %s", strerror(errno));
}
sfd = stream_fd_new(sock, media->call, il->local_intf);
sfd = stream_fd_new(&spl->socket, spl->link, media->call, il->local_intf);
t_queue_push_tail(&em_il->list, sfd); // not referenced
g_free(spl);
}
next_il:
@ -912,9 +913,9 @@ static void __assign_stream_fds(struct call_media *media, sfd_intf_list_q *intf_
if (!sfd) {
// create a dummy sfd. needed to hold RTCP crypto context when
// RTCP-mux is in use
socket_t *sock = g_slice_alloc(sizeof(*sock));
dummy_socket(sock, &il->local_intf->spec->local_address.addr);
sfd = stream_fd_new(sock, media->call, il->local_intf);
socket_t sock;
dummy_socket(&sock, &il->local_intf->spec->local_address.addr);
sfd = stream_fd_new(&sock, NULL, media->call, il->local_intf);
}
sfd->stream = ps;

@ -85,6 +85,7 @@ struct packet_handler_ctx {
struct late_port_release {
socket_t socket;
struct port_pool *pp;
ports_list *pp_link;
};
struct interface_stats_interval {
struct interface_stats_block stats;
@ -682,19 +683,18 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port) {
/**
* This function just (globally) reserves a port number, it doesn't provide any binding/unbinding.
*/
static void reserve_port(struct port_pool *pp,
ports_list *value_looked_up, unsigned int port) {
t_queue_delete_link(&pp->free_ports_q, value_looked_up);
static void reserve_port(struct port_pool *pp, ports_list *link) {
t_queue_unlink(&pp->free_ports_q, link);
unsigned int port = GPOINTER_TO_UINT(link->data);
free_ports_link(pp, port) = NULL;
}
/**
* This function just releases reserved port number, it doesn't provide any binding/unbinding.
*/
static void release_reserved_port(struct port_pool *pp, unsigned int port) {
t_queue_push_tail(&pp->free_ports_q, GUINT_TO_POINTER(port));
__auto_type l = pp->free_ports_q.tail;
free_ports_link(pp, port) = l;
static void release_reserved_port(struct port_pool *pp, ports_list *link) {
t_queue_push_tail_link(&pp->free_ports_q, link);
unsigned int port = GPOINTER_TO_UINT(link->data);
free_ports_link(pp, port) = link;
}
/* Append a list of free ports within the min-max range */
static void __append_free_ports_to_int(struct intf_spec *spec) {
@ -907,8 +907,10 @@ void interfaces_exclude_port(unsigned int port) {
mutex_lock(&pp->free_list_lock);
__auto_type ll = free_ports_link(pp, port);
if (ll)
reserve_port(pp, ll, port);
if (ll) {
reserve_port(pp, ll);
t_list_free(ll);
}
mutex_unlock(&pp->free_list_lock);
}
}
@ -961,12 +963,13 @@ static void release_port_push(void *p) {
__C_DBG("Adding the port '%u' to late-release list", lpr->socket.local.port);
t_queue_push_tail(&ports_to_release, lpr);
}
static void release_port_poller(socket_t *r, struct port_pool *pp, struct poller *poller) {
static void release_port_poller(socket_t *r, ports_list *link, struct port_pool *pp, struct poller *poller) {
if (!r->local.port || r->fd == -1)
return;
struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr));
move_socket(&lpr->socket, r);
lpr->pp = pp;
lpr->pp_link = link;
if (!poller)
release_port_push(lpr);
else {
@ -974,18 +977,18 @@ static void release_port_poller(socket_t *r, struct port_pool *pp, struct poller
rtpe_poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr);
}
}
static void release_port(socket_t *r, struct port_pool *pp) {
release_port_poller(r, pp, NULL);
static void release_port(socket_t *r, ports_list *link, struct port_pool *pp) {
release_port_poller(r, link, pp, NULL);
}
static void free_port(socket_t *r, struct port_pool *pp) {
release_port(r, pp);
g_slice_free1(sizeof(*r), r);
static void free_port(struct socket_port_link *spl, struct port_pool *pp) {
release_port(&spl->socket, spl->link, pp);
g_free(spl);
}
/**
* Logic responsible for devastating the `ports_to_release` queue.
* It's being called by main poller.
*/
static void release_port_now(socket_t *r, struct port_pool *pp) {
static void release_port_now(socket_t *r, ports_list *link, struct port_pool *pp) {
unsigned int port = r->local.port;
__C_DBG("Trying to release the port '%u'", port);
@ -997,7 +1000,7 @@ static void release_port_now(socket_t *r, struct port_pool *pp) {
/* first return the engaged port back */
mutex_lock(&pp->free_list_lock);
release_reserved_port(pp, port);
release_reserved_port(pp, link);
mutex_unlock(&pp->free_list_lock);
} else {
ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port);
@ -1022,7 +1025,7 @@ enum thread_looper_action release_closed_sockets(void) {
mutex_unlock(&ports_to_release_glob_lock);
while ((lpr = t_queue_pop_head(&ports_left))) {
release_port_now(&lpr->socket, lpr->pp);
release_port_now(&lpr->socket, lpr->pp_link, lpr->pp);
g_slice_free1(sizeof(*lpr), lpr);
}
}
@ -1046,12 +1049,12 @@ void append_thread_lpr_to_glob_lpr(void) {
* @param spec, interface specifications
* @param out, a list of sockets for this particular session (not a global list)
*/
int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int wanted_start_port,
int __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigned int wanted_start_port,
struct intf_spec *spec, const str *label)
{
unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0;
socket_t * sk;
GQueue ports_to_engage = G_QUEUE_INIT; /* usually it's only one RTCP port, theoretically can be more */
ports_list *port_link = NULL;
ports_q ports_to_engage = TYPED_GQUEUE_INIT; /* usually it's only one RTCP port, theoretically can be more */
struct port_pool * pp = &spec->port_pool; /* port pool for a given local interface */
ports_q *free_ports_q;
@ -1080,14 +1083,14 @@ int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int
if (wanted_start_port > 0) {
ilog(LOG_DEBUG, "A specific port value is requested, wanted_start_port: '%d'", wanted_start_port);
mutex_lock(&pp->free_list_lock);
__auto_type l = free_ports_link(pp, wanted_start_port);
if (!l) {
port_link = free_ports_link(pp, wanted_start_port);
if (!port_link) {
/* if engaged already, just select any other (so default logic) */
ilog(LOG_WARN, "This requested port has been already engaged, can't take it.");
wanted_start_port = 0; /* take what is proposed by FIFO instead */
} else {
/* we got the port, and we are sure it wasn't engaged */
reserve_port(pp, l, wanted_start_port);
reserve_port(pp, port_link);
port = wanted_start_port;
}
mutex_unlock(&pp->free_list_lock);
@ -1135,21 +1138,23 @@ new_cycle:
* Then additionally make sure that the RTCP port can also be engaged, if needed.
*/
mutex_lock(&pp->free_list_lock);
port = GPOINTER_TO_UINT(t_queue_pop_head(free_ports_q)); /* RTP */
port_link = t_queue_pop_head_link(free_ports_q);
if (!port) {
if (!port_link) {
mutex_unlock(&pp->free_list_lock);
ilog(LOG_ERR, "Failure while trying to get a port from the list");
goto fail;
}
free_ports_link(pp, port) = NULL; /* RTP */
port = GPOINTER_TO_UINT(port_link->data); /* RTP */
free_ports_link(pp, port) = NULL;
mutex_unlock(&pp->free_list_lock);
/* ports for RTP must be even, if there is an additional port for RTCP */
if (num_ports > 1 && (port & 1)) {
/* return port for RTP back and try again */
mutex_lock(&pp->free_list_lock);
release_reserved_port(pp, port);
release_reserved_port(pp, port_link);
mutex_unlock(&pp->free_list_lock);
goto new_cycle;
}
@ -1161,31 +1166,30 @@ new_cycle:
additional_port++;
mutex_lock(&pp->free_list_lock);
__auto_type l = additional_port <= pp->max ? free_ports_link(pp, additional_port) : NULL;
__auto_type add_link = additional_port <= pp->max ? free_ports_link(pp, additional_port) : NULL;
if (!l) {
if (!add_link) {
/* return port for RTP back and try again */
release_reserved_port(pp, port);
release_reserved_port(pp, port_link);
mutex_unlock(&pp->free_list_lock);
/* check if we managed to enagage anything in previous for-cycles */
while ((additional_port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage))))
while ((add_link = t_queue_pop_head_link(&ports_to_engage)))
{
mutex_lock(&pp->free_list_lock);
/* return additional ports back */
release_reserved_port(pp, additional_port);
release_reserved_port(pp, add_link);
mutex_unlock(&pp->free_list_lock);
}
goto new_cycle;
}
} else {
/* engage this port right away */
reserve_port(pp, l, additional_port);
mutex_unlock(&pp->free_list_lock);
/* engage this port right away */
reserve_port(pp, add_link);
mutex_unlock(&pp->free_list_lock);
/* track for which additional ports, we have to open sockets */
g_queue_push_tail(&ports_to_engage, GUINT_TO_POINTER(additional_port));
}
/* track for which additional ports, we have to open sockets */
t_queue_push_tail_link(&ports_to_engage, add_link);
}
}
@ -1193,22 +1197,25 @@ new_cycle:
allocation_attempts);
/* at this point we consider all things before as successfull. Now just add the RTP port */
g_queue_push_head(&ports_to_engage, GUINT_TO_POINTER(port));
t_queue_push_head_link(&ports_to_engage, port_link);
while ((port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage))))
struct socket_port_link *spl;
while ((port_link = t_queue_pop_head_link(&ports_to_engage)))
{
port = GPOINTER_TO_UINT(port_link->data);
ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port);
sk = g_slice_alloc0(sizeof(*sk));
sk->fd = -1;
t_queue_push_tail(out, sk);
spl = g_new0(struct socket_port_link, 1);
spl->socket.fd = -1;
spl->link = port_link;
t_queue_push_tail(out, spl);
/* if not possible to engage this socket, try to reallocate it again */
if (!add_socket(sk, port, spec, label)) {
if (!add_socket(&spl->socket, port, spec, label)) {
/* if something has been left in the `ports_to_engage` queue, release it right away */
while ((port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage))))
while ((port_link = t_queue_pop_head(&ports_to_engage)))
{
mutex_lock(&pp->free_list_lock);
release_reserved_port(pp, port);
release_reserved_port(pp, port_link);
mutex_unlock(&pp->free_list_lock);
}
/* ports which are already bound to a socket, will be freed by `free_port()` */
@ -1221,8 +1228,8 @@ new_cycle:
release_restart:
/* release all previously engaged sockets */
while ((sk = t_queue_pop_head(out)))
free_port(sk, pp); /* engaged ports will be released here */
while ((spl = t_queue_pop_head(out)))
free_port(spl, pp); /* engaged ports will be released here */
/* do not re-try for specifically wanted ports */
if (wanted_start_port > 0)
@ -1294,10 +1301,10 @@ error_ports:
}
void free_socket_intf_list(struct socket_intf_list *il) {
socket_t *sock;
struct socket_port_link *spl;
while ((sock = t_queue_pop_head(&il->list)))
free_port(sock, &il->local_intf->spec->port_pool);
while ((spl = t_queue_pop_head(&il->list)))
free_port(spl, &il->local_intf->spec->port_pool);
g_slice_free1(sizeof(*il), il);
}
void free_sfd_intf_list(struct sfd_intf_list *il) {
@ -3110,14 +3117,14 @@ out:
static void stream_fd_free(stream_fd *f) {
release_port(&f->socket, &f->local_intf->spec->port_pool);
release_port(&f->socket, f->port_pool_link, &f->local_intf->spec->port_pool);
crypto_cleanup(&f->crypto);
dtls_connection_cleanup(&f->dtls);
obj_put(f->call);
}
stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) {
stream_fd *stream_fd_new(socket_t *fd, ports_list *link, call_t *call, struct local_intf *lif) {
stream_fd *sfd;
struct poller_item pi;
@ -3126,8 +3133,8 @@ stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) {
sfd->socket = *fd;
sfd->call = obj_get(call);
sfd->local_intf = lif;
sfd->port_pool_link = link;
t_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */
g_slice_free1(sizeof(*fd), fd); /* moved into sfd, thus free */
__C_DBG("stream_fd_new localport=%d", sfd->socket.local.port);
@ -3175,7 +3182,7 @@ void stream_fd_release(stream_fd *sfd) {
&sfd->socket.local); // releases reference
}
release_port_poller(&sfd->socket, &sfd->local_intf->spec->port_pool, sfd->poller);
release_port_poller(&sfd->socket, sfd->port_pool_link, &sfd->local_intf->spec->port_pool, sfd->poller);
}

@ -1381,10 +1381,11 @@ static int redis_sfds(call_t *c, struct redis_list *sfds) {
sockfamily_t *fam;
struct logical_intf *lif;
struct local_intf *loc;
socket_q q = TYPED_GQUEUE_INIT;
socket_port_q q = TYPED_GQUEUE_INIT;
unsigned int loc_uid;
stream_fd *sfd;
socket_t *sock;
socket_t local_sock;
int port, fd;
const char *err;
@ -1419,21 +1420,28 @@ static int redis_sfds(call_t *c, struct redis_list *sfds) {
if (!loc)
goto err;
struct socket_port_link *spl = NULL;
ports_list *link = NULL;
if (fd != -1) {
err = "failed to open ports";
if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid))
goto err;
err = "no port returned";
sock = t_queue_pop_head(&q);
if (!sock)
spl = t_queue_pop_head(&q);
if (!spl)
goto err;
sock = &spl->socket;
link = spl->link;
set_tos(sock, c->tos);
}
else {
sock = g_slice_alloc(sizeof(*sock));
sock = &local_sock;
dummy_socket(sock, &loc->spec->local_address.addr);
}
sfd = stream_fd_new(sock, c, loc);
sfd = stream_fd_new(sock, link, c, loc);
if (spl)
g_free(spl);
if (redis_hash_get_sdes_params1(&sfd->crypto.params, rh, "") == -1)
return -1;

@ -88,6 +88,11 @@ struct logical_intf {
typedef void port_t;
TYPED_GQUEUE(ports, port_t)
struct socket_port_link {
socket_t socket;
ports_list *link;
};
struct port_pool {
unsigned int min, max;
@ -161,6 +166,8 @@ void interface_sampled_rate_stats_destroy(struct interface_sampled_rate_stats *)
struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_sampled_rate_stats *s,
struct local_intf *lif, long long *time_diff_us);
TYPED_GQUEUE(socket_port, struct socket_port_link)
struct local_intf {
struct intf_spec *spec;
struct intf_address advertised_address;
@ -172,13 +179,13 @@ struct local_intf {
};
struct socket_intf_list {
struct local_intf *local_intf;
socket_q list;
socket_port_q list;
};
struct sfd_intf_list {
struct local_intf *local_intf;
stream_fd_q list;
};
TYPED_GQUEUE(socket_intf_list, struct socket_intf_list)
TYPED_GQUEUE(socket_intf_list, struct socket_intf_list) /* RO */
TYPED_GQUEUE(sfd_intf_list, struct sfd_intf_list)
/**
@ -205,6 +212,7 @@ struct stream_fd {
unsigned int unique_id; /* RO */
socket_t socket; /* RO */
struct local_intf *local_intf; /* RO */
ports_list *port_pool_link; /* RO */
/* stream_fd object holds a reference to the call it belongs to.
* Which in turn holds references to all stream_fd objects it contains,
@ -213,7 +221,7 @@ struct stream_fd {
* The call is only released when it has been dissociated from all stream_fd objects,
* which happens during call teardown.
*/
call_t *call; /* RO */
call_t *call; /* RO */
struct packet_stream *stream; /* LOCK: call->master_lock */
struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */
struct dtls_connection dtls; /* LOCK: stream->in_lock */
@ -280,13 +288,10 @@ struct local_intf *get_any_interface_address(const struct logical_intf *lif, soc
void interfaces_exclude_port(unsigned int port);
int is_local_endpoint(const struct intf_address *addr, unsigned int port);
//int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const call_t *c);
//void release_port(socket_t *r, const struct local_intf *);
int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int wanted_start_port,
int __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigned int wanted_start_port,
struct intf_spec *spec, const str *);
int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media);
stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif);
stream_fd *stream_fd_new(socket_t *fd, ports_list *link, call_t *call, struct local_intf *lif);
stream_fd *stream_fd_lookup(const endpoint_t *);
void stream_fd_release(stream_fd *);
enum thread_looper_action release_closed_sockets(void);

Loading…
Cancel
Save