// laminar_connectors/lakehouse/iceberg_source.rs
use std::collections::VecDeque;
12use std::sync::Arc;
13use std::time::Instant;
14
15use arrow_array::RecordBatch;
16use arrow_schema::SchemaRef;
17use async_trait::async_trait;
18use tracing::debug;
19#[cfg(feature = "iceberg")]
20use tracing::info;
21
22use crate::checkpoint::SourceCheckpoint;
23use crate::config::{ConnectorConfig, ConnectorState};
24use crate::connector::{SourceBatch, SourceConnector};
25use crate::error::ConnectorError;
26use crate::health::HealthStatus;
27use crate::metrics::ConnectorMetrics;
28
29use super::iceberg_config::IcebergSourceConfig;
30
/// Source connector that streams Arrow `RecordBatch`es from an Apache
/// Iceberg table, polling the catalog for new snapshots at a configured
/// interval.
// Several fields are only read when the "iceberg" feature is enabled.
#[allow(dead_code)] pub struct IcebergSource {
    /// Parsed Iceberg source configuration (catalog, table, poll interval).
    config: IcebergSourceConfig,
    /// Arrow schema of the table; set in `open` or from the first scanned batch.
    schema: Option<SchemaRef>,
    /// Connector lifecycle state (Created → Running → Closed/Failed).
    state: ConnectorState,
    /// Batches scanned from the table but not yet handed out by `poll_batch`.
    buffer: VecDeque<RecordBatch>,
    /// Id of the last snapshot whose data was buffered; also the checkpoint offset.
    last_snapshot_id: Option<i64>,
    /// Time of the last catalog poll attempt, used to throttle refreshes.
    last_poll_time: Option<Instant>,
    /// Checkpoint epoch, restored via `restore`.
    epoch: u64,
    /// Catalog handle, initialized in `open`.
    #[cfg(feature = "iceberg")]
    catalog: Option<Arc<dyn iceberg::Catalog>>,
    /// Loaded table handle, re-loaded on each successful refresh.
    #[cfg(feature = "iceberg")]
    table: Option<iceberg::table::Table>,
}
58
59impl IcebergSource {
60 #[must_use]
62 pub fn new(config: IcebergSourceConfig, _registry: Option<&prometheus::Registry>) -> Self {
63 Self {
64 config,
65 schema: None,
66 state: ConnectorState::Created,
67 buffer: VecDeque::new(),
68 last_snapshot_id: None,
69 last_poll_time: None,
70 epoch: 0,
71 #[cfg(feature = "iceberg")]
72 catalog: None,
73 #[cfg(feature = "iceberg")]
74 table: None,
75 }
76 }
77
78 #[cfg(feature = "iceberg")]
80 async fn refresh(&mut self) -> Result<(), ConnectorError> {
81 if let Some(last) = self.last_poll_time {
82 if last.elapsed() < self.config.poll_interval {
83 return Ok(());
84 }
85 }
86 self.last_poll_time = Some(Instant::now());
87
88 let catalog = self
90 .catalog
91 .as_ref()
92 .ok_or_else(|| ConnectorError::InvalidState {
93 expected: "open".into(),
94 actual: "catalog not initialized".into(),
95 })?;
96 let table = super::iceberg_io::load_table(
97 catalog.as_ref(),
98 &self.config.catalog.namespace,
99 &self.config.catalog.table_name,
100 )
101 .await?;
102 self.table = Some(table);
103
104 let table = self.table.as_ref().unwrap();
105 let current_snap = super::iceberg_io::current_snapshot_id(table);
106
107 if let Some(pinned) = self.config.snapshot_id {
109 if self.last_snapshot_id.is_some() {
110 return Ok(());
111 }
112 let batches =
113 super::iceberg_io::scan_table(table, Some(pinned), &self.config.select_columns)
114 .await?;
115 self.buffer.extend(batches);
116 self.last_snapshot_id = Some(pinned);
117 return Ok(());
118 }
119
120 if current_snap == self.last_snapshot_id {
121 return Ok(());
122 }
123
124 let batches = if let (Some(old), Some(new)) = (self.last_snapshot_id, current_snap) {
126 super::iceberg_incremental::scan_incremental(
127 table,
128 old,
129 new,
130 &self.config.select_columns,
131 )
132 .await?
133 } else {
134 super::iceberg_io::scan_table(table, current_snap, &self.config.select_columns).await?
135 };
136
137 if self.schema.is_none() {
138 if let Some(first) = batches.first() {
139 self.schema = Some(first.schema());
140 }
141 }
142
143 let rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
144 debug!(snapshot = ?current_snap, rows, "iceberg source loaded snapshot");
145
146 self.buffer.extend(batches);
147 self.last_snapshot_id = current_snap;
148
149 Ok(())
150 }
151}
152
153#[async_trait]
154impl SourceConnector for IcebergSource {
155 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
156 if !config.properties().is_empty() {
158 self.config = IcebergSourceConfig::from_config(config)?;
159 }
160
161 #[cfg(feature = "iceberg")]
162 {
163 let catalog = super::iceberg_io::build_catalog(&self.config.catalog).await?;
164 let table = super::iceberg_io::load_table(
165 catalog.as_ref(),
166 &self.config.catalog.namespace,
167 &self.config.catalog.table_name,
168 )
169 .await?;
170
171 let iceberg_schema = table.current_schema_ref();
172 let arrow_schema =
173 iceberg::arrow::schema_to_arrow_schema(&iceberg_schema).map_err(|e| {
174 ConnectorError::SchemaMismatch(format!("iceberg→arrow schema: {e}"))
175 })?;
176 self.schema = Some(Arc::new(arrow_schema));
177
178 info!(
179 table = self.config.catalog.table_name,
180 namespace = self.config.catalog.namespace,
181 "iceberg source connected"
182 );
183
184 self.catalog = Some(catalog);
185 self.table = Some(table);
186 self.state = ConnectorState::Running;
187 return Ok(());
188 }
189
190 #[cfg(not(feature = "iceberg"))]
191 {
192 self.state = ConnectorState::Failed;
193 Err(ConnectorError::ConfigurationError(
194 "Apache Iceberg requires the 'iceberg' feature".into(),
195 ))
196 }
197 }
198
199 async fn poll_batch(
200 &mut self,
201 max_records: usize,
202 ) -> Result<Option<SourceBatch>, ConnectorError> {
203 if let Some(batch) = self.buffer.pop_front() {
204 if batch.num_rows() <= max_records {
205 return Ok(Some(SourceBatch::new(batch)));
206 }
207 let take = batch.slice(0, max_records);
208 let remainder = batch.slice(max_records, batch.num_rows() - max_records);
209 self.buffer.push_front(remainder);
210 return Ok(Some(SourceBatch::new(take)));
211 }
212
213 #[cfg(feature = "iceberg")]
214 self.refresh().await?;
215
216 Ok(self.buffer.pop_front().map(SourceBatch::new))
217 }
218
219 fn schema(&self) -> SchemaRef {
220 self.schema
221 .clone()
222 .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
223 }
224
225 fn checkpoint(&self) -> SourceCheckpoint {
226 let mut cp = SourceCheckpoint::new(self.epoch);
227 if let Some(sid) = self.last_snapshot_id {
228 cp.set_offset("snapshot_id", sid.to_string());
229 }
230 cp.set_metadata("connector_type", "iceberg");
231 cp
232 }
233
234 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
235 self.epoch = checkpoint.epoch();
236 if let Some(sid) = checkpoint.get_offset("snapshot_id") {
237 self.last_snapshot_id = Some(sid.parse().map_err(|_| {
238 ConnectorError::Internal(format!("invalid snapshot_id in checkpoint: '{sid}'"))
239 })?);
240 debug!(snapshot_id = ?self.last_snapshot_id, "iceberg source restored");
241 }
242 Ok(())
243 }
244
245 fn supports_replay(&self) -> bool {
246 false
247 }
248
249 fn health_check(&self) -> HealthStatus {
250 match self.state {
251 ConnectorState::Running => HealthStatus::Healthy,
252 ConnectorState::Failed => HealthStatus::Unhealthy("source failed".into()),
253 _ => HealthStatus::Unknown,
254 }
255 }
256
257 fn metrics(&self) -> ConnectorMetrics {
258 ConnectorMetrics::default()
259 }
260
261 async fn close(&mut self) -> Result<(), ConnectorError> {
262 #[cfg(feature = "iceberg")]
263 {
264 self.catalog = None;
265 self.table = None;
266 }
267 self.state = ConnectorState::Closed;
268 Ok(())
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ConnectorConfig;

    /// Builds a minimal valid source config pointing at a local REST catalog.
    fn test_source_config() -> IcebergSourceConfig {
        let mut raw = ConnectorConfig::new("iceberg");
        for (key, value) in [
            ("catalog.uri", "http://localhost:8181"),
            ("warehouse", "s3://test/wh"),
            ("namespace", "test"),
            ("table.name", "dim_customers"),
        ] {
            raw.set(key, value);
        }
        IcebergSourceConfig::from_config(&raw).unwrap()
    }

    /// A freshly constructed source has no schema, no snapshot, no buffered data.
    #[test]
    fn test_new_source() {
        let src = IcebergSource::new(test_source_config(), None);
        assert!(src.schema.is_none());
        assert!(src.last_snapshot_id.is_none());
        assert!(src.buffer.is_empty());
    }

    /// A checkpoint carries the epoch, snapshot offset, and connector type.
    #[test]
    fn test_checkpoint_round_trip() {
        let mut src = IcebergSource::new(test_source_config(), None);
        src.last_snapshot_id = Some(42);
        src.epoch = 5;

        let snapshot = src.checkpoint();
        assert_eq!(snapshot.epoch(), 5);
        assert_eq!(snapshot.get_offset("snapshot_id"), Some("42"));
        assert_eq!(snapshot.get_metadata("connector_type"), Some("iceberg"));
    }

    /// Restoring applies both the epoch and the parsed snapshot id.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut src = IcebergSource::new(test_source_config(), None);
        let mut saved = SourceCheckpoint::new(10);
        saved.set_offset("snapshot_id", "123");

        src.restore(&saved).await.unwrap();
        assert_eq!(src.epoch, 10);
        assert_eq!(src.last_snapshot_id, Some(123));
    }

    /// The Iceberg source does not advertise replay support.
    #[test]
    fn test_supports_replay_false() {
        let src = IcebergSource::new(test_source_config(), None);
        assert!(!src.supports_replay());
    }
}