Skip to main content

laminar_connectors/files/
discovery.rs

1//! File discovery engine (Ring 2).
2//!
3//! Watches a directory for new files and emits [`DiscoveredFile`] events
4//! to the source connector via an `mpsc` channel. Supports three modes:
5//!
6//! - **Event mode**: `notify::recommended_watcher()` for local filesystems
7//! - **Poll mode**: `notify::PollWatcher` for NFS/CIFS/FUSE mounts
8//! - **Cloud poll mode**: `object_store::list()` for cloud storage paths
9
10use std::path::Path;
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use crossfire::{mpsc, AsyncRx, MAsyncTx, MTx};
15use tracing::{debug, error, info, warn};
16
17use super::manifest::FileIngestionManifest;
18use crate::error::ConnectorError;
19
/// One file reported by the discovery engine as ready for ingestion.
///
/// Values are produced by the discovery loops in this module and handed to
/// the source connector through the engine's channel.
#[derive(Clone, Debug)]
pub struct DiscoveredFile {
    /// Location of the file: a local filesystem path, or the object key
    /// returned by the object-store listing in cloud mode.
    pub path: String,
    /// Size of the file in bytes, as observed when it was judged stable.
    pub size: u64,
    /// Timestamp in milliseconds since the Unix epoch.
    ///
    /// NOTE(review): both discovery loops in this file currently fill this
    /// with the wall-clock time at which the file was emitted, not with the
    /// file's own modification time.
    pub modified_ms: u64,
}
30
/// Settings that control how the discovery engine watches for files.
#[derive(Clone, Debug)]
pub struct DiscoveryConfig {
    /// Directory (optionally ending in a glob such as `/data/*.csv`) or
    /// cloud URL (`s3://…`, `gs://…`, …) to watch.
    pub path: String,
    /// How often to poll: the sleep between cloud listings, and the
    /// interval handed to the poll watcher for local paths.
    pub poll_interval: Duration,
    /// Local mode only: how long a file must go without a new event before
    /// its size is re-checked and it is emitted.
    pub stabilisation_delay: Duration,
    /// Optional glob matched against the file name only. For local paths it
    /// takes precedence over a glob embedded in `path`.
    pub glob_pattern: Option<String>,
}
43
44/// Handle to a running discovery engine. Dropping this stops the engine.
45pub struct FileDiscoveryEngine {
46    /// Channel receiver for discovered files.
47    rx: AsyncRx<mpsc::Array<DiscoveredFile>>,
48    /// Background task. Aborted on Drop so the docstring promise holds —
49    /// plain `JoinHandle::drop` would detach the task, leaking it.
50    handle: tokio::task::JoinHandle<()>,
51}
52
53impl Drop for FileDiscoveryEngine {
54    fn drop(&mut self) {
55        self.handle.abort();
56    }
57}
58
59impl FileDiscoveryEngine {
60    /// Starts a discovery engine for the given config.
61    ///
62    /// Files already present in `known_files` are skipped.
63    pub fn start(config: DiscoveryConfig, known_files: Arc<FileIngestionManifest>) -> Self {
64        let (tx, rx) = mpsc::bounded_async::<DiscoveredFile>(256);
65
66        let handle = if is_cloud_path(&config.path) {
67            tokio::spawn(cloud_poll_loop(config, tx, known_files))
68        } else {
69            tokio::spawn(async move {
70                if let Err(e) = local_discovery_loop(config, tx, known_files).await {
71                    error!(error = %e, "file discovery loop failed");
72                }
73            })
74        };
75
76        Self { rx, handle }
77    }
78
79    /// Drains available discovered files (non-blocking).
80    ///
81    /// Returns up to `max` files.
82    pub fn drain(&mut self, max: usize) -> Vec<DiscoveredFile> {
83        let mut files = Vec::with_capacity(max.min(64));
84        for _ in 0..max {
85            match self.rx.try_recv() {
86                Ok(file) => files.push(file),
87                Err(_) => break,
88            }
89        }
90        files
91    }
92}
93
/// Reports whether `path` is a cloud storage URL, i.e. starts with one of
/// the recognised object-store schemes (matched case-sensitively).
fn is_cloud_path(path: &str) -> bool {
    const CLOUD_SCHEMES: [&str; 5] = ["s3://", "gs://", "az://", "abfs://", "abfss://"];
    CLOUD_SCHEMES
        .iter()
        .any(|scheme| path.starts_with(*scheme))
}
102
103// ── Cloud poll mode ──────────────────────────────────────────────────
104
105async fn cloud_poll_loop(
106    config: DiscoveryConfig,
107    tx: MAsyncTx<mpsc::Array<DiscoveredFile>>,
108    known: Arc<FileIngestionManifest>,
109) {
110    let (store, prefix) = match build_cloud_store(&config.path) {
111        Ok(v) => v,
112        Err(e) => {
113            warn!(
114                "file discovery: cannot create object store for '{}': {e}",
115                config.path
116            );
117            return;
118        }
119    };
120
121    let glob = config
122        .glob_pattern
123        .as_deref()
124        .and_then(|p| globset::Glob::new(p).ok())
125        .map(|g| g.compile_matcher());
126
127    let mut prev_sizes: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
128
129    loop {
130        match list_cloud_files(&store, &prefix).await {
131            Ok(entries) => {
132                for (path, size) in entries {
133                    // Glob filter on filename.
134                    if let Some(ref matcher) = glob {
135                        let filename = path.rsplit('/').next().unwrap_or(&path);
136                        if !matcher.is_match(filename) {
137                            continue;
138                        }
139                    }
140
141                    // Dedup against known files.
142                    if known.contains(&path) {
143                        continue;
144                    }
145
146                    // Size-stable check: need same size across two consecutive polls.
147                    match prev_sizes.get(&path) {
148                        Some(&prev) if prev == size => {
149                            // Stable — emit.
150                            let _ = tx
151                                .send(DiscoveredFile {
152                                    path: path.clone(),
153                                    size,
154                                    modified_ms: now_millis(),
155                                })
156                                .await;
157                            prev_sizes.remove(&path);
158                        }
159                        _ => {
160                            // First seen or size changed — record and wait.
161                            prev_sizes.insert(path, size);
162                        }
163                    }
164                }
165            }
166            Err(e) => {
167                warn!("file discovery: cloud list error: {e}");
168            }
169        }
170        tokio::time::sleep(config.poll_interval).await;
171    }
172}
173
174async fn list_cloud_files(
175    store: &Arc<dyn object_store::ObjectStore>,
176    prefix: &object_store::path::Path,
177) -> Result<Vec<(String, u64)>, object_store::Error> {
178    let mut entries = Vec::new();
179    let mut stream = store.list(Some(prefix));
180    use tokio_stream::StreamExt;
181    while let Some(result) = stream.next().await {
182        let meta = result?;
183        entries.push((meta.location.to_string(), meta.size));
184    }
185    Ok(entries)
186}
187
188fn build_cloud_store(
189    url: &str,
190) -> Result<
191    (Arc<dyn object_store::ObjectStore>, object_store::path::Path),
192    Box<dyn std::error::Error + Send + Sync>,
193> {
194    // For cloud paths, we need a proper object store. Use the InMemory store
195    // as a placeholder — production wiring uses laminar-storage's factory.
196    // Extract prefix from URL for listing.
197    let scheme_end = url.find("://").map(|i| i + 3).unwrap_or(0);
198    let rest = &url[scheme_end..];
199    // Split into bucket and prefix at first '/'.
200    let prefix = if let Some(slash) = rest.find('/') {
201        &rest[slash + 1..]
202    } else {
203        ""
204    };
205    let path = object_store::path::Path::from(prefix);
206    let store: Arc<dyn object_store::ObjectStore> = Arc::new(object_store::memory::InMemory::new());
207    warn!(
208        "file discovery: using InMemory object store for '{url}' — \
209           configure a real store via laminar-storage for production"
210    );
211    Ok((store, path))
212}
213
214// ── Local discovery (event + poll modes) ─────────────────────────────
215
216async fn local_discovery_loop(
217    config: DiscoveryConfig,
218    tx: MAsyncTx<mpsc::Array<DiscoveredFile>>,
219    known: Arc<FileIngestionManifest>,
220) -> Result<(), ConnectorError> {
221    use notify::{RecursiveMode, Watcher};
222
223    // Determine the directory to watch (strip glob from path).
224    let (watch_dir, glob_from_path) = split_dir_and_glob(&config.path);
225
226    let effective_glob = config.glob_pattern.as_deref().or(glob_from_path.as_deref());
227
228    let glob_matcher = effective_glob
229        .and_then(|p| globset::Glob::new(p).ok())
230        .map(|g| g.compile_matcher());
231
232    if !Path::new(&watch_dir).is_dir() {
233        return Err(ConnectorError::ConfigurationError(format!(
234            "file discovery: path '{watch_dir}' is not a directory",
235        )));
236    }
237
238    let use_poll = should_use_poll_watcher(&watch_dir);
239    if use_poll {
240        info!("file discovery: using poll watcher for '{watch_dir}' (network filesystem detected)");
241    }
242
243    // Channel from notify watcher → our async loop.
244    let (notify_tx, notify_rx) = mpsc::bounded_async::<String>(512);
245
246    // Start the appropriate watcher.
247    // Both RecommendedWatcher and PollWatcher are Send, but `dyn Watcher` is not.
248    // We hold them as concrete types inside an enum to preserve Send.
249    #[allow(dead_code)] // Fields held for drop behavior (stops the watcher).
250    enum WatcherHolder {
251        Recommended(notify::RecommendedWatcher),
252        Poll(notify::PollWatcher),
253    }
254
255    let _watcher: WatcherHolder = if use_poll {
256        let notify_tx_clone: MTx<_> = notify_tx.clone().into_blocking();
257        let poll_config = notify::Config::default().with_poll_interval(config.poll_interval);
258        let mut watcher = notify::PollWatcher::new(
259            move |result: Result<notify::Event, notify::Error>| {
260                if let Ok(event) = result {
261                    for path in event.paths {
262                        if let Some(s) = path.to_str() {
263                            let _ = notify_tx_clone.send(s.to_string());
264                        }
265                    }
266                }
267            },
268            poll_config,
269        )
270        .map_err(|e| {
271            ConnectorError::ConfigurationError(format!("failed to create PollWatcher: {e}"))
272        })?;
273        watcher
274            .watch(Path::new(&watch_dir), RecursiveMode::NonRecursive)
275            .map_err(|e| {
276                ConnectorError::ConfigurationError(format!("failed to watch directory: {e}"))
277            })?;
278        WatcherHolder::Poll(watcher)
279    } else {
280        let notify_tx_clone: MTx<_> = notify_tx.clone().into_blocking();
281        let mut watcher =
282            notify::recommended_watcher(move |result: Result<notify::Event, notify::Error>| {
283                if let Ok(event) = result {
284                    // Only emit for create/modify/rename-to events.
285                    use notify::EventKind;
286                    match event.kind {
287                        EventKind::Create(_) | EventKind::Modify(_) => {
288                            for path in event.paths {
289                                if let Some(s) = path.to_str() {
290                                    let _ = notify_tx_clone.send(s.to_string());
291                                }
292                            }
293                        }
294                        _ => {}
295                    }
296                }
297            })
298            .map_err(|e| {
299                ConnectorError::ConfigurationError(format!("failed to create watcher: {e}"))
300            })?;
301        watcher
302            .watch(Path::new(&watch_dir), RecursiveMode::NonRecursive)
303            .map_err(|e| {
304                ConnectorError::ConfigurationError(format!("failed to watch directory: {e}"))
305            })?;
306        WatcherHolder::Recommended(watcher)
307    };
308
309    // Also do an initial directory scan for files that already exist.
310    if let Ok(entries) = std::fs::read_dir(&watch_dir) {
311        for entry in entries.flatten() {
312            if let Some(s) = entry.path().to_str() {
313                let _ = notify_tx.send(s.to_string()).await;
314            }
315        }
316    }
317
318    // Stabilisation tracking: path → (size, last_seen_ms).
319    let mut pending: std::collections::HashMap<String, (u64, u64)> =
320        std::collections::HashMap::new();
321
322    let stabilise_ms = config.stabilisation_delay.as_millis() as u64;
323
324    loop {
325        // Drain new events.
326        while let Ok(path) = notify_rx.try_recv() {
327            // Apply glob filter.
328            if let Some(ref matcher) = glob_matcher {
329                let filename = Path::new(&path)
330                    .file_name()
331                    .and_then(|n| n.to_str())
332                    .unwrap_or("");
333                if !matcher.is_match(filename) {
334                    continue;
335                }
336            }
337
338            // Skip directories.
339            if Path::new(&path).is_dir() {
340                continue;
341            }
342
343            // Skip known files.
344            if known.contains(&path) {
345                continue;
346            }
347
348            let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
349            pending.insert(path, (size, now_millis()));
350        }
351
352        // Check stabilised files.
353        let now = now_millis();
354        let mut ready: Vec<String> = Vec::new();
355        for (path, (size, last_seen)) in &pending {
356            if now.saturating_sub(*last_seen) >= stabilise_ms {
357                // Verify size hasn't changed.
358                let current_size = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
359                if current_size == *size && current_size > 0 {
360                    ready.push(path.clone());
361                }
362            }
363        }
364
365        for path in ready {
366            if let Some((size, _)) = pending.remove(&path) {
367                debug!("file discovery: file ready: {path} ({size} bytes)");
368                let _ = tx
369                    .send(DiscoveredFile {
370                        path,
371                        size,
372                        modified_ms: now,
373                    })
374                    .await;
375            }
376        }
377
378        tokio::time::sleep(Duration::from_millis(200)).await;
379    }
380}
381
/// Splits a path like `/data/logs/*.csv` into `("/data/logs", Some("*.csv"))`.
///
/// A path without glob characters (`*`, `?`, `[`) is returned unchanged with
/// no pattern. Otherwise the split happens at the last `/` or `\` in the
/// path: everything before it is the directory, everything after it the
/// pattern. Two edge cases get a usable directory:
///
/// - a glob with no directory part (`*.csv`) is taken relative to `.`;
/// - a glob directly under the root (`/*.csv`) keeps the root separator as
///   its directory instead of producing an empty string.
fn split_dir_and_glob(path: &str) -> (String, Option<String>) {
    if !path.contains(['*', '?', '[']) {
        return (path.to_string(), None);
    }

    match path.rfind(['/', '\\']) {
        Some(sep) => {
            // Both separators are single-byte ASCII, so `sep + 1` and
            // `..=sep` are always valid char boundaries.
            let dir = if sep == 0 { &path[..=sep] } else { &path[..sep] };
            (dir.to_string(), Some(path[sep + 1..].to_string()))
        }
        None => (".".to_string(), Some(path.to_string())),
    }
}
394
395/// Determines if a poll watcher should be used (network filesystem detection).
396#[allow(clippy::unnecessary_wraps)]
397fn should_use_poll_watcher(path: &str) -> bool {
398    #[cfg(target_os = "linux")]
399    {
400        use std::ffi::CString;
401        if let Ok(c_path) = CString::new(path) {
402            unsafe {
403                let mut buf: libc::statfs = std::mem::zeroed();
404                if libc::statfs(c_path.as_ptr(), &raw mut buf) == 0 {
405                    #[allow(clippy::cast_sign_loss)]
406                    let fs_type = buf.f_type as u64;
407                    return matches!(
408                        fs_type,
409                        0x6969          // NFS
410                        | 0x5346_544e   // NTFS (FUSE)
411                        | 0xFF53_4D42   // CIFS/SMB
412                        | 0x0027_e0eb   // ECRYPTFS
413                        | 0x6573_5546   // FUSE (general)
414                        | 0x6e66_7364 // nfsd
415                    );
416                }
417            }
418        }
419        false
420    }
421    #[cfg(not(target_os = "linux"))]
422    {
423        let _ = path;
424        false
425    }
426}
427
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
435
#[cfg(test)]
mod tests {
    use super::*;

