laminar_connectors/files/source.rs

//! File source connector implementing [`SourceConnector`].
//!
//! Watches a directory or cloud path for new files, decodes them using the
//! configured format (CSV, JSON, text, Parquet, Arrow IPC), and produces
//! `RecordBatch`es.

use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use tracing::{debug, info, warn};

use crate::checkpoint::SourceCheckpoint;
use crate::config::ConnectorConfig;
use crate::connector::{SourceBatch, SourceConnector};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::schema::traits::FormatDecoder;
use crate::schema::types::RawRecord;

use super::config::{FileFormat, FileSourceConfig};
use super::discovery::{DiscoveryConfig, FileDiscoveryEngine};
use super::manifest::{FileEntry, FileIngestionManifest};
use super::text_decoder::TextLineDecoder;

/// AutoLoader-style file source connector.
///
/// Watches a directory (local or cloud) for new files, infers schema if
/// needed, and produces `RecordBatch`es via `poll_batch()`.
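///
/// A minimal usage sketch (illustrative only; the import paths and the
/// example path are assumptions, while the `path`/`format` config keys are
/// taken from the tests at the bottom of this file):
///
/// ```ignore
/// let mut source = FileSource::new();
/// let mut config = ConnectorConfig::new("files");
/// config.set("path", "/tmp/incoming");
/// config.set("format", "text");
/// source.open(&config).await?;
///
/// // Each successful poll yields at most one combined `SourceBatch`;
/// // `None` means no new files were discovered.
/// if let Some(batch) = source.poll_batch(1024).await? {
///     // handle the batch ...
/// }
/// source.close().await?;
/// ```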
pub struct FileSource {
    /// Parsed configuration.
    config: Option<FileSourceConfig>,
    /// Output Arrow schema (resolved in `open()`).
    schema: SchemaRef,
    /// Format decoder (created in `open()`).
    decoder: Option<Box<dyn FormatDecoder>>,
    /// File discovery engine (started in `open()`).
    discovery: Option<FileDiscoveryEngine>,
    /// File ingestion manifest (tracks processed files).
    manifest: FileIngestionManifest,
    /// Whether the connector is open.
    is_open: bool,
}

impl FileSource {
    /// Creates a new file source with a placeholder schema.
    #[must_use]
    pub fn new() -> Self {
        Self::with_registry(None)
    }

    /// Creates a new file source with an optional Prometheus registry.
    #[must_use]
    pub fn with_registry(_registry: Option<&prometheus::Registry>) -> Self {
        let empty_schema = Arc::new(Schema::empty());
        Self {
            config: None,
            schema: empty_schema,
            decoder: None,
            discovery: None,
            manifest: FileIngestionManifest::new(),
            is_open: false,
        }
    }
}

impl Default for FileSource {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for FileSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileSource")
            .field("is_open", &self.is_open)
            .field("schema_fields", &self.schema.fields().len())
            .field("manifest_count", &self.manifest.active_count())
            .finish()
    }
}

#[async_trait]
impl SourceConnector for FileSource {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        let src_config = FileSourceConfig::from_connector_config(config)?;

        // Resolve format (explicit or auto-detect from path).
        let format = match src_config.format {
            Some(f) => f,
            None => FileFormat::from_extension(&src_config.path).ok_or_else(|| {
                ConnectorError::ConfigurationError(
                    "cannot detect format from path; specify 'format' explicitly".into(),
                )
            })?,
        };

        // Build decoder and resolve schema.
        let (decoder, schema) = build_decoder_and_schema(format, &src_config, config)?;

        // Optionally append _metadata struct column.
        let final_schema = if src_config.include_metadata {
            let mut fields: Vec<Field> =
                schema.fields().iter().map(|f| f.as_ref().clone()).collect();
            fields.push(Field::new(
                "_metadata",
                DataType::Struct(
                    vec![
                        Field::new("file_path", DataType::Utf8, false),
                        Field::new("file_name", DataType::Utf8, false),
                        Field::new("file_size", DataType::UInt64, false),
                        Field::new(
                            "file_modification_time",
                            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
                            true,
                        ),
                    ]
                    .into(),
                ),
                false,
            ));
            Arc::new(Schema::new(fields))
        } else {
            schema
        };
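        // For reference, the appended column has the following shape
        // (mirroring the fields pushed above):
        //   _metadata: Struct<
        //       file_path: Utf8 (non-null),
        //       file_name: Utf8 (non-null),
        //       file_size: UInt64 (non-null),
        //       file_modification_time: Timestamp(Millisecond) (nullable),
        //   >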

        // Start discovery engine with a snapshot of the current manifest for dedup.
        let discovery_config = DiscoveryConfig {
            path: src_config.path.clone(),
            poll_interval: src_config.poll_interval,
            stabilisation_delay: src_config.stabilisation_delay,
            glob_pattern: src_config.glob_pattern.clone(),
        };
        let known = Arc::new(self.manifest.snapshot_for_dedup());
        let discovery = FileDiscoveryEngine::start(discovery_config, known);

        self.config = Some(src_config);
        self.schema = final_schema;
        self.decoder = Some(decoder);
        self.discovery = Some(discovery);
        self.is_open = true;

        info!(
            "file source opened: format={format:?}, schema_fields={}",
            self.schema.fields().len()
        );
        Ok(())
    }

    async fn poll_batch(
        &mut self,
        _max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        let config = self
            .config
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "closed".into(),
            })?;
        let discovery = self
            .discovery
            .as_mut()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "discovery running".into(),
                actual: "no discovery".into(),
            })?;
        let decoder = self
            .decoder
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "decoder ready".into(),
                actual: "no decoder".into(),
            })?;

        let files = discovery.drain(config.max_files_per_poll);
        if files.is_empty() {
            return Ok(None);
        }

        let mut all_batches: Vec<RecordBatch> = Vec::new();

        for file in &files {
            // Size-change guard.
            if !config.allow_overwrites && self.manifest.size_changed(&file.path, file.size) {
                warn!(
                    "file source: skipping '{}' — size changed (was {}, now {})",
                    file.path,
                    self.manifest
                        .active_entries()
                        .find(|(p, _)| *p == file.path)
                        .map(|(_, e)| e.size)
                        .unwrap_or(0),
                    file.size
                );
                continue;
            }

            // Already ingested check (belt-and-suspenders with discovery dedup).
            if self.manifest.contains(&file.path) {
                continue;
            }

            // Max file size guard (primarily for Parquet).
            if file.size > config.max_file_bytes as u64 {
                warn!(
                    "file source: skipping '{}' — size {} exceeds max_file_bytes {}",
                    file.path, file.size, config.max_file_bytes
                );
                continue;
            }

            // Read file contents.
            let bytes = match read_file_bytes(&file.path).await {
                Ok(b) => b,
                Err(e) => {
                    warn!("file source: cannot read '{}': {e}", file.path);
                    continue;
                }
            };

            // Decode.
            let record = RawRecord::new(bytes);
            match decoder.decode_batch(&[record]) {
                Ok(batch) if batch.num_rows() > 0 => {
                    let batch = if config.include_metadata {
                        append_metadata_column(&batch, &file.path, file.size, file.modified_ms)?
                    } else {
                        batch
                    };
                    all_batches.push(batch);
                }
                Ok(_) => {
                    debug!("file source: empty batch from '{}'", file.path);
                }
                Err(e) => {
                    warn!("file source: decode error for '{}': {e}", file.path);
                    continue;
                }
            }

            // Record in manifest.
            self.manifest.insert(
                file.path.clone(),
                FileEntry {
                    size: file.size,
                    discovered_at: file.modified_ms,
                    ingested_at: now_millis(),
                },
            );
        }

        // Evict old manifest entries if needed.
        if let Some(cfg) = &self.config {
            let max_age_ms = cfg.manifest_retention_age_days * 24 * 60 * 60 * 1000;
            self.manifest
                .maybe_evict(cfg.manifest_retention_count, max_age_ms);
        }

        if all_batches.is_empty() {
            return Ok(None);
        }

        // Concatenate all batches.
        let combined = if all_batches.len() == 1 {
            all_batches.into_iter().next().unwrap()
        } else {
            arrow_select::concat::concat_batches(&self.schema, &all_batches)
                .map_err(|e| ConnectorError::ReadError(format!("batch concat error: {e}")))?
        };

        Ok(Some(SourceBatch::new(combined)))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

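    // The manifest is persisted under the checkpoint's "manifest" offset key
    // as a JSON map of file path to { size, discovered_at, ingested_at };
    // see `test_restore_from_checkpoint` below for the expected shape.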
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        self.manifest.to_checkpoint(&mut cp);
        cp
    }

    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        match FileIngestionManifest::from_checkpoint(checkpoint) {
            Ok(manifest) => {
                info!(
                    "file source: restored manifest with {} active entries",
                    manifest.active_count()
                );
                self.manifest = manifest;
            }
            Err(e) => {
                warn!("file source: manifest restore failed: {e} — starting fresh");
                self.manifest = FileIngestionManifest::new();
            }
        }
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        if self.is_open {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.discovery = None;
        self.decoder = None;
        self.is_open = false;
        info!("file source closed");
        Ok(())
    }

    fn supports_replay(&self) -> bool {
        true
    }
}

// ── Helpers ──────────────────────────────────────────────────────────

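/// Builds the format decoder and resolves the initial output schema.
///
/// For CSV and JSON the DDL schema from the connector config is used when
/// present, otherwise a single nullable `value: Utf8` column is assumed as a
/// fallback; for text the decoder supplies its own schema; for Parquet and
/// Arrow IPC the DDL schema is only a placeholder until the first file
/// provides the authoritative schema.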
fn build_decoder_and_schema(
    format: FileFormat,
    src_config: &FileSourceConfig,
    connector_config: &ConnectorConfig,
) -> Result<(Box<dyn FormatDecoder>, SchemaRef), ConnectorError> {
    match format {
        FileFormat::Csv => {
            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
            });
            let csv_config = crate::schema::CsvDecoderConfig {
                delimiter: src_config.csv_delimiter,
                has_header: src_config.csv_has_header,
                ..crate::schema::CsvDecoderConfig::default()
            };
            let decoder = crate::schema::CsvDecoder::with_config(schema.clone(), csv_config);
            Ok((Box::new(decoder), schema))
        }
        FileFormat::Json => {
            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
            });
            let json_config = crate::schema::JsonDecoderConfig::default();
            let decoder = crate::schema::JsonDecoder::with_config(schema.clone(), json_config);
            Ok((Box::new(decoder), schema))
        }
        FileFormat::Text => {
            let decoder = TextLineDecoder::new();
            let schema = decoder.output_schema();
            Ok((Box::new(decoder), schema))
        }
        FileFormat::Parquet => {
            // For Parquet, schema comes from the file footer (authoritative).
            // Use a placeholder schema; it will be refined on the first file read.
            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
            });
            let decoder = crate::schema::parquet::ParquetDecoder::new(schema.clone());
            Ok((Box::new(decoder), schema))
        }
        FileFormat::ArrowIpc => {
            // Arrow IPC files embed their schema in the header — use it as
            // authoritative. The DDL schema is used as placeholder until
            // the first file is read.
            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
            });
            let decoder = super::arrow_ipc_codec::ArrowIpcDecoder::new(schema.clone());
            Ok((Box::new(decoder), schema))
        }
    }
}

async fn read_file_bytes(path: &str) -> Result<Vec<u8>, ConnectorError> {
    // Cloud paths would use object_store; local paths use tokio::fs.
    if path.starts_with("s3://")
        || path.starts_with("gs://")
        || path.starts_with("az://")
        || path.starts_with("abfs://")
        || path.starts_with("abfss://")
    {
        Err(ConnectorError::ConfigurationError(
            "cloud file reading not yet implemented in poll_batch; use local paths".into(),
        ))
    } else {
        tokio::fs::read(path)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("cannot read file '{path}': {e}")))
    }
}
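
// A possible shape for the cloud branch above, sketched with the
// `object_store` crate (illustrative only; `object_store` is not wired into
// this module today, error mapping is omitted, and the exact APIs should be
// verified against the crate version in use):
//
//     let url = url::Url::parse(path)?;
//     let (store, location) = object_store::parse_url(&url)?;
//     let bytes = store.get(&location).await?.bytes().await?;
//     Ok(bytes.to_vec())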

fn append_metadata_column(
    batch: &RecordBatch,
    file_path: &str,
    file_size: u64,
    modified_ms: u64,
) -> Result<RecordBatch, ConnectorError> {
    use arrow_array::{ArrayRef, StringArray, StructArray, UInt64Array};

    let n = batch.num_rows();
    let file_name = std::path::Path::new(file_path)
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or(file_path);

    let path_array: ArrayRef = Arc::new(StringArray::from(vec![file_path; n]));
    let name_array: ArrayRef = Arc::new(StringArray::from(vec![file_name; n]));
    let size_array: ArrayRef = Arc::new(UInt64Array::from(vec![file_size; n]));
    #[allow(clippy::cast_possible_wrap)]
    let mod_array: ArrayRef = Arc::new(arrow_array::TimestampMillisecondArray::from(vec![
        modified_ms as i64;
        n
    ]));

    let fields = vec![
        Field::new("file_path", DataType::Utf8, false),
        Field::new("file_name", DataType::Utf8, false),
        Field::new("file_size", DataType::UInt64, false),
        Field::new(
            "file_modification_time",
            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
            true,
        ),
    ];
    let struct_array = StructArray::try_new(
        fields.into(),
        vec![path_array, name_array, size_array, mod_array],
        None,
    )
    .map_err(|e| ConnectorError::ReadError(format!("metadata struct error: {e}")))?;

    // Append struct column to batch.
    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
    columns.push(Arc::new(struct_array));

    let mut fields: Vec<Field> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new(
        "_metadata",
        DataType::Struct(
            vec![
                Field::new("file_path", DataType::Utf8, false),
                Field::new("file_name", DataType::Utf8, false),
                Field::new("file_size", DataType::UInt64, false),
                Field::new(
                    "file_modification_time",
                    DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
                    true,
                ),
            ]
            .into(),
        ),
        false,
    ));

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
        .map_err(|e| ConnectorError::ReadError(format!("metadata append error: {e}")))
}

#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_source_default() {
        let source = FileSource::new();
        assert!(!source.is_open);
        assert_eq!(source.manifest.active_count(), 0);
    }

    #[tokio::test]
    async fn test_open_missing_path() {
        let mut source = FileSource::new();
        let config = ConnectorConfig::new("files");
        let result = source.open(&config).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_open_with_text_format() {
        let mut source = FileSource::new();
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/tmp");
        config.set("format", "text");
        let result = source.open(&config).await;
        assert!(result.is_ok());
        assert!(source.is_open);
        assert_eq!(source.schema().field(0).name(), "line");
        source.close().await.unwrap();
    }

    #[tokio::test]
    async fn test_poll_batch_when_not_open() {
        let mut source = FileSource::new();
        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut source = FileSource::new();
        source.manifest.insert(
            "test.csv".into(),
            FileEntry {
                size: 100,
                discovered_at: 1000,
                ingested_at: 2000,
            },
        );
        let cp = source.checkpoint();
        assert!(cp.get_offset("manifest").is_some());
    }

    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = FileSource::new();

        // Build a checkpoint with manifest data.
        let mut cp = SourceCheckpoint::new(1);
        cp.set_offset(
            "manifest",
            r#"{"a.csv":{"size":100,"discovered_at":900,"ingested_at":1000}}"#,
        );

        source.restore(&cp).await.unwrap();
        assert_eq!(source.manifest.active_count(), 1);
        assert!(source.manifest.contains("a.csv"));
    }

    #[test]
    fn test_health_check() {
        let source = FileSource::new();
        assert!(matches!(source.health_check(), HealthStatus::Unknown));
    }

    #[test]
    fn test_supports_replay() {
        let source = FileSource::new();
        assert!(source.supports_replay());
    }
}