Coverage for app/users/routes.py: 100%
275 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1"""
2Users blueprint — user management, invitations, and role changes.
3Only ADMIN/OWNER roles can manage users; the invitation-accept route is public.
4"""
6import json as _json
7import logging as _logging
8import os
9from datetime import datetime, timedelta, timezone
11import pw_hash as _pw # pyright: ignore[reportMissingImports]
12from flask import ( # pyright: ignore[reportMissingImports]
13 Blueprint,
14 abort,
15 current_app,
16 flash,
17 redirect,
18 render_template,
19 request,
20 session,
21 url_for,
22)
23from flask.typing import ResponseReturnValue # pyright: ignore[reportMissingImports]
24from flask_babel import gettext as _ # pyright: ignore[reportMissingImports]
26from models import (
27 Aircraft,
28 PermissionBit,
29 Role,
30 TenantUser,
31 User,
32 UserAircraftAccess,
33 UserAllAircraftAccess,
34 UserInvitation,
35 db,
36)
37from utils import activity, login_required, require_role
# Blueprint holding every user-management route; mounted under /config/users.
users_bp = Blueprint(
    name="users",
    import_name=__name__,
    url_prefix="/config/users",
)

# Logger used for the [SECURITY] audit lines emitted by this module.
_log = _logging.getLogger("openhangar.users")

# Number of days an invitation stays valid after it is created.
_INVITATION_EXPIRY_DAYS = 7
45def _sl(value: object) -> str:
46 """Sanitize a value for log output — strips CR/LF to prevent log injection (CWE-117)."""
47 return str(value).replace("\r\n", "").replace("\n", "").replace("\r", "")
@users_bp.before_request
def _block_in_demo() -> None:
    """Abort with 403 on every request to this blueprint when OPENHANGAR_ENV is "demo"."""
    running_demo = os.environ.get("OPENHANGAR_ENV") == "demo"
    if running_demo:
        abort(403)
# Display label for each role, keyed by Role member (insertion order preserved).
ROLE_LABELS = dict(
    [
        (Role.ADMIN, "Admin"),
        (Role.OWNER, "Owner"),
        (Role.PILOT, "Pilot / Renter"),
        (Role.STUDENT, "Student"),
        (Role.INSTRUCTOR, "Instructor"),
        (Role.MAINTENANCE, "Maintenance"),
        (Role.VIEWER, "Viewer"),
    ]
)
def _tenant_id() -> int:
    """Return the tenant id of the session user's first TenantUser row.

    Aborts with 403 when the session user has no TenantUser row at all.
    """
    membership = TenantUser.query.filter_by(user_id=session["user_id"]).first()
    if membership:
        return int(membership.tenant_id)
    abort(403)  # pragma: no cover
74# ── User list ─────────────────────────────────────────────────────────────────
@users_bp.route("/")
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def list_users() -> ResponseReturnValue:
    """Render the user-management page for the current tenant.

    Passes the template the tenant's members, its not-yet-accepted
    invitations (newest first), the tenant's aircraft ordered by
    registration, and two lookup structures describing who can access
    which aircraft.
    """
    tid = _tenant_id()

    members = TenantUser.query.filter_by(tenant_id=tid).join(User).all()

    pending_query = UserInvitation.query.filter_by(tenant_id=tid)
    pending_query = pending_query.filter(UserInvitation.accepted_at.is_(None))
    pending = pending_query.order_by(UserInvitation.created_at.desc()).all()

    fleet = Aircraft.query.filter_by(tenant_id=tid).order_by(Aircraft.registration).all()
    fleet_ids = [plane.id for plane in fleet]

    # user_id -> set of aircraft ids that user has an explicit access row for
    access_by_user: dict[int, set[int]] = {}
    grants = UserAircraftAccess.query.filter(
        UserAircraftAccess.aircraft_id.in_(fleet_ids)
    ).all()
    for grant in grants:
        if grant.user_id not in access_by_user:
            access_by_user[grant.user_id] = set()
        access_by_user[grant.user_id].add(grant.aircraft_id)

    # ids of users holding the tenant-wide "all aircraft" grant
    whole_fleet_users: set[int] = set()
    for grant in UserAllAircraftAccess.query.filter_by(tenant_id=tid).all():
        whole_fleet_users.add(grant.user_id)

    # ADMIN is never offered as an assignable role in the UI
    assignable_roles = [r for r in Role if r != Role.ADMIN]

    return render_template(
        "users/list.html",
        tenant_users=members,
        invitations=pending,
        role_labels=ROLE_LABELS,
        current_user_id=session["user_id"],
        all_roles=assignable_roles,
        all_aircraft=fleet,
        user_aircraft_ids=access_by_user,
        all_planes_user_ids=whole_fleet_users,
        Role=Role,
    )
