diff --git a/daemon/websocket.c b/daemon/websocket.c index e27d3aecb..bdc42a759 100644 --- a/daemon/websocket.c +++ b/daemon/websocket.c @@ -13,6 +13,12 @@ struct websocket_message; struct websocket_conn; +struct websocket_output { + GString *str; + size_t str_done; + enum lws_write_protocol protocol; +}; + struct websocket_conn { // used in the single threaded libwebsockets context struct lws *wsi; @@ -27,9 +33,7 @@ struct websocket_conn { cond_t cond; // output buffer - also protected by lock - GString *output; - size_t output_done; - enum lws_write_protocol output_protocol; + GQueue outout_q; }; @@ -56,16 +60,33 @@ static void websocket_message_free(struct websocket_message **wm) { } +static struct websocket_output *websocket_output_new(void) { + struct websocket_output *wo = g_slice_alloc0(sizeof(*wo)); + // str remains NULL -> unused output slot + return wo; +} + +static void websocket_output_free(void *p) { + struct websocket_output *wo = p; + if (wo->str) + g_string_free(wo->str, TRUE); + g_slice_free1(sizeof(*wo), wo); +} + + // appends to output buffer without triggering a response - unlocked static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) { - if (!wc->output->len) { + struct websocket_output *wo = g_queue_peek_tail(&wc->outout_q); + + if (!wo->str) { + wo->str = g_string_new(""); // allocate pre-buffer - g_string_set_size(wc->output, LWS_PRE); - wc->output_done = LWS_PRE; + g_string_set_size(wo->str, LWS_PRE); + wo->str_done = LWS_PRE; } if (msg && len) - g_string_append_len(wc->output, msg, len); + g_string_append_len(wo->str, msg, len); } @@ -80,28 +101,31 @@ void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) // num bytes in output buffer size_t websocket_queue_len(struct websocket_conn *wc) { mutex_lock(&wc->lock); - size_t ret = wc->output->len; + size_t ret = 0; + for (GList *l = wc->outout_q.head; l; l = l->next) { + struct websocket_output *wo = l->data; + ret += (wo->str->len - LWS_PRE); + } mutex_unlock(&wc->lock); - if (ret) - return ret - LWS_PRE; return ret; } -// adds data to output buffer (can be null) and triggers specified response +// adds data to output buffer (can be null) and optionally triggers specified response int websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, - enum lws_write_protocol protocol) + enum lws_write_protocol protocol, int done) { mutex_lock(&wc->lock); __websocket_queue_raw(wc, msg, len); + struct websocket_output *wo = g_queue_peek_tail(&wc->outout_q); + wo->protocol = protocol; + g_queue_push_tail(&wc->outout_q, websocket_output_new()); - // make sure post-buffer is allocated - g_string_set_size(wc->output, wc->output->len + LWS_SEND_BUFFER_POST_PADDING); - g_string_set_size(wc->output, wc->output->len - LWS_SEND_BUFFER_POST_PADDING); - wc->output_protocol = protocol; + if (done) { + lws_callback_on_writable(wc->wsi); + lws_cancel_service(websocket_context); + } - lws_callback_on_writable(wc->wsi); - lws_cancel_service(websocket_context); mutex_unlock(&wc->lock); return 0; @@ -109,24 +133,31 @@ int websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, // adds data to output buffer (can be null) and triggers specified response: http or binary websocket -int websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len) { - return websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP); +int websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, int done) { + return websocket_write_raw(wc, msg, len, LWS_WRITE_HTTP, done); +} +int websocket_write_http(struct websocket_conn *wc, const char *msg, int done) { + return websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0, done); } -int websocket_write_http(struct websocket_conn *wc, const char *msg) { - return websocket_write_http_len(wc, msg, msg ? strlen(msg) : 0); +int websocket_write_text(struct websocket_conn *wc, const char *msg, int done) { + return websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT, done); } -int websocket_write_text(struct websocket_conn *wc, const char *msg) { - return websocket_write_raw(wc, msg, strlen(msg), LWS_WRITE_TEXT); +int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, int done) { + return websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY, done); } -int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len) { - return websocket_write_raw(wc, msg, len, LWS_WRITE_BINARY); + + +void websocket_write_next(struct websocket_conn *wc) { + mutex_lock(&wc->lock); + g_queue_push_tail(&wc->outout_q, websocket_output_new()); + mutex_unlock(&wc->lock); } static const char *websocket_echo_process(struct websocket_message *wm) { ilog(LOG_DEBUG, "Returning %lu bytes websocket echo from %s", wm->body->len, endpoint_print_buf(&wm->wc->endpoint)); - websocket_write_binary(wm->wc, wm->body->str, wm->body->len); + websocket_write_binary(wm->wc, wm->body->str, wm->body->len, 1); return NULL; } @@ -177,18 +208,31 @@ static int websocket_dequeue(struct websocket_conn *wc) { return 0; mutex_lock(&wc->lock); - size_t to_send = wc->output->len - wc->output_done; - if (to_send) { - ilog(LOG_DEBUG, "Writing %lu bytes to LWS", (unsigned long) to_send); - size_t ret = lws_write(wc->wsi, (unsigned char *) wc->output->str + wc->output_done, - to_send, wc->output_protocol); - if (ret != to_send) - ilog(LOG_ERR, "Invalid LWS write: %lu != %lu", - (unsigned long) ret, - (unsigned long) to_send); - wc->output_done += ret; - + struct websocket_output *wo; + while ((wo = g_queue_pop_head(&wc->outout_q))) { + // used buffer slot? + if (wo->str) { + // allocate post-buffer + g_string_set_size(wo->str, wo->str->len + LWS_SEND_BUFFER_POST_PADDING); + size_t to_send = wo->str->len - wo->str_done - LWS_SEND_BUFFER_POST_PADDING; + if (to_send) { + if (to_send > 500) + ilog(LOG_DEBUG, "Writing %lu bytes to LWS", (unsigned long) to_send); + else + ilog(LOG_DEBUG, "Writing back to LWS: '%.*s'", + (int) to_send, wo->str->str + wo->str_done); + size_t ret = lws_write(wc->wsi, (unsigned char *) wo->str->str + wo->str_done, + to_send, wo->protocol); + if (ret != to_send) + ilog(LOG_ERR, "Invalid LWS write: %lu != %lu", + (unsigned long) ret, + (unsigned long) to_send); + wo->str_done += ret; + } + } + websocket_output_free(wo); } + g_queue_push_tail(&wc->outout_q, websocket_output_new()); mutex_unlock(&wc->lock); return 0; @@ -235,7 +279,7 @@ static const char *websocket_http_ping(struct websocket_message *wm) { if (websocket_http_response(wm->wc, 200, "text/plain", 5)) return "Failed to write response HTTP headers"; - if (websocket_write_http(wm->wc, "pong\n")) + if (websocket_write_http(wm->wc, "pong\n", 1)) return "Failed to write pong response"; return NULL; @@ -272,7 +316,7 @@ static const char *websocket_http_cli(struct websocket_message *wm) { if (websocket_http_response(wm->wc, 200, "text/plain", len)) return "Failed to write response HTTP headers"; - if (websocket_write_http(wm->wc, NULL)) + if (websocket_write_http(wm->wc, NULL, 1)) return "Failed to write pong response"; return NULL; @@ -291,7 +335,7 @@ static const char *websocket_cli_process(struct websocket_message *wm) { }; cli_handle(&uri_cmd, &cw); - websocket_write_binary(wm->wc, NULL, 0); + websocket_write_binary(wm->wc, NULL, 0, 1); return NULL; } @@ -301,7 +345,7 @@ static void websocket_ng_send_ws(str *cookie, str *body, const endpoint_t *sin, websocket_queue_raw(wc, cookie->s, cookie->len); websocket_queue_raw(wc, " ", 1); websocket_queue_raw(wc, body->s, body->len); - websocket_write_binary(wc, NULL, 0); + websocket_write_binary(wc, NULL, 0, 1); } static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin, void *p1) { struct websocket_conn *wc = p1; @@ -310,7 +354,7 @@ static void websocket_ng_send_http(str *cookie, str *body, const endpoint_t *sin websocket_queue_raw(wc, cookie->s, cookie->len); websocket_queue_raw(wc, " ", 1); websocket_queue_raw(wc, body->s, body->len); - websocket_write_http(wc, NULL); + websocket_write_http(wc, NULL, 1); } static const char *websocket_ng_process(struct websocket_message *wm) { char addr[64]; @@ -337,7 +381,7 @@ static const char *websocket_http_ng(struct websocket_message *wm) { if (control_ng_process(&cmd, &wm->wc->endpoint, addr, websocket_ng_send_http, wm->wc)) { websocket_http_response(wm->wc, 500, "text/plain", 6); - websocket_write_http(wm->wc, "error\n"); + websocket_write_http(wm->wc, "error\n", 1); } return NULL; @@ -362,7 +406,7 @@ static int websocket_http_get(struct websocket_conn *wc) { if (!handler) { ilog(LOG_WARN, "Unhandled HTTP GET URI: '%s'", uri); websocket_http_response(wm->wc, 404, "text/plain", 10); - websocket_write_http(wm->wc, "not found\n"); + websocket_write_http(wm->wc, "not found\n", 1); return 0; } @@ -430,7 +474,7 @@ static int websocket_http_body(struct websocket_conn *wc, const char *body, size if (!handler) { ilog(LOG_WARN, "Unhandled HTTP POST URI: '%s'", wm->uri); websocket_http_response(wm->wc, 404, "text/plain", 10); - websocket_write_http(wm->wc, "not found\n"); + websocket_write_http(wm->wc, "not found\n", 1); return 0; } @@ -458,7 +502,7 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) { free(wc->wm->uri); g_slice_free1(sizeof(*wc->wm), wc->wm); wc->wm = NULL; - g_string_free(wc->output, TRUE); + g_queue_clear_full(&wc->outout_q, websocket_output_free); if (wc->uri) free(wc->uri); @@ -479,7 +523,7 @@ static void websocket_conn_init(struct lws *wsi, void *p) { mutex_init(&wc->lock); cond_init(&wc->cond); g_queue_init(&wc->messages); - wc->output = g_string_new(""); + g_queue_push_tail(&wc->outout_q, websocket_output_new()); struct sockaddr_storage sa = {0,}; socklen_t sl = sizeof(sa); diff --git a/include/websocket.h b/include/websocket.h index 01e8a659b..03d5c66ea 100644 --- a/include/websocket.h +++ b/include/websocket.h @@ -36,14 +36,14 @@ void websocket_start(void); // appends to output buffer without triggering a response void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len); -// adds data to output buffer (can be null) and triggers specified response +// adds data to output buffer (can be null) and optionally triggers specified response int websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len, - enum lws_write_protocol protocol); + enum lws_write_protocol protocol, int done); // adds data to output buffer (can be null) and triggers specified response: http or binary websocket -int websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len); -int websocket_write_http(struct websocket_conn *wc, const char *msg); -int websocket_write_text(struct websocket_conn *wc, const char *msg); -int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len); +int websocket_write_http_len(struct websocket_conn *wc, const char *msg, size_t len, int done); +int websocket_write_http(struct websocket_conn *wc, const char *msg, int done); +int websocket_write_text(struct websocket_conn *wc, const char *msg, int done); +int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t len, int done); // num bytes in output buffer size_t websocket_queue_len(struct websocket_conn *wc);