diff --git a/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java b/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java
index 0b4451415..67a81a13a 100644
--- a/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java
+++ b/src/net/java/sip/communicator/impl/neomedia/MediaStreamImpl.java
@@ -20,6 +20,7 @@
import com.sun.media.rtp.*;
+import net.java.sip.communicator.impl.neomedia.*;
import net.java.sip.communicator.impl.neomedia.device.*;
import net.java.sip.communicator.impl.neomedia.format.*;
import net.java.sip.communicator.impl.neomedia.transform.*;
@@ -172,11 +173,6 @@ else if (MediaDeviceSession
*/
private boolean mute = false;
- /**
- * The current ZrtpControl
- */
- private ZrtpControlImpl zrtpControl = null;
-
/**
* The map of currently active RTPExtensions and the IDs that they
* have been assigned for the lifetime of this MediaStream.
@@ -196,6 +192,11 @@ else if (MediaDeviceSession
protected Map advancedAttributes =
new Hashtable();
+ /**
+ * The current ZrtpControl.
+ */
+ private final ZrtpControlImpl zrtpControl;
+
/**
* Needed when restarting zrtp control.
*/
@@ -214,30 +215,53 @@ else if (MediaDeviceSession
* @param zrtpControl a control which is already created, used to control
* the zrtp operations.
*/
- public MediaStreamImpl(StreamConnector connector, MediaDevice device,
- ZrtpControlImpl zrtpControl)
+ public MediaStreamImpl(
+ StreamConnector connector,
+ MediaDevice device,
+ ZrtpControlImpl zrtpControl)
{
/*
- * XXX Set the device early in order to make sure that its of the right
- * type because we do not support just about any MediaDevice yet.
+ * XXX Set the device early in order to make sure that it is of the
+ * right type because we do not support just about any MediaDevice yet.
*/
setDevice(device);
- this.rtpConnector = new RTPTransformConnector(connector);
+ rtpConnector
+ = new RTPTransformConnector(connector)
+ {
+ @Override
+ protected TransformOutputStream createDataOutputStream()
+ throws IOException
+ {
+ TransformOutputStream dataOutputStream
+ = super.createDataOutputStream();
- if(zrtpControl != null)
- {
- this.zrtpControl = zrtpControl;
- }
- else
- this.zrtpControl = new ZrtpControlImpl();
+ if (dataOutputStream != null)
+ configureDataOutputStream(dataOutputStream);
+ return dataOutputStream;
+ }
+ };
+ this.zrtpControl
+ = (zrtpControl == null) ? new ZrtpControlImpl() : zrtpControl;
this.zrtpControl.setConnector(rtpConnector);
- //register the transform engines that we will be using in this stream.
- TransformEngineChain engineChain = createTransformEngineChain();
+ // Register the transform engines that we will be using in this stream.
+ rtpConnector.setEngine(createTransformEngineChain());
+ }
- rtpConnector.setEngine(engineChain);
+ /**
+ * Performs any optional configuration on a specific
+ * RTPConnectorOuputStream of an RTPManager to be used by
+ * this MediaStreamImpl. Allows extenders to override.
+ *
+ * @param dataOutputStream the RTPConnectorOutputStream to be used
+ * by an RTPManager of this MediaStreamImpl and to be
+ * configured
+ */
+ protected void configureDataOutputStream(
+ RTPConnectorOutputStream dataOutputStream)
+ {
}
/**
@@ -269,25 +293,27 @@ protected void configureRTPManagerBufferControl(
private TransformEngineChain createTransformEngineChain()
{
ArrayList engineChain
- = new ArrayList(3);
+ = new ArrayList(3);
- //CSRCs and audio levels
- if(csrcEngine == null)
+ // CSRCs and audio levels
+ if (csrcEngine == null)
csrcEngine = new CsrcTransformEngine(this);
engineChain.add(csrcEngine);
- //DTMF
+ // DTMF
DtmfTransformEngine dtmfEngine = createDtmfTransformEngine();
- if(dtmfEngine != null)
+ if (dtmfEngine != null)
engineChain.add(dtmfEngine);
- //ZRTP
+ // ZRTP
engineChain.add(zrtpControl.getZrtpEngine());
- return new TransformEngineChain( engineChain.toArray(
- new TransformEngine[engineChain.size()]));
+ return
+ new TransformEngineChain(
+ engineChain.toArray(
+ new TransformEngine[engineChain.size()]));
}
/**
@@ -896,15 +922,21 @@ public ZrtpControl getZrtpControl()
*/
private void restartZrtpControl()
{
- // if there is no current secure communication we don't need to do that
+ /*
+ * If there is no current secure communication, we don't need to do
+ * that.
+ */
if(!zrtpControl.getSecureCommunicationStatus())
return;
zrtpControl.cleanup();
- // as we are recreating this stream and it was obviously secured
- // it may happen we receive unencrepted data and we will hear
- // noise, so we mute it till secure connection is again established
+ /*
+ * As we are recreating this stream and it was obviously secured, it may
+ * happen so that we receive unencrepted data. Which will produce noise
+ * for us to hear. So we mute it till a secure connection is again
+ * established.
+ */
zrtpControl.getZrtpEngine().setStartMuted(true);
this.zrtpControl.setConnector(rtpConnector);
@@ -1657,16 +1689,16 @@ else if (event instanceof TimeoutEvent)
{
ReceiveStream receiveStream = event.getReceiveStream();
- // if we recreate streams we will already have restarted
- // zrtp control
- // but when on the other end some one has recreated his streams
- // we will received a ByeEvent(TimeoutEvent) and so we must also
- // restart our zrtp, this happens when we are already in a call
- // and the other party starts an conf call
+ /*
+ * If we recreate streams, we will already have restarted
+ * zrtpControl. But when on the other end someone recreates his
+ * streams, we will receive a ByeEvent (which extends TimeoutEvent)
+ * and then we must also restart our ZRTP. This happens, for
+ * example, when we are already in a call and the remote peer
+ * converts his side of the call into a conference call.
+ */
if(!zrtpRestarted)
- {
restartZrtpControl();
- }
if (receiveStream != null)
{
diff --git a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
index 84fd2251d..9e866ea85 100755
--- a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
+++ b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
@@ -13,12 +13,33 @@
import javax.media.rtp.*;
/**
+ *
* @author Bing SU (nova.su@gmail.com)
* @author Lubomir Marinov
*/
public class RTPConnectorOutputStream
implements OutputDataStream
{
+
+ /**
+ * The maximum number of packets to be sent to be kept in the queue of
+ * MaxPacketsPerMillisPolicy. When the maximum is reached, the next
+ * attempt to write a new packet in the queue will block until at least one
+ * packet from the queue is sent. Defined in order to prevent
+ * OutOfMemoryErrors which, technically, may arise if the capacity
+ * of the queue is unlimited.
+ */
+ private static final int
+ MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY
+ = 256;
+
+ /**
+ * The functionality which allows this OutputDataStream to control
+ * how many RTP packets it sends through its DatagramSocket per a
+ * specific number of milliseconds.
+ */
+ private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy;
+
/**
* UDP socket used to send packet data
*/
@@ -77,8 +98,8 @@ protected RawPacket createRawPacket(byte[] buffer, int offset, int length)
*
* @param remoteAddr target ip address
* @param remotePort target port
- * @return true if the target is in stream target list and can be removed
- * false if not
+ * @return true if the target is in stream target list and can be
+ * removed; false, otherwise
*/
public boolean removeTarget(InetAddress remoteAddr, int remotePort)
{
@@ -105,37 +126,321 @@ public void removeTargets()
targets.clear();
}
- /*
- * Implements OutputDataStream#write(byte[], int, int).
+ /**
+ * Sends a specific RTP packet through the DatagramSocket of this
+ * OutputDataSource.
+ *
+ * @param packet the RTP packet to be sent through the
+ * DatagramSocket of this OutputDataSource
+ * @return true if the specified packet was successfully
+ * sent; otherwise, false
*/
- public int write(byte[] buffer, int offset, int length)
+ private boolean send(RawPacket packet)
{
- RawPacket pkt = createRawPacket(buffer, offset, length);
-
- /*
- * If we got extended, the delivery of the packet may have been
- * canceled.
- */
- if (pkt == null)
- return length;
-
for (InetSocketAddress target : targets)
+ {
try
{
- socket
- .send(
+ socket.send(
new DatagramPacket(
- pkt.getBuffer(),
- pkt.getOffset(),
- pkt.getLength(),
+ packet.getBuffer(),
+ packet.getOffset(),
+ packet.getLength(),
target.getAddress(),
target.getPort()));
}
catch (IOException ex)
{
// TODO error handling
- return -1;
+ return false;
}
+ }
+ return true;
+ }
+
+ /**
+ * Sets the maximum number of RTP packets to be sent by this
+ * OutputDataStream through its DatagramSocket per
+ * a specific number of milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent by this
+ * OutputDataStream through its DatagramSocket per the
+ * specified number of milliseconds; -1 if no maximum is to be set
+ * @param perMillis the number of milliseconds per which maxPackets
+ * are to be sent by this OutputDataStream through its
+ * DatagramSocket
+ */
+ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
+ {
+ if (maxPacketsPerMillisPolicy == null)
+ {
+ if (maxPackets > 0)
+ {
+ if (perMillis < 1)
+ throw new IllegalArgumentException("perMillis");
+
+ maxPacketsPerMillisPolicy
+ = new MaxPacketsPerMillisPolicy(maxPackets, perMillis);
+ }
+ }
+ else
+ {
+ maxPacketsPerMillisPolicy
+ .setMaxPacketsPerMillis(maxPackets, perMillis);
+ }
+ }
+
+ /**
+ * Implements {@link OutputDataStream#write(byte[], int, int)}.
+ *
+ * @param buffer
+ * @param offset
+ * @param length
+ * @return
+ */
+ public int write(byte[] buffer, int offset, int length)
+ {
+ if (maxPacketsPerMillisPolicy != null)
+ {
+ byte[] newBuffer = new byte[length];
+
+ System.arraycopy(buffer, offset, newBuffer, 0, length);
+ buffer = newBuffer;
+ offset = 0;
+ }
+
+ RawPacket packet = createRawPacket(buffer, offset, length);
+
+ /*
+ * If we got extended, the delivery of the packet may have been
+ * canceled.
+ */
+ if (packet != null)
+ {
+ if (maxPacketsPerMillisPolicy == null)
+ {
+ if (!send(packet))
+ return -1;
+ }
+ else
+ maxPacketsPerMillisPolicy.write(packet);
+ }
return length;
}
+
+ /**
+ * Implements the functionality which allows this OutputDataStream
+ * to control how many RTP packets it sends through its
+ * DatagramSocket per a specific number of milliseconds.
+ */
+ private class MaxPacketsPerMillisPolicy
+ {
+
+ /**
+ * The maximum number of RTP packets to be sent by this
+ * OutputDataStream through its DatagramSocket per
+ * {@link #perMillis} milliseconds.
+ */
+ private int maxPackets = -1;
+
+ /**
+ * The time stamp in nanoseconds of the start of the current
+ * perNanos interval.
+ */
+ private long millisStartTime = 0;
+
+ /**
+ * The list of RTP packets to be sent through the
+ * DatagramSocket of this OutputDataSource.
+ */
+ private final List packetQueue
+ = new LinkedList();
+
+ /**
+ * The number of RTP packets already sent during the current
+ * perNanos interval.
+ */
+ private long packetsSentInMillis = 0;
+
+ /**
+ * The time interval in nanoseconds during which {@link #maxPackets}
+ * number of RTP packets are to be sent through the
+ * DatagramSocket of this OutputDataSource.
+ */
+ private long perNanos = -1;
+
+ /**
+ * The Thread which is to send the RTP packets in
+ * {@link #packetQueue} through the DatagramSocket of this
+ * OutputDataSource.
+ */
+ private Thread sendThread;
+
+ /**
+ * Initializes a new MaxPacketsPerMillisPolicy instance which
+ * is to control how many RTP packets this OutputDataSource is
+ * to send through its DatagramSocket per a specific number of
+ * milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent per
+ * perMillis milliseconds through the DatagramSocket
+ * of this OutputDataStream
+ * @param perMillis the number of milliseconds per which a maximum of
+ * maxPackets RTP packets are to be sent through the
+ * DatagramSocket of this OutputDataStream
+ */
+ public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
+ {
+ setMaxPacketsPerMillis(maxPackets, perMillis);
+ }
+
+ /**
+ * Sends the RTP packets in {@link #packetQueue} in accord with
+ * {@link #maxPackets} and {@link #perMillis}.
+ */
+ private void runInSendThread()
+ {
+ try
+ {
+ while (true)
+ {
+ RawPacket packet;
+
+ synchronized (packetQueue)
+ {
+ while (packetQueue.size() < 1)
+ {
+ try
+ {
+ packetQueue.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+
+ packet = packetQueue.remove(0);
+ packetQueue.notifyAll();
+ }
+
+ long time = System.nanoTime();
+ long millisRemainingTime = time - millisStartTime;
+
+ if ((perNanos < 1)
+ || (millisRemainingTime >= perNanos))
+ {
+ millisStartTime = time;
+ packetsSentInMillis = 0;
+ }
+ else if ((maxPackets > 0)
+ && (packetsSentInMillis >= maxPackets))
+ {
+ while (true)
+ {
+ millisRemainingTime
+ = System.nanoTime() - millisStartTime;
+ if (millisRemainingTime >= perNanos)
+ break;
+ try
+ {
+ Thread.sleep(
+ millisRemainingTime / 1000000,
+ (int) (millisRemainingTime % 1000000));
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ millisStartTime = System.nanoTime();
+ packetsSentInMillis = 0;
+ }
+
+ send(packet);
+ packetsSentInMillis++;
+ }
+ }
+ finally
+ {
+ synchronized (packetQueue)
+ {
+ if (Thread.currentThread().equals(sendThread))
+ sendThread = null;
+ }
+ }
+ }
+
+ /**
+ * Sets the maximum number of RTP packets to be sent by this
+ * OutputDataStream through its DatagramSocket per
+ * a specific number of milliseconds.
+ *
+ * @param maxPackets the maximum number of RTP packets to be sent by
+ * this OutputDataStream through its DatagramSocket
+ * per the specified number of milliseconds; -1 if no maximum
+ * is to be set
+ * @param perMillis the number of milliseconds per which
+ * maxPackets are to be sent by this OutputDataStream
+ * through its DatagramSocket
+ */
+ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
+ {
+ if (maxPackets < 1)
+ {
+ this.maxPackets = -1;
+ this.perNanos = -1;
+ }
+ else
+ {
+ if (perMillis < 1)
+ throw new IllegalArgumentException("perMillis");
+
+ this.maxPackets = maxPackets;
+ this.perNanos = perMillis * 1000000;
+ }
+ }
+
+ /**
+ * Queues a specific RTP packet to be sent through the
+ * DatagramSocket of this OutputDataStream.
+ *
+ * @param packet the RTP packet to be queued for sending through the
+ * DatagramSocket of this OutputDataStream
+ */
+ public void write(RawPacket packet)
+ {
+ synchronized (packetQueue)
+ {
+ while (packetQueue.size()
+ >= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY)
+ {
+ try
+ {
+ packetQueue.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+
+ packetQueue.add(packet);
+
+ if (sendThread == null)
+ {
+ sendThread
+ = new Thread(getClass().getName())
+ {
+ @Override
+ public void run()
+ {
+ runInSendThread();
+ }
+ };
+ sendThread.setDaemon(true);
+ sendThread.start();
+ }
+
+ packetQueue.notifyAll();
+ }
+ }
+ }
}
diff --git a/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java b/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java
index 755fcac1b..8c0e9471c 100644
--- a/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java
+++ b/src/net/java/sip/communicator/impl/neomedia/VideoMediaStreamImpl.java
@@ -16,6 +16,7 @@
import javax.media.protocol.*;
import javax.media.rtp.*;
+import net.java.sip.communicator.impl.neomedia.*;
import net.java.sip.communicator.impl.neomedia.codec.*;
import net.java.sip.communicator.impl.neomedia.device.*;
import net.java.sip.communicator.service.neomedia.*;
@@ -268,6 +269,24 @@ public void addVideoListener(VideoListener listener)
videoNotifierSupport.addVideoListener(listener);
}
+ /**
+ * Performs any optional configuration on a specific
+ * RTPConnectorOuputStream of an RTPManager to be used by
+ * this MediaStreamImpl.
+ *
+ * @param dataOutputStream the RTPConnectorOutputStream to be used
+ * by an RTPManager of this MediaStreamImpl and to be
+ * configured
+ */
+ @Override
+ protected void configureDataOutputStream(
+ RTPConnectorOutputStream dataOutputStream)
+ {
+ super.configureDataOutputStream(dataOutputStream);
+
+ dataOutputStream.setMaxPacketsPerMillis(1, 10);
+ }
+
/**
* Performs any optional configuration on the BufferControl of the
* specified RTPManager which is to be used as the
@@ -283,6 +302,8 @@ protected void configureRTPManagerBufferControl(
RTPManager rtpManager,
BufferControl bufferControl)
{
+ super.configureRTPManagerBufferControl(rtpManager, bufferControl);
+
bufferControl.setBufferLength(BufferControl.MAX_VALUE);
}
diff --git a/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java b/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java
index 0ae4e3127..a380732d5 100644
--- a/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java
+++ b/src/net/java/sip/communicator/impl/neomedia/codec/video/h264/DePacketizer.java
@@ -46,6 +46,18 @@ public class DePacketizer
*/
private static final byte[] NAL_PREFIX = { 0, 0, 1 };
+ /**
+ * The indicator which determines whether incomplete NAL units are output
+ * from the H.264 DePacketizer to the decoder. It is advisable to
+ * output incomplete NAL units because the FFmpeg H.264 decoder is able to
+ * decode them. If false, incomplete NAL units will be discarded
+ * and, consequently, the video quality will be worse (e.g. if the last RTP
+ * packet of a fragmented NAL unit carrying a keyframe does not arrive from
+ * the network, the whole keyframe will be discarded and thus all NAL units
+ * upto the next keyframe will be useless).
+ */
+ private static final boolean OUTPUT_INCOMPLETE_NAL_UNITS = true;
+
/**
* Interval between a PLI request and its reemission (in milliseconds).
*/
@@ -99,8 +111,8 @@ public class DePacketizer
private long remoteSSRC = -1;
/**
- * Use or not RTCP PLI message when depacketizer miss
- * packets.
+ * The indicator which determines whether RTCP PLI is to be used when this
+ * DePacketizer detects that video data has been lost.
*/
private boolean usePLI = false;
@@ -475,7 +487,8 @@ private int reset(Buffer outBuffer)
* are only given meaning for the purposes of the network and not the
* H.264 decoder.
*/
- if (fuaStartedAndNotEnded
+ if (OUTPUT_INCOMPLETE_NAL_UNITS
+ && fuaStartedAndNotEnded
&& (outBuffer.getLength() >= (NAL_PREFIX.length + 1 + 1)))
{
Object outData = outBuffer.getData();
@@ -549,8 +562,9 @@ public void setRtcpFeedbackPLI(boolean use)
private class PLISendThread extends Thread
{
/**
- * Entry point of the thread.
+ * Represents the entry point of PLISendThread.
*/
+ @Override
public void run()
{
while(isPLIThreadRunning)