Coverage for app/users/routes.py: 100%

275 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1""" 

2Users blueprint — user management, invitations, and role changes. 

3Only ADMIN/OWNER roles can manage users; the invitation-accept route is public. 

4""" 

5 

6import json as _json 

7import logging as _logging 

8import os 

9from datetime import datetime, timedelta, timezone 

10 

11import pw_hash as _pw # pyright: ignore[reportMissingImports] 

12from flask import ( # pyright: ignore[reportMissingImports] 

13 Blueprint, 

14 abort, 

15 current_app, 

16 flash, 

17 redirect, 

18 render_template, 

19 request, 

20 session, 

21 url_for, 

22) 

23from flask.typing import ResponseReturnValue # pyright: ignore[reportMissingImports] 

24from flask_babel import gettext as _ # pyright: ignore[reportMissingImports] 

25 

26from models import ( 

27 Aircraft, 

28 PermissionBit, 

29 Role, 

30 TenantUser, 

31 User, 

32 UserAircraftAccess, 

33 UserAllAircraftAccess, 

34 UserInvitation, 

35 db, 

36) 

37from utils import activity, login_required, require_role 

38 

39users_bp = Blueprint("users", __name__, url_prefix="/config/users") 

40_log = _logging.getLogger("openhangar.users") 

41 

42_INVITATION_EXPIRY_DAYS = 7 

43 

44 

45def _sl(value: object) -> str: 

46 """Sanitize a value for log output — strips CR/LF to prevent log injection (CWE-117).""" 

47 return str(value).replace("\r\n", "").replace("\n", "").replace("\r", "") 

48 

49 

50@users_bp.before_request 

51def _block_in_demo() -> None: 

52 if os.environ.get("OPENHANGAR_ENV") == "demo": 

53 abort(403) 

54 

55 

56ROLE_LABELS = { 

57 Role.ADMIN: "Admin", 

58 Role.OWNER: "Owner", 

59 Role.PILOT: "Pilot / Renter", 

60 Role.STUDENT: "Student", 

61 Role.INSTRUCTOR: "Instructor", 

62 Role.MAINTENANCE: "Maintenance", 

63 Role.VIEWER: "Viewer", 

64} 

65 

66 

67def _tenant_id() -> int: 

68 tu = TenantUser.query.filter_by(user_id=session["user_id"]).first() 

69 if not tu: 

70 abort(403) # pragma: no cover 

71 return int(tu.tenant_id) 

72 

73 

74# ── User list ───────────────────────────────────────────────────────────────── 

75 

76 

77@users_bp.route("/") 

78@login_required 

79@require_role(Role.ADMIN, Role.OWNER) 

80def list_users() -> ResponseReturnValue: 

81 tid = _tenant_id() 

82 tenant_users = TenantUser.query.filter_by(tenant_id=tid).join(User).all() 

83 invitations = ( 

84 UserInvitation.query.filter_by(tenant_id=tid) 

85 .filter(UserInvitation.accepted_at.is_(None)) 

86 .order_by(UserInvitation.created_at.desc()) 

87 .all() 

88 ) 

89 all_aircraft = ( 

90 Aircraft.query.filter_by(tenant_id=tid).order_by(Aircraft.registration).all() 

91 ) 

92 # Build per-user set of accessible aircraft IDs for the template 

93 access_rows = UserAircraftAccess.query.filter( 

94 UserAircraftAccess.aircraft_id.in_([ac.id for ac in all_aircraft]) 

95 ).all() 

96 user_aircraft_ids: dict[int, set[int]] = {} 

97 for row in access_rows: 

98 user_aircraft_ids.setdefault(row.user_id, set()).add(row.aircraft_id) 

99 # Build per-user all-planes flag 

100 all_planes_user_ids: set[int] = { 

101 row.user_id 

102 for row in UserAllAircraftAccess.query.filter_by(tenant_id=tid).all() 

103 } 

104 return render_template( 

105 "users/list.html", 

106 tenant_users=tenant_users, 

107 invitations=invitations, 

108 role_labels=ROLE_LABELS, 

109 current_user_id=session["user_id"], 

110 all_roles=[r for r in Role if r not in (Role.ADMIN,)], 

111 all_aircraft=all_aircraft, 

112 user_aircraft_ids=user_aircraft_ids, 

113 all_planes_user_ids=all_planes_user_ids, 

114 Role=Role, 

115 ) 

116 

117 

118# ── Invite ──────────────────────────────────────────────────────────────────── 

119 

120 

121@users_bp.route("/invite", methods=["POST"]) 

122@login_required 

123@require_role(Role.ADMIN, Role.OWNER) 

124def invite() -> ResponseReturnValue: 

125 tid = _tenant_id() 

126 

127 # Multi-invite: names, emails, roles, display_names are parallel lists 

128 emails = [e.strip().lower() or None for e in request.form.getlist("email")] 

129 roles_raw = request.form.getlist("role") 

130 display_names = [n.strip() or None for n in request.form.getlist("display_name")] 

131 aircraft_ids_lists = request.form.getlist("aircraft_ids") 

132 

133 if not emails: 

134 flash(_("No invitations to create."), "warning") 

135 return redirect(url_for("users.list_users")) 

136 

137 created_urls: list[str] = [] 

138 

139 for i, email_raw in enumerate(emails): 

140 role_raw = roles_raw[i] if i < len(roles_raw) else Role.PILOT.value 

141 display_name = display_names[i] if i < len(display_names) else None 

142 

143 try: 

144 role = Role(role_raw) 

145 except ValueError: 

146 role = Role.PILOT 

147 if role == Role.ADMIN: 

148 role = Role.OWNER 

149 

150 invited_aircraft_ids: list[int] | None = None 

151 if role not in (Role.ADMIN, Role.OWNER): 

152 raw_ids = ( 

153 aircraft_ids_lists[i].split(",") if i < len(aircraft_ids_lists) else [] 

154 ) 

155 try: 

156 invited_aircraft_ids = [int(x) for x in raw_ids if x.strip()] 

157 except ValueError: 

158 invited_aircraft_ids = [] 

159 

160 inv = UserInvitation( 

161 tenant_id=tid, 

162 invited_by_user_id=session["user_id"], 

163 email=email_raw, 

164 display_name=display_name, 

165 role=role, 

166 aircraft_ids=invited_aircraft_ids, 

167 expires_at=datetime.now(timezone.utc) 

168 + timedelta(days=_INVITATION_EXPIRY_DAYS), 

169 ) 

170 db.session.add(inv) 

171 db.session.flush() 

172 activity( 

173 "user.invited", invitation_id=inv.id, email=email_raw or "", role=str(role) 

174 ) 

175 

176 accept_url = url_for("users.accept_invite", token=inv.token, _external=True) 

177 created_urls.append(accept_url) 

178 

179 if email_raw: 

180 _try_send_invite_email(email_raw, accept_url, role, display_name) 

181 

182 db.session.commit() 

183 

184 if len(created_urls) == 1: 

185 flash( 

186 _("Invitation created. Share this link: %(url)s", url=created_urls[0]), 

187 "success", 

188 ) 

189 else: 

190 flash( 

191 _("%(n)s invitations created.", n=len(created_urls)), 

192 "success", 

193 ) 

194 return redirect(url_for("users.list_users")) 

195 

196 

197def _try_send_invite_email( 

198 to: str, accept_url: str, role: Role, display_name: str | None = None 

199) -> None: 

200 try: 

201 from services.email_service import send_email # pyright: ignore[reportMissingImports] 

202 

203 greeting = f"Hi {display_name},\n\n" if display_name else "" 

204 send_email( 

205 to=to, 

206 subject=_("You've been invited to OpenHangar"), 

207 text_body=( 

208 f"{greeting}You have been invited to join an OpenHangar hangar as {ROLE_LABELS[role]}.\n\n" 

209 f"Accept your invitation here:\n{accept_url}\n\n" 

210 f"This link expires in {_INVITATION_EXPIRY_DAYS} days." 

211 ), 

212 ) 

213 except Exception: 

214 current_app.logger.warning( 

215 "Failed to send invitation email to %s", to, exc_info=True 

216 ) 

217 

218 

219# ── Accept invitation ───────────────────────────────────────────────────────── 

220 

221 

222@users_bp.route("/invite/<token>", methods=["GET", "POST"]) 

223def accept_invite(token: str) -> ResponseReturnValue: 

224 inv = UserInvitation.query.filter_by(token=token).first_or_404() 

225 

226 if inv.is_accepted: 

227 flash(_("This invitation has already been used."), "warning") 

228 return redirect(url_for("auth.login")) 

229 

230 if inv.is_expired: 

231 flash(_("This invitation has expired."), "danger") 

232 return redirect(url_for("auth.login")) 

233 

234 if request.method == "GET": 

235 return render_template( 

236 "users/invite_accept.html", 

237 invitation=inv, 

238 role_labels=ROLE_LABELS, 

239 prefill_email="", 

240 ) 

241 

242 # POST — create user 

243 email = request.form.get("email", "").strip().lower() 

244 password = request.form.get("password", "") 

