Skip to content

Instantly share code, notes, and snippets.

@stek29
Last active June 13, 2026 17:19
Show Gist options
  • Select an option

  • Save stek29/07ebf46f597b5a16e3b5b5e2bbe5848f to your computer and use it in GitHub Desktop.

Select an option

Save stek29/07ebf46f597b5a16e3b5b5e2bbe5848f to your computer and use it in GitHub Desktop.
convert findmy.py json and plist to opentagviewer zip file
#!/usr/bin/env python3
"""
Convert FindMy data into an OpenTagViewer-compatible zip archive.
Scans a directory for both FindMy.py JSON files and decrypted plist files,
deduplicates by accessory identifier, and produces a zip that can be imported
in the OpenTagViewer Android app.
The structure follows the wiki guide:
https://github.com/parawanderer/OpenTagViewer/wiki/How-To:-Manually-Export-AirTags
Usage::
python findmy_json_to_opentagviewer_zip.py \\
-i ./devices/ \\
-o ./export.zip \\
-u "user@example.com"
"""
from __future__ import annotations
import argparse
import hashlib
import json
import plistlib
import re
import shutil
import sys
import tempfile
import time
import uuid as uuid_mod
from datetime import datetime, timezone
from pathlib import Path
from zipfile import ZipFile
# Name of the metadata file written at the root of the staging directory
# (and therefore at the root of the produced zip).
WIZARD_METADATA_FILENAME = "OPENTAGVIEWER.yml"
# Value emitted as the "version" field of that metadata file.
WIZARD_METADATA_VERSION = "1.0.0"
def _fix_nano_dates(data: bytes) -> bytes:
    """Remove the fractional-second part from ``<date>`` elements in *data*.

    Only dates of the form ``<date>YYYY-MM-DDTHH:MM:SS.fffZ</date>`` are
    rewritten (to ``<date>YYYY-MM-DDTHH:MM:SSZ</date>``); everything else is
    returned unchanged. This is done so that plistlib can parse the document.
    """
    fractional_date = re.compile(
        rb"(<date>[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})\.[0-9]+Z(</date>)"
    )
    # Keep the opening tag + whole seconds (group 1) and the closing tag
    # (group 2); only the ".fff" between them is dropped.
    return fractional_date.sub(rb"\1Z\2", data)
def _deterministic_uuid4(value: str) -> str:
    """Return an upper-case UUID string derived deterministically from *value*.

    The first 16 bytes of the SHA-256 digest of the UTF-8 encoded *value* are
    used as the UUID payload, with the version nibble forced to 4 and the
    variant bits forced to ``10`` so the result is laid out like a UUIDv4.
    """
    digest = hashlib.sha256(value.encode("utf-8")).digest()
    payload = bytearray(digest[:16])
    payload[6] = 0x40 | (payload[6] & 0x0F)  # version nibble -> 4
    payload[8] = 0x80 | (payload[8] & 0x3F)  # variant bits -> 10xxxxxx
    derived = uuid_mod.UUID(bytes=bytes(payload))
    return str(derived).upper()
def _plist_date(value: str | datetime) -> datetime:
dt = datetime.fromisoformat(value) if isinstance(value, str) else value
if dt.tzinfo is not None:
dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
return dt.replace(microsecond=0)
def _key_data(value: object, field: str) -> bytes:
    """Extract raw key bytes from *value*.

    Args:
        value: Either the bytes themselves or a mapping shaped like
            ``{"key": {"data": <bytes>}}``.
        field: Field name, used only in the error message.

    Returns:
        The key bytes.

    Raises:
        ValueError: If *value* has neither of the accepted shapes.
    """
    if isinstance(value, bytes):
        return value
    # Walk value -> "key" -> "data", giving up as soon as a level is not a dict.
    inner = value.get("key") if isinstance(value, dict) else None
    payload = inner.get("data") if isinstance(inner, dict) else None
    if isinstance(payload, bytes):
        return payload
    raise ValueError(f"{field} must contain binary data under key.data")
def _key_wrapper(data: bytes) -> dict:
    """Wrap *data* in the nested ``{"key": {"data": ...}}`` mapping."""
    inner = {"data": data}
    return {"key": inner}
def _cloudkit_metadata(timestamp: datetime) -> bytes:
    """Build a binary plist in NSKeyedArchiver layout carrying *timestamp*.

    The archive holds a single record object whose ``RecordCtime`` and
    ``RecordMtime`` are both *timestamp* and whose ``ModifiedByDevice`` is
    ``"export-findmy"``; ``$top.root`` points at that object (UID 1).

    Returns:
        The serialized binary plist, with keys written in declaration order.
    """
    record = {
        "RecordCtime": timestamp,
        "RecordMtime": timestamp,
        "ModifiedByDevice": "export-findmy",
    }
    archive = {
        "$archiver": "NSKeyedArchiver",
        "$version": 100000,
        # Index 0 is the "$null" placeholder, index 1 the record itself.
        "$objects": ["$null", record],
        "$top": {"root": plistlib.UID(1)},
    }
    return plistlib.dumps(archive, fmt=plistlib.FMT_BINARY, sort_keys=False)
def _stable_identifier(raw: dict) -> str:
    """Pick the identifier used to deduplicate an accessory plist.

    Preference order:
      1. the first element of a list-valued ``stableIdentifier`` (if a string),
      2. a non-empty string ``stableIdentifier``,
      3. a non-empty string ``identifier``.

    Raises:
        ValueError: If none of the above is present.
    """
    stable = raw.get("stableIdentifier")
    if isinstance(stable, list):
        first = stable[0] if stable else None
        if isinstance(first, str):
            return first
    elif isinstance(stable, str) and stable:
        return stable

    fallback = raw.get("identifier")
    if not (isinstance(fallback, str) and fallback):
        raise ValueError("missing identifier/stableIdentifier")
    return fallback
def _metadata_defaults(stable_identifier: str, model: str) -> dict:
    """Return default metadata fields for an OwnedBeacon record.

    Identifiers starting with ``"2006~#"`` are treated as AirTags and get
    ``productId`` 21760 and ``vendorId`` 76; every other identifier gets -1
    for both. ``batteryLevel`` is always 0 and ``systemVersion`` is empty.
    """
    if stable_identifier.startswith("2006~#"):
        product_id, vendor_id = 21760, 76
    else:
        product_id = vendor_id = -1
    return {
        "batteryLevel": 0,
        "model": model,
        "productId": product_id,
        "systemVersion": "",
        "vendorId": vendor_id,
    }
# ── conversion to staging ────────────────────────────────────────────
def _acc_to_ownedbeacon_plist(acc: dict) -> dict:
    """Convert one accessory dict from a JSON export into an OwnedBeacon plist dict.

    Args:
        acc: Accessory mapping. Reads the hex-encoded ``master_key``, ``skn``
            and ``sks`` fields, ``identifier``, ``paired_at`` and the optional
            ``model`` and ``group_identifier`` fields.

    Returns:
        A dict ready to be serialized with plistlib.

    Raises:
        ValueError: If ``master_key`` is not 28 bytes or ``skn``/``sks`` are
            not 32 bytes each (or the hex strings are malformed).
        KeyError: If a required field is missing.
    """
    master_key, skn, sks = (
        bytes.fromhex(acc[name]) for name in ("master_key", "skn", "sks")
    )
    if len(master_key) != 28:
        raise ValueError("master_key must be exactly 28 bytes")
    if not (len(skn) == 32 and len(sks) == 32):
        raise ValueError("skn and sks must be exactly 32 bytes")

    stable_identifier = acc["identifier"]
    model = acc.get("model") or ""

    # Defaults go first so the key order of the written plist is preserved.
    plist: dict = dict(_metadata_defaults(stable_identifier, model))
    plist["privateKey"] = _key_wrapper(master_key)
    plist["sharedSecret"] = _key_wrapper(skn)
    plist["secondarySharedSecret"] = _key_wrapper(sks)
    plist["pairingDate"] = _plist_date(acc["paired_at"])
    plist["identifier"] = stable_identifier
    plist["stableIdentifier"] = [stable_identifier]

    group_id = acc.get("group_identifier")
    if group_id:
        plist["groupIdentifier"] = group_id
    return plist
def _acc_to_beaconnaming_plist(acc: dict, archive_id: str, record_id: str) -> dict:
    """Build the BeaconNamingRecord plist dict for one JSON accessory.

    Args:
        acc: Accessory mapping; reads ``paired_at`` plus the optional
            ``name``, ``model`` and ``emoji`` fields.
        archive_id: UUID of the OwnedBeacon record this naming record refers to.
        record_id: UUID of the naming record itself.

    Returns:
        A dict ready to be serialized with plistlib. The display name falls
        back from ``name`` to ``model`` to ``"Unknown"``.
    """
    added = _plist_date(acc["paired_at"])
    label = acc.get("name")
    if not label:
        label = acc.get("model") or "Unknown"
    return {
        "identifier": record_id,
        "name": label,
        "emoji": acc.get("emoji") or "",
        "associatedBeacon": archive_id,
        "cloudKitMetadata": _cloudkit_metadata(added),
        "addDate": added,
    }
def _plist_to_archive_records(raw: dict, filename: str) -> tuple[str, str, dict, dict]:
    """Turn one parsed accessory plist into the two records stored in the archive.

    Args:
        raw: The top-level dict loaded from the plist file.
        filename: Name of the source file; its stem is the last-resort
            display name.

    Returns:
        ``(stable_identifier, archive_id, owned_beacon, naming_record)`` where
        both IDs are derived deterministically from the stable identifier.

    Raises:
        ValueError: If the identifier, a required key field or
            ``pairingDate`` is missing, or a key field has the wrong shape.
    """
    stable_identifier = _stable_identifier(raw)
    archive_id = _deterministic_uuid4(f"beacon:{stable_identifier}")
    record_id = _deterministic_uuid4(f"naming:{stable_identifier}")

    raw_model = raw.get("model")
    model = raw_model if isinstance(raw_model, str) else ""

    # Start from the defaults, let the source plist override them, and keep
    # naming-only fields out of the OwnedBeacon record.
    naming_only = ("name", "emoji", "addDate")
    ob = dict(_metadata_defaults(stable_identifier, model))
    for key, value in raw.items():
        if key not in naming_only:
            ob[key] = value
    ob["identifier"] = raw.get("identifier") or stable_identifier
    ob["stableIdentifier"] = [stable_identifier]

    # Normalise every key field that is present to the {"key": {"data": ...}} shape.
    key_fields = (
        "privateKey",
        "sharedSecret",
        "secondarySharedSecret",
        "secureLocationsSharedSecret",
        "publicKey",
    )
    for field in key_fields:
        if field not in ob:
            continue
        ob[field] = _key_wrapper(_key_data(ob[field], field))

    if not ("privateKey" in ob and "sharedSecret" in ob):
        raise ValueError("missing privateKey or sharedSecret")
    if not ("secondarySharedSecret" in ob or "secureLocationsSharedSecret" in ob):
        raise ValueError(
            "missing secondarySharedSecret/secureLocationsSharedSecret"
        )
    if "pairingDate" not in ob:
        raise ValueError("missing pairingDate")

    pairing_date = _plist_date(ob["pairingDate"])
    ob["pairingDate"] = pairing_date
    # The naming record is dated by addDate when present, else by the pairing date.
    naming_date = _plist_date(raw.get("addDate", pairing_date))

    nr = {
        "identifier": record_id,
        "name": raw.get("name") or model or Path(filename).stem,
        "emoji": raw.get("emoji") or "",
        "associatedBeacon": archive_id,
        "cloudKitMetadata": _cloudkit_metadata(naming_date),
        "addDate": naming_date,
    }
    return stable_identifier, archive_id, ob, nr
# ── input scanning ───────────────────────────────────────────────────
def _collect_and_convert(args: argparse.Namespace) -> None:
    """Scan the input directory, convert every accessory and write the zip.

    Args:
        args: Parsed CLI arguments; reads ``input_dir``, ``output`` and
            ``user``.

    ``*.plist`` files are loaded first and ``*.json`` files second, so when
    both describe the same stable identifier the JSON version wins. Files or
    accessories that cannot be converted are reported on stderr and skipped.
    The process exits with status 1 if the input is not a directory, holds
    no ``.json``/``.plist`` files, or yields no valid accessory.
    """
    src = Path(args.input_dir)
    if not src.is_dir():
        print(f"Error: '{args.input_dir}' is not a directory", file=sys.stderr)
        sys.exit(1)

    json_files = sorted(src.glob("*.json"))
    plist_files = sorted(src.glob("*.plist"))
    if not json_files and not plist_files:
        print(
            f"Error: no .json or .plist files found in '{args.input_dir}'",
            file=sys.stderr,
        )
        sys.exit(1)

    # stable identifier -> (archive UUID, OwnedBeacon plist, naming plist)
    accessories: dict[str, tuple[str, dict, dict]] = {}

    # Load plist exports first as a fallback. When both formats describe the
    # same accessory, the richer FindMy.py JSON export is authoritative.
    for pf in plist_files:
        try:
            plist = plistlib.loads(_fix_nano_dates(pf.read_bytes()))
            if not isinstance(plist, dict):
                continue
            stable_identifier, archive_id, ob, nr = _plist_to_archive_records(
                plist, pf.name
            )
        except Exception as exc:  # best effort: report and move on
            print(f"Warning: could not parse '{pf.name}': {exc}", file=sys.stderr)
            continue
        accessories[stable_identifier] = (archive_id, ob, nr)

    for jf in json_files:
        try:
            # Decode from bytes so json picks the encoding itself (UTF-8 with
            # or without BOM, UTF-16/32) rather than the platform default.
            data = json.loads(jf.read_bytes())
        except Exception as exc:  # best effort: report and move on
            print(f"Warning: could not convert '{jf.name}': {exc}", file=sys.stderr)
            continue

        items = data if isinstance(data, list) else [data]
        for acc in items:
            if not isinstance(acc, dict) or acc.get("type") != "accessory":
                continue
            # Convert each accessory on its own so one broken entry does not
            # discard the remaining accessories of the same file.
            try:
                stable_identifier = acc["identifier"]
                archive_id = _deterministic_uuid4(f"beacon:{stable_identifier}")
                record_id = _deterministic_uuid4(f"naming:{stable_identifier}")
                converted = (
                    archive_id,
                    _acc_to_ownedbeacon_plist(acc),
                    _acc_to_beaconnaming_plist(acc, archive_id, record_id),
                )
            except Exception as exc:
                print(
                    f"Warning: could not convert '{jf.name}': {exc}",
                    file=sys.stderr,
                )
                continue
            accessories[stable_identifier] = converted

    if not accessories:
        print("Error: no valid accessories found", file=sys.stderr)
        sys.exit(1)

    print(f"Found {len(accessories)} unique accessory(ies) in '{args.input_dir}'")

    staging = Path(tempfile.mkdtemp(prefix="findmy_to_zip_"))
    try:
        ob_dir = staging / "OwnedBeacons"
        nr_dir = staging / "BeaconNamingRecord"
        ob_dir.mkdir()
        nr_dir.mkdir()

        for stable_identifier, (archive_id, ob, nr) in accessories.items():
            record_id = _deterministic_uuid4(f"naming:{stable_identifier}")
            # Layout: OwnedBeacons/<archive_id>.plist and
            # BeaconNamingRecord/<archive_id>/<record_id>.plist
            (ob_dir / f"{archive_id}.plist").write_bytes(
                plistlib.dumps(ob, sort_keys=False)
            )
            (nr_dir / archive_id).mkdir(exist_ok=True)
            (nr_dir / archive_id / f"{record_id}.plist").write_bytes(
                plistlib.dumps(nr, sort_keys=False)
            )

        _write_metadata(staging, args.user, "Converted from FindMy data")
        _create_zip(staging, args.output, len(accessories))
    finally:
        # The staging directory holds key material; always remove it.
        shutil.rmtree(staging, ignore_errors=True)
# ── shared helpers ───────────────────────────────────────────────────
def _write_metadata(staging: Path, user: str, via: str) -> None:
    """Write the export metadata file into *staging*.

    The file contains four ``key: value`` lines (``version``,
    ``exportTimestamp`` in milliseconds since the epoch, ``sourceUser`` and
    ``via``) followed by a trailing newline. String values are quoted with
    ``json.dumps``.
    """
    version_line = f"version: {json.dumps(WIZARD_METADATA_VERSION)}"
    timestamp_line = f"exportTimestamp: {int(time.time() * 1000)}"
    user_line = f"sourceUser: {json.dumps(user)}"
    via_line = f"via: {json.dumps(via)}"

    content = "\n".join((version_line, timestamp_line, user_line, via_line)) + "\n"
    target = staging / WIZARD_METADATA_FILENAME
    target.write_text(content)
def _create_zip(staging: Path, output: Path, count: int) -> None:
    """Pack every file below *staging* into the zip archive *output*.

    Args:
        staging: Directory whose files are archived, using paths relative
            to it as member names.
        output: Destination zip path; missing parent directories are created.
        count: Number of accessories, used only in the summary message.
    """
    output.parent.mkdir(parents=True, exist_ok=True)
    with ZipFile(output, "w") as archive:
        # Sorted traversal keeps the member order stable between runs.
        members = (entry for entry in sorted(staging.rglob("*")) if entry.is_file())
        for member in members:
            archive.write(member, member.relative_to(staging))
    print(f"Created '{output}' ({count} accessories)")
# ── CLI ──────────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser.

    Returns:
        A parser with three required options: ``-i/--input-dir`` and
        ``-o/--output`` (both parsed as ``Path``) and ``-u/--user`` (string).
    """
    parser = argparse.ArgumentParser(
        description=(
            "Convert FindMy JSON/plist files into an OpenTagViewer-compatible "
            "zip archive."
        ),
    )
    # (short flag, long flag, value type, help text) -- all options are required.
    options = (
        ("-i", "--input-dir", Path, "Directory containing .json and/or .plist files"),
        ("-o", "--output", Path, "Output zip file path"),
        ("-u", "--user", str, "Source user identifier (e.g. email address)"),
    )
    for short_flag, long_flag, value_type, help_text in options:
        parser.add_argument(
            short_flag,
            long_flag,
            type=value_type,
            required=True,
            help=help_text,
        )
    return parser
def main() -> None:
    """CLI entry point: parse ``sys.argv`` and run the conversion."""
    _collect_and_convert(build_parser().parse_args())


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment