laminar_connectors/lakehouse/delta_config.rs

//! Delta Lake sink config. Parsed from SQL `WITH (...)` via
//! [`DeltaLakeSinkConfig::from_config`].
#![allow(clippy::disallowed_types)] // cold path: lakehouse configuration

use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;

use crate::config::ConnectorConfig;
use crate::error::ConnectorError;
use crate::storage::{
    CloudConfigValidator, ResolvedStorageOptions, SecretMasker, StorageCredentialResolver,
    StorageProvider,
};

/// Configuration for the Delta Lake sink connector.
///
/// Parsed from SQL `WITH (...)` clause options or constructed programmatically.
#[derive(Debug, Clone)]
pub struct DeltaLakeSinkConfig {
    /// Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`).
    pub table_path: String,

    /// Columns to partition by (e.g., `["trade_date", "hour"]`).
    pub partition_columns: Vec<String>,

    /// Target Parquet file size in bytes (default: 128 MB).
    pub target_file_size: usize,

    /// Maximum number of records to buffer before flushing to Parquet.
    pub max_buffer_records: usize,

    /// Maximum time to buffer records before flushing.
    pub max_buffer_duration: Duration,

    /// Delta Lake checkpoint interval (create a checkpoint every N commits).
    pub checkpoint_interval: u64,

    /// Whether to enable schema evolution (auto-merge new columns).
    pub schema_evolution: bool,

    /// Write mode: Append, Overwrite, or Upsert (CDC merge).
    pub write_mode: DeltaWriteMode,

    /// Key columns for upsert/merge operations (required for Upsert mode).
    pub merge_key_columns: Vec<String>,

    /// Storage options (S3 credentials, Azure keys, etc.).
    pub storage_options: HashMap<String, String>,

    /// Compaction configuration.
    pub compaction: CompactionConfig,

    /// Vacuum retention period for old files.
    pub vacuum_retention: Duration,

    /// Delivery guarantee: `AtLeastOnce` or `ExactlyOnce`.
    pub delivery_guarantee: DeliveryGuarantee,

    /// Writer ID for exactly-once deduplication.
    pub writer_id: String,

    /// Catalog type for table discovery.
    pub catalog_type: DeltaCatalogType,

    /// Catalog database name (required for Glue).
    pub catalog_database: Option<String>,

    /// Catalog name (required for Unity).
    pub catalog_name: Option<String>,

    /// Catalog schema name (required for Unity).
    pub catalog_schema: Option<String>,

    /// Storage location for auto-created Unity Catalog external tables.
    /// When set and the `uc://` table doesn't exist, the sink creates it
    /// via the Unity Catalog REST API at this storage location.
    pub catalog_storage_location: Option<String>,

    /// Maximum number of retries on optimistic concurrency conflicts
    /// (default: 3). After exhausting retries, the conflict error is
    /// propagated as fatal. Uses exponential backoff (100ms, 500ms, 2s).
    pub max_commit_retries: u32,

    /// Timeout for individual Delta write operations (default: 30s).
    pub write_timeout: Duration,

    /// Parquet writer properties (compression, bloom filters, statistics, etc.).
    pub parquet: ParquetWriteConfig,
}

impl Default for DeltaLakeSinkConfig {
    fn default() -> Self {
        Self {
            table_path: String::new(),
            partition_columns: Vec::new(),
            target_file_size: 128 * 1024 * 1024, // 128 MB
            max_buffer_records: 100_000,
            max_buffer_duration: Duration::from_secs(60),
            checkpoint_interval: 10,
            schema_evolution: false,
            write_mode: DeltaWriteMode::Append,
            merge_key_columns: Vec::new(),
            storage_options: HashMap::new(),
            compaction: CompactionConfig::default(),
            vacuum_retention: Duration::from_secs(7 * 24 * 3600),
            delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
            writer_id: uuid::Uuid::new_v4().to_string(),
            catalog_type: DeltaCatalogType::None,
            catalog_database: None,
            catalog_name: None,
            catalog_schema: None,
            catalog_storage_location: None,
            max_commit_retries: 3,
            write_timeout: Duration::from_secs(30),
            parquet: ParquetWriteConfig::default(),
        }
    }
}

impl DeltaLakeSinkConfig {
    /// Creates a minimal config for testing.
    #[must_use]
    pub fn new(table_path: &str) -> Self {
        Self {
            table_path: table_path.to_string(),
            ..Default::default()
        }
    }

    /// Parses a sink config from a [`ConnectorConfig`] (SQL WITH clause).
    ///
    /// # Required keys
    ///
    /// - `table.path` - Path to Delta Lake table
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::MissingConfig` if required keys are absent,
    /// or `ConnectorError::ConfigurationError` on invalid values.
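    ///
    /// # Example
    ///
    /// A minimal sketch using the programmatic [`ConnectorConfig`] builder
    /// (it mirrors the test helpers at the bottom of this file); the key
    /// names are the same ones accepted in the SQL `WITH (...)` clause:
    ///
    /// ```ignore
    /// let mut config = ConnectorConfig::new("delta-lake");
    /// config.set("table.path", "/data/warehouse/trades");
    /// config.set("partition.columns", "trade_date, hour");
    /// config.set("write.mode", "append");
    ///
    /// let cfg = DeltaLakeSinkConfig::from_config(&config)?;
    /// assert_eq!(cfg.partition_columns, vec!["trade_date", "hour"]);
    /// ```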
    #[allow(clippy::too_many_lines)]
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let mut cfg = Self {
            table_path: config.require("table.path")?.to_string(),
            ..Self::default()
        };

        if let Some(v) = config.get("partition.columns") {
            cfg.partition_columns = v
                .split(',')
                .map(|c| c.trim().to_string())
                .filter(|c| !c.is_empty())
                .collect();
        }
        if let Some(v) = config.get("target.file.size") {
            cfg.target_file_size = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!("invalid target.file.size: '{v}'"))
            })?;
        }
        if let Some(v) = config.get("max.buffer.records") {
            cfg.max_buffer_records = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!("invalid max.buffer.records: '{v}'"))
            })?;
        }
        if let Some(v) = config.get("max.buffer.duration.ms") {
            let ms: u64 = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!("invalid max.buffer.duration.ms: '{v}'"))
            })?;
            cfg.max_buffer_duration = Duration::from_millis(ms);
        }
        if let Some(v) = config.get("checkpoint.interval") {
            cfg.checkpoint_interval = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!("invalid checkpoint.interval: '{v}'"))
            })?;
        }
        if let Some(v) = config.get("schema.evolution") {
            cfg.schema_evolution = v.eq_ignore_ascii_case("true");
        }
        if let Some(v) = config.get("write.mode") {
            cfg.write_mode = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid write.mode: '{v}' (expected 'append', 'overwrite', or 'upsert')"
                ))
            })?;
        }
        if let Some(v) = config.get("merge.key.columns") {
            cfg.merge_key_columns = v
                .split(',')
                .map(|c| c.trim().to_string())
                .filter(|c| !c.is_empty())
                .collect();
        }
        if let Some(v) = config.get("delivery.guarantee") {
            cfg.delivery_guarantee = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid delivery.guarantee: '{v}' \
                     (expected 'exactly-once' or 'at-least-once')"
                ))
            })?;
        }
        if let Some(v) = config.get("compaction.enabled") {
            cfg.compaction.enabled = v.eq_ignore_ascii_case("true");
        }
        if let Some(v) = config.get("compaction.z-order.columns") {
            cfg.compaction.z_order_columns = v
                .split(',')
                .map(|c| c.trim().to_string())
                .filter(|c| !c.is_empty())
                .collect();
        }
        if let Some(v) = config.get("compaction.target-file-size") {
            cfg.compaction.target_file_size = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid compaction.target-file-size: '{v}'"
                ))
            })?;
        } else {
            // Default compaction target file size to the sink's target_file_size.
            cfg.compaction.target_file_size = cfg.target_file_size;
        }
        if let Some(v) = config.get("compaction.min-files") {
            cfg.compaction.min_files_for_compaction = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!("invalid compaction.min-files: '{v}'"))
            })?;
        }
        if let Some(v) = config.get("compaction.check-interval.ms") {
            let ms: u64 = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid compaction.check-interval.ms: '{v}'"
                ))
            })?;
            if ms == 0 {
                return Err(ConnectorError::ConfigurationError(
                    "compaction.check-interval.ms must be > 0".into(),
                ));
            }
            cfg.compaction.check_interval = Duration::from_millis(ms);
        }
        if let Some(v) = config.get("vacuum.retention.hours") {
            let hours: u64 = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!("invalid vacuum.retention.hours: '{v}'"))
            })?;
            cfg.vacuum_retention = Duration::from_secs(hours * 3600);
        }
        if let Some(v) = config.get("writer.id") {
            cfg.writer_id = v.to_string();
        } else if cfg.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            return Err(ConnectorError::ConfigurationError(
                "exactly-once delivery requires an explicit 'writer.id' for stable \
                 recovery across restarts"
                    .into(),
            ));
        }

        // ── Catalog configuration ──
        if let Some(v) = config.get("catalog.type") {
            cfg.catalog_type = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid catalog.type: '{v}' (expected 'none', 'glue', or 'unity')"
                ))
            })?;
        }
        if let Some(v) = config.get("catalog.database") {
            cfg.catalog_database = Some(v.to_string());
        }
        if let Some(v) = config.get("catalog.name") {
            cfg.catalog_name = Some(v.to_string());
        }
        if let Some(v) = config.get("catalog.schema") {
            cfg.catalog_schema = Some(v.to_string());
        }
        // Unity-specific: populate workspace_url and access_token into the enum variant.
        if let DeltaCatalogType::Unity {
            ref mut workspace_url,
            ref mut access_token,
        } = cfg.catalog_type
        {
            if let Some(v) = config.get("catalog.workspace_url") {
                *workspace_url = v.to_string();
            }
            if let Some(v) = config.get("catalog.access_token") {
                *access_token = v.to_string();
            }
        }
        if let Some(v) = config.get("catalog.storage.location") {
            cfg.catalog_storage_location = Some(v.to_string());
        }
        if let Some(v) = config.get("max.commit.retries") {
            cfg.max_commit_retries = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!("invalid max.commit.retries: '{v}'"))
            })?;
        }
        if let Some(v) = config.get("write.timeout.ms") {
            let ms: u64 = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!("invalid write.timeout.ms: '{v}'"))
            })?;
            cfg.write_timeout = Duration::from_millis(ms);
        }

        // ── Parquet writer configuration ──
        if let Some(v) = config.get("parquet.compression") {
            cfg.parquet.compression = v.to_string();
        }
        if let Some(v) = config.get("parquet.compression.level") {
            cfg.parquet.compression_level = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid parquet.compression.level: '{v}'"
                ))
            })?;
        }
        if let Some(v) = config.get("parquet.compaction.compression.level") {
            cfg.parquet.compaction_compression_level = Some(v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid parquet.compaction.compression.level: '{v}'"
                ))
            })?);
        }
        if let Some(v) = config.get("parquet.dictionary.enabled") {
            cfg.parquet.dictionary_enabled = v.eq_ignore_ascii_case("true");
        }
        if let Some(v) = config.get("parquet.statistics") {
            cfg.parquet.statistics = v.to_string();
        }
        if let Some(v) = config.get("parquet.bloom.filter.columns") {
            cfg.parquet.bloom_filter_columns = v
                .split(',')
                .map(|c| c.trim().to_string())
                .filter(|c| !c.is_empty())
                .collect();
        }
        if let Some(v) = config.get("parquet.bloom.filter.fpp") {
            cfg.parquet.bloom_filter_fpp = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid parquet.bloom.filter.fpp: '{v}'"
                ))
            })?;
        }
        if let Some(v) = config.get("parquet.bloom.filter.ndv") {
            cfg.parquet.bloom_filter_ndv = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid parquet.bloom.filter.ndv: '{v}'"
                ))
            })?;
        }
        if let Some(v) = config.get("parquet.max.row.group.size") {
            cfg.parquet.max_row_group_size = v.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid parquet.max.row.group.size: '{v}'"
                ))
            })?;
        }

        // Resolve storage credentials: explicit options + environment variable fallbacks.
        let explicit_storage = config.properties_with_prefix("storage.");
        let resolved = StorageCredentialResolver::resolve(&cfg.table_path, &explicit_storage);
        cfg.storage_options = resolved.options;

        // Map LogStore configuration keys to delta-rs storage options.
        if let Some(v) = config.get("storage.s3_locking_provider") {
            cfg.storage_options
                .insert("AWS_S3_LOCKING_PROVIDER".to_string(), v.to_string());
        }
        if let Some(v) = config.get("storage.dynamodb_table_name") {
            cfg.storage_options
                .insert("DELTA_DYNAMO_TABLE_NAME".to_string(), v.to_string());
        }

        cfg.validate()?;
        Ok(cfg)
    }

    /// Formats the storage options for safe logging with secrets redacted.
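    ///
    /// A sketch of the redaction behavior (based on the tests below; the
    /// exact masking format comes from [`SecretMasker`]):
    ///
    /// ```ignore
    /// let mut cfg = DeltaLakeSinkConfig::new("s3://bucket/path");
    /// cfg.storage_options
    ///     .insert("aws_secret_access_key".into(), "TOP_SECRET".into());
    ///
    /// let display = cfg.display_storage_options();
    /// assert!(display.contains("aws_secret_access_key=***"));
    /// assert!(!display.contains("TOP_SECRET"));
    /// ```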
    #[must_use]
    pub fn display_storage_options(&self) -> String {
        SecretMasker::display_map(&self.storage_options)
    }

    /// Validates the configuration for consistency.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` on invalid combinations.
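    ///
    /// For example (a sketch; the tests below exercise each rule), upsert
    /// mode without `merge.key.columns` is rejected:
    ///
    /// ```ignore
    /// let mut cfg = DeltaLakeSinkConfig::new("/data/warehouse/trades");
    /// cfg.write_mode = DeltaWriteMode::Upsert;
    /// assert!(cfg.validate().is_err()); // missing merge.key.columns
    /// ```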
    pub fn validate(&self) -> Result<(), ConnectorError> {
        if self.table_path.is_empty() {
            return Err(ConnectorError::missing_config("table.path"));
        }
        if self.write_mode == DeltaWriteMode::Upsert && self.merge_key_columns.is_empty() {
            return Err(ConnectorError::ConfigurationError(
                "upsert mode requires 'merge.key.columns' to be set".into(),
            ));
        }
        if self.max_buffer_records == 0 {
            return Err(ConnectorError::ConfigurationError(
                "max.buffer.records must be > 0".into(),
            ));
        }
        if self.target_file_size == 0 {
            return Err(ConnectorError::ConfigurationError(
                "target.file.size must be > 0".into(),
            ));
        }
        if self.checkpoint_interval == 0 {
            return Err(ConnectorError::ConfigurationError(
                "checkpoint.interval must be > 0".into(),
            ));
        }
        if self.write_timeout < Duration::from_secs(5) {
            return Err(ConnectorError::ConfigurationError(
                "write.timeout.ms must be >= 5000 (5 seconds)".into(),
            ));
        }
        if self.compaction.check_interval.is_zero() {
            return Err(ConnectorError::ConfigurationError(
                "compaction.check-interval.ms must be > 0".into(),
            ));
        }
        if self.vacuum_retention < Duration::from_secs(86400) {
            return Err(ConnectorError::ConfigurationError(
                "vacuum.retention.hours must be >= 24 (Delta Lake safety minimum)".into(),
            ));
        }

        match self.parquet.compression.to_lowercase().as_str() {
            "zstd" | "snappy" | "lz4" | "gzip" | "none" | "uncompressed" => {}
            other => {
                return Err(ConnectorError::ConfigurationError(format!(
                    "unknown parquet.compression: '{other}' \
                     (expected 'zstd', 'snappy', 'lz4', 'gzip', or 'none')"
                )));
            }
        }
        match self.parquet.statistics.to_lowercase().as_str() {
            "none" | "chunk" | "page" => {}
            other => {
                return Err(ConnectorError::ConfigurationError(format!(
                    "unknown parquet.statistics: '{other}' (expected 'none', 'chunk', or 'page')"
                )));
            }
        }
        if self.parquet.bloom_filter_fpp <= 0.0 || self.parquet.bloom_filter_fpp >= 1.0 {
            return Err(ConnectorError::ConfigurationError(
                "parquet.bloom.filter.fpp must be in (0.0, 1.0)".into(),
            ));
        }
        if self.parquet.max_row_group_size == 0 {
            return Err(ConnectorError::ConfigurationError(
                "parquet.max.row.group.size must be > 0".into(),
            ));
        }
        // Eagerly validate that WriterProperties can be built so invalid
        // codec/level combos are caught at config time, not first write.
        #[cfg(feature = "delta-lake")]
        {
            self.parquet.to_writer_properties()?;
            self.parquet.compaction_writer_properties()?;
        }

        self.validate_catalog()?;

        // Validate cloud storage credentials (skip when catalog resolves the path).
        if self.catalog_type == DeltaCatalogType::None {
            let resolved = ResolvedStorageOptions {
                provider: StorageProvider::detect(&self.table_path),
                options: self.storage_options.clone(),
                env_resolved_keys: Vec::new(),
            };
            let cloud_result = CloudConfigValidator::validate(&resolved);
            if !cloud_result.is_valid() {
                return Err(ConnectorError::ConfigurationError(
                    cloud_result.error_message(),
                ));
            }
        }

        Ok(())
    }

    /// Validates catalog-specific requirements.
    fn validate_catalog(&self) -> Result<(), ConnectorError> {
        match &self.catalog_type {
            DeltaCatalogType::None => {}
            DeltaCatalogType::Glue => {
                #[cfg(not(feature = "delta-lake-glue"))]
                return Err(ConnectorError::ConfigurationError(
                    "Glue catalog requires the 'delta-lake-glue' feature. \
                     Build with: cargo build --features delta-lake-glue"
                        .into(),
                ));
                #[cfg(feature = "delta-lake-glue")]
                if self.catalog_database.is_none() {
                    return Err(ConnectorError::ConfigurationError(
                        "Glue catalog requires 'catalog.database' to be set".into(),
                    ));
                }
            }
            DeltaCatalogType::Unity {
                workspace_url,
                access_token,
            } => {
                #[cfg(not(feature = "delta-lake-unity"))]
                {
                    let _ = (workspace_url, access_token);
                    return Err(ConnectorError::ConfigurationError(
                        "Unity catalog requires the 'delta-lake-unity' feature. \
                         Build with: cargo build --features delta-lake-unity"
                            .into(),
                    ));
                }
                #[cfg(feature = "delta-lake-unity")]
                {
                    if workspace_url.is_empty() {
                        return Err(ConnectorError::ConfigurationError(
                            "Unity catalog requires 'catalog.workspace_url' to be set".into(),
                        ));
                    }
                    if access_token.is_empty() {
                        return Err(ConnectorError::ConfigurationError(
                            "Unity catalog requires 'catalog.access_token' to be set".into(),
                        ));
                    }
                    if self.catalog_storage_location.is_some() {
                        if self.catalog_name.is_none() {
                            return Err(ConnectorError::ConfigurationError(
                                "Unity catalog auto-create requires 'catalog.name' to be set"
                                    .into(),
                            ));
                        }
                        if self.catalog_schema.is_none() {
                            return Err(ConnectorError::ConfigurationError(
                                "Unity catalog auto-create requires 'catalog.schema' to be set"
                                    .into(),
                            ));
                        }
                    }
                }
            }
        }
        Ok(())
    }
}

/// Delta Lake write mode.
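///
/// Parsed case-insensitively via [`FromStr`]; `"merge"` is accepted as an
/// alias for `Upsert` (a sketch mirroring the enum tests below):
///
/// ```ignore
/// assert_eq!("merge".parse::<DeltaWriteMode>().unwrap(), DeltaWriteMode::Upsert);
/// assert_eq!(DeltaWriteMode::Upsert.to_string(), "upsert");
/// ```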
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeltaWriteMode {
    /// Append-only: all records are inserts. Most efficient for immutable streams.
    Append,
    /// Overwrite: replace partition contents. Used for batch-style recomputation.
    Overwrite,
    /// Upsert/Merge: CDC-style insert/update/delete via MERGE statement.
    /// Requires `merge_key_columns` to be set. Integrates with Z-sets.
    Upsert,
}

impl FromStr for DeltaWriteMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "append" => Ok(Self::Append),
            "overwrite" => Ok(Self::Overwrite),
            "upsert" | "merge" => Ok(Self::Upsert),
            other => Err(format!("unknown write mode: '{other}'")),
        }
    }
}

impl fmt::Display for DeltaWriteMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Append => write!(f, "append"),
            Self::Overwrite => write!(f, "overwrite"),
            Self::Upsert => write!(f, "upsert"),
        }
    }
}

pub use crate::connector::DeliveryGuarantee;

/// Delta Lake catalog type for table discovery.
///
/// Catalogs enable referencing tables by logical names instead of raw paths.
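///
/// Parsing `"unity"` yields a variant with empty credentials; `from_config`
/// then fills `workspace_url`/`access_token` from the `catalog.*` keys
/// (a sketch based on the catalog tests below):
///
/// ```ignore
/// assert!(matches!(
///     "unity".parse::<DeltaCatalogType>().unwrap(),
///     DeltaCatalogType::Unity { .. }
/// ));
/// ```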
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum DeltaCatalogType {
    /// No catalog — table path is a direct file or cloud URI.
    #[default]
    None,
    /// AWS Glue Data Catalog.
    Glue,
    /// Databricks Unity Catalog.
    Unity {
        /// Databricks workspace URL (e.g., `https://xxx.cloud.databricks.com`).
        workspace_url: String,
        /// Databricks access token.
        access_token: String,
    },
}

impl FromStr for DeltaCatalogType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "none" | "" => Ok(Self::None),
            "glue" => Ok(Self::Glue),
            "unity" => Ok(Self::Unity {
                workspace_url: String::new(),
                access_token: String::new(),
            }),
            other => Err(format!("unknown catalog type: '{other}'")),
        }
    }
}

impl fmt::Display for DeltaCatalogType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::None => write!(f, "none"),
            Self::Glue => write!(f, "glue"),
            Self::Unity { .. } => write!(f, "unity"),
        }
    }
}

/// Configuration for background compaction.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Whether to run automatic compaction.
    pub enabled: bool,

    /// Minimum number of files before triggering compaction.
    pub min_files_for_compaction: usize,

    /// Target file size after compaction.
    pub target_file_size: usize,

    /// Columns for Z-ORDER clustering (optional).
    pub z_order_columns: Vec<String>,

    /// How often to check if compaction is needed.
    pub check_interval: Duration,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            min_files_for_compaction: 10,
            target_file_size: 128 * 1024 * 1024, // 128 MB
            z_order_columns: Vec::new(),
            check_interval: Duration::from_secs(3600), // 60 minutes
        }
    }
}

/// Configuration for Parquet writer properties (compression, dictionary
/// encoding, statistics, bloom filters, row group sizing).
#[derive(Debug, Clone)]
pub struct ParquetWriteConfig {
    /// Compression codec: `"zstd"`, `"snappy"`, `"lz4"`, `"gzip"`, or `"none"`.
    pub compression: String,
    /// Compression level (default: 1 — ZSTD L1 for hot writes).
    pub compression_level: i32,
    /// Optional higher compression level used during compaction (default: 3).
    pub compaction_compression_level: Option<i32>,
    /// Whether to enable dictionary encoding (default: true).
    pub dictionary_enabled: bool,
    /// Statistics granularity: `"none"`, `"chunk"`, or `"page"` (default: `"page"`).
    pub statistics: String,
    /// Columns to build bloom filters for (default: empty).
    pub bloom_filter_columns: Vec<String>,
    /// Bloom filter false-positive probability (default: 0.01).
    pub bloom_filter_fpp: f64,
    /// Bloom filter expected number of distinct values (0 = parquet default).
    pub bloom_filter_ndv: u64,
    /// Maximum rows per row group (default: 1,000,000).
    pub max_row_group_size: usize,
}

impl Default for ParquetWriteConfig {
    fn default() -> Self {
        Self {
            compression: "zstd".to_string(),
            compression_level: 1,
            compaction_compression_level: Some(3),
            dictionary_enabled: true,
            statistics: "page".to_string(),
            bloom_filter_columns: Vec::new(),
            bloom_filter_fpp: 0.01,
            bloom_filter_ndv: 0,
            max_row_group_size: 1_000_000,
        }
    }
}

#[cfg(feature = "delta-lake")]
impl ParquetWriteConfig {
    /// Builds `WriterProperties` for hot-path writes (uses `compression_level`).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` on invalid codec/level.
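    ///
    /// A sketch (this impl is gated on the `delta-lake` feature, so the
    /// example is shown as ignored):
    ///
    /// ```ignore
    /// let mut cfg = ParquetWriteConfig::default(); // zstd, level 1
    /// cfg.bloom_filter_columns = vec!["user_id".to_string()];
    /// let props = cfg.to_writer_properties()?; // fails on invalid codec/level
    /// ```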
    pub fn to_writer_properties(
        &self,
    ) -> Result<deltalake::parquet::file::properties::WriterProperties, ConnectorError> {
        self.build_properties(self.compression_level)
    }

    /// Builds `WriterProperties` for compaction (uses `compaction_compression_level`
    /// if set, otherwise falls back to `compression_level`).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` on invalid codec/level.
    pub fn compaction_writer_properties(
        &self,
    ) -> Result<deltalake::parquet::file::properties::WriterProperties, ConnectorError> {
        let level = self
            .compaction_compression_level
            .unwrap_or(self.compression_level);
        self.build_properties(level)
    }

    /// Shared builder: maps string codec → `Compression`, sets dictionary,
    /// statistics, bloom filters, and row group size.
    fn build_properties(
        &self,
        level: i32,
    ) -> Result<deltalake::parquet::file::properties::WriterProperties, ConnectorError> {
        use deltalake::parquet::basic::{Compression, GzipLevel, ZstdLevel};
        use deltalake::parquet::file::properties::{EnabledStatistics, WriterProperties};
        use deltalake::parquet::schema::types::ColumnPath;

        let compression = match self.compression.to_lowercase().as_str() {
            "zstd" => {
                let zstd_level = ZstdLevel::try_new(level).map_err(|e| {
                    ConnectorError::ConfigurationError(format!("invalid ZSTD level {level}: {e}"))
                })?;
                Compression::ZSTD(zstd_level)
            }
            "snappy" => Compression::SNAPPY,
            "lz4" => Compression::LZ4_RAW,
            "gzip" => {
                let level_u32: u32 = level.try_into().map_err(|_| {
                    ConnectorError::ConfigurationError(format!(
                        "invalid GZIP level {level}: must be non-negative"
                    ))
                })?;
                let gzip_level = GzipLevel::try_new(level_u32).map_err(|e| {
                    ConnectorError::ConfigurationError(format!("invalid GZIP level {level}: {e}"))
                })?;
                Compression::GZIP(gzip_level)
            }
            "none" | "uncompressed" => Compression::UNCOMPRESSED,
            other => {
                return Err(ConnectorError::ConfigurationError(format!(
                    "unknown parquet.compression: '{other}' \
                     (expected 'zstd', 'snappy', 'lz4', 'gzip', or 'none')"
                )));
            }
        };

        let statistics = match self.statistics.to_lowercase().as_str() {
            "none" => EnabledStatistics::None,
            "chunk" => EnabledStatistics::Chunk,
            "page" => EnabledStatistics::Page,
            other => {
                return Err(ConnectorError::ConfigurationError(format!(
                    "unknown parquet.statistics: '{other}' (expected 'none', 'chunk', or 'page')"
                )));
            }
        };

        let mut builder = WriterProperties::builder()
            .set_compression(compression)
            .set_dictionary_enabled(self.dictionary_enabled)
            .set_statistics_enabled(statistics)
            .set_max_row_group_size(self.max_row_group_size);

        for col_name in &self.bloom_filter_columns {
            let col_path = ColumnPath::from(col_name.as_str());
            builder = builder
                .set_column_bloom_filter_enabled(col_path.clone(), true)
                .set_column_bloom_filter_fpp(col_path.clone(), self.bloom_filter_fpp);
            if self.bloom_filter_ndv > 0 {
                builder = builder.set_column_bloom_filter_ndv(col_path, self.bloom_filter_ndv);
            }
        }

        Ok(builder.build())
    }
}

#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
    use super::*;

    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("delta-lake");
        for (k, v) in pairs {
            config.set(*k, *v);
        }
        config
    }

    fn required_pairs() -> Vec<(&'static str, &'static str)> {
        vec![("table.path", "/data/warehouse/trades")]
    }

    // ── Config parsing tests ──

    #[test]
    fn test_parse_required_fields() {
        let config = make_config(&required_pairs());
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.table_path, "/data/warehouse/trades");
        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
        assert!(cfg.partition_columns.is_empty());
        assert!(cfg.merge_key_columns.is_empty());
        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
        assert_eq!(cfg.max_buffer_records, 100_000);
        assert_eq!(cfg.checkpoint_interval, 10);
        assert!(!cfg.schema_evolution);
        assert!(cfg.compaction.enabled);
    }

    #[test]
    fn test_missing_table_path() {
        let config = ConnectorConfig::new("delta-lake");
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_parse_all_optional_fields() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("partition.columns", "trade_date, hour"),
            ("target.file.size", "67108864"),
            ("max.buffer.records", "50000"),
            ("max.buffer.duration.ms", "30000"),
            ("checkpoint.interval", "20"),
            ("schema.evolution", "true"),
            ("write.mode", "upsert"),
            ("merge.key.columns", "customer_id, order_id"),
            ("delivery.guarantee", "at-least-once"),
            ("compaction.enabled", "true"),
            ("compaction.z-order.columns", "customer_id, product_id"),
            ("compaction.min-files", "20"),
            ("vacuum.retention.hours", "336"),
            ("writer.id", "my-writer"),
            ("storage.aws_access_key_id", "AKID123"),
            ("storage.aws_region", "us-east-1"),
        ]);
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();

        assert_eq!(cfg.partition_columns, vec!["trade_date", "hour"]);
        assert_eq!(cfg.target_file_size, 67_108_864);
        assert_eq!(cfg.max_buffer_records, 50_000);
        assert_eq!(cfg.max_buffer_duration, Duration::from_secs(30));
        assert_eq!(cfg.checkpoint_interval, 20);
        assert!(cfg.schema_evolution);
        assert_eq!(cfg.write_mode, DeltaWriteMode::Upsert);
        assert_eq!(cfg.merge_key_columns, vec!["customer_id", "order_id"]);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
        assert!(cfg.compaction.enabled);
        assert_eq!(
            cfg.compaction.z_order_columns,
            vec!["customer_id", "product_id"]
        );
        assert_eq!(cfg.compaction.min_files_for_compaction, 20);
        assert_eq!(cfg.vacuum_retention, Duration::from_secs(1209600));
        assert_eq!(cfg.writer_id, "my-writer");
        assert_eq!(
            cfg.storage_options.get("aws_access_key_id"),
            Some(&"AKID123".to_string())
        );
        assert_eq!(
            cfg.storage_options.get("aws_region"),
            Some(&"us-east-1".to_string())
        );
    }

    #[test]
    fn test_upsert_requires_merge_key() {
        let mut pairs = required_pairs();
        pairs.push(("write.mode", "upsert"));
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("merge.key.columns"), "error: {err}");
    }

    #[test]
    fn test_empty_table_path_rejected() {
        let mut cfg = DeltaLakeSinkConfig::default();
        cfg.table_path = String::new();
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_zero_max_buffer_records_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("max.buffer.records", "0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_zero_target_file_size_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("target.file.size", "0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_zero_checkpoint_interval_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("checkpoint.interval", "0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_vacuum_retention_below_24h_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("vacuum.retention.hours", "12"));
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("24"), "error: {err}");
    }

    #[test]
    fn test_vacuum_retention_24h_accepted() {
        let mut pairs = required_pairs();
        pairs.push(("vacuum.retention.hours", "24"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_compaction_target_file_size_from_config() {
        let mut pairs = required_pairs();
        pairs.push(("compaction.target-file-size", "67108864"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.compaction.target_file_size, 67_108_864);
    }

    #[test]
    fn test_compaction_target_file_size_defaults_to_sink() {
        let mut pairs = required_pairs();
        pairs.push(("target.file.size", "33554432"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.compaction.target_file_size, 33_554_432);
    }

    #[test]
    fn test_exactly_once_requires_writer_id() {
        let mut pairs = required_pairs();
        pairs.push(("delivery.guarantee", "exactly-once"));
        let config = make_config(&pairs);
        let err = DeltaLakeSinkConfig::from_config(&config).unwrap_err();
        assert!(err.to_string().contains("writer.id"), "error: {err}");
    }

    #[test]
    fn test_exactly_once_with_writer_id_ok() {
        let mut pairs = required_pairs();
        pairs.push(("delivery.guarantee", "exactly-once"));
        pairs.push(("writer.id", "my-stable-writer"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.writer_id, "my-stable-writer");
    }

    #[test]
    fn test_invalid_target_file_size() {
        let mut pairs = required_pairs();
        pairs.push(("target.file.size", "abc"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_invalid_write_mode() {
        let mut pairs = required_pairs();
        pairs.push(("write.mode", "unknown"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_storage_options_prefix_stripping() {
        let mut pairs = required_pairs();
        pairs.push(("storage.aws_access_key_id", "AKID"));
        pairs.push(("storage.aws_secret_access_key", "SECRET"));
        pairs.push(("table.path", "/data/test"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();

        assert_eq!(cfg.storage_options.len(), 2);
        assert!(cfg.storage_options.contains_key("aws_access_key_id"));
        assert!(cfg.storage_options.contains_key("aws_secret_access_key"));
        assert!(!cfg
            .storage_options
            .contains_key("storage.aws_access_key_id"));
    }

    #[test]
    fn test_defaults() {
        let cfg = DeltaLakeSinkConfig::default();
        assert!(cfg.table_path.is_empty());
        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
        assert_eq!(cfg.max_buffer_records, 100_000);
        assert_eq!(cfg.max_buffer_duration, Duration::from_secs(60));
        assert_eq!(cfg.checkpoint_interval, 10);
        assert!(!cfg.schema_evolution);
        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
        assert!(!cfg.writer_id.is_empty());
        assert_eq!(cfg.max_commit_retries, 3);
    }

    #[test]
    fn test_max_commit_retries_from_config() {
        let mut pairs = required_pairs();
        pairs.push(("max.commit.retries", "5"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.max_commit_retries, 5);
    }

    #[test]
    fn test_max_commit_retries_invalid() {
        let mut pairs = required_pairs();
        pairs.push(("max.commit.retries", "abc"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_new_helper() {
        let cfg = DeltaLakeSinkConfig::new("/tmp/test_table");
        assert_eq!(cfg.table_path, "/tmp/test_table");
        assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
    }

    // ── Enum tests ──

    #[test]
    fn test_write_mode_parse() {
        assert_eq!(
            "append".parse::<DeltaWriteMode>().unwrap(),
            DeltaWriteMode::Append
        );
        assert_eq!(
            "overwrite".parse::<DeltaWriteMode>().unwrap(),
            DeltaWriteMode::Overwrite
        );
        assert_eq!(
            "upsert".parse::<DeltaWriteMode>().unwrap(),
            DeltaWriteMode::Upsert
        );
        assert_eq!(
            "merge".parse::<DeltaWriteMode>().unwrap(),
            DeltaWriteMode::Upsert
        );
        assert!("unknown".parse::<DeltaWriteMode>().is_err());
    }

    #[test]
    fn test_write_mode_display() {
        assert_eq!(DeltaWriteMode::Append.to_string(), "append");
        assert_eq!(DeltaWriteMode::Overwrite.to_string(), "overwrite");
        assert_eq!(DeltaWriteMode::Upsert.to_string(), "upsert");
    }

    #[test]
    fn test_delivery_guarantee_parse() {
        assert_eq!(
            "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
            DeliveryGuarantee::AtLeastOnce
        );
        assert_eq!(
            "at_least_once".parse::<DeliveryGuarantee>().unwrap(),
            DeliveryGuarantee::AtLeastOnce
        );
        assert_eq!(
            "exactly-once".parse::<DeliveryGuarantee>().unwrap(),
            DeliveryGuarantee::ExactlyOnce
        );
        assert_eq!(
            "exactly_once".parse::<DeliveryGuarantee>().unwrap(),
            DeliveryGuarantee::ExactlyOnce
        );
        assert!("unknown".parse::<DeliveryGuarantee>().is_err());
    }

    #[test]
    fn test_delivery_guarantee_display() {
        assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
        assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
    }

    #[test]
    fn test_compaction_config_defaults() {
        let cfg = CompactionConfig::default();
        assert!(cfg.enabled);
        assert_eq!(cfg.min_files_for_compaction, 10);
        assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
        assert!(cfg.z_order_columns.is_empty());
        assert_eq!(cfg.check_interval, Duration::from_secs(3600));
    }

    #[test]
    fn test_partition_columns_empty_filter() {
        let mut pairs = required_pairs();
        pairs.push(("partition.columns", "a,,b, ,c"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.partition_columns, vec!["a", "b", "c"]);
    }

    // ── Cloud storage integration tests ──

    #[test]
    fn test_s3_path_requires_region() {
        let config = make_config(&[("table.path", "s3://my-bucket/trades")]);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("aws_region"), "error: {err}");
    }

    #[test]
    fn test_s3_path_with_region_and_credentials() {
        let config = make_config(&[
            ("table.path", "s3://my-bucket/trades"),
            ("storage.aws_region", "us-east-1"),
            ("storage.aws_access_key_id", "AKID123"),
            ("storage.aws_secret_access_key", "SECRET"),
        ]);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.storage_options["aws_region"], "us-east-1");
        assert_eq!(cfg.storage_options["aws_access_key_id"], "AKID123");
    }

    #[test]
    fn test_s3_path_with_region_only_warns_no_error() {
        // Missing credentials is a warning (IAM fallback), not a hard error.
        let config = make_config(&[
            ("table.path", "s3://my-bucket/trades"),
            ("storage.aws_region", "us-east-1"),
        ]);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_s3_path_access_key_without_secret_errors() {
        let config = make_config(&[
            ("table.path", "s3://my-bucket/trades"),
            ("storage.aws_region", "us-east-1"),
            ("storage.aws_access_key_id", "AKID123"),
        ]);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("aws_secret_access_key"), "error: {err}");
    }

    #[test]
    fn test_azure_path_requires_account_name() {
        let config = make_config(&[("table.path", "az://my-container/trades")]);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("azure_storage_account_name"), "error: {err}");
    }

    #[test]
    fn test_azure_path_with_account_name_and_key() {
        let config = make_config(&[
            ("table.path", "az://my-container/trades"),
            ("storage.azure_storage_account_name", "myaccount"),
            ("storage.azure_storage_account_key", "base64key=="),
        ]);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_gcs_path_always_valid() {
        // GCS missing credentials is warning-only (Application Default Credentials).
        let config = make_config(&[("table.path", "gs://my-bucket/trades")]);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_local_path_no_cloud_validation() {
        let config = make_config(&[("table.path", "/data/warehouse/trades")]);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
    }

    #[test]
    fn test_display_storage_options_redacts_secrets() {
        let mut cfg = DeltaLakeSinkConfig::new("s3://bucket/path");
        cfg.storage_options
            .insert("aws_region".to_string(), "us-east-1".to_string());
        cfg.storage_options.insert(
            "aws_secret_access_key".to_string(),
            "TOP_SECRET".to_string(),
        );

        let display = cfg.display_storage_options();
        assert!(display.contains("aws_region=us-east-1"));
        assert!(display.contains("aws_secret_access_key=***"));
        assert!(!display.contains("TOP_SECRET"));
    }

    #[test]
    fn test_display_storage_options_empty() {
        let cfg = DeltaLakeSinkConfig::new("/local/path");
        assert!(cfg.display_storage_options().is_empty());
    }

    // ── Catalog tests ──

    #[test]
    fn test_catalog_type_parse() {
        assert_eq!(
            "none".parse::<DeltaCatalogType>().unwrap(),
            DeltaCatalogType::None
        );
        assert_eq!(
            "glue".parse::<DeltaCatalogType>().unwrap(),
            DeltaCatalogType::Glue
        );
        assert!(matches!(
            "unity".parse::<DeltaCatalogType>().unwrap(),
            DeltaCatalogType::Unity { .. }
        ));
        assert!("unknown".parse::<DeltaCatalogType>().is_err());
    }

    #[test]
    fn test_catalog_type_display() {
        assert_eq!(DeltaCatalogType::None.to_string(), "none");
        assert_eq!(DeltaCatalogType::Glue.to_string(), "glue");
        assert_eq!(
            DeltaCatalogType::Unity {
                workspace_url: "url".into(),
                access_token: "tok".into()
            }
            .to_string(),
            "unity"
        );
    }

    #[test]
    fn test_catalog_none_default() {
        let config = make_config(&required_pairs());
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.catalog_type, DeltaCatalogType::None);
        assert!(cfg.catalog_database.is_none());
        assert!(cfg.catalog_name.is_none());
        assert!(cfg.catalog_schema.is_none());
        assert!(cfg.catalog_storage_location.is_none());
    }

    #[cfg(feature = "delta-lake-glue")]
    #[test]
    fn test_catalog_glue_valid() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "glue"),
            ("catalog.database", "my_database"),
        ]);
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.catalog_type, DeltaCatalogType::Glue);
        assert_eq!(cfg.catalog_database.as_deref(), Some("my_database"));
    }

    #[cfg(feature = "delta-lake-glue")]
    #[test]
    fn test_catalog_glue_missing_database() {
        let mut pairs = required_pairs();
        pairs.push(("catalog.type", "glue"));
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("catalog.database"), "error: {err}");
    }

    #[cfg(feature = "delta-lake-unity")]
    #[test]
    fn test_catalog_unity_valid() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "unity"),
            ("catalog.workspace_url", "https://my.databricks.com"),
            ("catalog.access_token", "dapi123"),
            ("catalog.name", "main"),
            ("catalog.schema", "default"),
        ]);
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert!(matches!(
            cfg.catalog_type,
            DeltaCatalogType::Unity {
                ref workspace_url,
                ref access_token
            }
            if workspace_url == "https://my.databricks.com"
                && access_token == "dapi123"
        ));
        assert_eq!(cfg.catalog_name.as_deref(), Some("main"));
        assert_eq!(cfg.catalog_schema.as_deref(), Some("default"));
    }

    #[cfg(feature = "delta-lake-unity")]
    #[test]
    fn test_catalog_unity_missing_workspace_url() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "unity"),
            ("catalog.access_token", "dapi123"),
            ("catalog.name", "main"),
            ("catalog.schema", "default"),
        ]);
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("workspace_url"), "error: {err}");
    }

    #[cfg(feature = "delta-lake-unity")]
    #[test]
    fn test_catalog_unity_missing_access_token() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("catalog.type", "unity"),
            ("catalog.workspace_url", "https://my.databricks.com"),
            ("catalog.name", "main"),
            ("catalog.schema", "default"),
        ]);
        let config = make_config(&pairs);
        let result = DeltaLakeSinkConfig::from_config(&config);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("access_token"), "error: {err}");
    }

    #[test]
    fn test_catalog_storage_location_default_none() {
        let config = make_config(&required_pairs());
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert!(cfg.catalog_storage_location.is_none());
    }

    #[test]
    fn test_catalog_storage_location_parsed() {
        let mut pairs = required_pairs();
        pairs.push(("catalog.storage.location", "s3://bucket/warehouse/table"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(
            cfg.catalog_storage_location.as_deref(),
            Some("s3://bucket/warehouse/table")
        );
    }

    // ── Parquet config tests ──

    #[test]
    fn test_parquet_config_defaults() {
        let cfg = ParquetWriteConfig::default();
        assert_eq!(cfg.compression, "zstd");
        assert_eq!(cfg.compression_level, 1);
        assert_eq!(cfg.compaction_compression_level, Some(3));
        assert!(cfg.dictionary_enabled);
        assert_eq!(cfg.statistics, "page");
        assert!(cfg.bloom_filter_columns.is_empty());
        assert!((cfg.bloom_filter_fpp - 0.01).abs() < f64::EPSILON);
        assert_eq!(cfg.bloom_filter_ndv, 0);
        assert_eq!(cfg.max_row_group_size, 1_000_000);
    }

    #[test]
    fn test_parquet_compression_parsing() {
        for codec in &["zstd", "snappy", "lz4", "gzip", "none"] {
            let mut pairs = required_pairs();
            pairs.push(("parquet.compression", codec));
            let config = make_config(&pairs);
            let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
            assert_eq!(cfg.parquet.compression, *codec);
        }
    }

    #[test]
    fn test_parquet_compression_level_parsing() {
        let mut pairs = required_pairs();
        pairs.push(("parquet.compression.level", "5"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.parquet.compression_level, 5);
    }

    #[test]
    fn test_parquet_compression_level_invalid() {
        let mut pairs = required_pairs();
        pairs.push(("parquet.compression.level", "abc"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_parquet_compaction_compression_level() {
        let mut pairs = required_pairs();
        pairs.push(("parquet.compaction.compression.level", "7"));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.parquet.compaction_compression_level, Some(7));
    }

    #[test]
    fn test_parquet_bloom_filter_columns_parsing() {
        let mut pairs = required_pairs();
        pairs.push((
            "parquet.bloom.filter.columns",
            " user_id , event_type , ts ",
        ));
        let config = make_config(&pairs);
        let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
        assert_eq!(
            cfg.parquet.bloom_filter_columns,
            vec!["user_id", "event_type", "ts"]
        );
    }

    #[test]
    fn test_parquet_bloom_filter_fpp_validation() {
        // fpp = 0.0 should be rejected
        let mut pairs = required_pairs();
        pairs.push(("parquet.bloom.filter.fpp", "0.0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());

        // fpp = 1.0 should be rejected
        let mut pairs = required_pairs();
        pairs.push(("parquet.bloom.filter.fpp", "1.0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_parquet_max_row_group_size_zero_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("parquet.max.row.group.size", "0"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_parquet_statistics_parsing() {
        for stat in &["none", "chunk", "page"] {
            let mut pairs = required_pairs();
            pairs.push(("parquet.statistics", stat));
            let config = make_config(&pairs);
            let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
            assert_eq!(cfg.parquet.statistics, *stat);
        }
    }

    #[test]
    fn test_parquet_invalid_statistics_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("parquet.statistics", "full"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_parquet_invalid_compression_rejected() {
        let mut pairs = required_pairs();
        pairs.push(("parquet.compression", "brotli"));
        let config = make_config(&pairs);
        assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
    }

    #[cfg(feature = "delta-lake")]
    #[test]
    fn test_writer_properties_default_zstd() {
        let cfg = ParquetWriteConfig::default();
        assert!(cfg.to_writer_properties().is_ok());
    }

    #[cfg(feature = "delta-lake")]
    #[test]
    fn test_compaction_writer_properties_higher_level() {
        let cfg = ParquetWriteConfig::default();
        // Should succeed and use level 3 for compaction.
        assert!(cfg.compaction_writer_properties().is_ok());
    }

    #[cfg(feature = "delta-lake")]
    #[test]
    fn test_writer_properties_invalid_codec() {
        let mut cfg = ParquetWriteConfig::default();
        cfg.compression = "brotli".to_string();
        assert!(cfg.to_writer_properties().is_err());
    }

    #[cfg(feature = "delta-lake")]
    #[test]
    fn test_writer_properties_with_bloom_filters() {
        let mut cfg = ParquetWriteConfig::default();
        cfg.bloom_filter_columns = vec!["user_id".to_string(), "event_type".to_string()];
        assert!(cfg.to_writer_properties().is_ok());
    }
}