Coverage for app/utils.py: 100%

575 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1"""Shared utilities available to all blueprints.""" 

2 

3import csv 

4import functools 

5import io 

6import logging 

7import math 

8import os 

9from collections import defaultdict 

10from functools import wraps 

11from typing import Any, Callable 

12 

13from flask import abort, redirect, session, url_for # pyright: ignore[reportMissingImports] 

14 

15_log = logging.getLogger(__name__) 

16 

17 

18# ── Tracks GIF export ───────────────────────────────────────────────────────── 

19 

20_GIF_W, _GIF_H = 800, 480 

21_GIF_PAD = 30 # pixel padding around the track area 

22_GIF_STEP_MS = 600 # ms per frame in the animated GIF 

23_GIF_HOLD_MS = 3000 # ms for the final "all tracks" frame 

24_TRACK_COLOUR = (180, 30, 200) # current/newest track: vivid purple 

25_HISTORY_COLOUR = ( 

26 130, 

27 20, 

28 150, 

29) # previously drawn tracks: slightly darker purple (GIF uses flat colour; web uses opacity fade) 

30_BG_COLOUR = (248, 248, 252) # near-white background 

31 

32 

33def _mercator_y(lat_deg: float) -> float: 

34 """Web-Mercator y value for a latitude (not clamped).""" 

35 lat = math.radians(max(-85.0, min(85.0, lat_deg))) 

36 return math.log(math.tan(math.pi / 4 + lat / 2)) 

37 

38 

39def _build_gif_projection( 

40 all_coords: list[tuple[float, float]], 

41 canvas_w: int = _GIF_W, 

42 canvas_h: int = _GIF_H, 

43 pad: int = _GIF_PAD, 

44) -> "tuple[Any, Any] | None": 

45 """Return (project_fn, bbox) or None if there are fewer than 2 unique points.""" 

46 if len(all_coords) < 2: 

47 return None 

48 lons = [c[0] for c in all_coords] 

49 lats = [c[1] for c in all_coords] 

50 min_lon, max_lon = min(lons), max(lons) 

51 min_lat, max_lat = min(lats), max(lats) 

52 # Add small margin so tracks don't touch the edge 

53 dlon = max(max_lon - min_lon, 0.1) * 0.1 

54 dlat = max(max_lat - min_lat, 0.1) * 0.1 

55 min_lon -= dlon 

56 max_lon += dlon 

57 min_lat -= dlat 

58 max_lat += dlat 

59 

60 usable_w = canvas_w - 2 * pad 

61 usable_h = canvas_h - 2 * pad 

62 

63 y_min = _mercator_y(min_lat) 

64 y_max = _mercator_y(max_lat) 

65 y_range = y_max - y_min or 1e-9 # Mercator log-tan units (NOT degrees) 

66 x_range = max_lon - min_lon or 1e-9 # degrees 

67 

68 # Web Mercator geographic aspect: scale_y_mercator = scale_x * 180/π 

69 # (1 Mercator-y unit = 111 km at any latitude; 1° lon at centre = cos(φ)*111 km, 

70 # so equal-km scales satisfy scale_y = scale_x * 180/π). 

71 # Pick scale_x so BOTH axes fit: scale_x ≤ w/x_range AND scale_x * 180/π * y_range ≤ h. 

72 scale_x = min( 

73 usable_w / x_range, 

74 usable_h * math.pi / (180.0 * y_range), 

75 ) 

76 scale_y = scale_x * 180.0 / math.pi # pixels per Mercator-y unit 

77 

78 off_x = pad + (usable_w - x_range * scale_x) / 2 

79 off_y = pad + (usable_h - y_range * scale_y) / 2 

80 

81 def project(lon: float, lat: float) -> tuple[int, int]: 

82 px = off_x + (lon - min_lon) * scale_x 

83 py = off_y + (y_max - _mercator_y(lat)) * scale_y 

84 return int(px), int(py) 

85 

86 return project, (min_lon, min_lat, max_lon, max_lat) 

87 

88 

89def _coords_from_geojson(geojson: dict[str, Any] | None) -> list[tuple[float, float]]: 

90 """Extract (lon, lat) pairs from a GeoJSON Feature or FeatureCollection.""" 

91 if not geojson: 

92 return [] 

93 if geojson.get("type") == "Feature": 

94 geom = geojson.get("geometry") or {} 

95 return [(c[0], c[1]) for c in geom.get("coordinates", []) if len(c) >= 2] 

96 if geojson.get("type") == "FeatureCollection": 

97 result = [] 

98 for feat in geojson.get("features", []): 

99 result.extend(_coords_from_geojson(feat)) 

100 return result 

101 return [] 

102 

103 

104def _make_tile_background( 

105 project: Callable[[float, float], tuple[int, int]], 

106 min_lon: float, 

107 max_lon: float, 

108 min_lat: float, 

109 max_lat: float, 

110 canvas_w: int, 

111 canvas_h: int, 

112 openaip_key: str | None = None, 

113 tile_cache: dict[Any, bytes] | None = None, 

114 max_tiles: int = 36, 

115) -> Any: 

116 """Fetch OSM raster tiles and composite them into a background PIL Image. 

117 

118 Returns a PIL Image, or None on failure (caller falls back to plain fill). 

119 Tiles are fetched from OpenStreetMap; zoom level is chosen automatically. 

120 Max 36 tiles are fetched to keep export time reasonable. 

121 

122 tile_cache, if provided, is a dict keyed by (z, tx, ty) or ("opi", z, tx, ty) 

123 holding raw PNG bytes so repeated calls across frames skip network fetches. 

124 """ 

125 import urllib.request 

126 from PIL import Image as _Img # pyright: ignore[reportMissingImports] 

127 

128 # Compute pixels-per-degree-lon from the projection function 

129 mid_lat = (min_lat + max_lat) / 2.0 

130 px0x = project(min_lon, mid_lat)[0] 

131 px1x = project(min_lon + 1.0, mid_lat)[0] 

132 scale_x = float(px1x - px0x) # canvas pixels per degree-lon 

133 if scale_x <= 0: 

134 return None 

135 

136 # Choose zoom so each tile is ~256 canvas pixels wide (clamped 2–14) 

137 z = int(round(math.log2(max(scale_x * 360.0 / 256.0, 1.0)))) 

138 z = max(2, min(z, 14)) 

