Skip to content

Instantly share code, notes, and snippets.

@jlgabriel
Last active April 6, 2025 01:43
Show Gist options
  • Save jlgabriel/fff943a5eb056dc4af664f8e27114996 to your computer and use it in GitHub Desktop.
Save jlgabriel/fff943a5eb056dc4af664f8e27114996 to your computer and use it in GitHub Desktop.
Aerofly FS4 basic integration with SayIntentions.AI via UDP Foreflight streaming data
#!/usr/bin/env python3
##################################################################################
# Aerofly FS4 to SayIntentions.AI Adapter
# Developed to connect Aerofly FS4 Flight Simulator from IPACS (TM) to SayIntentions.AI
# Based on ForeFlight-Shirley-Bridge.py
##################################################################################
import asyncio
import socket
import json
import time
import tkinter as tk
from tkinter import ttk, font as tkfont
import threading
from dataclasses import dataclass
import sys
import os
from typing import Optional, Union, Dict, Any
from pathlib import Path
################################################################################
# Constants & Utilities
################################################################################
METERS_TO_FEET = 3.28084
MPS_TO_KTS = 1.94384
MPS_TO_FPM = 196.85 # Meters per second to feet per minute
# Define paths for SimAPI files
def get_local_appdata_path():
"""Get the local appdata directory path"""
return os.path.join(os.environ.get('LOCALAPPDATA', ''), 'SayIntentionsAI')
def ensure_simapi_dir():
"""Ensure the SayIntentionsAI directory exists"""
path = get_local_appdata_path()
os.makedirs(path, exist_ok=True)
return path
# SimAPI file paths
def get_simapi_input_path():
"""Get the path for the simAPI_input.json file"""
return os.path.join(get_local_appdata_path(), 'simAPI_input.json')
def get_simapi_output_path():
"""Get the path for the simAPI_output.jsonl file"""
return os.path.join(get_local_appdata_path(), 'simAPI_output.jsonl')
################################################################################
# Data Classes for Parsed ForeFlight Messages
################################################################################
@dataclass
class XGPSData:
"""
Represents ForeFlight XGPS data.
e.g. XGPS<sim_name>,<longitude>,<latitude>,<altitude_msl_meters>,<track_true_north>,<groundspeed_m/s>
"""
sim_name: str
longitude: float
latitude: float
alt_msl_meters: float
track_deg: float
ground_speed_mps: float
@dataclass
class XATTData:
"""
Represents ForeFlight XATT data.
e.g. XATT<sim_name>,<true_heading>,<pitch_degrees>,<roll_degrees>
"""
sim_name: str
heading_deg: float
pitch_deg: float
roll_deg: float
@dataclass
class UnknownData:
"""
Returned if we fail to parse or data type is not recognized.
"""
raw_line: str
################################################################################
# Main SimData Model (for SayIntentions.AI)
################################################################################
class SimData:
"""
Holds references to the latest XGPSData and XATTData from the simulator
and transforms them into the format SayIntentions.AI expects.
"""
def __init__(self):
# Store references to the most recent XGPSData and XATTData
# Initialize them to defaults
self.xgps: Optional[XGPSData] = None
self.xatt: Optional[XATTData] = None
# For concurrency control
self._lock = asyncio.Lock()
async def update_from_xgps(self, xgps: XGPSData):
"""Replace the reference to the latest XGPSData."""
async with self._lock:
self.xgps = xgps
async def update_from_xatt(self, xatt: XATTData):
"""Replace the reference to the latest XATTData."""
async with self._lock:
self.xatt = xatt
async def get_data_snapshot(self) -> Dict[str, Any]:
"""
Produce a dictionary in the format expected by SayIntentions.AI simAPI.
We do the unit conversions here, using the references to XGPSData and XATTData.
"""
async with self._lock:
# Local copies for clarity
xgps = self.xgps
xatt = self.xatt
# Set default values for the variables dictionary
variables = {
# Basic SimVars with default values
"PLANE LATITUDE": 0.0,
"PLANE LONGITUDE": 0.0,
"PLANE ALTITUDE": 0.0, # In feet
"GROUND VELOCITY": 0.0, # In knots
"PLANE HEADING DEGREES TRUE": 0.0,
"PLANE PITCH DEGREES": 0.0,
"PLANE BANK DEGREES": 0.0,
"VERTICAL SPEED": 0.0, # In feet per minute
"AIRSPEED INDICATED": 0.0, # In knots (estimated from ground speed)
# Radio stack (placeholder values)
"COM ACTIVE FREQUENCY:1": 118.0, # Example: 118.0 MHz
"COM STANDBY FREQUENCY:1": 118.5, # Example: 118.5 MHz
"NAV ACTIVE FREQUENCY:1": 110.0, # Example: 110.0 MHz
"NAV STANDBY FREQUENCY:1": 110.5, # Example: 110.5 MHz
"TRANSPONDER CODE:1": 1200, # Example: Squawk 1200
# Other required variables with placeholder values
"ATC ID": "N12345", # Aircraft registration/callsign
"PLANE ALT ABOVE GROUND": 0.0, # AGL in feet
"SIM ON GROUND": 1 # 1 for on ground, 0 for in air
}
# Update values if we have XGPSData
if xgps:
variables.update({
"PLANE LATITUDE": xgps.latitude,
"PLANE LONGITUDE": xgps.longitude,
"PLANE ALTITUDE": xgps.alt_msl_meters * METERS_TO_FEET,
"GROUND VELOCITY": xgps.ground_speed_mps * MPS_TO_KTS,
"PLANE HEADING DEGREES TRUE": xgps.track_deg % 360.0, # If no XATT is available
"AIRSPEED INDICATED": xgps.ground_speed_mps * MPS_TO_KTS, # Estimate IAS from ground speed
# Determine if on ground based on altitude and speed (simple heuristic)
"SIM ON GROUND": 1 if (xgps.alt_msl_meters < 10 and xgps.ground_speed_mps < 1) else 0,
# Guess AGL (crude approximation)
"PLANE ALT ABOVE GROUND": max(0, (xgps.alt_msl_meters * METERS_TO_FEET) - 50)
# Assuming 50ft terrain height
})
# Update attitude values if we have XATTData
if xatt:
variables.update({
"PLANE HEADING DEGREES TRUE": xatt.heading_deg % 360.0,
"PLANE PITCH DEGREES": xatt.pitch_deg,
"PLANE BANK DEGREES": xatt.roll_deg
})
# Try to estimate vertical speed from pitch (very crude approximation)
if xgps:
# Simple approximation: pitch angle * ground speed gives an estimate of vertical component
pitch_radians = xatt.pitch_deg * (3.14159 / 180.0)
vertical_component = xgps.ground_speed_mps * max(-1, min(1, pitch_radians))
variables["VERTICAL SPEED"] = vertical_component * MPS_TO_FPM
# Now return in the correct nested format
return {
"sim": {
"variables": variables,
"exe": "aerofly_fs_4.exe",
"simapi_version": "1.0",
"name": "AeroflyFS4",
"version": "7.0.0",
"adapter_version": "1.0.0"
}
}
################################################################################
# ForeFlight Data Parser
################################################################################
class ForeFlightParser:
"""
Parses strings in ForeFlight's XGPS / XATT formats,
returning typed objects: XGPSData, XATTData, or UnknownData.
"""
@staticmethod
def parse_line(line: str):
"""
Identify the data type (XGPS, XATT) and parse accordingly.
"""
line = line.strip()
if line.startswith("XGPS"):
return ForeFlightParser._parse_xgps(line)
elif line.startswith("XATT"):
return ForeFlightParser._parse_xatt(line)
else:
return UnknownData(raw_line=line)
@staticmethod
def _parse_xgps(line: str) -> Union[XGPSData, UnknownData]:
"""
Example XGPS line:
XGPSMySim,-80.11,34.55,1200.1,359.05,55.6
=> XGPS<sim_name>,<longitude>,<latitude>,<alt_msl_meters>,<track_deg_true>,<groundspeed_m/s>
"""
try:
raw = line[4:] # remove 'XGPS'
parts = raw.split(",")
sim_name = parts[0].strip()
longitude = float(parts[1])
latitude = float(parts[2])
alt_msl_meters = float(parts[3])
track_deg = float(parts[4])
ground_speed_mps = float(parts[5])
return XGPSData(
sim_name=sim_name,
longitude=longitude,
latitude=latitude,
alt_msl_meters=alt_msl_meters,
track_deg=track_deg,
ground_speed_mps=ground_speed_mps
)
except (ValueError, IndexError):
return UnknownData(raw_line=line)
@staticmethod
def _parse_xatt(line: str) -> Union[XATTData, UnknownData]:
"""
Example XATT line:
XATTMySim,180.2,0.1,0.2
=> XATT<sim_name>,<true_heading_deg>,<pitch_deg>,<roll_deg>
"""
try:
raw = line[4:]
parts = raw.split(",")
sim_name = parts[0].strip()
heading_deg = float(parts[1])
pitch_deg = float(parts[2])
roll_deg = float(parts[3])
return XATTData(
sim_name=sim_name,
heading_deg=heading_deg,
pitch_deg=pitch_deg,
roll_deg=roll_deg
)
except (ValueError, IndexError):
return UnknownData(raw_line=line)
################################################################################
# ForeFlight UDP Server
################################################################################
class ForeFlightUDPServer:
"""
Listens for ForeFlight-compatible data on UDP port 49002.
Parses lines and updates the shared SimData.
"""
def __init__(self, sim_data: SimData, parser: ForeFlightParser, port: int = 49002):
self.sim_data = sim_data
self.parser = parser
self.port = port
self.socket = None
self.lastDataReceivedTime = None
async def run(self):
"""
Main loop: bind to UDP port, receive data, parse, update sim_data.
We'll do a blocking recv in a background thread so we don't block the event loop.
"""
# Create UDP socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Enable broadcast & address reuse
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Set blocking to True for our simple loop
self.socket.setblocking(True)
# Bind to all interfaces on the specified port:
self.socket.bind(('0.0.0.0', self.port))
print(f"[ForeFlightUDPServer] Listening on UDP port {self.port}...")
print(f"[ForeFlightUDPServer] Ready to receive data from Aerofly FS4...")
try:
while True:
try:
# When the socket is closed, this will raise an exception
data, addr = await asyncio.to_thread(self.socket.recvfrom, 1024)
line = data.decode('utf-8', errors='ignore').strip()
parsed_obj = self.parser.parse_line(line)
# If we haven't received data in a while, print a message
if self.lastDataReceivedTime is None or \
time.time() - self.lastDataReceivedTime > 5.0:
print(f"[ForeFlightUDPServer] Receiving data from {addr[0]}:{addr[1]}")
print(f"[ForeFlightUDPServer] Sample data: {line}")
print(f"[ForeFlightUDPServer] Parsed object: {parsed_obj}")
self.lastDataReceivedTime = time.time()
if isinstance(parsed_obj, XGPSData):
await self.sim_data.update_from_xgps(parsed_obj)
elif isinstance(parsed_obj, XATTData):
await self.sim_data.update_from_xatt(parsed_obj)
except (ConnectionError, OSError) as e:
# Socket was closed
print(f"[ForeFlightUDPServer] Socket error: {e}")
break
except asyncio.CancelledError:
# Task was cancelled
break
except Exception as e:
print(f"[ForeFlightUDPServer] Error receiving data: {e}")
# Continue despite errors
finally:
# Ensure socket is closed when the task ends
if self.socket:
try:
self.socket.close()
print("[ForeFlightUDPServer] Socket closed")
except Exception as ex:
print(f"[ForeFlightUDPServer] Error closing socket: {ex}")
################################################################################
# SayIntentions.AI SimAPI Server
################################################################################
class SimAPIServer:
"""
Writes SimAPI input file and reads SimAPI output file.
- Periodically writes the latest SimData to simAPI_input.json
- Reads and processes simAPI_output.jsonl for variable changes
"""
def __init__(self, sim_data: SimData, update_interval: float = 1.0):
self.sim_data = sim_data
self.update_interval = update_interval # seconds
self.last_write_time = 0
self.last_read_time = 0
# Ensure directory exists
ensure_simapi_dir()
# File paths
self.input_file_path = get_simapi_input_path()
self.output_file_path = get_simapi_output_path()
print(f"[SimAPIServer] Input file: {self.input_file_path}")
print(f"[SimAPIServer] Output file: {self.output_file_path}")
async def write_input_file(self):
"""Write the SimAPI input file with the latest sim data"""
try:
data = await self.sim_data.get_data_snapshot()
# Make sure the directory exists
simapi_dir = os.path.dirname(self.input_file_path)
try:
os.makedirs(simapi_dir, exist_ok=True)
print(f"[SimAPIServer] Ensuring directory exists: {simapi_dir}")
except Exception as dir_err:
print(f"[SimAPIServer] Error creating directory: {dir_err}")
return False
# Write the file
try:
# First write to a temporary file then rename it for atomic update
temp_path = f"{self.input_file_path}.tmp"
with open(temp_path, 'w') as f:
json.dump(data, f, indent=2)
# On Windows, we need to remove the destination file first for rename to work
if os.path.exists(self.input_file_path):
os.remove(self.input_file_path)
# Rename the temp file to the actual file
os.rename(temp_path, self.input_file_path)
self.last_write_time = time.time()
return True
except Exception as file_err:
print(f"[SimAPIServer] Error writing input file: {file_err}")
return False
except Exception as e:
print(f"[SimAPIServer] Error preparing data for input file: {e}")
return False
async def read_output_file(self):
"""
Read the SimAPI output file and process any variable changes.
The file format is JSONL (JSON Lines - one JSON object per line).
"""
try:
# If the file doesn't exist, try to create an empty one
if not os.path.exists(self.output_file_path):
try:
# Make sure the directory exists
os.makedirs(os.path.dirname(self.output_file_path), exist_ok=True)
# Create an empty file
with open(self.output_file_path, 'w') as f:
pass
print(f"[SimAPIServer] Created empty output file: {self.output_file_path}")
except Exception as create_err:
print(f"[SimAPIServer] Failed to create output file: {create_err}")
return []
# Check if file is empty
if os.path.getsize(self.output_file_path) == 0:
return []
# Read the file
try:
with open(self.output_file_path, 'r') as f:
lines = f.readlines()
except Exception as read_err:
print(f"[SimAPIServer] Error reading output file: {read_err}")
return []
# Process each line
commands = []
for line in lines:
line = line.strip()
if line: # Skip empty lines
try:
command = json.loads(line)
commands.append(command)
print(f"[SimAPIServer] Received command: {command}")
except json.JSONDecodeError as e:
print(f"[SimAPIServer] Error parsing JSON: {e}, line: {line}")
# Clear the file if we have commands
if commands:
try:
with open(self.output_file_path, 'w') as f:
pass
print(f"[SimAPIServer] Cleared output file after processing {len(commands)} commands")
except Exception as clear_err:
print(f"[SimAPIServer] Error clearing output file: {clear_err}")
self.last_read_time = time.time()
return commands
except Exception as e:
print(f"[SimAPIServer] Unexpected error reading output file: {e}")
return []
async def process_commands(self, commands):
"""
Process commands from the SimAPI output file.
Currently just logs the commands as we can't directly set sim variables
in Aerofly FS4 (you would need to implement this for your simulator).
"""
for cmd in commands:
if 'setvar' in cmd and 'value' in cmd:
var_name = cmd['setvar']
value = cmd['value']
print(f"[SimAPIServer] Should set {var_name} to {value}")
# Here you would implement the logic to set variables in Aerofly FS4
# For now, we just log the command
# Example implementation for COM/NAV frequencies:
# if var_name == "COM_ACTIVE_RADIO_SET_HZ":
# # Set COM1 active frequency in the simulator
# pass
# elif var_name == "COM_STBY_RADIO_SET_HZ":
# # Set COM1 standby frequency in the simulator
# pass
async def run(self):
"""
Main loop that periodically:
1. Writes the latest SimData to the input file
2. Reads and processes the output file
"""
try:
while True:
# Write input file
if time.time() - self.last_write_time >= self.update_interval:
await self.write_input_file()
# Read output file
if time.time() - self.last_read_time >= self.update_interval:
commands = await self.read_output_file()
if commands:
await self.process_commands(commands)
# Sleep briefly
await asyncio.sleep(0.1)
except asyncio.CancelledError:
print("[SimAPIServer] Server task cancelled")
raise
except Exception as e:
print(f"[SimAPIServer] Error in run loop: {e}")
raise
################################################################################
# Bridge Core
################################################################################
class AeroflyToSayIntentionsBridge:
"""
High-level orchestrator that sets up:
1) A SimData object
2) A ForeFlightUDPServer (listens on 49002)
3) A SimAPIServer (handles SimAPI input/output files)
4) Runs them concurrently with asyncio.
"""
def __init__(self, udp_port=49002, update_interval=1.0):
self.sim_data = SimData()
self.parser = ForeFlightParser()
self.udp_server = ForeFlightUDPServer(self.sim_data, self.parser, port=udp_port)
self.simapi_server = SimAPIServer(self.sim_data, update_interval=update_interval)
async def run(self):
await asyncio.gather(
self.udp_server.run(),
self.simapi_server.run()
)
################################################################################
# GUI Implementation
################################################################################
class BridgeGUI:
def __init__(self, master):
self.master = master
self.master.title("Aerofly FS4 to SayIntentions.AI Bridge")
self.master.geometry("600x500")
# Configure grid
self.master.grid_columnconfigure(0, weight=1)
self.master.grid_rowconfigure(1, weight=1)
# Initialize UI elements that will be created in setup_ui
self.status_label = None
self.connection_status = None
self.info_display = None
self.close_button = None
# Initialize bridge components
self.bridge = None
self.bridge_task = None
self.event_loop = None
self.bridge_thread = None
# Set up UI components
self.setup_ui()
# Set up window close protocol
self.master.protocol("WM_DELETE_WINDOW", self.close_application)
# Start the bridge
self.start_bridge()
def setup_ui(self):
"""Set up the user interface components"""
# Status Frame
status_frame = ttk.Frame(self.master, padding="10")
status_frame.grid(row=0, column=0, sticky="ew")
# Status Label
self.status_label = ttk.Label(
status_frame,
text="Status: Starting...",
font=tkfont.Font(size=10)
)
self.status_label.pack(side="left")
# Connection status
self.connection_status = ttk.Label(
status_frame,
text="Disconnected",
foreground="red",
font=tkfont.Font(size=10, weight="bold")
)
self.connection_status.pack(side="right")
# Info Frame
info_frame = ttk.Frame(self.master, padding="10")
info_frame.grid(row=1, column=0, sticky="nsew")
# Info Display
self.info_display = tk.Text(
info_frame,
wrap=tk.WORD,
width=60,
height=20,
font=tkfont.Font(family="Consolas", size=9)
)
self.info_display.pack(fill=tk.BOTH, expand=True)
# Control Frame
control_frame = ttk.Frame(self.master, padding="10")
control_frame.grid(row=2, column=0, sticky="ew")
# Buttons container
buttons_frame = ttk.Frame(control_frame)
buttons_frame.pack(fill=tk.X, expand=True)
# Test Connection Button
self.test_conn_button = ttk.Button(
buttons_frame,
text="Test Connection",
command=self.test_connection
)
self.test_conn_button.pack(side="left", padx=5)
# Manual Update Button
self.update_button = ttk.Button(
buttons_frame,
text="Update SimAPI Files",
command=self.manual_update
)
self.update_button.pack(side="left", padx=5)
# Close Button
self.close_button = ttk.Button(
control_frame,
text="Close",
command=self.close_application
)
self.close_button.pack(side="bottom", pady=10)
def test_connection(self):
"""Test connection by sending a UDP packet to known aerofly ports"""
if not self.bridge or not self.bridge.udp_server or not self.bridge.udp_server.socket:
self.show_error("Bridge not initialized")
return
try:
# Create a temporary socket for sending test packets
test_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
test_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# List of potential ports where Aerofly might be listening
# (we don't actually know if Aerofly listens on any port, but it's worth a try)
targets = [
('127.0.0.1', 49002), # Local loopback
('127.0.0.1', 49001), # Another possible port
('255.255.255.255', 49002) # Broadcast
]
# Simple ping message
message = b"PING_AEROFLY_FROM_SAYINTENTIONS_BRIDGE"
# Try sending to all potential targets
for target in targets:
try:
test_socket.sendto(message, target)
print(f"Sent test packet to {target[0]}:{target[1]}")
except Exception as e:
print(f"Failed to send to {target[0]}:{target[1]} - {e}")
# Close the socket
test_socket.close()
# Show success message in status
if self.status_label:
self.status_label.config(text="Status: Test packets sent")
# Schedule a reset of the status message
self.master.after(3000, lambda: self.status_label.config(text="Status: Running"))
except Exception as e:
self.show_error(f"Test failed: {e}")
def manual_update(self):
"""Manually trigger an update of the SimAPI files"""
if not self.bridge or not self.bridge.simapi_server:
self.show_error("Bridge not initialized")
return
try:
# Schedule the write operation in the bridge's event loop
if self.event_loop and self.bridge.simapi_server:
future = asyncio.run_coroutine_threadsafe(
self.bridge.simapi_server.write_input_file(),
self.event_loop
)
# Wait for result with a timeout
result = future.result(timeout=1.0)
# Show success/failure message
if result:
if self.status_label:
self.status_label.config(text="Status: SimAPI files updated")
else:
if self.status_label:
self.status_label.config(text="Status: Update failed")
# Schedule a reset of the status message
self.master.after(3000, lambda: self.status_label.config(text="Status: Running"))
except Exception as e:
self.show_error(f"Manual update failed: {e}")
def start_bridge(self):
"""Start the Aerofly-SayIntentions bridge in a separate thread"""
def run_bridge():
try:
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
self.bridge = AeroflyToSayIntentionsBridge()
# Update status
if self.master and self.status_label:
self.master.after(0, lambda: self.status_label.config(
text="Status: Bridge started"
))
# Start monitoring UDP data
if self.master:
self.master.after(100, self.check_udp_data)
self.bridge_task = self.event_loop.create_task(self.bridge.run())
self.event_loop.run_forever()
except Exception as e:
if self.master:
self.master.after(0, lambda: self.show_error(str(e)))
self.bridge_thread = threading.Thread(target=run_bridge, daemon=True)
self.bridge_thread.start()
def check_udp_data(self):
"""Check if we're receiving UDP data and update the UI accordingly"""
# Make sure the app hasn't been closed
if not self.master or not self.master.winfo_exists():
return
# Get current time for tracking connection status
current_time = time.time()
# Default connection state
connection_state = "Disconnected"
connection_color = "red"
# Default info text
info_text = f"=== Connection Status ===\n"
info_text += f"UDP Input: Listening on port 49002\n"
# Check if we have recent data (within last 5 seconds)
has_recent_data = False
elapsed_time = 0
if self.bridge and self.bridge.udp_server.lastDataReceivedTime is not None:
elapsed_time = current_time - self.bridge.udp_server.lastDataReceivedTime
has_recent_data = elapsed_time < 5.0
if has_recent_data:
connection_state = "Connected"
connection_color = "green"
info_text += f"Last data received: {elapsed_time:.1f} seconds ago\n"
else:
connection_state = "No Data"
connection_color = "orange"
info_text += f"No data in {elapsed_time:.1f} seconds\n"
info_text += f"Make sure Aerofly FS4 is running and ForeFlight output is enabled\n"
else:
info_text += "Waiting for first data from Aerofly FS4...\n"
info_text += "1. Make sure Aerofly FS4 is running\n"
info_text += "2. Check that 'Output data to ForeFlight' is enabled in Aerofly settings\n"
info_text += "3. Ensure this computer's firewall allows UDP port 49002\n"
# Update connection status
if self.connection_status:
self.connection_status.config(
text=connection_state,
foreground=connection_color
)
# If we have simulator data, show it
if self.bridge and self.bridge.sim_data.xgps:
gps = self.bridge.sim_data.xgps
att = self.bridge.sim_data.xatt
# Add simulator data to info display
info_text += f"\n=== Simulator Data ===\n"
info_text += f"Latitude: {gps.latitude:.4f}°\n"
info_text += f"Longitude: {gps.longitude:.4f}°\n"
info_text += f"Altitude: {gps.alt_msl_meters * METERS_TO_FEET:.0f} ft\n"
info_text += f"Speed: {gps.ground_speed_mps * MPS_TO_KTS:.1f} kts\n"
if att:
info_text += f"Roll: {att.roll_deg:.1f}°\n"
info_text += f"Pitch: {att.pitch_deg:.1f}°\n"
info_text += f"Heading: {att.heading_deg:.1f}°\n"
# Add SimAPI status
info_text += f"\n=== SimAPI Status ===\n"
info_text += f"SimAPI Input: {get_simapi_input_path()}\n"
info_text += f"SimAPI Output: {get_simapi_output_path()}\n"
if self.bridge and self.bridge.simapi_server:
if self.bridge.simapi_server.last_write_time > 0:
info_text += f"Last Write: {time.strftime('%H:%M:%S', time.localtime(self.bridge.simapi_server.last_write_time))}\n"
else:
info_text += "Last Write: Never\n"
if self.bridge.simapi_server.last_read_time > 0:
info_text += f"Last Read: {time.strftime('%H:%M:%S', time.localtime(self.bridge.simapi_server.last_read_time))}\n"
else:
info_text += "Last Read: Never\n"
# Update the info display
if self.info_display:
self.info_display.delete(1.0, tk.END)
self.info_display.insert(tk.END, info_text)
# Schedule next check if the app is still running
if self.master and self.master.winfo_exists():
self.master.after(100, self.check_udp_data)
def show_error(self, error_msg):
"""Display error message in the info display"""
if self.info_display:
self.info_display.delete(1.0, tk.END)
self.info_display.insert(tk.END, f"Error: {error_msg}")
if self.status_label:
self.status_label.config(text="Status: Error")
if self.connection_status:
self.connection_status.config(text="Error", foreground="red")
def close_application(self):
"""Clean up and close the application"""
# First destroy the UI to prevent more UI updates
if self.master:
# Create local reference to avoid using self.master after destroy
master_copy = self.master
self.master = None # Remove reference to prevent future access
master_copy.destroy()
async def shutdown_async():
try:
# Cancel the main bridge task
if self.bridge_task:
self.bridge_task.cancel()
try:
await self.bridge_task
except asyncio.CancelledError:
pass
# Close socket if it exists
if self.bridge and self.bridge.udp_server and self.bridge.udp_server.socket:
self.bridge.udp_server.socket.close()
except Exception as err:
print(f"Error during async shutdown: {err}")
if self.event_loop and self.event_loop.is_running():
try:
# Schedule the shutdown coroutine and stop the loop
asyncio.run_coroutine_threadsafe(shutdown_async(), self.event_loop)
# Give it a moment to execute
time.sleep(0.2)
# Then stop the loop
self.event_loop.stop()
except Exception as e:
print(f"Error during shutdown: {e}")
################################################################################
# Entry Point
################################################################################
def run_cli():
"""Run the bridge in command-line mode (no GUI)"""
bridge = AeroflyToSayIntentionsBridge()
print("Starting Aerofly FS4 to SayIntentions.AI Bridge in CLI mode...")
print("Press Ctrl+C to exit")
try:
asyncio.run(bridge.run())
except KeyboardInterrupt:
print("\nBridge shutting down.")
def run_gui():
"""Run the bridge with GUI"""
root = tk.Tk()
BridgeGUI(root)
root.mainloop()
if __name__ == "__main__":
# By default, run with GUI
# Use '--cli' flag to run without GUI
if len(sys.argv) > 1 and sys.argv[1] == '--cli':
run_cli()
else:
run_gui()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment