Coverage for app/aircraft/gps_import.py: 100%

358 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1"""GPS log file parsing for aircraft logbook import — Phase 30. 

2 

3Supported formats: 

4- GPX 1.1 (SkyDemon, ForeFlight): speed in m/s, UTC timestamps 

5- Garmin GTN/G1000 CSV: 3-row header, local time + UTC offset, GndSpd in kt 

6- KML with gx:Track (SkyDemon): lon/lat/alt order, speed derived from consecutive points 

7""" 

8 

9from __future__ import annotations 

10 

11import csv 

12import functools 

13import io 

14import math 

15import os 

16import re 

17from dataclasses import dataclass 

18from datetime import datetime, timezone 

19from typing import Any 

20import defusedxml.ElementTree as ET # guards against XML bomb / entity expansion 

21from xml.etree.ElementTree import ParseError as _ETParseError 

22 

23# ── Constants ───────────────────────────────────────────────────────────────── 

24 

25_MS_TO_KT = 1.94384 # m/s → knots 

26_FT_TO_M = 0.3048 # ft → metres 

27_KM_PER_NM = 1.852 # km per nautical mile 

28 

29_FLIGHT_SPEED_KT = 30.0 # sustained above this → airborne 

30_GROUND_MOVE_KT = 5.0 # above this (but not 30kt for 30s) → ground movement 

31_FLIGHT_SUSTAIN_S = 30.0 # seconds above 30kt required to classify as "flight" 

32_SEGMENT_GAP_S = 300.0 # 5 min of slow speed or time gap → segment break 

33_MAX_ICAO_DIST_KM = 5.0 # max distance for nearest-airport match 

34_MAX_TRACK_POINTS = 500 # downsample threshold for GeoJSON storage 

35 

36# ── Data structures ─────────────────────────────────────────────────────────── 

37 

38 

39@dataclass 

40class TrackPoint: 

41 lat: float 

42 lon: float 

43 alt_m: float 

44 speed_kt: float 

45 utc_dt: datetime # always timezone-aware UTC 

46 

47 

48@dataclass 

49class FlightSegment: 

50 trackpoints: list[TrackPoint] 

51 block_off_utc: datetime 

52 takeoff_utc: datetime | None 

53 landing_utc: datetime | None 

54 block_on_utc: datetime 

55 departure_icao: str | None 

56 arrival_icao: str | None 

57 flight_time_raw_h: float # block_on − block_off in decimal hours 

58 flight_time_rounded_h: float # rounded per aircraft precision setting 

59 track_geojson: dict[str, Any] # GeoJSON Feature 

60 landing_count: int 

61 is_ground_only: bool # True when no airborne portion detected 

62 hint_departure_icao: str | None 

63 hint_arrival_icao: str | None 

64 

65 

66@dataclass 

67class ParsedGpsFile: 

68 trackpoints: list[TrackPoint] 

69 format: str # "gpx" | "kml" | "garmin_csv" 

70 source_filename: str 

71 classification: str # "flight" | "ground_movement" | "empty" 

72 hint_departure_icao: str | None 

73 hint_arrival_icao: str | None 

74 device_id: str | None = None # avionics unit identifier (e.g. Garmin system_id) 

75 

76 

77# ── Airport database ────────────────────────────────────────────────────────── 

78 

79 

80@functools.lru_cache(maxsize=1) 

81def _load_airports() -> dict[str, tuple[float, float]]: 

82 """Load app/data/airports.csv once. Returns {icao: (lat, lon)}. 

83 

84 Only 4-letter ICAO codes are included. Returns an empty dict if the 

85 data file is missing (ICAO lookup will return None for all queries). 

86 """ 

87 data_path = os.path.join(os.path.dirname(__file__), "..", "data", "airports.csv") 

88 airports: dict[str, tuple[float, float]] = {} 

89 

90 if os.path.exists(data_path): 

91 with open(data_path, newline="", encoding="utf-8") as f: 

92 reader = csv.DictReader(f) 

93 for row in reader: 

94 ident = row.get("ident", "").strip() 

95 if not re.match(r"^[A-Z]{4}$", ident): 

96 continue 

97 try: 

98 lat = float(row["latitude_deg"]) 

99 lon = float(row["longitude_deg"]) 

100 except (ValueError, KeyError): 

101 continue 

102 airports[ident] = (lat, lon) 

103 

104 return airports 

105 

106 

107def _reset_airports_cache() -> None: 

108 """Reset the airport cache (for testing).""" 

109 _load_airports.cache_clear() 

110 

111 

112# ── Haversine ───────────────────────────────────────────────────────────────── 

113 

114 

115def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float: 

116 """Great-circle distance in kilometres.""" 

117 R = 6371.0 

118 phi1 = math.radians(lat1) 

119 phi2 = math.radians(lat2) 

120 dphi = math.radians(lat2 - lat1) 

121 dlambda = math.radians(lon2 - lon1) 

122 a = ( 

123 math.sin(dphi / 2) ** 2 

124 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2 

125 ) 

126 return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) 

127 

128 

129# ── Format detection ────────────────────────────────────────────────────────── 

130 

131 

132def detect_format(data: bytes, filename: str) -> str: 

133 """Return "gpx", "kml", or "garmin_csv". Raise ValueError for unknown format.""" 

134 ext = os.path.splitext(filename.lower())[1] 

135 if ext == ".gpx": 

136 return "gpx" 

137 if ext == ".kml": 

138 return "kml" 

139 if ext == ".csv": 

140 lines = data.decode("utf-8-sig", errors="replace").splitlines() 

141 if lines and lines[0].startswith("#airframe_info"): 

142 return "garmin_csv" 

143 raise ValueError(f"Unsupported GPS file format: {filename!r}") 

144 

145 

146# ── File classification ─────────────────────────────────────────────────────── 

147 

148 

149def classify_track(trackpoints: list[TrackPoint]) -> str: 

150 """Return "flight", "ground_movement", or "empty".""" 

151 if not trackpoints: 

152 return "empty" 

153 

154 max_speed = max(tp.speed_kt for tp in trackpoints) 

155 if max_speed <= _GROUND_MOVE_KT: 

156 return "empty" 

157 

158 # Check for sustained window above 30kt 

159 fast_window_s = 0.0 

160 for prev, tp in zip(trackpoints, trackpoints[1:]): 

161 if tp.speed_kt > _FLIGHT_SPEED_KT: 

162 dt = (tp.utc_dt - prev.utc_dt).total_seconds() 

163 fast_window_s += max(0.0, dt) 

164 if fast_window_s >= _FLIGHT_SUSTAIN_S: 

165 return "flight" 

166 else: 

167 fast_window_s = 0.0 

168 

169 return "ground_movement" 

170 

171 

172# ── GPX parser ──────────────────────────────────────────────────────────────── 

173 

174_GPX_NS = "http://www.topografix.com/GPX/1/1" 

175 

176 

177def _extract_icao_hints(text: str) -> tuple[str | None, str | None]: 

178 """Extract departure and arrival ICAO codes from a track name string.""" 

179 icao_matches = re.findall(r"\b([A-Z]{4})\b", text) 

180 dep = icao_matches[0] if len(icao_matches) >= 1 else None 

181 arr = icao_matches[-1] if len(icao_matches) >= 2 else None 

182 return dep, arr 

183 

184 

185def _parse_gpx(data: bytes, filename: str) -> ParsedGpsFile: 

186 """Parse GPX 1.1 track. Speed field is in m/s; converted to kt.""" 

187 try: 

188 root = ET.fromstring(data.decode("utf-8-sig", errors="replace")) 

189 except _ETParseError as exc: 

190 raise ValueError(f"Invalid GPX XML in {filename!r}: {exc}") from exc 

191 

192 hint_dep: str | None = None 

193 hint_arr: str | None = None 

194 name_el = root.find(f".//{{{_GPX_NS}}}name") 

195 if name_el is not None and name_el.text: 

196 hint_dep, hint_arr = _extract_icao_hints(name_el.text) 

197 

198 trackpoints: list[TrackPoint] = [] 

199 for trkpt in root.findall(f".//{{{_GPX_NS}}}trkpt"): 

200 try: 

201 lat = float(trkpt.get("lat", "")) 

202 lon = float(trkpt.get("lon", "")) 

203 except (ValueError, TypeError): 

204 continue 

205 

206 ele_el = trkpt.find(f"{{{_GPX_NS}}}ele") 

207 alt_m = float(ele_el.text) if ele_el is not None and ele_el.text else 0.0 

208 

209 speed_el = trkpt.find(f"{{{_GPX_NS}}}speed") 

210 speed_kt = ( 

211 float(speed_el.text) * _MS_TO_KT 

212 if speed_el is not None and speed_el.text 

213 else 0.0 

214 ) 

215 

