MT#55283 single-threaded lws_write()

Avoid calling lws_write() from threads other than the service thread, as
this might not be thread-safe. Instead store the values used for the
HTTP response headers in the websocket_output, then trigger a "writable"
callback, and finally do all the lws_write() calls from the service
thread.

Reported in #1624

Change-Id: Ifcb050193044e5543f750a12fb44f5e16d4c0a08
pull/1640/head
Richard Fuchs 3 years ago
parent f293ca9a18
commit b207d0c586

@ -109,10 +109,11 @@ static const char *janus_send_json_msg(struct websocket_message *wm, JsonBuilder
else {
if (!code)
ret = "Tried to send asynchronous event to HTTP";
else if (websocket_http_response(wm->wc, code, "application/json", strlen(result)))
ret = "Failed to write Janus response HTTP headers";
else if (websocket_write_http(wm->wc, result, done))
ret = "Failed to write Janus JSON response";
else {
websocket_http_response(wm->wc, code, "application/json", strlen(result));
if (websocket_write_http(wm->wc, result, done))
ret = "Failed to write Janus JSON response";
}
}
g_free(result);

@ -19,6 +19,9 @@ struct websocket_output {
GString *str;
size_t str_done;
enum lws_write_protocol protocol;
int http_status;
const char *content_type;
ssize_t content_length;
};
struct websocket_conn {
@ -216,6 +219,49 @@ static void websocket_process(void *p, void *up) {
}
static const char *__websocket_write_http_response(struct websocket_conn *wc, int status,
const char *content_type, ssize_t content_length)
{
uint8_t buf[LWS_PRE + 2048], *start = &buf[LWS_PRE], *p = start,
*end = &buf[sizeof(buf) - LWS_PRE - 1];
if (lws_add_http_header_status(wc->wsi, status, &p, end))
return "Failed to add HTTP status";
if (content_type)
if (lws_add_http_header_by_token(wc->wsi, WSI_TOKEN_HTTP_CONTENT_TYPE,
(const unsigned char *) content_type,
strlen(content_type), &p, end))
return "Failed to add content-type";
if (content_length >= 0)
if (lws_add_http_header_content_length(wc->wsi, content_length, &p, end))
return "Failed to add HTTP headers to response";
if (lws_add_http_header_by_name(wc->wsi,
(unsigned char *) "Access-Control-Allow-Origin:",
(unsigned char *) "*", 1, &p, end))
return "Failed to add CORS header";
if (lws_add_http_header_by_name(wc->wsi,
(unsigned char *) "Access-Control-Max-Age:",
(unsigned char *) "86400", 5, &p, end))
return "Failed to add CORS header";
if (lws_add_http_header_by_name(wc->wsi,
(unsigned char *) "Access-Control-Allow-Methods:",
(unsigned char *) "GET, POST", 9, &p, end))
return "Failed to add CORS header";
if (lws_add_http_header_by_name(wc->wsi,
(unsigned char *) "Access-Control-Allow-Headers:",
(unsigned char *) "Content-Type", 12, &p, end))
return "Failed to add CORS header";
if (lws_finalize_http_header(wc->wsi, &p, end))
return "Failed to write HTTP headers";
size_t len = p - start;
if (lws_write(wc->wsi, start, len, LWS_WRITE_HTTP_HEADERS) != len)
return "Failed to write HTTP headers";
return NULL;
}
static int websocket_dequeue(struct websocket_conn *wc) {
if (!wc)
return 0;
@ -230,6 +276,15 @@ static int websocket_dequeue(struct websocket_conn *wc) {
if (!wo->str)
goto next;
if (wo->http_status) {
const char *err = __websocket_write_http_response(wc, wo->http_status,
wo->content_type, wo->content_length);
if (err) {
ilogs(http, LOG_ERR, "Failed to write HTTP response headers: %s", err);
goto next;
}
}
// allocate post-buffer
g_string_set_size(wo->str, wo->str->len + LWS_SEND_BUFFER_POST_PADDING);
size_t to_send = wo->str->len - wo->str_done - LWS_SEND_BUFFER_POST_PADDING;
@ -266,63 +321,21 @@ next:
return ret;
}
static const char *websocket_do_http_response(struct websocket_conn *wc, int status, const char *content_type,
void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length)
{
uint8_t buf[LWS_PRE + 2048], *start = &buf[LWS_PRE], *p = start,
*end = &buf[sizeof(buf) - LWS_PRE - 1];
if (lws_add_http_header_status(wc->wsi, status, &p, end))
return "Failed to add HTTP status";
if (content_type)
if (lws_add_http_header_by_token(wc->wsi, WSI_TOKEN_HTTP_CONTENT_TYPE,
(const unsigned char *) content_type,
strlen(content_type), &p, end))
return "Failed to add content-type";
if (content_length >= 0)
if (lws_add_http_header_content_length(wc->wsi, content_length, &p, end))
return "Failed to add HTTP headers to response";
if (lws_add_http_header_by_name(wc->wsi,
(unsigned char *) "Access-Control-Allow-Origin:",
(unsigned char *) "*", 1, &p, end))
return "Failed to add CORS header";
if (lws_add_http_header_by_name(wc->wsi,
(unsigned char *) "Access-Control-Max-Age:",
(unsigned char *) "86400", 5, &p, end))
return "Failed to add CORS header";
if (lws_add_http_header_by_name(wc->wsi,
(unsigned char *) "Access-Control-Allow-Methods:",
(unsigned char *) "GET, POST", 9, &p, end))
return "Failed to add CORS header";
if (lws_add_http_header_by_name(wc->wsi,
(unsigned char *) "Access-Control-Allow-Headers:",
(unsigned char *) "Content-Type", 12, &p, end))
return "Failed to add CORS header";
if (lws_finalize_http_header(wc->wsi, &p, end))
return "Failed to write HTTP headers";
LOCK(&wc->lock);
size_t len = p - start;
if (lws_write(wc->wsi, start, len, LWS_WRITE_HTTP_HEADERS) != len)
return "Failed to write HTTP headers";
struct websocket_output *wo = g_queue_peek_tail(&wc->output_q);
return NULL;
}
int websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length)
{
const char *err = websocket_do_http_response(wc, status, content_type, content_length);
if (!err)
return 0;
ilogs(http, LOG_ERR, "Failed to write HTTP response headers: %s", err);
return -1;
wo->http_status = status;
wo->content_type = content_type;
wo->content_length = content_length;
}
const char *websocket_http_complete(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length, const char *content)
{
if (websocket_http_response(wc, status, content_type, content_length))
return "Failed to write response HTTP headers";
websocket_http_response(wc, status, content_type, content_length);
if (websocket_write_http(wc, content, true))
return "Failed to write pong response";
return NULL;
@ -430,8 +443,7 @@ static void websocket_ng_send_ws(str *cookie, str *body, const endpoint_t *sin,
}
static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin, void *p1) {
struct websocket_conn *wc = p1;
if (websocket_http_response(wc, 200, "application/x-rtpengine-ng", cookie->len + 1 + body->len))
ilogs(http, LOG_WARN, "Failed to write HTTP headers");
websocket_http_response(wc, 200, "application/x-rtpengine-ng", cookie->len + 1 + body->len);
websocket_queue_raw(wc, cookie->s, cookie->len);
websocket_queue_raw(wc, " ", 1);
websocket_queue_raw(wc, body->s, body->len);

@ -52,7 +52,7 @@ int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t le
size_t websocket_queue_len(struct websocket_conn *wc);
// write HTTP response headers
int websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,
void websocket_http_response(struct websocket_conn *wc, int status, const char *content_type,
ssize_t content_length);
// mark a janus session as owned by this transport

Loading…
Cancel
Save