import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestArrayBlockingQueuePerf {

    private static final int CHANNEL_NUM = 8;
    private static final int CHANNEL_BUFFER_SIZE = 65536;

    public static void main(String[] args) throws InterruptedException {
        List<BlockingQueue<Integer>> channels = new ArrayList<>(CHANNEL_NUM);
        channels.add(new ArrayBlockingQueue<>(CHANNEL_BUFFER_SIZE));
        for (int i = 1; i < CHANNEL_NUM; i++) {
            channels.add(new ArrayBlockingQueue<>(CHANNEL_BUFFER_SIZE));
            new Thread(new Adder(i, channels.get(i - 1), channels.get(i))).start();
        }
        long startTime = System.currentTimeMillis();
        Thread threadEnder = new Thread(new Ender(channels.get(CHANNEL_NUM - 1)));
        threadEnder.start();
        BlockingQueue<Integer> initChannel = channels.get(0);
        for (int x = 0; x < 10000000; x++) {
            initChannel.put(x);
        }
        initChannel.put(-1);
        threadEnder.join();
        System.out.println((System.currentTimeMillis() - startTime) / 1000.0);
    }

    static class Adder implements Runnable {
        private final int no;
        private final BlockingQueue<Integer> inputQueue;
        private final BlockingQueue<Integer> outputQueue;

        public Adder(int no, BlockingQueue<Integer> inputQueue, BlockingQueue<Integer> outputQueue) {
            this.no = no;
            this.inputQueue = inputQueue;
            this.outputQueue = outputQueue;
        }

        @Override
        public void run() {
            Integer x;
            try {
                do {
                    x = inputQueue.take();
                    outputQueue.put(x);
                } while (x != -1);
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
            System.out.println("Adder " + no + " exited");
        }
    }

    static class Ender implements Runnable {
        private final BlockingQueue<Integer> inputQueue;

        public Ender(BlockingQueue<Integer> inputQueue) {
            this.inputQueue = inputQueue;
        }

        @Override
        public void run() {
            long sum = 0;
            Integer x;
            try {
                while (true) {
                    x = inputQueue.take();
                    if (x != -1) {
                        sum += x;
                    } else {
                        break;
                    }
                }
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
            System.out.println("Ender exited, sum = " + sum);
        }
    }
}