Skip to content

Instantly share code, notes, and snippets.

@jlgabriel
Last active April 16, 2025 02:25
Show Gist options
  • Save jlgabriel/74f7b98e9eee7e7edd98f1b0b4d76299 to your computer and use it in GitHub Desktop.
Save jlgabriel/74f7b98e9eee7e7edd98f1b0b4d76299 to your computer and use it in GitHub Desktop.
Aerofly FS4 to IGC Recorder - developed to connect Aerofly FS4 Flight Simulator to generate gliders IGC flight logs
#!/usr/bin/env python3
##################################################################################
# Aerofly FS4 to IGC Recorder
# Developed to connect Aerofly FS4 Flight Simulator and generate IGC flight logs
# Copyright (c) 2025 Juan Luis Gabriel
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
##################################################################################
import asyncio
import socket
import json
import time
import tkinter as tk
from tkinter import ttk, font as tkfont, filedialog
import threading
from dataclasses import dataclass
import sys
import os
from typing import Optional, Union, Dict, Any, List
from pathlib import Path
import datetime
# Import aerofiles for IGC file handling
from aerofiles.igc import Writer
################################################################################
# Constants & Utilities
################################################################################
METERS_TO_FEET = 3.28084
MPS_TO_KTS = 1.94384
MPS_TO_FPM = 196.85 # Meters per second to feet per minute
# Define paths for output files
def get_default_igc_dir():
"""Get the default directory for IGC files"""
documents_dir = os.path.join(os.path.expanduser("~"), "Documents")
igc_dir = os.path.join(documents_dir, "AeroflyIGC")
os.makedirs(igc_dir, exist_ok=True)
return igc_dir
def generate_igc_filename():
"""Generate a filename for the IGC file based on current date and time"""
now = datetime.datetime.now()
return f"AEROFLY_{now.strftime('%Y%m%d_%H%M%S')}.igc"
################################################################################
# Data Classes for Parsed ForeFlight Messages
################################################################################
@dataclass
class XGPSData:
"""
Represents ForeFlight XGPS data.
e.g. XGPS<sim_name>,<longitude>,<latitude>,<altitude_msl_meters>,<track_true_north>,<groundspeed_m/s>
"""
sim_name: str
longitude: float
latitude: float
alt_msl_meters: float
track_deg: float
ground_speed_mps: float
@dataclass
class XATTData:
"""
Represents ForeFlight XATT data.
e.g. XATT<sim_name>,<true_heading>,<pitch_degrees>,<roll_degrees>
"""
sim_name: str
heading_deg: float
pitch_deg: float
roll_deg: float
@dataclass
class UnknownData:
"""
Returned if we fail to parse or data type is not recognized.
"""
raw_line: str
################################################################################
# IGC Recorder Class
################################################################################
class IGCRecorder:
"""
Records flight data to an IGC file using aerofiles library.
Handles both standard IGC records and custom attitude data.
"""
def __init__(self):
self.recording = False
self.igc_file = None
self.writer = None # Reference to aerofiles Writer
self.filename = None
self.start_time = None
self.end_time = None
self.fix_count = 0
self._lock = asyncio.Lock()
# Queue for position data
self.position_queue = asyncio.Queue()
# Task for processing queue
self.processing_task = None
def _get_fixed_time(self, timestamp=None):
"""
Returns a time object fixed at 12:00, but preserves the original date.
Can be used to generate fixed timestamps for IGC files.
"""
# If no timestamp is provided, use the current date
if timestamp is None:
try:
# For Python 3.11+
timestamp = datetime.datetime.now(datetime.UTC)
except AttributeError:
# For older Python versions
timestamp = datetime.datetime.utcnow()
# Create a new datetime with the same date but at 12:00:00
fixed_date = timestamp.date()
fixed_time = datetime.time(hour=12, minute=0, second=0)
# Return only the time object to use in B records
return fixed_time
async def start_recording(self, pilot_name="", glider_type="Aerofly FS4",
glider_id="SIM", gps_device="Aerofly GPS"):
"""Start recording a new IGC file"""
async with self._lock:
if self.recording:
print("[IGCRecorder] Already recording")
return False
try:
# Generate filename based on current date and time
self.filename = os.path.join(get_default_igc_dir(), generate_igc_filename())
# Open file for writing in binary mode (required by aerofiles)
self.igc_file = open(self.filename, 'wb')
# Create aerofiles IGC writer
self.writer = Writer(self.igc_file)
# Get current time
try:
# For Python 3.11+
utc_now = datetime.datetime.now(datetime.UTC)
except AttributeError:
# For older Python versions
utc_now = datetime.datetime.utcnow()
self.start_time = utc_now
# Initialize seconds counter for time simulation
self._second_counter = 0
# Write date record - keep the original date
self.writer.write_date(utc_now.date())
# Write basic header records
if pilot_name:
self.writer.write_pilot(pilot_name)
if glider_type:
self.writer.write_glider_type(glider_type)
if glider_id:
self.writer.write_glider_id(glider_id)
# Write logger info
self.writer.write_comment("LOG", "Aerofly FS4 Simulator")
self.writer.write_firmware_version("1.0")
# Add a comment about the source
self.writer.write_comment("GEN", "Generated by Aerofly FS4 to IGC Recorder")
# Reset fix count
self.fix_count = 0
# Start recording
self.recording = True
# Start the processing task
self.processing_task = asyncio.create_task(self._process_queue())
print(f"[IGCRecorder] Started recording to {self.filename}")
return True
except Exception as e:
print(f"[IGCRecorder] Error starting recording: {e}")
# Clean up if error occurs
if hasattr(self, 'igc_file') and self.igc_file:
self.igc_file.close()
self.igc_file = None
self.recording = False
return False
def _write_extensions_declaration(self):
"""
Write I record to declare extensions used in K records
This is a good practice for IGC files with custom extensions
"""
try:
# # Write a comment explaining the extensions
# self.writer.write_comment("ATT", "Attitude data: heading, roll, pitch")
#
# # Since aerofiles doesn't support I records directly, we write them manually
# # Format: I + number of extensions + list of extensions
# i_record = "I033638HDG3839ROLL4345PITCH\n"
# self.igc_file.write(i_record.encode('utf-8'))
return True
except Exception as e:
print(f"[IGCRecorder] Error writing extensions declaration: {e}")
return False
async def stop_recording(self):
"""Stop recording and close the IGC file"""
async with self._lock:
if not self.recording:
print("[IGCRecorder] Not recording")
return False
try:
# Set end time
try:
self.end_time = datetime.datetime.now(datetime.UTC)
except AttributeError:
self.end_time = datetime.datetime.utcnow()
# Cancel the processing task
if self.processing_task:
self.processing_task.cancel()
try:
await self.processing_task
except asyncio.CancelledError:
pass
self.processing_task = None
# Process any remaining items in the queue
while not self.position_queue.empty():
try:
gps_data, att_data, timestamp = await self.position_queue.get()
await self._write_position(gps_data, att_data, timestamp)
self.position_queue.task_done()
except Exception as e:
print(f"[IGCRecorder] Error processing remaining queue items: {e}")
# Close the file
if hasattr(self, 'igc_file') and self.igc_file:
# Add final comment with recording end time
if hasattr(self, 'writer'):
try:
self.writer.write_comment("END", f"Recording ended at {self.end_time.strftime('%H:%M:%S')}")
except Exception as e:
print(f"[IGCRecorder] Error writing end comment: {e}")
# Close the file
self.igc_file.close()
self.igc_file = None
if self.fix_count > 0:
print(f"[IGCRecorder] Stopped recording. Wrote {self.fix_count} fixes to {self.filename}")
result_file = self.filename
self.filename = None
self.recording = False
return result_file
else:
print("[IGCRecorder] No data recorded, deleting file")
if self.filename and os.path.exists(self.filename):
os.remove(self.filename)
self.filename = None
self.recording = False
return None
else:
self.recording = False
return None
except Exception as e:
print(f"[IGCRecorder] Error stopping recording: {e}")
# Clean up even if error occurs
if hasattr(self, 'igc_file') and self.igc_file:
self.igc_file.close()
self.igc_file = None
self.recording = False
return None
async def add_position(self, gps_data: XGPSData, att_data: Optional[XATTData] = None):
"""Add a position fix to the IGC file"""
if not self.recording:
return False
try:
# Get current timestamp - ensure we use the same type of datetime as in start_recording
# Check if start_time is timezone-aware
if hasattr(self.start_time, 'tzinfo') and self.start_time.tzinfo is not None:
# Use timezone-aware datetime
try:
timestamp = datetime.datetime.now(datetime.UTC)
except AttributeError:
# For older Python, create an aware datetime
timestamp = datetime.datetime.now(datetime.timezone.utc)
else:
# Use naive datetime to match
timestamp = datetime.datetime.utcnow()
# Add to queue for processing
await self.position_queue.put((gps_data, att_data, timestamp))
return True
except Exception as e:
print(f"[IGCRecorder] Error adding position: {e}")
return False
async def _process_queue(self):
"""Process the position queue in the background"""
try:
while True:
# Get item from queue
gps_data, att_data, timestamp = await self.position_queue.get()
# Process item
await self._write_position(gps_data, att_data, timestamp)
# Mark item as done
self.position_queue.task_done()
# Small sleep to avoid consuming too much CPU
await asyncio.sleep(0.01)
except asyncio.CancelledError:
# Task was cancelled, clean exit
print("[IGCRecorder] Position processing task cancelled")
raise
except Exception as e:
print(f"[IGCRecorder] Error in position processing task: {e}")
async def _write_k_record(self, timestamp: datetime.time, code: str, data: str):
"""
Write a custom K record (extension data) to the IGC file
Since aerofiles doesn't support K records directly, we write them manually
"""
if not self.recording or not self.igc_file:
return False
try:
# Format time as HHMMSS
time_str = timestamp.strftime('%H%M%S')
# Format K record: K + time + data
k_record = f"K{time_str}{code}{data}\n"
# Write directly to the file - ensure it's encoded to bytes
# since we opened the file in binary mode
self.igc_file.write(k_record.encode('utf-8'))
return True
except Exception as e:
print(f"[IGCRecorder] Error writing K record: {e}")
return False
async def _write_position(self, gps_data: XGPSData, att_data: Optional[XATTData] = None,
timestamp: datetime.datetime = None):
"""Write position fix to the IGC file using aerofiles"""
async with self._lock:
if not self.recording or not self.igc_file or not hasattr(self, 'writer'):
return False
try:
# Use current time if not provided
if timestamp is None:
try:
timestamp = datetime.datetime.now(datetime.UTC)
except AttributeError:
timestamp = datetime.datetime.utcnow()
# Extract data from GPS
latitude = gps_data.latitude
longitude = gps_data.longitude
altitude = int(gps_data.alt_msl_meters) # IGC uses meters
# Get pressure altitude (we don't have this in simulator, use MSL as approximation)
pressure_alt = altitude
# For screen display, continue incrementing the seconds
if not hasattr(self, '_second_counter'):
self._second_counter = 0
else:
self._second_counter += 1
# Calculate hours, minutes, and seconds properly
total_seconds = self._second_counter
hours = 12 # Start at 12:00:00
minutes = (total_seconds // 60) % 60
seconds = total_seconds % 60
# Create a properly incremented time
simulated_time = datetime.time(
hour=hours,
minute=minutes,
second=seconds,
microsecond=0
)
# Write B record (position fix) using aerofiles
self.writer.write_fix(
time=simulated_time, # Use simulated time that increases properly
latitude=latitude,
longitude=longitude,
pressure_alt=pressure_alt,
gps_alt=altitude
)
# Increment fix count
self.fix_count += 1
return True
except Exception as e:
print(f"[IGCRecorder] Error writing position: {e}")
return False
################################################################################
# ForeFlight Data Parser
################################################################################
class ForeFlightParser:
"""
Parses strings in ForeFlight's XGPS / XATT formats,
returning typed objects: XGPSData, XATTData, or UnknownData.
"""
@staticmethod
def parse_line(line: str):
"""
Identify the data type (XGPS, XATT) and parse accordingly.
"""
line = line.strip()
if line.startswith("XGPS"):
return ForeFlightParser._parse_xgps(line)
elif line.startswith("XATT"):
return ForeFlightParser._parse_xatt(line)
else:
return UnknownData(raw_line=line)
@staticmethod
def _parse_xgps(line: str) -> Union[XGPSData, UnknownData]:
"""
Example XGPS line:
XGPSMySim,-80.11,34.55,1200.1,359.05,55.6
=> XGPS<sim_name>,<longitude>,<latitude>,<alt_msl_meters>,<track_deg_true>,<groundspeed_m/s>
"""
try:
raw = line[4:] # remove 'XGPS'
parts = raw.split(",")
sim_name = parts[0].strip()
longitude = float(parts[1])
latitude = float(parts[2])
alt_msl_meters = float(parts[3])
track_deg = float(parts[4])
ground_speed_mps = float(parts[5])
return XGPSData(
sim_name=sim_name,
longitude=longitude,
latitude=latitude,
alt_msl_meters=alt_msl_meters,
track_deg=track_deg,
ground_speed_mps=ground_speed_mps
)
except (ValueError, IndexError):
return UnknownData(raw_line=line)
@staticmethod
def _parse_xatt(line: str) -> Union[XATTData, UnknownData]:
"""
Example XATT line:
XATTMySim,180.2,0.1,0.2
=> XATT<sim_name>,<true_heading_deg>,<pitch_deg>,<roll_deg>
"""
try:
raw = line[4:]
parts = raw.split(",")
sim_name = parts[0].strip()
heading_deg = float(parts[1])
pitch_deg = float(parts[2])
roll_deg = float(parts[3])
return XATTData(
sim_name=sim_name,
heading_deg=heading_deg,
pitch_deg=pitch_deg,
roll_deg=roll_deg
)
except (ValueError, IndexError):
return UnknownData(raw_line=line)
################################################################################
# ForeFlight UDP Server
################################################################################
class ForeFlightUDPServer:
"""
Listens for ForeFlight-compatible data on UDP port 49002.
Parses lines and updates the shared SimData and IGC recorder.
"""
def __init__(self, igc_recorder: IGCRecorder, parser: ForeFlightParser, port: int = 49002):
self.igc_recorder = igc_recorder
self.parser = parser
self.port = port
self.socket = None
self.lastDataReceivedTime = None
self.last_record_time = None # Nuevo: para seguimiento del último registro
# Keep track of latest GPS and ATT data
self.latest_gps = None
self.latest_att = None
async def run(self):
"""
Main loop: bind to UDP port, receive data, parse, update igc_recorder.
We'll do a blocking recv in a background thread so we don't block the event loop.
"""
# Create UDP socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Enable broadcast & address reuse
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Set blocking to True for our simple loop
self.socket.setblocking(True)
# Bind to all interfaces on the specified port:
self.socket.bind(('0.0.0.0', self.port))
print(f"[ForeFlightUDPServer] Listening on UDP port {self.port}...")
print(f"[ForeFlightUDPServer] Ready to receive data from Aerofly FS4...")
try:
while True:
try:
# When the socket is closed, this will raise an exception
data, addr = await asyncio.to_thread(self.socket.recvfrom, 1024)
line = data.decode('utf-8', errors='ignore').strip()
parsed_obj = self.parser.parse_line(line)
# If we haven't received data in a while, print a message
if self.lastDataReceivedTime is None or \
time.time() - self.lastDataReceivedTime > 5.0:
print(f"[ForeFlightUDPServer] Receiving data from {addr[0]}:{addr[1]}")
print(f"[ForeFlightUDPServer] Sample data: {line}")
print(f"[ForeFlightUDPServer] Parsed object: {parsed_obj}")
self.lastDataReceivedTime = time.time()
# Update latest data
if isinstance(parsed_obj, XGPSData):
self.latest_gps = parsed_obj
# Si estamos grabando y tenemos GPS data, verificar si ha pasado 1 segundo
current_time = time.time()
if (self.igc_recorder.recording and self.latest_gps and
(self.last_record_time is None or
current_time - self.last_record_time >= 1.0)):
await self.igc_recorder.add_position(self.latest_gps, self.latest_att)
self.last_record_time = current_time
elif isinstance(parsed_obj, XATTData):
self.latest_att = parsed_obj
except (ConnectionError, OSError) as e:
# Socket was closed
print(f"[ForeFlightUDPServer] Socket error: {e}")
break
except asyncio.CancelledError:
# Task was cancelled
break
except Exception as e:
print(f"[ForeFlightUDPServer] Error receiving data: {e}")
# Continue despite errors
finally:
# Ensure socket is closed when the task ends
if self.socket:
try:
self.socket.close()
print("[ForeFlightUDPServer] Socket closed")
except Exception as ex:
print(f"[ForeFlightUDPServer] Error closing socket: {ex}")
################################################################################
# Bridge Core
################################################################################
class AeroflyToIGCBridge:
"""
High-level orchestrator that sets up:
1) An IGCRecorder object
2) A ForeFlightUDPServer (listens on 49002)
3) Runs them concurrently with asyncio.
"""
def __init__(self, udp_port=49002):
self.igc_recorder = IGCRecorder()
self.parser = ForeFlightParser()
self.udp_server = ForeFlightUDPServer(self.igc_recorder, self.parser, port=udp_port)
async def run(self):
await self.udp_server.run()
async def start_recording(self, pilot_name="", glider_type="", glider_id=""):
"""Start recording an IGC file"""
return await self.igc_recorder.start_recording(
pilot_name=pilot_name,
glider_type=glider_type or "Aerofly FS4",
glider_id=glider_id or "SIM"
)
async def stop_recording(self):
"""Stop recording and return the filename of the IGC file"""
return await self.igc_recorder.stop_recording()
@property
def is_recording(self):
"""Return True if recording is in progress"""
return self.igc_recorder.recording
@property
def fix_count(self):
"""Return the number of fixes recorded so far"""
return self.igc_recorder.fix_count
@property
def current_filename(self):
"""Return the current IGC filename being recorded"""
return self.igc_recorder.filename
################################################################################
# GUI Implementation
################################################################################
class BridgeGUI:
def __init__(self, master):
self.master = master
self.master.title("Aerofly FS4 to IGC Recorder")
self.master.geometry("600x580")
# Configure grid
self.master.grid_columnconfigure(0, weight=1)
self.master.grid_rowconfigure(1, weight=1)
# Initialize UI elements that will be created in setup_ui
self.status_label = None
self.connection_status = None
self.info_display = None
self.recording_status = None
self.start_button = None
self.stop_button = None
self.pilot_entry = None
self.glider_entry = None
self.registration_entry = None
# Initialize bridge components
self.bridge = None
self.bridge_task = None
self.event_loop = None
self.bridge_thread = None
# Set up UI components
self.setup_ui()
# Set up window close protocol
self.master.protocol("WM_DELETE_WINDOW", self.close_application)
# Start the bridge
self.start_bridge()
def setup_ui(self):
"""Set up the user interface components"""
# Status Frame
status_frame = ttk.Frame(self.master, padding="10")
status_frame.grid(row=0, column=0, sticky="ew")
# Status Label
self.status_label = ttk.Label(
status_frame,
text="Status: Starting...",
font=tkfont.Font(size=10)
)
self.status_label.pack(side="left")
# Connection status
self.connection_status = ttk.Label(
status_frame,
text="Disconnected",
foreground="red",
font=tkfont.Font(size=10, weight="bold")
)
self.connection_status.pack(side="right")
# Info Frame
info_frame = ttk.Frame(self.master, padding="10")
info_frame.grid(row=1, column=0, sticky="nsew")
# Info Display
self.info_display = tk.Text(
info_frame,
wrap=tk.WORD,
width=60,
height=20,
font=tkfont.Font(family="Consolas", size=9)
)
self.info_display.pack(fill=tk.BOTH, expand=True)
# Recording Status
recording_frame = ttk.Frame(self.master, padding="10")
recording_frame.grid(row=2, column=0, sticky="ew")
self.recording_status = ttk.Label(
recording_frame,
text="Not Recording",
foreground="gray",
font=tkfont.Font(size=12, weight="bold")
)
self.recording_status.pack(side="top", fill="x")
# Flight details frame
flight_frame = ttk.Frame(self.master, padding="10")
flight_frame.grid(row=3, column=0, sticky="ew")
# Pilot name
ttk.Label(flight_frame, text="Pilot Name:").grid(row=0, column=0, sticky="w", padx=5, pady=5)
self.pilot_entry = ttk.Entry(flight_frame, width=30)
self.pilot_entry.grid(row=0, column=1, sticky="ew", padx=5, pady=5)
self.pilot_entry.insert(0, "Simulator Pilot")
# Glider type
ttk.Label(flight_frame, text="Aircraft Type:").grid(row=1, column=0, sticky="w", padx=5, pady=5)
self.glider_entry = ttk.Entry(flight_frame, width=30)
self.glider_entry.grid(row=1, column=1, sticky="ew", padx=5, pady=5)
self.glider_entry.insert(0, "Aerofly FS4")
# Registration
ttk.Label(flight_frame, text="Registration:").grid(row=2, column=0, sticky="w", padx=5, pady=5)
self.registration_entry = ttk.Entry(flight_frame, width=30)
self.registration_entry.grid(row=2, column=1, sticky="ew", padx=5, pady=5)
self.registration_entry.insert(0, "SIM")
# Configure the grid columns
flight_frame.columnconfigure(1, weight=1)
# Control Frame
control_frame = ttk.Frame(self.master, padding="10")
control_frame.grid(row=4, column=0, sticky="ew")
# Buttons container
buttons_frame = ttk.Frame(control_frame)
buttons_frame.pack(fill=tk.X, expand=True)
# Start Recording Button
self.start_button = ttk.Button(
buttons_frame,
text="Start Recording",
command=self.start_recording
)
self.start_button.pack(side="left", padx=5)
# Stop Recording Button
self.stop_button = ttk.Button(
buttons_frame,
text="Stop Recording",
command=self.stop_recording,
state="disabled"
)
self.stop_button.pack(side="left", padx=5)
# Open IGC Folder Button
self.open_folder_button = ttk.Button(
buttons_frame,
text="Open IGC Folder",
command=self.open_igc_folder
)
self.open_folder_button.pack(side="right", padx=5)
# Close Button
self.close_button = ttk.Button(
control_frame,
text="Close",
command=self.close_application
)
self.close_button.pack(side="bottom", pady=10)
def open_igc_folder(self):
"""Open the folder containing IGC files"""
igc_dir = get_default_igc_dir()
if os.path.exists(igc_dir):
if sys.platform == 'win32':
os.startfile(igc_dir)
elif sys.platform == 'darwin': # macOS
import subprocess
subprocess.Popen(['open', igc_dir])
else: # Linux
import subprocess
subprocess.Popen(['xdg-open', igc_dir])
else:
print(f"IGC directory does not exist: {igc_dir}")
def start_recording(self):
"""Start recording an IGC file"""
if not self.bridge:
self.show_error("Bridge not initialized")
return
# Get pilot info from entries
pilot_name = self.pilot_entry.get() or "Simulator Pilot"
glider_type = self.glider_entry.get() or "Aerofly FS4"
glider_id = self.registration_entry.get() or "SIM"
# Schedule the recording start in the bridge's event loop
if self.event_loop:
future = asyncio.run_coroutine_threadsafe(
self.bridge.start_recording(
pilot_name=pilot_name,
glider_type=glider_type,
glider_id=glider_id
),
self.event_loop
)
try:
# Wait for result with a timeout
result = future.result(timeout=1.0)
if result:
# Update UI
self.recording_status.config(
text="Recording...",
foreground="green"
)
self.start_button.config(state="disabled")
self.stop_button.config(state="normal")
# Disable entries during recording
self.pilot_entry.config(state="disabled")
self.glider_entry.config(state="disabled")
self.registration_entry.config(state="disabled")
# Update status
if self.status_label:
self.status_label.config(text="Status: Recording flight")
else:
self.show_error("Failed to start recording")
except Exception as e:
self.show_error(f"Error starting recording: {e}")
def stop_recording(self):
"""Stop recording and save the IGC file"""
if not self.bridge:
self.show_error("Bridge not initialized")
return
# Schedule the recording stop in the bridge's event loop
if self.event_loop:
future = asyncio.run_coroutine_threadsafe(
self.bridge.stop_recording(),
self.event_loop
)
try:
# Wait for result with a timeout
result = future.result(timeout=1.0)
# Update UI
self.recording_status.config(
text="Not Recording",
foreground="gray"
)
self.start_button.config(state="normal")
self.stop_button.config(state="disabled")
# Re-enable entries
self.pilot_entry.config(state="normal")
self.glider_entry.config(state="normal")
self.registration_entry.config(state="normal")
# Update status
if self.status_label:
if result:
self.status_label.config(text=f"Status: Flight saved to {os.path.basename(result)}")
# Show success message
tk.messagebox.showinfo(
"Recording Complete",
f"Flight recorded and saved to:\n{result}"
)
else:
self.status_label.config(text="Status: No flight data recorded")
except Exception as e:
self.show_error(f"Error stopping recording: {e}")
def start_bridge(self):
"""Start the Aerofly-IGC bridge in a separate thread"""
def run_bridge():
try:
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
self.bridge = AeroflyToIGCBridge()
# Update status
if self.master and self.status_label:
self.master.after(0, lambda: self.status_label.config(
text="Status: Bridge started"
))
# Start monitoring UDP data
if self.master:
self.master.after(100, self.check_udp_data)
self.bridge_task = self.event_loop.create_task(self.bridge.run())
self.event_loop.run_forever()
except Exception as e:
if self.master:
self.master.after(0, lambda: self.show_error(str(e)))
self.bridge_thread = threading.Thread(target=run_bridge, daemon=True)
self.bridge_thread.start()
def check_udp_data(self):
"""Check if we're receiving UDP data and update the UI accordingly"""
# Make sure the app hasn't been closed
if not self.master or not self.master.winfo_exists():
return
# Get current time for tracking connection status
current_time = time.time()
# Default connection state
connection_state = "Disconnected"
connection_color = "red"
# Default info text
info_text = f"=== Connection Status ===\n"
info_text += f"UDP Input: Listening on port 49002\n"
# Check if we have recent data (within last 5 seconds)
has_recent_data = False
elapsed_time = 0
if self.bridge and self.bridge.udp_server.lastDataReceivedTime is not None:
elapsed_time = current_time - self.bridge.udp_server.lastDataReceivedTime
has_recent_data = elapsed_time < 5.0
if has_recent_data:
connection_state = "Connected"
connection_color = "green"
info_text += f"Last data received: {elapsed_time:.1f} seconds ago\n"
else:
connection_state = "No Data"
connection_color = "orange"
info_text += f"No data in {elapsed_time:.1f} seconds\n"
info_text += f"Make sure Aerofly FS4 is running and ForeFlight output is enabled\n"
else:
info_text += "Waiting for first data from Aerofly FS4...\n"
info_text += "1. Make sure Aerofly FS4 is running\n"
info_text += "2. Check that 'Output data to ForeFlight' is enabled in Aerofly settings\n"
info_text += "3. Ensure this computer's firewall allows UDP port 49002\n"
# Update connection status
if self.connection_status:
self.connection_status.config(
text=connection_state,
foreground=connection_color
)
# If we have simulator data, show it
if self.bridge and self.bridge.udp_server.latest_gps:
gps = self.bridge.udp_server.latest_gps
att = self.bridge.udp_server.latest_att
# Add simulator data to info display
info_text += f"\n=== Simulator Data ===\n"
info_text += f"Latitude: {gps.latitude:.6f}°\n"
info_text += f"Longitude: {gps.longitude:.6f}°\n"
info_text += f"Altitude: {gps.alt_msl_meters * METERS_TO_FEET:.0f} ft\n"
info_text += f"Speed: {gps.ground_speed_mps * MPS_TO_KTS:.1f} kts\n"
if att:
info_text += f"Roll: {att.roll_deg:.1f}°\n"
info_text += f"Pitch: {att.pitch_deg:.1f}°\n"
info_text += f"Heading: {att.heading_deg:.1f}°\n"
# Add recording status
if self.bridge and self.bridge.is_recording:
info_text += f"\n=== Recording Status ===\n"
info_text += f"Recording to: {os.path.basename(self.bridge.current_filename)}\n"
info_text += f"Fixes recorded: {self.bridge.fix_count}\n"
# Calculate recording duration
if self.bridge.igc_recorder.start_time:
try:
# If start_time is timezone-aware
if hasattr(self.bridge.igc_recorder.start_time,
'tzinfo') and self.bridge.igc_recorder.start_time.tzinfo is not None:
# Use timezone-aware datetime
try:
current = datetime.datetime.now(datetime.UTC)
except AttributeError:
current = datetime.datetime.now(datetime.timezone.utc)
else:
# If start_time is naive (no timezone)
current = datetime.datetime.utcnow()
duration = current - self.bridge.igc_recorder.start_time
hours, remainder = divmod(duration.total_seconds(), 3600)
minutes, seconds = divmod(remainder, 60)
info_text += f"Duration: {int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}\n"
except Exception as e:
info_text += f"Duration calculation error: {e}\n"
# Update the info display
if self.info_display:
self.info_display.delete(1.0, tk.END)
self.info_display.insert(tk.END, info_text)
# Schedule next check if the app is still running
if self.master and self.master.winfo_exists():
self.master.after(100, self.check_udp_data)
def show_error(self, error_msg):
"""Display error message in the info display"""
if self.info_display:
self.info_display.delete(1.0, tk.END)
self.info_display.insert(tk.END, f"Error: {error_msg}")
if self.status_label:
self.status_label.config(text="Status: Error")
if self.connection_status:
self.connection_status.config(text="Error", foreground="red")
def close_application(self):
"""Clean up and close the application"""
# If recording, stop it
if self.bridge and self.bridge.is_recording and self.event_loop:
try:
# Schedule the recording stop in the bridge's event loop
future = asyncio.run_coroutine_threadsafe(
self.bridge.stop_recording(),
self.event_loop
)
# Wait for result with a short timeout
future.result(timeout=0.5)
except Exception as e:
print(f"Error stopping recording during shutdown: {e}")
# First destroy the UI to prevent more UI updates
if self.master:
# Create local reference to avoid using self.master after destroy
master_copy = self.master
self.master = None # Remove reference to prevent future access
master_copy.destroy()
async def shutdown_async():
try:
# Cancel the main bridge task
if self.bridge_task:
self.bridge_task.cancel()
try:
await self.bridge_task
except asyncio.CancelledError:
pass
# Close socket if it exists
if self.bridge and self.bridge.udp_server and self.bridge.udp_server.socket:
self.bridge.udp_server.socket.close()
except Exception as err:
print(f"Error during async shutdown: {err}")
if self.event_loop and self.event_loop.is_running():
try:
# Schedule the shutdown coroutine and stop the loop
asyncio.run_coroutine_threadsafe(shutdown_async(), self.event_loop)
# Give it a moment to execute
time.sleep(0.2)
# Then stop the loop
self.event_loop.stop()
except Exception as e:
print(f"Error during shutdown: {e}")
################################################################################
# Entry Point
################################################################################
def run_cli():
"""Run the bridge in command-line mode (no GUI)"""
bridge = AeroflyToIGCBridge()
print("Starting Aerofly FS4 to IGC Recorder in CLI mode...")
print("Press Ctrl+C to exit")
# Run UDP server in the background
asyncio.create_task(bridge.run())
# Main CLI loop
try:
loop = asyncio.get_event_loop()
print("\nCommands:")
print(" start - Start recording")
print(" stop - Stop recording")
print(" status - Show status")
print(" exit - Exit program")
async def cli_loop():
while True:
print("\n> ", end="", flush=True)
command = await loop.run_in_executor(None, input)
if command.lower() == "start":
if bridge.is_recording:
print("Already recording")
else:
pilot = await loop.run_in_executor(None, lambda: input(
"Pilot name [Simulator Pilot]: ") or "Simulator Pilot")
glider = await loop.run_in_executor(None, lambda: input(
"Aircraft type [Aerofly FS4]: ") or "Aerofly FS4")
reg = await loop.run_in_executor(None, lambda: input("Registration [SIM]: ") or "SIM")
result = await bridge.start_recording(pilot_name=pilot, glider_type=glider, glider_id=reg)
if result:
print(f"Recording started to {bridge.current_filename}")
else:
print("Failed to start recording")
elif command.lower() == "stop":
if not bridge.is_recording:
print("Not recording")
else:
result = await bridge.stop_recording()
if result:
print(f"Recording stopped. Flight saved to {result}")
else:
print("No flight data recorded")
elif command.lower() == "status":
if bridge.udp_server.latest_gps:
gps = bridge.udp_server.latest_gps
print(
f"Last data: Lat={gps.latitude:.6f}, Lon={gps.longitude:.6f}, Alt={gps.alt_msl_meters * METERS_TO_FEET:.0f}ft")
if bridge.is_recording:
print(f"Recording to: {bridge.current_filename}")
print(f"Fixes recorded: {bridge.fix_count}")
else:
print("No data received yet")
elif command.lower() == "exit":
if bridge.is_recording:
confirm = await loop.run_in_executor(None, lambda: input(
"Recording in progress. Stop recording and exit? (y/n): "))
if confirm.lower() != 'y':
continue
await bridge.stop_recording()
break
else:
print("Unknown command")
# Run CLI loop
loop.run_until_complete(cli_loop())
except KeyboardInterrupt:
print("\nProgram interrupted.")
finally:
# If recording, stop it
if bridge.is_recording:
print("Stopping recording...")
loop.run_until_complete(bridge.stop_recording())
print("Bridge shutting down.")
def run_gui():
"""Run the bridge with GUI"""
# Check if tkinter is available
try:
import tkinter.messagebox
root = tk.Tk()
BridgeGUI(root)
root.mainloop()
except ImportError:
print("Tkinter not available, running in CLI mode")
run_cli()
except Exception as e:
print(f"Error starting GUI: {e}")
print("Falling back to CLI mode")
run_cli()
if __name__ == "__main__":
# By default, run with GUI
# Use '--cli' flag to run without GUI
if len(sys.argv) > 1 and sys.argv[1] == '--cli':
run_cli()
else:
run_gui()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment