MT#55283 combine in/out_lock

With selected_sfd being protected by in_lock, we pretty much have to
hold at least in_lock everywhere, and end up requiring both locks in
many places. The distinction has become pointless.

Change-Id: Ic0ad976c2d68d9639b9434da7f0e6e9c0d84c185
(cherry picked from commit e03f814855)
(cherry picked from commit 222fcaa4b0)
(cherry picked from commit abf1ad73d2)
mr11.5
Richard Fuchs 4 months ago
parent 10bbdfa9a9
commit 209a27bbdb

@ -191,7 +191,7 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
AUTO_CLEANUP(struct stream_fd *sfd, stream_fd_auto_cleanup) = NULL;
{
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (ps->selected_sfd)
sfd = obj_get(ps->selected_sfd);
}
@ -894,8 +894,7 @@ struct packet_stream *__packet_stream_new(struct call *call) {
struct packet_stream *stream;
stream = uid_slice_alloc0(stream, &call->streams);
mutex_init(&stream->in_lock);
mutex_init(&stream->out_lock);
mutex_init(&stream->lock);
stream->call = call;
atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec);
stream->rtp_stats = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __rtp_stats_free);
@ -1003,21 +1002,19 @@ void call_stream_crypto_reset(struct packet_stream *ps) {
crypto_reset(&ps->crypto);
if (PS_ISSET(ps, RTP)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) {
if (!ps->ssrc_in[u]) // end of list
break;
ps->ssrc_in[u]->srtp_index = 0;
}
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) {
if (!ps->ssrc_out[u]) // end of list
break;
ps->ssrc_out[u]->srtp_index = 0;
}
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->lock);
}
}
@ -1044,16 +1041,16 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) {
}
if (MEDIA_ISSET(media, DTLS)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
struct dtls_connection *d = dtls_ptr(ps->selected_sfd);
if (d && d->init && !d->connected) {
int dret = dtls(ps->selected_sfd, NULL, NULL);
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
if (dret == 1)
call_media_unkernelize(media, "DTLS connected");
return CSS_DTLS;
}
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
}
if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) {

@ -954,14 +954,14 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,};
if (media->streams.head) {
struct packet_stream *ps = media->streams.head->data;
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ps->ssrc_out[u]) // end of list
break;
ssrc_out[u] = ps->ssrc_out[u];
ssrc_ctx_hold(ssrc_out[u]);
}
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->lock);
}
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
@ -2887,11 +2887,10 @@ static void delay_frame_send(struct delay_frame *dframe) {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &dframe->mp))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
LOCK(&sink->lock);
if (media_socket_dequeue(&dframe->mp, sink))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
}
static void delay_frame_flush(struct delay_buffer *dbuf, struct delay_frame *dframe) {
@ -3475,11 +3474,11 @@ static void __dtx_send_later(struct codec_timer *ct) {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &mp_copy))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
if (media_socket_dequeue(&mp_copy, sink))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
}
}

@ -841,7 +841,7 @@ error:
return -1;
}
/* called with call locked in W or R with ps->in_lock held */
/* called with call locked in W or R with ps->lock held */
int dtls(struct stream_fd *sfd, const str *s, const endpoint_t *fsin) {
struct packet_stream *ps = sfd->stream;
int ret;
@ -889,22 +889,16 @@ int dtls(struct stream_fd *sfd, const str *s, const endpoint_t *fsin) {
else if (ret == 1) {
/* connected! */
dret = 1;
mutex_lock(&ps->out_lock); // nested lock!
if (dtls_setup_crypto(ps, d))
{} /* XXX ?? */
mutex_unlock(&ps->out_lock);
if (PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP) && ps->rtcp_sibling
&& MEDIA_ISSET(ps->media, RTCP_MUX)
&& ps->rtcp_sibling != ps)
{
// nested locks!
mutex_lock(&ps->rtcp_sibling->in_lock);
mutex_lock(&ps->rtcp_sibling->out_lock);
LOCK(&ps->rtcp_sibling->lock);
if (dtls_setup_crypto(ps->rtcp_sibling, d))
{} /* XXX ?? */
mutex_unlock(&ps->rtcp_sibling->out_lock);
mutex_unlock(&ps->rtcp_sibling->in_lock);
}
}

@ -1203,7 +1203,7 @@ found:
g_queue_clear(&compo1);
}
/* call(W) or call(R)+agent must be locked - no in_lock or out_lock must be held */
/* call(W) or call(R)+agent must be locked - no ps->lock must be held */
static int __check_valid(struct ice_agent *ag) {
struct call_media *media;
struct packet_stream *ps;
@ -1246,7 +1246,7 @@ static int __check_valid(struct ice_agent *ag) {
ps = l->data;
pair = k->data;
mutex_lock(&ps->out_lock);
LOCK(&ps->lock);
if (memcmp(&ps->endpoint, &pair->remote_candidate->endpoint, sizeof(ps->endpoint))) {
ilogs(ice, LOG_INFO, "ICE negotiated: new peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
@ -1256,9 +1256,7 @@ static int __check_valid(struct ice_agent *ag) {
else
ilogs(ice, LOG_INFO, "ICE negotiated: peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
mutex_unlock(&ps->out_lock);
LOCK(&ps->in_lock);
for (m = ps->sfds.head; m; m = m->next) {
sfd = m->data;
if (sfd->local_intf != pair->local_intf)

@ -319,11 +319,11 @@ static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp)
log_info_call(call);
rwlock_lock_r(&call->master_lock);
mutex_lock(&st->sink->out_lock);
mutex_lock(&st->sink->lock);
__send_timer_send_common(st, cp);
mutex_unlock(&st->sink->out_lock);
mutex_unlock(&st->sink->lock);
__send_timer_rtcp(st, ssrc_out);
ssrc_ctx_put(&ssrc_out);
@ -473,10 +473,10 @@ retry:;
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
// schedule our next run
timeval_add_usec(&mp->next_run, us_dur);
@ -848,10 +848,10 @@ void media_player_add_packet(struct media_player *mp, char *buf, size_t len,
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
timeval_add_usec(&mp->next_run, us_dur);
timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run);

@ -1457,9 +1457,7 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
goto output;
if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
mutex_lock(&stream->out_lock);
__re_address_translate_ep(&reti->expected_src, MEDIA_ISSET(media, ASYMMETRIC) ? &stream->learned_endpoint : &stream->endpoint);
mutex_unlock(&stream->out_lock);
if (PS_ISSET(stream, STRICT_SOURCE))
reti->src_mismatch = MSM_DROP;
else if (PS_ISSET(stream, MEDIA_HANDOVER))
@ -1599,7 +1597,12 @@ output:
reti->pt_filter = 1;
}
mutex_lock(&sink->out_lock);
// XXX nested lock, avoid possible deadlock. should be reworked not to
// require a nested lock
if (sink != stream && mutex_trylock(&sink->lock)) {
g_slice_free1(sizeof(*redi), redi);
return ""; // indicate deadlock
}
__re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint);
__re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local);
@ -1616,7 +1619,8 @@ output:
handler->out->kernel(&redi->output.encrypt, sink);
mutex_unlock(&sink->out_lock);
if (sink != stream)
mutex_unlock(&sink->lock);
if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) {
g_slice_free1(sizeof(*redi), redi);
@ -1633,25 +1637,33 @@ output:
return NULL;
}
// helper function for kernelize()
static void kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs,
static bool kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs,
struct packet_stream *stream, struct sink_handler *sink_handler, GQueue *sinks,
GList **payload_types)
{
struct packet_stream *sink = sink_handler->sink;
if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED))
return;
return true;
const char *err = kernelize_one(reti, outputs, stream, sink_handler, &stream->rtp_sinks,
payload_types);
if (err)
if (err) {
if (!*err)
return false; // indicate deadlock
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
return true;
}
/* called with in_lock held */
void kernelize(struct packet_stream *stream) {
/* called with master_lock held */
static void kernelize(struct packet_stream *stream) {
struct call *call = stream->call;
const char *nk_warn_msg;
struct call_media *media = stream->media;
if (PS_ISSET(stream, KERNELIZED))
while (true) {
LOCK(&stream->lock);
// set flag, return if set already
if (PS_SET(stream, KERNELIZED))
return;
reset_ps_kernel_stats(stream);
@ -1660,7 +1672,6 @@ void kernelize(struct packet_stream *stream) {
goto no_kernel;
if (!kernel.is_wanted)
goto no_kernel;
nk_warn_msg = "interface to kernel module not open";
if (!kernel.is_open)
goto no_kernel_warn;
if (MEDIA_ISSET(media, GENERATOR))
@ -1682,27 +1693,36 @@ void kernelize(struct packet_stream *stream) {
if (num_sinks == 0) {
// add blackhole kernel rule
const char *err = kernelize_one(&reti, &outputs, stream, NULL, NULL, &payload_types);
if (err)
if (err) {
if (!*err)
goto restart;
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
}
else {
for (GList *l = stream->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
&payload_types);
if (!ok)
goto restart;
}
for (GList *l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
&payload_types);
if (!ok)
goto restart;
}
// record number of RTP destinations
unsigned int num_rtp_dests = reti.num_destinations;
for (GList *l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks, NULL);
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks, NULL);
if (!ok)
goto restart;
}
reti.num_rtcp_destinations = reti.num_destinations - num_rtp_dests;
}
@ -1726,15 +1746,25 @@ void kernelize(struct packet_stream *stream) {
}
stream->kernel_time = rtpe_now.tv_sec;
PS_SET(stream, KERNELIZED);
return;
no_kernel_warn:
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg);
ilog(LOG_WARNING, "No support for kernel packet forwarding available "
"(interface to kernel module not open)");
no_kernel:
PS_SET(stream, KERNELIZED);
stream->kernel_time = rtpe_now.tv_sec;
PS_SET(stream, NO_KERNEL_SUPPORT);
return;
restart: // handle detected deadlock
g_list_free(payload_types);
while ((redi = g_queue_pop_head(&outputs)))
g_slice_free1(sizeof(*redi), redi);
// try again, releases stream->lock
}
}
// must be called with appropriate locks (master lock and/or in/out_lock)
@ -1800,9 +1830,6 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
if (mutex_trylock(&sink->out_lock))
continue; // will have to skip this
ssrc_ctx = __hunt_ssrc_ctx(ssrc, sink->ssrc_out, u);
if (!ssrc_ctx)
ssrc_ctx = __hunt_ssrc_ctx(ssrc_map_out, sink->ssrc_out, u);
@ -1812,17 +1839,12 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
atomic64_add(&ssrc_ctx->packets, stats_info->ssrc_stats[u].basic_stats.packets);
atomic64_add(&ssrc_ctx->octets, stats_info->ssrc_stats[u].basic_stats.bytes);
}
mutex_unlock(&sink->out_lock);
}
for (GList *l = ps->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
if (mutex_trylock(&sink->out_lock))
continue; // will have to skip this
ssrc_ctx = __hunt_ssrc_ctx(ssrc, sink->ssrc_out, u);
if (!ssrc_ctx)
ssrc_ctx = __hunt_ssrc_ctx(ssrc_map_out, sink->ssrc_out, u);
@ -1833,34 +1855,25 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
= stats_info->last_rtcp_index[sh->kernel_output_idx][u];
}
}
mutex_unlock(&sink->out_lock);
}
}
}
// must be called with appropriate locks (master lock and/or in_lock)
static void __stream_update_stats(struct packet_stream *ps, bool have_in_lock) {
if (!have_in_lock)
mutex_lock(&ps->in_lock);
// called with master_lock hekd
static void __stream_update_stats(struct packet_stream *ps) {
LOCK(&ps->lock);
struct rtpengine_command_stats stats_info;
__re_address_translate_ep(&stats_info.local, &ps->selected_sfd->socket.local);
if (kernel_update_stats(&stats_info)) {
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
if (kernel_update_stats(&stats_info))
return;
}
__stream_consume_stats(ps, &stats_info.stats);
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
}
/* must be called with in_lock held or call->master_lock held in W */
/* must be called with ps->lock held or call->master_lock held in W */
void __unkernelize(struct packet_stream *p, const char *reason) {
reset_ps_kernel_stats(p);
@ -1909,9 +1922,8 @@ void __stream_unconfirm(struct packet_stream *ps, const char *reason) {
static void stream_unconfirm(struct packet_stream *ps, const char *reason) {
if (!ps)
return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__stream_unconfirm(ps, reason);
mutex_unlock(&ps->in_lock);
}
static void unconfirm_sinks(GQueue *q, const char *reason) {
for (GList *l = q->head; l; l = l->next) {
@ -1922,9 +1934,8 @@ static void unconfirm_sinks(GQueue *q, const char *reason) {
void unkernelize(struct packet_stream *ps, const char *reason) {
if (!ps)
return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__unkernelize(ps, reason);
mutex_unlock(&ps->in_lock);
}
// master lock held in R
@ -1945,7 +1956,7 @@ void media_update_stats(struct call_media *m) {
if (!ps->selected_sfd)
continue;
__stream_update_stats(ps, false);
__stream_update_stats(ps);
}
}
@ -1984,7 +1995,7 @@ err:
return &__sh_noop;
}
/* must be called with call->master_lock held in R, and in->in_lock held */
/* must be called with call->master_lock held in R, and in->lock held */
// `sh` can be null
static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) {
const struct transport_protocol *in_proto, *out_proto;
@ -2091,7 +2102,7 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs,
struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash)
{
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in,
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->lock, in_srtp->ssrc_in,
&in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress");
}
// check and update output SSRC pointers
@ -2101,12 +2112,12 @@ static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ss
bool ssrc_change)
{
if (ssrc_change)
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock,
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress (mapped)");
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock,
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress (direct)");
@ -2131,14 +2142,13 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
}
}
mutex_lock(&phc->mp.stream->in_lock);
LOCK(&phc->mp.stream->lock);
int ret = dtls(phc->mp.sfd, &phc->s, &phc->mp.fsin);
if (ret == 1) {
phc->unkernelize = "DTLS connected";
phc->unkernelize_subscriptions = true;
ret = 0;
}
mutex_unlock(&phc->mp.stream->in_lock);
if (!ret)
return 0;
}
@ -2163,7 +2173,7 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
#if RTP_LOOP_PROTECT
// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet
static int media_loop_detect(struct packet_handler_ctx *phc) {
mutex_lock(&phc->mp.stream->in_lock);
mutex_lock(&phc->mp.stream->lock);
for (int i = 0; i < RTP_LOOP_PACKETS; i++) {
if (phc->mp.stream->lp_buf[i].len != phc->s.len)
@ -2177,7 +2187,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) {
"to avoid potential loop",
RTP_LOOP_MAX_COUNT,
FMT_M(endpoint_print_buf(&phc->mp.fsin)));
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return -1;
}
@ -2191,7 +2201,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) {
memcpy(phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT));
phc->mp.stream->lp_idx = (phc->mp.stream->lp_idx + 1) % RTP_LOOP_PACKETS;
loop_ok:
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return 0;
}
@ -2310,7 +2320,7 @@ static void media_packet_rtp_out(struct packet_handler_ctx *phc, struct sink_han
static int media_packet_decrypt(struct packet_handler_ctx *phc)
{
mutex_lock(&phc->in_srtp->in_lock);
mutex_lock(&phc->in_srtp->lock);
struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL;
const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh);
@ -2331,7 +2341,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
phc->mp.payload.len -= ori_s.len - phc->s.len;
}
mutex_unlock(&phc->in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->lock);
if (ret == 1) {
phc->update = true;
@ -2341,7 +2351,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
}
static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh)
{
mutex_lock(&phc->in_srtp->in_lock);
mutex_lock(&phc->in_srtp->lock);
__determine_handler(phc->in_srtp, sh);
// XXX use an array with index instead of if/else
@ -2351,7 +2361,7 @@ static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink
phc->encrypt_func = sh->handler->out->rtcp_crypt;
phc->rtcp_filter = sh->handler->in->rtcp_filter;
}
mutex_unlock(&phc->in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->lock);
}
int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp) {
@ -2360,7 +2370,7 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
if (!encrypt_func)
return 0x00;
mutex_lock(&out->out_lock);
LOCK(&out->lock);
for (GList *l = mp->packets_out.head; l; l = l->next) {
struct codec_packet *p = l->data;
@ -2375,8 +2385,6 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
ret |= 0x01;
}
mutex_unlock(&out->out_lock);
return ret;
}
@ -2403,7 +2411,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
struct endpoint endpoint;
bool ret = false;
mutex_lock(&phc->mp.stream->in_lock);
mutex_lock(&phc->mp.stream->lock);
/* we're OK to (potentially) use the source address of this packet as destination
* in the other direction. */
@ -2432,10 +2440,8 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* do not pay attention to source addresses of incoming packets for asymmetric streams */
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) || phc->mp.stream->el_flags == EL_OFF) {
PS_SET(phc->mp.stream, CONFIRMED);
mutex_lock(&phc->mp.stream->out_lock);
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) && !phc->mp.stream->learned_endpoint.address.family)
phc->mp.stream->learned_endpoint = phc->mp.fsin;
mutex_unlock(&phc->mp.stream->out_lock);
}
/* confirm sinks for unidirectional streams in order to kernelize */
@ -2451,7 +2457,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* see if we need to compare the source address with the known endpoint */
if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint = phc->mp.fsin;
mutex_lock(&phc->mp.stream->out_lock);
struct endpoint *ps_endpoint = MEDIA_ISSET(phc->mp.media, ASYMMETRIC) ?
&phc->mp.stream->learned_endpoint : &phc->mp.stream->endpoint;
@ -2467,8 +2472,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
goto update_addr;
}
mutex_unlock(&phc->mp.stream->out_lock);
if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) {
ilog(LOG_INFO | LOG_FLAG_LIMIT, "Drop due to strict-source attribute; "
"got %s%s:%d%s, "
@ -2542,7 +2545,6 @@ confirm_now:
PS_SET(phc->mp.stream, CONFIRMED);
update_peerinfo:
mutex_lock(&phc->mp.stream->out_lock);
// if we're during the wait time, check the received address against the previously
// learned address. if they're the same, ignore this packet for learning purposes
if (!wait_time || !phc->mp.stream->learned_endpoint.address.family ||
@ -2561,8 +2563,6 @@ update_peerinfo:
}
}
update_addr:
mutex_unlock(&phc->mp.stream->out_lock);
/* check the destination address of the received packet against what we think our
* local interface to use is */
if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) {
@ -2585,7 +2585,7 @@ update_addr:
}
out:
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return ret;
}
@ -2605,9 +2605,7 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) {
if (ML_ISSET(phc->mp.media->monologue, DTMF_INJECTION_ACTIVE))
return;
mutex_lock(&phc->mp.stream->in_lock);
kernelize(phc->mp.stream);
mutex_unlock(&phc->mp.stream->in_lock);
}
@ -2996,19 +2994,19 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (ret)
goto next_mirror;
mutex_lock(&mirror_sink->out_lock);
mutex_lock(&mirror_sink->lock);
if (!mirror_sink->advertised_endpoint.port
|| (is_addr_unspecified(&mirror_sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&mirror_sink->advertised_endpoint)))
{
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
goto next_mirror;
}
media_socket_dequeue(&mirror_phc.mp, mirror_sink);
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
next_mirror:
media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left
@ -3034,13 +3032,13 @@ next_mirror:
}
}
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
if (!sink->advertised_endpoint.port
|| (is_addr_unspecified(&sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&sink->advertised_endpoint)))
{
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
goto next;
}
@ -3049,18 +3047,18 @@ next_mirror:
else
ret = media_socket_dequeue(&phc->mp, NULL);
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
if (ret == 0)
goto next;
err_next:
ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno));
mutex_lock(&sink->in_lock);
atomic64_inc(&sink->stats_in.errors);
mutex_lock(&sink->lock);
if (sink->selected_sfd)
atomic64_inc(&sink->selected_sfd->local_intf->stats.out.errors);
mutex_unlock(&sink->in_lock);
mutex_unlock(&sink->lock);
RTPE_STATS_INC(errors_user);
goto next;
@ -3537,7 +3535,7 @@ enum thread_looper_action kernel_stats_updater(void) {
atomic64_set(&sink->kernel_stats_out.packets, stats_o->packets);
atomic64_set(&sink->kernel_stats_out.errors, stats_o->errors);
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
break;
@ -3565,10 +3563,10 @@ enum thread_looper_action kernel_stats_updater(void) {
update = true;
}
}
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
}
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
@ -3594,7 +3592,7 @@ enum thread_looper_action kernel_stats_updater(void) {
update = true;
}
}
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
}
rwlock_unlock_r(&sfd->call->master_lock);

@ -336,7 +336,7 @@ static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *jso
static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
struct stream_fd *sfd = ps->selected_sfd;
if (sfd) {
@ -364,10 +364,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_object(json);
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
json_builder_set_member_name(json, "egress");
json_builder_begin_object(json);
mqtt_stream_stats_dir(&ps->stats_out, json);
@ -384,8 +380,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_array(json);
json_builder_end_object(json);
mutex_unlock(&ps->out_lock);
}

@ -2405,8 +2405,7 @@ char* redis_encode_json(struct call *c) {
for (GList *l = c->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
LOCK(&ps->in_lock);
LOCK(&ps->out_lock);
LOCK(&ps->lock);
snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);
@ -2440,8 +2439,7 @@ char* redis_encode_json(struct call *c) {
struct packet_stream *ps = l->data;
// XXX these should all go into the above loop
LOCK(&ps->in_lock);
LOCK(&ps->out_lock);
LOCK(&ps->lock);
snprintf(tmp, sizeof(tmp), "stream_sfds-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);

@ -1575,7 +1575,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
media_update_stats(media);
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (!ps->selected_sfd || !rtcp_ps->selected_sfd)
return;
@ -1604,8 +1604,6 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
str rtcp_packet = STR_INIT_LEN(sr->str, sr->len);
LOCK(&ps->out_lock);
const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP],
media, true);

@ -627,7 +627,7 @@ void ssrc_collect_metrics(struct call_media *media) {
e->jitter = e->jitter * 1000 / rpt->clock_rate;
}
LOCK(&ps->in_lock);
LOCK(&ps->lock);
RTPE_SAMPLE_SFD(jitter_measured, e->jitter, ps->selected_sfd);
}
}

