Coverage for app/security_alerts.py: 100%

100 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1"""Real-time alerting for escalated [SECURITY] log events. 

2 

3Attaches a SecurityAlertHandler to the 'openhangar' logger. When a WARNING+ 

4record whose message contains '[SECURITY]' matches one of the escalated event 

5types, it fires up to three delivery channels — ntfy, email, webhook — each 

6gated by its own env var. Channels that are not configured are silently skipped. 

7Delivery failures are logged and never re-raised; alerting must not break the app. 

8 

9Env vars (all optional): 

10 OPENHANGAR_ALERT_NTFY_TOPIC_URL — ntfy topic URL (hosted or self-hosted) 

11 OPENHANGAR_ALERT_EMAIL_TO — recipient address for alert emails 

12 OPENHANGAR_ALERT_WEBHOOK_URL — generic HTTP POST endpoint (Slack, etc.) 

13 

14Email alerts reuse the existing OPENHANGAR_SMTP_* env vars. 

15""" 

16 

17import json 

18import logging 

19import os 

20import smtplib 

21import threading 

22import time 

23import urllib.error 

24import urllib.request 

25from email.mime.text import MIMEText 

26 

# Event types that warrant a real-time alert. SecurityAlertHandler.emit compares
# the token that follows "[SECURITY]" in the log message against this set.
_ESCALATED: frozenset[str] = frozenset(
    (
        "auth.login.account_locked",
        "auth.login.account_blocked",
        "auth.totp.replay",
        # Disabling 2FA is a classic account-takeover step (an attacker locking
        # out the legitimate owner); rare enough to alert on without noise.
        "auth.totp.disabled",
        "users.role.changed",
        "users.access.revoked",
    )
)

# Minimum spacing, in seconds, between two alerts for the same event type.
_DEBOUNCE_SECONDS = 60

# Delivery failures are reported through this module's own logger.
_log = logging.getLogger(__name__)

# Layout of the alert body sent to every delivery channel.
_DEFAULT_FORMATTER = logging.Formatter(
    "%(asctime)s %(levelname)s %(name)s %(message)s"
)

45 

46 

class SecurityAlertHandler(logging.Handler):
    """Logging handler that fires real-time alerts for escalated security events.

    A record triggers an alert when it is WARNING or above, its message
    contains the standalone token ``[SECURITY]``, and the token that follows
    is listed in ``_ESCALATED``. Alerts are debounced per event type: at most
    one alert per ``_DEBOUNCE_SECONDS`` for a given event type.
    """

    def __init__(self) -> None:
        super().__init__(level=logging.WARNING)
        self.setFormatter(_DEFAULT_FORMATTER)
        # event type -> time.monotonic() reading taken when it last alerted
        self._debounce: dict[str, float] = {}
        # Guards _debounce so the check-and-update below is atomic.
        self._lock = threading.Lock()

    def emit(self, record: logging.LogRecord) -> None:
        """Dispatch an alert for *record* if it is an escalated security event.

        Records that do not qualify, or that fall inside the debounce window,
        are ignored. Any exception raised while processing is handed to
        ``handleError`` instead of propagating to the logging caller.
        """
        if record.levelno < self.level:
            return
        try:
            raw = record.getMessage()
            if "[SECURITY]" not in raw:
                return

            parts = raw.split()
            try:
                sec_idx = parts.index("[SECURITY]")
            except ValueError:
                # The marker is glued to other text, not a standalone token.
                return
            if sec_idx + 1 >= len(parts):
                return

            event_type = parts[sec_idx + 1]
            if event_type not in _ESCALATED:
                return

            now = time.monotonic()
            with self._lock:
                # The monotonic clock has an undefined reference point and may
                # read below _DEBOUNCE_SECONDS (e.g. shortly after boot), so a
                # 0.0 "never sent" sentinel could suppress the very first
                # alert. Use an explicit "not seen yet" check instead.
                last_sent = self._debounce.get(event_type)
                if last_sent is not None and now - last_sent < _DEBOUNCE_SECONDS:
                    return
                self._debounce[event_type] = now

            body = self.format(record)
            self._dispatch(event_type, body)
        except Exception:
            self.handleError(record)

    def _dispatch(self, event_type: str, detail: str) -> None:
        """Send the alert through every channel whose env var is set.

        Channels with an unset or blank env var are skipped silently.
        """
        ntfy_url = os.environ.get("OPENHANGAR_ALERT_NTFY_TOPIC_URL", "").strip()
        alert_email = os.environ.get("OPENHANGAR_ALERT_EMAIL_TO", "").strip()
        webhook_url = os.environ.get("OPENHANGAR_ALERT_WEBHOOK_URL", "").strip()

        if ntfy_url:
            self._send_ntfy(ntfy_url, event_type, detail)
        if alert_email:
            self._send_email(alert_email, event_type, detail)
        if webhook_url:
            self._send_webhook(webhook_url, event_type, detail)

    def _send_ntfy(self, url: str, event_type: str, detail: str) -> None:
        """POST *detail* to the ntfy topic *url*; failures are logged, not raised."""
        try:
            req = urllib.request.Request(
                url,
                data=detail.encode("utf-8"),
                headers={
                    "Title": f"OpenHangar security alert: {event_type}",
                    "Priority": "high",
                    "Tags": "warning,lock",
                },
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=10):
                pass
        except Exception as exc:
            _log.error("Security alert: ntfy delivery failed: %s", exc)

    def _send_email(self, to: str, event_type: str, detail: str) -> None:
        """Email *detail* to *to* using the OPENHANGAR_SMTP_* settings.

        Delivery is skipped (and logged) when the SMTP host or from-address is
        not configured. Failures are logged, not raised.
        """
        try:
            host = os.environ.get("OPENHANGAR_SMTP_HOST", "").strip()
            from_addr = os.environ.get("OPENHANGAR_SMTP_FROM_ADDRESS", "").strip()
            if not host or not from_addr:
                _log.error(
                    "Security alert: email delivery skipped — "
                    "OPENHANGAR_SMTP_HOST or OPENHANGAR_SMTP_FROM_ADDRESS not configured"
                )
                return

            # A variable that is set but blank falls back to the default port
            # instead of failing on int("").
            port = int(os.environ.get("OPENHANGAR_SMTP_PORT", "").strip() or "587")
            user = os.environ.get("OPENHANGAR_SMTP_USER", "").strip()
            password = os.environ.get("OPENHANGAR_SMTP_PASSWORD", "")
            use_tls = os.environ.get("OPENHANGAR_SMTP_USE_TLS", "true").lower() not in (
                "false",
                "0",
                "no",
            )

            msg = MIMEText(detail, "plain", "utf-8")
            msg["Subject"] = f"[OpenHangar] Security alert: {event_type}"
            msg["From"] = from_addr
            msg["To"] = to

            # The context manager issues QUIT and closes the socket even when
            # STARTTLS, login or sendmail raises, so a failed delivery no
            # longer leaks the connection.
            with smtplib.SMTP(host, port, timeout=10) as conn:
                if use_tls:
                    conn.ehlo()
                    conn.starttls()
                    conn.ehlo()
                if user:
                    conn.login(user, password)
                conn.sendmail(from_addr, [to], msg.as_bytes())
        except Exception as exc:
            _log.error("Security alert: email delivery failed: %s", exc)

    def _send_webhook(self, url: str, event_type: str, detail: str) -> None:
        """POST a JSON ``{"event", "detail"}`` payload to *url*.

        Failures are logged, not raised.
        """
        try:
            payload = json.dumps({"event": event_type, "detail": detail}).encode(
                "utf-8"
            )
            req = urllib.request.Request(
                url,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=10):
                pass
        except Exception as exc:
            _log.error("Security alert: webhook delivery failed: %s", exc)

168 

169 

def attach_to_logger() -> None:
    """Attach the SecurityAlertHandler to the openhangar logger. Idempotent.

    If the 'openhangar' logger already has a SecurityAlertHandler among its
    handlers, nothing is added.
    """
    target = logging.getLogger("openhangar")
    for existing in target.handlers:
        if isinstance(existing, SecurityAlertHandler):
            # Already attached; adding a second one would duplicate alerts.
            return
    target.addHandler(SecurityAlertHandler())