chan_pjsip: Fix deadlock when masquerading PJSIP channels.

Performing a directed call pickup resulted in a deadlock when PJSIP
channels were involved.

A masquerade needs to hold onto the channel locks while it swaps channel
information between the two channels involved in the masquerade.  With
PJSIP channels, the fixup routine needed to push a fixup task onto the
PJSIP channel's serializer.  Unfortunately, if the serializer was also
processing a task that needed to lock the channel, you get deadlock.

* Added a new control frame that is used to notify the channels that a
masquerade is about to start and when it has completed.

* Added the ability to query taskprocessors if the current thread is the
taskprocessor thread.

* Added the ability to suspend/unsuspend the PJSIP serializer thread so a
masquerade could fixup the PJSIP channel without using the serializer.

ASTERISK-24356 #close
Reported by: rmudgett

Review: https://reviewboard.asterisk.org/r/4034/
........

Merged revisions 424471 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/13@424472 65c4cc65-6c06-0410-ace0-fbb531ad65f3
changes/97/197/1
Richard Mudgett 11 years ago
parent b67094624d
commit 6a844be566

@ -1396,6 +1396,7 @@ static int ooh323_indicate(struct ast_channel *ast, int condition, const void *d
break;
case AST_CONTROL_PROCEEDING:
case AST_CONTROL_PVT_CAUSE_CODE:
case AST_CONTROL_MASQUERADE_NOTIFY:
case -1:
break;
default:

@ -1421,6 +1421,8 @@ static int iax2_is_control_frame_allowed(int subtype)
/* Only meaningful across a bridge on this machine for direct-media exchange. */
case AST_CONTROL_PVT_CAUSE_CODE:
/* Intended only for the sending machine's local channel structure. */
case AST_CONTROL_MASQUERADE_NOTIFY:
/* Intended only for masquerades when calling ast_indicate_data(). */
case AST_CONTROL_STREAM_STOP:
case AST_CONTROL_STREAM_SUSPEND:
case AST_CONTROL_STREAM_RESTART:
@ -5770,6 +5772,7 @@ static int iax2_indicate(struct ast_channel *c, int condition, const void *data,
}
break;
case AST_CONTROL_PVT_CAUSE_CODE:
case AST_CONTROL_MASQUERADE_NOTIFY:
res = -1;
goto done;
}

@ -7108,6 +7108,7 @@ static int misdn_indication(struct ast_channel *ast, int cond, const void *data,
chan_misdn_log(1, p->bc->port, " --> * Unknown Indication:%d pid:%d\n", cond, p->bc->pid);
/* fallthrough */
case AST_CONTROL_PVT_CAUSE_CODE:
case AST_CONTROL_MASQUERADE_NOTIFY:
return -1;
}

@ -1805,6 +1805,7 @@ static int jingle_indicate(struct ast_channel *ast, int condition, const void *d
case AST_CONTROL_CONNECTED_LINE:
break;
case AST_CONTROL_PVT_CAUSE_CODE:
case AST_CONTROL_MASQUERADE_NOTIFY:
case -1:
res = -1;
break;

@ -701,40 +701,24 @@ static int chan_pjsip_write(struct ast_channel *ast, struct ast_frame *frame)
return res;
}
struct fixup_data {
struct ast_sip_session *session;
struct ast_channel *chan;
};
static int fixup(void *data)
{
struct fixup_data *fix_data = data;
struct ast_sip_channel_pvt *channel = ast_channel_tech_pvt(fix_data->chan);
struct chan_pjsip_pvt *pvt = channel->pvt;
channel->session->channel = fix_data->chan;
set_channel_on_rtp_instance(pvt, ast_channel_uniqueid(fix_data->chan));
return 0;
}
/*! \brief Function called by core to change the underlying owner channel */
static int chan_pjsip_fixup(struct ast_channel *oldchan, struct ast_channel *newchan)
{
struct ast_sip_channel_pvt *channel = ast_channel_tech_pvt(newchan);
struct fixup_data fix_data;
fix_data.session = channel->session;
fix_data.chan = newchan;
struct chan_pjsip_pvt *pvt = channel->pvt;
if (channel->session->channel != oldchan) {
return -1;
}
if (ast_sip_push_task_synchronous(channel->session->serializer, fixup, &fix_data)) {
ast_log(LOG_WARNING, "Unable to perform channel fixup\n");
return -1;
}
/*
* The masquerade has suspended the channel's session
* serializer so we can safely change it outside of
* the serializer thread.
*/
channel->session->channel = newchan;
set_channel_on_rtp_instance(pvt, ast_channel_uniqueid(newchan));
return 0;
}
@ -1211,6 +1195,24 @@ static int chan_pjsip_indicate(struct ast_channel *ast, int condition, const voi
case AST_CONTROL_PVT_CAUSE_CODE:
res = -1;
break;
case AST_CONTROL_MASQUERADE_NOTIFY:
ast_assert(datalen == sizeof(int));
if (*(int *) data) {
/*
* Masquerade is beginning:
* Wait for session serializer to get suspended.
*/
ast_channel_unlock(ast);
ast_sip_session_suspend(channel->session);
ast_channel_lock(ast);
} else {
/*
* Masquerade is complete:
* Unsuspend the session serializer.
*/
ast_sip_session_unsuspend(channel->session);
}
break;
case AST_CONTROL_HOLD:
chan_pjsip_add_hold(ast_channel_uniqueid(ast));
device_buf_size = strlen(ast_channel_name(ast)) + 1;

@ -7847,6 +7847,7 @@ static int sip_indicate(struct ast_channel *ast, int condition, const void *data
res = -1;
break;
case AST_CONTROL_PVT_CAUSE_CODE: /* these should be handled by the code in channel.c */
case AST_CONTROL_MASQUERADE_NOTIFY:
case -1:
res = -1;
break;

@ -5336,6 +5336,7 @@ static int skinny_indicate(struct ast_channel *ast, int ind, const void *data, s
ast_log(LOG_WARNING, "Don't know how to indicate condition %d\n", ind);
/* fallthrough */
case AST_CONTROL_PVT_CAUSE_CODE:
case AST_CONTROL_MASQUERADE_NOTIFY:
return -1; /* Tell asterisk to provide inband signalling */
}
return 0;

@ -5369,6 +5369,7 @@ static int unistim_indicate(struct ast_channel *ast, int ind, const void *data,
ast_log(LOG_WARNING, "Don't know how to indicate condition %d\n", ind);
/* fallthrough */
case AST_CONTROL_PVT_CAUSE_CODE:
case AST_CONTROL_MASQUERADE_NOTIFY:
return -1;
}

@ -334,6 +334,10 @@ static void print_frame(struct ast_frame *frame)
case AST_CONTROL_PVT_CAUSE_CODE:
ast_verbose("SubClass: PVT_CAUSE_CODE\n");
break;
case AST_CONTROL_MASQUERADE_NOTIFY:
/* Should never happen. */
ast_assert(0);
break;
case AST_CONTROL_STREAM_STOP:
ast_verbose("SubClass: STREAM_STOP\n");
break;

@ -290,6 +290,7 @@ enum ast_control_frame_type {
AST_CONTROL_MCID = 31, /*!< Indicate that the caller is being malicious. */
AST_CONTROL_UPDATE_RTP_PEER = 32, /*!< Interrupt the bridge and have it update the peer */
AST_CONTROL_PVT_CAUSE_CODE = 33, /*!< Contains an update to the protocol-specific cause-code stored for branching dials */
AST_CONTROL_MASQUERADE_NOTIFY = 34, /*!< A masquerade is about to begin/end. (Never sent as a frame but directly with ast_indicate_data().) */
/*
* WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING

@ -87,6 +87,9 @@ struct ast_sip_session_media {
*/
struct ast_sip_session_delayed_request;
/*! \brief Opaque struct controlling the suspension of the session's serializer. */
struct ast_sip_session_suspender;
/*!
* \brief A structure describing a SIP session
*
@ -96,43 +99,45 @@ struct ast_sip_session_delayed_request;
* to use the term "SIP session" to refer to the INVITE dialog itself.
*/
struct ast_sip_session {
/* Dialplan extension where incoming call is destined */
/*! Dialplan extension where incoming call is destined */
char exten[AST_MAX_EXTENSION];
/* The endpoint with which Asterisk is communicating */
/*! The endpoint with which Asterisk is communicating */
struct ast_sip_endpoint *endpoint;
/* The contact associated with this session */
/*! The contact associated with this session */
struct ast_sip_contact *contact;
/* The PJSIP details of the session, which includes the dialog */
/*! The PJSIP details of the session, which includes the dialog */
struct pjsip_inv_session *inv_session;
/* The Asterisk channel associated with the session */
/*! The Asterisk channel associated with the session */
struct ast_channel *channel;
/* Registered session supplements */
/*! Registered session supplements */
AST_LIST_HEAD(, ast_sip_session_supplement) supplements;
/* Datastores added to the session by supplements to the session */
/*! Datastores added to the session by supplements to the session */
struct ao2_container *datastores;
/* Media streams */
/*! Media streams */
struct ao2_container *media;
/* Serializer for tasks relating to this SIP session */
/*! Serializer for tasks relating to this SIP session */
struct ast_taskprocessor *serializer;
/* Requests that could not be sent due to current inv_session state */
/*! Non-null if the session serializer is suspended or being suspended. */
struct ast_sip_session_suspender *suspended;
/*! Requests that could not be sent due to current inv_session state */
AST_LIST_HEAD_NOLOCK(, ast_sip_session_delayed_request) delayed_requests;
/* When we need to reschedule a reinvite, we use this structure to do it */
/*! When we need to reschedule a reinvite, we use this structure to do it */
pj_timer_entry rescheduled_reinvite;
/* Format capabilities pertaining to direct media */
/*! Format capabilities pertaining to direct media */
struct ast_format_cap *direct_media_cap;
/* When we need to forcefully end the session */
/*! When we need to forcefully end the session */
pj_timer_entry scheduled_termination;
/* Identity of endpoint this session deals with */
/*! Identity of endpoint this session deals with */
struct ast_party_id id;
/* Requested capabilities */
/*! Requested capabilities */
struct ast_format_cap *req_caps;
/* Optional DSP, used only for inband DTMF detection if configured */
/*! Optional DSP, used only for inband DTMF detection if configured */
struct ast_dsp *dsp;
/* Whether the termination of the session should be deferred */
/*! Whether the termination of the session should be deferred */
unsigned int defer_terminate:1;
/* Deferred incoming re-invite */
/*! Deferred incoming re-invite */
pjsip_rx_data *deferred_reinvite;
/* Current T.38 state */
/*! Current T.38 state */
enum ast_sip_session_t38state t38state;
};
@ -387,6 +392,28 @@ struct ast_sip_channel_pvt *ast_sip_channel_pvt_alloc(void *pvt, struct ast_sip_
struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint,
struct ast_sip_contact *contact, pjsip_inv_session *inv);
/*!
* \brief Request and wait for the session serializer to be suspended.
* \since 12.7.0
*
* \param session Which session to suspend the serializer.
*
* \note No channel locks can be held while calling without risk of deadlock.
*
* \return Nothing
*/
void ast_sip_session_suspend(struct ast_sip_session *session);
/*!
* \brief Request the session serializer be unsuspended.
* \since 12.7.0
*
* \param session Which session to unsuspend the serializer.
*
* \return Nothing
*/
void ast_sip_session_unsuspend(struct ast_sip_session *session);
/*!
* \brief Create a new outgoing SIP session
*

@ -246,6 +246,16 @@ int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
*/
int ast_taskprocessor_execute(struct ast_taskprocessor *tps);
/*!
* \brief Am I the given taskprocessor's current task.
* \since 12.7.0
*
* \param tps Taskprocessor to check.
*
* \retval non-zero if current thread is the taskprocessor thread.
*/
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps);
/*!
* \brief Return the name of the taskprocessor singleton
* \since 1.6.1

@ -2065,6 +2065,10 @@ static void bridge_channel_handle_control(struct ast_bridge_channel *bridge_chan
ast_indicate(chan, -1);
}
break;
case AST_CONTROL_MASQUERADE_NOTIFY:
/* Should never happen. */
ast_assert(0);
break;
default:
ast_indicate_data(chan, fr->subclass.integer, fr->data.ptr, fr->datalen);
break;

@ -4285,6 +4285,7 @@ static int attribute_const is_visible_indication(enum ast_control_frame_type con
case AST_CONTROL_MCID:
case AST_CONTROL_UPDATE_RTP_PEER:
case AST_CONTROL_PVT_CAUSE_CODE:
case AST_CONTROL_MASQUERADE_NOTIFY:
case AST_CONTROL_STREAM_STOP:
case AST_CONTROL_STREAM_SUSPEND:
case AST_CONTROL_STREAM_REVERSE:
@ -4451,7 +4452,9 @@ int ast_indicate_data(struct ast_channel *chan, int _condition,
ast_channel_lock(chan);
/* Don't bother if the channel is about to go away, anyway. */
if (ast_test_flag(ast_channel_flags(chan), AST_FLAG_ZOMBIE) || ast_check_hangup(chan)) {
if ((ast_test_flag(ast_channel_flags(chan), AST_FLAG_ZOMBIE)
|| ast_check_hangup(chan))
&& condition != AST_CONTROL_MASQUERADE_NOTIFY) {
res = -1;
goto indicate_cleanup;
}
@ -4599,6 +4602,7 @@ int ast_indicate_data(struct ast_channel *chan, int _condition,
case AST_CONTROL_AOC:
case AST_CONTROL_END_OF_Q:
case AST_CONTROL_MCID:
case AST_CONTROL_MASQUERADE_NOTIFY:
case AST_CONTROL_UPDATE_RTP_PEER:
case AST_CONTROL_STREAM_STOP:
case AST_CONTROL_STREAM_SUSPEND:
@ -6445,6 +6449,11 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann
* original channel's backend. While the features are nice, which is the
* reason we're keeping it, it's still awesomely weird. XXX */
/* Indicate to each channel that a masquerade is about to begin. */
x = 1;
ast_indicate_data(original, AST_CONTROL_MASQUERADE_NOTIFY, &x, sizeof(x));
ast_indicate_data(clonechan, AST_CONTROL_MASQUERADE_NOTIFY, &x, sizeof(x));
/*
* The container lock is necessary for proper locking order
* because the channels must be unlinked to change their
@ -6485,8 +6494,9 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann
/* Start the masquerade channel contents rearangement. */
ast_channel_lock_both(original, clonechan);
ast_debug(4, "Actually Masquerading %s(%u) into the structure of %s(%u)\n",
ast_channel_name(clonechan), ast_channel_state(clonechan), ast_channel_name(original), ast_channel_state(original));
ast_debug(1, "Actually Masquerading %s(%u) into the structure of %s(%u)\n",
ast_channel_name(clonechan), ast_channel_state(clonechan),
ast_channel_name(original), ast_channel_state(original));
/*
* Remember the original read/write formats. We turn off any
@ -6759,6 +6769,19 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann
ast_channel_unlock(original);
ast_channel_unlock(clonechan);
/*
* Indicate to each channel that a masquerade is complete.
*
* We can still do this to clonechan even though it is a
* zombie because ast_indicate_data() will explicitly pass
* this control and ast_hangup() is held off until the
* ast_channel_masq() and ast_channel_masqr() pointers are
* cleared.
*/
x = 0;
ast_indicate_data(original, AST_CONTROL_MASQUERADE_NOTIFY, &x, sizeof(x));
ast_indicate_data(clonechan, AST_CONTROL_MASQUERADE_NOTIFY, &x, sizeof(x));
ast_bridge_notify_masquerade(original);
if (clone_hold_state == AST_CONTROL_HOLD) {

@ -530,6 +530,12 @@ int ast_unreal_indicate(struct ast_channel *ast, int condition, const void *data
ao2_ref(p, 1); /* ref for unreal_queue_frame */
switch (condition) {
case AST_CONTROL_MASQUERADE_NOTIFY:
/*
* Always block this because this is the channel being
* masqueraded; not anything down the chain.
*/
break;
case AST_CONTROL_CONNECTED_LINE:
case AST_CONTROL_REDIRECTING:
res = unreal_colp_redirect_indicate(p, ast, condition);

@ -78,9 +78,9 @@ struct ast_taskprocessor {
long tps_queue_size;
/*! \brief Taskprocessor queue */
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
/*! \brief Taskprocessor singleton list entry */
AST_LIST_ENTRY(ast_taskprocessor) list;
struct ast_taskprocessor_listener *listener;
/*! Current thread executing the tasks */
pthread_t thread;
/*! Indicates if the taskprocessor is currently executing a task */
unsigned int executing:1;
};
@ -600,6 +600,8 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
ao2_ref(listener, +1);
p->listener = listener;
p->thread = AST_PTHREADT_NULL;
ao2_ref(p, +1);
listener->tps = p;
@ -752,6 +754,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
return 0;
}
tps->thread = pthread_self();
tps->executing = 1;
if (t->wants_local) {
@ -768,6 +771,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
tps_task_free(t);
ao2_lock(tps);
tps->thread = AST_PTHREADT_NULL;
/* We need to check size in the same critical section where we reset the
* executing bit. Avoids a race condition where a task is pushed right
* after we pop an empty stack.
@ -789,3 +793,13 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
}
return size > 0;
}
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
{
int is_task;
ao2_lock(tps);
is_task = pthread_equal(tps->thread, pthread_self());
ao2_unlock(tps);
return is_task;
}

@ -1223,6 +1223,108 @@ struct ast_sip_session *ast_sip_session_alloc(struct ast_sip_endpoint *endpoint,
return session;
}
/*! \brief struct controlling the suspension of the session's serializer. */
struct ast_sip_session_suspender {
ast_cond_t cond_suspended;
ast_cond_t cond_complete;
int suspended;
int complete;
};
static void sip_session_suspender_dtor(void *vdoomed)
{
struct ast_sip_session_suspender *doomed = vdoomed;
ast_cond_destroy(&doomed->cond_suspended);
ast_cond_destroy(&doomed->cond_complete);
}
/*!
* \internal
* \brief Block the session serializer thread task.
*
* \param data Pushed serializer task data for suspension.
*
* \retval 0
*/
static int sip_session_suspend_task(void *data)
{
struct ast_sip_session_suspender *suspender = data;
ao2_lock(suspender);
/* Signal that the serializer task is now suspended. */
suspender->suspended = 1;
ast_cond_signal(&suspender->cond_suspended);
/* Wait for the the serializer suspension to be completed. */
while (!suspender->complete) {
ast_cond_wait(&suspender->cond_complete, ao2_object_get_lockaddr(suspender));
}
ao2_unlock(suspender);
ao2_ref(suspender, -1);
return 0;
}
void ast_sip_session_suspend(struct ast_sip_session *session)
{
struct ast_sip_session_suspender *suspender;
int res;
ast_assert(session->suspended == NULL);
if (ast_taskprocessor_is_task(session->serializer)) {
/* I am the session's serializer thread so I cannot suspend. */
return;
}
suspender = ao2_alloc(sizeof(*suspender), sip_session_suspender_dtor);
if (!suspender) {
/* We will just have to hope that the system does not deadlock */
return;
}
ast_cond_init(&suspender->cond_suspended, NULL);
ast_cond_init(&suspender->cond_complete, NULL);
ao2_ref(suspender, +1);
res = ast_sip_push_task(session->serializer, sip_session_suspend_task, suspender);
if (res) {
/* We will just have to hope that the system does not deadlock */
ao2_ref(suspender, -2);
return;
}
session->suspended = suspender;
/* Wait for the serializer to get suspended. */
ao2_lock(suspender);
while (!suspender->suspended) {
ast_cond_wait(&suspender->cond_suspended, ao2_object_get_lockaddr(suspender));
}
ao2_unlock(suspender);
}
void ast_sip_session_unsuspend(struct ast_sip_session *session)
{
struct ast_sip_session_suspender *suspender = session->suspended;
if (!suspender) {
/* Nothing to do */
return;
}
session->suspended = NULL;
/* Signal that the serializer task suspension is now complete. */
ao2_lock(suspender);
suspender->complete = 1;
ast_cond_signal(&suspender->cond_complete);
ao2_unlock(suspender);
ao2_ref(suspender, -1);
}
static int session_outbound_auth(pjsip_dialog *dlg, pjsip_tx_data *tdata, void *user_data)
{
pjsip_inv_session *inv = pjsip_dlg_get_inv_session(dlg);

@ -15,6 +15,8 @@
LINKER_SYMBOL_PREFIXast_sip_session_send_request;
LINKER_SYMBOL_PREFIXast_sip_session_create_invite;
LINKER_SYMBOL_PREFIXast_sip_session_create_outgoing;
LINKER_SYMBOL_PREFIXast_sip_session_suspend;
LINKER_SYMBOL_PREFIXast_sip_session_unsuspend;
LINKER_SYMBOL_PREFIXast_sip_dialog_get_session;
LINKER_SYMBOL_PREFIXast_sip_session_resume_reinvite;
LINKER_SYMBOL_PREFIXast_sip_channel_pvt_alloc;

Loading…
Cancel
Save