Last active
April 16, 2025 02:25
-
-
Save jlgabriel/74f7b98e9eee7e7edd98f1b0b4d76299 to your computer and use it in GitHub Desktop.
Revisions
-
jlgabriel revised this gist
Apr 16, 2025 . 1 changed file with 47 additions and 23 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -108,6 +108,27 @@ def __init__(self): # Task for processing queue self.processing_task = None def _get_fixed_time(self, timestamp=None): """ Returns a time object fixed at 12:00, but preserves the original date. Can be used to generate fixed timestamps for IGC files. """ # If no timestamp is provided, use the current date if timestamp is None: try: # For Python 3.11+ timestamp = datetime.datetime.now(datetime.UTC) except AttributeError: # For older Python versions timestamp = datetime.datetime.utcnow() # Create a new datetime with the same date but at 12:00:00 fixed_date = timestamp.date() fixed_time = datetime.time(hour=12, minute=0, second=0) # Return only the time object to use in B records return fixed_time async def start_recording(self, pilot_name="", glider_type="Aerofly FS4", glider_id="SIM", gps_device="Aerofly GPS"): """Start recording a new IGC file""" @@ -136,7 +157,10 @@ async def start_recording(self, pilot_name="", glider_type="Aerofly FS4", self.start_time = utc_now # Initialize seconds counter for time simulation self._second_counter = 0 # Write date record - keep the original date self.writer.write_date(utc_now.date()) # Write basic header records @@ -150,16 +174,12 @@ async def start_recording(self, pilot_name="", glider_type="Aerofly FS4", self.writer.write_glider_id(glider_id) # Write logger info self.writer.write_comment("LOG", "Aerofly FS4 Simulator") self.writer.write_firmware_version("1.0") # Add a comment about the source self.writer.write_comment("GEN", "Generated by Aerofly FS4 to IGC Recorder") # Reset fix count self.fix_count = 0 @@ -366,31 +386,36 @@ async def _write_position(self, gps_data: XGPSData, att_data: Optional[XATTData] # Get pressure altitude (we don't have this in simulator, use MSL as approximation) pressure_alt = altitude # For screen display, continue incrementing the seconds if not hasattr(self, '_second_counter'): self._second_counter = 0 else: self._second_counter += 1 # Calculate hours, minutes, and seconds properly total_seconds = self._second_counter hours = 12 # Start at 12:00:00 minutes = (total_seconds // 60) % 60 seconds = total_seconds % 60 # Create a properly incremented time simulated_time = datetime.time( hour=hours, minute=minutes, second=seconds, microsecond=0 ) # Write B record (position fix) using aerofiles self.writer.write_fix( time=simulated_time, # Use simulated time that increases properly latitude=latitude, longitude=longitude, pressure_alt=pressure_alt, gps_alt=altitude ) # Increment fix count self.fix_count += 1 return True @@ -1184,5 +1209,4 @@ def run_gui(): if len(sys.argv) > 1 and sys.argv[1] == '--cli': run_cli() else: run_gui() -
jlgabriel revised this gist
Apr 15, 2025 . 1 changed file with 29 additions and 23 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -187,13 +187,13 @@ def _write_extensions_declaration(self): This is a good practice for IGC files with custom extensions """ try: # # Write a comment explaining the extensions # self.writer.write_comment("ATT", "Attitude data: heading, roll, pitch") # # # Since aerofiles doesn't support I records directly, we write them manually # # Format: I + number of extensions + list of extensions # i_record = "I033638HDG3839ROLL4345PITCH\n" # self.igc_file.write(i_record.encode('utf-8')) return True except Exception as e: @@ -378,19 +378,19 @@ async def _write_position(self, gps_data: XGPSData, att_data: Optional[XATTData] gps_alt=altitude ) # # If we have attitude data, write a K record # if att_data: # hdg = int(att_data.heading_deg) % 360 # roll = int(att_data.roll_deg) # pitch = int(att_data.pitch_deg) # # # Format attitude data for K record # att_data_str = f"HDG{hdg:03d}ROLL{abs(roll):03d}{'+' if roll >= 0 else '-'}PITCH{abs(pitch):03d}{'+' if pitch >= 0 else '-'}" # # # Write K record # await self._write_k_record(fix_time, "ATT", att_data_str) # # # Increment fix count self.fix_count += 1 return True @@ -491,6 +491,7 @@ def __init__(self, igc_recorder: IGCRecorder, parser: ForeFlightParser, port: in self.port = port self.socket = None self.lastDataReceivedTime = None self.last_record_time = None # Nuevo: para seguimiento del último registro # Keep track of latest GPS and ATT data self.latest_gps = None @@ -536,9 +537,13 @@ async def run(self): # Update latest data if isinstance(parsed_obj, XGPSData): self.latest_gps = parsed_obj # Si estamos grabando y tenemos GPS data, verificar si ha pasado 1 segundo current_time = time.time() if (self.igc_recorder.recording and self.latest_gps and (self.last_record_time is None or current_time - self.last_record_time >= 1.0)): await self.igc_recorder.add_position(self.latest_gps, self.latest_att) self.last_record_time = current_time elif isinstance(parsed_obj, XATTData): self.latest_att = parsed_obj except (ConnectionError, OSError) as e: @@ -1179,4 +1184,5 @@ def run_gui(): if len(sys.argv) > 1 and sys.argv[1] == '--cli': run_cli() else: run_gui()
-
jlgabriel created this gist
Apr 15, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,1182 @@ #!/usr/bin/env python3 ################################################################################## # Aerofly FS4 to IGC Recorder # Developed to connect Aerofly FS4 Flight Simulator and generate IGC flight logs # Copyright (c) 2025 Juan Luis Gabriel # This software is released under the MIT License. # https://opensource.org/licenses/MIT ################################################################################## import asyncio import socket import json import time import tkinter as tk from tkinter import ttk, font as tkfont, filedialog import threading from dataclasses import dataclass import sys import os from typing import Optional, Union, Dict, Any, List from pathlib import Path import datetime # Import aerofiles for IGC file handling from aerofiles.igc import Writer ################################################################################ # Constants & Utilities ################################################################################ METERS_TO_FEET = 3.28084 MPS_TO_KTS = 1.94384 MPS_TO_FPM = 196.85 # Meters per second to feet per minute # Define paths for output files def get_default_igc_dir(): """Get the default directory for IGC files""" documents_dir = os.path.join(os.path.expanduser("~"), "Documents") igc_dir = os.path.join(documents_dir, "AeroflyIGC") os.makedirs(igc_dir, exist_ok=True) return igc_dir def generate_igc_filename(): """Generate a filename for the IGC file based on current date and time""" now = datetime.datetime.now() return f"AEROFLY_{now.strftime('%Y%m%d_%H%M%S')}.igc" ################################################################################ # Data Classes for Parsed ForeFlight Messages ################################################################################ @dataclass class XGPSData: """ Represents ForeFlight XGPS data. e.g. XGPS<sim_name>,<longitude>,<latitude>,<altitude_msl_meters>,<track_true_north>,<groundspeed_m/s> """ sim_name: str longitude: float latitude: float alt_msl_meters: float track_deg: float ground_speed_mps: float @dataclass class XATTData: """ Represents ForeFlight XATT data. e.g. XATT<sim_name>,<true_heading>,<pitch_degrees>,<roll_degrees> """ sim_name: str heading_deg: float pitch_deg: float roll_deg: float @dataclass class UnknownData: """ Returned if we fail to parse or data type is not recognized. """ raw_line: str ################################################################################ # IGC Recorder Class ################################################################################ class IGCRecorder: """ Records flight data to an IGC file using aerofiles library. Handles both standard IGC records and custom attitude data. """ def __init__(self): self.recording = False self.igc_file = None self.writer = None # Reference to aerofiles Writer self.filename = None self.start_time = None self.end_time = None self.fix_count = 0 self._lock = asyncio.Lock() # Queue for position data self.position_queue = asyncio.Queue() # Task for processing queue self.processing_task = None async def start_recording(self, pilot_name="", glider_type="Aerofly FS4", glider_id="SIM", gps_device="Aerofly GPS"): """Start recording a new IGC file""" async with self._lock: if self.recording: print("[IGCRecorder] Already recording") return False try: # Generate filename based on current date and time self.filename = os.path.join(get_default_igc_dir(), generate_igc_filename()) # Open file for writing in binary mode (required by aerofiles) self.igc_file = open(self.filename, 'wb') # Create aerofiles IGC writer self.writer = Writer(self.igc_file) # Get current time try: # For Python 3.11+ utc_now = datetime.datetime.now(datetime.UTC) except AttributeError: # For older Python versions utc_now = datetime.datetime.utcnow() self.start_time = utc_now # Write date record self.writer.write_date(utc_now.date()) # Write basic header records if pilot_name: self.writer.write_pilot(pilot_name) if glider_type: self.writer.write_glider_type(glider_type) if glider_id: self.writer.write_glider_id(glider_id) # Write logger info #self.writer.write_logger_id("AFL", "01") self.writer.write_comment("LOG", "Aerofly FS4 Simulator") self.writer.write_firmware_version("1.0") # Add a comment about the source self.writer.write_comment("GEN", "Generated by Aerofly FS4 to IGC Recorder") # Write extensions declaration for K records self._write_extensions_declaration() # Reset fix count self.fix_count = 0 # Start recording self.recording = True # Start the processing task self.processing_task = asyncio.create_task(self._process_queue()) print(f"[IGCRecorder] Started recording to {self.filename}") return True except Exception as e: print(f"[IGCRecorder] Error starting recording: {e}") # Clean up if error occurs if hasattr(self, 'igc_file') and self.igc_file: self.igc_file.close() self.igc_file = None self.recording = False return False def _write_extensions_declaration(self): """ Write I record to declare extensions used in K records This is a good practice for IGC files with custom extensions """ try: # Write a comment explaining the extensions self.writer.write_comment("ATT", "Attitude data: heading, roll, pitch") # Since aerofiles doesn't support I records directly, we write them manually # Format: I + number of extensions + list of extensions i_record = "I033638HDG3839ROLL4345PITCH\n" self.igc_file.write(i_record.encode('utf-8')) return True except Exception as e: print(f"[IGCRecorder] Error writing extensions declaration: {e}") return False async def stop_recording(self): """Stop recording and close the IGC file""" async with self._lock: if not self.recording: print("[IGCRecorder] Not recording") return False try: # Set end time try: self.end_time = datetime.datetime.now(datetime.UTC) except AttributeError: self.end_time = datetime.datetime.utcnow() # Cancel the processing task if self.processing_task: self.processing_task.cancel() try: await self.processing_task except asyncio.CancelledError: pass self.processing_task = None # Process any remaining items in the queue while not self.position_queue.empty(): try: gps_data, att_data, timestamp = await self.position_queue.get() await self._write_position(gps_data, att_data, timestamp) self.position_queue.task_done() except Exception as e: print(f"[IGCRecorder] Error processing remaining queue items: {e}") # Close the file if hasattr(self, 'igc_file') and self.igc_file: # Add final comment with recording end time if hasattr(self, 'writer'): try: self.writer.write_comment("END", f"Recording ended at {self.end_time.strftime('%H:%M:%S')}") except Exception as e: print(f"[IGCRecorder] Error writing end comment: {e}") # Close the file self.igc_file.close() self.igc_file = None if self.fix_count > 0: print(f"[IGCRecorder] Stopped recording. Wrote {self.fix_count} fixes to {self.filename}") result_file = self.filename self.filename = None self.recording = False return result_file else: print("[IGCRecorder] No data recorded, deleting file") if self.filename and os.path.exists(self.filename): os.remove(self.filename) self.filename = None self.recording = False return None else: self.recording = False return None except Exception as e: print(f"[IGCRecorder] Error stopping recording: {e}") # Clean up even if error occurs if hasattr(self, 'igc_file') and self.igc_file: self.igc_file.close() self.igc_file = None self.recording = False return None async def add_position(self, gps_data: XGPSData, att_data: Optional[XATTData] = None): """Add a position fix to the IGC file""" if not self.recording: return False try: # Get current timestamp - ensure we use the same type of datetime as in start_recording # Check if start_time is timezone-aware if hasattr(self.start_time, 'tzinfo') and self.start_time.tzinfo is not None: # Use timezone-aware datetime try: timestamp = datetime.datetime.now(datetime.UTC) except AttributeError: # For older Python, create an aware datetime timestamp = datetime.datetime.now(datetime.timezone.utc) else: # Use naive datetime to match timestamp = datetime.datetime.utcnow() # Add to queue for processing await self.position_queue.put((gps_data, att_data, timestamp)) return True except Exception as e: print(f"[IGCRecorder] Error adding position: {e}") return False async def _process_queue(self): """Process the position queue in the background""" try: while True: # Get item from queue gps_data, att_data, timestamp = await self.position_queue.get() # Process item await self._write_position(gps_data, att_data, timestamp) # Mark item as done self.position_queue.task_done() # Small sleep to avoid consuming too much CPU await asyncio.sleep(0.01) except asyncio.CancelledError: # Task was cancelled, clean exit print("[IGCRecorder] Position processing task cancelled") raise except Exception as e: print(f"[IGCRecorder] Error in position processing task: {e}") async def _write_k_record(self, timestamp: datetime.time, code: str, data: str): """ Write a custom K record (extension data) to the IGC file Since aerofiles doesn't support K records directly, we write them manually """ if not self.recording or not self.igc_file: return False try: # Format time as HHMMSS time_str = timestamp.strftime('%H%M%S') # Format K record: K + time + data k_record = f"K{time_str}{code}{data}\n" # Write directly to the file - ensure it's encoded to bytes # since we opened the file in binary mode self.igc_file.write(k_record.encode('utf-8')) return True except Exception as e: print(f"[IGCRecorder] Error writing K record: {e}") return False async def _write_position(self, gps_data: XGPSData, att_data: Optional[XATTData] = None, timestamp: datetime.datetime = None): """Write position fix to the IGC file using aerofiles""" async with self._lock: if not self.recording or not self.igc_file or not hasattr(self, 'writer'): return False try: # Use current time if not provided if timestamp is None: try: timestamp = datetime.datetime.now(datetime.UTC) except AttributeError: timestamp = datetime.datetime.utcnow() # Extract data from GPS latitude = gps_data.latitude longitude = gps_data.longitude altitude = int(gps_data.alt_msl_meters) # IGC uses meters # Get pressure altitude (we don't have this in simulator, use MSL as approximation) pressure_alt = altitude # Convert datetime to time object (hours, minutes, seconds) fix_time = timestamp.time() # Write B record (position fix) using aerofiles self.writer.write_fix( time=fix_time, latitude=latitude, longitude=longitude, pressure_alt=pressure_alt, gps_alt=altitude ) # If we have attitude data, write a K record if att_data: hdg = int(att_data.heading_deg) % 360 roll = int(att_data.roll_deg) pitch = int(att_data.pitch_deg) # Format attitude data for K record att_data_str = f"HDG{hdg:03d}ROLL{abs(roll):03d}{'+' if roll >= 0 else '-'}PITCH{abs(pitch):03d}{'+' if pitch >= 0 else '-'}" # Write K record await self._write_k_record(fix_time, "ATT", att_data_str) # Increment fix count self.fix_count += 1 return True except Exception as e: print(f"[IGCRecorder] Error writing position: {e}") return False ################################################################################ # ForeFlight Data Parser ################################################################################ class ForeFlightParser: """ Parses strings in ForeFlight's XGPS / XATT formats, returning typed objects: XGPSData, XATTData, or UnknownData. """ @staticmethod def parse_line(line: str): """ Identify the data type (XGPS, XATT) and parse accordingly. """ line = line.strip() if line.startswith("XGPS"): return ForeFlightParser._parse_xgps(line) elif line.startswith("XATT"): return ForeFlightParser._parse_xatt(line) else: return UnknownData(raw_line=line) @staticmethod def _parse_xgps(line: str) -> Union[XGPSData, UnknownData]: """ Example XGPS line: XGPSMySim,-80.11,34.55,1200.1,359.05,55.6 => XGPS<sim_name>,<longitude>,<latitude>,<alt_msl_meters>,<track_deg_true>,<groundspeed_m/s> """ try: raw = line[4:] # remove 'XGPS' parts = raw.split(",") sim_name = parts[0].strip() longitude = float(parts[1]) latitude = float(parts[2]) alt_msl_meters = float(parts[3]) track_deg = float(parts[4]) ground_speed_mps = float(parts[5]) return XGPSData( sim_name=sim_name, longitude=longitude, latitude=latitude, alt_msl_meters=alt_msl_meters, track_deg=track_deg, ground_speed_mps=ground_speed_mps ) except (ValueError, IndexError): return UnknownData(raw_line=line) @staticmethod def _parse_xatt(line: str) -> Union[XATTData, UnknownData]: """ Example XATT line: XATTMySim,180.2,0.1,0.2 => XATT<sim_name>,<true_heading_deg>,<pitch_deg>,<roll_deg> """ try: raw = line[4:] parts = raw.split(",") sim_name = parts[0].strip() heading_deg = float(parts[1]) pitch_deg = float(parts[2]) roll_deg = float(parts[3]) return XATTData( sim_name=sim_name, heading_deg=heading_deg, pitch_deg=pitch_deg, roll_deg=roll_deg ) except (ValueError, IndexError): return UnknownData(raw_line=line) ################################################################################ # ForeFlight UDP Server ################################################################################ class ForeFlightUDPServer: """ Listens for ForeFlight-compatible data on UDP port 49002. Parses lines and updates the shared SimData and IGC recorder. """ def __init__(self, igc_recorder: IGCRecorder, parser: ForeFlightParser, port: int = 49002): self.igc_recorder = igc_recorder self.parser = parser self.port = port self.socket = None self.lastDataReceivedTime = None # Keep track of latest GPS and ATT data self.latest_gps = None self.latest_att = None async def run(self): """ Main loop: bind to UDP port, receive data, parse, update igc_recorder. We'll do a blocking recv in a background thread so we don't block the event loop. """ # Create UDP socket self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Enable broadcast & address reuse self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Set blocking to True for our simple loop self.socket.setblocking(True) # Bind to all interfaces on the specified port: self.socket.bind(('0.0.0.0', self.port)) print(f"[ForeFlightUDPServer] Listening on UDP port {self.port}...") print(f"[ForeFlightUDPServer] Ready to receive data from Aerofly FS4...") try: while True: try: # When the socket is closed, this will raise an exception data, addr = await asyncio.to_thread(self.socket.recvfrom, 1024) line = data.decode('utf-8', errors='ignore').strip() parsed_obj = self.parser.parse_line(line) # If we haven't received data in a while, print a message if self.lastDataReceivedTime is None or \ time.time() - self.lastDataReceivedTime > 5.0: print(f"[ForeFlightUDPServer] Receiving data from {addr[0]}:{addr[1]}") print(f"[ForeFlightUDPServer] Sample data: {line}") print(f"[ForeFlightUDPServer] Parsed object: {parsed_obj}") self.lastDataReceivedTime = time.time() # Update latest data if isinstance(parsed_obj, XGPSData): self.latest_gps = parsed_obj # If we're recording and have GPS data, add position if self.igc_recorder.recording and self.latest_gps: await self.igc_recorder.add_position(self.latest_gps, self.latest_att) elif isinstance(parsed_obj, XATTData): self.latest_att = parsed_obj except (ConnectionError, OSError) as e: # Socket was closed print(f"[ForeFlightUDPServer] Socket error: {e}") break except asyncio.CancelledError: # Task was cancelled break except Exception as e: print(f"[ForeFlightUDPServer] Error receiving data: {e}") # Continue despite errors finally: # Ensure socket is closed when the task ends if self.socket: try: self.socket.close() print("[ForeFlightUDPServer] Socket closed") except Exception as ex: print(f"[ForeFlightUDPServer] Error closing socket: {ex}") ################################################################################ # Bridge Core ################################################################################ class AeroflyToIGCBridge: """ High-level orchestrator that sets up: 1) An IGCRecorder object 2) A ForeFlightUDPServer (listens on 49002) 3) Runs them concurrently with asyncio. """ def __init__(self, udp_port=49002): self.igc_recorder = IGCRecorder() self.parser = ForeFlightParser() self.udp_server = ForeFlightUDPServer(self.igc_recorder, self.parser, port=udp_port) async def run(self): await self.udp_server.run() async def start_recording(self, pilot_name="", glider_type="", glider_id=""): """Start recording an IGC file""" return await self.igc_recorder.start_recording( pilot_name=pilot_name, glider_type=glider_type or "Aerofly FS4", glider_id=glider_id or "SIM" ) async def stop_recording(self): """Stop recording and return the filename of the IGC file""" return await self.igc_recorder.stop_recording() @property def is_recording(self): """Return True if recording is in progress""" return self.igc_recorder.recording @property def fix_count(self): """Return the number of fixes recorded so far""" return self.igc_recorder.fix_count @property def current_filename(self): """Return the current IGC filename being recorded""" return self.igc_recorder.filename ################################################################################ # GUI Implementation ################################################################################ class BridgeGUI: def __init__(self, master): self.master = master self.master.title("Aerofly FS4 to IGC Recorder") self.master.geometry("600x580") # Configure grid self.master.grid_columnconfigure(0, weight=1) self.master.grid_rowconfigure(1, weight=1) # Initialize UI elements that will be created in setup_ui self.status_label = None self.connection_status = None self.info_display = None self.recording_status = None self.start_button = None self.stop_button = None self.pilot_entry = None self.glider_entry = None self.registration_entry = None # Initialize bridge components self.bridge = None self.bridge_task = None self.event_loop = None self.bridge_thread = None # Set up UI components self.setup_ui() # Set up window close protocol self.master.protocol("WM_DELETE_WINDOW", self.close_application) # Start the bridge self.start_bridge() def setup_ui(self): """Set up the user interface components""" # Status Frame status_frame = ttk.Frame(self.master, padding="10") status_frame.grid(row=0, column=0, sticky="ew") # Status Label self.status_label = ttk.Label( status_frame, text="Status: Starting...", font=tkfont.Font(size=10) ) self.status_label.pack(side="left") # Connection status self.connection_status = ttk.Label( status_frame, text="Disconnected", foreground="red", font=tkfont.Font(size=10, weight="bold") ) self.connection_status.pack(side="right") # Info Frame info_frame = ttk.Frame(self.master, padding="10") info_frame.grid(row=1, column=0, sticky="nsew") # Info Display self.info_display = tk.Text( info_frame, wrap=tk.WORD, width=60, height=20, font=tkfont.Font(family="Consolas", size=9) ) self.info_display.pack(fill=tk.BOTH, expand=True) # Recording Status recording_frame = ttk.Frame(self.master, padding="10") recording_frame.grid(row=2, column=0, sticky="ew") self.recording_status = ttk.Label( recording_frame, text="Not Recording", foreground="gray", font=tkfont.Font(size=12, weight="bold") ) self.recording_status.pack(side="top", fill="x") # Flight details frame flight_frame = ttk.Frame(self.master, padding="10") flight_frame.grid(row=3, column=0, sticky="ew") # Pilot name ttk.Label(flight_frame, text="Pilot Name:").grid(row=0, column=0, sticky="w", padx=5, pady=5) self.pilot_entry = ttk.Entry(flight_frame, width=30) self.pilot_entry.grid(row=0, column=1, sticky="ew", padx=5, pady=5) self.pilot_entry.insert(0, "Simulator Pilot") # Glider type ttk.Label(flight_frame, text="Aircraft Type:").grid(row=1, column=0, sticky="w", padx=5, pady=5) self.glider_entry = ttk.Entry(flight_frame, width=30) self.glider_entry.grid(row=1, column=1, sticky="ew", padx=5, pady=5) self.glider_entry.insert(0, "Aerofly FS4") # Registration ttk.Label(flight_frame, text="Registration:").grid(row=2, column=0, sticky="w", padx=5, pady=5) self.registration_entry = ttk.Entry(flight_frame, width=30) self.registration_entry.grid(row=2, column=1, sticky="ew", padx=5, pady=5) self.registration_entry.insert(0, "SIM") # Configure the grid columns flight_frame.columnconfigure(1, weight=1) # Control Frame control_frame = ttk.Frame(self.master, padding="10") control_frame.grid(row=4, column=0, sticky="ew") # Buttons container buttons_frame = ttk.Frame(control_frame) buttons_frame.pack(fill=tk.X, expand=True) # Start Recording Button self.start_button = ttk.Button( buttons_frame, text="Start Recording", command=self.start_recording ) self.start_button.pack(side="left", padx=5) # Stop Recording Button self.stop_button = ttk.Button( buttons_frame, text="Stop Recording", command=self.stop_recording, state="disabled" ) self.stop_button.pack(side="left", padx=5) # Open IGC Folder Button self.open_folder_button = ttk.Button( buttons_frame, text="Open IGC Folder", command=self.open_igc_folder ) self.open_folder_button.pack(side="right", padx=5) # Close Button self.close_button = ttk.Button( control_frame, text="Close", command=self.close_application ) self.close_button.pack(side="bottom", pady=10) def open_igc_folder(self): """Open the folder containing IGC files""" igc_dir = get_default_igc_dir() if os.path.exists(igc_dir): if sys.platform == 'win32': os.startfile(igc_dir) elif sys.platform == 'darwin': # macOS import subprocess subprocess.Popen(['open', igc_dir]) else: # Linux import subprocess subprocess.Popen(['xdg-open', igc_dir]) else: print(f"IGC directory does not exist: {igc_dir}") def start_recording(self): """Start recording an IGC file""" if not self.bridge: self.show_error("Bridge not initialized") return # Get pilot info from entries pilot_name = self.pilot_entry.get() or "Simulator Pilot" glider_type = self.glider_entry.get() or "Aerofly FS4" glider_id = self.registration_entry.get() or "SIM" # Schedule the recording start in the bridge's event loop if self.event_loop: future = asyncio.run_coroutine_threadsafe( self.bridge.start_recording( pilot_name=pilot_name, glider_type=glider_type, glider_id=glider_id ), self.event_loop ) try: # Wait for result with a timeout result = future.result(timeout=1.0) if result: # Update UI self.recording_status.config( text="Recording...", foreground="green" ) self.start_button.config(state="disabled") self.stop_button.config(state="normal") # Disable entries during recording self.pilot_entry.config(state="disabled") self.glider_entry.config(state="disabled") self.registration_entry.config(state="disabled") # Update status if self.status_label: self.status_label.config(text="Status: Recording flight") else: self.show_error("Failed to start recording") except Exception as e: self.show_error(f"Error starting recording: {e}") def stop_recording(self): """Stop recording and save the IGC file""" if not self.bridge: self.show_error("Bridge not initialized") return # Schedule the recording stop in the bridge's event loop if self.event_loop: future = asyncio.run_coroutine_threadsafe( self.bridge.stop_recording(), self.event_loop ) try: # Wait for result with a timeout result = future.result(timeout=1.0) # Update UI self.recording_status.config( text="Not Recording", foreground="gray" ) self.start_button.config(state="normal") self.stop_button.config(state="disabled") # Re-enable entries self.pilot_entry.config(state="normal") self.glider_entry.config(state="normal") self.registration_entry.config(state="normal") # Update status if self.status_label: if result: self.status_label.config(text=f"Status: Flight saved to {os.path.basename(result)}") # Show success message tk.messagebox.showinfo( "Recording Complete", f"Flight recorded and saved to:\n{result}" ) else: self.status_label.config(text="Status: No flight data recorded") except Exception as e: self.show_error(f"Error stopping recording: {e}") def start_bridge(self): """Start the Aerofly-IGC bridge in a separate thread""" def run_bridge(): try: self.event_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.event_loop) self.bridge = AeroflyToIGCBridge() # Update status if self.master and self.status_label: self.master.after(0, lambda: self.status_label.config( text="Status: Bridge started" )) # Start monitoring UDP data if self.master: self.master.after(100, self.check_udp_data) self.bridge_task = self.event_loop.create_task(self.bridge.run()) self.event_loop.run_forever() except Exception as e: if self.master: self.master.after(0, lambda: self.show_error(str(e))) self.bridge_thread = threading.Thread(target=run_bridge, daemon=True) self.bridge_thread.start() def check_udp_data(self): """Check if we're receiving UDP data and update the UI accordingly""" # Make sure the app hasn't been closed if not self.master or not self.master.winfo_exists(): return # Get current time for tracking connection status current_time = time.time() # Default connection state connection_state = "Disconnected" connection_color = "red" # Default info text info_text = f"=== Connection Status ===\n" info_text += f"UDP Input: Listening on port 49002\n" # Check if we have recent data (within last 5 seconds) has_recent_data = False elapsed_time = 0 if self.bridge and self.bridge.udp_server.lastDataReceivedTime is not None: elapsed_time = current_time - self.bridge.udp_server.lastDataReceivedTime has_recent_data = elapsed_time < 5.0 if has_recent_data: connection_state = "Connected" connection_color = "green" info_text += f"Last data received: {elapsed_time:.1f} seconds ago\n" else: connection_state = "No Data" connection_color = "orange" info_text += f"No data in {elapsed_time:.1f} seconds\n" info_text += f"Make sure Aerofly FS4 is running and ForeFlight output is enabled\n" else: info_text += "Waiting for first data from Aerofly FS4...\n" info_text += "1. Make sure Aerofly FS4 is running\n" info_text += "2. Check that 'Output data to ForeFlight' is enabled in Aerofly settings\n" info_text += "3. Ensure this computer's firewall allows UDP port 49002\n" # Update connection status if self.connection_status: self.connection_status.config( text=connection_state, foreground=connection_color ) # If we have simulator data, show it if self.bridge and self.bridge.udp_server.latest_gps: gps = self.bridge.udp_server.latest_gps att = self.bridge.udp_server.latest_att # Add simulator data to info display info_text += f"\n=== Simulator Data ===\n" info_text += f"Latitude: {gps.latitude:.6f}°\n" info_text += f"Longitude: {gps.longitude:.6f}°\n" info_text += f"Altitude: {gps.alt_msl_meters * METERS_TO_FEET:.0f} ft\n" info_text += f"Speed: {gps.ground_speed_mps * MPS_TO_KTS:.1f} kts\n" if att: info_text += f"Roll: {att.roll_deg:.1f}°\n" info_text += f"Pitch: {att.pitch_deg:.1f}°\n" info_text += f"Heading: {att.heading_deg:.1f}°\n" # Add recording status if self.bridge and self.bridge.is_recording: info_text += f"\n=== Recording Status ===\n" info_text += f"Recording to: {os.path.basename(self.bridge.current_filename)}\n" info_text += f"Fixes recorded: {self.bridge.fix_count}\n" # Calculate recording duration if self.bridge.igc_recorder.start_time: try: # If start_time is timezone-aware if hasattr(self.bridge.igc_recorder.start_time, 'tzinfo') and self.bridge.igc_recorder.start_time.tzinfo is not None: # Use timezone-aware datetime try: current = datetime.datetime.now(datetime.UTC) except AttributeError: current = datetime.datetime.now(datetime.timezone.utc) else: # If start_time is naive (no timezone) current = datetime.datetime.utcnow() duration = current - self.bridge.igc_recorder.start_time hours, remainder = divmod(duration.total_seconds(), 3600) minutes, seconds = divmod(remainder, 60) info_text += f"Duration: {int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}\n" except Exception as e: info_text += f"Duration calculation error: {e}\n" # Update the info display if self.info_display: self.info_display.delete(1.0, tk.END) self.info_display.insert(tk.END, info_text) # Schedule next check if the app is still running if self.master and self.master.winfo_exists(): self.master.after(100, self.check_udp_data) def show_error(self, error_msg): """Display error message in the info display""" if self.info_display: self.info_display.delete(1.0, tk.END) self.info_display.insert(tk.END, f"Error: {error_msg}") if self.status_label: self.status_label.config(text="Status: Error") if self.connection_status: self.connection_status.config(text="Error", foreground="red") def close_application(self): """Clean up and close the application""" # If recording, stop it if self.bridge and self.bridge.is_recording and self.event_loop: try: # Schedule the recording stop in the bridge's event loop future = asyncio.run_coroutine_threadsafe( self.bridge.stop_recording(), self.event_loop ) # Wait for result with a short timeout future.result(timeout=0.5) except Exception as e: print(f"Error stopping recording during shutdown: {e}") # First destroy the UI to prevent more UI updates if self.master: # Create local reference to avoid using self.master after destroy master_copy = self.master self.master = None # Remove reference to prevent future access master_copy.destroy() async def shutdown_async(): try: # Cancel the main bridge task if self.bridge_task: self.bridge_task.cancel() try: await self.bridge_task except asyncio.CancelledError: pass # Close socket if it exists if self.bridge and self.bridge.udp_server and self.bridge.udp_server.socket: self.bridge.udp_server.socket.close() except Exception as err: print(f"Error during async shutdown: {err}") if self.event_loop and self.event_loop.is_running(): try: # Schedule the shutdown coroutine and stop the loop asyncio.run_coroutine_threadsafe(shutdown_async(), self.event_loop) # Give it a moment to execute time.sleep(0.2) # Then stop the loop self.event_loop.stop() except Exception as e: print(f"Error during shutdown: {e}") ################################################################################ # Entry Point ################################################################################ def run_cli(): """Run the bridge in command-line mode (no GUI)""" bridge = AeroflyToIGCBridge() print("Starting Aerofly FS4 to IGC Recorder in CLI mode...") print("Press Ctrl+C to exit") # Run UDP server in the background asyncio.create_task(bridge.run()) # Main CLI loop try: loop = asyncio.get_event_loop() print("\nCommands:") print(" start - Start recording") print(" stop - Stop recording") print(" status - Show status") print(" exit - Exit program") async def cli_loop(): while True: print("\n> ", end="", flush=True) command = await loop.run_in_executor(None, input) if command.lower() == "start": if bridge.is_recording: print("Already recording") else: pilot = await loop.run_in_executor(None, lambda: input( "Pilot name [Simulator Pilot]: ") or "Simulator Pilot") glider = await loop.run_in_executor(None, lambda: input( "Aircraft type [Aerofly FS4]: ") or "Aerofly FS4") reg = await loop.run_in_executor(None, lambda: input("Registration [SIM]: ") or "SIM") result = await bridge.start_recording(pilot_name=pilot, glider_type=glider, glider_id=reg) if result: print(f"Recording started to {bridge.current_filename}") else: print("Failed to start recording") elif command.lower() == "stop": if not bridge.is_recording: print("Not recording") else: result = await bridge.stop_recording() if result: print(f"Recording stopped. Flight saved to {result}") else: print("No flight data recorded") elif command.lower() == "status": if bridge.udp_server.latest_gps: gps = bridge.udp_server.latest_gps print( f"Last data: Lat={gps.latitude:.6f}, Lon={gps.longitude:.6f}, Alt={gps.alt_msl_meters * METERS_TO_FEET:.0f}ft") if bridge.is_recording: print(f"Recording to: {bridge.current_filename}") print(f"Fixes recorded: {bridge.fix_count}") else: print("No data received yet") elif command.lower() == "exit": if bridge.is_recording: confirm = await loop.run_in_executor(None, lambda: input( "Recording in progress. Stop recording and exit? (y/n): ")) if confirm.lower() != 'y': continue await bridge.stop_recording() break else: print("Unknown command") # Run CLI loop loop.run_until_complete(cli_loop()) except KeyboardInterrupt: print("\nProgram interrupted.") finally: # If recording, stop it if bridge.is_recording: print("Stopping recording...") loop.run_until_complete(bridge.stop_recording()) print("Bridge shutting down.") def run_gui(): """Run the bridge with GUI""" # Check if tkinter is available try: import tkinter.messagebox root = tk.Tk() BridgeGUI(root) root.mainloop() except ImportError: print("Tkinter not available, running in CLI mode") run_cli() except Exception as e: print(f"Error starting GUI: {e}") print("Falling back to CLI mode") run_cli() if __name__ == "__main__": # By default, run with GUI # Use '--cli' flag to run without GUI if len(sys.argv) > 1 and sys.argv[1] == '--cli': run_cli() else: run_gui()