// laminar_connectors/files/config.rs
1//! Configuration types for the file source and sink connectors.
2
3use std::collections::HashMap;
4use std::time::Duration;
5
6use crate::config::ConnectorConfig;
7use crate::error::ConnectorError;
8use laminar_core::time::parse_duration_str;
9
/// Parsed configuration for [`super::source::FileSource`].
///
/// Built from raw string properties by [`Self::from_connector_config`];
/// the defaults noted per field are applied there when the property is
/// absent.
#[derive(Debug, Clone)]
pub struct FileSourceConfig {
    /// Directory path, glob pattern, or cloud URL. Required.
    pub path: String,

    /// Data format (`csv`, `json`, `text`, `parquet`). `None` = auto-detect.
    pub format: Option<FileFormat>,

    /// Discovery polling interval (default: 10s).
    pub poll_interval: Duration,

    /// Wait after last modify event before considering a file complete
    /// (default: 1s).
    pub stabilisation_delay: Duration,

    /// Maximum files to process per `poll_batch` call (default: 100).
    pub max_files_per_poll: usize,

    /// Whether to append a `_metadata` struct column (default: false).
    pub include_metadata: bool,

    /// Whether to allow re-ingesting files whose size has changed
    /// (default: false).
    pub allow_overwrites: bool,

    /// Maximum active entries in the file ingestion manifest
    /// (default: 100,000).
    pub manifest_retention_count: usize,

    /// Maximum age (days) for manifest entries before eviction
    /// (default: 90).
    pub manifest_retention_age_days: u64,

    /// Safety limit for reading a single file (bytes). Primarily for Parquet
    /// (default: 256 MiB).
    pub max_file_bytes: usize,

    /// Additional glob pattern to filter discovered file names.
    pub glob_pattern: Option<String>,

    /// CSV-specific: field delimiter. Defaults to tab when `format` is
    /// `tsv`, comma otherwise.
    pub csv_delimiter: u8,

    /// CSV-specific: whether the first row is a header (default: true).
    pub csv_has_header: bool,
}
52
53impl FileSourceConfig {
54    /// Parse from connector properties.
55    ///
56    /// # Errors
57    ///
58    /// Returns `ConnectorError::ConfigurationError` if required options are
59    /// missing or values cannot be parsed.
60    pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
61        let props = config.properties();
62
63        let path = props
64            .get("path")
65            .cloned()
66            .ok_or_else(|| ConnectorError::ConfigurationError("'path' is required".into()))?;
67
68        let format = props
69            .get("format")
70            .map(|s| FileFormat::parse(s))
71            .transpose()?;
72
73        let poll_interval = parse_duration(props, "poll_interval", Duration::from_secs(10))?;
74        let stabilisation_delay =
75            parse_duration(props, "stabilisation_delay", Duration::from_secs(1))?;
76        let max_files_per_poll = parse_usize(props, "max_files_per_poll", 100)?;
77        let include_metadata = parse_bool(props, "include_metadata", false)?;
78        let allow_overwrites = parse_bool(props, "allow_overwrites", false)?;
79        let manifest_retention_count = parse_usize(props, "manifest_retention_count", 100_000)?;
80        let manifest_retention_age_days = parse_u64(props, "manifest_retention_age_days", 90)?;
81        let max_file_bytes = parse_usize(props, "max_file_bytes", 256 * 1024 * 1024)?;
82        let glob_pattern = props.get("glob_pattern").cloned();
83
84        // Default delimiter: tab for tsv, comma otherwise.
85        let is_tsv = props
86            .get("format")
87            .is_some_and(|f| f.eq_ignore_ascii_case("tsv"));
88        let csv_delimiter = props
89            .get("csv.delimiter")
90            .and_then(|s| s.as_bytes().first().copied())
91            .unwrap_or(if is_tsv { b'\t' } else { b',' });
92        let csv_has_header = parse_bool(props, "csv.has_header", true)?;
93
94        Ok(Self {
95            path,
96            format,
97            poll_interval,
98            stabilisation_delay,
99            max_files_per_poll,
100            include_metadata,
101            allow_overwrites,
102            manifest_retention_count,
103            manifest_retention_age_days,
104            max_file_bytes,
105            glob_pattern,
106            csv_delimiter,
107            csv_has_header,
108        })
109    }
110}
111
/// Parsed configuration for [`super::sink::FileSink`].
///
/// Built from raw string properties by [`Self::from_connector_config`];
/// the defaults noted per field are applied there when the property is
/// absent.
#[derive(Debug, Clone)]
pub struct FileSinkConfig {
    /// Output directory path. Required.
    pub path: String,

    /// Output format. Required (no auto-detect for the sink).
    pub format: FileFormat,

    /// Write mode (default: rolling).
    pub mode: SinkMode,

    /// File name prefix for rolling mode (default: `part`).
    pub prefix: String,

    /// Maximum file size before rotation (row formats only). `None` means
    /// no size-based rotation.
    pub max_file_size: Option<usize>,

    /// Parquet compression codec (default: `snappy`).
    pub compression: String,

    /// Maximum batches to buffer per epoch for Parquet (default: 10,000).
    /// Always > 0.
    pub max_epoch_batches: usize,
}
136
137impl FileSinkConfig {
138    /// Parse from connector properties.
139    ///
140    /// # Errors
141    ///
142    /// Returns `ConnectorError::ConfigurationError` if required options are
143    /// missing or values cannot be parsed.
144    pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
145        let props = config.properties();
146
147        let path = props
148            .get("path")
149            .cloned()
150            .ok_or_else(|| ConnectorError::ConfigurationError("'path' is required".into()))?;
151
152        let format = props
153            .get("format")
154            .ok_or_else(|| {
155                ConnectorError::ConfigurationError("'format' is required for file sink".into())
156            })
157            .and_then(|s| FileFormat::parse(s))?;
158
159        let mode = match props.get("mode").map(String::as_str) {
160            Some("append") => SinkMode::Append,
161            Some("rolling") | None => SinkMode::Rolling,
162            Some(other) => {
163                return Err(ConnectorError::ConfigurationError(format!(
164                    "unknown sink mode: '{other}' (expected 'append' or 'rolling')"
165                )));
166            }
167        };
168
169        let prefix = props
170            .get("prefix")
171            .cloned()
172            .unwrap_or_else(|| "part".to_string());
173
174        let max_file_size = props
175            .get("max_file_size")
176            .map(|s| {
177                s.parse::<usize>().map_err(|e| {
178                    ConnectorError::ConfigurationError(format!("invalid max_file_size: {e}"))
179                })
180            })
181            .transpose()?;
182
183        let compression = props
184            .get("compression")
185            .cloned()
186            .unwrap_or_else(|| "snappy".to_string());
187
188        let max_epoch_batches = props
189            .get("max_epoch_batches")
190            .map(|s| {
191                s.parse::<usize>().map_err(|e| {
192                    ConnectorError::ConfigurationError(format!("invalid max_epoch_batches: {e}"))
193                })
194            })
195            .transpose()?
196            .unwrap_or(10_000);
197        if max_epoch_batches == 0 {
198            return Err(ConnectorError::ConfigurationError(
199                "max_epoch_batches must be > 0".into(),
200            ));
201        }
202
203        Ok(Self {
204            path,
205            format,
206            mode,
207            prefix,
208            max_file_size,
209            compression,
210            max_epoch_batches,
211        })
212    }
213}
214
/// Supported file formats.
///
/// Note: TSV input is represented as [`FileFormat::Csv`] (the `tsv` format
/// string parses to `Csv`; the tab delimiter is configured separately on
/// the source config).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    /// Comma-separated values (also covers tab-separated input).
    Csv,
    /// Newline-delimited JSON.
    Json,
    /// Plain text (one line per record).
    Text,
    /// Apache Parquet.
    Parquet,
    /// Arrow IPC file format (random-access `.arrow` files).
    ArrowIpc,
}
229
230impl FileFormat {
231    /// Parse a format string.
232    ///
233    /// # Errors
234    ///
235    /// Returns `ConnectorError::ConfigurationError` for unknown formats.
236    pub fn parse(s: &str) -> Result<Self, ConnectorError> {
237        match s.to_lowercase().as_str() {
238            "csv" | "tsv" => Ok(Self::Csv),
239            "json" | "jsonl" | "ndjson" | "json_lines" => Ok(Self::Json),
240            "text" | "txt" | "plain" => Ok(Self::Text),
241            "parquet" | "parq" => Ok(Self::Parquet),
242            "arrow" | "ipc" | "arrow_ipc" => Ok(Self::ArrowIpc),
243            other => Err(ConnectorError::ConfigurationError(format!(
244                "unknown file format: '{other}' (expected csv, json, text, parquet, or arrow)"
245            ))),
246        }
247    }
248
249    /// Detect format from a file path extension.
250    pub fn from_extension(path: &str) -> Option<Self> {
251        let ext = path.rsplit('.').next()?.to_lowercase();
252        match ext.as_str() {
253            "csv" | "tsv" => Some(Self::Csv),
254            "json" | "jsonl" | "ndjson" => Some(Self::Json),
255            "txt" | "log" => Some(Self::Text),
256            "parquet" | "parq" => Some(Self::Parquet),
257            "arrow" | "ipc" => Some(Self::ArrowIpc),
258            _ => None,
259        }
260    }
261
262    /// Returns the canonical file extension for this format.
263    #[must_use]
264    pub fn extension(&self) -> &'static str {
265        match self {
266            Self::Csv => "csv",
267            Self::Json => "jsonl",
268            Self::Text => "txt",
269            Self::Parquet => "parquet",
270            Self::ArrowIpc => "arrow",
271        }
272    }
273
274    /// Whether this is a columnar/bulk format (cannot be truncated mid-file).
275    #[must_use]
276    pub fn is_bulk_format(&self) -> bool {
277        matches!(self, Self::Parquet | Self::ArrowIpc)
278    }
279}
280
/// Sink write mode.
///
/// `Rolling` is the default when the `mode` property is not set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkMode {
    /// Append to a single file.
    Append,
    /// Create a new file per epoch (with optional mid-epoch rotation for row formats).
    Rolling,
}
289
290// ── Helpers ──────────────────────────────────────────────────────────
291
292fn parse_duration(
293    props: &HashMap<String, String>,
294    key: &str,
295    default: Duration,
296) -> Result<Duration, ConnectorError> {
297    match props.get(key) {
298        Some(s) => parse_duration_str(s).ok_or_else(|| {
299            ConnectorError::ConfigurationError(format!("invalid duration for {key}: '{s}'"))
300        }),
301        None => Ok(default),
302    }
303}
304
305fn parse_usize(
306    props: &HashMap<String, String>,
307    key: &str,
308    default: usize,
309) -> Result<usize, ConnectorError> {
310    match props.get(key) {
311        Some(s) => s
312            .parse()
313            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid {key}: {e}"))),
314        None => Ok(default),
315    }
316}
317
318fn parse_u64(
319    props: &HashMap<String, String>,
320    key: &str,
321    default: u64,
322) -> Result<u64, ConnectorError> {
323    match props.get(key) {
324        Some(s) => s
325            .parse()
326            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid {key}: {e}"))),
327        None => Ok(default),
328    }
329}
330
331fn parse_bool(
332    props: &HashMap<String, String>,
333    key: &str,
334    default: bool,
335) -> Result<bool, ConnectorError> {
336    match props.get(key) {
337        Some(s) => match s.to_lowercase().as_str() {
338            "true" | "1" | "yes" => Ok(true),
339            "false" | "0" | "no" => Ok(false),
340            other => Err(ConnectorError::ConfigurationError(format!(
341                "invalid boolean for {key}: '{other}'"
342            ))),
343        },
344        None => Ok(default),
345    }
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    // Format-string parsing covers every accepted alias plus a rejection.
    #[test]
    fn test_file_format_parse() {
        assert_eq!(FileFormat::parse("csv").unwrap(), FileFormat::Csv);
        assert_eq!(FileFormat::parse("JSON").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("jsonl").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("ndjson").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("text").unwrap(), FileFormat::Text);
        assert_eq!(FileFormat::parse("parquet").unwrap(), FileFormat::Parquet);
        assert_eq!(FileFormat::parse("parq").unwrap(), FileFormat::Parquet);
        assert_eq!(FileFormat::parse("arrow").unwrap(), FileFormat::ArrowIpc);
        assert_eq!(FileFormat::parse("ipc").unwrap(), FileFormat::ArrowIpc);
        assert_eq!(
            FileFormat::parse("arrow_ipc").unwrap(),
            FileFormat::ArrowIpc
        );
        assert!(FileFormat::parse("xml").is_err());
    }

    // Extension-based detection, including the unknown-extension case.
    #[test]
    fn test_file_format_from_extension() {
        assert_eq!(
            FileFormat::from_extension("/data/logs/app.csv"),
            Some(FileFormat::Csv)
        );
        assert_eq!(
            FileFormat::from_extension("events.jsonl"),
            Some(FileFormat::Json)
        );
        assert_eq!(
            FileFormat::from_extension("data.parquet"),
            Some(FileFormat::Parquet)
        );
        assert_eq!(
            FileFormat::from_extension("log.txt"),
            Some(FileFormat::Text)
        );
        assert_eq!(
            FileFormat::from_extension("data.arrow"),
            Some(FileFormat::ArrowIpc)
        );
        assert_eq!(
            FileFormat::from_extension("stream.ipc"),
            Some(FileFormat::ArrowIpc)
        );
        assert_eq!(FileFormat::from_extension("file.bin"), None);
    }

    // Canonical output extensions for the sink.
    #[test]
    fn test_file_format_extension() {
        assert_eq!(FileFormat::Csv.extension(), "csv");
        assert_eq!(FileFormat::Json.extension(), "jsonl");
        assert_eq!(FileFormat::Text.extension(), "txt");
        assert_eq!(FileFormat::Parquet.extension(), "parquet");
        assert_eq!(FileFormat::ArrowIpc.extension(), "arrow");
    }

    // Only the columnar formats (Parquet, Arrow IPC) count as bulk.
    #[test]
    fn test_file_format_is_bulk() {
        assert!(!FileFormat::Csv.is_bulk_format());
        assert!(!FileFormat::Json.is_bulk_format());
        assert!(!FileFormat::Text.is_bulk_format());
        assert!(FileFormat::ArrowIpc.is_bulk_format());
        assert!(FileFormat::Parquet.is_bulk_format());
    }

    // Source config: explicit properties plus defaulted `allow_overwrites`.
    #[test]
    fn test_source_config_from_connector() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/data/logs/*.csv");
        config.set("format", "csv");
        config.set("max_files_per_poll", "50");
        config.set("include_metadata", "true");

        let src = FileSourceConfig::from_connector_config(&config).unwrap();
        assert_eq!(src.path, "/data/logs/*.csv");
        assert_eq!(src.format, Some(FileFormat::Csv));
        assert_eq!(src.max_files_per_poll, 50);
        assert!(src.include_metadata);
        assert!(!src.allow_overwrites);
    }

    // `path` is mandatory for the source.
    #[test]
    fn test_source_config_missing_path() {
        let config = ConnectorConfig::new("files");
        assert!(FileSourceConfig::from_connector_config(&config).is_err());
    }

    // Sink config: explicit mode and compression are honoured.
    #[test]
    fn test_sink_config_from_connector() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        config.set("format", "parquet");
        config.set("mode", "rolling");
        config.set("compression", "zstd");

        let sink = FileSinkConfig::from_connector_config(&config).unwrap();
        assert_eq!(sink.path, "/output");
        assert_eq!(sink.format, FileFormat::Parquet);
        assert_eq!(sink.mode, SinkMode::Rolling);
        assert_eq!(sink.compression, "zstd");
    }

    // `format` is mandatory for the sink (no auto-detect).
    #[test]
    fn test_sink_config_missing_format() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        assert!(FileSinkConfig::from_connector_config(&config).is_err());
    }

    // Sanity-check the laminar_core duration helper used by parse_duration.
    #[test]
    fn test_parse_duration_str() {
        assert_eq!(parse_duration_str("10").unwrap(), Duration::from_secs(10));
        assert_eq!(parse_duration_str("10s").unwrap(), Duration::from_secs(10));
        assert_eq!(
            parse_duration_str("500ms").unwrap(),
            Duration::from_millis(500)
        );
    }

    // Omitting `mode` defaults the sink to Rolling.
    #[test]
    fn test_sink_mode_default() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        config.set("format", "csv");
        let sink = FileSinkConfig::from_connector_config(&config).unwrap();
        assert_eq!(sink.mode, SinkMode::Rolling);
    }
}