From 48d90d947d1084c4784138f83128837133d05435 Mon Sep 17 00:00:00 2001 From: "David M. Lee" Date: Mon, 30 Sep 2013 15:24:00 +0000 Subject: [PATCH] Stasis performance improvements This patch addresses several performance problems that were found in the initial performance testing of Asterisk 12. The Stasis dispatch object was allocated as an AO2 object, even though it has a very confined lifecycle. This was replaced with a straight ast_malloc(). The Stasis message router was spending an inordinate amount of time searching hash tables. In this case, most of our routers had 6 or fewer routes in them to begin with. This was replaced with an array that's searched linearly for the route. We more heavily rely on AO2 objects in Asterisk 12, and the memset() in ao2_ref() actually became noticeable on the profile. This was #ifdef'ed to only run when AO2_DEBUG was enabled. After being misled by an erroneous comment in taskprocessor.c during profiling, the wrong comment was removed. Review: https://reviewboard.asterisk.org/r/2873/ git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@400138 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- include/asterisk/stasis_message_router.h | 16 ++ main/astobj2.c | 29 +-- main/stasis.c | 26 +-- main/stasis_message_router.c | 232 +++++++++------------- main/taskprocessor.c | 4 - res/res_pjsip/include/res_pjsip_private.h | 1 + tests/test_stasis.c | 48 ----- 7 files changed, 132 insertions(+), 224 deletions(-) diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index b14868b4a5..3209adb167 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -100,6 +100,9 @@ int stasis_message_router_is_done(struct stasis_message_router *router); * updates for types not handled by routes added with * stasis_message_router_add_cache_update(). * + * Adding multiple routes for the same message type results in undefined + * behavior. + * * \param router Router to add the route to. * \param message_type Type of message to route. * \param callback Callback to forard messages of \a message_type to. @@ -121,6 +124,9 @@ int stasis_message_router_add(struct stasis_message_router *router, * These are distinct from regular routes, so one could have both a regular * route and a cache route for the same \a message_type. * + * Adding multiple routes for the same message type results in undefined + * behavior. + * * \param router Router to add the route to. * \param message_type Subtype of cache update to route. * \param callback Callback to forard messages of \a message_type to. @@ -138,6 +144,11 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router, /*! * \brief Remove a route from a message router. * + * If a route is removed from another thread, there is no notification that + * all messages using this route have been processed. This typically means that + * the associated \c data pointer for this route must be kept until the + * route itself is disposed of. + * * \param router Router to remove the route from. * \param message_type Type of message to route. * @@ -149,6 +160,11 @@ void stasis_message_router_remove(struct stasis_message_router *router, /*! * \brief Remove a cache route from a message router. * + * If a route is removed from another thread, there is no notification that + * all messages using this route have been processed. This typically means that + * the associated \c data pointer for this route must be kept until the + * route itself is disposed of. + * * \param router Router to remove the route from. * \param message_type Type of message to route. * diff --git a/main/astobj2.c b/main/astobj2.c index 88a6a6a234..88801bd2fe 100644 --- a/main/astobj2.c +++ b/main/astobj2.c @@ -478,38 +478,23 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li ast_atomic_fetchadd_int(&ao2.total_objects, -1); #endif + /* In case someone uses an object after it's been freed */ + obj->priv_data.magic = 0; + switch (obj->priv_data.options & AO2_ALLOC_OPT_LOCK_MASK) { case AO2_ALLOC_OPT_LOCK_MUTEX: obj_mutex = INTERNAL_OBJ_MUTEX(user_data); ast_mutex_destroy(&obj_mutex->mutex.lock); - /* - * For safety, zero-out the astobj2_lock header and also the - * first word of the user-data, which we make sure is always - * allocated. - */ - memset(obj_mutex, '\0', sizeof(*obj_mutex) + sizeof(void *) ); ast_free(obj_mutex); break; case AO2_ALLOC_OPT_LOCK_RWLOCK: obj_rwlock = INTERNAL_OBJ_RWLOCK(user_data); ast_rwlock_destroy(&obj_rwlock->rwlock.lock); - /* - * For safety, zero-out the astobj2_rwlock header and also the - * first word of the user-data, which we make sure is always - * allocated. - */ - memset(obj_rwlock, '\0', sizeof(*obj_rwlock) + sizeof(void *) ); ast_free(obj_rwlock); break; case AO2_ALLOC_OPT_LOCK_NOLOCK: - /* - * For safety, zero-out the astobj2 header and also the first - * word of the user-data, which we make sure is always - * allocated. - */ - memset(obj, '\0', sizeof(*obj) + sizeof(void *) ); ast_free(obj); break; default: @@ -575,14 +560,6 @@ static void *internal_ao2_alloc(size_t data_size, ao2_destructor_fn destructor_f struct astobj2_lock *obj_mutex; struct astobj2_rwlock *obj_rwlock; - if (data_size < sizeof(void *)) { - /* - * We always alloc at least the size of a void *, - * for debugging purposes. - */ - data_size = sizeof(void *); - } - switch (options & AO2_ALLOC_OPT_LOCK_MASK) { case AO2_ALLOC_OPT_LOCK_MUTEX: #if defined(__AST_DEBUG_MALLOC) diff --git a/main/stasis.c b/main/stasis.c index 1a03bb3d44..7c8c349209 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -467,23 +467,24 @@ struct dispatch { struct stasis_subscription *sub; }; -static void dispatch_dtor(void *data) +static void dispatch_dtor(struct dispatch *dispatch) { - struct dispatch *dispatch = data; ao2_cleanup(dispatch->topic); ao2_cleanup(dispatch->message); ao2_cleanup(dispatch->sub); + + ast_free(dispatch); } static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub) { - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + struct dispatch *dispatch; ast_assert(topic != NULL); ast_assert(message != NULL); ast_assert(sub != NULL); - dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor); + dispatch = ast_malloc(sizeof(*dispatch)); if (!dispatch) { return NULL; } @@ -497,7 +498,6 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi dispatch->sub = sub; ao2_ref(sub, +1); - ao2_ref(dispatch, +1); return dispatch; } @@ -508,9 +508,10 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi */ static int dispatch_exec(void *data) { - RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup); + struct dispatch *dispatch = data; subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message); + dispatch_dtor(dispatch); return 0; } @@ -534,20 +535,19 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu ast_assert(sub != NULL); if (sub->mailbox) { - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + struct dispatch *dispatch; dispatch = dispatch_create(publisher_topic, message, sub); if (!dispatch) { - ast_log(LOG_DEBUG, "Dropping dispatch\n"); + ast_log(LOG_ERROR, "Dropping dispatch\n"); break; } - if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { - /* Ownership transferred to mailbox. - * Don't increment ref, b/c the task processor - * may have already gotten rid of the object. + if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) { + /* Push failed; just delete the dispatch. */ - dispatch = NULL; + ast_log(LOG_ERROR, "Dropping dispatch\n"); + dispatch_dtor(dispatch); } } else { /* Dispatch directly */ diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 26d2f2c0c7..864cf42c47 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/astobj2.h" #include "asterisk/stasis_message_router.h" -/*! Number of hash buckets for the route table. Keep it prime! */ -#define ROUTE_TABLE_BUCKETS 7 - /*! \internal */ struct stasis_message_route { /*! Message type handle by this route. */ @@ -47,29 +44,79 @@ struct stasis_message_route { void *data; }; -static void route_dtor(void *obj) +struct route_table { + /*! Current number of entries in the route table */ + size_t current_size; + /*! Allocated number of entires in the route table */ + size_t max_size; + /*! The route table itself */ + struct stasis_message_route routes[]; +}; + +static struct stasis_message_route *table_find_route(struct route_table *table, + struct stasis_message_type *message_type) { - struct stasis_message_route *route = obj; + size_t idx; + + /* While a linear search for routes may seem very inefficient, most + * route tables have six routes or less. For such small data, it's + * hard to beat a linear search. If we start having larger route + * tables, then we can look into containers with more efficient + * lookups. + */ + for (idx = 0; idx < table->current_size; ++idx) { + if (table->routes[idx].message_type == message_type) { + return &table->routes[idx]; + } + } - ao2_cleanup(route->message_type); - route->message_type = NULL; + return NULL; } -static int route_hash(const void *obj, const int flags) +static int table_add_route(struct route_table **table_ptr, + struct stasis_message_type *message_type, + stasis_subscription_cb callback, void *data) { - const struct stasis_message_route *route = obj; - const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type; + struct route_table *table = *table_ptr; + struct stasis_message_route *route; + + ast_assert(table_find_route(table, message_type) == NULL); + + if (table->current_size + 1 > table->max_size) { + size_t new_max_size = table->max_size ? table->max_size * 2 : 1; + struct route_table *new_table = ast_realloc(table, + sizeof(*new_table) + + sizeof(new_table->routes[0]) * new_max_size); + if (!new_table) { + return -1; + } + *table_ptr = table = new_table; + table->max_size = new_max_size; + } - return ast_str_hash(stasis_message_type_name(message_type)); + route = &table->routes[table->current_size++]; + + route->message_type = ao2_bump(message_type); + route->callback = callback; + route->data = data; + + return 0; } -static int route_cmp(void *obj, void *arg, int flags) +static int table_remove_route(struct route_table *table, + struct stasis_message_type *message_type) { - const struct stasis_message_route *left = obj; - const struct stasis_message_route *right = arg; - const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type; - - return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0; + size_t idx; + + for (idx = 0; idx < table->current_size; ++idx) { + if (table->routes[idx].message_type == message_type) { + ao2_cleanup(message_type); + table->routes[idx] = + table->routes[--table->current_size]; + return 0; + } + } + return -1; } /*! \internal */ @@ -77,11 +124,11 @@ struct stasis_message_router { /*! Subscription to the upstream topic */ struct stasis_subscription *subscription; /*! Subscribed routes */ - struct ao2_container *routes; - /*! Subscribed routes for \ref stasi_cache_update messages */ - struct ao2_container *cache_routes; + struct route_table *routes; + /*! Subscribed routes for \ref stasis_cache_update messages */ + struct route_table *cache_routes; /*! Route of last resort */ - struct stasis_message_route *default_route; + struct stasis_message_route default_route; }; static void router_dtor(void *obj) @@ -92,49 +139,47 @@ static void router_dtor(void *obj) ast_assert(stasis_subscription_is_done(router->subscription)); router->subscription = NULL; - ao2_cleanup(router->routes); + ast_free(router->routes); router->routes = NULL; - ao2_cleanup(router->cache_routes); + ast_free(router->cache_routes); router->cache_routes = NULL; - - ao2_cleanup(router->default_route); - router->default_route = NULL; } -static struct stasis_message_route *find_route( +static int find_route( struct stasis_message_router *router, - struct stasis_message *message) + struct stasis_message *message, + struct stasis_message_route *route_out) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); + struct stasis_message_route *route = NULL; struct stasis_message_type *type = stasis_message_type(message); SCOPED_AO2LOCK(lock, router); + ast_assert(route_out != NULL); + if (type == stasis_cache_update_type()) { /* Find a cache route */ struct stasis_cache_update *update = stasis_message_data(message); - route = ao2_find(router->cache_routes, update->type, OBJ_KEY); + route = table_find_route(router->cache_routes, update->type); } if (route == NULL) { /* Find a regular route */ - route = ao2_find(router->routes, type, OBJ_KEY); + route = table_find_route(router->routes, type); } - if (route == NULL) { + if (route == NULL && router->default_route.callback) { /* Maybe the default route, then? */ - if ((route = router->default_route)) { - ao2_ref(route, +1); - } + route = &router->default_route; } - if (route == NULL) { - return NULL; + if (!route) { + return -1; } - ao2_ref(route, +1); - return route; + *route_out = *route; + return 0; } static void router_dispatch(void *data, @@ -143,15 +188,12 @@ static void router_dispatch(void *data, struct stasis_message *message) { struct stasis_message_router *router = data; - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); + struct stasis_message_route route; - route = find_route(router, message); - - if (route) { - route->callback(route->data, sub, topic, message); + if (find_route(router, message, &route) == 0) { + route.callback(route.data, sub, topic, message); } - if (stasis_subscription_final_message(sub, message)) { ao2_cleanup(router); } @@ -167,14 +209,12 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash, - route_cmp); + router->routes = ast_calloc(1, sizeof(*router->routes)); if (!router->routes) { return NULL; } - router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, - route_hash, route_cmp); + router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes)); if (!router->cache_routes) { return NULL; } @@ -216,100 +256,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router) return stasis_subscription_is_done(router->subscription); } - -static struct stasis_message_route *route_create( - struct stasis_message_type *message_type, - stasis_subscription_cb callback, - void *data) -{ - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = ao2_alloc(sizeof(*route), route_dtor); - if (!route) { - return NULL; - } - - if (message_type) { - ao2_ref(message_type, +1); - } - route->message_type = message_type; - route->callback = callback; - route->data = data; - - ao2_ref(route, +1); - return route; -} - -static int add_route(struct stasis_message_router *router, - struct stasis_message_route *route) -{ - RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, router); - - existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY); - - if (existing_route) { - ast_log(LOG_ERROR, "Cannot add route; route exists\n"); - return -1; - } - - ao2_link(router->routes, route); - return 0; -} - -static int add_cache_route(struct stasis_message_router *router, - struct stasis_message_route *route) -{ - RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, router); - - existing_route = ao2_find(router->cache_routes, route->message_type, - OBJ_KEY); - - if (existing_route) { - ast_log(LOG_ERROR, "Cannot add route; route exists\n"); - return -1; - } - - ao2_link(router->cache_routes, route); - return 0; -} - int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = route_create(message_type, callback, data); - if (!route) { - return -1; - } - - return add_route(router, route); + SCOPED_AO2LOCK(lock, router); + return table_add_route(&router->routes, message_type, callback, data); } int stasis_message_router_add_cache_update(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) { - RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup); - - route = route_create(message_type, callback, data); - if (!route) { - return -1; - } - - return add_cache_route(router, route); + SCOPED_AO2LOCK(lock, router); + return table_add_route(&router->cache_routes, message_type, callback, data); } void stasis_message_router_remove(struct stasis_message_router *router, struct stasis_message_type *message_type) { SCOPED_AO2LOCK(lock, router); - - ao2_find(router->routes, message_type, - OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); + table_remove_route(router->routes, message_type); } void stasis_message_router_remove_cache_update( @@ -317,9 +284,7 @@ void stasis_message_router_remove_cache_update( struct stasis_message_type *message_type) { SCOPED_AO2LOCK(lock, router); - - ao2_find(router->cache_routes, message_type, - OBJ_UNLINK | OBJ_NODATA | OBJ_KEY); + table_remove_route(router->cache_routes, message_type); } int stasis_message_router_set_default(struct stasis_message_router *router, @@ -327,7 +292,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router, void *data) { SCOPED_AO2LOCK(lock, router); - ao2_cleanup(router->default_route); - router->default_route = route_create(NULL, callback, data); - return router->default_route ? 0 : -1; + router->default_route.callback = callback; + router->default_route.data = data; + /* While this implementation can never fail, it used to be able to */ + return 0; } diff --git a/main/taskprocessor.c b/main/taskprocessor.c index a8d1c80f97..92bd64c328 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -431,10 +431,6 @@ static void tps_taskprocessor_destroy(void *tps) } ast_free((char *) t->name); if (t->listener) { - /* This code should not be reached since the listener - * should have been destroyed before the taskprocessor could - * be destroyed - */ ao2_ref(t->listener, -1); t->listener = NULL; } diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h index 2b7a82b3fe..c3e6c21924 100644 --- a/res/res_pjsip/include/res_pjsip_private.h +++ b/res/res_pjsip/include/res_pjsip_private.h @@ -9,6 +9,7 @@ #define RES_PJSIP_PRIVATE_H_ struct ao2_container; +struct ast_threadpool_options; /*! * \brief Initialize the configuration for res_pjsip diff --git a/tests/test_stasis.c b/tests/test_stasis.c index da9c50874e..498df94402 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -867,52 +867,6 @@ AST_TEST_DEFINE(cache_dump) return AST_TEST_PASS; } -AST_TEST_DEFINE(route_conflicts) -{ - RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); - RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); - RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); - RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); - int ret; - - switch (cmd) { - case TEST_INIT: - info->name = __func__; - info->category = test_category; - info->summary = - "Multiple routes to the same message_type should fail"; - info->description = - "Multiple routes to the same message_type should fail"; - return AST_TEST_NOT_RUN; - case TEST_EXECUTE: - break; - } - - topic = stasis_topic_create("TestTopic"); - ast_test_validate(test, NULL != topic); - - consumer1 = consumer_create(1); - ast_test_validate(test, NULL != consumer1); - consumer2 = consumer_create(1); - ast_test_validate(test, NULL != consumer2); - - test_message_type = stasis_message_type_create("TestMessage", NULL); - ast_test_validate(test, NULL != test_message_type); - - uut = stasis_message_router_create(topic); - ast_test_validate(test, NULL != uut); - - ret = stasis_message_router_add( - uut, test_message_type, consumer_exec, consumer1); - ast_test_validate(test, 0 == ret); - ret = stasis_message_router_add( - uut, test_message_type, consumer_exec, consumer2); - ast_test_validate(test, 0 != ret); - - return AST_TEST_PASS; -} - AST_TEST_DEFINE(router) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -1373,7 +1327,6 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache_filter); AST_TEST_UNREGISTER(cache); AST_TEST_UNREGISTER(cache_dump); - AST_TEST_UNREGISTER(route_conflicts); AST_TEST_UNREGISTER(router); AST_TEST_UNREGISTER(router_cache_updates); AST_TEST_UNREGISTER(interleaving); @@ -1397,7 +1350,6 @@ static int load_module(void) AST_TEST_REGISTER(cache_filter); AST_TEST_REGISTER(cache); AST_TEST_REGISTER(cache_dump); - AST_TEST_REGISTER(route_conflicts); AST_TEST_REGISTER(router); AST_TEST_REGISTER(router_cache_updates); AST_TEST_REGISTER(interleaving);