Coverage for app/sync_watcher.py: 100%

97 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 23:33 +0000

1""" 

2Background polling thread — automatically imports documents that arrive via 

3Syncthing (or any other file-sync tool) into the canonical folder structure. 

4 

5Canonical path layout (relative to UPLOAD_FOLDER): 

6 {tenant_slug}/{aircraft_reg}/{category}/YYYY-MM-DD - title.ext 

7 

8Behaviour on discovery: 

9 - File matches canonical structure AND aircraft is recognised in the tenant 

10 → Document row created immediately (auto-import). 

11 - File path is valid but aircraft/category cannot be resolved unambiguously 

12 → PendingReconcile entry created for manual review in the UI. 

13 - File already tracked (filename in documents table) or already pending 

14 → Skipped. 

15 

16Enabled only when UPLOAD_FOLDER is set and the database is PostgreSQL 

17(SQLite = dev/test; the watcher is skipped there to avoid confusion). 

18 

19Interval is configured via the OPENHANGAR_SYNC_SCAN_INTERVAL env var (default: 60 s). 

20""" 

21 

22import contextlib 

23import logging 

24import mimetypes 

25import os 

26import re as _re 

27import threading 

28import time 

29from datetime import date as _date 

30from typing import Any 

31 

# Module logger; every message emitted in this module is prefixed "sync_watcher:".
log = logging.getLogger("openhangar.sync_watcher")

# Cache used by _categories(): None until the first call fills it from
# models.DocCategory.ALL (models is imported lazily inside that function).
_CATEGORY_VALUES: set[str] | None = None
# Matches "YYYY-MM-DD - title[.ext]":
#   group 1 = date-shaped string (shape only; validity is checked by the caller),
#   group 2 = title (non-greedy, so only the final ".ext" is split off),
#   group 3 = extension including the dot, or None when there is none.
_DATE_TITLE_RE = _re.compile(r"^(\d{4}-\d{2}-\d{2}) - (.+?)(\.[^.]+)?$")

36 

37 

def _categories() -> set[str]:
    """Return the set of valid document category names.

    The set is built from ``models.DocCategory.ALL`` on the first call
    (``models`` is imported lazily at that point) and then served from the
    module-level ``_CATEGORY_VALUES`` cache on every later call.
    """
    global _CATEGORY_VALUES
    if _CATEGORY_VALUES is not None:
        return _CATEGORY_VALUES

    from models import DocCategory  # pyright: ignore[reportMissingImports]

    _CATEGORY_VALUES = set(DocCategory.ALL)
    return _CATEGORY_VALUES

45 

46 

def _scan_once(app: Any) -> None:
    """Run a single scan pass over ``UPLOAD_FOLDER``.

    Returns immediately when the upload folder does not exist or when no
    tenant has a slug. Otherwise, inside ``app.app_context()``, for every
    tenant with a slug it:

    1. deletes that tenant's unreconciled, non-ignored ``PendingReconcile``
       rows whose file is no longer on disk;
    2. walks ``{UPLOAD_FOLDER}/{tenant_slug}/``, skipping files *and*
       directories whose names start with ``.`` or ``_``, and hands every
       file that is neither a known ``Document.filename`` nor an existing
       ``PendingReconcile.filepath`` to ``_process_file``.

    All staged changes are committed once, at the end of the pass.

    Args:
        app: Flask application. ``app.config["UPLOAD_FOLDER"]`` is read
            (default ``/data/uploads``) and its app context is entered.
    """
    from models import (  # pyright: ignore[reportMissingImports]
        Aircraft,
        Document,
        PendingReconcile,
        Tenant,
        db,
    )

    folder = app.config.get("UPLOAD_FOLDER", "/data/uploads")
    if not os.path.isdir(folder):
        return

    # Names with these prefixes are sync-tool metadata or temp files
    # (e.g. ".stfolder", ".stversions", ".syncthing.*.tmp") or private files.
    skip_prefixes = (".", "_")

    with app.app_context():
        # Build lookup tables once per scan
        tenants = {
            t.slug: t for t in Tenant.query.filter(Tenant.slug.isnot(None)).all()
        }
        if not tenants:
            return

        # Relative paths already tracked as documents, and paths already
        # queued in any state (including ignored/reconciled): never re-processed.
        known_filenames: set[str] = {
            doc.filename
            for doc in Document.query.with_entities(Document.filename).all()
        }
        pending_filepaths: set[str] = {
            pr.filepath
            for pr in PendingReconcile.query.with_entities(
                PendingReconcile.filepath
            ).all()
        }

        for tenant_slug, tenant in tenants.items():
            # Drop pending entries whose file no longer exists on disk. This
            # runs before the directory check so entries are also cleaned up
            # when the tenant's whole directory has been removed.
            for pr in PendingReconcile.query.filter_by(
                tenant_id=tenant.id, reconciled_at=None, ignored=False
            ).all():
                if not os.path.exists(os.path.join(folder, pr.filepath)):
                    db.session.delete(pr)
                    pending_filepaths.discard(pr.filepath)

            slug_dir = os.path.join(folder, tenant_slug)
            if not os.path.isdir(slug_dir):
                continue

            # Registration -> aircraft map for this tenant, normalised the
            # same way _process_file normalises the path segment.
            aircraft_by_reg: dict[str, Any] = {
                ac.registration.upper().replace("-", "").replace(" ", ""): ac
                for ac in Aircraft.query.filter_by(tenant_id=tenant.id).all()
            }

            for dirpath, dirnames, filenames in os.walk(slug_dir):
                # os.walk is top-down by default, so pruning dirnames in place
                # stops it from descending into hidden/underscore directories.
                dirnames[:] = [d for d in dirnames if not d.startswith(skip_prefixes)]

                for fname in filenames:
                    if fname.startswith(skip_prefixes):
                        continue

                    full = os.path.join(dirpath, fname)
                    relpath = os.path.relpath(full, folder).replace("\\", "/")

                    if relpath in known_filenames or relpath in pending_filepaths:
                        continue

                    _process_file(
                        app,
                        full,
                        relpath,
                        fname,
                        tenant,
                        aircraft_by_reg,
                        known_filenames,
                        pending_filepaths,
                        db,
                        Document,
                        PendingReconcile,
                    )

        db.session.commit()

125 

126 

def _process_file(  # noqa: PLR0913
    app: Any,
    full_path: str,
    relpath: str,
    fname: str,
    tenant: Any,
    aircraft_by_reg: dict[str, Any],
    known_filenames: set[str],
    pending_filepaths: set[str],
    db: Any,
    Document: Any,
    PendingReconcile: Any,
) -> None:
    """Decide whether to auto-import *relpath* or queue it for manual review.

    *relpath* is relative to UPLOAD_FOLDER, uses ``/`` separators and is
    expected to look like ``slug/reg/category/[subdirs/]filename``.

    * If it has at least four segments, the registration segment (upper-cased,
      ``-`` and spaces removed) is a key of *aircraft_by_reg*, and the
      lower-cased category segment is in ``_categories()``, a ``Document`` is
      staged on ``db.session`` and *relpath* is added to *known_filenames*.
    * Otherwise a ``PendingReconcile`` row is staged via ``_queue_pending``
      and *relpath* is added to *pending_filepaths*.

    Title/date hints come from the file's own name (the last path segment)
    when it matches ``YYYY-MM-DD - title.ext``; otherwise the title is that
    name without its extension. Nothing is committed here. *app* is accepted
    for interface compatibility and is not used.
    """
    parts = relpath.split("/")
    aircraft: Any = None
    category: str | None = None
    title_hint: str | None = None
    date_hint: _date | None = None

    # Expected: slug / reg / category / [subdirs /] filename (4 parts minimum).
    # Shorter paths cannot be resolved and go straight to manual review.
    if len(parts) >= 4:
        reg_raw = parts[1].upper().replace("-", "").replace(" ", "")
        aircraft = aircraft_by_reg.get(reg_raw)
        cat_lower = parts[2].lower()
        category = cat_lower if cat_lower in _categories() else None

        # Parse "YYYY-MM-DD - title.ext" from the actual filename. The last
        # segment is used (not parts[3]) so nested layouts such as
        # ".../logbook/2023/2023-05-01 - Annual.pdf" do not take their hints
        # from an intermediate directory name.
        filename_part = parts[-1]
        m = _DATE_TITLE_RE.match(filename_part)
        if m:
            # The regex only checks the shape; an impossible date (e.g. month
            # 13) raises ValueError and is treated as "no date".
            with contextlib.suppress(ValueError):
                date_hint = _date.fromisoformat(m.group(1))
            title_hint = m.group(2)
        else:
            title_hint = os.path.splitext(filename_part)[0]

    if aircraft and category:
        # Fully resolved — auto-import immediately
        mime = mimetypes.guess_type(fname)[0] or "application/octet-stream"
        size = None
        # The file may have disappeared between the walk and this call;
        # size_bytes then stays None instead of aborting the scan.
        with contextlib.suppress(OSError):
            size = os.path.getsize(full_path)
        db.session.add(
            Document(
                aircraft_id=aircraft.id,
                filename=relpath,
                original_filename=fname,
                mime_type=mime,
                size_bytes=size,
                title=title_hint,
                category=category,
            )
        )
        known_filenames.add(relpath)
        log.info(
            "sync_watcher: auto-imported %s → aircraft=%s category=%s",
            relpath,
            aircraft.registration,
            category,
        )
        return

    # Ambiguous or non-canonical path — queue for manual review
    _queue_pending(
        relpath,
        fname,
        aircraft,
        category,
        title_hint,
        date_hint,
        tenant,
        db,
        PendingReconcile,
    )
    pending_filepaths.add(relpath)
    log.info(
        "sync_watcher: queued for review %s (aircraft=%s category=%s)",
        relpath,
        aircraft.registration if aircraft else "?",
        category or "?",
    )

216 

217 

218def _queue_pending( 

219 relpath: str, 

220 fname: str, 

221 aircraft: Any, 

222 category: str | None, 

223 title_hint: str | None, 

224 date_hint: _date | None, 

225 tenant: Any, 

226 db: Any, 

227 PendingReconcile: Any, 

228) -> None: 

229 pr = PendingReconcile( 

230 tenant_id=tenant.id, 

231 aircraft_id=aircraft.id if aircraft else None, 

232 filepath=relpath, 

233 category=category, 

234 title_hint=title_hint or os.path.splitext(fname)[0], 

235 date_hint=date_hint, 

236 ) 

237 db.session.add(pr) 

238 

239 

def _watcher_loop(app: Any, interval: int) -> None:
    """Run scan passes forever, sleeping *interval* seconds after each one.

    An exception raised by a pass is logged and swallowed so the background
    thread survives a failed pass; the next pass follows the usual sleep.
    """

    def run_pass() -> None:
        try:
            _scan_once(app)
        except Exception:
            log.exception("sync_watcher: unhandled error during scan")

    log.info("sync_watcher: started (interval=%ds)", interval)
    while True:
        run_pass()
        time.sleep(interval)

248 

249 

def start_sync_watcher(app: Any) -> None:
    """Start the background sync watcher as a daemon thread.

    Idempotent within a process: if a thread named ``sync-watcher`` is
    already alive, no second watcher is started.

    The scan interval in seconds comes from the
    ``OPENHANGAR_SYNC_SCAN_INTERVAL`` environment variable (default 60). A
    non-integer or non-positive value is logged and replaced by 60.

    Args:
        app: Flask application handed to every scan pass.
    """
    thread_name = "sync-watcher"
    default_interval = 60

    # threading.enumerate() lists only threads that are currently alive.
    if any(t.name == thread_name for t in threading.enumerate()):
        log.info("sync_watcher: already running; not starting another thread")
        return

    raw = os.environ.get("OPENHANGAR_SYNC_SCAN_INTERVAL", str(default_interval))
    try:
        interval = int(raw)
    except ValueError:
        log.warning(
            "sync_watcher: invalid OPENHANGAR_SYNC_SCAN_INTERVAL=%r; using %ds",
            raw,
            default_interval,
        )
        interval = default_interval

    if interval < 1:
        # time.sleep() raises ValueError for negative values, which would kill
        # the thread after its first pass, and 0 would busy-loop the database.
        log.warning(
            "sync_watcher: non-positive OPENHANGAR_SYNC_SCAN_INTERVAL=%r; using %ds",
            raw,
            default_interval,
        )
        interval = default_interval

    thread = threading.Thread(
        target=_watcher_loop,
        args=(app, interval),
        daemon=True,
        name=thread_name,
    )
    thread.start()