Coverage for app/pilots/logbook_import.py: 100%

438 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1"""Pilot logbook import — parsing, normalisation, mapping, and execution.""" 

2 

3from __future__ import annotations 

4 

5import csv 

6import hashlib 

7import io 

8import json 

9import re 

10from dataclasses import dataclass, field 

11from datetime import date, datetime, time, timedelta 

12from typing import Any, Callable 

13 

14import openpyxl # pyright: ignore[reportMissingImports] 

15from flask_babel import gettext as _ # pyright: ignore[reportMissingImports] 

16 

17# ── Constants ───────────────────────────────────────────────────────────────── 

18 

19TARGET_FIELDS: list[str] = [ 

20 "date", 

21 "aircraft_type", 

22 "aircraft_registration", 

23 "departure_place", 

24 "departure_time", 

25 "arrival_place", 

26 "arrival_time", 

27 "pic_name", 

28 "night_time", 

29 "instrument_time", 

30 "cross_country", 

31 "landings_day", 

32 "landings_night", 

33 "single_pilot_se", 

34 "single_pilot_me", 

35 "multi_pilot", 

36 "function_pic", 

37 "function_copilot", 

38 "function_dual", 

39 "function_instructor", 

40 "remarks", 

41 # virtual target — used for import validation only, not stored 

42 "total_flight_time_check", 

43] 

44 

45# Normalised source column name → target field. 

46# Positional disambiguation suffixes (_2, _3 …) are applied before lookup, 

47# so "time" is the first TIME column (departure) and "time_2" is the second (arrival). 

48_ALIASES: dict[str, str] = { 

49 # EASA standard logbook column names (normalised) 

50 "date dd/mm/yy": "date", 

51 "date": "date", 

52 "aircraft type": "aircraft_type", 

53 "type": "aircraft_type", 

54 "aircraft registration number": "aircraft_registration", 

55 "registration": "aircraft_registration", 

56 "reg": "aircraft_registration", 

57 "from": "departure_place", 

58 "departure": "departure_place", 

59 "dep": "departure_place", 

60 "time": "departure_time", # first TIME → departure 

61 "time_2": "arrival_time", # second TIME → arrival 

62 "departure time": "departure_time", 

63 "arrival time": "arrival_time", 

64 "to": "arrival_place", 

65 "arrival": "arrival_place", 

66 "arr": "arrival_place", 

67 "pic name": "pic_name", 

68 "pic name (if not student pilot)": "pic_name", 

69 "captain": "pic_name", 

70 # Landings group — first DAY / NIGHT pair 

71 "day": "landings_day", 

72 "night": "landings_night", 

73 # Operational conditions group — second DAY / NIGHT pair (positional suffix) 

74 "day_2": "ignore", # cross-country day — not a logbook field 

75 "night_2": "night_time", # night flying time 

76 # Aircraft category 

77 "se": "single_pilot_se", 

78 "single engine": "single_pilot_se", 

79 "single pilot se": "single_pilot_se", 

80 "me": "single_pilot_me", 

81 "multi engine": "single_pilot_me", 

82 "single pilot me": "single_pilot_me", 

83 "multi pilot": "multi_pilot", 

84 "mp": "multi_pilot", 

85 # Pilot function 

86 "pic": "function_pic", 

87 "p1": "function_pic", 

88 "co-pic": "function_copilot", 

89 "co-pilot": "function_copilot", 

90 "copilot": "function_copilot", 

91 "p2": "function_copilot", 

92 "dual received": "function_dual", 

93 "dual": "function_dual", 

94 "student": "function_dual", 

95 "instructor": "function_instructor", 

96 "fi": "function_instructor", 

97 # Cross-country time 

98 "cross-country": "cross_country", 

99 "cross country": "cross_country", 

100 "x-country": "cross_country", 

101 "xc": "cross_country", 

102 # Total flight time — imported only to validate against computed sum 

103 "total flight time": "total_flight_time_check", 

104 # Catch-all 

105 "total": "ignore", 

106 "no. istr. appr.": "ignore", 

107 "ifr approaches": "ignore", 

108 "page": "ignore", 

109 "line": "ignore", 

110 "remarks": "remarks", 

111 "notes": "remarks", 

112 "comments": "remarks", 

113 "instrument time": "instrument_time", 

114 "ifr": "instrument_time", 

115 "instrument": "instrument_time", 

116 "night time": "night_time", 

117 # Group-prefixed column names (e.g. Belgian EASA logbook with a span header row) 

118 "departure & arrival from": "departure_place", 

119 "departure & arrival time": "departure_time", 

120 "departure & arrival time_2": "arrival_time", 

121 "departure & arrival to": "arrival_place", 

122 "aircraft category se": "single_pilot_se", 

123 "aircraft category me": "single_pilot_me", 

124 "operational conditions cross-country": "cross_country", 

125 "operational conditions day": "ignore", 

126 "operational conditions night": "night_time", 

127 "pilot function pic": "function_pic", 

128 "pilot function co-pic": "function_copilot", 

129 "pilot function dual received": "function_dual", 

130 "page subtotals total flight time": "ignore", 

131 "page subtotals total flight time_2": "ignore", 

132 "page subtotals pic": "ignore", 

133 "page subtotals dual": "ignore", 

134 "page subtotals night": "ignore", 

135 "page subtotals landings – day": "ignore", 

136 "page subtotals landings – night": "ignore", 

137 "page subtotals formated date": "ignore", 

138 "page subtotals formated type": "ignore", 

139 "page subtotals days since flight": "ignore", 

140 "page subtotals daytime duration": "ignore", 

141 "page subtotals non pic or dual": "ignore", 

142 "formated date": "ignore", 

143 "formated type": "ignore", 

144 "days since flight": "ignore", 

145 "daytime duration": "ignore", 

146 "non pic or dual": "ignore", 

147 # Landings group (group-prefixed) 

148 "landings day": "landings_day", 

149 "landings night": "landings_night", 

150 # Aircraft type (multiline cell name normalised to single space) 

151 "aircraft type name, model, variant": "aircraft_type", 

152} 

153 

154# ── Data structures ─────────────────────────────────────────────────────────── 

155 

156 

157@dataclass 

158class ParsedFile: 

159 """Result of parsing an uploaded logbook file.""" 

160 

161 norm_cols: list[str] # normalised + disambiguated column keys 

162 raw_cols: list[str] # original column labels (for display) 

163 header_row_index: int # 0-based row index of the detected header 

164 data_rows: list[list[Any]] # all rows after the header (including subtotals) 

165 fingerprint: str # SHA-256 of norm_cols 

166 

167 

168@dataclass 

169class MappingProposal: 

170 """Proposed column mapping and how it was derived.""" 

171 

172 mapping: dict[str, str] # norm_col_key → target_field or "ignore" 

173 match_type: str # "exact", "fuzzy", "alias" 

174 fuzzy_score: float = 0.0 # 0.0–1.0, only meaningful for "fuzzy" 

175 matched_mapping_id: int | None = None 

176 

177 

178@dataclass 

179class ImportResult: 

180 """Summary returned after executing an import.""" 

181 

182 imported: int = 0 

183 subtotals: int = 0 

184 skipped: list[tuple[int, str]] = field(default_factory=list) # (row_num, reason) 

185 # (row_num, source_col, target_field, repr(raw)) — non-empty cells that couldn't parse 

186 parse_warnings: list[tuple[int, str, str, str]] = field(default_factory=list) 

187 # (row_num, source_total, computed_total) — rows where total ≠ sum of components 

188 total_mismatch_warnings: list[tuple[int, float, float]] = field( 

189 default_factory=list 

190 ) 

191 has_opening_balance: bool = False 

192 

193 

194# ── Normalisation ───────────────────────────────────────────────────────────── 

195 

196 

197def _norm(text: str) -> str: 

198 """Strip, collapse whitespace, lower-case.""" 

199 return re.sub(r"\s+", " ", str(text).strip().lower()) 

200 

201 

202def _disambiguate(names: list[str]) -> list[str]: 

203 """Append _2, _3 … to duplicate names to make them unique.""" 

204 seen: dict[str, int] = {} 

205 result: list[str] = [] 

206 for n in names: 

207 if n in seen: 

208 seen[n] += 1 

209 result.append(f"{n}_{seen[n]}") 

210 else: 

211 seen[n] = 1 

212 result.append(n) 

213 return result 

214 

215 

216def _fingerprint(norm_cols: list[str]) -> str: 

217 payload = json.dumps(norm_cols, ensure_ascii=False, sort_keys=False) 

218 return hashlib.sha256(payload.encode()).hexdigest() 

219 

220 

221# ── Header detection ────────────────────────────────────────────────────────── 

222 

223 

224def _is_header_row(row: list[Any]) -> bool: 

225 """True if ≥ 50 % of non-empty cells are non-numeric strings and ≥ 4 non-empty.""" 

226 non_empty = [c for c in row if c is not None and str(c).strip()] 

227 if len(non_empty) < 4: 

228 return False 

229 string_like = [ 

230 c for c in non_empty if isinstance(c, str) and not _is_numeric_str(str(c)) 

231 ] 

232 return len(string_like) / len(non_empty) >= 0.5 

233 

234 

235def _is_numeric_str(s: str) -> bool: 

236 try: 

237 float(s) 

238 return True 

239 except ValueError: 

240 return False 

241 

242 

243def _header_alias_score(row: list[Any]) -> int: 

244 """Count how many cells in *row* match a known alias.""" 

245 return sum(1 for c in row if c is not None and _norm(str(c)) in _ALIASES) 

246 

247 

248def _find_header_row(rows: list[list[Any]], max_scan: int = 20) -> int | None: 

249 """Return 0-based index of the best header row within the first max_scan rows. 

250 

251 Many logbook templates have a group-label row (e.g. "DEPARTURE & ARRIVAL", 

252 "LANDINGS") above the actual column-header row. Both pass _is_header_row, 

253 but the actual header row has far more alias matches. We score every 

254 candidate and return the one with the highest score; ties go to the earlier 

255 row. If no alias matches are found we fall back to the first text-like row. 

256 """ 

257 best_idx: int | None = None 

258 best_score = -1 

259 first_text_row: int | None = None 

260 

261 for i, row in enumerate(rows[:max_scan]): 

262 if not _is_header_row(row): 

263 continue 

264 if first_text_row is None: 

265 first_text_row = i 

266 score = _header_alias_score(row) 

267 if score > best_score: 

268 best_score = score 

269 best_idx = i 

270 

271 return best_idx if (best_idx is not None and best_score > 0) else first_text_row 

272 

273 

274def _trim_trailing_empty_cols( 

275 all_rows: list[list[Any]], max_scan: int = 50 

276) -> list[list[Any]]: 

277 """Trim columns beyond the rightmost non-empty value in the first max_scan rows. 

278 

279 Excel templates often declare thousands of formatted-but-empty columns. 

280 Without trimming, every empty cell becomes a separate mapping entry. 

281 """ 

282 max_col = 0 

283 for row in all_rows[:max_scan]: 

284 for j in range(len(row) - 1, -1, -1): 

285 if row[j] is not None and str(row[j]).strip(): 

286 if j + 1 > max_col: 

287 max_col = j + 1 

288 break 

289 if max_col == 0: 

290 return ( 

291 all_rows # pragma: no cover — blank file rejected by header detection first 

292 ) 

293 return [row[:max_col] for row in all_rows] 

294 

295 

296# ── Group-header detection ──────────────────────────────────────────────────── 

297 

298 

299def _merge_label_map(ws: Any) -> dict[tuple[int, int], str]: 

300 """Return {(0-based row, 0-based col): group_label} for every merged region. 

301 

302 The label is the value of the top-left cell of each merged range. 

303 Every cell in the range gets the same label so downstream code can do a 

304 simple lookup without forward-fill. 

305 """ 

306 result: dict[tuple[int, int], str] = {} 

307 for mc in ws.merged_cells.ranges: 

308 val = ws.cell(mc.min_row, mc.min_col).value 

309 if val is None or not str(val).strip(): 

310 continue 

311 label = str(val).strip() 

312 for r in range(mc.min_row - 1, mc.max_row): 

313 for c in range(mc.min_col - 1, mc.max_col): 

314 result[(r, c)] = label 

315 return result 

316 

317 

318def _group_labels_from_map( 

319 merge_map: dict[tuple[int, int], str], row_idx: int, width: int 

320) -> list[str]: 

321 """Return per-column group labels for *row_idx* using exact merge metadata.""" 

322 return [merge_map.get((row_idx, c), "") for c in range(width)] 

323 

324 

325def _group_labels_heuristic(row: list[Any], width: int) -> list[str] | None: 

326 """Fallback for CSV: forward-fill a sparse row to infer group labels. 

327 

328 Requires at least two non-empty values with a gap between them (so a single 

329 spanning title cell does not fire). Returns None if the pattern is absent. 

330 """ 

331 padded: list[Any] = list(row[:width]) + [None] * max(0, width - len(row)) 

332 

333 prev_nonempty_idx: int | None = None 

334 has_span = False 

335 for i, val in enumerate(padded): 

336 if val is not None and str(val).strip(): 

337 if prev_nonempty_idx is not None and i > prev_nonempty_idx + 1: 

338 has_span = True 

339 break 

340 prev_nonempty_idx = i 

341 if not has_span: 

342 return None 

343 

344 result: list[str] = [] 

345 current = "" 

346 for val in padded: 

347 if val is not None and str(val).strip(): 

348 current = str(val).strip() 

349 result.append(current) 

350 return result 

351 

352 

353def _apply_group_labels(group_labels: list[str], raw_header: list[str]) -> list[str]: 

354 """Prepend each non-empty group label to the corresponding column header.""" 

355 result = [] 

356 for i, col in enumerate(raw_header): 

357 label = group_labels[i] if i < len(group_labels) else "" 

358 if label and col: 

359 result.append(f"{label} {col}") 

360 elif label: 

361 result.append(label) 

362 else: 

363 result.append(col) 

364 return result 

365 

366 

367# ── File parsing ────────────────────────────────────────────────────────────── 

368 

369 

370def parse_file(data: bytes, filename: str) -> ParsedFile: 

371 """Parse bytes from an uploaded file into a ParsedFile. 

372 

373 Raises ValueError with a user-friendly message on format errors. 

374 """ 

375 ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else "" 

376 if ext in ("xlsx", "xls"): 

377 return _parse_excel(data, filename) 

378 if ext == "csv": 

379 return _parse_csv(data, filename) 

380 raise ValueError( 

381 f"Unsupported file format: .{ext} — please upload a .csv or .xlsx file." 

382 ) 

383 

384 

385# English-only Excel tab names used to identify the logbook sheet. 

386# Locale-specific equivalents live in the translation catalogue — look for 

387# entries marked "Excel tab name" in the translator comments. 

388_PREFERRED_SHEET_NAMES_EN: frozenset[str] = frozenset( 

389 { 

390 "logbook", 

391 "log", 

392 "flights", 

393 "flight log", 

394 "flights log", 

395 "journal", 

396 } 

397) 

398 

399 

400def _preferred_sheet_names() -> frozenset[str]: 

401 """Return preferred sheet names merged with their active-locale translations. 

402 

403 All _() calls below are extracted by pybabel; locale-specific Excel tab 

404 names belong in the translation catalogue (see "Excel tab name" entries). 

405 """ 

406 t_logbook = _("logbook") 

407 t_log = _("log") 

408 t_flights = _("flights") 

409 t_flight_log = _("flight log") 

410 t_flights_log = _("flights log") 

411 t_journal = _("journal") 

412 translated = frozenset( 

413 {t_logbook, t_log, t_flights, t_flight_log, t_flights_log, t_journal} 

414 ) 

415 return _PREFERRED_SHEET_NAMES_EN | frozenset(s.strip().lower() for s in translated) 

416 

417 

418def _pick_best_excel_sheet(wb: Any) -> tuple[str, list[list[Any]]]: 

419 """Return (sheet_name, all_rows) for the most likely logbook sheet. 

420 

421 Prefers sheets whose name matches a known logbook keyword (fast path, reads 

422 only the chosen sheet). When no name matches, reads every sheet and scores 

423 by alias matches in the detected header, returning the highest-scoring sheet 

424 together with its rows. Falls back to the first sheet when scoring is 

425 inconclusive. 

426 

427 Rows are returned here because openpyxl read-only worksheets use a one-shot 

428 generator — calling iter_rows() twice on the same worksheet yields nothing on 

429 the second call. 

430 """ 

431 preferred = _preferred_sheet_names() 

432 for ws in wb.worksheets: 

433 if ws.title.strip().lower() in preferred: 

434 return ws.title, [list(row) for row in ws.iter_rows(values_only=True)] 

435 

436 best_name = wb.worksheets[0].title if wb.worksheets else "" 

437 best_score = -1 

438 best_rows: list[list[Any]] = [] 

439 for ws in wb.worksheets: 

440 rows = [list(row) for row in ws.iter_rows(values_only=True)] 

441 trimmed = _trim_trailing_empty_cols(list(rows)) 

442 hi = _find_header_row(trimmed) 

443 score = _header_alias_score(trimmed[hi]) if hi is not None else 0 

444 if score > best_score: 

445 best_score = score 

446 best_name = ws.title 

447 best_rows = rows 

448 return best_name, best_rows 

449 

450 

451def _parse_excel(data: bytes, filename: str) -> ParsedFile: 

452 try: 

453 wb_ro = openpyxl.load_workbook(io.BytesIO(data), read_only=True, data_only=True) 

454 except Exception as exc: 

455 raise ValueError(f"Could not open Excel file: {exc}") from exc 

456 sheet_name, all_rows = _pick_best_excel_sheet(wb_ro) 

457 wb_ro.close() 

458 

459 # Second load (non-read-only) to access merged cell ranges for exact group labels. 

460 excel_merge_map: dict[tuple[int, int], str] | None = None 

461 try: 

462 wb_full = openpyxl.load_workbook(io.BytesIO(data), data_only=True) 

463 excel_merge_map = _merge_label_map(wb_full[sheet_name]) 

464 wb_full.close() 

465 except Exception: # noqa: S110 # fall back to heuristic in _build_parsed_file 

466 pass 

467 

468 return _build_parsed_file(all_rows, filename, excel_merge_map=excel_merge_map) 

469 

470 

471def _parse_csv(data: bytes, filename: str) -> ParsedFile: 

472 try: 

473 text = data.decode("utf-8-sig") 

474 except UnicodeDecodeError: 

475 text = data.decode("latin-1") 

476 

477 sample = text[:4096] 

478 try: 

479 dialect = csv.Sniffer().sniff(sample) 

480 except csv.Error: 

481 dialect = csv.excel 

482 

483 reader = csv.reader(io.StringIO(text), dialect) 

484 all_rows: list[list[Any]] = list(reader) 

485 return _build_parsed_file(all_rows, filename) 

486 

487 

488def _build_parsed_file( 

489 all_rows: list[list[Any]], 

490 filename: str, 

491 excel_merge_map: dict[tuple[int, int], str] | None = None, 

492) -> ParsedFile: 

493 all_rows = _trim_trailing_empty_cols(all_rows) 

494 header_idx = _find_header_row(all_rows) 

495 if header_idx is None: 

496 raise ValueError( 

497 "Could not detect a header row in this file. " 

498 "Make sure the file contains column names." 

499 ) 

500 

501 raw_header = [str(c).strip() if c is not None else "" for c in all_rows[header_idx]] 

502 

503 # Prepend group labels from the row above the header, if one exists 

504 if header_idx > 0: 

505 width = len(raw_header) 

506 if excel_merge_map is not None: 

507 # Excel: exact boundaries from merged cell metadata 

508 _gl = _group_labels_from_map(excel_merge_map, header_idx - 1, width) 

509 group_labels: list[str] | None = _gl if any(_gl) else None 

510 else: 

511 # CSV: heuristic forward-fill (merged cell info unavailable) 

512 group_labels = _group_labels_heuristic(all_rows[header_idx - 1], width) 

513 if group_labels is not None: 

514 raw_header = _apply_group_labels(group_labels, raw_header) 

515 

516 norm_raw = [_norm(c) for c in raw_header] 

517 norm_cols = _disambiguate(norm_raw) 

518 data_rows = all_rows[header_idx + 1 :] 

519 

520 return ParsedFile( 

521 norm_cols=norm_cols, 

522 raw_cols=raw_header, 

523 header_row_index=header_idx, 

524 data_rows=data_rows, 

525 fingerprint=_fingerprint(norm_cols), 

526 ) 

527 

528 

529# ── Mapping proposal ────────────────────────────────────────────────────────── 

530 

531 

532def propose_mapping( 

533 parsed: ParsedFile, 

534 existing_mappings: list[Any], # list[LogbookImportMapping] 

535) -> MappingProposal: 

536 """Return the best mapping proposal for *parsed*, checking saved mappings first.""" 

537 # 1. Exact fingerprint match 

538 for m in existing_mappings: 

539 if m.source_fingerprint == parsed.fingerprint: 

540 return MappingProposal( 

541 mapping=json.loads(m.column_mapping), 

542 match_type="exact", 

543 matched_mapping_id=m.id, 

544 ) 

545 

546 # 2. Fuzzy match — best overlap among saved mappings 

547 best_score = 0.0 

548 best_m = None 

549 new_set = set(parsed.norm_cols) 

550 for m in existing_mappings: 

551 saved_cols: list[str] = json.loads(m.source_columns) 

552 saved_set = set(saved_cols) 

553 if not saved_set: 

554 continue 

555 overlap = len(new_set & saved_set) 

556 score = overlap / max(len(new_set), len(saved_set)) 

557 if score > best_score: 

558 best_score = score 

559 best_m = m 

560 

561 if best_m is not None and best_score >= 0.6: 

562 saved_map: dict[str, str] = json.loads(best_m.column_mapping) 

563 # Build a mapping for the new columns, falling back to alias for unmatched ones 

564 merged = _alias_mapping(parsed.norm_cols) 

565 for col in parsed.norm_cols: 

566 if col in saved_map: 

567 merged[col] = saved_map[col] 

568 return MappingProposal( 

569 mapping=merged, 

570 match_type="fuzzy", 

571 fuzzy_score=best_score, 

572 matched_mapping_id=best_m.id, 

573 ) 

574 

575 # 3. Alias-only auto-mapping 

576 return MappingProposal( 

577 mapping=_alias_mapping(parsed.norm_cols), 

578 match_type="alias", 

579 ) 

580 

581 

582def _alias_mapping(norm_cols: list[str]) -> dict[str, str]: 

583 """Apply the built-in alias table; unknown columns default to 'ignore'.""" 

584 result: dict[str, str] = {} 

585 for col in norm_cols: 

586 result[col] = _ALIASES.get(col, "ignore") 

587 return result 

588 

589 

590# ── Subtotal detection ──────────────────────────────────────────────────────── 

591 

592 

593def _is_subtotal_row(row: list[Any], date_col_idx: int | None) -> bool: 

594 if date_col_idx is None: 

595 return False 

596 if date_col_idx >= len(row): 

597 return True 

598 val = row[date_col_idx] 

599 if isinstance(val, timedelta): 

600 return True 

601 if val is None or (isinstance(val, str) and not val.strip()): 

602 return True 

603 if isinstance(val, str) and "total" in val.lower(): 

604 return True 

605 return False 

606 

607 

608# ── Value parsing ───────────────────────────────────────────────────────────── 

609 

610 

611def parse_date_value(val: Any) -> date | None: 

612 if isinstance(val, datetime): 

613 return val.date() 

614 if isinstance(val, date): 

615 return val 

616 if isinstance(val, (int, float)): 

617 # Excel serial date number — openpyxl returns datetime; guard anyway 

618 return None 

619 if not isinstance(val, str): 

620 return None 

621 s = val.strip() 

622 if not s: 

623 return None 

624 for fmt in ("%d/%m/%y", "%d/%m/%Y", "%Y-%m-%d", "%m/%d/%Y", "%d-%m-%Y"): 

625 try: 

626 return datetime.strptime(s, fmt).date() 

627 except ValueError: 

628 pass 

629 return None 

630 

631 

632def parse_time_value(val: Any) -> time | None: 

633 """Parse a time-of-day value (HH:MM or Python time).""" 

634 if isinstance(val, time): 

635 return val 

636 if isinstance(val, datetime): 

637 return val.time() 

638 if not isinstance(val, str): 

639 return None 

640 s = val.strip() 

641 if not s: 

642 return None 

643 m = re.match(r"^(\d{1,2}):(\d{2})$", s) 

644 if m: 

645 try: 

646 return time(int(m.group(1)), int(m.group(2))) 

647 except ValueError: 

648 return None 

649 return None 

650 

651 

652def parse_duration_value(val: Any) -> float | None: 

653 """Parse a duration into decimal hours (e.g. time(0,42) → 0.7, '1:24' → 1.4).""" 

654 if isinstance(val, timedelta): 

655 hours = val.total_seconds() / 3600 

656 return round(hours, 1) if hours >= 0 else None 

657 if isinstance(val, time): 

658 return round(val.hour + val.minute / 60, 1) 

659 if isinstance(val, datetime): 

660 # Excel sometimes returns a dummy date + the time-of-day 

661 t = val.time() 

662 return round(t.hour + t.minute / 60, 1) 

663 if isinstance(val, (int, float)): 

664 if val < 0: 

665 return None 

666 return round(float(val), 1) 

667 if isinstance(val, str): 

668 s = val.strip() 

669 if not s: 

670 return None 

671 m = re.match(r"^(\d+):(\d{2})$", s) 

672 if m: 

673 return round(int(m.group(1)) + int(m.group(2)) / 60, 1) 

674 try: 

675 v = float(s) 

676 return round(v, 1) if v >= 0 else None 

677 except ValueError: 

678 return None 

679 return None 

680 

681 

682def parse_int_value(val: Any) -> int | None: 

683 if isinstance(val, int): 

684 return val if val >= 0 else None 

685 if isinstance(val, float): 

686 return int(val) if val >= 0 else None 

687 if isinstance(val, str): 

688 s = val.strip() 

689 if not s: 

690 return None 

691 try: 

692 return int(float(s)) 

693 except ValueError: 

694 return None 

695 return None 

696 

697 

698def _is_nonempty(val: Any) -> bool: 

699 """Return True when val carries a real value (not None, not blank string).""" 

700 if val is None: 

701 return False 

702 if isinstance(val, str): 

703 return bool(val.strip()) 

704 return True 

705 

706 

707# Map target field → (human-readable type name, parser function) 

708_FIELD_TYPE: dict[str, tuple[str, Callable[[Any], Any]]] = { 

709 "date": ("date", parse_date_value), 

710 "departure_time": ("time", parse_time_value), 

711 "arrival_time": ("time", parse_time_value), 

712 "night_time": ("duration", parse_duration_value), 

713 "instrument_time": ("duration", parse_duration_value), 

714 "cross_country": ("duration", parse_duration_value), 

715 "total_flight_time_check": ("duration", parse_duration_value), 

716 "single_pilot_se": ("duration", parse_duration_value), 

717 "single_pilot_me": ("duration", parse_duration_value), 

718 "multi_pilot": ("duration", parse_duration_value), 

719 "function_pic": ("duration", parse_duration_value), 

720 "function_copilot": ("duration", parse_duration_value), 

721 "function_dual": ("duration", parse_duration_value), 

722 "function_instructor": ("duration", parse_duration_value), 

723 "landings_day": ("integer", parse_int_value), 

724 "landings_night": ("integer", parse_int_value), 

725} 

726 

727_HINT_SAMPLE_ROWS = 5 

728 

729 

730def type_hints(parsed: ParsedFile, mapping: dict[str, str]) -> dict[str, str]: 

731 """Return {col: hint_text} for columns where sample data doesn't match the proposed type. 

732 

733 Samples up to _HINT_SAMPLE_ROWS rows. Returns a hint when non-empty values are 

734 present but any of them fail to parse — indicating a likely mapping mismatch. 

735 """ 

736 col_index = {col: i for i, col in enumerate(parsed.norm_cols)} 

737 hints: dict[str, str] = {} 

738 for col, target in mapping.items(): 

739 if target not in _FIELD_TYPE: 

740 continue 

741 type_name, parser = _FIELD_TYPE[target] 

742 idx = col_index.get(col) 

743 if idx is None: 

744 continue 

745 sample = [ 

746 row[idx] 

747 for row in parsed.data_rows[:_HINT_SAMPLE_ROWS] 

748 if idx < len(row) and _is_nonempty(row[idx]) 

749 ] 

750 if not sample: 

751 continue 

752 failed = [v for v in sample if parser(v) is None] 

753 if failed: 

754 example = str(failed[0])[:30] 

755 hints[col] = ( 

756 f"Sample data doesn't look like a {type_name} (e.g. {example!r})" 

757 ) 

758 return hints 

759 

760 

761# ── Preview rows ────────────────────────────────────────────────────────────── 

762 

763 

764def preview_rows( 

765 parsed: ParsedFile, 

766 mapping: dict[str, str], 

767 n: int = 5, 

768) -> list[dict[str, Any]]: 

769 """Return up to *n* non-subtotal data rows mapped to target field names.""" 

770 date_idx = _date_col_index(parsed.norm_cols, mapping) 

771 result: list[dict[str, Any]] = [] 

772 for row in parsed.data_rows: 

773 if _is_subtotal_row(row, date_idx): 

774 continue 

775 mapped: dict[str, Any] = {} 

776 for i, col in enumerate(parsed.norm_cols): 

777 target = mapping.get(col, "ignore") 

778 if target == "ignore": 

779 continue 

780 mapped[target] = row[i] if i < len(row) else None 

781 result.append(mapped) 

782 if len(result) >= n: 

783 break 

784 return result 

785 

786 

787def _date_col_index(norm_cols: list[str], mapping: dict[str, str]) -> int | None: 

788 for i, col in enumerate(norm_cols): 

789 if mapping.get(col) == "date": 

790 return i 

791 return None 

792 

793 

794# ── Import execution ────────────────────────────────────────────────────────── 

795 

796 

797def execute_import( 

798 parsed: ParsedFile, 

799 mapping: dict[str, str], 

800 pilot_user_id: int, 

801 batch_id: int, 

802 opening_balance: dict[str, Any] | None = None, 

803) -> ImportResult: 

804 """Create PilotLogbookEntry rows from *parsed* using *mapping*. 

805 

806 Returns an ImportResult describing what happened. Entries are added to 

807 db.session but NOT committed — the caller commits after also saving the 

808 batch/mapping records. 

809 """ 

810 from models import PilotLogbookEntry, db # pyright: ignore[reportMissingImports] 

811 

812 result = ImportResult() 

813 date_idx = _date_col_index(parsed.norm_cols, mapping) 

814 col_index = {col: i for i, col in enumerate(parsed.norm_cols)} 

815 

816 def _get(row: list[Any], col: str) -> Any: 

817 i = col_index.get(col) 

818 return row[i] if i is not None and i < len(row) else None 

819 

820 entries_to_add: list[PilotLogbookEntry] = [] 

821 

822 for row_num, row in enumerate(parsed.data_rows, start=1): 

823 if _is_subtotal_row(row, date_idx): 

824 result.subtotals += 1 

825 continue 

826 

827 # Find and parse date 

828 date_val: date | None = None 

829 for col, target in mapping.items(): 

830 if target == "date": 

831 raw = _get(row, col) 

832 date_val = parse_date_value(raw) 

833 break 

834 

835 if date_val is None: 

836 raw_date = ( 

837 row[date_idx] if date_idx is not None and date_idx < len(row) else None 

838 ) 

839 result.skipped.append((row_num, f"unparseable date: {raw_date!r}")) 

840 continue 

841 

842 # Build entry fields from mapping 

843 kwargs: dict[str, Any] = { 

844 "pilot_user_id": pilot_user_id, 

845 "import_batch_id": batch_id, 

846 "source": "import", 

847 "date": date_val, 

848 } 

849 

850 source_total: float | None = None 

851 

852 for col, target in mapping.items(): 

853 if target in ("ignore", "date"): 

854 continue 

855 raw = _get(row, col) 

856 if target == "total_flight_time_check": 

857 if _is_nonempty(raw): 

858 source_total = parse_duration_value(raw) 

859 continue 

860 if target in _FIELD_TYPE: 

861 _, parser = _FIELD_TYPE[target] 

862 parsed_val = parser(raw) 

863 if parsed_val is None and _is_nonempty(raw): 

864 result.parse_warnings.append( 

865 (row_num, col, target, repr(str(raw)[:40])) 

866 ) 

867 kwargs[target] = parsed_val 

868 elif target == "aircraft_type": 

869 val = str(raw).strip() if raw is not None else None 

870 kwargs["aircraft_type"] = val 

871 if val: 

872 from utils import resolve_aircraft_type_icao # pyright: ignore[reportMissingImports] 

873 

874 kwargs["aircraft_type_icao"] = resolve_aircraft_type_icao(val) 

875 elif target in ( 

876 "aircraft_registration", 

877 "departure_place", 

878 "arrival_place", 

879 "pic_name", 

880 "remarks", 

881 ): 

882 kwargs[target] = str(raw).strip() if raw is not None else None 

883 

884 if source_total is not None: 

885 computed = round( 

886 sum( 

887 float(kwargs.get(f) or 0) 

888 for f in ("single_pilot_se", "single_pilot_me", "multi_pilot") 

889 ), 

890 1, 

891 ) 

892 if abs(source_total - computed) >= 0.15: 

893 result.total_mismatch_warnings.append((row_num, source_total, computed)) 

894 

895 entries_to_add.append(PilotLogbookEntry(**kwargs)) 

896 

897 result.imported = len(entries_to_add) 

898 for e in entries_to_add: 

899 db.session.add(e) 

900 

901 # Opening balance — synthetic entry dated one day before earliest imported 

902 if opening_balance and any(v for v in opening_balance.values() if v): 

903 earliest = ( 

904 min(e.date for e in entries_to_add) if entries_to_add else date.today() 

905 ) 

906 from datetime import timedelta as _td 

907 

908 balance_date = earliest - _td(days=1) 

909 balance_entry = PilotLogbookEntry( 

910 pilot_user_id=pilot_user_id, 

911 import_batch_id=batch_id, 

912 source="import", 

913 date=balance_date, 

914 remarks="Opening balance (imported)", 

915 night_time=opening_balance.get("night_time"), 

916 instrument_time=opening_balance.get("instrument_time"), 

917 single_pilot_se=opening_balance.get("single_pilot_se"), 

918 single_pilot_me=opening_balance.get("single_pilot_me"), 

919 multi_pilot=opening_balance.get("multi_pilot"), 

920 function_pic=opening_balance.get("function_pic"), 

921 function_copilot=opening_balance.get("function_copilot"), 

922 function_dual=opening_balance.get("function_dual"), 

923 function_instructor=opening_balance.get("function_instructor"), 

924 ) 

925 db.session.add(balance_entry) 

926 result.has_opening_balance = True 

927 

928 return result