Coverage for app/pilots/logbook_import.py: 100%
438 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1"""Pilot logbook import — parsing, normalisation, mapping, and execution."""
3from __future__ import annotations
5import csv
6import hashlib
7import io
8import json
9import re
10from dataclasses import dataclass, field
11from datetime import date, datetime, time, timedelta
12from typing import Any, Callable
14import openpyxl # pyright: ignore[reportMissingImports]
15from flask_babel import gettext as _ # pyright: ignore[reportMissingImports]
17# ── Constants ─────────────────────────────────────────────────────────────────
# Every field a source column can be mapped onto. The last entry is a virtual
# target: it is read during import for validation only and is never stored.
TARGET_FIELDS: list[str] = [
    # Flight identification and routing
    "date",
    "aircraft_type",
    "aircraft_registration",
    "departure_place",
    "departure_time",
    "arrival_place",
    "arrival_time",
    "pic_name",
    # Operational conditions
    "night_time",
    "instrument_time",
    "cross_country",
    # Landings
    "landings_day",
    "landings_night",
    # Aircraft category times
    "single_pilot_se",
    "single_pilot_me",
    "multi_pilot",
    # Pilot function times
    "function_pic",
    "function_copilot",
    "function_dual",
    "function_instructor",
    # Free text
    "remarks",
    # Virtual target (validation only, not persisted)
    "total_flight_time_check",
]
45# Normalised source column name → target field.
46# Positional disambiguation suffixes (_2, _3 …) are applied before lookup,
47# so "time" is the first TIME column (departure) and "time_2" is the second (arrival).
48_ALIASES: dict[str, str] = {
49 # EASA standard logbook column names (normalised)
50 "date dd/mm/yy": "date",
51 "date": "date",
52 "aircraft type": "aircraft_type",
53 "type": "aircraft_type",
54 "aircraft registration number": "aircraft_registration",
55 "registration": "aircraft_registration",
56 "reg": "aircraft_registration",
57 "from": "departure_place",
58 "departure": "departure_place",
59 "dep": "departure_place",
60 "time": "departure_time", # first TIME → departure
61 "time_2": "arrival_time", # second TIME → arrival
62 "departure time": "departure_time",
63 "arrival time": "arrival_time",
64 "to": "arrival_place",
65 "arrival": "arrival_place",
66 "arr": "arrival_place",
67 "pic name": "pic_name",
68 "pic name (if not student pilot)": "pic_name",
69 "captain": "pic_name",
70 # Landings group — first DAY / NIGHT pair
71 "day": "landings_day",
72 "night": "landings_night",
73 # Operational conditions group — second DAY / NIGHT pair (positional suffix)
74 "day_2": "ignore", # cross-country day — not a logbook field
75 "night_2": "night_time", # night flying time
76 # Aircraft category
77 "se": "single_pilot_se",
78 "single engine": "single_pilot_se",
79 "single pilot se": "single_pilot_se",
80 "me": "single_pilot_me",
81 "multi engine": "single_pilot_me",
82 "single pilot me": "single_pilot_me",
83 "multi pilot": "multi_pilot",
84 "mp": "multi_pilot",
85 # Pilot function
86 "pic": "function_pic",
87 "p1": "function_pic",
88 "co-pic": "function_copilot",
89 "co-pilot": "function_copilot",
90 "copilot": "function_copilot",
91 "p2": "function_copilot",
92 "dual received": "function_dual",
93 "dual": "function_dual",
94 "student": "function_dual",
95 "instructor": "function_instructor",
96 "fi": "function_instructor",
97 # Cross-country time
98 "cross-country": "cross_country",
99 "cross country": "cross_country",
100 "x-country": "cross_country",
101 "xc": "cross_country",
102 # Total flight time — imported only to validate against computed sum
103 "total flight time": "total_flight_time_check",
104 # Catch-all
105 "total": "ignore",
106 "no. istr. appr.": "ignore",
107 "ifr approaches": "ignore",
108 "page": "ignore",
109 "line": "ignore",
110 "remarks": "remarks",
111 "notes": "remarks",
112 "comments": "remarks",
113 "instrument time": "instrument_time",
114 "ifr": "instrument_time",
115 "instrument": "instrument_time",
116 "night time": "night_time",
117 # Group-prefixed column names (e.g. Belgian EASA logbook with a span header row)
118 "departure & arrival from": "departure_place",
119 "departure & arrival time": "departure_time",
120 "departure & arrival time_2": "arrival_time",
121 "departure & arrival to": "arrival_place",
122 "aircraft category se": "single_pilot_se",
123 "aircraft category me": "single_pilot_me",
124 "operational conditions cross-country": "cross_country",
125 "operational conditions day": "ignore",
126 "operational conditions night": "night_time",
127 "pilot function pic": "function_pic",
128 "pilot function co-pic": "function_copilot",
129 "pilot function dual received": "function_dual",
130 "page subtotals total flight time": "ignore",
131 "page subtotals total flight time_2": "ignore",
132 "page subtotals pic": "ignore",
133 "page subtotals dual": "ignore",
134 "page subtotals night": "ignore",
135 "page subtotals landings – day": "ignore",
136 "page subtotals landings – night": "ignore",
137 "page subtotals formated date": "ignore",
138 "page subtotals formated type": "ignore",
139 "page subtotals days since flight": "ignore",
140 "page subtotals daytime duration": "ignore",
141 "page subtotals non pic or dual": "ignore",
142 "formated date": "ignore",
143 "formated type": "ignore",
144 "days since flight": "ignore",
145 "daytime duration": "ignore",
146 "non pic or dual": "ignore",
147 # Landings group (group-prefixed)
148 "landings day": "landings_day",
149 "landings night": "landings_night",
150 # Aircraft type (multiline cell name normalised to single space)
151 "aircraft type name, model, variant": "aircraft_type",
152}
154# ── Data structures ───────────────────────────────────────────────────────────
@dataclass
class ParsedFile:
    """Outcome of parsing an uploaded logbook file.

    Attributes:
        norm_cols: Normalised and disambiguated column keys.
        raw_cols: Column labels as they appeared in the file (for display).
        header_row_index: 0-based index of the detected header row.
        data_rows: Every row following the header, subtotal rows included.
        fingerprint: SHA-256 digest derived from ``norm_cols``.
    """

    norm_cols: list[str]
    raw_cols: list[str]
    header_row_index: int
    data_rows: list[list[Any]]
    fingerprint: str
@dataclass
class MappingProposal:
    """A suggested column mapping together with its provenance.

    Attributes:
        mapping: Normalised column key → target field name, or ``"ignore"``.
        match_type: One of ``"exact"``, ``"fuzzy"`` or ``"alias"``.
        fuzzy_score: Overlap ratio in 0.0–1.0; only meaningful for ``"fuzzy"``.
        matched_mapping_id: Id of the saved mapping the proposal came from,
            or ``None`` when no saved mapping was used.
    """

    mapping: dict[str, str]
    match_type: str
    fuzzy_score: float = 0.0
    matched_mapping_id: int | None = None
@dataclass
class ImportResult:
    """Summary handed back once an import has been executed.

    Attributes:
        imported: Number of logbook entries created.
        subtotals: Number of rows recognised as subtotal rows and skipped.
        skipped: ``(row_num, reason)`` for each row that was not imported.
        parse_warnings: ``(row_num, source_col, target_field, repr(raw))`` for
            each non-empty cell that could not be parsed.
        total_mismatch_warnings: ``(row_num, source_total, computed_total)``
            for rows whose stated total differs from the sum of components.
        has_opening_balance: Whether an opening-balance entry was added.
    """

    imported: int = 0
    subtotals: int = 0
    skipped: list[tuple[int, str]] = field(default_factory=list)
    parse_warnings: list[tuple[int, str, str, str]] = field(default_factory=list)
    total_mismatch_warnings: list[tuple[int, float, float]] = field(
        default_factory=list
    )
    has_opening_balance: bool = False
194# ── Normalisation ─────────────────────────────────────────────────────────────
197def _norm(text: str) -> str:
198 """Strip, collapse whitespace, lower-case."""
199 return re.sub(r"\s+", " ", str(text).strip().lower())
202def _disambiguate(names: list[str]) -> list[str]:
203 """Append _2, _3 … to duplicate names to make them unique."""
204 seen: dict[str, int] = {}
205 result: list[str] = []
206 for n in names:
207 if n in seen:
208 seen[n] += 1
209 result.append(f"{n}_{seen[n]}")
210 else:
211 seen[n] = 1
212 result.append(n)
213 return result
216def _fingerprint(norm_cols: list[str]) -> str:
217 payload = json.dumps(norm_cols, ensure_ascii=False, sort_keys=False)
218 return hashlib.sha256(payload.encode()).hexdigest()
221# ── Header detection ──────────────────────────────────────────────────────────
def _is_header_row(row: list[Any]) -> bool:
    """Decide whether *row* looks like a header row.

    A row qualifies when it has at least four non-empty cells and at least
    half of those are strings that do not parse as numbers.
    """
    filled = [cell for cell in row if cell is not None and str(cell).strip()]
    if len(filled) < 4:
        return False
    text_cells = sum(
        1
        for cell in filled
        if isinstance(cell, str) and not _is_numeric_str(str(cell))
    )
    return text_cells / len(filled) >= 0.5
235def _is_numeric_str(s: str) -> bool:
236 try:
237 float(s)
238 return True
239 except ValueError:
240 return False
def _header_alias_score(row: list[Any]) -> int:
    """Return the number of cells in *row* whose normalised text is a known alias."""
    hits = 0
    for cell in row:
        if cell is None:
            continue
        if _norm(str(cell)) in _ALIASES:
            hits += 1
    return hits
def _find_header_row(rows: list[list[Any]], max_scan: int = 20) -> int | None:
    """Locate the most plausible header row among the first *max_scan* rows.

    Logbook templates frequently carry a group-label row ("DEPARTURE &
    ARRIVAL", "LANDINGS", …) above the real column headers. Both rows look
    text-like, but the real header matches many more known aliases, so every
    text-like row is scored by alias hits and the best one wins. Ties are
    resolved in favour of the earlier row. When no row has any alias hit the
    first text-like row is returned instead.

    Returns:
        The 0-based row index, or ``None`` when no text-like row exists.
    """
    candidates = [
        (idx, _header_alias_score(row))
        for idx, row in enumerate(rows[:max_scan])
        if _is_header_row(row)
    ]
    if not candidates:
        return None
    # max() yields the first maximal element, so earlier rows win ties.
    top_idx, top_score = max(candidates, key=lambda pair: pair[1])
    if top_score > 0:
        return top_idx
    return candidates[0][0]
274def _trim_trailing_empty_cols(
275 all_rows: list[list[Any]], max_scan: int = 50
276) -> list[list[Any]]:
277 """Trim columns beyond the rightmost non-empty value in the first max_scan rows.
279 Excel templates often declare thousands of formatted-but-empty columns.
280 Without trimming, every empty cell becomes a separate mapping entry.
281 """
282 max_col = 0
283 for row in all_rows[:max_scan]:
284 for j in range(len(row) - 1, -1, -1):
285 if row[j] is not None and str(row[j]).strip():
286 if j + 1 > max_col:
287 max_col = j + 1
288 break
289 if max_col == 0:
290 return (
291 all_rows # pragma: no cover — blank file rejected by header detection first
292 )
293 return [row[:max_col] for row in all_rows]
296# ── Group-header detection ────────────────────────────────────────────────────
299def _merge_label_map(ws: Any) -> dict[tuple[int, int], str]:
300 """Return {(0-based row, 0-based col): group_label} for every merged region.
302 The label is the value of the top-left cell of each merged range.
303 Every cell in the range gets the same label so downstream code can do a
304 simple lookup without forward-fill.
305 """
306 result: dict[tuple[int, int], str] = {}
307 for mc in ws.merged_cells.ranges:
308 val = ws.cell(mc.min_row, mc.min_col).value
309 if val is None or not str(val).strip():
310 continue
311 label = str(val).strip()
312 for r in range(mc.min_row - 1, mc.max_row):
313 for c in range(mc.min_col - 1, mc.max_col):
314 result[(r, c)] = label
315 return result
318def _group_labels_from_map(
319 merge_map: dict[tuple[int, int], str], row_idx: int, width: int
320) -> list[str]:
321 """Return per-column group labels for *row_idx* using exact merge metadata."""
322 return [merge_map.get((row_idx, c), "") for c in range(width)]
325def _group_labels_heuristic(row: list[Any], width: int) -> list[str] | None:
326 """Fallback for CSV: forward-fill a sparse row to infer group labels.
328 Requires at least two non-empty values with a gap between them (so a single
329 spanning title cell does not fire). Returns None if the pattern is absent.
330 """
331 padded: list[Any] = list(row[:width]) + [None] * max(0, width - len(row))
333 prev_nonempty_idx: int | None = None
334 has_span = False
335 for i, val in enumerate(padded):
336 if val is not None and str(val).strip():
337 if prev_nonempty_idx is not None and i > prev_nonempty_idx + 1:
338 has_span = True
339 break
340 prev_nonempty_idx = i
341 if not has_span:
342 return None
344 result: list[str] = []
345 current = ""
346 for val in padded:
347 if val is not None and str(val).strip():
348 current = str(val).strip()
349 result.append(current)
350 return result
353def _apply_group_labels(group_labels: list[str], raw_header: list[str]) -> list[str]:
354 """Prepend each non-empty group label to the corresponding column header."""
355 result = []
356 for i, col in enumerate(raw_header):
357 label = group_labels[i] if i < len(group_labels) else ""
358 if label and col:
359 result.append(f"{label} {col}")
360 elif label:
361 result.append(label)
362 else:
363 result.append(col)
364 return result
367# ── File parsing ──────────────────────────────────────────────────────────────
def parse_file(data: bytes, filename: str) -> ParsedFile:
    """Turn the bytes of an uploaded file into a ParsedFile.

    The parser is chosen from the file extension: ``xlsx``/``xls`` go to the
    Excel parser and ``csv`` to the CSV parser.

    Raises:
        ValueError: With a user-friendly message when the extension is not
            supported (the format-specific parsers raise it for format errors).
    """
    _, dot, suffix = filename.rpartition(".")
    ext = suffix.lower() if dot else ""
    if ext == "csv":
        return _parse_csv(data, filename)
    if ext in ("xlsx", "xls"):
        return _parse_excel(data, filename)
    raise ValueError(
        f"Unsupported file format: .{ext} — please upload a .csv or .xlsx file."
    )
385# English-only Excel tab names used to identify the logbook sheet.
386# Locale-specific equivalents live in the translation catalogue — look for
387# entries marked "Excel tab name" in the translator comments.
388_PREFERRED_SHEET_NAMES_EN: frozenset[str] = frozenset(
389 {
390 "logbook",
391 "log",
392 "flights",
393 "flight log",
394 "flights log",
395 "journal",
396 }
397)
def _preferred_sheet_names() -> frozenset[str]:
    """Return the English sheet names plus their active-locale translations.

    Each name is passed to ``_()`` as a literal so that pybabel extracts it;
    the locale-specific Excel tab names live in the translation catalogue
    (see the "Excel tab name" entries). Translations are stripped and
    lower-cased before being merged in.
    """
    localised = (
        _("logbook"),
        _("log"),
        _("flights"),
        _("flight log"),
        _("flights log"),
        _("journal"),
    )
    normalised = frozenset(name.strip().lower() for name in localised)
    return _PREFERRED_SHEET_NAMES_EN | normalised
def _pick_best_excel_sheet(wb: Any) -> tuple[str, list[list[Any]]]:
    """Choose the sheet most likely to hold the logbook; return its name and rows.

    Fast path: the first sheet whose title matches a known logbook keyword is
    taken and only that sheet is read. Otherwise every sheet is read and
    scored by the number of alias matches in its detected header; the best
    score wins and earlier sheets win ties, so the first sheet is the result
    when no sheet scores above zero.

    The rows are returned together with the name because openpyxl read-only
    worksheets expose a one-shot generator: a second ``iter_rows()`` call on
    the same worksheet yields nothing.
    """
    sheets = wb.worksheets
    wanted = _preferred_sheet_names()
    for sheet in sheets:
        if sheet.title.strip().lower() in wanted:
            return sheet.title, [list(r) for r in sheet.iter_rows(values_only=True)]

    chosen_name = sheets[0].title if sheets else ""
    chosen_rows: list[list[Any]] = []
    top_score = -1
    for sheet in sheets:
        sheet_rows = [list(r) for r in sheet.iter_rows(values_only=True)]
        trimmed = _trim_trailing_empty_cols(sheet_rows)
        header_idx = _find_header_row(trimmed)
        score = 0 if header_idx is None else _header_alias_score(trimmed[header_idx])
        if score > top_score:
            top_score, chosen_name, chosen_rows = score, sheet.title, sheet_rows
    return chosen_name, chosen_rows
def _parse_excel(data: bytes, filename: str) -> ParsedFile:
    """Parse an Excel workbook held in *data* into a ParsedFile.

    The workbook is opened twice: first in read-only mode to pick the logbook
    sheet and read its rows, then in normal mode to read the merged-cell
    ranges, which give exact group-label boundaries. The second pass is
    best-effort; if it fails, group labels are inferred heuristically later.

    Raises:
        ValueError: If the workbook cannot be opened at all.
    """
    try:
        wb_ro = openpyxl.load_workbook(io.BytesIO(data), read_only=True, data_only=True)
    except Exception as exc:
        raise ValueError(f"Could not open Excel file: {exc}") from exc
    try:
        sheet_name, all_rows = _pick_best_excel_sheet(wb_ro)
    finally:
        # Release the workbook even when sheet selection raises.
        wb_ro.close()

    # Second load (non-read-only) to access merged cell ranges for exact group labels.
    excel_merge_map: dict[tuple[int, int], str] | None = None
    try:
        wb_full = openpyxl.load_workbook(io.BytesIO(data), data_only=True)
        try:
            excel_merge_map = _merge_label_map(wb_full[sheet_name])
        finally:
            wb_full.close()
    except Exception:  # noqa: S110 # fall back to heuristic in _build_parsed_file
        # Deliberate best-effort: merge metadata is optional.
        pass

    return _build_parsed_file(all_rows, filename, excel_merge_map=excel_merge_map)
def _parse_csv(data: bytes, filename: str) -> ParsedFile:
    """Parse CSV bytes into a ParsedFile.

    The bytes are decoded as UTF-8 (a BOM is tolerated) with Latin-1 as the
    fallback. The dialect is sniffed from the first 4096 characters and
    defaults to the ``excel`` dialect when sniffing fails.
    """
    try:
        decoded = data.decode("utf-8-sig")
    except UnicodeDecodeError:
        decoded = data.decode("latin-1")

    try:
        dialect = csv.Sniffer().sniff(decoded[:4096])
    except csv.Error:
        dialect = csv.excel

    all_rows: list[list[Any]] = list(csv.reader(io.StringIO(decoded), dialect))
    return _build_parsed_file(all_rows, filename)
def _build_parsed_file(
    all_rows: list[list[Any]],
    filename: str,
    excel_merge_map: dict[tuple[int, int], str] | None = None,
) -> ParsedFile:
    """Assemble a ParsedFile from raw rows.

    Steps: trim trailing empty columns, detect the header row, optionally
    prefix headers with group labels taken from the row above, then
    normalise and disambiguate the column names.

    Args:
        all_rows: Every row of the sheet or CSV file.
        filename: Name of the uploaded file (not used by this function).
        excel_merge_map: Merged-cell labels for Excel input; ``None`` makes
            the group labels come from the forward-fill heuristic instead.

    Raises:
        ValueError: If no header row can be detected.
    """
    rows = _trim_trailing_empty_cols(all_rows)
    header_idx = _find_header_row(rows)
    if header_idx is None:
        raise ValueError(
            "Could not detect a header row in this file. "
            "Make sure the file contains column names."
        )

    header_cells = [
        "" if cell is None else str(cell).strip() for cell in rows[header_idx]
    ]

    # A group-label row can only exist when something precedes the header.
    group_labels: list[str] | None = None
    if header_idx > 0:
        width = len(header_cells)
        if excel_merge_map is None:
            # CSV: merged-cell information is unavailable, so forward-fill.
            group_labels = _group_labels_heuristic(rows[header_idx - 1], width)
        else:
            # Excel: exact boundaries from the merged-cell metadata.
            exact = _group_labels_from_map(excel_merge_map, header_idx - 1, width)
            group_labels = exact if any(exact) else None
    if group_labels is not None:
        header_cells = _apply_group_labels(group_labels, header_cells)

    norm_cols = _disambiguate([_norm(cell) for cell in header_cells])
    return ParsedFile(
        norm_cols=norm_cols,
        raw_cols=header_cells,
        header_row_index=header_idx,
        data_rows=rows[header_idx + 1 :],
        fingerprint=_fingerprint(norm_cols),
    )
529# ── Mapping proposal ──────────────────────────────────────────────────────────
def propose_mapping(
    parsed: ParsedFile,
    existing_mappings: list[Any],  # list[LogbookImportMapping]
) -> MappingProposal:
    """Pick the best mapping proposal for *parsed*, preferring saved mappings.

    Three strategies are tried in order:

    1. A saved mapping with an identical fingerprint ("exact").
    2. The saved mapping with the largest column overlap, provided the
       overlap ratio is at least 0.6 ("fuzzy"); columns the saved mapping
       does not know fall back to the alias table.
    3. The built-in alias table alone ("alias").
    """
    # 1. Exact fingerprint match
    exact = next(
        (m for m in existing_mappings if m.source_fingerprint == parsed.fingerprint),
        None,
    )
    if exact is not None:
        return MappingProposal(
            mapping=json.loads(exact.column_mapping),
            match_type="exact",
            matched_mapping_id=exact.id,
        )

    # 2. Fuzzy match — best overlap among the saved mappings
    incoming = set(parsed.norm_cols)
    closest = None
    closest_score = 0.0
    for candidate in existing_mappings:
        known = set(json.loads(candidate.source_columns))
        if not known:
            continue
        ratio = len(incoming & known) / max(len(incoming), len(known))
        if ratio > closest_score:
            closest, closest_score = candidate, ratio

    if closest is not None and closest_score >= 0.6:
        saved_map: dict[str, str] = json.loads(closest.column_mapping)
        # Start from the alias mapping, then let the saved choices override it.
        merged = _alias_mapping(parsed.norm_cols)
        merged.update(
            {col: saved_map[col] for col in parsed.norm_cols if col in saved_map}
        )
        return MappingProposal(
            mapping=merged,
            match_type="fuzzy",
            fuzzy_score=closest_score,
            matched_mapping_id=closest.id,
        )

    # 3. Alias-only auto-mapping
    return MappingProposal(
        mapping=_alias_mapping(parsed.norm_cols),
        match_type="alias",
    )
def _alias_mapping(norm_cols: list[str]) -> dict[str, str]:
    """Map each column through the built-in alias table.

    Columns the table does not know are mapped to ``"ignore"``.
    """
    return {col: _ALIASES.get(col, "ignore") for col in norm_cols}
