MT#55283 combine in/out_lock

With selected_sfd being protected by in_lock, we pretty much have to
hold at least in_lock everywhere, and end up requiring both locks in
many places. The distinction has become pointless.

Change-Id: Ic0ad976c2d68d9639b9434da7f0e6e9c0d84c185
(cherry picked from commit e03f814855)
(cherry picked from commit 222fcaa4b0)
(cherry picked from commit abf1ad73d2)
(cherry picked from commit 209a27bbdb)
mr11.5.1
Richard Fuchs 11 months ago
parent bd81f2a3f4
commit 3434d23804

@ -191,7 +191,7 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
AUTO_CLEANUP(struct stream_fd *sfd, stream_fd_auto_cleanup) = NULL;
{
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (ps->selected_sfd)
sfd = obj_get(ps->selected_sfd);
}
@ -894,8 +894,7 @@ struct packet_stream *__packet_stream_new(struct call *call) {
struct packet_stream *stream;
stream = uid_slice_alloc0(stream, &call->streams);
mutex_init(&stream->in_lock);
mutex_init(&stream->out_lock);
mutex_init(&stream->lock);
stream->call = call;
atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec);
stream->rtp_stats = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, __rtp_stats_free);
@ -1003,21 +1002,19 @@ void call_stream_crypto_reset(struct packet_stream *ps) {
crypto_reset(&ps->crypto);
if (PS_ISSET(ps, RTP)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_in); u++) {
if (!ps->ssrc_in[u]) // end of list
break;
ps->ssrc_in[u]->srtp_index = 0;
}
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ps->ssrc_out); u++) {
if (!ps->ssrc_out[u]) // end of list
break;
ps->ssrc_out[u]->srtp_index = 0;
}
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->lock);
}
}
@ -1044,16 +1041,16 @@ enum call_stream_state call_stream_state_machine(struct packet_stream *ps) {
}
if (MEDIA_ISSET(media, DTLS)) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
struct dtls_connection *d = dtls_ptr(ps->selected_sfd);
if (d && d->init && !d->connected) {
int dret = dtls(ps->selected_sfd, NULL, NULL);
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
if (dret == 1)
call_media_unkernelize(media, "DTLS connected");
return CSS_DTLS;
}
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
}
if (PS_ISSET(ps, PIERCE_NAT) && PS_ISSET(ps, FILLED) && !PS_ISSET(ps, CONFIRMED)) {

@ -954,14 +954,14 @@ static void __rtcp_timer_run(struct codec_timer *ct) {
struct ssrc_ctx *ssrc_out[RTPE_NUM_SSRC_TRACKING] = {NULL,};
if (media->streams.head) {
struct packet_stream *ps = media->streams.head->data;
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
if (!ps->ssrc_out[u]) // end of list
break;
ssrc_out[u] = ps->ssrc_out[u];
ssrc_ctx_hold(ssrc_out[u]);
}
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->lock);
}
for (unsigned int u = 0; u < RTPE_NUM_SSRC_TRACKING; u++) {
@ -2887,11 +2887,10 @@ static void delay_frame_send(struct delay_frame *dframe) {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &dframe->mp))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
LOCK(&sink->lock);
if (media_socket_dequeue(&dframe->mp, sink))
ilogs(transcoding, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
}
}
static void delay_frame_flush(struct delay_buffer *dbuf, struct delay_frame *dframe) {
@ -3475,11 +3474,11 @@ static void __dtx_send_later(struct codec_timer *ct) {
if (sh->handler && media_packet_encrypt(sh->handler->out->rtp_crypt, sink, &mp_copy))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT, "Error encrypting buffered RTP media");
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
if (media_socket_dequeue(&mp_copy, sink))
ilogs(dtx, LOG_ERR | LOG_FLAG_LIMIT,
"Error sending buffered media to RTP sink");
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
}
}

