Merge "pjsip_distributor.c: Fix deadlock with TCP type transports." into 13

certified/13.18
George Joseph 8 years ago committed by Gerrit Code Review
commit 642c597507

@ -140,62 +140,189 @@ static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata)
/*! Dialog-specific information the distributor uses */
struct distributor_dialog_data {
/*! dialog_associations ao2 container key */
pjsip_dialog *dlg;
/*! Serializer to distribute tasks to for this dialog */
struct ast_taskprocessor *serializer;
/*! Endpoint associated with this dialog */
struct ast_sip_endpoint *endpoint;
};
#define DIALOG_ASSOCIATIONS_BUCKETS 251
static struct ao2_container *dialog_associations;
/*!
* \internal
* \brief Compute a hash value on an arbitrary buffer.
* \since 13.17.0
*
* \param[in] pos The buffer to add to the hash
* \param[in] len The buffer length to add to the hash
* \param[in] hash The hash value to add to
*
* \details
* This version of the function is for when you need to compute a
* hash of more than one buffer.
*
* This famous hash algorithm was written by Dan Bernstein and is
* commonly used.
*
* \note Call this with the dialog locked
* \sa http://www.cse.yorku.ca/~oz/hash.html
*/
static struct distributor_dialog_data *distributor_dialog_data_alloc(pjsip_dialog *dlg)
static int buf_hash_add(const char *pos, size_t len, int hash)
{
struct distributor_dialog_data *dist;
while (len--) {
hash = hash * 33 ^ *pos++;
}
return hash;
}
/*!
* \internal
* \brief Compute a hash value on an arbitrary buffer.
* \since 13.17.0
*
* \param[in] pos The buffer to add to the hash
* \param[in] len The buffer length to add to the hash
*
* \details
* This version of the function is for when you need to compute a
* hash of more than one buffer.
*
* This famous hash algorithm was written by Dan Bernstein and is
* commonly used.
*
* \sa http://www.cse.yorku.ca/~oz/hash.html
*/
static int buf_hash(const char *pos, size_t len)
{
return buf_hash_add(pos, len, 5381);
}
static int dialog_associations_hash(const void *obj, int flags)
{
const struct distributor_dialog_data *object;
union {
const pjsip_dialog *dlg;
const char buf[sizeof(pjsip_dialog *)];
} key;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_KEY:
key.dlg = obj;
break;
case OBJ_SEARCH_OBJECT:
object = obj;
key.dlg = object->dlg;
break;
default:
/* Hash can only work on something with a full key. */
ast_assert(0);
return 0;
}
return ast_str_hash_restrict(buf_hash(key.buf, sizeof(key.buf)));
}
dist = PJ_POOL_ZALLOC_T(dlg->pool, struct distributor_dialog_data);
pjsip_dlg_set_mod_data(dlg, distributor_mod.id, dist);
static int dialog_associations_cmp(void *obj, void *arg, int flags)
{
const struct distributor_dialog_data *object_left = obj;
const struct distributor_dialog_data *object_right = arg;
const pjsip_dialog *right_key = arg;
int cmp = 0;
return dist;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT:
right_key = object_right->dlg;
/* Fall through */
case OBJ_SEARCH_KEY:
if (object_left->dlg == right_key) {
cmp = CMP_MATCH;
}
break;
case OBJ_SEARCH_PARTIAL_KEY:
/* There is no such thing for this container. */
ast_assert(0);
break;
default:
cmp = 0;
break;
}
return cmp;
}
void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
{
struct distributor_dialog_data *dist;
SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
ao2_wrlock(dialog_associations);
dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!dist) {
dist = distributor_dialog_data_alloc(dlg);
if (serializer) {
dist = ao2_alloc(sizeof(*dist), NULL);
if (dist) {
dist->dlg = dlg;
dist->serializer = serializer;
ao2_link_flags(dialog_associations, dist, OBJ_NOLOCK);
ao2_ref(dist, -1);
}
}
} else {
ao2_lock(dist);
dist->serializer = serializer;
if (!dist->serializer && !dist->endpoint) {
ao2_unlink_flags(dialog_associations, dist, OBJ_NOLOCK);
}
ao2_unlock(dist);
ao2_ref(dist, -1);
}
dist->serializer = serializer;
ao2_unlock(dialog_associations);
}
void ast_sip_dialog_set_endpoint(pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint)
{
struct distributor_dialog_data *dist;
SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
ao2_wrlock(dialog_associations);
dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY | OBJ_NOLOCK);
if (!dist) {
dist = distributor_dialog_data_alloc(dlg);
if (endpoint) {
dist = ao2_alloc(sizeof(*dist), NULL);
if (dist) {
dist->dlg = dlg;
dist->endpoint = endpoint;
ao2_link_flags(dialog_associations, dist, OBJ_NOLOCK);
ao2_ref(dist, -1);
}
}
} else {
ao2_lock(dist);
dist->endpoint = endpoint;
if (!dist->serializer && !dist->endpoint) {
ao2_unlink_flags(dialog_associations, dist, OBJ_NOLOCK);
}
ao2_unlock(dist);
ao2_ref(dist, -1);
}
dist->endpoint = endpoint;
ao2_unlock(dialog_associations);
}
struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg)
{
struct distributor_dialog_data *dist;
SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
struct ast_sip_endpoint *endpoint;
dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
if (!dist || !dist->endpoint) {
return NULL;
dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY);
if (dist) {
ao2_lock(dist);
endpoint = ao2_bump(dist->endpoint);
ao2_unlock(dist);
ao2_ref(dist, -1);
} else {
endpoint = NULL;
}
ao2_ref(dist->endpoint, +1);
return dist->endpoint;
return endpoint;
}
static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
@ -227,7 +354,7 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) ||
rdata->msg_info.to->tag.slen != 0) {
dlg = pjsip_ua_find_dialog(&rdata->msg_info.cid->id, local_tag,
remote_tag, PJ_TRUE);
remote_tag, PJ_FALSE);
if (dlg) {
return dlg;
}
@ -265,11 +392,6 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
pj_mutex_unlock(tsx->mutex);
#endif
if (!dlg) {
return NULL;
}
pjsip_dlg_inc_lock(dlg);
return dlg;
}
@ -292,16 +414,7 @@ static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
*/
static int pjstr_hash_add(pj_str_t *str, int hash)
{
size_t len;
const char *pos;
len = pj_strlen(str);
pos = pj_strbuf(str);
while (len--) {
hash = hash * 33 ^ *pos++;
}
return hash;
return buf_hash_add(pj_strbuf(str), pj_strlen(str), hash);
}
/*!
@ -340,7 +453,7 @@ struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdat
/* Compute the hash from the SIP message call-id and remote-tag */
hash = pjstr_hash(&rdata->msg_info.cid->id);
hash = pjstr_hash_add(remote_tag, hash);
hash = abs(hash);
hash = ast_str_hash_restrict(hash);
serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
if (serializer) {
@ -375,17 +488,18 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
dlg = find_dialog(rdata);
if (dlg) {
ast_debug(3, "Searching for serializer on dialog %s for %s\n",
ast_debug(3, "Searching for serializer associated with dialog %s for %s\n",
dlg->obj_name, pjsip_rx_data_get_info(rdata));
dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
dist = ao2_find(dialog_associations, dlg, OBJ_SEARCH_KEY);
if (dist) {
ao2_lock(dist);
serializer = ao2_bump(dist->serializer);
ao2_unlock(dist);
if (serializer) {
ast_debug(3, "Found serializer %s on dialog %s\n",
ast_debug(3, "Found serializer %s associated with dialog %s\n",
ast_taskprocessor_name(serializer), dlg->obj_name);
}
}
pjsip_dlg_dec_lock(dlg);
}
if (serializer) {
@ -407,6 +521,7 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
/* We have a BYE or CANCEL request without a serializer. */
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
ao2_cleanup(dist);
return PJ_TRUE;
} else {
if (ast_taskprocessor_alert_get()) {
@ -421,6 +536,7 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
*/
ast_debug(3, "Taskprocessor overload alert: Ignoring '%s'.\n",
pjsip_rx_data_get_info(rdata));
ao2_cleanup(dist);
return PJ_TRUE;
}
@ -428,10 +544,17 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
serializer = ast_sip_get_distributor_serializer(rdata);
}
pjsip_rx_data_clone(rdata, 0, &clone);
if (pjsip_rx_data_clone(rdata, 0, &clone) != PJ_SUCCESS) {
ast_taskprocessor_unreference(serializer);
ao2_cleanup(dist);
return PJ_TRUE;
}
if (dist) {
ao2_lock(dist);
clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
ao2_unlock(dist);
ao2_cleanup(dist);
}
if (ast_sip_push_task(serializer, distribute, clone)) {
@ -1078,6 +1201,14 @@ int ast_sip_initialize_distributor(void)
return -1;
}
dialog_associations = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
DIALOG_ASSOCIATIONS_BUCKETS, dialog_associations_hash, NULL,
dialog_associations_cmp);
if (!dialog_associations) {
ast_sip_destroy_distributor();
return -1;
}
if (distributor_pool_setup()) {
ast_sip_destroy_distributor();
return -1;
@ -1156,5 +1287,6 @@ void ast_sip_destroy_distributor(void)
distributor_pool_shutdown();
ao2_cleanup(dialog_associations);
ao2_cleanup(unidentified_requests);
}

Loading…
Cancel
Save