Last active
February 27, 2026 23:24
-
-
Save rromanchuk/8e7b221db4f83a2120fcf2573b1ae1b6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # | |
| # /// script | |
| # dependencies = [] | |
| # /// | |
| """ | |
| Decode MeshCore RX log and extract public keys from ADVERT packets. | |
| Packet wire format (from meshcore-dev/MeshCore src/Packet.cpp): | |
| [header (1 byte)] | |
| [transport_codes (4 bytes, only if route=TransportFlood(0) or TransportDirect(3))] | |
| [path_len (1 byte)] | |
| [path (path_len bytes)] | |
| [payload (remaining bytes)] | |
| Header byte: | |
| bits 1-0: route type | |
| bits 5-2: payload type | |
| bits 7-6: payload version | |
| ADVERT payload format (from michaelhart/meshcore-decoder advert.ts): | |
| [public_key (32 bytes)] | |
| [timestamp (4 bytes LE)] | |
| [signature (64 bytes)] | |
| [flags (1 byte)] | |
| [optional: lat (4 bytes) + lon (4 bytes) if HasLocation flag] | |
| [optional: name (variable) if HasName flag] | |
| """ | |
| import json | |
| import sys | |
| from datetime import datetime, timezone | |
# Route types (header bits 1-0), numbered 0x00-0x03.
(
    ROUTE_TRANSPORT_FLOOD,
    ROUTE_FLOOD,
    ROUTE_DIRECT,
    ROUTE_TRANSPORT_DIRECT,
) = range(4)

# Payload types (header bits 5-2), numbered consecutively 0x00-0x0B.
(
    PAYLOAD_TYPE_REQ,        # 0x00
    PAYLOAD_TYPE_RESPONSE,   # 0x01
    PAYLOAD_TYPE_TXT_MSG,    # 0x02
    PAYLOAD_TYPE_ACK,        # 0x03
    PAYLOAD_TYPE_ADVERT,     # 0x04
    PAYLOAD_TYPE_GRP_TXT,    # 0x05
    PAYLOAD_TYPE_GRP_DATA,   # 0x06
    PAYLOAD_TYPE_ANON_REQ,   # 0x07
    PAYLOAD_TYPE_PATH,       # 0x08
    PAYLOAD_TYPE_TRACE,      # 0x09
    PAYLOAD_TYPE_MULTIPART,  # 0x0A
    PAYLOAD_TYPE_CONTROL,    # 0x0B
) = range(12)

# Human-readable label for each payload type, keyed by its numeric value.
PAYLOAD_TYPE_NAMES = {
    PAYLOAD_TYPE_REQ: "REQ",
    PAYLOAD_TYPE_RESPONSE: "RESPONSE",
    PAYLOAD_TYPE_TXT_MSG: "TXT_MSG",
    PAYLOAD_TYPE_ACK: "ACK",
    PAYLOAD_TYPE_ADVERT: "ADVERT",
    PAYLOAD_TYPE_GRP_TXT: "GRP_TXT",
    PAYLOAD_TYPE_GRP_DATA: "GRP_DATA",
    PAYLOAD_TYPE_ANON_REQ: "ANON_REQ",
    PAYLOAD_TYPE_PATH: "PATH",
    PAYLOAD_TYPE_TRACE: "TRACE",
    PAYLOAD_TYPE_MULTIPART: "MULTIPART",
    PAYLOAD_TYPE_CONTROL: "CONTROL",
}

# Advert flags: the upper nibble of the advert flags byte, one bit each.
FLAG_HAS_LOCATION = 1 << 4  # 0x10
FLAG_HAS_FEATURE1 = 1 << 5  # 0x20
FLAG_HAS_FEATURE2 = 1 << 6  # 0x40
FLAG_HAS_NAME = 1 << 7      # 0x80

# Device role, looked up from the lower nibble of the advert flags byte.
DEVICE_ROLES = dict(
    enumerate(("ChatNode", "Repeater", "RoomServer", "Sensor"), start=1)
)
def decode_packet(hex_str):
    """Decode one hex-encoded MeshCore packet into its wire-level fields.

    Wire layout: header (1 byte), transport codes (4 bytes, present only for
    the TransportFlood / TransportDirect route types), path length (1 byte),
    path (``path_len`` bytes), then the payload (all remaining bytes).

    Args:
        hex_str: The raw packet as a hexadecimal string.

    Returns:
        A dict with keys ``route_type``, ``payload_type``, ``payload_ver``,
        ``transport_codes`` (bytes, or None when absent), ``path`` (bytes)
        and ``payload`` (bytes). Returns None when the input is not valid
        hex or is too short to contain the fields its header implies.
    """
    # A corrupt log entry must not abort the whole run: treat undecodable
    # hex (or a non-string value) like any other malformed packet.
    try:
        data = bytes.fromhex(hex_str)
    except (TypeError, ValueError):
        return None
    if len(data) < 2:
        return None

    header = data[0]
    route_type = header & 0x03
    payload_type = (header >> 2) & 0x0F
    payload_ver = (header >> 6) & 0x03
    offset = 1

    # Transport codes (4 bytes) are only present for the transport routes.
    transport_codes = None
    if route_type in (ROUTE_TRANSPORT_FLOOD, ROUTE_TRANSPORT_DIRECT):
        if len(data) < offset + 4:
            return None
        transport_codes = data[offset:offset + 4]
        offset += 4

    # Path length byte followed by that many path bytes.
    if len(data) < offset + 1:
        return None
    path_len = data[offset]
    offset += 1
    if len(data) < offset + path_len:
        return None
    path = data[offset:offset + path_len]
    offset += path_len

    # Whatever remains is the payload (possibly empty).
    payload = data[offset:]
    return {
        "route_type": route_type,
        "payload_type": payload_type,
        "payload_ver": payload_ver,
        "transport_codes": transport_codes,
        "path": path,
        "payload": payload,
    }
def decode_advert(payload):
    """Decode the payload of an ADVERT packet.

    Layout: public key (32 bytes), timestamp (4 bytes, little-endian),
    signature (64 bytes), flags (1 byte), then optional fields selected by
    the flags: lat/lon (4 + 4 bytes, signed little-endian, in millionths of
    a degree), two 2-byte feature fields, and a name filling the remainder.

    Args:
        payload: The ADVERT payload as ``bytes``.

    Returns:
        A dict with keys ``public_key`` (upper-case hex), ``timestamp``
        (int), ``timestamp_str`` (ISO-8601 UTC string, or None when the
        timestamp is 0), ``signature`` (hex), ``role``, ``has_location``,
        ``location`` (``{"lat", "lon"}`` or None) and ``name`` (str or
        None). Returns None when the payload is shorter than the 101-byte
        fixed part.
    """
    # minimum: public_key(32) + timestamp(4) + signature(64) + flags(1) = 101
    if len(payload) < 101:
        return None

    offset = 0
    public_key = payload[offset:offset + 32].hex().upper()
    offset += 32
    timestamp = int.from_bytes(payload[offset:offset + 4], "little")
    offset += 4
    signature = payload[offset:offset + 64].hex()
    offset += 64
    flags = payload[offset]
    offset += 1

    # Low nibble selects the device role; high nibble holds the field flags.
    role = DEVICE_ROLES.get(flags & 0x0F, "Unknown")
    has_location = bool(flags & FLAG_HAS_LOCATION)
    has_name = bool(flags & FLAG_HAS_NAME)

    location = None
    if has_location:
        if len(payload) >= offset + 8:
            lat_raw = int.from_bytes(payload[offset:offset + 4], "little", signed=True)
            lon_raw = int.from_bytes(payload[offset + 4:offset + 8], "little", signed=True)
            location = {"lat": lat_raw / 1_000_000, "lon": lon_raw / 1_000_000}
        # Advance even when the location is truncated: the flag says these
        # bytes belong to lat/lon, so they must never be read as the name.
        offset += 8

    # Feature fields are skipped, not decoded.
    if flags & FLAG_HAS_FEATURE1:
        offset += 2
    if flags & FLAG_HAS_FEATURE2:
        offset += 2

    # The name is only present if the payload extends past the optional
    # fields announced by the flags.
    name = None
    if has_name and offset < len(payload):
        name = payload[offset:].decode("utf-8", errors="replace").rstrip("\x00")

    ts_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat() if timestamp else None
    return {
        "public_key": public_key,
        "timestamp": timestamp,
        "timestamp_str": ts_str,
        "signature": signature,
        "role": role,
        "has_location": has_location,
        "location": location,
        "name": name,
    }
# Payload types where payload[0] is the destination hash and payload[1]
# identifies the source (for ANON_REQ, payload[1] is the first byte of the
# sender's public key, which is the same 1-byte prefix).
# ACK (4-byte CRC) and GRP_TXT / GRP_DATA (channel hash + MAC) are excluded:
# their leading bytes are not node hashes and would pollute the counts.
HASHED_PAYLOAD_TYPES = {
    PAYLOAD_TYPE_REQ, PAYLOAD_TYPE_RESPONSE, PAYLOAD_TYPE_TXT_MSG,
    PAYLOAD_TYPE_ANON_REQ, PAYLOAD_TYPE_PATH,
}


def collect_node_ids(entries):
    """Collect all observed 1-byte node IDs from paths and payload src/dest hashes.

    Args:
        entries: Iterable of log records, each a mapping whose ``"packet"``
            value is the hex-encoded packet.

    Returns:
        A ``(counts, advert_keys)`` tuple. ``counts`` is a ``Counter``
        mapping each 1-byte node ID to the number of times it was seen.
        ``advert_keys`` maps a node ID to the decoded ADVERT dict of the
        last advert seen with that public-key prefix (later adverts sharing
        a prefix overwrite earlier ones).
    """
    from collections import Counter

    counts = Counter()
    advert_keys = {}  # node_id_byte -> full public key + metadata
    for entry in entries:
        pkt = decode_packet(entry["packet"])
        if not pkt:
            continue

        # Path bytes are relay node IDs; iterating bytes yields ints.
        counts.update(pkt["path"])

        payload = pkt["payload"]
        ptype = pkt["payload_type"]
        if ptype == PAYLOAD_TYPE_ADVERT:
            advert = decode_advert(payload)
            if advert:
                # The node ID is the first byte of the advertised public key.
                node_id = payload[0]
                counts[node_id] += 1
                advert_keys[node_id] = advert
        elif ptype in HASHED_PAYLOAD_TYPES and len(payload) >= 2:
            dest, src = payload[0], payload[1]
            counts[dest] += 1
            counts[src] += 1
    return counts, advert_keys
# Roles whose prefixes are excluded from the repeater/unknown frequency table.
NON_REPEATER_ROLES = {"ChatNode", "RoomServer", "Sensor"}


def main(log_path):
    """Print a node-prefix report for an RX log and return repeater keys.

    Loads the JSON log, then prints three sections: the public keys of
    repeaters seen in ADVERT packets, the per-prefix observation counts
    (excluding prefixes known to belong to non-repeater nodes), and the
    first 50 prefixes in 0x01-0xFE that were never observed.

    Args:
        log_path: Path to the RX log JSON file (a list of records).

    Returns:
        List of public-key hex strings of the repeaters seen in adverts.
    """
    # JSON is UTF-8 by specification; don't depend on the platform default.
    with open(log_path, encoding="utf-8") as f:
        entries = json.load(f)
    counts, advert_keys = collect_node_ids(entries)

    # Classify known nodes by role
    known_clients = {nid for nid, info in advert_keys.items() if info["role"] in NON_REPEATER_ROLES}
    known_repeaters = {nid for nid, info in advert_keys.items() if info["role"] == "Repeater"}

    # --- Repeater public keys ---
    repeater_adverts = {nid: advert_keys[nid] for nid in known_repeaters}
    print(f"Repeaters seen in ADVERT packets ({len(repeater_adverts)}):\n")
    for node_id in sorted(repeater_adverts):
        info = repeater_adverts[node_id]
        print(f" {info['public_key']}")
        if info["name"]:
            print(f" name: {info['name']}")
        print()

    # --- Prefix frequency (repeaters + unknown role only) ---
    # Exclude prefixes we know belong to non-repeater nodes
    filtered = {nid: cnt for nid, cnt in counts.items() if nid not in known_clients}
    print(f"Prefix frequency — repeaters + unknown role ({len(filtered)} prefixes, {len(known_clients)} client prefix(es) excluded):\n")
    print(f" {'PREFIX':6} {'COUNT':>6} {'ROLE':<12} {'NAME'}")
    print(f" {'------':6} {'------':>6} {'----':<12} {'----'}")
    # Most frequently seen prefixes first.
    for node_id, count in sorted(filtered.items(), key=lambda x: -x[1]):
        if node_id in advert_keys:
            role = advert_keys[node_id]["role"]
            name = advert_keys[node_id]["name"] or ""
        else:
            role, name = "unknown", ""
        print(f" {node_id:02X} {count:>5} {role:<12} {name}")

    # --- Unused prefixes (not seen at all, excluding 00/FF) ---
    unused = [f"{i:02X}" for i in range(0x01, 0xFF) if i not in counts]
    print(f"\n{len(unused)} unused prefixes (first 50):\n")
    cols = 10
    for i in range(0, min(50, len(unused)), cols):
        print(" " + " ".join(unused[i:i + cols]))

    return [info["public_key"] for info in repeater_adverts.values()]
if __name__ == "__main__":
    import argparse

    # Single optional positional argument: the RX log to analyse.
    cli = argparse.ArgumentParser(
        description="Decode MeshCore RX log and report node prefix frequency",
    )
    cli.add_argument(
        "log",
        nargs="?",
        default="meshcore_rxlog_20260227_164713.json",
        help="Path to RX log JSON file",
    )
    keys = main(cli.parse_args().log)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment