- moved the "dialog-lookup-container" away from the session container into AmEventDispatcher.

- removed AmSIPEventHandler (its functionalities are now in AmEventDispatcher).
- added possibility for each plug-in to receive out-of-dialog messages (any kind).
- added possibility to handle dialogs without creating a session (=AmSession).


git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@1006 8eb893ce-cfd4-0310-b710-fb5ebe64c474
sayer/1.4-spce2.6
Raphael Coeffic 18 years ago
parent 44cc8a0413
commit 7030673b90

@ -29,6 +29,7 @@
#include "AmUtils.h"
#include "AmPlugIn.h"
#include "AmSessionContainer.h"
#include "AmEventDispatcher.h"
#define MOD_NAME "registrar_client"
@ -36,8 +37,18 @@
#include <unistd.h>
EXPORT_SIP_EVENT_HANDLER_FACTORY(SIPRegistrarClient, MOD_NAME);
EXPORT_PLUGIN_CLASS_FACTORY(SIPRegistrarClient, MOD_NAME);
//EXPORT_SIP_EVENT_HANDLER_FACTORY(SIPRegistrarClient, MOD_NAME);
//EXPORT_PLUGIN_CLASS_FACTORY(SIPRegistrarClient, MOD_NAME);
extern "C" void* plugin_class_create()
{
SIPRegistrarClient* reg_c = SIPRegistrarClient::instance();
assert(dynamic_cast<AmDynInvokeFactory*>(reg_c));
DBG("Hallo, alles in ordnung!\n");
return (AmPluginFactory*)reg_c;
}
SIPRegistration::SIPRegistration(const string& handle,
@ -179,12 +190,12 @@ SIPRegistrarClient* SIPRegistrarClient::instance()
if(_instance == NULL){
_instance = new SIPRegistrarClient(MOD_NAME);
}
return _instance;
}
SIPRegistrarClient::SIPRegistrarClient(const string& name)
: AmSIPEventHandler(name),
AmEventQueue(this) ,
: AmEventQueue(this),
uac_auth_i(NULL),
AmDynInvokeFactory(MOD_NAME)
{
@ -515,6 +526,9 @@ remove_reg_unsafe(const string& reg_id) {
reg = it->second;
registrations.erase(it);
}
AmEventDispatcher::instance()->delEventQueue(reg_id);
return reg;
}
@ -531,6 +545,8 @@ add_reg(const string& reg_id, SIPRegistration* new_reg)
}
registrations[reg_id] = new_reg;
AmEventDispatcher::instance()->addEventQueue(reg_id,this);
reg_mut.unlock();
if (reg != NULL)

@ -142,8 +142,7 @@ class SIPRegistration : public AmSipDialogEventHandler,
class SIPNewRegistrationEvent;
class SIPRemoveRegistrationEvent;
class SIPRegistrarClient : public AmSIPEventHandler,
public AmThread,
class SIPRegistrarClient : public AmThread,
public AmEventQueue,
public AmEventHandler,
public AmDynInvoke,

@ -351,7 +351,7 @@ void WebConferenceFactory::invoke(const string& method,
string WebConferenceFactory::getServerInfoString() {
string res = "Server: "
DEFAULT_SIGNATURE " calls: " +
int2str(AmSessionContainer::instance()->getSize())+
int2str(AmSession::getSessionNum())+
" active";
if (stats != NULL) {

@ -74,11 +74,20 @@ void AmSessionFactory::configureSession(AmSession* sess) {
//SessionTimer::sess->configureSessionTimer(mod_conf);
}
void AmSessionFactory::postEvent(AmEvent* ev) {
ERROR("unhandled Event in %s module\n", getName().c_str());
delete ev;
void AmSessionFactory::onOoDRequest(const AmSipRequest& req)
{
ERROR("sorry, we don't support beginning a new session with "
"a '%s' message\n", req.method.c_str());
AmSipDialog::reply_error(req,501,"Not Implemented");
return;
}
// void AmSessionFactory::postEvent(AmEvent* ev) {
// ERROR("unhandled Event in %s module\n", getName().c_str());
// delete ev;
// }
AmSessionEventHandlerFactory::AmSessionEventHandlerFactory(const string& name)
: AmPluginFactory(name)
{
@ -90,10 +99,10 @@ bool AmSessionEventHandlerFactory::onInvite(const AmSipRequest& req,
return onInvite(req);
}
AmSIPEventHandler::AmSIPEventHandler(const string& name)
: AmPluginFactory(name)
{
}
// AmSIPEventHandler::AmSIPEventHandler(const string& name)
// : AmPluginFactory(name)
// {
// }
AmLoggingFacility::AmLoggingFacility(const string& name)
: AmPluginFactory(name)

@ -182,38 +182,37 @@ class AmSessionFactory: public AmPluginFactory
AmArg& session_params);
/**
* method to receive an Event that is posted
* to the factory
* Method to receive any out-of-dialog request
* other than INVITE, REFER and OPTIONS
*
* Warning:
* This method should not make any expensive
* processing as it would block the thread
* posting the event!
*/
virtual void postEvent(AmEvent* ev);
virtual void onOoDRequest(const AmSipRequest& req);
};
/** \brief Interface for plugins that implement session-less
* UA behaviour (e.g. registrar client, event notification
* client)
*/
class AmSIPEventHandler : public AmPluginFactory
{
public:
AmSIPEventHandler(const string& name);
virtual ~AmSIPEventHandler() { }
/** will be called on incoming replies which do
* not belong to a dialog of a session in the
* SessionContainer.
*
* @return true if reply was handled by plugin, false
* otherwise
*/
virtual bool onSipReply(const AmSipReply& rep) = 0;
};
// class AmSIPEventHandler : public AmPluginFactory
// {
// public:
// AmSIPEventHandler(const string& name);
// virtual ~AmSIPEventHandler() { }
// /** will be called on incoming replies which do
// * not belong to a dialog of a session in the
// * SessionContainer.
// *
// * @return true if reply was handled by plugin, false
// * otherwise
// */
// virtual bool onSipReply(const AmSipReply& rep) = 0;
// };
/** \brief Interface for plugins that implement a
* logging facility
@ -230,8 +229,6 @@ class AmLoggingFacility : public AmPluginFactory
virtual void log(int level, const char* msg) = 0;
};
class AmInterfaceHandler;
class AmCtrlInterface: public AmThread
{
public:
@ -260,10 +257,9 @@ class AmCtrlInterface: public AmThread
* For sending messages, appropriate methods are exposed (the send()s).
* The interface defines a thread that runs, polling on the two listening unix
* sockets (one for requests, one for replies). After receiving a message,
* a registered 'AmInterfaceHandler' is used to handle the incomming SIP
* events (that end up either opening/updating a UAC session or posting to a
* SIP event queue).
*
* AmSipDispatcher shall be used to dispatch the incomming SIP messages
* (that end up either opening/updating a UAC session or posting to some
* event queue).
*/
class AmCtrlInterfaceFactory : public AmPluginFactory
{

@ -0,0 +1,147 @@
/*
* $Id: $
*
* Copyright (C) 2007 Raphael Coeffic
*
* This file is part of sems, a free SIP media server.
*
* sems is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the ser software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* sems is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "AmEventDispatcher.h"
#include "AmSipEvent.h"
AmEventDispatcher* AmEventDispatcher::_instance=NULL;
AmEventDispatcher* AmEventDispatcher::instance()
{
return _instance ? _instance : ((_instance = new AmEventDispatcher()));
}
bool AmEventDispatcher::addEventQueue(const string& local_tag,
AmEventQueue* q,
const string& callid,
const string& remote_tag)
{
bool exists = false;
m_queues.lock();
exists = queues.find(local_tag) != queues.end();
if(!callid.empty() && !remote_tag.empty()) {
exists = exists ||
(id_lookup.find(callid+remote_tag) != id_lookup.end());
}
if(!exists){
queues[local_tag] = q;
if(!callid.empty() && !remote_tag.empty())
id_lookup[callid+remote_tag] = local_tag;
}
m_queues.unlock();
return exists;
}
AmEventQueue* AmEventDispatcher::delEventQueue(const string& local_tag,
const string& callid,
const string& remote_tag)
{
AmEventQueue* q = NULL;
m_queues.lock();
EvQueueMapIter qi = queues.find(local_tag);
if(qi != queues.end()) {
q = qi->second;
queues.erase(qi);
if(!callid.empty() && !remote_tag.empty()) {
DictIter di = id_lookup.find(callid+remote_tag);
if(di != id_lookup.end()) {
id_lookup.erase(di);
}
}
}
m_queues.unlock();
return q;
}
bool AmEventDispatcher::post(const string& local_tag, AmEvent* ev)
{
bool posted = false;
m_queues.lock();
EvQueueMapIter it = queues.find(local_tag);
if(it != queues.end()){
it->second->postEvent(ev);
posted = true;
}
m_queues.unlock();
return posted;
}
bool AmEventDispatcher::post(const string& callid, const string& remote_tag, AmEvent* ev)
{
bool posted = false;
m_queues.lock();
DictIter di = id_lookup.find(callid+remote_tag);
if(di != id_lookup.end()) {
EvQueueMapIter it = queues.find(di->second);
if(it != queues.end()){
it->second->postEvent(ev);
posted = true;
}
}
m_queues.unlock();
return posted;
}
bool AmEventDispatcher::postSipRequest(const string& callid, const string& remote_tag,
const AmSipRequest& req)
{
bool posted = false;
m_queues.lock();
DictIter di = id_lookup.find(callid+remote_tag);
if(di != id_lookup.end()) {
EvQueueMapIter it = queues.find(di->second);
if(it != queues.end()){
it->second->postEvent(new AmSipRequestEvent(req));
posted = true;
}
}
m_queues.unlock();
return posted;
}

@ -0,0 +1,85 @@
/*
* $Id: $
*
* Copyright (C) 2008 Raphael Coeffic
*
* This file is part of sems, a free SIP media server.
*
* sems is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the ser software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* sems is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _AMEVENTDISPATCHER_h_
#define _AMEVENTDISPATCHER_h_
#include "AmEventQueue.h"
#include "AmSipMsg.h"
#include <map>
class AmEventDispatcher
{
public:
typedef std::map<string, AmEventQueue*> EvQueueMap;
typedef EvQueueMap::iterator EvQueueMapIter;
typedef std::map<string,string> Dictionnary;
typedef Dictionnary::iterator DictIter;
private:
static AmEventDispatcher *_instance;
/**
* Container for active sessions
* local tag -> event queue
*/
EvQueueMap queues;
/**
* Call ID + remote tag -> local tag
* (needed for CANCELs and some provisionnal answers)
* (UAS sessions only)
*/
Dictionnary id_lookup;
// mutex for "queues" and "id_lookup"
AmMutex m_queues;
public:
static AmEventDispatcher* instance();
bool postSipRequest(const string& callid, const string& remote_tag,
const AmSipRequest& req);
bool post(const string& local_tag, AmEvent* ev);
bool post(const string& callid, const string& remote_tag, AmEvent* ev);
bool addEventQueue(const string& local_tag,
AmEventQueue* q,
const string& callid="",
const string& remote_tag="");
AmEventQueue* delEventQueue(const string& local_tag,
const string& callid="",
const string& remote_tag="");
};
#endif

@ -51,7 +51,7 @@ protected:
public:
AmEventQueue(AmEventHandler*);
~AmEventQueue();
virtual ~AmEventQueue();
void postEvent(AmEvent*);
void processEvents();

@ -227,15 +227,15 @@ int AmPlugIn::load(const string& directory, const string& plugins)
}
// load SIPEventHandlers
for(std::map<std::string,AmSIPEventHandler*>::iterator it = name2sipeh.begin();
it != name2sipeh.end(); it++){
err = it->second->onLoad();
if(err)
return err;
// register for receiving replys
//AmReplyHandler::get()->registerReplyHandler(it->second);
AmSipDispatcher::instance()->registerReplyHandler(it->second);
}
// for(std::map<std::string,AmSIPEventHandler*>::iterator it = name2sipeh.begin();
// it != name2sipeh.end(); it++){
// err = it->second->onLoad();
// if(err)
// return err;
// // register for receiving replys
// //AmReplyHandler::get()->registerReplyHandler(it->second);
// AmSipDispatcher::instance()->registerReplyHandler(it->second);
// }
// init logging facilities
for(std::map<std::string,AmLoggingFacility*>::iterator it = name2logfac.begin();
@ -311,11 +311,11 @@ int AmPlugIn::loadPlugIn(const string& file)
has_sym=true;
}
if((fc = (FactoryCreate)dlsym(h_dl,FACTORY_SIP_EVENT_HANDLER_EXPORT_STR)) != NULL){
if(loadSIPehPlugIn((AmPluginFactory*)fc()))
goto error;
has_sym=true;
}
// if((fc = (FactoryCreate)dlsym(h_dl,FACTORY_SIP_EVENT_HANDLER_EXPORT_STR)) != NULL){
// if(loadSIPehPlugIn((AmPluginFactory*)fc()))
// goto error;
// has_sym=true;
// }
if((fc = (FactoryCreate)dlsym(h_dl,FACTORY_LOG_FACILITY_EXPORT_STR)) != NULL){
if(loadLogFacPlugIn((AmPluginFactory*)fc()))
@ -444,13 +444,13 @@ AmDynInvokeFactory* AmPlugIn::getFactory4Di(const string& name)
return 0;
}
AmSIPEventHandler* AmPlugIn::getFactory4SIPeh(const string& name)
{
std::map<std::string,AmSIPEventHandler*>::iterator it = name2sipeh.find(name);
if(it != name2sipeh.end())
return it->second;
return 0;
}
// AmSIPEventHandler* AmPlugIn::getFactory4SIPeh(const string& name)
// {
// std::map<std::string,AmSIPEventHandler*>::iterator it = name2sipeh.find(name);
// if(it != name2sipeh.end())
// return it->second;
// return 0;
// }
AmLoggingFacility* AmPlugIn::getFactory4LogFaclty(const string& name)
{
@ -574,28 +574,28 @@ int AmPlugIn::loadDiPlugIn(AmPluginFactory* f)
return -1;
}
int AmPlugIn::loadSIPehPlugIn(AmPluginFactory* f)
{
AmSIPEventHandler* sf = dynamic_cast<AmSIPEventHandler*>(f);
if(!sf){
ERROR("invalid SIP event handler plug-in!\n");
goto error;
}
// int AmPlugIn::loadSIPehPlugIn(AmPluginFactory* f)
// {
// AmSIPEventHandler* sf = dynamic_cast<AmSIPEventHandler*>(f);
// if(!sf){
// ERROR("invalid SIP event handler plug-in!\n");
// goto error;
// }
if(name2sipeh.find(sf->getName()) != name2sipeh.end()){
ERROR("sip event handler component '%s' already loaded !\n",
sf->getName().c_str());
goto error;
}
// if(name2sipeh.find(sf->getName()) != name2sipeh.end()){
// ERROR("sip event handler component '%s' already loaded !\n",
// sf->getName().c_str());
// goto error;
// }
name2sipeh.insert(std::make_pair(sf->getName(),sf));
DBG("sip event handler component '%s' loaded.\n",sf->getName().c_str());
// name2sipeh.insert(std::make_pair(sf->getName(),sf));
// DBG("sip event handler component '%s' loaded.\n",sf->getName().c_str());
return 0;
// return 0;
error:
return -1;
}
// error:
// return -1;
// }
int AmPlugIn::loadLogFacPlugIn(AmPluginFactory* f)
{
@ -745,3 +745,55 @@ bool AmPlugIn::registerFactory4App(const string& app_name, AmSessionFactory* f)
name2app.insert(make_pair(app_name,f));
return true;
}
AmSessionFactory* AmPlugIn::findSessionFactory(AmSipRequest& req)
{
if(req.cmd.empty()){
ERROR("AmPlugIn::findSessionFactory: req.cmd is empty!\n");
return NULL;
}
else if(req.cmd == "sems"){
switch (AmConfig::AppSelect) {
case AmConfig::App_RURIUSER:
req.cmd = req.user;
break;
case AmConfig::App_APPHDR:
req.cmd = getHeader(req.hdrs, APPNAME_HDR);
break;
case AmConfig::App_RURIPARAM:
req.cmd = get_header_param(req.r_uri, "app");
break;
case AmConfig::App_MAPPING:
{
req.cmd = "";
for (AmConfig::AppMappingVector::iterator it =
AmConfig::AppMapping.begin();
it != AmConfig::AppMapping.end(); it++){
if (!regexec(&it->first, req.r_uri.c_str(), 0, NULL, 0)) {
DBG("match of r_uri '%s' to application %s\n",
req.r_uri.c_str(), it->second.c_str());
req.cmd = it->second;
break;
}
}
} break;
case AmConfig::App_SPECIFIED:
req.cmd = AmConfig::Application;
break;
}
if (req.cmd.empty()) {
ERROR("could not find any application matching configured criteria\n");
return NULL;
}
}
AmSessionFactory* session_factory = getFactory4App(req.cmd);
if(!session_factory) {
ERROR("AmPlugIn::findSessionFactory: application '%s' not found !\n", req.cmd.c_str());
}
return session_factory;
}

@ -40,9 +40,10 @@ class AmPluginFactory;
class AmSessionFactory;
class AmSessionEventHandlerFactory;
class AmDynInvokeFactory;
class AmSIPEventHandler;
//class AmSIPEventHandler;
class AmLoggingFacility;
class AmCtrlInterfaceFactory;
class AmSipRequest;
struct amci_exports_t;
struct amci_codec_t;
@ -91,13 +92,13 @@ class AmPlugIn : public AmPayloadProviderInterface
std::map<int,amci_payload_t*> payloads;
std::map<int,int> payload_order;
std::map<string,amci_inoutfmt_t*> file_formats;
std::map<string,AmSessionFactory*> name2app;
std::map<string,AmSessionFactory*> name2app;
std::map<string,AmSessionEventHandlerFactory*> name2seh;
std::map<string,AmPluginFactory*> name2base;
std::map<string,AmDynInvokeFactory*> name2di;
std::map<string,AmSIPEventHandler*> name2sipeh;
std::map<string,AmLoggingFacility*> name2logfac;
std::map<string,AmPluginFactory*> name2base;
std::map<string,AmDynInvokeFactory*> name2di;
std::map<string,AmLoggingFacility*> name2logfac;
AmCtrlInterfaceFactory *ctrlIface;
int dynamic_pl; // range: 96->127, see RFC 1890
@ -114,7 +115,6 @@ class AmPlugIn : public AmPayloadProviderInterface
int loadSehPlugIn(AmPluginFactory* cb);
int loadBasePlugIn(AmPluginFactory* cb);
int loadDiPlugIn(AmPluginFactory* cb);
int loadSIPehPlugIn(AmPluginFactory* f);
int loadLogFacPlugIn(AmPluginFactory* f);
int loadCtrlFacPlugIn(AmPluginFactory* f);
@ -150,8 +150,10 @@ class AmPlugIn : public AmPayloadProviderInterface
/** @return the suported payloads. */
const std::map<int,amci_payload_t*>& getPayloads() { return payloads; }
/** @return the order of payloads. */
const std::map<int,int>& getPayloadOrder() { return payload_order; }
/**
* File format lookup according to the
* format name and/or file extension.
@ -160,6 +162,7 @@ class AmPlugIn : public AmPayloadProviderInterface
* @return NULL if failed.
*/
amci_inoutfmt_t* fileFormat(const string& fmt_name, const string& ext = "");
/**
* File format's subtype lookup function.
* @param iofmt The file format.
@ -167,12 +170,14 @@ class AmPlugIn : public AmPayloadProviderInterface
* @return NULL if failed.
*/
amci_subtype_t* subtype(amci_inoutfmt_t* iofmt, int subtype);
/**
* Codec lookup function.
* @param id Codec ID (see amci/codecs.h).
* @return NULL if failed.
*/
amci_codec_t* codec(int id);
/**
* Application lookup function
* @param app_name application name
@ -180,6 +185,15 @@ class AmPlugIn : public AmPayloadProviderInterface
*/
AmSessionFactory* getFactory4App(const string& app_name);
/** @return true if this record has been inserted. */
bool registerFactory4App(const string& app_name, AmSessionFactory* f);
/**
* Find the proper SessionFactory
* for the given request.
*/
AmSessionFactory* findSessionFactory(AmSipRequest& req);
/**
* Session event handler lookup function
* @param name application name
@ -192,13 +206,6 @@ class AmPlugIn : public AmPayloadProviderInterface
*/
AmDynInvokeFactory* getFactory4Di(const string& name);
/**
* SIP event handler lookup function
* @param name application name
* @return NULL if failed (-> handler not found).
*/
AmSIPEventHandler* getFactory4SIPeh(const string& name);
/**
* logging facility lookup function
* @param name application name
@ -206,8 +213,6 @@ class AmPlugIn : public AmPayloadProviderInterface
*/
AmLoggingFacility* getFactory4LogFaclty(const string& name);
/** @return true if this record has been inserted. */
bool registerFactory4App(const string& app_name, AmSessionFactory* f);
};
#endif

@ -49,6 +49,9 @@
#include <assert.h>
#include <sys/time.h>
volatile unsigned int AmSession::session_num = 0;
// AmSessionEventHandler methods
bool AmSessionEventHandler::process(AmEvent*)
{
@ -387,6 +390,8 @@ void AmSession::run()
}
#endif
session_num++;
try {
try {
@ -430,8 +435,10 @@ void AmSession::run()
catch(const AmSession::Exception& e){
ERROR("%i %s\n",e.code,e.reason.c_str());
}
destroy();
session_num--;
// wait at least until session is out of RtpScheduler
DBG("session is stopped.\n");
@ -469,6 +476,12 @@ string AmSession::getNewId()
return id;
}
unsigned int AmSession::getSessionNum()
{
return AmSession::session_num;
}
void AmSession::setInbandDetector(Dtmf::InbandDetectorType t)
{
m_dtmfDetector.setInbandDetector(t);

@ -58,7 +58,8 @@ class AmDtmfEvent;
#define FL_FORCE_ACTIVE 2
/**
* \brief Interface for SIP events signaling plugins implement
* \brief Interface for SIP signaling plugins that
* must change requests or replies (ex: session timer).
*
* Signaling plugins must inherite from this class.
*/
@ -73,8 +74,8 @@ public:
virtual ~AmSessionEventHandler() {}
/*
* All the methods return true if event processing
* should stopped after calling them.
* All the methods return true if the event processing
* shall be stopped after them.
*/
virtual bool process(AmEvent*);
virtual bool onSipEvent(AmSipEvent*);
@ -142,6 +143,8 @@ private:
AmCondition<bool> sess_stopped;
AmCondition<bool> detached;
static volatile unsigned int session_num;
friend class AmMediaProcessor;
friend class AmMediaProcessorThread;
friend class AmSessionContainer;
@ -364,6 +367,11 @@ public:
*/
static string getNewId();
/**
* Gets the number of running sessions
*/
static unsigned int getSessionNum();
/**
* Entry point for DTMF events
*/

@ -31,9 +31,12 @@
#include "AmApi.h"
#include "AmConfig.h"
#include "AmUtils.h"
#include "AmEventDispatcher.h"
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>
#include "sems.h"
// AmSessionContainer methods
@ -41,7 +44,8 @@
AmSessionContainer* AmSessionContainer::_SessionContainer=0;
AmSessionContainer::AmSessionContainer()
: _run_cond(false)
: _run_cond(false)
{
}
@ -136,52 +140,22 @@ void AmSessionContainer::destroySession(AmSession* s)
void AmSessionContainer::destroySession(const string& local_tag)
{
as_mut.lock();
SessionMapIter sess_it = a_sessions.find(local_tag);
if(sess_it != a_sessions.end()){
AmSession* sess = sess_it->second;
as_id_lookup.erase(sess->getCallID() + sess->getRemoteTag());
a_sessions.erase(sess_it);
AmSession* s = NULL;
AmEventQueue* q = AmEventDispatcher::instance()->
delEventQueue(local_tag);
if(q &&
(s = dynamic_cast<AmSession*>(q))) {
stopAndQueue(sess);
DBG("session stopped and queued for deletion (#sessions=%u)\n",
(unsigned int)a_sessions.size());
}
else {
DBG("could not remove session: id not found\n");
}
as_mut.unlock();
}
AmSession* AmSessionContainer::getSession(const string& callid, const string& remote_tag)
{
DictIter it = as_id_lookup.find(callid+remote_tag);
if(it == as_id_lookup.end()){
//ERROR("could not find session (callid='%s'; remote_tag='%s')\n",
// callid.c_str(),remote_tag.c_str());
return NULL;
}
return getSession(it->second);
}
AmSession* AmSessionContainer::getSession(const string& local_tag)
{
SessionMapIter it = a_sessions.find(local_tag);
if(it == a_sessions.end()){
//ERROR("could not find session (local_tag='%s')\n",local_tag.c_str());
return NULL;
}
return it->second;
stopAndQueue(s);
}
else {
DBG("could not remove session: id not found or wrong type\n");
}
}
AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session_params) {
AmSession* session = NULL;
as_mut.lock();
try {
if((session = createSession(req, session_params)) != 0){
session->dlg.updateStatusFromLocalRequest(req); // sets local tag as well
@ -192,7 +166,6 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session
ERROR("INVITE could not be sent: error code = %d.\n",
err);
delete session;
as_mut.unlock();
return NULL;
}
@ -203,9 +176,7 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session
session->start();
addSession_unsafe(req.callid,"",req.from_tag,session);
// session does not get its own INVITE
// session->postEvent(new AmSipRequestEvent(req));
addSession("","",req.from_tag,session);
}
}
catch(const AmSession::Exception& e){
@ -220,23 +191,13 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session
ERROR("unexpected exception\n");
AmSipDialog::reply_error(req,500,"unexpected exception");
}
as_mut.unlock();
return session;
}
void AmSessionContainer::startSessionUAS(AmSipRequest& req)
{
as_mut.lock();
try {
AmSession* session = getSession(req.callid,req.from_tag);
if( session ){
// it's a forked-and-merged INVITE
// reply 482 Loop detected
throw AmSession::Exception(482, "Loop detected");
}
else {
// Call-ID and From-Tag are unknown: it's a new session
AmSession* session;
if((session = createSession(req)) != 0){
@ -254,10 +215,9 @@ void AmSessionContainer::startSessionUAS(AmSipRequest& req)
session->start();
addSession_unsafe(req.callid,req.from_tag,local_tag,session);
addSession(req.callid,req.from_tag,local_tag,session);
session->postEvent(new AmSipRequestEvent(req));
}
}
}
catch(const AmSession::Exception& e){
ERROR("%i %s\n",e.code,e.reason.c_str());
@ -271,7 +231,6 @@ void AmSessionContainer::startSessionUAS(AmSipRequest& req)
ERROR("unexpected exception\n");
AmSipDialog::reply_error(req,500,"unexpected exception");
}
as_mut.unlock();
}
@ -279,186 +238,95 @@ bool AmSessionContainer::postEvent(const string& callid,
const string& remote_tag,
AmEvent* event)
{
// DBG("postEvent: callid = %s; remote_tag = %s\n",
// callid.c_str(),remote_tag.c_str());
bool posted =
AmEventDispatcher::instance()->
post(callid,remote_tag,event);
as_mut.lock();
AmSession* s = getSession(callid,remote_tag);
if(!s){
as_mut.unlock();
delete event;
return false;
}
s->postEvent(event);
as_mut.unlock();
if(!posted)
delete event;
return true;
return posted;
}
bool AmSessionContainer::postEvent(const string& local_tag,
AmEvent* event)
{
// DBG("postEvent: local_tag = %s\n",local_tag.c_str());
as_mut.lock();
AmSession* s = getSession(local_tag);
if (s != NULL) {
s->postEvent(event);
as_mut.unlock();
return true;
}
as_mut.unlock();
// try session factories
AmSessionFactory* sf = AmPlugIn::instance()->getFactory4App(local_tag);
if (sf != NULL) {
sf->postEvent(event);
return true;
}
bool posted =
AmEventDispatcher::instance()->
post(local_tag,event);
if(!posted)
delete event;
return posted;
delete event;
return false;
}
AmSession* AmSessionContainer::createSession(AmSipRequest& req,
AmArg* session_params)
{
if (AmConfig::SessionLimit &&
AmConfig::SessionLimit <= a_sessions.size()) {
DBG("session_limit %d reached. Not creating session.\n",
AmConfig::SessionLimit);
throw AmSession::Exception(AmConfig::SessionLimitErrCode, AmConfig::SessionLimitErrReason);
if (AmConfig::SessionLimit &&
AmConfig::SessionLimit <= AmSession::session_num) {
DBG("session_limit %d reached. Not creating session.\n",
AmConfig::SessionLimit);
AmSipDialog::reply_error(req,AmConfig::SessionLimitErrCode,
AmConfig::SessionLimitErrReason);
return NULL;
}
if(req.cmd.empty()){
throw string("AmSessionContainer::createSession: req.cmd is empty!\n");
}
else if(req.cmd == "sems"){
switch (AmConfig::AppSelect) {
case AmConfig::App_RURIUSER:
req.cmd = req.user;
break;
case AmConfig::App_APPHDR:
req.cmd = getHeader(req.hdrs, APPNAME_HDR);
break;
case AmConfig::App_RURIPARAM:
req.cmd = get_header_param(req.r_uri, "app");
break;
case AmConfig::App_MAPPING:
{
req.cmd = "";
for (AmConfig::AppMappingVector::iterator it =
AmConfig::AppMapping.begin();
it != AmConfig::AppMapping.end(); it++){
if (!regexec(&it->first, req.r_uri.c_str(), 0, NULL, 0)) {
DBG("match of r_uri '%s' to application %s\n",
req.r_uri.c_str(), it->second.c_str());
req.cmd = it->second;
break;
}
}
} break;
case AmConfig::App_SPECIFIED:
req.cmd = AmConfig::Application;
break;
}
if (req.cmd.empty()) {
string ex = "Unknown Application";
throw ex;
}
}
AmSessionFactory* session_factory =
AmPlugIn::instance()->findSessionFactory(req);
AmSessionFactory* state_factory = AmPlugIn::instance()->getFactory4App(req.cmd);
if(!state_factory) {
ERROR("application '%s' not found !\n", req.cmd.c_str());
throw string("application '" + req.cmd + "' not found !");
if(!session_factory) {
ERROR("No session factory");
AmSipDialog::reply_error(req,500,"No session factory");
return NULL;
}
AmSession* session = NULL;
if (req.method == "INVITE") {
if (NULL != session_params)
session = state_factory->onInvite(req, *session_params);
session = session_factory->onInvite(req, *session_params);
else
session = state_factory->onInvite(req);
session = session_factory->onInvite(req);
} else if (req.method == "REFER") {
if (NULL != session_params)
session = state_factory->onRefer(req, *session_params);
session = session_factory->onRefer(req, *session_params);
else
session = state_factory->onRefer(req);
session = session_factory->onRefer(req);
}
if(!session) {
// State creation failed:
// Session creation failed:
// application denied session creation
// or there was an error.
//
// let's hope the createState function has replied...
// ... and do nothing !
DBG("onInvite returned NULL\n");
return 0;
DBG("onInvite/onRefer returned NULL\n");
}
//state_factory->configureSession(session);
//session->checkSessionExpires(req);
return session;
}
bool AmSessionContainer::addSession_unsafe(const string& callid,
const string& remote_tag,
const string& local_tag,
AmSession* session)
{
if(getSession(callid,remote_tag))
return false;
if (!remote_tag.empty())
as_id_lookup[callid+remote_tag] = local_tag;
return addSession_unsafe(local_tag,session);
}
bool AmSessionContainer::addSession_unsafe(const string& local_tag,
AmSession* session)
{
if(getSession(local_tag))
return false;
//DBG("a_sessions['%s'] = session\n",local_tag.c_str());
a_sessions[local_tag] = session;
return true;
}
bool AmSessionContainer::addSession(const string& callid,
const string& remote_tag,
const string& local_tag,
AmSession* session)
{
as_mut.lock();
bool ret = addSession_unsafe(callid,remote_tag,local_tag,session);
as_mut.unlock();
return ret;
return AmEventDispatcher::instance()->
addEventQueue(local_tag,(AmEventQueue*)session,
callid,remote_tag);
}
bool AmSessionContainer::addSession(const string& local_tag,
AmSession* session)
{
as_mut.lock();
bool ret = addSession_unsafe(local_tag,session);
as_mut.unlock();
return ret;
}
int AmSessionContainer::getSize()
{
int res=0;
as_mut.lock();
res = a_sessions.size();
as_mut.unlock();
return res;
return AmEventDispatcher::instance()->
addEventQueue(local_tag,(AmEventQueue*)session);
}

@ -49,32 +49,8 @@ class AmSessionContainer : public AmThread
{
static AmSessionContainer* _SessionContainer;
// some typedefs ...
typedef std::map<string,AmSession*> SessionMap;
typedef SessionMap::iterator SessionMapIter;
typedef std::map<string,string> Dictionnary;
typedef Dictionnary::iterator DictIter;
typedef std::queue<AmSession*> SessionQueue;
/**
* Container for active sessions
* local tag -> session
*/
SessionMap a_sessions;
/**
* Call ID + remote tag -> local tag
* (needed for CANCELs and some provisionnal answers)
* (UAS sessions only)
*/
Dictionnary as_id_lookup;
/** Mutex to protect the active session container */
AmMutex as_mut;
/** Container for dead sessions */
SessionQueue d_sessions;
/** Mutex to protect the dead session container */
@ -86,39 +62,6 @@ class AmSessionContainer : public AmThread
/** We are a Singleton ! Avoid people to have their own instance. */
AmSessionContainer();
/**
* Search the container for a session coresponding
* to callid and remote_tag. (UAS only).
*
* @return the session related to callid & remote_tag
* or NULL if none has been found.
*/
AmSession* getSession(const string& callid, const string& remote_tag);
/**
* Search the container for a session coresponding to local_tag.
*
* @return the session related to local_tag
* or NULL if none has been found.
*/
AmSession* getSession(const string& local_tag);
/**
* Adds a session to the container. (UAS only)
* @return true if the session is new within the container.
*/
bool addSession_unsafe(const string& callid,
const string& remote_tag,
const string& local_tag,
AmSession* session);
/**
* Adds a session to the container.
* @return true if the session is new within the container.
*/
bool addSession_unsafe(const string& local_tag,
AmSession* session);
/**
* Tries to stop the session and queue it destruction.
*/
@ -154,7 +97,7 @@ class AmSessionContainer : public AmThread
* @return true if the session is new within the container.
*/
bool addSession(const string& local_tag,
AmSession* session);
AmSession* session);
/**
* Constructs a new session and adds it to the active session container.
@ -175,11 +118,6 @@ class AmSessionContainer : public AmThread
void destroySession(AmSession* s);
void destroySession(const string& local_tag);
/**
* Query the number of active sessions
*/
int getSize();
/**
* post an event into the event queue of the identified dialog.
* @return false if session doesn't exist

@ -1,3 +1,29 @@
/*
* $Id: $
*
* Copyright (C) 2008 Raphael Coeffic
*
* This file is part of sems, a free SIP media server.
*
* sems is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the ser software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* sems is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "AmSessionContainer.h"
#include "AmSipDialog.h"
@ -5,6 +31,7 @@
#include "log.h"
#include "AmSipDispatcher.h"
#include "AmEventDispatcher.h"
AmSipDispatcher *AmSipDispatcher::_instance;
@ -15,13 +42,12 @@ AmSipDispatcher* AmSipDispatcher::instance()
void AmSipDispatcher::handleSipMsg(AmSipReply &reply)
{
if (!AmSessionContainer::instance()->postEvent(reply.local_tag,
new AmSipReplyEvent(reply))) {
for (vector<AmSIPEventHandler*>::iterator it =
reply_handlers.begin(); it != reply_handlers.end(); it++)
if ((*it)->onSipReply(reply))
break;
}
AmSipReplyEvent* ev = new AmSipReplyEvent(reply);
if(!AmEventDispatcher::instance()->post(reply.local_tag,ev)){
ERROR("could not dispatch reply\n");
delete ev;
}
}
void AmSipDispatcher::handleSipMsg(AmSipRequest &req)
@ -30,44 +56,46 @@ void AmSipDispatcher::handleSipMsg(AmSipRequest &req)
string remote_tag = req.from_tag;
string local_tag = req.to_tag;
bool sess_exists;
AmSipRequestEvent* ev = new AmSipRequestEvent(req);
AmSessionContainer* sess_cont = AmSessionContainer::instance();
if(local_tag.empty())
sess_exists = sess_cont->postEvent(callid,remote_tag,ev);
else
sess_exists = sess_cont->postEvent(local_tag,ev);
AmEventDispatcher* ev_disp = AmEventDispatcher::instance();
if(!sess_exists){
DBG("method: `%s' [%zd].\n", req.method.c_str(), req.method.length());
if((req.method == "INVITE")){
if(!local_tag.empty()) {
AmSipRequestEvent* ev = new AmSipRequestEvent(req);
sess_cont->startSessionUAS(req);
}
else if(req.method == "OPTIONS"){
// Basic OPTIONS support
AmSipDialog::reply_error(req,200,"OK");
return;
}
else {
if(!ev_disp->post(local_tag,ev)) {
if(!local_tag.empty() || req.method == "CANCEL"){
delete ev;
AmSipDialog::reply_error(req,481,
"Call leg/Transaction does not exist");
}
AmSipDialog::reply_error(req,481,
"Call leg/Transaction does not exist");
return;
}
} else {
if(ev_disp->postSipRequest(callid, remote_tag, req)){
return;
}
DBG("method: `%s' [%zd].\n", req.method.c_str(), req.method.length());
if((req.method == "INVITE")){
AmSessionContainer::instance()->startSessionUAS(req);
}
else if(req.method == "OPTIONS"){
// Basic OPTIONS support
AmSipDialog::reply_error(req,200,"OK");
return;
}
else {
ERROR("sorry, we don't support beginning a new session with "
"a '%s' message\n", req.method.c_str());
AmSessionFactory* sess_fact = AmPlugIn::instance()->findSessionFactory(req);
if(!sess_fact){
AmSipDialog::reply_error(req,501,"Not Implemented");
return;
AmSipDialog::reply_error(req,500,"Not implemented");
return;
}
}
}
}

@ -1,30 +1,43 @@
/*
* $Id: $
*
* Copyright (C) 2008 Raphael Coeffic
*
* This file is part of sems, a free SIP media server.
*
* sems is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* For a license to use the ser software under conditions
* other than those described here, or to purchase support for this
* software, please contact iptel.org by e-mail at the following addresses:
* info@iptel.org
*
* sems is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef __AMSIPDISPATCHER_H__
#define __AMSIPDISPATCHER_H__
#include <vector>
#include "AmSipMsg.h"
#include "AmApi.h"
using std::vector;
class AmSipDispatcher
{
private:
static AmSipDispatcher *_instance;
vector<AmSIPEventHandler*> reply_handlers;
public:
void handleSipMsg(AmSipRequest &);
void handleSipMsg(AmSipReply &);
/** register a reply handler for incoming replies pertaining
* to a dialog without a session (not in SessionContainer) */
void registerReplyHandler(AmSIPEventHandler* eh)
{
reply_handlers.push_back(eh);
}
static AmSipDispatcher* instance();
};

@ -60,6 +60,7 @@ SessionTimer::SessionTimer(AmSession* s)
bool SessionTimer::process(AmEvent* ev)
{
assert(ev);
AmTimeoutEvent* timeout_ev = dynamic_cast<AmTimeoutEvent*>(ev);
if (timeout_ev) {
DBG("received timeout Event with ID %d\n", timeout_ev->data.get(0).asInt());

@ -243,7 +243,7 @@ int StatsUDPServer::execute(char* msg_buf, string& reply,
msg_get_param(msg_c,cmd_str,buffer,CTRL_MSGBUF_SIZE);
if(cmd_str == "calls")
reply = "Active calls: " + int2str(sc->getSize()) + "\n";
reply = "Active calls: " + int2str(AmSession::getSessionNum()) + "\n";
else if (cmd_str == "which") {
reply =
"calls - number of active calls (Session Container size)\n"

@ -474,7 +474,6 @@ static void getInterfaceList(int sd, std::vector<std::pair<string,string> >& if_
static string getLocalIP(const string& dev_name)
{
//DBG("getLocalIP(%s)\n", dev_name.c_str());
#ifdef SUPPORT_IPV6
struct sockaddr_storage ss;

Loading…
Cancel
Save