// laminar_connectors/lakehouse/iceberg.rs
//! Apache Iceberg sink connector implementation.
//!
//! [`IcebergSink`] implements [`SinkConnector`], writing Arrow `RecordBatch`
//! data to Iceberg tables with exactly-once semantics tied to checkpoint
//! epochs via Iceberg's atomic transaction commits.
//!
//! `pre_commit()` moves the buffer into `staged_batches`.
//! `commit_epoch()` writes Parquet data files and commits via Iceberg
//! `Transaction::fast_append()`. `rollback_epoch()` discards staged data
//! without side effects.
use std::time::Duration;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
#[cfg(feature = "iceberg")]
use tracing::info;
use tracing::{debug, warn};

use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

use super::iceberg_config::IcebergSinkConfig;
28
/// Apache Iceberg sink connector.
///
/// Buffers `RecordBatch` data during each checkpoint epoch and commits to
/// an Iceberg table atomically when the epoch commits. Each epoch produces
/// at most one Iceberg transaction.
pub struct IcebergSink {
    /// Sink configuration — reparsed from `ConnectorConfig` in `open()`.
    config: IcebergSinkConfig,
    /// Arrow schema for input batches: the pipeline schema when the runtime
    /// config carries one, otherwise derived from the table in `open()` or
    /// captured from the first batch in `write_batch()`.
    schema: Option<SchemaRef>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Current epoch being written (set by `begin_epoch()`).
    current_epoch: u64,
    /// Last successfully committed epoch. Recovered from table properties in
    /// `open()`; epochs at or below this value are replays and are skipped.
    last_committed_epoch: u64,
    /// `RecordBatch` buffer for the current epoch.
    buffer: Vec<RecordBatch>,
    /// Total rows buffered in current epoch.
    buffered_rows: usize,
    /// Staged batches ready for commit (populated by `pre_commit()`).
    staged_batches: Vec<RecordBatch>,
    /// Rows staged for commit.
    staged_rows: usize,
    /// Whether the current epoch was skipped (already committed); makes
    /// `write_batch()`/`pre_commit()`/`commit_epoch()` no-ops for the epoch.
    epoch_skipped: bool,
    /// Cached catalog connection (initialized in `open()`).
    #[cfg(feature = "iceberg")]
    catalog: Option<std::sync::Arc<dyn iceberg::Catalog>>,
    /// Cached table handle (updated after each commit).
    #[cfg(feature = "iceberg")]
    table: Option<iceberg::table::Table>,
    /// Arrow schema derived from the Iceberg table schema (carries
    /// `PARQUET:field_id` metadata required by the Iceberg Parquet writer).
    #[cfg(feature = "iceberg")]
    iceberg_arrow_schema: Option<SchemaRef>,
}
66
impl IcebergSink {
    /// Creates a new Iceberg sink with the given configuration.
    ///
    /// `_registry` is accepted to keep the constructor signature uniform
    /// with other connectors but is currently unused (see `metrics()`).
    #[must_use]
    pub fn new(config: IcebergSinkConfig, _registry: Option<&prometheus::Registry>) -> Self {
        Self {
            config,
            schema: None,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            buffer: Vec::new(),
            buffered_rows: 0,
            staged_batches: Vec::new(),
            staged_rows: 0,
            epoch_skipped: false,
            #[cfg(feature = "iceberg")]
            catalog: None,
            #[cfg(feature = "iceberg")]
            table: None,
            #[cfg(feature = "iceberg")]
            iceberg_arrow_schema: None,
        }
    }

    /// Drops all buffered batches and resets the buffered-row count.
    fn clear_buffer(&mut self) {
        self.buffer.clear();
        self.buffered_rows = 0;
    }

    /// Drops all staged batches and resets the staged-row count.
    fn clear_staged(&mut self) {
        self.staged_batches.clear();
        self.staged_rows = 0;
    }