118# ── Invite ────────────────────────────────────────────────────────────────────
@users_bp.route("/invite", methods=["POST"])
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def invite() -> ResponseReturnValue:
    """Create one invitation per submitted ``email`` form field.

    The form carries parallel lists (``email``, ``role``, ``display_name``,
    ``aircraft_ids``); entry *i* of each list describes invitation *i*.
    Missing or unparsable roles fall back to PILOT and a requested ADMIN role
    is downgraded to OWNER. For non-owner roles the comma-separated
    ``aircraft_ids`` entry is parsed into the list of aircraft the invitee
    will get access to; only ids belonging to the inviter's tenant are kept.

    All invitations are committed in a single transaction; invitation emails
    are sent only after the commit succeeds. Always redirects to the user
    list.
    """
    tid = _tenant_id()

    # Multi-invite: names, emails, roles, display_names are parallel lists
    emails = [e.strip().lower() or None for e in request.form.getlist("email")]
    roles_raw = request.form.getlist("role")
    display_names = [n.strip() or None for n in request.form.getlist("display_name")]
    aircraft_ids_lists = request.form.getlist("aircraft_ids")

    if not emails:
        flash(_("No invitations to create."), "warning")
        return redirect(url_for("users.list_users"))

    # Aircraft ids come from the request, so they must be checked against the
    # inviter's own tenant before being stored on the invitation; otherwise
    # accepting the invitation could grant access to another tenant's fleet.
    tenant_aircraft_ids = {
        ac.id for ac in Aircraft.query.filter_by(tenant_id=tid).all()
    }

    created_urls: list[str] = []
    # (email, accept_url, role, display_name) tuples to mail after the commit
    pending_emails: list[tuple[str, str, Role, str | None]] = []

    for i, email_raw in enumerate(emails):
        role_raw = roles_raw[i] if i < len(roles_raw) else Role.PILOT.value
        display_name = display_names[i] if i < len(display_names) else None

        try:
            role = Role(role_raw)
        except ValueError:
            role = Role.PILOT
        if role == Role.ADMIN:
            role = Role.OWNER

        invited_aircraft_ids: list[int] | None = None
        if role not in (Role.ADMIN, Role.OWNER):
            raw_ids = (
                aircraft_ids_lists[i].split(",") if i < len(aircraft_ids_lists) else []
            )
            try:
                parsed_ids = [int(x) for x in raw_ids if x.strip()]
            except ValueError:
                parsed_ids = []
            # Keep first-seen order, drop duplicates and foreign-tenant ids.
            invited_aircraft_ids = [
                acid
                for acid in dict.fromkeys(parsed_ids)
                if acid in tenant_aircraft_ids
            ]

        inv = UserInvitation(
            tenant_id=tid,
            invited_by_user_id=session["user_id"],
            email=email_raw,
            display_name=display_name,
            role=role,
            aircraft_ids=invited_aircraft_ids,
            expires_at=datetime.now(timezone.utc)
            + timedelta(days=_INVITATION_EXPIRY_DAYS),
        )
        db.session.add(inv)
        # Flush so inv.id is populated for the activity record below.
        db.session.flush()
        activity(
            "user.invited", invitation_id=inv.id, email=email_raw or "", role=str(role)
        )

        accept_url = url_for("users.accept_invite", token=inv.token, _external=True)
        created_urls.append(accept_url)

        if email_raw:
            pending_emails.append((email_raw, accept_url, role, display_name))

    db.session.commit()

    # Mail only once the invitations are durable, so a failed commit can never
    # leave a recipient holding a link whose token does not exist.
    for to, accept_url, role, display_name in pending_emails:
        _try_send_invite_email(to, accept_url, role, display_name)

    if len(created_urls) == 1:
        flash(
            _("Invitation created. Share this link: %(url)s", url=created_urls[0]),
            "success",
        )
    else:
        flash(
            _("%(n)s invitations created.", n=len(created_urls)),
            "success",
        )
    return redirect(url_for("users.list_users"))
def _try_send_invite_email(
    to: str, accept_url: str, role: Role, display_name: str | None = None
) -> None:
    """Send the invitation email to *to*, never raising on failure.

    Best effort by design: any exception (including a failing import of the
    email service) is logged as a warning with its traceback and swallowed,
    so a mail problem never blocks creating the invitation.

    Args:
        to: Recipient address.
        accept_url: Absolute URL of the invitation-accept page.
        role: Role the invitee will receive; its label is shown in the body.
        display_name: Optional name used for a "Hi <name>," greeting.
    """
    try:
        from services.email_service import send_email  # pyright: ignore[reportMissingImports]

        greeting = f"Hi {display_name},\n\n" if display_name else ""
        send_email(
            to=to,
            subject=_("You've been invited to OpenHangar"),
            text_body=(
                f"{greeting}You have been invited to join an OpenHangar hangar as {ROLE_LABELS[role]}.\n\n"
                f"Accept your invitation here:\n{accept_url}\n\n"
                f"This link expires in {_INVITATION_EXPIRY_DAYS} days."
            ),
        )
    except Exception:
        # The address originates from a form field; strip CR/LF before logging
        # so it cannot inject forged log lines (CWE-117).
        current_app.logger.warning(
            "Failed to send invitation email to %s", _sl(to), exc_info=True
        )
219# ── Accept invitation ─────────────────────────────────────────────────────────
@users_bp.route("/invite/<token>", methods=["GET", "POST"])
def accept_invite(token: str) -> ResponseReturnValue:
    """Public endpoint where an invitee creates their account.

    Responds 404 for an unknown token. An already-accepted or expired
    invitation redirects to the login page with a flash message. GET renders
    the acceptance form; POST validates email and password, creates the User
    and its TenantUser membership with the invited role, grants the invited
    per-aircraft access (non-owner roles only), marks the invitation accepted
    and redirects to login.
    """
    inv = UserInvitation.query.filter_by(token=token).first_or_404()

    if inv.is_accepted:
        flash(_("This invitation has already been used."), "warning")
        return redirect(url_for("auth.login"))

    if inv.is_expired:
        flash(_("This invitation has expired."), "danger")
        return redirect(url_for("auth.login"))

    if request.method == "GET":
        return render_template(
            "users/invite_accept.html",
            invitation=inv,
            role_labels=ROLE_LABELS,
            prefill_email="",
        )

    # POST — create user
    email = request.form.get("email", "").strip().lower()
    password = request.form.get("password", "")
    password2 = request.form.get("password2", "")

    errors = []
    if not email or "@" not in email:
        errors.append(_("A valid email address is required."))
    if len(password) < 12:
        errors.append(_("Password must be at least 12 characters."))
    if password != password2:
        errors.append(_("Passwords do not match."))
    if User.query.filter_by(email=email).first():
        errors.append(_("An account with this email already exists."))

    if errors:
        for msg in errors:
            flash(msg, "danger")
        return render_template(
            "users/invite_accept.html",
            invitation=inv,
            role_labels=ROLE_LABELS,
            prefill_email=email,
        )

    user = User(
        email=email,
        password_hash=_pw.hash(password),
        is_active=True,
    )
    db.session.add(user)
    # Flush so user.id is available for the rows created below.
    db.session.flush()

    db.session.add(
        TenantUser(
            user_id=user.id,
            tenant_id=inv.tenant_id,
            role=inv.role,
        )
    )

    # Grant per-aircraft access for non-owner roles
    if inv.role not in (Role.ADMIN, Role.OWNER) and inv.aircraft_ids:
        # Re-validate the stored ids at acceptance time: only aircraft that
        # still exist AND belong to the inviting tenant are granted. This
        # prevents cross-tenant grants and dangling references to aircraft
        # removed since the invitation was created.
        grantable_ids = {
            ac.id
            for ac in Aircraft.query.filter(
                Aircraft.id.in_(list(inv.aircraft_ids)),
                Aircraft.tenant_id == inv.tenant_id,
            ).all()
        }
        for acid in grantable_ids:
            db.session.add(UserAircraftAccess(user_id=user.id, aircraft_id=acid))

    inv.accepted_at = datetime.now(timezone.utc)
    db.session.commit()
    activity(
        "user.invite_accepted", invitation_id=inv.id, email=email, user_id_new=user.id
    )

    flash(_("Account created. You can now log in."), "success")
    return redirect(url_for("auth.login"))
298# ── Change role ───────────────────────────────────────────────────────────────
@users_bp.route("/<int:user_id>/role", methods=["POST"])
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def change_role(user_id: int) -> ResponseReturnValue:
    """Change the role of another member of the current tenant.

    A user cannot change their own role. Responds 404 when *user_id* is not a
    member of the caller's tenant and 400 when the submitted ``role`` is not
    a valid Role or is ADMIN. On success the change is committed, a
    [SECURITY] audit line is logged, and the caller is redirected to the
    user list.
    """
    tid = _tenant_id()
    if user_id == session["user_id"]:
        flash(_("You cannot change your own role."), "danger")
        return redirect(url_for("users.list_users"))

    tu = TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404()
    role_raw = request.form.get("role", "")
    try:
        new_role = Role(role_raw)
    except ValueError:
        abort(400)
    if new_role == Role.ADMIN:
        abort(400)
    old_role = tu.role
    tu.role = new_role
    # When promoted to owner/admin, per-aircraft access rows are no longer needed
    if new_role in (Role.ADMIN, Role.OWNER):
        # Only touch rows for THIS tenant's aircraft, matching how
        # update_aircraft_access scopes its changes; deleting by user_id alone
        # would also wipe the user's grants on aircraft of other tenants.
        tenant_aircraft_ids = [
            ac.id for ac in Aircraft.query.filter_by(tenant_id=tid).all()
        ]
        UserAircraftAccess.query.filter(
            UserAircraftAccess.user_id == user_id,
            UserAircraftAccess.aircraft_id.in_(tenant_aircraft_ids),
        ).delete(synchronize_session=False)
    db.session.commit()
    _log.warning(
        "[SECURITY] users.role.changed target_user_id=%s old_role=%s new_role=%s admin_user_id=%s ip=%s",
        _sl(str(user_id)),
        _sl(old_role.value),
        _sl(new_role.value),
        _sl(str(session["user_id"])),
        _sl(request.remote_addr),
    )
    flash(_("Role updated."), "success")
    return redirect(url_for("users.list_users"))
336# ── Revoke access ─────────────────────────────────────────────────────────────
@users_bp.route("/<int:user_id>/revoke", methods=["POST"])
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def revoke_access(user_id: int) -> ResponseReturnValue:
    """Remove another user from the current tenant and deactivate the account.

    Refuses to act on the caller's own account. Responds 404 when *user_id*
    is not a member of the caller's tenant. Logs a [SECURITY] audit line
    after the commit and redirects to the user list.
    """
    tid = _tenant_id()
    acting_user_id = session["user_id"]
    if acting_user_id == user_id:
        flash(_("You cannot revoke your own access."), "danger")
        return redirect(url_for("users.list_users"))

    membership = TenantUser.query.filter_by(
        user_id=user_id, tenant_id=tid
    ).first_or_404()

    target = db.session.get(User, user_id)
    if target:
        target.is_active = False

    db.session.delete(membership)
    db.session.commit()

    audit_fields = (
        _sl(str(user_id)),
        _sl(str(acting_user_id)),
        _sl(request.remote_addr),
    )
    _log.warning(
        "[SECURITY] users.access.revoked target_user_id=%s admin_user_id=%s ip=%s",
        *audit_fields,
    )
    flash(_("Access revoked."), "success")
    return redirect(url_for("users.list_users"))
364# ── Revoke pending invitation ─────────────────────────────────────────────────
@users_bp.route("/invite/<int:inv_id>/revoke", methods=["POST"])
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def revoke_invite(inv_id: int) -> ResponseReturnValue:
    """Delete an invitation belonging to the current tenant.

    Responds 404 when no invitation with *inv_id* exists in the caller's
    tenant; otherwise deletes it and redirects to the user list.
    """
    lookup = UserInvitation.query.filter_by(id=inv_id, tenant_id=_tenant_id())
    invitation = lookup.first_or_404()

    db.session.delete(invitation)
    db.session.commit()

    flash(_("Invitation revoked."), "success")
    return redirect(url_for("users.list_users"))
379# ── Update aircraft access ────────────────────────────────────────────────────
@users_bp.route("/<int:user_id>/aircraft-access", methods=["POST"])
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def update_aircraft_access(user_id: int) -> ResponseReturnValue:
    """Set which of the tenant's aircraft a member may access.

    The submitted ``aircraft_ids`` list becomes the member's complete set of
    per-aircraft grants within the caller's tenant; ids that do not belong to
    the tenant are ignored. Grants that remain selected are left untouched so
    any custom ``permissions_mask`` configured on them survives. Only
    deselected grants are deleted and only newly selected ones are created.

    Responds 404 when *user_id* is not a member of the tenant and 400 when an
    id is not an integer. Owners and admins are left unchanged because they
    bypass the access table.
    """
    tid = _tenant_id()
    tu = TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404()

    # Owners/admins bypass the access table — no rows needed
    if tu.role in (Role.ADMIN, Role.OWNER):
        flash(_("Owners and admins always have full fleet access."), "info")
        return redirect(url_for("users.list_users"))

    raw_ids = request.form.getlist("aircraft_ids")
    try:
        requested_ids = {int(x) for x in raw_ids if x}
    except ValueError:
        abort(400)

    # One query yields both the tenant filter for the requested ids and the
    # scope for the existing rows.
    tenant_aircraft_ids = {
        ac.id for ac in Aircraft.query.filter_by(tenant_id=tid).all()
    }
    valid_ids = requested_ids & tenant_aircraft_ids

    existing = UserAircraftAccess.query.filter(
        UserAircraftAccess.user_id == user_id,
        UserAircraftAccess.aircraft_id.in_(list(tenant_aircraft_ids)),
    ).all()

    # Diff instead of delete-all/recreate: a row that stays selected keeps its
    # permissions_mask; deselected (or duplicate) rows are removed.
    kept_ids: set[int] = set()
    for row in existing:
        if row.aircraft_id in valid_ids and row.aircraft_id not in kept_ids:
            kept_ids.add(row.aircraft_id)
        else:
            db.session.delete(row)

    for acid in valid_ids - kept_ids:
        db.session.add(UserAircraftAccess(user_id=user_id, aircraft_id=acid))

    db.session.commit()
    flash(_("Aircraft access updated."), "success")
    return redirect(url_for("users.list_users"))
426# ── Toggle all-planes access ──────────────────────────────────────────────────
@users_bp.route("/<int:user_id>/all-planes", methods=["POST"])
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def toggle_all_planes(user_id: int) -> ResponseReturnValue:
    """Flip a member's tenant-wide "all aircraft" grant on or off.

    Responds 404 when *user_id* is not a member of the caller's tenant.
    Owners and admins are left unchanged. Otherwise the member's
    UserAllAircraftAccess row is deleted if present and created if absent.
    """
    tid = _tenant_id()
    membership = TenantUser.query.filter_by(
        user_id=user_id, tenant_id=tid
    ).first_or_404()

    has_full_access_role = membership.role in (Role.ADMIN, Role.OWNER)
    if has_full_access_role:
        flash(_("Owners and admins always have full fleet access."), "info")
        return redirect(url_for("users.list_users"))

    grant = UserAllAircraftAccess.query.filter_by(
        user_id=user_id, tenant_id=tid
    ).first()
    if not grant:
        db.session.add(UserAllAircraftAccess(user_id=user_id, tenant_id=tid))
    else:
        db.session.delete(grant)

    db.session.commit()
    flash(_("All-aircraft access updated."), "success")
    return redirect(url_for("users.list_users"))
452# ── Toggle user capability flags ─────────────────────────────────────────────
@users_bp.route("/<int:user_id>/flags", methods=["POST"])
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def update_user_flags(user_id: int) -> ResponseReturnValue:
    """Set a member's ``is_pilot``, ``is_maintenance`` and ``view_only`` flags.

    Each flag becomes True when a form field of the same name is present and
    False otherwise. Responds 404 when *user_id* is not a member of the
    caller's tenant or the User row is missing; refuses to edit the caller's
    own flags.
    """
    tid = _tenant_id()
    # Membership check only; the row itself is not needed.
    TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404()

    editing_self = user_id == session["user_id"]
    if editing_self:
        flash(_("You cannot change your own capability flags."), "danger")
        return redirect(url_for("users.list_users"))

    target = db.session.get(User, user_id)
    if not target:
        abort(404)

    submitted = request.form
    for flag_name in ("is_pilot", "is_maintenance", "view_only"):
        setattr(target, flag_name, flag_name in submitted)

    db.session.commit()
    flash(_("User flags updated."), "success")
    return redirect(url_for("users.list_users"))
477# ── Per-aircraft permission editor ───────────────────────────────────────────
# Permission bits offered by the editor, as (bit value, form key, display label).
_PERM_BITS: list[tuple[int, str, str]] = [
    (PermissionBit.VIEW_AIRCRAFT, "view_aircraft", "View"),
    (PermissionBit.EDIT_AIRCRAFT, "edit_aircraft", "Edit aircraft"),
    (PermissionBit.READ_MAINT_FULL, "read_maint_full", "Full maintenance"),
    (PermissionBit.READ_MAINT_LIMITED, "read_maint_limited", "Limited maintenance"),
    (PermissionBit.WRITE_MAINTENANCE, "write_maintenance", "Write maintenance"),
    (PermissionBit.EDIT_COMPONENTS, "edit_components", "Edit components"),
    (PermissionBit.WRITE_LOGBOOK, "write_logbook", "Write logbook"),
    (PermissionBit.RESERVE_AIRCRAFT, "reserve_aircraft", "Reserve"),
]

# Just the bit values, in the same order as _PERM_BITS. Indexing is used
# rather than unpacking into `_`, which this module binds to gettext.
_BIT_VALUES = [entry[0] for entry in _PERM_BITS]
@users_bp.route("/<int:user_id>/permissions", methods=["GET", "POST"])
@login_required
@require_role(Role.ADMIN, Role.OWNER)
def edit_permissions(user_id: int) -> ResponseReturnValue:
    """Show or save a member's custom permission masks.

    Masks live on the member's existing access grants: the tenant-wide
    "all aircraft" grant and each per-aircraft grant. A mask equal to the
    role's default is stored as None ("no customisation"). POST reads
    checkboxes named ``bit_<scope>_<bit>`` where scope is ``all`` or an
    aircraft id, saves, and redirects back to this page; GET renders the
    editor.

    Responds 404 when *user_id* is not a member of the caller's tenant or the
    User row is missing. Owners and admins are redirected to the user list.
    """
    tid = _tenant_id()
    tu = TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404()
    user = db.session.get(User, user_id)
    if not user:
        abort(404)

    if tu.role in (Role.ADMIN, Role.OWNER):
        notice = _(
            "Owners and admins always have full fleet access — no custom permissions needed."
        )
        flash(notice, "info")
        return redirect(url_for("users.list_users"))

    fleet = Aircraft.query.filter_by(tenant_id=tid).order_by(Aircraft.registration).all()
    fleet_grant = UserAllAircraftAccess.query.filter_by(
        user_id=user_id, tenant_id=tid
    ).first()

    # aircraft id -> the member's per-aircraft access row
    grant_by_aircraft = {}
    per_aircraft_grants = UserAircraftAccess.query.filter(
        UserAircraftAccess.user_id == user_id,
        UserAircraftAccess.aircraft_id.in_([plane.id for plane in fleet]),
    ).all()
    for grant in per_aircraft_grants:
        grant_by_aircraft[grant.aircraft_id] = grant

    default_mask = PermissionBit.ROLE_DEFAULTS.get(tu.role.value, 0)

    def _mask_from_form(scope: str) -> int | None:
        """OR together the checked bits for *scope*; None means "role default"."""
        combined = 0
        for bit in _BIT_VALUES:
            field = f"bit_{scope}_{bit}"
            if request.form.get(field):
                combined |= bit
        if combined == default_mask:
            return None
        return combined

    if request.method == "POST":
        if fleet_grant:
            fleet_grant.permissions_mask = _mask_from_form("all")
        for plane in fleet:
            grant = grant_by_aircraft.get(plane.id)
            if grant:
                grant.permissions_mask = _mask_from_form(str(plane.id))
        db.session.commit()
        flash(_("Permissions updated."), "success")
        return redirect(url_for("users.edit_permissions", user_id=user_id))

    def _active_bits(mask: int | None) -> set[int]:
        """Return the known bits set in *mask*, using the role default for None."""
        effective = default_mask if mask is None else mask
        return {bit for bit in _BIT_VALUES if effective & bit}

    if fleet_grant:
        fleet_bits = _active_bits(fleet_grant.permissions_mask)
    else:
        fleet_bits = set()

    bits_by_aircraft: dict[int, set[int]] = {}
    custom_by_aircraft: dict[int, bool] = {}
    for plane in fleet:
        if plane.id not in grant_by_aircraft:
            continue
        stored_mask = grant_by_aircraft[plane.id].permissions_mask
        bits_by_aircraft[plane.id] = _active_bits(stored_mask)
        custom_by_aircraft[plane.id] = stored_mask is not None

    return render_template(
        "users/permissions.html",
        tu=tu,
        user=user,
        all_aircraft=fleet,
        all_planes_row=fleet_grant,
        all_planes_bits=fleet_bits,
        all_planes_is_custom=fleet_grant is not None
        and fleet_grant.permissions_mask is not None,
        per_aircraft_bits=bits_by_aircraft,
        per_aircraft_is_custom=custom_by_aircraft,
        aircraft_access=grant_by_aircraft,
        perm_bits=_PERM_BITS,
        role_default_mask=default_mask,
        role_defaults_json=_json.dumps(PermissionBit.ROLE_DEFAULTS),
    )