@ -841,7 +841,7 @@ error:
return -1;
}
/* called with call locked in W or R with ps->in_lock held */
/* called with call locked in W or R with ps->lock held */
int dtls(struct stream_fd *sfd, const str *s, const endpoint_t *fsin) {
struct packet_stream *ps = sfd->stream;
int ret;
@ -889,22 +889,16 @@ int dtls(struct stream_fd *sfd, const str *s, const endpoint_t *fsin) {
else if (ret == 1) {
/* connected! */
dret = 1;
mutex_lock(&ps->out_lock); // nested lock!
if (dtls_setup_crypto(ps, d))
{} /* XXX ?? */
mutex_unlock(&ps->out_lock);
if (PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP) && ps->rtcp_sibling
&& MEDIA_ISSET(ps->media, RTCP_MUX)
&& ps->rtcp_sibling != ps)
{
// nested locks!
mutex_lock(&ps->rtcp_sibling->in_lock);
mutex_lock(&ps->rtcp_sibling->out_lock);
LOCK(&ps->rtcp_sibling->lock);
if (dtls_setup_crypto(ps->rtcp_sibling, d))
{} /* XXX ?? */
mutex_unlock(&ps->rtcp_sibling->out_lock);
mutex_unlock(&ps->rtcp_sibling->in_lock);
}
}

@ -1203,7 +1203,7 @@ found:
g_queue_clear(&compo1);
}
/* call(W) or call(R)+agent must be locked - no in_lock or out_lock must be held */
/* call(W) or call(R)+agent must be locked - no ps->lock must be held */
static int __check_valid(struct ice_agent *ag) {
struct call_media *media;
struct packet_stream *ps;
@ -1246,7 +1246,7 @@ static int __check_valid(struct ice_agent *ag) {
ps = l->data;
pair = k->data;
mutex_lock(&ps->out_lock);
LOCK(&ps->lock);
if (memcmp(&ps->endpoint, &pair->remote_candidate->endpoint, sizeof(ps->endpoint))) {
ilogs(ice, LOG_INFO, "ICE negotiated: new peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
@ -1256,9 +1256,7 @@ static int __check_valid(struct ice_agent *ag) {
else
ilogs(ice, LOG_INFO, "ICE negotiated: peer for component %u is %s%s%s", ps->component,
FMT_M(endpoint_print_buf(&pair->remote_candidate->endpoint)));
mutex_unlock(&ps->out_lock);
LOCK(&ps->in_lock);
for (m = ps->sfds.head; m; m = m->next) {
sfd = m->data;
if (sfd->local_intf != pair->local_intf)

@ -319,11 +319,11 @@ static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp)
log_info_call(call);
rwlock_lock_r(&call->master_lock);
mutex_lock(&st->sink->out_lock);
mutex_lock(&st->sink->lock);
__send_timer_send_common(st, cp);
mutex_unlock(&st->sink->out_lock);
mutex_unlock(&st->sink->lock);
__send_timer_rtcp(st, ssrc_out);
ssrc_ctx_put(&ssrc_out);
@ -473,10 +473,10 @@ retry:;
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
// schedule our next run
timeval_add_usec(&mp->next_run, us_dur);
@ -848,10 +848,10 @@ void media_player_add_packet(struct media_player *mp, char *buf, size_t len,
media_packet_encrypt(mp->crypt_handler->out->rtp_crypt, mp->sink, &packet);
mutex_lock(&mp->sink->out_lock);
mutex_lock(&mp->sink->lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
mutex_unlock(&mp->sink->out_lock);
mutex_unlock(&mp->sink->lock);
timeval_add_usec(&mp->next_run, us_dur);
timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run);

@ -1457,9 +1457,7 @@ static const char *kernelize_one(struct rtpengine_target_info *reti, GQueue *out
goto output;
if (PS_ISSET2(stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
mutex_lock(&stream->out_lock);
__re_address_translate_ep(&reti->expected_src, MEDIA_ISSET(media, ASYMMETRIC) ? &stream->learned_endpoint : &stream->endpoint);
mutex_unlock(&stream->out_lock);
if (PS_ISSET(stream, STRICT_SOURCE))
reti->src_mismatch = MSM_DROP;
else if (PS_ISSET(stream, MEDIA_HANDOVER))
@ -1599,7 +1597,12 @@ output:
reti->pt_filter = 1;
}
mutex_lock(&sink->out_lock);
// XXX nested lock, avoid possible deadlock. should be reworked not to
// require a nested lock
if (sink != stream && mutex_trylock(&sink->lock)) {
g_slice_free1(sizeof(*redi), redi);
return ""; // indicate deadlock
}
__re_address_translate_ep(&redi->output.dst_addr, &sink->endpoint);
__re_address_translate_ep(&redi->output.src_addr, &sink->selected_sfd->socket.local);
@ -1616,7 +1619,8 @@ output:
handler->out->kernel(&redi->output.encrypt, sink);
mutex_unlock(&sink->out_lock);
if (sink != stream)
mutex_unlock(&sink->lock);
if (!redi->output.encrypt.cipher || !redi->output.encrypt.hmac) {
g_slice_free1(sizeof(*redi), redi);
@ -1633,25 +1637,33 @@ output:
return NULL;
}
// helper function for kernelize()
static void kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs,
static bool kernelize_one_sink_handler(struct rtpengine_target_info *reti, GQueue *outputs,
struct packet_stream *stream, struct sink_handler *sink_handler, GQueue *sinks,
GList **payload_types)
{
struct packet_stream *sink = sink_handler->sink;
if (PS_ISSET(sink, NAT_WAIT) && !PS_ISSET(sink, RECEIVED))
return;
return true;
const char *err = kernelize_one(reti, outputs, stream, sink_handler, &stream->rtp_sinks,
payload_types);
if (err)
if (err) {
if (!*err)
return false; // indicate deadlock
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
return true;
}
/* called with in_lock held */
void kernelize(struct packet_stream *stream) {
/* called with master_lock held */
static void kernelize(struct packet_stream *stream) {
struct call *call = stream->call;
const char *nk_warn_msg;
struct call_media *media = stream->media;
if (PS_ISSET(stream, KERNELIZED))
while (true) {
LOCK(&stream->lock);
// set flag, return if set already
if (PS_SET(stream, KERNELIZED))
return;
reset_ps_kernel_stats(stream);
@ -1660,7 +1672,6 @@ void kernelize(struct packet_stream *stream) {
goto no_kernel;
if (!kernel.is_wanted)
goto no_kernel;
nk_warn_msg = "interface to kernel module not open";
if (!kernel.is_open)
goto no_kernel_warn;
if (MEDIA_ISSET(media, GENERATOR))
@ -1682,27 +1693,36 @@ void kernelize(struct packet_stream *stream) {
if (num_sinks == 0) {
// add blackhole kernel rule
const char *err = kernelize_one(&reti, &outputs, stream, NULL, NULL, &payload_types);
if (err)
if (err) {
if (!*err)
goto restart;
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", err);
}
}
else {
for (GList *l = stream->rtp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
if (sh->attrs.block_media)
continue;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
&payload_types);
if (!ok)
goto restart;
}
for (GList *l = stream->rtp_mirrors.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks,
&payload_types);
if (!ok)
goto restart;
}
// record number of RTP destinations
unsigned int num_rtp_dests = reti.num_destinations;
for (GList *l = stream->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks, NULL);
bool ok = kernelize_one_sink_handler(&reti, &outputs, stream, sh, &stream->rtp_sinks, NULL);
if (!ok)
goto restart;
}
reti.num_rtcp_destinations = reti.num_destinations - num_rtp_dests;
}
@ -1726,15 +1746,25 @@ void kernelize(struct packet_stream *stream) {
}
stream->kernel_time = rtpe_now.tv_sec;
PS_SET(stream, KERNELIZED);
return;
no_kernel_warn:
ilog(LOG_WARNING, "No support for kernel packet forwarding available (%s)", nk_warn_msg);
ilog(LOG_WARNING, "No support for kernel packet forwarding available "
"(interface to kernel module not open)");
no_kernel:
PS_SET(stream, KERNELIZED);
stream->kernel_time = rtpe_now.tv_sec;
PS_SET(stream, NO_KERNEL_SUPPORT);
return;
restart: // handle detected deadlock
g_list_free(payload_types);
while ((redi = g_queue_pop_head(&outputs)))
g_slice_free1(sizeof(*redi), redi);
// try again, releases stream->lock
}
}
// must be called with appropriate locks (master lock and/or in/out_lock)
@ -1800,9 +1830,6 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
if (mutex_trylock(&sink->out_lock))
continue; // will have to skip this
ssrc_ctx = __hunt_ssrc_ctx(ssrc, sink->ssrc_out, u);
if (!ssrc_ctx)
ssrc_ctx = __hunt_ssrc_ctx(ssrc_map_out, sink->ssrc_out, u);
@ -1812,17 +1839,12 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
atomic64_add(&ssrc_ctx->packets, stats_info->ssrc_stats[u].basic_stats.packets);
atomic64_add(&ssrc_ctx->octets, stats_info->ssrc_stats[u].basic_stats.bytes);
}
mutex_unlock(&sink->out_lock);
}
for (GList *l = ps->rtcp_sinks.head; l; l = l->next) {
struct sink_handler *sh = l->data;
struct packet_stream *sink = sh->sink;
if (mutex_trylock(&sink->out_lock))
continue; // will have to skip this
ssrc_ctx = __hunt_ssrc_ctx(ssrc, sink->ssrc_out, u);
if (!ssrc_ctx)
ssrc_ctx = __hunt_ssrc_ctx(ssrc_map_out, sink->ssrc_out, u);
@ -1833,34 +1855,25 @@ static void __stream_consume_stats(struct packet_stream *ps, const struct rtpeng
= stats_info->last_rtcp_index[sh->kernel_output_idx][u];
}
}
mutex_unlock(&sink->out_lock);
}
}
}
// must be called with appropriate locks (master lock and/or in_lock)
static void __stream_update_stats(struct packet_stream *ps, bool have_in_lock) {
if (!have_in_lock)
mutex_lock(&ps->in_lock);
// called with master_lock hekd
static void __stream_update_stats(struct packet_stream *ps) {
LOCK(&ps->lock);
struct rtpengine_command_stats stats_info;
__re_address_translate_ep(&stats_info.local, &ps->selected_sfd->socket.local);
if (kernel_update_stats(&stats_info)) {
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
if (kernel_update_stats(&stats_info))
return;
}
__stream_consume_stats(ps, &stats_info.stats);
if (!have_in_lock)
mutex_unlock(&ps->in_lock);
}
/* must be called with in_lock held or call->master_lock held in W */
/* must be called with ps->lock held or call->master_lock held in W */
void __unkernelize(struct packet_stream *p, const char *reason) {
reset_ps_kernel_stats(p);
@ -1909,9 +1922,8 @@ void __stream_unconfirm(struct packet_stream *ps, const char *reason) {
static void stream_unconfirm(struct packet_stream *ps, const char *reason) {
if (!ps)
return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__stream_unconfirm(ps, reason);
mutex_unlock(&ps->in_lock);
}
static void unconfirm_sinks(GQueue *q, const char *reason) {
for (GList *l = q->head; l; l = l->next) {
@ -1922,9 +1934,8 @@ static void unconfirm_sinks(GQueue *q, const char *reason) {
void unkernelize(struct packet_stream *ps, const char *reason) {
if (!ps)
return;
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
__unkernelize(ps, reason);
mutex_unlock(&ps->in_lock);
}
// master lock held in R
@ -1945,7 +1956,7 @@ void media_update_stats(struct call_media *m) {
if (!ps->selected_sfd)
continue;
__stream_update_stats(ps, false);
__stream_update_stats(ps);
}
}
@ -1984,7 +1995,7 @@ err:
return &__sh_noop;
}
/* must be called with call->master_lock held in R, and in->in_lock held */
/* must be called with call->master_lock held in R, and in->lock held */
// `sh` can be null
static const struct streamhandler *__determine_handler(struct packet_stream *in, struct sink_handler *sh) {
const struct transport_protocol *in_proto, *out_proto;
@ -2091,7 +2102,7 @@ static const char *__stream_ssrc_inout(struct packet_stream *ps, uint32_t ssrc,
static const char *__stream_ssrc_in(struct packet_stream *in_srtp, uint32_t ssrc_bs,
struct ssrc_ctx **ssrc_in_p, struct ssrc_hash *ssrc_hash)
{
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->in_lock, in_srtp->ssrc_in,
return __stream_ssrc_inout(in_srtp, ntohl(ssrc_bs), &in_srtp->lock, in_srtp->ssrc_in,
&in_srtp->ssrc_in_idx, 0, ssrc_in_p, ssrc_hash, SSRC_DIR_INPUT, "ingress");
}
// check and update output SSRC pointers
@ -2101,12 +2112,12 @@ static const char *__stream_ssrc_out(struct packet_stream *out_srtp, uint32_t ss
bool ssrc_change)
{
if (ssrc_change)
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->out_lock,
return __stream_ssrc_inout(out_srtp, ssrc_in->ssrc_map_out, &out_srtp->lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, ntohl(ssrc_bs), ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress (mapped)");
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->out_lock,
return __stream_ssrc_inout(out_srtp, ntohl(ssrc_bs), &out_srtp->lock,
out_srtp->ssrc_out,
&out_srtp->ssrc_out_idx, 0, ssrc_out_p, ssrc_hash, SSRC_DIR_OUTPUT,
"egress (direct)");
@ -2131,14 +2142,13 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
}
}
mutex_lock(&phc->mp.stream->in_lock);
LOCK(&phc->mp.stream->lock);
int ret = dtls(phc->mp.sfd, &phc->s, &phc->mp.fsin);
if (ret == 1) {
phc->unkernelize = "DTLS connected";
phc->unkernelize_subscriptions = true;
ret = 0;
}
mutex_unlock(&phc->mp.stream->in_lock);
if (!ret)
return 0;
}
@ -2163,7 +2173,7 @@ static int media_demux_protocols(struct packet_handler_ctx *phc) {
#if RTP_LOOP_PROTECT
// returns: 0 = ok, proceed; -1 = duplicate detected, drop packet
static int media_loop_detect(struct packet_handler_ctx *phc) {
mutex_lock(&phc->mp.stream->in_lock);
mutex_lock(&phc->mp.stream->lock);
for (int i = 0; i < RTP_LOOP_PACKETS; i++) {
if (phc->mp.stream->lp_buf[i].len != phc->s.len)
@ -2177,7 +2187,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) {
"to avoid potential loop",
RTP_LOOP_MAX_COUNT,
FMT_M(endpoint_print_buf(&phc->mp.fsin)));
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return -1;
}
@ -2191,7 +2201,7 @@ static int media_loop_detect(struct packet_handler_ctx *phc) {
memcpy(phc->mp.stream->lp_buf[phc->mp.stream->lp_idx].buf, phc->s.s, MIN(phc->s.len, RTP_LOOP_PROTECT));
phc->mp.stream->lp_idx = (phc->mp.stream->lp_idx + 1) % RTP_LOOP_PACKETS;
loop_ok:
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return 0;
}
@ -2310,7 +2320,7 @@ static void media_packet_rtp_out(struct packet_handler_ctx *phc, struct sink_han
static int media_packet_decrypt(struct packet_handler_ctx *phc)
{
mutex_lock(&phc->in_srtp->in_lock);
mutex_lock(&phc->in_srtp->lock);
struct sink_handler *first_sh = phc->sinks->length ? phc->sinks->head->data : NULL;
const struct streamhandler *sh = __determine_handler(phc->in_srtp, first_sh);
@ -2331,7 +2341,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
phc->mp.payload.len -= ori_s.len - phc->s.len;
}
mutex_unlock(&phc->in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->lock);
if (ret == 1) {
phc->update = true;
@ -2341,7 +2351,7 @@ static int media_packet_decrypt(struct packet_handler_ctx *phc)
}
static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink_handler *sh)
{
mutex_lock(&phc->in_srtp->in_lock);
mutex_lock(&phc->in_srtp->lock);
__determine_handler(phc->in_srtp, sh);
// XXX use an array with index instead of if/else
@ -2351,7 +2361,7 @@ static void media_packet_set_encrypt(struct packet_handler_ctx *phc, struct sink
phc->encrypt_func = sh->handler->out->rtcp_crypt;
phc->rtcp_filter = sh->handler->in->rtcp_filter;
}
mutex_unlock(&phc->in_srtp->in_lock);
mutex_unlock(&phc->in_srtp->lock);
}
int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp) {
@ -2360,7 +2370,7 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
if (!encrypt_func)
return 0x00;
mutex_lock(&out->out_lock);
LOCK(&out->lock);
for (GList *l = mp->packets_out.head; l; l = l->next) {
struct codec_packet *p = l->data;
@ -2375,8 +2385,6 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
ret |= 0x01;
}
mutex_unlock(&out->out_lock);
return ret;
}
@ -2403,7 +2411,7 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
struct endpoint endpoint;
bool ret = false;
mutex_lock(&phc->mp.stream->in_lock);
mutex_lock(&phc->mp.stream->lock);
/* we're OK to (potentially) use the source address of this packet as destination
* in the other direction. */
@ -2432,10 +2440,8 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* do not pay attention to source addresses of incoming packets for asymmetric streams */
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) || phc->mp.stream->el_flags == EL_OFF) {
PS_SET(phc->mp.stream, CONFIRMED);
mutex_lock(&phc->mp.stream->out_lock);
if (MEDIA_ISSET(phc->mp.media, ASYMMETRIC) && !phc->mp.stream->learned_endpoint.address.family)
phc->mp.stream->learned_endpoint = phc->mp.fsin;
mutex_unlock(&phc->mp.stream->out_lock);
}
/* confirm sinks for unidirectional streams in order to kernelize */
@ -2451,7 +2457,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
/* see if we need to compare the source address with the known endpoint */
if (PS_ISSET2(phc->mp.stream, STRICT_SOURCE, MEDIA_HANDOVER)) {
endpoint = phc->mp.fsin;
mutex_lock(&phc->mp.stream->out_lock);
struct endpoint *ps_endpoint = MEDIA_ISSET(phc->mp.media, ASYMMETRIC) ?
&phc->mp.stream->learned_endpoint : &phc->mp.stream->endpoint;
@ -2467,8 +2472,6 @@ static bool media_packet_address_check(struct packet_handler_ctx *phc)
goto update_addr;
}
mutex_unlock(&phc->mp.stream->out_lock);
if (tmp && PS_ISSET(phc->mp.stream, STRICT_SOURCE)) {
ilog(LOG_INFO | LOG_FLAG_LIMIT, "Drop due to strict-source attribute; "
"got %s%s:%d%s, "
@ -2542,7 +2545,6 @@ confirm_now:
PS_SET(phc->mp.stream, CONFIRMED);
update_peerinfo:
mutex_lock(&phc->mp.stream->out_lock);
// if we're during the wait time, check the received address against the previously
// learned address. if they're the same, ignore this packet for learning purposes
if (!wait_time || !phc->mp.stream->learned_endpoint.address.family ||
@ -2561,8 +2563,6 @@ update_peerinfo:
}
}
update_addr:
mutex_unlock(&phc->mp.stream->out_lock);
/* check the destination address of the received packet against what we think our
* local interface to use is */
if (phc->mp.stream->selected_sfd && phc->mp.sfd != phc->mp.stream->selected_sfd) {
@ -2585,7 +2585,7 @@ update_addr:
}
out:
mutex_unlock(&phc->mp.stream->in_lock);
mutex_unlock(&phc->mp.stream->lock);
return ret;
}
@ -2605,9 +2605,7 @@ static void media_packet_kernel_check(struct packet_handler_ctx *phc) {
if (ML_ISSET(phc->mp.media->monologue, DTMF_INJECTION_ACTIVE))
return;
mutex_lock(&phc->mp.stream->in_lock);
kernelize(phc->mp.stream);
mutex_unlock(&phc->mp.stream->in_lock);
}
@ -2996,19 +2994,19 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (ret)
goto next_mirror;
mutex_lock(&mirror_sink->out_lock);
mutex_lock(&mirror_sink->lock);
if (!mirror_sink->advertised_endpoint.port
|| (is_addr_unspecified(&mirror_sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&mirror_sink->advertised_endpoint)))
{
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
goto next_mirror;
}
media_socket_dequeue(&mirror_phc.mp, mirror_sink);
mutex_unlock(&mirror_sink->out_lock);
mutex_unlock(&mirror_sink->lock);
next_mirror:
media_socket_dequeue(&mirror_phc.mp, NULL); // just free if anything left
@ -3034,13 +3032,13 @@ next_mirror:
}
}
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
if (!sink->advertised_endpoint.port
|| (is_addr_unspecified(&sink->advertised_endpoint.address)
&& !is_trickle_ice_address(&sink->advertised_endpoint)))
{
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
goto next;
}
@ -3049,18 +3047,18 @@ next_mirror:
else
ret = media_socket_dequeue(&phc->mp, NULL);
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
if (ret == 0)
goto next;
err_next:
ilog(LOG_DEBUG | LOG_FLAG_LIMIT ,"Error when sending message. Error: %s", strerror(errno));
mutex_lock(&sink->in_lock);
atomic64_inc(&sink->stats_in.errors);
mutex_lock(&sink->lock);
if (sink->selected_sfd)
atomic64_inc(&sink->selected_sfd->local_intf->stats.out.errors);
mutex_unlock(&sink->in_lock);
mutex_unlock(&sink->lock);
RTPE_STATS_INC(errors_user);
goto next;
@ -3537,7 +3535,7 @@ enum thread_looper_action kernel_stats_updater(void) {
atomic64_set(&sink->kernel_stats_out.packets, stats_o->packets);
atomic64_set(&sink->kernel_stats_out.errors, stats_o->errors);
mutex_lock(&sink->out_lock);
mutex_lock(&sink->lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
break;
@ -3565,10 +3563,10 @@ enum thread_looper_action kernel_stats_updater(void) {
update = true;
}
}
mutex_unlock(&sink->out_lock);
mutex_unlock(&sink->lock);
}
mutex_lock(&ps->in_lock);
mutex_lock(&ps->lock);
for (unsigned int u = 0; u < G_N_ELEMENTS(ke->target.ssrc); u++) {
if (!ke->target.ssrc[u]) // end of list
@ -3594,7 +3592,7 @@ enum thread_looper_action kernel_stats_updater(void) {
update = true;
}
}
mutex_unlock(&ps->in_lock);
mutex_unlock(&ps->lock);
}
rwlock_unlock_r(&sfd->call->master_lock);

@ -336,7 +336,7 @@ static void mqtt_stream_stats_dir(const struct stream_stats *s, JsonBuilder *jso
static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
mutex_lock(&ps->in_lock);
LOCK(&ps->lock);
struct stream_fd *sfd = ps->selected_sfd;
if (sfd) {
@ -364,10 +364,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_object(json);
mutex_unlock(&ps->in_lock);
mutex_lock(&ps->out_lock);
json_builder_set_member_name(json, "egress");
json_builder_begin_object(json);
mqtt_stream_stats_dir(&ps->stats_out, json);
@ -384,8 +380,6 @@ static void mqtt_stream_stats(struct packet_stream *ps, JsonBuilder *json) {
json_builder_end_array(json);
json_builder_end_object(json);
mutex_unlock(&ps->out_lock);
}

@ -2405,8 +2405,7 @@ char* redis_encode_json(struct call *c) {
for (GList *l = c->streams.head; l; l = l->next) {
struct packet_stream *ps = l->data;
LOCK(&ps->in_lock);
LOCK(&ps->out_lock);
LOCK(&ps->lock);
snprintf(tmp, sizeof(tmp), "stream-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);
@ -2440,8 +2439,7 @@ char* redis_encode_json(struct call *c) {
struct packet_stream *ps = l->data;
// XXX these should all go into the above loop
LOCK(&ps->in_lock);
LOCK(&ps->out_lock);
LOCK(&ps->lock);
snprintf(tmp, sizeof(tmp), "stream_sfds-%u", ps->unique_id);
json_builder_set_member_name(builder, tmp);

@ -1575,7 +1575,7 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
media_update_stats(media);
LOCK(&ps->in_lock);
LOCK(&ps->lock);
if (!ps->selected_sfd || !rtcp_ps->selected_sfd)
return;
@ -1604,8 +1604,6 @@ void rtcp_send_report(struct call_media *media, struct ssrc_ctx *ssrc_out) {
str rtcp_packet = STR_INIT_LEN(sr->str, sr->len);
LOCK(&ps->out_lock);
const struct streamhandler *crypt_handler = determine_handler(&transport_protocols[PROTO_RTP_AVP],
media, true);

@ -627,7 +627,7 @@ void ssrc_collect_metrics(struct call_media *media) {
e->jitter = e->jitter * 1000 / rpt->clock_rate;
}
LOCK(&ps->in_lock);
LOCK(&ps->lock);
RTPE_SAMPLE_SFD(jitter_measured, e->jitter, ps->selected_sfd);
}
}

@ -220,8 +220,7 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
ps = tg->t38_media->streams.head->data;
struct stream_fd *sfd = NULL;
if (ps) {
mutex_lock(&ps->in_lock);
mutex_lock(&ps->out_lock);
mutex_lock(&ps->lock);
sfd = ps->selected_sfd;
}
if (sfd && sfd->socket.fd != -1 && ps->endpoint.address.family != NULL) {
@ -233,10 +232,8 @@ static int t38_gateway_handler(t38_core_state_t *stat, void *user_data, const ui
else
ilog(LOG_WARN | LOG_FLAG_LIMIT, "Unable to send T.38 UDPTL packet due to lack of "
"socket or stream");
if (ps) {
mutex_unlock(&ps->out_lock);
mutex_unlock(&ps->in_lock);
}
if (ps)
mutex_unlock(&ps->lock);
g_string_free(s, TRUE);

@ -361,18 +361,10 @@ struct loop_protector {
* This is done through the various bit flags.
*/
struct packet_stream {
/* Both locks valid only with call->master_lock held in R.
/* Lock valid only with call->master_lock held in R.
* Preempted by call->master_lock held in W.
* If both in/out are to be locked, in_lock must be locked first.
*
* The in_lock protects fields relevant to packet reception on that stream,
* meanwhile the out_lock protects fields relevant to packet egress.
*
* This allows packet handling on multiple ports and streams belonging
* to the same call to happen at the same time.
*/
mutex_t in_lock,
out_lock;
mutex_t lock;
struct call_media *media; /* RO */
struct call *call; /* RO */
@ -381,23 +373,23 @@ struct packet_stream {
struct recording_stream recording; /* LOCK: call->master_lock */
GQueue sfds; /* LOCK: call->master_lock */
struct stream_fd * selected_sfd; // LOCK: in_lock
struct stream_fd * selected_sfd; // LOCK: ps->lock
endpoint_t last_local_endpoint;
struct dtls_connection ice_dtls; /* LOCK: in_lock */
struct dtls_connection ice_dtls; /* LOCK: ps->lock */
GQueue rtp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
GQueue rtcp_sinks; /* LOCK: call->master_lock, in_lock for streamhandler */
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
GQueue rtp_mirrors; /* LOCK: call->master_lock, in_lock for streamhandler */
struct endpoint endpoint; /* LOCK: out_lock */
struct endpoint detected_endpoints[4]; /* LOCK: out_lock */
time_t ep_detect_signal; /* LOCK: out_lock */
struct endpoint endpoint; /* LOCK: ps->lock */
struct endpoint detected_endpoints[4]; /* LOCK: ps->lock */
time_t ep_detect_signal; /* LOCK: ps->lock */
struct endpoint advertised_endpoint; /* RO */
struct endpoint learned_endpoint; /* LOCK: out_lock */
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: in_lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: out_lock */
unsigned int ssrc_in_idx, /* LOCK: in_lock */
ssrc_out_idx; /* LOCK: out_lock */
struct endpoint learned_endpoint; /* LOCK: ps->lock */
struct crypto_context crypto; /* OUT direction, LOCK: ps->lock */
struct ssrc_ctx *ssrc_in[RTPE_NUM_SSRC_TRACKING], /* LOCK: ps->lock */
*ssrc_out[RTPE_NUM_SSRC_TRACKING]; /* LOCK: ps->lock */
unsigned int ssrc_in_idx, /* LOCK: ps->lock */
ssrc_out_idx; /* LOCK: ps->lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
time_t kernel_time;
@ -414,15 +406,15 @@ struct packet_stream {
enum endpoint_learning el_flags;
#if RTP_LOOP_PROTECT
/* LOCK: in_lock: */
/* LOCK: ps->lock: */
unsigned int lp_idx;
struct loop_protector lp_buf[RTP_LOOP_PACKETS];
unsigned int lp_count;
#endif
X509 *dtls_cert; /* LOCK: in_lock */
X509 *dtls_cert; /* LOCK: ps->lock */
/* in_lock must be held for SETTING these: */
/* ps->lock must be held for SETTING these: */
volatile unsigned int ps_flags;
};

@ -311,7 +311,6 @@ INLINE int open_intf_socket(socket_t *r, unsigned int port, const struct local_i
return open_socket(r, SOCK_DGRAM, port, &lif->spec->local_address.addr);
}
void kernelize(struct packet_stream *);
void __unkernelize(struct packet_stream *, const char *);
void unkernelize(struct packet_stream *, const char *);
void __stream_unconfirm(struct packet_stream *, const char *);

Loading…
Cancel
Save