I'll implement worker/thread models for processing the Kafka order events in Java, then compare them with an equivalent Go implementation, including the pros and cons of each approach. First, a thread-pool approach in Java, where a single polling thread fans each batch out across an ExecutorService:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class OrderProcessingService {
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executorService;
private final int workerCount;
private final AtomicBoolean running = new AtomicBoolean(true);
private final OrderEventProcessor processor;
public OrderProcessingService(String bootstrapServers,
String groupId,
String topic,
int workerCount) {
this.workerCount = workerCount;
// Configure Kafka consumer
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
this.consumer = new KafkaConsumer<>(props);
this.consumer.subscribe(Collections.singletonList(topic));
// Create thread pool for processing
this.executorService = Executors.newFixedThreadPool(workerCount);
this.processor = new OrderEventProcessor();
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
    public void start() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    // Fan the batch out across the worker pool and wait for it
                    // to finish, then commit. KafkaConsumer is not thread-safe,
                    // so the commit must stay on this polling thread.
                    processRecords(records);
                    consumer.commitSync();
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown; safe to ignore
        } finally {
            consumer.close();
            shutdown();
        }
    }

    private void processRecords(ConsumerRecords<String, String> records) {
        // One task per record so the whole pool works on the batch in parallel
        List<Callable<Void>> tasks = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            tasks.add(() -> {
                try {
                    processor.processEvent(record.value());
                } catch (Exception e) {
                    System.err.println("Error processing record: " + e.getMessage());
                    // Error handling strategy (retry, dead letter queue, etc.)
                }
                return null;
            });
        }
        try {
            // invokeAll blocks until every task in the batch has completed
            executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
public void shutdown() {
if (running.getAndSet(false)) {
consumer.wakeup();
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
OrderProcessingService service = new OrderProcessingService(
"localhost:9092",
"order-processing-group",
"trading.orders.created",
10 // Number of worker threads
);
service.start();
}
}
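Both Java variants delegate the business logic to an OrderEventProcessor, which isn't defined above. Here is a minimal hypothetical sketch of what it could look like, assuming Jackson for JSON parsing and the eventType values that appear in the Go code below:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class OrderEventProcessor {
    private final ObjectMapper mapper = new ObjectMapper();

    public void processEvent(String eventJson) {
        try {
            JsonNode event = mapper.readTree(eventJson);
            String eventType = event.path("eventType").asText();
            switch (eventType) {
                case "ORDER_CREATED":
                    // Order-creation logic goes here
                    break;
                case "ORDER_UPDATED":
                    // Order-update logic goes here
                    break;
                default:
                    throw new IllegalArgumentException("Unknown event type: " + eventType);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to process event", e);
        }
    }
}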
The second Java variant decouples consuming from processing: a dedicated consumer thread feeds a bounded work queue that a fixed set of worker threads drains.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class OrderWorkQueueProcessor {
private final KafkaConsumer<String, String> consumer;
private final BlockingQueue<String> workQueue;
private final Thread consumerThread;
private final Thread[] workerThreads;
private final AtomicBoolean running = new AtomicBoolean(true);
private final OrderEventProcessor processor;
public OrderWorkQueueProcessor(String bootstrapServers,
String groupId,
String topic,
int workerCount,
int queueCapacity) {
// Configure Kafka consumer
Properties props = new Properties();
// ... same Kafka setup as before
this.consumer = new KafkaConsumer<>(props);
this.consumer.subscribe(Collections.singletonList(topic));
// Create work queue and processor
this.workQueue = new LinkedBlockingQueue<>(queueCapacity);
this.processor = new OrderEventProcessor();
// Create consumer thread
this.consumerThread = new Thread(this::consumeMessages);
// Create worker threads
this.workerThreads = new Thread[workerCount];
for (int i = 0; i < workerCount; i++) {
workerThreads[i] = new Thread(this::processMessages);
}
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
public void start() {
consumerThread.start();
for (Thread workerThread : workerThreads) {
workerThread.start();
}
}
private void consumeMessages() {
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Put messages in work queue
workQueue.put(record.value());
}
                // Commit once messages are in the queue. Note this is
                // at-most-once: queued but unprocessed records are lost if a
                // worker dies. See the offset-tracking sketch after this class
                // for an at-least-once alternative.
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
}
} catch (InterruptedException | WakeupException e) {
// Expected during shutdown
} finally {
consumer.close();
}
}
private void processMessages() {
try {
while (running.get()) {
String eventJson = workQueue.poll(100, TimeUnit.MILLISECONDS);
if (eventJson != null) {
processor.processEvent(eventJson);
}
}
} catch (InterruptedException e) {
// Expected during shutdown
Thread.currentThread().interrupt();
}
}
public void shutdown() {
if (running.getAndSet(false)) {
consumer.wakeup();
consumerThread.interrupt();
for (Thread workerThread : workerThreads) {
workerThread.interrupt();
}
try {
consumerThread.join(5000);
for (Thread workerThread : workerThreads) {
workerThread.join(5000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
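A note on delivery semantics: as flagged in the comment above, committing when records enter the queue gives at-most-once processing. For at-least-once, the workers can report completed offsets back to the polling thread, which commits them with consumer.commitSync(Map). A minimal sketch of that hand-off follows; the OffsetTracker class is hypothetical, not part of kafka-clients, and the queue would need to carry the full ConsumerRecord rather than just its value:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Workers record the highest offset they have finished per partition;
// the polling thread periodically commits a snapshot of this map.
public class OffsetTracker {
    private final Map<TopicPartition, OffsetAndMetadata> completed = new ConcurrentHashMap<>();

    // Called by a worker after it has fully processed a record.
    // We store offset + 1 because the committed offset is the position
    // of the next record to read, not the last one processed.
    public void markProcessed(String topic, int partition, long offset) {
        completed.merge(new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset + 1),
                (oldV, newV) -> newV.offset() > oldV.offset() ? newV : oldV);
    }

    // Called by the polling thread between poll() calls, e.g.
    // consumer.commitSync(tracker.snapshot());
    public Map<TopicPartition, OffsetAndMetadata> snapshot() {
        return new HashMap<>(completed);
    }
}

Note the simplification: committing the highest finished offset assumes records in a partition complete roughly in order. A production tracker would also handle gaps (offset 7 finishing before offset 5) before advancing the commit point.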
And here is the equivalent implementation in Go, using goroutines and channels:

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
type OrderEvent struct {
EventType string `json:"eventType"`
Version string `json:"version"`
Timestamp string `json:"timestamp"`
OrderId string `json:"orderId"`
ProducerId string `json:"producerId"`
Payload map[string]interface{} `json:"payload"`
Metadata map[string]interface{} `json:"metadata"`
Hash string `json:"hash"`
}
func main() {
// Set up Kafka consumer
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "order-processing-group",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
})
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
defer consumer.Close()
// Subscribe to topic
err = consumer.SubscribeTopics([]string{"trading.orders.created"}, nil)
if err != nil {
log.Fatalf("Failed to subscribe: %s", err)
}
// Set up channel for received messages
messageCh := make(chan *kafka.Message, 100)
// Context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set up signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// WaitGroup for tracking active goroutines
var wg sync.WaitGroup
// Start consumer goroutine
wg.Add(1)
go func() {
defer wg.Done()
consumeMessages(ctx, consumer, messageCh)
}()
// Start worker goroutines
numWorkers := 10
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerId int) {
defer wg.Done()
processMessages(ctx, workerId, messageCh, consumer)
}(i)
}
// Wait for termination signal
<-sigCh
fmt.Println("Received termination signal, shutting down...")
cancel()
// Wait for goroutines to finish
wg.Wait()
close(messageCh)
fmt.Println("Shutdown complete")
}
func consumeMessages(ctx context.Context, consumer *kafka.Consumer, messageCh chan<- *kafka.Message) {
for {
select {
case <-ctx.Done():
return
default:
ev := consumer.Poll(100)
if ev == nil {
continue
}
            switch e := ev.(type) {
            case *kafka.Message:
                // Send to the processing channel, but stay responsive to
                // shutdown: an unconditional send could block forever once
                // the workers have exited.
                select {
                case messageCh <- e:
                case <-ctx.Done():
                    return
                }
            case kafka.Error:
                log.Printf("Kafka error: %v", e)
            }
}
}
}
func processMessages(ctx context.Context, workerId int, messageCh <-chan *kafka.Message, consumer *kafka.Consumer) {
for {
select {
case <-ctx.Done():
return
case msg, ok := <-messageCh:
if !ok {
return // Channel closed
}
// Process message
err := processOrderEvent(msg.Value)
if err != nil {
log.Printf("Worker %d error: %v", workerId, err)
continue
}
            // Commit the offset after successful processing. Caveat: with
            // several workers committing per message, commits for the same
            // partition can land out of order; production code would track
            // and commit offsets centrally.
            _, err = consumer.CommitMessage(msg)
            if err != nil {
                log.Printf("Failed to commit offset: %v", err)
            }
}
}
}
func processOrderEvent(data []byte) error {
var event OrderEvent
err := json.Unmarshal(data, &event)
if err != nil {
return fmt.Errorf("failed to unmarshal event: %w", err)
}
// Verify event hash
// ... hash verification code ...
// Process based on event type
switch event.EventType {
case "ORDER_CREATED":
return processOrderCreated(event)
case "ORDER_UPDATED":
return processOrderUpdated(event)
default:
return fmt.Errorf("unknown event type: %s", event.EventType)
}
}
func processOrderCreated(event OrderEvent) error {
// Processing logic for order creation
time.Sleep(50 * time.Millisecond) // Simulate work
log.Printf("Processed ORDER_CREATED: %s", event.OrderId)
return nil
}
func processOrderUpdated(event OrderEvent) error {
// Processing logic for order updates
time.Sleep(30 * time.Millisecond) // Simulate work
log.Printf("Processed ORDER_UPDATED: %s", event.OrderId)
return nil
}
Java: Pros
- Mature Ecosystem: Rich libraries and extensive Kafka client support
- Robust Threading Model: ExecutorService provides sophisticated thread management
- Memory Management: JVM memory management is well tuned for long-running services
- Enterprise Adoption: Widely used in financial services for trading systems

Java: Cons
- Resource Overhead: Higher memory footprint compared to Go
- Thread Management Complexity: Requires careful thread and resource management
- Startup Time: Slower startup compared to Go
- More Verbose: Requires more boilerplate code

Go: Pros
- Lightweight Concurrency: Goroutines are more resource-efficient than Java threads
- Simple Concurrency Patterns: Channels and goroutines provide elegant message passing
- Lower Resource Usage: Smaller memory footprint and faster startup
- Built-in Concurrency: Language-level support for concurrent programming

Go: Cons
- Less Mature Kafka Ecosystem: Fewer client libraries and features than the JVM's
- Error Handling: No structured exceptions; explicit error-value checks are more verbose than Java's try/catch
- GC Pauses: Go's garbage collector can introduce latency spikes in high-throughput, allocation-heavy systems (though JVM GC carries the same concern)
- Less Industry Adoption: Less commonly used for critical financial systems
Key differences:

- Concurrency Model
  - Java: Thread pools with explicit synchronization
  - Go: Lightweight goroutines with channel-based communication
- Error Handling
  - Java: Try-catch blocks with detailed exception handling
  - Go: Error return values with explicit checking
- Resource Management
  - Java: Explicit resource cleanup with try-finally blocks
  - Go: Defer statements for cleanup
- Message Passing
  - Java: Shared data structures with locks/synchronized blocks
  - Go: Channels for communication between goroutines
Choose Java when:
- Enterprise integration is critical
- You need robust transaction support
- Your team has JVM expertise
- You require integration with existing Java-based systems

Choose Go when:
- High throughput with predictable latency is needed
- Resource efficiency is paramount
- You need faster startup time
- Simple deployment is important
- Your trading system requires lower latency
- You're building microservices with simpler scope
For a production trading system, consider:
- Back-pressure Mechanisms: Both implementations need backpressure handling to prevent overwhelming downstream systems during high volume (a minimal sketch follows this list)
- Dead Letter Queue: Add error handling that forwards unprocessable messages to a DLQ
- Circuit Breaking: Implement circuit breakers to prevent cascading failures
- Distributed Tracing: Add tracing for observability in a distributed system
- Metrics & Monitoring: Include detailed metrics on message processing time, queue depths, and error rates
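As an example of the back-pressure point above, here is a minimal, hypothetical sketch for the Java work-queue variant: when the bounded queue fills up, the polling thread pauses its assigned partitions and resumes them once the workers drain the backlog. KafkaConsumer.pause/resume/assignment/paused are real APIs; the helper class and watermark thresholds are assumptions:

import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BackpressureHelper {
    private final KafkaConsumer<String, String> consumer;
    private final BlockingQueue<String> workQueue;
    private final int highWatermark; // pause when the queue reaches this depth
    private final int lowWatermark;  // resume once it drains below this depth
    private boolean paused = false;

    public BackpressureHelper(KafkaConsumer<String, String> consumer,
                              BlockingQueue<String> workQueue,
                              int highWatermark, int lowWatermark) {
        this.consumer = consumer;
        this.workQueue = workQueue;
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
    }

    // Call from the polling thread on every loop iteration.
    public void adjust() {
        int depth = workQueue.size();
        if (!paused && depth >= highWatermark) {
            // Stop fetching from all currently assigned partitions
            consumer.pause(consumer.assignment());
            paused = true;
        } else if (paused && depth <= lowWatermark) {
            // Resume exactly the partitions we paused
            consumer.resume(consumer.paused());
            paused = false;
        }
    }
}

The key detail is that the polling loop keeps calling poll() while paused (it simply returns no records), so the consumer does not exceed max.poll.interval.ms and get removed from the group.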
Would you like me to elaborate on any particular aspect of these implementations or architecture patterns?