mirror of https://github.com/asterisk/asterisk
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
681 lines
17 KiB
681 lines
17 KiB
/*
|
|
* Asterisk -- An open source telephony toolkit.
|
|
*
|
|
* Copyright (C) 2013, Digium, Inc.
|
|
*
|
|
* David M. Lee, II <dlee@digium.com>
|
|
*
|
|
* See http://www.asterisk.org for more information about
|
|
* the Asterisk project. Please do not directly contact
|
|
* any of the maintainers of this project for assistance;
|
|
* the project provides a web site, mailing lists and IRC
|
|
* channels for your use.
|
|
*
|
|
* This program is free software, distributed under the terms of
|
|
* the GNU General Public License Version 2. See the LICENSE file
|
|
* at the top of the source tree.
|
|
*/
|
|
|
|
/*! \file
|
|
*
|
|
* \brief Stasis Message Bus API.
|
|
*
|
|
* \author David M. Lee, II <dlee@digium.com>
|
|
*/
|
|
|
|
/*** MODULEINFO
|
|
<support_level>core</support_level>
|
|
***/
|
|
|
|
#include "asterisk.h"
|
|
|
|
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
|
|
|
|
#include "asterisk/astobj2.h"
|
|
#include "asterisk/stasis_internal.h"
|
|
#include "asterisk/stasis.h"
|
|
#include "asterisk/threadpool.h"
|
|
#include "asterisk/taskprocessor.h"
|
|
#include "asterisk/utils.h"
|
|
#include "asterisk/uuid.h"
|
|
|
|
/*! Initial size of the subscribers list. */
|
|
#define INITIAL_SUBSCRIBERS_MAX 4
|
|
|
|
/*! The number of buckets to use for topic pools */
|
|
#define TOPIC_POOL_BUCKETS 57
|
|
|
|
/*! Threadpool for dispatching notifications to subscribers */
|
|
static struct ast_threadpool *pool;
|
|
|
|
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
|
|
|
|
/*! \internal */
|
|
struct stasis_topic {
|
|
char *name;
|
|
/*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
|
|
struct stasis_subscription **subscribers;
|
|
/*! Allocated length of the subscribers array */
|
|
size_t num_subscribers_max;
|
|
/*! Current size of the subscribers array */
|
|
size_t num_subscribers_current;
|
|
};
|
|
|
|
/* Forward declarations for the tightly-coupled subscription object */
|
|
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
|
|
|
|
static void topic_dtor(void *obj)
|
|
{
|
|
struct stasis_topic *topic = obj;
|
|
ast_free(topic->name);
|
|
topic->name = NULL;
|
|
ast_free(topic->subscribers);
|
|
topic->subscribers = NULL;
|
|
}
|
|
|
|
struct stasis_topic *stasis_topic_create(const char *name)
|
|
{
|
|
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
|
|
|
|
topic = ao2_alloc(sizeof(*topic), topic_dtor);
|
|
|
|
if (!topic) {
|
|
return NULL;
|
|
}
|
|
|
|
topic->name = ast_strdup(name);
|
|
if (!topic->name) {
|
|
return NULL;
|
|
}
|
|
|
|
topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
|
|
topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
|
|
if (!topic->subscribers) {
|
|
return NULL;
|
|
}
|
|
|
|
ao2_ref(topic, +1);
|
|
return topic;
|
|
}
|
|
|
|
const char *stasis_topic_name(const struct stasis_topic *topic)
|
|
{
|
|
return topic->name;
|
|
}
|
|
|
|
/*! \internal */
|
|
struct stasis_subscription {
|
|
/*! Unique ID for this subscription */
|
|
char uniqueid[AST_UUID_STR_LEN];
|
|
/*! Topic subscribed to. */
|
|
struct stasis_topic *topic;
|
|
/*! Mailbox for processing incoming messages. */
|
|
struct ast_taskprocessor *mailbox;
|
|
/*! Callback function for incoming message processing. */
|
|
stasis_subscription_cb callback;
|
|
/*! Data pointer to be handed to the callback. */
|
|
void *data;
|
|
|
|
/*! Lock for joining with subscription. */
|
|
ast_mutex_t join_lock;
|
|
/*! Condition for joining with subscription. */
|
|
ast_cond_t join_cond;
|
|
/*! Flag set when final message for sub has been received.
|
|
* Be sure join_lock is held before reading/setting. */
|
|
int final_message_rxed;
|
|
/*! Flag set when final message for sub has been processed.
|
|
* Be sure join_lock is held before reading/setting. */
|
|
int final_message_processed;
|
|
};
|
|
|
|
static void subscription_dtor(void *obj)
|
|
{
|
|
struct stasis_subscription *sub = obj;
|
|
ast_assert(!stasis_subscription_is_subscribed(sub));
|
|
ast_assert(stasis_subscription_is_done(sub));
|
|
ao2_cleanup(sub->topic);
|
|
sub->topic = NULL;
|
|
ast_taskprocessor_unreference(sub->mailbox);
|
|
sub->mailbox = NULL;
|
|
ast_mutex_destroy(&sub->join_lock);
|
|
ast_cond_destroy(&sub->join_cond);
|
|
}
|
|
|
|
/*!
|
|
* \brief Invoke the subscription's callback.
|
|
* \param sub Subscription to invoke.
|
|
* \param topic Topic message was published to.
|
|
* \param message Message to send.
|
|
*/
|
|
static void subscription_invoke(struct stasis_subscription *sub,
|
|
struct stasis_topic *topic,
|
|
struct stasis_message *message)
|
|
{
|
|
/* Notify that the final message has been received */
|
|
if (stasis_subscription_final_message(sub, message)) {
|
|
SCOPED_MUTEX(lock, &sub->join_lock);
|
|
sub->final_message_rxed = 1;
|
|
ast_cond_signal(&sub->join_cond);
|
|
}
|
|
|
|
/* Since sub is mostly immutable, no need to lock sub */
|
|
sub->callback(sub->data, sub, topic, message);
|
|
|
|
/* Notify that the final message has been processed */
|
|
if (stasis_subscription_final_message(sub, message)) {
|
|
SCOPED_MUTEX(lock, &sub->join_lock);
|
|
sub->final_message_processed = 1;
|
|
ast_cond_signal(&sub->join_cond);
|
|
}
|
|
}
|
|
|
|
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
|
|
|
|
struct stasis_subscription *internal_stasis_subscribe(
|
|
struct stasis_topic *topic,
|
|
stasis_subscription_cb callback,
|
|
void *data,
|
|
int needs_mailbox)
|
|
{
|
|
RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
|
|
|
|
sub = ao2_alloc(sizeof(*sub), subscription_dtor);
|
|
if (!sub) {
|
|
return NULL;
|
|
}
|
|
|
|
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
|
|
|
|
if (needs_mailbox) {
|
|
sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
|
|
if (!sub->mailbox) {
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
ao2_ref(topic, +1);
|
|
sub->topic = topic;
|
|
sub->callback = callback;
|
|
sub->data = data;
|
|
ast_mutex_init(&sub->join_lock);
|
|
ast_cond_init(&sub->join_cond, NULL);
|
|
|
|
if (topic_add_subscription(topic, sub) != 0) {
|
|
return NULL;
|
|
}
|
|
send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
|
|
|
|
ao2_ref(sub, +1);
|
|
return sub;
|
|
}
|
|
|
|
struct stasis_subscription *stasis_subscribe(
|
|
struct stasis_topic *topic,
|
|
stasis_subscription_cb callback,
|
|
void *data)
|
|
{
|
|
return internal_stasis_subscribe(topic, callback, data, 1);
|
|
}
|
|
|
|
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
|
|
{
|
|
if (sub) {
|
|
size_t i;
|
|
struct stasis_topic *topic = sub->topic;
|
|
SCOPED_AO2LOCK(lock_topic, topic);
|
|
|
|
for (i = 0; i < topic->num_subscribers_current; ++i) {
|
|
if (topic->subscribers[i] == sub) {
|
|
send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
|
|
/* swap [i] with last entry; remove last entry */
|
|
topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
|
|
/* Unsubscribing unrefs the subscription */
|
|
ao2_cleanup(sub);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
/*!
|
|
* \brief Block until the final message has been received on a subscription.
|
|
*
|
|
* \param subscription Subscription to wait on.
|
|
*/
|
|
void stasis_subscription_join(struct stasis_subscription *subscription)
|
|
{
|
|
if (subscription) {
|
|
SCOPED_MUTEX(lock, &subscription->join_lock);
|
|
/* Wait until the processed flag has been set */
|
|
while (!subscription->final_message_processed) {
|
|
ast_cond_wait(&subscription->join_cond,
|
|
&subscription->join_lock);
|
|
}
|
|
}
|
|
}
|
|
|
|
int stasis_subscription_is_done(struct stasis_subscription *subscription)
|
|
{
|
|
if (subscription) {
|
|
SCOPED_MUTEX(lock, &subscription->join_lock);
|
|
return subscription->final_message_rxed;
|
|
}
|
|
|
|
/* Null subscription is about as done as you can get */
|
|
return 1;
|
|
}
|
|
|
|
struct stasis_subscription *stasis_unsubscribe_and_join(
|
|
struct stasis_subscription *subscription)
|
|
{
|
|
if (!subscription) {
|
|
return NULL;
|
|
}
|
|
|
|
/* Bump refcount to hold it past the unsubscribe */
|
|
ao2_ref(subscription, +1);
|
|
stasis_unsubscribe(subscription);
|
|
stasis_subscription_join(subscription);
|
|
/* Now decrement the refcount back */
|
|
ao2_cleanup(subscription);
|
|
return NULL;
|
|
}
|
|
|
|
int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
|
|
{
|
|
if (sub) {
|
|
size_t i;
|
|
struct stasis_topic *topic = sub->topic;
|
|
SCOPED_AO2LOCK(lock_topic, topic);
|
|
|
|
for (i = 0; i < topic->num_subscribers_current; ++i) {
|
|
if (topic->subscribers[i] == sub) {
|
|
return 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
|
|
{
|
|
return sub->uniqueid;
|
|
}
|
|
|
|
int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
|
|
{
|
|
struct stasis_subscription_change *change;
|
|
if (stasis_message_type(msg) != stasis_subscription_change_type()) {
|
|
return 0;
|
|
}
|
|
|
|
change = stasis_message_data(msg);
|
|
if (strcmp("Unsubscribe", change->description)) {
|
|
return 0;
|
|
}
|
|
|
|
if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
|
|
return 0;
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
|
|
/*!
|
|
* \brief Add a subscriber to a topic.
|
|
* \param topic Topic
|
|
* \param sub Subscriber
|
|
* \return 0 on success
|
|
* \return Non-zero on error
|
|
*/
|
|
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
|
|
{
|
|
struct stasis_subscription **subscribers;
|
|
SCOPED_AO2LOCK(lock, topic);
|
|
|
|
/* Increase list size, if needed */
|
|
if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
|
|
subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
|
|
if (!subscribers) {
|
|
return -1;
|
|
}
|
|
topic->subscribers = subscribers;
|
|
topic->num_subscribers_max *= 2;
|
|
}
|
|
|
|
/* Don't ref sub here or we'll cause a reference cycle. */
|
|
topic->subscribers[topic->num_subscribers_current++] = sub;
|
|
return 0;
|
|
}
|
|
|
|
/*!
|
|
* \internal
|
|
* \brief Information needed to dispatch a message to a subscription
|
|
*/
|
|
struct dispatch {
|
|
/*! Topic message was published to */
|
|
struct stasis_topic *topic;
|
|
/*! The message itself */
|
|
struct stasis_message *message;
|
|
/*! Subscription receiving the message */
|
|
struct stasis_subscription *sub;
|
|
};
|
|
|
|
static void dispatch_dtor(void *data)
|
|
{
|
|
struct dispatch *dispatch = data;
|
|
ao2_cleanup(dispatch->topic);
|
|
ao2_cleanup(dispatch->message);
|
|
ao2_cleanup(dispatch->sub);
|
|
}
|
|
|
|
static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
|
|
{
|
|
RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
|
|
|
|
ast_assert(topic != NULL);
|
|
ast_assert(message != NULL);
|
|
ast_assert(sub != NULL);
|
|
|
|
dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
|
|
if (!dispatch) {
|
|
return NULL;
|
|
}
|
|
|
|
dispatch->topic = topic;
|
|
ao2_ref(topic, +1);
|
|
|
|
dispatch->message = message;
|
|
ao2_ref(message, +1);
|
|
|
|
dispatch->sub = sub;
|
|
ao2_ref(sub, +1);
|
|
|
|
ao2_ref(dispatch, +1);
|
|
return dispatch;
|
|
}
|
|
|
|
/*!
|
|
* \brief Dispatch a message to a subscriber
|
|
* \param data \ref dispatch object
|
|
* \return 0
|
|
*/
|
|
static int dispatch_exec(void *data)
|
|
{
|
|
RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
|
|
|
|
subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
|
|
|
|
return 0;
|
|
}
|
|
|
|
void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
|
|
{
|
|
size_t i;
|
|
SCOPED_AO2LOCK(lock, topic);
|
|
|
|
ast_assert(topic != NULL);
|
|
ast_assert(publisher_topic != NULL);
|
|
ast_assert(message != NULL);
|
|
|
|
for (i = 0; i < topic->num_subscribers_current; ++i) {
|
|
struct stasis_subscription *sub = topic->subscribers[i];
|
|
|
|
ast_assert(sub != NULL);
|
|
|
|
if (sub->mailbox) {
|
|
RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
|
|
|
|
dispatch = dispatch_create(publisher_topic, message, sub);
|
|
if (!dispatch) {
|
|
ast_log(LOG_DEBUG, "Dropping dispatch\n");
|
|
break;
|
|
}
|
|
|
|
if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
|
|
/* Ownership transferred to mailbox.
|
|
* Don't increment ref, b/c the task processor
|
|
* may have already gotten rid of the object.
|
|
*/
|
|
dispatch = NULL;
|
|
}
|
|
} else {
|
|
/* Dispatch directly */
|
|
subscription_invoke(sub, publisher_topic, message);
|
|
}
|
|
}
|
|
}
|
|
|
|
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
|
|
{
|
|
stasis_forward_message(topic, topic, message);
|
|
}
|
|
|
|
/*! \brief Forwarding subscriber */
|
|
static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
|
|
{
|
|
struct stasis_topic *to_topic = data;
|
|
stasis_forward_message(to_topic, topic, message);
|
|
|
|
if (stasis_subscription_final_message(sub, message)) {
|
|
ao2_cleanup(to_topic);
|
|
}
|
|
}
|
|
|
|
struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
|
|
{
|
|
struct stasis_subscription *sub;
|
|
if (!from_topic || !to_topic) {
|
|
return NULL;
|
|
}
|
|
|
|
/* Forwarding subscriptions should dispatch directly instead of having a
|
|
* mailbox. Otherwise, messages forwarded to the same topic from
|
|
* different topics may get reordered. Which is bad.
|
|
*/
|
|
sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
|
|
if (sub) {
|
|
/* hold a ref to to_topic for this forwarding subscription */
|
|
ao2_ref(to_topic, +1);
|
|
}
|
|
return sub;
|
|
}
|
|
|
|
static void subscription_change_dtor(void *obj)
|
|
{
|
|
struct stasis_subscription_change *change = obj;
|
|
ast_string_field_free_memory(change);
|
|
ao2_cleanup(change->topic);
|
|
}
|
|
|
|
static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
|
|
{
|
|
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
|
|
|
|
change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
|
|
if (ast_string_field_init(change, 128)) {
|
|
return NULL;
|
|
}
|
|
|
|
ast_string_field_set(change, uniqueid, uniqueid);
|
|
ast_string_field_set(change, description, description);
|
|
ao2_ref(topic, +1);
|
|
change->topic = topic;
|
|
|
|
ao2_ref(change, +1);
|
|
return change;
|
|
}
|
|
|
|
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
|
|
{
|
|
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
|
|
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
|
|
|
|
change = subscription_change_alloc(topic, uniqueid, description);
|
|
|
|
if (!change) {
|
|
return;
|
|
}
|
|
|
|
msg = stasis_message_create(stasis_subscription_change_type(), change);
|
|
|
|
if (!msg) {
|
|
return;
|
|
}
|
|
|
|
stasis_publish(topic, msg);
|
|
}
|
|
|
|
struct topic_pool_entry {
|
|
struct stasis_subscription *forward;
|
|
struct stasis_topic *topic;
|
|
};
|
|
|
|
static void topic_pool_entry_dtor(void *obj)
|
|
{
|
|
struct topic_pool_entry *entry = obj;
|
|
entry->forward = stasis_unsubscribe(entry->forward);
|
|
ao2_cleanup(entry->topic);
|
|
entry->topic = NULL;
|
|
}
|
|
|
|
static struct topic_pool_entry *topic_pool_entry_alloc(void)
|
|
{
|
|
return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
|
|
}
|
|
|
|
struct stasis_topic_pool {
|
|
struct ao2_container *pool_container;
|
|
struct stasis_topic *pool_topic;
|
|
};
|
|
|
|
static void topic_pool_dtor(void *obj)
|
|
{
|
|
struct stasis_topic_pool *pool = obj;
|
|
ao2_cleanup(pool->pool_container);
|
|
pool->pool_container = NULL;
|
|
ao2_cleanup(pool->pool_topic);
|
|
pool->pool_topic = NULL;
|
|
}
|
|
|
|
static int topic_pool_entry_hash(const void *obj, const int flags)
|
|
{
|
|
const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
|
|
return ast_str_case_hash(topic_name);
|
|
}
|
|
|
|
static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
|
|
{
|
|
struct topic_pool_entry *opt1 = obj, *opt2 = arg;
|
|
const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
|
|
return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
|
|
}
|
|
|
|
struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
|
|
{
|
|
RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
|
|
if (!pool) {
|
|
return NULL;
|
|
}
|
|
pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
|
|
ao2_ref(pooled_topic, +1);
|
|
pool->pool_topic = pooled_topic;
|
|
|
|
ao2_ref(pool, +1);
|
|
return pool;
|
|
}
|
|
|
|
struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
|
|
{
|
|
RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
|
|
SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
|
|
topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
|
|
|
|
if (topic_pool_entry) {
|
|
return topic_pool_entry->topic;
|
|
}
|
|
|
|
topic_pool_entry = topic_pool_entry_alloc();
|
|
|
|
if (!topic_pool_entry) {
|
|
return NULL;
|
|
}
|
|
|
|
topic_pool_entry->topic = stasis_topic_create(topic_name);
|
|
if (!topic_pool_entry->topic) {
|
|
return NULL;
|
|
}
|
|
|
|
topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
|
|
if (!topic_pool_entry->forward) {
|
|
return NULL;
|
|
}
|
|
|
|
ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
|
|
|
|
return topic_pool_entry->topic;
|
|
}
|
|
|
|
void stasis_log_bad_type_access(const char *name)
|
|
{
|
|
ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
|
|
}
|
|
|
|
/*! \brief Cleanup function */
|
|
static void stasis_exit(void)
|
|
{
|
|
ast_threadpool_shutdown(pool);
|
|
pool = NULL;
|
|
}
|
|
|
|
/*! \brief Cleanup function for graceful shutdowns */
|
|
static void stasis_cleanup(void)
|
|
{
|
|
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
|
|
}
|
|
|
|
int stasis_init(void)
|
|
{
|
|
int cache_init;
|
|
|
|
struct ast_threadpool_options opts;
|
|
|
|
/* Be sure the types are cleaned up after the message bus */
|
|
ast_register_cleanup(stasis_cleanup);
|
|
ast_register_atexit(stasis_exit);
|
|
|
|
if (stasis_config_init() != 0) {
|
|
ast_log(LOG_ERROR, "Stasis configuration failed\n");
|
|
return -1;
|
|
}
|
|
|
|
if (pool) {
|
|
ast_log(LOG_ERROR, "Stasis double-initialized\n");
|
|
return -1;
|
|
}
|
|
|
|
stasis_config_get_threadpool_options(&opts);
|
|
ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
|
|
opts.initial_size, opts.max_size, opts.idle_timeout);
|
|
pool = ast_threadpool_create("stasis-core", NULL, &opts);
|
|
if (!pool) {
|
|
ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
|
|
return -1;
|
|
}
|
|
|
|
cache_init = stasis_cache_init();
|
|
if (cache_init != 0) {
|
|
return -1;
|
|
}
|
|
|
|
if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|