// laminar_connectors/lakehouse/delta_source_config.rs
1//! Delta Lake source config. Parsed from SQL `WITH (...)` via
2//! [`DeltaSourceConfig::from_config`].
3#![allow(clippy::disallowed_types)] // cold path: lakehouse configuration
4
5use std::collections::HashMap;
6use std::fmt;
7use std::str::FromStr;
8use std::time::Duration;
9
10use crate::config::ConnectorConfig;
11use crate::error::ConnectorError;
12use crate::storage::{
13    CloudConfigValidator, ResolvedStorageOptions, SecretMasker, StorageCredentialResolver,
14    StorageProvider,
15};
16
17use super::delta_config::DeltaCatalogType;
18
/// Read mode for the Delta Lake source.
///
/// Controls whether the source reads full snapshots or incremental changes.
/// Parsed case-insensitively via [`FromStr`]; `"batch"` is an accepted alias
/// for [`Snapshot`](Self::Snapshot), and `"streaming"`/`"stream"` for
/// [`Incremental`](Self::Incremental).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum DeltaReadMode {
    /// Read full table snapshot at each new version.
    ///
    /// Every version change triggers a complete table scan. Useful for
    /// batch-style materialization or tables small enough to re-read.
    Snapshot,
    /// Read data version-by-version for incremental processing.
    ///
    /// Walks versions one-by-one from `current_version + 1` to latest.
    /// Each version's data is emitted exactly once.
    ///
    /// This is the default read mode.
    #[default]
    Incremental,
}
36
37impl FromStr for DeltaReadMode {
38    type Err = String;
39
40    fn from_str(s: &str) -> Result<Self, Self::Err> {
41        match s.to_lowercase().as_str() {
42            "snapshot" | "batch" => Ok(Self::Snapshot),
43            "incremental" | "streaming" | "stream" => Ok(Self::Incremental),
44            other => Err(format!("unknown read mode: '{other}'")),
45        }
46    }
47}
48
49impl fmt::Display for DeltaReadMode {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        match self {
52            Self::Snapshot => write!(f, "snapshot"),
53            Self::Incremental => write!(f, "incremental"),
54        }
55    }
56}
57
/// Action to take when schema evolution is detected across versions.
///
/// Parsed case-insensitively via [`FromStr`]; `"warning"` is an accepted
/// alias for [`Warn`](Self::Warn) and `"fail"` for [`Error`](Self::Error).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SchemaEvolutionAction {
    /// Log a warning and continue with the new schema.
    #[default]
    Warn,
    /// Return an error and stop the source.
    Error,
}
67
68impl FromStr for SchemaEvolutionAction {
69    type Err = String;
70
71    fn from_str(s: &str) -> Result<Self, Self::Err> {
72        match s.to_lowercase().as_str() {
73            "warn" | "warning" => Ok(Self::Warn),
74            "error" | "fail" => Ok(Self::Error),
75            other => Err(format!("unknown schema evolution action: '{other}'")),
76        }
77    }
78}
79
80impl fmt::Display for SchemaEvolutionAction {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        match self {
83            Self::Warn => write!(f, "warn"),
84            Self::Error => write!(f, "error"),
85        }
86    }
87}
88
/// Configuration for the Delta Lake source connector.
///
/// Parsed from SQL `WITH (...)` clause options or constructed programmatically.
/// The [`Default`] impl supplies the values used when an option is omitted.
#[derive(Debug, Clone)]
pub struct DeltaSourceConfig {
    /// Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`).
    pub table_path: String,

    /// Starting version to read from. `None` means start from the latest version.
    pub starting_version: Option<i64>,

    /// How often to poll for new versions (default: 1 second).
    pub poll_interval: Duration,

    /// Read mode: snapshot (full re-read) or incremental (changes only).
    /// Defaults to [`DeltaReadMode::Incremental`].
    pub read_mode: DeltaReadMode,

    /// Optional partition filter predicate (SQL expression, e.g. `"date = '2024-01-01'"`).
    pub partition_filter: Option<String>,

    /// Action to take on schema evolution between versions.
    /// Defaults to [`SchemaEvolutionAction::Warn`].
    pub schema_evolution_action: SchemaEvolutionAction,

    /// Use Change Data Feed for incremental reads (requires CDF on table).
    /// Defaults to `false`.
    pub cdf_enabled: bool,

    /// Storage options (S3 credentials, Azure keys, etc.).
    pub storage_options: HashMap<String, String>,

    /// Catalog type for table discovery.
    pub catalog_type: DeltaCatalogType,

    /// Catalog database name (required for Glue).
    pub catalog_database: Option<String>,

    /// Catalog name (required for Unity).
    pub catalog_name: Option<String>,

    /// Catalog schema name (required for Unity).
    pub catalog_schema: Option<String>,
}
130
131impl Default for DeltaSourceConfig {
132    fn default() -> Self {
133        Self {
134            table_path: String::new(),
135            starting_version: None,
136            poll_interval: Duration::from_secs(1),
137            read_mode: DeltaReadMode::default(),
138            partition_filter: None,
139            schema_evolution_action: SchemaEvolutionAction::default(),
140            cdf_enabled: false,
141            storage_options: HashMap::new(),
142            catalog_type: DeltaCatalogType::None,
143            catalog_database: None,
144            catalog_name: None,
145            catalog_schema: None,
146        }
147    }
148}
149
150impl DeltaSourceConfig {
151    /// Creates a minimal config for testing.
152    #[must_use]
153    pub fn new(table_path: &str) -> Self {
154        Self {
155            table_path: table_path.to_string(),
156            ..Default::default()
157        }
158    }
159
160    /// Parses a source config from a [`ConnectorConfig`] (SQL WITH clause).
161    ///
162    /// # Required keys
163    ///
164    /// - `table.path` - Path to Delta Lake table
165    ///
166    /// # Errors
167    ///
168    /// Returns `ConnectorError::MissingConfig` if required keys are absent,
169    /// or `ConnectorError::ConfigurationError` on invalid values.
170    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
171        let mut cfg = Self {
172            table_path: config.require("table.path")?.to_string(),
173            ..Self::default()
174        };
175
176        if let Some(v) = config.get("starting.version") {
177            cfg.starting_version = Some(v.parse().map_err(|_| {
178                ConnectorError::ConfigurationError(format!("invalid starting.version: '{v}'"))
179            })?);
180        }
181        if let Some(v) = config.get("poll.interval.ms") {
182            let ms: u64 = v.parse().map_err(|_| {
183                ConnectorError::ConfigurationError(format!("invalid poll.interval.ms: '{v}'"))
184            })?;
185            cfg.poll_interval = Duration::from_millis(ms);
186        }
187        if let Some(v) = config.get("read.mode") {
188            cfg.read_mode = v.parse().map_err(|_| {
189                ConnectorError::ConfigurationError(format!(
190                    "invalid read.mode: '{v}' (expected 'snapshot' or 'incremental')"
191                ))
192            })?;
193        }
194        if let Some(v) = config.get("partition.filter") {
195            let trimmed = v.trim();
196            if !trimmed.is_empty() {
197                cfg.partition_filter = Some(trimmed.to_string());
198            }
199        }
200        if let Some(v) = config.get("cdf.enabled") {
201            cfg.cdf_enabled = v.eq_ignore_ascii_case("true");
202        }
203        if let Some(v) = config.get("schema.evolution.action") {
204            cfg.schema_evolution_action = v.parse().map_err(|_| {
205                ConnectorError::ConfigurationError(format!(
206                    "invalid schema.evolution.action: '{v}' (expected 'warn' or 'error')"
207                ))
208            })?;
209        }
210
211        // ── Catalog configuration ──
212        if let Some(v) = config.get("catalog.type") {
213            cfg.catalog_type = v.parse().map_err(|_| {
214                ConnectorError::ConfigurationError(format!(
215                    "invalid catalog.type: '{v}' (expected 'none', 'glue', or 'unity')"
216                ))
217            })?;
218        }
219        if let Some(v) = config.get("catalog.database") {
220            cfg.catalog_database = Some(v.to_string());
221        }
222        if let Some(v) = config.get("catalog.name") {
223            cfg.catalog_name = Some(v.to_string());
224        }
225        if let Some(v) = config.get("catalog.schema") {
226            cfg.catalog_schema = Some(v.to_string());
227        }
228        if let DeltaCatalogType::Unity {
229            ref mut workspace_url,
230            ref mut access_token,
231        } = cfg.catalog_type
232        {
233            if let Some(v) = config.get("catalog.workspace_url") {
234                *workspace_url = v.to_string();
235            }
236            if let Some(v) = config.get("catalog.access_token") {
237                *access_token = v.to_string();
238            }
239        }
240        // Resolve storage credentials.
241        let explicit_storage = config.properties_with_prefix("storage.");
242        let resolved = StorageCredentialResolver::resolve(&cfg.table_path, &explicit_storage);
243        cfg.storage_options = resolved.options;
244
245        // Map LogStore configuration keys to delta-rs storage options.
246        if let Some(v) = config.get("storage.s3_locking_provider") {
247            cfg.storage_options
248                .insert("AWS_S3_LOCKING_PROVIDER".to_string(), v.to_string());
249        }
250        if let Some(v) = config.get("storage.dynamodb_table_name") {
251            cfg.storage_options
252                .insert("DELTA_DYNAMO_TABLE_NAME".to_string(), v.to_string());
253        }
254
255        cfg.validate()?;
256        Ok(cfg)
257    }
258
259    /// Formats the storage options for safe logging with secrets redacted.
260    #[must_use]
261    pub fn display_storage_options(&self) -> String {
262        SecretMasker::display_map(&self.storage_options)
263    }
264
265    /// Validates the configuration for consistency.
266    ///
267    /// # Errors
268    ///
269    /// Returns `ConnectorError::ConfigurationError` on invalid combinations.
270    pub fn validate(&self) -> Result<(), ConnectorError> {
271        if self.table_path.is_empty() {
272            return Err(ConnectorError::missing_config("table.path"));
273        }
274
275        // Validate catalog-specific requirements.
276        match &self.catalog_type {
277            DeltaCatalogType::None => {}
278            DeltaCatalogType::Glue => {
279                if self.catalog_database.is_none() {
280                    return Err(ConnectorError::ConfigurationError(
281                        "Glue catalog requires 'catalog.database' to be set".into(),
282                    ));
283                }
284            }
285            DeltaCatalogType::Unity {
286                workspace_url,
287                access_token,
288            } => {
289                if workspace_url.is_empty() {
290                    return Err(ConnectorError::ConfigurationError(
291                        "Unity catalog requires 'catalog.workspace_url' to be set".into(),
292                    ));
293                }
294                if access_token.is_empty() {
295                    return Err(ConnectorError::ConfigurationError(
296                        "Unity catalog requires 'catalog.access_token' to be set".into(),
297                    ));
298                }
299                if self.catalog_name.is_none() {
300                    return Err(ConnectorError::ConfigurationError(
301                        "Unity catalog requires 'catalog.name' to be set".into(),
302                    ));
303                }
304                if self.catalog_schema.is_none() {
305                    return Err(ConnectorError::ConfigurationError(
306                        "Unity catalog requires 'catalog.schema' to be set".into(),
307                    ));
308                }
309            }
310        }
311
312        // Validate cloud storage credentials for the detected provider.
313        let resolved = ResolvedStorageOptions {
314            provider: StorageProvider::detect(&self.table_path),
315            options: self.storage_options.clone(),
316            env_resolved_keys: Vec::new(),
317        };
318        let cloud_result = CloudConfigValidator::validate(&resolved);
319        if !cloud_result.is_valid() {
320            return Err(ConnectorError::ConfigurationError(
321                cloud_result.error_message(),
322            ));
323        }
324
325        Ok(())
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ConnectorConfig` populated with the given key/value pairs.
    fn build(pairs: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("delta-lake-source");
        for &(key, value) in pairs {
            config.set(key, value);
        }
        config
    }

    #[test]
    fn test_defaults() {
        let cfg = DeltaSourceConfig::default();
        assert_eq!(cfg.table_path, "");
        assert_eq!(cfg.starting_version, None);
        assert_eq!(cfg.poll_interval, Duration::from_millis(1000));
    }

    #[test]
    fn test_new_helper() {
        let cfg = DeltaSourceConfig::new("/tmp/test_table");
        assert_eq!(cfg.table_path, "/tmp/test_table");
    }

    #[test]
    fn test_parse_required_fields() {
        let config = build(&[("table.path", "/data/warehouse/trades")]);
        let cfg = DeltaSourceConfig::from_config(&config).expect("minimal config should parse");
        assert_eq!(cfg.table_path, "/data/warehouse/trades");
        assert_eq!(cfg.starting_version, None);
    }

    #[test]
    fn test_missing_table_path() {
        let empty = ConnectorConfig::new("delta-lake-source");
        assert!(DeltaSourceConfig::from_config(&empty).is_err());
    }

    #[test]
    fn test_parse_optional_fields() {
        let cfg = DeltaSourceConfig::from_config(&build(&[
            ("table.path", "/data/test"),
            ("starting.version", "5"),
            ("poll.interval.ms", "500"),
        ]))
        .unwrap();
        assert_eq!(cfg.starting_version, Some(5));
        assert_eq!(cfg.poll_interval, Duration::from_millis(500));
    }

    #[test]
    fn test_invalid_starting_version() {
        let config = build(&[("table.path", "/data/test"), ("starting.version", "abc")]);
        assert!(DeltaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_empty_table_path_rejected() {
        let cfg = DeltaSourceConfig {
            table_path: String::new(),
            ..DeltaSourceConfig::default()
        };
        assert!(cfg.validate().is_err());
    }

    // ── New config fields tests ──

    #[test]
    fn test_read_mode_defaults_to_incremental() {
        assert_eq!(
            DeltaSourceConfig::default().read_mode,
            DeltaReadMode::Incremental
        );
    }

    #[test]
    fn test_read_mode_parse() {
        // Each accepted spelling maps to its canonical variant.
        for (input, expected) in [
            ("snapshot", DeltaReadMode::Snapshot),
            ("batch", DeltaReadMode::Snapshot),
            ("incremental", DeltaReadMode::Incremental),
            ("streaming", DeltaReadMode::Incremental),
            ("stream", DeltaReadMode::Incremental),
        ] {
            assert_eq!(input.parse::<DeltaReadMode>().unwrap(), expected);
        }
        assert!("unknown".parse::<DeltaReadMode>().is_err());
    }

    #[test]
    fn test_read_mode_display() {
        assert_eq!(format!("{}", DeltaReadMode::Snapshot), "snapshot");
        assert_eq!(format!("{}", DeltaReadMode::Incremental), "incremental");
    }

    #[test]
    fn test_read_mode_from_config() {
        let cfg = DeltaSourceConfig::from_config(&build(&[
            ("table.path", "/data/test"),
            ("read.mode", "snapshot"),
        ]))
        .unwrap();
        assert_eq!(cfg.read_mode, DeltaReadMode::Snapshot);
    }

    #[test]
    fn test_read_mode_invalid() {
        let config = build(&[("table.path", "/data/test"), ("read.mode", "invalid")]);
        assert!(DeltaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_partition_filter_from_config() {
        let cfg = DeltaSourceConfig::from_config(&build(&[
            ("table.path", "/data/test"),
            ("partition.filter", "date = '2024-01-01'"),
        ]))
        .unwrap();
        assert_eq!(
            cfg.partition_filter,
            Some("date = '2024-01-01'".to_string())
        );
    }

    #[test]
    fn test_partition_filter_empty_is_none() {
        let cfg = DeltaSourceConfig::from_config(&build(&[
            ("table.path", "/data/test"),
            ("partition.filter", ""),
        ]))
        .unwrap();
        assert_eq!(cfg.partition_filter, None);
    }

    #[test]
    fn test_schema_evolution_action_parse() {
        for (input, expected) in [
            ("warn", SchemaEvolutionAction::Warn),
            ("error", SchemaEvolutionAction::Error),
            ("fail", SchemaEvolutionAction::Error),
        ] {
            assert_eq!(input.parse::<SchemaEvolutionAction>().unwrap(), expected);
        }
        assert!("unknown".parse::<SchemaEvolutionAction>().is_err());
    }

    #[test]
    fn test_schema_evolution_action_from_config() {
        let cfg = DeltaSourceConfig::from_config(&build(&[
            ("table.path", "/data/test"),
            ("schema.evolution.action", "error"),
        ]))
        .unwrap();
        assert_eq!(cfg.schema_evolution_action, SchemaEvolutionAction::Error);
    }

    #[test]
    fn test_schema_evolution_action_default() {
        assert_eq!(
            DeltaSourceConfig::default().schema_evolution_action,
            SchemaEvolutionAction::Warn
        );
    }
}