Coverage for app/utils.py: 100%
575 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1"""Shared utilities available to all blueprints."""
3import csv
4import functools
5import io
6import logging
7import math
8import os
9from collections import defaultdict
10from functools import wraps
11from typing import Any, Callable
13from flask import abort, redirect, session, url_for # pyright: ignore[reportMissingImports]
15_log = logging.getLogger(__name__)
18# ── Tracks GIF export ─────────────────────────────────────────────────────────
20_GIF_W, _GIF_H = 800, 480
21_GIF_PAD = 30 # pixel padding around the track area
22_GIF_STEP_MS = 600 # ms per frame in the animated GIF
23_GIF_HOLD_MS = 3000 # ms for the final "all tracks" frame
24_TRACK_COLOUR = (180, 30, 200) # current/newest track: vivid purple
25_HISTORY_COLOUR = (
26 130,
27 20,
28 150,
29) # previously drawn tracks: slightly darker purple (GIF uses flat colour; web uses opacity fade)
30_BG_COLOUR = (248, 248, 252) # near-white background
33def _mercator_y(lat_deg: float) -> float:
34 """Web-Mercator y value for a latitude (not clamped)."""
35 lat = math.radians(max(-85.0, min(85.0, lat_deg)))
36 return math.log(math.tan(math.pi / 4 + lat / 2))
def _build_gif_projection(
    all_coords: list[tuple[float, float]],
    canvas_w: int = _GIF_W,
    canvas_h: int = _GIF_H,
    pad: int = _GIF_PAD,
) -> "tuple[Any, Any] | None":
    """Build a lon/lat → pixel projection that fits *all_coords* on the canvas.

    Returns ``(project, (min_lon, min_lat, max_lon, max_lat))`` where the bbox
    already includes the added margin, or ``None`` when fewer than two
    coordinates are supplied.  ``project(lon, lat)`` returns integer pixel
    coordinates (truncated with ``int``).
    """
    if len(all_coords) < 2:
        return None

    lon_values = [pt[0] for pt in all_coords]
    lat_values = [pt[1] for pt in all_coords]
    west, east = min(lon_values), max(lon_values)
    south, north = min(lat_values), max(lat_values)

    # Grow the bbox by 10 % of its extent (at least 0.01°) on every side so
    # tracks do not touch the edge of the drawable area.
    lon_margin = max(east - west, 0.1) * 0.1
    lat_margin = max(north - south, 0.1) * 0.1
    west, east = west - lon_margin, east + lon_margin
    south, north = south - lat_margin, north + lat_margin

    inner_w = canvas_w - 2 * pad
    inner_h = canvas_h - 2 * pad

    merc_south = _mercator_y(south)
    merc_north = _mercator_y(north)
    # Fall back to a tiny non-zero span to avoid division by zero.
    merc_span = merc_north - merc_south or 1e-9  # Mercator log-tan units (NOT degrees)
    lon_span = east - west or 1e-9  # degrees

    # Horizontal scale is pixels per degree of longitude; the vertical scale
    # (pixels per Mercator-y unit) is tied to it by the factor 180/π.  Choose
    # the largest horizontal scale for which both axes still fit.
    px_per_deg = min(
        inner_w / lon_span,
        inner_h * math.pi / (180.0 * merc_span),
    )
    px_per_merc = px_per_deg * 180.0 / math.pi

    # Centre the scaled bbox inside the padded drawing area.
    left = pad + (inner_w - lon_span * px_per_deg) / 2
    top = pad + (inner_h - merc_span * px_per_merc) / 2

    def project(lon: float, lat: float) -> tuple[int, int]:
        x = left + (lon - west) * px_per_deg
        y = top + (merc_north - _mercator_y(lat)) * px_per_merc
        return int(x), int(y)

    return project, (west, south, east, north)
89def _coords_from_geojson(geojson: dict[str, Any] | None) -> list[tuple[float, float]]:
90 """Extract (lon, lat) pairs from a GeoJSON Feature or FeatureCollection."""
91 if not geojson:
92 return []
93 if geojson.get("type") == "Feature":
94 geom = geojson.get("geometry") or {}
95 return [(c[0], c[1]) for c in geom.get("coordinates", []) if len(c) >= 2]
96 if geojson.get("type") == "FeatureCollection":
97 result = []
98 for feat in geojson.get("features", []):
99 result.extend(_coords_from_geojson(feat))
100 return result
101 return []
def _make_tile_background(
    project: Callable[[float, float], tuple[int, int]],
    min_lon: float,
    max_lon: float,
    min_lat: float,
    max_lat: float,
    canvas_w: int,
    canvas_h: int,
    openaip_key: str | None = None,
    tile_cache: dict[Any, bytes] | None = None,
    max_tiles: int = 36,
) -> Any:
    """Fetch raster map tiles and composite them into a background PIL Image.

    The base layer is the CARTO "light_all" basemap.  When *openaip_key* is
    given, the OpenAIP overlay tile is pasted on top of each base tile using
    its own alpha channel as mask.  The zoom level is chosen automatically
    from the scale of *project* and clamped to 2–14.

    Returns a PIL Image, or None when the projection has no positive
    horizontal scale or when more than *max_tiles* tiles would be needed
    (caller falls back to plain fill).  A tile that cannot be fetched or
    decoded is skipped and its area keeps the plain background colour.

    tile_cache, if provided, is a dict keyed by (z, tx, ty) or ("opi", z, tx, ty)
    holding the raw tile bytes so repeated calls across frames skip network
    fetches.
    """
    import urllib.request

    from PIL import Image as _Img  # pyright: ignore[reportMissingImports]

    # Compute pixels-per-degree-lon from the projection function
    mid_lat = (min_lat + max_lat) / 2.0
    px0x = project(min_lon, mid_lat)[0]
    px1x = project(min_lon + 1.0, mid_lat)[0]
    scale_x = float(px1x - px0x)  # canvas pixels per degree-lon
    if scale_x <= 0:
        return None

    # Choose zoom so each tile is ~256 canvas pixels wide (clamped 2–14)
    z = int(round(math.log2(max(scale_x * 360.0 / 256.0, 1.0))))
    z = max(2, min(z, 14))
    n = 2**z

    def _lon_to_tx(lon: float) -> int:
        # Deliberately NOT wrapped modulo n: a bbox that reaches past ±180°
        # must still give tx_min <= tx_max.  floor() (rather than int()) keeps
        # longitudes west of -180° in negative columns.  The wrapped column
        # (tx % n) is only used for the tile URL and cache key.
        return math.floor((lon + 180.0) / 360.0 * n)

    def _lat_to_ty(lat: float) -> int:
        lat_r = math.radians(max(-85.0, min(85.0, lat)))
        return int(
            (1.0 - math.log(math.tan(lat_r) + 1.0 / math.cos(lat_r)) / math.pi)
            / 2.0
            * n
        )

    def _tile_nw_lonlat(tx: int, ty: int) -> tuple[float, float]:
        lon = tx * 360.0 / n - 180.0
        lat = math.degrees(math.atan(math.sinh(math.pi * (1.0 - 2.0 * ty / n))))
        return lon, lat

    tx_min = _lon_to_tx(min_lon)
    tx_max = _lon_to_tx(max_lon)
    ty_min = _lat_to_ty(max_lat)  # tile rows grow southwards
    ty_max = _lat_to_ty(min_lat)

    # Guard against tile count explosion
    if (tx_max - tx_min + 1) * (ty_max - ty_min + 1) > max_tiles:
        return None

    bg = _Img.new("RGB", (canvas_w, canvas_h), _BG_COLOUR)
    ua = "OpenHangar flight-logbook GIF export (https://github.com/e2jk/OpenHangar)"

    def _fetch(cache_key: Any, url: str) -> bytes:
        """Return the tile bytes for *url*, consulting and filling tile_cache."""
        if tile_cache is not None and cache_key in tile_cache:
            return tile_cache[cache_key]
        req = urllib.request.Request(url, headers={"User-Agent": ua})
        with urllib.request.urlopen(req, timeout=5) as resp:
            raw = resp.read()
        if tile_cache is not None:
            tile_cache[cache_key] = raw
        return raw

    for tx in range(tx_min, tx_max + 1):
        tx_w = tx % n  # wrapped column actually requested from the servers
        for ty in range(ty_min, ty_max + 1):
            try:
                # Compute pixel bounds for this tile by projecting its NW corner
                # and the NW corner of the tile to its SE — then size the resize
                # to exactly span that gap plus 1 px overlap, eliminating seams
                # caused by integer rounding of adjacent corner coordinates.
                lon_nw, lat_nw = _tile_nw_lonlat(tx, ty)
                lon_se, lat_se = _tile_nw_lonlat(tx + 1, ty + 1)
                px, py = project(lon_nw, lat_nw)
                px_se, py_se = project(lon_se, lat_se)
                tile_size = (max(1, px_se - px + 1), max(1, py_se - py + 1))

                # Base map: CARTO light (same as web animation)
                raw = _fetch(
                    (z, tx_w, ty),
                    f"https://a.basemaps.cartocdn.com/light_all/{z}/{tx_w}/{ty}.png",
                )
                tile = _Img.open(io.BytesIO(raw)).convert("RGBA")
                tile = tile.resize(tile_size, _Img.Resampling.LANCZOS)
                bg.paste(tile.convert("RGB"), (px, py))
            except Exception as exc:  # best-effort: one bad tile must not abort the export
                _log.debug(
                    "Base map tile unavailable, leaving area as background: %s", exc
                )
                continue

            # Aviation overlay: OpenAIP (requires key; z is already clamped to ≤ 14)
            if not openaip_key:
                continue
            try:
                opi_raw = _fetch(
                    ("opi", z, tx_w, ty),
                    f"https://api.tiles.openaip.net/api/data/openaip/{z}/{tx_w}/{ty}.png?apiKey={openaip_key}",
                )
                opi_tile = _Img.open(io.BytesIO(opi_raw)).convert("RGBA")
                opi_tile = opi_tile.resize(tile_size, _Img.Resampling.LANCZOS)
                # Third argument is the paste mask: the overlay's own alpha.
                bg.paste(opi_tile.convert("RGB"), (px, py), opi_tile)
            except Exception as exc:  # best-effort: keep the base tile without overlay
                _log.debug("OPI tile unavailable, leaving base map uncovered: %s", exc)

    return bg
def _canvas_geo_bounds(
    project_fn: Callable[[float, float], tuple[int, int]],
    canvas_w: int,
    canvas_h: int,
    min_lon: float,
    max_lon: float,
    min_lat: float,
    max_lat: float,
) -> tuple[float, float, float, float]:
    """Return (lon_min, lat_min, lon_max, lat_max) spanning the full canvas.

    The geographic extent of the whole canvas (pixel 0 to canvas_w / canvas_h)
    is derived by inverting *project_fn* from a few sample points.  When the
    horizontal scale is not positive the input bbox is returned unchanged;
    when only the vertical scale cannot be derived, the longitudes are
    expanded and the input latitudes are kept.  Returned latitudes are limited
    to the range [-85, 85].
    """
    mid_lat = (min_lat + max_lat) / 2.0

    # Longitude is linear in pixel x: sample two points one degree apart.
    x_at_min_lon = project_fn(min_lon, mid_lat)[0]
    x_one_deg_east = project_fn(min_lon + 1.0, mid_lat)[0]
    px_per_deg = float(x_one_deg_east - x_at_min_lon)
    if px_per_deg <= 0:
        return min_lon, min_lat, max_lon, max_lat
    west = min_lon - float(x_at_min_lon) / px_per_deg
    east = min_lon + float(canvas_w - x_at_min_lon) / px_per_deg

    # Latitude: derive the Mercator scale from the bbox's bottom and top rows.
    y_south = project_fn(min_lon, min_lat)[1]  # larger pixel y
    y_north = project_fn(min_lon, max_lat)[1]  # smaller pixel y
    merc_south = _mercator_y(min_lat)
    merc_north = _mercator_y(max_lat)
    merc_span = merc_north - merc_south
    y_span = float(y_south - y_north)
    if merc_span <= 0 or y_span <= 0:
        return west, min_lat, east, max_lat
    px_per_merc = y_span / merc_span

    def _row_to_lat(row: int) -> float:
        """Inverse-project a canvas pixel row to a latitude in degrees."""
        merc = merc_north + float(y_north - row) / px_per_merc
        # Keep the Mercator value in a bounded range before exponentiating.
        merc = max(-10.0, min(10.0, merc))
        return math.degrees(2.0 * math.atan(math.exp(merc)) - math.pi / 2.0)

    north = _row_to_lat(0)
    south = _row_to_lat(canvas_h)
    return west, max(-85.0, south), east, min(85.0, north)
def sort_tracks_oldest_first(
    rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Return rows sorted ascending by their 'date' string key (oldest first).

    Both the web animation and the GIF export use this ordering so that
    chronological playback and progressive opacity fading are consistent
    across rendering environments.  Call this once in the route handler and
    pass the result to both the template context and generate_tracks_gif().

    Rows whose 'date' is missing, None or empty sort first; the sort is
    stable, so such rows keep their relative order.  The input list is not
    modified.
    """
    # ``or ""`` (rather than a .get default) also covers a key that is
    # present but None, which would otherwise fail to compare with str.
    return sorted(rows, key=lambda row: row.get("date") or "")
def generate_tracks_gif(
    track_rows: list[dict[str, Any]],
    _font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    _openaip_key: str | None = None,
    canvas_w: int = _GIF_W,
    canvas_h: int = _GIF_H,
    high_res: bool = False,
) -> bytes:
    """Render an animated GIF of the flight tracks, one frame per row.

    track_rows must already be sorted oldest-first; use sort_tracks_oldest_first().
    Each frame adds one more track (cumulative) and re-fits the map to the
    bounding box of the tracks seen so far.  Earlier tracks are drawn in
    _HISTORY_COLOUR, the newest one in _TRACK_COLOUR.  A final frame shows all
    tracks in _TRACK_COLOUR and is held for _GIF_HOLD_MS.  Returns raw GIF bytes.

    When all rows together provide fewer than two coordinates, a single blank
    frame is returned.

    high_res=True doubles line widths, font sizes and padding, quantises to
    256 instead of 128 colours, fetches tiles for the whole canvas rather than
    just the track bbox, and raises the tile-fetch caps.
    """
    from PIL import Image, ImageDraw, ImageFont  # pyright: ignore[reportMissingImports]

    scale = 2 if high_res else 1
    pad = _GIF_PAD * scale
    width_current = 3 * scale
    width_history = 2 * scale
    font_px = 14 * scale
    font_px_small = 11 * scale
    text_margin = 8 * scale
    palette_size = 256 if high_res else 128
    tile_cap = 64 if high_res else 36
    # Whole-canvas fetches (high_res only) can need more tiles than the track
    # bbox alone, so they get twice the cap.
    tile_cap_canvas = tile_cap * 2 if high_res else tile_cap

    # Parse every row's GeoJSON once, up front.
    coords_by_track = [_coords_from_geojson(row.get("geojson")) for row in track_rows]
    all_coords: list[tuple[float, float]] = [
        pt for track in coords_by_track for pt in track
    ]

    if len(all_coords) < 2:
        # Fallback: single blank frame
        blank = io.BytesIO()
        Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR).save(blank, format="GIF")
        return blank.getvalue()

    font: Any  # PIL font type varies across Pillow versions
    font_sm: Any
    try:
        font = ImageFont.truetype(_font_path, font_px)
        font_sm = ImageFont.truetype(_font_path, font_px_small)
    except (IOError, OSError):
        font = font_sm = ImageFont.load_default()

    def draw_shadow_text(draw: Any, text: str, fnt: Any) -> None:
        # White copy offset by one pixel, then the dark text on top.
        origin = (text_margin, text_margin)
        shadow_origin = (text_margin + 1, text_margin + 1)
        draw.text(shadow_origin, text, fill=(255, 255, 255), font=fnt)
        draw.text(origin, text, fill=(40, 40, 60), font=fnt)

    def draw_track(
        draw: Any,
        coords: list[tuple[float, float]],
        colour: tuple[int, int, int],
        width: int,
        project_fn: Callable[[float, float], tuple[int, int]],
    ) -> None:
        pixels = [project_fn(lon, lat) for lon, lat in coords]
        if len(pixels) < 2:
            return
        draw.line(pixels, fill=colour, width=width)
        radius = width + 2 * scale
        # Green dot on the first point, red dot on the last.
        for (cx, cy), dot_colour in (
            (pixels[0], (40, 160, 60)),
            (pixels[-1], (200, 40, 40)),
        ):
            draw.ellipse(
                [cx - radius, cy - radius, cx + radius, cy + radius], fill=dot_colour
            )

    # Shared tile cache: (z, tx, ty) → raw tile bytes; ("opi", z, tx, ty) for OpenAIP.
    tile_cache: dict[Any, bytes] = {}

    def render_background(
        project_fn: Callable[[float, float], tuple[int, int]],
        west: float,
        east: float,
        south: float,
        north: float,
    ) -> Any:
        if high_res:
            fetch_w, fetch_s, fetch_e, fetch_n = _canvas_geo_bounds(
                project_fn, canvas_w, canvas_h, west, east, south, north
            )
        else:
            fetch_w, fetch_s, fetch_e, fetch_n = west, south, east, north
        background = _make_tile_background(
            project_fn,
            fetch_w,
            fetch_e,
            fetch_s,
            fetch_n,
            canvas_w,
            canvas_h,
            openaip_key=_openaip_key,
            tile_cache=tile_cache,
            max_tiles=tile_cap_canvas,
        )
        if background is None and high_res:
            # Retry with just the track bbox and the regular cap.
            background = _make_tile_background(
                project_fn,
                west,
                east,
                south,
                north,
                canvas_w,
                canvas_h,
                openaip_key=_openaip_key,
                tile_cache=tile_cache,
                max_tiles=tile_cap,
            )
        if background is None:
            return Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR)
        return background.copy()

    frames: list[Any] = []
    durations: list[int] = []
    seen_coords: list[tuple[float, float]] = []
    total_rows = len(track_rows)

    for idx, (row, coords) in enumerate(zip(track_rows, coords_by_track)):
        seen_coords.extend(coords)
        projection = _build_gif_projection(
            seen_coords, canvas_w=canvas_w, canvas_h=canvas_h, pad=pad
        )
        if projection is None:
            continue  # not enough coords yet (e.g. leading rows with no geojson)

        project_fn, (west, south, east, north) = projection
        img = render_background(project_fn, west, east, south, north)
        draw = ImageDraw.Draw(img)

        for earlier in coords_by_track[:idx]:
            draw_track(draw, earlier, _HISTORY_COLOUR, width_history, project_fn)
        draw_track(draw, coords, _TRACK_COLOUR, width_current, project_fn)

        label = f"{row.get('date', '')} {row.get('dep', '')} → {row.get('arr', '')}"
        draw_shadow_text(draw, label, font)
        draw.text(
            (text_margin, canvas_h - font_px_small - text_margin),
            f"{idx + 1} / {total_rows}",
            fill=(80, 80, 100),
            font=font_sm,
        )

        frames.append(img)
        durations.append(_GIF_STEP_MS)

    # Final frame: all tracks at equal weight using the full bounding box, longer hold
    if frames:
        final_projection = _build_gif_projection(
            all_coords, canvas_w=canvas_w, canvas_h=canvas_h, pad=pad
        )
        # final_projection is non-None here: all_coords has ≥ 2 points (checked above)
        project_final, (west, south, east, north) = final_projection  # type: ignore[misc]
        img = render_background(project_final, west, east, south, north)
        draw = ImageDraw.Draw(img)
        for track in coords_by_track:
            draw_track(draw, track, _TRACK_COLOUR, width_history, project_final)
        draw_shadow_text(draw, f"All {total_rows} tracks", font)
        frames.append(img)
        durations.append(_GIF_HOLD_MS)

    # Quantise every frame against one palette derived from the last frame,
    # so all frames share the same colours.
    palette_src = frames[-1].quantize(colors=palette_size, dither=Image.Dither.NONE)
    quantized: list[Any] = [
        frame.quantize(palette=palette_src, dither=Image.Dither.NONE)
        for frame in frames
    ]

    out = io.BytesIO()
    first_frame, *later_frames = quantized
    first_frame.save(
        out,
        format="GIF",
        save_all=True,
        append_images=later_frames,
        loop=0,
        duration=durations,
        optimize=True,
    )
    return out.getvalue()
def generate_single_track_image(
    geojson: dict[str, Any] | None,
    date: str = "",
    dep: str = "",
    arr: str = "",
    _font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    _openaip_key: str | None = None,
    canvas_w: int = _GIF_W,
    canvas_h: int = _GIF_H,
    high_res: bool = False,
) -> bytes:
    """Render a single GPS track as a static PNG and return the raw bytes.

    The track is drawn in _TRACK_COLOUR over a map-tile background (plain
    _BG_COLOUR when no tiles could be composed), with a green dot on the first
    point, a red dot on the last, and an optional "date dep → arr" label in
    the top-left corner.  A track with fewer than 2 points yields a blank
    canvas so the caller can always stream a valid image.

    high_res=True doubles line width, font size and padding, and fetches
    tiles for the whole canvas with a raised tile cap.
    """
    from PIL import Image, ImageDraw, ImageFont  # pyright: ignore[reportMissingImports]

    track = _coords_from_geojson(geojson)

    scale = 2 if high_res else 1
    pad = _GIF_PAD * scale
    line_width = 3 * scale
    font_px = 14 * scale
    text_margin = 8 * scale
    tile_cap = 64 if high_res else 36
    tile_cap_canvas = tile_cap * 2 if high_res else tile_cap

    def _to_png(image: Any) -> bytes:
        out = io.BytesIO()
        image.save(out, format="PNG")
        return out.getvalue()

    def _blank_png() -> bytes:
        return _to_png(Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR))

    if len(track) < 2:
        return _blank_png()

    projection = _build_gif_projection(
        track, canvas_w=canvas_w, canvas_h=canvas_h, pad=pad
    )
    if projection is None:
        return _blank_png()

    project_fn, (west, south, east, north) = projection

    tile_cache: dict[Any, bytes] = {}
    if high_res:
        # Fetch tiles for the whole canvas, not just the track bbox.
        fetch_w, fetch_s, fetch_e, fetch_n = _canvas_geo_bounds(
            project_fn, canvas_w, canvas_h, west, east, south, north
        )
    else:
        fetch_w, fetch_s, fetch_e, fetch_n = west, south, east, north

    background = _make_tile_background(
        project_fn,
        fetch_w,
        fetch_e,
        fetch_s,
        fetch_n,
        canvas_w,
        canvas_h,
        openaip_key=_openaip_key,
        tile_cache=tile_cache,
        max_tiles=tile_cap_canvas,
    )
    if background is None and high_res:
        # Retry with just the track bbox and the regular cap.
        background = _make_tile_background(
            project_fn,
            west,
            east,
            south,
            north,
            canvas_w,
            canvas_h,
            openaip_key=_openaip_key,
            tile_cache=tile_cache,
            max_tiles=tile_cap,
        )
    if background is None:
        img = Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR)
    else:
        img = background.copy()

    draw = ImageDraw.Draw(img)
    pixels = [project_fn(lon, lat) for lon, lat in track]
    if len(pixels) >= 2:
        draw.line(pixels, fill=_TRACK_COLOUR, width=line_width)
        radius = line_width + 2 * scale
        start_x, start_y = pixels[0]
        end_x, end_y = pixels[-1]
        draw.ellipse(
            [start_x - radius, start_y - radius, start_x + radius, start_y + radius],
            fill=(40, 160, 60),
        )
        draw.ellipse(
            [end_x - radius, end_y - radius, end_x + radius, end_y + radius],
            fill=(200, 40, 40),
        )

    label = f"{date} {dep} → {arr}".strip(" →").strip()
    if label:
        try:
            font: Any = ImageFont.truetype(_font_path, font_px)
        except (IOError, OSError):
            font = ImageFont.load_default()
        # White copy offset by one pixel, then the dark text on top.
        draw.text(
            (text_margin + 1, text_margin + 1), label, fill=(255, 255, 255), font=font
        )
        draw.text((text_margin, text_margin), label, fill=(40, 40, 60), font=font)

    return _to_png(img)
def generate_single_track_gif(
    geojson: dict[str, Any] | None,
    date: str = "",
    dep: str = "",
    arr: str = "",
    _font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    _openaip_key: str | None = None,
    canvas_w: int = _GIF_W,
    canvas_h: int = _GIF_H,
    high_res: bool = False,
) -> bytes:
    """Render an animated GIF of a single flight track drawn progressively.

    The track's coordinate array is split into N chunks (3–10 depending on
    track length; the last chunk takes the remainder).  Each frame draws all
    chunks revealed so far — earlier ones in _HISTORY_COLOUR, the newest in
    _TRACK_COLOUR — fitting the map bounding box to the coordinates revealed
    so far.  A final frame shows the whole track in _TRACK_COLOUR and is held
    for _GIF_HOLD_MS.

    Every chunk after the first starts on the previous chunk's last point, so
    the drawn line is continuous across chunk boundaries.

    Returns a blank single-frame GIF for fewer than 2 points, and a
    single-frame GIF wrapping generate_single_track_image() for fewer than 20
    points (too sparse for meaningful animation).
    """
    from PIL import Image, ImageDraw, ImageFont  # pyright: ignore[reportMissingImports]

    all_coords = _coords_from_geojson(geojson)

    _q_scale = 2 if high_res else 1
    _pad = _GIF_PAD * _q_scale
    _line_curr = 3 * _q_scale
    _line_hist = 2 * _q_scale
    _font_sz = 14 * _q_scale
    _font_sz_sm = 11 * _q_scale
    _txt_margin = 8 * _q_scale
    _q_colors = 256 if high_res else 128
    _max_tiles = 64 if high_res else 36
    # Whole-canvas fetches (high_res only) get twice the tile cap.
    _max_tiles_canvas = _max_tiles * 2 if high_res else _max_tiles

    def _blank_gif() -> bytes:
        buf = io.BytesIO()
        Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR).save(buf, format="GIF")
        return buf.getvalue()

    if len(all_coords) < 2:
        return _blank_gif()

    # Sparse-track fallback: wrap the still image in a single-frame GIF
    if len(all_coords) < 20:
        png = generate_single_track_image(
            geojson,
            date=date,
            dep=dep,
            arr=arr,
            _font_path=_font_path,
            _openaip_key=_openaip_key,
            canvas_w=canvas_w,
            canvas_h=canvas_h,
            high_res=high_res,
        )
        img = Image.open(io.BytesIO(png)).convert("RGB")
        buf = io.BytesIO()
        img.quantize(colors=_q_colors, dither=Image.Dither.NONE).save(buf, format="GIF")
        return buf.getvalue()

    # Split coords into N chunks (3–10); the last one catches the remainder.
    n_chunks = max(3, min(10, len(all_coords) // 10))
    chunk_size = len(all_coords) // n_chunks
    chunk_ends = [(i + 1) * chunk_size for i in range(n_chunks - 1)]
    chunk_ends.append(len(all_coords))
    chunks: list[list[tuple[float, float]]] = []
    prev_end = 0
    for end in chunk_ends:
        # Start one point early (except for the first chunk) so the segment
        # joining two consecutive chunks is drawn instead of leaving a gap.
        start = max(prev_end - 1, 0)
        chunks.append(list(all_coords[start:end]))
        prev_end = end

    try:
        font: Any = ImageFont.truetype(_font_path, _font_sz)
        font_sm: Any = ImageFont.truetype(_font_path, _font_sz_sm)
    except (IOError, OSError):
        font = font_sm = ImageFont.load_default()

    def draw_shadow_text(draw: Any, text: str, fnt: Any) -> None:
        # White copy offset by one pixel, then the dark text on top.
        draw.text(
            (_txt_margin + 1, _txt_margin + 1), text, fill=(255, 255, 255), font=fnt
        )
        draw.text((_txt_margin, _txt_margin), text, fill=(40, 40, 60), font=fnt)

    def draw_track(
        draw: Any,
        coords: list[tuple[float, float]],
        colour: tuple[int, int, int],
        width: int,
        project_fn: Callable[[float, float], tuple[int, int]],
    ) -> None:
        pts = [project_fn(lon, lat) for lon, lat in coords]
        if len(pts) >= 2:
            draw.line(pts, fill=colour, width=width)
            r = width + 2 * _q_scale
            # Green dot on the first point, red dot on the last.
            sx, sy = pts[0]
            draw.ellipse([sx - r, sy - r, sx + r, sy + r], fill=(40, 160, 60))
            ex, ey = pts[-1]
            draw.ellipse([ex - r, ey - r, ex + r, ey + r], fill=(200, 40, 40))

    # Raw tile bytes shared by all frames of this export.
    tile_cache: dict[Any, bytes] = {}

    def _frame_bg(
        project_fn: Callable[[float, float], tuple[int, int]],
        min_lon: float,
        max_lon: float,
        min_lat: float,
        max_lat: float,
    ) -> Any:
        fetch_lon_min, fetch_lat_min, fetch_lon_max, fetch_lat_max = (
            min_lon,
            min_lat,
            max_lon,
            max_lat,
        )
        if high_res:
            fetch_lon_min, fetch_lat_min, fetch_lon_max, fetch_lat_max = (
                _canvas_geo_bounds(
                    project_fn, canvas_w, canvas_h, min_lon, max_lon, min_lat, max_lat
                )
            )
        bg = _make_tile_background(
            project_fn,
            fetch_lon_min,
            fetch_lon_max,
            fetch_lat_min,
            fetch_lat_max,
            canvas_w,
            canvas_h,
            openaip_key=_openaip_key,
            tile_cache=tile_cache,
            max_tiles=_max_tiles_canvas,
        )
        if bg is None and high_res:
            # Retry with just the track bbox and the regular cap.
            bg = _make_tile_background(
                project_fn,
                min_lon,
                max_lon,
                min_lat,
                max_lat,
                canvas_w,
                canvas_h,
                openaip_key=_openaip_key,
                tile_cache=tile_cache,
                max_tiles=_max_tiles,
            )
        return (
            bg.copy()
            if bg is not None
            else Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR)
        )

    label = f"{date} {dep} → {arr}".strip(" →").strip()

    frames: list[Any] = []
    durations: list[int] = []

    for chunk_idx, (chunk, end) in enumerate(zip(chunks, chunk_ends)):
        # Fit the view to everything revealed so far.
        proj_result = _build_gif_projection(
            list(all_coords[:end]), canvas_w=canvas_w, canvas_h=canvas_h, pad=_pad
        )
        if proj_result is None:
            continue

        project_fn, (f_min_lon, f_min_lat, f_max_lon, f_max_lat) = proj_result
        img = _frame_bg(project_fn, f_min_lon, f_max_lon, f_min_lat, f_max_lat)
        draw = ImageDraw.Draw(img)

        for earlier in chunks[:chunk_idx]:
            draw_track(draw, earlier, _HISTORY_COLOUR, _line_hist, project_fn)
        draw_track(draw, chunk, _TRACK_COLOUR, _line_curr, project_fn)

        if label:
            draw_shadow_text(draw, label, font)
        draw.text(
            (_txt_margin, canvas_h - _font_sz_sm - _txt_margin),
            f"{chunk_idx + 1} / {n_chunks}",
            fill=(80, 80, 100),
            font=font_sm,
        )
        frames.append(img)
        durations.append(_GIF_STEP_MS)

    if not frames:
        return _blank_gif()

    # Final hold frame: full track, all chunks at equal weight
    proj_result_final = _build_gif_projection(
        list(all_coords), canvas_w=canvas_w, canvas_h=canvas_h, pad=_pad
    )
    # non-None here: all_coords has ≥ 20 points
    project_final, (f_min_lon, f_min_lat, f_max_lon, f_max_lat) = proj_result_final  # type: ignore[misc]
    img = _frame_bg(project_final, f_min_lon, f_max_lon, f_min_lat, f_max_lat)
    draw = ImageDraw.Draw(img)
    for chunk in chunks:
        draw_track(draw, chunk, _TRACK_COLOUR, _line_hist, project_final)
    if label:
        draw_shadow_text(draw, label, font)
    frames.append(img)
    durations.append(_GIF_HOLD_MS)

    # Quantise every frame against one palette derived from the last frame.
    palette_src = frames[-1].quantize(colors=_q_colors, dither=Image.Dither.NONE)
    quantized: list[Any] = [
        f.quantize(palette=palette_src, dither=Image.Dither.NONE) for f in frames
    ]
    buf = io.BytesIO()
    quantized[0].save(
        buf,
        format="GIF",
        save_all=True,
        append_images=quantized[1:],
        loop=0,
        duration=durations,
        optimize=True,
    )
    return buf.getvalue()
@functools.lru_cache(maxsize=1)
def _load_aircraft_types() -> dict[str, tuple[str, str]]:
    """Return {type_designator: (manufacturer, model)} from aircraft_types.csv.

    Designators are stripped and upper-cased; rows without a designator are
    skipped.  When a designator occurs on several rows, the last row wins.
    If the file cannot be opened, a warning is logged and an empty dict is
    returned.  The result is cached after the first call.
    """
    path = os.path.join(os.path.dirname(__file__), "data", "aircraft_types.csv")
    result: dict[str, tuple[str, str]] = {}
    try:
        with open(path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                # DictReader fills columns missing from a short row with None,
                # so a .get() default is not enough — coalesce explicitly.
                des = (row.get("type_designator") or "").strip().upper()
                mfr = (row.get("manufacturer") or "").strip()
                model = (row.get("model") or "").strip()
                if des:
                    result[des] = (mfr, model)
    except OSError as exc:
        _log.warning("aircraft_types.csv not found: %s", exc)
    return result
def _load_aircraft_type_variants() -> list[tuple[str, str, str, str]]:
    """Return (type_designator, full_name, manufacturer, model) tuples from aircraft_types.csv.

    Unlike _load_aircraft_types(), rows sharing a designator are all kept so
    the search endpoint can surface every variant (e.g. all PA-28-181 models
    that share the P28A ICAO code); only rows repeating an already seen
    (designator, full_name) pair are dropped.  manufacturer and model are kept
    separate so callers can pre-fill form fields for the exact selected
    variant rather than an arbitrary one sharing the same ICAO code.

    Rows without a designator are skipped.  If the file cannot be opened, a
    warning is logged and an empty list is returned.  The file is re-read on
    every call.
    """
    path = os.path.join(os.path.dirname(__file__), "data", "aircraft_types.csv")
    result: list[tuple[str, str, str, str]] = []
    seen: set[tuple[str, str]] = set()
    try:
        with open(path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                # DictReader fills columns missing from a short row with None,
                # so a .get() default is not enough — coalesce explicitly.
                des = (row.get("type_designator") or "").strip().upper()
                mfr = (row.get("manufacturer") or "").strip()
                model = (row.get("model") or "").strip()
                name = f"{mfr} {model}".strip()
                if des and (des, name) not in seen:
                    result.append((des, name, mfr, model))
                    seen.add((des, name))
    except OSError as exc:
        _log.warning("aircraft_types.csv not found: %s", exc)
    return result
@functools.lru_cache(maxsize=1)
def _load_aircraft_type_engine_data() -> dict[str, tuple[int, str]]:
    """Return {type_designator: (engine_count, engine_type)} from aircraft_types.csv.

    For designators with multiple rows (variants), uses data from the first row.
    A missing, empty or non-numeric engine_count falls back to 1; a missing
    engine_type becomes "".  If the file cannot be opened, a warning is logged
    and an empty dict is returned.  The result is cached after the first call.
    """
    path = os.path.join(os.path.dirname(__file__), "data", "aircraft_types.csv")
    result: dict[str, tuple[int, str]] = {}
    try:
        with open(path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                # DictReader fills columns missing from a short row with None,
                # so a .get() default is not enough — coalesce explicitly.
                des = (row.get("type_designator") or "").strip().upper()
                if not des or des in result:
                    continue
                try:
                    ec = int(row.get("engine_count") or "1")
                except ValueError:
                    ec = 1
                et = (row.get("engine_type") or "").strip()
                result[des] = (ec, et)
    except OSError as exc:
        _log.warning("aircraft_types.csv not found: %s", exc)
    return result
def get_aircraft_type_engine_info(icao_code: str) -> tuple[int, str] | None:
    """Look up (engine_count, engine_type) for an ICAO type designator.

    The code is stripped and upper-cased before the lookup; None is returned
    when the designator is not present in the engine data.
    """
    engine_data = _load_aircraft_type_engine_data()
    designator = icao_code.strip().upper()
    return engine_data.get(designator)
def resolve_aircraft_type_icao(aircraft_type: str | None) -> str | None:
    """Return the matching ICAO type designator for *aircraft_type*, or None.

    The stripped, upper-cased input is tried first; failing that, the same
    text with hyphens and spaces removed (e.g. "PA-28" → "PA28").
    """
    if not aircraft_type:
        return None
    known = _load_aircraft_types()
    upper = aircraft_type.strip().upper()
    candidates = (upper, upper.replace("-", "").replace(" ", ""))
    for candidate in candidates:
        if candidate in known:
            return candidate
    return None
@functools.lru_cache(maxsize=1)
def _load_airport_names() -> dict[str, str]:
    """Return {ident: airport name} for the airports listed in airports.csv.

    Rows lacking either an ident or a name are skipped.  If the file cannot
    be opened, a warning is logged and an empty dict is returned.  The result
    is cached after the first call.
    """
    path = os.path.join(os.path.dirname(__file__), "data", "airports.csv")
    result: dict[str, str] = {}
    try:
        with open(path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                # DictReader fills columns missing from a short row with None,
                # so a .get() default is not enough — coalesce explicitly.
                ident = (row.get("ident") or "").strip()
                name = (row.get("name") or "").strip()
                if ident and name:
                    result[ident] = name
    except OSError as exc:
        _log.warning("airports.csv not found: %s", exc)
    return result
936_alog = logging.getLogger("openhangar.activity")
939def _sl(value: object) -> str:
940 """Sanitize a value for log output — strips CR/LF to prevent log injection (CWE-117)."""
941 return str(value).replace("\r\n", "").replace("\n", "").replace("\r", "")
def activity(event: str, **fields: object) -> None:
    """Write one structured ``[ACTIVITY]`` line to the activity logger.

    The line has the form ``[ACTIVITY] <event> user_id=<id> ip=<addr>``
    followed by one ``key=value`` token per keyword argument, in the order
    given.  ``user_id`` comes from the Flask session and ``ip`` from
    ``request.remote_addr``; either falls back to an empty string.  All
    values are passed through ``_sl`` to strip CR/LF; *event* itself is
    emitted verbatim.

    Relies on Flask's ``request`` and ``session`` proxies, so it is meant to
    be called while handling a request.
    """
    from flask import request, session  # noqa: PLC0415

    user_id = session.get("user_id", "")
    remote_ip = request.remote_addr or ""
    pairs = [("user_id", user_id), ("ip", remote_ip), *fields.items()]
    tokens = [f"[ACTIVITY] {event}"]
    tokens += [f"{key}={_sl(val)}" for key, val in pairs]
    _alog.info(" ".join(tokens))
def login_required(f: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator: only run the view for a logged-in session.

    When the session holds no truthy ``user_id`` the wrapper returns a
    redirect to the ``auth.login`` endpoint instead of calling *f*.
    """

    @wraps(f)
    def guarded(*args: Any, **kwargs: Any) -> Any:
        if session.get("user_id"):
            return f(*args, **kwargs)
        return redirect(url_for("auth.login"))

    return guarded
def require_instance_admin(f: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator: restrict a view to the instance admin.

    Sessions without a ``user_id`` are redirected to ``auth.login``.  A
    logged-in user whose ``User`` row is missing, or whose
    ``is_instance_admin`` flag is falsy, gets ``abort(403)``.
    """

    @wraps(f)
    def admin_only(*args: Any, **kwargs: Any) -> Any:
        # Imported lazily, inside the wrapper, as the original does.
        from models import User, db

        uid = session.get("user_id")
        if not uid:
            return redirect(url_for("auth.login"))
        account = db.session.get(User, uid)
        if not (account and account.is_instance_admin):
            abort(403)
        return f(*args, **kwargs)

    return admin_only
def current_user_role() -> str | None:
    """Return the role stored on the current user's tenant membership.

    Looks up the first ``TenantUser`` row for the session's ``user_id`` and
    returns its ``role``.  Returns ``None`` when nobody is logged in or the
    user has no ``TenantUser`` row.
    """
    from models import TenantUser

    uid = session.get("user_id")
    if not uid:
        return None
    membership = TenantUser.query.filter_by(user_id=uid).first()
    if membership:
        return membership.role
    return None
def require_role(*roles: str) -> Callable[..., Any]:
    """Decorator factory: allow a view only for the listed roles.

    The wrapped view calls ``abort(403)`` unless ``current_user_role()``
    is one of *roles*.  A user with no role (``None``) is therefore
    rejected unless ``None`` was explicitly passed in *roles*.
    """

    def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(f)
        def role_checked(*args: Any, **kwargs: Any) -> Any:
            permitted = current_user_role() in roles
            if not permitted:
                abort(403)
            return f(*args, **kwargs)

        return role_checked

    return decorator
def require_pilot_access(f: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator: allow a view only for users with pilot access.

    Access is granted when the user's role is ADMIN, OWNER, PILOT, STUDENT
    or INSTRUCTOR, or, failing that, when the logged-in ``User`` row has a
    truthy ``is_pilot`` flag.  Everyone else gets ``abort(403)``.
    """

    @wraps(f)
    def pilot_only(*args: Any, **kwargs: Any) -> Any:
        from models import Role, User, db

        role = current_user_role()
        pilot_roles = (Role.ADMIN, Role.OWNER, Role.PILOT, Role.STUDENT, Role.INSTRUCTOR)
        granted = role in pilot_roles
        if not granted:
            # Role did not qualify: fall back to the per-user capability flag.
            uid = session.get("user_id")
            if uid:
                account = db.session.get(User, uid)
                granted = bool(account and account.is_pilot)
        if granted:
            return f(*args, **kwargs)
        return abort(403)

    return pilot_only
def require_maint_access(f: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator: allow a view only for users with maintenance access.

    Access is granted when the user's role is ADMIN, OWNER, MAINTENANCE or
    INSTRUCTOR, or, failing that, when the logged-in ``User`` row has a
    truthy ``is_maintenance`` flag.  Everyone else gets ``abort(403)``.
    """

    @wraps(f)
    def maint_only(*args: Any, **kwargs: Any) -> Any:
        from models import Role, User, db

        role = current_user_role()
        maint_roles = (Role.ADMIN, Role.OWNER, Role.MAINTENANCE, Role.INSTRUCTOR)
        granted = role in maint_roles
        if not granted:
            # Role did not qualify: fall back to the per-user capability flag.
            uid = session.get("user_id")
            if uid:
                account = db.session.get(User, uid)
                granted = bool(account and account.is_maintenance)
        if granted:
            return f(*args, **kwargs)
        return abort(403)

    return maint_only
def user_can_access_aircraft(aircraft_id: int) -> bool:
    """Tell whether the current user may access the given aircraft.

    Checks, in order:

    1. ADMIN and OWNER roles are always allowed.
    2. A session without ``user_id`` is refused.
    3. A ``UserAllAircraftAccess`` row for the user in the tenant of their
       first ``TenantUser`` membership allows every aircraft.
    4. Otherwise a ``UserAircraftAccess`` row for this user and
       *aircraft_id* is required.
    """
    from models import Role, TenantUser, UserAircraftAccess, UserAllAircraftAccess

    if current_user_role() in (Role.ADMIN, Role.OWNER):
        return True

    uid = session.get("user_id")
    if not uid:
        return False

    membership = TenantUser.query.filter_by(user_id=uid).first()
    if membership:
        blanket_grant = UserAllAircraftAccess.query.filter_by(
            user_id=uid, tenant_id=membership.tenant_id
        ).first()
        if blanket_grant:
            return True

    single_grant = UserAircraftAccess.query.filter_by(
        user_id=uid, aircraft_id=aircraft_id
    ).first()
    return single_grant is not None
def accessible_aircraft(tenant_id: int) -> Any:
    """Build the query of Aircraft in *tenant_id* visible to the current user.

    The base query selects the tenant's aircraft ordered by registration.
    ADMIN and OWNER roles, and users holding a ``UserAllAircraftAccess`` row
    for the tenant, get that base query unchanged.  Other users get it
    narrowed to the aircraft ids found in their ``UserAircraftAccess`` rows.
    A session without ``user_id``, or a user with no such rows, gets a query
    filtered with SQL ``false()`` (i.e. one that matches nothing).
    """
    from models import Aircraft, Role, UserAircraftAccess, UserAllAircraftAccess

    everything = Aircraft.query.filter_by(tenant_id=tenant_id).order_by(
        Aircraft.registration
    )

    def nothing() -> Any:
        # sqlalchemy is imported only on the paths that need an empty result.
        from sqlalchemy import false

        return everything.filter(false())

    if current_user_role() in (Role.ADMIN, Role.OWNER):
        return everything

    uid = session.get("user_id")
    if not uid:
        return nothing()

    if UserAllAircraftAccess.query.filter_by(user_id=uid, tenant_id=tenant_id).first():
        return everything

    grant_rows = (
        UserAircraftAccess.query.filter_by(user_id=uid)
        .with_entities(UserAircraftAccess.aircraft_id)
        .all()
    )
    granted_ids = [grant.aircraft_id for grant in grant_rows]
    if granted_ids:
        return everything.filter(Aircraft.id.in_(granted_ids))
    return nothing()
def compute_aircraft_statuses(
    aircraft_list: Any, triggers: Any, hobbs_by_id: Any
) -> dict[int, str]:
    """Classify every aircraft as 'grounded', 'overdue', 'due_soon' or 'ok'.

    Returns ``{aircraft.id: status}``.  An aircraft whose ``is_grounded`` is
    truthy is 'grounded' regardless of anything else.  Otherwise each of its
    triggers is evaluated with ``trigger.status(hobbs)``, where ``hobbs`` is
    ``hobbs_by_id.get(aircraft.id)``, and an ``insurance_status`` of
    "expiring_soon" counts as one extra 'due_soon'.  The worst result wins:
    overdue > due_soon > ok.
    """
    triggers_for = defaultdict(list)
    for trigger in triggers:
        triggers_for[trigger.aircraft_id].append(trigger)

    def classify(aircraft: Any) -> str:
        if aircraft.is_grounded:
            return "grounded"
        hobbs = hobbs_by_id.get(aircraft.id)
        found = [trig.status(hobbs) for trig in triggers_for.get(aircraft.id, [])]
        if aircraft.insurance_status == "expiring_soon":
            found.append("due_soon")
        # Severity order: report the first (worst) level that occurred.
        for level in ("overdue", "due_soon"):
            if level in found:
                return level
        return "ok"

    statuses: dict[int, str] = {}
    for aircraft in aircraft_list:
        statuses[aircraft.id] = classify(aircraft)
    return statuses