/** * $Id$ * * Copyright (C) 2010 Elena-Ramona Modroiu (asipto.com) * * This file is part of Kamailio, a free SIP server. * * This file is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version * * This file is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include #include #include #include "../../dprint.h" #include "../../mem/mem.h" #include "../../mem/shm_mem.h" #include "../../parser/parse_param.h" #include "../../ut.h" #include "../../shm_init.h" #include "mqueue_api.h" /** * */ typedef struct _mq_item { str key; str val; struct _mq_item *prev; struct _mq_item *next; } mq_item_t; /** * */ typedef struct _mq_head { str name; int msize; int csize; gen_lock_t lock; mq_item_t *ifirst; mq_item_t *ilast; struct _mq_head *next; } mq_head_t; /** * */ typedef struct _mq_pv { str *name; mq_item_t *item; struct _mq_pv *next; } mq_pv_t; /** * */ static mq_head_t *_mq_head_list = NULL; /** * */ static mq_pv_t *_mq_pv_list = NULL; /** * */ int mq_head_defined(void) { if(_mq_head_list!=NULL) return 1; return 0; } /** * */ void mq_destroy(void) { mq_head_t *mh = NULL; mq_pv_t *mp = NULL; mq_item_t *mi = NULL; mq_head_t *mh1 = NULL; mq_pv_t *mp1 = NULL; mq_item_t *mi1 = NULL; mh = _mq_head_list; while(mh!=NULL) { mi = mh->ifirst; while(mi!=NULL) { mi1 = mi; mi = mi->next; shm_free(mi1); } mh1 = mh; mh = mh->next; lock_destroy(&mh1->lock); shm_free(mh1); } _mq_head_list = 0; mp = _mq_pv_list; while(mp!=NULL) { mp1 = mp; mp = mp->next; pkg_free(mp1); } } /** * */ int mq_head_add(str *name, int msize) { mq_head_t *mh = NULL; mq_pv_t *mp = NULL; int len; if(!shm_initialized()) { LM_ERR("shm not intialized - cannot define mqueue now\n"); return 0; } mh = _mq_head_list; while(mh!=NULL) { if(name->len == mh->name.len && strncmp(mh->name.s, name->s, name->len)==0) { LM_ERR("mqueue redefined: %.*s\n", name->len, name->s); return -1; } mh = mh->next; } mp = (mq_pv_t*)pkg_malloc(sizeof(mq_pv_t)); if(mp==NULL) { LM_ERR("no more pkg for: %.*s\n", name->len, name->s); return -1; } memset(mp, 0, sizeof(mq_pv_t)); len = sizeof(mq_head_t) + name->len + 1; mh = (mq_head_t*)shm_malloc(len); if(mh==NULL) { LM_ERR("no more shm for: %.*s\n", name->len, name->s); pkg_free(mp); return -1; } memset(mh, 0, len); if (lock_init(&mh->lock)==0 ) { LM_CRIT("failed to init lock\n"); pkg_free(mp); shm_free(mh); return -1; } mh->name.s = (char*)mh + sizeof(mq_head_t); memcpy(mh->name.s, name->s, name->len); mh->name.len = name->len; mh->name.s[name->len] = '\0'; mh->msize = msize; mh->next = _mq_head_list; _mq_head_list = mh; mp->name = &mh->name; mp->next = _mq_pv_list; _mq_pv_list = mp; return 0; } /** * */ mq_head_t *mq_head_get(str *name) { mq_head_t *mh = NULL; mh = _mq_head_list; while(mh!=NULL) { if(name->len == mh->name.len && strncmp(mh->name.s, name->s, name->len)==0) { return mh; } mh = mh->next; } return NULL; } /** * */ mq_pv_t *mq_pv_get(str *name) { mq_pv_t *mp = NULL; mp = _mq_pv_list; while(mp!=NULL) { if(mp->name->len==name->len && strncmp(mp->name->s, name->s, name->len)==0) return mp; mp = mp->next; } return NULL; } /** * */ int mq_head_fetch(str *name) { mq_head_t *mh = NULL; mq_pv_t *mp = NULL; mp = mq_pv_get(name); if(mp==NULL) return -1; if(mp->item!=NULL) { shm_free(mp->item); mp->item = NULL; } mh = mq_head_get(name); if(mh==NULL) return -1; lock_get(&mh->lock); if(mh->ifirst==NULL) { /* empty queue */ lock_release(&mh->lock); return -2; } mp->item = mh->ifirst; mh->ifirst = mh->ifirst->next; if(mh->ifirst==NULL) { mh->ilast = NULL; } else { mh->ifirst->prev = NULL; } mh->csize--; lock_release(&mh->lock); return 0; } /** * */ void mq_pv_free(str *name) { mq_pv_t *mp = NULL; mp = mq_pv_get(name); if(mp==NULL) return; if(mp->item!=NULL) { shm_free(mp->item); mp->item = NULL; } } /** * */ int mq_item_add(str *qname, str *key, str *val) { mq_head_t *mh = NULL; mq_item_t *mi = NULL; int len; mh = mq_head_get(qname); if(mh==NULL) { LM_ERR("mqueue not found: %.*s\n", qname->len, qname->s); return -1; } len = sizeof(mq_item_t) + key->len + val->len + 2; mi = (mq_item_t*)shm_malloc(len); if(mi==NULL) { LM_ERR("no more shm to add to: %.*s\n", qname->len, qname->s); return -1; } memset(mi, 0, len); mi->key.s = (char*)mi + sizeof(mq_item_t); memcpy(mi->key.s, key->s, key->len); mi->key.len = key->len; mi->key.s[key->len] = '\0'; mi->val.s = mi->key.s + mi->key.len + 1; memcpy(mi->val.s, val->s, val->len); mi->val.len = val->len; mi->val.s[val->len] = '\0'; lock_get(&mh->lock); if(mh->ifirst==NULL) { mh->ifirst = mi; mh->ilast = mi; } else { mh->ilast->next = mi; mi->prev = mh->ilast; mh->ilast = mi; } mh->csize++; if(mh->msize>0 && mh->csize>mh->msize) { mi = mh->ifirst; mh->ifirst = mh->ifirst->next; if(mh->ifirst==NULL) mh->ilast = NULL; else mh->ifirst->prev = NULL; mh->csize--; shm_free(mi); } lock_release(&mh->lock); return 0; } /** * */ int pv_parse_mqk_name(pv_spec_t *sp, str *in) { mq_head_t *mh = NULL; mh = mq_head_get(in); if(mh==NULL) { LM_ERR("mqueue not found: %.*s\n", in->len, in->s); return -1; } sp->pvp.pvn.u.isname.name.s = *in; sp->pvp.pvn.type = PV_NAME_INTSTR; sp->pvp.pvn.u.isname.type = 1; return 0; } /** * */ int pv_get_mqk(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) { mq_pv_t *mp = NULL; mp = mq_pv_get(¶m->pvn.u.isname.name.s); if(mp==NULL || mp->item==NULL || mp->item->key.len<=0) return pv_get_null(msg, param, res); return pv_get_strval(msg, param, res, &mp->item->key); } /** * */ int pv_parse_mqv_name(pv_spec_t *sp, str *in) { mq_head_t *mh = NULL; mh = mq_head_get(in); if(mh==NULL) { LM_ERR("mqueue not found: %.*s\n", in->len, in->s); return -1; } sp->pvp.pvn.u.isname.name.s = *in; sp->pvp.pvn.type = PV_NAME_INTSTR; sp->pvp.pvn.u.isname.type = 1; return 0; } /** * */ int pv_get_mqv(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) { mq_pv_t *mp = NULL; mp = mq_pv_get(¶m->pvn.u.isname.name.s); if(mp==NULL || mp->item==NULL || mp->item->val.len<=0) return pv_get_null(msg, param, res); return pv_get_strval(msg, param, res, &mp->item->val); }