Merge "pjsip_distributor.c: Consistently pick a serializer for messages."

changes/05/3005/1
Joshua Colp 9 years ago committed by Gerrit Code Review
commit 9acb5e3084

@ -1300,6 +1300,17 @@ struct ast_serializer_shutdown_group;
*/
struct ast_taskprocessor *ast_sip_create_serializer_group(const char *name, struct ast_serializer_shutdown_group *shutdown_group);
/*!
* \brief Determine the distributor serializer for the SIP message.
* \since 13.10.0
*
* \param rdata The incoming message.
*
* \retval Calculated distributor serializer on success.
* \retval NULL on error.
*/
struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata);
/*!
* \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized
*

@ -59,6 +59,12 @@ struct unidentified_request{
char src_name[];
};
/*! Number of serializers in pool if one not otherwise known. (Best if prime number) */
#define DISTRIBUTOR_POOL_SIZE 31
/*! Pool of serializers to use if not supplied. */
static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
/*!
* \internal
* \brief Record the task's serializer name on the tdata structure.
@ -278,6 +284,83 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
return dlg;
}
/*!
* \internal
* \brief Compute a hash value on a pjlib string
* \since 13.10.0
*
* \param[in] str The pjlib string to add to the hash
* \param[in] hash The hash value to add to
*
* \details
* This version of the function is for when you need to compute a
* string hash of more than one string.
*
* This famous hash algorithm was written by Dan Bernstein and is
* commonly used.
*
* \sa http://www.cse.yorku.ca/~oz/hash.html
*/
static int pjstr_hash_add(pj_str_t *str, int hash)
{
size_t len;
const char *pos;
len = pj_strlen(str);
pos = pj_strbuf(str);
while (len--) {
hash = hash * 33 ^ *pos++;
}
return hash;
}
/*!
* \internal
* \brief Compute a hash value on a pjlib string
* \since 13.10.0
*
* \param[in] str The pjlib string to hash
*
* This famous hash algorithm was written by Dan Bernstein and is
* commonly used.
*
* http://www.cse.yorku.ca/~oz/hash.html
*/
static int pjstr_hash(pj_str_t *str)
{
return pjstr_hash_add(str, 5381);
}
struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
{
int hash;
pj_str_t *remote_tag;
struct ast_taskprocessor *serializer;
if (!rdata->msg_info.msg) {
return NULL;
}
if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
remote_tag = &rdata->msg_info.from->tag;
} else {
remote_tag = &rdata->msg_info.to->tag;
}
/* Compute the hash from the SIP message call-id and remote-tag */
hash = pjstr_hash(&rdata->msg_info.cid->id);
hash = pjstr_hash_add(remote_tag, hash);
hash = abs(hash);
serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
if (serializer) {
ast_debug(3, "Calculated serializer %s to use for %s\n",
ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
}
return serializer;
}
static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
static pjsip_module endpoint_mod = {
@ -324,12 +407,23 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
pjsip_rx_data_get_info(rdata));
serializer = find_request_serializer(rdata);
if (!serializer) {
/*
* Pick a serializer for the unmatched response. Maybe
* the stack can figure out what it is for, or we really
* should just toss it regardless.
*/
serializer = ast_sip_get_distributor_serializer(rdata);
}
} else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
|| !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
/* We have a BYE or CANCEL request without a serializer. */
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
return PJ_TRUE;
} else {
/* Pick a serializer for the out-of-dialog request. */
serializer = ast_sip_get_distributor_serializer(rdata);
}
pjsip_rx_data_clone(rdata, 0, &clone);
@ -349,7 +443,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
pjsip_rx_data_free_cloned(clone);
} else {
ast_sip_push_task(serializer, distribute, clone);
if (ast_sip_push_task(serializer, distribute, clone)) {
ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
pjsip_rx_data_free_cloned(clone);
}
}
ast_taskprocessor_unreference(serializer);
@ -796,6 +893,7 @@ static int cli_unid_print_header(void *obj, void *arg, int flags)
return 0;
}
static int cli_unid_print_body(void *obj, void *arg, int flags)
{
struct unidentified_request *unid = obj;
@ -886,6 +984,47 @@ static struct ast_sorcery_observer global_observer = {
.loaded = global_loaded,
};
/*!
* \internal
* \brief Shutdown the serializers in the distributor pool.
* \since 13.10.0
*
* \return Nothing
*/
static void distributor_pool_shutdown(void)
{
int idx;
for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
ast_taskprocessor_unreference(distributor_pool[idx]);
distributor_pool[idx] = NULL;
}
}
/*!
* \internal
* \brief Setup the serializers in the distributor pool.
* \since 13.10.0
*
* \retval 0 on success.
* \retval -1 on error.
*/
static int distributor_pool_setup(void)
{
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
int idx;
for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
/* Create name with seq number appended. */
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
distributor_pool[idx] = ast_sip_create_serializer(tps_name);
if (!distributor_pool[idx]) {
return -1;
}
}
return 0;
}
int ast_sip_initialize_distributor(void)
{
@ -895,6 +1034,11 @@ int ast_sip_initialize_distributor(void)
return -1;
}
if (distributor_pool_setup()) {
ast_sip_destroy_distributor();
return -1;
}
prune_context = ast_sched_context_create();
if (!prune_context) {
ast_sip_destroy_distributor();
@ -927,8 +1071,10 @@ int ast_sip_initialize_distributor(void)
return -1;
}
unid_formatter = ao2_alloc(sizeof(struct ast_sip_cli_formatter_entry), NULL);
unid_formatter = ao2_alloc_options(sizeof(struct ast_sip_cli_formatter_entry), NULL,
AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!unid_formatter) {
ast_sip_destroy_distributor();
ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
return -1;
}
@ -940,6 +1086,7 @@ int ast_sip_initialize_distributor(void)
unid_formatter->get_id = cli_unid_get_id;
unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
ast_sip_register_cli_formatter(unid_formatter);
ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
return 0;
@ -950,17 +1097,20 @@ void ast_sip_destroy_distributor(void)
ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
ast_sip_unregister_cli_formatter(unid_formatter);
internal_sip_unregister_service(&distributor_mod);
internal_sip_unregister_service(&endpoint_mod);
internal_sip_unregister_service(&auth_mod);
internal_sip_unregister_service(&endpoint_mod);
internal_sip_unregister_service(&distributor_mod);
ao2_cleanup(artificial_auth);
ao2_cleanup(artificial_endpoint);
ao2_cleanup(unidentified_requests);
ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
if (prune_context) {
ast_sched_context_destroy(prune_context);
}
distributor_pool_shutdown();
ao2_cleanup(unidentified_requests);
}

Loading…
Cancel
Save