Use BlockingQueue and add methods to stop send thread.

Use a standard Java BlockingQueue (from concurrent package) to maintain
a maximum number of outstanding packets for send.

Introduce and call methods to gracefully stop send thread.
cusax-fix
Werner Dittmann 16 years ago
parent bad092320e
commit c2bcedcf6a

@ -102,8 +102,17 @@ public void addTarget(SessionAddress target)
*/
public void close()
{
dataOutputStream = null;
controlOutputStream = null;
if (dataOutputStream != null)
{
dataOutputStream.close();
dataOutputStream = null;
}
if (controlOutputStream != null)
{
controlOutputStream.close();
controlOutputStream = null;
}
if (dataInputStream != null)
{

@ -9,6 +9,8 @@
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import javax.media.rtp.*;
@ -73,6 +75,19 @@ public void addTarget(InetAddress remoteAddr, int remotePort)
targets.add(new InetSocketAddress(remoteAddr, remotePort));
}
/**
* Close this output stream.
*/
public void close()
{
if (maxPacketsPerMillisPolicy != null)
{
maxPacketsPerMillisPolicy.close();
}
maxPacketsPerMillisPolicy = null;
removeTargets();
}
/**
* Creates a new <tt>RawPacket</tt> from a specific <tt>byte[]</tt> buffer
* in order to have this instance send its packet data through its
@ -253,8 +268,8 @@ private class MaxPacketsPerMillisPolicy
* The list of RTP packets to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
*/
private final List<RawPacket> packetQueue
= new LinkedList<RawPacket>();
private final ArrayBlockingQueue<RawPacket> packetQueue
= new ArrayBlockingQueue<RawPacket>(MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY);
/**
* The number of RTP packets already sent during the current
@ -275,6 +290,11 @@ private class MaxPacketsPerMillisPolicy
* <tt>OutputDataSource</tt>.
*/
private Thread sendThread;
/**
* To signal run or stop condition to send thread.
*/
private boolean sendRun = true;
/**
* Initializes a new <tt>MaxPacketsPerMillisPolicy</tt> instance which
@ -292,6 +312,32 @@ private class MaxPacketsPerMillisPolicy
public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
{
setMaxPacketsPerMillis(maxPackets, perMillis);
synchronized (this) {
if (sendThread == null)
{
sendThread
= new Thread(getClass().getName())
{
@Override
public void run()
{
runInSendThread();
}
};
sendThread.setDaemon(true);
sendThread.start();
}
}
}
synchronized void close()
{
if (!sendRun)
return;
sendRun = false;
// just offer a new packt to wakeup thread in case it waits for
// a packet.
packetQueue.offer(new RawPacket(null, 0, 0));
}
/**
@ -302,27 +348,25 @@ private void runInSendThread()
{
try
{
while (true)
while (sendRun)
{
RawPacket packet;
RawPacket packet = null;
synchronized (packetQueue)
while (true)
{
while (packetQueue.size() < 1)
try
{
try
{
packetQueue.wait();
}
catch (InterruptedException iex)
{
}
packet = packetQueue.take();
break;
}
catch (InterruptedException iex)
{
continue;
}
packet = packetQueue.remove(0);
packetQueue.notifyAll();
}
if (!sendRun)
break;
long time = System.nanoTime();
long millisRemainingTime = time - millisStartTime;
@ -335,21 +379,13 @@ private void runInSendThread()
else if ((maxPackets > 0)
&& (packetsSentInMillis >= maxPackets))
{
while (true)
while (true)
{
millisRemainingTime
= System.nanoTime() - millisStartTime;
millisRemainingTime = System.nanoTime()
- millisStartTime;
if (millisRemainingTime >= perNanos)
break;
try
{
Thread.sleep(
millisRemainingTime / 1000000,
(int) (millisRemainingTime % 1000000));
}
catch (InterruptedException iex)
{
}
LockSupport.parkNanos(millisRemainingTime);
}
millisStartTime = System.nanoTime();
packetsSentInMillis = 0;
@ -361,6 +397,7 @@ else if ((maxPackets > 0)
}
finally
{
packetQueue.clear();
synchronized (packetQueue)
{
if (Thread.currentThread().equals(sendThread))
@ -408,38 +445,17 @@ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
*/
public void write(RawPacket packet)
{
synchronized (packetQueue)
while (true)
{
while (packetQueue.size()
>= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY)
try
{
try
{
packetQueue.wait();
}
catch (InterruptedException iex)
{
}
packetQueue.put(packet);
break;
}
packetQueue.add(packet);
if (sendThread == null)
catch (InterruptedException iex)
{
sendThread
= new Thread(getClass().getName())
{
@Override
public void run()
{
runInSendThread();
}
};
sendThread.setDaemon(true);
sendThread.start();
continue;
}
packetQueue.notifyAll();
}
}
}

Loading…
Cancel
Save