mirror of https://github.com/asterisk/asterisk
				
				
				
			
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							255 lines
						
					
					
						
							6.5 KiB
						
					
					
				
			
		
		
	
	
							255 lines
						
					
					
						
							6.5 KiB
						
					
					
				| /*
 | |
|  * Asterisk -- An open source telephony toolkit.
 | |
|  *
 | |
|  * Copyright (C) 2021, Sangoma Technologies Corporation
 | |
|  *
 | |
|  * Kevin Harwell <kharwell@sangoma.com>
 | |
|  *
 | |
|  * See http://www.asterisk.org for more information about
 | |
|  * the Asterisk project. Please do not directly contact
 | |
|  * any of the maintainers of this project for assistance;
 | |
|  * the project provides a web site, mailing lists and IRC
 | |
|  * channels for your use.
 | |
|  *
 | |
|  * This program is free software, distributed under the terms of
 | |
|  * the GNU General Public License Version 2. See the LICENSE file
 | |
|  * at the top of the source tree.
 | |
|  */
 | |
| 
 | |
| #include "asterisk.h"
 | |
| 
 | |
| #include "asterisk/http_websocket.h"
 | |
| #include "asterisk/utils.h"
 | |
| 
 | |
| #include "logger.h"
 | |
| #include "transport.h"
 | |
| #include "transport_websocket.h"
 | |
| 
 | |
| #define log_error(obj, fmt, ...) aeap_error(obj, "websocket", fmt, ##__VA_ARGS__)
 | |
| 
 | |
| struct aeap_transport_websocket {
 | |
| 	/*! Derive from base transport (must be first attribute) */
 | |
| 	struct aeap_transport base;
 | |
| 	/*! The underlying websocket */
 | |
| 	struct ast_websocket *ws;
 | |
| };
 | |
| 
 | |
| static int websocket_connect(struct aeap_transport *self, const char *url,
 | |
| 	const char *protocol, int timeout)
 | |
| {
 | |
| 	struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
 | |
| 	enum ast_websocket_result ws_result;
 | |
| 	struct ast_websocket_client_options ws_options = {
 | |
| 		.uri = url,
 | |
| 		.protocols = protocol,
 | |
| 		.timeout = timeout,
 | |
| 		.tls_cfg = NULL,
 | |
| 	};
 | |
| 
 | |
| 	transport->ws = ast_websocket_client_create_with_options(&ws_options, &ws_result);
 | |
| 	if (ws_result != WS_OK) {
 | |
| 		log_error(self, "connect failure (%d)", (int)ws_result);
 | |
| 		return -1;
 | |
| 	}
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static int websocket_disconnect(struct aeap_transport *self)
 | |
| {
 | |
| 	struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
 | |
| 
 | |
| 	if (transport->ws) {
 | |
| 		ast_websocket_unref(transport->ws);
 | |
| 		transport->ws = NULL;
 | |
| 	}
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| static void websocket_destroy(struct aeap_transport *self)
 | |
| {
 | |
| 	/*
 | |
| 	 * Disconnect takes care of cleaning up the websocket. Note, disconnect
 | |
| 	 * was called by the base/dispatch interface prior to calling this
 | |
| 	 * function so nothing to do here.
 | |
| 	 */
 | |
| }
 | |
| 
 | |
| static intmax_t websocket_read(struct aeap_transport *self, void *buf, intmax_t size,
 | |
| 	enum AST_AEAP_DATA_TYPE *rtype)
 | |
| {
 | |
| 	struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
 | |
| 
 | |
| 	char *payload;
 | |
| 	uint64_t bytes_read = 0;
 | |
| 	uint64_t total_bytes_read = 0;
 | |
| 	enum ast_websocket_opcode opcode;
 | |
| 	int fragmented = 0;
 | |
| 
 | |
| 	*rtype = AST_AEAP_DATA_TYPE_NONE;
 | |
| 
 | |
| 	if (ast_websocket_fd(transport->ws) < 0) {
 | |
| 		log_error(self, "unavailable for reading");
 | |
| 		/* Ensure this transport is in a disconnected state */
 | |
| 		aeap_transport_disconnect(self);
 | |
| 		return -1;
 | |
| 	}
 | |
| 
 | |
| 	/*
 | |
| 	 * This function is called with the read_lock locked. However, the lock needs to be
 | |
| 	 * unlocked while waiting for input otherwise a deadlock can occur during disconnect
 | |
| 	 * (disconnect attempts to grab the lock but can't because read holds it here). So
 | |
| 	 * unlock it prior to waiting.
 | |
| 	 */
 | |
| 	ast_mutex_unlock(&transport->base.read_lock);
 | |
| 	while (ast_websocket_wait_for_input(transport->ws, -1) <= 0) {
 | |
| 		/* If this was poll getting interrupted just go back to waiting */
 | |
| 		if (errno == EINTR || errno == EAGAIN) {
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		ast_mutex_lock(&transport->base.read_lock);
 | |
| 		log_error(self, "poll failure: %s", strerror(errno));
 | |
| 		/* Ensure this transport is in a disconnected state */
 | |
| 		aeap_transport_disconnect(self);
 | |
| 		return -1;
 | |
| 	}
 | |
| 	ast_mutex_lock(&transport->base.read_lock);
 | |
| 
 | |
| 	if (!transport->ws) {
 | |
| 		/*
 | |
| 		 * It's possible the transport was told to disconnect while waiting for input.
 | |
| 		 * If so then the websocket will be NULL, so we don't want to continue.
 | |
| 		 */
 | |
| 		return 0;
 | |
| 	}
 | |
| 
 | |
| 	do {
 | |
| 		if (ast_websocket_read(transport->ws, &payload, &bytes_read, &opcode,
 | |
| 				&fragmented) != 0) {
 | |
| 			log_error(self, "read failure (%d): %s", opcode, strerror(errno));
 | |
| 			return -1;
 | |
| 		}
 | |
| 
 | |
| 		if (!bytes_read) {
 | |
| 			continue;
 | |
| 		}
 | |
| 
 | |
| 		if (total_bytes_read + bytes_read > size) {
 | |
| 			log_error(self, "attempted to read too many bytes into (%jd) sized buffer", size);
 | |
| 			return -1;
 | |
| 		}
 | |
| 
 | |
| 		memcpy(buf + total_bytes_read, payload, bytes_read);
 | |
| 		total_bytes_read += bytes_read;
 | |
| 
 | |
| 	} while (opcode == AST_WEBSOCKET_OPCODE_CONTINUATION);
 | |
| 
 | |
| 	switch (opcode) {
 | |
| 	case AST_WEBSOCKET_OPCODE_CLOSE:
 | |
| 		log_error(self, "closed");
 | |
| 		return -1;
 | |
| 	case AST_WEBSOCKET_OPCODE_BINARY:
 | |
| 		*rtype = AST_AEAP_DATA_TYPE_BINARY;
 | |
| 		break;
 | |
| 	case AST_WEBSOCKET_OPCODE_TEXT:
 | |
| 		*rtype = AST_AEAP_DATA_TYPE_STRING;
 | |
| 
 | |
| 		/* Append terminator, but check for overflow first */
 | |
| 		if (total_bytes_read == size) {
 | |
| 			log_error(self, "unable to write string terminator");
 | |
| 			return -1;
 | |
| 		}
 | |
| 
 | |
| 		*((char *)(buf + total_bytes_read)) = '\0';
 | |
| 		break;
 | |
| 	default:
 | |
| 		/* Ignore all other message types */
 | |
| 		return 0;
 | |
| 	}
 | |
| 
 | |
| 	return total_bytes_read;
 | |
| }
 | |
| 
 | |
| static intmax_t websocket_write(struct aeap_transport *self, const void *buf, intmax_t size,
 | |
| 	enum AST_AEAP_DATA_TYPE wtype)
 | |
| {
 | |
| 	struct aeap_transport_websocket *transport = (struct aeap_transport_websocket *)self;
 | |
| 	intmax_t res = 0;
 | |
| 
 | |
| 	switch (wtype) {
 | |
| 	case AST_AEAP_DATA_TYPE_BINARY:
 | |
| 		res = ast_websocket_write(transport->ws, AST_WEBSOCKET_OPCODE_BINARY,
 | |
| 			(char *)buf, size);
 | |
| 		break;
 | |
| 	case AST_AEAP_DATA_TYPE_STRING:
 | |
| 		res = ast_websocket_write(transport->ws, AST_WEBSOCKET_OPCODE_TEXT,
 | |
| 			(char *)buf, size);
 | |
| 		break;
 | |
| 	default:
 | |
| 		break;
 | |
| 	}
 | |
| 
 | |
| 	if (res < 0) {
 | |
| 		log_error(self, "problem writing to websocket (closed)");
 | |
| 
 | |
| 		/*
 | |
| 		 * If the underlying socket is closed then ensure the
 | |
| 		 * transport is in a disconnected state as well.
 | |
| 		 */
 | |
| 		aeap_transport_disconnect(self);
 | |
| 
 | |
| 		return res;
 | |
| 	}
 | |
| 
 | |
| 	return size;
 | |
| }
 | |
| 
 | |
| static struct aeap_transport_vtable *transport_websocket_vtable(void)
 | |
| {
 | |
| 	static struct aeap_transport_vtable websocket_vtable = {
 | |
| 		.connect = websocket_connect,
 | |
| 		.disconnect = websocket_disconnect,
 | |
| 		.destroy = websocket_destroy,
 | |
| 		.read = websocket_read,
 | |
| 		.write = websocket_write,
 | |
| 	};
 | |
| 
 | |
| 	return &websocket_vtable;
 | |
| }
 | |
| 
 | |
| /*!
 | |
|  * \brief Initialize a transport websocket object, and set its virtual table
 | |
|  *
 | |
|  * \param transport The transport to initialize
 | |
|  *
 | |
|  * \returns 0 on success, -1 on error
 | |
|  */
 | |
| static int transport_websocket_init(struct aeap_transport_websocket *transport)
 | |
| {
 | |
| 	transport->ws = NULL;
 | |
| 
 | |
| 	((struct aeap_transport *)transport)->vtable = transport_websocket_vtable();
 | |
| 
 | |
| 	return 0;
 | |
| }
 | |
| 
 | |
| struct aeap_transport_websocket *aeap_transport_websocket_create(void)
 | |
| {
 | |
| 	struct aeap_transport_websocket *transport;
 | |
| 
 | |
| 	transport = ast_calloc(1, sizeof(*transport));
 | |
| 	if (!transport) {
 | |
| 		ast_log(LOG_ERROR, "AEAP websocket: unable to create transport websocket");
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	if (transport_websocket_init(transport)) {
 | |
| 		ast_free(transport);
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	return transport;
 | |
| }
 |