MT#55283 use typed GQueue for stream_fd

Change-Id: I75544a48c79481473effa9651f1ad2b59b234dc6
pull/1776/head
Richard Fuchs 2 years ago
parent f4677f6b5b
commit 4e7078834f

@ -834,7 +834,7 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) {
// use opaque pointer to detect changes
void *old_selected_sfd = ps->selected_sfd;
g_queue_clear(&ps->sfds);
t_queue_clear(&ps->sfds);
bool sfd_found = false;
struct stream_fd *intf_sfd = NULL;
@ -853,7 +853,7 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) {
}
sfd->stream = ps;
g_queue_push_tail(&ps->sfds, sfd);
t_queue_push_tail(&ps->sfds, sfd);
if (ps->selected_sfd == sfd)
sfd_found = true;
@ -865,7 +865,7 @@ static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) {
if (intf_sfd)
ps->selected_sfd = intf_sfd;
else
ps->selected_sfd = g_queue_peek_nth(&ps->sfds, 0);
ps->selected_sfd = t_queue_peek_nth(&ps->sfds, 0);
}
if (old_selected_sfd && ps->selected_sfd && old_selected_sfd != ps->selected_sfd)
@ -1057,7 +1057,7 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) {
}
if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) {
for (GList *l = ps->sfds.head; l; l = l->next) {
for (__auto_type l = ps->sfds.head; l; l = l->next) {
static const str fake_rtp = STR_CONST_INIT("\x80\x7f\xff\xff\x00\x00\x00\x00"
"\x00\x00\x00\x00");
struct stream_fd *sfd = l->data;
@ -1096,7 +1096,7 @@ int __init_stream(struct packet_stream *ps) {
dtls_shutdown(ps);
if (MEDIA_ISSET(media, SDES) && dtls_active == -1) {
for (GList *l = ps->sfds.head; l; l = l->next) {
for (__auto_type l = ps->sfds.head; l; l = l->next) {
struct stream_fd *sfd = l->data;
struct crypto_params_sdes *cps = media->sdes_in.head
? media->sdes_in.head->data : NULL;
@ -1121,7 +1121,7 @@ int __init_stream(struct packet_stream *ps) {
if (dtls_active == -1)
dtls_active = (PS_ISSET(ps, FILLED) && MEDIA_ISSET(media, SETUP_ACTIVE));
dtls_connection_init(&ps->ice_dtls, ps, dtls_active, call->dtls_cert);
for (GList *l = ps->sfds.head; l; l = l->next) {
for (__auto_type l = ps->sfds.head; l; l = l->next) {
struct stream_fd *sfd = l->data;
dtls_connection_init(&sfd->dtls, ps, dtls_active, call->dtls_cert);
}
@ -1928,7 +1928,7 @@ static void __disable_streams(struct call_media *media, unsigned int num_ports)
for (l = media->streams.head; l; l = l->next) {
ps = l->data;
g_queue_clear(&ps->sfds);
t_queue_clear(&ps->sfds);
ps->selected_sfd = NULL;
}
}
@ -2026,11 +2026,8 @@ static void __fingerprint_changed(struct call_media *m) {
}
static void __set_all_tos(struct call *c) {
GList *l;
struct stream_fd *sfd;
for (l = c->stream_fds.head; l; l = l->next) {
sfd = l->data;
for (__auto_type l = c->stream_fds.head; l; l = l->next) {
struct stream_fd *sfd = l->data;
if (sfd->socket.fd == -1)
continue;
set_tos(&sfd->socket, c->tos);
@ -3571,7 +3568,7 @@ static void __call_cleanup(struct call *c) {
__unkernelize(ps, "final call cleanup");
dtls_shutdown(ps);
ps->selected_sfd = NULL;
g_queue_clear(&ps->sfds);
t_queue_clear(&ps->sfds);
crypto_cleanup(&ps->crypto);
g_queue_clear_full(&ps->rtp_sinks, free_sink_handler);
@ -3599,7 +3596,7 @@ static void __call_cleanup(struct call *c) {
}
while (c->stream_fds.head) {
struct stream_fd *sfd = g_queue_pop_head(&c->stream_fds);
struct stream_fd *sfd = t_queue_pop_head(&c->stream_fds);
stream_fd_release(sfd);
obj_put(sfd);
}
@ -3929,7 +3926,7 @@ static void __call_free(void *p) {
while (c->streams.head) {
ps = g_queue_pop_head(&c->streams);
crypto_cleanup(&ps->crypto);
g_queue_clear(&ps->sfds);
t_queue_clear(&ps->sfds);
g_hash_table_destroy(ps->rtp_stats);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++)
ssrc_ctx_put(&ps->ssrc_in[u]);
@ -4260,7 +4257,7 @@ void monologue_destroy(struct call_monologue *monologue) {
ps->selected_sfd = NULL;
struct stream_fd *sfd;
while ((sfd = g_queue_pop_head(&ps->sfds)))
while ((sfd = t_queue_pop_head(&ps->sfds)))
stream_fd_release(sfd);
}
}

@ -762,7 +762,7 @@ found:
crypto_init(&ps->selected_sfd->crypto, &client);
}
// it's possible that ps->selected_sfd is not from ps->sfds list (?)
for (GList *l = ps->sfds.head; l; l = l->next) {
for (__auto_type l = ps->sfds.head; l; l = l->next) {
struct stream_fd *sfd = l->data;
if (d->active) /* we're the client */
crypto_init(&sfd->crypto, &server);
@ -905,7 +905,7 @@ void dtls_shutdown(struct packet_stream *ps) {
}
dtls_connection_cleanup(&ps->ice_dtls);
}
for (GList *l = ps->sfds.head; l; l = l->next) {
for (__auto_type l = ps->sfds.head; l; l = l->next) {
struct stream_fd *sfd = l->data;
struct dtls_connection *d = &sfd->dtls;

@ -490,7 +490,7 @@ void ice_restart(struct ice_agent *ag) {
/* called with the call lock held in W, hence agent doesn't need to be locked */
void ice_update(struct ice_agent *ag, struct stream_params *sp, bool allow_reset) {
GList *l, *k;
GList *l;
struct ice_candidate *cand, *dup;
struct call_media *media;
struct call *call;
@ -607,7 +607,7 @@ pair:
if (!ps)
continue;
for (k = ps->sfds.head; k; k = k->next) {
for (__auto_type k = ps->sfds.head; k; k = k->next) {
sfd = k->data;
/* skip duplicates here also */
if (__pair_lookup(ag, dup, sfd->local_intf))
@ -1207,7 +1207,7 @@ found:
static int __check_valid(struct ice_agent *ag) {
struct call_media *media;
struct packet_stream *ps;
GList *l, *k, *m;
GList *l, *k;
GQueue all_compos;
struct ice_candidate_pair *pair;
// const struct local_intf *ifa;
@ -1258,7 +1258,7 @@ static int __check_valid(struct ice_agent *ag) {
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
mutex_unlock(&ps->out_lock);
for (m = ps->sfds.head; m; m = m->next) {
for (__auto_type m = ps->sfds.head; m; m = m->next) {
sfd = m->data;
if (sfd->local_intf != pair->local_intf)
continue;

@ -2546,7 +2546,7 @@ update_addr:
if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) {
// make sure the new interface/socket is actually one from the list of sockets
// that we intend to use, and not an old one from a previous negotiation
GList *contains = g_queue_find(&phc->mp.stream->sfds, phc->mp.sfd);
__auto_type contains = t_queue_find(&phc->mp.stream->sfds, phc->mp.sfd);
if (!contains)
ilog(LOG_INFO | LOG_FLAG_LIMIT, "Not switching from local socket %s to %s (not in list)",
endpoint_print_buf(&phc->mp.stream->selected_sfd->socket.local),
@ -3236,11 +3236,11 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, struct local_in
struct poller *p = rtpe_poller;
sfd = obj_alloc0("stream_fd", sizeof(*sfd), stream_fd_free);
sfd->unique_id = g_queue_get_length(&call->stream_fds);
sfd->unique_id = t_queue_get_length(&call->stream_fds);
sfd->socket = *fd;
sfd->call = obj_get(call);
sfd->local_intf = lif;
g_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */
t_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */
g_slice_free1(sizeof(*fd), fd); /* moved into sfd, thus free */
__C_DBG("stream_fd_new localport=%d", sfd->socket.local.port);

@ -37,6 +37,7 @@
typedef union {
GQueue *q;
stream_fd_q *sfds_q;
GPtrArray *pa;
void *v;
} callback_arg_t __attribute__ ((__transparent_union__));
@ -2349,7 +2350,7 @@ char* redis_encode_json(struct call *c) {
JSON_SET_SIMPLE("last_signal","%ld", (long int) c->last_signal);
JSON_SET_SIMPLE("tos","%u", (int) c->tos);
JSON_SET_SIMPLE("deleted","%ld", (long int) c->deleted);
JSON_SET_SIMPLE("num_sfds","%u", g_queue_get_length(&c->stream_fds));
JSON_SET_SIMPLE("num_sfds","%u", t_queue_get_length(&c->stream_fds));
JSON_SET_SIMPLE("num_streams","%u", g_queue_get_length(&c->streams));
JSON_SET_SIMPLE("num_medias","%u", g_queue_get_length(&c->medias));
JSON_SET_SIMPLE("num_tags","%u", g_queue_get_length(&c->monologues));
@ -2368,7 +2369,7 @@ char* redis_encode_json(struct call *c) {
json_builder_end_object(builder);
for (GList *l = c->stream_fds.head; l; l = l->next) {
for (__auto_type l = c->stream_fds.head; l; l = l->next) {
struct stream_fd *sfd = l->data;
snprintf(tmp, sizeof(tmp), "sfd-%u", sfd->unique_id);
@ -2435,7 +2436,7 @@ char* redis_encode_json(struct call *c) {
snprintf(tmp, sizeof(tmp), "stream_sfds-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (GList *k = ps->sfds.head; k; k = k->next) {
for (__auto_type k = ps->sfds.head; k; k = k->next) {
struct stream_fd *sfd = k->data;
JSON_ADD_STRING("%u", sfd->unique_id);
}

@ -2656,11 +2656,8 @@ static void insert_sfd_candidates(GString *s, struct packet_stream *ps,
unsigned int type_pref, unsigned int local_pref, enum ice_candidate_type type,
sdp_ng_flags *flags, struct sdp_media *sdp_media)
{
GList *l;
struct stream_fd *sfd;
for (l = ps->sfds.head; l; l = l->next) {
sfd = l->data;
for (__auto_type l = ps->sfds.head; l; l = l->next) {
struct stream_fd *sfd = l->data;
insert_candidate(s, sfd, type_pref, local_pref, type, flags, sdp_media);
if (local_pref != -1)

@ -381,7 +381,7 @@ struct packet_stream {
unsigned int unique_id; /* RO */
struct recording_stream recording; /* LOCK: call->master_lock */
GQueue sfds; /* LOCK: call->master_lock */
stream_fd_q sfds; /* LOCK: call->master_lock */
struct stream_fd * selected_sfd;
endpoint_t last_local_endpoint;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
@ -666,7 +666,7 @@ struct call {
GHashTable *viabranches;
GHashTable *labels;
GQueue streams;
GQueue stream_fds; /* stream_fd */
stream_fd_q stream_fds; /* stream_fd */
GQueue endpoint_maps;
struct dtls_cert *dtls_cert; /* for outgoing */
struct mqtt_timer *mqtt_timer;

@ -12,6 +12,7 @@
#include "crypto.h"
#include "socket.h"
#include "xt_RTPENGINE.h"
#include "containers.h"
@ -25,6 +26,9 @@ struct stream_fd;
struct poller;
struct media_player_cache_entry;
TYPED_GQUEUE(stream_fd, struct stream_fd)
typedef int rtcp_filter_func(struct media_packet *, GQueue *);
typedef int (*rewrite_func)(str *, struct packet_stream *, struct ssrc_ctx *);

Loading…
Cancel
Save