216 time_el = trkpt.find(f"{{{_GPX_NS}}}time") 

217 if time_el is None or not time_el.text: 

218 continue 

219 try: 

220 utc_dt = datetime.fromisoformat(time_el.text.replace("Z", "+00:00")) 

221 except ValueError: 

222 continue 

223 

224 trackpoints.append( 

225 TrackPoint(lat=lat, lon=lon, alt_m=alt_m, speed_kt=speed_kt, utc_dt=utc_dt) 

226 ) 

227 

228 return ParsedGpsFile( 

229 trackpoints=trackpoints, 

230 format="gpx", 

231 source_filename=filename, 

232 classification=classify_track(trackpoints), 

233 hint_departure_icao=hint_dep, 

234 hint_arrival_icao=hint_arr, 

235 ) 

236 

237 

238# ── Garmin CSV parser ───────────────────────────────────────────────────────── 

239 

240_VALID_GPS_FIX = {"3D", "3DDiff"} 

241 

242 

243def _parse_garmin_csv(data: bytes, filename: str) -> ParsedGpsFile: 

244 """Parse Garmin GTN/G1000 CSV with 3-row header. 

245 

246 Row 0: #airframe_info metadata 

247 Row 1: unit labels 

248 Row 2: column names (Lcl Date, Lcl Time, UTCOfst, Latitude, Longitude, AltMSL, GndSpd, …, GPSfix, …) 

249 Only rows with GPSfix in {"3D", "3DDiff"} are used. 

250 Departure ICAO hint is extracted from filename: log_YYMMDD_HHMMSS_ICAO.csv 

251 """ 

252 text = data.decode("utf-8-sig", errors="replace") 

253 lines = text.splitlines() 

254 

255 if len(lines) < 4: 

256 raise ValueError(f"Garmin CSV too short: {filename!r}") 

257 

258 # Device ID from #airframe_info header line 

259 device_id: str | None = None 

260 _did_match = re.search(r'system_id="([^"]+)"', lines[0]) 

261 if _did_match: 

262 device_id = _did_match.group(1) 

263 

264 # Departure ICAO from filename pattern 

265 hint_dep: str | None = None 

266 base = os.path.splitext(os.path.basename(filename))[0] 

267 parts = base.split("_") 

268 if len(parts) >= 4: 

269 candidate = parts[-1].strip() 

270 if re.match(r"^[A-Z]{4}$", candidate): 

271 hint_dep = candidate 

272 

273 # Skip rows 0–1 (metadata + units), use row 2 as header 

274 csv_text = "\n".join(lines[2:]) 

275 reader = csv.DictReader(io.StringIO(csv_text)) 

276 if reader.fieldnames: 

277 reader.fieldnames = [f.strip() for f in reader.fieldnames] 

278 

279 trackpoints: list[TrackPoint] = [] 

280 for row in reader: 

281 gpsfx = row.get("GPSfix", "").strip() 

282 if gpsfx not in _VALID_GPS_FIX: 

283 continue 

284 

285 try: 

286 lat = float(row["Latitude"].strip()) 

287 lon = float(row["Longitude"].strip()) 

288 except (ValueError, KeyError): 

289 continue 

290 

291 try: 

292 alt_m = float(row["AltMSL"].strip()) * _FT_TO_M 

293 except (ValueError, KeyError): 

294 alt_m = 0.0 

295 

296 try: 

297 speed_kt = float(row["GndSpd"].strip()) 

298 except (ValueError, KeyError): 

299 speed_kt = 0.0 

300 

301 try: 

302 date_str = row["Lcl Date"].strip() 

303 time_str = row["Lcl Time"].strip() 

304 utc_off = row["UTCOfst"].strip() 

305 local_dt = datetime.fromisoformat(f"{date_str}T{time_str}{utc_off}") 

306 utc_dt = local_dt.astimezone(timezone.utc) 

307 except (ValueError, KeyError): 

308 continue 

309 

310 trackpoints.append( 

311 TrackPoint(lat=lat, lon=lon, alt_m=alt_m, speed_kt=speed_kt, utc_dt=utc_dt) 

312 ) 

313 

314 return ParsedGpsFile( 

315 trackpoints=trackpoints, 

316 format="garmin_csv", 

317 source_filename=filename, 

318 classification=classify_track(trackpoints), 

319 hint_departure_icao=hint_dep, 

320 hint_arrival_icao=None, 

321 device_id=device_id, 

322 ) 

323 

324 

325# ── KML parser ──────────────────────────────────────────────────────────────── 

326 

327_KML_NS = "http://www.opengis.net/kml/2.2" 

328_GX_NS = "http://www.google.com/kml/ext/2.2" 

329 

330 

331def _parse_kml(data: bytes, filename: str) -> ParsedGpsFile: 

332 """Parse SkyDemon KML with gx:Track. 

333 

334 Coordinate order is lon/lat/alt (note: reversed from GPX). 

335 Speed is derived from consecutive point distance / time delta. 

336 """ 

337 try: 

338 root = ET.fromstring(data.decode("utf-8-sig", errors="replace")) 

339 except _ETParseError as exc: 

340 raise ValueError(f"Invalid KML XML in {filename!r}: {exc}") from exc 

341 

342 hint_dep: str | None = None 

343 hint_arr: str | None = None 

344 for pm in root.findall(f".//{{{_KML_NS}}}Placemark"): 

345 name_el = pm.find(f"{{{_KML_NS}}}name") 

346 if name_el is not None and name_el.text: 

347 dep, arr = _extract_icao_hints(name_el.text) 

348 if dep and arr: 

349 hint_dep, hint_arr = dep, arr 

350 break 

351 

352 track_el = root.find(f".//{{{_GX_NS}}}Track") 

353 if track_el is None: 

354 raise ValueError(f"No gx:Track element in KML: {filename!r}") 

355 

356 whens: list[datetime | None] = [] 

357 coords: list[tuple[float, float, float]] = [] 

358 

359 for child in track_el: 

360 if child.tag == f"{{{_KML_NS}}}when": 

361 if child.text: 

362 try: 

363 dt = datetime.fromisoformat(child.text.replace("Z", "+00:00")) 

364 whens.append(dt.astimezone(timezone.utc)) 

365 except ValueError: 

366 whens.append(None) 

367 else: 

368 whens.append(None) 

369 elif child.tag == f"{{{_GX_NS}}}coord": 

370 if child.text: 

371 parts = child.text.strip().split() 

372 if len(parts) >= 3: 

373 try: 

374 lon_c = float(parts[0]) 

375 lat_c = float(parts[1]) 

376 alt_c = float(parts[2]) 

377 except ValueError: 

378 lon_c, lat_c, alt_c = 0.0, 0.0, 0.0 

379 coords.append((lon_c, lat_c, alt_c)) 

380 continue 

381 coords.append((0.0, 0.0, 0.0)) 

382 

383 if len(whens) != len(coords): 

384 raise ValueError( 

385 f"KML when/coord count mismatch in {filename!r}: " 

386 f"{len(whens)} vs {len(coords)}" 

387 ) 

388 

389 trackpoints: list[TrackPoint] = [] 

390 for i, (when, (lon, lat, alt_m)) in enumerate(zip(whens, coords)): 

391 if when is None: 

392 continue 

393 

394 if trackpoints: 

395 prev = trackpoints[-1] 

396 dt_s = (when - prev.utc_dt).total_seconds() 

397 dist_km = _haversine_km(prev.lat, prev.lon, lat, lon) 

398 speed_kt = (dist_km / _KM_PER_NM * 3600.0 / dt_s) if dt_s > 0 else 0.0 

399 else: 

400 speed_kt = 0.0 

401 

402 trackpoints.append( 

403 TrackPoint(lat=lat, lon=lon, alt_m=alt_m, speed_kt=speed_kt, utc_dt=when) 

404 ) 

405 

406 return ParsedGpsFile( 

407 trackpoints=trackpoints, 

408 format="kml", 

409 source_filename=filename, 

410 classification=classify_track(trackpoints), 

411 hint_departure_icao=hint_dep, 

412 hint_arrival_icao=hint_arr, 

413 ) 

414 

415 

416# ── Entry point ─────────────────────────────────────────────────────────────── 

417 

418 

419def parse_gps_file(data: bytes, filename: str) -> ParsedGpsFile: 

420 """Detect format and parse. Raises ValueError on unsupported or invalid data.""" 

421 fmt = detect_format(data, filename) 

422 if fmt == "gpx": 

423 return _parse_gpx(data, filename) 

424 if fmt == "kml": 

425 return _parse_kml(data, filename) 

426 return _parse_garmin_csv(data, filename) 

427 

428 

429# ── Track merge ─────────────────────────────────────────────────────────────── 

430 

431 

432def merge_and_sort(files: list[ParsedGpsFile]) -> list[TrackPoint]: 

433 """Merge non-empty trackpoints from all files, sorted chronologically.""" 

434 all_pts: list[TrackPoint] = [] 

435 for f in files: 

436 if f.classification != "empty": 

437 all_pts.extend(f.trackpoints) 

438 all_pts.sort(key=lambda tp: tp.utc_dt) 

439 return all_pts 

440 

441 

442# ── Segment detection ───────────────────────────────────────────────────────── 

443 

444 

445def _split_into_raw_groups(trackpoints: list[TrackPoint]) -> list[list[TrackPoint]]: 

446 """Split merged trackpoints into groups at slow/time gaps ≥ 5 min. 

447 

448 Only looks for breaks between the first and last fast (≥ 30kt) points, so 

449 pre-flight taxi and post-landing taxi are preserved in the enclosing segment. 

450 """ 

451 n = len(trackpoints) 

452 if n == 0: 

453 return [] 

454 

455 fast_indices = [ 

456 i for i, tp in enumerate(trackpoints) if tp.speed_kt >= _FLIGHT_SPEED_KT 

457 ] 

458 if not fast_indices: 

459 return [trackpoints] 

460 

461 first_fast = fast_indices[0] 

462 last_fast = fast_indices[-1] 

463 

464 groups: list[list[TrackPoint]] = [] 

465 current_start = 0 

466 i = first_fast 

467 

468 while i < last_fast: 

469 # Large time gap between consecutive points (gap between uploaded files) 

470 time_gap = (trackpoints[i + 1].utc_dt - trackpoints[i].utc_dt).total_seconds() 

471 if time_gap >= _SEGMENT_GAP_S: 

472 groups.append(trackpoints[current_start : i + 1]) 

473 current_start = i + 1 

474 i += 1 

475 continue 

476 

477 # Slow run starting at i+1 

478 if trackpoints[i + 1].speed_kt < _FLIGHT_SPEED_KT: 

479 j = i + 2 

480 while j <= last_fast and trackpoints[j].speed_kt < _FLIGHT_SPEED_KT: 

481 j += 1 

482 slow_dur = ( 

483 trackpoints[j - 1].utc_dt - trackpoints[i + 1].utc_dt 

484 ).total_seconds() 

485 if slow_dur >= _SEGMENT_GAP_S: 

486 # Real segment break — exclude slow gap from both segments 

487 groups.append(trackpoints[current_start : i + 1]) 

488 current_start = j 

489 i = j 

490 else: 

491 i = j # short slow run — keep in current segment 

492 else: 

493 i += 1 

494 

495 groups.append(trackpoints[current_start:]) 

496 return [g for g in groups if g] 

497 

498 

499def _count_landings(pts: list[TrackPoint]) -> int: 

500 """Count transitions from airborne (≥30kt) to ground (<30kt).""" 

501 count = 0 

502 was_fast = False 

503 for tp in pts: 

504 is_fast = tp.speed_kt >= _FLIGHT_SPEED_KT 

505 if was_fast and not is_fast: 

506 count += 1 

507 was_fast = is_fast 

508 return count 

509 

510 

511def detect_segments( 

512 trackpoints: list[TrackPoint], 

513 aircraft_precision: str = "tenth_hour", 

514 hint_dep: str | None = None, 

515 hint_arr: str | None = None, 

516) -> list[FlightSegment]: 

517 """Build FlightSegment objects from merged trackpoints. 

518 

519 hint_dep / hint_arr are optional ICAO codes from GPX/KML track names or 

520 Garmin filename patterns, used as fallback when GPS-proximity lookup fails. 

521 """ 

522 raw_groups = _split_into_raw_groups(trackpoints) 

523 airports = _load_airports() 

524 segments: list[FlightSegment] = [] 

525 

526 for idx, pts in enumerate(raw_groups): 

527 block_off = pts[0].utc_dt 

528 block_on = pts[-1].utc_dt 

529 

530 takeoff_utc: datetime | None = None 

531 landing_utc: datetime | None = None 

532 for tp in pts: 

533 if tp.speed_kt >= _FLIGHT_SPEED_KT: 

534 if takeoff_utc is None: 

535 takeoff_utc = tp.utc_dt 

536 landing_utc = tp.utc_dt 

537 

538 is_ground_only = takeoff_utc is None 

539 landing_count = _count_landings(pts) 

540 

541 raw_h = (block_on - block_off).total_seconds() / 3600.0 

