OMS System Design

Technical Summary: Ultra-Scale Kafka System

Core Technologies

  1. Apache Kafka

    • Usage: Message broker infrastructure, topic management, partition replication
    • Key Components: Kafka Streams API for processing, AdminClient for topic management
  2. Apache Avro

    • Usage: Schema definition, binary serialization/deserialization
    • Key Classes: GenericRecord, GenericDatumReader, GenericDatumWriter
  3. LZ4 Compression

    • Usage: High-speed compression for binary messages
    • Performance: Optimized for compression/decompression speed over ratio
  4. Micrometer Metrics

    • Usage: Performance monitoring, throughput tracking, latency measurement
    • Integration: Prometheus reporter for metrics visualization (see the sketch below)
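
As an illustration of the Micrometer integration above, a minimal sketch of wiring a Prometheus-backed registry; the metric names here are hypothetical, not taken from the system itself:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

static PrometheusMeterRegistry buildMetrics() {
    // Registry that renders metrics in the Prometheus text exposition format;
    // registry.scrape() yields the payload for a /metrics HTTP endpoint
    PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

    // Throughput counter and latency timer with percentile tracking
    Counter.builder("producer.messages.sent").register(registry);
    Timer.builder("producer.send.latency")
         .publishPercentiles(0.5, 0.95, 0.99)
         .register(registry);
    return registry;
}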

Java Concurrency

  1. ThreadPoolExecutor

    • Usage: Managed thread pools for parallel message processing
    • Configuration: CallerRunsPolicy for backpressure
  2. BlockingQueue Implementations

    • Types: ArrayBlockingQueue, LinkedBlockingQueue
    • Usage: Bounded message buffers with backpressure handling (see the sketch after this list)
  3. Atomic Variables

    • Classes: AtomicLong, AtomicInteger, AtomicBoolean
    • Usage: Thread-safe counters and state flags without locking
  4. CountDownLatch

    • Usage: Synchronization for batch processing coordination
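
A minimal sketch of how items 1 and 2 combine, as referenced above; the pool and queue sizes are illustrative:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

static ThreadPoolExecutor newProcessorPool() {
    int cores = Runtime.getRuntime().availableProcessors();
    // Bounded queue plus CallerRunsPolicy: once the queue fills, the
    // submitting thread runs the task itself, slowing intake and giving
    // backpressure instead of an unbounded backlog
    return new ThreadPoolExecutor(
        cores, cores * 2,                 // core and max pool sizes
        60, TimeUnit.SECONDS,             // idle keep-alive
        new ArrayBlockingQueue<>(10_000), // bounded message buffer
        new ThreadPoolExecutor.CallerRunsPolicy());
}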

Binary Protocol

  1. Custom Binary Header Format

    • Structure: 8-byte header (version, type, priority, flags, timestamp)
    • Optimization: Minimal overhead for message routing (see the parsing sketch after this list)
  2. ByteBuffer

    • Usage: Zero-copy buffer management for binary data
    • Optimization: Direct buffers for native I/O
  3. Binary Serialization Pipeline

    • Custom Classes: BinaryMessageBuilder, MessageMetadata
    • Features: Schema version encoding, compression flags
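
A sketch of parsing the 8-byte header from item 1 with a ByteBuffer; MessageHeader is a hypothetical holder type, not from the source:

import java.nio.ByteBuffer;

record MessageHeader(int version, int type, int priority, int flags, long epochSeconds) {}

// ByteBuffer defaults to big-endian, matching the byte-by-byte header
// writes shown later in createBinaryMessage()
static MessageHeader parseHeader(ByteBuffer buf) {
    int version  = buf.get() & 0xFF;                 // 1 byte
    int type     = buf.get() & 0xFF;                 // 1 byte
    int priority = buf.get() & 0xFF;                 // 1 byte
    int flags    = buf.get() & 0xFF;                 // 1 byte
    long epochSeconds = buf.getInt() & 0xFFFFFFFFL;  // 4-byte timestamp (seconds)
    return new MessageHeader(version, type, priority, flags, epochSeconds);
}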

Kafka Client Configuration

  1. Producer Optimizations

    • Batching: Configured for batches up to 64KB
    • Linger Time: 5ms for optimal batching
    • Buffer Memory: 128MB per producer
  2. Consumer Optimizations

    • Fetch Size: Configured for 1MB per partition
    • Max Poll Records: 10,000 records per poll
    • Auto Commit: Disabled for manual offset management
  3. Exactly-Once Semantics

    • Transaction Support: Enabled with unique transaction IDs
    • Isolation Level: read_committed for consumers (see the configuration sketch below)
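
A sketch of the consumer-side settings from items 2 and 3, using the standard Kafka client configuration constants; the values mirror the figures above:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

static Properties consumerProps() {
    Properties props = new Properties();
    // ... bootstrap servers, group id, deserializers ...
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024); // 1MB per partition
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10_000);               // records per poll
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);              // manual offset management
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");      // only committed transactions
    return props;
}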

Resource Management Techniques

  1. Circuit Breaker Pattern

    • Implementation: AdaptiveCircuitBreaker with dynamic thresholds
    • State Management: Failure rate tracking with reset mechanism (see the sketch after this list)
  2. Backpressure Mechanisms

    • Producer: Buffer limits and blocking queue
    • Consumer: CallerRunsPolicy in thread pool executor
  3. Memory Management

    • Buffer Pools: Pre-allocated buffers for message handling
    • Bounded Queues: Preventing OOM during traffic spikes
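
The AdaptiveCircuitBreaker implementation is not included in this summary; a minimal failure-rate breaker in the same spirit might look like the following (thresholds and window are illustrative):

import java.util.concurrent.atomic.AtomicInteger;

class SimpleCircuitBreaker {
    private final AtomicInteger calls = new AtomicInteger();
    private final AtomicInteger failures = new AtomicInteger();
    private final double failureRateThreshold; // e.g. 0.5
    private final long resetTimeoutMs;         // e.g. 30_000
    private volatile long openedAt = -1;       // -1 means closed

    SimpleCircuitBreaker(double failureRateThreshold, long resetTimeoutMs) {
        this.failureRateThreshold = failureRateThreshold;
        this.resetTimeoutMs = resetTimeoutMs;
    }

    boolean allowRequest() {
        if (openedAt < 0) return true;                                // closed
        if (System.currentTimeMillis() - openedAt > resetTimeoutMs) { // half-open: probe again
            openedAt = -1;
            calls.set(0);
            failures.set(0);
            return true;
        }
        return false;                                                 // open: shed load
    }

    void record(boolean success) {
        int total = calls.incrementAndGet();
        int failed = success ? failures.get() : failures.incrementAndGet();
        if (total >= 100 && (double) failed / total > failureRateThreshold) {
            openedAt = System.currentTimeMillis(); // trip the breaker
        }
    }
}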

Monitoring and Observability

  1. Prometheus Integration

    • Metrics Types: Counters, gauges, timers, histograms
    • Custom Metrics: Throughput, latency percentiles, backlog
  2. JVM Metrics Collection

    • Areas Monitored: Memory, GC, threads, classloading
    • Integration: Automatic binding to metrics registry (see the sketch below)
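
For item 2, Micrometer ships JVM binders covering exactly these areas; a sketch of attaching them to a registry:

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;

static void bindJvmMetrics(MeterRegistry registry) {
    new JvmMemoryMetrics().bindTo(registry);   // heap and non-heap pools
    new JvmGcMetrics().bindTo(registry);       // GC pauses and promotion
    new JvmThreadMetrics().bindTo(registry);   // thread counts and states
    new ClassLoaderMetrics().bindTo(registry); // class loading/unloading
}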

Kafka Streams Advanced Features

  1. KStream Processing

    • Topology: Source → Process → Branch → Sink
    • Stateless Operations: map, filter, branch by priority (see the topology sketch after this list)
  2. Dynamic Message Routing

    • Implementation: Dynamic topic selection based on message metadata
    • Optimization: Sticky partition assignment
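
A sketch of the Source → Process → Branch → Sink topology from item 1, using the Kafka Streams split/branch API; topic names and the priority threshold are illustrative:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

static Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<byte[], byte[]> source = builder.stream("input-topic");
    source
        .filter((k, v) -> v != null && v.length >= 8)  // drop records lacking a full header
        .split()
        .branch((k, v) -> (v[2] & 0xFF) >= 8,          // priority byte from the binary header
                Branched.withConsumer(s -> s.to("high-priority-topic")))
        .defaultBranch(Branched.withConsumer(s -> s.to("standard-topic")));
    return builder.build();
}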


Producer Thread Architecture and Rate Control

flowchart TD
classDef threadPool fill:#6366F1,color:white,stroke:#4F46E5,stroke-width:2px
classDef queueComponent fill:#F59E0B,color:white,stroke:#D97706,stroke-width:2px
classDef rateControl fill:#10B981,color:white,stroke:#059669,stroke-width:2px
classDef messageComponent fill:#EF4444,color:white,stroke:#DC2626,stroke-width:2px
classDef kafkaComponent fill:#3B82F6,color:white,stroke:#2563EB,stroke-width:2px
classDef metricComponent fill:#8B5CF6,color:white,stroke:#7C3AED,stroke-width:2px

%% Main Thread Pool Controller
TP[Thread Pool Executor<br/>Fixed Size = CPU Cores x 2]:::threadPool

%% Thread Pool Distribution
TP --> TP1[Producer Thread 1]
TP --> TP2[Producer Thread 2]
TP --> TP3[Producer Thread 3]
TP --> TP4[Producer Thread N]
TP1:::threadPool
TP2:::threadPool
TP3:::threadPool
TP4:::threadPool

%% Rate Control Components
RC[Central Rate Controller]:::rateControl
RateCalc[Rate Calculator<br/>Target = 50K msgs/sec/thread]:::rateControl
TokenBucket[Token Bucket Algorithm<br/>Refill Rate = Target Rate]:::rateControl
AdaptiveRate[Adaptive Rate Adjuster<br/>Based on Broker Feedback]:::rateControl

%% Rate Control Flow
RC --> RateCalc
RateCalc --> TokenBucket
MetricsFeedback --> AdaptiveRate
AdaptiveRate --> RateCalc

%% Per-Thread Message Generation
TP1 --> MG1[Message Generator]
TP2 --> MG2[Message Generator]
TP3 --> MG3[Message Generator]
TP4 --> MG4[Message Generator]
MG1:::messageComponent
MG2:::messageComponent
MG3:::messageComponent
MG4:::messageComponent

%% Message Generation Components per Thread
MG1 --> MGen1[Generate Binary Message]
MGen1 --> MEncode1[Encode with Avro]
MEncode1 --> MCompress1[Apply LZ4 Compression]
MCompress1 --> MPriority1[Assign Priority]
MPriority1 --> MBatch1[Add to Batch]
MGen1:::messageComponent
MEncode1:::messageComponent
MCompress1:::messageComponent
MPriority1:::messageComponent
MBatch1:::messageComponent

%% Rate Control Integration
TokenBucket --> MG1
TokenBucket --> MG2
TokenBucket --> MG3
TokenBucket --> MG4

%% Message Batching
MBatch1 --> SharedBatch[Batch Accumulator<br/>Target Size = 64KB]
MG2 --> SharedBatch
MG3 --> SharedBatch
MG4 --> SharedBatch
SharedBatch:::queueComponent

%% Batch Processing Components
SharedBatch --> BatchProcessor[Batch Processor<br/>Linger Time = 5ms]
BatchProcessor:::queueComponent
BatchProcessor --> PartitionSelector[Partition Selector]
PartitionSelector:::kafkaComponent

%% Producer API Integration
PartitionSelector --> KafkaProducer[Kafka Producer API]
KafkaProducer:::kafkaComponent
KafkaProducer --> KafkaBroker[Kafka Brokers]
KafkaBroker:::kafkaComponent

%% Metrics Collection
MetricsCollector[Metrics Collector]:::metricComponent
TP1 --> MetricsCollector
TP2 --> MetricsCollector
TP3 --> MetricsCollector
TP4 --> MetricsCollector
KafkaProducer --> MetricsCollector
MetricsCollector --> MetricsFeedback[Metrics Feedback Loop]
MetricsFeedback:::metricComponent

%% Detailed Rate Limiting Components
subgraph "Rate Limiting Details"
    direction TB
    RLT[Token Bucket]:::rateControl
    RLT --> RLT1[Current Token Count]
    RLT --> RLT2[Max Bucket Size]
    RLT --> RLT3[Refill Rate]
    RLT --> RLT4[Refill Interval]
    
    RLA[Adaptive Control]:::rateControl
    RLA --> RLA1[Broker Latency Feedback]
    RLA --> RLA2[Queue Time Monitoring]
    RLA --> RLA3[Error Rate Monitoring]
    RLA --> RLA4[Network Utilization]
end

%% Thread State Management
subgraph "Thread State Management"
    direction TB
    TSM[Thread State Manager]:::threadPool
    TSM --> TSM1[Active Thread Count]
    TSM --> TSM2[Thread Health Check]
    TSM --> TSM3[Stalled Thread Detection]
    TSM --> TSM4[Thread Replacement Policy]
end

%% Message Batch Composition
subgraph "Message Batch Structure"
    direction TB
    MBS[Message Batch]:::messageComponent
    MBS --> MBS1[Header 8 bytes]
    MBS --> MBS2[Message ID 16 bytes]
    MBS --> MBS3[Schema Version 1 byte]
    MBS --> MBS4[Priority 1 byte]
    MBS --> MBS5[Payload variable]
end

Design Class Structure

flowchart TD
subgraph "Message Creation"
A1[Client Request] --> A2[BinaryMessageBuilder]
A2 --> A3[Avro Serialization]
A3 --> A4[Binary Message<br/>with Header]
end

subgraph "Producer Flow"
    B1[Producer Thread Pool] --> B2[Create Message Batch]
    B2 --> B3[Priority Assignment]
    B3 --> B4[Compression]
    B4 --> B5[Send to Kafka]
    B5 --> B6[Monitor Throughput]
end

subgraph "Kafka Cluster"
    C1[Input Topics<br/>with Partitions] --> C2[Replication]
    C2 --> C3[Retention<br/>Policies]
    C3 --> C4[Consumer<br/>Groups]
end

subgraph "Consumer Flow"
    D1[Consumer Poll] --> D2[Message Batching]
    D2 --> D3[Partition-Aware<br/>Processing]
    D3 --> D4[Memory Buffer Queue]
    D4 --> D5[Circuit Breaker]
    D5 --> D6[Processor Thread Pool]
end

subgraph "Message Processing"
    E1[Extract Binary Header] --> E2[Priority Detection]
    E2 --> E3[Schema Resolution]
    E3 --> E4[Avro Deserialization]
    E4 --> E5[Business Logic<br/>Processing]
    E5 --> E6[Re-encode for Output]
end

subgraph "Metrics & Monitoring"
    F1[Message Counters] --> F2[Throughput Calculation]
    F2 --> F3[Latency Tracking]
    F3 --> F4[Error Rates]
    F4 --> F5[JVM Metrics]
    F5 --> F6[Prometheus<br/>Endpoints]
end

subgraph "Error Handling"
    G1[Detect Error] --> G2[Record Failure]
    G2 --> G3[Circuit Breaker<br/>Threshold Check]
    G3 --> G4[Retry Logic]
    G4 --> G5[Error Topic<br/>Publication]
end

%% Connect the flows
A4 --> B1
B6 --> F1
B5 --> C1
C4 --> D1
D6 --> E1
E5 --> F1
E6 --> C1
E5 --> G1

%% Feedback loops
F6 -.-> B1
F6 -.-> D1
G3 -.-> D5

classDef producer fill:#22C55E,color:white
classDef kafka fill:#082f49,color:white
classDef consumer fill:#7c3aed,color:white
classDef processing fill:#ef4444,color:white
classDef metrics fill:#f59e0b,color:white
classDef errors fill:#dc2626,color:white

class A1,A2,A3,A4 producer
class B1,B2,B3,B4,B5,B6 producer
class C1,C2,C3,C4 kafka
class D1,D2,D3,D4,D5,D6 consumer
class E1,E2,E3,E4,E5,E6 processing
class F1,F2,F3,F4,F5,F6 metrics
class G1,G2,G3,G4,G5 errors

Detailed Design: Producer Thread Architecture & Message Rate Control

Producer Thread Architecture

The producer thread design follows a multi-layered approach for maximum throughput and controlled resource utilization:

Producer Thread Architecture: Detailed Components

1. Thread Pool Management

// From KafkaUltraScaleRunner.java
private static final int PRODUCER_THREADS = 16; // CPU cores x 2 on an 8-core host

// Thread pool creation with fixed size
this.producerExecutor = Executors.newFixedThreadPool(PRODUCER_THREADS,
    new ThreadFactory() {
        // AtomicInteger: newThread() may be invoked from multiple threads
        private final AtomicInteger counter = new AtomicInteger();
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "producer-" + counter.getAndIncrement());
            t.setPriority(Thread.NORM_PRIORITY + 1); // Slightly higher priority
            return t;
        }
    }
);

Key Design Elements:

  • Fixed Thread Count: Sized to CPU cores × 2 for optimal parallelism
  • Custom Thread Factory: Named threads for easier debugging and monitoring
  • Priority Management: Producer threads get slightly higher priority than standard threads
  • Lifecycle Management: Centralized control for graceful shutdown (see the sketch below)
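
The lifecycle-management element is not shown in the excerpt above; a plausible graceful-shutdown path, assuming running, producerExecutor, and producer are fields of the runner, might be:

import java.time.Duration;
import java.util.concurrent.TimeUnit;

void shutdownGracefully() throws InterruptedException {
    running.set(false);                 // signal ProducerTask loops to exit
    producerExecutor.shutdown();        // stop accepting new tasks
    if (!producerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
        producerExecutor.shutdownNow(); // interrupt stalled threads
    }
    producer.flush();                   // drain in-flight batches
    producer.close(Duration.ofSeconds(10));
}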

2. Rate Control Mechanism

// From ProducerTask.java
private class RateController {
    private final AtomicLong tokenBucket = new AtomicLong();
    private final long maxTokens;
    private final long refillRate;
    private final long refillIntervalNanos;
    private long lastRefillTime;
    
    // Token bucket implementation: claim a token atomically and roll back
    // if the bucket was already empty (avoids the race between a separate
    // get() check and the decrement)
    public boolean tryAcquire() {
        long now = System.nanoTime();
        refillTokens(now);
        
        if (tokenBucket.decrementAndGet() >= 0) {
            return true;
        }
        tokenBucket.incrementAndGet(); // undo the failed claim
        return false;
    }
    
    private void refillTokens(long now) {
        long elapsedNanos = now - lastRefillTime;
        if (elapsedNanos > refillIntervalNanos) {
            long tokensToAdd = (elapsedNanos / refillIntervalNanos) * refillRate;
            long newTokenCount = Math.min(maxTokens, tokenBucket.get() + tokensToAdd);
            tokenBucket.set(newTokenCount);
            lastRefillTime = now;
        }
    }
}

Rate Control Features:

  • Token Bucket Algorithm: Controls message rate with burst capability
  • Configurable Rate: Default 50,000 messages/second per thread
  • Adaptive Rate Control: Adjusts based on broker feedback
  • Precise Timing: Nanosecond-level refill intervals

3. Message Generation Pipeline

// Message creation process in ProducerTask
private byte[] createBinaryMessage() throws IOException {
    // Step 1: Create Avro record with random data
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", UUID.randomUUID().toString());
    record.put("timestamp", System.currentTimeMillis());
    record.put("priority", random.nextInt(10) + 1);
    
    // Step 2: Generate binary payload (100–5,099 bytes)
    byte[] content = new byte[random.nextInt(5000) + 100];
    random.nextBytes(content);
    record.put("content", ByteBuffer.wrap(content));
    
    // Step 3: Add metadata
    Map<String, String> metadata = new HashMap<>();
    metadata.put("source", "kafka-test-app");
    metadata.put("threadId", String.valueOf(threadId));
    record.put("metadata", metadata);
    
    // Step 4: Create a pre-sized binary output stream for the full message
    ByteArrayOutputStream out = new ByteArrayOutputStream(6 * 1024); // covers header + max payload
    
    // Step 5: Write binary header (8 bytes total)
    out.write(1);                         // Schema version (1 byte)
    out.write(random.nextInt(5) + 1);     // Message type (1 byte)
    out.write(random.nextInt(10) + 1);    // Priority (1 byte)
    out.write(random.nextInt(256));       // Flags (1 byte)
    
    // Step 6: Write timestamp as 4 bytes (seconds since epoch)
    int timestamp = (int) (System.currentTimeMillis() / 1000);
    out.write((timestamp >> 24) & 0xFF);
    out.write((timestamp >> 16) & 0xFF);
    out.write((timestamp >> 8) & 0xFF);
    out.write(timestamp & 0xFF);
    
    // Step 7: Serialize Avro record to binary
    GenericDatumWriter<GenericRecord> writer = 
        new GenericDatumWriter<>(schema);
    BinaryEncoder encoder = 
        EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();
    
    return out.toByteArray();
}

Message Generation Features:

  • Avro Serialization: Schema-based binary serialization (see the schema sketch after this list)
  • Custom Binary Header: 8-byte header with critical routing metadata
  • Efficient Memory Usage: Pre-allocated ByteArrayOutputStream
  • Variable Message Sizes: Configurable payload sizing
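
The Avro schema referenced by createBinaryMessage() is not included in this summary; a schema consistent with the fields used above would look like this (hypothetical reconstruction):

import org.apache.avro.Schema;

// Matches the fields set in createBinaryMessage(): id, timestamp,
// priority, content, and a string-to-string metadata map
static Schema messageSchema() {
    return new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Message\",\"fields\":[" +
        "{\"name\":\"id\",\"type\":\"string\"}," +
        "{\"name\":\"timestamp\",\"type\":\"long\"}," +
        "{\"name\":\"priority\",\"type\":\"int\"}," +
        "{\"name\":\"content\",\"type\":\"bytes\"}," +
        "{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}" +
        "]}");
}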

4. Batch Accumulation and Processing

// Producer configuration for batching in UltraScaleKafkaProcessor
private KafkaProducer<byte[], byte[]> createProducer() {
    Properties props = new Properties();
    // ... other properties ...
    
    // Batching configuration
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5);           // 5ms wait time
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4);  // 64KB batches
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864 * 2); // 128MB buffer
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // Fast compression
    
    return new KafkaProducer<>(props);
}

Batching Features:

  • Optimized Batch Size: 64KB default with compression
  • Linger Time: 5ms for filling batches efficiently
  • Large Producer Buffer: 128MB for handling traffic spikes
  • LZ4 Compression: Best balance of CPU usage vs. compression ratio

5. Metrics Collection and Feedback Loop

// Metrics tracking in ProducerTask
private void trackMetrics(int messagesSent, long bytesSent, long latencyNanos) {
    // Update atomic counters
    totalMessagesSent.addAndGet(messagesSent);
    totalBytesSent.addAndGet(bytesSent);
    
    // Record latencies for percentile calculation
    latencyRecorder.record(latencyNanos, TimeUnit.NANOSECONDS);
    
    // Update per-thread metrics
    threadMessageRate.mark(messagesSent);
    threadByteRate.mark(bytesSent);
    
    // Dynamic rate adjustment based on broker feedback
    if (latencyNanos > targetLatencyThresholdNanos) {
        // Reduce rate if latency is too high
        long currentRate = rateController.getRate();
        long newRate = (long)(currentRate * 0.95); // 5% reduction
        rateController.setRate(newRate);
        
        metricsRegistry.incrementCounter("rate_adjustments.down");
    } else if (latencyNanos < targetLatencyThresholdNanos / 2 
               && errorRate.getOneMinuteRate() < 0.001) {
        // Increase rate if latency is low and errors are minimal
        long currentRate = rateController.getRate();
        long newRate = (long)(currentRate * 1.05); // 5% increase
        rateController.setRate(Math.min(newRate, maxRate));
        
        metricsRegistry.incrementCounter("rate_adjustments.up");
    }
}

Metrics and Feedback Features:

  • Comprehensive Metrics: Messages/sec, bytes/sec, latency, error rate
  • Percentile Tracking: p50, p95, p99 latency measurements
  • Dynamic Rate Adjustment: Based on broker latency feedback
  • Adaptive Control Loop: Increases/decreases rate based on system health

Message Rate Control: In-Depth Analysis

1. Multi-level Rate Control

The system implements message rate control at multiple levels:

  1. Thread-Level Control: Each producer thread has its own rate limiter
    // Thread-level rate limiting in ProducerTask.run()
    while (remainingMessages > 0 && running.get()) {
        if (!rateController.tryAcquire()) {
            // Cannot send yet due to rate limiting
            LockSupport.parkNanos(100_000); // 100μs pause
            continue;
        }
        
        // Send message when token is acquired
        sendMessage();
        remainingMessages--;
    }
  2. Process-Level Control: Aggregate rate monitoring across all threads
    // Process-level aggregation in KafkaUltraScaleRunner
    private void monitorAggregateRate() {
        long currentMessages = totalMessagesSent.get();
        long previousMessages = lastMessageCount.getAndSet(currentMessages);
        long messagesDelta = currentMessages - previousMessages;
        
        long currentTime = System.currentTimeMillis();
        long elapsedMs = currentTime - lastCheckTime.getAndSet(currentTime);
        
        if (elapsedMs > 0) {
            double messagesPerSecond = (messagesDelta * 1000.0) / elapsedMs;
            
            // Check if we're exceeding global target rate
            if (messagesPerSecond > targetGlobalRate * 1.1) { // 10% tolerance
                // Signal all threads to reduce their rate
                signalRateAdjustment(0.9); // 10% reduction
            }
        }
    }
  3. Kafka Producer Buffer Control: Backpressure from producer buffers
    // In ProducerTask.sendMessage()
    try {
        Future<RecordMetadata> future = producer.send(record, callback);
        
        // Monitor buffer exhaustion via the producer's built-in metric
        Metric bufferExhausted = producer.metrics().get(
            new MetricName("buffer-exhausted-total",
                           "producer-metrics",
                           "Records dropped due to buffer exhaustion",
                           Collections.emptyMap()));
        if (bufferExhausted != null && (double) bufferExhausted.metricValue() > 0) {
            // Buffer exhaustion detected, apply backpressure
            rateController.applyBackpressure();
            metricsRegistry.incrementCounter("buffer_exhaustion");
        }
    } catch (Exception e) {
        // Send failed outright; count the error and back off
        metricsRegistry.incrementCounter("send_errors");
        rateController.applyBackpressure();
    }

2. Adaptive Rate Algorithm

The system uses a sophisticated PID-like controller to adapt message rates:

// Adaptive rate controller class
private class AdaptiveRateController {
    private final double kp = 0.5;  // Proportional gain
    private final double ki = 0.2;  // Integral gain
    private final double kd = 0.1;  // Derivative gain
    
    private final double targetLatency;
    private double errorSum = 0;
    private double lastError = 0;
    private long currentRate;
    
    AdaptiveRateController(double targetLatency, long initialRate) {
        this.targetLatency = targetLatency;
        this.currentRate = initialRate;
    }
    
    public void adjustRate(double currentLatency) {
        // Calculate error terms
        double error = targetLatency - currentLatency;
        // Accumulate the integral term, clamped (illustrative bound) to
        // limit windup during long latency excursions
        errorSum = Math.max(-1_000, Math.min(1_000, errorSum + error));
        double errorDelta = error - lastError;
        
        // Calculate PID adjustment
        double adjustment = (kp * error) + (ki * errorSum) + (kd * errorDelta);
        
        // Apply adjustment to rate (with bounds)
        double rateMultiplier = Math.max(0.5, Math.min(1.5, 1.0 + (adjustment * 0.1)));
        currentRate = (long) (currentRate * rateMultiplier);
        
        // Enforce min/max bounds
        currentRate = Math.max(minRate, Math.min(maxRate, currentRate));
        
        // Update state
        lastError = error;
    }
}

3. Dynamic Batch Sizing

The system dynamically adjusts batch sizes based on message rate:

// Dynamic batch size calculation
private int calculateOptimalBatchSize(double messagesPerSecond) {
    // Base batch size of 16KB
    int baseBatchSize = 16 * 1024;
    
    if (messagesPerSecond < 10_000) {
        // Lower rates: smaller batches for lower latency
        return baseBatchSize / 2;
    } else if (messagesPerSecond > 50_000) {
        // Higher rates: larger batches for better throughput
        return baseBatchSize * 2;
    } else {
        // Default for moderate rates
        return baseBatchSize;
    }
}
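
One caveat worth noting: batch.size is fixed when a KafkaProducer is constructed, so acting on a recalculated value in practice means closing the current producer and creating a new one with the updated configuration.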

Thread Management and Monitoring

The system includes sophisticated thread management capabilities:

// Thread monitoring in KafkaUltraScaleRunner
// (threadStats is assumed to be a ConcurrentHashMap so that replaceThread()
// can mutate it safely while this loop iterates)
private void monitorProducerThreads() {
    for (Map.Entry<Thread, ThreadStats> entry : threadStats.entrySet()) {
        Thread thread = entry.getKey();
        ThreadStats stats = entry.getValue();
        
        if (!thread.isAlive()) {
            // Thread died, replace it
            logThreadFailure(thread);
            replaceThread(thread);
            continue;
        }
        
        // Check for stalled threads
        long lastActivity = stats.getLastActivityTime();
        if (System.currentTimeMillis() - lastActivity > THREAD_STALL_THRESHOLD_MS) {
            // Thread is stalled, interrupt and replace
            logThreadStall(thread);
            thread.interrupt();
            replaceThread(thread);
        }
    }
}

Conclusion: Key Design Principles

The producer thread and message rate control architecture demonstrates several key design principles:

  1. Multi-level Throttling: Thread-level, process-level, and broker feedback
  2. Adaptive Control: Dynamic adjustment based on system conditions
  3. Resource Awareness: CPU, memory, and network-aware throttling
  4. Efficient Memory Usage: Binary protocols with minimal overhead
  5. Fail-Safe Operations: Monitoring, detection, and recovery from thread issues
  6. Metrics-Driven: Comprehensive metrics collection drives adjustments
  7. Batching Optimization: Dynamic batch sizing based on throughput requirements