MT#55283 overhaul WS locking logic

Multiple writers may operate on a single WS connection simultaneously.
Make sure they don't get in each other's way while constructing their
messages/responses by holding the lock from the beginning of the
response until the point when it's fully ready. This fixes a problem of
parts of multiple messages getting mixed up with each other.

Change-Id: If84224fc06b423cd65c12981a5b09ee99b121df2
(cherry picked from commit 1480574001)
(cherry picked from commit a40dea5007)
mr12.5.1
Richard Fuchs 11 months ago
parent 16ca4c24ad
commit bc94e05219

@ -133,11 +133,9 @@ static void janus_send_json_sync_response(struct websocket_message *wm, JsonBuil
char *result = glib_json_print(builder); char *result = glib_json_print(builder);
if (wm->method == M_WEBSOCKET) if (wm->method == M_WEBSOCKET)
websocket_write_text(wm->wc, result, true); websocket_write_text(wm->wc, result);
else { else
websocket_http_response(wm->wc, code, "application/json", strlen(result)); websocket_http_complete(wm->wc, code, "application/json", strlen(result), result);
websocket_write_http(wm->wc, result, true);
}
g_free(result); g_free(result);
} }
@ -155,7 +153,7 @@ static void janus_send_json_async(struct janus_session *session, JsonBuilder *bu
struct websocket_conn *wc; struct websocket_conn *wc;
while (t_hash_table_iter_next(&iter, NULL, &wc)) { while (t_hash_table_iter_next(&iter, NULL, &wc)) {
// lock order constraint: janus_session lock first, websocket_conn lock second // lock order constraint: janus_session lock first, websocket_conn lock second
websocket_write_text(wc, result, true); websocket_write_text(wc, result);
} }
g_free(result); g_free(result);

@ -96,7 +96,7 @@ static void websocket_output_free(struct websocket_output *wo) {
} }
// appends to output buffer without triggering a response - unlocked // appends to output buffer without triggering a response - non-locking
static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) { static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) {
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
@ -111,17 +111,8 @@ static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, si
} }
// appends to output buffer without triggering a response
void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) {
LOCK(&wc->lock);
__websocket_queue_raw(wc, msg, len);
}
// num bytes in output buffer // num bytes in output buffer
size_t websocket_queue_len(struct websocket_conn *wc) { static size_t __websocket_queue_len(struct websocket_conn *wc) {
LOCK(&wc->lock);
size_t ret = 0; size_t ret = 0;
for (__auto_type l = wc->output_q.head; l; l = l->next) { for (__auto_type l = wc->output_q.head; l; l = l->next) {
struct websocket_output *wo = l->data; struct websocket_output *wo = l->data;
@ -132,19 +123,30 @@ size_t websocket_queue_len(struct websocket_conn *wc) {
} }
// adds data to output buffer (can be null) and optionally triggers specified response // adds data to output buffer (can be null) and optionally triggers specified response - non-locking
void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, static void __websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
enum lws_write_protocol protocol, bool done) enum lws_write_protocol protocol)
{ {
mutex_lock(&wc->lock);
__websocket_queue_raw(wc, msg, len); __websocket_queue_raw(wc, msg, len);
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
wo->protocol = protocol; wo->protocol = protocol;
t_queue_push_tail(&wc->output_q, websocket_output_new()); t_queue_push_tail(&wc->output_q, websocket_output_new());
}
mutex_unlock(&wc->lock); // called after all writes are done - non-locking
// may intermittently release wc->lock
static void __websocket_write_done(struct websocket_conn *wc) {
while (true) {
// If this connection is already closed, don't trigger
// a wakeup of the service thread and don't request a
// writeable callback. We can't write anyway if the
// connection is already closed. Prevents a deadlock
// between acquiring the service lock here and the
// websocket_conn_cleanup called from within the service
// loop.
if (!wc->wsi)
return;
if (done) {
// Sadly lws_callback_on_writable() doesn't do any internal // Sadly lws_callback_on_writable() doesn't do any internal
// locking, therefore we must protect it against a concurrently // locking, therefore we must protect it against a concurrently
// running lws_service(), as well as against other threads // running lws_service(), as well as against other threads
@ -172,40 +174,62 @@ void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
mutex_lock(&websocket_callback_lock); mutex_lock(&websocket_callback_lock);
lws_cancel_service(websocket_context); lws_cancel_service(websocket_context);
mutex_lock(&websocket_service_lock); if (mutex_trylock(&websocket_service_lock)) {
// Wrong lock order: service_lock should be the outer lock
// and wc->lock should be the inner lock. The connection may
// be in the process of being closed (websocket_conn_cleanup).
// Release wc->lock and try again.
mutex_unlock(&websocket_callback_lock);
mutex_unlock(&wc->lock);
sched_yield();
mutex_lock(&wc->lock);
continue;
}
lws_callback_on_writable(wc->wsi); lws_callback_on_writable(wc->wsi);
mutex_unlock(&websocket_service_lock); mutex_unlock(&websocket_service_lock);
mutex_unlock(&websocket_callback_lock); mutex_unlock(&websocket_callback_lock);
break;
} }
} }
// adds data to output buffer (can be null) and triggers specified response: http or binary websocket // adds data to output buffer (can be null) and triggers specified response: http or binary websocket - non-locking
void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, bool done) { void __websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len) {
websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP, done); __websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP);
} }
void websocket_write_http(struct websocket_conn *wc, const char *msg, bool done) { void __websocket_write_http(struct websocket_conn *wc, const char *msg) {
websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0, done); __websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0);
} }
void websocket_write_text(struct websocket_conn *wc, const char *msg, bool done) { void __websocket_write_text(struct websocket_conn *wc, const char *msg) {
websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT, done); __websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT);
} }
void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, bool done) { void __websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len) {
websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY, done); __websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY);
} }
// singe shot writes with locking
void websocket_write_next(struct websocket_conn *wc) { void websocket_write_text(struct websocket_conn *wc, const char *msg) {
LOCK(&wc->lock); {
t_queue_push_tail(&wc->output_q, websocket_output_new()); LOCK(&wc->lock);
__websocket_write_text(wc, msg);
__websocket_write_done(wc);
}
}
static void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len) {
{
LOCK(&wc->lock);
__websocket_write_binary(wc, msg, len);
__websocket_write_done(wc);
}
} }
static const char *websocket_echo_process(struct websocket_message *wm) { static const char *websocket_echo_process(struct websocket_message *wm) {
ilogs(http, LOG_DEBUG, "Returning %lu bytes websocket echo from %s", (unsigned long) wm->body->len, ilogs(http, LOG_DEBUG, "Returning %lu bytes websocket echo from %s", (unsigned long) wm->body->len,
endpoint_print_buf(&wm->wc->endpoint)); endpoint_print_buf(&wm->wc->endpoint));
websocket_write_binary(wm->wc, wm->body->str, wm->body->len, true); websocket_write_binary(wm->wc, wm->body->str, wm->body->len);
return NULL; return NULL;
} }
@ -306,6 +330,7 @@ static int websocket_dequeue(struct websocket_conn *wc) {
bool is_http = false; bool is_http = false;
mutex_lock(&wc->lock); mutex_lock(&wc->lock);
struct websocket_output *wo; struct websocket_output *wo;
struct lws *wsi = wc->wsi; struct lws *wsi = wc->wsi;
while ((wo = t_queue_pop_head(&wc->output_q))) { while ((wo = t_queue_pop_head(&wc->output_q))) {
@ -360,11 +385,9 @@ next:
return 0; return 0;
} }
void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type, void __websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length) ssize_t content_length)
{ {
LOCK(&wc->lock);
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q); struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
wo->http_status = status; wo->http_status = status;
@ -374,8 +397,12 @@ void websocket_http_response(struct websocket_conn *wc, int status, const char *
void websocket_http_complete(struct websocket_conn *wc, int status, const char *content_type, void websocket_http_complete(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length, const char *content) ssize_t content_length, const char *content)
{ {
websocket_http_response(wc, status, content_type, content_length); {
websocket_write_http(wc, content, true); LOCK(&wc->lock);
__websocket_http_response(wc, status, content_type, content_length);
__websocket_write_http(wc, content);
__websocket_write_done(wc);
}
} }
@ -426,12 +453,12 @@ static const char *websocket_http_metrics(struct websocket_message *wm) {
// adds printf string to output buffer without triggering response // adds printf string to output buffer without triggering response
static void websocket_queue_printf(struct cli_writer *cw, const char *fmt, ...) { static void __websocket_queue_printf(struct cli_writer *cw, const char *fmt, ...) {
va_list va; va_list va;
va_start(va, fmt); va_start(va, fmt);
char *s = g_strdup_vprintf(fmt, va); char *s = g_strdup_vprintf(fmt, va);
va_end(va); va_end(va);
websocket_queue_raw(cw->ptr, s, strlen(s)); __websocket_queue_raw(cw->ptr, s, strlen(s));
g_free(s); g_free(s);
} }
@ -445,14 +472,19 @@ static const char *websocket_http_cli(struct websocket_message *wm) {
str uri_cmd = STR_INIT(uri); str uri_cmd = STR_INIT(uri);
struct cli_writer cw = { struct cli_writer cw = {
.cw_printf = websocket_queue_printf, .cw_printf = __websocket_queue_printf,
.ptr = wm->wc, .ptr = wm->wc,
}; };
cli_handle(&uri_cmd, &cw);
size_t len = websocket_queue_len(wm->wc); {
LOCK(&wm->wc->lock);
cli_handle(&uri_cmd, &cw);
size_t len = __websocket_queue_len(wm->wc);
__websocket_http_response(wm->wc, 200, "text/plain", len);
__websocket_write_http(wm->wc, NULL);
__websocket_write_done(wm->wc);
}
websocket_http_complete(wm->wc, 200, "text/plain", len, NULL);
return NULL; return NULL;
} }
@ -463,12 +495,17 @@ static const char *websocket_cli_process(struct websocket_message *wm) {
str uri_cmd = STR_INIT_LEN(wm->body->str, wm->body->len); str uri_cmd = STR_INIT_LEN(wm->body->str, wm->body->len);
struct cli_writer cw = { struct cli_writer cw = {
.cw_printf = websocket_queue_printf, .cw_printf = __websocket_queue_printf,
.ptr = wm->wc, .ptr = wm->wc,
}; };
cli_handle(&uri_cmd, &cw);
websocket_write_binary(wm->wc, NULL, 0, true); {
LOCK(&wm->wc->lock);
cli_handle(&uri_cmd, &cw);
__websocket_write_binary(wm->wc, NULL, 0);
__websocket_write_done(wm->wc);
}
return NULL; return NULL;
} }
@ -477,25 +514,33 @@ static void websocket_ng_send_ws(str *cookie, str *body, const endpoint_t *sin,
void *p1) void *p1)
{ {
struct websocket_conn *wc = p1; struct websocket_conn *wc = p1;
if (cookie) { {
websocket_queue_raw(wc, cookie->s, cookie->len); LOCK(&wc->lock);
websocket_queue_raw(wc, " ", 1); if (cookie) {
__websocket_queue_raw(wc, cookie->s, cookie->len);
__websocket_queue_raw(wc, " ", 1);
}
__websocket_queue_raw(wc, body->s, body->len);
__websocket_write_binary(wc, NULL, 0);
__websocket_write_done(wc);
} }
websocket_queue_raw(wc, body->s, body->len);
websocket_write_binary(wc, NULL, 0, true);
} }
static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin, const sockaddr_t *from, static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin, const sockaddr_t *from,
void *p1) void *p1)
{ {
struct websocket_conn *wc = p1; struct websocket_conn *wc = p1;
websocket_http_response(wc, 200, "application/x-rtpengine-ng", {
(cookie ? (cookie->len + 1) : 0) + body->len); LOCK(&wc->lock);
if (cookie) { __websocket_http_response(wc, 200, "application/x-rtpengine-ng",
websocket_queue_raw(wc, cookie->s, cookie->len); (cookie ? (cookie->len + 1) : 0) + body->len);
websocket_queue_raw(wc, " ", 1); if (cookie) {
__websocket_queue_raw(wc, cookie->s, cookie->len);
__websocket_queue_raw(wc, " ", 1);
}
__websocket_queue_raw(wc, body->s, body->len);
__websocket_write_http(wc, NULL);
__websocket_write_done(wc);
} }
websocket_queue_raw(wc, body->s, body->len);
websocket_write_http(wc, NULL, true);
} }
static void __ng_buf_free(void *p) { static void __ng_buf_free(void *p) {
@ -694,6 +739,8 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) {
// wait until all remaining tasks are finished // wait until all remaining tasks are finished
mutex_lock(&wc->lock); mutex_lock(&wc->lock);
wc->wsi = NULL;
while (wc->jobs) while (wc->jobs)
cond_wait(&wc->cond, &wc->lock); cond_wait(&wc->cond, &wc->lock);

@ -38,18 +38,15 @@ int websocket_init(void);
void websocket_start(void); void websocket_start(void);
void websocket_stop(void); void websocket_stop(void);
// appends to output buffer without triggering a response
void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len);
// adds data to output buffer (can be null) and optionally triggers specified response
void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
enum lws_write_protocol protocol, bool done);
// adds data to output buffer (can be null) and triggers specified response: http or binary websocket // adds data to output buffer (can be null) and triggers specified response: http or binary websocket
void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, bool done); //void websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len);
void websocket_write_http(struct websocket_conn *wc, const char *msg, bool done); //void websocket_write_http(struct websocket_conn *wc, const char *msg);
void websocket_write_text(struct websocket_conn *wc, const char *msg, bool done); void websocket_write_text(struct websocket_conn *wc, const char *msg);
void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, bool done); //void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len);
// num bytes in output buffer
size_t websocket_queue_len(struct websocket_conn *wc); // single shot HTTP response
void websocket_http_complete(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length, const char *content);
// write HTTP response headers // write HTTP response headers
void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type, void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,

Loading…
Cancel
Save