Skip to main content

laminar_connectors/files/
source.rs

1//! File source connector implementing [`SourceConnector`].
2//!
3//! Watches a directory or cloud path for new files, decodes them using the
4//! configured format (CSV, JSON, text, Parquet), and produces `RecordBatch`es.
5
6use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use arrow_array::RecordBatch;
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use async_trait::async_trait;
12use tracing::{debug, info, warn};
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::health::HealthStatus;
19use crate::schema::traits::FormatDecoder;
20use crate::schema::types::RawRecord;
21
22use super::config::{FileFormat, FileSourceConfig};
23use super::discovery::{DiscoveryConfig, FileDiscoveryEngine};
24use super::manifest::{FileEntry, FileIngestionManifest};
25use super::text_decoder::TextLineDecoder;
26
/// AutoLoader-style file source connector.
///
/// Watches a directory (local or cloud) for new files, infers schema if
/// needed, and produces `RecordBatch`es via `poll_batch()`.
pub struct FileSource {
    /// Parsed configuration; `None` until `open()` has succeeded.
    config: Option<FileSourceConfig>,
    /// Output Arrow schema. Starts as an empty placeholder and is
    /// resolved (and optionally extended with `_metadata`) in `open()`.
    schema: SchemaRef,
    /// Format decoder matching the resolved `FileFormat` (created in `open()`).
    decoder: Option<Box<dyn FormatDecoder>>,
    /// File discovery engine (started in `open()`, dropped in `close()`).
    discovery: Option<FileDiscoveryEngine>,
    /// Manifest of files already ingested. Deliberately NOT reset by
    /// `close()` so it can be checkpointed/restored across restarts.
    manifest: FileIngestionManifest,
    /// True between a successful `open()` and the next `close()`.
    is_open: bool,
}
45
46impl FileSource {
47    /// Creates a new file source with a placeholder schema.
48    #[must_use]
49    pub fn new() -> Self {
50        let empty_schema = Arc::new(Schema::empty());
51        Self {
52            config: None,
53            schema: empty_schema,
54            decoder: None,
55            discovery: None,
56            manifest: FileIngestionManifest::new(),
57            is_open: false,
58        }
59    }
60}
61
62impl Default for FileSource {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl std::fmt::Debug for FileSource {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("FileSource")
71            .field("is_open", &self.is_open)
72            .field("schema_fields", &self.schema.fields().len())
73            .field("manifest_count", &self.manifest.active_count())
74            .finish()
75    }
76}
77
#[async_trait]
impl SourceConnector for FileSource {
    /// Opens the connector: parses config, resolves the file format,
    /// builds the decoder/schema, and starts the discovery engine.
    ///
    /// The discovery engine is seeded with a snapshot of the current
    /// manifest so previously ingested files are not re-surfaced.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        let src_config = FileSourceConfig::from_connector_config(config)?;

        // Resolve format (explicit or auto-detect from path extension).
        let format = match src_config.format {
            Some(f) => f,
            None => FileFormat::from_extension(&src_config.path).ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "cannot detect format from path; specify 'format' explicitly".into(),
                )
            })?,
        };

        // Build decoder and resolve schema.
        let (decoder, schema) = build_decoder_and_schema(format, &src_config, config)?;

        // Optionally append the _metadata struct column to the output schema.
        // NOTE(review): this field list must stay in sync with the one built
        // in `append_metadata_column` below.
        let final_schema = if src_config.include_metadata {
            let mut fields: Vec<Field> =
                schema.fields().iter().map(|f| f.as_ref().clone()).collect();
            fields.push(Field::new(
                "_metadata",
                DataType::Struct(
                    vec![
                        Field::new("file_path", DataType::Utf8, false),
                        Field::new("file_name", DataType::Utf8, false),
                        Field::new("file_size", DataType::UInt64, false),
                        Field::new("file_modification_time", DataType::Int64, true),
                    ]
                    .into(),
                ),
                false,
            ));
            Arc::new(Schema::new(fields))
        } else {
            schema
        };

        // Start discovery engine with a snapshot of the current manifest for dedup.
        // The snapshot is taken once here; files ingested after open() are
        // deduplicated again in poll_batch via `manifest.contains`.
        let discovery_config = DiscoveryConfig {
            path: src_config.path.clone(),
            poll_interval: src_config.poll_interval,
            stabilisation_delay: src_config.stabilisation_delay,
            glob_pattern: src_config.glob_pattern.clone(),
        };
        let known = Arc::new(self.manifest.snapshot_for_dedup());
        let discovery = FileDiscoveryEngine::start(discovery_config, known);

        self.config = Some(src_config);
        self.schema = final_schema;
        self.decoder = Some(decoder);
        self.discovery = Some(discovery);
        self.is_open = true;

        info!(
            "file source opened: format={format:?}, schema_fields={}",
            self.schema.fields().len()
        );
        Ok(())
    }

    /// Drains newly discovered files, decodes each, and returns the
    /// concatenated batches, or `Ok(None)` when nothing new is available.
    ///
    /// `_max_records` is currently unused: batching is bounded by
    /// `max_files_per_poll` (files), not by record count.
    async fn poll_batch(
        &mut self,
        _max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        // All three components must exist; otherwise the connector is not open.
        let config = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "closed".into(),
            })?;
        let discovery = self
            .discovery
            .as_mut()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "discovery running".into(),
                actual: "no discovery".into(),
            })?;
        let decoder = self
            .decoder
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "decoder ready".into(),
                actual: "no decoder".into(),
            })?;

        let files = discovery.drain(config.max_files_per_poll);
        if files.is_empty() {
            return Ok(None);
        }

        let mut all_batches: Vec<RecordBatch> = Vec::new();

        for file in &files {
            // Size-change guard: a file whose size differs from the manifest
            // entry was likely overwritten; skip unless overwrites are allowed.
            // The linear `find` here only runs on the (rare) warn path.
            if !config.allow_overwrites && self.manifest.size_changed(&file.path, file.size) {
                warn!(
                    "file source: skipping '{}' — size changed (was {}, now {})",
                    file.path,
                    self.manifest
                        .active_entries()
                        .find(|(p, _)| *p == file.path)
                        .map(|(_, e)| e.size)
                        .unwrap_or(0),
                    file.size
                );
                continue;
            }

            // Already ingested check (belt-and-suspenders with discovery dedup).
            if self.manifest.contains(&file.path) {
                continue;
            }

            // Max file size guard (primarily for Parquet).
            if file.size > config.max_file_bytes as u64 {
                warn!(
                    "file source: skipping '{}' — size {} exceeds max_file_bytes {}",
                    file.path, file.size, config.max_file_bytes
                );
                continue;
            }

            // Read file contents. Read failures are logged and skipped, not
            // fatal — the file is NOT recorded in the manifest, so it may be
            // retried if discovery surfaces it again.
            let bytes = match read_file_bytes(&file.path).await {
                Ok(b) => b,
                Err(e) => {
                    warn!("file source: cannot read '{}': {e}", file.path);
                    continue;
                }
            };

            // Decode. Note the asymmetry: decode errors `continue` (file left
            // out of the manifest, eligible for retry), while empty batches
            // fall through and ARE recorded as ingested below.
            let record = RawRecord::new(bytes);
            match decoder.decode_batch(&[record]) {
                Ok(batch) if batch.num_rows() > 0 => {
                    let batch = if config.include_metadata {
                        append_metadata_column(&batch, &file.path, file.size, file.modified_ms)?
                    } else {
                        batch
                    };
                    all_batches.push(batch);
                }
                Ok(_) => {
                    debug!("file source: empty batch from '{}'", file.path);
                }
                Err(e) => {
                    warn!("file source: decode error for '{}': {e}", file.path);
                    continue;
                }
            }

            // Record in manifest so the file is never ingested twice.
            self.manifest.insert(
                file.path.clone(),
                FileEntry {
                    size: file.size,
                    discovered_at: file.modified_ms,
                    ingested_at: now_millis(),
                },
            );
        }

        // Evict old manifest entries if needed (bounded by count and age).
        if let Some(cfg) = &self.config {
            let max_age_ms = cfg.manifest_retention_age_days * 24 * 60 * 60 * 1000;
            self.manifest
                .maybe_evict(cfg.manifest_retention_count, max_age_ms);
        }

        if all_batches.is_empty() {
            return Ok(None);
        }

        // Concatenate all batches into one; single-batch case avoids a copy.
        let combined = if all_batches.len() == 1 {
            all_batches.into_iter().next().unwrap()
        } else {
            arrow_select::concat::concat_batches(&self.schema, &all_batches)
                .map_err(|e| ConnectorError::ReadError(format!("batch concat error: {e}")))?
        };

        Ok(Some(SourceBatch::new(combined)))
    }

    /// Returns the resolved output schema (empty until `open()` succeeds).
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Serializes the ingestion manifest into a checkpoint.
    /// The `0` epoch appears to be a placeholder — the manifest carries
    /// the real state; confirm against `SourceCheckpoint::new` semantics.
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        self.manifest.to_checkpoint(&mut cp);
        cp
    }

    /// Restores the manifest from a checkpoint. A corrupt/unreadable
    /// manifest is logged and replaced with a fresh one rather than
    /// failing the restore (files may then be re-ingested).
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        match FileIngestionManifest::from_checkpoint(checkpoint) {
            Ok(manifest) => {
                info!(
                    "file source: restored manifest with {} active entries",
                    manifest.active_count()
                );
                self.manifest = manifest;
            }
            Err(e) => {
                warn!("file source: manifest restore failed: {e} — starting fresh");
                self.manifest = FileIngestionManifest::new();
            }
        }
        Ok(())
    }

    /// `Healthy` once open; `Unknown` (not `Unhealthy`) before open/after close.
    fn health_check(&self) -> HealthStatus {
        if self.is_open {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }

    /// Stops discovery and drops the decoder. The manifest is kept so a
    /// subsequent `open()` does not re-ingest already-seen files.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.discovery = None;
        self.decoder = None;
        self.is_open = false;
        info!("file source closed");
        Ok(())
    }

    /// Replay is supported via the checkpointed manifest.
    fn supports_replay(&self) -> bool {
        true
    }

    fn as_schema_evolvable(&self) -> Option<&dyn crate::schema::traits::SchemaEvolvable> {
        // Delegate to DefaultSchemaEvolver when needed.
        None
    }
}
318
319// ── Helpers ──────────────────────────────────────────────────────────
320
321fn build_decoder_and_schema(
322    format: FileFormat,
323    src_config: &FileSourceConfig,
324    connector_config: &ConnectorConfig,
325) -> Result<(Box<dyn FormatDecoder>, SchemaRef), ConnectorError> {
326    match format {
327        FileFormat::Csv => {
328            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
329                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
330            });
331            let csv_config = crate::schema::CsvDecoderConfig {
332                delimiter: src_config.csv_delimiter,
333                has_header: src_config.csv_has_header,
334                ..crate::schema::CsvDecoderConfig::default()
335            };
336            let decoder = crate::schema::CsvDecoder::with_config(schema.clone(), csv_config);
337            Ok((Box::new(decoder), schema))
338        }
339        FileFormat::Json => {
340            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
341                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
342            });
343            let json_config = crate::schema::JsonDecoderConfig::default();
344            let decoder = crate::schema::JsonDecoder::with_config(schema.clone(), json_config);
345            Ok((Box::new(decoder), schema))
346        }
347        FileFormat::Text => {
348            let decoder = TextLineDecoder::new();
349            let schema = decoder.output_schema();
350            Ok((Box::new(decoder), schema))
351        }
352        FileFormat::Parquet => {
353            // For Parquet, schema comes from the file footer (authoritative).
354            // Use a placeholder schema; it will be refined on the first file read.
355            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
356                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
357            });
358            let decoder = crate::schema::parquet::ParquetDecoder::new(schema.clone());
359            Ok((Box::new(decoder), schema))
360        }
361    }
362}
363
364async fn read_file_bytes(path: &str) -> Result<Vec<u8>, ConnectorError> {
365    // Cloud paths would use object_store; local paths use tokio::fs.
366    if path.starts_with("s3://")
367        || path.starts_with("gs://")
368        || path.starts_with("az://")
369        || path.starts_with("abfs://")
370        || path.starts_with("abfss://")
371    {
372        Err(ConnectorError::ConfigurationError(
373            "cloud file reading not yet implemented in poll_batch; use local paths".into(),
374        ))
375    } else {
376        tokio::fs::read(path)
377            .await
378            .map_err(|e| ConnectorError::ReadError(format!("cannot read file '{path}': {e}")))
379    }
380}
381
382fn append_metadata_column(
383    batch: &RecordBatch,
384    file_path: &str,
385    file_size: u64,
386    modified_ms: u64,
387) -> Result<RecordBatch, ConnectorError> {
388    use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, UInt64Array};
389
390    let n = batch.num_rows();
391    let file_name = std::path::Path::new(file_path)
392        .file_name()
393        .and_then(|n| n.to_str())
394        .unwrap_or(file_path);
395
396    let path_array: ArrayRef = Arc::new(StringArray::from(vec![file_path; n]));
397    let name_array: ArrayRef = Arc::new(StringArray::from(vec![file_name; n]));
398    let size_array: ArrayRef = Arc::new(UInt64Array::from(vec![file_size; n]));
399    #[allow(clippy::cast_possible_wrap)]
400    let mod_array: ArrayRef = Arc::new(Int64Array::from(vec![modified_ms as i64; n]));
401
402    let fields = vec![
403        Field::new("file_path", DataType::Utf8, false),
404        Field::new("file_name", DataType::Utf8, false),
405        Field::new("file_size", DataType::UInt64, false),
406        Field::new("file_modification_time", DataType::Int64, true),
407    ];
408    let struct_array = StructArray::try_new(
409        fields.into(),
410        vec![path_array, name_array, size_array, mod_array],
411        None,
412    )
413    .map_err(|e| ConnectorError::ReadError(format!("metadata struct error: {e}")))?;
414
415    // Append struct column to batch.
416    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
417    columns.push(Arc::new(struct_array));
418
419    let mut fields: Vec<Field> = batch
420        .schema()
421        .fields()
422        .iter()
423        .map(|f| f.as_ref().clone())
424        .collect();
425    fields.push(Field::new(
426        "_metadata",
427        DataType::Struct(
428            vec![
429                Field::new("file_path", DataType::Utf8, false),
430                Field::new("file_name", DataType::Utf8, false),
431                Field::new("file_size", DataType::UInt64, false),
432                Field::new("file_modification_time", DataType::Int64, true),
433            ]
434            .into(),
435        ),
436        false,
437    ));
438
439    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
440        .map_err(|e| ConnectorError::ReadError(format!("metadata append error: {e}")))
441}
442
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the duration
/// defaults to zero rather than panicking.
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
450
#[cfg(test)]
mod tests {
    //! Unit tests for the file source lifecycle (open/close), checkpoint
    //! round-tripping, and health/replay reporting. No real files are
    //! ingested here; `poll_batch` is only exercised on the error path.

    use super::*;

    #[test]
    fn test_file_source_default() {
        // A fresh source is closed with an empty manifest.
        let source = FileSource::new();
        assert!(!source.is_open);
        assert_eq!(source.manifest.active_count(), 0);
    }

    #[tokio::test]
    async fn test_open_missing_path() {
        // `path` is mandatory; open() must reject a config without it.
        let mut source = FileSource::new();
        let config = ConnectorConfig::new("files");
        let result = source.open(&config).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_open_with_text_format() {
        // Text format supplies its own schema; first field is "line".
        let mut source = FileSource::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/tmp");
        config.set("format", "text");
        let result = source.open(&config).await;
        assert!(result.is_ok());
        assert!(source.is_open);
        assert_eq!(source.schema().field(0).name(), "line");
        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_poll_batch_when_not_open() {
        // Polling before open() must fail with an invalid-state error.
        let mut source = FileSource::new();
        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        // A manifest entry must appear under the "manifest" offset key.
        let mut source = FileSource::new();
        source.manifest.insert(
            "test.csv".into(),
            FileEntry {
                size: 100,
                discovered_at: 1000,
                ingested_at: 2000,
            },
        );
        let cp = source.checkpoint();
        assert!(cp.get_offset("manifest").is_some());
    }

    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = FileSource::new();

        // Build a checkpoint with manifest data in the serialized JSON form.
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset(
            "manifest",
            r#"{"a.csv":{"size":100,"discovered_at":900,"ingested_at":1000}}"#,
        );

        source.restore(&cp).await.unwrap();
        assert_eq!(source.manifest.active_count(), 1);
        assert!(source.manifest.contains("a.csv"));
    }

    #[test]
    fn test_health_check() {
        // Before open() the status is Unknown, not Unhealthy.
        let source = FileSource::new();
        assert!(matches!(source.health_check(), HealthStatus::Unknown));
    }

    #[test]
    fn test_supports_replay() {
        let source = FileSource::new();
        assert!(source.supports_replay());
    }
}