590# ── Subtotal detection ────────────────────────────────────────────────────────
593def _is_subtotal_row(row: list[Any], date_col_idx: int | None) -> bool:
594 if date_col_idx is None:
595 return False
596 if date_col_idx >= len(row):
597 return True
598 val = row[date_col_idx]
599 if isinstance(val, timedelta):
600 return True
601 if val is None or (isinstance(val, str) and not val.strip()):
602 return True
603 if isinstance(val, str) and "total" in val.lower():
604 return True
605 return False
608# ── Value parsing ─────────────────────────────────────────────────────────────
def parse_date_value(val: Any) -> date | None:
    """Convert a cell value into a ``date``, or return ``None`` if impossible.

    ``datetime`` and ``date`` objects are accepted directly. Strings are
    tried against a fixed list of formats in order, day-first formats before
    the month-first one. Every other type, numbers included, yields ``None``.
    """
    if isinstance(val, datetime):
        return val.date()
    if isinstance(val, date):
        return val
    # Numbers would be Excel serial dates; openpyxl normally delivers
    # datetime objects, so these (and any other non-string) are rejected.
    if not isinstance(val, str):
        return None

    text = val.strip()
    if not text:
        return None
    for pattern in ("%d/%m/%y", "%d/%m/%Y", "%Y-%m-%d", "%m/%d/%Y", "%d-%m-%Y"):
        try:
            parsed = datetime.strptime(text, pattern)
        except ValueError:
            continue
        return parsed.date()
    return None
def parse_time_value(val: Any) -> time | None:
    """Convert a cell value into a time of day.

    Accepts ``time`` objects, ``datetime`` objects (their time part is used)
    and ``H:MM`` / ``HH:MM`` strings. Anything else, including out-of-range
    hours or minutes, yields ``None``.
    """
    if isinstance(val, time):
        return val
    if isinstance(val, datetime):
        return val.time()
    if not isinstance(val, str):
        return None

    text = val.strip()
    if not text:
        return None
    found = re.match(r"^(\d{1,2}):(\d{2})$", text)
    if found is None:
        return None
    hour, minute = (int(part) for part in found.groups())
    try:
        return time(hour, minute)
    except ValueError:
        # e.g. "25:00" or "10:75"
        return None
def parse_duration_value(val: Any) -> float | None:
    """Parse a duration into decimal hours, rounded to one decimal place.

    Examples: ``time(0, 42)`` → ``0.7`` and ``'1:24'`` → ``1.4``.

    Accepted inputs:

    * ``timedelta`` — total seconds converted to hours.
    * ``time`` / ``datetime`` — hour and minute are used; a ``datetime`` is
      what Excel sometimes returns (a dummy date plus the time of day).
    * ``int`` / ``float`` — taken as decimal hours.
    * ``str`` — either ``H:MM`` or a decimal number.

    Returns:
        The duration in hours, or ``None`` for empty, unparseable, negative
        or non-finite (NaN / infinity) input. Non-finite values are rejected
        because they are not a meaningful flight time.
    """
    if isinstance(val, timedelta):
        hours = val.total_seconds() / 3600
        return round(hours, 1) if hours >= 0 else None
    if isinstance(val, datetime):
        # Excel sometimes returns a dummy date + the time-of-day
        val = val.time()
    if isinstance(val, time):
        return round(val.hour + val.minute / 60, 1)
    if isinstance(val, (int, float)):
        try:
            number = float(val)
        except OverflowError:
            # An int too large for a float is not a plausible duration.
            return None
        # The chained comparison is False for NaN, negatives and infinity.
        return round(number, 1) if 0 <= number < float("inf") else None
    if isinstance(val, str):
        text = val.strip()
        if not text:
            return None
        clock = re.match(r"^(\d+):(\d{2})$", text)
        if clock:
            return round(int(clock.group(1)) + int(clock.group(2)) / 60, 1)
        try:
            number = float(text)
        except ValueError:
            return None
        # float() accepts "inf", "infinity" and "nan"; none is a duration.
        return round(number, 1) if 0 <= number < float("inf") else None
    return None
def parse_int_value(val: Any) -> int | None:
    """Parse a non-negative integer count (e.g. a number of landings).

    ``int`` values are returned as-is, ``float`` values are truncated toward
    zero, and strings are stripped, read as a number and then truncated.

    Returns:
        The integer, or ``None`` for empty, unparseable, negative or
        non-finite input. Negative numbers are rejected in every form,
        including strings such as ``"-3"``, and ``"inf"`` / ``float("inf")``
        yield ``None`` instead of raising ``OverflowError``.
    """
    if isinstance(val, str):
        text = val.strip()
        if not text:
            return None
        try:
            val = float(text)
        except ValueError:
            return None
        # Fall through: the parsed number follows the float rules below.
    if isinstance(val, int):
        return val if val >= 0 else None
    if isinstance(val, float):
        if not val >= 0:  # also False for NaN
            return None
        try:
            return int(val)
        except OverflowError:  # infinity
            return None
    return None
698def _is_nonempty(val: Any) -> bool:
699 """Return True when val carries a real value (not None, not blank string)."""
700 if val is None:
701 return False
702 if isinstance(val, str):
703 return bool(val.strip())
704 return True
# Target field → (human-readable type name, parser). Targets that are absent
# from this table are not run through a typed parser.
_FIELD_TYPE: dict[str, tuple[str, Callable[[Any], Any]]] = {
    "date": ("date", parse_date_value),
    **{
        name: ("time", parse_time_value)
        for name in ("departure_time", "arrival_time")
    },
    **{
        name: ("duration", parse_duration_value)
        for name in (
            "night_time",
            "instrument_time",
            "cross_country",
            "total_flight_time_check",
            "single_pilot_se",
            "single_pilot_me",
            "multi_pilot",
            "function_pic",
            "function_copilot",
            "function_dual",
            "function_instructor",
        )
    },
    **{
        name: ("integer", parse_int_value)
        for name in ("landings_day", "landings_night")
    },
}

# Number of leading data rows sampled when computing type hints.
_HINT_SAMPLE_ROWS = 5
def type_hints(parsed: ParsedFile, mapping: dict[str, str]) -> dict[str, str]:
    """Flag columns whose sample data does not fit the proposed target type.

    For every mapped column with a typed target, the non-empty values of the
    first ``_HINT_SAMPLE_ROWS`` data rows are run through the target's
    parser. If any of them fails to parse, the column receives a hint that
    quotes the first failing value, which points to a likely mis-mapping.

    Returns:
        ``{column: hint_text}`` for the offending columns only.
    """
    positions = {col: pos for pos, col in enumerate(parsed.norm_cols)}
    head = parsed.data_rows[:_HINT_SAMPLE_ROWS]
    hints: dict[str, str] = {}
    for col, target in mapping.items():
        spec = _FIELD_TYPE.get(target)
        pos = positions.get(col)
        if spec is None or pos is None:
            continue
        type_name, parser = spec
        values = [
            row[pos] for row in head if pos < len(row) and _is_nonempty(row[pos])
        ]
        rejected = [value for value in values if parser(value) is None]
        if not rejected:
            continue
        example = str(rejected[0])[:30]
        hints[col] = f"Sample data doesn't look like a {type_name} (e.g. {example!r})"
    return hints
761# ── Preview rows ──────────────────────────────────────────────────────────────
def preview_rows(
    parsed: ParsedFile,
    mapping: dict[str, str],
    n: int = 5,
) -> list[dict[str, Any]]:
    """Return up to *n* non-subtotal data rows mapped to target field names.

    Args:
        parsed: The parsed file whose data rows are previewed.
        mapping: Normalised column key → target field; columns mapped to
            ``"ignore"`` or absent from the mapping are left out.
        n: Maximum number of rows to return; ``n <= 0`` yields no rows.

    Returns:
        One dict per previewed row, keyed by target field. Cells missing
        from a short row are reported as ``None``. When several columns
        share a target, the rightmost column's value is kept.
    """
    if n <= 0:
        return []

    date_idx = _date_col_index(parsed.norm_cols, mapping)
    # The column → target resolution is the same for every row; do it once.
    wanted = [
        (pos, target)
        for pos, target in (
            (pos, mapping.get(col, "ignore"))
            for pos, col in enumerate(parsed.norm_cols)
        )
        if target != "ignore"
    ]

    result: list[dict[str, Any]] = []
    for row in parsed.data_rows:
        if _is_subtotal_row(row, date_idx):
            continue
        result.append(
            {target: (row[pos] if pos < len(row) else None) for pos, target in wanted}
        )
        if len(result) >= n:
            break
    return result
787def _date_col_index(norm_cols: list[str], mapping: dict[str, str]) -> int | None:
788 for i, col in enumerate(norm_cols):
789 if mapping.get(col) == "date":
790 return i
791 return None
794# ── Import execution ──────────────────────────────────────────────────────────
def execute_import(
    parsed: ParsedFile,
    mapping: dict[str, str],
    pilot_user_id: int,
    batch_id: int,
    opening_balance: dict[str, Any] | None = None,
) -> ImportResult:
    """Create PilotLogbookEntry rows from *parsed* using *mapping*.

    Entries are added to ``db.session`` but NOT committed — the caller
    commits after also saving the batch/mapping records.

    Args:
        parsed: The parsed upload.
        mapping: Normalised column key → target field, or ``"ignore"``.
        pilot_user_id: Stored on every created entry.
        batch_id: Stored on every created entry as ``import_batch_id``.
        opening_balance: Optional totals. When at least one value is truthy,
            one extra synthetic entry is added, dated the day before the
            earliest imported entry (or before today if nothing was imported).

    Returns:
        An ImportResult describing what happened. Row numbers in it are
        1-based positions within ``parsed.data_rows`` (the rows after the
        header), not row numbers of the original file.
    """
    from models import PilotLogbookEntry, db  # pyright: ignore[reportMissingImports]

    result = ImportResult()
    date_idx = _date_col_index(parsed.norm_cols, mapping)
    col_index = {col: i for i, col in enumerate(parsed.norm_cols)}

    def _get(row: list[Any], col: str) -> Any:
        i = col_index.get(col)
        return row[i] if i is not None and i < len(row) else None

    # Loop-invariant lookups, resolved once instead of once per row.
    date_col = next(
        (col for col, target in mapping.items() if target == "date"), None
    )
    field_cols = [
        (col, target)
        for col, target in mapping.items()
        if target not in ("ignore", "date")
    ]
    text_targets = (
        "aircraft_registration",
        "departure_place",
        "arrival_place",
        "pic_name",
        "remarks",
    )

    entries_to_add: list[PilotLogbookEntry] = []

    for row_num, row in enumerate(parsed.data_rows, start=1):
        if _is_subtotal_row(row, date_idx):
            result.subtotals += 1
            continue

        # A row without a parseable date cannot be imported.
        date_val: date | None = None
        if date_col is not None:
            date_val = parse_date_value(_get(row, date_col))
        if date_val is None:
            raw_date = (
                row[date_idx] if date_idx is not None and date_idx < len(row) else None
            )
            result.skipped.append((row_num, f"unparseable date: {raw_date!r}"))
            continue

        # Build entry fields from mapping
        kwargs: dict[str, Any] = {
            "pilot_user_id": pilot_user_id,
            "import_batch_id": batch_id,
            "source": "import",
            "date": date_val,
        }

        source_total: float | None = None

        for col, target in field_cols:
            raw = _get(row, col)
            if target == "total_flight_time_check":
                # Virtual field: kept for the consistency check, never stored.
                if _is_nonempty(raw):
                    source_total = parse_duration_value(raw)
                continue
            if target in _FIELD_TYPE:
                _, parser = _FIELD_TYPE[target]
                parsed_val = parser(raw)
                if parsed_val is None and _is_nonempty(raw):
                    result.parse_warnings.append(
                        (row_num, col, target, repr(str(raw)[:40]))
                    )
                kwargs[target] = parsed_val
            elif target == "aircraft_type":
                val = str(raw).strip() if raw is not None else None
                kwargs["aircraft_type"] = val
                if val:
                    # Imported lazily so the dependency is only needed when
                    # an aircraft type is actually present.
                    from utils import resolve_aircraft_type_icao  # pyright: ignore[reportMissingImports]

                    kwargs["aircraft_type_icao"] = resolve_aircraft_type_icao(val)
            elif target in text_targets:
                kwargs[target] = str(raw).strip() if raw is not None else None

        if source_total is not None:
            # The stated total is compared with the sum of the category times.
            computed = round(
                sum(
                    float(kwargs.get(f) or 0)
                    for f in ("single_pilot_se", "single_pilot_me", "multi_pilot")
                ),
                1,
            )
            if abs(source_total - computed) >= 0.15:
                result.total_mismatch_warnings.append((row_num, source_total, computed))

        entries_to_add.append(PilotLogbookEntry(**kwargs))

    result.imported = len(entries_to_add)
    for entry in entries_to_add:
        db.session.add(entry)

    # Opening balance — synthetic entry dated one day before earliest imported
    if opening_balance and any(opening_balance.values()):
        earliest = (
            min(e.date for e in entries_to_add) if entries_to_add else date.today()
        )
        # NOTE(review): cross_country and the landings counts are not copied
        # from opening_balance — confirm that this is intended.
        balance_entry = PilotLogbookEntry(
            pilot_user_id=pilot_user_id,
            import_batch_id=batch_id,
            source="import",
            date=earliest - timedelta(days=1),
            remarks="Opening balance (imported)",
            night_time=opening_balance.get("night_time"),
            instrument_time=opening_balance.get("instrument_time"),
            single_pilot_se=opening_balance.get("single_pilot_se"),
            single_pilot_me=opening_balance.get("single_pilot_me"),
            multi_pilot=opening_balance.get("multi_pilot"),
            function_pic=opening_balance.get("function_pic"),
            function_copilot=opening_balance.get("function_copilot"),
            function_dual=opening_balance.get("function_dual"),
            function_instructor=opening_balance.get("function_instructor"),
        )
        db.session.add(balance_entry)
        result.has_opening_balance = True

    return result