Skip to content

Instantly share code, notes, and snippets.

@dpsoft
Created November 29, 2018 17:26
Show Gist options
  • Save dpsoft/be817541b9d4132f3f4ff36949b90b3c to your computer and use it in GitHub Desktop.
Save dpsoft/be817541b9d4132f3f4ff36949b90b3c to your computer and use it in GitHub Desktop.
Smart Batching
package smart;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
public class FileTransport implements Transport {
private final RandomAccessFile file;
public FileTransport(String fileName) throws FileNotFoundException {
this.file = new RandomAccessFile(new File(fileName), "rw");
}
public void send(ByteBuffer buffer) throws Exception {
file.write(buffer.array());
}
public int maxPacketSize() {
return 4096;
}
}
package smart;
import java.util.concurrent.locks.LockSupport;
/**
 * Strategy a thread invokes when it has nothing to do and wants to back off.
 */
public interface IdleStrategy {

    /** Performs one idle step on the calling thread. */
    void idle();

    /**
     * Idle strategies that park the calling thread for up to a fixed number of
     * nanoseconds on each {@link #idle()} call.
     */
    enum ParkStrategy implements IdleStrategy {
        /** Parks for up to 100 nanoseconds per call. */
        DEFAULT(100);

        /** Park duration, in nanoseconds, passed to {@link LockSupport#parkNanos(long)}. */
        final long sleepPeriodNs;

        ParkStrategy(long periodNs) {
            sleepPeriodNs = periodNs;
        }

        @Override
        public void idle() {
            LockSupport.parkNanos(sleepPeriodNs);
        }
    }
}
package smart;
import org.jctools.queues.MpscArrayQueue;
import java.io.FileNotFoundException;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo entry point: starts one {@link MessageBatcher} consumer and several
 * producers sharing a bounded multi-producer/single-consumer queue, then waits
 * for the user to press enter before shutting everything down.
 */
public class Main {
    /** Number of producer tasks submitted. */
    private static final int PRODUCER_COUNT = 5;
    /** Upper bound on the number of messages each producer offers. */
    private static final int MESSAGES_PER_PRODUCER = 100000000;

    /**
     * Wires the transport, queue, consumer and producers together and blocks
     * until a line (or end of input) is read from standard input.
     *
     * @param args ignored
     * @throws FileNotFoundException if the output file cannot be opened
     */
    public static void main(String... args) throws FileNotFoundException {
        final Scanner s = new Scanner(System.in);
        final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
        final ExecutorService producerExecutor = Executors.newFixedThreadPool(50);
        try {
            final Transport transport = new FileTransport("awesome-file");
            final MpscArrayQueue<Message> queue = new MpscArrayQueue<Message>(100000);
            final IdleStrategy idleStrategy = IdleStrategy.ParkStrategy.DEFAULT;
            final MessageBatcher messageBatcher = new MessageBatcher(transport, queue, idleStrategy);

            consumerExecutor.submit(messageBatcher);
            for (int i = 0; i < PRODUCER_COUNT; i++) {
                producerExecutor.submit(new Producer(queue));
            }

            System.out.println("Press enter to continue.....");
            // hasNextLine() returns false on end of input instead of throwing, so
            // a closed stdin still reaches the shutdown below.
            if (s.hasNextLine()) {
                s.nextLine();
            }
        } finally {
            // Always interrupt the workers; otherwise their non-daemon threads
            // keep the JVM alive.
            consumerExecutor.shutdownNow();
            producerExecutor.shutdownNow();
        }
    }

    /**
     * Task that offers a stream of "Hello: n" messages to the shared queue until
     * it has offered {@code MESSAGES_PER_PRODUCER} of them or is interrupted.
     */
    static class Producer implements Runnable {
        private final MpscArrayQueue<Message> queue;

        Producer(MpscArrayQueue<Message> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            final Thread self = Thread.currentThread();
            // Checking the interrupt flag lets shutdownNow() actually stop this task.
            for (int i = 0; i < MESSAGES_PER_PRODUCER && !self.isInterrupted(); i++) {
                // The result of offer() is deliberately ignored: a message the
                // queue does not accept is simply discarded.
                queue.offer(new Message("Hello: " + i));
            }
        }
    }
}
package smart;
/**
 * Immutable text message that is encoded to UTF-8 once, at construction time.
 */
public final class Message {
    /** UTF-8 encoding of the message text; never exposed directly. */
    private final byte[] bytes;

    /**
     * @param message the message text; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     */
    public Message(String message) {
        if (message == null) {
            throw new NullPointerException("message");
        }
        // An explicit charset keeps size() and getBytes() consistent on every platform.
        this.bytes = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Returns the encoded size in bytes. This always equals
     * {@code getBytes().length}, so it can be compared against remaining buffer
     * space even for non-ASCII text, where the character count would be too small.
     *
     * @return number of bytes in the encoded message
     */
    public int size() {
        return bytes.length;
    }

    /**
     * @return a fresh copy of the UTF-8 encoded message
     */
    public byte[] getBytes() {
        return bytes.clone();
    }
}
package smart;
import org.jctools.queues.MpscArrayQueue;
import java.nio.ByteBuffer;
/**
 * Consumer task that drains a queue of {@link Message}s, packs their bytes into
 * a buffer of {@code transport.maxPacketSize()} bytes and hands the buffer to
 * the {@link Transport} whenever it fills up or the queue runs empty.
 * The task runs until its thread is interrupted.
 */
public final class MessageBatcher implements Runnable {
    private final MpscArrayQueue<Message> queue;
    private final ByteBuffer buffer;
    private final Transport transport;
    private final IdleStrategy waitIdleStrategy;

    /**
     * @param transport    destination for the batched bytes; its
     *                     {@code maxPacketSize()} determines the batch buffer size
     * @param queue        queue the messages are polled from
     * @param waitStrategy strategy invoked while the queue is empty
     * @throws IllegalArgumentException if the transport reports a non-positive
     *                                  maximum packet size
     */
    public MessageBatcher(final Transport transport, final MpscArrayQueue<Message> queue, final IdleStrategy waitStrategy) {
        final int packetSize = transport.maxPacketSize();
        if (packetSize <= 0) {
            throw new IllegalArgumentException("maxPacketSize must be positive: " + packetSize);
        }
        this.transport = transport;
        this.buffer = ByteBuffer.allocate(packetSize);
        this.queue = queue;
        this.waitIdleStrategy = waitStrategy;
    }

    /**
     * Loops until the current thread is interrupted: idles while the queue is
     * empty, otherwise drains it into the buffer and sends what was collected.
     */
    @Override
    public void run() {
        final Thread self = Thread.currentThread();
        while (!self.isInterrupted()) {
            if (queue.peek() == null) {
                // Idle once, then go back through the interrupt check above so a
                // shutdown request is noticed even while the queue stays empty.
                waitIdleStrategy.idle();
                continue;
            }
            Message message;
            // The interrupt check comes first so no message is polled and then
            // abandoned once shutdown has been requested.
            while (!self.isInterrupted() && (message = queue.poll()) != null) {
                append(message.getBytes());
            }
            sendBuffer();
        }
    }

    /**
     * Adds {@code payload} to the current batch. A payload that does not fit in
     * the remaining space triggers a send first; a payload larger than the whole
     * buffer is split across consecutive packets instead of overflowing it.
     */
    private void append(final byte[] payload) {
        // Space is checked against the real encoded length rather than
        // Message.size().
        if (payload.length > buffer.remaining()) {
            sendBuffer();
        }
        int offset = 0;
        while (offset < payload.length) {
            if (!buffer.hasRemaining()) {
                sendBuffer();
            }
            final int chunk = Math.min(buffer.remaining(), payload.length - offset);
            buffer.put(payload, offset, chunk);
            offset += chunk;
        }
    }

    /**
     * Sends the bytes collected so far, if any, and resets the buffer for the
     * next batch. A transport failure is reported on standard error and the
     * batch is discarded.
     */
    private void sendBuffer() {
        if (buffer.position() == 0) {
            return; // nothing collected; avoid sending an empty packet
        }
        buffer.flip();
        try {
            transport.send(buffer);
        } catch (Exception e) {
            e.printStackTrace();
        }
        buffer.clear();
    }
}
package smart;
import java.nio.ByteBuffer;
/**
 * Destination to which batched bytes are delivered, one packet at a time.
 */
public interface Transport {

    /**
     * @return the largest packet size, in bytes, this transport reports
     */
    int maxPacketSize();

    /**
     * Delivers the given buffer to the transport.
     *
     * @param buffer the packet to deliver
     * @throws Exception if the delivery fails
     */
    void send(ByteBuffer buffer) throws Exception;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment