applies patch from Andriy which makes adaptive jitter buffer and adaptive playout buffer selectable at run time

git-svn-id: http://svn.berlios.de/svnroot/repos/sems/trunk@254 8eb893ce-cfd4-0310-b710-fb5ebe64c474
sayer/1.4-spce2.6
Stefan Sayer 20 years ago
parent 00e3b5b89f
commit f4d64a3d76

@ -48,11 +48,10 @@ C_FLAGS ?= -Wall -fPIC -g \
-DOS='"$(OS)"' \
# -DSUPPORT_IPV6
ifeq ($(WITH_ADAPTIVE_JITTERBUFFER), yes)
CXX_FLAGS += -DUSE_ADAPTIVE_JB
ifeq ($(DEBUG_PLAYOUT), yes)
CXX_FLAGS += -DDEBUG_PLAYOUTBUF
endif
TARGET =
ifeq ($(OS), linux)
LD_FLAGS += -ldl -rdynamic -lpthread

@ -50,6 +50,7 @@ string ConferenceFactory::LonelyUserFile;
string ConferenceFactory::JoinSound;
string ConferenceFactory::DropSound;
string ConferenceFactory::DialoutSuffix;
PlayoutType ConferenceFactory::m_PlayoutType = ADAPTIVE_PLAYOUT;
int ConferenceFactory::onLoad()
{
@ -70,14 +71,23 @@ int ConferenceFactory::onLoad()
JoinSound = cfg.getParameter("join_sound");
DropSound = cfg.getParameter("drop_sound");
//RingTone = cfg.getParameter("ring_tone");
DialoutSuffix = cfg.getParameter("dialout_suffix");
if(DialoutSuffix.empty()){
WARN("No dialout_suffix has been configured in the conference plug-in:\n");
WARN("\t -> dial out will not be available\n");
}
string playout_type = cfg.getParameter("playout_type");
if (playout_type == "simple") {
m_PlayoutType = SIMPLE_PLAYOUT;
DBG("Using simple (fifo) buffer as playout technique.\n");
} else if (playout_type == "adaptive_jb") {
m_PlayoutType = JB_PLAYOUT;
DBG("Using adaptive jitter buffer as playout technique.\n");
} else {
DBG("Using adaptive playout buffer as playout technique.\n");
}
return 0;
}
@ -110,7 +120,7 @@ ConferenceDialog::ConferenceDialog(const string& conf_id,
allow_dialout(false)
{
dialedout = this->dialout_channel.get() != 0;
// rtp_str.setAdaptivePlayout(true);
rtp_str.setPlayoutType(ConferenceFactory::m_PlayoutType);
}
ConferenceDialog::~ConferenceDialog()

@ -73,7 +73,7 @@ public:
static string JoinSound;
static string DropSound;
static string DialoutSuffix;
//static string RingTone;
static PlayoutType m_PlayoutType;
ConferenceFactory(const string& _app_name);
virtual AmSession* onInvite(const AmSipRequest&);

@ -24,16 +24,25 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "AmRtpStream.h"
#include "AmAudio.h"
#include "AmJitterBuffer.h"
#include "AmRtpPacket.h"
#include "log.h"
#include "SampleArray.h"
bool Packet::operator < (const Packet& p) const
{
return ts_less()(m_packet.timestamp, p.m_packet.timestamp);
return ts_less()(m_ts, p.m_ts);
}
void Packet::init(const ShortSample *data, unsigned int size, unsigned int ts)
{
size = PCM16_S2B(size);
if (size > sizeof(m_data))
size = sizeof(m_data);
m_size = PCM16_B2S(size);
memcpy(m_data, data, size);
m_ts = ts;
}
PacketAllocator::PacketAllocator()
@ -45,13 +54,15 @@ PacketAllocator::PacketAllocator()
m_packets[MAX_JITTER / 80 - 1].m_next = NULL;
}
Packet *PacketAllocator::alloc(const AmRtpPacket *p)
Packet *PacketAllocator::alloc(const ShortSample *data, unsigned int size, unsigned int ts)
{
if (m_free_packets == NULL)
return NULL;
Packet *retval = m_free_packets;
m_free_packets = retval->m_next;
memcpy(&retval->m_packet, p, sizeof(*p));
retval->init(data, size, ts);
retval->m_next = retval->m_prev = NULL;
return retval;
}
@ -63,40 +74,42 @@ void PacketAllocator::free(Packet *p)
m_free_packets = p;
}
AmJitterBuffer::AmJitterBuffer(AmRtpStream *owner)
AmJitterBuffer::AmJitterBuffer()
: m_tsInited(false), m_tsDeltaInited(false), m_delayCount(0),
m_jitter(INITIAL_JITTER), m_owner(owner),
m_jitter(INITIAL_JITTER),
m_tail(NULL), m_head(NULL), m_forceResync(false)
{
}
void AmJitterBuffer::put(const AmRtpPacket *p)
void AmJitterBuffer::put(const ShortSample *data, unsigned int size, unsigned int ts, bool begin_talk)
{
m_mutex.lock();
if (p->marker)
if (begin_talk)
m_forceResync = true;
if (m_tsInited && !m_forceResync && ts_less()(m_lastTs + m_jitter, p->timestamp))
if (m_tsInited && !m_forceResync && ts_less()(m_lastTs + m_jitter, ts))
{
unsigned int delay = p->timestamp - m_lastTs;
unsigned int delay = ts - m_lastTs;
if (delay > m_jitter && m_jitter < MAX_JITTER)
{
m_jitter += (delay - m_jitter) / 2;
if (m_jitter > MAX_JITTER)
m_jitter = MAX_JITTER;
// DBG("Jitter buffer delay increased to %u\n", m_jitter);
#ifdef DEBUG_PLAYOUTBUF
DBG("Jitter buffer delay increased to %u\n", m_jitter);
#endif
}
// Packet arrived too late to be put into buffer
if (ts_less()(p->timestamp + m_jitter, m_lastTs)) {
if (ts_less()(ts + m_jitter, m_lastTs)) {
m_mutex.unlock();
return;
}
}
Packet *elem = m_allocator.alloc(p);
Packet *elem = m_allocator.alloc(data, size, ts);
if (elem == NULL) {
elem = m_head;
m_head = m_head->m_next;
m_head->m_prev = NULL;
memcpy(&elem->m_packet, p, sizeof(*p));
elem->init(data, size, ts);
}
if (m_tail == NULL)
{
@ -124,11 +137,11 @@ void AmJitterBuffer::put(const AmRtpPacket *p)
}
}
if (!m_tsInited) {
m_lastTs = p->timestamp;
m_lastTs = ts;
m_tsInited = true;
}
else if (ts_less()(m_lastTs, p->timestamp) || m_forceResync) {
m_lastTs = p->timestamp;
else if (ts_less()(m_lastTs, ts) || m_forceResync) {
m_lastTs = ts;
}
m_mutex.unlock();
@ -139,7 +152,8 @@ void AmJitterBuffer::put(const AmRtpPacket *p)
* To get all the packets for the single ts the caller must call this
* method with the same ts till the return value will become false.
*/
bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms)
bool AmJitterBuffer::get(unsigned int ts, unsigned int ms, ShortSample *out_buf,
unsigned int *out_size, unsigned int *out_ts)
{
bool retval = true;
@ -152,7 +166,11 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms)
m_tsDelta = m_lastTs - ts + ms;
m_tsDeltaInited = true;
m_lastAudioTs = ts;
m_forceResync = true;
m_forceResync = false;
#ifdef DEBUG_PLAYOUTBUF
DBG("Jitter buffer: initialized tsDelta with %u\n", m_tsDelta);
m_tsDeltaStart = m_tsDelta;
#endif
}
else if (m_lastAudioTs != ts && m_lastResyncTs != m_lastTs) {
if (ts_less()(ts + m_tsDelta, m_lastTs)) {
@ -161,15 +179,20 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms)
* immediate resync required
*/
m_tsDelta += m_lastTs - ts + ms;
// DBG("Jitter buffer resynced forward (-> %u)\n", m_tsDelta);
#ifdef DEBUG_PLAYOUTBUF
DBG("Jitter buffer resynced forward (-> %d rel)\n",
m_tsDelta - m_tsDeltaStart);
#endif
m_delayCount = 0;
}
else if (ts_less()(m_lastTs, ts + m_tsDelta - m_jitter / 2)) {
} else if (ts_less()(m_lastTs, ts + m_tsDelta - m_jitter / 2)) {
/* New packet hasn't arrived yet */
if (m_delayCount > RESYNC_THRESHOLD) {
unsigned int d = m_tsDelta -(m_lastTs - ts + ms);
m_tsDelta -= d / 2;
// DBG("Jitter buffer resynced backward (-> %u)\n", m_tsDelta);
#ifdef DEBUG_PLAYOUTBUF
DBG("Jitter buffer resynced backward (-> %d rel)\n",
m_tsDelta - m_tsDeltaStart);
#endif
}
else
++m_delayCount;
@ -185,7 +208,7 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms)
// DBG("Getting pkt at %u, res ts = %u\n", get_ts / m_frameSize, p.timestamp);
// First of all throw away all too old packets from the head
Packet *tmp;
for (tmp = m_head; tmp && ts_less()(tmp->m_packet.timestamp + m_owner->bytes2samples(tmp->m_packet.getDataSize()), get_ts); )
for (tmp = m_head; tmp && ts_less()(tmp->ts() + tmp->size(), get_ts); )
{
m_head = tmp->m_next;
if (m_head == NULL)
@ -196,7 +219,7 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms)
tmp = m_head;
}
// Get the packet from the head
if (m_head && ts_less()(m_head->m_packet.timestamp, get_ts + ms))
if (m_head && ts_less()(m_head->ts(), get_ts + ms))
{
tmp = m_head;
m_head = tmp->m_next;
@ -204,9 +227,10 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms)
m_tail = NULL;
else
m_head->m_prev = NULL;
memcpy(&p, &tmp->m_packet, sizeof(p));
memcpy(out_buf, tmp->data(), PCM16_S2B(tmp->size()));
// Map RTP timestamp to internal audio timestamp
p.timestamp -= m_tsDelta - m_jitter;
*out_ts = tmp->ts() - m_tsDelta + m_jitter;
*out_size = tmp->size();
m_allocator.free(tmp);
}
else

