Coverage for app/pwa/routes.py: 100%

174 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1import contextlib 

2import mimetypes as _mimetypes 

3import os 

4import re as _re 

5import shutil 

6import tempfile 

7import uuid 

8from datetime import date as _date 

9from typing import cast 

10 

11from flask import ( # pyright: ignore[reportMissingImports] 

12 Blueprint, 

13 abort, 

14 current_app, 

15 flash, 

16 redirect, 

17 render_template, 

18 request, 

19 session, 

20 url_for, 

21) 

22from flask.typing import ResponseReturnValue # pyright: ignore[reportMissingImports] 

23from flask_babel import gettext as _ # pyright: ignore[reportMissingImports] 

24 

25from models import ( # pyright: ignore[reportMissingImports] 

26 Aircraft, 

27 DocCategory, 

28 Document, 

29 Role, 

30 Tenant, 

31 TenantUser, 

32 db, 

33) 

34from utils import login_required # pyright: ignore[reportMissingImports] 

35 

36pwa_bp = Blueprint("pwa", __name__) 

37 

38_OWNER_ROLES = (Role.ADMIN, Role.OWNER) 

39 

40# MIME types accepted per destination 

41_DEST_ACCEPT: dict[str, frozenset[str]] = { 

42 "document": frozenset( 

43 { 

44 "application/pdf", 

45 "image/jpeg", 

46 "image/png", 

47 "image/gif", 

48 "image/webp", 

49 "image/heic", 

50 "application/msword", 

51 "application/vnd.openxmlformats-officedocument.wordprocessingml.document", 

52 "application/vnd.ms-excel", 

53 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", 

54 "text/plain", 

55 } 

56 ), 

57 "expense": frozenset( 

58 {"application/pdf", "image/jpeg", "image/png", "image/gif", "image/webp"} 

59 ), 

60 "maintenance": frozenset({"application/pdf", "image/jpeg", "image/png"}), 

61 "flight_photo": frozenset( 

62 {"image/jpeg", "image/png", "image/gif", "image/webp", "image/heic"} 

63 ), 

64} 

65 

66# Static MIME-type → file extension mapping. Used to derive the stored 

67# extension from the content type rather than the user-supplied filename, 

68# which breaks the taint chain for path-injection analysis. 

69_MIME_TO_EXT: dict[str, str] = { 

70 "application/pdf": ".pdf", 

71 "image/jpeg": ".jpg", 

72 "image/png": ".png", 

73 "image/gif": ".gif", 

74 "image/webp": ".webp", 

75 "image/heic": ".heic", 

76 "application/msword": ".doc", 

77 "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx", 

78 "application/vnd.ms-excel": ".xls", 

79 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx", 

80 "text/plain": ".txt", 

81} 

82 

83 

84def _allowed_destinations(mimetypes: list[str]) -> list[str]: 

85 """Return destinations that accept every provided MIME type.""" 

86 mt_set = set(mimetypes) 

87 return [dest for dest in _DEST_ACCEPT if mt_set.issubset(_DEST_ACCEPT[dest])] 

88 

89 

90def _dest_labels() -> dict[str, str]: 

91 return { 

92 "document": _("Aircraft document"), 

93 "expense": _("Expense receipt"), 

94 "maintenance": _("Maintenance record"), 

95 "flight_photo": _("Flight photo"), 

96 } 

97 

98 

99def _category_labels() -> list[tuple[str, str]]: 

100 return [ 

101 (DocCategory.MAINTENANCE, _("Maintenance")), 

102 (DocCategory.INSURANCE, _("Insurance")), 

103 (DocCategory.POH, _("POH / Flight Manual")), 

104 (DocCategory.AIRWORTHINESS, _("Airworthiness")), 

105 (DocCategory.LOGBOOK, _("Logbook")), 

106 (DocCategory.INVOICE, _("Invoice")), 

107 (DocCategory.OTHER, _("Other")), 

108 (DocCategory.UNCATEGORISED, _("Uncategorised")), 

109 ] 

110 

111 

112def _get_user_aircraft() -> list[Aircraft]: 

113 tu = TenantUser.query.filter_by(user_id=session.get("user_id")).first() 

114 if not tu: 

115 return [] 

116 return cast( 

117 list[Aircraft], 

118 Aircraft.query.filter_by(tenant_id=tu.tenant_id) 

119 .order_by(Aircraft.registration) 

120 .all(), 

121 ) 

122 

123 

124def _ensure_tenant_slug(tenant: Tenant) -> str: 

125 if tenant.slug: 

126 return str(tenant.slug) 

127 base = _re.sub(r"[^a-z0-9]+", "-", tenant.name.lower()).strip("-")[:64] 

128 slug = base 

129 n = 1 

130 while Tenant.query.filter(Tenant.slug == slug, Tenant.id != tenant.id).first(): 

131 slug = f"{base}-{n}" 

132 n += 1 

133 tenant.slug = slug 

134 db.session.flush() 

135 return slug 

136 

137 

138def _safe_path_component(s: str) -> str: 

139 return _re.sub(r'[<>:"/\\|?*\x00-\x1f]', "", s).strip() 

140 

141 

142def _cleanup_temp(tmp_dir: str) -> None: 

143 session.pop("share_pending", None) 

144 shutil.rmtree(tmp_dir, ignore_errors=True) 

145 

146 

147# ── Routes ──────────────────────────────────────────────────────────────────── 

148 

149 

150@pwa_bp.route("/pwa/shared", methods=["GET"]) 

151@login_required 

152def share_target_get() -> ResponseReturnValue: 

153 return redirect(url_for("index")) 

154 

155 

156@pwa_bp.route("/pwa/shared", methods=["POST"]) 

157@login_required 

158def share_target() -> ResponseReturnValue: 

159 files = request.files.getlist("files") 

160 title = request.form.get("title", "").strip() 

161 

162 valid_files = [f for f in files if f.filename] 

163 if not valid_files: 

164 flash(_("No files were shared."), "warning") 

165 return redirect(url_for("index")) 

166 

167 tmp_dir = tempfile.mkdtemp(prefix="oh-share-") 

168 saved: list[dict[str, str]] = [] 

169 mimetypes: list[str] = [] 

170 

171 for f in valid_files: 

172 original_name = f.filename or "unnamed" 

173 safe_name = f"{uuid.uuid4().hex}_{os.path.basename(original_name)}" 

174 dest_path = os.path.join(tmp_dir, safe_name) 

175 f.save(dest_path) 

176 mime = ( 

177 f.content_type 

178 or _mimetypes.guess_type(original_name)[0] 

179 or "application/octet-stream" 

180 ) 

181 saved.append({"original": original_name, "saved": safe_name, "mime": mime}) 

182 mimetypes.append(mime) 

183 

184 session["share_pending"] = { 

185 "tmp_dir": tmp_dir, 

186 "files": saved, 

187 "title": title, 

188 } 

189 

190 destinations = _allowed_destinations(mimetypes) 

191 return render_template( 

192 "pwa/share_target.html", 

193 pending_files=saved, 

194 title=title, 

195 destinations=destinations, 

196 dest_labels=_dest_labels(), 

197 aircraft_list=_get_user_aircraft(), 

198 categories=_category_labels(), 

199 ) 

200 

201 

202@pwa_bp.route("/pwa/shared/confirm", methods=["POST"]) 

203@login_required 

204def share_confirm() -> ResponseReturnValue: 

205 pending = session.get("share_pending") 

206 if not pending: 

207 flash(_("No pending shared files. Please try sharing again."), "warning") 

208 return redirect(url_for("index")) 

209 

210 destination = request.form.get("destination", "") 

211 tmp_dir: str = pending["tmp_dir"] 

212 files_meta: list[dict[str, str]] = pending["files"] 

213 title: str = pending.get("title", "") 

214 

215 if destination == "document": 

216 return _process_document(tmp_dir, files_meta, title) 

217 

218 if destination == "expense": 

219 _cleanup_temp(tmp_dir) 

220 flash(_("File received — please attach it manually to the expense."), "info") 

221 aircraft_id_raw = request.form.get("aircraft_id", "") 

222 if aircraft_id_raw: 

223 try: 

224 return redirect( 

225 url_for("expenses.add_expense", aircraft_id=int(aircraft_id_raw)) 

226 ) 

227 except (ValueError, TypeError): 

228 pass 

229 return redirect(url_for("index")) 

230 

231 if destination == "maintenance": 

232 _cleanup_temp(tmp_dir) 

233 flash( 

234 _("File received — please reference it in the maintenance notes."), "info" 

235 ) 

236 aircraft_id_raw = request.form.get("aircraft_id", "") 

237 if aircraft_id_raw: 

238 try: 

239 return redirect( 

240 url_for( 

241 "maintenance.list_triggers", aircraft_id=int(aircraft_id_raw) 

242 ) 

243 ) 

244 except (ValueError, TypeError): 

245 pass 

246 return redirect(url_for("index")) 

247 

248 if destination == "flight_photo": 

249 _cleanup_temp(tmp_dir) 

250 flash( 

251 _("File received — please attach it manually when logging the flight."), 

252 "info", 

253 ) 

254 return redirect(url_for("flights.log_flight")) 

255 

256 _cleanup_temp(tmp_dir) 

257 flash(_("Unknown destination."), "danger") 

258 return redirect(url_for("index")) 

259 

260 

261def _process_document( 

262 tmp_dir: str, files_meta: list[dict[str, str]], title: str 

263) -> ResponseReturnValue: 

264 tu = TenantUser.query.filter_by(user_id=session.get("user_id")).first() 

265 if not tu or tu.role not in _OWNER_ROLES: 

266 _cleanup_temp(tmp_dir) 

267 abort(403) 

268 

269 aircraft_id_raw = request.form.get("aircraft_id", "") 

270 try: 

271 aircraft_id = int(aircraft_id_raw) 

272 except (ValueError, TypeError): 

273 _cleanup_temp(tmp_dir) 

274 flash(_("Please select an aircraft."), "danger") 

275 return redirect(url_for("index")) 

276 

277 ac = Aircraft.query.filter_by(id=aircraft_id, tenant_id=tu.tenant_id).first() 

278 if not ac: 

279 _cleanup_temp(tmp_dir) 

280 abort(404) 

281 

282 tenant = db.session.get(Tenant, tu.tenant_id) 

283 assert tenant is not None # FK guarantees this 

284 

285 _raw_category = request.form.get("category") or "" 

286 category = next((c for c in DocCategory.ALL if c == _raw_category), None) 

287 is_sensitive = bool(request.form.get("is_sensitive")) 

288 valid_until_str = request.form.get("valid_until", "").strip() 

289 valid_until: _date | None = None 

290 if valid_until_str: 

291 with contextlib.suppress(ValueError): 

292 valid_until = _date.fromisoformat(valid_until_str) 

293 

294 doc_title = title.strip() or None 

295 upload_folder = current_app.config.get("UPLOAD_FOLDER", "/data/uploads") 

296 

297 for file_meta in files_meta: 

298 src_path = os.path.join(tmp_dir, file_meta["saved"]) 

299 original_name = file_meta["original"] 

300 mime = file_meta["mime"] 

301 ext = next((e for m, e in _MIME_TO_EXT.items() if m == mime), "") 

302 

303 if category: 

304 slug = _ensure_tenant_slug(tenant) 

305 safe_reg = ac.registration.replace("/", "-").replace(" ", "-").upper() 

306 today = _date.today().isoformat() 

307 safe_t = os.path.basename( 

308 _safe_path_component(doc_title or os.path.splitext(original_name)[0]) 

309 )[:100] 

310 fname = f"{today} - {safe_t}{ext}" 

311 rel_dir = os.path.join(slug, safe_reg, category) 

312 full_dir = os.path.join(upload_folder, rel_dir) 

313 os.makedirs(full_dir, exist_ok=True) 

314 stored = os.path.join(rel_dir, fname) 

315 dest_full = os.path.join(upload_folder, stored) 

316 if os.path.exists(dest_full): 

317 base, ext2 = os.path.splitext(fname) 

318 stored = os.path.join(rel_dir, f"{base}_{uuid.uuid4().hex[:6]}{ext2}") 

319 dest_full = os.path.join(upload_folder, stored) 

320 else: 

321 stored_name = f"doc_share_{uuid.uuid4().hex[:12]}{ext}" 

322 os.makedirs(upload_folder, exist_ok=True) 

323 stored = stored_name 

324 dest_full = os.path.join(upload_folder, stored) 

325 

326 shutil.copy2(src_path, dest_full) 

327 size = os.path.getsize(dest_full) 

328 

329 doc = Document( 

330 aircraft_id=ac.id, 

331 filename=stored, 

332 original_filename=original_name, 

333 mime_type=mime, 

334 size_bytes=size, 

335 title=doc_title, 

336 category=category, 

337 valid_until=valid_until, 

338 is_sensitive=is_sensitive, 

339 ) 

340 db.session.add(doc) 

341 

342 db.session.commit() 

343 _cleanup_temp(tmp_dir) 

344 flash(_("Document uploaded."), "success") 

345 return redirect(url_for("documents.list_documents", aircraft_id=ac.id))