ARI: REST over Websocket

This commit adds the ability to make ARI REST requests over the same
websocket used to receive events.

For full details on how to use the new capability, visit...

https://docs.asterisk.org/Configuration/Interfaces/Asterisk-REST-Interface-ARI/ARI-REST-over-WebSocket/

Changes:

* Added utilities to http.c:
  * ast_get_http_method_from_string().
  * ast_http_parse_post_form().
* Added utilities to json.c:
  * ast_json_nvp_array_to_ast_variables().
  * ast_variables_to_json_nvp_array().
* Added definitions for new events to carry REST responses.
* Created res/ari/ari_websocket_requests.c to house the new request handlers.
* Moved non-event specific code out of res/ari/resource_events.c into
  res/ari/ari_websockets.c
* Refactored res/res_ari.c to move non-http code out of ast_ari_callback()
  (which is http specific) and into ast_ari_invoke() so it can be shared
  between both the http and websocket transports.

UpgradeNote: This commit adds the ability to make ARI REST requests over the same
websocket used to receive events.
See https://docs.asterisk.org/Configuration/Interfaces/Asterisk-REST-Interface-ARI/ARI-REST-over-WebSocket/
22
George Joseph 1 month ago
parent ecd1a727e8
commit 1442c17141

@ -50,7 +50,7 @@ struct ast_ari_response;
/*!
* \brief Callback type for RESTful method handlers.
* \param ser TCP/TLS session object
* \param ser TCP/TLS session object (Maybe NULL if not available).
* \param get_params GET parameters from the HTTP request.
* \param path_vars Path variables from any wildcard path segments.
* \param headers HTTP headers from the HTTP requiest.
@ -78,8 +78,17 @@ struct stasis_rest_handlers {
int is_wildcard;
/*! Callbacks for all handled HTTP methods. */
stasis_rest_callback callbacks[AST_HTTP_MAX_METHOD];
/*! WebSocket server for handling WebSocket upgrades. */
struct ast_websocket_server *ws_server;
/*!
* ws_server is no longer needed to indicate if a path should cause
* an Upgrade to websocket but is kept for backwards compatability.
* Instead, simply set is_websocket to true.
*/
union {
/*! \deprecated WebSocket server for handling WebSocket upgrades. */
struct ast_websocket_server *ws_server;
/*! The path segment is handled by the websocket */
int is_websocket;
};
/*! Number of children in the children array */
size_t num_children;
/*! Handlers for sub-paths */
@ -121,6 +130,26 @@ int ast_ari_add_handler(struct stasis_rest_handlers *handler);
*/
int ast_ari_remove_handler(struct stasis_rest_handlers *handler);
/*!
* \internal
* \brief Stasis RESTful invocation handler response codes.
*/
enum ast_ari_invoke_result {
ARI_INVOKE_RESULT_SUCCESS = 0,
ARI_INVOKE_RESULT_ERROR_CONTINUE = -1,
ARI_INVOKE_RESULT_ERROR_CLOSE = -2,
};
/*!
* \internal
* \brief How was Stasis RESTful invocation handler invoked?
*/
enum ast_ari_invoke_source {
ARI_INVOKE_SOURCE_REST = 0,
ARI_INVOKE_SOURCE_WEBSOCKET,
ARI_INVOKE_SOURCE_TEST,
};
/*!
* \internal
* \brief Stasis RESTful invocation handler.
@ -135,8 +164,10 @@ int ast_ari_remove_handler(struct stasis_rest_handlers *handler);
* \param headers HTTP headers.
* \param body
* \param[out] response RESTful HTTP response.
* \param is_websocket Flag to indicate if this is a WebSocket request.
*/
void ast_ari_invoke(struct ast_tcptls_session_instance *ser,
enum ast_ari_invoke_result ast_ari_invoke(struct ast_tcptls_session_instance *ser,
enum ast_ari_invoke_source source, const struct ast_http_uri *urih,
const char *uri, enum ast_http_method method,
struct ast_variable *get_params, struct ast_variable *headers,
struct ast_json *body, struct ast_ari_response *response);
@ -155,63 +186,6 @@ void ast_ari_invoke(struct ast_tcptls_session_instance *ser,
*/
void ast_ari_get_docs(const char *uri, const char *prefix, struct ast_variable *headers, struct ast_ari_response *response);
/*! \brief Abstraction for reading/writing JSON to a WebSocket */
struct ast_ari_websocket_session;
/*!
* \brief Create an ARI WebSocket session.
*
* If \c NULL is given for the validator function, no validation will be
* performed.
*
* \param ws_session Underlying WebSocket session.
* \param validator Function to validate outgoing messages.
* \return New ARI WebSocket session.
* \retval NULL on error.
*/
struct ast_ari_websocket_session *ast_ari_websocket_session_create(
struct ast_websocket *ws_session, int (*validator)(struct ast_json *));
/*!
* \brief Read a message from an ARI WebSocket.
*
* \param session Session to read from.
* \return Message received.
* \retval NULL if WebSocket could not be read.
*/
struct ast_json *ast_ari_websocket_session_read(
struct ast_ari_websocket_session *session);
/*!
* \brief Send a message to an ARI WebSocket.
*
* \param session Session to write to.
* \param message Message to send.
* \retval 0 on success.
* \retval Non-zero on error.
*/
int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session,
struct ast_json *message);
/*!
* \brief Get the Session ID for an ARI WebSocket.
*
* \param session Session to query.
* \return Session ID.
* \retval NULL on error.
*/
const char *ast_ari_websocket_session_id(
const struct ast_ari_websocket_session *session);
/*!
* \brief Get the remote address from an ARI WebSocket.
*
* \param session Session to write to.
* \return ast_sockaddr (does not have to be freed)
*/
struct ast_sockaddr *ast_ari_websocket_session_get_remote_addr(
struct ast_ari_websocket_session *session);
/*!
* \brief The stock message to return when out of memory.
*

@ -156,6 +156,11 @@ void ast_http_uri_unlink_all_with_key(const char *key);
*/
const char *ast_get_http_method(enum ast_http_method method) attribute_pure;
/*!
* \brief Return http method from string
*/
enum ast_http_method ast_get_http_method_from_string(const char *method);
/*!
* \brief Return mime type based on extension
* \param ftype filename extension
@ -279,6 +284,20 @@ int ast_http_body_discard(struct ast_tcptls_session_instance *ser);
*/
struct ast_variable *ast_http_get_post_vars(struct ast_tcptls_session_instance *ser, struct ast_variable *headers);
/*!
* \brief Get post variables from an application/x-www-form-urlencoded buffer
* \param buf input buffer
* \param content_len Buffer length
* \param content_type Content type (must be "application/x-www-form-urlencoded")
*
* \warning The input buffer will be modified by strsep() so pass in a copy
* if you need to keep the original.
*
* \return List of ast_variables from the buffer. Must be freed with ast_variables_destroy().
*/
struct ast_variable *ast_http_parse_post_form(char *buf, int content_length,
const char *content_type);
struct ast_json;
/*!

@ -1137,6 +1137,63 @@ enum ast_json_to_ast_vars_code {
*/
enum ast_json_to_ast_vars_code ast_json_to_ast_variables(struct ast_json *json_variables, struct ast_variable **variables);
enum ast_json_nvp_ast_vars_code {
/*! \brief Conversion successful */
AST_JSON_NVP_AST_VARS_CODE_SUCCESS,
/*!
* \brief Conversion failed because invalid value type supplied.
* \note Only string values allowed.
*/
AST_JSON_NVP_AST_VARS_CODE_INVALID_TYPE,
/*! \brief Conversion failed because of allocation failure. (Out Of Memory) */
AST_JSON_NVP_AST_VARS_CODE_OOM,
/*! \brief Input was NULL or empty */
AST_JSON_NVP_AST_VARS_CODE_NO_INPUT,
};
/*!
* \brief Convert a \c ast_json array of name/value pairs into an \c ast_variable list
*
* This is the inverse of \ref ast_variables_to_json_nvp_array().
*
* \param json_array The JSON array containing the name/value pairs
* \param[out] variables The ast_variable list containing the name/value pairs
*
* If the variables list already exists, new values are appended to it.
*
* \note The JSON array must be in the following format:
* \code
* [
* {
* "name": "foo",
* "value": "bar"
* },
* {
* "name": "foo2",
* "value": "bar2"
* }
* ]
* \endcode
*
* \warning If an error occurred during parsing the variables list will contain
* all variables that had been successfully parsed before the error.
*
* \return enum ast_json_to_ast_vars_code indicating status.
*/
enum ast_json_nvp_ast_vars_code ast_json_nvp_array_to_ast_variables(
struct ast_json *json_array, struct ast_variable **variables);
/*!
* \brief Convert a \c ast_variable list into a \c ast_json array of name/value pairs
*
* This is the inverse of \ref ast_json_nvp_array_to_ast_variables().
*
* \param variables The ast_variable list to convert
* \return JSON array of name/value pairs. Must be freed with \ref ast_json_unref().
*/
struct ast_json *ast_variables_to_json_nvp_array(struct ast_variable *variables);
struct varshead;
/*!

@ -203,6 +203,19 @@ const char *ast_get_http_method(enum ast_http_method method)
return NULL;
}
enum ast_http_method ast_get_http_method_from_string(const char *method)
{
int x;
for (x = 0; x < ARRAY_LEN(ast_http_methods_text); x++) {
if (ast_strings_equal(method, ast_http_methods_text[x].text)) {
return ast_http_methods_text[x].method;
}
}
return AST_HTTP_UNKNOWN;
}
const char *ast_http_ftype2mtype(const char *ftype)
{
int x;
@ -1353,33 +1366,21 @@ struct ast_json *ast_http_get_json(
* get post variables from client Request Entity-Body, if content type is
* application/x-www-form-urlencoded
*/
struct ast_variable *ast_http_get_post_vars(
struct ast_tcptls_session_instance *ser, struct ast_variable *headers)
struct ast_variable *ast_http_parse_post_form(char *buf, int content_length,
const char *content_type)
{
int content_length = 0;
struct ast_variable *v, *post_vars=NULL, *prev = NULL;
char *var, *val;
RAII_VAR(char *, buf, NULL, ast_free);
RAII_VAR(char *, type, get_content_type(headers), ast_free);
/* Use errno to distinguish errors from no params */
errno = 0;
if (ast_strlen_zero(type) ||
strcasecmp(type, "application/x-www-form-urlencoded")) {
if (ast_strlen_zero(content_type) ||
strcasecmp(content_type, "application/x-www-form-urlencoded") != 0) {
/* Content type is not form data. Don't read the body. */
return NULL;
}
buf = ast_http_get_contents(&content_length, ser, headers);
if (!buf || !content_length) {
/*
* errno already set
* or it is not an error to have zero content
*/
return NULL;
}
while ((val = strsep(&buf, "&"))) {
var = strsep(&val, "=");
if (val) {
@ -1401,6 +1402,34 @@ struct ast_variable *ast_http_get_post_vars(
return post_vars;
}
struct ast_variable *ast_http_get_post_vars(
struct ast_tcptls_session_instance *ser, struct ast_variable *headers)
{
int content_length = 0;
RAII_VAR(char *, buf, NULL, ast_free);
RAII_VAR(char *, type, get_content_type(headers), ast_free);
/* Use errno to distinguish errors from no params */
errno = 0;
if (ast_strlen_zero(type) ||
strcasecmp(type, "application/x-www-form-urlencoded")) {
/* Content type is not form data. Don't read the body. */
return NULL;
}
buf = ast_http_get_contents(&content_length, ser, headers);
if (!buf || !content_length) {
/*
* errno already set
* or it is not an error to have zero content
*/
return NULL;
}
return ast_http_parse_post_form(buf, content_length, type);
}
static int handle_uri(struct ast_tcptls_session_instance *ser, char *uri,
enum ast_http_method method, struct ast_variable *headers)
{

@ -861,6 +861,83 @@ enum ast_json_to_ast_vars_code ast_json_to_ast_variables(struct ast_json *json_v
return AST_JSON_TO_AST_VARS_CODE_SUCCESS;
}
enum ast_json_nvp_ast_vars_code ast_json_nvp_array_to_ast_variables(
struct ast_json *json_variables, struct ast_variable **variables)
{
struct ast_variable *tail = NULL;
int i = 0;
size_t len = json_variables ? ast_json_array_size(json_variables) : 0;
if (len == 0) {
return AST_JSON_NVP_AST_VARS_CODE_NO_INPUT;
}
for (i = 0; i < len; i++) {
struct ast_variable *new_var;
struct ast_json *json_value;
struct ast_json *json_key;
const char *key;
const char *value;
json_value = ast_json_array_get(json_variables, i);
if (!json_value || ast_json_is_null(json_value) || ast_json_typeof(json_value) != AST_JSON_OBJECT) {
/* Error: Only objects allowed */
return AST_JSON_NVP_AST_VARS_CODE_INVALID_TYPE;
}
json_key = ast_json_object_get(json_value, "name");
if (!json_key || ast_json_is_null(json_key) || ast_json_typeof(json_key) != AST_JSON_STRING) {
/* Error: Only strings allowed */
return AST_JSON_NVP_AST_VARS_CODE_INVALID_TYPE;
}
key = ast_json_string_get(json_key);
json_key = ast_json_object_get(json_value, "value");
if (!json_key || ast_json_is_null(json_key) || ast_json_typeof(json_key) != AST_JSON_STRING) {
/* Error: Only strings allowed */
return AST_JSON_NVP_AST_VARS_CODE_INVALID_TYPE;
}
value = ast_json_string_get(json_key);
new_var = ast_variable_new(key, value, "");
if (!new_var) {
/* Error: OOM */
return AST_JSON_NVP_AST_VARS_CODE_OOM;
}
tail = ast_variable_list_append_hint(variables, tail, new_var);
}
return AST_JSON_NVP_AST_VARS_CODE_SUCCESS;
}
struct ast_json *ast_variables_to_json_nvp_array(struct ast_variable *variables)
{
struct ast_variable *v = NULL;
struct ast_json *json_variables = ast_json_array_create();
if (!variables || !json_variables) {
return NULL;
}
for (v = variables; v; v = v->next) {
struct ast_json *obj = ast_json_pack("{s: s, s: s}",
"name", v->name,
"value", v->value);
if (!obj) {
ast_json_unref(json_variables);
return NULL;
}
if (ast_json_array_append(json_variables, obj)) {
ast_json_unref(json_variables);
ast_json_unref(obj);
return NULL;
}
}
return json_variables;
}
struct ast_json *ast_json_channel_vars(struct varshead *channelvars)
{
struct ast_json *ret;

@ -62,7 +62,7 @@ $(call MOD_ADD_C,res_parking,$(wildcard parking/*.c))
$(call MOD_ADD_C,res_pjsip,$(wildcard res_pjsip/*.c))
$(call MOD_ADD_C,res_pjsip_session,$(wildcard res_pjsip_session/*.c))
$(call MOD_ADD_C,res_prometheus,$(wildcard prometheus/*.c))
$(call MOD_ADD_C,res_ari,ari/cli.c ari/config.c ari/ari_websockets.c)
$(call MOD_ADD_C,res_ari,ari/cli.c ari/config.c ari/ari_websockets.c ari/ari_websocket_requests.c)
$(call MOD_ADD_C,res_ari_model,ari/ari_model_validators.c)
$(call MOD_ADD_C,res_stasis_recording,stasis_recording/stored.c)
$(call MOD_ADD_C,res_stir_shaken,$(wildcard res_stir_shaken/*.c))

@ -6184,6 +6184,9 @@ int ast_ari_validate_event(struct ast_json *json)
if (strcmp("PlaybackStarted", discriminator) == 0) {
return ast_ari_validate_playback_started(json);
} else
if (strcmp("RESTResponse", discriminator) == 0) {
return ast_ari_validate_restresponse(json);
} else
if (strcmp("RecordingFailed", discriminator) == 0) {
return ast_ari_validate_recording_failed(json);
} else
@ -6403,6 +6406,9 @@ int ast_ari_validate_message(struct ast_json *json)
if (strcmp("PlaybackStarted", discriminator) == 0) {
return ast_ari_validate_playback_started(json);
} else
if (strcmp("RESTResponse", discriminator) == 0) {
return ast_ari_validate_restresponse(json);
} else
if (strcmp("RecordingFailed", discriminator) == 0) {
return ast_ari_validate_recording_failed(json);
} else
@ -7002,6 +7008,421 @@ ari_validator ast_ari_validate_playback_started_fn(void)
return ast_ari_validate_playback_started;
}
int ast_ari_validate_restheader(struct ast_json *json)
{
int res = 1;
struct ast_json_iter *iter;
int has_name = 0;
int has_value = 0;
for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
if (strcmp("name", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_name = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTHeader field name failed validation\n");
res = 0;
}
} else
if (strcmp("value", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_value = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTHeader field value failed validation\n");
res = 0;
}
} else
{
ast_log(LOG_ERROR,
"ARI RESTHeader has undocumented field %s\n",
ast_json_object_iter_key(iter));
res = 0;
}
}
if (!has_name) {
ast_log(LOG_ERROR, "ARI RESTHeader missing required field name\n");
res = 0;
}
if (!has_value) {
ast_log(LOG_ERROR, "ARI RESTHeader missing required field value\n");
res = 0;
}
return res;
}
ari_validator ast_ari_validate_restheader_fn(void)
{
return ast_ari_validate_restheader;
}
int ast_ari_validate_restquery_string_parameter(struct ast_json *json)
{
int res = 1;
struct ast_json_iter *iter;
int has_name = 0;
int has_value = 0;
for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
if (strcmp("name", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_name = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTQueryStringParameter field name failed validation\n");
res = 0;
}
} else
if (strcmp("value", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_value = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTQueryStringParameter field value failed validation\n");
res = 0;
}
} else
{
ast_log(LOG_ERROR,
"ARI RESTQueryStringParameter has undocumented field %s\n",
ast_json_object_iter_key(iter));
res = 0;
}
}
if (!has_name) {
ast_log(LOG_ERROR, "ARI RESTQueryStringParameter missing required field name\n");
res = 0;
}
if (!has_value) {
ast_log(LOG_ERROR, "ARI RESTQueryStringParameter missing required field value\n");
res = 0;
}
return res;
}
ari_validator ast_ari_validate_restquery_string_parameter_fn(void)
{
return ast_ari_validate_restquery_string_parameter;
}
int ast_ari_validate_restrequest(struct ast_json *json)
{
int res = 1;
struct ast_json_iter *iter;
int has_method = 0;
int has_request_id = 0;
int has_transaction_id = 0;
int has_type = 0;
int has_uri = 0;
for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
if (strcmp("content_type", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTRequest field content_type failed validation\n");
res = 0;
}
} else
if (strcmp("message_body", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTRequest field message_body failed validation\n");
res = 0;
}
} else
if (strcmp("method", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_method = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTRequest field method failed validation\n");
res = 0;
}
} else
if (strcmp("query_strings", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_list(
ast_json_object_iter_value(iter),
ast_ari_validate_restquery_string_parameter);
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTRequest field query_strings failed validation\n");
res = 0;
}
} else
if (strcmp("request_id", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_request_id = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTRequest field request_id failed validation\n");
res = 0;
}
} else
if (strcmp("transaction_id", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_transaction_id = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTRequest field transaction_id failed validation\n");
res = 0;
}
} else
if (strcmp("type", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_type = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTRequest field type failed validation\n");
res = 0;
}
} else
if (strcmp("uri", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_uri = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTRequest field uri failed validation\n");
res = 0;
}
} else
{
ast_log(LOG_ERROR,
"ARI RESTRequest has undocumented field %s\n",
ast_json_object_iter_key(iter));
res = 0;
}
}
if (!has_method) {
ast_log(LOG_ERROR, "ARI RESTRequest missing required field method\n");
res = 0;
}
if (!has_request_id) {
ast_log(LOG_ERROR, "ARI RESTRequest missing required field request_id\n");
res = 0;
}
if (!has_transaction_id) {
ast_log(LOG_ERROR, "ARI RESTRequest missing required field transaction_id\n");
res = 0;
}
if (!has_type) {
ast_log(LOG_ERROR, "ARI RESTRequest missing required field type\n");
res = 0;
}
if (!has_uri) {
ast_log(LOG_ERROR, "ARI RESTRequest missing required field uri\n");
res = 0;
}
return res;
}
ari_validator ast_ari_validate_restrequest_fn(void)
{
return ast_ari_validate_restrequest;
}
int ast_ari_validate_restresponse(struct ast_json *json)
{
int res = 1;
struct ast_json_iter *iter;
int has_type = 0;
int has_application = 0;
int has_timestamp = 0;
int has_reason_phrase = 0;
int has_request_id = 0;
int has_status_code = 0;
int has_transaction_id = 0;
int has_uri = 0;
for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
if (strcmp("asterisk_id", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field asterisk_id failed validation\n");
res = 0;
}
} else
if (strcmp("type", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_type = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field type failed validation\n");
res = 0;
}
} else
if (strcmp("application", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_application = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field application failed validation\n");
res = 0;
}
} else
if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_timestamp = 1;
prop_is_valid = ast_ari_validate_date(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field timestamp failed validation\n");
res = 0;
}
} else
if (strcmp("content_type", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field content_type failed validation\n");
res = 0;
}
} else
if (strcmp("message_body", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field message_body failed validation\n");
res = 0;
}
} else
if (strcmp("reason_phrase", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_reason_phrase = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field reason_phrase failed validation\n");
res = 0;
}
} else
if (strcmp("request_id", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_request_id = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field request_id failed validation\n");
res = 0;
}
} else
if (strcmp("status_code", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_status_code = 1;
prop_is_valid = ast_ari_validate_int(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field status_code failed validation\n");
res = 0;
}
} else
if (strcmp("transaction_id", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_transaction_id = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field transaction_id failed validation\n");
res = 0;
}
} else
if (strcmp("uri", ast_json_object_iter_key(iter)) == 0) {
int prop_is_valid;
has_uri = 1;
prop_is_valid = ast_ari_validate_string(
ast_json_object_iter_value(iter));
if (!prop_is_valid) {
ast_log(LOG_ERROR, "ARI RESTResponse field uri failed validation\n");
res = 0;
}
} else
{
ast_log(LOG_ERROR,
"ARI RESTResponse has undocumented field %s\n",
ast_json_object_iter_key(iter));
res = 0;
}
}
if (!has_type) {
ast_log(LOG_ERROR, "ARI RESTResponse missing required field type\n");
res = 0;
}
if (!has_application) {
ast_log(LOG_ERROR, "ARI RESTResponse missing required field application\n");
res = 0;
}
if (!has_timestamp) {
ast_log(LOG_ERROR, "ARI RESTResponse missing required field timestamp\n");
res = 0;
}
if (!has_reason_phrase) {
ast_log(LOG_ERROR, "ARI RESTResponse missing required field reason_phrase\n");
res = 0;
}
if (!has_request_id) {
ast_log(LOG_ERROR, "ARI RESTResponse missing required field request_id\n");
res = 0;
}
if (!has_status_code) {
ast_log(LOG_ERROR, "ARI RESTResponse missing required field status_code\n");
res = 0;
}
if (!has_transaction_id) {
ast_log(LOG_ERROR, "ARI RESTResponse missing required field transaction_id\n");
res = 0;
}
if (!has_uri) {
ast_log(LOG_ERROR, "ARI RESTResponse missing required field uri\n");
res = 0;
}
return res;
}
ari_validator ast_ari_validate_restresponse_fn(void)
{
return ast_ari_validate_restresponse;
}
int ast_ari_validate_recording_failed(struct ast_json *json)
{
int res = 1;

@ -1215,6 +1215,70 @@ int ast_ari_validate_playback_started(struct ast_json *json);
*/
ari_validator ast_ari_validate_playback_started_fn(void);
/*!
* \brief Validator for RESTHeader.
*
* REST over Websocket header
*
* \param json JSON object to validate.
* \retval True (non-zero) if valid.
* \retval False (zero) if invalid.
*/
int ast_ari_validate_restheader(struct ast_json *json);
/*!
* \brief Function pointer to ast_ari_validate_restheader().
*/
ari_validator ast_ari_validate_restheader_fn(void);
/*!
* \brief Validator for RESTQueryStringParameter.
*
* REST over Websocket Query String Parameter
*
* \param json JSON object to validate.
* \retval True (non-zero) if valid.
* \retval False (zero) if invalid.
*/
int ast_ari_validate_restquery_string_parameter(struct ast_json *json);
/*!
* \brief Function pointer to ast_ari_validate_restquery_string_parameter().
*/
ari_validator ast_ari_validate_restquery_string_parameter_fn(void);
/*!
* \brief Validator for RESTRequest.
*
* REST over Websocket Request.
*
* \param json JSON object to validate.
* \retval True (non-zero) if valid.
* \retval False (zero) if invalid.
*/
int ast_ari_validate_restrequest(struct ast_json *json);
/*!
* \brief Function pointer to ast_ari_validate_restrequest().
*/
ari_validator ast_ari_validate_restrequest_fn(void);
/*!
* \brief Validator for RESTResponse.
*
* REST over Websocket Response.
*
* \param json JSON object to validate.
* \retval True (non-zero) if valid.
* \retval False (zero) if invalid.
*/
int ast_ari_validate_restresponse(struct ast_json *json);
/*!
* \brief Function pointer to ast_ari_validate_restresponse().
*/
ari_validator ast_ari_validate_restresponse_fn(void);
/*!
* \brief Validator for RecordingFailed.
*
@ -1821,6 +1885,33 @@ ari_validator ast_ari_validate_application_fn(void);
* - application: string (required)
* - timestamp: Date (required)
* - playback: Playback (required)
* RESTHeader
* - name: string (required)
* - value: string (required)
* RESTQueryStringParameter
* - name: string (required)
* - value: string (required)
* RESTRequest
* - content_type: string
* - message_body: string
* - method: string (required)
* - query_strings: List[RESTQueryStringParameter]
* - request_id: string (required)
* - transaction_id: string (required)
* - type: string (required)
* - uri: string (required)
* RESTResponse
* - asterisk_id: string
* - type: string (required)
* - application: string (required)
* - timestamp: Date (required)
* - content_type: string
* - message_body: string
* - reason_phrase: string (required)
* - request_id: string (required)
* - status_code: int (required)
* - transaction_id: string (required)
* - uri: string (required)
* RecordingFailed
* - asterisk_id: string
* - type: string (required)

@ -0,0 +1,319 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2025, Sangoma Technologies Corporation
*
* George Joseph <gjoseph@sangoma.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#include "asterisk.h"
#include "ari_websockets.h"
#include "asterisk/ari.h"
#include "asterisk/json.h"
#include "asterisk/stasis_app.h"
struct rest_request_msg {
char *request_type;
char *transaction_id;
char *request_id;
enum ast_http_method method;
char *uri;
char *content_type;
struct ast_variable *query_strings;
struct ast_json *body;
};
static void request_destroy(struct rest_request_msg *request)
{
if (!request) {
return;
}
ast_free(request->request_type);
ast_free(request->transaction_id);
ast_free(request->request_id);
ast_free(request->uri);
ast_free(request->content_type);
ast_variables_destroy(request->query_strings);
ast_json_unref(request->body);
ast_free(request);
}
#define SET_RESPONSE_AND_EXIT(_reponse_code, _reponse_text, \
_reponse_msg, _remote_addr, _request, _request_msg) \
({ \
RAII_VAR(char *, _msg_str, NULL, ast_json_free); \
if (_request_msg) { \
_msg_str = ast_json_dump_string_format(_request_msg, AST_JSON_COMPACT); \
if (!_msg_str) { \
response->response_code = 500; \
response->response_text = "Server error. Out of memory"; \
} \
} \
response->message = ast_json_pack("{ s:s }", \
"message", _reponse_msg); \
response->response_code = _reponse_code; \
response->response_text = _reponse_text; \
SCOPE_EXIT_LOG_RTN_VALUE(_request, LOG_WARNING, \
"%s: %s Request: %s\n", _remote_addr, _reponse_text, S_OR(_msg_str, "<none>")); \
})
static struct rest_request_msg *parse_rest_request_msg(
const char *remote_addr, struct ast_json *request_msg,
struct ast_ari_response *response, int debug_app)
{
struct rest_request_msg *request = NULL;
RAII_VAR(char *, body, NULL, ast_free);
enum ast_json_nvp_ast_vars_code nvp_code;
char *query_string_start = NULL;
SCOPE_ENTER(4, "%s: Parsing RESTRequest message\n", remote_addr);
response->response_code = 200;
response->response_text = "OK";
if (!request_msg) {
SET_RESPONSE_AND_EXIT(500,
"Server error","No message to parse.",
remote_addr, request, NULL);
}
request = ast_calloc(1, sizeof(*request));
if (!request) {
SET_RESPONSE_AND_EXIT(500,
"Server error","Out of memory",
remote_addr, request, NULL);
}
/* transaction_id is optional */
request->transaction_id = ast_strdup(
ast_json_string_get(ast_json_object_get(
request_msg, "transaction_id")));
/* request_id is optional */
request->request_id = ast_strdup(
ast_json_string_get(ast_json_object_get(
request_msg, "request_id")));
request->request_type = ast_strdup(
ast_json_string_get(ast_json_object_get(request_msg, "type")));
if (ast_strlen_zero(request->request_type)) {
SET_RESPONSE_AND_EXIT(400,
"Bad request","No 'type' property.",
remote_addr, request, request_msg);
}
if (!ast_strings_equal(request->request_type, "RESTRequest")) {
SET_RESPONSE_AND_EXIT(400,
"Bad request","Unknown request type.",
remote_addr, request, request_msg);
}
request->uri = ast_strdup(
ast_json_string_get(ast_json_object_get(request_msg, "uri")));
if (ast_strlen_zero(request->uri)) {
SET_RESPONSE_AND_EXIT(400,
"Bad request","Empty or missing 'uri' property.",
remote_addr, request, request_msg);
}
if ((query_string_start = strchr(request->uri, '?')))
{
*query_string_start = '\0';
query_string_start++;
request->query_strings = ast_http_parse_post_form(
query_string_start, strlen(query_string_start), "application/x-www-form-urlencoded");
}
request->method = ast_get_http_method_from_string(
ast_json_string_get(ast_json_object_get(request_msg, "method")));
if (request->method == AST_HTTP_UNKNOWN) {
SET_RESPONSE_AND_EXIT(400,
"Bad request","Unknown or missing 'method' property.",
remote_addr, request, request_msg);
}
/* query_strings is optional */
nvp_code = ast_json_nvp_array_to_ast_variables(
ast_json_object_get(request_msg, "query_strings"),
&request->query_strings);
if (nvp_code != AST_JSON_NVP_AST_VARS_CODE_SUCCESS &&
nvp_code != AST_JSON_NVP_AST_VARS_CODE_NO_INPUT) {
SET_RESPONSE_AND_EXIT(400,
"Bad request","Unable to parse 'query_strings' array.",
remote_addr, request, request_msg);
}
request->body = ast_json_null();
body = ast_strdup(ast_json_string_get(
ast_json_object_get(request_msg, "message_body")));
if (ast_strlen_zero(body)) {
SCOPE_EXIT_RTN_VALUE(request,
"%s: Done parsing RESTRequest message.\n", remote_addr);
}
/* content_type is optional */
request->content_type = ast_strdup(
ast_json_string_get(ast_json_object_get(request_msg, "content_type")));
if (ast_strlen_zero(request->content_type)) {
SET_RESPONSE_AND_EXIT(400,
"Bad request","No 'content_type' for 'message_body'.",
remote_addr, request, request_msg);
}
if (ast_strings_equal(request->content_type, "application/x-www-form-urlencoded")) {
struct ast_variable *vars = ast_http_parse_post_form(body, strlen(body),
request->content_type);
if (!vars) {
SET_RESPONSE_AND_EXIT(400,
"Bad request","Unable to parse 'message_body' as 'application/x-www-form-urlencoded'.",
remote_addr, request, request_msg);
}
ast_variable_list_append(&request->query_strings, vars);
} else if (ast_strings_equal(request->content_type, "application/json")) {
struct ast_json_error error;
request->body = ast_json_load_buf(body, strlen(body), &error);
if (!request->body) {
SET_RESPONSE_AND_EXIT(400,
"Bad request","Unable to parse 'message_body' as 'application/json'.",
remote_addr, request, request_msg);
}
} else {
SET_RESPONSE_AND_EXIT(400,
"Bad request","Unknown content type.",
remote_addr, request, request_msg);
}
if (TRACE_ATLEAST(3) || debug_app) {
struct ast_variable *v = request->query_strings;
for (; v; v = v->next) {
ast_trace(-1, "Query string: %s=%s\n", v->name, v->value);
}
}
SCOPE_EXIT_RTN_VALUE(request,
"%s: Done parsing RESTRequest message.\n", remote_addr);
}
static void send_rest_response(
struct ari_ws_session *ari_ws_session,
const char *remote_addr, const char *app_name,
struct rest_request_msg *request,
struct ast_ari_response *response, int debug_app)
{
struct ast_json *app_resp_json = NULL;
char *message = NULL;
SCOPE_ENTER(4, "%s: Sending REST response %d:%s for uri %s\n",
remote_addr, response->response_code, response->response_text,
request ? request->uri : "N/A");
if (response->fd >= 0) {
close(response->fd);
response->response_code = 406;
response->response_text = "Not Acceptable. Use HTTP GET";
} else if (response->message && !ast_json_is_null(response->message)) {
message = ast_json_dump_string_format(response->message, AST_JSON_COMPACT);
ast_json_unref(response->message);
}
app_resp_json = ast_json_pack(
"{s:s, s:s*, s:s*, s:i, s:s, s:s, s:s*, s:s* }",
"type", "RESTResponse",
"transaction_id", request ? S_OR(request->transaction_id, "") : "",
"request_id", request ? S_OR(request->request_id, "") : "",
"status_code", response->response_code,
"reason_phrase", response->response_text,
"uri", request ? S_OR(request->uri, "") : "",
"content_type", message ? "application/json" : NULL,
"message_body", message);
ast_json_free(message);
if (!app_resp_json || ast_json_is_null(app_resp_json)) {
SCOPE_EXIT_LOG_RTN(LOG_WARNING,
"%s: Failed to pack JSON response for request %s\n",
remote_addr, request ? request->uri : "N/A");
}
SCOPE_CALL(-1, ari_websocket_send_event, ari_ws_session,
app_name, app_resp_json, debug_app);
ast_json_unref(app_resp_json);
SCOPE_EXIT("%s: Done. response: %d : %s\n",
remote_addr,
response->response_code,
response->response_text);
}
int ari_websocket_process_request(struct ari_ws_session *ari_ws_session,
const char *remote_addr, struct ast_variable *upgrade_headers,
const char *app_name, struct ast_json *request_msg)
{
int debug_app = stasis_app_get_debug_by_name(app_name);
RAII_VAR(struct rest_request_msg *, request, NULL, request_destroy);
struct ast_ari_response response = { .fd = -1, 0 };
SCOPE_ENTER(3, "%s: New WebSocket Msg\n", remote_addr);
if (TRACE_ATLEAST(3) || debug_app) {
char *str = ast_json_dump_string_format(request_msg, AST_JSON_PRETTY);
/* If we can't allocate a string, we can't respond to the client either. */
if (!str) {
SCOPE_EXIT_LOG_RTN_VALUE(-1, LOG_ERROR, "%s: Failed to dump JSON request\n",
remote_addr);
}
ast_verbose("<--- Received ARI message from %s --->\n%s\n",
remote_addr, str);
ast_json_free(str);
}
request = SCOPE_CALL_WITH_RESULT(-1, struct rest_request_msg *,
parse_rest_request_msg, remote_addr, request_msg, &response, debug_app);
if (!request || response.response_code != 200) {
SCOPE_CALL(-1, send_rest_response, ari_ws_session,
remote_addr, app_name, request, &response, debug_app);
SCOPE_EXIT_RTN_VALUE(0, "%s: Done with message\n", remote_addr);
}
/*
* We don't actually use the headers in the response
* but we have to allocate it because ast_ari_invoke
* and the resource handlers expect it.
*/
response.headers = ast_str_create(80);
if (!response.headers) {
/* If we can't allocate a string, we can't respond to the client either. */
SCOPE_EXIT_LOG_RTN_VALUE(-1, LOG_ERROR, "%s: Failed allocate headers string\n",
remote_addr);
}
SCOPE_CALL(-1, ast_ari_invoke, NULL, ARI_INVOKE_SOURCE_WEBSOCKET,
NULL, request->uri, request->method, request->query_strings,
upgrade_headers, request->body, &response);
ast_free(response.headers);
if (response.no_response) {
SCOPE_EXIT_RTN_VALUE(0, "No response needed\n");
}
SCOPE_CALL(-1, send_rest_response, ari_ws_session,
remote_addr, app_name, request, &response, debug_app);
SCOPE_EXIT_RTN_VALUE(0, "%s: Done with message\n", remote_addr);
}

@ -18,11 +18,19 @@
#include "asterisk.h"
#include "resource_events.h"
#include "ari_websockets.h"
#include "internal.h"
#if defined(AST_DEVMODE)
#include "ari_model_validators.h"
#endif
#include "asterisk/app.h"
#include "asterisk/ari.h"
#include "asterisk/astobj2.h"
#include "asterisk/http_websocket.h"
#include "asterisk/module.h"
#include "asterisk/stasis_app.h"
#include "internal.h"
/*! \file
*
@ -30,18 +38,22 @@
* \author David M. Lee, II <dlee@digium.com>
*/
struct ast_ari_websocket_session {
struct ast_websocket *ws_session;
int (*validator)(struct ast_json *);
};
/*! Number of buckets for the event session registry. Remember to keep it a prime number! */
#define ARI_WS_SESSION_NUM_BUCKETS 23
static void websocket_session_dtor(void *obj)
{
struct ast_ari_websocket_session *session = obj;
/*! Number of buckets for a websocket apps container. Remember to keep it a prime number! */
#define APPS_NUM_BUCKETS 7
ast_websocket_unref(session->ws_session);
session->ws_session = NULL;
}
/*! Initial size of a message queue. */
#define MESSAGES_INIT_SIZE 23
/*! \brief Local registry for created \ref event_session objects. */
static struct ao2_container *ari_ws_session_registry;
struct ast_websocket_server *ast_ws_server;
#define MAX_VALS 128
/*!
* \brief Validator that always succeeds.
@ -51,55 +63,99 @@ static int null_validator(struct ast_json *json)
return 1;
}
struct ast_ari_websocket_session *ast_ari_websocket_session_create(
struct ast_websocket *ws_session, int (*validator)(struct ast_json *))
#define VALIDATION_FAILED \
"{" \
" \"error\": \"InvalidMessage\"," \
" \"message\": \"Message validation failed\"" \
"}"
static int ari_ws_session_write(
struct ari_ws_session *ari_ws_session,
struct ast_json *message)
{
RAII_VAR(struct ast_ari_websocket_session *, session, NULL, ao2_cleanup);
RAII_VAR(struct ast_ari_conf *, config, ast_ari_config_get(), ao2_cleanup);
RAII_VAR(char *, str, NULL, ast_json_free);
if (ws_session == NULL) {
return NULL;
#ifdef AST_DEVMODE
if (!ari_ws_session->validator(message)) {
ast_log(LOG_ERROR, "Outgoing message failed validation\n");
return ast_websocket_write_string(ari_ws_session->ast_ws_session, VALIDATION_FAILED);
}
#endif
if (config == NULL || config->general == NULL) {
return NULL;
str = ast_json_dump_string_format(message, ast_ari_json_format());
if (str == NULL) {
ast_log(LOG_ERROR, "Failed to encode JSON object\n");
return -1;
}
if (validator == NULL) {
validator = null_validator;
if (ast_websocket_write_string(ari_ws_session->ast_ws_session, str)) {
ast_log(LOG_NOTICE, "Problem occurred during websocket write to %s, websocket closed\n",
ast_sockaddr_stringify(ast_websocket_remote_address(ari_ws_session->ast_ws_session)));
return -1;
}
return 0;
}
if (ast_websocket_set_nonblock(ws_session) != 0) {
/*!
* \internal
* \brief Updates the websocket session.
*
* \details If the value of the \c ws_session is not \c NULL and there are messages in the
* event session's \c message_queue, the messages are dispatched and removed from
* the queue.
*
* \param ari_ws_session The ARI websocket session
* \param ast_ws_session The Asterisk websocket session
*/
static int ari_ws_session_update(
struct ari_ws_session *ari_ws_session,
struct ast_websocket *ast_ws_session)
{
RAII_VAR(struct ast_ari_conf *, config, ast_ari_config_get(), ao2_cleanup);
int i;
if (ast_ws_session == NULL) {
return -1;
}
if (config == NULL || config->general == NULL) {
return -1;
}
if (ast_websocket_set_nonblock(ast_ws_session) != 0) {
ast_log(LOG_ERROR,
"ARI web socket failed to set nonblock; closing: %s\n",
strerror(errno));
return NULL;
return -1;
}
if (ast_websocket_set_timeout(ws_session, config->general->write_timeout)) {
if (ast_websocket_set_timeout(ast_ws_session, config->general->write_timeout)) {
ast_log(LOG_WARNING, "Failed to set write timeout %d on ARI web socket\n",
config->general->write_timeout);
}
session = ao2_alloc(sizeof(*session), websocket_session_dtor);
if (!session) {
return NULL;
ao2_ref(ast_ws_session, +1);
ari_ws_session->ast_ws_session = ast_ws_session;
ao2_lock(ari_ws_session);
for (i = 0; i < AST_VECTOR_SIZE(&ari_ws_session->message_queue); i++) {
struct ast_json *msg = AST_VECTOR_GET(&ari_ws_session->message_queue, i);
ari_ws_session_write(ari_ws_session, msg);
ast_json_unref(msg);
}
ao2_ref(ws_session, +1);
session->ws_session = ws_session;
session->validator = validator;
AST_VECTOR_RESET(&ari_ws_session->message_queue, AST_VECTOR_ELEM_CLEANUP_NOOP);
ao2_unlock(ari_ws_session);
ao2_ref(session, +1);
return session;
return 0;
}
struct ast_json *ast_ari_websocket_session_read(
struct ast_ari_websocket_session *session)
static struct ast_json *ari_ws_session_read(
struct ari_ws_session *ari_ws_session)
{
RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
if (ast_websocket_fd(session->ws_session) < 0) {
if (ast_websocket_fd(ari_ws_session->ast_ws_session) < 0) {
return NULL;
}
@ -111,7 +167,7 @@ struct ast_json *ast_ari_websocket_session_read(
int fragmented;
res = ast_wait_for_input(
ast_websocket_fd(session->ws_session), -1);
ast_websocket_fd(ari_ws_session->ast_ws_session), -1);
if (res <= 0) {
ast_log(LOG_WARNING, "WebSocket poll error: %s\n",
@ -119,7 +175,7 @@ struct ast_json *ast_ari_websocket_session_read(
return NULL;
}
res = ast_websocket_read(session->ws_session, &payload,
res = ast_websocket_read(ari_ws_session->ast_ws_session, &payload,
&payload_len, &opcode, &fragmented);
if (res != 0) {
@ -135,8 +191,21 @@ struct ast_json *ast_ari_websocket_session_read(
case AST_WEBSOCKET_OPCODE_TEXT:
message = ast_json_load_buf(payload, payload_len, NULL);
if (message == NULL) {
struct ast_json *error = ast_json_pack(
"{s:s, s:s, s:s, s:i, s:s, s:s }",
"type", "RESTResponse",
"transaction_id", "",
"request_id", "",
"status_code", 400,
"reason_phrase", "Failed to parse request message JSON",
"uri", ""
);
ari_websocket_send_event(ari_ws_session, ari_ws_session->app_name,
error, 0);
ast_json_unref(error);
ast_log(LOG_WARNING,
"WebSocket input failed to parse\n");
}
break;
@ -149,59 +218,534 @@ struct ast_json *ast_ari_websocket_session_read(
return ast_json_ref(message);
}
#define VALIDATION_FAILED \
"{" \
" \"error\": \"InvalidMessage\"," \
" \"message\": \"Message validation failed\"" \
"}"
void ari_handle_websocket(
struct ast_tcptls_session_instance *ser, const char *uri,
enum ast_http_method method, struct ast_variable *get_params,
struct ast_variable *headers)
{
struct ast_http_uri fake_urih = {
.data = ast_ws_server,
};
ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params,
headers);
}
/*!
* \brief Callback handler for Stasis application messages.
*
* \internal
*
* \param data Void pointer to the event session (\ref event_session).
* \param app_name Name of the Stasis application that dispatched the message.
* \param message The dispatched message.
* \param debug_app Debug flag for the application.
*/
void ari_websocket_send_event(struct ari_ws_session *ari_ws_session,
const char *app_name, struct ast_json *message, int debug_app)
{
char *remote_addr = ast_sockaddr_stringify(
ast_websocket_remote_address(ari_ws_session->ast_ws_session));
const char *msg_type, *msg_application, *msg_timestamp, *msg_ast_id;
SCOPE_ENTER(4, "%s: Dispatching message from Stasis app '%s'\n", remote_addr, app_name);
ast_assert(ari_ws_session != NULL);
ao2_lock(ari_ws_session);
msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
msg_application = S_OR(
ast_json_string_get(ast_json_object_get(message, "application")), "");
/* If we've been replaced, remove the application from our local
websocket_apps container */
if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
strcmp(msg_application, app_name) == 0) {
ao2_find(ari_ws_session->websocket_apps, msg_application,
OBJ_UNLINK | OBJ_NODATA);
}
msg_timestamp = S_OR(
ast_json_string_get(ast_json_object_get(message, "timestamp")), "");
if (ast_strlen_zero(msg_timestamp)) {
if (ast_json_object_set(message, "timestamp", ast_json_timeval(ast_tvnow(), NULL))) {
ao2_unlock(ari_ws_session);
SCOPE_EXIT_LOG_RTN(LOG_WARNING,
"%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
remote_addr, msg_type, msg_application);
}
}
msg_ast_id = S_OR(
ast_json_string_get(ast_json_object_get(message, "asterisk_id")), "");
if (ast_strlen_zero(msg_ast_id)) {
char eid[20];
if (ast_json_object_set(message, "asterisk_id",
ast_json_string_create(ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
ao2_unlock(ari_ws_session);
SCOPE_EXIT_LOG_RTN(LOG_WARNING,
"%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
remote_addr, msg_type, msg_application);
}
}
/* Now, we need to determine our state to see how we will handle the message */
if (ast_json_object_set(message, "application", ast_json_string_create(app_name))) {
ao2_unlock(ari_ws_session);
SCOPE_EXIT_LOG_RTN(LOG_WARNING,
"%s: Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
remote_addr, msg_type, msg_application);
}
if (!ari_ws_session) {
/* If the websocket is NULL, the message goes to the queue */
if (!AST_VECTOR_APPEND(&ari_ws_session->message_queue, message)) {
ast_json_ref(message);
}
ast_log(LOG_WARNING,
"%s: Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
remote_addr,
msg_type,
msg_application);
} else if (stasis_app_event_allowed(app_name, message)) {
if (TRACE_ATLEAST(4) || debug_app) {
char *str = ast_json_dump_string_format(message, AST_JSON_PRETTY);
ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
remote_addr,
str);
ast_json_free(str);
}
ari_ws_session_write(ari_ws_session, message);
}
ao2_unlock(ari_ws_session);
SCOPE_EXIT("%s: Dispatched '%s' message from Stasis app '%s'\n",
remote_addr, msg_type, app_name);
}
int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session,
static void stasis_app_message_handler(void *data, const char *app_name,
struct ast_json *message)
{
RAII_VAR(char *, str, NULL, ast_json_free);
int debug_app = stasis_app_get_debug_by_name(app_name);
struct ari_ws_session *ari_ws_session = data;
ast_assert(ari_ws_session != NULL);
ari_websocket_send_event(ari_ws_session, app_name, message, debug_app);
}
#ifdef AST_DEVMODE
if (!session->validator(message)) {
ast_log(LOG_ERROR, "Outgoing message failed validation\n");
return ast_websocket_write_string(session->ws_session, VALIDATION_FAILED);
static int parse_app_args(struct ast_variable *get_params,
struct ast_ari_response * response,
struct ast_ari_events_event_websocket_args *args)
{
struct ast_variable *i;
RAII_VAR(char *, app_parse, NULL, ast_free);
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "app") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
app_parse = ast_strdup(i->value);
if (!app_parse) {
ast_ari_response_alloc_failed(response);
return -1;
}
if (strlen(app_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args->app_count = 1;
vals[0] = app_parse;
} else {
args->app_count = ast_app_separate_args(
app_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args->app_count == 0) {
ast_ari_response_alloc_failed(response);
return -1;
}
if (args->app_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for app");
return -1;
}
args->app = ast_malloc(sizeof(*args->app) * args->app_count);
if (!args->app) {
ast_ari_response_alloc_failed(response);
return -1;
}
for (j = 0; j < args->app_count; ++j) {
args->app[j] = (vals[j]);
}
} else if (strcmp(i->name, "subscribeAll") == 0) {
args->subscribe_all = ast_true(i->value);
}
}
#endif
str = ast_json_dump_string_format(message, ast_ari_json_format());
args->app_parse = app_parse;
app_parse = NULL;
if (str == NULL) {
ast_log(LOG_ERROR, "Failed to encode JSON object\n");
return 0;
}
/*
* Websocket session cleanup is a bit complicated because it can be
* in different states, it may or may not be in the registry container,
* and stasis may be sending asynchronous events to it and some
* stages of cleanup need to lock it.
*
* That's why there are 3 different cleanup functions.
*/
/*!
* \internal
* \brief Reset the ari_ws_session without destroying it.
* It can't be reused and will be cleaned up by the caller.
*/
static void ari_ws_session_reset(struct ari_ws_session *ari_ws_session)
{
struct ao2_iterator i;
char *app;
int j;
SCOPED_AO2LOCK(lock, ari_ws_session);
/* Clean up the websocket_apps container */
if (ari_ws_session->websocket_apps) {
i = ao2_iterator_init(ari_ws_session->websocket_apps, 0);
while ((app = ao2_iterator_next(&i))) {
stasis_app_unregister(app);
ao2_cleanup(app);
}
ao2_iterator_destroy(&i);
ao2_cleanup(ari_ws_session->websocket_apps);
ari_ws_session->websocket_apps = NULL;
}
/* Clean up the message_queue container */
for (j = 0; j < AST_VECTOR_SIZE(&ari_ws_session->message_queue); j++) {
struct ast_json *msg = AST_VECTOR_GET(&ari_ws_session->message_queue, j);
ast_json_unref(msg);
}
AST_VECTOR_FREE(&ari_ws_session->message_queue);
}
/*!
* \internal
* \brief RAII_VAR and container ari_ws_session cleanup function.
* This unlinks the ari_ws_session from the registry and cleans up the
* decrements the reference count.
*/
static void ari_ws_session_cleanup(struct ari_ws_session *ari_ws_session)
{
if (!ari_ws_session) {
return;
}
ari_ws_session_reset(ari_ws_session);
if (ari_ws_session_registry) {
ao2_unlink(ari_ws_session_registry, ari_ws_session);
}
ao2_ref(ari_ws_session, -1);
}
/*!
* \internal
* \brief The ao2 destructor.
* This cleans up the reference to the parent ast_websocket.
*/
static void ari_ws_session_dtor(void *obj)
{
struct ari_ws_session *ari_ws_session = obj;
ast_free(ari_ws_session->app_name);
if (!ari_ws_session->ast_ws_session) {
return;
}
ast_websocket_unref(ari_ws_session->ast_ws_session);
ari_ws_session->ast_ws_session = NULL;
}
static int ari_ws_session_create(
int (*validator)(struct ast_json *),
struct ast_tcptls_session_instance *ser,
struct ast_ari_events_event_websocket_args *args,
const char *session_id)
{
RAII_VAR(struct ari_ws_session *, ari_ws_session, NULL, ao2_cleanup);
int (* register_handler)(const char *, stasis_app_cb handler, void *data);
size_t size, i;
if (validator == NULL) {
validator = null_validator;
}
size = sizeof(*ari_ws_session) + strlen(session_id) + 1;
ari_ws_session = ao2_alloc(size, ari_ws_session_dtor);
if (!ari_ws_session) {
return -1;
}
if (ast_websocket_write_string(session->ws_session, str)) {
ast_log(LOG_NOTICE, "Problem occurred during websocket write to %s, websocket closed\n",
ast_sockaddr_stringify(ast_ari_websocket_session_get_remote_addr(session)));
ari_ws_session->app_name = ast_strdup(args->app_parse);
if (!ari_ws_session->app_name) {
ast_http_error(ser, 500, "Internal Server Error",
"Allocation failed");
return -1;
}
strcpy(ari_ws_session->session_id, session_id); /* Safe */
/* Instantiate the hash table for Stasis apps */
ari_ws_session->websocket_apps =
ast_str_container_alloc(APPS_NUM_BUCKETS);
if (!ari_ws_session->websocket_apps) {
ast_http_error(ser, 500, "Internal Server Error",
"Allocation failed");
return -1;
}
/* Instantiate the message queue */
if (AST_VECTOR_INIT(&ari_ws_session->message_queue, MESSAGES_INIT_SIZE)) {
ast_http_error(ser, 500, "Internal Server Error",
"Allocation failed");
ao2_cleanup(ari_ws_session->websocket_apps);
return -1;
}
/* Register the apps with Stasis */
if (args->subscribe_all) {
register_handler = &stasis_app_register_all;
} else {
register_handler = &stasis_app_register;
}
for (i = 0; i < args->app_count; ++i) {
const char *app = args->app[i];
if (ast_strlen_zero(app)) {
ast_http_error(ser, 400, "Bad Request",
"Invalid application provided in param [app].");
ari_ws_session_reset(ari_ws_session);
return -1;
}
if (ast_str_container_add(ari_ws_session->websocket_apps, app)) {
ast_http_error(ser, 500, "Internal Server Error",
"Allocation failed");
ari_ws_session_reset(ari_ws_session);
return -1;
}
if (register_handler(app, stasis_app_message_handler, ari_ws_session)) {
ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
ast_http_error(ser, 500, "Internal Server Error",
"Stasis registration failed");
ari_ws_session_reset(ari_ws_session);
return -1;
}
}
ari_ws_session->validator = validator;
/*
* Add the event session to the session registry.
* When this functions returns, the registry will have
* the only reference to the session.
*/
if (!ao2_link(ari_ws_session_registry, ari_ws_session)) {
ast_http_error(ser, 500, "Internal Server Error",
"Allocation failed");
ari_ws_session_reset(ari_ws_session);
return -1;
}
return 0;
}
struct ast_sockaddr *ast_ari_websocket_session_get_remote_addr(
struct ast_ari_websocket_session *session)
/*!
* \internal
* \brief This function gets called before the upgrade process is completed.
* HTTP is still in effect.
*/
static int websocket_attempted_cb(struct ast_tcptls_session_instance *ser,
struct ast_variable *get_params, struct ast_variable *headers,
const char *session_id)
{
return ast_websocket_remote_address(session->ws_session);
struct ast_ari_events_event_websocket_args args = {};
int res = 0;
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
char *remote_addr = ast_sockaddr_stringify(&ser->remote_address);
response = ast_calloc(1, sizeof(*response));
if (!response) {
ast_log(LOG_ERROR, "Failed to create response.\n");
ast_http_error(ser, 500, "Server Error", "Memory allocation error");
return -1;
}
res = parse_app_args(get_params, response, &args);
if (res != 0) {
/* Param parsing failure */
RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
ast_log(LOG_ERROR, "Missing response message\n");
}
if (msg) {
ast_http_error(ser, response->response_code, response->response_text, msg);
return -1;
}
}
if (args.app_count == 0) {
ast_http_error(ser, 400, "Bad Request",
"HTTP request is missing param: [app]");
return -1;
}
#if defined(AST_DEVMODE)
res = ari_ws_session_create(ast_ari_validate_message_fn(),
ser, &args, session_id);
#else
res = ari_ws_session_create(NULL, ser, &args, session_id);
#endif
if (res != 0) {
ast_log(LOG_ERROR,
"%s: Failed to create ARI ari_session\n", remote_addr);
}
ast_free(args.app_parse);
ast_free(args.app);
return res;
}
void ari_handle_websocket(struct ast_websocket_server *ws_server,
struct ast_tcptls_session_instance *ser, const char *uri,
enum ast_http_method method, struct ast_variable *get_params,
struct ast_variable *headers)
/*!
* \internal
* \brief This function gets called after the upgrade process is completed.
* The websocket is now in effect.
*/
static void websocket_established_cb(struct ast_websocket *ast_ws_session,
struct ast_variable *get_params, struct ast_variable *upgrade_headers)
{
struct ast_http_uri fake_urih = {
.data = ws_server,
};
ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params,
headers);
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
/*
* ast_ws_session is passed in with it's refcount bumped so
* we need to unref it when we're done. The refcount will
* be bumped again when we add it to the ari_ws_session.
*/
RAII_VAR(struct ast_websocket *, s, ast_ws_session, ast_websocket_unref);
RAII_VAR(struct ari_ws_session *, ari_ws_session, NULL, ari_ws_session_cleanup);
struct ast_json *msg;
struct ast_variable *v;
char *remote_addr = ast_sockaddr_stringify(
ast_websocket_remote_address(ast_ws_session));
const char *session_id = ast_websocket_session_id(ast_ws_session);
SCOPE_ENTER(2, "%s: WebSocket established\n", remote_addr);
if (TRACE_ATLEAST(2)) {
ast_trace(2, "%s: Websocket Upgrade Headers:\n", remote_addr);
for (v = upgrade_headers; v; v = v->next) {
ast_trace(3, "--> %s: %s\n", v->name, v->value);
}
}
response = ast_calloc(1, sizeof(*response));
if (!response) {
SCOPE_EXIT_LOG_RTN(LOG_ERROR,
"%s: Failed to create response\n", remote_addr);
}
/* Find the event_session and update its websocket */
ari_ws_session = ao2_find(ari_ws_session_registry, session_id, OBJ_SEARCH_KEY);
if (ari_ws_session) {
ao2_unlink(ari_ws_session_registry, ari_ws_session);
ari_ws_session_update(ari_ws_session, ast_ws_session);
} else {
SCOPE_EXIT_LOG_RTN(LOG_ERROR,
"%s: Failed to locate an event session for the websocket session\n",
remote_addr);
}
ast_trace(-1, "%s: Waiting for messages\n", remote_addr);
while ((msg = ari_ws_session_read(ari_ws_session))) {
ari_websocket_process_request(ari_ws_session, remote_addr,
upgrade_headers, ari_ws_session->app_name, msg);
ast_json_unref(msg);
}
SCOPE_EXIT("%s: Websocket closed\n", remote_addr);
}
const char *ast_ari_websocket_session_id(
const struct ast_ari_websocket_session *session)
static int ari_ws_session_shutdown_cb(void *ari_ws_session, void *arg, int flags)
{
return ast_websocket_session_id(session->ws_session);
ari_ws_session_cleanup(ari_ws_session);
return 0;
}
static void ari_ws_session_registry_dtor(void)
{
ao2_callback(ari_ws_session_registry, OBJ_MULTIPLE | OBJ_NODATA,
ari_ws_session_shutdown_cb, NULL);
ao2_cleanup(ari_ws_session_registry);
ari_ws_session_registry = NULL;
}
int ari_websocket_unload_module(void)
{
ari_ws_session_registry_dtor();
ao2_cleanup(ast_ws_server);
ast_ws_server = NULL;
return 0;
}
AO2_STRING_FIELD_CMP_FN(ari_ws_session, session_id);
AO2_STRING_FIELD_HASH_FN(ari_ws_session, session_id);
int ari_websocket_load_module(void)
{
int res = 0;
struct ast_websocket_protocol *protocol;
ari_ws_session_registry = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
ARI_WS_SESSION_NUM_BUCKETS, ari_ws_session_hash_fn,
NULL, ari_ws_session_cmp_fn);
if (!ari_ws_session_registry) {
ast_log(LOG_WARNING,
"Failed to allocate the local registry for websocket applications\n");
return AST_MODULE_LOAD_DECLINE;
}
ast_ws_server = ast_websocket_server_create();
if (!ast_ws_server) {
ari_ws_session_registry_dtor();
return AST_MODULE_LOAD_DECLINE;
}
protocol = ast_websocket_sub_protocol_alloc("ari");
if (!protocol) {
ao2_ref(ast_ws_server, -1);
ast_ws_server = NULL;
ari_ws_session_registry_dtor();
return AST_MODULE_LOAD_DECLINE;
}
protocol->session_attempted = websocket_attempted_cb;
protocol->session_established = websocket_established_cb;
res = ast_websocket_server_add_protocol2(ast_ws_server, protocol);
return res == 0 ? AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
}

@ -0,0 +1,96 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef ARI_WEBSOCKETS_H_
#define ARI_WEBSOCKETS_H_
/*! \file
*
* \brief Internal API's for websockets.
* \author David M. Lee, II <dlee@digium.com>
*/
#include "asterisk/http.h"
#include "asterisk/json.h"
#include "asterisk/vector.h"
struct ast_ari_events_event_websocket_args;
/* Forward-declare websocket structs. This avoids including http_websocket.h,
* which causes optional_api stuff to happen, which makes optional_api more
* difficult to debug. */
//struct ast_websocket_server;
struct ast_websocket;
struct ari_ws_session {
struct ast_websocket *ast_ws_session; /*!< The parent websocket session. */
int (*validator)(struct ast_json *); /*!< The message validator. */
struct ao2_container *websocket_apps; /*!< List of Stasis apps registered to
the websocket session. */
AST_VECTOR(, struct ast_json *) message_queue; /*!< Container for holding delayed messages. */
char *app_name; /*!< The name of the Stasis application. */
char session_id[]; /*!< The id for the websocket session. */
};
/*!
* \internal
* \brief Send a JSON event to a websocket.
*
* \param ari_ws_session ARI websocket session
* \param app_name Application name
* \param message JSON message
* \param debug_app Debug flag for application
*/
void ari_websocket_send_event(struct ari_ws_session *ari_ws_session,
const char *app_name, struct ast_json *message, int debug_app);
/*!
* \internal
* \brief Process an ARI REST over Websocket request
*
* \param ari_ws_session ARI websocket session
* \param remote_addr Remote address for log messages
* \param upgrade_headers HTTP headers from the upgrade request
* \param app_name Application name
* \param msg JSON Request message
* \retval 0 on success, -1 on failure
*/
int ari_websocket_process_request(struct ari_ws_session *ast_ws_session,
const char *remote_addr, struct ast_variable *upgrade_headers,
const char *app_name, struct ast_json *msg);
/*!
* \brief Wrapper for invoking the websocket code for an incoming connection.
*
* \param ws_server WebSocket server to invoke.
* \param ser HTTP session.
* \param uri Requested URI.
* \param method Requested HTTP method.
* \param get_params Parsed query parameters.
* \param headers Parsed HTTP headers.
*/
void ari_handle_websocket(struct ast_tcptls_session_instance *ser,
const char *uri, enum ast_http_method method,
struct ast_variable *get_params,
struct ast_variable *headers);
int ari_websocket_unload_module(void);
int ari_websocket_load_module(void);
#endif /* ARI_WEBSOCKETS_H_ */

@ -143,25 +143,4 @@ struct ast_ari_conf_user *ast_ari_config_validate_user(const char *username,
/*! @} */
/* Forward-declare websocket structs. This avoids including http_websocket.h,
* which causes optional_api stuff to happen, which makes optional_api more
* difficult to debug. */
struct ast_websocket_server;
/*!
* \brief Wrapper for invoking the websocket code for an incoming connection.
*
* \param ws_server WebSocket server to invoke.
* \param ser HTTP session.
* \param uri Requested URI.
* \param method Requested HTTP method.
* \param get_params Parsed query parameters.
* \param headers Parsed HTTP headers.
*/
void ari_handle_websocket(struct ast_websocket_server *ws_server,
struct ast_tcptls_session_instance *ser, const char *uri,
enum ast_http_method method, struct ast_variable *get_params,
struct ast_variable *headers);
#endif /* ARI_INTERNAL_H_ */

@ -30,504 +30,8 @@
#include "asterisk.h"
#include "resource_events.h"
#include "asterisk/astobj2.h"
#include "asterisk/http_websocket.h"
#include "internal.h"
#include "asterisk/stasis_app.h"
#include "asterisk/vector.h"
/*! Number of buckets for the event session registry. Remember to keep it a prime number! */
#define EVENT_SESSION_NUM_BUCKETS 23
/*! Number of buckets for a websocket apps container. Remember to keep it a prime number! */
#define APPS_NUM_BUCKETS 7
/*! Initial size of a message queue. */
#define MESSAGES_INIT_SIZE 23
/*! \brief A wrapper for the /ref ast_ari_websocket_session. */
struct event_session {
struct ast_ari_websocket_session *ws_session; /*!< Handle to the websocket session. */
struct ao2_container *websocket_apps; /*!< List of Stasis apps registered to
the websocket session. */
AST_VECTOR(, struct ast_json *) message_queue; /*!< Container for holding delayed messages. */
char session_id[]; /*!< The id for the websocket session. */
};
/*! \brief \ref event_session error types. */
enum event_session_error_type {
ERROR_TYPE_STASIS_REGISTRATION = 1, /*!< Stasis failed to register the application. */
ERROR_TYPE_OOM = 2, /*!< Insufficient memory to create the event
session. */
ERROR_TYPE_MISSING_APP_PARAM = 3, /*!< HTTP request was missing an [app] parameter. */
ERROR_TYPE_INVALID_APP_PARAM = 4, /*!< HTTP request contained an invalid [app]
parameter. */
};
/*! \brief Local registry for created \ref event_session objects. */
static struct ao2_container *event_session_registry;
/*!
* \brief Callback handler for Stasis application messages.
*
* \internal
*
* \param data Void pointer to the event session (\ref event_session).
* \param app_name Name of the Stasis application that dispatched the message.
* \param message The dispatched message.
*/
static void stasis_app_message_handler(
void *data, const char *app_name, struct ast_json *message)
{
struct event_session *session = data;
const char *msg_type, *msg_application;
int app_debug_enabled;
ast_assert(session != NULL);
/*
* We need to get the debug flag before locking session
* to help prevent a deadlock with the apps_registry container.
*/
app_debug_enabled = stasis_app_get_debug_by_name(app_name);
ao2_lock(session);
msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), "");
msg_application = S_OR(
ast_json_string_get(ast_json_object_get(message, "application")), "");
/* If we've been replaced, remove the application from our local
websocket_apps container */
if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
strcmp(msg_application, app_name) == 0) {
ao2_find(session->websocket_apps, msg_application,
OBJ_UNLINK | OBJ_NODATA);
}
/* Now, we need to determine our state to see how we will handle the message */
if (ast_json_object_set(message, "application", ast_json_string_create(app_name))) {
/* We failed to add an application element to our json message */
ast_log(LOG_WARNING,
"Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n",
msg_type,
msg_application);
} else if (!session->ws_session) {
/* If the websocket is NULL, the message goes to the queue */
if (!AST_VECTOR_APPEND(&session->message_queue, message)) {
ast_json_ref(message);
}
ast_log(LOG_WARNING,
"Queued '%s' message for Stasis app '%s'; websocket is not ready\n",
msg_type,
msg_application);
} else if (stasis_app_event_allowed(app_name, message)) {
if (app_debug_enabled) {
char *str = ast_json_dump_string_format(message, ast_ari_json_format());
ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
ast_sockaddr_stringify(ast_ari_websocket_session_get_remote_addr(session->ws_session)),
str);
ast_json_free(str);
}
/* We are ready to publish the message */
ast_ari_websocket_session_write(session->ws_session, message);
}
ao2_unlock(session);
}
/*!
* \brief AO2 comparison function for \ref event_session objects.
*
* \internal
*
* \param obj Void pointer to the \ref event_session container.
* \param arg Void pointer to the \ref event_session object.
* \param flags The \ref search_flags to use when creating the hash key.
*
* \retval 0 The objects are not equal.
* \retval CMP_MATCH The objects are equal.
*/
static int event_session_compare(void *obj, void *arg, int flags)
{
const struct event_session *object_left = obj;
const struct event_session *object_right = arg;
const char *right_key = arg;
int cmp = 0;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT:
right_key = object_right->session_id;
/* Fall through */
case OBJ_SEARCH_KEY:
cmp = strcmp(object_left->session_id, right_key);
break;
case OBJ_SEARCH_PARTIAL_KEY:
cmp = strncmp(object_left->session_id, right_key, strlen(right_key));
break;
default:
break;
}
return cmp ? 0 : CMP_MATCH;
}
/*!
* \brief AO2 hash function for \ref event_session objects.
*
* \details Computes hash value for the given \ref event_session, with respect to the
* provided search flags.
*
* \internal
*
* \param obj Void pointer to the \ref event_session object.
* \param flags The \ref search_flags to use when creating the hash key.
*
* \retval > 0 on success
* \retval 0 on failure
*/
static int event_session_hash(const void *obj, const int flags)
{
const struct event_session *session;
const char *key;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_KEY:
key = obj;
break;
case OBJ_SEARCH_OBJECT:
session = obj;
key = session->session_id;
break;
default:
/* Hash can only work on something with a full key. */
ast_assert(0);
return 0;
}
return ast_str_hash(key);
}
/*!
* \brief Explicitly shutdown a session.
*
* \details An explicit shutdown is necessary, since the \ref stasis_app has a reference
* to this session. We also need to be sure to null out the \c ws_session field,
* since the websocket is about to go away.
*
* \internal
*
* \param session Event session object (\ref event_session).
*/
static void event_session_shutdown(struct event_session *session)
{
struct ao2_iterator i;
char *app;
int j;
SCOPED_AO2LOCK(lock, session);
/* Clean up the websocket_apps container */
if (session->websocket_apps) {
i = ao2_iterator_init(session->websocket_apps, 0);
while ((app = ao2_iterator_next(&i))) {
stasis_app_unregister(app);
ao2_cleanup(app);
}
ao2_iterator_destroy(&i);
ao2_cleanup(session->websocket_apps);
session->websocket_apps = NULL;
}
/* Clean up the message_queue container */
for (j = 0; j < AST_VECTOR_SIZE(&session->message_queue); j++) {
struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, j);
ast_json_unref(msg);
}
AST_VECTOR_FREE(&session->message_queue);
/* Remove the handle to the underlying websocket session */
session->ws_session = NULL;
}
/*!
* \brief Updates the websocket session for an \ref event_session.
*
* \details The websocket for the given \ref event_session will be updated to the value
* of the \c ws_session argument.
*
* If the value of the \c ws_session is not \c NULL and there are messages in the
* event session's \c message_queue, the messages are dispatched and removed from
* the queue.
*
* \internal
*
* \param session The event session object to update (\ref event_session).
* \param ws_session Handle to the underlying websocket session
* (\ref ast_ari_websocket_session).
*/
static void event_session_update_websocket(
struct event_session *session, struct ast_ari_websocket_session *ws_session)
{
int i;
ast_assert(session != NULL);
ao2_lock(session);
session->ws_session = ws_session;
for (i = 0; i < AST_VECTOR_SIZE(&session->message_queue); i++) {
struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, i);
ast_ari_websocket_session_write(session->ws_session, msg);
ast_json_unref(msg);
}
AST_VECTOR_RESET(&session->message_queue, AST_VECTOR_ELEM_CLEANUP_NOOP);
ao2_unlock(session);
}
/*!
* \brief Processes cleanup actions for a \ref event_session object.
*
* \internal
*
* \param session The event session object to cleanup (\ref event_session).
*/
static void event_session_cleanup(struct event_session *session)
{
if (!session) {
return;
}
event_session_shutdown(session);
if (event_session_registry) {
ao2_unlink(event_session_registry, session);
}
}
/*!
* \brief Event session object destructor (\ref event_session).
*
* \internal
*
* \param obj Void pointer to the \ref event_session object.
*/
static void event_session_dtor(void *obj)
{
#ifdef AST_DEVMODE /* Avoid unused variable warning */
struct event_session *session = obj;
#endif
/* event_session_shutdown should have been called before now */
ast_assert(session->ws_session == NULL);
ast_assert(session->websocket_apps == NULL);
ast_assert(AST_VECTOR_SIZE(&session->message_queue) == 0);
}
/*!
* \brief Handles \ref event_session error processing.
*
* \internal
*
* \param session The \ref event_session object.
* \param error The \ref event_session_error_type to handle.
* \param ser HTTP TCP/TLS Server Session (\ref ast_tcptls_session_instance).
*
* \retval -1 Always returns -1.
*/
static int event_session_allocation_error_handler(
struct event_session *session, enum event_session_error_type error,
struct ast_tcptls_session_instance *ser)
{
/* Notify the client */
switch (error) {
case ERROR_TYPE_STASIS_REGISTRATION:
ast_http_error(ser, 500, "Internal Server Error",
"Stasis registration failed");
break;
case ERROR_TYPE_OOM:
ast_http_error(ser, 500, "Internal Server Error",
"Allocation failed");
break;
case ERROR_TYPE_MISSING_APP_PARAM:
ast_http_error(ser, 400, "Bad Request",
"HTTP request is missing param: [app]");
break;
case ERROR_TYPE_INVALID_APP_PARAM:
ast_http_error(ser, 400, "Bad Request",
"Invalid application provided in param [app].");
break;
default:
break;
}
/* Cleanup the session */
event_session_cleanup(session);
return -1;
}
/*!
* \brief Creates an \ref event_session object and registers its apps with Stasis.
*
* \internal
*
* \param ser HTTP TCP/TLS Server Session (\ref ast_tcptls_session_instance).
* \param args The Stasis [app] parameters as parsed from the HTTP request
* (\ref ast_ari_events_event_websocket_args).
* \param session_id The id for the websocket session that will be created for this
* event session.
*
* \retval 0 on success
* \retval -1 on failure
*/
static int event_session_alloc(struct ast_tcptls_session_instance *ser,
struct ast_ari_events_event_websocket_args *args, const char *session_id)
{
RAII_VAR(struct event_session *, session, NULL, ao2_cleanup);
int (* register_handler)(const char *, stasis_app_cb handler, void *data);
size_t size, i;
/* The request must have at least one [app] parameter */
if (args->app_count == 0) {
return event_session_allocation_error_handler(
session, ERROR_TYPE_MISSING_APP_PARAM, ser);
}
size = sizeof(*session) + strlen(session_id) + 1;
/* Instantiate the event session */
session = ao2_alloc(size, event_session_dtor);
if (!session) {
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
strncpy(session->session_id, session_id, size - sizeof(*session));
/* Instantiate the hash table for Stasis apps */
session->websocket_apps =
ast_str_container_alloc(APPS_NUM_BUCKETS);
if (!session->websocket_apps) {
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
/* Instantiate the message queue */
if (AST_VECTOR_INIT(&session->message_queue, MESSAGES_INIT_SIZE)) {
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
/* Register the apps with Stasis */
if (args->subscribe_all) {
register_handler = &stasis_app_register_all;
} else {
register_handler = &stasis_app_register;
}
for (i = 0; i < args->app_count; ++i) {
const char *app = args->app[i];
if (ast_strlen_zero(app)) {
return event_session_allocation_error_handler(
session, ERROR_TYPE_INVALID_APP_PARAM, ser);
}
if (ast_str_container_add(session->websocket_apps, app)) {
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
if (register_handler(app, stasis_app_message_handler, session)) {
ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app);
return event_session_allocation_error_handler(
session, ERROR_TYPE_STASIS_REGISTRATION, ser);
}
}
/* Add the event session to the local registry */
if (!ao2_link(event_session_registry, session)) {
return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser);
}
return 0;
}
static int event_session_shutdown_cb(void *session, void *arg, int flags)
{
event_session_cleanup(session);
return 0;
}
void ast_ari_websocket_events_event_websocket_dtor(void)
{
ao2_callback(event_session_registry, OBJ_MULTIPLE | OBJ_NODATA, event_session_shutdown_cb, NULL);
ao2_cleanup(event_session_registry);
event_session_registry = NULL;
}
int ast_ari_websocket_events_event_websocket_init(void)
{
/* Try to instantiate the registry */
event_session_registry = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
EVENT_SESSION_NUM_BUCKETS, event_session_hash, NULL, event_session_compare);
if (!event_session_registry) {
/* This is bad, bad. */
ast_log(LOG_WARNING,
"Failed to allocate the local registry for websocket applications\n");
return -1;
}
return 0;
}
int ast_ari_websocket_events_event_websocket_attempted(
struct ast_tcptls_session_instance *ser, struct ast_variable *headers,
struct ast_ari_events_event_websocket_args *args, const char *session_id)
{
ast_debug(3, "/events WebSocket attempted\n");
/* Create the event session */
return event_session_alloc(ser, args, session_id);
}
void ast_ari_websocket_events_event_websocket_established(
struct ast_ari_websocket_session *ws_session, struct ast_variable *headers,
struct ast_ari_events_event_websocket_args *args)
{
struct event_session *session;
struct ast_json *msg;
const char *session_id;
ast_debug(3, "/events WebSocket established\n");
ast_assert(ws_session != NULL);
session_id = ast_ari_websocket_session_id(ws_session);
/* Find the event_session and update its websocket */
session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY);
if (session) {
ao2_unlink(event_session_registry, session);
event_session_update_websocket(session, ws_session);
} else {
ast_log(LOG_WARNING,
"Failed to locate an event session for the provided websocket session\n");
}
/* We don't process any input, but we'll consume it waiting for EOF */
while ((msg = ast_ari_websocket_session_read(ws_session))) {
ast_json_unref(msg);
}
event_session_cleanup(session);
ao2_ref(session, -1);
}
void ast_ari_events_user_event(struct ast_variable *headers,
struct ast_ari_events_user_event_args *args,

@ -50,43 +50,6 @@ struct ast_ari_events_event_websocket_args {
/*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */
int subscribe_all;
};
/*!
* \brief WebSocket connection for events.
*
* \retval 0 success
* \retval -1 error
*/
int ast_ari_websocket_events_event_websocket_init(void);
/*!
* \brief WebSocket connection for events.
*/
void ast_ari_websocket_events_event_websocket_dtor(void);
/*!
* \brief WebSocket connection for events.
*
* \param ser HTTP TCP/TLS Server Session
* \param headers HTTP headers
* \param args Swagger parameters
* \param session_id The id of the current session.
*
* \retval 0 success
* \retval non-zero error
*/
int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser,
struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args, const char *session_id);
/*!
* \brief WebSocket connection for events.
*
* \param session ARI WebSocket.
* \param headers HTTP headers.
* \param args Swagger parameters.
*/
void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *session,
struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args);
/*! Argument struct for ast_ari_events_user_event() */
struct ast_ari_events_user_event_args {
/*! Event name */

@ -188,6 +188,7 @@
#include "asterisk.h"
#include "ari/internal.h"
#include "ari/ari_websockets.h"
#include "asterisk/ari.h"
#include "asterisk/astobj2.h"
#include "asterisk/module.h"
@ -214,6 +215,22 @@ static struct stasis_rest_handlers *root_handler;
/*! Pre-defined message for allocation failures. */
static struct ast_json *oom_json;
/*! \brief Callback for the root URI. */
static int ast_ari_callback(struct ast_tcptls_session_instance *ser,
const struct ast_http_uri *urih, const char *uri,
enum ast_http_method method, struct ast_variable *get_params,
struct ast_variable *headers);
static struct ast_http_uri http_uri = {
.callback = ast_ari_callback,
.description = "Asterisk RESTful API",
.uri = "ari",
.has_subtree = 1,
.data = NULL,
.key = __FILE__,
.no_decode_uri = 1,
};
struct ast_json *ast_ari_oom_json(void)
{
return oom_json;
@ -531,33 +548,186 @@ static void handle_options(struct stasis_rest_handlers *handler,
}
}
void ast_ari_invoke(struct ast_tcptls_session_instance *ser,
/*!
* \brief Authenticate a <code>?api_key=userid:password</code>
*
* \param api_key API key query parameter
* \return User object for the authenticated user.
* \retval NULL if authentication failed.
*/
static struct ast_ari_conf_user *authenticate_api_key(const char *api_key)
{
RAII_VAR(char *, copy, NULL, ast_free);
char *username;
char *password;
password = copy = ast_strdup(api_key);
if (!copy) {
return NULL;
}
username = strsep(&password, ":");
if (!password) {
ast_log(LOG_WARNING, "Invalid api_key\n");
return NULL;
}
return ast_ari_config_validate_user(username, password);
}
/*!
* \brief Authenticate an HTTP request.
*
* \param get_params GET parameters of the request.
* \param headers HTTP headers.
* \return User object for the authenticated user.
* \retval NULL if authentication failed.
*/
static struct ast_ari_conf_user *authenticate_user(struct ast_variable *get_params,
struct ast_variable *headers)
{
RAII_VAR(struct ast_http_auth *, http_auth, NULL, ao2_cleanup);
struct ast_variable *v;
/* HTTP Basic authentication */
http_auth = ast_http_get_auth(headers);
if (http_auth) {
return ast_ari_config_validate_user(http_auth->userid,
http_auth->password);
}
/* ?api_key authentication */
for (v = get_params; v; v = v->next) {
if (strcasecmp("api_key", v->name) == 0) {
return authenticate_api_key(v->value);
}
}
return NULL;
}
static void remove_trailing_slash(const char *uri,
struct ast_ari_response *response)
{
char *slashless = ast_strdupa(uri);
slashless[strlen(slashless) - 1] = '\0';
/* While it's tempting to redirect the client to the slashless URL,
* that is problematic. A 302 Found is the most appropriate response,
* but most clients issue a GET on the location you give them,
* regardless of the method of the original request.
*
* While there are some ways around this, it gets into a lot of client
* specific behavior and corner cases in the HTTP standard. There's also
* very little practical benefit of redirecting; only GET and HEAD can
* be redirected automagically; all other requests "MUST NOT
* automatically redirect the request unless it can be confirmed by the
* user, since this might change the conditions under which the request
* was issued."
*
* Given all of that, a 404 with a nice message telling them what to do
* is probably our best bet.
*/
ast_ari_response_error(response, 404, "Not Found",
"ARI URLs do not end with a slash. Try /ari/%s", slashless);
}
enum ast_ari_invoke_result ast_ari_invoke(struct ast_tcptls_session_instance *ser,
enum ast_ari_invoke_source source, const struct ast_http_uri *urih,
const char *uri, enum ast_http_method method,
struct ast_variable *get_params, struct ast_variable *headers,
struct ast_json *body, struct ast_ari_response *response)
{
RAII_VAR(struct stasis_rest_handlers *, root, NULL, ao2_cleanup);
struct stasis_rest_handlers *handler;
struct stasis_rest_handlers *handler = NULL;
struct stasis_rest_handlers *wildcard_handler = NULL;
RAII_VAR(struct ast_variable *, path_vars, NULL, ast_variables_destroy);
RAII_VAR(struct ast_ari_conf_user *, user, NULL, ao2_cleanup);
RAII_VAR(struct ast_ari_conf *, conf, ast_ari_config_get(), ao2_cleanup);
char *path = ast_strdupa(uri);
char *path_segment;
char *path_segment = NULL;
stasis_rest_callback callback;
SCOPE_ENTER(3, "Request: %s %s, path:%s\n", ast_get_http_method(method), uri, path);
if (!conf || !conf->general) {
if (ser && source == ARI_INVOKE_SOURCE_REST) {
ast_http_request_close_on_completion(ser);
}
ast_ari_response_error(response, 500, "Server Error", "URI handler config missing");
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CLOSE, "Response: %d : %s\n",
response->response_code, response->response_text);
}
user = authenticate_user(get_params, headers);
if (!user && source == ARI_INVOKE_SOURCE_REST) {
/* Per RFC 2617, section 1.2: The 401 (Unauthorized) response
* message is used by an origin server to challenge the
* authorization of a user agent. This response MUST include a
* WWW-Authenticate header field containing at least one
* challenge applicable to the requested resource.
*/
ast_ari_response_error(response, 401, "Unauthorized", "Authentication required");
/* Section 1.2:
* realm = "realm" "=" realm-value
* realm-value = quoted-string
* Section 2:
* challenge = "Basic" realm
*/
ast_str_append(&response->headers, 0,
"WWW-Authenticate: Basic realm=\"%s\"\r\n",
conf->general->auth_realm);
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s\n",
response->response_code, response->response_text);
} else if (!ast_fully_booted) {
ast_ari_response_error(response, 503, "Service Unavailable", "Asterisk not booted");
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CLOSE, "Response: %d : %s\n",
response->response_code, response->response_text);
} else if (user && user->read_only && method != AST_HTTP_GET && method != AST_HTTP_OPTIONS) {
ast_ari_response_error(response, 403, "Forbidden", "Write access denied");
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s\n",
response->response_code, response->response_text);
} else if (ast_ends_with(uri, "/")) {
remove_trailing_slash(uri, response);
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s\n",
response->response_code, response->response_text);
} else if (ast_begins_with(uri, "api-docs/")) {
/* Serving up API docs */
if (method != AST_HTTP_GET) {
ast_ari_response_error(response, 405, "Method Not Allowed", "Unsupported method");
} else {
if (urih) {
/* Skip the api-docs prefix */
ast_ari_get_docs(strchr(uri, '/') + 1, urih->prefix, headers, response);
} else {
/*
* If we were invoked without a urih, we're probably
* being called from the websocket so just use the
* default prefix. It's filled in by ast_http_uri_link().
*/
ast_ari_get_docs(strchr(uri, '/') + 1, http_uri.prefix, headers, response);
}
}
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s\n",
response->response_code, response->response_text);
}
root = handler = get_root_handler();
ast_assert(root != NULL);
ast_debug(3, "Finding handler for %s\n", path);
while ((path_segment = strsep(&path, "/")) && (strlen(path_segment) > 0)) {
struct stasis_rest_handlers *found_handler = NULL;
int i;
SCOPE_ENTER(4, "Finding handler for path segment %s\n", path_segment);
ast_uri_decode(path_segment, ast_uri_http_legacy);
ast_debug(3, " Finding handler for %s\n", path_segment);
for (i = 0; found_handler == NULL && i < handler->num_children; ++i) {
struct stasis_rest_handlers *child = handler->children[i];
SCOPE_ENTER(5, "Checking handler path segment %s\n", child->path_segment);
if (child->is_wildcard) {
/* Record the path variable */
@ -565,18 +735,19 @@ void ast_ari_invoke(struct ast_tcptls_session_instance *ser,
path_var->next = path_vars;
path_vars = path_var;
wildcard_handler = child;
ast_debug(3, " Checking %s %s: Matched wildcard.\n", handler->path_segment, child->path_segment);
ast_trace(-1, " Checking %s %s: Matched wildcard.\n", handler->path_segment, child->path_segment);
} else if (strcmp(child->path_segment, path_segment) == 0) {
found_handler = child;
ast_debug(3, " Checking %s %s: Explicit match with %s\n", handler->path_segment, child->path_segment, path_segment);
ast_trace(-1, " Checking %s %s: Explicit match with %s\n", handler->path_segment, child->path_segment, path_segment);
} else {
ast_debug(3, " Checking %s %s: Didn't match %s\n", handler->path_segment, child->path_segment, path_segment);
ast_trace(-1, " Checking %s %s: Didn't match %s\n", handler->path_segment, child->path_segment, path_segment);
}
SCOPE_EXIT("Done checking %s\n", child->path_segment);
}
if (!found_handler && wildcard_handler) {
ast_debug(3, " No explicit handler found for %s. Using wildcard %s.\n",
ast_trace(-1, " No explicit handler found for %s. Using wildcard %s.\n",
path_segment, wildcard_handler->path_segment);
found_handler = wildcard_handler;
wildcard_handler = NULL;
@ -584,20 +755,26 @@ void ast_ari_invoke(struct ast_tcptls_session_instance *ser,
if (found_handler == NULL) {
/* resource not found */
ast_debug(3, " Handler not found for %s\n", path_segment);
ast_ari_response_error(
response, 404, "Not Found",
"Resource not found");
return;
SCOPE_EXIT_EXPR(break, "Handler not found for %s\n", path_segment);
} else {
handler = found_handler;
}
SCOPE_EXIT("Done checking %s\n", path_segment);
}
if (handler == NULL || response->response_code == 404) {
/* resource not found */
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s %s\n",
response->response_code, response->response_text, uri);
}
ast_assert(handler != NULL);
if (method == AST_HTTP_OPTIONS) {
handle_options(handler, headers, response);
return;
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Was options\n");
}
if (method < 0 || method >= AST_HTTP_MAX_METHOD) {
@ -605,17 +782,26 @@ void ast_ari_invoke(struct ast_tcptls_session_instance *ser,
ast_ari_response_error(
response, 405, "Method Not Allowed",
"Invalid method");
return;
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s\n",
response->response_code, response->response_text);
}
if (handler->ws_server && method == AST_HTTP_GET) {
if (handler->is_websocket && method == AST_HTTP_GET) {
if (source == ARI_INVOKE_SOURCE_WEBSOCKET) {
ast_ari_response_error(
response, 400, "Bad request",
"Can't upgrade to websocket from a websocket");
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s\n",
response->response_code, response->response_text);
}
/* WebSocket! */
ari_handle_websocket(handler->ws_server, ser, uri, method,
ast_trace(-1, "Handling websocket %s\n", uri);
ari_handle_websocket(ser, uri, method,
get_params, headers);
/* Since the WebSocket code handles the connection, we shouldn't
* do anything else; setting no_response */
response->no_response = 1;
return;
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Upgrade to websocket\n");
}
callback = handler->callbacks[method];
@ -624,9 +810,11 @@ void ast_ari_invoke(struct ast_tcptls_session_instance *ser,
ast_ari_response_error(
response, 405, "Method Not Allowed",
"Invalid method");
return;
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s\n",
response->response_code, response->response_text);
}
ast_trace(-1, "Running callback: %s\n", uri);
callback(ser, get_params, path_vars, headers, body, response);
if (response->message == NULL && response->response_code == 0) {
/* Really should not happen */
@ -635,7 +823,11 @@ void ast_ari_invoke(struct ast_tcptls_session_instance *ser,
ast_ari_response_error(
response, 501, "Not Implemented",
"Method not implemented");
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_ERROR_CONTINUE, "Response: %d : %s\n",
response->response_code, response->response_text);
}
SCOPE_EXIT_RTN_VALUE(ARI_INVOKE_RESULT_SUCCESS, "Response: %d : %s\n",
response->response_code, response->response_text);
}
void ast_ari_get_docs(const char *uri, const char *prefix, struct ast_variable *headers,
@ -762,32 +954,6 @@ void ast_ari_get_docs(const char *uri, const char *prefix, struct ast_variable *
ast_ari_response_ok(response, obj);
}
static void remove_trailing_slash(const char *uri,
struct ast_ari_response *response)
{
char *slashless = ast_strdupa(uri);
slashless[strlen(slashless) - 1] = '\0';
/* While it's tempting to redirect the client to the slashless URL,
* that is problematic. A 302 Found is the most appropriate response,
* but most clients issue a GET on the location you give them,
* regardless of the method of the original request.
*
* While there are some ways around this, it gets into a lot of client
* specific behavior and corner cases in the HTTP standard. There's also
* very little practical benefit of redirecting; only GET and HEAD can
* be redirected automagically; all other requests "MUST NOT
* automatically redirect the request unless it can be confirmed by the
* user, since this might change the conditions under which the request
* was issued."
*
* Given all of that, a 404 with a nice message telling them what to do
* is probably our best bet.
*/
ast_ari_response_error(response, 404, "Not Found",
"ARI URLs do not end with a slash. Try /ari/%s", slashless);
}
/*!
* \brief Handle CORS headers for simple requests.
*
@ -853,64 +1019,6 @@ enum ast_json_encoding_format ast_ari_json_format(void)
return cfg->general->format;
}
/*!
* \brief Authenticate a <code>?api_key=userid:password</code>
*
* \param api_key API key query parameter
* \return User object for the authenticated user.
* \retval NULL if authentication failed.
*/
static struct ast_ari_conf_user *authenticate_api_key(const char *api_key)
{
RAII_VAR(char *, copy, NULL, ast_free);
char *username;
char *password;
password = copy = ast_strdup(api_key);
if (!copy) {
return NULL;
}
username = strsep(&password, ":");
if (!password) {
ast_log(LOG_WARNING, "Invalid api_key\n");
return NULL;
}
return ast_ari_config_validate_user(username, password);
}
/*!
* \brief Authenticate an HTTP request.
*
* \param get_params GET parameters of the request.
* \param headers HTTP headers.
* \return User object for the authenticated user.
* \retval NULL if authentication failed.
*/
static struct ast_ari_conf_user *authenticate_user(struct ast_variable *get_params,
struct ast_variable *headers)
{
RAII_VAR(struct ast_http_auth *, http_auth, NULL, ao2_cleanup);
struct ast_variable *v;
/* HTTP Basic authentication */
http_auth = ast_http_get_auth(headers);
if (http_auth) {
return ast_ari_config_validate_user(http_auth->userid,
http_auth->password);
}
/* ?api_key authentication */
for (v = get_params; v; v = v->next) {
if (strcasecmp("api_key", v->name) == 0) {
return authenticate_api_key(v->value);
}
}
return NULL;
}
/*!
* \internal
* \brief ARI HTTP handler.
@ -932,35 +1040,28 @@ static int ast_ari_callback(struct ast_tcptls_session_instance *ser,
struct ast_variable *get_params,
struct ast_variable *headers)
{
RAII_VAR(struct ast_ari_conf *, conf, NULL, ao2_cleanup);
RAII_VAR(struct ast_str *, response_body, ast_str_create(256), ast_free);
RAII_VAR(struct ast_ari_conf_user *, user, NULL, ao2_cleanup);
struct ast_ari_response response = { .fd = -1, 0 };
RAII_VAR(struct ast_variable *, post_vars, NULL, ast_variables_destroy);
struct ast_variable *var;
const char *app_name = NULL;
RAII_VAR(struct ast_json *, body, ast_json_null(), ast_json_unref);
int debug_app = 0;
enum ast_ari_invoke_result result;
SCOPE_ENTER(2, "%s: Request: %s %s\n", ast_sockaddr_stringify(&ser->remote_address),
ast_get_http_method(method), uri);
if (!response_body) {
ast_http_request_close_on_completion(ser);
ast_http_error(ser, 500, "Server Error", "Out of memory");
return 0;
SCOPE_EXIT_RTN_VALUE(0, "Out of memory\n");
}
response.headers = ast_str_create(40);
if (!response.headers) {
ast_http_request_close_on_completion(ser);
ast_http_error(ser, 500, "Server Error", "Out of memory");
return 0;
}
conf = ast_ari_config_get();
if (!conf || !conf->general) {
ast_free(response.headers);
ast_http_request_close_on_completion(ser);
ast_http_error(ser, 500, "Server Error", "URI handler config missing");
return 0;
SCOPE_EXIT_RTN_VALUE(0, "Out of memory\n");
}
process_cors_request(headers, &response);
@ -971,6 +1072,7 @@ static int ast_ari_callback(struct ast_tcptls_session_instance *ser,
*/
post_vars = ast_http_get_post_vars(ser, headers);
if (!post_vars) {
ast_trace(-1, "No post_vars\n");
switch (errno) {
case EFBIG:
ast_ari_response_error(&response, 413,
@ -993,6 +1095,7 @@ static int ast_ari_callback(struct ast_tcptls_session_instance *ser,
* If there were post_vars, then the request body would already have
* been consumed and can not be read again.
*/
ast_trace(-1, "Checking body for vars\n");
body = ast_http_get_json(ser, headers);
if (!body) {
switch (errno) {
@ -1009,10 +1112,12 @@ static int ast_ari_callback(struct ast_tcptls_session_instance *ser,
}
}
if (get_params == NULL) {
ast_trace(-1, "No get_params, using post_vars if any\n");
get_params = post_vars;
} else if (get_params && post_vars) {
/* Has both post_vars and get_params */
struct ast_variable *last_var = post_vars;
ast_trace(-1, "Has get_params and post_vars. Merging\n");
while (last_var->next) {
last_var = last_var->next;
}
@ -1030,6 +1135,7 @@ static int ast_ari_callback(struct ast_tcptls_session_instance *ser,
app_name = (app ? ast_json_string_get(app) : NULL);
}
ast_trace(-1, "app_name: %s\n", app_name);
/* stasis_app_get_debug_by_name returns an "||" of the app's debug flag
* and the global debug flag.
@ -1061,53 +1167,18 @@ static int ast_ari_callback(struct ast_tcptls_session_instance *ser,
ast_free(buf);
}
user = authenticate_user(get_params, headers);
if (response.response_code > 0) {
/* POST parameter processing error. Do nothing. */
} else if (!user) {
/* Per RFC 2617, section 1.2: The 401 (Unauthorized) response
* message is used by an origin server to challenge the
* authorization of a user agent. This response MUST include a
* WWW-Authenticate header field containing at least one
* challenge applicable to the requested resource.
*/
ast_ari_response_error(&response, 401, "Unauthorized", "Authentication required");
/* Section 1.2:
* realm = "realm" "=" realm-value
* realm-value = quoted-string
* Section 2:
* challenge = "Basic" realm
*/
ast_str_append(&response.headers, 0,
"WWW-Authenticate: Basic realm=\"%s\"\r\n",
conf->general->auth_realm);
} else if (!ast_fully_booted) {
result = SCOPE_CALL_WITH_RESULT(-1, enum ast_ari_invoke_result,
ast_ari_invoke, ser, ARI_INVOKE_SOURCE_REST,
urih, uri, method, get_params, headers, body, &response);
if (result == ARI_INVOKE_RESULT_ERROR_CLOSE) {
ast_http_request_close_on_completion(ser);
ast_ari_response_error(&response, 503, "Service Unavailable", "Asterisk not booted");
} else if (user->read_only && method != AST_HTTP_GET && method != AST_HTTP_OPTIONS) {
ast_ari_response_error(&response, 403, "Forbidden", "Write access denied");
} else if (ast_ends_with(uri, "/")) {
remove_trailing_slash(uri, &response);
} else if (ast_begins_with(uri, "api-docs/")) {
/* Serving up API docs */
if (method != AST_HTTP_GET) {
ast_ari_response_error(&response, 405, "Method Not Allowed", "Unsupported method");
} else {
/* Skip the api-docs prefix */
ast_ari_get_docs(strchr(uri, '/') + 1, urih->prefix, headers, &response);
}
} else {
/* Other RESTful resources */
ast_ari_invoke(ser, uri, method, get_params, headers, body,
&response);
}
if (response.no_response) {
/* The handler indicates no further response is necessary.
* Probably because it already handled it */
ast_free(response.headers);
return 0;
SCOPE_EXIT_RTN_VALUE(0, "No response needed\n");
}
request_failed:
@ -1125,7 +1196,7 @@ request_failed:
ast_str_append(&response.headers, 0,
"Content-type: application/json\r\n");
if (ast_json_dump_str_format(response.message, &response_body,
conf->general->format) != 0) {
ast_ari_json_format()) != 0) {
/* Error encoding response */
response.response_code = 500;
response.response_text = "Internal Server Error";
@ -1151,21 +1222,14 @@ request_failed:
if (response.fd >= 0) {
close(response.fd);
}
return 0;
SCOPE_EXIT_RTN_VALUE(0, "Done. response: %d : %s\n", response.response_code,
response.response_text);
}
static struct ast_http_uri http_uri = {
.callback = ast_ari_callback,
.description = "Asterisk RESTful API",
.uri = "ari",
.has_subtree = 1,
.data = NULL,
.key = __FILE__,
.no_decode_uri = 1,
};
static int unload_module(void)
{
ari_websocket_unload_module();
ast_ari_cli_unregister();
if (is_enabled()) {
@ -1213,6 +1277,11 @@ static int load_module(void)
return AST_MODULE_LOAD_DECLINE;
}
if (ari_websocket_load_module() != AST_MODULE_LOAD_SUCCESS) {
unload_module();
return AST_MODULE_LOAD_DECLINE;
}
if (is_enabled()) {
ast_debug(3, "ARI enabled\n");
ast_http_uri_link(&http_uri);
@ -1252,7 +1321,6 @@ AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_
.load = load_module,
.unload = unload_module,
.reload = reload_module,
.optional_modules = "res_http_websocket",
.requires = "http,res_stasis",
.requires = "http,res_stasis,res_http_websocket",
.load_pri = AST_MODPRI_APP_DEPEND,
);

@ -48,196 +48,9 @@
#if defined(AST_DEVMODE)
#include "ari/ari_model_validators.h"
#endif
#include "asterisk/http_websocket.h"
#define MAX_VALS 128
static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_session_instance *ser,
struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
{
struct ast_ari_events_event_websocket_args args = {};
int res = 0;
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
struct ast_variable *i;
response = ast_calloc(1, sizeof(*response));
if (!response) {
ast_log(LOG_ERROR, "Failed to create response.\n");
goto fin;
}
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "app") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
args.app_parse = ast_strdup(i->value);
if (!args.app_parse) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (strlen(args.app_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args.app_count = 1;
vals[0] = args.app_parse;
} else {
args.app_count = ast_app_separate_args(
args.app_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args.app_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (args.app_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for app");
goto fin;
}
args.app = ast_malloc(sizeof(*args.app) * args.app_count);
if (!args.app) {
ast_ari_response_alloc_failed(response);
goto fin;
}
for (j = 0; j < args.app_count; ++j) {
args.app[j] = (vals[j]);
}
} else
if (strcmp(i->name, "subscribeAll") == 0) {
args.subscribe_all = ast_true(i->value);
} else
{}
}
res = ast_ari_websocket_events_event_websocket_attempted(ser, headers, &args, session_id);
fin: __attribute__((unused))
if (!response) {
ast_http_error(ser, 500, "Server Error", "Memory allocation error");
res = -1;
} else if (response->response_code != 0) {
/* Param parsing failure */
RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
ast_log(LOG_ERROR, "Missing response message\n");
}
if (msg) {
ast_http_error(ser, response->response_code, response->response_text, msg);
}
res = -1;
}
ast_free(args.app_parse);
ast_free(args.app);
return res;
}
static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocket *ws_session,
struct ast_variable *get_params, struct ast_variable *headers)
{
struct ast_ari_events_event_websocket_args args = {};
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
struct ast_variable *i;
RAII_VAR(struct ast_websocket *, s, ws_session, ast_websocket_unref);
RAII_VAR(struct ast_ari_websocket_session *, session, NULL, ao2_cleanup);
SCOPED_MODULE_USE(ast_module_info->self);
response = ast_calloc(1, sizeof(*response));
if (!response) {
ast_log(LOG_ERROR, "Failed to create response.\n");
goto fin;
}
#if defined(AST_DEVMODE)
session = ast_ari_websocket_session_create(ws_session,
ast_ari_validate_message_fn());
#else
session = ast_ari_websocket_session_create(ws_session, NULL);
#endif
if (!session) {
ast_log(LOG_ERROR, "Failed to create ARI session\n");
goto fin;
}
for (i = get_params; i; i = i->next) {
if (strcmp(i->name, "app") == 0) {
/* Parse comma separated list */
char *vals[MAX_VALS];
size_t j;
args.app_parse = ast_strdup(i->value);
if (!args.app_parse) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (strlen(args.app_parse) == 0) {
/* ast_app_separate_args can't handle "" */
args.app_count = 1;
vals[0] = args.app_parse;
} else {
args.app_count = ast_app_separate_args(
args.app_parse, ',', vals,
ARRAY_LEN(vals));
}
if (args.app_count == 0) {
ast_ari_response_alloc_failed(response);
goto fin;
}
if (args.app_count >= MAX_VALS) {
ast_ari_response_error(response, 400,
"Bad Request",
"Too many values for app");
goto fin;
}
args.app = ast_malloc(sizeof(*args.app) * args.app_count);
if (!args.app) {
ast_ari_response_alloc_failed(response);
goto fin;
}
for (j = 0; j < args.app_count; ++j) {
args.app[j] = (vals[j]);
}
} else
if (strcmp(i->name, "subscribeAll") == 0) {
args.subscribe_all = ast_true(i->value);
} else
{}
}
ast_ari_websocket_events_event_websocket_established(session, headers, &args);
fin: __attribute__((unused))
if (response && response->response_code != 0) {
/* Param parsing failure */
RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
ast_log(LOG_ERROR, "Missing response message\n");
}
if (msg) {
ast_websocket_write(ws_session,
AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg));
}
}
ast_free(args.app_parse);
ast_free(args.app);
}
int ast_ari_events_user_event_parse_body(
struct ast_json *body,
struct ast_ari_events_user_event_args *args)
@ -425,9 +238,6 @@ static struct stasis_rest_handlers events = {
static int unload_module(void)
{
ast_ari_remove_handler(&events);
ao2_cleanup(events.ws_server);
events.ws_server = NULL;
ast_ari_websocket_events_event_websocket_dtor();
return 0;
}
@ -435,28 +245,7 @@ static int load_module(void)
{
int res = 0;
struct ast_websocket_protocol *protocol;
if (ast_ari_websocket_events_event_websocket_init() == -1) {
return AST_MODULE_LOAD_DECLINE;
}
events.ws_server = ast_websocket_server_create();
if (!events.ws_server) {
ast_ari_websocket_events_event_websocket_dtor();
return AST_MODULE_LOAD_DECLINE;
}
protocol = ast_websocket_sub_protocol_alloc("ari");
if (!protocol) {
ao2_ref(events.ws_server, -1);
events.ws_server = NULL;
ast_ari_websocket_events_event_websocket_dtor();
return AST_MODULE_LOAD_DECLINE;
}
protocol->session_attempted = ast_ari_events_event_websocket_ws_attempted_cb;
protocol->session_established = ast_ari_events_event_websocket_ws_established_cb;
res |= ast_websocket_server_add_protocol2(events.ws_server, protocol);
events.is_websocket = 1;
res |= ast_ari_add_handler(&events);
if (res) {

@ -96,61 +96,6 @@ void ast_ari_{{c_name}}_{{c_nickname}}(struct ast_variable *headers, struct ast_
void ast_ari_{{c_name}}_{{c_nickname}}(struct ast_tcptls_session_instance *ser, struct ast_variable *headers, struct ast_ari_{{c_name}}_{{c_nickname}}_args *args, struct ast_ari_response *response);
{{/is_binary_response}}
{{/is_req}}
{{#is_websocket}}
/*!
* \brief {{{summary}}}
{{#notes}}
*
* {{{notes}}}
{{/notes}}
*
* \retval 0 success
* \retval -1 error
*/
int ast_ari_websocket_{{c_name}}_{{c_nickname}}_init(void);
/*!
* \brief {{{summary}}}
{{#notes}}
*
* {{{notes}}}
{{/notes}}
*/
void ast_ari_websocket_{{c_name}}_{{c_nickname}}_dtor(void);
/*!
* \brief {{summary}}
{{#notes}}
*
* {{{notes}}}
{{/notes}}
*
* \param ser HTTP TCP/TLS Server Session
* \param headers HTTP headers
* \param args Swagger parameters
* \param session_id The id of the current session.
*
* \retval 0 success
* \retval non-zero error
*/
int ast_ari_websocket_{{c_name}}_{{c_nickname}}_attempted(struct ast_tcptls_session_instance *ser,
struct ast_variable *headers, struct ast_ari_{{c_name}}_{{c_nickname}}_args *args, const char *session_id);
/*!
* \brief {{summary}}
{{#notes}}
*
* {{{notes}}}
{{/notes}}
*
* \param session ARI WebSocket.
* \param headers HTTP headers.
* \param args Swagger parameters.
*/
void ast_ari_websocket_{{c_name}}_{{c_nickname}}_established(struct ast_ari_websocket_session *session,
struct ast_variable *headers, struct ast_ari_{{c_name}}_{{c_nickname}}_args *args);
{{/is_websocket}}
{{/operations}}
{{/apis}}

@ -55,13 +55,6 @@
#if defined(AST_DEVMODE)
#include "ari/ari_model_validators.h"
#endif
{{#has_websocket}}
{{! Only include http_websocket if necessary. Otherwise we'll do a lot of
* unnecessary optional_api intialization, which makes optional_api harder
* to debug
}}
#include "asterisk/http_websocket.h"
{{/has_websocket}}
#define MAX_VALS 128
@ -149,112 +142,6 @@ fin: __attribute__((unused))
return;
}
{{/is_req}}
{{#is_websocket}}
static int ast_ari_{{c_name}}_{{c_nickname}}_ws_attempted_cb(struct ast_tcptls_session_instance *ser,
struct ast_variable *get_params, struct ast_variable *headers, const char *session_id)
{
struct ast_ari_{{c_name}}_{{c_nickname}}_args args = {};
{{#has_parameters}}
int res = 0;
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
struct ast_variable *i;
{{/has_parameters}}
{{#has_parameters}}
response = ast_calloc(1, sizeof(*response));
if (!response) {
ast_log(LOG_ERROR, "Failed to create response.\n");
goto fin;
}
{{/has_parameters}}
{{> param_parsing}}
res = ast_ari_websocket_{{c_name}}_{{c_nickname}}_attempted(ser, headers, &args, session_id);
fin: __attribute__((unused))
if (!response) {
ast_http_error(ser, 500, "Server Error", "Memory allocation error");
res = -1;
} else if (response->response_code != 0) {
/* Param parsing failure */
RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
ast_log(LOG_ERROR, "Missing response message\n");
}
if (msg) {
ast_http_error(ser, response->response_code, response->response_text, msg);
}
res = -1;
}
{{> param_cleanup}}
{{#has_parameters}}
return res;
{{/has_parameters}}
}
static void ast_ari_{{c_name}}_{{c_nickname}}_ws_established_cb(struct ast_websocket *ws_session,
struct ast_variable *get_params, struct ast_variable *headers)
{
struct ast_ari_{{c_name}}_{{c_nickname}}_args args = {};
{{#has_parameters}}
RAII_VAR(struct ast_ari_response *, response, NULL, ast_free);
struct ast_variable *i;
{{/has_parameters}}
RAII_VAR(struct ast_websocket *, s, ws_session, ast_websocket_unref);
RAII_VAR(struct ast_ari_websocket_session *, session, NULL, ao2_cleanup);
{{#has_path_parameters}}
/* TODO: It's not immediately obvious how to pass path params through
* the websocket code to this callback. Not needed right now, so we'll
* just punt. */
struct ast_variable *path_vars = NULL;
{{/has_path_parameters}}
SCOPED_MODULE_USE(ast_module_info->self);
{{#has_parameters}}
response = ast_calloc(1, sizeof(*response));
if (!response) {
ast_log(LOG_ERROR, "Failed to create response.\n");
goto fin;
}
{{/has_parameters}}
#if defined(AST_DEVMODE)
session = ast_ari_websocket_session_create(ws_session,
ast_ari_validate_{{response_class.c_name}}_fn());
#else
session = ast_ari_websocket_session_create(ws_session, NULL);
#endif
if (!session) {
ast_log(LOG_ERROR, "Failed to create ARI session\n");
goto fin;
}
{{> param_parsing}}
ast_ari_websocket_{{c_name}}_{{c_nickname}}_established(session, headers, &args);
fin: __attribute__((unused))
if (response && response->response_code != 0) {
/* Param parsing failure */
RAII_VAR(char *, msg, NULL, ast_json_free);
if (response->message) {
msg = ast_json_dump_string(response->message);
} else {
ast_log(LOG_ERROR, "Missing response message\n");
}
if (msg) {
ast_websocket_write(ws_session,
AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg));
}
}
{{> param_cleanup}}
}
{{/is_websocket}}
{{/operations}}
{{/apis}}
@ -266,13 +153,6 @@ fin: __attribute__((unused))
static int unload_module(void)
{
ast_ari_remove_handler(&{{root_full_name}});
{{#apis}}
{{#has_websocket}}
ao2_cleanup({{full_name}}.ws_server);
{{full_name}}.ws_server = NULL;
ast_ari_websocket_events_event_websocket_dtor();
{{/has_websocket}}
{{/apis}}
return 0;
}
@ -283,28 +163,7 @@ static int load_module(void)
{{#apis}}
{{#operations}}
{{#is_websocket}}
struct ast_websocket_protocol *protocol;
if (ast_ari_websocket_{{c_name}}_{{c_nickname}}_init() == -1) {
return AST_MODULE_LOAD_DECLINE;
}
{{full_name}}.ws_server = ast_websocket_server_create();
if (!{{full_name}}.ws_server) {
ast_ari_websocket_events_event_websocket_dtor();
return AST_MODULE_LOAD_DECLINE;
}
protocol = ast_websocket_sub_protocol_alloc("{{websocket_protocol}}");
if (!protocol) {
ao2_ref({{full_name}}.ws_server, -1);
{{full_name}}.ws_server = NULL;
ast_ari_websocket_events_event_websocket_dtor();
return AST_MODULE_LOAD_DECLINE;
}
protocol->session_attempted = ast_ari_{{c_name}}_{{c_nickname}}_ws_attempted_cb;
protocol->session_established = ast_ari_{{c_name}}_{{c_nickname}}_ws_established_cb;
res |= ast_websocket_server_add_protocol2({{full_name}}.ws_server, protocol);
{{full_name}}.is_websocket = 1;
{{/is_websocket}}
{{/operations}}
{{/apis}}

@ -200,7 +200,8 @@
"TextMessageReceived",
"ChannelConnectedLine",
"PeerStatusChange",
"ChannelTransfer"
"ChannelTransfer",
"RESTResponse"
]
},
"ContactInfo": {
@ -1039,6 +1040,125 @@
"description": "Value of the parameter"
}
}
},
"RESTHeader": {
"id": "RESTHeader",
"description": "REST over Websocket header",
"properties": {
"name": {
"type": "string",
"description": "Header name",
"required": true
},
"value": {
"required": true,
"type": "string",
"description": "Header value"
}
}
},
"RESTQueryStringParameter": {
"id": "RESTQueryStringParameter",
"description": "REST over Websocket Query String Parameter",
"properties": {
"name": {
"type": "string",
"description": "Parameter name",
"required": true
},
"value": {
"required": true,
"type": "string",
"description": "Parameter value"
}
}
},
"RESTRequest": {
"id": "RESTRequest",
"description": "REST over Websocket Request.",
"properties": {
"type": {
"type": "string",
"description": "Message type. Must be 'RESTRequest'",
"required": true
},
"transaction_id": {
"type": "string",
"description": "Opaque transaction id. Can be any valid string. Will be returned in any response to this request.",
"required": true
},
"request_id": {
"type": "string",
"description": "Opaque request id. Can be any valid string. Will be returned in any response to this request.",
"required": true
},
"method": {
"required": true,
"type": "string",
"description": "HTTP method (GET, PUT, POST, DELETE, etc.)"
},
"uri": {
"required": true,
"type": "string",
"description": "Resource URI with optional query string parameters."
},
"content_type": {
"required": false,
"type": "string",
"description": "The Content-Type of the message body."
},
"query_strings": {
"required": false,
"type": "List[RESTQueryStringParameter]",
"description": "Request query string parameters."
},
"message_body": {
"required": false,
"type": "string",
"description": "Request message body. Only content types application/json and application/x-www-form-urlencoded are supported."
}
}
},
"RESTResponse": {
"id": "RESTResponse",
"description": "REST over Websocket Response.",
"properties": {
"transaction_id": {
"type": "string",
"description": "Opaque transaction id. Will be whatever was specified on the original request.",
"required": true
},
"request_id": {
"type": "string",
"description": "Opaque request id. Will be whatever was specified on the original request.",
"required": true
},
"status_code": {
"required": true,
"type": "int",
"description": "HTTP status code"
},
"reason_phrase": {
"required": true,
"type": "string",
"description": "HTTP reason phrase"
},
"uri": {
"required": true,
"type": "string",
"description": "Original request resource URI"
},
"content_type": {
"required": false,
"type": "string",
"description": "The Content-Type of the message body."
},
"message_body": {
"required": false,
"type": "string",
"description": "Response message body"
}
}
}
}
}

@ -345,7 +345,7 @@ AST_TEST_DEFINE(invoke_get)
"head2", "head-two",
"path_vars");
ast_ari_invoke(NULL, "foo", AST_HTTP_GET, get_params, headers,
ast_ari_invoke(NULL, ARI_INVOKE_SOURCE_TEST, NULL, "foo", AST_HTTP_GET, get_params, headers,
ast_json_null(), response);
ast_test_validate(test, 1 == invocation_count);
@ -383,7 +383,7 @@ AST_TEST_DEFINE(invoke_wildcard)
"path_vars",
"bam", "foshizzle");
ast_ari_invoke(NULL, "foo/foshizzle", AST_HTTP_GET, get_params, headers,
ast_ari_invoke(NULL, ARI_INVOKE_SOURCE_TEST, NULL, "foo/foshizzle", AST_HTTP_GET, get_params, headers,
ast_json_null(), response);
ast_test_validate(test, 1 == invocation_count);
@ -421,7 +421,7 @@ AST_TEST_DEFINE(invoke_delete)
"path_vars",
"bam", "foshizzle");
ast_ari_invoke(NULL, "foo/foshizzle/bang", AST_HTTP_DELETE, get_params, headers,
ast_ari_invoke(NULL, ARI_INVOKE_SOURCE_TEST, NULL, "foo/foshizzle/bang", AST_HTTP_DELETE, get_params, headers,
ast_json_null(), response);
ast_test_validate(test, 1 == invocation_count);
@ -472,7 +472,7 @@ AST_TEST_DEFINE(invoke_post)
"head2", "head-two",
"path_vars");
ast_ari_invoke(NULL, "foo/bar", AST_HTTP_POST, get_params, headers,
ast_ari_invoke(NULL, ARI_INVOKE_SOURCE_TEST, NULL, "foo/bar", AST_HTTP_POST, get_params, headers,
ast_json_null(), response);
ast_test_validate(test, 1 == invocation_count);
@ -502,7 +502,7 @@ AST_TEST_DEFINE(invoke_bad_post)
fixture = setup_invocation_test();
response = response_alloc();
ast_ari_invoke(NULL, "foo", AST_HTTP_POST, get_params, headers,
ast_ari_invoke(NULL, ARI_INVOKE_SOURCE_TEST, NULL, "foo", AST_HTTP_POST, get_params, headers,
ast_json_null(), response);
ast_test_validate(test, 0 == invocation_count);
@ -531,7 +531,7 @@ AST_TEST_DEFINE(invoke_not_found)
fixture = setup_invocation_test();
response = response_alloc();
ast_ari_invoke(NULL, "foo/fizzle/i-am-not-a-resource", AST_HTTP_GET, get_params, headers,
ast_ari_invoke(NULL, ARI_INVOKE_SOURCE_TEST, NULL, "foo/fizzle/i-am-not-a-resource", AST_HTTP_GET, get_params, headers,
ast_json_null(), response);
ast_test_validate(test, 0 == invocation_count);

Loading…
Cancel
Save