Created
April 1, 2025 01:17
-
-
Save jlgabriel/c5653ba39c33544763962918591d73a1 to your computer and use it in GitHub Desktop.
Aerofly FS4 ForeFlight Protocol to Shirley Bridge with GUI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
################################################################################## | |
# ForeFlight Protocol to Shirley Bridge with GUI | |
# Developed to connect Aerofly FS4 Flight Simulator from IPACS (TM) to Fly Shirley | |
# Reference: https://support.foreflight.com/hc/en-us/articles/204115005 | |
################################################################################## | |
import asyncio | |
import socket | |
import json | |
import time | |
import tkinter as tk | |
from tkinter import ttk, font as tkfont | |
import threading | |
from dataclasses import dataclass | |
import sys | |
from typing import Optional, Set, Union | |
# Import websockets with correct types | |
import websockets | |
import websockets.exceptions | |
################################################################################ | |
# Constants & Utilities | |
################################################################################ | |
METERS_TO_FEET = 3.28084 | |
MPS_TO_KTS = 1.94384 | |
################################################################################ | |
# Data Classes for Parsed ForeFlight Messages | |
################################################################################ | |
@dataclass | |
class XGPSData: | |
""" | |
Represents ForeFlight XGPS data. | |
e.g. XGPS<sim_name>,<longitude>,<latitude>,<altitude_msl_meters>,<track_true_north>,<groundspeed_m/s> | |
""" | |
sim_name: str | |
longitude: float | |
latitude: float | |
alt_msl_meters: float | |
track_deg: float | |
ground_speed_mps: float | |
@dataclass | |
class XATTData: | |
""" | |
Represents ForeFlight XATT data. | |
e.g. XATT<sim_name>,<true_heading>,<pitch_degrees>,<roll_degrees> | |
""" | |
sim_name: str | |
heading_deg: float | |
pitch_deg: float | |
roll_deg: float | |
@dataclass | |
class UnknownData: | |
""" | |
Returned if we fail to parse or data type is not recognized. | |
""" | |
raw_line: str | |
################################################################################ | |
# Main SimData Model (for Shirley) | |
################################################################################ | |
class SimData: | |
""" | |
Holds references to the latest XGPSData and XATTData from the simulator | |
and transforms them into the format Shirley expects. | |
""" | |
def __init__(self): | |
# Store references to the most recent XGPSData and XATTData | |
# Initialize them to defaults | |
self.xgps: Optional[XGPSData] = None | |
self.xatt: Optional[XATTData] = None | |
# For concurrency control | |
self._lock = asyncio.Lock() | |
async def update_from_xgps(self, xgps: XGPSData): | |
"""Replace the reference to the latest XGPSData.""" | |
async with self._lock: | |
self.xgps = xgps | |
async def update_from_xatt(self, xatt: XATTData): | |
"""Replace the reference to the latest XATTData.""" | |
async with self._lock: | |
self.xatt = xatt | |
async def get_data_snapshot(self): | |
""" | |
Produce a dictionary of position and attitude in the format Shirley expects. | |
We do the unit conversions (e.g. M -> ft) here, using the references to XGPSData and XATTData. | |
""" | |
async with self._lock: | |
# Local copies for clarity | |
xgps = self.xgps | |
xatt = self.xatt | |
# Defaults in case we have no data yet | |
latitude = 0.0 | |
longitude = 0.0 | |
msl_alt_ft = 0.0 | |
ground_speed_kts = 0.0 | |
heading_deg = 0.0 | |
pitch_deg = 0.0 | |
roll_deg = 0.0 | |
if xgps: | |
latitude = xgps.latitude | |
longitude = xgps.longitude | |
msl_alt_ft = xgps.alt_msl_meters * METERS_TO_FEET | |
ground_speed_kts = xgps.ground_speed_mps * MPS_TO_KTS | |
heading_deg = xgps.track_deg % 360.0 # if no XATT is available | |
if xatt: | |
# If we do have XATT, prefer that heading | |
heading_deg = xatt.heading_deg % 360.0 | |
pitch_deg = xatt.pitch_deg | |
roll_deg = xatt.roll_deg | |
# Build final object for Shirley | |
return { | |
"position": { | |
"latitudeDeg": latitude, | |
"longitudeDeg": longitude, | |
"mslAltitudeFt": msl_alt_ft, | |
"gpsGroundSpeedKts": ground_speed_kts, | |
}, | |
"attitude": { | |
"rollAngleDegRight": roll_deg, | |
"pitchAngleDegUp": pitch_deg, | |
"trueHeadingDeg": heading_deg, | |
} | |
} | |
################################################################################ | |
# ForeFlight Data Parser | |
################################################################################ | |
class ForeFlightParser: | |
""" | |
Parses strings in ForeFlight's XGPS / XATT formats, | |
returning typed objects: XGPSData, XATTData, or UnknownData. | |
""" | |
@staticmethod | |
def parse_line(line: str): | |
""" | |
Identify the data type (XGPS, XATT) and parse accordingly. | |
""" | |
line = line.strip() | |
if line.startswith("XGPS"): | |
return ForeFlightParser._parse_xgps(line) | |
elif line.startswith("XATT"): | |
return ForeFlightParser._parse_xatt(line) | |
else: | |
return UnknownData(raw_line=line) | |
@staticmethod | |
def _parse_xgps(line: str) -> Union[XGPSData, UnknownData]: | |
""" | |
Example XGPS line: | |
XGPSMySim,-80.11,34.55,1200.1,359.05,55.6 | |
=> XGPS<sim_name>,<longitude>,<latitude>,<alt_msl_meters>,<track_deg_true>,<groundspeed_m/s> | |
""" | |
try: | |
raw = line[4:] # remove 'XGPS' | |
parts = raw.split(",") | |
sim_name = parts[0].strip() | |
longitude = float(parts[1]) | |
latitude = float(parts[2]) | |
alt_msl_meters = float(parts[3]) | |
track_deg = float(parts[4]) | |
ground_speed_mps = float(parts[5]) | |
return XGPSData( | |
sim_name=sim_name, | |
longitude=longitude, | |
latitude=latitude, | |
alt_msl_meters=alt_msl_meters, | |
track_deg=track_deg, | |
ground_speed_mps=ground_speed_mps | |
) | |
except (ValueError, IndexError): | |
return UnknownData(raw_line=line) | |
@staticmethod | |
def _parse_xatt(line: str) -> Union[XATTData, UnknownData]: | |
""" | |
Example XATT line: | |
XATTMySim,180.2,0.1,0.2 | |
=> XATT<sim_name>,<true_heading_deg>,<pitch_deg>,<roll_deg> | |
""" | |
try: | |
raw = line[4:] | |
parts = raw.split(",") | |
sim_name = parts[0].strip() | |
heading_deg = float(parts[1]) | |
pitch_deg = float(parts[2]) | |
roll_deg = float(parts[3]) | |
return XATTData( | |
sim_name=sim_name, | |
heading_deg=heading_deg, | |
pitch_deg=pitch_deg, | |
roll_deg=roll_deg | |
) | |
except (ValueError, IndexError): | |
return UnknownData(raw_line=line) | |
################################################################################ | |
# ForeFlight UDP Server | |
################################################################################ | |
class ForeFlightUDPServer: | |
""" | |
Listens for ForeFlight-compatible data on UDP port 49002. | |
Parses lines and updates the shared SimData. | |
""" | |
def __init__(self, sim_data: SimData, parser: ForeFlightParser, port: int = 49002): | |
self.sim_data = sim_data | |
self.parser = parser | |
self.port = port | |
self.socket = None | |
self.lastDataReceivedTime = None | |
async def run(self): | |
""" | |
Main loop: bind to UDP port, receive data, parse, update sim_data. | |
We'll do a blocking recv in a background thread so we don't block the event loop. | |
""" | |
# Create UDP socket | |
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
# Enable broadcast & address reuse | |
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | |
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
# Set blocking to True for our simple loop | |
self.socket.setblocking(True) | |
# Bind to all interfaces on the specified port: | |
self.socket.bind(('', self.port)) | |
print(f"[ForeFlightUDPServer] Listening on UDP port {self.port}...") | |
try: | |
while True: | |
try: | |
# When the socket is closed, this will raise an exception | |
data, _ = await asyncio.to_thread(self.socket.recvfrom, 1024) | |
line = data.decode('utf-8', errors='ignore').strip() | |
parsed_obj = self.parser.parse_line(line) | |
# If we haven't received data in a while, print a message | |
if self.lastDataReceivedTime is None or \ | |
time.time() - self.lastDataReceivedTime > 5.0: | |
print(f"[ForeFlightUDPServer] Starting to receive data (e.g.): {parsed_obj}") | |
self.lastDataReceivedTime = time.time() | |
if isinstance(parsed_obj, XGPSData): | |
await self.sim_data.update_from_xgps(parsed_obj) | |
elif isinstance(parsed_obj, XATTData): | |
await self.sim_data.update_from_xatt(parsed_obj) | |
except (ConnectionError, OSError): | |
# Socket was closed | |
break | |
except asyncio.CancelledError: | |
# Task was cancelled | |
break | |
except Exception as e: | |
print(f"[ForeFlightUDPServer] Error receiving data: {e}") | |
# Continue despite errors | |
finally: | |
# Ensure socket is closed when the task ends | |
if self.socket: | |
try: | |
self.socket.close() | |
print("[ForeFlightUDPServer] Socket closed") | |
except Exception as ex: | |
print(f"[ForeFlightUDPServer] Error closing socket: {ex}") | |
################################################################################ | |
# Shirley WebSocket Server | |
################################################################################ | |
class ShirleyWebSocketServer: | |
""" | |
Hosts a WebSocket server on a specified port & path. | |
- When clients connect, we hold onto their connections. | |
- We periodically broadcast the latest `SimData` to every connected client. | |
""" | |
def __init__(self, sim_data: SimData, host="0.0.0.0", port=2992, path="/api/v1"): | |
self.sim_data = sim_data | |
self.host = host | |
self.port = port | |
self.path = path | |
self.send_interval = 0.25 # seconds (4 Hz) | |
self.connections: Set = set() | |
self.server = None | |
async def handler(self, websocket): | |
""" | |
Called for every new client connection. We can optionally listen for messages, | |
but here we primarily just broadcast out. | |
""" | |
# Store connection info before adding to set | |
client_info = websocket.remote_address if hasattr(websocket, 'remote_address') else "Unknown" | |
self.connections.add(websocket) | |
print(f"[ShirleyWebSocketServer] WebSocket Client connected: {client_info}") | |
# Check path if available | |
client_path = getattr(websocket, 'path', None) | |
if client_path and not client_path.endswith(self.path): | |
print(f"[ShirleyWebSocketServer] Warning: WS Connected to {client_path}, not {self.path}") | |
try: | |
# Keep reading messages in case the client sends anything | |
async for setSimData in websocket: | |
print(f"[ShirleyWebSocketServer] WebSocket Received SetSimData message: {setSimData}") | |
except websockets.exceptions.ConnectionClosed: | |
pass | |
except Exception as e: | |
print(f"[ShirleyWebSocketServer] Error in handler: {e}") | |
finally: | |
# Safe removal from connections set | |
if websocket in self.connections: | |
self.connections.remove(websocket) | |
print(f"[ShirleyWebSocketServer] WebSocket Client disconnected: {client_info}") | |
async def broadcast_loop(self): | |
""" | |
Periodically broadcast the latest SimData to all connected clients. | |
""" | |
try: | |
while True: | |
data = await self.sim_data.get_data_snapshot() | |
message = json.dumps(data) | |
stale_connections = [] | |
for ws in self.connections: | |
try: | |
await ws.send(message) | |
except websockets.exceptions.ConnectionClosed: | |
stale_connections.append(ws) | |
except Exception as e: | |
print(f"[ShirleyWebSocketServer] Send error: {e}") | |
stale_connections.append(ws) | |
# Remove any closed connections | |
for ws in stale_connections: | |
if ws in self.connections: # Check to avoid KeyError | |
self.connections.remove(ws) | |
try: | |
await asyncio.sleep(self.send_interval) | |
except asyncio.CancelledError: | |
# Exit gracefully if our task is cancelled | |
break | |
except asyncio.CancelledError: | |
# Ensure we handle task cancellation cleanly | |
print("[ShirleyWebSocketServer] Broadcast loop stopped") | |
raise | |
except Exception as e: | |
print(f"[ShirleyWebSocketServer] Broadcast loop error: {e}") | |
raise | |
async def run(self): | |
# Start the websockets server | |
self.server = await websockets.serve(self.handler, self.host, self.port) | |
print(f"[ShirleyWebSocketServer] Serving at ws://{self.host}:{self.port}{self.path}") | |
# Create broadcast task so we can properly handle cancellation | |
broadcast_task = asyncio.create_task(self.broadcast_loop()) | |
try: | |
# Run both the server and broadcast loop | |
await asyncio.gather( | |
self.server.wait_closed(), | |
broadcast_task | |
) | |
except asyncio.CancelledError: | |
# Handle cancellation gracefully | |
print("[ShirleyWebSocketServer] Server shutting down") | |
# Cancel our broadcast task | |
if not broadcast_task.done(): | |
broadcast_task.cancel() | |
try: | |
await broadcast_task | |
except asyncio.CancelledError: | |
pass | |
# Close all client connections | |
close_tasks = [] | |
for ws in list(self.connections): | |
close_tasks.append(asyncio.create_task(ws.close())) | |
if close_tasks: | |
await asyncio.gather(*close_tasks, return_exceptions=True) | |
self.connections.clear() | |
# Close the server | |
if self.server: | |
self.server.close() | |
# Raise so that our parent gatherer knows we're cancelled | |
raise | |
################################################################################ | |
# Bridge Core | |
################################################################################ | |
class ForeFlightToShirleyBridge: | |
""" | |
High-level orchestrator that sets up: | |
1) A SimData object | |
2) A ForeFlightUDPServer (listens on 49002) | |
3) A ShirleyWebSocketServer (hosts on 2992/api/v1) | |
4) Runs them concurrently with asyncio. | |
""" | |
def __init__(self, udp_port=49002, ws_host="0.0.0.0", ws_port=2992, ws_path="/api/v1"): | |
self.sim_data = SimData() | |
self.parser = ForeFlightParser() | |
self.udp_server = ForeFlightUDPServer(self.sim_data, self.parser, port=udp_port) | |
self.ws_server = ShirleyWebSocketServer(self.sim_data, host=ws_host, port=ws_port, path=ws_path) | |
async def run(self): | |
await asyncio.gather( | |
self.udp_server.run(), | |
self.ws_server.run() | |
) | |
################################################################################ | |
# GUI Implementation | |
################################################################################ | |
class BridgeGUI: | |
def __init__(self, master): | |
self.master = master | |
self.master.title("ForeFlight-Shirley Bridge") | |
self.master.geometry("400x300") | |
# Configure grid | |
self.master.grid_columnconfigure(0, weight=1) | |
self.master.grid_rowconfigure(1, weight=1) | |
# Initialize UI elements that will be created in setup_ui | |
self.status_label = None | |
self.connection_status = None | |
self.info_display = None | |
self.close_button = None | |
# Initialize bridge components | |
self.bridge = None | |
self.bridge_task = None | |
self.event_loop = None | |
self.bridge_thread = None | |
# Set up UI components | |
self.setup_ui() | |
# Set up window close protocol | |
self.master.protocol("WM_DELETE_WINDOW", self.close_application) | |
# Start the bridge | |
self.start_bridge() | |
def setup_ui(self): | |
"""Set up the user interface components""" | |
# Status Frame | |
status_frame = ttk.Frame(self.master, padding="10") | |
status_frame.grid(row=0, column=0, sticky="ew") | |
# Status Label | |
self.status_label = ttk.Label( | |
status_frame, | |
text="Status: Starting...", | |
font=tkfont.Font(size=10) | |
) | |
self.status_label.pack(side="left") | |
# Connection status | |
self.connection_status = ttk.Label( | |
status_frame, | |
text="Disconnected", | |
foreground="red", | |
font=tkfont.Font(size=10, weight="bold") | |
) | |
self.connection_status.pack(side="right") | |
# Info Frame | |
info_frame = ttk.Frame(self.master, padding="10") | |
info_frame.grid(row=1, column=0, sticky="nsew") | |
# Info Display | |
self.info_display = tk.Text( | |
info_frame, | |
wrap=tk.WORD, | |
width=40, | |
height=10, | |
font=tkfont.Font(family="Consolas", size=9) | |
) | |
self.info_display.pack(fill=tk.BOTH, expand=True) | |
# Control Frame | |
control_frame = ttk.Frame(self.master, padding="10") | |
control_frame.grid(row=2, column=0, sticky="ew") | |
# Close Button | |
self.close_button = ttk.Button( | |
control_frame, | |
text="Close", | |
command=self.close_application | |
) | |
self.close_button.pack(side="bottom") | |
def start_bridge(self): | |
"""Start the ForeFlight-Shirley bridge in a separate thread""" | |
def run_bridge(): | |
try: | |
self.event_loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(self.event_loop) | |
self.bridge = ForeFlightToShirleyBridge() | |
# Update status | |
if self.master and self.status_label: | |
self.master.after(0, lambda: self.status_label.config( | |
text="Status: Bridge started" | |
)) | |
# Start monitoring UDP data | |
if self.master: | |
self.master.after(100, self.check_udp_data) | |
self.bridge_task = self.event_loop.create_task(self.bridge.run()) | |
self.event_loop.run_forever() | |
except Exception as e: | |
if self.master: | |
self.master.after(0, lambda: self.show_error(str(e))) | |
self.bridge_thread = threading.Thread(target=run_bridge, daemon=True) | |
self.bridge_thread.start() | |
def check_udp_data(self): | |
"""Check if we're receiving UDP data and update the UI accordingly""" | |
# Make sure the app hasn't been closed | |
if not self.master or not self.master.winfo_exists(): | |
return | |
if self.bridge and self.bridge.udp_server.lastDataReceivedTime is not None: | |
# Update connection status | |
if self.connection_status: | |
self.connection_status.config( | |
text="Connected", | |
foreground="green" | |
) | |
# Get latest data | |
if self.bridge.sim_data.xgps: | |
gps = self.bridge.sim_data.xgps | |
att = self.bridge.sim_data.xatt | |
# Update info display | |
info_text = f"=== Simulator Data ===\n" | |
info_text += f"Latitude: {gps.latitude:.4f}°\n" | |
info_text += f"Longitude: {gps.longitude:.4f}°\n" | |
info_text += f"Altitude: {gps.alt_msl_meters * 3.28084:.0f} ft\n" | |
info_text += f"Speed: {gps.ground_speed_mps * 1.94384:.1f} kts\n" | |
if att: | |
info_text += f"Roll: {att.roll_deg:.1f}°\n" | |
info_text += f"Pitch: {att.pitch_deg:.1f}°\n" | |
info_text += f"Heading: {att.heading_deg:.1f}°\n" | |
info_text += "\n=== Bridge Status ===\n" | |
info_text += f"UDP: Port 49002\n" | |
info_text += f"WebSocket: ws://0.0.0.0:2992/api/v1" | |
if self.info_display: | |
self.info_display.delete(1.0, tk.END) | |
self.info_display.insert(tk.END, info_text) | |
else: | |
if self.connection_status: | |
self.connection_status.config( | |
text="Disconnected", | |
foreground="red" | |
) | |
# Schedule next check if the app is still running | |
if self.master and self.master.winfo_exists(): | |
self.master.after(100, self.check_udp_data) | |
def show_error(self, error_msg): | |
"""Display error message in the info display""" | |
if self.info_display: | |
self.info_display.delete(1.0, tk.END) | |
self.info_display.insert(tk.END, f"Error: {error_msg}") | |
if self.status_label: | |
self.status_label.config(text="Status: Error") | |
if self.connection_status: | |
self.connection_status.config(text="Error", foreground="red") | |
def close_application(self): | |
"""Clean up and close the application""" | |
# First destroy the UI to prevent more UI updates | |
if self.master: | |
# Create local reference to avoid using self.master after destroy | |
master_copy = self.master | |
self.master = None # Remove reference to prevent future access | |
master_copy.destroy() | |
async def shutdown_async(): | |
try: | |
# Close all websocket connections first | |
if self.bridge and self.bridge.ws_server: | |
for ws in list(self.bridge.ws_server.connections): | |
try: | |
await ws.close() | |
except (websockets.exceptions.ConnectionClosed, Exception): | |
pass | |
self.bridge.ws_server.connections.clear() | |
# Then cancel the main bridge task | |
if self.bridge_task: | |
self.bridge_task.cancel() | |
try: | |
await self.bridge_task | |
except asyncio.CancelledError: | |
pass | |
# Close socket if it exists | |
if self.bridge and self.bridge.udp_server and self.bridge.udp_server.socket: | |
self.bridge.udp_server.socket.close() | |
except Exception as err: | |
print(f"Error during async shutdown: {err}") | |
if self.event_loop and self.event_loop.is_running(): | |
try: | |
# Schedule the shutdown coroutine and stop the loop | |
asyncio.run_coroutine_threadsafe(shutdown_async(), self.event_loop) | |
# Give it a moment to execute | |
time.sleep(0.2) | |
# Then stop the loop | |
self.event_loop.stop() | |
except Exception as e: | |
print(f"Error during shutdown: {e}") | |
################################################################################ | |
# Entry Point | |
################################################################################ | |
def run_cli(): | |
"""Run the bridge in command-line mode (no GUI)""" | |
bridge = ForeFlightToShirleyBridge() | |
print("Starting ForeFlight-Shirley Bridge in CLI mode...") | |
print("Press Ctrl+C to exit") | |
try: | |
asyncio.run(bridge.run()) | |
except KeyboardInterrupt: | |
print("\nBridge shutting down.") | |
def run_gui(): | |
"""Run the bridge with GUI""" | |
root = tk.Tk() | |
BridgeGUI(root) | |
root.mainloop() | |
if __name__ == "__main__": | |
# By default, run with GUI | |
# Use '--cli' flag to run without GUI | |
if len(sys.argv) > 1 and sys.argv[1] == '--cli': | |
run_cli() | |
else: | |
run_gui() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment