Coverage for app/auth/routes.py: 100%
524 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1import logging
2import os
3import time
4from datetime import datetime, timedelta, timezone
6import pyotp
7import pw_hash as _pw
8from extensions import _rate_limiting_disabled, cache as _cache, limiter as _limiter # pyright: ignore[reportMissingImports]
9from flask import (
10 Blueprint,
11 current_app,
12 flash,
13 redirect,
14 render_template,
15 request,
16 session,
17 url_for,
18)
19from flask.typing import ResponseReturnValue # pyright: ignore[reportMissingImports]
20from markupsafe import Markup, escape
22from flask_babel import gettext as _ # pyright: ignore[reportMissingImports]
24from models import (
25 OperatingModel,
26 PasswordResetToken,
27 Role,
28 Tenant,
29 TenantProfile,
30 TenantUser,
31 User,
32 db,
33)
34from utils import login_required
# Module-level logger used for every log record emitted from this file.
_log = logging.getLogger("openhangar.auth")
# Every code point that str.splitlines() treats as a line boundary.  Mapping
# them to None makes str.translate() delete them in a single pass.
_SL_LINE_BREAKS = dict.fromkeys(
    map(ord, "\n\r\v\f\x1c\x1d\x1e\x85\u2028\u2029")
)


def _sl(value: object) -> str:
    """Sanitize a value for log output to prevent log injection (CWE-117).

    The value is converted with ``str()`` and all line-boundary characters
    are removed: CR and LF, plus the other separators that
    ``str.splitlines()`` honours (VT, FF, FS, GS, RS, NEL, U+2028, U+2029),
    so a caller-controlled value cannot start a forged log line in tooling
    that splits on any of them.

    Args:
        value: Any object; ``None`` becomes the string ``"None"``.

    Returns:
        The string form of *value* with line-boundary characters deleted.
    """
    return str(value).translate(_SL_LINE_BREAKS)
# Argon2id hash computed ahead of time.  It is verified in place of a real
# hash when no user record matches, so a miss costs the same time as a hit
# (guards against timing-based account enumeration — CWE-208).
_DUMMY_HASH: str = _pw.DUMMY_HASH

auth_bp = Blueprint("auth", __name__)

# Operating models for which the aircraft-count wizard step sets show_review.
_COMPLEX_MODELS = frozenset(
    ("shared_ownership", "flight_club", "flight_school")
)
# ── Login brute-force protection ──────────────────────────────────────────────
# Two layers that work independently of each other:
#   1. IP backoff   — a progressive delay for any IP that keeps failing
#   2. Account lock — a 30-minute cache-based lock after 10 consecutive
#                     failures on one e-mail address (expires on its own, no
#                     admin action needed)

_IP_FAIL_TTL = 15 * 60  # seconds; IP counter resets after 15 quiet minutes
_ACCT_FAIL_TTL = 30 * 60  # seconds; account counter resets after 30 minutes
_ACCT_LOCK_MINUTES = 30
_ACCT_LOCK_TTL = _ACCT_LOCK_MINUTES * 60
_ACCT_LOCK_THRESHOLD = 10

# Seconds to wait before answering a failed login, looked up by the number of
# consecutive failures recorded for the client's IP address.
_IP_BACKOFF: dict[int, int] = {
    3: 2,
    4: 10,
    5: 30,
}
_IP_BACKOFF_MAX = 60  # used once the count reaches 6 or more
def _ip_backoff_delay(ip: str) -> int:
    """Return the delay in seconds to apply for *ip*'s current failure count.

    Fewer than three recorded failures means no delay; counts missing from
    ``_IP_BACKOFF`` fall back to ``_IP_BACKOFF_MAX``.
    """
    failures: int = _cache.get(f"login_fail_ip:{ip}") or 0
    return 0 if failures < 3 else _IP_BACKOFF.get(failures, _IP_BACKOFF_MAX)
def _increment_ip_failures(ip: str) -> int:
    """Add one to the cached failure counter for *ip* and return the new total.

    Writing the counter also restarts its ``_IP_FAIL_TTL`` expiry.
    """
    key = f"login_fail_ip:{ip}"
    updated: int = (_cache.get(key) or 0) + 1
    _cache.set(key, updated, timeout=_IP_FAIL_TTL)
    return updated
def _clear_ip_failures(ip: str) -> None:
    """Remove the cached failure counter for *ip*."""
    counter_key = f"login_fail_ip:{ip}"
    _cache.delete(counter_key)
def _check_account_locked(email: str) -> datetime | None:
    """Return when the lock on *email* expires, or None if it is not locked.

    A lock entry whose expiry has already passed is deleted from the cache
    together with the account's failure counter.
    """
    lock_key = f"login_lock_acct:{email}"
    stored = _cache.get(lock_key)
    if not stored:
        return None

    expiry = datetime.fromisoformat(stored)
    if expiry > datetime.now(timezone.utc):
        return expiry

    # The lock has run out: remove it and the failure counter.
    for stale_key in (lock_key, f"login_fail_acct:{email}"):
        _cache.delete(stale_key)
    return None
def _increment_account_failures(email: str) -> int:
    """Add one to the cached failure counter for *email* and return the total.

    Writing the counter also restarts its ``_ACCT_FAIL_TTL`` expiry.
    """
    key = f"login_fail_acct:{email}"
    updated: int = (_cache.get(key) or 0) + 1
    _cache.set(key, updated, timeout=_ACCT_FAIL_TTL)
    return updated
def _lock_account(email: str, ip: str) -> None:
    """Lock *email* for ``_ACCT_LOCK_MINUTES`` and log the event.

    The expiry is stored in the cache as an ISO-8601 string, the account's
    failure counter is dropped, and a security warning naming the e-mail,
    the IP and the expiry is logged.
    """
    expiry_iso = (
        datetime.now(timezone.utc) + timedelta(minutes=_ACCT_LOCK_MINUTES)
    ).isoformat()
    _cache.set(f"login_lock_acct:{email}", expiry_iso, timeout=_ACCT_LOCK_TTL)
    _cache.delete(f"login_fail_acct:{email}")
    _log.warning(
        "[SECURITY] auth.login.account_locked email=%s ip=%s locked_until=%s",
        _sl(email),
        _sl(ip),
        _sl(expiry_iso),
    )
def _clear_account_failures(email: str) -> None:
    """Remove both the failure counter and any lock entry cached for *email*."""
    for prefix in ("login_fail_acct", "login_lock_acct"):
        _cache.delete(f"{prefix}:{email}")
def _no_users() -> bool:
    """Return True when the user table contains no rows."""
    user_total = db.session.query(User).count()
    return user_total == 0
def _is_demo() -> bool:
    """Return True when the OPENHANGAR_ENV environment variable is "demo"."""
    return os.getenv("OPENHANGAR_ENV") == "demo"
136# ── /login ────────────────────────────────────────────────────────────────────
@auth_bp.route("/login", methods=["GET", "POST"])
@_limiter.limit(
    lambda: current_app.config.get("LOGIN_RATE_LIMIT", "20 per minute"),
    methods=["POST"],
    exempt_when=_rate_limiting_disabled,
)
def login() -> ResponseReturnValue:
    """Serve the login page and hand POST submissions to ``_login_post``.

    Redirects to the setup wizard while no user exists, and to the index
    page when the session already carries a ``user_id``.  On GET, the two
    TOTP steps are only rendered if the session flag set by the credentials
    step is present; otherwise the visitor is sent back to the plain login
    page.
    """
    if _no_users():
        return redirect(url_for("auth.setup"))
    if session.get("user_id"):
        return redirect(url_for("index"))
    if request.method == "POST":
        return _login_post()

    step = request.args.get("step", "credentials")

    # Session flag that must be set before each TOTP step may be shown.
    required_flag = {
        "totp": "login_pending_user_id",
        "totp-enrol": "totp_must_enrol",
    }.get(step)
    if required_flag is not None and not session.get(required_flag):
        return redirect(url_for("auth.login"))

    return render_template(
        "auth/login.html",
        step=step,
        totp_secret=session.get("enrol_totp_secret"),
        totp_uri=session.get("enrol_totp_uri"),
    )
def _login_post() -> ResponseReturnValue:
    """Route a login POST to the handler named by the form's ``step`` field.

    Any value other than "totp" or "totp-enrol" (including a missing field)
    is treated as the credentials step.
    """
    handlers = {
        "totp": _login_totp,
        "totp-enrol": _login_totp_enrol,
    }
    handler = handlers.get(request.form.get("step"), _login_credentials)
    return handler()
def _login_credentials() -> ResponseReturnValue:
    """Handle the credentials step (e-mail + password) of the login form.

    Flow:
      1. Refuse immediately while the account is locked.
      2. Verify the password, against a dummy hash when no active user
         matches, so both outcomes take the same time.
      3. On failure: count the failure per IP and per account, lock the
         account at the threshold, sleep for the IP backoff delay, and
         re-render the form with a generic error.
      4. On success: clear the failure counters, discard second-factor state
         left over from an earlier unfinished login in this session, upgrade
         a legacy hash, block users without an active tenant, then either
         start the TOTP step, start mandatory TOTP enrolment, or log the
         user in.

    Returns:
        A rendered login page or a redirect response.
    """
    email = request.form.get("email", "").strip().lower()
    password = request.form.get("password", "")
    ip = _sl(request.remote_addr or "")

    # ── Account lockout check (before any verification) ──────────────────────
    locked_until = _check_account_locked(email)
    if locked_until:
        _log.warning(
            "[SECURITY] auth.login.account_blocked email=%s ip=%s locked_until=%s",
            _sl(email),
            ip,
            _sl(locked_until.isoformat()),
        )
        flash(
            _(
                "Account temporarily locked due to too many failed attempts."
                " Try again in %(minutes)s minutes.",
                minutes=_ACCT_LOCK_MINUTES,
            ),
            "danger",
        )
        return render_template("auth/login.html", step="credentials")

    user = User.query.filter_by(email=email, is_active=True).first()
    password_hash = user.password_hash if user else _DUMMY_HASH
    # Always verify — never short-circuit on user is None.
    # Without this, a missing-user response is faster, enabling timing-based
    # account enumeration (CWE-208).  pw_hash.verify handles both Argon2id and
    # legacy bcrypt hashes transparently.
    password_ok = _pw.verify(password, password_hash)

    if not user or not password_ok:
        _log.warning(
            "[SECURITY] auth.credentials.failed email=%s ip=%s",
            _sl(email),
            ip,
        )

        # ── IP backoff ────────────────────────────────────────────────────────
        ip_count = _increment_ip_failures(ip)
        delay = _ip_backoff_delay(ip)
        if delay:
            _log.warning(
                "[SECURITY] auth.login.backoff ip=%s failures=%s delay=%ss",
                ip,
                ip_count,
                delay,
            )

        # ── Account failure tracking ──────────────────────────────────────────
        acct_count = _increment_account_failures(email)
        if acct_count >= _ACCT_LOCK_THRESHOLD:
            _lock_account(email, ip)

        if delay:
            time.sleep(delay)

        flash(_("Invalid email or password."), "danger")
        return render_template("auth/login.html", step="credentials")

    # Credentials verified — clear failure counters for this IP and account.
    _clear_ip_failures(ip)
    _clear_account_failures(email)

    # Discard second-factor state from an earlier, unfinished login in this
    # session.  Otherwise enrolment data created for one user could still be
    # present when a different user becomes the pending login.
    for stale_key in (
        "login_pending_user_id",
        "totp_must_enrol",
        "enrol_totp_secret",
        "enrol_totp_uri",
    ):
        session.pop(stale_key, None)

    # Upgrade legacy bcrypt hashes to Argon2id transparently at login time.
    if _pw.needs_rehash(user.password_hash):
        user.password_hash = _pw.hash(password)
        db.session.commit()

    # Block users whose only tenant(s) have been deactivated, unless they are
    # the instance admin (who must always be able to log in).
    if not user.is_instance_admin:
        active_tenant_ids = {
            tu.tenant_id for tu in user.tenants if tu.tenant and tu.tenant.is_active
        }
        if not active_tenant_ids:
            _log.warning(
                "[SECURITY] auth.credentials.deactivated email=%s ip=%s",
                _sl(email),
                ip,  # same sanitised value as every other log line here
            )
            flash(
                _("Your account has been deactivated. Contact the administrator."),
                "danger",
            )
            return render_template("auth/login.html", step="credentials")

    if user.totp_secret:
        session["login_pending_user_id"] = user.id
        return redirect(url_for("auth.login", step="totp"))

    # If the tenant mandates TOTP and the user has none, redirect to enrolment.
    # NOTE(review): only the first TenantUser row is consulted; confirm this is
    # intended for users who belong to more than one tenant.
    tu = TenantUser.query.filter_by(user_id=user.id).first()
    if tu and tu.tenant and tu.tenant.require_totp:
        totp_secret = pyotp.random_base32()
        totp_uri = pyotp.TOTP(totp_secret).provisioning_uri(
            name=user.email, issuer_name="OpenHangar"
        )
        session["login_pending_user_id"] = user.id
        session["totp_must_enrol"] = True
        session["enrol_totp_secret"] = totp_secret
        session["enrol_totp_uri"] = totp_uri
        return redirect(url_for("auth.login", step="totp-enrol"))

    session.clear()
    session["user_id"] = user.id
    session.permanent = True
    return redirect(url_for("index"))
def _login_totp() -> ResponseReturnValue:
    """Handle the TOTP step of the login form.

    Requires ``login_pending_user_id`` in the session.  The pending user
    must still exist, still be active and actually have a TOTP secret;
    otherwise the pending login is dropped and the visitor is sent back to
    the credentials step.  A code that was already accepted for this user
    (tracked in the cache for 90 seconds) is rejected as a replay.

    Returns:
        A redirect to the index page on success, the re-rendered TOTP form
        on a bad code, or a redirect to the login page when the pending
        login is not valid.
    """
    pending_id = session.get("login_pending_user_id")
    if not pending_id:
        return redirect(url_for("auth.login"))

    user = db.session.get(User, pending_id)
    # Without a stored secret there is nothing to verify against:
    # str(None) would give the text "None", which must never serve as a
    # TOTP key.  This also keeps the mandatory-enrolment flow (which sets the
    # same pending id for users with no secret) from being completed here.
    if not user or not user.is_active or not user.totp_secret:
        session.pop("login_pending_user_id", None)
        return redirect(url_for("auth.login"))

    totp_code = request.form.get("totp_code", "").strip()

    _totp_cache_key = f"totp_used:{user.id}:{totp_code}"
    if _cache.get(_totp_cache_key):
        _log.warning(
            "[SECURITY] auth.totp.replay user_id=%s ip=%s",
            _sl(pending_id),
            _sl(request.remote_addr),
        )
        flash(_("Invalid authenticator code."), "danger")
        return render_template("auth/login.html", step="totp")

    if not pyotp.TOTP(str(user.totp_secret)).verify(totp_code, valid_window=1):
        _log.warning(
            "[SECURITY] auth.totp.failed user_id=%s ip=%s",
            _sl(pending_id),
            _sl(request.remote_addr),
        )
        flash(_("Invalid authenticator code."), "danger")
        return render_template("auth/login.html", step="totp")

    # Remember the accepted code so it cannot be used a second time.
    _cache.set(_totp_cache_key, True, timeout=90)

    session.clear()
    session["user_id"] = user.id
    session.permanent = True
    return redirect(url_for("index"))
def _login_totp_enrol() -> ResponseReturnValue:
    """Mandatory TOTP enrolment during login when tenant.require_totp is True.

    Requires the pending user id, the ``totp_must_enrol`` flag and the
    enrolment secret in the session.  A user who already has a TOTP secret
    is never re-enrolled here: the enrolment state is discarded and the
    visitor is sent to the regular TOTP step instead.

    Returns:
        A redirect to the index page after a successful enrolment, the
        re-rendered enrolment form on a bad code, or a redirect to the login
        page when the session state is not valid.
    """
    pending_id = session.get("login_pending_user_id")
    totp_secret = session.get("enrol_totp_secret")
    totp_uri = session.get("enrol_totp_uri")
    if not pending_id or not totp_secret or not session.get("totp_must_enrol"):
        return redirect(url_for("auth.login"))

    user = db.session.get(User, pending_id)
    if not user:
        session.clear()
        return redirect(url_for("auth.login"))

    if user.totp_secret:
        # Never overwrite an existing second factor from the login flow;
        # doing so would let enrolment state replace it without the current
        # code ever being presented.
        for enrol_key in ("totp_must_enrol", "enrol_totp_secret", "enrol_totp_uri"):
            session.pop(enrol_key, None)
        return redirect(url_for("auth.login", step="totp"))

    totp_code = request.form.get("totp_code", "").strip()
    if not pyotp.TOTP(totp_secret).verify(totp_code, valid_window=1):
        flash(_("Invalid code — please try again."), "danger")
        return render_template(
            "auth/login.html",
            step="totp-enrol",
            totp_secret=totp_secret,
            totp_uri=totp_uri,
        )

    user.totp_secret = totp_secret
    db.session.commit()
    _log.warning(
        "[SECURITY] auth.totp.enrolment_forced user_id=%s ip=%s",
        _sl(str(user.id)),
        _sl(request.remote_addr),
    )

    session.clear()
    session["user_id"] = user.id
    session.permanent = True
    flash(_("Two-factor authentication is now active on your account."), "success")
    return redirect(url_for("index"))
366# ── /logout ───────────────────────────────────────────────────────────────────
@auth_bp.route("/logout")
def logout() -> ResponseReturnValue:
    """Empty the session and redirect to the index page.

    A ``demo_slot_id`` held by the session is carried over, so the visitor
    can re-enter the same sandbox.
    """
    demo_slot = session.get("demo_slot_id")
    session.clear()
    if demo_slot is not None:
        session["demo_slot_id"] = demo_slot
    return redirect(url_for("index"))
379# ── /setup ────────────────────────────────────────────────────────────────────
# Step names the setup wizard accepts, listed in wizard order.
_WIZARD_STEPS = [
    "account",
    "totp",
    "operating_model",
    "aircraft_count",
    "org_name",
    "co_owners",
    "summary",
]

# Operating models that may be chosen in the wizard's operating_model step.
_OPERATING_MODELS = {
    OperatingModel.SOLE_PILOT,
    OperatingModel.SOLE_OPERATOR,
    OperatingModel.SHARED_OWNERSHIP,
    OperatingModel.FLIGHT_CLUB,
    OperatingModel.FLIGHT_SCHOOL,
}
def _wizard_phase(step: str) -> int:
    """Return the 1-based progress-indicator phase a wizard step belongs to.

    Steps not listed in the table (that is, "summary") map to phase 4.
    """
    phase_by_step = {
        "account": 1,
        "totp": 1,
        "operating_model": 2,
        "aircraft_count": 3,
        "org_name": 3,
        "co_owners": 3,
    }
    return phase_by_step.get(step, 4)
def _next_step(current: str) -> str:
    """Return the wizard step that follows *current*.

    The answer depends on the operating model stored in the session under
    ``setup_operating_model``.
    """
    chosen_model = session.get("setup_operating_model", "")

    if current == "account":  # pragma: no cover
        return "totp"
    if current == "totp":  # pragma: no cover
        return "operating_model"
    if current == "operating_model":  # pragma: no cover
        return "summary" if chosen_model == "sole_pilot" else "aircraft_count"
    if current == "aircraft_count":
        # Only these models have an extra step between the count and the end.
        follow_up = {
            "flight_club": "org_name",
            "flight_school": "org_name",
            "shared_ownership": "co_owners",
        }
        return follow_up.get(chosen_model, "summary")
    if current in ("org_name", "co_owners"):  # pragma: no cover
        return "summary"
    return "summary"  # pragma: no cover
@auth_bp.route("/setup", methods=["GET", "POST"])
@_limiter.limit("10 per minute", methods=["POST"], exempt_when=_rate_limiting_disabled)
def setup() -> ResponseReturnValue:
    """First-run setup wizard: dispatch POSTs and render each step on GET.

    The wizard is refused in demo mode and once any user exists.  The step
    comes from the form on POST or from the query string on GET; unknown
    steps restart the wizard.  Before a step is rendered on GET, the session
    is checked for the data an earlier step must have stored; when it is
    missing the visitor is redirected to an earlier step.
    """
    if _is_demo():
        flash(_("Account creation is disabled in demo mode."), "warning")
        return redirect(url_for("index"))
    if not _no_users():
        return redirect(url_for("config.index"))

    # The step comes from the form (POST) or from the query string (GET).
    step = request.form.get("step") or request.args.get("step", "account")
    if step not in _WIZARD_STEPS:
        return redirect(url_for("auth.setup"))

    if request.method == "POST":
        # There is exactly one handler for every entry of _WIZARD_STEPS.
        post_handlers = {
            "account": _setup_account,
            "totp": _setup_totp,
            "operating_model": _setup_operating_model,
            "aircraft_count": _setup_aircraft_count,
            "org_name": _setup_org_name,
            "co_owners": _setup_co_owners,
            "summary": _setup_finish,
        }
        return post_handlers[step]()

    # GET: check the session state, then render the requested step.
    template = "auth/setup.html"
    phase = _wizard_phase(step)

    if step == "totp":
        secret = session.get("setup_totp_secret")
        if not secret:
            return redirect(url_for("auth.setup"))
        return render_template(
            template,
            step="totp",
            phase=phase,
            show_review=False,
            totp_secret=secret,
            provisioning_uri=session["setup_provisioning_uri"],
        )

    if step == "operating_model":
        if not session.get("setup_totp_done"):
            return redirect(url_for("auth.setup"))
        return render_template(
            template, step="operating_model", phase=phase, show_review=False
        )

    if step == "aircraft_count":
        chosen = session.get("setup_operating_model")
        if not chosen:
            return redirect(url_for("auth.setup", step="operating_model"))
        return render_template(
            template,
            step="aircraft_count",
            phase=phase,
            show_review=chosen in _COMPLEX_MODELS,
            operating_model=chosen,
        )

    if step == "org_name":
        chosen = session.get("setup_operating_model", "")
        if chosen not in ("flight_club", "flight_school"):
            return redirect(url_for("auth.setup", step="summary"))
        return render_template(
            template,
            step="org_name",
            phase=phase,
            show_review=True,
            operating_model=chosen,
        )

    if step == "co_owners":
        if session.get("setup_operating_model") != "shared_ownership":
            return redirect(url_for("auth.setup", step="summary"))
        return render_template(
            template, step="co_owners", phase=phase, show_review=True
        )

    if step == "summary":
        chosen = session.get("setup_operating_model")
        # No model chosen yet, or one of the two sole models: send the
        # visitor back to the operating_model step.
        if not chosen or chosen in ("sole_pilot", "sole_operator"):
            return redirect(url_for("auth.setup", step="operating_model"))
        return render_template(
            template,
            step="summary",
            phase=phase,
            show_review=True,
            operating_model=chosen,
            aircraft_count=session.get("setup_aircraft_count"),
            allows_rental=session.get("setup_allows_rental", False),
            org_name=session.get("setup_org_name", ""),
            co_owners=session.get("setup_co_owners", []),
            setup_name=session.get("setup_name", ""),
            setup_email=session.get("setup_email", ""),
        )

    # Only the "account" step is left.
    return render_template(template, step="account", phase=1, show_review=False)
def _setup_account() -> ResponseReturnValue:
    """Validate the wizard's account form and stash its data in the session.

    The e-mail must be non-empty and contain "@"; the password needs at
    least 12 characters.  On success a fresh TOTP secret is generated and
    the visitor is redirected to the TOTP step.
    """
    email = request.form.get("email", "").strip().lower()
    password = request.form.get("password", "")
    name = request.form.get("name", "").strip()

    invalid = False
    if not email or "@" not in email:
        flash(_("A valid email address is required."), "danger")
        invalid = True
    if len(password) < 12:
        flash(_("Password must be at least 12 characters."), "danger")
        invalid = True
    if invalid:
        return render_template(
            "auth/setup.html", step="account", phase=1, show_review=False
        )

    totp_secret = pyotp.random_base32()
    provisioning_uri = pyotp.TOTP(totp_secret).provisioning_uri(
        name=email, issuer_name="OpenHangar"
    )

    # Only the hash of the password is kept in the session, never the password.
    session["setup_email"] = email
    session["setup_name"] = name if name else None
    session["setup_password_hash"] = _pw.hash(password)
    session["setup_totp_secret"] = totp_secret
    session["setup_provisioning_uri"] = provisioning_uri

    return redirect(url_for("auth.setup", step="totp"))
def _setup_totp() -> ResponseReturnValue:
    """Confirm or skip TOTP during setup, then move to the operating model.

    Restarts the wizard when the account data is missing from the session.
    With ``action=skip`` no secret is saved; otherwise the submitted code
    must verify against the secret generated in the account step.
    """
    email = session.get("setup_email")
    password_hash = session.get("setup_password_hash")
    totp_secret = session.get("setup_totp_secret")
    provisioning_uri = session.get("setup_provisioning_uri")

    if not (email and password_hash and totp_secret):
        flash(_("Session expired. Please start over."), "danger")
        return redirect(url_for("auth.setup"))

    if request.form.get("action") == "skip":
        # "Skip" path — the user will not have TOTP.
        secret_to_save = None
    else:
        submitted_code = request.form.get("totp_code", "").strip()
        verifier = pyotp.TOTP(str(totp_secret))
        if not verifier.verify(submitted_code, valid_window=1):
            flash(_("Invalid code. Please try again."), "danger")
            return render_template(
                "auth/setup.html",
                step="totp",
                phase=1,
                show_review=False,
                totp_secret=totp_secret,
                provisioning_uri=provisioning_uri,
            )
        secret_to_save = totp_secret

    session["setup_totp_to_save"] = secret_to_save
    session["setup_totp_done"] = True
    return redirect(url_for("auth.setup", step="operating_model"))
def _setup_operating_model() -> ResponseReturnValue:
    """Store the chosen operating model and continue the wizard.

    The step is only reachable once the TOTP step is done.  "sole_pilot"
    finishes the setup straight away; every other valid model continues
    with the aircraft-count step.
    """
    if not session.get("setup_totp_done"):
        return redirect(url_for("auth.setup"))

    choice = request.form.get("operating_model", "")
    is_known = any(choice == member.value for member in _OPERATING_MODELS)
    if not is_known:
        flash(_("Please select an option."), "danger")
        return render_template(
            "auth/setup.html", step="operating_model", phase=2, show_review=False
        )

    session["setup_operating_model"] = choice
    if choice != "sole_pilot":
        return redirect(url_for("auth.setup", step="aircraft_count"))
    return _setup_finish()
def _setup_aircraft_count() -> ResponseReturnValue:
    """Store the planned aircraft count and the rental flag, then continue.

    The count must parse as an integer of 0 or more.  When the next step is
    "summary" the setup is finished directly instead of redirecting.
    """
    chosen_model = session.get("setup_operating_model")
    if not chosen_model:
        return redirect(url_for("auth.setup", step="operating_model"))

    raw_count = request.form.get("aircraft_count", "").strip()
    try:
        count = int(raw_count)
    except (ValueError, TypeError):
        count = -1  # handled like any other negative number below
    if count < 0:
        flash(_("Please enter a valid number of aircraft (0 or more)."), "danger")
        return render_template(
            "auth/setup.html",
            step="aircraft_count",
            phase=3,
            show_review=session.get("setup_operating_model") in _COMPLEX_MODELS,
            operating_model=session.get("setup_operating_model"),
        )

    session["setup_aircraft_count"] = count
    # The checkbox counts as ticked when its field is present in the form.
    session["setup_allows_rental"] = "allows_rental" in request.form

    following = _next_step("aircraft_count")
    if following == "summary":
        return _setup_finish()
    return redirect(url_for("auth.setup", step=following))
def _setup_org_name() -> ResponseReturnValue:
    """Store the organisation name for the flight_club / flight_school models.

    Any other model is redirected to the summary step.  An empty name
    re-renders the form with an error.
    """
    chosen_model = session.get("setup_operating_model", "")
    if chosen_model not in ("flight_club", "flight_school"):
        return redirect(url_for("auth.setup", step="summary"))

    entered_name = request.form.get("org_name", "").strip()
    if entered_name:
        session["setup_org_name"] = entered_name
        return redirect(url_for("auth.setup", step="summary"))

    flash(_("Please enter a name."), "danger")
    return render_template(
        "auth/setup.html",
        step="org_name",
        phase=3,
        show_review=True,
        operating_model=chosen_model,
    )
def _setup_co_owners() -> ResponseReturnValue:
    """Collect the co-owner rows of the shared_ownership model.

    The name, e-mail and role lists from the form are read in parallel.
    Rows with neither a name nor an e-mail are dropped, and any role other
    than "owner" or "admin" becomes "owner".
    """
    if session.get("setup_operating_model") != "shared_ownership":
        return redirect(url_for("auth.setup", step="summary"))

    form = request.form
    rows = zip(
        form.getlist("co_owner_name"),
        form.getlist("co_owner_email"),
        form.getlist("co_owner_role"),
    )
    cleaned = (
        (raw_name.strip(), raw_email.strip().lower(), raw_role)
        for raw_name, raw_email, raw_role in rows
    )
    session["setup_co_owners"] = [
        {
            "name": owner_name or None,
            "email": owner_email or None,
            "role": owner_role if owner_role in ("owner", "admin") else "owner",
        }
        for owner_name, owner_email, owner_role in cleaned
        if owner_name or owner_email
    ]
    return redirect(url_for("auth.setup", step="summary"))
def _setup_finish() -> ResponseReturnValue:
    """Create the tenant, the admin user and the profile from the wizard data.

    Restarts the wizard when required session data is missing.  Otherwise,
    in a single commit, it creates the tenant (with its slug), the
    instance-admin user, the membership, the tenant profile and one
    invitation per collected co-owner.  It then clears the wizard keys from
    the session, logs the new user in and flashes a welcome message whose
    links depend on the operating model.
    """
    must_be_set = (
        "setup_email",
        "setup_password_hash",
        "setup_operating_model",
        "setup_totp_done",
    )
    if not all(session.get(key) for key in must_be_set):
        flash(_("Session expired. Please start over."), "danger")
        return redirect(url_for("auth.setup"))

    from models import UserInvitation

    model_raw = session.get("setup_operating_model", "")
    aircraft_count = session.get("setup_aircraft_count")
    allows_rental = bool(session.get("setup_allows_rental", False))
    org_name = session.get("setup_org_name", "")
    co_owners = session.get("setup_co_owners", [])

    # Clubs and schools that gave a name use it; everyone else gets the default.
    has_org_name = model_raw in ("flight_club", "flight_school") and org_name
    tenant = Tenant(name=org_name if has_org_name else "My Hangar")
    db.session.add(tenant)
    db.session.flush()

    # Auto-generate the Hangar ID from the tenant name so canonical document
    # paths work immediately, without requiring a trip to Settings first.
    from documents.routes import _ensure_tenant_slug  # pyright: ignore[reportMissingImports]

    _ensure_tenant_slug(tenant)

    user = User(
        email=session["setup_email"],
        password_hash=session["setup_password_hash"],
        totp_secret=session.get("setup_totp_to_save"),
        name=session.get("setup_name"),
        is_active=True,
        is_instance_admin=True,
    )
    db.session.add(user)
    db.session.flush()

    membership = TenantUser(user_id=user.id, tenant_id=tenant.id, role=Role.ADMIN)
    db.session.add(membership)

    # Translate the wizard answers into profile values.
    try:
        op_model: OperatingModel | None = OperatingModel(model_raw)
    except ValueError:
        op_model = None
    planned_count: int | None = 0 if model_raw == "sole_pilot" else aircraft_count

    db.session.add(
        TenantProfile(
            tenant_id=tenant.id,
            operating_model=op_model,
            planned_aircraft_count=planned_count,
            allows_rental=allows_rental,
            club_name=org_name if model_raw == "flight_club" else None,
            school_name=org_name if model_raw == "flight_school" else None,
            setup_complete=True,
        )
    )

    # One invitation per co-owner collected on the shared_ownership path.
    for entry in co_owners:
        db.session.add(
            UserInvitation(
                tenant_id=tenant.id,
                invited_by_user_id=user.id,
                email=entry.get("email") or None,
                display_name=entry.get("name") or None,
                role=Role.ADMIN if entry.get("role") == "admin" else Role.OWNER,
                expires_at=datetime.now(timezone.utc) + timedelta(days=7),
            )
        )

    db.session.commit()

    aircraft_url = url_for("aircraft.new_aircraft")
    flight_url = url_for("flights.log_flight")
    welcome = escape(_("Setup complete. Welcome to OpenHangar!"))
    aircraft_label = _("Add your first aircraft")
    flight_label = _("Register your first flight")
    aircraft_link = Markup(
        f'<a href="{aircraft_url}" class="alert-link">{escape(aircraft_label)}</a>'
    )
    flight_link = Markup(
        f'<a href="{flight_url}" class="alert-link">{escape(flight_label)}</a>'
    )
    # sole_pilot gets the flight link only, sole_operator and
    # shared_ownership get both links, every other model gets the aircraft
    # link only.
    if model_raw == "sole_pilot":
        msg = Markup(f"{welcome} {flight_link}")
    elif model_raw in ("sole_operator", "shared_ownership"):
        msg = Markup(f"{welcome} {aircraft_link} · {flight_link}")
    else:
        msg = Markup(f"{welcome} {aircraft_link}")

    _clear_setup_session()
    session["user_id"] = user.id
    session.permanent = True
    flash(msg, "success")
    return redirect(url_for("index"))
def _clear_setup_session() -> None:
    """Remove every ``setup_*`` key the wizard may have put in the session."""
    wizard_suffixes = (
        "email",
        "name",
        "password_hash",
        "totp_secret",
        "provisioning_uri",
        "totp_to_save",
        "totp_done",
        "operating_model",
        "aircraft_count",
        "allows_rental",
        "org_name",
        "co_owners",
    )
    for suffix in wizard_suffixes:
        session.pop(f"setup_{suffix}", None)
820# ── /profile ──────────────────────────────────────────────────────────────────
@auth_bp.route("/profile", methods=["GET", "POST"])
@login_required
def profile() -> ResponseReturnValue:
    """Show the profile page and dispatch its form actions.

    A session whose user no longer exists is sent to logout.  On POST the
    ``action`` field selects the handler; an unknown action falls through
    to rendering the page.  TOTP enrolment data waiting in the session is
    removed from it as the page is rendered.
    """
    user = db.session.get(User, session["user_id"])
    if not user:
        return redirect(url_for("auth.logout"))

    if request.method == "POST":
        actions = {
            "update_name": _profile_update_name,
            "change_password": _profile_change_password,
            "setup_totp": _profile_setup_totp,
            "confirm_totp": _profile_confirm_totp,
            "disable_totp": _profile_disable_totp,
        }
        handler = actions.get(request.form.get("action"))
        if handler is not None:
            return handler(user)

    pending_secret = session.pop("profile_totp_secret", None)
    pending_uri = session.pop("profile_totp_uri", None)
    return render_template(
        "auth/profile.html",
        user=user,
        totp_secret=pending_secret,
        totp_uri=pending_uri,
    )
def _profile_update_name(user: User) -> ResponseReturnValue:
    """Save the submitted display name; a blank value is stored as None."""
    submitted = request.form.get("name", "").strip()
    user.name = submitted if submitted else None
    db.session.commit()
    flash(_("Display name updated."), "success")
    return redirect(url_for("auth.profile"))
def _profile_change_password(user: User) -> ResponseReturnValue:
    """Change the user's password after checking the current one.

    Checks run in this order, and the first failure is reported: current
    password correct, new password at least 12 characters, confirmation
    matches.  A successful change is logged as a security event.
    """
    form = request.form
    current_pw = form.get("current_password", "")
    new_pw = form.get("new_password", "")
    confirm_pw = form.get("confirm_password", "")

    if not _pw.verify(current_pw, user.password_hash):
        problem = _("Current password is incorrect.")
    elif len(new_pw) < 12:
        problem = _("Password must be at least 12 characters.")
    elif new_pw != confirm_pw:
        problem = _("Passwords do not match.")
    else:
        problem = None

    if problem is not None:
        flash(problem, "danger")
        return render_template("auth/profile.html", user=user, totp_secret=None)

    user.password_hash = _pw.hash(new_pw)
    db.session.commit()
    _log.warning(
        "[SECURITY] auth.password.changed user_id=%s ip=%s",
        _sl(str(user.id)),
        _sl(request.remote_addr),
    )
    flash(_("Password updated successfully."), "success")
    return redirect(url_for("auth.profile"))
def _profile_setup_totp(user: User) -> ResponseReturnValue:
    """Start TOTP enrolment from the profile page.

    Generates a new secret and provisioning URI, keeps both in the session
    for the confirmation step, and renders the profile page with them.
    """
    new_secret = pyotp.random_base32()
    new_uri = pyotp.TOTP(new_secret).provisioning_uri(
        name=user.email, issuer_name="OpenHangar"
    )
    session["profile_totp_secret"] = new_secret
    session["profile_totp_uri"] = new_uri
    return render_template(
        "auth/profile.html",
        user=user,
        totp_secret=new_secret,
        totp_uri=new_uri,
    )
def _profile_confirm_totp(user: User) -> ResponseReturnValue:
    """Finish TOTP enrolment by verifying a code against the pending secret.

    When no secret is pending in the session the user is asked to start
    again.  On success the secret is saved on the user, the pending data
    leaves the session, and the event is logged.
    """
    pending_secret = session.get("profile_totp_secret")
    pending_uri = session.get("profile_totp_uri")
    if not pending_secret:
        flash(_("Session expired. Please try again."), "danger")
        return redirect(url_for("auth.profile"))

    submitted_code = request.form.get("totp_code", "").strip()
    code_ok = pyotp.TOTP(pending_secret).verify(submitted_code, valid_window=1)
    if not code_ok:
        flash(_("Invalid code. Please try again."), "danger")
        return render_template(
            "auth/profile.html",
            user=user,
            totp_secret=pending_secret,
            totp_uri=pending_uri,
        )

    user.totp_secret = pending_secret
    db.session.commit()
    for pending_key in ("profile_totp_secret", "profile_totp_uri"):
        session.pop(pending_key, None)
    _log.warning(
        "[SECURITY] auth.totp.enabled user_id=%s ip=%s",
        _sl(str(user.id)),
        _sl(request.remote_addr),
    )
    flash(_("Two-factor authentication enabled."), "success")
    return redirect(url_for("auth.profile"))
def _profile_disable_totp(user: User) -> ResponseReturnValue:
    """Remove the user's TOTP secret after re-checking the password.

    Refused when the tenant of the first TenantUser row found for the user
    has ``require_totp`` set.  A successful removal is logged as a security
    event.
    """
    membership = TenantUser.query.filter_by(user_id=user.id).first()
    totp_mandatory = bool(
        membership and membership.tenant and membership.tenant.require_totp
    )
    if totp_mandatory:
        flash(
            _(
                "Your administrator requires two-factor authentication."
                " It cannot be disabled on this account."
            ),
            "danger",
        )
        return redirect(url_for("auth.profile"))

    supplied_pw = request.form.get("current_password", "")
    if not _pw.verify(supplied_pw, user.password_hash):
        flash(_("Current password is incorrect."), "danger")
        return redirect(url_for("auth.profile"))

    user.totp_secret = None
    db.session.commit()
    _log.warning(
        "[SECURITY] auth.totp.disabled user_id=%s ip=%s",
        _sl(str(user.id)),
        _sl(request.remote_addr),
    )
    flash(_("Two-factor authentication disabled."), "success")
    return redirect(url_for("auth.profile"))
949# ── /reset-password/<token> ───────────────────────────────────────────────────
@auth_bp.route("/reset-password/<token>", methods=["GET", "POST"])
def reset_password(token: str) -> ResponseReturnValue:
    """Consume a PasswordResetToken generated by the instance admin.

    Unknown tokens give a 404; used or expired tokens redirect to the login
    page with an error.  On a valid POST the new password (at least 12
    characters, confirmed) replaces the old one, the user's TOTP secret is
    cleared, the token is marked as used, any login lockout for the account
    is lifted, and the event is written to the security log.

    Args:
        token: The reset token taken from the URL.

    Returns:
        The rendered reset form, or a redirect to the login page.
    """
    prt = PasswordResetToken.query.filter_by(token=token).first_or_404()

    if prt.is_used:
        flash(_("This reset link has already been used."), "danger")
        return redirect(url_for("auth.login"))

    if prt.is_expired:
        flash(
            _("This reset link has expired. Ask the administrator for a new one."),
            "danger",
        )
        return redirect(url_for("auth.login"))

    if request.method == "POST":
        new_pw = request.form.get("new_password", "")
        confirm_pw = request.form.get("confirm_password", "")

        if len(new_pw) < 12:
            flash(_("Password must be at least 12 characters."), "danger")
            return render_template("auth/reset_password.html", token=token)
        if new_pw != confirm_pw:
            flash(_("Passwords do not match."), "danger")
            return render_template("auth/reset_password.html", token=token)

        user = db.session.get(User, prt.user_id)
        if not user:  # pragma: no cover — token CASCADE-deletes with user
            flash(_("User not found."), "danger")
            return redirect(url_for("auth.login"))

        user.password_hash = _pw.hash(new_pw)
        user.totp_secret = None  # clear TOTP so the user can re-enrol
        prt.used_at = datetime.now(timezone.utc)
        db.session.commit()

        # Lift any lockout built up against the old password; otherwise the
        # "You can now log in" message below could be untrue.  The cache keys
        # use the normalised (stripped, lower-cased) e-mail, as at login.
        _clear_account_failures((user.email or "").strip().lower())

        # Every other credential change in this file is audit-logged; a reset
        # replaces the password and removes the second factor, so log it too.
        _log.warning(
            "[SECURITY] auth.password.reset user_id=%s ip=%s",
            _sl(str(user.id)),
            _sl(request.remote_addr),
        )

        flash(_("Password reset successfully. You can now log in."), "success")
        return redirect(url_for("auth.login"))

    return render_template("auth/reset_password.html", token=token)