From 4e7078834fc978ce9d50ef45d1b1b1efa245a508 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 29 Nov 2023 09:27:32 -0500 Subject: [PATCH] MT#55283 use typed GQueue for stream_fd Change-Id: I75544a48c79481473effa9651f1ad2b59b234dc6 --- daemon/call.c | 29 +++++++++++++---------------- daemon/dtls.c | 4 ++-- daemon/ice.c | 8 ++++---- daemon/media_socket.c | 6 +++--- daemon/redis.c | 7 ++++--- daemon/sdp.c | 7 ++----- include/call.h | 4 ++-- include/media_socket.h | 4 ++++ 8 files changed, 34 insertions(+), 35 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 5195f3893..948f15a51 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -834,7 +834,7 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) { // use opaque pointer to detect changes void *old_selected_sfd = ps->selected_sfd; - g_queue_clear(&ps->sfds); + t_queue_clear(&ps->sfds); bool sfd_found = false; struct stream_fd *intf_sfd = NULL; @@ -853,7 +853,7 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) { } sfd->stream = ps; - g_queue_push_tail(&ps->sfds, sfd); + t_queue_push_tail(&ps->sfds, sfd); if (ps->selected_sfd == sfd) sfd_found = true; @@ -865,7 +865,7 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) { if (intf_sfd) ps->selected_sfd = intf_sfd; else - ps->selected_sfd = g_queue_peek_nth(&ps->sfds, 0); + ps->selected_sfd = t_queue_peek_nth(&ps->sfds, 0); } if (old_selected_sfd && ps->selected_sfd && old_selected_sfd != ps->selected_sfd) @@ -1057,7 +1057,7 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) { } if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) { - for (GList *l = ps->sfds.head; l; l = l->next) { + for (__auto_type l = ps->sfds.head; l; l = l->next) { static const str fake_rtp = STR_CONST_INIT("\x80\x7f\xff\xff\x00\x00\x00\x00" "\x00\x00\x00\x00"); struct stream_fd *sfd = l->data; @@ -1096,7 +1096,7 @@ int __init_stream(struct packet_stream *ps) { dtls_shutdown(ps); if (MEDIA_ISSET(media, SDES) && dtls_active == -1) { - for (GList *l = ps->sfds.head; l; l = l->next) { + for (__auto_type l = ps->sfds.head; l; l = l->next) { struct stream_fd *sfd = l->data; struct crypto_params_sdes *cps = media->sdes_in.head ? media->sdes_in.head->data : NULL; @@ -1121,7 +1121,7 @@ int __init_stream(struct packet_stream *ps) { if (dtls_active == -1) dtls_active = (PS_ISSET(ps, FILLED) && MEDIA_ISSET(media, SETUP_ACTIVE)); dtls_connection_init(&ps->ice_dtls, ps, dtls_active, call->dtls_cert); - for (GList *l = ps->sfds.head; l; l = l->next) { + for (__auto_type l = ps->sfds.head; l; l = l->next) { struct stream_fd *sfd = l->data; dtls_connection_init(&sfd->dtls, ps, dtls_active, call->dtls_cert); } @@ -1928,7 +1928,7 @@ static void __disable_streams(struct call_media *media, unsigned int num_ports) for (l = media->streams.head; l; l = l->next) { ps = l->data; - g_queue_clear(&ps->sfds); + t_queue_clear(&ps->sfds); ps->selected_sfd = NULL; } } @@ -2026,11 +2026,8 @@ static void __fingerprint_changed(struct call_media *m) { } static void __set_all_tos(struct call *c) { - GList *l; - struct stream_fd *sfd; - - for (l = c->stream_fds.head; l; l = l->next) { - sfd = l->data; + for (__auto_type l = c->stream_fds.head; l; l = l->next) { + struct stream_fd *sfd = l->data; if (sfd->socket.fd == -1) continue; set_tos(&sfd->socket, c->tos); @@ -3571,7 +3568,7 @@ static void __call_cleanup(struct call *c) { __unkernelize(ps, "final call cleanup"); dtls_shutdown(ps); ps->selected_sfd = NULL; - g_queue_clear(&ps->sfds); + t_queue_clear(&ps->sfds); crypto_cleanup(&ps->crypto); g_queue_clear_full(&ps->rtp_sinks, free_sink_handler); @@ -3599,7 +3596,7 @@ static void __call_cleanup(struct call *c) { } while (c->stream_fds.head) { - struct stream_fd *sfd = g_queue_pop_head(&c->stream_fds); + struct stream_fd *sfd = t_queue_pop_head(&c->stream_fds); stream_fd_release(sfd); obj_put(sfd); } @@ -3929,7 +3926,7 @@ static void __call_free(void *p) { while (c->streams.head) { ps = g_queue_pop_head(&c->streams); crypto_cleanup(&ps->crypto); - g_queue_clear(&ps->sfds); + t_queue_clear(&ps->sfds); g_hash_table_destroy(ps->rtp_stats); for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) ssrc_ctx_put(&ps->ssrc_in[u]); @@ -4260,7 +4257,7 @@ void monologue_destroy(struct call_monologue *monologue) { ps->selected_sfd = NULL; struct stream_fd *sfd; - while ((sfd = g_queue_pop_head(&ps->sfds))) + while ((sfd = t_queue_pop_head(&ps->sfds))) stream_fd_release(sfd); } } diff --git a/daemon/dtls.c b/daemon/dtls.c index d163305a5..4be428d57 100644 --- a/daemon/dtls.c +++ b/daemon/dtls.c @@ -762,7 +762,7 @@ found: crypto_init(&ps->selected_sfd->crypto, &client); } // it's possible that ps->selected_sfd is not from ps->sfds list (?) - for (GList *l = ps->sfds.head; l; l = l->next) { + for (__auto_type l = ps->sfds.head; l; l = l->next) { struct stream_fd *sfd = l->data; if (d->active) /* we're the client */ crypto_init(&sfd->crypto, &server); @@ -905,7 +905,7 @@ void dtls_shutdown(struct packet_stream *ps) { } dtls_connection_cleanup(&ps->ice_dtls); } - for (GList *l = ps->sfds.head; l; l = l->next) { + for (__auto_type l = ps->sfds.head; l; l = l->next) { struct stream_fd *sfd = l->data; struct dtls_connection *d = &sfd->dtls; diff --git a/daemon/ice.c b/daemon/ice.c index a91e74a1b..f5808706e 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -490,7 +490,7 @@ void ice_restart(struct ice_agent *ag) { /* called with the call lock held in W, hence agent doesn't need to be locked */ void ice_update(struct ice_agent *ag, struct stream_params *sp, bool allow_reset) { - GList *l, *k; + GList *l; struct ice_candidate *cand, *dup; struct call_media *media; struct call *call; @@ -607,7 +607,7 @@ pair: if (!ps) continue; - for (k = ps->sfds.head; k; k = k->next) { + for (__auto_type k = ps->sfds.head; k; k = k->next) { sfd = k->data; /* skip duplicates here also */ if (__pair_lookup(ag, dup, sfd->local_intf)) @@ -1207,7 +1207,7 @@ found: static int __check_valid(struct ice_agent *ag) { struct call_media *media; struct packet_stream *ps; - GList *l, *k, *m; + GList *l, *k; GQueue all_compos; struct ice_candidate_pair *pair; // const struct local_intf *ifa; @@ -1258,7 +1258,7 @@ static int __check_valid(struct ice_agent *ag) { FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint))); mutex_unlock(&ps->out_lock); - for (m = ps->sfds.head; m; m = m->next) { + for (__auto_type m = ps->sfds.head; m; m = m->next) { sfd = m->data; if (sfd->local_intf != pair->local_intf) continue; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index a11d2da6e..16e1a071d 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -2546,7 +2546,7 @@ update_addr: if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) { // make sure the new interface/socket is actually one from the list of sockets // that we intend to use, and not an old one from a previous negotiation - GList *contains = g_queue_find(&phc->mp.stream->sfds, phc->mp.sfd); + __auto_type contains = t_queue_find(&phc->mp.stream->sfds, phc->mp.sfd); if (!contains) ilog(LOG_INFO | LOG_FLAG_LIMIT, "Not switching from local socket %s to %s (not in list)", endpoint_print_buf(&phc->mp.stream->selected_sfd->socket.local), @@ -3236,11 +3236,11 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_in struct poller *p = rtpe_poller; sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free); - sfd->unique_id = g_queue_get_length(&call->stream_fds); + sfd->unique_id = t_queue_get_length(&call->stream_fds); sfd->socket = *fd; sfd->call = obj_get(call); sfd->local_intf = lif; - g_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */ + t_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */ g_slice_free1(sizeof(*fd), fd); /* moved into sfd, thus free */ __C_DBG("stream_fd_new localport=%d", sfd->socket.local.port); diff --git a/daemon/redis.c b/daemon/redis.c index 5dcbfd197..a80b2a1cc 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -37,6 +37,7 @@ typedef union { GQueue *q; + stream_fd_q *sfds_q; GPtrArray *pa; void *v; } callback_arg_t __attribute__ ((__transparent_union__)); @@ -2349,7 +2350,7 @@ char* redis_encode_json(struct call *c) { JSON_SET_SIMPLE("last_signal","%ld", (long int) c->last_signal); JSON_SET_SIMPLE("tos","%u", (int) c->tos); JSON_SET_SIMPLE("deleted","%ld", (long int) c->deleted); - JSON_SET_SIMPLE("num_sfds","%u", g_queue_get_length(&c->stream_fds)); + JSON_SET_SIMPLE("num_sfds","%u", t_queue_get_length(&c->stream_fds)); JSON_SET_SIMPLE("num_streams","%u", g_queue_get_length(&c->streams)); JSON_SET_SIMPLE("num_medias","%u", g_queue_get_length(&c->medias)); JSON_SET_SIMPLE("num_tags","%u", g_queue_get_length(&c->monologues)); @@ -2368,7 +2369,7 @@ char* redis_encode_json(struct call *c) { json_builder_end_object(builder); - for (GList *l = c->stream_fds.head; l; l = l->next) { + for (__auto_type l = c->stream_fds.head; l; l = l->next) { struct stream_fd *sfd = l->data; snprintf(tmp, sizeof(tmp), "sfd-%u", sfd->unique_id); @@ -2435,7 +2436,7 @@ char* redis_encode_json(struct call *c) { snprintf(tmp, sizeof(tmp), "stream_sfds-%u", ps->unique_id); json_builder_set_member_name(builder, tmp); json_builder_begin_array(builder); - for (GList *k = ps->sfds.head; k; k = k->next) { + for (__auto_type k = ps->sfds.head; k; k = k->next) { struct stream_fd *sfd = k->data; JSON_ADD_STRING("%u", sfd->unique_id); } diff --git a/daemon/sdp.c b/daemon/sdp.c index 50992f0cf..e74d87c9d 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -2656,11 +2656,8 @@ static void insert_sfd_candidates(GString *s, struct packet_stream *ps, unsigned int type_pref, unsigned int local_pref, enum ice_candidate_type type, sdp_ng_flags *flags, struct sdp_media *sdp_media) { - GList *l; - struct stream_fd *sfd; - - for (l = ps->sfds.head; l; l = l->next) { - sfd = l->data; + for (__auto_type l = ps->sfds.head; l; l = l->next) { + struct stream_fd *sfd = l->data; insert_candidate(s, sfd, type_pref, local_pref, type, flags, sdp_media); if (local_pref != -1) diff --git a/include/call.h b/include/call.h index 173dffc7e..150bee63b 100644 --- a/include/call.h +++ b/include/call.h @@ -381,7 +381,7 @@ struct packet_stream { unsigned int unique_id; /* RO */ struct recording_stream recording; /* LOCK: call->master_lock */ - GQueue sfds; /* LOCK: call->master_lock */ + stream_fd_q sfds; /* LOCK: call->master_lock */ struct stream_fd * selected_sfd; endpoint_t last_local_endpoint; struct dtls_connection ice_dtls; /* LOCK: in_lock */ @@ -666,7 +666,7 @@ struct call { GHashTable *viabranches; GHashTable *labels; GQueue streams; - GQueue stream_fds; /* stream_fd */ + stream_fd_q stream_fds; /* stream_fd */ GQueue endpoint_maps; struct dtls_cert *dtls_cert; /* for outgoing */ struct mqtt_timer *mqtt_timer; diff --git a/include/media_socket.h b/include/media_socket.h index 3c77b804e..7979e5cb2 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -12,6 +12,7 @@ #include "crypto.h" #include "socket.h" #include "xt_RTPENGINE.h" +#include "containers.h" @@ -25,6 +26,9 @@ struct stream_fd; struct poller; struct media_player_cache_entry; +TYPED_GQUEUE(stream_fd, struct stream_fd) + + typedef int rtcp_filter_func(struct media_packet *, GQueue *); typedef int (*rewrite_func)(str *, struct packet_stream *, struct ssrc_ctx *);