1pub mod delta;
40pub mod delta_config;
41#[cfg(feature = "delta-lake")]
42pub mod delta_io;
43pub mod delta_metrics;
44pub mod delta_source;
45pub mod delta_source_config;
46#[cfg(feature = "delta-lake")]
47pub mod delta_table_provider;
48
49pub mod metrics;
51
52pub use delta::DeltaLakeSink;
54pub use delta_config::{
55 CompactionConfig, DeliveryGuarantee, DeltaCatalogType, DeltaLakeSinkConfig, DeltaWriteMode,
56};
57pub use delta_metrics::DeltaLakeSinkMetrics;
58pub use delta_source::DeltaSource;
59pub use delta_source_config::DeltaSourceConfig;
60pub use metrics::LakehouseSinkMetrics;
61
62use std::sync::Arc;
63
64use crate::config::{ConfigKeySpec, ConnectorInfo};
65use crate::registry::ConnectorRegistry;
66
67pub fn register_delta_lake_sink(registry: &ConnectorRegistry) {
69 let info = ConnectorInfo {
70 name: "delta-lake".to_string(),
71 display_name: "Delta Lake Sink".to_string(),
72 version: env!("CARGO_PKG_VERSION").to_string(),
73 is_source: false,
74 is_sink: true,
75 config_keys: delta_lake_config_keys(),
76 };
77
78 registry.register_sink(
79 "delta-lake",
80 info,
81 Arc::new(|| Box::new(DeltaLakeSink::new(DeltaLakeSinkConfig::default()))),
82 );
83}
84
85pub fn register_delta_lake_source(registry: &ConnectorRegistry) {
87 let info = ConnectorInfo {
88 name: "delta-lake".to_string(),
89 display_name: "Delta Lake Source".to_string(),
90 version: env!("CARGO_PKG_VERSION").to_string(),
91 is_source: true,
92 is_sink: false,
93 config_keys: delta_lake_source_config_keys(),
94 };
95
96 registry.register_source(
97 "delta-lake",
98 info,
99 Arc::new(|| Box::new(DeltaSource::new(DeltaSourceConfig::default()))),
100 );
101}
102
103pub fn register_lakehouse_sinks(registry: &ConnectorRegistry) {
105 register_delta_lake_sink(registry);
106}
107
108#[allow(clippy::too_many_lines)]
109fn delta_lake_config_keys() -> Vec<ConfigKeySpec> {
110 vec![
111 ConfigKeySpec::required(
112 "table.path",
113 "Path to Delta Lake table (local, s3://, az://, gs://)",
114 ),
115 ConfigKeySpec::optional(
116 "partition.columns",
117 "Comma-separated partition column names",
118 "",
119 ),
120 ConfigKeySpec::optional(
121 "target.file.size",
122 "Target Parquet file size in bytes",
123 "134217728",
124 ),
125 ConfigKeySpec::optional(
126 "max.buffer.records",
127 "Maximum records to buffer before flushing",
128 "100000",
129 ),
130 ConfigKeySpec::optional(
131 "max.buffer.duration.ms",
132 "Maximum time to buffer before flushing (ms)",
133 "60000",
134 ),
135 ConfigKeySpec::optional(
136 "checkpoint.interval",
137 "Create Delta checkpoint every N commits",
138 "10",
139 ),
140 ConfigKeySpec::optional(
141 "schema.evolution",
142 "Enable automatic schema evolution (additive columns)",
143 "false",
144 ),
145 ConfigKeySpec::optional(
146 "write.mode",
147 "Write mode: append, overwrite, upsert",
148 "append",
149 ),
150 ConfigKeySpec::optional(
151 "merge.key.columns",
152 "Key columns for upsert MERGE (required for upsert mode)",
153 "",
154 ),
155 ConfigKeySpec::optional(
156 "delivery.guarantee",
157 "exactly-once or at-least-once",
158 "at-least-once",
159 ),
160 ConfigKeySpec::optional(
161 "compaction.enabled",
162 "Enable background OPTIMIZE compaction",
163 "true",
164 ),
165 ConfigKeySpec::optional(
166 "compaction.z-order.columns",
167 "Columns for Z-ORDER clustering",
168 "",
169 ),
170 ConfigKeySpec::optional(
171 "compaction.min-files",
172 "Minimum files before triggering compaction",
173 "10",
174 ),
175 ConfigKeySpec::optional(
176 "vacuum.retention.hours",
177 "Hours to retain old files during VACUUM",
178 "168",
179 ),
180 ConfigKeySpec::optional(
181 "writer.id",
182 "Writer ID for exactly-once deduplication (auto UUID if not set)",
183 "",
184 ),
185 ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
187 ConfigKeySpec::optional(
188 "catalog.database",
189 "Catalog database name (required for Glue)",
190 "",
191 ),
192 ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
193 ConfigKeySpec::optional(
194 "catalog.schema",
195 "Catalog schema name (required for Unity)",
196 "",
197 ),
198 ConfigKeySpec::optional(
199 "catalog.workspace_url",
200 "Databricks workspace URL (required for Unity)",
201 "",
202 ),
203 ConfigKeySpec::optional(
204 "catalog.access_token",
205 "Databricks access token (required for Unity)",
206 "",
207 ),
208 ConfigKeySpec::optional(
209 "catalog.prop.*",
210 "Catalog-specific properties (pass-through)",
211 "",
212 ),
213 ConfigKeySpec::optional(
215 "storage.aws_access_key_id",
216 "AWS access key ID (falls back to AWS_ACCESS_KEY_ID env var)",
217 "",
218 ),
219 ConfigKeySpec::optional(
220 "storage.aws_secret_access_key",
221 "AWS secret access key (falls back to AWS_SECRET_ACCESS_KEY env var)",
222 "",
223 ),
224 ConfigKeySpec::optional(
225 "storage.aws_region",
226 "AWS region for S3 paths (falls back to AWS_REGION env var)",
227 "",
228 ),
229 ConfigKeySpec::optional(
230 "storage.aws_session_token",
231 "AWS session token for temporary credentials (falls back to AWS_SESSION_TOKEN)",
232 "",
233 ),
234 ConfigKeySpec::optional(
235 "storage.aws_endpoint",
236 "Custom S3 endpoint (MinIO, LocalStack; falls back to AWS_ENDPOINT_URL)",
237 "",
238 ),
239 ConfigKeySpec::optional(
240 "storage.aws_profile",
241 "AWS profile name (falls back to AWS_PROFILE env var)",
242 "",
243 ),
244 ConfigKeySpec::optional(
245 "storage.azure_storage_account_name",
246 "Azure storage account name (falls back to AZURE_STORAGE_ACCOUNT_NAME)",
247 "",
248 ),
249 ConfigKeySpec::optional(
250 "storage.azure_storage_account_key",
251 "Azure storage account key (falls back to AZURE_STORAGE_ACCOUNT_KEY)",
252 "",
253 ),
254 ConfigKeySpec::optional(
255 "storage.azure_storage_sas_token",
256 "Azure SAS token (falls back to AZURE_STORAGE_SAS_TOKEN)",
257 "",
258 ),
259 ConfigKeySpec::optional(
260 "storage.azure_storage_client_id",
261 "Azure client ID for service principal auth (falls back to AZURE_CLIENT_ID)",
262 "",
263 ),
264 ConfigKeySpec::optional(
265 "storage.google_service_account_path",
266 "Path to GCS service account JSON (falls back to GOOGLE_APPLICATION_CREDENTIALS)",
267 "",
268 ),
269 ConfigKeySpec::optional(
270 "storage.google_service_account_key",
271 "Inline GCS service account JSON (falls back to GOOGLE_SERVICE_ACCOUNT_KEY)",
272 "",
273 ),
274 ]
275}
276
277fn delta_lake_source_config_keys() -> Vec<ConfigKeySpec> {
278 vec![
279 ConfigKeySpec::required(
280 "table.path",
281 "Path to Delta Lake table (local, s3://, az://, gs://)",
282 ),
283 ConfigKeySpec::optional(
284 "starting.version",
285 "Starting version to read from (default: latest)",
286 "",
287 ),
288 ConfigKeySpec::optional(
289 "poll.interval.ms",
290 "How often to poll for new versions (ms)",
291 "1000",
292 ),
293 ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
295 ConfigKeySpec::optional(
296 "catalog.database",
297 "Catalog database name (required for Glue)",
298 "",
299 ),
300 ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
301 ConfigKeySpec::optional(
302 "catalog.schema",
303 "Catalog schema name (required for Unity)",
304 "",
305 ),
306 ConfigKeySpec::optional(
307 "catalog.workspace_url",
308 "Databricks workspace URL (required for Unity)",
309 "",
310 ),
311 ConfigKeySpec::optional(
312 "catalog.access_token",
313 "Databricks access token (required for Unity)",
314 "",
315 ),
316 ConfigKeySpec::optional(
317 "catalog.prop.*",
318 "Catalog-specific properties (pass-through)",
319 "",
320 ),
321 ConfigKeySpec::optional("storage.aws_access_key_id", "AWS access key ID", ""),
323 ConfigKeySpec::optional("storage.aws_secret_access_key", "AWS secret access key", ""),
324 ConfigKeySpec::optional("storage.aws_region", "AWS region for S3 paths", ""),
325 ConfigKeySpec::optional(
326 "storage.azure_storage_account_name",
327 "Azure storage account name",
328 "",
329 ),
330 ConfigKeySpec::optional(
331 "storage.azure_storage_account_key",
332 "Azure storage account key",
333 "",
334 ),
335 ConfigKeySpec::optional(
336 "storage.google_service_account_path",
337 "Path to GCS service account JSON",
338 "",
339 ),
340 ]
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the names of the keys in `specs` whose `required` flag equals
    /// `required`.
    fn key_names(specs: &[ConfigKeySpec], required: bool) -> Vec<&str> {
        specs
            .iter()
            .filter(|spec| spec.required == required)
            .map(|spec| spec.key.as_str())
            .collect()
    }

    /// Asserts that every key in `expected` is present in `actual`, naming
    /// the missing key on failure.
    fn assert_contains_all(actual: &[&str], expected: &[&str]) {
        for key in expected {
            assert!(actual.contains(key), "missing config key: {}", key);
        }
    }

    /// The sink registers under "delta-lake" and is described as sink-only.
    #[test]
    fn test_register_delta_lake_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let info = registry
            .sink_info("delta-lake")
            .expect("delta-lake sink should be registered");
        assert_eq!(info.name, "delta-lake");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    /// `table.path` is the only required sink key.
    #[test]
    fn test_config_keys_required() {
        let keys = delta_lake_config_keys();
        let required = key_names(&keys, true);
        assert!(required.contains(&"table.path"));
        assert_eq!(required.len(), 1);
    }

    /// The sink advertises the AWS, Azure and GCS storage credential keys.
    #[test]
    fn test_config_keys_include_cloud_storage() {
        let keys = delta_lake_config_keys();
        let key_names: Vec<&str> = keys.iter().map(|k| k.key.as_str()).collect();
        assert_contains_all(
            &key_names,
            &[
                "storage.aws_access_key_id",
                "storage.aws_secret_access_key",
                "storage.aws_region",
                "storage.azure_storage_account_name",
                "storage.azure_storage_account_key",
                "storage.google_service_account_path",
            ],
        );
    }

    /// The expected write, compaction and catalog keys are all optional.
    #[test]
    fn test_config_keys_optional_present() {
        let keys = delta_lake_config_keys();
        let optional = key_names(&keys, false);
        assert_contains_all(
            &optional,
            &[
                "partition.columns",
                "target.file.size",
                "write.mode",
                "delivery.guarantee",
                "merge.key.columns",
                "schema.evolution",
                "compaction.enabled",
                "compaction.z-order.columns",
                "vacuum.retention.hours",
                "writer.id",
                // Catalog keys.
                "catalog.type",
                "catalog.database",
                "catalog.name",
                "catalog.schema",
                "catalog.workspace_url",
                "catalog.access_token",
                "catalog.prop.*",
            ],
        );
    }

    /// The registered sink factory can be invoked through the registry.
    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let config = crate::config::ConnectorConfig::new("delta-lake");
        let sink = registry.create_sink(&config);
        assert!(sink.is_ok());
    }

    /// The source registers under "delta-lake" and is described as
    /// source-only.
    #[test]
    fn test_register_delta_lake_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let info = registry
            .source_info("delta-lake")
            .expect("delta-lake source should be registered");
        assert_eq!(info.name, "delta-lake");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    /// `table.path` is the only required source key; polling and catalog
    /// keys are optional.
    #[test]
    fn test_source_config_keys() {
        let keys = delta_lake_source_config_keys();
        let required = key_names(&keys, true);
        assert!(required.contains(&"table.path"));
        assert_eq!(required.len(), 1);

        let optional = key_names(&keys, false);
        assert_contains_all(
            &optional,
            &[
                "starting.version",
                "poll.interval.ms",
                // Catalog keys.
                "catalog.type",
                "catalog.database",
            ],
        );
    }

    /// The registered source factory can be invoked through the registry.
    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let config = crate::config::ConnectorConfig::new("delta-lake");
        let source = registry.create_source(&config);
        assert!(source.is_ok());
    }

    /// The aggregate registration function registers the Delta Lake sink.
    #[test]
    fn test_register_lakehouse_sinks() {
        let registry = ConnectorRegistry::new();
        register_lakehouse_sinks(&registry);

        assert!(registry.sink_info("delta-lake").is_some());
    }
}