Skip to content

Instantly share code, notes, and snippets.

@v21
Created June 12, 2026 11:34
Show Gist options
  • Select an option

  • Save v21/293f1533f20636a3d624fd38f40c5025 to your computer and use it in GitHub Desktop.

Select an option

Save v21/293f1533f20636a3d624fd38f40c5025 to your computer and use it in GitHub Desktop.
#!/usr/bin/env -S /Users/v/orbitals/orbitals-godot/tools/.venv/bin/python
"""Forward the USB controller's serial lines as UDP packets to the Godot
editor's SerialController autoload.
The web export reads Web Serial directly and does not need this bridge.
In the editor / desktop builds, run this alongside Godot:
./tools/serial_bridge.py
It auto-reopens the serial port on disconnect and just blasts UDP packets;
the Godot side derives its connected state from packet timing. Don't run
two instances — Godot will receive duplicated updates.
Usage:
./serial_bridge.py [device] [baud] [udp_port]
Defaults: /dev/cu.usbserial-5B1E0647691 @ 115200 → udp://127.0.0.1:7117
"""
import socket
import sys
import time
import serial
DEFAULT_DEV = "/dev/cu.usbserial-5B1E0647691"
DEFAULT_BAUD = 115200
DEFAULT_UDP_PORT = 7117
RECONNECT_BACKOFF_S = 0.5
def open_serial(dev: str, baud: int) -> serial.Serial:
return serial.Serial(
port=dev,
baudrate=baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1.0,
)
def main() -> None:
dev = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DEV
baud = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BAUD
udp_port = int(sys.argv[3]) if len(sys.argv) > 3 else DEFAULT_UDP_PORT
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
dest = ("127.0.0.1", udp_port)
print(f"Bridging {dev} @ {baud} -> udp://127.0.0.1:{udp_port}. Ctrl-C to quit.", flush=True)
sent = 0
last_status_t = time.monotonic()
while True:
try:
ser = open_serial(dev, baud)
except (serial.SerialException, OSError) as e:
print(f" serial open failed: {e}; retrying in {RECONNECT_BACKOFF_S}s", flush=True)
time.sleep(RECONNECT_BACKOFF_S)
continue
print(f" opened {dev}", flush=True)
ser.reset_input_buffer()
ser.readline() # discard the first (likely partial) line
try:
while True:
raw = ser.readline()
if not raw:
continue
line = raw.strip()
if not line:
continue
sock.sendto(line, dest)
sent += 1
now = time.monotonic()
if now - last_status_t > 5.0:
print(f" forwarded {sent} packets", flush=True)
last_status_t = now
except (serial.SerialException, OSError) as e:
print(f" serial read error: {e}; reopening", flush=True)
try:
ser.close()
except Exception:
pass
time.sleep(RECONNECT_BACKOFF_S)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nbye")
#!/usr/bin/env -S /Users/v/orbitals/orbitals-godot/tools/.venv/bin/python
"""Dump bytes from a USB serial device.
Usage:
./serial_dump.py [device] [baud]
Defaults: /dev/cu.usbserial-5B1E0647691 @ 115200, 8N1.
Coalesces bytes that arrive together into one line, formatted as:
HH:MM:SS.mmm [N] hex bytes |printable ASCII|
Ctrl-C to quit.
"""
import sys
import time
import serial
DEFAULT_DEV = "/dev/cu.usbserial-5B1E0647691"
DEFAULT_BAUD = 115200
IDLE_FLUSH_S = 0.02 # flush a line if 20ms passes without new bytes
def printable(b: int) -> str:
return chr(b) if 0x20 <= b < 0x7f else "."
def main() -> None:
dev = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DEV
baud = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BAUD
print(f"Opening {dev} @ {baud} 8N1. Ctrl-C to quit.\n")
ser = serial.Serial(
port=dev,
baudrate=baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.01,
)
ser.reset_input_buffer()
total = 0
last_heartbeat = time.monotonic()
try:
while True:
chunk = ser.read(256)
now = time.monotonic()
if chunk:
total += len(chunk)
hex_part = " ".join(f"{b:02x}" for b in chunk)
ascii_part = "".join(printable(b) for b in chunk)
ts = time.strftime("%H:%M:%S")
ms = int((now % 1) * 1000)
print(f"{ts}.{ms:03d} [{len(chunk):3d}] {hex_part:<60} |{ascii_part}|", flush=True)
last_heartbeat = now
elif now - last_heartbeat >= 1.0:
ts = time.strftime("%H:%M:%S")
print(f"{ts} (listening, {total} bytes received so far)", flush=True)
last_heartbeat = now
except KeyboardInterrupt:
print("\nbye")
finally:
ser.close()
if __name__ == "__main__":
main()
#!/usr/bin/env -S /Users/v/orbitals/orbitals-godot/tools/.venv/bin/python
"""Parse and display the labeled control stream from the USB serial device.
Each line is 7 comma-separated ints, CRLF-terminated:
joy_x, joy_y, slider, button1, button2, encoder_press, encoder_pos
Joystick/slider raw range is 0-4096 (uncalibrated).
Buttons and encoder-press are 0 or 1000.
Encoder position is a signed integer count.
Usage:#!/usr/bin/env -S /Users/v/orbitals/orbitals-godot/tools/.venv/bin/python
"""Parse and display the labeled control stream from the USB serial device.
Each line is 7 comma-separated ints, CRLF-terminated:
joy_x, joy_y, slider, button1, button2, encoder_press, encoder_pos
Joystick/slider raw range is 0-4096 (uncalibrated).
Buttons and encoder-press are 0 or 1000.
Encoder position is a signed integer count.
Usage:
./serial_show.py [device] [baud]
"""
import sys
import time
import serial
DEFAULT_DEV = "/dev/cu.usbserial-5B1E0647691"
DEFAULT_BAUD = 115200
LABELS = ["joy_x", "joy_y", "slider", "button1", "button2", "enc_press", "enc_pos"]
CLEAR = "\033[H\033[2J"
HOME = "\033[H"
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"
# Alternate screen buffer (what vim/less use) — keeps our output independent
# of the scrollback. Without this, a render taller than the terminal would
# scroll the top off, and the next \033[H wouldn't return to a useful row.
ENTER_ALT_BUFFER = "\033[?1049h"
EXIT_ALT_BUFFER = "\033[?1049l"
def bar(value: int, lo: int, hi: int, width: int = 30) -> str:
span = max(1, hi - lo)
frac = max(0.0, min(1.0, (value - lo) / span))
filled = int(round(frac * width))
return "[" + "#" * filled + "-" * (width - filled) + "]"
def render(
values: list[int],
mins: list[int],
maxs: list[int],
rate_hz: float,
measurements: list[int],
in_progress: int | None,
rest_samples: list[tuple[int, int]],
) -> str:
joy_x, joy_y, slider, b1, b2, ep, enc = values
lines = [
f" joy_x {joy_x:5d} {bar(joy_x, 0, 4096)} (seen {mins[0]}..{maxs[0]})",
f" joy_y {joy_y:5d} {bar(joy_y, 0, 4096)} (seen {mins[1]}..{maxs[1]})",
f" slider {slider:5d} {bar(slider, 0, 4096)} (seen {mins[2]}..{maxs[2]})",
"",
f" button1 {'ON ' if b1 else 'off':3s} [{b1}]",
f" button2 {'ON ' if b2 else 'off':3s} [{b2}]",
f" enc_press {'ON ' if ep else 'off':3s} [{ep}]",
f" enc_pos {enc:+d}",
"",
f" rate {rate_hz:5.1f} Hz",
"",
" --- encoder-per-turn measurement ---",
" Turn the encoder 360 deg, then press button1 to record.",
]
if in_progress is None:
lines.append(" current: (waiting for first button1 press to start)")
else:
lines.append(f" current: {in_progress:+5d} counts since last press")
if measurements:
recent = measurements[-10:]
avg = sum(measurements) / len(measurements)
lines.append(f" recorded: {len(measurements):d} turns avg = {avg:+.1f} counts/turn")
lines.append(" last: " + ", ".join(f"{m:+d}" for m in recent))
else:
lines.append(" recorded: none yet")
lines.append(" last: -")
lines += [
"",
" --- joystick deadzone characterization ---",
" Let go of the stick, press button2 to capture one rest sample.",
" Repeat 5-10 times (release stick differently each press to capture true variability).",
f" samples: {len(rest_samples)}",
]
if len(rest_samples) < 2:
lines.append(" range: (need at least 2 samples)")
lines.append(" recommended deadzone: --")
else:
xs = [s[0] for s in rest_samples]
ys = [s[1] for s in rest_samples]
x_min, x_max = min(xs), max(xs)
y_min, y_max = min(ys), max(ys)
x_center = (x_min + x_max) / 2
y_center = (y_min + y_max) / 2
x_dev = max(x_center - x_min, x_max - x_center)
y_dev = max(y_center - y_min, y_max - y_center)
# Normalize against the same half-range the autoload uses (2048).
x_dz_pct = (x_dev / 2048) * 100
y_dz_pct = (y_dev / 2048) * 100
worst = max(x_dz_pct, y_dz_pct)
# Suggest 1.5x the observed worst-axis spread, rounded up to the nearest %.
suggested = max(1, int(worst * 1.5 + 0.999))
lines.append(f" x: range [{x_min}..{x_max}] center≈{x_center:.0f} ±{x_dev:.0f} ({x_dz_pct:.1f}%)")
lines.append(f" y: range [{y_min}..{y_max}] center≈{y_center:.0f} ±{y_dev:.0f} ({y_dz_pct:.1f}%)")
lines.append(f" recommended deadzone: {suggested}% (worst-axis spread × 1.5)")
recent = rest_samples[-6:]
lines.append(" recent: " + " ".join(f"({x},{y})" for x, y in recent))
lines += ["", " Ctrl-C to quit."]
# \033[K clears to end of line so leftover chars from longer prior values vanish
return HOME + "\n".join(line + "\033[K" for line in lines) + "\033[J"
def main() -> None:
dev = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DEV
baud = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BAUD
ser = serial.Serial(
port=dev,
baudrate=baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1.0,
)
ser.reset_input_buffer()
# First read after open often catches a half-line — skip it
ser.readline()
mins = [4096] * 7
maxs = [0] * 7
mins[6] = maxs[6] = 0 # enc_pos starts unknown; we won't use these anyway
sys.stdout.write(ENTER_ALT_BUFFER + HIDE_CURSOR + CLEAR)
sys.stdout.flush()
last_t = time.monotonic()
rate_hz = 0.0
ALPHA = 0.1 # EWMA smoothing for rate
measurements: list[int] = []
b1_was_pressed = False
b2_was_pressed = False
enc_ref: int | None = None
rest_samples: list[tuple[int, int]] = []
try:
while True:
raw = ser.readline()
if not raw:
continue
line = raw.decode("ascii", errors="replace").strip()
parts = line.split(",")
if len(parts) != 7:
continue
try:
values = [int(p) for p in parts]
except ValueError:
continue
for i, v in enumerate(values):
if i < 6:
if v < mins[i]:
mins[i] = v
if v > maxs[i]:
maxs[i] = v
b1_now = values[3] >= 500
b2_now = values[4] >= 500
enc_now = values[6]
if b1_now and not b1_was_pressed:
if enc_ref is not None:
measurements.append(enc_now - enc_ref)
enc_ref = enc_now
b1_was_pressed = b1_now
if b2_now and not b2_was_pressed:
rest_samples.append((values[0], values[1]))
b2_was_pressed = b2_now
in_progress = (enc_now - enc_ref) if enc_ref is not None else None
now = time.monotonic()
dt = now - last_t
if dt > 0:
inst = 1.0 / dt
rate_hz = inst if rate_hz == 0 else (ALPHA * inst + (1 - ALPHA) * rate_hz)
last_t = now
sys.stdout.write(render(values, mins, maxs, rate_hz, measurements, in_progress, rest_samples))
sys.stdout.flush()
except KeyboardInterrupt:
pass
finally:
sys.stdout.write(SHOW_CURSOR + EXIT_ALT_BUFFER)
sys.stdout.flush()
ser.close()
if __name__ == "__main__":
main()
./serial_show.py [device] [baud]
"""
import sys
import time
import serial
DEFAULT_DEV = "/dev/cu.usbserial-5B1E0647691"
DEFAULT_BAUD = 115200
LABELS = ["joy_x", "joy_y", "slider", "button1", "button2", "enc_press", "enc_pos"]
CLEAR = "\033[H\033[2J"
HOME = "\033[H"
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"
# Alternate screen buffer (what vim/less use) — keeps our output independent
# of the scrollback. Without this, a render taller than the terminal would
# scroll the top off, and the next \033[H wouldn't return to a useful row.
ENTER_ALT_BUFFER = "\033[?1049h"
EXIT_ALT_BUFFER = "\033[?1049l"
def bar(value: int, lo: int, hi: int, width: int = 30) -> str:
span = max(1, hi - lo)
frac = max(0.0, min(1.0, (value - lo) / span))
filled = int(round(frac * width))
return "[" + "#" * filled + "-" * (width - filled) + "]"
def render(
values: list[int],
mins: list[int],
maxs: list[int],
rate_hz: float,
measurements: list[int],
in_progress: int | None,
rest_samples: list[tuple[int, int]],
) -> str:
joy_x, joy_y, slider, b1, b2, ep, enc = values
lines = [
f" joy_x {joy_x:5d} {bar(joy_x, 0, 4096)} (seen {mins[0]}..{maxs[0]})",
f" joy_y {joy_y:5d} {bar(joy_y, 0, 4096)} (seen {mins[1]}..{maxs[1]})",
f" slider {slider:5d} {bar(slider, 0, 4096)} (seen {mins[2]}..{maxs[2]})",
"",
f" button1 {'ON ' if b1 else 'off':3s} [{b1}]",
f" button2 {'ON ' if b2 else 'off':3s} [{b2}]",
f" enc_press {'ON ' if ep else 'off':3s} [{ep}]",
f" enc_pos {enc:+d}",
"",
f" rate {rate_hz:5.1f} Hz",
"",
" --- encoder-per-turn measurement ---",
" Turn the encoder 360 deg, then press button1 to record.",
]
if in_progress is None:
lines.append(" current: (waiting for first button1 press to start)")
else:
lines.append(f" current: {in_progress:+5d} counts since last press")
if measurements:
recent = measurements[-10:]
avg = sum(measurements) / len(measurements)
lines.append(f" recorded: {len(measurements):d} turns avg = {avg:+.1f} counts/turn")
lines.append(" last: " + ", ".join(f"{m:+d}" for m in recent))
else:
lines.append(" recorded: none yet")
lines.append(" last: -")
lines += [
"",
" --- joystick deadzone characterization ---",
" Let go of the stick, press button2 to capture one rest sample.",
" Repeat 5-10 times (release stick differently each press to capture true variability).",
f" samples: {len(rest_samples)}",
]
if len(rest_samples) < 2:
lines.append(" range: (need at least 2 samples)")
lines.append(" recommended deadzone: --")
else:
xs = [s[0] for s in rest_samples]
ys = [s[1] for s in rest_samples]
x_min, x_max = min(xs), max(xs)
y_min, y_max = min(ys), max(ys)
x_center = (x_min + x_max) / 2
y_center = (y_min + y_max) / 2
x_dev = max(x_center - x_min, x_max - x_center)
y_dev = max(y_center - y_min, y_max - y_center)
# Normalize against the same half-range the autoload uses (2048).
x_dz_pct = (x_dev / 2048) * 100
y_dz_pct = (y_dev / 2048) * 100
worst = max(x_dz_pct, y_dz_pct)
# Suggest 1.5x the observed worst-axis spread, rounded up to the nearest %.
suggested = max(1, int(worst * 1.5 + 0.999))
lines.append(f" x: range [{x_min}..{x_max}] center≈{x_center:.0f} ±{x_dev:.0f} ({x_dz_pct:.1f}%)")
lines.append(f" y: range [{y_min}..{y_max}] center≈{y_center:.0f} ±{y_dev:.0f} ({y_dz_pct:.1f}%)")
lines.append(f" recommended deadzone: {suggested}% (worst-axis spread × 1.5)")
recent = rest_samples[-6:]
lines.append(" recent: " + " ".join(f"({x},{y})" for x, y in recent))
lines += ["", " Ctrl-C to quit."]
# \033[K clears to end of line so leftover chars from longer prior values vanish
return HOME + "\n".join(line + "\033[K" for line in lines) + "\033[J"
def main() -> None:
dev = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DEV
baud = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BAUD
ser = serial.Serial(
port=dev,
baudrate=baud,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1.0,
)
ser.reset_input_buffer()
# First read after open often catches a half-line — skip it
ser.readline()
mins = [4096] * 7
maxs = [0] * 7
mins[6] = maxs[6] = 0 # enc_pos starts unknown; we won't use these anyway
sys.stdout.write(ENTER_ALT_BUFFER + HIDE_CURSOR + CLEAR)
sys.stdout.flush()
last_t = time.monotonic()
rate_hz = 0.0
ALPHA = 0.1 # EWMA smoothing for rate
measurements: list[int] = []
b1_was_pressed = False
b2_was_pressed = False
enc_ref: int | None = None
rest_samples: list[tuple[int, int]] = []
try:
while True:
raw = ser.readline()
if not raw:
continue
line = raw.decode("ascii", errors="replace").strip()
parts = line.split(",")
if len(parts) != 7:
continue
try:
values = [int(p) for p in parts]
except ValueError:
continue
for i, v in enumerate(values):
if i < 6:
if v < mins[i]:
mins[i] = v
if v > maxs[i]:
maxs[i] = v
b1_now = values[3] >= 500
b2_now = values[4] >= 500
enc_now = values[6]
if b1_now and not b1_was_pressed:
if enc_ref is not None:
measurements.append(enc_now - enc_ref)
enc_ref = enc_now
b1_was_pressed = b1_now
if b2_now and not b2_was_pressed:
rest_samples.append((values[0], values[1]))
b2_was_pressed = b2_now
in_progress = (enc_now - enc_ref) if enc_ref is not None else None
now = time.monotonic()
dt = now - last_t
if dt > 0:
inst = 1.0 / dt
rate_hz = inst if rate_hz == 0 else (ALPHA * inst + (1 - ALPHA) * rate_hz)
last_t = now
sys.stdout.write(render(values, mins, maxs, rate_hz, measurements, in_progress, rest_samples))
sys.stdout.flush()
except KeyboardInterrupt:
pass
finally:
sys.stdout.write(SHOW_CURSOR + EXIT_ALT_BUFFER)
sys.stdout.flush()
ser.close()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment