laminar_connectors/lakehouse/iceberg_io.rs

//! Feature-gated I/O operations for Apache Iceberg.
//!
//! Contains catalog construction, table loading, scanning, and writing
//! functions. All code requires the `iceberg` feature.
#![allow(clippy::disallowed_types)] // cold path: lakehouse I/O
#![cfg(feature = "iceberg")]

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::RecordBatch;
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, CatalogBuilder, TableIdent};
use iceberg_catalog_rest::RestCatalogBuilder;
use iceberg_storage_opendal::OpenDalStorageFactory;
use tokio_stream::StreamExt;

use super::iceberg_config::{IcebergCatalogConfig, IcebergCatalogType};
use crate::error::ConnectorError;

/// Selects the `OpenDalStorageFactory` for the table-data URLs the catalog
/// will return. Explicit `storage.type` wins; otherwise inferred from the
/// `s3://` / `s3a://` / `file://` warehouse URL.
fn storage_factory(
    warehouse: &str,
    storage_type: Option<&str>,
) -> Result<Arc<dyn iceberg::io::StorageFactory>, ConnectorError> {
    let scheme = storage_type
        .map(str::to_lowercase)
        .or_else(|| {
            if warehouse.starts_with("s3a://") {
                Some("s3a".to_string())
            } else if warehouse.starts_with("s3://") {
                Some("s3".to_string())
            } else if warehouse.starts_with("file://") {
                Some("fs".to_string())
            } else {
                None
            }
        })
        .ok_or_else(|| {
            ConnectorError::ConfigurationError(format!(
                "[LDB-5100] cannot infer storage backend from warehouse '{warehouse}'; \
                 set storage.type = 's3' | 's3a' | 'fs'"
            ))
        })?;

    // configured_scheme is the bare scheme ("s3"), NOT "s3://" —
    // iceberg-storage-opendal formats the prefix as `{scheme}://{bucket}/`.
    let factory: Arc<dyn iceberg::io::StorageFactory> = match scheme.as_str() {
        "s3" | "s3a" => Arc::new(OpenDalStorageFactory::S3 {
            configured_scheme: scheme,
            customized_credential_load: None,
        }),
        "fs" => Arc::new(OpenDalStorageFactory::Fs),
        other => {
            return Err(ConnectorError::ConfigurationError(format!(
                "[LDB-5101] unsupported storage.type '{other}'; expected s3 | s3a | fs"
            )));
        }
    };
    Ok(factory)
}

/// Builds a REST catalog from configuration.
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if catalog initialization fails.
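///
/// # Examples
///
/// A minimal sketch (not compiled) of wiring a config into a catalog. The
/// `IcebergCatalogConfig` field names here are assumptions drawn from how
/// this module reads them, not a confirmed constructor:
///
/// ```ignore
/// let config = IcebergCatalogConfig {
///     catalog_type: IcebergCatalogType::Rest,
///     catalog_uri: "http://localhost:8181".to_string(),
///     warehouse: "s3://bucket/warehouse".to_string(),
///     storage_type: None, // inferred from the s3:// warehouse URL
///     properties: HashMap::new(),
/// };
/// let catalog = build_catalog(&config).await?;
/// ```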
pub async fn build_catalog(
    config: &IcebergCatalogConfig,
) -> Result<Arc<dyn Catalog>, ConnectorError> {
    match config.catalog_type {
        IcebergCatalogType::Rest => build_rest_catalog(config).await,
    }
}

async fn build_rest_catalog(
    config: &IcebergCatalogConfig,
) -> Result<Arc<dyn Catalog>, ConnectorError> {
    let storage_factory = storage_factory(&config.warehouse, config.storage_type.as_deref())?;

    let mut props = HashMap::new();
    props.insert("uri".to_string(), config.catalog_uri.clone());
    props.insert("warehouse".to_string(), config.warehouse.clone());

    for (k, v) in &config.properties {
        props.insert(k.clone(), v.clone());
    }

    let catalog = RestCatalogBuilder::default()
        .with_storage_factory(storage_factory)
        .load("laminardb", props)
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("iceberg catalog: {e}")))?;

    Ok(Arc::new(catalog))
}

/// Loads an Iceberg table from the catalog.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` if the table cannot be loaded.
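///
/// # Examples
///
/// A sketch (not compiled); dotted namespaces are split into their parts,
/// so `"prod.events"` becomes the two-level namespace `prod` / `events`:
///
/// ```ignore
/// let table = load_table(catalog.as_ref(), "prod.events", "clicks").await?;
/// ```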
pub async fn load_table(
    catalog: &dyn Catalog,
    namespace: &str,
    table_name: &str,
) -> Result<Table, ConnectorError> {
    let ns = iceberg::NamespaceIdent::from_strs(namespace.split('.').collect::<Vec<_>>())
        .map_err(|e| ConnectorError::ConfigurationError(format!("invalid namespace: {e}")))?;

    let ident = TableIdent::new(ns, table_name.to_string());

    catalog
        .load_table(&ident)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("load table '{table_name}': {e}")))
}

/// Scans a table and returns all record batches for the current snapshot.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` on scan failure.
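///
/// # Examples
///
/// A sketch (not compiled). An empty column list selects everything, and
/// passing a snapshot ID pins the scan to that snapshot; the column names
/// are placeholders:
///
/// ```ignore
/// // Full scan of the current snapshot.
/// let batches = scan_table(&table, None, &[]).await?;
///
/// // Projected scan pinned to an earlier snapshot.
/// let cols = vec!["user_id".to_string(), "ts".to_string()];
/// let batches = scan_table(&table, Some(snapshot_id), &cols).await?;
/// ```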
pub async fn scan_table(
    table: &Table,
    snapshot_id: Option<i64>,
    select_columns: &[String],
) -> Result<Vec<RecordBatch>, ConnectorError> {
    let mut scan_builder = table.scan();

    if let Some(sid) = snapshot_id {
        scan_builder = scan_builder.snapshot_id(sid);
    }

    if select_columns.is_empty() {
        scan_builder = scan_builder.select_all();
    } else {
        scan_builder = scan_builder.select(select_columns.iter().map(String::as_str));
    }

    let scan = scan_builder
        .build()
        .map_err(|e| ConnectorError::ReadError(format!("build scan: {e}")))?;

    let stream = scan
        .to_arrow()
        .await
        .map_err(|e| ConnectorError::ReadError(format!("scan to arrow: {e}")))?;

    let mut batches = Vec::new();
    let mut stream = std::pin::pin!(stream);
    while let Some(result) = stream.next().await {
        let batch = result.map_err(|e| ConnectorError::ReadError(format!("read batch: {e}")))?;
        batches.push(batch);
    }

    Ok(batches)
}

/// Returns the current snapshot ID of a table, if any.
#[must_use]
pub fn current_snapshot_id(table: &Table) -> Option<i64> {
    table.metadata().current_snapshot().map(|s| s.snapshot_id())
}

/// Commits data files to an Iceberg table via a fast-append transaction.
///
/// When `epoch_metadata` is provided, also stores `writer_id` and `epoch`
/// in table properties for exactly-once recovery. Both the data append
/// and property update are committed atomically in a single transaction.
///
/// Returns the updated table with the new snapshot.
///
/// # Errors
///
/// Returns `ConnectorError::TransactionError` on commit failure.
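///
/// # Examples
///
/// A sketch (not compiled) of the exactly-once commit loop this is meant
/// for: skip epochs at or below the last recorded one, otherwise append and
/// record the epoch in the same transaction. `writer_id`, `epoch`, and
/// `files` are placeholders:
///
/// ```ignore
/// if get_last_committed_epoch(&table, writer_id).map_or(true, |last| epoch > last) {
///     table = commit_data_files(&table, catalog, files, Some((writer_id, epoch))).await?;
/// }
/// ```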
pub async fn commit_data_files(
    table: &Table,
    catalog: &dyn Catalog,
    data_files: Vec<iceberg::spec::DataFile>,
    epoch_metadata: Option<(&str, u64)>,
) -> Result<Table, ConnectorError> {
    let tx = Transaction::new(table);
    let tx: Transaction = tx
        .fast_append()
        .add_data_files(data_files)
        .apply(tx)
        .map_err(|e| ConnectorError::TransactionError(format!("apply fast_append: {e}")))?;

    // Chain epoch metadata into the same atomic transaction.
    let tx = if let Some((writer_id, epoch)) = epoch_metadata {
        let key = format!("laminardb.writer.{writer_id}.last_epoch");
        tx.update_table_properties()
            .set(key, epoch.to_string())
            .apply(tx)
            .map_err(|e| {
                ConnectorError::TransactionError(format!("apply update_table_properties: {e}"))
            })?
    } else {
        tx
    };

    tx.commit(catalog)
        .await
        .map_err(|e| ConnectorError::TransactionError(format!("commit: {e}")))
}

/// Reads the last committed epoch for a writer from Iceberg table properties.
///
/// Returns `None` if the writer has never committed to this table.
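///
/// # Examples
///
/// A sketch (not compiled); this reads the
/// `laminardb.writer.<writer_id>.last_epoch` property written by
/// [`commit_data_files`]. The writer ID is a placeholder:
///
/// ```ignore
/// let resume_from = get_last_committed_epoch(&table, "sink-0").map_or(0, |e| e + 1);
/// ```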
#[must_use]
pub fn get_last_committed_epoch(table: &Table, writer_id: &str) -> Option<u64> {
    let key = format!("laminardb.writer.{writer_id}.last_epoch");
    table.metadata().properties().get(&key)?.parse().ok()
}

/// Creates an Iceberg table (and namespace) if it does not already exist.
///
/// # Errors
///
/// Returns `ConnectorError` on creation failure.
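///
/// # Examples
///
/// A sketch (not compiled); the schema would normally come from the
/// pipeline's output batches, and the namespace/table names are
/// placeholders:
///
/// ```ignore
/// let schema = batch.schema(); // arrow_schema::SchemaRef
/// ensure_table_exists(catalog.as_ref(), "prod.events", "clicks", &schema).await?;
/// let table = load_table(catalog.as_ref(), "prod.events", "clicks").await?;
/// ```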
pub async fn ensure_table_exists(
    catalog: &dyn Catalog,
    namespace: &str,
    table_name: &str,
    arrow_schema: &arrow_schema::SchemaRef,
) -> Result<(), ConnectorError> {
    let ns = iceberg::NamespaceIdent::from_strs(namespace.split('.').collect::<Vec<_>>())
        .map_err(|e| ConnectorError::ConfigurationError(format!("invalid namespace: {e}")))?;

    let ident = TableIdent::new(ns.clone(), table_name.to_string());

    if catalog
        .table_exists(&ident)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("table_exists: {e}")))?
    {
        return Ok(());
    }

    // Pipeline-derived Arrow schemas don't carry `PARQUET:field_id`
    // metadata; let iceberg-rust assign sequential IDs.
    let iceberg_schema = iceberg::arrow::arrow_schema_to_schema_auto_assign_ids(arrow_schema)
        .map_err(|e| {
            ConnectorError::SchemaMismatch(format!("arrow→iceberg schema conversion: {e}"))
        })?;

    if !catalog
        .namespace_exists(&ns)
        .await
        .map_err(|e| ConnectorError::ReadError(format!("namespace_exists: {e}")))?
    {
        catalog
            .create_namespace(&ns, HashMap::new())
            .await
            .map_err(|e| ConnectorError::WriteError(format!("create namespace: {e}")))?;
    }

    let creation = iceberg::TableCreation::builder()
        .name(table_name.to_string())
        .schema(iceberg_schema)
        .build();

    catalog
        .create_table(&ns, creation)
        .await
        .map_err(|e| ConnectorError::WriteError(format!("create table: {e}")))?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_storage_factory_infers_s3_from_warehouse_url() {
        let f = storage_factory("s3://bucket/warehouse", None).unwrap();
        assert!(format!("{f:?}").contains("S3"));
    }

    #[test]
    fn test_storage_factory_infers_s3a_from_warehouse_url() {
        let f = storage_factory("s3a://bucket/warehouse", None).unwrap();
        assert!(format!("{f:?}").contains("S3"));
    }

    #[test]
    fn test_storage_factory_infers_fs_from_file_url() {
        let f = storage_factory("file:///tmp/warehouse", None).unwrap();
        assert!(format!("{f:?}").contains("Fs"));
    }

    #[test]
    fn test_storage_factory_bare_path_requires_explicit_storage_type() {
        // `/` and `./` inference was deliberately left out: REST catalogs
        // use logical warehouse names, and we don't want a silent default
        // to the local fs.
        let err = storage_factory("/tmp/warehouse", None)
            .unwrap_err()
            .to_string();
        assert!(err.contains("LDB-5100"), "got: {err}");
    }

    #[test]
    fn test_storage_factory_explicit_overrides_inference() {
        // Lakekeeper-style: warehouse is a name, storage backend is S3.
        let f = storage_factory("demo", Some("s3")).unwrap();
        assert!(format!("{f:?}").contains("S3"));
    }

    #[test]
    fn test_storage_factory_unknown_warehouse_without_storage_type_errors() {
        let err = storage_factory("demo", None).unwrap_err().to_string();
        assert!(err.contains("LDB-5100"), "got: {err}");
    }

    #[test]
    fn test_storage_factory_rejects_unknown_storage_type() {
        let err = storage_factory("demo", Some("hdfs"))
            .unwrap_err()
            .to_string();
        assert!(err.contains("LDB-5101"), "got: {err}");
    }

    #[test]
    fn test_current_snapshot_id_empty_table() {
        let schema = iceberg::spec::Schema::builder()
            .with_fields(vec![])
            .build()
            .unwrap();

        let creation = iceberg::TableCreation::builder()
            .name("test_table".to_string())
            .schema(schema)
            .location("s3://test/location".to_string())
            .build();

        let metadata = iceberg::spec::TableMetadataBuilder::from_table_creation(creation)
            .unwrap()
            .build()
            .unwrap();

        let table = Table::builder()
            .metadata(metadata.metadata)
            .identifier(TableIdent::new(
                iceberg::NamespaceIdent::new("test".to_string()),
                "t".to_string(),
            ))
            .file_io(iceberg::io::FileIO::new_with_memory())
            .build()
            .unwrap();

        assert!(current_snapshot_id(&table).is_none());
    }
}