@ -28,20 +28,26 @@
#ifndef _AmJitterBuffer_h_
#define _AmJitterBuffer_h_
#include "amci/amci.h"
#include "AmThread.h"
#include "AmRtpPacket.h"
class AmRtpStream;
#include "SampleArray.h"
#define INITIAL_JITTER 640 // 80 miliseconds
#define MAX_JITTER 16000 // 2 seconds
#define RESYNC_THRESHOLD 2
class Packet {
ShortSample m_data[AUDIO_BUFFER_SIZE * 2];
unsigned int m_size;
unsigned int m_ts;
public:
AmRtpPacket m_packet;
Packet *m_next;
Packet *m_prev;
void init(const ShortSample *data, unsigned int size, unsigned int ts);
unsigned int size() const { return m_size; }
unsigned int ts() const { return m_ts; }
ShortSample *data() { return m_data; }
bool operator < (const Packet&) const;
};
@ -54,7 +60,7 @@ private:
public:
PacketAllocator();
Packet *alloc(const AmRtpPacket *);
Packet *alloc(const ShortSample *data, unsigned int size, unsigned int ts);
void free(Packet *p);
};
@ -73,13 +79,17 @@ private:
bool m_tsDeltaInited;
int m_delayCount;
unsigned int m_jitter;
AmRtpStream *m_owner;
// AmRtpStream *m_owner;
bool m_forceResync;
#ifdef DEBUG_PLAYOUTBUF
unsigned int m_tsDeltaStart;
#endif
public:
AmJitterBuffer(AmRtpStream *owner);
void put(const AmRtpPacket *);
bool get(AmRtpPacket &, unsigned int ts, unsigned int ms);
AmJitterBuffer();
void put(const ShortSample *data, unsigned int size, unsigned int ts, bool begin_talk);
bool get(unsigned int ts, unsigned int ms, ShortSample *out, unsigned int *size, unsigned int *out_ts);
};
#endif // _AmJitterBuffer_h_

