Coverage for app/config/routes.py: 100%

545 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1""" 

2Configuration blueprint — backup management, email settings, and future config sections. 

3""" 

4 

5import contextlib 

6import hashlib 

7import io 

8import json 

9import logging 

10import os 

11import subprocess # nosec B404 

12import urllib.error 

13import urllib.request 

14import zipfile 

15from datetime import datetime, timezone 

16 

17from flask import ( 

18 Blueprint, 

19 abort, 

20 current_app, 

21 flash, 

22 jsonify, 

23 redirect, 

24 render_template, 

25 request, 

26 session, 

27 url_for, 

28) # pyright: ignore[reportMissingImports] 

29from flask.typing import ResponseReturnValue # pyright: ignore[reportMissingImports] 

30from flask_babel import gettext as _, ngettext # pyright: ignore[reportMissingImports] 

31 

32from models import AppSetting, BackupRecord, db # pyright: ignore[reportMissingImports] 

33from utils import login_required, require_instance_admin # pyright: ignore[reportMissingImports] 

34 

# Blueprint grouping every configuration view under the /config URL prefix.
config_bp = Blueprint(
    "config",
    __name__,
    url_prefix="/config",
)

# Module-level logger named after this module.
log = logging.getLogger(__name__)

37 

38 

39# ── helpers ─────────────────────────────────────────────────────────────────── 

40 

41 

def _derive_key(passphrase: str) -> bytes:
    """Turn *passphrase* into a 32-byte key using HKDF-SHA256.

    The salt and info inputs are fixed byte strings, so the same passphrase
    always produces the same key.
    """
    from cryptography.hazmat.primitives import hashes  # pyright: ignore[reportMissingImports]
    from cryptography.hazmat.primitives.kdf.hkdf import HKDF  # pyright: ignore[reportMissingImports]

    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=b"openhangar-backup-kdf-salt-v1",
        info=b"openhangar-backup-v1",
    )
    secret = passphrase.encode()
    return kdf.derive(secret)

53 

54 

def _encrypt_bytes(plaintext: bytes, key: bytes) -> bytes:
    """Seal *plaintext* with AES-GCM under *key*.

    Returns a fresh random 12-byte nonce followed by the AESGCM output, so the
    nonce travels with the data it protects.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM  # pyright: ignore[reportMissingImports]

    iv = os.urandom(12)
    cipher = AESGCM(key)
    sealed = cipher.encrypt(iv, plaintext, None)  # no associated data
    return iv + sealed

63 

64 

def _get_alembic_head() -> str | None:
    """Read the first ``version_num`` from the ``alembic_version`` table.

    Any failure (missing table, import error, DB error) yields ``None``.
    """
    try:
        from sqlalchemy import text  # pyright: ignore[reportMissingImports]

        statement = text("SELECT version_num FROM alembic_version LIMIT 1")
        outcome = db.session.execute(statement)
        return outcome.scalar()
    except Exception:
        # Best effort only: callers treat None as "revision unknown".
        return None

75 

76 

77def _parse_gatus_env() -> tuple[str, str, str | None] | None: 

78 """Return (base_url, endpoint_key, auth_header_or_None) from env vars, or None if not configured.""" 

79 endpoint_url = os.environ.get("OPENHANGAR_GATUS_ENDPOINT_URL", "").rstrip("/") 

80 if not endpoint_url or "/endpoints/" not in endpoint_url: 

81 return None 

82 base_url, _, endpoint_key = endpoint_url.rpartition("/endpoints/") 

83 if not base_url or not endpoint_key: 

84 return None 

85 auth_header = os.environ.get("OPENHANGAR_GATUS_AUTH_HEADER") or None 

86 return base_url, endpoint_key, auth_header 

87 

88 

def run_backup() -> BackupRecord:
    """
    Produce a ZIP backup of the PostgreSQL database and uploaded documents.

    The ZIP contains:
    - ``openhangar.sql``     — full pg_dump output
    - ``metadata.json``      — app version, Alembic head and creation time
    - ``uploads/<filename>`` — files from the uploads folder

    When ``OPENHANGAR_BACKUP_ENCRYPTION_KEY`` is set the ZIP is AES-256-GCM
    encrypted before being written to the backup folder; otherwise the plain
    ZIP is written and a warning is logged (the file keeps its ``.zip.enc``
    name either way). A ``.meta`` sidecar holding the same metadata is written
    next to the backup file.

    A ``BackupRecord`` row is committed and returned.

    Raises ``RuntimeError`` on failure; the record is still committed with
    ``status='failed'`` so operators can see the attempt, and any partially
    written backup artefacts are removed.
    """
    backup_folder = current_app.config.get("BACKUP_FOLDER", "/data/backups")
    upload_folder = current_app.config.get("UPLOAD_FOLDER", "/data/uploads")
    encryption_key_raw = os.environ.get("OPENHANGAR_BACKUP_ENCRYPTION_KEY", "")
    database_url = current_app.config.get("SQLALCHEMY_DATABASE_URI", "")

    os.makedirs(backup_folder, exist_ok=True)

    # One timestamp for both the file name and the metadata so they can never
    # disagree.
    now = datetime.now(timezone.utc)
    ts = now.strftime("%Y%m%dT%H%M%SZ")
    app_version = os.environ.get("OPENHANGAR_VERSION", "development")
    filename = f"openhangar_backup_{ts}_{app_version}.zip.enc"
    path = os.path.join(backup_folder, filename)
    # Derive the sidecar name from the file name's suffix only; a plain
    # str.replace on the full path would also rewrite a ".zip.enc" occurring
    # in the folder name or the version string.
    meta_path = os.path.join(
        backup_folder, filename.removesuffix(".zip.enc") + ".meta"
    )
    alembic_head = _get_alembic_head()
    metadata = {
        "app_version": app_version,
        "alembic_head": alembic_head,
        "created_at": now.isoformat(),
    }

    # The record starts out as "failed" and is only flipped to "ok" once every
    # artefact has been written.
    record = BackupRecord(
        filename=filename,
        path=path,
        status="failed",
        app_version=app_version,
        alembic_head=alembic_head,
    )
    db.session.add(record)
    db.session.flush()  # get an id without committing

    try:
        sql_bytes = _pg_dump(database_url)

        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
            zf.writestr("openhangar.sql", sql_bytes)
            zf.writestr("metadata.json", json.dumps(metadata, indent=2))
            _add_uploads_to_zip(zf, upload_folder)
        zip_bytes = buf.getvalue()

        if encryption_key_raw:
            key = _derive_key(encryption_key_raw)
            payload = _encrypt_bytes(zip_bytes, key)
        else:
            payload = zip_bytes
            log.warning(
                "OPENHANGAR_BACKUP_ENCRYPTION_KEY not set — backup is unencrypted"
            )

        with open(path, "wb") as fh:
            fh.write(payload)

        with open(meta_path, "w", encoding="utf-8") as fh:
            json.dump(metadata, fh, indent=2)

        record.size_bytes = len(payload)
        record.sha256 = hashlib.sha256(payload).hexdigest()
        record.status = "ok"
    except Exception as exc:
        log.exception("Backup failed: %s", exc)
        # Do not leave a truncated file behind that looks like a valid backup.
        for leftover in (path, meta_path):
            with contextlib.suppress(OSError):
                os.remove(leftover)
        db.session.commit()  # persist the status="failed" record
        raise RuntimeError(str(exc)) from exc

    db.session.commit()
    return record

171 

172 

def _add_uploads_to_zip(zf: zipfile.ZipFile, upload_folder: str) -> None:
    """Add every file below *upload_folder* into the ZIP under ``uploads/``.

    The folder is walked recursively so files stored in sub-directories are
    included; the relative path is preserved inside the archive using forward
    slashes (``uploads/<subdir>/<name>``). Entries are added in sorted order so
    the archive layout is deterministic. Does nothing when *upload_folder* is
    not a directory.
    """
    if not os.path.isdir(upload_folder):
        return
    for dirpath, dirnames, filenames in os.walk(upload_folder):
        dirnames.sort()  # in-place sort steers os.walk's traversal order
        rel_dir = os.path.relpath(dirpath, upload_folder)
        for fname in sorted(filenames):
            full_path = os.path.join(dirpath, fname)
            # Skip anything that is not a readable regular file (for example a
            # dangling symlink), which zf.write() could not archive.
            if not os.path.isfile(full_path):
                continue
            rel_path = fname if rel_dir == os.curdir else os.path.join(rel_dir, fname)
            arcname = "uploads/" + rel_path.replace(os.sep, "/")
            zf.write(full_path, arcname=arcname)

180 

181 

def _pg_dump(database_url: str) -> bytes:
    """Run pg_dump against *database_url* and return the SQL as bytes.

    Only URLs starting with ``postgresql`` are accepted. A SQLAlchemy driver
    suffix (``postgresql+psycopg2://…``) is stripped before the URL is handed
    to pg_dump, because libpq only understands the plain ``postgresql://``
    scheme.

    Raises ``RuntimeError`` for an unsupported scheme or when pg_dump exits
    with a non-zero status (the message is pg_dump's stderr). The error for an
    unsupported scheme names the scheme only — never the full URL, which may
    contain credentials and ends up in logs and flash messages.
    """
    scheme, sep, remainder = database_url.partition("://")
    if not database_url.startswith("postgresql"):
        shown = scheme if sep else "<unknown>"
        raise RuntimeError(f"Unsupported database URL scheme: {shown!r}")

    dsn = database_url
    if sep and scheme.startswith("postgresql+"):
        dsn = "postgresql://" + remainder

    env = os.environ.copy()
    env["DATABASE_URL"] = database_url
    # --no-password: never prompt; credentials come from the connection string.
    cmd = ["pg_dump", "--no-password", dsn]

    result = subprocess.run(  # nosec B603 # fixed list, no shell, DB URL from server config
        cmd,
        capture_output=True,
        env=env,
        timeout=120,
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode(errors="replace"))
    return result.stdout

201 

202 

203# ── views ───────────────────────────────────────────────────────────────────── 

204 

205 

@config_bp.before_request
def _block_in_demo() -> None:
    """Gate every request to this blueprint.

    * In the demo environment (``OPENHANGAR_ENV == "demo"``) everything is 403.
    * Requests without a logged-in user pass through untouched; the individual
      views deal with them.
    * The notification-preferences view is open to any logged-in user.
    * Otherwise the user must be an instance admin or hold the ADMIN/OWNER
      role, else 403.
    """
    if os.environ.get("OPENHANGAR_ENV") == "demo":
        abort(403)

    uid = session.get("user_id")
    if not uid:
        return

    # All logged-in users may manage their own notification preferences
    if request.endpoint == "config.notification_preferences":
        return

    from models import Role, User  # pyright: ignore[reportMissingImports]
    from utils import current_user_role  # pyright: ignore[reportMissingImports]

    account = db.session.get(User, uid)
    # Instance admins always pass — they may not have a tenant role
    if account and account.is_instance_admin:
        return

    privileged = (Role.ADMIN, Role.OWNER)
    if current_user_role() not in privileged:
        abort(403)

223 

224 

@config_bp.route("/")
def index() -> ResponseReturnValue:
    """Render the settings overview page (``config/settings.html``).

    Anonymous visitors are redirected to the login page. For logged-in users
    the view gathers: the ten most recent backups, per-role user counts and
    open invitations for the user's tenant, version/update information, the
    database and upload-folder sizes, tenant data and the one-click-upgrade
    state. Each of the size/version lookups is best effort: a failure is
    logged at debug level and the value is rendered as missing.
    """
    if not session.get("user_id"):
        return redirect(url_for("auth.login"))
    from sqlalchemy import func, text  # pyright: ignore[reportMissingImports]

    from models import Role, Tenant, TenantUser, User, UserInvitation  # pyright: ignore[reportMissingImports]
    from services.email_service import get_email_health, get_smtp_status  # pyright: ignore[reportMissingImports]

    # ── backups ──────────────────────────────────────────────────────────────
    _BACKUP_DISPLAY_LIMIT = 10
    total_backups = BackupRecord.query.count()
    records = (
        BackupRecord.query.order_by(BackupRecord.created_at.desc())
        .limit(_BACKUP_DISPLAY_LIMIT)
        .all()
    )
    backup_extra = max(0, total_backups - _BACKUP_DISPLAY_LIMIT)

    # ── users of the current tenant ──────────────────────────────────────────
    _role_labels = {
        Role.ADMIN: "Admin",
        Role.OWNER: "Owner",
        Role.PILOT: "Pilot / Renter",
        Role.MAINTENANCE: "Maintenance",
        Role.VIEWER: "Viewer",
    }
    tu_self = TenantUser.query.filter_by(user_id=session["user_id"]).first()
    tid = tu_self.tenant_id if tu_self else None
    user_counts = []
    open_invitations = 0
    if tid:
        results = (
            db.session.query(TenantUser.role, func.count(TenantUser.user_id))
            .join(User, TenantUser.user_id == User.id)
            .filter(TenantUser.tenant_id == tid, User.is_active.is_(True))
            .group_by(TenantUser.role)
            .all()
        )
        counts_by_role = dict(results)
        # Roles without an explicit label fall back to a title-cased member
        # name instead of raising KeyError and breaking the whole page.
        user_counts = [
            (
                _role_labels.get(r)
                or str(getattr(r, "name", r)).replace("_", " ").title(),
                counts_by_role[r],
            )
            for r in Role
            if counts_by_role.get(r, 0) > 0
        ]
        open_invitations = (
            UserInvitation.query.filter_by(tenant_id=tid)
            .filter(UserInvitation.accepted_at.is_(None))
            .count()
        )

    # ── version information ──────────────────────────────────────────────────
    current_version = os.environ.get("OPENHANGAR_VERSION", "development")
    latest_setting = db.session.get(AppSetting, "latest_version")
    latest_version = latest_setting.value if latest_setting else None
    try:
        from packaging.version import Version  # pyright: ignore[reportMissingImports]

        update_available = bool(
            latest_version
            and current_version != "development"
            and Version(latest_version) > Version(current_version)
        )
    except Exception:
        # Unparseable version strings simply mean "no update banner".
        update_available = False
    versions_behind: int | None = None
    try:
        _all_v_setting = db.session.get(AppSetting, "all_versions")
        if _all_v_setting and current_version != "development":
            _all_versions = json.loads(_all_v_setting.value)
            if isinstance(_all_versions, list) and current_version in _all_versions:
                # The position of the running version in the stored list is
                # the number of releases listed before it.
                _idx = _all_versions.index(current_version)
                if _idx > 0:
                    versions_behind = _idx
    except Exception as exc:
        log.debug("Could not compute versions-behind count: %s", exc)

    # ── storage figures ──────────────────────────────────────────────────────
    db_size: str | None = None
    try:
        _res = db.session.execute(
            text("SELECT pg_size_pretty(pg_database_size(current_database()))")
        ).scalar()
        db_size = str(_res) if _res is not None else None
    except Exception as exc:
        log.debug("Could not retrieve DB size: %s", exc)
    upload_size_bytes: int | None = None
    try:
        _upload_folder = current_app.config.get("UPLOAD_FOLDER", "/data/uploads")
        if os.path.isdir(_upload_folder):
            # Walk recursively: uploads may live in per-tenant sub-folders.
            upload_size_bytes = sum(
                os.path.getsize(os.path.join(_dirpath, _fname))
                for _dirpath, _dirnames, _filenames in os.walk(_upload_folder)
                for _fname in _filenames
                if os.path.isfile(os.path.join(_dirpath, _fname))
            )
    except Exception as exc:
        log.debug("Could not retrieve upload folder size: %s", exc)

    # ── tenant / upgrade state ───────────────────────────────────────────────
    current_user = db.session.get(User, session["user_id"])
    tenant_count = Tenant.query.count()
    _tenant = db.session.get(Tenant, tid) if tid else None
    upgrade_dir = os.environ.get("OPENHANGAR_UPGRADE_DIR", "")
    upgrade_dir_enabled = bool(upgrade_dir)
    upgrade_active = False
    if upgrade_dir:
        upgrade_active = os.path.exists(
            os.path.join(upgrade_dir, "trigger")
        ) or os.path.exists(os.path.join(upgrade_dir, "trigger.running"))

    openaip_setting = db.session.get(AppSetting, "openaip_api_key")
    openaip_api_key = openaip_setting.value if openaip_setting else None

    return render_template(
        "config/settings.html",
        records=records,
        backup_extra=backup_extra,
        backup_encryption_key_set=bool(
            os.environ.get("OPENHANGAR_BACKUP_ENCRYPTION_KEY")
        ),
        backup_folder=current_app.config.get("BACKUP_FOLDER", "/data/backups"),
        smtp_status=get_smtp_status(),
        email_health=get_email_health(),
        user_counts=user_counts,
        open_invitations=open_invitations,
        current_version=current_version,
        latest_version=latest_version,
        update_available=update_available,
        versions_behind=versions_behind,
        db_size=db_size,
        upload_size_bytes=upload_size_bytes,
        current_user=current_user,
        tenant_count=tenant_count,
        tenant=_tenant,
        openaip_api_key=openaip_api_key,
        gatus_configured=_parse_gatus_env() is not None,
        upgrade_dir_enabled=upgrade_dir_enabled,
        upgrade_active=upgrade_active,
    )

358 

359 

@config_bp.route("/tenant-slug", methods=["POST"])
@login_required
def update_tenant_slug() -> ResponseReturnValue:
    """Change the current tenant's slug ("Hangar ID").

    The submitted value is lower-cased, every run of characters outside
    ``[a-z0-9]`` becomes a single ``-``, leading/trailing dashes are dropped
    and the result is cut to 64 characters. Empty or already-used slugs are
    rejected with a flash message. When the slug actually changes, the
    tenant's upload directory is renamed (or merged into an existing target
    directory) and stored ``Document`` / ``PendingReconcile`` paths that start
    with the old slug are rewritten before the session is committed.
    """
    import re as _re
    import shutil
    from models import Document, PendingReconcile, Tenant, TenantUser  # pyright: ignore[reportMissingImports]

    membership = TenantUser.query.filter_by(user_id=session["user_id"]).first()
    if not membership:
        abort(403)  # pragma: no cover
    tenant = db.session.get(Tenant, membership.tenant_id)
    if not tenant:
        abort(403)  # pragma: no cover

    submitted = request.form.get("slug", "").strip().lower()
    if not submitted:
        flash(_("Hangar ID cannot be empty."), "danger")
        return redirect(url_for("config.index"))

    collapsed = _re.sub(r"[^a-z0-9]+", "-", submitted)
    slug = collapsed.strip("-")[:64]
    if not slug:
        flash(_("Hangar ID must contain at least one letter or digit."), "danger")
        return redirect(url_for("config.index"))

    clash = Tenant.query.filter(Tenant.slug == slug, Tenant.id != tenant.id).first()
    if clash:
        flash(_("That Hangar ID is already in use. Please choose another."), "danger")
        return redirect(url_for("config.index"))

    previous = tenant.slug
    tenant.slug = slug

    if previous and previous != slug:
        from documents.routes import _safe_join  # pyright: ignore[reportMissingImports]

        uploads_root = current_app.config.get("UPLOAD_FOLDER", "/data/uploads")
        src_root = _safe_join(uploads_root, previous)
        dst_root = _safe_join(uploads_root, slug)
        if os.path.isdir(src_root):
            if not os.path.isdir(dst_root):
                os.rename(src_root, dst_root)
            else:
                # Destination already exists — merge file-by-file
                for current, _subdirs, names in os.walk(src_root):
                    target = os.path.join(
                        dst_root, os.path.relpath(current, src_root)
                    )
                    os.makedirs(target, exist_ok=True)
                    for name in names:
                        shutil.move(
                            os.path.join(current, name),
                            os.path.join(target, name),
                        )
                shutil.rmtree(src_root, ignore_errors=True)

        # Rewrite stored paths in the database
        old_prefix = previous + "/"
        new_prefix = slug + "/"
        skip = len(old_prefix)
        stale_docs = Document.query.filter(
            Document.filename.like(previous + "/%")
        ).all()
        for doc in stale_docs:
            doc.filename = new_prefix + doc.filename[skip:]
        stale_pending = PendingReconcile.query.filter(
            PendingReconcile.filepath.like(previous + "/%")
        ).all()
        for pending in stale_pending:
            pending.filepath = new_prefix + pending.filepath[skip:]

    db.session.commit()
    flash(_("Hangar ID saved."), "success")
    return redirect(url_for("config.index"))

427 

428 

@config_bp.route("/map-tiles", methods=["POST"])
@login_required
def update_map_tiles() -> ResponseReturnValue:
    """Store or remove the ``openaip_api_key`` application setting.

    A non-empty submitted key is saved (created or overwritten); an empty one
    deletes the stored setting if there is one. Either way the user is sent
    back to the settings page with a flash message.
    """
    # ADMIN/OWNER enforcement is handled by config_bp.before_request.
    submitted = request.form.get("openaip_api_key", "").strip()
    stored = db.session.get(AppSetting, "openaip_api_key")

    if not submitted:
        if stored:
            db.session.delete(stored)
            db.session.commit()
        flash(_("OpenAIP API key removed."), "success")
        return redirect(url_for("config.index"))

    if stored:
        stored.value = submitted
    else:
        db.session.add(AppSetting(key="openaip_api_key", value=submitted))
    db.session.commit()
    flash(_("OpenAIP API key saved."), "success")
    return redirect(url_for("config.index"))

448 

449 

@config_bp.route("/run", methods=["POST"])
def run_backup_now() -> ResponseReturnValue:
    """Run a backup immediately and report the outcome via a flash message.

    Responds 403 when nobody is logged in; otherwise always redirects back to
    the settings page.
    """
    if not session.get("user_id"):
        abort(403)

    try:
        finished = run_backup()
    except RuntimeError as exc:
        flash(_("Backup failed: %(error)s", error=exc), "danger")
    else:
        flash(
            _("Backup completed: %(filename)s", filename=finished.filename),
            "success",
        )
    return redirect(url_for("config.index"))

460 

461 

@config_bp.route("/profile", methods=["POST"])
def update_profile() -> ResponseReturnValue:
    """Update the usage profile of the current user's tenant.

    Creates the ``TenantProfile`` on first use, validates the submitted
    operating model, derives aircraft count / rental flag from the form
    (forced to ``0`` / ``False`` for ``sole_pilot``) and stores the tenant's
    ``require_totp`` flag. Responds 403 when nobody is logged in.
    """
    if not session.get("user_id"):
        abort(403)
    from models import OperatingModel, Tenant, TenantProfile, TenantUser  # pyright: ignore[reportMissingImports]

    membership = TenantUser.query.filter_by(user_id=session["user_id"]).first()
    if not membership:
        abort(403)  # pragma: no cover
    tenant_id = membership.tenant_id

    profile = TenantProfile.query.filter_by(tenant_id=tenant_id).first()
    if not profile:
        profile = TenantProfile(tenant_id=tenant_id, setup_complete=True)
        db.session.add(profile)

    model_str = request.form.get("operating_model", "sole_operator")
    try:
        profile.operating_model = OperatingModel(model_str)
    except ValueError:
        flash(_("Invalid operating model."), "danger")
        return redirect(url_for("config.index"))

    if model_str != "sole_pilot":
        # Anything missing, unparseable or below one is stored as 1.
        raw_count = request.form.get("planned_aircraft_count") or 1
        try:
            planned = max(1, int(raw_count))
        except (ValueError, TypeError):
            planned = 1
        profile.planned_aircraft_count = planned
        profile.allows_rental = bool(request.form.get("allows_rental"))
    else:
        profile.planned_aircraft_count = 0
        profile.allows_rental = False

    tenant = db.session.get(Tenant, tenant_id)
    if tenant:
        tenant.require_totp = bool(request.form.get("require_totp"))

    db.session.commit()
    flash(_("Usage profile updated."), "success")
    return redirect(url_for("config.index"))

501 

502 

@config_bp.route("/email/test", methods=["POST"])
def test_email() -> ResponseReturnValue:
    """Send a test email to the logged-in user's own address.

    The outcome (sent, not configured, send failed) is reported through a
    flash message before redirecting to the settings page. Responds 403 when
    nobody is logged in.
    """
    if not session.get("user_id"):
        abort(403)
    from models import User  # pyright: ignore[reportMissingImports]
    from services.email_service import (
        EmailNotConfiguredError,
        EmailSendError,
        send_email,
    )  # pyright: ignore[reportMissingImports]

    recipient = db.session.get(User, session["user_id"])
    if not recipient:
        abort(403)  # pragma: no cover

    body = (
        "This is a test email from your OpenHangar instance.\n\n"
        "If you received this, your SMTP configuration is working correctly."
    )
    try:
        send_email(
            to=recipient.email,
            subject="OpenHangar — test email",
            text_body=body,
        )
        flash(_("Test email sent to %(email)s.", email=recipient.email), "success")
    except EmailNotConfiguredError as exc:
        flash(_("Email not configured: %(error)s", error=exc), "warning")
    except EmailSendError as exc:
        flash(_("Email send failed: %(error)s", error=exc), "danger")
    return redirect(url_for("config.index"))

532 

533 

@config_bp.route("/check-version", methods=["POST"])
def check_version() -> ResponseReturnValue:
    """Refresh the stored version information.

    Always records ``version_last_checked_at``; when ``fetch_versions()``
    returns a non-empty result, its first entry is stored as
    ``latest_version`` and the whole list (as JSON) as ``all_versions``.
    Responds 403 when nobody is logged in.
    """
    if not session.get("user_id"):
        abort(403)
    from services.version_service import fetch_versions, upsert_app_setting  # pyright: ignore[reportMissingImports]

    available = fetch_versions()
    checked_at = datetime.now(timezone.utc).isoformat()
    upsert_app_setting(db.session, "version_last_checked_at", checked_at)
    if available:
        upsert_app_setting(db.session, "latest_version", available[0])
        upsert_app_setting(db.session, "all_versions", json.dumps(available))
    db.session.commit()

    flash(_("Version check refreshed."), "success")
    return redirect(url_for("config.index"))

554 

555 

556# ── One-click upgrade ───────────────────────────────────────────────────────── 

557 

558 

@config_bp.route("/trigger-upgrade", methods=["POST"])
def trigger_upgrade() -> ResponseReturnValue:
    """Request a one-click upgrade by dropping a ``trigger`` file.

    The file is written into ``OPENHANGAR_UPGRADE_DIR`` (404 when that
    variable is unset) and contains the requesting user's email and a UTC
    timestamp as JSON. Nothing is written when a ``trigger.running`` or
    ``trigger`` file already exists. Responds 403 when nobody is logged in.
    """
    if not session.get("user_id"):
        abort(403)
    upgrade_dir = os.environ.get("OPENHANGAR_UPGRADE_DIR", "")
    if not upgrade_dir:
        abort(404)
    os.makedirs(upgrade_dir, exist_ok=True)
    running_path = os.path.join(upgrade_dir, "trigger.running")
    trigger_path = os.path.join(upgrade_dir, "trigger")
    if os.path.exists(running_path):
        flash(_("An upgrade is already in progress."), "warning")
        return redirect(url_for("config.index"))
    if os.path.exists(trigger_path):
        flash(_("Upgrade already triggered."), "info")
        return redirect(url_for("config.index"))
    from models import User  # pyright: ignore[reportMissingImports]

    user = db.session.get(User, session["user_id"])
    trigger_data = {
        "triggered_by": user.email if user else "unknown",
        "triggered_at": datetime.now(timezone.utc).isoformat(),
    }
    try:
        # "x" creates the file exclusively, so two simultaneous requests
        # cannot both pass the existence check above and overwrite each other.
        with open(trigger_path, "x", encoding="utf-8") as fh:
            json.dump(trigger_data, fh)
    except FileExistsError:
        flash(_("Upgrade already triggered."), "info")
        return redirect(url_for("config.index"))
    flash(_("Upgrade triggered. The service will restart shortly."), "info")
    return redirect(url_for("config.index"))

586 

587 

@config_bp.route("/upgrade-status")
def upgrade_status() -> ResponseReturnValue:
    """Report the one-click-upgrade state as JSON.

    The state is read from marker files in ``OPENHANGAR_UPGRADE_DIR`` (404
    when unset), checked in this order: ``trigger.done`` → ``done``,
    ``trigger.failed`` → ``failed`` (with the file's text as ``message``),
    ``trigger.running`` → ``in-progress``, ``trigger`` → ``triggered``,
    otherwise ``idle``. The done/failed markers are removed once reported;
    file errors while doing so are ignored. Responds 403 when nobody is
    logged in.
    """
    if not session.get("user_id"):
        abort(403)
    upgrade_dir = os.environ.get("OPENHANGAR_UPGRADE_DIR", "")
    if not upgrade_dir:
        return abort(404)

    def _marker(name: str) -> str:
        return os.path.join(upgrade_dir, name)

    done_marker = _marker("trigger.done")
    if os.path.exists(done_marker):
        with contextlib.suppress(OSError):
            os.remove(done_marker)
        return jsonify({"status": "done"})

    failed_marker = _marker("trigger.failed")
    if os.path.exists(failed_marker):
        message = ""
        with contextlib.suppress(OSError):
            with open(failed_marker) as fh:
                message = fh.read().strip()
            os.remove(failed_marker)
        return jsonify({"status": "failed", "message": message})

    if os.path.exists(_marker("trigger.running")):
        return jsonify({"status": "in-progress"})
    if os.path.exists(_marker("trigger")):
        return jsonify({"status": "triggered"})
    return jsonify({"status": "idle"})

615 

616 

617# ── Phase 29: Tenant management (instance admin only) ───────────────────────── 

618 

619 

@config_bp.route("/tenants")
@require_instance_admin
def tenant_list() -> ResponseReturnValue:
    """List all tenants (oldest first) with per-tenant statistics.

    For each tenant the template receives the number of tenant users, the
    number of aircraft and the tenant users holding the OWNER or ADMIN role.
    """
    from models import Aircraft, Role, Tenant, TenantUser  # pyright: ignore[reportMissingImports]

    stats = []
    for tenant in Tenant.query.order_by(Tenant.created_at).all():
        members = TenantUser.query.filter_by(tenant_id=tenant.id)
        member_total = members.count()
        aircraft_total = Aircraft.query.filter_by(tenant_id=tenant.id).count()
        managers = members.filter(
            TenantUser.role.in_([Role.OWNER, Role.ADMIN])
        ).all()
        stats.append(
            {
                "tenant": tenant,
                "user_count": member_total,
                "aircraft_count": aircraft_total,
                "owners": managers,
            }
        )

    return render_template("config/tenant_list.html", stats=stats)

645 

646 

@config_bp.route("/tenants/create", methods=["GET", "POST"])
@require_instance_admin
def tenant_create() -> ResponseReturnValue:
    """Create a tenant and an owner invitation (instance admins only).

    GET renders the form. POST requires a tenant name and an admin email,
    then creates the tenant, its profile (``setup_complete=False``; an
    unrecognised operating model is stored as ``None``) and an OWNER
    invitation valid for seven days, whose accept link is shown in a flash
    message.
    """
    from datetime import timedelta

    from models import OperatingModel, Role, Tenant, TenantProfile, User, UserInvitation  # pyright: ignore[reportMissingImports]

    user = db.session.get(User, session["user_id"])
    assert user is not None  # guaranteed by @require_instance_admin

    form_template = "config/tenant_create.html"
    if request.method != "POST":
        return render_template(form_template)

    name = request.form.get("name", "").strip()
    admin_email = request.form.get("admin_email", "").strip().lower()
    model_str = request.form.get("operating_model", "sole_operator")

    if not name:
        flash(_("Tenant name is required."), "danger")
        return render_template(form_template)
    if not admin_email:
        flash(_("Admin email is required."), "danger")
        return render_template(form_template)

    tenant = Tenant(name=name, is_active=True)
    db.session.add(tenant)
    db.session.flush()  # assigns tenant.id for the rows created below

    op_model: OperatingModel | None
    try:
        op_model = OperatingModel(model_str)
    except ValueError:
        op_model = None

    db.session.add(
        TenantProfile(
            tenant_id=tenant.id,
            operating_model=op_model,
            setup_complete=False,
        )
    )

    invitation = UserInvitation(
        tenant_id=tenant.id,
        invited_by_user_id=user.id,
        email=admin_email,
        role=Role.OWNER,
        expires_at=datetime.now(timezone.utc) + timedelta(days=7),
    )
    db.session.add(invitation)
    db.session.commit()

    invite_url = url_for(
        "users.accept_invite", token=invitation.token, _external=True
    )
    flash(
        _(
            "Tenant '%(name)s' created. Share this invite link with the owner: %(url)s",
            name=name,
            url=invite_url,
        ),
        "success",
    )
    return redirect(url_for("config.tenant_list"))

708 

709 

@config_bp.route("/tenants/<int:tenant_id>/toggle", methods=["POST"])
@require_instance_admin
def tenant_toggle_active(tenant_id: int) -> ResponseReturnValue:
    """Flip a tenant's ``is_active`` flag (404 when the tenant is unknown)."""
    from models import Tenant  # pyright: ignore[reportMissingImports]

    tenant = db.session.get(Tenant, tenant_id)
    if not tenant:
        abort(404)

    tenant.is_active = not tenant.is_active
    db.session.commit()

    if not tenant.is_active:
        flash(_("Tenant '%(name)s' deactivated.", name=tenant.name), "warning")
    else:
        flash(_("Tenant '%(name)s' reactivated.", name=tenant.name), "success")

    return redirect(url_for("config.tenant_list"))

728 

729 

@config_bp.route("/tenants/<int:tenant_id>/reset-password", methods=["POST"])
@require_instance_admin
def tenant_reset_owner_password(tenant_id: int) -> ResponseReturnValue:
    """Issue a 24-hour password-reset token for a tenant owner or admin.

    The target user comes from the ``owner_user_id`` form field and must hold
    the OWNER or ADMIN role in the given tenant (403 otherwise; 404 for an
    unknown tenant). The resulting reset URL is rendered on a dedicated page.
    """
    from datetime import timedelta

    from models import PasswordResetToken, Role, Tenant, TenantUser, User  # pyright: ignore[reportMissingImports]

    admin = db.session.get(User, session["user_id"])
    assert admin is not None  # guaranteed by @require_instance_admin
    tenant = db.session.get(Tenant, tenant_id)
    if not tenant:
        abort(404)

    owner_user_id = request.form.get("owner_user_id", type=int)
    if not owner_user_id:
        flash(_("No user selected."), "danger")
        return redirect(url_for("config.tenant_list"))

    membership = TenantUser.query.filter_by(
        tenant_id=tenant_id, user_id=owner_user_id
    ).first()
    eligible = bool(membership) and membership.role in (Role.OWNER, Role.ADMIN)
    if not eligible:
        abort(403)

    valid_until = datetime.now(timezone.utc) + timedelta(hours=24)
    token = PasswordResetToken(
        user_id=owner_user_id,
        generated_by_user_id=admin.id,
        expires_at=valid_until,
    )
    db.session.add(token)
    db.session.commit()

    return render_template(
        "config/tenant_reset_token.html",
        tenant=tenant,
        reset_url=url_for("auth.reset_password", token=token.token, _external=True),
        expires_at=token.expires_at,
    )

767 

768 

@config_bp.route("/notifications/", methods=["GET", "POST"])
@login_required
def notification_preferences() -> ResponseReturnValue:
    """Manage per-user notification preferences. Accessible to all logged-in users.

    Only notification types whose required capabilities match the user
    (owner / pilot / maintenance, derived from the role and the user's own
    flags) are shown and saved. On POST a preference row is kept only when it
    differs from ``NotificationType.SYSTEM_DEFAULTS``; a row equal to the
    system default is deleted. On GET the effective preferences are rendered,
    plus the tenant defaults for admins/owners.
    """
    from models import (  # pyright: ignore[reportMissingImports]
        NotificationPreference,
        NotificationType,
        Role,
        TenantNotificationDefault,
        TenantProfile,
        TenantUser,
        User,
    )
    from utils import current_user_role  # pyright: ignore[reportMissingImports]

    user = db.session.get(User, session["user_id"])
    if not user:
        abort(403)

    membership = TenantUser.query.filter_by(user_id=user.id).first()
    tenant_id = membership.tenant_id if membership else None
    role = current_user_role()

    is_owner = role in (Role.ADMIN, Role.OWNER)
    is_pilot = (
        role in (Role.ADMIN, Role.OWNER, Role.PILOT, Role.INSTRUCTOR) or user.is_pilot
    )
    is_maint = (
        role in (Role.ADMIN, Role.OWNER, Role.MAINTENANCE, Role.INSTRUCTOR)
        or user.is_maintenance
    )
    granted = {"is_owner": is_owner, "is_pilot": is_pilot, "is_maint": is_maint}

    def _user_has_cap(caps: list[str]) -> bool:
        # True when at least one listed capability is one the user holds.
        return any(held for cap, held in granted.items() if cap in caps)

    visible_types = []
    for candidate in NotificationType.ALL:
        if _user_has_cap(NotificationType.REQUIRED_CAPS.get(candidate, [])):
            visible_types.append(candidate)

    if request.method == "POST":
        if tenant_id is None:
            flash(_("Cannot save: no tenant associated."), "danger")
            return redirect(url_for("config.notification_preferences"))

        for notif_type in visible_types:
            enabled = bool(request.form.get(f"enabled_{notif_type}"))
            threshold_raw = request.form.get(f"threshold_{notif_type}", "").strip()
            threshold_days: int | None = None
            if notif_type in NotificationType.HAS_THRESHOLD and threshold_raw:
                try:
                    threshold_days = max(1, int(threshold_raw))
                except ValueError:
                    threshold_days = None

            existing = NotificationPreference.query.filter_by(
                user_id=user.id, tenant_id=tenant_id, notification_type=notif_type
            ).first()
            # Only keep a row when the submitted values differ from the
            # system default for this notification type.
            system_default = NotificationType.SYSTEM_DEFAULTS.get(notif_type, {})
            differs = enabled != system_default.get(
                "enabled", False
            ) or threshold_days != system_default.get("threshold_days")
            if differs:
                if existing:
                    existing.enabled = enabled
                    existing.threshold_days = threshold_days
                else:
                    db.session.add(
                        NotificationPreference(
                            user_id=user.id,
                            tenant_id=tenant_id,
                            notification_type=notif_type,
                            enabled=enabled,
                            threshold_days=threshold_days,
                        )
                    )
            elif existing:
                db.session.delete(existing)
        db.session.commit()
        flash(_("Notification preferences saved."), "success")
        return redirect(url_for("config.notification_preferences"))

    fallback = {"enabled": False, "threshold_days": None}

    # Build current effective preferences for display
    prefs: dict[str, dict[str, object]] = {}
    for notif_type in visible_types:
        if tenant_id:
            from services.notification_service import get_effective_preference  # pyright: ignore[reportMissingImports]

            prefs[notif_type] = get_effective_preference(user.id, tenant_id, notif_type)
        else:
            prefs[notif_type] = dict(
                NotificationType.SYSTEM_DEFAULTS.get(notif_type, fallback)
            )

    # Tenant defaults visible only to admins/owners
    tenant_defaults: dict[str, dict[str, object]] | None = None
    if is_owner and tenant_id:
        tenant_defaults = {}
        for notif_type in NotificationType.ALL:
            override = TenantNotificationDefault.query.filter_by(
                tenant_id=tenant_id, notification_type=notif_type
            ).first()
            if not override:
                tenant_defaults[notif_type] = dict(
                    NotificationType.SYSTEM_DEFAULTS.get(notif_type, fallback)
                )
            else:
                tenant_defaults[notif_type] = {
                    "enabled": override.enabled,
                    "threshold_days": override.threshold_days,
                }

    profile = None
    if tenant_id:
        profile = TenantProfile.query.filter_by(tenant_id=tenant_id).first()

    return render_template(
        "config/notifications.html",
        visible_types=visible_types,
        prefs=prefs,
        has_threshold=NotificationType.HAS_THRESHOLD,
        system_defaults=NotificationType.SYSTEM_DEFAULTS,
        tenant_defaults=tenant_defaults,
        is_owner=is_owner,
        profile=profile,
    )

907 

908 

@config_bp.route("/backfill/aircraft-type-icao", methods=["POST"])
@require_instance_admin
def backfill_aircraft_type_icao() -> ResponseReturnValue:
    """Fill in ``aircraft_type_icao`` on logbook entries that lack it.

    Considers every ``PilotLogbookEntry`` with an ``aircraft_type`` but no
    ICAO designator, stores whatever ``resolve_aircraft_type_icao`` returns
    (entries it cannot resolve are left untouched) and reports how many of
    the candidates were resolved.
    """
    from models import PilotLogbookEntry  # pyright: ignore[reportMissingImports]
    from utils import resolve_aircraft_type_icao  # pyright: ignore[reportMissingImports]

    candidates = PilotLogbookEntry.query.filter(
        PilotLogbookEntry.aircraft_type.isnot(None),
        PilotLogbookEntry.aircraft_type_icao.is_(None),
    ).all()

    updated = 0
    for entry in candidates:
        designator = resolve_aircraft_type_icao(entry.aircraft_type)
        if not designator:
            continue
        entry.aircraft_type_icao = designator
        updated += 1

    db.session.commit()
    summary = ngettext(
        "Back-fill complete: one of %(total)d entry resolved.",
        "Back-fill complete: %(updated)d of %(total)d entries resolved.",
        updated,
        updated=updated,
        total=len(candidates),
    )
    flash(summary, "success")
    return redirect(url_for("config.index"))

940 

941 

# Allow-list of badge paths that may be proxied. Each key maps to itself, so a
# lookup returns a value taken from this table rather than from the request.
_ALLOWED_BADGE_PATHS: dict[str, str] = {
    badge: badge
    for badge in (
        "uptimes/1h/badge.svg",
        "uptimes/24h/badge.svg",
        "uptimes/7d/badge.svg",
        "uptimes/30d/badge.svg",
        "response-times/1h/badge.svg",
        "response-times/24h/badge.svg",
        "response-times/7d/badge.svg",
        "response-times/30d/badge.svg",
    )
}

952 

953 

@config_bp.route("/gatus-badge/<path:badge_path>")
@login_required
def gatus_badge(badge_path: str) -> ResponseReturnValue:
    """Proxy one of the allow-listed Gatus badge images.

    Responds 404 when *badge_path* is not in ``_ALLOWED_BADGE_PATHS`` or Gatus
    is not configured, and 503 when the upstream fetch fails. On success the
    upstream body is returned with the upstream ``Content-Type``
    (``image/svg+xml`` when the header is missing).
    """
    # Use the value from the allow-list, not the raw request path.
    safe_path = _ALLOWED_BADGE_PATHS.get(badge_path)
    if safe_path is None:
        return abort(404)
    gatus = _parse_gatus_env()
    if gatus is None:
        return abort(404)
    base_url, endpoint_key, auth_header = gatus
    badge_url = f"{base_url}/api/v1/endpoints/{endpoint_key}/{safe_path}"
    req = urllib.request.Request(badge_url)
    if auth_header:
        req.add_header("Authorization", f"Basic {auth_header}")
    try:
        with urllib.request.urlopen(req, timeout=5) as resp:  # nosec B310
            content = resp.read()
            content_type = resp.headers.get("Content-Type", "image/svg+xml")
    except OSError as exc:
        # URLError/HTTPError are OSError subclasses; catching OSError also
        # covers socket timeouts and connection resets raised while reading
        # the body, which are not wrapped in URLError.
        log.warning("Gatus badge fetch failed (%s): %s", badge_url, repr(exc))
        return abort(503)
    return current_app.response_class(content, mimetype=content_type)