/*
 * Copyright (C) 2002-2003 Fhg Fokus
 *
 * This file is part of SEMS, a free SIP media server.
 *
 * SEMS is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version. This program is released under
 * the GPL with the additional exemption that compiling, linking,
 * and/or using OpenSSL is allowed.
 *
 * For a license to use the SEMS software under conditions
 * other than those described here, or to purchase support for this
 * software, please contact iptel.org by e-mail at the following addresses:
 *    info@iptel.org
 *
 * SEMS is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

#include "AmRtpStream.h"
#include "AmRtpPacket.h"
#include "AmRtpReceiver.h"
#include "AmConfig.h"
#include "AmPlugIn.h"
#include "AmAudio.h"
#include "AmUtils.h"
#include "AmSession.h"

#include "AmDtmfDetector.h"
#include "rtp/telephone_event.h"
#include "amci/codecs.h"
#include "AmJitterBuffer.h"

#include "sip/resolver.h"
#include "sip/ip_util.h"
#include "sip/raw_sender.h"
#include "sip/msg_logger.h"

#include "log.h"

// NOTE(review): the names of the system headers were lost in the damaged
// source; the list below is reconstructed from the functions and macros
// this file actually uses (assert, memset/strerror, strcasecmp, errno,
// close, gettimeofday/timersub, socket/bind/recvfrom/sendto, ioctl/FIONBIO,
// INADDR_ANY/IN6_IS_ADDR_UNSPECIFIED, htons/ntohs).
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#ifdef WITH_ZRTP
#include "libzrtp/zrtp.h"
#endif

#include "rtp/rtp.h"

#include <set>
using std::set;

/** Clear every bit of the mask. */
void PayloadMask::clear()
{
  memset(bits, 0, sizeof(bits));
}

/** Set every bit of the mask. */
void PayloadMask::set_all()
{
  memset(bits, 0xFF, sizeof(bits));
}

/**
 * Flip every bit of the mask.
 *
 * Works element by element over the whole bits[] array, so it neither
 * depends on the array being exactly 128 bits wide nor reads it through a
 * differently typed (and possibly misaligned) pointer.
 */
void PayloadMask::invert()
{
  for (size_t i = 0; i < sizeof(bits) / sizeof(bits[0]); ++i)
    bits[i] = ~bits[i];
}

/** Copy constructor: duplicates the bit array of src. */
PayloadMask::PayloadMask(const PayloadMask &src)
{
  memcpy(bits, src.bits, sizeof(bits));
}

//////////////////////////////////////////////////////////////////////////////////////////////////////////

/*
 * This function must be called before setLocalPort, because
 * setLocalPort will bind the socket and it will be not
 * possible to change the IP later
 *
 * Throws a string if ip cannot be converted by am_inet_pton().
 */
void AmRtpStream::setLocalIP(const string& ip)
{
  if (!am_inet_pton(ip.c_str(), &l_saddr)) {
    throw string ("AmRtpStream::setLocalIP: Invalid IP address: ") + ip;
  }
  DBG("ip = %s\n",ip.c_str());
}

/** Returns the RTP socket descriptor, or 0 if none has been created yet. */
int AmRtpStream::hasLocalSocket()
{
  return l_sd;
}

/**
 * Returns the RTP socket descriptor, creating the RTP and RTCP sockets
 * (both UDP, both switched to non-blocking mode) on first use.
 *
 * Throws a string on failure. Any socket that was already opened by this
 * call is closed again before throwing, so a failed attempt does not leak
 * descriptors.
 */
int AmRtpStream::getLocalSocket()
{
  if (l_sd)
    return l_sd;

  int sd = socket(l_saddr.ss_family, SOCK_DGRAM, 0);
  if (sd == -1) {
    ERROR("%s\n",strerror(errno));
    throw string ("while creating new socket.");
  }

  int rtcp_sd = socket(l_saddr.ss_family, SOCK_DGRAM, 0);
  if (rtcp_sd == -1) {
    ERROR("%s\n",strerror(errno));
    close(sd);
    throw string ("while creating new socket.");
  }

  int true_opt = 1;
  if (ioctl(sd, FIONBIO, &true_opt) == -1) {
    ERROR("%s\n",strerror(errno));
    close(sd);
    close(rtcp_sd);
    throw string ("while setting socket non blocking.");
  }

  if (ioctl(rtcp_sd, FIONBIO, &true_opt) == -1) {
    ERROR("%s\n",strerror(errno));
    close(sd);
    close(rtcp_sd);
    throw string ("while setting socket non blocking.");
  }

  l_sd = sd;
  l_rtcp_sd = rtcp_sd;

  return l_sd;
}

/**
 * Bind the RTP socket to a local port and the RTCP socket to port+1.
 *
 * @param p  port to use; 0 asks the RTP interface for its next free port.
 *
 * Does nothing if a local port is already set. Up to 10 bind attempts are
 * made; the sockets are closed and re-created between attempts. When the
 * port was chosen automatically (p == 0) both sockets are registered with
 * the AmRtpReceiver.
 *
 * Throws a string if no port could be bound or SO_REUSEADDR cannot be set.
 */
void AmRtpStream::setLocalPort(unsigned short p)
{
  if (l_port)
    return;

  if (l_if < 0) {
    if (session) {
      l_if = session->getRtpInterface();
    }
    else {
      ERROR("BUG: no session when initializing RTP stream, invalid interface can be used\n");
      l_if = 0;
    }
  }

  int retry = 10;
  unsigned short port = 0;
  for (; retry; --retry) {

    if (!getLocalSocket())
      return;

    if (!p)
      port = AmConfig::RTP_Ifs[l_if].getNextRtpPort();
    else
      port = p;

    // RTCP first (port+1), then RTP (port); l_saddr ends up carrying
    // the RTP port when both succeed.
    bool bound = false;
    am_set_port(&l_saddr,port+1);
    if (bind(l_rtcp_sd,(const struct sockaddr*)&l_saddr,SA_len(&l_saddr)) == 0) {
      am_set_port(&l_saddr,port);
      bound = (bind(l_sd,(const struct sockaddr*)&l_saddr,SA_len(&l_saddr)) == 0);
    }

    // both bind() succeeded!
    if (bound)
      break;

    DBG("bind: %s\n",strerror(errno));

    // try another port with fresh sockets
    close(l_sd);
    l_sd = 0;

    close(l_rtcp_sd);
    l_rtcp_sd = 0;
  }

  if (!retry) {
    ERROR("could not find a free RTP port\n");
    throw string("could not find a free RTP port");
  }

  // rco: does that make sense after bind() ????
  int true_opt = 1;
  if (setsockopt(l_sd, SOL_SOCKET, SO_REUSEADDR,
                 (void*)&true_opt, sizeof (true_opt)) == -1) {

    ERROR("%s\n",strerror(errno));
    // release both sockets, not only the RTP one
    close(l_sd);
    l_sd = 0;
    close(l_rtcp_sd);
    l_rtcp_sd = 0;
    throw string ("while setting local address reusable.");
  }

  l_port = port;
  l_rtcp_port = port+1;

  if (!p) {
    AmRtpReceiver::instance()->addStream(l_sd, this);
    AmRtpReceiver::instance()->addStream(l_rtcp_sd, this);
    DBG("added stream [%p] to RTP receiver (%s:%i/%i)\n", this,
        get_addr_str((sockaddr_storage*)&l_saddr).c_str(),l_port,l_rtcp_port);
  }

  memcpy(&l_rtcp_saddr, &l_saddr, sizeof(l_saddr));
  am_set_port(&l_rtcp_saddr, l_rtcp_port);
}

/**
 * Send a two-byte dummy RTP packet (marker set, timestamp 0) to the
 * remote address.
 *
 * @return 2 on success, -1 if sending failed.
 */
int AmRtpStream::ping()
{
  // TODO:
  //  - we'd better send an empty UDP packet
  //    for this purpose.

  unsigned char ping_chr[2];

  ping_chr[0] = 0;
  ping_chr[1] = 0;

  AmRtpPacket rp;
  rp.payload = payload;
  rp.marker = true;
  rp.sequence = sequence++;
  rp.timestamp = 0;
  rp.ssrc = l_ssrc;
  rp.compile((unsigned char*)ping_chr,2);

  rp.setAddr(&r_saddr);
  if (rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx,&l_saddr) < 0) {
    ERROR("while sending RTP packet.\n");
    return -1;
  }

  return 2;
}

/**
 * Build an RTP packet around buffer and send it to the remote address.
 *
 * @param payload  RTP payload type to put on the wire
 * @param marker   value of the RTP marker bit
 * @param ts       RTP timestamp
 * @param buffer   payload data
 * @param size     number of payload bytes
 * @return size on success, -1 on send error (or missing ZRTP stream),
 *         0 if ZRTP asked for the packet to be dropped or failed.
 */
int AmRtpStream::compile_and_send(const int payload, bool marker, unsigned int ts,
                                  unsigned char* buffer, unsigned int size)
{
  AmRtpPacket rp;
  rp.payload = payload;
  rp.timestamp = ts;
  rp.marker = marker;
  rp.sequence = sequence++;
  rp.ssrc = l_ssrc;
  rp.compile((unsigned char*)buffer,size);

  rp.setAddr(&r_saddr);

#ifdef WITH_ZRTP
  if (session && session->enable_zrtp){
    if (NULL == session->zrtp_session_state.zrtp_audio) {
      ERROR("ZRTP enabled on session, but no audio stream created\n");
      return -1;
    }

    // note: shadows the 'size' parameter inside this block only
    unsigned int size = rp.getBufferSize();
    zrtp_status_t status = zrtp_process_rtp(session->zrtp_session_state.zrtp_audio,
                                            (char*)rp.getBuffer(), &size);
    switch (status) {
    case zrtp_status_drop: {
      DBG("ZRTP says: drop packet! %u - %u\n", size, rp.getBufferSize());
      return 0;
    }
    case zrtp_status_ok: {
      //   DBG("ZRTP says: ok!\n");
      if (rp.getBufferSize() != size)
        //   DBG("SEND packet size before: %d, after %d\n",
        //       rp.getBufferSize(), size);
        rp.setBufferSize(size);
    } break;
    default:
    case zrtp_status_fail: {
      DBG("ZRTP says: fail!\n");
      //   DBG("(f)");
      return 0;
    }
    }
  }
#endif

  if (rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr) < 0) {
    ERROR("while sending RTP packet.\n");
    return -1;
  }

  if (logger) rp.logSent(logger, &l_saddr);

  return size;
}

/**
 * Send audio data using the currently selected payload.
 *
 * Returns 0 without sending while muted or on hold. If the remote side
 * advertised a telephone-event payload, the DTMF sender is given the
 * chance to send a packet first (even when size is 0).
 *
 * @return result of compile_and_send(), or -1 if size is 0 or the current
 *         payload has no remote payload type mapping.
 */
int AmRtpStream::send( unsigned int ts, unsigned char* buffer, unsigned int size )
{
  if ((mute) || (hold))
    return 0;

  if (remote_telephone_event_pt.get())
    dtmf_sender.sendPacket(ts,remote_telephone_event_pt->payload_type,this);

  if (!size)
    return -1;

  PayloadMappingTable::iterator it = pl_map.find(payload);
  if ((it == pl_map.end()) || (it->second.remote_pt < 0)) {
    ERROR("sending packet with unsupported remote payload type %d\n", payload);
    return -1;
  }

  return compile_and_send(it->second.remote_pt, false, ts, buffer, size);
}

/**
 * Send an already assembled RTP packet unchanged to the remote address.
 *
 * @return length on success, 0 while muted or on hold, -1 on send error.
 */
int AmRtpStream::send_raw( char* packet, unsigned int length )
{
  if ((mute) || (hold))
    return 0;

  AmRtpPacket rp;
  rp.compile_raw((unsigned char*)packet, length);
  rp.setAddr(&r_saddr);

  if (rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr) < 0) {
    ERROR("while sending raw RTP packet.\n");
    return -1;
  }

  if (logger) rp.logSent(logger, &l_saddr);

  return length;
}

/**
 * Fetch the next buffered packet and copy its payload into buffer.
 *
 * @param buffer       destination for the payload data
 * @param size         capacity of buffer in bytes
 * @param ts           [out] timestamp of the received packet
 * @param out_payload  [out] payload type of the received packet
 * @return number of payload bytes copied, or
 *         - the (<= 0) result of nextPacket() if no packet is available,
 *         - RTP_EMPTY for a packet without payload data,
 *         - RTP_DTMF if the packet was a telephone event (handled here),
 *         - RTP_BUFFER_SIZE if the payload does not fit into buffer.
 */
