laminar_connectors/lakehouse/
iceberg_reference.rs1use std::collections::VecDeque;
8#[cfg(feature = "iceberg")]
9use std::sync::Arc;
10use std::time::Instant;
11
12use arrow_array::RecordBatch;
13use async_trait::async_trait;
14#[cfg(feature = "iceberg")]
15use tracing::{debug, info};
16
17use crate::checkpoint::SourceCheckpoint;
18use crate::config::ConnectorConfig;
19use crate::error::ConnectorError;
20use crate::reference::ReferenceTableSource;
21
22use super::iceberg_config::IcebergSourceConfig;
23
24#[allow(dead_code)] pub struct IcebergReferenceTableSource {
27 config: IcebergSourceConfig,
29 snapshot_batches: VecDeque<RecordBatch>,
31 snapshot_complete: bool,
33 change_batches: VecDeque<RecordBatch>,
35 loaded_snapshot_id: Option<i64>,
37 delivered_snapshot_id: Option<i64>,
39 last_poll_time: Option<Instant>,
41 epoch: u64,
43 #[cfg(feature = "iceberg")]
45 catalog: Option<Arc<dyn iceberg::Catalog>>,
46}
47
48impl IcebergReferenceTableSource {
49 #[must_use]
51 pub fn new(config: IcebergSourceConfig) -> Self {
52 Self {
53 config,
54 snapshot_batches: VecDeque::new(),
55 snapshot_complete: false,
56 change_batches: VecDeque::new(),
57 loaded_snapshot_id: None,
58 delivered_snapshot_id: None,
59 last_poll_time: None,
60 epoch: 0,
61 #[cfg(feature = "iceberg")]
62 catalog: None,
63 }
64 }
65
66 pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
72 let cfg = IcebergSourceConfig::from_config(config)?;
73 Ok(Self::new(cfg))
74 }
75
76 #[cfg(feature = "iceberg")]
78 async fn load_initial_snapshot(&mut self) -> Result<(), ConnectorError> {
79 let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
80 let table = super::iceberg_io::load_table(
81 catalog.as_ref(),
82 &self.config.catalog.namespace,
83 &self.config.catalog.table_name,
84 )
85 .await?;
86
87 let snap_id = self
88 .config
89 .snapshot_id
90 .or_else(|| super::iceberg_io::current_snapshot_id(&table));
91
92 let batches =
93 super::iceberg_io::scan_table(&table, snap_id, &self.config.select_columns).await?;
94
95 let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
96 info!(snapshot = ?snap_id, rows, "iceberg reference: initial snapshot loaded");
97
98 self.snapshot_batches.extend(batches);
99 self.loaded_snapshot_id = snap_id;
100 self.catalog = Some(catalog);
101
102 Ok(())
103 }
104
105 #[cfg(feature = "iceberg")]
107 async fn poll_for_changes(&mut self) -> Result<(), ConnectorError> {
108 if let Some(last) = self.last_poll_time {
109 if last.elapsed() < self.config.poll_interval {
110 return Ok(());
111 }
112 }
113 self.last_poll_time = Some(Instant::now());
114
115 let catalog = self
116 .catalog
117 .as_ref()
118 .ok_or_else(|| ConnectorError::InvalidState {
119 expected: "snapshot loaded".into(),
120 actual: "catalog not initialized".into(),
121 })?;
122
123 let table = super::iceberg_io::load_table(
124 catalog.as_ref(),
125 &self.config.catalog.namespace,
126 &self.config.catalog.table_name,
127 )
128 .await?;
129
130 let current_snap = super::iceberg_io::current_snapshot_id(&table);
131
132 if current_snap == self.loaded_snapshot_id {
133 return Ok(());
134 }
135
136 let batches = if let (Some(old), Some(new)) = (self.loaded_snapshot_id, current_snap) {
138 super::iceberg_incremental::scan_incremental(
139 &table,
140 old,
141 new,
142 &self.config.select_columns,
143 )
144 .await?
145 } else {
146 super::iceberg_io::scan_table(&table, current_snap, &self.config.select_columns).await?
147 };
148
149 let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
150 debug!(
151 old_snapshot = ?self.loaded_snapshot_id,
152 new_snapshot = ?current_snap,
153 rows,
154 "iceberg reference: change snapshot loaded"
155 );
156
157 self.change_batches.extend(batches);
158 self.loaded_snapshot_id = current_snap;
159
160 Ok(())
161 }
162}
163
164#[async_trait]
165impl ReferenceTableSource for IcebergReferenceTableSource {
166 async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
167 if !self.snapshot_complete
168 && self.snapshot_batches.is_empty()
169 && self.loaded_snapshot_id.is_none()
170 {
171 #[cfg(feature = "iceberg")]
172 self.load_initial_snapshot().await?;
173 }
174
175 if let Some(batch) = self.snapshot_batches.pop_front() {
176 return Ok(Some(batch));
177 }
178
179 self.delivered_snapshot_id = self.loaded_snapshot_id;
181 self.snapshot_complete = true;
182 Ok(None)
183 }
184
185 fn is_snapshot_complete(&self) -> bool {
186 self.snapshot_complete
187 }
188
189 async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
190 if let Some(batch) = self.change_batches.pop_front() {
191 return Ok(Some(batch));
192 }
193
194 self.delivered_snapshot_id = self.loaded_snapshot_id;
196
197 #[cfg(feature = "iceberg")]
198 self.poll_for_changes().await?;
199
200 Ok(self.change_batches.pop_front())
201 }
202
203 fn checkpoint(&self) -> SourceCheckpoint {
204 let mut cp = SourceCheckpoint::new(self.epoch);
205 if let Some(sid) = self.delivered_snapshot_id {
206 cp.set_offset("snapshot_id", sid.to_string());
207 }
208 cp.set_metadata("connector_type", "iceberg-reference");
209 cp
210 }
211
212 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
213 self.epoch = checkpoint.epoch();
214 if let Some(sid) = checkpoint.get_offset("snapshot_id") {
215 let snap: i64 = sid.parse().map_err(|_| {
216 ConnectorError::Internal(format!("invalid snapshot_id in checkpoint: '{sid}'"))
217 })?;
218 self.delivered_snapshot_id = Some(snap);
220 self.loaded_snapshot_id = Some(snap);
221 }
222 Ok(())
223 }
224
225 async fn close(&mut self) -> Result<(), ConnectorError> {
226 #[cfg(feature = "iceberg")]
227 {
228 self.catalog = None;
229 }
230 Ok(())
231 }
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    /// Connector configuration shared by every test in this module.
    fn test_connector_config() -> ConnectorConfig {
        let mut config = ConnectorConfig::new("iceberg");
        for (key, value) in [
            ("catalog.uri", "http://localhost:8181"),
            ("warehouse", "s3://test/wh"),
            ("namespace", "test"),
            ("table.name", "dim_customers"),
        ] {
            config.set(key, value);
        }
        config
    }

    /// Parsed source configuration built from the shared connector settings.
    fn test_source_config() -> IcebergSourceConfig {
        IcebergSourceConfig::from_config(&test_connector_config()).unwrap()
    }

    /// A freshly constructed source has nothing loaded or buffered.
    #[test]
    fn test_reference_source_new() {
        let fresh = IcebergReferenceTableSource::new(test_source_config());

        assert!(!fresh.snapshot_complete);
        assert!(fresh.snapshot_batches.is_empty());
        assert!(fresh.loaded_snapshot_id.is_none());
    }

    /// A checkpoint taken from one source restores the same ids and epoch
    /// into another.
    #[tokio::test]
    async fn test_reference_checkpoint_round_trip() {
        let mut original = IcebergReferenceTableSource::new(test_source_config());
        original.epoch = 7;
        original.loaded_snapshot_id = Some(99);
        original.delivered_snapshot_id = Some(99);

        let checkpoint = original.checkpoint();
        assert_eq!(checkpoint.get_offset("snapshot_id"), Some("99"));
        assert_eq!(
            checkpoint.get_metadata("connector_type"),
            Some("iceberg-reference")
        );

        let mut restored = IcebergReferenceTableSource::new(test_source_config());
        restored.restore(&checkpoint).await.unwrap();

        assert_eq!(restored.delivered_snapshot_id, Some(99));
        assert_eq!(restored.loaded_snapshot_id, Some(99));
        assert_eq!(restored.epoch, 7);
    }

    /// A snapshot that is loaded but not yet delivered must not appear in the
    /// checkpoint.
    #[tokio::test]
    async fn test_checkpoint_only_after_delivery() {
        let mut source = IcebergReferenceTableSource::new(test_source_config());
        source.loaded_snapshot_id = Some(42);

        let checkpoint = source.checkpoint();

        assert_eq!(checkpoint.get_offset("snapshot_id"), None);
    }

    /// With a snapshot id already recorded and no batches buffered, the first
    /// poll yields nothing and marks the snapshot complete.
    #[tokio::test]
    async fn test_reference_empty_snapshot_completes() {
        let mut source = IcebergReferenceTableSource::new(test_source_config());
        source.loaded_snapshot_id = Some(1);

        let first_poll = source.poll_snapshot().await.unwrap();

        assert!(first_poll.is_none());
        assert!(source.is_snapshot_complete());
    }

    /// Construction from a generic connector configuration succeeds.
    #[test]
    fn test_from_connector_config() {
        let config = test_connector_config();

        let built = IcebergReferenceTableSource::from_connector_config(&config);

        assert!(built.is_ok());
    }
}