- Apache Kafka
  - Usage: Message broker infrastructure, topic management, partition replication
  - Key Components: Kafka Streams API for processing, AdminClient for topic management
- Apache Avro
  - Usage: Schema definition, binary serialization/deserialization
  - Key Classes: GenericRecord, GenericDatumReader, GenericDatumWriter
- LZ4 Compression
  - Usage: High-speed compression for binary messages
  - Performance: Optimized for compression/decompression speed over ratio
- Micrometer Metrics
  - Usage: Performance monitoring, throughput tracking, latency measurement
  - Integration: Prometheus reporter for metrics visualization
- ThreadPoolExecutor
  - Usage: Managed thread pools for parallel message processing
  - Configuration: CallerRunsPolicy for backpressure
- BlockingQueue Implementations
  - Types: ArrayBlockingQueue, LinkedBlockingQueue
  - Usage: Bounded message buffers with backpressure handling
- Atomic Variables
  - Classes: AtomicLong, AtomicInteger, AtomicBoolean
  - Usage: Thread-safe counters and state flags without locking
- CountDownLatch
  - Usage: Synchronization for batch processing coordination
- Custom Binary Header Format
  - Structure: 8-byte header (version, type, priority, flags, timestamp)
  - Optimization: Minimal overhead for message routing
- ByteBuffer
  - Usage: Zero-copy buffer management for binary data
  - Optimization: Direct buffers for native I/O
- Binary Serialization Pipeline
  - Custom Classes: BinaryMessageBuilder, MessageMetadata
  - Features: Schema version encoding, compression flags
- Producer Optimizations
  - Batching: Configured for batches up to 64KB
  - Linger Time: 5ms for optimal batching
  - Buffer Memory: 128MB per producer
- Consumer Optimizations (see the config sketch after this list)
  - Fetch Size: Configured for 1MB per partition
  - Max Poll Records: 10,000 records per poll
  - Auto Commit: Disabled for manual offset management
- Exactly-Once Semantics
  - Transaction Support: Enabled with unique transaction IDs
  - Isolation Level: read_committed for consumers
- Circuit Breaker Pattern (see the sketch after this list)
  - Implementation: AdaptiveCircuitBreaker with dynamic thresholds
  - State Management: Failure rate tracking with reset mechanism
- Backpressure Mechanisms
  - Producer: Buffer limits and blocking queue
  - Consumer: CallerRunsPolicy in thread pool executor
- Memory Management
  - Buffer Pools: Pre-allocated buffers for message handling
  - Bounded Queues: Preventing OOM during traffic spikes
- Prometheus Integration
  - Metrics Types: Counters, gauges, timers, histograms
  - Custom Metrics: Throughput, latency percentiles, backlog
- JVM Metrics Collection
  - Areas Monitored: Memory, GC, threads, classloading
  - Integration: Automatic binding to metrics registry
- KStream Processing
  - Topology: Source → Process → Branch → Sink
  - Stateless Operations: map, filter, branch by priority
- Dynamic Message Routing
  - Implementation: Dynamic topic selection based on message metadata
  - Optimization: Sticky partition assignment
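The consumer tuning and isolation settings listed above would translate into configuration roughly as follows. This is a hedged sketch with placeholder bootstrap and group values, not the project's actual code:
// Hypothetical consumer configuration matching the values described above
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ultra-scale-consumers");          // placeholder
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);     // 1MB per partition
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10_000);                   // 10,000 records per poll
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);                  // manual offset management
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");          // read only committed transactional data
KafkaConsumer<byte[], byte[]> consumer =
        new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());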
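The AdaptiveCircuitBreaker mentioned above is not excerpted elsewhere in this document. The following is a minimal sketch of a failure-rate breaker with a reset timeout; the class shape and threshold values are illustrative assumptions, not the project's implementation:
// Hypothetical minimal circuit breaker: opens when the failure rate crosses a threshold,
// then allows traffic to probe again after a reset timeout (values are assumptions)
class AdaptiveCircuitBreaker {
    private final AtomicInteger failures = new AtomicInteger();
    private final AtomicInteger attempts = new AtomicInteger();
    private final AtomicBoolean open = new AtomicBoolean(false);
    private volatile long openedAtMillis;
    private volatile double failureRateThreshold = 0.5; // adjusted dynamically in the real system
    private static final long RESET_TIMEOUT_MS = 30_000;

    boolean allowRequest() {
        if (open.get() && System.currentTimeMillis() - openedAtMillis > RESET_TIMEOUT_MS) {
            // Half-open: reset counters and let a new window of traffic through
            open.set(false);
            failures.set(0);
            attempts.set(0);
        }
        return !open.get();
    }

    void recordResult(boolean success) {
        int total = attempts.incrementAndGet();
        int failed = success ? failures.get() : failures.incrementAndGet();
        if (total >= 20 && (double) failed / total > failureRateThreshold) {
            open.set(true); // trip the breaker
            openedAtMillis = System.currentTimeMillis();
        }
    }
}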
flowchart TD
classDef threadPool fill:#6366F1,color:white,stroke:#4F46E5,stroke-width:2px
classDef queueComponent fill:#F59E0B,color:white,stroke:#D97706,stroke-width:2px
classDef rateControl fill:#10B981,color:white,stroke:#059669,stroke-width:2px
classDef messageComponent fill:#EF4444,color:white,stroke:#DC2626,stroke-width:2px
classDef kafkaComponent fill:#3B82F6,color:white,stroke:#2563EB,stroke-width:2px
classDef metricComponent fill:#8B5CF6,color:white,stroke:#7C3AED,stroke-width:2px
%% Main Thread Pool Controller
TP[Thread Pool Executor<br/>Fixed Size = CPU Cores x 2]:::threadPool
%% Thread Pool Distribution
TP --> TP1[Producer Thread 1]
TP --> TP2[Producer Thread 2]
TP --> TP3[Producer Thread 3]
TP --> TP4[Producer Thread N]
TP1:::threadPool
TP2:::threadPool
TP3:::threadPool
TP4:::threadPool
%% Rate Control Components
RC[Central Rate Controller]:::rateControl
RateCalc[Rate Calculator<br/>Target = 50K msgs/sec/thread]:::rateControl
TokenBucket[Token Bucket Algorithm<br/>Refill Rate = Target Rate]:::rateControl
AdaptiveRate[Adaptive Rate Adjuster<br/>Based on Broker Feedback]:::rateControl
%% Rate Control Flow
RC --> RateCalc
RateCalc --> TokenBucket
MetricsFeedback --> AdaptiveRate
AdaptiveRate --> RateCalc
%% Per-Thread Message Generation
TP1 --> MG1[Message Generator]
TP2 --> MG2[Message Generator]
TP3 --> MG3[Message Generator]
TP4 --> MG4[Message Generator]
MG1:::messageComponent
MG2:::messageComponent
MG3:::messageComponent
MG4:::messageComponent
%% Message Generation Components per Thread
MG1 --> MGen1[Generate Binary Message]
MGen1 --> MEncode1[Encode with Avro]
MEncode1 --> MCompress1[Apply LZ4 Compression]
MCompress1 --> MPriority1[Assign Priority]
MPriority1 --> MBatch1[Add to Batch]
MGen1:::messageComponent
MEncode1:::messageComponent
MCompress1:::messageComponent
MPriority1:::messageComponent
MBatch1:::messageComponent
%% Rate Control Integration
TokenBucket --> MG1
TokenBucket --> MG2
TokenBucket --> MG3
TokenBucket --> MG4
%% Message Batching
MBatch1 --> SharedBatch[Batch Accumulator<br/>Target Size = 64KB]
MG2 --> SharedBatch
MG3 --> SharedBatch
MG4 --> SharedBatch
SharedBatch:::queueComponent
%% Batch Processing Components
SharedBatch --> BatchProcessor[Batch Processor<br/>Linger Time = 5ms]
BatchProcessor:::queueComponent
BatchProcessor --> PartitionSelector[Partition Selector]
PartitionSelector:::kafkaComponent
%% Producer API Integration
PartitionSelector --> KafkaProducer[Kafka Producer API]
KafkaProducer:::kafkaComponent
KafkaProducer --> KafkaBroker[Kafka Brokers]
KafkaBroker:::kafkaComponent
%% Metrics Collection
MetricsCollector[Metrics Collector]:::metricComponent
TP1 --> MetricsCollector
TP2 --> MetricsCollector
TP3 --> MetricsCollector
TP4 --> MetricsCollector
KafkaProducer --> MetricsCollector
MetricsCollector --> MetricsFeedback[Metrics Feedback Loop]
MetricsFeedback:::metricComponent
%% Detailed Rate Limiting Components
subgraph "Rate Limiting Details"
direction TB
RLT[Token Bucket]:::rateControl
RLT --> RLT1[Current Token Count]
RLT --> RLT2[Max Bucket Size]
RLT --> RLT3[Refill Rate]
RLT --> RLT4[Refill Interval]
RLA[Adaptive Control]:::rateControl
RLA --> RLA1[Broker Latency Feedback]
RLA --> RLA2[Queue Time Monitoring]
RLA --> RLA3[Error Rate Monitoring]
RLA --> RLA4[Network Utilization]
end
%% Thread State Management
subgraph "Thread State Management"
direction TB
TSM[Thread State Manager]:::threadPool
TSM --> TSM1[Active Thread Count]
TSM --> TSM2[Thread Health Check]
TSM --> TSM3[Stalled Thread Detection]
TSM --> TSM4[Thread Replacement Policy]
end
%% Message Batch Composition
subgraph "Message Batch Structure"
direction TB
MBS[Message Batch]:::messageComponent
MBS --> MBS1[Header 8 bytes]
MBS --> MBS2[Message ID 16 bytes]
MBS --> MBS3[Schema Version 1 byte]
MBS --> MBS4[Priority 1 byte]
MBS --> MBS5[Payload variable]
end
flowchart TD
subgraph "Message Creation"
A1[Client Request] --> A2[BinaryMessageBuilder]
A2 --> A3[Avro Serialization]
A3 --> A4[Binary Message\nwith Header]
end
subgraph "Producer Flow"
B1[Producer Thread Pool] --> B2[Create Message Batch]
B2 --> B3[Priority Assignment]
B3 --> B4[Compression]
B4 --> B5[Send to Kafka]
B5 --> B6[Monitor Throughput]
end
subgraph "Kafka Cluster"
C1[Input Topics\nwith Partitions] --> C2[Replication]
C2 --> C3[Retention\nPolicies]
C3 --> C4[Consumer\nGroups]
end
subgraph "Consumer Flow"
D1[Consumer Poll] --> D2[Message Batching]
D2 --> D3[Partition-Aware\nProcessing]
D3 --> D4[Memory Buffer Queue]
D4 --> D5[Circuit Breaker]
D5 --> D6[Processor Thread Pool]
end
subgraph "Message Processing"
E1[Extract Binary Header] --> E2[Priority Detection]
E2 --> E3[Schema Resolution]
E3 --> E4[Avro Deserialization]
E4 --> E5[Business Logic\nProcessing]
E5 --> E6[Re-encode for Output]
end
subgraph "Metrics & Monitoring"
F1[Message Counters] --> F2[Throughput Calculation]
F2 --> F3[Latency Tracking]
F3 --> F4[Error Rates]
F4 --> F5[JVM Metrics]
F5 --> F6[Prometheus\nEndpoints]
end
subgraph "Error Handling"
G1[Detect Error] --> G2[Record Failure]
G2 --> G3[Circuit Breaker\nThreshold Check]
G3 --> G4[Retry Logic]
G4 --> G5[Error Topic\nPublication]
end
%% Connect the flows
A4 --> B1
B6 --> F1
B5 --> C1
C4 --> D1
D6 --> E1
E5 --> F1
E6 --> C1
E5 --> G1
%% Feedback loops
F6 -.-> B1
F6 -.-> D1
G3 -.-> D5
classDef producer fill:#22C55E,color:white
classDef kafka fill:#082f49,color:white
classDef consumer fill:#7c3aed,color:white
classDef processing fill:#ef4444,color:white
classDef metrics fill:#f59e0b,color:white
classDef errors fill:#dc2626,color:white
class A1,A2,A3,A4 producer
class B1,B2,B3,B4,B5,B6 producer
class C1,C2,C3,C4 kafka
class D1,D2,D3,D4,D5,D6 consumer
class E1,E2,E3,E4,E5,E6 processing
class F1,F2,F3,F4,F5,F6 metrics
class G1,G2,G3,G4,G5 errors
The producer thread design follows a multi-layered approach for maximum throughput and controlled resource utilization:
// From KafkaUltraScaleRunner.java
private static final int PRODUCER_THREADS = 16;
// Thread pool creation with fixed size
this.producerExecutor = Executors.newFixedThreadPool(PRODUCER_THREADS,
new ThreadFactory() {
private int counter = 0;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "producer-" + counter++);
t.setPriority(Thread.NORM_PRIORITY + 1); // Slightly higher priority
return t;
}
}
);
Key Design Elements:
- Fixed Thread Count: Sized to CPU cores × 2 for optimal parallelism
- Custom Thread Factory: Named threads for easier debugging and monitoring
- Priority Management: Producer threads get slightly higher priority than standard threads
- Lifecycle Management: Centralized control for graceful shutdown (sketched below)
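The lifecycle management above implies a coordinated stop. Here is a minimal shutdown sketch, assuming the running flag and producerExecutor shown in this section; the 30-second drain window is an assumption:
// Hypothetical graceful shutdown: stop the send loops, drain the pool, then force termination
public void shutdown() throws InterruptedException {
    running.set(false);                  // producer tasks check this flag in their send loops
    producerExecutor.shutdown();         // stop accepting new tasks
    if (!producerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
        producerExecutor.shutdownNow();  // interrupt tasks that did not finish in time
    }
}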
// From ProducerTask.java
private class RateController {
private final AtomicLong tokenBucket = new AtomicLong();
private final long maxTokens;
private final long refillRate;
private final long refillIntervalNanos;
private long lastRefillTime;
// Token bucket implementation
public boolean tryAcquire() {
long now = System.nanoTime();
refillTokens(now);
if (tokenBucket.get() > 0) {
tokenBucket.decrementAndGet();
return true;
}
return false;
}
private void refillTokens(long now) {
long elapsedNanos = now - lastRefillTime;
if (elapsedNanos > refillIntervalNanos) {
long tokensToAdd = (elapsedNanos / refillIntervalNanos) * refillRate;
long newTokenCount = Math.min(maxTokens, tokenBucket.get() + tokensToAdd);
tokenBucket.set(newTokenCount);
lastRefillTime = now;
}
}
}
Rate Control Features:
- Token Bucket Algorithm: Controls message rate with burst capability
- Configurable Rate: Default 50,000 messages/second per thread
- Adaptive Rate Control: Adjusts based on broker feedback
- Precise Timing: Nanosecond-level refill intervals
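For illustration, the 50,000 msgs/sec/thread default could map onto the token-bucket fields above as follows; the refill interval and burst allowance here are assumptions, not values from the source:
// Hypothetical parameterization of the token bucket for a 50,000 msgs/sec/thread target
long refillIntervalNanos = TimeUnit.MILLISECONDS.toNanos(1); // refill once per millisecond
long refillRate = 50_000 / 1_000;                            // 50 tokens added per refill
long maxTokens = refillRate * 5;                             // allow bursts of roughly 5 ms of traffic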
// Message creation process in ProducerTask
private byte[] createBinaryMessage() throws IOException {
// Step 1: Create Avro record with random data
GenericRecord record = new GenericData.Record(schema);
record.put("id", UUID.randomUUID().toString());
record.put("timestamp", System.currentTimeMillis());
record.put("priority", random.nextInt(10) + 1);
// Step 2: Generate binary payload (100-5099 bytes)
byte[] content = new byte[random.nextInt(5000) + 100];
random.nextBytes(content);
record.put("content", ByteBuffer.wrap(content));
// Step 3: Add metadata
Map<String, String> metadata = new HashMap<>();
metadata.put("source", "kafka-test-app");
metadata.put("threadId", String.valueOf(threadId));
record.put("metadata", metadata);
// Step 4: Create binary output stream for full message
ByteArrayOutputStream out = new ByteArrayOutputStream();
// Step 5: Write binary header (8 bytes total)
out.write(1); // Schema version (1 byte)
out.write(random.nextInt(5) + 1); // Message type (1 byte)
out.write(random.nextInt(10) + 1); // Priority (1 byte)
out.write(random.nextInt(256)); // Flags (1 byte)
// Step 6: Write timestamp as 4 bytes (seconds since epoch)
int timestamp = (int) (System.currentTimeMillis() / 1000);
out.write((timestamp >> 24) & 0xFF);
out.write((timestamp >> 16) & 0xFF);
out.write((timestamp >> 8) & 0xFF);
out.write(timestamp & 0xFF);
// Step 7: Serialize Avro record to binary
GenericDatumWriter<GenericRecord> writer =
new GenericDatumWriter<>(schema);
BinaryEncoder encoder =
EncoderFactory.get().binaryEncoder(out, null);
writer.write(record, encoder);
encoder.flush();
return out.toByteArray();
}
Message Generation Features:
- Avro Serialization: Schema-based binary serialization
- Custom Binary Header: 8-byte header with critical routing metadata
- Efficient Memory Usage: Pre-allocated ByteArrayOutputStream
- Variable Message Sizes: Configurable payload sizing
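On the consuming side, the same 8-byte header has to be read back before Avro deserialization. Below is a minimal decoding sketch that mirrors Steps 5 and 6 above; the variable names are illustrative:
// Hypothetical header parsing on the consumer side, mirroring the writer above
ByteBuffer buf = ByteBuffer.wrap(messageBytes);
int schemaVersion = buf.get() & 0xFF;                 // byte 0: schema version
int messageType = buf.get() & 0xFF;                   // byte 1: message type
int priority = buf.get() & 0xFF;                      // byte 2: priority
int flags = buf.get() & 0xFF;                         // byte 3: flags
long timestampSeconds = buf.getInt() & 0xFFFFFFFFL;   // bytes 4-7: seconds since epoch
// the remaining bytes are the Avro-encoded record payload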
// Producer configuration for batching in UltraScaleKafkaProcessor
private KafkaProducer<byte[], byte[]> createProducer() {
Properties props = new Properties();
// ... other properties ...
// Batching configuration
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 5ms wait time
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384 * 4); // 64KB batches
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864 * 2); // 128MB buffer
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // Fast compression
return new KafkaProducer<>(props);
}
Batching Features:
- Optimized Batch Size: 64KB default with compression
- Linger Time: 5ms for filling batches efficiently
- Large Producer Buffer: 128MB for handling traffic spikes
- LZ4 Compression: Best balance of CPU usage vs. compression ratio
// Metrics tracking in ProducerTask
private void trackMetrics(int messagesSent, long bytesSent, long latencyNanos) {
// Update atomic counters
totalMessagesSent.addAndGet(messagesSent);
totalBytesSent.addAndGet(bytesSent);
// Record latencies for percentile calculation
latencyRecorder.record(latencyNanos, TimeUnit.NANOSECONDS);
// Update per-thread metrics
threadMessageRate.mark(messagesSent);
threadByteRate.mark(bytesSent);
// Dynamic rate adjustment based on broker feedback
if (latencyNanos > targetLatencyThresholdNanos) {
// Reduce rate if latency is too high
long currentRate = rateController.getRate();
long newRate = (long)(currentRate * 0.95); // 5% reduction
rateController.setRate(newRate);
metricsRegistry.incrementCounter("rate_adjustments.down");
} else if (latencyNanos < targetLatencyThresholdNanos / 2
&& errorRate.getOneMinuteRate() < 0.001) {
// Increase rate if latency is low and errors are minimal
long currentRate = rateController.getRate();
long newRate = (long)(currentRate * 1.05); // 5% increase
rateController.setRate(Math.min(newRate, maxRate));
metricsRegistry.incrementCounter("rate_adjustments.up");
}
}
Metrics and Feedback Features:
- Comprehensive Metrics: Messages/sec, bytes/sec, latency, error rate
- Percentile Tracking: p50, p95, p99 latency measurements
- Dynamic Rate Adjustment: Based on broker latency feedback
- Adaptive Control Loop: Increases/decreases rate based on system health
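As a concrete reference, the counters, percentile timers, and JVM bindings described above could be registered with Micrometer's Prometheus registry along these lines; the metric names are illustrative, not taken from the source:
// Hypothetical Micrometer/Prometheus wiring for the metrics described above
PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
new JvmMemoryMetrics().bindTo(registry);               // heap/non-heap gauges
new JvmGcMetrics().bindTo(registry);                   // GC pause timers
Counter messagesSent = Counter.builder("producer.messages.sent").register(registry);
Timer sendLatency = Timer.builder("producer.send.latency")
        .publishPercentiles(0.5, 0.95, 0.99)           // p50/p95/p99 latency percentiles
        .register(registry);
// registry.scrape() returns the Prometheus text format for an HTTP /metrics endpoint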
The system implements message rate control at multiple levels:
- Thread-Level Control: Each producer thread has its own rate limiter
// Thread-level rate limiting in ProducerTask.run()
while (remainingMessages > 0 && running.get()) {
    if (!rateController.tryAcquire()) {
        // Cannot send yet due to rate limiting
        LockSupport.parkNanos(100_000); // 100μs pause
        continue;
    }
    // Send message when token is acquired
    sendMessage();
    remainingMessages--;
}
- Process-Level Control: Aggregate rate monitoring across all threads
// Process-level aggregation in KafkaUltraScaleRunner
private void monitorAggregateRate() {
    long currentMessages = totalMessagesSent.get();
    long previousMessages = lastMessageCount.getAndSet(currentMessages);
    long messagesDelta = currentMessages - previousMessages;
    long currentTime = System.currentTimeMillis();
    long elapsedMs = currentTime - lastCheckTime.getAndSet(currentTime);
    if (elapsedMs > 0) {
        double messagesPerSecond = (messagesDelta * 1000.0) / elapsedMs;
        // Check if we're exceeding global target rate
        if (messagesPerSecond > targetGlobalRate * 1.1) { // 10% tolerance
            // Signal all threads to reduce their rate
            signalRateAdjustment(0.9); // 10% reduction
        }
    }
}
- Kafka Producer Buffer Control: Backpressure from producer buffers
// In ProducerTask.sendMessage()
try {
    Future<RecordMetadata> future = producer.send(record, callback);
    // Monitor buffer exhaustion via the producer's own metrics
    // (producer metrics are typically tagged with client-id, so the lookup may need those tags)
    Metric bufferExhausted = producer.metrics().get(new MetricName(
            "buffer-exhausted-total", "producer-metrics", "", Collections.emptyMap()));
    if (bufferExhausted != null && (Double) bufferExhausted.metricValue() > 0) {
        // Buffer exhaustion detected, apply backpressure
        rateController.applyBackpressure();
        metricsRegistry.incrementCounter("buffer_exhaustion");
    }
} catch (Exception e) {
    // Send failure handling (retry logic, error topic publication) is covered in the error-handling flow
}
The system uses a sophisticated PID-like controller to adapt message rates:
// Adaptive rate controller class
private class AdaptiveRateController {
private final double kp = 0.5; // Proportional term
private final double ki = 0.2; // Integral term
private final double kd = 0.1; // Derivative term
private final double targetLatency;
private double errorSum = 0;
private double lastError = 0;
private long currentRate;

// Target latency and starting rate are supplied at construction
private AdaptiveRateController(double targetLatency, long initialRate) {
    this.targetLatency = targetLatency;
    this.currentRate = initialRate;
}
public void adjustRate(double currentLatency) {
// Calculate error terms
double error = targetLatency - currentLatency;
errorSum += error;
double errorDelta = error - lastError;
// Calculate PID adjustment
double adjustment = (kp * error) + (ki * errorSum) + (kd * errorDelta);
// Apply adjustment to rate (with bounds)
double rateMultiplier = Math.max(0.5, Math.min(1.5, 1.0 + (adjustment * 0.1)));
currentRate = (long) (currentRate * rateMultiplier);
// Enforce min/max bounds
currentRate = Math.max(minRate, Math.min(maxRate, currentRate));
// Update state
lastError = error;
}
}
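One plausible way to drive this controller is a scheduled task that samples recent latency; this is only a sketch, and the observedP99LatencyMs() helper is hypothetical:
// Hypothetical wiring: sample latency once per second and let the controller adjust the rate
ScheduledExecutorService rateScheduler = Executors.newSingleThreadScheduledExecutor();
rateScheduler.scheduleAtFixedRate(
        () -> adaptiveRateController.adjustRate(observedP99LatencyMs()),
        1, 1, TimeUnit.SECONDS);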
The system dynamically adjusts batch sizes based on message rate:
// Dynamic batch size calculation
private int calculateOptimalBatchSize(double messagesPerSecond) {
// Base batch size of 16KB
int baseBatchSize = 16 * 1024;
if (messagesPerSecond < 10000) {
// Lower rates: smaller batches for less latency
return baseBatchSize / 2;
} else if (messagesPerSecond > 50000) {
// Higher rates: larger batches for better throughput
return baseBatchSize * 2;
} else {
// Default for moderate rates
return baseBatchSize;
}
}
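Since batch.size is fixed for the lifetime of a KafkaProducer instance, the computed value would typically be applied when a producer is created or rebuilt; a hedged one-liner, where observedMessagesPerSecond is an illustrative variable:
// Hypothetical application of the computed batch size when (re)building producer Properties
props.put(ProducerConfig.BATCH_SIZE_CONFIG, calculateOptimalBatchSize(observedMessagesPerSecond));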
The system includes sophisticated thread management capabilities:
// Thread monitoring in KafkaUltraScaleRunner
private void monitorProducerThreads() {
for (Map.Entry<Thread, ThreadStats> entry : threadStats.entrySet()) {
Thread thread = entry.getKey();
ThreadStats stats = entry.getValue();
if (!thread.isAlive()) {
// Thread died, replace it
logThreadFailure(thread);
replaceThread(thread);
continue;
}
// Check for stalled threads
long lastActivity = stats.getLastActivityTime();
if (System.currentTimeMillis() - lastActivity > THREAD_STALL_THRESHOLD_MS) {
// Thread is stalled, interrupt and replace
logThreadStall(thread);
thread.interrupt();
replaceThread(thread);
}
}
}
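The monitor above relies on a ThreadStats holder that is not shown. Here is a minimal sketch of the shape it would need, with an assumed touch() update called by producer tasks after each send:
// Assumed minimal ThreadStats: producer tasks call touch() after each successful send
private static final class ThreadStats {
    private volatile long lastActivityTime = System.currentTimeMillis();

    void touch() {
        lastActivityTime = System.currentTimeMillis();
    }

    long getLastActivityTime() {
        return lastActivityTime;
    }
}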
The producer thread and message rate control architecture demonstrates several key design principles:
- Multi-level Throttling: Thread-level, process-level, and broker feedback
- Adaptive Control: Dynamic adjustment based on system conditions
- Resource Awareness: CPU, memory, and network-aware throttling
- Efficient Memory Usage: Binary protocols with minimal overhead
- Fail-Safe Operations: Monitoring, detection, and recovery from thread issues
- Metrics-Driven: Comprehensive metrics collection drives adjustments
- Batching Optimization: Dynamic batch sizing based on throughput requirements