Skip to main content

laminar_connectors/files/
source.rs

1//! File source connector implementing [`SourceConnector`].
2//!
3//! Watches a directory or cloud path for new files, decodes them using the
4//! configured format (CSV, JSON, text, Parquet), and produces `RecordBatch`es.
5
6use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use arrow_array::RecordBatch;
10use arrow_schema::{DataType, Field, Schema, SchemaRef};
11use async_trait::async_trait;
12use tracing::{debug, info, warn};
13
14use crate::checkpoint::SourceCheckpoint;
15use crate::config::ConnectorConfig;
16use crate::connector::{SourceBatch, SourceConnector};
17use crate::error::ConnectorError;
18use crate::schema::traits::FormatDecoder;
19use crate::schema::types::RawRecord;
20
21use super::config::{FileFormat, FileSourceConfig};
22use super::discovery::{DiscoveryConfig, FileDiscoveryEngine};
23use super::manifest::{FileEntry, FileIngestionManifest};
24use super::text_decoder::TextLineDecoder;
25
26/// AutoLoader-style file source connector.
27///
28/// Watches a directory (local or cloud) for new files, infers schema if
29/// needed, and produces `RecordBatch`es via `poll_batch()`.
30pub struct FileSource {
31    /// Parsed configuration.
32    config: Option<FileSourceConfig>,
33    /// Output Arrow schema (resolved in `open()`).
34    schema: SchemaRef,
35    /// Format decoder (created in `open()`).
36    decoder: Option<Box<dyn FormatDecoder>>,
37    /// File discovery engine (started in `open()`).
38    discovery: Option<FileDiscoveryEngine>,
39    /// File ingestion manifest (tracks processed files).
40    manifest: FileIngestionManifest,
41    /// Whether the connector is open.
42    is_open: bool,
43}
44
45impl FileSource {
46    /// Creates a new file source with a placeholder schema.
47    #[must_use]
48    pub fn new() -> Self {
49        Self::with_registry(None)
50    }
51
52    /// Creates a new file source with an optional Prometheus registry.
53    #[must_use]
54    pub fn with_registry(_registry: Option<&prometheus::Registry>) -> Self {
55        let empty_schema = Arc::new(Schema::empty());
56        Self {
57            config: None,
58            schema: empty_schema,
59            decoder: None,
60            discovery: None,
61            manifest: FileIngestionManifest::new(),
62            is_open: false,
63        }
64    }
65}
66
67impl Default for FileSource {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl std::fmt::Debug for FileSource {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("FileSource")
76            .field("is_open", &self.is_open)
77            .field("schema_fields", &self.schema.fields().len())
78            .field("manifest_count", &self.manifest.active_count())
79            .finish()
80    }
81}
82
83#[async_trait]
84impl SourceConnector for FileSource {
85    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
86        let src_config = FileSourceConfig::from_connector_config(config)?;
87
88        // Cloud read isn't implemented in `read_file_bytes`; rejecting at
89        // open() is the only honest answer. Silently accepting and then
90        // erroring on first poll leaves the connector "registered but
91        // dead" — far worse than a clear configuration error up front.
92        if is_cloud_url(&src_config.path) {
93            return Err(ConnectorError::ConfigurationError(format!(
94                "cloud paths are not supported by the 'files' source yet: {}",
95                src_config.path
96            )));
97        }
98
99        // Resolve format (explicit or auto-detect from path).
100        let format = match src_config.format {
101            Some(f) => f,
102            None => FileFormat::from_extension(&src_config.path).ok_or_else(|| {
103                ConnectorError::ConfigurationError(
104                    "cannot detect format from path; specify 'format' explicitly".into(),
105                )
106            })?,
107        };
108
109        // Build decoder and resolve schema.
110        let (decoder, schema) = build_decoder_and_schema(format, &src_config, config)?;
111
112        // Optionally append _metadata struct column.
113        let final_schema = if src_config.include_metadata {
114            let mut fields: Vec<Field> =
115                schema.fields().iter().map(|f| f.as_ref().clone()).collect();
116            fields.push(Field::new(
117                "_metadata",
118                DataType::Struct(
119                    vec![
120                        Field::new("file_path", DataType::Utf8, false),
121                        Field::new("file_name", DataType::Utf8, false),
122                        Field::new("file_size", DataType::UInt64, false),
123                        Field::new(
124                            "file_modification_time",
125                            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
126                            true,
127                        ),
128                    ]
129                    .into(),
130                ),
131                false,
132            ));
133            Arc::new(Schema::new(fields))
134        } else {
135            schema
136        };
137
138        // Start discovery engine with a snapshot of the current manifest for dedup.
139        let discovery_config = DiscoveryConfig {
140            path: src_config.path.clone(),
141            poll_interval: src_config.poll_interval,
142            stabilisation_delay: src_config.stabilisation_delay,
143            glob_pattern: src_config.glob_pattern.clone(),
144        };
145        let known = Arc::new(self.manifest.snapshot_for_dedup());
146        let discovery = FileDiscoveryEngine::start(discovery_config, known);
147
148        self.config = Some(src_config);
149        self.schema = final_schema;
150        self.decoder = Some(decoder);
151        self.discovery = Some(discovery);
152        self.is_open = true;
153
154        info!(
155            "file source opened: format={format:?}, schema_fields={}",
156            self.schema.fields().len()
157        );
158        Ok(())
159    }
160
161    async fn poll_batch(
162        &mut self,
163        _max_records: usize,
164    ) -> Result<Option<SourceBatch>, ConnectorError> {
165        let config = self
166            .config
167            .as_ref()
168            .ok_or_else(|| ConnectorError::InvalidState {
169                expected: "open".into(),
170                actual: "closed".into(),
171            })?;
172        let discovery = self
173            .discovery
174            .as_mut()
175            .ok_or_else(|| ConnectorError::InvalidState {
176                expected: "discovery running".into(),
177                actual: "no discovery".into(),
178            })?;
179        let decoder = self
180            .decoder
181            .as_ref()
182            .ok_or_else(|| ConnectorError::InvalidState {
183                expected: "decoder ready".into(),
184                actual: "no decoder".into(),
185            })?;
186
187        let files = discovery.drain(config.max_files_per_poll);
188        if files.is_empty() {
189            return Ok(None);
190        }
191
192        let mut all_batches: Vec<RecordBatch> = Vec::new();
193
194        for file in &files {
195            // Size-change guard.
196            if !config.allow_overwrites && self.manifest.size_changed(&file.path, file.size) {
197                warn!(
198                    "file source: skipping '{}' — size changed (was {}, now {})",
199                    file.path,
200                    self.manifest
201                        .active_entries()
202                        .find(|(p, _)| *p == file.path)
203                        .map(|(_, e)| e.size)
204                        .unwrap_or(0),
205                    file.size
206                );
207                continue;
208            }
209
210            // Already ingested check (belt-and-suspenders with discovery dedup).
211            if self.manifest.contains(&file.path) {
212                continue;
213            }
214
215            // Max file size guard (primarily for Parquet).
216            if file.size > config.max_file_bytes as u64 {
217                warn!(
218                    "file source: skipping '{}' — size {} exceeds max_file_bytes {}",
219                    file.path, file.size, config.max_file_bytes
220                );
221                continue;
222            }
223
224            // Read file contents.
225            let bytes = match read_file_bytes(&file.path).await {
226                Ok(b) => b,
227                Err(e) => {
228                    warn!("file source: cannot read '{}': {e}", file.path);
229                    continue;
230                }
231            };
232
233            // Decode.
234            let record = RawRecord::new(bytes);
235            match decoder.decode_batch(&[record]) {
236                Ok(batch) if batch.num_rows() > 0 => {
237                    let batch = if config.include_metadata {
238                        append_metadata_column(&batch, &file.path, file.size, file.modified_ms)?
239                    } else {
240                        batch
241                    };
242                    all_batches.push(batch);
243                }
244                Ok(_) => {
245                    debug!("file source: empty batch from '{}'", file.path);
246                }
247                Err(e) => {
248                    warn!("file source: decode error for '{}': {e}", file.path);
249                    continue;
250                }
251            }
252
253            // Record in manifest.
254            self.manifest.insert(
255                file.path.clone(),
256                FileEntry {
257                    size: file.size,
258                    discovered_at: file.modified_ms,
259                    ingested_at: now_millis(),
260                },
261            );
262        }
263
264        // Evict old manifest entries if needed.
265        if let Some(cfg) = &self.config {
266            let max_age_ms = cfg.manifest_retention_age_days * 24 * 60 * 60 * 1000;
267            self.manifest
268                .maybe_evict(cfg.manifest_retention_count, max_age_ms);
269        }
270
271        if all_batches.is_empty() {
272            return Ok(None);
273        }
274
275        // Concatenate all batches.
276        let combined = if all_batches.len() == 1 {
277            all_batches.into_iter().next().unwrap()
278        } else {
279            arrow_select::concat::concat_batches(&self.schema, &all_batches)
280                .map_err(|e| ConnectorError::ReadError(format!("batch concat error: {e}")))?
281        };
282
283        Ok(Some(SourceBatch::new(combined)))
284    }
285
286    fn schema(&self) -> SchemaRef {
287        self.schema.clone()
288    }
289
290    fn checkpoint(&self) -> SourceCheckpoint {
291        let mut cp = SourceCheckpoint::new(0);
292        self.manifest.to_checkpoint(&mut cp);
293        cp
294    }
295
296    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
297        match FileIngestionManifest::from_checkpoint(checkpoint) {
298            Ok(manifest) => {
299                info!(
300                    "file source: restored manifest with {} active entries",
301                    manifest.active_count()
302                );
303                self.manifest = manifest;
304            }
305            Err(e) => {
306                warn!("file source: manifest restore failed: {e} — starting fresh");
307                self.manifest = FileIngestionManifest::new();
308            }
309        }
310        Ok(())
311    }
312
313    async fn close(&mut self) -> Result<(), ConnectorError> {
314        self.discovery = None;
315        self.decoder = None;
316        self.is_open = false;
317        info!("file source closed");
318        Ok(())
319    }
320
321    fn supports_replay(&self) -> bool {
322        true
323    }
324}
325
326// ── Helpers ──────────────────────────────────────────────────────────
327
328fn build_decoder_and_schema(
329    format: FileFormat,
330    src_config: &FileSourceConfig,
331    connector_config: &ConnectorConfig,
332) -> Result<(Box<dyn FormatDecoder>, SchemaRef), ConnectorError> {
333    match format {
334        FileFormat::Csv => {
335            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
336                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
337            });
338            let csv_config = crate::schema::CsvDecoderConfig {
339                delimiter: src_config.csv_delimiter,
340                has_header: src_config.csv_has_header,
341                ..crate::schema::CsvDecoderConfig::default()
342            };
343            let decoder = crate::schema::CsvDecoder::with_config(schema.clone(), csv_config);
344            Ok((Box::new(decoder), schema))
345        }
346        FileFormat::Json => {
347            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
348                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
349            });
350            let json_config = crate::schema::JsonDecoderConfig::default();
351            let decoder = crate::schema::JsonDecoder::with_config(schema.clone(), json_config);
352            Ok((Box::new(decoder), schema))
353        }
354        FileFormat::Text => {
355            let decoder = TextLineDecoder::new();
356            let schema = decoder.output_schema();
357            Ok((Box::new(decoder), schema))
358        }
359        FileFormat::Parquet => {
360            // For Parquet, schema comes from the file footer (authoritative).
361            // Use a placeholder schema; it will be refined on the first file read.
362            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
363                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
364            });
365            let decoder = crate::schema::parquet::ParquetDecoder::new(schema.clone());
366            Ok((Box::new(decoder), schema))
367        }
368        FileFormat::ArrowIpc => {
369            // Arrow IPC files embed their schema in the header — use it as
370            // authoritative. The DDL schema is used as placeholder until
371            // the first file is read.
372            let schema = connector_config.arrow_schema().unwrap_or_else(|| {
373                Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]))
374            });
375            let decoder = super::arrow_ipc_codec::ArrowIpcDecoder::new(schema.clone());
376            Ok((Box::new(decoder), schema))
377        }
378    }
379}
380
/// Reports whether `path` starts with a recognised cloud object-store scheme.
///
/// The scheme is the text before the first `://` and is compared
/// case-insensitively (ASCII) against `s3`, `s3a`, `s3n`, `gs`, `gcs`, `az`,
/// `abfs` and `abfss`. Paths without `://` are never cloud URLs.
fn is_cloud_url(path: &str) -> bool {
    const CLOUD_SCHEMES: &[&str] = &["s3", "s3a", "s3n", "gs", "gcs", "az", "abfs", "abfss"];

    match path.split_once("://") {
        Some((scheme, _rest)) => CLOUD_SCHEMES
            .iter()
            .any(|known| known.eq_ignore_ascii_case(scheme)),
        None => false,
    }
}
388
389async fn read_file_bytes(path: &str) -> Result<Vec<u8>, ConnectorError> {
390    // Cloud paths are rejected at `open()`; this path is local-only.
391    debug_assert!(
392        !is_cloud_url(path),
393        "cloud paths must be rejected at open()"
394    );
395    tokio::fs::read(path)
396        .await
397        .map_err(|e| ConnectorError::ReadError(format!("cannot read file '{path}': {e}")))
398}
399
400fn append_metadata_column(
401    batch: &RecordBatch,
402    file_path: &str,
403    file_size: u64,
404    modified_ms: u64,
405) -> Result<RecordBatch, ConnectorError> {
406    use arrow_array::{ArrayRef, StringArray, StructArray, UInt64Array};
407
408    let n = batch.num_rows();
409    let file_name = std::path::Path::new(file_path)
410        .file_name()
411        .and_then(|n| n.to_str())
412        .unwrap_or(file_path);
413
414    let path_array: ArrayRef = Arc::new(StringArray::from(vec![file_path; n]));
415    let name_array: ArrayRef = Arc::new(StringArray::from(vec![file_name; n]));
416    let size_array: ArrayRef = Arc::new(UInt64Array::from(vec![file_size; n]));
417    #[allow(clippy::cast_possible_wrap)]
418    let mod_array: ArrayRef = Arc::new(arrow_array::TimestampMillisecondArray::from(vec![
419        modified_ms as i64;
420        n
421    ]));
422
423    let fields = vec![
424        Field::new("file_path", DataType::Utf8, false),
425        Field::new("file_name", DataType::Utf8, false),
426        Field::new("file_size", DataType::UInt64, false),
427        Field::new(
428            "file_modification_time",
429            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
430            true,
431        ),
432    ];
433    let struct_array = StructArray::try_new(
434        fields.into(),
435        vec![path_array, name_array, size_array, mod_array],
436        None,
437    )
438    .map_err(|e| ConnectorError::ReadError(format!("metadata struct error: {e}")))?;
439
440    // Append struct column to batch.
441    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
442    columns.push(Arc::new(struct_array));
443
444    let mut fields: Vec<Field> = batch
445        .schema()
446        .fields()
447        .iter()
448        .map(|f| f.as_ref().clone())
449        .collect();
450    fields.push(Field::new(
451        "_metadata",
452        DataType::Struct(
453            vec![
454                Field::new("file_path", DataType::Utf8, false),
455                Field::new("file_name", DataType::Utf8, false),
456                Field::new("file_size", DataType::UInt64, false),
457                Field::new(
458                    "file_modification_time",
459                    DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
460                    true,
461                ),
462            ]
463            .into(),
464        ),
465        false,
466    ));
467
468    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
469        .map_err(|e| ConnectorError::ReadError(format!("metadata append error: {e}")))
470}
471
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
#[allow(clippy::cast_possible_truncation)]
fn now_millis() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}
479
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built source is closed and has an empty manifest.
    #[test]
    fn test_file_source_default() {
        let fresh = FileSource::new();
        assert_eq!(fresh.manifest.active_count(), 0);
        assert!(!fresh.is_open);
    }

    /// Opening without a `path` setting is rejected.
    #[tokio::test]
    async fn test_open_missing_path() {
        let config = ConnectorConfig::new("files");
        let mut source = FileSource::new();
        assert!(source.open(&config).await.is_err());
    }

    /// Opening with the text format succeeds and exposes the `line` column.
    #[tokio::test]
    async fn test_open_with_text_format() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/tmp");
        config.set("format", "text");

        let mut source = FileSource::new();
        let opened = source.open(&config).await;
        assert!(opened.is_ok());
        assert!(source.is_open);

        let schema = source.schema();
        assert_eq!(schema.field(0).name(), "line");

        source.close().await.unwrap();
    }

    /// Polling a source that was never opened is an error.
    #[tokio::test]
    async fn test_poll_batch_when_not_open() {
        let mut source = FileSource::new();
        assert!(source.poll_batch(100).await.is_err());
    }

    /// A checkpoint taken from a non-empty manifest carries a `manifest` offset.
    #[test]
    fn test_checkpoint_roundtrip() {
        let entry = FileEntry {
            size: 100,
            discovered_at: 1000,
            ingested_at: 2000,
        };
        let mut source = FileSource::new();
        source.manifest.insert("test.csv".into(), entry);

        let checkpoint = source.checkpoint();
        assert!(checkpoint.get_offset("manifest").is_some());
    }

    /// Restoring from a checkpoint repopulates the manifest.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        // Build a checkpoint with manifest data.
        let mut checkpoint = SourceCheckpoint::new(1);
        checkpoint.set_offset(
            "manifest",
            r#"{"a.csv":{"size":100,"discovered_at":900,"ingested_at":1000}}"#,
        );

        let mut source = FileSource::new();
        source.restore(&checkpoint).await.unwrap();

        assert!(source.manifest.contains("a.csv"));
        assert_eq!(source.manifest.active_count(), 1);
    }

    /// The file source advertises replay support.
    #[test]
    fn test_supports_replay() {
        assert!(FileSource::new().supports_replay());
    }
}