Last active
March 20, 2025 21:21
-
-
Save peasead/872a832c6c9d21c7b29ed7a36bb947be to your computer and use it in GitHub Desktop.
Convert an Elastic Common Schema (ECS) formatted NDJSON file into a STIX 2.1 JSON file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import ipaddress
import json
import os
import uuid

from stix2 import Indicator, Bundle, AttackPattern, MarkingDefinition
# Author: @andythevariable
# Usage:
#   python -m venv ecs-to-stix
#   source ecs-to-stix/bin/activate
#   pip install stix2
#   python ecs-to-stix.py your-ecs-file.ndjson
# TLP:CLEAR marking, declared by hand as a "statement" marking definition and
# referenced by every indicator this script emits.
# NOTE(review): this ID is the one the STIX specification assigns to the
# predefined TLP:WHITE marking (definition_type "tlp") -- confirm that reusing
# it for a statement marking is intended.
TLP_CLEAR = MarkingDefinition(
    definition_type="statement",
    definition=dict(statement="TLP:CLEAR"),
    created="2017-01-20T00:00:00.000Z",
    id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
)
# Helper to generate STIX object IDs
def stix_id(type_: str) -> str:
    """Return a fresh STIX identifier of the form ``<type_>--<random UUIDv4>``."""
    random_part = uuid.uuid4()
    return "{}--{}".format(type_, random_part)
# Main conversion function
# STIX 2.1 hash-algorithm names, keyed by the ECS hash field they are read from.
_STIX_HASH_NAMES = {"md5": "MD5", "sha1": "SHA-1", "sha256": "SHA-256"}


def _as_list(value):
    """Normalize an optional scalar-or-array field: None -> [], scalar -> [scalar]."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _quote(value):
    """Render *value* as a STIX pattern string literal (backslash and quote escaped)."""
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _ip_object_type(value):
    """Return ``ipv6-addr`` for IPv6 addresses/CIDRs, ``ipv4-addr`` otherwise."""
    try:
        version = ipaddress.ip_interface(str(value)).version
    except ValueError:
        # Unparseable value: keep the historical ipv4-addr labelling.
        return "ipv4-addr"
    return "ipv6-addr" if version == 6 else "ipv4-addr"


def _build_pattern(ind):
    """Return the STIX comparison expression for an indicator dict, or None to skip it."""
    type_ = ind["type"]
    if type_ == "file":
        hashes = ind.get("file", {}).get("hash", {})
        parts = [
            f"file:hashes.'{stix_name}' = {_quote(hashes[ecs_name])}"
            for ecs_name, stix_name in _STIX_HASH_NAMES.items()
            if hashes.get(ecs_name)
        ]
        # A file indicator without any supported hash cannot be expressed.
        return " AND ".join(parts) or None
    if type_ == "ip":
        return f"{_ip_object_type(ind['ip'])}:value = {_quote(ind['ip'])}"
    if type_ == "domain-name":
        return f"domain-name:value = {_quote(ind['domain'])}"
    if type_ == "url":
        return f"url:value = {_quote(ind['url']['full'])}"
    return None


def convert_ecs_to_stix(input_file):
    """Convert an NDJSON file of ECS threat-indicator documents to a STIX 2.1 bundle.

    Each non-blank line of *input_file* is parsed as one JSON document that
    must contain ``threat.indicator`` (with a ``type``) and ``@timestamp``.
    Indicators of type ``file`` (md5/sha1/sha256 hashes), ``ip``,
    ``domain-name`` and ``url`` become STIX Indicators marked with
    ``TLP_CLEAR``; other types, and file indicators without a supported hash,
    are skipped. Each distinct ``threat.technique.id`` seen on a converted
    document yields exactly one AttackPattern.

    The bundle is written next to the input as ``<input stem>.stix2.json``.

    Args:
        input_file: Path to the ECS NDJSON file.

    Raises:
        KeyError: A document lacks a required field for its indicator type.
        json.JSONDecodeError: A non-blank line is not valid JSON.
    """
    stix_objects = [TLP_CLEAR]
    # ATT&CK technique IDs already emitted. Dedup must key on the technique ID:
    # the STIX object id is a fresh random UUID and never equals it.
    seen_technique_ids = set()

    with open(input_file, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            doc = json.loads(line)
            threat = doc["threat"]
            ind = threat["indicator"]
            ts = doc["@timestamp"]

            pattern = _build_pattern(ind)
            if pattern is None:
                continue

            # Add ATT&CK technique(s) if present; id/name/reference may each be
            # a single value or parallel arrays.
            tech = threat.get("technique", {})
            tech_names = _as_list(tech.get("name"))
            tech_refs = _as_list(tech.get("reference"))
            for pos, tech_id in enumerate(_as_list(tech.get("id"))):
                if not tech_id or tech_id in seen_technique_ids:
                    continue
                seen_technique_ids.add(tech_id)
                # Sub-techniques live at .../techniques/T1059/001/, not T1059.001.
                default_url = (
                    "https://attack.mitre.org/techniques/"
                    + str(tech_id).replace(".", "/")
                    + "/"
                )
                stix_objects.append(
                    AttackPattern(
                        id=stix_id("attack-pattern"),
                        name=tech_names[pos] if pos < len(tech_names) else "",
                        external_references=[{
                            "source_name": "mitre-attack",
                            "external_id": tech_id,
                            "url": tech_refs[pos] if pos < len(tech_refs) else default_url,
                        }],
                    )
                )

            # Create the STIX Indicator
            stix_objects.append(
                Indicator(
                    id=stix_id("indicator"),
                    created=ts,
                    modified=ts,
                    name=ind.get("description", "Malicious Indicator"),
                    pattern=f"[{pattern}]",
                    pattern_type="stix",
                    labels=["malicious-activity"],
                    valid_from=ts,
                    object_marking_refs=[TLP_CLEAR["id"]],
                    # `reference` may be one URL string or a list of them;
                    # iterating a bare string would yield single characters.
                    external_references=[
                        {"source_name": "your-org-name-here", "url": ref}
                        for ref in _as_list(ind.get("reference"))
                    ],
                )
            )

    # Bundle and export
    bundle = Bundle(objects=stix_objects)
    output_file = os.path.splitext(input_file)[0] + ".stix2.json"
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(bundle.serialize(pretty=True))
    print(f"STIX 2.1 bundle written to {output_file}")
# Command-line entry point: one positional argument, the NDJSON file to convert.
if __name__ == "__main__":
    cli = argparse.ArgumentParser(description="Convert ECS NDJSON to STIX 2.1")
    cli.add_argument(
        "input",
        help="Path to ECS NDJSON file (e.g. indicators.ndjson)",
    )
    convert_ecs_to_stix(cli.parse_args().input)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment