Skip to main content

laminar_connectors/lakehouse/
mod.rs

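//! Lakehouse connectors (Delta Lake).
//!
//! This module contains two connectors:
//! - A Delta Lake sink, which writes Arrow `RecordBatch` data to Delta tables with ACID
//!   transactions and at-least-once delivery. Exactly-once delivery is opt-in via
//!   `delivery.guarantee`.
//! - A Delta Lake source, which polls a table for new versions.
//!
//! Both connectors are registered under the connector name `delta-lake`. See
//! `register_delta_lake_sink` and `register_delta_lake_source`.
//!
//! # Architecture
//!
//! ```text
//! Ring 0 (Hot Path):   SPSC push only (~5ns, zero sink code)
//! Ring 1 (Background): Batch buffering -> Parquet writes -> transaction commits
//! Ring 2 (Control):    Schema management, configuration, health checks
//! ```
//!
//! # Module Structure
//!
//! ## Delta Lake sink
//! - `delta` - `DeltaLakeSink` implementing `SinkConnector`
//! - `delta_config` - Configuration and enums
//! - `delta_metrics` - Lock-free atomic metrics
//!
//! ## Delta Lake source
//! - `delta_source` - `DeltaSource`
//! - `delta_source_config` - `DeltaSourceConfig`
//!
//! ## Shared
//! - `metrics` - `LakehouseSinkMetrics`
//!
//! ## Feature-gated (`delta-lake`)
//! - `delta_io`, `delta_table_provider` - compiled only when the `delta-lake` feature is enabled
//!
//! # Usage
//!
//! ## Delta Lake sink
//!
//! ```rust,ignore
//! use laminar_connectors::lakehouse::{DeltaLakeSink, DeltaLakeSinkConfig, DeltaWriteMode};
//!
//! let config = DeltaLakeSinkConfig {
//!     table_path: "s3://data-lake/trades/".to_string(),
//!     write_mode: DeltaWriteMode::Append,
//!     partition_columns: vec!["trade_date".to_string()],
//!     ..Default::default()
//! };
//!
//! let sink = DeltaLakeSink::new(config);
//! ```
//!
//! ## Delta Lake source
//!
//! ```rust,ignore
//! use laminar_connectors::lakehouse::{DeltaSource, DeltaSourceConfig};
//!
//! let source = DeltaSource::new(DeltaSourceConfig::default());
//! ```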
37
38// Delta Lake modules
39pub mod delta;
40pub mod delta_config;
41#[cfg(feature = "delta-lake")]
42pub mod delta_io;
43pub mod delta_metrics;
44pub mod delta_source;
45pub mod delta_source_config;
46#[cfg(feature = "delta-lake")]
47pub mod delta_table_provider;
48
49// Common metrics
50pub mod metrics;
51
// Re-export the Delta Lake connector types and the shared lakehouse metrics at module level.
53pub use delta::DeltaLakeSink;
54pub use delta_config::{
55    CompactionConfig, DeliveryGuarantee, DeltaCatalogType, DeltaLakeSinkConfig, DeltaWriteMode,
56};
57pub use delta_metrics::DeltaLakeSinkMetrics;
58pub use delta_source::DeltaSource;
59pub use delta_source_config::DeltaSourceConfig;
60pub use metrics::LakehouseSinkMetrics;
61
62use std::sync::Arc;
63
64use crate::config::{ConfigKeySpec, ConnectorInfo};
65use crate::registry::ConnectorRegistry;
66
67/// Registers the Delta Lake sink connector with the given registry.
68pub fn register_delta_lake_sink(registry: &ConnectorRegistry) {
69    let info = ConnectorInfo {
70        name: "delta-lake".to_string(),
71        display_name: "Delta Lake Sink".to_string(),
72        version: env!("CARGO_PKG_VERSION").to_string(),
73        is_source: false,
74        is_sink: true,
75        config_keys: delta_lake_config_keys(),
76    };
77
78    registry.register_sink(
79        "delta-lake",
80        info,
81        Arc::new(|| Box::new(DeltaLakeSink::new(DeltaLakeSinkConfig::default()))),
82    );
83}
84
85/// Registers the Delta Lake source connector with the given registry.
86pub fn register_delta_lake_source(registry: &ConnectorRegistry) {
87    let info = ConnectorInfo {
88        name: "delta-lake".to_string(),
89        display_name: "Delta Lake Source".to_string(),
90        version: env!("CARGO_PKG_VERSION").to_string(),
91        is_source: true,
92        is_sink: false,
93        config_keys: delta_lake_source_config_keys(),
94    };
95
96    registry.register_source(
97        "delta-lake",
98        info,
99        Arc::new(|| Box::new(DeltaSource::new(DeltaSourceConfig::default()))),
100    );
101}
102
103/// Registers all lakehouse sink connectors (Delta Lake).
104pub fn register_lakehouse_sinks(registry: &ConnectorRegistry) {
105    register_delta_lake_sink(registry);
106}
107
108#[allow(clippy::too_many_lines)]
109fn delta_lake_config_keys() -> Vec<ConfigKeySpec> {
110    vec![
111        ConfigKeySpec::required(
112            "table.path",
113            "Path to Delta Lake table (local, s3://, az://, gs://)",
114        ),
115        ConfigKeySpec::optional(
116            "partition.columns",
117            "Comma-separated partition column names",
118            "",
119        ),
120        ConfigKeySpec::optional(
121            "target.file.size",
122            "Target Parquet file size in bytes",
123            "134217728",
124        ),
125        ConfigKeySpec::optional(
126            "max.buffer.records",
127            "Maximum records to buffer before flushing",
128            "100000",
129        ),
130        ConfigKeySpec::optional(
131            "max.buffer.duration.ms",
132            "Maximum time to buffer before flushing (ms)",
133            "60000",
134        ),
135        ConfigKeySpec::optional(
136            "checkpoint.interval",
137            "Create Delta checkpoint every N commits",
138            "10",
139        ),
140        ConfigKeySpec::optional(
141            "schema.evolution",
142            "Enable automatic schema evolution (additive columns)",
143            "false",
144        ),
145        ConfigKeySpec::optional(
146            "write.mode",
147            "Write mode: append, overwrite, upsert",
148            "append",
149        ),
150        ConfigKeySpec::optional(
151            "merge.key.columns",
152            "Key columns for upsert MERGE (required for upsert mode)",
153            "",
154        ),
155        ConfigKeySpec::optional(
156            "delivery.guarantee",
157            "exactly-once or at-least-once",
158            "at-least-once",
159        ),
160        ConfigKeySpec::optional(
161            "compaction.enabled",
162            "Enable background OPTIMIZE compaction",
163            "true",
164        ),
165        ConfigKeySpec::optional(
166            "compaction.z-order.columns",
167            "Columns for Z-ORDER clustering",
168            "",
169        ),
170        ConfigKeySpec::optional(
171            "compaction.min-files",
172            "Minimum files before triggering compaction",
173            "10",
174        ),
175        ConfigKeySpec::optional(
176            "vacuum.retention.hours",
177            "Hours to retain old files during VACUUM",
178            "168",
179        ),
180        ConfigKeySpec::optional(
181            "writer.id",
182            "Writer ID for exactly-once deduplication (auto UUID if not set)",
183            "",
184        ),
185        // ── Catalog configuration ──
186        ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
187        ConfigKeySpec::optional(
188            "catalog.database",
189            "Catalog database name (required for Glue)",
190            "",
191        ),
192        ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
193        ConfigKeySpec::optional(
194            "catalog.schema",
195            "Catalog schema name (required for Unity)",
196            "",
197        ),
198        ConfigKeySpec::optional(
199            "catalog.workspace_url",
200            "Databricks workspace URL (required for Unity)",
201            "",
202        ),
203        ConfigKeySpec::optional(
204            "catalog.access_token",
205            "Databricks access token (required for Unity)",
206            "",
207        ),
208        ConfigKeySpec::optional(
209            "catalog.prop.*",
210            "Catalog-specific properties (pass-through)",
211            "",
212        ),
213        // ── Cloud storage credentials (resolved via StorageCredentialResolver) ──
214        ConfigKeySpec::optional(
215            "storage.aws_access_key_id",
216            "AWS access key ID (falls back to AWS_ACCESS_KEY_ID env var)",
217            "",
218        ),
219        ConfigKeySpec::optional(
220            "storage.aws_secret_access_key",
221            "AWS secret access key (falls back to AWS_SECRET_ACCESS_KEY env var)",
222            "",
223        ),
224        ConfigKeySpec::optional(
225            "storage.aws_region",
226            "AWS region for S3 paths (falls back to AWS_REGION env var)",
227            "",
228        ),
229        ConfigKeySpec::optional(
230            "storage.aws_session_token",
231            "AWS session token for temporary credentials (falls back to AWS_SESSION_TOKEN)",
232            "",
233        ),
234        ConfigKeySpec::optional(
235            "storage.aws_endpoint",
236            "Custom S3 endpoint (MinIO, LocalStack; falls back to AWS_ENDPOINT_URL)",
237            "",
238        ),
239        ConfigKeySpec::optional(
240            "storage.aws_profile",
241            "AWS profile name (falls back to AWS_PROFILE env var)",
242            "",
243        ),
244        ConfigKeySpec::optional(
245            "storage.azure_storage_account_name",
246            "Azure storage account name (falls back to AZURE_STORAGE_ACCOUNT_NAME)",
247            "",
248        ),
249        ConfigKeySpec::optional(
250            "storage.azure_storage_account_key",
251            "Azure storage account key (falls back to AZURE_STORAGE_ACCOUNT_KEY)",
252            "",
253        ),
254        ConfigKeySpec::optional(
255            "storage.azure_storage_sas_token",
256            "Azure SAS token (falls back to AZURE_STORAGE_SAS_TOKEN)",
257            "",
258        ),
259        ConfigKeySpec::optional(
260            "storage.azure_storage_client_id",
261            "Azure client ID for service principal auth (falls back to AZURE_CLIENT_ID)",
262            "",
263        ),
264        ConfigKeySpec::optional(
265            "storage.google_service_account_path",
266            "Path to GCS service account JSON (falls back to GOOGLE_APPLICATION_CREDENTIALS)",
267            "",
268        ),
269        ConfigKeySpec::optional(
270            "storage.google_service_account_key",
271            "Inline GCS service account JSON (falls back to GOOGLE_SERVICE_ACCOUNT_KEY)",
272            "",
273        ),
274    ]
275}
276
277fn delta_lake_source_config_keys() -> Vec<ConfigKeySpec> {
278    vec![
279        ConfigKeySpec::required(
280            "table.path",
281            "Path to Delta Lake table (local, s3://, az://, gs://)",
282        ),
283        ConfigKeySpec::optional(
284            "starting.version",
285            "Starting version to read from (default: latest)",
286            "",
287        ),
288        ConfigKeySpec::optional(
289            "poll.interval.ms",
290            "How often to poll for new versions (ms)",
291            "1000",
292        ),
293        // ── Catalog configuration ──
294        ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
295        ConfigKeySpec::optional(
296            "catalog.database",
297            "Catalog database name (required for Glue)",
298            "",
299        ),
300        ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
301        ConfigKeySpec::optional(
302            "catalog.schema",
303            "Catalog schema name (required for Unity)",
304            "",
305        ),
306        ConfigKeySpec::optional(
307            "catalog.workspace_url",
308            "Databricks workspace URL (required for Unity)",
309            "",
310        ),
311        ConfigKeySpec::optional(
312            "catalog.access_token",
313            "Databricks access token (required for Unity)",
314            "",
315        ),
316        ConfigKeySpec::optional(
317            "catalog.prop.*",
318            "Catalog-specific properties (pass-through)",
319            "",
320        ),
321        // ── Cloud storage credentials ──
322        ConfigKeySpec::optional("storage.aws_access_key_id", "AWS access key ID", ""),
323        ConfigKeySpec::optional("storage.aws_secret_access_key", "AWS secret access key", ""),
324        ConfigKeySpec::optional("storage.aws_region", "AWS region for S3 paths", ""),
325        ConfigKeySpec::optional(
326            "storage.azure_storage_account_name",
327            "Azure storage account name",
328            "",
329        ),
330        ConfigKeySpec::optional(
331            "storage.azure_storage_account_key",
332            "Azure storage account key",
333            "",
334        ),
335        ConfigKeySpec::optional(
336            "storage.google_service_account_path",
337            "Path to GCS service account JSON",
338            "",
339        ),
340    ]
341}
342
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that no configuration key name appears more than once in `keys`.
    fn assert_unique_keys(keys: &[ConfigKeySpec]) {
        let mut seen = std::collections::HashSet::new();
        for spec in keys {
            assert!(
                seen.insert(spec.key.as_str()),
                "duplicate config key: {}",
                spec.key
            );
        }
    }

    #[test]
    fn test_register_delta_lake_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        // `expect` instead of `is_some()` + `unwrap()` gives a descriptive failure.
        let info = registry
            .sink_info("delta-lake")
            .expect("delta-lake sink should be registered");
        assert_eq!(info.name, "delta-lake");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_config_keys_required() {
        let keys = delta_lake_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"table.path"));
        assert_eq!(required.len(), 1);
    }

    #[test]
    fn test_config_keys_include_cloud_storage() {
        let keys = delta_lake_config_keys();
        let key_names: Vec<&str> = keys.iter().map(|k| k.key.as_str()).collect();
        assert!(key_names.contains(&"storage.aws_access_key_id"));
        assert!(key_names.contains(&"storage.aws_secret_access_key"));
        assert!(key_names.contains(&"storage.aws_region"));
        assert!(key_names.contains(&"storage.azure_storage_account_name"));
        assert!(key_names.contains(&"storage.azure_storage_account_key"));
        assert!(key_names.contains(&"storage.google_service_account_path"));
    }

    #[test]
    fn test_config_keys_optional_present() {
        let keys = delta_lake_config_keys();
        let optional: Vec<&str> = keys
            .iter()
            .filter(|k| !k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(optional.contains(&"partition.columns"));
        assert!(optional.contains(&"target.file.size"));
        assert!(optional.contains(&"write.mode"));
        assert!(optional.contains(&"delivery.guarantee"));
        assert!(optional.contains(&"merge.key.columns"));
        assert!(optional.contains(&"schema.evolution"));
        assert!(optional.contains(&"compaction.enabled"));
        assert!(optional.contains(&"compaction.z-order.columns"));
        assert!(optional.contains(&"vacuum.retention.hours"));
        assert!(optional.contains(&"writer.id"));
        // Catalog keys
        assert!(optional.contains(&"catalog.type"));
        assert!(optional.contains(&"catalog.database"));
        assert!(optional.contains(&"catalog.name"));
        assert!(optional.contains(&"catalog.schema"));
        assert!(optional.contains(&"catalog.workspace_url"));
        assert!(optional.contains(&"catalog.access_token"));
        assert!(optional.contains(&"catalog.prop.*"));
    }

    #[test]
    fn test_config_keys_unique() {
        // A duplicated key would be advertised twice, with possibly conflicting defaults.
        assert_unique_keys(&delta_lake_config_keys());
        assert_unique_keys(&delta_lake_source_config_keys());
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let config = crate::config::ConnectorConfig::new("delta-lake");
        let sink = registry.create_sink(&config);
        assert!(sink.is_ok());
    }

    // ── Delta Lake source registration tests ──

    #[test]
    fn test_register_delta_lake_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let info = registry
            .source_info("delta-lake")
            .expect("delta-lake source should be registered");
        assert_eq!(info.name, "delta-lake");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_source_config_keys() {
        let keys = delta_lake_source_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"table.path"));
        assert_eq!(required.len(), 1);

        let optional: Vec<&str> = keys
            .iter()
            .filter(|k| !k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(optional.contains(&"starting.version"));
        assert!(optional.contains(&"poll.interval.ms"));
        // Catalog keys
        assert!(optional.contains(&"catalog.type"));
        assert!(optional.contains(&"catalog.database"));
    }

    #[test]
    fn test_source_config_keys_include_catalog_and_cloud_storage() {
        let keys = delta_lake_source_config_keys();
        let optional: Vec<&str> = keys
            .iter()
            .filter(|k| !k.required)
            .map(|k| k.key.as_str())
            .collect();
        for key in [
            "catalog.name",
            "catalog.schema",
            "catalog.workspace_url",
            "catalog.access_token",
            "catalog.prop.*",
            "storage.aws_access_key_id",
            "storage.aws_secret_access_key",
            "storage.aws_region",
            "storage.azure_storage_account_name",
            "storage.azure_storage_account_key",
            "storage.google_service_account_path",
        ] {
            assert!(optional.contains(&key), "missing source config key: {key}");
        }
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let config = crate::config::ConnectorConfig::new("delta-lake");
        let source = registry.create_source(&config);
        assert!(source.is_ok());
    }

    #[test]
    fn test_register_lakehouse_sinks() {
        let registry = ConnectorRegistry::new();
        register_lakehouse_sinks(&registry);

        assert!(registry.sink_info("delta-lake").is_some());
        // Only sinks are registered here; the source has its own registration function.
        assert!(registry.source_info("delta-lake").is_none());
    }
}
475        register_lakehouse_sinks(&registry);
476
477        assert!(registry.sink_info("delta-lake").is_some());
478    }
479}