o early media receiving support

o local audio input support
o posting conference event support
o explicit monitor_rtp_timeout set


git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@505 8eb893ce-cfd4-0310-b710-fb5ebe64c474
sayer/1.4-spce2.6
Stefan Sayer 18 years ago
parent a5e8881084
commit 061dcad669

@ -420,7 +420,10 @@ unsigned int AmAudio::downMix(unsigned int size)
unsigned int AmAudio::getFrameSize()
{
assert(fmt.get());
if (!fmt.get())
fmt.reset(new AmAudioSimpleFormat(CODEC_PCM16));
return fmt->frame_size;
}

@ -67,6 +67,27 @@ AmConferenceChannel* AmConferenceStatus::getChannel(const string& cid,
return ch;
}
void AmConferenceStatus::postConferenceEvent(const string& cid,
int event_id, const string& sess_id) {
AmConferenceStatus* st = 0;
cid2s_mut.lock();
map<string,AmConferenceStatus*>::iterator it = cid2status.find(cid);
if(it != cid2status.end()){
st = it->second;
}
else {
st = new AmConferenceStatus(cid);
cid2status[cid] = st;
}
st->postConferenceEvent(event_id, sess_id);
cid2s_mut.unlock();
}
void AmConferenceStatus::releaseChannel(const string& cid, unsigned int ch_id)
{
cid2s_mut.lock();
@ -100,6 +121,21 @@ AmConferenceStatus::~AmConferenceStatus()
DBG("AmConferenceStatus::~AmConferenceStatus(): conf_id = %s\n",conf_id.c_str());
}
void AmConferenceStatus::postConferenceEvent(int event_id, const string& sess_id)
{
sessions_mut.lock();
int participants = sessions.size();
for(map<string, unsigned int>::iterator it = sessions.begin();
it != sessions.end(); it++){
AmSessionContainer::instance()->postEvent(
it->first,
new ConferenceEvent(event_id,
participants,conf_id,sess_id)
);
}
sessions_mut.unlock();
}
AmConferenceChannel* AmConferenceStatus::getChannel(const string& sess_id)
{
AmConferenceChannel* ch = 0;
@ -107,24 +143,18 @@ AmConferenceChannel* AmConferenceStatus::getChannel(const string& sess_id)
sessions_mut.lock();
map<string, unsigned int>::iterator it = sessions.find(sess_id);
if(it != sessions.end()){
ch = new AmConferenceChannel(this,it->second,false);
}
else {
ch = new AmConferenceChannel(this,it->second,false);
} else {
if(!sessions.empty()){
int participants = sessions.size()+1;
for(it = sessions.begin(); it != sessions.end(); it++){
AmSessionContainer::instance()->postEvent(
it->first,
new ConferenceEvent(ConfNewParticipant,
participants,conf_id,sess_id)
);
}
}
else {
} else {
// The First participant gets its own NewParticipant message
AmSessionContainer::instance()->postEvent(
sess_id, new ConferenceEvent(ConfNewParticipant,1,

@ -101,6 +101,8 @@ class AmConferenceStatus
int releaseChannel(unsigned int ch_id);
void postConferenceEvent(int event_id, const string& sess_id);
public:
const string& getConfID() { return conf_id; }
AmMultiPartyMixer* getMixer() { return &mixer; }
@ -109,6 +111,9 @@ public:
const string& local_tag);
static void releaseChannel(const string& cid, unsigned int ch_id);
static void postConferenceEvent(const string& cid, int event_id,
const string& sess_id);
};
#endif

@ -232,45 +232,66 @@ void AmMediaProcessorThread::processAudio(unsigned int ts)
it != sessions.end(); it++){
AmSession* s = (*it);
if (s->rtp_str.checkInterval(ts) &&
(s->rtp_str.receiving || s->rtp_str.getPassiveMode())) {
// todo: get frame size/checkInterval from local audio if local in+out (?)
unsigned int f_size = s->rtp_str.getFrameSize();
// complete frame time reached?
if (s->rtp_str.checkInterval(ts, f_size)) {
s->lockAudio();
AmAudio* input = s->getInput();
int ret = s->rtp_str.receive(ts);
if(ret < 0){
switch(ret){
case RTP_DTMF:
case RTP_UNKNOWN_PL:
case RTP_PARSE_ERROR:
break;
case RTP_TIMEOUT:
postRequest(new SchedRequest(AmMediaProcessor::RemoveSession,s));
s->postEvent(new AmRtpTimeoutEvent());
break;
case RTP_BUFFER_SIZE:
default:
ERROR("AmRtpAudio::receive() returned %i\n",ret);
postRequest(new SchedRequest(AmMediaProcessor::ClearSession,s));
break;
int got_audio = -1;
// get/receive audio
if (!s->getAudioLocal(AM_AUDIO_IN)) {
// input is not local - receive from rtp stream
if (s->rtp_str.receiving || s->rtp_str.getPassiveMode()) {
int ret = s->rtp_str.receive(ts);
if(ret < 0){
switch(ret){
case RTP_DTMF:
case RTP_UNKNOWN_PL:
case RTP_PARSE_ERROR:
break;
case RTP_TIMEOUT:
postRequest(new SchedRequest(AmMediaProcessor::RemoveSession,s));
s->postEvent(new AmRtpTimeoutEvent());
break;
case RTP_BUFFER_SIZE:
default:
ERROR("AmRtpAudio::receive() returned %i\n",ret);
postRequest(new SchedRequest(AmMediaProcessor::ClearSession,s));
break;
}
} else {
got_audio = s->rtp_str.get(ts,buffer,f_size);
if (s->isDtmfDetectionEnabled() && got_audio > 0)
s->putDtmfAudio(buffer, got_audio, ts);
}
}
} else {
// input is local - get audio from local_in
AmAudio* local_input = s->getLocalInput();
if (local_input) {
got_audio = local_input->get(ts,buffer,f_size);
}
}
else {
unsigned int f_size = s->rtp_str.getFrameSize();
int size = s->rtp_str.get(ts,buffer,f_size);
// process received audio
if (got_audio >= 0) {
AmAudio* input = s->getInput();
if (input) {
int ret = input->put(ts,buffer,size);
int ret = input->put(ts,buffer,got_audio);
if(ret < 0){
DBG("input->put() returned: %i\n",ret);
postRequest(new SchedRequest(AmMediaProcessor::ClearSession,s));
}
}
if (s->isDtmfDetectionEnabled())
s->putDtmfAudio(buffer, size, ts);
}
s->unlockAudio();
}
}
@ -290,10 +311,22 @@ void AmMediaProcessorThread::processAudio(unsigned int ts)
DBG("output->get() returned: %i\n",size);
postRequest(new SchedRequest(AmMediaProcessor::ClearSession,s));
}
else if(!s->rtp_str.mute){
if(s->rtp_str.put(ts,buffer,size)<0)
postRequest(new SchedRequest(AmMediaProcessor::ClearSession,s));
else {
if (!s->getAudioLocal(AM_AUDIO_OUT)) {
// audio should go to RTP
if(!s->rtp_str.mute){
if(s->rtp_str.put(ts,buffer,size)<0)
postRequest(new SchedRequest(AmMediaProcessor::ClearSession,s));
}
} else {
// output is local - audio should go in local_out
AmAudio* local_output = s->getLocalOutput();
if (local_output) {
if (local_output->put(ts,buffer,size)) {
postRequest(new SchedRequest(AmMediaProcessor::ClearSession,s));
}
}
}
}
}
s->unlockAudio();

@ -39,7 +39,7 @@ AmRtpAudio::AmRtpAudio(AmSession* _s)
{
}
bool AmRtpAudio::checkInterval(unsigned int ts)
bool AmRtpAudio::checkInterval(unsigned int ts, unsigned int frame_size)
{
if(!last_check_i){
send_int = true;
@ -47,7 +47,7 @@ bool AmRtpAudio::checkInterval(unsigned int ts)
last_check = ts;
}
else {
if((ts - last_check) >= getFrameSize()){
if((ts - last_check) >= frame_size){
send_int = true;
last_check = ts;
}

@ -85,7 +85,7 @@ class AmRtpAudio: public AmRtpStream, public AmAudio, public AmPLCBuffer
public:
AmRtpAudio(AmSession* _s=0);
bool checkInterval(unsigned int ts);
bool checkInterval(unsigned int ts, unsigned int frame_size);
bool sendIntReached();
int setCurrentPayload(int payload);

@ -271,7 +271,8 @@ AmRtpStream::AmRtpStream(AmSession* _s)
telephone_event_pt(NULL),
mute(false),
hold(false),
receiving(true)
receiving(true),
monitor_rtp_timeout(true)
{
@ -423,7 +424,8 @@ int AmRtpStream::nextPacket(AmRtpPacket& p)
receive_mut.lock();
timersub(&now,&last_recv_time,&diff);
if(AmConfig::DeadRtpTime &&
if(monitor_rtp_timeout &&
AmConfig::DeadRtpTime &&
(diff.tv_sec > 0) &&
((unsigned int)diff.tv_sec > AmConfig::DeadRtpTime)){
WARN("RTP Timeout detected. Last received packet is too old.\n");

@ -137,8 +137,13 @@ public:
/** Mute */
bool mute;
/** mute && port == 0 */
bool hold;
/** marker flag */
bool begin_talk;
/** do check rtp timeout */
bool monitor_rtp_timeout;
/** should we receive packets? if not -> drop */
bool receiving;
@ -225,6 +230,11 @@ public:
/** get the RTP stream on hold */
bool getOnHold();
/** setter for monitor_rtp_timeout */
void setMonitorRTPTimeout(bool m) { monitor_rtp_timeout = m; }
/** getter for monitor_rtp_timeout */
bool getMonitorRTPTimeout() { return monitor_rtp_timeout; }
/**
* Report an ICMP error.
*/

@ -46,7 +46,6 @@
#include <assert.h>
#include <sys/time.h>
// AmSessionEventHandler methods
bool AmSessionEventHandler::process(AmEvent*)
@ -124,10 +123,13 @@ AmSession::AmSession()
dlg(this),
detached(true),
sess_stopped(false),rtp_str(this),negotiate_onreply(false),
input(0), output(0),
input(0), output(0), local_input(0), local_output(0),
m_dtmfDetector(this), m_dtmfEventQueue(&m_dtmfDetector),
m_dtmfDetectionEnabled(true)
m_dtmfDetectionEnabled(true),
accept_early_session(false)
{
use_local_audio[AM_AUDIO_IN] = false;
use_local_audio[AM_AUDIO_OUT] = false;
}
AmSession::~AmSession()
@ -175,6 +177,39 @@ void AmSession::setInOut(AmAudio* in,AmAudio* out)
unlockAudio();
}
void AmSession::setLocalInput(AmAudio* in)
{
lockAudio();
local_input = in;
unlockAudio();
}
void AmSession::setLocalOutput(AmAudio* out)
{
lockAudio();
local_output = out;
unlockAudio();
}
void AmSession::setLocalInOut(AmAudio* in,AmAudio* out)
{
lockAudio();
local_input = in;
local_output = out;
unlockAudio();
}
void AmSession::setAudioLocal(unsigned int dir,
bool local) {
assert(dir<2);
use_local_audio[dir] = local;
}
bool AmSession::getAudioLocal(unsigned int dir) {
assert(dir<2);
return use_local_audio[dir];
}
void AmSession::lockAudio()
{
audio_mut.lock();
@ -413,6 +448,14 @@ void AmSession::clearAudio()
output->close();
output = 0;
}
if(local_input){
local_input->close();
local_input = 0;
}
if(local_output){
local_output->close();
local_output = 0;
}
unlockAudio();
DBG("Audio cleared !!!\n");
postEvent(new AmAudioEvent(AmAudioEvent::cleared));
@ -488,7 +531,7 @@ void AmSession::onSipRequest(const AmSipRequest& req)
onSessionStart(req);
if(input || output)
if(input || output || local_input || local_output)
AmMediaProcessor::instance()->addSession(this, callgroup);
else {
DBG("no audio input and output set. "
@ -544,13 +587,15 @@ void AmSession::onSipReply(const AmSipReply& reply)
case AmSipDialog::Connected:
try {
rtp_str.setMonitorRTPTimeout(true);
acceptAudio(reply.body,reply.hdrs);
if(detached.get() && !getStopped()){
if(!getStopped()){
onSessionStart(reply);
if(input || output)
if(input || output || local_input || local_output)
AmMediaProcessor::instance()->addSession(this,
callgroup);
else {
@ -571,8 +616,36 @@ void AmSession::onSipReply(const AmSipReply& reply)
case AmSipDialog::Pending:
switch(reply.code){
case 180: break;//TODO: local ring tone.
case 183: break;//TODO: remote ring tone.
case 180: {
onRinging(reply);
rtp_str.setMonitorRTPTimeout(false);
if(input || output || local_input || local_output)
AmMediaProcessor::instance()->addSession(this,
callgroup);
} break;
case 183: {
if (accept_early_session) {
try {
setMute(true);
acceptAudio(reply.body,reply.hdrs);
onEarlySessionStart(reply);
rtp_str.setMonitorRTPTimeout(false);
if(input || output || local_input || local_output)
AmMediaProcessor::instance()->addSession(this,
callgroup);
} catch(const AmSession::Exception& e){
ERROR("%i %s\n",e.code,e.reason.c_str());
} // exceptions are not critical here
}
} break;
default: break;// continue waiting.
}
}
@ -643,11 +716,6 @@ int AmSession::acceptAudio(const string& body,
catch(const AmSession::Exception& e){
ERROR("%i %s\n",e.code,e.reason.c_str());
throw;
// if(dlg.reply(req,e.code,e.reason, "")){
// dlg.bye();
// }
// setStopped();
}
return -1;

@ -96,8 +96,11 @@ public:
#define AM_AUDIO_IN 0
#define AM_AUDIO_OUT 1
/**
* \brief Implements the behavior of one session
* \brief Implements the default behavior of one session
*
* The session is identified by Call-ID, From-Tag and To-Tag.
*/
@ -107,9 +110,16 @@ class AmSession : public AmThread,
public AmSipDialogEventHandler
{
AmMutex audio_mut;
// remote (to/from RTP) audio inout
AmAudio* input;
AmAudio* output;
// local (to/from audio dev) audio inout
AmAudio* local_input;
AmAudio* local_output;
bool use_local_audio[2];
vector<SdpPayload *> m_payloads;
bool negotiate_onreply;
@ -141,6 +151,9 @@ protected:
- by default local tag */
string callgroup;
/** do accept early session? */
bool accept_early_session;
public:
AmSipDialog dlg;
@ -227,6 +240,32 @@ public:
void setOutput(AmAudio* out);
void setInOut(AmAudio* in, AmAudio* out);
/**
* Local audio input getter .
* Note: audio must be locked!
*/
AmAudio* getLocalInput() { return local_input; }
/**
* Local audio output getter.
* Note: audio must be locked!
*/
AmAudio* getLocalOutput(){ return local_output;}
/**
* Local audio input & output set methods.
* Note: audio will be locked by the methods.
*/
void setLocalInput(AmAudio* in);
void setLocalOutput(AmAudio* out);
void setLocalInOut(AmAudio* in, AmAudio* out);
/** this switches between local and remote
* audio inout
*/
void setAudioLocal(unsigned int dir, bool local);
bool getAudioLocal(unsigned int dir);
/**
* Clears input & ouput (no need to lock)
*/
@ -361,6 +400,20 @@ public:
*/
virtual void onSessionStart(const AmSipReply& reply){}
/**
* onEarlySessionStart will be called after
* 183 early media reply is received and early session
* is setup, if accept_early_session is set.
*/
virtual void onEarlySessionStart(const AmSipReply& reply){}
/**
* onRinging will be called after 180 is received.
* If local audio is set up, session is added to scheduler.
*/
virtual void onRinging(const AmSipReply& reply){}
/**
* @see AmDialogState
*/

@ -1,4 +1,11 @@
Changelog for SEMS
- support for posting events into conferences
- support for receiving early media
- support for local audio as audio sources into audio engine
on the same channel as RTP
- outbound_proxy option sets next hop on outgoing dialogs and
registrations

Loading…
Cancel
Save