Skip to content

Instantly share code, notes, and snippets.

@architectureman
Last active March 11, 2025 05:00
Show Gist options
  • Save architectureman/c7c0f1d7b2f698d409999b447791d463 to your computer and use it in GitHub Desktop.
Save architectureman/c7c0f1d7b2f698d409999b447791d463 to your computer and use it in GitHub Desktop.
MSG_OMS_SYS_DESIGN

Thiết kế hệ thống xử lý Message cho hệ thống giao dịch chứng khoán

Mục lục

  1. Tổng quan
  2. Kiến trúc tổng thể
  3. Lựa chọn công nghệ và cơ sở
  4. Thiết kế Message Binary Format
  5. Hạ tầng xử lý Message
  6. Xử lý lỗi và phục hồi
  7. Chiến lược triển khai
  8. Tối ưu hiệu năng
  9. Kết luận và khuyến nghị

Tổng quan

Tài liệu này trình bày thiết kế chi tiết về hệ thống xử lý message hỗ trợ cho hệ thống OMS (Order Management System) và các tác vụ xử lý bất đồng bộ trong giao dịch chứng khoán. Hệ thống được thiết kế theo mô hình event-driven và stream processing, đảm bảo khả năng xử lý khối lượng giao dịch cao với độ tin cậy, khả năng phục hồi và mở rộng.

Các yêu cầu chính của hệ thống bao gồm:

  • Khả năng xử lý message với khối lượng cực lớn (hàng triệu message/giây)
  • Đảm bảo tính chính xác và toàn vẹn dữ liệu
  • Khả năng tự phục hồi sau sự cố
  • Khả năng mở rộng linh hoạt để đáp ứng tải
  • Tiết kiệm chi phí vận hành

Kiến trúc tổng thể

Hệ thống được thiết kế theo kiến trúc microservices kết hợp với event-driven, sử dụng các thành phần chính:

flowchart TB
    Client([Client]) --> Gateway[API Gateway\nOpenResty]
    Gateway --> MsgFormatter[Message Formatter\nRust]
    
    subgraph Kafka[Kafka Cluster]
        KTopic1[Topic\norders]
        KTopic2[Topic\ntasks]
        KTopic3[Topic\nevents]
    end
    
    MsgFormatter --> KTopic1
    MsgFormatter --> KTopic2
    MsgFormatter --> KTopic3
    
    subgraph ConsumerGroups["Consumer Group Manager"]
        CG1[Group\ncritical]
        CG2[Group\nnormal]
        CG3[Group\nbatch]
    end
    
    KTopic1 --> ConsumerGroups
    KTopic2 --> ConsumerGroups
    KTopic3 --> ConsumerGroups
    
    subgraph ThreadPools["Thread Pools"]
        TP1[Pool\ncritical]
        TP2[Pool\nnormal]
        TP3[Pool\nbatch]
    end
    
    CG1 --> TP1
    CG2 --> TP2
    CG3 --> TP3
    
    ThreadPools --> Processors[Processor\nElixir]
    
    Processors --> DragonflyCache[Dragonfly\nHot Data]
    Processors --> ScyllaDB[ScyllaDB\nCold Data]
    
    subgraph ErrorHandling["Error Handling"]
        RetryQ[Retry Queue]
        DLQ[Dead Letter Queue]
        CircuitB[Circuit Breaker]
    end
    
    Processors --> ErrorHandling
    ErrorHandling --> KTopic2
    
    subgraph Monitoring["Monitoring & Alerting"]
        Prometheus[Prometheus]
        Grafana[Grafana]
        AlertManager[Alert Manager]
    end
    
    Processors --> Monitoring
    ErrorHandling --> Monitoring
    ThreadPools --> Monitoring
    ConsumerGroups --> Monitoring
    
    style Gateway fill:#f9d,stroke:#333,stroke-width:2px
    style Kafka fill:#bdf,stroke:#333,stroke-width:2px
    style ThreadPools fill:#dfd,stroke:#333,stroke-width:2px
    style ErrorHandling fill:#fdd,stroke:#333,stroke-width:2px
Loading

Lựa chọn công nghệ và cơ sở

API Gateway: OpenResty vs Traefik

Đặc tính OpenResty Traefik
Hiệu năng Rất cao (dựa trên Nginx) Cao (dựa trên Go)
Khả năng xử lý đồng thời 100K+ connections 10K+ connections
Latency Cực thấp (microseconds) Thấp (milliseconds)
Tính linh hoạt Cao (Lua scripting) Vừa phải (middleware)
Tự động phát hiện service Không có sẵn (cần tích hợp) Có sẵn
Let's Encrypt Cần cấu hình Tích hợp sẵn
Metrics Cần cấu hình Tích hợp sẵn
Độ phức tạp triển khai Cao Thấp

Lựa chọn: OpenResty

Lý do chọn:

  1. Hiệu năng vượt trội: OpenResty (dựa trên Nginx) có khả năng xử lý đồng thời cao hơn nhiều lần so với Traefik. Trong môi trường giao dịch chứng khoán, việc xử lý hàng trăm nghìn kết nối đồng thời là yêu cầu thiết yếu.
  2. Latency cực thấp: OpenResty cung cấp độ trễ ở mức microsecond, trong khi Traefik thường ở mức millisecond.
  3. Tính linh hoạt cao: Khả năng lập trình với Lua trong OpenResty giúp tùy biến logic xử lý phức tạp như authentication, rate limiting, và transformation tại edge.
  4. Tiêu thụ tài nguyên thấp: OpenResty sử dụng ít tài nguyên hơn đáng kể so với Traefik, giúp tiết kiệm chi phí vận hành.

Mặc dù Traefik có ưu điểm về tính dễ dùng và tích hợp sẵn với hệ sinh thái cloud-native, nhưng với yêu cầu về hiệu năng và tính linh hoạt của hệ thống giao dịch chứng khoán, OpenResty là lựa chọn phù hợp hơn.

Message Broker: Kafka vs Redis Stream

Đặc tính Kafka Redis Stream
Throughput Cực cao (hàng triệu msg/giây) Cao (hàng trăm nghìn msg/giây)
Latency Thấp (milliseconds) Cực thấp (microseconds)
Dung lượng lưu trữ Không giới hạn (disk-based) Giới hạn bởi RAM
Độ bền dữ liệu Rất cao (replication, persistence) Vừa phải (cần cấu hình đặc biệt)
Khả năng mở rộng Tuyến tính, phân vùng Cần cluster, phức tạp hơn
Consumer groups Mạnh mẽ, cân bằng tự động Cơ bản, yêu cầu quản lý thủ công hơn
Replay khả năng Hoàn toàn (từ bất kỳ offset) Giới hạn (phụ thuộc cấu hình)

Lựa chọn: Kết hợp Kafka và Redis Stream/Dragonfly

Lý do chọn:

  1. Kafka làm backbone chính:

    • Lưu trữ tin cậy và lâu dài cho tất cả message
    • Khả năng replay toàn bộ stream từ bất kỳ thời điểm nào
    • Mở rộng tuyến tính để đáp ứng tải cao
    • Phân vùng linh hoạt cho phép xử lý song song
  2. Redis Stream/Dragonfly cho xử lý realtime:

    • Độ trễ cực thấp cho các tác vụ cần phản hồi ngay (ví dụ: tín hiệu giá)
    • Cache cho dữ liệu hot
    • Hỗ trợ xử lý stateful sessions
    • Độ tin cậy cao hơn khi cấu hình với Redis Sentinel/Cluster hoặc Dragonfly

Sự kết hợp này cho phép hệ thống vừa đạt hiệu năng cao với Redis Stream/Dragonfly cho các tác vụ độ trễ thấp, vừa đảm bảo độ tin cậy với Kafka cho việc lưu trữ và xử lý dữ liệu lâu dài.

In-Memory Store: Redis vs Dragonfly

Đặc tính Redis Dragonfly
Mô hình thread Single-threaded (chính) Multi-threaded (từ đầu)
Hiệu năng Cao (~1M ops/sec/node) Rất cao (2-5x Redis ở nhiều workload)
Memory efficiency Tốt Tốt hơn (tiết kiệm 30-50% RAM)
Scalability Redis Cluster (cần cấu hình) Scale-out với nhiều node
CPU utilization Giới hạn ở 1 core chính Tận dụng đa core hiệu quả
Tương thích Redis Native ~99% tương thích với Redis API
Khả năng phục hồi Cần cấu hình Redis Sentinel Tự phục hồi với cụm Dragonfly
Streams API Đầy đủ Tương thích với Redis Streams
Độ trưởng thành Rất cao (từ 2009) Mới (từ 2021)
Hỗ trợ cộng đồng Rộng rãi Đang phát triển
Triển khai thực tế Hàng triệu triển khai Số lượng hạn chế

Lựa chọn: Dragonfly (kết hợp với Kafka)

Lý do chọn:

  1. Hiệu năng vượt trội: Dragonfly với mô hình multi-threaded có khả năng xử lý nhiều hơn 2-5 lần so với Redis trên cùng phần cứng, đặc biệt là khi có nhiều core CPU.
  2. Tiết kiệm bộ nhớ: Sử dụng ít hơn 30-50% RAM cho cùng một dataset, làm giảm chi phí vận hành đáng kể.
  3. Tận dụng đa lõi: Thiết kế từ đầu để tận dụng tối đa tất cả CPU core, không bị giới hạn bởi mô hình single-threaded như Redis.
  4. Tương thích Redis: Khoảng 99% tương thích với Redis API, cho phép sử dụng các client Redis hiện có mà không cần thay đổi code đáng kể.
  5. Streams tương thích: Hỗ trợ đầy đủ Redis Streams API, đảm bảo khả năng sử dụng cho trường hợp cần xử lý message với độ trễ cực thấp.
  6. Khả năng scale: Khả năng mở rộng đơn giản hơn với hiệu suất tuyến tính khi thêm node.

Tuy Dragonfly là một công nghệ tương đối mới so với Redis, nhưng ưu điểm về hiệu năng và khả năng tận dụng nhiều CPU core là vô cùng quan trọng cho hệ thống xử lý giao dịch khối lượng lớn. Việc kết hợp Dragonfly với Kafka tạo nên một giải pháp toàn diện:

  • Kafka đảm bảo lưu trữ message bền vững, khả năng replay và xử lý song song khối lượng lớn
  • Dragonfly cung cấp layer cache và xử lý message tốc độ cao với độ trễ thấp, đồng thời tiết kiệm tài nguyên

Cách triển khai này đặc biệt hiệu quả khi mỗi node xử lý có nhiều CPU core, tận dụng được ưu điểm của Dragonfly so với Redis.

Storage: Redis vs ScyllaDB

Đặc tính Redis ScyllaDB
Kiểu lưu trữ In-memory (với persistence) Disk-based
Latency Cực thấp (microseconds) Thấp (sub-milliseconds)
Throughput Cao (~1M ops/sec/node) Cực cao (hàng triệu ops/sec/cluster)
Dung lượng lưu trữ Giới hạn bởi RAM Không giới hạn
Cấu trúc dữ liệu Phong phú (string, hash, list, set, zset...) Column-family store
Khả năng query Limited CQL (SQL-like)
Khả năng mở rộng Cluster (complex) Linear sharding (simple)
Chi phí Cao (RAM-intensive) Vừa phải (disk-based)

Lựa chọn: Kết hợp Dragonfly và ScyllaDB

Lý do chọn:

  1. Dragonfly cho Hot Data:

    • Lưu trữ state ngắn hạn với độ trễ cực thấp
    • Cache cho dữ liệu truy cập thường xuyên
    • Cấu trúc dữ liệu phong phú phù hợp cho các tác vụ đặc thù
    • Rate limiting, session management
    • Hiệu suất cao hơn và sử dụng bộ nhớ hiệu quả hơn Redis
  2. ScyllaDB cho Cold Data:

    • Lưu trữ dữ liệu lâu dài với khối lượng lớn
    • Time-series data cho phân tích
    • Kiến trúc shard-per-core tận dụng tối đa phần cứng
    • Chi phí thấp hơn cho lưu trữ dài hạn

Sự kết hợp này mang lại hiệu quả tối ưu về chi phí và hiệu năng, với Dragonfly xử lý dữ liệu cần độ trễ thấp và ScyllaDB xử lý dữ liệu lịch sử với khối lượng lớn.

Ngôn ngữ lập trình

Để tối ưu hiệu năng và phù hợp với đặc thù của từng thành phần, thiết kế sử dụng kết hợp ba ngôn ngữ lập trình:

Elixir/Erlang (BEAM VM)

Sử dụng cho:

  • Xử lý message và business logic
  • Quản lý concurrency và state
  • Error handling và recovery
  • Stream processing

Lý do chọn:

  • Concurrency vượt trội: Mô hình actor cho phép xử lý hàng triệu "process" đồng thời với chi phí thấp
  • Fault tolerance: Triết lý "let it crash" và giám sát supervisors tạo hệ thống có khả năng tự phục hồi cao
  • Soft real-time: Độ trễ dự đoán được, phù hợp cho hệ thống giao dịch
  • Hot code swapping: Cập nhật hệ thống mà không cần downtime
defmodule OrderProcessor do
  use GenServer

  # Khởi tạo processor với cấu hình tùy chỉnh
  def start_link(init_args) do
    GenServer.start_link(__MODULE__, init_args, name: __MODULE__)
  end

  def init(init_args) do
    # Khởi tạo state từ args
    {:ok, %{config: init_args}}
  end

  # Xử lý message không đồng bộ
  def handle_cast({:process_order, order_message}, state) do
    # Xử lý order message
    with {:ok, decoded} <- decode_message(order_message),
         {:ok, validated} <- validate_order(decoded),
         {:ok, result} <- apply_business_rules(validated) do
      # Cập nhật state và gửi kết quả
      {:noreply, update_state(state, result)}
    else
      {:error, reason} ->
        # Xử lý lỗi với supervisor
        {:noreply, handle_error(state, reason)}
    end
  end

  # Các hàm xử lý nội bộ
  defp decode_message(message) do
    # Giải mã binary message
  end

  defp validate_order(order) do
    # Kiểm tra tính hợp lệ của order
  end
  
  defp apply_business_rules(order) do
    # Áp dụng logic nghiệp vụ
  end
  
  defp update_state(state, result) do
    # Cập nhật state với kết quả xử lý
  end
  
  defp handle_error(state, reason) do
    # Xử lý lỗi và recovery
  end
end

Rust

Sử dụng cho:

  • Message encoder/decoder
  • Producer/Consumer performant
  • Binary protocol handling
  • CPU-intensive tasks

Lý do chọn:

  • Hiệu năng cao: Tốc độ xử lý gần với C/C++ nhưng an toàn về bộ nhớ
  • Memory safety: Không cần GC, giảm độ trễ và gián đoạn
  • Tiêu thụ tài nguyên thấp: Phù hợp cho các thành phần cần tối ưu chi phí phần cứng
  • Concurrency model hiện đại: Async/await mang lại hiệu suất tốt cho I/O-bound tasks
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use std::time::Duration;

// Hàm encode và gửi message
async fn encode_and_send_order(
    order: &Order,
    producer: &FutureProducer,
    bit_mode: u8
) -> Result<(), String> {
    // Encode order thành binary format với bit_mode tùy chỉnh
    let encoded = match encode_message(order, bit_mode) {
        Ok(data) => data,
        Err(e) => return Err(format!("Encoding error: {}", e)),
    };
    
    // Tính checksum
    let checksum = calculate_checksum(&encoded);
    let message_with_checksum = add_checksum(encoded, checksum);
    
    // Xác định partition dựa trên priority
    let partition = determine_partition(order.priority);
    
    // Gửi message đến Kafka
    let record = FutureRecord::to("orders")
        .key(&order.id)
        .payload(&message_with_checksum);
        
    match producer.send(record, Timeout::After(Duration::from_millis(100))).await {
        Ok(_) => Ok(()),
        Err((e, _)) => Err(format!("Kafka error: {}", e)),
    }
}

// Hàm decode message
fn decode_message(data: &[u8]) -> Result<Order, String> {
    // Verify checksum trước
    if !verify_checksum(data) {
        return Err("Checksum verification failed".to_string());
    }
    
    // Đọc header để xác định bit_mode
    let header = read_header(data)?;
    
    // Decode theo bit_mode
    decode_by_bit_mode(data, header.bit_mode)
}

Java

Sử dụng cho:

  • Integration với hệ thống hiện có
  • Admin services và APIs
  • Thành phần cần tham chiếu tới thư viện Java hiện có

Lý do chọn:

  • Hệ sinh thái phong phú: Nhiều thư viện và framework sẵn có
  • JVM ổn định: Đã được tối ưu qua nhiều năm, nhiều công cụ monitoring
  • Tương thích cao: Dễ dàng tích hợp với các hệ thống enterprise
  • Nhân sự sẵn có: Dễ tìm nhân sự có kinh nghiệm
@Service
public class OrderServiceIntegration {
    
    private final KafkaTemplate<String, byte[]> kafkaTemplate;
    private final OrderRepository orderRepository;
    
    @Autowired
    public OrderServiceIntegration(KafkaTemplate<String, byte[]> kafkaTemplate, 
                                  OrderRepository orderRepository) {
        this.kafkaTemplate = kafkaTemplate;
        this.orderRepository = orderRepository;
    }
    
    @Transactional
    public CompletableFuture<OrderResult> processOrder(Order order) {
        // Lưu order vào database trước (atomic operation)
        Order savedOrder = orderRepository.save(order);
        
        // Serialize order sang binary format
        byte[] orderMessage = MessageEncoder.encode(savedOrder);
        
        // Gửi message đến Kafka và trả về future
        return kafkaTemplate.send("orders", order.getId(), orderMessage)
            .thenApply(result -> new OrderResult(savedOrder.getId(), true))
            .exceptionally(ex -> {
                // Xử lý lỗi Kafka
                log.error("Failed to send order to Kafka", ex);
                return new OrderResult(savedOrder.getId(), false, ex.getMessage());
            });
    }
}

Thiết kế Message Binary Format

Cấu trúc Header

Thiết kế header linh hoạt với hỗ trợ đa dạng bit modes:

classDiagram
    class MessageHeader {
        +uint8 magic
        +uint8 version
        +uint16 flags
        +uint32 schemaId
        +uint32 length
        +uint8 headerEx
        +uint8 bitMode
        +uint16 reserved
    }
    
    class SegmentDescriptor {
        +uint8 segmentCount
        +uint8 segmentType[]
        +uint32 segmentOffset[]
        +uint32 segmentLength[]
    }
    
    class Metadata {
        +string messageId
        +int64 timestamp
        +string source
        +string correlationId
        +uint16 retryCount
        +uint8 priority
        +uint32 ttl
        +map~string,string~ custom
    }
    
    class Payload {
        +variable fields
    }
    
    class IntegrityBlock {
        +uint64 checksum
        +bytes[32] hash
        +variable signature
    }
    
    MessageHeader --> SegmentDescriptor
    SegmentDescriptor --> Metadata
    SegmentDescriptor --> Payload
    SegmentDescriptor --> IntegrityBlock
Loading

Chi tiết Header (16 bytes):

  • Magic (1B): Byte cố định (0xAB) để nhận dạng message
  • Version (1B): Phiên bản định dạng message
  • Flags (2B): Cờ đánh dấu (compression, encryption, priority...)
  • SchemaID (4B): ID của schema để giải mã payload
  • Length (4B): Tổng độ dài message
  • HeaderEx (1B): Byte chỉ ra số byte mở rộng của header
  • BitMode (1B): Chỉ định chế độ bit (0x01: 8-bit, 0x02: 16-bit, 0x03: 32-bit, 0x04: 64-bit)
  • Reserved (2B): Dành cho sử dụng trong tương lai

Cấu hình Bit Mode

Hệ thống hỗ trợ đa dạng bit mode để tối ưu kích thước message:

  1. BIT_MODE = 0x01: 8-bit mode

    • Các giá trị số nguyên được biểu diễn bằng 8 bits
    • Phù hợp cho các message nhỏ, đơn giản
    • Tiết kiệm băng thông tối đa
  2. BIT_MODE = 0x02: 16-bit mode

    • Các giá trị số nguyên được biểu diễn bằng 16 bits
    • Cân bằng giữa kích thước message và phạm vi giá trị
  3. BIT_MODE = 0x03: 32-bit mode

    • Các giá trị số nguyên được biểu diễn bằng 32 bits
    • Phổ biến nhất, tương thích với hầu hết hệ thống
  4. BIT_MODE = 0x04: 64-bit mode

    • Các giá trị số nguyên được biểu diễn bằng 64 bits
    • Phù hợp cho dữ liệu lớn hoặc độ chính xác cao
    • Cần thiết cho timestamps độ chính xác cao và giá trị tài chính

Mã nguồn ví dụ cho việc chuyển đổi giữa các bit mode:

-- Encode số nguyên theo bit mode
function encode_integer(value, bit_mode)
    if bit_mode == BIT_MODE.BIT_8 then
        if value < -128 or value > 127 then
            return nil, "Integer overflow for 8-bit mode"
        end
        return string.char(bit.band(value, 0xFF))
    elseif bit_mode == BIT_MODE.BIT_16 then
        if value < -32768 or value > 32767 then
            return nil, "Integer overflow for 16-bit mode"
        end
        return string.char(bit.band(bit.rshift(value, 8), 0xFF),
                          bit.band(value, 0xFF))
    elseif bit_mode == BIT_MODE.BIT_32 then
        -- Encode 32-bit integer
    elseif bit_mode == BIT_MODE.BIT_64 then
        -- Encode 64-bit integer
    end
end

Quản lý Schema

Hỗ trợ schema versioning và tương thích ngược:

classDiagram
    class SchemaRegistry {
        +registerSchema(id, schema)
        +getSchema(id)
        +findCompatibleSchema(fields)
        +validateAgainstSchema(message, schema)
    }
    
    class SchemaDefinition {
        +int version
        +Field[] fields
    }
    
    class Field {
        +int id
        +string name
        +string type
        +bool required
    }
    
    SchemaRegistry --> SchemaDefinition
    SchemaDefinition --> Field
Loading

Cài đặt Schema Registry trong OpenResty:

-- Schema Registry implementation
local _M = {}
local schema_cache = ngx.shared.schemas

-- Đăng ký schema mới
function _M.register_schema(schema_id, schema_def)
    local success = schema_cache:set(schema_id, cjson.encode(schema_def))
    return success
end

-- Lấy schema theo ID
function _M.get_schema(schema_id)
    local schema_json = schema_cache:get(schema_id)
    if not schema_json then
        return nil, "Schema not found: " .. schema_id
    end
    
    return cjson.decode(schema_json)
end

-- Tìm schema tương thích
function _M.find_compatible_schema(required_fields)
    -- Tìm schema phù hợp nhất với các field cần thiết
end

-- Kiểm tra message với schema
function _M.validate_against_schema(message, schema_id)
    local schema, err = _M.get_schema(schema_id)
    if not schema then
        return false, err
    end
    
    -- Kiểm tra tính hợp lệ của message với schema
end

return _M

Cơ chế đảm bảo tính toàn vẹn

Để đảm bảo tính toàn vẹn của message, hệ thống sử dụng nhiều lớp bảo vệ:

flowchart TD
    A[Message Creation] --> B[Add Sequence Number]
    B --> C[Calculate CRC-64 Checksum]
    C --> D[Calculate SHA-256 Hash]
    D --> E{Critical Message?}
    E -->|Yes| F[Add Digital Signature]
    E -->|No| G[Send Message]
    F --> G
    
    G --> H[Receive Message]
    H --> I[Verify Checksum]
    I -->|Invalid| J[Reject Message]
    I -->|Valid| K[Verify Hash]
    K -->|Invalid| J
    K -->|Valid| L{Has Signature?}
    L -->|Yes| M[Verify Signature]
    L -->|No| N[Check Idempotency]
    M -->|Invalid| J
    M -->|Valid| N
    N -->|Duplicate| O[Log Duplicate]
    N -->|New| P[Process Message]
    P --> Q[Create Audit Log]
Loading

Mã nguồn ví dụ cho tính toàn vẹn message:

-- Tính CRC-64 checksum
function calculate_checksum(data)
    local crc64 = require "crc64"
    return crc64.crc64(data)
end

-- Thêm checksum vào message
function add_checksum(message)
    local checksum_data = string.sub(message, 1, -9) -- Loại bỏ 8 byte cuối
    local checksum = calculate_checksum(checksum_data)
    return checksum_data .. encode_uint64(checksum)
end

-- Kiểm tra checksum
function verify_checksum(message)
    local message_body = string.sub(message, 1, -9)
    local checksum_bytes = string.sub(message, -8)
    local stored_checksum = decode_uint64(checksum_bytes)
    local calculated_checksum = calculate_checksum(message_body)
    return calculated_checksum == stored_checksum
end

-- Tính SHA-256 hash
function calculate_hash(message)
    local resty_sha256 = require "resty.sha256"
    local sha256 = resty_sha256:new()
    sha256:update(message)
    return sha256:final()
end

-- Kiểm tra idempotency
function check_idempotency(key, ttl)
    local dragon = require "resty.dragonfly"
    local df = dragon:new()
    df:connect("dragonfly-host", 6379)
    
    -- Thử thiết lập key với NX (chỉ thành công nếu key chưa tồn tại)
    local status = df:set("idempotent:" .. key, "1", "EX", ttl, "NX")
    df:set_keepalive(10000, 100)
    
    return status == "OK" -- true nếu key mới (không trùng lặp)
end

Hạ tầng xử lý Message

Xử lý dựa trên độ ưu tiên

Hệ thống thiết kế các luồng xử lý riêng biệt cho các mức ưu tiên khác nhau:

flowchart TB
    Kafka[Kafka Cluster] --> P0[Partition 0\nCRITICAL]
    Kafka --> P1[Partition 1\nHIGH]
    Kafka --> P2[Partition 2-5\nNORMAL]
    Kafka --> P3[Partition 6-7\nLOW/BATCH]
    
    P0 --> CG0[Consumer Group\nCritical]
    P1 --> CG0
    P2 --> CG1[Consumer Group\nNormal]
    P3 --> CG2[Consumer Group\nBatch]
    
    subgraph CriticalPool[Critical Pool]
        CP1[Worker 1]
        CP2[Worker 2]
        CP3["..."]
        CP4[Worker N]
    end
    
    subgraph NormalPool[Normal Pool]
        NP1[Worker 1]
        NP2[Worker 2]
        NP3["..."]
        NP4[Worker N]
    end
    
    subgraph BatchPool[Batch Pool]
        BP1[Worker 1]
        BP2[Worker 2]
        BP3["..."]
        BP4[Worker N]
    end
    
    CG0 --> CriticalPool
    CG1 --> NormalPool
    CG2 --> BatchPool
    
    CriticalPool --> Dragonfly
    NormalPool --> Dragonfly
    BatchPool --> Dragonfly
    
    Dragonfly --> ScyllaDB
    
    style CriticalPool fill:#f99,stroke:#333,stroke-width:2px
    style NormalPool fill:#9f9,stroke:#333,stroke-width:2px
    style BatchPool fill:#99f,stroke:#333,stroke-width:2px
Loading

Mã nguồn xác định partition dựa trên mức ưu tiên:

-- Định nghĩa các mức ưu tiên
PRIORITY = {
    CRITICAL = 0,  -- Ưu tiên cao nhất, xử lý ngay lập tức
    HIGH = 1,      -- Ưu tiên cao, xử lý sớm
    NORMAL = 2,    -- Mức tiêu chuẩn
    LOW = 3,       -- Ưu tiên thấp, có thể trì hoãn
    BATCH = 4      -- Xử lý hàng loạt, không cần real-time
}

-- Xác định partition dựa trên mức ưu tiên
function determine_partition(priority)
    if priority == PRIORITY.CRITICAL then
        return 0  -- Partition 0 dành riêng cho CRITICAL
    elseif priority == PRIORITY.HIGH then
        return 1  -- Partition 1 dành riêng cho HIGH
    elseif priority == PRIORITY.NORMAL then
        -- Phân tán đều giữa các partition normal
        return 2 + ngx.now() % 4  -- Partitions 2-5
    else
        -- LOW và BATCH
        return 6 + ngx.now() % 2  -- Partitions 6-7
    end
end

Tổ chức Thread Pool

Thiết kế thread pool tùy chỉnh cho từng loại message:

classDiagram
    class ThreadPoolManager {
        +createPool(name, size, priority)
        +submitTask(task, priority)
        +adjustPoolSize(name, newSize)
        +getStats()
    }
    
    class ThreadPool {
        -string name
        -int size
        -int priority
        -Task[] queue
        +enqueue(task)
        +dequeue()
        +increaseSize()
        +decreaseSize()
    }
    
    class Worker {
        -int id
        -ThreadPool pool
        +start()
        +stop()
        +processTask(task)
    }
    
    class Task {
        -byte[] message
        -int priority
        -int retryCount
        -int timeout
    }
    
    ThreadPoolManager --> ThreadPool
    ThreadPool --> Worker
    Worker --> Task
Loading

Cấu hình OpenResty cho Thread Pool:

# Cấu hình Nginx với nhiều worker pools
worker_processes auto;

# Cấu hình thread pools
thread_pool critical_pool threads=16;
thread_pool normal_pool threads=32;
thread_pool batch_pool threads=8;

# Sử dụng các pool khác nhau trong location
location /api/critical {
    set $pool critical_pool;
    content_by_lua_block {
        process_with_priority(PRIORITY.CRITICAL)
    }
}

location /api/normal {
    set $pool normal_pool;
    content_by_lua_block {
        process_with_priority(PRIORITY.NORMAL)
    }
}

Điều chỉnh độ ưu tiên động

Cơ chế điều chỉnh độ ưu tiên tự động dựa trên điều kiện hệ thống:

-- Điều chỉnh ưu tiên dựa trên điều kiện hệ thống
function adjust_priority(message)
    local header = decode_header(message)
    local metadata = get_message_metadata(message)
    local current_priority = get_message_priority(message)
    
    -- Kiểm tra các điều kiện để nâng cao ưu tiên
    local new_priority = current_priority
    
    -- 1. Kiểm tra thời gian chờ (aging)
    local current_time = ngx.now() * 1000
    local message_age = current_time - metadata.timestamp
    
    if message_age > 60000 and current_priority > PRIORITY.HIGH then
        -- Message đã chờ hơn 60 giây, nâng ưu tiên lên HIGH
        new_priority = PRIORITY.HIGH
    end
    
    -- 2. Kiểm tra retry count
    if metadata.retry_count > 3 and current_priority > PRIORITY.HIGH then
        -- Đã retry nhiều lần, nâng ưu tiên
        new_priority = PRIORITY.HIGH
    end
    
    -- 3. Kiểm tra system load
    local system_load = get_system_load()
    if system_load < 0.5 and current_priority == PRIORITY.BATCH then
        -- Hệ thống đang nhàn rỗi, có thể nâng ưu tiên cho batch
        new_priority = PRIORITY.LOW
    end
    
    -- Cập nhật ưu tiên nếu có thay đổi
    if new_priority ~= current_priority then
        message = set_message_priority(message, new_priority)
        
        -- Ghi log điều chỉnh ưu tiên
        ngx.log(ngx.INFO, "Adjusted message priority: " .. metadata.message_id .. 
                " from " .. current_priority .. " to " .. new_priority)
    end
    
    return message
end

Xử lý lỗi và phục hồi

Mô hình Circuit Breaker

Triển khai mô hình Circuit Breaker để ngăn ngừa lỗi cascade:

stateDiagram-v2
    [*] --> Closed
    
    state Closed {
        [*] --> Operational
        Operational --> FailureDetected: Error occurs
        FailureDetected --> Operational: Success
        FailureDetected --> ThresholdReached: Failure count >= threshold
        ThresholdReached --> [*]: Open circuit
    }
    
    Closed --> Open: Threshold exceeded
    
    state Open {
        [*] --> Waiting
        Waiting --> Timeout: Reset timeout elapsed
        Timeout --> [*]: Half-open circuit
    }
    
    Open --> HalfOpen: Timeout elapsed
    
    state HalfOpen {
        [*] --> TestRequest
        TestRequest --> Success: Request succeeds
        TestRequest --> Failure: Request fails
        Success --> [*]: Close circuit
        Failure --> [*]: Re-open circuit
    }
    
    HalfOpen --> Closed: Success
    HalfOpen --> Open: Failure
Loading

Mã nguồn triển khai Circuit Breaker:

-- Tạo circuit breaker mới
function create_circuit_breaker(name, threshold, reset_timeout)
    local state = {
        failure_count = 0,
        last_failure_time = 0,
        is_open = false
    }
    
    -- Ghi nhận thất bại
    local function record_failure()
        state.failure_count = state.failure_count + 1
        state.last_failure_time = ngx.now()
        
        if state.failure_count >= threshold then
            state.is_open = true
            ngx.log(ngx.WARN, "Circuit " .. name .. " is now OPEN")
        end
    end
    
    -- Ghi nhận thành công
    local function record_success()
        if not state.is_open then
            -- Giảm dần failure count khi thành công
            if state.failure_count > 0 then
                state.failure_count = state.failure_count - 1
            end
        end
    end
    
    -- Kiểm tra trạng thái circuit
    local function is_open()
        -- Kiểm tra nếu circuit đã mở đủ lâu để thử lại
        if state.is_open and ngx.now() - state.last_failure_time >= reset_timeout then
            state.is_open = false
            state.failure_count = 0
            ngx.log(ngx.INFO, "Circuit " .. name .. " is now HALF-OPEN")
            return false
        end
        
        return state.is_open
    end
    
    -- Sử dụng circuit breaker
    local function execute(func, fallback)
        if is_open() then
            ngx.log(ngx.WARN, "Circuit " .. name .. " is OPEN, using fallback")
            return fallback()
        end
        
        local success, result = pcall(func)
        
        if success then
            record_success()
            return result
        else
            record_failure()
            ngx.log(ngx.ERR, "Circuit " .. name .. " failure: " .. tostring(result))
            return fallback()
        end
    end
    
    return {
        record_failure = record_failure,
        record_success = record_success,
        is_open = is_open,
        execute = execute
    }