    /// Reprojects a pipeline `RecordBatch` onto the Iceberg-derived Arrow
    /// schema so that every field carries `PARQUET:field_id` metadata.
    ///
    /// Fast path: when the batch schema fields match the Iceberg schema
    /// field-for-field (same names, types, count), just swap the schema
    /// wrapper — columns are already in the right order.
    ///
    /// Slow path: match columns by name, cast where types differ (safe
    /// widening validated in `open()`), fill nullable extras with nulls.
    ///
    /// # Errors
    ///
    /// - `InvalidState` if called before `open()` cached the Iceberg schema.
    /// - `SchemaMismatch` if a non-nullable table column is missing from the
    ///   batch, or a batch column has no counterpart in the table schema.
    /// - `WriteError` on cast or batch-construction failure.
    #[cfg(feature = "iceberg")]
    fn align_batch_to_iceberg_schema(
        &self,
        batch: &RecordBatch,
    ) -> Result<RecordBatch, ConnectorError> {
        let target_schema =
            self.iceberg_arrow_schema
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "open".into(),
                    actual: "iceberg arrow schema not initialized".into(),
                })?;

        // Fast path: field names, types, and count match — only metadata differs.
        // Avoids per-column name lookup and Vec construction.
        // Nullability is deliberately not compared here; if a target field is
        // non-nullable and the column contains nulls, RecordBatch::try_new
        // surfaces that as a WriteError.
        let batch_schema = batch.schema();
        if batch_schema.fields().len() == target_schema.fields().len()
            && batch_schema
                .fields()
                .iter()
                .zip(target_schema.fields().iter())
                .all(|(a, b)| a.name() == b.name() && a.data_type() == b.data_type())
        {
            return RecordBatch::try_new(target_schema.clone(), batch.columns().to_vec()).map_err(
                |e| ConnectorError::WriteError(format!("align batch to iceberg schema: {e}")),
            );
        }

        // Slow path: column reordering, type casting, or null-filling needed.
        let mut columns = Vec::with_capacity(target_schema.fields().len());

        for field in target_schema.fields() {
            if let Ok(col_idx) = batch_schema.index_of(field.name()) {
                let col = batch.column(col_idx);
                if col.data_type() == field.data_type() {
                    columns.push(col.clone());
                } else {
                    columns.push(arrow_cast::cast(col, field.data_type()).map_err(|e| {
                        ConnectorError::WriteError(format!(
                            "cast field '{}' from {} to {}: {e}",
                            field.name(),
                            col.data_type(),
                            field.data_type(),
                        ))
                    })?);
                }
            } else if field.is_nullable() {
                // Nullable Iceberg column not in pipeline — fill with nulls.
                columns.push(arrow_array::new_null_array(
                    field.data_type(),
                    batch.num_rows(),
                ));
            } else {
                return Err(ConnectorError::SchemaMismatch(format!(
                    "Iceberg column '{}' is NOT NULL but missing from pipeline",
                    field.name(),
                )));
            }
        }

        // Detect batch columns that would be silently dropped — every field
        // in the source batch must map to a field in the target schema.
        for field in batch_schema.fields() {
            if target_schema.field_with_name(field.name()).is_err() {
                return Err(ConnectorError::SchemaMismatch(format!(
                    "pipeline column '{}' has no matching field in Iceberg table schema \
                     (schema evolved since open?)",
                    field.name(),
                )));
            }
        }

        RecordBatch::try_new(target_schema.clone(), columns)
            .map_err(|e| ConnectorError::WriteError(format!("align batch to iceberg schema: {e}")))
    }

    /// Parses compression config string to parquet Compression.
    ///
    /// Matching is case-insensitive. Unrecognized values (including "zstd"
    /// itself) fall back to ZSTD level 3 rather than erroring.
    #[cfg(feature = "iceberg")]
    fn parquet_compression(name: &str) -> parquet::basic::Compression {
        match name.to_lowercase().as_str() {
            "snappy" => parquet::basic::Compression::SNAPPY,
            "none" | "uncompressed" => parquet::basic::Compression::UNCOMPRESSED,
            "lz4" => parquet::basic::Compression::LZ4,
            // Default to zstd(3) for anything else including "zstd".
            _ => parquet::basic::Compression::ZSTD(
                parquet::basic::ZstdLevel::try_new(3).unwrap_or_default(),
            ),
        }
    }

    /// Checks that every pipeline field still exists in the refreshed
    /// Iceberg Arrow schema. Returns `SchemaMismatch` on drift.
    ///
    /// A no-op when either schema has not been initialized yet.
    #[cfg(feature = "iceberg")]
    fn validate_schema_not_drifted(&self) -> Result<(), ConnectorError> {
        if let (Some(pipeline_schema), Some(target_schema)) =
            (&self.schema, &self.iceberg_arrow_schema)
        {
            for field in pipeline_schema.fields() {
                if target_schema.field_with_name(field.name()).is_err() {
                    return Err(ConnectorError::SchemaMismatch(format!(
                        "pipeline field '{}' no longer exists in Iceberg table schema \
                         (concurrent schema evolution?)",
                        field.name(),
                    )));
                }
            }
        }
        Ok(())
    }

    /// Writes staged batches to Iceberg as data files and commits.
    ///
    /// Requires `catalog`, `table`, and the cached Iceberg Arrow schema set
    /// by `open()`. Empty staged batches are skipped; if no data files are
    /// produced at all, the method returns without touching the catalog.
    /// After the commit succeeds this method only performs best-effort
    /// bookkeeping and must not fail (see comments below).
    #[cfg(feature = "iceberg")]
    async fn commit_to_iceberg(&mut self) -> Result<(), ConnectorError> {
        use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};

        let catalog = self
            .catalog
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "catalog not initialized".into(),
            })?;
        let table = self
            .table
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "table not loaded".into(),
            })?;

        let file_io = table.file_io().clone();
        let location = table.metadata().location().to_string();
        let schema = table.current_schema_ref();

        // Refuse to write if the table schema lost fields we still produce.
        self.validate_schema_not_drifted()?;

        let props = parquet::file::properties::WriterProperties::builder()
            .set_compression(Self::parquet_compression(&self.config.compression))
            .build();
        let writer_builder = ParquetWriterBuilder::new(props, schema);

        let mut all_data_files = Vec::new();

        for (idx, batch) in self.staged_batches.iter().enumerate() {
            if batch.num_rows() == 0 {
                continue;
            }

            // Reproject the batch onto the Iceberg-derived Arrow schema so
            // that every field carries PARQUET:field_id metadata. Without
            // this the writer cannot correlate Arrow fields with Iceberg
            // field IDs and fails with "Field id N not found in struct array".
            let aligned = self.align_batch_to_iceberg_schema(batch)?;

            // NOTE(review): file names are deterministic per (writer, epoch,
            // batch index), so a retried epoch reuses the same paths —
            // confirm FileIO overwrite semantics for the backing store.
            let file_path = format!(
                "{location}/data/{}-{}-{idx}.parquet",
                self.config.writer_id, self.current_epoch,
            );

            let output_file = file_io
                .new_output(&file_path)
                .map_err(|e| ConnectorError::WriteError(format!("create output: {e}")))?;

            let mut writer = writer_builder
                .clone()
                .build(output_file)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("build parquet writer: {e}")))?;

            writer
                .write(&aligned)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("parquet write: {e}")))?;

            let data_file_builders = writer
                .close()
                .await
                .map_err(|e| ConnectorError::WriteError(format!("close parquet writer: {e}")))?;

            for dfb in data_file_builders {
                let data_file = dfb
                    .build()
                    .map_err(|e| ConnectorError::WriteError(format!("data file build: {e}")))?;
                all_data_files.push(data_file);
            }
        }

        if all_data_files.is_empty() {
            debug!(epoch = self.current_epoch, "no data files to commit");
            return Ok(());
        }

        // Single append carrying the (writer_id, epoch) marker that
        // get_last_committed_epoch() reads back during open() for recovery.
        let updated_table = super::iceberg_io::commit_data_files(
            table,
            catalog.as_ref(),
            all_data_files,
            Some((&self.config.writer_id, self.current_epoch)),
        )
        .await?;

        // The Iceberg commit is durable at this point. Update table handle
        // unconditionally so the next commit sees the new snapshot.
        self.table = Some(updated_table);

        // Best-effort schema refresh — keep the cached Arrow schema in sync
        // with the table for the next epoch. A failure here must NOT prevent
        // epoch advancement: if commit_to_iceberg() returned Err after the
        // durable commit, commit_epoch() would skip last_committed_epoch
        // update, and the next retry would duplicate the append.
        let table = self.table.as_ref().expect("just set above");
        let new_iceberg_schema = table.current_schema_ref();
        match iceberg::arrow::schema_to_arrow_schema(&new_iceberg_schema) {
            Ok(arrow_schema) => {
                self.iceberg_arrow_schema = Some(std::sync::Arc::new(arrow_schema));
            }
            Err(e) => {
                // Invalidate the cache so the next align_batch_to_iceberg_schema()
                // fails with InvalidState rather than silently writing with stale
                // field IDs that no longer match table.current_schema_ref().
                self.iceberg_arrow_schema = None;
                warn!(
                    epoch = self.current_epoch,
                    error = %e,
                    "failed to refresh Iceberg Arrow schema; cache invalidated"
                );
            }
        }

        info!(
            epoch = self.current_epoch,
            rows = self.staged_rows,
            "iceberg commit succeeded"
        );

        Ok(())
    }
}
347
#[async_trait]
impl SinkConnector for IcebergSink {
    /// Opens the sink: builds the catalog client, optionally auto-creates
    /// the table, caches the Iceberg-derived Arrow schema, validates the
    /// pipeline schema against it, and recovers this writer's last
    /// committed epoch from table properties.
    ///
    /// Without the `iceberg` feature this always fails with
    /// `ConfigurationError` and marks the sink `Failed`.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        // Re-parse config from the runtime ConnectorConfig (not factory defaults).
        if !config.properties().is_empty() {
            self.config = IcebergSinkConfig::from_config(config)?;
        }

        #[cfg(feature = "iceberg")]
        {
            let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
            let ns = &self.config.catalog.namespace;
            let tbl = &self.config.catalog.table_name;

            // Auto-create needs a pipeline schema to derive the table from;
            // when the config carries no schema, creation is skipped and
            // load_table below reports a missing table.
            if self.config.auto_create {
                if let Some(schema) = config.arrow_schema() {
                    super::iceberg_io::ensure_table_exists(catalog.as_ref(), ns, tbl, &schema)
                        .await?;
                }
            }

            let table = super::iceberg_io::load_table(catalog.as_ref(), ns, tbl).await?;

            // Always derive the canonical schema from the Iceberg table.
            let iceberg_schema = table.current_schema_ref();
            let table_schema = std::sync::Arc::new(
                iceberg::arrow::schema_to_arrow_schema(&iceberg_schema).map_err(|e| {
                    ConnectorError::SchemaMismatch(format!("iceberg→arrow schema: {e}"))
                })?,
            );

            // Store the Iceberg-derived Arrow schema (with PARQUET:field_id
            // metadata) for use during Parquet writes.
            self.iceberg_arrow_schema = Some(table_schema.clone());

            // Fall back to the table schema until a pipeline schema is known.
            if self.schema.is_none() {
                self.schema = Some(table_schema.clone());
            }

            // Recover last committed epoch from table properties.
            if let Some(epoch) =
                super::iceberg_io::get_last_committed_epoch(&table, &self.config.writer_id)
            {
                self.last_committed_epoch = epoch;
                info!(writer_id = %self.config.writer_id, epoch, "recovered last committed epoch");
            }

            // Validate pipeline schema against table schema, then use the
            // pipeline schema as self.schema (it's what write_batch receives).
            if let Some(pipeline_schema) = config.arrow_schema() {
                super::iceberg_config::validate_sink_schema(&pipeline_schema, &table_schema)?;
                self.schema = Some(pipeline_schema);
            }

            self.catalog = Some(catalog);
            self.table = Some(table);
            self.state = ConnectorState::Running;

            info!(table = tbl, namespace = ns, "iceberg sink connected");
            return Ok(());
        }

        #[cfg(not(feature = "iceberg"))]
        {
            self.state = ConnectorState::Failed;
            Err(ConnectorError::ConfigurationError(
                "Apache Iceberg requires the 'iceberg' feature".into(),
            ))
        }
    }

    /// Buffers `batch` for the current epoch.
    ///
    /// Empty batches and writes during a skipped (already-committed) epoch
    /// are no-ops reporting zero records written. Nothing reaches Iceberg
    /// here — data is flushed only in `commit_epoch()`.
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        if batch.num_rows() == 0 || self.epoch_skipped {
            return Ok(WriteResult::new(0, 0));
        }

        // Late schema capture for sinks opened without a pipeline schema.
        if self.schema.is_none() {
            self.schema = Some(batch.schema());
        }

        let rows = batch.num_rows();
        self.buffer.push(batch.clone());
        self.buffered_rows += rows;

        Ok(WriteResult::new(rows, 0))
    }

    /// Returns the pipeline schema, or an empty schema before it is known.
    fn schema(&self) -> SchemaRef {
        self.schema
            .clone()
            .unwrap_or_else(|| std::sync::Arc::new(arrow_schema::Schema::empty()))
    }

    /// Starts a new epoch, discarding any leftover buffered/staged data.
    ///
    /// Epochs at or below `last_committed_epoch` are flagged as skipped so
    /// replayed writes after recovery become no-ops (exactly-once).
    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;
        self.epoch_skipped = false;
        self.clear_buffer();
        self.clear_staged();

        // Epoch 0 is never skipped: last_committed_epoch == 0 also means
        // "nothing committed yet".
        if epoch > 0 && epoch <= self.last_committed_epoch {
            debug!(
                epoch,
                last = self.last_committed_epoch,
                "epoch already committed, skipping"
            );
            self.epoch_skipped = true;
        }

        Ok(())
    }

    /// Phase one of the two-phase commit: moves the write buffer into the
    /// staging area. No external side effects.
    async fn pre_commit(&mut self, _epoch: u64) -> Result<(), ConnectorError> {
        if self.epoch_skipped {
            return Ok(());
        }

        // Swap instead of drain: O(1) and reuses both Vec allocations.
        std::mem::swap(&mut self.staged_batches, &mut self.buffer);
        self.staged_rows = self.buffered_rows;
        self.clear_buffer();

        Ok(())
    }

    /// Phase two: durably commits staged data to Iceberg, then records
    /// `epoch` as committed. Skipped or empty epochs succeed as no-ops
    /// without advancing `last_committed_epoch`.
    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.epoch_skipped || self.staged_rows == 0 {
            self.clear_staged();
            return Ok(());
        }

        #[cfg(feature = "iceberg")]
        {
            self.commit_to_iceberg().await?;
        }

        // Only reached after a successful (durable) Iceberg commit.
        self.last_committed_epoch = epoch;
        self.clear_staged();
        Ok(())
    }

    /// Aborts the current epoch: drops buffered and staged data locally.
    /// Nothing was written externally before `commit_epoch()`, so there is
    /// nothing to undo in the table.
    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        warn!(epoch, "iceberg rollback: discarding staged data");
        self.clear_buffer();
        self.clear_staged();
        self.epoch_skipped = false;
        Ok(())
    }

    /// Maps the connector lifecycle state to a health status.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Failed => HealthStatus::Unhealthy("sink failed".into()),
            _ => HealthStatus::Unknown,
        }
    }

    /// Metrics are not wired up yet; returns defaults.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Exactly-once, two-phase-commit sink with a generous commit timeout.
    fn capabilities(&self) -> SinkConnectorCapabilities {
        // Iceberg catalog writes can be slow under contention.
        SinkConnectorCapabilities::new(Duration::from_secs(300))
            .with_exactly_once()
            .with_two_phase_commit()
    }

    /// Releases cached catalog/table handles and marks the sink closed.
    /// Buffered, uncommitted data is not flushed here.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        #[cfg(feature = "iceberg")]
        {
            self.catalog = None;
            self.table = None;
            self.iceberg_arrow_schema = None;
        }
        self.state = ConnectorState::Closed;
        Ok(())
    }
}
525
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Schema with a single non-nullable `id: Int64` column.
    fn test_schema() -> SchemaRef {
        let id = Field::new("id", DataType::Int64, false);
        Arc::new(Schema::new(vec![id]))
    }

    /// Batch holding the sequential ids `0..n`.
    fn test_batch(n: usize) -> RecordBatch {
        let ids = Int64Array::from_iter_values(0..n as i64);
        RecordBatch::try_new(test_schema(), vec![Arc::new(ids)]).unwrap()
    }

    /// Minimal valid sink configuration pointing at a local REST catalog.
    fn test_config() -> IcebergSinkConfig {
        let mut cfg = ConnectorConfig::new("iceberg");
        cfg.set("catalog.uri", "http://localhost:8181");
        cfg.set("warehouse", "s3://test/wh");
        cfg.set("namespace", "test");
        cfg.set("table.name", "events");
        IcebergSinkConfig::from_config(&cfg).unwrap()
    }

    #[test]
    fn test_new_sink() {
        let sink = IcebergSink::new(test_config(), None);
        assert_eq!(sink.current_epoch, 0);
        assert_eq!(sink.buffered_rows, 0);
        assert!(sink.schema.is_none());
    }

    #[tokio::test]
    async fn test_write_buffers_batches() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.begin_epoch(1).await.unwrap();

        let first = sink.write_batch(&test_batch(100)).await.unwrap();
        assert_eq!(first.records_written, 100);
        assert_eq!((sink.buffered_rows, sink.buffer.len()), (100, 1));

        let second = sink.write_batch(&test_batch(50)).await.unwrap();
        assert_eq!(second.records_written, 50);
        assert_eq!((sink.buffered_rows, sink.buffer.len()), (150, 2));
    }

    #[tokio::test]
    async fn test_pre_commit_stages_buffer() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(100)).await.unwrap();

        sink.pre_commit(1).await.unwrap();

        // Buffer contents must move wholesale into the staging area.
        assert_eq!(sink.staged_rows, 100);
        assert_eq!(sink.staged_batches.len(), 1);
        assert!(sink.buffer.is_empty());
        assert_eq!(sink.buffered_rows, 0);
    }

    #[tokio::test]
    async fn test_rollback_clears_staged() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.begin_epoch(1).await.unwrap();
        sink.write_batch(&test_batch(100)).await.unwrap();
        sink.pre_commit(1).await.unwrap();

        sink.rollback_epoch(1).await.unwrap();

        // Both the staging area and the live buffer end up empty.
        assert!(sink.staged_batches.is_empty());
        assert_eq!(sink.staged_rows, 0);
        assert!(sink.buffer.is_empty());
    }

    #[tokio::test]
    async fn test_epoch_skip_when_already_committed() {
        let mut sink = IcebergSink::new(test_config(), None);
        sink.last_committed_epoch = 5;

        // Epoch 3 <= last committed epoch 5, so writes become no-ops.
        sink.begin_epoch(3).await.unwrap();
        assert!(sink.epoch_skipped);

        let result = sink.write_batch(&test_batch(100)).await.unwrap();
        assert_eq!(result.records_written, 0);
    }

    #[tokio::test]
    async fn test_empty_epoch_commit() {
        // An epoch with no writes must commit cleanly as a no-op.
        let mut sink = IcebergSink::new(test_config(), None);
        sink.begin_epoch(1).await.unwrap();
        sink.pre_commit(1).await.unwrap();
        sink.commit_epoch(1).await.unwrap();
    }

    #[test]
    fn test_capabilities() {
        let caps = IcebergSink::new(test_config(), None).capabilities();
        assert!(caps.exactly_once && caps.two_phase_commit);
        assert!(!caps.partitioned && !caps.upsert);
    }
}