laminar_connectors/lakehouse/iceberg_io.rs

#![allow(clippy::disallowed_types)]
#![cfg(feature = "iceberg")]

//! Iceberg table I/O: building catalogs, loading and scanning tables, and
//! committing data files transactionally with per-writer epoch tracking.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::RecordBatch;
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, CatalogBuilder, TableIdent};
use iceberg_catalog_rest::RestCatalogBuilder;
use iceberg_storage_opendal::OpenDalStorageFactory;
use tokio_stream::StreamExt;

use super::iceberg_config::{IcebergCatalogConfig, IcebergCatalogType};
use crate::error::ConnectorError;

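/// Selects a storage backend for the warehouse location, preferring an
/// explicit `storage.type` and otherwise inferring one from the warehouse
/// URL scheme (`s3://`, `s3a://`, or `file://`).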
fn storage_factory(
    warehouse: &str,
    storage_type: Option<&str>,
) -> Result<Arc<dyn iceberg::io::StorageFactory>, ConnectorError> {
    let scheme = storage_type
        .map(str::to_lowercase)
        .or_else(|| {
            if warehouse.starts_with("s3a://") {
                Some("s3a".to_string())
            } else if warehouse.starts_with("s3://") {
                Some("s3".to_string())
            } else if warehouse.starts_with("file://") {
                Some("fs".to_string())
            } else {
                None
            }
        })
        .ok_or_else(|| {
            ConnectorError::ConfigurationError(format!(
                "[LDB-5100] cannot infer storage backend from warehouse '{warehouse}'; \
                 set storage.type = 's3' | 's3a' | 'fs'"
            ))
        })?;

    let factory: Arc<dyn iceberg::io::StorageFactory> = match scheme.as_str() {
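        // Both s3 and s3a map onto the same OpenDAL S3 backend; the scheme
        // that was actually configured is carried along for URI handling.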
52 "s3" | "s3a" => Arc::new(OpenDalStorageFactory::S3 {
53 configured_scheme: scheme,
54 customized_credential_load: None,
55 }),
56 "fs" => Arc::new(OpenDalStorageFactory::Fs),
57 other => {
58 return Err(ConnectorError::ConfigurationError(format!(
59 "[LDB-5101] unsupported storage.type '{other}'; expected s3 | s3a | fs"
60 )));
61 }
62 };
63 Ok(factory)
64}
65
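/// Builds an Iceberg catalog from `config`.
///
/// Only the REST catalog is currently wired up; the match on
/// [`IcebergCatalogType`] is exhaustive, so adding a new catalog type
/// forces a compile error until it is handled here.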
pub async fn build_catalog(
    config: &IcebergCatalogConfig,
) -> Result<Arc<dyn Catalog>, ConnectorError> {
    match config.catalog_type {
        IcebergCatalogType::Rest => build_rest_catalog(config).await,
    }
}

async fn build_rest_catalog(
    config: &IcebergCatalogConfig,
) -> Result<Arc<dyn Catalog>, ConnectorError> {
    let storage_factory = storage_factory(&config.warehouse, config.storage_type.as_deref())?;

    let mut props = HashMap::new();
    props.insert("uri".to_string(), config.catalog_uri.clone());
    props.insert("warehouse".to_string(), config.warehouse.clone());

    for (k, v) in &config.properties {
        props.insert(k.clone(), v.clone());
    }

    let catalog = RestCatalogBuilder::default()
        .with_storage_factory(storage_factory)
        .load("laminardb", props)
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("iceberg catalog: {e}")))?;

    Ok(Arc::new(catalog))
}

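/// Loads a table from the catalog.
///
/// `namespace` may be dotted (e.g. `"db.schema"`); each `.`-separated
/// segment becomes one level of the resulting namespace identifier.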
pub async fn load_table(
    catalog: &dyn Catalog,
    namespace: &str,
    table_name: &str,
) -> Result<Table, ConnectorError> {
    let ns = iceberg::NamespaceIdent::from_strs(namespace.split('.').collect::<Vec<_>>())
        .map_err(|e| ConnectorError::ConfigurationError(format!("invalid namespace: {e}")))?;

    let ident = TableIdent::new(ns, table_name.to_string());

    catalog
        .load_table(&ident)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("load table '{table_name}': {e}")))
}

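/// Scans a table into Arrow [`RecordBatch`]es.
///
/// With `snapshot_id = None` the current snapshot is read; an empty
/// `select_columns` selects all columns. All batches are buffered in
/// memory, so this is meant for bounded reads. A usage sketch (the
/// `config` binding and names are hypothetical, not compiled as a doctest):
///
/// ```ignore
/// let catalog = build_catalog(&config).await?;
/// let table = load_table(catalog.as_ref(), "db.schema", "events").await?;
/// let batches = scan_table(&table, None, &["id".into(), "ts".into()]).await?;
/// ```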
pub async fn scan_table(
    table: &Table,
    snapshot_id: Option<i64>,
    select_columns: &[String],
) -> Result<Vec<RecordBatch>, ConnectorError> {
    let mut scan_builder = table.scan();

    if let Some(sid) = snapshot_id {
        scan_builder = scan_builder.snapshot_id(sid);
    }

    if select_columns.is_empty() {
        scan_builder = scan_builder.select_all();
    } else {
        scan_builder = scan_builder.select(select_columns.iter().map(String::as_str));
    }

    let scan = scan_builder
        .build()
        .map_err(|e| ConnectorError::ReadError(format!("build scan: {e}")))?;

    let stream = scan
        .to_arrow()
        .await
        .map_err(|e| ConnectorError::ReadError(format!("scan to arrow: {e}")))?;

    let mut batches = Vec::new();
    let mut stream = std::pin::pin!(stream);
    while let Some(result) = stream.next().await {
        let batch = result.map_err(|e| ConnectorError::ReadError(format!("read batch: {e}")))?;
        batches.push(batch);
    }

    Ok(batches)
}

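/// Returns the table's current snapshot ID, or `None` if the table has
/// no snapshots yet.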
#[must_use]
pub fn current_snapshot_id(table: &Table) -> Option<i64> {
    table.metadata().current_snapshot().map(|s| s.snapshot_id())
}

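/// Commits `data_files` to the table in a single fast-append transaction
/// and returns the updated [`Table`].
///
/// When `epoch_metadata` is `Some((writer_id, epoch))`, the same commit
/// also sets the table property `laminardb.writer.<writer_id>.last_epoch`,
/// so the data files and the epoch marker become visible atomically.
/// Paired with [`get_last_committed_epoch`], this lets a restarted writer
/// skip epochs that already landed, as in this sketch (hypothetical
/// `files`/`epoch` bindings, not compiled as a doctest):
///
/// ```ignore
/// let last = get_last_committed_epoch(&table, "writer-1");
/// if last.map_or(true, |e| e < epoch) {
///     table = commit_data_files(&table, catalog, files, Some(("writer-1", epoch))).await?;
/// }
/// ```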
pub async fn commit_data_files(
    table: &Table,
    catalog: &dyn Catalog,
    data_files: Vec<iceberg::spec::DataFile>,
    epoch_metadata: Option<(&str, u64)>,
) -> Result<Table, ConnectorError> {
    let tx = Transaction::new(table);
    let tx: Transaction = tx
        .fast_append()
        .add_data_files(data_files)
        .apply(tx)
        .map_err(|e| ConnectorError::TransactionError(format!("apply fast_append: {e}")))?;

    let tx = if let Some((writer_id, epoch)) = epoch_metadata {
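        // Piggyback the epoch marker on the same transaction so it becomes
        // visible atomically with the appended data files.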
        let key = format!("laminardb.writer.{writer_id}.last_epoch");
        tx.update_table_properties()
            .set(key, epoch.to_string())
            .apply(tx)
            .map_err(|e| {
                ConnectorError::TransactionError(format!("apply update_table_properties: {e}"))
            })?
    } else {
        tx
    };

    tx.commit(catalog)
        .await
        .map_err(|e| ConnectorError::TransactionError(format!("commit: {e}")))
}

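/// Reads the last epoch committed by `writer_id` from the table
/// properties; `None` if the writer has never committed or the stored
/// value does not parse as a `u64`.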
#[must_use]
pub fn get_last_committed_epoch(table: &Table, writer_id: &str) -> Option<u64> {
    let key = format!("laminardb.writer.{writer_id}.last_epoch");
    table.metadata().properties().get(&key)?.parse().ok()
}

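/// Creates the namespace and table if either does not already exist.
///
/// The Iceberg schema is derived from `arrow_schema` with auto-assigned
/// field IDs. An existing table is left untouched; its schema is not
/// validated against `arrow_schema`.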
pub async fn ensure_table_exists(
    catalog: &dyn Catalog,
    namespace: &str,
    table_name: &str,
    arrow_schema: &arrow_schema::SchemaRef,
) -> Result<(), ConnectorError> {
    let ns = iceberg::NamespaceIdent::from_strs(namespace.split('.').collect::<Vec<_>>())
        .map_err(|e| ConnectorError::ConfigurationError(format!("invalid namespace: {e}")))?;

    let ident = TableIdent::new(ns.clone(), table_name.to_string());

    if catalog
        .table_exists(&ident)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("table_exists: {e}")))?
    {
        return Ok(());
    }

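    // Derive an Iceberg schema from the Arrow schema, auto-assigning the
    // field IDs that Iceberg requires.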
    let iceberg_schema = iceberg::arrow::arrow_schema_to_schema_auto_assign_ids(arrow_schema)
        .map_err(|e| {
            ConnectorError::SchemaMismatch(format!("arrow→iceberg schema conversion: {e}"))
        })?;

    if !catalog
        .namespace_exists(&ns)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("namespace_exists: {e}")))?
    {
        catalog
            .create_namespace(&ns, HashMap::new())
            .await
            .map_err(|e| ConnectorError::WriteError(format!("create namespace: {e}")))?;
    }

    let creation = iceberg::TableCreation::builder()
        .name(table_name.to_string())
        .schema(iceberg_schema)
        .build();

    catalog
        .create_table(&ns, creation)
        .await
        .map_err(|e| ConnectorError::WriteError(format!("create table: {e}")))?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_storage_factory_infers_s3_from_warehouse_url() {
        let f = storage_factory("s3://bucket/warehouse", None).unwrap();
        assert!(format!("{f:?}").contains("S3"));
    }

    #[test]
    fn test_storage_factory_infers_s3a_from_warehouse_url() {
        let f = storage_factory("s3a://bucket/warehouse", None).unwrap();
        assert!(format!("{f:?}").contains("S3"));
    }

    #[test]
    fn test_storage_factory_infers_fs_from_file_url() {
        let f = storage_factory("file:///tmp/warehouse", None).unwrap();
        assert!(format!("{f:?}").contains("Fs"));
    }

    #[test]
    fn test_storage_factory_bare_path_requires_explicit_storage_type() {
        let err = storage_factory("/tmp/warehouse", None)
            .unwrap_err()
            .to_string();
        assert!(err.contains("LDB-5100"), "got: {err}");
    }

    #[test]
    fn test_storage_factory_explicit_overrides_inference() {
        let f = storage_factory("demo", Some("s3")).unwrap();
        assert!(format!("{f:?}").contains("S3"));
    }

    #[test]
    fn test_storage_factory_unknown_warehouse_without_storage_type_errors() {
        let err = storage_factory("demo", None).unwrap_err().to_string();
        assert!(err.contains("LDB-5100"), "got: {err}");
    }

    #[test]
    fn test_storage_factory_rejects_unknown_storage_type() {
        let err = storage_factory("demo", Some("hdfs"))
            .unwrap_err()
            .to_string();
        assert!(err.contains("LDB-5101"), "got: {err}");
    }

    #[test]
    fn test_current_snapshot_id_empty_table() {
        let schema = iceberg::spec::Schema::builder()
            .with_fields(vec![])
            .build()
            .unwrap();

        let creation = iceberg::TableCreation::builder()
            .name("test_table".to_string())
            .schema(schema)
            .location("s3://test/location".to_string())
            .build();

        let metadata = iceberg::spec::TableMetadataBuilder::from_table_creation(creation)
            .unwrap()
            .build()
            .unwrap();

        let table = Table::builder()
            .metadata(metadata.metadata)
            .identifier(TableIdent::new(
                iceberg::NamespaceIdent::new("test".to_string()),
                "t".to_string(),
            ))
            .file_io(iceberg::io::FileIO::new_with_memory())
            .build()
            .unwrap();

        assert!(current_snapshot_id(&table).is_none());
    }
}