// laminar_connectors/lakehouse/iceberg_source.rs
//! Apache Iceberg source connector implementation.
//!
//! [`IcebergSource`] implements [`SourceConnector`] for polling Iceberg
//! snapshots. It is a **reference/lookup table** source — Iceberg tables
//! are snapshot-based with no push mechanism.
//!
//! On each poll cycle the source checks for a newer snapshot. If one exists,
//! only the newly added data files are read via manifest-level diff
//! (see `iceberg_incremental::scan_incremental`). The first read is a full scan.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use tracing::debug;
#[cfg(feature = "iceberg")]
use tracing::info;

use crate::checkpoint::SourceCheckpoint;
use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SourceBatch, SourceConnector};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;

use super::iceberg_config::IcebergSourceConfig;

/// Apache Iceberg source connector.
///
/// Polls for new snapshots on a configurable interval and emits
/// `RecordBatch` data. Supports pinning to a specific snapshot.
///
/// Acts as a reference/lookup-table source: Iceberg is snapshot-based with
/// no push mechanism, so data arrives only via polling (`refresh`).
#[allow(dead_code)] // Fields used by feature-gated I/O methods.
pub struct IcebergSource {
    /// Source configuration — reparsed from `ConnectorConfig` in `open()`.
    config: IcebergSourceConfig,
    /// Discovered Arrow schema. Set in `open()` from table metadata, or
    /// lazily from the first scanned batch in `refresh()`.
    schema: Option<SchemaRef>,
    /// Connector lifecycle state, reported via `health_check()`.
    state: ConnectorState,
    /// Buffered batches from the most recent scan, drained by `poll_batch()`.
    buffer: VecDeque<RecordBatch>,
    /// Last fully-ingested snapshot ID; `None` until the first scan
    /// (or until `restore()` seeds it from a checkpoint).
    last_snapshot_id: Option<i64>,
    /// Time of the last snapshot poll; used to enforce `poll_interval`.
    last_poll_time: Option<Instant>,
    /// Current epoch recorded in checkpoints.
    epoch: u64,
    /// Cached catalog connection, created in `open()`.
    #[cfg(feature = "iceberg")]
    catalog: Option<Arc<dyn iceberg::Catalog>>,
    /// Cached table handle, replaced on every `refresh()` poll.
    #[cfg(feature = "iceberg")]
    table: Option<iceberg::table::Table>,
}

impl IcebergSource {
    /// Creates a new Iceberg source with the given configuration.
    ///
    /// `_registry` is accepted for interface symmetry with other
    /// connectors but is currently unused (metrics are defaulted).
    #[must_use]
    pub fn new(config: IcebergSourceConfig, _registry: Option<&prometheus::Registry>) -> Self {
        Self {
            config,
            schema: None,
            state: ConnectorState::Created,
            buffer: VecDeque::new(),
            last_snapshot_id: None,
            last_poll_time: None,
            epoch: 0,
            #[cfg(feature = "iceberg")]
            catalog: None,
            #[cfg(feature = "iceberg")]
            table: None,
        }
    }

