Skip to main content

laminar_connectors/lakehouse/
delta_config.rs

1//! Delta Lake sink connector configuration.
2//!
3//! [`DeltaLakeSinkConfig`] encapsulates all settings for writing Arrow
4//! `RecordBatch` data to Delta Lake tables, parsed from SQL `WITH (...)`
5//! clauses via [`from_config`](DeltaLakeSinkConfig::from_config).
6#![allow(clippy::disallowed_types)] // cold path: lakehouse configuration
7
8use std::collections::HashMap;
9use std::fmt;
10use std::str::FromStr;
11use std::time::Duration;
12
13use crate::config::ConnectorConfig;
14use crate::error::ConnectorError;
15use crate::storage::{
16    CloudConfigValidator, ResolvedStorageOptions, SecretMasker, StorageCredentialResolver,
17    StorageProvider,
18};
19
20/// Configuration for the Delta Lake sink connector.
21///
22/// Parsed from SQL `WITH (...)` clause options or constructed programmatically.
23#[derive(Debug, Clone)]
24pub struct DeltaLakeSinkConfig {
25    /// Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`).
26    pub table_path: String,
27
28    /// Columns to partition by (e.g., `["trade_date", "hour"]`).
29    pub partition_columns: Vec<String>,
30
31    /// Target Parquet file size in bytes (default: 128 MB).
32    pub target_file_size: usize,
33
34    /// Maximum number of records to buffer before flushing to Parquet.
35    pub max_buffer_records: usize,
36
37    /// Maximum time to buffer records before flushing.
38    pub max_buffer_duration: Duration,
39
40    /// Delta Lake checkpoint interval (create checkpoint every N commits).
41    pub checkpoint_interval: u64,
42
43    /// Whether to enable schema evolution (auto-merge new columns).
44    pub schema_evolution: bool,
45
46    /// Write mode: Append, Overwrite, or Upsert (CDC merge).
47    pub write_mode: DeltaWriteMode,
48
49    /// Key columns for upsert/merge operations (required for Upsert mode).
50    pub merge_key_columns: Vec<String>,
51
52    /// Storage options (S3 credentials, Azure keys, etc.).
53    pub storage_options: HashMap<String, String>,
54
55    /// Compaction configuration.
56    pub compaction: CompactionConfig,
57
58    /// Vacuum retention period for old files.
59    pub vacuum_retention: Duration,
60
61    /// Delivery guarantee: `AtLeastOnce` or `ExactlyOnce`.
62    pub delivery_guarantee: DeliveryGuarantee,
63
64    /// Writer ID for exactly-once deduplication.
65    pub writer_id: String,
66
67    /// Catalog type for table discovery.
68    pub catalog_type: DeltaCatalogType,
69
70    /// Catalog database name (required for Glue).
71    pub catalog_database: Option<String>,
72
73    /// Catalog name (required for Unity).
74    pub catalog_name: Option<String>,
75
76    /// Catalog schema name (required for Unity).
77    pub catalog_schema: Option<String>,
78
79    /// Additional catalog-specific properties.
80    pub catalog_properties: HashMap<String, String>,
81}
82
83impl Default for DeltaLakeSinkConfig {
84    fn default() -> Self {
85        Self {
86            table_path: String::new(),
87            partition_columns: Vec::new(),
88            target_file_size: 128 * 1024 * 1024, // 128 MB
89            max_buffer_records: 100_000,
90            max_buffer_duration: Duration::from_secs(60),
91            checkpoint_interval: 10,
92            schema_evolution: false,
93            write_mode: DeltaWriteMode::Append,
94            merge_key_columns: Vec::new(),
95            storage_options: HashMap::new(),
96            compaction: CompactionConfig::default(),
97            vacuum_retention: Duration::from_secs(7 * 24 * 3600), // 7 days
98            delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
99            writer_id: uuid::Uuid::new_v4().to_string(),
100            catalog_type: DeltaCatalogType::None,
101            catalog_database: None,
102            catalog_name: None,
103            catalog_schema: None,
104            catalog_properties: HashMap::new(),
105        }
106    }
107}
108
109impl DeltaLakeSinkConfig {
110    /// Creates a minimal config for testing.
111    #[must_use]
112    pub fn new(table_path: &str) -> Self {
113        Self {
114            table_path: table_path.to_string(),
115            ..Default::default()
116        }
117    }
118
119    /// Parses a sink config from a [`ConnectorConfig`] (SQL WITH clause).
120    ///
121    /// # Required keys
122    ///
123    /// - `table.path` - Path to Delta Lake table
124    ///
125    /// # Errors
126    ///
127    /// Returns `ConnectorError::MissingConfig` if required keys are absent,
128    /// or `ConnectorError::ConfigurationError` on invalid values.
129    #[allow(clippy::too_many_lines)]
130    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
131        let mut cfg = Self {
132            table_path: config.require("table.path")?.to_string(),
133            ..Self::default()
134        };
135
136        if let Some(v) = config.get("partition.columns") {
137            cfg.partition_columns = v
138                .split(',')
139                .map(|c| c.trim().to_string())
140                .filter(|c| !c.is_empty())
141                .collect();
142        }
143        if let Some(v) = config.get("target.file.size") {
144            cfg.target_file_size = v.parse().map_err(|_| {
145                ConnectorError::ConfigurationError(format!("invalid target.file.size: '{v}'"))
146            })?;
147        }
148        if let Some(v) = config.get("max.buffer.records") {
149            cfg.max_buffer_records = v.parse().map_err(|_| {
150                ConnectorError::ConfigurationError(format!("invalid max.buffer.records: '{v}'"))
151            })?;
152        }
153        if let Some(v) = config.get("max.buffer.duration.ms") {
154            let ms: u64 = v.parse().map_err(|_| {
155                ConnectorError::ConfigurationError(format!("invalid max.buffer.duration.ms: '{v}'"))
156            })?;
157            cfg.max_buffer_duration = Duration::from_millis(ms);
158        }
159        if let Some(v) = config.get("checkpoint.interval") {
160            cfg.checkpoint_interval = v.parse().map_err(|_| {
161                ConnectorError::ConfigurationError(format!("invalid checkpoint.interval: '{v}'"))
162            })?;
163        }
164        if let Some(v) = config.get("schema.evolution") {
165            cfg.schema_evolution = v.eq_ignore_ascii_case("true");
166        }
167        if let Some(v) = config.get("write.mode") {
168            cfg.write_mode = v.parse().map_err(|_| {
169                ConnectorError::ConfigurationError(format!(
170                    "invalid write.mode: '{v}' (expected 'append', 'overwrite', or 'upsert')"
171                ))
172            })?;
173        }
174        if let Some(v) = config.get("merge.key.columns") {
175            cfg.merge_key_columns = v
176                .split(',')
177                .map(|c| c.trim().to_string())
178                .filter(|c| !c.is_empty())
179                .collect();
180        }
181        if let Some(v) = config.get("delivery.guarantee") {
182            cfg.delivery_guarantee = v.parse().map_err(|_| {
183                ConnectorError::ConfigurationError(format!(
184                    "invalid delivery.guarantee: '{v}' \
185                     (expected 'exactly-once' or 'at-least-once')"
186                ))
187            })?;
188        }
189        if let Some(v) = config.get("compaction.enabled") {
190            cfg.compaction.enabled = v.eq_ignore_ascii_case("true");
191        }
192        if let Some(v) = config.get("compaction.z-order.columns") {
193            cfg.compaction.z_order_columns = v
194                .split(',')
195                .map(|c| c.trim().to_string())
196                .filter(|c| !c.is_empty())
197                .collect();
198        }
199        if let Some(v) = config.get("compaction.min-files") {
200            cfg.compaction.min_files_for_compaction = v.parse().map_err(|_| {
201                ConnectorError::ConfigurationError(format!("invalid compaction.min-files: '{v}'"))
202            })?;
203        }
204        if let Some(v) = config.get("vacuum.retention.hours") {
205            let hours: u64 = v.parse().map_err(|_| {
206                ConnectorError::ConfigurationError(format!("invalid vacuum.retention.hours: '{v}'"))
207            })?;
208            cfg.vacuum_retention = Duration::from_secs(hours * 3600);
209        }
210        if let Some(v) = config.get("writer.id") {
211            cfg.writer_id = v.to_string();
212        } else if cfg.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
213            return Err(ConnectorError::ConfigurationError(
214                "exactly-once delivery requires an explicit 'writer.id' for stable \
215                 recovery across restarts"
216                    .into(),
217            ));
218        }
219
220        // ── Catalog configuration ──
221        if let Some(v) = config.get("catalog.type") {
222            cfg.catalog_type = v.parse().map_err(|_| {
223                ConnectorError::ConfigurationError(format!(
224                    "invalid catalog.type: '{v}' (expected 'none', 'glue', or 'unity')"
225                ))
226            })?;
227        }
228        if let Some(v) = config.get("catalog.database") {
229            cfg.catalog_database = Some(v.to_string());
230        }
231        if let Some(v) = config.get("catalog.name") {
232            cfg.catalog_name = Some(v.to_string());
233        }
234        if let Some(v) = config.get("catalog.schema") {
235            cfg.catalog_schema = Some(v.to_string());
236        }
237        // Unity-specific: populate workspace_url and access_token into the enum variant.
238        if let DeltaCatalogType::Unity {
239            ref mut workspace_url,
240            ref mut access_token,
241        } = cfg.catalog_type
242        {
243            if let Some(v) = config.get("catalog.workspace_url") {
244                *workspace_url = v.to_string();
245            }
246            if let Some(v) = config.get("catalog.access_token") {
247                *access_token = v.to_string();
248            }
249        }
250        cfg.catalog_properties = config.properties_with_prefix("catalog.prop.");
251
252        // Resolve storage credentials: explicit options + environment variable fallbacks.
253        let explicit_storage = config.properties_with_prefix("storage.");
254        let resolved = StorageCredentialResolver::resolve(&cfg.table_path, &explicit_storage);
255        cfg.storage_options = resolved.options;
256
257        cfg.validate()?;
258        Ok(cfg)
259    }
260
261    /// Formats the storage options for safe logging with secrets redacted.
262    #[must_use]
263    pub fn display_storage_options(&self) -> String {
264        SecretMasker::display_map(&self.storage_options)
265    }
266
267    /// Validates the configuration for consistency.
268    ///
269    /// # Errors
270    ///
271    /// Returns `ConnectorError::ConfigurationError` on invalid combinations.
272    pub fn validate(&self) -> Result<(), ConnectorError> {
273        if self.table_path.is_empty() {
274            return Err(ConnectorError::MissingConfig("table.path".into()));
275        }
276        if self.write_mode == DeltaWriteMode::Upsert && self.merge_key_columns.is_empty() {
277            return Err(ConnectorError::ConfigurationError(
278                "upsert mode requires 'merge.key.columns' to be set".into(),
279            ));
280        }
281        if self.max_buffer_records == 0 {
282            return Err(ConnectorError::ConfigurationError(
283                "max.buffer.records must be > 0".into(),
284            ));
285        }
286        if self.target_file_size == 0 {
287            return Err(ConnectorError::ConfigurationError(
288                "target.file.size must be > 0".into(),
289            ));
290        }
291        if self.checkpoint_interval == 0 {
292            return Err(ConnectorError::ConfigurationError(
293                "checkpoint.interval must be > 0".into(),
294            ));
295        }
296
297        // Validate catalog-specific requirements.
298        match &self.catalog_type {
299            DeltaCatalogType::None => {}
300            DeltaCatalogType::Glue => {
301                if self.catalog_database.is_none() {
302                    return Err(ConnectorError::ConfigurationError(
303                        "Glue catalog requires 'catalog.database' to be set".into(),
304                    ));
305                }
306            }
307            DeltaCatalogType::Unity {
308                workspace_url,
309                access_token,
310            } => {
311                if workspace_url.is_empty() {
312                    return Err(ConnectorError::ConfigurationError(
313                        "Unity catalog requires 'catalog.workspace_url' to be set".into(),
314                    ));
315                }
316                if access_token.is_empty() {
317                    return Err(ConnectorError::ConfigurationError(
318                        "Unity catalog requires 'catalog.access_token' to be set".into(),
319                    ));
320                }
321                if self.catalog_name.is_none() {
322                    return Err(ConnectorError::ConfigurationError(
323                        "Unity catalog requires 'catalog.name' to be set".into(),
324                    ));
325                }
326                if self.catalog_schema.is_none() {
327                    return Err(ConnectorError::ConfigurationError(
328                        "Unity catalog requires 'catalog.schema' to be set".into(),
329                    ));
330                }
331            }
332        }
333
334        // Validate cloud storage credentials (skip when catalog resolves the path).
335        if self.catalog_type == DeltaCatalogType::None {
336            let resolved = ResolvedStorageOptions {
337                provider: StorageProvider::detect(&self.table_path),
338                options: self.storage_options.clone(),
339                env_resolved_keys: Vec::new(),
340            };
341            let cloud_result = CloudConfigValidator::validate(&resolved);
342            if !cloud_result.is_valid() {
343                return Err(ConnectorError::ConfigurationError(
344                    cloud_result.error_message(),
345                ));
346            }
347        }
348
349        Ok(())
350    }
351}
352
/// How records are committed to a Delta Lake table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeltaWriteMode {
    /// Append-only: every record is an insert. The cheapest mode, suited to immutable streams.
    Append,
    /// Overwrite: replaces partition contents. Used for batch-style recomputation.
    Overwrite,
    /// Upsert/Merge: CDC-style insert/update/delete via a MERGE statement.
    /// Needs `merge_key_columns` to be populated. Integrates with Z-sets.
    Upsert,
}

impl FromStr for DeltaWriteMode {
    type Err = String;

    /// Parses a mode name case-insensitively; `merge` is an alias for `upsert`.
    /// The error message echoes the lowercased input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_lowercase();
        let mode = match normalized.as_str() {
            "append" => Self::Append,
            "overwrite" => Self::Overwrite,
            "upsert" | "merge" => Self::Upsert,
            unknown => return Err(format!("unknown write mode: '{unknown}'")),
        };
        Ok(mode)
    }
}

impl fmt::Display for DeltaWriteMode {
    /// Writes the canonical lowercase mode name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Append => "append",
            Self::Overwrite => "overwrite",
            Self::Upsert => "upsert",
        };
        f.write_str(name)
    }
}
387
/// Delivery guarantee offered by the sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryGuarantee {
    /// Duplicates are possible after recovery. Simpler, with slightly higher throughput.
    AtLeastOnce,
    /// No duplicates. Relies on an epoch-to-Delta-version mapping.
    ExactlyOnce,
}

impl FromStr for DeliveryGuarantee {
    type Err = String;

    /// Parses the guarantee case-insensitively. Hyphens and underscores are
    /// interchangeable, and the separator-free spellings are also accepted.
    /// The error message echoes the normalized input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_lowercase().replace('-', "_");
        match normalized.as_str() {
            "exactly_once" | "exactlyonce" => Ok(Self::ExactlyOnce),
            "at_least_once" | "atleastonce" => Ok(Self::AtLeastOnce),
            unknown => Err(format!("unknown delivery guarantee: '{unknown}'")),
        }
    }
}

impl fmt::Display for DeliveryGuarantee {
    /// Writes the hyphenated form used in SQL options.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::AtLeastOnce => "at-least-once",
            Self::ExactlyOnce => "exactly-once",
        })
    }
}
417
/// Delta Lake catalog type for table discovery.
///
/// Catalogs enable referencing tables by logical names instead of raw paths.
///
/// `Debug` is implemented by hand so that a Unity access token is never
/// printed: a non-empty token is shown as `"<redacted>"`.
#[derive(Clone, PartialEq, Eq, Default)]
pub enum DeltaCatalogType {
    /// No catalog — table path is a direct file or cloud URI.
    #[default]
    None,
    /// AWS Glue Data Catalog.
    Glue,
    /// Databricks Unity Catalog.
    Unity {
        /// Databricks workspace URL (e.g., `https://xxx.cloud.databricks.com`).
        workspace_url: String,
        /// Databricks access token (secret; redacted in `Debug` output).
        access_token: String,
    },
}

impl FromStr for DeltaCatalogType {
    type Err = String;

    /// Parses `none` (or an empty string), `glue`, or `unity`, case-insensitively.
    ///
    /// `unity` yields empty `workspace_url` / `access_token`; callers fill them in.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "none" | "" => Ok(Self::None),
            "glue" => Ok(Self::Glue),
            "unity" => Ok(Self::Unity {
                workspace_url: String::new(),
                access_token: String::new(),
            }),
            other => Err(format!("unknown catalog type: '{other}'")),
        }
    }
}

impl fmt::Display for DeltaCatalogType {
    /// Writes the lowercase catalog kind only (no connection details).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::None => write!(f, "none"),
            Self::Glue => write!(f, "glue"),
            Self::Unity { .. } => write!(f, "unity"),
        }
    }
}

impl fmt::Debug for DeltaCatalogType {
    /// Same shape as a derived `Debug`, but masks a non-empty access token.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::None => f.write_str("None"),
            Self::Glue => f.write_str("Glue"),
            Self::Unity {
                workspace_url,
                access_token,
            } => {
                // Keep "empty" visible, since it is useful when diagnosing
                // validation errors, but never reveal the real secret.
                let token = if access_token.is_empty() {
                    ""
                } else {
                    "<redacted>"
                };
                f.debug_struct("Unity")
                    .field("workspace_url", workspace_url)
                    .field("access_token", &token)
                    .finish()
            }
        }
    }
}
462
/// Settings for background compaction of small Delta files.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Whether automatic compaction runs at all.
    pub enabled: bool,

    /// Number of files that must exist before compaction is triggered.
    pub min_files_for_compaction: usize,

    /// Desired size of files produced by compaction, in bytes.
    pub target_file_size: usize,

    /// Optional columns used for Z-ORDER clustering.
    pub z_order_columns: Vec<String>,

    /// Interval between checks for whether compaction is needed.
    pub check_interval: Duration,
}

impl Default for CompactionConfig {
    /// Enabled, triggered at 10 files, targeting 128 MB outputs, checked hourly,
    /// with no Z-ORDER columns.
    fn default() -> Self {
        const TARGET_FILE_BYTES: usize = 128 * 1024 * 1024;
        const ONE_HOUR: Duration = Duration::from_secs(60 * 60);

        Self {
            enabled: true,
            min_files_for_compaction: 10,
            target_file_size: TARGET_FILE_BYTES,
            z_order_columns: Vec::default(),
            check_interval: ONE_HOUR,
        }
    }
}
493
494#[cfg(test)]
495#[allow(clippy::field_reassign_with_default)]
496mod tests {
497    use super::*;
498
499    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
500        let mut config = ConnectorConfig::new("delta-lake");
501        for (k, v) in pairs {
502            config.set(*k, *v);
503        }
504        config
505    }
506
507    fn required_pairs() -> Vec<(&'static str, &'static str)> {
508        vec![("table.path", "/data/warehouse/trades")]
509    }
510
511    // ── Config parsing tests ──
512
513    #[test]
514    fn test_parse_required_fields() {
515        let config = make_config(&required_pairs());
516        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
517        assert_eq!(cfg.table_path, "/data/warehouse/trades");
518        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
519        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
520        assert!(cfg.partition_columns.is_empty());
521        assert!(cfg.merge_key_columns.is_empty());
522        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
523        assert_eq!(cfg.max_buffer_records, 100_000);
524        assert_eq!(cfg.checkpoint_interval, 10);
525        assert!(!cfg.schema_evolution);
526        assert!(cfg.compaction.enabled);
527    }
528
529    #[test]
530    fn test_missing_table_path() {
531        let config = ConnectorConfig::new("delta-lake");
532        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
533    }
534
535    #[test]
536    fn test_parse_all_optional_fields() {
537        let mut pairs = required_pairs();
538        pairs.extend_from_slice(&[
539            ("partition.columns", "trade_date, hour"),
540            ("target.file.size", "67108864"),
541            ("max.buffer.records", "50000"),
542            ("max.buffer.duration.ms", "30000"),
543            ("checkpoint.interval", "20"),
544            ("schema.evolution", "true"),
545            ("write.mode", "upsert"),
546            ("merge.key.columns", "customer_id, order_id"),
547            ("delivery.guarantee", "at-least-once"),
548            ("compaction.enabled", "true"),
549            ("compaction.z-order.columns", "customer_id, product_id"),
550            ("compaction.min-files", "20"),
551            ("vacuum.retention.hours", "336"),
552            ("writer.id", "my-writer"),
553            ("storage.aws_access_key_id", "AKID123"),
554            ("storage.aws_region", "us-east-1"),
555        ]);
556        let config = make_config(&pairs);
557        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
558
559        assert_eq!(cfg.partition_columns, vec!["trade_date", "hour"]);
560        assert_eq!(cfg.target_file_size, 67_108_864);
561        assert_eq!(cfg.max_buffer_records, 50_000);
562        assert_eq!(cfg.max_buffer_duration, Duration::from_millis(30_000));
563        assert_eq!(cfg.checkpoint_interval, 20);
564        assert!(cfg.schema_evolution);
565        assert_eq!(cfg.write_mode, DeltaWriteMode::Upsert);
566        assert_eq!(cfg.merge_key_columns, vec!["customer_id", "order_id"]);
567        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
568        assert!(cfg.compaction.enabled);
569        assert_eq!(
570            cfg.compaction.z_order_columns,
571            vec!["customer_id", "product_id"]
572        );
573        assert_eq!(cfg.compaction.min_files_for_compaction, 20);
574        assert_eq!(cfg.vacuum_retention, Duration::from_secs(336 * 3600));
575        assert_eq!(cfg.writer_id, "my-writer");
576        assert_eq!(
577            cfg.storage_options.get("aws_access_key_id"),
578            Some(&"AKID123".to_string())
579        );
580        assert_eq!(
581            cfg.storage_options.get("aws_region"),
582            Some(&"us-east-1".to_string())
583        );
584    }
585
586    #[test]
587    fn test_upsert_requires_merge_key() {
588        let mut pairs = required_pairs();
589        pairs.push(("write.mode", "upsert"));
590        let config = make_config(&pairs);
591        let result = DeltaLakeSinkConfig::from_config(&config);
592        assert!(result.is_err());
593        let err = result.unwrap_err().to_string();
594        assert!(err.contains("merge.key.columns"), "error: {err}");
595    }
596
597    #[test]
598    fn test_empty_table_path_rejected() {
599        let mut cfg = DeltaLakeSinkConfig::default();
600        cfg.table_path = String::new();
601        assert!(cfg.validate().is_err());
602    }
603
604    #[test]
605    fn test_zero_max_buffer_records_rejected() {
606        let mut pairs = required_pairs();
607        pairs.push(("max.buffer.records", "0"));
608        let config = make_config(&pairs);
609        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
610    }
611
612    #[test]
613    fn test_zero_target_file_size_rejected() {
614        let mut pairs = required_pairs();
615        pairs.push(("target.file.size", "0"));
616        let config = make_config(&pairs);
617        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
618    }
619
620    #[test]
621    fn test_zero_checkpoint_interval_rejected() {
622        let mut pairs = required_pairs();
623        pairs.push(("checkpoint.interval", "0"));
624        let config = make_config(&pairs);
625        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
626    }
627
628    #[test]
629    fn test_exactly_once_requires_writer_id() {
630        let mut pairs = required_pairs();
631        pairs.push(("delivery.guarantee", "exactly-once"));
632        let config = make_config(&pairs);
633        let err = DeltaLakeSinkConfig::from_config(&config).unwrap_err();
634        assert!(err.to_string().contains("writer.id"), "error: {err}");
635    }
636
637    #[test]
638    fn test_exactly_once_with_writer_id_ok() {
639        let mut pairs = required_pairs();
640        pairs.push(("delivery.guarantee", "exactly-once"));
641        pairs.push(("writer.id", "my-stable-writer"));
642        let config = make_config(&pairs);
643        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
644        assert_eq!(cfg.writer_id, "my-stable-writer");
645    }
646
647    #[test]
648    fn test_invalid_target_file_size() {
649        let mut pairs = required_pairs();
650        pairs.push(("target.file.size", "abc"));
651        let config = make_config(&pairs);
652        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
653    }
654
655    #[test]
656    fn test_invalid_write_mode() {
657        let mut pairs = required_pairs();
658        pairs.push(("write.mode", "unknown"));
659        let config = make_config(&pairs);
660        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
661    }
662
663    #[test]
664    fn test_storage_options_prefix_stripping() {
665        let mut pairs = required_pairs();
666        pairs.push(("storage.aws_access_key_id", "AKID"));
667        pairs.push(("storage.aws_secret_access_key", "SECRET"));
668        pairs.push(("table.path", "/data/test"));
669        let config = make_config(&pairs);
670        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
671
672        assert_eq!(cfg.storage_options.len(), 2);
673        assert!(cfg.storage_options.contains_key("aws_access_key_id"));
674        assert!(cfg.storage_options.contains_key("aws_secret_access_key"));
675        assert!(!cfg
676            .storage_options
677            .contains_key("storage.aws_access_key_id"));
678    }
679
680    #[test]
681    fn test_defaults() {
682        let cfg = DeltaLakeSinkConfig::default();
683        assert!(cfg.table_path.is_empty());
684        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
685        assert_eq!(cfg.max_buffer_records, 100_000);
686        assert_eq!(cfg.max_buffer_duration, Duration::from_secs(60));
687        assert_eq!(cfg.checkpoint_interval, 10);
688        assert!(!cfg.schema_evolution);
689        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
690        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
691        assert!(!cfg.writer_id.is_empty());
692    }
693
694    #[test]
695    fn test_new_helper() {
696        let cfg = DeltaLakeSinkConfig::new("/tmp/test_table");
697        assert_eq!(cfg.table_path, "/tmp/test_table");
698        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
699    }
700
701    // ── Enum tests ──
702
703    #[test]
704    fn test_write_mode_parse() {
705        assert_eq!(
706            "append".parse::<DeltaWriteMode>().unwrap(),
707            DeltaWriteMode::Append
708        );
709        assert_eq!(
710            "overwrite".parse::<DeltaWriteMode>().unwrap(),
711            DeltaWriteMode::Overwrite
712        );
713        assert_eq!(
714            "upsert".parse::<DeltaWriteMode>().unwrap(),
715            DeltaWriteMode::Upsert
716        );
717        assert_eq!(
718            "merge".parse::<DeltaWriteMode>().unwrap(),
719            DeltaWriteMode::Upsert
720        );
721        assert!("unknown".parse::<DeltaWriteMode>().is_err());
722    }
723
724    #[test]
725    fn test_write_mode_display() {
726        assert_eq!(DeltaWriteMode::Append.to_string(), "append");
727        assert_eq!(DeltaWriteMode::Overwrite.to_string(), "overwrite");
728        assert_eq!(DeltaWriteMode::Upsert.to_string(), "upsert");
729    }
730
731    #[test]
732    fn test_delivery_guarantee_parse() {
733        assert_eq!(
734            "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
735            DeliveryGuarantee::AtLeastOnce
736        );
737        assert_eq!(
738            "at_least_once".parse::<DeliveryGuarantee>().unwrap(),
739            DeliveryGuarantee::AtLeastOnce
740        );
741        assert_eq!(
742            "exactly-once".parse::<DeliveryGuarantee>().unwrap(),
743            DeliveryGuarantee::ExactlyOnce
744        );
745        assert_eq!(
746            "exactly_once".parse::<DeliveryGuarantee>().unwrap(),
747            DeliveryGuarantee::ExactlyOnce
748        );
749        assert!("unknown".parse::<DeliveryGuarantee>().is_err());
750    }
751
752    #[test]
753    fn test_delivery_guarantee_display() {
754        assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
755        assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
756    }
757
758    #[test]
759    fn test_compaction_config_defaults() {
760        let cfg = CompactionConfig::default();
761        assert!(cfg.enabled);
762        assert_eq!(cfg.min_files_for_compaction, 10);
763        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
764        assert!(cfg.z_order_columns.is_empty());
765        assert_eq!(cfg.check_interval, Duration::from_secs(3600));
766    }
767
768    #[test]
769    fn test_partition_columns_empty_filter() {
770        let mut pairs = required_pairs();
771        pairs.push(("partition.columns", "a,,b, ,c"));
772        let config = make_config(&pairs);
773        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
774        assert_eq!(cfg.partition_columns, vec!["a", "b", "c"]);
775    }
776
777    // ── Cloud storage integration tests ──
778
779    #[test]
780    fn test_s3_path_requires_region() {
781        let config = make_config(&[("table.path", "s3://my-bucket/trades")]);
782        let result = DeltaLakeSinkConfig::from_config(&config);
783        assert!(result.is_err());
784        let err = result.unwrap_err().to_string();
785        assert!(err.contains("aws_region"), "error: {err}");
786    }
787
788    #[test]
789    fn test_s3_path_with_region_and_credentials() {
790        let config = make_config(&[
791            ("table.path", "s3://my-bucket/trades"),
792            ("storage.aws_region", "us-east-1"),
793            ("storage.aws_access_key_id", "AKID123"),
794            ("storage.aws_secret_access_key", "SECRET"),
795        ]);
796        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
797        assert_eq!(cfg.storage_options["aws_region"], "us-east-1");
798        assert_eq!(cfg.storage_options["aws_access_key_id"], "AKID123");
799    }
800
801    #[test]
802    fn test_s3_path_with_region_only_warns_no_error() {
803        // Missing credentials is a warning (IAM fallback), not a hard error.
804        let config = make_config(&[
805            ("table.path", "s3://my-bucket/trades"),
806            ("storage.aws_region", "us-east-1"),
807        ]);
808        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
809    }
810
811    #[test]
812    fn test_s3_path_access_key_without_secret_errors() {
813        let config = make_config(&[
814            ("table.path", "s3://my-bucket/trades"),
815            ("storage.aws_region", "us-east-1"),
816            ("storage.aws_access_key_id", "AKID123"),
817        ]);
818        let result = DeltaLakeSinkConfig::from_config(&config);
819        assert!(result.is_err());
820        let err = result.unwrap_err().to_string();
821        assert!(err.contains("aws_secret_access_key"), "error: {err}");
822    }
823
824    #[test]
825    fn test_azure_path_requires_account_name() {
826        let config = make_config(&[("table.path", "az://my-container/trades")]);
827        let result = DeltaLakeSinkConfig::from_config(&config);
828        assert!(result.is_err());
829        let err = result.unwrap_err().to_string();
830        assert!(err.contains("azure_storage_account_name"), "error: {err}");
831    }
832
833    #[test]
834    fn test_azure_path_with_account_name_and_key() {
835        let config = make_config(&[
836            ("table.path", "az://my-container/trades"),
837            ("storage.azure_storage_account_name", "myaccount"),
838            ("storage.azure_storage_account_key", "base64key=="),
839        ]);
840        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
841    }
842
843    #[test]
844    fn test_gcs_path_always_valid() {
845        // GCS missing credentials is warning-only (Application Default Credentials).
846        let config = make_config(&[("table.path", "gs://my-bucket/trades")]);
847        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
848    }
849
850    #[test]
851    fn test_local_path_no_cloud_validation() {
852        let config = make_config(&[("table.path", "/data/warehouse/trades")]);
853        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
854    }
855
856    #[test]
857    fn test_display_storage_options_redacts_secrets() {
858        let mut cfg = DeltaLakeSinkConfig::new("s3://bucket/path");
859        cfg.storage_options
860            .insert("aws_region".to_string(), "us-east-1".to_string());
861        cfg.storage_options.insert(
862            "aws_secret_access_key".to_string(),
863            "TOP_SECRET".to_string(),
864        );
865
866        let display = cfg.display_storage_options();
867        assert!(display.contains("aws_region=us-east-1"));
868        assert!(display.contains("aws_secret_access_key=***"));
869        assert!(!display.contains("TOP_SECRET"));
870    }
871
872    #[test]
873    fn test_display_storage_options_empty() {
874        let cfg = DeltaLakeSinkConfig::new("/local/path");
875        assert!(cfg.display_storage_options().is_empty());
876    }
877
878    // ── Catalog tests ──
879
880    #[test]
881    fn test_catalog_type_parse() {
882        assert_eq!(
883            "none".parse::<DeltaCatalogType>().unwrap(),
884            DeltaCatalogType::None
885        );
886        assert_eq!(
887            "glue".parse::<DeltaCatalogType>().unwrap(),
888            DeltaCatalogType::Glue
889        );
890        assert!(matches!(
891            "unity".parse::<DeltaCatalogType>().unwrap(),
892            DeltaCatalogType::Unity { .. }
893        ));
894        assert!("unknown".parse::<DeltaCatalogType>().is_err());
895    }
896
897    #[test]
898    fn test_catalog_type_display() {
899        assert_eq!(DeltaCatalogType::None.to_string(), "none");
900        assert_eq!(DeltaCatalogType::Glue.to_string(), "glue");
901        assert_eq!(
902            DeltaCatalogType::Unity {
903                workspace_url: "url".into(),
904                access_token: "tok".into()
905            }
906            .to_string(),
907            "unity"
908        );
909    }
910
911    #[test]
912    fn test_catalog_none_default() {
913        let config = make_config(&required_pairs());
914        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
915        assert_eq!(cfg.catalog_type, DeltaCatalogType::None);
916        assert!(cfg.catalog_database.is_none());
917        assert!(cfg.catalog_name.is_none());
918        assert!(cfg.catalog_schema.is_none());
919        assert!(cfg.catalog_properties.is_empty());
920    }
921
922    #[test]
923    fn test_catalog_glue_valid() {
924        let mut pairs = required_pairs();
925        pairs.extend_from_slice(&[
926            ("catalog.type", "glue"),
927            ("catalog.database", "my_database"),
928        ]);
929        let config = make_config(&pairs);
930        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
931        assert_eq!(cfg.catalog_type, DeltaCatalogType::Glue);
932        assert_eq!(cfg.catalog_database.as_deref(), Some("my_database"));
933    }
934
935    #[test]
936    fn test_catalog_glue_missing_database() {
937        let mut pairs = required_pairs();
938        pairs.push(("catalog.type", "glue"));
939        let config = make_config(&pairs);
940        let result = DeltaLakeSinkConfig::from_config(&config);
941        assert!(result.is_err());
942        let err = result.unwrap_err().to_string();
943        assert!(err.contains("catalog.database"), "error: {err}");
944    }
945
946    #[test]
947    fn test_catalog_unity_valid() {
948        let mut pairs = required_pairs();
949        pairs.extend_from_slice(&[
950            ("catalog.type", "unity"),
951            ("catalog.workspace_url", "https://my.databricks.com"),
952            ("catalog.access_token", "dapi123"),
953            ("catalog.name", "main"),
954            ("catalog.schema", "default"),
955        ]);
956        let config = make_config(&pairs);
957        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
958        assert!(matches!(
959            cfg.catalog_type,
960            DeltaCatalogType::Unity {
961                ref workspace_url,
962                ref access_token
963            }
964            if workspace_url == "https://my.databricks.com"
965                && access_token == "dapi123"
966        ));
967        assert_eq!(cfg.catalog_name.as_deref(), Some("main"));
968        assert_eq!(cfg.catalog_schema.as_deref(), Some("default"));
969    }
970
971    #[test]
972    fn test_catalog_unity_missing_workspace_url() {
973        let mut pairs = required_pairs();
974        pairs.extend_from_slice(&[
975            ("catalog.type", "unity"),
976            ("catalog.access_token", "dapi123"),
977            ("catalog.name", "main"),
978            ("catalog.schema", "default"),
979        ]);
980        let config = make_config(&pairs);
981        let result = DeltaLakeSinkConfig::from_config(&config);
982        assert!(result.is_err());
983        let err = result.unwrap_err().to_string();
984        assert!(err.contains("workspace_url"), "error: {err}");
985    }
986
987    #[test]
988    fn test_catalog_unity_missing_access_token() {
989        let mut pairs = required_pairs();
990        pairs.extend_from_slice(&[
991            ("catalog.type", "unity"),
992            ("catalog.workspace_url", "https://my.databricks.com"),
993            ("catalog.name", "main"),
994            ("catalog.schema", "default"),
995        ]);
996        let config = make_config(&pairs);
997        let result = DeltaLakeSinkConfig::from_config(&config);
998        assert!(result.is_err());
999        let err = result.unwrap_err().to_string();
1000        assert!(err.contains("access_token"), "error: {err}");
1001    }
1002
1003    #[test]
1004    fn test_catalog_properties_prefix() {
1005        let mut pairs = required_pairs();
1006        pairs.extend_from_slice(&[
1007            ("catalog.prop.token", "my_token"),
1008            ("catalog.prop.warehouse", "my_wh"),
1009        ]);
1010        let config = make_config(&pairs);
1011        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1012        assert_eq!(
1013            cfg.catalog_properties.get("token"),
1014            Some(&"my_token".to_string())
1015        );
1016        assert_eq!(
1017            cfg.catalog_properties.get("warehouse"),
1018            Some(&"my_wh".to_string())
1019        );
1020    }
1021}