@ -1,5 +1,6 @@
#include "AmPlayoutBuffer.h"
#include "AmAudio.h"
#include "AmRtpAudio.h"
#define SEARCH_OFFSET 140
@ -14,30 +15,79 @@
#define PI 3.14
#define DEBUG_PLAYOUTBUF
#define MAX_DELAY 8000 /* 1 second */
AmPlayoutBuffer::AmPlayoutBuffer()
: r_ts(0),w_ts(0)
AmPlayoutBuffer::AmPlayoutBuffer(AmRtpAudio *owner)
: r_ts(0),w_ts(0), last_ts_i(false), m_owner(owner)
{
}
void AmPlayoutBuffer::direct_write(unsigned int ts, ShortSample* buf, unsigned int len)
void AmPlayoutBuffer::direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len)
{
#ifndef USE_ADAPTIVE_JB
buffer_put(w_ts,buf,len);
#else
buffer_put(ts, buf, len);
#endif // USE_ADAPTIVE_JB
}
void AmPlayoutBuffer::write(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len)
void AmPlayoutBuffer::write(u_int32_t ref_ts, u_int32_t rtp_ts,
int16_t* buf, u_int32_t len, bool begin_talk)
{
unsigned int mapped_ts;
if(!recv_offset_i)
{
recv_offset = rtp_ts - ref_ts;
recv_offset_i = true;
DBG("initialized recv_offset with %i (%i - %i)\n",
recv_offset, ref_ts, rtp_ts);
mapped_ts = ref_ts;// + jitter_delay;
}
else {
mapped_ts = rtp_ts - recv_offset;// + jitter_delay;
// resync
if( ts_less()(mapped_ts, ref_ts - MAX_DELAY/2) ||
!ts_less()(mapped_ts, ref_ts + MAX_DELAY) ){
DBG("resync needed: reference ts = %u; write ts = %u\n",
ref_ts, mapped_ts);
recv_offset = rtp_ts - ref_ts;
mapped_ts = ref_ts;// + jitter_delay;
}
}
if(!last_ts_i)
{
last_ts = mapped_ts;
last_ts_i = true;
}
if(ts_less()(last_ts, mapped_ts) && !begin_talk
&& (mapped_ts - last_ts <= PLC_MAX_SAMPLES))
{
unsigned char tmp[AUDIO_BUFFER_SIZE * 2];
int l_size = m_owner->conceal_loss(mapped_ts - last_ts, tmp);
if (l_size>0)
{
direct_write_buffer(last_ts, (ShortSample*)tmp, PCM16_B2S(l_size));
}
}
write_buffer(ref_ts, mapped_ts, buf, len);
m_owner->add_to_history(buf, PCM16_S2B(len));
// update last_ts to end of received packet
// if not out-of-sequence
if (ts_less()(last_ts, mapped_ts) || last_ts == mapped_ts)
last_ts = mapped_ts + len;
}
void AmPlayoutBuffer::write_buffer(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len)
{
buffer_put(w_ts,buf,len);
}
u_int32_t AmPlayoutBuffer::read(u_int32_t ts, int16_t* buf, u_int32_t len)
{
#ifndef USE_ADAPTIVE_JB
if(ts_less()(r_ts,w_ts)){
u_int32_t rlen=0;
@ -51,15 +101,28 @@ u_int32_t AmPlayoutBuffer::read(u_int32_t ts, int16_t* buf, u_int32_t len)
}
return 0;
#else
buffer_get(ts, buf, len);
return len;
#endif // USE_ADAPTIVE_JB
}
AmAdaptivePlayout::AmAdaptivePlayout()
: idx(0),
void AmPlayoutBuffer::buffer_put(unsigned int ts, ShortSample* buf, unsigned int len)
{
buffer.put(ts,buf,len);
if(ts_less()(w_ts,ts+len))
w_ts = ts + len;
}
void AmPlayoutBuffer::buffer_get(unsigned int ts, ShortSample* buf, unsigned int len)
{
buffer.get(ts,buf,len);
if(ts_less()(r_ts,ts+len))
r_ts = ts + len;
}
AmAdaptivePlayout::AmAdaptivePlayout(AmRtpAudio *owner)
: AmPlayoutBuffer(owner),
idx(0),
loss_rate(ORDER_STAT_LOSS_RATE),
wsola_off(WSOLA_START_OFF),
shr_threshold(SHR_THRESHOLD),
@ -126,7 +189,7 @@ u_int32_t AmAdaptivePlayout::next_delay(u_int32_t ref_ts, u_int32_t ts)
return D;
}
void AmAdaptivePlayout::write(u_int32_t ref_ts, u_int32_t ts,
void AmAdaptivePlayout::write_buffer(u_int32_t ref_ts, u_int32_t ts,
int16_t* buf, u_int32_t len)
{
// predict next delay
@ -234,27 +297,11 @@ u_int32_t AmAdaptivePlayout::read(u_int32_t ts, int16_t* buf, u_int32_t len)
return len;
}
void AmAdaptivePlayout::direct_write(unsigned int ts, ShortSample* buf, unsigned int len)
void AmAdaptivePlayout::direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len)
{
buffer_put(ts+wsola_off,buf,len);
}
void AmPlayoutBuffer::buffer_put(unsigned int ts, ShortSample* buf, unsigned int len)
{
buffer.put(ts,buf,len);
if(ts_less()(w_ts,ts+len))
w_ts = ts + len;
}
void AmPlayoutBuffer::buffer_get(unsigned int ts, ShortSample* buf, unsigned int len)
{
buffer.get(ts,buf,len);
if(ts_less()(r_ts,ts+len))
r_ts = ts + len;
}
/**
* find best cross correlation of a TEMPLATE_SEG samples
* long frame
@ -387,7 +434,8 @@ u_int32_t AmAdaptivePlayout::time_scale(u_int32_t ts, float factor,
float act_fact = s / (float)packet_len;
#ifdef DEBUG_PLAYOUTBUF
DBG("at ts %u: new size = %u, ratio = %f, requested = %f\n", ts, s, act_fact, factor);
DBG("at ts %u: new size = %u, ratio = %f, requested = %f (wsola_off = %ld)\n",
ts, s, act_fact, factor, (long)wsola_off);
#endif
// break condition: coming to the end of the frame (with safety margin)
if(p_buf_end - tmpl < TEMPLATE_SEG + DELTA)
@ -407,3 +455,67 @@ u_int32_t AmAdaptivePlayout::time_scale(u_int32_t ts, float factor,
return s;
}
/*****************************************************************
*
* AmJbPlayout class methods
*
*****************************************************************/
AmJbPlayout::AmJbPlayout(AmRtpAudio *owner)
: AmPlayoutBuffer(owner)
{
}
u_int32_t AmJbPlayout::read(u_int32_t ts, int16_t* buf, u_int32_t len)
{
prepare_buffer(ts, len);
buffer_get(ts, buf, len);
return len;
}
void AmJbPlayout::direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len)
{
buffer_put(ts, buf, len);
}
void AmJbPlayout::prepare_buffer(unsigned int audio_buffer_ts, unsigned int ms)
{
ShortSample buf[AUDIO_BUFFER_SIZE];
unsigned int ts;
unsigned int nb_samples;
/**
* Get all RTP packets that correspond to the required interval,
* decode them and put into playout buffer.
*/
while (m_jb.get(audio_buffer_ts, ms, buf, &nb_samples, &ts))
{
direct_write_buffer(ts, buf, nb_samples);
m_owner->add_to_history(buf, PCM16_S2B(nb_samples));
/* Conceal the gap between previous and current RTP packets */
if (last_ts_i && ts_less()(m_last_rtp_endts, ts))
{
int concealed_size = m_owner->conceal_loss(ts - m_last_rtp_endts, (unsigned char *)buf);
if (concealed_size > 0)
direct_write_buffer(m_last_rtp_endts, buf, PCM16_B2S(concealed_size));
}
m_last_rtp_endts = ts + nb_samples;
last_ts_i = true;
}
if (!last_ts_i) {
return;
}
if (ts_less()(m_last_rtp_endts, audio_buffer_ts + ms))
{
/* Last packets have been lost. Conceal them */
int concealed_size = m_owner->conceal_loss(audio_buffer_ts + ms - m_last_rtp_endts, (unsigned char *)buf);
if (concealed_size > 0)
direct_write_buffer(m_last_rtp_endts, buf, PCM16_B2S(concealed_size));
m_last_rtp_endts = audio_buffer_ts + ms;
}
}
void AmJbPlayout::write(u_int32_t ref_ts, u_int32_t rtp_ts, int16_t* buf, u_int32_t len, bool begin_talk)
{
m_jb.put(buf, len, rtp_ts, begin_talk);
}

@ -4,6 +4,7 @@
#include "SampleArray.h"
#include "AmStats.h"
#include "LowcFE.h"
#include "AmJitterBuffer.h"
#include <set>
using std::multiset;
@ -23,6 +24,12 @@ using std::multiset;
// search segments of size TEMPLATE_SEG samples
#define TEMPLATE_SEG 80
// Maximum value: AUDIO_BUFFER_SIZE / 2
// Note: plc result get stored in our back buffer
#define PLC_MAX_SAMPLES (160*4)
class AmRtpAudio;
/** \brief base class for Playout buffer */
class AmPlayoutBuffer
{
@ -31,17 +38,29 @@ class AmPlayoutBuffer
protected:
u_int32_t r_ts,w_ts;
AmRtpAudio *m_owner;
unsigned int last_ts;
bool last_ts_i;
/** the offset RTP receive TS <-> audio_buffer TS */
unsigned int recv_offset;
/** the recv_offset initialized ? */
bool recv_offset_i;
void buffer_put(unsigned int ts, ShortSample* buf, unsigned int len);
void buffer_get(unsigned int ts, ShortSample* buf, unsigned int len);
virtual void write_buffer(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len);
virtual void direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len);
public:
AmPlayoutBuffer();
AmPlayoutBuffer(AmRtpAudio *owner);
virtual ~AmPlayoutBuffer() {}
virtual void direct_write(unsigned int ts, ShortSample* buf, unsigned int len);
virtual void write(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len);
virtual void write(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len, bool begin_talk);
virtual u_int32_t read(u_int32_t ts, int16_t* buf, u_int32_t len);
void clearLastTs() { last_ts_i = false; }
};
/** \brief adaptive playout buffer */
@ -73,17 +92,36 @@ class AmAdaptivePlayout: public AmPlayoutBuffer
public:
AmAdaptivePlayout();
AmAdaptivePlayout(AmRtpAudio *);
/** write len samples beginning from timestamp ts from buf */
void direct_write(unsigned int ts, ShortSample* buf, unsigned int len);
void direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len);
/** write len samples which beginn from timestamp ts from buf
reference ts of buffer (monotonic increasing buffer ts) is ref_ts */
void write(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len);
void write_buffer(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len);
/** read len samples beginn from timestamp ts into buf */
u_int32_t read(u_int32_t ts, int16_t* buf, u_int32_t len);
};
/** \brief adaptive jitter buffer */
class AmJbPlayout : public AmPlayoutBuffer
{
private:
AmJitterBuffer m_jb;
unsigned int m_last_rtp_endts;
protected:
void direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len);
void prepare_buffer(unsigned int ts, unsigned int ms);
public:
AmJbPlayout(AmRtpAudio *owner);
u_int32_t read(u_int32_t ts, int16_t* buf, u_int32_t len);
void write(u_int32_t ref_ts, u_int32_t rtp_ts, int16_t* buf, u_int32_t len, bool begin_talk);
};

