From 7030673b905419c117eb181e900aa908232dbad3 Mon Sep 17 00:00:00 2001 From: Raphael Coeffic Date: Wed, 4 Jun 2008 10:56:01 +0000 Subject: [PATCH] - moved the "dialog-lookup-container" away from the session container into AmEventDispatcher. - removed AmSIPEventHandler (its functionalities are now in AmEventDispatcher). - added possibility for each plug-in to receive out-of-dialog messages (any kind). - added possibility to handle dialogs without creating a session (=AmSession). git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@1006 8eb893ce-cfd4-0310-b710-fb5ebe64c474 --- apps/registrar_client/SIPRegistrarClient.cpp | 24 +- apps/registrar_client/SIPRegistrarClient.h | 3 +- apps/webconference/WebConference.cpp | 2 +- core/AmApi.cpp | 23 +- core/AmApi.h | 48 ++-- core/AmEventDispatcher.cpp | 147 +++++++++++ core/AmEventDispatcher.h | 85 ++++++ core/AmEventQueue.h | 2 +- core/AmPlugIn.cpp | 130 +++++++--- core/AmPlugIn.h | 37 +-- core/AmSession.cpp | 15 +- core/AmSession.h | 14 +- core/AmSessionContainer.cpp | 256 +++++-------------- core/AmSessionContainer.h | 64 +---- core/AmSipDispatcher.cpp | 104 +++++--- core/AmSipDispatcher.h | 39 ++- core/plug-in/session_timer/SessionTimer.cpp | 1 + core/plug-in/stats/StatsUDPServer.cpp | 2 +- core/sems.cpp | 1 - 19 files changed, 587 insertions(+), 410 deletions(-) create mode 100644 core/AmEventDispatcher.cpp create mode 100644 core/AmEventDispatcher.h diff --git a/apps/registrar_client/SIPRegistrarClient.cpp b/apps/registrar_client/SIPRegistrarClient.cpp index e5d0d0b1..ca09fd88 100644 --- a/apps/registrar_client/SIPRegistrarClient.cpp +++ b/apps/registrar_client/SIPRegistrarClient.cpp @@ -29,6 +29,7 @@ #include "AmUtils.h" #include "AmPlugIn.h" #include "AmSessionContainer.h" +#include "AmEventDispatcher.h" #define MOD_NAME "registrar_client" @@ -36,8 +37,18 @@ #include -EXPORT_SIP_EVENT_HANDLER_FACTORY(SIPRegistrarClient, MOD_NAME); -EXPORT_PLUGIN_CLASS_FACTORY(SIPRegistrarClient, MOD_NAME); +//EXPORT_SIP_EVENT_HANDLER_FACTORY(SIPRegistrarClient, MOD_NAME); +//EXPORT_PLUGIN_CLASS_FACTORY(SIPRegistrarClient, MOD_NAME); + +extern "C" void* plugin_class_create() +{ + SIPRegistrarClient* reg_c = SIPRegistrarClient::instance(); + assert(dynamic_cast(reg_c)); + + DBG("Hallo, alles in ordnung!\n"); + + return (AmPluginFactory*)reg_c; +} SIPRegistration::SIPRegistration(const string& handle, @@ -179,12 +190,12 @@ SIPRegistrarClient* SIPRegistrarClient::instance() if(_instance == NULL){ _instance = new SIPRegistrarClient(MOD_NAME); } + return _instance; } SIPRegistrarClient::SIPRegistrarClient(const string& name) - : AmSIPEventHandler(name), - AmEventQueue(this) , + : AmEventQueue(this), uac_auth_i(NULL), AmDynInvokeFactory(MOD_NAME) { @@ -515,6 +526,9 @@ remove_reg_unsafe(const string& reg_id) { reg = it->second; registrations.erase(it); } + + AmEventDispatcher::instance()->delEventQueue(reg_id); + return reg; } @@ -531,6 +545,8 @@ add_reg(const string& reg_id, SIPRegistration* new_reg) } registrations[reg_id] = new_reg; + + AmEventDispatcher::instance()->addEventQueue(reg_id,this); reg_mut.unlock(); if (reg != NULL) diff --git a/apps/registrar_client/SIPRegistrarClient.h b/apps/registrar_client/SIPRegistrarClient.h index 93767b86..cc9f6328 100644 --- a/apps/registrar_client/SIPRegistrarClient.h +++ b/apps/registrar_client/SIPRegistrarClient.h @@ -142,8 +142,7 @@ class SIPRegistration : public AmSipDialogEventHandler, class SIPNewRegistrationEvent; class SIPRemoveRegistrationEvent; -class SIPRegistrarClient : public AmSIPEventHandler, - public AmThread, +class SIPRegistrarClient : public AmThread, public AmEventQueue, public AmEventHandler, public AmDynInvoke, diff --git a/apps/webconference/WebConference.cpp b/apps/webconference/WebConference.cpp index 3fa23dca..cc9c60eb 100644 --- a/apps/webconference/WebConference.cpp +++ b/apps/webconference/WebConference.cpp @@ -351,7 +351,7 @@ void WebConferenceFactory::invoke(const string& method, string WebConferenceFactory::getServerInfoString() { string res = "Server: " DEFAULT_SIGNATURE " calls: " + - int2str(AmSessionContainer::instance()->getSize())+ + int2str(AmSession::getSessionNum())+ " active"; if (stats != NULL) { diff --git a/core/AmApi.cpp b/core/AmApi.cpp index 3d3f4af6..ce818e56 100644 --- a/core/AmApi.cpp +++ b/core/AmApi.cpp @@ -74,11 +74,20 @@ void AmSessionFactory::configureSession(AmSession* sess) { //SessionTimer::sess->configureSessionTimer(mod_conf); } -void AmSessionFactory::postEvent(AmEvent* ev) { - ERROR("unhandled Event in %s module\n", getName().c_str()); - delete ev; +void AmSessionFactory::onOoDRequest(const AmSipRequest& req) +{ + ERROR("sorry, we don't support beginning a new session with " + "a '%s' message\n", req.method.c_str()); + + AmSipDialog::reply_error(req,501,"Not Implemented"); + return; } +// void AmSessionFactory::postEvent(AmEvent* ev) { +// ERROR("unhandled Event in %s module\n", getName().c_str()); +// delete ev; +// } + AmSessionEventHandlerFactory::AmSessionEventHandlerFactory(const string& name) : AmPluginFactory(name) { @@ -90,10 +99,10 @@ bool AmSessionEventHandlerFactory::onInvite(const AmSipRequest& req, return onInvite(req); } -AmSIPEventHandler::AmSIPEventHandler(const string& name) - : AmPluginFactory(name) -{ -} +// AmSIPEventHandler::AmSIPEventHandler(const string& name) +// : AmPluginFactory(name) +// { +// } AmLoggingFacility::AmLoggingFacility(const string& name) : AmPluginFactory(name) diff --git a/core/AmApi.h b/core/AmApi.h index 698acd89..3e476caa 100644 --- a/core/AmApi.h +++ b/core/AmApi.h @@ -182,38 +182,37 @@ class AmSessionFactory: public AmPluginFactory AmArg& session_params); /** - * method to receive an Event that is posted - * to the factory + * Method to receive any out-of-dialog request + * other than INVITE, REFER and OPTIONS * * Warning: * This method should not make any expensive * processing as it would block the thread * posting the event! */ - virtual void postEvent(AmEvent* ev); - + virtual void onOoDRequest(const AmSipRequest& req); }; /** \brief Interface for plugins that implement session-less * UA behaviour (e.g. registrar client, event notification * client) */ -class AmSIPEventHandler : public AmPluginFactory -{ - - public: - AmSIPEventHandler(const string& name); - virtual ~AmSIPEventHandler() { } - - /** will be called on incoming replies which do - * not belong to a dialog of a session in the - * SessionContainer. - * - * @return true if reply was handled by plugin, false - * otherwise - */ - virtual bool onSipReply(const AmSipReply& rep) = 0; -}; +// class AmSIPEventHandler : public AmPluginFactory +// { + +// public: +// AmSIPEventHandler(const string& name); +// virtual ~AmSIPEventHandler() { } + +// /** will be called on incoming replies which do +// * not belong to a dialog of a session in the +// * SessionContainer. +// * +// * @return true if reply was handled by plugin, false +// * otherwise +// */ +// virtual bool onSipReply(const AmSipReply& rep) = 0; +// }; /** \brief Interface for plugins that implement a * logging facility @@ -230,8 +229,6 @@ class AmLoggingFacility : public AmPluginFactory virtual void log(int level, const char* msg) = 0; }; -class AmInterfaceHandler; - class AmCtrlInterface: public AmThread { public: @@ -260,10 +257,9 @@ class AmCtrlInterface: public AmThread * For sending messages, appropriate methods are exposed (the send()s). * The interface defines a thread that runs, polling on the two listening unix * sockets (one for requests, one for replies). After receiving a message, - * a registered 'AmInterfaceHandler' is used to handle the incomming SIP - * events (that end up either opening/updating a UAC session or posting to a - * SIP event queue). - * + * AmSipDispatcher shall be used to dispatch the incomming SIP messages + * (that end up either opening/updating a UAC session or posting to some + * event queue). */ class AmCtrlInterfaceFactory : public AmPluginFactory { diff --git a/core/AmEventDispatcher.cpp b/core/AmEventDispatcher.cpp new file mode 100644 index 00000000..1fc49e91 --- /dev/null +++ b/core/AmEventDispatcher.cpp @@ -0,0 +1,147 @@ +/* + * $Id: $ + * + * Copyright (C) 2007 Raphael Coeffic + * + * This file is part of sems, a free SIP media server. + * + * sems is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * For a license to use the ser software under conditions + * other than those described here, or to purchase support for this + * software, please contact iptel.org by e-mail at the following addresses: + * info@iptel.org + * + * sems is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include "AmEventDispatcher.h" +#include "AmSipEvent.h" + +AmEventDispatcher* AmEventDispatcher::_instance=NULL; + +AmEventDispatcher* AmEventDispatcher::instance() +{ + return _instance ? _instance : ((_instance = new AmEventDispatcher())); +} + +bool AmEventDispatcher::addEventQueue(const string& local_tag, + AmEventQueue* q, + const string& callid, + const string& remote_tag) +{ + bool exists = false; + + m_queues.lock(); + + exists = queues.find(local_tag) != queues.end(); + + if(!callid.empty() && !remote_tag.empty()) { + exists = exists || + (id_lookup.find(callid+remote_tag) != id_lookup.end()); + } + + if(!exists){ + queues[local_tag] = q; + + if(!callid.empty() && !remote_tag.empty()) + id_lookup[callid+remote_tag] = local_tag; + } + + m_queues.unlock(); + + return exists; +} + +AmEventQueue* AmEventDispatcher::delEventQueue(const string& local_tag, + const string& callid, + const string& remote_tag) +{ + AmEventQueue* q = NULL; + + m_queues.lock(); + + EvQueueMapIter qi = queues.find(local_tag); + if(qi != queues.end()) { + + q = qi->second; + queues.erase(qi); + + if(!callid.empty() && !remote_tag.empty()) { + + DictIter di = id_lookup.find(callid+remote_tag); + if(di != id_lookup.end()) { + + id_lookup.erase(di); + } + } + } + m_queues.unlock(); + + return q; +} + +bool AmEventDispatcher::post(const string& local_tag, AmEvent* ev) +{ + bool posted = false; + m_queues.lock(); + + EvQueueMapIter it = queues.find(local_tag); + if(it != queues.end()){ + it->second->postEvent(ev); + posted = true; + } + + m_queues.unlock(); + + return posted; +} + +bool AmEventDispatcher::post(const string& callid, const string& remote_tag, AmEvent* ev) +{ + bool posted = false; + m_queues.lock(); + + DictIter di = id_lookup.find(callid+remote_tag); + if(di != id_lookup.end()) { + + EvQueueMapIter it = queues.find(di->second); + if(it != queues.end()){ + it->second->postEvent(ev); + posted = true; + } + } + m_queues.unlock(); + + return posted; +} + +bool AmEventDispatcher::postSipRequest(const string& callid, const string& remote_tag, + const AmSipRequest& req) +{ + bool posted = false; + m_queues.lock(); + + DictIter di = id_lookup.find(callid+remote_tag); + if(di != id_lookup.end()) { + + EvQueueMapIter it = queues.find(di->second); + if(it != queues.end()){ + it->second->postEvent(new AmSipRequestEvent(req)); + posted = true; + } + } + m_queues.unlock(); + + return posted; +} diff --git a/core/AmEventDispatcher.h b/core/AmEventDispatcher.h new file mode 100644 index 00000000..ab3d65ed --- /dev/null +++ b/core/AmEventDispatcher.h @@ -0,0 +1,85 @@ +/* + * $Id: $ + * + * Copyright (C) 2008 Raphael Coeffic + * + * This file is part of sems, a free SIP media server. + * + * sems is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * For a license to use the ser software under conditions + * other than those described here, or to purchase support for this + * software, please contact iptel.org by e-mail at the following addresses: + * info@iptel.org + * + * sems is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#ifndef _AMEVENTDISPATCHER_h_ +#define _AMEVENTDISPATCHER_h_ + +#include "AmEventQueue.h" +#include "AmSipMsg.h" +#include + +class AmEventDispatcher +{ +public: + + typedef std::map EvQueueMap; + typedef EvQueueMap::iterator EvQueueMapIter; + + typedef std::map Dictionnary; + typedef Dictionnary::iterator DictIter; + + +private: + + static AmEventDispatcher *_instance; + + /** + * Container for active sessions + * local tag -> event queue + */ + EvQueueMap queues; + + /** + * Call ID + remote tag -> local tag + * (needed for CANCELs and some provisionnal answers) + * (UAS sessions only) + */ + Dictionnary id_lookup; + + // mutex for "queues" and "id_lookup" + AmMutex m_queues; + +public: + + static AmEventDispatcher* instance(); + + bool postSipRequest(const string& callid, const string& remote_tag, + const AmSipRequest& req); + + bool post(const string& local_tag, AmEvent* ev); + bool post(const string& callid, const string& remote_tag, AmEvent* ev); + + bool addEventQueue(const string& local_tag, + AmEventQueue* q, + const string& callid="", + const string& remote_tag=""); + + AmEventQueue* delEventQueue(const string& local_tag, + const string& callid="", + const string& remote_tag=""); +}; + +#endif diff --git a/core/AmEventQueue.h b/core/AmEventQueue.h index a9dc4435..4c74d848 100644 --- a/core/AmEventQueue.h +++ b/core/AmEventQueue.h @@ -51,7 +51,7 @@ protected: public: AmEventQueue(AmEventHandler*); - ~AmEventQueue(); + virtual ~AmEventQueue(); void postEvent(AmEvent*); void processEvents(); diff --git a/core/AmPlugIn.cpp b/core/AmPlugIn.cpp index 66e9f4f3..53760acf 100644 --- a/core/AmPlugIn.cpp +++ b/core/AmPlugIn.cpp @@ -227,15 +227,15 @@ int AmPlugIn::load(const string& directory, const string& plugins) } // load SIPEventHandlers - for(std::map::iterator it = name2sipeh.begin(); - it != name2sipeh.end(); it++){ - err = it->second->onLoad(); - if(err) - return err; - // register for receiving replys - //AmReplyHandler::get()->registerReplyHandler(it->second); - AmSipDispatcher::instance()->registerReplyHandler(it->second); - } +// for(std::map::iterator it = name2sipeh.begin(); +// it != name2sipeh.end(); it++){ +// err = it->second->onLoad(); +// if(err) +// return err; +// // register for receiving replys +// //AmReplyHandler::get()->registerReplyHandler(it->second); +// AmSipDispatcher::instance()->registerReplyHandler(it->second); +// } // init logging facilities for(std::map::iterator it = name2logfac.begin(); @@ -311,11 +311,11 @@ int AmPlugIn::loadPlugIn(const string& file) has_sym=true; } - if((fc = (FactoryCreate)dlsym(h_dl,FACTORY_SIP_EVENT_HANDLER_EXPORT_STR)) != NULL){ - if(loadSIPehPlugIn((AmPluginFactory*)fc())) - goto error; - has_sym=true; - } +// if((fc = (FactoryCreate)dlsym(h_dl,FACTORY_SIP_EVENT_HANDLER_EXPORT_STR)) != NULL){ +// if(loadSIPehPlugIn((AmPluginFactory*)fc())) +// goto error; +// has_sym=true; +// } if((fc = (FactoryCreate)dlsym(h_dl,FACTORY_LOG_FACILITY_EXPORT_STR)) != NULL){ if(loadLogFacPlugIn((AmPluginFactory*)fc())) @@ -444,13 +444,13 @@ AmDynInvokeFactory* AmPlugIn::getFactory4Di(const string& name) return 0; } -AmSIPEventHandler* AmPlugIn::getFactory4SIPeh(const string& name) -{ - std::map::iterator it = name2sipeh.find(name); - if(it != name2sipeh.end()) - return it->second; - return 0; -} +// AmSIPEventHandler* AmPlugIn::getFactory4SIPeh(const string& name) +// { +// std::map::iterator it = name2sipeh.find(name); +// if(it != name2sipeh.end()) +// return it->second; +// return 0; +// } AmLoggingFacility* AmPlugIn::getFactory4LogFaclty(const string& name) { @@ -574,28 +574,28 @@ int AmPlugIn::loadDiPlugIn(AmPluginFactory* f) return -1; } -int AmPlugIn::loadSIPehPlugIn(AmPluginFactory* f) -{ - AmSIPEventHandler* sf = dynamic_cast(f); - if(!sf){ - ERROR("invalid SIP event handler plug-in!\n"); - goto error; - } +// int AmPlugIn::loadSIPehPlugIn(AmPluginFactory* f) +// { +// AmSIPEventHandler* sf = dynamic_cast(f); +// if(!sf){ +// ERROR("invalid SIP event handler plug-in!\n"); +// goto error; +// } - if(name2sipeh.find(sf->getName()) != name2sipeh.end()){ - ERROR("sip event handler component '%s' already loaded !\n", - sf->getName().c_str()); - goto error; - } +// if(name2sipeh.find(sf->getName()) != name2sipeh.end()){ +// ERROR("sip event handler component '%s' already loaded !\n", +// sf->getName().c_str()); +// goto error; +// } - name2sipeh.insert(std::make_pair(sf->getName(),sf)); - DBG("sip event handler component '%s' loaded.\n",sf->getName().c_str()); +// name2sipeh.insert(std::make_pair(sf->getName(),sf)); +// DBG("sip event handler component '%s' loaded.\n",sf->getName().c_str()); - return 0; +// return 0; - error: - return -1; -} +// error: +// return -1; +// } int AmPlugIn::loadLogFacPlugIn(AmPluginFactory* f) { @@ -745,3 +745,55 @@ bool AmPlugIn::registerFactory4App(const string& app_name, AmSessionFactory* f) name2app.insert(make_pair(app_name,f)); return true; } + +AmSessionFactory* AmPlugIn::findSessionFactory(AmSipRequest& req) +{ + if(req.cmd.empty()){ + ERROR("AmPlugIn::findSessionFactory: req.cmd is empty!\n"); + return NULL; + } + else if(req.cmd == "sems"){ + + switch (AmConfig::AppSelect) { + + case AmConfig::App_RURIUSER: + req.cmd = req.user; + break; + case AmConfig::App_APPHDR: + req.cmd = getHeader(req.hdrs, APPNAME_HDR); + break; + case AmConfig::App_RURIPARAM: + req.cmd = get_header_param(req.r_uri, "app"); + break; + case AmConfig::App_MAPPING: + { + req.cmd = ""; + for (AmConfig::AppMappingVector::iterator it = + AmConfig::AppMapping.begin(); + it != AmConfig::AppMapping.end(); it++){ + if (!regexec(&it->first, req.r_uri.c_str(), 0, NULL, 0)) { + DBG("match of r_uri '%s' to application %s\n", + req.r_uri.c_str(), it->second.c_str()); + req.cmd = it->second; + break; + } + } + } break; + case AmConfig::App_SPECIFIED: + req.cmd = AmConfig::Application; + break; + } + + if (req.cmd.empty()) { + ERROR("could not find any application matching configured criteria\n"); + return NULL; + } + } + + AmSessionFactory* session_factory = getFactory4App(req.cmd); + if(!session_factory) { + ERROR("AmPlugIn::findSessionFactory: application '%s' not found !\n", req.cmd.c_str()); + } + + return session_factory; +} diff --git a/core/AmPlugIn.h b/core/AmPlugIn.h index 25f0987f..9c5be94b 100644 --- a/core/AmPlugIn.h +++ b/core/AmPlugIn.h @@ -40,9 +40,10 @@ class AmPluginFactory; class AmSessionFactory; class AmSessionEventHandlerFactory; class AmDynInvokeFactory; -class AmSIPEventHandler; +//class AmSIPEventHandler; class AmLoggingFacility; class AmCtrlInterfaceFactory; +class AmSipRequest; struct amci_exports_t; struct amci_codec_t; @@ -91,13 +92,13 @@ class AmPlugIn : public AmPayloadProviderInterface std::map payloads; std::map payload_order; std::map file_formats; - std::map name2app; + std::map name2app; std::map name2seh; - std::map name2base; - std::map name2di; - std::map name2sipeh; - std::map name2logfac; + std::map name2base; + std::map name2di; + std::map name2logfac; + AmCtrlInterfaceFactory *ctrlIface; int dynamic_pl; // range: 96->127, see RFC 1890 @@ -114,7 +115,6 @@ class AmPlugIn : public AmPayloadProviderInterface int loadSehPlugIn(AmPluginFactory* cb); int loadBasePlugIn(AmPluginFactory* cb); int loadDiPlugIn(AmPluginFactory* cb); - int loadSIPehPlugIn(AmPluginFactory* f); int loadLogFacPlugIn(AmPluginFactory* f); int loadCtrlFacPlugIn(AmPluginFactory* f); @@ -150,8 +150,10 @@ class AmPlugIn : public AmPayloadProviderInterface /** @return the suported payloads. */ const std::map& getPayloads() { return payloads; } + /** @return the order of payloads. */ const std::map& getPayloadOrder() { return payload_order; } + /** * File format lookup according to the * format name and/or file extension. @@ -160,6 +162,7 @@ class AmPlugIn : public AmPayloadProviderInterface * @return NULL if failed. */ amci_inoutfmt_t* fileFormat(const string& fmt_name, const string& ext = ""); + /** * File format's subtype lookup function. * @param iofmt The file format. @@ -167,12 +170,14 @@ class AmPlugIn : public AmPayloadProviderInterface * @return NULL if failed. */ amci_subtype_t* subtype(amci_inoutfmt_t* iofmt, int subtype); + /** * Codec lookup function. * @param id Codec ID (see amci/codecs.h). * @return NULL if failed. */ amci_codec_t* codec(int id); + /** * Application lookup function * @param app_name application name @@ -180,6 +185,15 @@ class AmPlugIn : public AmPayloadProviderInterface */ AmSessionFactory* getFactory4App(const string& app_name); + /** @return true if this record has been inserted. */ + bool registerFactory4App(const string& app_name, AmSessionFactory* f); + + /** + * Find the proper SessionFactory + * for the given request. + */ + AmSessionFactory* findSessionFactory(AmSipRequest& req); + /** * Session event handler lookup function * @param name application name @@ -192,13 +206,6 @@ class AmPlugIn : public AmPayloadProviderInterface */ AmDynInvokeFactory* getFactory4Di(const string& name); - /** - * SIP event handler lookup function - * @param name application name - * @return NULL if failed (-> handler not found). - */ - AmSIPEventHandler* getFactory4SIPeh(const string& name); - /** * logging facility lookup function * @param name application name @@ -206,8 +213,6 @@ class AmPlugIn : public AmPayloadProviderInterface */ AmLoggingFacility* getFactory4LogFaclty(const string& name); - /** @return true if this record has been inserted. */ - bool registerFactory4App(const string& app_name, AmSessionFactory* f); }; #endif diff --git a/core/AmSession.cpp b/core/AmSession.cpp index 71aeb90c..a74703b2 100644 --- a/core/AmSession.cpp +++ b/core/AmSession.cpp @@ -49,6 +49,9 @@ #include #include +volatile unsigned int AmSession::session_num = 0; + + // AmSessionEventHandler methods bool AmSessionEventHandler::process(AmEvent*) { @@ -387,6 +390,8 @@ void AmSession::run() } #endif + session_num++; + try { try { @@ -430,8 +435,10 @@ void AmSession::run() catch(const AmSession::Exception& e){ ERROR("%i %s\n",e.code,e.reason.c_str()); } - + destroy(); + + session_num--; // wait at least until session is out of RtpScheduler DBG("session is stopped.\n"); @@ -469,6 +476,12 @@ string AmSession::getNewId() return id; } +unsigned int AmSession::getSessionNum() +{ + return AmSession::session_num; +} + + void AmSession::setInbandDetector(Dtmf::InbandDetectorType t) { m_dtmfDetector.setInbandDetector(t); diff --git a/core/AmSession.h b/core/AmSession.h index 794e751e..12d42b73 100644 --- a/core/AmSession.h +++ b/core/AmSession.h @@ -58,7 +58,8 @@ class AmDtmfEvent; #define FL_FORCE_ACTIVE 2 /** - * \brief Interface for SIP events signaling plugins implement + * \brief Interface for SIP signaling plugins that + * must change requests or replies (ex: session timer). * * Signaling plugins must inherite from this class. */ @@ -73,8 +74,8 @@ public: virtual ~AmSessionEventHandler() {} /* - * All the methods return true if event processing - * should stopped after calling them. + * All the methods return true if the event processing + * shall be stopped after them. */ virtual bool process(AmEvent*); virtual bool onSipEvent(AmSipEvent*); @@ -142,6 +143,8 @@ private: AmCondition sess_stopped; AmCondition detached; + static volatile unsigned int session_num; + friend class AmMediaProcessor; friend class AmMediaProcessorThread; friend class AmSessionContainer; @@ -364,6 +367,11 @@ public: */ static string getNewId(); + /** + * Gets the number of running sessions + */ + static unsigned int getSessionNum(); + /** * Entry point for DTMF events */ diff --git a/core/AmSessionContainer.cpp b/core/AmSessionContainer.cpp index 8f84f900..d6be14e9 100644 --- a/core/AmSessionContainer.cpp +++ b/core/AmSessionContainer.cpp @@ -31,9 +31,12 @@ #include "AmApi.h" #include "AmConfig.h" #include "AmUtils.h" +#include "AmEventDispatcher.h" + #include #include #include + #include "sems.h" // AmSessionContainer methods @@ -41,7 +44,8 @@ AmSessionContainer* AmSessionContainer::_SessionContainer=0; AmSessionContainer::AmSessionContainer() - : _run_cond(false) + : _run_cond(false) + { } @@ -136,52 +140,22 @@ void AmSessionContainer::destroySession(AmSession* s) void AmSessionContainer::destroySession(const string& local_tag) { - as_mut.lock(); - - SessionMapIter sess_it = a_sessions.find(local_tag); - if(sess_it != a_sessions.end()){ - - AmSession* sess = sess_it->second; - as_id_lookup.erase(sess->getCallID() + sess->getRemoteTag()); - a_sessions.erase(sess_it); + AmSession* s = NULL; + AmEventQueue* q = AmEventDispatcher::instance()-> + delEventQueue(local_tag); + + if(q && + (s = dynamic_cast(q))) { - stopAndQueue(sess); - DBG("session stopped and queued for deletion (#sessions=%u)\n", - (unsigned int)a_sessions.size()); - } - else { - DBG("could not remove session: id not found\n"); - } - - as_mut.unlock(); -} - -AmSession* AmSessionContainer::getSession(const string& callid, const string& remote_tag) -{ - DictIter it = as_id_lookup.find(callid+remote_tag); - if(it == as_id_lookup.end()){ - //ERROR("could not find session (callid='%s'; remote_tag='%s')\n", - // callid.c_str(),remote_tag.c_str()); - return NULL; - } - - return getSession(it->second); -} - -AmSession* AmSessionContainer::getSession(const string& local_tag) -{ - SessionMapIter it = a_sessions.find(local_tag); - if(it == a_sessions.end()){ - //ERROR("could not find session (local_tag='%s')\n",local_tag.c_str()); - return NULL; - } - - return it->second; + stopAndQueue(s); + } + else { + DBG("could not remove session: id not found or wrong type\n"); + } } AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session_params) { AmSession* session = NULL; - as_mut.lock(); try { if((session = createSession(req, session_params)) != 0){ session->dlg.updateStatusFromLocalRequest(req); // sets local tag as well @@ -192,7 +166,6 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session ERROR("INVITE could not be sent: error code = %d.\n", err); delete session; - as_mut.unlock(); return NULL; } @@ -203,9 +176,7 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session session->start(); - addSession_unsafe(req.callid,"",req.from_tag,session); - // session does not get its own INVITE - // session->postEvent(new AmSipRequestEvent(req)); + addSession("","",req.from_tag,session); } } catch(const AmSession::Exception& e){ @@ -220,23 +191,13 @@ AmSession* AmSessionContainer::startSessionUAC(AmSipRequest& req, AmArg* session ERROR("unexpected exception\n"); AmSipDialog::reply_error(req,500,"unexpected exception"); } - as_mut.unlock(); + return session; } void AmSessionContainer::startSessionUAS(AmSipRequest& req) { - as_mut.lock(); try { - - AmSession* session = getSession(req.callid,req.from_tag); - if( session ){ - - // it's a forked-and-merged INVITE - // reply 482 Loop detected - throw AmSession::Exception(482, "Loop detected"); - } - else { // Call-ID and From-Tag are unknown: it's a new session AmSession* session; if((session = createSession(req)) != 0){ @@ -254,10 +215,9 @@ void AmSessionContainer::startSessionUAS(AmSipRequest& req) session->start(); - addSession_unsafe(req.callid,req.from_tag,local_tag,session); + addSession(req.callid,req.from_tag,local_tag,session); session->postEvent(new AmSipRequestEvent(req)); } - } } catch(const AmSession::Exception& e){ ERROR("%i %s\n",e.code,e.reason.c_str()); @@ -271,7 +231,6 @@ void AmSessionContainer::startSessionUAS(AmSipRequest& req) ERROR("unexpected exception\n"); AmSipDialog::reply_error(req,500,"unexpected exception"); } - as_mut.unlock(); } @@ -279,186 +238,95 @@ bool AmSessionContainer::postEvent(const string& callid, const string& remote_tag, AmEvent* event) { - // DBG("postEvent: callid = %s; remote_tag = %s\n", - // callid.c_str(),remote_tag.c_str()); + bool posted = + AmEventDispatcher::instance()-> + post(callid,remote_tag,event); - as_mut.lock(); - AmSession* s = getSession(callid,remote_tag); - - if(!s){ - as_mut.unlock(); - delete event; - return false; - } - - s->postEvent(event); - as_mut.unlock(); + if(!posted) + delete event; - return true; + return posted; } bool AmSessionContainer::postEvent(const string& local_tag, AmEvent* event) { - // DBG("postEvent: local_tag = %s\n",local_tag.c_str()); - as_mut.lock(); - AmSession* s = getSession(local_tag); - - if (s != NULL) { - s->postEvent(event); - as_mut.unlock(); - return true; - } - as_mut.unlock(); - - // try session factories - AmSessionFactory* sf = AmPlugIn::instance()->getFactory4App(local_tag); - if (sf != NULL) { - sf->postEvent(event); - return true; - } + bool posted = + AmEventDispatcher::instance()-> + post(local_tag,event); + + if(!posted) + delete event; + + return posted; - delete event; - return false; } AmSession* AmSessionContainer::createSession(AmSipRequest& req, AmArg* session_params) { - - if (AmConfig::SessionLimit && - AmConfig::SessionLimit <= a_sessions.size()) { - DBG("session_limit %d reached. Not creating session.\n", - AmConfig::SessionLimit); - throw AmSession::Exception(AmConfig::SessionLimitErrCode, AmConfig::SessionLimitErrReason); + if (AmConfig::SessionLimit && + AmConfig::SessionLimit <= AmSession::session_num) { + + DBG("session_limit %d reached. Not creating session.\n", + AmConfig::SessionLimit); + + AmSipDialog::reply_error(req,AmConfig::SessionLimitErrCode, + AmConfig::SessionLimitErrReason); + return NULL; } - if(req.cmd.empty()){ - throw string("AmSessionContainer::createSession: req.cmd is empty!\n"); - } - else if(req.cmd == "sems"){ - switch (AmConfig::AppSelect) { - case AmConfig::App_RURIUSER: - req.cmd = req.user; - break; - case AmConfig::App_APPHDR: - req.cmd = getHeader(req.hdrs, APPNAME_HDR); - break; - case AmConfig::App_RURIPARAM: - req.cmd = get_header_param(req.r_uri, "app"); - break; - case AmConfig::App_MAPPING: - { - req.cmd = ""; - for (AmConfig::AppMappingVector::iterator it = - AmConfig::AppMapping.begin(); - it != AmConfig::AppMapping.end(); it++){ - if (!regexec(&it->first, req.r_uri.c_str(), 0, NULL, 0)) { - DBG("match of r_uri '%s' to application %s\n", - req.r_uri.c_str(), it->second.c_str()); - req.cmd = it->second; - break; - } - } - } break; - case AmConfig::App_SPECIFIED: - req.cmd = AmConfig::Application; - break; - } - if (req.cmd.empty()) { - string ex = "Unknown Application"; - throw ex; - } - } + AmSessionFactory* session_factory = + AmPlugIn::instance()->findSessionFactory(req); - AmSessionFactory* state_factory = AmPlugIn::instance()->getFactory4App(req.cmd); - if(!state_factory) { - ERROR("application '%s' not found !\n", req.cmd.c_str()); - throw string("application '" + req.cmd + "' not found !"); + if(!session_factory) { + + ERROR("No session factory"); + AmSipDialog::reply_error(req,500,"No session factory"); + + return NULL; } - + AmSession* session = NULL; if (req.method == "INVITE") { if (NULL != session_params) - session = state_factory->onInvite(req, *session_params); + session = session_factory->onInvite(req, *session_params); else - session = state_factory->onInvite(req); + session = session_factory->onInvite(req); } else if (req.method == "REFER") { if (NULL != session_params) - session = state_factory->onRefer(req, *session_params); + session = session_factory->onRefer(req, *session_params); else - session = state_factory->onRefer(req); + session = session_factory->onRefer(req); } if(!session) { - // State creation failed: + // Session creation failed: // application denied session creation // or there was an error. // // let's hope the createState function has replied... // ... and do nothing ! - DBG("onInvite returned NULL\n"); - return 0; + DBG("onInvite/onRefer returned NULL\n"); } - - //state_factory->configureSession(session); - //session->checkSessionExpires(req); return session; } -bool AmSessionContainer::addSession_unsafe(const string& callid, - const string& remote_tag, - const string& local_tag, - AmSession* session) -{ - if(getSession(callid,remote_tag)) - return false; - - if (!remote_tag.empty()) - as_id_lookup[callid+remote_tag] = local_tag; - - return addSession_unsafe(local_tag,session); -} - -bool AmSessionContainer::addSession_unsafe(const string& local_tag, - AmSession* session) -{ - if(getSession(local_tag)) - return false; - - //DBG("a_sessions['%s'] = session\n",local_tag.c_str()); - a_sessions[local_tag] = session; - return true; -} - bool AmSessionContainer::addSession(const string& callid, const string& remote_tag, const string& local_tag, AmSession* session) { - as_mut.lock(); - bool ret = addSession_unsafe(callid,remote_tag,local_tag,session); - as_mut.unlock(); - return ret; + return AmEventDispatcher::instance()-> + addEventQueue(local_tag,(AmEventQueue*)session, + callid,remote_tag); } bool AmSessionContainer::addSession(const string& local_tag, AmSession* session) { - as_mut.lock(); - bool ret = addSession_unsafe(local_tag,session); - as_mut.unlock(); - return ret; -} - -int AmSessionContainer::getSize() -{ - int res=0; - as_mut.lock(); - res = a_sessions.size(); - as_mut.unlock(); - - return res; + return AmEventDispatcher::instance()-> + addEventQueue(local_tag,(AmEventQueue*)session); } diff --git a/core/AmSessionContainer.h b/core/AmSessionContainer.h index 2927ab71..561abf7a 100644 --- a/core/AmSessionContainer.h +++ b/core/AmSessionContainer.h @@ -49,32 +49,8 @@ class AmSessionContainer : public AmThread { static AmSessionContainer* _SessionContainer; - // some typedefs ... - typedef std::map SessionMap; - typedef SessionMap::iterator SessionMapIter; - - typedef std::map Dictionnary; - typedef Dictionnary::iterator DictIter; - typedef std::queue SessionQueue; - /** - * Container for active sessions - * local tag -> session - */ - SessionMap a_sessions; - - /** - * Call ID + remote tag -> local tag - * (needed for CANCELs and some provisionnal answers) - * (UAS sessions only) - */ - Dictionnary as_id_lookup; - - /** Mutex to protect the active session container */ - AmMutex as_mut; - - /** Container for dead sessions */ SessionQueue d_sessions; /** Mutex to protect the dead session container */ @@ -86,39 +62,6 @@ class AmSessionContainer : public AmThread /** We are a Singleton ! Avoid people to have their own instance. */ AmSessionContainer(); - /** - * Search the container for a session coresponding - * to callid and remote_tag. (UAS only). - * - * @return the session related to callid & remote_tag - * or NULL if none has been found. - */ - AmSession* getSession(const string& callid, const string& remote_tag); - - /** - * Search the container for a session coresponding to local_tag. - * - * @return the session related to local_tag - * or NULL if none has been found. - */ - AmSession* getSession(const string& local_tag); - - /** - * Adds a session to the container. (UAS only) - * @return true if the session is new within the container. - */ - bool addSession_unsafe(const string& callid, - const string& remote_tag, - const string& local_tag, - AmSession* session); - - /** - * Adds a session to the container. - * @return true if the session is new within the container. - */ - bool addSession_unsafe(const string& local_tag, - AmSession* session); - /** * Tries to stop the session and queue it destruction. */ @@ -154,7 +97,7 @@ class AmSessionContainer : public AmThread * @return true if the session is new within the container. */ bool addSession(const string& local_tag, - AmSession* session); + AmSession* session); /** * Constructs a new session and adds it to the active session container. @@ -175,11 +118,6 @@ class AmSessionContainer : public AmThread void destroySession(AmSession* s); void destroySession(const string& local_tag); - /** - * Query the number of active sessions - */ - int getSize(); - /** * post an event into the event queue of the identified dialog. * @return false if session doesn't exist diff --git a/core/AmSipDispatcher.cpp b/core/AmSipDispatcher.cpp index ad58aaff..1fa91487 100644 --- a/core/AmSipDispatcher.cpp +++ b/core/AmSipDispatcher.cpp @@ -1,3 +1,29 @@ +/* + * $Id: $ + * + * Copyright (C) 2008 Raphael Coeffic + * + * This file is part of sems, a free SIP media server. + * + * sems is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * For a license to use the ser software under conditions + * other than those described here, or to purchase support for this + * software, please contact iptel.org by e-mail at the following addresses: + * info@iptel.org + * + * sems is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ #include "AmSessionContainer.h" #include "AmSipDialog.h" @@ -5,6 +31,7 @@ #include "log.h" #include "AmSipDispatcher.h" +#include "AmEventDispatcher.h" AmSipDispatcher *AmSipDispatcher::_instance; @@ -15,13 +42,12 @@ AmSipDispatcher* AmSipDispatcher::instance() void AmSipDispatcher::handleSipMsg(AmSipReply &reply) { - if (!AmSessionContainer::instance()->postEvent(reply.local_tag, - new AmSipReplyEvent(reply))) { - for (vector::iterator it = - reply_handlers.begin(); it != reply_handlers.end(); it++) - if ((*it)->onSipReply(reply)) - break; - } + AmSipReplyEvent* ev = new AmSipReplyEvent(reply); + if(!AmEventDispatcher::instance()->post(reply.local_tag,ev)){ + + ERROR("could not dispatch reply\n"); + delete ev; + } } void AmSipDispatcher::handleSipMsg(AmSipRequest &req) @@ -30,44 +56,46 @@ void AmSipDispatcher::handleSipMsg(AmSipRequest &req) string remote_tag = req.from_tag; string local_tag = req.to_tag; - bool sess_exists; - AmSipRequestEvent* ev = new AmSipRequestEvent(req); - AmSessionContainer* sess_cont = AmSessionContainer::instance(); - - if(local_tag.empty()) - sess_exists = sess_cont->postEvent(callid,remote_tag,ev); - else - sess_exists = sess_cont->postEvent(local_tag,ev); + AmEventDispatcher* ev_disp = AmEventDispatcher::instance(); - if(!sess_exists){ - - DBG("method: `%s' [%zd].\n", req.method.c_str(), req.method.length()); - - if((req.method == "INVITE")){ + if(!local_tag.empty()) { + AmSipRequestEvent* ev = new AmSipRequestEvent(req); - sess_cont->startSessionUAS(req); - } - else if(req.method == "OPTIONS"){ - - // Basic OPTIONS support - AmSipDialog::reply_error(req,200,"OK"); - return; - } - else { + if(!ev_disp->post(local_tag,ev)) { - if(!local_tag.empty() || req.method == "CANCEL"){ + delete ev; + + AmSipDialog::reply_error(req,481, + "Call leg/Transaction does not exist"); + } - AmSipDialog::reply_error(req,481, - "Call leg/Transaction does not exist"); + return; + } - } else { + if(ev_disp->postSipRequest(callid, remote_tag, req)){ + + return; + } + + DBG("method: `%s' [%zd].\n", req.method.c_str(), req.method.length()); + + if((req.method == "INVITE")){ + + AmSessionContainer::instance()->startSessionUAS(req); + } + else if(req.method == "OPTIONS"){ + + // Basic OPTIONS support + AmSipDialog::reply_error(req,200,"OK"); + return; + } + else { - ERROR("sorry, we don't support beginning a new session with " - "a '%s' message\n", req.method.c_str()); + AmSessionFactory* sess_fact = AmPlugIn::instance()->findSessionFactory(req); + if(!sess_fact){ - AmSipDialog::reply_error(req,501,"Not Implemented"); - return; + AmSipDialog::reply_error(req,500,"Not implemented"); + return; } - } } } diff --git a/core/AmSipDispatcher.h b/core/AmSipDispatcher.h index d5cac0d6..3c848f9a 100644 --- a/core/AmSipDispatcher.h +++ b/core/AmSipDispatcher.h @@ -1,30 +1,43 @@ +/* + * $Id: $ + * + * Copyright (C) 2008 Raphael Coeffic + * + * This file is part of sems, a free SIP media server. + * + * sems is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * For a license to use the ser software under conditions + * other than those described here, or to purchase support for this + * software, please contact iptel.org by e-mail at the following addresses: + * info@iptel.org + * + * sems is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ #ifndef __AMSIPDISPATCHER_H__ #define __AMSIPDISPATCHER_H__ -#include - #include "AmSipMsg.h" -#include "AmApi.h" - -using std::vector; class AmSipDispatcher { private: static AmSipDispatcher *_instance; - vector reply_handlers; public: void handleSipMsg(AmSipRequest &); void handleSipMsg(AmSipReply &); - /** register a reply handler for incoming replies pertaining - * to a dialog without a session (not in SessionContainer) */ - void registerReplyHandler(AmSIPEventHandler* eh) - { - reply_handlers.push_back(eh); - } - static AmSipDispatcher* instance(); }; diff --git a/core/plug-in/session_timer/SessionTimer.cpp b/core/plug-in/session_timer/SessionTimer.cpp index 02d26a25..99b1f484 100644 --- a/core/plug-in/session_timer/SessionTimer.cpp +++ b/core/plug-in/session_timer/SessionTimer.cpp @@ -60,6 +60,7 @@ SessionTimer::SessionTimer(AmSession* s) bool SessionTimer::process(AmEvent* ev) { + assert(ev); AmTimeoutEvent* timeout_ev = dynamic_cast(ev); if (timeout_ev) { DBG("received timeout Event with ID %d\n", timeout_ev->data.get(0).asInt()); diff --git a/core/plug-in/stats/StatsUDPServer.cpp b/core/plug-in/stats/StatsUDPServer.cpp index f3a5af6f..a09d2770 100644 --- a/core/plug-in/stats/StatsUDPServer.cpp +++ b/core/plug-in/stats/StatsUDPServer.cpp @@ -243,7 +243,7 @@ int StatsUDPServer::execute(char* msg_buf, string& reply, msg_get_param(msg_c,cmd_str,buffer,CTRL_MSGBUF_SIZE); if(cmd_str == "calls") - reply = "Active calls: " + int2str(sc->getSize()) + "\n"; + reply = "Active calls: " + int2str(AmSession::getSessionNum()) + "\n"; else if (cmd_str == "which") { reply = "calls - number of active calls (Session Container size)\n" diff --git a/core/sems.cpp b/core/sems.cpp index e14a2dbb..2f6f49b4 100644 --- a/core/sems.cpp +++ b/core/sems.cpp @@ -474,7 +474,6 @@ static void getInterfaceList(int sd, std::vector >& if_ static string getLocalIP(const string& dev_name) { - //DBG("getLocalIP(%s)\n", dev_name.c_str()); #ifdef SUPPORT_IPV6 struct sockaddr_storage ss;