diff --git a/core/AmSessionProcessor.cpp b/core/AmSessionProcessor.cpp new file mode 100644 index 00000000..dc71cbb2 --- /dev/null +++ b/core/AmSessionProcessor.cpp @@ -0,0 +1,176 @@ +/* + * $Id: AmSessionProcessor.cpp 1585 2009-10-28 22:31:08Z sayer $ + * + * Copyright (C) 2010 Stefan Sayer + * + * This file is part of SEMS, a free SIP media server. + * + * SEMS is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * For a license to use the SEMS software under conditions + * other than those described here, or to purchase support for this + * software, please contact iptel.org by e-mail at the following addresses: + * info@iptel.org + * + * SEMS is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifdef SESSION_THREADPOOL + +#include "AmSessionProcessor.h" +#include "AmSession.h" + +#include +#include + +vector AmSessionProcessor::threads; +AmMutex AmSessionProcessor::threads_mut; + +vector::iterator +AmSessionProcessor::threads_it = AmSessionProcessor::threads.begin(); + +AmSessionProcessorThread* AmSessionProcessor::getProcessorThread() { + threads_mut.lock(); + if (!threads.size()) { + ERROR("requesting Session processing thread but none available\n"); + threads_mut.unlock(); + return NULL; + } + + // round robin + if (threads_it == threads.end()) + threads_it = threads.begin(); + + AmSessionProcessorThread* res = *threads_it; + threads_it++; + threads_mut.unlock(); + return res; +} + +void AmSessionProcessor::addThreads(unsigned int num_threads) { + DBG("starting %zd session processor threads\n", num_threads); + threads_mut.lock(); + for (unsigned int i=0; i < num_threads;i++) { + threads.push_back(new AmSessionProcessorThread()); + threads.back()->start(); + } + threads_it = threads.begin(); + DBG("now %zd session processor threads running\n", threads.size()); + threads_mut.unlock(); +} + + +AmSessionProcessorThread::AmSessionProcessorThread() + : events(this), runcond(false) +{ +} + +AmSessionProcessorThread::~AmSessionProcessorThread() { +} + +void AmSessionProcessorThread::notify(AmEventQueue* sender) { + process_sessions_mut.lock(); + runcond.set(true); + process_sessions.insert(sender); + process_sessions_mut.unlock(); +} + +void AmSessionProcessorThread::run() { + + stop_requested = false; + while(!stop_requested.get()){ + + runcond.wait_for(); + + process_sessions_mut.lock(); + runcond.set(false); + // get the list of session s that need processing + std::set pending_process_sessions + = process_sessions; + process_sessions.clear(); + process_sessions_mut.unlock(); + + events.processEvents(); + + // startup all new sessions + if (startup_sessions.size()) { + DBG("starting up %zd sessions\n", startup_sessions.size()); + + for (std::vector::iterator it= + startup_sessions.begin(); + it != startup_sessions.end(); it++) { + + if ((*it)->startup()) + sessions.push_back(*it); // startup successful + } + + startup_sessions.clear(); + } + + std::vector fin_sessions; + + std::list::iterator it=sessions.begin(); + while (it != sessions.end()) { + if ((pending_process_sessions.find(*it)!= + pending_process_sessions.end()) && + (!(*it)->processingCycle())) { + fin_sessions.push_back(*it); + std::list::iterator d_it = it; + it++; + sessions.erase(d_it); + } else { + it++; + } + } + + if (fin_sessions.size()) { + DBG("finalizing %zd sessions\n", fin_sessions.size()); + for (std::vector::iterator it=fin_sessions.begin(); + it != fin_sessions.end(); it++) { + (*it)->finalize(); + } + } + } +} + +void AmSessionProcessorThread::on_stop() { + INFO("requesting session to stop.\n"); + stop_requested.set(true); +} + +// AmEventHandler interface +void AmSessionProcessorThread::process(AmEvent* e) { + AmSessionProcessorThreadAddEvent* add_ev = + dynamic_cast(e); + + if (NULL==add_ev) { + ERROR("received wrong event in AmSessionProcessorThread\n"); + return; + } + + DBG("scheduling session [%p] for startup\n", add_ev->s); + startup_sessions.push_back(add_ev->s); +} + +void AmSessionProcessorThread::startSession(AmSession* s) { + // register us to be notified if some event comes to the session + s->setEventNotificationSink(this); + + // add this to be scheduled + events.postEvent(new AmSessionProcessorThreadAddEvent(s)); + + // wakeup the thread + runcond.set(true); +} + +#endif diff --git a/core/AmSessionProcessor.h b/core/AmSessionProcessor.h new file mode 100644 index 00000000..3948b9c0 --- /dev/null +++ b/core/AmSessionProcessor.h @@ -0,0 +1,94 @@ +/* + * $Id: AmSessionProcessor.h 1585 2009-10-28 22:31:08Z sayer $ + * + * Copyright (C) 2010 Stefan Sayer + * + * This file is part of SEMS, a free SIP media server. + * + * SEMS is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * For a license to use the SEMS software under conditions + * other than those described here, or to purchase support for this + * software, please contact iptel.org by e-mail at the following addresses: + * info@iptel.org + * + * SEMS is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifdef SESSION_THREADPOOL + +#ifndef _AmSessionProcessor_h_ +#define _AmSessionProcessor_h_ + +#include "AmThread.h" +#include "AmEventQueue.h" + +#include +#include +#include +class AmSessionProcessorThread; +class AmSession; + +class AmSessionProcessor { + static vector threads; + static AmMutex threads_mut; + static vector::iterator + threads_it; + + public: + static AmSessionProcessorThread* getProcessorThread(); + static void addThreads(unsigned int num_threads); +}; + +struct AmSessionProcessorThreadAddEvent + : AmEvent +{ + AmSession* s; + AmSessionProcessorThreadAddEvent(AmSession* s) + : s(s), AmEvent(120) { } +}; + +class AmSessionProcessorThread +: public AmThread, + public AmEventHandler, + public AmEventNotificationSink +{ + AmEventQueue events; + std::list sessions; + std::vector startup_sessions; + AmSharedVar stop_requested; + + AmCondition runcond; + std::set process_sessions; + AmMutex process_sessions_mut; + + // AmEventHandler interface + void process(AmEvent* e); + + public: + AmSessionProcessorThread(); + ~AmSessionProcessorThread(); + + // AmThread interface + void run(); + void on_stop(); + + // AmEventNotificationSink interface + void notify(AmEventQueue* sender); + + void startSession(AmSession* s); +}; + +#endif // _AmSessionProcessor_h_ + +#endif // #ifdef SESSION_THREADPOOL