Coverage for app/security_alerts.py: 100%
100 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1"""Real-time alerting for escalated [SECURITY] log events.
3Attaches a SecurityAlertHandler to the 'openhangar' logger. When a WARNING+
4record whose message contains '[SECURITY]' matches one of the escalated event
5types, it fires up to three delivery channels — ntfy, email, webhook — each
6gated by its own env var. Channels that are not configured are silently skipped.
7Delivery failures are logged and never re-raised; alerting must not break the app.
9Env vars (all optional):
10 OPENHANGAR_ALERT_NTFY_TOPIC_URL — ntfy topic URL (hosted or self-hosted)
11 OPENHANGAR_ALERT_EMAIL_TO — recipient address for alert emails
12 OPENHANGAR_ALERT_WEBHOOK_URL — generic HTTP POST endpoint (Slack, etc.)
14Email alerts reuse the existing OPENHANGAR_SMTP_* env vars.
15"""
17import json
18import logging
19import os
20import smtplib
21import threading
22import time
23import urllib.error
24import urllib.request
25from email.mime.text import MIMEText
27_ESCALATED: frozenset[str] = frozenset(
28 {
29 "auth.login.account_locked",
30 "auth.login.account_blocked",
31 "auth.totp.replay",
32 # Disabling 2FA is a classic account-takeover step (an attacker locking
33 # out the legitimate owner); rare enough to alert on without noise.
34 "auth.totp.disabled",
35 "users.role.changed",
36 "users.access.revoked",
37 }
38)
40_DEBOUNCE_SECONDS = 60
42_log = logging.getLogger(__name__)
44_DEFAULT_FORMATTER = logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
47class SecurityAlertHandler(logging.Handler):
48 """Logging handler that fires real-time alerts for escalated security events."""
50 def __init__(self) -> None:
51 super().__init__(level=logging.WARNING)
52 self.setFormatter(_DEFAULT_FORMATTER)
53 self._debounce: dict[str, float] = {}
54 self._lock = threading.Lock()
56 def emit(self, record: logging.LogRecord) -> None:
57 if record.levelno < self.level:
58 return
59 try:
60 raw = record.getMessage()
61 if "[SECURITY]" not in raw:
62 return
64 parts = raw.split()
65 try:
66 sec_idx = parts.index("[SECURITY]")
67 except ValueError:
68 return
69 if sec_idx + 1 >= len(parts):
70 return
72 event_type = parts[sec_idx + 1]
73 if event_type not in _ESCALATED:
74 return
76 now = time.monotonic()
77 with self._lock:
78 if now - self._debounce.get(event_type, 0.0) < _DEBOUNCE_SECONDS:
79 return
80 self._debounce[event_type] = now
82 body = self.format(record)
83 self._dispatch(event_type, body)
84 except Exception:
85 self.handleError(record)
87 def _dispatch(self, event_type: str, detail: str) -> None:
88 ntfy_url = os.environ.get("OPENHANGAR_ALERT_NTFY_TOPIC_URL", "").strip()
89 alert_email = os.environ.get("OPENHANGAR_ALERT_EMAIL_TO", "").strip()
90 webhook_url = os.environ.get("OPENHANGAR_ALERT_WEBHOOK_URL", "").strip()
92 if ntfy_url:
93 self._send_ntfy(ntfy_url, event_type, detail)
94 if alert_email:
95 self._send_email(alert_email, event_type, detail)
96 if webhook_url:
97 self._send_webhook(webhook_url, event_type, detail)
99 def _send_ntfy(self, url: str, event_type: str, detail: str) -> None:
100 try:
101 req = urllib.request.Request(
102 url,
103 data=detail.encode("utf-8"),
104 headers={
105 "Title": f"OpenHangar security alert: {event_type}",
106 "Priority": "high",
107 "Tags": "warning,lock",
108 },
109 method="POST",
110 )
111 with urllib.request.urlopen(req, timeout=10):
112 pass
113 except Exception as exc:
114 _log.error("Security alert: ntfy delivery failed: %s", exc)
116 def _send_email(self, to: str, event_type: str, detail: str) -> None:
117 try:
118 host = os.environ.get("OPENHANGAR_SMTP_HOST", "").strip()
119 from_addr = os.environ.get("OPENHANGAR_SMTP_FROM_ADDRESS", "").strip()
120 if not host or not from_addr:
121 _log.error(
122 "Security alert: email delivery skipped — "
123 "OPENHANGAR_SMTP_HOST or OPENHANGAR_SMTP_FROM_ADDRESS not configured"
124 )
125 return
127 port = int(os.environ.get("OPENHANGAR_SMTP_PORT", "587"))
128 user = os.environ.get("OPENHANGAR_SMTP_USER", "").strip()
129 password = os.environ.get("OPENHANGAR_SMTP_PASSWORD", "")
130 use_tls = os.environ.get("OPENHANGAR_SMTP_USE_TLS", "true").lower() not in (
131 "false",
132 "0",
133 "no",
134 )
136 msg = MIMEText(detail, "plain", "utf-8")
137 msg["Subject"] = f"[OpenHangar] Security alert: {event_type}"
138 msg["From"] = from_addr
139 msg["To"] = to
141 conn = smtplib.SMTP(host, port, timeout=10)
142 if use_tls:
143 conn.ehlo()
144 conn.starttls()
145 conn.ehlo()
146 if user:
147 conn.login(user, password)
148 conn.sendmail(from_addr, [to], msg.as_bytes())
149 conn.quit()
150 except Exception as exc:
151 _log.error("Security alert: email delivery failed: %s", exc)
153 def _send_webhook(self, url: str, event_type: str, detail: str) -> None:
154 try:
155 payload = json.dumps({"event": event_type, "detail": detail}).encode(
156 "utf-8"
157 )
158 req = urllib.request.Request(
159 url,
160 data=payload,
161 headers={"Content-Type": "application/json"},
162 method="POST",
163 )
164 with urllib.request.urlopen(req, timeout=10):
165 pass
166 except Exception as exc:
167 _log.error("Security alert: webhook delivery failed: %s", exc)
170def attach_to_logger() -> None:
171 """Attach the SecurityAlertHandler to the openhangar logger. Idempotent."""
172 logger = logging.getLogger("openhangar")
173 if not any(isinstance(h, SecurityAlertHandler) for h in logger.handlers):
174 logger.addHandler(SecurityAlertHandler())