|
|
|
|
@ -34,7 +34,7 @@ struct websocket_conn {
|
|
|
|
|
cond_t cond;
|
|
|
|
|
|
|
|
|
|
// output buffer - also protected by lock
|
|
|
|
|
GQueue outout_q;
|
|
|
|
|
GQueue output_q;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -77,7 +77,7 @@ static void websocket_output_free(void *p) {
|
|
|
|
|
|
|
|
|
|
// appends to output buffer without triggering a response - unlocked
|
|
|
|
|
static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) {
|
|
|
|
|
struct websocket_output *wo = g_queue_peek_tail(&wc->outout_q);
|
|
|
|
|
struct websocket_output *wo = g_queue_peek_tail(&wc->output_q);
|
|
|
|
|
|
|
|
|
|
if (!wo->str) {
|
|
|
|
|
wo->str = g_string_new("");
|
|
|
|
|
@ -103,7 +103,7 @@ void websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len)
|
|
|
|
|
size_t websocket_queue_len(struct websocket_conn *wc) {
|
|
|
|
|
mutex_lock(&wc->lock);
|
|
|
|
|
size_t ret = 0;
|
|
|
|
|
for (GList *l = wc->outout_q.head; l; l = l->next) {
|
|
|
|
|
for (GList *l = wc->output_q.head; l; l = l->next) {
|
|
|
|
|
struct websocket_output *wo = l->data;
|
|
|
|
|
ret += (wo->str->len - LWS_PRE);
|
|
|
|
|
}
|
|
|
|
|
@ -118,9 +118,9 @@ int websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
|
|
|
|
|
{
|
|
|
|
|
mutex_lock(&wc->lock);
|
|
|
|
|
__websocket_queue_raw(wc, msg, len);
|
|
|
|
|
struct websocket_output *wo = g_queue_peek_tail(&wc->outout_q);
|
|
|
|
|
struct websocket_output *wo = g_queue_peek_tail(&wc->output_q);
|
|
|
|
|
wo->protocol = protocol;
|
|
|
|
|
g_queue_push_tail(&wc->outout_q, websocket_output_new());
|
|
|
|
|
g_queue_push_tail(&wc->output_q, websocket_output_new());
|
|
|
|
|
|
|
|
|
|
if (done) {
|
|
|
|
|
lws_callback_on_writable(wc->wsi);
|
|
|
|
|
@ -150,7 +150,7 @@ int websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t le
|
|
|
|
|
|
|
|
|
|
void websocket_write_next(struct websocket_conn *wc) {
|
|
|
|
|
mutex_lock(&wc->lock);
|
|
|
|
|
g_queue_push_tail(&wc->outout_q, websocket_output_new());
|
|
|
|
|
g_queue_push_tail(&wc->output_q, websocket_output_new());
|
|
|
|
|
mutex_unlock(&wc->lock);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -210,9 +210,11 @@ static int websocket_dequeue(struct websocket_conn *wc) {
|
|
|
|
|
if (!wc)
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
int is_http = 0;
|
|
|
|
|
|
|
|
|
|
mutex_lock(&wc->lock);
|
|
|
|
|
struct websocket_output *wo;
|
|
|
|
|
while ((wo = g_queue_pop_head(&wc->outout_q))) {
|
|
|
|
|
while ((wo = g_queue_pop_head(&wc->output_q))) {
|
|
|
|
|
// used buffer slot?
|
|
|
|
|
if (wo->str) {
|
|
|
|
|
// allocate post-buffer
|
|
|
|
|
@ -231,14 +233,22 @@ static int websocket_dequeue(struct websocket_conn *wc) {
|
|
|
|
|
(unsigned long) ret,
|
|
|
|
|
(unsigned long) to_send);
|
|
|
|
|
wo->str_done += ret;
|
|
|
|
|
|
|
|
|
|
if (wo->protocol == LWS_WRITE_HTTP)
|
|
|
|
|
is_http = 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
websocket_output_free(wo);
|
|
|
|
|
}
|
|
|
|
|
g_queue_push_tail(&wc->outout_q, websocket_output_new());
|
|
|
|
|
g_queue_push_tail(&wc->output_q, websocket_output_new());
|
|
|
|
|
|
|
|
|
|
int ret = 0;
|
|
|
|
|
if (is_http)
|
|
|
|
|
ret = lws_http_transaction_completed(wc->wsi);
|
|
|
|
|
|
|
|
|
|
mutex_unlock(&wc->lock);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static const char *websocket_do_http_response(struct websocket_conn *wc, int status, const char *content_type,
|
|
|
|
|
@ -545,7 +555,7 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) {
|
|
|
|
|
free(wc->wm->uri);
|
|
|
|
|
g_slice_free1(sizeof(*wc->wm), wc->wm);
|
|
|
|
|
wc->wm = NULL;
|
|
|
|
|
g_queue_clear_full(&wc->outout_q, websocket_output_free);
|
|
|
|
|
g_queue_clear_full(&wc->output_q, websocket_output_free);
|
|
|
|
|
if (wc->uri)
|
|
|
|
|
free(wc->uri);
|
|
|
|
|
|
|
|
|
|
@ -566,7 +576,7 @@ static void websocket_conn_init(struct lws *wsi, void *p) {
|
|
|
|
|
mutex_init(&wc->lock);
|
|
|
|
|
cond_init(&wc->cond);
|
|
|
|
|
g_queue_init(&wc->messages);
|
|
|
|
|
g_queue_push_tail(&wc->outout_q, websocket_output_new());
|
|
|
|
|
g_queue_push_tail(&wc->output_q, websocket_output_new());
|
|
|
|
|
|
|
|
|
|
struct sockaddr_storage sa = {0,};
|
|
|
|
|
socklen_t sl = sizeof(sa);
|
|
|
|
|
|