|
|
|
@ -372,6 +372,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
|
|
|
|
|
return topic->name;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
|
|
|
|
|
{
|
|
|
|
|
return AST_VECTOR_SIZE(&topic->subscribers);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*! \internal */
|
|
|
|
|
struct stasis_subscription {
|
|
|
|
|
/*! Unique ID for this subscription */
|
|
|
|
@ -393,6 +398,11 @@ struct stasis_subscription {
|
|
|
|
|
/*! Flag set when final message for sub has been processed.
|
|
|
|
|
* Be sure join_lock is held before reading/setting. */
|
|
|
|
|
int final_message_processed;
|
|
|
|
|
|
|
|
|
|
/*! The message types this subscription is accepting */
|
|
|
|
|
AST_VECTOR(, char) accepted_message_types;
|
|
|
|
|
/*! The message filter currently in use */
|
|
|
|
|
enum stasis_subscription_message_filter filter;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
static void subscription_dtor(void *obj)
|
|
|
|
@ -411,6 +421,8 @@ static void subscription_dtor(void *obj)
|
|
|
|
|
ast_taskprocessor_unreference(sub->mailbox);
|
|
|
|
|
sub->mailbox = NULL;
|
|
|
|
|
ast_cond_destroy(&sub->join_cond);
|
|
|
|
|
|
|
|
|
|
AST_VECTOR_FREE(&sub->accepted_message_types);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
@ -422,19 +434,25 @@ static void subscription_dtor(void *obj)
|
|
|
|
|
static void subscription_invoke(struct stasis_subscription *sub,
|
|
|
|
|
struct stasis_message *message)
|
|
|
|
|
{
|
|
|
|
|
unsigned int final = stasis_subscription_final_message(sub, message);
|
|
|
|
|
int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
|
|
|
|
|
|
|
|
|
|
/* Notify that the final message has been received */
|
|
|
|
|
if (stasis_subscription_final_message(sub, message)) {
|
|
|
|
|
if (final) {
|
|
|
|
|
ao2_lock(sub);
|
|
|
|
|
sub->final_message_rxed = 1;
|
|
|
|
|
ast_cond_signal(&sub->join_cond);
|
|
|
|
|
ao2_unlock(sub);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Since sub is mostly immutable, no need to lock sub */
|
|
|
|
|
sub->callback(sub->data, sub, message);
|
|
|
|
|
if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
|
|
|
|
|
(message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
|
|
|
|
|
/* Since sub is mostly immutable, no need to lock sub */
|
|
|
|
|
sub->callback(sub->data, sub, message);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Notify that the final message has been processed */
|
|
|
|
|
if (stasis_subscription_final_message(sub, message)) {
|
|
|
|
|
if (final) {
|
|
|
|
|
ao2_lock(sub);
|
|
|
|
|
sub->final_message_processed = 1;
|
|
|
|
|
ast_cond_signal(&sub->join_cond);
|
|
|
|
@ -502,6 +520,8 @@ struct stasis_subscription *internal_stasis_subscribe(
|
|
|
|
|
sub->callback = callback;
|
|
|
|
|
sub->data = data;
|
|
|
|
|
ast_cond_init(&sub->join_cond, NULL);
|
|
|
|
|
sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
|
|
|
|
|
AST_VECTOR_INIT(&sub->accepted_message_types, 0);
|
|
|
|
|
|
|
|
|
|
if (topic_add_subscription(topic, sub) != 0) {
|
|
|
|
|
ao2_ref(sub, -1);
|
|
|
|
@ -588,6 +608,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
|
|
|
|
|
const struct stasis_message_type *type)
|
|
|
|
|
{
|
|
|
|
|
if (!subscription) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_assert(type != NULL);
|
|
|
|
|
ast_assert(stasis_message_type_name(type) != NULL);
|
|
|
|
|
|
|
|
|
|
if (!type || !stasis_message_type_name(type)) {
|
|
|
|
|
/* Filtering is unreliable as this message type is not yet initialized
|
|
|
|
|
* so force all messages through.
|
|
|
|
|
*/
|
|
|
|
|
subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ao2_lock(subscription->topic);
|
|
|
|
|
if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
|
|
|
|
|
/* We do this for the same reason as above. The subscription can still operate, so allow
|
|
|
|
|
* it to do so by forcing all messages through.
|
|
|
|
|
*/
|
|
|
|
|
subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
|
|
|
|
|
}
|
|
|
|
|
ao2_unlock(subscription->topic);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
|
|
|
|
|
const struct stasis_message_type *type)
|
|
|
|
|
{
|
|
|
|
|
if (!subscription) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_assert(type != NULL);
|
|
|
|
|
ast_assert(stasis_message_type_name(type) != NULL);
|
|
|
|
|
|
|
|
|
|
if (!type || !stasis_message_type_name(type)) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ao2_lock(subscription->topic);
|
|
|
|
|
if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
|
|
|
|
|
/* The memory is already allocated so this can't fail */
|
|
|
|
|
AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
|
|
|
|
|
}
|
|
|
|
|
ao2_unlock(subscription->topic);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int stasis_subscription_set_filter(struct stasis_subscription *subscription,
|
|
|
|
|
enum stasis_subscription_message_filter filter)
|
|
|
|
|
{
|
|
|
|
|
if (!subscription) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ao2_lock(subscription->topic);
|
|
|
|
|
if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
|
|
|
|
|
subscription->filter = filter;
|
|
|
|
|
}
|
|
|
|
|
ao2_unlock(subscription->topic);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void stasis_subscription_join(struct stasis_subscription *subscription)
|
|
|
|
|
{
|
|
|
|
|
if (subscription) {
|
|
|
|
@ -783,6 +873,18 @@ static void dispatch_message(struct stasis_subscription *sub,
|
|
|
|
|
struct stasis_message *message,
|
|
|
|
|
int synchronous)
|
|
|
|
|
{
|
|
|
|
|
/* Determine if this subscription is interested in this message. Note that final
|
|
|
|
|
* messages are special and are always invoked on the subscription.
|
|
|
|
|
*/
|
|
|
|
|
if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
|
|
|
|
|
int message_type_id = stasis_message_type_id(stasis_message_type(message));
|
|
|
|
|
if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
|
|
|
|
|
!AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
|
|
|
|
|
!stasis_subscription_final_message(sub, message)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!sub->mailbox) {
|
|
|
|
|
/* Dispatch directly */
|
|
|
|
|
subscription_invoke(sub, message);
|
|
|
|
@ -842,6 +944,11 @@ static void publish_msg(struct stasis_topic *topic,
|
|
|
|
|
ast_assert(topic != NULL);
|
|
|
|
|
ast_assert(message != NULL);
|
|
|
|
|
|
|
|
|
|
/* If there are no subscribers don't bother */
|
|
|
|
|
if (!stasis_topic_subscribers(topic)) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* The topic may be unref'ed by the subscription invocation.
|
|
|
|
|
* Make sure we hold onto a reference while dispatching.
|
|
|
|
|