Allows controlling the number of RTP packets sent per millisecond in the video stream of a call.

cusax-fix
Lyubomir Marinov 16 years ago
parent 4129f18be7
commit 715acd65b3

@ -20,6 +20,7 @@
import com.sun.media.rtp.*;
import net.java.sip.communicator.impl.neomedia.*;
import net.java.sip.communicator.impl.neomedia.device.*;
import net.java.sip.communicator.impl.neomedia.format.*;
import net.java.sip.communicator.impl.neomedia.transform.*;
@ -172,11 +173,6 @@ else if (MediaDeviceSession
*/
private boolean mute = false;
/**
* The current <tt>ZrtpControl</tt>
*/
private ZrtpControlImpl zrtpControl = null;
/**
* The map of currently active <tt>RTPExtension</tt>s and the IDs that they
* have been assigned for the lifetime of this <tt>MediaStream</tt>.
@ -196,6 +192,11 @@ else if (MediaDeviceSession
protected Map<String, String> advancedAttributes =
new Hashtable<String, String>();
/**
* The current <tt>ZrtpControl</tt>.
*/
private final ZrtpControlImpl zrtpControl;
/**
* Needed when restarting zrtp control.
*/
@ -214,30 +215,53 @@ else if (MediaDeviceSession
* @param zrtpControl a control which is already created, used to control
* the zrtp operations.
*/
public MediaStreamImpl(StreamConnector connector, MediaDevice device,
ZrtpControlImpl zrtpControl)
public MediaStreamImpl(
StreamConnector connector,
MediaDevice device,
ZrtpControlImpl zrtpControl)
{
/*
* XXX Set the device early in order to make sure that its of the right
* type because we do not support just about any MediaDevice yet.
* XXX Set the device early in order to make sure that it is of the
* right type because we do not support just about any MediaDevice yet.
*/
setDevice(device);
this.rtpConnector = new RTPTransformConnector(connector);
rtpConnector
= new RTPTransformConnector(connector)
{
@Override
protected TransformOutputStream createDataOutputStream()
throws IOException
{
TransformOutputStream dataOutputStream
= super.createDataOutputStream();
if(zrtpControl != null)
{
this.zrtpControl = zrtpControl;
}
else
this.zrtpControl = new ZrtpControlImpl();
if (dataOutputStream != null)
configureDataOutputStream(dataOutputStream);
return dataOutputStream;
}
};
this.zrtpControl
= (zrtpControl == null) ? new ZrtpControlImpl() : zrtpControl;
this.zrtpControl.setConnector(rtpConnector);
//register the transform engines that we will be using in this stream.
TransformEngineChain engineChain = createTransformEngineChain();
// Register the transform engines that we will be using in this stream.
rtpConnector.setEngine(createTransformEngineChain());
}
rtpConnector.setEngine(engineChain);
/**
* Performs any optional configuration on a specific
* <tt>RTPConnectorOuputStream</tt> of an <tt>RTPManager</tt> to be used by
* this <tt>MediaStreamImpl</tt>. Allows extenders to override.
*
* @param dataOutputStream the <tt>RTPConnectorOutputStream</tt> to be used
* by an <tt>RTPManager</tt> of this <tt>MediaStreamImpl</tt> and to be
* configured
*/
protected void configureDataOutputStream(
RTPConnectorOutputStream dataOutputStream)
{
}
/**
@ -269,25 +293,27 @@ protected void configureRTPManagerBufferControl(
private TransformEngineChain createTransformEngineChain()
{
ArrayList<TransformEngine> engineChain
= new ArrayList<TransformEngine>(3);
= new ArrayList<TransformEngine>(3);
//CSRCs and audio levels
if(csrcEngine == null)
// CSRCs and audio levels
if (csrcEngine == null)
csrcEngine = new CsrcTransformEngine(this);
engineChain.add(csrcEngine);
//DTMF
// DTMF
DtmfTransformEngine dtmfEngine = createDtmfTransformEngine();
if(dtmfEngine != null)
if (dtmfEngine != null)
engineChain.add(dtmfEngine);
//ZRTP
// ZRTP
engineChain.add(zrtpControl.getZrtpEngine());
return new TransformEngineChain( engineChain.toArray(
new TransformEngine[engineChain.size()]));
return
new TransformEngineChain(
engineChain.toArray(
new TransformEngine[engineChain.size()]));
}
/**
@ -896,15 +922,21 @@ public ZrtpControl getZrtpControl()
*/
private void restartZrtpControl()
{
// if there is no current secure communication we don't need to do that
/*
* If there is no current secure communication, we don't need to do
* that.
*/
if(!zrtpControl.getSecureCommunicationStatus())
return;
zrtpControl.cleanup();
// as we are recreating this stream and it was obviously secured
// it may happen we receive unencrepted data and we will hear
// noise, so we mute it till secure connection is again established
/*
* As we are recreating this stream and it was obviously secured, it may
* happen so that we receive unencrepted data. Which will produce noise
* for us to hear. So we mute it till a secure connection is again
* established.
*/
zrtpControl.getZrtpEngine().setStartMuted(true);
this.zrtpControl.setConnector(rtpConnector);
@ -1657,16 +1689,16 @@ else if (event instanceof TimeoutEvent)
{
ReceiveStream receiveStream = event.getReceiveStream();
// if we recreate streams we will already have restarted
// zrtp control
// but when on the other end some one has recreated his streams
// we will received a ByeEvent(TimeoutEvent) and so we must also
// restart our zrtp, this happens when we are already in a call
// and the other party starts an conf call
/*
* If we recreate streams, we will already have restarted
* zrtpControl. But when on the other end someone recreates his
* streams, we will receive a ByeEvent (which extends TimeoutEvent)
* and then we must also restart our ZRTP. This happens, for
* example, when we are already in a call and the remote peer
* converts his side of the call into a conference call.
*/
if(!zrtpRestarted)
{
restartZrtpControl();
}
if (receiveStream != null)
{

@ -13,12 +13,33 @@
import javax.media.rtp.*;
/**
*
* @author Bing SU (nova.su@gmail.com)
* @author Lubomir Marinov
*/
public class RTPConnectorOutputStream
implements OutputDataStream
{
/**
* The maximum number of packets to be sent to be kept in the queue of
* <tt>MaxPacketsPerMillisPolicy</tt>. When the maximum is reached, the next
* attempt to write a new packet in the queue will block until at least one
* packet from the queue is sent. Defined in order to prevent
* <tt>OutOfMemoryError</tt>s which, technically, may arise if the capacity
* of the queue is unlimited.
*/
private static final int
MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY
= 256;
/**
* The functionality which allows this <tt>OutputDataStream</tt> to control
* how many RTP packets it sends through its <tt>DatagramSocket</tt> per a
* specific number of milliseconds.
*/
private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy;
/**
* UDP socket used to send packet data
*/
@ -77,8 +98,8 @@ protected RawPacket createRawPacket(byte[] buffer, int offset, int length)
*
* @param remoteAddr target ip address
* @param remotePort target port
* @return true if the target is in stream target list and can be removed
* false if not
* @return <tt>true</tt> if the target is in stream target list and can be
* removed; <tt>false</tt>, otherwise
*/
public boolean removeTarget(InetAddress remoteAddr, int remotePort)
{
@ -105,37 +126,321 @@ public void removeTargets()
targets.clear();
}
/*
* Implements OutputDataStream#write(byte[], int, int).
/**
* Sends a specific RTP packet through the <tt>DatagramSocket</tt> of this
* <tt>OutputDataSource</tt>.
*
* @param packet the RTP packet to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>
* @return <tt>true</tt> if the specified <tt>packet</tt> was successfully
* sent; otherwise, <tt>false</tt>
*/
public int write(byte[] buffer, int offset, int length)
private boolean send(RawPacket packet)
{
RawPacket pkt = createRawPacket(buffer, offset, length);
/*
* If we got extended, the delivery of the packet may have been
* canceled.
*/
if (pkt == null)
return length;
for (InetSocketAddress target : targets)
{
try
{
socket
.send(
socket.send(
new DatagramPacket(
pkt.getBuffer(),
pkt.getOffset(),
pkt.getLength(),
packet.getBuffer(),
packet.getOffset(),
packet.getLength(),
target.getAddress(),
target.getPort()));
}
catch (IOException ex)
{
// TODO error handling
return -1;
return false;
}
}
return true;
}
/**
* Sets the maximum number of RTP packets to be sent by this
* <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
* a specific number of milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent by this
* <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per the
* specified number of milliseconds; <tt>-1</tt> if no maximum is to be set
* @param perMillis the number of milliseconds per which <tt>maxPackets</tt>
* are to be sent by this <tt>OutputDataStream</tt> through its
* <tt>DatagramSocket</tt>
*/
public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
{
if (maxPacketsPerMillisPolicy == null)
{
if (maxPackets > 0)
{
if (perMillis < 1)
throw new IllegalArgumentException("perMillis");
maxPacketsPerMillisPolicy
= new MaxPacketsPerMillisPolicy(maxPackets, perMillis);
}
}
else
{
maxPacketsPerMillisPolicy
.setMaxPacketsPerMillis(maxPackets, perMillis);
}
}
/**
* Implements {@link OutputDataStream#write(byte[], int, int)}.
*
* @param buffer
* @param offset
* @param length
* @return
*/
public int write(byte[] buffer, int offset, int length)
{
if (maxPacketsPerMillisPolicy != null)
{
byte[] newBuffer = new byte[length];
System.arraycopy(buffer, offset, newBuffer, 0, length);
buffer = newBuffer;
offset = 0;
}
RawPacket packet = createRawPacket(buffer, offset, length);
/*
* If we got extended, the delivery of the packet may have been
* canceled.
*/
if (packet != null)
{
if (maxPacketsPerMillisPolicy == null)
{
if (!send(packet))
return -1;
}
else
maxPacketsPerMillisPolicy.write(packet);
}
return length;
}
/**
* Implements the functionality which allows this <tt>OutputDataStream</tt>
* to control how many RTP packets it sends through its
* <tt>DatagramSocket</tt> per a specific number of milliseconds.
*/
private class MaxPacketsPerMillisPolicy
{
/**
* The maximum number of RTP packets to be sent by this
* <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
* {@link #perMillis} milliseconds.
*/
private int maxPackets = -1;
/**
* The time stamp in nanoseconds of the start of the current
* <tt>perNanos</tt> interval.
*/
private long millisStartTime = 0;
/**
* The list of RTP packets to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
*/
private final List<RawPacket> packetQueue
= new LinkedList<RawPacket>();
/**
* The number of RTP packets already sent during the current
* <tt>perNanos</tt> interval.
*/
private long packetsSentInMillis = 0;
/**
* The time interval in nanoseconds during which {@link #maxPackets}
* number of RTP packets are to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
*/
private long perNanos = -1;
/**
* The <tt>Thread</tt> which is to send the RTP packets in
* {@link #packetQueue} through the <tt>DatagramSocket</tt> of this
* <tt>OutputDataSource</tt>.
*/
private Thread sendThread;
/**
* Initializes a new <tt>MaxPacketsPerMillisPolicy</tt> instance which
* is to control how many RTP packets this <tt>OutputDataSource</tt> is
* to send through its <tt>DatagramSocket</tt> per a specific number of
* milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent per
* <tt>perMillis</tt> milliseconds through the <tt>DatagramSocket</tt>
* of this <tt>OutputDataStream</tt>
* @param perMillis the number of milliseconds per which a maximum of
* <tt>maxPackets</tt> RTP packets are to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
*/
public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
{
setMaxPacketsPerMillis(maxPackets, perMillis);
}
/**
* Sends the RTP packets in {@link #packetQueue} in accord with
* {@link #maxPackets} and {@link #perMillis}.
*/
private void runInSendThread()
{
try
{
while (true)
{
RawPacket packet;
synchronized (packetQueue)
{
while (packetQueue.size() < 1)
{
try
{
packetQueue.wait();
}
catch (InterruptedException iex)
{
}
}
packet = packetQueue.remove(0);
packetQueue.notifyAll();
}
long time = System.nanoTime();
long millisRemainingTime = time - millisStartTime;
if ((perNanos < 1)
|| (millisRemainingTime >= perNanos))
{
millisStartTime = time;
packetsSentInMillis = 0;
}
else if ((maxPackets > 0)
&& (packetsSentInMillis >= maxPackets))
{
while (true)
{
millisRemainingTime
= System.nanoTime() - millisStartTime;
if (millisRemainingTime >= perNanos)
break;
try
{
Thread.sleep(
millisRemainingTime / 1000000,
(int) (millisRemainingTime % 1000000));
}
catch (InterruptedException iex)
{
}
}
millisStartTime = System.nanoTime();
packetsSentInMillis = 0;
}
send(packet);
packetsSentInMillis++;
}
}
finally
{
synchronized (packetQueue)
{
if (Thread.currentThread().equals(sendThread))
sendThread = null;
}
}
}
/**
* Sets the maximum number of RTP packets to be sent by this
* <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
* a specific number of milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent by
* this <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt>
* per the specified number of milliseconds; <tt>-1</tt> if no maximum
* is to be set
* @param perMillis the number of milliseconds per which
* <tt>maxPackets</tt> are to be sent by this <tt>OutputDataStream</tt>
* through its <tt>DatagramSocket</tt>
*/
public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
{
if (maxPackets < 1)
{
this.maxPackets = -1;
this.perNanos = -1;
}
else
{
if (perMillis < 1)
throw new IllegalArgumentException("perMillis");
this.maxPackets = maxPackets;
this.perNanos = perMillis * 1000000;
}
}
/**
* Queues a specific RTP packet to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>.
*
* @param packet the RTP packet to be queued for sending through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
*/
public void write(RawPacket packet)
{
synchronized (packetQueue)
{
while (packetQueue.size()
>= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY)
{
try
{
packetQueue.wait();
}
catch (InterruptedException iex)
{
}
}
packetQueue.add(packet);
if (sendThread == null)
{
sendThread
= new Thread(getClass().getName())
{
@Override
public void run()
{
runInSendThread();
}
};
sendThread.setDaemon(true);
sendThread.start();
}
packetQueue.notifyAll();
}
}
}
}

@ -16,6 +16,7 @@
import javax.media.protocol.*;
import javax.media.rtp.*;
import net.java.sip.communicator.impl.neomedia.*;
import net.java.sip.communicator.impl.neomedia.codec.*;
import net.java.sip.communicator.impl.neomedia.device.*;
import net.java.sip.communicator.service.neomedia.*;
@ -268,6 +269,24 @@ public void addVideoListener(VideoListener listener)
videoNotifierSupport.addVideoListener(listener);
}
/**
* Performs any optional configuration on a specific
* <tt>RTPConnectorOuputStream</tt> of an <tt>RTPManager</tt> to be used by
* this <tt>MediaStreamImpl</tt>.
*
* @param dataOutputStream the <tt>RTPConnectorOutputStream</tt> to be used
* by an <tt>RTPManager</tt> of this <tt>MediaStreamImpl</tt> and to be
* configured
*/
@Override
protected void configureDataOutputStream(
RTPConnectorOutputStream dataOutputStream)
{
super.configureDataOutputStream(dataOutputStream);
dataOutputStream.setMaxPacketsPerMillis(1, 10);
}
/**
* Performs any optional configuration on the <tt>BufferControl</tt> of the
* specified <tt>RTPManager</tt> which is to be used as the
@ -283,6 +302,8 @@ protected void configureRTPManagerBufferControl(
RTPManager rtpManager,
BufferControl bufferControl)
{
super.configureRTPManagerBufferControl(rtpManager, bufferControl);
bufferControl.setBufferLength(BufferControl.MAX_VALUE);
}

@ -46,6 +46,18 @@ public class DePacketizer
*/
private static final byte[] NAL_PREFIX = { 0, 0, 1 };
/**
* The indicator which determines whether incomplete NAL units are output
* from the H.264 <tt>DePacketizer</tt> to the decoder. It is advisable to
* output incomplete NAL units because the FFmpeg H.264 decoder is able to
* decode them. If <tt>false</tt>, incomplete NAL units will be discarded
* and, consequently, the video quality will be worse (e.g. if the last RTP
* packet of a fragmented NAL unit carrying a keyframe does not arrive from
* the network, the whole keyframe will be discarded and thus all NAL units
* upto the next keyframe will be useless).
*/
private static final boolean OUTPUT_INCOMPLETE_NAL_UNITS = true;
/**
* Interval between a PLI request and its reemission (in milliseconds).
*/
@ -99,8 +111,8 @@ public class DePacketizer
private long remoteSSRC = -1;
/**
* Use or not RTCP PLI message when depacketizer miss
* packets.
* The indicator which determines whether RTCP PLI is to be used when this
* <tt>DePacketizer</tt> detects that video data has been lost.
*/
private boolean usePLI = false;
@ -475,7 +487,8 @@ private int reset(Buffer outBuffer)
* are only given meaning for the purposes of the network and not the
* H.264 decoder.
*/
if (fuaStartedAndNotEnded
if (OUTPUT_INCOMPLETE_NAL_UNITS
&& fuaStartedAndNotEnded
&& (outBuffer.getLength() >= (NAL_PREFIX.length + 1 + 1)))
{
Object outData = outBuffer.getData();
@ -549,8 +562,9 @@ public void setRtcpFeedbackPLI(boolean use)
private class PLISendThread extends Thread
{
/**
* Entry point of the thread.
* Represents the entry point of <tt>PLISendThread</tt>.
*/
@Override
public void run()
{
while(isPLIThreadRunning)

Loading…
Cancel
Save