From 715acd65b3dda52ef0efd2eb910507c02e018f27 Mon Sep 17 00:00:00 2001 From: Lyubomir Marinov Date: Fri, 7 May 2010 15:33:40 +0000 Subject: [PATCH] Allows controlling the number of RTP packets sent per millisecond in the video stream of a call. --- .../impl/neomedia/MediaStreamImpl.java | 110 ++++-- .../neomedia/RTPConnectorOutputStream.java | 345 +++++++++++++++++- .../impl/neomedia/VideoMediaStreamImpl.java | 21 ++ .../codec/video/h264/DePacketizer.java | 22 +- 4 files changed, 435 insertions(+), 63 deletions(-) diff --git a/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java b/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java index 0b4451415..67a81a13a 100644 --- a/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java +++ b/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java @@ -20,6 +20,7 @@ import com.sun.media.rtp.*; +import net.java.sip.communicator.impl.neomedia.*; import net.java.sip.communicator.impl.neomedia.device.*; import net.java.sip.communicator.impl.neomedia.format.*; import net.java.sip.communicator.impl.neomedia.transform.*; @@ -172,11 +173,6 @@ else if (MediaDeviceSession */ private boolean mute = false; - /** - * The current ZrtpControl - */ - private ZrtpControlImpl zrtpControl = null; - /** * The map of currently active RTPExtensions and the IDs that they * have been assigned for the lifetime of this MediaStream. @@ -196,6 +192,11 @@ else if (MediaDeviceSession protected Map advancedAttributes = new Hashtable(); + /** + * The current ZrtpControl. + */ + private final ZrtpControlImpl zrtpControl; + /** * Needed when restarting zrtp control. */ @@ -214,30 +215,53 @@ else if (MediaDeviceSession * @param zrtpControl a control which is already created, used to control * the zrtp operations. */ - public MediaStreamImpl(StreamConnector connector, MediaDevice device, - ZrtpControlImpl zrtpControl) + public MediaStreamImpl( + StreamConnector connector, + MediaDevice device, + ZrtpControlImpl zrtpControl) { /* - * XXX Set the device early in order to make sure that its of the right - * type because we do not support just about any MediaDevice yet. + * XXX Set the device early in order to make sure that it is of the + * right type because we do not support just about any MediaDevice yet. */ setDevice(device); - this.rtpConnector = new RTPTransformConnector(connector); + rtpConnector + = new RTPTransformConnector(connector) + { + @Override + protected TransformOutputStream createDataOutputStream() + throws IOException + { + TransformOutputStream dataOutputStream + = super.createDataOutputStream(); - if(zrtpControl != null) - { - this.zrtpControl = zrtpControl; - } - else - this.zrtpControl = new ZrtpControlImpl(); + if (dataOutputStream != null) + configureDataOutputStream(dataOutputStream); + return dataOutputStream; + } + }; + this.zrtpControl + = (zrtpControl == null) ? new ZrtpControlImpl() : zrtpControl; this.zrtpControl.setConnector(rtpConnector); - //register the transform engines that we will be using in this stream. - TransformEngineChain engineChain = createTransformEngineChain(); + // Register the transform engines that we will be using in this stream. + rtpConnector.setEngine(createTransformEngineChain()); + } - rtpConnector.setEngine(engineChain); + /** + * Performs any optional configuration on a specific + * RTPConnectorOuputStream of an RTPManager to be used by + * this MediaStreamImpl. Allows extenders to override. + * + * @param dataOutputStream the RTPConnectorOutputStream to be used + * by an RTPManager of this MediaStreamImpl and to be + * configured + */ + protected void configureDataOutputStream( + RTPConnectorOutputStream dataOutputStream) + { } /** @@ -269,25 +293,27 @@ protected void configureRTPManagerBufferControl( private TransformEngineChain createTransformEngineChain() { ArrayList engineChain - = new ArrayList(3); + = new ArrayList(3); - //CSRCs and audio levels - if(csrcEngine == null) + // CSRCs and audio levels + if (csrcEngine == null) csrcEngine = new CsrcTransformEngine(this); engineChain.add(csrcEngine); - //DTMF + // DTMF DtmfTransformEngine dtmfEngine = createDtmfTransformEngine(); - if(dtmfEngine != null) + if (dtmfEngine != null) engineChain.add(dtmfEngine); - //ZRTP + // ZRTP engineChain.add(zrtpControl.getZrtpEngine()); - return new TransformEngineChain( engineChain.toArray( - new TransformEngine[engineChain.size()])); + return + new TransformEngineChain( + engineChain.toArray( + new TransformEngine[engineChain.size()])); } /** @@ -896,15 +922,21 @@ public ZrtpControl getZrtpControl() */ private void restartZrtpControl() { - // if there is no current secure communication we don't need to do that + /* + * If there is no current secure communication, we don't need to do + * that. + */ if(!zrtpControl.getSecureCommunicationStatus()) return; zrtpControl.cleanup(); - // as we are recreating this stream and it was obviously secured - // it may happen we receive unencrepted data and we will hear - // noise, so we mute it till secure connection is again established + /* + * As we are recreating this stream and it was obviously secured, it may + * happen so that we receive unencrepted data. Which will produce noise + * for us to hear. So we mute it till a secure connection is again + * established. + */ zrtpControl.getZrtpEngine().setStartMuted(true); this.zrtpControl.setConnector(rtpConnector); @@ -1657,16 +1689,16 @@ else if (event instanceof TimeoutEvent) { ReceiveStream receiveStream = event.getReceiveStream(); - // if we recreate streams we will already have restarted - // zrtp control - // but when on the other end some one has recreated his streams - // we will received a ByeEvent(TimeoutEvent) and so we must also - // restart our zrtp, this happens when we are already in a call - // and the other party starts an conf call + /* + * If we recreate streams, we will already have restarted + * zrtpControl. But when on the other end someone recreates his + * streams, we will receive a ByeEvent (which extends TimeoutEvent) + * and then we must also restart our ZRTP. This happens, for + * example, when we are already in a call and the remote peer + * converts his side of the call into a conference call. + */ if(!zrtpRestarted) - { restartZrtpControl(); - } if (receiveStream != null) { diff --git a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java index 84fd2251d..9e866ea85 100755 --- a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java +++ b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java @@ -13,12 +13,33 @@ import javax.media.rtp.*; /** + * * @author Bing SU (nova.su@gmail.com) * @author Lubomir Marinov */ public class RTPConnectorOutputStream implements OutputDataStream { + + /** + * The maximum number of packets to be sent to be kept in the queue of + * MaxPacketsPerMillisPolicy. When the maximum is reached, the next + * attempt to write a new packet in the queue will block until at least one + * packet from the queue is sent. Defined in order to prevent + * OutOfMemoryErrors which, technically, may arise if the capacity + * of the queue is unlimited. + */ + private static final int + MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY + = 256; + + /** + * The functionality which allows this OutputDataStream to control + * how many RTP packets it sends through its DatagramSocket per a + * specific number of milliseconds. + */ + private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy; + /** * UDP socket used to send packet data */ @@ -77,8 +98,8 @@ protected RawPacket createRawPacket(byte[] buffer, int offset, int length) * * @param remoteAddr target ip address * @param remotePort target port - * @return true if the target is in stream target list and can be removed - * false if not + * @return true if the target is in stream target list and can be + * removed; false, otherwise */ public boolean removeTarget(InetAddress remoteAddr, int remotePort) { @@ -105,37 +126,321 @@ public void removeTargets() targets.clear(); } - /* - * Implements OutputDataStream#write(byte[], int, int). + /** + * Sends a specific RTP packet through the DatagramSocket of this + * OutputDataSource. + * + * @param packet the RTP packet to be sent through the + * DatagramSocket of this OutputDataSource + * @return true if the specified packet was successfully + * sent; otherwise, false */ - public int write(byte[] buffer, int offset, int length) + private boolean send(RawPacket packet) { - RawPacket pkt = createRawPacket(buffer, offset, length); - - /* - * If we got extended, the delivery of the packet may have been - * canceled. - */ - if (pkt == null) - return length; - for (InetSocketAddress target : targets) + { try { - socket - .send( + socket.send( new DatagramPacket( - pkt.getBuffer(), - pkt.getOffset(), - pkt.getLength(), + packet.getBuffer(), + packet.getOffset(), + packet.getLength(), target.getAddress(), target.getPort())); } catch (IOException ex) { // TODO error handling - return -1; + return false; } + } + return true; + } + + /** + * Sets the maximum number of RTP packets to be sent by this + * OutputDataStream through its DatagramSocket per + * a specific number of milliseconds. + * + * @param maxPackets the maximum number of RTP packets to be sent by this + * OutputDataStream through its DatagramSocket per the + * specified number of milliseconds; -1 if no maximum is to be set + * @param perMillis the number of milliseconds per which maxPackets + * are to be sent by this OutputDataStream through its + * DatagramSocket + */ + public void setMaxPacketsPerMillis(int maxPackets, long perMillis) + { + if (maxPacketsPerMillisPolicy == null) + { + if (maxPackets > 0) + { + if (perMillis < 1) + throw new IllegalArgumentException("perMillis"); + + maxPacketsPerMillisPolicy + = new MaxPacketsPerMillisPolicy(maxPackets, perMillis); + } + } + else + { + maxPacketsPerMillisPolicy + .setMaxPacketsPerMillis(maxPackets, perMillis); + } + } + + /** + * Implements {@link OutputDataStream#write(byte[], int, int)}. + * + * @param buffer + * @param offset + * @param length + * @return + */ + public int write(byte[] buffer, int offset, int length) + { + if (maxPacketsPerMillisPolicy != null) + { + byte[] newBuffer = new byte[length]; + + System.arraycopy(buffer, offset, newBuffer, 0, length); + buffer = newBuffer; + offset = 0; + } + + RawPacket packet = createRawPacket(buffer, offset, length); + + /* + * If we got extended, the delivery of the packet may have been + * canceled. + */ + if (packet != null) + { + if (maxPacketsPerMillisPolicy == null) + { + if (!send(packet)) + return -1; + } + else + maxPacketsPerMillisPolicy.write(packet); + } return length; } + + /** + * Implements the functionality which allows this OutputDataStream + * to control how many RTP packets it sends through its + * DatagramSocket per a specific number of milliseconds. + */ + private class MaxPacketsPerMillisPolicy + { + + /** + * The maximum number of RTP packets to be sent by this + * OutputDataStream through its DatagramSocket per + * {@link #perMillis} milliseconds. + */ + private int maxPackets = -1; + + /** + * The time stamp in nanoseconds of the start of the current + * perNanos interval. + */ + private long millisStartTime = 0; + + /** + * The list of RTP packets to be sent through the + * DatagramSocket of this OutputDataSource. + */ + private final List packetQueue + = new LinkedList(); + + /** + * The number of RTP packets already sent during the current + * perNanos interval. + */ + private long packetsSentInMillis = 0; + + /** + * The time interval in nanoseconds during which {@link #maxPackets} + * number of RTP packets are to be sent through the + * DatagramSocket of this OutputDataSource. + */ + private long perNanos = -1; + + /** + * The Thread which is to send the RTP packets in + * {@link #packetQueue} through the DatagramSocket of this + * OutputDataSource. + */ + private Thread sendThread; + + /** + * Initializes a new MaxPacketsPerMillisPolicy instance which + * is to control how many RTP packets this OutputDataSource is + * to send through its DatagramSocket per a specific number of + * milliseconds. + * + * @param maxPackets the maximum number of RTP packets to be sent per + * perMillis milliseconds through the DatagramSocket + * of this OutputDataStream + * @param perMillis the number of milliseconds per which a maximum of + * maxPackets RTP packets are to be sent through the + * DatagramSocket of this OutputDataStream + */ + public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis) + { + setMaxPacketsPerMillis(maxPackets, perMillis); + } + + /** + * Sends the RTP packets in {@link #packetQueue} in accord with + * {@link #maxPackets} and {@link #perMillis}. + */ + private void runInSendThread() + { + try + { + while (true) + { + RawPacket packet; + + synchronized (packetQueue) + { + while (packetQueue.size() < 1) + { + try + { + packetQueue.wait(); + } + catch (InterruptedException iex) + { + } + } + + packet = packetQueue.remove(0); + packetQueue.notifyAll(); + } + + long time = System.nanoTime(); + long millisRemainingTime = time - millisStartTime; + + if ((perNanos < 1) + || (millisRemainingTime >= perNanos)) + { + millisStartTime = time; + packetsSentInMillis = 0; + } + else if ((maxPackets > 0) + && (packetsSentInMillis >= maxPackets)) + { + while (true) + { + millisRemainingTime + = System.nanoTime() - millisStartTime; + if (millisRemainingTime >= perNanos) + break; + try + { + Thread.sleep( + millisRemainingTime / 1000000, + (int) (millisRemainingTime % 1000000)); + } + catch (InterruptedException iex) + { + } + } + millisStartTime = System.nanoTime(); + packetsSentInMillis = 0; + } + + send(packet); + packetsSentInMillis++; + } + } + finally + { + synchronized (packetQueue) + { + if (Thread.currentThread().equals(sendThread)) + sendThread = null; + } + } + } + + /** + * Sets the maximum number of RTP packets to be sent by this + * OutputDataStream through its DatagramSocket per + * a specific number of milliseconds. + * + * @param maxPackets the maximum number of RTP packets to be sent by + * this OutputDataStream through its DatagramSocket + * per the specified number of milliseconds; -1 if no maximum + * is to be set + * @param perMillis the number of milliseconds per which + * maxPackets are to be sent by this OutputDataStream + * through its DatagramSocket + */ + public void setMaxPacketsPerMillis(int maxPackets, long perMillis) + { + if (maxPackets < 1) + { + this.maxPackets = -1; + this.perNanos = -1; + } + else + { + if (perMillis < 1) + throw new IllegalArgumentException("perMillis"); + + this.maxPackets = maxPackets; + this.perNanos = perMillis * 1000000; + } + } + + /** + * Queues a specific RTP packet to be sent through the + * DatagramSocket of this OutputDataStream. + * + * @param packet the RTP packet to be queued for sending through the + * DatagramSocket of this OutputDataStream + */ + public void write(RawPacket packet) + { + synchronized (packetQueue) + { + while (packetQueue.size() + >= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY) + { + try + { + packetQueue.wait(); + } + catch (InterruptedException iex) + { + } + } + + packetQueue.add(packet); + + if (sendThread == null) + { + sendThread + = new Thread(getClass().getName()) + { + @Override + public void run() + { + runInSendThread(); + } + }; + sendThread.setDaemon(true); + sendThread.start(); + } + + packetQueue.notifyAll(); + } + } + } } diff --git a/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java b/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java index 755fcac1b..8c0e9471c 100644 --- a/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java +++ b/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java @@ -16,6 +16,7 @@ import javax.media.protocol.*; import javax.media.rtp.*; +import net.java.sip.communicator.impl.neomedia.*; import net.java.sip.communicator.impl.neomedia.codec.*; import net.java.sip.communicator.impl.neomedia.device.*; import net.java.sip.communicator.service.neomedia.*; @@ -268,6 +269,24 @@ public void addVideoListener(VideoListener listener) videoNotifierSupport.addVideoListener(listener); } + /** + * Performs any optional configuration on a specific + * RTPConnectorOuputStream of an RTPManager to be used by + * this MediaStreamImpl. + * + * @param dataOutputStream the RTPConnectorOutputStream to be used + * by an RTPManager of this MediaStreamImpl and to be + * configured + */ + @Override + protected void configureDataOutputStream( + RTPConnectorOutputStream dataOutputStream) + { + super.configureDataOutputStream(dataOutputStream); + + dataOutputStream.setMaxPacketsPerMillis(1, 10); + } + /** * Performs any optional configuration on the BufferControl of the * specified RTPManager which is to be used as the @@ -283,6 +302,8 @@ protected void configureRTPManagerBufferControl( RTPManager rtpManager, BufferControl bufferControl) { + super.configureRTPManagerBufferControl(rtpManager, bufferControl); + bufferControl.setBufferLength(BufferControl.MAX_VALUE); } diff --git a/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java b/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java index 0ae4e3127..a380732d5 100644 --- a/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java +++ b/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java @@ -46,6 +46,18 @@ public class DePacketizer */ private static final byte[] NAL_PREFIX = { 0, 0, 1 }; + /** + * The indicator which determines whether incomplete NAL units are output + * from the H.264 DePacketizer to the decoder. It is advisable to + * output incomplete NAL units because the FFmpeg H.264 decoder is able to + * decode them. If false, incomplete NAL units will be discarded + * and, consequently, the video quality will be worse (e.g. if the last RTP + * packet of a fragmented NAL unit carrying a keyframe does not arrive from + * the network, the whole keyframe will be discarded and thus all NAL units + * upto the next keyframe will be useless). + */ + private static final boolean OUTPUT_INCOMPLETE_NAL_UNITS = true; + /** * Interval between a PLI request and its reemission (in milliseconds). */ @@ -99,8 +111,8 @@ public class DePacketizer private long remoteSSRC = -1; /** - * Use or not RTCP PLI message when depacketizer miss - * packets. + * The indicator which determines whether RTCP PLI is to be used when this + * DePacketizer detects that video data has been lost. */ private boolean usePLI = false; @@ -475,7 +487,8 @@ private int reset(Buffer outBuffer) * are only given meaning for the purposes of the network and not the * H.264 decoder. */ - if (fuaStartedAndNotEnded + if (OUTPUT_INCOMPLETE_NAL_UNITS + && fuaStartedAndNotEnded && (outBuffer.getLength() >= (NAL_PREFIX.length + 1 + 1))) { Object outData = outBuffer.getData(); @@ -549,8 +562,9 @@ public void setRtcpFeedbackPLI(boolean use) private class PLISendThread extends Thread { /** - * Entry point of the thread. + * Represents the entry point of PLISendThread. */ + @Override public void run() { while(isPLIThreadRunning)