From ee77b475f2177c07e4208d24d9258e9ebc4008b5 Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Wed, 25 Mar 2009 21:57:19 +0000 Subject: [PATCH] Improve performance of the ast_event cache functionality. This code comes from svn/asterisk/team/russell/event_performance/. Here is a summary of the changes that have been made, in order of both invasiveness and performance impact, from smallest to largest. 1) Asterisk 1.6.1 introduces some additional logic to be able to handle distributed device state. This functionality comes at a cost. One relatively minor change in this patch is that the extra processing required for distributed device state is now completely bypassed if it's not needed. 2) One of the things that I noticed when profiling this code was that a _lot_ of time was spent doing string comparisons. I changed the way strings are represented in an event to include a hash value at the front. So, before doing a string comparison, we do an integer comparison on the hash. 3) Finally, the code that handles the event cache has been re-written. I tried to do this in a such a way that it had minimal impact on the API. I did have to change one API call, though - ast_event_queue_and_cache(). However, the way it works now is nicer, IMO. Each type of event that can be cached (MWI, device state) has its own hash table and rules for hashing and comparing objects. This by far made the biggest impact on performance. For additional details regarding this code and how it was tested, please see the review request. (closes issue #14738) Reported by: russell Review: http://reviewboard.digium.com/r/205/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@184339 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- apps/app_minivm.c | 7 +- apps/app_voicemail.c | 5 +- channels/chan_dahdi.c | 6 +- channels/chan_iax2.c | 2 - channels/chan_mgcp.c | 1 - channels/chan_sip.c | 7 +- channels/chan_unistim.c | 1 - include/asterisk/_private.h | 2 +- include/asterisk/devicestate.h | 17 ++ include/asterisk/event.h | 52 ++-- include/asterisk/strings.h | 23 ++ main/asterisk.c | 5 +- main/devicestate.c | 69 +++-- main/event.c | 486 ++++++++++++++++++++++++--------- res/ais/evt.c | 48 ++-- 15 files changed, 478 insertions(+), 253 deletions(-) diff --git a/apps/app_minivm.c b/apps/app_minivm.c index c526758486..c4bbce43d2 100644 --- a/apps/app_minivm.c +++ b/apps/app_minivm.c @@ -1785,13 +1785,10 @@ static void queue_mwi_event(const char *mbx, const char *ctx, int urgent, int ne AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, (new+urgent), AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, old, AST_EVENT_IE_END))) { - return; + return; } - ast_event_queue_and_cache(event, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_END); + ast_event_queue_and_cache(event); } /*! \brief Send MWI using interal Asterisk event subsystem */ diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index 05dae3117a..b2ec1e0811 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -6215,10 +6215,7 @@ static void queue_mwi_event(const char *box, int urgent, int new, int old) return; } - ast_event_queue_and_cache(event, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_END); + ast_event_queue_and_cache(event); } /*! diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index 6e765ec692..6df93c7959 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -2959,10 +2959,7 @@ static void notify_message(char *mailbox_full, int thereornot) return; } - ast_event_queue_and_cache(event, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_END); + ast_event_queue_and_cache(event); if (!ast_strlen_zero(mailbox) && !ast_strlen_zero(mwimonitornotify)) { snprintf(s, sizeof(s), "%s %s %d", mwimonitornotify, mailbox, thereornot); @@ -3016,7 +3013,6 @@ static int has_voicemail(struct dahdi_pvt *p) event = ast_event_get_cached(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS, AST_EVENT_IE_END); if (event) { diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index 48dd24a2ad..3d88ecb48f 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -7394,8 +7394,6 @@ static int update_registry(struct sockaddr_in *sin, int callno, char *devtype, i event = ast_event_get_cached(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_EXISTS, AST_EVENT_IE_END); if (event) { new = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index de502dac80..1c14829756 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -472,7 +472,6 @@ static int has_voicemail(struct mgcp_endpoint *p) event = ast_event_get_cached(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox, AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cntx, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS, AST_EVENT_IE_END); if (event) { diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 67d41e4b31..bc121b1289 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -18059,10 +18059,7 @@ static int handle_request_notify(struct sip_pvt *p, struct sip_request *req, str AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, atoi(new), AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, atoi(old), AST_EVENT_IE_END))) { - ast_event_queue_and_cache(event, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_END); + ast_event_queue_and_cache(event); } } @@ -21038,8 +21035,6 @@ static int get_cached_mwi(struct sip_peer *peer, int *new, int *old) event = ast_event_get_cached(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox->mailbox, AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, S_OR(mailbox->context, "default"), - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_EXISTS, AST_EVENT_IE_END); if (!event) continue; diff --git a/channels/chan_unistim.c b/channels/chan_unistim.c index 8229c9a328..818a32d71d 100644 --- a/channels/chan_unistim.c +++ b/channels/chan_unistim.c @@ -4391,7 +4391,6 @@ static int unistim_send_mwi_to_peer(struct unistimsession *s, unsigned int tick) event = ast_event_get_cached(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS, AST_EVENT_IE_END); if (event) { diff --git a/include/asterisk/_private.h b/include/asterisk/_private.h index 34c87c767b..c709b1f152 100644 --- a/include/asterisk/_private.h +++ b/include/asterisk/_private.h @@ -29,7 +29,7 @@ int dnsmgr_init(void); /*!< Provided by dnsmgr.c */ void dnsmgr_start_refresh(void); /*!< Provided by dnsmgr.c */ int dnsmgr_reload(void); /*!< Provided by dnsmgr.c */ void threadstorage_init(void); /*!< Provided by threadstorage.c */ -void ast_event_init(void); /*!< Provided by event.c */ +int ast_event_init(void); /*!< Provided by event.c */ int ast_device_state_engine_init(void); /*!< Provided by devicestate.c */ int astobj2_init(void); /*!< Provided by astobj2.c */ int ast_file_init(void); /*!< Provided by file.c */ diff --git a/include/asterisk/devicestate.h b/include/asterisk/devicestate.h index fe911bacfe..e4634a797a 100644 --- a/include/asterisk/devicestate.h +++ b/include/asterisk/devicestate.h @@ -37,6 +37,8 @@ #ifndef _ASTERISK_DEVICESTATE_H #define _ASTERISK_DEVICESTATE_H +#include "asterisk/channel.h" + #if defined(__cplusplus) || defined(c_plusplus) extern "C" { #endif @@ -260,6 +262,21 @@ struct ast_devstate_aggregate { unsigned int ring:1; }; +/*! + * \brief Enable distributed device state processing. + * + * \details + * By default, Asterisk assumes that device state change events will only be + * originating from one instance. If a module gets loaded and configured such + * that multiple instances of Asterisk will be sharing device state, this + * function should be called to enable distributed device state processing. + * It is off by default to save on unnecessary processing. + * + * \retval 0 success + * \retval -1 failure + */ +int ast_enable_distributed_devstate(void); + #if defined(__cplusplus) || defined(c_plusplus) } #endif diff --git a/include/asterisk/event.h b/include/asterisk/event.h index 96cf9bd0ac..ac42e59427 100644 --- a/include/asterisk/event.h +++ b/include/asterisk/event.h @@ -358,42 +358,18 @@ int ast_event_queue(struct ast_event *event); * * \param event the event to be queued and cached * - * The rest of the arguments to this function specify information elements to - * use for determining which events in the cache that this event should replace. - * All events in the cache that match the specified criteria will be removed from - * the cache and then this one will be added. The arguments are specified in - * the form: - * - * \code - * , [enum ast_event_ie_pltype] - * \endcode - * and must end with AST_EVENT_IE_END. - * - * If the ie_type specified is *not* AST_EVENT_IE_END, then it must be followed - * by a valid IE payload type. If the payload type given is EXISTS, then all - * events that contain that information element will be removed from the cache. - * Otherwise, all events in the cache that contain an information element with - * the same value as the new event will be removed. - * - * \note If more than one IE parameter is specified, they *all* must match for - * the event to be removed from the cache. - * - * Example usage: - * - * \code - * ast_event_queue_and_cache(event, - * AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, - * AST_EVENT_IE_END); - * \endcode - * - * This example queues and caches an event. Any events in the cache that have - * the same MAILBOX information element as this event will be removed. - * + * \details * The purpose of caching events is so that the core can retain the last known * information for events that represent some sort of state. That way, when * code needs to find out the current state, it can query the cache. + * + * The event API already knows which events can be cached and how to cache them. + * + * \retval 0 success + * \retval non-zero failure. If failure is returned, the event must be destroyed + * by the caller of this function. */ -int ast_event_queue_and_cache(struct ast_event *event, ...); +int ast_event_queue_and_cache(struct ast_event *event); /*! * \brief Retrieve an event from the cache @@ -510,6 +486,18 @@ uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_ */ const char *ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type); +/*! + * \brief Get the hash for the string payload of an IE + * + * \param event The event to get the IE from + * \param ie_type the type of information element to retrieve the hash for + * + * \return This function returns the hash value as calculated by ast_str_hash() + * for the string payload. This is stored in the event to avoid + * unnecessary string comparisons. + */ +uint32_t ast_event_get_ie_str_hash(const struct ast_event *event, enum ast_event_ie_type ie_type); + /*! * \brief Get the value of an information element that has a raw payload * diff --git a/include/asterisk/strings.h b/include/asterisk/strings.h index 94ca668a4e..3e0dd46f0d 100644 --- a/include/asterisk/strings.h +++ b/include/asterisk/strings.h @@ -851,6 +851,29 @@ static force_inline int attribute_pure ast_str_hash(const char *str) return abs(hash); } +/*! + * \brief Compute a hash value on a string + * + * \param[in] str The string to add to the hash + * \param[in] hash The hash value to add to + * + * \details + * This version of the function is for when you need to compute a + * string hash of more than one string. + * + * This famous hash algorithm was written by Dan Bernstein and is + * commonly used. + * + * \sa http://www.cse.yorku.ca/~oz/hash.html + */ +static force_inline int ast_str_hash_add(const char *str, int hash) +{ + while (*str) + hash = hash * 33 ^ *str++; + + return abs(hash); +} + /*! * \brief Compute a hash value on a case-insensitive string * diff --git a/main/asterisk.c b/main/asterisk.c index 58dd81a5d9..ba90103372 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -3509,7 +3509,10 @@ int main(int argc, char *argv[]) } #endif - ast_event_init(); + if (ast_event_init()) { + printf("%s", term_quit()); + exit(1); + } ast_makesocket(); sigemptyset(&sigs); diff --git a/main/devicestate.c b/main/devicestate.c index 8410b95928..4572c66046 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -196,8 +196,10 @@ struct { ast_cond_t cond; ast_mutex_t lock; AST_LIST_HEAD_NOLOCK(, devstate_change) devstate_change_q; + unsigned int enabled:1; } devstate_collector = { .thread = AST_PTHREADT_NULL, + .enabled = 0, }; /* Forward declarations */ @@ -428,22 +430,26 @@ static int getproviderstate(const char *provider, const char *address) static void devstate_event(const char *device, enum ast_device_state state) { struct ast_event *event; + enum ast_event_type event_type; + + if (devstate_collector.enabled) { + /* Distributed device state is enabled, so this state change is a change + * for a single server, not the real state. */ + event_type = AST_EVENT_DEVICE_STATE_CHANGE; + } else { + event_type = AST_EVENT_DEVICE_STATE; + } ast_debug(3, "device '%s' state '%d'\n", device, state); - if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE, + if (!(event = ast_event_new(event_type, AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state, AST_EVENT_IE_END))) { return; } - /* Cache this event, replacing an event in the cache with the same - * device name if it exists. */ - ast_event_queue_and_cache(event, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, sizeof(struct ast_eid), - AST_EVENT_IE_END); + ast_event_queue_and_cache(event); } /*! Called by the state change thread to find out what the state is, and then @@ -632,13 +638,12 @@ static void process_collection(const char *device, struct change_collection *col AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, device, AST_EVENT_IE_STATE, AST_EVENT_IE_PLTYPE_UINT, state, AST_EVENT_IE_END); - - if (!event) + + if (!event) { return; + } - ast_event_queue_and_cache(event, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_END); + ast_event_queue_and_cache(event); } static void handle_devstate_change(struct devstate_change *sc) @@ -719,21 +724,6 @@ static void devstate_change_collector_cb(const struct ast_event *event, void *da /*! \brief Initialize the device state engine in separate thread */ int ast_device_state_engine_init(void) { - devstate_collector.event_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, - devstate_change_collector_cb, NULL, AST_EVENT_IE_END); - - if (!devstate_collector.event_sub) { - ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); - return -1; - } - - ast_mutex_init(&devstate_collector.lock); - ast_cond_init(&devstate_collector.cond, NULL); - if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) { - ast_log(LOG_ERROR, "Unable to start device state collector thread.\n"); - return -1; - } - ast_cond_init(&change_pending, NULL); if (ast_pthread_create_background(&change_thread, NULL, do_devstate_changes, NULL) < 0) { ast_log(LOG_ERROR, "Unable to start device state change thread.\n"); @@ -830,3 +820,28 @@ enum ast_device_state ast_devstate_aggregate_result(struct ast_devstate_aggregat return AST_DEVICE_NOT_INUSE; } +int ast_enable_distributed_devstate(void) +{ + if (devstate_collector.enabled) { + return 0; + } + + devstate_collector.event_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, + devstate_change_collector_cb, NULL, AST_EVENT_IE_END); + + if (!devstate_collector.event_sub) { + ast_log(LOG_ERROR, "Failed to create subscription for the device state change collector\n"); + return -1; + } + + ast_mutex_init(&devstate_collector.lock); + ast_cond_init(&devstate_collector.cond, NULL); + if (ast_pthread_create_background(&devstate_collector.thread, NULL, run_devstate_collector, NULL) < 0) { + ast_log(LOG_ERROR, "Unable to start device state collector thread.\n"); + return -1; + } + + devstate_collector.enabled = 1; + + return 0; +} diff --git a/main/event.c b/main/event.c index 956ce6f911..e29f057e3f 100644 --- a/main/event.c +++ b/main/event.c @@ -28,6 +28,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/_private.h" + #include "asterisk/event.h" #include "asterisk/linkedlists.h" #include "asterisk/dlinkedlists.h" @@ -36,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/unaligned.h" #include "asterisk/utils.h" #include "asterisk/taskprocessor.h" +#include "asterisk/astobj2.h" struct ast_taskprocessor *event_dispatcher; @@ -54,6 +56,16 @@ struct ast_event_ie { unsigned char ie_payload[0]; } __attribute__((packed)); +/*! + * \brief The payload for a string information element + */ +struct ast_event_ie_str_payload { + /*! \brief A hash calculated with ast_str_hash(), to speed up comparisons */ + uint32_t hash; + /*! \brief The actual string, null terminated */ + char str[1]; +} __attribute__((packed)); + /*! * \brief An event * @@ -85,7 +97,10 @@ struct ast_event_ie_val { enum ast_event_ie_pltype ie_pltype; union { uint32_t uint; - const char *str; + struct { + uint32_t hash; + const char *str; + }; void *raw; } payload; size_t raw_datalen; @@ -107,11 +122,55 @@ static uint32_t sub_uniqueid; * The event subscribers are indexed by which event they are subscribed to */ static AST_RWDLLIST_HEAD(ast_event_sub_list, ast_event_sub) ast_event_subs[AST_EVENT_TOTAL]; -/*! \brief Cached events - * The event cache is indexed on the event type. The purpose of this is - * for events that express some sort of state. So, when someone first - * needs to know this state, it can get the last known state from the cache. */ -static AST_RWLIST_HEAD(ast_event_ref_list, ast_event_ref) ast_event_cache[AST_EVENT_TOTAL]; +static int ast_event_cmp(void *obj, void *arg, int flags); +static int ast_event_hash_mwi(const void *obj, const int flags); +static int ast_event_hash_devstate(const void *obj, const int flags); +static int ast_event_hash_devstate_change(const void *obj, const int flags); + +#ifdef LOW_MEMORY +#define NUM_CACHE_BUCKETS 17 +#else +#define NUM_CACHE_BUCKETS 563 +#endif + +#define MAX_CACHE_ARGS 8 + +/*! + * \brief Event types that are kept in the cache. + */ +static struct { + /*! + * \brief Container of cached events + * + * \details This gets allocated in ast_event_init() when Asterisk starts + * for the event types declared as using the cache. + */ + struct ao2_container *container; + /*! \brief Event type specific hash function */ + ao2_hash_fn *hash_fn; + /*! + * \brief Information Elements used for caching + * + * \details This array is the set of information elements that will be unique + * among all events in the cache for this event type. When a new event gets + * cached, a previous event with the same values for these information elements + * will be replaced. + */ + enum ast_event_ie_type cache_args[MAX_CACHE_ARGS]; +} ast_event_cache[AST_EVENT_TOTAL] = { + [AST_EVENT_MWI] = { + .hash_fn = ast_event_hash_mwi, + .cache_args = { AST_EVENT_IE_MAILBOX, AST_EVENT_IE_CONTEXT }, + }, + [AST_EVENT_DEVICE_STATE] = { + .hash_fn = ast_event_hash_devstate, + .cache_args = { AST_EVENT_IE_DEVICE, }, + }, + [AST_EVENT_DEVICE_STATE_CHANGE] = { + .hash_fn = ast_event_hash_devstate_change, + .cache_args = { AST_EVENT_IE_DEVICE, AST_EVENT_IE_EID, }, + }, +}; /*! * The index of each entry _must_ match the event type number! @@ -237,6 +296,8 @@ static void ast_event_ie_val_destroy(struct ast_event_ie_val *ie_val) { switch (ie_val->ie_pltype) { case AST_EVENT_IE_PLTYPE_STR: + ast_free((char *) ie_val->payload.str); + break; case AST_EVENT_IE_PLTYPE_RAW: ast_free(ie_val->payload.raw); break; @@ -328,7 +389,8 @@ enum ast_event_subscriber_res ast_event_check_subscriber(enum ast_event_type typ return res; } -static int match_ie_val(struct ast_event *event, struct ast_event_ie_val *ie_val, struct ast_event *event2) +static int match_ie_val(const struct ast_event *event, + const struct ast_event_ie_val *ie_val, const struct ast_event *event2) { if (ie_val->ie_pltype == AST_EVENT_IE_PLTYPE_UINT) { uint32_t val = event2 ? ast_event_get_ie_uint(event2, ie_val->ie_type) : ie_val->payload.uint; @@ -338,9 +400,19 @@ static int match_ie_val(struct ast_event *event, struct ast_event_ie_val *ie_val } if (ie_val->ie_pltype == AST_EVENT_IE_PLTYPE_STR) { - const char *str = event2 ? ast_event_get_ie_str(event2, ie_val->ie_type) : ie_val->payload.str; - if (str && !strcmp(str, ast_event_get_ie_str(event, ie_val->ie_type))) + const char *str; + uint32_t hash; + + hash = event2 ? ast_event_get_ie_str_hash(event2, ie_val->ie_type) : ie_val->payload.hash; + if (hash != ast_event_get_ie_str_hash(event, ie_val->ie_type)) { + return 0; + } + + str = event2 ? ast_event_get_ie_str(event2, ie_val->ie_type) : ie_val->payload.str; + if (str && !strcmp(str, ast_event_get_ie_str(event, ie_val->ie_type))) { return 1; + } + return 0; } @@ -360,26 +432,32 @@ static int match_ie_val(struct ast_event *event, struct ast_event_ie_val *ie_val return 0; } -/*! \brief Dump the event cache for the subscribed event type */ -void ast_event_dump_cache(const struct ast_event_sub *event_sub) +static int dump_cache_cb(void *obj, void *arg, int flags) { - struct ast_event_ref *event_ref; - enum ast_event_type type = event_sub->type; + const struct ast_event_ref *event_ref = obj; + const struct ast_event *event = event_ref->event; + const struct ast_event_sub *event_sub = arg; + struct ast_event_ie_val *ie_val = NULL; - AST_RWLIST_RDLOCK(&ast_event_cache[type]); - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&ast_event_cache[type], event_ref, entry) { - struct ast_event_ie_val *ie_val; - AST_LIST_TRAVERSE(&event_sub->ie_vals, ie_val, entry) { - if (!match_ie_val(event_ref->event, ie_val, NULL)) - break; - } - if (!ie_val) { - /* All parameters were matched on this cache entry, so dump it */ - event_sub->cb(event_ref->event, event_sub->userdata); + AST_LIST_TRAVERSE(&event_sub->ie_vals, ie_val, entry) { + if (!match_ie_val(event, ie_val, NULL)) { + break; } } - AST_RWLIST_TRAVERSE_SAFE_END - AST_RWLIST_UNLOCK(&ast_event_cache[type]); + + if (!ie_val) { + /* All parameters were matched on this cache entry, so dump it */ + event_sub->cb(event, event_sub->userdata); + } + + return 0; +} + +/*! \brief Dump the event cache for the subscribed event type */ +void ast_event_dump_cache(const struct ast_event_sub *event_sub) +{ + ao2_callback(ast_event_cache[event_sub->type].container, OBJ_NODATA, + dump_cache_cb, (void *) event_sub); } static struct ast_event *gen_sub_event(struct ast_event_sub *sub) @@ -536,6 +614,8 @@ int ast_event_sub_append_ie_str(struct ast_event_sub *sub, return -1; } + ie_val->payload.hash = ast_str_hash(str); + AST_LIST_INSERT_TAIL(&sub->ie_vals, ie_val, entry); return 0; @@ -703,7 +783,11 @@ uint32_t ast_event_iterator_get_ie_uint(struct ast_event_iterator *iterator) const char *ast_event_iterator_get_ie_str(struct ast_event_iterator *iterator) { - return (const char*)iterator->ie->ie_payload; + const struct ast_event_ie_str_payload *str_payload; + + str_payload = (struct ast_event_ie_str_payload *) iterator->ie->ie_payload; + + return str_payload->str; } void *ast_event_iterator_get_ie_raw(struct ast_event_iterator *iterator) @@ -725,9 +809,22 @@ uint32_t ast_event_get_ie_uint(const struct ast_event *event, enum ast_event_ie_ return ie_val ? ntohl(get_unaligned_uint32(ie_val)) : 0; } +uint32_t ast_event_get_ie_str_hash(const struct ast_event *event, enum ast_event_ie_type ie_type) +{ + const struct ast_event_ie_str_payload *str_payload; + + str_payload = ast_event_get_ie_raw(event, ie_type); + + return str_payload->hash; +} + const char *ast_event_get_ie_str(const struct ast_event *event, enum ast_event_ie_type ie_type) { - return ast_event_get_ie_raw(event, ie_type); + const struct ast_event_ie_str_payload *str_payload; + + str_payload = ast_event_get_ie_raw(event, ie_type); + + return str_payload->str; } const void *ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_ie_type ie_type) @@ -746,7 +843,16 @@ const void *ast_event_get_ie_raw(const struct ast_event *event, enum ast_event_i int ast_event_append_ie_str(struct ast_event **event, enum ast_event_ie_type ie_type, const char *str) { - return ast_event_append_ie_raw(event, ie_type, str, strlen(str) + 1); + struct ast_event_ie_str_payload *str_payload; + size_t payload_len; + + payload_len = sizeof(*str_payload) + strlen(str); + str_payload = alloca(payload_len); + + strcpy(str_payload->str, str); + str_payload->hash = ast_str_hash(str); + + return ast_event_append_ie_raw(event, ie_type, str_payload, payload_len); } int ast_event_append_ie_uint(struct ast_event **event, enum ast_event_ie_type ie_type, @@ -850,10 +956,11 @@ void ast_event_destroy(struct ast_event *event) ast_free(event); } -static void ast_event_ref_destroy(struct ast_event_ref *event_ref) +static void ast_event_ref_destroy(void *obj) { + struct ast_event_ref *event_ref = obj; + ast_event_destroy(event_ref->event); - ast_free(event_ref); } static struct ast_event *ast_event_dup(const struct ast_event *event) @@ -863,9 +970,10 @@ static struct ast_event *ast_event_dup(const struct ast_event *event) event_len = ast_event_get_size(event); - if (!(dup_event = ast_calloc(1, event_len))) + if (!(dup_event = ast_calloc(1, event_len))) { return NULL; - + } + memcpy(dup_event, event, event_len); return dup_event; @@ -876,139 +984,122 @@ struct ast_event *ast_event_get_cached(enum ast_event_type type, ...) va_list ap; enum ast_event_ie_type ie_type; struct ast_event *dup_event = NULL; - struct ast_event_ref *event_ref; - struct ast_event_ie_val *cache_arg; - AST_LIST_HEAD_NOLOCK_STATIC(cache_args, ast_event_ie_val); + struct ast_event_ref *cached_event_ref; + struct ast_event *cache_arg_event; + struct ast_event_ref tmp_event_ref = { + .event = NULL, + }; + struct ao2_container *container = NULL; if (type >= AST_EVENT_TOTAL) { ast_log(LOG_ERROR, "%u is an invalid type!\n", type); return NULL; } + if (!(container = ast_event_cache[type].container)) { + ast_log(LOG_ERROR, "%u is not a cached event type\n", type); + return NULL; + } + + if (!(cache_arg_event = ast_event_new(type, AST_EVENT_IE_END))) { + return NULL; + } + va_start(ap, type); for (ie_type = va_arg(ap, enum ast_event_type); ie_type != AST_EVENT_IE_END; ie_type = va_arg(ap, enum ast_event_type)) { - cache_arg = alloca(sizeof(*cache_arg)); - memset(cache_arg, 0, sizeof(*cache_arg)); - cache_arg->ie_type = ie_type; - cache_arg->ie_pltype = va_arg(ap, enum ast_event_ie_pltype); - if (cache_arg->ie_pltype == AST_EVENT_IE_PLTYPE_UINT) - cache_arg->payload.uint = va_arg(ap, uint32_t); - else if (cache_arg->ie_pltype == AST_EVENT_IE_PLTYPE_STR) - cache_arg->payload.str = ast_strdupa(va_arg(ap, const char *)); - else if (cache_arg->ie_pltype == AST_EVENT_IE_PLTYPE_RAW) { + enum ast_event_ie_pltype ie_pltype; + + ie_pltype = va_arg(ap, enum ast_event_ie_pltype); + + switch (ie_pltype) { + case AST_EVENT_IE_PLTYPE_UINT: + ast_event_append_ie_uint(&cache_arg_event, ie_type, va_arg(ap, uint32_t)); + break; + case AST_EVENT_IE_PLTYPE_STR: + ast_event_append_ie_str(&cache_arg_event, ie_type, va_arg(ap, const char *)); + break; + case AST_EVENT_IE_PLTYPE_RAW: + { void *data = va_arg(ap, void *); size_t datalen = va_arg(ap, size_t); - cache_arg->payload.raw = alloca(datalen); - memcpy(cache_arg->payload.raw, data, datalen); - cache_arg->raw_datalen = datalen; + ast_event_append_ie_raw(&cache_arg_event, ie_type, data, datalen); + } + case AST_EVENT_IE_PLTYPE_EXISTS: + ast_log(LOG_WARNING, "PLTYPE_EXISTS not supported by this function\n"); + break; + case AST_EVENT_IE_PLTYPE_UNKNOWN: + break; } - AST_LIST_INSERT_TAIL(&cache_args, cache_arg, entry); } va_end(ap); - if (AST_LIST_EMPTY(&cache_args)) { - ast_log(LOG_ERROR, "Events can not be retrieved from the cache without " - "specifying at least one IE type!\n"); - return NULL; - } + tmp_event_ref.event = cache_arg_event; - AST_RWLIST_RDLOCK(&ast_event_cache[type]); - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&ast_event_cache[type], event_ref, entry) { - AST_LIST_TRAVERSE(&cache_args, cache_arg, entry) { - if (!match_ie_val(event_ref->event, cache_arg, NULL)) - break; - } - if (!cache_arg) { - /* All parameters were matched on this cache entry, so return it */ - dup_event = ast_event_dup(event_ref->event); - break; - } + cached_event_ref = ao2_find(container, &tmp_event_ref, OBJ_POINTER); + + ast_event_destroy(cache_arg_event); + cache_arg_event = NULL; + + if (cached_event_ref) { + dup_event = ast_event_dup(cached_event_ref->event); + ao2_ref(cached_event_ref, -1); + cached_event_ref = NULL; } - AST_RWLIST_TRAVERSE_SAFE_END - AST_RWLIST_UNLOCK(&ast_event_cache[type]); return dup_event; } +static struct ast_event_ref *alloc_event_ref(void) +{ + return ao2_alloc(sizeof(struct ast_event_ref), ast_event_ref_destroy); +} + /*! \brief Duplicate an event and add it to the cache * \note This assumes this index in to the cache is locked */ -static int ast_event_dup_and_cache(const struct ast_event *event) +static int attribute_unused ast_event_dup_and_cache(const struct ast_event *event) { struct ast_event *dup_event; struct ast_event_ref *event_ref; - if (!(dup_event = ast_event_dup(event))) + if (!(dup_event = ast_event_dup(event))) { return -1; - if (!(event_ref = ast_calloc(1, sizeof(*event_ref)))) + } + + if (!(event_ref = alloc_event_ref())) { + ast_event_destroy(dup_event); return -1; - + } + event_ref->event = dup_event; - AST_LIST_INSERT_TAIL(&ast_event_cache[ntohs(event->type)], event_ref, entry); + ao2_link(ast_event_cache[ast_event_get_type(event)].container, event_ref); + + ao2_ref(event_ref, -1); return 0; } -int ast_event_queue_and_cache(struct ast_event *event, ...) +int ast_event_queue_and_cache(struct ast_event *event) { - va_list ap; - enum ast_event_type ie_type; - uint16_t host_event_type; - struct ast_event_ref *event_ref; - int res; - struct ast_event_ie_val *cache_arg; - AST_LIST_HEAD_NOLOCK_STATIC(cache_args, ast_event_ie_val); - - host_event_type = ntohs(event->type); - - /* Invalid type */ - if (host_event_type >= AST_EVENT_TOTAL) { - ast_log(LOG_WARNING, "Someone tried to queue an event of invalid " - "type '%d'!\n", host_event_type); - return -1; + struct ao2_container *container; + struct ast_event_ref tmp_event_ref = { + .event = event, + }; + + if (!(container = ast_event_cache[ast_event_get_type(event)].container)) { + ast_log(LOG_WARNING, "cache requested for non-cached event type\n"); + goto queue_event; } - va_start(ap, event); - for (ie_type = va_arg(ap, enum ast_event_type); - ie_type != AST_EVENT_IE_END; - ie_type = va_arg(ap, enum ast_event_type)) - { - cache_arg = alloca(sizeof(*cache_arg)); - memset(cache_arg, 0, sizeof(*cache_arg)); - cache_arg->ie_type = ie_type; - cache_arg->ie_pltype = va_arg(ap, enum ast_event_ie_pltype); - if (cache_arg->ie_pltype == AST_EVENT_IE_PLTYPE_RAW) - cache_arg->raw_datalen = va_arg(ap, size_t); - AST_LIST_INSERT_TAIL(&cache_args, cache_arg, entry); - } - va_end(ap); - - if (AST_LIST_EMPTY(&cache_args)) { - ast_log(LOG_ERROR, "Events can not be cached without specifying at " - "least one IE type!\n"); - return ast_event_queue(event); - } - - AST_RWLIST_WRLOCK(&ast_event_cache[host_event_type]); - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&ast_event_cache[host_event_type], event_ref, entry) { - AST_LIST_TRAVERSE(&cache_args, cache_arg, entry) { - if (!match_ie_val(event_ref->event, cache_arg, event)) - break; - } - if (!cache_arg) { - /* All parameters were matched on this cache entry, so remove it */ - AST_LIST_REMOVE_CURRENT(entry); - ast_event_ref_destroy(event_ref); - } - } - AST_RWLIST_TRAVERSE_SAFE_END; - res = ast_event_dup_and_cache(event); - AST_RWLIST_UNLOCK(&ast_event_cache[host_event_type]); + /* Remove matches from the cache */ + ao2_callback(container, OBJ_POINTER | OBJ_UNLINK | OBJ_MULTIPLE | OBJ_NODATA, + ast_event_cmp, &tmp_event_ref); - return (ast_event_queue(event) || res) ? -1 : 0; +queue_event: + return ast_event_queue(event); } static int handle_event(void *data) @@ -1024,22 +1115,25 @@ static int handle_event(void *data) AST_RWDLLIST_TRAVERSE(&ast_event_subs[host_event_type], sub, entry) { struct ast_event_ie_val *ie_val; AST_LIST_TRAVERSE(&sub->ie_vals, ie_val, entry) { - if (!match_ie_val(event_ref->event, ie_val, NULL)) + if (!match_ie_val(event_ref->event, ie_val, NULL)) { break; + } } - if (ie_val) + if (ie_val) { continue; + } sub->cb(event_ref->event, sub->userdata); } AST_RWDLLIST_UNLOCK(&ast_event_subs[host_event_type]); /* Now to subscribers to all event types */ AST_RWDLLIST_RDLOCK(&ast_event_subs[AST_EVENT_ALL]); - AST_RWDLLIST_TRAVERSE(&ast_event_subs[AST_EVENT_ALL], sub, entry) + AST_RWDLLIST_TRAVERSE(&ast_event_subs[AST_EVENT_ALL], sub, entry) { sub->cb(event_ref->event, sub->userdata); + } AST_RWDLLIST_UNLOCK(&ast_event_subs[AST_EVENT_ALL]); - ast_event_ref_destroy(event_ref); + ao2_ref(event_ref, -1); return 0; } @@ -1059,29 +1153,149 @@ int ast_event_queue(struct ast_event *event) } /* If nobody has subscribed to this event type, throw it away now */ - if (ast_event_check_subscriber(host_event_type, AST_EVENT_IE_END) - == AST_EVENT_SUB_NONE) { + if (ast_event_check_subscriber(host_event_type, AST_EVENT_IE_END) + == AST_EVENT_SUB_NONE) { ast_event_destroy(event); return 0; } - if (!(event_ref = ast_calloc(1, sizeof(*event_ref)))) + if (!(event_ref = alloc_event_ref())) { return -1; + } event_ref->event = event; return ast_taskprocessor_push(event_dispatcher, handle_event, event_ref); } -void ast_event_init(void) +static int ast_event_hash_mwi(const void *obj, const int flags) +{ + const struct ast_event *event = obj; + const char *mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX); + const char *context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT); + + return ast_str_hash_add(context, ast_str_hash(mailbox)); +} + +/*! + * \internal + * \brief Hash function for AST_EVENT_DEVICE_STATE + * + * \param[in] obj an ast_event + * \param[in] flags unused + * + * \return hash value + */ +static int ast_event_hash_devstate(const void *obj, const int flags) +{ + const struct ast_event *event = obj; + + return ast_str_hash(ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE)); +} + +/*! + * \internal + * \brief Hash function for AST_EVENT_DEVICE_STATE_CHANGE + * + * \param[in] obj an ast_event + * \param[in] flags unused + * + * \return hash value + */ +static int ast_event_hash_devstate_change(const void *obj, const int flags) +{ + const struct ast_event *event = obj; + + return ast_str_hash(ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE)); +} + +static int ast_event_hash(const void *obj, const int flags) +{ + const struct ast_event_ref *event_ref; + const struct ast_event *event; + ao2_hash_fn *hash_fn; + + event_ref = obj; + event = event_ref->event; + + if (!(hash_fn = ast_event_cache[ast_event_get_type(event)].hash_fn)) { + return 0; + } + + return hash_fn(event, flags); +} + +/*! + * \internal + * \brief Compare two events + * + * \param[in] obj the first event, as an ast_event_ref + * \param[in] arg the second event, as an ast_event_ref + * \param[in] flags unused + * + * \pre Both events must be the same type. + * \pre The event type must be declared as a cached event type in ast_event_cache + * + * \details This function takes two events, and determines if they are considered + * equivalent. The values of information elements specified in the cache arguments + * for the event type are used to determine if the events are equivalent. + * + * \retval 0 No match + * \retval CMP_MATCH The events are considered equivalent based on the cache arguments + */ +static int ast_event_cmp(void *obj, void *arg, int flags) +{ + struct ast_event_ref *event_ref, *event_ref2; + struct ast_event *event, *event2; + int res = CMP_MATCH; + int i; + enum ast_event_ie_type *cache_args; + + event_ref = obj; + event = event_ref->event; + + event_ref2 = arg; + event2 = event_ref2->event; + + cache_args = ast_event_cache[ast_event_get_type(event)].cache_args; + + for (i = 0; i < ARRAY_LEN(ast_event_cache[0].cache_args) && cache_args[i]; i++) { + struct ast_event_ie_val ie_val = { + .ie_type = cache_args[i], + }; + + if (!match_ie_val(event, &ie_val, event2)) { + res = 0; + break; + } + } + + return res; +} + +int ast_event_init(void) { int i; - for (i = 0; i < AST_EVENT_TOTAL; i++) + for (i = 0; i < AST_EVENT_TOTAL; i++) { AST_RWDLLIST_HEAD_INIT(&ast_event_subs[i]); + } - for (i = 0; i < AST_EVENT_TOTAL; i++) - AST_RWLIST_HEAD_INIT(&ast_event_cache[i]); + for (i = 0; i < AST_EVENT_TOTAL; i++) { + if (!ast_event_cache[i].hash_fn) { + /* This event type is not cached. */ + continue; + } - event_dispatcher = ast_taskprocessor_get("core_event_dispatcher", 0); + if (!(ast_event_cache[i].container = ao2_container_alloc(NUM_CACHE_BUCKETS, + ast_event_hash, ast_event_cmp))) { + return -1; + } + } + + if (!(event_dispatcher = ast_taskprocessor_get("core_event_dispatcher", 0))) { + return -1; + } + + return 0; } diff --git a/res/ais/evt.c b/res/ais/evt.c index 5de26e39c0..fe5fe66623 100644 --- a/res/ais/evt.c +++ b/res/ais/evt.c @@ -47,6 +47,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/event.h" #include "asterisk/config.h" #include "asterisk/linkedlists.h" +#include "asterisk/devicestate.h" #ifndef AST_MODULE /* XXX HACK */ @@ -111,34 +112,7 @@ void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_h static void queue_event(struct ast_event *ast_event) { - enum ast_event_type type; - - /*! - * \todo This hack macks me sad. I need to come up with a better way to - * figure out whether an event should be cached or not, and what - * parameters to cache on. - * - * As long as the types of events that are supported is limited, - * this isn't *terrible*, I guess. Perhaps we should just define - * caching rules in the core, and make them configurable, and not - * have it be the job of the event publishers. - */ - - type = ast_event_get_type(ast_event); - - if (type == AST_EVENT_MWI) { - ast_event_queue_and_cache(ast_event, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_END); - } else if (type == AST_EVENT_DEVICE_STATE_CHANGE) { - ast_event_queue_and_cache(ast_event, - AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, - AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, sizeof(struct ast_eid), - AST_EVENT_IE_END); - } else { - ast_event_queue(ast_event); - } + ast_event_queue_and_cache(ast_event); } void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id, @@ -341,9 +315,14 @@ static void add_publish_event(struct event_channel *event_channel, const char *e return; } - if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) + if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) { return; - + } + + if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) { + return; + } + publish_event->type = type; ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type); publish_event->sub = ast_event_subscribe(type, ast_event_cb, event_channel, @@ -399,9 +378,14 @@ static void add_subscribe_event(struct event_channel *event_channel, const char return; } - if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) + if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) { return; - + } + + if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) { + return; + } + subscribe_event->type = type; subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);