139 n = 2**z 

140 

141 def _lon_to_tx(lon: float) -> int: 

142 return int(int((lon + 180.0) / 360.0 * n) % n) 

143 

144 def _lat_to_ty(lat: float) -> int: 

145 lat_r = math.radians(max(-85.0, min(85.0, lat))) 

146 return int( 

147 (1.0 - math.log(math.tan(lat_r) + 1.0 / math.cos(lat_r)) / math.pi) 

148 / 2.0 

149 * n 

150 ) 

151 

152 def _tile_nw_lonlat(tx: int, ty: int) -> tuple[float, float]: 

153 lon = tx * 360.0 / n - 180.0 

154 lat = math.degrees(math.atan(math.sinh(math.pi * (1.0 - 2.0 * ty / n)))) 

155 return lon, lat 

156 

157 tx_min = _lon_to_tx(min_lon) 

158 tx_max = _lon_to_tx(max_lon) 

159 ty_min = _lat_to_ty(max_lat) 

160 ty_max = _lat_to_ty(min_lat) 

161 

162 # Guard against tile count explosion 

163 if (tx_max - tx_min + 1) * (ty_max - ty_min + 1) > max_tiles: 

164 return None 

165 

166 bg = _Img.new("RGB", (canvas_w, canvas_h), _BG_COLOUR) 

167 ua = "OpenHangar flight-logbook GIF export (https://github.com/e2jk/OpenHangar)" 

168 

169 for tx in range(tx_min, tx_max + 1): 

170 for ty in range(ty_min, ty_max + 1): 

171 try: 

172 tx_w = tx % n 

173 # Compute pixel bounds for this tile by projecting its NW corner 

174 # and the NW corner of the tile to its SE — then size the resize 

175 # to exactly span that gap plus 1 px overlap, eliminating seams 

176 # caused by integer rounding of adjacent corner coordinates. 

177 lon_nw, lat_nw = _tile_nw_lonlat(tx, ty) 

178 lon_se, lat_se = _tile_nw_lonlat(tx + 1, ty + 1) 

179 px, py = project(lon_nw, lat_nw) 

180 px_se, py_se = project(lon_se, lat_se) 

181 tile_w = max(1, px_se - px + 1) 

182 tile_h = max(1, py_se - py + 1) 

183 

184 # Base map: CARTO light (same as web animation) 

185 base_key = (z, tx_w, ty) 

186 if tile_cache is not None and base_key in tile_cache: 

187 raw = tile_cache[base_key] 

188 else: 

189 base_url = ( 

190 f"https://a.basemaps.cartocdn.com/light_all/{z}/{tx_w}/{ty}.png" 

191 ) 

192 req = urllib.request.Request(base_url, headers={"User-Agent": ua}) 

193 with urllib.request.urlopen(req, timeout=5) as resp: 

194 raw = resp.read() 

195 if tile_cache is not None: 

196 tile_cache[base_key] = raw 

197 tile = _Img.open(io.BytesIO(raw)).convert("RGBA") 

198 tile = tile.resize((tile_w, tile_h), _Img.Resampling.LANCZOS) 

199 bg.paste(tile.convert("RGB"), (px, py)) 

200 # Aviation overlay: OpenAIP (only at zoom ≤ 14, requires key) 

201 if openaip_key and z <= 14: 

202 opi_key = ("opi", z, tx_w, ty) 

203 if tile_cache is not None and opi_key in tile_cache: 

204 opi_raw = tile_cache[opi_key] 

205 else: 

206 opi_url = f"https://api.tiles.openaip.net/api/data/openaip/{z}/{tx_w}/{ty}.png?apiKey={openaip_key}" 

207 opi_req = urllib.request.Request( 

208 opi_url, headers={"User-Agent": ua} 

209 ) 

210 with urllib.request.urlopen(opi_req, timeout=5) as opi_resp: 

211 opi_raw = opi_resp.read() 

212 if tile_cache is not None: 

213 tile_cache[opi_key] = opi_raw 

214 opi_tile = _Img.open(io.BytesIO(opi_raw)).convert("RGBA") 

215 opi_tile = opi_tile.resize( 

216 (tile_w, tile_h), _Img.Resampling.LANCZOS 

217 ) 

218 bg.paste(opi_tile.convert("RGB"), (px, py), opi_tile) 

219 except Exception as exc: 

220 _log.debug("OPI tile unavailable, leaving area as background: %s", exc) 

221 

222 return bg 

223 

224 

225def _canvas_geo_bounds( 

226 project_fn: Callable[[float, float], tuple[int, int]], 

227 canvas_w: int, 

228 canvas_h: int, 

229 min_lon: float, 

230 max_lon: float, 

231 min_lat: float, 

232 max_lat: float, 

233) -> tuple[float, float, float, float]: 

234 """Return (lon_min, lat_min, lon_max, lat_max) spanning the full canvas. 

235 

236 Expands the geographic bbox beyond the track bounding box so the full 

237 canvas area is covered by map tiles, eliminating plain-background padding. 

238 Returns the original bbox if the inverse projection cannot be computed. 

239 """ 

240 mid_lat = (min_lat + max_lat) / 2.0 

241 # Longitude is linear with pixel x in Mercator 

242 px0, _ = project_fn(min_lon, mid_lat) 

243 px1, _ = project_fn(min_lon + 1.0, mid_lat) 

244 scale_x = float(px1 - px0) 

245 if scale_x <= 0: 

246 return min_lon, min_lat, max_lon, max_lat 

247 c_lon_min = min_lon - float(px0) / scale_x 

248 c_lon_max = min_lon + float(canvas_w - px0) / scale_x 

249 

250 # Latitude: derive Mercator scale from two known projection points 

251 _, py_bot = project_fn(min_lon, min_lat) # larger py (bottom of canvas area) 

252 _, py_top = project_fn(min_lon, max_lat) # smaller py (top of canvas area) 

253 merc_bot = _mercator_y(min_lat) 

254 merc_top = _mercator_y(max_lat) 

255 merc_range = merc_top - merc_bot 

256 py_range = float(py_bot - py_top) 

257 if merc_range <= 0 or py_range <= 0: 

258 return c_lon_min, min_lat, c_lon_max, max_lat 

259 scale_y = py_range / merc_range # pixels per Mercator-y unit 

260 # Derived: merc_y at canvas row py = merc_top + (py_top - py) / scale_y 

261 merc_canvas_top = merc_top + float(py_top) / scale_y 

262 merc_canvas_bot = merc_top + float(py_top - canvas_h) / scale_y 

263 # Clamp to avoid math.atan overflow at extreme Mercator values 

264 merc_canvas_top = max(-10.0, min(10.0, merc_canvas_top)) 

265 merc_canvas_bot = max(-10.0, min(10.0, merc_canvas_bot)) 

266 c_lat_max = math.degrees(2.0 * math.atan(math.exp(merc_canvas_top)) - math.pi / 2.0) 

267 c_lat_min = math.degrees(2.0 * math.atan(math.exp(merc_canvas_bot)) - math.pi / 2.0) 

268 return c_lon_min, max(-85.0, c_lat_min), c_lon_max, min(85.0, c_lat_max) 

269 

270 

271def sort_tracks_oldest_first( 

272 rows: list[dict[str, Any]], 

273) -> list[dict[str, Any]]: 

274 """Return rows sorted ascending by their 'date' string key (oldest first). 

275 

276 Both the web animation and the GIF export use this ordering so that 

277 chronological playback and progressive opacity fading are consistent 

278 across rendering environments. Call this once in the route handler and 

279 pass the result to both the template context and generate_tracks_gif(). 

280 """ 

281 return sorted(rows, key=lambda r: r.get("date", "")) 

282 

283 

284def generate_tracks_gif( 

285 track_rows: list[dict[str, Any]], 

286 _font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 

287 _openaip_key: str | None = None, 

288 canvas_w: int = _GIF_W, 

289 canvas_h: int = _GIF_H, 

290 high_res: bool = False, 

291) -> bytes: 

292 """Render an animated GIF of the flight tracks, oldest-first. 

293 

294 track_rows must already be sorted oldest-first; use sort_tracks_oldest_first(). 

295 Each frame adds one more track (cumulative) and re-fits the map to the 

296 bounding box of tracks seen so far, creating a progressive zoom-out effect 

297 that mirrors the web animation. Previous tracks are drawn in a lighter 

298 colour; the newest one in vivid purple. Returns raw GIF bytes ready to 

299 stream to the browser. 

300 

301 high_res=True doubles line widths, font sizes and padding, uses a 256-colour 

302 palette, and raises the tile-fetch cap to 64 — producing a sharper map with 

303 readable labels at the cost of a larger file. 

304 """ 

305 from PIL import Image, ImageDraw, ImageFont # pyright: ignore[reportMissingImports] 

306 

307 _q_scale = 2 if high_res else 1 

308 _pad = _GIF_PAD * _q_scale 

309 _line_curr = 3 * _q_scale 

310 _line_hist = 2 * _q_scale 

311 _font_sz = 14 * _q_scale 

312 _font_sz_sm = 11 * _q_scale 

313 _txt_margin = 8 * _q_scale 

314 _q_colors = 256 if high_res else 128 

315 _max_tiles = 64 if high_res else 36 

316 # Canvas-extent calls cover the full 1600×960 canvas which can require 

317 # ~70–90 tiles at certain zoom levels (wide flat tracks at z=7), so use a 

318 # higher cap than the track-bbox call. 128 = 2× the high-res track cap. 

319 _max_tiles_canvas = _max_tiles * 2 if high_res else _max_tiles 

320 

321 # Pre-compute per-track coordinates (avoids re-parsing GeoJSON in the inner loop) 

322 per_track_coords = [_coords_from_geojson(r.get("geojson")) for r in track_rows] 

323 all_coords: list[tuple[float, float]] = [c for tc in per_track_coords for c in tc] 

324 

325 if len(all_coords) < 2: 

326 # Fallback: single blank frame 

327 img = Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR) 

328 buf = io.BytesIO() 

329 img.save(buf, format="GIF") 

330 return buf.getvalue() 

331 

332 font: Any # PIL font type varies across Pillow versions 

333 font_sm: Any 

334 try: 

335 font = ImageFont.truetype(_font_path, _font_sz) 

336 font_sm = ImageFont.truetype(_font_path, _font_sz_sm) 

337 except (IOError, OSError): 

338 font = font_sm = ImageFont.load_default() 

339 

340 def draw_shadow_text(draw: Any, text: str, font: Any) -> None: 

341 draw.text( 

342 (_txt_margin + 1, _txt_margin + 1), text, fill=(255, 255, 255), font=font 

343 ) 

344 draw.text((_txt_margin, _txt_margin), text, fill=(40, 40, 60), font=font) 

345 

346 def draw_track( 

347 draw: Any, 

348 coords: list[tuple[float, float]], 

349 colour: tuple[int, int, int], 

350 width: int, 

351 project_fn: Callable[[float, float], tuple[int, int]], 

352 ) -> None: 

353 pts = [project_fn(lon, lat) for lon, lat in coords] 

354 if len(pts) >= 2: 

355 draw.line(pts, fill=colour, width=width) 

356 r = width + 2 * _q_scale 

357 sx, sy = pts[0] 

358 draw.ellipse([sx - r, sy - r, sx + r, sy + r], fill=(40, 160, 60)) 

359 ex, ey = pts[-1] 

360 draw.ellipse([ex - r, ey - r, ex + r, ey + r], fill=(200, 40, 40)) 

361 

362 # Shared tile cache: (z, tx, ty) → raw PNG bytes; ("opi", z, tx, ty) for OpenAIP. 

363 # Avoids re-fetching tiles that appear in multiple frames at the same zoom level. 

364 tile_cache: dict[Any, bytes] = {} 

365 

366 def _frame_bg( 

367 project_fn: Callable[[float, float], tuple[int, int]], 

368 min_lon: float, 

369 max_lon: float, 

370 min_lat: float, 

371 max_lat: float, 

372 ) -> Any: 

373 fetch_lon_min, fetch_lat_min, fetch_lon_max, fetch_lat_max = ( 

374 min_lon, 

375 min_lat, 

376 max_lon, 

377 max_lat, 

378 ) 

379 if high_res: 

380 fetch_lon_min, fetch_lat_min, fetch_lon_max, fetch_lat_max = ( 

381 _canvas_geo_bounds( 

382 project_fn, 

383 canvas_w, 

384 canvas_h, 

385 min_lon, 

386 max_lon, 

387 min_lat, 

388 max_lat, 

389 ) 

390 ) 

391 bg = _make_tile_background( 

392 project_fn, 

393 fetch_lon_min, 

394 fetch_lon_max, 

395 fetch_lat_min, 

396 fetch_lat_max, 

397 canvas_w, 

398 canvas_h, 

399 openaip_key=_openaip_key, 

400 tile_cache=tile_cache, 

401 max_tiles=_max_tiles_canvas, 

402 ) 

403 if bg is None and high_res: 

404 # Canvas extent still exceeded the raised cap; fall back to track bbox 

405 bg = _make_tile_background( 

406 project_fn, 

407 min_lon, 

408 max_lon, 

409 min_lat, 

410 max_lat, 

411 canvas_w, 

412 canvas_h, 

413 openaip_key=_openaip_key, 

414 tile_cache=tile_cache, 

415 max_tiles=_max_tiles, 

416 ) 

417 return ( 

418 bg.copy() 

419 if bg is not None 

420 else Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR) 

421 ) 

422 

423 frames: list[Any] = [] 

424 durations: list[int] = [] 

425 accumulated_coords: list[tuple[float, float]] = [] 

426 

427 for frame_idx in range(len(track_rows)): 

428 accumulated_coords.extend(per_track_coords[frame_idx]) 

429 proj_result = _build_gif_projection( 

430 accumulated_coords, canvas_w=canvas_w, canvas_h=canvas_h, pad=_pad 

431 ) 

432 if proj_result is None: 

433 continue # not enough coords yet (e.g. leading rows with no geojson) 

434 

435 project_fn, (f_min_lon, f_min_lat, f_max_lon, f_max_lat) = proj_result 

436 img = _frame_bg(project_fn, f_min_lon, f_max_lon, f_min_lat, f_max_lat) 

437 draw = ImageDraw.Draw(img) 

438 

439 for i in range(frame_idx): 

440 draw_track( 

441 draw, per_track_coords[i], _HISTORY_COLOUR, _line_hist, project_fn 

442 ) 

443 draw_track( 

444 draw, per_track_coords[frame_idx], _TRACK_COLOUR, _line_curr, project_fn 

445 ) 

446 

447 row = track_rows[frame_idx] 

448 label = f"{row.get('date', '')} {row.get('dep', '')}{row.get('arr', '')}" 

449 draw_shadow_text(draw, label, font) 

450 draw.text( 

451 (_txt_margin, canvas_h - _font_sz_sm - _txt_margin), 

452 f"{frame_idx + 1} / {len(track_rows)}", 

453 fill=(80, 80, 100), 

454 font=font_sm, 

455 ) 

456 

457 frames.append(img) 

458 durations.append(_GIF_STEP_MS) 

459 

460 # Final frame: all tracks at equal weight using the full bounding box, longer hold 

461 if frames: 

462 proj_result_final = _build_gif_projection( 

463 all_coords, canvas_w=canvas_w, canvas_h=canvas_h, pad=_pad 

464 ) 

465 # proj_result_final is guaranteed non-None: all_coords has ≥ 2 points (checked above) 

466 project_final, (f_min_lon, f_min_lat, f_max_lon, f_max_lat) = proj_result_final # type: ignore[misc] 

467 img = _frame_bg(project_final, f_min_lon, f_max_lon, f_min_lat, f_max_lat) 

468 draw = ImageDraw.Draw(img) 

469 for tc in per_track_coords: 

470 draw_track(draw, tc, _TRACK_COLOUR, _line_hist, project_final) 

471 draw_shadow_text(draw, f"All {len(track_rows)} tracks", font) 

472 frames.append(img) 

473 durations.append(_GIF_HOLD_MS) 

474 

475 # Quantise all frames to a shared 128-colour palette built from the last frame 

476 # (widest view, most visually representative). A single global palette means 

477 # identical background areas compress very well with GIF's LZW codec. 

478 palette_src = frames[-1].quantize(colors=_q_colors, dither=Image.Dither.NONE) 

479 quantized: list[Any] = [ 

480 f.quantize(palette=palette_src, dither=Image.Dither.NONE) for f in frames 

481 ] 

482 

483 buf = io.BytesIO() 

484 quantized[0].save( 

485 buf, 

486 format="GIF", 

487 save_all=True, 

488 append_images=quantized[1:], 

489 loop=0, 

490 duration=durations, 

491 optimize=True, 

492 ) 

493 return buf.getvalue() 

494 

495 

496def generate_single_track_image( 

497 geojson: dict[str, Any] | None, 

498 date: str = "", 

499 dep: str = "", 

500 arr: str = "", 

501 _font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 

502 _openaip_key: str | None = None, 

503 canvas_w: int = _GIF_W, 

504 canvas_h: int = _GIF_H, 

505 high_res: bool = False, 

506) -> bytes: 

507 """Render a single GPS track as a static PNG. 

508 

509 Returns raw PNG bytes. If the track has fewer than 2 points a blank 

510 canvas is returned so the caller can always stream a valid image. 

511 """ 

512 from PIL import Image, ImageDraw, ImageFont # pyright: ignore[reportMissingImports] 

513 

514 all_coords = _coords_from_geojson(geojson) 

515 

516 _q_scale = 2 if high_res else 1 

517 _pad = _GIF_PAD * _q_scale 

518 _line_w = 3 * _q_scale 

519 _font_sz = 14 * _q_scale 

520 _txt_margin = 8 * _q_scale 

521 _max_tiles = 64 if high_res else 36 

522 _max_tiles_canvas = _max_tiles * 2 if high_res else _max_tiles 

523 

524 def _blank_png() -> bytes: 

525 buf = io.BytesIO() 

526 Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR).save(buf, format="PNG") 

527 return buf.getvalue() 

528 

529 if len(all_coords) < 2: 

530 return _blank_png() 

531 

532 proj_result = _build_gif_projection( 

533 all_coords, canvas_w=canvas_w, canvas_h=canvas_h, pad=_pad 

534 ) 

535 if proj_result is None: 

536 return _blank_png() 

537 

538 project_fn, (f_min_lon, f_min_lat, f_max_lon, f_max_lat) = proj_result 

539 

540 tile_cache: dict[Any, bytes] = {} 

