mirror of https://github.com/sipwise/kamailio.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
379 lines
7.8 KiB
379 lines
7.8 KiB
/*
 * $Id$
 *
 * dmq module - distributed message queue
 *
 * Copyright (C) 2011 Bucur Marius - Ovidiu
 *
 * This file is part of Kamailio, a free SIP server.
 *
 * Kamailio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version
 *
 * Kamailio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 */
|
|
|
|
#include "../../ut.h"
|
|
#include "dmqnode.h"
|
|
#include "dmq.h"
|
|
|
|
dmq_node_t* self_node;
|
|
dmq_node_t* notification_node;
|
|
|
|
/* name */
|
|
str dmq_node_status_str = str_init("status");
|
|
/* possible values */
|
|
str dmq_node_active_str = str_init("active");
|
|
str dmq_node_disabled_str = str_init("disabled");
|
|
str dmq_node_timeout_str = str_init("timeout");
|
|
|
|
/**
|
|
* @brief get the string status of the node
|
|
*/
|
|
str* get_status_str(int status)
|
|
{
|
|
switch(status) {
|
|
case DMQ_NODE_ACTIVE: {
|
|
return &dmq_node_active_str;
|
|
}
|
|
case DMQ_NODE_DISABLED: {
|
|
return &dmq_node_disabled_str;
|
|
}
|
|
case DMQ_NODE_TIMEOUT: {
|
|
return &dmq_node_timeout_str;
|
|
}
|
|
default: {
|
|
return 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief initialize dmg node list
|
|
*/
|
|
dmq_node_list_t* init_dmq_node_list()
|
|
{
|
|
dmq_node_list_t* node_list;
|
|
node_list = shm_malloc(sizeof(dmq_node_list_t));
|
|
if(node_list==NULL) {
|
|
LM_ERR("no more shm\n");
|
|
return NULL;
|
|
}
|
|
memset(node_list, 0, sizeof(dmq_node_list_t));
|
|
lock_init(&node_list->lock);
|
|
return node_list;
|
|
}
|
|
|
|
/**
|
|
* @brief compare dmq node addresses
|
|
*/
|
|
int cmp_dmq_node(dmq_node_t* node, dmq_node_t* cmpnode)
|
|
{
|
|
if(!node || !cmpnode) {
|
|
LM_ERR("cmp_dmq_node - null node received\n");
|
|
return -1;
|
|
}
|
|
return STR_EQ(node->uri.host, cmpnode->uri.host) &&
|
|
STR_EQ(node->uri.port, cmpnode->uri.port);
|
|
}
|
|
|
|
/**
|
|
* @brief get the value of a parameter
|
|
*/
|
|
str* get_param_value(param_t* params, str* param)
|
|
{
|
|
while (params) {
|
|
if ((params->name.len == param->len) &&
|
|
(strncmp(params->name.s, param->s, param->len) == 0)) {
|
|
return ¶ms->body;
|
|
}
|
|
params = params->next;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
/**
|
|
* @brief set the parameters for the node
|
|
*/
|
|
int set_dmq_node_params(dmq_node_t* node, param_t* params)
|
|
{
|
|
str* status;
|
|
if(!params) {
|
|
LM_DBG("no parameters given\n");
|
|
return 0;
|
|
}
|
|
status = get_param_value(params, &dmq_node_status_str);
|
|
if(status) {
|
|
if(str_strcmp(status, &dmq_node_active_str)) {
|
|
node->status = DMQ_NODE_ACTIVE;
|
|
} else if(str_strcmp(status, &dmq_node_timeout_str)) {
|
|
node->status = DMQ_NODE_ACTIVE;
|
|
} else if(str_strcmp(status, &dmq_node_disabled_str)) {
|
|
node->status = DMQ_NODE_ACTIVE;
|
|
} else {
|
|
LM_ERR("invalid status parameter: %.*s\n", STR_FMT(status));
|
|
goto error;
|
|
}
|
|
}
|
|
return 0;
|
|
error:
|
|
return -1;
|
|
}
|
|
|
|
/**
|
|
* @brief set default node params
|
|
*/
|
|
int set_default_dmq_node_params(dmq_node_t* node)
|
|
{
|
|
node->status = DMQ_NODE_ACTIVE;
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* @brief build a dmq node
|
|
*/
|
|
dmq_node_t* build_dmq_node(str* uri, int shm) {
|
|
dmq_node_t* ret = NULL;
|
|
param_hooks_t hooks;
|
|
param_t* params;
|
|
|
|
LM_DBG("build_dmq_node %.*s with %s memory\n", STR_FMT(uri), shm?"shm":"private");
|
|
|
|
if(shm) {
|
|
ret = shm_malloc(sizeof(dmq_node_t));
|
|
if(ret==NULL) {
|
|
LM_ERR("no more shm\n");
|
|
goto error;
|
|
}
|
|
memset(ret, 0, sizeof(dmq_node_t));
|
|
if(shm_str_dup(&ret->orig_uri, uri)<0) {
|
|
goto error;
|
|
}
|
|
} else {
|
|
ret = pkg_malloc(sizeof(dmq_node_t));
|
|
if(ret==NULL) {
|
|
LM_ERR("no more pkg\n");
|
|
goto error;
|
|
}
|
|
memset(ret, 0, sizeof(dmq_node_t));
|
|
if(pkg_str_dup(&ret->orig_uri, uri)<0) {
|
|
goto error;
|
|
}
|
|
}
|
|
set_default_dmq_node_params(ret);
|
|
if(parse_uri(ret->orig_uri.s, ret->orig_uri.len, &ret->uri) < 0) {
|
|
LM_ERR("error parsing uri\n");
|
|
goto error;
|
|
}
|
|
/* if any parameters found, parse them */
|
|
if(parse_params(&ret->uri.params, CLASS_ANY, &hooks, ¶ms) < 0) {
|
|
LM_ERR("error parsing params\n");
|
|
goto error;
|
|
}
|
|
/* if any params found */
|
|
if(params) {
|
|
if(shm) {
|
|
if(shm_duplicate_params(&ret->params, params) < 0) {
|
|
LM_ERR("error duplicating params\n");
|
|
free_params(params);
|
|
goto error;
|
|
}
|
|
free_params(params);
|
|
} else {
|
|
ret->params = params;
|
|
}
|
|
if(set_dmq_node_params(ret, ret->params) < 0) {
|
|
LM_ERR("error setting parameters\n");
|
|
goto error;
|
|
}
|
|
} else {
|
|
LM_DBG("no dmqnode params found\n");
|
|
}
|
|
return ret;
|
|
|
|
error:
|
|
if(ret!=NULL) {
|
|
/* tbd: free uri and params */
|
|
if(shm) {
|
|
shm_free(ret);
|
|
} else {
|
|
pkg_free(ret);
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
/**
|
|
* @brief find dmq node by uri
|
|
*/
|
|
dmq_node_t* find_dmq_node_uri(dmq_node_list_t* list, str* uri)
|
|
{
|
|
dmq_node_t *ret, *find;
|
|
find = build_dmq_node(uri, 0);
|
|
if(find==NULL)
|
|
return NULL;
|
|
ret = find_dmq_node(list, find);
|
|
destroy_dmq_node(find, 0);
|
|
return ret;
|
|
}
|
|
|
|
/**
|
|
* @brief destroy dmq node
|
|
*/
|
|
void destroy_dmq_node(dmq_node_t* node, int shm)
|
|
{
|
|
/* tbd: check inner fields */
|
|
if(shm) {
|
|
shm_free_node(node);
|
|
} else {
|
|
pkg_free_node(node);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief find dmq node
|
|
*/
|
|
dmq_node_t* find_dmq_node(dmq_node_list_t* list, dmq_node_t* node)
|
|
{
|
|
dmq_node_t* cur = list->nodes;
|
|
while(cur) {
|
|
if(cmp_dmq_node(cur, node)) {
|
|
return cur;
|
|
}
|
|
cur = cur->next;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
/**
|
|
* @brief duplicate dmq node
|
|
*/
|
|
dmq_node_t* shm_dup_node(dmq_node_t* node)
|
|
{
|
|
dmq_node_t* newnode;
|
|
newnode = shm_malloc(sizeof(dmq_node_t));
|
|
if(newnode==NULL) {
|
|
LM_ERR("no more shm\n");
|
|
return NULL;
|
|
}
|
|
memcpy(newnode, node, sizeof(dmq_node_t));
|
|
newnode->orig_uri.s = NULL;
|
|
if(shm_str_dup(&newnode->orig_uri, &node->orig_uri)<0) {
|
|
goto error;
|
|
}
|
|
if(parse_uri(newnode->orig_uri.s, newnode->orig_uri.len,
|
|
&newnode->uri) < 0) {
|
|
LM_ERR("error in parsing node uri\n");
|
|
goto error;
|
|
}
|
|
return newnode;
|
|
error:
|
|
if(newnode->orig_uri.s!=NULL)
|
|
shm_free(newnode->orig_uri.s);
|
|
shm_free(newnode);
|
|
return NULL;
|
|
}
|
|
|
|
/**
|
|
* @brief free shm dmq node
|
|
*/
|
|
void shm_free_node(dmq_node_t* node)
|
|
{
|
|
shm_free(node->orig_uri.s);
|
|
shm_free(node);
|
|
}
|
|
|
|
/**
|
|
* @brief free pkg dmq node
|
|
*/
|
|
void pkg_free_node(dmq_node_t* node)
|
|
{
|
|
pkg_free(node->orig_uri.s);
|
|
pkg_free(node);
|
|
}
|
|
|
|
/**
|
|
* @brief delete dmq node
|
|
*/
|
|
int del_dmq_node(dmq_node_list_t* list, dmq_node_t* node)
|
|
{
|
|
dmq_node_t *cur, **prev;
|
|
lock_get(&list->lock);
|
|
cur = list->nodes;
|
|
prev = &list->nodes;
|
|
while(cur) {
|
|
if(cmp_dmq_node(cur, node)) {
|
|
*prev = cur->next;
|
|
shm_free_node(cur);
|
|
lock_release(&list->lock);
|
|
return 1;
|
|
}
|
|
prev = &cur->next;
|
|
cur = cur->next;
|
|
}
|
|
lock_release(&list->lock);
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* @brief add dmq node
|
|
*/
|
|
dmq_node_t* add_dmq_node(dmq_node_list_t* list, str* uri)
|
|
{
|
|
dmq_node_t* newnode;
|
|
|
|
newnode = build_dmq_node(uri, 1);
|
|
if(!newnode) {
|
|
LM_ERR("error creating node\n");
|
|
goto error;
|
|
}
|
|
LM_DBG("dmq node successfully created\n");
|
|
lock_get(&list->lock);
|
|
newnode->next = list->nodes;
|
|
list->nodes = newnode;
|
|
list->count++;
|
|
lock_release(&list->lock);
|
|
return newnode;
|
|
error:
|
|
return NULL;
|
|
}
|
|
|
|
/**
|
|
* @brief build dmq node string
|
|
*/
|
|
int build_node_str(dmq_node_t* node, char* buf, int buflen) {
|
|
/* sip:host:port;status=[status] */
|
|
int len = 0;
|
|
if(buflen < node->orig_uri.len + 32) {
|
|
LM_ERR("no more space left for node string\n");
|
|
return -1;
|
|
}
|
|
memcpy(buf + len, "sip:", 4);
|
|
len += 4;
|
|
memcpy(buf + len, node->uri.host.s, node->uri.host.len);
|
|
len += node->uri.host.len;
|
|
memcpy(buf + len, ":", 1);
|
|
len += 1;
|
|
memcpy(buf + len, node->uri.port.s, node->uri.port.len);
|
|
len += node->uri.port.len;
|
|
memcpy(buf + len, ";", 1);
|
|
len += 1;
|
|
memcpy(buf + len, "status=", 7);
|
|
len += 7;
|
|
memcpy(buf + len, get_status_str(node->status)->s,
|
|
get_status_str(node->status)->len);
|
|
len += get_status_str(node->status)->len;
|
|
return len;
|
|
}
|
|
|