542 rounded_h = round_flight_time(raw_h, aircraft_precision) 

543 

544 dep_icao = resolve_icao(pts[0].lat, pts[0].lon, airports) or ( 

545 hint_dep if idx == 0 else None 

546 ) 

547 arr_icao = resolve_icao(pts[-1].lat, pts[-1].lon, airports) or ( 

548 hint_arr if idx == len(raw_groups) - 1 else None 

549 ) 

550 

551 downsampled = downsample_track(pts) 

552 geojson = build_geojson(downsampled) 

553 

554 segments.append( 

555 FlightSegment( 

556 trackpoints=pts, 

557 block_off_utc=block_off, 

558 takeoff_utc=takeoff_utc, 

559 landing_utc=landing_utc, 

560 block_on_utc=block_on, 

561 departure_icao=dep_icao, 

562 arrival_icao=arr_icao, 

563 flight_time_raw_h=raw_h, 

564 flight_time_rounded_h=rounded_h, 

565 track_geojson=geojson, 

566 landing_count=landing_count, 

567 is_ground_only=is_ground_only, 

568 hint_departure_icao=hint_dep if idx == 0 else None, 

569 hint_arrival_icao=hint_arr if idx == len(raw_groups) - 1 else None, 

570 ) 

571 ) 

572 

573 return segments 

574 

575 

576# ── ICAO resolution ─────────────────────────────────────────────────────────── 

577 

578 

579def resolve_icao( 

580 lat: float, 

581 lon: float, 

582 airports: dict[str, tuple[float, float]] | None = None, 

583) -> str | None: 

584 """Return the nearest ICAO code within 5 km, or None if none is close enough.""" 

585 if airports is None: 

586 airports = _load_airports() 

587 

588 best_code: str | None = None 

589 best_dist = _MAX_ICAO_DIST_KM 

590 

591 for code, (ap_lat, ap_lon) in airports.items(): 

592 d = _haversine_km(lat, lon, ap_lat, ap_lon) 

593 if d < best_dist: 

594 best_dist = d 

595 best_code = code 

596 

597 return best_code 

598 

599 

600# ── Time rounding ───────────────────────────────────────────────────────────── 

601 

602 

603def round_flight_time(raw_hours: float, precision: str) -> float: 

604 """Round raw_hours up to the nearest precision boundary. 

605 

606 precision="tenth_hour": round up to nearest 0.1 h (6-min boundary). 

607 precision="minute": round up to nearest 1/60 h (1-min boundary). 

608 """ 

609 if raw_hours <= 0: 

610 return 0.0 

611 if precision == "minute": 

612 minutes = math.ceil(raw_hours * 60) 

613 return round(minutes / 60, 4) 

614 # tenth_hour: ceiling to nearest 0.1 

615 return round(math.ceil(raw_hours * 10) / 10, 1) 

616 

617 

618# ── GeoJSON / downsampling ──────────────────────────────────────────────────── 

619 

620 

621def downsample_track( 

622 trackpoints: list[TrackPoint], max_points: int = _MAX_TRACK_POINTS 

623) -> list[TrackPoint]: 

624 """Return ≤ max_points trackpoints using uniform stride; first and last preserved.""" 

625 n = len(trackpoints) 

626 if n <= max_points: 

627 return trackpoints 

628 

629 stride = n / max_points 

630 indices: set[int] = {round(i * stride) for i in range(max_points)} 

631 indices.add(0) 

632 indices.add(n - 1) 

633 return [trackpoints[i] for i in sorted(indices) if i < n] 

634 

635 

636def build_geojson(trackpoints: list[TrackPoint]) -> dict[str, Any]: 

637 """Return a GeoJSON Feature with a LineString geometry. 

638 

639 Coordinates: [lon, lat, alt_m] per GeoJSON spec (RFC 7946). 

640 Properties carry parallel arrays of altitudes_m and speeds_kt for 

641 colour-gradient rendering in Leaflet. 

642 """ 

643 coords = [ 

644 [round(tp.lon, 6), round(tp.lat, 6), round(tp.alt_m, 1)] for tp in trackpoints 

645 ] 

646 return { 

647 "type": "Feature", 

648 "geometry": {"type": "LineString", "coordinates": coords}, 

649 "properties": { 

650 "altitudes_m": [round(tp.alt_m, 1) for tp in trackpoints], 

651 "speeds_kt": [round(tp.speed_kt, 1) for tp in trackpoints], 

652 }, 

653 }