manager: Use separate lock for session event notification.

When notifying a manager session that new events were available
the same lock was used that was also held when doing things within
the session (such as sending events out). If the manager session
blocked for a period of time this would cause a back up of messages
in Stasis and would also block any other sessions from receiving
events.

This change adds a separate lock to the manager session which is
strictly used for notifying it that new events are available.

ASTERISK-28350

Change-Id: Ifbcac007faca9ad0231640f5e82a6ca9228f261b
13.26
Joshua Colp 6 years ago
parent 85f87790a5
commit 03708f435c

@ -1593,6 +1593,7 @@ struct mansession_session {
time_t noncetime; /*!< Timer for nonce value expiration */
unsigned long oldnonce; /*!< Stale nonce value */
unsigned long nc; /*!< incremental nonce counter */
ast_mutex_t notify_lock; /*!< Lock for notifying this session of events */
AST_LIST_HEAD_NOLOCK(mansession_datastores, ast_datastore) datastores; /*!< Data stores on the session */
AST_LIST_ENTRY(mansession_session) list;
};
@ -2211,6 +2212,8 @@ static void session_destructor(void *obj)
if (session->blackfilters) {
ao2_t_ref(session->blackfilters, -1, "decrement ref for black container, should be last one");
}
ast_mutex_destroy(&session->notify_lock);
}
/*! \brief Allocate manager session structure and add it to the list of sessions */
@ -2237,6 +2240,8 @@ static struct mansession_session *build_mansession(const struct ast_sockaddr *ad
newsession->send_events = -1;
ast_sockaddr_copy(&newsession->addr, addr);
ast_mutex_init(&newsession->notify_lock);
sessions = ao2_global_obj_ref(mgr_sessions);
if (sessions) {
ao2_link(sessions, newsession);
@ -4150,10 +4155,13 @@ static int action_waitevent(struct mansession *s, const struct message *m)
/* XXX maybe put an upper bound, or prevent the use of 0 ? */
}
ao2_lock(s->session);
ast_mutex_lock(&s->session->notify_lock);
if (s->session->waiting_thread != AST_PTHREADT_NULL) {
pthread_kill(s->session->waiting_thread, SIGURG);
}
ast_mutex_unlock(&s->session->notify_lock);
ao2_lock(s->session);
if (s->session->managerid) { /* AMI-over-HTTP session */
/*
@ -4176,8 +4184,9 @@ static int action_waitevent(struct mansession *s, const struct message *m)
}
ao2_unlock(s->session);
/* XXX should this go inside the lock ? */
ast_mutex_lock(&s->session->notify_lock);
s->session->waiting_thread = pthread_self(); /* let new events wake up this thread */
ast_mutex_unlock(&s->session->notify_lock);
ast_debug(1, "Starting waiting for an event!\n");
for (x = 0; x < timeout || timeout < 0; x++) {
@ -4185,17 +4194,19 @@ static int action_waitevent(struct mansession *s, const struct message *m)
if (AST_RWLIST_NEXT(s->session->last_ev, eq_next)) {
needexit = 1;
}
if (s->session->needdestroy) {
needexit = 1;
}
ao2_unlock(s->session);
/* We can have multiple HTTP session point to the same mansession entry.
* The way we deal with it is not very nice: newcomers kick out the previous
* HTTP session. XXX this needs to be improved.
*/
ast_mutex_lock(&s->session->notify_lock);
if (s->session->waiting_thread != pthread_self()) {
needexit = 1;
}
if (s->session->needdestroy) {
needexit = 1;
}
ao2_unlock(s->session);
ast_mutex_unlock(&s->session->notify_lock);
if (needexit) {
break;
}
@ -4209,9 +4220,14 @@ static int action_waitevent(struct mansession *s, const struct message *m)
}
ast_debug(1, "Finished waiting for an event!\n");
ao2_lock(s->session);
ast_mutex_lock(&s->session->notify_lock);
if (s->session->waiting_thread == pthread_self()) {
struct eventqent *eqe = s->session->last_ev;
s->session->waiting_thread = AST_PTHREADT_NULL;
ast_mutex_unlock(&s->session->notify_lock);
ao2_lock(s->session);
astman_send_response(s, m, "Success", "Waiting for Event completed.");
while ((eqe = advance_event(eqe))) {
if (((s->session->readperm & eqe->category) == eqe->category)
@ -4225,11 +4241,11 @@ static int action_waitevent(struct mansession *s, const struct message *m)
"Event: WaitEventComplete\r\n"
"%s"
"\r\n", idText);
s->session->waiting_thread = AST_PTHREADT_NULL;
ao2_unlock(s->session);
} else {
ast_mutex_unlock(&s->session->notify_lock);
ast_debug(1, "Abandoning event request!\n");
}
ao2_unlock(s->session);
return 0;
}
@ -6580,20 +6596,20 @@ static int get_input(struct mansession *s, char *output)
}
}
ao2_lock(s->session);
ast_mutex_lock(&s->session->notify_lock);
if (s->session->pending_event) {
s->session->pending_event = 0;
ao2_unlock(s->session);
ast_mutex_unlock(&s->session->notify_lock);
return 0;
}
s->session->waiting_thread = pthread_self();
ao2_unlock(s->session);
ast_mutex_unlock(&s->session->notify_lock);
res = ast_wait_for_input(s->session->fd, timeout);
ao2_lock(s->session);
ast_mutex_lock(&s->session->notify_lock);
s->session->waiting_thread = AST_PTHREADT_NULL;
ao2_unlock(s->session);
ast_mutex_unlock(&s->session->notify_lock);
}
if (res < 0) {
/* If we get a signal from some other thread (typically because
@ -6985,7 +7001,7 @@ static int __attribute__((format(printf, 9, 0))) __manager_event_sessions_va(
iter = ao2_iterator_init(sessions, 0);
while ((session = ao2_iterator_next(&iter))) {
ao2_lock(session);
ast_mutex_lock(&session->notify_lock);
if (session->waiting_thread != AST_PTHREADT_NULL) {
pthread_kill(session->waiting_thread, SIGURG);
} else {
@ -6996,7 +7012,7 @@ static int __attribute__((format(printf, 9, 0))) __manager_event_sessions_va(
*/
session->pending_event = 1;
}
ao2_unlock(session);
ast_mutex_unlock(&session->notify_lock);
unref_mansession(session);
}
ao2_iterator_destroy(&iter);
@ -7892,9 +7908,11 @@ static int generic_http_callback(struct ast_tcptls_session_instance *ser,
blastaway = 1;
} else {
ast_debug(1, "Need destroy, but can't do it yet!\n");
ast_mutex_lock(&session->notify_lock);
if (session->waiting_thread != AST_PTHREADT_NULL) {
pthread_kill(session->waiting_thread, SIGURG);
}
ast_mutex_unlock(&session->notify_lock);
session->inuse--;
}
} else {

Loading…
Cancel
Save