Last active
April 6, 2025 01:43
-
-
Save jlgabriel/fff943a5eb056dc4af664f8e27114996 to your computer and use it in GitHub Desktop.
Aerofly FS4 basic integration with SayIntentions.AI via UDP Foreflight streaming data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
################################################################################## | |
# Aerofly FS4 to SayIntentions.AI Adapter | |
# Developed to connect Aerofly FS4 Flight Simulator from IPACS (TM) to SayIntentions.AI | |
# Based on ForeFlight-Shirley-Bridge.py | |
################################################################################## | |
import asyncio | |
import socket | |
import json | |
import time | |
import tkinter as tk | |
from tkinter import ttk, font as tkfont | |
import threading | |
from dataclasses import dataclass | |
import sys | |
import os | |
from typing import Optional, Union, Dict, Any | |
from pathlib import Path | |
################################################################################ | |
# Constants & Utilities | |
################################################################################ | |
METERS_TO_FEET = 3.28084 | |
MPS_TO_KTS = 1.94384 | |
MPS_TO_FPM = 196.85 # Meters per second to feet per minute | |
# Define paths for SimAPI files | |
def get_local_appdata_path(): | |
"""Get the local appdata directory path""" | |
return os.path.join(os.environ.get('LOCALAPPDATA', ''), 'SayIntentionsAI') | |
def ensure_simapi_dir(): | |
"""Ensure the SayIntentionsAI directory exists""" | |
path = get_local_appdata_path() | |
os.makedirs(path, exist_ok=True) | |
return path | |
# SimAPI file paths | |
def get_simapi_input_path(): | |
"""Get the path for the simAPI_input.json file""" | |
return os.path.join(get_local_appdata_path(), 'simAPI_input.json') | |
def get_simapi_output_path(): | |
"""Get the path for the simAPI_output.jsonl file""" | |
return os.path.join(get_local_appdata_path(), 'simAPI_output.jsonl') | |
################################################################################ | |
# Data Classes for Parsed ForeFlight Messages | |
################################################################################ | |
@dataclass | |
class XGPSData: | |
""" | |
Represents ForeFlight XGPS data. | |
e.g. XGPS<sim_name>,<longitude>,<latitude>,<altitude_msl_meters>,<track_true_north>,<groundspeed_m/s> | |
""" | |
sim_name: str | |
longitude: float | |
latitude: float | |
alt_msl_meters: float | |
track_deg: float | |
ground_speed_mps: float | |
@dataclass | |
class XATTData: | |
""" | |
Represents ForeFlight XATT data. | |
e.g. XATT<sim_name>,<true_heading>,<pitch_degrees>,<roll_degrees> | |
""" | |
sim_name: str | |
heading_deg: float | |
pitch_deg: float | |
roll_deg: float | |
@dataclass | |
class UnknownData: | |
""" | |
Returned if we fail to parse or data type is not recognized. | |
""" | |
raw_line: str | |
################################################################################ | |
# Main SimData Model (for SayIntentions.AI) | |
################################################################################ | |
class SimData: | |
""" | |
Holds references to the latest XGPSData and XATTData from the simulator | |
and transforms them into the format SayIntentions.AI expects. | |
""" | |
def __init__(self): | |
# Store references to the most recent XGPSData and XATTData | |
# Initialize them to defaults | |
self.xgps: Optional[XGPSData] = None | |
self.xatt: Optional[XATTData] = None | |
# For concurrency control | |
self._lock = asyncio.Lock() | |
async def update_from_xgps(self, xgps: XGPSData): | |
"""Replace the reference to the latest XGPSData.""" | |
async with self._lock: | |
self.xgps = xgps | |
async def update_from_xatt(self, xatt: XATTData): | |
"""Replace the reference to the latest XATTData.""" | |
async with self._lock: | |
self.xatt = xatt | |
async def get_data_snapshot(self) -> Dict[str, Any]: | |
""" | |
Produce a dictionary in the format expected by SayIntentions.AI simAPI. | |
We do the unit conversions here, using the references to XGPSData and XATTData. | |
""" | |
async with self._lock: | |
# Local copies for clarity | |
xgps = self.xgps | |
xatt = self.xatt | |
# Set default values for the variables dictionary | |
variables = { | |
# Basic SimVars with default values | |
"PLANE LATITUDE": 0.0, | |
"PLANE LONGITUDE": 0.0, | |
"PLANE ALTITUDE": 0.0, # In feet | |
"GROUND VELOCITY": 0.0, # In knots | |
"PLANE HEADING DEGREES TRUE": 0.0, | |
"PLANE PITCH DEGREES": 0.0, | |
"PLANE BANK DEGREES": 0.0, | |
"VERTICAL SPEED": 0.0, # In feet per minute | |
"AIRSPEED INDICATED": 0.0, # In knots (estimated from ground speed) | |
# Radio stack (placeholder values) | |
"COM ACTIVE FREQUENCY:1": 118.0, # Example: 118.0 MHz | |
"COM STANDBY FREQUENCY:1": 118.5, # Example: 118.5 MHz | |
"NAV ACTIVE FREQUENCY:1": 110.0, # Example: 110.0 MHz | |
"NAV STANDBY FREQUENCY:1": 110.5, # Example: 110.5 MHz | |
"TRANSPONDER CODE:1": 1200, # Example: Squawk 1200 | |
# Other required variables with placeholder values | |
"ATC ID": "N12345", # Aircraft registration/callsign | |
"PLANE ALT ABOVE GROUND": 0.0, # AGL in feet | |
"SIM ON GROUND": 1 # 1 for on ground, 0 for in air | |
} | |
# Update values if we have XGPSData | |
if xgps: | |
variables.update({ | |
"PLANE LATITUDE": xgps.latitude, | |
"PLANE LONGITUDE": xgps.longitude, | |
"PLANE ALTITUDE": xgps.alt_msl_meters * METERS_TO_FEET, | |
"GROUND VELOCITY": xgps.ground_speed_mps * MPS_TO_KTS, | |
"PLANE HEADING DEGREES TRUE": xgps.track_deg % 360.0, # If no XATT is available | |
"AIRSPEED INDICATED": xgps.ground_speed_mps * MPS_TO_KTS, # Estimate IAS from ground speed | |
# Determine if on ground based on altitude and speed (simple heuristic) | |
"SIM ON GROUND": 1 if (xgps.alt_msl_meters < 10 and xgps.ground_speed_mps < 1) else 0, | |
# Guess AGL (crude approximation) | |
"PLANE ALT ABOVE GROUND": max(0, (xgps.alt_msl_meters * METERS_TO_FEET) - 50) | |
# Assuming 50ft terrain height | |
}) | |
# Update attitude values if we have XATTData | |
if xatt: | |
variables.update({ | |
"PLANE HEADING DEGREES TRUE": xatt.heading_deg % 360.0, | |
"PLANE PITCH DEGREES": xatt.pitch_deg, | |
"PLANE BANK DEGREES": xatt.roll_deg | |
}) | |
# Try to estimate vertical speed from pitch (very crude approximation) | |
if xgps: | |
# Simple approximation: pitch angle * ground speed gives an estimate of vertical component | |
pitch_radians = xatt.pitch_deg * (3.14159 / 180.0) | |
vertical_component = xgps.ground_speed_mps * max(-1, min(1, pitch_radians)) | |
variables["VERTICAL SPEED"] = vertical_component * MPS_TO_FPM | |
# Now return in the correct nested format | |
return { | |
"sim": { | |
"variables": variables, | |
"exe": "aerofly_fs_4.exe", | |
"simapi_version": "1.0", | |
"name": "AeroflyFS4", | |
"version": "7.0.0", | |
"adapter_version": "1.0.0" | |
} | |
} | |
################################################################################ | |
# ForeFlight Data Parser | |
################################################################################ | |
class ForeFlightParser: | |
""" | |
Parses strings in ForeFlight's XGPS / XATT formats, | |
returning typed objects: XGPSData, XATTData, or UnknownData. | |
""" | |
@staticmethod | |
def parse_line(line: str): | |
""" | |
Identify the data type (XGPS, XATT) and parse accordingly. | |
""" | |
line = line.strip() | |
if line.startswith("XGPS"): | |
return ForeFlightParser._parse_xgps(line) | |
elif line.startswith("XATT"): | |
return ForeFlightParser._parse_xatt(line) | |
else: | |
return UnknownData(raw_line=line) | |
@staticmethod | |
def _parse_xgps(line: str) -> Union[XGPSData, UnknownData]: | |
""" | |
Example XGPS line: | |
XGPSMySim,-80.11,34.55,1200.1,359.05,55.6 | |
=> XGPS<sim_name>,<longitude>,<latitude>,<alt_msl_meters>,<track_deg_true>,<groundspeed_m/s> | |
""" | |
try: | |
raw = line[4:] # remove 'XGPS' | |
parts = raw.split(",") | |
sim_name = parts[0].strip() | |
longitude = float(parts[1]) | |
latitude = float(parts[2]) | |
alt_msl_meters = float(parts[3]) | |
track_deg = float(parts[4]) | |
ground_speed_mps = float(parts[5]) | |
return XGPSData( | |
sim_name=sim_name, | |
longitude=longitude, | |
latitude=latitude, | |
alt_msl_meters=alt_msl_meters, | |
track_deg=track_deg, | |
ground_speed_mps=ground_speed_mps | |
) | |
except (ValueError, IndexError): | |
return UnknownData(raw_line=line) | |
@staticmethod | |
def _parse_xatt(line: str) -> Union[XATTData, UnknownData]: | |
""" | |
Example XATT line: | |
XATTMySim,180.2,0.1,0.2 | |
=> XATT<sim_name>,<true_heading_deg>,<pitch_deg>,<roll_deg> | |
""" | |
try: | |
raw = line[4:] | |
parts = raw.split(",") | |
sim_name = parts[0].strip() | |
heading_deg = float(parts[1]) | |
pitch_deg = float(parts[2]) | |
roll_deg = float(parts[3]) | |
return XATTData( | |
sim_name=sim_name, | |
heading_deg=heading_deg, | |
pitch_deg=pitch_deg, | |
roll_deg=roll_deg | |
) | |
except (ValueError, IndexError): | |
return UnknownData(raw_line=line) | |
################################################################################ | |
# ForeFlight UDP Server | |
################################################################################ | |
class ForeFlightUDPServer: | |
""" | |
Listens for ForeFlight-compatible data on UDP port 49002. | |
Parses lines and updates the shared SimData. | |
""" | |
def __init__(self, sim_data: SimData, parser: ForeFlightParser, port: int = 49002): | |
self.sim_data = sim_data | |
self.parser = parser | |
self.port = port | |
self.socket = None | |
self.lastDataReceivedTime = None | |
async def run(self): | |
""" | |
Main loop: bind to UDP port, receive data, parse, update sim_data. | |
We'll do a blocking recv in a background thread so we don't block the event loop. | |
""" | |
# Create UDP socket | |
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
# Enable broadcast & address reuse | |
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | |
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
# Set blocking to True for our simple loop | |
self.socket.setblocking(True) | |
# Bind to all interfaces on the specified port: | |
self.socket.bind(('0.0.0.0', self.port)) | |
print(f"[ForeFlightUDPServer] Listening on UDP port {self.port}...") | |
print(f"[ForeFlightUDPServer] Ready to receive data from Aerofly FS4...") | |
try: | |
while True: | |
try: | |
# When the socket is closed, this will raise an exception | |
data, addr = await asyncio.to_thread(self.socket.recvfrom, 1024) | |
line = data.decode('utf-8', errors='ignore').strip() | |
parsed_obj = self.parser.parse_line(line) | |
# If we haven't received data in a while, print a message | |
if self.lastDataReceivedTime is None or \ | |
time.time() - self.lastDataReceivedTime > 5.0: | |
print(f"[ForeFlightUDPServer] Receiving data from {addr[0]}:{addr[1]}") | |
print(f"[ForeFlightUDPServer] Sample data: {line}") | |
print(f"[ForeFlightUDPServer] Parsed object: {parsed_obj}") | |
self.lastDataReceivedTime = time.time() | |
if isinstance(parsed_obj, XGPSData): | |
await self.sim_data.update_from_xgps(parsed_obj) | |
elif isinstance(parsed_obj, XATTData): | |
await self.sim_data.update_from_xatt(parsed_obj) | |
except (ConnectionError, OSError) as e: | |
# Socket was closed | |
print(f"[ForeFlightUDPServer] Socket error: {e}") | |
break | |
except asyncio.CancelledError: | |
# Task was cancelled | |
break | |
except Exception as e: | |
print(f"[ForeFlightUDPServer] Error receiving data: {e}") | |
# Continue despite errors | |
finally: | |
# Ensure socket is closed when the task ends | |
if self.socket: | |
try: | |
self.socket.close() | |
print("[ForeFlightUDPServer] Socket closed") | |
except Exception as ex: | |
print(f"[ForeFlightUDPServer] Error closing socket: {ex}") | |
################################################################################ | |
# SayIntentions.AI SimAPI Server | |
################################################################################ | |
class SimAPIServer: | |
""" | |
Writes SimAPI input file and reads SimAPI output file. | |
- Periodically writes the latest SimData to simAPI_input.json | |
- Reads and processes simAPI_output.jsonl for variable changes | |
""" | |
def __init__(self, sim_data: SimData, update_interval: float = 1.0): | |
self.sim_data = sim_data | |
self.update_interval = update_interval # seconds | |
self.last_write_time = 0 | |
self.last_read_time = 0 | |
# Ensure directory exists | |
ensure_simapi_dir() | |
# File paths | |
self.input_file_path = get_simapi_input_path() | |
self.output_file_path = get_simapi_output_path() | |
print(f"[SimAPIServer] Input file: {self.input_file_path}") | |
print(f"[SimAPIServer] Output file: {self.output_file_path}") | |
async def write_input_file(self): | |
"""Write the SimAPI input file with the latest sim data""" | |
try: | |
data = await self.sim_data.get_data_snapshot() | |
# Make sure the directory exists | |
simapi_dir = os.path.dirname(self.input_file_path) | |
try: | |
os.makedirs(simapi_dir, exist_ok=True) | |
print(f"[SimAPIServer] Ensuring directory exists: {simapi_dir}") | |
except Exception as dir_err: | |
print(f"[SimAPIServer] Error creating directory: {dir_err}") | |
return False | |
# Write the file | |
try: | |
# First write to a temporary file then rename it for atomic update | |
temp_path = f"{self.input_file_path}.tmp" | |
with open(temp_path, 'w') as f: | |
json.dump(data, f, indent=2) | |
# On Windows, we need to remove the destination file first for rename to work | |
if os.path.exists(self.input_file_path): | |
os.remove(self.input_file_path) | |
# Rename the temp file to the actual file | |
os.rename(temp_path, self.input_file_path) | |
self.last_write_time = time.time() | |
return True | |
except Exception as file_err: | |
print(f"[SimAPIServer] Error writing input file: {file_err}") | |
return False | |
except Exception as e: | |
print(f"[SimAPIServer] Error preparing data for input file: {e}") | |
return False | |
async def read_output_file(self): | |
""" | |
Read the SimAPI output file and process any variable changes. | |
The file format is JSONL (JSON Lines - one JSON object per line). | |
""" | |
try: | |
# If the file doesn't exist, try to create an empty one | |
if not os.path.exists(self.output_file_path): | |
try: | |
# Make sure the directory exists | |
os.makedirs(os.path.dirname(self.output_file_path), exist_ok=True) | |
# Create an empty file | |
with open(self.output_file_path, 'w') as f: | |
pass | |
print(f"[SimAPIServer] Created empty output file: {self.output_file_path}") | |
except Exception as create_err: | |
print(f"[SimAPIServer] Failed to create output file: {create_err}") | |
return [] | |
# Check if file is empty | |
if os.path.getsize(self.output_file_path) == 0: | |
return [] | |
# Read the file | |
try: | |
with open(self.output_file_path, 'r') as f: | |
lines = f.readlines() | |
except Exception as read_err: | |
print(f"[SimAPIServer] Error reading output file: {read_err}") | |
return [] | |
# Process each line | |
commands = [] | |
for line in lines: | |
line = line.strip() | |
if line: # Skip empty lines | |
try: | |
command = json.loads(line) | |
commands.append(command) | |
print(f"[SimAPIServer] Received command: {command}") | |
except json.JSONDecodeError as e: | |
print(f"[SimAPIServer] Error parsing JSON: {e}, line: {line}") | |
# Clear the file if we have commands | |
if commands: | |
try: | |
with open(self.output_file_path, 'w') as f: | |
pass | |
print(f"[SimAPIServer] Cleared output file after processing {len(commands)} commands") | |
except Exception as clear_err: | |
print(f"[SimAPIServer] Error clearing output file: {clear_err}") | |
self.last_read_time = time.time() | |
return commands | |
except Exception as e: | |
print(f"[SimAPIServer] Unexpected error reading output file: {e}") | |
return [] | |
async def process_commands(self, commands): | |
""" | |
Process commands from the SimAPI output file. | |
Currently just logs the commands as we can't directly set sim variables | |
in Aerofly FS4 (you would need to implement this for your simulator). | |
""" | |
for cmd in commands: | |
if 'setvar' in cmd and 'value' in cmd: | |
var_name = cmd['setvar'] | |
value = cmd['value'] | |
print(f"[SimAPIServer] Should set {var_name} to {value}") | |
# Here you would implement the logic to set variables in Aerofly FS4 | |
# For now, we just log the command | |
# Example implementation for COM/NAV frequencies: | |
# if var_name == "COM_ACTIVE_RADIO_SET_HZ": | |
# # Set COM1 active frequency in the simulator | |
# pass | |
# elif var_name == "COM_STBY_RADIO_SET_HZ": | |
# # Set COM1 standby frequency in the simulator | |
# pass | |
async def run(self): | |
""" | |
Main loop that periodically: | |
1. Writes the latest SimData to the input file | |
2. Reads and processes the output file | |
""" | |
try: | |
while True: | |
# Write input file | |
if time.time() - self.last_write_time >= self.update_interval: | |
await self.write_input_file() | |
# Read output file | |
if time.time() - self.last_read_time >= self.update_interval: | |
commands = await self.read_output_file() | |
if commands: | |
await self.process_commands(commands) | |
# Sleep briefly | |
await asyncio.sleep(0.1) | |
except asyncio.CancelledError: | |
print("[SimAPIServer] Server task cancelled") | |
raise | |
except Exception as e: | |
print(f"[SimAPIServer] Error in run loop: {e}") | |
raise | |
################################################################################ | |
# Bridge Core | |
################################################################################ | |
class AeroflyToSayIntentionsBridge: | |
""" | |
High-level orchestrator that sets up: | |
1) A SimData object | |
2) A ForeFlightUDPServer (listens on 49002) | |
3) A SimAPIServer (handles SimAPI input/output files) | |
4) Runs them concurrently with asyncio. | |
""" | |
def __init__(self, udp_port=49002, update_interval=1.0): | |
self.sim_data = SimData() | |
self.parser = ForeFlightParser() | |
self.udp_server = ForeFlightUDPServer(self.sim_data, self.parser, port=udp_port) | |
self.simapi_server = SimAPIServer(self.sim_data, update_interval=update_interval) | |
async def run(self): | |
await asyncio.gather( | |
self.udp_server.run(), | |
self.simapi_server.run() | |
) | |
################################################################################ | |
# GUI Implementation | |
################################################################################ | |
class BridgeGUI: | |
def __init__(self, master): | |
self.master = master | |
self.master.title("Aerofly FS4 to SayIntentions.AI Bridge") | |
self.master.geometry("600x500") | |
# Configure grid | |
self.master.grid_columnconfigure(0, weight=1) | |
self.master.grid_rowconfigure(1, weight=1) | |
# Initialize UI elements that will be created in setup_ui | |
self.status_label = None | |
self.connection_status = None | |
self.info_display = None | |
self.close_button = None | |
# Initialize bridge components | |
self.bridge = None | |
self.bridge_task = None | |
self.event_loop = None | |
self.bridge_thread = None | |
# Set up UI components | |
self.setup_ui() | |
# Set up window close protocol | |
self.master.protocol("WM_DELETE_WINDOW", self.close_application) | |
# Start the bridge | |
self.start_bridge() | |
def setup_ui(self): | |
"""Set up the user interface components""" | |
# Status Frame | |
status_frame = ttk.Frame(self.master, padding="10") | |
status_frame.grid(row=0, column=0, sticky="ew") | |
# Status Label | |
self.status_label = ttk.Label( | |
status_frame, | |
text="Status: Starting...", | |
font=tkfont.Font(size=10) | |
) | |
self.status_label.pack(side="left") | |
# Connection status | |
self.connection_status = ttk.Label( | |
status_frame, | |
text="Disconnected", | |
foreground="red", | |
font=tkfont.Font(size=10, weight="bold") | |
) | |
self.connection_status.pack(side="right") | |
# Info Frame | |
info_frame = ttk.Frame(self.master, padding="10") | |
info_frame.grid(row=1, column=0, sticky="nsew") | |
# Info Display | |
self.info_display = tk.Text( | |
info_frame, | |
wrap=tk.WORD, | |
width=60, | |
height=20, | |
font=tkfont.Font(family="Consolas", size=9) | |
) | |
self.info_display.pack(fill=tk.BOTH, expand=True) | |
# Control Frame | |
control_frame = ttk.Frame(self.master, padding="10") | |
control_frame.grid(row=2, column=0, sticky="ew") | |
# Buttons container | |
buttons_frame = ttk.Frame(control_frame) | |
buttons_frame.pack(fill=tk.X, expand=True) | |
# Test Connection Button | |
self.test_conn_button = ttk.Button( | |
buttons_frame, | |
text="Test Connection", | |
command=self.test_connection | |
) | |
self.test_conn_button.pack(side="left", padx=5) | |
# Manual Update Button | |
self.update_button = ttk.Button( | |
buttons_frame, | |
text="Update SimAPI Files", | |
command=self.manual_update | |
) | |
self.update_button.pack(side="left", padx=5) | |
# Close Button | |
self.close_button = ttk.Button( | |
control_frame, | |
text="Close", | |
command=self.close_application | |
) | |
self.close_button.pack(side="bottom", pady=10) | |
def test_connection(self): | |
"""Test connection by sending a UDP packet to known aerofly ports""" | |
if not self.bridge or not self.bridge.udp_server or not self.bridge.udp_server.socket: | |
self.show_error("Bridge not initialized") | |
return | |
try: | |
# Create a temporary socket for sending test packets | |
test_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | |
# List of potential ports where Aerofly might be listening | |
# (we don't actually know if Aerofly listens on any port, but it's worth a try) | |
targets = [ | |
('127.0.0.1', 49002), # Local loopback | |
('127.0.0.1', 49001), # Another possible port | |
('255.255.255.255', 49002) # Broadcast | |
] | |
# Simple ping message | |
message = b"PING_AEROFLY_FROM_SAYINTENTIONS_BRIDGE" | |
# Try sending to all potential targets | |
for target in targets: | |
try: | |
test_socket.sendto(message, target) | |
print(f"Sent test packet to {target[0]}:{target[1]}") | |
except Exception as e: | |
print(f"Failed to send to {target[0]}:{target[1]} - {e}") | |
# Close the socket | |
test_socket.close() | |
# Show success message in status | |
if self.status_label: | |
self.status_label.config(text="Status: Test packets sent") | |
# Schedule a reset of the status message | |
self.master.after(3000, lambda: self.status_label.config(text="Status: Running")) | |
except Exception as e: | |
self.show_error(f"Test failed: {e}") | |
def manual_update(self): | |
"""Manually trigger an update of the SimAPI files""" | |
if not self.bridge or not self.bridge.simapi_server: | |
self.show_error("Bridge not initialized") | |
return | |
try: | |
# Schedule the write operation in the bridge's event loop | |
if self.event_loop and self.bridge.simapi_server: | |
future = asyncio.run_coroutine_threadsafe( | |
self.bridge.simapi_server.write_input_file(), | |
self.event_loop | |
) | |
# Wait for result with a timeout | |
result = future.result(timeout=1.0) | |
# Show success/failure message | |
if result: | |
if self.status_label: | |
self.status_label.config(text="Status: SimAPI files updated") | |
else: | |
if self.status_label: | |
self.status_label.config(text="Status: Update failed") | |
# Schedule a reset of the status message | |
self.master.after(3000, lambda: self.status_label.config(text="Status: Running")) | |
except Exception as e: | |
self.show_error(f"Manual update failed: {e}") | |
def start_bridge(self): | |
"""Start the Aerofly-SayIntentions bridge in a separate thread""" | |
def run_bridge(): | |
try: | |
self.event_loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(self.event_loop) | |
self.bridge = AeroflyToSayIntentionsBridge() | |
# Update status | |
if self.master and self.status_label: | |
self.master.after(0, lambda: self.status_label.config( | |
text="Status: Bridge started" | |
)) | |
# Start monitoring UDP data | |
if self.master: | |
self.master.after(100, self.check_udp_data) | |
self.bridge_task = self.event_loop.create_task(self.bridge.run()) | |
self.event_loop.run_forever() | |
except Exception as e: | |
if self.master: | |
self.master.after(0, lambda: self.show_error(str(e))) | |
self.bridge_thread = threading.Thread(target=run_bridge, daemon=True) | |
self.bridge_thread.start() | |
def check_udp_data(self): | |
"""Check if we're receiving UDP data and update the UI accordingly""" | |
# Make sure the app hasn't been closed | |
if not self.master or not self.master.winfo_exists(): | |
return | |
# Get current time for tracking connection status | |
current_time = time.time() | |
# Default connection state | |
connection_state = "Disconnected" | |
connection_color = "red" | |
# Default info text | |
info_text = f"=== Connection Status ===\n" | |
info_text += f"UDP Input: Listening on port 49002\n" | |
# Check if we have recent data (within last 5 seconds) | |
has_recent_data = False | |
elapsed_time = 0 | |
if self.bridge and self.bridge.udp_server.lastDataReceivedTime is not None: | |
elapsed_time = current_time - self.bridge.udp_server.lastDataReceivedTime | |
has_recent_data = elapsed_time < 5.0 | |
if has_recent_data: | |
connection_state = "Connected" | |
connection_color = "green" | |
info_text += f"Last data received: {elapsed_time:.1f} seconds ago\n" | |
else: | |
connection_state = "No Data" | |
connection_color = "orange" | |
info_text += f"No data in {elapsed_time:.1f} seconds\n" | |
info_text += f"Make sure Aerofly FS4 is running and ForeFlight output is enabled\n" | |
else: | |
info_text += "Waiting for first data from Aerofly FS4...\n" | |
info_text += "1. Make sure Aerofly FS4 is running\n" | |
info_text += "2. Check that 'Output data to ForeFlight' is enabled in Aerofly settings\n" | |
info_text += "3. Ensure this computer's firewall allows UDP port 49002\n" | |
# Update connection status | |
if self.connection_status: | |
self.connection_status.config( | |
text=connection_state, | |
foreground=connection_color | |
) | |
# If we have simulator data, show it | |
if self.bridge and self.bridge.sim_data.xgps: | |
gps = self.bridge.sim_data.xgps | |
att = self.bridge.sim_data.xatt | |
# Add simulator data to info display | |
info_text += f"\n=== Simulator Data ===\n" | |
info_text += f"Latitude: {gps.latitude:.4f}°\n" | |
info_text += f"Longitude: {gps.longitude:.4f}°\n" | |
info_text += f"Altitude: {gps.alt_msl_meters * METERS_TO_FEET:.0f} ft\n" | |
info_text += f"Speed: {gps.ground_speed_mps * MPS_TO_KTS:.1f} kts\n" | |
if att: | |
info_text += f"Roll: {att.roll_deg:.1f}°\n" | |
info_text += f"Pitch: {att.pitch_deg:.1f}°\n" | |
info_text += f"Heading: {att.heading_deg:.1f}°\n" | |
# Add SimAPI status | |
info_text += f"\n=== SimAPI Status ===\n" | |
info_text += f"SimAPI Input: {get_simapi_input_path()}\n" | |
info_text += f"SimAPI Output: {get_simapi_output_path()}\n" | |
if self.bridge and self.bridge.simapi_server: | |
if self.bridge.simapi_server.last_write_time > 0: | |
info_text += f"Last Write: {time.strftime('%H:%M:%S', time.localtime(self.bridge.simapi_server.last_write_time))}\n" | |
else: | |
info_text += "Last Write: Never\n" | |
if self.bridge.simapi_server.last_read_time > 0: | |
info_text += f"Last Read: {time.strftime('%H:%M:%S', time.localtime(self.bridge.simapi_server.last_read_time))}\n" | |
else: | |
info_text += "Last Read: Never\n" | |
# Update the info display | |
if self.info_display: | |
self.info_display.delete(1.0, tk.END) | |
self.info_display.insert(tk.END, info_text) | |
# Schedule next check if the app is still running | |
if self.master and self.master.winfo_exists(): | |
self.master.after(100, self.check_udp_data) | |
def show_error(self, error_msg): | |
"""Display error message in the info display""" | |
if self.info_display: | |
self.info_display.delete(1.0, tk.END) | |
self.info_display.insert(tk.END, f"Error: {error_msg}") | |
if self.status_label: | |
self.status_label.config(text="Status: Error") | |
if self.connection_status: | |
self.connection_status.config(text="Error", foreground="red") | |
def close_application(self): | |
"""Clean up and close the application""" | |
# First destroy the UI to prevent more UI updates | |
if self.master: | |
# Create local reference to avoid using self.master after destroy | |
master_copy = self.master | |
self.master = None # Remove reference to prevent future access | |
master_copy.destroy() | |
async def shutdown_async(): | |
try: | |
# Cancel the main bridge task | |
if self.bridge_task: | |
self.bridge_task.cancel() | |
try: | |
await self.bridge_task | |
except asyncio.CancelledError: | |
pass | |
# Close socket if it exists | |
if self.bridge and self.bridge.udp_server and self.bridge.udp_server.socket: | |
self.bridge.udp_server.socket.close() | |
except Exception as err: | |
print(f"Error during async shutdown: {err}") | |
if self.event_loop and self.event_loop.is_running(): | |
try: | |
# Schedule the shutdown coroutine and stop the loop | |
asyncio.run_coroutine_threadsafe(shutdown_async(), self.event_loop) | |
# Give it a moment to execute | |
time.sleep(0.2) | |
# Then stop the loop | |
self.event_loop.stop() | |
except Exception as e: | |
print(f"Error during shutdown: {e}") | |
################################################################################ | |
# Entry Point | |
################################################################################ | |
def run_cli(): | |
"""Run the bridge in command-line mode (no GUI)""" | |
bridge = AeroflyToSayIntentionsBridge() | |
print("Starting Aerofly FS4 to SayIntentions.AI Bridge in CLI mode...") | |
print("Press Ctrl+C to exit") | |
try: | |
asyncio.run(bridge.run()) | |
except KeyboardInterrupt: | |
print("\nBridge shutting down.") | |
def run_gui(): | |
"""Run the bridge with GUI""" | |
root = tk.Tk() | |
BridgeGUI(root) | |
root.mainloop() | |
if __name__ == "__main__": | |
# By default, run with GUI | |
# Use '--cli' flag to run without GUI | |
if len(sys.argv) > 1 and sys.argv[1] == '--cli': | |
run_cli() | |
else: | |
run_gui() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment