MT#55283 use typed GQueue for stream/handlers

Change-Id: I874680db25f26466400f4a08641a97cb6f7be36b
pull/1776/head
Richard Fuchs 2 years ago
parent dd3471d919
commit 57598e1255

@ -131,7 +131,6 @@ void call_make_own_foreign(struct call *c, bool foreign) {
/* called with hashlock held */
static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
GList *it;
unsigned int check;
bool good = false;
struct packet_stream *ps;
@ -152,7 +151,7 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
if (rtpe_config.final_timeout && rtpe_now.tv_sec >= (c->created.tv_sec + rtpe_config.final_timeout)) {
ilog(LOG_INFO, "Closing call due to final timeout");
tmp_t_reason = FINAL_TIMEOUT;
for (it = c->monologues.head; it; it = it->next) {
for (GList *it = c->monologues.head; it; it = it->next) {
ml = it->data;
gettimeofday(&(ml->terminated),NULL);
ml->term_reason = tmp_t_reason;
@ -183,7 +182,7 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
if (CALL_ISSET(c, FOREIGN_MEDIA) && rtpe_now.tv_sec - c->last_signal <= rtpe_config.timeout)
goto out;
for (it = c->streams.head; it; it = it->next) {
for (__auto_type it = c->streams.head; it; it = it->next) {
ps = it->data;
timestamp = &ps->last_packet;
@ -223,7 +222,7 @@ next:
;
}
for (it = c->medias.head; it; it = it->next) {
for (GList *it = c->medias.head; it; it = it->next) {
struct call_media *media = it->data;
if (rtpe_config.measure_rtp) {
media_update_stats(media);
@ -240,7 +239,7 @@ next:
if (c->ml_deleted)
goto out;
for (it = c->monologues.head; it; it = it->next) {
for (GList *it = c->monologues.head; it; it = it->next) {
ml = it->data;
gettimeofday(&(ml->terminated),NULL);
ml->term_reason = tmp_t_reason;
@ -828,7 +827,7 @@ next_il:
static void __assign_stream_fds(struct call_media *media, sfd_intf_list_q *intf_sfds) {
int reset_ice = 0;
for (GList *k = media->streams.head; k; k = k->next) {
for (__auto_type k = media->streams.head; k; k = k->next) {
struct packet_stream *ps = k->data;
// use opaque pointer to detect changes
@ -895,7 +894,7 @@ static void __rtp_stats_free(void *p) {
struct packet_stream *__packet_stream_new(struct call *call) {
struct packet_stream *stream;
stream = uid_slice_alloc0(stream, &call->streams);
stream = uid_slice_alloc0(stream, &call->streams.q);
mutex_init(&stream->in_lock);
mutex_init(&stream->out_lock);
stream->call = call;
@ -923,12 +922,12 @@ static int __num_media_streams(struct call_media *media, unsigned int num_ports)
while (media->streams.length < num_ports) {
stream = __packet_stream_new(call);
stream->media = media;
g_queue_push_tail(&media->streams, stream);
t_queue_push_tail(&media->streams, stream);
stream->component = media->streams.length;
ret++;
}
g_queue_truncate(&media->streams, num_ports);
t_queue_truncate(&media->streams, num_ports);
return ret;
}
@ -1074,9 +1073,7 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) {
}
void call_media_state_machine(struct call_media *m) {
GList *l;
for (l = m->streams.head; l; l = l->next)
for (__auto_type l = m->streams.head; l; l = l->next)
call_stream_state_machine(l->data);
}
@ -1163,8 +1160,7 @@ void __rtp_stats_update(GHashTable *dst, struct codec_store *cs) {
/* we leave previously added but now removed payload types in place */
}
void free_sink_handler(void *p) {
struct sink_handler *sh = p;
void free_sink_handler(struct sink_handler *sh) {
g_slice_free1(sizeof(*sh), sh);
}
@ -1172,22 +1168,22 @@ void free_sink_handler(void *p) {
* A transfer of flags from the subscription to the sink handlers (sink_handler) is done
* using the __init_streams() through __add_sink_handler().
*/
void __add_sink_handler(GQueue *q, struct packet_stream *sink, const struct sink_attrs *attrs) {
void __add_sink_handler(sink_handler_q *q, struct packet_stream *sink, const struct sink_attrs *attrs) {
struct sink_handler *sh = g_slice_alloc0(sizeof(*sh));
sh->sink = sink;
sh->kernel_output_idx = -1;
if (attrs)
sh->attrs = *attrs;
g_queue_push_tail(q, sh);
t_queue_push_tail(q, sh);
}
// called once before calling __init_streams once for each sink
static void __reset_streams(struct call_media *media) {
for (GList *l = media->streams.head; l; l = l->next) {
for (__auto_type l = media->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
g_queue_clear_full(&ps->rtp_sinks, free_sink_handler);
g_queue_clear_full(&ps->rtcp_sinks, free_sink_handler);
g_queue_clear_full(&ps->rtp_mirrors, free_sink_handler);
t_queue_clear_full(&ps->rtp_sinks, free_sink_handler);
t_queue_clear_full(&ps->rtcp_sinks, free_sink_handler);
t_queue_clear_full(&ps->rtp_mirrors, free_sink_handler);
}
}
@ -1198,12 +1194,11 @@ static void __reset_streams(struct call_media *media) {
*/
static int __init_streams(struct call_media *A, struct call_media *B, const struct stream_params *sp,
const sdp_ng_flags *flags, const struct sink_attrs *attrs) {
GList *la, *lb;
struct packet_stream *a, *ax, *b;
unsigned int port_off = 0;
la = A->streams.head;
lb = B ? B->streams.head : NULL;
__auto_type la = A->streams.head;
__auto_type lb = B ? B->streams.head : NULL;
if (B)
__C_DBG("Sink init media %u -> %u", A->index, B->index);
@ -1919,12 +1914,11 @@ del_next:
static void __disable_streams(struct call_media *media, unsigned int num_ports) {
GList *l;
struct packet_stream *ps;
__num_media_streams(media, num_ports);
for (l = media->streams.head; l; l = l->next) {
for (__auto_type l = media->streams.head; l; l = l->next) {
ps = l->data;
t_queue_clear(&ps->sfds);
ps->selected_sfd = NULL;
@ -2004,10 +1998,9 @@ static void __rtcp_mux_logic(sdp_ng_flags *flags, struct call_media *media,
}
static void __dtls_restart(struct call_media *m) {
GList *l;
struct packet_stream *ps;
for (l = m->streams.head; l; l = l->next) {
for (__auto_type l = m->streams.head; l; l = l->next) {
ps = l->data;
PS_CLEAR(ps, FINGERPRINT_VERIFIED);
dtls_shutdown(ps);
@ -3558,7 +3551,7 @@ next:
}
static void __call_cleanup(struct call *c) {
for (GList *l = c->streams.head; l; l = l->next) {
for (__auto_type l = c->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
send_timer_put(&ps->send_timer);
@ -3569,9 +3562,9 @@ static void __call_cleanup(struct call *c) {
t_queue_clear(&ps->sfds);
crypto_cleanup(&ps->crypto);
g_queue_clear_full(&ps->rtp_sinks, free_sink_handler);
g_queue_clear_full(&ps->rtcp_sinks, free_sink_handler);
g_queue_clear_full(&ps->rtp_mirrors, free_sink_handler);
t_queue_clear_full(&ps->rtp_sinks, free_sink_handler);
t_queue_clear_full(&ps->rtcp_sinks, free_sink_handler);
t_queue_clear_full(&ps->rtp_mirrors, free_sink_handler);
}
for (GList *l = c->medias.head; l; l = l->next) {
@ -3608,7 +3601,7 @@ void call_destroy(struct call *c) {
GList *l, *ll;
struct call_monologue *ml;
struct call_media *md;
GList *k, *o;
GList *k;
const struct rtp_payload_type *rtp_pt;
if (!c) {
@ -3721,7 +3714,7 @@ void call_destroy(struct call *c) {
STR_FMT(&md->format_str));
}
for (o = md->streams.head; o; o = o->next) {
for (__auto_type o = md->streams.head; o; o = o->next) {
ps = o->data;
// stats output only - no cleanups
@ -3855,7 +3848,7 @@ void call_media_free(struct call_media **mdp) {
struct call_media *md = *mdp;
crypto_params_sdes_queue_clear(&md->sdes_in);
crypto_params_sdes_queue_clear(&md->sdes_out);
g_queue_clear(&md->streams);
t_queue_clear(&md->streams);
g_queue_clear(&md->endpoint_maps);
codec_store_cleanup(&md->codecs);
codec_handlers_free(md);
@ -3922,7 +3915,7 @@ static void __call_free(void *p) {
g_hash_table_destroy(c->labels);
while (c->streams.head) {
ps = g_queue_pop_head(&c->streams);
ps = t_queue_pop_head(&c->streams);
crypto_cleanup(&ps->crypto);
t_queue_clear(&ps->sfds);
g_hash_table_destroy(ps->rtp_stats);
@ -4127,8 +4120,8 @@ void __monologue_viabranch(struct call_monologue *ml, const str *viabranch) {
g_hash_table_insert(call->viabranches, &ml->viabranch, ml);
}
static void __unconfirm_sinks(GQueue *q, const char *reason) {
for (GList *l = q->head; l; l = l->next) {
static void __unconfirm_sinks(sink_handler_q *q, const char *reason) {
for (__auto_type l = q->head; l; l = l->next) {
struct sink_handler *sh = l->data;
__stream_unconfirm(sh->sink, reason);
}
@ -4156,7 +4149,7 @@ void __media_unconfirm(struct call_media *media, const char *reason) {
if (!media)
return;
for (GList *m = media->streams.head; m; m = m->next) {
for (__auto_type m = media->streams.head; m; m = m->next) {
struct packet_stream *stream = m->data;
__stream_unconfirm(stream, reason);
__unconfirm_sinks(&stream->rtp_sinks, reason);
@ -4196,8 +4189,8 @@ void dialogue_unconfirm(struct call_monologue *ml, const char *reason) {
}
}
static void __unkernelize_sinks(GQueue *q, const char *reason) {
for (GList *l = q->head; l; l = l->next) {
static void __unkernelize_sinks(sink_handler_q *q, const char *reason) {
for (__auto_type l = q->head; l; l = l->next) {
struct sink_handler *sh = l->data;
unkernelize(sh->sink, reason);
}
@ -4209,7 +4202,7 @@ static void __unkernelize_sinks(GQueue *q, const char *reason) {
void call_media_unkernelize(struct call_media *media, const char *reason) {
if (!media)
return;
for (GList *m = media->streams.head; m; m = m->next) {
for (__auto_type m = media->streams.head; m; m = m->next) {
struct packet_stream *stream = m->data;
unkernelize(stream, reason);
__unkernelize_sinks(&stream->rtp_sinks, reason);
@ -4248,7 +4241,7 @@ void monologue_destroy(struct call_monologue *monologue) {
struct call_media *m = monologue->medias->pdata[i];
if (!m)
continue;
for (GList *k = m->streams.head; k; k = k->next) {
for (__auto_type k = m->streams.head; k; k = k->next) {
struct packet_stream *ps = k->data;
if (ps->selected_sfd && ps->selected_sfd->socket.local.port)
ps->last_local_endpoint = ps->selected_sfd->socket.local;

@ -2263,7 +2263,6 @@ static void ng_stats_media(bencode_item_t *list, const struct call_media *m,
struct call_stats *totals)
{
bencode_item_t *dict, *streams = NULL, *flags;
GList *l;
struct packet_stream *ps;
const struct rtp_payload_type *rtp_pt = NULL;
@ -2304,7 +2303,7 @@ static void ng_stats_media(bencode_item_t *list, const struct call_media *m,
BF_M("generator/sink", GENERATOR);
stats:
for (l = m->streams.head; l; l = l->next) {
for (__auto_type l = m->streams.head; l; l = l->next) {
ps = l->data;
ng_stats_stream(streams, ps, totals);
}

@ -42,7 +42,6 @@ void cdr_update_entry(struct call* c) {
int cdrlinecnt = 0;
g_autoptr(GString) cdr = g_string_new("");
struct call_media *md;
GList *o;
const struct rtp_payload_type *rtp_pt;
struct packet_stream *ps=0;
@ -114,7 +113,7 @@ void cdr_update_entry(struct call* c) {
g_string_append_printf(cdr, "payload_type=unknown, ");
}
for (o = md->streams.head; o; o = o->next) {
for (__auto_type o = md->streams.head; o; o = o->next) {
ps = o->data;
if (PS_ISSET(ps, FALLBACK_RTCP))

@ -692,7 +692,7 @@ static void cli_list_tag_info(struct cli_writer *cw, struct call_monologue *ml)
else
cw->cw_printf(cw, STR_FORMAT "\n", STR_FMT(&rtp_pt->encoding_with_params));
for (GList *o = md->streams.head; o; o = o->next) {
for (__auto_type o = md->streams.head; o; o = o->next) {
ps = o->data;
if (PS_ISSET(ps, FALLBACK_RTCP))

@ -508,7 +508,7 @@ static const char *dtmf_inject_pcm(struct call_media *media, struct call_media *
{
struct call *call = monologue->call;
for (GList *l = ps->rtp_sinks.head; l; l = l->next) {
for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink_ps = sh->sink;
struct call_monologue *sink_ml = sink_ps->media->monologue;

@ -545,7 +545,7 @@ void ice_update(struct ice_agent *ag, struct stream_params *sp, bool allow_reset
/* get our component streams */
ZERO(components);
comps = 0;
for (GList *l = media->streams.head; l; l = l->next)
for (__auto_type l = media->streams.head; l; l = l->next)
components[comps++] = l->data;
if (comps == 2 && (MEDIA_ISSET(media, RTCP_MUX) || !proto_is_rtp(media->protocol)))
components[1] = NULL;
@ -1214,7 +1214,7 @@ found:
static int __check_valid(struct ice_agent *ag) {
struct call_media *media;
struct packet_stream *ps;
GList *l;
packet_stream_list *l;
candidate_pair_list *k;
candidate_pair_q all_compos;
struct ice_candidate_pair *pair;

@ -70,7 +70,7 @@ struct packet_handler_ctx {
str s; // raw input packet
bool kernel_handled; // parse and read contents but do not forward
GQueue *sinks; // where to send output packets to (forward destination)
sink_handler_q *sinks; // where to send output packets to (forward destination)
rewrite_func decrypt_func, encrypt_func; // handlers for decrypt/encrypt
rtcp_filter_func *rtcp_filter;
struct packet_stream *in_srtp, *out_srtp; // SRTP contexts for decrypt/encrypt (relevant for muxed RTCP)
@ -1408,7 +1408,7 @@ static void reset_ps_kernel_stats(struct packet_stream *ps) {
* sink_handler can be NULL.
*/
static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *outputs,
struct packet_stream *stream, struct sink_handler *sink_handler, GQueue *sinks,
struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks,
GList **payload_types)
{
struct rtpengine_destination_info *redi = NULL;
@ -1511,7 +1511,7 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
// only add payload types that are passthrough for all sinks
bool can_kernelize = true;
unsigned int clockrate = 0;
for (GList *k = sinks->head; k; k = k->next) {
for (__auto_type k = sinks->head; k; k = k->next) {
struct sink_handler *ksh = k->data;
struct packet_stream *ksink = ksh->sink;
struct codec_handler *ch = codec_handler_get(media, rs->payload_type,
@ -1630,7 +1630,7 @@ output:
}
// helper function for kernelize()
static void kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs,
struct packet_stream *stream, struct sink_handler *sink_handler, GQueue *sinks,
struct packet_stream *stream, struct sink_handler *sink_handler, sink_handler_q *sinks,
GList **payload_types)
{
struct packet_stream *sink = sink_handler->sink;
@ -1682,21 +1682,21 @@ void kernelize(struct packet_stream *stream) {
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
else {
for (GList *l = stream->rtp_sinks.head; l; l = l->next) {
for (__auto_type l = stream->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
&payload_types);
}
for (GList *l = stream->rtp_mirrors.head; l; l = l->next) {
for (__auto_type l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
&payload_types);
}
// record number of RTP destinations
unsigned int num_rtp_dests = reti.num_destinations;
for (GList *l = stream->rtcp_sinks.head; l; l = l->next) {
for (__auto_type l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks, NULL);
}
@ -1792,7 +1792,7 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
uint32_t ssrc_map_out = ssrc_ctx->ssrc_map_out;
// update opposite outgoing SSRC
for (GList *l = ps->rtp_sinks.head; l; l = l->next) {
for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
@ -1812,7 +1812,7 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
mutex_unlock(&sink->out_lock);
}
for (GList *l = ps->rtcp_sinks.head; l; l = l->next) {
for (__auto_type l = ps->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
@ -1882,11 +1882,11 @@ void __unkernelize(struct packet_stream *p, const char *reason) {
void __reset_sink_handlers(struct packet_stream *ps) {
for (GList *l = ps->rtp_sinks.head; l; l = l->next) {
for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
sh->handler = NULL;
}
for (GList *l = ps->rtcp_sinks.head; l; l = l->next) {
for (__auto_type l = ps->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
sh->handler = NULL;
}
@ -1909,8 +1909,8 @@ static void stream_unconfirm(struct packet_stream *ps, const char *reason) {
__stream_unconfirm(ps, reason);
mutex_unlock(&ps->in_lock);
}
static void unconfirm_sinks(GQueue *q, const char *reason) {
for (GList *l = q->head; l; l = l->next) {
static void unconfirm_sinks(sink_handler_q *q, const char *reason) {
for (__auto_type l = q->head; l; l = l->next) {
struct sink_handler *sh = l->data;
stream_unconfirm(sh->sink, reason);
}
@ -1930,7 +1930,7 @@ void media_update_stats(struct call_media *m) {
if (!kernel.is_open)
return;
for (GList *l = m->streams.head; l; l = l->next) {
for (__auto_type l = m->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
if (!PS_ISSET(ps, RTP))
continue;
@ -2419,7 +2419,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* confirm sinks for unidirectional streams in order to kernelize */
if (MEDIA_ISSET(phc->mp.media, UNIDIRECTIONAL)) {
for (GList *l = phc->sinks->head; l; l = l->next) {
for (__auto_type l = phc->sinks->head; l; l = l->next) {
struct sink_handler *sh = l->data;
PS_SET(sh->sink, CONFIRMED);
}
@ -2874,7 +2874,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
str orig_raw = STR_NULL;
for (GList *sh_link = phc->sinks->head; sh_link; sh_link = sh_link->next) {
for (__auto_type sh_link = phc->sinks->head; sh_link; sh_link = sh_link->next) {
struct sink_handler *sh = sh_link->data;
struct packet_stream *sink = sh->sink;
@ -2951,7 +2951,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
// egress mirroring
if (!phc->rtcp) {
for (GList *mirror_link = phc->mp.stream->rtp_mirrors.head; mirror_link;
for (__auto_type mirror_link = phc->mp.stream->rtp_mirrors.head; mirror_link;
mirror_link = mirror_link->next)
{
struct packet_handler_ctx mirror_phc = *phc;
@ -3075,7 +3075,7 @@ out:
if (!sub_media)
continue;
for (GList *m = sub_media->streams.head; m; m = m->next) {
for (__auto_type m = sub_media->streams.head; m; m = m->next) {
struct packet_stream *sub_ps = m->data;
__unkernelize(sub_ps, "subscriptions modified");
}
@ -3500,7 +3500,7 @@ enum thread_looper_action kernel_stats_updater(void) {
CALL_CLEAR(sfd->call, FOREIGN_MEDIA);
if (!ke->target.non_forwarding && diff_packets_in) {
for (GList *l = ps->rtp_sinks.head; l; l = l->next) {
for (__auto_type l = ps->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;

@ -307,7 +307,7 @@ static void update_flags_proc(struct call *call, bool streams) {
append_meta_chunk_null(call->recording, "FORWARDING %u", CALL_ISSET(call, REC_FORWARDING));
if (!streams)
return;
for (GList *l = call->streams.head; l; l = l->next) {
for (__auto_type l = call->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
append_meta_chunk_null(call->recording, "STREAM %u FORWARDING %u",
ps->unique_id, ML_ISSET(ps->media->monologue, REC_FORWARDING) ? 1 : 0);
@ -356,16 +356,15 @@ void recording_start(struct call *call, const char *prefix, const str *output_de
// through all related objects and initialize the recording stuff. if this
// function is called right at the start of the call, all of the following
// is essentially a no-op
GList *l;
for (l = call->monologues.head; l; l = l->next) {
for (__auto_type l = call->monologues.head; l; l = l->next) {
struct call_monologue *ml = l->data;
recording_setup_monologue(ml);
}
for (l = call->medias.head; l; l = l->next) {
for (__auto_type l = call->medias.head; l; l = l->next) {
struct call_media *m = l->data;
recording_setup_media(m);
}
for (l = call->streams.head; l; l = l->next) {
for (__auto_type l = call->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
recording_setup_stream(ps);
__unkernelize(ps, "recording start");
@ -851,7 +850,7 @@ static void finish_proc(struct call *call, bool discard) {
kernel_del_call(recording->proc.call_idx);
recording->proc.call_idx = UNINIT_IDX;
}
for (GList *l = call->streams.head; l; l = l->next) {
for (__auto_type l = call->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
ps->recording.proc.stream_idx = UNINIT_IDX;
}

@ -40,7 +40,7 @@ typedef union {
stream_fd_q *sfds_q;
GPtrArray *pa;
sfd_intf_list_q *siq;
void *v;
packet_stream_q *psq;
} callback_arg_t __attribute__ ((__transparent_union__));
@ -2352,7 +2352,7 @@ char* redis_encode_json(struct call *c) {
JSON_SET_SIMPLE("tos","%u", (int) c->tos);
JSON_SET_SIMPLE("deleted","%ld", (long int) c->deleted);
JSON_SET_SIMPLE("num_sfds","%u", t_queue_get_length(&c->stream_fds));
JSON_SET_SIMPLE("num_streams","%u", g_queue_get_length(&c->streams));
JSON_SET_SIMPLE("num_streams","%u", t_queue_get_length(&c->streams));
JSON_SET_SIMPLE("num_medias","%u", g_queue_get_length(&c->medias));
JSON_SET_SIMPLE("num_tags","%u", g_queue_get_length(&c->monologues));
JSON_SET_SIMPLE("num_maps","%u", g_queue_get_length(&c->endpoint_maps));
@ -2393,7 +2393,7 @@ char* redis_encode_json(struct call *c) {
} // --- for
for (GList *l = c->streams.head; l; l = l->next) {
for (__auto_type l = c->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
LOCK(&ps->in_lock);
@ -2427,7 +2427,7 @@ char* redis_encode_json(struct call *c) {
} // --- for streams.head
for (GList *l = c->streams.head; l; l = l->next) {
for (__auto_type l = c->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
// XXX these should all go into the above loop
@ -2446,7 +2446,7 @@ char* redis_encode_json(struct call *c) {
snprintf(tmp, sizeof(tmp), "rtp_sinks-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (GList *k = ps->rtp_sinks.head; k; k = k->next) {
for (__auto_type k = ps->rtp_sinks.head; k; k = k->next) {
struct sink_handler *sh = k->data;
struct packet_stream *sink = sh->sink;
JSON_ADD_STRING("%u", sink->unique_id);
@ -2456,7 +2456,7 @@ char* redis_encode_json(struct call *c) {
snprintf(tmp, sizeof(tmp), "rtcp_sinks-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (GList *k = ps->rtcp_sinks.head; k; k = k->next) {
for (__auto_type k = ps->rtcp_sinks.head; k; k = k->next) {
struct sink_handler *sh = k->data;
struct packet_stream *sink = sh->sink;
JSON_ADD_STRING("%u", sink->unique_id);
@ -2608,7 +2608,7 @@ char* redis_encode_json(struct call *c) {
snprintf(tmp, sizeof(tmp), "streams-%u", media->unique_id);
json_builder_set_member_name(builder, tmp);
json_builder_begin_array(builder);
for (GList *m = media->streams.head; m; m = m->next) {
for (__auto_type m = media->streams.head; m; m = m->next) {
struct packet_stream *ps = m->data;
JSON_ADD_STRING("%u", ps->unique_id);
}

@ -1613,8 +1613,8 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
socket_sendto(&ps->selected_sfd->socket, rtcp_packet.s, rtcp_packet.len, &ps->endpoint);
g_string_free(sr, TRUE);
GQueue *sinks = ps->rtp_sinks.length ? &ps->rtp_sinks : &ps->rtcp_sinks;
for (GList *l = sinks->head; l; l = l->next) {
sink_handler_q *sinks = ps->rtp_sinks.length ? &ps->rtp_sinks : &ps->rtcp_sinks;
for (__auto_type l = sinks->head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
struct call_media *other_media = sink->media;

@ -2216,7 +2216,7 @@ static int replace_media_port(struct sdp_chopper *chop, struct sdp_media *media,
}
static int replace_consecutive_port_count(struct sdp_chopper *chop, struct sdp_media *media,
struct packet_stream *ps, GList *j)
struct packet_stream *ps, packet_stream_list *j)
{
int cons;
struct packet_stream *ps_n;
@ -3009,7 +3009,7 @@ static void append_attr_int_to_gstring(GString *s, char * name, const int * valu
g_string_append(s, "\r\n");
}
struct packet_stream *print_rtcp(GString *s, struct call_media *media, GList *rtp_ps_link,
struct packet_stream *print_rtcp(GString *s, struct call_media *media, packet_stream_list *rtp_ps_link,
sdp_ng_flags *flags, struct sdp_media *sdp_media)
{
struct packet_stream *ps = rtp_ps_link->data;
@ -3017,7 +3017,7 @@ struct packet_stream *print_rtcp(GString *s, struct call_media *media, GList *rt
if (ps->rtcp_sibling) {
ps_rtcp = ps->rtcp_sibling;
GList *rtcp_ps_link = rtp_ps_link->next;
__auto_type rtcp_ps_link = rtp_ps_link->next;
if (!rtcp_ps_link)
return NULL;
assert(rtcp_ps_link->data == ps_rtcp);
@ -3074,7 +3074,7 @@ static void print_sdp_session_section(GString *s, sdp_ng_flags *flags,
static struct packet_stream *print_sdp_media_section(GString *s, struct call_media *media,
struct sdp_media *sdp_media,
sdp_ng_flags *flags,
GList *rtp_ps_link,
packet_stream_list *rtp_ps_link,
bool is_active,
bool force_end_of_ice,
bool print_other_attrs)
@ -3136,7 +3136,7 @@ static struct packet_stream *print_sdp_media_section(GString *s, struct call_med
static const char *replace_sdp_media_section(struct sdp_chopper *chop, struct call_media *call_media,
struct sdp_media *sdp_media, GList *rtp_ps_link, sdp_ng_flags *flags,
struct sdp_media *sdp_media, packet_stream_list *rtp_ps_link, sdp_ng_flags *flags,
const bool keep_zero_address, bool print_other_attrs)
{
const char *err = NULL;
@ -3204,7 +3204,6 @@ int sdp_replace(struct sdp_chopper *chop, sdp_sessions_q *sessions, struct call_
{
struct sdp_session *session;
struct sdp_media *sdp_media;
GList *rtp_ps_link;
int sess_conn;
struct call_media *call_media;
struct packet_stream *ps;
@ -3327,7 +3326,7 @@ int sdp_replace(struct sdp_chopper *chop, sdp_sessions_q *sessions, struct call_
if (!call_media)
goto error;
err = "no matching media stream";
rtp_ps_link = call_media->streams.head;
__auto_type rtp_ps_link = call_media->streams.head;
if (!rtp_ps_link)
goto error;
@ -3465,7 +3464,7 @@ int sdp_create(str *out, struct call_monologue *monologue, sdp_ng_flags *flags,
err = "Zero length media stream";
if (!media->streams.length)
goto err;
GList *rtp_ps_link = media->streams.head;
__auto_type rtp_ps_link = media->streams.head;
struct packet_stream *rtp_ps = rtp_ps_link->data;
err = "No selected FD";
if (!rtp_ps->selected_sfd)

@ -120,7 +120,6 @@ void statistics_update_foreignown_inc(struct call* c) {
void statistics_update_oneway(struct call* c) {
struct call_monologue *ml;
struct call_media *md;
GList *o;
GList *l;
if (IS_OWN_CALL(c)) {
@ -137,7 +136,7 @@ void statistics_update_oneway(struct call* c) {
if (!md)
continue;
for (o = md->streams.head; o; o = o->next) {
for (__auto_type o = md->streams.head; o; o = o->next) {
ps = o->data;
if (PS_ISSET(ps, RTP)) {
// --- only RTP is interesting
@ -149,7 +148,7 @@ void statistics_update_oneway(struct call* c) {
continue;
found:;
struct sink_handler *sh = g_queue_peek_head(&ps->rtp_sinks);
struct sink_handler *sh = t_queue_peek_head(&ps->rtp_sinks);
ps2 = sh ? sh->sink : NULL;
if (!ps2)
continue;

@ -390,10 +390,10 @@ struct packet_stream {
struct stream_fd * selected_sfd;
endpoint_t last_local_endpoint;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
GQueue rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
GQueue rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
sink_handler_q rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
sink_handler_q rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
GQueue rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */
sink_handler_q rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */
struct endpoint endpoint; /* LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
struct timeval ep_detect_signal; /* LOCK: out_lock */
@ -461,7 +461,7 @@ struct call_media {
const struct dtls_hash_func *fp_hash_func; /* outgoing */
str tls_id;
GQueue streams; /* normally RTP + RTCP */
packet_stream_q streams; /* normally RTP + RTCP */
GQueue endpoint_maps;
struct codec_store codecs;
@ -670,7 +670,7 @@ struct call {
GHashTable *tags;
GHashTable *viabranches;
GHashTable *labels;
GQueue streams;
packet_stream_q streams;
stream_fd_q stream_fds; /* stream_fd */
GQueue endpoint_maps;
struct dtls_cert *dtls_cert; /* for outgoing */
@ -728,8 +728,8 @@ void __add_media_subscription(struct call_media * which, struct call_media * to,
struct media_subscription *call_get_media_subscription(GHashTable *ht, struct call_media * cm);
struct media_subscription * call_media_subscribed_to_monologue(const struct call_media * media,
const struct call_monologue * monologue);
void free_sink_handler(void *);
void __add_sink_handler(GQueue *, struct packet_stream *, const struct sink_attrs *);
void free_sink_handler(struct sink_handler *);
void __add_sink_handler(sink_handler_q *, struct packet_stream *, const struct sink_attrs *);
void media_subscription_free(void *);
void media_subscriptions_clear(GQueue *q);

@ -126,10 +126,6 @@ INLINE void g_queue_move(GQueue *dst, GQueue *src) {
dst->length += src->length;
g_queue_init(src);
}
INLINE void g_queue_truncate(GQueue *q, unsigned int len) {
while (q->length > len)
g_queue_pop_tail(q);
}
INLINE void g_queue_append(GQueue *dst, const GQueue *src) {
GList *l;
if (!src || !dst)

@ -28,4 +28,10 @@ TYPED_GQUEUE(codec_handlers, struct codec_handler)
struct codec_packet;
TYPED_GQUEUE(codec_packet, struct codec_packet)
struct packet_stream;
TYPED_GQUEUE(packet_stream, struct packet_stream)
struct sink_handler;
TYPED_GQUEUE(sink_handler, struct sink_handler)
#endif

@ -90,8 +90,8 @@ static void __start(const char *file, int line) {
ml_B = __monologue_create(&call);
media_A = call_media_new(&call); // originator
media_B = call_media_new(&call); // output destination
g_queue_push_tail(&media_A->streams, ps_new(&call));
g_queue_push_tail(&media_B->streams, ps_new(&call));
t_queue_push_tail(&media_A->streams, ps_new(&call));
t_queue_push_tail(&media_B->streams, ps_new(&call));
str_init(&ml_A->tag, "tag_A");
str_init(&ml_A->label, "label_A");
media_A->monologue = ml_A;
@ -366,8 +366,8 @@ static void __packet_seq_ts(const char *file, int line, struct call_media *media
static void end(void) {
g_hash_table_destroy(rtp_ts_ht);
g_hash_table_destroy(rtp_seq_ht);
g_queue_clear_full(&media_A->streams, free);
g_queue_clear_full(&media_B->streams, free);
t_queue_clear_full(&media_A->streams, (void (*)(struct packet_stream *)) free);
t_queue_clear_full(&media_B->streams, (void (*)(struct packet_stream *)) free);
call_media_free(&media_A);
call_media_free(&media_B);
bencode_buffer_free(&call.buffer);

Loading…
Cancel
Save