MT#55283 use typed GQueue for websocket objects

Change-Id: I80fabf32d72eb1e87b41335416dad918b458409d
pull/1776/head
Richard Fuchs 2 years ago
parent 30f75fa7c2
commit 218f4b586b

@ -26,6 +26,8 @@ struct websocket_output {
TYPED_GHASHTABLE(janus_sessions_ht, struct janus_session, struct janus_session,
g_direct_hash, g_direct_equal, NULL, NULL)
TYPED_GQUEUE(websocket_message, struct websocket_message)
TYPED_GQUEUE(websocket_output, struct websocket_output)
struct websocket_conn {
// used in the single threaded libwebsockets context
@ -37,12 +39,12 @@ struct websocket_conn {
// multithreaded message processing
mutex_t lock;
unsigned int jobs;
GQueue messages;
websocket_message_q messages;
cond_t cond;
janus_sessions_ht janus_sessions;
// output buffer - also protected by lock
GQueue output_q;
websocket_output_q output_q;
};
struct websocket_ng_buf {
@ -85,8 +87,7 @@ static struct websocket_output *websocket_output_new(void) {
return wo;
}
static void websocket_output_free(void *p) {
struct websocket_output *wo = p;
static void websocket_output_free(struct websocket_output *wo) {
if (wo->str)
g_string_free(wo->str, TRUE);
g_slice_free1(sizeof(*wo), wo);
@ -95,7 +96,7 @@ static void websocket_output_free(void *p) {
// appends to output buffer without triggering a response - unlocked
static void __websocket_queue_raw(struct websocket_conn *wc, const char *msg, size_t len) {
struct websocket_output *wo = g_queue_peek_tail(&wc->output_q);
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
if (!wo->str) {
wo->str = g_string_new("");
@ -121,7 +122,7 @@ size_t websocket_queue_len(struct websocket_conn *wc) {
LOCK(&wc->lock);
size_t ret = 0;
for (GList *l = wc->output_q.head; l; l = l->next) {
for (__auto_type l = wc->output_q.head; l; l = l->next) {
struct websocket_output *wo = l->data;
ret += (wo->str->len - LWS_PRE);
}
@ -136,9 +137,9 @@ void websocket_write_raw(struct websocket_conn *wc, const char *msg, size_t len,
{
mutex_lock(&wc->lock);
__websocket_queue_raw(wc, msg, len);
struct websocket_output *wo = g_queue_peek_tail(&wc->output_q);
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
wo->protocol = protocol;
g_queue_push_tail(&wc->output_q, websocket_output_new());
t_queue_push_tail(&wc->output_q, websocket_output_new());
mutex_unlock(&wc->lock);
@ -196,7 +197,7 @@ void websocket_write_binary(struct websocket_conn *wc, const char *msg, size_t l
void websocket_write_next(struct websocket_conn *wc) {
LOCK(&wc->lock);
g_queue_push_tail(&wc->output_q, websocket_output_new());
t_queue_push_tail(&wc->output_q, websocket_output_new());
}
@ -217,7 +218,7 @@ static void websocket_message_push(struct websocket_conn *wc, websocket_message_
assert(wm != NULL);
wm->func = func;
g_queue_push_tail(&wc->messages, wm);
t_queue_push_tail(&wc->messages, wm);
wc->jobs++;
g_thread_pool_push(websocket_threads, wc, NULL);
@ -229,7 +230,7 @@ static void websocket_process(void *p, void *up) {
struct websocket_conn *wc = p;
mutex_lock(&wc->lock);
struct websocket_message *wm = g_queue_pop_head(&wc->messages);
struct websocket_message *wm = t_queue_pop_head(&wc->messages);
mutex_unlock(&wc->lock);
assert(wm != NULL);
@ -304,7 +305,7 @@ static int websocket_dequeue(struct websocket_conn *wc) {
mutex_lock(&wc->lock);
struct websocket_output *wo;
struct lws *wsi = wc->wsi;
while ((wo = g_queue_pop_head(&wc->output_q))) {
while ((wo = t_queue_pop_head(&wc->output_q))) {
// used buffer slot?
if (!wo->str)
goto next;
@ -342,7 +343,7 @@ static int websocket_dequeue(struct websocket_conn *wc) {
next:
websocket_output_free(wo);
}
g_queue_push_tail(&wc->output_q, websocket_output_new());
t_queue_push_tail(&wc->output_q, websocket_output_new());
mutex_unlock(&wc->lock);
@ -359,7 +360,7 @@ void websocket_http_response(struct websocket_conn *wc, int status, const char *
{
LOCK(&wc->lock);
struct websocket_output *wo = g_queue_peek_tail(&wc->output_q);
struct websocket_output *wo = t_queue_peek_tail(&wc->output_q);
wo->http_status = status;
wo->content_type = content_type;
@ -694,7 +695,7 @@ static void websocket_conn_cleanup(struct websocket_conn *wc) {
free(wc->wm->uri);
g_slice_free1(sizeof(*wc->wm), wc->wm);
wc->wm = NULL;
g_queue_clear_full(&wc->output_q, websocket_output_free);
t_queue_clear_full(&wc->output_q, websocket_output_free);
if (wc->uri)
free(wc->uri);
@ -741,8 +742,8 @@ static int websocket_conn_init(struct lws *wsi, void *p) {
wc->wsi = wsi;
mutex_init(&wc->lock);
cond_init(&wc->cond);
g_queue_init(&wc->messages);
g_queue_push_tail(&wc->output_q, websocket_output_new());
t_queue_init(&wc->messages);
t_queue_push_tail(&wc->output_q, websocket_output_new());
wc->wm = websocket_message_new(wc);
wc->janus_sessions = janus_sessions_ht_new();

Loading…
Cancel
Save