Coverage for app/pwa/routes.py: 100%
174 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1import contextlib
2import mimetypes as _mimetypes
3import os
4import re as _re
5import shutil
6import tempfile
7import uuid
8from datetime import date as _date
9from typing import cast
11from flask import ( # pyright: ignore[reportMissingImports]
12 Blueprint,
13 abort,
14 current_app,
15 flash,
16 redirect,
17 render_template,
18 request,
19 session,
20 url_for,
21)
22from flask.typing import ResponseReturnValue # pyright: ignore[reportMissingImports]
23from flask_babel import gettext as _ # pyright: ignore[reportMissingImports]
25from models import ( # pyright: ignore[reportMissingImports]
26 Aircraft,
27 DocCategory,
28 Document,
29 Role,
30 Tenant,
31 TenantUser,
32 db,
33)
34from utils import login_required # pyright: ignore[reportMissingImports]
# Blueprint that carries the PWA share-target routes registered below.
pwa_bp = Blueprint("pwa", __name__)

# Tenant roles allowed to store shared files as aircraft documents
# (checked by _process_document before anything is written).
_OWNER_ROLES = (Role.ADMIN, Role.OWNER)
# MIME types accepted per destination.
# The keys are the destination identifiers that share_confirm() compares the
# posted ``destination`` form field against; _allowed_destinations() offers a
# destination only when every shared file's MIME type is in its set.
_DEST_ACCEPT: dict[str, frozenset[str]] = {
    "document": frozenset(
        {
            "application/pdf",
            "image/jpeg",
            "image/png",
            "image/gif",
            "image/webp",
            "image/heic",
            "application/msword",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "application/vnd.ms-excel",
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "text/plain",
        }
    ),
    "expense": frozenset(
        {"application/pdf", "image/jpeg", "image/png", "image/gif", "image/webp"}
    ),
    "maintenance": frozenset({"application/pdf", "image/jpeg", "image/png"}),
    "flight_photo": frozenset(
        {"image/jpeg", "image/png", "image/gif", "image/webp", "image/heic"}
    ),
}
# Static MIME-type → file extension mapping. Used to derive the stored
# extension from the content type rather than the user-supplied filename,
# which breaks the taint chain for path-injection analysis.
# A MIME type that is missing from this table yields an empty extension in
# _process_document.
_MIME_TO_EXT: dict[str, str] = {
    "application/pdf": ".pdf",
    "image/jpeg": ".jpg",
    "image/png": ".png",
    "image/gif": ".gif",
    "image/webp": ".webp",
    "image/heic": ".heic",
    "application/msword": ".doc",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
    "application/vnd.ms-excel": ".xls",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
    "text/plain": ".txt",
}
def _allowed_destinations(mimetypes: list[str]) -> list[str]:
    """List the destinations whose accepted MIME types cover all of *mimetypes*.

    Destinations come back in the declaration order of ``_DEST_ACCEPT``.
    An empty *mimetypes* list is covered by every destination.
    """
    wanted = set(mimetypes)
    matches: list[str] = []
    for name, accepted in _DEST_ACCEPT.items():
        if wanted <= accepted:
            matches.append(name)
    return matches
def _dest_labels() -> dict[str, str]:
    """Map each share destination key to its translated display label.

    Built on every call so the labels follow the locale of the current
    request rather than the one active at import time.
    """
    labelled = (
        ("document", _("Aircraft document")),
        ("expense", _("Expense receipt")),
        ("maintenance", _("Maintenance record")),
        ("flight_photo", _("Flight photo")),
    )
    return dict(labelled)
def _category_labels() -> list[tuple[str, str]]:
    """Return ``(category, translated label)`` pairs for the category picker.

    The order of the pairs is the order in which they are listed here.
    """
    choices: list[tuple[str, str]] = []
    choices.append((DocCategory.MAINTENANCE, _("Maintenance")))
    choices.append((DocCategory.INSURANCE, _("Insurance")))
    choices.append((DocCategory.POH, _("POH / Flight Manual")))
    choices.append((DocCategory.AIRWORTHINESS, _("Airworthiness")))
    choices.append((DocCategory.LOGBOOK, _("Logbook")))
    choices.append((DocCategory.INVOICE, _("Invoice")))
    choices.append((DocCategory.OTHER, _("Other")))
    choices.append((DocCategory.UNCATEGORISED, _("Uncategorised")))
    return choices
def _get_user_aircraft() -> list[Aircraft]:
    """Return the aircraft of the current user's tenant, ordered by registration.

    The tenant is taken from the first ``TenantUser`` row matching the
    session's ``user_id``; when no such row exists the result is empty.
    """
    membership = TenantUser.query.filter_by(user_id=session.get("user_id")).first()
    if membership:
        tenant_fleet = Aircraft.query.filter_by(tenant_id=membership.tenant_id)
        ordered_fleet = tenant_fleet.order_by(Aircraft.registration)
        return cast(list[Aircraft], ordered_fleet.all())
    return []
def _ensure_tenant_slug(tenant: Tenant) -> str:
    """Return the tenant's slug, generating and storing one if it has none.

    A new slug is derived from the tenant name: lower-cased, every run of
    characters outside ``[a-z0-9]`` collapsed to a single hyphen, trimmed of
    leading/trailing hyphens and limited to 64 characters. If another tenant
    already uses it, ``-1``, ``-2``, ... is appended until it is unique.

    The generated slug is assigned to ``tenant.slug`` and flushed (not
    committed) so the caller's transaction decides whether it persists.

    Args:
        tenant: The tenant whose slug is required.

    Returns:
        The existing or newly generated slug; never an empty string.
    """
    if tenant.slug:
        return str(tenant.slug)

    # ``or ""`` keeps a missing name from raising AttributeError.
    normalised = _re.sub(r"[^a-z0-9]+", "-", (tenant.name or "").lower())
    # Strip again after truncating: the cut may land right after a hyphen.
    base = normalised.strip("-")[:64].rstrip("-")
    if not base:
        # A name without any [a-z0-9] character would give an empty slug,
        # which is falsy (so it would be regenerated on every call) and would
        # drop the tenant segment from upload paths built from it.
        base = "tenant"

    slug = base
    n = 1
    while Tenant.query.filter(Tenant.slug == slug, Tenant.id != tenant.id).first():
        slug = f"{base}-{n}"
        n += 1
    tenant.slug = slug
    db.session.flush()
    return slug
138def _safe_path_component(s: str) -> str:
139 return _re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", s).strip()
def _cleanup_temp(tmp_dir: str) -> None:
    """Discard a pending share.

    Removes the ``share_pending`` entry from the session (if present) and
    deletes *tmp_dir* recursively; filesystem errors during deletion are
    ignored.
    """
    session.pop("share_pending", None)
    shutil.rmtree(tmp_dir, ignore_errors=True)
147# ── Routes ────────────────────────────────────────────────────────────────────
@pwa_bp.route("/pwa/shared", methods=["GET"])
@login_required
def share_target_get() -> ResponseReturnValue:
    """Send GET requests for the share-target URL to the index page."""
    home_url = url_for("index")
    return redirect(home_url)
@pwa_bp.route("/pwa/shared", methods=["POST"])
@login_required
def share_target() -> ResponseReturnValue:
    """Receive files shared to the PWA and ask the user where to file them.

    Every uploaded part with a filename is saved into a fresh temporary
    directory. The directory, the per-file metadata (original name, temporary
    name, MIME type) and the optional ``title`` form field are stored in the
    session under ``share_pending`` for ``share_confirm`` to pick up.

    Returns:
        The rendered ``pwa/share_target.html`` chooser, or a redirect to the
        index page with a warning when no usable file was posted.
    """
    files = request.files.getlist("files")
    title = request.form.get("title", "").strip()

    valid_files = [f for f in files if f.filename]
    if not valid_files:
        flash(_("No files were shared."), "warning")
        return redirect(url_for("index"))

    # An earlier share that was never confirmed would lose its only reference
    # once the session entry is overwritten below, so remove it first.
    stale = session.get("share_pending")
    if isinstance(stale, dict) and stale.get("tmp_dir"):
        _cleanup_temp(stale["tmp_dir"])

    tmp_dir = tempfile.mkdtemp(prefix="oh-share-")
    saved: list[dict[str, str]] = []
    mimetypes: list[str] = []

    try:
        for f in valid_files:
            original_name = f.filename or "unnamed"
            # The on-disk name is purely random. The client-supplied name is
            # kept as metadata only, so NUL bytes, over-long names or path
            # separators in it cannot affect where or whether the file saves.
            safe_name = uuid.uuid4().hex
            f.save(os.path.join(tmp_dir, safe_name))
            # ``mimetype`` is the content type without parameters and in
            # lower case; the raw header (e.g. "text/plain; charset=utf-8")
            # would never match the accept tables.
            mime = (
                f.mimetype
                or _mimetypes.guess_type(original_name)[0]
                or "application/octet-stream"
            )
            saved.append({"original": original_name, "saved": safe_name, "mime": mime})
            mimetypes.append(mime)
    except Exception:
        # Nothing references the directory yet; do not leave it behind.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise

    session["share_pending"] = {
        "tmp_dir": tmp_dir,
        "files": saved,
        "title": title,
    }

    destinations = _allowed_destinations(mimetypes)
    return render_template(
        "pwa/share_target.html",
        pending_files=saved,
        title=title,
        destinations=destinations,
        dest_labels=_dest_labels(),
        aircraft_list=_get_user_aircraft(),
        categories=_category_labels(),
    )
def _redirect_to_aircraft(endpoint: str) -> ResponseReturnValue:
    """Redirect to *endpoint* for the posted ``aircraft_id``, else to the index."""
    aircraft_id_raw = request.form.get("aircraft_id", "")
    try:
        aircraft_id = int(aircraft_id_raw)
    except (ValueError, TypeError):
        # Missing or non-numeric id: fall back to the index page.
        return redirect(url_for("index"))
    return redirect(url_for(endpoint, aircraft_id=aircraft_id))


@pwa_bp.route("/pwa/shared/confirm", methods=["POST"])
@login_required
def share_confirm() -> ResponseReturnValue:
    """Route the pending shared files to the destination the user picked.

    Reads the ``share_pending`` session entry written by ``share_target`` and
    the posted ``destination`` field. ``document`` hands over to
    ``_process_document``; ``expense``, ``maintenance`` and ``flight_photo``
    discard the temporary files and redirect to the matching page with an
    informational message.

    A destination that does not accept the MIME types of the pending files
    is rejected, so the restriction shown in the chooser also holds for a
    hand-crafted request.

    Returns:
        A redirect response (or whatever ``_process_document`` returns).
    """
    pending = session.get("share_pending")
    if not pending:
        flash(_("No pending shared files. Please try sharing again."), "warning")
        return redirect(url_for("index"))

    destination = request.form.get("destination", "")
    tmp_dir: str = pending["tmp_dir"]
    files_meta: list[dict[str, str]] = pending["files"]
    title: str = pending.get("title", "")

    # Enforce server-side what the chooser offered: the destination must
    # accept every pending file's MIME type.
    allowed = _allowed_destinations([meta["mime"] for meta in files_meta])
    if destination in _DEST_ACCEPT and destination not in allowed:
        _cleanup_temp(tmp_dir)
        flash(_("The shared files cannot be sent to that destination."), "danger")
        return redirect(url_for("index"))

    if destination == "document":
        return _process_document(tmp_dir, files_meta, title)

    if destination == "expense":
        _cleanup_temp(tmp_dir)
        flash(_("File received — please attach it manually to the expense."), "info")
        return _redirect_to_aircraft("expenses.add_expense")

    if destination == "maintenance":
        _cleanup_temp(tmp_dir)
        flash(
            _("File received — please reference it in the maintenance notes."), "info"
        )
        return _redirect_to_aircraft("maintenance.list_triggers")

    if destination == "flight_photo":
        _cleanup_temp(tmp_dir)
        flash(
            _("File received — please attach it manually when logging the flight."),
            "info",
        )
        return redirect(url_for("flights.log_flight"))

    _cleanup_temp(tmp_dir)
    flash(_("Unknown destination."), "danger")
    return redirect(url_for("index"))
def _process_document(
    tmp_dir: str, files_meta: list[dict[str, str]], title: str
) -> ResponseReturnValue:
    """Store the pending shared files as documents of the selected aircraft.

    The current user must hold one of ``_OWNER_ROLES`` and the posted
    ``aircraft_id`` must belong to the user's tenant. With a valid
    ``category`` the files are stored under
    ``<tenant slug>/<registration>/<category>/`` inside the upload folder,
    named ``<today> - <title><ext>``; without one they are stored flat as
    ``doc_share_<random><ext>``. One ``Document`` row is added per file and
    all rows are committed together.

    The temporary directory is removed on every exit path. If copying or
    committing fails, files already written to the upload folder are removed
    and the database session is rolled back before the error propagates.

    Args:
        tmp_dir: Temporary directory holding the shared files.
        files_meta: Per-file dicts with ``original``, ``saved`` and ``mime``.
        title: Optional title supplied with the share.

    Returns:
        A redirect to the aircraft's document list, or to the index page when
        no valid aircraft id was posted.

    Raises:
        werkzeug HTTP 403 if the user lacks the role, 404 if the aircraft is
        not found in the user's tenant (both raised via ``abort``).
    """
    tu = TenantUser.query.filter_by(user_id=session.get("user_id")).first()
    if not tu or tu.role not in _OWNER_ROLES:
        _cleanup_temp(tmp_dir)
        abort(403)

    aircraft_id_raw = request.form.get("aircraft_id", "")
    try:
        aircraft_id = int(aircraft_id_raw)
    except (ValueError, TypeError):
        _cleanup_temp(tmp_dir)
        flash(_("Please select an aircraft."), "danger")
        return redirect(url_for("index"))

    ac = Aircraft.query.filter_by(id=aircraft_id, tenant_id=tu.tenant_id).first()
    if not ac:
        _cleanup_temp(tmp_dir)
        abort(404)

    tenant = db.session.get(Tenant, tu.tenant_id)
    assert tenant is not None  # FK guarantees this

    _raw_category = request.form.get("category") or ""
    # Pick the matching constant rather than using the posted value itself,
    # so the path component below never comes straight from the request.
    category = next((c for c in DocCategory.ALL if c == _raw_category), None)
    is_sensitive = bool(request.form.get("is_sensitive"))
    valid_until_str = request.form.get("valid_until", "").strip()
    valid_until: _date | None = None
    if valid_until_str:
        # An unparsable date is deliberately ignored rather than rejected.
        with contextlib.suppress(ValueError):
            valid_until = _date.fromisoformat(valid_until_str)

    doc_title = title.strip() or None
    upload_folder = current_app.config.get("UPLOAD_FOLDER", "/data/uploads")

    written: list[str] = []
    try:
        # Everything that is the same for each file is computed once.
        rel_dir = ""
        today = ""
        if category:
            slug = _ensure_tenant_slug(tenant)
            safe_reg = ac.registration.replace("/", "-").replace(" ", "-").upper()
            today = _date.today().isoformat()
            rel_dir = os.path.join(slug, safe_reg, category)
            os.makedirs(os.path.join(upload_folder, rel_dir), exist_ok=True)
        else:
            os.makedirs(upload_folder, exist_ok=True)

        for file_meta in files_meta:
            src_path = os.path.join(tmp_dir, file_meta["saved"])
            original_name = file_meta["original"]
            mime = file_meta["mime"]
            # Extension comes from the static table, never from the filename;
            # unknown MIME types get no extension.
            ext = next((e for m, e in _MIME_TO_EXT.items() if m == mime), "")

            if category:
                safe_t = os.path.basename(
                    _safe_path_component(
                        doc_title or os.path.splitext(original_name)[0]
                    )
                )[:100]
                fname = f"{today} - {safe_t}{ext}"
                stored = os.path.join(rel_dir, fname)
                if os.path.exists(os.path.join(upload_folder, stored)):
                    # Name already taken: add a short random suffix.
                    base, ext2 = os.path.splitext(fname)
                    stored = os.path.join(
                        rel_dir, f"{base}_{uuid.uuid4().hex[:6]}{ext2}"
                    )
            else:
                stored = f"doc_share_{uuid.uuid4().hex[:12]}{ext}"

            dest_full = os.path.join(upload_folder, stored)
            # Recorded before copying so a partially written file is also
            # removed if the copy fails midway.
            written.append(dest_full)
            shutil.copy2(src_path, dest_full)
            size = os.path.getsize(dest_full)

            doc = Document(
                aircraft_id=ac.id,
                filename=stored,
                original_filename=original_name,
                mime_type=mime,
                size_bytes=size,
                title=doc_title,
                category=category,
                valid_until=valid_until,
                is_sensitive=is_sensitive,
            )
            db.session.add(doc)

        db.session.commit()
    except Exception:
        # Undo partial work so no orphaned files or pending rows remain.
        db.session.rollback()
        for path in written:
            with contextlib.suppress(OSError):
                os.remove(path)
        raise
    finally:
        _cleanup_temp(tmp_dir)

    flash(_("Document uploaded."), "success")
    return redirect(url_for("documents.list_documents", aircraft_id=ac.id))