stasis: Use an implementation specific channel snapshot cache.

Channels no longer use the Stasis cache for channel snapshots. Instead
they are stored in a hash table in stasis_channels which reduces the
number of Stasis messages created and allows better storage.

As a result the following APIs are no longer available since the stasis
cache is no longer used:
ast_channel_topic_cached()
ast_channel_topic_all_cached()

The ast_channel_cache_all() and ast_channel_cache_by_name() functions
now return an ao2_container of ast_channel_snapshots rather than
a container of stasis_messages therefore you can't (and don't need
to) call stasis_cache functions on it.

The ast_channel_topic_all() function now returns a normal topic not
a cached one so you can't use stasis cache functions on it either.

The ast_channel_snapshot_type() stasis message now has the
ast_channel_snapshot_update structure as it's data. It contains the
last snapshot and the new one.

ast_channel_snapshot_get_latest() still returns the latest snapshot.

The latest snapshot is now stored on the channel itself to eliminate
cache hits when Stasis messages that have the snapshot as a payload
are created.

ASTERISK-28102

Change-Id: I9334febff60a82d7c39703e49059fa3a68825786
pull/12/head
Joshua Colp 7 years ago
parent 0a60bc1a68
commit d0ccbb3377

@ -19,6 +19,22 @@ chan_sip
https://wiki.asterisk.org/wiki/x/tAHOAQ https://wiki.asterisk.org/wiki/x/tAHOAQ
https://wiki.asterisk.org/wiki/x/hYCLAQ https://wiki.asterisk.org/wiki/x/hYCLAQ
Channels
------------------
* The core no longer uses the stasis cache for channels snapshots.
The following APIs are no longer available:
ast_channel_topic_cached()
ast_channel_topic_all_cached()
The ast_channel_cache_all() and ast_channel_cache_by_name() functions
now returns an ao2_container of ast_channel_snapshots rather than a
container of stasis_messages therefore you can't call stasis_cache
functions on it.
The ast_channel_topic_all() function now returns a normal topic,
not a cached one so you can't use stasis cache functions on it either.
The ast_channel_snapshot_type() stasis message now has the
ast_channel_snapshot_update structure as it's data.
ast_channel_snapshot_get_latest() still returns the latest snapshot.
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
--- Functionality changes from Asterisk 16.0.0 to Asterisk 16.1.0 ------------ --- Functionality changes from Asterisk 16.0.0 to Asterisk 16.1.0 ------------
------------------------------------------------------------------------------ ------------------------------------------------------------------------------

@ -43,3 +43,18 @@ res_parking:
res_xmpp: res_xmpp:
- The JabberStatus application, deprecated in Asterisk 12, has been removed. - The JabberStatus application, deprecated in Asterisk 12, has been removed.
Channels:
- The core no longer uses the stasis cache for channels snapshots.
The following APIs are no longer available:
ast_channel_topic_cached()
ast_channel_topic_all_cached()
The ast_channel_cache_all() and ast_channel_cache_by_name() functions
now returns an ao2_container of ast_channel_snapshots rather than a
container of stasis_messages therefore you can't call stasis_cache
functions on it.
The ast_channel_topic_all() function now returns a normal topic,
not a cached one so you can't use stasis cache functions on it either.
The ast_channel_snapshot_type() stasis message now has the
ast_channel_snapshot_update structure as it's data.
ast_channel_snapshot_get_latest() still returns the latest snapshot.

@ -1448,7 +1448,7 @@ static void send_agent_login(struct ast_channel *chan, const char *agent)
return; return;
} }
ast_channel_publish_cached_blob(chan, ast_channel_agent_login_type(), blob); ast_channel_publish_blob(chan, ast_channel_agent_login_type(), blob);
} }
static void send_agent_logoff(struct ast_channel *chan, const char *agent, long logintime) static void send_agent_logoff(struct ast_channel *chan, const char *agent, long logintime)
@ -1464,7 +1464,7 @@ static void send_agent_logoff(struct ast_channel *chan, const char *agent, long
return; return;
} }
ast_channel_publish_cached_blob(chan, ast_channel_agent_logoff_type(), blob); ast_channel_publish_blob(chan, ast_channel_agent_logoff_type(), blob);
} }
/*! /*!

@ -783,7 +783,7 @@ int manager_confbridge_init(void)
} }
channel_state_router = stasis_message_router_create( channel_state_router = stasis_message_router_create(
ast_channel_topic_all_cached()); ast_channel_topic_all());
if (!channel_state_router) { if (!channel_state_router) {
manager_confbridge_shutdown(); manager_confbridge_shutdown();

@ -1135,7 +1135,6 @@ static int chan_pjsip_devicestate(const char *data)
RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", data), ao2_cleanup); RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", data), ao2_cleanup);
enum ast_device_state state = AST_DEVICE_UNKNOWN; enum ast_device_state state = AST_DEVICE_UNKNOWN;
RAII_VAR(struct ast_endpoint_snapshot *, endpoint_snapshot, NULL, ao2_cleanup); RAII_VAR(struct ast_endpoint_snapshot *, endpoint_snapshot, NULL, ao2_cleanup);
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
struct ast_devstate_aggregate aggregate; struct ast_devstate_aggregate aggregate;
int num, inuse = 0; int num, inuse = 0;
@ -1156,27 +1155,20 @@ static int chan_pjsip_devicestate(const char *data)
state = AST_DEVICE_NOT_INUSE; state = AST_DEVICE_NOT_INUSE;
} }
if (!endpoint_snapshot->num_channels || !(cache = ast_channel_cache())) { if (!endpoint_snapshot->num_channels) {
return state; return state;
} }
ast_devstate_aggregate_init(&aggregate); ast_devstate_aggregate_init(&aggregate);
ao2_ref(cache, +1);
for (num = 0; num < endpoint_snapshot->num_channels; num++) { for (num = 0; num < endpoint_snapshot->num_channels; num++) {
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_channel_snapshot *snapshot; struct ast_channel_snapshot *snapshot;
msg = stasis_cache_get(cache, ast_channel_snapshot_type(), snapshot = ast_channel_snapshot_get_latest(endpoint_snapshot->channel_ids[num]);
endpoint_snapshot->channel_ids[num]); if (!snapshot) {
if (!msg) {
continue; continue;
} }
snapshot = stasis_message_data(msg);
if (chan_pjsip_get_hold(snapshot->uniqueid)) { if (chan_pjsip_get_hold(snapshot->uniqueid)) {
ast_devstate_aggregate_add(&aggregate, AST_DEVICE_ONHOLD); ast_devstate_aggregate_add(&aggregate, AST_DEVICE_ONHOLD);
} else { } else {
@ -1187,6 +1179,8 @@ static int chan_pjsip_devicestate(const char *data)
(snapshot->state == AST_STATE_BUSY)) { (snapshot->state == AST_STATE_BUSY)) {
inuse++; inuse++;
} }
ao2_ref(snapshot, -1);
} }
if (endpoint->devicestate_busy_at && (inuse == endpoint->devicestate_busy_at)) { if (endpoint->devicestate_busy_at && (inuse == endpoint->devicestate_busy_at)) {

@ -169,9 +169,8 @@ static int cli_channelstats_compare(void *obj, void *arg, int flags)
static int cli_message_to_snapshot(void *obj, void *arg, int flags) static int cli_message_to_snapshot(void *obj, void *arg, int flags)
{ {
struct stasis_message *message = obj; struct ast_channel_snapshot *snapshot = obj;
struct ao2_container *snapshots = arg; struct ao2_container *snapshots = arg;
struct ast_channel_snapshot *snapshot = stasis_message_data(message);
if (!strcmp(snapshot->type, "PJSIP")) { if (!strcmp(snapshot->type, "PJSIP")) {
ao2_link(snapshots, snapshot); ao2_link(snapshots, snapshot);
@ -198,8 +197,7 @@ static struct ao2_container *get_container(const char *regex, ao2_sort_fn sort_f
{ {
struct ao2_container *child_container; struct ao2_container *child_container;
regex_t regexbuf; regex_t regexbuf;
RAII_VAR(struct ao2_container *, parent_container, RAII_VAR(struct ao2_container *, parent_container, ast_channel_cache_by_name(), ao2_cleanup);
stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()), ao2_cleanup);
if (!parent_container) { if (!parent_container) {
return NULL; return NULL;

@ -147,6 +147,15 @@ extern "C" {
*/ */
#define AST_MAX_PUBLIC_UNIQUEID 149 #define AST_MAX_PUBLIC_UNIQUEID 149
/*!
* The number of buckets to store channels or channel information
*/
#ifdef LOW_MEMORY
#define AST_NUM_CHANNEL_BUCKETS 61
#else
#define AST_NUM_CHANNEL_BUCKETS 1567
#endif
/*! /*!
* Maximum size of an internal Asterisk channel unique ID. * Maximum size of an internal Asterisk channel unique ID.
* *
@ -2649,6 +2658,17 @@ void ast_channel_internal_swap_uniqueid_and_linkedid(struct ast_channel *a, stru
*/ */
void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b); void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b);
/*!
* \brief Swap snapshots beteween two channels
* \param a First channel
* \param b Second channel
* \return void
*
* \note
* This is used in masquerade to exchange snapshots
*/
void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b);
/*! /*!
* \brief Set uniqueid and linkedid string value only (not time) * \brief Set uniqueid and linkedid string value only (not time)
* \param chan The channel to set the uniqueid to * \param chan The channel to set the uniqueid to
@ -4236,6 +4256,8 @@ enum ast_channel_adsicpe ast_channel_adsicpe(const struct ast_channel *chan);
void ast_channel_adsicpe_set(struct ast_channel *chan, enum ast_channel_adsicpe value); void ast_channel_adsicpe_set(struct ast_channel *chan, enum ast_channel_adsicpe value);
enum ast_channel_state ast_channel_state(const struct ast_channel *chan); enum ast_channel_state ast_channel_state(const struct ast_channel *chan);
ast_callid ast_channel_callid(const struct ast_channel *chan); ast_callid ast_channel_callid(const struct ast_channel *chan);
struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan);
void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot);
/*! /*!
* \pre chan is locked * \pre chan is locked
@ -4561,21 +4583,6 @@ struct varshead *ast_channel_get_vars(struct ast_channel *chan);
*/ */
struct stasis_topic *ast_channel_topic(struct ast_channel *chan); struct stasis_topic *ast_channel_topic(struct ast_channel *chan);
/*!
* \since 12
* \brief A topic which publishes the events for a particular channel.
*
* \ref ast_channel_snapshot messages are replaced with \ref stasis_cache_update
*
* If the given \a chan is \c NULL, ast_channel_topic_all_cached() is returned.
*
* \param chan Channel, or \c NULL.
*
* \retval Topic for channel's events.
* \retval ast_channel_topic_all() if \a chan is \c NULL.
*/
struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan);
/*! /*!
* \brief Get the bridge associated with a channel * \brief Get the bridge associated with a channel
* \since 12.0.0 * \since 12.0.0

@ -75,6 +75,23 @@ struct ast_channel_snapshot {
struct varshead *ari_vars; /*!< Variables to be appended to ARI events */ struct varshead *ari_vars; /*!< Variables to be appended to ARI events */
}; };
/*!
* \since 17
* \brief Structure representing a change of snapshot of channel state.
*
* While not enforced programmatically, this object is shared across multiple
* threads, and should be treated as an immutable object.
*
* \note This structure will not have a transition of an old snapshot with no
* new snapshot to indicate that a channel has gone away. A new snapshot will
* always exist and a channel going away can be determined by checking for the
* AST_FLAG_DEAD flag on the new snapshot.
*/
struct ast_channel_snapshot_update {
struct ast_channel_snapshot *old_snapshot; /*!< The old channel snapshot */
struct ast_channel_snapshot *new_snapshot; /*!< The new channel snapshot */
};
/*! /*!
* \since 12 * \since 12
* \brief Blob of data associated with a channel. * \brief Blob of data associated with a channel.
@ -94,7 +111,7 @@ struct ast_channel_blob {
*/ */
struct ast_multi_channel_blob; struct ast_multi_channel_blob;
struct stasis_cp_all *ast_channel_cache_all(void); struct ao2_container *ast_channel_cache_all(void);
/*! /*!
* \since 12 * \since 12
@ -103,36 +120,19 @@ struct stasis_cp_all *ast_channel_cache_all(void);
*/ */
struct stasis_topic *ast_channel_topic_all(void); struct stasis_topic *ast_channel_topic_all(void);
/*!
* \since 12
* \brief A caching topic which caches \ref ast_channel_snapshot messages from
* ast_channel_events_all(void).
*
* \retval Topic for all channel events.
*/
struct stasis_topic *ast_channel_topic_all_cached(void);
/*!
* \since 12
* \brief Primary channel cache, indexed by Uniqueid.
*
* \retval Cache of \ref ast_channel_snapshot.
*/
struct stasis_cache *ast_channel_cache(void);
/*! /*!
* \since 12 * \since 12
* \brief Secondary channel cache, indexed by name. * \brief Secondary channel cache, indexed by name.
* *
* \retval Cache of \ref ast_channel_snapshot. * \retval Cache of \ref ast_channel_snapshot.
*/ */
struct stasis_cache *ast_channel_cache_by_name(void); struct ao2_container *ast_channel_cache_by_name(void);
/*! /*!
* \since 12 * \since 12
* \brief Message type for \ref ast_channel_snapshot. * \brief Message type for \ref ast_channel_snapshot_update.
* *
* \retval Message type for \ref ast_channel_snapshot. * \retval Message type for \ref ast_channel_snapshot_update.
*/ */
struct stasis_message_type *ast_channel_snapshot_type(void); struct stasis_message_type *ast_channel_snapshot_type(void);
@ -175,6 +175,18 @@ struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniquei
*/ */
struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name); struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name);
/*!
* \since 17
* \brief Send the final channel snapshot for a channel, thus removing it from cache
*
* \pre chan is locked
*
* \param chan The channel to send the final channel snapshot for
*
* \note This will also remove the cached snapshot from the channel itself
*/
void ast_channel_publish_final_snapshot(struct ast_channel *chan);
/*! /*!
* \since 12 * \since 12
* \brief Creates a \ref ast_channel_blob message. * \brief Creates a \ref ast_channel_blob message.
@ -303,6 +315,8 @@ void ast_multi_channel_blob_add_channel(struct ast_multi_channel_blob *obj,
* \param type Type of stasis message. * \param type Type of stasis message.
* \param blob The blob being published. (NULL if no blob) * \param blob The blob being published. (NULL if no blob)
* *
* \note This will use the current snapshot on the channel and will not generate a new one.
*
* \return Nothing * \return Nothing
*/ */
void ast_channel_publish_blob(struct ast_channel *chan, struct stasis_message_type *type, void ast_channel_publish_blob(struct ast_channel *chan, struct stasis_message_type *type,
@ -557,17 +571,6 @@ void ast_channel_publish_dial_forward(struct ast_channel *caller,
const char *dialstatus, const char *dialstatus,
const char *forward); const char *forward);
/*!
* \since 12
* \brief Publish in the \ref ast_channel_topic a \ref ast_channel_snapshot
* message indicating a change in channel state
*
* \pre chan is locked
*
* \param chan The channel whose state has changed
*/
void ast_publish_channel_state(struct ast_channel *chan);
/*! @} */ /*! @} */
/*! /*!

@ -1849,7 +1849,9 @@ static void aoc_publish_blob(struct ast_channel *chan, struct stasis_message_typ
} }
if (chan) { if (chan) {
aoc_event->snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan)); ast_channel_lock(chan);
aoc_event->snapshot = ao2_bump(ast_channel_snapshot(chan));
ast_channel_unlock(chan);
if (!aoc_event->snapshot) { if (!aoc_event->snapshot) {
ao2_ref(aoc_event, -1); ao2_ref(aoc_event, -1);
return; return;

@ -3244,15 +3244,7 @@ static struct stasis_message *mwi_state_create_message(
mwi_state->old_msgs = old_msgs; mwi_state->old_msgs = old_msgs;
if (!ast_strlen_zero(channel_id)) { if (!ast_strlen_zero(channel_id)) {
struct stasis_message *chan_message; mwi_state->snapshot = ast_channel_snapshot_get_latest(channel_id);
chan_message = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
channel_id);
if (chan_message) {
mwi_state->snapshot = stasis_message_data(chan_message);
ao2_ref(mwi_state->snapshot, +1);
}
ao2_cleanup(chan_message);
} }
if (eid) { if (eid) {

@ -5150,16 +5150,15 @@ static int bridge_show_specific_print_channel(void *obj, void *arg, int flags)
{ {
const char *uniqueid = obj; const char *uniqueid = obj;
struct ast_cli_args *a = arg; struct ast_cli_args *a = arg;
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct ast_channel_snapshot *snapshot; struct ast_channel_snapshot *snapshot;
msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid); snapshot = ast_channel_snapshot_get_latest(uniqueid);
if (!msg) { if (!snapshot) {
return 0; return 0;
} }
snapshot = stasis_message_data(msg);
ast_cli(a->fd, "Channel: %s\n", snapshot->name); ast_cli(a->fd, "Channel: %s\n", snapshot->name);
ao2_ref(snapshot, -1);
return 0; return 0;
} }

@ -186,14 +186,6 @@
</configInfo> </configInfo>
***/ ***/
/* The prime here should be similar in size to the channel container. */
#ifdef LOW_MEMORY
#define NUM_CDR_BUCKETS 61
#else
#define NUM_CDR_BUCKETS 769
#endif
#define DEFAULT_ENABLED "1" #define DEFAULT_ENABLED "1"
#define DEFAULT_BATCHMODE "0" #define DEFAULT_BATCHMODE "0"
#define DEFAULT_UNANSWERED "0" #define DEFAULT_UNANSWERED "0"
@ -2056,9 +2048,9 @@ static int filter_channel_snapshot(struct ast_channel_snapshot *snapshot)
/*! /*!
* \internal * \internal
* \brief Filter a channel cache update * \brief Filter a channel snapshot update
*/ */
static int filter_channel_cache_message(struct ast_channel_snapshot *old_snapshot, static int filter_channel_snapshot_message(struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot) struct ast_channel_snapshot *new_snapshot)
{ {
int ret = 0; int ret = 0;
@ -2256,52 +2248,38 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot,
} }
/*! /*!
* \brief Handler for Stasis-Core channel cache update messages * \brief Handler for channel snapshot update messages
* \param data Passed on * \param data Passed on
* \param sub The stasis subscription for this message callback * \param sub The stasis subscription for this message callback
* \param topic The topic this message was published for * \param topic The topic this message was published for
* \param message The message * \param message The message
*/ */
static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message) static void handle_channel_snapshot_update_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{ {
struct cdr_object *cdr; struct cdr_object *cdr;
struct stasis_cache_update *update = stasis_message_data(message); struct ast_channel_snapshot_update *update = stasis_message_data(message);
struct ast_channel_snapshot *old_snapshot;
struct ast_channel_snapshot *new_snapshot;
struct cdr_object *it_cdr; struct cdr_object *it_cdr;
ast_assert(update != NULL); if (filter_channel_snapshot_message(update->old_snapshot, update->new_snapshot)) {
ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
if (filter_channel_cache_message(old_snapshot, new_snapshot)) {
return; return;
} }
if (new_snapshot && !old_snapshot) { if (update->new_snapshot && !update->old_snapshot) {
cdr = cdr_object_alloc(new_snapshot); cdr = cdr_object_alloc(update->new_snapshot);
if (!cdr) { if (!cdr) {
return; return;
} }
cdr->is_root = 1; cdr->is_root = 1;
ao2_link(active_cdrs_master, cdr); ao2_link(active_cdrs_master, cdr);
} else { } else {
const char *uniqueid; cdr = ao2_find(active_cdrs_master, update->new_snapshot->uniqueid, OBJ_SEARCH_KEY);
uniqueid = new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid;
cdr = ao2_find(active_cdrs_master, uniqueid, OBJ_SEARCH_KEY);
} }
/* Handle Party A */ /* Handle Party A */
if (!cdr) { if (!cdr) {
const char *name; ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", update->new_snapshot->name);
name = new_snapshot ? new_snapshot->name : old_snapshot->name;
ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", name);
ast_assert(0); ast_assert(0);
} else if (new_snapshot) { } else {
int all_reject = 1; int all_reject = 1;
ao2_lock(cdr); ao2_lock(cdr);
@ -2309,21 +2287,23 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription
if (!it_cdr->fn_table->process_party_a) { if (!it_cdr->fn_table->process_party_a) {
continue; continue;
} }
all_reject &= it_cdr->fn_table->process_party_a(it_cdr, new_snapshot); all_reject &= it_cdr->fn_table->process_party_a(it_cdr, update->new_snapshot);
} }
if (all_reject && check_new_cdr_needed(old_snapshot, new_snapshot)) { if (all_reject && check_new_cdr_needed(update->old_snapshot, update->new_snapshot)) {
/* We're not hung up and we have a new snapshot - we need a new CDR */ /* We're not hung up and we have a new snapshot - we need a new CDR */
struct cdr_object *new_cdr; struct cdr_object *new_cdr;
new_cdr = cdr_object_create_and_append(cdr); new_cdr = cdr_object_create_and_append(cdr);
if (new_cdr) { if (new_cdr) {
new_cdr->fn_table->process_party_a(new_cdr, new_snapshot); new_cdr->fn_table->process_party_a(new_cdr, update->new_snapshot);
} }
} }
ao2_unlock(cdr); ao2_unlock(cdr);
} else { }
if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
ao2_lock(cdr); ao2_lock(cdr);
CDR_DEBUG("%p - Beginning finalize/dispatch for %s\n", cdr, old_snapshot->name); CDR_DEBUG("%p - Beginning finalize/dispatch for %s\n", cdr, update->old_snapshot->name);
for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) { for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) {
cdr_object_finalize(it_cdr); cdr_object_finalize(it_cdr);
} }
@ -2335,12 +2315,14 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription
} }
/* Handle Party B */ /* Handle Party B */
if (new_snapshot) { if (update->new_snapshot) {
ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY, ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY,
cdr_object_update_party_b, (char *) new_snapshot->name, new_snapshot); cdr_object_update_party_b, (char *) update->new_snapshot->name, update->new_snapshot);
} else { }
if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY, ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY,
cdr_object_finalize_party_b, (char *) old_snapshot->name, old_snapshot); cdr_object_finalize_party_b, (char *) update->new_snapshot->name, update->new_snapshot);
} }
ao2_cleanup(cdr); ao2_cleanup(cdr);
@ -4302,7 +4284,7 @@ static int create_subscriptions(void)
return 0; return 0;
} }
channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic); channel_subscription = stasis_forward_all(ast_channel_topic_all(), cdr_topic);
if (!channel_subscription) { if (!channel_subscription) {
return -1; return -1;
} }
@ -4522,7 +4504,7 @@ static int load_module(void)
return AST_MODULE_LOAD_FAILURE; return AST_MODULE_LOAD_FAILURE;
} }
stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL); stasis_message_router_add(stasis_router, ast_channel_snapshot_type(), handle_channel_snapshot_update_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL); stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL); stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL);
stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL); stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL);
@ -4530,14 +4512,14 @@ static int load_module(void)
stasis_message_router_add(stasis_router, cdr_sync_message_type(), handle_cdr_sync_message, NULL); stasis_message_router_add(stasis_router, cdr_sync_message_type(), handle_cdr_sync_message, NULL);
active_cdrs_master = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, active_cdrs_master = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
NUM_CDR_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn); AST_NUM_CHANNEL_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn);
if (!active_cdrs_master) { if (!active_cdrs_master) {
return AST_MODULE_LOAD_FAILURE; return AST_MODULE_LOAD_FAILURE;
} }
ao2_container_register("cdrs_master", active_cdrs_master, cdr_master_print_fn); ao2_container_register("cdrs_master", active_cdrs_master, cdr_master_print_fn);
active_cdrs_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, active_cdrs_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
NUM_CDR_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn); AST_NUM_CHANNEL_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn);
if (!active_cdrs_all) { if (!active_cdrs_all) {
return AST_MODULE_LOAD_FAILURE; return AST_MODULE_LOAD_FAILURE;
} }

@ -888,14 +888,6 @@ static void cel_channel_state_change(
{ {
int is_hungup, was_hungup; int is_hungup, was_hungup;
if (!new_snapshot) {
cel_report_event(old_snapshot, AST_CEL_CHANNEL_END, NULL, NULL, NULL);
if (ast_cel_track_event(AST_CEL_LINKEDID_END)) {
check_retire_linkedid(old_snapshot);
}
return;
}
if (!old_snapshot) { if (!old_snapshot) {
cel_report_event(new_snapshot, AST_CEL_CHANNEL_START, NULL, NULL, NULL); cel_report_event(new_snapshot, AST_CEL_CHANNEL_START, NULL, NULL, NULL);
return; return;
@ -915,6 +907,11 @@ static void cel_channel_state_change(
cel_report_event(new_snapshot, AST_CEL_HANGUP, NULL, extra, NULL); cel_report_event(new_snapshot, AST_CEL_HANGUP, NULL, extra, NULL);
ast_json_unref(extra); ast_json_unref(extra);
ao2_cleanup(dialstatus); ao2_cleanup(dialstatus);
cel_report_event(new_snapshot, AST_CEL_CHANNEL_END, NULL, NULL, NULL);
if (ast_cel_track_event(AST_CEL_LINKEDID_END)) {
check_retire_linkedid(new_snapshot);
}
return; return;
} }
@ -928,7 +925,7 @@ static void cel_channel_linkedid_change(
struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot) struct ast_channel_snapshot *new_snapshot)
{ {
if (!old_snapshot || !new_snapshot) { if (!old_snapshot) {
return; return;
} }
@ -946,8 +943,7 @@ static void cel_channel_app_change(
struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot) struct ast_channel_snapshot *new_snapshot)
{ {
if (new_snapshot && old_snapshot if (old_snapshot && !strcmp(old_snapshot->appl, new_snapshot->appl)) {
&& !strcmp(old_snapshot->appl, new_snapshot->appl)) {
return; return;
} }
@ -957,7 +953,7 @@ static void cel_channel_app_change(
} }
/* new snapshot has an application, start it */ /* new snapshot has an application, start it */
if (new_snapshot && !ast_strlen_zero(new_snapshot->appl)) { if (!ast_strlen_zero(new_snapshot->appl)) {
cel_report_event(new_snapshot, AST_CEL_APP_START, NULL, NULL, NULL); cel_report_event(new_snapshot, AST_CEL_APP_START, NULL, NULL, NULL);
} }
} }
@ -984,22 +980,15 @@ static int cel_filter_channel_snapshot(struct ast_channel_snapshot *snapshot)
static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub, static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
struct stasis_message *message) struct stasis_message *message)
{ {
struct stasis_cache_update *update = stasis_message_data(message); struct ast_channel_snapshot_update *update = stasis_message_data(message);
if (ast_channel_snapshot_type() == update->type) { size_t i;
struct ast_channel_snapshot *old_snapshot;
struct ast_channel_snapshot *new_snapshot;
size_t i;
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
if (cel_filter_channel_snapshot(old_snapshot) || cel_filter_channel_snapshot(new_snapshot)) { if (cel_filter_channel_snapshot(update->old_snapshot) || cel_filter_channel_snapshot(update->new_snapshot)) {
return; return;
} }
for (i = 0; i < ARRAY_LEN(cel_channel_monitors); ++i) { for (i = 0; i < ARRAY_LEN(cel_channel_monitors); ++i) {
cel_channel_monitors[i](old_snapshot, new_snapshot); cel_channel_monitors[i](update->old_snapshot, update->new_snapshot);
}
} }
} }
@ -1453,7 +1442,7 @@ static int create_subscriptions(void)
} }
cel_channel_forwarder = stasis_forward_all( cel_channel_forwarder = stasis_forward_all(
ast_channel_topic_all_cached(), ast_channel_topic_all(),
cel_aggregation_topic); cel_aggregation_topic);
if (!cel_channel_forwarder) { if (!cel_channel_forwarder) {
return -1; return -1;
@ -1498,7 +1487,7 @@ static int create_routes(void)
6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL);
ret |= stasis_message_router_add(cel_state_router, ret |= stasis_message_router_add(cel_state_router,
stasis_cache_update_type(), ast_channel_snapshot_type(),
cel_snapshot_update_cb, cel_snapshot_update_cb,
NULL); NULL);

@ -116,12 +116,6 @@ struct chanlist {
/*! \brief the list of registered channel types */ /*! \brief the list of registered channel types */
static AST_RWLIST_HEAD_STATIC(backends, chanlist); static AST_RWLIST_HEAD_STATIC(backends, chanlist);
#ifdef LOW_MEMORY
#define NUM_CHANNEL_BUCKETS 61
#else
#define NUM_CHANNEL_BUCKETS 1567
#endif
/*! \brief All active channels on the system */ /*! \brief All active channels on the system */
static struct ao2_container *channels; static struct ao2_container *channels;
@ -635,38 +629,6 @@ int ast_str2cause(const char *name)
return -1; return -1;
} }
static struct stasis_message *create_channel_snapshot_message(struct ast_channel *channel)
{
RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
if (!ast_channel_snapshot_type()) {
return NULL;
}
ast_channel_lock(channel);
snapshot = ast_channel_snapshot_create(channel);
ast_channel_unlock(channel);
if (!snapshot) {
return NULL;
}
return stasis_message_create(ast_channel_snapshot_type(), snapshot);
}
static void publish_cache_clear(struct ast_channel *chan)
{
RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
clear_msg = create_channel_snapshot_message(chan);
if (!clear_msg) {
return;
}
message = stasis_cache_clear_create(clear_msg);
stasis_publish(ast_channel_topic(chan), message);
}
/*! \brief Gives the string form of a given channel state. /*! \brief Gives the string form of a given channel state.
* *
* \note This function is not reentrant. * \note This function is not reentrant.
@ -1236,7 +1198,9 @@ int ast_queue_hold(struct ast_channel *chan, const char *musicclass)
"musicclass", musicclass); "musicclass", musicclass);
} }
ast_channel_publish_cached_blob(chan, ast_channel_hold_type(), blob); ast_channel_lock(chan);
ast_channel_publish_blob(chan, ast_channel_hold_type(), blob);
ast_channel_unlock(chan);
res = ast_queue_frame(chan, &f); res = ast_queue_frame(chan, &f);
@ -1250,7 +1214,9 @@ int ast_queue_unhold(struct ast_channel *chan)
struct ast_frame f = { AST_FRAME_CONTROL, .subclass.integer = AST_CONTROL_UNHOLD }; struct ast_frame f = { AST_FRAME_CONTROL, .subclass.integer = AST_CONTROL_UNHOLD };
int res; int res;
ast_channel_publish_cached_blob(chan, ast_channel_unhold_type(), NULL); ast_channel_lock(chan);
ast_channel_publish_blob(chan, ast_channel_unhold_type(), NULL);
ast_channel_unlock(chan);
res = ast_queue_frame(chan, &f); res = ast_queue_frame(chan, &f);
@ -2230,9 +2196,8 @@ static void ast_channel_destructor(void *obj)
ast_assert(!ast_test_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE)); ast_assert(!ast_test_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE));
ast_channel_lock(chan); ast_channel_lock(chan);
ast_channel_publish_snapshot(chan); ast_channel_publish_final_snapshot(chan);
ast_channel_unlock(chan); ast_channel_unlock(chan);
publish_cache_clear(chan);
} }
ast_channel_lock(chan); ast_channel_lock(chan);
@ -3344,7 +3309,7 @@ static void send_dtmf_begin_event(struct ast_channel *chan,
return; return;
} }
ast_channel_publish_cached_blob(chan, ast_channel_dtmf_begin_type(), blob); ast_channel_publish_blob(chan, ast_channel_dtmf_begin_type(), blob);
} }
static void send_dtmf_end_event(struct ast_channel *chan, static void send_dtmf_end_event(struct ast_channel *chan,
@ -3361,7 +3326,7 @@ static void send_dtmf_end_event(struct ast_channel *chan,
return; return;
} }
ast_channel_publish_cached_blob(chan, ast_channel_dtmf_end_type(), blob); ast_channel_publish_blob(chan, ast_channel_dtmf_end_type(), blob);
} }
static void ast_read_generator_actions(struct ast_channel *chan, struct ast_frame *f) static void ast_read_generator_actions(struct ast_channel *chan, struct ast_frame *f)
@ -6819,6 +6784,9 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann
/* Make sure the Stasis topic on the channel is updated appropriately */ /* Make sure the Stasis topic on the channel is updated appropriately */
ast_channel_internal_swap_topics(clonechan, original); ast_channel_internal_swap_topics(clonechan, original);
/* The old snapshots need to follow the channels so the snapshot update is correct */
ast_channel_internal_swap_snapshots(clonechan, original);
/* Swap channel names. This uses ast_channel_name_set directly, so we /* Swap channel names. This uses ast_channel_name_set directly, so we
* don't get any spurious rename events. * don't get any spurious rename events.
*/ */
@ -7246,7 +7214,7 @@ int ast_setstate(struct ast_channel *chan, enum ast_channel_state state)
ast_channel_state_set(chan, state); ast_channel_state_set(chan, state);
ast_publish_channel_state(chan); ast_channel_publish_snapshot(chan);
/* We have to pass AST_DEVICE_UNKNOWN here because it is entirely possible that the channel driver /* We have to pass AST_DEVICE_UNKNOWN here because it is entirely possible that the channel driver
* for this channel is using the callback method for device state. If we pass in an actual state here * for this channel is using the callback method for device state. If we pass in an actual state here
@ -7856,7 +7824,7 @@ static void channels_shutdown(void)
int ast_channels_init(void) int ast_channels_init(void)
{ {
channels = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, NUM_CHANNEL_BUCKETS, channels = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, AST_NUM_CHANNEL_BUCKETS,
ast_channel_hash_cb, NULL, ast_channel_cmp_cb); ast_channel_hash_cb, NULL, ast_channel_cmp_cb);
if (!channels) { if (!channels) {
return -1; return -1;

@ -42,7 +42,6 @@
#include "asterisk/channel_internal.h" #include "asterisk/channel_internal.h"
#include "asterisk/endpoints.h" #include "asterisk/endpoints.h"
#include "asterisk/indications.h" #include "asterisk/indications.h"
#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/stasis_channels.h" #include "asterisk/stasis_channels.h"
#include "asterisk/stasis_endpoints.h" #include "asterisk/stasis_endpoints.h"
#include "asterisk/stringfields.h" #include "asterisk/stringfields.h"
@ -215,12 +214,13 @@ struct ast_channel {
char dtmf_digit_to_emulate; /*!< Digit being emulated */ char dtmf_digit_to_emulate; /*!< Digit being emulated */
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */ char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_cp_single *topics; /*!< Topic for all channel's events */ struct stasis_topic *topic; /*!< Topic for trhis channel */
struct stasis_forward *channel_forward; /*!< Subscription for event forwarding to all channel topic */
struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */ struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
struct stasis_forward *endpoint_cache_forward; /*!< Subscription for cache updates to endpoint's topic */
struct ast_stream_topology *stream_topology; /*!< Stream topology */ struct ast_stream_topology *stream_topology; /*!< Stream topology */
void *stream_topology_change_source; /*!< Source that initiated a stream topology change */ void *stream_topology_change_source; /*!< Source that initiated a stream topology change */
struct ast_stream *default_streams[AST_MEDIA_TYPE_END]; /*!< Default streams indexed by media type */ struct ast_stream *default_streams[AST_MEDIA_TYPE_END]; /*!< Default streams indexed by media type */
struct ast_channel_snapshot *snapshot; /*!< The current up to date snapshot of the channel */
}; };
/*! \brief The monotonically increasing integer counter for channel uniqueids */ /*! \brief The monotonically increasing integer counter for channel uniqueids */
@ -1381,11 +1381,25 @@ void ast_channel_internal_swap_uniqueid_and_linkedid(struct ast_channel *a, stru
void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b) void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b)
{ {
struct stasis_cp_single *temp; struct stasis_topic *topic;
struct stasis_forward *forward;
temp = a->topics; topic = a->topic;
a->topics = b->topics; a->topic = b->topic;
b->topics = temp; b->topic = topic;
forward = a->channel_forward;
a->channel_forward = b->channel_forward;
b->channel_forward = forward;
}
void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b)
{
struct ast_channel_snapshot *snapshot;
snapshot = a->snapshot;
a->snapshot = b->snapshot;
b->snapshot = snapshot;
} }
void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid) void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid)
@ -1404,11 +1418,11 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
ast_string_field_free_memory(chan); ast_string_field_free_memory(chan);
chan->channel_forward = stasis_forward_cancel(chan->channel_forward);
chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward); chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
chan->endpoint_cache_forward = stasis_forward_cancel(chan->endpoint_cache_forward);
stasis_cp_single_unsubscribe(chan->topics); ao2_cleanup(chan->topic);
chan->topics = NULL; chan->topic = NULL;
ast_channel_internal_set_stream_topology(chan, NULL); ast_channel_internal_set_stream_topology(chan, NULL);
@ -1431,16 +1445,7 @@ struct stasis_topic *ast_channel_topic(struct ast_channel *chan)
return ast_channel_topic_all(); return ast_channel_topic_all();
} }
return stasis_cp_single_topic(chan->topics); return chan->topic;
}
struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan)
{
if (!chan) {
return ast_channel_topic_all_cached();
}
return stasis_cp_single_topic_cached(chan->topics);
} }
int ast_channel_forward_endpoint(struct ast_channel *chan, int ast_channel_forward_endpoint(struct ast_channel *chan,
@ -1456,28 +1461,28 @@ int ast_channel_forward_endpoint(struct ast_channel *chan,
return -1; return -1;
} }
chan->endpoint_cache_forward = stasis_forward_all(ast_channel_topic_cached(chan),
ast_endpoint_topic(endpoint));
if (!chan->endpoint_cache_forward) {
chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
return -1;
}
return 0; return 0;
} }
int ast_channel_internal_setup_topics(struct ast_channel *chan) int ast_channel_internal_setup_topics(struct ast_channel *chan)
{ {
const char *topic_name = chan->uniqueid.unique_id; const char *topic_name = chan->uniqueid.unique_id;
ast_assert(chan->topics == NULL); ast_assert(chan->topic == NULL);
if (ast_strlen_zero(topic_name)) { if (ast_strlen_zero(topic_name)) {
topic_name = "<dummy-channel>"; topic_name = "<dummy-channel>";
} }
chan->topics = stasis_cp_single_create( chan->topic = stasis_topic_create(topic_name);
ast_channel_cache_all(), topic_name); if (!chan->topic) {
if (!chan->topics) { return -1;
}
chan->channel_forward = stasis_forward_all(ast_channel_topic(chan),
ast_channel_topic_all());
if (!chan->channel_forward) {
ao2_ref(chan->topic, -1);
chan->topic = NULL;
return -1; return -1;
} }
@ -1568,3 +1573,14 @@ int ast_channel_is_multistream(struct ast_channel *chan)
{ {
return (chan->tech && chan->tech->read_stream && chan->tech->write_stream); return (chan->tech && chan->tech->read_stream && chan->tech->write_stream);
} }
struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan)
{
return chan->snapshot;
}
void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot)
{
ao2_cleanup(chan->snapshot);
chan->snapshot = ao2_bump(snapshot);
}

@ -956,7 +956,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar
struct ao2_container *channels; struct ao2_container *channels;
struct ao2_iterator it_chans; struct ao2_iterator it_chans;
struct stasis_message *msg; struct ast_channel_snapshot *cs;
int numchans = 0, concise = 0, verbose = 0, count = 0; int numchans = 0, concise = 0, verbose = 0, count = 0;
switch (cmd) { switch (cmd) {
@ -989,11 +989,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar
} else if (a->argc != e->args - 1) } else if (a->argc != e->args - 1)
return CLI_SHOWUSAGE; return CLI_SHOWUSAGE;
channels = ast_channel_cache_by_name();
if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) {
ast_cli(a->fd, "Failed to retrieve cached channels\n");
return CLI_SUCCESS;
}
if (!count) { if (!count) {
if (!concise && !verbose) if (!concise && !verbose)
@ -1004,8 +1000,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar
} }
it_chans = ao2_iterator_init(channels, 0); it_chans = ao2_iterator_init(channels, 0);
for (; (msg = ao2_iterator_next(&it_chans)); ao2_ref(msg, -1)) { for (; (cs = ao2_iterator_next(&it_chans)); ao2_ref(cs, -1)) {
struct ast_channel_snapshot *cs = stasis_message_data(msg);
char durbuf[16] = "-"; char durbuf[16] = "-";
if (!count) { if (!count) {
@ -1679,29 +1674,25 @@ char *ast_complete_channels(const char *line, const char *word, int pos, int sta
struct ao2_container *cached_channels; struct ao2_container *cached_channels;
char *ret = NULL; char *ret = NULL;
struct ao2_iterator iter; struct ao2_iterator iter;
struct stasis_message *msg; struct ast_channel_snapshot *snapshot;
if (pos != rpos) { if (pos != rpos) {
return NULL; return NULL;
} }
if (!(cached_channels = stasis_cache_dump(ast_channel_cache(), ast_channel_snapshot_type()))) { cached_channels = ast_channel_cache_all();
return NULL;
}
iter = ao2_iterator_init(cached_channels, 0); iter = ao2_iterator_init(cached_channels, 0);
for (; (msg = ao2_iterator_next(&iter)); ao2_ref(msg, -1)) { for (; (snapshot = ao2_iterator_next(&iter)); ao2_ref(snapshot, -1)) {
struct ast_channel_snapshot *snapshot = stasis_message_data(msg);
if (!strncasecmp(word, snapshot->name, wordlen) && (++which > state)) { if (!strncasecmp(word, snapshot->name, wordlen) && (++which > state)) {
if (state != -1) { if (state != -1) {
ret = ast_strdup(snapshot->name); ret = ast_strdup(snapshot->name);
ao2_ref(msg, -1); ao2_ref(snapshot, -1);
break; break;
} }
if (ast_cli_completion_add(ast_strdup(snapshot->name))) { if (ast_cli_completion_add(ast_strdup(snapshot->name))) {
ao2_ref(msg, -1); ao2_ref(snapshot, -1);
break; break;
} }
} }

@ -179,25 +179,23 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
return 0; return 0;
} }
/*! \brief Handler for channel snapshot cache clears */ /*! \brief Handler for channel snapshot update */
static void endpoint_cache_clear(void *data, static void endpoint_cache_clear(void *data,
struct stasis_subscription *sub, struct stasis_subscription *sub,
struct stasis_message *message) struct stasis_message *message)
{ {
struct ast_endpoint *endpoint = data; struct ast_endpoint *endpoint = data;
struct stasis_message *clear_msg = stasis_message_data(message); struct ast_channel_snapshot_update *update = stasis_message_data(message);
struct ast_channel_snapshot *clear_snapshot;
if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) { /* Only when the channel is dead do we remove it */
if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
return; return;
} }
clear_snapshot = stasis_message_data(clear_msg);
ast_assert(endpoint != NULL); ast_assert(endpoint != NULL);
ao2_lock(endpoint); ao2_lock(endpoint);
ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid); ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->uniqueid);
ao2_unlock(endpoint); ao2_unlock(endpoint);
endpoint_publish_snapshot(endpoint); endpoint_publish_snapshot(endpoint);
} }
@ -271,7 +269,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha
return NULL; return NULL;
} }
r |= stasis_message_router_add(endpoint->router, r |= stasis_message_router_add(endpoint->router,
stasis_cache_clear_type(), endpoint_cache_clear, ast_channel_snapshot_type(), endpoint_cache_clear,
endpoint); endpoint);
r |= stasis_message_router_add(endpoint->router, r |= stasis_message_router_add(endpoint->router,
stasis_subscription_change_type(), endpoint_subscription_change, stasis_subscription_change_type(), endpoint_subscription_change,

@ -6250,7 +6250,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m
int numchans = 0; int numchans = 0;
struct ao2_container *channels; struct ao2_container *channels;
struct ao2_iterator it_chans; struct ao2_iterator it_chans;
struct stasis_message *msg; struct ast_channel_snapshot *cs;
if (!ast_strlen_zero(actionid)) { if (!ast_strlen_zero(actionid)) {
snprintf(idText, sizeof(idText), "ActionID: %s\r\n", actionid); snprintf(idText, sizeof(idText), "ActionID: %s\r\n", actionid);
@ -6258,17 +6258,12 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m
idText[0] = '\0'; idText[0] = '\0';
} }
channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()); channels = ast_channel_cache_by_name();
if (!channels) {
astman_send_error(s, m, "Could not get cached channels");
return 0;
}
astman_send_listack(s, m, "Channels will follow", "start"); astman_send_listack(s, m, "Channels will follow", "start");
it_chans = ao2_iterator_init(channels, 0); it_chans = ao2_iterator_init(channels, 0);
for (; (msg = ao2_iterator_next(&it_chans)); ao2_ref(msg, -1)) { for (; (cs = ao2_iterator_next(&it_chans)); ao2_ref(cs, -1)) {
struct ast_channel_snapshot *cs = stasis_message_data(msg);
struct ast_str *built = ast_manager_build_channel_state_string_prefix(cs, ""); struct ast_str *built = ast_manager_build_channel_state_string_prefix(cs, "");
char durbuf[16] = ""; char durbuf[16] = "";

@ -528,17 +528,14 @@ static int send_bridge_info_item_cb(void *obj, void *arg, void *data, int flags)
char *uniqueid = obj; char *uniqueid = obj;
struct mansession *s = arg; struct mansession *s = arg;
struct bridge_list_data *list_data = data; struct bridge_list_data *list_data = data;
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
struct ast_channel_snapshot *snapshot;
RAII_VAR(struct ast_str *, channel_text, NULL, ast_free); RAII_VAR(struct ast_str *, channel_text, NULL, ast_free);
msg = stasis_cache_get(ast_channel_cache(), snapshot = ast_channel_snapshot_get_latest(uniqueid);
ast_channel_snapshot_type(), uniqueid); if (!snapshot) {
if (!msg) {
return 0; return 0;
} }
snapshot = stasis_message_data(msg);
if (snapshot->tech_properties & AST_CHAN_TP_INTERNAL) { if (snapshot->tech_properties & AST_CHAN_TP_INTERNAL) {
return 0; return 0;
} }

@ -576,11 +576,6 @@ static struct ast_manager_event_blob *channel_state_change(
{ {
int is_hungup, was_hungup; int is_hungup, was_hungup;
if (!new_snapshot) {
/* Ignore cache clearing events; we'll see the hangup first */
return NULL;
}
/* The Newchannel, Newstate and Hangup events are closely related, in /* The Newchannel, Newstate and Hangup events are closely related, in
* in that they are mutually exclusive, basically different flavors * in that they are mutually exclusive, basically different flavors
* of a new channel state event. * of a new channel state event.
@ -616,11 +611,6 @@ static struct ast_manager_event_blob *channel_newexten(
struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot) struct ast_channel_snapshot *new_snapshot)
{ {
/* No Newexten event on cache clear */
if (!new_snapshot) {
return NULL;
}
/* Empty application is not valid for a Newexten event */ /* Empty application is not valid for a Newexten event */
if (ast_strlen_zero(new_snapshot->appl)) { if (ast_strlen_zero(new_snapshot->appl)) {
return NULL; return NULL;
@ -654,8 +644,8 @@ static struct ast_manager_event_blob *channel_new_callerid(
struct ast_manager_event_blob *res; struct ast_manager_event_blob *res;
char *callerid; char *callerid;
/* No NewCallerid event on cache clear or first event */ /* No NewCallerid event on first channel snapshot */
if (!old_snapshot || !new_snapshot) { if (!old_snapshot) {
return NULL; return NULL;
} }
@ -682,8 +672,8 @@ static struct ast_manager_event_blob *channel_new_connected_line(
struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot) struct ast_channel_snapshot *new_snapshot)
{ {
/* No NewConnectedLine event on cache clear or first event */ /* No NewConnectedLine event on first channel snapshot */
if (!old_snapshot || !new_snapshot) { if (!old_snapshot) {
return NULL; return NULL;
} }
@ -699,7 +689,7 @@ static struct ast_manager_event_blob *channel_new_accountcode(
struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *old_snapshot,
struct ast_channel_snapshot *new_snapshot) struct ast_channel_snapshot *new_snapshot)
{ {
if (!old_snapshot || !new_snapshot) { if (!old_snapshot) {
return NULL; return NULL;
} }
@ -724,21 +714,14 @@ static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
struct stasis_message *message) struct stasis_message *message)
{ {
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
struct stasis_cache_update *update; struct ast_channel_snapshot_update *update;
struct ast_channel_snapshot *old_snapshot;
struct ast_channel_snapshot *new_snapshot;
size_t i; size_t i;
update = stasis_message_data(message); update = stasis_message_data(message);
ast_assert(ast_channel_snapshot_type() == update->type);
old_snapshot = stasis_message_data(update->old_snapshot);
new_snapshot = stasis_message_data(update->new_snapshot);
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup); RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
ev = channel_monitors[i](old_snapshot, new_snapshot); ev = channel_monitors[i](update->old_snapshot, update->new_snapshot);
if (!ev) { if (!ev) {
continue; continue;
@ -747,7 +730,7 @@ static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
/* If we haven't already, build the channel event string */ /* If we haven't already, build the channel event string */
if (!channel_event_string) { if (!channel_event_string) {
channel_event_string = channel_event_string =
ast_manager_build_channel_state_string(new_snapshot); ast_manager_build_channel_state_string(update->new_snapshot);
if (!channel_event_string) { if (!channel_event_string) {
return; return;
} }
@ -1260,7 +1243,7 @@ int manager_channels_init(void)
if (!message_router) { if (!message_router) {
return -1; return -1;
} }
channel_topic = ast_channel_topic_all_cached(); channel_topic = ast_channel_topic_all();
if (!channel_topic) { if (!channel_topic) {
return -1; return -1;
} }
@ -1272,7 +1255,7 @@ int manager_channels_init(void)
ast_register_cleanup(manager_channels_shutdown); ast_register_cleanup(manager_channels_shutdown);
ret |= stasis_message_router_add_cache_update(message_router, ret |= stasis_message_router_add(message_router,
ast_channel_snapshot_type(), channel_snapshot_update, NULL); ast_channel_snapshot_type(), channel_snapshot_update, NULL);
ret |= stasis_message_router_add(message_router, ret |= stasis_message_router_add(message_router,

@ -36,7 +36,6 @@
#include "asterisk/bridge.h" #include "asterisk/bridge.h"
#include "asterisk/translate.h" #include "asterisk/translate.h"
#include "asterisk/stasis.h" #include "asterisk/stasis.h"
#include "asterisk/stasis_cache_pattern.h"
#include "asterisk/stasis_channels.h" #include "asterisk/stasis_channels.h"
#include "asterisk/dial.h" #include "asterisk/dial.h"
#include "asterisk/linkedlists.h" #include "asterisk/linkedlists.h"
@ -117,60 +116,83 @@
#define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7 #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7
static struct stasis_cp_all *channel_cache_all; static struct stasis_topic *channel_topic_all;
static struct stasis_cache *channel_cache_by_name; static struct ao2_container *channel_cache;
static struct stasis_caching_topic *channel_by_name_topic; static struct ao2_container *channel_cache_by_name;
struct stasis_cp_all *ast_channel_cache_all(void) struct ao2_container *ast_channel_cache_all(void)
{ {
return channel_cache_all; return ao2_bump(channel_cache);
}
struct stasis_cache *ast_channel_cache(void)
{
return stasis_cp_all_cache(channel_cache_all);
} }
struct stasis_topic *ast_channel_topic_all(void) struct stasis_topic *ast_channel_topic_all(void)
{ {
return stasis_cp_all_topic(channel_cache_all); return channel_topic_all;
} }
struct stasis_topic *ast_channel_topic_all_cached(void) struct ao2_container *ast_channel_cache_by_name(void)
{ {
return stasis_cp_all_topic_cached(channel_cache_all); return ao2_bump(channel_cache_by_name);
} }
struct stasis_cache *ast_channel_cache_by_name(void) /*!
* \internal
* \brief Hash function for \ref ast_channel_snapshot objects
*/
static int channel_snapshot_hash_cb(const void *obj, const int flags)
{ {
return channel_cache_by_name; const struct ast_channel_snapshot *object = obj;
} const char *key;
static const char *channel_snapshot_get_id(struct stasis_message *message) switch (flags & OBJ_SEARCH_MASK) {
{ case OBJ_SEARCH_KEY:
struct ast_channel_snapshot *snapshot; key = obj;
if (ast_channel_snapshot_type() != stasis_message_type(message)) { break;
return NULL; case OBJ_SEARCH_OBJECT:
key = object->name;
break;
default:
ast_assert(0);
return 0;
} }
snapshot = stasis_message_data(message); return ast_str_case_hash(key);
return snapshot->uniqueid;
} }
static const char *channel_snapshot_get_name(struct stasis_message *message) /*!
* \internal
* \brief Comparison function for \ref ast_channel_snapshot objects
*/
static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags)
{ {
struct ast_channel_snapshot *snapshot; const struct ast_channel_snapshot *object_left = obj;
if (ast_channel_snapshot_type() != stasis_message_type(message)) { const struct ast_channel_snapshot *object_right = arg;
return NULL; const char *right_key = arg;
int cmp;
switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT:
right_key = object_right->name;
case OBJ_SEARCH_KEY:
cmp = strcasecmp(object_left->name, right_key);
break;
case OBJ_SEARCH_PARTIAL_KEY:
cmp = strncasecmp(object_left->name, right_key, strlen(right_key));
break;
default:
cmp = 0;
break;
} }
snapshot = stasis_message_data(message); if (cmp) {
return snapshot->name; return 0;
}
return CMP_MATCH;
} }
/*! /*!
* \internal * \internal
* \brief Hash function for \ref ast_channel_snapshot objects * \brief Hash function (using uniqueid) for \ref ast_channel_snapshot objects
*/ */
static int channel_snapshot_hash_cb(const void *obj, const int flags) static int channel_snapshot_uniqueid_hash_cb(const void *obj, const int flags)
{ {
const struct ast_channel_snapshot *object = obj; const struct ast_channel_snapshot *object = obj;
const char *key; const char *key;
@ -180,7 +202,7 @@ static int channel_snapshot_hash_cb(const void *obj, const int flags)
key = obj; key = obj;
break; break;
case OBJ_SEARCH_OBJECT: case OBJ_SEARCH_OBJECT:
key = object->name; key = object->uniqueid;
break; break;
default: default:
ast_assert(0); ast_assert(0);
@ -191,9 +213,9 @@ static int channel_snapshot_hash_cb(const void *obj, const int flags)
/*! /*!
* \internal * \internal
* \brief Comparison function for \ref ast_channel_snapshot objects * \brief Comparison function (using uniqueid) for \ref ast_channel_snapshot objects
*/ */
static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags) static int channel_snapshot_uniqueid_cmp_cb(void *obj, void *arg, int flags)
{ {
const struct ast_channel_snapshot *object_left = obj; const struct ast_channel_snapshot *object_left = obj;
const struct ast_channel_snapshot *object_right = arg; const struct ast_channel_snapshot *object_right = arg;
@ -202,12 +224,12 @@ static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags)
switch (flags & OBJ_SEARCH_MASK) { switch (flags & OBJ_SEARCH_MASK) {
case OBJ_SEARCH_OBJECT: case OBJ_SEARCH_OBJECT:
right_key = object_right->name; right_key = object_right->uniqueid;
case OBJ_SEARCH_KEY: case OBJ_SEARCH_KEY:
cmp = strcasecmp(object_left->name, right_key); cmp = strcasecmp(object_left->uniqueid, right_key);
break; break;
case OBJ_SEARCH_PARTIAL_KEY: case OBJ_SEARCH_PARTIAL_KEY:
cmp = strncasecmp(object_left->name, right_key, strlen(right_key)); cmp = strncasecmp(object_left->uniqueid, right_key, strlen(right_key));
break; break;
default: default:
cmp = 0; cmp = 0;
@ -309,6 +331,34 @@ struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *cha
return snapshot; return snapshot;
} }
static void channel_snapshot_update_dtor(void *obj)
{
struct ast_channel_snapshot_update *update = obj;
ao2_cleanup(update->old_snapshot);
ao2_cleanup(update->new_snapshot);
}
static struct ast_channel_snapshot_update *channel_snapshot_update_create(struct ast_channel *chan)
{
struct ast_channel_snapshot_update *update;
update = ao2_alloc_options(sizeof(*update), channel_snapshot_update_dtor,
AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!update) {
return NULL;
}
update->old_snapshot = ao2_bump(ast_channel_snapshot(chan));
update->new_snapshot = ast_channel_snapshot_create(chan);
if (!update->new_snapshot) {
ao2_ref(update, -1);
return NULL;
}
return update;
}
static void publish_message_for_channel_topics(struct stasis_message *message, struct ast_channel *chan) static void publish_message_for_channel_topics(struct stasis_message *message, struct ast_channel *chan)
{ {
if (chan) { if (chan) {
@ -521,7 +571,7 @@ struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
return NULL; return NULL;
} }
snapshot = chan ? ast_channel_snapshot_create(chan) : NULL; snapshot = chan ? ao2_bump(ast_channel_snapshot(chan)) : NULL;
msg = create_channel_blob_message(snapshot, type, blob); msg = create_channel_blob_message(snapshot, type, blob);
ao2_cleanup(snapshot); ao2_cleanup(snapshot);
return msg; return msg;
@ -628,38 +678,48 @@ struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *bl
struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid) struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid)
{ {
struct stasis_message *message;
struct ast_channel_snapshot *snapshot;
ast_assert(!ast_strlen_zero(uniqueid)); ast_assert(!ast_strlen_zero(uniqueid));
message = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), return ao2_find(channel_cache, uniqueid, OBJ_SEARCH_KEY);
uniqueid);
if (!message) {
return NULL;
}
snapshot = ao2_bump(stasis_message_data(message));
ao2_ref(message, -1);
return snapshot;
} }
struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name) struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name)
{ {
ast_assert(!ast_strlen_zero(name));
return ao2_find(channel_cache_by_name, name, OBJ_SEARCH_KEY);
}
void ast_channel_publish_final_snapshot(struct ast_channel *chan)
{
struct ast_channel_snapshot_update *update;
struct stasis_message *message; struct stasis_message *message;
struct ast_channel_snapshot *snapshot;
ast_assert(!ast_strlen_zero(name)); if (!ast_channel_snapshot_type()) {
return;
}
message = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), update = channel_snapshot_update_create(chan);
name); if (!update) {
return;
}
message = stasis_message_create(ast_channel_snapshot_type(), update);
/* In the success path message holds a reference to update so it will be valid
* for the lifetime of this function until the end.
*/
ao2_ref(update, -1);
if (!message) { if (!message) {
return NULL; return;
} }
snapshot = ao2_bump(stasis_message_data(message)); ao2_unlink(channel_cache, update->old_snapshot);
ao2_unlink(channel_cache_by_name, update->old_snapshot);
ast_channel_snapshot_set(chan, NULL);
stasis_publish(ast_channel_topic(chan), message);
ao2_ref(message, -1); ao2_ref(message, -1);
return snapshot;
} }
static void channel_role_snapshot_dtor(void *obj) static void channel_role_snapshot_dtor(void *obj)
@ -764,7 +824,7 @@ void ast_channel_stage_snapshot_done(struct ast_channel *chan)
void ast_channel_publish_snapshot(struct ast_channel *chan) void ast_channel_publish_snapshot(struct ast_channel *chan)
{ {
struct ast_channel_snapshot *snapshot; struct ast_channel_snapshot_update *update;
struct stasis_message *message; struct stasis_message *message;
if (!ast_channel_snapshot_type()) { if (!ast_channel_snapshot_type()) {
@ -775,17 +835,40 @@ void ast_channel_publish_snapshot(struct ast_channel *chan)
return; return;
} }
snapshot = ast_channel_snapshot_create(chan); update = channel_snapshot_update_create(chan);
if (!snapshot) { if (!update) {
return; return;
} }
message = stasis_message_create(ast_channel_snapshot_type(), snapshot); message = stasis_message_create(ast_channel_snapshot_type(), update);
ao2_ref(snapshot, -1); /* In the success path message holds a reference to update so it will be valid
* for the lifetime of this function until the end.
*/
ao2_ref(update, -1);
if (!message) { if (!message) {
return; return;
} }
/* We lock these ourselves so that the update is atomic and there isn't time where a
* snapshot is not in the cache.
*/
ao2_wrlock(channel_cache);
if (update->old_snapshot) {
ao2_unlink_flags(channel_cache, update->old_snapshot, OBJ_NOLOCK);
}
ao2_link_flags(channel_cache, update->new_snapshot, OBJ_NOLOCK);
ao2_unlock(channel_cache);
/* The same applies here. */
ao2_wrlock(channel_cache_by_name);
if (update->old_snapshot) {
ao2_unlink_flags(channel_cache_by_name, update->old_snapshot, OBJ_NOLOCK);
}
ao2_link_flags(channel_cache_by_name, update->new_snapshot, OBJ_NOLOCK);
ao2_unlock(channel_cache_by_name);
ast_channel_snapshot_set(chan, update->new_snapshot);
ast_assert(ast_channel_topic(chan) != NULL); ast_assert(ast_channel_topic(chan) != NULL);
stasis_publish(ast_channel_topic(chan), message); stasis_publish(ast_channel_topic(chan), message);
ao2_ref(message, -1); ao2_ref(message, -1);
@ -841,13 +924,8 @@ void ast_channel_publish_varset(struct ast_channel *chan, const char *name, cons
ast_channel_publish_snapshot(chan); ast_channel_publish_snapshot(chan);
} }
if (chan) { /* This function is NULL safe for global variables */
ast_channel_publish_cached_blob(chan, ast_channel_varset_type(), blob); ast_channel_publish_blob(chan, ast_channel_varset_type(), blob);
} else {
/* This function is NULL safe for global variables */
ast_channel_publish_blob(NULL, ast_channel_varset_type(), blob);
}
ast_json_unref(blob); ast_json_unref(blob);
} }
@ -931,36 +1009,6 @@ static struct ast_manager_event_blob *agent_logoff_to_ami(struct stasis_message
return ev; return ev;
} }
void ast_publish_channel_state(struct ast_channel *chan)
{
struct ast_channel_snapshot *snapshot;
struct stasis_message *message;
if (!ast_channel_snapshot_type()) {
return;
}
ast_assert(chan != NULL);
if (!chan) {
return;
}
snapshot = ast_channel_snapshot_create(chan);
if (!snapshot) {
return;
}
message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
ao2_ref(snapshot, -1);
if (!message) {
return;
}
ast_assert(ast_channel_topic(chan) != NULL);
stasis_publish(ast_channel_topic(chan), message);
ao2_ref(message, -1);
}
struct ast_json *ast_channel_snapshot_to_json( struct ast_json *ast_channel_snapshot_to_json(
const struct ast_channel_snapshot *snapshot, const struct ast_channel_snapshot *snapshot,
const struct stasis_message_sanitizer *sanitize) const struct stasis_message_sanitizer *sanitize)
@ -1332,12 +1380,12 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_talking_stop,
static void stasis_channels_cleanup(void) static void stasis_channels_cleanup(void)
{ {
stasis_caching_unsubscribe_and_join(channel_by_name_topic); ao2_cleanup(channel_topic_all);
channel_by_name_topic = NULL; channel_topic_all = NULL;
ao2_cleanup(channel_cache);
channel_cache = NULL;
ao2_cleanup(channel_cache_by_name); ao2_cleanup(channel_cache_by_name);
channel_cache_by_name = NULL; channel_cache_by_name = NULL;
ao2_cleanup(channel_cache_all);
channel_cache_all = NULL;
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
@ -1367,29 +1415,28 @@ int ast_stasis_channels_init(void)
ast_register_cleanup(stasis_channels_cleanup); ast_register_cleanup(stasis_channels_cleanup);
channel_cache_all = stasis_cp_all_create("ast_channel_topic_all", channel_topic_all = stasis_topic_create("ast_channel_topic_all");
channel_snapshot_get_id); if (!channel_topic_all) {
if (!channel_cache_all) {
return -1; return -1;
} }
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);
channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name); channel_cache = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK,
if (!channel_cache_by_name) { 0, AST_NUM_CHANNEL_BUCKETS, channel_snapshot_uniqueid_hash_cb,
NULL, channel_snapshot_uniqueid_cmp_cb);
if (!channel_cache) {
return -1; return -1;
} }
/* This should be initialized before the caching topic */ channel_cache_by_name = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK,
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type); 0, AST_NUM_CHANNEL_BUCKETS, channel_snapshot_hash_cb,
NULL, channel_snapshot_cmp_cb);
channel_by_name_topic = stasis_caching_topic_create( if (!channel_cache_by_name) {
stasis_cp_all_topic(channel_cache_all),
channel_cache_by_name);
if (!channel_by_name_topic) {
return -1; return -1;
} }
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type);
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type);
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);

@ -833,32 +833,19 @@ void ast_ari_channels_get(struct ast_variable *headers,
struct ast_ari_channels_get_args *args, struct ast_ari_channels_get_args *args,
struct ast_ari_response *response) struct ast_ari_response *response)
{ {
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
struct stasis_cache *cache;
struct ast_channel_snapshot *snapshot; struct ast_channel_snapshot *snapshot;
cache = ast_channel_cache(); snapshot = ast_channel_snapshot_get_latest(args->channel_id);
if (!cache) { if (!snapshot) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"Message bus not initialized");
return;
}
msg = stasis_cache_get(cache, ast_channel_snapshot_type(),
args->channel_id);
if (!msg) {
ast_ari_response_error( ast_ari_response_error(
response, 404, "Not Found", response, 404, "Not Found",
"Channel not found"); "Channel not found");
return; return;
} }
snapshot = stasis_message_data(msg);
ast_assert(snapshot != NULL);
ast_ari_response_ok(response, ast_ari_response_ok(response,
ast_channel_snapshot_to_json(snapshot, NULL)); ast_channel_snapshot_to_json(snapshot, NULL));
ao2_ref(snapshot, -1);
} }
void ast_ari_channels_hangup(struct ast_variable *headers, void ast_ari_channels_hangup(struct ast_variable *headers,
@ -903,27 +890,13 @@ void ast_ari_channels_list(struct ast_variable *headers,
struct ast_ari_channels_list_args *args, struct ast_ari_channels_list_args *args,
struct ast_ari_response *response) struct ast_ari_response *response)
{ {
RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup);
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
struct ao2_iterator i; struct ao2_iterator i;
void *obj; void *obj;
struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer();
cache = ast_channel_cache(); snapshots = ast_channel_cache_all();
if (!cache) {
ast_ari_response_error(
response, 500, "Internal Server Error",
"Message bus not initialized");
return;
}
ao2_ref(cache, +1);
snapshots = stasis_cache_dump(cache, ast_channel_snapshot_type());
if (!snapshots) {
ast_ari_response_alloc_failed(response);
return;
}
json = ast_json_array_create(); json = ast_json_array_create();
if (!json) { if (!json) {
@ -933,12 +906,12 @@ void ast_ari_channels_list(struct ast_variable *headers,
i = ao2_iterator_init(snapshots, 0); i = ao2_iterator_init(snapshots, 0);
while ((obj = ao2_iterator_next(&i))) { while ((obj = ao2_iterator_next(&i))) {
RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup); struct ast_channel_snapshot *snapshot = obj;
struct ast_channel_snapshot *snapshot = stasis_message_data(msg);
int r; int r;
if (sanitize && sanitize->channel_snapshot if (sanitize && sanitize->channel_snapshot
&& sanitize->channel_snapshot(snapshot)) { && sanitize->channel_snapshot(snapshot)) {
ao2_ref(snapshot, -1);
continue; continue;
} }
@ -947,8 +920,10 @@ void ast_ari_channels_list(struct ast_variable *headers,
if (r != 0) { if (r != 0) {
ast_ari_response_alloc_failed(response); ast_ari_response_alloc_failed(response);
ao2_iterator_destroy(&i); ao2_iterator_destroy(&i);
ao2_ref(snapshot, -1);
return; return;
} }
ao2_ref(snapshot, -1);
} }
ao2_iterator_destroy(&i); ao2_iterator_destroy(&i);

@ -3182,13 +3182,13 @@ static int handle_channelstatus(struct ast_channel *chan, AGI *agi, int argc, co
ast_agi_send(agi->fd, chan, "200 result=%u\n", ast_channel_state(chan)); ast_agi_send(agi->fd, chan, "200 result=%u\n", ast_channel_state(chan));
return RESULT_SUCCESS; return RESULT_SUCCESS;
} else if (argc == 3) { } else if (argc == 3) {
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_channel_snapshot *snapshot;
/* one argument: look for info on the specified channel */ /* one argument: look for info on the specified channel */
if ((msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), argv[2]))) { snapshot = ast_channel_snapshot_get_latest_by_name(argv[2]);
struct ast_channel_snapshot *snapshot = stasis_message_data(msg); if (snapshot) {
ast_agi_send(agi->fd, chan, "200 result=%u\n", snapshot->state); ast_agi_send(agi->fd, chan, "200 result=%u\n", snapshot->state);
ao2_ref(snapshot, -1);
return RESULT_SUCCESS; return RESULT_SUCCESS;
} }
/* if we get this far no channel name matched the argument given */ /* if we get this far no channel name matched the argument given */

@ -78,7 +78,7 @@ static void statsmaker(void *data, struct stasis_subscription *sub,
} }
/*! /*!
* \brief Router callback for \ref stasis_cache_update messages. * \brief Router callback for \ref ast_channel_snapshot_update messages.
* \param data Data pointer given when added to router. * \param data Data pointer given when added to router.
* \param sub This subscription. * \param sub This subscription.
* \param topic The topic the message was posted to. This is not necessarily the * \param topic The topic the message was posted to. This is not necessarily the
@ -92,34 +92,25 @@ static void updates(void *data, struct stasis_subscription *sub,
/* Since this came from a message router, we know the type of the /* Since this came from a message router, we know the type of the
* message. We can cast the data without checking its type. * message. We can cast the data without checking its type.
*/ */
struct stasis_cache_update *update = stasis_message_data(message); struct ast_channel_snapshot_update *update = stasis_message_data(message);
/* We're only interested in channel snapshots, so check the type /* There are three types of channel snapshot updates.
* of the underlying message. * !old && new -> Initial channel creation
*/ * old && new -> Updated channel snapshot
if (ast_channel_snapshot_type() != update->type) { * old && dead -> Final channel snapshot
return;
}
/* There are three types of cache updates.
* !old && new -> Initial cache entry
* old && new -> Updated cache entry
* old && !new -> Cache entry removed.
*/ */
if (!update->old_snapshot && update->new_snapshot) { if (!update->old_snapshot && update->new_snapshot) {
/* Initial cache entry; count a channel creation */ /* Initial channel snapshot; count a channel creation */
ast_statsd_log_string("channels.count", AST_STATSD_GAUGE, "+1", 1.0); ast_statsd_log_string("channels.count", AST_STATSD_GAUGE, "+1", 1.0);
} else if (update->old_snapshot && !update->new_snapshot) { } else if (update->old_snapshot && ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
/* Cache entry removed. Compute the age of the channel and post /* Channel is gone. Compute the age of the channel and post
* that, as well as decrementing the channel count. * that, as well as decrementing the channel count.
*/ */
struct ast_channel_snapshot *last;
int64_t age; int64_t age;
last = stasis_message_data(update->old_snapshot);
age = ast_tvdiff_ms(*stasis_message_timestamp(message), age = ast_tvdiff_ms(*stasis_message_timestamp(message),
last->creationtime); update->new_snapshot->creationtime);
ast_statsd_log("channels.calltime", AST_STATSD_TIMER, age); ast_statsd_log("channels.calltime", AST_STATSD_TIMER, age);
/* And decrement the channel count */ /* And decrement the channel count */
@ -161,11 +152,11 @@ static int load_module(void)
{ {
/* You can create a message router to route messages by type */ /* You can create a message router to route messages by type */
router = stasis_message_router_create( router = stasis_message_router_create(
ast_channel_topic_all_cached()); ast_channel_topic_all());
if (!router) { if (!router) {
return AST_MODULE_LOAD_DECLINE; return AST_MODULE_LOAD_DECLINE;
} }
stasis_message_router_add(router, stasis_cache_update_type(), stasis_message_router_add(router, ast_channel_snapshot_type(),
updates, NULL); updates, NULL);
stasis_message_router_set_default(router, default_route, NULL); stasis_message_router_set_default(router, default_route, NULL);

@ -144,17 +144,11 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app,
} }
forwards->forward_type = FORWARD_CHANNEL; forwards->forward_type = FORWARD_CHANNEL;
if (chan) { forwards->topic_forward = stasis_forward_all(
forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), chan ? ast_channel_topic(chan) : ast_channel_topic_all(),
app->topic);
}
forwards->topic_cached_forward = stasis_forward_all(
chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
app->topic); app->topic);
if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) { if (!forwards->topic_forward) {
/* Half-subscribed is a bad thing */
forwards_unsubscribe(forwards);
ao2_ref(forwards, -1); ao2_ref(forwards, -1);
return NULL; return NULL;
} }
@ -420,7 +414,7 @@ static struct ast_json *channel_state(
if (!old_snapshot) { if (!old_snapshot) {
return channel_created_event(snapshot, tv); return channel_created_event(snapshot, tv);
} else if (!new_snapshot) { } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) {
return channel_destroyed_event(snapshot, tv); return channel_destroyed_event(snapshot, tv);
} else if (old_snapshot->state != new_snapshot->state) { } else if (old_snapshot->state != new_snapshot->state) {
return channel_state_change_event(snapshot, tv); return channel_state_change_event(snapshot, tv);
@ -436,8 +430,8 @@ static struct ast_json *channel_dialplan(
{ {
struct ast_json *json_channel; struct ast_json *json_channel;
/* No Newexten event on cache clear or first event */ /* No Newexten event on first channel snapshot */
if (!old_snapshot || !new_snapshot) { if (!old_snapshot) {
return NULL; return NULL;
} }
@ -470,8 +464,8 @@ static struct ast_json *channel_callerid(
{ {
struct ast_json *json_channel; struct ast_json *json_channel;
/* No NewCallerid event on cache clear or first event */ /* No NewCallerid event on first channel snapshot */
if (!old_snapshot || !new_snapshot) { if (!old_snapshot) {
return NULL; return NULL;
} }
@ -500,8 +494,8 @@ static struct ast_json *channel_connected_line(
{ {
struct ast_json *json_channel; struct ast_json *json_channel;
/* No ChannelConnectedLine event on cache clear or first event */ /* No ChannelConnectedLine event on first channel snapshot */
if (!old_snapshot || !new_snapshot) { if (!old_snapshot) {
return NULL; return NULL;
} }
@ -532,39 +526,22 @@ static void sub_channel_update_handler(void *data,
struct stasis_message *message) struct stasis_message *message)
{ {
struct stasis_app *app = data; struct stasis_app *app = data;
struct stasis_cache_update *update; struct ast_channel_snapshot_update *update = stasis_message_data(message);
struct ast_channel_snapshot *new_snapshot;
struct ast_channel_snapshot *old_snapshot;
const struct timeval *tv;
int i; int i;
ast_assert(stasis_message_type(message) == stasis_cache_update_type());
update = stasis_message_data(message);
ast_assert(update->type == ast_channel_snapshot_type());
new_snapshot = stasis_message_data(update->new_snapshot);
old_snapshot = stasis_message_data(update->old_snapshot);
/* Pull timestamp from the new snapshot, or from the update message
* when there isn't one. */
tv = update->new_snapshot ?
stasis_message_timestamp(update->new_snapshot) :
stasis_message_timestamp(message);
for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
struct ast_json *msg; struct ast_json *msg;
msg = channel_monitors[i](old_snapshot, new_snapshot, tv); msg = channel_monitors[i](update->old_snapshot, update->new_snapshot,
stasis_message_timestamp(message));
if (msg) { if (msg) {
app_send(app, msg); app_send(app, msg);
ast_json_unref(msg); ast_json_unref(msg);
} }
} }
if (!new_snapshot && old_snapshot) { if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
unsubscribe(app, "channel", old_snapshot->uniqueid, 1); unsubscribe(app, "channel", update->new_snapshot->uniqueid, 1);
} }
} }
@ -987,7 +964,7 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat
res |= stasis_message_router_add_cache_update(app->router, res |= stasis_message_router_add_cache_update(app->router,
ast_bridge_snapshot_type(), sub_bridge_update_handler, app); ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
res |= stasis_message_router_add_cache_update(app->router, res |= stasis_message_router_add(app->router,
ast_channel_snapshot_type(), sub_channel_update_handler, app); ast_channel_snapshot_type(), sub_channel_update_handler, app);
res |= stasis_message_router_add_cache_update(app->router, res |= stasis_message_router_add_cache_update(app->router,

@ -773,22 +773,7 @@ void stasis_app_control_silence_stop(struct stasis_app_control *control)
struct ast_channel_snapshot *stasis_app_control_get_snapshot( struct ast_channel_snapshot *stasis_app_control_get_snapshot(
const struct stasis_app_control *control) const struct stasis_app_control *control)
{ {
struct stasis_message *msg; return ast_channel_snapshot_get_latest(stasis_app_control_get_channel_id(control));
struct ast_channel_snapshot *snapshot;
msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
stasis_app_control_get_channel_id(control));
if (!msg) {
return NULL;
}
snapshot = stasis_message_data(msg);
ast_assert(snapshot != NULL);
ao2_ref(snapshot, +1);
ao2_ref(msg, -1);
return snapshot;
} }
static int app_send_command_on_condition(struct stasis_app_control *control, static int app_send_command_on_condition(struct stasis_app_control *control,

@ -276,8 +276,7 @@ static void do_sleep(void)
ast_hangup((channel)); \ ast_hangup((channel)); \
HANGUP_EVENT(channel, cause, dialstatus); \ HANGUP_EVENT(channel, cause, dialstatus); \
APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL); \ APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL); \
ao2_cleanup(stasis_cache_get(ast_channel_cache(), \ ao2_cleanup(ast_channel_snapshot_get_latest(ast_channel_uniqueid(channel))); \
ast_channel_snapshot_type(), ast_channel_uniqueid(channel))); \
ao2_cleanup(channel); \ ao2_cleanup(channel); \
channel = NULL; \ channel = NULL; \
} while (0) } while (0)

@ -255,32 +255,18 @@ AST_TEST_DEFINE(channel_messages)
ast_hangup(chan); ast_hangup(chan);
chan = NULL; chan = NULL;
actual_count = stasis_message_sink_wait_for_count(sink, 6, actual_count = stasis_message_sink_wait_for_count(sink, 3,
STASIS_SINK_DEFAULT_WAIT); STASIS_SINK_DEFAULT_WAIT);
ast_test_validate(test, 6 == actual_count); ast_test_validate(test, 3 == actual_count);
msg = sink->messages[1]; msg = sink->messages[1];
type = stasis_message_type(msg); type = stasis_message_type(msg);
ast_test_validate(test, stasis_cache_update_type() == type);
msg = sink->messages[2];
type = stasis_message_type(msg);
ast_test_validate(test, ast_channel_snapshot_type() == type); ast_test_validate(test, ast_channel_snapshot_type() == type);
msg = sink->messages[3]; msg = sink->messages[2];
type = stasis_message_type(msg);
ast_test_validate(test, stasis_cache_update_type() == type);
/* The ordering of the cache clear and endpoint snapshot are
* unspecified */
msg = sink->messages[4];
if (stasis_message_type(msg) == stasis_cache_clear_type()) {
/* Okay; the next message should be the endpoint snapshot */
msg = sink->messages[5];
}
type = stasis_message_type(msg); type = stasis_message_type(msg);
ast_test_validate(test, ast_endpoint_snapshot_type() == type); ast_test_validate(test, ast_endpoint_snapshot_type() == type);
actual_snapshot = stasis_message_data(msg); actual_snapshot = stasis_message_data(msg);
ast_test_validate(test, 0 == actual_snapshot->num_channels); ast_test_validate(test, 0 == actual_snapshot->num_channels);

Loading…
Cancel
Save