Skip to content

Instantly share code, notes, and snippets.

@rromanchuk
Last active February 27, 2026 23:24
Show Gist options
  • Select an option

  • Save rromanchuk/8e7b221db4f83a2120fcf2573b1ae1b6 to your computer and use it in GitHub Desktop.

Select an option

Save rromanchuk/8e7b221db4f83a2120fcf2573b1ae1b6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env -S uv run --script
#
# /// script
# dependencies = []
# ///
"""
Decode MeshCore RX log and extract public keys from ADVERT packets.
Packet wire format (from meshcore-dev/MeshCore src/Packet.cpp):
[header (1 byte)]
[transport_codes (4 bytes, only if route=TransportFlood(0) or TransportDirect(3))]
[path_len (1 byte)]
[path (path_len bytes)]
[payload (remaining bytes)]
Header byte:
bits 1-0: route type
bits 5-2: payload type
bits 7-6: payload version
ADVERT payload format (from michaelhart/meshcore-decoder advert.ts):
[public_key (32 bytes)]
[timestamp (4 bytes LE)]
[signature (64 bytes)]
[flags (1 byte)]
[optional: lat (4 bytes) + lon (4 bytes) if HasLocation flag]
[optional: name (variable) if HasName flag]
"""
import json
import sys
from datetime import datetime, timezone
# Route types
ROUTE_TRANSPORT_FLOOD = 0x00
ROUTE_FLOOD = 0x01
ROUTE_DIRECT = 0x02
ROUTE_TRANSPORT_DIRECT = 0x03
# Payload types
PAYLOAD_TYPE_REQ = 0x00
PAYLOAD_TYPE_RESPONSE = 0x01
PAYLOAD_TYPE_TXT_MSG = 0x02
PAYLOAD_TYPE_ACK = 0x03
PAYLOAD_TYPE_ADVERT = 0x04
PAYLOAD_TYPE_GRP_TXT = 0x05
PAYLOAD_TYPE_GRP_DATA = 0x06
PAYLOAD_TYPE_ANON_REQ = 0x07
PAYLOAD_TYPE_PATH = 0x08
PAYLOAD_TYPE_TRACE = 0x09
PAYLOAD_TYPE_MULTIPART = 0x0A
PAYLOAD_TYPE_CONTROL = 0x0B
PAYLOAD_TYPE_NAMES = {
0x00: "REQ", 0x01: "RESPONSE", 0x02: "TXT_MSG", 0x03: "ACK",
0x04: "ADVERT", 0x05: "GRP_TXT", 0x06: "GRP_DATA", 0x07: "ANON_REQ",
0x08: "PATH", 0x09: "TRACE", 0x0A: "MULTIPART", 0x0B: "CONTROL",
}
# Advert flags
FLAG_HAS_LOCATION = 0x10
FLAG_HAS_FEATURE1 = 0x20
FLAG_HAS_FEATURE2 = 0x40
FLAG_HAS_NAME = 0x80
DEVICE_ROLES = {0x01: "ChatNode", 0x02: "Repeater", 0x03: "RoomServer", 0x04: "Sensor"}
def decode_packet(hex_str):
data = bytes.fromhex(hex_str)
if len(data) < 2:
return None
header = data[0]
route_type = header & 0x03
payload_type = (header >> 2) & 0x0F
payload_ver = (header >> 6) & 0x03
offset = 1
# transport codes (4 bytes) for TransportFlood/TransportDirect
transport_codes = None
if route_type in (ROUTE_TRANSPORT_FLOOD, ROUTE_TRANSPORT_DIRECT):
if len(data) < offset + 4:
return None
transport_codes = data[offset:offset + 4]
offset += 4
# path length + path
if len(data) < offset + 1:
return None
path_len = data[offset]
offset += 1
if len(data) < offset + path_len:
return None
path = data[offset:offset + path_len]
offset += path_len
payload = data[offset:]
return {
"route_type": route_type,
"payload_type": payload_type,
"payload_ver": payload_ver,
"transport_codes": transport_codes,
"path": path,
"payload": payload,
}
def decode_advert(payload):
# minimum: public_key(32) + timestamp(4) + signature(64) + flags(1) = 101
if len(payload) < 101:
return None
offset = 0
public_key = payload[offset:offset + 32].hex().upper()
offset += 32
timestamp = int.from_bytes(payload[offset:offset + 4], "little")
offset += 4
signature = payload[offset:offset + 64].hex()
offset += 64
flags = payload[offset]
offset += 1
role = DEVICE_ROLES.get(flags & 0x0F, "Unknown")
has_location = bool(flags & FLAG_HAS_LOCATION)
has_name = bool(flags & FLAG_HAS_NAME)
location = None
if has_location and len(payload) >= offset + 8:
lat_raw = int.from_bytes(payload[offset:offset + 4], "little", signed=True)
lon_raw = int.from_bytes(payload[offset + 4:offset + 8], "little", signed=True)
location = {"lat": lat_raw / 1_000_000, "lon": lon_raw / 1_000_000}
offset += 8
if flags & FLAG_HAS_FEATURE1:
offset += 2
if flags & FLAG_HAS_FEATURE2:
offset += 2
name = None
if has_name and offset < len(payload):
name = payload[offset:].decode("utf-8", errors="replace").rstrip("\x00")
ts_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat() if timestamp else None
return {
"public_key": public_key,
"timestamp": timestamp,
"timestamp_str": ts_str,
"signature": signature,
"role": role,
"has_location": has_location,
"location": location,
"name": name,
}
# Payload types where payload[0]=dest_hash, payload[1]=src_hash
HASHED_PAYLOAD_TYPES = {
PAYLOAD_TYPE_REQ, PAYLOAD_TYPE_RESPONSE, PAYLOAD_TYPE_TXT_MSG,
PAYLOAD_TYPE_ACK, PAYLOAD_TYPE_GRP_TXT, PAYLOAD_TYPE_GRP_DATA,
PAYLOAD_TYPE_ANON_REQ, PAYLOAD_TYPE_PATH,
}
def collect_node_ids(entries):
"""Collect all observed 1-byte node IDs from paths and payload src/dest hashes."""
from collections import Counter
counts = Counter()
advert_keys = {} # node_id_byte -> full public key + metadata
for entry in entries:
pkt = decode_packet(entry["packet"])
if not pkt:
continue
# path bytes are relay node IDs
for b in pkt["path"]:
counts[b] += 1
payload = pkt["payload"]
ptype = pkt["payload_type"]
if ptype == PAYLOAD_TYPE_ADVERT:
advert = decode_advert(payload)
if advert:
node_id = int(advert["public_key"][:2], 16)
counts[node_id] += 1
advert_keys[node_id] = advert
elif ptype in HASHED_PAYLOAD_TYPES and len(payload) >= 2:
dest, src = payload[0], payload[1]
counts[dest] += 1
counts[src] += 1
return counts, advert_keys
NON_REPEATER_ROLES = {"ChatNode", "RoomServer", "Sensor"}
def main(log_path):
with open(log_path) as f:
entries = json.load(f)
counts, advert_keys = collect_node_ids(entries)
# Classify known nodes by role
known_clients = {nid for nid, info in advert_keys.items() if info["role"] in NON_REPEATER_ROLES}
known_repeaters = {nid for nid, info in advert_keys.items() if info["role"] == "Repeater"}
# --- Repeater public keys ---
repeater_adverts = {nid: advert_keys[nid] for nid in known_repeaters}
print(f"Repeaters seen in ADVERT packets ({len(repeater_adverts)}):\n")
for node_id, info in sorted(repeater_adverts.items()):
print(f" {info['public_key']}")
if info["name"]:
print(f" name: {info['name']}")
print()
# --- Prefix frequency (repeaters + unknown role only) ---
# Exclude prefixes we know belong to non-repeater nodes
filtered = {nid: cnt for nid, cnt in counts.items() if nid not in known_clients}
print(f"Prefix frequency — repeaters + unknown role ({len(filtered)} prefixes, {len(known_clients)} client prefix(es) excluded):\n")
print(f" {'PREFIX':6} {'COUNT':>6} {'ROLE':<12} {'NAME'}")
print(f" {'------':6} {'------':>6} {'----':<12} {'----'}")
for node_id, count in sorted(filtered.items(), key=lambda x: -x[1]):
if node_id in advert_keys:
role = advert_keys[node_id]["role"]
name = advert_keys[node_id]["name"] or ""
else:
role, name = "unknown", ""
print(f" {node_id:02X} {count:>5} {role:<12} {name}")
# --- Unused prefixes (not seen at all, excluding 00/FF) ---
used = set(counts.keys())
unused = [f"{i:02X}" for i in range(0x01, 0xFF) if i not in used]
print(f"\n{len(unused)} unused prefixes (first 50):\n")
cols = 10
for i in range(0, min(50, len(unused)), cols):
print(" " + " ".join(unused[i:i + cols]))
return [info["public_key"] for info in repeater_adverts.values()]
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Decode MeshCore RX log and report node prefix frequency")
parser.add_argument("log", nargs="?", default="meshcore_rxlog_20260227_164713.json", help="Path to RX log JSON file")
args = parser.parse_args()
keys = main(args.log)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment