laminar_connectors/files/
discovery.rs1use std::path::Path;
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use crossfire::{mpsc, AsyncRx, MAsyncTx, MTx};
15use tracing::{debug, error, info, warn};
16
17use super::manifest::FileIngestionManifest;
18use crate::error::ConnectorError;
19
/// A file that discovery considers completely written and ready for ingestion.
#[derive(Debug, Clone)]
pub struct DiscoveredFile {
    /// Where the file lives: a local filesystem path, or (for object stores) the object's
    /// location string as returned by the store listing.
    pub path: String,
    /// Size in bytes at the moment the file was judged stable.
    pub size: u64,
    /// Milliseconds since the Unix epoch.
    ///
    /// NOTE(review): the discovery loops fill this with the wall-clock time at which the
    /// file was emitted, not the file's own modification time — confirm which one
    /// consumers expect.
    pub modified_ms: u64,
}
30
/// Settings that control what a [`FileDiscoveryEngine`] watches and how often.
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
    /// What to watch. This is one of:
    /// - a local directory;
    /// - a local directory followed by a filename glob (e.g. `/data/logs/*.csv`);
    /// - a cloud URL (`s3://`, `gs://`, `az://`, `abfs://`, `abfss://`).
    pub path: String,
    /// How often a cloud prefix is re-listed. Also the polling interval when a local
    /// directory has to be watched by polling (network filesystems).
    pub poll_interval: Duration,
    /// For local directories: the minimum time since a file was last seen changing
    /// before its size is re-checked for stability.
    pub stabilisation_delay: Duration,
    /// Optional glob matched against file names only. For local paths it takes
    /// precedence over a glob embedded in `path`.
    pub glob_pattern: Option<String>,
}
43
44pub struct FileDiscoveryEngine {
46 rx: AsyncRx<mpsc::Array<DiscoveredFile>>,
48 handle: tokio::task::JoinHandle<()>,
51}
52
53impl Drop for FileDiscoveryEngine {
54 fn drop(&mut self) {
55 self.handle.abort();
56 }
57}
58
59impl FileDiscoveryEngine {
60 pub fn start(config: DiscoveryConfig, known_files: Arc<FileIngestionManifest>) -> Self {
64 let (tx, rx) = mpsc::bounded_async::<DiscoveredFile>(256);
65
66 let handle = if is_cloud_path(&config.path) {
67 tokio::spawn(cloud_poll_loop(config, tx, known_files))
68 } else {
69 tokio::spawn(async move {
70 if let Err(e) = local_discovery_loop(config, tx, known_files).await {
71 error!(error = %e, "file discovery loop failed");
72 }
73 })
74 };
75
76 Self { rx, handle }
77 }
78
79 pub fn drain(&mut self, max: usize) -> Vec<DiscoveredFile> {
83 let mut files = Vec::with_capacity(max.min(64));
84 for _ in 0..max {
85 match self.rx.try_recv() {
86 Ok(file) => files.push(file),
87 Err(_) => break,
88 }
89 }
90 files
91 }
92}
93
/// Return `true` if `path` is a cloud object-store URL (discovered by listing) rather than
/// a local filesystem path.
///
/// This is a case-sensitive prefix check on the scheme.
fn is_cloud_path(path: &str) -> bool {
    const CLOUD_SCHEMES: [&str; 5] = ["s3://", "gs://", "az://", "abfs://", "abfss://"];
    CLOUD_SCHEMES.iter().any(|&scheme| path.starts_with(scheme))
}
102
103async fn cloud_poll_loop(
106 config: DiscoveryConfig,
107 tx: MAsyncTx<mpsc::Array<DiscoveredFile>>,
108 known: Arc<FileIngestionManifest>,
109) {
110 let (store, prefix) = match build_cloud_store(&config.path) {
111 Ok(v) => v,
112 Err(e) => {
113 warn!(
114 "file discovery: cannot create object store for '{}': {e}",
115 config.path
116 );
117 return;
118 }
119 };
120
121 let glob = config
122 .glob_pattern
123 .as_deref()
124 .and_then(|p| globset::Glob::new(p).ok())
125 .map(|g| g.compile_matcher());
126
127 let mut prev_sizes: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
128
129 loop {
130 match list_cloud_files(&store, &prefix).await {
131 Ok(entries) => {
132 for (path, size) in entries {
133 if let Some(ref matcher) = glob {
135 let filename = path.rsplit('/').next().unwrap_or(&path);
136 if !matcher.is_match(filename) {
137 continue;
138 }
139 }
140
141 if known.contains(&path) {
143 continue;
144 }
145
146 match prev_sizes.get(&path) {
148 Some(&prev) if prev == size => {
149 let _ = tx
151 .send(DiscoveredFile {
152 path: path.clone(),
153 size,
154 modified_ms: now_millis(),
155 })
156 .await;
157 prev_sizes.remove(&path);
158 }
159 _ => {
160 prev_sizes.insert(path, size);
162 }
163 }
164 }
165 }
166 Err(e) => {
167 warn!("file discovery: cloud list error: {e}");
168 }
169 }
170 tokio::time::sleep(config.poll_interval).await;
171 }
172}
173
174async fn list_cloud_files(
175 store: &Arc<dyn object_store::ObjectStore>,
176 prefix: &object_store::path::Path,
177) -> Result<Vec<(String, u64)>, object_store::Error> {
178 let mut entries = Vec::new();
179 let mut stream = store.list(Some(prefix));
180 use tokio_stream::StreamExt;
181 while let Some(result) = stream.next().await {
182 let meta = result?;
183 entries.push((meta.location.to_string(), meta.size));
184 }
185 Ok(entries)
186}
187
188fn build_cloud_store(
189 url: &str,
190) -> Result<
191 (Arc<dyn object_store::ObjectStore>, object_store::path::Path),
192 Box<dyn std::error::Error + Send + Sync>,
193> {
194 let scheme_end = url.find("://").map(|i| i + 3).unwrap_or(0);
198 let rest = &url[scheme_end..];
199 let prefix = if let Some(slash) = rest.find('/') {
201 &rest[slash + 1..]
202 } else {
203 ""
204 };
205 let path = object_store::path::Path::from(prefix);
206 let store: Arc<dyn object_store::ObjectStore> = Arc::new(object_store::memory::InMemory::new());
207 warn!(
208 "file discovery: using InMemory object store for '{url}' — \
209 configure a real store via laminar-storage for production"
210 );
211 Ok((store, path))
212}
213
214async fn local_discovery_loop(
217 config: DiscoveryConfig,
218 tx: MAsyncTx<mpsc::Array<DiscoveredFile>>,
219 known: Arc<FileIngestionManifest>,
220) -> Result<(), ConnectorError> {
221 use notify::{RecursiveMode, Watcher};
222
223 let (watch_dir, glob_from_path) = split_dir_and_glob(&config.path);
225
226 let effective_glob = config.glob_pattern.as_deref().or(glob_from_path.as_deref());
227
228 let glob_matcher = effective_glob
229 .and_then(|p| globset::Glob::new(p).ok())
230 .map(|g| g.compile_matcher());
231
232 if !Path::new(&watch_dir).is_dir() {
233 return Err(ConnectorError::ConfigurationError(format!(
234 "file discovery: path '{watch_dir}' is not a directory",
235 )));
236 }
237
238 let use_poll = should_use_poll_watcher(&watch_dir);
239 if use_poll {
240 info!("file discovery: using poll watcher for '{watch_dir}' (network filesystem detected)");
241 }
242
243 let (notify_tx, notify_rx) = mpsc::bounded_async::<String>(512);
245
246 #[allow(dead_code)] enum WatcherHolder {
251 Recommended(notify::RecommendedWatcher),
252 Poll(notify::PollWatcher),
253 }
254
255 let _watcher: WatcherHolder = if use_poll {
256 let notify_tx_clone: MTx<_> = notify_tx.clone().into_blocking();
257 let poll_config = notify::Config::default().with_poll_interval(config.poll_interval);
258 let mut watcher = notify::PollWatcher::new(
259 move |result: Result<notify::Event, notify::Error>| {
260 if let Ok(event) = result {
261 for path in event.paths {
262 if let Some(s) = path.to_str() {
263 let _ = notify_tx_clone.send(s.to_string());
264 }
265 }
266 }
267 },
268 poll_config,
269 )
270 .map_err(|e| {
271 ConnectorError::ConfigurationError(format!("failed to create PollWatcher: {e}"))
272 })?;
273 watcher
274 .watch(Path::new(&watch_dir), RecursiveMode::NonRecursive)
275 .map_err(|e| {
276 ConnectorError::ConfigurationError(format!("failed to watch directory: {e}"))
277 })?;
278 WatcherHolder::Poll(watcher)
279 } else {
280 let notify_tx_clone: MTx<_> = notify_tx.clone().into_blocking();
281 let mut watcher =
282 notify::recommended_watcher(move |result: Result<notify::Event, notify::Error>| {
283 if let Ok(event) = result {
284 use notify::EventKind;
286 match event.kind {
287 EventKind::Create(_) | EventKind::Modify(_) => {
288 for path in event.paths {
289 if let Some(s) = path.to_str() {
290 let _ = notify_tx_clone.send(s.to_string());
291 }
292 }
293 }
294 _ => {}
295 }
296 }
297 })
298 .map_err(|e| {
299 ConnectorError::ConfigurationError(format!("failed to create watcher: {e}"))
300 })?;
301 watcher
302 .watch(Path::new(&watch_dir), RecursiveMode::NonRecursive)
303 .map_err(|e| {
304 ConnectorError::ConfigurationError(format!("failed to watch directory: {e}"))
305 })?;
306 WatcherHolder::Recommended(watcher)
307 };
308
309 if let Ok(entries) = std::fs::read_dir(&watch_dir) {
311 for entry in entries.flatten() {
312 if let Some(s) = entry.path().to_str() {
313 let _ = notify_tx.send(s.to_string()).await;
314 }
315 }
316 }
317
318 let mut pending: std::collections::HashMap<String, (u64, u64)> =
320 std::collections::HashMap::new();
321
322 let stabilise_ms = config.stabilisation_delay.as_millis() as u64;
323
324 loop {
325 while let Ok(path) = notify_rx.try_recv() {
327 if let Some(ref matcher) = glob_matcher {
329 let filename = Path::new(&path)
330 .file_name()
331 .and_then(|n| n.to_str())
332 .unwrap_or("");
333 if !matcher.is_match(filename) {
334 continue;
335 }
336 }
337
338 if Path::new(&path).is_dir() {
340 continue;
341 }
342
343 if known.contains(&path) {
345 continue;
346 }
347
348 let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
349 pending.insert(path, (size, now_millis()));
350 }
351
352 let now = now_millis();
354 let mut ready: Vec<String> = Vec::new();
355 for (path, (size, last_seen)) in &pending {
356 if now.saturating_sub(*last_seen) >= stabilise_ms {
357 let current_size = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
359 if current_size == *size && current_size > 0 {
360 ready.push(path.clone());
361 }
362 }
363 }
364
365 for path in ready {
366 if let Some((size, _)) = pending.remove(&path) {
367 debug!("file discovery: file ready: {path} ({size} bytes)");
368 let _ = tx
369 .send(DiscoveredFile {
370 path,
371 size,
372 modified_ms: now,
373 })
374 .await;
375 }
376 }
377
378 tokio::time::sleep(Duration::from_millis(200)).await;
379 }
380}
381
/// Split a path into a directory to watch and an optional filename glob.
///
/// Only the final path component is inspected for glob metacharacters (`*`, `?`, `[`).
/// The directory is watched literally and non-recursively, so a pattern anywhere else
/// could never match; a directory name containing `[` is therefore not misread as a glob.
/// Both `/` and `\` are accepted as separators.
///
/// Examples:
/// - `"/data/logs/*.csv"` → `("/data/logs", Some("*.csv"))`
/// - `"/data/logs"` → `("/data/logs", None)`
/// - `"*.csv"` → `(".", Some("*.csv"))` (relative to the current directory)
/// - `"/*.csv"` → `("/", Some("*.csv"))`
fn split_dir_and_glob(path: &str) -> (String, Option<String>) {
    let sep = path.rfind(['/', '\\']);
    let last = sep.map_or(path, |i| &path[i + 1..]);

    if !last.contains(['*', '?', '[']) {
        return (path.to_string(), None);
    }

    let dir = match sep {
        // Bare pattern: watch the current directory.
        None => ".",
        // Pattern directly under the root: keep the separator instead of an empty path.
        Some(0) => &path[..1],
        Some(i) => &path[..i],
    };
    (dir.to_string(), Some(last.to_string()))
}
394
395#[allow(clippy::unnecessary_wraps)]
397fn should_use_poll_watcher(path: &str) -> bool {
398 #[cfg(target_os = "linux")]
399 {
400 use std::ffi::CString;
401 if let Ok(c_path) = CString::new(path) {
402 unsafe {
403 let mut buf: libc::statfs = std::mem::zeroed();
404 if libc::statfs(c_path.as_ptr(), &raw mut buf) == 0 {
405 #[allow(clippy::cast_sign_loss)]
406 let fs_type = buf.f_type as u64;
407 return matches!(
408 fs_type,
409 0x6969 | 0x5346_544e | 0xFF53_4D42 | 0x0027_e0eb | 0x6573_5546 | 0x6e66_7364 );
416 }
417 }
418 }
419 false
420 }
421 #[cfg(not(target_os = "linux"))]
422 {
423 let _ = path;
424 false
425 }
426}
427
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error. The `u128` millisecond
/// count is narrowed with `as`, which only truncates hundreds of millions of years from
/// now.
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch,
        Err(_) => Duration::ZERO,
    };
    elapsed.as_millis() as u64
}
435
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_split_dir_and_glob() {
        let cases = [
            ("/data/logs/*.csv", "/data/logs", Some("*.csv")),
            ("/data/logs", "/data/logs", None),
            ("/data/logs/events_*.json", "/data/logs", Some("events_*.json")),
        ];
        for (input, expected_dir, expected_glob) in cases {
            let (dir, glob) = split_dir_and_glob(input);
            assert_eq!(dir, expected_dir, "directory for {input:?}");
            assert_eq!(glob.as_deref(), expected_glob, "glob for {input:?}");
        }
    }

    #[test]
    fn test_is_cloud_path() {
        let cloud = [
            "s3://bucket/prefix",
            "gs://bucket/prefix",
            "az://container/path",
            "abfs://container@account/path",
        ];
        for p in cloud {
            assert!(is_cloud_path(p), "{p} should be treated as a cloud path");
        }

        let local = ["/local/path", "C:\\Users\\data"];
        for p in local {
            assert!(!is_cloud_path(p), "{p} should be treated as a local path");
        }
    }

    #[test]
    fn test_should_use_poll_on_local() {
        // Assumes /tmp is a local (disk or tmpfs) filesystem on the test machine.
        let tmp_needs_polling = should_use_poll_watcher("/tmp");
        assert!(!tmp_needs_polling);
    }

    #[tokio::test]
    async fn test_drain_empty() {
        let known = Arc::new(FileIngestionManifest::new());
        let mut engine = FileDiscoveryEngine::start(
            DiscoveryConfig {
                path: "/nonexistent_test_dir_12345".into(),
                poll_interval: Duration::from_secs(60),
                stabilisation_delay: Duration::from_secs(1),
                glob_pattern: None,
            },
            known,
        );

        // Give the spawned task a moment to run (and fail on the missing directory).
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(engine.drain(10).is_empty());
    }
}