1use std::collections::HashMap;
4use std::time::Duration;
5
6use crate::config::ConnectorConfig;
7use crate::error::ConnectorError;
8
9#[derive(Debug, Clone)]
11pub struct FileSourceConfig {
12 pub path: String,
14
15 pub format: Option<FileFormat>,
17
18 pub poll_interval: Duration,
20
21 pub stabilisation_delay: Duration,
23
24 pub max_files_per_poll: usize,
26
27 pub include_metadata: bool,
29
30 pub allow_overwrites: bool,
32
33 pub manifest_retention_count: usize,
35
36 pub manifest_retention_age_days: u64,
38
39 pub max_file_bytes: usize,
41
42 pub glob_pattern: Option<String>,
44
45 pub csv_delimiter: u8,
47
48 pub csv_has_header: bool,
50}
51
52impl FileSourceConfig {
53 pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
60 let props = config.properties();
61
62 let path = props
63 .get("path")
64 .cloned()
65 .ok_or_else(|| ConnectorError::ConfigurationError("'path' is required".into()))?;
66
67 let format = props
68 .get("format")
69 .map(|s| FileFormat::parse(s))
70 .transpose()?;
71
72 let poll_interval = parse_duration(props, "poll_interval", Duration::from_secs(10))?;
73 let stabilisation_delay =
74 parse_duration(props, "stabilisation_delay", Duration::from_secs(1))?;
75 let max_files_per_poll = parse_usize(props, "max_files_per_poll", 100)?;
76 let include_metadata = parse_bool(props, "include_metadata", false)?;
77 let allow_overwrites = parse_bool(props, "allow_overwrites", false)?;
78 let manifest_retention_count = parse_usize(props, "manifest_retention_count", 100_000)?;
79 let manifest_retention_age_days = parse_u64(props, "manifest_retention_age_days", 90)?;
80 let max_file_bytes = parse_usize(props, "max_file_bytes", 256 * 1024 * 1024)?;
81 let glob_pattern = props.get("glob_pattern").cloned();
82
83 let is_tsv = props
85 .get("format")
86 .is_some_and(|f| f.eq_ignore_ascii_case("tsv"));
87 let csv_delimiter = props
88 .get("csv.delimiter")
89 .and_then(|s| s.as_bytes().first().copied())
90 .unwrap_or(if is_tsv { b'\t' } else { b',' });
91 let csv_has_header = parse_bool(props, "csv.has_header", true)?;
92
93 Ok(Self {
94 path,
95 format,
96 poll_interval,
97 stabilisation_delay,
98 max_files_per_poll,
99 include_metadata,
100 allow_overwrites,
101 manifest_retention_count,
102 manifest_retention_age_days,
103 max_file_bytes,
104 glob_pattern,
105 csv_delimiter,
106 csv_has_header,
107 })
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct FileSinkConfig {
114 pub path: String,
116
117 pub format: FileFormat,
119
120 pub mode: SinkMode,
122
123 pub prefix: String,
125
126 pub max_file_size: Option<usize>,
128
129 pub compression: String,
131}
132
133impl FileSinkConfig {
134 pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
141 let props = config.properties();
142
143 let path = props
144 .get("path")
145 .cloned()
146 .ok_or_else(|| ConnectorError::ConfigurationError("'path' is required".into()))?;
147
148 let format = props
149 .get("format")
150 .ok_or_else(|| {
151 ConnectorError::ConfigurationError("'format' is required for file sink".into())
152 })
153 .and_then(|s| FileFormat::parse(s))?;
154
155 let mode = match props.get("mode").map(String::as_str) {
156 Some("append") => SinkMode::Append,
157 Some("rolling") | None => SinkMode::Rolling,
158 Some(other) => {
159 return Err(ConnectorError::ConfigurationError(format!(
160 "unknown sink mode: '{other}' (expected 'append' or 'rolling')"
161 )));
162 }
163 };
164
165 let prefix = props
166 .get("prefix")
167 .cloned()
168 .unwrap_or_else(|| "part".to_string());
169
170 let max_file_size = props
171 .get("max_file_size")
172 .map(|s| {
173 s.parse::<usize>().map_err(|e| {
174 ConnectorError::ConfigurationError(format!("invalid max_file_size: {e}"))
175 })
176 })
177 .transpose()?;
178
179 let compression = props
180 .get("compression")
181 .cloned()
182 .unwrap_or_else(|| "snappy".to_string());
183
184 Ok(Self {
185 path,
186 format,
187 mode,
188 prefix,
189 max_file_size,
190 compression,
191 })
192 }
193}
194
/// Encodings understood by the file connectors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FileFormat {
    /// Delimited text; both `csv` and `tsv` map to this variant.
    Csv,
    /// JSON records; output files use the `.jsonl` extension.
    Json,
    /// Plain text.
    Text,
    /// Apache Parquet, the only variant reported as a bulk format.
    Parquet,
}
207
208impl FileFormat {
209 pub fn parse(s: &str) -> Result<Self, ConnectorError> {
215 match s.to_lowercase().as_str() {
216 "csv" | "tsv" => Ok(Self::Csv),
217 "json" | "jsonl" | "ndjson" | "json_lines" => Ok(Self::Json),
218 "text" | "txt" | "plain" => Ok(Self::Text),
219 "parquet" | "parq" => Ok(Self::Parquet),
220 other => Err(ConnectorError::ConfigurationError(format!(
221 "unknown file format: '{other}' (expected csv, json, text, or parquet)"
222 ))),
223 }
224 }
225
226 pub fn from_extension(path: &str) -> Option<Self> {
228 let ext = path.rsplit('.').next()?.to_lowercase();
229 match ext.as_str() {
230 "csv" | "tsv" => Some(Self::Csv),
231 "json" | "jsonl" | "ndjson" => Some(Self::Json),
232 "txt" | "log" => Some(Self::Text),
233 "parquet" | "parq" => Some(Self::Parquet),
234 _ => None,
235 }
236 }
237
238 #[must_use]
240 pub fn extension(&self) -> &'static str {
241 match self {
242 Self::Csv => "csv",
243 Self::Json => "jsonl",
244 Self::Text => "txt",
245 Self::Parquet => "parquet",
246 }
247 }
248
249 #[must_use]
251 pub fn is_bulk_format(&self) -> bool {
252 matches!(self, Self::Parquet)
253 }
254}
255
/// How the file sink lays out its output files.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SinkMode {
    /// Selected with `mode = "append"`.
    Append,
    /// Selected with `mode = "rolling"`; also used when `mode` is unset.
    Rolling,
}
264
265fn parse_duration(
268 props: &HashMap<String, String>,
269 key: &str,
270 default: Duration,
271) -> Result<Duration, ConnectorError> {
272 match props.get(key) {
273 Some(s) => parse_duration_str(s),
274 None => Ok(default),
275 }
276}
277
278fn parse_duration_str(s: &str) -> Result<Duration, ConnectorError> {
279 if let Some(ms) = s.strip_suffix("ms") {
281 let n: u64 = ms
282 .parse()
283 .map_err(|e| ConnectorError::ConfigurationError(format!("invalid duration: {e}")))?;
284 return Ok(Duration::from_millis(n));
285 }
286 let secs_str = s.strip_suffix('s').unwrap_or(s);
287 let n: u64 = secs_str
288 .parse()
289 .map_err(|e| ConnectorError::ConfigurationError(format!("invalid duration: {e}")))?;
290 Ok(Duration::from_secs(n))
291}
292
293fn parse_usize(
294 props: &HashMap<String, String>,
295 key: &str,
296 default: usize,
297) -> Result<usize, ConnectorError> {
298 match props.get(key) {
299 Some(s) => s
300 .parse()
301 .map_err(|e| ConnectorError::ConfigurationError(format!("invalid {key}: {e}"))),
302 None => Ok(default),
303 }
304}
305
306fn parse_u64(
307 props: &HashMap<String, String>,
308 key: &str,
309 default: u64,
310) -> Result<u64, ConnectorError> {
311 match props.get(key) {
312 Some(s) => s
313 .parse()
314 .map_err(|e| ConnectorError::ConfigurationError(format!("invalid {key}: {e}"))),
315 None => Ok(default),
316 }
317}
318
319fn parse_bool(
320 props: &HashMap<String, String>,
321 key: &str,
322 default: bool,
323) -> Result<bool, ConnectorError> {
324 match props.get(key) {
325 Some(s) => match s.to_lowercase().as_str() {
326 "true" | "1" | "yes" => Ok(true),
327 "false" | "0" | "no" => Ok(false),
328 other => Err(ConnectorError::ConfigurationError(format!(
329 "invalid boolean for {key}: '{other}'"
330 ))),
331 },
332 None => Ok(default),
333 }
334}
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `files` connector config with the given properties set in order.
    fn files_config(pairs: &[(&'static str, &'static str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("files");
        for &(key, value) in pairs {
            config.set(key, value);
        }
        config
    }

    #[test]
    fn test_file_format_parse() {
        let accepted = [
            ("csv", FileFormat::Csv),
            ("JSON", FileFormat::Json),
            ("jsonl", FileFormat::Json),
            ("ndjson", FileFormat::Json),
            ("text", FileFormat::Text),
            ("parquet", FileFormat::Parquet),
            ("parq", FileFormat::Parquet),
        ];
        for &(input, expected) in &accepted {
            assert_eq!(FileFormat::parse(input).unwrap(), expected, "parsing {input:?}");
        }
        assert!(FileFormat::parse("xml").is_err());
    }

    #[test]
    fn test_file_format_from_extension() {
        let cases = [
            ("/data/logs/app.csv", Some(FileFormat::Csv)),
            ("events.jsonl", Some(FileFormat::Json)),
            ("data.parquet", Some(FileFormat::Parquet)),
            ("log.txt", Some(FileFormat::Text)),
            ("file.bin", None),
        ];
        for &(path, expected) in &cases {
            assert_eq!(FileFormat::from_extension(path), expected, "path {path:?}");
        }
    }

    #[test]
    fn test_file_format_extension() {
        let expected = [
            (FileFormat::Csv, "csv"),
            (FileFormat::Json, "jsonl"),
            (FileFormat::Text, "txt"),
            (FileFormat::Parquet, "parquet"),
        ];
        for &(format, ext) in &expected {
            assert_eq!(format.extension(), ext);
        }
    }

    #[test]
    fn test_file_format_is_bulk() {
        let bulk: Vec<FileFormat> = [
            FileFormat::Csv,
            FileFormat::Json,
            FileFormat::Text,
            FileFormat::Parquet,
        ]
        .iter()
        .copied()
        .filter(FileFormat::is_bulk_format)
        .collect();
        assert_eq!(bulk, vec![FileFormat::Parquet]);
    }

    #[test]
    fn test_source_config_from_connector() {
        let config = files_config(&[
            ("path", "/data/logs/*.csv"),
            ("format", "csv"),
            ("max_files_per_poll", "50"),
            ("include_metadata", "true"),
        ]);

        let src = FileSourceConfig::from_connector_config(&config).unwrap();
        assert_eq!(src.path, "/data/logs/*.csv");
        assert_eq!(src.format, Some(FileFormat::Csv));
        assert_eq!(src.max_files_per_poll, 50);
        assert!(src.include_metadata);
        assert!(!src.allow_overwrites);
    }

    #[test]
    fn test_source_config_missing_path() {
        let config = files_config(&[]);
        assert!(FileSourceConfig::from_connector_config(&config).is_err());
    }

    #[test]
    fn test_sink_config_from_connector() {
        let config = files_config(&[
            ("path", "/output"),
            ("format", "parquet"),
            ("mode", "rolling"),
            ("compression", "zstd"),
        ]);

        let sink = FileSinkConfig::from_connector_config(&config).unwrap();
        assert_eq!(sink.path, "/output");
        assert_eq!(sink.format, FileFormat::Parquet);
        assert_eq!(sink.mode, SinkMode::Rolling);
        assert_eq!(sink.compression, "zstd");
    }

    #[test]
    fn test_sink_config_missing_format() {
        let config = files_config(&[("path", "/output")]);
        assert!(FileSinkConfig::from_connector_config(&config).is_err());
    }

    #[test]
    fn test_parse_duration_str() {
        let cases = [
            ("10", Duration::from_secs(10)),
            ("10s", Duration::from_secs(10)),
            ("500ms", Duration::from_millis(500)),
        ];
        for &(input, expected) in &cases {
            assert_eq!(parse_duration_str(input).unwrap(), expected, "input {input:?}");
        }
    }

    #[test]
    fn test_sink_mode_default() {
        let config = files_config(&[("path", "/output"), ("format", "csv")]);
        let sink = FileSinkConfig::from_connector_config(&config).unwrap();
        assert_eq!(sink.mode, SinkMode::Rolling);
    }
}