initial support for better call/server monitoring

git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@1259 8eb893ce-cfd4-0310-b710-fb5ebe64c474
sayer/1.4-spce2.6
Stefan Sayer 17 years ago
parent f366f0f922
commit 43a387c392

@ -9,7 +9,7 @@ SVN_REV?=r$(shell svnversion -n .)
VERSION = 1
PATCHLEVEL = 1
SUBLEVEL = 0
EXTRAVERSION ?= -pre-$(SVN_REV)
EXTRAVERSION ?= -dev-$(SVN_REV)
REL_VERSION=$(VERSION).$(PATCHLEVEL).$(SUBLEVEL)
RELEASE=$(REL_VERSION)$(EXTRAVERSION)
@ -49,6 +49,11 @@ CPPFLAGS += -D_DEBUG \
# e.g. python modules:
#exclude_modules ?= py_sems ivr mailbox pin_collect conf_auth mp3 examples
# build in support for monitoring?
#
#
#USE_MONITORING=yes
LDFLAGS += -lm
OS = $(shell uname -s | sed -e s/SunOS/solaris/ | tr "[A-Z]" "[a-z]")
@ -69,6 +74,10 @@ CPPFLAGS += -DUSE_LIBSAMPLERATE
endif
endif
ifdef USE_MONITORING
CPPFLAGS += -DUSE_MONITORING
endif
# Additions for Solaris support.
ifeq ($(OS),solaris)
GETARCH=uname -p

@ -0,0 +1,7 @@
plug_in_name = monitoring
module_ldflags =
module_cflags = -DMOD_NAME=\"$(plug_in_name)\"
COREPATH ?=../../core
include $(COREPATH)/plug-in/Makefile.app_module

@ -0,0 +1,244 @@
/*
* $Id$
*
* Copyright (C) 2009 IPTEGO GmbH
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "Monitoring.h"
#include "log.h"
//EXPORT_PLUGIN_CLASS_FACTORY(Monitor, MOD_NAME);
extern "C" void* plugin_class_create()
{
Monitor* m_inst = Monitor::instance();
assert(dynamic_cast<AmDynInvokeFactory*>(m_inst));
return m_inst;
}
Monitor* Monitor::_instance=0;
Monitor* Monitor::instance()
{
if(_instance == NULL)
_instance = new Monitor(MOD_NAME);
return _instance;
}
Monitor::Monitor(const string& name)
: AmDynInvokeFactory(MOD_NAME) {
}
Monitor::~Monitor() {
}
int Monitor::onLoad() {
// todo: if GC configured, start thread
return 0;
}
void Monitor::invoke(const string& method,
const AmArg& args, AmArg& ret) {
if(method == "log"){
log(args,ret);
} else if(method == "logAdd"){
logAdd(args,ret);
} else if(method == "markFinished"){
markFinished(args,ret);
} else if(method == "clear"){
clear(args,ret);
} else if(method == "clearFinished"){
clearFinished(args,ret);
} else if(method == "erase"){
clear(args,ret);
} else if(method == "get"){
get(args,ret);
} else if(method == "list"){
list(args,ret);
} else if(method == "listFinished"){
listFinished(args,ret);
} else if(method == "listUnfinished"){
listUnfinished(args,ret);
} else if(method == "_list"){
ret.push(AmArg("log"));
ret.push(AmArg("logAdd"));
ret.push(AmArg("markFinished"));
ret.push(AmArg("erase"));
ret.push(AmArg("clear"));
ret.push(AmArg("clearFinished"));
ret.push(AmArg("get"));
ret.push(AmArg("list"));
ret.push(AmArg("listFinished"));
ret.push(AmArg("listUnfinished"));
} else
throw AmDynInvoke::NotImplemented(method);
}
void Monitor::log(const AmArg& args, AmArg& ret) {
assertArgCStr(args[0]);
assertArgCStr(args[1]);
LogBucket& bucket = getLogBucket(args[0].asCStr());
bucket.log_lock.lock();
try {
for (size_t i=1;i<args.size();i+=2)
bucket.log[args[0].asCStr()].info[args[i].asCStr()]=AmArg(args[i+1]);
} catch (...) {
bucket.log_lock.unlock();
ret.push(-1);
ret.push("ERROR while converting value");
throw;
}
bucket.log_lock.unlock();
ret.push(0);
ret.push("OK");
}
void Monitor::logAdd(const AmArg& args, AmArg& ret) {
assertArgCStr(args[0]);
assertArgCStr(args[1]);
LogBucket& bucket = getLogBucket(args[0].asCStr());
bucket.log_lock.lock();
try {
bucket.log[args[0].asCStr()].info[args[1].asCStr()].push(AmArg(args[2]));
} catch (...) {
ret.push(-1);
ret.push("ERROR while converting value");
bucket.log_lock.unlock();
throw;
}
ret.push(0);
ret.push("OK");
bucket.log_lock.unlock();
}
void Monitor::markFinished(const AmArg& args, AmArg& ret) {
assertArgCStr(args[0]);
LogBucket& bucket = getLogBucket(args[0].asCStr());
bucket.log_lock.lock();
if (!bucket.log[args[0].asCStr()].finished)
bucket.log[args[0].asCStr()].finished = time(0);
bucket.log_lock.unlock();
ret.push(0);
ret.push("OK");
}
void Monitor::erase(const AmArg& args, AmArg& ret) {
assertArgCStr(args[0]);
LogBucket& bucket = getLogBucket(args[0].asCStr());
bucket.log_lock.lock();
bucket.log.erase(args[0].asCStr());
bucket.log_lock.unlock();
ret.push(0);
ret.push("OK");
}
void Monitor::clear(const AmArg& args, AmArg& ret) {
for (int i=0;i<NUM_LOG_BUCKETS;i++) {
logs[i].log_lock.lock();
logs[i].log.clear();
logs[i].log_lock.unlock();
}
ret.push(0);
ret.push("OK");
}
void Monitor::clearFinished(const AmArg& args, AmArg& ret) {
for (int i=0;i<NUM_LOG_BUCKETS;i++) {
logs[i].log_lock.lock();
std::map<string, LogInfo>::iterator it=
logs[i].log.begin();
while (it != logs[i].log.end()) {
if (it->second.finished > 0) {
std::map<string, LogInfo>::iterator d_it = it;
it++;
logs[i].log.erase(d_it);
} else {
it++;
}
}
logs[i].log_lock.unlock();
}
ret.push(0);
ret.push("OK");
}
void Monitor::get(const AmArg& args, AmArg& ret) {
assertArgCStr(args[0]);
LogBucket& bucket = getLogBucket(args[0].asCStr());
bucket.log_lock.lock();
std::map<string, LogInfo>::iterator it=bucket.log.find(args[0].asCStr());
if (it!=bucket.log.end())
ret.push(it->second.info);
bucket.log_lock.unlock();
}
void Monitor::list(const AmArg& args, AmArg& ret) {
for (int i=0;i<NUM_LOG_BUCKETS;i++) {
logs[i].log_lock.lock();
for (std::map<string, LogInfo>::iterator it=
logs[i].log.begin(); it != logs[i].log.end(); it++) {
ret.push(AmArg(it->first.c_str()));
}
logs[i].log_lock.unlock();
}
}
void Monitor::listFinished(const AmArg& args, AmArg& ret) {
for (int i=0;i<NUM_LOG_BUCKETS;i++) {
logs[i].log_lock.lock();
for (std::map<string, LogInfo>::iterator it=
logs[i].log.begin(); it != logs[i].log.end(); it++) {
if (it->second.finished > 0)
ret.push(AmArg(it->first.c_str()));
}
logs[i].log_lock.unlock();
}
}
void Monitor::listUnfinished(const AmArg& args, AmArg& ret) {
for (int i=0;i<NUM_LOG_BUCKETS;i++) {
logs[i].log_lock.lock();
for (std::map<string, LogInfo>::iterator it=
logs[i].log.begin(); it != logs[i].log.end(); it++) {
if (!it->second.finished)
ret.push(AmArg(it->first.c_str()));
}
logs[i].log_lock.unlock();
}
}
LogBucket& Monitor::getLogBucket(const string& call_id) {
if (call_id.empty())
return logs[0];
char c = '\0'; // some distribution...bad luck if all callid start with 00000...
for (size_t i=0;i<5 && i<call_id.length();i++)
c = c ^ call_id[i];
return logs[c % NUM_LOG_BUCKETS];
}

@ -0,0 +1,97 @@
/*
* $Id$
*
* Copyright (C) 2009 IPTEGO GmbH
*
* This file is part of SEMS, a free SIP media server.
*
* SEMS is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the SEMS software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* SEMS is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _MONITORING_H_
#define _MONITORING_H_
#include <map>
#include "AmThread.h"
#include "AmApi.h"
#include "AmArg.h"
#include <time.h>
#define NUM_LOG_BUCKETS 16
struct LogInfo {
time_t finished; // for garbage collection
LogInfo()
: finished(0) { }
AmArg info;
};
struct LogBucket {
AmMutex log_lock;
std::map<string, LogInfo> log;
};
class Monitor
: public AmDynInvokeFactory,
public AmDynInvoke
{
static Monitor* _instance;
LogBucket logs[NUM_LOG_BUCKETS];
LogBucket& getLogBucket(const string& call_id);
void log(const AmArg& args, AmArg& ret);
void logAdd(const AmArg& args, AmArg& ret);
void markFinished(const AmArg& args, AmArg& ret);
void clear(const AmArg& args, AmArg& ret);
void clearFinished(const AmArg& args, AmArg& ret);
void erase(const AmArg& args, AmArg& ret);
void get(const AmArg& args, AmArg& ret);
void list(const AmArg& args, AmArg& ret);
void listFinished(const AmArg& args, AmArg& ret);
void listUnfinished(const AmArg& args, AmArg& ret);
public:
Monitor(const string& name);
~Monitor();
// DI factory
AmDynInvoke* getInstance() { return instance(); }
// DI API
static Monitor* instance();
void invoke(const string& method,
const AmArg& args, AmArg& ret);
int onLoad();
};
/*
class MonitorGarbageCollector : public AmThread {
public:
void run();
void on_stop();
};
*/
#endif

@ -39,10 +39,10 @@
#include "sems.h"
// AmSessionContainer methods
AmSessionContainer* AmSessionContainer::_instance=NULL;
_MONITORING_DECLARE_INTERFACE(AmSessionContainer);
AmSessionContainer::AmSessionContainer()
: _run_cond(false), _container_closed(false)
@ -88,6 +88,8 @@ bool AmSessionContainer::clean_sessions() {
if(cur_session->is_stopped() && cur_session->detached.get()){
MONITORING_MARK_FINISHED(cur_session->getCallID().c_str());
DBG("session %p has been destroyed'\n",(void*)cur_session->_pid);
delete cur_session;
}
@ -115,6 +117,7 @@ bool AmSessionContainer::clean_sessions() {
void AmSessionContainer::run()
{
_MONITORING_INIT;
while(!_container_closed.get()){
@ -201,6 +204,13 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session
return NULL;
}
MONITORING_LOG5(session->getCallID().c_str(),
"app", req.cmd.c_str(),
"dir", "out",
"from", req.from.c_str(),
"to", req.to.c_str(),
"ruri", req.r_uri.c_str());
if (int err = session->sendInvite(req.hdrs)) {
ERROR("INVITE could not be sent: error code = %d.\n",
err);
@ -209,6 +219,7 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session
session->getCallID(),
session->getRemoteTag());
delete session;
MONITORING_MARK_FINISHED(session->getCallID().c_str());
return NULL;
}
@ -261,6 +272,13 @@ void AmSessionContainer::startSessionUAS(AmSipRequest& req)
throw string("internal server error");
}
MONITORING_LOG5(req.callid.c_str(),
"app", req.cmd.c_str(),
"dir", "in",
"from", req.from.c_str(),
"to", req.to.c_str(),
"ruri", req.r_uri.c_str());
session->start();
session->postEvent(new AmSipRequestEvent(req));

@ -31,6 +31,8 @@
#include "AmThread.h"
#include "AmSession.h"
#include "ampi/MonitoringAPI.h"
#include <string>
#include <queue>
#include <map>
@ -140,6 +142,9 @@ class AmSessionContainer : public AmThread
* @return false if session doesn't exist
*/
bool postEvent(const string& local_tag, AmEvent* event);
_MONITORING_DEFINE_INTERFACE;
};
#endif

@ -0,0 +1,61 @@
monitoring module
The 'monitoring' module gets information regarding calls from the core
and applications, and makes them available via DI methods, e.g. for
monitoring a SEMS server via XMLRPC (using xmlrpc2di), or any other
method that can acces DI.
monitoring information is explicitely pushed to monitoring module via
DI calls (See ampi/MonitoringAPI.h for useful macros). Info is always
accessed via primary key, usually call-id. Info for every call is organized
as attribute-value pairs (one or more values), value can be any type
representable by AmArg (SEMS' variable type).
A call can be marked as finished. If not done before, this is done by
the session container when deleting a session (i.e., as the session
garbage collector in session container only runs every few seconds,
this can lag some seconds). Finished sessions can be listed and erased
separately, to free used memory.
Internally, the monitoring module keeps info in locked buckets of calls;
thus lock contention can be minimized by adapting NUM_LOG_BUCKETS
(Monitoring.h), which defaults to 16 (should be ok for most cases).
monitoring must be compile time enabled in Makefile.defs by setting
USE_MONITORING = yes
and the monitoring module needs to be loaded.
DI API
------
functions to log, e.g. from inside SEMS:
log(ID, key, value [, key, value [, key, value [...]]]) - set one or multiple AVPs
logAdd(ID, key, value) - add a value to an AVPs
functions to get log, e.g. from the outside:
list - list IDs of calls
listFinished - list IDs of finished calls
listUnfinished - list IDs of unfinished (active) calls
get - get info for a specific call, parameter is the ID
erase - erase info of a specific call, parameter is the ID (+free used memory)
clear - erase info of all calls (+free used memory)
clearFinished - erase info of all finished calls (+free used memory)
(of course, log()/logAdd() functions can also be accessed via e.g. XMLRPC.)
Performance
-----------
monitoring is not very much optimized for speed. Thus, especially by
using DI/AmArg, a lot of string comparisions and copying is performed.
If you measure any performance figures in real life usage comparing use
of monitoring vs. monitoring not enabled, please contribute
(mailto:semsdev@iptel.org, http://tracker.iptel.org) to be included in
this documentation.
TODO
----
o internal garbage collector, e.g. x secs after call is finished
o codec info
o more app specific info
o b2bua specific info
Loading…
Cancel
Save