@ -220,8 +220,7 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
ps = tg->t38_media->streams.head->data;
struct stream_fd *sfd = NULL;
if (ps) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
sfd = ps->selected_sfd;
}
if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) {
@ -233,10 +232,8 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
else
ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of "
"socket or stream");
if (ps) {
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->in_lock);
}
if (ps)
mutex_unlock(&ps->lock);
g_string_free(s, TRUE);

@ -361,18 +361,10 @@ struct loop_protector {
* This is done through the various bit flags.
*/
struct packet_stream {
/* Both locks valid only with call->master_lock held in R.
/* Lock valid only with call->master_lock held in R.
* Preempted by call->master_lock held in W.
* If both in/out are to be locked, in_lock must be locked first.
*
* The in_lock protects fields relevant to packet reception on that stream,
* meanwhile the out_lock protects fields relevant to packet egress.
*
* This allows packet handling on multiple ports and streams belonging
* to the same call to happen at the same time.
*/
mutex_t in_lock,
out_lock;
mutex_t lock;
struct call_media *media; /* RO */
struct call *call; /* RO */
@ -381,23 +373,23 @@ struct packet_stream {
struct recording_stream recording; /* LOCK: call->master_lock */
GQueue sfds; /* LOCK: call->master_lock */
struct stream_fd * selected_sfd; // LOCK: in_lock
struct stream_fd * selected_sfd; // LOCK: ps->lock
endpoint_t last_local_endpoint;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
struct dtls_connection ice_dtls; /* LOCK: ps->lock */
GQueue rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
GQueue rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
GQueue rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */
struct endpoint endpoint; /* LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
time_t ep_detect_signal; /* LOCK: out_lock */
struct endpoint endpoint; /* LOCK: ps->lock */
struct endpoint detected_endpoints[4]; /* LOCK: ps->lock */
time_t ep_detect_signal; /* LOCK: ps->lock */
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */
unsigned int ssrc_in_idx, /* LOCK: in_lock */
ssrc_out_idx; /* LOCK: out_lock */
struct endpoint learned_endpoint; /* LOCK: ps->lock */
struct crypto_context crypto; /* OUT direction, LOCK: ps->lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: ps->lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: ps->lock */
unsigned int ssrc_in_idx, /* LOCK: ps->lock */
ssrc_out_idx; /* LOCK: ps->lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
time_t kernel_time;
@ -414,15 +406,15 @@ struct packet_stream {
enum endpoint_learning el_flags;
#if RTP_LOOP_PROTECT
/* LOCK: in_lock: */
/* LOCK: ps->lock: */
unsigned int lp_idx;
struct loop_protector lp_buf[RTP_LOOP_PACKETS];
unsigned int lp_count;
#endif
X509 *dtls_cert; /* LOCK: in_lock */
X509 *dtls_cert; /* LOCK: ps->lock */
/* in_lock must be held for SETTING these: */
/* ps->lock must be held for SETTING these: */
volatile unsigned int ps_flags;
};

@ -311,7 +311,6 @@ INLINE int open_intf_socket(socket_t *r, unsigned int port, const struct local_i
return open_socket(r, SOCK_DGRAM, port, &lif->spec->local_address.addr);
}
void kernelize(struct packet_stream *);
void __unkernelize(struct packet_stream *, const char *);
void unkernelize(struct packet_stream *, const char *);
void __stream_unconfirm(struct packet_stream *, const char *);

Loading…
Cancel
Save