Skip to content

Instantly share code, notes, and snippets.

@oNddleo
Created March 10, 2025 09:48
Show Gist options
  • Save oNddleo/4a0287178d983f81310023e39e7b3dfd to your computer and use it in GitHub Desktop.
Save oNddleo/4a0287178d983f81310023e39e7b3dfd to your computer and use it in GitHub Desktop.
Compare java and go processing

Thread and Worker Models for Kafka Event Processing

I'll implement worker/thread models for processing the Kafka order events in Java and compare with another language (Go), including pros and cons of each approach.

Java Implementation: Thread Pool Worker Model

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class OrderProcessingService {
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executorService;
    private final int workerCount;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final OrderEventProcessor processor;
    
    public OrderProcessingService(String bootstrapServers, 
                                  String groupId, 
                                  String topic,
                                  int workerCount) {
        this.workerCount = workerCount;
        
        // Configure Kafka consumer
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Collections.singletonList(topic));
        
        // Create thread pool for processing
        this.executorService = Executors.newFixedThreadPool(workerCount);
        this.processor = new OrderEventProcessor();
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
    }
    
    public void start() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                if (!records.isEmpty()) {
                    // Submit batch to thread pool
                    executorService.submit(() -> processRecords(records));
                }
            }
        } catch (WakeupException e) {
            // Ignore, this is expected during shutdown
        } finally {
            consumer.close();
            shutdown();
        }
    }
    
    private void processRecords(ConsumerRecords<String, String> records) {
        try {
            for (ConsumerRecord<String, String> record : records) {
                processor.processEvent(record.value());
            }
            // Commit offsets after successful processing
            consumer.commitSync();
        } catch (Exception e) {
            System.err.println("Error processing batch: " + e.getMessage());
            // Error handling strategy (retry, dead letter queue, etc.)
        }
    }
    
    public void shutdown() {
        if (running.getAndSet(false)) {
            consumer.wakeup();
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) {
        OrderProcessingService service = new OrderProcessingService(
            "localhost:9092",
            "order-processing-group",
            "trading.orders.created",
            10 // Number of worker threads
        );
        service.start();
    }
}

Java Implementation: Work Queue Pattern

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class OrderWorkQueueProcessor {
    private final KafkaConsumer<String, String> consumer;
    private final BlockingQueue<String> workQueue;
    private final Thread consumerThread;
    private final Thread[] workerThreads;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final OrderEventProcessor processor;
    
    public OrderWorkQueueProcessor(String bootstrapServers,
                                  String groupId,
                                  String topic,
                                  int workerCount,
                                  int queueCapacity) {
        // Configure Kafka consumer
        Properties props = new Properties();
        // ... same Kafka setup as before
        
        this.consumer = new KafkaConsumer<>(props);
        this.consumer.subscribe(Collections.singletonList(topic));
        
        // Create work queue and processor
        this.workQueue = new LinkedBlockingQueue<>(queueCapacity);
        this.processor = new OrderEventProcessor();
        
        // Create consumer thread
        this.consumerThread = new Thread(this::consumeMessages);
        
        // Create worker threads
        this.workerThreads = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workerThreads[i] = new Thread(this::processMessages);
        }
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
    }
    
    public void start() {
        consumerThread.start();
        
        for (Thread workerThread : workerThreads) {
            workerThread.start();
        }
    }
    
    private void consumeMessages() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    // Put messages in work queue
                    workQueue.put(record.value());
                }
                
                // Only commit once messages are in the queue
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        } catch (InterruptedException | WakeupException e) {
            // Expected during shutdown
        } finally {
            consumer.close();
        }
    }
    
    private void processMessages() {
        try {
            while (running.get()) {
                String eventJson = workQueue.poll(100, TimeUnit.MILLISECONDS);
                if (eventJson != null) {
                    processor.processEvent(eventJson);
                }
            }
        } catch (InterruptedException e) {
            // Expected during shutdown
            Thread.currentThread().interrupt();
        }
    }
    
    public void shutdown() {
        if (running.getAndSet(false)) {
            consumer.wakeup();
            consumerThread.interrupt();
            
            for (Thread workerThread : workerThreads) {
                workerThread.interrupt();
            }
            
            try {
                consumerThread.join(5000);
                
                for (Thread workerThread : workerThreads) {
                    workerThread.join(5000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Go Implementation: Goroutine-Based Workers

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

type OrderEvent struct {
	EventType  string                 `json:"eventType"`
	Version    string                 `json:"version"`
	Timestamp  string                 `json:"timestamp"`
	OrderId    string                 `json:"orderId"`
	ProducerId string                 `json:"producerId"`
	Payload    map[string]interface{} `json:"payload"`
	Metadata   map[string]interface{} `json:"metadata"`
	Hash       string                 `json:"hash"`
}

func main() {
	// Set up Kafka consumer
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"group.id":           "order-processing-group",
		"auto.offset.reset":  "earliest",
		"enable.auto.commit": "false",
	})

	if err != nil {
		log.Fatalf("Failed to create consumer: %s", err)
	}
	defer consumer.Close()

	// Subscribe to topic
	err = consumer.SubscribeTopics([]string{"trading.orders.created"}, nil)
	if err != nil {
		log.Fatalf("Failed to subscribe: %s", err)
	}

	// Set up channel for received messages
	messageCh := make(chan *kafka.Message, 100)
	
	// Context for graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	
	// Set up signal handling
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	
	// WaitGroup for tracking active goroutines
	var wg sync.WaitGroup
	
	// Start consumer goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		consumeMessages(ctx, consumer, messageCh)
	}()
	
	// Start worker goroutines
	numWorkers := 10
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(workerId int) {
			defer wg.Done()
			processMessages(ctx, workerId, messageCh, consumer)
		}(i)
	}
	
	// Wait for termination signal
	<-sigCh
	fmt.Println("Received termination signal, shutting down...")
	cancel()
	
	// Wait for goroutines to finish
	wg.Wait()
	close(messageCh)
	fmt.Println("Shutdown complete")
}

func consumeMessages(ctx context.Context, consumer *kafka.Consumer, messageCh chan<- *kafka.Message) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
			ev := consumer.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				// Send message to processing channel
				messageCh <- e
			case kafka.Error:
				log.Printf("Kafka error: %v", e)
			}
		}
	}
}

