Last active
June 13, 2026 17:19
-
-
Save stek29/07ebf46f597b5a16e3b5b5e2bbe5848f to your computer and use it in GitHub Desktop.
convert findmy.py json and plist to opentagviewer zip file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Convert FindMy data into an OpenTagViewer-compatible zip archive. | |
| Scans a directory for both FindMy.py JSON files and decrypted plist files, | |
| deduplicates by accessory identifier, and produces a zip that can be imported | |
| in the OpenTagViewer Android app. | |
| The structure follows the wiki guide: | |
| https://github.com/parawanderer/OpenTagViewer/wiki/How-To:-Manually-Export-AirTags | |
| Usage:: | |
| python findmy_json_to_opentagviewer_zip.py \\ | |
| -i ./devices/ \\ | |
| -o ./export.zip \\ | |
| -u "user@example.com" | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import hashlib | |
| import json | |
| import plistlib | |
| import re | |
| import shutil | |
| import sys | |
| import tempfile | |
| import time | |
| import uuid as uuid_mod | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from zipfile import ZipFile | |
# Name of the metadata file written into the root of the staging directory
# (and therefore the root of the exported zip) by _write_metadata.
WIZARD_METADATA_FILENAME = "OPENTAGVIEWER.yml"
# Value emitted as the "version" field of that metadata file.
WIZARD_METADATA_VERSION = "1.0.0"
def _fix_nano_dates(data: bytes) -> bytes:
    """Drop the fractional-seconds part from every ``<date>`` element.

    plistlib does not support sub-second digits in date values, so
    ``...T12:34:56.789Z`` is rewritten to ``...T12:34:56Z``. Dates that carry
    no fraction, and every other byte of the document, pass through untouched.
    """
    whole_seconds = rb"(<date>[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})"
    fraction = rb"\.[0-9]+Z"
    closing_tag = rb"(</date>)"
    pattern = re.compile(whole_seconds + fraction + closing_tag)
    # Keep the timestamp prefix and the closing tag; only the fraction goes.
    return pattern.sub(rb"\1Z\2", data)
def _deterministic_uuid4(value: str) -> str:
    """Map *value* to a reproducible, upper-case UUID string.

    The first 16 bytes of the SHA-256 digest of the UTF-8 encoded input form
    the UUID payload. The version nibble is forced to 4 and the variant bits
    to ``10``, which are the version/variant bits the app requires.
    """
    seed = hashlib.sha256(value.encode("utf-8")).digest()[:16]
    # ``version=4`` makes the constructor overwrite the version nibble and the
    # variant bits, leaving all remaining digest bits as they are.
    derived = uuid_mod.UUID(bytes=seed, version=4)
    return str(derived).upper()
def _plist_date(value: str | datetime) -> datetime:
    """Normalise a timestamp to a naive, whole-second UTC ``datetime``.

    Args:
        value: An ISO-8601 string or a ``datetime`` instance.

    Returns:
        A ``datetime`` without ``tzinfo`` and with microseconds set to zero.
        Timezone-aware inputs are converted to UTC first; naive inputs are
        kept as they are apart from the microsecond truncation.

    Raises:
        ValueError: If a string value is not valid ISO-8601.
    """
    if isinstance(value, str):
        text = value
        # ``datetime.fromisoformat`` only understands a trailing "Z" from
        # Python 3.11 on; spell it as an explicit UTC offset so the same
        # input also parses on older interpreters.
        if text.endswith(("Z", "z")):
            text = f"{text[:-1]}+00:00"
        dt = datetime.fromisoformat(text)
    else:
        dt = value
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt.replace(microsecond=0)
def _key_data(value: object, field: str) -> bytes:
    """Extract raw key bytes from either accepted representation.

    ``value`` may be the bytes themselves or a ``{"key": {"data": <bytes>}}``
    mapping. ``field`` is only used to label the error message.

    Raises:
        ValueError: If no bytes can be found in either form.
    """
    candidate = value
    if isinstance(candidate, dict):
        # Descend through the wrapper; anything malformed collapses to None
        # and is rejected by the bytes check below.
        inner = candidate.get("key")
        candidate = inner.get("data") if isinstance(inner, dict) else None
    if isinstance(candidate, bytes):
        return candidate
    raise ValueError(f"{field} must contain binary data under key.data")
def _key_wrapper(data: bytes) -> dict:
    """Nest raw key bytes as ``{"key": {"data": data}}``."""
    payload = {"data": data}
    return {"key": payload}
def _cloudkit_metadata(timestamp: datetime) -> bytes:
    """Serialise a small NSKeyedArchiver-shaped record as a binary plist.

    The record stores *timestamp* as both ``RecordCtime`` and ``RecordMtime``
    and a fixed ``ModifiedByDevice`` marker. Keys are written in insertion
    order (``sort_keys=False``), so the assignment order below is part of the
    output.
    """
    record = {
        "RecordCtime": timestamp,
        "RecordMtime": timestamp,
        "ModifiedByDevice": "export-findmy",
    }
    archive: dict = {}
    archive["$archiver"] = "NSKeyedArchiver"
    archive["$version"] = 100000
    archive["$objects"] = ["$null", record]
    archive["$top"] = {"root": plistlib.UID(1)}
    return plistlib.dumps(archive, fmt=plistlib.FMT_BINARY, sort_keys=False)
def _stable_identifier(raw: dict) -> str:
    """Pick the identifier under which an accessory is deduplicated.

    Candidates are tried in this order: the first element of a
    ``stableIdentifier`` list, a ``stableIdentifier`` string, then
    ``identifier``. Only non-empty strings are accepted at every step, so an
    empty first list element falls through to the next candidate instead of
    yielding an empty identifier.

    Args:
        raw: The decoded plist dictionary.

    Returns:
        The first usable identifier string.

    Raises:
        ValueError: If none of the candidates is a non-empty string.
    """
    stable = raw.get("stableIdentifier")
    if isinstance(stable, list) and stable:
        # Only the first entry of the list form is considered.
        stable = stable[0]
    for candidate in (stable, raw.get("identifier")):
        if isinstance(candidate, str) and candidate:
            return candidate
    raise ValueError("missing identifier/stableIdentifier")
def _metadata_defaults(stable_identifier: str, model: str) -> dict:
    """Return the baseline metadata fields for an OwnedBeacon record.

    Identifiers starting with ``2006~#`` are treated as AirTags and receive
    product id 21760 and vendor id 76; everything else gets ``-1`` for both.
    """
    if stable_identifier.startswith("2006~#"):
        product_id, vendor_id = 21760, 76
    else:
        product_id = vendor_id = -1
    return {
        "batteryLevel": 0,
        "model": model,
        "productId": product_id,
        "systemVersion": "",
        "vendorId": vendor_id,
    }
| # ── conversion to staging ──────────────────────────────────────────── | |
def _acc_to_ownedbeacon_plist(acc: dict) -> dict:
    """Build the OwnedBeacon plist dictionary for one FindMy.py accessory.

    Reads the hex-encoded ``master_key``, ``skn`` and ``sks`` entries plus
    ``identifier`` and ``paired_at``; ``model`` and ``group_identifier`` are
    optional.

    Raises:
        KeyError: If a required entry is absent.
        ValueError: If a key is not valid hex or has the wrong length
            (28 bytes for ``master_key``, 32 for ``skn`` and ``sks``).
    """
    # Decode in a fixed order so the first bad field is the one reported.
    private_key, primary_secret, secondary_secret = (
        bytes.fromhex(acc[name]) for name in ("master_key", "skn", "sks")
    )
    if len(private_key) != 28:
        raise ValueError("master_key must be exactly 28 bytes")
    if not (len(primary_secret) == 32 and len(secondary_secret) == 32):
        raise ValueError("skn and sks must be exactly 32 bytes")

    ident = acc["identifier"]
    record: dict = dict(_metadata_defaults(ident, acc.get("model") or ""))
    record["privateKey"] = _key_wrapper(private_key)
    record["sharedSecret"] = _key_wrapper(primary_secret)
    record["secondarySharedSecret"] = _key_wrapper(secondary_secret)
    record["pairingDate"] = _plist_date(acc["paired_at"])
    record["identifier"] = ident
    record["stableIdentifier"] = [ident]

    group_id = acc.get("group_identifier")
    if group_id:
        record["groupIdentifier"] = group_id
    return record
def _acc_to_beaconnaming_plist(acc: dict, archive_id: str, record_id: str) -> dict:
    """Build the BeaconNamingRecord plist dictionary for one accessory.

    The display name falls back from ``name`` to ``model`` to ``"Unknown"``.
    The pairing date doubles as ``addDate`` and as the timestamp embedded in
    the CloudKit metadata blob. ``archive_id`` links the record to its
    OwnedBeacon entry and ``record_id`` is the record's own identifier.
    """
    added_on = _plist_date(acc["paired_at"])
    display_name = acc.get("name") or acc.get("model") or "Unknown"
    return {
        "identifier": record_id,
        "name": display_name,
        "emoji": acc.get("emoji") or "",
        "associatedBeacon": archive_id,
        "cloudKitMetadata": _cloudkit_metadata(added_on),
        "addDate": added_on,
    }
def _plist_to_archive_records(raw: dict, filename: str) -> tuple[str, str, dict, dict]:
    """Turn one decrypted plist dictionary into the two archive records.

    Args:
        raw: The decoded plist contents.
        filename: Name of the source file; its stem is the last-resort
            display name.

    Returns:
        ``(stable_identifier, archive_id, owned_beacon, naming_record)``.

    Raises:
        ValueError: If the identifier, a required key, or ``pairingDate`` is
            missing, or if a key field does not hold binary data.
    """
    stable_identifier = _stable_identifier(raw)
    archive_id = _deterministic_uuid4(f"beacon:{stable_identifier}")
    record_id = _deterministic_uuid4(f"naming:{stable_identifier}")

    raw_model = raw.get("model")
    model = raw_model if isinstance(raw_model, str) else ""

    # Defaults first, then everything from the source except the fields that
    # belong to the naming record; source values override the defaults.
    naming_only = ("name", "emoji", "addDate")
    beacon: dict = dict(_metadata_defaults(stable_identifier, model))
    beacon.update(
        (name, value) for name, value in raw.items() if name not in naming_only
    )
    beacon["identifier"] = raw.get("identifier") or stable_identifier
    beacon["stableIdentifier"] = [stable_identifier]

    # Normalise every key field that is present to the {"key": {"data": ...}}
    # wrapper, whichever representation the source used.
    key_fields = (
        "privateKey",
        "sharedSecret",
        "secondarySharedSecret",
        "secureLocationsSharedSecret",
        "publicKey",
    )
    for name in key_fields:
        if name in beacon:
            beacon[name] = _key_wrapper(_key_data(beacon[name], name))

    if not ("privateKey" in beacon and "sharedSecret" in beacon):
        raise ValueError("missing privateKey or sharedSecret")
    secondary_names = ("secondarySharedSecret", "secureLocationsSharedSecret")
    if not any(name in beacon for name in secondary_names):
        raise ValueError(
            "missing secondarySharedSecret/secureLocationsSharedSecret"
        )
    if "pairingDate" not in beacon:
        raise ValueError("missing pairingDate")
    beacon["pairingDate"] = _plist_date(beacon["pairingDate"])

    # The naming record is dated by addDate when the source has that key,
    # otherwise by the pairing date.
    added_source = raw["addDate"] if "addDate" in raw else beacon["pairingDate"]
    naming_date = _plist_date(added_source)
    naming = {
        "identifier": record_id,
        "name": raw.get("name") or model or Path(filename).stem,
        "emoji": raw.get("emoji") or "",
        "associatedBeacon": archive_id,
        "cloudKitMetadata": _cloudkit_metadata(naming_date),
        "addDate": naming_date,
    }
    return stable_identifier, archive_id, beacon, naming
| # ── input scanning ─────────────────────────────────────────────────── | |
def _collect_and_convert(args: argparse.Namespace) -> None:
    """Scan the input directory, convert every accessory and write the zip.

    Uses ``args.input_dir``, ``args.output`` and ``args.user``. ``*.plist``
    files are loaded first and ``*.json`` files second, so when both describe
    the same stable identifier the JSON record replaces the plist one. Files
    that cannot be parsed or converted are reported as warnings and skipped.

    The process exits with status 1 when the input path is not a directory,
    holds no ``.json``/``.plist`` files, or yields no valid accessory.
    """
    src = Path(args.input_dir)
    if not src.is_dir():
        print(f"Error: '{args.input_dir}' is not a directory")
        sys.exit(1)

    json_files = sorted(src.glob("*.json"))
    plist_files = sorted(src.glob("*.plist"))
    if not json_files and not plist_files:
        print(f"Error: no .json or .plist files found in '{args.input_dir}'")
        sys.exit(1)

    # stable identifier -> (archive UUID, OwnedBeacon plist, naming plist)
    accessories: dict[str, tuple[str, dict, dict]] = {}

    # Load plist exports first as a fallback. When both formats describe the
    # same accessory, the richer FindMy.py JSON export is authoritative.
    for pf in plist_files:
        try:
            raw = _fix_nano_dates(pf.read_bytes())
            plist = plistlib.loads(raw)
            if not isinstance(plist, dict):
                continue
            stable_identifier, archive_id, ob, nr = _plist_to_archive_records(
                plist, pf.name
            )
        except Exception as exc:
            print(f"Warning: could not parse '{pf.name}': {exc}")
            continue
        accessories[stable_identifier] = (archive_id, ob, nr)

    for jf in json_files:
        try:
            # JSON is UTF-8 by specification; relying on the locale default
            # breaks non-ASCII names and emoji on non-UTF-8 systems.
            with open(jf, encoding="utf-8") as fh:
                data = json.load(fh)
            items = data if isinstance(data, list) else [data]
            for acc in items:
                if not isinstance(acc, dict) or acc.get("type") != "accessory":
                    continue
                stable_identifier = acc["identifier"]
                archive_id = _deterministic_uuid4(f"beacon:{stable_identifier}")
                record_id = _deterministic_uuid4(f"naming:{stable_identifier}")
                accessories[stable_identifier] = (
                    archive_id,
                    _acc_to_ownedbeacon_plist(acc),
                    _acc_to_beaconnaming_plist(acc, archive_id, record_id),
                )
        except Exception as exc:
            print(f"Warning: could not convert '{jf.name}': {exc}")

    if not accessories:
        print("Error: no valid accessories found")
        sys.exit(1)

    print(f"Found {len(accessories)} unique accessory(ies) in '{args.input_dir}'")

    # Assemble the archive layout in a scratch directory that is always
    # removed, whether or not writing the zip succeeds.
    staging = Path(tempfile.mkdtemp(prefix="findmy_to_zip_"))
    try:
        ob_dir = staging / "OwnedBeacons"
        nr_dir = staging / "BeaconNamingRecord"
        ob_dir.mkdir()
        nr_dir.mkdir()

        for stable_identifier, (archive_id, ob, nr) in accessories.items():
            record_id = _deterministic_uuid4(f"naming:{stable_identifier}")
            (ob_dir / f"{archive_id}.plist").write_bytes(
                plistlib.dumps(ob, sort_keys=False)
            )
            (nr_dir / archive_id).mkdir(exist_ok=True)
            (nr_dir / archive_id / f"{record_id}.plist").write_bytes(
                plistlib.dumps(nr, sort_keys=False)
            )

        _write_metadata(staging, args.user, "Converted from FindMy data")
        _create_zip(staging, args.output, len(accessories))
    finally:
        shutil.rmtree(staging, ignore_errors=True)
| # ── shared helpers ─────────────────────────────────────────────────── | |
def _write_metadata(staging: Path, user: str, via: str) -> None:
    """Write the export metadata file into the staging directory.

    The file holds four ``key: value`` lines (version, export timestamp in
    milliseconds since the epoch, source user, and conversion note). String
    values are JSON-encoded so they are quoted and escaped.
    """
    export_ts = int(time.time() * 1000)
    entries = (
        ("version", json.dumps(WIZARD_METADATA_VERSION)),
        ("exportTimestamp", export_ts),
        ("sourceUser", json.dumps(user)),
        ("via", json.dumps(via)),
    )
    # One line per entry, each terminated by a newline.
    document = "".join(f"{key}: {value}\n" for key, value in entries)
    (staging / WIZARD_METADATA_FILENAME).write_text(document)
def _create_zip(staging: Path, output: Path, count: int) -> None:
    """Pack every regular file under *staging* into the zip at *output*.

    Missing parent directories of *output* are created. Archive member names
    are the paths relative to *staging*, added in sorted order. *count* is
    only used in the confirmation message printed at the end.
    """
    output.parent.mkdir(parents=True, exist_ok=True)
    with ZipFile(output, "w") as archive:
        regular_files = (
            entry for entry in sorted(staging.rglob("*")) if entry.is_file()
        )
        for member in regular_files:
            archive.write(member, arcname=member.relative_to(staging))
    print(f"Created '{output}' ({count} accessories)")
| # ── CLI ────────────────────────────────────────────────────────────── | |
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with its three required options.

    ``-i/--input-dir`` and ``-o/--output`` are parsed as ``Path`` objects and
    ``-u/--user`` as a plain string.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Convert FindMy JSON/plist files into an OpenTagViewer-compatible "
            "zip archive."
        ),
    )
    # (flags, value type, help text) for each option; all are mandatory.
    option_table = (
        (("-i", "--input-dir"), Path, "Directory containing .json and/or .plist files"),
        (("-o", "--output"), Path, "Output zip file path"),
        (("-u", "--user"), str, "Source user identifier (e.g. email address)"),
    )
    for flags, value_type, help_text in option_table:
        parser.add_argument(*flags, type=value_type, required=True, help=help_text)
    return parser
def main() -> None:
    """Command-line entry point: parse the arguments and run the conversion."""
    _collect_and_convert(build_parser().parse_args())


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment