Coverage for app/aircraft/gps_import.py: 100%
358 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1"""GPS log file parsing for aircraft logbook import — Phase 30.
3Supported formats:
4- GPX 1.1 (SkyDemon, ForeFlight): speed in m/s, UTC timestamps
5- Garmin GTN/G1000 CSV: 3-row header, local time + UTC offset, GndSpd in kt
6- KML with gx:Track (SkyDemon): lon/lat/alt order, speed derived from consecutive points
7"""
9from __future__ import annotations
11import csv
12import functools
13import io
14import math
15import os
16import re
17from dataclasses import dataclass
18from datetime import datetime, timezone
19from typing import Any
20import defusedxml.ElementTree as ET # guards against XML bomb / entity expansion
21from xml.etree.ElementTree import ParseError as _ETParseError
23# ── Constants ─────────────────────────────────────────────────────────────────
# Unit conversion factors.
_MS_TO_KT = 1.94384  # m/s → knots
_FT_TO_M = 0.3048  # ft → metres
_KM_PER_NM = 1.852  # km per nautical mile

# Thresholds for track classification, segment splitting, airport matching
# and GeoJSON downsampling.
_FLIGHT_SPEED_KT = 30.0  # sustained above this → airborne
_GROUND_MOVE_KT = 5.0  # above this (but not 30kt for 30s) → ground movement
_FLIGHT_SUSTAIN_S = 30.0  # seconds above 30kt required to classify as "flight"
_SEGMENT_GAP_S = 300.0  # 5 min of slow speed or time gap → segment break
_MAX_ICAO_DIST_KM = 5.0  # max distance for nearest-airport match
_MAX_TRACK_POINTS = 500  # downsample threshold for GeoJSON storage
36# ── Data structures ───────────────────────────────────────────────────────────
@dataclass
class TrackPoint:
    """A single GPS fix in the common representation produced by every parser."""

    lat: float  # latitude as read from the source file
    lon: float  # longitude as read from the source file
    alt_m: float  # altitude in metres (the Garmin parser converts from feet)
    speed_kt: float  # ground speed in knots (GPX converts from m/s; KML derives it)
    utc_dt: datetime  # always timezone-aware UTC
@dataclass
class FlightSegment:
    """One detected leg of a merged track, as built by detect_segments()."""

    trackpoints: list[TrackPoint]  # full-resolution points belonging to this segment
    block_off_utc: datetime  # timestamp of the segment's first point
    takeoff_utc: datetime | None  # first point at or above the flight-speed threshold
    landing_utc: datetime | None  # last point at or above the flight-speed threshold
    block_on_utc: datetime  # timestamp of the segment's last point
    departure_icao: str | None  # nearest airport to the first point, else the hint
    arrival_icao: str | None  # nearest airport to the last point, else the hint
    flight_time_raw_h: float  # block_on − block_off in decimal hours
    flight_time_rounded_h: float  # rounded per aircraft precision setting
    track_geojson: dict[str, Any]  # GeoJSON Feature
    landing_count: int  # number of fast → slow transitions inside the segment
    is_ground_only: bool  # True when no airborne portion detected
    hint_departure_icao: str | None  # caller-supplied hint; set on the first segment only
    hint_arrival_icao: str | None  # caller-supplied hint; set on the last segment only
@dataclass
class ParsedGpsFile:
    """Result of parsing one uploaded GPS log file."""

    trackpoints: list[TrackPoint]  # points in file order
    format: str  # "gpx" | "kml" | "garmin_csv"
    source_filename: str  # filename passed to the parser, stored unchanged
    classification: str  # "flight" | "ground_movement" | "empty"
    hint_departure_icao: str | None  # ICAO code taken from the track name or filename
    hint_arrival_icao: str | None  # ICAO code taken from the track name (never set for Garmin CSV)
    device_id: str | None = None  # avionics unit identifier (e.g. Garmin system_id)
77# ── Airport database ──────────────────────────────────────────────────────────
@functools.lru_cache(maxsize=1)
def _load_airports() -> dict[str, tuple[float, float]]:
    """Load app/data/airports.csv once. Returns {icao: (lat, lon)}.

    Only 4-letter ICAO codes are included. Returns an empty dict if the
    data file is missing (ICAO lookup will return None for all queries).

    Rows that are truncated (fewer cells than the header) or carry
    non-numeric coordinates are skipped rather than aborting the load.
    The result is cached, so callers must treat the returned dict as
    read-only.
    """
    data_path = os.path.join(os.path.dirname(__file__), "..", "data", "airports.csv")
    airports: dict[str, tuple[float, float]] = {}

    try:
        with open(data_path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                # DictReader fills the missing cells of a short row with None,
                # so normalise before calling str methods.
                ident = (row.get("ident") or "").strip()
                if not re.match(r"^[A-Z]{4}$", ident):
                    continue
                try:
                    lat = float(row["latitude_deg"])
                    lon = float(row["longitude_deg"])
                except (ValueError, KeyError, TypeError):
                    # TypeError: the cell is None because the row was truncated.
                    continue
                airports[ident] = (lat, lon)
    except FileNotFoundError:
        # A missing data file is an expected deployment state, not an error.
        pass

    return airports
def _reset_airports_cache() -> None:
    """Reset the airport cache (for testing).

    Clears the lru_cache on _load_airports so that its next call rebuilds the
    airport table instead of returning the cached dict.
    """
    _load_airports.cache_clear()
112# ── Haversine ─────────────────────────────────────────────────────────────────
def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometres.

    Inputs are decimal degrees; the Earth is modelled as a sphere of radius
    6371 km. NaN inputs propagate to a NaN result.
    """
    earth_radius_km = 6371.0
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    )
    # Floating-point rounding can push `a` a hair above 1 for (near-)antipodal
    # points, which would make sqrt(1 - a) raise "math domain error".
    # A comparison (not min/max) is used so NaN is left untouched.
    if a > 1.0:
        a = 1.0
    return earth_radius_km * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
129# ── Format detection ──────────────────────────────────────────────────────────
def detect_format(data: bytes, filename: str) -> str:
    """Return "gpx", "kml", or "garmin_csv". Raise ValueError for unknown format.

    GPX and KML are recognised by file extension alone (case-insensitive).
    A .csv file is accepted only when its first line starts with the Garmin
    "#airframe_info" marker; a UTF-8 BOM in front of the marker is tolerated.
    """
    ext = os.path.splitext(filename.lower())[1]
    if ext == ".gpx":
        return "gpx"
    if ext == ".kml":
        return "kml"
    if ext == ".csv":
        # Only the first few bytes matter; decoding and splitting the whole
        # (potentially multi-megabyte) log just to look at its prefix is wasteful.
        head = data[:64].decode("utf-8-sig", errors="replace")
        if head.startswith("#airframe_info"):
            return "garmin_csv"
    raise ValueError(f"Unsupported GPS file format: {filename!r}")
146# ── File classification ───────────────────────────────────────────────────────
def classify_track(trackpoints: list[TrackPoint]) -> str:
    """Return "flight", "ground_movement", or "empty".

    "empty" covers both a track without points and one whose top speed never
    exceeds the ground-movement threshold. "flight" requires an uninterrupted
    run of points above the flight-speed threshold whose accumulated duration
    reaches the sustain time; everything else is "ground_movement".
    """
    if not trackpoints:
        return "empty"
    if max(point.speed_kt for point in trackpoints) <= _GROUND_MOVE_KT:
        return "empty"

    sustained_s = 0.0
    for idx in range(1, len(trackpoints)):
        current = trackpoints[idx]
        if not current.speed_kt > _FLIGHT_SPEED_KT:
            # Any slower point interrupts the run; start counting afresh.
            sustained_s = 0.0
            continue
        elapsed = (current.utc_dt - trackpoints[idx - 1].utc_dt).total_seconds()
        # Out-of-order timestamps must not shorten the accumulated window.
        sustained_s += max(0.0, elapsed)
        if sustained_s >= _FLIGHT_SUSTAIN_S:
            return "flight"

    return "ground_movement"
172# ── GPX parser ────────────────────────────────────────────────────────────────
# XML namespace of GPX 1.1 elements; used to build qualified tag names for lookups.
_GPX_NS = "http://www.topografix.com/GPX/1/1"
def _extract_icao_hints(text: str) -> tuple[str | None, str | None]:
    """Pull departure/arrival ICAO candidates out of a track name.

    Every stand-alone word of exactly four upper-case ASCII letters counts as
    a candidate. The first candidate is the departure; the last one is the
    arrival, but only when at least two candidates were found.
    """
    codes = re.findall(r"\b([A-Z]{4})\b", text)
    if not codes:
        return None, None
    if len(codes) == 1:
        return codes[0], None
    return codes[0], codes[-1]
def _parse_gpx(data: bytes, filename: str) -> ParsedGpsFile:
    """Parse GPX 1.1 track. Speed field is in m/s; converted to kt.

    Track points without usable lat/lon or without a parseable <time> are
    skipped. A missing or malformed <ele> / <speed> falls back to 0.0 instead
    of aborting the whole file, matching the Garmin CSV parser. Timestamps
    are normalised to timezone-aware UTC: a value without an offset is taken
    to be UTC already, any other offset is converted.

    Raises:
        ValueError: if the payload is not well-formed XML.
    """
    try:
        root = ET.fromstring(data.decode("utf-8-sig", errors="replace"))
    except _ETParseError as exc:
        raise ValueError(f"Invalid GPX XML in {filename!r}: {exc}") from exc

    hint_dep: str | None = None
    hint_arr: str | None = None
    name_el = root.find(f".//{{{_GPX_NS}}}name")
    if name_el is not None and name_el.text:
        hint_dep, hint_arr = _extract_icao_hints(name_el.text)

    trackpoints: list[TrackPoint] = []
    for trkpt in root.findall(f".//{{{_GPX_NS}}}trkpt"):
        try:
            lat = float(trkpt.get("lat", ""))
            lon = float(trkpt.get("lon", ""))
        except (ValueError, TypeError):
            continue

        ele_el = trkpt.find(f"{{{_GPX_NS}}}ele")
        try:
            alt_m = float(ele_el.text) if ele_el is not None and ele_el.text else 0.0
        except ValueError:
            alt_m = 0.0  # unreadable elevation must not discard the position

        speed_el = trkpt.find(f"{{{_GPX_NS}}}speed")
        try:
            speed_kt = (
                float(speed_el.text) * _MS_TO_KT
                if speed_el is not None and speed_el.text
                else 0.0
            )
        except ValueError:
            speed_kt = 0.0

        time_el = trkpt.find(f"{{{_GPX_NS}}}time")
        if time_el is None or not time_el.text:
            continue
        try:
            stamp = datetime.fromisoformat(time_el.text.strip().replace("Z", "+00:00"))
        except ValueError:
            continue
        if stamp.tzinfo is None:
            # A naive value would later fail when sorted against or subtracted
            # from aware datetimes; the module docstring says GPX times are UTC.
            stamp = stamp.replace(tzinfo=timezone.utc)
        utc_dt = stamp.astimezone(timezone.utc)

        trackpoints.append(
            TrackPoint(lat=lat, lon=lon, alt_m=alt_m, speed_kt=speed_kt, utc_dt=utc_dt)
        )

    return ParsedGpsFile(
        trackpoints=trackpoints,
        format="gpx",
        source_filename=filename,
        classification=classify_track(trackpoints),
        hint_departure_icao=hint_dep,
        hint_arrival_icao=hint_arr,
    )
238# ── Garmin CSV parser ─────────────────────────────────────────────────────────
# GPSfix column values accepted by the Garmin CSV parser; rows with any other value are skipped.
_VALID_GPS_FIX = {"3D", "3DDiff"}
def _parse_garmin_csv(data: bytes, filename: str) -> ParsedGpsFile:
    """Parse Garmin GTN/G1000 CSV with 3-row header.

    Row 0: #airframe_info metadata
    Row 1: unit labels
    Row 2: column names (Lcl Date, Lcl Time, UTCOfst, Latitude, Longitude, AltMSL, GndSpd, …, GPSfix, …)
    Only rows with GPSfix in {"3D", "3DDiff"} are used.
    Departure ICAO hint is extracted from filename: log_YYMMDD_HHMMSS_ICAO.csv

    Rows with an unreadable position or timestamp are skipped; an unreadable
    altitude or ground speed falls back to 0.0. Truncated rows (fewer cells
    than the header) are handled the same way instead of raising.

    Raises:
        ValueError: if the file has fewer than four lines.
    """
    text = data.decode("utf-8-sig", errors="replace")
    lines = text.splitlines()

    if len(lines) < 4:
        raise ValueError(f"Garmin CSV too short: {filename!r}")

    # Device ID from #airframe_info header line
    device_id: str | None = None
    did_match = re.search(r'system_id="([^"]+)"', lines[0])
    if did_match:
        device_id = did_match.group(1)

    # Departure ICAO from filename pattern
    hint_dep: str | None = None
    base = os.path.splitext(os.path.basename(filename))[0]
    parts = base.split("_")
    if len(parts) >= 4:
        candidate = parts[-1].strip()
        if re.match(r"^[A-Z]{4}$", candidate):
            hint_dep = candidate

    # Skip rows 0–1 (metadata + units), use row 2 as header
    csv_text = "\n".join(lines[2:])
    reader = csv.DictReader(io.StringIO(csv_text))
    if reader.fieldnames:
        reader.fieldnames = [f.strip() for f in reader.fieldnames]

    def cell(row: dict[str, Any], column: str) -> str:
        """Return the stripped cell text; KeyError if the column is absent."""
        # DictReader fills the missing cells of a truncated row with None.
        return (row[column] or "").strip()

    trackpoints: list[TrackPoint] = []
    for row in reader:
        gpsfx = (row.get("GPSfix") or "").strip()
        if gpsfx not in _VALID_GPS_FIX:
            continue

        try:
            lat = float(cell(row, "Latitude"))
            lon = float(cell(row, "Longitude"))
        except (ValueError, KeyError):
            continue

        try:
            alt_m = float(cell(row, "AltMSL")) * _FT_TO_M
        except (ValueError, KeyError):
            alt_m = 0.0

        try:
            speed_kt = float(cell(row, "GndSpd"))
        except (ValueError, KeyError):
            speed_kt = 0.0

        try:
            date_str = cell(row, "Lcl Date")
            time_str = cell(row, "Lcl Time")
            utc_off = cell(row, "UTCOfst")
            local_dt = datetime.fromisoformat(f"{date_str}T{time_str}{utc_off}")
            # NOTE(review): with an empty UTCOfst, local_dt is naive and
            # astimezone() applies the host's timezone — confirm whether such
            # rows should be dropped instead.
            utc_dt = local_dt.astimezone(timezone.utc)
        except (ValueError, KeyError):
            continue

        trackpoints.append(
            TrackPoint(lat=lat, lon=lon, alt_m=alt_m, speed_kt=speed_kt, utc_dt=utc_dt)
        )

    return ParsedGpsFile(
        trackpoints=trackpoints,
        format="garmin_csv",
        source_filename=filename,
        classification=classify_track(trackpoints),
        hint_departure_icao=hint_dep,
        hint_arrival_icao=None,
        device_id=device_id,
    )
325# ── KML parser ────────────────────────────────────────────────────────────────
# XML namespaces used to build qualified tag names when walking a KML document:
# core KML 2.2 elements (Placemark, name, when) and Google's gx extension (Track, coord).
_KML_NS = "http://www.opengis.net/kml/2.2"
_GX_NS = "http://www.google.com/kml/ext/2.2"
def _kml_when(text: str | None) -> datetime | None:
    """Parse a <when> value into aware UTC, or None when it is unusable."""
    if not text:
        return None
    try:
        stamp = datetime.fromisoformat(text.strip().replace("Z", "+00:00"))
    except ValueError:
        return None
    if stamp.tzinfo is None:
        # astimezone() on a naive value would silently apply the host's
        # local timezone; treat offset-less stamps as UTC instead.
        stamp = stamp.replace(tzinfo=timezone.utc)
    return stamp.astimezone(timezone.utc)


def _kml_coord(text: str | None) -> tuple[float, float, float] | None:
    """Parse a <gx:coord> "lon lat [alt]" value, or None when it is unusable."""
    parts = (text or "").split()
    if len(parts) < 2:
        return None
    try:
        lon = float(parts[0])
        lat = float(parts[1])
        alt = float(parts[2]) if len(parts) >= 3 else 0.0
    except ValueError:
        return None
    return lon, lat, alt


def _parse_kml(data: bytes, filename: str) -> ParsedGpsFile:
    """Parse SkyDemon KML with gx:Track.

    Coordinate order is lon/lat/alt (note: reversed from GPX).
    Speed is derived from consecutive point distance / time delta.

    <when> and <gx:coord> children are paired by position. A pair is dropped
    when either half cannot be parsed, so a broken coordinate never turns
    into a fake fix at lat/lon 0. A coordinate without altitude gets 0.0 m.

    Raises:
        ValueError: if the XML is malformed, there is no gx:Track element, or
            the numbers of <when> and <gx:coord> children differ.
    """
    try:
        root = ET.fromstring(data.decode("utf-8-sig", errors="replace"))
    except _ETParseError as exc:
        raise ValueError(f"Invalid KML XML in {filename!r}: {exc}") from exc

    hint_dep: str | None = None
    hint_arr: str | None = None
    for pm in root.findall(f".//{{{_KML_NS}}}Placemark"):
        name_el = pm.find(f"{{{_KML_NS}}}name")
        if name_el is not None and name_el.text:
            dep, arr = _extract_icao_hints(name_el.text)
            if dep and arr:
                # Use the first placemark name that yields both codes.
                hint_dep, hint_arr = dep, arr
                break

    track_el = root.find(f".//{{{_GX_NS}}}Track")
    if track_el is None:
        raise ValueError(f"No gx:Track element in KML: {filename!r}")

    when_tag = f"{{{_KML_NS}}}when"
    coord_tag = f"{{{_GX_NS}}}coord"
    # None entries keep the two lists positionally aligned for the count check.
    whens: list[datetime | None] = []
    coords: list[tuple[float, float, float] | None] = []
    for child in track_el:
        if child.tag == when_tag:
            whens.append(_kml_when(child.text))
        elif child.tag == coord_tag:
            coords.append(_kml_coord(child.text))

    if len(whens) != len(coords):
        raise ValueError(
            f"KML when/coord count mismatch in {filename!r}: "
            f"{len(whens)} vs {len(coords)}"
        )

    trackpoints: list[TrackPoint] = []
    for when, coord in zip(whens, coords):
        if when is None or coord is None:
            continue
        lon, lat, alt_m = coord

        if trackpoints:
            prev = trackpoints[-1]
            dt_s = (when - prev.utc_dt).total_seconds()
            dist_km = _haversine_km(prev.lat, prev.lon, lat, lon)
            # km → NM, then per-second → per-hour; non-advancing time gives 0.
            speed_kt = (dist_km / _KM_PER_NM * 3600.0 / dt_s) if dt_s > 0 else 0.0
        else:
            speed_kt = 0.0

        trackpoints.append(
            TrackPoint(lat=lat, lon=lon, alt_m=alt_m, speed_kt=speed_kt, utc_dt=when)
        )

    return ParsedGpsFile(
        trackpoints=trackpoints,
        format="kml",
        source_filename=filename,
        classification=classify_track(trackpoints),
        hint_departure_icao=hint_dep,
        hint_arrival_icao=hint_arr,
    )
416# ── Entry point ───────────────────────────────────────────────────────────────
def parse_gps_file(data: bytes, filename: str) -> ParsedGpsFile:
    """Identify the log format and hand the payload to the matching parser.

    Raises ValueError when the format is unsupported or the data is invalid.
    """
    fmt = detect_format(data, filename)
    xml_parsers = {"gpx": _parse_gpx, "kml": _parse_kml}
    # detect_format only ever yields three names; anything that is not one of
    # the XML formats is the Garmin CSV.
    parser = xml_parsers.get(fmt, _parse_garmin_csv)
    return parser(data, filename)
429# ── Track merge ───────────────────────────────────────────────────────────────
def merge_and_sort(files: list[ParsedGpsFile]) -> list[TrackPoint]:
    """Combine the points of every non-"empty" file into one time-ordered list.

    The sort is stable, so points sharing a timestamp keep the order in which
    their files (and the points within each file) were supplied.
    """
    return sorted(
        (
            point
            for parsed in files
            if parsed.classification != "empty"
            for point in parsed.trackpoints
        ),
        key=lambda point: point.utc_dt,
    )
442# ── Segment detection ─────────────────────────────────────────────────────────
def _split_into_raw_groups(trackpoints: list[TrackPoint]) -> list[list[TrackPoint]]:
    """Split merged trackpoints into groups at slow/time gaps ≥ 5 min.

    Only looks for breaks between the first and last fast (≥ 30kt) points, so
    pre-flight taxi and post-landing taxi are preserved in the enclosing segment.

    Returns an empty list for empty input, and a single group containing every
    point when no point reaches the flight-speed threshold. Points are
    expected in chronological order (as produced by merge_and_sort).
    """
    n = len(trackpoints)
    if n == 0:
        return []

    fast_indices = [
        i for i, tp in enumerate(trackpoints) if tp.speed_kt >= _FLIGHT_SPEED_KT
    ]
    if not fast_indices:
        return [trackpoints]

    first_fast = fast_indices[0]
    last_fast = fast_indices[-1]

    groups: list[list[TrackPoint]] = []
    current_start = 0  # index of the first point of the group being built
    i = first_fast

    # Invariant: trackpoints[last_fast] is fast, so i + 1 and the inner scan
    # index j never run past last_fast.
    while i < last_fast:
        # Large time gap between consecutive points (gap between uploaded files)
        time_gap = (trackpoints[i + 1].utc_dt - trackpoints[i].utc_dt).total_seconds()
        if time_gap >= _SEGMENT_GAP_S:
            groups.append(trackpoints[current_start : i + 1])
            current_start = i + 1
            i += 1
            continue

        # Slow run starting at i+1
        if trackpoints[i + 1].speed_kt < _FLIGHT_SPEED_KT:
            # Advance j to the first fast point after the slow run.
            j = i + 2
            while j <= last_fast and trackpoints[j].speed_kt < _FLIGHT_SPEED_KT:
                j += 1
            # Duration from the first to the last slow point of the run.
            slow_dur = (
                trackpoints[j - 1].utc_dt - trackpoints[i + 1].utc_dt
            ).total_seconds()
            if slow_dur >= _SEGMENT_GAP_S:
                # Real segment break — exclude slow gap from both segments
                groups.append(trackpoints[current_start : i + 1])
                current_start = j
                i = j
            else:
                i = j  # short slow run — keep in current segment
            # NOTE(review): because i jumps straight to j, the time gap between
            # the last slow point (j - 1) and the next fast point (j) is never
            # compared against _SEGMENT_GAP_S — confirm this is intended.
        else:
            i += 1

    # Everything from current_start to the end (including trailing slow
    # points after last_fast) forms the final group.
    groups.append(trackpoints[current_start:])
    return [g for g in groups if g]
def _count_landings(pts: list[TrackPoint]) -> int:
    """Count how often the track drops from ≥30kt to <30kt between neighbouring points."""
    airborne = [tp.speed_kt >= _FLIGHT_SPEED_KT for tp in pts]
    return sum(
        1
        for before, after in zip(airborne, airborne[1:])
        if before and not after
    )
def detect_segments(
    trackpoints: list[TrackPoint],
    aircraft_precision: str = "tenth_hour",
    hint_dep: str | None = None,
    hint_arr: str | None = None,
) -> list[FlightSegment]:
    """Turn merged trackpoints into one FlightSegment per detected leg.

    hint_dep / hint_arr are optional ICAO codes (from GPX/KML track names or
    Garmin filename patterns). They serve as a fallback when no airport lies
    close enough to the segment's end points, and apply only to the first
    (departure) and last (arrival) segment respectively.
    """
    raw_groups = _split_into_raw_groups(trackpoints)
    airports = _load_airports()
    final_idx = len(raw_groups) - 1
    segments: list[FlightSegment] = []

    for idx, pts in enumerate(raw_groups):
        first_pt, last_pt = pts[0], pts[-1]
        dep_hint = hint_dep if idx == 0 else None
        arr_hint = hint_arr if idx == final_idx else None

        # Takeoff and landing are the first and last fixes at flight speed.
        fast_times = [tp.utc_dt for tp in pts if tp.speed_kt >= _FLIGHT_SPEED_KT]
        takeoff_utc = fast_times[0] if fast_times else None
        landing_utc = fast_times[-1] if fast_times else None

        raw_h = (last_pt.utc_dt - first_pt.utc_dt).total_seconds() / 3600.0

        segment = FlightSegment(
            trackpoints=pts,
            block_off_utc=first_pt.utc_dt,
            takeoff_utc=takeoff_utc,
            landing_utc=landing_utc,
            block_on_utc=last_pt.utc_dt,
            departure_icao=resolve_icao(first_pt.lat, first_pt.lon, airports) or dep_hint,
            arrival_icao=resolve_icao(last_pt.lat, last_pt.lon, airports) or arr_hint,
            flight_time_raw_h=raw_h,
            flight_time_rounded_h=round_flight_time(raw_h, aircraft_precision),
            track_geojson=build_geojson(downsample_track(pts)),
            landing_count=_count_landings(pts),
            is_ground_only=takeoff_utc is None,
            hint_departure_icao=dep_hint,
            hint_arrival_icao=arr_hint,
        )
        segments.append(segment)

    return segments
576# ── ICAO resolution ───────────────────────────────────────────────────────────
def resolve_icao(
    lat: float,
    lon: float,
    airports: dict[str, tuple[float, float]] | None = None,
) -> str | None:
    """Look up the closest airport strictly nearer than the 5 km limit.

    When `airports` is omitted the cached airport table is used. Returns the
    ICAO code, or None if no airport is close enough. Among airports at the
    same minimal distance, the one appearing first in the mapping wins.
    """
    table = _load_airports() if airports is None else airports

    in_range = [
        (dist, code)
        for dist, code in (
            (_haversine_km(lat, lon, ap_lat, ap_lon), code)
            for code, (ap_lat, ap_lon) in table.items()
        )
        if dist < _MAX_ICAO_DIST_KM
    ]
    if not in_range:
        return None
    # min() keeps the first of equal keys, matching a strict "<" running minimum.
    return min(in_range, key=lambda pair: pair[0])[1]
600# ── Time rounding ─────────────────────────────────────────────────────────────
def round_flight_time(raw_hours: float, precision: str) -> float:
    """Round raw_hours up to the nearest precision boundary.

    precision="tenth_hour": round up to nearest 0.1 h (6-min boundary).
    precision="minute": round up to nearest 1/60 h (1-min boundary).

    Any other precision value is treated as "tenth_hour". Zero or negative
    input returns 0.0. A duration that sits on a boundary up to binary
    floating-point noise stays on that boundary instead of being pushed to
    the next one.
    """
    if raw_hours <= 0:
        return 0.0
    if precision == "minute":
        units_per_hour, digits = 60, 4
    else:
        # tenth_hour: ceiling to nearest 0.1
        units_per_hour, digits = 10, 1
    # The product of two floats can land a few ULPs above an exact boundary
    # (e.g. 1860 / 3600 * 60 == 31.000000000000004), which a bare ceil() would
    # bump to the next unit. Nine decimals is far below any real GPS time
    # resolution, so rounding first removes only the noise.
    units = math.ceil(round(raw_hours * units_per_hour, 9))
    return round(units / units_per_hour, digits)
618# ── GeoJSON / downsampling ────────────────────────────────────────────────────
def downsample_track(
    trackpoints: list[TrackPoint], max_points: int = _MAX_TRACK_POINTS
) -> list[TrackPoint]:
    """Return ≤ max_points trackpoints using uniform stride; first and last preserved.

    The input list itself is returned when it already fits. Otherwise points
    are picked at evenly spaced indices running from the first to the last
    point, so the result never exceeds max_points. Because both end points
    are always kept, a max_points below 2 yields just the end points.
    """
    n = len(trackpoints)
    if n <= max_points:
        return trackpoints

    last = n - 1
    if max_points < 2:
        # Also avoids a division by zero for max_points == 0 or 1.
        indices = {0, last}
    else:
        # Integer arithmetic: i == 0 maps to 0 and i == max_points - 1 maps
        # exactly to `last`, so no extra end-point insertion (which could
        # overshoot the limit by one) is needed.
        indices = {(i * last) // (max_points - 1) for i in range(max_points)}
    return [trackpoints[i] for i in sorted(indices)]
def build_geojson(trackpoints: list[TrackPoint]) -> dict[str, Any]:
    """Build a GeoJSON Feature whose geometry is a LineString of the track.

    Each position is [lon, lat, alt_m], the axis order required by GeoJSON
    (RFC 7946), with lon/lat rounded to 6 decimals and altitude to 1.
    The properties hold altitudes_m and speeds_kt arrays (1 decimal each),
    index-aligned with the coordinates, for colour-gradient rendering in
    Leaflet.
    """
    altitudes = [round(tp.alt_m, 1) for tp in trackpoints]
    speeds = [round(tp.speed_kt, 1) for tp in trackpoints]
    positions = [
        [round(tp.lon, 6), round(tp.lat, 6), alt]
        for tp, alt in zip(trackpoints, altitudes)
    ]
    geometry = {"type": "LineString", "coordinates": positions}
    properties = {"altitudes_m": altitudes, "speeds_kt": speeds}
    return {"type": "Feature", "geometry": geometry, "properties": properties}