1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
9use std::fmt;
10use std::str::FromStr;
11use std::time::Duration;
12
13use crate::config::ConnectorConfig;
14use crate::error::ConnectorError;
15use crate::storage::{
16 CloudConfigValidator, ResolvedStorageOptions, SecretMasker, StorageCredentialResolver,
17 StorageProvider,
18};
19
20#[derive(Debug, Clone)]
24pub struct DeltaLakeSinkConfig {
25 pub table_path: String,
27
28 pub partition_columns: Vec<String>,
30
31 pub target_file_size: usize,
33
34 pub max_buffer_records: usize,
36
37 pub max_buffer_duration: Duration,
39
40 pub checkpoint_interval: u64,
42
43 pub schema_evolution: bool,
45
46 pub write_mode: DeltaWriteMode,
48
49 pub merge_key_columns: Vec<String>,
51
52 pub storage_options: HashMap<String, String>,
54
55 pub compaction: CompactionConfig,
57
58 pub vacuum_retention: Duration,
60
61 pub delivery_guarantee: DeliveryGuarantee,
63
64 pub writer_id: String,
66
67 pub catalog_type: DeltaCatalogType,
69
70 pub catalog_database: Option<String>,
72
73 pub catalog_name: Option<String>,
75
76 pub catalog_schema: Option<String>,
78
79 pub catalog_properties: HashMap<String, String>,
81}
82
83impl Default for DeltaLakeSinkConfig {
84 fn default() -> Self {
85 Self {
86 table_path: String::new(),
87 partition_columns: Vec::new(),
88 target_file_size: 128 * 1024 * 1024, max_buffer_records: 100_000,
90 max_buffer_duration: Duration::from_secs(60),
91 checkpoint_interval: 10,
92 schema_evolution: false,
93 write_mode: DeltaWriteMode::Append,
94 merge_key_columns: Vec::new(),
95 storage_options: HashMap::new(),
96 compaction: CompactionConfig::default(),
97 vacuum_retention: Duration::from_secs(7 * 24 * 3600), delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
99 writer_id: uuid::Uuid::new_v4().to_string(),
100 catalog_type: DeltaCatalogType::None,
101 catalog_database: None,
102 catalog_name: None,
103 catalog_schema: None,
104 catalog_properties: HashMap::new(),
105 }
106 }
107}
108
109impl DeltaLakeSinkConfig {
110 #[must_use]
112 pub fn new(table_path: &str) -> Self {
113 Self {
114 table_path: table_path.to_string(),
115 ..Default::default()
116 }
117 }
118
119 #[allow(clippy::too_many_lines)]
130 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
131 let mut cfg = Self {
132 table_path: config.require("table.path")?.to_string(),
133 ..Self::default()
134 };
135
136 if let Some(v) = config.get("partition.columns") {
137 cfg.partition_columns = v
138 .split(',')
139 .map(|c| c.trim().to_string())
140 .filter(|c| !c.is_empty())
141 .collect();
142 }
143 if let Some(v) = config.get("target.file.size") {
144 cfg.target_file_size = v.parse().map_err(|_| {
145 ConnectorError::ConfigurationError(format!("invalid target.file.size: '{v}'"))
146 })?;
147 }
148 if let Some(v) = config.get("max.buffer.records") {
149 cfg.max_buffer_records = v.parse().map_err(|_| {
150 ConnectorError::ConfigurationError(format!("invalid max.buffer.records: '{v}'"))
151 })?;
152 }
153 if let Some(v) = config.get("max.buffer.duration.ms") {
154 let ms: u64 = v.parse().map_err(|_| {
155 ConnectorError::ConfigurationError(format!("invalid max.buffer.duration.ms: '{v}'"))
156 })?;
157 cfg.max_buffer_duration = Duration::from_millis(ms);
158 }
159 if let Some(v) = config.get("checkpoint.interval") {
160 cfg.checkpoint_interval = v.parse().map_err(|_| {
161 ConnectorError::ConfigurationError(format!("invalid checkpoint.interval: '{v}'"))
162 })?;
163 }
164 if let Some(v) = config.get("schema.evolution") {
165 cfg.schema_evolution = v.eq_ignore_ascii_case("true");
166 }
167 if let Some(v) = config.get("write.mode") {
168 cfg.write_mode = v.parse().map_err(|_| {
169 ConnectorError::ConfigurationError(format!(
170 "invalid write.mode: '{v}' (expected 'append', 'overwrite', or 'upsert')"
171 ))
172 })?;
173 }
174 if let Some(v) = config.get("merge.key.columns") {
175 cfg.merge_key_columns = v
176 .split(',')
177 .map(|c| c.trim().to_string())
178 .filter(|c| !c.is_empty())
179 .collect();
180 }
181 if let Some(v) = config.get("delivery.guarantee") {
182 cfg.delivery_guarantee = v.parse().map_err(|_| {
183 ConnectorError::ConfigurationError(format!(
184 "invalid delivery.guarantee: '{v}' \
185 (expected 'exactly-once' or 'at-least-once')"
186 ))
187 })?;
188 }
189 if let Some(v) = config.get("compaction.enabled") {
190 cfg.compaction.enabled = v.eq_ignore_ascii_case("true");
191 }
192 if let Some(v) = config.get("compaction.z-order.columns") {
193 cfg.compaction.z_order_columns = v
194 .split(',')
195 .map(|c| c.trim().to_string())
196 .filter(|c| !c.is_empty())
197 .collect();
198 }
199 if let Some(v) = config.get("compaction.min-files") {
200 cfg.compaction.min_files_for_compaction = v.parse().map_err(|_| {
201 ConnectorError::ConfigurationError(format!("invalid compaction.min-files: '{v}'"))
202 })?;
203 }
204 if let Some(v) = config.get("vacuum.retention.hours") {
205 let hours: u64 = v.parse().map_err(|_| {
206 ConnectorError::ConfigurationError(format!("invalid vacuum.retention.hours: '{v}'"))
207 })?;
208 cfg.vacuum_retention = Duration::from_secs(hours * 3600);
209 }
210 if let Some(v) = config.get("writer.id") {
211 cfg.writer_id = v.to_string();
212 } else if cfg.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
213 return Err(ConnectorError::ConfigurationError(
214 "exactly-once delivery requires an explicit 'writer.id' for stable \
215 recovery across restarts"
216 .into(),
217 ));
218 }
219
220 if let Some(v) = config.get("catalog.type") {
222 cfg.catalog_type = v.parse().map_err(|_| {
223 ConnectorError::ConfigurationError(format!(
224 "invalid catalog.type: '{v}' (expected 'none', 'glue', or 'unity')"
225 ))
226 })?;
227 }
228 if let Some(v) = config.get("catalog.database") {
229 cfg.catalog_database = Some(v.to_string());
230 }
231 if let Some(v) = config.get("catalog.name") {
232 cfg.catalog_name = Some(v.to_string());
233 }
234 if let Some(v) = config.get("catalog.schema") {
235 cfg.catalog_schema = Some(v.to_string());
236 }
237 if let DeltaCatalogType::Unity {
239 ref mut workspace_url,
240 ref mut access_token,
241 } = cfg.catalog_type
242 {
243 if let Some(v) = config.get("catalog.workspace_url") {
244 *workspace_url = v.to_string();
245 }
246 if let Some(v) = config.get("catalog.access_token") {
247 *access_token = v.to_string();
248 }
249 }
250 cfg.catalog_properties = config.properties_with_prefix("catalog.prop.");
251
252 let explicit_storage = config.properties_with_prefix("storage.");
254 let resolved = StorageCredentialResolver::resolve(&cfg.table_path, &explicit_storage);
255 cfg.storage_options = resolved.options;
256
257 cfg.validate()?;
258 Ok(cfg)
259 }
260
261 #[must_use]
263 pub fn display_storage_options(&self) -> String {
264 SecretMasker::display_map(&self.storage_options)
265 }
266
267 pub fn validate(&self) -> Result<(), ConnectorError> {
273 if self.table_path.is_empty() {
274 return Err(ConnectorError::MissingConfig("table.path".into()));
275 }
276 if self.write_mode == DeltaWriteMode::Upsert && self.merge_key_columns.is_empty() {
277 return Err(ConnectorError::ConfigurationError(
278 "upsert mode requires 'merge.key.columns' to be set".into(),
279 ));
280 }
281 if self.max_buffer_records == 0 {
282 return Err(ConnectorError::ConfigurationError(
283 "max.buffer.records must be > 0".into(),
284 ));
285 }
286 if self.target_file_size == 0 {
287 return Err(ConnectorError::ConfigurationError(
288 "target.file.size must be > 0".into(),
289 ));
290 }
291 if self.checkpoint_interval == 0 {
292 return Err(ConnectorError::ConfigurationError(
293 "checkpoint.interval must be > 0".into(),
294 ));
295 }
296
297 match &self.catalog_type {
299 DeltaCatalogType::None => {}
300 DeltaCatalogType::Glue => {
301 if self.catalog_database.is_none() {
302 return Err(ConnectorError::ConfigurationError(
303 "Glue catalog requires 'catalog.database' to be set".into(),
304 ));
305 }
306 }
307 DeltaCatalogType::Unity {
308 workspace_url,
309 access_token,
310 } => {
311 if workspace_url.is_empty() {
312 return Err(ConnectorError::ConfigurationError(
313 "Unity catalog requires 'catalog.workspace_url' to be set".into(),
314 ));
315 }
316 if access_token.is_empty() {
317 return Err(ConnectorError::ConfigurationError(
318 "Unity catalog requires 'catalog.access_token' to be set".into(),
319 ));
320 }
321 if self.catalog_name.is_none() {
322 return Err(ConnectorError::ConfigurationError(
323 "Unity catalog requires 'catalog.name' to be set".into(),
324 ));
325 }
326 if self.catalog_schema.is_none() {
327 return Err(ConnectorError::ConfigurationError(
328 "Unity catalog requires 'catalog.schema' to be set".into(),
329 ));
330 }
331 }
332 }
333
334 if self.catalog_type == DeltaCatalogType::None {
336 let resolved = ResolvedStorageOptions {
337 provider: StorageProvider::detect(&self.table_path),
338 options: self.storage_options.clone(),
339 env_resolved_keys: Vec::new(),
340 };
341 let cloud_result = CloudConfigValidator::validate(&resolved);
342 if !cloud_result.is_valid() {
343 return Err(ConnectorError::ConfigurationError(
344 cloud_result.error_message(),
345 ));
346 }
347 }
348
349 Ok(())
350 }
351}
352
/// How the sink writes into the Delta table (`write.mode`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeltaWriteMode {
    /// Selected by `append` (the default).
    Append,
    /// Selected by `overwrite`.
    Overwrite,
    /// Selected by `upsert` or its alias `merge`; needs `merge.key.columns`.
    Upsert,
}
364
365impl FromStr for DeltaWriteMode {
366 type Err = String;
367
368 fn from_str(s: &str) -> Result<Self, Self::Err> {
369 match s.to_lowercase().as_str() {
370 "append" => Ok(Self::Append),
371 "overwrite" => Ok(Self::Overwrite),
372 "upsert" | "merge" => Ok(Self::Upsert),
373 other => Err(format!("unknown write mode: '{other}'")),
374 }
375 }
376}
377
378impl fmt::Display for DeltaWriteMode {
379 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
380 match self {
381 Self::Append => write!(f, "append"),
382 Self::Overwrite => write!(f, "overwrite"),
383 Self::Upsert => write!(f, "upsert"),
384 }
385 }
386}
387
/// Delivery guarantee requested for the sink (`delivery.guarantee`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeliveryGuarantee {
    /// `at-least-once` (the default).
    AtLeastOnce,
    /// `exactly-once`; `from_config` requires an explicit `writer.id` for it.
    ExactlyOnce,
}
396
397impl FromStr for DeliveryGuarantee {
398 type Err = String;
399
400 fn from_str(s: &str) -> Result<Self, Self::Err> {
401 match s.to_lowercase().replace('-', "_").as_str() {
402 "at_least_once" | "atleastonce" => Ok(Self::AtLeastOnce),
403 "exactly_once" | "exactlyonce" => Ok(Self::ExactlyOnce),
404 other => Err(format!("unknown delivery guarantee: '{other}'")),
405 }
406 }
407}
408
409impl fmt::Display for DeliveryGuarantee {
410 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411 match self {
412 Self::AtLeastOnce => write!(f, "at-least-once"),
413 Self::ExactlyOnce => write!(f, "exactly-once"),
414 }
415 }
416}
417
/// Catalog the Delta table is registered with (`catalog.type`).
///
/// `Debug` is implemented by hand, not derived, so the Unity access token is
/// never printed.
#[derive(Clone, PartialEq, Eq, Default)]
pub enum DeltaCatalogType {
    /// No catalog (the default). Only in this case does
    /// `DeltaLakeSinkConfig::validate` run cloud storage validation.
    #[default]
    None,
    /// Glue catalog; `validate` requires `catalog.database`.
    Glue,
    /// Unity catalog; `validate` requires both fields to be non-empty plus
    /// `catalog.name` and `catalog.schema`.
    Unity {
        /// Workspace URL (`catalog.workspace_url`).
        workspace_url: String,
        /// Access token (`catalog.access_token`); redacted in `Debug` output.
        access_token: String,
    },
}

impl fmt::Debug for DeltaCatalogType {
    /// Debug-formats the variant, replacing a non-empty access token with `***`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::None => f.write_str("None"),
            Self::Glue => f.write_str("Glue"),
            Self::Unity {
                workspace_url,
                access_token,
            } => {
                // Reveal only whether a token is present, never its value.
                let token = if access_token.is_empty() { "" } else { "***" };
                f.debug_struct("Unity")
                    .field("workspace_url", workspace_url)
                    .field("access_token", &token)
                    .finish()
            }
        }
    }
}
436
437impl FromStr for DeltaCatalogType {
438 type Err = String;
439
440 fn from_str(s: &str) -> Result<Self, Self::Err> {
441 match s.to_lowercase().as_str() {
442 "none" | "" => Ok(Self::None),
443 "glue" => Ok(Self::Glue),
444 "unity" => Ok(Self::Unity {
445 workspace_url: String::new(),
446 access_token: String::new(),
447 }),
448 other => Err(format!("unknown catalog type: '{other}'")),
449 }
450 }
451}
452
453impl fmt::Display for DeltaCatalogType {
454 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
455 match self {
456 Self::None => write!(f, "none"),
457 Self::Glue => write!(f, "glue"),
458 Self::Unity { .. } => write!(f, "unity"),
459 }
460 }
461}
462
/// Settings for compacting the Delta table's data files.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Whether compaction is enabled (`compaction.enabled`, default true).
    pub enabled: bool,

    /// Minimum number of files for compaction (`compaction.min-files`,
    /// default 10).
    pub min_files_for_compaction: usize,

    /// Target size in bytes of compacted files (default 128 MiB; not read by
    /// `DeltaLakeSinkConfig::from_config`).
    pub target_file_size: usize,

    /// Z-order columns (`compaction.z-order.columns`, comma-separated).
    pub z_order_columns: Vec<String>,

    /// Compaction check interval (default one hour; not read by
    /// `DeltaLakeSinkConfig::from_config`).
    pub check_interval: Duration,
}
481
482impl Default for CompactionConfig {
483 fn default() -> Self {
484 Self {
485 enabled: true,
486 min_files_for_compaction: 10,
487 target_file_size: 128 * 1024 * 1024, z_order_columns: Vec::new(),
489 check_interval: Duration::from_secs(3600), }
491 }
492}
493
// Unit tests for parsing and validating the Delta Lake sink configuration.
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
    use super::*;

    /// Builds a `ConnectorConfig` for the `delta-lake` connector from key/value pairs.
    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("delta-lake");
        for (k, v) in pairs {
            config.set(*k, *v);
        }
        config
    }

    /// The minimal property set accepted by `from_config` (a local table path).
    fn required_pairs() -> Vec<(&'static str, &'static str)> {
        vec![("table.path", "/data/warehouse/trades")]
    }

    // ── Core parsing ──

    #[test]
    fn test_parse_required_fields() {
        let config = make_config(&required_pairs());
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.table_path, "/data/warehouse/trades");
        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
        assert!(cfg.partition_columns.is_empty());
        assert!(cfg.merge_key_columns.is_empty());
        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
        assert_eq!(cfg.max_buffer_records, 100_000);
        assert_eq!(cfg.checkpoint_interval, 10);
        assert!(!cfg.schema_evolution);
        assert!(cfg.compaction.enabled);
    }

    #[test]
    fn test_missing_table_path() {
        let config = ConnectorConfig::new("delta-lake");
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_parse_all_optional_fields() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("partition.columns", "trade_date, hour"),
            ("target.file.size", "67108864"),
            ("max.buffer.records", "50000"),
            ("max.buffer.duration.ms", "30000"),
            ("checkpoint.interval", "20"),
            ("schema.evolution", "true"),
            ("write.mode", "upsert"),
            ("merge.key.columns", "customer_id, order_id"),
            ("delivery.guarantee", "at-least-once"),
            ("compaction.enabled", "true"),
            ("compaction.z-order.columns", "customer_id, product_id"),
            ("compaction.min-files", "20"),
            ("vacuum.retention.hours", "336"),
            ("writer.id", "my-writer"),
            ("storage.aws_access_key_id", "AKID123"),
            ("storage.aws_region", "us-east-1"),
        ]);
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();

        assert_eq!(cfg.partition_columns, vec!["trade_date", "hour"]);
        assert_eq!(cfg.target_file_size, 67_108_864);
        assert_eq!(cfg.max_buffer_records, 50_000);
        assert_eq!(cfg.max_buffer_duration, Duration::from_millis(30_000));
        assert_eq!(cfg.checkpoint_interval, 20);
        assert!(cfg.schema_evolution);
        assert_eq!(cfg.write_mode, DeltaWriteMode::Upsert);
        assert_eq!(cfg.merge_key_columns, vec!["customer_id", "order_id"]);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
        assert!(cfg.compaction.enabled);
        assert_eq!(
            cfg.compaction.z_order_columns,
            vec!["customer_id", "product_id"]
        );
        assert_eq!(cfg.compaction.min_files_for_compaction, 20);
        assert_eq!(cfg.vacuum_retention, Duration::from_secs(336 * 3600));
        assert_eq!(cfg.writer_id, "my-writer");
        assert_eq!(
            cfg.storage_options.get("aws_access_key_id"),
            Some(&"AKID123".to_string())
        );
        assert_eq!(
            cfg.storage_options.get("aws_region"),
            Some(&"us-east-1".to_string())
        );
    }

    #[test]
    fn test_upsert_requires_merge_key() {
        let mut pairs = required_pairs();
        pairs.push(("write.mode", "upsert"));
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("merge.key.columns"), "error: {err}");
    }

    #[test]
    fn test_empty_table_path_rejected() {
        let mut cfg = DeltaLakeSinkConfig::default();
        cfg.table_path = String::new();
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_zero_max_buffer_records_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("max.buffer.records", "0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_zero_target_file_size_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("target.file.size", "0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_zero_checkpoint_interval_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("checkpoint.interval", "0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_exactly_once_requires_writer_id() {
        let mut pairs = required_pairs();
        pairs.push(("delivery.guarantee", "exactly-once"));
        let config = make_config(&pairs);
        let err = DeltaLakeSinkConfig::from_config(&config).unwrap_err();
        assert!(err.to_string().contains("writer.id"), "error: {err}");
    }

    #[test]
    fn test_exactly_once_with_writer_id_ok() {
        let mut pairs = required_pairs();
        pairs.push(("delivery.guarantee", "exactly-once"));
        pairs.push(("writer.id", "my-stable-writer"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.writer_id, "my-stable-writer");
    }

    #[test]
    fn test_invalid_target_file_size() {
        let mut pairs = required_pairs();
        pairs.push(("target.file.size", "abc"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_invalid_write_mode() {
        let mut pairs = required_pairs();
        pairs.push(("write.mode", "unknown"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_storage_options_prefix_stripping() {
        let mut pairs = required_pairs();
        pairs.push(("storage.aws_access_key_id", "AKID"));
        pairs.push(("storage.aws_secret_access_key", "SECRET"));
        pairs.push(("table.path", "/data/test"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();

        assert_eq!(cfg.storage_options.len(), 2);
        assert!(cfg.storage_options.contains_key("aws_access_key_id"));
        assert!(cfg.storage_options.contains_key("aws_secret_access_key"));
        assert!(!cfg
            .storage_options
            .contains_key("storage.aws_access_key_id"));
    }

    #[test]
    fn test_defaults() {
        let cfg = DeltaLakeSinkConfig::default();
        assert!(cfg.table_path.is_empty());
        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
        assert_eq!(cfg.max_buffer_records, 100_000);
        assert_eq!(cfg.max_buffer_duration, Duration::from_secs(60));
        assert_eq!(cfg.checkpoint_interval, 10);
        assert!(!cfg.schema_evolution);
        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
        assert!(!cfg.writer_id.is_empty());
    }

    #[test]
    fn test_new_helper() {
        let cfg = DeltaLakeSinkConfig::new("/tmp/test_table");
        assert_eq!(cfg.table_path, "/tmp/test_table");
        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
    }

    // ── Enum parsing and display ──

    #[test]
    fn test_write_mode_parse() {
        assert_eq!(
            "append".parse::<DeltaWriteMode>().unwrap(),
            DeltaWriteMode::Append
        );
        assert_eq!(
            "overwrite".parse::<DeltaWriteMode>().unwrap(),
            DeltaWriteMode::Overwrite
        );
        assert_eq!(
            "upsert".parse::<DeltaWriteMode>().unwrap(),
            DeltaWriteMode::Upsert
        );
        assert_eq!(
            "merge".parse::<DeltaWriteMode>().unwrap(),
            DeltaWriteMode::Upsert
        );
        assert!("unknown".parse::<DeltaWriteMode>().is_err());
    }

    #[test]
    fn test_write_mode_display() {
        assert_eq!(DeltaWriteMode::Append.to_string(), "append");
        assert_eq!(DeltaWriteMode::Overwrite.to_string(), "overwrite");
        assert_eq!(DeltaWriteMode::Upsert.to_string(), "upsert");
    }

    #[test]
    fn test_delivery_guarantee_parse() {
        assert_eq!(
            "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
            DeliveryGuarantee::AtLeastOnce
        );
        assert_eq!(
            "at_least_once".parse::<DeliveryGuarantee>().unwrap(),
            DeliveryGuarantee::AtLeastOnce
        );
        assert_eq!(
            "exactly-once".parse::<DeliveryGuarantee>().unwrap(),
            DeliveryGuarantee::ExactlyOnce
        );
        assert_eq!(
            "exactly_once".parse::<DeliveryGuarantee>().unwrap(),
            DeliveryGuarantee::ExactlyOnce
        );
        assert!("unknown".parse::<DeliveryGuarantee>().is_err());
    }

    #[test]
    fn test_delivery_guarantee_display() {
        assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
        assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
    }

    #[test]
    fn test_compaction_config_defaults() {
        let cfg = CompactionConfig::default();
        assert!(cfg.enabled);
        assert_eq!(cfg.min_files_for_compaction, 10);
        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
        assert!(cfg.z_order_columns.is_empty());
        assert_eq!(cfg.check_interval, Duration::from_secs(3600));
    }

    #[test]
    fn test_partition_columns_empty_filter() {
        let mut pairs = required_pairs();
        pairs.push(("partition.columns", "a,,b, ,c"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.partition_columns, vec!["a", "b", "c"]);
    }

    // ── Cloud storage validation ──

    #[test]
    fn test_s3_path_requires_region() {
        let config = make_config(&[("table.path", "s3://my-bucket/trades")]);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("aws_region"), "error: {err}");
    }

    #[test]
    fn test_s3_path_with_region_and_credentials() {
        let config = make_config(&[
            ("table.path", "s3://my-bucket/trades"),
            ("storage.aws_region", "us-east-1"),
            ("storage.aws_access_key_id", "AKID123"),
            ("storage.aws_secret_access_key", "SECRET"),
        ]);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.storage_options["aws_region"], "us-east-1");
        assert_eq!(cfg.storage_options["aws_access_key_id"], "AKID123");
    }

    #[test]
    fn test_s3_path_with_region_only_warns_no_error() {
        let config = make_config(&[
            ("table.path", "s3://my-bucket/trades"),
            ("storage.aws_region", "us-east-1"),
        ]);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_s3_path_access_key_without_secret_errors() {
        let config = make_config(&[
            ("table.path", "s3://my-bucket/trades"),
            ("storage.aws_region", "us-east-1"),
            ("storage.aws_access_key_id", "AKID123"),
        ]);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("aws_secret_access_key"), "error: {err}");
    }

    #[test]
    fn test_azure_path_requires_account_name() {
        let config = make_config(&[("table.path", "az://my-container/trades")]);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("azure_storage_account_name"), "error: {err}");
    }

    #[test]
    fn test_azure_path_with_account_name_and_key() {
        let config = make_config(&[
            ("table.path", "az://my-container/trades"),
            ("storage.azure_storage_account_name", "myaccount"),
            ("storage.azure_storage_account_key", "base64key=="),
        ]);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_gcs_path_always_valid() {
        let config = make_config(&[("table.path", "gs://my-bucket/trades")]);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_local_path_no_cloud_validation() {
        let config = make_config(&[("table.path", "/data/warehouse/trades")]);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_display_storage_options_redacts_secrets() {
        let mut cfg = DeltaLakeSinkConfig::new("s3://bucket/path");
        cfg.storage_options
            .insert("aws_region".to_string(), "us-east-1".to_string());
        cfg.storage_options.insert(
            "aws_secret_access_key".to_string(),
            "TOP_SECRET".to_string(),
        );

        let display = cfg.display_storage_options();
        assert!(display.contains("aws_region=us-east-1"));
        assert!(display.contains("aws_secret_access_key=***"));
        assert!(!display.contains("TOP_SECRET"));
    }

    #[test]
    fn test_display_storage_options_empty() {
        let cfg = DeltaLakeSinkConfig::new("/local/path");
        assert!(cfg.display_storage_options().is_empty());
    }

    // ── Catalog configuration ──

    #[test]
    fn test_catalog_type_parse() {
        assert_eq!(
            "none".parse::<DeltaCatalogType>().unwrap(),
            DeltaCatalogType::None
        );
        assert_eq!(
            "glue".parse::<DeltaCatalogType>().unwrap(),
            DeltaCatalogType::Glue
        );
        assert!(matches!(
            "unity".parse::<DeltaCatalogType>().unwrap(),
            DeltaCatalogType::Unity { .. }
        ));
        assert!("unknown".parse::<DeltaCatalogType>().is_err());
    }

    #[test]
    fn test_catalog_type_display() {
        assert_eq!(DeltaCatalogType::None.to_string(), "none");
        assert_eq!(DeltaCatalogType::Glue.to_string(), "glue");
        assert_eq!(
            DeltaCatalogType::Unity {
                workspace_url: "url".into(),
                access_token: "tok".into()
            }
            .to_string(),
            "unity"
        );
    }

    #[test]
    fn test_catalog_none_default() {
        let config = make_config(&required_pairs());
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.catalog_type, DeltaCatalogType::None);
        assert!(cfg.catalog_database.is_none());
        assert!(cfg.catalog_name.is_none());
        assert!(cfg.catalog_schema.is_none());
        assert!(cfg.catalog_properties.is_empty());
    }

    #[test]
    fn test_catalog_glue_valid() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "glue"),
            ("catalog.database", "my_database"),
        ]);
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.catalog_type, DeltaCatalogType::Glue);
        assert_eq!(cfg.catalog_database.as_deref(), Some("my_database"));
    }

    #[test]
    fn test_catalog_glue_missing_database() {
        let mut pairs = required_pairs();
        pairs.push(("catalog.type", "glue"));
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("catalog.database"), "error: {err}");
    }

    #[test]
    fn test_catalog_unity_valid() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "unity"),
            ("catalog.workspace_url", "https://my.databricks.com"),
            ("catalog.access_token", "dapi123"),
            ("catalog.name", "main"),
            ("catalog.schema", "default"),
        ]);
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert!(matches!(
            cfg.catalog_type,
            DeltaCatalogType::Unity {
                ref workspace_url,
                ref access_token
            }
            if workspace_url == "https://my.databricks.com"
                && access_token == "dapi123"
        ));
        assert_eq!(cfg.catalog_name.as_deref(), Some("main"));
        assert_eq!(cfg.catalog_schema.as_deref(), Some("default"));
    }

    #[test]
    fn test_catalog_unity_missing_workspace_url() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "unity"),
            ("catalog.access_token", "dapi123"),
            ("catalog.name", "main"),
            ("catalog.schema", "default"),
        ]);
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("workspace_url"), "error: {err}");
    }

    #[test]
    fn test_catalog_unity_missing_access_token() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "unity"),
            ("catalog.workspace_url", "https://my.databricks.com"),
            ("catalog.name", "main"),
            ("catalog.schema", "default"),
        ]);
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("access_token"), "error: {err}");
    }

    #[test]
    fn test_catalog_properties_prefix() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.prop.token", "my_token"),
            ("catalog.prop.warehouse", "my_wh"),
        ]);
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(
            cfg.catalog_properties.get("token"),
            Some(&"my_token".to_string())
        );
        assert_eq!(
            cfg.catalog_properties.get("warehouse"),
            Some(&"my_wh".to_string())
        );
    }

    // ── Additional error-path and alias coverage ──

    #[test]
    fn test_invalid_delivery_guarantee() {
        let mut pairs = required_pairs();
        pairs.push(("delivery.guarantee", "maybe-once"));
        let config = make_config(&pairs);
        let err = DeltaLakeSinkConfig::from_config(&config)
            .unwrap_err()
            .to_string();
        assert!(err.contains("delivery.guarantee"), "error: {err}");
    }

    #[test]
    fn test_invalid_catalog_type() {
        let mut pairs = required_pairs();
        pairs.push(("catalog.type", "hive"));
        let config = make_config(&pairs);
        let err = DeltaLakeSinkConfig::from_config(&config)
            .unwrap_err()
            .to_string();
        assert!(err.contains("catalog.type"), "error: {err}");
    }

    #[test]
    fn test_invalid_max_buffer_duration() {
        let mut pairs = required_pairs();
        pairs.push(("max.buffer.duration.ms", "soon"));
        let config = make_config(&pairs);
        let err = DeltaLakeSinkConfig::from_config(&config)
            .unwrap_err()
            .to_string();
        assert!(err.contains("max.buffer.duration.ms"), "error: {err}");
    }

    #[test]
    fn test_invalid_vacuum_retention() {
        let mut pairs = required_pairs();
        pairs.push(("vacuum.retention.hours", "-1"));
        let config = make_config(&pairs);
        let err = DeltaLakeSinkConfig::from_config(&config)
            .unwrap_err()
            .to_string();
        assert!(err.contains("vacuum.retention.hours"), "error: {err}");
    }

    #[test]
    fn test_merge_alias_with_keys() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[("write.mode", "merge"), ("merge.key.columns", "id")]);
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.write_mode, DeltaWriteMode::Upsert);
        assert_eq!(cfg.merge_key_columns, vec!["id"]);
    }

    #[test]
    fn test_write_mode_case_insensitive() {
        let mut pairs = required_pairs();
        pairs.push(("write.mode", "OVERWRITE"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.write_mode, DeltaWriteMode::Overwrite);
    }

    #[test]
    fn test_catalog_unity_missing_name() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "unity"),
            ("catalog.workspace_url", "https://my.databricks.com"),
            ("catalog.access_token", "dapi123"),
            ("catalog.schema", "default"),
        ]);
        let config = make_config(&pairs);
        let err = DeltaLakeSinkConfig::from_config(&config)
            .unwrap_err()
            .to_string();
        assert!(err.contains("catalog.name"), "error: {err}");
    }

    #[test]
    fn test_catalog_unity_missing_schema() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "unity"),
            ("catalog.workspace_url", "https://my.databricks.com"),
            ("catalog.access_token", "dapi123"),
            ("catalog.name", "main"),
        ]);
        let config = make_config(&pairs);
        let err = DeltaLakeSinkConfig::from_config(&config)
            .unwrap_err()
            .to_string();
        assert!(err.contains("catalog.schema"), "error: {err}");
    }

    #[test]
    fn test_unity_keys_ignored_without_unity_catalog() {
        let mut pairs = required_pairs();
        pairs.push(("catalog.workspace_url", "https://my.databricks.com"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.catalog_type, DeltaCatalogType::None);
    }
}