@ -29,11 +29,12 @@
#include <sys/time.h>
#include <assert.h>
#include "AmSession.h"
#include "AmPlayoutBuffer.h"
AmRtpAudio::AmRtpAudio(AmSession* _s)
: AmRtpStream(_s), AmAudio(0),
last_ts_i(false), use_default_plc(true),
send_only(false), playout_buffer(new AmPlayoutBuffer()),
/*last_ts_i(false),*/ use_default_plc(true),
send_only(false), playout_buffer(new AmPlayoutBuffer(this)),
last_check(0),last_check_i(false),send_int(false)
{
}
@ -72,122 +73,30 @@ unsigned int AmRtpAudio::bytes2samples(unsigned int bytes) const
*/
int AmRtpAudio::receive(unsigned int audio_buffer_ts)
{
#ifndef USE_ADAPTIVE_JB
// using fifo or adaptive playout buffer
int size;
unsigned int ts;
unsigned int rtp_ts;
while(true) {
size = AmRtpStream::receive((unsigned char*)samples,
(unsigned int)AUDIO_BUFFER_SIZE,ts,
audio_buffer_ts);
size = AmRtpStream::receive((unsigned char*)samples,
(unsigned int)AUDIO_BUFFER_SIZE, rtp_ts);
if(size <= 0)
break;
if(send_only){
last_ts_i = false;
playout_buffer->clearLastTs();
continue;
}
if(!last_ts_i){
last_ts = ts;
last_ts_i = true;
}
if(ts_less()(last_ts,ts) && !begin_talk
&& (ts-last_ts <= PLC_MAX_SAMPLES)) {
int l_size = conceal_loss(ts - last_ts);
if(l_size>0){
playout_buffer->direct_write(last_ts,
(ShortSample*)samples.back_buffer(),
PCM16_B2S(l_size));
}
}
size = decode(size);
if(size <= 0){
ERROR("decode() returned %i\n",size);
return -1;
}
if(use_default_plc)
add_to_history(size);
playout_buffer->write(audio_buffer_ts, ts,
(ShortSample*)((unsigned char*)samples),
PCM16_B2S(size));
// update last_ts to end of received packet
// if not out-of-sequence
if (ts_less()(last_ts,ts) || last_ts == ts)
last_ts = ts + PCM16_B2S(size);
playout_buffer->write(audio_buffer_ts, rtp_ts, (ShortSample*)((unsigned char *)samples),
PCM16_B2S(size), begin_talk);
}
return size;
#else // using adaptive JB
int rtp_size;
int audio_size;
unsigned int ts;
if (send_only) {
last_ts_i = false;
return 0;
}
/**
* Receive all RTP packets that correspond to the required interval,
* decode them and put into playout buffer.
*/
while (true)
{
rtp_size = AmRtpStream::receive((unsigned char*)samples,
(unsigned int)AUDIO_BUFFER_SIZE, &ts,
audio_buffer_ts, getFrameSize());
if (rtp_size < 0) {
return rtp_size;
}
if (rtp_size == 0) { // No more RTP packets for this interval
break;
}
audio_size = decode(rtp_size);
if (audio_size <= 0) {
ERROR("decode() returned %i\n", audio_size);
return -1;
}
playout_buffer->direct_write(ts,
(ShortSample*)((unsigned char*)samples),
PCM16_B2S(audio_size));
/* Conceal the gap between previous and current RTP packets */
if (last_ts_i && ts_less()(m_last_rtp_endts, ts))
{
int concealed_size = conceal_loss(ts - m_last_rtp_endts);
if (concealed_size > 0)
playout_buffer->direct_write(m_last_rtp_endts,
(ShortSample*)((unsigned char*)samples.back_buffer()),
PCM16_B2S(concealed_size));
}
m_last_rtp_endts = ts + bytes2samples(rtp_size);
last_ts_i = true;
if(use_default_plc) {
add_to_history(audio_size);
}
}
if (!last_ts_i) {
return 0;
}
if (ts_less()(m_last_rtp_endts, audio_buffer_ts + getFrameSize()))
{
/* Last packets have been lost. Conceal them */
int concealed_size = conceal_loss(audio_buffer_ts + getFrameSize() - m_last_rtp_endts);
if (concealed_size > 0)
playout_buffer->direct_write(m_last_rtp_endts,
(ShortSample*)((unsigned char*)samples.back_buffer()),
PCM16_B2S(concealed_size));
m_last_rtp_endts = audio_buffer_ts + getFrameSize();
}
return PCM16_S2B(getFrameSize());
#endif
}
int AmRtpAudio::get(unsigned int user_ts, unsigned char* buffer, unsigned int nb_samples)
@ -223,7 +132,7 @@ void AmRtpAudio::init(const SdpPayload* sdp_payload)
use_default_plc = !(codec && codec->plc);
}
unsigned int AmRtpAudio::conceal_loss(unsigned int ts_diff)
unsigned int AmRtpAudio::conceal_loss(unsigned int ts_diff, unsigned char *buffer)
{
int s=0;
if(!use_default_plc){
@ -232,13 +141,13 @@ unsigned int AmRtpAudio::conceal_loss(unsigned int ts_diff)
long h_codec = fmt->getHCodec();
assert(codec && codec->plc);
s = (*codec->plc)(samples.back_buffer(),PCM16_S2B(ts_diff),
s = (*codec->plc)(buffer, PCM16_S2B(ts_diff),
fmt->channels,fmt->rate,h_codec);
DBG("codec specific PLC (ts_diff = %i; s = %i)\n",ts_diff,s);
}
else {
s = default_plc(samples.back_buffer(),PCM16_S2B(ts_diff),
s = default_plc(buffer, PCM16_S2B(ts_diff),
fmt->channels,fmt->rate);
DBG("default PLC (ts_diff = %i; s = %i)\n",ts_diff,s);
@ -263,9 +172,12 @@ unsigned int AmRtpAudio::default_plc(unsigned char* out_buf,
return PCM16_S2B(buf_offset - (short*)out_buf);
}
void AmRtpAudio::add_to_history(unsigned int size)
void AmRtpAudio::add_to_history(int16_t *buffer, unsigned int size)
{
short* buf_offset = (short*)((unsigned char*)samples);
int16_t* buf_offset = buffer;
if (!use_default_plc)
return;
for(unsigned int i=0; i<(PCM16_B2S(size)/FRAMESZ); i++){
@ -274,25 +186,33 @@ void AmRtpAudio::add_to_history(unsigned int size)
}
}
void AmRtpAudio::setAdaptivePlayout(bool on)
void AmRtpAudio::setPlayoutType(PlayoutType type)
{
bool is_adaptive =
dynamic_cast<AmAdaptivePlayout*>
(playout_buffer.get()) != 0;
if(on == !is_adaptive){
PlayoutType curr_type = SIMPLE_PLAYOUT;
if (dynamic_cast<AmAdaptivePlayout *>(playout_buffer.get()))
curr_type = ADAPTIVE_PLAYOUT;
else if (dynamic_cast<AmJbPlayout *>(playout_buffer.get()))
curr_type = JB_PLAYOUT;
if(!is_adaptive){
session->lockAudio();
playout_buffer.reset(new AmAdaptivePlayout());
if (curr_type != type)
{
if (type == ADAPTIVE_PLAYOUT) {
session->lockAudio();
playout_buffer.reset(new AmAdaptivePlayout(this));
session->unlockAudio();
DBG("Adaptive playout buffer activated\n");
}
else{
else if (type == JB_PLAYOUT) {
session->lockAudio();
playout_buffer.reset(new AmJbPlayout(this));
session->unlockAudio();
DBG("Adaptive jitter buffer activated\n");
}
else {
session->lockAudio();
playout_buffer.reset(new AmPlayoutBuffer());
playout_buffer.reset(new AmPlayoutBuffer(this));
session->unlockAudio();
DBG("Adaptive playout buffer deactivated\n");
DBG("Simple playout buffer activated\n");
}
}
}

@ -30,13 +30,15 @@
#include "AmAudio.h"
#include "AmRtpStream.h"
#include "AmPlayoutBuffer.h"
#include "LowcFE.h"
// Maximum value: AUDIO_BUFFER_SIZE / 2
// Note: plc result get stored in our back buffer
#define PLC_MAX_SAMPLES (160*4)
class AmPlayoutBuffer;
enum PlayoutType {
ADAPTIVE_PLAYOUT,
JB_PLAYOUT,
SIMPLE_PLAYOUT
};
/**
* \brief binds together a \ref AmRtpStream and an \ref AmAudio for a session
*/
@ -52,19 +54,8 @@ class AmRtpAudio: public AmRtpStream, public AmAudio
bool last_check_i;
bool send_int;
#ifndef USE_ADAPTIVE_JB
unsigned int last_ts;
#else
unsigned int m_last_rtp_endts;
#endif
bool last_ts_i;
bool send_only;
// Conceals packet loss into the back buffer
// @return length in bytes of the recivered segment
unsigned int conceal_loss(unsigned int ts_diff);
//
// Default packet loss concealment functions
//
@ -73,8 +64,6 @@ class AmRtpAudio: public AmRtpStream, public AmAudio
unsigned int channels,
unsigned int rate);
void add_to_history(unsigned int size);
public:
AmRtpAudio(AmSession* _s=0);
@ -97,9 +86,15 @@ public:
// AmRtpStream interface
void init(const SdpPayload* sdp_payload);
void setAdaptivePlayout(bool on);
void setPlayoutType(PlayoutType type);
virtual unsigned int bytes2samples(unsigned int) const;
void add_to_history(int16_t *buffer, unsigned int size);
// Conceals packet loss into the out_buffer
// @return length in bytes of the recivered segment
unsigned int conceal_loss(unsigned int ts_diff, unsigned char *out_buffer);
};
#endif

@ -54,8 +54,6 @@
#include <set>
using std::set;
#define MAX_DELAY 8000 /* 1 second */
/*
* This function must be called before setLocalPort, because
* setLocalPort will bind the socket and it will be not
@ -188,14 +186,13 @@ int AmRtpStream::send( unsigned int ts, unsigned char* buffer, unsigned int size
return size;
}
#ifndef USE_ADAPTIVE_JB
// returns
// @param ts [out] timestamp of the received packet,
// in audio buffer relative time
// @param audio_buffer_ts [in] current ts at the audio_buffer
int AmRtpStream::receive( unsigned char* buffer, unsigned int size,
unsigned int& ts, unsigned int audio_buffer_ts)
unsigned int& ts)
{
AmRtpPacket rp;
int err = nextPacket(rp);
@ -235,139 +232,47 @@ int AmRtpStream::receive( unsigned char* buffer, unsigned int size,
/* do we have a new talk spurt? */
begin_talk = ((last_payload == 13) || rp.marker);
last_payload = rp.payload;
if(!rp.getDataSize())
return RTP_EMPTY;
if(payload != rp.payload){
if (telephone_event_pt.get() && rp.payload == telephone_event_pt->payload_type)
{
dtmf_payload_t* dpl = (dtmf_payload_t*)rp.getData();
DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i\n",
dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration));
session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getTelephoneEventRate()));
return RTP_DTMF;
}
else
return RTP_UNKNOWN_PL;
}
if(!recv_offset_i){
recv_offset = rp.timestamp - audio_buffer_ts;
recv_offset_i = true;
DBG("initialized recv_offset with %i (%i - %i)\n",
recv_offset,audio_buffer_ts,rp.timestamp);
ts = audio_buffer_ts;// + jitter_delay;
}
else {
ts = rp.timestamp - recv_offset;// + jitter_delay;
// resync
if( ts_less()(ts, audio_buffer_ts - MAX_DELAY/2) ||
!ts_less()(ts, audio_buffer_ts + MAX_DELAY) ){
DBG("resync needed: reference ts = %u; write ts = %u\n",
audio_buffer_ts,ts);
recv_offset = rp.timestamp - audio_buffer_ts;
ts = audio_buffer_ts;// + jitter_delay;
}
}
assert(rp.getData());
if(rp.getDataSize() > size){
ERROR("received too big RTP packet\n");
return RTP_BUFFER_SIZE;
}
memcpy(buffer,rp.getData(),rp.getDataSize());
return rp.getDataSize();
}
#else
// returns
// @param ts [out] timestamp of the received packet,
// in audio buffer relative time
// @param audio_buffer_ts [in] current ts at the audio_buffer
int AmRtpStream::receive( unsigned char* buffer, unsigned int buf_size,
unsigned int *ts,
unsigned int audio_buffer_ts, unsigned int ms)
{
AmRtpPacket rp;
AmRtpPacket dtmf_pkt;
while (m_telephone_event_jb->get(dtmf_pkt, audio_buffer_ts, ms))
if (telephone_event_pt.get() && rp.payload == telephone_event_pt->payload_type)
{
dtmf_payload_t* dpl = (dtmf_payload_t*)dtmf_pkt.getData();
dtmf_payload_t* dpl = (dtmf_payload_t*)rp.getData();
DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i\n",
dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration));
session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getTelephoneEventRate()));
DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i\n",
dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration));
session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getTelephoneEventRate()));
return RTP_DTMF;
}
int err = nextAudioPacket(rp, audio_buffer_ts, ms);
if(err <= 0)
return err;
#ifdef SUPPORT_IPV6
struct sockaddr_storage recv_addr;
#else
struct sockaddr_in recv_addr;
#endif
rp.getAddr(&recv_addr);
#ifndef SUPPORT_IPV6
// symmetric RTP
if( passive && ((recv_addr.sin_port != r_saddr.sin_port)
|| (recv_addr.sin_addr.s_addr
!= r_saddr.sin_addr.s_addr)) ) {
string addr_str = get_addr_str(recv_addr.sin_addr);
int port = ntohs(recv_addr.sin_port);
setRAddr(addr_str,port);
DBG("Symmetric RTP: setting new remote address: %s:%i\n",addr_str.c_str(),port);
// avoid comparing each time sender address
passive = false;
if (payload != rp.payload){
return RTP_UNKNOWN_PL;
}
#endif
/* do we have a new talk spurt? */
begin_talk = ((last_payload == 13) || rp.marker);
last_payload = rp.payload;
if(!rp.getDataSize())
return RTP_EMPTY;
assert(rp.getData());
if(rp.getDataSize() > buf_size){
if(rp.getDataSize() > size){
ERROR("received too big RTP packet\n");
return RTP_BUFFER_SIZE;
}
memcpy(buffer,rp.getData(),rp.getDataSize());
*ts = rp.timestamp;
ts = rp.timestamp;
return rp.getDataSize();
return rp.getDataSize();
}
#endif // USE_ADAPTIVE_JB
AmRtpStream::AmRtpStream(AmSession* _s)
: runcond(0),
r_port(0),
l_port(0),
l_sd(0),
recv_offset_i(false),
// recv_offset_i(false),
r_ssrc_i(false),
session(_s),
passive(false),
telephone_event_pt(NULL),
mute(false),
receiving(true)
#ifdef USE_ADAPTIVE_JB
,
m_main_jb(new AmJitterBuffer(this)),
m_telephone_event_jb(new AmJitterBuffer(this))
#endif
{
@ -395,11 +300,6 @@ AmRtpStream::~AmRtpStream()
AmRtpReceiver::instance()->removeStream(l_sd);
close(l_sd);
}
#ifdef USE_ADAPTIVE_JB
if (m_main_jb) delete(m_main_jb);
if (m_telephone_event_jb)
delete (m_telephone_event_jb);
#endif
}
int AmRtpStream::getLocalPort()
@ -488,7 +388,6 @@ void AmRtpStream::icmpError()
}
}
#ifndef USE_ADAPTIVE_JB
void AmRtpStream::bufferPacket(const AmRtpPacket* p)
{
if (!receiving && !passive)
@ -530,40 +429,6 @@ int AmRtpStream::nextPacket(AmRtpPacket& p)
return 1;
}
#else
void AmRtpStream::bufferPacket(const AmRtpPacket* p)
{
if (!receiving && !passive)
return;
gettimeofday(&last_recv_time,NULL);
if (p->payload == payload && m_main_jb)
m_main_jb->put(p);
else if (telephone_event_pt.get() && p->payload == telephone_event_pt->payload_type)
m_telephone_event_jb->put(p);
}
int AmRtpStream::nextAudioPacket(AmRtpPacket& p, unsigned int ts, unsigned int ms)
{
if (!receiving && !passive)
return RTP_EMPTY;
if (m_main_jb && m_main_jb->get(p, ts, ms))
return 1;
struct timeval now;
struct timeval diff;
gettimeofday(&now,NULL);
timersub(&now,&last_recv_time,&diff);
if(diff.tv_sec > DEAD_RTP_TIME){
WARN("RTP Timeout detected. Last received packet is too old.\n");
DBG("diff.tv_sec = %i\n",(unsigned int)diff.tv_sec);
return RTP_TIMEOUT;
}
return RTP_EMPTY;
}
#endif
int AmRtpStream::getTelephoneEventRate()
{

@ -62,10 +62,6 @@ class AmSession;
class SdpPayload;
typedef map<unsigned int, AmRtpPacket, ts_less> JitterBuffer;
#ifdef USE_ADAPTIVE_JB
class AmJitterBuffer;
#endif
/**
* \brief RTP implementation
*
@ -118,11 +114,6 @@ protected:
struct timeval last_recv_time;
/** the offset RTP receive TS <-> audio_buffer TS */
unsigned int recv_offset;
/** the recv_offset initialized ? */
bool recv_offset_i;
unsigned int l_ssrc;
unsigned int r_ssrc;
bool r_ssrc_i;
@ -134,19 +125,11 @@ protected:
auto_ptr<const SdpPayload> telephone_event_pt;
#ifndef USE_ADAPTIVE_JB
JitterBuffer jitter_buf;
AmMutex jitter_mut;
/* get next packet in buffer */
int nextPacket(AmRtpPacket& p);
#else
AmJitterBuffer *m_main_jb;
AmJitterBuffer *m_telephone_event_jb;
/* get next packet in buffer */
int nextAudioPacket(AmRtpPacket& p, unsigned int ts, unsigned int ms);
#endif
AmSession* session;
@ -167,14 +150,9 @@ public:
unsigned char* buffer,
unsigned int size );
#ifndef USE_ADAPTIVE_JB
int receive( unsigned char* buffer, unsigned int size,
unsigned int& ts, unsigned int audio_buffer_ts);
#else
int receive( unsigned char* buffer, unsigned int size, unsigned int *ts,
unsigned int audio_buffer_ts, unsigned int ms);
#endif
unsigned int& ts);
/** Allocates resources for future use of RTP. */
AmRtpStream(AmSession* _s=0);

@ -37,6 +37,7 @@
#include "AmSessionContainer.h"
#include "AmMediaProcessor.h"
#include "AmDtmfDetector.h"
#include "AmPlayoutBuffer.h"
#include "log.h"

@ -62,7 +62,7 @@ AmSession* EchoFactory::onInvite(const AmSipRequest& req)
}
EchoDialog::EchoDialog()
: adaptive_playout(false)
: playout_type(SIMPLE_PLAYOUT)
{
}
@ -85,10 +85,18 @@ void EchoDialog::onDtmf(int event, int duration)
{
#ifdef STAR_SWITCHES_PLAYOUTBUFFER
if (event == 10) {
adaptive_playout = !adaptive_playout;
DBG("received *. set adaptive playout to %s.\n", adaptive_playout ? "true":"false");
rtp_str.setAdaptivePlayout(adaptive_playout);
char* pt = "simple (fifo) playout buffer";
if (playout_type == SIMPLE_PLAYOUT) {
playout_type = ADAPTIVE_PLAYOUT;
pt = "adaptive playout buffer";
} else if (playout_type == ADAPTIVE_PLAYOUT) {
pt = "adaptive jitter buffer";
playout_type = JB_PLAYOUT;
} else
playout_type = SIMPLE_PLAYOUT;
DBG("received *. set playout technique to %s.\n", pt);
rtp_str.setPlayoutType(playout_type);
}
#endif
}

@ -48,7 +48,7 @@ public:
class EchoDialog : public AmSession
{
AmAudioEcho echo;
bool adaptive_playout;
PlayoutType playout_type;
public:
EchoDialog();
~EchoDialog();

Loading…
Cancel
Save