Skip to content

Instantly share code, notes, and snippets.

@peasead
Last active March 20, 2025 21:21
Show Gist options
  • Save peasead/872a832c6c9d21c7b29ed7a36bb947be to your computer and use it in GitHub Desktop.
Save peasead/872a832c6c9d21c7b29ed7a36bb947be to your computer and use it in GitHub Desktop.
Convert an Elastic Common Schema (ECS) formatted NDJSON file into a STIX 2.1 JSON file.
import json
import uuid
import argparse
import os
from stix2 import Indicator, Bundle, AttackPattern, MarkingDefinition
# Author: @andythevariable
# Usage:
# python -m venv ecs-to-stix
# source ecs-to-stix/bin/activate
# pip install stix2
# python ecs-to-stix.py your-ecs-file.ndjson
# Marking definition shared by every indicator in the bundle. It is declared
# by hand as a "statement" marking whose text is "TLP:CLEAR"; the id and the
# creation timestamp are fixed literals, so the object is identical on each run.
# NOTE(review): this id looks like the STIX-predefined TLP:WHITE marking id --
# confirm that reusing it with a statement definition is intended.
TLP_CLEAR = MarkingDefinition(
    definition_type="statement",
    definition=dict(statement="TLP:CLEAR"),
    created="2017-01-20T00:00:00.000Z",
    id="marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
)
# Build a fresh STIX identifier of the form "<type>--<random UUID4>".
def stix_id(type_):
    """Return a new STIX object id for the given object type name.

    A new random UUID (version 4) is generated on every call, so two calls
    with the same ``type_`` produce different ids.
    """
    random_part = uuid.uuid4()
    return "{}--{}".format(type_, random_part)
# ECS hash field name -> hash algorithm key used in STIX 2.1 patterns.
# Insertion order is the order in which hash comparisons are emitted.
_STIX_HASH_NAMES = {"md5": "MD5", "sha1": "SHA-1", "sha256": "SHA-256"}


def _stix_literal(value):
    """Return ``value`` as a single-quoted STIX pattern string literal.

    Backslashes and single quotes are escaped so that a value containing
    them (e.g. a URL) cannot terminate the literal early.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def _ip_object_type(value, default="ipv4-addr"):
    """Return ``"ipv4-addr"`` or ``"ipv6-addr"`` for an address or CIDR string.

    ``default`` is returned when ``value`` cannot be parsed as either.
    """
    import ipaddress  # local import: only needed for IP indicators

    try:
        version = ipaddress.ip_network(str(value), strict=False).version
    except ValueError:
        return default
    return "ipv6-addr" if version == 6 else "ipv4-addr"


def _build_pattern(ind):
    """Build the STIX comparison expression for one ``threat.indicator`` dict.

    Returns the expression without the surrounding square brackets, or
    ``None`` when the indicator type is unsupported or carries no value.
    """
    type_ = ind.get("type")

    if type_ == "file":
        hashes = (ind.get("file") or {}).get("hash") or {}
        parts = [
            f"file:hashes.'{stix_name}' = {_stix_literal(hashes[ecs_name])}"
            for ecs_name, stix_name in _STIX_HASH_NAMES.items()
            if hashes.get(ecs_name)
        ]
        return " AND ".join(parts) or None

    if type_ in ("ip", "ipv4-addr", "ipv6-addr"):
        value = ind.get("ip")
        if not value:
            return None
        # Trust the parsed address family over the declared type; the
        # declared type is only the fallback for unparseable values.
        fallback = "ipv4-addr" if type_ == "ip" else type_
        return f"{_ip_object_type(value, fallback)}:value = {_stix_literal(value)}"

    if type_ == "domain-name":
        value = ind.get("domain")
        return f"domain-name:value = {_stix_literal(value)}" if value else None

    if type_ == "url":
        value = (ind.get("url") or {}).get("full")
        return f"url:value = {_stix_literal(value)}" if value else None

    return None


# Main conversion function
def convert_ecs_to_stix(input_file):
    """Convert an ECS NDJSON file of threat indicators into a STIX bundle file.

    Each non-blank line of ``input_file`` is parsed as one JSON document.
    Documents whose ``threat.indicator`` is of type ``file``, ``ip``,
    ``ipv4-addr``, ``ipv6-addr``, ``domain-name`` or ``url`` become one STIX
    Indicator marked with ``TLP_CLEAR``. Documents without a
    ``threat.indicator`` object, with another type, or without the observable
    value are skipped. Each distinct ``threat.technique`` id yields a single
    AttackPattern, however many documents mention it.

    The bundle is written next to the input, with the input's extension
    replaced by ``.stix2.json``, and the output path is printed.

    Args:
        input_file: Path to the NDJSON file to convert.

    Raises:
        OSError: If the input cannot be read or the output cannot be written.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a convertible document has no ``@timestamp``.
    """
    # Load NDJSON data, tolerating blank lines (e.g. a trailing newline).
    with open(input_file, "r", encoding="utf-8") as f:
        ecs_data = [json.loads(line) for line in f if line.strip()]

    stix_objects = [TLP_CLEAR]
    # Technique ids already emitted. The ECS technique id is the key, not the
    # generated STIX object id, which is random and would never match.
    seen_technique_ids = set()

    for doc in ecs_data:
        threat = doc.get("threat") or {}
        ind = threat.get("indicator")
        if not isinstance(ind, dict):
            continue

        pattern = _build_pattern(ind)
        if pattern is None:
            continue

        ts = doc["@timestamp"]

        # Add ATT&CK technique(s) if present; accept one object or a list.
        techniques = threat.get("technique") or []
        if isinstance(techniques, dict):
            techniques = [techniques]
        for tech in techniques:
            tech_id = tech.get("id")
            if not tech_id:
                continue
            tech_key = str(tech_id)
            if tech_key in seen_technique_ids:
                continue
            seen_technique_ids.add(tech_key)
            # Sub-techniques ("T1059.001") live under ".../T1059/001/".
            default_url = (
                "https://attack.mitre.org/techniques/"
                f"{tech_key.replace('.', '/')}/"
            )
            stix_objects.append(
                AttackPattern(
                    id=stix_id("attack-pattern"),
                    name=tech.get("name") or tech_key,
                    external_references=[{
                        "source_name": "mitre-attack",
                        "external_id": tech_id,
                        "url": tech.get("reference", default_url),
                    }],
                )
            )

        # The reference may be a single string; iterating it directly would
        # produce one external reference per character.
        references = ind.get("reference") or []
        if isinstance(references, str):
            references = [references]

        # Create the STIX Indicator
        stix_objects.append(
            Indicator(
                id=stix_id("indicator"),
                created=ts,
                modified=ts,
                name=ind.get("description", "Malicious Indicator"),
                pattern=f"[{pattern}]",
                pattern_type="stix",
                labels=["malicious-activity"],
                valid_from=ts,
                object_marking_refs=[TLP_CLEAR["id"]],
                external_references=[
                    {"source_name": "your-org-name-here", "url": ref}
                    for ref in references
                ],
            )
        )

    # Bundle and export
    bundle = Bundle(objects=stix_objects)
    output_file = os.path.splitext(input_file)[0] + ".stix2.json"
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(bundle.serialize(pretty=True))
    print(f"STIX 2.1 bundle written to {output_file}")
# Script entry point: take the NDJSON path from the command line and convert it.
if __name__ == "__main__":
    cli = argparse.ArgumentParser(description="Convert ECS NDJSON to STIX 2.1")
    cli.add_argument(
        "input",
        help="Path to ECS NDJSON file (e.g. indicators.ndjson)",
    )
    # The single positional argument is the only input the converter needs.
    convert_ecs_to_stix(cli.parse_args().input)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment