laminar_connectors/files/
discovery.rs1use std::path::Path;
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use tokio::sync::mpsc;
15use tracing::{debug, error, info, warn};
16
17use super::manifest::FileIngestionManifest;
18use crate::error::ConnectorError;
19
/// A file that the discovery engine considers ready for ingestion.
#[derive(Clone, Debug)]
pub struct DiscoveredFile {
    /// Location of the file: a filesystem path for local discovery, or the
    /// object location string for cloud discovery.
    pub path: String,
    /// Size of the file in bytes, as observed when it was judged stable.
    pub size: u64,
    /// Timestamp in milliseconds since the Unix epoch.
    ///
    /// The discovery loops in this file fill this with the wall-clock time at
    /// which the file was emitted, not with the file's own mtime.
    pub modified_ms: u64,
}
30
/// Settings that control where and how files are discovered.
#[derive(Clone, Debug)]
pub struct DiscoveryConfig {
    /// What to watch: a local directory, a local path whose last component is
    /// a glob (e.g. `/data/logs/*.csv`), or a cloud URL starting with `s3://`,
    /// `gs://`, `az://`, `abfs://` or `abfss://`.
    pub path: String,
    /// Delay between cloud listings; also used as the interval of the polling
    /// watcher when one is selected for a local directory.
    pub poll_interval: Duration,
    /// How long a local file's size must stay unchanged before it is emitted.
    pub stabilisation_delay: Duration,
    /// Optional glob matched against file names. For local paths it takes
    /// precedence over a glob embedded in `path`.
    pub glob_pattern: Option<String>,
}
43
44pub struct FileDiscoveryEngine {
46 rx: mpsc::Receiver<DiscoveredFile>,
48 _abort: tokio::task::JoinHandle<()>,
50}
51
52impl FileDiscoveryEngine {
53 pub fn start(config: DiscoveryConfig, known_files: Arc<FileIngestionManifest>) -> Self {
57 let (tx, rx) = mpsc::channel(256);
58
59 let abort = if is_cloud_path(&config.path) {
60 tokio::spawn(cloud_poll_loop(config, tx, known_files))
61 } else {
62 tokio::spawn(async move {
63 if let Err(e) = local_discovery_loop(config, tx, known_files).await {
64 error!(error = %e, "file discovery loop failed");
65 }
66 })
67 };
68
69 Self { rx, _abort: abort }
70 }
71
72 pub fn drain(&mut self, max: usize) -> Vec<DiscoveredFile> {
76 let mut files = Vec::with_capacity(max.min(64));
77 for _ in 0..max {
78 match self.rx.try_recv() {
79 Ok(file) => files.push(file),
80 Err(_) => break,
81 }
82 }
83 files
84 }
85}
86
/// Returns `true` when `path` starts with one of the recognised object-store
/// URL schemes (`s3://`, `gs://`, `az://`, `abfs://`, `abfss://`).
///
/// The comparison is a case-sensitive prefix check.
fn is_cloud_path(path: &str) -> bool {
    const CLOUD_SCHEMES: [&str; 5] = ["s3://", "gs://", "az://", "abfs://", "abfss://"];
    CLOUD_SCHEMES
        .iter()
        .any(|&scheme| path.starts_with(scheme))
}
95
96async fn cloud_poll_loop(
99 config: DiscoveryConfig,
100 tx: mpsc::Sender<DiscoveredFile>,
101 known: Arc<FileIngestionManifest>,
102) {
103 let (store, prefix) = match build_cloud_store(&config.path) {
104 Ok(v) => v,
105 Err(e) => {
106 warn!(
107 "file discovery: cannot create object store for '{}': {e}",
108 config.path
109 );
110 return;
111 }
112 };
113
114 let glob = config
115 .glob_pattern
116 .as_deref()
117 .and_then(|p| globset::Glob::new(p).ok())
118 .map(|g| g.compile_matcher());
119
120 let mut prev_sizes: std::collections::HashMap<String, u64> = std::collections::HashMap::new();
121
122 loop {
123 match list_cloud_files(&store, &prefix).await {
124 Ok(entries) => {
125 for (path, size) in entries {
126 if let Some(ref matcher) = glob {
128 let filename = path.rsplit('/').next().unwrap_or(&path);
129 if !matcher.is_match(filename) {
130 continue;
131 }
132 }
133
134 if known.contains(&path) {
136 continue;
137 }
138
139 match prev_sizes.get(&path) {
141 Some(&prev) if prev == size => {
142 let _ = tx
144 .send(DiscoveredFile {
145 path: path.clone(),
146 size,
147 modified_ms: now_millis(),
148 })
149 .await;
150 prev_sizes.remove(&path);
151 }
152 _ => {
153 prev_sizes.insert(path, size);
155 }
156 }
157 }
158 }
159 Err(e) => {
160 warn!("file discovery: cloud list error: {e}");
161 }
162 }
163 tokio::time::sleep(config.poll_interval).await;
164 }
165}
166
167async fn list_cloud_files(
168 store: &Arc<dyn object_store::ObjectStore>,
169 prefix: &object_store::path::Path,
170) -> Result<Vec<(String, u64)>, object_store::Error> {
171 let mut entries = Vec::new();
172 let mut stream = store.list(Some(prefix));
173 use tokio_stream::StreamExt;
174 while let Some(result) = stream.next().await {
175 let meta = result?;
176 entries.push((meta.location.to_string(), meta.size));
177 }
178 Ok(entries)
179}
180
181fn build_cloud_store(
182 url: &str,
183) -> Result<
184 (Arc<dyn object_store::ObjectStore>, object_store::path::Path),
185 Box<dyn std::error::Error + Send + Sync>,
186> {
187 let scheme_end = url.find("://").map(|i| i + 3).unwrap_or(0);
191 let rest = &url[scheme_end..];
192 let prefix = if let Some(slash) = rest.find('/') {
194 &rest[slash + 1..]
195 } else {
196 ""
197 };
198 let path = object_store::path::Path::from(prefix);
199 let store: Arc<dyn object_store::ObjectStore> = Arc::new(object_store::memory::InMemory::new());
200 warn!(
201 "file discovery: using InMemory object store for '{url}' — \
202 configure a real store via laminar-storage for production"
203 );
204 Ok((store, path))
205}
206
207async fn local_discovery_loop(
210 config: DiscoveryConfig,
211 tx: mpsc::Sender<DiscoveredFile>,
212 known: Arc<FileIngestionManifest>,
213) -> Result<(), ConnectorError> {
214 use notify::{RecursiveMode, Watcher};
215
216 let (watch_dir, glob_from_path) = split_dir_and_glob(&config.path);
218
219 let effective_glob = config.glob_pattern.as_deref().or(glob_from_path.as_deref());
220
221 let glob_matcher = effective_glob
222 .and_then(|p| globset::Glob::new(p).ok())
223 .map(|g| g.compile_matcher());
224
225 if !Path::new(&watch_dir).is_dir() {
226 return Err(ConnectorError::ConfigurationError(format!(
227 "file discovery: path '{watch_dir}' is not a directory",
228 )));
229 }
230
231 let use_poll = should_use_poll_watcher(&watch_dir);
232 if use_poll {
233 info!("file discovery: using poll watcher for '{watch_dir}' (network filesystem detected)");
234 }
235
236 let (notify_tx, mut notify_rx) = mpsc::channel::<String>(512);
238
239 #[allow(dead_code)] enum WatcherHolder {
244 Recommended(notify::RecommendedWatcher),
245 Poll(notify::PollWatcher),
246 }
247
248 let _watcher: WatcherHolder = if use_poll {
249 let notify_tx_clone = notify_tx.clone();
250 let poll_config = notify::Config::default().with_poll_interval(config.poll_interval);
251 let mut watcher = notify::PollWatcher::new(
252 move |result: Result<notify::Event, notify::Error>| {
253 if let Ok(event) = result {
254 for path in event.paths {
255 if let Some(s) = path.to_str() {
256 let _ = notify_tx_clone.blocking_send(s.to_string());
257 }
258 }
259 }
260 },
261 poll_config,
262 )
263 .map_err(|e| {
264 ConnectorError::ConfigurationError(format!("failed to create PollWatcher: {e}"))
265 })?;
266 watcher
267 .watch(Path::new(&watch_dir), RecursiveMode::NonRecursive)
268 .map_err(|e| {
269 ConnectorError::ConfigurationError(format!("failed to watch directory: {e}"))
270 })?;
271 WatcherHolder::Poll(watcher)
272 } else {
273 let notify_tx_clone = notify_tx.clone();
274 let mut watcher =
275 notify::recommended_watcher(move |result: Result<notify::Event, notify::Error>| {
276 if let Ok(event) = result {
277 use notify::EventKind;
279 match event.kind {
280 EventKind::Create(_) | EventKind::Modify(_) => {
281 for path in event.paths {
282 if let Some(s) = path.to_str() {
283 let _ = notify_tx_clone.blocking_send(s.to_string());
284 }
285 }
286 }
287 _ => {}
288 }
289 }
290 })
291 .map_err(|e| {
292 ConnectorError::ConfigurationError(format!("failed to create watcher: {e}"))
293 })?;
294 watcher
295 .watch(Path::new(&watch_dir), RecursiveMode::NonRecursive)
296 .map_err(|e| {
297 ConnectorError::ConfigurationError(format!("failed to watch directory: {e}"))
298 })?;
299 WatcherHolder::Recommended(watcher)
300 };
301
302 if let Ok(entries) = std::fs::read_dir(&watch_dir) {
304 for entry in entries.flatten() {
305 if let Some(s) = entry.path().to_str() {
306 let _ = notify_tx.send(s.to_string()).await;
307 }
308 }
309 }
310
311 let mut pending: std::collections::HashMap<String, (u64, u64)> =
313 std::collections::HashMap::new();
314
315 let stabilise_ms = config.stabilisation_delay.as_millis() as u64;
316
317 loop {
318 while let Ok(path) = notify_rx.try_recv() {
320 if let Some(ref matcher) = glob_matcher {
322 let filename = Path::new(&path)
323 .file_name()
324 .and_then(|n| n.to_str())
325 .unwrap_or("");
326 if !matcher.is_match(filename) {
327 continue;
328 }
329 }
330
331 if Path::new(&path).is_dir() {
333 continue;
334 }
335
336 if known.contains(&path) {
338 continue;
339 }
340
341 let size = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
342 pending.insert(path, (size, now_millis()));
343 }
344
345 let now = now_millis();
347 let mut ready: Vec<String> = Vec::new();
348 for (path, (size, last_seen)) in &pending {
349 if now.saturating_sub(*last_seen) >= stabilise_ms {
350 let current_size = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
352 if current_size == *size && current_size > 0 {
353 ready.push(path.clone());
354 }
355 }
356 }
357
358 for path in ready {
359 if let Some((size, _)) = pending.remove(&path) {
360 debug!("file discovery: file ready: {path} ({size} bytes)");
361 let _ = tx
362 .send(DiscoveredFile {
363 path,
364 size,
365 modified_ms: now,
366 })
367 .await;
368 }
369 }
370
371 tokio::time::sleep(Duration::from_millis(200)).await;
372 }
373}
374
/// Splits a configured path into the directory to watch and an optional
/// file-name glob.
///
/// If `path` contains no glob metacharacter (`*`, `?`, `[`) it is returned
/// unchanged with `None`. Otherwise the text after the last `/` or `\` is the
/// pattern and the text before it is the directory, with two adjustments so
/// the directory is always usable:
///
/// * a pattern directly under a root keeps the root separator
///   (`/*.csv` -> `/`, `C:\*.csv` -> `C:\`);
/// * a pattern with no directory component is taken relative to the current
///   directory (`*.csv` -> `.`).
fn split_dir_and_glob(path: &str) -> (String, Option<String>) {
    if !path.contains(['*', '?', '[']) {
        return (path.to_string(), None);
    }

    match path.rfind(['/', '\\']) {
        Some(sep) => {
            let pattern = &path[sep + 1..];
            // Both separators are single-byte, so `sep + 1` is a char boundary.
            let at_root = sep == 0 || path[..sep].ends_with(':');
            let dir = if at_root { &path[..=sep] } else { &path[..sep] };
            (dir.to_string(), Some(pattern.to_string()))
        }
        None => (".".to_string(), Some(path.to_string())),
    }
}
387
388#[allow(clippy::unnecessary_wraps)]
390fn should_use_poll_watcher(path: &str) -> bool {
391 #[cfg(target_os = "linux")]
392 {
393 use std::ffi::CString;
394 if let Ok(c_path) = CString::new(path) {
395 unsafe {
396 let mut buf: libc::statfs = std::mem::zeroed();
397 if libc::statfs(c_path.as_ptr(), &raw mut buf) == 0 {
398 #[allow(clippy::cast_sign_loss)]
399 let fs_type = buf.f_type as u64;
400 return matches!(
401 fs_type,
402 0x6969 | 0x5346_544e | 0xFF53_4D42 | 0x0027_e0eb | 0x6573_5546 | 0x6e66_7364 );
409 }
410 }
411 }
412 false
413 }
414 #[cfg(not(target_os = "linux"))]
415 {
416 let _ = path;
417 false
418 }
419}
420
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock set before 1970: fall back to a zero duration.
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
428
#[cfg(test)]
mod tests {
    use super::*;

    /// Paths with a trailing glob are split; plain directories are untouched.
    #[test]
    fn test_split_dir_and_glob() {
        let cases = [
            ("/data/logs/*.csv", "/data/logs", Some("*.csv")),
            ("/data/logs", "/data/logs", None),
            ("/data/logs/events_*.json", "/data/logs", Some("events_*.json")),
        ];
        for (input, want_dir, want_glob) in cases {
            let (dir, glob) = split_dir_and_glob(input);
            assert_eq!(dir, want_dir);
            assert_eq!(glob.as_deref(), want_glob);
        }
    }

    /// Object-store URLs are recognised; local paths are not.
    #[test]
    fn test_is_cloud_path() {
        for cloud in [
            "s3://bucket/prefix",
            "gs://bucket/prefix",
            "az://container/path",
            "abfs://container@account/path",
        ] {
            assert!(is_cloud_path(cloud));
        }
        for local in ["/local/path", "C:\\Users\\data"] {
            assert!(!is_cloud_path(local));
        }
    }

    /// `/tmp` is expected to live on a filesystem that supports native
    /// notifications.
    #[test]
    fn test_should_use_poll_on_local() {
        let wants_poll = should_use_poll_watcher("/tmp");
        assert!(!wants_poll);
    }

    /// An engine pointed at a missing directory yields nothing.
    #[tokio::test]
    async fn test_drain_empty() {
        let manifest = Arc::new(FileIngestionManifest::new());
        let settings = DiscoveryConfig {
            path: "/nonexistent_test_dir_12345".into(),
            poll_interval: Duration::from_secs(60),
            stabilisation_delay: Duration::from_secs(1),
            glob_pattern: None,
        };
        let mut engine = FileDiscoveryEngine::start(settings, manifest);
        // Give the background task a moment to start (and fail).
        tokio::time::sleep(Duration::from_millis(50)).await;
        let drained = engine.drain(10);
        assert!(drained.is_empty());
    }
}