pjsip: Rewrite OPTIONS support with new eyes.

The OPTIONS support in PJSIP has organically grown, like many things in
Asterisk.  It has been tweaked, changed, and adapted based on situations
run into.  Unfortunately this has taken its toll.  Configuration file
based objects have poor performance and even dynamic ones aren't that
great.

This change scraps the existing code and starts fresh with new eyes.  It
leverages all of the APIs made available such as sorcery observers and
serializers to provide a better implementation.

1.  The state of contacts, AORs, and endpoints relevant to the qualify
process is maintained.  This state can be updated by external forces (such
as a device registering/unregistering) and also the reload process.  This
state also includes the association between endpoints and AORs.

2.  AORs are scheduled and not contacts.  This reduces the amount of work
spent juggling scheduled items.

3.  Manipulation of which AORs are being qualified and the endpoint states
all occur within a serializer to reduce the conflict that can occur with
multiple threads attempting to modify things.

4.  Operations regarding an AOR use a serializer specific to that AOR.

5.  AORs and endpoint state act as state compositors.  They take input
from lower level objects (contacts feed AORs, AORs feed endpoint state)
and determine if a sufficient enough change has occurred to be fed further
up the chain.

6.  Realtime is supported by using observers to know when a contact has
been registered.  If state does not exist for the associated AOR then it
is retrieved and becomes active as appropriate.

The end result of all of this is best shown with a configuration file of
3000 endpoints each with an AOR that has a static contact.  In the old
code it would take over a minute to load and use all 8 of my cores.  This
new code takes 2-3 seconds and barely touches the CPU even while dealing
with all of the OPTIONS requests.

ASTERISK-26806

Change-Id: I6a5ebbfca9001dfe933eaeac4d3babd8d2e6f082
changes/58/8758/4
Joshua Colp 8 years ago committed by Richard Mudgett
parent 9c430569d4
commit 882e79b77e

@ -142,12 +142,12 @@ static int pjsip_contact_function_read(struct ast_channel *chan,
return -1;
}
contact_status = ast_sorcery_retrieve_by_id(pjsip_sorcery, CONTACT_STATUS, ast_sorcery_object_get_id(contact_obj));
contact_status = ast_sip_get_contact_status(contact_obj);
if (!strcmp(args.field_name, "status")) {
ast_str_set(buf, len, "%s", ast_sip_get_contact_status_label(contact_status->status));
ast_str_set(buf, len, "%s", ast_sip_get_contact_status_label(contact_status ? contact_status->status : UNKNOWN));
} else if (!strcmp(args.field_name, "rtt")) {
if (contact_status->status == UNKNOWN) {
if (!contact_status || contact_status->status != AVAILABLE) {
ast_str_set(buf, len, "%s", "N/A");
} else {
ast_str_set(buf, len, "%" PRId64, contact_status->rtt);

@ -283,8 +283,6 @@ struct ast_sip_contact {
int prune_on_boot;
};
#define CONTACT_STATUS "contact_status"
/*!
* \brief Status type for a contact.
*/
@ -307,23 +305,20 @@ enum ast_sip_contact_status_type {
* if available.
*/
struct ast_sip_contact_status {
SORCERY_OBJECT(details);
AST_DECLARE_STRING_FIELDS(
/*! The original contact's URI */
AST_STRING_FIELD(uri);
/*! The name of the aor this contact_status belongs to */
AST_STRING_FIELD(aor);
);
/*! Current status for a contact (default - unavailable) */
enum ast_sip_contact_status_type status;
/*! The round trip start time set before sending a qualify request */
struct timeval rtt_start;
/*! The round trip time in microseconds */
int64_t rtt;
/*! Current status for a contact (default - unavailable) */
enum ast_sip_contact_status_type status;
/*! Last status for a contact (default - unavailable) */
enum ast_sip_contact_status_type last_status;
/*! TRUE if the contact was refreshed. e.g., re-registered */
unsigned int refresh:1;
/*! Name of the contact */
char name[0];
};
/*!
@ -1061,13 +1056,33 @@ void *ast_sip_endpoint_alloc(const char *name);
/*!
* \brief Change state of a persistent endpoint.
*
* \param endpoint The SIP endpoint name to change state.
* \param endpoint_name The SIP endpoint name to change state.
* \param state The new state
* \retval 0 Success
* \retval -1 Endpoint not found
*/
int ast_sip_persistent_endpoint_update_state(const char *endpoint_name, enum ast_endpoint_state state);
/*!
* \brief Publish the change of state for a contact.
*
* \param endpoint_name The SIP endpoint name.
* \param contact_status The contact status.
*/
void ast_sip_persistent_endpoint_publish_contact_state(const char *endpoint_name, const struct ast_sip_contact_status *contact_status);
/*!
* \brief Retrieve the current status for a contact.
*
* \param contact The contact.
*
* \retval non-NULL Success
* \retval NULL Status information not found
*
* \note The returned contact status object is immutable.
*/
struct ast_sip_contact_status *ast_sip_get_contact_status(const struct ast_sip_contact *contact);
/*!
* \brief Get a pointer to the PJSIP endpoint.
*

@ -5174,9 +5174,13 @@ static int load_module(void)
}
ast_sip_initialize_dns();
ast_sip_initialize_global_headers();
if (ast_res_pjsip_preinit_options_handling()) {
ast_log(LOG_ERROR, "Failed to pre-initialize OPTIONS handling. Aborting load\n");
goto error;
}
if (ast_res_pjsip_initialize_configuration()) {
ast_log(LOG_ERROR, "Failed to initialize SIP configuration. Aborting load\n");
goto error;
@ -5200,7 +5204,10 @@ static int load_module(void)
goto error;
}
ast_res_pjsip_init_options_handling(0);
if (ast_res_pjsip_init_options_handling(0)) {
ast_log(LOG_ERROR, "Failed to initialize OPTIONS handling. Aborting load\n");
goto error;
}
if (ast_res_pjsip_init_message_filter()) {
ast_log(LOG_ERROR, "Failed to initialize message IP updating. Aborting load\n");

@ -191,6 +191,15 @@ void ast_sip_initialize_global_headers(void);
*/
void ast_sip_destroy_global_headers(void);
/*!
* \internal
* \brief Pre-initialize OPTIONS request handling.
*
* \retval 0 on success
* \retval other on failure
*/
int ast_res_pjsip_preinit_options_handling(void);
/*!
* \internal
* \brief Initialize OPTIONS request handling.

@ -180,7 +180,7 @@ static int contact_remove_unreachable(void *obj, void *arg, int flags)
struct ast_sip_contact_status *status;
int unreachable;
status = ast_res_pjsip_find_or_create_contact_status(contact);
status = ast_sip_get_contact_status(contact);
if (!status) {
return 0;
}
@ -1065,7 +1065,7 @@ static int cli_contact_print_body(void *obj, void *arg, int flags)
ast_assert(contact->uri != NULL);
ast_assert(context->output_buffer != NULL);
status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), CONTACT_STATUS, contact_id);
status = ast_sip_get_contact_status(contact);
indent = CLI_INDENT_TO_SPACES(context->indent_level);
flexwidth = CLI_LAST_TABSTOP - indent - 9 - strlen(contact->aor) + 1;
@ -1078,7 +1078,7 @@ static int cli_contact_print_body(void *obj, void *arg, int flags)
contact->uri,
hash_start,
ast_sip_get_contact_short_status_label(status ? status->status : UNKNOWN),
(status && (status->status != UNKNOWN) ? ((long long) status->rtt) / 1000.0 : NAN));
(status && (status->status == AVAILABLE)) ? ((long long) status->rtt) / 1000.0 : NAN);
ao2_cleanup(status);
return 0;

@ -42,8 +42,6 @@
struct sip_persistent_endpoint {
/*! \brief Asterisk endpoint itself */
struct ast_endpoint *endpoint;
/*! \brief AORs that we should react to */
char *aors;
};
/*! \brief Container for persistent endpoint information */
@ -70,239 +68,6 @@ static int persistent_endpoint_cmp(void *obj, void *arg, int flags)
return !strcmp(ast_endpoint_get_resource(persistent1->endpoint), id) ? CMP_MATCH | CMP_STOP : 0;
}
/*! \brief Internal function for changing the state of an endpoint */
static void endpoint_update_state(struct ast_endpoint *endpoint, enum ast_endpoint_state state)
{
struct ast_json *blob;
char *regcontext;
/* If there was no state change, don't publish anything. */
if (ast_endpoint_get_state(endpoint) == state) {
return;
}
regcontext = ast_sip_get_regcontext();
if (state == AST_ENDPOINT_ONLINE) {
ast_endpoint_set_state(endpoint, AST_ENDPOINT_ONLINE);
blob = ast_json_pack("{s: s}", "peer_status", "Reachable");
if (!ast_strlen_zero(regcontext)) {
if (!ast_exists_extension(NULL, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL)) {
ast_add_extension(regcontext, 1, ast_endpoint_get_resource(endpoint), 1, NULL, NULL,
"Noop", ast_strdup(ast_endpoint_get_resource(endpoint)), ast_free_ptr, "SIP");
}
}
ast_verb(2, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(endpoint));
} else {
ast_endpoint_set_state(endpoint, AST_ENDPOINT_OFFLINE);
blob = ast_json_pack("{s: s}", "peer_status", "Unreachable");
if (!ast_strlen_zero(regcontext)) {
struct pbx_find_info q = { .stacklen = 0 };
if (pbx_find_extension(NULL, NULL, &q, regcontext, ast_endpoint_get_resource(endpoint), 1, NULL, "", E_MATCH)) {
ast_context_remove_extension(regcontext, ast_endpoint_get_resource(endpoint), 1, NULL);
}
}
ast_verb(2, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(endpoint));
}
ast_free(regcontext);
ast_endpoint_blob_publish(endpoint, ast_endpoint_state_type(), blob);
ast_json_unref(blob);
ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(endpoint));
}
static void endpoint_publish_contact_status(struct ast_endpoint *endpoint, struct ast_sip_contact_status *contact)
{
struct ast_json *blob;
char rtt[32];
snprintf(rtt, sizeof(rtt), "%" PRId64, contact->rtt);
blob = ast_json_pack("{s: s, s: s, s: s, s: s, s: s}",
"contact_status", ast_sip_get_contact_status_label(contact->status),
"aor", contact->aor,
"uri", contact->uri,
"roundtrip_usec", rtt,
"endpoint_name", ast_endpoint_get_resource(endpoint));
if (blob) {
ast_endpoint_blob_publish(endpoint, ast_endpoint_contact_state_type(), blob);
ast_json_unref(blob);
}
}
/*! \brief Callback function for publishing the status of an endpoint */
static int persistent_endpoint_publish_status(void *obj, void *arg, int flags)
{
struct sip_persistent_endpoint *persistent = obj;
struct ast_endpoint *endpoint = persistent->endpoint;
struct ast_sip_contact_status *status = arg;
/* If the status' aor isn't one of the endpoint's, we skip */
if (!strstr(persistent->aors, status->aor)) {
return 0;
}
endpoint_publish_contact_status(endpoint, status);
return 0;
}
/*! \brief Callback function for changing the state of an endpoint */
static int persistent_endpoint_update_state(void *obj, void *arg, int flags)
{
struct sip_persistent_endpoint *persistent = obj;
struct ast_endpoint *endpoint = persistent->endpoint;
struct ast_sip_contact_status *status = arg;
struct ao2_container *contacts;
struct ao2_iterator iter;
struct ast_sip_contact *contact;
enum ast_endpoint_state state = AST_ENDPOINT_OFFLINE;
/* If the status' aor isn't one of the endpoint's, we skip */
if (!strstr(persistent->aors, status->aor)) {
return 0;
}
endpoint_publish_contact_status(endpoint, status);
/* Find all the contacts for this endpoint. If ANY are available,
* mark the endpoint as ONLINE.
*/
contacts = ast_sip_location_retrieve_contacts_from_aor_list(persistent->aors);
if (contacts) {
iter = ao2_iterator_init(contacts, 0);
while (state == AST_ENDPOINT_OFFLINE && (contact = ao2_iterator_next(&iter))) {
struct ast_sip_contact_status *contact_status;
const char *contact_id = ast_sorcery_object_get_id(contact);
contact_status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(),
CONTACT_STATUS, contact_id);
if (contact_status && contact_status->status != UNAVAILABLE) {
state = AST_ENDPOINT_ONLINE;
}
ao2_cleanup(contact_status);
ao2_ref(contact, -1);
}
ao2_iterator_destroy(&iter);
ao2_ref(contacts, -1);
}
endpoint_update_state(endpoint, state);
return 0;
}
/*! \brief Function called when a contact is created */
static void persistent_endpoint_contact_created_observer(const void *object)
{
const struct ast_sip_contact *contact = object;
struct ast_sip_contact_status *contact_status;
contact_status = ast_sorcery_alloc(ast_sip_get_sorcery(), CONTACT_STATUS,
ast_sorcery_object_get_id(contact));
if (!contact_status) {
ast_log(LOG_ERROR, "Unable to create ast_sip_contact_status for contact %s/%s\n",
contact->aor, contact->uri);
return;
}
ast_string_field_set(contact_status, uri, contact->uri);
contact_status->status = CREATED;
ast_verb(2, "Contact %s/%s has been created\n", contact->aor, contact->uri);
ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, contact_status);
ao2_cleanup(contact_status);
}
/*! \brief Function called when a contact is deleted */
static void persistent_endpoint_contact_deleted_observer(const void *object)
{
const struct ast_sip_contact *contact = object;
struct ast_sip_contact_status *contact_status;
contact_status = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), CONTACT_STATUS, ast_sorcery_object_get_id(contact));
if (!contact_status) {
ast_log(LOG_ERROR, "Unable to find ast_sip_contact_status for contact %s/%s\n",
contact->aor, contact->uri);
return;
}
ast_verb(2, "Contact %s/%s has been deleted\n", contact->aor, contact->uri);
ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
"-1", 1.0, ast_sip_get_contact_status_label(contact_status->status));
ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
"+1", 1.0, ast_sip_get_contact_status_label(REMOVED));
ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state, contact_status);
ast_sorcery_delete(ast_sip_get_sorcery(), contact_status);
ao2_cleanup(contact_status);
}
/*! \brief Observer for contacts so state can be updated on respective endpoints */
static const struct ast_sorcery_observer state_contact_observer = {
.created = persistent_endpoint_contact_created_observer,
.deleted = persistent_endpoint_contact_deleted_observer,
};
/*! \brief Function called when a contact_status is updated */
static void persistent_endpoint_contact_status_observer(const void *object)
{
struct ast_sip_contact_status *contact_status = (struct ast_sip_contact_status *)object;
if (contact_status->refresh) {
/* We are only re-publishing the contact status. */
ao2_callback(persistent_endpoints, OBJ_NODATA,
persistent_endpoint_publish_status, contact_status);
return;
}
/* If rtt_start is set (this is the outgoing OPTIONS), ignore. */
if (contact_status->rtt_start.tv_sec > 0) {
return;
}
if (contact_status->status != contact_status->last_status) {
ast_verb(3, "Contact %s/%s is now %s. RTT: %.3f msec\n",
contact_status->aor, contact_status->uri,
ast_sip_get_contact_status_label(contact_status->status),
contact_status->rtt / 1000.0);
ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
"-1", 1.0, ast_sip_get_contact_status_label(contact_status->last_status));
ast_statsd_log_string_va("PJSIP.contacts.states.%s", AST_STATSD_GAUGE,
"+1", 1.0, ast_sip_get_contact_status_label(contact_status->status));
ast_test_suite_event_notify("AOR_CONTACT_UPDATE",
"Contact: %s\r\n"
"Status: %s",
ast_sorcery_object_get_id(contact_status),
ast_sip_get_contact_status_label(contact_status->status));
ao2_callback(persistent_endpoints, OBJ_NODATA, persistent_endpoint_update_state,
contact_status);
} else {
ast_debug(3, "Contact %s/%s status didn't change: %s, RTT: %.3f msec\n",
contact_status->aor, contact_status->uri,
ast_sip_get_contact_status_label(contact_status->status),
contact_status->rtt / 1000.0);
}
ast_statsd_log_full_va("PJSIP.contacts.%s.rtt", AST_STATSD_TIMER,
contact_status->status != AVAILABLE ? -1 : contact_status->rtt / 1000,
1.0,
ast_sorcery_object_get_id(contact_status));
}
/*! \brief Observer for contacts so state can be updated on respective endpoints */
static const struct ast_sorcery_observer state_contact_status_observer = {
.updated = persistent_endpoint_contact_status_observer,
};
static void endpoint_deleted_observer(const void *object)
{
const struct ast_sip_endpoint *endpoint = object;
@ -1352,21 +1117,89 @@ static void persistent_endpoint_destroy(void *obj)
struct sip_persistent_endpoint *persistent = obj;
ast_endpoint_shutdown(persistent->endpoint);
ast_free(persistent->aors);
}
int ast_sip_persistent_endpoint_update_state(const char *endpoint_name, enum ast_endpoint_state state)
{
struct sip_persistent_endpoint *persistent;
struct ast_json *blob;
char *regcontext;
persistent = ao2_find(persistent_endpoints, endpoint_name, OBJ_SEARCH_KEY);
if (!persistent) {
return -1;
}
ao2_lock(persistent_endpoints);
persistent = ao2_find(persistent_endpoints, endpoint_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (persistent) {
endpoint_update_state(persistent->endpoint, state);
/* If there was no state change, don't publish anything. */
if (ast_endpoint_get_state(persistent->endpoint) == state) {
ao2_ref(persistent, -1);
return 0;
}
regcontext = ast_sip_get_regcontext();
if (state == AST_ENDPOINT_ONLINE) {
ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_ONLINE);
blob = ast_json_pack("{s: s}", "peer_status", "Reachable");
if (!ast_strlen_zero(regcontext)) {
if (!ast_exists_extension(NULL, regcontext, ast_endpoint_get_resource(persistent->endpoint), 1, NULL)) {
ast_add_extension(regcontext, 1, ast_endpoint_get_resource(persistent->endpoint), 1, NULL, NULL,
"Noop", ast_strdup(ast_endpoint_get_resource(persistent->endpoint)), ast_free_ptr, "SIP");
}
}
ast_verb(2, "Endpoint %s is now Reachable\n", ast_endpoint_get_resource(persistent->endpoint));
} else {
ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_OFFLINE);
blob = ast_json_pack("{s: s}", "peer_status", "Unreachable");
if (!ast_strlen_zero(regcontext)) {
struct pbx_find_info q = { .stacklen = 0 };
if (pbx_find_extension(NULL, NULL, &q, regcontext, ast_endpoint_get_resource(persistent->endpoint), 1, NULL, "", E_MATCH)) {
ast_context_remove_extension(regcontext, ast_endpoint_get_resource(persistent->endpoint), 1, NULL);
}
}
ast_verb(2, "Endpoint %s is now Unreachable\n", ast_endpoint_get_resource(persistent->endpoint));
}
ast_free(regcontext);
ast_endpoint_blob_publish(persistent->endpoint, ast_endpoint_state_type(), blob);
ast_json_unref(blob);
ast_devstate_changed(AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "PJSIP/%s", ast_endpoint_get_resource(persistent->endpoint));
ao2_ref(persistent, -1);
return 0;
}
void ast_sip_persistent_endpoint_publish_contact_state(const char *endpoint_name, const struct ast_sip_contact_status *contact_status)
{
struct sip_persistent_endpoint *persistent;
struct ast_json *blob;
char rtt[32];
persistent = ao2_find(persistent_endpoints, endpoint_name, OBJ_SEARCH_KEY);
if (!persistent) {
return;
}
ao2_unlock(persistent_endpoints);
return persistent ? 0 : -1;
snprintf(rtt, sizeof(rtt), "%" PRId64, contact_status->rtt);
blob = ast_json_pack("{s: s, s: s, s: s, s: s, s: s}",
"contact_status", ast_sip_get_contact_status_label(contact_status->status),
"aor", contact_status->aor,
"uri", contact_status->uri,
"roundtrip_usec", rtt,
"endpoint_name", ast_endpoint_get_resource(persistent->endpoint));
if (blob) {
ast_endpoint_blob_publish(persistent->endpoint, ast_endpoint_contact_state_type(), blob);
ast_json_unref(blob);
}
ao2_ref(persistent, -1);
}
/*! \brief Internal function which finds (or creates) persistent endpoint information */
@ -1390,22 +1223,9 @@ static struct ast_endpoint *persistent_endpoint_find_or_create(const struct ast_
return NULL;
}
persistent->aors = ast_strdup(endpoint->aors);
if (!persistent->aors) {
return NULL;
}
ast_endpoint_set_state(persistent->endpoint, AST_ENDPOINT_OFFLINE);
ao2_link_flags(persistent_endpoints, persistent, OBJ_NOLOCK);
} else if (strcmp(persistent->aors, endpoint->aors)) {
char *new_aors = ast_strdup(endpoint->aors);
/* make sure we don't NULL persistent->aors if allocation fails. */
if (new_aors) {
ast_free(persistent->aors);
persistent->aors = new_aors;
}
}
ao2_ref(persistent->endpoint, +1);
@ -2097,16 +1917,7 @@ int ast_res_pjsip_initialize_configuration(void)
return -1;
}
if (ast_sip_initialize_sorcery_qualify()) {
ast_log(LOG_ERROR, "Failed to register SIP qualify support with sorcery\n");
ast_sorcery_unref(sip_sorcery);
sip_sorcery = NULL;
return -1;
}
ast_sorcery_observer_add(sip_sorcery, "endpoint", &endpoint_observers);
ast_sorcery_observer_add(sip_sorcery, "contact", &state_contact_observer);
ast_sorcery_observer_add(sip_sorcery, CONTACT_STATUS, &state_contact_status_observer);
if (ast_sip_initialize_sorcery_domain_alias()) {
ast_log(LOG_ERROR, "Failed to register SIP domain aliases support with sorcery\n");
@ -2155,8 +1966,6 @@ void ast_res_pjsip_destroy_configuration(void)
return;
}
ast_sorcery_observer_remove(sip_sorcery, CONTACT_STATUS, &state_contact_status_observer);
ast_sorcery_observer_remove(sip_sorcery, "contact", &state_contact_observer);
ast_sip_destroy_sorcery_global();
ast_sip_destroy_sorcery_location();
ast_sip_destroy_sorcery_auth();

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save