
laminar_connectors/lakehouse/iceberg_config.rs

//! Apache Iceberg connector configuration.
//!
//! [`IcebergSinkConfig`] and [`IcebergSourceConfig`] encapsulate settings for
//! writing to and reading from Iceberg tables, parsed from SQL `WITH (...)`
//! clauses via their respective `from_config` methods.
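//!
//! # Example
//!
//! A minimal sketch of parsing a sink config (marked `ignore`; it uses the
//! crate-local `ConnectorConfig` setters shown in this module's tests):
//!
//! ```ignore
//! let mut config = ConnectorConfig::new("iceberg");
//! config.set("catalog.uri", "http://localhost:8181");
//! config.set("warehouse", "s3://bucket/wh");
//! config.set("namespace", "prod");
//! config.set("table.name", "events");
//!
//! let sink = IcebergSinkConfig::from_config(&config)?;
//! assert_eq!(sink.compression, "zstd"); // default codec
//! ```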
#![allow(clippy::disallowed_types)] // cold path: lakehouse configuration

use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;

use crate::config::ConnectorConfig;
use crate::error::ConnectorError;

/// Iceberg catalog type.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum IcebergCatalogType {
    /// REST catalog (Polaris, Nessie, Unity, Glue adapter).
    #[default]
    Rest,
}

impl FromStr for IcebergCatalogType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "rest" => Ok(Self::Rest),
            other => Err(format!("unsupported iceberg catalog type: '{other}'")),
        }
    }
}

impl fmt::Display for IcebergCatalogType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Rest => write!(f, "rest"),
        }
    }
}

/// Shared catalog connection settings for both source and sink.
#[derive(Debug, Clone)]
pub struct IcebergCatalogConfig {
    /// Catalog type (currently only REST).
    pub catalog_type: IcebergCatalogType,
    /// REST catalog URI (e.g., `http://polaris:8181`).
    pub catalog_uri: String,
    /// Warehouse URL (Hadoop-style: `s3://bucket/wh`) or name (REST catalogs).
    pub warehouse: String,
    /// Explicit storage backend. Required when `warehouse` is a name.
    pub storage_type: Option<String>,
    /// Iceberg namespace (e.g., `prod` or `prod.analytics`).
    pub namespace: String,
    /// Table name within the namespace.
    pub table_name: String,
    /// Additional catalog properties (credentials, endpoints, etc.).
    pub properties: HashMap<String, String>,
}

impl IcebergCatalogConfig {
    /// Parses shared catalog settings from a [`ConnectorConfig`].
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::MissingConfig` if required keys are absent.
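    ///
    /// # Example keys
    ///
    /// An illustrative set of `WITH (...)` entries; the `catalog.property.*`
    /// value is a made-up placeholder:
    ///
    /// ```text
    /// catalog.type  = 'rest'               -- optional; 'rest' is the default
    /// catalog.uri   = 'http://polaris:8181'
    /// warehouse     = 's3://bucket/wh'
    /// namespace     = 'prod'
    /// table.name    = 'events'
    /// catalog.property.token = '<credential>'
    /// ```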
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let catalog_type = if let Some(v) = config.get("catalog.type") {
            v.parse()
                .map_err(|e: String| ConnectorError::ConfigurationError(e))?
        } else {
            IcebergCatalogType::default()
        };

        let catalog_uri = config.require("catalog.uri")?.to_string();
        let warehouse = config.require("warehouse")?.to_string();
        let storage_type = config.get("storage.type").map(str::to_string);
        let namespace = config.require("namespace")?.to_string();
        let table_name = config.require("table.name")?.to_string();

        let properties = config.properties_with_prefix("catalog.property.");

        Ok(Self {
            catalog_type,
            catalog_uri,
            warehouse,
            storage_type,
            namespace,
            table_name,
            properties,
        })
    }
}

// ── Sink Configuration ──

/// Configuration for the Iceberg sink connector.
#[derive(Debug, Clone)]
pub struct IcebergSinkConfig {
    /// Shared catalog connection settings.
    pub catalog: IcebergCatalogConfig,
    /// Parquet compression codec (default: zstd).
    pub compression: String,
    /// Auto-create table if it doesn't exist.
    pub auto_create: bool,
    /// Writer ID for exactly-once deduplication (auto UUID if not set).
    pub writer_id: String,
}

impl IcebergSinkConfig {
    /// Parses a sink config from a [`ConnectorConfig`] (SQL `WITH` clause).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` on missing or invalid values.
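    ///
    /// # Example keys
    ///
    /// Sink-specific entries with illustrative values; defaults noted inline:
    ///
    /// ```text
    /// compression = 'snappy'    -- default: 'zstd'
    /// auto.create = 'true'      -- default: false
    /// writer.id   = 'writer-1'  -- default: a freshly generated UUIDv7
    /// ```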
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let catalog = IcebergCatalogConfig::from_config(config)?;

        let compression = config.get("compression").unwrap_or("zstd").to_string();

        let auto_create = config
            .get("auto.create")
            .is_some_and(|v| v.eq_ignore_ascii_case("true"));

        let writer_id = config
            .get("writer.id")
            .filter(|v| !v.is_empty())
            .map_or_else(|| uuid::Uuid::now_v7().to_string(), ToString::to_string);

        Ok(Self {
            catalog,
            compression,
            auto_create,
            writer_id,
        })
    }
}

// ── Source Configuration ──

/// Configuration for the Iceberg source connector (lookup/reference table).
#[derive(Debug, Clone)]
pub struct IcebergSourceConfig {
    /// Shared catalog connection settings.
    pub catalog: IcebergCatalogConfig,
    /// How often to poll for new snapshots (default: 60s).
    pub poll_interval: Duration,
    /// Pin to a specific snapshot ID (no polling if set).
    pub snapshot_id: Option<i64>,
    /// Columns to select (empty = all columns).
    pub select_columns: Vec<String>,
}

impl IcebergSourceConfig {
    /// Parses a source config from a [`ConnectorConfig`] (SQL `WITH` clause).
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` on missing or invalid values.
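    ///
    /// # Example keys
    ///
    /// Source-specific entries with illustrative values:
    ///
    /// ```text
    /// poll.interval.ms = '30000'    -- default: 60000
    /// snapshot.id      = '42'       -- pins a snapshot; disables polling
    /// select.columns   = 'id,name'  -- default: all columns
    /// ```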
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let catalog = IcebergCatalogConfig::from_config(config)?;

        let poll_interval = config
            .get_parsed::<u64>("poll.interval.ms")?
            .map_or(Duration::from_secs(60), Duration::from_millis);

        let snapshot_id = config.get_parsed::<i64>("snapshot.id")?;

        let select_columns = config
            .get("select.columns")
            .unwrap_or("")
            .split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect();

        Ok(Self {
            catalog,
            poll_interval,
            snapshot_id,
            select_columns,
        })
    }
}

/// Checks if `from` can be safely widened to `to` without data loss.
fn is_safe_widening(from: &arrow_schema::DataType, to: &arrow_schema::DataType) -> bool {
    use arrow_schema::DataType;
    matches!(
        (from, to),
        (
            DataType::Int8,
            DataType::Int16 | DataType::Int32 | DataType::Int64
        ) | (DataType::Int16, DataType::Int32 | DataType::Int64)
            | (DataType::Int32, DataType::Int64)
            | (DataType::Float32, DataType::Float64)
            | (DataType::Utf8, DataType::LargeUtf8)
    )
}

/// Validates that a pipeline's output schema is compatible with an Iceberg
/// table's Arrow schema.
///
/// Every field in `pipeline` must exist in `table` with a matching or
/// safely-widenable type. Extra columns in `table` are acceptable (Iceberg
/// fills them with nulls).
///
/// # Errors
///
/// Returns `ConnectorError::SchemaMismatch` on incompatible fields.
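///
/// # Example
///
/// A sketch of the intended behavior (marked `ignore` since it depends on
/// this crate's module layout):
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
///
/// let pipeline = Schema::new(vec![Field::new("id", DataType::Int32, true)]);
/// let table = Schema::new(vec![Field::new("id", DataType::Int64, true)]);
///
/// // Int32 -> Int64 is a safe widening, so validation succeeds.
/// assert!(validate_sink_schema(&pipeline, &table).is_ok());
/// ```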
pub fn validate_sink_schema(
    pipeline: &arrow_schema::Schema,
    table: &arrow_schema::Schema,
) -> Result<(), ConnectorError> {
    for field in pipeline.fields() {
        match table.field_with_name(field.name()) {
            Ok(table_field) => {
                if field.data_type() != table_field.data_type()
                    && !is_safe_widening(field.data_type(), table_field.data_type())
                {
                    return Err(ConnectorError::SchemaMismatch(format!(
                        "field '{}': pipeline type {} incompatible with table type {}",
                        field.name(),
                        field.data_type(),
                        table_field.data_type(),
                    )));
                }
            }
            Err(_) => {
                return Err(ConnectorError::SchemaMismatch(format!(
                    "pipeline field '{}' ({}) not found in Iceberg table schema",
                    field.name(),
                    field.data_type(),
                )));
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_catalog_type_parse() {
        assert_eq!(
            "rest".parse::<IcebergCatalogType>().unwrap(),
            IcebergCatalogType::Rest
        );
        assert!("unknown".parse::<IcebergCatalogType>().is_err());
    }

    #[test]
    fn test_sink_config_from_config() {
        let mut config = ConnectorConfig::new("iceberg");
        config.set("catalog.uri", "http://localhost:8181");
        config.set("warehouse", "s3://bucket/wh");
        config.set("namespace", "prod");
        config.set("table.name", "events");
        config.set("compression", "snappy");

        let cfg = IcebergSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.catalog.catalog_uri, "http://localhost:8181");
        assert_eq!(cfg.catalog.warehouse, "s3://bucket/wh");
        assert_eq!(cfg.catalog.namespace, "prod");
        assert_eq!(cfg.catalog.table_name, "events");
        assert_eq!(cfg.compression, "snappy");
        assert!(!cfg.writer_id.is_empty());
    }

    #[test]
    fn test_source_config_from_config() {
        let mut config = ConnectorConfig::new("iceberg");
        config.set("catalog.uri", "http://localhost:8181");
        config.set("warehouse", "s3://bucket/wh");
        config.set("namespace", "prod");
        config.set("table.name", "dim_customers");
        config.set("poll.interval.ms", "30000");
        config.set("snapshot.id", "42");

        let cfg = IcebergSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.poll_interval, Duration::from_secs(30));
        assert_eq!(cfg.snapshot_id, Some(42));
    }

    #[test]
    fn test_missing_required_field() {
        let config = ConnectorConfig::new("iceberg");
        assert!(IcebergSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_defaults() {
        let mut config = ConnectorConfig::new("iceberg");
        config.set("catalog.uri", "http://localhost:8181");
        config.set("warehouse", "s3://bucket/wh");
        config.set("namespace", "prod");
        config.set("table.name", "events");

        let cfg = IcebergSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.compression, "zstd");
        assert!(!cfg.auto_create);
    }

    // ── Schema validation tests ──

    use arrow_schema::{DataType, Field, Schema};

    fn schema(fields: Vec<(&str, DataType)>) -> Schema {
        Schema::new(
            fields
                .into_iter()
                .map(|(n, t)| Field::new(n, t, true))
                .collect::<Vec<_>>(),
        )
    }

    #[test]
    fn test_validate_matching_schemas() {
        let s = schema(vec![("id", DataType::Int64), ("name", DataType::Utf8)]);
        assert!(validate_sink_schema(&s, &s).is_ok());
    }

    #[test]
    fn test_validate_missing_field() {
        let pipeline = schema(vec![("id", DataType::Int64), ("extra", DataType::Utf8)]);
        let table = schema(vec![("id", DataType::Int64)]);
        let err = validate_sink_schema(&pipeline, &table).unwrap_err();
        assert!(err.to_string().contains("extra"));
    }

    #[test]
    fn test_validate_type_mismatch() {
        let pipeline = schema(vec![("id", DataType::Int64)]);
        let table = schema(vec![("id", DataType::Utf8)]);
        let err = validate_sink_schema(&pipeline, &table).unwrap_err();
        assert!(err.to_string().contains("incompatible"));
    }

    #[test]
    fn test_validate_extra_table_columns_ok() {
        let pipeline = schema(vec![("id", DataType::Int64)]);
        let table = schema(vec![("id", DataType::Int64), ("extra", DataType::Utf8)]);
        assert!(validate_sink_schema(&pipeline, &table).is_ok());
    }

    #[test]
    fn test_validate_safe_widening() {
        let pipeline = schema(vec![("n", DataType::Int32), ("f", DataType::Float32)]);
        let table = schema(vec![("n", DataType::Int64), ("f", DataType::Float64)]);
        assert!(validate_sink_schema(&pipeline, &table).is_ok());
    }
}