diff --git a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorImpl.java b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorImpl.java
index 80a889f03..823505931 100755
--- a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorImpl.java
+++ b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorImpl.java
@@ -102,8 +102,17 @@ public void addTarget(SessionAddress target)
*/
public void close()
{
- dataOutputStream = null;
- controlOutputStream = null;
+ if (dataOutputStream != null)
+ {
+ dataOutputStream.close();
+ dataOutputStream = null;
+ }
+
+ if (controlOutputStream != null)
+ {
+ controlOutputStream.close();
+ controlOutputStream = null;
+ }
if (dataInputStream != null)
{
diff --git a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
index 9e866ea85..88ad4396b 100755
--- a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
+++ b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java
@@ -9,6 +9,8 @@
import java.io.*;
import java.net.*;
import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
import javax.media.rtp.*;
@@ -73,6 +75,19 @@ public void addTarget(InetAddress remoteAddr, int remotePort)
targets.add(new InetSocketAddress(remoteAddr, remotePort));
}
+ /**
+ * Close this output stream.
+ */
+ public void close()
+ {
+ if (maxPacketsPerMillisPolicy != null)
+ {
+ maxPacketsPerMillisPolicy.close();
+ }
+ maxPacketsPerMillisPolicy = null;
+ removeTargets();
+ }
+
/**
* Creates a new RawPacket from a specific byte[] buffer
* in order to have this instance send its packet data through its
@@ -253,8 +268,8 @@ private class MaxPacketsPerMillisPolicy
* The list of RTP packets to be sent through the
* DatagramSocket of this OutputDataSource.
*/
- private final List packetQueue
- = new LinkedList();
+ private final ArrayBlockingQueue packetQueue
+ = new ArrayBlockingQueue(MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY);
/**
* The number of RTP packets already sent during the current
@@ -275,6 +290,11 @@ private class MaxPacketsPerMillisPolicy
* OutputDataSource.
*/
private Thread sendThread;
+
+ /**
+ * To signal run or stop condition to send thread.
+ */
+ private boolean sendRun = true;
/**
* Initializes a new MaxPacketsPerMillisPolicy instance which
@@ -292,6 +312,32 @@ private class MaxPacketsPerMillisPolicy
public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
{
setMaxPacketsPerMillis(maxPackets, perMillis);
+ synchronized (this) {
+ if (sendThread == null)
+ {
+ sendThread
+ = new Thread(getClass().getName())
+ {
+ @Override
+ public void run()
+ {
+ runInSendThread();
+ }
+ };
+ sendThread.setDaemon(true);
+ sendThread.start();
+ }
+ }
+ }
+
+ synchronized void close()
+ {
+ if (!sendRun)
+ return;
+ sendRun = false;
+ // just offer a new packt to wakeup thread in case it waits for
+ // a packet.
+ packetQueue.offer(new RawPacket(null, 0, 0));
}
/**
@@ -302,27 +348,25 @@ private void runInSendThread()
{
try
{
- while (true)
+ while (sendRun)
{
- RawPacket packet;
+ RawPacket packet = null;
- synchronized (packetQueue)
+ while (true)
{
- while (packetQueue.size() < 1)
+ try
{
- try
- {
- packetQueue.wait();
- }
- catch (InterruptedException iex)
- {
- }
+ packet = packetQueue.take();
+ break;
+ }
+ catch (InterruptedException iex)
+ {
+ continue;
}
-
- packet = packetQueue.remove(0);
- packetQueue.notifyAll();
}
-
+ if (!sendRun)
+ break;
+
long time = System.nanoTime();
long millisRemainingTime = time - millisStartTime;
@@ -335,21 +379,13 @@ private void runInSendThread()
else if ((maxPackets > 0)
&& (packetsSentInMillis >= maxPackets))
{
- while (true)
+ while (true)
{
- millisRemainingTime
- = System.nanoTime() - millisStartTime;
+ millisRemainingTime = System.nanoTime()
+ - millisStartTime;
if (millisRemainingTime >= perNanos)
break;
- try
- {
- Thread.sleep(
- millisRemainingTime / 1000000,
- (int) (millisRemainingTime % 1000000));
- }
- catch (InterruptedException iex)
- {
- }
+ LockSupport.parkNanos(millisRemainingTime);
}
millisStartTime = System.nanoTime();
packetsSentInMillis = 0;
@@ -361,6 +397,7 @@ else if ((maxPackets > 0)
}
finally
{
+ packetQueue.clear();
synchronized (packetQueue)
{
if (Thread.currentThread().equals(sendThread))
@@ -408,38 +445,17 @@ public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
*/
public void write(RawPacket packet)
{
- synchronized (packetQueue)
+ while (true)
{
- while (packetQueue.size()
- >= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY)
+ try
{
- try
- {
- packetQueue.wait();
- }
- catch (InterruptedException iex)
- {
- }
+ packetQueue.put(packet);
+ break;
}
-
- packetQueue.add(packet);
-
- if (sendThread == null)
+ catch (InterruptedException iex)
{
- sendThread
- = new Thread(getClass().getName())
- {
- @Override
- public void run()
- {
- runInSendThread();
- }
- };
- sendThread.setDaemon(true);
- sendThread.start();
+ continue;
}
-
- packetQueue.notifyAll();
}
}
}