Created
April 16, 2026 13:45
-
-
Save smeschke/6a463086c284cda264b52d820587149b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| OKdo LiDAR HAT (LD06) - Fullscreen OpenCV live view. | |
| Press 'q' or Escape to quit. | |
| """ | |
| import sys | |
| import struct | |
| import numpy as np | |
| import cv2 | |
| import serial | |
| import tkinter as tk | |
| # --- Settings --- | |
| SERIAL_PORT = "/dev/ttyAMA0" | |
| BAUD_RATE = 230400 | |
| MAX_RANGE = 4.0 # metres — determines zoom level | |
| PACKETS_PER_FRAME = 40 # ~1 full rotation before redraw | |
| MARKER_RADIUS = 3 # pixel radius for each point | |
| # --- Protocol constants --- | |
| PACKET_LENGTH = 47 | |
| MEASUREMENT_COUNT = 12 | |
| MSG_FORMAT = "<xBHH" + "HB" * MEASUREMENT_COUNT + "HHB" | |
| def parse_packet(data): | |
| """Parse one LD06 packet → list of (angle_deg, distance_m).""" | |
| try: | |
| unpacked = struct.unpack(MSG_FORMAT, data) | |
| except struct.error: | |
| return [] | |
| start_angle = unpacked[2] / 100.0 | |
| stop_angle = unpacked[-3] / 100.0 | |
| if stop_angle < start_angle: | |
| stop_angle += 360.0 | |
| step = (stop_angle - start_angle) / (MEASUREMENT_COUNT - 1) | |
| raw = unpacked[3:-3] | |
| points = [] | |
| for i in range(MEASUREMENT_COUNT): | |
| distance_mm = raw[i * 2] | |
| confidence = raw[i * 2 + 1] | |
| if distance_mm == 0 or confidence < 10: | |
| continue | |
| points.append((start_angle + step * i, distance_mm / 1000.0)) | |
| return points | |
| def read_packet(ser): | |
| """Block until a valid packet arrives on the serial port.""" | |
| while True: | |
| if ser.read(1) == b'\x54': | |
| if ser.read(1) == b'\x2C': | |
| rest = ser.read(PACKET_LENGTH - 2) | |
| if len(rest) == PACKET_LENGTH - 2: | |
| return b'\x54\x2C' + rest | |
| def polar_to_screen(angles_deg, dists_m, cx, cy, scale): | |
| """ | |
| Convert arrays of (angle_deg, dist_m) to integer screen (x, y) pairs. | |
| Angle 0 = up (North), increases clockwise — matches the sensor convention. | |
| """ | |
| rad = np.radians(angles_deg) | |
| sx = (cx - dists_m * np.sin(rad) * scale).astype(int) | |
| sy = (cy + dists_m * np.cos(rad) * scale).astype(int) | |
| return sx, sy | |
| def run_live(): | |
| print(f"Opening {SERIAL_PORT} at {BAUD_RATE} baud — press q or Esc to quit.") | |
| try: | |
| ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1.0) | |
| except serial.SerialException as e: | |
| print(f"ERROR: {e}") | |
| print("Try changing SERIAL_PORT to /dev/ttyS0") | |
| return | |
| # Get actual screen resolution before creating the window | |
| _root = tk.Tk() | |
| _root.withdraw() | |
| w = _root.winfo_screenwidth() | |
| h = _root.winfo_screenheight() | |
| _root.destroy() | |
| win = "LiDAR" | |
| cv2.namedWindow(win, cv2.WINDOW_NORMAL) | |
| cv2.setWindowProperty(win, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) | |
| cx, cy = w // 2, h // 2 | |
| scale = min(cx, cy) / MAX_RANGE | |
| angles_buf = [] | |
| dists_buf = [] | |
| while True: | |
| # Collect one frame's worth of packets | |
| for _ in range(PACKETS_PER_FRAME): | |
| packet = read_packet(ser) | |
| for angle_deg, dist_m in parse_packet(packet): | |
| angles_buf.append(angle_deg) | |
| dists_buf.append(dist_m) | |
| max_pts = PACKETS_PER_FRAME * MEASUREMENT_COUNT | |
| angles_buf = angles_buf[-max_pts:] | |
| dists_buf = dists_buf[-max_pts:] | |
| # Build the frame | |
| frame = np.zeros((h, w, 3), dtype=np.uint8) | |
| if angles_buf: | |
| a = np.array(angles_buf) | |
| d = np.array(dists_buf) | |
| sx, sy = polar_to_screen(a, d, cx, cy, scale) | |
| # Keep only points that fall within the frame | |
| mask = (sx >= 0) & (sx < w) & (sy >= 0) & (sy < h) | |
| for x, y in zip(sx[mask], sy[mask]): | |
| cv2.circle(frame, (x, y), MARKER_RADIUS, (0, 255, 0), -1) | |
| cv2.imshow(win, frame) | |
| key = cv2.waitKey(1) & 0xFF | |
| if key in (ord('q'), 27): # 27 = Escape | |
| break | |
| ser.close() | |
| cv2.destroyAllWindows() | |
| if __name__ == "__main__": | |
| run_live() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment