1use std::collections::HashMap;
4use std::time::Duration;
5
6use crate::config::ConnectorConfig;
7use crate::error::ConnectorError;
8use laminar_core::time::parse_duration_str;
9
/// Configuration for a file-based source connector.
///
/// Built from generic connector properties via
/// [`FileSourceConfig::from_connector_config`]; every field except `path`
/// has a default (noted per field).
#[derive(Debug, Clone)]
pub struct FileSourceConfig {
    /// Filesystem path (or pattern) to read from. Required ('path' property).
    pub path: String,

    /// Explicitly declared record format; `None` when the 'format' property
    /// is absent (the reader presumably infers it — e.g. from the file
    /// extension; confirm against the consuming code).
    pub format: Option<FileFormat>,

    /// How often to poll for new files. Default: 10s.
    pub poll_interval: Duration,

    /// Delay applied before picking up a discovered file. Default: 1s.
    /// NOTE(review): presumably lets in-flight writes settle — confirm.
    pub stabilisation_delay: Duration,

    /// Upper bound on files processed per poll cycle. Default: 100.
    pub max_files_per_poll: usize,

    /// Whether to attach file metadata to emitted records. Default: false.
    pub include_metadata: bool,

    /// Whether a file overwritten in place may be re-ingested. Default: false.
    pub allow_overwrites: bool,

    /// Maximum number of manifest entries retained. Default: 100_000.
    pub manifest_retention_count: usize,

    /// Maximum age (in days) of retained manifest entries. Default: 90.
    pub manifest_retention_age_days: u64,

    /// Maximum size of a single input file in bytes. Default: 256 MiB.
    pub max_file_bytes: usize,

    /// Optional glob used to filter discovered files ('glob_pattern').
    pub glob_pattern: Option<String>,

    /// CSV field delimiter (first byte of 'csv.delimiter'). Default: ','
    /// — or '\t' when the declared format is "tsv".
    pub csv_delimiter: u8,

    /// Whether CSV input starts with a header row. Default: true.
    pub csv_has_header: bool,
}
52
53impl FileSourceConfig {
54 pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
61 let props = config.properties();
62
63 let path = props
64 .get("path")
65 .cloned()
66 .ok_or_else(|| ConnectorError::ConfigurationError("'path' is required".into()))?;
67
68 let format = props
69 .get("format")
70 .map(|s| FileFormat::parse(s))
71 .transpose()?;
72
73 let poll_interval = parse_duration(props, "poll_interval", Duration::from_secs(10))?;
74 let stabilisation_delay =
75 parse_duration(props, "stabilisation_delay", Duration::from_secs(1))?;
76 let max_files_per_poll = parse_usize(props, "max_files_per_poll", 100)?;
77 let include_metadata = parse_bool(props, "include_metadata", false)?;
78 let allow_overwrites = parse_bool(props, "allow_overwrites", false)?;
79 let manifest_retention_count = parse_usize(props, "manifest_retention_count", 100_000)?;
80 let manifest_retention_age_days = parse_u64(props, "manifest_retention_age_days", 90)?;
81 let max_file_bytes = parse_usize(props, "max_file_bytes", 256 * 1024 * 1024)?;
82 let glob_pattern = props.get("glob_pattern").cloned();
83
84 let is_tsv = props
86 .get("format")
87 .is_some_and(|f| f.eq_ignore_ascii_case("tsv"));
88 let csv_delimiter = props
89 .get("csv.delimiter")
90 .and_then(|s| s.as_bytes().first().copied())
91 .unwrap_or(if is_tsv { b'\t' } else { b',' });
92 let csv_has_header = parse_bool(props, "csv.has_header", true)?;
93
94 Ok(Self {
95 path,
96 format,
97 poll_interval,
98 stabilisation_delay,
99 max_files_per_poll,
100 include_metadata,
101 allow_overwrites,
102 manifest_retention_count,
103 manifest_retention_age_days,
104 max_file_bytes,
105 glob_pattern,
106 csv_delimiter,
107 csv_has_header,
108 })
109 }
110}
111
/// Configuration for a file-based sink connector.
///
/// Built from generic connector properties via
/// [`FileSinkConfig::from_connector_config`]. 'path' and 'format' are
/// required; all other keys have defaults (noted per field).
#[derive(Debug, Clone)]
pub struct FileSinkConfig {
    /// Output directory/path. Required ('path' property).
    pub path: String,

    /// Output record format. Required ('format' property) — the sink does
    /// not infer a format.
    pub format: FileFormat,

    /// Write mode ('mode' property: "append" or "rolling").
    /// Default: [`SinkMode::Rolling`].
    pub mode: SinkMode,

    /// Filename prefix for output files. Default: "part".
    pub prefix: String,

    /// Optional size threshold in bytes ('max_file_size'); `None` when unset.
    pub max_file_size: Option<usize>,

    /// Compression codec name. Default: "snappy".
    pub compression: String,

    /// Maximum record batches buffered per epoch. Default: 10_000; must be
    /// strictly positive (validated at construction).
    pub max_epoch_batches: usize,
}
136
137impl FileSinkConfig {
138 pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
145 let props = config.properties();
146
147 let path = props
148 .get("path")
149 .cloned()
150 .ok_or_else(|| ConnectorError::ConfigurationError("'path' is required".into()))?;
151
152 let format = props
153 .get("format")
154 .ok_or_else(|| {
155 ConnectorError::ConfigurationError("'format' is required for file sink".into())
156 })
157 .and_then(|s| FileFormat::parse(s))?;
158
159 let mode = match props.get("mode").map(String::as_str) {
160 Some("append") => SinkMode::Append,
161 Some("rolling") | None => SinkMode::Rolling,
162 Some(other) => {
163 return Err(ConnectorError::ConfigurationError(format!(
164 "unknown sink mode: '{other}' (expected 'append' or 'rolling')"
165 )));
166 }
167 };
168
169 let prefix = props
170 .get("prefix")
171 .cloned()
172 .unwrap_or_else(|| "part".to_string());
173
174 let max_file_size = props
175 .get("max_file_size")
176 .map(|s| {
177 s.parse::<usize>().map_err(|e| {
178 ConnectorError::ConfigurationError(format!("invalid max_file_size: {e}"))
179 })
180 })
181 .transpose()?;
182
183 let compression = props
184 .get("compression")
185 .cloned()
186 .unwrap_or_else(|| "snappy".to_string());
187
188 let max_epoch_batches = props
189 .get("max_epoch_batches")
190 .map(|s| {
191 s.parse::<usize>().map_err(|e| {
192 ConnectorError::ConfigurationError(format!("invalid max_epoch_batches: {e}"))
193 })
194 })
195 .transpose()?
196 .unwrap_or(10_000);
197 if max_epoch_batches == 0 {
198 return Err(ConnectorError::ConfigurationError(
199 "max_epoch_batches must be > 0".into(),
200 ));
201 }
202
203 Ok(Self {
204 path,
205 format,
206 mode,
207 prefix,
208 max_file_size,
209 compression,
210 max_epoch_batches,
211 })
212 }
213}
214
/// Record formats understood by the file connectors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileFormat {
    /// Delimiter-separated values (comma by default, tab for "tsv").
    Csv,
    /// JSON Lines / newline-delimited JSON (written with a "jsonl" extension).
    Json,
    /// Plain text.
    Text,
    /// Apache Parquet.
    Parquet,
    /// Apache Arrow IPC.
    ArrowIpc,
}
229
230impl FileFormat {
231 pub fn parse(s: &str) -> Result<Self, ConnectorError> {
237 match s.to_lowercase().as_str() {
238 "csv" | "tsv" => Ok(Self::Csv),
239 "json" | "jsonl" | "ndjson" | "json_lines" => Ok(Self::Json),
240 "text" | "txt" | "plain" => Ok(Self::Text),
241 "parquet" | "parq" => Ok(Self::Parquet),
242 "arrow" | "ipc" | "arrow_ipc" => Ok(Self::ArrowIpc),
243 other => Err(ConnectorError::ConfigurationError(format!(
244 "unknown file format: '{other}' (expected csv, json, text, parquet, or arrow)"
245 ))),
246 }
247 }
248
249 pub fn from_extension(path: &str) -> Option<Self> {
251 let ext = path.rsplit('.').next()?.to_lowercase();
252 match ext.as_str() {
253 "csv" | "tsv" => Some(Self::Csv),
254 "json" | "jsonl" | "ndjson" => Some(Self::Json),
255 "txt" | "log" => Some(Self::Text),
256 "parquet" | "parq" => Some(Self::Parquet),
257 "arrow" | "ipc" => Some(Self::ArrowIpc),
258 _ => None,
259 }
260 }
261
262 #[must_use]
264 pub fn extension(&self) -> &'static str {
265 match self {
266 Self::Csv => "csv",
267 Self::Json => "jsonl",
268 Self::Text => "txt",
269 Self::Parquet => "parquet",
270 Self::ArrowIpc => "arrow",
271 }
272 }
273
274 #[must_use]
276 pub fn is_bulk_format(&self) -> bool {
277 matches!(self, Self::Parquet | Self::ArrowIpc)
278 }
279}
280
/// File sink write mode (the 'mode' connector property).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkMode {
    /// 'mode' = "append".
    Append,
    /// 'mode' = "rolling"; also the default when 'mode' is unset.
    Rolling,
}
289
290fn parse_duration(
293 props: &HashMap<String, String>,
294 key: &str,
295 default: Duration,
296) -> Result<Duration, ConnectorError> {
297 match props.get(key) {
298 Some(s) => parse_duration_str(s).ok_or_else(|| {
299 ConnectorError::ConfigurationError(format!("invalid duration for {key}: '{s}'"))
300 }),
301 None => Ok(default),
302 }
303}
304
305fn parse_usize(
306 props: &HashMap<String, String>,
307 key: &str,
308 default: usize,
309) -> Result<usize, ConnectorError> {
310 match props.get(key) {
311 Some(s) => s
312 .parse()
313 .map_err(|e| ConnectorError::ConfigurationError(format!("invalid {key}: {e}"))),
314 None => Ok(default),
315 }
316}
317
318fn parse_u64(
319 props: &HashMap<String, String>,
320 key: &str,
321 default: u64,
322) -> Result<u64, ConnectorError> {
323 match props.get(key) {
324 Some(s) => s
325 .parse()
326 .map_err(|e| ConnectorError::ConfigurationError(format!("invalid {key}: {e}"))),
327 None => Ok(default),
328 }
329}
330
331fn parse_bool(
332 props: &HashMap<String, String>,
333 key: &str,
334 default: bool,
335) -> Result<bool, ConnectorError> {
336 match props.get(key) {
337 Some(s) => match s.to_lowercase().as_str() {
338 "true" | "1" | "yes" => Ok(true),
339 "false" | "0" | "no" => Ok(false),
340 other => Err(ConnectorError::ConfigurationError(format!(
341 "invalid boolean for {key}: '{other}'"
342 ))),
343 },
344 None => Ok(default),
345 }
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    // Format-name parsing: all documented aliases resolve; unknown names err.
    #[test]
    fn test_file_format_parse() {
        assert_eq!(FileFormat::parse("csv").unwrap(), FileFormat::Csv);
        assert_eq!(FileFormat::parse("JSON").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("jsonl").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("ndjson").unwrap(), FileFormat::Json);
        assert_eq!(FileFormat::parse("text").unwrap(), FileFormat::Text);
        assert_eq!(FileFormat::parse("parquet").unwrap(), FileFormat::Parquet);
        assert_eq!(FileFormat::parse("parq").unwrap(), FileFormat::Parquet);
        assert_eq!(FileFormat::parse("arrow").unwrap(), FileFormat::ArrowIpc);
        assert_eq!(FileFormat::parse("ipc").unwrap(), FileFormat::ArrowIpc);
        assert_eq!(
            FileFormat::parse("arrow_ipc").unwrap(),
            FileFormat::ArrowIpc
        );
        assert!(FileFormat::parse("xml").is_err());
    }

    // Extension-based inference, including the unknown-extension None case.
    #[test]
    fn test_file_format_from_extension() {
        assert_eq!(
            FileFormat::from_extension("/data/logs/app.csv"),
            Some(FileFormat::Csv)
        );
        assert_eq!(
            FileFormat::from_extension("events.jsonl"),
            Some(FileFormat::Json)
        );
        assert_eq!(
            FileFormat::from_extension("data.parquet"),
            Some(FileFormat::Parquet)
        );
        assert_eq!(
            FileFormat::from_extension("log.txt"),
            Some(FileFormat::Text)
        );
        assert_eq!(
            FileFormat::from_extension("data.arrow"),
            Some(FileFormat::ArrowIpc)
        );
        assert_eq!(
            FileFormat::from_extension("stream.ipc"),
            Some(FileFormat::ArrowIpc)
        );
        assert_eq!(FileFormat::from_extension("file.bin"), None);
    }

    // Canonical output extension per format.
    #[test]
    fn test_file_format_extension() {
        assert_eq!(FileFormat::Csv.extension(), "csv");
        assert_eq!(FileFormat::Json.extension(), "jsonl");
        assert_eq!(FileFormat::Text.extension(), "txt");
        assert_eq!(FileFormat::Parquet.extension(), "parquet");
        assert_eq!(FileFormat::ArrowIpc.extension(), "arrow");
    }

    // Only Parquet and Arrow IPC count as bulk formats.
    #[test]
    fn test_file_format_is_bulk() {
        assert!(!FileFormat::Csv.is_bulk_format());
        assert!(!FileFormat::Json.is_bulk_format());
        assert!(!FileFormat::Text.is_bulk_format());
        assert!(FileFormat::ArrowIpc.is_bulk_format());
        assert!(FileFormat::Parquet.is_bulk_format());
    }

    // Source config: explicit properties are honoured, unset ones default
    // (allow_overwrites defaults to false).
    #[test]
    fn test_source_config_from_connector() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/data/logs/*.csv");
        config.set("format", "csv");
        config.set("max_files_per_poll", "50");
        config.set("include_metadata", "true");

        let src = FileSourceConfig::from_connector_config(&config).unwrap();
        assert_eq!(src.path, "/data/logs/*.csv");
        assert_eq!(src.format, Some(FileFormat::Csv));
        assert_eq!(src.max_files_per_poll, 50);
        assert!(src.include_metadata);
        assert!(!src.allow_overwrites);
    }

    // 'path' is mandatory for the source.
    #[test]
    fn test_source_config_missing_path() {
        let config = ConnectorConfig::new("files");
        assert!(FileSourceConfig::from_connector_config(&config).is_err());
    }

    // Sink config: explicit properties round-trip into the struct.
    #[test]
    fn test_sink_config_from_connector() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        config.set("format", "parquet");
        config.set("mode", "rolling");
        config.set("compression", "zstd");

        let sink = FileSinkConfig::from_connector_config(&config).unwrap();
        assert_eq!(sink.path, "/output");
        assert_eq!(sink.format, FileFormat::Parquet);
        assert_eq!(sink.mode, SinkMode::Rolling);
        assert_eq!(sink.compression, "zstd");
    }

    // 'format' is mandatory for the sink (no inference).
    #[test]
    fn test_sink_config_missing_format() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        assert!(FileSinkConfig::from_connector_config(&config).is_err());
    }

    // Sanity-check the laminar_core duration parser used by parse_duration:
    // bare numbers are seconds; 's'/'ms' suffixes are honoured.
    #[test]
    fn test_parse_duration_str() {
        assert_eq!(parse_duration_str("10").unwrap(), Duration::from_secs(10));
        assert_eq!(parse_duration_str("10s").unwrap(), Duration::from_secs(10));
        assert_eq!(
            parse_duration_str("500ms").unwrap(),
            Duration::from_millis(500)
        );
    }

    // Omitting 'mode' selects Rolling.
    #[test]
    fn test_sink_mode_default() {
        let mut config = ConnectorConfig::new("files");
        config.set("path", "/output");
        config.set("format", "csv");
        let sink = FileSinkConfig::from_connector_config(&config).unwrap();
        assert_eq!(sink.mode, SinkMode::Rolling);
    }
}