int AmRtpStream::receive( unsigned char* buffer, unsigned int size,
                          unsigned int& ts, int &out_payload)
{
  AmRtpPacket* rp = NULL;
  int err = nextPacket(rp);

  if (err <= 0)
    return err;

  if (!rp)
    return 0;

  handleSymmetricRtp(&rp->addr,false);

  /* do we have a new talk spurt? */
  // NOTE(review): payload type 13 is presumably comfort noise - confirm
  begin_talk = ((last_payload == 13) || rp->marker);
  last_payload = rp->payload;

  if (!rp->getDataSize()) {
    mem.freePacket(rp);
    return RTP_EMPTY;
  }

  if (rp->payload == getLocalTelephoneEventPT()) {
    recvDtmfPacket(rp);
    mem.freePacket(rp);
    return RTP_DTMF;
  }

  assert(rp->getData());
  if (rp->getDataSize() > size) {
    ERROR("received too big RTP packet\n");
    mem.freePacket(rp);
    return RTP_BUFFER_SIZE;
  }

  memcpy(buffer,rp->getData(),rp->getDataSize());
  ts = rp->timestamp;
  out_payload = rp->payload;

  int res = rp->getDataSize();
  mem.freePacket(rp);
  return res;
}

/**
 * Create an unbound RTP stream.
 *
 * @param _s   owning session (may be NULL)
 * @param _if  index of the RTP interface; a negative value defers the
 *             choice to setLocalPort()
 *
 * SSRC and initial sequence number are randomised; the payload provider
 * defaults to the plug-in registry.
 */
AmRtpStream::AmRtpStream(AmSession* _s, int _if)
  : r_port(0),
    l_if(_if),
    l_port(0),
    l_sd(0),
    r_ssrc_i(false),
    session(_s),
    logger(NULL),
    passive(false),
    passive_rtcp(false),
    offer_answer_used(true),
    active(false), // do not return any data unless something really received
    mute(false),
    hold(false),
    receiving(true),
    monitor_rtp_timeout(true),
    relay_stream(NULL),
    relay_enabled(false),
    relay_raw(false),
    sdp_media_index(-1),
    relay_transparent_ssrc(true),
    relay_transparent_seqno(true),
    relay_filter_dtmf(false),
    force_receive_dtmf(false)
{
  // The RTCP members are not part of the initializer list above; zero them
  // here so getLocalRtcpPort(), stopReceiving() and friends never read
  // indeterminate values before setLocalPort()/setRAddr() ran.
  l_rtcp_sd = 0;
  l_rtcp_port = 0;
  r_rtcp_port = 0;

  memset(&r_saddr,0,sizeof(struct sockaddr_storage));
  memset(&l_saddr,0,sizeof(struct sockaddr_storage));

  l_ssrc = get_random();
  sequence = get_random();
  clearRTPTimeout();

  // by default the system codecs
  payload_provider = AmPlugIn::instance();
}

/**
 * Unregisters the sockets from the RTP receiver (if it still exists),
 * closes them and drops the reference on the logger.
 */
AmRtpStream::~AmRtpStream()
{
  if (l_sd) {
    if (AmRtpReceiver::haveInstance()) {
      AmRtpReceiver::instance()->removeStream(l_sd);
      AmRtpReceiver::instance()->removeStream(l_rtcp_sd);
    }
    close(l_sd);
    close(l_rtcp_sd);
  }

  if (logger) dec_ref(logger);
}

/** Returns the local RTP port, binding one first if none is set yet. */
int AmRtpStream::getLocalPort()
{
  //  if (hold)
  //    return 0;

  if (!l_port)
    setLocalPort();

  return l_port;
}

/** Returns the local RTCP port, binding the ports first if necessary. */
int AmRtpStream::getLocalRtcpPort()
{
  if (!l_rtcp_port)
    setLocalPort();

  return l_rtcp_port;
}

/** Returns the remote RTP port. */
int AmRtpStream::getRPort()
{
  return r_port;
}

/** Returns the remote host as last passed to setRAddr(). */
string AmRtpStream::getRHost()
{
  return r_host;
}

/**
 * Set the remote address.
 *
 * @param addr       host name or IP address of the remote party
 * @param port       remote RTP port; 0 keeps the current one
 * @param rtcp_port  remote RTCP port; 0 keeps the current one
 *
 * Also updates 'mute': the stream is muted when the resolved address is
 * the unspecified address (IPv4 INADDR_ANY or IPv6 ::).
 *
 * Throws a string if the name cannot be resolved.
 */
void AmRtpStream::setRAddr(const string& addr, unsigned short port,
                           unsigned short rtcp_port)
{
  DBG("RTP remote address set to %s:(%u/%u)\n",
      addr.c_str(),port,rtcp_port);

  struct sockaddr_storage ss;
  memset (&ss, 0, sizeof (ss));

  /* inet_aton only supports dot-notation IP address strings... but an RFC
   * 4566 unicast-address, as found in c=, can be an FQDN (or other!).
   */
  dns_handle dh;
  if (resolver::instance()->resolve_name(addr.c_str(),&dh,&ss,IPv4) < 0) {
    WARN("Address not valid (host: %s).\n", addr.c_str());
    throw string("invalid address") + addr;
  }

  r_host = addr;
  if (port)      r_port = port;
  if (rtcp_port) r_rtcp_port = rtcp_port;

  memcpy(&r_saddr,&ss,sizeof(struct sockaddr_storage));
  am_set_port(&r_saddr,r_port);

  mute = ((r_saddr.ss_family == AF_INET) &&
          (SAv4(&r_saddr)->sin_addr.s_addr == INADDR_ANY)) ||
    ((r_saddr.ss_family == AF_INET6) &&
     IN6_IS_ADDR_UNSPECIFIED(&SAv6(&r_saddr)->sin6_addr));
}

/**
 * Symmetric RTP/RTCP handling for passive mode.
 *
 * While the stream is passive for the given protocol, the source of the
 * first received packet is compared with the configured remote address;
 * if port or IP differ, the remote address is replaced by the packet's
 * source. Passive mode for that protocol is then left in either case.
 *
 * @param recv_addr  source address of the received packet
 * @param rtcp       true for an RTCP packet, false for RTP
 */
void AmRtpStream::handleSymmetricRtp(struct sockaddr_storage* recv_addr, bool rtcp)
{
  if ((!rtcp && passive) || (rtcp && passive_rtcp)) {

    struct sockaddr_in* in_recv = (struct sockaddr_in*)recv_addr;
    struct sockaddr_in6* in6_recv = (struct sockaddr_in6*)recv_addr;

    struct sockaddr_in* in_addr = (struct sockaddr_in*)&r_saddr;
    struct sockaddr_in6* in6_addr = (struct sockaddr_in6*)&r_saddr;

    unsigned short port = am_get_port(recv_addr);

    // symmetric RTP
    if ( (!rtcp && (port != r_port)) || (rtcp && (port != r_rtcp_port)) ||
         ((recv_addr->ss_family == AF_INET) &&
          (in_addr->sin_addr.s_addr != in_recv->sin_addr.s_addr)) ||
         ((recv_addr->ss_family == AF_INET6) &&
          (memcmp(&in6_addr->sin6_addr,
                  &in6_recv->sin6_addr,
                  sizeof(struct in6_addr))))
         ) {

      string addr_str = get_addr_str(recv_addr);
      setRAddr(addr_str, !rtcp ? port : 0, rtcp ? port : 0);
      DBG("Symmetric %s: setting new remote address: %s:%i\n",
          !rtcp ? "RTP" : "RTCP", addr_str.c_str(),port);
    }
    else {
      const char* prot = rtcp ? "RTCP" : "RTP";
      DBG("Symmetric %s: remote end sends %s from advertised address."
          " Leaving passive mode.\n",prot,prot);
    }

    // avoid comparing each time sender address
    if (!rtcp)
      passive = false;
    else
      passive_rtcp = false;
  }
}

/** Enable or disable passive mode for both RTP and RTCP. */
void AmRtpStream::setPassiveMode(bool p)
{
  passive_rtcp = passive = p;
  if (p) {
    DBG("The other UA is NATed or passive mode forced: switched to passive mode.\n");
  }
  else {
    DBG("Passive mode not activated.\n");
  }
}

/**
 * Fill the transport-level fields of an SDP media description from this
 * stream (binds a local port if none is set yet).
 */
void AmRtpStream::getSdp(SdpMedia& m)
{
  m.port = getLocalPort();
  m.nports = 0;
  m.transport = TP_RTPAVP;
  m.send = !hold;
  m.recv = receiving;
  m.dir = SdpMedia::DirBoth;
}

/**
 * Build an SDP offer for media line 'index': transport fields from
 * getSdp() plus all payloads of the payload provider.
 */
void AmRtpStream::getSdpOffer(unsigned int index, SdpMedia& offer)
{
  sdp_media_index = index;
  getSdp(offer);
  offer.payloads.clear();
  payload_provider->getPayloads(offer.payloads);
}

/**
 * Build an SDP answer for media line 'index'; the payload selection is
 * delegated to offer.calcAnswer().
 */
void AmRtpStream::getSdpAnswer(unsigned int index, const SdpMedia& offer, SdpMedia& answer)
{
  sdp_media_index = index;
  getSdp(answer);
  offer.calcAnswer(payload_provider,answer);
}

/**
 * Initialise the stream from a negotiated local/remote SDP pair.
 *
 * Uses the media line previously selected through getSdpOffer() /
 * getSdpAnswer(). Builds the payload table and the local->remote payload
 * type mapping, binds the local port if that has not happened yet,
 * configures passive mode, the remote address, the telephone-event
 * payloads, the receiving and mute state and the default payload.
 *
 * @param local               local SDP
 * @param remote              remote SDP
 * @param force_passive_mode  enter passive mode regardless of the remote
 *                            media direction
 * @return 0 on success, -1 on error (invalid media index, missing remote
 *         connection address, no local payloads, no usable default
 *         payload).
 */
int AmRtpStream::init(const AmSdp& local,
                      const AmSdp& remote,
                      bool force_passive_mode)
{
  if ((sdp_media_index < 0) ||
      ((unsigned)sdp_media_index >= local.media.size()) ||
      ((unsigned)sdp_media_index >= remote.media.size()) ) {

    ERROR("Media index %i is invalid, either within local or remote SDP (or both)",sdp_media_index);
    return -1;
  }

  const SdpMedia& local_media = local.media[sdp_media_index];
  const SdpMedia& remote_media = remote.media[sdp_media_index];

  payloads.clear();
  pl_map.clear();
  payloads.resize(local_media.payloads.size());

  int i=0;
  // NOTE(review): the template argument was lost in the damaged source;
  // SdpPayload is assumed to be the element type of SdpMedia::payloads.
  vector<SdpPayload>::const_iterator sdp_it = local_media.payloads.begin();
  PayloadCollection::iterator p_it = payloads.begin();

  /* first pass on local SDP - fill pl_map with intersection of codecs */
  while (sdp_it != local_media.payloads.end()) {

    int int_pt;

    if (local_media.transport == TP_RTPAVP && sdp_it->payload_type < 20) {
      int_pt = sdp_it->payload_type;
    }
    else {
      int_pt = payload_provider->getDynPayload(sdp_it->encoding_name,
                                               sdp_it->clock_rate,
                                               sdp_it->encoding_param);
    }

    amci_payload_t* a_pl = NULL;
    if (int_pt >= 0)
      a_pl = payload_provider->payload(int_pt);

    if (a_pl == NULL) {
      /* payloads which are to be relayed are skipped silently,
         anything else unknown is skipped with a debug message */
      if (!relay_payloads.get(sdp_it->payload_type)) {
        DBG("No internal payload corresponding to type %s/%i (ignoring)\n",
            sdp_it->encoding_name.c_str(),
            sdp_it->clock_rate);
      }
      /* ignore this payload */
      ++sdp_it;
      continue;
    }

    p_it->pt         = sdp_it->payload_type;
    p_it->name       = sdp_it->encoding_name;
    p_it->codec_id   = a_pl->codec_id;
    p_it->clock_rate = a_pl->sample_rate;
    p_it->advertised_clock_rate = sdp_it->clock_rate;
    p_it->format_parameters = sdp_it->sdp_format_parameters;

    pl_map[sdp_it->payload_type].index     = i;
    pl_map[sdp_it->payload_type].remote_pt = -1;

    ++p_it;
    ++sdp_it;
    ++i;
  }

  /* remove payloads which were not initialised (because of unknown payloads
     which are to be relayed) */
  if (p_it != payloads.end())
    payloads.erase(p_it, payloads.end());

  /* second pass on remote SDP - initialize payload IDs used by remote (remote_pt) */
  sdp_it = remote_media.payloads.begin();
  while (sdp_it != remote_media.payloads.end()) {

    /* TODO: match not only on encoding name
     *       but also on parameters, if necessary
     *       Some codecs define multiple payloads
     *       with different encoding parameters
     */
    PayloadMappingTable::iterator pmt_it = pl_map.end();
    if (sdp_it->encoding_name.empty() ||
        (local_media.transport == TP_RTPAVP && sdp_it->payload_type < 20)) {
      /* must be a static payload */
      pmt_it = pl_map.find(sdp_it->payload_type);
    }
    else {
      for (p_it = payloads.begin(); p_it != payloads.end(); ++p_it) {

        if (!strcasecmp(p_it->name.c_str(),sdp_it->encoding_name.c_str()) &&
            (p_it->advertised_clock_rate == (unsigned int)sdp_it->clock_rate)) {
          pmt_it = pl_map.find(p_it->pt);
          break;
        }
      }
    }

    /* TODO: remove following code once proper
     *       payload matching is implemented
     *
     * initialize remote_pt if not already there
     */
    if (pmt_it != pl_map.end() && (pmt_it->second.remote_pt < 0)) {
      pmt_it->second.remote_pt = sdp_it->payload_type;
    }
    ++sdp_it;
  }

  if (!l_port) {
    /* only if socket not yet bound: */
    if (session) {
      setLocalIP(session->localMediaIP());
    }
    else {
      /* set local address - media c-line having precedence over session c-line */
      if (local_media.conn.address.empty())
        setLocalIP(local.conn.address);
      else
        setLocalIP(local_media.conn.address);
    }

    DBG("setting local port to %i",local_media.port);
    setLocalPort(local_media.port);
  }

  setPassiveMode(remote_media.dir == SdpMedia::DirActive || force_passive_mode);

  /* set remote address - media c-line having precedence over session c-line */
  if (remote.conn.address.empty() && remote_media.conn.address.empty()) {
    WARN("no c= line given globally or in m= section in remote SDP\n");
    return -1;
  }
  if (remote_media.conn.address.empty())
    setRAddr(remote.conn.address, remote_media.port, remote_media.port+1);
  else
    setRAddr(remote_media.conn.address, remote_media.port, remote_media.port+1);

  if (local_media.payloads.empty()) {
    DBG("local_media.payloads.empty()\n");
    return -1;
  }

  remote_telephone_event_pt.reset(remote.telephoneEventPayload());
  if (remote_telephone_event_pt.get()) {
    DBG("remote party supports telephone events (pt=%i)\n",
        remote_telephone_event_pt->payload_type);
  }
  else {
    DBG("remote party doesn't support telephone events\n");
  }

  local_telephone_event_pt.reset(local.telephoneEventPayload());

  if (local_media.recv)
    resume();
  else
    pause();

  /* sending is only possible towards a usable remote address/port */
  if (local_media.send && !hold
      && (remote_media.port != 0)
      && (((r_saddr.ss_family == AF_INET)
           && (SAv4(&r_saddr)->sin_addr.s_addr != 0)) ||
          ((r_saddr.ss_family == AF_INET6)
           && (!IN6_IS_ADDR_UNSPECIFIED(&SAv6(&r_saddr)->sin6_addr))))
      ) {
    mute = false;
  }
  else {
    mute = true;
  }

  payload = getDefaultPT();
  if (payload < 0) {
    DBG("could not set a default payload\n");
    return -1;
  }
  DBG("default payload selected = %i\n",payload);
  last_payload = payload;

  active = false; // mark as nothing received yet
  return 0;
}

/** Set the receiving flag without touching any buffers. */
void AmRtpStream::setReceiving(bool r)
{
  DBG("RTP stream instance [%p] set receiving=%s\n", this, r?"true":"false");
  receiving = r;
}

/** Stop receiving (and stop the ZRTP streams when ZRTP is enabled). */
void AmRtpStream::pause()
{
  DBG("RTP Stream instance [%p] pausing (receiving=false)\n", this);
  receiving = false;

#ifdef WITH_ZRTP
  if (session && session->enable_zrtp) {
    session->zrtp_session_state.stopStreams();
  }
#endif
}

/**
 * Start receiving again: resets the RTP timeout and, under the receive
 * mutex, discards all buffered packets and queued telephone events.
 */
void AmRtpStream::resume()
{
  DBG("RTP Stream instance [%p] resuming (receiving=true, clearing biffers/TS/TO)\n", this);
  clearRTPTimeout();

  receive_mut.lock();
  mem.clear();
  receive_buf.clear();
  while (!rtp_ev_qu.empty())
    rtp_ev_qu.pop();
  receive_mut.unlock();

  receiving = true;

#ifdef WITH_ZRTP
  if (session && session->enable_zrtp) {
    session->zrtp_session_state.startStreams(get_ssrc());
  }
#endif
}

/** Set the on-hold flag. */
void AmRtpStream::setOnHold(bool on_hold)
{
  hold = on_hold;
}

/** Returns the on-hold flag. */
bool AmRtpStream::getOnHold()
{
  return hold;
}

/**
 * If p carries the local telephone-event payload type, post the event to
 * the session as an AmRtpDtmfEvent.
 *
 * Packets whose payload is shorter than a dtmf_payload_t are ignored, so
 * the event fields are never read from beyond the received data.
 */
void AmRtpStream::recvDtmfPacket(AmRtpPacket* p)
{
  if (p->payload != getLocalTelephoneEventPT())
    return;

  if ((size_t)p->getDataSize() < sizeof(dtmf_payload_t)) {
    DBG("DTMF: ignoring too short telephone-event packet\n");
    return;
  }

  dtmf_payload_t* dpl = (dtmf_payload_t*)p->getData();

  DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i; ts=%u session = [%p]\n",
      dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration),p->timestamp, session);

  if (session)
    session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getLocalTelephoneEventRate(), p->timestamp));
}

/**
 * Take ownership of a freshly received packet.
 *
 * Depending on the stream state the packet is dropped (not receiving),
 * relayed to relay_stream, or stored: telephone events go to the event
 * queue, everything else into the receive buffer keyed by timestamp.
 * With ZRTP enabled the packet is run through zrtp_process_srtp() and
 * parsed before it is stored. Packets that are not stored are handed back
 * to the packet memory.
 */
void AmRtpStream::bufferPacket(AmRtpPacket* p)
{
  clearRTPTimeout(&p->recv_time);

  if (!receiving) {

    if (passive) {
      handleSymmetricRtp(&p->addr,false);
    }

    if (force_receive_dtmf) {
      recvDtmfPacket(p);
    }

    mem.freePacket(p);
    return;
  }

  if (relay_enabled) { // todo: ZRTP
    if (force_receive_dtmf) {
      recvDtmfPacket(p);
    }

    // Relay DTMF packets if current audio payload
    // is also relayed.
    // Else, check whether or not we should relay this payload

    bool is_dtmf_packet = (p->payload == getLocalTelephoneEventPT());

    if (relay_raw || (is_dtmf_packet && !active) ||
        relay_payloads.get(p->payload)) {

      if (active) {
        DBG("switching to relay-mode\t(ts=%u;stream=%p)\n",
            p->timestamp,this);
        active = false;
      }
      handleSymmetricRtp(&p->addr,false);

      if (NULL != relay_stream &&
          (!(relay_filter_dtmf && is_dtmf_packet))) {
        relay_stream->relay(p);
      }

      mem.freePacket(p);
      return;
    }
  }

#ifndef WITH_ZRTP
  // throw away ZRTP packets
  if (p->version != RTP_VERSION) {
    mem.freePacket(p);
    return;
  }
#endif

  receive_mut.lock();

#ifdef WITH_ZRTP
  if (session && session->enable_zrtp) {

    if (NULL == session->zrtp_session_state.zrtp_audio) {
      WARN("dropping received packet, as there's no ZRTP stream initialized\n");
      receive_mut.unlock();
      mem.freePacket(p);
      return;
    }

    unsigned int size = p->getBufferSize();
    zrtp_status_t status = zrtp_process_srtp(session->zrtp_session_state.zrtp_audio,
                                             (char*)p->getBuffer(), &size);
    switch (status) {
    case zrtp_status_ok: {
      p->setBufferSize(size);
      if (p->parse() < 0) {
        ERROR("parsing decoded packet!\n");
        mem.freePacket(p);
      }
      else {
        if (p->payload == getLocalTelephoneEventPT()) {
          rtp_ev_qu.push(p);
        }
        else {
          if (!receive_buf.insert(ReceiveBuffer::value_type(p->timestamp,p)).second) {
            // insert failed
            mem.freePacket(p);
          }
        }
      }
    } break;

    case zrtp_status_drop: {
      //
      // This is a protocol ZRTP packet or masked RTP media.
      // In either case the packet must be dropped to protect your
      // media codec
      mem.freePacket(p);
    } break;

    case zrtp_status_fail:
    default: {
      ERROR("zrtp_status_fail!\n");
      //
      // This is some kind of error - see logs for more information
      //
      mem.freePacket(p);
    } break;
    }
  }
  else {
#endif // WITH_ZRTP

    if (p->payload == getLocalTelephoneEventPT()) {
      rtp_ev_qu.push(p);
    }
    else {
      if (!receive_buf.insert(ReceiveBuffer::value_type(p->timestamp,p)).second) {
        // insert failed
        mem.freePacket(p);
      }
    }

#ifdef WITH_ZRTP
  }
#endif

  receive_mut.unlock();
}

/** Restart the RTP timeout using the given reception time. */
void AmRtpStream::clearRTPTimeout(struct timeval* recv_time)
{
  memcpy(&last_recv_time, recv_time, sizeof(struct timeval));
}

/** Restart the RTP timeout using the current time. */
void AmRtpStream::clearRTPTimeout()
{
  gettimeofday(&last_recv_time,NULL);
}

/**
 * Returns the payload type of the first local payload that is not a
 * telephone event and has a remote payload type mapping, or -1 if there
 * is none.
 */
int AmRtpStream::getDefaultPT()
{
  for (PayloadCollection::iterator it = payloads.begin();
       it != payloads.end(); ++it) {

    // skip telephone-events payload
    if (it->codec_id == CODEC_TELEPHONE_EVENT)
      continue;

    // skip incompatible payloads
    PayloadMappingTable::iterator pl_it = pl_map.find(it->pt);
    if ((pl_it == pl_map.end()) || (pl_it->second.remote_pt < 0))
      continue;

    return it->pt;
  }

  return -1;
}

/**
 * Hand out the next received packet.
 *
 * Queued telephone events are returned before buffered media packets;
 * media packets come out in the receive buffer's order.
 *
 * @param p  [out] the packet; the caller has to give it back via
 *           mem.freePacket()
 * @return 1 if p was set, RTP_EMPTY if nothing is available (or the stream
 *         is neither receiving nor passive), RTP_TIMEOUT if timeout
 *         monitoring is on and the last packet is older than
 *         AmConfig::DeadRtpTime seconds.
 */
int AmRtpStream::nextPacket(AmRtpPacket*& p)
{
  if (!receiving && !passive)
    return RTP_EMPTY;

  struct timeval now;
  struct timeval diff;
  gettimeofday(&now,NULL);

  receive_mut.lock();
  timersub(&now,&last_recv_time,&diff);
  if (monitor_rtp_timeout &&
      AmConfig::DeadRtpTime &&
      (diff.tv_sec > 0) &&
      ((unsigned int)diff.tv_sec > AmConfig::DeadRtpTime)) {
    WARN("RTP Timeout detected. Last received packet is too old "
         "(diff.tv_sec = %i\n",(unsigned int)diff.tv_sec);
    receive_mut.unlock();
    return RTP_TIMEOUT;
  }

  if (!rtp_ev_qu.empty()) {
    // first return RTP telephone event payloads
    p = rtp_ev_qu.front();
    rtp_ev_qu.pop();
    receive_mut.unlock();
    return 1;
  }

  if (receive_buf.empty()) {
    receive_mut.unlock();
    return RTP_EMPTY;
  }

  p = receive_buf.begin()->second;
  receive_buf.erase(receive_buf.begin());
  receive_mut.unlock();

  return 1;
}

/**
 * Remove the first packet from the receive buffer so that its storage can
 * be reused for a new packet.
 *
 * @return the removed packet, or NULL if the receive buffer is empty.
 */
AmRtpPacket *AmRtpStream::reuseBufferedPacket()
{
  AmRtpPacket *p = NULL;

  receive_mut.lock();
  if (!receive_buf.empty()) {
    p = receive_buf.begin()->second;
    receive_buf.erase(receive_buf.begin());
  }
  receive_mut.unlock();
  return p;
}

/**
 * Read one packet from the socket fd reported as readable.
 *
 * RTCP traffic is delegated to recvRtcpPacket(). RTP packets are read into
 * a packet from the packet memory (recycling a buffered one when the
 * memory is exhausted), parsed unless raw relay or ZRTP is active, and
 * passed on to bufferPacket().
 */
void AmRtpStream::recvPacket(int fd)
{
  if (fd == l_rtcp_sd) {
    recvRtcpPacket();
    return;
  }

  AmRtpPacket* p = mem.newPacket();
  if (!p) p = reuseBufferedPacket();
  if (!p) {
    DBG("out of buffers for RTP packets, dropping (stream [%p])\n",
        this);
    // drop received data
    AmRtpPacket dummy;
    dummy.recv(l_sd);
    return;
  }

  if (p->recv(l_sd) > 0) {
    int parse_res = 0;

    if (logger) p->logReceived(logger, &l_saddr);

    gettimeofday(&p->recv_time,NULL);

    if (!relay_raw
#ifdef WITH_ZRTP
        && !(session && session->enable_zrtp)
#endif
        ) {
      parse_res = p->parse();
    }

    if (parse_res == -1) {
      DBG("error while parsing RTP packet.\n");
      clearRTPTimeout(&p->recv_time);
      mem.freePacket(p);
    }
    else {
      bufferPacket(p);
    }
  }
  else {
    mem.freePacket(p);
  }
}

/**
 * Read one RTCP packet, apply symmetric-RTCP handling and, if relaying is
 * enabled and the relay stream has a socket, forward the packet unchanged
 * to the relay stream's remote RTCP address.
 */
void AmRtpStream::recvRtcpPacket()
{
  struct sockaddr_storage recv_addr;
  socklen_t recv_addr_len = sizeof(recv_addr);
  unsigned char buffer[4096];

  int recved_bytes = recvfrom(l_rtcp_sd,buffer,sizeof(buffer),0,
                              (struct sockaddr*)&recv_addr,
                              &recv_addr_len);

  if (recved_bytes < 0) {
    if ((errno != EINTR) &&
        (errno != EAGAIN)) {
      ERROR("rtcp recv(%d): %s",l_rtcp_sd,strerror(errno));
    }
    return;
  }
  else if (!recved_bytes) {
    return;
  }

  static const cstring empty;
  if (logger)
    logger->log((const char *)buffer, recved_bytes, &recv_addr, &l_rtcp_saddr, empty);

  // clear RTP timer
  clearRTPTimeout();

  handleSymmetricRtp(&recv_addr,true);

  if (!relay_enabled || !relay_stream ||
      !relay_stream->l_sd)
    return;

  if ((size_t)recved_bytes > sizeof(buffer)) {
    ERROR("recved huge RTCP packet (%d)",recved_bytes);
    return;
  }

  struct sockaddr_storage rtcp_raddr;
  memcpy(&rtcp_raddr,&relay_stream->r_saddr,sizeof(rtcp_raddr));
  am_set_port(&rtcp_raddr, relay_stream->r_rtcp_port);

  int err;
  if (AmConfig::UseRawSockets) {
    err = raw_sender::send((char*)buffer,recved_bytes,
                           AmConfig::RTP_Ifs[l_if].NetIfIdx,
                           &relay_stream->l_saddr,
                           &rtcp_raddr);
  }
  else {
    err = sendto(relay_stream->l_rtcp_sd,buffer,recved_bytes,0,
                 (const struct sockaddr *)&rtcp_raddr,
                 SA_len(&rtcp_raddr));
  }

  if (err < 0) {
    ERROR("could not relay RTCP packet: %s\n",strerror(errno));
    return;
  }

  if (logger)
    logger->log((const char *)buffer, recved_bytes, &relay_stream->l_rtcp_saddr, &rtcp_raddr, empty);
}

/**
 * Send a packet received on another stream out through this stream.
 *
 * Nothing is sent while this stream has no local port, is muted or on
 * hold, or when the session's onBeforeRTPRelay() hook vetoes it. Unless
 * raw relay is active, sequence number and SSRC are rewritten when the
 * respective "transparent" option is off.
 */
void AmRtpStream::relay(AmRtpPacket* p)
{
  // not yet initialized
  // or muted/on-hold
  if (!l_port || mute || hold)
    return;

  if (session && !session->onBeforeRTPRelay(p,&r_saddr))
    return;

  rtp_hdr_t* hdr = (rtp_hdr_t*)p->getBuffer();
  if (!relay_raw && !relay_transparent_seqno)
    hdr->seq = htons(sequence++);
  if (!relay_raw && !relay_transparent_ssrc)
    hdr->ssrc = htonl(l_ssrc);
  p->setAddr(&r_saddr);

  if (p->send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr) < 0) {
    ERROR("while sending RTP packet to '%s':%i\n",
          get_addr_str(&r_saddr).c_str(),am_get_port(&r_saddr));
  }
  else {
    if (logger) p->logSent(logger, &l_saddr);
    if (session) session->onAfterRTPRelay(p,&r_saddr);
  }
}

/** Clock rate of the local telephone-event payload, or 0 if there is none. */
int AmRtpStream::getLocalTelephoneEventRate()
{
  if (local_telephone_event_pt.get())
    return local_telephone_event_pt->clock_rate;
  return 0;
}

/** Payload type of the local telephone-event payload, or -1 if there is none. */
int AmRtpStream::getLocalTelephoneEventPT()
{
  if (local_telephone_event_pt.get())
    return local_telephone_event_pt->payload_type;
  return -1;
}

/** Replace the payload provider used for SDP offers/answers and init(). */
void AmRtpStream::setPayloadProvider(AmPayloadProvider* pl_prov)
{
  payload_provider = pl_prov;
}

/** Queue a DTMF event of the given duration (in ms) for sending. */
void AmRtpStream::sendDtmf(int event, unsigned int duration_ms)
{
  dtmf_sender.queueEvent(event,duration_ms,getLocalTelephoneEventRate());
}

/** Set the stream that relayed packets are forwarded to. */
void AmRtpStream::setRelayStream(AmRtpStream* stream)
{
  relay_stream = stream;
  DBG("set relay stream [%p] for RTP instance [%p]\n",
      stream, this);
}

/** Set the mask of payload types that are relayed instead of processed. */
void AmRtpStream::setRelayPayloads(const PayloadMask &_relay_payloads)
{
  relay_payloads = _relay_payloads;
}

/** Turn RTP relaying on. */
void AmRtpStream::enableRtpRelay()
{
  DBG("enabled RTP relay for RTP stream instance [%p]\n", this);
  relay_enabled = true;
}

/** Turn RTP relaying off. */
void AmRtpStream::disableRtpRelay()
{
  DBG("disabled RTP relay for RTP stream instance [%p]\n", this);
  relay_enabled = false;
}

/** Turn raw relaying (packets forwarded without parsing) on. */
void AmRtpStream::enableRawRelay()
{
  DBG("enabled RAW relay for RTP stream instance [%p]\n", this);
  relay_raw = true;
}

/** Turn raw relaying off. */
void AmRtpStream::disableRawRelay()
{
  DBG("disabled RAW relay for RTP stream instance [%p]\n", this);
  relay_raw = false;
}

/** Choose whether relayed packets keep their original sequence number. */
void AmRtpStream::setRtpRelayTransparentSeqno(bool transparent)
{
  DBG("%sabled RTP relay transparent seqno for RTP stream instance [%p]\n",
      transparent ? "en":"dis", this);
  relay_transparent_seqno = transparent;
}

/** Choose whether relayed packets keep their original SSRC. */
void AmRtpStream::setRtpRelayTransparentSSRC(bool transparent)
{
  DBG("%sabled RTP relay transparent SSRC for RTP stream instance [%p]\n",
      transparent ? "en":"dis", this);
  relay_transparent_ssrc = transparent;
}

/** Choose whether telephone-event packets are excluded from relaying. */
void AmRtpStream::setRtpRelayFilterRtpDtmf(bool filter)
{
  DBG("%sabled RTP relay filtering of RTP DTMF (2833 / 4733) for RTP stream instance [%p]\n",
      filter ? "en":"dis", this);
  relay_filter_dtmf = filter;
}

/** Unregister the stream's sockets from the RTP receiver. */
void AmRtpStream::stopReceiving()
{
  if (hasLocalSocket()) {
    DBG("remove stream [%p] from RTP receiver\n", this);
    AmRtpReceiver::instance()->removeStream(getLocalSocket());
    if (l_rtcp_sd > 0) AmRtpReceiver::instance()->removeStream(l_rtcp_sd);
  }
}

/** Register the stream's sockets with the RTP receiver (again). */
void AmRtpStream::resumeReceiving()
{
  if (hasLocalSocket()) {
    DBG("add/resume stream [%p] into RTP receiver\n",this);
    AmRtpReceiver::instance()->addStream(getLocalSocket(), this);
    if (l_rtcp_sd > 0) AmRtpReceiver::instance()->addStream(l_rtcp_sd, this);
  }
}

/**
 * Returns the encoding name of the local payload with the given payload
 * type, or an empty string if it is not in the payload table.
 */
string AmRtpStream::getPayloadName(int payload_type)
{
  for (PayloadCollection::iterator it = payloads.begin();
       it != payloads.end(); ++it) {

    if (it->pt == payload_type)
      return it->name;
  }

  return string("");
}

/** Creates an empty packet pool. */
PacketMem::PacketMem()
  : cur_idx(0), n_used(0)
{
  memset(used, 0, sizeof(used));
}

/**
 * Reserve a packet from the pool.
 *
 * @return pointer to a free packet, or NULL if all MAX_PACKETS are in use.
 */
inline AmRtpPacket* PacketMem::newPacket()
{
  if (n_used >= MAX_PACKETS)
    return NULL; // full

  // search the next free slot; the index wraps around through the mask
  // (presumably MAX_PACKETS - 1 with MAX_PACKETS a power of two - confirm)
  while (used[cur_idx])
    cur_idx = (cur_idx + 1) & MAX_PACKETS_MASK;

  used[cur_idx] = true;
  n_used++;

  AmRtpPacket* p = &packets[cur_idx];
  cur_idx = (cur_idx + 1) & MAX_PACKETS_MASK;

  return p;
}

/**
 * Give a packet back to the pool. NULL is ignored; freeing a packet that
 * is not marked as used is reported and otherwise ignored.
 */
inline void PacketMem::freePacket(AmRtpPacket* p)
{
  if (!p) return;

  int idx = p-packets;
  assert(idx >= 0);
  assert(idx < MAX_PACKETS);

  if (!used[idx]) {
    ERROR("freePacket() double free: n_used = %d, idx = %d",n_used,idx);
    return;
  }

  used[idx] = false;
  n_used--;
}

/** Mark every packet of the pool as free. */
inline void PacketMem::clear()
{
  memset(used, 0, sizeof(used));
  n_used = cur_idx = 0;
}

/**
 * Replace the message logger; the reference on the old logger is dropped
 * and one is taken on the new logger (which may be NULL).
 */
void AmRtpStream::setLogger(msg_logger* _logger)
{
  if (logger) dec_ref(logger);
  logger = _logger;
  if (logger) inc_ref(logger);
}

/**
 * Write the stream's addresses, relay target and mute/hold/receiving
 * state to the debug log.
 */
void AmRtpStream::debug()
{
#define BOOL_STR(b) ((b) ? "yes" : "no")

  if (hasLocalSocket() > 0) {
    DBG("\t<%i> <-> <%s:%i>", getLocalPort(),
        getRHost().c_str(), getRPort());
  }
  else {
    // print the remote port next to the remote host; getLocalPort() here
    // would show the wrong port and bind a socket as a side effect
    DBG("\t <-> <%s:%i>",
        getRHost().c_str(), getRPort());
  }

  if (relay_stream) {
    DBG("\tinternal relay to stream %p (local port %i)",
        relay_stream, relay_stream->getLocalPort());
  }
  else {
    DBG("\tno relay");
  }

  DBG("\tmute: %s, hold: %s, receiving: %s",
      BOOL_STR(mute), BOOL_STR(hold), BOOL_STR(receiving));

#undef BOOL_STR
}