laminar_connectors/lakehouse/
delta_source_config.rs1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
6use std::fmt;
7use std::str::FromStr;
8use std::time::Duration;
9
10use crate::config::ConnectorConfig;
11use crate::error::ConnectorError;
12use crate::storage::{
13 CloudConfigValidator, ResolvedStorageOptions, SecretMasker, StorageCredentialResolver,
14 StorageProvider,
15};
16
17use super::delta_config::DeltaCatalogType;
18
/// How the Delta Lake source reads its table (`read.mode`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum DeltaReadMode {
    /// Snapshot read; parsed from `snapshot` or `batch`.
    Snapshot,
    /// Incremental read; parsed from `incremental`, `streaming` or `stream`.
    /// This is the default mode.
    #[default]
    Incremental,
}

impl FromStr for DeltaReadMode {
    type Err = String;

    /// Parses a read mode, ignoring case.
    ///
    /// # Errors
    ///
    /// Returns a message quoting the lowercased input when it is not one of
    /// the accepted spellings.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lowered = s.to_lowercase();
        let mode = match lowered.as_str() {
            "snapshot" | "batch" => Self::Snapshot,
            "incremental" | "streaming" | "stream" => Self::Incremental,
            other => return Err(format!("unknown read mode: '{other}'")),
        };
        Ok(mode)
    }
}

impl fmt::Display for DeltaReadMode {
    /// Writes the canonical lowercase name, which round-trips through `FromStr`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Snapshot => "snapshot",
            Self::Incremental => "incremental",
        };
        f.write_str(label)
    }
}
57
/// How the source handles a schema change (`schema.evolution.action`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SchemaEvolutionAction {
    /// Parsed from `warn` or `warning`; this is the default.
    #[default]
    Warn,
    /// Parsed from `error` or `fail`.
    Error,
}

impl FromStr for SchemaEvolutionAction {
    type Err = String;

    /// Parses an action, ignoring case.
    ///
    /// # Errors
    ///
    /// Returns a message quoting the lowercased input when it is not one of
    /// the accepted spellings.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lowered = s.to_lowercase();
        let action = match lowered.as_str() {
            "warn" | "warning" => Self::Warn,
            "error" | "fail" => Self::Error,
            other => return Err(format!("unknown schema evolution action: '{other}'")),
        };
        Ok(action)
    }
}

impl fmt::Display for SchemaEvolutionAction {
    /// Writes the canonical lowercase name, which round-trips through `FromStr`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Warn => "warn",
            Self::Error => "error",
        })
    }
}
88
89#[derive(Debug, Clone)]
93pub struct DeltaSourceConfig {
94 pub table_path: String,
96
97 pub starting_version: Option<i64>,
99
100 pub poll_interval: Duration,
102
103 pub read_mode: DeltaReadMode,
105
106 pub partition_filter: Option<String>,
108
109 pub schema_evolution_action: SchemaEvolutionAction,
111
112 pub cdf_enabled: bool,
114
115 pub storage_options: HashMap<String, String>,
117
118 pub catalog_type: DeltaCatalogType,
120
121 pub catalog_database: Option<String>,
123
124 pub catalog_name: Option<String>,
126
127 pub catalog_schema: Option<String>,
129}
130
131impl Default for DeltaSourceConfig {
132 fn default() -> Self {
133 Self {
134 table_path: String::new(),
135 starting_version: None,
136 poll_interval: Duration::from_secs(1),
137 read_mode: DeltaReadMode::default(),
138 partition_filter: None,
139 schema_evolution_action: SchemaEvolutionAction::default(),
140 cdf_enabled: false,
141 storage_options: HashMap::new(),
142 catalog_type: DeltaCatalogType::None,
143 catalog_database: None,
144 catalog_name: None,
145 catalog_schema: None,
146 }
147 }
148}
149
150impl DeltaSourceConfig {
151 #[must_use]
153 pub fn new(table_path: &str) -> Self {
154 Self {
155 table_path: table_path.to_string(),
156 ..Default::default()
157 }
158 }
159
160 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
171 let mut cfg = Self {
172 table_path: config.require("table.path")?.to_string(),
173 ..Self::default()
174 };
175
176 if let Some(v) = config.get("starting.version") {
177 cfg.starting_version = Some(v.parse().map_err(|_| {
178 ConnectorError::ConfigurationError(format!("invalid starting.version: '{v}'"))
179 })?);
180 }
181 if let Some(v) = config.get("poll.interval.ms") {
182 let ms: u64 = v.parse().map_err(|_| {
183 ConnectorError::ConfigurationError(format!("invalid poll.interval.ms: '{v}'"))
184 })?;
185 cfg.poll_interval = Duration::from_millis(ms);
186 }
187 if let Some(v) = config.get("read.mode") {
188 cfg.read_mode = v.parse().map_err(|_| {
189 ConnectorError::ConfigurationError(format!(
190 "invalid read.mode: '{v}' (expected 'snapshot' or 'incremental')"
191 ))
192 })?;
193 }
194 if let Some(v) = config.get("partition.filter") {
195 let trimmed = v.trim();
196 if !trimmed.is_empty() {
197 cfg.partition_filter = Some(trimmed.to_string());
198 }
199 }
200 if let Some(v) = config.get("cdf.enabled") {
201 cfg.cdf_enabled = v.eq_ignore_ascii_case("true");
202 }
203 if let Some(v) = config.get("schema.evolution.action") {
204 cfg.schema_evolution_action = v.parse().map_err(|_| {
205 ConnectorError::ConfigurationError(format!(
206 "invalid schema.evolution.action: '{v}' (expected 'warn' or 'error')"
207 ))
208 })?;
209 }
210
211 if let Some(v) = config.get("catalog.type") {
213 cfg.catalog_type = v.parse().map_err(|_| {
214 ConnectorError::ConfigurationError(format!(
215 "invalid catalog.type: '{v}' (expected 'none', 'glue', or 'unity')"
216 ))
217 })?;
218 }
219 if let Some(v) = config.get("catalog.database") {
220 cfg.catalog_database = Some(v.to_string());
221 }
222 if let Some(v) = config.get("catalog.name") {
223 cfg.catalog_name = Some(v.to_string());
224 }
225 if let Some(v) = config.get("catalog.schema") {
226 cfg.catalog_schema = Some(v.to_string());
227 }
228 if let DeltaCatalogType::Unity {
229 ref mut workspace_url,
230 ref mut access_token,
231 } = cfg.catalog_type
232 {
233 if let Some(v) = config.get("catalog.workspace_url") {
234 *workspace_url = v.to_string();
235 }
236 if let Some(v) = config.get("catalog.access_token") {
237 *access_token = v.to_string();
238 }
239 }
240 let explicit_storage = config.properties_with_prefix("storage.");
242 let resolved = StorageCredentialResolver::resolve(&cfg.table_path, &explicit_storage);
243 cfg.storage_options = resolved.options;
244
245 if let Some(v) = config.get("storage.s3_locking_provider") {
247 cfg.storage_options
248 .insert("AWS_S3_LOCKING_PROVIDER".to_string(), v.to_string());
249 }
250 if let Some(v) = config.get("storage.dynamodb_table_name") {
251 cfg.storage_options
252 .insert("DELTA_DYNAMO_TABLE_NAME".to_string(), v.to_string());
253 }
254
255 cfg.validate()?;
256 Ok(cfg)
257 }
258
259 #[must_use]
261 pub fn display_storage_options(&self) -> String {
262 SecretMasker::display_map(&self.storage_options)
263 }
264
265 pub fn validate(&self) -> Result<(), ConnectorError> {
271 if self.table_path.is_empty() {
272 return Err(ConnectorError::missing_config("table.path"));
273 }
274
275 match &self.catalog_type {
277 DeltaCatalogType::None => {}
278 DeltaCatalogType::Glue => {
279 if self.catalog_database.is_none() {
280 return Err(ConnectorError::ConfigurationError(
281 "Glue catalog requires 'catalog.database' to be set".into(),
282 ));
283 }
284 }
285 DeltaCatalogType::Unity {
286 workspace_url,
287 access_token,
288 } => {
289 if workspace_url.is_empty() {
290 return Err(ConnectorError::ConfigurationError(
291 "Unity catalog requires 'catalog.workspace_url' to be set".into(),
292 ));
293 }
294 if access_token.is_empty() {
295 return Err(ConnectorError::ConfigurationError(
296 "Unity catalog requires 'catalog.access_token' to be set".into(),
297 ));
298 }
299 if self.catalog_name.is_none() {
300 return Err(ConnectorError::ConfigurationError(
301 "Unity catalog requires 'catalog.name' to be set".into(),
302 ));
303 }
304 if self.catalog_schema.is_none() {
305 return Err(ConnectorError::ConfigurationError(
306 "Unity catalog requires 'catalog.schema' to be set".into(),
307 ));
308 }
309 }
310 }
311
312 let resolved = ResolvedStorageOptions {
314 provider: StorageProvider::detect(&self.table_path),
315 options: self.storage_options.clone(),
316 env_resolved_keys: Vec::new(),
317 };
318 let cloud_result = CloudConfigValidator::validate(&resolved);
319 if !cloud_result.is_valid() {
320 return Err(ConnectorError::ConfigurationError(
321 cloud_result.error_message(),
322 ));
323 }
324
325 Ok(())
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ConnectorConfig` for the delta source from key/value pairs.
    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("delta-lake-source");
        for (k, v) in pairs {
            config.set(*k, *v);
        }
        config
    }

    #[test]
    fn test_defaults() {
        let cfg = DeltaSourceConfig::default();
        assert!(cfg.table_path.is_empty());
        assert!(cfg.starting_version.is_none());
        assert_eq!(cfg.poll_interval, Duration::from_secs(1));
        assert!(!cfg.cdf_enabled);
        assert!(cfg.partition_filter.is_none());
    }

    #[test]
    fn test_new_helper() {
        let cfg = DeltaSourceConfig::new("/tmp/test_table");
        assert_eq!(cfg.table_path, "/tmp/test_table");
    }

    #[test]
    fn test_parse_required_fields() {
        let config = make_config(&[("table.path", "/data/warehouse/trades")]);
        let cfg = DeltaSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.table_path, "/data/warehouse/trades");
        assert!(cfg.starting_version.is_none());
    }

    #[test]
    fn test_missing_table_path() {
        let config = ConnectorConfig::new("delta-lake-source");
        assert!(DeltaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_parse_optional_fields() {
        let config = make_config(&[
            ("table.path", "/data/test"),
            ("starting.version", "5"),
            ("poll.interval.ms", "500"),
        ]);
        let cfg = DeltaSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.starting_version, Some(5));
        assert_eq!(cfg.poll_interval, Duration::from_millis(500));
    }

    #[test]
    fn test_invalid_starting_version() {
        let config = make_config(&[("table.path", "/data/test"), ("starting.version", "abc")]);
        assert!(DeltaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_invalid_poll_interval() {
        let config = make_config(&[("table.path", "/data/test"), ("poll.interval.ms", "-1")]);
        assert!(DeltaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_empty_table_path_rejected() {
        // `Default` leaves `table_path` empty.
        let cfg = DeltaSourceConfig::default();
        assert!(cfg.table_path.is_empty());
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_read_mode_defaults_to_incremental() {
        let cfg = DeltaSourceConfig::default();
        assert_eq!(cfg.read_mode, DeltaReadMode::Incremental);
    }

    #[test]
    fn test_read_mode_parse() {
        let cases = [
            ("snapshot", DeltaReadMode::Snapshot),
            ("batch", DeltaReadMode::Snapshot),
            ("incremental", DeltaReadMode::Incremental),
            ("streaming", DeltaReadMode::Incremental),
            ("stream", DeltaReadMode::Incremental),
        ];
        for (input, expected) in cases {
            assert_eq!(input.parse::<DeltaReadMode>().unwrap(), expected, "input: {input}");
        }
        assert!("unknown".parse::<DeltaReadMode>().is_err());
    }

    #[test]
    fn test_read_mode_display() {
        assert_eq!(DeltaReadMode::Snapshot.to_string(), "snapshot");
        assert_eq!(DeltaReadMode::Incremental.to_string(), "incremental");
    }

    #[test]
    fn test_read_mode_from_config() {
        let config = make_config(&[("table.path", "/data/test"), ("read.mode", "snapshot")]);
        let cfg = DeltaSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.read_mode, DeltaReadMode::Snapshot);
    }

    #[test]
    fn test_read_mode_invalid() {
        let config = make_config(&[("table.path", "/data/test"), ("read.mode", "invalid")]);
        assert!(DeltaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_partition_filter_from_config() {
        let config = make_config(&[
            ("table.path", "/data/test"),
            ("partition.filter", "date = '2024-01-01'"),
        ]);
        let cfg = DeltaSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.partition_filter.as_deref(), Some("date = '2024-01-01'"));
    }

    #[test]
    fn test_partition_filter_is_trimmed() {
        let config = make_config(&[
            ("table.path", "/data/test"),
            ("partition.filter", "  date = '2024-01-01'  "),
        ]);
        let cfg = DeltaSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.partition_filter.as_deref(), Some("date = '2024-01-01'"));
    }

    #[test]
    fn test_partition_filter_empty_is_none() {
        for blank in ["", "   "] {
            let config = make_config(&[("table.path", "/data/test"), ("partition.filter", blank)]);
            let cfg = DeltaSourceConfig::from_config(&config).unwrap();
            assert!(cfg.partition_filter.is_none(), "input: {blank:?}");
        }
    }

    #[test]
    fn test_cdf_enabled_from_config() {
        let enabled = make_config(&[("table.path", "/data/test"), ("cdf.enabled", "TRUE")]);
        assert!(DeltaSourceConfig::from_config(&enabled).unwrap().cdf_enabled);

        let disabled = make_config(&[("table.path", "/data/test"), ("cdf.enabled", "false")]);
        assert!(!DeltaSourceConfig::from_config(&disabled).unwrap().cdf_enabled);

        let unset = make_config(&[("table.path", "/data/test")]);
        assert!(!DeltaSourceConfig::from_config(&unset).unwrap().cdf_enabled);
    }

    #[test]
    fn test_schema_evolution_action_parse() {
        let cases = [
            ("warn", SchemaEvolutionAction::Warn),
            ("error", SchemaEvolutionAction::Error),
            ("fail", SchemaEvolutionAction::Error),
        ];
        for (input, expected) in cases {
            assert_eq!(
                input.parse::<SchemaEvolutionAction>().unwrap(),
                expected,
                "input: {input}"
            );
        }
        assert!("unknown".parse::<SchemaEvolutionAction>().is_err());
    }

    #[test]
    fn test_schema_evolution_action_from_config() {
        let config = make_config(&[
            ("table.path", "/data/test"),
            ("schema.evolution.action", "error"),
        ]);
        let cfg = DeltaSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.schema_evolution_action, SchemaEvolutionAction::Error);
    }

    #[test]
    fn test_schema_evolution_action_default() {
        let cfg = DeltaSourceConfig::default();
        assert_eq!(cfg.schema_evolution_action, SchemaEvolutionAction::Warn);
    }

    #[test]
    fn test_glue_catalog_requires_database() {
        let cfg = DeltaSourceConfig {
            catalog_type: DeltaCatalogType::Glue,
            ..DeltaSourceConfig::new("/data/test")
        };
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_unity_catalog_requires_every_setting() {
        let complete = DeltaSourceConfig {
            catalog_type: DeltaCatalogType::Unity {
                workspace_url: "https://workspace.example.com".to_string(),
                access_token: "token".to_string(),
            },
            catalog_name: Some("main".to_string()),
            catalog_schema: Some("default".to_string()),
            ..DeltaSourceConfig::new("/data/test")
        };

        let no_url = DeltaSourceConfig {
            catalog_type: DeltaCatalogType::Unity {
                workspace_url: String::new(),
                access_token: "token".to_string(),
            },
            ..complete.clone()
        };
        assert!(no_url.validate().is_err());

        let no_token = DeltaSourceConfig {
            catalog_type: DeltaCatalogType::Unity {
                workspace_url: "https://workspace.example.com".to_string(),
                access_token: String::new(),
            },
            ..complete.clone()
        };
        assert!(no_token.validate().is_err());

        let no_name = DeltaSourceConfig {
            catalog_name: None,
            ..complete.clone()
        };
        assert!(no_name.validate().is_err());

        let no_schema = DeltaSourceConfig {
            catalog_schema: None,
            ..complete
        };
        assert!(no_schema.validate().is_err());
    }
}