Last active
June 5, 2025 18:59
-
-
Save h0ffy/595ef5de76c2735a1895efbdd120efb9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// JennyLab Research! ( https://www.jennylab.net )
// ========================================================================================= //
// == SEASTAR HIGH-THROUGHPUT PATTERN GENERATION & DETECTION ENGINE == //
// == (No Device Interaction, Focus on Core Logic) == //
// ========================================================================================= //
// NOTE: Focus on data generation/detection. No SDR/USB. Detection logic is a placeholder. //
// Requires Seastar, Boost.Program_Options, Nlohmann JSON. //
// ========================================================================================= //
#include <iostream> | |
#include <fstream> | |
#include <string> | |
#include <vector> | |
#include <deque> | |
#include <optional> | |
#include <memory> | |
#include <algorithm> | |
#include <iomanip> | |
#include <variant> | |
#include <random> // For MockDataSource | |
#include <charconv> // For hex parsing | |
// Seastar Headers | |
#include <seastar/core/app-template.hh> | |
#include <seastar/core/reactor.hh> | |
#include <seastar/core/future.hh> | |
#include <seastar/core/sleep.hh> | |
#include <seastar/core/shared_ptr.hh> | |
#include <seastar/core/smp.hh> | |
#include <seastar/core/print.hh> | |
#include <seastar/core/gate.hh> | |
#include <seastar/core/circular_buffer.hh> | |
#include <seastar/core/metrics.hh> | |
// Nlohmann JSON | |
#include "json.hpp" // Assumes json.hpp is accessible | |
using json = nlohmann::json; | |
// ========================================================================================= // | |
// == CORE DATA TYPES == // | |
// ========================================================================================= // | |
// Raw payload bytes produced by a logical pattern provider, before any protocol framing.
using LogicalPattern = std::vector<uint8_t>;
// A logical pattern after a ProtocolFormatter has wrapped it (header / checksum / footer).
using FormattedDataPacket = std::vector<uint8_t>;
// One buffer of bytes handed from a data source to a detection engine.
using DataChunk = std::vector<uint8_t>;
namespace Utility { | |
void print_byte_vector(const std::vector<uint8_t>& vec, const std::string& prefix = "", size_t max_len = 32) { | |
seastar::print("%s", prefix); | |
for (size_t i = 0; i < std::min(vec.size(), max_len); ++i) { | |
seastar::print("%02x%s", vec[i], (i == std::min(vec.size(), max_len) - 1) ? "" : " "); | |
} | |
if (vec.size() > max_len) { | |
seastar::print("... (%lu bytes total)", vec.size()); | |
} | |
seastar::print("\n"); | |
} | |
std::vector<uint8_t> hex_string_to_bytes(const std::string& hex) { | |
std::vector<uint8_t> bytes; | |
if (hex.length() % 2 != 0) { | |
seastar::print("Warning: Hex string has odd length: %s\n", hex); | |
// Optionally throw, or ignore last char, or prepend 0. Here, just return empty. | |
return bytes; | |
} | |
for (size_t i = 0; i < hex.length(); i += 2) { | |
uint8_t byte; | |
std::string byte_str = hex.substr(i, 2); | |
auto [ptr, ec] = std::from_chars(byte_str.data(), byte_str.data() + 2, byte, 16); | |
if (ec == std::errc()) { | |
bytes.push_back(byte); | |
} else { | |
seastar::print("Warning: Invalid hex byte '%s' in string '%s'\n", byte_str, hex); | |
// Optionally throw or skip. | |
} | |
} | |
return bytes; | |
} | |
} | |
// ========================================================================================= // | |
// == CONFIGURATION (GENERATION & DETECTION) == // | |
// ========================================================================================= // | |
// Selects how a logical pattern provider produces its payloads.
enum class LogicalPatternType {
    STATIC_LIST = 0,       // replay a fixed, pre-parsed list of patterns
    SEQUENTIAL_BYTES = 1,  // count through a numeric range, emitted as bytes
    RANDOM_BYTES = 2       // random content of random length
};
struct LogicalPatternConfig { | |
std::string name = "default_logical_gen"; | |
LogicalPatternType type = LogicalPatternType::SEQUENTIAL_BYTES; | |
std::vector<LogicalPattern> static_patterns_data; // Parsed from hex strings | |
uint64_t seq_start_val = 0; | |
uint64_t seq_end_val = 255; | |
int seq_length_bytes = 1; | |
int random_min_len = 8; | |
int random_max_len = 64; | |
uint64_t limit_generated_patterns = 0; // 0 for unlimited (until provider specific end) | |
}; | |
// Describes how a logical pattern is framed into a packet.
struct ProtocolFormattingRule {
    std::string name{"default_formatter"};
    std::vector<uint8_t> header_data;  // bytes placed before the pattern (decoded from hex)
    std::vector<uint8_t> footer_data;  // bytes placed after the pattern (decoded from hex)
    bool add_mock_checksum{false};     // whether a mock checksum is requested
};
// How a detection rule compares its byte sequence against a data chunk.
enum class DetectionConditionType {
    EXACT_SEQUENCE = 0,        // chunk must equal the sequence
    CONTAINS_SUBSEQUENCE = 1,  // sequence must occur somewhere inside the chunk
    // REGEX_MATCH // Requires regex library
};
struct DetectionRuleConfig { | |
std::string name = "default_detection_rule"; | |
DetectionConditionType condition_type = DetectionConditionType::EXACT_SEQUENCE; | |
std::vector<uint8_t> byte_sequence_to_detect_data; // Parsed from hex string | |
// std::string regex_to_detect; | |
std::string action_on_detect = "log"; | |
}; | |
class AppConfig { | |
public: | |
std::vector<LogicalPatternConfig> logical_pattern_configs; | |
std::vector<ProtocolFormattingRule> protocol_formatting_rules; | |
std::vector<DetectionRuleConfig> detection_rules_configs; | |
int generators_parallelism = 1; | |
int detectors_parallelism = seastar::smp::count; | |
size_t data_source_chunk_size = 4096; | |
uint64_t data_source_max_chunks_per_detector = 10000; // Limit for mock data source | |
static AppConfig load_from_json_string(const std::string& json_str) { | |
AppConfig cfg; | |
json j; | |
try { | |
j = json::parse(json_str); | |
if (j.contains("logical_pattern_generators")) { | |
for (const auto& gen_j : j["logical_pattern_generators"]) { | |
LogicalPatternConfig lpc; | |
lpc.name = gen_j.value("name", "unnamed_logical_gen"); | |
std::string type_str = gen_j.value("type", "sequential_bytes"); | |
if (type_str == "static_list") { | |
lpc.type = LogicalPatternType::STATIC_LIST; | |
if (gen_j.contains("static_patterns_hex")) { | |
for(const auto& p_hex : gen_j["static_patterns_hex"]) { | |
lpc.static_patterns_data.push_back(Utility::hex_string_to_bytes(p_hex.get<std::string>())); | |
} | |
} | |
} else if (type_str == "sequential_bytes") { | |
lpc.type = LogicalPatternType::SEQUENTIAL_BYTES; | |
lpc.seq_start_val = gen_j.value("seq_start_val", 0); | |
lpc.seq_end_val = gen_j.value("seq_end_val", 255); | |
lpc.seq_length_bytes = gen_j.value("seq_length_bytes", 1); | |
} else if (type_str == "random_bytes") { | |
lpc.type = LogicalPatternType::RANDOM_BYTES; | |
lpc.random_min_len = gen_j.value("random_min_len", 8); | |
lpc.random_max_len = gen_j.value("random_max_len", 64); | |
} | |
lpc.limit_generated_patterns = gen_j.value("limit_generated_patterns", 0); | |
cfg.logical_pattern_configs.push_back(lpc); | |
} | |
} | |
if (j.contains("protocol_formatters")) { | |
for (const auto& fmt_j : j["protocol_formatters"]) { | |
ProtocolFormattingRule pfr; | |
pfr.name = fmt_j.value("name", "unnamed_formatter"); | |
pfr.header_data = Utility::hex_string_to_bytes(fmt_j.value("header_hex", "")); | |
pfr.footer_data = Utility::hex_string_to_bytes(fmt_j.value("footer_hex", "")); | |
pfr.add_mock_checksum = fmt_j.value("add_mock_checksum", false); | |
cfg.protocol_formatting_rules.push_back(pfr); | |
} | |
} | |
if (j.contains("detection_rules")) { | |
for (const auto& dr_j : j["detection_rules"]) { | |
DetectionRuleConfig drc; | |
drc.name = dr_j.value("name", "unnamed_detection_rule"); | |
std::string cond_type_str = dr_j.value("condition_type", "exact_sequence"); | |
if (cond_type_str == "exact_sequence") drc.condition_type = DetectionConditionType::EXACT_SEQUENCE; | |
else if (cond_type_str == "contains_subsequence") drc.condition_type = DetectionConditionType::CONTAINS_SUBSEQUENCE; | |
drc.byte_sequence_to_detect_data = Utility::hex_string_to_bytes(dr_j.value("byte_sequence_hex", "")); | |
drc.action_on_detect = dr_j.value("action_on_detect", "log"); | |
cfg.detection_rules_configs.push_back(drc); | |
} | |
} | |
cfg.generators_parallelism = j.value("generators_parallelism", 1); | |
cfg.detectors_parallelism = j.value("detectors_parallelism", (int)seastar::smp::count); | |
cfg.data_source_chunk_size = j.value("data_source_chunk_size", 4096); | |
cfg.data_source_max_chunks_per_detector = j.value("data_source_max_chunks_per_detector", 10000); | |
} catch (json::exception& e) { | |
seastar::print("Error parsing AppConfig JSON: %s\n", e.what()); | |
throw; | |
} | |
seastar::print("AppConfig loaded. Logical Gens: %d, Formatters: %d, Detection Rules: %d\n", | |
cfg.logical_pattern_configs.size(), cfg.protocol_formatting_rules.size(), cfg.detection_rules_configs.size()); | |
return cfg; | |
} | |
}; | |
// ========================================================================================= // | |
// == PATTERN GENERATION MODULE == // | |
// ========================================================================================= // | |
class ILogicalPatternProvider { | |
public: | |
virtual ~ILogicalPatternProvider() = default; | |
virtual seastar::future<std::optional<LogicalPattern>> next_pattern() = 0; | |
virtual std::string get_name() const = 0; | |
virtual void reset() = 0; | |
}; | |
class FlexibleLogicalPatternProvider : public ILogicalPatternProvider { | |
LogicalPatternConfig _config; | |
uint64_t _current_seq_val; | |
size_t _current_static_idx; | |
std::mt19937_64 _rng; | |
uint64_t _generated_count = 0; | |
public: | |
FlexibleLogicalPatternProvider(const LogicalPatternConfig& cfg) | |
: _config(cfg), _current_seq_val(cfg.seq_start_val), _current_static_idx(0) { | |
std::random_device rd; | |
_rng.seed(rd() + seastar::this_shard_id()); | |
} | |
std::string get_name() const override { return _config.name; } | |
void reset() override { | |
_current_seq_val = _config.seq_start_val; | |
_current_static_idx = 0; | |
_generated_count = 0; | |
} | |
seastar::future<std::optional<LogicalPattern>> next_pattern() override { | |
if (_config.limit_generated_patterns > 0 && _generated_count >= _config.limit_generated_patterns) { | |
return seastar::make_ready_future<std::optional<LogicalPattern>>(std::nullopt); | |
} | |
LogicalPattern p; | |
bool pattern_generated = false; | |
switch(_config.type) { | |
case LogicalPatternType::SEQUENTIAL_BYTES: | |
if (_current_seq_val <= _config.seq_end_val) { | |
p.resize(_config.seq_length_bytes); | |
uint64_t temp = _current_seq_val; | |
for (int i = _config.seq_length_bytes - 1; i >= 0; --i) { | |
p[i] = static_cast<uint8_t>(temp & 0xFF); | |
temp >>= 8; | |
} | |
_current_seq_val++; | |
pattern_generated = true; | |
} | |
break; | |
case LogicalPatternType::RANDOM_BYTES: { | |
size_t len = std::uniform_int_distribution<size_t>(_config.random_min_len, _config.random_max_len)(_rng); | |
p.resize(len); | |
for(size_t i=0; i<len; ++i) p[i] = static_cast<uint8_t>(_rng() % 256); | |
pattern_generated = true; | |
} | |
break; | |
case LogicalPatternType::STATIC_LIST: | |
if (_current_static_idx < _config.static_patterns_data.size()) { | |
p = _config.static_patterns_data[_current_static_idx]; | |
_current_static_idx++; | |
pattern_generated = true; | |
} | |
break; | |
} | |
if (pattern_generated) { | |
_generated_count++; | |
return seastar::make_ready_future<std::optional<LogicalPattern>>(std::move(p)); | |
} | |
return seastar::make_ready_future<std::optional<LogicalPattern>>(std::nullopt); | |
} | |
}; | |
class ProtocolFormatter { | |
ProtocolFormattingRule _rule; | |
public: | |
ProtocolFormatter(const ProtocolFormattingRule& rule) : _rule(rule) {} | |
const std::string& get_name() const { return _rule.name; } | |
FormattedDataPacket format(const LogicalPattern& pattern) const { | |
FormattedDataPacket packet; | |
packet.reserve(_rule.header_data.size() + pattern.size() + _rule.footer_data.size() + (_rule.add_mock_checksum ? 1 : 0)); | |
packet.insert(packet.end(), _rule.header_data.begin(), _rule.header_data.end()); | |
packet.insert(packet.end(), pattern.begin(), pattern.end()); | |
if (_rule.add_mock_checksum && !pattern.empty()) { | |
uint8_t checksum = 0; | |
for (uint8_t byte : pattern) checksum ^= byte; // Simple XOR checksum | |
packet.push_back(checksum); | |
} | |
packet.insert(packet.end(), _rule.footer_data.begin(), _rule.footer_data.end()); | |
return packet; | |
} | |
}; | |
class GenerationOrchestrator { | |
std::unique_ptr<ILogicalPatternProvider> _logical_gen; | |
ProtocolFormatter _formatter; | |
uint64_t _packets_generated_total = 0; | |
seastar::gate _gate; | |
seastar::metrics::counter _packets_generated_metric; | |
public: | |
GenerationOrchestrator(std::unique_ptr<ILogicalPatternProvider> lg, ProtocolFormatter pf) | |
: _logical_gen(std::move(lg)), _formatter(std::move(pf)) {} | |
seastar::future<> register_metrics(const std::string& id_prefix) { | |
seastar::metrics::metric_group_definition mgs; | |
mgs.add_counter(id_prefix + "_packets_generated_total", _packets_generated_metric, "Total packets generated by this orchestrator"); | |
return seastar::metrics::add_group(seastar::sprint("generator_%s_%s", _logical_gen->get_name(), _formatter.get_name()), mgs); | |
} | |
seastar::future<uint64_t> run_generation_loop(uint64_t num_packets_to_generate_if_limited) { | |
return seastar::with_gate(_gate, [this, num_packets_to_generate_if_limited] { | |
uint64_t limit = num_packets_to_generate_if_limited > 0 ? | |
num_packets_to_generate_if_limited : | |
std::numeric_limits<uint64_t>::max(); | |
return seastar::repeat([this, limit]() -> seastar::future<seastar::stop_iteration> { | |
if (_packets_generated_total >= limit || _gate.is_closed()) { | |
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes); | |
} | |
return _logical_gen->next_pattern().then([this](std::optional<LogicalPattern> opt_lp) { | |
if (!opt_lp) { | |
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes); | |
} | |
FormattedDataPacket packet = _formatter.format(*opt_lp); | |
_packets_generated_total++; | |
_packets_generated_metric++; | |
if (_packets_generated_total % 500000 == 0) { // Log less frequently | |
seastar::print("Gen (%s/%s) Shard %u: %lu packets generated.\n", | |
_logical_gen->get_name(), _formatter.get_name(), seastar::this_shard_id(), _packets_generated_total); | |
} | |
// In a real system, packet would be pushed to a queue or consumer | |
// For now, it's just generated. | |
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::no); | |
}); | |
}).then([this] { | |
seastar::print("Gen (%s/%s) Shard %u: Loop finished. Total generated: %lu\n", | |
_logical_gen->get_name(), _formatter.get_name(), seastar::this_shard_id(), _packets_generated_total); | |
return seastar::make_ready_future<uint64_t>(_packets_generated_total); | |
}); | |
}); | |
} | |
seastar::future<> stop() { return _gate.close(); } | |
}; | |
// ========================================================================================= // | |
// == PATTERN DETECTION MODULE == // | |
// ========================================================================================= // | |
class IDataSource { | |
public: | |
virtual ~IDataSource() = default; | |
virtual seastar::future<std::optional<DataChunk>> get_chunk() = 0; | |
virtual std::string get_name() const = 0; | |
}; | |
class MockDataSource : public IDataSource { | |
std::string _name; | |
size_t _chunk_size; | |
uint64_t _chunks_generated = 0; | |
uint64_t _max_chunks; | |
std::mt19937_64 _rng; | |
std::vector<FormattedDataPacket> _inject_packets; | |
size_t _inject_idx = 0; | |
bool _cycled_injections = false; | |
public: | |
MockDataSource(std::string name, size_t chunk_size, uint64_t max_chunks, std::vector<FormattedDataPacket> injections = {}) | |
: _name(name), _chunk_size(chunk_size), _max_chunks(max_chunks), _inject_packets(std::move(injections)) { | |
std::random_device rd; | |
_rng.seed(rd() + seastar::this_shard_id() + std::hash<std::string>{}(name)); | |
} | |
std::string get_name() const override { return _name; } | |
seastar::future<std::optional<DataChunk>> get_chunk() override { | |
if (_chunks_generated >= _max_chunks) { | |
return seastar::make_ready_future<std::optional<DataChunk>>(std::nullopt); | |
} | |
DataChunk chunk; | |
chunk.reserve(_chunk_size); | |
// Inject a predefined packet (if any left and at certain intervals) | |
if (!_inject_packets.empty() && (_chunks_generated % 7 == 0 || !_cycled_injections)) { // Inject more frequently at start | |
if (_inject_idx < _inject_packets.size()) { | |
const auto& packet_to_inject = _inject_packets[_inject_idx]; | |
chunk.insert(chunk.end(), packet_to_inject.begin(), packet_to_inject.end()); | |
_inject_idx++; | |
if(_inject_idx >= _inject_packets.size()) _cycled_injections = true; // Cycled through all injections at least once | |
} | |
} | |
while(chunk.size() < _chunk_size) { | |
chunk.push_back(static_cast<uint8_t>(_rng() % 256)); | |
} | |
if(chunk.size() > _chunk_size) chunk.resize(_chunk_size); // Ensure exact chunk size | |
_chunks_generated++; | |
return seastar::make_ready_future<std::optional<DataChunk>>(std::move(chunk)); | |
} | |
}; | |
class PreparedDetectionRule { | |
DetectionRuleConfig _config; | |
// For CONTAINS_SUBSEQUENCE, pre-calculate KMP next table or use Boyer-Moore for single pattern. | |
// For multiple patterns, Aho-Corasick automaton would be built here. | |
public: | |
PreparedDetectionRule(const DetectionRuleConfig& cfg) : _config(cfg) {} | |
const std::string& get_name() const { return _config.name; } | |
bool check(const DataChunk& chunk) const { | |
// Placeholder for advanced detection algorithms. | |
// This simple std::search is not "high-performance" for "many requests". | |
if (_config.byte_sequence_to_detect_data.empty()) return false; | |
if (_config.condition_type == DetectionConditionType::EXACT_SEQUENCE) { | |
return chunk == _config.byte_sequence_to_detect_data; | |
} else if (_config.condition_type == DetectionConditionType::CONTAINS_SUBSEQUENCE) { | |
if (chunk.size() < _config.byte_sequence_to_detect_data.size()) return false; | |
auto it = std::search(chunk.begin(), chunk.end(), | |
_config.byte_sequence_to_detect_data.begin(), | |
_config.byte_sequence_to_detect_data.end()); | |
return it != chunk.end(); | |
} | |
return false; | |
} | |
const std::string& get_action() const { return _config.action_on_detect; } | |
}; | |
class DetectionEngine { | |
std::string _id; | |
std::unique_ptr<IDataSource> _data_source; | |
std::vector<PreparedDetectionRule> _rules; | |
uint64_t _chunks_processed = 0; | |
uint64_t _detections_made = 0; | |
seastar::gate _gate; | |
seastar::metrics::counter _chunks_processed_metric; | |
seastar::metrics::counter _detections_made_metric; | |
public: | |
DetectionEngine(std::string id, std::unique_ptr<IDataSource> ds, const std::vector<DetectionRuleConfig>& rule_cfgs) | |
: _id(std::move(id)), _data_source(std::move(ds)) { | |
for (const auto& r_cfg : rule_cfgs) { | |
_rules.emplace_back(r_cfg); | |
} | |
} | |
seastar::future<> register_metrics(const std::string& id_prefix) { | |
seastar::metrics::metric_group_definition mgs; | |
mgs.add_counter(id_prefix + "_chunks_processed_total", _chunks_processed_metric, "Total data chunks processed by this engine"); | |
mgs.add_counter(id_prefix + "_detections_made_total", _detections_made_metric, "Total detections made by this engine"); | |
return seastar::metrics::add_group(seastar::sprint("detector_%s", _id), mgs); | |
} | |
seastar::future<uint64_t> run_detection_loop() { | |
seastar::print("DetectionEngine (%s) Shard %u: Starting detection. Rules: %d, Source: '%s'.\n", | |
_id, seastar::this_shard_id(), _rules.size(), _data_source->get_name()); | |
return seastar::with_gate(_gate, [this] { | |
return seastar::repeat([this]() -> seastar::future<seastar::stop_iteration> { | |
if (_gate.is_closed()) { | |
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes); | |
} | |
return _data_source->get_chunk().then([this](std::optional<DataChunk> opt_chunk) { | |
if (!opt_chunk) { | |
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::yes); | |
} | |
DataChunk chunk = std::move(*opt_chunk); | |
_chunks_processed++; | |
_chunks_processed_metric++; | |
for (const auto& rule : _rules) { | |
if (rule.check(chunk)) { | |
_detections_made++; | |
_detections_made_metric++; | |
seastar::print("!!! DETECTION (%s) Shard %u: Rule '%s'. Action: '%s'. Total Detections: %lu !!!\n", | |
_id, seastar::this_shard_id(), rule.get_name(), rule.get_action(), _detections_made); | |
} | |
} | |
if (_chunks_processed % 2000 == 0) { // Log less frequently | |
seastar::print("DetectionEngine (%s) Shard %u: Chunks: %lu. Detections: %lu\n", | |
_id, seastar::this_shard_id(), _chunks_processed, _detections_made); | |
} | |
return seastar::make_ready_future<seastar::stop_iteration>(seastar::stop_iteration::no); | |
}); | |
}).then([this] { | |
seastar::print("DetectionEngine (%s) Shard %u: Loop finished. Total Chunks: %lu, Total Detections: %lu\n", | |
_id, seastar::this_shard_id(), _chunks_processed, _detections_made); | |
return seastar::make_ready_future<uint64_t>(_detections_made); | |
}); | |
}); | |
} | |
seastar::future<> stop() { return _gate.close(); } | |
}; | |
// ========================================================================================= // | |
// == MAIN APP == // | |
// ========================================================================================= // | |
// Built-in configuration used when --config-json is not supplied.
// Enumeration strings ("type", "condition_type") are written in lowercase,
// which is the spelling the configuration loader compares against; an
// uppercase "CONTAINS_SUBSEQUENCE" is not recognised and leaves the rule on
// its EXACT_SEQUENCE default.
const std::string DEFAULT_CONFIG_JSON = R"({
  "logical_pattern_generators": [
    {
      "name": "payload_0_to_ff",
      "type": "sequential_bytes",
      "seq_start_val": 0,
      "seq_end_val": 255,
      "seq_length_bytes": 2,
      "limit_generated_patterns": 256
    },
    {
      "name": "some_random_data",
      "type": "random_bytes",
      "random_min_len": 8,
      "random_max_len": 16,
      "limit_generated_patterns": 100
    }
  ],
  "protocol_formatters": [
    {
      "name": "simple_header_proto",
      "header_hex": "1A2B3C",
      "footer_hex": "DDEE",
      "add_mock_checksum": true
    }
  ],
  "detection_rules": [
    {
      "name": "find_header_1A2B3C",
      "condition_type": "contains_subsequence",
      "byte_sequence_hex": "1A2B3C",
      "action_on_detect": "log_header_1A2B3C_detected"
    },
    {
      "name": "find_payload_0001_in_header_proto",
      "condition_type": "contains_subsequence",
      "byte_sequence_hex": "1A2B3C0001",
      "action_on_detect": "alert_critical_payload_0001"
    }
  ],
  "generators_parallelism": 0,
  "detectors_parallelism": 2,
  "data_source_chunk_size": 1024,
  "data_source_max_chunks_per_detector": 50000
})";
int main(int argc, char** argv) { | |
seastar::app_template app; | |
app.add_options() | |
("config-json", boost::program_options::value<std::string>()->default_value(DEFAULT_CONFIG_JSON), "JSON string for configuration") | |
("run-generators", boost::program_options::value<bool>()->default_value(false), "Enable data generation loop (for benchmarking/load)") | |
("generator-pattern-limit", boost::program_options::value<uint64_t>()->default_value(1000000), "Limit for #patterns from active generators"); | |
return app.run(argc, argv, [&]() -> seastar::future<> { | |
seastar::print("High-Performance Pattern Engine (Seastar) - START\n"); | |
auto& conf = app.configuration(); | |
AppConfig config = AppConfig::load_from_json_string(conf["config-json"].as<std::string>()); | |
bool run_generators_flag = conf["run-generators"].as<bool>(); | |
uint64_t generator_pattern_limit_flag = conf["generator-pattern-limit"].as<uint64_t>(); | |
std::vector<seastar::future<>> all_tasks_futures; | |
// --- Prepare data for injection (generated once) --- | |
std::vector<FormattedDataPacket> generated_packets_for_injection; | |
if (!config.logical_pattern_configs.empty() && !config.protocol_formatting_rules.empty()) { | |
auto lg_cfg_for_injection = config.logical_pattern_configs[0]; // Use first generator config | |
lg_cfg_for_injection.limit_generated_patterns = 20; // Limit to 20 for injection | |
auto formatter_rule_for_injection = config.protocol_formatting_rules[0]; // Use first formatter | |
auto temp_lg_provider = std::make_unique<FlexibleLogicalPatternProvider>(lg_cfg_for_injection); | |
ProtocolFormatter temp_formatter(formatter_rule_for_injection); | |
seastar::print("Generating packets for detector injection...\n"); | |
while(true) { | |
auto opt_lp = co_await temp_lg_provider->next_pattern(); | |
if (opt_lp) { | |
generated_packets_for_injection.push_back(temp_formatter.format(*opt_lp)); | |
} else break; | |
} | |
seastar::print("Generated %d packets for injection into MockDataSources.\n", generated_packets_for_injection.size()); | |
if (!generated_packets_for_injection.empty()) { | |
Utility::print_byte_vector(generated_packets_for_injection[0], " Example Injection Packet: "); | |
} | |
} | |
// --- Initialize and Run Detection Engines --- | |
int num_detectors = std::min((int)seastar::smp::count, config.detectors_parallelism); | |
num_detectors = std::max(1, num_detectors); | |
std::vector<seastar::shared_ptr<DetectionEngine>> detection_engines; | |
for (int i = 0; i < num_detectors; ++i) { | |
auto ds_name = seastar::sprint("mock_ds_%d", i); | |
auto data_source = std::make_unique<MockDataSource>( | |
ds_name, | |
config.data_source_chunk_size, | |
config.data_source_max_chunks_per_detector, | |
generated_packets_for_injection | |
); | |
auto engine_id = seastar::sprint("engine_%d_shard%u", i, seastar::this_shard_id()); // Note: this_shard_id() here is main shard | |
auto engine = seastar::make_shared<DetectionEngine>(engine_id, std::move(data_source), config.detection_rules_configs); | |
detection_engines.push_back(engine); | |
} | |
for (unsigned i = 0; i < detection_engines.size(); ++i) { | |
auto engine = detection_engines[i]; | |
unsigned target_shard = i % seastar::smp::count; // Distribute engines | |
all_tasks_futures.push_back( | |
seastar::smp::submit_to(target_shard, [engine, target_shard]() { | |
return engine->register_metrics(seastar::sprint("s%u_", target_shard)) | |
.then([engine] { return engine->run_detection_loop(); }) | |
.finally([engine] { return engine->stop(); }); | |
}).then_wrapped([](auto&& f){ | |
try { f.get(); } catch (const std::exception& e) { seastar::print("Detection task failed: %s\n", e.what());} | |
return seastar::make_ready_future<>(); // Ensure outer future completes | |
}) // Add error handling for the task itself | |
); | |
} | |
seastar::print("Launched %d Detection Engines.\n", detection_engines.size()); | |
// --- Initialize and Run Generation Orchestrators (Optional) --- | |
if (run_generators_flag && !config.logical_pattern_configs.empty() && !config.protocol_formatting_rules.empty()) { | |
int num_generators = std::min((int)seastar::smp::count, config.generators_parallelism); | |
num_generators = std::max(1, num_generators); | |
std::vector<seastar::shared_ptr<GenerationOrchestrator>> gen_orchestrators; | |
for (int i = 0; i < num_generators; ++i) { | |
// Cycle through logical pattern configs and formatters for variety if multiple generators run | |
const auto& lg_cfg = config.logical_pattern_configs[i % config.logical_pattern_configs.size()]; | |
const auto& fmt_rule = config.protocol_formatting_rules[i % config.protocol_formatting_rules.size()]; | |
auto logical_gen = std::make_unique<FlexibleLogicalPatternProvider>(lg_cfg); | |
ProtocolFormatter formatter(fmt_rule); | |
auto orchestrator_id = seastar::sprint("gen_orch_%d_shard%u", i, seastar::this_shard_id()); | |
auto orchestrator = seastar::make_shared<GenerationOrchestrator>(std::move(logical_gen), formatter); | |
gen_orchestrators.push_back(orchestrator); | |
} | |
for (unsigned i = 0; i < gen_orchestrators.size(); ++i) { | |
auto orchestrator = gen_orchestrators[i]; | |
unsigned target_shard = (detection_engines.size() + i) % seastar::smp::count; // Try to put on different shards from detectors | |
all_tasks_futures.push_back( | |
seastar::smp::submit_to(target_shard, [orchestrator, generator_pattern_limit_flag, target_shard]() { | |
return orchestrator->register_metrics(seastar::sprint("s%u_", target_shard)) | |
.then([orchestrator, generator_pattern_limit_flag]{ return orchestrator->run_generation_loop(generator_pattern_limit_flag);}) | |
.finally([orchestrator] { return orchestrator->stop(); }); | |
}).then_wrapped([](auto&& f){ | |
try { f.get(); } catch (const std::exception& e) { seastar::print("Generation task failed: %s\n", e.what());} | |
return seastar::make_ready_future<>(); | |
}) | |
); | |
} | |
seastar::print("Launched %d Generation Orchestrators.\n", gen_orchestrators.size()); | |
} | |
// Wait for all tasks (detection and generation) to complete | |
co_await seastar::when_all(all_tasks_futures.begin(), all_tasks_futures.end()) | |
.then_wrapped([](auto&& f) { // Use then_wrapped to catch exceptions from when_all itself | |
try { | |
f.get(); // Propagate exceptions if any task failed critically | |
seastar::print("All tasks completed.\n"); | |
} catch (const std::exception& e) { | |
seastar::print("Error during when_all: %s\n", e.what()); | |
} | |
}); | |
seastar::print("High-Performance Pattern Engine (Seastar) - END\n"); | |
co_return; | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment