Skip to main content

laminar_connectors/files/
config.rs

1//! Configuration types for the file source and sink connectors.
2
3use std::collections::HashMap;
4use std::time::Duration;
5
6use crate::config::ConnectorConfig;
7use crate::error::ConnectorError;
8
/// Parsed configuration for [`super::source::FileSource`].
///
/// Built from raw string properties in [`Self::from_connector_config`];
/// the defaults noted below are applied there when a key is absent.
#[derive(Debug, Clone)]
pub struct FileSourceConfig {
    /// Directory path, glob pattern, or cloud URL. Required.
    pub path: String,

    /// Data format (`csv`, `json`, `text`, `parquet`). `None` = auto-detect.
    pub format: Option<FileFormat>,

    /// Discovery polling interval (default: 10s).
    pub poll_interval: Duration,

    /// Wait after last modify event before considering a file complete
    /// (default: 1s).
    pub stabilisation_delay: Duration,

    /// Maximum files to process per `poll_batch` call (default: 100).
    pub max_files_per_poll: usize,

    /// Whether to append a `_metadata` struct column (default: false).
    pub include_metadata: bool,

    /// Whether to allow re-ingesting files whose size has changed
    /// (default: false).
    pub allow_overwrites: bool,

    /// Maximum active entries in the file ingestion manifest
    /// (default: 100_000).
    pub manifest_retention_count: usize,

    /// Maximum age (days) for manifest entries before eviction
    /// (default: 90).
    pub manifest_retention_age_days: u64,

    /// Safety limit for reading a single file (bytes). Primarily for
    /// Parquet (default: 256 MiB).
    pub max_file_bytes: usize,

    /// Additional glob pattern to filter discovered file names.
    pub glob_pattern: Option<String>,

    /// CSV-specific: field delimiter (default: `\t` when `format = tsv`,
    /// otherwise `,`).
    pub csv_delimiter: u8,

    /// CSV-specific: whether the first row is a header (default: true).
    pub csv_has_header: bool,
}
51
52impl FileSourceConfig {
53    /// Parse from connector properties.
54    ///
55    /// # Errors
56    ///
57    /// Returns `ConnectorError::ConfigurationError` if required options are
58    /// missing or values cannot be parsed.
59    pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
60        let props = config.properties();
61
62        let path = props
63            .get("path")
64            .cloned()
65            .ok_or_else(|| ConnectorError::ConfigurationError("'path' is required".into()))?;
66
67        let format = props
68            .get("format")
69            .map(|s| FileFormat::parse(s))
70            .transpose()?;
71
72        let poll_interval = parse_duration(props, "poll_interval", Duration::from_secs(10))?;
73        let stabilisation_delay =
74            parse_duration(props, "stabilisation_delay", Duration::from_secs(1))?;
75        let max_files_per_poll = parse_usize(props, "max_files_per_poll", 100)?;
76        let include_metadata = parse_bool(props, "include_metadata", false)?;
77        let allow_overwrites = parse_bool(props, "allow_overwrites", false)?;
78        let manifest_retention_count = parse_usize(props, "manifest_retention_count", 100_000)?;
79        let manifest_retention_age_days = parse_u64(props, "manifest_retention_age_days", 90)?;
80        let max_file_bytes = parse_usize(props, "max_file_bytes", 256 * 1024 * 1024)?;
81        let glob_pattern = props.get("glob_pattern").cloned();
82
83        // Default delimiter: tab for tsv, comma otherwise.
84        let is_tsv = props
85            .get("format")
86            .is_some_and(|f| f.eq_ignore_ascii_case("tsv"));
87        let csv_delimiter = props
88            .get("csv.delimiter")
89            .and_then(|s| s.as_bytes().first().copied())
90            .unwrap_or(if is_tsv { b'\t' } else { b',' });
91        let csv_has_header = parse_bool(props, "csv.has_header", true)?;
92
93        Ok(Self {
94            path,
95            format,
96            poll_interval,
97            stabilisation_delay,
98            max_files_per_poll,
99            include_metadata,
100            allow_overwrites,
101            manifest_retention_count,
102            manifest_retention_age_days,
103            max_file_bytes,
104            glob_pattern,
105            csv_delimiter,
106            csv_has_header,
107        })
108    }
109}
110
/// Parsed configuration for [`super::sink::FileSink`].
///
/// Built from raw string properties in [`Self::from_connector_config`];
/// the defaults noted below are applied there when a key is absent.
#[derive(Debug, Clone)]
pub struct FileSinkConfig {
    /// Output directory path. Required.
    pub path: String,

    /// Output format. Required — the sink performs no auto-detection.
    pub format: FileFormat,

    /// Write mode (default: [`SinkMode::Rolling`]).
    pub mode: SinkMode,

    /// File name prefix for rolling mode (default: `"part"`).
    pub prefix: String,

    /// Maximum file size before rotation (row formats only).
    /// `None` = no size-based rotation.
    pub max_file_size: Option<usize>,

    /// Parquet compression codec (default: `"snappy"`).
    pub compression: String,
}
132
133impl FileSinkConfig {
134    /// Parse from connector properties.
135    ///
136    /// # Errors
137    ///
138    /// Returns `ConnectorError::ConfigurationError` if required options are
139    /// missing or values cannot be parsed.
140    pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
141        let props = config.properties();
142
143        let path = props
144            .get("path")
145            .cloned()
146            .ok_or_else(|| ConnectorError::ConfigurationError("'path' is required".into()))?;
147
148        let format = props
149            .get("format")
150            .ok_or_else(|| {
151                ConnectorError::ConfigurationError("'format' is required for file sink".into())
152            })
153            .and_then(|s| FileFormat::parse(s))?;
154
155        let mode = match props.get("mode").map(String::as_str) {
156            Some("append") => SinkMode::Append,
157            Some("rolling") | None => SinkMode::Rolling,
158            Some(other) => {
159                return Err(ConnectorError::ConfigurationError(format!(
160                    "unknown sink mode: '{other}' (expected 'append' or 'rolling')"
161                )));
162            }
163        };
164
165        let prefix = props
166            .get("prefix")
167            .cloned()
168            .unwrap_or_else(|| "part".to_string());
169
170        let max_file_size = props
171            .get("max_file_size")
172            .map(|s| {
173                s.parse::<usize>().map_err(|e| {
174                    ConnectorError::ConfigurationError(format!("invalid max_file_size: {e}"))
175                })
176            })
177            .transpose()?;
178
179        let compression = props
180            .get("compression")
181            .cloned()
182            .unwrap_or_else(|| "snappy".to_string());
183
184        Ok(Self {
185            path,
186            format,
187            mode,
188            prefix,
189            max_file_size,
190            compression,
191        })
192    }
193}
194
/// Supported file formats.
///
/// Parsed from user-supplied strings via [`FileFormat::parse`] or inferred
/// from a path via [`FileFormat::from_extension`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    /// Comma-separated values (also covers tab-separated input).
    Csv,
    /// Newline-delimited JSON (one object per line).
    Json,
    /// Plain text (one line per record).
    Text,
    /// Apache Parquet (columnar; see [`FileFormat::is_bulk_format`]).
    Parquet,
}
207
208impl FileFormat {
209    /// Parse a format string.
210    ///
211    /// # Errors
212    ///
213    /// Returns `ConnectorError::ConfigurationError` for unknown formats.
214    pub fn parse(s: &str) -> Result<Self, ConnectorError> {
215        match s.to_lowercase().as_str() {
216            "csv" | "tsv" => Ok(Self::Csv),
217            "json" | "jsonl" | "ndjson" | "json_lines" => Ok(Self::Json),
218            "text" | "txt" | "plain" => Ok(Self::Text),
219            "parquet" | "parq" => Ok(Self::Parquet),
220            other => Err(ConnectorError::ConfigurationError(format!(
221                "unknown file format: '{other}' (expected csv, json, text, or parquet)"
222            ))),
223        }
224    }
225
226    /// Detect format from a file path extension.
227    pub fn from_extension(path: &str) -> Option<Self> {
228        let ext = path.rsplit('.').next()?.to_lowercase();
229        match ext.as_str() {
230            "csv" | "tsv" => Some(Self::Csv),
231            "json" | "jsonl" | "ndjson" => Some(Self::Json),
232            "txt" | "log" => Some(Self::Text),
233            "parquet" | "parq" => Some(Self::Parquet),
234            _ => None,
235        }
236    }
237
238    /// Returns the canonical file extension for this format.
239    #[must_use]
240    pub fn extension(&self) -> &'static str {
241        match self {
242            Self::Csv => "csv",
243            Self::Json => "jsonl",
244            Self::Text => "txt",
245            Self::Parquet => "parquet",
246        }
247    }
248
249    /// Whether this is a columnar/bulk format (cannot be truncated mid-file).
250    #[must_use]
251    pub fn is_bulk_format(&self) -> bool {
252        matches!(self, Self::Parquet)
253    }
254}
255
/// Sink write mode.
///
/// Selected by the `mode` property in [`FileSinkConfig::from_connector_config`];
/// `Rolling` is the default when the property is absent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkMode {
    /// Append to a single file.
    Append,
    /// Create a new file per epoch (with optional mid-epoch rotation for row formats).
    Rolling,
}
264
265// ── Helpers ──────────────────────────────────────────────────────────
266
267fn parse_duration(
268    props: &HashMap<String, String>,
269    key: &str,
270    default: Duration,
271) -> Result<Duration, ConnectorError> {
272    match props.get(key) {
273        Some(s) => parse_duration_str(s),
274        None => Ok(default),
275    }
276}
277
278fn parse_duration_str(s: &str) -> Result<Duration, ConnectorError> {
279    // Accept plain seconds or suffixed values (e.g. "10s", "5000ms").
280    if let Some(ms) = s.strip_suffix("ms") {
281        let n: u64 = ms
282            .parse()
283            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid duration: {e}")))?;
284        return Ok(Duration::from_millis(n));
285    }
286    let secs_str = s.strip_suffix('s').unwrap_or(s);
287    let n: u64 = secs_str
288        .parse()
289        .map_err(|e| ConnectorError::ConfigurationError(format!("invalid duration: {e}")))?;
290    Ok(Duration::from_secs(n))
291}
292
293fn parse_usize(
294    props: &HashMap<String, String>,
295    key: &str,
296    default: usize,
297) -> Result<usize, ConnectorError> {
298    match props.get(key) {
299        Some(s) => s
300            .parse()
301            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid {key}: {e}"))),
302        None => Ok(default),
303    }
304}
305
306fn parse_u64(
307    props: &HashMap<String, String>,
308    key: &str,
309    default: u64,
310) -> Result<u64, ConnectorError> {
311    match props.get(key) {
312        Some(s) => s
313            .parse()
314            .map_err(|e| ConnectorError::ConfigurationError(format!("invalid {key}: {e}"))),
315        None => Ok(default),
316    }
317}
318
319fn parse_bool(
320    props: &HashMap<String, String>,
321    key: &str,
322    default: bool,
323) -> Result<bool, ConnectorError> {
324    match props.get(key) {
325        Some(s) => match s.to_lowercase().as_str() {
326            "true" | "1" | "yes" => Ok(true),
327            "false" | "0" | "no" => Ok(false),
328            other => Err(ConnectorError::ConfigurationError(format!(
329                "invalid boolean for {key}: '{other}'"
330            ))),
331        },
332        None => Ok(default),
333    }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    // Format-string parsing: aliases and case-insensitivity.
    #[test]
    fn test_file_format_parse() {
        assert_eq!(FileFormat::parse("csv").unwrap(), FileFormat::Csv);
        assert_eq!(FileFormat::parse("JSON").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("jsonl").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("ndjson").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("text").unwrap(), FileFormat::Text);
        assert_eq!(FileFormat::parse("parquet").unwrap(), FileFormat::Parquet);
        assert_eq!(FileFormat::parse("parq").unwrap(), FileFormat::Parquet);
        assert!(FileFormat::parse("xml").is_err());
    }

    // Extension-based detection, including the unknown-extension case.
    #[test]
    fn test_file_format_from_extension() {
        assert_eq!(
            FileFormat::from_extension("/data/logs/app.csv"),
            Some(FileFormat::Csv)
        );
        assert_eq!(
            FileFormat::from_extension("events.jsonl"),
            Some(FileFormat::Json)
        );
        assert_eq!(
            FileFormat::from_extension("data.parquet"),
            Some(FileFormat::Parquet)
        );
        assert_eq!(
            FileFormat::from_extension("log.txt"),
            Some(FileFormat::Text)
        );
        assert_eq!(FileFormat::from_extension("file.bin"), None);
    }

    // Canonical output extensions (JSON intentionally maps to "jsonl").
    #[test]
    fn test_file_format_extension() {
        assert_eq!(FileFormat::Csv.extension(), "csv");
        assert_eq!(FileFormat::Json.extension(), "jsonl");
        assert_eq!(FileFormat::Text.extension(), "txt");
        assert_eq!(FileFormat::Parquet.extension(), "parquet");
    }

    // Only Parquet is a bulk/columnar format.
    #[test]
    fn test_file_format_is_bulk() {
        assert!(!FileFormat::Csv.is_bulk_format());
        assert!(!FileFormat::Json.is_bulk_format());
        assert!(!FileFormat::Text.is_bulk_format());
        assert!(FileFormat::Parquet.is_bulk_format());
    }

    // Source config: explicit values override defaults; unset booleans
    // fall back to their documented defaults.
    #[test]
    fn test_source_config_from_connector() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/data/logs/*.csv");
        config.set("format", "csv");
        config.set("max_files_per_poll", "50");
        config.set("include_metadata", "true");

        let src = FileSourceConfig::from_connector_config(&config).unwrap();
        assert_eq!(src.path, "/data/logs/*.csv");
        assert_eq!(src.format, Some(FileFormat::Csv));
        assert_eq!(src.max_files_per_poll, 50);
        assert!(src.include_metadata);
        assert!(!src.allow_overwrites);
    }

    // 'path' is mandatory for the source.
    #[test]
    fn test_source_config_missing_path() {
        let config = ConnectorConfig::new("files");
        assert!(FileSourceConfig::from_connector_config(&config).is_err());
    }

    // Sink config round-trip with every option set explicitly.
    #[test]
    fn test_sink_config_from_connector() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        config.set("format", "parquet");
        config.set("mode", "rolling");
        config.set("compression", "zstd");

        let sink = FileSinkConfig::from_connector_config(&config).unwrap();
        assert_eq!(sink.path, "/output");
        assert_eq!(sink.format, FileFormat::Parquet);
        assert_eq!(sink.mode, SinkMode::Rolling);
        assert_eq!(sink.compression, "zstd");
    }

    // Unlike the source, the sink requires an explicit 'format'.
    #[test]
    fn test_sink_config_missing_format() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        assert!(FileSinkConfig::from_connector_config(&config).is_err());
    }

    // Duration strings: bare seconds, 's' suffix, and 'ms' suffix.
    #[test]
    fn test_parse_duration_str() {
        assert_eq!(parse_duration_str("10").unwrap(), Duration::from_secs(10));
        assert_eq!(parse_duration_str("10s").unwrap(), Duration::from_secs(10));
        assert_eq!(
            parse_duration_str("500ms").unwrap(),
            Duration::from_millis(500)
        );
    }

    // Sink mode defaults to Rolling when the 'mode' property is unset.
    #[test]
    fn test_sink_mode_default() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        config.set("format", "csv");
        let sink = FileSinkConfig::from_connector_config(&config).unwrap();
        assert_eq!(sink.mode, SinkMode::Rolling);
    }
}