Skip to main content

laminar_connectors/lakehouse/
iceberg.rs

//! Apache Iceberg sink connector implementation.
//!
//! [`IcebergSink`] implements [`SinkConnector`], writing Arrow `RecordBatch`
//! data to Iceberg tables with exactly-once semantics tied to checkpoint
//! epochs via Iceberg's atomic transaction commits.
//!
//! `pre_commit()` moves the buffered batches into `staged_batches`.
//! `commit_epoch()` writes Parquet data files and commits them via Iceberg
//! `Transaction::fast_append()`. `rollback_epoch()` discards buffered and
//! staged data without touching the Iceberg table.

12use std::time::Duration;
13
14use arrow_array::RecordBatch;
15use arrow_schema::SchemaRef;
16use async_trait::async_trait;
17#[cfg(feature = "iceberg")]
18use tracing::info;
19use tracing::{debug, warn};
20
21use crate::config::{ConnectorConfig, ConnectorState};
22use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
23use crate::error::ConnectorError;
24
25use super::iceberg_config::IcebergSinkConfig;
26
27/// Apache Iceberg sink connector.
28///
29/// Buffers `RecordBatch` data during each checkpoint epoch and commits to
30/// an Iceberg table atomically when the epoch commits. Each epoch produces
31/// at most one Iceberg transaction.
32pub struct IcebergSink {
33    /// Sink configuration — reparsed from `ConnectorConfig` in `open()`.
34    config: IcebergSinkConfig,
35    /// Arrow schema for input batches.
36    schema: Option<SchemaRef>,
37    /// Connector lifecycle state.
38    state: ConnectorState,
39    /// Current epoch being written.
40    current_epoch: u64,
41    /// Last successfully committed epoch.
42    last_committed_epoch: u64,
43    /// `RecordBatch` buffer for the current epoch.
44    buffer: Vec<RecordBatch>,
45    /// Total rows buffered in current epoch.
46    buffered_rows: usize,
47    /// Staged batches ready for commit (populated by `pre_commit()`).
48    staged_batches: Vec<RecordBatch>,
49    /// Rows staged for commit.
50    staged_rows: usize,
51    /// Whether the current epoch was skipped (already committed).
52    epoch_skipped: bool,
53    /// Cached catalog connection (initialized in `open()`).
54    #[cfg(feature = "iceberg")]
55    catalog: Option<std::sync::Arc<dyn iceberg::Catalog>>,
56    /// Cached table handle (updated after each commit).
57    #[cfg(feature = "iceberg")]
58    table: Option<iceberg::table::Table>,
59    /// Arrow schema derived from the Iceberg table schema (carries
60    /// `PARQUET:field_id` metadata required by the Iceberg Parquet writer).
61    #[cfg(feature = "iceberg")]
62    iceberg_arrow_schema: Option<SchemaRef>,
63}
64
65impl IcebergSink {
66    /// Creates a new Iceberg sink with the given configuration.
67    #[must_use]
68    pub fn new(config: IcebergSinkConfig, _registry: Option<&prometheus::Registry>) -> Self {
69        Self {
70            config,
71            schema: None,
72            state: ConnectorState::Created,
73            current_epoch: 0,
74            last_committed_epoch: 0,
75            buffer: Vec::new(),
76            buffered_rows: 0,
77            staged_batches: Vec::new(),
78            staged_rows: 0,
79            epoch_skipped: false,
80            #[cfg(feature = "iceberg")]
81            catalog: None,
82            #[cfg(feature = "iceberg")]
83            table: None,
84            #[cfg(feature = "iceberg")]
85            iceberg_arrow_schema: None,
86        }
87    }
88
89    fn clear_buffer(&mut self) {
90        self.buffer.clear();
91        self.buffered_rows = 0;
92    }
93
94    fn clear_staged(&mut self) {
95        self.staged_batches.clear();
96        self.staged_rows = 0;
97    }
98
99    /// Reprojects a pipeline `RecordBatch` onto the Iceberg-derived Arrow
100    /// schema so that every field carries `PARQUET:field_id` metadata.
101    ///
102    /// Fast path: when the batch schema fields match the Iceberg schema
103    /// field-for-field (same names, types, count), just swap the schema
104    /// wrapper — columns are already in the right order.
105    ///
106    /// Slow path: match columns by name, cast where types differ (safe
107    /// widening validated in `open()`), fill nullable extras with nulls.
108    #[cfg(feature = "iceberg")]
109    fn align_batch_to_iceberg_schema(
110        &self,
111        batch: &RecordBatch,
112    ) -> Result<RecordBatch, ConnectorError> {
113        let target_schema =
114            self.iceberg_arrow_schema
115                .as_ref()
116                .ok_or_else(|| ConnectorError::InvalidState {
117                    expected: "open".into(),
118                    actual: "iceberg arrow schema not initialized".into(),
119                })?;
120
121        // Fast path: field names, types, and count match — only metadata differs.
122        // Avoids per-column name lookup and Vec construction.
123        let batch_schema = batch.schema();
124        if batch_schema.fields().len() == target_schema.fields().len()
125            && batch_schema
126                .fields()
127                .iter()
128                .zip(target_schema.fields().iter())
129                .all(|(a, b)| a.name() == b.name() && a.data_type() == b.data_type())
130        {
131            return RecordBatch::try_new(target_schema.clone(), batch.columns().to_vec()).map_err(
132                |e| ConnectorError::WriteError(format!("align batch to iceberg schema: {e}")),
133            );
134        }
135
136        // Slow path: column reordering, type casting, or null-filling needed.
137        let mut columns = Vec::with_capacity(target_schema.fields().len());
138
139        for field in target_schema.fields() {
140            if let Ok(col_idx) = batch_schema.index_of(field.name()) {
141                let col = batch.column(col_idx);
142                if col.data_type() == field.data_type() {
143                    columns.push(col.clone());
144                } else {
145                    columns.push(arrow_cast::cast(col, field.data_type()).map_err(|e| {
146                        ConnectorError::WriteError(format!(
147                            "cast field '{}' from {} to {}: {e}",
148                            field.name(),
149                            col.data_type(),
150                            field.data_type(),
151                        ))
152                    })?);
153                }
154            } else if field.is_nullable() {
155                // Nullable Iceberg column not in pipeline — fill with nulls.
156                columns.push(arrow_array::new_null_array(
157                    field.data_type(),
158                    batch.num_rows(),
159                ));
160            } else {
161                return Err(ConnectorError::SchemaMismatch(format!(
162                    "Iceberg column '{}' is NOT NULL but missing from pipeline",
163                    field.name(),
164                )));
165            }
166        }
167
168        // Detect batch columns that would be silently dropped — every field
169        // in the source batch must map to a field in the target schema.
170        for field in batch_schema.fields() {
171            if target_schema.field_with_name(field.name()).is_err() {
172                return Err(ConnectorError::SchemaMismatch(format!(
173                    "pipeline column '{}' has no matching field in Iceberg table schema \
174                     (schema evolved since open?)",
175                    field.name(),
176                )));
177            }
178        }
179
180        RecordBatch::try_new(target_schema.clone(), columns)
181            .map_err(|e| ConnectorError::WriteError(format!("align batch to iceberg schema: {e}")))
182    }
183
184    /// Parses compression config string to parquet Compression.
185    #[cfg(feature = "iceberg")]
186    fn parquet_compression(name: &str) -> parquet::basic::Compression {
187        match name.to_lowercase().as_str() {
188            "snappy" => parquet::basic::Compression::SNAPPY,
189            "none" | "uncompressed" => parquet::basic::Compression::UNCOMPRESSED,
190            "lz4" => parquet::basic::Compression::LZ4,
191            // Default to zstd(3) for anything else including "zstd".
192            _ => parquet::basic::Compression::ZSTD(
193                parquet::basic::ZstdLevel::try_new(3).unwrap_or_default(),
194            ),
195        }
196    }
197
198    /// Checks that every pipeline field still exists in the refreshed
199    /// Iceberg Arrow schema. Returns `SchemaMismatch` on drift.
200    #[cfg(feature = "iceberg")]
201    fn validate_schema_not_drifted(&self) -> Result<(), ConnectorError> {
202        if let (Some(pipeline_schema), Some(target_schema)) =
203            (&self.schema, &self.iceberg_arrow_schema)
204        {
205            for field in pipeline_schema.fields() {
206                if target_schema.field_with_name(field.name()).is_err() {
207                    return Err(ConnectorError::SchemaMismatch(format!(
208                        "pipeline field '{}' no longer exists in Iceberg table schema \
209                         (concurrent schema evolution?)",
210                        field.name(),
211                    )));
212                }
213            }
214        }
215        Ok(())
216    }
217
218    /// Writes staged batches to Iceberg as data files and commits.
219    #[cfg(feature = "iceberg")]
220    async fn commit_to_iceberg(&mut self) -> Result<(), ConnectorError> {
221        use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};
222
223        let catalog = self
224            .catalog
225            .as_ref()
226            .ok_or_else(|| ConnectorError::InvalidState {
227                expected: "open".into(),
228                actual: "catalog not initialized".into(),
229            })?;
230        let table = self
231            .table
232            .as_ref()
233            .ok_or_else(|| ConnectorError::InvalidState {
234                expected: "open".into(),
235                actual: "table not loaded".into(),
236            })?;
237
238        let file_io = table.file_io().clone();
239        let location = table.metadata().location().to_string();
240        let schema = table.current_schema_ref();
241
242        self.validate_schema_not_drifted()?;
243
244        let props = parquet::file::properties::WriterProperties::builder()
245            .set_compression(Self::parquet_compression(&self.config.compression))
246            .build();
247        let writer_builder = ParquetWriterBuilder::new(props, schema);
248
249        let mut all_data_files = Vec::new();
250
251        for (idx, batch) in self.staged_batches.iter().enumerate() {
252            if batch.num_rows() == 0 {
253                continue;
254            }
255
256            // Reproject the batch onto the Iceberg-derived Arrow schema so
257            // that every field carries PARQUET:field_id metadata. Without
258            // this the writer cannot correlate Arrow fields with Iceberg
259            // field IDs and fails with "Field id N not found in struct array".
260            let aligned = self.align_batch_to_iceberg_schema(batch)?;
261
262            let file_path = format!(
263                "{location}/data/{}-{}-{}-{idx}.parquet",
264                self.config.writer_id,
265                self.current_epoch,
266                uuid::Uuid::new_v4(),
267            );
268
269            let output_file = file_io
270                .new_output(&file_path)
271                .map_err(|e| ConnectorError::WriteError(format!("create output: {e}")))?;
272
273            let mut writer = writer_builder
274                .clone()
275                .build(output_file)
276                .await
277                .map_err(|e| ConnectorError::WriteError(format!("build parquet writer: {e}")))?;
278
279            writer
280                .write(&aligned)
281                .await
282                .map_err(|e| ConnectorError::WriteError(format!("parquet write: {e}")))?;
283
284            let data_file_builders = writer
285                .close()
286                .await
287                .map_err(|e| ConnectorError::WriteError(format!("close parquet writer: {e}")))?;
288
289            for dfb in data_file_builders {
290                let data_file = dfb
291                    .build()
292                    .map_err(|e| ConnectorError::WriteError(format!("data file build: {e}")))?;
293                all_data_files.push(data_file);
294            }
295        }
296
297        if all_data_files.is_empty() {
298            debug!(epoch = self.current_epoch, "no data files to commit");
299            return Ok(());
300        }
301
302        let updated_table = super::iceberg_io::commit_data_files(
303            table,
304            catalog.as_ref(),
305            all_data_files,
306            Some((&self.config.writer_id, self.current_epoch)),
307        )
308        .await?;
309
310        // The Iceberg commit is durable at this point. Update table handle
311        // unconditionally so the next commit sees the new snapshot.
312        self.table = Some(updated_table);
313
314        // Best-effort schema refresh — keep the cached Arrow schema in sync
315        // with the table for the next epoch. A failure here must NOT prevent
316        // epoch advancement: if commit_to_iceberg() returned Err after the
317        // durable commit, commit_epoch() would skip last_committed_epoch
318        // update, and the next retry would duplicate the append.
319        let table = self.table.as_ref().expect("just set above");
320        let new_iceberg_schema = table.current_schema_ref();
321        match iceberg::arrow::schema_to_arrow_schema(&new_iceberg_schema) {
322            Ok(arrow_schema) => {
323                self.iceberg_arrow_schema = Some(std::sync::Arc::new(arrow_schema));
324            }
325            Err(e) => {
326                // Invalidate the cache so the next align_batch_to_iceberg_schema()
327                // fails with InvalidState rather than silently writing with stale
328                // field IDs that no longer match table.current_schema_ref().
329                self.iceberg_arrow_schema = None;
330                warn!(
331                    epoch = self.current_epoch,
332                    error = %e,
333                    "failed to refresh Iceberg Arrow schema; cache invalidated"
334                );
335            }
336        }
337
338        info!(
339            epoch = self.current_epoch,
340            rows = self.staged_rows,
341            "iceberg commit succeeded"
342        );
343
344        Ok(())
345    }
346}
347
348#[async_trait]
349impl SinkConnector for IcebergSink {
350    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
351        // Re-parse config from the runtime ConnectorConfig (not factory defaults).
352        if !config.properties().is_empty() {
353            self.config = IcebergSinkConfig::from_config(config)?;
354        }
355
356        #[cfg(feature = "iceberg")]
357        {
358            let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
359            let ns = &self.config.catalog.namespace;
360            let tbl = &self.config.catalog.table_name;
361
362            if self.config.auto_create {
363                if let Some(schema) = config.arrow_schema() {
364                    super::iceberg_io::ensure_table_exists(catalog.as_ref(), ns, tbl, &schema)
365                        .await?;
366                }
367            }
368
369            let table = super::iceberg_io::load_table(catalog.as_ref(), ns, tbl).await?;
370
371            // Always derive the canonical schema from the Iceberg table.
372            let iceberg_schema = table.current_schema_ref();
373            let table_schema = std::sync::Arc::new(
374                iceberg::arrow::schema_to_arrow_schema(&iceberg_schema).map_err(|e| {
375                    ConnectorError::SchemaMismatch(format!("iceberg→arrow schema: {e}"))
376                })?,
377            );
378
379            // Store the Iceberg-derived Arrow schema (with PARQUET:field_id
380            // metadata) for use during Parquet writes.
381            self.iceberg_arrow_schema = Some(table_schema.clone());
382
383            if self.schema.is_none() {
384                self.schema = Some(table_schema.clone());
385            }
386
387            // Recover last committed epoch from table properties.
388            if let Some(epoch) =
389                super::iceberg_io::get_last_committed_epoch(&table, &self.config.writer_id)
390            {
391                self.last_committed_epoch = epoch;
392                info!(writer_id = %self.config.writer_id, epoch, "recovered last committed epoch");
393            }
394
395            // Validate pipeline schema against table schema, then use the
396            // pipeline schema as self.schema (it's what write_batch receives).
397            if let Some(pipeline_schema) = config.arrow_schema() {
398                super::iceberg_config::validate_sink_schema(&pipeline_schema, &table_schema)?;
399                self.schema = Some(pipeline_schema);
400            }
401
402            self.catalog = Some(catalog);
403            self.table = Some(table);
404            self.state = ConnectorState::Running;
405
406            info!(table = tbl, namespace = ns, "iceberg sink connected");
407            return Ok(());
408        }
409
410        #[cfg(not(feature = "iceberg"))]
411        {
412            self.state = ConnectorState::Failed;
413            Err(ConnectorError::ConfigurationError(
414                "Apache Iceberg requires the 'iceberg' feature".into(),
415            ))
416        }
417    }
418
419    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
420        if batch.num_rows() == 0 || self.epoch_skipped {
421            return Ok(WriteResult::new(0, 0));
422        }
423
424        if self.schema.is_none() {
425            self.schema = Some(batch.schema());
426        }
427
428        let rows = batch.num_rows();
429        self.buffer.push(batch.clone());
430        self.buffered_rows += rows;
431
432        Ok(WriteResult::new(rows, 0))
433    }
434
435    fn schema(&self) -> SchemaRef {
436        self.schema
437            .clone()
438            .unwrap_or_else(|| std::sync::Arc::new(arrow_schema::Schema::empty()))
439    }
440
441    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
442        self.current_epoch = epoch;
443        self.epoch_skipped = false;
444        self.clear_buffer();
445        self.clear_staged();
446
447        if epoch > 0 && epoch <= self.last_committed_epoch {
448            debug!(
449                epoch,
450                last = self.last_committed_epoch,
451                "epoch already committed, skipping"
452            );
453            self.epoch_skipped = true;
454        }
455
456        Ok(())
457    }
458
459    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
460        if self.epoch_skipped {
461            return Ok(());
462        }
463
464        std::mem::swap(&mut self.staged_batches, &mut self.buffer);
465        self.staged_rows = self.buffered_rows;
466        self.clear_buffer();
467
468        Ok(())
469    }
470
471    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
472        if self.epoch_skipped || self.staged_rows == 0 {
473            self.clear_staged();
474            return Ok(());
475        }
476
477        #[cfg(feature = "iceberg")]
478        {
479            self.commit_to_iceberg().await?;
480        }
481
482        self.last_committed_epoch = epoch;
483        self.clear_staged();
484        Ok(())
485    }
486
487    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
488        warn!(epoch, "iceberg rollback: discarding staged data");
489        self.clear_buffer();
490        self.clear_staged();
491        self.epoch_skipped = false;
492        Ok(())
493    }
494
495    fn capabilities(&self) -> SinkConnectorCapabilities {
496        // Iceberg catalog writes can be slow under contention.
497        SinkConnectorCapabilities::new(Duration::from_secs(300))
498            .with_exactly_once()
499            .with_two_phase_commit()
500    }
501
502    async fn close(&mut self) -> Result<(), ConnectorError> {
503        #[cfg(feature = "iceberg")]
504        {
505            self.catalog = None;
506            self.table = None;
507            self.iceberg_arrow_schema = None;
508        }
509        self.state = ConnectorState::Closed;
510        Ok(())
511    }
512}
513
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema with a single non-nullable `id: Int64` column.
    fn test_schema() -> SchemaRef {
        let id_field = Field::new("id", DataType::Int64, false);
        Arc::new(Schema::new(vec![id_field]))
    }

    /// Batch of `n` rows whose ids run from `0` to `n - 1`.
    fn test_batch(n: usize) -> RecordBatch {
        let ids = Int64Array::from_iter_values(0..n as i64);
        RecordBatch::try_new(test_schema(), vec![Arc::new(ids)]).unwrap()
    }

    /// Minimal valid sink configuration pointing at a local REST catalog.
    fn test_config() -> IcebergSinkConfig {
        let mut raw = ConnectorConfig::new("iceberg");
        let settings = [
            ("catalog.uri", "http://localhost:8181"),
            ("warehouse", "s3://test/wh"),
            ("namespace", "test"),
            ("table.name", "events"),
        ];
        for (key, value) in settings {
            raw.set(key, value);
        }
        IcebergSinkConfig::from_config(&raw).unwrap()
    }

    /// Fresh sink built from `test_config()` with no metrics registry.
    fn fresh_sink() -> IcebergSink {
        IcebergSink::new(test_config(), None)
    }

    #[test]
    fn test_new_sink() {
        let sink = fresh_sink();
        assert!(sink.schema.is_none());
        assert_eq!(sink.current_epoch, 0);
        assert_eq!(sink.buffered_rows, 0);
    }

    #[tokio::test]
    async fn test_write_buffers_batches() {
        let mut sink = fresh_sink();
        sink.begin_epoch(1).await.unwrap();

        let first = sink.write_batch(&test_batch(100)).await.unwrap();
        assert_eq!(first.records_written, 100);
        assert_eq!(sink.buffered_rows, 100);
        assert_eq!(sink.buffer.len(), 1);

        let second = sink.write_batch(&test_batch(50)).await.unwrap();
        assert_eq!(second.records_written, 50);
        assert_eq!(sink.buffered_rows, 150);
        assert_eq!(sink.buffer.len(), 2);
    }

    #[tokio::test]
    async fn test_pre_commit_stages_buffer() {
        let mut sink = fresh_sink();
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(100)).await.unwrap();

        sink.pre_commit(1).await.unwrap();

        assert_eq!(sink.staged_rows, 100);
        assert_eq!(sink.staged_batches.len(), 1);
        assert!(sink.buffer.is_empty());
        assert_eq!(sink.buffered_rows, 0);
    }

    #[tokio::test]
    async fn test_rollback_clears_staged() {
        let mut sink = fresh_sink();
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(100)).await.unwrap();
        sink.pre_commit(1).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();

        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.staged_rows, 0);
        assert!(sink.buffer.is_empty());
    }

    #[tokio::test]
    async fn test_epoch_skip_when_already_committed() {
        let mut sink = fresh_sink();
        sink.last_committed_epoch = 5;

        sink.begin_epoch(3).await.unwrap();
        assert!(sink.epoch_skipped);

        let ignored = sink.write_batch(&test_batch(100)).await.unwrap();
        assert_eq!(ignored.records_written, 0);
    }

    #[tokio::test]
    async fn test_empty_epoch_commit() {
        let mut sink = fresh_sink();
        sink.begin_epoch(1).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
    }

    #[test]
    fn test_capabilities() {
        let caps = fresh_sink().capabilities();
        assert!(caps.exactly_once);
        assert!(caps.two_phase_commit);
        assert!(!caps.partitioned);
        assert!(!caps.upsert);
    }
}
618}