1pub mod delta;
5pub mod delta_config;
6#[cfg(feature = "delta-lake")]
7pub mod delta_io;
8pub mod delta_metrics;
9pub mod delta_source;
10pub mod delta_source_config;
11#[cfg(feature = "delta-lake")]
12pub mod delta_table_provider;
13#[cfg(feature = "delta-lake-unity")]
14pub(crate) mod unity_catalog;
15
16pub mod iceberg;
18pub mod iceberg_config;
19#[cfg(feature = "iceberg")]
20pub mod iceberg_incremental;
21#[cfg(feature = "iceberg")]
22pub mod iceberg_io;
23pub mod iceberg_reference;
24pub mod iceberg_source;
25
26pub mod metrics;
28
29pub use delta::DeltaLakeSink;
31pub use delta_config::{
32 CompactionConfig, DeliveryGuarantee, DeltaCatalogType, DeltaLakeSinkConfig, DeltaWriteMode,
33};
34pub use delta_metrics::DeltaLakeSinkMetrics;
35pub use delta_source::DeltaSource;
36pub use delta_source_config::{DeltaReadMode, DeltaSourceConfig, SchemaEvolutionAction};
37pub use metrics::LakehouseSinkMetrics;
38
39pub use iceberg::IcebergSink;
41pub use iceberg_config::{
42 IcebergCatalogConfig, IcebergCatalogType, IcebergSinkConfig, IcebergSourceConfig,
43};
44pub use iceberg_reference::IcebergReferenceTableSource;
45pub use iceberg_source::IcebergSource;
46
47use std::sync::Arc;
48
49use crate::config::{ConfigKeySpec, ConnectorInfo};
50use crate::registry::ConnectorRegistry;
51
52pub fn register_delta_lake_sink(registry: &ConnectorRegistry) {
54 let info = ConnectorInfo {
55 name: "delta-lake".to_string(),
56 display_name: "Delta Lake Sink".to_string(),
57 version: env!("CARGO_PKG_VERSION").to_string(),
58 is_source: false,
59 is_sink: true,
60 config_keys: delta_lake_config_keys(),
61 };
62
63 registry.register_sink(
64 "delta-lake",
65 info,
66 Arc::new(|registry: Option<&prometheus::Registry>| {
67 Box::new(DeltaLakeSink::new(DeltaLakeSinkConfig::default(), registry))
68 }),
69 );
70}
71
72pub fn register_delta_lake_source(registry: &ConnectorRegistry) {
74 let info = ConnectorInfo {
75 name: "delta-lake".to_string(),
76 display_name: "Delta Lake Source".to_string(),
77 version: env!("CARGO_PKG_VERSION").to_string(),
78 is_source: true,
79 is_sink: false,
80 config_keys: delta_lake_source_config_keys(),
81 };
82
83 registry.register_source(
84 "delta-lake",
85 info.clone(),
86 Arc::new(|registry: Option<&prometheus::Registry>| {
87 Box::new(DeltaSource::new(DeltaSourceConfig::default(), registry))
88 }),
89 );
90
91 #[cfg(feature = "delta-lake")]
94 registry.register_table_source(
95 "delta-lake",
96 info,
97 Arc::new(|config| {
98 Ok(Box::new(
99 crate::lookup::delta_reference::DeltaReferenceTableSource::from_connector_config(
100 config,
101 )?,
102 ))
103 }),
104 );
105
106 #[cfg(feature = "delta-lake")]
108 registry.register_lookup_source("delta-lake", Arc::new(DeltaLookupFactory));
109}
110
/// Lookup-source factory for Delta Lake tables, registered by
/// `register_delta_lake_source` under the `"delta-lake"` name.
#[cfg(feature = "delta-lake")]
struct DeltaLookupFactory;

#[cfg(feature = "delta-lake")]
#[async_trait::async_trait]
impl crate::registry::LookupSourceFactory for DeltaLookupFactory {
    /// Builds a Delta Lake lookup source from connector configuration.
    ///
    /// Steps:
    /// 1. Reads the comma-separated `_primary_key_columns` entry. Entries are
    ///    trimmed and blank ones dropped.
    /// 2. Parses the config as a `DeltaSourceConfig`.
    /// 3. Resolves the table path and storage options through the configured
    ///    catalog.
    /// 4. Opens a `DeltaLookupSource` named `delta_lookup`.
    ///
    /// The declared schema argument is ignored.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` when no primary-key column
    /// is supplied. Also propagates errors from config parsing, catalog
    /// resolution, and opening the table.
    async fn build(
        &self,
        config: crate::config::ConnectorConfig,
        _declared_schema: Option<arrow_schema::SchemaRef>,
    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
    {
        use crate::lakehouse::delta_source_config::DeltaSourceConfig;
        use crate::lookup::delta_lookup::{DeltaLookupSource, DeltaLookupSourceConfig};
        use laminar_core::lookup::source::LookupSourceDyn;

        // A missing key and a key with only blank entries both yield an empty list.
        let primary_key_columns: Vec<String> = config
            .get("_primary_key_columns")
            .into_iter()
            .flat_map(|raw| raw.split(','))
            .map(str::trim)
            .filter(|column| !column.is_empty())
            .map(String::from)
            .collect();

        if primary_key_columns.is_empty() {
            return Err(crate::error::ConnectorError::ConfigurationError(
                "delta-lake lookup source requires primary key columns".into(),
            ));
        }

        let source_config = DeltaSourceConfig::from_config(&config)?;

        let (table_path, storage_options) = crate::lakehouse::delta_io::resolve_catalog_options(
            &source_config.catalog_type,
            source_config.catalog_database.as_deref(),
            source_config.catalog_name.as_deref(),
            source_config.catalog_schema.as_deref(),
            &source_config.table_path,
            &source_config.storage_options,
        )
        .await?;

        let lookup = DeltaLookupSource::open(DeltaLookupSourceConfig {
            table_path,
            storage_options,
            primary_key_columns,
            table_name: "delta_lookup".to_string(),
        })
        .await?;

        let lookup: Arc<dyn LookupSourceDyn> = Arc::new(lookup);
        Ok(lookup)
    }
}
165
166#[allow(clippy::missing_panics_doc)]
168pub fn register_iceberg_sink(registry: &ConnectorRegistry) {
169 let info = ConnectorInfo {
170 name: "iceberg".to_string(),
171 display_name: "Apache Iceberg Sink".to_string(),
172 version: env!("CARGO_PKG_VERSION").to_string(),
173 is_source: false,
174 is_sink: true,
175 config_keys: iceberg_sink_config_keys(),
176 };
177
178 registry.register_sink(
179 "iceberg",
180 info,
181 Arc::new(|registry: Option<&prometheus::Registry>| {
182 let mut cfg = crate::config::ConnectorConfig::new("iceberg");
184 cfg.set("catalog.uri", "http://localhost:8181");
185 cfg.set("warehouse", "s3://default/wh");
186 cfg.set("namespace", "default");
187 cfg.set("table.name", "default");
188 Box::new(IcebergSink::new(
189 IcebergSinkConfig::from_config(&cfg).expect("default iceberg sink config"),
190 registry,
191 ))
192 }),
193 );
194}
195
196#[allow(clippy::missing_panics_doc)]
198pub fn register_iceberg_source(registry: &ConnectorRegistry) {
199 let info = ConnectorInfo {
200 name: "iceberg".to_string(),
201 display_name: "Apache Iceberg Source".to_string(),
202 version: env!("CARGO_PKG_VERSION").to_string(),
203 is_source: true,
204 is_sink: false,
205 config_keys: iceberg_source_config_keys(),
206 };
207
208 registry.register_source(
209 "iceberg",
210 info.clone(),
211 Arc::new(|registry: Option<&prometheus::Registry>| {
212 let mut cfg = crate::config::ConnectorConfig::new("iceberg");
213 cfg.set("catalog.uri", "http://localhost:8181");
214 cfg.set("warehouse", "s3://default/wh");
215 cfg.set("namespace", "default");
216 cfg.set("table.name", "default");
217 Box::new(IcebergSource::new(
218 IcebergSourceConfig::from_config(&cfg).expect("default iceberg source config"),
219 registry,
220 ))
221 }),
222 );
223
224 #[cfg(feature = "iceberg")]
226 registry.register_table_source(
227 "iceberg",
228 info,
229 Arc::new(|config| {
230 Ok(Box::new(
231 IcebergReferenceTableSource::from_connector_config(config)?,
232 ))
233 }),
234 );
235
236 #[cfg(feature = "iceberg")]
238 registry.register_lookup_source("iceberg", Arc::new(IcebergLookupFactory));
239}
240
/// Lookup-source factory for Apache Iceberg tables, registered by
/// `register_iceberg_source` under the `"iceberg"` name.
#[cfg(feature = "iceberg")]
struct IcebergLookupFactory;

#[cfg(feature = "iceberg")]
#[async_trait::async_trait]
impl crate::registry::LookupSourceFactory for IcebergLookupFactory {
    /// Builds an Iceberg lookup source from connector configuration.
    ///
    /// Steps:
    /// 1. Reads the comma-separated `_primary_key_columns` entry. Entries are
    ///    trimmed and blank ones dropped.
    /// 2. Parses an `IcebergCatalogConfig` from the same config.
    /// 3. Opens an `IcebergLookupSource`.
    ///
    /// The declared schema argument is ignored.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` when no primary-key column
    /// is supplied. Also propagates errors from catalog-config parsing and from
    /// opening the table.
    async fn build(
        &self,
        config: crate::config::ConnectorConfig,
        _declared_schema: Option<arrow_schema::SchemaRef>,
    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
    {
        use crate::lakehouse::iceberg_config::IcebergCatalogConfig;
        use crate::lookup::iceberg_lookup::{IcebergLookupSource, IcebergLookupSourceConfig};
        use laminar_core::lookup::source::LookupSourceDyn;

        // A missing key and a key with only blank entries both yield an empty list.
        let primary_key_columns: Vec<String> = config
            .get("_primary_key_columns")
            .into_iter()
            .flat_map(|raw| raw.split(','))
            .map(str::trim)
            .filter(|column| !column.is_empty())
            .map(String::from)
            .collect();

        if primary_key_columns.is_empty() {
            return Err(crate::error::ConnectorError::ConfigurationError(
                "iceberg lookup source requires primary key columns".into(),
            ));
        }

        let catalog = IcebergCatalogConfig::from_config(&config)?;
        let lookup = IcebergLookupSource::open(IcebergLookupSourceConfig {
            catalog,
            primary_key_columns,
        })
        .await?;

        let lookup: Arc<dyn LookupSourceDyn> = Arc::new(lookup);
        Ok(lookup)
    }
}
280
281pub fn register_lakehouse_sinks(registry: &ConnectorRegistry) {
283 register_delta_lake_sink(registry);
284 register_iceberg_sink(registry);
285}
286
287pub fn register_lakehouse_sources(registry: &ConnectorRegistry) {
289 register_delta_lake_source(registry);
290 register_iceberg_source(registry);
291}
292
293#[allow(clippy::too_many_lines)]
294fn delta_lake_config_keys() -> Vec<ConfigKeySpec> {
295 vec![
296 ConfigKeySpec::required(
297 "table.path",
298 "Path to Delta Lake table (local, s3://, az://, gs://)",
299 ),
300 ConfigKeySpec::optional(
301 "partition.columns",
302 "Comma-separated partition column names",
303 "",
304 ),
305 ConfigKeySpec::optional(
306 "target.file.size",
307 "Target Parquet file size in bytes",
308 "134217728",
309 ),
310 ConfigKeySpec::optional(
311 "max.buffer.records",
312 "Maximum records to buffer before flushing",
313 "100000",
314 ),
315 ConfigKeySpec::optional(
316 "max.buffer.duration.ms",
317 "Maximum time to buffer before flushing (ms)",
318 "60000",
319 ),
320 ConfigKeySpec::optional(
321 "checkpoint.interval",
322 "Create Delta checkpoint every N commits",
323 "10",
324 ),
325 ConfigKeySpec::optional(
326 "schema.evolution",
327 "Enable automatic schema evolution (additive columns)",
328 "false",
329 ),
330 ConfigKeySpec::optional(
331 "write.mode",
332 "Write mode: append, overwrite, upsert",
333 "append",
334 ),
335 ConfigKeySpec::optional(
336 "merge.key.columns",
337 "Key columns for upsert MERGE (required for upsert mode)",
338 "",
339 ),
340 ConfigKeySpec::optional(
341 "delivery.guarantee",
342 "exactly-once or at-least-once",
343 "at-least-once",
344 ),
345 ConfigKeySpec::optional(
346 "compaction.enabled",
347 "Enable background OPTIMIZE compaction",
348 "true",
349 ),
350 ConfigKeySpec::optional(
351 "compaction.z-order.columns",
352 "Columns for Z-ORDER clustering",
353 "",
354 ),
355 ConfigKeySpec::optional(
356 "compaction.target-file-size",
357 "Target file size after compaction (bytes, defaults to target.file.size)",
358 "",
359 ),
360 ConfigKeySpec::optional(
361 "compaction.min-files",
362 "Minimum files before triggering compaction",
363 "10",
364 ),
365 ConfigKeySpec::optional(
366 "compaction.check-interval.ms",
367 "How often to check if compaction is needed (milliseconds)",
368 "3600000",
369 ),
370 ConfigKeySpec::optional(
371 "vacuum.retention.hours",
372 "Hours to retain old files during VACUUM",
373 "168",
374 ),
375 ConfigKeySpec::optional(
376 "writer.id",
377 "Writer ID for exactly-once deduplication (auto UUID if not set)",
378 "",
379 ),
380 ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
382 ConfigKeySpec::optional(
383 "catalog.database",
384 "Catalog database name (required for Glue)",
385 "",
386 ),
387 ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
388 ConfigKeySpec::optional(
389 "catalog.schema",
390 "Catalog schema name (required for Unity)",
391 "",
392 ),
393 ConfigKeySpec::optional(
394 "catalog.workspace_url",
395 "Databricks workspace URL (required for Unity)",
396 "",
397 ),
398 ConfigKeySpec::optional(
399 "catalog.access_token",
400 "Databricks access token (required for Unity)",
401 "",
402 ),
403 ConfigKeySpec::optional(
404 "catalog.storage.location",
405 "Storage location for auto-created UC external tables (e.g. s3://bucket/path)",
406 "",
407 ),
408 ConfigKeySpec::optional(
409 "max.commit.retries",
410 "Maximum retries on optimistic concurrency conflicts",
411 "3",
412 ),
413 ConfigKeySpec::optional(
415 "storage.s3_locking_provider",
416 "S3 locking provider: 'dynamodb' for DynamoDB-backed log store",
417 "",
418 ),
419 ConfigKeySpec::optional(
420 "storage.dynamodb_table_name",
421 "DynamoDB table name for S3 locking (default: delta_log)",
422 "",
423 ),
424 ConfigKeySpec::optional(
426 "storage.aws_access_key_id",
427 "AWS access key ID (falls back to AWS_ACCESS_KEY_ID env var)",
428 "",
429 ),
430 ConfigKeySpec::optional(
431 "storage.aws_secret_access_key",
432 "AWS secret access key (falls back to AWS_SECRET_ACCESS_KEY env var)",
433 "",
434 ),
435 ConfigKeySpec::optional(
436 "storage.aws_region",
437 "AWS region for S3 paths (falls back to AWS_REGION env var)",
438 "",
439 ),
440 ConfigKeySpec::optional(
441 "storage.aws_session_token",
442 "AWS session token for temporary credentials (falls back to AWS_SESSION_TOKEN)",
443 "",
444 ),
445 ConfigKeySpec::optional(
446 "storage.aws_endpoint",
447 "Custom S3 endpoint (MinIO, LocalStack; falls back to AWS_ENDPOINT_URL)",
448 "",
449 ),
450 ConfigKeySpec::optional(
451 "storage.aws_profile",
452 "AWS profile name (falls back to AWS_PROFILE env var)",
453 "",
454 ),
455 ConfigKeySpec::optional(
456 "storage.azure_storage_account_name",
457 "Azure storage account name (falls back to AZURE_STORAGE_ACCOUNT_NAME)",
458 "",
459 ),
460 ConfigKeySpec::optional(
461 "storage.azure_storage_account_key",
462 "Azure storage account key (falls back to AZURE_STORAGE_ACCOUNT_KEY)",
463 "",
464 ),
465 ConfigKeySpec::optional(
466 "storage.azure_storage_sas_token",
467 "Azure SAS token (falls back to AZURE_STORAGE_SAS_TOKEN)",
468 "",
469 ),
470 ConfigKeySpec::optional(
471 "storage.azure_storage_client_id",
472 "Azure client ID for service principal auth (falls back to AZURE_CLIENT_ID)",
473 "",
474 ),
475 ConfigKeySpec::optional(
476 "storage.google_service_account_path",
477 "Path to GCS service account JSON (falls back to GOOGLE_APPLICATION_CREDENTIALS)",
478 "",
479 ),
480 ConfigKeySpec::optional(
481 "storage.google_service_account_key",
482 "Inline GCS service account JSON (falls back to GOOGLE_SERVICE_ACCOUNT_KEY)",
483 "",
484 ),
485 ]
486}
487
488fn delta_lake_source_config_keys() -> Vec<ConfigKeySpec> {
489 vec![
490 ConfigKeySpec::required(
491 "table.path",
492 "Path to Delta Lake table (local, s3://, az://, gs://)",
493 ),
494 ConfigKeySpec::optional(
495 "starting.version",
496 "Starting version to read from (default: latest)",
497 "",
498 ),
499 ConfigKeySpec::optional(
500 "poll.interval.ms",
501 "How often to poll for new versions (ms)",
502 "1000",
503 ),
504 ConfigKeySpec::optional(
505 "read.mode",
506 "Read mode: 'incremental' (changes only) or 'snapshot' (full re-read)",
507 "incremental",
508 ),
509 ConfigKeySpec::optional(
510 "partition.filter",
511 "SQL predicate for partition filter pushdown (e.g. \"date = '2024-01-01'\")",
512 "",
513 ),
514 ConfigKeySpec::optional(
515 "schema.evolution.action",
516 "Action on schema change: 'warn' or 'error'",
517 "warn",
518 ),
519 ConfigKeySpec::optional(
520 "cdf.enabled",
521 "Use Change Data Feed for incremental reads (requires CDF on table)",
522 "false",
523 ),
524 ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
526 ConfigKeySpec::optional(
527 "catalog.database",
528 "Catalog database name (required for Glue)",
529 "",
530 ),
531 ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
532 ConfigKeySpec::optional(
533 "catalog.schema",
534 "Catalog schema name (required for Unity)",
535 "",
536 ),
537 ConfigKeySpec::optional(
538 "catalog.workspace_url",
539 "Databricks workspace URL (required for Unity)",
540 "",
541 ),
542 ConfigKeySpec::optional(
543 "catalog.access_token",
544 "Databricks access token (required for Unity)",
545 "",
546 ),
547 ConfigKeySpec::optional(
549 "storage.s3_locking_provider",
550 "S3 locking provider: 'dynamodb' for DynamoDB-backed log store",
551 "",
552 ),
553 ConfigKeySpec::optional(
554 "storage.dynamodb_table_name",
555 "DynamoDB table name for S3 locking (default: delta_log)",
556 "",
557 ),
558 ConfigKeySpec::optional("storage.aws_access_key_id", "AWS access key ID", ""),
560 ConfigKeySpec::optional("storage.aws_secret_access_key", "AWS secret access key", ""),
561 ConfigKeySpec::optional("storage.aws_region", "AWS region for S3 paths", ""),
562 ConfigKeySpec::optional(
563 "storage.azure_storage_account_name",
564 "Azure storage account name",
565 "",
566 ),
567 ConfigKeySpec::optional(
568 "storage.azure_storage_account_key",
569 "Azure storage account key",
570 "",
571 ),
572 ConfigKeySpec::optional(
573 "storage.google_service_account_path",
574 "Path to GCS service account JSON",
575 "",
576 ),
577 ]
578}
579
580fn iceberg_sink_config_keys() -> Vec<ConfigKeySpec> {
581 vec![
582 ConfigKeySpec::required(
583 "catalog.uri",
584 "REST catalog URI (e.g., http://polaris:8181)",
585 ),
586 ConfigKeySpec::required(
587 "warehouse",
588 "Warehouse name (REST catalog) or URL (Hadoop catalog, e.g. s3://bucket/wh)",
589 ),
590 ConfigKeySpec::required("namespace", "Iceberg namespace (e.g., prod)"),
591 ConfigKeySpec::required("table.name", "Table name within the namespace"),
592 ConfigKeySpec::optional("catalog.type", "Catalog type: rest", "rest"),
593 ConfigKeySpec::optional(
594 "storage.type",
595 "Storage backend (s3 | s3a | fs). Required when warehouse is a name, not a URL",
596 "",
597 ),
598 ConfigKeySpec::optional(
599 "compression",
600 "Parquet compression: zstd, snappy, none",
601 "zstd",
602 ),
603 ConfigKeySpec::optional("auto.create", "Auto-create table if not exists", "false"),
604 ConfigKeySpec::optional(
605 "writer.id",
606 "Writer ID for exactly-once deduplication (auto UUID if not set)",
607 "",
608 ),
609 ]
610}
611
612fn iceberg_source_config_keys() -> Vec<ConfigKeySpec> {
613 vec![
614 ConfigKeySpec::required(
615 "catalog.uri",
616 "REST catalog URI (e.g., http://polaris:8181)",
617 ),
618 ConfigKeySpec::required(
619 "warehouse",
620 "Warehouse location (e.g., s3://bucket/warehouse)",
621 ),
622 ConfigKeySpec::required("namespace", "Iceberg namespace (e.g., prod)"),
623 ConfigKeySpec::required("table.name", "Table name within the namespace"),
624 ConfigKeySpec::optional("catalog.type", "Catalog type: rest", "rest"),
625 ConfigKeySpec::optional(
626 "poll.interval.ms",
627 "How often to poll for new snapshots (ms)",
628 "60000",
629 ),
630 ConfigKeySpec::optional("snapshot.id", "Pin to a specific snapshot ID", ""),
631 ConfigKeySpec::optional(
632 "select.columns",
633 "Comma-separated column names to select (empty = all)",
634 "",
635 ),
636 ]
637}
638
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_register_delta_lake_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let info = registry.sink_info("delta-lake");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "delta-lake");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_config_keys_required() {
        let keys = delta_lake_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"table.path"));
        assert_eq!(required.len(), 1);
    }

    #[test]
    fn test_config_keys_include_cloud_storage() {
        let keys = delta_lake_config_keys();
        let key_names: Vec<&str> = keys.iter().map(|k| k.key.as_str()).collect();
        assert!(key_names.contains(&"storage.aws_access_key_id"));
        assert!(key_names.contains(&"storage.aws_secret_access_key"));
        assert!(key_names.contains(&"storage.aws_region"));
        assert!(key_names.contains(&"storage.azure_storage_account_name"));
        assert!(key_names.contains(&"storage.azure_storage_account_key"));
        assert!(key_names.contains(&"storage.google_service_account_path"));
    }

    #[test]
    fn test_config_keys_optional_present() {
        let keys = delta_lake_config_keys();
        let optional: Vec<&str> = keys
            .iter()
            .filter(|k| !k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(optional.contains(&"partition.columns"));
        assert!(optional.contains(&"target.file.size"));
        assert!(optional.contains(&"write.mode"));
        assert!(optional.contains(&"delivery.guarantee"));
        assert!(optional.contains(&"merge.key.columns"));
        assert!(optional.contains(&"schema.evolution"));
        assert!(optional.contains(&"compaction.enabled"));
        assert!(optional.contains(&"compaction.z-order.columns"));
        assert!(optional.contains(&"vacuum.retention.hours"));
        assert!(optional.contains(&"writer.id"));
        assert!(optional.contains(&"catalog.type"));
        assert!(optional.contains(&"catalog.database"));
        assert!(optional.contains(&"catalog.name"));
        assert!(optional.contains(&"catalog.schema"));
        assert!(optional.contains(&"catalog.workspace_url"));
        assert!(optional.contains(&"catalog.access_token"));
        assert!(optional.contains(&"catalog.storage.location"));
    }

    /// Every advertised key list must name each configuration key only once.
    #[test]
    fn test_config_keys_are_unique() {
        let key_sets = [
            ("delta sink", delta_lake_config_keys()),
            ("delta source", delta_lake_source_config_keys()),
            ("iceberg sink", iceberg_sink_config_keys()),
            ("iceberg source", iceberg_source_config_keys()),
        ];
        for (label, keys) in &key_sets {
            let mut seen = std::collections::HashSet::new();
            for spec in keys {
                assert!(
                    seen.insert(spec.key.as_str()),
                    "duplicate {label} config key: {}",
                    spec.key
                );
            }
        }
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_sink(&registry);

        let config = crate::config::ConnectorConfig::new("delta-lake");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }

    #[test]
    fn test_register_delta_lake_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let info = registry.source_info("delta-lake");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "delta-lake");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_source_config_keys() {
        let keys = delta_lake_source_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"table.path"));
        assert_eq!(required.len(), 1);

        let optional: Vec<&str> = keys
            .iter()
            .filter(|k| !k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(optional.contains(&"starting.version"));
        assert!(optional.contains(&"poll.interval.ms"));
        assert!(optional.contains(&"catalog.type"));
        assert!(optional.contains(&"catalog.database"));
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_delta_lake_source(&registry);

        let config = crate::config::ConnectorConfig::new("delta-lake");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    #[test]
    fn test_register_lakehouse_sinks() {
        let registry = ConnectorRegistry::new();
        register_lakehouse_sinks(&registry);

        assert!(registry.sink_info("delta-lake").is_some());
        assert!(registry.sink_info("iceberg").is_some());
    }

    #[test]
    fn test_register_lakehouse_sources() {
        let registry = ConnectorRegistry::new();
        register_lakehouse_sources(&registry);

        assert!(registry.source_info("delta-lake").is_some());
        assert!(registry.source_info("iceberg").is_some());
    }

    #[test]
    fn test_register_iceberg_sink() {
        let registry = ConnectorRegistry::new();
        register_iceberg_sink(&registry);

        let info = registry.sink_info("iceberg");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "iceberg");
        assert!(info.is_sink);
        assert!(!info.is_source);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_register_iceberg_source() {
        let registry = ConnectorRegistry::new();
        register_iceberg_source(&registry);

        let info = registry.source_info("iceberg");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "iceberg");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_iceberg_sink_config_keys() {
        let keys = iceberg_sink_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"catalog.uri"));
        assert!(required.contains(&"warehouse"));
        assert!(required.contains(&"namespace"));
        assert!(required.contains(&"table.name"));
        assert_eq!(required.len(), 4);
    }

    #[test]
    fn test_iceberg_source_config_keys() {
        let keys = iceberg_source_config_keys();
        let required: Vec<&str> = keys
            .iter()
            .filter(|k| k.required)
            .map(|k| k.key.as_str())
            .collect();
        assert!(required.contains(&"catalog.uri"));
        assert!(required.contains(&"warehouse"));
        assert!(required.contains(&"namespace"));
        assert!(required.contains(&"table.name"));
        assert_eq!(required.len(), 4);
    }

    #[test]
    fn test_factory_creates_iceberg_sink() {
        let registry = ConnectorRegistry::new();
        register_iceberg_sink(&registry);

        let config = crate::config::ConnectorConfig::new("iceberg");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }

    #[test]
    fn test_factory_creates_iceberg_source() {
        let registry = ConnectorRegistry::new();
        register_iceberg_source(&registry);

        let config = crate::config::ConnectorConfig::new("iceberg");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }
}