Coverage for app/config/routes.py: 100%
545 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1"""
2Configuration blueprint — backup management, email settings, and future config sections.
3"""
5import contextlib
6import hashlib
7import io
8import json
9import logging
10import os
11import subprocess # nosec B404
12import urllib.error
13import urllib.request
14import zipfile
15from datetime import datetime, timezone
17from flask import (
18 Blueprint,
19 abort,
20 current_app,
21 flash,
22 jsonify,
23 redirect,
24 render_template,
25 request,
26 session,
27 url_for,
28) # pyright: ignore[reportMissingImports]
29from flask.typing import ResponseReturnValue # pyright: ignore[reportMissingImports]
30from flask_babel import gettext as _, ngettext # pyright: ignore[reportMissingImports]
32from models import AppSetting, BackupRecord, db # pyright: ignore[reportMissingImports]
33from utils import login_required, require_instance_admin # pyright: ignore[reportMissingImports]
# Blueprint that groups every configuration view; all routes registered on it
# are served under the "/config" URL prefix.
config_bp = Blueprint("config", __name__, url_prefix="/config")
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
39# ── helpers ───────────────────────────────────────────────────────────────────
def _derive_key(passphrase: str) -> bytes:
    """Turn *passphrase* into a 32-byte key using HKDF-SHA256.

    The salt and info values are fixed constants, so a given passphrase
    always yields the same key.
    """
    from cryptography.hazmat.primitives import hashes  # pyright: ignore[reportMissingImports]
    from cryptography.hazmat.primitives.kdf.hkdf import HKDF  # pyright: ignore[reportMissingImports]

    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=b"openhangar-backup-kdf-salt-v1",
        info=b"openhangar-backup-v1",
    )
    secret = passphrase.encode()
    return kdf.derive(secret)
def _encrypt_bytes(plaintext: bytes, key: bytes) -> bytes:
    """Return ``nonce + ciphertext`` for *plaintext* under AES-GCM with *key*.

    A fresh random 12-byte nonce is generated per call and placed in front of
    the ciphertext so the reader can recover it.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM  # pyright: ignore[reportMissingImports]
    import os as _os

    iv = _os.urandom(12)
    sealed = AESGCM(key).encrypt(iv, plaintext, None)
    return iv + sealed
def _get_alembic_head() -> str | None:
    """Look up ``version_num`` in the ``alembic_version`` table.

    Best-effort: any exception raised while importing or querying is
    swallowed and ``None`` is returned instead.
    """
    try:
        from sqlalchemy import text  # pyright: ignore[reportMissingImports]

        query = text("SELECT version_num FROM alembic_version LIMIT 1")
        result = db.session.execute(query)
        return result.scalar()
    except Exception:
        return None
77def _parse_gatus_env() -> tuple[str, str, str | None] | None:
78 """Return (base_url, endpoint_key, auth_header_or_None) from env vars, or None if not configured."""
79 endpoint_url = os.environ.get("OPENHANGAR_GATUS_ENDPOINT_URL", "").rstrip("/")
80 if not endpoint_url or "/endpoints/" not in endpoint_url:
81 return None
82 base_url, _, endpoint_key = endpoint_url.rpartition("/endpoints/")
83 if not base_url or not endpoint_key:
84 return None
85 auth_header = os.environ.get("OPENHANGAR_GATUS_AUTH_HEADER") or None
86 return base_url, endpoint_key, auth_header
def run_backup() -> BackupRecord:
    """
    Produce a ZIP backup of the PostgreSQL database and uploaded documents.

    The ZIP contains:
      - ``openhangar.sql``   — the pg_dump output
      - ``metadata.json``    — app version, Alembic head and creation time
      - ``uploads/...``      — whatever ``_add_uploads_to_zip`` adds

    When ``OPENHANGAR_BACKUP_ENCRYPTION_KEY`` is set the ZIP is encrypted with
    ``_encrypt_bytes``; otherwise it is written as-is and a warning is logged.
    The payload is written to ``<BACKUP_FOLDER>/<name>.zip.enc`` and the
    metadata is also written next to it as ``<name>.meta``.

    A ``BackupRecord`` row is committed and returned.

    Raises ``RuntimeError`` on failure; the record is still committed with
    ``status='failed'`` so operators can see the attempt, and any file this
    attempt started writing is removed so no truncated backup is left behind.
    """
    from flask import current_app  # pyright: ignore[reportMissingImports]

    enc_suffix = ".zip.enc"

    backup_folder = current_app.config.get("BACKUP_FOLDER", "/data/backups")
    upload_folder = current_app.config.get("UPLOAD_FOLDER", "/data/uploads")
    encryption_key_raw = os.environ.get("OPENHANGAR_BACKUP_ENCRYPTION_KEY", "")
    database_url = current_app.config.get("SQLALCHEMY_DATABASE_URI", "")

    os.makedirs(backup_folder, exist_ok=True)

    # One timestamp for both the file name and the metadata keeps them in sync.
    now = datetime.now(timezone.utc)
    ts = now.strftime("%Y%m%dT%H%M%SZ")
    app_version = os.environ.get("OPENHANGAR_VERSION", "development")
    filename = f"openhangar_backup_{ts}_{app_version}{enc_suffix}"
    path = os.path.join(backup_folder, filename)
    # Strip only the trailing suffix: str.replace() would also rewrite a
    # ".zip.enc" that happens to occur inside the backup folder path.
    meta_path = path.removesuffix(enc_suffix) + ".meta"
    alembic_head = _get_alembic_head()
    metadata = {
        "app_version": app_version,
        "alembic_head": alembic_head,
        "created_at": now.isoformat(),
    }

    # The record starts out as "failed" and is flipped to "ok" only at the end.
    record = BackupRecord(
        filename=filename,
        path=path,
        status="failed",
        app_version=app_version,
        alembic_head=alembic_head,
    )
    db.session.add(record)
    db.session.flush()  # get an id without committing

    started_files: list[str] = []
    try:
        sql_bytes = _pg_dump(database_url)

        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
            zf.writestr("openhangar.sql", sql_bytes)
            zf.writestr("metadata.json", json.dumps(metadata, indent=2))
            _add_uploads_to_zip(zf, upload_folder)
        zip_bytes = buf.getvalue()

        if encryption_key_raw:
            key = _derive_key(encryption_key_raw)
            payload = _encrypt_bytes(zip_bytes, key)
        else:
            payload = zip_bytes
            log.warning(
                "OPENHANGAR_BACKUP_ENCRYPTION_KEY not set — backup is unencrypted"
            )

        started_files.append(path)
        with open(path, "wb") as fh:
            fh.write(payload)

        started_files.append(meta_path)
        with open(meta_path, "w", encoding="utf-8") as fh:
            json.dump(metadata, fh, indent=2)

        record.size_bytes = len(payload)
        record.sha256 = hashlib.sha256(payload).hexdigest()
        record.status = "ok"
    except Exception as exc:
        # log.exception keeps the traceback, which log.error dropped.
        log.exception("Backup failed: %s", exc)
        # Do not leave a truncated payload or orphaned sidecar on disk.
        for leftover in started_files:
            with contextlib.suppress(OSError):
                os.remove(leftover)
        db.session.commit()
        raise RuntimeError(str(exc)) from exc

    db.session.commit()
    return record
def _add_uploads_to_zip(zf: zipfile.ZipFile, upload_folder: str) -> None:
    """Add every file below *upload_folder* into the ZIP under ``uploads/``.

    The folder is walked recursively so files stored in sub-directories
    (uploads are kept in per-tenant ``<slug>/`` folders) are included; the
    archive name mirrors the path relative to *upload_folder*, always using
    ``/`` as separator. Top-level files keep the ``uploads/<filename>`` name.

    Does nothing when *upload_folder* is not a directory. Entries that are not
    regular files (or links to them) are skipped. Directories and files are
    visited in sorted order so the archive layout is deterministic.
    """
    if not os.path.isdir(upload_folder):
        return
    for dirpath, dirnames, filenames in os.walk(upload_folder):
        dirnames.sort()  # in-place sort steers os.walk's traversal order
        rel_dir = os.path.relpath(dirpath, upload_folder)
        for name in sorted(filenames):
            full_path = os.path.join(dirpath, name)
            if not os.path.isfile(full_path):
                continue
            rel_path = name if rel_dir == os.curdir else os.path.join(rel_dir, name)
            zf.write(full_path, arcname="uploads/" + rel_path.replace(os.sep, "/"))
def _pg_dump(database_url: str) -> bytes:
    """Run pg_dump against *database_url* and return the SQL as bytes.

    *database_url* must be a ``postgresql://`` URL; a SQLAlchemy driver suffix
    such as ``postgresql+psycopg2://`` is accepted and stripped before the URL
    is handed to pg_dump.

    A password embedded in the URL is removed from the command line and passed
    through the ``PGPASSWORD`` environment variable instead, so it does not
    show up in the process list.

    Raises:
        RuntimeError: the URL scheme is not PostgreSQL (only the scheme is
            reported, never the full URL, because the message is shown to
            users), or pg_dump exits non-zero (message is its stderr).
        subprocess.TimeoutExpired: pg_dump ran longer than 120 seconds.
        OSError: the pg_dump executable could not be started.
    """
    from urllib.parse import unquote

    scheme, sep, rest = database_url.partition("://")
    if not sep or scheme.partition("+")[0] != "postgresql":
        # Echo the scheme only when it really looks like one; anything else
        # might contain credentials.
        looks_like_scheme = bool(sep) and bool(scheme) and all(
            ch.isalnum() or ch in "+-." for ch in scheme
        )
        shown = scheme if looks_like_scheme else ""
        raise RuntimeError(f"Unsupported database URL scheme: {shown!r}")

    # The authority component ends at the first "/", "?" or "#".
    cut_points = [idx for idx in (rest.find(ch) for ch in "/?#") if idx != -1]
    end = min(cut_points, default=len(rest))
    authority, remainder = rest[:end], rest[end:]

    userinfo, _at, hosts = authority.rpartition("@")
    user, _colon, password = userinfo.partition(":")

    env = os.environ.copy()
    if password:
        # URL passwords are percent-encoded; the environment variable is not.
        env["PGPASSWORD"] = unquote(password)

    safe_authority = f"{user}@{hosts}" if user else hosts
    conninfo = f"postgresql://{safe_authority}{remainder}"
    cmd = ["pg_dump", "--no-password", f"--dbname={conninfo}"]

    result = subprocess.run(  # nosec B603 # fixed list, no shell, DB URL from server config
        cmd,
        capture_output=True,
        env=env,
        timeout=120,
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode(errors="replace"))
    return result.stdout
203# ── views ─────────────────────────────────────────────────────────────────────
@config_bp.before_request
def _block_in_demo() -> None:
    """Gatekeeper that runs before every request on this blueprint.

    - Aborts with 403 when ``OPENHANGAR_ENV`` is ``demo``.
    - Requests without a ``user_id`` in the session pass through untouched
      (the individual views deal with them).
    - Logged-in users may always reach the notification-preferences endpoint.
    - Otherwise the user must be an instance admin or hold the ADMIN/OWNER
      role, else the request is aborted with 403.
    """
    if os.environ.get("OPENHANGAR_ENV") == "demo":
        abort(403)
    if not session.get("user_id"):
        return
    # All logged-in users may manage their own notification preferences
    if request.endpoint == "config.notification_preferences":
        return
    from models import Role, User  # pyright: ignore[reportMissingImports]
    from utils import current_user_role  # pyright: ignore[reportMissingImports]

    account = db.session.get(User, session["user_id"])
    # Instance admins always pass — they may not have a tenant role
    if account and account.is_instance_admin:
        return
    privileged_roles = (Role.ADMIN, Role.OWNER)
    if current_user_role() not in privileged_roles:
        abort(403)
@config_bp.route("/")
def index() -> ResponseReturnValue:
    """Render the settings overview page (``config/settings.html``).

    Redirects to the login page when no user is in the session. Otherwise it
    gathers, in order: the most recent backup records, active-user counts per
    role and open invitations for the caller's tenant, version/update status,
    database size, upload-folder size, tenant data and upgrade-trigger state.

    Version comparison, versions-behind count, DB size and upload size are
    best-effort: a failure leaves the value at its default instead of
    breaking the page.
    """
    if not session.get("user_id"):
        return redirect(url_for("auth.login"))
    from sqlalchemy import func  # pyright: ignore[reportMissingImports]
    from models import Role, Tenant, TenantUser, User, UserInvitation  # pyright: ignore[reportMissingImports]
    from services.email_service import get_email_health, get_smtp_status  # pyright: ignore[reportMissingImports]

    # ── backups: newest N records plus a count of the ones not shown ──
    _BACKUP_DISPLAY_LIMIT = 10
    total_backups = BackupRecord.query.count()
    records = (
        BackupRecord.query.order_by(BackupRecord.created_at.desc())
        .limit(_BACKUP_DISPLAY_LIMIT)
        .all()
    )
    backup_extra = max(0, total_backups - _BACKUP_DISPLAY_LIMIT)

    # ── users: active users per role and pending invitations ──
    _role_labels = {
        Role.ADMIN: "Admin",
        Role.OWNER: "Owner",
        Role.PILOT: "Pilot / Renter",
        Role.MAINTENANCE: "Maintenance",
        Role.VIEWER: "Viewer",
    }

    def _label_for(role: object) -> str:
        # Roles without an explicit label (this file also uses
        # Role.INSTRUCTOR) previously raised KeyError and broke the page;
        # fall back to a title-cased form of the role's value.
        fallback = str(getattr(role, "value", role)).replace("_", " ").title()
        return _role_labels.get(role, fallback)

    tu_self = TenantUser.query.filter_by(user_id=session["user_id"]).first()
    tid = tu_self.tenant_id if tu_self else None
    user_counts = []
    open_invitations = 0
    if tid:
        results = (
            db.session.query(TenantUser.role, func.count(TenantUser.user_id))
            .join(User, TenantUser.user_id == User.id)
            .filter(TenantUser.tenant_id == tid, User.is_active.is_(True))
            .group_by(TenantUser.role)
            .all()
        )
        counts_by_role = dict(results)
        user_counts = [
            (_label_for(r), counts_by_role[r])
            for r in Role
            if counts_by_role.get(r, 0) > 0
        ]
        open_invitations = (
            UserInvitation.query.filter_by(tenant_id=tid)
            .filter(UserInvitation.accepted_at.is_(None))
            .count()
        )

    # ── version / update status ──
    current_version = os.environ.get("OPENHANGAR_VERSION", "development")
    latest_setting = db.session.get(AppSetting, "latest_version")
    latest_version = latest_setting.value if latest_setting else None
    try:
        from packaging.version import Version  # pyright: ignore[reportMissingImports]

        update_available = bool(
            latest_version
            and current_version != "development"
            and Version(latest_version) > Version(current_version)
        )
    except Exception:
        update_available = False

    # Position of the running version in the stored version list; only a
    # position greater than zero is reported.
    versions_behind: int | None = None
    try:
        _all_v_setting = db.session.get(AppSetting, "all_versions")
        if _all_v_setting and current_version != "development":
            _all_versions = json.loads(_all_v_setting.value)
            if isinstance(_all_versions, list) and current_version in _all_versions:
                _idx = _all_versions.index(current_version)
                if _idx > 0:
                    versions_behind = _idx
    except Exception as exc:
        log.debug("Could not compute versions-behind count: %s", exc)

    # ── storage figures ──
    db_size: str | None = None
    try:
        from sqlalchemy import text as _text  # pyright: ignore[reportMissingImports]

        _res = db.session.execute(
            _text("SELECT pg_size_pretty(pg_database_size(current_database()))")
        ).scalar()
        db_size = str(_res) if _res is not None else None
    except Exception as exc:
        log.debug("Could not retrieve DB size: %s", exc)

    upload_size_bytes: int | None = None
    try:
        _upload_folder = current_app.config.get("UPLOAD_FOLDER", "/data/uploads")
        if os.path.isdir(_upload_folder):
            # Walk recursively: uploads are kept in per-tenant sub-folders,
            # which a flat directory listing would not count.
            upload_size_bytes = sum(
                os.path.getsize(os.path.join(_dirpath, _fname))
                for _dirpath, _dirnames, _fnames in os.walk(_upload_folder)
                for _fname in _fnames
                if os.path.isfile(os.path.join(_dirpath, _fname))
            )
    except Exception as exc:
        log.debug("Could not retrieve upload folder size: %s", exc)

    # ── tenant / user context ──
    current_user = db.session.get(User, session["user_id"])
    tenant_count = Tenant.query.count()
    _tenant = db.session.get(Tenant, tid) if tid else None

    # ── one-click upgrade state: enabled when the directory is configured,
    #    active while a "trigger" or "trigger.running" marker exists ──
    upgrade_dir = os.environ.get("OPENHANGAR_UPGRADE_DIR", "")
    upgrade_dir_enabled = bool(upgrade_dir)
    upgrade_active = False
    if upgrade_dir:
        upgrade_active = os.path.exists(
            os.path.join(upgrade_dir, "trigger")
        ) or os.path.exists(os.path.join(upgrade_dir, "trigger.running"))

    openaip_setting = db.session.get(AppSetting, "openaip_api_key")
    openaip_api_key = openaip_setting.value if openaip_setting else None

    return render_template(
        "config/settings.html",
        records=records,
        backup_extra=backup_extra,
        backup_encryption_key_set=bool(
            os.environ.get("OPENHANGAR_BACKUP_ENCRYPTION_KEY")
        ),
        backup_folder=current_app.config.get("BACKUP_FOLDER", "/data/backups"),
        smtp_status=get_smtp_status(),
        email_health=get_email_health(),
        user_counts=user_counts,
        open_invitations=open_invitations,
        current_version=current_version,
        latest_version=latest_version,
        update_available=update_available,
        versions_behind=versions_behind,
        db_size=db_size,
        upload_size_bytes=upload_size_bytes,
        current_user=current_user,
        tenant_count=tenant_count,
        tenant=_tenant,
        openaip_api_key=openaip_api_key,
        gatus_configured=_parse_gatus_env() is not None,
        upgrade_dir_enabled=upgrade_dir_enabled,
        upgrade_active=upgrade_active,
    )
@config_bp.route("/tenant-slug", methods=["POST"])
@login_required
def update_tenant_slug() -> ResponseReturnValue:
    """Change the caller's tenant slug ("Hangar ID") and move its uploads.

    The submitted value is lower-cased, every run of characters outside
    ``[a-z0-9]`` becomes a single ``-``, leading/trailing dashes are stripped
    and the result is cut to 64 characters. Empty or already-taken slugs are
    rejected with a flash message.

    When the slug actually changes, the ``<old_slug>/`` upload directory is
    renamed (or merged file-by-file into an existing ``<slug>/`` directory)
    and the stored ``Document.filename`` / ``PendingReconcile.filepath``
    values carrying the old prefix are rewritten to the new one.

    Always redirects back to the settings page.
    """
    import re as _re
    import shutil
    from models import Document, PendingReconcile, Tenant, TenantUser  # pyright: ignore[reportMissingImports]

    tu = TenantUser.query.filter_by(user_id=session["user_id"]).first()
    if not tu:
        abort(403)  # pragma: no cover
    tenant = db.session.get(Tenant, tu.tenant_id)
    if not tenant:
        abort(403)  # pragma: no cover

    raw = request.form.get("slug", "").strip().lower()
    if not raw:
        flash(_("Hangar ID cannot be empty."), "danger")
        return redirect(url_for("config.index"))

    slug = _re.sub(r"[^a-z0-9]+", "-", raw).strip("-")[:64]
    if not slug:
        flash(_("Hangar ID must contain at least one letter or digit."), "danger")
        return redirect(url_for("config.index"))

    existing = Tenant.query.filter(Tenant.slug == slug, Tenant.id != tenant.id).first()
    if existing:
        flash(_("That Hangar ID is already in use. Please choose another."), "danger")
        return redirect(url_for("config.index"))

    old_slug = tenant.slug
    tenant.slug = slug

    if old_slug and old_slug != slug:
        from documents.routes import _safe_join  # pyright: ignore[reportMissingImports]

        folder = current_app.config.get("UPLOAD_FOLDER", "/data/uploads")
        old_dir = _safe_join(folder, old_slug)
        new_dir = _safe_join(folder, slug)
        if os.path.isdir(old_dir):
            if os.path.isdir(new_dir):
                # Destination already exists — merge file-by-file
                for dirpath, _dirs, filenames in os.walk(old_dir):
                    rel = os.path.relpath(dirpath, old_dir)
                    dest_dir = os.path.join(new_dir, rel)
                    os.makedirs(dest_dir, exist_ok=True)
                    for fname in filenames:
                        shutil.move(
                            os.path.join(dirpath, fname),
                            os.path.join(dest_dir, fname),
                        )
                shutil.rmtree(old_dir, ignore_errors=True)
            else:
                os.rename(old_dir, new_dir)

        # Rewrite stored paths in the database.
        # The old slug is matched as a literal prefix: a plain LIKE pattern
        # would treat "_" or "%" inside it as wildcards and could pick up (and
        # then corrupt) rows that belong to a different prefix. The Python-side
        # startswith() check guards the slice for backends whose LIKE is
        # case-insensitive.
        prefix_old = old_slug + "/"
        prefix_new = slug + "/"
        for doc in Document.query.filter(
            Document.filename.startswith(prefix_old, autoescape=True)
        ).all():
            if doc.filename.startswith(prefix_old):
                doc.filename = prefix_new + doc.filename[len(prefix_old) :]
        for pr in PendingReconcile.query.filter(
            PendingReconcile.filepath.startswith(prefix_old, autoescape=True)
        ).all():
            if pr.filepath.startswith(prefix_old):
                pr.filepath = prefix_new + pr.filepath[len(prefix_old) :]

    db.session.commit()
    flash(_("Hangar ID saved."), "success")
    return redirect(url_for("config.index"))
@config_bp.route("/map-tiles", methods=["POST"])
@login_required
def update_map_tiles() -> ResponseReturnValue:
    """Store or clear the OpenAIP API key kept in ``AppSetting``.

    A non-blank ``openaip_api_key`` form value is saved (updating the existing
    row or inserting a new one); a blank value deletes the row if present.
    Redirects back to the settings page in both cases.
    """
    # ADMIN/OWNER enforcement is handled by config_bp.before_request.
    submitted = request.form.get("openaip_api_key", "").strip()
    stored = db.session.get(AppSetting, "openaip_api_key")

    if not submitted:
        if stored:
            db.session.delete(stored)
            db.session.commit()
        flash(_("OpenAIP API key removed."), "success")
        return redirect(url_for("config.index"))

    if stored:
        stored.value = submitted
    else:
        db.session.add(AppSetting(key="openaip_api_key", value=submitted))
    db.session.commit()
    flash(_("OpenAIP API key saved."), "success")
    return redirect(url_for("config.index"))
@config_bp.route("/run", methods=["POST"])
def run_backup_now() -> ResponseReturnValue:
    """Run a backup on demand and report the outcome via a flash message.

    Aborts with 403 when no user is logged in. A ``RuntimeError`` from
    ``run_backup`` is turned into a "danger" flash; either way the user is
    redirected back to the settings page.
    """
    logged_in = session.get("user_id")
    if not logged_in:
        abort(403)
    try:
        finished = run_backup()
        flash(
            _("Backup completed: %(filename)s", filename=finished.filename),
            "success",
        )
    except RuntimeError as failure:
        flash(_("Backup failed: %(error)s", error=failure), "danger")
    return redirect(url_for("config.index"))
@config_bp.route("/profile", methods=["POST"])
def update_profile() -> ResponseReturnValue:
    """Update the caller's tenant usage profile from the submitted form.

    Aborts with 403 when nobody is logged in or the user has no tenant
    membership. Creates the ``TenantProfile`` on the fly when missing. An
    operating model that ``OperatingModel`` rejects produces a "danger" flash
    and no commit. For ``sole_pilot`` the aircraft count is forced to 0 and
    rental is disabled; otherwise the count is parsed (minimum 1, default 1)
    and rental follows the form. The tenant's ``require_totp`` flag is also
    taken from the form.
    """
    if not session.get("user_id"):
        abort(403)
    from models import OperatingModel, Tenant, TenantProfile, TenantUser  # pyright: ignore[reportMissingImports]

    membership = TenantUser.query.filter_by(user_id=session["user_id"]).first()
    if not membership:
        abort(403)  # pragma: no cover

    tenant_profile = TenantProfile.query.filter_by(
        tenant_id=membership.tenant_id
    ).first()
    if not tenant_profile:
        tenant_profile = TenantProfile(
            tenant_id=membership.tenant_id, setup_complete=True
        )
        db.session.add(tenant_profile)

    requested_model = request.form.get("operating_model", "sole_operator")
    try:
        tenant_profile.operating_model = OperatingModel(requested_model)
    except ValueError:
        flash(_("Invalid operating model."), "danger")
        return redirect(url_for("config.index"))

    if requested_model != "sole_pilot":
        try:
            aircraft_total = max(
                1, int(request.form.get("planned_aircraft_count") or 1)
            )
        except (ValueError, TypeError):
            aircraft_total = 1
        tenant_profile.planned_aircraft_count = aircraft_total
        tenant_profile.allows_rental = bool(request.form.get("allows_rental"))
    else:
        tenant_profile.planned_aircraft_count = 0
        tenant_profile.allows_rental = False

    owning_tenant = db.session.get(Tenant, membership.tenant_id)
    if owning_tenant:
        owning_tenant.require_totp = bool(request.form.get("require_totp"))

    db.session.commit()
    flash(_("Usage profile updated."), "success")
    return redirect(url_for("config.index"))
@config_bp.route("/email/test", methods=["POST"])
def test_email() -> ResponseReturnValue:
    """Send a test message to the logged-in user's own address.

    Aborts with 403 when there is no session user or the user row is missing.
    The outcome (sent / not configured / send failed) is reported through a
    flash message before redirecting back to the settings page.
    """
    if not session.get("user_id"):
        abort(403)
    from models import User  # pyright: ignore[reportMissingImports]
    from services.email_service import (
        EmailNotConfiguredError,
        EmailSendError,
        send_email,
    )  # pyright: ignore[reportMissingImports]

    recipient = db.session.get(User, session["user_id"])
    if not recipient:
        abort(403)  # pragma: no cover

    body = (
        "This is a test email from your OpenHangar instance.\n\n"
        "If you received this, your SMTP configuration is working correctly."
    )
    try:
        send_email(
            to=recipient.email,
            subject="OpenHangar — test email",
            text_body=body,
        )
        flash(_("Test email sent to %(email)s.", email=recipient.email), "success")
    except EmailNotConfiguredError as not_configured:
        flash(_("Email not configured: %(error)s", error=not_configured), "warning")
    except EmailSendError as send_failure:
        flash(_("Email send failed: %(error)s", error=send_failure), "danger")
    return redirect(url_for("config.index"))
@config_bp.route("/check-version", methods=["POST"])
def check_version() -> ResponseReturnValue:
    """Refresh the stored version information on demand.

    Aborts with 403 when nobody is logged in. Always records the check time
    under ``version_last_checked_at``; when ``fetch_versions`` returns a
    non-empty result, its first element is stored as ``latest_version`` and
    the whole list (JSON-encoded) as ``all_versions``.
    """
    if not session.get("user_id"):
        abort(403)
    import json as _json
    from datetime import datetime, timezone
    from services.version_service import fetch_versions, upsert_app_setting  # pyright: ignore[reportMissingImports]

    available = fetch_versions()
    checked_at = datetime.now(timezone.utc).isoformat()
    upsert_app_setting(db.session, "version_last_checked_at", checked_at)
    if available:
        newest = available[0]
        upsert_app_setting(db.session, "latest_version", newest)
        upsert_app_setting(db.session, "all_versions", _json.dumps(available))
    db.session.commit()
    flash(_("Version check refreshed."), "success")
    return redirect(url_for("config.index"))
556# ── One-click upgrade ─────────────────────────────────────────────────────────
@config_bp.route("/trigger-upgrade", methods=["POST"])
def trigger_upgrade() -> ResponseReturnValue:
    """Drop a ``trigger`` file into the upgrade directory.

    Aborts with 403 when nobody is logged in and with 404 when
    ``OPENHANGAR_UPGRADE_DIR`` is not set. If a ``trigger.running`` or
    ``trigger`` file already exists, only a flash message is shown. Otherwise
    a JSON file naming the requesting user's email and the current UTC time
    is written to ``<upgrade_dir>/trigger``.
    """
    if not session.get("user_id"):
        abort(403)
    upgrade_dir = os.environ.get("OPENHANGAR_UPGRADE_DIR", "")
    if not upgrade_dir:
        abort(404)
    os.makedirs(upgrade_dir, exist_ok=True)

    in_progress_marker = os.path.join(upgrade_dir, "trigger.running")
    request_marker = os.path.join(upgrade_dir, "trigger")

    if os.path.exists(in_progress_marker):
        flash(_("An upgrade is already in progress."), "warning")
        return redirect(url_for("config.index"))
    if os.path.exists(request_marker):
        flash(_("Upgrade already triggered."), "info")
        return redirect(url_for("config.index"))

    from models import User  # pyright: ignore[reportMissingImports]

    requester = db.session.get(User, session["user_id"])
    requested_by = requester.email if requester else "unknown"
    payload = {
        "triggered_by": requested_by,
        "triggered_at": datetime.now(timezone.utc).isoformat(),
    }
    with open(request_marker, "w") as marker_file:
        json.dump(payload, marker_file)
    flash(_("Upgrade triggered. The service will restart shortly."), "info")
    return redirect(url_for("config.index"))
@config_bp.route("/upgrade-status")
def upgrade_status() -> ResponseReturnValue:
    """Report the upgrade state as JSON, based on marker files.

    Aborts with 403 when nobody is logged in and with 404 when
    ``OPENHANGAR_UPGRADE_DIR`` is not set. Markers are checked in this order:
    ``trigger.done`` → ``done``, ``trigger.failed`` → ``failed`` (with the
    file's text as ``message``), ``trigger.running`` → ``in-progress``,
    ``trigger`` → ``triggered``; otherwise ``idle``. The ``done`` and
    ``failed`` markers are removed once reported; filesystem errors while
    doing so are ignored.
    """
    if not session.get("user_id"):
        abort(403)
    upgrade_dir = os.environ.get("OPENHANGAR_UPGRADE_DIR", "")
    if not upgrade_dir:
        return abort(404)

    def _marker(name: str) -> str:
        return os.path.join(upgrade_dir, name)

    done_marker = _marker("trigger.done")
    if os.path.exists(done_marker):
        with contextlib.suppress(OSError):
            os.remove(done_marker)
        return jsonify({"status": "done"})

    failed_marker = _marker("trigger.failed")
    if os.path.exists(failed_marker):
        failure_text = ""
        with contextlib.suppress(OSError):
            with open(failed_marker) as marker_file:
                failure_text = marker_file.read().strip()
            os.remove(failed_marker)
        return jsonify({"status": "failed", "message": failure_text})

    if os.path.exists(_marker("trigger.running")):
        return jsonify({"status": "in-progress"})
    if os.path.exists(_marker("trigger")):
        return jsonify({"status": "triggered"})
    return jsonify({"status": "idle"})
617# ── Phase 29: Tenant management (instance admin only) ─────────────────────────
@config_bp.route("/tenants")
@require_instance_admin
def tenant_list() -> ResponseReturnValue:
    """List all tenants (oldest first) with per-tenant statistics.

    For each tenant the template receives the tenant itself, its number of
    tenant users, its number of aircraft, and the tenant-user rows whose role
    is OWNER or ADMIN.
    """
    from models import Aircraft, Role, Tenant, TenantUser  # pyright: ignore[reportMissingImports]

    managing_roles = [Role.OWNER, Role.ADMIN]
    stats = [
        {
            "tenant": tenant,
            "user_count": TenantUser.query.filter_by(tenant_id=tenant.id).count(),
            "aircraft_count": Aircraft.query.filter_by(tenant_id=tenant.id).count(),
            "owners": (
                TenantUser.query.filter_by(tenant_id=tenant.id)
                .filter(TenantUser.role.in_(managing_roles))
                .all()
            ),
        }
        for tenant in Tenant.query.order_by(Tenant.created_at).all()
    ]

    return render_template("config/tenant_list.html", stats=stats)
@config_bp.route("/tenants/create", methods=["GET", "POST"])
@require_instance_admin
def tenant_create() -> ResponseReturnValue:
    """Show the tenant-creation form (GET) or create a tenant (POST).

    On POST a tenant name and an admin email are required; a missing value
    re-renders the form with a flash message. Otherwise a ``Tenant``, its
    ``TenantProfile`` (operating model left as ``None`` when the submitted
    value is not a valid ``OperatingModel``) and a 7-day OWNER invitation are
    created and committed, and the invite link is flashed to the admin.
    """
    from datetime import timedelta

    from models import OperatingModel, Role, Tenant, TenantProfile, User, UserInvitation  # pyright: ignore[reportMissingImports]

    user = db.session.get(User, session["user_id"])
    assert user is not None  # guaranteed by @require_instance_admin

    if request.method != "POST":
        return render_template("config/tenant_create.html")

    tenant_name = request.form.get("name", "").strip()
    owner_email = request.form.get("admin_email", "").strip().lower()
    requested_model = request.form.get("operating_model", "sole_operator")

    if not tenant_name:
        flash(_("Tenant name is required."), "danger")
        return render_template("config/tenant_create.html")
    if not owner_email:
        flash(_("Admin email is required."), "danger")
        return render_template("config/tenant_create.html")

    new_tenant = Tenant(name=tenant_name, is_active=True)
    db.session.add(new_tenant)
    db.session.flush()  # assigns new_tenant.id for the rows below

    op_model: OperatingModel | None
    try:
        op_model = OperatingModel(requested_model)
    except ValueError:
        op_model = None

    db.session.add(
        TenantProfile(
            tenant_id=new_tenant.id,
            operating_model=op_model,
            setup_complete=False,
        )
    )

    invite = UserInvitation(
        tenant_id=new_tenant.id,
        invited_by_user_id=user.id,
        email=owner_email,
        role=Role.OWNER,
        expires_at=datetime.now(timezone.utc) + timedelta(days=7),
    )
    db.session.add(invite)
    db.session.commit()

    invite_url = url_for("users.accept_invite", token=invite.token, _external=True)
    flash(
        _(
            "Tenant '%(name)s' created. Share this invite link with the owner: %(url)s",
            name=tenant_name,
            url=invite_url,
        ),
        "success",
    )
    return redirect(url_for("config.tenant_list"))
@config_bp.route("/tenants/<int:tenant_id>/toggle", methods=["POST"])
@require_instance_admin
def tenant_toggle_active(tenant_id: int) -> ResponseReturnValue:
    """Flip a tenant's ``is_active`` flag and report the new state.

    Aborts with 404 when the tenant does not exist; otherwise commits the
    change, flashes a message and redirects to the tenant list.
    """
    from models import Tenant  # pyright: ignore[reportMissingImports]

    target = db.session.get(Tenant, tenant_id)
    if not target:
        abort(404)

    target.is_active = not target.is_active
    db.session.commit()

    if not target.is_active:
        flash(_("Tenant '%(name)s' deactivated.", name=target.name), "warning")
    else:
        flash(_("Tenant '%(name)s' reactivated.", name=target.name), "success")

    return redirect(url_for("config.tenant_list"))
@config_bp.route("/tenants/<int:tenant_id>/reset-password", methods=["POST"])
@require_instance_admin
def tenant_reset_owner_password(tenant_id: int) -> ResponseReturnValue:
    """Create a 24-hour password-reset token for a tenant owner/admin.

    Aborts with 404 for an unknown tenant and with 403 when the selected user
    is not an OWNER or ADMIN of that tenant. A missing ``owner_user_id`` form
    value yields a flash message and a redirect to the tenant list. On
    success the reset link is shown on ``config/tenant_reset_token.html``.
    """
    from datetime import timedelta

    from models import PasswordResetToken, Role, Tenant, TenantUser, User  # pyright: ignore[reportMissingImports]

    admin = db.session.get(User, session["user_id"])
    assert admin is not None  # guaranteed by @require_instance_admin
    tenant = db.session.get(Tenant, tenant_id)
    if not tenant:
        abort(404)

    target_user_id = request.form.get("owner_user_id", type=int)
    if not target_user_id:
        flash(_("No user selected."), "danger")
        return redirect(url_for("config.tenant_list"))

    membership = TenantUser.query.filter_by(
        tenant_id=tenant_id, user_id=target_user_id
    ).first()
    if not membership or membership.role not in (Role.OWNER, Role.ADMIN):
        abort(403)

    valid_until = datetime.now(timezone.utc) + timedelta(hours=24)
    reset_token = PasswordResetToken(
        user_id=target_user_id,
        generated_by_user_id=admin.id,
        expires_at=valid_until,
    )
    db.session.add(reset_token)
    db.session.commit()

    return render_template(
        "config/tenant_reset_token.html",
        tenant=tenant,
        reset_url=url_for(
            "auth.reset_password", token=reset_token.token, _external=True
        ),
        expires_at=reset_token.expires_at,
    )
@config_bp.route("/notifications/", methods=["GET", "POST"])
@login_required
def notification_preferences() -> ResponseReturnValue:
    """Manage per-user notification preferences. Accessible to all logged-in users.

    GET renders ``config/notifications.html`` with the notification types the
    user is allowed to see, their current preferences and (for admins/owners
    with a tenant) the tenant-level defaults.

    POST reads ``enabled_<type>`` / ``threshold_<type>`` form fields for every
    visible type, stores, updates or deletes the matching
    ``NotificationPreference`` rows, and redirects back to this page.

    Aborts with 403 when the session user cannot be loaded.
    """
    from models import (  # pyright: ignore[reportMissingImports]
        NotificationPreference,
        NotificationType,
        Role,
        TenantNotificationDefault,
        TenantProfile,
        TenantUser,
        User,
    )
    from utils import current_user_role  # pyright: ignore[reportMissingImports]

    user = db.session.get(User, session["user_id"])
    if not user:
        abort(403)

    # The first tenant membership found for the user decides the tenant.
    tu = TenantUser.query.filter_by(user_id=user.id).first()
    tenant_id = tu.tenant_id if tu else None
    role = current_user_role()

    # Capability flags: derived from the role, or (for pilot / maintenance)
    # from the per-user is_pilot / is_maintenance attributes.
    is_owner = role in (Role.ADMIN, Role.OWNER)
    is_pilot = (
        role in (Role.ADMIN, Role.OWNER, Role.PILOT, Role.INSTRUCTOR) or user.is_pilot
    )
    is_maint = (
        role in (Role.ADMIN, Role.OWNER, Role.MAINTENANCE, Role.INSTRUCTOR)
        or user.is_maintenance
    )

    def _user_has_cap(caps: list[str]) -> bool:
        # True when at least one capability named in *caps* is held by the user.
        return (
            ("is_owner" in caps and is_owner)
            or ("is_pilot" in caps and is_pilot)
            or ("is_maint" in caps and is_maint)
        )

    # A type with no REQUIRED_CAPS entry gets an empty list and is never visible.
    visible_types = [
        t
        for t in NotificationType.ALL
        if _user_has_cap(NotificationType.REQUIRED_CAPS.get(t, []))
    ]

    if request.method == "POST":
        if tenant_id is None:
            flash(_("Cannot save: no tenant associated."), "danger")
            return redirect(url_for("config.notification_preferences"))

        for notif_type in visible_types:
            enabled = bool(request.form.get(f"enabled_{notif_type}"))
            threshold_raw = request.form.get(f"threshold_{notif_type}", "").strip()
            threshold_days: int | None = None
            # Thresholds are only read for types listed in HAS_THRESHOLD; the
            # value is clamped to at least 1 and unparsable input becomes None.
            if notif_type in NotificationType.HAS_THRESHOLD and threshold_raw:
                try:
                    threshold_days = max(1, int(threshold_raw))
                except ValueError:
                    threshold_days = None

            existing = NotificationPreference.query.filter_by(
                user_id=user.id, tenant_id=tenant_id, notification_type=notif_type
            ).first()
            # Only save if the user's preference differs from the effective default
            # NOTE(review): the comparison below uses SYSTEM_DEFAULTS only; tenant
            # defaults are not consulted here — confirm this is intended.
            system_default = NotificationType.SYSTEM_DEFAULTS.get(notif_type, {})
            same_as_default = enabled == system_default.get(
                "enabled", False
            ) and threshold_days == system_default.get("threshold_days")
            if same_as_default and existing:
                # Matches the default again: drop the stored override.
                db.session.delete(existing)
            elif not same_as_default:
                if existing:
                    existing.enabled = enabled
                    existing.threshold_days = threshold_days
                else:
                    db.session.add(
                        NotificationPreference(
                            user_id=user.id,
                            tenant_id=tenant_id,
                            notification_type=notif_type,
                            enabled=enabled,
                            threshold_days=threshold_days,
                        )
                    )
        db.session.commit()
        flash(_("Notification preferences saved."), "success")
        return redirect(url_for("config.notification_preferences"))

    # Build current effective preferences for display
    prefs: dict[str, dict[str, object]] = {}
    for notif_type in visible_types:
        if tenant_id:
            from services.notification_service import get_effective_preference  # pyright: ignore[reportMissingImports]

            prefs[notif_type] = get_effective_preference(user.id, tenant_id, notif_type)
        else:
            # Without a tenant, fall back to a copy of the system default.
            prefs[notif_type] = dict(
                NotificationType.SYSTEM_DEFAULTS.get(
                    notif_type, {"enabled": False, "threshold_days": None}
                )
            )

    # Tenant defaults visible only to admins/owners
    tenant_defaults: dict[str, dict[str, object]] | None = None
    if is_owner and tenant_id:
        tenant_defaults = {}
        # Covers every notification type, not just the ones visible to the user.
        for notif_type in NotificationType.ALL:
            td = TenantNotificationDefault.query.filter_by(
                tenant_id=tenant_id, notification_type=notif_type
            ).first()
            if td:
                tenant_defaults[notif_type] = {
                    "enabled": td.enabled,
                    "threshold_days": td.threshold_days,
                }
            else:
                tenant_defaults[notif_type] = dict(
                    NotificationType.SYSTEM_DEFAULTS.get(
                        notif_type, {"enabled": False, "threshold_days": None}
                    )
                )

    profile = (
        TenantProfile.query.filter_by(tenant_id=tenant_id).first()
        if tenant_id
        else None
    )

    return render_template(
        "config/notifications.html",
        visible_types=visible_types,
        prefs=prefs,
        has_threshold=NotificationType.HAS_THRESHOLD,
        system_defaults=NotificationType.SYSTEM_DEFAULTS,
        tenant_defaults=tenant_defaults,
        is_owner=is_owner,
        profile=profile,
    )
@config_bp.route("/backfill/aircraft-type-icao", methods=["POST"])
@require_instance_admin
def backfill_aircraft_type_icao() -> ResponseReturnValue:
    """Fill in ``aircraft_type_icao`` on logbook entries that lack it.

    Every ``PilotLogbookEntry`` with an ``aircraft_type`` but no
    ``aircraft_type_icao`` is passed through ``resolve_aircraft_type_icao``;
    truthy results are stored. The number of resolved entries out of the
    candidates is flashed, then the user is redirected to the settings page.
    """
    from models import PilotLogbookEntry  # pyright: ignore[reportMissingImports]
    from utils import resolve_aircraft_type_icao  # pyright: ignore[reportMissingImports]

    candidates = PilotLogbookEntry.query.filter(
        PilotLogbookEntry.aircraft_type.isnot(None),
        PilotLogbookEntry.aircraft_type_icao.is_(None),
    ).all()

    resolved_total = 0
    for logbook_row in candidates:
        designator = resolve_aircraft_type_icao(logbook_row.aircraft_type)
        if not designator:
            continue
        logbook_row.aircraft_type_icao = designator
        resolved_total += 1

    db.session.commit()
    summary = ngettext(
        "Back-fill complete: one of %(total)d entry resolved.",
        "Back-fill complete: %(updated)d of %(total)d entries resolved.",
        resolved_total,
        updated=resolved_total,
        total=len(candidates),
    )
    flash(summary, "success")
    return redirect(url_for("config.index"))
# Allow-list of badge paths that may be proxied: requested path -> path used
# in the upstream URL (identical strings). One entry per metric and time
# window, in the order uptimes then response-times, 1h / 24h / 7d / 30d.
_ALLOWED_BADGE_PATHS: dict[str, str] = {
    f"{metric}/{window}/badge.svg": f"{metric}/{window}/badge.svg"
    for metric in ("uptimes", "response-times")
    for window in ("1h", "24h", "7d", "30d")
}
@config_bp.route("/gatus-badge/<path:badge_path>")
@login_required
def gatus_badge(badge_path: str) -> ResponseReturnValue:
    """Proxy a Gatus badge image for the configured endpoint.

    *badge_path* must be one of the keys of ``_ALLOWED_BADGE_PATHS``; anything
    else, or a missing Gatus configuration, yields 404. The badge is fetched
    from ``<base_url>/api/v1/endpoints/<endpoint_key>/<path>`` with a 5-second
    timeout, sending the configured value as a ``Basic`` Authorization header
    when present, and returned with the upstream Content-Type
    (``image/svg+xml`` when upstream sends none).

    Any fetch failure — non-HTTP(S) URL, malformed URL, connection or HTTP
    error, timeout while reading, protocol error — is logged and answered
    with 503.
    """
    import http.client

    safe_path = _ALLOWED_BADGE_PATHS.get(badge_path)
    if safe_path is None:
        return abort(404)
    gatus = _parse_gatus_env()
    if gatus is None:
        return abort(404)
    base_url, endpoint_key, auth_header = gatus
    badge_url = f"{base_url}/api/v1/endpoints/{endpoint_key}/{safe_path}"

    # urlopen also understands file:// and ftp://; only proxy real HTTP(S).
    if not badge_url.lower().startswith(("http://", "https://")):
        log.warning("Gatus badge URL is not HTTP(S): %s", badge_url)
        return abort(503)

    try:
        req = urllib.request.Request(badge_url)
        if auth_header:
            req.add_header("Authorization", f"Basic {auth_header}")
        with urllib.request.urlopen(req, timeout=5) as resp:  # nosec B310
            content = resp.read()
            content_type = resp.headers.get("Content-Type", "image/svg+xml")
    except (OSError, ValueError, http.client.HTTPException) as exc:
        # URLError/HTTPError are OSError subclasses. A timeout or reset during
        # resp.read() surfaces as a plain OSError, a truncated body as an
        # http.client exception, and a malformed URL as ValueError — none of
        # which the former `except URLError` caught.
        log.warning("Gatus badge fetch failed (%s): %s", badge_url, repr(exc))
        return abort(503)
    return current_app.response_class(content, mimetype=content_type)