TT#88951 support queuing multiple WS output messages

Change-Id: I9bcb1f5dd2942c4631820c25f343331c9630b8d7
pull/1099/head
Richard Fuchs 5 years ago
parent cd3652e81a
commit 4910f7b33f

@ -13,6 +13,12 @@ struct websocket_message;
struct websocket_conn;
struct websocket_output {
GString *str;
size_t str_done;
enum lws_write_protocol protocol;
};
struct websocket_conn {
// used in the single threaded libwebsockets context
struct lws *wsi;
@ -27,9 +33,7 @@ struct websocket_conn {
cond_t cond;
// output buffer - also protected by lock
GString *output;
size_t output_done;
enum lws_write_protocol output_protocol;
GQueue outout_q;
};
@ -56,16 +60,33 @@ static void websocket_message_free(struct websocket_message **wm) {
}
static struct websocket_output *websocket_output_new(void) {
struct websocket_output *wo = g_slice_alloc0(sizeof(*wo));
// str remains NULL -> unused output slot
return wo;
}
static void websocket_output_free(void *p) {
struct websocket_output *wo = p;
if (wo->str)
g_string_free(wo->str, TRUE);
g_slice_free1(sizeof(*wo), wo);
}
// appends to output buffer without triggering a response - unlocked
static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) {
if (!wc->output->len) {
struct websocket_output *wo = g_queue_peek_tail(&wc->outout_q);
if (!wo->str) {
wo->str = g_string_new("");
// allocate pre-buffer
g_string_set_size(wc->output, LWS_PRE);
wc->output_done = LWS_PRE;
g_string_set_size(wo->str, LWS_PRE);
wo->str_done = LWS_PRE;
}
if (msg && len)
g_string_append_len(wc->output, msg, len);
g_string_append_len(wo->str, msg, len);
}
@ -80,28 +101,31 @@ void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len)
// num bytes in output buffer
size_t websocket_queue_len(struct websocket_conn *wc) {
mutex_lock(&wc->lock);
size_t ret = wc->output->len;
size_t ret = 0;
for (GList *l = wc->outout_q.head; l; l = l->next) {
struct websocket_output *wo = l->data;
ret += (wo->str->len - LWS_PRE);
}
mutex_unlock(&wc->lock);
if (ret)
return ret - LWS_PRE;
return ret;
}
// adds data to output buffer (can be null) and triggers specified response
// adds data to output buffer (can be null) and optionally triggers specified response
int websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
enum lws_write_protocol protocol)
enum lws_write_protocol protocol, int done)
{
mutex_lock(&wc->lock);
__websocket_queue_raw(wc, msg, len);
struct websocket_output *wo = g_queue_peek_tail(&wc->outout_q);
wo->protocol = protocol;
g_queue_push_tail(&wc->outout_q, websocket_output_new());
// make sure post-buffer is allocated
g_string_set_size(wc->output, wc->output->len + LWS_SEND_BUFFER_POST_PADDING);
g_string_set_size(wc->output, wc->output->len - LWS_SEND_BUFFER_POST_PADDING);
wc->output_protocol = protocol;
if (done) {
lws_callback_on_writable(wc->wsi);
lws_cancel_service(websocket_context);
}
lws_callback_on_writable(wc->wsi);
lws_cancel_service(websocket_context);
mutex_unlock(&wc->lock);
return 0;
@ -109,24 +133,31 @@ int websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
// adds data to output buffer (can be null) and triggers specified response: http or binary websocket
int websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len) {
return websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP);
int websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, int done) {
return websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP, done);
}
int websocket_write_http(struct websocket_conn *wc, const char *msg, int done) {
return websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0, done);
}
int websocket_write_http(struct websocket_conn *wc, const char *msg) {
return websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0);
int websocket_write_text(struct websocket_conn *wc, const char *msg, int done) {
return websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT, done);
}
int websocket_write_text(struct websocket_conn *wc, const char *msg) {
return websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT);
int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, int done) {
return websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY, done);
}
int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len) {
return websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY);
void websocket_write_next(struct websocket_conn *wc) {
mutex_lock(&wc->lock);
g_queue_push_tail(&wc->outout_q, websocket_output_new());
mutex_unlock(&wc->lock);
}
static const char *websocket_echo_process(struct websocket_message *wm) {
ilog(LOG_DEBUG, "Returning %lu bytes websocket echo from %s", wm->body->len,
endpoint_print_buf(&wm->wc->endpoint));
websocket_write_binary(wm->wc, wm->body->str, wm->body->len);
websocket_write_binary(wm->wc, wm->body->str, wm->body->len, 1);
return NULL;
}
@ -177,18 +208,31 @@ static int websocket_dequeue(struct websocket_conn *wc) {
return 0;
mutex_lock(&wc->lock);
size_t to_send = wc->output->len - wc->output_done;
if (to_send) {
ilog(LOG_DEBUG, "Writing %lu bytes to LWS", (unsigned long) to_send);
size_t ret = lws_write(wc->wsi, (unsigned char *) wc->output->str + wc->output_done,
to_send, wc->output_protocol);
if (ret != to_send)
ilog(LOG_ERR, "Invalid LWS write: %lu != %lu",
(unsigned long) ret,
(unsigned long) to_send);
wc->output_done += ret;
struct websocket_output *wo;
while ((wo = g_queue_pop_head(&wc->outout_q))) {
// used buffer slot?
if (wo->str) {
// allocate post-buffer
g_string_set_size(wo->str, wo->str->len + LWS_SEND_BUFFER_POST_PADDING);
size_t to_send = wo->str->len - wo->str_done - LWS_SEND_BUFFER_POST_PADDING;
if (to_send) {
if (to_send > 500)
ilog(LOG_DEBUG, "Writing %lu bytes to LWS", (unsigned long) to_send);
else
ilog(LOG_DEBUG, "Writing back to LWS: '%.*s'",
(int) to_send, wo->str->str + wo->str_done);
size_t ret = lws_write(wc->wsi, (unsigned char *) wo->str->str + wo->str_done,
to_send, wo->protocol);
if (ret != to_send)
ilog(LOG_ERR, "Invalid LWS write: %lu != %lu",
(unsigned long) ret,
(unsigned long) to_send);
wo->str_done += ret;
}
}
websocket_output_free(wo);
}
g_queue_push_tail(&wc->outout_q, websocket_output_new());
mutex_unlock(&wc->lock);
return 0;
@ -235,7 +279,7 @@ static const char *websocket_http_ping(struct websocket_message *wm) {
if (websocket_http_response(wm->wc, 200, "text/plain", 5))
return "Failed to write response HTTP headers";
if (websocket_write_http(wm->wc, "pong\n"))
if (websocket_write_http(wm->wc, "pong\n", 1))
return "Failed to write pong response";
return NULL;
@ -272,7 +316,7 @@ static const char *websocket_http_cli(struct websocket_message *wm) {
if (websocket_http_response(wm->wc, 200, "text/plain", len))
return "Failed to write response HTTP headers";
if (websocket_write_http(wm->wc, NULL))
if (websocket_write_http(wm->wc, NULL, 1))
return "Failed to write pong response";
return NULL;
@ -291,7 +335,7 @@ static const char *websocket_cli_process(struct websocket_message *wm) {
};
cli_handle(&uri_cmd, &cw);
websocket_write_binary(wm->wc, NULL, 0);
websocket_write_binary(wm->wc, NULL, 0, 1);
return NULL;
}
@ -301,7 +345,7 @@ static void websocket_ng_send_ws(str *cookie, str *body, const endpoint_t *sin,
websocket_queue_raw(wc, cookie->s, cookie->len);
websocket_queue_raw(wc, " ", 1);
websocket_queue_raw(wc, body->s, body->len);
websocket_write_binary(wc, NULL, 0);
websocket_write_binary(wc, NULL, 0, 1);
}
static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin, void *p1) {
struct websocket_conn *wc = p1;
@ -310,7 +354,7 @@ static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin
websocket_queue_raw(wc, cookie->s, cookie->len);
websocket_queue_raw(wc, " ", 1);
websocket_queue_raw(wc, body->s, body->len);
websocket_write_http(wc, NULL);
websocket_write_http(wc, NULL, 1);
}
static const char *websocket_ng_process(struct websocket_message *wm) {
char addr[64];
@ -337,7 +381,7 @@ static const char *websocket_http_ng(struct websocket_message *wm) {
if (control_ng_process(&cmd, &wm->wc->endpoint, addr, websocket_ng_send_http, wm->wc)) {
websocket_http_response(wm->wc, 500, "text/plain", 6);
websocket_write_http(wm->wc, "error\n");
websocket_write_http(wm->wc, "error\n", 1);
}
return NULL;
@ -362,7 +406,7 @@ static int websocket_http_get(struct websocket_conn *wc) {
if (!handler) {
ilog(LOG_WARN, "Unhandled HTTP GET URI: '%s'", uri);
websocket_http_response(wm->wc, 404, "text/plain", 10);
websocket_write_http(wm->wc, "not found\n");
websocket_write_http(wm->wc, "not found\n", 1);
return 0;
}
@ -430,7 +474,7 @@ static int websocket_http_body(struct websocket_conn *wc, const char *body, size
if (!handler) {
ilog(LOG_WARN, "Unhandled HTTP POST URI: '%s'", wm->uri);
websocket_http_response(wm->wc, 404, "text/plain", 10);
websocket_write_http(wm->wc, "not found\n");
websocket_write_http(wm->wc, "not found\n", 1);
return 0;
}
@ -458,7 +502,7 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) {
free(wc->wm->uri);
g_slice_free1(sizeof(*wc->wm), wc->wm);
wc->wm = NULL;
g_string_free(wc->output, TRUE);
g_queue_clear_full(&wc->outout_q, websocket_output_free);
if (wc->uri)
free(wc->uri);
@ -479,7 +523,7 @@ static void websocket_conn_init(struct lws *wsi, void *p) {
mutex_init(&wc->lock);
cond_init(&wc->cond);
g_queue_init(&wc->messages);
wc->output = g_string_new("");
g_queue_push_tail(&wc->outout_q, websocket_output_new());
struct sockaddr_storage sa = {0,};
socklen_t sl = sizeof(sa);

@ -36,14 +36,14 @@ void websocket_start(void);
// appends to output buffer without triggering a response
void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len);
// adds data to output buffer (can be null) and triggers specified response
// adds data to output buffer (can be null) and optionally triggers specified response
int websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
enum lws_write_protocol protocol);
enum lws_write_protocol protocol, int done);
// adds data to output buffer (can be null) and triggers specified response: http or binary websocket
int websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len);
int websocket_write_http(struct websocket_conn *wc, const char *msg);
int websocket_write_text(struct websocket_conn *wc, const char *msg);
int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len);
int websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, int done);
int websocket_write_http(struct websocket_conn *wc, const char *msg, int done);
int websocket_write_text(struct websocket_conn *wc, const char *msg, int done);
int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, int done);
// num bytes in output buffer
size_t websocket_queue_len(struct websocket_conn *wc);

Loading…
Cancel
Save