diff --git a/channels/chan_websocket.c b/channels/chan_websocket.c index b86bac4c67..a431f26092 100644 --- a/channels/chan_websocket.c +++ b/channels/chan_websocket.c @@ -70,6 +70,19 @@ struct webchan_conf_global { enum webchan_control_msg_format control_msg_format; }; +/* This is from the perspective of the app, NOT Asterisk */ +enum webchan_media_direction { + WEBCHAN_MEDIA_DIRECTION_BOTH, + WEBCHAN_MEDIA_DIRECTION_OUT, + WEBCHAN_MEDIA_DIRECTION_IN, +}; + +static const char *websocket_media_direction_map[] = { + [WEBCHAN_MEDIA_DIRECTION_BOTH] = "both", + [WEBCHAN_MEDIA_DIRECTION_OUT] = "out", + [WEBCHAN_MEDIA_DIRECTION_IN] = "in", +}; + static struct ast_websocket_server *ast_ws_server; static struct ao2_container *instances = NULL; @@ -102,6 +115,7 @@ struct websocket_pvt { int frame_queue_length; int queue_full; int queue_paused; + int media_direction; char connection_id[0]; }; @@ -126,6 +140,7 @@ struct websocket_pvt { #define REPORT_QUEUE_DRAINED "REPORT_QUEUE_DRAINED" #define PAUSE_MEDIA "PAUSE_MEDIA" #define CONTINUE_MEDIA "CONTINUE_MEDIA" +#define SET_MEDIA_DIRECTION "SET_MEDIA_DIRECTION" #define QUEUE_LENGTH_MAX 1000 #define QUEUE_LENGTH_XOFF_LEVEL 900 @@ -142,6 +157,7 @@ static struct ast_frame *webchan_read(struct ast_channel *ast); static int webchan_write(struct ast_channel *ast, struct ast_frame *f); static int webchan_hangup(struct ast_channel *ast); static int webchan_send_dtmf_text(struct ast_channel *ast, char digit, unsigned int duration); +static int set_channel_timer(struct websocket_pvt *instance); #define websocket_request_hangup(_instance, _cause, _tech) \ _websocket_request_hangup(_instance, _cause, _tech, __LINE__, __FUNCTION__) @@ -746,6 +762,27 @@ static int queue_option_frame(struct websocket_pvt *instance, return 0; } +#define ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command) \ +({ \ + if (instance->passthrough) { \ + send_event(instance, ERROR, "%s not supported in passthrough mode", command); \ + ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", \ + ast_channel_name(instance->channel), command); \ + return 0; \ + } \ +}) + +#define ERROR_ON_INVALID_MEDIA_DIRECTION_RTN(instance, command, direction) \ +({ \ + if (instance->media_direction == direction) { \ + send_event(instance, ERROR, "%s not supported while media direction " \ + "is '%s'", command, websocket_media_direction_map[direction]); \ + ast_debug(4, "%s: WebSocket media direction is '%s'. Ignoring %s command.\n", \ + ast_channel_name(instance->channel), websocket_media_direction_map[direction], command); \ + return 0; \ + } \ +}) + /*! * \internal * \brief Handle commands from the websocket @@ -787,12 +824,8 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) websocket_request_hangup(instance, AST_CAUSE_NORMAL, AST_WEBSOCKET_STATUS_NORMAL); } else if (ast_strings_equal(command, START_MEDIA_BUFFERING)) { - if (instance->passthrough) { - send_event(instance, ERROR, "%s not supported in passthrough mode", command); - ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", - ast_channel_name(instance->channel), command); - return 0; - } + ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command); + ERROR_ON_INVALID_MEDIA_DIRECTION_RTN(instance, command, WEBCHAN_MEDIA_DIRECTION_IN); AST_LIST_LOCK(&instance->frame_queue); instance->bulk_media_in_progress = 1; AST_LIST_UNLOCK(&instance->frame_queue); @@ -809,12 +842,8 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) id = data; } - if (instance->passthrough) { - send_event(instance, ERROR, "%s not supported in passthrough mode", command); - ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", - ast_channel_name(instance->channel), command); - return 0; - } + ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command); + ERROR_ON_INVALID_MEDIA_DIRECTION_RTN(instance, command, WEBCHAN_MEDIA_DIRECTION_IN); ast_debug(4, "%s: WebSocket %s '%s' with %d bytes in leftover_data.\n", ast_channel_name(instance->channel), STOP_MEDIA_BUFFERING, id, @@ -841,12 +870,8 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) SCOPED_LOCK(frame_queue_lock, &instance->frame_queue, AST_LIST_LOCK, AST_LIST_UNLOCK); - if (instance->passthrough) { - send_event(instance, ERROR, "%s not supported in passthrough mode", command); - ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", - ast_channel_name(instance->channel), command); - return 0; - } + ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command); + ERROR_ON_INVALID_MEDIA_DIRECTION_RTN(instance, command, WEBCHAN_MEDIA_DIRECTION_IN); if (instance->control_msg_format == WEBCHAN_CONTROL_MSG_FORMAT_JSON) { id = ast_json_object_string_get(json, "correlation_id"); @@ -867,12 +892,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) } else if (ast_strings_equal(command, FLUSH_MEDIA)) { struct ast_frame *frame = NULL; - if (instance->passthrough) { - send_event(instance, ERROR, "FLUSH_MEDIA not supported in passthrough mode"); - ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", - ast_channel_name(instance->channel), command); - return 0; - } + ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command); AST_LIST_LOCK(&instance->frame_queue); while ((frame = AST_LIST_REMOVE_HEAD(&instance->frame_queue, frame_list))) { @@ -884,12 +904,7 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) AST_LIST_UNLOCK(&instance->frame_queue); } else if (ast_strings_equal(command, REPORT_QUEUE_DRAINED)) { - if (instance->passthrough) { - send_event(instance, ERROR, "%s not supported in passthrough mode", command); - ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", - ast_channel_name(instance->channel), command); - return 0; - } + ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command); AST_LIST_LOCK(&instance->frame_queue); instance->report_queue_drained = 1; @@ -899,27 +914,79 @@ static int handle_command(struct websocket_pvt *instance, char *buffer) return send_event(instance, STATUS); } else if (ast_strings_equal(command, PAUSE_MEDIA)) { - if (instance->passthrough) { - send_event(instance, ERROR, "%s not supported in passthrough mode", command); - ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", - ast_channel_name(instance->channel), command); - return 0; - } + ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command); + ERROR_ON_INVALID_MEDIA_DIRECTION_RTN(instance, command, WEBCHAN_MEDIA_DIRECTION_IN); AST_LIST_LOCK(&instance->frame_queue); instance->queue_paused = 1; AST_LIST_UNLOCK(&instance->frame_queue); } else if (ast_strings_equal(command, CONTINUE_MEDIA)) { - if (instance->passthrough) { - send_event(instance, ERROR, "%s not supported in passthrough mode", command); - ast_debug(4, "%s: WebSocket in passthrough mode. Ignoring %s command.\n", - ast_channel_name(instance->channel), command); - return 0; - } + ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command); + ERROR_ON_INVALID_MEDIA_DIRECTION_RTN(instance, command, WEBCHAN_MEDIA_DIRECTION_IN); AST_LIST_LOCK(&instance->frame_queue); instance->queue_paused = 0; AST_LIST_UNLOCK(&instance->frame_queue); + } else if (ast_strings_equal(command, SET_MEDIA_DIRECTION)) { + const char *direction; + + ERROR_ON_PASSTHROUGH_MODE_RTN(instance, command); + + if (instance->control_msg_format != WEBCHAN_CONTROL_MSG_FORMAT_JSON) { + send_event(instance, ERROR, "%s only supports JSON format.\n", command); + return 0; + } + + direction = ast_json_object_string_get(json, "direction"); + if (!direction) { + send_event(instance, ERROR, "%s requires a 'direction' parameter.\n", command); + return 0; + } + + if (!strcmp("both", direction)) { + if (instance->media_direction == WEBCHAN_MEDIA_DIRECTION_BOTH) { + return 0; + } + + if (!instance->timer) { + set_channel_timer(instance); + ast_queue_frame(instance->channel, &ast_null_frame); + } + + instance->media_direction = WEBCHAN_MEDIA_DIRECTION_BOTH; + + } else if (!strcmp("out", direction)) { + if (instance->media_direction == WEBCHAN_MEDIA_DIRECTION_OUT) { + return 0; + } + + if (!instance->timer) { + set_channel_timer(instance); + ast_queue_frame(instance->channel, &ast_null_frame); + } + + instance->media_direction = WEBCHAN_MEDIA_DIRECTION_OUT; + + } else if (!strcmp("in", direction)) { + if (instance->media_direction == WEBCHAN_MEDIA_DIRECTION_IN) { + return 0; + } + + if (instance->timer) { + ast_channel_internal_fd_clear(instance->channel, WS_TIMER_FDNO); + ast_timer_close(instance->timer); + instance->timer = NULL; + ast_queue_frame(instance->channel, &ast_null_frame); + } + + instance->media_direction = WEBCHAN_MEDIA_DIRECTION_IN; + + } else { + send_event(instance, ERROR, "'%s' is not a valid direction for %s.\n", + direction, command); + return 0; + } + } else { ast_log(LOG_WARNING, "%s: WebSocket %s command unknown\n", ast_channel_name(instance->channel), command); @@ -1127,7 +1194,14 @@ static int read_from_ws_and_queue(struct websocket_pvt *instance) return -1; } - if (opcode != AST_WEBSOCKET_OPCODE_BINARY) { + if (opcode == AST_WEBSOCKET_OPCODE_BINARY) { + /* If the application's media direction is 'in', drop any media we receive from it */ + if (instance->media_direction == WEBCHAN_MEDIA_DIRECTION_IN) { + ast_debug(5, "%s: WebSocket dropped frame (application media direction is 'in')\n", + ast_channel_name(instance->channel)); + return 0; + } + } else { ast_log(LOG_WARNING, "%s: WebSocket frame type %d not supported\n", ast_channel_name(instance->channel), (int)opcode); websocket_request_hangup(instance, AST_CAUSE_FAILURE, AST_WEBSOCKET_STATUS_UNSUPPORTED_DATA); @@ -1205,6 +1279,11 @@ static int webchan_write(struct ast_channel *ast, struct ast_frame *f) return -1; } + /* The app doesn't want media right now */ + if (instance->media_direction == WEBCHAN_MEDIA_DIRECTION_OUT) { + return 0; + } + if (f->frametype == AST_FRAME_CNG) { return 0; } @@ -1580,6 +1659,7 @@ enum { OPT_WS_URI_PARAM = (1 << 2), OPT_WS_PASSTHROUGH = (1 << 3), OPT_WS_MSG_FORMAT = (1 << 4), + OPT_WS_MEDIA_DIRECTION = (1 << 5), }; enum { @@ -1588,6 +1668,7 @@ enum { OPT_ARG_WS_URI_PARAM, OPT_ARG_WS_PASSTHROUGH, OPT_ARG_WS_MSG_FORMAT, + OPT_ARG_WS_MEDIA_DIRECTION, OPT_ARG_ARRAY_SIZE }; @@ -1597,6 +1678,7 @@ AST_APP_OPTIONS(websocket_options, BEGIN_OPTIONS AST_APP_OPTION_ARG('v', OPT_WS_URI_PARAM, OPT_ARG_WS_URI_PARAM), AST_APP_OPTION('p', OPT_WS_PASSTHROUGH), AST_APP_OPTION_ARG('f', OPT_WS_MSG_FORMAT, OPT_ARG_WS_MSG_FORMAT), + AST_APP_OPTION_ARG('d', OPT_WS_MEDIA_DIRECTION, OPT_ARG_WS_MEDIA_DIRECTION), END_OPTIONS ); static struct ast_channel *webchan_request(const char *type, @@ -1673,6 +1755,22 @@ static struct ast_channel *webchan_request(const char *type, goto failure; } + instance->media_direction = WEBCHAN_MEDIA_DIRECTION_BOTH; + if (ast_test_flag(&opts, OPT_WS_MEDIA_DIRECTION)) { + if (!strcmp("both", opt_args[OPT_ARG_WS_MEDIA_DIRECTION])) { + /* The default. Don't need to do anything here other than + * ensure it is an allowed value. */ + } else if (!strcmp("out", opt_args[OPT_ARG_WS_MEDIA_DIRECTION])) { + instance->media_direction = WEBCHAN_MEDIA_DIRECTION_OUT; + } else if (!strcmp("in", opt_args[OPT_ARG_WS_MEDIA_DIRECTION])) { + instance->media_direction = WEBCHAN_MEDIA_DIRECTION_IN; + } else { + ast_log(LOG_ERROR, "Unrecognized option for media direction: '%s'.\n", + opt_args[OPT_ARG_WS_MEDIA_DIRECTION]); + goto failure; + } + } + instance->no_auto_answer = ast_test_flag(&opts, OPT_WS_NO_AUTO_ANSWER); if (!instance->passthrough) { instance->passthrough = ast_test_flag(&opts, OPT_WS_PASSTHROUGH); @@ -1751,7 +1849,9 @@ static struct ast_channel *webchan_request(const char *type, goto failure; } - if (set_channel_timer(instance) != 0) { + /* If the application's media direction is 'both' or 'out', we need the channel timer. */ + if (instance->media_direction != WEBCHAN_MEDIA_DIRECTION_IN + && set_channel_timer(instance) != 0) { goto failure; } diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c index d989abb9ab..3acb0e072c 100644 --- a/res/ari/resource_channels.c +++ b/res/ari/resource_channels.c @@ -2212,11 +2212,21 @@ static int external_media_websocket(struct ast_ari_channels_external_media_args char *endpoint; struct ast_channel *chan; struct varshead *vars; + char direction[16] = ""; - if (ast_asprintf(&endpoint, "WebSocket/%s%s%s", + /* If direction is set here, it WILL override any m() line in transport data + * since it is appended to the end of the string. + */ + if (args->direction) { + snprintf(direction, sizeof(direction), "d(%s)", args->direction); + } + + if (ast_asprintf(&endpoint, "WebSocket/%s%s%s%s%s", args->external_host, S_COR(args->transport_data, "/", ""), - S_OR(args->transport_data, "")) == -1) { + S_OR(args->transport_data, ""), + S_COR(!args->transport_data && args->direction, "/", ""), + direction) == -1) { return 1; } @@ -2354,8 +2364,14 @@ void ast_ari_channels_external_media(struct ast_variable *headers, return; } - if (ast_strlen_zero(args->direction)) { - args->direction = "both"; + if (!ast_strlen_zero(args->direction)) { + if (strcmp(args->direction, "both") && strcmp(args->direction, "in") + && strcmp(args->direction, "out")) { + ast_ari_response_error( + response, 400, "Bad Request", + "Invalid direction specified"); + return; + } } if (strcasecmp(args->encapsulation, "rtp") == 0 && strcasecmp(args->transport, "udp") == 0) {