diff --git a/daemon/media_socket.c b/daemon/media_socket.c index a03c89118..e46a1b4c3 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -3009,6 +3009,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { if (sfd->socket.fd != fd) return; + // +1 to active read events. If it was zero then we handle it. If it was non-zero, + // another thread is already handling this socket and will process our event. + if (g_atomic_int_add(&sfd->active_read_events, 1) != 0) + return; + ca = sfd->call ? : NULL; log_info_stream_fd(sfd); @@ -3022,6 +3027,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { return; } +restart: + for (iters = 0; ; iters++) { #if MAX_RECV_ITERS if (iters >= MAX_RECV_ITERS) { @@ -3075,6 +3082,11 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { update = true; } + // -1 active read events. If it's non-zero, another thread has received a read event, + // and we must handle it here. + if (!g_atomic_int_dec_and_test(&sfd->active_read_events)) + goto restart; + // no strike if (strikes > 0) g_atomic_int_compare_and_exchange(&sfd->error_strikes, strikes, strikes - 1); diff --git a/include/media_socket.h b/include/media_socket.h index 23ce1a4dd..11e7f776c 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -229,6 +229,7 @@ struct stream_fd { struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */ int error_strikes; + int active_read_events; struct poller *poller; };