MT#55283 add helpers for reserve_port

Add helper functions wrapping around reserve_port which directly return
a socket_port_link object. This allows us to rework
__get_consecutive_ports and release_reserved_port to eliminate mixing
different port numbers in the same list, which makes for cleaner code.

Change-Id: Ia9808f71c854f6efd0e93ce8df4527c4aea311bd
pull/1918/head
Richard Fuchs 10 months ago
parent 8ab206c26a
commit 7a4675c776

@ -682,7 +682,7 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port) {
return 0; return 0;
} }
static void release_reserved_port(struct port_pool *pp, ports_q *); static void release_reserved_port(struct port_pool *pp, ports_q *, unsigned int port);
static void reserve_additional_port_links(ports_q *ret, struct port_pool *pp, unsigned int port) { static void reserve_additional_port_links(ports_q *ret, struct port_pool *pp, unsigned int port) {
for (__auto_type l = pp->overlaps.head; l; l = l->next) { for (__auto_type l = pp->overlaps.head; l; l = l->next) {
@ -707,7 +707,7 @@ bail:
// Oops. Some spec didn't have the port available. Probably a race condition. // Oops. Some spec didn't have the port available. Probably a race condition.
// Return everything to its place and report failure by resetting the output // Return everything to its place and report failure by resetting the output
// list to empty. // list to empty.
release_reserved_port(pp, ret); release_reserved_port(pp, ret, port);
} }
/** /**
@ -739,20 +739,10 @@ static ports_q reserve_port(struct port_pool *pp, unsigned int port) {
/** /**
* This function just releases reserved port number, it doesn't provide any binding/unbinding. * This function just releases reserved port number, it doesn't provide any binding/unbinding.
*/ */
static void release_reserved_port(struct port_pool *pp, ports_q *list) { static void release_reserved_port(struct port_pool *pp, ports_q *list, unsigned int port) {
// the list contains links in order:
// first port for port pool
// first port for first overlap pool
// first port for second overlap pool
// first port ...
// second port for port pool
// second port for first overlap pool
// ...
while (list->length) { while (list->length) {
// remove top link from list, which belongs to our port pool // remove top link from list, which belongs to our port pool
__auto_type link = t_queue_pop_head_link(list); __auto_type link = t_queue_pop_head_link(list);
unsigned int port = GPOINTER_TO_UINT(link->data);
{ {
LOCK(&pp->free_list_lock); LOCK(&pp->free_list_lock);
@ -779,6 +769,16 @@ static void release_reserved_port(struct port_pool *pp, ports_q *list) {
} }
} }
} }
static void release_reserved_ports(socket_port_q *ports) {
while (ports->length) {
__auto_type p = t_queue_pop_head(ports);
if (p->links.length)
release_reserved_port(p->pp, &p->links, GPOINTER_TO_UINT(p->links.head->data));
g_free(p);
}
}
/* Append a list of free ports within the min-max range */ /* Append a list of free ports within the min-max range */
static void __append_free_ports_to_int(struct intf_spec *spec) { static void __append_free_ports_to_int(struct intf_spec *spec) {
unsigned int ports_amount, count; unsigned int ports_amount, count;
@ -1108,7 +1108,7 @@ static void release_port_now(socket_t *r, ports_q *list, struct port_pool *pp) {
iptables_del_rule(r); iptables_del_rule(r);
/* first return the engaged port back */ /* first return the engaged port back */
release_reserved_port(pp, list); release_reserved_port(pp, list, port);
} else { } else {
ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port); ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port);
} }
@ -1148,6 +1148,34 @@ void append_thread_lpr_to_glob_lpr(void) {
mutex_unlock(&ports_to_release_glob_lock); mutex_unlock(&ports_to_release_glob_lock);
} }
static struct socket_port_link get_one_port_link(unsigned int port, struct intf_spec *spec) {
__auto_type links = reserve_port(&spec->port_pool, port);
return (struct socket_port_link) { .links = links, .pp = &spec->port_pool, .socket = { .fd = -1 }};
}
static struct socket_port_link get_any_port_link(struct intf_spec *spec) {
struct socket_port_link ret = { .pp = &spec->port_pool, .socket = { .fd = -1 } };
struct port_pool *pp = &spec->port_pool;
unsigned int port;
{
// get/reserve port and its primary port link
LOCK(&pp->free_list_lock);
__auto_type port_link = t_queue_pop_head_link(&pp->free_ports_q);
if (!port_link)
return ret;
port = GPOINTER_TO_UINT(port_link->data);
free_ports_link(pp, port) = NULL;
t_queue_push_tail_link(&ret.links, port_link);
}
reserve_additional_port_links(&ret.links, &spec->port_pool, port);
return ret;
}
/** /**
* Puts a list of `socket_t` objects into the `out`. * Puts a list of `socket_t` objects into the `out`.
* *
@ -1160,8 +1188,6 @@ bool __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigne
struct intf_spec *spec, const str *label) struct intf_spec *spec, const str *label)
{ {
unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0; unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0;
ports_q all_ports = TYPED_GQUEUE_INIT;
ports_q ports_to_engage = TYPED_GQUEUE_INIT; /* usually it's only one RTCP port, theoretically can be more */
struct port_pool * pp = &spec->port_pool; /* port pool for a given local interface */ struct port_pool * pp = &spec->port_pool; /* port pool for a given local interface */
ports_q *free_ports_q; ports_q *free_ports_q;
@ -1189,14 +1215,17 @@ bool __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigne
/* specifically requested port */ /* specifically requested port */
if (wanted_start_port > 0) { if (wanted_start_port > 0) {
ilog(LOG_DEBUG, "A specific port value is requested, wanted_start_port: '%d'", wanted_start_port); ilog(LOG_DEBUG, "A specific port value is requested, wanted_start_port: '%d'", wanted_start_port);
all_ports = reserve_port(pp, wanted_start_port); __auto_type spl = get_one_port_link(wanted_start_port, spec);
if (!all_ports.length) { if (!spl.links.length) {
/* if engaged already, just select any other (so default logic) */ /* if engaged already, just select any other (so default logic) */
ilog(LOG_WARN, "This requested port has been already engaged, can't take it."); ilog(LOG_WARN, "This requested port has been already engaged, can't take it.");
wanted_start_port = 0; /* take what is proposed by FIFO instead */ wanted_start_port = 0; /* take what is proposed by FIFO instead */
} else { } else {
/* we got the port, and we are sure it wasn't engaged */ /* we got the port, and we are sure it wasn't engaged */
port = wanted_start_port; port = wanted_start_port;
__auto_type splp = g_new(struct socket_port_link, 1);
*splp = spl;
t_queue_push_tail(out, splp);
} }
} }
@ -1241,76 +1270,63 @@ new_cycle:
/* Now only get first possible port for RTP. /* Now only get first possible port for RTP.
* Then additionally make sure that the RTCP port can also be engaged, if needed. * Then additionally make sure that the RTCP port can also be engaged, if needed.
*/ */
mutex_lock(&pp->free_list_lock); __auto_type spl = get_any_port_link(spec);
__auto_type port_link = t_queue_pop_head_link(free_ports_q); if (!spl.links.length) {
if (!port_link) {
mutex_unlock(&pp->free_list_lock);
ilog(LOG_ERR, "Failure while trying to get a port from the list"); ilog(LOG_ERR, "Failure while trying to get a port from the list");
goto fail; goto fail;
} }
port = GPOINTER_TO_UINT(port_link->data); /* RTP */ port = GPOINTER_TO_UINT(spl.links.head->data); /* RTP */
free_ports_link(pp, port) = NULL;
mutex_unlock(&pp->free_list_lock);
t_queue_push_tail_link(&all_ports, port_link);
/* ports for RTP must be even, if there is an additional port for RTCP */ /* ports for RTP must be even, if there is an additional port for RTCP */
if (num_ports > 1 && (port & 1)) { if (num_ports > 1 && (port & 1)) {
/* return port for RTP back and try again */ /* return port for RTP back and try again */
release_reserved_port(pp, &all_ports); release_reserved_port(pp, &spl.links, port);
goto new_cycle; continue;
} }
__auto_type splp = g_new(struct socket_port_link, 1);
*splp = spl;
t_queue_push_tail(out, splp);
/* find additional ports, usually it's only RTCP */ /* find additional ports, usually it's only RTCP */
additional_port = port; additional_port = port;
for (int i = 1; i < num_ports; i++) for (int i = 1; i < num_ports; i++)
{ {
additional_port++; additional_port++;
__auto_type add_port = reserve_port(pp, additional_port); spl = get_one_port_link(additional_port, spec);
if (!add_port.length) { if (!spl.links.length) {
/* return port for RTP back and try again */ /* return previously reserved ports and try again */
release_reserved_port(pp, &all_ports); release_reserved_ports(out);
/* return additional ports back */ /* return additional port back */
release_reserved_port(pp, &ports_to_engage); release_reserved_port(pp, &spl.links, additional_port);
goto new_cycle; goto new_cycle;
} }
/* engage this port right away */ /* engage this port right away */
/* track for which additional ports, we have to open sockets */ /* track for which additional ports, we have to open sockets */
t_queue_move(&ports_to_engage, &add_port); splp = g_new(struct socket_port_link, 1);
*splp = spl;
t_queue_push_tail(out, splp);
} }
} }
ilog(LOG_DEBUG, "Trying to bind the socket for RTP/RTCP ports (allocation attempt = '%d')", ilog(LOG_DEBUG, "Trying to bind the socket for RTP/RTCP ports (allocation attempt = '%d')",
allocation_attempts); allocation_attempts);
/* at this point we consider all things before as successfull. Now just add the RTP port */ /* at this point we consider all things before as successful */
t_queue_move(&all_ports, &ports_to_engage);
struct socket_port_link *spl; for (__auto_type l = out->head; l; l = l->next) {
while (all_ports.length) { __auto_type spl = l->data;
__auto_type port_link = t_queue_pop_head_link(&all_ports); port = GPOINTER_TO_UINT(spl->links.head->data);
port = GPOINTER_TO_UINT(port_link->data);
ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port); ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port);
spl = g_new0(struct socket_port_link, 1);
spl->socket.fd = -1;
spl->pp = pp;
t_queue_push_tail_link(&spl->links, port_link);
t_queue_push_tail(out, spl);
// append other links belonging to the same port
while (all_ports.length && GPOINTER_TO_UINT(t_queue_peek_head(&all_ports)) == port) {
port_link = t_queue_pop_head_link(&all_ports);
t_queue_push_tail_link(&spl->links, port_link);
}
/* if not possible to engage this socket, try to reallocate it again */ /* if not possible to engage this socket, try to reallocate it again */
if (!add_socket(&spl->socket, port, spec, label)) { if (!add_socket(&spl->socket, port, spec, label)) {
/* if something has been left in the `ports_to_engage` queue, release it right away */ /* if something has been left in the `ports_to_engage` queue, release it right away */
release_reserved_port(pp, &all_ports); release_reserved_ports(out);
/* ports which are already bound to a socket, will be freed by `free_port()` */ /* ports which are already bound to a socket, will be freed by `free_port()` */
goto release_restart; goto release_restart;
} }
@ -1320,10 +1336,6 @@ new_cycle:
break; break;
release_restart: release_restart:
/* release all previously engaged sockets */
while ((spl = t_queue_pop_head(out)))
free_port(spl); /* engaged ports will be released here */
/* do not re-try for specifically wanted ports */ /* do not re-try for specifically wanted ports */
if (wanted_start_port > 0) if (wanted_start_port > 0)
goto fail; goto fail;
@ -1332,8 +1344,10 @@ release_restart:
} }
/* success */ /* success */
ilog(LOG_DEBUG, "Opened a socket on port '%u' (on interface '%s') for a media relay", ilog(LOG_DEBUG, "Opened %u socket(s) from port '%u' (on interface '%s') for a media relay",
((socket_t *) out->head->data)->local.port, sockaddr_print_buf(&spec->local_address.addr)); num_ports,
out->head->data->socket.local.port,
sockaddr_print_buf(&spec->local_address.addr));
return true; return true;
fail: fail:

Loading…
Cancel
Save