laminar_connectors/lakehouse/
iceberg_source.rs1use std::collections::VecDeque;
12use std::sync::Arc;
13use std::time::Instant;
14
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17use async_trait::async_trait;
18use tracing::debug;
19#[cfg(feature = "iceberg")]
20use tracing::info;
21
22use crate::checkpoint::SourceCheckpoint;
23use crate::config::{ConnectorConfig, ConnectorState};
24use crate::connector::{SourceBatch, SourceConnector};
25use crate::error::ConnectorError;
26
27use super::iceberg_config::IcebergSourceConfig;
28
29#[allow(dead_code)] pub struct IcebergSource {
35 config: IcebergSourceConfig,
37 schema: Option<SchemaRef>,
39 state: ConnectorState,
41 buffer: VecDeque<RecordBatch>,
43 last_snapshot_id: Option<i64>,
45 last_poll_time: Option<Instant>,
47 epoch: u64,
49 #[cfg(feature = "iceberg")]
51 catalog: Option<Arc<dyn iceberg::Catalog>>,
52 #[cfg(feature = "iceberg")]
54 table: Option<iceberg::table::Table>,
55}
56
57impl IcebergSource {
58 #[must_use]
60 pub fn new(config: IcebergSourceConfig, _registry: Option<&prometheus::Registry>) -> Self {
61 Self {
62 config,
63 schema: None,
64 state: ConnectorState::Created,
65 buffer: VecDeque::new(),
66 last_snapshot_id: None,
67 last_poll_time: None,
68 epoch: 0,
69 #[cfg(feature = "iceberg")]
70 catalog: None,
71 #[cfg(feature = "iceberg")]
72 table: None,
73 }
74 }
75
76 #[cfg(feature = "iceberg")]
78 async fn refresh(&mut self) -> Result<(), ConnectorError> {
79 if let Some(last) = self.last_poll_time {
80 if last.elapsed() < self.config.poll_interval {
81 return Ok(());
82 }
83 }
84 self.last_poll_time = Some(Instant::now());
85
86 let catalog = self
88 .catalog
89 .as_ref()
90 .ok_or_else(|| ConnectorError::InvalidState {
91 expected: "open".into(),
92 actual: "catalog not initialized".into(),
93 })?;
94 let table = super::iceberg_io::load_table(
95 catalog.as_ref(),
96 &self.config.catalog.namespace,
97 &self.config.catalog.table_name,
98 )
99 .await?;
100 self.table = Some(table);
101
102 let table = self.table.as_ref().unwrap();
103 let current_snap = super::iceberg_io::current_snapshot_id(table);
104
105 if let Some(pinned) = self.config.snapshot_id {
107 if self.last_snapshot_id.is_some() {
108 return Ok(());
109 }
110 let batches =
111 super::iceberg_io::scan_table(table, Some(pinned), &self.config.select_columns)
112 .await?;
113 self.buffer.extend(batches);
114 self.last_snapshot_id = Some(pinned);
115 return Ok(());
116 }
117
118 if current_snap == self.last_snapshot_id {
119 return Ok(());
120 }
121
122 let batches = if let (Some(old), Some(new)) = (self.last_snapshot_id, current_snap) {
124 super::iceberg_incremental::scan_incremental(
125 table,
126 old,
127 new,
128 &self.config.select_columns,
129 )
130 .await?
131 } else {
132 super::iceberg_io::scan_table(table, current_snap, &self.config.select_columns).await?
133 };
134
135 if self.schema.is_none() {
136 if let Some(first) = batches.first() {
137 self.schema = Some(first.schema());
138 }
139 }
140
141 let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
142 debug!(snapshot = ?current_snap, rows, "iceberg source loaded snapshot");
143
144 self.buffer.extend(batches);
145 self.last_snapshot_id = current_snap;
146
147 Ok(())
148 }
149}
150
151#[async_trait]
152impl SourceConnector for IcebergSource {
153 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
154 if !config.properties().is_empty() {
156 self.config = IcebergSourceConfig::from_config(config)?;
157 }
158
159 #[cfg(feature = "iceberg")]
160 {
161 let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
162 let table = super::iceberg_io::load_table(
163 catalog.as_ref(),
164 &self.config.catalog.namespace,
165 &self.config.catalog.table_name,
166 )
167 .await?;
168
169 let iceberg_schema = table.current_schema_ref();
170 let arrow_schema =
171 iceberg::arrow::schema_to_arrow_schema(&iceberg_schema).map_err(|e| {
172 ConnectorError::SchemaMismatch(format!("iceberg→arrow schema: {e}"))
173 })?;
174 self.schema = Some(Arc::new(arrow_schema));
175
176 info!(
177 table = self.config.catalog.table_name,
178 namespace = self.config.catalog.namespace,
179 "iceberg source connected"
180 );
181
182 self.catalog = Some(catalog);
183 self.table = Some(table);
184 self.state = ConnectorState::Running;
185 return Ok(());
186 }
187
188 #[cfg(not(feature = "iceberg"))]
189 {
190 self.state = ConnectorState::Failed;
191 Err(ConnectorError::ConfigurationError(
192 "Apache Iceberg requires the 'iceberg' feature".into(),
193 ))
194 }
195 }
196
197 async fn poll_batch(
198 &mut self,
199 max_records: usize,
200 ) -> Result<Option<SourceBatch>, ConnectorError> {
201 if let Some(batch) = self.buffer.pop_front() {
202 if batch.num_rows() <= max_records {
203 return Ok(Some(SourceBatch::new(batch)));
204 }
205 let take = batch.slice(0, max_records);
206 let remainder = batch.slice(max_records, batch.num_rows() - max_records);
207 self.buffer.push_front(remainder);
208 return Ok(Some(SourceBatch::new(take)));
209 }
210
211 #[cfg(feature = "iceberg")]
212 self.refresh().await?;
213
214 Ok(self.buffer.pop_front().map(SourceBatch::new))
215 }
216
217 fn schema(&self) -> SchemaRef {
218 self.schema
219 .clone()
220 .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
221 }
222
223 fn checkpoint(&self) -> SourceCheckpoint {
224 let mut cp = SourceCheckpoint::new(self.epoch);
225 if let Some(sid) = self.last_snapshot_id {
226 cp.set_offset("snapshot_id", sid.to_string());
227 }
228 cp.set_metadata("connector_type", "iceberg");
229 cp
230 }
231
232 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
233 self.epoch = checkpoint.epoch();
234 if let Some(sid) = checkpoint.get_offset("snapshot_id") {
235 self.last_snapshot_id = Some(sid.parse().map_err(|_| {
236 ConnectorError::Internal(format!("invalid snapshot_id in checkpoint: '{sid}'"))
237 })?);
238 debug!(snapshot_id = ?self.last_snapshot_id, "iceberg source restored");
239 }
240 Ok(())
241 }
242
243 fn supports_replay(&self) -> bool {
244 false
245 }
246
247 async fn close(&mut self) -> Result<(), ConnectorError> {
248 #[cfg(feature = "iceberg")]
249 {
250 self.catalog = None;
251 self.table = None;
252 }
253 self.state = ConnectorState::Closed;
254 Ok(())
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConnectorConfig;

    /// Builds a source configuration from a minimal set of connector properties.
    fn test_source_config() -> IcebergSourceConfig {
        let mut props = ConnectorConfig::new("iceberg");
        props.set("catalog.uri", "http://localhost:8181");
        props.set("warehouse", "s3://test/wh");
        props.set("namespace", "test");
        props.set("table.name", "dim_customers");
        let parsed = IcebergSourceConfig::from_config(&props);
        parsed.unwrap()
    }

    /// A freshly constructed source has no schema, no snapshot and no data.
    #[test]
    fn test_new_source() {
        let fresh = IcebergSource::new(test_source_config(), None);
        assert!(fresh.schema.is_none());
        assert!(fresh.last_snapshot_id.is_none());
        assert!(fresh.buffer.is_empty());
    }

    /// The checkpoint carries the epoch, the snapshot offset and the connector tag.
    #[test]
    fn test_checkpoint_round_trip() {
        let mut src = IcebergSource::new(test_source_config(), None);
        src.epoch = 5;
        src.last_snapshot_id = Some(42);

        let snapshot = src.checkpoint();
        assert_eq!(snapshot.epoch(), 5);
        assert_eq!(snapshot.get_offset("snapshot_id"), Some("42"));
        assert_eq!(snapshot.get_metadata("connector_type"), Some("iceberg"));
    }

    /// Restoring adopts both the epoch and the parsed snapshot id.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut saved = SourceCheckpoint::new(10);
        saved.set_offset("snapshot_id", "123");

        let mut src = IcebergSource::new(test_source_config(), None);
        src.restore(&saved).await.unwrap();

        assert_eq!(src.epoch, 10);
        assert_eq!(src.last_snapshot_id, Some(123));
    }

    /// The source reports that replay is unsupported.
    #[test]
    fn test_supports_replay_false() {
        let src = IcebergSource::new(test_source_config(), None);
        let replayable = src.supports_replay();
        assert!(!replayable);
    }
}