Skip to content

Instantly share code, notes, and snippets.

@jlgabriel
Created April 1, 2025 01:17
Show Gist options
  • Save jlgabriel/c5653ba39c33544763962918591d73a1 to your computer and use it in GitHub Desktop.
Save jlgabriel/c5653ba39c33544763962918591d73a1 to your computer and use it in GitHub Desktop.
Aerofly FS4 ForeFlight Protocol to Shirley Bridge with GUI
#!/usr/bin/env python3
##################################################################################
# ForeFlight Protocol to Shirley Bridge with GUI
# Developed to connect Aerofly FS4 Flight Simulator from IPACS (TM) to Fly Shirley
# Reference: https://support.foreflight.com/hc/en-us/articles/204115005
##################################################################################
import asyncio
import json
import math
import socket
import sys
import threading
import time
import tkinter as tk
from dataclasses import dataclass
from tkinter import ttk, font as tkfont
from typing import Optional, Set, Union

# Import websockets with correct types
import websockets
import websockets.exceptions
################################################################################
# Constants & Utilities
################################################################################
# Feet per metre: multiply an altitude in metres by this factor to get feet.
METERS_TO_FEET = 3.28084
# Knots per metre-per-second: multiply a speed in m/s by this factor to get knots.
MPS_TO_KTS = 1.94384
################################################################################
# Data Classes for Parsed ForeFlight Messages
################################################################################
@dataclass
class XGPSData:
    """
    One decoded ForeFlight XGPS (position) sentence.

    Wire layout of the sentence this record mirrors:
    XGPS<sim_name>,<longitude>,<latitude>,<altitude_msl_meters>,<track_true_north>,<groundspeed_m/s>
    """

    # Simulator name that directly follows the "XGPS" tag.
    sim_name: str
    # Horizontal position as sent by the simulator.
    longitude: float
    latitude: float
    # Altitude above mean sea level, in metres.
    alt_msl_meters: float
    # Ground track relative to true north.
    track_deg: float
    # Ground speed in metres per second.
    ground_speed_mps: float
@dataclass
class XATTData:
    """
    One decoded ForeFlight XATT (attitude) sentence.

    Wire layout of the sentence this record mirrors:
    XATT<sim_name>,<true_heading>,<pitch_degrees>,<roll_degrees>
    """

    # Simulator name that directly follows the "XATT" tag.
    sim_name: str
    # True heading, in degrees.
    heading_deg: float
    # Pitch angle, in degrees.
    pitch_deg: float
    # Roll angle, in degrees.
    roll_deg: float
@dataclass
class UnknownData:
    """
    Fallback record for a line that is not a recognised sentence type
    or that could not be parsed.
    """

    # The offending line, kept verbatim for logging.
    raw_line: str
################################################################################
# Main SimData Model (for Shirley)
################################################################################
class SimData:
    """
    Shared state between the UDP receiver and the WebSocket broadcaster.

    Keeps the most recent XGPSData and XATTData records and renders them,
    on request, as the position/attitude dictionary that Shirley consumes.
    """

    def __init__(self):
        # Guards every read and write of the two records below.
        self._lock = asyncio.Lock()
        # Latest records; None until the first sentence of each kind arrives.
        self.xgps: Optional[XGPSData] = None
        self.xatt: Optional[XATTData] = None

    async def update_from_xgps(self, xgps: XGPSData):
        """Store *xgps* as the current position record."""
        async with self._lock:
            self.xgps = xgps

    async def update_from_xatt(self, xatt: XATTData):
        """Store *xatt* as the current attitude record."""
        async with self._lock:
            self.xatt = xatt

    async def get_data_snapshot(self):
        """
        Return the current state as a ``{"position": ..., "attitude": ...}`` dict.

        Unit conversion happens here: metres become feet and m/s become
        knots. Every field is 0.0 until the corresponding record has been
        received. Heading comes from the XATT record when there is one,
        otherwise from the XGPS ground track; both are wrapped into [0, 360).
        """
        async with self._lock:
            gps, att = self.xgps, self.xatt

            # Start from the "no data yet" values and overlay what we have.
            position = {
                "latitudeDeg": 0.0,
                "longitudeDeg": 0.0,
                "mslAltitudeFt": 0.0,
                "gpsGroundSpeedKts": 0.0,
            }
            attitude = {
                "rollAngleDegRight": 0.0,
                "pitchAngleDegUp": 0.0,
                "trueHeadingDeg": 0.0,
            }

            if gps:
                position["latitudeDeg"] = gps.latitude
                position["longitudeDeg"] = gps.longitude
                position["mslAltitudeFt"] = gps.alt_msl_meters * METERS_TO_FEET
                position["gpsGroundSpeedKts"] = gps.ground_speed_mps * MPS_TO_KTS
                # Track stands in for heading until an XATT record shows up.
                attitude["trueHeadingDeg"] = gps.track_deg % 360.0

            if att:
                # A real heading always wins over the ground track.
                attitude["trueHeadingDeg"] = att.heading_deg % 360.0
                attitude["pitchAngleDegUp"] = att.pitch_deg
                attitude["rollAngleDegRight"] = att.roll_deg

            return {"position": position, "attitude": attitude}
################################################################################
# ForeFlight Data Parser
################################################################################
class ForeFlightParser:
    """
    Parses strings in ForeFlight's XGPS / XATT formats,
    returning typed objects: XGPSData, XATTData, or UnknownData.

    The input comes straight off a UDP socket, so nothing about it is
    trusted: anything that is not a well-formed sentence with finite
    numeric fields is returned as UnknownData instead of raising.
    """

    @staticmethod
    def parse_line(line: str) -> Union[XGPSData, XATTData, UnknownData]:
        """
        Identify the sentence type (XGPS, XATT) and parse accordingly.

        Leading and trailing whitespace is ignored. Lines with any other
        prefix are returned as UnknownData carrying the stripped line.
        """
        line = line.strip()
        if line.startswith("XGPS"):
            return ForeFlightParser._parse_xgps(line)
        if line.startswith("XATT"):
            return ForeFlightParser._parse_xatt(line)
        return UnknownData(raw_line=line)

    @staticmethod
    def _split_fields(line: str, numeric_count: int):
        """
        Split a sentence into its simulator name and numeric fields.

        The 4-character tag is dropped, the first comma-separated field is
        the simulator name and the next *numeric_count* fields are converted
        to float. Any fields beyond those are ignored.

        Returns a ``(sim_name, values)`` pair where *values* is a list of
        exactly *numeric_count* floats.

        Raises ValueError if a field is missing, is not a number, or is
        NaN / infinite.
        """
        fields = line[4:].split(",")
        if len(fields) <= numeric_count:
            raise ValueError("too few fields")
        values = [float(text) for text in fields[1:numeric_count + 1]]
        # float() happily accepts "nan" and "inf". Those would later be
        # serialised by json.dumps as NaN / Infinity, which is not valid
        # JSON and breaks strict parsers on the receiving side.
        if not all(math.isfinite(value) for value in values):
            raise ValueError("non-finite numeric field")
        return fields[0].strip(), values

    @staticmethod
    def _parse_xgps(line: str) -> Union[XGPSData, UnknownData]:
        """
        Example XGPS line:
        XGPSMySim,-80.11,34.55,1200.1,359.05,55.6
        => XGPS<sim_name>,<longitude>,<latitude>,<alt_msl_meters>,<track_deg_true>,<groundspeed_m/s>

        Returns UnknownData(raw_line=line) when the line is malformed.
        """
        try:
            sim_name, values = ForeFlightParser._split_fields(line, 5)
        except ValueError:
            return UnknownData(raw_line=line)
        longitude, latitude, alt_msl_meters, track_deg, ground_speed_mps = values
        return XGPSData(
            sim_name=sim_name,
            longitude=longitude,
            latitude=latitude,
            alt_msl_meters=alt_msl_meters,
            track_deg=track_deg,
            ground_speed_mps=ground_speed_mps,
        )

    @staticmethod
    def _parse_xatt(line: str) -> Union[XATTData, UnknownData]:
        """
        Example XATT line:
        XATTMySim,180.2,0.1,0.2
        => XATT<sim_name>,<true_heading_deg>,<pitch_deg>,<roll_deg>

        Returns UnknownData(raw_line=line) when the line is malformed.
        """
        try:
            sim_name, values = ForeFlightParser._split_fields(line, 3)
        except ValueError:
            return UnknownData(raw_line=line)
        heading_deg, pitch_deg, roll_deg = values
        return XATTData(
            sim_name=sim_name,
            heading_deg=heading_deg,
            pitch_deg=pitch_deg,
            roll_deg=roll_deg,
        )
################################################################################
# ForeFlight UDP Server
################################################################################
class ForeFlightUDPServer:
    """
    Listens for ForeFlight-compatible data on a UDP port (49002 by default).
    Parses each datagram and updates the shared SimData.
    """

    def __init__(self, sim_data: SimData, parser: ForeFlightParser, port: int = 49002,
                 recv_timeout: float = 0.5):
        """
        sim_data: shared state that receives every parsed XGPS / XATT record.
        parser: object whose parse_line(str) turns a datagram into a record.
        port: UDP port to bind on all interfaces.
        recv_timeout: seconds a single blocking receive may wait before it
            gives up and is retried; bounds how long the worker thread can
            outlive a cancelled run().
        """
        self.sim_data = sim_data
        self.parser = parser
        self.port = port
        self.recv_timeout = recv_timeout
        # The bound socket while run() is active; other code closes it to stop us.
        self.socket = None
        # time.time() of the most recent datagram, or None before the first one.
        self.lastDataReceivedTime = None

    async def run(self):
        """
        Main loop: bind to the UDP port, receive data, parse, update sim_data.

        The blocking recvfrom() runs in a worker thread so the event loop
        stays responsive. The loop ends when the socket is closed from
        elsewhere. Cancelling the task propagates asyncio.CancelledError to
        the caller. In both cases the socket is closed before returning.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket = sock
        try:
            # Everything after socket creation sits inside the try so a
            # failing setsockopt()/bind() (e.g. port in use) cannot leak it.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # A timeout instead of an unbounded blocking receive: a thread
            # parked in recvfrom() cannot be cancelled, so without it the
            # worker thread (and interpreter shutdown, which joins executor
            # threads) would wait until the next datagram happens to arrive.
            sock.settimeout(self.recv_timeout)
            # Bind to all interfaces on the specified port
            sock.bind(('', self.port))
            print(f"[ForeFlightUDPServer] Listening on UDP port {self.port}...")

            while True:
                try:
                    data, _ = await asyncio.to_thread(sock.recvfrom, 1024)
                except socket.timeout:
                    # Nothing arrived within recv_timeout; just try again.
                    continue
                except OSError:
                    # Socket was closed
                    break

                try:
                    await self._handle_datagram(data)
                except Exception as e:
                    print(f"[ForeFlightUDPServer] Error receiving data: {e}")
                    # Continue despite errors
        finally:
            # Ensure socket is closed when the task ends
            try:
                sock.close()
                print("[ForeFlightUDPServer] Socket closed")
            except Exception as ex:
                print(f"[ForeFlightUDPServer] Error closing socket: {ex}")

    async def _handle_datagram(self, data: bytes):
        """Decode one datagram, parse it and push the result into sim_data."""
        line = data.decode('utf-8', errors='ignore').strip()
        parsed_obj = self.parser.parse_line(line)

        # Announce the first datagram, and the first one after a gap of more
        # than five seconds; the timestamp itself is refreshed every time.
        now = time.time()
        if self.lastDataReceivedTime is None or now - self.lastDataReceivedTime > 5.0:
            print(f"[ForeFlightUDPServer] Starting to receive data (e.g.): {parsed_obj}")
        self.lastDataReceivedTime = now

        if isinstance(parsed_obj, XGPSData):
            await self.sim_data.update_from_xgps(parsed_obj)
        elif isinstance(parsed_obj, XATTData):
            await self.sim_data.update_from_xatt(parsed_obj)
################################################################################
# Shirley WebSocket Server
################################################################################
class ShirleyWebSocketServer:
    """
    Hosts a WebSocket server on a specified port & path.
    - When clients connect, we hold onto their connections.
    - We periodically broadcast the latest `SimData` to every connected client.
    """

    def __init__(self, sim_data: "SimData", host="0.0.0.0", port=2992, path="/api/v1"):
        """
        sim_data: object providing ``await get_data_snapshot()``.
        host, port: address the WebSocket server listens on.
        path: path clients are expected to connect to; a mismatch is only
            logged as a warning, the connection is still served.
        """
        self.sim_data = sim_data
        self.host = host
        self.port = port
        self.path = path
        self.send_interval = 0.25  # seconds (4 Hz)
        # Currently connected clients; mutated by handler() and broadcast_loop().
        self.connections: Set = set()
        self.server = None

    async def handler(self, websocket):
        """
        Called for every new client connection. Registers the client for
        broadcasts, logs anything it sends, and unregisters it when the
        connection ends.
        """
        client_info = getattr(websocket, 'remote_address', "Unknown")
        self.connections.add(websocket)
        print(f"[ShirleyWebSocketServer] WebSocket Client connected: {client_info}")

        # Check path if available
        client_path = getattr(websocket, 'path', None)
        if client_path and not client_path.endswith(self.path):
            print(f"[ShirleyWebSocketServer] Warning: WS Connected to {client_path}, not {self.path}")

        try:
            # Keep reading messages in case the client sends anything
            async for setSimData in websocket:
                print(f"[ShirleyWebSocketServer] WebSocket Received SetSimData message: {setSimData}")
        except websockets.exceptions.ConnectionClosed:
            pass
        except Exception as e:
            print(f"[ShirleyWebSocketServer] Error in handler: {e}")
        finally:
            # discard() is a no-op if broadcast_loop() already dropped it.
            self.connections.discard(websocket)
            print(f"[ShirleyWebSocketServer] WebSocket Client disconnected: {client_info}")

    async def broadcast_loop(self):
        """
        Periodically broadcast the latest SimData to all connected clients.

        Runs until cancelled (asyncio.CancelledError is re-raised). Clients
        whose send fails are dropped from the connection set.
        """
        try:
            while True:
                data = await self.sim_data.get_data_snapshot()
                message = json.dumps(data)

                # Iterate over a snapshot: handler() adds and removes entries
                # while we are suspended inside send(), and iterating the
                # live set would then fail with "Set changed size during
                # iteration" and take the whole broadcast loop down.
                for ws in list(self.connections):
                    try:
                        await ws.send(message)
                    except websockets.exceptions.ConnectionClosed:
                        self.connections.discard(ws)
                    except Exception as e:
                        print(f"[ShirleyWebSocketServer] Send error: {e}")
                        self.connections.discard(ws)

                await asyncio.sleep(self.send_interval)
        except asyncio.CancelledError:
            # Ensure we handle task cancellation cleanly
            print("[ShirleyWebSocketServer] Broadcast loop stopped")
            raise
        except Exception as e:
            print(f"[ShirleyWebSocketServer] Broadcast loop error: {e}")
            raise

    async def run(self):
        """
        Start the WebSocket server and the broadcast loop and run until the
        server is closed, the broadcast loop fails, or this task is cancelled.

        Whatever the reason for leaving, the broadcast task is stopped, all
        client connections are closed and the listening server is shut down
        before the original outcome (return, exception or cancellation) is
        passed on to the caller.
        """
        self.server = await websockets.serve(self.handler, self.host, self.port)
        print(f"[ShirleyWebSocketServer] Serving at ws://{self.host}:{self.port}{self.path}")

        # Create broadcast task so we can properly handle cancellation
        broadcast_task = asyncio.create_task(self.broadcast_loop())
        try:
            await asyncio.gather(self.server.wait_closed(), broadcast_task)
        except asyncio.CancelledError:
            print("[ShirleyWebSocketServer] Server shutting down")
            # Raise so that our parent gatherer knows we're cancelled
            raise
        finally:
            # Also reached when the broadcast loop died with an error, so the
            # listening socket and the clients are never left dangling.
            await self._shutdown(broadcast_task)

    async def _shutdown(self, broadcast_task):
        """Stop broadcasting, close every client and close the server."""
        if not broadcast_task.done():
            broadcast_task.cancel()
            try:
                await broadcast_task
            except asyncio.CancelledError:
                pass

        clients = list(self.connections)
        if clients:
            # return_exceptions: one client failing to close must not keep
            # the others (or the server) open.
            await asyncio.gather(*(ws.close() for ws in clients), return_exceptions=True)
        self.connections.clear()

        if self.server:
            self.server.close()
            await self.server.wait_closed()
################################################################################
# Bridge Core
################################################################################
class ForeFlightToShirleyBridge:
    """
    Top-level wiring of the bridge.

    Owns one SimData instance and hands it to both ends:
    - a ForeFlightUDPServer that fills it from the simulator (UDP 49002 by default)
    - a ShirleyWebSocketServer that publishes it (port 2992, path /api/v1 by default)
    run() drives both servers concurrently on the current event loop.
    """

    def __init__(self, udp_port=49002, ws_host="0.0.0.0", ws_port=2992, ws_path="/api/v1"):
        shared_state = SimData()
        line_parser = ForeFlightParser()

        self.sim_data = shared_state
        self.parser = line_parser
        # Inbound side: simulator -> shared state.
        self.udp_server = ForeFlightUDPServer(shared_state, line_parser, port=udp_port)
        # Outbound side: shared state -> Shirley clients.
        self.ws_server = ShirleyWebSocketServer(
            shared_state,
            host=ws_host,
            port=ws_port,
            path=ws_path,
        )

    async def run(self):
        """Run the UDP receiver and the WebSocket server until both finish."""
        servers = (self.udp_server, self.ws_server)
        await asyncio.gather(*(server.run() for server in servers))
################################################################################
# GUI Implementation
################################################################################
class BridgeGUI:
    """
    Tkinter front end for the ForeFlight-to-Shirley bridge.

    The bridge runs on its own asyncio event loop in a background thread.
    Tk widgets are only ever touched from the Tk thread: the bridge thread
    reports its state through plain attributes, and check_udp_data(), which
    runs periodically on the Tk thread, turns that state into widget updates.
    """

    # Milliseconds between two polls of the bridge state.
    POLL_INTERVAL_MS = 100
    # Seconds without any UDP datagram after which we show "Disconnected".
    DATA_TIMEOUT_S = 5.0
    # Upper bound, in seconds, for each step of the shutdown sequence.
    SHUTDOWN_TIMEOUT_S = 2.0

    def __init__(self, master):
        """Build the window inside *master* (the Tk root) and start the bridge."""
        self.master = master
        self.master.title("ForeFlight-Shirley Bridge")
        self.master.geometry("400x300")

        # Configure grid
        self.master.grid_columnconfigure(0, weight=1)
        self.master.grid_rowconfigure(1, weight=1)

        # Initialize UI elements that will be created in setup_ui
        self.status_label = None
        self.connection_status = None
        self.info_display = None
        self.close_button = None

        # Initialize bridge components
        self.bridge = None
        self.bridge_task = None
        self.event_loop = None
        self.bridge_thread = None

        # Written by the bridge thread, read by the Tk thread.
        self._bridge_started = False
        self._bridge_error = None
        # Tk-thread bookkeeping so widgets are only touched on change.
        self._started_shown = False
        self._last_info_text = None

        # Set up UI components
        self.setup_ui()

        # Set up window close protocol
        self.master.protocol("WM_DELETE_WINDOW", self.close_application)

        # Start the bridge
        self.start_bridge()

    def setup_ui(self):
        """Set up the user interface components"""
        # Status Frame
        status_frame = ttk.Frame(self.master, padding="10")
        status_frame.grid(row=0, column=0, sticky="ew")

        # Status Label
        self.status_label = ttk.Label(
            status_frame,
            text="Status: Starting...",
            font=tkfont.Font(size=10)
        )
        self.status_label.pack(side="left")

        # Connection status
        self.connection_status = ttk.Label(
            status_frame,
            text="Disconnected",
            foreground="red",
            font=tkfont.Font(size=10, weight="bold")
        )
        self.connection_status.pack(side="right")

        # Info Frame
        info_frame = ttk.Frame(self.master, padding="10")
        info_frame.grid(row=1, column=0, sticky="nsew")

        # Info Display
        self.info_display = tk.Text(
            info_frame,
            wrap=tk.WORD,
            width=40,
            height=10,
            font=tkfont.Font(family="Consolas", size=9)
        )
        self.info_display.pack(fill=tk.BOTH, expand=True)

        # Control Frame
        control_frame = ttk.Frame(self.master, padding="10")
        control_frame.grid(row=2, column=0, sticky="ew")

        # Close Button
        self.close_button = ttk.Button(
            control_frame,
            text="Close",
            command=self.close_application
        )
        self.close_button.pack(side="bottom")

    def start_bridge(self):
        """Start the ForeFlight-Shirley bridge in a separate thread"""

        def run_bridge():
            loop = None
            try:
                loop = asyncio.new_event_loop()
                self.event_loop = loop
                asyncio.set_event_loop(loop)
                self.bridge = ForeFlightToShirleyBridge()
                self.bridge_task = loop.create_task(self.bridge.run())
                # Failures inside the task (e.g. a port already in use) do
                # not escape run_forever(); the callback is how we see them.
                self.bridge_task.add_done_callback(self._on_bridge_task_done)
                self._bridge_started = True
                loop.run_forever()
            except Exception as e:
                # Convert to text right here: the name bound by "except ... as"
                # is gone once the handler finishes, so it must not be
                # captured for later use. No Tk calls from this thread.
                self._bridge_error = str(e) or repr(e)
            finally:
                if loop is not None and not loop.is_closed():
                    loop.close()

        self.bridge_thread = threading.Thread(target=run_bridge, daemon=True)
        self.bridge_thread.start()

        # Scheduled from the Tk thread; the callback runs once mainloop() is up.
        self.master.after(self.POLL_INTERVAL_MS, self.check_udp_data)

    def _on_bridge_task_done(self, task):
        """Record the reason if the bridge task ended with an exception."""
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            self._bridge_error = str(error) or repr(error)

    def check_udp_data(self):
        """Check if we're receiving UDP data and update the UI accordingly"""
        master = self.master
        # Make sure the app hasn't been closed
        if not master or not master.winfo_exists():
            return

        if self._bridge_error is not None:
            # Leave the error on screen; there is nothing left to poll.
            self.show_error(self._bridge_error)
            return

        if self._bridge_started and not self._started_shown:
            self._started_shown = True
            if self.status_label:
                self.status_label.config(text="Status: Bridge started")

        bridge = self.bridge
        last_rx = bridge.udp_server.lastDataReceivedTime if bridge else None
        # "Connected" means data arrived recently, not merely at some point.
        receiving = last_rx is not None and time.time() - last_rx <= self.DATA_TIMEOUT_S

        if self.connection_status:
            if receiving:
                self.connection_status.config(text="Connected", foreground="green")
            else:
                self.connection_status.config(text="Disconnected", foreground="red")

        if receiving:
            self._refresh_info(bridge)

        # Schedule next check
        master.after(self.POLL_INTERVAL_MS, self.check_udp_data)

    def _refresh_info(self, bridge):
        """Render the latest simulator data and bridge endpoints into the text box."""
        gps = bridge.sim_data.xgps
        if gps is None:
            return
        att = bridge.sim_data.xatt

        lines = [
            "=== Simulator Data ===",
            f"Latitude: {gps.latitude:.4f}°",
            f"Longitude: {gps.longitude:.4f}°",
            f"Altitude: {gps.alt_msl_meters * METERS_TO_FEET:.0f} ft",
            f"Speed: {gps.ground_speed_mps * MPS_TO_KTS:.1f} kts",
        ]
        if att is not None:
            lines += [
                f"Roll: {att.roll_deg:.1f}°",
                f"Pitch: {att.pitch_deg:.1f}°",
                f"Heading: {att.heading_deg:.1f}°",
            ]
        ws = bridge.ws_server
        lines += [
            "",
            "=== Bridge Status ===",
            f"UDP: Port {bridge.udp_server.port}",
            f"WebSocket: ws://{ws.host}:{ws.port}{ws.path}",
        ]
        info_text = "\n".join(lines)

        # Rewriting the widget on every poll makes it flicker and resets any
        # selection, so only touch it when the text actually changed.
        if self.info_display and info_text != self._last_info_text:
            self._last_info_text = info_text
            self.info_display.delete("1.0", tk.END)
            self.info_display.insert(tk.END, info_text)

    def show_error(self, error_msg):
        """Display error message in the info display"""
        if self.info_display:
            self.info_display.delete("1.0", tk.END)
            self.info_display.insert(tk.END, f"Error: {error_msg}")
        if self.status_label:
            self.status_label.config(text="Status: Error")
        if self.connection_status:
            self.connection_status.config(text="Error", foreground="red")

    def close_application(self):
        """Clean up and close the application"""
        # First destroy the UI to prevent more UI updates
        if self.master:
            # Create local reference to avoid using self.master after destroy
            master_copy = self.master
            self.master = None  # Remove reference to prevent future access
            master_copy.destroy()

        loop = self.event_loop
        if loop is None or not loop.is_running():
            return

        async def shutdown_async():
            bridge = self.bridge
            try:
                # Close all websocket connections first
                if bridge and bridge.ws_server:
                    for ws in list(bridge.ws_server.connections):
                        try:
                            await ws.close()
                        except Exception:
                            # Best effort: the peer may already be gone.
                            pass
                    bridge.ws_server.connections.clear()

                # Then cancel the main bridge task
                if self.bridge_task:
                    self.bridge_task.cancel()
                    try:
                        await self.bridge_task
                    except asyncio.CancelledError:
                        pass
            except Exception as err:
                print(f"Error during async shutdown: {err}")
            finally:
                # Close socket if it exists, even if the steps above failed
                if bridge and bridge.udp_server and bridge.udp_server.socket:
                    bridge.udp_server.socket.close()

        try:
            # Run the cleanup on the bridge's loop and wait for it to finish
            # (bounded), rather than sleeping for a fixed guess.
            pending = asyncio.run_coroutine_threadsafe(shutdown_async(), loop)
            pending.result(timeout=self.SHUTDOWN_TIMEOUT_S)
        except Exception as e:
            print(f"Error during shutdown: {e}")
        finally:
            try:
                # loop.stop() is not thread-safe; hand it to the loop thread.
                loop.call_soon_threadsafe(loop.stop)
            except RuntimeError:
                # Loop already closed; nothing left to stop.
                pass

        if self.bridge_thread is not None:
            self.bridge_thread.join(timeout=self.SHUTDOWN_TIMEOUT_S)
################################################################################
# Entry Point
################################################################################
def run_cli():
    """
    Headless entry point: run the bridge on the console until interrupted.

    Ctrl+C ends the run with a short farewell message instead of a traceback.
    """
    cli_bridge = ForeFlightToShirleyBridge()
    for banner_line in (
        "Starting ForeFlight-Shirley Bridge in CLI mode...",
        "Press Ctrl+C to exit",
    ):
        print(banner_line)
    try:
        asyncio.run(cli_bridge.run())
    except KeyboardInterrupt:
        print("\nBridge shutting down.")
def run_gui():
    """
    Graphical entry point: open the bridge window and block in Tk's main
    loop until the window is closed.
    """
    window = tk.Tk()
    # The GUI object wires itself into the window (callbacks, close handler),
    # which keeps it alive; no separate reference is needed.
    BridgeGUI(window)
    window.mainloop()
if __name__ == "__main__":
    # The GUI is the default front end; passing '--cli' as the first
    # command-line argument selects the console mode instead.
    cli_requested = sys.argv[1:2] == ['--cli']
    entry_point = run_cli if cli_requested else run_gui
    entry_point()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment