Skip to main content

laminar_connectors/lakehouse/
iceberg_source.rs

1//! Apache Iceberg source connector implementation.
2//!
3//! [`IcebergSource`] implements [`SourceConnector`] for polling Iceberg
4//! snapshots. It is a **reference/lookup table** source — Iceberg tables
5//! are snapshot-based with no push mechanism.
6//!
7//! On each poll cycle the source checks for a newer snapshot. If one exists,
8//! only the newly added data files are read via manifest-level diff
9//! (see `iceberg_incremental::scan_incremental`). The first read is a full scan.
10
11use std::collections::VecDeque;
12use std::sync::Arc;
13use std::time::Instant;
14
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17use async_trait::async_trait;
18use tracing::debug;
19#[cfg(feature = "iceberg")]
20use tracing::info;
21
22use crate::checkpoint::SourceCheckpoint;
23use crate::config::{ConnectorConfig, ConnectorState};
24use crate::connector::{SourceBatch, SourceConnector};
25use crate::error::ConnectorError;
26
27use super::iceberg_config::IcebergSourceConfig;
28
29/// Apache Iceberg source connector.
30///
31/// Polls for new snapshots on a configurable interval and emits
32/// `RecordBatch` data. Supports pinning to a specific snapshot.
33#[allow(dead_code)] // Fields used by feature-gated I/O methods.
34pub struct IcebergSource {
35    /// Source configuration — reparsed from `ConnectorConfig` in `open()`.
36    config: IcebergSourceConfig,
37    /// Discovered Arrow schema.
38    schema: Option<SchemaRef>,
39    /// Connector lifecycle state.
40    state: ConnectorState,
41    /// Buffered batches from the most recent scan.
42    buffer: VecDeque<RecordBatch>,
43    /// Last fully-ingested snapshot ID.
44    last_snapshot_id: Option<i64>,
45    /// Time of last snapshot poll.
46    last_poll_time: Option<Instant>,
47    /// Current epoch for checkpoint.
48    epoch: u64,
49    /// Cached catalog connection.
50    #[cfg(feature = "iceberg")]
51    catalog: Option<Arc<dyn iceberg::Catalog>>,
52    /// Cached table handle.
53    #[cfg(feature = "iceberg")]
54    table: Option<iceberg::table::Table>,
55}
56
57impl IcebergSource {
58    /// Creates a new Iceberg source with the given configuration.
59    #[must_use]
60    pub fn new(config: IcebergSourceConfig, _registry: Option<&prometheus::Registry>) -> Self {
61        Self {
62            config,
63            schema: None,
64            state: ConnectorState::Created,
65            buffer: VecDeque::new(),
66            last_snapshot_id: None,
67            last_poll_time: None,
68            epoch: 0,
69            #[cfg(feature = "iceberg")]
70            catalog: None,
71            #[cfg(feature = "iceberg")]
72            table: None,
73        }
74    }
75
76    /// Checks for a new snapshot and loads data if available.
77    #[cfg(feature = "iceberg")]
78    async fn refresh(&mut self) -> Result<(), ConnectorError> {
79        if let Some(last) = self.last_poll_time {
80            if last.elapsed() < self.config.poll_interval {
81                return Ok(());
82            }
83        }
84        self.last_poll_time = Some(Instant::now());
85
86        // Reload table metadata to see new snapshots.
87        let catalog = self
88            .catalog
89            .as_ref()
90            .ok_or_else(|| ConnectorError::InvalidState {
91                expected: "open".into(),
92                actual: "catalog not initialized".into(),
93            })?;
94        let table = super::iceberg_io::load_table(
95            catalog.as_ref(),
96            &self.config.catalog.namespace,
97            &self.config.catalog.table_name,
98        )
99        .await?;
100        self.table = Some(table);
101
102        let table = self.table.as_ref().unwrap();
103        let current_snap = super::iceberg_io::current_snapshot_id(table);
104
105        // If pinned to a specific snapshot, only load once.
106        if let Some(pinned) = self.config.snapshot_id {
107            if self.last_snapshot_id.is_some() {
108                return Ok(());
109            }
110            let batches =
111                super::iceberg_io::scan_table(table, Some(pinned), &self.config.select_columns)
112                    .await?;
113            self.buffer.extend(batches);
114            self.last_snapshot_id = Some(pinned);
115            return Ok(());
116        }
117
118        if current_snap == self.last_snapshot_id {
119            return Ok(());
120        }
121
122        // Incremental read if we have a previous snapshot, full scan otherwise.
123        let batches = if let (Some(old), Some(new)) = (self.last_snapshot_id, current_snap) {
124            super::iceberg_incremental::scan_incremental(
125                table,
126                old,
127                new,
128                &self.config.select_columns,
129            )
130            .await?
131        } else {
132            super::iceberg_io::scan_table(table, current_snap, &self.config.select_columns).await?
133        };
134
135        if self.schema.is_none() {
136            if let Some(first) = batches.first() {
137                self.schema = Some(first.schema());
138            }
139        }
140
141        let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
142        debug!(snapshot = ?current_snap, rows, "iceberg source loaded snapshot");
143
144        self.buffer.extend(batches);
145        self.last_snapshot_id = current_snap;
146
147        Ok(())
148    }
149}
150
151#[async_trait]
152impl SourceConnector for IcebergSource {
153    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
154        // Re-parse config from runtime ConnectorConfig (not factory defaults).
155        if !config.properties().is_empty() {
156            self.config = IcebergSourceConfig::from_config(config)?;
157        }
158
159        #[cfg(feature = "iceberg")]
160        {
161            let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
162            let table = super::iceberg_io::load_table(
163                catalog.as_ref(),
164                &self.config.catalog.namespace,
165                &self.config.catalog.table_name,
166            )
167            .await?;
168
169            let iceberg_schema = table.current_schema_ref();
170            let arrow_schema =
171                iceberg::arrow::schema_to_arrow_schema(&iceberg_schema).map_err(|e| {
172                    ConnectorError::SchemaMismatch(format!("iceberg→arrow schema: {e}"))
173                })?;
174            self.schema = Some(Arc::new(arrow_schema));
175
176            info!(
177                table = self.config.catalog.table_name,
178                namespace = self.config.catalog.namespace,
179                "iceberg source connected"
180            );
181
182            self.catalog = Some(catalog);
183            self.table = Some(table);
184            self.state = ConnectorState::Running;
185            return Ok(());
186        }
187
188        #[cfg(not(feature = "iceberg"))]
189        {
190            self.state = ConnectorState::Failed;
191            Err(ConnectorError::ConfigurationError(
192                "Apache Iceberg requires the 'iceberg' feature".into(),
193            ))
194        }
195    }
196
197    async fn poll_batch(
198        &mut self,
199        max_records: usize,
200    ) -> Result<Option<SourceBatch>, ConnectorError> {
201        if let Some(batch) = self.buffer.pop_front() {
202            if batch.num_rows() <= max_records {
203                return Ok(Some(SourceBatch::new(batch)));
204            }
205            let take = batch.slice(0, max_records);
206            let remainder = batch.slice(max_records, batch.num_rows() - max_records);
207            self.buffer.push_front(remainder);
208            return Ok(Some(SourceBatch::new(take)));
209        }
210
211        #[cfg(feature = "iceberg")]
212        self.refresh().await?;
213
214        Ok(self.buffer.pop_front().map(SourceBatch::new))
215    }
216
217    fn schema(&self) -> SchemaRef {
218        self.schema
219            .clone()
220            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
221    }
222
223    fn checkpoint(&self) -> SourceCheckpoint {
224        let mut cp = SourceCheckpoint::new(self.epoch);
225        if let Some(sid) = self.last_snapshot_id {
226            cp.set_offset("snapshot_id", sid.to_string());
227        }
228        cp.set_metadata("connector_type", "iceberg");
229        cp
230    }
231
232    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
233        self.epoch = checkpoint.epoch();
234        if let Some(sid) = checkpoint.get_offset("snapshot_id") {
235            self.last_snapshot_id = Some(sid.parse().map_err(|_| {
236                ConnectorError::Internal(format!("invalid snapshot_id in checkpoint: '{sid}'"))
237            })?);
238            debug!(snapshot_id = ?self.last_snapshot_id, "iceberg source restored");
239        }
240        Ok(())
241    }
242
243    fn supports_replay(&self) -> bool {
244        false
245    }
246
247    async fn close(&mut self) -> Result<(), ConnectorError> {
248        #[cfg(feature = "iceberg")]
249        {
250            self.catalog = None;
251            self.table = None;
252        }
253        self.state = ConnectorState::Closed;
254        Ok(())
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConnectorConfig;

    /// Builds a minimal source configuration pointing at a local catalog.
    fn test_source_config() -> IcebergSourceConfig {
        let mut raw = ConnectorConfig::new("iceberg");
        raw.set("catalog.uri", "http://localhost:8181");
        raw.set("warehouse", "s3://test/wh");
        raw.set("namespace", "test");
        raw.set("table.name", "dim_customers");

        let parsed = IcebergSourceConfig::from_config(&raw);
        parsed.unwrap()
    }

    /// A freshly constructed source has no schema, snapshot or buffered data.
    #[test]
    fn test_new_source() {
        let fresh = IcebergSource::new(test_source_config(), None);

        assert!(fresh.schema.is_none());
        assert!(fresh.last_snapshot_id.is_none());
        assert!(fresh.buffer.is_empty());
    }

    /// The checkpoint exposes epoch, snapshot offset and connector type.
    #[test]
    fn test_checkpoint_round_trip() {
        let mut src = IcebergSource::new(test_source_config(), None);
        src.last_snapshot_id = Some(42);
        src.epoch = 5;

        let snapshot = src.checkpoint();

        assert_eq!(snapshot.epoch(), 5);
        assert_eq!(snapshot.get_offset("snapshot_id"), Some("42"));
        assert_eq!(snapshot.get_metadata("connector_type"), Some("iceberg"));
    }

    /// Restoring adopts the checkpoint's epoch and parsed snapshot ID.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut saved = SourceCheckpoint::new(10);
        saved.set_offset("snapshot_id", "123");

        let mut src = IcebergSource::new(test_source_config(), None);
        src.restore(&saved).await.unwrap();

        assert_eq!(src.epoch, 10);
        assert_eq!(src.last_snapshot_id, Some(123));
    }

    /// Replay is reported as unsupported.
    #[test]
    fn test_supports_replay_false() {
        let src = IcebergSource::new(test_source_config(), None);
        assert!(!src.supports_replay());
    }
}