Coverage for app/sync_watcher.py: 100%
97 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 23:33 +0000
1"""
Background polling thread — automatically imports documents that arrive via
Syncthing (or any other file-sync tool) into the canonical folder structure.

Canonical path layout (relative to UPLOAD_FOLDER):
    {tenant_slug}/{aircraft_reg}/{category}/YYYY-MM-DD - title.ext

Behaviour on discovery:
  - File matches canonical structure AND aircraft is recognised in the tenant
    → Document row created immediately (auto-import).
  - File path is valid but aircraft/category cannot be resolved unambiguously
    → PendingReconcile entry created for manual review in the UI.
  - File already tracked (filename in documents table) or already pending
    → Skipped.

Enabled only when UPLOAD_FOLDER is set and the database is PostgreSQL
(SQLite = dev/test; the watcher is skipped there to avoid confusion).

Interval is configured via the OPENHANGAR_SYNC_SCAN_INTERVAL env var
(default: 60 s).
20"""
22import contextlib
23import logging
24import mimetypes
25import os
26import re as _re
27import threading
28import time
29from datetime import date as _date
30from typing import Any
# Module-level logger; records are emitted under "openhangar.sync_watcher".
log = logging.getLogger("openhangar.sync_watcher")

# Cache of valid category values. Stays None until _categories() fills it on
# first use.
_CATEGORY_VALUES: set[str] | None = None
# Matches "YYYY-MM-DD - title.ext": group 1 is the date-like prefix (digits
# only, not validated as a real date), group 2 the title (lazy), group 3 the
# optional final extension including its leading dot.
_DATE_TITLE_RE = _re.compile(r"^(\d{4}-\d{2}-\d{2}) - (.+?)(\.[^.]+)?$")
def _categories() -> set[str]:
    """Return the set of known document category values.

    The set is built from ``DocCategory.ALL`` on the first call and stored in
    the module-level ``_CATEGORY_VALUES``; later calls return that cached set.
    The ``models`` import is deferred until the first call that needs it.
    """
    global _CATEGORY_VALUES
    cached = _CATEGORY_VALUES
    if cached is not None:
        return cached

    from models import DocCategory  # pyright: ignore[reportMissingImports]

    cached = set(DocCategory.ALL)
    _CATEGORY_VALUES = cached
    return cached
def _scan_once(app: Any) -> None:
    """Run a single scan pass over the upload folder.

    For every tenant that has a slug and a matching directory under
    ``UPLOAD_FOLDER``, this removes open pending entries whose file is gone
    from disk and hands each file that is neither tracked as a Document nor
    already pending to ``_process_file``. Everything staged during the pass
    is committed once at the end.

    Entries whose name starts with "." or "_" are ignored. This applies to
    directories as well as files, so sync-tool housekeeping folders (for
    example Syncthing's ``.stversions``) are never imported or queued.

    Args:
        app: Application object providing ``config`` and ``app_context()``.
    """
    from models import (  # pyright: ignore[reportMissingImports]
        Aircraft,
        Document,
        PendingReconcile,
        Tenant,
        db,
    )

    folder = app.config.get("UPLOAD_FOLDER", "/data/uploads")
    if not os.path.isdir(folder):
        return

    hidden_prefixes = (".", "_")

    with app.app_context():
        # Build lookup tables once per scan
        tenants = {
            t.slug: t for t in Tenant.query.filter(Tenant.slug.isnot(None)).all()
        }
        if not tenants:
            return

        known_filenames: set[str] = {
            doc.filename
            for doc in Document.query.with_entities(Document.filename).all()
        }
        pending_filepaths: set[str] = {
            pr.filepath
            for pr in PendingReconcile.query.with_entities(
                PendingReconcile.filepath
            ).all()
        }

        for tenant_slug, tenant in tenants.items():
            slug_dir = os.path.join(folder, tenant_slug)
            if not os.path.isdir(slug_dir):
                continue

            # Drop pending entries whose file no longer exists on disk
            for pr in PendingReconcile.query.filter_by(
                tenant_id=tenant.id, reconciled_at=None, ignored=False
            ).all():
                if not os.path.exists(os.path.join(folder, pr.filepath)):
                    db.session.delete(pr)
                    pending_filepaths.discard(pr.filepath)

            # Build registration → aircraft map for this tenant; keys are
            # upper-cased with dashes and spaces removed, matching the
            # normalisation applied to the directory name in _process_file.
            aircraft_by_reg: dict[str, Any] = {
                ac.registration.upper().replace("-", "").replace(" ", ""): ac
                for ac in Aircraft.query.filter_by(tenant_id=tenant.id).all()
            }

            for dirpath, dirnames, filenames in os.walk(slug_dir):
                # Prune hidden directories in place so os.walk (top-down)
                # never descends into them.
                dirnames[:] = [
                    d for d in dirnames if not d.startswith(hidden_prefixes)
                ]

                for fname in filenames:
                    if fname.startswith(hidden_prefixes):
                        continue

                    full = os.path.join(dirpath, fname)
                    # Stored paths always use forward slashes, regardless of OS.
                    relpath = os.path.relpath(full, folder).replace("\\", "/")

                    if relpath in known_filenames or relpath in pending_filepaths:
                        continue

                    _process_file(
                        app,
                        full,
                        relpath,
                        fname,
                        tenant,
                        aircraft_by_reg,
                        known_filenames,
                        pending_filepaths,
                        db,
                        Document,
                        PendingReconcile,
                    )

        # Persist every deletion and insertion staged during this pass.
        db.session.commit()
def _process_file(  # noqa: PLR0913
    app: Any,
    full_path: str,
    relpath: str,
    fname: str,
    tenant: Any,
    aircraft_by_reg: dict[str, Any],
    known_filenames: set[str],
    pending_filepaths: set[str],
    db: Any,
    Document: Any,
    PendingReconcile: Any,
) -> None:
    """Decide whether to auto-import a discovered file or queue it for review.

    The path is expected to look like ``slug/reg/category/.../filename``.
    When both the aircraft (second component) and the category (third
    component) resolve, a ``Document`` is added to the session; otherwise a
    ``PendingReconcile`` entry is staged via ``_queue_pending``. Nothing is
    committed here.

    Args:
        app: Application object (not used by this function).
        full_path: Filesystem path of the file, used to read its size.
        relpath: Forward-slash path relative to the upload folder; stored as
            the document filename / pending filepath.
        fname: Base name of the file.
        tenant: Tenant owning the scanned directory.
        aircraft_by_reg: Normalised registration → aircraft lookup.
        known_filenames: Set of tracked document paths; updated on import.
        pending_filepaths: Set of pending paths; updated when queuing.
        db: Database handle exposing ``session``.
        Document: Document model class.
        PendingReconcile: PendingReconcile model class.
    """
    parts = relpath.split("/")
    # Expected: slug / reg / category / filename (4 parts minimum)
    if len(parts) < 4:
        _queue_pending(
            relpath, fname, None, None, None, None, tenant, db, PendingReconcile
        )
        pending_filepaths.add(relpath)
        return

    reg_raw = parts[1].upper().replace("-", "").replace(" ", "")
    cat_str = parts[2]
    # Always parse the real file name (last component). With extra nesting
    # below the category folder, parts[3] would be a directory name and the
    # title/date hints would be derived from that folder instead of the file.
    filename_part = parts[-1]

    aircraft = aircraft_by_reg.get(reg_raw)
    cat_lower = cat_str.lower()
    category = cat_lower if cat_lower in _categories() else None

    # Parse "YYYY-MM-DD - title.ext" from the filename
    m = _DATE_TITLE_RE.match(filename_part)
    title_hint: str | None = None
    date_hint: _date | None = None
    if m:
        # The regex only checks digits; an impossible date (e.g. month 13)
        # is treated as "no date" while the title is still used.
        with contextlib.suppress(ValueError):
            date_hint = _date.fromisoformat(m.group(1))
        title_hint = m.group(2)
    else:
        title_hint = os.path.splitext(filename_part)[0]

    if aircraft and category:
        # Fully resolved — auto-import immediately
        mime = mimetypes.guess_type(fname)[0] or "application/octet-stream"
        size = None
        # The file may have disappeared between scan and import; size then
        # stays None.
        with contextlib.suppress(OSError):
            size = os.path.getsize(full_path)
        doc = Document(
            aircraft_id=aircraft.id,
            filename=relpath,
            original_filename=fname,
            mime_type=mime,
            size_bytes=size,
            title=title_hint,
            category=category,
        )
        db.session.add(doc)
        known_filenames.add(relpath)
        log.info(
            "sync_watcher: auto-imported %s → aircraft=%s category=%s",
            relpath,
            aircraft.registration,
            category,
        )
    else:
        # Ambiguous — queue for manual review
        _queue_pending(
            relpath,
            fname,
            aircraft,
            category,
            title_hint,
            date_hint,
            tenant,
            db,
            PendingReconcile,
        )
        pending_filepaths.add(relpath)
        log.info(
            "sync_watcher: queued for review %s (aircraft=%s category=%s)",
            relpath,
            aircraft.registration if aircraft else "?",
            category or "?",
        )
218def _queue_pending(
219 relpath: str,
220 fname: str,
221 aircraft: Any,
222 category: str | None,
223 title_hint: str | None,
224 date_hint: _date | None,
225 tenant: Any,
226 db: Any,
227 PendingReconcile: Any,
228) -> None:
229 pr = PendingReconcile(
230 tenant_id=tenant.id,
231 aircraft_id=aircraft.id if aircraft else None,
232 filepath=relpath,
233 category=category,
234 title_hint=title_hint or os.path.splitext(fname)[0],
235 date_hint=date_hint,
236 )
237 db.session.add(pr)
def _watcher_loop(app: Any, interval: int) -> None:
    """Run scan passes forever, sleeping ``interval`` seconds between them.

    Any ``Exception`` raised by a pass is logged with its traceback and the
    loop carries on, so a single failing scan does not stop the watcher.
    This function never returns.
    """

    def _guarded_pass() -> None:
        # One scan pass that logs failures instead of propagating them.
        try:
            _scan_once(app)
        except Exception:
            log.exception("sync_watcher: unhandled error during scan")

    log.info("sync_watcher: started (interval=%ds)", interval)
    while True:
        _guarded_pass()
        time.sleep(interval)
def start_sync_watcher(app: Any) -> None:
    """Start the background sync watcher thread (idempotent, daemon thread).

    If a thread named "sync-watcher" is already active in this process the
    call is a no-op, so repeated calls never spawn a second watcher. The
    guard is process-wide and keyed on the thread name only.

    The scan interval (seconds) is read from the
    ``OPENHANGAR_SYNC_SCAN_INTERVAL`` environment variable. A missing,
    non-integer, zero or negative value falls back to 60.

    Args:
        app: Application object passed through to the watcher loop.
    """
    thread_name = "sync-watcher"

    # threading.enumerate() lists only threads that are currently alive, so a
    # name match means a watcher is already running.
    if any(existing.name == thread_name for existing in threading.enumerate()):
        return

    default_interval = 60
    try:
        interval = int(os.environ.get("OPENHANGAR_SYNC_SCAN_INTERVAL", "60"))
    except ValueError:
        interval = default_interval
    if interval <= 0:
        # time.sleep() rejects negative values (which would kill the thread)
        # and 0 would turn the loop into a busy scan.
        interval = default_interval

    t = threading.Thread(
        target=_watcher_loop,
        args=(app, interval),
        daemon=True,
        name=thread_name,
    )
    t.start()