Created
November 29, 2018 17:26
-
-
Save dpsoft/be817541b9d4132f3f4ff36949b90b3c to your computer and use it in GitHub Desktop.
Smart Batching
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package smart; | |
import java.io.File; | |
import java.io.FileNotFoundException; | |
import java.io.RandomAccessFile; | |
import java.nio.ByteBuffer; | |
public class FileTransport implements Transport { | |
private final RandomAccessFile file; | |
public FileTransport(String fileName) throws FileNotFoundException { | |
this.file = new RandomAccessFile(new File(fileName), "rw"); | |
} | |
public void send(ByteBuffer buffer) throws Exception { | |
file.write(buffer.array()); | |
} | |
public int maxPacketSize() { | |
return 4096; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package smart; | |
import java.util.concurrent.locks.LockSupport; | |
public interface IdleStrategy { | |
void idle(); | |
enum ParkStrategy implements IdleStrategy { | |
DEFAULT(100); | |
final long sleepPeriodNs; | |
ParkStrategy(long sleepPeriodNs) { | |
this.sleepPeriodNs = sleepPeriodNs; | |
} | |
public void idle() { | |
LockSupport.parkNanos(sleepPeriodNs); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package smart; | |
import org.jctools.queues.MpscArrayQueue; | |
import java.io.FileNotFoundException; | |
import java.util.Scanner; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
public class Main { | |
public static void main(String... args) throws FileNotFoundException { | |
final Scanner s = new Scanner(System.in); | |
final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(); | |
final ExecutorService producerExecutor = Executors.newFixedThreadPool(50); | |
final Transport transport = new FileTransport("awesome-file"); | |
final MpscArrayQueue<Message> queue = new MpscArrayQueue<Message>(100000); | |
final IdleStrategy idleStrategy = IdleStrategy.ParkStrategy.DEFAULT; | |
final MessageBatcher messageBatcher = new MessageBatcher(transport, queue, idleStrategy); | |
consumerExecutor.submit(messageBatcher); | |
producerExecutor.submit(new Producer(queue)); | |
producerExecutor.submit(new Producer(queue)); | |
producerExecutor.submit(new Producer(queue)); | |
producerExecutor.submit(new Producer(queue)); | |
producerExecutor.submit(new Producer(queue)); | |
System.out.println("Press enter to continue....."); | |
s.nextLine(); | |
consumerExecutor.shutdownNow(); | |
producerExecutor.shutdownNow(); | |
} | |
static class Producer implements Runnable { | |
private MpscArrayQueue<Message> queue; | |
Producer(MpscArrayQueue<Message> queue) { | |
this.queue = queue; | |
} | |
public void run() { | |
for(int i=0; i < 100000000; i ++) { | |
queue.offer(new Message("Hello: " + i)); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package smart; | |
public final class Message { | |
private String message; | |
public Message(String message) { | |
this.message = message; | |
} | |
public int size() { | |
return message.length(); | |
} | |
public byte[] getBytes() { | |
return message.getBytes(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package smart; | |
import org.jctools.queues.MpscArrayQueue; | |
import java.nio.ByteBuffer; | |
public final class MessageBatcher implements Runnable { | |
private final MpscArrayQueue<Message> queue; | |
private final ByteBuffer buffer; | |
private final Transport transport; | |
private final IdleStrategy waitIdleStrategy; | |
public MessageBatcher(final Transport transport, final MpscArrayQueue<Message> queue, final IdleStrategy waitStrategy) { | |
this.transport = transport; | |
this.buffer = ByteBuffer.allocate(transport.maxPacketSize()); | |
this.queue = queue; | |
this.waitIdleStrategy = waitStrategy; | |
} | |
public void run() { | |
while (!Thread.currentThread().isInterrupted()) { | |
while(queue.peek() == null) { | |
waitIdleStrategy.idle(); //sleep for a given period when idle. | |
} | |
Message message; | |
while((message = queue.poll()) != null) { | |
if (message.size() > buffer.remaining()){ sendBuffer();} | |
buffer.put(message.getBytes()); | |
} | |
sendBuffer(); | |
} | |
} | |
private void sendBuffer(){ | |
buffer.flip(); | |
try { | |
transport.send(buffer); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
buffer.clear(); | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package smart; | |
import java.nio.ByteBuffer; | |
public interface Transport { | |
void send(ByteBuffer buffer) throws Exception; | |
int maxPacketSize(); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment