laminar_connectors/lookup/
delta_reference.rs1use std::collections::VecDeque;
8
9use arrow_array::RecordBatch;
10use tracing::info;
11
12#[cfg(feature = "delta-lake")]
13use std::time::Instant;
14
15#[cfg(feature = "delta-lake")]
16use tracing::{debug, warn};
17
18#[cfg(feature = "delta-lake")]
19use deltalake::DeltaTable;
20
21use crate::checkpoint::SourceCheckpoint;
22use crate::config::ConnectorConfig;
23use crate::error::ConnectorError;
24use crate::lakehouse::delta_source_config::DeltaSourceConfig;
25#[cfg(feature = "delta-lake")]
26use crate::lakehouse::delta_source_config::SchemaEvolutionAction;
27use crate::reference::ReferenceTableSource;
28
/// Lifecycle state of the lookup source.
///
/// Observed transitions in this file: `Init` -> `Snapshot` -> `Changes`
/// (driven by `poll_snapshot`), and any state -> `Closed` (via `close`).
#[allow(dead_code)] enum Phase {
    /// Constructed; the Delta table has not been opened yet.
    Init,
    /// Initial snapshot batches are being handed out.
    Snapshot,
    /// Snapshot drained; polling the table for new versions.
    Changes,
    /// `close` was called; no further batches will be produced.
    Closed,
}
37
/// Reference-table (lookup) source backed by a Delta Lake table: loads a
/// full snapshot at the latest version, then polls for newer versions and
/// surfaces their row diffs as change batches.
pub struct DeltaReferenceTableSource {
    // Parsed source configuration (table path, catalog, poll interval,
    // partition filter, schema-evolution action).
    #[allow(dead_code)] config: DeltaSourceConfig,
    // Current lifecycle state; see `Phase`.
    phase: Phase,
    // Open table handle; `None` until `open_table` succeeds, reset on close.
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    // Last fully-delivered table version; -1 before the snapshot is loaded.
    current_version: i64,
    // Version whose diff batches are queued but not yet fully handed out;
    // committed into `current_version` once `pending_batches` drains.
    #[cfg(feature = "delta-lake")]
    inflight_version: Option<i64>,
    // Batches waiting to be returned by `poll_snapshot` / `poll_changes`.
    pending_batches: VecDeque<RecordBatch>,
    // Timestamp of the last version poll, used to rate-limit change checks.
    #[cfg(feature = "delta-lake")]
    last_version_check: Option<Instant>,
    // Schema seen most recently, used to detect schema evolution between
    // versions in `check_for_changes`.
    #[cfg(feature = "delta-lake")]
    known_schema: Option<arrow_schema::SchemaRef>,
}
60
impl DeltaReferenceTableSource {
    /// Creates a source in the `Init` phase from an already-parsed
    /// [`DeltaSourceConfig`]. No I/O happens until `poll_snapshot`.
    #[must_use]
    pub fn from_source_config(config: DeltaSourceConfig) -> Self {
        Self {
            config,
            phase: Phase::Init,
            #[cfg(feature = "delta-lake")]
            table: None,
            // -1 marks "no version consumed yet".
            current_version: -1,
            #[cfg(feature = "delta-lake")]
            inflight_version: None,
            pending_batches: VecDeque::new(),
            #[cfg(feature = "delta-lake")]
            last_version_check: None,
            #[cfg(feature = "delta-lake")]
            known_schema: None,
        }
    }

    /// Parses a generic [`ConnectorConfig`] into a [`DeltaSourceConfig`]
    /// and builds the source.
    ///
    /// # Errors
    /// Propagates configuration errors from `DeltaSourceConfig::from_config`
    /// (e.g. a missing table path — see `test_from_connector_config_missing_path`).
    pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let source_config = DeltaSourceConfig::from_config(config)?;
        Ok(Self::from_source_config(source_config))
    }

    /// Resolves the configured catalog to a concrete path + storage options
    /// and opens the Delta table, storing the handle in `self.table`.
    #[cfg(feature = "delta-lake")]
    async fn open_table(&mut self) -> Result<(), ConnectorError> {
        use crate::lakehouse::delta_io;

        // Catalog indirection: the configured path may be a logical name
        // that resolve_catalog_options maps to a physical location.
        let (resolved_path, resolved_opts) = delta_io::resolve_catalog_options(
            &self.config.catalog_type,
            self.config.catalog_database.as_deref(),
            self.config.catalog_name.as_deref(),
            self.config.catalog_schema.as_deref(),
            &self.config.table_path,
            &self.config.storage_options,
        )
        .await?;

        info!(
            table_path = %self.config.table_path,
            resolved_path = %resolved_path,
            catalog = %self.config.catalog_type,
            "delta lookup: resolved catalog"
        );

        // No schema is passed (None): for a lookup source the table is
        // expected to exist; presumably open_or_create_table only creates
        // when given a schema — TODO confirm against delta_io.
        let table = delta_io::open_or_create_table(&resolved_path, resolved_opts, None).await?;

        info!(
            resolved_path = %resolved_path,
            table_version = table.version().unwrap_or(0),
            "delta lookup: table opened"
        );

        self.table = Some(table);
        Ok(())
    }

    /// Loads the full table contents at the latest version into
    /// `pending_batches` and records that version as `current_version`.
    /// Applies the partition filter (via DataFusion) when one is configured.
    #[cfg(feature = "delta-lake")]
    async fn load_snapshot(&mut self) -> Result<(), ConnectorError> {
        use crate::lakehouse::delta_io;

        let table = self
            .table
            .as_mut()
            .ok_or_else(|| ConnectorError::Internal("table not opened".into()))?;

        let latest = delta_io::get_latest_version(table).await?;

        let batches = if self.config.partition_filter.is_some() {
            // Filtered path runs a DataFusion scan with the SQL predicate.
            self.load_snapshot_filtered(latest).await?
        } else {
            // Unfiltered path reads everything at `latest` (no row cap).
            let (b, _) = delta_io::read_batches_at_version(table, latest, usize::MAX).await?;
            b
        };

        // Seed `known_schema` so later schema-evolution checks have a
        // baseline; fall back to the table metadata when the snapshot is
        // empty (re-borrowed here because the earlier `&mut` has ended).
        if let Some(first) = batches.first() {
            self.known_schema = Some(first.schema());
        } else if self.known_schema.is_none() {
            let table = self
                .table
                .as_ref()
                .ok_or_else(|| ConnectorError::Internal("table not opened".into()))?;
            if let Ok(s) = delta_io::get_table_schema(table) {
                self.known_schema = Some(s);
            }
        }

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        info!(
            version = latest,
            batches = batches.len(),
            rows = total_rows,
            partition_filter = ?self.config.partition_filter,
            "delta lookup: snapshot loaded"
        );

        self.pending_batches = VecDeque::from(batches);
        self.current_version = latest;
        self.last_version_check = Some(Instant::now());
        Ok(())
    }

    /// Reads the table at `version` through DataFusion, applying the
    /// configured partition filter as a SQL `WHERE` clause. Empty batches
    /// are dropped.
    ///
    /// NOTE(review): `partition_filter` is interpolated directly into SQL;
    /// it is operator-supplied configuration, not end-user input, but worth
    /// confirming that assumption holds at the deployment boundary.
    #[cfg(feature = "delta-lake")]
    async fn load_snapshot_filtered(
        &mut self,
        version: i64,
    ) -> Result<Vec<RecordBatch>, ConnectorError> {
        use tokio_stream::StreamExt;

        let table = self
            .table
            .as_mut()
            .ok_or_else(|| ConnectorError::Internal("table not opened".into()))?;

        // Pin the table handle to the requested version before scanning.
        table
            .load_version(version)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("load version {version}: {e}")))?;

        let provider = table
            .table_provider()
            .build()
            .await
            .map_err(|e| ConnectorError::ReadError(format!("build table provider: {e}")))?;

        // Fresh session per call; the registration name is only scoped to it.
        let ctx = datafusion::prelude::SessionContext::new();
        ctx.register_table("delta_lookup_scan", std::sync::Arc::new(provider))
            .map_err(|e| ConnectorError::ReadError(format!("register scan: {e}")))?;

        // "1=1" keeps the query valid if the filter vanished between the
        // is_some() check in load_snapshot and here.
        let filter = self.config.partition_filter.as_deref().unwrap_or("1=1");
        let sql = format!("SELECT * FROM delta_lookup_scan WHERE {filter}");

        let df = ctx
            .sql(&sql)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("filtered scan: {e}")))?;

        let mut stream = df
            .execute_stream()
            .await
            .map_err(|e| ConnectorError::ReadError(format!("stream: {e}")))?;

        let mut batches = Vec::new();
        while let Some(result) = stream.next().await {
            let batch = result.map_err(|e| ConnectorError::ReadError(format!("batch: {e}")))?;
            if batch.num_rows() > 0 {
                batches.push(batch);
            }
        }

        Ok(batches)
    }

    /// Polls the table for versions newer than `current_version`, reading
    /// at most `MAX_VERSIONS_PER_POLL` version diffs per call. Returns the
    /// first diff batch (if any) and queues the rest in `pending_batches`.
    #[cfg(feature = "delta-lake")]
    async fn check_for_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
        use crate::lakehouse::delta_io;

        // Caps catch-up work per poll so a long backlog cannot stall a call.
        const MAX_VERSIONS_PER_POLL: i64 = 10;

        // Rate-limit: skip the poll entirely until poll_interval elapses.
        if let Some(last) = self.last_version_check {
            if last.elapsed() < self.config.poll_interval {
                return Ok(None);
            }
        }

        let table = self
            .table
            .as_mut()
            .ok_or_else(|| ConnectorError::Internal("table not opened".into()))?;

        let latest = delta_io::get_latest_version(table).await?;

        if latest <= self.current_version {
            self.last_version_check = Some(Instant::now());
            return Ok(None);
        }

        let target = latest.min(self.current_version + MAX_VERSIONS_PER_POLL);

        let mut all_batches: Vec<RecordBatch> = Vec::new();
        for v in (self.current_version + 1)..=target {
            // Presumably yields the rows added by version `v` (honoring the
            // partition filter) — TODO confirm read_version_diff semantics.
            let (batches, _) = delta_io::read_version_diff(
                table,
                v,
                usize::MAX,
                self.config.partition_filter.as_deref(),
            )
            .await?;

            // Schema-evolution detection against the last-known schema.
            if let Some(first) = batches.first() {
                let new_schema = first.schema();
                if let Some(known) = &self.known_schema {
                    if known.as_ref() != new_schema.as_ref() {
                        match self.config.schema_evolution_action {
                            SchemaEvolutionAction::Warn => {
                                // Warn-and-adopt: keep streaming with the new schema.
                                warn!(version = v, "delta lookup: schema changed between versions");
                                self.known_schema = Some(new_schema);
                            }
                            SchemaEvolutionAction::Error => {
                                // Mark everything before `v` consumed so a retry
                                // fails again exactly at `v`.
                                // NOTE(review): batches already read from versions
                                // current_version+1..v are dropped here without
                                // being delivered — confirm this loss is intended
                                // for action=error.
                                if v > self.current_version + 1 {
                                    self.current_version = v - 1;
                                }
                                return Err(ConnectorError::Internal(format!(
                                    "delta lookup: schema evolution detected at version {v} \
                                     (action=error)"
                                )));
                            }
                        }
                    }
                } else {
                    self.known_schema = Some(new_schema);
                }
            }

            all_batches.extend(batches);
        }

        let total_rows: usize = all_batches.iter().map(RecordBatch::num_rows).sum();
        if total_rows > 0 {
            debug!(
                from = self.current_version + 1,
                to = target,
                latest,
                rows = total_rows,
                "delta lookup: version diff loaded"
            );
        }

        // If we capped short of `latest`, clear the throttle so the next
        // poll continues catching up immediately.
        if target < latest {
            self.last_version_check = None;
        } else {
            self.last_version_check = Some(Instant::now());
        }

        // Hand the first batch back directly; queue the remainder.
        let mut batch_iter = all_batches.into_iter();
        let first = batch_iter.next();
        self.pending_batches.extend(batch_iter);

        // With nothing queued (zero or one batch total) `target` is fully
        // delivered by this call; otherwise hold it in-flight until the
        // queue drains in poll_changes.
        if self.pending_batches.is_empty() {
            self.current_version = target;
        } else {
            self.inflight_version = Some(target);
        }
        Ok(first)
    }
}
325
#[async_trait::async_trait]
impl ReferenceTableSource for DeltaReferenceTableSource {
    /// Drives the initial snapshot: on the first call opens the table and
    /// loads the latest version, then returns one queued batch per call.
    /// Returns `Ok(None)` and flips the phase to `Changes` once drained.
    async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
        if matches!(self.phase, Phase::Init) {
            #[cfg(feature = "delta-lake")]
            {
                self.open_table().await?;
                self.load_snapshot().await?;
                self.phase = Phase::Snapshot;
            }

            // Without the feature this source is unusable; fail loudly with
            // a build hint rather than silently yielding no data.
            #[cfg(not(feature = "delta-lake"))]
            {
                return Err(ConnectorError::ConfigurationError(
                    "Delta Lake lookup requires the 'delta-lake' feature. \
                     Build with: cargo build --features delta-lake"
                        .into(),
                ));
            }
        }

        if !matches!(self.phase, Phase::Snapshot) {
            return Ok(None);
        }

        if let Some(batch) = self.pending_batches.pop_front() {
            return Ok(Some(batch));
        }

        // Snapshot queue drained: switch to change-polling mode.
        self.phase = Phase::Changes;
        Ok(None)
    }

    /// True once the snapshot has been fully delivered (or the source closed).
    fn is_snapshot_complete(&self) -> bool {
        matches!(self.phase, Phase::Changes | Phase::Closed)
    }

    /// Returns the next change batch, draining queued batches before
    /// polling the table for new versions. `Ok(None)` means "no changes
    /// right now" (or wrong phase / feature disabled).
    async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
        if !matches!(self.phase, Phase::Changes) {
            return Ok(None);
        }

        if let Some(batch) = self.pending_batches.pop_front() {
            // Popping the last queued batch completes the in-flight version,
            // so checkpoints only ever record fully-delivered versions.
            #[cfg(feature = "delta-lake")]
            if self.pending_batches.is_empty() {
                if let Some(v) = self.inflight_version.take() {
                    self.current_version = v;
                }
            }
            return Ok(Some(batch));
        }

        #[cfg(feature = "delta-lake")]
        {
            return self.check_for_changes().await;
        }

        #[cfg(not(feature = "delta-lake"))]
        Ok(None)
    }

    /// Captures the last fully-delivered table version under the
    /// "delta_version" offset key.
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", self.current_version.to_string());
        cp
    }

    /// Restores `current_version` from a checkpoint.
    ///
    /// NOTE(review): the phase stays `Init`, so a subsequent `poll_snapshot`
    /// reloads the full latest snapshot and overwrites the restored version —
    /// presumably intended for a lookup table (full reload on restart), but
    /// worth confirming.
    ///
    /// # Errors
    /// Returns `ConnectorError::Internal` when the stored offset is not a
    /// valid integer.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        if let Some(v) = checkpoint.get_offset("delta_version") {
            self.current_version = v.parse().map_err(|_| {
                ConnectorError::Internal(format!("invalid delta_version in checkpoint: '{v}'"))
            })?;
            info!(
                version = self.current_version,
                "delta lookup: restored from checkpoint"
            );
        }
        Ok(())
    }

    /// Releases the table handle and any queued batches; the phase becomes
    /// `Closed` so further polls return `Ok(None)`.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.phase = Phase::Closed;
        #[cfg(feature = "delta-lake")]
        {
            self.table = None;
        }
        self.pending_batches.clear();
        Ok(())
    }
}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lakehouse::delta_source_config::DeltaSourceConfig;

    // Construction from a parsed config starts in Init with no version.
    #[test]
    fn test_from_source_config() {
        let config = DeltaSourceConfig::new("/tmp/test_delta");
        let src = DeltaReferenceTableSource::from_source_config(config);
        assert!(!src.is_snapshot_complete());
        assert_eq!(src.current_version, -1);
    }

    // Construction from a raw connector config with a table path succeeds.
    #[test]
    fn test_from_connector_config() {
        let mut config = ConnectorConfig::new("delta-lake");
        config.set("table.path", "/tmp/test_delta");
        let src = DeltaReferenceTableSource::from_connector_config(&config).unwrap();
        assert!(!src.is_snapshot_complete());
    }

    // Missing table.path is a configuration error.
    #[test]
    fn test_from_connector_config_missing_path() {
        let config = ConnectorConfig::new("delta-lake");
        assert!(DeltaReferenceTableSource::from_connector_config(&config).is_err());
    }

    // checkpoint() serializes current_version under "delta_version".
    #[test]
    fn test_checkpoint_round_trip() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        src.current_version = 42;
        let cp = src.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("42"));
    }

    // restore() parses the stored offset back into current_version.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "17");
        src.restore(&cp).await.unwrap();
        assert_eq!(src.current_version, 17);
    }

    // A non-numeric stored offset is rejected.
    #[tokio::test]
    async fn test_restore_invalid_version() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "not_a_number");
        assert!(src.restore(&cp).await.is_err());
    }

    // close() moves to Closed, which also reports the snapshot as complete.
    #[tokio::test]
    async fn test_close_sets_phase() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        src.close().await.unwrap();
        assert!(src.is_snapshot_complete());
    }

    // poll_changes is a no-op while still in the Init phase.
    #[tokio::test]
    async fn test_poll_changes_before_snapshot_returns_none() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        assert!(src.poll_changes().await.unwrap().is_none());
    }

    // End-to-end tests against a real local Delta table; require the
    // delta-lake feature.
    #[cfg(feature = "delta-lake")]
    mod integration {
        use super::*;
        use arrow_array::{Int64Array, StringArray};
        use arrow_schema::{DataType, Field, Schema, SchemaRef};
        use std::collections::HashMap;
        use std::sync::Arc;
        use tempfile::TempDir;

        // Fixed two-column schema shared by all integration fixtures.
        fn test_schema() -> SchemaRef {
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
            ]))
        }

        // Builds a batch of (id, name) rows; slices must be equal length.
        fn test_batch(ids: &[i64], names: &[&str]) -> RecordBatch {
            RecordBatch::try_new(
                test_schema(),
                vec![
                    Arc::new(Int64Array::from(ids.to_vec())),
                    Arc::new(StringArray::from(names.to_vec())),
                ],
            )
            .unwrap()
        }

        // Appends `batches` as a new table version at `path`, creating the
        // table on first use; returns the committed version number.
        async fn write_delta_version(path: &str, batches: Vec<RecordBatch>, epoch: u64) -> i64 {
            use crate::lakehouse::delta_io;
            use deltalake::protocol::SaveMode;

            let schema = test_schema();
            let table = delta_io::open_or_create_table(path, HashMap::new(), Some(&schema))
                .await
                .unwrap();

            let (_table, version) = delta_io::write_batches(
                table,
                batches,
                "test-writer",
                epoch,
                SaveMode::Append,
                None,
                false,
                None,
                false,
                None,
            )
            .await
            .unwrap();
            version
        }

        // Full snapshot lifecycle: all rows delivered, phase flips to
        // Changes, and subsequent polls return None.
        #[tokio::test]
        async fn test_snapshot_lifecycle() {
            let temp_dir = TempDir::new().unwrap();
            let table_path = temp_dir.path().to_str().unwrap();

            let batch = test_batch(&[1, 2, 3], &["Alice", "Bob", "Carol"]);
            write_delta_version(table_path, vec![batch], 1).await;

            let config = DeltaSourceConfig::new(table_path);
            let mut src = DeltaReferenceTableSource::from_source_config(config);

            assert!(!src.is_snapshot_complete());

            let mut total_rows = 0;
            while let Some(batch) = src.poll_snapshot().await.unwrap() {
                total_rows += batch.num_rows();
            }
            assert_eq!(total_rows, 3);
            assert!(src.is_snapshot_complete());
            assert!(src.poll_snapshot().await.unwrap().is_none());
            assert!(src.poll_changes().await.unwrap().is_none());

            src.close().await.unwrap();
        }

        // After a snapshot, the checkpoint records the loaded version.
        #[tokio::test]
        async fn test_checkpoint_preserves_version() {
            let temp_dir = TempDir::new().unwrap();
            let table_path = temp_dir.path().to_str().unwrap();

            write_delta_version(table_path, vec![test_batch(&[1], &["Alice"])], 1).await;

            let config = DeltaSourceConfig::new(table_path);
            let mut src = DeltaReferenceTableSource::from_source_config(config);
            while src.poll_snapshot().await.unwrap().is_some() {}

            let cp = src.checkpoint();
            assert!(src.current_version >= 0);
            assert_eq!(
                cp.get_offset("delta_version"),
                Some(src.current_version.to_string().as_str())
            );
            src.close().await.unwrap();
        }

        // A version written after the snapshot is surfaced by poll_changes
        // and advances current_version.
        #[tokio::test]
        async fn test_poll_changes_picks_up_new_version() {
            let temp_dir = TempDir::new().unwrap();
            let table_path = temp_dir.path().to_str().unwrap();

            write_delta_version(table_path, vec![test_batch(&[1, 2], &["Alice", "Bob"])], 1).await;

            let mut config = DeltaSourceConfig::new(table_path);
            // Zero interval disables the rate limiter so the test needs no sleeps.
            config.poll_interval = std::time::Duration::from_millis(0);
            let mut src = DeltaReferenceTableSource::from_source_config(config);
            while src.poll_snapshot().await.unwrap().is_some() {}
            let v1 = src.current_version;

            write_delta_version(table_path, vec![test_batch(&[3], &["Carol"])], 2).await;

            let mut change_rows = 0;
            while let Some(batch) = src.poll_changes().await.unwrap() {
                change_rows += batch.num_rows();
            }
            assert_eq!(change_rows, 1);
            assert!(src.current_version > v1);
            src.close().await.unwrap();
        }

        // Explicit catalog.type=none goes straight to the path-based table.
        #[tokio::test]
        async fn test_from_connector_config_with_catalog_none() {
            let temp_dir = TempDir::new().unwrap();
            let table_path = temp_dir.path().to_str().unwrap();

            write_delta_version(table_path, vec![test_batch(&[10], &["Test"])], 1).await;

            let mut config = ConnectorConfig::new("delta-lake");
            config.set("table.path", table_path);
            config.set("catalog.type", "none");

            let mut src = DeltaReferenceTableSource::from_connector_config(&config).unwrap();

            let mut total_rows = 0;
            while let Some(batch) = src.poll_snapshot().await.unwrap() {
                total_rows += batch.num_rows();
            }
            assert_eq!(total_rows, 1);
            assert!(src.is_snapshot_complete());
            src.close().await.unwrap();
        }
    }
}