Coverage for app/auth/routes.py: 100%

524 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1import logging 

2import os 

3import time 

4from datetime import datetime, timedelta, timezone 

5 

6import pyotp 

7import pw_hash as _pw 

8from extensions import _rate_limiting_disabled, cache as _cache, limiter as _limiter # pyright: ignore[reportMissingImports] 

9from flask import ( 

10 Blueprint, 

11 current_app, 

12 flash, 

13 redirect, 

14 render_template, 

15 request, 

16 session, 

17 url_for, 

18) 

19from flask.typing import ResponseReturnValue # pyright: ignore[reportMissingImports] 

20from markupsafe import Markup, escape 

21 

22from flask_babel import gettext as _ # pyright: ignore[reportMissingImports] 

23 

24from models import ( 

25 OperatingModel, 

26 PasswordResetToken, 

27 Role, 

28 Tenant, 

29 TenantProfile, 

30 TenantUser, 

31 User, 

32 db, 

33) 

34from utils import login_required 

35 

# Module-level logger; every security/audit event in this file goes through it.
_log = logging.getLogger("openhangar.auth")

37 

38 

def _sl(value: object) -> str:
    """Return ``str(value)`` with every CR and LF character removed.

    Used on values interpolated into log lines so that attacker-controlled
    input cannot start a forged log record (log injection, CWE-117).
    """
    text = str(value)
    return "".join(ch for ch in text if ch not in "\r\n")

42 

43 

# Argon2id dummy hash, computed once up front.  It is verified in place of a
# real hash when no user record matches, so the "unknown e-mail" path costs
# the same as a real verification (guards against timing-based account
# enumeration — CWE-208).
_DUMMY_HASH: str = _pw.DUMMY_HASH

# Blueprint under which every route in this module is registered.
auth_bp = Blueprint("auth", __name__)

# Operating models for which the aircraft-count wizard step is rendered with
# show_review=True.
_COMPLEX_MODELS = frozenset(("shared_ownership", "flight_club", "flight_school"))

51 

52# ── Login brute-force protection ────────────────────────────────────────────── 

53# Two independent layers: 

54# 1. IP backoff — progressive delay applied to any IP accumulating failures 

55# 2. Account lock — 30-minute cache-based lock after 10 consecutive failures 

56# on the same e-mail address (auto-unlocks, no admin needed) 

57 

_IP_FAIL_TTL = 15 * 60  # seconds; the per-IP counter expires after 15 quiet minutes
_ACCT_FAIL_TTL = 30 * 60  # seconds; the per-account counter expires after 30 minutes
_ACCT_LOCK_MINUTES = 30
_ACCT_LOCK_TTL = 60 * _ACCT_LOCK_MINUTES  # same duration, expressed in seconds
_ACCT_LOCK_THRESHOLD = 10  # failures on one e-mail address before it is locked

# Seconds to wait before answering a failed login, looked up by how many
# consecutive failures the originating IP address has accumulated.
_IP_BACKOFF: dict[int, int] = dict(((3, 2), (4, 10), (5, 30)))
_IP_BACKOFF_MAX = 60  # used once an IP has 6 or more failures

68 

69 

def _ip_backoff_delay(ip: str) -> int:
    """Return the delay in seconds to apply for *ip*'s current failure count.

    Fewer than three recorded failures means no delay; counts missing from
    ``_IP_BACKOFF`` fall back to ``_IP_BACKOFF_MAX``.
    """
    failures: int = _cache.get(f"login_fail_ip:{ip}") or 0
    return _IP_BACKOFF.get(failures, _IP_BACKOFF_MAX) if failures >= 3 else 0

75 

76 

def _increment_ip_failures(ip: str) -> int:
    """Record one more failed login for *ip* and return the new total.

    The counter is rewritten with a fresh ``_IP_FAIL_TTL`` timeout each time.
    """
    key = f"login_fail_ip:{ip}"
    updated: int = (_cache.get(key) or 0) + 1
    _cache.set(key, updated, timeout=_IP_FAIL_TTL)
    return updated

81 

82 

def _clear_ip_failures(ip: str) -> None:
    """Forget the failed-login counter kept for *ip*."""
    counter_key = f"login_fail_ip:{ip}"
    _cache.delete(counter_key)

85 

86 

def _check_account_locked(email: str) -> datetime | None:
    """Return when the lock on *email* expires, or None if it is not locked.

    A lock entry whose expiry has already passed is removed from the cache
    together with the account's failure counter.
    """
    lock_key = f"login_lock_acct:{email}"
    stored = _cache.get(lock_key)
    if stored:
        expiry = datetime.fromisoformat(stored)
        if expiry > datetime.now(timezone.utc):
            return expiry
        # The lock has run out: drop it and the stale failure counter.
        _cache.delete(lock_key)
        _cache.delete(f"login_fail_acct:{email}")
    return None

99 

100 

def _increment_account_failures(email: str) -> int:
    """Record one more failed login for *email* and return the new total.

    The counter is rewritten with a fresh ``_ACCT_FAIL_TTL`` timeout each time.
    """
    key = f"login_fail_acct:{email}"
    updated: int = (_cache.get(key) or 0) + 1
    _cache.set(key, updated, timeout=_ACCT_FAIL_TTL)
    return updated

105 

106 

def _lock_account(email: str, ip: str) -> None:
    """Lock *email* for ``_ACCT_LOCK_MINUTES`` and log the event.

    The expiry is stored in the cache as an ISO-8601 string, the account's
    failure counter is reset, and a ``[SECURITY]`` warning records the
    e-mail, the triggering *ip* and the expiry.
    """
    expiry = datetime.now(timezone.utc) + timedelta(minutes=_ACCT_LOCK_MINUTES)
    expiry_iso = expiry.isoformat()
    _cache.set(f"login_lock_acct:{email}", expiry_iso, timeout=_ACCT_LOCK_TTL)
    _cache.delete(f"login_fail_acct:{email}")
    _log.warning(
        "[SECURITY] auth.login.account_locked email=%s ip=%s locked_until=%s",
        _sl(email),
        _sl(ip),
        _sl(expiry_iso),
    )

121 

122 

def _clear_account_failures(email: str) -> None:
    """Remove both the failure counter and any lock entry kept for *email*."""
    for prefix in ("login_fail_acct", "login_lock_acct"):
        _cache.delete(f"{prefix}:{email}")

126 

127 

def _no_users() -> bool:
    """Return True when the User table contains no rows."""
    user_total = db.session.query(User).count()
    return user_total == 0

130 

131 

def _is_demo() -> bool:
    """Return True when the ``OPENHANGAR_ENV`` environment variable is ``demo``."""
    return os.getenv("OPENHANGAR_ENV") == "demo"

134 

135 

136# ── /login ──────────────────────────────────────────────────────────────────── 

137 

138 

@auth_bp.route("/login", methods=["GET", "POST"])
@_limiter.limit(
    lambda: current_app.config.get("LOGIN_RATE_LIMIT", "20 per minute"),
    methods=["POST"],
    exempt_when=_rate_limiting_disabled,
)
def login() -> ResponseReturnValue:
    """Serve the login page (GET) or hand a submission to ``_login_post`` (POST).

    Redirects to the setup wizard while no user exists, and to the index
    when the session already carries a ``user_id``.
    """
    if _no_users():
        return redirect(url_for("auth.setup"))
    if session.get("user_id"):
        return redirect(url_for("index"))
    if request.method == "POST":
        return _login_post()

    requested_step = request.args.get("step", "credentials")

    # Each TOTP step may only be shown once the credentials step has left
    # its marker in the session; otherwise restart at the credentials form.
    required_marker = {
        "totp": "login_pending_user_id",
        "totp-enrol": "totp_must_enrol",
    }.get(requested_step)
    if required_marker is not None and not session.get(required_marker):
        return redirect(url_for("auth.login"))

    return render_template(
        "auth/login.html",
        step=requested_step,
        totp_secret=session.get("enrol_totp_secret"),
        totp_uri=session.get("enrol_totp_uri"),
    )

167 

168 

def _login_post() -> ResponseReturnValue:
    """Dispatch a login POST on its ``step`` form field.

    ``totp`` and ``totp-enrol`` go to their dedicated handlers; any other
    value (or none) is treated as the credentials step.
    """
    step_handlers = {"totp": _login_totp, "totp-enrol": _login_totp_enrol}
    handler = step_handlers.get(request.form.get("step"), _login_credentials)
    return handler()

176 

177 

def _login_credentials() -> ResponseReturnValue:
    """Handle the e-mail/password step of the login form.

    Flow:
      1. Refuse immediately while the account is locked.
      2. Verify the password — against a dummy hash when no active user
         matches, so both paths take comparable time.
      3. On failure, update the per-IP and per-account counters, possibly
         lock the account, sleep for the IP backoff delay and re-render.
      4. On success, clear the counters, upgrade a legacy hash, reject users
         with no active tenant (instance admins excepted), then either move
         on to the TOTP step, start forced TOTP enrolment, or log the user in.

    Returns the rendered credentials page or a redirect.
    """
    email = request.form.get("email", "").strip().lower()
    password = request.form.get("password", "")
    ip = _sl(request.remote_addr or "")

    # ── Account lockout check (before any verification) ──────────────────────
    locked_until = _check_account_locked(email)
    if locked_until:
        _log.warning(
            "[SECURITY] auth.login.account_blocked email=%s ip=%s locked_until=%s",
            _sl(email),
            ip,
            _sl(locked_until.isoformat()),
        )
        flash(
            _(
                "Account temporarily locked due to too many failed attempts."
                " Try again in %(minutes)s minutes.",
                minutes=_ACCT_LOCK_MINUTES,
            ),
            "danger",
        )
        return render_template("auth/login.html", step="credentials")

    user = User.query.filter_by(email=email, is_active=True).first()
    password_hash = user.password_hash if user else _DUMMY_HASH
    # Always verify — never short-circuit on user is None.
    # Without this, a missing-user response is faster, enabling timing-based
    # account enumeration (CWE-208).  pw_hash.verify handles both Argon2id and
    # legacy bcrypt hashes transparently.
    password_ok = _pw.verify(password, password_hash)

    if not user or not password_ok:
        _log.warning(
            "[SECURITY] auth.credentials.failed email=%s ip=%s",
            _sl(email),
            ip,
        )

        # ── IP backoff ────────────────────────────────────────────────────────
        ip_count = _increment_ip_failures(ip)
        delay = _ip_backoff_delay(ip)
        if delay:
            _log.warning(
                "[SECURITY] auth.login.backoff ip=%s failures=%s delay=%ss",
                ip,
                ip_count,
                delay,
            )

        # ── Account failure tracking ──────────────────────────────────────────
        acct_count = _increment_account_failures(email)
        if acct_count >= _ACCT_LOCK_THRESHOLD:
            _lock_account(email, ip)

        if delay:
            time.sleep(delay)

        flash(_("Invalid email or password."), "danger")
        return render_template("auth/login.html", step="credentials")

    # Credentials verified — clear failure counters for this IP and account.
    _clear_ip_failures(ip)
    _clear_account_failures(email)

    # Upgrade legacy bcrypt hashes to Argon2id transparently at login time.
    if _pw.needs_rehash(user.password_hash):
        user.password_hash = _pw.hash(password)
        db.session.commit()

    # Block users whose only tenant(s) have been deactivated, unless they are
    # the instance admin (who must always be able to log in).
    if not user.is_instance_admin:
        active_tenant_ids = {
            tu.tenant_id for tu in user.tenants if tu.tenant and tu.tenant.is_active
        }
        if not active_tenant_ids:
            _log.warning(
                "[SECURITY] auth.credentials.deactivated email=%s ip=%s",
                _sl(email),
                ip,  # same sanitised value as every other log line here
            )
            flash(
                _("Your account has been deactivated. Contact the administrator."),
                "danger",
            )
            return render_template("auth/login.html", step="credentials")

    if user.totp_secret:
        # Drop enrolment state left behind by an earlier, abandoned
        # forced-enrolment attempt in this browser session.  If it survived,
        # a "totp-enrol" POST could pair that stale secret with this user's
        # pending id and replace the user's real authenticator.
        for stale_key in ("totp_must_enrol", "enrol_totp_secret", "enrol_totp_uri"):
            session.pop(stale_key, None)
        session["login_pending_user_id"] = user.id
        return redirect(url_for("auth.login", step="totp"))

    # If the tenant mandates TOTP and the user has none, redirect to enrolment.
    tu = TenantUser.query.filter_by(user_id=user.id).first()
    if tu and tu.tenant and tu.tenant.require_totp:
        totp_secret = pyotp.random_base32()
        totp_uri = pyotp.TOTP(totp_secret).provisioning_uri(
            name=user.email, issuer_name="OpenHangar"
        )
        session["login_pending_user_id"] = user.id
        session["totp_must_enrol"] = True
        session["enrol_totp_secret"] = totp_secret
        session["enrol_totp_uri"] = totp_uri
        return redirect(url_for("auth.login", step="totp-enrol"))

    # No second factor involved: start a fresh authenticated session.
    session.clear()
    session["user_id"] = user.id
    session.permanent = True
    return redirect(url_for("index"))

287 

288 

def _login_totp() -> ResponseReturnValue:
    """Handle the authenticator-code step of the login form.

    Requires ``login_pending_user_id`` in the session (set by the
    credentials step).  A code that was already accepted for this user
    within the last 90 seconds is rejected as a replay.  On success the
    session is reset and the user is logged in.

    Returns the rendered TOTP page on a bad code, otherwise a redirect.
    """
    pending_id = session.get("login_pending_user_id")
    if not pending_id:
        return redirect(url_for("auth.login"))

    user = db.session.get(User, pending_id)
    if not user:
        session.pop("login_pending_user_id", None)
        return redirect(url_for("auth.login"))

    if not user.totp_secret:
        # The pending user has no authenticator (e.g. they are in the forced
        # enrolment flow).  Without this guard str(None) == "None" would be
        # handed to pyotp as if it were the shared secret, and a code derived
        # from that text would complete the login without any enrolment.
        _log.warning(
            "[SECURITY] auth.totp.failed user_id=%s ip=%s",
            _sl(pending_id),
            _sl(request.remote_addr),
        )
        return redirect(url_for("auth.login"))

    totp_code = request.form.get("totp_code", "").strip()

    # Replay protection: a code is single-use per user while its cache entry lives.
    _totp_cache_key = f"totp_used:{user.id}:{totp_code}"
    if _cache.get(_totp_cache_key):
        _log.warning(
            "[SECURITY] auth.totp.replay user_id=%s ip=%s",
            _sl(pending_id),
            _sl(request.remote_addr),
        )
        flash(_("Invalid authenticator code."), "danger")
        return render_template("auth/login.html", step="totp")

    if not pyotp.TOTP(str(user.totp_secret)).verify(totp_code, valid_window=1):
        _log.warning(
            "[SECURITY] auth.totp.failed user_id=%s ip=%s",
            _sl(pending_id),
            _sl(request.remote_addr),
        )
        flash(_("Invalid authenticator code."), "danger")
        return render_template("auth/login.html", step="totp")

    _cache.set(_totp_cache_key, True, timeout=90)

    session.clear()
    session["user_id"] = user.id
    session.permanent = True
    return redirect(url_for("index"))

326 

327 

def _login_totp_enrol() -> ResponseReturnValue:
    """Mandatory TOTP enrolment during login when tenant.require_totp is True.

    Requires the three session markers the credentials step sets together
    when it starts forced enrolment: ``login_pending_user_id``,
    ``enrol_totp_secret`` and ``totp_must_enrol``.  Enrolment is refused for
    a user who already has a TOTP secret; that user is sent to the normal
    TOTP step instead.  On a valid code the secret is stored on the user,
    the session is reset and the user is logged in.

    Returns the rendered enrolment page on a bad code, otherwise a redirect.
    """
    pending_id = session.get("login_pending_user_id")
    totp_secret = session.get("enrol_totp_secret")
    totp_uri = session.get("enrol_totp_uri")
    if not pending_id or not totp_secret or not session.get("totp_must_enrol"):
        return redirect(url_for("auth.login"))

    user = db.session.get(User, pending_id)
    if not user:
        session.clear()
        return redirect(url_for("auth.login"))

    if user.totp_secret:
        # Never overwrite an existing authenticator from the login flow: the
        # enrolment secret in the session may be left over from a different
        # account's abandoned enrolment, and accepting it here would bypass
        # this user's second factor.
        for stale_key in ("totp_must_enrol", "enrol_totp_secret", "enrol_totp_uri"):
            session.pop(stale_key, None)
        _log.warning(
            "[SECURITY] auth.totp.enrolment_rejected user_id=%s ip=%s",
            _sl(str(user.id)),
            _sl(request.remote_addr),
        )
        return redirect(url_for("auth.login", step="totp"))

    totp_code = request.form.get("totp_code", "").strip()
    if not pyotp.TOTP(totp_secret).verify(totp_code, valid_window=1):
        flash(_("Invalid code — please try again."), "danger")
        return render_template(
            "auth/login.html",
            step="totp-enrol",
            totp_secret=totp_secret,
            totp_uri=totp_uri,
        )

    user.totp_secret = totp_secret
    db.session.commit()
    _log.warning(
        "[SECURITY] auth.totp.enrolment_forced user_id=%s ip=%s",
        _sl(str(user.id)),
        _sl(request.remote_addr),
    )

    session.clear()
    session["user_id"] = user.id
    session.permanent = True
    flash(_("Two-factor authentication is now active on your account."), "success")
    return redirect(url_for("index"))

364 

365 

366# ── /logout ─────────────────────────────────────────────────────────────────── 

367 

368 

@auth_bp.route("/logout")
def logout() -> ResponseReturnValue:
    """Empty the session and redirect to the index page.

    A ``demo_slot_id`` present before the wipe is written back afterwards so
    the visitor can re-enter the same sandbox.
    """
    preserved_slot = session.get("demo_slot_id")
    session.clear()
    if preserved_slot is not None:
        session["demo_slot_id"] = preserved_slot
    return redirect(url_for("index"))

377 

378 

379# ── /setup ──────────────────────────────────────────────────────────────────── 

380 

# Every step name the /setup route accepts, in wizard order.
_WIZARD_STEPS = [
    "account", "totp",
    "operating_model",
    "aircraft_count", "org_name", "co_owners",
    "summary",
]

# Operating models that may be chosen on the "operating_model" step.
_OPERATING_MODELS = {
    OperatingModel.SOLE_PILOT, OperatingModel.SOLE_OPERATOR,
    OperatingModel.SHARED_OWNERSHIP,
    OperatingModel.FLIGHT_CLUB, OperatingModel.FLIGHT_SCHOOL,
}

398 

399 

def _wizard_phase(step: str) -> int:
    """Return the 1-based progress-indicator phase that *step* belongs to.

    Any step not listed below (in practice ``summary``) is phase 4.
    """
    phase_by_step = {
        "account": 1,
        "totp": 1,
        "operating_model": 2,
        "aircraft_count": 3,
        "org_name": 3,
        "co_owners": 3,
    }
    return phase_by_step.get(step, 4)

409 

410 

def _next_step(current: str) -> str:
    """Return the wizard step that follows *current*.

    The answer depends on the operating model stored in the session under
    ``setup_operating_model``; unknown steps lead to ``summary``.
    """
    chosen_model = session.get("setup_operating_model", "")

    if current == "aircraft_count":
        if chosen_model in ("flight_club", "flight_school"):
            return "org_name"
        return "co_owners" if chosen_model == "shared_ownership" else "summary"
    if current == "account":  # pragma: no cover
        return "totp"
    if current == "totp":  # pragma: no cover
        return "operating_model"
    if current == "operating_model" and chosen_model != "sole_pilot":  # pragma: no cover
        return "aircraft_count"
    # operating_model for a sole pilot, org_name, co_owners and anything else
    return "summary"  # pragma: no cover

430 

431 

@auth_bp.route("/setup", methods=["GET", "POST"])
@_limiter.limit("10 per minute", methods=["POST"], exempt_when=_rate_limiting_disabled)
def setup() -> ResponseReturnValue:
    """First-run setup wizard: route each step to its handler or template.

    Unavailable in demo mode and once any user exists.  The step comes from
    the ``step`` form field (POST) or query parameter (GET); POSTs are
    forwarded to the per-step handler, GETs render the step after checking
    that the session holds what the step depends on.
    """
    if _is_demo():
        flash(_("Account creation is disabled in demo mode."), "warning")
        return redirect(url_for("index"))

    if not _no_users():
        return redirect(url_for("config.index"))

    # Form data wins over the query string when both carry a step.
    step = request.form.get("step") or request.args.get("step", "account")
    if step not in _WIZARD_STEPS:
        return redirect(url_for("auth.setup"))

    if request.method == "POST":
        post_handlers = {
            "account": _setup_account,
            "totp": _setup_totp,
            "operating_model": _setup_operating_model,
            "aircraft_count": _setup_aircraft_count,
            "org_name": _setup_org_name,
            "co_owners": _setup_co_owners,
            "summary": _setup_finish,
        }
        return post_handlers[step]()

    # ── GET: validate session state, then render the requested step ──────────
    phase = _wizard_phase(step)

    def render_step(**context: object) -> ResponseReturnValue:
        """Render the wizard template for the current step and phase."""
        return render_template("auth/setup.html", step=step, phase=phase, **context)

    def back_to(target: str | None = None) -> ResponseReturnValue:
        """Redirect to the wizard start, or to *target* when given."""
        if target is None:
            return redirect(url_for("auth.setup"))
        return redirect(url_for("auth.setup", step=target))

    if step == "totp":
        if not session.get("setup_totp_secret"):
            return back_to()
        return render_step(
            show_review=False,
            totp_secret=session["setup_totp_secret"],
            provisioning_uri=session["setup_provisioning_uri"],
        )

    if step == "operating_model":
        if not session.get("setup_totp_done"):
            return back_to()
        return render_step(show_review=False)

    if step == "aircraft_count":
        chosen = session.get("setup_operating_model")
        if not chosen:
            return back_to("operating_model")
        return render_step(
            show_review=chosen in _COMPLEX_MODELS,
            operating_model=chosen,
        )

    if step == "org_name":
        chosen = session.get("setup_operating_model", "")
        if chosen not in ("flight_club", "flight_school"):
            return back_to("summary")
        return render_step(show_review=True, operating_model=chosen)

    if step == "co_owners":
        if session.get("setup_operating_model") != "shared_ownership":
            return back_to("summary")
        return render_step(show_review=True)

    if step == "summary":
        chosen = session.get("setup_operating_model")
        # No model yet, or one of the models that is sent back from the
        # summary page: return to the operating-model step.
        if not chosen or chosen in ("sole_pilot", "sole_operator"):
            return back_to("operating_model")
        return render_step(
            show_review=True,
            operating_model=chosen,
            aircraft_count=session.get("setup_aircraft_count"),
            allows_rental=session.get("setup_allows_rental", False),
            org_name=session.get("setup_org_name", ""),
            co_owners=session.get("setup_co_owners", []),
            setup_name=session.get("setup_name", ""),
            setup_email=session.get("setup_email", ""),
        )

    # Only "account" is left.
    return render_template(
        "auth/setup.html", step="account", phase=1, show_review=False
    )

537 

538 

def _setup_account() -> ResponseReturnValue:
    """Validate the account step and stash its data in the session.

    Requires an e-mail containing ``@`` and a password of at least 12
    characters; each problem is flashed and the step re-rendered.  On
    success the e-mail, name, password hash and a freshly generated TOTP
    secret with its provisioning URI are stored, then the wizard moves to
    the TOTP step.
    """
    form = request.form
    email = form.get("email", "").strip().lower()
    password = form.get("password", "")
    name = form.get("name", "").strip()

    problems = []
    if "@" not in email:  # also true for an empty address
        problems.append(_("A valid email address is required."))
    if len(password) < 12:
        problems.append(_("Password must be at least 12 characters."))

    if problems:
        for text in problems:
            flash(text, "danger")
        return render_template(
            "auth/setup.html", step="account", phase=1, show_review=False
        )

    secret = pyotp.random_base32()
    uri = pyotp.TOTP(secret).provisioning_uri(name=email, issuer_name="OpenHangar")

    session.update(
        {
            "setup_email": email,
            "setup_name": name or None,
            "setup_password_hash": _pw.hash(password),
            "setup_totp_secret": secret,
            "setup_provisioning_uri": uri,
        }
    )
    return redirect(url_for("auth.setup", step="totp"))

569 

570 

def _setup_totp() -> ResponseReturnValue:
    """Handle the wizard's TOTP step: confirm a code, or skip 2FA.

    Needs the account-step data in the session.  With ``action=skip`` no
    secret is kept for the new user; otherwise the submitted code must
    verify against the pending secret, which is then marked for saving.
    """
    email = session.get("setup_email")
    password_hash = session.get("setup_password_hash")
    totp_secret = session.get("setup_totp_secret")
    provisioning_uri = session.get("setup_provisioning_uri")

    if not (email and password_hash and totp_secret):
        flash(_("Session expired. Please start over."), "danger")
        return redirect(url_for("auth.setup"))

    if request.form.get("action") == "skip":
        # "Skip" path — the user will not have TOTP.
        secret_to_save = None
    else:
        submitted = request.form.get("totp_code", "").strip()
        code_ok = pyotp.TOTP(str(totp_secret)).verify(submitted, valid_window=1)
        if not code_ok:
            flash(_("Invalid code. Please try again."), "danger")
            return render_template(
                "auth/setup.html",
                step="totp",
                phase=1,
                show_review=False,
                totp_secret=totp_secret,
                provisioning_uri=provisioning_uri,
            )
        secret_to_save = totp_secret

    session["setup_totp_to_save"] = secret_to_save
    session["setup_totp_done"] = True
    return redirect(url_for("auth.setup", step="operating_model"))

600 

601 

def _setup_operating_model() -> ResponseReturnValue:
    """Store the chosen operating model and move the wizard on.

    Only reachable after the TOTP step.  ``sole_pilot`` finishes setup
    immediately; every other model continues with the aircraft count.
    """
    if not session.get("setup_totp_done"):
        return redirect(url_for("auth.setup"))

    selected = request.form.get("operating_model", "")
    if not any(selected == option.value for option in _OPERATING_MODELS):
        flash(_("Please select an option."), "danger")
        return render_template(
            "auth/setup.html", step="operating_model", phase=2, show_review=False
        )

    session["setup_operating_model"] = selected
    if selected != "sole_pilot":
        return redirect(url_for("auth.setup", step="aircraft_count"))
    return _setup_finish()

618 

619 

def _setup_aircraft_count() -> ResponseReturnValue:
    """Store the planned aircraft count and the rental flag.

    The count must parse as a non-negative integer.  The rental flag is
    true when an ``allows_rental`` field is present in the form.  Depending
    on ``_next_step`` the wizard either finishes or moves to the next step.
    """
    chosen_model = session.get("setup_operating_model")
    if not chosen_model:
        return redirect(url_for("auth.setup", step="operating_model"))

    raw_count = request.form.get("aircraft_count", "").strip()
    try:
        count = int(raw_count)
    except (ValueError, TypeError):
        count = -1  # treated like any other invalid (negative) value below
    if count < 0:
        flash(_("Please enter a valid number of aircraft (0 or more)."), "danger")
        return render_template(
            "auth/setup.html",
            step="aircraft_count",
            phase=3,
            show_review=session.get("setup_operating_model") in _COMPLEX_MODELS,
            operating_model=session.get("setup_operating_model"),
        )

    session["setup_aircraft_count"] = count
    session["setup_allows_rental"] = "allows_rental" in request.form

    following = _next_step("aircraft_count")
    if following != "summary":
        return redirect(url_for("auth.setup", step=following))
    return _setup_finish()

647 

648 

def _setup_org_name() -> ResponseReturnValue:
    """Store the club/school name (flight_club and flight_school models only).

    Other operating models are redirected to the summary; a blank name is
    flashed as an error and the step re-rendered.
    """
    chosen_model = session.get("setup_operating_model", "")
    if chosen_model not in ("flight_club", "flight_school"):
        return redirect(url_for("auth.setup", step="summary"))

    entered = request.form.get("org_name", "").strip()
    if entered:
        session["setup_org_name"] = entered
        return redirect(url_for("auth.setup", step="summary"))

    flash(_("Please enter a name."), "danger")
    return render_template(
        "auth/setup.html",
        step="org_name",
        phase=3,
        show_review=True,
        operating_model=chosen_model,
    )

667 

668 

def _setup_co_owners() -> ResponseReturnValue:
    """Collect the co-owner rows (shared_ownership model only).

    The parallel ``co_owner_name`` / ``co_owner_email`` / ``co_owner_role``
    form lists are zipped together; rows with neither a name nor an e-mail
    are dropped, and any role other than ``owner`` or ``admin`` becomes
    ``owner``.  The result is stored in the session for the finish step.
    """
    if session.get("setup_operating_model") != "shared_ownership":
        return redirect(url_for("auth.setup", step="summary"))

    form = request.form
    rows = zip(
        form.getlist("co_owner_name"),
        form.getlist("co_owner_email"),
        form.getlist("co_owner_role"),
    )

    collected = []
    for raw_name, raw_email, raw_role in rows:
        owner_name = raw_name.strip() or None
        owner_email = raw_email.strip().lower() or None
        if owner_name is None and owner_email is None:
            continue  # completely empty row
        collected.append(
            {
                "name": owner_name,
                "email": owner_email,
                "role": raw_role if raw_role in ("owner", "admin") else "owner",
            }
        )

    session["setup_co_owners"] = collected
    return redirect(url_for("auth.setup", step="summary"))

689 

690 

def _setup_finish() -> ResponseReturnValue:
    """Create the first tenant, its admin user and profile from wizard data.

    Needs the e-mail, password hash, operating model and the TOTP-done flag
    in the session.  Creates the tenant (named after the organisation for
    clubs and schools, otherwise "My Hangar"), the instance-admin user, the
    tenant membership, the tenant profile and one 7-day invitation per
    co-owner, commits, logs the new user in and flashes a welcome message
    with next-step links.
    """
    needed_keys = (
        "setup_email",
        "setup_password_hash",
        "setup_operating_model",
        "setup_totp_done",
    )
    if not all(session.get(key) for key in needed_keys):
        flash(_("Session expired. Please start over."), "danger")
        return redirect(url_for("auth.setup"))

    from models import UserInvitation

    model_name = session.get("setup_operating_model", "")
    wizard_aircraft_count = session.get("setup_aircraft_count")
    rental_allowed = bool(session.get("setup_allows_rental", False))
    organisation = session.get("setup_org_name", "")
    invited_co_owners = session.get("setup_co_owners", [])

    # Clubs and schools lend their name to the tenant; everyone else gets
    # the default.
    named_org = model_name in ("flight_club", "flight_school") and organisation
    tenant = Tenant(name=organisation if named_org else "My Hangar")
    db.session.add(tenant)
    db.session.flush()

    # Auto-generate the Hangar ID from the tenant name so canonical document
    # paths work immediately, without requiring a trip to Settings first.
    from documents.routes import _ensure_tenant_slug  # pyright: ignore[reportMissingImports]

    _ensure_tenant_slug(tenant)

    user = User(
        email=session["setup_email"],
        password_hash=session["setup_password_hash"],
        totp_secret=session.get("setup_totp_to_save"),
        name=session.get("setup_name"),
        is_active=True,
        is_instance_admin=True,
    )
    db.session.add(user)
    db.session.flush()

    membership = TenantUser(user_id=user.id, tenant_id=tenant.id, role=Role.ADMIN)
    db.session.add(membership)

    # Translate the wizard answers into profile values.
    try:
        op_model: OperatingModel | None = OperatingModel(model_name)
    except ValueError:
        op_model = None
    planned_count: int | None
    if model_name == "sole_pilot":
        planned_count = 0
    else:
        planned_count = wizard_aircraft_count

    db.session.add(
        TenantProfile(
            tenant_id=tenant.id,
            operating_model=op_model,
            planned_aircraft_count=planned_count,
            allows_rental=rental_allowed,
            club_name=organisation if model_name == "flight_club" else None,
            school_name=organisation if model_name == "flight_school" else None,
            setup_complete=True,
        )
    )

    # One invitation per co-owner (shared_ownership path).
    for entry in invited_co_owners:
        db.session.add(
            UserInvitation(
                tenant_id=tenant.id,
                invited_by_user_id=user.id,
                email=entry.get("email") or None,
                display_name=entry.get("name") or None,
                role=Role.ADMIN if entry.get("role") == "admin" else Role.OWNER,
                expires_at=datetime.now(timezone.utc) + timedelta(days=7),
            )
        )

    db.session.commit()

    aircraft_url = url_for("aircraft.new_aircraft")
    flight_url = url_for("flights.log_flight")
    welcome = escape(_("Setup complete. Welcome to OpenHangar!"))
    aircraft_label = _("Add your first aircraft")
    flight_label = _("Register your first flight")
    aircraft_link = Markup(
        f'<a href="{aircraft_url}" class="alert-link">{escape(aircraft_label)}</a>'
    )
    flight_link = Markup(
        f'<a href="{flight_url}" class="alert-link">{escape(flight_label)}</a>'
    )

    # Sole pilots only log flights; sole operators and shared ownership get
    # both links; every other model gets the aircraft link alone.
    if model_name == "sole_pilot":
        msg = Markup(f"{welcome} {flight_link}")
    elif model_name in ("sole_operator", "shared_ownership"):
        msg = Markup(f"{welcome} {aircraft_link} · {flight_link}")
    else:
        msg = Markup(f"{welcome} {aircraft_link}")

    _clear_setup_session()
    session["user_id"] = user.id
    session.permanent = True
    flash(msg, "success")
    return redirect(url_for("index"))

800 

801 

def _clear_setup_session() -> None:
    """Remove every ``setup_*`` key the wizard may have put in the session."""
    wizard_keys = (
        # account step
        "setup_email",
        "setup_name",
        "setup_password_hash",
        # TOTP step
        "setup_totp_secret",
        "setup_provisioning_uri",
        "setup_totp_to_save",
        "setup_totp_done",
        # organisation steps
        "setup_operating_model",
        "setup_aircraft_count",
        "setup_allows_rental",
        "setup_org_name",
        "setup_co_owners",
    )
    for wizard_key in wizard_keys:
        session.pop(wizard_key, None)

818 

819 

820# ── /profile ────────────────────────────────────────────────────────────────── 

821 

822 

@auth_bp.route("/profile", methods=["GET", "POST"])
@login_required
def profile() -> ResponseReturnValue:
    """Show the profile page, or run the action named in a POST.

    A POST whose ``action`` is not recognised falls through to the plain
    page render.  Rendering consumes any pending TOTP secret/URI stored in
    the session.
    """
    user = db.session.get(User, session["user_id"])
    if not user:
        return redirect(url_for("auth.logout"))

    if request.method == "POST":
        action_handlers = {
            "update_name": _profile_update_name,
            "change_password": _profile_change_password,
            "setup_totp": _profile_setup_totp,
            "confirm_totp": _profile_confirm_totp,
            "disable_totp": _profile_disable_totp,
        }
        handler = action_handlers.get(request.form.get("action"))
        if handler is not None:
            return handler(user)

    pending_secret = session.pop("profile_totp_secret", None)
    pending_uri = session.pop("profile_totp_uri", None)
    return render_template(
        "auth/profile.html",
        user=user,
        totp_secret=pending_secret,
        totp_uri=pending_uri,
    )

848 

849 

def _profile_update_name(user: User) -> ResponseReturnValue:
    """Save the submitted display name; a blank value clears it to None."""
    user.name = request.form.get("name", "").strip() or None
    db.session.commit()
    flash(_("Display name updated."), "success")
    return redirect(url_for("auth.profile"))

856 

857 

def _profile_change_password(user: User) -> ResponseReturnValue:
    """Change the user's password after checking the current one.

    The new password must be at least 12 characters and match its
    confirmation.  The first failing check is flashed and the profile page
    re-rendered; success is logged as a ``[SECURITY]`` event.
    """
    form = request.form
    current_pw = form.get("current_password", "")
    new_pw = form.get("new_password", "")
    confirm_pw = form.get("confirm_password", "")

    if not _pw.verify(current_pw, user.password_hash):
        problem = _("Current password is incorrect.")
    elif len(new_pw) < 12:
        problem = _("Password must be at least 12 characters.")
    elif new_pw != confirm_pw:
        problem = _("Passwords do not match.")
    else:
        problem = None

    if problem is not None:
        flash(problem, "danger")
        return render_template("auth/profile.html", user=user, totp_secret=None)

    user.password_hash = _pw.hash(new_pw)
    db.session.commit()
    _log.warning(
        "[SECURITY] auth.password.changed user_id=%s ip=%s",
        _sl(str(user.id)),
        _sl(request.remote_addr),
    )
    flash(_("Password updated successfully."), "success")
    return redirect(url_for("auth.profile"))

882 

883 

def _profile_setup_totp(user: User) -> ResponseReturnValue:
    """Start TOTP enrolment from the profile page.

    Generates a new secret and provisioning URI, keeps both in the session
    for the confirmation step, and renders the profile page with them.
    """
    new_secret = pyotp.random_base32()
    new_uri = pyotp.TOTP(new_secret).provisioning_uri(
        name=user.email, issuer_name="OpenHangar"
    )
    session["profile_totp_secret"] = new_secret
    session["profile_totp_uri"] = new_uri
    return render_template(
        "auth/profile.html", user=user, totp_secret=new_secret, totp_uri=new_uri
    )

894 

895 

def _profile_confirm_totp(user: User) -> ResponseReturnValue:
    """Finish TOTP enrolment from the profile page.

    Verifies the submitted code against the secret pending in the session,
    stores that secret on the user, clears the pending session entries and
    logs the change as a ``[SECURITY]`` event.
    """
    pending_secret = session.get("profile_totp_secret")
    pending_uri = session.get("profile_totp_uri")
    if not pending_secret:
        flash(_("Session expired. Please try again."), "danger")
        return redirect(url_for("auth.profile"))

    submitted = request.form.get("totp_code", "").strip()
    code_ok = pyotp.TOTP(pending_secret).verify(submitted, valid_window=1)
    if not code_ok:
        flash(_("Invalid code. Please try again."), "danger")
        return render_template(
            "auth/profile.html",
            user=user,
            totp_secret=pending_secret,
            totp_uri=pending_uri,
        )

    user.totp_secret = pending_secret
    db.session.commit()
    for pending_key in ("profile_totp_secret", "profile_totp_uri"):
        session.pop(pending_key, None)
    _log.warning(
        "[SECURITY] auth.totp.enabled user_id=%s ip=%s",
        _sl(str(user.id)),
        _sl(request.remote_addr),
    )
    flash(_("Two-factor authentication enabled."), "success")
    return redirect(url_for("auth.profile"))

921 

922 

def _profile_disable_totp(user: User) -> ResponseReturnValue:
    """Remove the user's TOTP secret after re-checking their password.

    Refused when the tenant found through the user's first TenantUser row
    has ``require_totp`` set.  Success is logged as a ``[SECURITY]`` event.
    """
    membership = TenantUser.query.filter_by(user_id=user.id).first()
    totp_mandatory = (
        membership and membership.tenant and membership.tenant.require_totp
    )
    if totp_mandatory:
        flash(
            _(
                "Your administrator requires two-factor authentication."
                " It cannot be disabled on this account."
            ),
            "danger",
        )
        return redirect(url_for("auth.profile"))

    supplied_pw = request.form.get("current_password", "")
    if not _pw.verify(supplied_pw, user.password_hash):
        flash(_("Current password is incorrect."), "danger")
        return redirect(url_for("auth.profile"))

    user.totp_secret = None
    db.session.commit()
    _log.warning(
        "[SECURITY] auth.totp.disabled user_id=%s ip=%s",
        _sl(str(user.id)),
        _sl(request.remote_addr),
    )
    flash(_("Two-factor authentication disabled."), "success")
    return redirect(url_for("auth.profile"))

947 

948 

949# ── /reset-password/<token> ─────────────────────────────────────────────────── 

950 

951 

@auth_bp.route("/reset-password/<token>", methods=["GET", "POST"])
def reset_password(token: str) -> ResponseReturnValue:
    """Consume a PasswordResetToken generated by the instance admin.

    Responds 404 for an unknown token and redirects to the login page for a
    used or expired one.  GET shows the reset form.  A valid POST (password
    of at least 12 characters that matches its confirmation) stores the new
    password hash, clears the user's TOTP secret, marks the token as used
    and records a ``[SECURITY]`` audit log entry.
    """
    prt = PasswordResetToken.query.filter_by(token=token).first_or_404()

    if prt.is_used:
        flash(_("This reset link has already been used."), "danger")
        return redirect(url_for("auth.login"))

    if prt.is_expired:
        flash(
            _("This reset link has expired. Ask the administrator for a new one."),
            "danger",
        )
        return redirect(url_for("auth.login"))

    if request.method == "POST":
        new_pw = request.form.get("new_password", "")
        confirm_pw = request.form.get("confirm_password", "")

        if len(new_pw) < 12:
            flash(_("Password must be at least 12 characters."), "danger")
            return render_template("auth/reset_password.html", token=token)
        if new_pw != confirm_pw:
            flash(_("Passwords do not match."), "danger")
            return render_template("auth/reset_password.html", token=token)

        user = db.session.get(User, prt.user_id)
        if not user:  # pragma: no cover — token CASCADE-deletes with user
            flash(_("User not found."), "danger")
            return redirect(url_for("auth.login"))

        user.password_hash = _pw.hash(new_pw)
        user.totp_secret = None  # clear TOTP so the user can re-enrol
        prt.used_at = datetime.now(timezone.utc)
        db.session.commit()

        # A reset replaces the password AND removes the second factor, so it
        # is audited like the other credential-changing actions in this file.
        _log.warning(
            "[SECURITY] auth.password.reset user_id=%s ip=%s",
            _sl(str(user.id)),
            _sl(request.remote_addr),
        )

        flash(_("Password reset successfully. You can now log in."), "success")
        return redirect(url_for("auth.login"))

    return render_template("auth/reset_password.html", token=token)