245 password2 = request.form.get("password2", "") 

246 

247 errors = [] 

248 if not email or "@" not in email: 

249 errors.append(_("A valid email address is required.")) 

250 if len(password) < 12: 

251 errors.append(_("Password must be at least 12 characters.")) 

252 if password != password2: 

253 errors.append(_("Passwords do not match.")) 

254 if User.query.filter_by(email=email).first(): 

255 errors.append(_("An account with this email already exists.")) 

256 

257 if errors: 

258 for msg in errors: 

259 flash(msg, "danger") 

260 return render_template( 

261 "users/invite_accept.html", 

262 invitation=inv, 

263 role_labels=ROLE_LABELS, 

264 prefill_email=email, 

265 ) 

266 

267 user = User( 

268 email=email, 

269 password_hash=_pw.hash(password), 

270 is_active=True, 

271 ) 

272 db.session.add(user) 

273 db.session.flush() 

274 

275 db.session.add( 

276 TenantUser( 

277 user_id=user.id, 

278 tenant_id=inv.tenant_id, 

279 role=inv.role, 

280 ) 

281 ) 

282 

283 # Grant per-aircraft access for non-owner roles 

284 if inv.role not in (Role.ADMIN, Role.OWNER) and inv.aircraft_ids: 

285 for acid in inv.aircraft_ids: 

286 db.session.add(UserAircraftAccess(user_id=user.id, aircraft_id=acid)) 

287 

288 inv.accepted_at = datetime.now(timezone.utc) 

289 db.session.commit() 

290 activity( 

291 "user.invite_accepted", invitation_id=inv.id, email=email, user_id_new=user.id 

292 ) 

293 

294 flash(_("Account created. You can now log in."), "success") 

295 return redirect(url_for("auth.login")) 

296 

297 

298# ── Change role ─────────────────────────────────────────────────────────────── 

299 

300 

301@users_bp.route("/<int:user_id>/role", methods=["POST"]) 

302@login_required 

303@require_role(Role.ADMIN, Role.OWNER) 

304def change_role(user_id: int) -> ResponseReturnValue: 

305 tid = _tenant_id() 

306 if user_id == session["user_id"]: 

307 flash(_("You cannot change your own role."), "danger") 

308 return redirect(url_for("users.list_users")) 

309 

310 tu = TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404() 

311 role_raw = request.form.get("role", "") 

312 try: 

313 new_role = Role(role_raw) 

314 except ValueError: 

315 abort(400) 

316 if new_role == Role.ADMIN: 

317 abort(400) 

318 old_role = tu.role 

319 tu.role = new_role 

320 # When promoted to owner/admin, per-aircraft access rows are no longer needed 

321 if new_role in (Role.ADMIN, Role.OWNER): 

322 UserAircraftAccess.query.filter_by(user_id=user_id).delete() 

323 db.session.commit() 

324 _log.warning( 

325 "[SECURITY] users.role.changed target_user_id=%s old_role=%s new_role=%s admin_user_id=%s ip=%s", 

326 _sl(str(user_id)), 

327 _sl(old_role.value), 

328 _sl(new_role.value), 

329 _sl(str(session["user_id"])), 

330 _sl(request.remote_addr), 

331 ) 

332 flash(_("Role updated."), "success") 

333 return redirect(url_for("users.list_users")) 

334 

335 

336# ── Revoke access ───────────────────────────────────────────────────────────── 

337 

338 

339@users_bp.route("/<int:user_id>/revoke", methods=["POST"]) 

340@login_required 

341@require_role(Role.ADMIN, Role.OWNER) 

342def revoke_access(user_id: int) -> ResponseReturnValue: 

343 tid = _tenant_id() 

344 if user_id == session["user_id"]: 

345 flash(_("You cannot revoke your own access."), "danger") 

346 return redirect(url_for("users.list_users")) 

347 

348 tu = TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404() 

349 user = db.session.get(User, user_id) 

350 if user: 

351 user.is_active = False 

352 db.session.delete(tu) 

353 db.session.commit() 

354 _log.warning( 

355 "[SECURITY] users.access.revoked target_user_id=%s admin_user_id=%s ip=%s", 

356 _sl(str(user_id)), 

357 _sl(str(session["user_id"])), 

358 _sl(request.remote_addr), 

359 ) 

360 flash(_("Access revoked."), "success") 

361 return redirect(url_for("users.list_users")) 

362 

363 

364# ── Revoke pending invitation ───────────────────────────────────────────────── 

365 

366 

367@users_bp.route("/invite/<int:inv_id>/revoke", methods=["POST"]) 

368@login_required 

369@require_role(Role.ADMIN, Role.OWNER) 

370def revoke_invite(inv_id: int) -> ResponseReturnValue: 

371 tid = _tenant_id() 

372 inv = UserInvitation.query.filter_by(id=inv_id, tenant_id=tid).first_or_404() 

373 db.session.delete(inv) 

374 db.session.commit() 

375 flash(_("Invitation revoked."), "success") 

376 return redirect(url_for("users.list_users")) 

377 

378 

379# ── Update aircraft access ──────────────────────────────────────────────────── 

380 

381 

382@users_bp.route("/<int:user_id>/aircraft-access", methods=["POST"]) 

383@login_required 

384@require_role(Role.ADMIN, Role.OWNER) 

385def update_aircraft_access(user_id: int) -> ResponseReturnValue: 

386 tid = _tenant_id() 

387 tu = TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404() 

388 

389 # Owners/admins bypass the access table — no rows needed 

390 if tu.role in (Role.ADMIN, Role.OWNER): 

391 flash(_("Owners and admins always have full fleet access."), "info") 

392 return redirect(url_for("users.list_users")) 

393 

394 raw_ids = request.form.getlist("aircraft_ids") 

395 try: 

396 new_ids = {int(x) for x in raw_ids if x} 

397 except ValueError: 

398 abort(400) 

399 

400 # Verify all aircraft belong to this tenant 

401 valid_ids = { 

402 ac.id 

403 for ac in Aircraft.query.filter( 

404 Aircraft.id.in_(new_ids), Aircraft.tenant_id == tid 

405 ).all() 

406 } 

407 

408 # Replace existing access rows for this user in this tenant 

409 existing = UserAircraftAccess.query.filter( 

410 UserAircraftAccess.user_id == user_id, 

411 UserAircraftAccess.aircraft_id.in_( 

412 [ac.id for ac in Aircraft.query.filter_by(tenant_id=tid).all()] 

413 ), 

414 ).all() 

415 for row in existing: 

416 db.session.delete(row) 

417 

418 for acid in valid_ids: 

419 db.session.add(UserAircraftAccess(user_id=user_id, aircraft_id=acid)) 

420 

421 db.session.commit() 

422 flash(_("Aircraft access updated."), "success") 

423 return redirect(url_for("users.list_users")) 

424 

425 

426# ── Toggle all-planes access ────────────────────────────────────────────────── 

427 

428 

429@users_bp.route("/<int:user_id>/all-planes", methods=["POST"]) 

430@login_required 

431@require_role(Role.ADMIN, Role.OWNER) 

432def toggle_all_planes(user_id: int) -> ResponseReturnValue: 

433 tid = _tenant_id() 

434 tu = TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404() 

435 

436 if tu.role in (Role.ADMIN, Role.OWNER): 

437 flash(_("Owners and admins always have full fleet access."), "info") 

438 return redirect(url_for("users.list_users")) 

439 

440 existing = UserAllAircraftAccess.query.filter_by( 

441 user_id=user_id, tenant_id=tid 

442 ).first() 

443 if existing: 

444 db.session.delete(existing) 

445 else: 

446 db.session.add(UserAllAircraftAccess(user_id=user_id, tenant_id=tid)) 

447 db.session.commit() 

448 flash(_("All-aircraft access updated."), "success") 

449 return redirect(url_for("users.list_users")) 

450 

451 

452# ── Toggle user capability flags ───────────────────────────────────────────── 

453 

454 

455@users_bp.route("/<int:user_id>/flags", methods=["POST"]) 

456@login_required 

457@require_role(Role.ADMIN, Role.OWNER) 

458def update_user_flags(user_id: int) -> ResponseReturnValue: 

459 tid = _tenant_id() 

460 TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404() 

461 if user_id == session["user_id"]: 

462 flash(_("You cannot change your own capability flags."), "danger") 

463 return redirect(url_for("users.list_users")) 

464 

465 user = db.session.get(User, user_id) 

466 if not user: 

467 abort(404) 

468 

469 user.is_pilot = "is_pilot" in request.form 

470 user.is_maintenance = "is_maintenance" in request.form 

471 user.view_only = "view_only" in request.form 

472 db.session.commit() 

473 flash(_("User flags updated."), "success") 

474 return redirect(url_for("users.list_users")) 

475 

476 

477# ── Per-aircraft permission editor ─────────────────────────────────────────── 

478 

479_PERM_BITS: list[tuple[int, str, str]] = [ 

480 (PermissionBit.VIEW_AIRCRAFT, "view_aircraft", "View"), 

481 (PermissionBit.EDIT_AIRCRAFT, "edit_aircraft", "Edit aircraft"), 

482 (PermissionBit.READ_MAINT_FULL, "read_maint_full", "Full maintenance"), 

483 (PermissionBit.READ_MAINT_LIMITED, "read_maint_limited", "Limited maintenance"), 

484 (PermissionBit.WRITE_MAINTENANCE, "write_maintenance", "Write maintenance"), 

485 (PermissionBit.EDIT_COMPONENTS, "edit_components", "Edit components"), 

486 (PermissionBit.WRITE_LOGBOOK, "write_logbook", "Write logbook"), 

487 (PermissionBit.RESERVE_AIRCRAFT, "reserve_aircraft", "Reserve"), 

488] 

489_BIT_VALUES = [bit for bit, _, _ in _PERM_BITS] 

490 

491 

492@users_bp.route("/<int:user_id>/permissions", methods=["GET", "POST"]) 

493@login_required 

494@require_role(Role.ADMIN, Role.OWNER) 

495def edit_permissions(user_id: int) -> ResponseReturnValue: 

496 tid = _tenant_id() 

497 tu = TenantUser.query.filter_by(user_id=user_id, tenant_id=tid).first_or_404() 

498 user = db.session.get(User, user_id) 

499 if not user: 

500 abort(404) 

501 

502 if tu.role in (Role.ADMIN, Role.OWNER): 

503 flash( 

504 _( 

505 "Owners and admins always have full fleet access — no custom permissions needed." 

506 ), 

507 "info", 

508 ) 

509 return redirect(url_for("users.list_users")) 

510 

511 all_aircraft = ( 

512 Aircraft.query.filter_by(tenant_id=tid).order_by(Aircraft.registration).all() 

513 ) 

514 all_planes_row = UserAllAircraftAccess.query.filter_by( 

515 user_id=user_id, tenant_id=tid 

516 ).first() 

517 aircraft_access = { 

518 row.aircraft_id: row 

519 for row in UserAircraftAccess.query.filter( 

520 UserAircraftAccess.user_id == user_id, 

521 UserAircraftAccess.aircraft_id.in_([ac.id for ac in all_aircraft]), 

522 ).all() 

523 } 

524 

525 role_default_mask = PermissionBit.ROLE_DEFAULTS.get(tu.role.value, 0) 

526 

527 def _parse_mask(scope: str) -> int | None: 

528 mask = 0 

529 for b in _BIT_VALUES: 

530 if request.form.get(f"bit_{scope}_{b}"): 

531 mask |= b 

532 return None if mask == role_default_mask else mask 

533 

534 if request.method == "POST": 

535 if all_planes_row: 

536 all_planes_row.permissions_mask = _parse_mask("all") 

537 for ac in all_aircraft: 

538 row = aircraft_access.get(ac.id) 

539 if row: 

540 row.permissions_mask = _parse_mask(str(ac.id)) 

541 db.session.commit() 

542 flash(_("Permissions updated."), "success") 

543 return redirect(url_for("users.edit_permissions", user_id=user_id)) 

544 

545 def _bits_for(mask: int | None) -> set[int]: 

546 m = mask if mask is not None else role_default_mask 

547 return {b for b in _BIT_VALUES if m & b} 

548 

549 all_planes_bits = ( 

550 _bits_for(all_planes_row.permissions_mask) if all_planes_row else set() 

551 ) 

552 per_aircraft_bits: dict[int, set[int]] = { 

553 ac.id: _bits_for(aircraft_access[ac.id].permissions_mask) 

554 for ac in all_aircraft 

555 if ac.id in aircraft_access 

556 } 

557 per_aircraft_is_custom: dict[int, bool] = { 

558 ac.id: aircraft_access[ac.id].permissions_mask is not None 

559 for ac in all_aircraft 

560 if ac.id in aircraft_access 

561 } 

562 

563 return render_template( 

564 "users/permissions.html", 

565 tu=tu, 

566 user=user, 

567 all_aircraft=all_aircraft, 

568 all_planes_row=all_planes_row, 

569 all_planes_bits=all_planes_bits, 

570 all_planes_is_custom=all_planes_row is not None 

571 and all_planes_row.permissions_mask is not None, 

572 per_aircraft_bits=per_aircraft_bits, 

573 per_aircraft_is_custom=per_aircraft_is_custom, 

574 aircraft_access=aircraft_access, 

575 perm_bits=_PERM_BITS, 

576 role_default_mask=role_default_mask, 

577 role_defaults_json=_json.dumps(PermissionBit.ROLE_DEFAULTS), 

578 )