    /// Checks for a new snapshot and loads data if available.
    ///
    /// Behavior:
    /// - Throttled by `config.poll_interval`: returns `Ok(())` without any
    ///   I/O if called again too soon.
    /// - Reloads the table from the catalog so newly committed snapshots
    ///   become visible.
    /// - If `config.snapshot_id` pins a snapshot, that snapshot is scanned
    ///   exactly once (guarded by `last_snapshot_id`).
    /// - Otherwise: incremental manifest-diff read between the last and
    ///   current snapshot, or a full scan on the first read.
    ///
    /// Loaded batches are appended to `self.buffer` for `poll_batch()`.
    ///
    /// # Errors
    /// Returns [`ConnectorError::InvalidState`] if called before `open()`
    /// initialized the catalog; propagates catalog/scan errors.
    #[cfg(feature = "iceberg")]
    async fn refresh(&mut self) -> Result<(), ConnectorError> {
        // Throttle: skip the poll entirely if the interval has not elapsed.
        if let Some(last) = self.last_poll_time {
            if last.elapsed() < self.config.poll_interval {
                return Ok(());
            }
        }
        self.last_poll_time = Some(Instant::now());

        // Reload table metadata to see new snapshots.
        let catalog = self
            .catalog
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "open".into(),
                actual: "catalog not initialized".into(),
            })?;
        let table = super::iceberg_io::load_table(
            catalog.as_ref(),
            &self.config.catalog.namespace,
            &self.config.catalog.table_name,
        )
        .await?;
        self.table = Some(table);

        // Re-borrow from the field so the handle outlives this call; the
        // unwrap is safe — we stored `Some` on the line above.
        let table = self.table.as_ref().unwrap();
        let current_snap = super::iceberg_io::current_snapshot_id(table);

        // If pinned to a specific snapshot, only load once.
        if let Some(pinned) = self.config.snapshot_id {
            if self.last_snapshot_id.is_some() {
                return Ok(());
            }
            let batches =
                super::iceberg_io::scan_table(table, Some(pinned), &self.config.select_columns)
                    .await?;
            self.buffer.extend(batches);
            self.last_snapshot_id = Some(pinned);
            return Ok(());
        }

        // Nothing new to ingest (also covers a table with no snapshots yet).
        if current_snap == self.last_snapshot_id {
            return Ok(());
        }

        // Incremental read if we have a previous snapshot, full scan otherwise.
        let batches = if let (Some(old), Some(new)) = (self.last_snapshot_id, current_snap) {
            super::iceberg_incremental::scan_incremental(
                table,
                old,
                new,
                &self.config.select_columns,
            )
            .await?
        } else {
            super::iceberg_io::scan_table(table, current_snap, &self.config.select_columns).await?
        };

        // Lazily capture the schema if open() did not already set it.
        if self.schema.is_none() {
            if let Some(first) = batches.first() {
                self.schema = Some(first.schema());
            }
        }

        let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        debug!(snapshot = ?current_snap, rows, "iceberg source loaded snapshot");

        self.buffer.extend(batches);
        self.last_snapshot_id = current_snap;

        Ok(())
    }
}

#[async_trait]
impl SourceConnector for IcebergSource {
    /// Opens the connector: re-parses the runtime config, connects to the
    /// catalog, loads the table, and derives the Arrow schema.
    ///
    /// Without the `iceberg` feature this always fails with a
    /// `ConfigurationError` and marks the connector `Failed`.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        // Re-parse config from runtime ConnectorConfig (not factory defaults).
        if !config.properties().is_empty() {
            self.config = IcebergSourceConfig::from_config(config)?;
        }

        #[cfg(feature = "iceberg")]
        {
            // NOTE(review): if any of the `?` below fail, `state` remains
            // `Created` (not `Failed`), so health_check() reports Unknown —
            // confirm that is the intended failure semantics.
            let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
            let table = super::iceberg_io::load_table(
                catalog.as_ref(),
                &self.config.catalog.namespace,
                &self.config.catalog.table_name,
            )
            .await?;

            // Derive the Arrow schema from the table's current Iceberg schema.
            let iceberg_schema = table.current_schema_ref();
            let arrow_schema =
                iceberg::arrow::schema_to_arrow_schema(&iceberg_schema).map_err(|e| {
                    ConnectorError::SchemaMismatch(format!("iceberg→arrow schema: {e}"))
                })?;
            self.schema = Some(Arc::new(arrow_schema));

            info!(
                table = self.config.catalog.table_name,
                namespace = self.config.catalog.namespace,
                "iceberg source connected"
            );

            self.catalog = Some(catalog);
            self.table = Some(table);
            self.state = ConnectorState::Running;
            return Ok(());
        }

        #[cfg(not(feature = "iceberg"))]
        {
            self.state = ConnectorState::Failed;
            Err(ConnectorError::ConfigurationError(
                "Apache Iceberg requires the 'iceberg' feature".into(),
            ))
        }
    }

    /// Returns up to `max_records` rows, refreshing from the table when the
    /// buffer is empty.
    ///
    /// Oversized batches are split with zero-copy `slice`; the remainder is
    /// pushed back to the front of the buffer for the next call.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        if let Some(batch) = self.buffer.pop_front() {
            if batch.num_rows() <= max_records {
                return Ok(Some(SourceBatch::new(batch)));
            }
            // Split: emit the first `max_records` rows, requeue the rest.
            // NOTE(review): max_records == 0 with a non-empty batch yields an
            // endless stream of empty batches — confirm callers never pass 0.
            let take = batch.slice(0, max_records);
            let remainder = batch.slice(max_records, batch.num_rows() - max_records);
            self.buffer.push_front(remainder);
            return Ok(Some(SourceBatch::new(take)));
        }

        // Buffer exhausted: poll for a new snapshot (no-op without the feature).
        #[cfg(feature = "iceberg")]
        self.refresh().await?;

        Ok(self.buffer.pop_front().map(SourceBatch::new))
    }

    /// Returns the discovered schema, or an empty schema before discovery.
    fn schema(&self) -> SchemaRef {
        self.schema
            .clone()
            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
    }

    /// Captures the current epoch and (if known) the last snapshot ID so a
    /// restarted source can resume incrementally.
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(self.epoch);
        if let Some(sid) = self.last_snapshot_id {
            cp.set_offset("snapshot_id", sid.to_string());
        }
        cp.set_metadata("connector_type", "iceberg");
        cp
    }

    /// Restores epoch and last snapshot ID from a checkpoint.
    ///
    /// # Errors
    /// Returns [`ConnectorError::Internal`] if the stored snapshot ID does
    /// not parse as an `i64`.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        self.epoch = checkpoint.epoch();
        if let Some(sid) = checkpoint.get_offset("snapshot_id") {
            self.last_snapshot_id = Some(sid.parse().map_err(|_| {
                ConnectorError::Internal(format!("invalid snapshot_id in checkpoint: '{sid}'"))
            })?);
            debug!(snapshot_id = ?self.last_snapshot_id, "iceberg source restored");
        }
        Ok(())
    }

    /// Snapshot polling cannot rewind, so replay is unsupported.
    fn supports_replay(&self) -> bool {
        false
    }

    /// Maps lifecycle state to a health status.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Failed => HealthStatus::Unhealthy("source failed".into()),
            _ => HealthStatus::Unknown,
        }
    }

    /// Metrics are not yet wired up; returns defaults.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics::default()
    }

    /// Drops cached catalog/table handles and marks the connector closed.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        #[cfg(feature = "iceberg")]
        {
            self.catalog = None;
            self.table = None;
        }
        self.state = ConnectorState::Closed;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConnectorConfig;

    /// Builds a minimal valid source config for a dummy REST catalog table.
    fn test_source_config() -> IcebergSourceConfig {
        let mut raw = ConnectorConfig::new("iceberg");
        let props = [
            ("catalog.uri", "http://localhost:8181"),
            ("warehouse", "s3://test/wh"),
            ("namespace", "test"),
            ("table.name", "dim_customers"),
        ];
        for (key, value) in props {
            raw.set(key, value);
        }
        IcebergSourceConfig::from_config(&raw).unwrap()
    }

    /// A freshly-constructed source starts empty with no snapshot state.
    #[test]
    fn test_new_source() {
        let src = IcebergSource::new(test_source_config(), None);
        assert!(src.buffer.is_empty());
        assert!(src.last_snapshot_id.is_none());
        assert!(src.schema.is_none());
    }

    /// A checkpoint captures the epoch, snapshot offset, and connector type.
    #[test]
    fn test_checkpoint_round_trip() {
        let mut src = IcebergSource::new(test_source_config(), None);
        src.epoch = 5;
        src.last_snapshot_id = Some(42);

        let cp = src.checkpoint();
        assert_eq!(cp.epoch(), 5);
        assert_eq!(cp.get_offset("snapshot_id"), Some("42"));
        assert_eq!(cp.get_metadata("connector_type"), Some("iceberg"));
    }

    /// Restoring applies both the epoch and the parsed snapshot ID.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut cp = SourceCheckpoint::new(10);
        cp.set_offset("snapshot_id", "123");

        let mut src = IcebergSource::new(test_source_config(), None);
        src.restore(&cp).await.unwrap();

        assert_eq!(src.epoch, 10);
        assert_eq!(src.last_snapshot_id, Some(123));
    }

    /// Snapshot sources cannot rewind, so replay must be off.
    #[test]
    fn test_supports_replay_false() {
        let src = IcebergSource::new(test_source_config(), None);
        assert!(!src.supports_replay());
    }
}