Created
June 12, 2026 11:34
-
-
Save v21/293f1533f20636a3d624fd38f40c5025 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S /Users/v/orbitals/orbitals-godot/tools/.venv/bin/python | |
| """Forward the USB controller's serial lines as UDP packets to the Godot | |
| editor's SerialController autoload. | |
| The web export reads Web Serial directly and does not need this bridge. | |
| In the editor / desktop builds, run this alongside Godot: | |
| ./tools/serial_bridge.py | |
| It auto-reopens the serial port on disconnect and just blasts UDP packets; | |
| the Godot side derives its connected state from packet timing. Don't run | |
| two instances — Godot will receive duplicated updates. | |
| Usage: | |
| ./serial_bridge.py [device] [baud] [udp_port] | |
| Defaults: /dev/cu.usbserial-5B1E0647691 @ 115200 → udp://127.0.0.1:7117 | |
| """ | |
| import socket | |
| import sys | |
| import time | |
| import serial | |
# Serial device opened when no device argument is given on the command line.
DEFAULT_DEV = "/dev/cu.usbserial-5B1E0647691"
# Baud rate used when no baud argument is given.
DEFAULT_BAUD = 115200
# Localhost UDP port the forwarded lines are sent to when no port argument is given.
DEFAULT_UDP_PORT = 7117
# Seconds to sleep before retrying after a failed open or a read error.
RECONNECT_BACKOFF_S = 0.5
def open_serial(dev: str, baud: int) -> serial.Serial:
    """Open *dev* at *baud* with 8 data bits, no parity, one stop bit.

    Reads on the returned port time out after one second, so callers get
    control back periodically even when the device is silent.
    """
    settings = {
        "port": dev,
        "baudrate": baud,
        "bytesize": serial.EIGHTBITS,
        "parity": serial.PARITY_NONE,
        "stopbits": serial.STOPBITS_ONE,
        "timeout": 1.0,
    }
    return serial.Serial(**settings)
def main() -> None:
    """Forward serial lines to UDP forever, reopening the port on failure.

    Command-line arguments (all optional, positional): serial device, baud
    rate, UDP port. Each non-empty line read from the device is stripped of
    surrounding whitespace and sent as one datagram to 127.0.0.1. A running
    packet count is printed at most once every five seconds.

    The function only returns by exception (typically KeyboardInterrupt);
    the serial port and the UDP socket are closed on the way out.
    """
    dev = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DEV
    baud = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BAUD
    udp_port = int(sys.argv[3]) if len(sys.argv) > 3 else DEFAULT_UDP_PORT
    dest = ("127.0.0.1", udp_port)
    print(f"Bridging {dev} @ {baud} -> udp://127.0.0.1:{udp_port}. Ctrl-C to quit.", flush=True)
    sent = 0
    last_status_t = time.monotonic()
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        while True:
            try:
                ser = open_serial(dev, baud)
            except (serial.SerialException, OSError) as e:
                print(f" serial open failed: {e}; retrying in {RECONNECT_BACKOFF_S}s", flush=True)
                time.sleep(RECONNECT_BACKOFF_S)
                continue
            print(f" opened {dev}", flush=True)
            try:
                # These two calls touch the device too, so they live inside
                # the try: a failure right after open must trigger a reopen,
                # not kill the bridge.
                ser.reset_input_buffer()
                ser.readline()  # discard the first (likely partial) line
                while True:
                    raw = ser.readline()
                    if not raw:
                        continue  # read timed out with nothing received
                    line = raw.strip()
                    if not line:
                        continue
                    sock.sendto(line, dest)
                    sent += 1
                    now = time.monotonic()
                    if now - last_status_t > 5.0:
                        print(f" forwarded {sent} packets", flush=True)
                        last_status_t = now
            except (serial.SerialException, OSError) as e:
                print(f" serial read error: {e}; reopening", flush=True)
            finally:
                # Runs for read errors and for Ctrl-C alike, so the port is
                # never left open. Closing a port that just failed is
                # deliberately best-effort: a close error must not stop the
                # reconnect loop.
                try:
                    ser.close()
                except Exception:
                    pass
            time.sleep(RECONNECT_BACKOFF_S)
# Script entry point: run the bridge until the user interrupts it.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is the advertised way to quit; exit with a short message
        # instead of a traceback.
        print("\nbye")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S /Users/v/orbitals/orbitals-godot/tools/.venv/bin/python | |
| """Dump bytes from a USB serial device. | |
| Usage: | |
| ./serial_dump.py [device] [baud] | |
| Defaults: /dev/cu.usbserial-5B1E0647691 @ 115200, 8N1. | |
| Coalesces bytes that arrive together into one line, formatted as: | |
| HH:MM:SS.mmm [N] hex bytes |printable ASCII| | |
| Ctrl-C to quit. | |
| """ | |
| import sys | |
| import time | |
| import serial | |
# Serial device opened when no device argument is given on the command line.
DEFAULT_DEV = "/dev/cu.usbserial-5B1E0647691"
# Baud rate used when no baud argument is given.
DEFAULT_BAUD = 115200
# NOTE(review): not referenced by any code visible in this script; the read
# loop is paced by the serial read timeout instead — confirm whether this
# constant is still needed.
IDLE_FLUSH_S = 0.02 # flush a line if 20ms passes without new bytes
def printable(b: int) -> str:
    """Return the character for byte value *b*, or "." when it is not printable ASCII.

    Printable means 0x20 (space) through 0x7e (tilde) inclusive.
    """
    if b < 0x20 or b >= 0x7f:
        return "."
    return chr(b)
def main() -> None:
    """Open the serial device and print every received chunk as a hex/ASCII line.

    Command-line arguments (both optional, positional): serial device, baud
    rate. Each non-empty read of up to 256 bytes becomes one output line with
    a wall-clock timestamp, the byte count, the bytes in hex and their
    printable-ASCII rendering. When nothing has been printed for a second, a
    "listening" heartbeat line is printed instead. Ctrl-C ends the loop; the
    port is closed on every exit path.
    """
    dev = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DEV
    baud = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BAUD
    print(f"Opening {dev} @ {baud} 8N1. Ctrl-C to quit.\n")
    ser = serial.Serial(
        port=dev,
        baudrate=baud,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        timeout=0.01,
    )
    total = 0
    try:
        ser.reset_input_buffer()
        last_heartbeat = time.monotonic()
        while True:
            chunk = ser.read(256)
            # Monotonic time is used only for the heartbeat interval.
            now = time.monotonic()
            if chunk:
                total += len(chunk)
                hex_part = " ".join(f"{b:02x}" for b in chunk)
                ascii_part = "".join(printable(b) for b in chunk)
                # Take seconds and milliseconds from the same wall-clock
                # reading. Mixing strftime() with the fractional part of the
                # monotonic clock makes the millisecond field roll over at a
                # different moment than the seconds field, so timestamps
                # could appear to go backwards.
                wall = time.time()
                ts = time.strftime("%H:%M:%S", time.localtime(wall))
                ms = int((wall % 1) * 1000)
                print(f"{ts}.{ms:03d} [{len(chunk):3d}] {hex_part:<60} |{ascii_part}|", flush=True)
                last_heartbeat = now
            elif now - last_heartbeat >= 1.0:
                ts = time.strftime("%H:%M:%S")
                print(f"{ts} (listening, {total} bytes received so far)", flush=True)
                last_heartbeat = now
    except KeyboardInterrupt:
        print("\nbye")
    finally:
        ser.close()
# Script entry point; main() handles Ctrl-C and closes the port itself.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S /Users/v/orbitals/orbitals-godot/tools/.venv/bin/python | |
| """Parse and display the labeled control stream from the USB serial device. | |
| Each line is 7 comma-separated ints, CRLF-terminated: | |
| joy_x, joy_y, slider, button1, button2, encoder_press, encoder_pos | |
| Joystick/slider raw range is 0-4096 (uncalibrated). | |
| Buttons and encoder-press are 0 or 1000. | |
| Encoder position is a signed integer count. | |
| Usage:#!/usr/bin/env -S /Users/v/orbitals/orbitals-godot/tools/.venv/bin/python | |
| """Parse and display the labeled control stream from the USB serial device. | |
| Each line is 7 comma-separated ints, CRLF-terminated: | |
| joy_x, joy_y, slider, button1, button2, encoder_press, encoder_pos | |
| Joystick/slider raw range is 0-4096 (uncalibrated). | |
| Buttons and encoder-press are 0 or 1000. | |
| Encoder position is a signed integer count. | |
| Usage: | |
| ./serial_show.py [device] [baud] | |
| """ | |
| import sys | |
| import time | |
| import serial | |
# Serial device opened when no device argument is given on the command line.
DEFAULT_DEV = "/dev/cu.usbserial-5B1E0647691"
# Baud rate used when no baud argument is given.
DEFAULT_BAUD = 115200
# Field names in the order they appear on each serial line.
# NOTE(review): not referenced by any code visible in this script — confirm
# whether it is still needed.
LABELS = ["joy_x", "joy_y", "slider", "button1", "button2", "enc_press", "enc_pos"]
# ANSI escape sequences used to redraw the display in place.
CLEAR = "\033[H\033[2J"
HOME = "\033[H"
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"
# Alternate screen buffer (what vim/less use) — keeps our output independent
# of the scrollback. Without this, a render taller than the terminal would
# scroll the top off, and the next \033[H wouldn't return to a useful row.
ENTER_ALT_BUFFER = "\033[?1049h"
EXIT_ALT_BUFFER = "\033[?1049l"
def bar(value: int, lo: int, hi: int, width: int = 30) -> str:
    """Draw *value* as a bracketed text gauge of *width* cells.

    The position of *value* within ``lo..hi`` is clamped to the 0..1 range,
    so out-of-range readings show as an empty or full bar. A span smaller
    than 1 is treated as 1 to avoid dividing by zero.
    """
    extent = hi - lo
    if extent < 1:
        extent = 1
    ratio = (value - lo) / extent
    if ratio < 0.0:
        ratio = 0.0
    elif ratio > 1.0:
        ratio = 1.0
    lit = int(round(ratio * width))
    return f"[{'#' * lit}{'-' * (width - lit)}]"
def render(
    values: list[int],
    mins: list[int],
    maxs: list[int],
    rate_hz: float,
    measurements: list[int],
    in_progress: int | None,
    rest_samples: list[tuple[int, int]],
) -> str:
    """Compose one complete dashboard frame as a single string.

    Args:
        values: The seven readings of the current line, in the order
            joy_x, joy_y, slider, button1, button2, encoder press,
            encoder position.
        mins: Smallest value seen so far per field (first three are shown).
        maxs: Largest value seen so far per field (first three are shown).
        rate_hz: Line rate to display.
        measurements: Recorded encoder deltas; the last ten and the overall
            average are shown.
        in_progress: Encoder counts since the last button1 press, or None
            when no press has happened yet.
        rest_samples: Captured (joy_x, joy_y) rest positions; the last six
            are listed and, with two or more, a deadzone is suggested.

    Returns:
        The frame text: cursor-home, every line followed by an
        erase-to-end-of-line escape, then an erase-below escape.
    """
    joy_x, joy_y, slider, b1, b2, ep, enc = values

    def state(raw: int) -> str:
        # Any non-zero reading is shown as pressed.
        return "ON " if raw else "off"

    def spread(readings: list[int]) -> tuple[int, int, float, float, float]:
        # Observed range of one axis, its midpoint, the largest distance from
        # the midpoint, and that distance as a percentage of 2048.
        low, high = min(readings), max(readings)
        mid = (low + high) / 2
        dev = max(mid - low, high - mid)
        return low, high, mid, dev, (dev / 2048) * 100

    # Live readings.
    out: list[str] = [
        f" joy_x {joy_x:5d} {bar(joy_x, 0, 4096)} (seen {mins[0]}..{maxs[0]})",
        f" joy_y {joy_y:5d} {bar(joy_y, 0, 4096)} (seen {mins[1]}..{maxs[1]})",
        f" slider {slider:5d} {bar(slider, 0, 4096)} (seen {mins[2]}..{maxs[2]})",
        "",
        f" button1 {state(b1):3s} [{b1}]",
        f" button2 {state(b2):3s} [{b2}]",
        f" enc_press {state(ep):3s} [{ep}]",
        f" enc_pos {enc:+d}",
        "",
        f" rate {rate_hz:5.1f} Hz",
        "",
        " --- encoder-per-turn measurement ---",
        " Turn the encoder 360 deg, then press button1 to record.",
    ]

    # Encoder-per-turn section.
    out.append(
        " current: (waiting for first button1 press to start)"
        if in_progress is None
        else f" current: {in_progress:+5d} counts since last press"
    )
    if not measurements:
        out += [" recorded: none yet", " last: -"]
    else:
        mean = sum(measurements) / len(measurements)
        latest = ", ".join(f"{m:+d}" for m in measurements[-10:])
        out += [
            f" recorded: {len(measurements):d} turns avg = {mean:+.1f} counts/turn",
            " last: " + latest,
        ]

    # Joystick deadzone section.
    out += [
        "",
        " --- joystick deadzone characterization ---",
        " Let go of the stick, press button2 to capture one rest sample.",
        " Repeat 5-10 times (release stick differently each press to capture true variability).",
        f" samples: {len(rest_samples)}",
    ]
    if len(rest_samples) >= 2:
        x_min, x_max, x_center, x_dev, x_dz_pct = spread([s[0] for s in rest_samples])
        y_min, y_max, y_center, y_dev, y_dz_pct = spread([s[1] for s in rest_samples])
        worst = max(x_dz_pct, y_dz_pct)
        # Suggest 1.5x the observed worst-axis spread, rounded up to the nearest %.
        suggested = max(1, int(worst * 1.5 + 0.999))
        out += [
            f" x: range [{x_min}..{x_max}] center≈{x_center:.0f} ±{x_dev:.0f} ({x_dz_pct:.1f}%)",
            f" y: range [{y_min}..{y_max}] center≈{y_center:.0f} ±{y_dev:.0f} ({y_dz_pct:.1f}%)",
            f" recommended deadzone: {suggested}% (worst-axis spread × 1.5)",
        ]
    else:
        out += [
            " range: (need at least 2 samples)",
            " recommended deadzone: --",
        ]
    out.append(" recent: " + " ".join(f"({x},{y})" for x, y in rest_samples[-6:]))
    out += ["", " Ctrl-C to quit."]

    # \033[K after every line erases leftovers from a longer previous value;
    # the closing \033[J erases anything below the last line.
    return "".join([HOME, "\033[K\n".join(out), "\033[K", "\033[J"])
def main() -> None:
    """Run the live controller dashboard until interrupted with Ctrl-C.

    Command-line arguments (both optional, positional): serial device, baud
    rate. Every well-formed line (seven comma-separated integers) updates
    the per-field min/max, the smoothed line rate, the encoder-per-turn
    measurements (recorded on each button1 press) and the joystick rest
    samples (captured on each button2 press), then redraws the screen.
    Malformed or incomplete lines are skipped silently.

    The display runs in the terminal's alternate screen buffer with the
    cursor hidden; both are restored, and the port closed, on every exit
    path.
    """
    dev = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DEV
    baud = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BAUD
    ser = serial.Serial(
        port=dev,
        baudrate=baud,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        timeout=1.0,
    )
    mins = [4096] * 7
    maxs = [0] * 7
    mins[6] = maxs[6] = 0  # enc_pos starts unknown; we won't use these anyway
    rate_hz = 0.0
    ALPHA = 0.1  # EWMA smoothing for rate
    measurements: list[int] = []
    b1_was_pressed = False
    b2_was_pressed = False
    enc_ref: int | None = None
    rest_samples: list[tuple[int, int]] = []
    # True after a read timed out mid-line: the rest of that line is still
    # on its way and must be thrown away too.
    resync = False
    try:
        # Everything that touches the port sits inside the try, so Ctrl-C
        # during the initial discard read also exits cleanly and closes it.
        ser.reset_input_buffer()
        # First read after open often catches a half-line — skip it
        ser.readline()
        try:
            sys.stdout.write(ENTER_ALT_BUFFER + HIDE_CURSOR + CLEAR)
            sys.stdout.flush()
            last_t = time.monotonic()
            while True:
                raw = ser.readline()
                if not raw:
                    continue  # read timed out with nothing received
                if not raw.endswith(b"\n"):
                    # The read timed out part-way through a line. A
                    # truncated record can still split into seven integers
                    # (e.g. an encoder value cut from 1234 to 12), so drop
                    # it and resynchronise on the next line boundary.
                    resync = True
                    continue
                if resync:
                    # This is the remainder of the truncated line.
                    resync = False
                    continue
                line = raw.decode("ascii", errors="replace").strip()
                parts = line.split(",")
                if len(parts) != 7:
                    continue
                try:
                    values = [int(p) for p in parts]
                except ValueError:
                    continue
                # Track extremes for every field except the encoder position.
                for i, v in enumerate(values[:6]):
                    mins[i] = min(mins[i], v)
                    maxs[i] = max(maxs[i], v)
                # Buttons report 0 or 1000; threshold halfway.
                b1_now = values[3] >= 500
                b2_now = values[4] >= 500
                enc_now = values[6]
                # Rising edge on button1: record the counts since the
                # previous press (if any) and start a new measurement.
                if b1_now and not b1_was_pressed:
                    if enc_ref is not None:
                        measurements.append(enc_now - enc_ref)
                    enc_ref = enc_now
                b1_was_pressed = b1_now
                # Rising edge on button2: capture the joystick rest position.
                if b2_now and not b2_was_pressed:
                    rest_samples.append((values[0], values[1]))
                b2_was_pressed = b2_now
                in_progress = (enc_now - enc_ref) if enc_ref is not None else None
                now = time.monotonic()
                dt = now - last_t
                if dt > 0:
                    inst = 1.0 / dt
                    # Seed the average with the first sample, then smooth.
                    rate_hz = inst if rate_hz == 0 else (ALPHA * inst + (1 - ALPHA) * rate_hz)
                last_t = now
                sys.stdout.write(render(values, mins, maxs, rate_hz, measurements, in_progress, rest_samples))
                sys.stdout.flush()
        finally:
            sys.stdout.write(SHOW_CURSOR + EXIT_ALT_BUFFER)
            sys.stdout.flush()
    except KeyboardInterrupt:
        pass
    finally:
        ser.close()
# Script entry point; main() handles Ctrl-C and restores the terminal itself.
if __name__ == "__main__":
    main()
| ./serial_show.py [device] [baud] | |
| """ | |
| import sys | |
| import time | |
| import serial | |
# Serial device opened when no device argument is given on the command line.
DEFAULT_DEV = "/dev/cu.usbserial-5B1E0647691"
# Baud rate used when no baud argument is given.
DEFAULT_BAUD = 115200
# Field names in the order they appear on each serial line.
# NOTE(review): not referenced by any code visible in this script — confirm
# whether it is still needed.
LABELS = ["joy_x", "joy_y", "slider", "button1", "button2", "enc_press", "enc_pos"]
# ANSI escape sequences used to redraw the display in place.
CLEAR = "\033[H\033[2J"
HOME = "\033[H"
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"
# Alternate screen buffer (what vim/less use) — keeps our output independent
# of the scrollback. Without this, a render taller than the terminal would
# scroll the top off, and the next \033[H wouldn't return to a useful row.
ENTER_ALT_BUFFER = "\033[?1049h"
EXIT_ALT_BUFFER = "\033[?1049l"
def bar(value: int, lo: int, hi: int, width: int = 30) -> str:
    """Draw *value* as a bracketed text gauge of *width* cells.

    The position of *value* within ``lo..hi`` is clamped to the 0..1 range,
    so out-of-range readings show as an empty or full bar. A span smaller
    than 1 is treated as 1 to avoid dividing by zero.
    """
    extent = hi - lo
    if extent < 1:
        extent = 1
    ratio = (value - lo) / extent
    if ratio < 0.0:
        ratio = 0.0
    elif ratio > 1.0:
        ratio = 1.0
    lit = int(round(ratio * width))
    return f"[{'#' * lit}{'-' * (width - lit)}]"
def render(
    values: list[int],
    mins: list[int],
    maxs: list[int],
    rate_hz: float,
    measurements: list[int],
    in_progress: int | None,
    rest_samples: list[tuple[int, int]],
) -> str:
    """Compose one complete dashboard frame as a single string.

    Args:
        values: The seven readings of the current line, in the order
            joy_x, joy_y, slider, button1, button2, encoder press,
            encoder position.
        mins: Smallest value seen so far per field (first three are shown).
        maxs: Largest value seen so far per field (first three are shown).
        rate_hz: Line rate to display.
        measurements: Recorded encoder deltas; the last ten and the overall
            average are shown.
        in_progress: Encoder counts since the last button1 press, or None
            when no press has happened yet.
        rest_samples: Captured (joy_x, joy_y) rest positions; the last six
            are listed and, with two or more, a deadzone is suggested.

    Returns:
        The frame text: cursor-home, every line followed by an
        erase-to-end-of-line escape, then an erase-below escape.
    """
    joy_x, joy_y, slider, b1, b2, ep, enc = values

    def state(raw: int) -> str:
        # Any non-zero reading is shown as pressed.
        return "ON " if raw else "off"

    def spread(readings: list[int]) -> tuple[int, int, float, float, float]:
        # Observed range of one axis, its midpoint, the largest distance from
        # the midpoint, and that distance as a percentage of 2048.
        low, high = min(readings), max(readings)
        mid = (low + high) / 2
        dev = max(mid - low, high - mid)
        return low, high, mid, dev, (dev / 2048) * 100

    # Live readings.
    out: list[str] = [
        f" joy_x {joy_x:5d} {bar(joy_x, 0, 4096)} (seen {mins[0]}..{maxs[0]})",
        f" joy_y {joy_y:5d} {bar(joy_y, 0, 4096)} (seen {mins[1]}..{maxs[1]})",
        f" slider {slider:5d} {bar(slider, 0, 4096)} (seen {mins[2]}..{maxs[2]})",
        "",
        f" button1 {state(b1):3s} [{b1}]",
        f" button2 {state(b2):3s} [{b2}]",
        f" enc_press {state(ep):3s} [{ep}]",
        f" enc_pos {enc:+d}",
        "",
        f" rate {rate_hz:5.1f} Hz",
        "",
        " --- encoder-per-turn measurement ---",
        " Turn the encoder 360 deg, then press button1 to record.",
    ]

    # Encoder-per-turn section.
    out.append(
        " current: (waiting for first button1 press to start)"
        if in_progress is None
        else f" current: {in_progress:+5d} counts since last press"
    )
    if not measurements:
        out += [" recorded: none yet", " last: -"]
    else:
        mean = sum(measurements) / len(measurements)
        latest = ", ".join(f"{m:+d}" for m in measurements[-10:])
        out += [
            f" recorded: {len(measurements):d} turns avg = {mean:+.1f} counts/turn",
            " last: " + latest,
        ]

    # Joystick deadzone section.
    out += [
        "",
        " --- joystick deadzone characterization ---",
        " Let go of the stick, press button2 to capture one rest sample.",
        " Repeat 5-10 times (release stick differently each press to capture true variability).",
        f" samples: {len(rest_samples)}",
    ]
    if len(rest_samples) >= 2:
        x_min, x_max, x_center, x_dev, x_dz_pct = spread([s[0] for s in rest_samples])
        y_min, y_max, y_center, y_dev, y_dz_pct = spread([s[1] for s in rest_samples])
        worst = max(x_dz_pct, y_dz_pct)
        # Suggest 1.5x the observed worst-axis spread, rounded up to the nearest %.
        suggested = max(1, int(worst * 1.5 + 0.999))
        out += [
            f" x: range [{x_min}..{x_max}] center≈{x_center:.0f} ±{x_dev:.0f} ({x_dz_pct:.1f}%)",
            f" y: range [{y_min}..{y_max}] center≈{y_center:.0f} ±{y_dev:.0f} ({y_dz_pct:.1f}%)",
            f" recommended deadzone: {suggested}% (worst-axis spread × 1.5)",
        ]
    else:
        out += [
            " range: (need at least 2 samples)",
            " recommended deadzone: --",
        ]
    out.append(" recent: " + " ".join(f"({x},{y})" for x, y in rest_samples[-6:]))
    out += ["", " Ctrl-C to quit."]

    # \033[K after every line erases leftovers from a longer previous value;
    # the closing \033[J erases anything below the last line.
    return "".join([HOME, "\033[K\n".join(out), "\033[K", "\033[J"])
def main() -> None:
    """Run the live controller dashboard until interrupted with Ctrl-C.

    Command-line arguments (both optional, positional): serial device, baud
    rate. Every well-formed line (seven comma-separated integers) updates
    the per-field min/max, the smoothed line rate, the encoder-per-turn
    measurements (recorded on each button1 press) and the joystick rest
    samples (captured on each button2 press), then redraws the screen.
    Malformed or incomplete lines are skipped silently.

    The display runs in the terminal's alternate screen buffer with the
    cursor hidden; both are restored, and the port closed, on every exit
    path.
    """
    dev = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DEV
    baud = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BAUD
    ser = serial.Serial(
        port=dev,
        baudrate=baud,
        bytesize=serial.EIGHTBITS,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        timeout=1.0,
    )
    mins = [4096] * 7
    maxs = [0] * 7
    mins[6] = maxs[6] = 0  # enc_pos starts unknown; we won't use these anyway
    rate_hz = 0.0
    ALPHA = 0.1  # EWMA smoothing for rate
    measurements: list[int] = []
    b1_was_pressed = False
    b2_was_pressed = False
    enc_ref: int | None = None
    rest_samples: list[tuple[int, int]] = []
    # True after a read timed out mid-line: the rest of that line is still
    # on its way and must be thrown away too.
    resync = False
    try:
        # Everything that touches the port sits inside the try, so Ctrl-C
        # during the initial discard read also exits cleanly and closes it.
        ser.reset_input_buffer()
        # First read after open often catches a half-line — skip it
        ser.readline()
        try:
            sys.stdout.write(ENTER_ALT_BUFFER + HIDE_CURSOR + CLEAR)
            sys.stdout.flush()
            last_t = time.monotonic()
            while True:
                raw = ser.readline()
                if not raw:
                    continue  # read timed out with nothing received
                if not raw.endswith(b"\n"):
                    # The read timed out part-way through a line. A
                    # truncated record can still split into seven integers
                    # (e.g. an encoder value cut from 1234 to 12), so drop
                    # it and resynchronise on the next line boundary.
                    resync = True
                    continue
                if resync:
                    # This is the remainder of the truncated line.
                    resync = False
                    continue
                line = raw.decode("ascii", errors="replace").strip()
                parts = line.split(",")
                if len(parts) != 7:
                    continue
                try:
                    values = [int(p) for p in parts]
                except ValueError:
                    continue
                # Track extremes for every field except the encoder position.
                for i, v in enumerate(values[:6]):
                    mins[i] = min(mins[i], v)
                    maxs[i] = max(maxs[i], v)
                # Buttons report 0 or 1000; threshold halfway.
                b1_now = values[3] >= 500
                b2_now = values[4] >= 500
                enc_now = values[6]
                # Rising edge on button1: record the counts since the
                # previous press (if any) and start a new measurement.
                if b1_now and not b1_was_pressed:
                    if enc_ref is not None:
                        measurements.append(enc_now - enc_ref)
                    enc_ref = enc_now
                b1_was_pressed = b1_now
                # Rising edge on button2: capture the joystick rest position.
                if b2_now and not b2_was_pressed:
                    rest_samples.append((values[0], values[1]))
                b2_was_pressed = b2_now
                in_progress = (enc_now - enc_ref) if enc_ref is not None else None
                now = time.monotonic()
                dt = now - last_t
                if dt > 0:
                    inst = 1.0 / dt
                    # Seed the average with the first sample, then smooth.
                    rate_hz = inst if rate_hz == 0 else (ALPHA * inst + (1 - ALPHA) * rate_hz)
                last_t = now
                sys.stdout.write(render(values, mins, maxs, rate_hz, measurements, in_progress, rest_samples))
                sys.stdout.flush()
        finally:
            sys.stdout.write(SHOW_CURSOR + EXIT_ALT_BUFFER)
            sys.stdout.flush()
    except KeyboardInterrupt:
        pass
    finally:
        ser.close()
# Script entry point; main() handles Ctrl-C and restores the terminal itself.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment