laminar_connectors/lakehouse/iceberg_config.rs

//! Configuration types for the Iceberg lakehouse connectors (catalog, sink, source).

#![allow(clippy::disallowed_types)]

use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;

use crate::config::ConnectorConfig;
use crate::error::ConnectorError;

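/// Supported Iceberg catalog implementations; only the REST catalog is
/// currently wired up.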
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum IcebergCatalogType {
    #[default]
    Rest,
}

impl FromStr for IcebergCatalogType {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "rest" => Ok(Self::Rest),
            other => Err(format!("unsupported iceberg catalog type: '{other}'")),
        }
    }
}

impl fmt::Display for IcebergCatalogType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Rest => write!(f, "rest"),
        }
    }
}

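/// Catalog connection settings shared by the Iceberg sink and source.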
#[derive(Debug, Clone)]
pub struct IcebergCatalogConfig {
    /// Catalog implementation to use (`catalog.type`, defaults to `rest`).
    pub catalog_type: IcebergCatalogType,
    /// Catalog endpoint URI (`catalog.uri`).
    pub catalog_uri: String,
    /// Warehouse location (`warehouse`).
    pub warehouse: String,
    /// Optional storage backend hint (`storage.type`).
    pub storage_type: Option<String>,
    /// Target namespace (`namespace`).
    pub namespace: String,
    /// Target table name (`table.name`).
    pub table_name: String,
    /// Additional catalog properties collected from `catalog.property.*` keys.
    pub properties: HashMap<String, String>,
}

impl IcebergCatalogConfig {
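    /// Builds the catalog configuration from the connector's key/value settings.
    ///
    /// # Errors
    /// Fails if `catalog.type` cannot be parsed, or if a required key
    /// (`catalog.uri`, `warehouse`, `namespace`, `table.name`) is missing.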
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let catalog_type = if let Some(v) = config.get("catalog.type") {
            v.parse()
                .map_err(|e: String| ConnectorError::ConfigurationError(e))?
        } else {
            IcebergCatalogType::default()
        };

        let catalog_uri = config.require("catalog.uri")?.to_string();
        let warehouse = config.require("warehouse")?.to_string();
        let storage_type = config.get("storage.type").map(str::to_string);
        let namespace = config.require("namespace")?.to_string();
        let table_name = config.require("table.name")?.to_string();

        let properties = config.properties_with_prefix("catalog.property.");

        Ok(Self {
            catalog_type,
            catalog_uri,
            warehouse,
            storage_type,
            namespace,
            table_name,
            properties,
        })
    }
}

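/// Configuration for the Iceberg sink (writer) connector.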
#[derive(Debug, Clone)]
pub struct IcebergSinkConfig {
    /// Shared catalog settings.
    pub catalog: IcebergCatalogConfig,
    /// Compression codec for written data files (`compression`, defaults to `zstd`).
    pub compression: String,
    /// Whether to create the table if it does not exist (`auto.create`).
    pub auto_create: bool,
    /// Writer identifier (`writer.id`); a UUIDv7 is generated when unset.
    pub writer_id: String,
}

impl IcebergSinkConfig {
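    /// Builds the sink configuration from connector settings.
    ///
    /// # Errors
    /// Fails if the underlying catalog configuration cannot be built.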
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let catalog = IcebergCatalogConfig::from_config(config)?;

        let compression = config.get("compression").unwrap_or("zstd").to_string();

        let auto_create = config
            .get("auto.create")
            .is_some_and(|v| v.eq_ignore_ascii_case("true"));

        let writer_id = config
            .get("writer.id")
            .filter(|v| !v.is_empty())
            .map_or_else(|| uuid::Uuid::now_v7().to_string(), ToString::to_string);

        Ok(Self {
            catalog,
            compression,
            auto_create,
            writer_id,
        })
    }
}

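/// Configuration for the Iceberg source (reader) connector.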
#[derive(Debug, Clone)]
pub struct IcebergSourceConfig {
    /// Shared catalog settings.
    pub catalog: IcebergCatalogConfig,
    /// Interval between table polls (`poll.interval.ms`, defaults to 60 seconds).
    pub poll_interval: Duration,
    /// Optional snapshot id to pin reads to (`snapshot.id`).
    pub snapshot_id: Option<i64>,
    /// Columns to project, parsed from the comma-separated `select.columns` value.
    pub select_columns: Vec<String>,
}

impl IcebergSourceConfig {
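    /// Builds the source configuration from connector settings.
    ///
    /// # Errors
    /// Fails if the catalog configuration cannot be built, or if
    /// `poll.interval.ms` / `snapshot.id` do not parse as integers.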
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let catalog = IcebergCatalogConfig::from_config(config)?;

        let poll_interval = config
            .get_parsed::<u64>("poll.interval.ms")?
            .map_or(Duration::from_secs(60), Duration::from_millis);

        let snapshot_id = config.get_parsed::<i64>("snapshot.id")?;

        let select_columns = config
            .get("select.columns")
            .unwrap_or("")
            .split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect();

        Ok(Self {
            catalog,
            poll_interval,
            snapshot_id,
            select_columns,
        })
    }
}

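/// Returns `true` when a pipeline value of type `from` can be written to a
/// table column of type `to` via a lossless widening conversion
/// (e.g. `Int32` to `Int64`, `Utf8` to `LargeUtf8`).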
fn is_safe_widening(from: &arrow_schema::DataType, to: &arrow_schema::DataType) -> bool {
    use arrow_schema::DataType;
    matches!(
        (from, to),
        (
            DataType::Int8,
            DataType::Int16 | DataType::Int32 | DataType::Int64
        ) | (DataType::Int16, DataType::Int32 | DataType::Int64)
            | (DataType::Int32, DataType::Int64)
            | (DataType::Float32, DataType::Float64)
            | (DataType::Utf8, DataType::LargeUtf8)
    )
}

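/// Validates that every pipeline field exists in the Iceberg table schema with
/// an identical type or one reachable by a safe widening conversion. Extra
/// table columns that the pipeline does not produce are allowed.
///
/// # Errors
/// Returns `ConnectorError::SchemaMismatch` describing the first offending field.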
pub fn validate_sink_schema(
    pipeline: &arrow_schema::Schema,
    table: &arrow_schema::Schema,
) -> Result<(), ConnectorError> {
    for field in pipeline.fields() {
        match table.field_with_name(field.name()) {
            Ok(table_field) => {
                if field.data_type() != table_field.data_type()
                    && !is_safe_widening(field.data_type(), table_field.data_type())
                {
                    return Err(ConnectorError::SchemaMismatch(format!(
                        "field '{}': pipeline type {} incompatible with table type {}",
                        field.name(),
                        field.data_type(),
                        table_field.data_type(),
                    )));
                }
            }
            Err(_) => {
                return Err(ConnectorError::SchemaMismatch(format!(
                    "pipeline field '{}' ({}) not found in Iceberg table schema",
                    field.name(),
                    field.data_type(),
                )));
            }
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_catalog_type_parse() {
        assert_eq!(
            "rest".parse::<IcebergCatalogType>().unwrap(),
            IcebergCatalogType::Rest
        );
        assert!("unknown".parse::<IcebergCatalogType>().is_err());
    }

    #[test]
    fn test_sink_config_from_config() {
        let mut config = ConnectorConfig::new("iceberg");
        config.set("catalog.uri", "http://localhost:8181");
        config.set("warehouse", "s3://bucket/wh");
        config.set("namespace", "prod");
        config.set("table.name", "events");
        config.set("compression", "snappy");

        let cfg = IcebergSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.catalog.catalog_uri, "http://localhost:8181");
        assert_eq!(cfg.catalog.warehouse, "s3://bucket/wh");
        assert_eq!(cfg.catalog.namespace, "prod");
        assert_eq!(cfg.catalog.table_name, "events");
        assert_eq!(cfg.compression, "snappy");
        assert!(!cfg.writer_id.is_empty());
    }

    #[test]
    fn test_source_config_from_config() {
        let mut config = ConnectorConfig::new("iceberg");
        config.set("catalog.uri", "http://localhost:8181");
        config.set("warehouse", "s3://bucket/wh");
        config.set("namespace", "prod");
        config.set("table.name", "dim_customers");
        config.set("poll.interval.ms", "30000");
        config.set("snapshot.id", "42");

        let cfg = IcebergSourceConfig::from_config(&config).unwrap();
        assert_eq!(cfg.poll_interval, Duration::from_secs(30));
        assert_eq!(cfg.snapshot_id, Some(42));
    }

    #[test]
    fn test_missing_required_field() {
        let config = ConnectorConfig::new("iceberg");
        assert!(IcebergSinkConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_defaults() {
        let mut config = ConnectorConfig::new("iceberg");
        config.set("catalog.uri", "http://localhost:8181");
        config.set("warehouse", "s3://bucket/wh");
        config.set("namespace", "prod");
        config.set("table.name", "events");

        let cfg = IcebergSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.compression, "zstd");
        assert!(!cfg.auto_create);
    }

    use arrow_schema::{DataType, Field, Schema};

    fn schema(fields: Vec<(&str, DataType)>) -> Schema {
        Schema::new(
            fields
                .into_iter()
                .map(|(n, t)| Field::new(n, t, true))
                .collect::<Vec<_>>(),
        )
    }

    #[test]
    fn test_validate_matching_schemas() {
        let s = schema(vec![("id", DataType::Int64), ("name", DataType::Utf8)]);
        assert!(validate_sink_schema(&s, &s).is_ok());
    }

    #[test]
    fn test_validate_missing_field() {
        let pipeline = schema(vec![("id", DataType::Int64), ("extra", DataType::Utf8)]);
        let table = schema(vec![("id", DataType::Int64)]);
        let err = validate_sink_schema(&pipeline, &table).unwrap_err();
        assert!(err.to_string().contains("extra"));
    }

    #[test]
    fn test_validate_type_mismatch() {
        let pipeline = schema(vec![("id", DataType::Int64)]);
        let table = schema(vec![("id", DataType::Utf8)]);
        let err = validate_sink_schema(&pipeline, &table).unwrap_err();
        assert!(err.to_string().contains("incompatible"));
    }

    #[test]
    fn test_validate_extra_table_columns_ok() {
        let pipeline = schema(vec![("id", DataType::Int64)]);
        let table = schema(vec![("id", DataType::Int64), ("extra", DataType::Utf8)]);
        assert!(validate_sink_schema(&pipeline, &table).is_ok());
    }

    #[test]
    fn test_validate_safe_widening() {
        let pipeline = schema(vec![("n", DataType::Int32), ("f", DataType::Float32)]);
        let table = schema(vec![("n", DataType::Int64), ("f", DataType::Float64)]);
        assert!(validate_sink_schema(&pipeline, &table).is_ok());
    }
}