Last active
April 16, 2025 02:25
-
-
Save jlgabriel/74f7b98e9eee7e7edd98f1b0b4d76299 to your computer and use it in GitHub Desktop.
Aerofly FS4 to IGC Recorder - connects to the Aerofly FS4 flight simulator and generates IGC flight logs for glider flights
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
################################################################################## | |
# Aerofly FS4 to IGC Recorder | |
# Developed to connect Aerofly FS4 Flight Simulator and generate IGC flight logs | |
# Copyright (c) 2025 Juan Luis Gabriel | |
# This software is released under the MIT License. | |
# https://opensource.org/licenses/MIT | |
################################################################################## | |
import asyncio | |
import socket | |
import json | |
import time | |
import tkinter as tk | |
from tkinter import ttk, font as tkfont, filedialog | |
import threading | |
from dataclasses import dataclass | |
import sys | |
import os | |
from typing import Optional, Union, Dict, Any, List | |
from pathlib import Path | |
import datetime | |
# Import aerofiles for IGC file handling | |
from aerofiles.igc import Writer | |
################################################################################ | |
# Constants & Utilities | |
################################################################################ | |
# Unit-conversion factors for turning the simulator's metric values into
# aviation display units.
METERS_TO_FEET = 3.28084  # feet per meter
MPS_TO_KTS = 1.94384  # knots per (meter per second)
MPS_TO_FPM = 196.85  # Meters per second to feet per minute
# Define paths for output files | |
def get_default_igc_dir():
    """Return the folder used for IGC logs, creating it when missing.

    The folder is ``AeroflyIGC`` inside ``Documents`` in the current user's
    home directory.
    """
    home = os.path.expanduser("~")
    target = os.path.join(home, "Documents", "AeroflyIGC")
    # exist_ok makes repeated calls harmless once the folder exists.
    os.makedirs(target, exist_ok=True)
    return target
def generate_igc_filename():
    """Build an IGC file name stamped with the current local date and time."""
    stamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    return "AEROFLY_" + stamp + ".igc"
################################################################################ | |
# Data Classes for Parsed ForeFlight Messages | |
################################################################################ | |
@dataclass
class XGPSData:
    """
    One parsed ForeFlight XGPS (position) message.

    Wire format:
    XGPS<sim_name>,<longitude>,<latitude>,<altitude_msl_meters>,<track_true_north>,<groundspeed_m/s>
    """

    # Simulator name that follows the "XGPS" tag.
    sim_name: str
    # Position as sent by the simulator (longitude comes first on the wire).
    longitude: float
    latitude: float
    # Altitude above mean sea level, in meters.
    alt_msl_meters: float
    # Ground track relative to true north.
    track_deg: float
    # Ground speed in meters per second.
    ground_speed_mps: float
@dataclass
class XATTData:
    """
    One parsed ForeFlight XATT (attitude) message.

    Wire format:
    XATT<sim_name>,<true_heading>,<pitch_degrees>,<roll_degrees>
    """

    # Simulator name that follows the "XATT" tag.
    sim_name: str
    # True heading.
    heading_deg: float
    # Pitch and roll, in degrees.
    pitch_deg: float
    roll_deg: float
@dataclass
class UnknownData:
    """
    Fallback result for a line that is not a recognized message type or
    that could not be parsed.
    """

    # The offending line, kept for diagnostics.
    raw_line: str
################################################################################ | |
# IGC Recorder Class | |
################################################################################ | |
class IGCRecorder:
    """
    Records flight data to an IGC file using the aerofiles library.

    Position fixes are queued by ``add_position`` and written as B records by
    a background task started in ``start_recording``. B records are stamped
    with a simulated clock that starts at 12:00:00 and advances one second
    per written fix; the real UTC date is written once in the header.

    All file access is serialized with ``self._lock``. ``asyncio.Lock`` is
    not reentrant, so code that already holds the lock must call
    ``_write_position_unlocked`` rather than ``_write_position``.
    """

    # Hour of day at which the simulated B-record clock starts.
    _START_HOUR = 12

    def __init__(self):
        self.recording = False
        self.igc_file = None
        self.writer = None  # aerofiles Writer bound to igc_file
        self.filename = None
        self.start_time = None
        self.end_time = None
        self.fix_count = 0
        # Seconds elapsed on the simulated clock (one per written fix).
        self._second_counter = 0
        self._lock = asyncio.Lock()
        # Queue of (gps_data, att_data, timestamp) tuples awaiting writing.
        self.position_queue = asyncio.Queue()
        # Background task that drains position_queue while recording.
        self.processing_task = None

    @staticmethod
    def _utc_now():
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.datetime.now(datetime.timezone.utc)

    def _close_file(self):
        """Close the output file (if any) and drop the file and writer references."""
        if self.igc_file:
            try:
                self.igc_file.close()
            except OSError as e:
                print(f"[IGCRecorder] Error closing file: {e}")
        self.igc_file = None
        self.writer = None

    def _get_fixed_time(self, timestamp=None):
        """
        Return the fixed 12:00:00 time of day used as the base of the
        simulated B-record clock.

        ``timestamp`` is accepted for backward compatibility; the returned
        time does not depend on it.
        """
        return datetime.time(hour=self._START_HOUR, minute=0, second=0)

    async def start_recording(self, pilot_name="", glider_type="Aerofly FS4",
                              glider_id="SIM", gps_device="Aerofly GPS"):
        """
        Start recording a new IGC file in the default IGC directory.

        Writes the header records, resets the fix counters and starts the
        background queue-processing task. ``gps_device`` is currently unused.

        Returns True on success, False if already recording or on error.
        """
        async with self._lock:
            if self.recording:
                print("[IGCRecorder] Already recording")
                return False
            try:
                # Generate filename based on current date and time
                self.filename = os.path.join(get_default_igc_dir(), generate_igc_filename())
                # The file is opened in binary mode for the aerofiles writer.
                self.igc_file = open(self.filename, 'wb')
                self.writer = Writer(self.igc_file)

                utc_now = self._utc_now()
                self.start_time = utc_now
                # Restart the simulated clock for this flight.
                self._second_counter = 0

                # Header: real UTC date, then the optional identity records.
                self.writer.write_date(utc_now.date())
                if pilot_name:
                    self.writer.write_pilot(pilot_name)
                if glider_type:
                    self.writer.write_glider_type(glider_type)
                if glider_id:
                    self.writer.write_glider_id(glider_id)
                # Logger info and a comment about the source
                self.writer.write_comment("LOG", "Aerofly FS4 Simulator")
                self.writer.write_firmware_version("1.0")
                self.writer.write_comment("GEN", "Generated by Aerofly FS4 to IGC Recorder")

                self.fix_count = 0
                self.recording = True
                self.processing_task = asyncio.create_task(self._process_queue())
                print(f"[IGCRecorder] Started recording to {self.filename}")
                return True
            except Exception as e:
                print(f"[IGCRecorder] Error starting recording: {e}")
                # Clean up if error occurs
                self._close_file()
                self.recording = False
                return False

    def _write_extensions_declaration(self):
        """
        Placeholder for writing an I record that declares record extensions.

        The declaration is currently disabled, so this is a no-op that always
        returns True. The previously drafted record was
        "I033638HDG3839ROLL4345PITCH", to be written directly to the file.
        """
        return True

    async def stop_recording(self):
        """
        Stop recording, flush queued fixes and close the IGC file.

        Returns the path of the saved file, None if nothing was recorded (the
        empty file is deleted) or an error occurred, and False if no
        recording was in progress.
        """
        async with self._lock:
            if not self.recording:
                print("[IGCRecorder] Not recording")
                return False
            try:
                self.end_time = self._utc_now()

                # Cancel the processing task
                if self.processing_task:
                    self.processing_task.cancel()
                    try:
                        await self.processing_task
                    except asyncio.CancelledError:
                        pass
                    self.processing_task = None

                # Flush fixes that were queued but not yet written. The lock
                # is already held here, so the unlocked writer must be used:
                # re-acquiring the non-reentrant lock would hang forever.
                while True:
                    try:
                        gps_data, att_data, timestamp = self.position_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                    try:
                        self._write_position_unlocked(gps_data, att_data, timestamp)
                    except Exception as e:
                        print(f"[IGCRecorder] Error processing remaining queue items: {e}")
                    finally:
                        self.position_queue.task_done()

                if not self.igc_file:
                    self.recording = False
                    return None

                # Add final comment with recording end time
                if self.writer is not None:
                    try:
                        self.writer.write_comment("END", f"Recording ended at {self.end_time.strftime('%H:%M:%S')}")
                    except Exception as e:
                        print(f"[IGCRecorder] Error writing end comment: {e}")
                self._close_file()

                # Clear the recording flag before the filename so a reader
                # that checks `recording` first never sees a None filename.
                self.recording = False
                result_file, self.filename = self.filename, None

                if self.fix_count > 0:
                    print(f"[IGCRecorder] Stopped recording. Wrote {self.fix_count} fixes to {result_file}")
                    return result_file

                print("[IGCRecorder] No data recorded, deleting file")
                if result_file and os.path.exists(result_file):
                    os.remove(result_file)
                return None
            except Exception as e:
                print(f"[IGCRecorder] Error stopping recording: {e}")
                # Clean up even if error occurs
                self._close_file()
                self.recording = False
                return None

    async def add_position(self, gps_data: "XGPSData", att_data: "Optional[XATTData]" = None):
        """
        Queue a position fix for writing.

        Returns True if the fix was queued, False when not recording or on
        error.
        """
        if not self.recording:
            return False
        try:
            await self.position_queue.put((gps_data, att_data, self._utc_now()))
            return True
        except Exception as e:
            print(f"[IGCRecorder] Error adding position: {e}")
            return False

    async def _process_queue(self):
        """Write queued fixes in the background until the task is cancelled."""
        try:
            while True:
                gps_data, att_data, timestamp = await self.position_queue.get()
                await self._write_position(gps_data, att_data, timestamp)
                self.position_queue.task_done()
                # Small sleep to avoid consuming too much CPU
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            # Task was cancelled, clean exit
            print("[IGCRecorder] Position processing task cancelled")
            raise
        except Exception as e:
            print(f"[IGCRecorder] Error in position processing task: {e}")

    async def _write_k_record(self, timestamp: datetime.time, code: str, data: str):
        """
        Write a custom K record (extension data) to the IGC file.

        The record is formatted as ``K<HHMMSS><code><data>`` and written
        directly to the file rather than through the aerofiles writer.

        Returns True on success, False when not recording or on error.
        """
        if not self.recording or not self.igc_file:
            return False
        try:
            time_str = timestamp.strftime('%H%M%S')
            k_record = f"K{time_str}{code}{data}\n"
            # The file is open in binary mode, so the record must be encoded.
            self.igc_file.write(k_record.encode('utf-8'))
            return True
        except Exception as e:
            print(f"[IGCRecorder] Error writing K record: {e}")
            return False

    async def _write_position(self, gps_data: "XGPSData", att_data: "Optional[XATTData]" = None,
                              timestamp: Optional[datetime.datetime] = None):
        """
        Write one position fix (B record) while holding the file lock.

        Returns True if the fix was written, False otherwise.
        """
        async with self._lock:
            return self._write_position_unlocked(gps_data, att_data, timestamp)

    def _write_position_unlocked(self, gps_data, att_data=None, timestamp=None):
        """
        Write one B record; the caller must already hold ``self._lock``.

        ``att_data`` and ``timestamp`` are accepted but not written: the fix
        is stamped with the simulated clock instead of the real time.

        Returns True if the fix was written, False when not recording or on
        error.
        """
        if not self.recording or not self.igc_file or self.writer is None:
            return False
        try:
            latitude = gps_data.latitude
            longitude = gps_data.longitude
            altitude = int(gps_data.alt_msl_meters)  # IGC uses meters
            # The simulator provides no pressure altitude; MSL is used as an
            # approximation.
            pressure_alt = altitude

            # Advance the simulated clock by one second per fix. The hour
            # rolls over too, so timestamps keep increasing on recordings
            # longer than one hour instead of wrapping back to 12:00:00.
            self._second_counter += 1
            elapsed = self._second_counter
            simulated_time = datetime.time(
                hour=(self._START_HOUR + elapsed // 3600) % 24,
                minute=(elapsed // 60) % 60,
                second=elapsed % 60,
                microsecond=0,
            )

            self.writer.write_fix(
                time=simulated_time,
                latitude=latitude,
                longitude=longitude,
                pressure_alt=pressure_alt,
                gps_alt=altitude,
            )
            self.fix_count += 1
            return True
        except Exception as e:
            print(f"[IGCRecorder] Error writing position: {e}")
            return False
################################################################################ | |
# ForeFlight Data Parser | |
################################################################################ | |
class ForeFlightParser:
    """
    Turns ForeFlight-style XGPS / XATT text lines into typed objects
    (XGPSData, XATTData), falling back to UnknownData for anything else.
    """

    @staticmethod
    def parse_line(line: str):
        """
        Dispatch a raw line to the matching parser based on its 4-character tag.
        """
        text = line.strip()
        tag = text[:4]
        if tag == "XGPS":
            return ForeFlightParser._parse_xgps(text)
        if tag == "XATT":
            return ForeFlightParser._parse_xatt(text)
        return UnknownData(raw_line=text)

    @staticmethod
    def _parse_xgps(line: str) -> Union[XGPSData, UnknownData]:
        """
        Parse a position line such as:
        XGPSMySim,-80.11,34.55,1200.1,359.05,55.6
        => XGPS<sim_name>,<longitude>,<latitude>,<alt_msl_meters>,<track_deg_true>,<groundspeed_m/s>

        Returns UnknownData when a field is missing or not numeric.
        """
        fields = line[4:].split(",")  # drop the 'XGPS' tag
        try:
            # Too few fields fail the unpacking with ValueError, which is
            # handled the same way as a non-numeric value.
            lon, lat, alt, track, speed = (float(value) for value in fields[1:6])
        except (ValueError, IndexError):
            return UnknownData(raw_line=line)
        return XGPSData(
            sim_name=fields[0].strip(),
            longitude=lon,
            latitude=lat,
            alt_msl_meters=alt,
            track_deg=track,
            ground_speed_mps=speed,
        )

    @staticmethod
    def _parse_xatt(line: str) -> Union[XATTData, UnknownData]:
        """
        Parse an attitude line such as:
        XATTMySim,180.2,0.1,0.2
        => XATT<sim_name>,<true_heading_deg>,<pitch_deg>,<roll_deg>

        Returns UnknownData when a field is missing or not numeric.
        """
        fields = line[4:].split(",")  # drop the 'XATT' tag
        try:
            heading, pitch, roll = (float(value) for value in fields[1:4])
        except (ValueError, IndexError):
            return UnknownData(raw_line=line)
        return XATTData(
            sim_name=fields[0].strip(),
            heading_deg=heading,
            pitch_deg=pitch,
            roll_deg=roll,
        )
################################################################################ | |
# ForeFlight UDP Server | |
################################################################################ | |
class ForeFlightUDPServer:
    """
    Listens for ForeFlight-compatible data on a UDP port (49002 by default).

    Each datagram is parsed; the latest GPS and attitude objects are kept on
    the instance, and while the recorder is recording a GPS fix is forwarded
    to it at most once per second.
    """

    # Seconds a blocking receive may wait before returning control. Without
    # a timeout the worker thread running recvfrom() could stay blocked after
    # the task is cancelled and keep the process from exiting.
    RECV_TIMEOUT_SECONDS = 1.0

    def __init__(self, igc_recorder: IGCRecorder, parser: ForeFlightParser, port: int = 49002):
        self.igc_recorder = igc_recorder
        self.parser = parser
        self.port = port
        self.socket = None
        # time.time() of the most recent datagram, or None before the first.
        self.lastDataReceivedTime = None
        # time.time() of the last fix handed to the recorder.
        self.last_record_time = None
        # Keep track of latest GPS and ATT data
        self.latest_gps = None
        self.latest_att = None

    async def _handle_datagram(self, data: bytes, addr) -> None:
        """Parse one datagram, update the latest data and feed the recorder."""
        line = data.decode('utf-8', errors='ignore').strip()
        parsed_obj = self.parser.parse_line(line)
        now = time.time()

        # Log a sample on the first datagram and after more than 5 seconds
        # of silence.
        if self.lastDataReceivedTime is None or now - self.lastDataReceivedTime > 5.0:
            print(f"[ForeFlightUDPServer] Receiving data from {addr[0]}:{addr[1]}")
            print(f"[ForeFlightUDPServer] Sample data: {line}")
            print(f"[ForeFlightUDPServer] Parsed object: {parsed_obj}")
        self.lastDataReceivedTime = now

        if isinstance(parsed_obj, XGPSData):
            self.latest_gps = parsed_obj
            # While recording, forward a fix once at least 1 second has
            # passed since the previous one.
            fix_due = (self.last_record_time is None or
                       now - self.last_record_time >= 1.0)
            if self.igc_recorder.recording and fix_due:
                await self.igc_recorder.add_position(self.latest_gps, self.latest_att)
                self.last_record_time = now
        elif isinstance(parsed_obj, XATTData):
            self.latest_att = parsed_obj

    async def run(self):
        """
        Main loop: bind to the UDP port, receive data, parse it and update
        the recorder.

        The blocking receive runs in a worker thread so the event loop stays
        responsive. The loop ends when the socket fails or is closed, or when
        the task is cancelled; the socket is always closed on exit.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Enable broadcast & address reuse
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Blocking receive with a timeout, so the worker thread wakes up
            # periodically.
            self.socket.settimeout(self.RECV_TIMEOUT_SECONDS)
            # Bind to all interfaces on the specified port
            self.socket.bind(('0.0.0.0', self.port))
            print(f"[ForeFlightUDPServer] Listening on UDP port {self.port}...")
            print(f"[ForeFlightUDPServer] Ready to receive data from Aerofly FS4...")

            while True:
                try:
                    data, addr = await asyncio.to_thread(self.socket.recvfrom, 1024)
                    await self._handle_datagram(data, addr)
                except socket.timeout:
                    # No datagram within the timeout window; keep listening.
                    # This must precede the OSError clause because
                    # socket.timeout is a subclass of OSError.
                    continue
                except (ConnectionError, OSError) as e:
                    # Socket was closed
                    print(f"[ForeFlightUDPServer] Socket error: {e}")
                    break
                except asyncio.CancelledError:
                    # Task was cancelled
                    break
                except Exception as e:
                    print(f"[ForeFlightUDPServer] Error receiving data: {e}")
                    # Continue despite errors
        finally:
            # Ensure socket is closed when the task ends
            if self.socket:
                try:
                    self.socket.close()
                    print("[ForeFlightUDPServer] Socket closed")
                except Exception as ex:
                    print(f"[ForeFlightUDPServer] Error closing socket: {ex}")
################################################################################ | |
# Bridge Core | |
################################################################################ | |
class AeroflyToIGCBridge:
    """
    Top-level coordinator that wires together:
    1) an IGCRecorder,
    2) a ForeFlightParser,
    3) a ForeFlightUDPServer (port 49002 by default) feeding the recorder.
    """

    def __init__(self, udp_port=49002):
        recorder = IGCRecorder()
        line_parser = ForeFlightParser()
        self.igc_recorder = recorder
        self.parser = line_parser
        self.udp_server = ForeFlightUDPServer(recorder, line_parser, port=udp_port)

    async def run(self):
        """Run the UDP server until it finishes."""
        await self.udp_server.run()

    async def start_recording(self, pilot_name="", glider_type="", glider_id=""):
        """Begin a new IGC recording, filling in defaults for empty fields."""
        if not glider_type:
            glider_type = "Aerofly FS4"
        if not glider_id:
            glider_id = "SIM"
        started = await self.igc_recorder.start_recording(
            pilot_name=pilot_name,
            glider_type=glider_type,
            glider_id=glider_id,
        )
        return started

    async def stop_recording(self):
        """Stop the recording and pass through the recorder's result."""
        outcome = await self.igc_recorder.stop_recording()
        return outcome

    @property
    def is_recording(self):
        """The recorder's current ``recording`` flag."""
        recorder = self.igc_recorder
        return recorder.recording

    @property
    def fix_count(self):
        """The recorder's current fix count."""
        recorder = self.igc_recorder
        return recorder.fix_count

    @property
    def current_filename(self):
        """The recorder's current filename attribute."""
        recorder = self.igc_recorder
        return recorder.filename
################################################################################ | |
# GUI Implementation | |
################################################################################ | |
class BridgeGUI: | |
def __init__(self, master): | |
self.master = master | |
self.master.title("Aerofly FS4 to IGC Recorder") | |
self.master.geometry("600x580") | |
# Configure grid | |
self.master.grid_columnconfigure(0, weight=1) | |
self.master.grid_rowconfigure(1, weight=1) | |
# Initialize UI elements that will be created in setup_ui | |
self.status_label = None | |
self.connection_status = None | |
self.info_display = None | |
self.recording_status = None | |
self.start_button = None | |
self.stop_button = None | |
self.pilot_entry = None | |
self.glider_entry = None | |
self.registration_entry = None | |
# Initialize bridge components | |
self.bridge = None | |
self.bridge_task = None | |
self.event_loop = None | |
self.bridge_thread = None | |
# Set up UI components | |
self.setup_ui() | |
# Set up window close protocol | |
self.master.protocol("WM_DELETE_WINDOW", self.close_application) | |
# Start the bridge | |
self.start_bridge() | |
def setup_ui(self): | |
"""Set up the user interface components""" | |
# Status Frame | |
status_frame = ttk.Frame(self.master, padding="10") | |
status_frame.grid(row=0, column=0, sticky="ew") | |
# Status Label | |
self.status_label = ttk.Label( | |
status_frame, | |
text="Status: Starting...", | |
font=tkfont.Font(size=10) | |
) | |
self.status_label.pack(side="left") | |
# Connection status | |
self.connection_status = ttk.Label( | |
status_frame, | |
text="Disconnected", | |
foreground="red", | |
font=tkfont.Font(size=10, weight="bold") | |
) | |
self.connection_status.pack(side="right") | |
# Info Frame | |
info_frame = ttk.Frame(self.master, padding="10") | |
info_frame.grid(row=1, column=0, sticky="nsew") | |
# Info Display | |
self.info_display = tk.Text( | |
info_frame, | |
wrap=tk.WORD, | |
width=60, | |
height=20, | |
font=tkfont.Font(family="Consolas", size=9) | |
) | |
self.info_display.pack(fill=tk.BOTH, expand=True) | |
# Recording Status | |
recording_frame = ttk.Frame(self.master, padding="10") | |
recording_frame.grid(row=2, column=0, sticky="ew") | |
self.recording_status = ttk.Label( | |
recording_frame, | |
text="Not Recording", | |
foreground="gray", | |
font=tkfont.Font(size=12, weight="bold") | |
) | |
self.recording_status.pack(side="top", fill="x") | |
# Flight details frame | |
flight_frame = ttk.Frame(self.master, padding="10") | |
flight_frame.grid(row=3, column=0, sticky="ew") | |
# Pilot name | |
ttk.Label(flight_frame, text="Pilot Name:").grid(row=0, column=0, sticky="w", padx=5, pady=5) | |
self.pilot_entry = ttk.Entry(flight_frame, width=30) | |
self.pilot_entry.grid(row=0, column=1, sticky="ew", padx=5, pady=5) | |
self.pilot_entry.insert(0, "Simulator Pilot") | |
# Glider type | |
ttk.Label(flight_frame, text="Aircraft Type:").grid(row=1, column=0, sticky="w", padx=5, pady=5) | |
self.glider_entry = ttk.Entry(flight_frame, width=30) | |
self.glider_entry.grid(row=1, column=1, sticky="ew", padx=5, pady=5) | |
self.glider_entry.insert(0, "Aerofly FS4") | |
# Registration | |
ttk.Label(flight_frame, text="Registration:").grid(row=2, column=0, sticky="w", padx=5, pady=5) | |
self.registration_entry = ttk.Entry(flight_frame, width=30) | |
self.registration_entry.grid(row=2, column=1, sticky="ew", padx=5, pady=5) | |
self.registration_entry.insert(0, "SIM") | |
# Configure the grid columns | |
flight_frame.columnconfigure(1, weight=1) | |
# Control Frame | |
control_frame = ttk.Frame(self.master, padding="10") | |
control_frame.grid(row=4, column=0, sticky="ew") | |
# Buttons container | |
buttons_frame = ttk.Frame(control_frame) | |
buttons_frame.pack(fill=tk.X, expand=True) | |
# Start Recording Button | |
self.start_button = ttk.Button( | |
buttons_frame, | |
text="Start Recording", | |
command=self.start_recording | |
) | |
self.start_button.pack(side="left", padx=5) | |
# Stop Recording Button | |
self.stop_button = ttk.Button( | |
buttons_frame, | |
text="Stop Recording", | |
command=self.stop_recording, | |
state="disabled" | |
) | |
self.stop_button.pack(side="left", padx=5) | |
# Open IGC Folder Button | |
self.open_folder_button = ttk.Button( | |
buttons_frame, | |
text="Open IGC Folder", | |
command=self.open_igc_folder | |
) | |
self.open_folder_button.pack(side="right", padx=5) | |
# Close Button | |
self.close_button = ttk.Button( | |
control_frame, | |
text="Close", | |
command=self.close_application | |
) | |
self.close_button.pack(side="bottom", pady=10) | |
def open_igc_folder(self): | |
"""Open the folder containing IGC files""" | |
igc_dir = get_default_igc_dir() | |
if os.path.exists(igc_dir): | |
if sys.platform == 'win32': | |
os.startfile(igc_dir) | |
elif sys.platform == 'darwin': # macOS | |
import subprocess | |
subprocess.Popen(['open', igc_dir]) | |
else: # Linux | |
import subprocess | |
subprocess.Popen(['xdg-open', igc_dir]) | |
else: | |
print(f"IGC directory does not exist: {igc_dir}") | |
def start_recording(self): | |
"""Start recording an IGC file""" | |
if not self.bridge: | |
self.show_error("Bridge not initialized") | |
return | |
# Get pilot info from entries | |
pilot_name = self.pilot_entry.get() or "Simulator Pilot" | |
glider_type = self.glider_entry.get() or "Aerofly FS4" | |
glider_id = self.registration_entry.get() or "SIM" | |
# Schedule the recording start in the bridge's event loop | |
if self.event_loop: | |
future = asyncio.run_coroutine_threadsafe( | |
self.bridge.start_recording( | |
pilot_name=pilot_name, | |
glider_type=glider_type, | |
glider_id=glider_id | |
), | |
self.event_loop | |
) | |
try: | |
# Wait for result with a timeout | |
result = future.result(timeout=1.0) | |
if result: | |
# Update UI | |
self.recording_status.config( | |
text="Recording...", | |
foreground="green" | |
) | |
self.start_button.config(state="disabled") | |
self.stop_button.config(state="normal") | |
# Disable entries during recording | |
self.pilot_entry.config(state="disabled") | |
self.glider_entry.config(state="disabled") | |
self.registration_entry.config(state="disabled") | |
# Update status | |
if self.status_label: | |
self.status_label.config(text="Status: Recording flight") | |
else: | |
self.show_error("Failed to start recording") | |
except Exception as e: | |
self.show_error(f"Error starting recording: {e}") | |
def stop_recording(self): | |
"""Stop recording and save the IGC file""" | |
if not self.bridge: | |
self.show_error("Bridge not initialized") | |
return | |
# Schedule the recording stop in the bridge's event loop | |
if self.event_loop: | |
future = asyncio.run_coroutine_threadsafe( | |
self.bridge.stop_recording(), | |
self.event_loop | |
) | |
try: | |
# Wait for result with a timeout | |
result = future.result(timeout=1.0) | |
# Update UI | |
self.recording_status.config( | |
text="Not Recording", | |
foreground="gray" | |
) | |
self.start_button.config(state="normal") | |
self.stop_button.config(state="disabled") | |
# Re-enable entries | |
self.pilot_entry.config(state="normal") | |
self.glider_entry.config(state="normal") | |
self.registration_entry.config(state="normal") | |
# Update status | |
if self.status_label: | |
if result: | |
self.status_label.config(text=f"Status: Flight saved to {os.path.basename(result)}") | |
# Show success message | |
tk.messagebox.showinfo( | |
"Recording Complete", | |
f"Flight recorded and saved to:\n{result}" | |
) | |
else: | |
self.status_label.config(text="Status: No flight data recorded") | |
except Exception as e: | |
self.show_error(f"Error stopping recording: {e}") | |
def start_bridge(self): | |
"""Start the Aerofly-IGC bridge in a separate thread""" | |
def run_bridge(): | |
try: | |
self.event_loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(self.event_loop) | |
self.bridge = AeroflyToIGCBridge() | |
# Update status | |
if self.master and self.status_label: | |
self.master.after(0, lambda: self.status_label.config( | |
text="Status: Bridge started" | |
)) | |
# Start monitoring UDP data | |
if self.master: | |
self.master.after(100, self.check_udp_data) | |
self.bridge_task = self.event_loop.create_task(self.bridge.run()) | |
self.event_loop.run_forever() | |
except Exception as e: | |
if self.master: | |
self.master.after(0, lambda: self.show_error(str(e))) | |
self.bridge_thread = threading.Thread(target=run_bridge, daemon=True) | |
self.bridge_thread.start() | |
def check_udp_data(self): | |
"""Check if we're receiving UDP data and update the UI accordingly""" | |
# Make sure the app hasn't been closed | |
if not self.master or not self.master.winfo_exists(): | |
return | |
# Get current time for tracking connection status | |
current_time = time.time() | |
# Default connection state | |
connection_state = "Disconnected" | |
connection_color = "red" | |
# Default info text | |
info_text = f"=== Connection Status ===\n" | |
info_text += f"UDP Input: Listening on port 49002\n" | |
# Check if we have recent data (within last 5 seconds) | |
has_recent_data = False | |
elapsed_time = 0 | |
if self.bridge and self.bridge.udp_server.lastDataReceivedTime is not None: | |
elapsed_time = current_time - self.bridge.udp_server.lastDataReceivedTime | |
has_recent_data = elapsed_time < 5.0 | |
if has_recent_data: | |
connection_state = "Connected" | |
connection_color = "green" | |
info_text += f"Last data received: {elapsed_time:.1f} seconds ago\n" | |
else: | |
connection_state = "No Data" | |
connection_color = "orange" | |
info_text += f"No data in {elapsed_time:.1f} seconds\n" | |
info_text += f"Make sure Aerofly FS4 is running and ForeFlight output is enabled\n" | |
else: | |
info_text += "Waiting for first data from Aerofly FS4...\n" | |
info_text += "1. Make sure Aerofly FS4 is running\n" | |
info_text += "2. Check that 'Output data to ForeFlight' is enabled in Aerofly settings\n" | |
info_text += "3. Ensure this computer's firewall allows UDP port 49002\n" | |
# Update connection status | |
if self.connection_status: | |
self.connection_status.config( | |
text=connection_state, | |
foreground=connection_color | |
) | |
# If we have simulator data, show it | |
if self.bridge and self.bridge.udp_server.latest_gps: | |
gps = self.bridge.udp_server.latest_gps | |
att = self.bridge.udp_server.latest_att | |
# Add simulator data to info display | |
info_text += f"\n=== Simulator Data ===\n" | |
info_text += f"Latitude: {gps.latitude:.6f}°\n" | |
info_text += f"Longitude: {gps.longitude:.6f}°\n" | |
info_text += f"Altitude: {gps.alt_msl_meters * METERS_TO_FEET:.0f} ft\n" | |
info_text += f"Speed: {gps.ground_speed_mps * MPS_TO_KTS:.1f} kts\n" | |
if att: | |
info_text += f"Roll: {att.roll_deg:.1f}°\n" | |
info_text += f"Pitch: {att.pitch_deg:.1f}°\n" | |
info_text += f"Heading: {att.heading_deg:.1f}°\n" | |
# Add recording status | |
if self.bridge and self.bridge.is_recording: | |
info_text += f"\n=== Recording Status ===\n" | |
info_text += f"Recording to: {os.path.basename(self.bridge.current_filename)}\n" | |
info_text += f"Fixes recorded: {self.bridge.fix_count}\n" | |
# Calculate recording duration | |
if self.bridge.igc_recorder.start_time: | |
try: | |
# If start_time is timezone-aware | |
if hasattr(self.bridge.igc_recorder.start_time, | |
'tzinfo') and self.bridge.igc_recorder.start_time.tzinfo is not None: | |
# Use timezone-aware datetime | |
try: | |
current = datetime.datetime.now(datetime.UTC) | |
except AttributeError: | |
current = datetime.datetime.now(datetime.timezone.utc) | |
else: | |
# If start_time is naive (no timezone) | |
current = datetime.datetime.utcnow() | |
duration = current - self.bridge.igc_recorder.start_time | |
hours, remainder = divmod(duration.total_seconds(), 3600) | |
minutes, seconds = divmod(remainder, 60) | |
info_text += f"Duration: {int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}\n" | |
except Exception as e: | |
info_text += f"Duration calculation error: {e}\n" | |
# Update the info display | |
if self.info_display: | |
self.info_display.delete(1.0, tk.END) | |
self.info_display.insert(tk.END, info_text) | |
# Schedule next check if the app is still running | |
if self.master and self.master.winfo_exists(): | |
self.master.after(100, self.check_udp_data) | |
def show_error(self, error_msg):
    """Surface an error in the GUI.

    Replaces the contents of the info display with ``Error: <error_msg>`` and
    switches the status label and the connection indicator to their error
    state. Any of the three widgets that is falsy (e.g. not set) is skipped.

    Args:
        error_msg: The message (or exception) to show; formatted with ``str``.
    """
    display = self.info_display
    if display:
        # Wipe the previous text before writing the error line
        display.delete(1.0, tk.END)
        display.insert(tk.END, "Error: {}".format(error_msg))

    status = self.status_label
    if status:
        status.config(text="Status: Error")

    indicator = self.connection_status
    if indicator:
        indicator.config(text="Error", foreground="red")
def close_application(self):
    """Clean up and close the application.

    Shutdown happens in three steps:

    1. If a recording is in progress, ``bridge.stop_recording()`` is scheduled
       on the bridge's event loop and awaited for at most 0.5 s.
    2. The Tk root window is destroyed and ``self.master`` is cleared so no
       further UI updates are attempted.
    3. On the event loop, the bridge task is cancelled and the UDP socket is
       closed; afterwards the loop is asked to stop.

    Errors in any step are printed rather than raised so that the remaining
    steps still run.
    """
    loop = self.event_loop

    # If recording, stop it
    if self.bridge and self.bridge.is_recording and loop:
        try:
            # Schedule the recording stop in the bridge's event loop
            future = asyncio.run_coroutine_threadsafe(
                self.bridge.stop_recording(),
                loop
            )
            # Wait for result with a short timeout
            future.result(timeout=0.5)
        except Exception as e:
            print(f"Error stopping recording during shutdown: {e}")

    # First destroy the UI to prevent more UI updates
    if self.master:
        # Create local reference to avoid using self.master after destroy
        master_copy = self.master
        self.master = None  # Remove reference to prevent future access
        master_copy.destroy()

    async def shutdown_async():
        """Cancel the bridge task and close the UDP socket (runs on the loop)."""
        try:
            # Cancel the main bridge task
            if self.bridge_task:
                self.bridge_task.cancel()
                try:
                    await self.bridge_task
                except asyncio.CancelledError:
                    pass
            # Close socket if it exists
            if self.bridge and self.bridge.udp_server and self.bridge.udp_server.socket:
                self.bridge.udp_server.socket.close()
        except Exception as err:
            print(f"Error during async shutdown: {err}")

    if loop and loop.is_running():
        try:
            # Run the cleanup on the loop and wait for it to actually finish
            # (bounded), instead of sleeping for a fixed interval and hoping.
            pending = asyncio.run_coroutine_threadsafe(shutdown_async(), loop)
            pending.result(timeout=0.5)
        except Exception as e:
            print(f"Error during shutdown: {e}")
        finally:
            # loop.stop() is not thread-safe: called directly from this
            # thread it does not wake a loop that is blocked waiting for
            # events. call_soon_threadsafe both schedules the stop on the
            # loop's own thread and wakes the loop up.
            try:
                loop.call_soon_threadsafe(loop.stop)
            except RuntimeError as e:
                # Raised when the loop has already been closed
                print(f"Error during shutdown: {e}")
################################################################################ | |
# Entry Point | |
################################################################################ | |
def run_cli():
    """Run the bridge in command-line mode (no GUI).

    Creates a dedicated event loop, starts ``bridge.run()`` on it as a
    background task and serves an interactive prompt until the user types
    ``exit`` or presses Ctrl+C. Supported commands:

    * ``start``  - ask for pilot / aircraft / registration and start recording
    * ``stop``   - stop the current recording
    * ``status`` - print the latest GPS data and recording state
    * ``exit``   - leave the program (asks for confirmation while recording)

    A recording that is still active on the way out is stopped, then all
    tasks are cancelled and the loop is closed.
    """
    bridge = AeroflyToIGCBridge()
    print("Starting Aerofly FS4 to IGC Recorder in CLI mode...")
    print("Press Ctrl+C to exit")

    # asyncio.create_task() requires an already running loop and raises
    # RuntimeError when called from synchronous code like this function, so
    # own the loop explicitly and schedule tasks on it.
    loop = asyncio.new_event_loop()
    # Make it the current loop as well, in case code elsewhere looks it up.
    asyncio.set_event_loop(loop)

    async def ask(prompt="", default=""):
        """Read one line from stdin in a worker thread; empty input -> default."""
        answer = await loop.run_in_executor(None, input, prompt)
        return answer or default

    async def cli_loop():
        """Read and dispatch commands until 'exit' is entered."""
        while True:
            print("\n> ", end="", flush=True)
            # Ignore surrounding whitespace and letter case in commands
            command = (await ask()).strip().lower()
            if command == "start":
                if bridge.is_recording:
                    print("Already recording")
                else:
                    pilot = await ask("Pilot name [Simulator Pilot]: ", "Simulator Pilot")
                    glider = await ask("Aircraft type [Aerofly FS4]: ", "Aerofly FS4")
                    reg = await ask("Registration [SIM]: ", "SIM")
                    result = await bridge.start_recording(pilot_name=pilot, glider_type=glider, glider_id=reg)
                    if result:
                        print(f"Recording started to {bridge.current_filename}")
                    else:
                        print("Failed to start recording")
            elif command == "stop":
                if not bridge.is_recording:
                    print("Not recording")
                else:
                    result = await bridge.stop_recording()
                    if result:
                        print(f"Recording stopped. Flight saved to {result}")
                    else:
                        print("No flight data recorded")
            elif command == "status":
                if bridge.udp_server.latest_gps:
                    gps = bridge.udp_server.latest_gps
                    print(
                        f"Last data: Lat={gps.latitude:.6f}, Lon={gps.longitude:.6f}, Alt={gps.alt_msl_meters * METERS_TO_FEET:.0f}ft")
                    if bridge.is_recording:
                        print(f"Recording to: {bridge.current_filename}")
                        print(f"Fixes recorded: {bridge.fix_count}")
                else:
                    print("No data received yet")
            elif command == "exit":
                if bridge.is_recording:
                    confirm = await ask("Recording in progress. Stop recording and exit? (y/n): ")
                    if confirm.lower() != 'y':
                        continue
                    await bridge.stop_recording()
                break
            else:
                print("Unknown command")

    # Run UDP server in the background; it starts once the loop is running
    server_task = loop.create_task(bridge.run())
    cli_task = loop.create_task(cli_loop())

    # Main CLI loop
    try:
        print("\nCommands:")
        print(" start - Start recording")
        print(" stop - Stop recording")
        print(" status - Show status")
        print(" exit - Exit program")
        loop.run_until_complete(cli_task)
    except KeyboardInterrupt:
        print("\nProgram interrupted.")
    finally:
        # After Ctrl+C the prompt task is still pending; cancel it so it does
        # not keep reading input while the recording is being finalised.
        cli_task.cancel()
        try:
            # If recording, stop it
            if bridge.is_recording:
                print("Stopping recording...")
                loop.run_until_complete(bridge.stop_recording())
        finally:
            # Cancel the background server and let both tasks unwind so the
            # loop can be closed without pending-task warnings.
            server_task.cancel()
            loop.run_until_complete(
                asyncio.gather(cli_task, server_task, return_exceptions=True)
            )
            loop.close()
            print("Bridge shutting down.")
def run_gui():
    """Launch the Tk GUI, falling back to the CLI when the GUI cannot run.

    The CLI fallback is used both when the tkinter import fails and when any
    other exception escapes GUI start-up or the Tk main loop.
    """
    try:
        # Importing the submodule doubles as the tkinter availability check
        import tkinter.messagebox
        window = tk.Tk()
        BridgeGUI(window)
        window.mainloop()
        return
    except ImportError:
        notices = ("Tkinter not available, running in CLI mode",)
    except Exception as exc:
        notices = (f"Error starting GUI: {exc}", "Falling back to CLI mode")

    # Only reached when the GUI path failed
    for notice in notices:
        print(notice)
    run_cli()
if __name__ == "__main__":
    # The GUI is the default front end; passing '--cli' as the first
    # command-line argument selects the console front end instead.
    # Slicing yields [] when no argument was given, so no length check is needed.
    cli_requested = sys.argv[1:2] == ['--cli']
    if cli_requested:
        run_cli()
    else:
        run_gui()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment