From c2bcedcf6aaaa8d1851b356ac01d7f770ee72e49 Mon Sep 17 00:00:00 2001 From: Werner Dittmann Date: Tue, 11 May 2010 16:44:04 +0000 Subject: [PATCH] Use BlockingQueue and add methods to stop send thread. Use a standard Java BlockingQueue (from concurrent package) to maintain a maximum number of outstanding packets for send. Introduce and call methods to gracefully stop send thread. --- .../impl/neomedia/RTPConnectorImpl.java | 13 +- .../neomedia/RTPConnectorOutputStream.java | 128 ++++++++++-------- 2 files changed, 83 insertions(+), 58 deletions(-) diff --git a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorImpl.java b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorImpl.java index 80a889f03..823505931 100755 --- a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorImpl.java +++ b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorImpl.java @@ -102,8 +102,17 @@ public void addTarget(SessionAddress target) */ public void close() { - dataOutputStream = null; - controlOutputStream = null; + if (dataOutputStream != null) + { + dataOutputStream.close(); + dataOutputStream = null; + } + + if (controlOutputStream != null) + { + controlOutputStream.close(); + controlOutputStream = null; + } if (dataInputStream != null) { diff --git a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java index 9e866ea85..88ad4396b 100755 --- a/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java +++ b/src/net/java/sip/communicator/impl/neomedia/RTPConnectorOutputStream.java @@ -9,6 +9,8 @@ import java.io.*; import java.net.*; import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; import javax.media.rtp.*; @@ -73,6 +75,19 @@ public void addTarget(InetAddress remoteAddr, int remotePort) targets.add(new InetSocketAddress(remoteAddr, remotePort)); } + /** + * Close this output stream. + */ + public void close() + { + if (maxPacketsPerMillisPolicy != null) + { + maxPacketsPerMillisPolicy.close(); + } + maxPacketsPerMillisPolicy = null; + removeTargets(); + } + /** * Creates a new RawPacket from a specific byte[] buffer * in order to have this instance send its packet data through its @@ -253,8 +268,8 @@ private class MaxPacketsPerMillisPolicy * The list of RTP packets to be sent through the * DatagramSocket of this OutputDataSource. */ - private final List packetQueue - = new LinkedList(); + private final ArrayBlockingQueue packetQueue + = new ArrayBlockingQueue(MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY); /** * The number of RTP packets already sent during the current @@ -275,6 +290,11 @@ private class MaxPacketsPerMillisPolicy * OutputDataSource. */ private Thread sendThread; + + /** + * To signal run or stop condition to send thread. + */ + private boolean sendRun = true; /** * Initializes a new MaxPacketsPerMillisPolicy instance which @@ -292,6 +312,32 @@ private class MaxPacketsPerMillisPolicy public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis) { setMaxPacketsPerMillis(maxPackets, perMillis); + synchronized (this) { + if (sendThread == null) + { + sendThread + = new Thread(getClass().getName()) + { + @Override + public void run() + { + runInSendThread(); + } + }; + sendThread.setDaemon(true); + sendThread.start(); + } + } + } + + synchronized void close() + { + if (!sendRun) + return; + sendRun = false; + // just offer a new packt to wakeup thread in case it waits for + // a packet. + packetQueue.offer(new RawPacket(null, 0, 0)); } /** @@ -302,27 +348,25 @@ private void runInSendThread() { try { - while (true) + while (sendRun) { - RawPacket packet; + RawPacket packet = null; - synchronized (packetQueue) + while (true) { - while (packetQueue.size() < 1) + try { - try - { - packetQueue.wait(); - } - catch (InterruptedException iex) - { - } + packet = packetQueue.take(); + break; + } + catch (InterruptedException iex) + { + continue; } - - packet = packetQueue.remove(0); - packetQueue.notifyAll(); } - + if (!sendRun) + break; + long time = System.nanoTime(); long millisRemainingTime = time - millisStartTime; @@ -335,21 +379,13 @@ private void runInSendThread() else if ((maxPackets > 0) && (packetsSentInMillis >= maxPackets)) { - while (true) + while (true) { - millisRemainingTime - = System.nanoTime() - millisStartTime; + millisRemainingTime = System.nanoTime() + - millisStartTime; if (millisRemainingTime >= perNanos) break; - try - { - Thread.sleep( - millisRemainingTime / 1000000, - (int) (millisRemainingTime % 1000000)); - } - catch (InterruptedException iex) - { - } + LockSupport.parkNanos(millisRemainingTime); } millisStartTime = System.nanoTime(); packetsSentInMillis = 0; @@ -361,6 +397,7 @@ else if ((maxPackets > 0) } finally { + packetQueue.clear(); synchronized (packetQueue) { if (Thread.currentThread().equals(sendThread)) @@ -408,38 +445,17 @@ public void setMaxPacketsPerMillis(int maxPackets, long perMillis) */ public void write(RawPacket packet) { - synchronized (packetQueue) + while (true) { - while (packetQueue.size() - >= MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY) + try { - try - { - packetQueue.wait(); - } - catch (InterruptedException iex) - { - } + packetQueue.put(packet); + break; } - - packetQueue.add(packet); - - if (sendThread == null) + catch (InterruptedException iex) { - sendThread - = new Thread(getClass().getName()) - { - @Override - public void run() - { - runInSendThread(); - } - }; - sendThread.setDaemon(true); - sendThread.start(); + continue; } - - packetQueue.notifyAll(); } } }