renamed to MediaProcessor

git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@94 8eb893ce-cfd4-0310-b710-fb5ebe64c474
sayer/1.4-spce2.6
Stefan Sayer 20 years ago
parent 64f94d8423
commit 183eeca64a

@ -1,5 +1,5 @@
/*
* $Id: AmSessionScheduler.cpp,v 1.1.2.9 2005/08/31 13:54:29 rco Exp $
* $Id: AmMediaProcessor.cpp,v 1.1.2.9 2005/08/31 13:54:29 rco Exp $
*
* Copyright (C) 2002-2003 Fhg Fokus
*
@ -25,14 +25,13 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "AmSessionScheduler.h"
#include <assert.h>
#include <sys/time.h>
#include <signal.h>
/** \brief Request event to the SessionScheduler (remove,...) */
/** \brief Request event to the MediaProcessor (remove,...) */
struct SchedRequest :
public AmEvent
{
@ -44,37 +43,37 @@ struct SchedRequest :
/* session scheduler */
AmSessionScheduler* AmSessionScheduler::_instance;
AmMediaProcessor* AmMediaProcessor::_instance;
AmSessionScheduler::AmSessionScheduler()
AmMediaProcessor::AmMediaProcessor()
{
}
AmSessionScheduler::~AmSessionScheduler()
AmMediaProcessor::~AmMediaProcessor()
{
}
void AmSessionScheduler::init() {
void AmMediaProcessor::init() {
// start the threads
num_threads = AmConfig::SessionSchedulerThreads;
num_threads = AmConfig::MediaProcessorThreads;
assert(num_threads > 0);
DBG("Starting %u SessionSchedulerThreads.\n", num_threads);
threads = new AmSessionSchedulerThread*[num_threads];
DBG("Starting %u MediaProcessorThreads.\n", num_threads);
threads = new AmMediaProcessorThread*[num_threads];
for (unsigned int i=0;i<num_threads;i++) {
threads[i] = new AmSessionSchedulerThread();
threads[i] = new AmMediaProcessorThread();
threads[i]->start();
}
}
AmSessionScheduler* AmSessionScheduler::instance()
AmMediaProcessor* AmMediaProcessor::instance()
{
if(!_instance)
_instance = new AmSessionScheduler();
_instance = new AmMediaProcessor();
return _instance;
}
void AmSessionScheduler::addSession(AmSession* s,
void AmMediaProcessor::addSession(AmSession* s,
const string& callgroup)
{
s->detached.set(false);
@ -113,9 +112,9 @@ void AmSessionScheduler::addSession(AmSession* s,
postRequest(new SchedRequest(InsertSession,s));
}
void AmSessionScheduler::removeSession(AmSession* s)
void AmMediaProcessor::removeSession(AmSession* s)
{
DBG("AmSessionScheduler::removeSession\n");
DBG("AmMediaProcessor::removeSession\n");
group_mut.lock();
// get scheduler
string callgroup = session2callgroup[s];
@ -146,19 +145,19 @@ void AmSessionScheduler::removeSession(AmSession* s)
/* the actual session scheduler thread */
AmSessionSchedulerThread::AmSessionSchedulerThread()
AmMediaProcessorThread::AmMediaProcessorThread()
: events(this)
{
}
AmSessionSchedulerThread::~AmSessionSchedulerThread()
AmMediaProcessorThread::~AmMediaProcessorThread()
{
}
void AmSessionSchedulerThread::on_stop()
void AmMediaProcessorThread::on_stop()
{
}
void AmSessionSchedulerThread::run()
void AmMediaProcessorThread::run()
{
struct timeval now,next_tick,diff,tick;
unsigned int ts = 0;
@ -197,7 +196,7 @@ void AmSessionSchedulerThread::run()
/**
* process pending DTMF events
*/
void AmSessionSchedulerThread::processDtmfEvents()
void AmMediaProcessorThread::processDtmfEvents()
{
for(set<AmSession*>::iterator it = sessions.begin();
it != sessions.end(); it++)
@ -207,7 +206,7 @@ void AmSessionSchedulerThread::processDtmfEvents()
}
}
void AmSessionSchedulerThread::processAudio(unsigned int ts)
void AmMediaProcessorThread::processAudio(unsigned int ts)
{
for(set<AmSession*>::iterator it = sessions.begin();
it != sessions.end(); it++){
@ -232,7 +231,7 @@ void AmSessionSchedulerThread::processAudio(unsigned int ts)
case RTP_BUFFER_SIZE:
default:
ERROR("AmRtpAudio::receive() returned %i\n",ret);
postRequest(new SchedRequest(AmSessionScheduler::RemoveSession,s));
postRequest(new SchedRequest(AmMediaProcessor::RemoveSession,s));
break;
}
}
@ -244,7 +243,7 @@ void AmSessionSchedulerThread::processAudio(unsigned int ts)
int ret = input->put(ts,buffer,size);
if(ret < 0){
DBG("input->put() returned: %i\n",ret);
postRequest(new SchedRequest(AmSessionScheduler::RemoveSession,s));
postRequest(new SchedRequest(AmMediaProcessor::RemoveSession,s));
}
}
if (s->isDtmfDetectionEnabled())
@ -266,12 +265,12 @@ void AmSessionSchedulerThread::processAudio(unsigned int ts)
int size = output->get(ts,buffer,s->rtp_str.getFrameSize());
if(size <= 0){
DBG("output->get() returned: %i\n",size);
postRequest(new SchedRequest(AmSessionScheduler::RemoveSession,s)); //removeSession(s);
postRequest(new SchedRequest(AmMediaProcessor::RemoveSession,s)); //removeSession(s);
}
else if(!s->rtp_str.mute){
if(s->rtp_str.put(ts,buffer,size)<0)
postRequest(new SchedRequest(AmSessionScheduler::RemoveSession,s));
postRequest(new SchedRequest(AmMediaProcessor::RemoveSession,s));
// removeSession(s);
}
}
@ -279,22 +278,22 @@ void AmSessionSchedulerThread::processAudio(unsigned int ts)
}
}
void AmSessionSchedulerThread::process(AmEvent* e)
void AmMediaProcessorThread::process(AmEvent* e)
{
SchedRequest* sr = dynamic_cast<SchedRequest*>(e);
if(!sr){
ERROR("AmSessionSchedulerThread::process: wrong event type\n");
ERROR("AmMediaProcessorThread::process: wrong event type\n");
return;
}
switch(sr->event_id){
case AmSessionScheduler::InsertSession:
case AmMediaProcessor::InsertSession:
DBG("Session inserted to the scheduler\n");
sessions.insert(sr->s);
break;
case AmSessionScheduler::RemoveSession:{
case AmMediaProcessor::RemoveSession:{
AmSession* s = sr->s;
set<AmSession*>::iterator s_it = sessions.find(s);
if(s_it != sessions.end()){
@ -307,16 +306,16 @@ void AmSessionSchedulerThread::process(AmEvent* e)
break;
default:
ERROR("AmSessionSchedulerThread::process: unknown event id.");
ERROR("AmMediaProcessorThread::process: unknown event id.");
break;
}
}
unsigned int AmSessionSchedulerThread::getLoad() {
unsigned int AmMediaProcessorThread::getLoad() {
// lock ?
return sessions.size();
}
inline void AmSessionSchedulerThread::postRequest(SchedRequest* sr) {
inline void AmMediaProcessorThread::postRequest(SchedRequest* sr) {
events.postEvent(sr);
}

@ -1,5 +1,5 @@
/*
* $Id: AmSessionScheduler.h,v 1.1.2.4 2005/06/01 12:00:24 rco Exp $
* $Id: AmMediaProcessor.h,v 1.1.2.4 2005/06/01 12:00:24 rco Exp $
*
* Copyright (C) 2002-2003 Fhg Fokus
*
@ -25,8 +25,8 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef _AmSessionScheduler_h_
#define _AmSessionScheduler_h_
#ifndef _AmMediaProcessor_h_
#define _AmMediaProcessor_h_
#include "AmSession.h"
#include "AmEventQueue.h"
@ -46,7 +46,7 @@ struct SchedRequest;
* It processes the media and triggers the sending of RTP
* of all sessions added to it.
*/
class AmSessionSchedulerThread :
class AmMediaProcessorThread :
public AmThread,
public AmEventHandler
{
@ -67,8 +67,8 @@ class AmSessionSchedulerThread :
// AmEventHandler interface
void process(AmEvent* e);
public:
AmSessionSchedulerThread();
~AmSessionSchedulerThread();
AmMediaProcessorThread();
~AmMediaProcessorThread();
inline void postRequest(SchedRequest* sr);
@ -79,29 +79,29 @@ public:
* \brief Media processing thread manager
*
* This class implements the manager that assigns and removes
* the Sessions to the various \ref SessionSchedulerThreads,
* the Sessions to the various \ref MediaProcessorThreads,
* according to their call group. This class contains the API
* for the SessionScheduler.
* for the MediaProcessor.
*/
class AmSessionScheduler
class AmMediaProcessor
{
static AmSessionScheduler* _instance;
static AmMediaProcessor* _instance;
unsigned int num_threads;
AmSessionSchedulerThread** threads;
AmMediaProcessorThread** threads;
map<string, unsigned int> callgroup2thread;
multimap<string, AmSession*> callgroupmembers;
map<AmSession*, string> session2callgroup;
AmMutex group_mut;
AmSessionScheduler();
~AmSessionScheduler();
AmMediaProcessor();
~AmMediaProcessor();
public:
enum { InsertSession, RemoveSession };
static AmSessionScheduler* instance();
static AmMediaProcessor* instance();
void init();
void addSession(AmSession* s, const string& callgroup);

Loading…
Cancel
Save