Skip to main content

laminar_connectors/lakehouse/
iceberg_reference.rs

1//! Iceberg reference table source for lookup/enrichment joins.
2//!
3//! Implements [`ReferenceTableSource`] to populate a dimension table from
4//! an Iceberg table. Delivers snapshot data as batches, then polls for
5//! new snapshots to deliver incremental changes via manifest diff.
6
7use std::collections::VecDeque;
8#[cfg(feature = "iceberg")]
9use std::sync::Arc;
10use std::time::Instant;
11
12use arrow_array::RecordBatch;
13use async_trait::async_trait;
14#[cfg(feature = "iceberg")]
15use tracing::{debug, info};
16
17use crate::checkpoint::SourceCheckpoint;
18use crate::config::ConnectorConfig;
19use crate::error::ConnectorError;
20use crate::reference::ReferenceTableSource;
21
22use super::iceberg_config::IcebergSourceConfig;
23
24/// Iceberg reference table source for lookup/enrichment joins.
25#[allow(dead_code)] // Fields used by feature-gated I/O methods.
26pub struct IcebergReferenceTableSource {
27    /// Source configuration.
28    config: IcebergSourceConfig,
29    /// Buffered snapshot batches (drained via `poll_snapshot`).
30    snapshot_batches: VecDeque<RecordBatch>,
31    /// Whether the initial snapshot has been fully delivered.
32    snapshot_complete: bool,
33    /// Buffered change batches (from newer snapshots).
34    change_batches: VecDeque<RecordBatch>,
35    /// Snapshot ID that has been loaded (may still have undelivered batches).
36    loaded_snapshot_id: Option<i64>,
37    /// Snapshot ID whose batches have been fully delivered — safe to checkpoint.
38    delivered_snapshot_id: Option<i64>,
39    /// Time of last change poll.
40    last_poll_time: Option<Instant>,
41    /// Current epoch.
42    epoch: u64,
43    /// Cached catalog connection.
44    #[cfg(feature = "iceberg")]
45    catalog: Option<Arc<dyn iceberg::Catalog>>,
46}
47
48impl IcebergReferenceTableSource {
49    /// Creates a new reference table source.
50    #[must_use]
51    pub fn new(config: IcebergSourceConfig) -> Self {
52        Self {
53            config,
54            snapshot_batches: VecDeque::new(),
55            snapshot_complete: false,
56            change_batches: VecDeque::new(),
57            loaded_snapshot_id: None,
58            delivered_snapshot_id: None,
59            last_poll_time: None,
60            epoch: 0,
61            #[cfg(feature = "iceberg")]
62            catalog: None,
63        }
64    }
65
66    /// Creates from a `ConnectorConfig` (SQL WITH clause).
67    ///
68    /// # Errors
69    ///
70    /// Returns `ConnectorError` on invalid configuration.
71    pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
72        let cfg = IcebergSourceConfig::from_config(config)?;
73        Ok(Self::new(cfg))
74    }
75
76    /// Loads the initial snapshot from the Iceberg table.
77    #[cfg(feature = "iceberg")]
78    async fn load_initial_snapshot(&mut self) -> Result<(), ConnectorError> {
79        let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
80        let table = super::iceberg_io::load_table(
81            catalog.as_ref(),
82            &self.config.catalog.namespace,
83            &self.config.catalog.table_name,
84        )
85        .await?;
86
87        let snap_id = self
88            .config
89            .snapshot_id
90            .or_else(|| super::iceberg_io::current_snapshot_id(&table));
91
92        let batches =
93            super::iceberg_io::scan_table(&table, snap_id, &self.config.select_columns).await?;
94
95        let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
96        info!(snapshot = ?snap_id, rows, "iceberg reference: initial snapshot loaded");
97
98        self.snapshot_batches.extend(batches);
99        self.loaded_snapshot_id = snap_id;
100        self.catalog = Some(catalog);
101
102        Ok(())
103    }
104
105    /// Checks for newer snapshots and loads change data.
106    #[cfg(feature = "iceberg")]
107    async fn poll_for_changes(&mut self) -> Result<(), ConnectorError> {
108        if let Some(last) = self.last_poll_time {
109            if last.elapsed() < self.config.poll_interval {
110                return Ok(());
111            }
112        }
113        self.last_poll_time = Some(Instant::now());
114
115        let catalog = self
116            .catalog
117            .as_ref()
118            .ok_or_else(|| ConnectorError::InvalidState {
119                expected: "snapshot loaded".into(),
120                actual: "catalog not initialized".into(),
121            })?;
122
123        let table = super::iceberg_io::load_table(
124            catalog.as_ref(),
125            &self.config.catalog.namespace,
126            &self.config.catalog.table_name,
127        )
128        .await?;
129
130        let current_snap = super::iceberg_io::current_snapshot_id(&table);
131
132        if current_snap == self.loaded_snapshot_id {
133            return Ok(());
134        }
135
136        // Incremental read if we have a previous snapshot, full scan otherwise.
137        let batches = if let (Some(old), Some(new)) = (self.loaded_snapshot_id, current_snap) {
138            super::iceberg_incremental::scan_incremental(
139                &table,
140                old,
141                new,
142                &self.config.select_columns,
143            )
144            .await?
145        } else {
146            super::iceberg_io::scan_table(&table, current_snap, &self.config.select_columns).await?
147        };
148
149        let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
150        debug!(
151            old_snapshot = ?self.loaded_snapshot_id,
152            new_snapshot = ?current_snap,
153            rows,
154            "iceberg reference: change snapshot loaded"
155        );
156
157        self.change_batches.extend(batches);
158        self.loaded_snapshot_id = current_snap;
159
160        Ok(())
161    }
162}
163
164#[async_trait]
165impl ReferenceTableSource for IcebergReferenceTableSource {
166    async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
167        if !self.snapshot_complete
168            && self.snapshot_batches.is_empty()
169            && self.loaded_snapshot_id.is_none()
170        {
171            #[cfg(feature = "iceberg")]
172            self.load_initial_snapshot().await?;
173        }
174
175        if let Some(batch) = self.snapshot_batches.pop_front() {
176            return Ok(Some(batch));
177        }
178
179        // All snapshot batches delivered — safe to checkpoint this snapshot.
180        self.delivered_snapshot_id = self.loaded_snapshot_id;
181        self.snapshot_complete = true;
182        Ok(None)
183    }
184
185    fn is_snapshot_complete(&self) -> bool {
186        self.snapshot_complete
187    }
188
189    async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
190        if let Some(batch) = self.change_batches.pop_front() {
191            return Ok(Some(batch));
192        }
193
194        // All change batches from the previous snapshot have been delivered.
195        self.delivered_snapshot_id = self.loaded_snapshot_id;
196
197        #[cfg(feature = "iceberg")]
198        self.poll_for_changes().await?;
199
200        Ok(self.change_batches.pop_front())
201    }
202
203    fn checkpoint(&self) -> SourceCheckpoint {
204        let mut cp = SourceCheckpoint::new(self.epoch);
205        if let Some(sid) = self.delivered_snapshot_id {
206            cp.set_offset("snapshot_id", sid.to_string());
207        }
208        cp.set_metadata("connector_type", "iceberg-reference");
209        cp
210    }
211
212    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
213        self.epoch = checkpoint.epoch();
214        if let Some(sid) = checkpoint.get_offset("snapshot_id") {
215            let snap: i64 = sid.parse().map_err(|_| {
216                ConnectorError::Internal(format!("invalid snapshot_id in checkpoint: '{sid}'"))
217            })?;
218            // Both cursors restored to the last fully-delivered snapshot.
219            self.delivered_snapshot_id = Some(snap);
220            self.loaded_snapshot_id = Some(snap);
221        }
222        Ok(())
223    }
224
225    async fn close(&mut self) -> Result<(), ConnectorError> {
226        #[cfg(feature = "iceberg")]
227        {
228            self.catalog = None;
229        }
230        Ok(())
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    /// Connector options shared by every test: a REST-style catalog URI, a
    /// warehouse location, and the namespace/table of the dimension table.
    fn test_connector_config() -> ConnectorConfig {
        let mut options = ConnectorConfig::new("iceberg");
        options.set("catalog.uri", "http://localhost:8181");
        options.set("warehouse", "s3://test/wh");
        options.set("namespace", "test");
        options.set("table.name", "dim_customers");
        options
    }

    /// Parses the shared connector options into a source configuration.
    fn test_source_config() -> IcebergSourceConfig {
        IcebergSourceConfig::from_config(&test_connector_config()).unwrap()
    }

    /// A freshly constructed source has loaded and delivered nothing.
    #[test]
    fn test_reference_source_new() {
        let fresh = IcebergReferenceTableSource::new(test_source_config());
        assert!(!fresh.snapshot_complete);
        assert!(fresh.snapshot_batches.is_empty());
        assert!(fresh.loaded_snapshot_id.is_none());
    }

    /// A checkpoint taken after full delivery restores both cursors and the epoch.
    #[tokio::test]
    async fn test_reference_checkpoint_round_trip() {
        // Simulate fully-delivered snapshot.
        let mut original = IcebergReferenceTableSource::new(test_source_config());
        original.delivered_snapshot_id = Some(99);
        original.loaded_snapshot_id = Some(99);
        original.epoch = 7;

        let saved = original.checkpoint();
        assert_eq!(saved.get_offset("snapshot_id"), Some("99"));
        assert_eq!(
            saved.get_metadata("connector_type"),
            Some("iceberg-reference")
        );

        let mut restored = IcebergReferenceTableSource::new(test_source_config());
        restored.restore(&saved).await.unwrap();
        assert_eq!(restored.delivered_snapshot_id, Some(99));
        assert_eq!(restored.loaded_snapshot_id, Some(99));
        assert_eq!(restored.epoch, 7);
    }

    /// A snapshot that is loaded but not delivered must not appear in a checkpoint.
    #[tokio::test]
    async fn test_checkpoint_only_after_delivery() {
        let mut pending = IcebergReferenceTableSource::new(test_source_config());
        // Loaded but not yet delivered — checkpoint should NOT include snapshot_id.
        pending.loaded_snapshot_id = Some(42);
        let saved = pending.checkpoint();
        assert_eq!(saved.get_offset("snapshot_id"), None);
    }

    /// With a snapshot already marked as loaded and no queued batches, the
    /// first poll yields nothing and flips the completion flag.
    #[tokio::test]
    async fn test_reference_empty_snapshot_completes() {
        let mut empty = IcebergReferenceTableSource::new(test_source_config());
        empty.loaded_snapshot_id = Some(1);
        let polled = empty.poll_snapshot().await.unwrap();
        assert!(polled.is_none());
        assert!(empty.is_snapshot_complete());
    }

    /// The SQL-facing constructor accepts the minimal option set.
    #[test]
    fn test_from_connector_config() {
        let options = test_connector_config();

        let built = IcebergReferenceTableSource::from_connector_config(&options);
        assert!(built.is_ok());
    }
}