@ -54,6 +54,8 @@ struct websocket_ng_buf {
static GQueue websocket_vhost_configs ;
static GQueue websocket_vhost_configs ;
static struct lws_context * websocket_context ;
static struct lws_context * websocket_context ;
static GThreadPool * websocket_threads ;
static GThreadPool * websocket_threads ;
static mutex_t websocket_callback_lock = MUTEX_STATIC_INIT ;
static mutex_t websocket_service_lock = MUTEX_STATIC_INIT ;
static struct websocket_message * websocket_message_new ( struct websocket_conn * wc ) {
static struct websocket_message * websocket_message_new ( struct websocket_conn * wc ) {
@ -135,14 +137,42 @@ void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
wo - > protocol = protocol ;
wo - > protocol = protocol ;
g_queue_push_tail ( & wc - > output_q , websocket_output_new ( ) ) ;
g_queue_push_tail ( & wc - > output_q , websocket_output_new ( ) ) ;
mutex_unlock ( & wc - > lock ) ;
if ( done ) {
if ( done ) {
lws_callback_on_writable ( wc - > wsi ) ;
// Sadly lws_callback_on_writable() doesn't do any internal
// locking, therefore we must protect it against a concurrently
// running lws_service(), as well as against other threads
// invoking lws_callback_on_writable().
//
// Acquire the callback lock first, which is normally unlocked,
// then wake up the service thread and try to break out of
// lws_service(). The service thread holds the service lock
// while lws_service() is executing and releases it as soon as
// lws_service() is done. We therefore try to acquire the
// service lock here, which blocks us until lws_service() is
// actually done. At this point the service thread will try to
// acquire the callback lock, which is still held by us here,
// and so the service thread will block until we are done
// calling lws_callback_on_writable(). Finally we release both
// locks, which allows the service thread to resume
// lws_service().
//
// The suggested approach of using
// LWS_CALLBACK_EVENT_WAIT_CANCELLED together with a queue and
// then calling lws_callback_on_writable() from the service
// thread is not usable as libwebsockets 2.0 doesn't support
// LWS_CALLBACK_EVENT_WAIT_CANCELLED.
mutex_lock ( & websocket_callback_lock ) ;
lws_cancel_service ( websocket_context ) ;
lws_cancel_service ( websocket_context ) ;
}
mutex_unlock ( & wc - > lock ) ;
mutex_lock ( & websocket_service_lock ) ;
lws_callback_on_writable ( wc - > wsi ) ;
return 0 ;
mutex_unlock ( & websocket_service_lock ) ;
mutex_unlock ( & websocket_callback_lock ) ;
}
}
}
@ -1124,8 +1154,16 @@ err:
static void websocket_loop ( void * p ) {
static void websocket_loop ( void * p ) {
ilogs ( http , LOG_INFO , " Websocket listener thread running " ) ;
ilogs ( http , LOG_INFO , " Websocket listener thread running " ) ;
while ( ! rtpe_shutdown )
while ( ! rtpe_shutdown ) {
// see websocket_write_raw() for locking logic
mutex_lock ( & websocket_service_lock ) ;
lws_service ( websocket_context , 100 ) ;
lws_service ( websocket_context , 100 ) ;
mutex_unlock ( & websocket_service_lock ) ;
mutex_lock ( & websocket_callback_lock ) ;
mutex_unlock ( & websocket_callback_lock ) ;
}
websocket_cleanup ( ) ;
websocket_cleanup ( ) ;
}
}