Skip to main content

laminar_connectors/lakehouse/
mod.rs

1//! Lakehouse connectors (Delta Lake, Apache Iceberg).
2
3// Delta Lake modules
4pub mod delta;
5pub mod delta_config;
6#[cfg(feature = "delta-lake")]
7pub mod delta_io;
8pub mod delta_metrics;
9pub mod delta_source;
10pub mod delta_source_config;
11#[cfg(feature = "delta-lake")]
12pub mod delta_table_provider;
13#[cfg(feature = "delta-lake-unity")]
14pub(crate) mod unity_catalog;
15
16// Apache Iceberg modules
17pub mod iceberg;
18pub mod iceberg_config;
19#[cfg(feature = "iceberg")]
20pub mod iceberg_incremental;
21#[cfg(feature = "iceberg")]
22pub mod iceberg_io;
23pub mod iceberg_reference;
24pub mod iceberg_source;
25
26// Common metrics
27pub mod metrics;
28
29// Re-export Delta Lake types at module level.
30pub use delta::DeltaLakeSink;
31pub use delta_config::{
32    CompactionConfig, DeliveryGuarantee, DeltaCatalogType, DeltaLakeSinkConfig, DeltaWriteMode,
33};
34pub use delta_metrics::DeltaLakeSinkMetrics;
35pub use delta_source::DeltaSource;
36pub use delta_source_config::{DeltaReadMode, DeltaSourceConfig, SchemaEvolutionAction};
37pub use metrics::LakehouseSinkMetrics;
38
39// Re-export Iceberg types at module level.
40pub use iceberg::IcebergSink;
41pub use iceberg_config::{
42    IcebergCatalogConfig, IcebergCatalogType, IcebergSinkConfig, IcebergSourceConfig,
43};
44pub use iceberg_reference::IcebergReferenceTableSource;
45pub use iceberg_source::IcebergSource;
46
47use std::sync::Arc;
48
49use crate::config::{ConfigKeySpec, ConnectorInfo};
50use crate::registry::ConnectorRegistry;
51
52/// Registers the Delta Lake sink connector with the given registry.
53pub fn register_delta_lake_sink(registry: &ConnectorRegistry) {
54    let info = ConnectorInfo {
55        name: "delta-lake".to_string(),
56        display_name: "Delta Lake Sink".to_string(),
57        version: env!("CARGO_PKG_VERSION").to_string(),
58        is_source: false,
59        is_sink: true,
60        config_keys: delta_lake_config_keys(),
61    };
62
63    registry.register_sink(
64        "delta-lake",
65        info,
66        Arc::new(|registry: Option<&prometheus::Registry>| {
67            Box::new(DeltaLakeSink::new(DeltaLakeSinkConfig::default(), registry))
68        }),
69    );
70}
71
72/// Registers the Delta Lake source connector with the given registry.
73pub fn register_delta_lake_source(registry: &ConnectorRegistry) {
74    let info = ConnectorInfo {
75        name: "delta-lake".to_string(),
76        display_name: "Delta Lake Source".to_string(),
77        version: env!("CARGO_PKG_VERSION").to_string(),
78        is_source: true,
79        is_sink: false,
80        config_keys: delta_lake_source_config_keys(),
81    };
82
83    registry.register_source(
84        "delta-lake",
85        info.clone(),
86        Arc::new(|registry: Option<&prometheus::Registry>| {
87            Box::new(DeltaSource::new(DeltaSourceConfig::default(), registry))
88        }),
89    );
90
91    // Also register as a table source so CREATE LOOKUP TABLE ... WITH
92    // (connector = 'delta-lake') can use Delta tables as reference data.
93    #[cfg(feature = "delta-lake")]
94    registry.register_table_source(
95        "delta-lake",
96        info,
97        Arc::new(|config| {
98            Ok(Box::new(
99                crate::lookup::delta_reference::DeltaReferenceTableSource::from_connector_config(
100                    config,
101                )?,
102            ))
103        }),
104    );
105
106    // Register lookup source factory for on-demand/partial cache mode.
107    #[cfg(feature = "delta-lake")]
108    registry.register_lookup_source("delta-lake", Arc::new(DeltaLookupFactory));
109}
110
/// Lookup source factory registered for the `delta-lake` connector.
#[cfg(feature = "delta-lake")]
struct DeltaLookupFactory;

#[cfg(feature = "delta-lake")]
#[async_trait::async_trait]
impl crate::registry::LookupSourceFactory for DeltaLookupFactory {
    /// Builds a Delta-backed lookup source from a connector configuration.
    ///
    /// Primary key columns are read from the comma-separated
    /// `_primary_key_columns` entry; blank items are ignored. The declared
    /// schema argument is not used.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` when no primary key
    /// column is present, and propagates failures from parsing the source
    /// configuration, resolving catalog options, and opening the lookup
    /// source.
    async fn build(
        &self,
        config: crate::config::ConnectorConfig,
        _declared_schema: Option<arrow_schema::SchemaRef>,
    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
    {
        use crate::lakehouse::delta_source_config::DeltaSourceConfig;
        use crate::lookup::delta_lookup::{DeltaLookupSource, DeltaLookupSourceConfig};

        // Split the key list, trimming each item and dropping empty ones.
        let primary_keys: Vec<String> = config
            .get("_primary_key_columns")
            .unwrap_or_default()
            .split(',')
            .map(str::trim)
            .filter(|column| !column.is_empty())
            .map(String::from)
            .collect();

        if primary_keys.is_empty() {
            return Err(crate::error::ConnectorError::ConfigurationError(
                "delta-lake lookup source requires primary key columns".into(),
            ));
        }

        let source_cfg = DeltaSourceConfig::from_config(&config)?;

        // Resolve the table path and storage options for the configured catalog.
        let (table_path, storage_options) = crate::lakehouse::delta_io::resolve_catalog_options(
            &source_cfg.catalog_type,
            source_cfg.catalog_database.as_deref(),
            source_cfg.catalog_name.as_deref(),
            source_cfg.catalog_schema.as_deref(),
            &source_cfg.table_path,
            &source_cfg.storage_options,
        )
        .await?;

        // `From<LookupError>` preserves transient/non-transient class.
        let opened = DeltaLookupSource::open(DeltaLookupSourceConfig {
            table_path,
            storage_options,
            primary_key_columns: primary_keys,
            table_name: String::from("delta_lookup"),
        })
        .await?;

        let shared: Arc<dyn laminar_core::lookup::source::LookupSourceDyn> = Arc::new(opened);
        Ok(shared)
    }
}
165
166/// Registers the Iceberg sink connector with the given registry.
167#[allow(clippy::missing_panics_doc)]
168pub fn register_iceberg_sink(registry: &ConnectorRegistry) {
169    let info = ConnectorInfo {
170        name: "iceberg".to_string(),
171        display_name: "Apache Iceberg Sink".to_string(),
172        version: env!("CARGO_PKG_VERSION").to_string(),
173        is_source: false,
174        is_sink: true,
175        config_keys: iceberg_sink_config_keys(),
176    };
177
178    registry.register_sink(
179        "iceberg",
180        info,
181        Arc::new(|registry: Option<&prometheus::Registry>| {
182            // Default config with placeholder values — real config arrives via open().
183            let mut cfg = crate::config::ConnectorConfig::new("iceberg");
184            cfg.set("catalog.uri", "http://localhost:8181");
185            cfg.set("warehouse", "s3://default/wh");
186            cfg.set("namespace", "default");
187            cfg.set("table.name", "default");
188            Box::new(IcebergSink::new(
189                IcebergSinkConfig::from_config(&cfg).expect("default iceberg sink config"),
190                registry,
191            ))
192        }),
193    );
194}
195
196/// Registers the Iceberg source connector with the given registry.
197#[allow(clippy::missing_panics_doc)]
198pub fn register_iceberg_source(registry: &ConnectorRegistry) {
199    let info = ConnectorInfo {
200        name: "iceberg".to_string(),
201        display_name: "Apache Iceberg Source".to_string(),
202        version: env!("CARGO_PKG_VERSION").to_string(),
203        is_source: true,
204        is_sink: false,
205        config_keys: iceberg_source_config_keys(),
206    };
207
208    registry.register_source(
209        "iceberg",
210        info.clone(),
211        Arc::new(|registry: Option<&prometheus::Registry>| {
212            let mut cfg = crate::config::ConnectorConfig::new("iceberg");
213            cfg.set("catalog.uri", "http://localhost:8181");
214            cfg.set("warehouse", "s3://default/wh");
215            cfg.set("namespace", "default");
216            cfg.set("table.name", "default");
217            Box::new(IcebergSource::new(
218                IcebergSourceConfig::from_config(&cfg).expect("default iceberg source config"),
219                registry,
220            ))
221        }),
222    );
223
224    // Register as table source for CREATE LOOKUP TABLE ... WITH (connector = 'iceberg').
225    #[cfg(feature = "iceberg")]
226    registry.register_table_source(
227        "iceberg",
228        info,
229        Arc::new(|config| {
230            Ok(Box::new(
231                IcebergReferenceTableSource::from_connector_config(config)?,
232            ))
233        }),
234    );
235
236    // Register lookup source factory for on-demand/partial cache mode.
237    #[cfg(feature = "iceberg")]
238    registry.register_lookup_source("iceberg", Arc::new(IcebergLookupFactory));
239}
240
/// Lookup source factory registered for the `iceberg` connector.
#[cfg(feature = "iceberg")]
struct IcebergLookupFactory;

#[cfg(feature = "iceberg")]
#[async_trait::async_trait]
impl crate::registry::LookupSourceFactory for IcebergLookupFactory {
    /// Builds an Iceberg-backed lookup source from a connector configuration.
    ///
    /// Primary key columns are read from the comma-separated
    /// `_primary_key_columns` entry; blank items are ignored. The declared
    /// schema argument is not used.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` when no primary key
    /// column is present, and propagates failures from parsing the catalog
    /// configuration and opening the lookup source.
    async fn build(
        &self,
        config: crate::config::ConnectorConfig,
        _declared_schema: Option<arrow_schema::SchemaRef>,
    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
    {
        use crate::lakehouse::iceberg_config::IcebergCatalogConfig;
        use crate::lookup::iceberg_lookup::{IcebergLookupSource, IcebergLookupSourceConfig};

        // Split the key list, trimming each item and dropping empty ones.
        let primary_keys: Vec<String> = config
            .get("_primary_key_columns")
            .unwrap_or_default()
            .split(',')
            .map(str::trim)
            .filter(|column| !column.is_empty())
            .map(String::from)
            .collect();

        if primary_keys.is_empty() {
            return Err(crate::error::ConnectorError::ConfigurationError(
                "iceberg lookup source requires primary key columns".into(),
            ));
        }

        let opened = IcebergLookupSource::open(IcebergLookupSourceConfig {
            catalog: IcebergCatalogConfig::from_config(&config)?,
            primary_key_columns: primary_keys,
        })
        .await?;

        let shared: Arc<dyn laminar_core::lookup::source::LookupSourceDyn> = Arc::new(opened);
        Ok(shared)
    }
}
280
281/// Registers all lakehouse sink connectors (Delta Lake, Iceberg).
282pub fn register_lakehouse_sinks(registry: &ConnectorRegistry) {
283    register_delta_lake_sink(registry);
284    register_iceberg_sink(registry);
285}
286
287/// Registers all lakehouse source connectors.
288pub fn register_lakehouse_sources(registry: &ConnectorRegistry) {
289    register_delta_lake_source(registry);
290    register_iceberg_source(registry);
291}
292
293#[allow(clippy::too_many_lines)]
294fn delta_lake_config_keys() -> Vec<ConfigKeySpec> {
295    vec![
296        ConfigKeySpec::required(
297            "table.path",
298            "Path to Delta Lake table (local, s3://, az://, gs://)",
299        ),
300        ConfigKeySpec::optional(
301            "partition.columns",
302            "Comma-separated partition column names",
303            "",
304        ),
305        ConfigKeySpec::optional(
306            "target.file.size",
307            "Target Parquet file size in bytes",
308            "134217728",
309        ),
310        ConfigKeySpec::optional(
311            "max.buffer.records",
312            "Maximum records to buffer before flushing",
313            "100000",
314        ),
315        ConfigKeySpec::optional(
316            "max.buffer.duration.ms",
317            "Maximum time to buffer before flushing (ms)",
318            "60000",
319        ),
320        ConfigKeySpec::optional(
321            "checkpoint.interval",
322            "Create Delta checkpoint every N commits",
323            "10",
324        ),
325        ConfigKeySpec::optional(
326            "schema.evolution",
327            "Enable automatic schema evolution (additive columns)",
328            "false",
329        ),
330        ConfigKeySpec::optional(
331            "write.mode",
332            "Write mode: append, overwrite, upsert",
333            "append",
334        ),
335        ConfigKeySpec::optional(
336            "merge.key.columns",
337            "Key columns for upsert MERGE (required for upsert mode)",
338            "",
339        ),
340        ConfigKeySpec::optional(
341            "delivery.guarantee",
342            "exactly-once or at-least-once",
343            "at-least-once",
344        ),
345        ConfigKeySpec::optional(
346            "compaction.enabled",
347            "Enable background OPTIMIZE compaction",
348            "true",
349        ),
350        ConfigKeySpec::optional(
351            "compaction.z-order.columns",
352            "Columns for Z-ORDER clustering",
353            "",
354        ),
355        ConfigKeySpec::optional(
356            "compaction.target-file-size",
357            "Target file size after compaction (bytes, defaults to target.file.size)",
358            "",
359        ),
360        ConfigKeySpec::optional(
361            "compaction.min-files",
362            "Minimum files before triggering compaction",
363            "10",
364        ),
365        ConfigKeySpec::optional(
366            "compaction.check-interval.ms",
367            "How often to check if compaction is needed (milliseconds)",
368            "3600000",
369        ),
370        ConfigKeySpec::optional(
371            "vacuum.retention.hours",
372            "Hours to retain old files during VACUUM",
373            "168",
374        ),
375        ConfigKeySpec::optional(
376            "writer.id",
377            "Writer ID for exactly-once deduplication (auto UUID if not set)",
378            "",
379        ),
380        // ── Catalog configuration ──
381        ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
382        ConfigKeySpec::optional(
383            "catalog.database",
384            "Catalog database name (required for Glue)",
385            "",
386        ),
387        ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
388        ConfigKeySpec::optional(
389            "catalog.schema",
390            "Catalog schema name (required for Unity)",
391            "",
392        ),
393        ConfigKeySpec::optional(
394            "catalog.workspace_url",
395            "Databricks workspace URL (required for Unity)",
396            "",
397        ),
398        ConfigKeySpec::optional(
399            "catalog.access_token",
400            "Databricks access token (required for Unity)",
401            "",
402        ),
403        ConfigKeySpec::optional(
404            "catalog.storage.location",
405            "Storage location for auto-created UC external tables (e.g. s3://bucket/path)",
406            "",
407        ),
408        ConfigKeySpec::optional(
409            "max.commit.retries",
410            "Maximum retries on optimistic concurrency conflicts",
411            "3",
412        ),
413        // ── LogStore configuration ──
414        ConfigKeySpec::optional(
415            "storage.s3_locking_provider",
416            "S3 locking provider: 'dynamodb' for DynamoDB-backed log store",
417            "",
418        ),
419        ConfigKeySpec::optional(
420            "storage.dynamodb_table_name",
421            "DynamoDB table name for S3 locking (default: delta_log)",
422            "",
423        ),
424        // ── Cloud storage credentials (resolved via StorageCredentialResolver) ──
425        ConfigKeySpec::optional(
426            "storage.aws_access_key_id",
427            "AWS access key ID (falls back to AWS_ACCESS_KEY_ID env var)",
428            "",
429        ),
430        ConfigKeySpec::optional(
431            "storage.aws_secret_access_key",
432            "AWS secret access key (falls back to AWS_SECRET_ACCESS_KEY env var)",
433            "",
434        ),
435        ConfigKeySpec::optional(
436            "storage.aws_region",
437            "AWS region for S3 paths (falls back to AWS_REGION env var)",
438            "",
439        ),
440        ConfigKeySpec::optional(
441            "storage.aws_session_token",
442            "AWS session token for temporary credentials (falls back to AWS_SESSION_TOKEN)",
443            "",
444        ),
445        ConfigKeySpec::optional(
446            "storage.aws_endpoint",
447            "Custom S3 endpoint (MinIO, LocalStack; falls back to AWS_ENDPOINT_URL)",
448            "",
449        ),
450        ConfigKeySpec::optional(
451            "storage.aws_profile",
452            "AWS profile name (falls back to AWS_PROFILE env var)",
453            "",
454        ),
455        ConfigKeySpec::optional(
456            "storage.azure_storage_account_name",
457            "Azure storage account name (falls back to AZURE_STORAGE_ACCOUNT_NAME)",
458            "",
459        ),
460        ConfigKeySpec::optional(
461            "storage.azure_storage_account_key",
462            "Azure storage account key (falls back to AZURE_STORAGE_ACCOUNT_KEY)",
463            "",
464        ),
465        ConfigKeySpec::optional(
466            "storage.azure_storage_sas_token",
467            "Azure SAS token (falls back to AZURE_STORAGE_SAS_TOKEN)",
468            "",
469        ),
470        ConfigKeySpec::optional(
471            "storage.azure_storage_client_id",
472            "Azure client ID for service principal auth (falls back to AZURE_CLIENT_ID)",
473            "",
474        ),
475        ConfigKeySpec::optional(
476            "storage.google_service_account_path",
477            "Path to GCS service account JSON (falls back to GOOGLE_APPLICATION_CREDENTIALS)",
478            "",
479        ),
480        ConfigKeySpec::optional(
481            "storage.google_service_account_key",
482            "Inline GCS service account JSON (falls back to GOOGLE_SERVICE_ACCOUNT_KEY)",
483            "",
484        ),
485    ]
486}
487
488fn delta_lake_source_config_keys() -> Vec<ConfigKeySpec> {
489    vec![
490        ConfigKeySpec::required(
491            "table.path",
492            "Path to Delta Lake table (local, s3://, az://, gs://)",
493        ),
494        ConfigKeySpec::optional(
495            "starting.version",
496            "Starting version to read from (default: latest)",
497            "",
498        ),
499        ConfigKeySpec::optional(
500            "poll.interval.ms",
501            "How often to poll for new versions (ms)",
502            "1000",
503        ),
504        ConfigKeySpec::optional(
505            "read.mode",
506            "Read mode: 'incremental' (changes only) or 'snapshot' (full re-read)",
507            "incremental",
508        ),
509        ConfigKeySpec::optional(
510            "partition.filter",
511            "SQL predicate for partition filter pushdown (e.g. \"date = '2024-01-01'\")",
512            "",
513        ),
514        ConfigKeySpec::optional(
515            "schema.evolution.action",
516            "Action on schema change: 'warn' or 'error'",
517            "warn",
518        ),
519        ConfigKeySpec::optional(
520            "cdf.enabled",
521            "Use Change Data Feed for incremental reads (requires CDF on table)",
522            "false",
523        ),
524        // ── Catalog configuration ──
525        ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
526        ConfigKeySpec::optional(
527            "catalog.database",
528            "Catalog database name (required for Glue)",
529            "",
530        ),
531        ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
532        ConfigKeySpec::optional(
533            "catalog.schema",
534            "Catalog schema name (required for Unity)",
535            "",
536        ),
537        ConfigKeySpec::optional(
538            "catalog.workspace_url",
539            "Databricks workspace URL (required for Unity)",
540            "",
541        ),
542        ConfigKeySpec::optional(
543            "catalog.access_token",
544            "Databricks access token (required for Unity)",
545            "",
546        ),
547        // ── LogStore configuration ──
548        ConfigKeySpec::optional(
549            "storage.s3_locking_provider",
550            "S3 locking provider: 'dynamodb' for DynamoDB-backed log store",
551            "",
552        ),
553        ConfigKeySpec::optional(
554            "storage.dynamodb_table_name",
555            "DynamoDB table name for S3 locking (default: delta_log)",
556            "",
557        ),
558        // ── Cloud storage credentials ──
559        ConfigKeySpec::optional("storage.aws_access_key_id", "AWS access key ID", ""),
560        ConfigKeySpec::optional("storage.aws_secret_access_key", "AWS secret access key", ""),
561        ConfigKeySpec::optional("storage.aws_region", "AWS region for S3 paths", ""),
562        ConfigKeySpec::optional(
563            "storage.azure_storage_account_name",
564            "Azure storage account name",
565            "",
566        ),
567        ConfigKeySpec::optional(
568            "storage.azure_storage_account_key",
569            "Azure storage account key",
570            "",
571        ),
572        ConfigKeySpec::optional(
573            "storage.google_service_account_path",
574            "Path to GCS service account JSON",
575            "",
576        ),
577    ]
578}
579
580fn iceberg_sink_config_keys() -> Vec<ConfigKeySpec> {
581    vec![
582        ConfigKeySpec::required(
583            "catalog.uri",
584            "REST catalog URI (e.g., http://polaris:8181)",
585        ),
586        ConfigKeySpec::required(
587            "warehouse",
588            "Warehouse name (REST catalog) or URL (Hadoop catalog, e.g. s3://bucket/wh)",
589        ),
590        ConfigKeySpec::required("namespace", "Iceberg namespace (e.g., prod)"),
591        ConfigKeySpec::required("table.name", "Table name within the namespace"),
592        ConfigKeySpec::optional("catalog.type", "Catalog type: rest", "rest"),
593        ConfigKeySpec::optional(
594            "storage.type",
595            "Storage backend (s3 | s3a | fs). Required when warehouse is a name, not a URL",
596            "",
597        ),
598        ConfigKeySpec::optional(
599            "compression",
600            "Parquet compression: zstd, snappy, none",
601            "zstd",
602        ),
603        ConfigKeySpec::optional("auto.create", "Auto-create table if not exists", "false"),
604        ConfigKeySpec::optional(
605            "writer.id",
606            "Writer ID for exactly-once deduplication (auto UUID if not set)",
607            "",
608        ),
609    ]
610}
611
612fn iceberg_source_config_keys() -> Vec<ConfigKeySpec> {
613    vec![
614        ConfigKeySpec::required(
615            "catalog.uri",
616            "REST catalog URI (e.g., http://polaris:8181)",
617        ),
618        ConfigKeySpec::required(
619            "warehouse",
620            "Warehouse location (e.g., s3://bucket/warehouse)",
621        ),
622        ConfigKeySpec::required("namespace", "Iceberg namespace (e.g., prod)"),
623        ConfigKeySpec::required("table.name", "Table name within the namespace"),
624        ConfigKeySpec::optional("catalog.type", "Catalog type: rest", "rest"),
625        ConfigKeySpec::optional(
626            "poll.interval.ms",
627            "How often to poll for new snapshots (ms)",
628            "60000",
629        ),
630        ConfigKeySpec::optional("snapshot.id", "Pin to a specific snapshot ID", ""),
631        ConfigKeySpec::optional(
632            "select.columns",
633            "Comma-separated column names to select (empty = all)",
634            "",
635        ),
636    ]
637}
638
#[cfg(test)]
mod tests {
    use super::*;

