Skip to main content

laminar_connectors/lakehouse/
delta_source_config.rs

1//! Delta Lake source connector configuration.
2//!
3//! [`DeltaSourceConfig`] encapsulates all settings for reading Arrow
4//! `RecordBatch` data from Delta Lake tables, parsed from SQL `WITH (...)`
5//! clauses via [`from_config`](DeltaSourceConfig::from_config).
6#![allow(clippy::disallowed_types)] // cold path: lakehouse configuration
7
8use std::collections::HashMap;
9use std::time::Duration;
10
11use crate::config::ConnectorConfig;
12use crate::error::ConnectorError;
13use crate::storage::{
14    CloudConfigValidator, ResolvedStorageOptions, SecretMasker, StorageCredentialResolver,
15    StorageProvider,
16};
17
18use super::delta_config::DeltaCatalogType;
19
20/// Configuration for the Delta Lake source connector.
21///
22/// Parsed from SQL `WITH (...)` clause options or constructed programmatically.
23#[derive(Debug, Clone)]
24pub struct DeltaSourceConfig {
25    /// Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`).
26    pub table_path: String,
27
28    /// Starting version to read from. `None` means start from the latest version.
29    pub starting_version: Option<i64>,
30
31    /// How often to poll for new versions (default: 1 second).
32    pub poll_interval: Duration,
33
34    /// Storage options (S3 credentials, Azure keys, etc.).
35    pub storage_options: HashMap<String, String>,
36
37    /// Catalog type for table discovery.
38    pub catalog_type: DeltaCatalogType,
39
40    /// Catalog database name (required for Glue).
41    pub catalog_database: Option<String>,
42
43    /// Catalog name (required for Unity).
44    pub catalog_name: Option<String>,
45
46    /// Catalog schema name (required for Unity).
47    pub catalog_schema: Option<String>,
48
49    /// Additional catalog-specific properties.
50    pub catalog_properties: HashMap<String, String>,
51}
52
53impl Default for DeltaSourceConfig {
54    fn default() -> Self {
55        Self {
56            table_path: String::new(),
57            starting_version: None,
58            poll_interval: Duration::from_millis(1000),
59            storage_options: HashMap::new(),
60            catalog_type: DeltaCatalogType::None,
61            catalog_database: None,
62            catalog_name: None,
63            catalog_schema: None,
64            catalog_properties: HashMap::new(),
65        }
66    }
67}
68
69impl DeltaSourceConfig {
70    /// Creates a minimal config for testing.
71    #[must_use]
72    pub fn new(table_path: &str) -> Self {
73        Self {
74            table_path: table_path.to_string(),
75            ..Default::default()
76        }
77    }
78
79    /// Parses a source config from a [`ConnectorConfig`] (SQL WITH clause).
80    ///
81    /// # Required keys
82    ///
83    /// - `table.path` - Path to Delta Lake table
84    ///
85    /// # Errors
86    ///
87    /// Returns `ConnectorError::MissingConfig` if required keys are absent,
88    /// or `ConnectorError::ConfigurationError` on invalid values.
89    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
90        let mut cfg = Self {
91            table_path: config.require("table.path")?.to_string(),
92            ..Self::default()
93        };
94
95        if let Some(v) = config.get("starting.version") {
96            cfg.starting_version = Some(v.parse().map_err(|_| {
97                ConnectorError::ConfigurationError(format!("invalid starting.version: '{v}'"))
98            })?);
99        }
100        if let Some(v) = config.get("poll.interval.ms") {
101            let ms: u64 = v.parse().map_err(|_| {
102                ConnectorError::ConfigurationError(format!("invalid poll.interval.ms: '{v}'"))
103            })?;
104            cfg.poll_interval = Duration::from_millis(ms);
105        }
106
107        // ── Catalog configuration ──
108        if let Some(v) = config.get("catalog.type") {
109            cfg.catalog_type = v.parse().map_err(|_| {
110                ConnectorError::ConfigurationError(format!(
111                    "invalid catalog.type: '{v}' (expected 'none', 'glue', or 'unity')"
112                ))
113            })?;
114        }
115        if let Some(v) = config.get("catalog.database") {
116            cfg.catalog_database = Some(v.to_string());
117        }
118        if let Some(v) = config.get("catalog.name") {
119            cfg.catalog_name = Some(v.to_string());
120        }
121        if let Some(v) = config.get("catalog.schema") {
122            cfg.catalog_schema = Some(v.to_string());
123        }
124        if let DeltaCatalogType::Unity {
125            ref mut workspace_url,
126            ref mut access_token,
127        } = cfg.catalog_type
128        {
129            if let Some(v) = config.get("catalog.workspace_url") {
130                *workspace_url = v.to_string();
131            }
132            if let Some(v) = config.get("catalog.access_token") {
133                *access_token = v.to_string();
134            }
135        }
136        cfg.catalog_properties = config.properties_with_prefix("catalog.prop.");
137
138        // Resolve storage credentials.
139        let explicit_storage = config.properties_with_prefix("storage.");
140        let resolved = StorageCredentialResolver::resolve(&cfg.table_path, &explicit_storage);
141        cfg.storage_options = resolved.options;
142
143        cfg.validate()?;
144        Ok(cfg)
145    }
146
147    /// Formats the storage options for safe logging with secrets redacted.
148    #[must_use]
149    pub fn display_storage_options(&self) -> String {
150        SecretMasker::display_map(&self.storage_options)
151    }
152
153    /// Validates the configuration for consistency.
154    ///
155    /// # Errors
156    ///
157    /// Returns `ConnectorError::ConfigurationError` on invalid combinations.
158    pub fn validate(&self) -> Result<(), ConnectorError> {
159        if self.table_path.is_empty() {
160            return Err(ConnectorError::MissingConfig("table.path".into()));
161        }
162
163        // Validate catalog-specific requirements.
164        match &self.catalog_type {
165            DeltaCatalogType::None => {}
166            DeltaCatalogType::Glue => {
167                if self.catalog_database.is_none() {
168                    return Err(ConnectorError::ConfigurationError(
169                        "Glue catalog requires 'catalog.database' to be set".into(),
170                    ));
171                }
172            }
173            DeltaCatalogType::Unity {
174                workspace_url,
175                access_token,
176            } => {
177                if workspace_url.is_empty() {
178                    return Err(ConnectorError::ConfigurationError(
179                        "Unity catalog requires 'catalog.workspace_url' to be set".into(),
180                    ));
181                }
182                if access_token.is_empty() {
183                    return Err(ConnectorError::ConfigurationError(
184                        "Unity catalog requires 'catalog.access_token' to be set".into(),
185                    ));
186                }
187                if self.catalog_name.is_none() {
188                    return Err(ConnectorError::ConfigurationError(
189                        "Unity catalog requires 'catalog.name' to be set".into(),
190                    ));
191                }
192                if self.catalog_schema.is_none() {
193                    return Err(ConnectorError::ConfigurationError(
194                        "Unity catalog requires 'catalog.schema' to be set".into(),
195                    ));
196                }
197            }
198        }
199
200        // Validate cloud storage credentials for the detected provider.
201        let resolved = ResolvedStorageOptions {
202            provider: StorageProvider::detect(&self.table_path),
203            options: self.storage_options.clone(),
204            env_resolved_keys: Vec::new(),
205        };
206        let cloud_result = CloudConfigValidator::validate(&resolved);
207        if !cloud_result.is_valid() {
208            return Err(ConnectorError::ConfigurationError(
209                cloud_result.error_message(),
210            ));
211        }
212
213        Ok(())
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `delta-lake-source` connector config from key/value pairs.
    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
        let mut connector = ConnectorConfig::new("delta-lake-source");
        for &(key, value) in pairs {
            connector.set(key, value);
        }
        connector
    }

    /// The default config has no path, no starting version, and polls once
    /// per second.
    #[test]
    fn test_defaults() {
        let defaults = DeltaSourceConfig::default();
        assert!(defaults.table_path.is_empty());
        assert_eq!(defaults.starting_version, None);
        assert_eq!(defaults.poll_interval, Duration::from_secs(1));
    }

    /// `new` stores the given table path.
    #[test]
    fn test_new_helper() {
        let path = "/tmp/test_table";
        assert_eq!(DeltaSourceConfig::new(path).table_path, path);
    }

    /// A config with only `table.path` parses and leaves optional fields unset.
    #[test]
    fn test_parse_required_fields() {
        let options = make_config(&[("table.path", "/data/warehouse/trades")]);
        let parsed = DeltaSourceConfig::from_config(&options).unwrap();
        assert_eq!(parsed.table_path, "/data/warehouse/trades");
        assert_eq!(parsed.starting_version, None);
    }

    /// Omitting `table.path` is an error.
    #[test]
    fn test_missing_table_path() {
        let options = ConnectorConfig::new("delta-lake-source");
        let outcome = DeltaSourceConfig::from_config(&options);
        assert!(outcome.is_err());
    }

    /// `starting.version` and `poll.interval.ms` are parsed into typed fields.
    #[test]
    fn test_parse_optional_fields() {
        let options = make_config(&[
            ("table.path", "/data/test"),
            ("starting.version", "5"),
            ("poll.interval.ms", "500"),
        ]);
        let parsed = DeltaSourceConfig::from_config(&options).unwrap();
        assert_eq!(parsed.starting_version, Some(5));
        assert_eq!(parsed.poll_interval, Duration::from_millis(500));
    }

    /// A non-numeric `starting.version` is rejected.
    #[test]
    fn test_invalid_starting_version() {
        let options = make_config(&[("table.path", "/data/test"), ("starting.version", "abc")]);
        let outcome = DeltaSourceConfig::from_config(&options);
        assert!(outcome.is_err());
    }

    /// Validation fails when the table path is empty.
    #[test]
    fn test_empty_table_path_rejected() {
        let empty_path = DeltaSourceConfig::new("");
        assert!(empty_path.validate().is_err());
    }
}