laminar_connectors/lakehouse/
delta_source_config.rs1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
9use std::time::Duration;
10
11use crate::config::ConnectorConfig;
12use crate::error::ConnectorError;
13use crate::storage::{
14 CloudConfigValidator, ResolvedStorageOptions, SecretMasker, StorageCredentialResolver,
15 StorageProvider,
16};
17
18use super::delta_config::DeltaCatalogType;
19
20#[derive(Debug, Clone)]
24pub struct DeltaSourceConfig {
25 pub table_path: String,
27
28 pub starting_version: Option<i64>,
30
31 pub poll_interval: Duration,
33
34 pub storage_options: HashMap<String, String>,
36
37 pub catalog_type: DeltaCatalogType,
39
40 pub catalog_database: Option<String>,
42
43 pub catalog_name: Option<String>,
45
46 pub catalog_schema: Option<String>,
48
49 pub catalog_properties: HashMap<String, String>,
51}
52
53impl Default for DeltaSourceConfig {
54 fn default() -> Self {
55 Self {
56 table_path: String::new(),
57 starting_version: None,
58 poll_interval: Duration::from_millis(1000),
59 storage_options: HashMap::new(),
60 catalog_type: DeltaCatalogType::None,
61 catalog_database: None,
62 catalog_name: None,
63 catalog_schema: None,
64 catalog_properties: HashMap::new(),
65 }
66 }
67}
68
69impl DeltaSourceConfig {
70 #[must_use]
72 pub fn new(table_path: &str) -> Self {
73 Self {
74 table_path: table_path.to_string(),
75 ..Default::default()
76 }
77 }
78
79 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
90 let mut cfg = Self {
91 table_path: config.require("table.path")?.to_string(),
92 ..Self::default()
93 };
94
95 if let Some(v) = config.get("starting.version") {
96 cfg.starting_version = Some(v.parse().map_err(|_| {
97 ConnectorError::ConfigurationError(format!("invalid starting.version: '{v}'"))
98 })?);
99 }
100 if let Some(v) = config.get("poll.interval.ms") {
101 let ms: u64 = v.parse().map_err(|_| {
102 ConnectorError::ConfigurationError(format!("invalid poll.interval.ms: '{v}'"))
103 })?;
104 cfg.poll_interval = Duration::from_millis(ms);
105 }
106
107 if let Some(v) = config.get("catalog.type") {
109 cfg.catalog_type = v.parse().map_err(|_| {
110 ConnectorError::ConfigurationError(format!(
111 "invalid catalog.type: '{v}' (expected 'none', 'glue', or 'unity')"
112 ))
113 })?;
114 }
115 if let Some(v) = config.get("catalog.database") {
116 cfg.catalog_database = Some(v.to_string());
117 }
118 if let Some(v) = config.get("catalog.name") {
119 cfg.catalog_name = Some(v.to_string());
120 }
121 if let Some(v) = config.get("catalog.schema") {
122 cfg.catalog_schema = Some(v.to_string());
123 }
124 if let DeltaCatalogType::Unity {
125 ref mut workspace_url,
126 ref mut access_token,
127 } = cfg.catalog_type
128 {
129 if let Some(v) = config.get("catalog.workspace_url") {
130 *workspace_url = v.to_string();
131 }
132 if let Some(v) = config.get("catalog.access_token") {
133 *access_token = v.to_string();
134 }
135 }
136 cfg.catalog_properties = config.properties_with_prefix("catalog.prop.");
137
138 let explicit_storage = config.properties_with_prefix("storage.");
140 let resolved = StorageCredentialResolver::resolve(&cfg.table_path, &explicit_storage);
141 cfg.storage_options = resolved.options;
142
143 cfg.validate()?;
144 Ok(cfg)
145 }
146
147 #[must_use]
149 pub fn display_storage_options(&self) -> String {
150 SecretMasker::display_map(&self.storage_options)
151 }
152
153 pub fn validate(&self) -> Result<(), ConnectorError> {
159 if self.table_path.is_empty() {
160 return Err(ConnectorError::MissingConfig("table.path".into()));
161 }
162
163 match &self.catalog_type {
165 DeltaCatalogType::None => {}
166 DeltaCatalogType::Glue => {
167 if self.catalog_database.is_none() {
168 return Err(ConnectorError::ConfigurationError(
169 "Glue catalog requires 'catalog.database' to be set".into(),
170 ));
171 }
172 }
173 DeltaCatalogType::Unity {
174 workspace_url,
175 access_token,
176 } => {
177 if workspace_url.is_empty() {
178 return Err(ConnectorError::ConfigurationError(
179 "Unity catalog requires 'catalog.workspace_url' to be set".into(),
180 ));
181 }
182 if access_token.is_empty() {
183 return Err(ConnectorError::ConfigurationError(
184 "Unity catalog requires 'catalog.access_token' to be set".into(),
185 ));
186 }
187 if self.catalog_name.is_none() {
188 return Err(ConnectorError::ConfigurationError(
189 "Unity catalog requires 'catalog.name' to be set".into(),
190 ));
191 }
192 if self.catalog_schema.is_none() {
193 return Err(ConnectorError::ConfigurationError(
194 "Unity catalog requires 'catalog.schema' to be set".into(),
195 ));
196 }
197 }
198 }
199
200 let resolved = ResolvedStorageOptions {
202 provider: StorageProvider::detect(&self.table_path),
203 options: self.storage_options.clone(),
204 env_resolved_keys: Vec::new(),
205 };
206 let cloud_result = CloudConfigValidator::validate(&resolved);
207 if !cloud_result.is_valid() {
208 return Err(ConnectorError::ConfigurationError(
209 cloud_result.error_message(),
210 ));
211 }
212
213 Ok(())
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `delta-lake-source` connector config from key/value pairs.
    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("delta-lake-source");
        for &(key, value) in pairs {
            config.set(key, value);
        }
        config
    }

    /// The default config has no path, no starting version, and a 1 s poll.
    #[test]
    fn test_defaults() {
        let DeltaSourceConfig {
            table_path,
            starting_version,
            poll_interval,
            ..
        } = DeltaSourceConfig::default();

        assert!(table_path.is_empty());
        assert!(starting_version.is_none());
        assert_eq!(poll_interval, Duration::from_millis(1000));
    }

    /// `new` stores the given path verbatim.
    #[test]
    fn test_new_helper() {
        let path = "/tmp/test_table";
        assert_eq!(DeltaSourceConfig::new(path).table_path, path);
    }

    /// Only `table.path` is needed for a successful parse.
    #[test]
    fn test_parse_required_fields() {
        let properties = [("table.path", "/data/warehouse/trades")];
        let cfg = DeltaSourceConfig::from_config(&make_config(&properties)).unwrap();

        assert_eq!(cfg.table_path, "/data/warehouse/trades");
        assert!(cfg.starting_version.is_none());
    }

    /// A config without `table.path` is rejected.
    #[test]
    fn test_missing_table_path() {
        let empty = ConnectorConfig::new("delta-lake-source");
        let outcome = DeltaSourceConfig::from_config(&empty);
        assert!(outcome.is_err());
    }

    /// Optional numeric keys are parsed into their typed fields.
    #[test]
    fn test_parse_optional_fields() {
        let properties = [
            ("table.path", "/data/test"),
            ("starting.version", "5"),
            ("poll.interval.ms", "500"),
        ];
        let cfg = DeltaSourceConfig::from_config(&make_config(&properties)).unwrap();

        assert_eq!(cfg.starting_version, Some(5));
        assert_eq!(cfg.poll_interval, Duration::from_millis(500));
    }

    /// A non-numeric `starting.version` is a configuration error.
    #[test]
    fn test_invalid_starting_version() {
        let properties = [("table.path", "/data/test"), ("starting.version", "abc")];
        let outcome = DeltaSourceConfig::from_config(&make_config(&properties));
        assert!(outcome.is_err());
    }

    /// `validate` refuses a config whose table path is empty.
    #[test]
    fn test_empty_table_path_rejected() {
        let cfg = DeltaSourceConfig {
            table_path: String::new(),
            ..DeltaSourceConfig::default()
        };
        assert!(cfg.validate().is_err());
    }
}