diff --git a/core/AmSessionContainer.cpp b/core/AmSessionContainer.cpp index 301c6172..1149d054 100644 --- a/core/AmSessionContainer.cpp +++ b/core/AmSessionContainer.cpp @@ -44,8 +44,13 @@ AmSessionContainer* AmSessionContainer::_instance=NULL; _MONITORING_DECLARE_INTERFACE(AmSessionContainer); AmSessionContainer::AmSessionContainer() - : _run_cond(false), _container_closed(false), enable_unclean_shutdown(false), - CPSLimit(0), CPSHardLimit(0), max_cps(0), cps_samplerate(5) + : _run_cond(false), + _container_closed(false), + enable_unclean_shutdown(false), + CPSLimit(0), + CPSHardLimit(0), + max_cps(0), + cps_samplerate(5) { } @@ -57,14 +62,14 @@ AmSessionContainer* AmSessionContainer::instance() return _instance; } -void AmSessionContainer::dispose() +void AmSessionContainer::dispose() { - if(_instance != NULL) { - if(!_instance->is_stopped()) { + if (_instance != NULL) { + if (!_instance->is_stopped()) { _instance->stop(); while (!_instance->is_stopped()) - usleep(10000); + usleep(10000); } // todo: add locking here delete _instance; @@ -72,57 +77,61 @@ void AmSessionContainer::dispose() } } -bool AmSessionContainer::clean_sessions() { +bool AmSessionContainer::clean_sessions() +{ ds_mut.lock(); DBG("Session cleaner starting its work\n"); try { SessionQueue n_sessions; - - while(!d_sessions.empty()){ - + + while (!d_sessions.empty()) + { AmSession* cur_session = d_sessions.front(); d_sessions.pop(); ds_mut.unlock(); - - if(cur_session->is_stopped() && !cur_session->isProcessingMedia()){ - - MONITORING_MARK_FINISHED(cur_session->getLocalTag().c_str()); - DBG("session [%p] has been destroyed\n",(void*)cur_session->_pid); - delete cur_session; + if (cur_session->is_stopped() && !cur_session->isProcessingMedia()) { + MONITORING_MARK_FINISHED(cur_session->getLocalTag().c_str()); + + DBG("session [%p] has been destroyed\n",(void*)cur_session->_pid); + delete cur_session; } else { - DBG("session [%p] still running\n",(void*)cur_session->_pid); - n_sessions.push(cur_session); + DBG("session [%p] still running\n",(void*)cur_session->_pid); + n_sessions.push(cur_session); } - + ds_mut.lock(); } - + swap(d_sessions,n_sessions); - - }catch(std::exception& e){ + + } + catch (std::exception& e) { ERROR("exception caught in session cleaner: %s\n", e.what()); throw; /* throw again as this is fatal (because unlocking the mutex fails!! */ - }catch(...){ + } + catch(...) { ERROR("unknown exception caught in session cleaner!\n"); throw; /* throw again as this is fatal (because unlocking the mutex fails!! */ } + bool more = !d_sessions.empty(); ds_mut.unlock(); return more; } -void AmSessionContainer::initMonitoring() { +void AmSessionContainer::initMonitoring() +{ _MONITORING_INIT; } void AmSessionContainer::run() { - while(!_container_closed.get()){ - + while (!_container_closed.get()) + { _run_cond.wait_for(); if(_container_closed.get()) @@ -140,15 +149,14 @@ void AmSessionContainer::run() DBG("Session cleaner terminating\n"); } -void AmSessionContainer::broadcastShutdown() { - DBG("brodcasting ServerShutdown system event to %u sessions...\n", - AmSession::getSessionNum()); - AmEventDispatcher::instance()-> - broadcast(new AmSystemEvent(AmSystemEvent::ServerShutdown)); +void AmSessionContainer::broadcastShutdown() +{ + DBG("brodcasting ServerShutdown system event to %u sessions...\n", AmSession::getSessionNum()); + AmEventDispatcher::instance()->broadcast(new AmSystemEvent(AmSystemEvent::ServerShutdown)); } -void AmSessionContainer::on_stop() -{ +void AmSessionContainer::on_stop() +{ _container_closed.set(true); if (enable_unclean_shutdown) { @@ -159,18 +167,21 @@ void AmSessionContainer::on_stop() DBG("waiting for active event queues to stop...\n"); for (unsigned int i=0; - (!AmEventDispatcher::instance()->empty() && - (!AmConfig::MaxShutdownTime || - i < AmConfig::MaxShutdownTime * 1000 / 10));i++) + (!AmEventDispatcher::instance()->empty() && (!AmConfig::MaxShutdownTime || i < AmConfig::MaxShutdownTime * 1000 / 10)); + i++) + { usleep(10000); + } if (!AmEventDispatcher::instance()->empty()) { WARN("Not all calls cleanly ended!\n"); } - + DBG("cleaning sessions...\n"); - while (clean_sessions()) + while (clean_sessions()) + { usleep(10000); + } } _run_cond.set(true); // so that thread stops @@ -178,10 +189,8 @@ void AmSessionContainer::on_stop() void AmSessionContainer::stopAndQueue(AmSession* s) { - - if (AmConfig::LogSessions) { - INFO("session cleaner about to stop %s\n", - s->getLocalTag().c_str()); + if (AmConfig::LogSessions) { + INFO("session cleaner about to stop %s\n", s->getLocalTag().c_str()); } s->stop(); @@ -194,18 +203,18 @@ void AmSessionContainer::stopAndQueue(AmSession* s) void AmSessionContainer::destroySession(AmSession* s) { - AmEventQueueInterface* q = AmEventDispatcher::instance()-> - delEventQueue(s->getLocalTag()); - - if(q) { - stopAndQueue(s); - } - else { - WARN("could not remove session: id not found or wrong type\n"); - } + AmEventQueueInterface* q = AmEventDispatcher::instance()->delEventQueue(s->getLocalTag()); + + if (q) { + stopAndQueue(s); + } + else { + WARN("could not remove session: id not found or wrong type\n"); + } } -string AmSessionContainer::startSessionUAC(const AmSipRequest& req, string& app_name, const AmArg* session_params) +string AmSessionContainer::startSessionUAC(const AmSipRequest& req, string& app_name, + const AmArg* session_params) { try { @@ -345,34 +354,27 @@ void AmSessionContainer::startSessionUAS(AmSipRequest& req) } } - bool AmSessionContainer::postEvent(const string& callid, - const string& remote_tag, - const string& via_branch, - AmEvent* event) + const string& remote_tag, + const string& via_branch, + AmEvent* event) { - bool posted = - AmEventDispatcher::instance()-> - post(callid,remote_tag,via_branch,event); + bool posted = AmEventDispatcher::instance()->post(callid,remote_tag,via_branch,event); - if(!posted) - delete event; + if (!posted) + delete event; - return posted; + return posted; } -bool AmSessionContainer::postEvent(const string& local_tag, - AmEvent* event) +bool AmSessionContainer::postEvent(const string& local_tag, AmEvent* event) { - bool posted = - AmEventDispatcher::instance()-> - post(local_tag,event); - - if(!posted) - delete event; + bool posted = AmEventDispatcher::instance()->post(local_tag,event); - return posted; + if (!posted) + delete event; + return posted; } void AmSessionContainer::setCPSLimit(unsigned int limit) @@ -393,7 +395,8 @@ void AmSessionContainer::setCPSSoftLimit(unsigned int percent) lock_guard lock(cps_mut); - while (cps_queue.size()) { + while (cps_queue.size()) + { timersub(&tv, &cps_queue.front(), &res); if (res.tv_sec >= cps_samplerate) { cps_queue.pop(); @@ -402,8 +405,10 @@ void AmSessionContainer::setCPSSoftLimit(unsigned int percent) break; } } + CPSLimit = (static_cast(percent) / 100) * (static_cast(cps_queue.size()) / cps_samplerate); - if(0 == CPSLimit) CPSLimit = 1; + if (0 == CPSLimit) + CPSLimit = 1; } pair AmSessionContainer::getCPSLimit() @@ -419,7 +424,8 @@ unsigned int AmSessionContainer::getAvgCPS() lock_guard lock(cps_mut); - while (cps_queue.size()) { + while (cps_queue.size()) + { timersub(&tv, &cps_queue.front(), &res); if (res.tv_sec >= cps_samplerate) { cps_queue.pop(); @@ -457,9 +463,11 @@ bool AmSessionContainer::check_and_add_cps(bool emergency_flag) DBG("Emergency call detected, but global policy is to do the CPSLimit check (license).\n"); } - if (emergency_flag) DBG("Emergency call detected, skip CPSLimit check (license).\n"); + if (emergency_flag) + DBG("Emergency call detected, skip CPSLimit check (license).\n"); - while (cps_queue.size()) { + while (cps_queue.size()) + { timersub(&tv, &cps_queue.front(), &res); if (res.tv_sec >= cps_samplerate) { cps_queue.pop(); @@ -485,27 +493,21 @@ bool AmSessionContainer::check_and_add_cps(bool emergency_flag) } AmSession* AmSessionContainer::createSession(const AmSipRequest& req, - string& app_name, - const AmArg* session_params) + string& app_name, + const AmArg* session_params) { if (AmConfig::ShutdownMode) { _run_cond.set(true); // so that thread stops DBG("Shutdown mode. Not creating session.\n"); - AmSipDialog::reply_error(req,AmConfig::ShutdownModeErrCode, - AmConfig::ShutdownModeErrReason); + AmSipDialog::reply_error(req, AmConfig::ShutdownModeErrCode, AmConfig::ShutdownModeErrReason); return NULL; } - if (AmConfig::SessionLimit && - AmConfig::SessionLimit <= AmSession::session_num) { - - DBG("session_limit %d reached. Not creating session.\n", - AmConfig::SessionLimit); - - AmSipDialog::reply_error(req,AmConfig::SessionLimitErrCode, - AmConfig::SessionLimitErrReason); - return NULL; + if (AmConfig::SessionLimit && AmConfig::SessionLimit <= AmSession::session_num) { + DBG("session_limit %d reached. Not creating session.\n", AmConfig::SessionLimit); + AmSipDialog::reply_error(req,AmConfig::SessionLimitErrCode, AmConfig::SessionLimitErrReason); + return NULL; } map app_params; @@ -522,8 +524,7 @@ AmSession* AmSessionContainer::createSession(const AmSipRequest& req, } if (check_and_add_cps(emergency_flag)) { - AmSipDialog::reply_error(req,AmConfig::CPSLimitErrCode, - AmConfig::CPSLimitErrReason); + AmSipDialog::reply_error(req,AmConfig::CPSLimitErrCode, AmConfig::CPSLimitErrReason); return NULL; } @@ -534,11 +535,10 @@ AmSession* AmSessionContainer::createSession(const AmSipRequest& req, session_factory = AmPlugIn::instance()->findSessionFactory(req,app_name); if(!session_factory) { + ERROR("No session factory for application\n"); + AmSipDialog::reply_error(req,500,SIP_REPLY_SERVER_INTERNAL_ERROR); - ERROR("No session factory for application\n"); - AmSipDialog::reply_error(req,500,SIP_REPLY_SERVER_INTERNAL_ERROR); - - return NULL; + return NULL; } AmSession* session = NULL; @@ -556,7 +556,7 @@ AmSession* AmSessionContainer::createSession(const AmSipRequest& req, session = session_factory->onRefer(req, app_name, app_params); } - if(!session) { + if (!session) { // Session creation failed: // application denied session creation // or there was an error. @@ -576,17 +576,18 @@ AmSession* AmSessionContainer::createSession(const AmSipRequest& req, AmSessionContainer::AddSessionStatus AmSessionContainer::addSession(const string& callid, - const string& remote_tag, - const string& local_tag, - const string& via_branch, - AmSession* session) + const string& remote_tag, + const string& local_tag, + const string& via_branch, + AmSession* session) { - if(_container_closed.get()) + if (_container_closed.get()) return ShutDown; - if(AmEventDispatcher::instance()-> - addEventQueue(local_tag,static_cast(session), - callid,remote_tag,via_branch)) { + if (AmEventDispatcher::instance()->addEventQueue(local_tag, + static_cast(session), + callid,remote_tag,via_branch)) + { return Inserted; } @@ -594,14 +595,14 @@ AmSessionContainer::addSession(const string& callid, } AmSessionContainer::AddSessionStatus -AmSessionContainer::addSession(const string& local_tag, - AmSession* session) +AmSessionContainer::addSession(const string& local_tag, AmSession* session) { if(_container_closed.get()) return ShutDown; - if(AmEventDispatcher::instance()-> - addEventQueue(local_tag,static_cast(session))){ + if(AmEventDispatcher::instance()->addEventQueue(local_tag, + static_cast(session))) + { return Inserted; }