func processMessages(ctx context.Context, workerId int, messageCh <-chan *kafka.Message, consumer *kafka.Consumer) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-messageCh:
			if !ok {
				return // Channel closed
			}
			
			// Process message
			err := processOrderEvent(msg.Value)
			if err != nil {
				log.Printf("Worker %d error: %v", workerId, err)
				continue
			}
			
			// Commit offset after successful processing
			_, err = consumer.CommitMessage(msg)
			if err != nil {
				log.Printf("Failed to commit offset: %v", err)
			}
		}
	}
}

func processOrderEvent(data []byte) error {
	var event OrderEvent
	err := json.Unmarshal(data, &event)
	if err != nil {
		return fmt.Errorf("failed to unmarshal event: %w", err)
	}
	
	// Verify event hash
	// ... hash verification code ...
	
	// Process based on event type
	switch event.EventType {
	case "ORDER_CREATED":
		return processOrderCreated(event)
	case "ORDER_UPDATED":
		return processOrderUpdated(event)
	default:
		return fmt.Errorf("unknown event type: %s", event.EventType)
	}
}

func processOrderCreated(event OrderEvent) error {
	// Processing logic for order creation
	time.Sleep(50 * time.Millisecond) // Simulate work
	log.Printf("Processed ORDER_CREATED: %s", event.OrderId)
	return nil
}

func processOrderUpdated(event OrderEvent) error {
	// Processing logic for order updates
	time.Sleep(30 * time.Millisecond) // Simulate work
	log.Printf("Processed ORDER_UPDATED: %s", event.OrderId)
	return nil
}

Comparing Java and Go Approaches

Java Pros:

  1. Mature Ecosystem: Rich libraries and extensive Kafka client support
  2. Robust Threading Model: ExecutorService provides sophisticated thread management
  3. Memory Management: JVM memory management optimized for long-running services
  4. Enterprise Adoption: Widely used in financial services for trading systems

Java Cons:

  1. Resource Overhead: Higher memory footprint compared to Go
  2. Thread Management Complexity: Requires careful thread and resource management
  3. Startup Time: Slower startup compared to Go
  4. More Verbose: Requires more boilerplate code

Go Pros:

  1. Lightweight Concurrency: Goroutines are more resource-efficient than Java threads
  2. Simple Concurrency Patterns: Channels and goroutines provide elegant message passing
  3. Lower Resource Usage: Smaller memory footprint and faster startup
  4. Built-in Concurrency: Language-level support for concurrent programming

Go Cons:

  1. Less Mature Kafka Ecosystem: Fewer client libraries and features
  2. Error Handling: Less sophisticated exception handling than Java
  3. GC Pauses: Go's GC can introduce latency spikes in high-throughput systems
  4. Less Industry Adoption: Less commonly used for critical financial systems

Key Differences in Implementation

  1. Concurrency Model

    • Java: Thread pools with explicit synchronization
    • Go: Lightweight goroutines with channel-based communication
  2. Error Handling

    • Java: Try-catch blocks with detailed exception handling
    • Go: Error return values with explicit checking
  3. Resource Management

    • Java: Explicit resource cleanup with try-finally blocks
    • Go: Defer statements for cleanup
  4. Message Passing

    • Java: Shared data structures with locks/synchronized blocks
    • Go: Channels for communication between goroutines

Recommendations Based on Requirements

Choose Java When:

  • Enterprise integration is critical
  • You need robust transaction support
  • Your team has JVM expertise
  • You require integration with existing Java-based systems
  • High throughput with predictable latency is needed

Choose Go When:

  • Resource efficiency is paramount
  • You need faster startup time
  • Simple deployment is important
  • Your trading system requires lower latency
  • You're building microservices with simpler scope

Additional Architecture Considerations

For a production trading system, consider:

  1. Back-pressure Mechanisms: Both implementations need backpressure handling to prevent overwhelming downstream systems during high volume

  2. Dead Letter Queue: Add error handling that forwards unprocessable messages to a DLQ

  3. Circuit Breaking: Implement circuit breakers to prevent cascading failures

  4. Distributed Tracing: Add tracing for observability in a distributed system

  5. Metrics & Monitoring: Include detailed metrics on message processing time, queue depths, and error rates

Would you like me to elaborate on any particular aspect of these implementations or architecture patterns?​​​​​​​​​​​​​​​​

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment