MT#55283 use typed GQueue for socket/sfd lists

Change-Id: I0d61e0c8d9ccc23b038dd560d8ff876e550d2c05
pull/1776/head
Richard Fuchs 2 years ago
parent 3114510c12
commit 73f1037720

@ -668,7 +668,7 @@ static int __media_want_interfaces(struct call_media *media) {
}
static void __endpoint_map_truncate(struct endpoint_map *em, unsigned int num_intfs) {
while (em->intf_sfds.length > num_intfs) {
struct sfd_intf_list *il = g_queue_pop_tail(&em->intf_sfds);
struct sfd_intf_list *il = t_queue_pop_tail(&em->intf_sfds);
free_sfd_intf_list(il);
}
}
@ -682,9 +682,9 @@ static struct endpoint_map *__hunt_endpoint_map(struct call_media *media, unsign
continue;
// any of our sockets shut down?
for (GList *k = em->intf_sfds.head; k; k = k->next) {
for (__auto_type k = em->intf_sfds.head; k; k = k->next) {
struct sfd_intf_list *il = k->data;
for (GList *j = il->list.head; j; j = j->next) {
for (__auto_type j = il->list.head; j; j = j->next) {
struct stream_fd *sfd = j->data;
if (sfd->socket.fd == -1)
return NULL;
@ -722,7 +722,7 @@ static struct endpoint_map *__hunt_endpoint_map(struct call_media *media, unsign
/* endpoint matches, but not enough ports. flush existing ports
* and allocate a new set. */
__C_DBG("endpoint matches, doesn't have enough ports");
g_queue_clear_full(&em->intf_sfds, (void *) free_intf_list);
t_queue_clear_full(&em->intf_sfds, free_sfd_intf_list);
return em;
}
@ -755,7 +755,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne
const struct endpoint *ep, const sdp_ng_flags *flags, bool always_reuse)
{
struct stream_fd *sfd;
GQueue intf_sockets = G_QUEUE_INIT;
socket_intf_list_q intf_sockets = TYPED_GQUEUE_INIT;
unsigned int want_interfaces = __media_want_interfaces(media);
bool port_latching = false;
@ -786,7 +786,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne
em->wildcard = 1;
em->logical_intf = media->logical_intf;
em->num_ports = num_ports;
g_queue_init(&em->intf_sfds);
t_queue_init(&em->intf_sfds);
g_queue_push_tail(&media->endpoint_maps, em);
}
@ -798,16 +798,16 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne
__C_DBG("allocating stream_fds for %u ports", num_ports);
struct socket_intf_list *il;
while ((il = g_queue_pop_head(&intf_sockets))) {
while ((il = t_queue_pop_head(&intf_sockets))) {
if (il->list.length != num_ports)
goto next_il;
struct sfd_intf_list *em_il = g_slice_alloc0(sizeof(*em_il));
em_il->local_intf = il->local_intf;
g_queue_push_tail(&em->intf_sfds, em_il);
t_queue_push_tail(&em->intf_sfds, em_il);
socket_t *sock;
while ((sock = g_queue_pop_head(&il->list))) {
while ((sock = t_queue_pop_head(&il->list))) {
set_tos(sock, media->call->tos);
if (media->call->cpu_affinity >= 0) {
if (socket_cpu_affinity(sock, media->call->cpu_affinity))
@ -815,7 +815,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne
"affinity: %s", strerror(errno));
}
sfd = stream_fd_new(sock, media->call, il->local_intf);
g_queue_push_tail(&em_il->list, sfd); // not referenced
t_queue_push_tail(&em_il->list, sfd); // not referenced
}
next_il:
@ -825,7 +825,7 @@ next_il:
return em;
}
static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) {
static void __assign_stream_fds(struct call_media *media, sfd_intf_list_q *intf_sfds) {
int reset_ice = 0;
for (GList *k = media->streams.head; k; k = k->next) {
@ -838,10 +838,10 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) {
bool sfd_found = false;
struct stream_fd *intf_sfd = NULL;
for (GList *l = intf_sfds->head; l; l = l->next) {
for (__auto_type l = intf_sfds->head; l; l = l->next) {
struct sfd_intf_list *il = l->data;
struct stream_fd *sfd = g_queue_peek_nth(&il->list, ps->component - 1);
struct stream_fd *sfd = t_queue_peek_nth(&il->list, ps->component - 1);
if (!sfd)
sfd = ps->selected_sfd;
if (!sfd) {
@ -3913,7 +3913,7 @@ static void __call_free(void *p) {
while (c->endpoint_maps.head) {
em = g_queue_pop_head(&c->endpoint_maps);
g_queue_clear_full(&em->intf_sfds, (void *) free_intf_list);
t_queue_clear_full(&em->intf_sfds, free_sfd_intf_list);
g_slice_free1(sizeof(*em), em);
}

@ -1011,7 +1011,7 @@ void append_thread_lpr_to_glob_lpr(void) {
* @param spec, interface specifications
* @param out, a list of sockets for this particular session (not a global list)
*/
int __get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int wanted_start_port,
int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int wanted_start_port,
struct intf_spec *spec, const str *label)
{
unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0;
@ -1167,7 +1167,7 @@ new_cycle:
ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port);
sk = g_slice_alloc0(sizeof(*sk));
sk->fd = -1;
g_queue_push_tail(out, sk);
t_queue_push_tail(out, sk);
/* if not possible to engage this socket, try to reallocate it again */
if (add_socket(sk, port, spec, label)) {
@ -1188,7 +1188,7 @@ new_cycle:
release_restart:
/* release all previously engaged sockets */
while ((sk = g_queue_pop_head(out)))
while ((sk = t_queue_pop_head(out)))
free_port(sk, spec); /* engaged ports will be released here */
/* do not re-try for specifically wanted ports */
@ -1210,7 +1210,7 @@ fail:
}
/* puts a list of "struct intf_list" into "out", containing socket_t list */
int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media)
int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media)
{
GList *l;
struct socket_intf_list *il;
@ -1237,7 +1237,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_
il = g_slice_alloc0(sizeof(*il));
il->local_intf = loc;
g_queue_push_tail(out, il);
t_queue_push_tail(out, il);
if (G_LIKELY(!__get_consecutive_ports(&il->list, num_ports, 0, loc->spec, label))) {
// success - found available ports on local interfaces, so far
continue;
@ -1254,7 +1254,7 @@ error_ports:
num_ports, STR_FMT(&log->name));
// free all ports alloc'ed so far for the previous local interfaces
while ((il = g_queue_pop_head(out))) {
while ((il = t_queue_pop_head(out))) {
free_socket_intf_list(il);
}
@ -1264,16 +1264,16 @@ error_ports:
void free_socket_intf_list(struct socket_intf_list *il) {
socket_t *sock;
while ((sock = g_queue_pop_head(&il->list)))
while ((sock = t_queue_pop_head(&il->list)))
free_port(sock, il->local_intf->spec);
g_slice_free1(sizeof(*il), il);
}
void free_intf_list(struct socket_intf_list *il) {
g_queue_clear(&il->list);
void free_sfd_intf_list(struct sfd_intf_list *il) {
t_queue_clear(&il->list);
g_slice_free1(sizeof(*il), il);
}
void free_sfd_intf_list(struct sfd_intf_list *il) {
g_queue_clear_full(&il->list, (GDestroyNotify) stream_fd_release);
void free_release_sfd_intf_list(struct sfd_intf_list *il) {
t_queue_clear_full(&il->list, stream_fd_release);
g_slice_free1(sizeof(*il), il);
}

@ -39,6 +39,7 @@ typedef union {
GQueue *q;
stream_fd_q *sfds_q;
GPtrArray *pa;
sfd_intf_list_q *siq;
void *v;
} callback_arg_t __attribute__ ((__transparent_union__));
@ -1379,7 +1380,7 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) {
sockfamily_t *fam;
struct logical_intf *lif;
struct local_intf *loc;
GQueue q = G_QUEUE_INIT;
socket_q q = TYPED_GQUEUE_INIT;
unsigned int loc_uid;
struct stream_fd *sfd;
socket_t *sock;
@ -1422,7 +1423,7 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) {
if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid))
goto err;
err = "no port returned";
sock = g_queue_pop_head(&q);
sock = t_queue_pop_head(&q);
if (!sock)
goto err;
set_tos(sock, c->tos);
@ -1621,7 +1622,7 @@ static int redis_maps(struct call *c, struct redis_list *maps) {
/* from call.c:__get_endpoint_map() */
em = uid_slice_alloc0(em, &c->endpoint_maps);
g_queue_init(&em->intf_sfds);
t_queue_init(&em->intf_sfds);
em->wildcard = redis_hash_get_bool_flag(rh, "wildcard");
if (redis_hash_get_unsigned(&em->num_ports, rh, "num_ports"))
@ -1853,7 +1854,7 @@ static int json_link_medias(struct call *c, struct redis_list *medias,
g_hash_table_insert(med->monologue->media_ids, &med->media_id, med);
/* find the pair media to subscribe */
if (!json_build_list_cb((callback_arg_t) NULL, c, "media-subscriptions", med->unique_id,
if (!json_build_list_cb(NULL, c, "media-subscriptions", med->unique_id,
medias, rbl_subs_cb, med, root_reader))
{
rlog(LOG_DEBUG, "Restored media subscriptions for: '" STR_FORMAT_M "'", STR_FMT_M(&med->monologue->tag));
@ -1863,7 +1864,7 @@ static int json_link_medias(struct call *c, struct redis_list *medias,
}
static int rbl_cb_intf_sfds(str *s, callback_arg_t qp, struct redis_list *list, void *ptr) {
GQueue *q = qp.q;
sfd_intf_list_q *q = qp.siq;
int i;
struct sfd_intf_list *il;
struct endpoint_map *em;
@ -1876,11 +1877,11 @@ static int rbl_cb_intf_sfds(str *s, callback_arg_t qp, struct redis_list *list,
il->local_intf = g_queue_peek_nth((GQueue*) &em->logical_intf->list, i);
if (!il->local_intf)
return -1;
g_queue_push_tail(q, il);
t_queue_push_tail(q, il);
return 0;
}
il = g_queue_peek_tail(q);
il = t_queue_peek_tail(q);
if (!il)
return -1;
@ -1888,7 +1889,7 @@ static int rbl_cb_intf_sfds(str *s, callback_arg_t qp, struct redis_list *list,
if (G_UNLIKELY(!sfd))
return -1;
g_queue_push_tail(&il->list, sfd);
t_queue_push_tail(&il->list, sfd);
return 0;
}
@ -1901,7 +1902,7 @@ static int json_link_maps(struct call *c, struct redis_list *maps,
for (i = 0; i < maps->len; i++) {
em = maps->ptrs[i];
if (json_build_list_cb((callback_arg_t) &em->intf_sfds, c, "map_sfds", em->unique_id, sfds,
if (json_build_list_cb(&em->intf_sfds, c, "map_sfds", em->unique_id, sfds,
rbl_cb_intf_sfds, em, root_reader))
return -1;
}
@ -2661,10 +2662,10 @@ char* redis_encode_json(struct call *c) {
snprintf(tmp, sizeof(tmp), "map_sfds-%u", ep->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (GList *m = ep->intf_sfds.head; m; m = m->next) {
for (__auto_type m = ep->intf_sfds.head; m; m = m->next) {
struct sfd_intf_list *il = m->data;
JSON_ADD_STRING("loc-%u", il->local_intf->unique_id);
for (GList *n = il->list.head; n; n = n->next) {
for (__auto_type n = il->list.head; n; n = n->next) {
struct stream_fd *sfd = n->data;
JSON_ADD_STRING("%u", sfd->unique_id);
}

@ -347,7 +347,7 @@ struct endpoint_map {
struct endpoint endpoint;
unsigned int num_ports;
const struct logical_intf *logical_intf;
GQueue intf_sfds; /* list of struct sfd_intf_list - contains stream_fd list */
sfd_intf_list_q intf_sfds; /* list of struct sfd_intf_list - contains stream_fd list */
unsigned int wildcard:1;
};

@ -193,12 +193,14 @@ struct local_intf {
};
struct socket_intf_list {
struct local_intf *local_intf;
GQueue list;
socket_q list;
};
struct sfd_intf_list {
struct local_intf *local_intf;
GQueue list;
stream_fd_q list;
};
TYPED_GQUEUE(socket_intf_list, struct socket_intf_list)
TYPED_GQUEUE(sfd_intf_list, struct sfd_intf_list)
/**
* stream_fd is an entry-point object for RTP packets handling,
@ -301,17 +303,17 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port);
//int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const struct call *c);
//void release_port(socket_t *r, const struct local_intf *);
int __get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int wanted_start_port,
int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int wanted_start_port,
struct intf_spec *spec, const str *);
int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media);
int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media);
struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_intf *lif);
struct stream_fd *stream_fd_lookup(const endpoint_t *);
void stream_fd_release(struct stream_fd *);
enum thread_looper_action release_closed_sockets(void);
void append_thread_lpr_to_glob_lpr(void);
void free_intf_list(struct socket_intf_list *il);
void free_sfd_intf_list(struct sfd_intf_list *il);
void free_release_sfd_intf_list(struct sfd_intf_list *il);
void free_socket_intf_list(struct socket_intf_list *il);
INLINE int open_intf_socket(socket_t *r, unsigned int port, const struct local_intf *lif) {

@ -9,6 +9,7 @@
#include <errno.h>
#include <netinet/tcp.h>
#include <stdbool.h>
#include "containers.h"
@ -34,6 +35,8 @@ typedef struct socket socket_t;
typedef const struct socket_type socktype_t;
typedef const struct socket_family sockfamily_t;
TYPED_GQUEUE(socket, socket_t)
#include "str.h"

Loading…
Cancel
Save