541 fetch_lon_min, fetch_lat_min, fetch_lon_max, fetch_lat_max = ( 

542 f_min_lon, 

543 f_min_lat, 

544 f_max_lon, 

545 f_max_lat, 

546 ) 

547 if high_res: 

548 fetch_lon_min, fetch_lat_min, fetch_lon_max, fetch_lat_max = _canvas_geo_bounds( 

549 project_fn, canvas_w, canvas_h, f_min_lon, f_max_lon, f_min_lat, f_max_lat 

550 ) 

551 bg = _make_tile_background( 

552 project_fn, 

553 fetch_lon_min, 

554 fetch_lon_max, 

555 fetch_lat_min, 

556 fetch_lat_max, 

557 canvas_w, 

558 canvas_h, 

559 openaip_key=_openaip_key, 

560 tile_cache=tile_cache, 

561 max_tiles=_max_tiles_canvas, 

562 ) 

563 if bg is None and high_res: 

564 bg = _make_tile_background( 

565 project_fn, 

566 f_min_lon, 

567 f_max_lon, 

568 f_min_lat, 

569 f_max_lat, 

570 canvas_w, 

571 canvas_h, 

572 openaip_key=_openaip_key, 

573 tile_cache=tile_cache, 

574 max_tiles=_max_tiles, 

575 ) 

576 img = ( 

577 bg.copy() 

578 if bg is not None 

579 else Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR) 

580 ) 

581 

582 draw = ImageDraw.Draw(img) 

583 pts = [project_fn(lon, lat) for lon, lat in all_coords] 

584 if len(pts) >= 2: 

585 draw.line(pts, fill=_TRACK_COLOUR, width=_line_w) 

586 r = _line_w + 2 * _q_scale 

587 sx, sy = pts[0] 

588 draw.ellipse([sx - r, sy - r, sx + r, sy + r], fill=(40, 160, 60)) 

589 ex, ey = pts[-1] 

590 draw.ellipse([ex - r, ey - r, ex + r, ey + r], fill=(200, 40, 40)) 

591 

592 label = f"{date} {dep}{arr}".strip(" →").strip() 

593 if label: 

594 try: 

595 font: Any = ImageFont.truetype(_font_path, _font_sz) 

596 except (IOError, OSError): 

597 font = ImageFont.load_default() 

598 draw.text( 

599 (_txt_margin + 1, _txt_margin + 1), label, fill=(255, 255, 255), font=font 

600 ) 

601 draw.text((_txt_margin, _txt_margin), label, fill=(40, 40, 60), font=font) 

602 

603 buf = io.BytesIO() 

604 img.save(buf, format="PNG") 

605 return buf.getvalue() 

606 

607 

608def generate_single_track_gif( 

609 geojson: dict[str, Any] | None, 

610 date: str = "", 

611 dep: str = "", 

612 arr: str = "", 

613 _font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 

614 _openaip_key: str | None = None, 

615 canvas_w: int = _GIF_W, 

616 canvas_h: int = _GIF_H, 

617 high_res: bool = False, 

618) -> bytes: 

619 """Render an animated GIF of a single flight track drawn progressively. 

620 

621 The track's coordinate array is split into N equal chunks (3–10 depending 

622 on track length). Each frame draws all chunks revealed so far, fitting the 

623 map bounding box to the accumulated coords so the view zooms out as the 

624 route grows — mirroring the per-flight behaviour of the web Animate button. 

625 

626 Falls back to a single-frame GIF wrapping the still PNG when the track has 

627 fewer than 20 GPS points (too sparse for meaningful animation). 

628 """ 

629 from PIL import Image, ImageDraw, ImageFont # pyright: ignore[reportMissingImports] 

630 

631 all_coords = _coords_from_geojson(geojson) 

632 

633 _q_scale = 2 if high_res else 1 

634 _pad = _GIF_PAD * _q_scale 

635 _line_curr = 3 * _q_scale 

636 _line_hist = 2 * _q_scale 

637 _font_sz = 14 * _q_scale 

638 _font_sz_sm = 11 * _q_scale 

639 _txt_margin = 8 * _q_scale 

640 _q_colors = 256 if high_res else 128 

641 _max_tiles = 64 if high_res else 36 

642 _max_tiles_canvas = _max_tiles * 2 if high_res else _max_tiles 

643 

644 def _blank_gif() -> bytes: 

645 buf = io.BytesIO() 

646 Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR).save(buf, format="GIF") 

647 return buf.getvalue() 

648 

649 if len(all_coords) < 2: 

650 return _blank_gif() 

651 

652 # Sparse-track fallback: wrap the still image in a single-frame GIF 

653 if len(all_coords) < 20: 

654 png = generate_single_track_image( 

655 geojson, 

656 date=date, 

657 dep=dep, 

658 arr=arr, 

659 _font_path=_font_path, 

660 _openaip_key=_openaip_key, 

661 canvas_w=canvas_w, 

662 canvas_h=canvas_h, 

663 high_res=high_res, 

664 ) 

665 img = Image.open(io.BytesIO(png)).convert("RGB") 

666 buf = io.BytesIO() 

667 img.quantize(colors=_q_colors, dither=Image.Dither.NONE).save(buf, format="GIF") 

668 return buf.getvalue() 

669 

670 # Split coords into N equal chunks (3–10) 

671 n_chunks = max(3, min(10, len(all_coords) // 10)) 

672 chunk_size = len(all_coords) // n_chunks 

673 chunks: list[list[tuple[float, float]]] = [ 

674 list(all_coords[i * chunk_size : (i + 1) * chunk_size]) 

675 for i in range(n_chunks - 1) 

676 ] 

677 chunks.append( 

678 list(all_coords[(n_chunks - 1) * chunk_size :]) 

679 ) # last catches remainder 

680 

681 try: 

682 font: Any = ImageFont.truetype(_font_path, _font_sz) 

683 font_sm: Any = ImageFont.truetype(_font_path, _font_sz_sm) 

684 except (IOError, OSError): 

685 font = font_sm = ImageFont.load_default() 

686 

687 def draw_shadow_text(draw: Any, text: str, fnt: Any) -> None: 

688 draw.text( 

689 (_txt_margin + 1, _txt_margin + 1), text, fill=(255, 255, 255), font=fnt 

690 ) 

691 draw.text((_txt_margin, _txt_margin), text, fill=(40, 40, 60), font=fnt) 

692 

693 def draw_track( 

694 draw: Any, 

695 coords: list[tuple[float, float]], 

696 colour: tuple[int, int, int], 

697 width: int, 

698 project_fn: Callable[[float, float], tuple[int, int]], 

699 ) -> None: 

700 pts = [project_fn(lon, lat) for lon, lat in coords] 

701 if len(pts) >= 2: 

702 draw.line(pts, fill=colour, width=width) 

703 r = width + 2 * _q_scale 

704 sx, sy = pts[0] 

705 draw.ellipse([sx - r, sy - r, sx + r, sy + r], fill=(40, 160, 60)) 

706 ex, ey = pts[-1] 

707 draw.ellipse([ex - r, ey - r, ex + r, ey + r], fill=(200, 40, 40)) 

708 

709 tile_cache: dict[Any, bytes] = {} 

710 

711 def _frame_bg( 

712 project_fn: Callable[[float, float], tuple[int, int]], 

713 min_lon: float, 

714 max_lon: float, 

715 min_lat: float, 

716 max_lat: float, 

717 ) -> Any: 

718 fetch_lon_min, fetch_lat_min, fetch_lon_max, fetch_lat_max = ( 

719 min_lon, 

720 min_lat, 

721 max_lon, 

722 max_lat, 

723 ) 

724 if high_res: 

725 fetch_lon_min, fetch_lat_min, fetch_lon_max, fetch_lat_max = ( 

726 _canvas_geo_bounds( 

727 project_fn, canvas_w, canvas_h, min_lon, max_lon, min_lat, max_lat 

728 ) 

729 ) 

730 bg = _make_tile_background( 

731 project_fn, 

732 fetch_lon_min, 

733 fetch_lon_max, 

734 fetch_lat_min, 

735 fetch_lat_max, 

736 canvas_w, 

737 canvas_h, 

738 openaip_key=_openaip_key, 

739 tile_cache=tile_cache, 

740 max_tiles=_max_tiles_canvas, 

741 ) 

742 if bg is None and high_res: 

743 bg = _make_tile_background( 

744 project_fn, 

745 min_lon, 

746 max_lon, 

747 min_lat, 

748 max_lat, 

749 canvas_w, 

750 canvas_h, 

751 openaip_key=_openaip_key, 

752 tile_cache=tile_cache, 

753 max_tiles=_max_tiles, 

754 ) 

755 return ( 

756 bg.copy() 

757 if bg is not None 

758 else Image.new("RGB", (canvas_w, canvas_h), _BG_COLOUR) 

759 ) 

760 

761 label = f"{date} {dep}{arr}".strip(" →").strip() 

762 

763 frames: list[Any] = [] 

764 durations: list[int] = [] 

765 accumulated: list[tuple[float, float]] = [] 

766 

767 for chunk_idx, chunk in enumerate(chunks): 

768 accumulated.extend(chunk) 

769 proj_result = _build_gif_projection( 

770 accumulated, canvas_w=canvas_w, canvas_h=canvas_h, pad=_pad 

771 ) 

772 if proj_result is None: 

773 continue 

774 

775 project_fn, (f_min_lon, f_min_lat, f_max_lon, f_max_lat) = proj_result 

776 img = _frame_bg(project_fn, f_min_lon, f_max_lon, f_min_lat, f_max_lat) 

777 draw = ImageDraw.Draw(img) 

778 

779 for i in range(chunk_idx): 

780 draw_track(draw, chunks[i], _HISTORY_COLOUR, _line_hist, project_fn) 

781 draw_track(draw, chunk, _TRACK_COLOUR, _line_curr, project_fn) 

782 

783 if label: 

784 draw_shadow_text(draw, label, font) 

785 draw.text( 

786 (_txt_margin, canvas_h - _font_sz_sm - _txt_margin), 

787 f"{chunk_idx + 1} / {n_chunks}", 

788 fill=(80, 80, 100), 

789 font=font_sm, 

790 ) 

791 frames.append(img) 

792 durations.append(_GIF_STEP_MS) 

793 

794 if not frames: 

795 return _blank_gif() 

796 

797 # Final hold frame: full track, all chunks at equal weight 

798 proj_result_final = _build_gif_projection( 

799 list(all_coords), canvas_w=canvas_w, canvas_h=canvas_h, pad=_pad 

800 ) 

801 # guaranteed non-None: all_coords has ≥ 20 points 

802 project_final, (f_min_lon, f_min_lat, f_max_lon, f_max_lat) = proj_result_final # type: ignore[misc] 

803 img = _frame_bg(project_final, f_min_lon, f_max_lon, f_min_lat, f_max_lat) 

804 draw = ImageDraw.Draw(img) 

805 for chunk in chunks: 

806 draw_track(draw, chunk, _TRACK_COLOUR, _line_hist, project_final) 

807 if label: 

808 draw_shadow_text(draw, label, font) 

809 frames.append(img) 

810 durations.append(_GIF_HOLD_MS) 

811 

812 palette_src = frames[-1].quantize(colors=_q_colors, dither=Image.Dither.NONE) 

813 quantized: list[Any] = [ 

814 f.quantize(palette=palette_src, dither=Image.Dither.NONE) for f in frames 

815 ] 

816 buf = io.BytesIO() 

817 quantized[0].save( 

818 buf, 

819 format="GIF", 

820 save_all=True, 

821 append_images=quantized[1:], 

822 loop=0, 

823 duration=durations, 

824 optimize=True, 

825 ) 

826 return buf.getvalue() 

827 

828 

829@functools.lru_cache(maxsize=1) 

830def _load_aircraft_types() -> dict[str, tuple[str, str]]: 

831 """Return {type_designator: (manufacturer, model)} from aircraft_types.csv.""" 

832 path = os.path.join(os.path.dirname(__file__), "data", "aircraft_types.csv") 

833 result: dict[str, tuple[str, str]] = {} 

834 try: 

835 with open(path, newline="", encoding="utf-8") as f: 

836 for row in csv.DictReader(f): 

837 des = row.get("type_designator", "").strip().upper() 

838 mfr = row.get("manufacturer", "").strip() 

839 model = row.get("model", "").strip() 

840 if des: 

841 result[des] = (mfr, model) 

842 except OSError as exc: 

843 _log.warning("aircraft_types.csv not found: %s", exc) 

844 return result 

845 

846 

847def _load_aircraft_type_variants() -> list[tuple[str, str, str, str]]: 

848 """Return all (type_designator, full_name, manufacturer, model) tuples — one per CSV row. 

849 

850 Unlike _load_aircraft_types(), duplicate designators are preserved so 

851 the search endpoint can surface every variant (e.g. all PA-28-181 models 

852 that share the P28A ICAO code). manufacturer and model are kept separate 

853 so callers can pre-fill form fields for the exact selected variant rather 

854 than an arbitrary one sharing the same ICAO code. 

855 """ 

856 path = os.path.join(os.path.dirname(__file__), "data", "aircraft_types.csv") 

857 result: list[tuple[str, str, str, str]] = [] 

858 seen: set[tuple[str, str]] = set() 

859 try: 

860 with open(path, newline="", encoding="utf-8") as f: 

861 for row in csv.DictReader(f): 

862 des = row.get("type_designator", "").strip().upper() 

863 mfr = row.get("manufacturer", "").strip() 

864 model = row.get("model", "").strip() 

865 name = f"{mfr} {model}".strip() 

866 if des and (des, name) not in seen: 

867 result.append((des, name, mfr, model)) 

868 seen.add((des, name)) 

869 except OSError as exc: 

870 _log.warning("aircraft_types.csv not found: %s", exc) 

871 return result 

872 

873 

874@functools.lru_cache(maxsize=1) 

875def _load_aircraft_type_engine_data() -> dict[str, tuple[int, str]]: 

876 """Return {type_designator: (engine_count, engine_type)} from aircraft_types.csv. 

877 

878 For designators with multiple rows (variants), uses data from the first row. 

879 """ 

880 path = os.path.join(os.path.dirname(__file__), "data", "aircraft_types.csv") 

881 result: dict[str, tuple[int, str]] = {} 

882 try: 

883 with open(path, newline="", encoding="utf-8") as f: 

884 for row in csv.DictReader(f): 

885 des = row.get("type_designator", "").strip().upper() 

886 if not des or des in result: 

887 continue 

888 try: 

889 ec = int(row.get("engine_count", "1")) 

890 except ValueError: 

891 ec = 1 

892 et = row.get("engine_type", "").strip() 

893 result[des] = (ec, et) 

894 except OSError as exc: 

895 _log.warning("aircraft_types.csv not found: %s", exc) 

896 return result 

897 

898 

899def get_aircraft_type_engine_info(icao_code: str) -> tuple[int, str] | None: 

900 """Return (engine_count, engine_type) for the given ICAO code, or None.""" 

901 return _load_aircraft_type_engine_data().get(icao_code.strip().upper()) 

902 

903 

904def resolve_aircraft_type_icao(aircraft_type: str | None) -> str | None: 

905 """Return the matching ICAO type designator for *aircraft_type*, or None.""" 

906 if not aircraft_type: 

907 return None 

908 types = _load_aircraft_types() 

909 norm = aircraft_type.strip().upper() 

910 if norm in types: 

911 return norm 

912 # Try stripping hyphens and spaces (e.g. "PA-28" → "PA28") 

913 compact = norm.replace("-", "").replace(" ", "") 

914 if compact in types: 

915 return compact 

916 return None 

917 

918 

919@functools.lru_cache(maxsize=1) 

920def _load_airport_names() -> dict[str, str]: 

921 """Return {ICAO ident: airport name} for all airports in airports.csv.""" 

922 path = os.path.join(os.path.dirname(__file__), "data", "airports.csv") 

923 result: dict[str, str] = {} 

924 try: 

925 with open(path, newline="", encoding="utf-8") as f: 

926 for row in csv.DictReader(f): 

927 ident = row.get("ident", "").strip() 

928 name = row.get("name", "").strip() 

929 if ident and name: 

930 result[ident] = name 

931 except OSError as exc: 

932 _log.warning("airports.csv not found: %s", exc) 

933 return result 

934 

935 

936_alog = logging.getLogger("openhangar.activity") 

937 

938 

939def _sl(value: object) -> str: 

940 """Sanitize a value for log output — strips CR/LF to prevent log injection (CWE-117).""" 

941 return str(value).replace("\r\n", "").replace("\n", "").replace("\r", "") 

942 

943 

944def activity(event: str, **fields: object) -> None: 

945 """Emit a structured [ACTIVITY] log entry with user_id and ip automatically included.""" 

946 from flask import request, session # noqa: PLC0415 

947 

948 uid = session.get("user_id", "") 

949 ip = request.remote_addr or "" 

950 parts = [f"[ACTIVITY] {event}", f"user_id={_sl(uid)}", f"ip={_sl(ip)}"] 

951 parts.extend(f"{k}={_sl(v)}" for k, v in fields.items()) 

952 _alog.info(" ".join(parts)) 

953 

954 

955def login_required(f: Callable[..., Any]) -> Callable[..., Any]: 

956 """Redirect unauthenticated users to the login page.""" 

957 

958 @wraps(f) 

959 def decorated(*args: Any, **kwargs: Any) -> Any: 

960 if not session.get("user_id"): 

961 return redirect(url_for("auth.login")) 

962 return f(*args, **kwargs) 

963 

964 return decorated 

965 

966 

967def require_instance_admin(f: Callable[..., Any]) -> Callable[..., Any]: 

968 """Abort 403 unless the current user is the instance admin.""" 

969 

970 @wraps(f) 

971 def decorated(*args: Any, **kwargs: Any) -> Any: 

972 from models import User, db 

973 

974 user_id = session.get("user_id") 

975 if not user_id: 

976 return redirect(url_for("auth.login")) 

977 user = db.session.get(User, user_id) 

978 if not user or not user.is_instance_admin: 

979 abort(403) 

980 return f(*args, **kwargs) 

981 

982 return decorated 

983 

984 

985def current_user_role() -> str | None: 

986 """Return the Role of the current user in their tenant, or None.""" 

987 from models import TenantUser 

988 

989 user_id = session.get("user_id") 

990 if not user_id: 

991 return None 

992 tu = TenantUser.query.filter_by(user_id=user_id).first() 

993 return tu.role if tu else None 

994 

995 

996def require_role(*roles: str) -> Callable[..., Any]: 

997 """Decorator: abort 403 if the current user's role is not in *roles*.""" 

998 

999 def decorator(f: Callable[..., Any]) -> Callable[..., Any]: 

1000 @wraps(f) 

1001 def decorated(*args: Any, **kwargs: Any) -> Any: 

1002 if current_user_role() not in roles: 

1003 abort(403) 

1004 return f(*args, **kwargs) 

1005 

1006 return decorated 

1007 

1008 return decorator 

1009 

1010 

1011def require_pilot_access(f: Callable[..., Any]) -> Callable[..., Any]: 

1012 """Decorator: abort 403 unless the user has pilot access. 

1013 

1014 Pilot access is granted by ADMIN/OWNER/PILOT/STUDENT/INSTRUCTOR role, 

1015 or by the per-user is_pilot capability flag. 

1016 """ 

1017 

1018 @wraps(f) 

1019 def decorated(*args: Any, **kwargs: Any) -> Any: 

1020 from models import Role, User, db 

1021 

1022 role = current_user_role() 

1023 if role in (Role.ADMIN, Role.OWNER, Role.PILOT, Role.STUDENT, Role.INSTRUCTOR): 

1024 return f(*args, **kwargs) 

1025 uid = session.get("user_id") 

1026 if uid: 

1027 user = db.session.get(User, uid) 

1028 if user and user.is_pilot: 

1029 return f(*args, **kwargs) 

1030 return abort(403) 

1031 

1032 return decorated 

1033 

1034 

1035def require_maint_access(f: Callable[..., Any]) -> Callable[..., Any]: 

1036 """Decorator: abort 403 unless the user has maintenance access. 

1037 

1038 Maintenance access is granted by ADMIN/OWNER/MAINTENANCE/INSTRUCTOR role, 

1039 or by the per-user is_maintenance capability flag. 

1040 """ 

1041 

1042 @wraps(f) 

1043 def decorated(*args: Any, **kwargs: Any) -> Any: 

1044 from models import Role, User, db 

1045 

1046 role = current_user_role() 

1047 if role in (Role.ADMIN, Role.OWNER, Role.MAINTENANCE, Role.INSTRUCTOR): 

1048 return f(*args, **kwargs) 

1049 uid = session.get("user_id") 

1050 if uid: 

1051 user = db.session.get(User, uid) 

1052 if user and user.is_maintenance: 

1053 return f(*args, **kwargs) 

1054 return abort(403) 

1055 

1056 return decorated 

1057 

1058 

1059def user_can_access_aircraft(aircraft_id: int) -> bool: 

1060 """Return True when the current user may access this aircraft. 

1061 

1062 ADMIN and OWNER bypass the check entirely. Other roles need either a 

1063 UserAllAircraftAccess row (all-planes grant) or a per-aircraft 

1064 UserAircraftAccess row. 

1065 """ 

1066 from models import Role, TenantUser, UserAircraftAccess, UserAllAircraftAccess 

1067 

1068 role = current_user_role() 

1069 if role in (Role.ADMIN, Role.OWNER): 

1070 return True 

1071 uid = session.get("user_id") 

1072 if not uid: 

1073 return False 

1074 tu = TenantUser.query.filter_by(user_id=uid).first() 

1075 if ( 

1076 tu 

1077 and UserAllAircraftAccess.query.filter_by( 

1078 user_id=uid, tenant_id=tu.tenant_id 

1079 ).first() 

1080 ): 

1081 return True 

1082 return ( 

1083 UserAircraftAccess.query.filter_by(user_id=uid, aircraft_id=aircraft_id).first() 

1084 is not None 

1085 ) 

1086 

1087 

1088def accessible_aircraft(tenant_id: int) -> Any: 

1089 """Return a query of Aircraft the current user is allowed to see. 

1090 

1091 ADMIN and OWNER see every aircraft in the tenant. A user with a 

1092 UserAllAircraftAccess row for the tenant also sees all aircraft. 

1093 Other roles see only aircraft granted via UserAircraftAccess. 

1094 """ 

1095 from models import Aircraft, Role, UserAircraftAccess, UserAllAircraftAccess 

1096 

1097 base = Aircraft.query.filter_by(tenant_id=tenant_id).order_by(Aircraft.registration) 

1098 role = current_user_role() 

1099 if role in (Role.ADMIN, Role.OWNER): 

1100 return base 

1101 uid = session.get("user_id") 

1102 if not uid: 

1103 from sqlalchemy import false 

1104 

1105 return base.filter(false()) 

1106 if UserAllAircraftAccess.query.filter_by(user_id=uid, tenant_id=tenant_id).first(): 

1107 return base 

1108 ids = [ 

1109 row.aircraft_id 

1110 for row in ( 

1111 UserAircraftAccess.query.filter_by(user_id=uid) 

1112 .with_entities(UserAircraftAccess.aircraft_id) 

1113 .all() 

1114 ) 

1115 ] 

1116 if not ids: 

1117 from sqlalchemy import false 

1118 

1119 return base.filter(false()) 

1120 return base.filter(Aircraft.id.in_(ids)) 

1121 

1122 

1123def compute_aircraft_statuses( 

1124 aircraft_list: Any, triggers: Any, hobbs_by_id: Any 

1125) -> dict[int, str]: 

1126 """Return {aircraft_id: 'grounded'|'overdue'|'due_soon'|'ok'} for every aircraft. 

1127 

1128 Grounded (expired insurance or unresolved grounding snag) takes priority. 

1129 Among maintenance: overdue > due_soon > ok. 

1130 Insurance expiring soon maps to due_soon. 

1131 """ 

1132 by_aircraft = defaultdict(list) 

1133 for t in triggers: 

1134 by_aircraft[t.aircraft_id].append(t) 

1135 

1136 result = {} 

1137 for ac in aircraft_list: 

1138 if ac.is_grounded: 

1139 result[ac.id] = "grounded" 

1140 continue 

1141 hobbs = hobbs_by_id.get(ac.id) 

1142 statuses = [t.status(hobbs) for t in by_aircraft.get(ac.id, [])] 

1143 ins = ac.insurance_status 

1144 if ins == "expiring_soon": 

1145 statuses.append("due_soon") 

1146 if "overdue" in statuses: 

1147 result[ac.id] = "overdue" 

1148 elif "due_soon" in statuses: 

1149 result[ac.id] = "due_soon" 

1150 else: 

1151 result[ac.id] = "ok" 

1152 return result