laminar_connectors/lakehouse/mod.rs

//! Lakehouse connectors (Delta Lake, Apache Iceberg).
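//!
//! A minimal wiring sketch (mirrors the unit tests at the bottom of this
//! file; `ignore`d since it assumes the crate is consumed as
//! `laminar_connectors`):
//!
//! ```ignore
//! use laminar_connectors::lakehouse::{register_lakehouse_sinks, register_lakehouse_sources};
//! use laminar_connectors::registry::ConnectorRegistry;
//!
//! let registry = ConnectorRegistry::new();
//! register_lakehouse_sinks(&registry);   // registers delta-lake + iceberg sinks
//! register_lakehouse_sources(&registry); // registers delta-lake + iceberg sources
//! ```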

// Delta Lake modules
pub mod delta;
pub mod delta_config;
#[cfg(feature = "delta-lake")]
pub mod delta_io;
pub mod delta_metrics;
pub mod delta_source;
pub mod delta_source_config;
#[cfg(feature = "delta-lake")]
pub mod delta_table_provider;
#[cfg(feature = "delta-lake-unity")]
pub(crate) mod unity_catalog;

// Apache Iceberg modules
pub mod iceberg;
pub mod iceberg_config;
#[cfg(feature = "iceberg")]
pub mod iceberg_incremental;
#[cfg(feature = "iceberg")]
pub mod iceberg_io;
pub mod iceberg_reference;
pub mod iceberg_source;

// Common metrics
pub mod metrics;

// Re-export Delta Lake types at module level.
pub use delta::DeltaLakeSink;
pub use delta_config::{
    CompactionConfig, DeliveryGuarantee, DeltaCatalogType, DeltaLakeSinkConfig, DeltaWriteMode,
};
pub use delta_metrics::DeltaLakeSinkMetrics;
pub use delta_source::DeltaSource;
pub use delta_source_config::{DeltaReadMode, DeltaSourceConfig, SchemaEvolutionAction};
pub use metrics::LakehouseSinkMetrics;

// Re-export Iceberg types at module level.
pub use iceberg::IcebergSink;
pub use iceberg_config::{
    IcebergCatalogConfig, IcebergCatalogType, IcebergSinkConfig, IcebergSourceConfig,
};
pub use iceberg_reference::IcebergReferenceTableSource;
pub use iceberg_source::IcebergSource;

use std::sync::Arc;

use crate::config::{ConfigKeySpec, ConnectorInfo};
use crate::registry::ConnectorRegistry;
/// Registers the Delta Lake sink connector with the given registry.
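///
/// A minimal usage sketch (mirrors the unit tests below):
///
/// ```ignore
/// let registry = ConnectorRegistry::new();
/// register_delta_lake_sink(&registry);
///
/// // Instances are then created from the registry via a ConnectorConfig.
/// let config = crate::config::ConnectorConfig::new("delta-lake");
/// let sink = registry.create_sink(&config, None);
/// assert!(sink.is_ok());
/// ```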
pub fn register_delta_lake_sink(registry: &ConnectorRegistry) {
    let info = ConnectorInfo {
        name: "delta-lake".to_string(),
        display_name: "Delta Lake Sink".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        is_source: false,
        is_sink: true,
        config_keys: delta_lake_config_keys(),
    };

    registry.register_sink(
        "delta-lake",
        info,
        Arc::new(|registry: Option<&prometheus::Registry>| {
            Box::new(DeltaLakeSink::new(DeltaLakeSinkConfig::default(), registry))
        }),
    );
}

/// Registers the Delta Lake source connector with the given registry.
pub fn register_delta_lake_source(registry: &ConnectorRegistry) {
    let info = ConnectorInfo {
        name: "delta-lake".to_string(),
        display_name: "Delta Lake Source".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        is_source: true,
        is_sink: false,
        config_keys: delta_lake_source_config_keys(),
    };

    registry.register_source(
        "delta-lake",
        info.clone(),
        Arc::new(|registry: Option<&prometheus::Registry>| {
            Box::new(DeltaSource::new(DeltaSourceConfig::default(), registry))
        }),
    );

    // Also register as a table source so CREATE LOOKUP TABLE ... WITH
    // (connector = 'delta-lake') can use Delta tables as reference data.
    #[cfg(feature = "delta-lake")]
    registry.register_table_source(
        "delta-lake",
        info,
        Arc::new(|config| {
            Ok(Box::new(
                crate::lookup::delta_reference::DeltaReferenceTableSource::from_connector_config(
                    config,
                )?,
            ))
        }),
    );

    // Register lookup source factory for on-demand/partial cache mode.
    #[cfg(feature = "delta-lake")]
    registry.register_lookup_source("delta-lake", Arc::new(DeltaLookupFactory));
}

#[cfg(feature = "delta-lake")]
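/// Builds Delta lookup sources for on-demand (partial cache) mode.
///
/// A sketch of the config `build` expects; the table path and key column
/// below are hypothetical values, and `_primary_key_columns` must be a
/// non-empty comma-separated list:
///
/// ```ignore
/// let mut cfg = crate::config::ConnectorConfig::new("delta-lake");
/// cfg.set("table.path", "s3://bucket/tables/customers"); // hypothetical path
/// cfg.set("_primary_key_columns", "customer_id");        // hypothetical key
/// let source = DeltaLookupFactory.build(cfg).await?;
/// ```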
struct DeltaLookupFactory;

#[cfg(feature = "delta-lake")]
#[async_trait::async_trait]
impl crate::registry::LookupSourceFactory for DeltaLookupFactory {
    async fn build(
        &self,
        config: crate::config::ConnectorConfig,
    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
    {
        use crate::lakehouse::delta_source_config::DeltaSourceConfig;
        use crate::lookup::delta_lookup::{DeltaLookupSource, DeltaLookupSourceConfig};

        let pk_columns: Vec<String> = config
            .get("_primary_key_columns")
            .unwrap_or("")
            .split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect();

        if pk_columns.is_empty() {
            return Err(crate::error::ConnectorError::ConfigurationError(
                "delta-lake lookup source requires primary key columns".into(),
            ));
        }

        let src_config = DeltaSourceConfig::from_config(&config)?;

        let (resolved_path, resolved_opts) = crate::lakehouse::delta_io::resolve_catalog_options(
            &src_config.catalog_type,
            src_config.catalog_database.as_deref(),
            src_config.catalog_name.as_deref(),
            src_config.catalog_schema.as_deref(),
            &src_config.table_path,
            &src_config.storage_options,
        )
        .await?;

        let lookup_config = DeltaLookupSourceConfig {
            table_path: resolved_path,
            storage_options: resolved_opts,
            primary_key_columns: pk_columns,
            table_name: "delta_lookup".to_string(),
        };

        // `From<LookupError>` preserves transient/non-transient class.
        let source = DeltaLookupSource::open(lookup_config).await?;

        Ok(Arc::new(source) as Arc<dyn laminar_core::lookup::source::LookupSourceDyn>)
    }
}

/// Registers the Iceberg sink connector with the given registry.
#[allow(clippy::missing_panics_doc)]
pub fn register_iceberg_sink(registry: &ConnectorRegistry) {
    let info = ConnectorInfo {
        name: "iceberg".to_string(),
        display_name: "Apache Iceberg Sink".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        is_source: false,
        is_sink: true,
        config_keys: iceberg_sink_config_keys(),
    };

    registry.register_sink(
        "iceberg",
        info,
        Arc::new(|registry: Option<&prometheus::Registry>| {
            // Default config with placeholder values — real config arrives via open().
            let mut cfg = crate::config::ConnectorConfig::new("iceberg");
            cfg.set("catalog.uri", "http://localhost:8181");
            cfg.set("warehouse", "s3://default/wh");
            cfg.set("namespace", "default");
            cfg.set("table.name", "default");
            Box::new(IcebergSink::new(
                IcebergSinkConfig::from_config(&cfg).expect("default iceberg sink config"),
                registry,
            ))
        }),
    );
}

/// Registers the Iceberg source connector with the given registry.
#[allow(clippy::missing_panics_doc)]
pub fn register_iceberg_source(registry: &ConnectorRegistry) {
    let info = ConnectorInfo {
        name: "iceberg".to_string(),
        display_name: "Apache Iceberg Source".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        is_source: true,
        is_sink: false,
        config_keys: iceberg_source_config_keys(),
    };

    registry.register_source(
        "iceberg",
        info.clone(),
        Arc::new(|registry: Option<&prometheus::Registry>| {
            let mut cfg = crate::config::ConnectorConfig::new("iceberg");
            cfg.set("catalog.uri", "http://localhost:8181");
            cfg.set("warehouse", "s3://default/wh");
            cfg.set("namespace", "default");
            cfg.set("table.name", "default");
            Box::new(IcebergSource::new(
                IcebergSourceConfig::from_config(&cfg).expect("default iceberg source config"),
                registry,
            ))
        }),
    );

    // Register as table source for CREATE LOOKUP TABLE ... WITH (connector = 'iceberg').
    #[cfg(feature = "iceberg")]
    registry.register_table_source(
        "iceberg",
        info,
        Arc::new(|config| {
            Ok(Box::new(
                IcebergReferenceTableSource::from_connector_config(config)?,
            ))
        }),
    );
}

/// Registers all lakehouse sink connectors (Delta Lake, Iceberg).
pub fn register_lakehouse_sinks(registry: &ConnectorRegistry) {
    register_delta_lake_sink(registry);
    register_iceberg_sink(registry);
}

/// Registers all lakehouse source connectors (Delta Lake, Iceberg).
pub fn register_lakehouse_sources(registry: &ConnectorRegistry) {
    register_delta_lake_source(registry);
    register_iceberg_source(registry);
}

#[allow(clippy::too_many_lines)]
fn delta_lake_config_keys() -> Vec<ConfigKeySpec> {
    vec![
        ConfigKeySpec::required(
            "table.path",
            "Path to Delta Lake table (local, s3://, az://, gs://)",
        ),
        ConfigKeySpec::optional(
            "partition.columns",
            "Comma-separated partition column names",
            "",
        ),
        ConfigKeySpec::optional(
            "target.file.size",
            "Target Parquet file size in bytes",
            "134217728",
        ),
        ConfigKeySpec::optional(
            "max.buffer.records",
            "Maximum records to buffer before flushing",
            "100000",
        ),
        ConfigKeySpec::optional(
            "max.buffer.duration.ms",
            "Maximum time to buffer before flushing (ms)",
            "60000",
        ),
        ConfigKeySpec::optional(
            "checkpoint.interval",
            "Create Delta checkpoint every N commits",
            "10",
        ),
        ConfigKeySpec::optional(
            "schema.evolution",
            "Enable automatic schema evolution (additive columns)",
            "false",
        ),
        ConfigKeySpec::optional(
            "write.mode",
            "Write mode: append, overwrite, upsert",
            "append",
        ),
        ConfigKeySpec::optional(
            "merge.key.columns",
            "Key columns for upsert MERGE (required for upsert mode)",
            "",
        ),
        ConfigKeySpec::optional(
            "delivery.guarantee",
            "exactly-once or at-least-once",
            "at-least-once",
        ),
        ConfigKeySpec::optional(
            "compaction.enabled",
            "Enable background OPTIMIZE compaction",
            "true",
        ),
        ConfigKeySpec::optional(
            "compaction.z-order.columns",
            "Columns for Z-ORDER clustering",
            "",
        ),
        ConfigKeySpec::optional(
            "compaction.target-file-size",
            "Target file size after compaction (bytes, defaults to target.file.size)",
            "",
        ),
        ConfigKeySpec::optional(
            "compaction.min-files",
            "Minimum files before triggering compaction",
            "10",
        ),
        ConfigKeySpec::optional(
            "compaction.check-interval.ms",
            "How often to check if compaction is needed (milliseconds)",
            "3600000",
        ),
        ConfigKeySpec::optional(
            "vacuum.retention.hours",
            "Hours to retain old files during VACUUM",
            "168",
        ),
        ConfigKeySpec::optional(
            "writer.id",
            "Writer ID for exactly-once deduplication (auto UUID if not set)",
            "",
        ),
        // ── Catalog configuration ──
        ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
        ConfigKeySpec::optional(
            "catalog.database",
            "Catalog database name (required for Glue)",
            "",
        ),
        ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
        ConfigKeySpec::optional(
            "catalog.schema",
            "Catalog schema name (required for Unity)",
            "",
        ),
        ConfigKeySpec::optional(
            "catalog.workspace_url",
            "Databricks workspace URL (required for Unity)",
            "",
        ),
        ConfigKeySpec::optional(
            "catalog.access_token",
            "Databricks access token (required for Unity)",
            "",
        ),
        ConfigKeySpec::optional(
            "catalog.storage.location",
            "Storage location for auto-created UC external tables (e.g. s3://bucket/path)",
            "",
        ),
        ConfigKeySpec::optional(
            "max.commit.retries",
            "Maximum retries on optimistic concurrency conflicts",
            "3",
        ),
        // ── LogStore configuration ──
        ConfigKeySpec::optional(
            "storage.s3_locking_provider",
            "S3 locking provider: 'dynamodb' for DynamoDB-backed log store",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.dynamodb_table_name",
            "DynamoDB table name for S3 locking (default: delta_log)",
            "",
        ),
        // ── Cloud storage credentials (resolved via StorageCredentialResolver) ──
        ConfigKeySpec::optional(
            "storage.aws_access_key_id",
            "AWS access key ID (falls back to AWS_ACCESS_KEY_ID env var)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.aws_secret_access_key",
            "AWS secret access key (falls back to AWS_SECRET_ACCESS_KEY env var)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.aws_region",
            "AWS region for S3 paths (falls back to AWS_REGION env var)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.aws_session_token",
            "AWS session token for temporary credentials (falls back to AWS_SESSION_TOKEN)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.aws_endpoint",
            "Custom S3 endpoint (MinIO, LocalStack; falls back to AWS_ENDPOINT_URL)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.aws_profile",
            "AWS profile name (falls back to AWS_PROFILE env var)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.azure_storage_account_name",
            "Azure storage account name (falls back to AZURE_STORAGE_ACCOUNT_NAME)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.azure_storage_account_key",
            "Azure storage account key (falls back to AZURE_STORAGE_ACCOUNT_KEY)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.azure_storage_sas_token",
            "Azure SAS token (falls back to AZURE_STORAGE_SAS_TOKEN)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.azure_storage_client_id",
            "Azure client ID for service principal auth (falls back to AZURE_CLIENT_ID)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.google_service_account_path",
            "Path to GCS service account JSON (falls back to GOOGLE_APPLICATION_CREDENTIALS)",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.google_service_account_key",
            "Inline GCS service account JSON (falls back to GOOGLE_SERVICE_ACCOUNT_KEY)",
            "",
        ),
    ]
}

fn delta_lake_source_config_keys() -> Vec<ConfigKeySpec> {
    vec![
        ConfigKeySpec::required(
            "table.path",
            "Path to Delta Lake table (local, s3://, az://, gs://)",
        ),
        ConfigKeySpec::optional(
            "starting.version",
            "Starting version to read from (default: latest)",
            "",
        ),
        ConfigKeySpec::optional(
            "poll.interval.ms",
            "How often to poll for new versions (ms)",
            "1000",
        ),
        ConfigKeySpec::optional(
            "read.mode",
            "Read mode: 'incremental' (changes only) or 'snapshot' (full re-read)",
            "incremental",
        ),
        ConfigKeySpec::optional(
            "partition.filter",
            "SQL predicate for partition filter pushdown (e.g. \"date = '2024-01-01'\")",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.evolution.action",
            "Action on schema change: 'warn' or 'error'",
            "warn",
        ),
        ConfigKeySpec::optional(
            "cdf.enabled",
            "Use Change Data Feed for incremental reads (requires CDF on table)",
            "false",
        ),
        // ── Catalog configuration ──
        ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
        ConfigKeySpec::optional(
            "catalog.database",
            "Catalog database name (required for Glue)",
            "",
        ),
        ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
        ConfigKeySpec::optional(
            "catalog.schema",
            "Catalog schema name (required for Unity)",
            "",
        ),
        ConfigKeySpec::optional(
            "catalog.workspace_url",
            "Databricks workspace URL (required for Unity)",
            "",
        ),
        ConfigKeySpec::optional(
            "catalog.access_token",
            "Databricks access token (required for Unity)",
            "",
        ),
        // ── LogStore configuration ──
        ConfigKeySpec::optional(
            "storage.s3_locking_provider",
            "S3 locking provider: 'dynamodb' for DynamoDB-backed log store",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.dynamodb_table_name",
            "DynamoDB table name for S3 locking (default: delta_log)",
            "",
        ),
        // ── Cloud storage credentials ──
        ConfigKeySpec::optional("storage.aws_access_key_id", "AWS access key ID", ""),
        ConfigKeySpec::optional("storage.aws_secret_access_key", "AWS secret access key", ""),
        ConfigKeySpec::optional("storage.aws_region", "AWS region for S3 paths", ""),
        ConfigKeySpec::optional(
            "storage.azure_storage_account_name",
            "Azure storage account name",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.azure_storage_account_key",
            "Azure storage account key",
            "",
        ),
        ConfigKeySpec::optional(
            "storage.google_service_account_path",
            "Path to GCS service account JSON",
            "",
        ),
    ]
}

fn iceberg_sink_config_keys() -> Vec<ConfigKeySpec> {
    vec![
        ConfigKeySpec::required(
            "catalog.uri",
            "REST catalog URI (e.g., http://polaris:8181)",
        ),
        ConfigKeySpec::required(
            "warehouse",
            "Warehouse name (REST catalog) or URL (Hadoop catalog, e.g. s3://bucket/wh)",
        ),
        ConfigKeySpec::required("namespace", "Iceberg namespace (e.g., prod)"),
        ConfigKeySpec::required("table.name", "Table name within the namespace"),
        ConfigKeySpec::optional("catalog.type", "Catalog type: rest", "rest"),
        ConfigKeySpec::optional(
            "storage.type",
            "Storage backend (s3 | s3a | fs). Required when warehouse is a name, not a URL",
            "",
        ),
        ConfigKeySpec::optional(
            "compression",
            "Parquet compression: zstd, snappy, none",
            "zstd",
        ),
        ConfigKeySpec::optional("auto.create", "Auto-create table if not exists", "false"),
        ConfigKeySpec::optional(
            "writer.id",
            "Writer ID for exactly-once deduplication (auto UUID if not set)",
            "",
        ),
    ]
}

fn iceberg_source_config_keys() -> Vec<ConfigKeySpec> {
    vec![
        ConfigKeySpec::required(
            "catalog.uri",
            "REST catalog URI (e.g., http://polaris:8181)",
        ),
        ConfigKeySpec::required(
            "warehouse",
            "Warehouse location (e.g., s3://bucket/warehouse)",
        ),
        ConfigKeySpec::required("namespace", "Iceberg namespace (e.g., prod)"),
        ConfigKeySpec::required("table.name", "Table name within the namespace"),
        ConfigKeySpec::optional("catalog.type", "Catalog type: rest", "rest"),
        ConfigKeySpec::optional(
            "poll.interval.ms",
            "How often to poll for new snapshots (ms)",
            "60000",
        ),
        ConfigKeySpec::optional("snapshot.id", "Pin to a specific snapshot ID", ""),
        ConfigKeySpec::optional(
            "select.columns",
            "Comma-separated column names to select (empty = all)",
            "",
        ),
    ]
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_register_delta_lake_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let info = registry.sink_info("delta-lake");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "delta-lake");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_config_keys_required() {
        let keys = delta_lake_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"table.path"));
        assert_eq!(required.len(), 1);
    }

    #[test]
    fn test_config_keys_include_cloud_storage() {
        let keys = delta_lake_config_keys();
        let key_names: Vec<&str> = keys.iter().map(|k| k.key.as_str()).collect();
        assert!(key_names.contains(&"storage.aws_access_key_id"));
        assert!(key_names.contains(&"storage.aws_secret_access_key"));
        assert!(key_names.contains(&"storage.aws_region"));
        assert!(key_names.contains(&"storage.azure_storage_account_name"));
        assert!(key_names.contains(&"storage.azure_storage_account_key"));
        assert!(key_names.contains(&"storage.google_service_account_path"));
    }

    #[test]
    fn test_config_keys_optional_present() {
        let keys = delta_lake_config_keys();
        let optional: Vec<&str> = keys
            .iter()
            .filter(|k| !k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(optional.contains(&"partition.columns"));
        assert!(optional.contains(&"target.file.size"));
        assert!(optional.contains(&"write.mode"));
        assert!(optional.contains(&"delivery.guarantee"));
        assert!(optional.contains(&"merge.key.columns"));
        assert!(optional.contains(&"schema.evolution"));
        assert!(optional.contains(&"compaction.enabled"));
        assert!(optional.contains(&"compaction.z-order.columns"));
        assert!(optional.contains(&"vacuum.retention.hours"));
        assert!(optional.contains(&"writer.id"));
        // Catalog keys
        assert!(optional.contains(&"catalog.type"));
        assert!(optional.contains(&"catalog.database"));
        assert!(optional.contains(&"catalog.name"));
        assert!(optional.contains(&"catalog.schema"));
        assert!(optional.contains(&"catalog.workspace_url"));
        assert!(optional.contains(&"catalog.access_token"));
        assert!(optional.contains(&"catalog.storage.location"));
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let config = crate::config::ConnectorConfig::new("delta-lake");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }

    // ── Delta Lake source registration tests ──

    #[test]
    fn test_register_delta_lake_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let info = registry.source_info("delta-lake");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "delta-lake");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_source_config_keys() {
        let keys = delta_lake_source_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"table.path"));
        assert_eq!(required.len(), 1);

        let optional: Vec<&str> = keys
            .iter()
            .filter(|k| !k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(optional.contains(&"starting.version"));
        assert!(optional.contains(&"poll.interval.ms"));
        // Catalog keys
        assert!(optional.contains(&"catalog.type"));
        assert!(optional.contains(&"catalog.database"));
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let config = crate::config::ConnectorConfig::new("delta-lake");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    #[test]
    fn test_register_lakehouse_sinks() {
        let registry = ConnectorRegistry::new();
        register_lakehouse_sinks(&registry);

        assert!(registry.sink_info("delta-lake").is_some());
        assert!(registry.sink_info("iceberg").is_some());
    }

    // ── Iceberg registration tests ──

    #[test]
    fn test_register_iceberg_sink() {
        let registry = ConnectorRegistry::new();
        register_iceberg_sink(&registry);

        let info = registry.sink_info("iceberg");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "iceberg");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_register_iceberg_source() {
        let registry = ConnectorRegistry::new();
        register_iceberg_source(&registry);

        let info = registry.source_info("iceberg");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "iceberg");
        assert!(info.is_source);
        assert!(!info.is_sink);
    }

    #[test]
    fn test_iceberg_sink_config_keys() {
        let keys = iceberg_sink_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"catalog.uri"));
        assert!(required.contains(&"warehouse"));
        assert!(required.contains(&"namespace"));
        assert!(required.contains(&"table.name"));
        assert_eq!(required.len(), 4);
    }

    #[test]
    fn test_iceberg_source_config_keys() {
        let keys = iceberg_source_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"catalog.uri"));
        assert!(required.contains(&"warehouse"));
        assert!(required.contains(&"namespace"));
        assert!(required.contains(&"table.name"));
        assert_eq!(required.len(), 4);
    }

    #[test]
    fn test_factory_creates_iceberg_sink() {
        let registry = ConnectorRegistry::new();
        register_iceberg_sink(&registry);

        let config = crate::config::ConnectorConfig::new("iceberg");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }

    #[test]
    fn test_factory_creates_iceberg_source() {
        let registry = ConnectorRegistry::new();
        register_iceberg_source(&registry);

        let config = crate::config::ConnectorConfig::new("iceberg");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }
}