diff --git a/Makefile.defs b/Makefile.defs index 8509e207..990296a9 100644 --- a/Makefile.defs +++ b/Makefile.defs @@ -48,11 +48,10 @@ C_FLAGS ?= -Wall -fPIC -g \ -DOS='"$(OS)"' \ # -DSUPPORT_IPV6 -ifeq ($(WITH_ADAPTIVE_JITTERBUFFER), yes) -CXX_FLAGS += -DUSE_ADAPTIVE_JB +ifeq ($(DEBUG_PLAYOUT), yes) +CXX_FLAGS += -DDEBUG_PLAYOUTBUF endif - TARGET = ifeq ($(OS), linux) LD_FLAGS += -ldl -rdynamic -lpthread diff --git a/apps/conference/Conference.cpp b/apps/conference/Conference.cpp index e252f7d0..c471b5b8 100644 --- a/apps/conference/Conference.cpp +++ b/apps/conference/Conference.cpp @@ -50,6 +50,7 @@ string ConferenceFactory::LonelyUserFile; string ConferenceFactory::JoinSound; string ConferenceFactory::DropSound; string ConferenceFactory::DialoutSuffix; +PlayoutType ConferenceFactory::m_PlayoutType = ADAPTIVE_PLAYOUT; int ConferenceFactory::onLoad() { @@ -70,14 +71,23 @@ int ConferenceFactory::onLoad() JoinSound = cfg.getParameter("join_sound"); DropSound = cfg.getParameter("drop_sound"); - //RingTone = cfg.getParameter("ring_tone"); - DialoutSuffix = cfg.getParameter("dialout_suffix"); if(DialoutSuffix.empty()){ WARN("No dialout_suffix has been configured in the conference plug-in:\n"); WARN("\t -> dial out will not be available\n"); } + string playout_type = cfg.getParameter("playout_type"); + if (playout_type == "simple") { + m_PlayoutType = SIMPLE_PLAYOUT; + DBG("Using simple (fifo) buffer as playout technique.\n"); + } else if (playout_type == "adaptive_jb") { + m_PlayoutType = JB_PLAYOUT; + DBG("Using adaptive jitter buffer as playout technique.\n"); + } else { + DBG("Using adaptive playout buffer as playout technique.\n"); + } + return 0; } @@ -110,7 +120,7 @@ ConferenceDialog::ConferenceDialog(const string& conf_id, allow_dialout(false) { dialedout = this->dialout_channel.get() != 0; -// rtp_str.setAdaptivePlayout(true); + rtp_str.setPlayoutType(ConferenceFactory::m_PlayoutType); } ConferenceDialog::~ConferenceDialog() diff --git a/apps/conference/Conference.h b/apps/conference/Conference.h index 059690cf..af9c039a 100644 --- a/apps/conference/Conference.h +++ b/apps/conference/Conference.h @@ -73,7 +73,7 @@ public: static string JoinSound; static string DropSound; static string DialoutSuffix; - //static string RingTone; + static PlayoutType m_PlayoutType; ConferenceFactory(const string& _app_name); virtual AmSession* onInvite(const AmSipRequest&); diff --git a/core/AmJitterBuffer.cpp b/core/AmJitterBuffer.cpp index 9363047e..ba109697 100644 --- a/core/AmJitterBuffer.cpp +++ b/core/AmJitterBuffer.cpp @@ -24,16 +24,25 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include "AmRtpStream.h" + +#include "AmAudio.h" #include "AmJitterBuffer.h" -#include "AmRtpPacket.h" #include "log.h" #include "SampleArray.h" bool Packet::operator < (const Packet& p) const { - return ts_less()(m_packet.timestamp, p.m_packet.timestamp); + return ts_less()(m_ts, p.m_ts); +} + +void Packet::init(const ShortSample *data, unsigned int size, unsigned int ts) +{ + size = PCM16_S2B(size); + if (size > sizeof(m_data)) + size = sizeof(m_data); + m_size = PCM16_B2S(size); + memcpy(m_data, data, size); + m_ts = ts; } PacketAllocator::PacketAllocator() @@ -45,13 +54,15 @@ PacketAllocator::PacketAllocator() m_packets[MAX_JITTER / 80 - 1].m_next = NULL; } -Packet *PacketAllocator::alloc(const AmRtpPacket *p) +Packet *PacketAllocator::alloc(const ShortSample *data, unsigned int size, unsigned int ts) { if (m_free_packets == NULL) return NULL; Packet *retval = m_free_packets; m_free_packets = retval->m_next; - memcpy(&retval->m_packet, p, sizeof(*p)); + + retval->init(data, size, ts); + retval->m_next = retval->m_prev = NULL; return retval; } @@ -63,40 +74,42 @@ void PacketAllocator::free(Packet *p) m_free_packets = p; } -AmJitterBuffer::AmJitterBuffer(AmRtpStream *owner) +AmJitterBuffer::AmJitterBuffer() : m_tsInited(false), m_tsDeltaInited(false), m_delayCount(0), - m_jitter(INITIAL_JITTER), m_owner(owner), + m_jitter(INITIAL_JITTER), m_tail(NULL), m_head(NULL), m_forceResync(false) { } -void AmJitterBuffer::put(const AmRtpPacket *p) +void AmJitterBuffer::put(const ShortSample *data, unsigned int size, unsigned int ts, bool begin_talk) { m_mutex.lock(); - if (p->marker) + if (begin_talk) m_forceResync = true; - if (m_tsInited && !m_forceResync && ts_less()(m_lastTs + m_jitter, p->timestamp)) + if (m_tsInited && !m_forceResync && ts_less()(m_lastTs + m_jitter, ts)) { - unsigned int delay = p->timestamp - m_lastTs; + unsigned int delay = ts - m_lastTs; if (delay > m_jitter && m_jitter < MAX_JITTER) { m_jitter += (delay - m_jitter) / 2; if (m_jitter > MAX_JITTER) m_jitter = MAX_JITTER; - // DBG("Jitter buffer delay increased to %u\n", m_jitter); +#ifdef DEBUG_PLAYOUTBUF + DBG("Jitter buffer delay increased to %u\n", m_jitter); +#endif } // Packet arrived too late to be put into buffer - if (ts_less()(p->timestamp + m_jitter, m_lastTs)) { + if (ts_less()(ts + m_jitter, m_lastTs)) { m_mutex.unlock(); return; } } - Packet *elem = m_allocator.alloc(p); + Packet *elem = m_allocator.alloc(data, size, ts); if (elem == NULL) { elem = m_head; m_head = m_head->m_next; m_head->m_prev = NULL; - memcpy(&elem->m_packet, p, sizeof(*p)); + elem->init(data, size, ts); } if (m_tail == NULL) { @@ -124,11 +137,11 @@ void AmJitterBuffer::put(const AmRtpPacket *p) } } if (!m_tsInited) { - m_lastTs = p->timestamp; + m_lastTs = ts; m_tsInited = true; } - else if (ts_less()(m_lastTs, p->timestamp) || m_forceResync) { - m_lastTs = p->timestamp; + else if (ts_less()(m_lastTs, ts) || m_forceResync) { + m_lastTs = ts; } m_mutex.unlock(); @@ -139,7 +152,8 @@ void AmJitterBuffer::put(const AmRtpPacket *p) * To get all the packets for the single ts the caller must call this * method with the same ts till the return value will become false. */ -bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms) +bool AmJitterBuffer::get(unsigned int ts, unsigned int ms, ShortSample *out_buf, + unsigned int *out_size, unsigned int *out_ts) { bool retval = true; @@ -152,7 +166,11 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms) m_tsDelta = m_lastTs - ts + ms; m_tsDeltaInited = true; m_lastAudioTs = ts; - m_forceResync = true; + m_forceResync = false; +#ifdef DEBUG_PLAYOUTBUF + DBG("Jitter buffer: initialized tsDelta with %u\n", m_tsDelta); + m_tsDeltaStart = m_tsDelta; +#endif } else if (m_lastAudioTs != ts && m_lastResyncTs != m_lastTs) { if (ts_less()(ts + m_tsDelta, m_lastTs)) { @@ -161,15 +179,20 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms) * immediate resync required */ m_tsDelta += m_lastTs - ts + ms; - // DBG("Jitter buffer resynced forward (-> %u)\n", m_tsDelta); +#ifdef DEBUG_PLAYOUTBUF + DBG("Jitter buffer resynced forward (-> %d rel)\n", + m_tsDelta - m_tsDeltaStart); +#endif m_delayCount = 0; - } - else if (ts_less()(m_lastTs, ts + m_tsDelta - m_jitter / 2)) { + } else if (ts_less()(m_lastTs, ts + m_tsDelta - m_jitter / 2)) { /* New packet hasn't arrived yet */ if (m_delayCount > RESYNC_THRESHOLD) { unsigned int d = m_tsDelta -(m_lastTs - ts + ms); m_tsDelta -= d / 2; - // DBG("Jitter buffer resynced backward (-> %u)\n", m_tsDelta); +#ifdef DEBUG_PLAYOUTBUF + DBG("Jitter buffer resynced backward (-> %d rel)\n", + m_tsDelta - m_tsDeltaStart); +#endif } else ++m_delayCount; @@ -185,7 +208,7 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms) // DBG("Getting pkt at %u, res ts = %u\n", get_ts / m_frameSize, p.timestamp); // First of all throw away all too old packets from the head Packet *tmp; - for (tmp = m_head; tmp && ts_less()(tmp->m_packet.timestamp + m_owner->bytes2samples(tmp->m_packet.getDataSize()), get_ts); ) + for (tmp = m_head; tmp && ts_less()(tmp->ts() + tmp->size(), get_ts); ) { m_head = tmp->m_next; if (m_head == NULL) @@ -196,7 +219,7 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms) tmp = m_head; } // Get the packet from the head - if (m_head && ts_less()(m_head->m_packet.timestamp, get_ts + ms)) + if (m_head && ts_less()(m_head->ts(), get_ts + ms)) { tmp = m_head; m_head = tmp->m_next; @@ -204,9 +227,10 @@ bool AmJitterBuffer::get(AmRtpPacket& p, unsigned int ts, unsigned int ms) m_tail = NULL; else m_head->m_prev = NULL; - memcpy(&p, &tmp->m_packet, sizeof(p)); + memcpy(out_buf, tmp->data(), PCM16_S2B(tmp->size())); // Map RTP timestamp to internal audio timestamp - p.timestamp -= m_tsDelta - m_jitter; + *out_ts = tmp->ts() - m_tsDelta + m_jitter; + *out_size = tmp->size(); m_allocator.free(tmp); } else diff --git a/core/AmJitterBuffer.h b/core/AmJitterBuffer.h index 17d59c68..1fe32e8f 100644 --- a/core/AmJitterBuffer.h +++ b/core/AmJitterBuffer.h @@ -28,20 +28,26 @@ #ifndef _AmJitterBuffer_h_ #define _AmJitterBuffer_h_ +#include "amci/amci.h" #include "AmThread.h" -#include "AmRtpPacket.h" - -class AmRtpStream; +#include "SampleArray.h" #define INITIAL_JITTER 640 // 80 miliseconds #define MAX_JITTER 16000 // 2 seconds #define RESYNC_THRESHOLD 2 class Packet { + ShortSample m_data[AUDIO_BUFFER_SIZE * 2]; + unsigned int m_size; + unsigned int m_ts; public: - AmRtpPacket m_packet; Packet *m_next; Packet *m_prev; + void init(const ShortSample *data, unsigned int size, unsigned int ts); + + unsigned int size() const { return m_size; } + unsigned int ts() const { return m_ts; } + ShortSample *data() { return m_data; } bool operator < (const Packet&) const; }; @@ -54,7 +60,7 @@ private: public: PacketAllocator(); - Packet *alloc(const AmRtpPacket *); + Packet *alloc(const ShortSample *data, unsigned int size, unsigned int ts); void free(Packet *p); }; @@ -73,13 +79,17 @@ private: bool m_tsDeltaInited; int m_delayCount; unsigned int m_jitter; - AmRtpStream *m_owner; +// AmRtpStream *m_owner; bool m_forceResync; +#ifdef DEBUG_PLAYOUTBUF + unsigned int m_tsDeltaStart; +#endif + public: - AmJitterBuffer(AmRtpStream *owner); - void put(const AmRtpPacket *); - bool get(AmRtpPacket &, unsigned int ts, unsigned int ms); + AmJitterBuffer(); + void put(const ShortSample *data, unsigned int size, unsigned int ts, bool begin_talk); + bool get(unsigned int ts, unsigned int ms, ShortSample *out, unsigned int *size, unsigned int *out_ts); }; #endif // _AmJitterBuffer_h_ diff --git a/core/AmPlayoutBuffer.cpp b/core/AmPlayoutBuffer.cpp index 6ec56257..b9c53d3c 100644 --- a/core/AmPlayoutBuffer.cpp +++ b/core/AmPlayoutBuffer.cpp @@ -1,5 +1,6 @@ #include "AmPlayoutBuffer.h" #include "AmAudio.h" +#include "AmRtpAudio.h" #define SEARCH_OFFSET 140 @@ -14,30 +15,79 @@ #define PI 3.14 -#define DEBUG_PLAYOUTBUF +#define MAX_DELAY 8000 /* 1 second */ -AmPlayoutBuffer::AmPlayoutBuffer() - : r_ts(0),w_ts(0) + +AmPlayoutBuffer::AmPlayoutBuffer(AmRtpAudio *owner) + : r_ts(0),w_ts(0), last_ts_i(false), m_owner(owner) { } -void AmPlayoutBuffer::direct_write(unsigned int ts, ShortSample* buf, unsigned int len) +void AmPlayoutBuffer::direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len) { -#ifndef USE_ADAPTIVE_JB buffer_put(w_ts,buf,len); -#else - buffer_put(ts, buf, len); -#endif // USE_ADAPTIVE_JB } -void AmPlayoutBuffer::write(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len) +void AmPlayoutBuffer::write(u_int32_t ref_ts, u_int32_t rtp_ts, + int16_t* buf, u_int32_t len, bool begin_talk) +{ + unsigned int mapped_ts; + if(!recv_offset_i) + { + recv_offset = rtp_ts - ref_ts; + recv_offset_i = true; + DBG("initialized recv_offset with %i (%i - %i)\n", + recv_offset, ref_ts, rtp_ts); + mapped_ts = ref_ts;// + jitter_delay; + } + else { + mapped_ts = rtp_ts - recv_offset;// + jitter_delay; + + // resync + if( ts_less()(mapped_ts, ref_ts - MAX_DELAY/2) || + !ts_less()(mapped_ts, ref_ts + MAX_DELAY) ){ + + DBG("resync needed: reference ts = %u; write ts = %u\n", + ref_ts, mapped_ts); + recv_offset = rtp_ts - ref_ts; + mapped_ts = ref_ts;// + jitter_delay; + } + } + + if(!last_ts_i) + { + last_ts = mapped_ts; + last_ts_i = true; + } + + if(ts_less()(last_ts, mapped_ts) && !begin_talk + && (mapped_ts - last_ts <= PLC_MAX_SAMPLES)) + { + unsigned char tmp[AUDIO_BUFFER_SIZE * 2]; + int l_size = m_owner->conceal_loss(mapped_ts - last_ts, tmp); + if (l_size>0) + { + direct_write_buffer(last_ts, (ShortSample*)tmp, PCM16_B2S(l_size)); + } + } + write_buffer(ref_ts, mapped_ts, buf, len); + + m_owner->add_to_history(buf, PCM16_S2B(len)); + + // update last_ts to end of received packet + // if not out-of-sequence + if (ts_less()(last_ts, mapped_ts) || last_ts == mapped_ts) + last_ts = mapped_ts + len; +} + + +void AmPlayoutBuffer::write_buffer(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len) { buffer_put(w_ts,buf,len); } u_int32_t AmPlayoutBuffer::read(u_int32_t ts, int16_t* buf, u_int32_t len) { -#ifndef USE_ADAPTIVE_JB if(ts_less()(r_ts,w_ts)){ u_int32_t rlen=0; @@ -51,15 +101,28 @@ u_int32_t AmPlayoutBuffer::read(u_int32_t ts, int16_t* buf, u_int32_t len) } return 0; -#else - buffer_get(ts, buf, len); - return len; -#endif // USE_ADAPTIVE_JB } -AmAdaptivePlayout::AmAdaptivePlayout() - : idx(0), +void AmPlayoutBuffer::buffer_put(unsigned int ts, ShortSample* buf, unsigned int len) +{ + buffer.put(ts,buf,len); + + if(ts_less()(w_ts,ts+len)) + w_ts = ts + len; +} + +void AmPlayoutBuffer::buffer_get(unsigned int ts, ShortSample* buf, unsigned int len) +{ + buffer.get(ts,buf,len); + + if(ts_less()(r_ts,ts+len)) + r_ts = ts + len; +} + +AmAdaptivePlayout::AmAdaptivePlayout(AmRtpAudio *owner) + : AmPlayoutBuffer(owner), + idx(0), loss_rate(ORDER_STAT_LOSS_RATE), wsola_off(WSOLA_START_OFF), shr_threshold(SHR_THRESHOLD), @@ -126,7 +189,7 @@ u_int32_t AmAdaptivePlayout::next_delay(u_int32_t ref_ts, u_int32_t ts) return D; } -void AmAdaptivePlayout::write(u_int32_t ref_ts, u_int32_t ts, +void AmAdaptivePlayout::write_buffer(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len) { // predict next delay @@ -234,27 +297,11 @@ u_int32_t AmAdaptivePlayout::read(u_int32_t ts, int16_t* buf, u_int32_t len) return len; } -void AmAdaptivePlayout::direct_write(unsigned int ts, ShortSample* buf, unsigned int len) +void AmAdaptivePlayout::direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len) { buffer_put(ts+wsola_off,buf,len); } -void AmPlayoutBuffer::buffer_put(unsigned int ts, ShortSample* buf, unsigned int len) -{ - buffer.put(ts,buf,len); - - if(ts_less()(w_ts,ts+len)) - w_ts = ts + len; -} - -void AmPlayoutBuffer::buffer_get(unsigned int ts, ShortSample* buf, unsigned int len) -{ - buffer.get(ts,buf,len); - - if(ts_less()(r_ts,ts+len)) - r_ts = ts + len; -} - /** * find best cross correlation of a TEMPLATE_SEG samples * long frame @@ -387,7 +434,8 @@ u_int32_t AmAdaptivePlayout::time_scale(u_int32_t ts, float factor, float act_fact = s / (float)packet_len; #ifdef DEBUG_PLAYOUTBUF - DBG("at ts %u: new size = %u, ratio = %f, requested = %f\n", ts, s, act_fact, factor); + DBG("at ts %u: new size = %u, ratio = %f, requested = %f (wsola_off = %ld)\n", + ts, s, act_fact, factor, (long)wsola_off); #endif // break condition: coming to the end of the frame (with safety margin) if(p_buf_end - tmpl < TEMPLATE_SEG + DELTA) @@ -407,3 +455,67 @@ u_int32_t AmAdaptivePlayout::time_scale(u_int32_t ts, float factor, return s; } + +/***************************************************************** + * + * AmJbPlayout class methods + * + *****************************************************************/ + +AmJbPlayout::AmJbPlayout(AmRtpAudio *owner) + : AmPlayoutBuffer(owner) +{ +} + +u_int32_t AmJbPlayout::read(u_int32_t ts, int16_t* buf, u_int32_t len) +{ + prepare_buffer(ts, len); + buffer_get(ts, buf, len); + return len; +} + +void AmJbPlayout::direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len) +{ + buffer_put(ts, buf, len); +} + +void AmJbPlayout::prepare_buffer(unsigned int audio_buffer_ts, unsigned int ms) +{ + ShortSample buf[AUDIO_BUFFER_SIZE]; + unsigned int ts; + unsigned int nb_samples; + /** + * Get all RTP packets that correspond to the required interval, + * decode them and put into playout buffer. + */ + while (m_jb.get(audio_buffer_ts, ms, buf, &nb_samples, &ts)) + { + direct_write_buffer(ts, buf, nb_samples); + m_owner->add_to_history(buf, PCM16_S2B(nb_samples)); + /* Conceal the gap between previous and current RTP packets */ + if (last_ts_i && ts_less()(m_last_rtp_endts, ts)) + { + int concealed_size = m_owner->conceal_loss(ts - m_last_rtp_endts, (unsigned char *)buf); + if (concealed_size > 0) + direct_write_buffer(m_last_rtp_endts, buf, PCM16_B2S(concealed_size)); + } + m_last_rtp_endts = ts + nb_samples; + last_ts_i = true; + } + if (!last_ts_i) { + return; + } + if (ts_less()(m_last_rtp_endts, audio_buffer_ts + ms)) + { + /* Last packets have been lost. Conceal them */ + int concealed_size = m_owner->conceal_loss(audio_buffer_ts + ms - m_last_rtp_endts, (unsigned char *)buf); + if (concealed_size > 0) + direct_write_buffer(m_last_rtp_endts, buf, PCM16_B2S(concealed_size)); + m_last_rtp_endts = audio_buffer_ts + ms; + } +} + +void AmJbPlayout::write(u_int32_t ref_ts, u_int32_t rtp_ts, int16_t* buf, u_int32_t len, bool begin_talk) +{ + m_jb.put(buf, len, rtp_ts, begin_talk); +} diff --git a/core/AmPlayoutBuffer.h b/core/AmPlayoutBuffer.h index 8208dd2e..ca20ba3d 100644 --- a/core/AmPlayoutBuffer.h +++ b/core/AmPlayoutBuffer.h @@ -4,6 +4,7 @@ #include "SampleArray.h" #include "AmStats.h" #include "LowcFE.h" +#include "AmJitterBuffer.h" #include using std::multiset; @@ -23,6 +24,12 @@ using std::multiset; // search segments of size TEMPLATE_SEG samples #define TEMPLATE_SEG 80 +// Maximum value: AUDIO_BUFFER_SIZE / 2 +// Note: plc result get stored in our back buffer +#define PLC_MAX_SAMPLES (160*4) + +class AmRtpAudio; + /** \brief base class for Playout buffer */ class AmPlayoutBuffer { @@ -31,17 +38,29 @@ class AmPlayoutBuffer protected: u_int32_t r_ts,w_ts; + AmRtpAudio *m_owner; + + unsigned int last_ts; + bool last_ts_i; + + /** the offset RTP receive TS <-> audio_buffer TS */ + unsigned int recv_offset; + /** the recv_offset initialized ? */ + bool recv_offset_i; void buffer_put(unsigned int ts, ShortSample* buf, unsigned int len); void buffer_get(unsigned int ts, ShortSample* buf, unsigned int len); - + + virtual void write_buffer(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len); + virtual void direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len); public: - AmPlayoutBuffer(); + AmPlayoutBuffer(AmRtpAudio *owner); virtual ~AmPlayoutBuffer() {} - virtual void direct_write(unsigned int ts, ShortSample* buf, unsigned int len); - virtual void write(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len); + virtual void write(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len, bool begin_talk); virtual u_int32_t read(u_int32_t ts, int16_t* buf, u_int32_t len); + + void clearLastTs() { last_ts_i = false; } }; /** \brief adaptive playout buffer */ @@ -73,17 +92,36 @@ class AmAdaptivePlayout: public AmPlayoutBuffer public: - AmAdaptivePlayout(); + AmAdaptivePlayout(AmRtpAudio *); /** write len samples beginning from timestamp ts from buf */ - void direct_write(unsigned int ts, ShortSample* buf, unsigned int len); + void direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len); /** write len samples which beginn from timestamp ts from buf reference ts of buffer (monotonic increasing buffer ts) is ref_ts */ - void write(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len); + void write_buffer(u_int32_t ref_ts, u_int32_t ts, int16_t* buf, u_int32_t len); /** read len samples beginn from timestamp ts into buf */ u_int32_t read(u_int32_t ts, int16_t* buf, u_int32_t len); + +}; + +/** \brief adaptive jitter buffer */ +class AmJbPlayout : public AmPlayoutBuffer +{ +private: + AmJitterBuffer m_jb; + unsigned int m_last_rtp_endts; + +protected: + void direct_write_buffer(unsigned int ts, ShortSample* buf, unsigned int len); + void prepare_buffer(unsigned int ts, unsigned int ms); + +public: + AmJbPlayout(AmRtpAudio *owner); + + u_int32_t read(u_int32_t ts, int16_t* buf, u_int32_t len); + void write(u_int32_t ref_ts, u_int32_t rtp_ts, int16_t* buf, u_int32_t len, bool begin_talk); }; diff --git a/core/AmRtpAudio.cpp b/core/AmRtpAudio.cpp index afac390c..77f5557d 100644 --- a/core/AmRtpAudio.cpp +++ b/core/AmRtpAudio.cpp @@ -29,11 +29,12 @@ #include #include #include "AmSession.h" +#include "AmPlayoutBuffer.h" AmRtpAudio::AmRtpAudio(AmSession* _s) : AmRtpStream(_s), AmAudio(0), - last_ts_i(false), use_default_plc(true), - send_only(false), playout_buffer(new AmPlayoutBuffer()), + /*last_ts_i(false),*/ use_default_plc(true), + send_only(false), playout_buffer(new AmPlayoutBuffer(this)), last_check(0),last_check_i(false),send_int(false) { } @@ -72,122 +73,30 @@ unsigned int AmRtpAudio::bytes2samples(unsigned int bytes) const */ int AmRtpAudio::receive(unsigned int audio_buffer_ts) { -#ifndef USE_ADAPTIVE_JB -// using fifo or adaptive playout buffer int size; - unsigned int ts; + unsigned int rtp_ts; while(true) { - size = AmRtpStream::receive((unsigned char*)samples, - (unsigned int)AUDIO_BUFFER_SIZE,ts, - audio_buffer_ts); + size = AmRtpStream::receive((unsigned char*)samples, + (unsigned int)AUDIO_BUFFER_SIZE, rtp_ts); if(size <= 0) break; if(send_only){ - last_ts_i = false; + playout_buffer->clearLastTs(); continue; } - if(!last_ts_i){ - last_ts = ts; - last_ts_i = true; - } - - if(ts_less()(last_ts,ts) && !begin_talk - && (ts-last_ts <= PLC_MAX_SAMPLES)) { - - int l_size = conceal_loss(ts - last_ts); - if(l_size>0){ - - playout_buffer->direct_write(last_ts, - (ShortSample*)samples.back_buffer(), - PCM16_B2S(l_size)); - } - } - size = decode(size); if(size <= 0){ ERROR("decode() returned %i\n",size); return -1; } - if(use_default_plc) - add_to_history(size); - - playout_buffer->write(audio_buffer_ts, ts, - (ShortSample*)((unsigned char*)samples), - PCM16_B2S(size)); - // update last_ts to end of received packet - // if not out-of-sequence - if (ts_less()(last_ts,ts) || last_ts == ts) - last_ts = ts + PCM16_B2S(size); + playout_buffer->write(audio_buffer_ts, rtp_ts, (ShortSample*)((unsigned char *)samples), + PCM16_B2S(size), begin_talk); } - return size; -#else // using adaptive JB - int rtp_size; - int audio_size; - unsigned int ts; - - if (send_only) { - last_ts_i = false; - return 0; - } - /** - * Receive all RTP packets that correspond to the required interval, - * decode them and put into playout buffer. - */ - while (true) - { - rtp_size = AmRtpStream::receive((unsigned char*)samples, - (unsigned int)AUDIO_BUFFER_SIZE, &ts, - audio_buffer_ts, getFrameSize()); - if (rtp_size < 0) { - return rtp_size; - } - if (rtp_size == 0) { // No more RTP packets for this interval - break; - } - audio_size = decode(rtp_size); - if (audio_size <= 0) { - ERROR("decode() returned %i\n", audio_size); - return -1; - } - playout_buffer->direct_write(ts, - (ShortSample*)((unsigned char*)samples), - PCM16_B2S(audio_size)); - /* Conceal the gap between previous and current RTP packets */ - if (last_ts_i && ts_less()(m_last_rtp_endts, ts)) - { - int concealed_size = conceal_loss(ts - m_last_rtp_endts); - if (concealed_size > 0) - playout_buffer->direct_write(m_last_rtp_endts, - (ShortSample*)((unsigned char*)samples.back_buffer()), - PCM16_B2S(concealed_size)); - } - m_last_rtp_endts = ts + bytes2samples(rtp_size); - last_ts_i = true; - if(use_default_plc) { - add_to_history(audio_size); - } - } - if (!last_ts_i) { - return 0; - } - if (ts_less()(m_last_rtp_endts, audio_buffer_ts + getFrameSize())) - { - /* Last packets have been lost. Conceal them */ - int concealed_size = conceal_loss(audio_buffer_ts + getFrameSize() - m_last_rtp_endts); - if (concealed_size > 0) - playout_buffer->direct_write(m_last_rtp_endts, - (ShortSample*)((unsigned char*)samples.back_buffer()), - PCM16_B2S(concealed_size)); - m_last_rtp_endts = audio_buffer_ts + getFrameSize(); - } - - return PCM16_S2B(getFrameSize()); -#endif } int AmRtpAudio::get(unsigned int user_ts, unsigned char* buffer, unsigned int nb_samples) @@ -223,7 +132,7 @@ void AmRtpAudio::init(const SdpPayload* sdp_payload) use_default_plc = !(codec && codec->plc); } -unsigned int AmRtpAudio::conceal_loss(unsigned int ts_diff) +unsigned int AmRtpAudio::conceal_loss(unsigned int ts_diff, unsigned char *buffer) { int s=0; if(!use_default_plc){ @@ -232,13 +141,13 @@ unsigned int AmRtpAudio::conceal_loss(unsigned int ts_diff) long h_codec = fmt->getHCodec(); assert(codec && codec->plc); - s = (*codec->plc)(samples.back_buffer(),PCM16_S2B(ts_diff), + s = (*codec->plc)(buffer, PCM16_S2B(ts_diff), fmt->channels,fmt->rate,h_codec); DBG("codec specific PLC (ts_diff = %i; s = %i)\n",ts_diff,s); } else { - s = default_plc(samples.back_buffer(),PCM16_S2B(ts_diff), + s = default_plc(buffer, PCM16_S2B(ts_diff), fmt->channels,fmt->rate); DBG("default PLC (ts_diff = %i; s = %i)\n",ts_diff,s); @@ -263,9 +172,12 @@ unsigned int AmRtpAudio::default_plc(unsigned char* out_buf, return PCM16_S2B(buf_offset - (short*)out_buf); } -void AmRtpAudio::add_to_history(unsigned int size) +void AmRtpAudio::add_to_history(int16_t *buffer, unsigned int size) { - short* buf_offset = (short*)((unsigned char*)samples); + int16_t* buf_offset = buffer; + + if (!use_default_plc) + return; for(unsigned int i=0; i<(PCM16_B2S(size)/FRAMESZ); i++){ @@ -274,25 +186,33 @@ void AmRtpAudio::add_to_history(unsigned int size) } } -void AmRtpAudio::setAdaptivePlayout(bool on) +void AmRtpAudio::setPlayoutType(PlayoutType type) { - bool is_adaptive = - dynamic_cast - (playout_buffer.get()) != 0; - - if(on == !is_adaptive){ + PlayoutType curr_type = SIMPLE_PLAYOUT; + if (dynamic_cast(playout_buffer.get())) + curr_type = ADAPTIVE_PLAYOUT; + else if (dynamic_cast(playout_buffer.get())) + curr_type = JB_PLAYOUT; - if(!is_adaptive){ - session->lockAudio(); - playout_buffer.reset(new AmAdaptivePlayout()); + if (curr_type != type) + { + if (type == ADAPTIVE_PLAYOUT) { + session->lockAudio(); + playout_buffer.reset(new AmAdaptivePlayout(this)); session->unlockAudio(); DBG("Adaptive playout buffer activated\n"); } - else{ + else if (type == JB_PLAYOUT) { + session->lockAudio(); + playout_buffer.reset(new AmJbPlayout(this)); + session->unlockAudio(); + DBG("Adaptive jitter buffer activated\n"); + } + else { session->lockAudio(); - playout_buffer.reset(new AmPlayoutBuffer()); + playout_buffer.reset(new AmPlayoutBuffer(this)); session->unlockAudio(); - DBG("Adaptive playout buffer deactivated\n"); + DBG("Simple playout buffer activated\n"); } } } diff --git a/core/AmRtpAudio.h b/core/AmRtpAudio.h index 3c4f9431..c5d166a0 100644 --- a/core/AmRtpAudio.h +++ b/core/AmRtpAudio.h @@ -30,13 +30,15 @@ #include "AmAudio.h" #include "AmRtpStream.h" -#include "AmPlayoutBuffer.h" #include "LowcFE.h" -// Maximum value: AUDIO_BUFFER_SIZE / 2 -// Note: plc result get stored in our back buffer -#define PLC_MAX_SAMPLES (160*4) +class AmPlayoutBuffer; +enum PlayoutType { + ADAPTIVE_PLAYOUT, + JB_PLAYOUT, + SIMPLE_PLAYOUT +}; /** * \brief binds together a \ref AmRtpStream and an \ref AmAudio for a session */ @@ -52,19 +54,8 @@ class AmRtpAudio: public AmRtpStream, public AmAudio bool last_check_i; bool send_int; -#ifndef USE_ADAPTIVE_JB - unsigned int last_ts; -#else - unsigned int m_last_rtp_endts; -#endif - bool last_ts_i; - bool send_only; - // Conceals packet loss into the back buffer - // @return length in bytes of the recivered segment - unsigned int conceal_loss(unsigned int ts_diff); - // // Default packet loss concealment functions // @@ -73,8 +64,6 @@ class AmRtpAudio: public AmRtpStream, public AmAudio unsigned int channels, unsigned int rate); - void add_to_history(unsigned int size); - public: AmRtpAudio(AmSession* _s=0); @@ -97,9 +86,15 @@ public: // AmRtpStream interface void init(const SdpPayload* sdp_payload); - void setAdaptivePlayout(bool on); + void setPlayoutType(PlayoutType type); virtual unsigned int bytes2samples(unsigned int) const; + + void add_to_history(int16_t *buffer, unsigned int size); + + // Conceals packet loss into the out_buffer + // @return length in bytes of the recivered segment + unsigned int conceal_loss(unsigned int ts_diff, unsigned char *out_buffer); }; #endif diff --git a/core/AmRtpStream.cpp b/core/AmRtpStream.cpp index bdb0582b..b535269e 100644 --- a/core/AmRtpStream.cpp +++ b/core/AmRtpStream.cpp @@ -54,8 +54,6 @@ #include using std::set; -#define MAX_DELAY 8000 /* 1 second */ - /* * This function must be called before setLocalPort, because * setLocalPort will bind the socket and it will be not @@ -188,14 +186,13 @@ int AmRtpStream::send( unsigned int ts, unsigned char* buffer, unsigned int size return size; } -#ifndef USE_ADAPTIVE_JB // returns // @param ts [out] timestamp of the received packet, // in audio buffer relative time // @param audio_buffer_ts [in] current ts at the audio_buffer int AmRtpStream::receive( unsigned char* buffer, unsigned int size, - unsigned int& ts, unsigned int audio_buffer_ts) + unsigned int& ts) { AmRtpPacket rp; int err = nextPacket(rp); @@ -235,139 +232,47 @@ int AmRtpStream::receive( unsigned char* buffer, unsigned int size, /* do we have a new talk spurt? */ begin_talk = ((last_payload == 13) || rp.marker); last_payload = rp.payload; - + if(!rp.getDataSize()) return RTP_EMPTY; - - if(payload != rp.payload){ - - if (telephone_event_pt.get() && rp.payload == telephone_event_pt->payload_type) - { - dtmf_payload_t* dpl = (dtmf_payload_t*)rp.getData(); - - DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i\n", - dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration)); - session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getTelephoneEventRate())); - return RTP_DTMF; - } - else - return RTP_UNKNOWN_PL; - } - - if(!recv_offset_i){ - recv_offset = rp.timestamp - audio_buffer_ts; - recv_offset_i = true; - DBG("initialized recv_offset with %i (%i - %i)\n", - recv_offset,audio_buffer_ts,rp.timestamp); - ts = audio_buffer_ts;// + jitter_delay; - } - else { - ts = rp.timestamp - recv_offset;// + jitter_delay; - - // resync - if( ts_less()(ts, audio_buffer_ts - MAX_DELAY/2) || - !ts_less()(ts, audio_buffer_ts + MAX_DELAY) ){ - - DBG("resync needed: reference ts = %u; write ts = %u\n", - audio_buffer_ts,ts); - recv_offset = rp.timestamp - audio_buffer_ts; - ts = audio_buffer_ts;// + jitter_delay; - } - } - - assert(rp.getData()); - if(rp.getDataSize() > size){ - ERROR("received too big RTP packet\n"); - return RTP_BUFFER_SIZE; - } - memcpy(buffer,rp.getData(),rp.getDataSize()); - - return rp.getDataSize(); -} -#else -// returns -// @param ts [out] timestamp of the received packet, -// in audio buffer relative time -// @param audio_buffer_ts [in] current ts at the audio_buffer - -int AmRtpStream::receive( unsigned char* buffer, unsigned int buf_size, - unsigned int *ts, - unsigned int audio_buffer_ts, unsigned int ms) -{ - AmRtpPacket rp; - AmRtpPacket dtmf_pkt; - - while (m_telephone_event_jb->get(dtmf_pkt, audio_buffer_ts, ms)) + if (telephone_event_pt.get() && rp.payload == telephone_event_pt->payload_type) { - dtmf_payload_t* dpl = (dtmf_payload_t*)dtmf_pkt.getData(); + dtmf_payload_t* dpl = (dtmf_payload_t*)rp.getData(); - DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i\n", - dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration)); - session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getTelephoneEventRate())); + DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i\n", + dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration)); + session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getTelephoneEventRate())); + return RTP_DTMF; } - - int err = nextAudioPacket(rp, audio_buffer_ts, ms); - if(err <= 0) - return err; -#ifdef SUPPORT_IPV6 - struct sockaddr_storage recv_addr; -#else - struct sockaddr_in recv_addr; -#endif - rp.getAddr(&recv_addr); -#ifndef SUPPORT_IPV6 - // symmetric RTP - if( passive && ((recv_addr.sin_port != r_saddr.sin_port) - || (recv_addr.sin_addr.s_addr - != r_saddr.sin_addr.s_addr)) ) { - - string addr_str = get_addr_str(recv_addr.sin_addr); - int port = ntohs(recv_addr.sin_port); - setRAddr(addr_str,port); - DBG("Symmetric RTP: setting new remote address: %s:%i\n",addr_str.c_str(),port); - // avoid comparing each time sender address - passive = false; + if (payload != rp.payload){ + return RTP_UNKNOWN_PL; } -#endif - - /* do we have a new talk spurt? */ - begin_talk = ((last_payload == 13) || rp.marker); - last_payload = rp.payload; - - if(!rp.getDataSize()) - return RTP_EMPTY; assert(rp.getData()); - if(rp.getDataSize() > buf_size){ + if(rp.getDataSize() > size){ ERROR("received too big RTP packet\n"); return RTP_BUFFER_SIZE; } memcpy(buffer,rp.getData(),rp.getDataSize()); - *ts = rp.timestamp; + ts = rp.timestamp; - return rp.getDataSize(); + return rp.getDataSize(); } -#endif // USE_ADAPTIVE_JB AmRtpStream::AmRtpStream(AmSession* _s) : runcond(0), r_port(0), l_port(0), l_sd(0), - recv_offset_i(false), +// recv_offset_i(false), r_ssrc_i(false), session(_s), passive(false), telephone_event_pt(NULL), mute(false), receiving(true) -#ifdef USE_ADAPTIVE_JB - , - m_main_jb(new AmJitterBuffer(this)), - m_telephone_event_jb(new AmJitterBuffer(this)) -#endif { @@ -395,11 +300,6 @@ AmRtpStream::~AmRtpStream() AmRtpReceiver::instance()->removeStream(l_sd); close(l_sd); } -#ifdef USE_ADAPTIVE_JB - if (m_main_jb) delete(m_main_jb); - if (m_telephone_event_jb) - delete (m_telephone_event_jb); -#endif } int AmRtpStream::getLocalPort() @@ -488,7 +388,6 @@ void AmRtpStream::icmpError() } } -#ifndef USE_ADAPTIVE_JB void AmRtpStream::bufferPacket(const AmRtpPacket* p) { if (!receiving && !passive) @@ -530,40 +429,6 @@ int AmRtpStream::nextPacket(AmRtpPacket& p) return 1; } -#else -void AmRtpStream::bufferPacket(const AmRtpPacket* p) -{ - if (!receiving && !passive) - return; - - gettimeofday(&last_recv_time,NULL); - if (p->payload == payload && m_main_jb) - m_main_jb->put(p); - else if (telephone_event_pt.get() && p->payload == telephone_event_pt->payload_type) - m_telephone_event_jb->put(p); -} - -int AmRtpStream::nextAudioPacket(AmRtpPacket& p, unsigned int ts, unsigned int ms) -{ - if (!receiving && !passive) - return RTP_EMPTY; - - if (m_main_jb && m_main_jb->get(p, ts, ms)) - return 1; - - struct timeval now; - struct timeval diff; - gettimeofday(&now,NULL); - - timersub(&now,&last_recv_time,&diff); - if(diff.tv_sec > DEAD_RTP_TIME){ - WARN("RTP Timeout detected. Last received packet is too old.\n"); - DBG("diff.tv_sec = %i\n",(unsigned int)diff.tv_sec); - return RTP_TIMEOUT; - } - return RTP_EMPTY; -} -#endif int AmRtpStream::getTelephoneEventRate() { diff --git a/core/AmRtpStream.h b/core/AmRtpStream.h index de2f1d9a..692cbabf 100644 --- a/core/AmRtpStream.h +++ b/core/AmRtpStream.h @@ -62,10 +62,6 @@ class AmSession; class SdpPayload; typedef map JitterBuffer; -#ifdef USE_ADAPTIVE_JB -class AmJitterBuffer; -#endif - /** * \brief RTP implementation * @@ -118,11 +114,6 @@ protected: struct timeval last_recv_time; - /** the offset RTP receive TS <-> audio_buffer TS */ - unsigned int recv_offset; - /** the recv_offset initialized ? */ - bool recv_offset_i; - unsigned int l_ssrc; unsigned int r_ssrc; bool r_ssrc_i; @@ -134,19 +125,11 @@ protected: auto_ptr telephone_event_pt; -#ifndef USE_ADAPTIVE_JB JitterBuffer jitter_buf; AmMutex jitter_mut; /* get next packet in buffer */ int nextPacket(AmRtpPacket& p); -#else - AmJitterBuffer *m_main_jb; - AmJitterBuffer *m_telephone_event_jb; - - /* get next packet in buffer */ - int nextAudioPacket(AmRtpPacket& p, unsigned int ts, unsigned int ms); -#endif AmSession* session; @@ -167,14 +150,9 @@ public: unsigned char* buffer, unsigned int size ); -#ifndef USE_ADAPTIVE_JB int receive( unsigned char* buffer, unsigned int size, - unsigned int& ts, unsigned int audio_buffer_ts); -#else - int receive( unsigned char* buffer, unsigned int size, unsigned int *ts, - unsigned int audio_buffer_ts, unsigned int ms); -#endif + unsigned int& ts); /** Allocates resources for future use of RTP. */ AmRtpStream(AmSession* _s=0); diff --git a/core/AmSession.cpp b/core/AmSession.cpp index 31dafe97..6ac1911d 100644 --- a/core/AmSession.cpp +++ b/core/AmSession.cpp @@ -37,6 +37,7 @@ #include "AmSessionContainer.h" #include "AmMediaProcessor.h" #include "AmDtmfDetector.h" +#include "AmPlayoutBuffer.h" #include "log.h" diff --git a/core/plug-in/echo/Echo.cpp b/core/plug-in/echo/Echo.cpp index 42814e96..795d8afc 100644 --- a/core/plug-in/echo/Echo.cpp +++ b/core/plug-in/echo/Echo.cpp @@ -62,7 +62,7 @@ AmSession* EchoFactory::onInvite(const AmSipRequest& req) } EchoDialog::EchoDialog() - : adaptive_playout(false) + : playout_type(SIMPLE_PLAYOUT) { } @@ -85,10 +85,18 @@ void EchoDialog::onDtmf(int event, int duration) { #ifdef STAR_SWITCHES_PLAYOUTBUFFER if (event == 10) { - adaptive_playout = !adaptive_playout; - DBG("received *. set adaptive playout to %s.\n", adaptive_playout ? "true":"false"); - - rtp_str.setAdaptivePlayout(adaptive_playout); + char* pt = "simple (fifo) playout buffer"; + if (playout_type == SIMPLE_PLAYOUT) { + playout_type = ADAPTIVE_PLAYOUT; + pt = "adaptive playout buffer"; + } else if (playout_type == ADAPTIVE_PLAYOUT) { + pt = "adaptive jitter buffer"; + playout_type = JB_PLAYOUT; + } else + playout_type = SIMPLE_PLAYOUT; + DBG("received *. set playout technique to %s.\n", pt); + + rtp_str.setPlayoutType(playout_type); } #endif } diff --git a/core/plug-in/echo/Echo.h b/core/plug-in/echo/Echo.h index 3a5e1d9c..c4e0e565 100644 --- a/core/plug-in/echo/Echo.h +++ b/core/plug-in/echo/Echo.h @@ -48,7 +48,7 @@ public: class EchoDialog : public AmSession { AmAudioEcho echo; - bool adaptive_playout; + PlayoutType playout_type; public: EchoDialog(); ~EchoDialog();