1pub mod delta;
5pub mod delta_config;
6#[cfg(feature = "delta-lake")]
7pub mod delta_io;
8pub mod delta_metrics;
9pub mod delta_source;
10pub mod delta_source_config;
11#[cfg(feature = "delta-lake")]
12pub mod delta_table_provider;
13#[cfg(feature = "delta-lake-unity")]
14pub(crate) mod unity_catalog;
15
16pub mod iceberg;
18pub mod iceberg_config;
19#[cfg(feature = "iceberg")]
20pub mod iceberg_incremental;
21#[cfg(feature = "iceberg")]
22pub mod iceberg_io;
23pub mod iceberg_reference;
24pub mod iceberg_source;
25
26pub mod metrics;
28
29pub use delta::DeltaLakeSink;
31pub use delta_config::{
32 CompactionConfig, DeliveryGuarantee, DeltaCatalogType, DeltaLakeSinkConfig, DeltaWriteMode,
33};
34pub use delta_metrics::DeltaLakeSinkMetrics;
35pub use delta_source::DeltaSource;
36pub use delta_source_config::{DeltaReadMode, DeltaSourceConfig, SchemaEvolutionAction};
37pub use metrics::LakehouseSinkMetrics;
38
39pub use iceberg::IcebergSink;
41pub use iceberg_config::{
42 IcebergCatalogConfig, IcebergCatalogType, IcebergSinkConfig, IcebergSourceConfig,
43};
44pub use iceberg_reference::IcebergReferenceTableSource;
45pub use iceberg_source::IcebergSource;
46
47use std::sync::Arc;
48
49use crate::config::{ConfigKeySpec, ConnectorInfo};
50use crate::registry::ConnectorRegistry;
51
52pub fn register_delta_lake_sink(registry: &ConnectorRegistry) {
54 let info = ConnectorInfo {
55 name: "delta-lake".to_string(),
56 display_name: "Delta Lake Sink".to_string(),
57 version: env!("CARGO_PKG_VERSION").to_string(),
58 is_source: false,
59 is_sink: true,
60 config_keys: delta_lake_config_keys(),
61 };
62
63 registry.register_sink(
64 "delta-lake",
65 info,
66 Arc::new(|registry: Option<&prometheus::Registry>| {
67 Box::new(DeltaLakeSink::new(DeltaLakeSinkConfig::default(), registry))
68 }),
69 );
70}
71
72pub fn register_delta_lake_source(registry: &ConnectorRegistry) {
74 let info = ConnectorInfo {
75 name: "delta-lake".to_string(),
76 display_name: "Delta Lake Source".to_string(),
77 version: env!("CARGO_PKG_VERSION").to_string(),
78 is_source: true,
79 is_sink: false,
80 config_keys: delta_lake_source_config_keys(),
81 };
82
83 registry.register_source(
84 "delta-lake",
85 info.clone(),
86 Arc::new(|registry: Option<&prometheus::Registry>| {
87 Box::new(DeltaSource::new(DeltaSourceConfig::default(), registry))
88 }),
89 );
90
91 #[cfg(feature = "delta-lake")]
94 registry.register_table_source(
95 "delta-lake",
96 info,
97 Arc::new(|config| {
98 Ok(Box::new(
99 crate::lookup::delta_reference::DeltaReferenceTableSource::from_connector_config(
100 config,
101 )?,
102 ))
103 }),
104 );
105
106 #[cfg(feature = "delta-lake")]
108 registry.register_lookup_source("delta-lake", Arc::new(DeltaLookupFactory));
109}
110
111#[cfg(feature = "delta-lake")]
112struct DeltaLookupFactory;
113
114#[cfg(feature = "delta-lake")]
115#[async_trait::async_trait]
116impl crate::registry::LookupSourceFactory for DeltaLookupFactory {
117 async fn build(
118 &self,
119 config: crate::config::ConnectorConfig,
120 ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, crate::error::ConnectorError>
121 {
122 use crate::lakehouse::delta_source_config::DeltaSourceConfig;
123 use crate::lookup::delta_lookup::{DeltaLookupSource, DeltaLookupSourceConfig};
124
125 let pk_columns: Vec<String> = config
126 .get("_primary_key_columns")
127 .unwrap_or("")
128 .split(',')
129 .map(|s| s.trim().to_string())
130 .filter(|s| !s.is_empty())
131 .collect();
132
133 if pk_columns.is_empty() {
134 return Err(crate::error::ConnectorError::ConfigurationError(
135 "delta-lake lookup source requires primary key columns".into(),
136 ));
137 }
138
139 let src_config = DeltaSourceConfig::from_config(&config)?;
140
141 let (resolved_path, resolved_opts) = crate::lakehouse::delta_io::resolve_catalog_options(
142 &src_config.catalog_type,
143 src_config.catalog_database.as_deref(),
144 src_config.catalog_name.as_deref(),
145 src_config.catalog_schema.as_deref(),
146 &src_config.table_path,
147 &src_config.storage_options,
148 )
149 .await?;
150
151 let lookup_config = DeltaLookupSourceConfig {
152 table_path: resolved_path,
153 storage_options: resolved_opts,
154 primary_key_columns: pk_columns,
155 table_name: "delta_lookup".to_string(),
156 };
157
158 let source = DeltaLookupSource::open(lookup_config).await?;
160
161 Ok(Arc::new(source) as Arc<dyn laminar_core::lookup::source::LookupSourceDyn>)
162 }
163}
164
165#[allow(clippy::missing_panics_doc)]
167pub fn register_iceberg_sink(registry: &ConnectorRegistry) {
168 let info = ConnectorInfo {
169 name: "iceberg".to_string(),
170 display_name: "Apache Iceberg Sink".to_string(),
171 version: env!("CARGO_PKG_VERSION").to_string(),
172 is_source: false,
173 is_sink: true,
174 config_keys: iceberg_sink_config_keys(),
175 };
176
177 registry.register_sink(
178 "iceberg",
179 info,
180 Arc::new(|registry: Option<&prometheus::Registry>| {
181 let mut cfg = crate::config::ConnectorConfig::new("iceberg");
183 cfg.set("catalog.uri", "http://localhost:8181");
184 cfg.set("warehouse", "s3://default/wh");
185 cfg.set("namespace", "default");
186 cfg.set("table.name", "default");
187 Box::new(IcebergSink::new(
188 IcebergSinkConfig::from_config(&cfg).expect("default iceberg sink config"),
189 registry,
190 ))
191 }),
192 );
193}
194
195#[allow(clippy::missing_panics_doc)]
197pub fn register_iceberg_source(registry: &ConnectorRegistry) {
198 let info = ConnectorInfo {
199 name: "iceberg".to_string(),
200 display_name: "Apache Iceberg Source".to_string(),
201 version: env!("CARGO_PKG_VERSION").to_string(),
202 is_source: true,
203 is_sink: false,
204 config_keys: iceberg_source_config_keys(),
205 };
206
207 registry.register_source(
208 "iceberg",
209 info.clone(),
210 Arc::new(|registry: Option<&prometheus::Registry>| {
211 let mut cfg = crate::config::ConnectorConfig::new("iceberg");
212 cfg.set("catalog.uri", "http://localhost:8181");
213 cfg.set("warehouse", "s3://default/wh");
214 cfg.set("namespace", "default");
215 cfg.set("table.name", "default");
216 Box::new(IcebergSource::new(
217 IcebergSourceConfig::from_config(&cfg).expect("default iceberg source config"),
218 registry,
219 ))
220 }),
221 );
222
223 #[cfg(feature = "iceberg")]
225 registry.register_table_source(
226 "iceberg",
227 info,
228 Arc::new(|config| {
229 Ok(Box::new(
230 IcebergReferenceTableSource::from_connector_config(config)?,
231 ))
232 }),
233 );
234}
235
236pub fn register_lakehouse_sinks(registry: &ConnectorRegistry) {
238 register_delta_lake_sink(registry);
239 register_iceberg_sink(registry);
240}
241
242pub fn register_lakehouse_sources(registry: &ConnectorRegistry) {
244 register_delta_lake_source(registry);
245 register_iceberg_source(registry);
246}
247
248#[allow(clippy::too_many_lines)]
249fn delta_lake_config_keys() -> Vec<ConfigKeySpec> {
250 vec![
251 ConfigKeySpec::required(
252 "table.path",
253 "Path to Delta Lake table (local, s3://, az://, gs://)",
254 ),
255 ConfigKeySpec::optional(
256 "partition.columns",
257 "Comma-separated partition column names",
258 "",
259 ),
260 ConfigKeySpec::optional(
261 "target.file.size",
262 "Target Parquet file size in bytes",
263 "134217728",
264 ),
265 ConfigKeySpec::optional(
266 "max.buffer.records",
267 "Maximum records to buffer before flushing",
268 "100000",
269 ),
270 ConfigKeySpec::optional(
271 "max.buffer.duration.ms",
272 "Maximum time to buffer before flushing (ms)",
273 "60000",
274 ),
275 ConfigKeySpec::optional(
276 "checkpoint.interval",
277 "Create Delta checkpoint every N commits",
278 "10",
279 ),
280 ConfigKeySpec::optional(
281 "schema.evolution",
282 "Enable automatic schema evolution (additive columns)",
283 "false",
284 ),
285 ConfigKeySpec::optional(
286 "write.mode",
287 "Write mode: append, overwrite, upsert",
288 "append",
289 ),
290 ConfigKeySpec::optional(
291 "merge.key.columns",
292 "Key columns for upsert MERGE (required for upsert mode)",
293 "",
294 ),
295 ConfigKeySpec::optional(
296 "delivery.guarantee",
297 "exactly-once or at-least-once",
298 "at-least-once",
299 ),
300 ConfigKeySpec::optional(
301 "compaction.enabled",
302 "Enable background OPTIMIZE compaction",
303 "true",
304 ),
305 ConfigKeySpec::optional(
306 "compaction.z-order.columns",
307 "Columns for Z-ORDER clustering",
308 "",
309 ),
310 ConfigKeySpec::optional(
311 "compaction.target-file-size",
312 "Target file size after compaction (bytes, defaults to target.file.size)",
313 "",
314 ),
315 ConfigKeySpec::optional(
316 "compaction.min-files",
317 "Minimum files before triggering compaction",
318 "10",
319 ),
320 ConfigKeySpec::optional(
321 "compaction.check-interval.ms",
322 "How often to check if compaction is needed (milliseconds)",
323 "3600000",
324 ),
325 ConfigKeySpec::optional(
326 "vacuum.retention.hours",
327 "Hours to retain old files during VACUUM",
328 "168",
329 ),
330 ConfigKeySpec::optional(
331 "writer.id",
332 "Writer ID for exactly-once deduplication (auto UUID if not set)",
333 "",
334 ),
335 ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
337 ConfigKeySpec::optional(
338 "catalog.database",
339 "Catalog database name (required for Glue)",
340 "",
341 ),
342 ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
343 ConfigKeySpec::optional(
344 "catalog.schema",
345 "Catalog schema name (required for Unity)",
346 "",
347 ),
348 ConfigKeySpec::optional(
349 "catalog.workspace_url",
350 "Databricks workspace URL (required for Unity)",
351 "",
352 ),
353 ConfigKeySpec::optional(
354 "catalog.access_token",
355 "Databricks access token (required for Unity)",
356 "",
357 ),
358 ConfigKeySpec::optional(
359 "catalog.storage.location",
360 "Storage location for auto-created UC external tables (e.g. s3://bucket/path)",
361 "",
362 ),
363 ConfigKeySpec::optional(
364 "max.commit.retries",
365 "Maximum retries on optimistic concurrency conflicts",
366 "3",
367 ),
368 ConfigKeySpec::optional(
370 "storage.s3_locking_provider",
371 "S3 locking provider: 'dynamodb' for DynamoDB-backed log store",
372 "",
373 ),
374 ConfigKeySpec::optional(
375 "storage.dynamodb_table_name",
376 "DynamoDB table name for S3 locking (default: delta_log)",
377 "",
378 ),
379 ConfigKeySpec::optional(
381 "storage.aws_access_key_id",
382 "AWS access key ID (falls back to AWS_ACCESS_KEY_ID env var)",
383 "",
384 ),
385 ConfigKeySpec::optional(
386 "storage.aws_secret_access_key",
387 "AWS secret access key (falls back to AWS_SECRET_ACCESS_KEY env var)",
388 "",
389 ),
390 ConfigKeySpec::optional(
391 "storage.aws_region",
392 "AWS region for S3 paths (falls back to AWS_REGION env var)",
393 "",
394 ),
395 ConfigKeySpec::optional(
396 "storage.aws_session_token",
397 "AWS session token for temporary credentials (falls back to AWS_SESSION_TOKEN)",
398 "",
399 ),
400 ConfigKeySpec::optional(
401 "storage.aws_endpoint",
402 "Custom S3 endpoint (MinIO, LocalStack; falls back to AWS_ENDPOINT_URL)",
403 "",
404 ),
405 ConfigKeySpec::optional(
406 "storage.aws_profile",
407 "AWS profile name (falls back to AWS_PROFILE env var)",
408 "",
409 ),
410 ConfigKeySpec::optional(
411 "storage.azure_storage_account_name",
412 "Azure storage account name (falls back to AZURE_STORAGE_ACCOUNT_NAME)",
413 "",
414 ),
415 ConfigKeySpec::optional(
416 "storage.azure_storage_account_key",
417 "Azure storage account key (falls back to AZURE_STORAGE_ACCOUNT_KEY)",
418 "",
419 ),
420 ConfigKeySpec::optional(
421 "storage.azure_storage_sas_token",
422 "Azure SAS token (falls back to AZURE_STORAGE_SAS_TOKEN)",
423 "",
424 ),
425 ConfigKeySpec::optional(
426 "storage.azure_storage_client_id",
427 "Azure client ID for service principal auth (falls back to AZURE_CLIENT_ID)",
428 "",
429 ),
430 ConfigKeySpec::optional(
431 "storage.google_service_account_path",
432 "Path to GCS service account JSON (falls back to GOOGLE_APPLICATION_CREDENTIALS)",
433 "",
434 ),
435 ConfigKeySpec::optional(
436 "storage.google_service_account_key",
437 "Inline GCS service account JSON (falls back to GOOGLE_SERVICE_ACCOUNT_KEY)",
438 "",
439 ),
440 ]
441}
442
443fn delta_lake_source_config_keys() -> Vec<ConfigKeySpec> {
444 vec![
445 ConfigKeySpec::required(
446 "table.path",
447 "Path to Delta Lake table (local, s3://, az://, gs://)",
448 ),
449 ConfigKeySpec::optional(
450 "starting.version",
451 "Starting version to read from (default: latest)",
452 "",
453 ),
454 ConfigKeySpec::optional(
455 "poll.interval.ms",
456 "How often to poll for new versions (ms)",
457 "1000",
458 ),
459 ConfigKeySpec::optional(
460 "read.mode",
461 "Read mode: 'incremental' (changes only) or 'snapshot' (full re-read)",
462 "incremental",
463 ),
464 ConfigKeySpec::optional(
465 "partition.filter",
466 "SQL predicate for partition filter pushdown (e.g. \"date = '2024-01-01'\")",
467 "",
468 ),
469 ConfigKeySpec::optional(
470 "schema.evolution.action",
471 "Action on schema change: 'warn' or 'error'",
472 "warn",
473 ),
474 ConfigKeySpec::optional(
475 "cdf.enabled",
476 "Use Change Data Feed for incremental reads (requires CDF on table)",
477 "false",
478 ),
479 ConfigKeySpec::optional("catalog.type", "Catalog type: none, glue, unity", "none"),
481 ConfigKeySpec::optional(
482 "catalog.database",
483 "Catalog database name (required for Glue)",
484 "",
485 ),
486 ConfigKeySpec::optional("catalog.name", "Catalog name (required for Unity)", ""),
487 ConfigKeySpec::optional(
488 "catalog.schema",
489 "Catalog schema name (required for Unity)",
490 "",
491 ),
492 ConfigKeySpec::optional(
493 "catalog.workspace_url",
494 "Databricks workspace URL (required for Unity)",
495 "",
496 ),
497 ConfigKeySpec::optional(
498 "catalog.access_token",
499 "Databricks access token (required for Unity)",
500 "",
501 ),
502 ConfigKeySpec::optional(
504 "storage.s3_locking_provider",
505 "S3 locking provider: 'dynamodb' for DynamoDB-backed log store",
506 "",
507 ),
508 ConfigKeySpec::optional(
509 "storage.dynamodb_table_name",
510 "DynamoDB table name for S3 locking (default: delta_log)",
511 "",
512 ),
513 ConfigKeySpec::optional("storage.aws_access_key_id", "AWS access key ID", ""),
515 ConfigKeySpec::optional("storage.aws_secret_access_key", "AWS secret access key", ""),
516 ConfigKeySpec::optional("storage.aws_region", "AWS region for S3 paths", ""),
517 ConfigKeySpec::optional(
518 "storage.azure_storage_account_name",
519 "Azure storage account name",
520 "",
521 ),
522 ConfigKeySpec::optional(
523 "storage.azure_storage_account_key",
524 "Azure storage account key",
525 "",
526 ),
527 ConfigKeySpec::optional(
528 "storage.google_service_account_path",
529 "Path to GCS service account JSON",
530 "",
531 ),
532 ]
533}
534
535fn iceberg_sink_config_keys() -> Vec<ConfigKeySpec> {
536 vec![
537 ConfigKeySpec::required(
538 "catalog.uri",
539 "REST catalog URI (e.g., http://polaris:8181)",
540 ),
541 ConfigKeySpec::required(
542 "warehouse",
543 "Warehouse name (REST catalog) or URL (Hadoop catalog, e.g. s3://bucket/wh)",
544 ),
545 ConfigKeySpec::required("namespace", "Iceberg namespace (e.g., prod)"),
546 ConfigKeySpec::required("table.name", "Table name within the namespace"),
547 ConfigKeySpec::optional("catalog.type", "Catalog type: rest", "rest"),
548 ConfigKeySpec::optional(
549 "storage.type",
550 "Storage backend (s3 | s3a | fs). Required when warehouse is a name, not a URL",
551 "",
552 ),
553 ConfigKeySpec::optional(
554 "compression",
555 "Parquet compression: zstd, snappy, none",
556 "zstd",
557 ),
558 ConfigKeySpec::optional("auto.create", "Auto-create table if not exists", "false"),
559 ConfigKeySpec::optional(
560 "writer.id",
561 "Writer ID for exactly-once deduplication (auto UUID if not set)",
562 "",
563 ),
564 ]
565}
566
567fn iceberg_source_config_keys() -> Vec<ConfigKeySpec> {
568 vec![
569 ConfigKeySpec::required(
570 "catalog.uri",
571 "REST catalog URI (e.g., http://polaris:8181)",
572 ),
573 ConfigKeySpec::required(
574 "warehouse",
575 "Warehouse location (e.g., s3://bucket/warehouse)",
576 ),
577 ConfigKeySpec::required("namespace", "Iceberg namespace (e.g., prod)"),
578 ConfigKeySpec::required("table.name", "Table name within the namespace"),
579 ConfigKeySpec::optional("catalog.type", "Catalog type: rest", "rest"),
580 ConfigKeySpec::optional(
581 "poll.interval.ms",
582 "How often to poll for new snapshots (ms)",
583 "60000",
584 ),
585 ConfigKeySpec::optional("snapshot.id", "Pin to a specific snapshot ID", ""),
586 ConfigKeySpec::optional(
587 "select.columns",
588 "Comma-separated column names to select (empty = all)",
589 "",
590 ),
591 ]
592}
593
594#[cfg(test)]
595mod tests {
596 use super::*;
597
598 #[test]
599 fn test_register_delta_lake_sink() {
600 let registry = ConnectorRegistry::new();
601 register_delta_lake_sink(®istry);
602
603 let info = registry.sink_info("delta-lake");
604 assert!(info.is_some());
605 let info = info.unwrap();
606 assert_eq!(info.name, "delta-lake");
607 assert!(info.is_sink);
608 assert!(!info.is_source);
609 assert!(!info.config_keys.is_empty());
610 }
611
612 #[test]
613 fn test_config_keys_required() {
614 let keys = delta_lake_config_keys();
615 let required: Vec<&str> = keys
616 .iter()
617 .filter(|k| k.required)
618 .map(|k| k.key.as_str())
619 .collect();
620 assert!(required.contains(&"table.path"));
621 assert_eq!(required.len(), 1);
622 }
623
624 #[test]
625 fn test_config_keys_include_cloud_storage() {
626 let keys = delta_lake_config_keys();
627 let key_names: Vec<&str> = keys.iter().map(|k| k.key.as_str()).collect();
628 assert!(key_names.contains(&"storage.aws_access_key_id"));
629 assert!(key_names.contains(&"storage.aws_secret_access_key"));
630 assert!(key_names.contains(&"storage.aws_region"));
631 assert!(key_names.contains(&"storage.azure_storage_account_name"));
632 assert!(key_names.contains(&"storage.azure_storage_account_key"));
633 assert!(key_names.contains(&"storage.google_service_account_path"));
634 }
635
636 #[test]
637 fn test_config_keys_optional_present() {
638 let keys = delta_lake_config_keys();
639 let optional: Vec<&str> = keys
640 .iter()
641 .filter(|k| !k.required)
642 .map(|k| k.key.as_str())
643 .collect();
644 assert!(optional.contains(&"partition.columns"));
645 assert!(optional.contains(&"target.file.size"));
646 assert!(optional.contains(&"write.mode"));
647 assert!(optional.contains(&"delivery.guarantee"));
648 assert!(optional.contains(&"merge.key.columns"));
649 assert!(optional.contains(&"schema.evolution"));
650 assert!(optional.contains(&"compaction.enabled"));
651 assert!(optional.contains(&"compaction.z-order.columns"));
652 assert!(optional.contains(&"vacuum.retention.hours"));
653 assert!(optional.contains(&"writer.id"));
654 assert!(optional.contains(&"catalog.type"));
656 assert!(optional.contains(&"catalog.database"));
657 assert!(optional.contains(&"catalog.name"));
658 assert!(optional.contains(&"catalog.schema"));
659 assert!(optional.contains(&"catalog.workspace_url"));
660 assert!(optional.contains(&"catalog.access_token"));
661 assert!(optional.contains(&"catalog.storage.location"));
662 }
663
664 #[test]
665 fn test_factory_creates_sink() {
666 let registry = ConnectorRegistry::new();
667 register_delta_lake_sink(®istry);
668
669 let config = crate::config::ConnectorConfig::new("delta-lake");
670 let sink = registry.create_sink(&config, None);
671 assert!(sink.is_ok());
672 }
673
674 #[test]
677 fn test_register_delta_lake_source() {
678 let registry = ConnectorRegistry::new();
679 register_delta_lake_source(®istry);
680
681 let info = registry.source_info("delta-lake");
682 assert!(info.is_some());
683 let info = info.unwrap();
684 assert_eq!(info.name, "delta-lake");
685 assert!(info.is_source);
686 assert!(!info.is_sink);
687 assert!(!info.config_keys.is_empty());
688 }
689
690 #[test]
691 fn test_source_config_keys() {
692 let keys = delta_lake_source_config_keys();
693 let required: Vec<&str> = keys
694 .iter()
695 .filter(|k| k.required)
696 .map(|k| k.key.as_str())
697 .collect();
698 assert!(required.contains(&"table.path"));
699 assert_eq!(required.len(), 1);
700
701 let optional: Vec<&str> = keys
702 .iter()
703 .filter(|k| !k.required)
704 .map(|k| k.key.as_str())
705 .collect();
706 assert!(optional.contains(&"starting.version"));
707 assert!(optional.contains(&"poll.interval.ms"));
708 assert!(optional.contains(&"catalog.type"));
710 assert!(optional.contains(&"catalog.database"));
711 }
712
713 #[test]
714 fn test_factory_creates_source() {
715 let registry = ConnectorRegistry::new();
716 register_delta_lake_source(®istry);
717
718 let config = crate::config::ConnectorConfig::new("delta-lake");
719 let source = registry.create_source(&config, None);
720 assert!(source.is_ok());
721 }
722
723 #[test]
724 fn test_register_lakehouse_sinks() {
725 let registry = ConnectorRegistry::new();
726 register_lakehouse_sinks(®istry);
727
728 assert!(registry.sink_info("delta-lake").is_some());
729 assert!(registry.sink_info("iceberg").is_some());
730 }
731
732 #[test]
735 fn test_register_iceberg_sink() {
736 let registry = ConnectorRegistry::new();
737 register_iceberg_sink(®istry);
738
739 let info = registry.sink_info("iceberg");
740 assert!(info.is_some());
741 let info = info.unwrap();
742 assert_eq!(info.name, "iceberg");
743 assert!(info.is_sink);
744 assert!(!info.is_source);
745 assert!(!info.config_keys.is_empty());
746 }
747
748 #[test]
749 fn test_register_iceberg_source() {
750 let registry = ConnectorRegistry::new();
751 register_iceberg_source(®istry);
752
753 let info = registry.source_info("iceberg");
754 assert!(info.is_some());
755 let info = info.unwrap();
756 assert_eq!(info.name, "iceberg");
757 assert!(info.is_source);
758 assert!(!info.is_sink);
759 }
760
761 #[test]
762 fn test_iceberg_sink_config_keys() {
763 let keys = iceberg_sink_config_keys();
764 let required: Vec<&str> = keys
765 .iter()
766 .filter(|k| k.required)
767 .map(|k| k.key.as_str())
768 .collect();
769 assert!(required.contains(&"catalog.uri"));
770 assert!(required.contains(&"warehouse"));
771 assert!(required.contains(&"namespace"));
772 assert!(required.contains(&"table.name"));
773 assert_eq!(required.len(), 4);
774 }
775
776 #[test]
777 fn test_iceberg_source_config_keys() {
778 let keys = iceberg_source_config_keys();
779 let required: Vec<&str> = keys
780 .iter()
781 .filter(|k| k.required)
782 .map(|k| k.key.as_str())
783 .collect();
784 assert!(required.contains(&"catalog.uri"));
785 assert!(required.contains(&"warehouse"));
786 assert!(required.contains(&"namespace"));
787 assert!(required.contains(&"table.name"));
788 assert_eq!(required.len(), 4);
789 }
790
791 #[test]
792 fn test_factory_creates_iceberg_sink() {
793 let registry = ConnectorRegistry::new();
794 register_iceberg_sink(®istry);
795
796 let config = crate::config::ConnectorConfig::new("iceberg");
797 let sink = registry.create_sink(&config, None);
798 assert!(sink.is_ok());
799 }
800
801 #[test]
802 fn test_factory_creates_iceberg_source() {
803 let registry = ConnectorRegistry::new();
804 register_iceberg_source(®istry);
805
806 let config = crate::config::ConnectorConfig::new("iceberg");
807 let source = registry.create_source(&config, None);
808 assert!(source.is_ok());
809 }
810}