1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
6use std::fmt;
7use std::str::FromStr;
8use std::time::Duration;
9
10use crate::config::ConnectorConfig;
11use crate::error::ConnectorError;
12use crate::storage::{
13 CloudConfigValidator, ResolvedStorageOptions, SecretMasker, StorageCredentialResolver,
14 StorageProvider,
15};
16
/// Configuration for the Delta Lake sink connector.
///
/// Built from connector properties via [`DeltaLakeSinkConfig::from_config`]
/// and checked by [`DeltaLakeSinkConfig::validate`].
#[derive(Debug, Clone)]
pub struct DeltaLakeSinkConfig {
    /// URI or local path of the target Delta table (required; `table.path`).
    pub table_path: String,

    /// Columns the table is partitioned by; empty means unpartitioned.
    pub partition_columns: Vec<String>,

    /// Target size in bytes for data files written by the sink (must be > 0).
    pub target_file_size: usize,

    /// Maximum records buffered before a flush is forced (must be > 0).
    pub max_buffer_records: usize,

    /// Maximum time records may sit in the buffer before a flush is forced.
    pub max_buffer_duration: Duration,

    /// Checkpoint interval (`checkpoint.interval`); must be > 0.
    /// Presumably the number of commits between Delta log checkpoints.
    pub checkpoint_interval: u64,

    /// Whether incoming records may evolve the table schema.
    pub schema_evolution: bool,

    /// How records are written: append, overwrite, or upsert.
    pub write_mode: DeltaWriteMode,

    /// Key columns used to match rows in upsert mode (required for upsert).
    pub merge_key_columns: Vec<String>,

    /// Backend storage options (credentials, region, locking provider, ...),
    /// resolved from `storage.*` properties and the environment.
    pub storage_options: HashMap<String, String>,

    /// Background compaction settings.
    pub compaction: CompactionConfig,

    /// VACUUM retention window; validated to be at least 24 hours.
    pub vacuum_retention: Duration,

    /// Delivery guarantee; exactly-once requires an explicit `writer.id`.
    pub delivery_guarantee: DeliveryGuarantee,

    /// Identifier for this writer; random by default, must be stable across
    /// restarts for exactly-once delivery.
    pub writer_id: String,

    /// Catalog the table is registered in, if any.
    pub catalog_type: DeltaCatalogType,

    /// Database name for Glue catalog registration (`catalog.database`).
    pub catalog_database: Option<String>,

    /// Catalog name for Unity catalog registration (`catalog.name`).
    pub catalog_name: Option<String>,

    /// Schema name for Unity catalog registration (`catalog.schema`).
    pub catalog_schema: Option<String>,

    /// Storage location used for catalog auto-create
    /// (`catalog.storage.location`).
    pub catalog_storage_location: Option<String>,

    /// Maximum number of retries for a Delta commit.
    pub max_commit_retries: u32,

    /// Timeout applied to write operations; validated to be >= 5 seconds.
    pub write_timeout: Duration,

    /// Parquet writer tuning options.
    pub parquet: ParquetWriteConfig,
}
92
impl Default for DeltaLakeSinkConfig {
    /// Baseline settings: empty table path (must be filled in before use),
    /// append mode, at-least-once delivery, and a freshly generated writer id.
    fn default() -> Self {
        Self {
            table_path: String::new(),
            partition_columns: Vec::new(),
            // 128 MiB target data files.
            target_file_size: 128 * 1024 * 1024,
            max_buffer_records: 100_000,
            max_buffer_duration: Duration::from_secs(60),
            checkpoint_interval: 10,
            schema_evolution: false,
            write_mode: DeltaWriteMode::Append,
            merge_key_columns: Vec::new(),
            storage_options: HashMap::new(),
            compaction: CompactionConfig::default(),
            // 7 days; validate() enforces a 24-hour minimum.
            vacuum_retention: Duration::from_secs(7 * 24 * 3600),
            delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
            // Random id per instance; exactly-once mode requires the caller
            // to supply a stable `writer.id` instead (enforced in from_config).
            writer_id: uuid::Uuid::new_v4().to_string(),
            catalog_type: DeltaCatalogType::None,
            catalog_database: None,
            catalog_name: None,
            catalog_schema: None,
            catalog_storage_location: None,
            max_commit_retries: 3,
            write_timeout: Duration::from_secs(30),
            parquet: ParquetWriteConfig::default(),
        }
    }
}
121
122impl DeltaLakeSinkConfig {
123 #[must_use]
125 pub fn new(table_path: &str) -> Self {
126 Self {
127 table_path: table_path.to_string(),
128 ..Default::default()
129 }
130 }
131
132 #[allow(clippy::too_many_lines)]
143 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
144 let mut cfg = Self {
145 table_path: config.require("table.path")?.to_string(),
146 ..Self::default()
147 };
148
149 if let Some(v) = config.get("partition.columns") {
150 cfg.partition_columns = v
151 .split(',')
152 .map(|c| c.trim().to_string())
153 .filter(|c| !c.is_empty())
154 .collect();
155 }
156 if let Some(v) = config.get("target.file.size") {
157 cfg.target_file_size = v.parse().map_err(|_| {
158 ConnectorError::ConfigurationError(format!("invalid target.file.size: '{v}'"))
159 })?;
160 }
161 if let Some(v) = config.get("max.buffer.records") {
162 cfg.max_buffer_records = v.parse().map_err(|_| {
163 ConnectorError::ConfigurationError(format!("invalid max.buffer.records: '{v}'"))
164 })?;
165 }
166 if let Some(v) = config.get("max.buffer.duration.ms") {
167 let ms: u64 = v.parse().map_err(|_| {
168 ConnectorError::ConfigurationError(format!("invalid max.buffer.duration.ms: '{v}'"))
169 })?;
170 cfg.max_buffer_duration = Duration::from_millis(ms);
171 }
172 if let Some(v) = config.get("checkpoint.interval") {
173 cfg.checkpoint_interval = v.parse().map_err(|_| {
174 ConnectorError::ConfigurationError(format!("invalid checkpoint.interval: '{v}'"))
175 })?;
176 }
177 if let Some(v) = config.get("schema.evolution") {
178 cfg.schema_evolution = v.eq_ignore_ascii_case("true");
179 }
180 if let Some(v) = config.get("write.mode") {
181 cfg.write_mode = v.parse().map_err(|_| {
182 ConnectorError::ConfigurationError(format!(
183 "invalid write.mode: '{v}' (expected 'append', 'overwrite', or 'upsert')"
184 ))
185 })?;
186 }
187 if let Some(v) = config.get("merge.key.columns") {
188 cfg.merge_key_columns = v
189 .split(',')
190 .map(|c| c.trim().to_string())
191 .filter(|c| !c.is_empty())
192 .collect();
193 }
194 if let Some(v) = config.get("delivery.guarantee") {
195 cfg.delivery_guarantee = v.parse().map_err(|_| {
196 ConnectorError::ConfigurationError(format!(
197 "invalid delivery.guarantee: '{v}' \
198 (expected 'exactly-once' or 'at-least-once')"
199 ))
200 })?;
201 }
202 if let Some(v) = config.get("compaction.enabled") {
203 cfg.compaction.enabled = v.eq_ignore_ascii_case("true");
204 }
205 if let Some(v) = config.get("compaction.z-order.columns") {
206 cfg.compaction.z_order_columns = v
207 .split(',')
208 .map(|c| c.trim().to_string())
209 .filter(|c| !c.is_empty())
210 .collect();
211 }
212 if let Some(v) = config.get("compaction.target-file-size") {
213 cfg.compaction.target_file_size = v.parse().map_err(|_| {
214 ConnectorError::ConfigurationError(format!(
215 "invalid compaction.target-file-size: '{v}'"
216 ))
217 })?;
218 } else {
219 cfg.compaction.target_file_size = cfg.target_file_size;
221 }
222 if let Some(v) = config.get("compaction.min-files") {
223 cfg.compaction.min_files_for_compaction = v.parse().map_err(|_| {
224 ConnectorError::ConfigurationError(format!("invalid compaction.min-files: '{v}'"))
225 })?;
226 }
227 if let Some(v) = config.get("compaction.check-interval.ms") {
228 let ms: u64 = v.parse().map_err(|_| {
229 ConnectorError::ConfigurationError(format!(
230 "invalid compaction.check-interval.ms: '{v}'"
231 ))
232 })?;
233 if ms == 0 {
234 return Err(ConnectorError::ConfigurationError(
235 "compaction.check-interval.ms must be > 0".into(),
236 ));
237 }
238 cfg.compaction.check_interval = Duration::from_millis(ms);
239 }
240 if let Some(v) = config.get("vacuum.retention.hours") {
241 let hours: u64 = v.parse().map_err(|_| {
242 ConnectorError::ConfigurationError(format!("invalid vacuum.retention.hours: '{v}'"))
243 })?;
244 cfg.vacuum_retention = Duration::from_secs(hours * 3600);
245 }
246 if let Some(v) = config.get("writer.id") {
247 cfg.writer_id = v.to_string();
248 } else if cfg.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
249 return Err(ConnectorError::ConfigurationError(
250 "exactly-once delivery requires an explicit 'writer.id' for stable \
251 recovery across restarts"
252 .into(),
253 ));
254 }
255
256 if let Some(v) = config.get("catalog.type") {
258 cfg.catalog_type = v.parse().map_err(|_| {
259 ConnectorError::ConfigurationError(format!(
260 "invalid catalog.type: '{v}' (expected 'none', 'glue', or 'unity')"
261 ))
262 })?;
263 }
264 if let Some(v) = config.get("catalog.database") {
265 cfg.catalog_database = Some(v.to_string());
266 }
267 if let Some(v) = config.get("catalog.name") {
268 cfg.catalog_name = Some(v.to_string());
269 }
270 if let Some(v) = config.get("catalog.schema") {
271 cfg.catalog_schema = Some(v.to_string());
272 }
273 if let DeltaCatalogType::Unity {
275 ref mut workspace_url,
276 ref mut access_token,
277 } = cfg.catalog_type
278 {
279 if let Some(v) = config.get("catalog.workspace_url") {
280 *workspace_url = v.to_string();
281 }
282 if let Some(v) = config.get("catalog.access_token") {
283 *access_token = v.to_string();
284 }
285 }
286 if let Some(v) = config.get("catalog.storage.location") {
287 cfg.catalog_storage_location = Some(v.to_string());
288 }
289 if let Some(v) = config.get("max.commit.retries") {
290 cfg.max_commit_retries = v.parse().map_err(|_| {
291 ConnectorError::ConfigurationError(format!("invalid max.commit.retries: '{v}'"))
292 })?;
293 }
294 if let Some(v) = config.get("write.timeout.ms") {
295 let ms: u64 = v.parse().map_err(|_| {
296 ConnectorError::ConfigurationError(format!("invalid write.timeout.ms: '{v}'"))
297 })?;
298 cfg.write_timeout = Duration::from_millis(ms);
299 }
300
301 if let Some(v) = config.get("parquet.compression") {
303 cfg.parquet.compression = v.to_string();
304 }
305 if let Some(v) = config.get("parquet.compression.level") {
306 cfg.parquet.compression_level = v.parse().map_err(|_| {
307 ConnectorError::ConfigurationError(format!(
308 "invalid parquet.compression.level: '{v}'"
309 ))
310 })?;
311 }
312 if let Some(v) = config.get("parquet.compaction.compression.level") {
313 cfg.parquet.compaction_compression_level = Some(v.parse().map_err(|_| {
314 ConnectorError::ConfigurationError(format!(
315 "invalid parquet.compaction.compression.level: '{v}'"
316 ))
317 })?);
318 }
319 if let Some(v) = config.get("parquet.dictionary.enabled") {
320 cfg.parquet.dictionary_enabled = v.eq_ignore_ascii_case("true");
321 }
322 if let Some(v) = config.get("parquet.statistics") {
323 cfg.parquet.statistics = v.to_string();
324 }
325 if let Some(v) = config.get("parquet.bloom.filter.columns") {
326 cfg.parquet.bloom_filter_columns = v
327 .split(',')
328 .map(|c| c.trim().to_string())
329 .filter(|c| !c.is_empty())
330 .collect();
331 }
332 if let Some(v) = config.get("parquet.bloom.filter.fpp") {
333 cfg.parquet.bloom_filter_fpp = v.parse().map_err(|_| {
334 ConnectorError::ConfigurationError(format!(
335 "invalid parquet.bloom.filter.fpp: '{v}'"
336 ))
337 })?;
338 }
339 if let Some(v) = config.get("parquet.bloom.filter.ndv") {
340 cfg.parquet.bloom_filter_ndv = v.parse().map_err(|_| {
341 ConnectorError::ConfigurationError(format!(
342 "invalid parquet.bloom.filter.ndv: '{v}'"
343 ))
344 })?;
345 }
346 if let Some(v) = config.get("parquet.max.row.group.size") {
347 cfg.parquet.max_row_group_size = v.parse().map_err(|_| {
348 ConnectorError::ConfigurationError(format!(
349 "invalid parquet.max.row.group.size: '{v}'"
350 ))
351 })?;
352 }
353
354 let explicit_storage = config.properties_with_prefix("storage.");
356 let resolved = StorageCredentialResolver::resolve(&cfg.table_path, &explicit_storage);
357 cfg.storage_options = resolved.options;
358
359 if let Some(v) = config.get("storage.s3_locking_provider") {
361 cfg.storage_options
362 .insert("AWS_S3_LOCKING_PROVIDER".to_string(), v.to_string());
363 }
364 if let Some(v) = config.get("storage.dynamodb_table_name") {
365 cfg.storage_options
366 .insert("DELTA_DYNAMO_TABLE_NAME".to_string(), v.to_string());
367 }
368
369 cfg.validate()?;
370 Ok(cfg)
371 }
372
373 #[must_use]
375 pub fn display_storage_options(&self) -> String {
376 SecretMasker::display_map(&self.storage_options)
377 }
378
379 pub fn validate(&self) -> Result<(), ConnectorError> {
385 if self.table_path.is_empty() {
386 return Err(ConnectorError::missing_config("table.path"));
387 }
388 if self.write_mode == DeltaWriteMode::Upsert && self.merge_key_columns.is_empty() {
389 return Err(ConnectorError::ConfigurationError(
390 "upsert mode requires 'merge.key.columns' to be set".into(),
391 ));
392 }
393 if self.max_buffer_records == 0 {
394 return Err(ConnectorError::ConfigurationError(
395 "max.buffer.records must be > 0".into(),
396 ));
397 }
398 if self.target_file_size == 0 {
399 return Err(ConnectorError::ConfigurationError(
400 "target.file.size must be > 0".into(),
401 ));
402 }
403 if self.checkpoint_interval == 0 {
404 return Err(ConnectorError::ConfigurationError(
405 "checkpoint.interval must be > 0".into(),
406 ));
407 }
408 if self.write_timeout < Duration::from_secs(5) {
409 return Err(ConnectorError::ConfigurationError(
410 "write.timeout.ms must be >= 5000 (5 seconds)".into(),
411 ));
412 }
413 if self.compaction.check_interval.is_zero() {
414 return Err(ConnectorError::ConfigurationError(
415 "compaction.check-interval.ms must be > 0".into(),
416 ));
417 }
418 if self.vacuum_retention < Duration::from_secs(86400) {
419 return Err(ConnectorError::ConfigurationError(
420 "vacuum.retention.hours must be >= 24 (Delta Lake safety minimum)".into(),
421 ));
422 }
423
424 match self.parquet.compression.to_lowercase().as_str() {
425 "zstd" | "snappy" | "lz4" | "gzip" | "none" | "uncompressed" => {}
426 other => {
427 return Err(ConnectorError::ConfigurationError(format!(
428 "unknown parquet.compression: '{other}' \
429 (expected 'zstd', 'snappy', 'lz4', 'gzip', or 'none')"
430 )));
431 }
432 }
433 match self.parquet.statistics.to_lowercase().as_str() {
434 "none" | "chunk" | "page" => {}
435 other => {
436 return Err(ConnectorError::ConfigurationError(format!(
437 "unknown parquet.statistics: '{other}' (expected 'none', 'chunk', or 'page')"
438 )));
439 }
440 }
441 if self.parquet.bloom_filter_fpp <= 0.0 || self.parquet.bloom_filter_fpp >= 1.0 {
442 return Err(ConnectorError::ConfigurationError(
443 "parquet.bloom.filter.fpp must be in (0.0, 1.0)".into(),
444 ));
445 }
446 if self.parquet.max_row_group_size == 0 {
447 return Err(ConnectorError::ConfigurationError(
448 "parquet.max.row.group.size must be > 0".into(),
449 ));
450 }
451 #[cfg(feature = "delta-lake")]
454 {
455 self.parquet.to_writer_properties()?;
456 self.parquet.compaction_writer_properties()?;
457 }
458
459 self.validate_catalog()?;
460
461 if self.catalog_type == DeltaCatalogType::None {
463 let resolved = ResolvedStorageOptions {
464 provider: StorageProvider::detect(&self.table_path),
465 options: self.storage_options.clone(),
466 env_resolved_keys: Vec::new(),
467 };
468 let cloud_result = CloudConfigValidator::validate(&resolved);
469 if !cloud_result.is_valid() {
470 return Err(ConnectorError::ConfigurationError(
471 cloud_result.error_message(),
472 ));
473 }
474 }
475
476 Ok(())
477 }
478
479 fn validate_catalog(&self) -> Result<(), ConnectorError> {
481 match &self.catalog_type {
482 DeltaCatalogType::None => {}
483 DeltaCatalogType::Glue => {
484 #[cfg(not(feature = "delta-lake-glue"))]
485 return Err(ConnectorError::ConfigurationError(
486 "Glue catalog requires the 'delta-lake-glue' feature. \
487 Build with: cargo build --features delta-lake-glue"
488 .into(),
489 ));
490 #[cfg(feature = "delta-lake-glue")]
491 if self.catalog_database.is_none() {
492 return Err(ConnectorError::ConfigurationError(
493 "Glue catalog requires 'catalog.database' to be set".into(),
494 ));
495 }
496 }
497 DeltaCatalogType::Unity {
498 workspace_url,
499 access_token,
500 } => {
501 #[cfg(not(feature = "delta-lake-unity"))]
502 {
503 let _ = (workspace_url, access_token);
504 return Err(ConnectorError::ConfigurationError(
505 "Unity catalog requires the 'delta-lake-unity' feature. \
506 Build with: cargo build --features delta-lake-unity"
507 .into(),
508 ));
509 }
510 #[cfg(feature = "delta-lake-unity")]
511 {
512 if workspace_url.is_empty() {
513 return Err(ConnectorError::ConfigurationError(
514 "Unity catalog requires 'catalog.workspace_url' to be set".into(),
515 ));
516 }
517 if access_token.is_empty() {
518 return Err(ConnectorError::ConfigurationError(
519 "Unity catalog requires 'catalog.access_token' to be set".into(),
520 ));
521 }
522 if self.catalog_storage_location.is_some() {
523 if self.catalog_name.is_none() {
524 return Err(ConnectorError::ConfigurationError(
525 "Unity catalog auto-create requires 'catalog.name' to be set"
526 .into(),
527 ));
528 }
529 if self.catalog_schema.is_none() {
530 return Err(ConnectorError::ConfigurationError(
531 "Unity catalog auto-create requires 'catalog.schema' to be set"
532 .into(),
533 ));
534 }
535 }
536 }
537 }
538 }
539 Ok(())
540 }
541}
542
/// How records are written to the Delta table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeltaWriteMode {
    /// Append new rows without touching existing data.
    Append,
    /// Overwrite mode.
    Overwrite,
    /// Upsert (merge); requires `merge.key.columns` to be configured.
    Upsert,
}
554
555impl FromStr for DeltaWriteMode {
556 type Err = String;
557
558 fn from_str(s: &str) -> Result<Self, Self::Err> {
559 match s.to_lowercase().as_str() {
560 "append" => Ok(Self::Append),
561 "overwrite" => Ok(Self::Overwrite),
562 "upsert" | "merge" => Ok(Self::Upsert),
563 other => Err(format!("unknown write mode: '{other}'")),
564 }
565 }
566}
567
568impl fmt::Display for DeltaWriteMode {
569 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
570 match self {
571 Self::Append => write!(f, "append"),
572 Self::Overwrite => write!(f, "overwrite"),
573 Self::Upsert => write!(f, "upsert"),
574 }
575 }
576}
577
578pub use crate::connector::DeliveryGuarantee;
579
/// Catalog the Delta table is registered in, if any.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum DeltaCatalogType {
    /// No catalog; the table is addressed directly by its storage path.
    #[default]
    None,
    /// AWS Glue catalog; requires the `delta-lake-glue` feature and
    /// `catalog.database`.
    Glue,
    /// Databricks Unity catalog; requires the `delta-lake-unity` feature.
    Unity {
        /// Workspace URL, filled from `catalog.workspace_url`.
        workspace_url: String,
        /// Access token, filled from `catalog.access_token`.
        access_token: String,
    },
}
598
599impl FromStr for DeltaCatalogType {
600 type Err = String;
601
602 fn from_str(s: &str) -> Result<Self, Self::Err> {
603 match s.to_lowercase().as_str() {
604 "none" | "" => Ok(Self::None),
605 "glue" => Ok(Self::Glue),
606 "unity" => Ok(Self::Unity {
607 workspace_url: String::new(),
608 access_token: String::new(),
609 }),
610 other => Err(format!("unknown catalog type: '{other}'")),
611 }
612 }
613}
614
615impl fmt::Display for DeltaCatalogType {
616 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617 match self {
618 Self::None => write!(f, "none"),
619 Self::Glue => write!(f, "glue"),
620 Self::Unity { .. } => write!(f, "unity"),
621 }
622 }
623}
624
/// Background compaction settings for the Delta table.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Whether background compaction is enabled.
    pub enabled: bool,

    /// Minimum file count before a compaction pass is triggered
    /// (`compaction.min-files`).
    pub min_files_for_compaction: usize,

    /// Target output file size in bytes; defaults to the sink's
    /// `target.file.size` when not set explicitly.
    pub target_file_size: usize,

    /// Z-order clustering columns; empty means no Z-ordering.
    pub z_order_columns: Vec<String>,

    /// How often to check whether compaction is needed (must be > 0).
    pub check_interval: Duration,
}
643
impl Default for CompactionConfig {
    /// Compaction on by default: 10-file trigger, 128 MiB targets, hourly
    /// checks, no Z-ordering.
    fn default() -> Self {
        Self {
            enabled: true,
            min_files_for_compaction: 10,
            // 128 MiB, matching the sink's default target file size.
            target_file_size: 128 * 1024 * 1024,
            z_order_columns: Vec::new(),
            // Check for compaction work once per hour.
            check_interval: Duration::from_secs(3600),
        }
    }
}
655
/// Parquet writer tuning for files produced by the sink.
#[derive(Debug, Clone)]
pub struct ParquetWriteConfig {
    /// Codec name: "zstd", "snappy", "lz4", "gzip", "none"/"uncompressed".
    pub compression: String,
    /// Codec level used on the regular write path (applies to zstd/gzip).
    pub compression_level: i32,
    /// Optional codec level for compaction rewrites; `None` falls back to
    /// `compression_level`.
    pub compaction_compression_level: Option<i32>,
    /// Whether dictionary encoding is enabled.
    pub dictionary_enabled: bool,
    /// Statistics granularity: "none", "chunk", or "page".
    pub statistics: String,
    /// Columns that get bloom filters; empty disables bloom filters.
    pub bloom_filter_columns: Vec<String>,
    /// Bloom filter false-positive probability; must be in (0.0, 1.0).
    pub bloom_filter_fpp: f64,
    /// Bloom filter distinct-values hint; 0 means unset (not forwarded).
    pub bloom_filter_ndv: u64,
    /// Maximum rows per parquet row group (must be > 0).
    pub max_row_group_size: usize,
}
679
impl Default for ParquetWriteConfig {
    /// ZSTD level 1 on the write path, level 3 for compaction rewrites,
    /// page-level statistics, no bloom filters.
    fn default() -> Self {
        Self {
            compression: "zstd".to_string(),
            // Low level keeps the hot write path fast.
            compression_level: 1,
            // Compaction rewrites can afford a higher ratio.
            compaction_compression_level: Some(3),
            dictionary_enabled: true,
            statistics: "page".to_string(),
            bloom_filter_columns: Vec::new(),
            bloom_filter_fpp: 0.01,
            // 0 = no NDV hint; only positive values are forwarded to the writer.
            bloom_filter_ndv: 0,
            max_row_group_size: 1_000_000,
        }
    }
}
695
#[cfg(feature = "delta-lake")]
impl ParquetWriteConfig {
    /// Writer properties for regular data-path writes, using
    /// `compression_level`.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when the codec name, codec level, or
    /// statistics mode is invalid.
    pub fn to_writer_properties(
        &self,
    ) -> Result<deltalake::parquet::file::properties::WriterProperties, ConnectorError> {
        self.build_properties(self.compression_level)
    }

    /// Writer properties for compaction rewrites. Uses
    /// `compaction_compression_level` when configured, otherwise falls back
    /// to the regular `compression_level`.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`Self::to_writer_properties`].
    pub fn compaction_writer_properties(
        &self,
    ) -> Result<deltalake::parquet::file::properties::WriterProperties, ConnectorError> {
        let effective_level = self
            .compaction_compression_level
            .unwrap_or(self.compression_level);
        self.build_properties(effective_level)
    }

    /// Translates the string-typed config into parquet `WriterProperties`
    /// using the given compression `level`.
    fn build_properties(
        &self,
        level: i32,
    ) -> Result<deltalake::parquet::file::properties::WriterProperties, ConnectorError> {
        use deltalake::parquet::basic::{Compression, GzipLevel, ZstdLevel};
        use deltalake::parquet::file::properties::{EnabledStatistics, WriterProperties};
        use deltalake::parquet::schema::types::ColumnPath;

        let codec = match self.compression.to_lowercase().as_str() {
            "zstd" => Compression::ZSTD(ZstdLevel::try_new(level).map_err(|e| {
                ConnectorError::ConfigurationError(format!("invalid ZSTD level {level}: {e}"))
            })?),
            "snappy" => Compression::SNAPPY,
            "lz4" => Compression::LZ4_RAW,
            "gzip" => {
                // GzipLevel wants a u32, so a negative level is rejected here.
                let unsigned: u32 = level.try_into().map_err(|_| {
                    ConnectorError::ConfigurationError(format!(
                        "invalid GZIP level {level}: must be non-negative"
                    ))
                })?;
                Compression::GZIP(GzipLevel::try_new(unsigned).map_err(|e| {
                    ConnectorError::ConfigurationError(format!("invalid GZIP level {level}: {e}"))
                })?)
            }
            "none" | "uncompressed" => Compression::UNCOMPRESSED,
            other => {
                return Err(ConnectorError::ConfigurationError(format!(
                    "unknown parquet.compression: '{other}' \
                     (expected 'zstd', 'snappy', 'lz4', 'gzip', or 'none')"
                )));
            }
        };

        let stats_mode = match self.statistics.to_lowercase().as_str() {
            "none" => EnabledStatistics::None,
            "chunk" => EnabledStatistics::Chunk,
            "page" => EnabledStatistics::Page,
            other => {
                return Err(ConnectorError::ConfigurationError(format!(
                    "unknown parquet.statistics: '{other}' (expected 'none', 'chunk', or 'page')"
                )));
            }
        };

        let mut props = WriterProperties::builder()
            .set_compression(codec)
            .set_dictionary_enabled(self.dictionary_enabled)
            .set_statistics_enabled(stats_mode)
            .set_max_row_group_size(self.max_row_group_size);

        for column in &self.bloom_filter_columns {
            let path = ColumnPath::from(column.as_str());
            props = props.set_column_bloom_filter_enabled(path.clone(), true);
            props = props.set_column_bloom_filter_fpp(path.clone(), self.bloom_filter_fpp);
            // An NDV of 0 means "not configured"; only forward positive hints.
            if self.bloom_filter_ndv > 0 {
                props = props.set_column_bloom_filter_ndv(path, self.bloom_filter_ndv);
            }
        }

        Ok(props.build())
    }
}
793
794#[cfg(test)]
795#[allow(clippy::field_reassign_with_default)]
796mod tests {
797 use super::*;
798
799 fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
800 let mut config = ConnectorConfig::new("delta-lake");
801 for (k, v) in pairs {
802 config.set(*k, *v);
803 }
804 config
805 }
806
807 fn required_pairs() -> Vec<(&'static str, &'static str)> {
808 vec![("table.path", "/data/warehouse/trades")]
809 }
810
811 #[test]
814 fn test_parse_required_fields() {
815 let config = make_config(&required_pairs());
816 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
817 assert_eq!(cfg.table_path, "/data/warehouse/trades");
818 assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
819 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
820 assert!(cfg.partition_columns.is_empty());
821 assert!(cfg.merge_key_columns.is_empty());
822 assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
823 assert_eq!(cfg.max_buffer_records, 100_000);
824 assert_eq!(cfg.checkpoint_interval, 10);
825 assert!(!cfg.schema_evolution);
826 assert!(cfg.compaction.enabled);
827 }
828
829 #[test]
830 fn test_missing_table_path() {
831 let config = ConnectorConfig::new("delta-lake");
832 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
833 }
834
835 #[test]
836 fn test_parse_all_optional_fields() {
837 let mut pairs = required_pairs();
838 pairs.extend_from_slice(&[
839 ("partition.columns", "trade_date, hour"),
840 ("target.file.size", "67108864"),
841 ("max.buffer.records", "50000"),
842 ("max.buffer.duration.ms", "30000"),
843 ("checkpoint.interval", "20"),
844 ("schema.evolution", "true"),
845 ("write.mode", "upsert"),
846 ("merge.key.columns", "customer_id, order_id"),
847 ("delivery.guarantee", "at-least-once"),
848 ("compaction.enabled", "true"),
849 ("compaction.z-order.columns", "customer_id, product_id"),
850 ("compaction.min-files", "20"),
851 ("vacuum.retention.hours", "336"),
852 ("writer.id", "my-writer"),
853 ("storage.aws_access_key_id", "AKID123"),
854 ("storage.aws_region", "us-east-1"),
855 ]);
856 let config = make_config(&pairs);
857 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
858
859 assert_eq!(cfg.partition_columns, vec!["trade_date", "hour"]);
860 assert_eq!(cfg.target_file_size, 67_108_864);
861 assert_eq!(cfg.max_buffer_records, 50_000);
862 assert_eq!(cfg.max_buffer_duration, Duration::from_secs(30));
863 assert_eq!(cfg.checkpoint_interval, 20);
864 assert!(cfg.schema_evolution);
865 assert_eq!(cfg.write_mode, DeltaWriteMode::Upsert);
866 assert_eq!(cfg.merge_key_columns, vec!["customer_id", "order_id"]);
867 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
868 assert!(cfg.compaction.enabled);
869 assert_eq!(
870 cfg.compaction.z_order_columns,
871 vec!["customer_id", "product_id"]
872 );
873 assert_eq!(cfg.compaction.min_files_for_compaction, 20);
874 assert_eq!(cfg.vacuum_retention, Duration::from_secs(1209600));
875 assert_eq!(cfg.writer_id, "my-writer");
876 assert_eq!(
877 cfg.storage_options.get("aws_access_key_id"),
878 Some(&"AKID123".to_string())
879 );
880 assert_eq!(
881 cfg.storage_options.get("aws_region"),
882 Some(&"us-east-1".to_string())
883 );
884 }
885
886 #[test]
887 fn test_upsert_requires_merge_key() {
888 let mut pairs = required_pairs();
889 pairs.push(("write.mode", "upsert"));
890 let config = make_config(&pairs);
891 let result = DeltaLakeSinkConfig::from_config(&config);
892 assert!(result.is_err());
893 let err = result.unwrap_err().to_string();
894 assert!(err.contains("merge.key.columns"), "error: {err}");
895 }
896
897 #[test]
898 fn test_empty_table_path_rejected() {
899 let mut cfg = DeltaLakeSinkConfig::default();
900 cfg.table_path = String::new();
901 assert!(cfg.validate().is_err());
902 }
903
904 #[test]
905 fn test_zero_max_buffer_records_rejected() {
906 let mut pairs = required_pairs();
907 pairs.push(("max.buffer.records", "0"));
908 let config = make_config(&pairs);
909 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
910 }
911
912 #[test]
913 fn test_zero_target_file_size_rejected() {
914 let mut pairs = required_pairs();
915 pairs.push(("target.file.size", "0"));
916 let config = make_config(&pairs);
917 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
918 }
919
920 #[test]
921 fn test_zero_checkpoint_interval_rejected() {
922 let mut pairs = required_pairs();
923 pairs.push(("checkpoint.interval", "0"));
924 let config = make_config(&pairs);
925 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
926 }
927
928 #[test]
929 fn test_vacuum_retention_below_24h_rejected() {
930 let mut pairs = required_pairs();
931 pairs.push(("vacuum.retention.hours", "12"));
932 let config = make_config(&pairs);
933 let result = DeltaLakeSinkConfig::from_config(&config);
934 assert!(result.is_err());
935 let err = result.unwrap_err().to_string();
936 assert!(err.contains("24"), "error: {err}");
937 }
938
939 #[test]
940 fn test_vacuum_retention_24h_accepted() {
941 let mut pairs = required_pairs();
942 pairs.push(("vacuum.retention.hours", "24"));
943 let config = make_config(&pairs);
944 assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
945 }
946
947 #[test]
948 fn test_compaction_target_file_size_from_config() {
949 let mut pairs = required_pairs();
950 pairs.push(("compaction.target-file-size", "67108864"));
951 let config = make_config(&pairs);
952 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
953 assert_eq!(cfg.compaction.target_file_size, 67_108_864);
954 }
955
956 #[test]
957 fn test_compaction_target_file_size_defaults_to_sink() {
958 let mut pairs = required_pairs();
959 pairs.push(("target.file.size", "33554432"));
960 let config = make_config(&pairs);
961 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
962 assert_eq!(cfg.compaction.target_file_size, 33_554_432);
963 }
964
965 #[test]
966 fn test_exactly_once_requires_writer_id() {
967 let mut pairs = required_pairs();
968 pairs.push(("delivery.guarantee", "exactly-once"));
969 let config = make_config(&pairs);
970 let err = DeltaLakeSinkConfig::from_config(&config).unwrap_err();
971 assert!(err.to_string().contains("writer.id"), "error: {err}");
972 }
973
974 #[test]
975 fn test_exactly_once_with_writer_id_ok() {
976 let mut pairs = required_pairs();
977 pairs.push(("delivery.guarantee", "exactly-once"));
978 pairs.push(("writer.id", "my-stable-writer"));
979 let config = make_config(&pairs);
980 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
981 assert_eq!(cfg.writer_id, "my-stable-writer");
982 }
983
984 #[test]
985 fn test_invalid_target_file_size() {
986 let mut pairs = required_pairs();
987 pairs.push(("target.file.size", "abc"));
988 let config = make_config(&pairs);
989 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
990 }
991
992 #[test]
993 fn test_invalid_write_mode() {
994 let mut pairs = required_pairs();
995 pairs.push(("write.mode", "unknown"));
996 let config = make_config(&pairs);
997 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
998 }
999
1000 #[test]
1001 fn test_storage_options_prefix_stripping() {
1002 let mut pairs = required_pairs();
1003 pairs.push(("storage.aws_access_key_id", "AKID"));
1004 pairs.push(("storage.aws_secret_access_key", "SECRET"));
1005 pairs.push(("table.path", "/data/test"));
1006 let config = make_config(&pairs);
1007 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1008
1009 assert_eq!(cfg.storage_options.len(), 2);
1010 assert!(cfg.storage_options.contains_key("aws_access_key_id"));
1011 assert!(cfg.storage_options.contains_key("aws_secret_access_key"));
1012 assert!(!cfg
1013 .storage_options
1014 .contains_key("storage.aws_access_key_id"));
1015 }
1016
1017 #[test]
1018 fn test_defaults() {
1019 let cfg = DeltaLakeSinkConfig::default();
1020 assert!(cfg.table_path.is_empty());
1021 assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
1022 assert_eq!(cfg.max_buffer_records, 100_000);
1023 assert_eq!(cfg.max_buffer_duration, Duration::from_secs(60));
1024 assert_eq!(cfg.checkpoint_interval, 10);
1025 assert!(!cfg.schema_evolution);
1026 assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
1027 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
1028 assert!(!cfg.writer_id.is_empty());
1029 assert_eq!(cfg.max_commit_retries, 3);
1030 }
1031
1032 #[test]
1033 fn test_max_commit_retries_from_config() {
1034 let mut pairs = required_pairs();
1035 pairs.push(("max.commit.retries", "5"));
1036 let config = make_config(&pairs);
1037 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1038 assert_eq!(cfg.max_commit_retries, 5);
1039 }
1040
1041 #[test]
1042 fn test_max_commit_retries_invalid() {
1043 let mut pairs = required_pairs();
1044 pairs.push(("max.commit.retries", "abc"));
1045 let config = make_config(&pairs);
1046 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
1047 }
1048
1049 #[test]
1050 fn test_new_helper() {
1051 let cfg = DeltaLakeSinkConfig::new("/tmp/test_table");
1052 assert_eq!(cfg.table_path, "/tmp/test_table");
1053 assert_eq!(cfg.write_mode, DeltaWriteMode::Append);
1054 }
1055
1056 #[test]
1059 fn test_write_mode_parse() {
1060 assert_eq!(
1061 "append".parse::<DeltaWriteMode>().unwrap(),
1062 DeltaWriteMode::Append
1063 );
1064 assert_eq!(
1065 "overwrite".parse::<DeltaWriteMode>().unwrap(),
1066 DeltaWriteMode::Overwrite
1067 );
1068 assert_eq!(
1069 "upsert".parse::<DeltaWriteMode>().unwrap(),
1070 DeltaWriteMode::Upsert
1071 );
1072 assert_eq!(
1073 "merge".parse::<DeltaWriteMode>().unwrap(),
1074 DeltaWriteMode::Upsert
1075 );
1076 assert!("unknown".parse::<DeltaWriteMode>().is_err());
1077 }
1078
1079 #[test]
1080 fn test_write_mode_display() {
1081 assert_eq!(DeltaWriteMode::Append.to_string(), "append");
1082 assert_eq!(DeltaWriteMode::Overwrite.to_string(), "overwrite");
1083 assert_eq!(DeltaWriteMode::Upsert.to_string(), "upsert");
1084 }
1085
1086 #[test]
1087 fn test_delivery_guarantee_parse() {
1088 assert_eq!(
1089 "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
1090 DeliveryGuarantee::AtLeastOnce
1091 );
1092 assert_eq!(
1093 "at_least_once".parse::<DeliveryGuarantee>().unwrap(),
1094 DeliveryGuarantee::AtLeastOnce
1095 );
1096 assert_eq!(
1097 "exactly-once".parse::<DeliveryGuarantee>().unwrap(),
1098 DeliveryGuarantee::ExactlyOnce
1099 );
1100 assert_eq!(
1101 "exactly_once".parse::<DeliveryGuarantee>().unwrap(),
1102 DeliveryGuarantee::ExactlyOnce
1103 );
1104 assert!("unknown".parse::<DeliveryGuarantee>().is_err());
1105 }
1106
1107 #[test]
1108 fn test_delivery_guarantee_display() {
1109 assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
1110 assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
1111 }
1112
1113 #[test]
1114 fn test_compaction_config_defaults() {
1115 let cfg = CompactionConfig::default();
1116 assert!(cfg.enabled);
1117 assert_eq!(cfg.min_files_for_compaction, 10);
1118 assert_eq!(cfg.target_file_size, 128 * 1024 * 1024);
1119 assert!(cfg.z_order_columns.is_empty());
1120 assert_eq!(cfg.check_interval, Duration::from_secs(3600));
1121 }
1122
1123 #[test]
1124 fn test_partition_columns_empty_filter() {
1125 let mut pairs = required_pairs();
1126 pairs.push(("partition.columns", "a,,b, ,c"));
1127 let config = make_config(&pairs);
1128 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1129 assert_eq!(cfg.partition_columns, vec!["a", "b", "c"]);
1130 }
1131
1132 #[test]
1135 fn test_s3_path_requires_region() {
1136 let config = make_config(&[("table.path", "s3://my-bucket/trades")]);
1137 let result = DeltaLakeSinkConfig::from_config(&config);
1138 assert!(result.is_err());
1139 let err = result.unwrap_err().to_string();
1140 assert!(err.contains("aws_region"), "error: {err}");
1141 }
1142
1143 #[test]
1144 fn test_s3_path_with_region_and_credentials() {
1145 let config = make_config(&[
1146 ("table.path", "s3://my-bucket/trades"),
1147 ("storage.aws_region", "us-east-1"),
1148 ("storage.aws_access_key_id", "AKID123"),
1149 ("storage.aws_secret_access_key", "SECRET"),
1150 ]);
1151 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1152 assert_eq!(cfg.storage_options["aws_region"], "us-east-1");
1153 assert_eq!(cfg.storage_options["aws_access_key_id"], "AKID123");
1154 }
1155
1156 #[test]
1157 fn test_s3_path_with_region_only_warns_no_error() {
1158 let config = make_config(&[
1160 ("table.path", "s3://my-bucket/trades"),
1161 ("storage.aws_region", "us-east-1"),
1162 ]);
1163 assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
1164 }
1165
1166 #[test]
1167 fn test_s3_path_access_key_without_secret_errors() {
1168 let config = make_config(&[
1169 ("table.path", "s3://my-bucket/trades"),
1170 ("storage.aws_region", "us-east-1"),
1171 ("storage.aws_access_key_id", "AKID123"),
1172 ]);
1173 let result = DeltaLakeSinkConfig::from_config(&config);
1174 assert!(result.is_err());
1175 let err = result.unwrap_err().to_string();
1176 assert!(err.contains("aws_secret_access_key"), "error: {err}");
1177 }
1178
1179 #[test]
1180 fn test_azure_path_requires_account_name() {
1181 let config = make_config(&[("table.path", "az://my-container/trades")]);
1182 let result = DeltaLakeSinkConfig::from_config(&config);
1183 assert!(result.is_err());
1184 let err = result.unwrap_err().to_string();
1185 assert!(err.contains("azure_storage_account_name"), "error: {err}");
1186 }
1187
1188 #[test]
1189 fn test_azure_path_with_account_name_and_key() {
1190 let config = make_config(&[
1191 ("table.path", "az://my-container/trades"),
1192 ("storage.azure_storage_account_name", "myaccount"),
1193 ("storage.azure_storage_account_key", "base64key=="),
1194 ]);
1195 assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
1196 }
1197
1198 #[test]
1199 fn test_gcs_path_always_valid() {
1200 let config = make_config(&[("table.path", "gs://my-bucket/trades")]);
1202 assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
1203 }
1204
1205 #[test]
1206 fn test_local_path_no_cloud_validation() {
1207 let config = make_config(&[("table.path", "/data/warehouse/trades")]);
1208 assert!(DeltaLakeSinkConfig::from_config(&config).is_ok());
1209 }
1210
1211 #[test]
1212 fn test_display_storage_options_redacts_secrets() {
1213 let mut cfg = DeltaLakeSinkConfig::new("s3://bucket/path");
1214 cfg.storage_options
1215 .insert("aws_region".to_string(), "us-east-1".to_string());
1216 cfg.storage_options.insert(
1217 "aws_secret_access_key".to_string(),
1218 "TOP_SECRET".to_string(),
1219 );
1220
1221 let display = cfg.display_storage_options();
1222 assert!(display.contains("aws_region=us-east-1"));
1223 assert!(display.contains("aws_secret_access_key=***"));
1224 assert!(!display.contains("TOP_SECRET"));
1225 }
1226
1227 #[test]
1228 fn test_display_storage_options_empty() {
1229 let cfg = DeltaLakeSinkConfig::new("/local/path");
1230 assert!(cfg.display_storage_options().is_empty());
1231 }
1232
1233 #[test]
1236 fn test_catalog_type_parse() {
1237 assert_eq!(
1238 "none".parse::<DeltaCatalogType>().unwrap(),
1239 DeltaCatalogType::None
1240 );
1241 assert_eq!(
1242 "glue".parse::<DeltaCatalogType>().unwrap(),
1243 DeltaCatalogType::Glue
1244 );
1245 assert!(matches!(
1246 "unity".parse::<DeltaCatalogType>().unwrap(),
1247 DeltaCatalogType::Unity { .. }
1248 ));
1249 assert!("unknown".parse::<DeltaCatalogType>().is_err());
1250 }
1251
1252 #[test]
1253 fn test_catalog_type_display() {
1254 assert_eq!(DeltaCatalogType::None.to_string(), "none");
1255 assert_eq!(DeltaCatalogType::Glue.to_string(), "glue");
1256 assert_eq!(
1257 DeltaCatalogType::Unity {
1258 workspace_url: "url".into(),
1259 access_token: "tok".into()
1260 }
1261 .to_string(),
1262 "unity"
1263 );
1264 }
1265
1266 #[test]
1267 fn test_catalog_none_default() {
1268 let config = make_config(&required_pairs());
1269 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1270 assert_eq!(cfg.catalog_type, DeltaCatalogType::None);
1271 assert!(cfg.catalog_database.is_none());
1272 assert!(cfg.catalog_name.is_none());
1273 assert!(cfg.catalog_schema.is_none());
1274 assert!(cfg.catalog_storage_location.is_none());
1275 }
1276
1277 #[cfg(feature = "delta-lake-glue")]
1278 #[test]
1279 fn test_catalog_glue_valid() {
1280 let mut pairs = required_pairs();
1281 pairs.extend_from_slice(&[
1282 ("catalog.type", "glue"),
1283 ("catalog.database", "my_database"),
1284 ]);
1285 let config = make_config(&pairs);
1286 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1287 assert_eq!(cfg.catalog_type, DeltaCatalogType::Glue);
1288 assert_eq!(cfg.catalog_database.as_deref(), Some("my_database"));
1289 }
1290
1291 #[cfg(feature = "delta-lake-glue")]
1292 #[test]
1293 fn test_catalog_glue_missing_database() {
1294 let mut pairs = required_pairs();
1295 pairs.push(("catalog.type", "glue"));
1296 let config = make_config(&pairs);
1297 let result = DeltaLakeSinkConfig::from_config(&config);
1298 assert!(result.is_err());
1299 let err = result.unwrap_err().to_string();
1300 assert!(err.contains("catalog.database"), "error: {err}");
1301 }
1302
1303 #[cfg(feature = "delta-lake-unity")]
1304 #[test]
1305 fn test_catalog_unity_valid() {
1306 let mut pairs = required_pairs();
1307 pairs.extend_from_slice(&[
1308 ("catalog.type", "unity"),
1309 ("catalog.workspace_url", "https://my.databricks.com"),
1310 ("catalog.access_token", "dapi123"),
1311 ("catalog.name", "main"),
1312 ("catalog.schema", "default"),
1313 ]);
1314 let config = make_config(&pairs);
1315 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1316 assert!(matches!(
1317 cfg.catalog_type,
1318 DeltaCatalogType::Unity {
1319 ref workspace_url,
1320 ref access_token
1321 }
1322 if workspace_url == "https://my.databricks.com"
1323 && access_token == "dapi123"
1324 ));
1325 assert_eq!(cfg.catalog_name.as_deref(), Some("main"));
1326 assert_eq!(cfg.catalog_schema.as_deref(), Some("default"));
1327 }
1328
1329 #[cfg(feature = "delta-lake-unity")]
1330 #[test]
1331 fn test_catalog_unity_missing_workspace_url() {
1332 let mut pairs = required_pairs();
1333 pairs.extend_from_slice(&[
1334 ("catalog.type", "unity"),
1335 ("catalog.access_token", "dapi123"),
1336 ("catalog.name", "main"),
1337 ("catalog.schema", "default"),
1338 ]);
1339 let config = make_config(&pairs);
1340 let result = DeltaLakeSinkConfig::from_config(&config);
1341 assert!(result.is_err());
1342 let err = result.unwrap_err().to_string();
1343 assert!(err.contains("workspace_url"), "error: {err}");
1344 }
1345
1346 #[cfg(feature = "delta-lake-unity")]
1347 #[test]
1348 fn test_catalog_unity_missing_access_token() {
1349 let mut pairs = required_pairs();
1350 pairs.extend_from_slice(&[
1351 ("catalog.type", "unity"),
1352 ("catalog.workspace_url", "https://my.databricks.com"),
1353 ("catalog.name", "main"),
1354 ("catalog.schema", "default"),
1355 ]);
1356 let config = make_config(&pairs);
1357 let result = DeltaLakeSinkConfig::from_config(&config);
1358 assert!(result.is_err());
1359 let err = result.unwrap_err().to_string();
1360 assert!(err.contains("access_token"), "error: {err}");
1361 }
1362
1363 #[test]
1364 fn test_catalog_storage_location_default_none() {
1365 let config = make_config(&required_pairs());
1366 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1367 assert!(cfg.catalog_storage_location.is_none());
1368 }
1369
1370 #[test]
1371 fn test_catalog_storage_location_parsed() {
1372 let mut pairs = required_pairs();
1373 pairs.push(("catalog.storage.location", "s3://bucket/warehouse/table"));
1374 let config = make_config(&pairs);
1375 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1376 assert_eq!(
1377 cfg.catalog_storage_location.as_deref(),
1378 Some("s3://bucket/warehouse/table")
1379 );
1380 }
1381
1382 #[test]
1385 fn test_parquet_config_defaults() {
1386 let cfg = ParquetWriteConfig::default();
1387 assert_eq!(cfg.compression, "zstd");
1388 assert_eq!(cfg.compression_level, 1);
1389 assert_eq!(cfg.compaction_compression_level, Some(3));
1390 assert!(cfg.dictionary_enabled);
1391 assert_eq!(cfg.statistics, "page");
1392 assert!(cfg.bloom_filter_columns.is_empty());
1393 assert!((cfg.bloom_filter_fpp - 0.01).abs() < f64::EPSILON);
1394 assert_eq!(cfg.bloom_filter_ndv, 0);
1395 assert_eq!(cfg.max_row_group_size, 1_000_000);
1396 }
1397
1398 #[test]
1399 fn test_parquet_compression_parsing() {
1400 for codec in &["zstd", "snappy", "lz4", "gzip", "none"] {
1401 let mut pairs = required_pairs();
1402 pairs.push(("parquet.compression", codec));
1403 let config = make_config(&pairs);
1404 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1405 assert_eq!(cfg.parquet.compression, *codec);
1406 }
1407 }
1408
1409 #[test]
1410 fn test_parquet_compression_level_parsing() {
1411 let mut pairs = required_pairs();
1412 pairs.push(("parquet.compression.level", "5"));
1413 let config = make_config(&pairs);
1414 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1415 assert_eq!(cfg.parquet.compression_level, 5);
1416 }
1417
1418 #[test]
1419 fn test_parquet_compression_level_invalid() {
1420 let mut pairs = required_pairs();
1421 pairs.push(("parquet.compression.level", "abc"));
1422 let config = make_config(&pairs);
1423 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
1424 }
1425
1426 #[test]
1427 fn test_parquet_compaction_compression_level() {
1428 let mut pairs = required_pairs();
1429 pairs.push(("parquet.compaction.compression.level", "7"));
1430 let config = make_config(&pairs);
1431 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1432 assert_eq!(cfg.parquet.compaction_compression_level, Some(7));
1433 }
1434
1435 #[test]
1436 fn test_parquet_bloom_filter_columns_parsing() {
1437 let mut pairs = required_pairs();
1438 pairs.push((
1439 "parquet.bloom.filter.columns",
1440 " user_id , event_type , ts ",
1441 ));
1442 let config = make_config(&pairs);
1443 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1444 assert_eq!(
1445 cfg.parquet.bloom_filter_columns,
1446 vec!["user_id", "event_type", "ts"]
1447 );
1448 }
1449
1450 #[test]
1451 fn test_parquet_bloom_filter_fpp_validation() {
1452 let mut pairs = required_pairs();
1454 pairs.push(("parquet.bloom.filter.fpp", "0.0"));
1455 let config = make_config(&pairs);
1456 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
1457
1458 let mut pairs = required_pairs();
1460 pairs.push(("parquet.bloom.filter.fpp", "1.0"));
1461 let config = make_config(&pairs);
1462 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
1463 }
1464
1465 #[test]
1466 fn test_parquet_max_row_group_size_zero_rejected() {
1467 let mut pairs = required_pairs();
1468 pairs.push(("parquet.max.row.group.size", "0"));
1469 let config = make_config(&pairs);
1470 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
1471 }
1472
1473 #[test]
1474 fn test_parquet_statistics_parsing() {
1475 for stat in &["none", "chunk", "page"] {
1476 let mut pairs = required_pairs();
1477 pairs.push(("parquet.statistics", stat));
1478 let config = make_config(&pairs);
1479 let cfg = DeltaLakeSinkConfig::from_config(&config).unwrap();
1480 assert_eq!(cfg.parquet.statistics, *stat);
1481 }
1482 }
1483
1484 #[test]
1485 fn test_parquet_invalid_statistics_rejected() {
1486 let mut pairs = required_pairs();
1487 pairs.push(("parquet.statistics", "full"));
1488 let config = make_config(&pairs);
1489 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
1490 }
1491
1492 #[test]
1493 fn test_parquet_invalid_compression_rejected() {
1494 let mut pairs = required_pairs();
1495 pairs.push(("parquet.compression", "brotli"));
1496 let config = make_config(&pairs);
1497 assert!(DeltaLakeSinkConfig::from_config(&config).is_err());
1498 }
1499
1500 #[cfg(feature = "delta-lake")]
1501 #[test]
1502 fn test_writer_properties_default_zstd() {
1503 let cfg = ParquetWriteConfig::default();
1504 assert!(cfg.to_writer_properties().is_ok());
1505 }
1506
1507 #[cfg(feature = "delta-lake")]
1508 #[test]
1509 fn test_compaction_writer_properties_higher_level() {
1510 let cfg = ParquetWriteConfig::default();
1511 assert!(cfg.compaction_writer_properties().is_ok());
1513 }
1514
1515 #[cfg(feature = "delta-lake")]
1516 #[test]
1517 fn test_writer_properties_invalid_codec() {
1518 let mut cfg = ParquetWriteConfig::default();
1519 cfg.compression = "brotli".to_string();
1520 assert!(cfg.to_writer_properties().is_err());
1521 }
1522
1523 #[cfg(feature = "delta-lake")]
1524 #[test]
1525 fn test_writer_properties_with_bloom_filters() {
1526 let mut cfg = ParquetWriteConfig::default();
1527 cfg.bloom_filter_columns = vec!["user_id".to_string(), "event_type".to_string()];
1528 assert!(cfg.to_writer_properties().is_ok());
1529 }
1530}