import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class TestArrayBlockingQueuePerf { private static final int CHANNEL_NUM = 8; private static final int CHANNEL_BUFFER_SIZE = 65536; public static void main(String[] args) throws InterruptedException { List<BlockingQueue<Integer>> channels = new ArrayList<>(CHANNEL_NUM); channels.add(new ArrayBlockingQueue<>(CHANNEL_BUFFER_SIZE)); for (int i = 1; i < CHANNEL_NUM; i++) { channels.add(new ArrayBlockingQueue<>(CHANNEL_BUFFER_SIZE)); new Thread(new Adder(i, channels.get(i - 1), channels.get(i))).start(); } long startTime = System.currentTimeMillis(); Thread threadEnder = new Thread(new Ender(channels.get(CHANNEL_NUM - 1))); threadEnder.start(); BlockingQueue<Integer> initChannel = channels.get(0); for (int x = 0; x < 10000000; x++) { initChannel.put(x); } initChannel.put(-1); threadEnder.join(); System.out.println((System.currentTimeMillis() - startTime) / 1000.0); } static class Adder implements Runnable { private final int no; private final BlockingQueue<Integer> inputQueue; private final BlockingQueue<Integer> outputQueue; public Adder(int no, BlockingQueue<Integer> inputQueue, BlockingQueue<Integer> outputQueue) { this.no = no; this.inputQueue = inputQueue; this.outputQueue = outputQueue; } @Override public void run() { Integer x; try { do { x = inputQueue.take(); outputQueue.put(x); } while (x != -1); } catch (InterruptedException ex) { throw new RuntimeException(ex); } System.out.println("Adder " + no + " exited"); } } static class Ender implements Runnable { private final BlockingQueue<Integer> inputQueue; public Ender(BlockingQueue<Integer> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { long sum = 0; Integer x; try { while (true) { x = inputQueue.take(); if (x != -1) { sum += x; } else { break; } } } catch (InterruptedException ex) { throw new RuntimeException(ex); } System.out.println("Ender exited, sum = " + sum); } } }