Skip to main content

laminar_connectors/files/
discovery.rs

1//! File discovery engine (Ring 2).
2//!
3//! Watches a directory for new files and emits [`DiscoveredFile`] events
4//! to the source connector via an `mpsc` channel. Supports three modes:
5//!
6//! - **Event mode**: `notify::recommended_watcher()` for local filesystems
7//! - **Poll mode**: `notify::PollWatcher` for NFS/CIFS/FUSE mounts
8//! - **Cloud poll mode**: `object_store::list()` for cloud storage paths
9
10use std::path::Path;
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use tokio::sync::mpsc;
15use tracing::{debug, error, info, warn};
16
17use super::manifest::FileIngestionManifest;
18use crate::error::ConnectorError;
19
/// A file the discovery engine has judged complete and ready for ingestion.
#[derive(Clone, Debug)]
pub struct DiscoveredFile {
    /// Where the file lives: a local filesystem path, or the object-store location as
    /// returned by the listing in cloud mode.
    pub path: String,
    /// Size of the file in bytes at the time it was judged stable.
    pub size: u64,
    /// Last modification time in milliseconds since the Unix epoch. Producers may fall back
    /// to the discovery time when the real modification time is unavailable.
    pub modified_ms: u64,
}
30
/// Settings that control what a discovery engine watches and how it decides a file is ready.
#[derive(Clone, Debug)]
pub struct DiscoveryConfig {
    /// What to watch: a local directory (optionally ending in a file-name glob such as
    /// `/data/logs/*.csv`) or a cloud storage URL (`s3://`, `gs://`, `az://`, `abfs://`,
    /// `abfss://`).
    pub path: String,
    /// Interval between scans for the poll watcher (network filesystems) and between
    /// object listings in cloud mode.
    pub poll_interval: Duration,
    /// How long a local file (event or poll watcher) must go without changes before it is
    /// emitted. Cloud mode does not use this; it waits for two listings with the same size.
    pub stabilisation_delay: Duration,
    /// Optional glob matched against file names. In local mode it takes precedence over a
    /// glob embedded in `path`.
    pub glob_pattern: Option<String>,
}
43
44/// Handle to a running discovery engine. Dropping this stops the engine.
45pub struct FileDiscoveryEngine {
46    /// Channel receiver for discovered files.
47    rx: mpsc::Receiver<DiscoveredFile>,
48    /// Abort handle for the background task.
49    _abort: tokio::task::JoinHandle<()>,
50}
51
52impl FileDiscoveryEngine {
53    /// Starts a discovery engine for the given config.
54    ///
55    /// Files already present in `known_files` are skipped.
56    pub fn start(config: DiscoveryConfig, known_files: Arc<FileIngestionManifest>) -> Self {
57        let (tx, rx) = mpsc::channel(256);
58
59        let abort = if is_cloud_path(&config.path) {
60            tokio::spawn(cloud_poll_loop(config, tx, known_files))
61        } else {
62            tokio::spawn(async move {
63                if let Err(e) = local_discovery_loop(config, tx, known_files).await {
64                    error!(error = %e, "file discovery loop failed");
65                }
66            })
67        };
68
69        Self { rx, _abort: abort }
70    }
71
72    /// Drains available discovered files (non-blocking).
73    ///
74    /// Returns up to `max` files.
75    pub fn drain(&mut self, max: usize) -> Vec<DiscoveredFile> {
76        let mut files = Vec::with_capacity(max.min(64));
77        for _ in 0..max {
78            match self.rx.try_recv() {
79                Ok(file) => files.push(file),
80                Err(_) => break,
81            }
82        }
83        files
84    }
85}
86
/// Reports whether `path` starts with one of the supported cloud object-store URL schemes
/// (`s3://`, `gs://`, `az://`, `abfs://`, `abfss://`). The match is case-sensitive.
fn is_cloud_path(path: &str) -> bool {
    const CLOUD_SCHEMES: [&str; 5] = ["s3://", "gs://", "az://", "abfs://", "abfss://"];
    CLOUD_SCHEMES
        .into_iter()
        .any(|scheme| path.starts_with(scheme))
}
95
96// ── Cloud poll mode ──────────────────────────────────────────────────
97
98async fn cloud_poll_loop(
99    config: DiscoveryConfig,
100    tx: mpsc::Sender<DiscoveredFile>,
101    known: Arc<FileIngestionManifest>,
102) {
103    let (store, prefix) = match build_cloud_store(&config.path) {
104        Ok(v) => v,
105        Err(e) => {
106            warn!(
107                "file discovery: cannot create object store for '{}': {e}",
108                config.path
109            );
110            return;
111        }
112    };
113
114    let glob = config
115        .glob_pattern
116        .as_deref()
117        .and_then(|p| globset::Glob::new(p).ok())
118        .map(|g| g.compile_matcher());
119
120    let mut prev_sizes: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
121
122    loop {
123        match list_cloud_files(&store, &prefix).await {
124            Ok(entries) => {
125                for (path, size) in entries {
126                    // Glob filter on filename.
127                    if let Some(ref matcher) = glob {
128                        let filename = path.rsplit('/').next().unwrap_or(&path);
129                        if !matcher.is_match(filename) {
130                            continue;
131                        }
132                    }
133
134                    // Dedup against known files.
135                    if known.contains(&path) {
136                        continue;
137                    }
138
139                    // Size-stable check: need same size across two consecutive polls.
140                    match prev_sizes.get(&path) {
141                        Some(&prev) if prev == size => {
142                            // Stable — emit.
143                            let _ = tx
144                                .send(DiscoveredFile {
145                                    path: path.clone(),
146                                    size,
147                                    modified_ms: now_millis(),
148                                })
149                                .await;
150                            prev_sizes.remove(&path);
151                        }
152                        _ => {
153                            // First seen or size changed — record and wait.
154                            prev_sizes.insert(path, size);
155                        }
156                    }
157                }
158            }
159            Err(e) => {
160                warn!("file discovery: cloud list error: {e}");
161            }
162        }
163        tokio::time::sleep(config.poll_interval).await;
164    }
165}
166
167async fn list_cloud_files(
168    store: &Arc<dyn object_store::ObjectStore>,
169    prefix: &object_store::path::Path,
170) -> Result<Vec<(String, u64)>, object_store::Error> {
171    let mut entries = Vec::new();
172    let mut stream = store.list(Some(prefix));
173    use tokio_stream::StreamExt;
174    while let Some(result) = stream.next().await {
175        let meta = result?;
176        entries.push((meta.location.to_string(), meta.size));
177    }
178    Ok(entries)
179}
180
181fn build_cloud_store(
182    url: &str,
183) -> Result<
184    (Arc<dyn object_store::ObjectStore>, object_store::path::Path),
185    Box<dyn std::error::Error + Send + Sync>,
186> {
187    // For cloud paths, we need a proper object store. Use the InMemory store
188    // as a placeholder — production wiring uses laminar-storage's factory.
189    // Extract prefix from URL for listing.
190    let scheme_end = url.find("://").map(|i| i + 3).unwrap_or(0);
191    let rest = &url[scheme_end..];
192    // Split into bucket and prefix at first '/'.
193    let prefix = if let Some(slash) = rest.find('/') {
194        &rest[slash + 1..]
195    } else {
196        ""
197    };
198    let path = object_store::path::Path::from(prefix);
199    let store: Arc<dyn object_store::ObjectStore> = Arc::new(object_store::memory::InMemory::new());
200    warn!(
201        "file discovery: using InMemory object store for '{url}' — \
202           configure a real store via laminar-storage for production"
203    );
204    Ok((store, path))
205}
206
207// ── Local discovery (event + poll modes) ─────────────────────────────
208
209async fn local_discovery_loop(
210    config: DiscoveryConfig,
211    tx: mpsc::Sender<DiscoveredFile>,
212    known: Arc<FileIngestionManifest>,
213) -> Result<(), ConnectorError> {
214    use notify::{RecursiveMode, Watcher};
215
216    // Determine the directory to watch (strip glob from path).
217    let (watch_dir, glob_from_path) = split_dir_and_glob(&config.path);
218
219    let effective_glob = config.glob_pattern.as_deref().or(glob_from_path.as_deref());
220
221    let glob_matcher = effective_glob
222        .and_then(|p| globset::Glob::new(p).ok())
223        .map(|g| g.compile_matcher());
224
225    if !Path::new(&watch_dir).is_dir() {
226        return Err(ConnectorError::ConfigurationError(format!(
227            "file discovery: path '{watch_dir}' is not a directory",
228        )));
229    }
230
231    let use_poll = should_use_poll_watcher(&watch_dir);
232    if use_poll {
233        info!("file discovery: using poll watcher for '{watch_dir}' (network filesystem detected)");
234    }
235
236    // Channel from notify watcher → our async loop.
237    let (notify_tx, mut notify_rx) = mpsc::channel::<String>(512);
238
239    // Start the appropriate watcher.
240    // Both RecommendedWatcher and PollWatcher are Send, but `dyn Watcher` is not.
241    // We hold them as concrete types inside an enum to preserve Send.
242    #[allow(dead_code)] // Fields held for drop behavior (stops the watcher).
243    enum WatcherHolder {
244        Recommended(notify::RecommendedWatcher),
245        Poll(notify::PollWatcher),
246    }
247
248    let _watcher: WatcherHolder = if use_poll {
249        let notify_tx_clone = notify_tx.clone();
250        let poll_config = notify::Config::default().with_poll_interval(config.poll_interval);
251        let mut watcher = notify::PollWatcher::new(
252            move |result: Result<notify::Event, notify::Error>| {
253                if let Ok(event) = result {
254                    for path in event.paths {
255                        if let Some(s) = path.to_str() {
256                            let _ = notify_tx_clone.blocking_send(s.to_string());
257                        }
258                    }
259                }
260            },
261            poll_config,
262        )
263        .map_err(|e| {
264            ConnectorError::ConfigurationError(format!("failed to create PollWatcher: {e}"))
265        })?;
266        watcher
267            .watch(Path::new(&watch_dir), RecursiveMode::NonRecursive)
268            .map_err(|e| {
269                ConnectorError::ConfigurationError(format!("failed to watch directory: {e}"))
270            })?;
271        WatcherHolder::Poll(watcher)
272    } else {
273        let notify_tx_clone = notify_tx.clone();
274        let mut watcher =
275            notify::recommended_watcher(move |result: Result<notify::Event, notify::Error>| {
276                if let Ok(event) = result {
277                    // Only emit for create/modify/rename-to events.
278                    use notify::EventKind;
279                    match event.kind {
280                        EventKind::Create(_) | EventKind::Modify(_) => {
281                            for path in event.paths {
282                                if let Some(s) = path.to_str() {
283                                    let _ = notify_tx_clone.blocking_send(s.to_string());
284                                }
285                            }
286                        }
287                        _ => {}
288                    }
289                }
290            })
291            .map_err(|e| {
292                ConnectorError::ConfigurationError(format!("failed to create watcher: {e}"))
293            })?;
294        watcher
295            .watch(Path::new(&watch_dir), RecursiveMode::NonRecursive)
296            .map_err(|e| {
297                ConnectorError::ConfigurationError(format!("failed to watch directory: {e}"))
298            })?;
299        WatcherHolder::Recommended(watcher)
300    };
301
302    // Also do an initial directory scan for files that already exist.
303    if let Ok(entries) = std::fs::read_dir(&watch_dir) {
304        for entry in entries.flatten() {
305            if let Some(s) = entry.path().to_str() {
306                let _ = notify_tx.send(s.to_string()).await;
307            }
308        }
309    }
310
311    // Stabilisation tracking: path → (size, last_seen_ms).
312    let mut pending: std::collections::HashMap<String, (u64, u64)> =
313        std::collections::HashMap::new();
314
315    let stabilise_ms = config.stabilisation_delay.as_millis() as u64;
316
317    loop {
318        // Drain new events.
319        while let Ok(path) = notify_rx.try_recv() {
320            // Apply glob filter.
321            if let Some(ref matcher) = glob_matcher {
322                let filename = Path::new(&path)
323                    .file_name()
324                    .and_then(|n| n.to_str())
325                    .unwrap_or("");
326                if !matcher.is_match(filename) {
327                    continue;
328                }
329            }
330
331            // Skip directories.
332            if Path::new(&path).is_dir() {
333                continue;
334            }
335
336            // Skip known files.
337            if known.contains(&path) {
338                continue;
339            }
340
341            let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
342            pending.insert(path, (size, now_millis()));
343        }
344
345        // Check stabilised files.
346        let now = now_millis();
347        let mut ready: Vec<String> = Vec::new();
348        for (path, (size, last_seen)) in &pending {
349            if now.saturating_sub(*last_seen) >= stabilise_ms {
350                // Verify size hasn't changed.
351                let current_size = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
352                if current_size == *size && current_size > 0 {
353                    ready.push(path.clone());
354                }
355            }
356        }
357
358        for path in ready {
359            if let Some((size, _)) = pending.remove(&path) {
360                debug!("file discovery: file ready: {path} ({size} bytes)");
361                let _ = tx
362                    .send(DiscoveredFile {
363                        path,
364                        size,
365                        modified_ms: now,
366                    })
367                    .await;
368            }
369        }
370
371        tokio::time::sleep(Duration::from_millis(200)).await;
372    }
373}
374
/// Splits a path like `/data/logs/*.csv` into `("/data/logs", Some("*.csv"))`.
///
/// Only the final path component is inspected for glob characters (`*`, `?`, `[`), so a
/// directory whose name merely contains such characters (e.g. `/data/[2024]/logs`) is kept
/// whole. Either `/` or `\` is accepted as the separator. When the glob has no directory
/// part (`*.csv`), the directory is `"."`; a glob directly under a root or drive (`/*.csv`,
/// `C:\*.csv`) keeps the trailing separator so the directory stays the root.
fn split_dir_and_glob(path: &str) -> (String, Option<String>) {
    const GLOB_CHARS: [char; 3] = ['*', '?', '['];

    let sep = path.rfind(['/', '\\']);
    let file_part = sep.map_or(path, |i| &path[i + 1..]);
    if !file_part.contains(GLOB_CHARS) {
        return (path.to_string(), None);
    }

    let dir = match sep {
        None => ".",
        // Keep the separator for `/` or `C:\` so the result still names the root.
        Some(i) if i == 0 || path[..i].ends_with(':') => &path[..=i],
        Some(i) => &path[..i],
    };
    (dir.to_string(), Some(file_part.to_string()))
}
387
388/// Determines if a poll watcher should be used (network filesystem detection).
389#[allow(clippy::unnecessary_wraps)]
390fn should_use_poll_watcher(path: &str) -> bool {
391    #[cfg(target_os = "linux")]
392    {
393        use std::ffi::CString;
394        if let Ok(c_path) = CString::new(path) {
395            unsafe {
396                let mut buf: libc::statfs = std::mem::zeroed();
397                if libc::statfs(c_path.as_ptr(), &raw mut buf) == 0 {
398                    #[allow(clippy::cast_sign_loss)]
399                    let fs_type = buf.f_type as u64;
400                    return matches!(
401                        fs_type,
402                        0x6969          // NFS
403                        | 0x5346_544e   // NTFS (FUSE)
404                        | 0xFF53_4D42   // CIFS/SMB
405                        | 0x0027_e0eb   // ECRYPTFS
406                        | 0x6573_5546   // FUSE (general)
407                        | 0x6e66_7364 // nfsd
408                    );
409                }
410            }
411        }
412        false
413    }
414    #[cfg(not(target_os = "linux"))]
415    {
416        let _ = path;
417        false
418    }
419}
420
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`.
#[allow(clippy::cast_possible_truncation)] // u128 millis only exceed u64 after ~584M years.
fn now_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
428
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_split_dir_and_glob() {
        // (input, expected directory, expected glob)
        let cases: [(&str, &str, Option<&str>); 3] = [
            ("/data/logs/*.csv", "/data/logs", Some("*.csv")),
            ("/data/logs", "/data/logs", None),
            ("/data/logs/events_*.json", "/data/logs", Some("events_*.json")),
        ];
        for (input, want_dir, want_glob) in cases {
            let (dir, glob) = split_dir_and_glob(input);
            assert_eq!(dir, want_dir, "directory for {input}");
            assert_eq!(glob.as_deref(), want_glob, "glob for {input}");
        }
    }

    #[test]
    fn test_is_cloud_path() {
        let cloud = [
            "s3://bucket/prefix",
            "gs://bucket/prefix",
            "az://container/path",
            "abfs://container@account/path",
        ];
        for url in cloud {
            assert!(is_cloud_path(url), "{url} should be a cloud path");
        }
        for local in ["/local/path", "C:\\Users\\data"] {
            assert!(!is_cloud_path(local), "{local} should not be a cloud path");
        }
    }

    #[test]
    fn test_should_use_poll_on_local() {
        // Expects `/tmp` to be a local filesystem (or a non-Linux host), i.e. event mode.
        assert!(!should_use_poll_watcher("/tmp"));
    }

    #[tokio::test]
    async fn test_drain_empty() {
        let config = DiscoveryConfig {
            path: "/nonexistent_test_dir_12345".into(),
            poll_interval: Duration::from_secs(60),
            stabilisation_delay: Duration::from_secs(1),
            glob_pattern: None,
        };
        let mut engine =
            FileDiscoveryEngine::start(config, Arc::new(FileIngestionManifest::new()));
        // Let the background task start (it fails immediately on the missing directory).
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(engine.drain(10).is_empty());
    }
}
479}