end

Cơ chế Backpressure

Triển khai backpressure để đảm bảo hệ thống không bị quá tải:

flowchart TB
    Client([Client]) --> Gateway[API Gateway]
    Gateway --> CheckQueueSize{Check Queue Size}
    CheckQueueSize -->|Queue OK| AcceptRequest[Accept Request]
    CheckQueueSize -->|Queue Full| CheckPriority{Check Priority}
    
    CheckPriority -->|Critical/High| ForceAccept[Force Accept]
    CheckPriority -->|Normal| SlowDown[Rate Limit]
    CheckPriority -->|Low/Batch| Reject[Reject Request]
    
    ForceAccept --> ProcessRequest[Process Request]
    AcceptRequest --> ProcessRequest
    SlowDown --> ProcessRequest
    
    ProcessRequest --> Broker[Message Broker]
    Broker --> Consumer[Consumer]
    Consumer --> CheckConsumerLag{Check Consumer Lag}
    
    CheckConsumerLag -->|Lag OK| Process[Process Message]
    CheckConsumerLag -->|High Lag| AdjustBatchSize[Reduce Batch Size]
    
    AdjustBatchSize --> Process
    Process --> Complete[Complete]
Loading

Mã nguồn triển khai Backpressure:

-- Kiểm tra backpressure
function check_backpressure()
    -- Lấy thống kê hiện tại của các queue
    local stats = task_queue.get_stats()
    
    -- Kiểm tra ngưỡng
    if stats.total > 10000 or 
       stats.critical > 100 or 
       stats.high > 1000 then
        
        -- Áp dụng backpressure
        return true, "System overloaded"
    end
    
    -- Kiểm tra consumer lag
    local kafka_lag = get_kafka_consumer_lag()
    if kafka_lag > 5000 then
        return true, "Consumer lag too high: " .. kafka_lag
    end
    
    return false
end

-- Áp dụng backpressure trong API Gateway
function api_gateway_backpressure()
    local backpressure, reason = check_backpressure()
    
    if backpressure then
        -- Lấy mức ưu tiên từ request
        local priority = tonumber(ngx.req.get_headers()["X-Priority"] or PRIORITY.NORMAL)
        
        if priority <= PRIORITY.HIGH then
            -- Cho phép request ưu tiên cao đi qua nhưng ghi log
            ngx.log(ngx.WARN, "Allowing high priority request despite backpressure")
            return false
        elseif priority == PRIORITY.NORMAL then
            -- Làm chậm request bình thường
            ngx.sleep(0.5)
            return false
        else
            -- Từ chối request ưu tiên thấp
            ngx.status = 429  -- Too Many Requests
            ngx.header["Retry-After"] = "10"
            ngx.say(cjson.encode({
                error = "Service temporarily unavailable due to high load",
                reason = reason,
                retry_after = 10
            }))
            return ngx.exit(429)
        end
    end
    
    -- Không áp dụng backpressure
    return false
end

Chiến lược Retry

Cài đặt chiến lược retry với exponential backoff:

flowchart LR
    Start([Process Message]) --> Process[Process Message]
    Process --> CheckResult{Success?}
    
    CheckResult -->|Yes| Complete([Complete])
    CheckResult -->|No| CheckRetryable{Is Retryable?}
    
    CheckRetryable -->|No| DLQ[Send to Dead Letter Queue]
    CheckRetryable -->|Yes| CheckRetryCount{Retry Count < Max?}
    
    CheckRetryCount -->|No| DLQ
    CheckRetryCount -->|Yes| CalculateBackoff[Calculate Backoff]
    
    CalculateBackoff --> ScheduleRetry[Schedule Retry]
    ScheduleRetry --> RetryTopic[Send to Retry Topic]
    RetryTopic -.->|After delay| Process
    
    DLQ --> Alert[Send Alert]
    DLQ --> Log[Log Failure]
Loading

Mã nguồn triển khai Retry:

-- Tính toán khoảng thời gian backoff
function calculate_backoff(retry_count, base_delay)
    -- Exponential backoff với jitter
    local max_delay = 60000  -- 60 seconds max
    local delay = math.min(max_delay, (base_delay or 100) * math.pow(2, retry_count))
    
    -- Thêm jitter để tránh thundering herd
    local jitter = delay * 0.2  -- 20% jitter
    delay = delay + (math.random() * jitter - jitter/2)
    
    return delay
end

-- Xử lý retry
function handle_retry(message, error_info)
    local metadata = get_message_metadata(message)
    local retry_count = metadata.retry_count or 0
    
    -- Kiểm tra nếu đã vượt quá số lần retry tối đa
    local max_retries = get_max_retries_for_priority(get_message_priority(message))
    
    if retry_count >= max_retries then
        -- Chuyển sang Dead Letter Queue
        send_to_dlq(message, error_info)
        return
    end
    
    -- Tăng retry count
    metadata.retry_count = retry_count + 1
    update_message_metadata(message, metadata)
    
    -- Tính delay dựa trên retry count
    local delay = calculate_backoff(retry_count, 100)
    
    -- Lưu vào Dragonfly để retry sau
    local dragon = require "resty.dragonfly"
    local df = dragon:new()
    df:connect("dragonfly-host", 6379)
    
    -- Sử dụng Sorted Set với score là thời gian retry
    local retry_time = ngx.now() * 1000 + delay
    df:zadd("retry_queue", retry_time, encode_base64(message))
    
    df:set_keepalive(10000, 100)
    
    ngx.log(ngx.INFO, "Scheduled retry for message " .. metadata.message_id .. 
            " in " .. delay .. "ms (retry " .. metadata.retry_count .. ")")
end

-- Worker riêng để xử lý retry queue
function retry_worker()
    local dragon = require "resty.dragonfly"
    
    while true do
        local df = dragon:new()
        df:connect("dragonfly-host", 6379)
        
        -- Lấy các message cần retry (có score <= thời gian hiện tại)
        local current_time = ngx.now() * 1000
        local retry_messages = df:zrangebyscore("retry_queue", 0, current_time, "LIMIT", 0, 10)
        
        if #retry_messages > 0 then
            -- Xử lý các message cần retry
            for _, encoded_message in ipairs(retry_messages) do
                local message = decode_base64(encoded_message)
                
                -- Xóa khỏi retry queue
                df:zrem("retry_queue", encoded_message)
                
                -- Gửi lại message vào Kafka
                local priority = get_message_priority(message)
                local partition = determine_partition(priority)
                send_to_kafka(message, "orders", partition)
                
                ngx.log(ngx.INFO, "Retrying message from retry queue")
            end
        end
        
        df:set_keepalive(10000, 100)
        
        -- Chờ một khoảng thời gian trước khi kiểm tra lại
        ngx.sleep(1)
    end
end

Chiến lược triển khai

Cấu hình Kubernetes

Triển khai hệ thống trên Kubernetes sử dụng StatefulSets cho các thành phần có state và Deployments cho các thành phần stateless:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: order-processor-critical
spec:
  serviceName: "order-processor-critical"
  replicas: 3
  selector:
    matchLabels:
      app: order-processor
      priority: critical
  template:
    metadata:
      labels:
        app: order-processor
        priority: critical
    spec:
      containers:
      - name: order-processor
        image: order-processor:1.0.0
        env:
        - name: CONSUMER_GROUP
          value: "order-processor-critical"
        - name: PARTITIONS
          value: "0,1"  # Chỉ xử lý CRITICAL và HIGH
        - name: MAX_THREADS
          value: "16"
        - name: PRIORITY_LEVEL
          value: "critical"
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        volumeMounts:
        - name: config-volume
          mountPath: /etc/openresty/conf.d
      volumes:
      - name: config-volume
        configMap:
          name: openresty-critical-config
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-processor-normal-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: order-processor-normal
  minReplicas: 5
  maxReplicas: 20
  metrics:
  - type: External
    external:
      metric:
        name: kafka_consumer_lag
        selector:
          matchLabels:
            consumer_group: order-processor-normal
      target:
        type: AverageValue
        averageValue: 1000

Helm chart structure:

trading-system/
├── Chart.yaml
├── values.yaml
├── templates/
│   ├── configmap.yaml
│   ├── deployment.yaml
│   ├── service.yaml
│   ├── statefulset.yaml
│   ├── hpa.yaml
│   ├── ingress.yaml
│   └── serviceaccount.yaml
└── charts/
    ├── kafka/
    ├── dragonfly/
    ├── scylladb/
    └── prometheus/

Cấu hình Nix

Sử dụng Nix để cài đặt và cấu hình môi trường một cách nhất quán và tái sản xuất được:

{ pkgs ? import <nixpkgs> {} }:

let
  # Các gói phụ thuộc
  deps = with pkgs; [
    openresty
    dragonfly
    kafkacat
    scylla
    erlang
    elixir
    rustup
    jdk11
    prometheus
    grafana
    kubectl
    kubernetes-helm
  ];
  
  # Tạo shell script để thiết lập môi trường
  setupScript = pkgs.writeShellScriptBin "setup-trading-system" ''
    # Cài đặt các thành phần cần thiết
    echo "Setting up trading system environment..."
    
    # Cấu hình Kafka
    echo "Configuring Kafka..."
    ${pkgs.kafkacat}/bin/kcat -b localhost:9092 -L
    
    # Cấu hình Dragonfly
    echo "Configuring Dragonfly..."
    ${pkgs.dragonfly}/bin/cli -h localhost -p 6379 ping
    
    # Triển khai lên Kubernetes
    echo "Deploying to Kubernetes..."
    ${pkgs.kubernetes-helm}/bin/helm install trading-system ./helm/trading-system
    
    echo "Setup completed successfully!"
  '';
  
  # Tạo shell script để khởi động hệ thống
  startScript = pkgs.writeShellScriptBin "start-trading-system" ''
    # Khởi động các thành phần cần thiết
    echo "Starting trading system..."
    
    # Khởi động Kafka
    echo "Starting Kafka..."
    
    # Khởi động Dragonfly
    echo "Starting Dragonfly..."
    ${pkgs.dragonfly}/bin/dragonfly --daemonize
    
    # Khởi động ScyllaDB
    echo "Starting ScyllaDB..."
    
    # Khởi động OpenResty
    echo "Starting OpenResty..."
    ${pkgs.openresty}/bin/openresty -p $PWD/openresty -c conf/nginx.conf
    
    echo "Trading system started successfully!"
  '';
  
  # Tạo shell script để dừng hệ thống
  stopScript = pkgs.writeShellScriptBin "stop-trading-system" ''
    # Dừng các thành phần
    echo "Stopping trading system..."
    
    # Dừng OpenResty
    echo "Stopping OpenResty..."
    ${pkgs.openresty}/bin/openresty -p $PWD/openresty -s stop
    
    # Dừng Dragonfly
    echo "Stopping Dragonfly..."
    ${pkgs.dragonfly}/bin/cli -h localhost -p 6379 shutdown
    
    # Dừng ScyllaDB
    echo "Stopping ScyllaDB..."
    
    # Dừng Kafka
    echo "Stopping Kafka..."
    
    echo "Trading system stopped successfully!"
  '';
  
in pkgs.mkShell {
  name = "trading-system-env";
  buildInputs = deps ++ [ setupScript startScript stopScript ];
  
  shellHook = ''
    export KUBECONFIG=~/.kube/config
    export PATH=$PATH:$PWD/bin
    
    echo "Trading system development environment activated."
    echo "Available commands:"
    echo "  setup-trading-system - Set up the trading system environment"
    echo "  start-trading-system - Start all trading system components"
    echo "  stop-trading-system  - Stop all trading system components"
  '';
}

Sử dụng Nix:

# Tạo môi trường phát triển
nix-shell

# Thiết lập hệ thống
setup-trading-system

# Khởi động hệ thống
start-trading-system

# Dừng hệ thống
stop-trading-system

Monitoring và Observability

Triển khai hệ thống monitoring đầy đủ với Prometheus, Grafana, và ELK stack:

flowchart TB
    Services[Services] --> ServiceMetrics[Service Metrics]
    Services --> AppLogs[Application Logs]
    Services --> TraceData[Trace Data]
    
    ServiceMetrics --> Prometheus[Prometheus]
    AppLogs --> Fluent[FluentBit]
    TraceData --> Jaeger[Jaeger]
    
    Prometheus --> AlertManager[Alert Manager]
    Prometheus --> Grafana[Grafana]
    
    Fluent --> Elasticsearch[Elasticsearch]
    Elasticsearch --> Kibana[Kibana]
    
    AlertManager --> Alerting[Alerting\nEmail/Slack/PagerDuty]
    
    Grafana --> Dashboard[Dashboards]
    Kibana --> LogExplorer[Log Explorer]
    Jaeger --> TraceViewer[Trace Explorer]
Loading

Cấu hình Prometheus để giám sát hệ thống:

apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
      
    rule_files:
      - /etc/prometheus/rules/*.rules
      
    alerting:
      alertmanagers:
        - static_configs:
            - targets: ['alertmanager:9093']
      
    scrape_configs:
      - job_name: 'kubernetes-pods'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
            action: replace
            target_label: __metrics_path__
            regex: (.+)
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
            target_label: __address__
          - action: labelmap
            regex: __meta_kubernetes_pod_label_(.+)
          - source_labels: [__meta_kubernetes_namespace]
            action: replace
            target_label: kubernetes_namespace
          - source_labels: [__meta_kubernetes_pod_name]
            action: replace
            target_label: kubernetes_pod_name
      
      - job_name: 'kafka'
        static_configs:
          - targets: ['kafka-exporter:9308']
      
      - job_name: 'dragonfly'
        static_configs:
          - targets: ['dragonfly-exporter:9121']
          
      - job_name: 'openresty'
        static_configs:
          - targets: ['openresty-exporter:9145']

Tối ưu hiệu năng

Các biện pháp tối ưu hiệu năng chính:

  1. Sử dụng Binary Protocol thay vì JSON: Giảm kích thước message 40-60%

  2. Message Batching: Gom nhóm các message có cùng mức ưu tiên

-- Thực hiện batch cho các message có cùng độ ưu tiên
function batch_messages(messages, max_batch_size)
    local batches = {}
    
    -- Nhóm message theo độ ưu tiên
    local priority_groups = {}
    
    for _, message in ipairs(messages) do
        local priority = get_message_priority(message)
        priority_groups[priority] = priority_groups[priority] or {}
        table.insert(priority_groups[priority], message)
    end
    
    -- Tạo batches cho mỗi nhóm ưu tiên
    for priority, msgs in pairs(priority_groups) do
        local batch = {}
        
        for i, msg in ipairs(msgs) do
            table.insert(batch, msg)
            
            if #batch >= max_batch_size or i == #msgs then
                table.insert(batches, {
                    priority = priority,
                    messages = batch
                })
                batch = {}
            end
        end
    end
    
    return batches
end
  1. Zero-copy Buffer Management: Giảm thiểu sao chép dữ liệu không cần thiết
// Zero-copy buffer management trong Rust
fn process_message(buffer: &[u8]) -> Result<(), Error> {
    // Get message header without copying the data
    let header = parse_header(&buffer[0..16])?;
    
    // Get message body using a slice reference
    let body = &buffer[16..header.length as usize];
    
    // Process the message without additional copies
    process_body(body, header.bit_mode)
}
  1. Connection Pooling: Tái sử dụng kết nối đến Dragonfly và ScyllaDB
-- Connection pooling cho Dragonfly
local function get_dragonfly_connection()
    local dragon = require "resty.dragonfly"
    local df = dragon:new()
    
    df:set_timeout(1000)
    
    local ok, err = df:connect("dragonfly-host", 6379)
    if not ok then
        return nil, err
    end
    
    return df
end

-- Sử dụng connection và trả lại pool
local function with_dragonfly(callback)
    local df, err = get_dragonfly_connection()
    if not df then
        return nil, err
    end
    
    local result, err = callback(df)
    
    -- Trả connection về pool
    local ok, err = df:set_keepalive(10000, 100)
    if not ok then
        ngx.log(ngx.ERR, "Failed to set keepalive: ", err)
    end
    
    return result, err
end
  1. Caching Schema Data: Lưu cache schema để tối ưu hiệu năng decode
-- Tạo và sử dụng lru cache cho schema
local lrucache = require "resty.lrucache"
local schema_cache = lrucache.new(200)  -- Lưu trữ tối đa 200 schema

function get_schema(schema_id)
    -- Thử lấy từ cache trước
    local schema = schema_cache:get(schema_id)
    if schema then
        return schema
    end
    
    -- Nếu không có trong cache, lấy từ Dragonfly
    local dragon = require "resty.dragonfly"
    local df = dragon:new()
    df:connect("dragonfly-host", 6379)
    
    local schema_json = df:get("schema:" .. schema_id)
    df:set_keepalive(10000, 100)
    
    if not schema_json then
        return nil, "Schema not found"
    end
    
    -- Parse schema và lưu vào cache
    local schema = cjson.decode(schema_json)
    schema_cache:set(schema_id, schema, 3600)  -- Cache 1 giờ
    
    return schema
end
  1. Kernel Tuning: Tối ưu các tham số hệ thống
# Tăng số lượng file descriptors
sysctl -w fs.file-max=2097152

# Tăng giới hạn cho ephemeral ports
sysctl -w net.ipv4.ip_local_port_range="1024 65535"

# Tăng backlog cho TCP connections
sysctl -w net.core.somaxconn=65536
sysctl -w net.ipv4.tcp_max_syn_backlog=8192

# Tăng kích thước window cho TCP
sysctl -w net.ipv4.tcp_rmem="4096 87380 16777216"
sysctl -w net.ipv4.tcp_wmem="4096 65536 16777216"

# Enable TCP BBR congestion control
sysctl -w net.core.default_qdisc=fq
sysctl -w net.ipv4.tcp_congestion_control=bbr

Kết luận và khuyến nghị

Ưu điểm của thiết kế

  1. Hiệu năng cao: Kết hợp OpenResty, Kafka, Dragonfly và định dạng binary message mang lại hiệu suất xử lý cao.

  2. Độ tin cậy: Cơ chế đa lớp đảm bảo tính toàn vẹn message và khả năng phục hồi.

  3. Khả năng mở rộng: Thiết kế cho phép mở rộng dễ dàng để đáp ứng tải cao.

  4. Quản lý ưu tiên thông minh: Phân tách luồng xử lý theo mức độ ưu tiên giúp tối ưu tài nguyên.

  5. Tính linh hoạt: Hỗ trợ đa dạng bit mode và schema versioning giúp thích ứng với các yêu cầu khác nhau.

  6. Observability: Hệ thống giám sát và cảnh báo đầy đủ giúp phát hiện và xử lý sự cố kịp thời.

Khuyến nghị triển khai

  1. Triển khai từng giai đoạn:

    • Giai đoạn 1: Triển khai hạ tầng cơ bản (Kafka, Dragonfly, ScyllaDB)
    • Giai đoạn 2: Triển khai API Gateway và Message Formatter
    • Giai đoạn 3: Triển khai Consumer Groups và Thread Pools
    • Giai đoạn 4: Triển khai Error Handling và Recovery
    • Giai đoạn 5: Triển khai Monitoring và Alerting
  2. Kiểm thử tải: Thực hiện kiểm thử tải trước khi triển khai production để xác định các giới hạn và điểm nghẽn.

  3. Triển khai Multi-region: Xem xét triển khai hệ thống trên nhiều region để đảm bảo tính sẵn sàng cao.

  4. Tài liệu hóa: Tạo tài liệu chi tiết cho hệ thống, bao gồm kiến trúc, quy trình triển khai, và quy trình vận hành.

  5. Đào tạo: Đào tạo đội ngũ vận hành về cách giám sát và xử lý sự cố hệ thống.

Với thiết kế được đề xuất, hệ thống xử lý message sẽ đáp ứng tốt các yêu cầu về hiệu năng, độ tin cậy, và khả năng mở rộng cho hệ thống giao dịch chứng khoán.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment