diff --git a/channels/chan_websocket.c b/channels/chan_websocket.c index df7373c52a..ad4e694ccf 100644 --- a/channels/chan_websocket.c +++ b/channels/chan_websocket.c @@ -90,6 +90,7 @@ struct websocket_pvt { pthread_t outbound_read_thread; size_t bytes_read; size_t leftover_len; + char *remote_addr; char *uri_params; char *leftover_data; enum webchan_control_msg_format control_msg_format; @@ -104,6 +105,13 @@ struct websocket_pvt { char connection_id[0]; }; +/* + * These are the indexes in the channel's file descriptor array + * not the file descriptors themselves. + */ +#define WS_TIMER_FDNO (AST_EXTENDED_FDS + 1) +#define WS_WEBSOCKET_FDNO (AST_EXTENDED_FDS + 2) + #define MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE "MEDIA_WEBSOCKET_OPTIMAL_FRAME_SIZE" #define MEDIA_WEBSOCKET_CONNECTION_ID "MEDIA_WEBSOCKET_CONNECTION_ID" #define INCOMING_CONNECTION_ID "INCOMING" @@ -125,6 +133,9 @@ struct websocket_pvt { #define MAX_TEXT_MESSAGE_LEN MIN(128, (AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE - 1)) /* Forward declarations */ +static int read_from_ws_and_queue(struct websocket_pvt *instance); +static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause, + enum ast_websocket_status_code tech_cause, int line, const char *function); static struct ast_channel *webchan_request(const char *type, struct ast_format_cap *cap, const struct ast_assigned_ids *assignedids, const struct ast_channel *requestor, const char *data, int *cause); static int webchan_call(struct ast_channel *ast, const char *dest, int timeout); static struct ast_frame *webchan_read(struct ast_channel *ast); @@ -132,6 +143,9 @@ static int webchan_write(struct ast_channel *ast, struct ast_frame *f); static int webchan_hangup(struct ast_channel *ast); static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration); +#define websocket_request_hangup(_instance, _cause, _tech) \ + _websocket_request_hangup(_instance, _cause, _tech, __LINE__, __FUNCTION__) + static struct ast_channel_tech websocket_tech = { .type = "WebSocket", .description = "Media over WebSocket Channel Driver", @@ -416,19 +430,17 @@ static __attribute__ ((format (gnu_printf, 2, 3))) char *_create_event_ERROR( ({ \ int _res = -1; \ char *_payload = _create_event_ ## _event(_instance, ##__VA_ARGS__); \ - ao2_lock(instance); \ if (_payload && _instance->websocket) { \ _res = ast_websocket_write_string(_instance->websocket, _payload); \ if (_res != 0) { \ ast_log(LOG_ERROR, "%s: Unable to send event %s\n", \ ast_channel_name(instance->channel), _payload); \ } else { \ - ast_debug(4, "%s: Sent %s\n", \ + ast_debug(3, "%s: Sent %s\n", \ ast_channel_name(instance->channel), _payload); \ }\ ast_free(_payload); \ } \ - ao2_unlock(instance); \ (_res); \ }) @@ -551,7 +563,16 @@ static struct ast_frame *dequeue_frame(struct websocket_pvt *instance) /*! * \internal * - * Called by the core channel thread each time the instance timer fires. + * There are two file descriptors on this channel that can trigger + * this function... + * + * The timer fd (WS_TIMER_FDNO) which gets triggered at a constant + * rate determined by the format. In this case, we need to pull a + * frame OFF the queue and return it to the core. + * + * The websocket fd (WS_WEBSOCKET_FDNO) which gets triggered when + * there's incoming data to read from the websocket. In this case, + * we read the data and put it ON the queue. We'll return a null frame. * */ static struct ast_frame *webchan_read(struct ast_channel *ast) @@ -559,12 +580,21 @@ static struct ast_frame *webchan_read(struct ast_channel *ast) struct websocket_pvt *instance = NULL; struct ast_frame *native_frame = NULL; struct ast_frame *slin_frame = NULL; + int fdno = ast_channel_fdno(ast); instance = ast_channel_tech_pvt(ast); if (!instance) { return NULL; } + if (fdno == WS_WEBSOCKET_FDNO) { + read_from_ws_and_queue(instance); + return &ast_null_frame; + } + if (fdno != WS_TIMER_FDNO) { + return &ast_null_frame; + } + if (ast_timer_get_event(instance->timer) == AST_TIMING_EVENT_EXPIRED) { ast_timer_ack(instance->timer, 1); } @@ -754,7 +784,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) ast_queue_control(instance->channel, AST_CONTROL_ANSWER); } else if (ast_strings_equal(command, HANGUP_CHANNEL)) { - ast_queue_control(instance->channel, AST_CONTROL_HANGUP); + websocket_request_hangup(instance, AST_CAUSE_NORMAL, AST_WEBSOCKET_STATUS_NORMAL); } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) { if (instance->passthrough) { @@ -1064,36 +1094,19 @@ static int read_from_ws_and_queue(struct websocket_pvt *instance) int fragmented = 0; int res = 0; - if (!instance || !instance->websocket) { - ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", - ast_channel_name(instance->channel)); - return -1; - } - - ast_debug(9, "%s: Waiting for websocket to have data\n", ast_channel_name(instance->channel)); - res = ast_wait_for_input( - ast_websocket_fd(instance->websocket), -1); - if (res <= 0) { - ast_log(LOG_WARNING, "%s: WebSocket read failed: %s\n", - ast_channel_name(instance->channel), strerror(errno)); - return -1; - } - - /* - * We need to lock here to prevent the websocket handle from - * being pulled out from under us if the core sends us a - * hangup request. - */ - ao2_lock(instance); if (!instance->websocket) { - ao2_unlock(instance); + ast_log(LOG_WARNING, "%s: WebSocket session not found\n", + ast_channel_name(instance->channel)); return -1; } res = ast_websocket_read(instance->websocket, &payload, &payload_len, &opcode, &fragmented); - ao2_unlock(instance); + if (res) { + ast_debug(3, "%s: WebSocket read error\n", + ast_channel_name(instance->channel)); + websocket_request_hangup(instance, AST_CAUSE_NETWORK_OUT_OF_ORDER, AST_WEBSOCKET_STATUS_GOING_AWAY); return -1; } ast_debug(5, "%s: WebSocket read %d bytes\n", ast_channel_name(instance->channel), @@ -1104,41 +1117,52 @@ static int read_from_ws_and_queue(struct websocket_pvt *instance) } if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) { - ast_debug(5, "%s: WebSocket closed by remote\n", + ast_debug(3, "%s: WebSocket closed by remote\n", ast_channel_name(instance->channel)); + websocket_request_hangup(instance, AST_CAUSE_NORMAL, AST_WEBSOCKET_STATUS_GOING_AWAY); return -1; } if (opcode != AST_WEBSOCKET_OPCODE_BINARY) { - ast_debug(5, "%s: WebSocket frame type %d not supported. Ignoring.\n", + ast_log(LOG_WARNING, "%s: WebSocket frame type %d not supported\n", ast_channel_name(instance->channel), (int)opcode); + websocket_request_hangup(instance, AST_CAUSE_FAILURE, AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA); return 0; } return process_binary_message(instance, payload, payload_len); } -/*! - * \internal - * - * For incoming websocket connections, this function gets called by - * incoming_ws_established_cb() and is run in the http server thread - * handling the websocket connection. - * - * For outgoing websocket connections, this function gets started as - * a background thread by webchan_call(). - */ -static void *read_thread_handler(void *obj) +static int websocket_handoff_to_channel(struct websocket_pvt *instance) { - RAII_VAR(struct websocket_pvt *, instance, obj, ao2_cleanup); int res = 0; + int nodelay = 1; + struct ast_sockaddr *remote_addr = ast_websocket_remote_address(instance->websocket); - ast_debug(3, "%s: Read thread started\n", ast_channel_name(instance->channel)); + instance->remote_addr = ast_strdup(ast_sockaddr_stringify(remote_addr)); + ast_debug(3, "%s: WebSocket connection with %s established\n", + ast_channel_name(instance->channel), instance->remote_addr); + + if (setsockopt(ast_websocket_fd(instance->websocket), + IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) { + ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno)); + } + + ast_channel_set_fd(instance->channel, WS_WEBSOCKET_FDNO, ast_websocket_fd(instance->websocket)); res = send_event(instance, MEDIA_START); if (res != 0 ) { - ast_queue_control(instance->channel, AST_CONTROL_HANGUP); - return NULL; + if (instance->type == AST_WS_TYPE_SERVER) { + websocket_request_hangup(instance, AST_CAUSE_NETWORK_OUT_OF_ORDER, AST_WEBSOCKET_STATUS_GOING_AWAY); + } else { + /* + * We were called by webchan_call so just need to set causes. + * The core will hangup the channel. + */ + ast_channel_tech_hangupcause_set(instance->channel, AST_WEBSOCKET_STATUS_GOING_AWAY); + ast_channel_hangupcause_set(instance->channel, AST_CAUSE_NETWORK_OUT_OF_ORDER); + } + return -1; } if (!instance->no_auto_answer) { @@ -1146,17 +1170,24 @@ static void *read_thread_handler(void *obj) ast_queue_control(instance->channel, AST_CONTROL_ANSWER); } - while (read_from_ws_and_queue(instance) == 0) - { - } + return 0; +} - /* - * websocket_hangup will take care of closing the websocket if needed. - */ - ast_debug(3, "%s: HANGUP by websocket close/error\n", ast_channel_name(instance->channel)); - ast_queue_control(instance->channel, AST_CONTROL_HANGUP); +static void _websocket_request_hangup(struct websocket_pvt *instance, int ast_cause, + enum ast_websocket_status_code tech_cause, int line, const char *function) +{ + if (!instance || !instance->channel) { + return; + } + ast_debug(3, "%s:%s: Hangup requested from %s line %d. cause: %s(%d) tech_cause: %s(%d)", + ast_channel_name(instance->channel), instance->remote_addr, + function, line, + ast_cause2str(ast_cause), ast_cause, ast_websocket_status_to_str(tech_cause), tech_cause); - return NULL; + if (tech_cause) { + ast_channel_tech_hangupcause_set(instance->channel, tech_cause); + } + ast_queue_hangup_with_cause(instance->channel, ast_cause); } /*! \brief Function called when we should write a frame to the channel */ @@ -1195,17 +1226,19 @@ static int webchan_write(struct ast_channel *ast, struct ast_frame *f) * \internal * * Called by the core to actually call the remote. + * The core will hang up the channel if a non-zero is returned. + * We just need to set hangup causes if appropriate. */ static int webchan_call(struct ast_channel *ast, const char *dest, int timeout) { struct websocket_pvt *instance = ast_channel_tech_pvt(ast); - int nodelay = 1; enum ast_websocket_result result; if (!instance) { ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", ast_channel_name(ast)); + ast_channel_hangupcause_set(ast, AST_CAUSE_FAILURE); return -1; } @@ -1218,6 +1251,7 @@ static int webchan_call(struct ast_channel *ast, const char *dest, if (!instance->client) { ast_log(LOG_WARNING, "%s: WebSocket client not found\n", ast_channel_name(ast)); + ast_channel_hangupcause_set(ast, AST_CAUSE_FAILURE); return -1; } @@ -1233,26 +1267,11 @@ static int webchan_call(struct ast_channel *ast, const char *dest, if (!instance->websocket || result != WS_OK) { ast_log(LOG_WARNING, "%s: WebSocket connection failed to %s: %s\n", ast_channel_name(ast), dest, ast_websocket_result_to_str(result)); + ast_channel_hangupcause_set(ast, AST_CAUSE_NO_ROUTE_DESTINATION); return -1; } - if (setsockopt(ast_websocket_fd(instance->websocket), - IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) { - ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on websocket connection: %s\n", strerror(errno)); - } - - ast_debug(3, "%s: WebSocket connection to %s established\n", - ast_channel_name(ast), dest); - - /* read_thread_handler() will clean up the bump */ - if (ast_pthread_create_detached_background(&instance->outbound_read_thread, NULL, - read_thread_handler, ao2_bump(instance))) { - ast_log(LOG_WARNING, "%s: Failed to create thread.\n", ast_channel_name(ast)); - ao2_cleanup(instance); - return -1; - } - - return 0; + return websocket_handoff_to_channel(instance); } static void websocket_destructor(void *data) @@ -1312,6 +1331,7 @@ static void websocket_destructor(void *data) } ast_free(instance->uri_params); + ast_free(instance->remote_addr); } struct instance_proxy { @@ -1501,7 +1521,7 @@ static int set_channel_timer(struct websocket_pvt *instance) * Calling ast_channel_set_fd will cause the channel thread to call * webchan_read at 'rate' times per second. */ - ast_channel_set_fd(instance->channel, 0, ast_timer_fd(instance->timer)); + ast_channel_set_fd(instance->channel, WS_TIMER_FDNO, ast_timer_fd(instance->timer)); return 0; } @@ -1779,20 +1799,12 @@ static int webchan_hangup(struct ast_channel *ast) ast_debug(3, "%s: WebSocket call hangup. cid: %s\n", ast_channel_name(ast), instance->connection_id); - /* - * We need to lock because read_from_ws_and_queue() is probably waiting - * on the websocket file descriptor and will unblock and immediately try to - * check the websocket and read from it. We don't want to pull the - * websocket out from under it between the check and read. - */ - ao2_lock(instance); if (instance->websocket) { - ast_websocket_close(instance->websocket, 1000); + ast_websocket_close(instance->websocket, ast_channel_tech_hangupcause(ast) ?: 1000); ast_websocket_unref(instance->websocket); instance->websocket = NULL; } ast_channel_tech_pvt_set(ast, NULL); - ao2_unlock(instance); /* Clean up the reference from adding the instance to the channel */ ao2_cleanup(instance); @@ -1827,7 +1839,6 @@ static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, struct ast_variable *v; const char *connection_id = NULL; struct websocket_pvt *instance = NULL; - int nodelay = 1; ast_debug(3, "WebSocket established\n"); @@ -1847,8 +1858,8 @@ static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, * Just in case though... */ ast_log(LOG_WARNING, "WebSocket connection id not found\n"); - ast_queue_control(instance->channel, AST_CONTROL_HANGUP); - ast_websocket_close(ast_ws_session, 1000); + websocket_request_hangup(instance, AST_CAUSE_FAILURE, AST_WEBSOCKET_STATUS_INTERNAL_ERROR); + ast_websocket_close(ast_ws_session, AST_WEBSOCKET_STATUS_INTERNAL_ERROR); return; } @@ -1860,22 +1871,18 @@ static void incoming_ws_established_cb(struct ast_websocket *ast_ws_session, * Just in case though... */ ast_log(LOG_WARNING, "%s: WebSocket instance not found\n", connection_id); - ast_queue_control(instance->channel, AST_CONTROL_HANGUP); - ast_websocket_close(ast_ws_session, 1000); + websocket_request_hangup(instance, AST_CAUSE_FAILURE, AST_WEBSOCKET_STATUS_INTERNAL_ERROR); + ast_websocket_close(ast_ws_session, AST_WEBSOCKET_STATUS_INTERNAL_ERROR); return; } instance->websocket = ao2_bump(ast_ws_session); - if (setsockopt(ast_websocket_fd(instance->websocket), - IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(nodelay)) < 0) { - ast_log(LOG_WARNING, "Failed to set TCP_NODELAY on manager connection: %s\n", strerror(errno)); - } - - /* read_thread_handler cleans up the bump */ - read_thread_handler(ao2_bump(instance)); - + websocket_handoff_to_channel(instance); ao2_cleanup(instance); - ast_debug(3, "WebSocket closed\n"); + /* + * The instance is the channel's responsibility now. + * We just return here. + */ } /*! diff --git a/include/asterisk/http_websocket.h b/include/asterisk/http_websocket.h index 062f710377..42629f2ef6 100644 --- a/include/asterisk/http_websocket.h +++ b/include/asterisk/http_websocket.h @@ -77,6 +77,26 @@ enum ast_websocket_opcode { AST_WEBSOCKET_OPCODE_CONTINUATION = 0x0, /*!< Continuation of a previous frame */ }; +/*! \brief Websocket Status Codes from RFC-6455 */ +enum ast_websocket_status_code { + AST_WEBSOCKET_STATUS_NORMAL = 1000, + AST_WEBSOCKET_STATUS_GOING_AWAY = 1001, + AST_WEBSOCKET_STATUS_PROTOCOL_ERROR = 1002, + AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA = 1003, + AST_WEBSOCKET_STATUS_RESERVED_1004 = 1004, + AST_WEBSOCKET_STATUS_RESERVED_1005 = 1005, + AST_WEBSOCKET_STATUS_RESERVED_1006 = 1006, + AST_WEBSOCKET_STATUS_INVALID_FRAME = 1007, + AST_WEBSOCKET_STATUS_POLICY_VIOLATION = 1008, + AST_WEBSOCKET_STATUS_TOO_BIG = 1009, + AST_WEBSOCKET_STATUS_MANDATORY_EXT = 1010, + AST_WEBSOCKET_STATUS_INTERNAL_ERROR = 1011, + AST_WEBSOCKET_STATUS_RESERVED_1012 = 1012, + AST_WEBSOCKET_STATUS_RESERVED_1013 = 1013, + AST_WEBSOCKET_STATUS_BAD_GATEWAY = 1014, + AST_WEBSOCKET_STATUS_RESERVED_1015 = 1015, +}; + #ifdef LOW_MEMORY /*! \brief Size of the pre-determined buffer for WebSocket frames */ #define AST_WEBSOCKET_MAX_RX_PAYLOAD_SIZE 8192 @@ -551,4 +571,13 @@ AST_OPTIONAL_API(int, ast_websocket_set_timeout, (struct ast_websocket *session, */ AST_OPTIONAL_API(const char *, ast_websocket_result_to_str, (enum ast_websocket_result result), {return "";}); +/*! + * \brief Convert a websocket status code to a string. + * + * \param code The code to convert + * + * \return A string representation of the code + */ +AST_OPTIONAL_API(const char *, ast_websocket_status_to_str, (enum ast_websocket_status_code code), {return "";}); + #endif diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c index 60caf41fef..2854f6be2e 100644 --- a/res/res_http_websocket.c +++ b/res/res_http_websocket.c @@ -1632,6 +1632,43 @@ const char *AST_OPTIONAL_API_NAME(ast_websocket_result_to_str) return websocket_result_string_map[result]; } +struct status_map { + enum ast_websocket_status_code code; + const char *desc; +}; + +static const struct status_map websocket_status_map[] = { + { AST_WEBSOCKET_STATUS_NORMAL, "Normal" }, + { AST_WEBSOCKET_STATUS_GOING_AWAY, "Going away" }, + { AST_WEBSOCKET_STATUS_PROTOCOL_ERROR, "Protocol error" }, + { AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA, "Unsupported data" }, + { AST_WEBSOCKET_STATUS_RESERVED_1004, "reserved 1004" }, + { AST_WEBSOCKET_STATUS_RESERVED_1005, "reserved 1005" }, + { AST_WEBSOCKET_STATUS_RESERVED_1006, "reserved 1006" }, + { AST_WEBSOCKET_STATUS_INVALID_FRAME, "Invalid frame" }, + { AST_WEBSOCKET_STATUS_POLICY_VIOLATION, "Policy violation" }, + { AST_WEBSOCKET_STATUS_TOO_BIG, "Data too big" }, + { AST_WEBSOCKET_STATUS_MANDATORY_EXT, "Mandatory extension" }, + { AST_WEBSOCKET_STATUS_INTERNAL_ERROR, "Internal error" }, + { AST_WEBSOCKET_STATUS_RESERVED_1012, "reserved 1012" }, + { AST_WEBSOCKET_STATUS_RESERVED_1013, "reserved 1013" }, + { AST_WEBSOCKET_STATUS_BAD_GATEWAY, "Bad gateway" }, + { AST_WEBSOCKET_STATUS_RESERVED_1015, "reserved 1015" }, +}; + +const char *AST_OPTIONAL_API_NAME(ast_websocket_status_to_str) + (enum ast_websocket_status_code code) +{ + int i; + + for (i = 0; i < ARRAY_LEN(websocket_status_map); i++) { + if (websocket_status_map[i].code == code) + return websocket_status_map[i].desc; + } + + return "Unknown"; +} + static int load_module(void) { websocketuri.data = websocket_server_internal_create();