// JennyLab Research! ( https://www.jennylab.net )
// ========================================================================================= //
// == SEASTAR HIGH-THROUGHPUT PATTERN GENERATION & DETECTION ENGINE == //
// == (No Device Interaction, Focus on Core Logic) == //
// ========================================================================================= //
// NOTE: Focus on data generation/detection. No SDR/USB. Detection logic is a placeholder. //
// Requires Seastar, Boost.Program_Options, Nlohmann JSON. //
// ========================================================================================= //
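// Example build (illustrative; exact flags depend on the local install, assuming Seastar's   //
// pkg-config file is discoverable and json.hpp is on the include path):                      //
//   g++ -std=c++20 pattern_engine.cc $(pkg-config --cflags --libs seastar) -o pattern_engine //
// ========================================================================================= //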
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <deque>
#include <optional>
#include <memory>
#include <algorithm>
#include <iomanip>
#include <variant>
#include <random> // For MockDataSource
#include <charconv> // For hex parsing
#include <limits> // For std::numeric_limits
#include <functional> // For std::hash
// Seastar Headers
#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/print.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/loop.hh> // seastar::repeat
#include <seastar/core/when_all.hh> // seastar::when_all
#include <seastar/core/coroutine.hh> // The app callback in main() is a coroutine (co_await/co_return)
// Nlohmann JSON
#include "json.hpp" // Assumes json.hpp is accessible
using json = nlohmann::json;
// ========================================================================================= //
// == CORE DATA TYPES == //
// ========================================================================================= //
using LogicalPattern = std::vector<uint8_t>;
using FormattedDataPacket = std::vector<uint8_t>;
using DataChunk = std::vector<uint8_t>;
namespace Utility {
void print_byte_vector(const std::vector<uint8_t>& vec, const std::string& prefix = "", size_t max_len = 32) {
seastar::print("%s", prefix);
for (size_t i = 0; i < std::min(vec.size(), max_len); ++i) {
seastar::print("%02x%s", vec[i], (i == std::min(vec.size(), max_len) - 1) ? "" : " ");
}
if (vec.size() > max_len) {
seastar::print("... (%lu bytes total)", vec.size());
}
seastar::print("\n");
}
std::vector<uint8_t> hex_string_to_bytes(const std::string& hex) {
std::vector<uint8_t> bytes;
if (hex.length() % 2 != 0) {
seastar::print("Warning: Hex string has odd length: %s\n", hex);
// Optionally throw, or ignore last char, or prepend 0. Here, just return empty.
return bytes;
}
for (size_t i = 0; i < hex.length(); i += 2) {
uint8_t byte;
std::string byte_str = hex.substr(i, 2);
auto [ptr, ec] = std::from_chars(byte_str.data(), byte_str.data() + 2, byte, 16);
if (ec == std::errc()) {
bytes.push_back(byte);
} else {
seastar::print("Warning: Invalid hex byte '%s' in string '%s'\n", byte_str, hex);
// Optionally throw or skip.
}
}
return bytes;
}
}
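// Example (for illustration): Utility::hex_string_to_bytes("1A2B3C") yields {0x1A, 0x2B, 0x3C};
// an odd-length string returns an empty vector and an invalid byte pair is skipped, each with a warning.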
// ========================================================================================= //
// == CONFIGURATION (GENERATION & DETECTION) == //
// ========================================================================================= //
enum class LogicalPatternType {
STATIC_LIST,
SEQUENTIAL_BYTES,
RANDOM_BYTES
};
struct LogicalPatternConfig {
std::string name = "default_logical_gen";
LogicalPatternType type = LogicalPatternType::SEQUENTIAL_BYTES;
std::vector<LogicalPattern> static_patterns_data; // Parsed from hex strings
uint64_t seq_start_val = 0;
uint64_t seq_end_val = 255;
int seq_length_bytes = 1;
int random_min_len = 8;
int random_max_len = 64;
uint64_t limit_generated_patterns = 0; // 0 for unlimited (until provider specific end)
};
struct ProtocolFormattingRule {
std::string name = "default_formatter";
std::vector<uint8_t> header_data; // Parsed from hex string
std::vector<uint8_t> footer_data; // Parsed from hex string
bool add_mock_checksum = false;
};
enum class DetectionConditionType {
EXACT_SEQUENCE,
CONTAINS_SUBSEQUENCE,
// REGEX_MATCH // Requires regex library
};
struct DetectionRuleConfig {
std::string name = "default_detection_rule";
DetectionConditionType condition_type = DetectionConditionType::EXACT_SEQUENCE;
std::vector<uint8_t> byte_sequence_to_detect_data; // Parsed from hex string
// std::string regex_to_detect;
std::string action_on_detect = "log";
};
class AppConfig {
public:
std::vector<LogicalPatternConfig> logical_pattern_configs;
std::vector<ProtocolFormattingRule> protocol_formatting_rules;
std::vector<DetectionRuleConfig> detection_rules_configs;
int generators_parallelism = 1;
int detectors_parallelism = seastar::smp::count;
size_t data_source_chunk_size = 4096;
uint64_t data_source_max_chunks_per_detector = 10000; // Limit for mock data source
static AppConfig load_from_json_string(const std::string& json_str) {
AppConfig cfg;
json j;
try {
j = json::parse(json_str);
if (j.contains("logical_pattern_generators")) {
for (const auto& gen_j : j["logical_pattern_generators"]) {
LogicalPatternConfig lpc;
lpc.name = gen_j.value("name", "unnamed_logical_gen");
std::string type_str = gen_j.value("type", "sequential_bytes");
if (type_str == "static_list") {
lpc.type = LogicalPatternType::STATIC_LIST;
if (gen_j.contains("static_patterns_hex")) {
for(const auto& p_hex : gen_j["static_patterns_hex"]) {
lpc.static_patterns_data.push_back(Utility::hex_string_to_bytes(p_hex.get<std::string>()));
}
}
} else if (type_str == "sequential_bytes") {
lpc.type = LogicalPatternType::SEQUENTIAL_BYTES;
lpc.seq_start_val = gen_j.value("seq_start_val", 0);
lpc.seq_end_val = gen_j.value("seq_end_val", 255);
lpc.seq_length_bytes = gen_j.value("seq_length_bytes", 1);
} else if (type_str == "random_bytes") {
lpc.type = LogicalPatternType::RANDOM_BYTES;
lpc.random_min_len = gen_j.value("random_min_len", 8);
lpc.random_max_len = gen_j.value("random_max_len", 64);
}
lpc.limit_generated_patterns = gen_j.value("limit_generated_patterns", 0);
cfg.logical_pattern_configs.push_back(lpc);
}
}
if (j.contains("protocol_formatters")) {
for (const auto& fmt_j : j["protocol_formatters"]) {
ProtocolFormattingRule pfr;
pfr.name = fmt_j.value("name", "unnamed_formatter");
pfr.header_data = Utility::hex_string_to_bytes(fmt_j.value("header_hex", ""));
pfr.footer_data = Utility::hex_string_to_bytes(fmt_j.value("footer_hex", ""));
pfr.add_mock_checksum = fmt_j.value("add_mock_checksum", false);
cfg.protocol_formatting_rules.push_back(pfr);
}
}
if (j.contains("detection_rules")) {
for (const auto& dr_j : j["detection_rules"]) {
DetectionRuleConfig drc;
drc.name = dr_j.value("name", "unnamed_detection_rule");
std::string cond_type_str = dr_j.value("condition_type", "exact_sequence");
if (cond_type_str == "exact_sequence") drc.condition_type = DetectionConditionType::EXACT_SEQUENCE;
else if (cond_type_str == "contains_subsequence") drc.condition_type = DetectionConditionType::CONTAINS_SUBSEQUENCE;
drc.byte_sequence_to_detect_data = Utility::hex_string_to_bytes(dr_j.value("byte_sequence_hex", ""));
drc.action_on_detect = dr_j.value("action_on_detect", "log");
cfg.detection_rules_configs.push_back(drc);
}
}
cfg.generators_parallelism = j.value("generators_parallelism", 1);
cfg.detectors_parallelism = j.value("detectors_parallelism", (int)seastar::smp::count);
cfg.data_source_chunk_size = j.value("data_source_chunk_size", 4096);
cfg.data_source_max_chunks_per_detector = j.value("data_source_max_chunks_per_detector", 10000);
} catch (json::exception& e) {
seastar::print("Error parsing AppConfig JSON: %s\n", e.what());
throw;
}
seastar::print("AppConfig loaded. Logical Gens: %d, Formatters: %d, Detection Rules: %d\n",
cfg.logical_pattern_configs.size(), cfg.protocol_formatting_rules.size(), cfg.detection_rules_configs.size());
return cfg;
}
};
// ========================================================================================= //
// == PATTERN GENERATION MODULE == //
// ========================================================================================= //
class ILogicalPatternProvider {
public:
virtual ~ILogicalPatternProvider() = default;
virtual seastar::future<std::optional<LogicalPattern>> next_pattern() = 0;
virtual std::string get_name() const = 0;
virtual void reset() = 0;
};
class FlexibleLogicalPatternProvider : public ILogicalPatternProvider {
LogicalPatternConfig _config;
uint64_t _current_seq_val;
size_t _current_static_idx;
std::mt19937_64 _rng;
uint64_t _generated_count = 0;
public:
FlexibleLogicalPatternProvider(const LogicalPatternConfig& cfg)
: _config(cfg), _current_seq_val(cfg.seq_start_val), _current_static_idx(0) {
std::random_device rd;
_rng.seed(rd() + seastar::this_shard_id());
}
std::string get_name() const override { return _config.name; }
void reset() override {
_current_seq_val = _config.seq_start_val;
_current_static_idx = 0;
_generated_count = 0;
}
seastar::future<std::optional<LogicalPattern>> next_pattern() override {
if (_config.limit_generated_patterns > 0 && _generated_count >= _config.limit_generated_patterns) {
return seastar::make_ready_future<std::optional<LogicalPattern>>(std::nullopt);
}
LogicalPattern p;
bool pattern_generated = false;
switch(_config.type) {
case LogicalPatternType::SEQUENTIAL_BYTES:
if (_current_seq_val <= _config.seq_end_val) {
p.resize(_config.seq_length_bytes);
uint64_t temp = _current_seq_val;
for (int i = _config.seq_length_bytes - 1; i >= 0; --i) {
p[i] = static_cast<uint8_t>(temp & 0xFF);
temp >>= 8;
}
_current_seq_val++;
pattern_generated = true;
}
break;
case LogicalPatternType::RANDOM_BYTES: {
size_t len = std::uniform_int_distribution<size_t>(_config.random_min_len, _config.random_max_len)(_rng);
p.resize(len);
for(size_t i=0; i<len; ++i) p[i] = static_cast<uint8_t>(_rng() % 256);
pattern_generated = true;
}
break;
case LogicalPatternType::STATIC_LIST:
if (_current_static_idx < _config.static_patterns_data.size()) {
p = _config.static_patterns_data[_current_static_idx];
_current_static_idx++;
pattern_generated = true;
}
break;
}
if (pattern_generated) {
_generated_count++;
return seastar::make_ready_future<std::optional<LogicalPattern>>(std::move(p));
}
return seastar::make_ready_future<std::optional<LogicalPattern>>(std::nullopt);
}
};
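// Worked example of the sequential branch above: with seq_start_val = 0, seq_end_val = 255 and
// seq_length_bytes = 2 (as in the default config), successive calls yield {0x00,0x00}, {0x00,0x01},
// ..., {0x00,0xFF} (the counter is packed big-endian into seq_length_bytes bytes), then std::nullopt.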
class ProtocolFormatter {
ProtocolFormattingRule _rule;
public:
ProtocolFormatter(const ProtocolFormattingRule& rule) : _rule(rule) {}
const std::string& get_name() const { return _rule.name; }
FormattedDataPacket format(const LogicalPattern& pattern) const {
FormattedDataPacket packet;
packet.reserve(_rule.header_data.size() + pattern.size() + _rule.footer_data.size() + (_rule.add_mock_checksum ? 1 : 0));
packet.insert(packet.end(), _rule.header_data.begin(), _rule.header_data.end());
packet.insert(packet.end(), pattern.begin(), pattern.end());
if (_rule.add_mock_checksum && !pattern.empty()) {
uint8_t checksum = 0;
for (uint8_t byte : pattern) checksum ^= byte; // Simple XOR checksum
packet.push_back(checksum);
}
packet.insert(packet.end(), _rule.footer_data.begin(), _rule.footer_data.end());
return packet;
}
};
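// Worked example: with the default "simple_header_proto" rule below (header 1A2B3C, footer DDEE,
// mock checksum enabled), formatting the logical pattern {0x00, 0x01} produces
// 1A 2B 3C 00 01 01 DD EE, i.e. header, payload, XOR checksum (0x00 ^ 0x01 = 0x01), footer.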
class GenerationOrchestrator {
std::unique_ptr<ILogicalPatternProvider> _logical_gen;
ProtocolFormatter _formatter;
uint64_t _packets_generated_total = 0;
seastar::gate _gate;
seastar::metrics::metric_groups _metrics; // Owns this orchestrator's metric registrations
public:
GenerationOrchestrator(std::unique_ptr<ILogicalPatternProvider> lg, ProtocolFormatter pf)
: _logical_gen(std::move(lg)), _formatter(std::move(pf)) {}
seastar::future<> register_metrics(const std::string& id_prefix) {
namespace sm = seastar::metrics;
// The counter reads _packets_generated_total directly; no separate metric object is needed.
_metrics.add_group(seastar::sprint("generator_%s_%s", _logical_gen->get_name(), _formatter.get_name()), {
sm::make_counter(id_prefix + "packets_generated_total", _packets_generated_total,
sm::description("Total packets generated by this orchestrator"))
});
return seastar::make_ready_future<>();
}
seastar::future<uint64_t> run_generation_loop(uint64_t num_packets_to_generate_if_limited) {
return seastar::with_gate(_gate, [this, num_packets_to_generate_if_limited] {
uint64_t limit = num_packets_to_generate_if_limited > 0 ?
num_packets_to_generate_if_limited :
std::numeric_limits<uint64_t>::max();
return seastar::repeat([this, limit]() -> seastar::future<seastar::stop_iteration> {
if (_packets_generated_total >= limit || _gate.is_closed()) {
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes);
}
return _logical_gen->next_pattern().then([this](std::optional<LogicalPattern> opt_lp) {
if (!opt_lp) {
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes);
}
FormattedDataPacket packet = _formatter.format(*opt_lp);
_packets_generated_total++;
if (_packets_generated_total % 500000 == 0) { // Log less frequently
seastar::print("Gen (%s/%s) Shard %u: %lu packets generated.\n",
_logical_gen->get_name(), _formatter.get_name(), seastar::this_shard_id(), _packets_generated_total);
}
// In a real system, packet would be pushed to a queue or consumer
// For now, it's just generated.
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::no);
});
}).then([this] {
seastar::print("Gen (%s/%s) Shard %u: Loop finished. Total generated: %lu\n",
_logical_gen->get_name(), _formatter.get_name(), seastar::this_shard_id(), _packets_generated_total);
return seastar::make_ready_future<uint64_t>(_packets_generated_total);
});
});
}
seastar::future<> stop() { return _gate.close(); }
};
// ========================================================================================= //
// == PATTERN DETECTION MODULE == //
// ========================================================================================= //
class IDataSource {
public:
virtual ~IDataSource() = default;
virtual seastar::future<std::optional<DataChunk>> get_chunk() = 0;
virtual std::string get_name() const = 0;
};
class MockDataSource : public IDataSource {
std::string _name;
size_t _chunk_size;
uint64_t _chunks_generated = 0;
uint64_t _max_chunks;
std::mt19937_64 _rng;
std::vector<FormattedDataPacket> _inject_packets;
size_t _inject_idx = 0;
bool _cycled_injections = false;
public:
MockDataSource(std::string name, size_t chunk_size, uint64_t max_chunks, std::vector<FormattedDataPacket> injections = {})
: _name(name), _chunk_size(chunk_size), _max_chunks(max_chunks), _inject_packets(std::move(injections)) {
std::random_device rd;
_rng.seed(rd() + seastar::this_shard_id() + std::hash<std::string>{}(name));
}
std::string get_name() const override { return _name; }
seastar::future<std::optional<DataChunk>> get_chunk() override {
if (_chunks_generated >= _max_chunks) {
return seastar::make_ready_future<std::optional<DataChunk>>(std::nullopt);
}
DataChunk chunk;
chunk.reserve(_chunk_size);
// Periodically prepend a known formatted packet so the detection rules have something to find
if (!_inject_packets.empty() && (_chunks_generated % 7 == 0 || !_cycled_injections)) { // Every chunk at start, then every 7th
const auto& packet_to_inject = _inject_packets[_inject_idx];
chunk.insert(chunk.end(), packet_to_inject.begin(), packet_to_inject.end());
_inject_idx++;
if (_inject_idx >= _inject_packets.size()) { // Cycled through all injections at least once; wrap so injection keeps happening
_inject_idx = 0;
_cycled_injections = true;
}
}
while(chunk.size() < _chunk_size) {
chunk.push_back(static_cast<uint8_t>(_rng() % 256));
}
if(chunk.size() > _chunk_size) chunk.resize(_chunk_size); // Ensure exact chunk size
_chunks_generated++;
return seastar::make_ready_future<std::optional<DataChunk>>(std::move(chunk));
}
};
class PreparedDetectionRule {
DetectionRuleConfig _config;
// For CONTAINS_SUBSEQUENCE, pre-calculate KMP next table or use Boyer-Moore for single pattern.
// For multiple patterns, Aho-Corasick automaton would be built here.
public:
PreparedDetectionRule(const DetectionRuleConfig& cfg) : _config(cfg) {}
const std::string& get_name() const { return _config.name; }
bool check(const DataChunk& chunk) const {
// Placeholder for advanced detection algorithms.
// This simple std::search is not "high-performance" for "many requests".
if (_config.byte_sequence_to_detect_data.empty()) return false;
if (_config.condition_type == DetectionConditionType::EXACT_SEQUENCE) {
return chunk == _config.byte_sequence_to_detect_data;
} else if (_config.condition_type == DetectionConditionType::CONTAINS_SUBSEQUENCE) {
if (chunk.size() < _config.byte_sequence_to_detect_data.size()) return false;
auto it = std::search(chunk.begin(), chunk.end(),
_config.byte_sequence_to_detect_data.begin(),
_config.byte_sequence_to_detect_data.end());
return it != chunk.end();
}
return false;
}
const std::string& get_action() const { return _config.action_on_detect; }
};
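// Hedged sketch (not wired into the engine): the comment in PreparedDetectionRule alludes to KMP /
// Boyer-Moore / Aho-Corasick. A rule could precompute a KMP failure table once and reuse it in
// check(), making CONTAINS_SUBSEQUENCE O(chunk + pattern). Helper names below are illustrative only.
inline std::vector<size_t> kmp_build_failure_table(const std::vector<uint8_t>& needle) {
std::vector<size_t> table(needle.size(), 0);
for (size_t i = 1, k = 0; i < needle.size(); ++i) {
while (k > 0 && needle[i] != needle[k]) k = table[k - 1]; // Fall back on mismatch
if (needle[i] == needle[k]) ++k;
table[i] = k; // Longest proper prefix of needle[0..i] that is also its suffix
}
return table;
}
inline bool kmp_contains(const DataChunk& haystack, const std::vector<uint8_t>& needle,
const std::vector<size_t>& table) {
if (needle.empty() || haystack.size() < needle.size()) return false;
for (size_t i = 0, k = 0; i < haystack.size(); ++i) {
while (k > 0 && haystack[i] != needle[k]) k = table[k - 1];
if (haystack[i] == needle[k]) ++k;
if (k == needle.size()) return true; // Full pattern matched ending at position i
}
return false;
}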
class DetectionEngine {
std::string _id;
std::unique_ptr<IDataSource> _data_source;
std::vector<PreparedDetectionRule> _rules;
uint64_t _chunks_processed = 0;
uint64_t _detections_made = 0;
seastar::gate _gate;
seastar::metrics::metric_groups _metrics; // Owns this engine's metric registrations
public:
DetectionEngine(std::string id, std::unique_ptr<IDataSource> ds, const std::vector<DetectionRuleConfig>& rule_cfgs)
: _id(std::move(id)), _data_source(std::move(ds)) {
for (const auto& r_cfg : rule_cfgs) {
_rules.emplace_back(r_cfg);
}
}
seastar::future<> register_metrics(const std::string& id_prefix) {
namespace sm = seastar::metrics;
// Counters read the running totals directly; no separate metric objects are needed.
_metrics.add_group(seastar::sprint("detector_%s", _id), {
sm::make_counter(id_prefix + "chunks_processed_total", _chunks_processed,
sm::description("Total data chunks processed by this engine")),
sm::make_counter(id_prefix + "detections_made_total", _detections_made,
sm::description("Total detections made by this engine"))
});
return seastar::make_ready_future<>();
}
seastar::future<uint64_t> run_detection_loop() {
seastar::print("DetectionEngine (%s) Shard %u: Starting detection. Rules: %d, Source: '%s'.\n",
_id, seastar::this_shard_id(), _rules.size(), _data_source->get_name());
return seastar::with_gate(_gate, [this] {
return seastar::repeat([this]() -> seastar::future<seastar::stop_iteration> {
if (_gate.is_closed()) {
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes);
}
return _data_source->get_chunk().then([this](std::optional<DataChunk> opt_chunk) {
if (!opt_chunk) {
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes);
}
DataChunk chunk = std::move(*opt_chunk);
_chunks_processed++;
for (const auto& rule : _rules) {
if (rule.check(chunk)) {
_detections_made++;
seastar::print("!!! DETECTION (%s) Shard %u: Rule '%s'. Action: '%s'. Total Detections: %lu !!!\n",
_id, seastar::this_shard_id(), rule.get_name(), rule.get_action(), _detections_made);
}
}
if (_chunks_processed % 2000 == 0) { // Log less frequently
seastar::print("DetectionEngine (%s) Shard %u: Chunks: %lu. Detections: %lu\n",
_id, seastar::this_shard_id(), _chunks_processed, _detections_made);
}
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::no);
});
}).then([this] {
seastar::print("DetectionEngine (%s) Shard %u: Loop finished. Total Chunks: %lu, Total Detections: %lu\n",
_id, seastar::this_shard_id(), _chunks_processed, _detections_made);
return seastar::make_ready_future<uint64_t>(_detections_made);
});
});
}
seastar::future<> stop() { return _gate.close(); }
};
// ========================================================================================= //
// == MAIN APP == //
// ========================================================================================= //
const std::string DEFAULT_CONFIG_JSON = R"({
"logical_pattern_generators": [
{
"name": "payload_0_to_ff",
"type": "sequential_bytes",
"seq_start_val": 0,
"seq_end_val": 255,
"seq_length_bytes": 2,
"limit_generated_patterns": 256
},
{
"name": "some_random_data",
"type": "random_bytes",
"random_min_len": 8,
"random_max_len": 16,
"limit_generated_patterns": 100
}
],
"protocol_formatters": [
{
"name": "simple_header_proto",
"header_hex": "1A2B3C",
"footer_hex": "DDEE",
"add_mock_checksum": true
}
],
"detection_rules": [
{
"name": "find_header_1A2B3C",
"condition_type": "CONTAINS_SUBSEQUENCE",
"byte_sequence_hex": "1A2B3C",
"action_on_detect": "log_header_1A2B3C_detected"
},
{
"name": "find_payload_0001_in_header_proto",
"condition_type": "CONTAINS_SUBSEQUENCE",
"byte_sequence_hex": "1A2B3C0001",
"action_on_detect": "alert_critical_payload_0001"
}
],
"generators_parallelism": 0,
"detectors_parallelism": 2,
"data_source_chunk_size": 1024,
"data_source_max_chunks_per_detector": 50000
})";
int main(int argc, char** argv) {
seastar::app_template app;
app.add_options()
("config-json", boost::program_options::value<std::string>()->default_value(DEFAULT_CONFIG_JSON), "JSON string for configuration")
("run-generators", boost::program_options::value<bool>()->default_value(false), "Enable data generation loop (for benchmarking/load)")
("generator-pattern-limit", boost::program_options::value<uint64_t>()->default_value(1000000), "Limit for #patterns from active generators");
return app.run(argc, argv, [&]() -> seastar::future<> {
seastar::print("High-Performance Pattern Engine (Seastar) - START\n");
auto& conf = app.configuration();
AppConfig config = AppConfig::load_from_json_string(conf["config-json"].as<std::string>());
bool run_generators_flag = conf["run-generators"].as<bool>();
uint64_t generator_pattern_limit_flag = conf["generator-pattern-limit"].as<uint64_t>();
std::vector<seastar::future<>> all_tasks_futures;
// --- Prepare data for injection (generated once) ---
std::vector<FormattedDataPacket> generated_packets_for_injection;
if (!config.logical_pattern_configs.empty() && !config.protocol_formatting_rules.empty()) {
auto lg_cfg_for_injection = config.logical_pattern_configs[0]; // Use first generator config
lg_cfg_for_injection.limit_generated_patterns = 20; // Limit to 20 for injection
auto formatter_rule_for_injection = config.protocol_formatting_rules[0]; // Use first formatter
auto temp_lg_provider = std::make_unique<FlexibleLogicalPatternProvider>(lg_cfg_for_injection);
ProtocolFormatter temp_formatter(formatter_rule_for_injection);
seastar::print("Generating packets for detector injection...\n");
while(true) {
auto opt_lp = co_await temp_lg_provider->next_pattern();
if (opt_lp) {
generated_packets_for_injection.push_back(temp_formatter.format(*opt_lp));
} else break;
}
seastar::print("Generated %d packets for injection into MockDataSources.\n", generated_packets_for_injection.size());
if (!generated_packets_for_injection.empty()) {
Utility::print_byte_vector(generated_packets_for_injection[0], " Example Injection Packet: ");
}
}
// --- Initialize and Run Detection Engines ---
int num_detectors = std::min((int)seastar::smp::count, config.detectors_parallelism);
num_detectors = std::max(1, num_detectors);
std::vector<seastar::shared_ptr<DetectionEngine>> detection_engines;
for (int i = 0; i < num_detectors; ++i) {
auto ds_name = seastar::sprint("mock_ds_%d", i);
auto data_source = std::make_unique<MockDataSource>(
ds_name,
config.data_source_chunk_size,
config.data_source_max_chunks_per_detector,
generated_packets_for_injection
);
auto engine_id = seastar::sprint("engine_%d_shard%u", i, seastar::this_shard_id()); // Note: this_shard_id() here is main shard
auto engine = seastar::make_shared<DetectionEngine>(engine_id, std::move(data_source), config.detection_rules_configs);
detection_engines.push_back(engine);
}
for (unsigned i = 0; i < detection_engines.size(); ++i) {
auto engine = detection_engines[i];
unsigned target_shard = i % seastar::smp::count; // Distribute engines round-robin (note: engines are built on shard 0 and seastar::shared_ptr is shard-local, so a production version would construct each engine on its target shard)
all_tasks_futures.push_back(
seastar::smp::submit_to(target_shard, [engine, target_shard]() {
return engine->register_metrics(seastar::sprint("s%u_", target_shard))
.then([engine] { return engine->run_detection_loop(); })
.finally([engine] { return engine->stop(); });
}).then_wrapped([](auto&& f){
try { f.get(); } catch (const std::exception& e) { seastar::print("Detection task failed: %s\n", e.what());}
return seastar::make_ready_future<>(); // Ensure outer future completes
}) // Add error handling for the task itself
);
}
seastar::print("Launched %d Detection Engines.\n", detection_engines.size());
// --- Initialize and Run Generation Orchestrators (Optional) ---
if (run_generators_flag && !config.logical_pattern_configs.empty() && !config.protocol_formatting_rules.empty()) {
int num_generators = std::min((int)seastar::smp::count, config.generators_parallelism);
num_generators = std::max(1, num_generators);
std::vector<seastar::shared_ptr<GenerationOrchestrator>> gen_orchestrators;
for (int i = 0; i < num_generators; ++i) {
// Cycle through logical pattern configs and formatters for variety if multiple generators run
const auto& lg_cfg = config.logical_pattern_configs[i % config.logical_pattern_configs.size()];
const auto& fmt_rule = config.protocol_formatting_rules[i % config.protocol_formatting_rules.size()];
auto logical_gen = std::make_unique<FlexibleLogicalPatternProvider>(lg_cfg);
ProtocolFormatter formatter(fmt_rule);
auto orchestrator_id = seastar::sprint("gen_orch_%d_shard%u", i, seastar::this_shard_id());
auto orchestrator = seastar::make_shared<GenerationOrchestrator>(std::move(logical_gen), formatter);
gen_orchestrators.push_back(orchestrator);
}
for (unsigned i = 0; i < gen_orchestrators.size(); ++i) {
auto orchestrator = gen_orchestrators[i];
unsigned target_shard = (detection_engines.size() + i) % seastar::smp::count; // Try to put on different shards from detectors
all_tasks_futures.push_back(
seastar::smp::submit_to(target_shard, [orchestrator, generator_pattern_limit_flag, target_shard]() {
return orchestrator->register_metrics(seastar::sprint("s%u_", target_shard))
.then([orchestrator, generator_pattern_limit_flag]{ return orchestrator->run_generation_loop(generator_pattern_limit_flag);})
.finally([orchestrator] { return orchestrator->stop(); });
}).then_wrapped([](auto&& f){
try { f.get(); } catch (const std::exception& e) { seastar::print("Generation task failed: %s\n", e.what());}
return seastar::make_ready_future<>();
})
);
}
seastar::print("Launched %d Generation Orchestrators.\n", gen_orchestrators.size());
}
// Wait for all tasks (detection and generation) to complete
co_await seastar::when_all(all_tasks_futures.begin(), all_tasks_futures.end())
.then_wrapped([](auto&& f) { // Use then_wrapped to catch exceptions from when_all itself
try {
f.get(); // when_all itself should not fail; per-task errors were already reported above
seastar::print("All tasks completed.\n");
} catch (const std::exception& e) {
seastar::print("Error during when_all: %s\n", e.what());
}
});
seastar::print("High-Performance Pattern Engine (Seastar) - END\n");
co_return;
});
}