    /// Paths with and without a trailing glob are split as documented.
    #[test]
    fn test_split_dir_and_glob() {
        let cases = [
            ("/data/logs/*.csv", "/data/logs", Some("*.csv")),
            ("/data/logs", "/data/logs", None),
            ("/data/logs/events_*.json", "/data/logs", Some("events_*.json")),
        ];
        for (input, want_dir, want_glob) in cases {
            let (dir, glob) = split_dir_and_glob(input);
            assert_eq!(dir, want_dir);
            assert_eq!(glob.as_deref(), want_glob);
        }
    }

    /// Cloud schemes are recognised; local paths are not.
    #[test]
    fn test_is_cloud_path() {
        let cloud = [
            "s3://bucket/prefix",
            "gs://bucket/prefix",
            "az://container/path",
            "abfs://container@account/path",
        ];
        for url in cloud {
            assert!(is_cloud_path(url));
        }

        let local = ["/local/path", "C:\\Users\\data"];
        for path in local {
            assert!(!is_cloud_path(path));
        }
    }

    /// A local directory never selects the poll watcher.
    #[test]
    fn test_should_use_poll_on_local() {
        // On non-Linux or local FS, should return false.
        let use_poll = should_use_poll_watcher("/tmp");
        assert!(!use_poll);
    }

    /// Draining an engine whose task failed at startup yields nothing.
    #[tokio::test]
    async fn test_drain_empty() {
        let known = Arc::new(FileIngestionManifest::new());
        let config = DiscoveryConfig {
            path: "/nonexistent_test_dir_12345".into(),
            poll_interval: Duration::from_secs(60),
            stabilisation_delay: Duration::from_secs(1),
            glob_pattern: None,
        };
        let mut engine = FileDiscoveryEngine::start(config, known);
        // Give the background task a moment to start (and fail on the bad path).
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(engine.drain(10).is_empty());
    }
}
486}