    /// Names of the specs whose `required` flag equals `required`.
    fn key_names(specs: &[ConfigKeySpec], required: bool) -> Vec<&str> {
        specs
            .iter()
            .filter(|spec| spec.required == required)
            .map(|spec| spec.key.as_str())
            .collect()
    }

    /// Asserts that every name in `expected` occurs in `actual`.
    fn assert_contains_all(actual: &[&str], expected: &[&str]) {
        for name in expected {
            assert!(actual.contains(name), "missing config key `{}`", name);
        }
    }

    // ── Delta Lake sink registration tests ──

    #[test]
    fn test_register_delta_lake_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let descriptor = registry
            .sink_info("delta-lake")
            .expect("delta-lake sink should be registered");
        assert_eq!(descriptor.name, "delta-lake");
        assert!(descriptor.is_sink);
        assert!(!descriptor.is_source);
        assert!(!descriptor.config_keys.is_empty());
    }

    #[test]
    fn test_config_keys_required() {
        let specs = delta_lake_config_keys();
        let required = key_names(&specs, true);
        assert_contains_all(&required, &["table.path"]);
        assert_eq!(required.len(), 1);
    }

    #[test]
    fn test_config_keys_include_cloud_storage() {
        let specs = delta_lake_config_keys();
        let all_names: Vec<&str> = specs.iter().map(|spec| spec.key.as_str()).collect();
        assert_contains_all(
            &all_names,
            &[
                "storage.aws_access_key_id",
                "storage.aws_secret_access_key",
                "storage.aws_region",
                "storage.azure_storage_account_name",
                "storage.azure_storage_account_key",
                "storage.google_service_account_path",
            ],
        );
    }

    #[test]
    fn test_config_keys_optional_present() {
        let specs = delta_lake_config_keys();
        let optional = key_names(&specs, false);
        assert_contains_all(
            &optional,
            &[
                "partition.columns",
                "target.file.size",
                "write.mode",
                "delivery.guarantee",
                "merge.key.columns",
                "schema.evolution",
                "compaction.enabled",
                "compaction.z-order.columns",
                "vacuum.retention.hours",
                "writer.id",
                // Catalog keys
                "catalog.type",
                "catalog.database",
                "catalog.name",
                "catalog.schema",
                "catalog.workspace_url",
                "catalog.access_token",
                "catalog.storage.location",
            ],
        );
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let request = crate::config::ConnectorConfig::new("delta-lake");
        assert!(registry.create_sink(&request, None).is_ok());
    }

    // ── Delta Lake source registration tests ──

    #[test]
    fn test_register_delta_lake_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let descriptor = registry
            .source_info("delta-lake")
            .expect("delta-lake source should be registered");
        assert_eq!(descriptor.name, "delta-lake");
        assert!(descriptor.is_source);
        assert!(!descriptor.is_sink);
        assert!(!descriptor.config_keys.is_empty());
    }

    #[test]
    fn test_source_config_keys() {
        let specs = delta_lake_source_config_keys();

        let required = key_names(&specs, true);
        assert_contains_all(&required, &["table.path"]);
        assert_eq!(required.len(), 1);

        let optional = key_names(&specs, false);
        assert_contains_all(
            &optional,
            &[
                "starting.version",
                "poll.interval.ms",
                // Catalog keys
                "catalog.type",
                "catalog.database",
            ],
        );
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let request = crate::config::ConnectorConfig::new("delta-lake");
        assert!(registry.create_source(&request, None).is_ok());
    }

    #[test]
    fn test_register_lakehouse_sinks() {
        let registry = ConnectorRegistry::new();
        register_lakehouse_sinks(&registry);

        for connector in ["delta-lake", "iceberg"] {
            assert!(registry.sink_info(connector).is_some());
        }
    }

    // ── Iceberg registration tests ──

    #[test]
    fn test_register_iceberg_sink() {
        let registry = ConnectorRegistry::new();
        register_iceberg_sink(&registry);

        let descriptor = registry
            .sink_info("iceberg")
            .expect("iceberg sink should be registered");
        assert_eq!(descriptor.name, "iceberg");
        assert!(descriptor.is_sink);
        assert!(!descriptor.is_source);
        assert!(!descriptor.config_keys.is_empty());
    }

    #[test]
    fn test_register_iceberg_source() {
        let registry = ConnectorRegistry::new();
        register_iceberg_source(&registry);

        let descriptor = registry
            .source_info("iceberg")
            .expect("iceberg source should be registered");
        assert_eq!(descriptor.name, "iceberg");
        assert!(descriptor.is_source);
        assert!(!descriptor.is_sink);
    }

    #[test]
    fn test_iceberg_sink_config_keys() {
        let specs = iceberg_sink_config_keys();
        let required = key_names(&specs, true);
        assert_contains_all(
            &required,
            &["catalog.uri", "warehouse", "namespace", "table.name"],
        );
        assert_eq!(required.len(), 4);
    }

    #[test]
    fn test_iceberg_source_config_keys() {
        let specs = iceberg_source_config_keys();
        let required = key_names(&specs, true);
        assert_contains_all(
            &required,
            &["catalog.uri", "warehouse", "namespace", "table.name"],
        );
        assert_eq!(required.len(), 4);
    }

    #[test]
    fn test_factory_creates_iceberg_sink() {
        let registry = ConnectorRegistry::new();
        register_iceberg_sink(&registry);

        let request = crate::config::ConnectorConfig::new("iceberg");
        assert!(registry.create_sink(&request, None).is_ok());
    }

    #[test]
    fn test_factory_creates_iceberg_source() {
        let registry = ConnectorRegistry::new();
        register_iceberg_source(&registry);

        let request = crate::config::ConnectorConfig::new("iceberg");
        assert!(registry.create_source(&request, None).is_ok());
    }
}