Skip to content

Instantly share code, notes, and snippets.

@drego85
Created April 15, 2026 14:58
Show Gist options
  • Select an option

  • Save drego85/25db1165ebadf5abced1bbe2be06ac0f to your computer and use it in GitHub Desktop.

Select an option

Save drego85/25db1165ebadf5abced1bbe2be06ac0f to your computer and use it in GitHub Desktop.
Automated Mirax dropper unpacker that derives RESOURCE_KEY and extracts embedded payload APKs from StreamTV-like samples.
#!/usr/bin/env python3
"""
Mirax extraction pipeline for StreamTV-like droppers.
Threat intelligence recap (based on the analyzed streamtv_* samples):
- The visible APK is usually a dropper stage.
- A bootstrap Application class loads a deeply hidden asset from assets/.
- That hidden blob is RC4-encrypted; the RC4 key is hardcoded in the bootstrap class.
- After RC4 decryption, the blob becomes a stage-1 ZIP containing classes*.dex.
- Stage-1 code embeds the RESOURCE_KEY (64-hex), used for storage/payload decryption.
- The final implant payload is typically in res/raw/*.bin and is decrypted with XOR
using RESOURCE_KEY, producing a valid APK (PK\\x03\\x04 magic).
What this script does per APK:
1) Decompile bootstrap classes and recover:
- hidden asset path/filename
- RC4 key
2) Extract hidden asset from the APK and RC4-decrypt it.
3) Parse stage-1 dex files and recover RESOURCE_KEY candidates.
4) Validate the key by trying XOR decryption on res/raw/*.bin candidates.
5) If valid APK payload is recovered, save it as:
<original_stem>_extracted_payload.apk
6) If key/payload cannot be recovered, skip the sample and continue.
Console output is intentionally minimal:
- sample name
- extracted key
- extracted payload filename
- source raw file + payload SHA-256
Requirements:
- apkInspector CLI must be installed and available in PATH.
- Official repository: https://github.com/erev0s/apkInspector
- JADX CLI is also required in PATH for bootstrap/source analysis.
Made with ♥ by Andrea Draghetti
This file may be licensed under the terms of the
GNU General Public License Version 3 (the ``GPL'').
"""
from __future__ import annotations
import argparse
import hashlib
import io
import re
import shutil
import subprocess
import sys
import tempfile
import zipfile
import zlib
from pathlib import Path
from typing import Any
def run_capture(cmd: list[str], check: bool = True) -> tuple[int, str]:
    """Run an external command and return its exit code and combined output.

    stderr is redirected into stdout, so the caller receives one text stream.

    Args:
        cmd: Program and arguments, handed to ``subprocess.run`` as a list
            (no shell is involved).
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        ``(returncode, output)`` where ``output`` is the decoded stdout+stderr.

    Raises:
        RuntimeError: If ``check`` is true and the command exits non-zero.
    """
    # errors="replace": external tools may echo arbitrary bytes (for example
    # odd archive entry names); an undecodable byte must not abort the run
    # with UnicodeDecodeError.
    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors="replace",
    )
    if check and proc.returncode != 0:
        raise RuntimeError(f"Command failed ({proc.returncode}): {' '.join(cmd)}\n{proc.stdout}")
    return proc.returncode, proc.stdout
def ensure_unique_path(path: Path) -> Path:
    """Return a path that does not exist yet, derived from ``path``.

    ``path`` itself is returned when nothing exists there. Otherwise a
    numeric suffix (``_1``, ``_2``, ...) is inserted between the stem and the
    extension until an unused sibling name is found.
    """
    candidate = path
    counter = 0
    while candidate.exists():
        counter += 1
        candidate = path.parent / f"{path.stem}_{counter}{path.suffix}"
    return candidate
def rc4_crypt(data: bytes, key: bytes) -> bytes:
    """Encrypt or decrypt ``data`` with RC4 under ``key``.

    RC4 XORs the input with a key-derived keystream, so the same call both
    encrypts and decrypts.

    Args:
        data: Input bytes.
        key: RC4 key; must contain at least one byte.

    Returns:
        The transformed bytes, same length as ``data``.

    Raises:
        ValueError: If ``key`` is empty (previously this surfaced as an
            opaque ``ZeroDivisionError`` from the key schedule).
    """
    if not key:
        raise ValueError("RC4 key must not be empty")

    # Key-scheduling: permute the 256-entry state under control of the key.
    state = list(range(256))
    key_len = len(key)
    j = 0
    for i in range(256):
        j = (j + state[i] + key[i % key_len]) & 0xFF
        state[i], state[j] = state[j], state[i]

    # Keystream generation: one keystream byte per input byte, XORed in place.
    i = j = 0
    out = bytearray(len(data))
    for pos, byte in enumerate(data):
        i = (i + 1) & 0xFF
        j = (j + state[i]) & 0xFF
        state[i], state[j] = state[j], state[i]
        out[pos] = byte ^ state[(state[i] + state[j]) & 0xFF]
    return bytes(out)
def xor_crypt(data: bytes, key: bytes) -> bytes:
    """XOR ``data`` with ``key`` repeated cyclically from offset 0.

    The operation is its own inverse, so it both encrypts and decrypts.

    Args:
        data: Input bytes; empty input yields ``b""``.
        key: Repeating key; must be non-empty when ``data`` is non-empty.

    Returns:
        The XORed bytes, same length as ``data``.

    Raises:
        ValueError: If ``key`` is empty while ``data`` is not (previously an
            opaque ``ZeroDivisionError``).
    """
    if not data:
        return b""
    if not key:
        raise ValueError("XOR key must not be empty")

    # Expand the key to the data length, then XOR both buffers as two big
    # integers. This does the whole job in C instead of one Python-level
    # iteration per byte, which matters for multi-megabyte payloads.
    repeats, remainder = divmod(len(data), len(key))
    keystream = key * repeats + key[:remainder]
    mixed = int.from_bytes(data, "big") ^ int.from_bytes(keystream, "big")
    # to_bytes with an explicit length preserves leading zero bytes.
    return mixed.to_bytes(len(data), "big")
def digest(data: bytes, algo: str) -> str:
    """Return the hexadecimal digest of ``data`` using the hashlib algorithm named ``algo``."""
    return hashlib.new(algo, data).hexdigest()
def parse_entries(apk_path: Path) -> list[dict[str, Any]]:
    """List the archive entries of an APK as reported by ``apkInspector -lc``.

    The tool output is scanned for single-line, brace-delimited records that
    contain a ``'filename': '...'`` pair. A record is kept only when all four
    numeric fields below are present.

    Returns:
        One dict per usable record with the keys ``filename``,
        ``compressed_size``, ``uncompressed_size``, ``offset`` (taken from
        ``relative_offset_of_local_file_header``) and ``compression_method``.

    Raises:
        RuntimeError: Propagated from ``run_capture`` when the tool exits
            with a non-zero status.
    """
    _, listing = run_capture(["apkInspector", "-apk", str(apk_path), "-lc"], check=True)

    # (key in the returned dict, pattern capturing its integer value)
    numeric_fields = (
        ("compressed_size", r"'compressed_size':\s*(\d+)"),
        ("uncompressed_size", r"'uncompressed_size':\s*(\d+)"),
        ("offset", r"'relative_offset_of_local_file_header':\s*(\d+)"),
        ("compression_method", r"'compression_method':\s*(\d+)"),
    )

    parsed: list[dict[str, Any]] = []
    for record_match in re.finditer(r"\{[^\n]*'filename': '([^']+)'[^\n]*\}", listing):
        record_text = record_match.group(0)
        record: dict[str, Any] = {"filename": record_match.group(1)}
        for target_key, pattern in numeric_fields:
            hit = re.search(pattern, record_text)
            if hit is None:
                # Incomplete record: drop it entirely.
                break
            record[target_key] = int(hit.group(1))
        else:
            parsed.append(record)
    return parsed
def extract_entry_data(apk_bytes: bytes, entry: dict[str, Any]) -> bytes:
    """Read one archive member straight from the raw APK bytes.

    The local file header at ``entry["offset"]`` supplies the compression
    method and the name/extra-field lengths; the payload length comes from
    ``entry["compressed_size"]``.

    Returns:
        The member content: returned as-is for method 0 (stored), inflated
        as a raw deflate stream for method 8.

    Raises:
        ValueError: If the local header signature is missing or the local
            header declares a compression method other than 0 or 8.
    """
    base = int(entry["offset"])
    if apk_bytes[base:base + 4] != b"PK\x03\x04":
        raise ValueError(f"Invalid local header signature at offset {base}")

    # The 26 fixed header bytes that follow the 4-byte signature.
    fixed = apk_bytes[base + 4:base + 30]

    def u16(pos: int) -> int:
        return int.from_bytes(fixed[pos:pos + 2], "little")

    method = u16(4)
    # Payload begins after the fixed header, the file name and the extra field.
    payload_start = base + 30 + u16(22) + u16(24)
    payload_end = payload_start + int(entry["compressed_size"])
    raw = apk_bytes[payload_start:payload_end]

    if method == 8:
        # wbits=-15: raw deflate stream without zlib header/trailer.
        return zlib.decompress(raw, -15)
    if method != 0:
        raise ValueError(f"Unsupported compression method: {method}")
    return raw
def decompile_sources(apk_path: Path, tmp: Path) -> Path:
    """Extract the dropper's dex files and decompile them with JADX.

    Up to four dex files (``classes.dex`` .. ``classes4.dex``) are requested
    from apkInspector. This code expects the tool to drop each result into
    the current working directory as ``EXTRACTED_`` + dex name; every file
    found there is moved into ``tmp`` before JADX runs.

    Args:
        apk_path: APK to analyse.
        tmp: Scratch directory receiving the dex files and the JADX output.

    Returns:
        ``tmp / "jadx" / "sources"``. JADX's exit status is not checked, so
        the directory is not guaranteed to exist.

    Raises:
        RuntimeError: If no dex file could be extracted.
    """
    jadx_out = tmp / "jadx"
    dex_files: list[Path] = []
    for dex_name in ("classes.dex", "classes2.dex", "classes3.dex", "classes4.dex"):
        extracted = Path.cwd() / f"EXTRACTED_{dex_name}"
        # Drop any stale file so a leftover from a previous sample can never
        # be mistaken for this APK's dex.
        if extracted.exists():
            extracted.unlink()
        code, _ = run_capture(["apkInspector", "-apk", str(apk_path), "-f", dex_name, "-x"], check=False)
        if code == 0 and extracted.exists():
            dst = tmp / extracted.name
            # Move rather than copy: a copy would leave extracted malware dex
            # files behind in the working directory after every run.
            shutil.move(str(extracted), str(dst))
            dex_files.append(dst)
    if not dex_files:
        raise RuntimeError("No classes*.dex extracted")
    # Best effort: JADX may report errors on obfuscated code yet still emit
    # usable sources, so its exit status is deliberately ignored.
    run_capture(["jadx", "-d", str(jadx_out), *[str(x) for x in dex_files]], check=False)
    return jadx_out / "sources"
def find_bootstrap(sources_dir: Path) -> dict[str, str] | None:
    """Locate the bootstrap Application class in decompiled Java sources.

    A file qualifies when it contains ``extends Application``, a
    ``getAssets().open(this.X + this.Y)`` call and a
    ``byte[] bytes = this.Z.getBytes()`` assignment. ``X``, ``Y`` and ``Z``
    are then resolved against the ``public String NAME = "VALUE";`` field
    initialisers found in the same file.

    Args:
        sources_dir: Root directory searched recursively for ``*.java``.

    Returns:
        A dict with ``class_file``, ``asset_prefix`` (may be empty),
        ``asset_filename`` and ``rc4_key`` for the first file whose filename
        and key fields both resolve to non-empty strings, or ``None`` when
        no file qualifies.
    """
    for java_file in sources_dir.rglob("*.java"):
        # Decode explicitly as UTF-8 instead of relying on the locale's
        # default encoding, which differs between platforms.
        text = java_file.read_text(encoding="utf-8", errors="ignore")
        if "extends Application" not in text:
            continue
        m_open = re.search(r"getAssets\(\)\.open\(this\.(\w+)\s*\+\s*this\.(\w+)\)", text)
        m_keyvar = re.search(r"byte\[\]\s+bytes\s*=\s*this\.(\w+)\.getBytes\(\)", text)
        if not m_open or not m_keyvar:
            continue
        path_var, file_var = m_open.groups()
        key_var = m_keyvar.group(1)
        # Map every public String field with a literal initialiser to its value.
        fields = {
            fm.group(1): fm.group(2)
            for fm in re.finditer(r'public\s+String\s+(\w+)\s*=\s*"([^"]*)";', text)
        }
        asset_filename = fields.get(file_var, "")
        rc4_key = fields.get(key_var, "")
        if asset_filename and rc4_key:
            return {
                "class_file": str(java_file),
                "asset_prefix": fields.get(path_var, ""),
                "asset_filename": asset_filename,
                "rc4_key": rc4_key,
            }
    return None
def pick_asset_entry(entries: list[dict[str, Any]], prefix: str, filename: str) -> dict[str, Any] | None:
    """Choose the archive entry that holds the hidden asset.

    An entry whose name equals ``prefix + filename`` wins outright. Failing
    that, the largest entry (by ``uncompressed_size``) located under
    ``assets/`` whose name ends with ``filename`` is used; on a size tie the
    earliest such entry is kept. Returns ``None`` when nothing matches.
    """
    wanted = prefix + filename
    direct = next((item for item in entries if item["filename"] == wanted), None)
    if direct is not None:
        return direct

    fallback = [
        item
        for item in entries
        if str(item["filename"]).startswith("assets/") and str(item["filename"]).endswith(filename)
    ]
    if not fallback:
        return None
    return max(fallback, key=lambda item: int(item["uncompressed_size"]))
def find_resource_key_in_stage1(stage1_zip: bytes) -> tuple[str | None, list[str]]:
    """Search the stage-1 ZIP's dex files for the RESOURCE_KEY.

    The contents of every ``classes*.dex`` member are concatenated and
    scanned for runs of 64 lowercase hex characters.

    Returns:
        ``(key, candidates)``. ``candidates`` is the sorted, de-duplicated
        list of all 64-hex strings found. ``key`` is the first 64-hex string
        within 8192 bytes from the first ``RESOURCE_KEY`` marker; otherwise
        the sole candidate when exactly one exists; otherwise ``None``.
        ``(None, [])`` is returned when the archive holds no matching dex.
    """
    hex64 = re.compile(rb"[0-9a-f]{64}")

    with zipfile.ZipFile(io.BytesIO(stage1_zip)) as archive:
        members = [
            name
            for name in archive.namelist()
            if name.startswith("classes") and name.endswith(".dex")
        ]
        if not members:
            return None, []
        blob = b"".join(archive.read(name) for name in members)

    candidates = sorted({hit.decode("ascii") for hit in hex64.findall(blob)})

    # Preferred: a key located shortly after the RESOURCE_KEY marker.
    marker_at = blob.find(b"RESOURCE_KEY")
    if marker_at != -1:
        nearby = hex64.search(blob[marker_at:marker_at + 8192])
        if nearby:
            return nearby.group(0).decode("ascii"), candidates

    # Otherwise only an unambiguous single candidate is accepted.
    if len(candidates) == 1:
        return candidates[0], candidates
    return None, candidates
def extract_payload_with_key(apk_bytes: bytes, entries: list[dict[str, Any]], key_hex: str) -> tuple[str, bytes] | None:
    """Try to XOR-decrypt a ``res/raw/*.bin`` entry into a ZIP/APK payload.

    Candidates are tried from largest to smallest ``uncompressed_size``. The
    first one whose decrypted content starts with the ZIP local-file magic
    ``PK\\x03\\x04`` is returned.

    Args:
        apk_bytes: Raw bytes of the dropper APK.
        entries: Archive entries as produced by ``parse_entries``.
        key_hex: XOR key as a hex string.

    Returns:
        ``(entry filename, decrypted payload)``, or ``None`` when no
        candidate decrypts to a ZIP.

    Raises:
        ValueError: If ``key_hex`` is not valid hexadecimal.
    """
    key = bytes.fromhex(key_hex)
    magic = b"PK\x03\x04"
    raw_bins = [
        e
        for e in entries
        if str(e["filename"]).startswith("res/raw/") and str(e["filename"]).endswith(".bin")
    ]
    raw_bins.sort(key=lambda e: int(e["uncompressed_size"]), reverse=True)
    for e in raw_bins:
        try:
            enc = extract_entry_data(apk_bytes, e)
        except Exception:
            # Deliberate best effort: an unreadable entry only disqualifies
            # itself, the remaining candidates are still tried.
            continue
        # The XOR key restarts at offset 0, so decrypting the first bytes is
        # enough to test the magic. This avoids decrypting whole multi-MB
        # blobs for every (key, entry) combination that does not match.
        if xor_crypt(enc[:len(magic)], key) != magic:
            continue
        return str(e["filename"]), xor_crypt(enc, key)
    return None
def process_apk(apk_path: Path, out_dir: Path) -> int:
    """Run the full extraction pipeline on one APK.

    Steps: list the archive entries, decompile the dex files, locate the
    bootstrap class, RC4-decrypt the hidden asset into the stage-1 ZIP,
    recover the RESOURCE_KEY and XOR-decrypt the payload, which is written to
    ``out_dir`` as ``STEM_extracted_payload.apk`` (made unique if needed).

    Args:
        apk_path: Dropper APK to process.
        out_dir: Existing directory that receives the payload.

    Returns:
        0 when a payload was written, 1 when the sample was skipped because a
        pipeline stage found nothing, 2 when an unexpected error occurred.
        Every outcome is reported on stdout.
    """
    # The whole pipeline, including reading the APK and listing its entries,
    # runs inside the try block so that one unreadable or malformed sample is
    # reported as [ERR] instead of aborting a batch run.
    try:
        apk_bytes = apk_path.read_bytes()
        entries = parse_entries(apk_path)

        # Only plain strings from the bootstrap class are needed afterwards,
        # so the scratch directory can be released right after decompiling.
        with tempfile.TemporaryDirectory(prefix="st_pipeline_") as td:
            sources = decompile_sources(apk_path, Path(td))
            bs = find_bootstrap(sources)
        if not bs:
            print(f"[SKIP] {apk_path.name} | key: not found (bootstrap)")
            return 1

        asset_entry = pick_asset_entry(entries, bs["asset_prefix"], bs["asset_filename"])
        if not asset_entry:
            print(f"[SKIP] {apk_path.name} | key: not found (asset)")
            return 1

        stage0 = extract_entry_data(apk_bytes, asset_entry)
        stage1 = rc4_crypt(stage0, bs["rc4_key"].encode("utf-8"))
        if not stage1.startswith(b"PK\x03\x04"):
            print(f"[SKIP] {apk_path.name} | key: not found (stage1)")
            return 1

        key, candidates = find_resource_key_in_stage1(stage1)
        payload = None
        if key:
            payload = extract_payload_with_key(apk_bytes, entries, key)
        else:
            # Fallback: accept the first candidate that actually decrypts a
            # payload, and keep that payload instead of decrypting it again.
            for candidate in candidates:
                payload = extract_payload_with_key(apk_bytes, entries, candidate)
                if payload:
                    key = candidate
                    break
        if not key:
            print(f"[SKIP] {apk_path.name} | key: not found (resource_key)")
            return 1
        if not payload:
            print(f"[SKIP] {apk_path.name} | key: {key} | payload: not found")
            return 1

        inner_file, payload_bytes = payload
        out_file = ensure_unique_path(out_dir / f"{apk_path.stem}_extracted_payload.apk")
        out_file.write_bytes(payload_bytes)
        print(f"[OK] {apk_path.name}")
        print(f" key: {key}")
        print(f" payload: {out_file.name}")
        print(f" source: {inner_file} | sha256: {digest(payload_bytes, 'sha256')}")
        return 0
    except Exception as e:
        # Top-level boundary for a single sample: report and continue.
        print(f"[ERR] {apk_path.name} | {e}")
        return 2
def main() -> int:
    """Command-line entry point: process one APK, or a batch chosen by ``--glob``.

    The output directory is created if missing. In batch mode every regular
    file matching the glob inside ``--cwd`` is processed and a summary line
    is printed.

    Returns:
        Exit status. Single-APK mode returns the ``process_apk`` result.
        Batch mode returns 0 when no sample errored and 2 otherwise. 1 is
        returned when no input was given, the APK does not exist, or the
        glob matched no files.
    """
    ap = argparse.ArgumentParser(description="Unified key+payload extraction pipeline for streamtv APKs")
    ap.add_argument("apk", nargs="?", help="Single APK path")
    ap.add_argument("--glob", default="", help="Batch glob (e.g. 'streamtv_*.apk')")
    ap.add_argument("--cwd", default=".", help="Directory for --glob")
    ap.add_argument("--out-dir", default="streamtv_payloads_pipeline", help="Output directory for payload APKs")
    args = ap.parse_args()

    out_dir = Path(args.out_dir).expanduser().resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    if args.glob:
        cwd = Path(args.cwd).expanduser().resolve()
        # Keep regular files only: a broad pattern such as 'streamtv_*' also
        # matches directories (including the default output directory), and
        # those cannot be read as APKs.
        apks = sorted(p for p in cwd.glob(args.glob) if p.is_file())
        if not apks:
            print(f"No files matched: {args.glob}", file=sys.stderr)
            return 1
        ok = skip = err = 0
        for apk in apks:
            rc = process_apk(apk.resolve(), out_dir)
            if rc == 0:
                ok += 1
            elif rc == 1:
                skip += 1
            else:
                err += 1
        print(f"\nSummary: ok={ok}, skipped={skip}, errors={err}, total={len(apks)}")
        return 0 if err == 0 else 2

    if not args.apk:
        print("Provide an APK path or use --glob", file=sys.stderr)
        return 1
    apk = Path(args.apk).expanduser().resolve()
    if not apk.exists():
        print(f"APK not found: {apk}", file=sys.stderr)
        return 1
    return process_apk(apk, out_dir)
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment