diff --git a/daemon/call.c b/daemon/call.c index e37ab4f2a..cda239d95 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -668,7 +668,7 @@ static int __media_want_interfaces(struct call_media *media) { } static void __endpoint_map_truncate(struct endpoint_map *em, unsigned int num_intfs) { while (em->intf_sfds.length > num_intfs) { - struct sfd_intf_list *il = g_queue_pop_tail(&em->intf_sfds); + struct sfd_intf_list *il = t_queue_pop_tail(&em->intf_sfds); free_sfd_intf_list(il); } } @@ -682,9 +682,9 @@ static struct endpoint_map *__hunt_endpoint_map(struct call_media *media, unsign continue; // any of our sockets shut down? - for (GList *k = em->intf_sfds.head; k; k = k->next) { + for (__auto_type k = em->intf_sfds.head; k; k = k->next) { struct sfd_intf_list *il = k->data; - for (GList *j = il->list.head; j; j = j->next) { + for (__auto_type j = il->list.head; j; j = j->next) { struct stream_fd *sfd = j->data; if (sfd->socket.fd == -1) return NULL; @@ -722,7 +722,7 @@ static struct endpoint_map *__hunt_endpoint_map(struct call_media *media, unsign /* endpoint matches, but not enough ports. flush existing ports * and allocate a new set. */ __C_DBG("endpoint matches, doesn't have enough ports"); - g_queue_clear_full(&em->intf_sfds, (void *) free_intf_list); + t_queue_clear_full(&em->intf_sfds, free_sfd_intf_list); return em; } @@ -755,7 +755,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne const struct endpoint *ep, const sdp_ng_flags *flags, bool always_reuse) { struct stream_fd *sfd; - GQueue intf_sockets = G_QUEUE_INIT; + socket_intf_list_q intf_sockets = TYPED_GQUEUE_INIT; unsigned int want_interfaces = __media_want_interfaces(media); bool port_latching = false; @@ -786,7 +786,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne em->wildcard = 1; em->logical_intf = media->logical_intf; em->num_ports = num_ports; - g_queue_init(&em->intf_sfds); + t_queue_init(&em->intf_sfds); g_queue_push_tail(&media->endpoint_maps, em); } @@ -798,16 +798,16 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne __C_DBG("allocating stream_fds for %u ports", num_ports); struct socket_intf_list *il; - while ((il = g_queue_pop_head(&intf_sockets))) { + while ((il = t_queue_pop_head(&intf_sockets))) { if (il->list.length != num_ports) goto next_il; struct sfd_intf_list *em_il = g_slice_alloc0(sizeof(*em_il)); em_il->local_intf = il->local_intf; - g_queue_push_tail(&em->intf_sfds, em_il); + t_queue_push_tail(&em->intf_sfds, em_il); socket_t *sock; - while ((sock = g_queue_pop_head(&il->list))) { + while ((sock = t_queue_pop_head(&il->list))) { set_tos(sock, media->call->tos); if (media->call->cpu_affinity >= 0) { if (socket_cpu_affinity(sock, media->call->cpu_affinity)) @@ -815,7 +815,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne "affinity: %s", strerror(errno)); } sfd = stream_fd_new(sock, media->call, il->local_intf); - g_queue_push_tail(&em_il->list, sfd); // not referenced + t_queue_push_tail(&em_il->list, sfd); // not referenced } next_il: @@ -825,7 +825,7 @@ next_il: return em; } -static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) { +static void __assign_stream_fds(struct call_media *media, sfd_intf_list_q *intf_sfds) { int reset_ice = 0; for (GList *k = media->streams.head; k; k = k->next) { @@ -838,10 +838,10 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) { bool sfd_found = false; struct stream_fd *intf_sfd = NULL; - for (GList *l = intf_sfds->head; l; l = l->next) { + for (__auto_type l = intf_sfds->head; l; l = l->next) { struct sfd_intf_list *il = l->data; - struct stream_fd *sfd = g_queue_peek_nth(&il->list, ps->component - 1); + struct stream_fd *sfd = t_queue_peek_nth(&il->list, ps->component - 1); if (!sfd) sfd = ps->selected_sfd; if (!sfd) { @@ -3913,7 +3913,7 @@ static void __call_free(void *p) { while (c->endpoint_maps.head) { em = g_queue_pop_head(&c->endpoint_maps); - g_queue_clear_full(&em->intf_sfds, (void *) free_intf_list); + t_queue_clear_full(&em->intf_sfds, free_sfd_intf_list); g_slice_free1(sizeof(*em), em); } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index f4d793aff..5377db6aa 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1011,7 +1011,7 @@ void append_thread_lpr_to_glob_lpr(void) { * @param spec, interface specifications * @param out, a list of sockets for this particular session (not a global list) */ -int __get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int wanted_start_port, +int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int wanted_start_port, struct intf_spec *spec, const str *label) { unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0; @@ -1167,7 +1167,7 @@ new_cycle: ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port); sk = g_slice_alloc0(sizeof(*sk)); sk->fd = -1; - g_queue_push_tail(out, sk); + t_queue_push_tail(out, sk); /* if not possible to engage this socket, try to reallocate it again */ if (add_socket(sk, port, spec, label)) { @@ -1188,7 +1188,7 @@ new_cycle: release_restart: /* release all previously engaged sockets */ - while ((sk = g_queue_pop_head(out))) + while ((sk = t_queue_pop_head(out))) free_port(sk, spec); /* engaged ports will be released here */ /* do not re-try for specifically wanted ports */ @@ -1210,7 +1210,7 @@ fail: } /* puts a list of "struct intf_list" into "out", containing socket_t list */ -int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media) +int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media) { GList *l; struct socket_intf_list *il; @@ -1237,7 +1237,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_ il = g_slice_alloc0(sizeof(*il)); il->local_intf = loc; - g_queue_push_tail(out, il); + t_queue_push_tail(out, il); if (G_LIKELY(!__get_consecutive_ports(&il->list, num_ports, 0, loc->spec, label))) { // success - found available ports on local interfaces, so far continue; @@ -1254,7 +1254,7 @@ error_ports: num_ports, STR_FMT(&log->name)); // free all ports alloc'ed so far for the previous local interfaces - while ((il = g_queue_pop_head(out))) { + while ((il = t_queue_pop_head(out))) { free_socket_intf_list(il); } @@ -1264,16 +1264,16 @@ error_ports: void free_socket_intf_list(struct socket_intf_list *il) { socket_t *sock; - while ((sock = g_queue_pop_head(&il->list))) + while ((sock = t_queue_pop_head(&il->list))) free_port(sock, il->local_intf->spec); g_slice_free1(sizeof(*il), il); } -void free_intf_list(struct socket_intf_list *il) { - g_queue_clear(&il->list); +void free_sfd_intf_list(struct sfd_intf_list *il) { + t_queue_clear(&il->list); g_slice_free1(sizeof(*il), il); } -void free_sfd_intf_list(struct sfd_intf_list *il) { - g_queue_clear_full(&il->list, (GDestroyNotify) stream_fd_release); +void free_release_sfd_intf_list(struct sfd_intf_list *il) { + t_queue_clear_full(&il->list, stream_fd_release); g_slice_free1(sizeof(*il), il); } diff --git a/daemon/redis.c b/daemon/redis.c index 64b42ecaf..f43ad6655 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -39,6 +39,7 @@ typedef union { GQueue *q; stream_fd_q *sfds_q; GPtrArray *pa; + sfd_intf_list_q *siq; void *v; } callback_arg_t __attribute__ ((__transparent_union__)); @@ -1379,7 +1380,7 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) { sockfamily_t *fam; struct logical_intf *lif; struct local_intf *loc; - GQueue q = G_QUEUE_INIT; + socket_q q = TYPED_GQUEUE_INIT; unsigned int loc_uid; struct stream_fd *sfd; socket_t *sock; @@ -1422,7 +1423,7 @@ static int redis_sfds(struct call *c, struct redis_list *sfds) { if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid)) goto err; err = "no port returned"; - sock = g_queue_pop_head(&q); + sock = t_queue_pop_head(&q); if (!sock) goto err; set_tos(sock, c->tos); @@ -1621,7 +1622,7 @@ static int redis_maps(struct call *c, struct redis_list *maps) { /* from call.c:__get_endpoint_map() */ em = uid_slice_alloc0(em, &c->endpoint_maps); - g_queue_init(&em->intf_sfds); + t_queue_init(&em->intf_sfds); em->wildcard = redis_hash_get_bool_flag(rh, "wildcard"); if (redis_hash_get_unsigned(&em->num_ports, rh, "num_ports")) @@ -1853,7 +1854,7 @@ static int json_link_medias(struct call *c, struct redis_list *medias, g_hash_table_insert(med->monologue->media_ids, &med->media_id, med); /* find the pair media to subscribe */ - if (!json_build_list_cb((callback_arg_t) NULL, c, "media-subscriptions", med->unique_id, + if (!json_build_list_cb(NULL, c, "media-subscriptions", med->unique_id, medias, rbl_subs_cb, med, root_reader)) { rlog(LOG_DEBUG, "Restored media subscriptions for: '" STR_FORMAT_M "'", STR_FMT_M(&med->monologue->tag)); @@ -1863,7 +1864,7 @@ static int json_link_medias(struct call *c, struct redis_list *medias, } static int rbl_cb_intf_sfds(str *s, callback_arg_t qp, struct redis_list *list, void *ptr) { - GQueue *q = qp.q; + sfd_intf_list_q *q = qp.siq; int i; struct sfd_intf_list *il; struct endpoint_map *em; @@ -1876,11 +1877,11 @@ static int rbl_cb_intf_sfds(str *s, callback_arg_t qp, struct redis_list *list, il->local_intf = g_queue_peek_nth((GQueue*) &em->logical_intf->list, i); if (!il->local_intf) return -1; - g_queue_push_tail(q, il); + t_queue_push_tail(q, il); return 0; } - il = g_queue_peek_tail(q); + il = t_queue_peek_tail(q); if (!il) return -1; @@ -1888,7 +1889,7 @@ static int rbl_cb_intf_sfds(str *s, callback_arg_t qp, struct redis_list *list, if (G_UNLIKELY(!sfd)) return -1; - g_queue_push_tail(&il->list, sfd); + t_queue_push_tail(&il->list, sfd); return 0; } @@ -1901,7 +1902,7 @@ static int json_link_maps(struct call *c, struct redis_list *maps, for (i = 0; i < maps->len; i++) { em = maps->ptrs[i]; - if (json_build_list_cb((callback_arg_t) &em->intf_sfds, c, "map_sfds", em->unique_id, sfds, + if (json_build_list_cb(&em->intf_sfds, c, "map_sfds", em->unique_id, sfds, rbl_cb_intf_sfds, em, root_reader)) return -1; } @@ -2661,10 +2662,10 @@ char* redis_encode_json(struct call *c) { snprintf(tmp, sizeof(tmp), "map_sfds-%u", ep->unique_id); json_builder_set_member_name(builder, tmp); json_builder_begin_array(builder); - for (GList *m = ep->intf_sfds.head; m; m = m->next) { + for (__auto_type m = ep->intf_sfds.head; m; m = m->next) { struct sfd_intf_list *il = m->data; JSON_ADD_STRING("loc-%u", il->local_intf->unique_id); - for (GList *n = il->list.head; n; n = n->next) { + for (__auto_type n = il->list.head; n; n = n->next) { struct stream_fd *sfd = n->data; JSON_ADD_STRING("%u", sfd->unique_id); } diff --git a/include/call.h b/include/call.h index 12ca96844..c75f35189 100644 --- a/include/call.h +++ b/include/call.h @@ -347,7 +347,7 @@ struct endpoint_map { struct endpoint endpoint; unsigned int num_ports; const struct logical_intf *logical_intf; - GQueue intf_sfds; /* list of struct sfd_intf_list - contains stream_fd list */ + sfd_intf_list_q intf_sfds; /* list of struct sfd_intf_list - contains stream_fd list */ unsigned int wildcard:1; }; diff --git a/include/media_socket.h b/include/media_socket.h index 88814a2cf..d3f0ed2fb 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -193,12 +193,14 @@ struct local_intf { }; struct socket_intf_list { struct local_intf *local_intf; - GQueue list; + socket_q list; }; struct sfd_intf_list { struct local_intf *local_intf; - GQueue list; + stream_fd_q list; }; +TYPED_GQUEUE(socket_intf_list, struct socket_intf_list) +TYPED_GQUEUE(sfd_intf_list, struct sfd_intf_list) /** * stream_fd is an entry-point object for RTP packets handling, @@ -301,17 +303,17 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port); //int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const struct call *c); //void release_port(socket_t *r, const struct local_intf *); -int __get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int wanted_start_port, +int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int wanted_start_port, struct intf_spec *spec, const str *); -int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media); +int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media); struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_intf *lif); struct stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(struct stream_fd *); enum thread_looper_action release_closed_sockets(void); void append_thread_lpr_to_glob_lpr(void); -void free_intf_list(struct socket_intf_list *il); void free_sfd_intf_list(struct sfd_intf_list *il); +void free_release_sfd_intf_list(struct sfd_intf_list *il); void free_socket_intf_list(struct socket_intf_list *il); INLINE int open_intf_socket(socket_t *r, unsigned int port, const struct local_intf *lif) { diff --git a/lib/socket.h b/lib/socket.h index 48ed49a66..454aab78c 100644 --- a/lib/socket.h +++ b/lib/socket.h @@ -9,6 +9,7 @@ #include #include #include +#include "containers.h" @@ -34,6 +35,8 @@ typedef struct socket socket_t; typedef const struct socket_type socktype_t; typedef const struct socket_family sockfamily_t; +TYPED_GQUEUE(socket, socket_t) + #include "str.h"