laminar_connectors/lakehouse/
delta_source.rs1use std::collections::VecDeque;
4use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8use async_trait::async_trait;
9#[cfg(feature = "delta-lake")]
10use std::time::Instant;
11#[cfg(feature = "delta-lake")]
12use tracing::debug;
13use tracing::info;
14#[cfg(feature = "delta-lake")]
15use tracing::warn;
16
17#[cfg(feature = "delta-lake")]
18use deltalake::DeltaTable;
19
20use crate::checkpoint::SourceCheckpoint;
21use crate::config::{ConnectorConfig, ConnectorState};
22use crate::connector::{SourceBatch, SourceConnector};
23use crate::error::ConnectorError;
24
25use super::delta_source_config::DeltaSourceConfig;
26#[cfg(feature = "delta-lake")]
27use super::delta_source_config::{DeltaReadMode, SchemaEvolutionAction};
28
29pub struct DeltaSource {
43 config: DeltaSourceConfig,
45 state: ConnectorState,
47 schema: Option<SchemaRef>,
49 current_version: i64,
52 #[cfg(feature = "delta-lake")]
56 inflight_version: Option<i64>,
57 #[cfg(feature = "delta-lake")]
61 known_latest_version: i64,
62 pending_batches: VecDeque<RecordBatch>,
64 records_read: u64,
66 #[cfg(feature = "delta-lake")]
68 table: Option<DeltaTable>,
69 #[cfg(feature = "delta-lake")]
73 last_version_check: Option<Instant>,
74 #[cfg(feature = "delta-lake")]
77 projection_indices: Option<Vec<Option<usize>>>,
78}
79
80impl DeltaSource {
81 #[must_use]
83 pub fn new(config: DeltaSourceConfig, _registry: Option<&prometheus::Registry>) -> Self {
84 Self {
85 config,
86 state: ConnectorState::Created,
87 schema: None,
88 current_version: -1,
89 #[cfg(feature = "delta-lake")]
90 inflight_version: None,
91 #[cfg(feature = "delta-lake")]
92 known_latest_version: -1,
93 pending_batches: VecDeque::new(),
94 records_read: 0,
95 #[cfg(feature = "delta-lake")]
96 table: None,
97 #[cfg(feature = "delta-lake")]
98 last_version_check: None,
99 #[cfg(feature = "delta-lake")]
100 projection_indices: None,
101 }
102 }
103
104 #[must_use]
106 pub fn state(&self) -> ConnectorState {
107 self.state
108 }
109
110 #[must_use]
112 pub fn current_version(&self) -> i64 {
113 self.current_version
114 }
115
116 #[must_use]
118 pub fn config(&self) -> &DeltaSourceConfig {
119 &self.config
120 }
121
122 #[cfg(feature = "delta-lake")]
124 async fn reopen_table(&mut self) -> Result<(), ConnectorError> {
125 use super::delta_io;
126
127 let table = delta_io::open_or_create_table(
128 &self.config.table_path,
129 self.config.storage_options.clone(),
130 None,
131 )
132 .await?;
133
134 self.table = Some(table);
135 Ok(())
136 }
137}
138
139#[async_trait]
140#[allow(clippy::too_many_lines)]
141impl SourceConnector for DeltaSource {
142 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
143 self.state = ConnectorState::Initializing;
144
145 if !config.properties().is_empty() {
147 self.config = DeltaSourceConfig::from_config(config)?;
148 }
149
150 info!(
151 table_path = %self.config.table_path,
152 starting_version = ?self.config.starting_version,
153 "opening Delta Lake source connector"
154 );
155
156 #[cfg(feature = "delta-lake")]
157 {
158 use super::delta_io;
159
160 let table = delta_io::open_or_create_table(
162 &self.config.table_path,
163 self.config.storage_options.clone(),
164 None,
165 )
166 .await?;
167
168 if let Ok(schema) = delta_io::get_table_schema(&table) {
170 self.schema = Some(schema);
171 }
172
173 if let Some(start) = self.config.starting_version {
176 self.current_version = start;
177 } else {
178 self.current_version = -1;
179 }
180 let table_version = table.version().unwrap_or(0);
181
182 info!(
183 table_path = %self.config.table_path,
184 table_version,
185 current_version = self.current_version,
186 "Delta Lake source: resolved starting version"
187 );
188
189 self.table = Some(table);
190 }
191
192 #[cfg(not(feature = "delta-lake"))]
193 {
194 self.state = ConnectorState::Failed;
195 return Err(ConnectorError::ConfigurationError(
196 "Delta Lake source requires the 'delta-lake' feature to be enabled. \
197 Build with: cargo build --features delta-lake"
198 .into(),
199 ));
200 }
201
202 #[cfg(feature = "delta-lake")]
203 {
204 self.state = ConnectorState::Running;
205 info!("Delta Lake source connector opened successfully");
206 Ok(())
207 }
208 }
209
210 #[allow(unused_variables)]
211 async fn poll_batch(
212 &mut self,
213 max_records: usize,
214 ) -> Result<Option<SourceBatch>, ConnectorError> {
215 if self.state != ConnectorState::Running {
216 return Err(ConnectorError::InvalidState {
217 expected: "Running".into(),
218 actual: self.state.to_string(),
219 });
220 }
221
222 if let Some(batch) = self.pending_batches.pop_front() {
226 self.records_read += batch.num_rows() as u64;
227
228 #[cfg(feature = "delta-lake")]
229 if self.pending_batches.is_empty() {
230 if let Some(v) = self.inflight_version.take() {
231 self.current_version = v;
232 }
233 }
234
235 return Ok(Some(SourceBatch::new(batch)));
236 }
237
238 #[cfg(feature = "delta-lake")]
240 {
241 use super::delta_io;
242
243 if self.table.is_none() {
245 match self.reopen_table().await {
246 Ok(()) => {
247 info!("Delta Lake source: re-opened table after lost handle");
248 }
249 Err(e) => {
250 warn!(error = %e, "Delta Lake source: reopen failed, will retry");
251 return Ok(None);
252 }
253 }
254 }
255
256 let needs_refresh = self.known_latest_version <= self.current_version;
262 if needs_refresh {
263 if let Some(last_check) = self.last_version_check {
264 if last_check.elapsed() < self.config.poll_interval {
265 return Ok(None);
266 }
267 }
268 self.last_version_check = Some(Instant::now());
269
270 let table = self
271 .table
272 .as_mut()
273 .ok_or_else(|| ConnectorError::InvalidState {
274 expected: "table initialized".into(),
275 actual: "table not initialized".into(),
276 })?;
277 let latest_version = match delta_io::get_latest_version(table).await {
278 Ok(v) => v,
279 Err(e) => {
280 warn!(error = %e, "Delta Lake source: version check failed, will retry");
281 return Ok(None);
282 }
283 };
284 self.known_latest_version = latest_version;
285
286 if latest_version <= self.current_version {
287 return Ok(None); }
289
290 debug!(
291 current_version = self.current_version,
292 latest_version, "Delta Lake source: new version(s) available"
293 );
294 }
295
296 let target_version = match self.config.read_mode {
297 DeltaReadMode::Snapshot => self.known_latest_version,
298 DeltaReadMode::Incremental => self.current_version + 1,
299 };
300
301 let table = self
305 .table
306 .as_mut()
307 .ok_or_else(|| ConnectorError::InvalidState {
308 expected: "table initialized".into(),
309 actual: "table not initialized".into(),
310 })?;
311 let partition_filter = self.config.partition_filter.clone();
312
313 let mut use_snapshot_fallback = false;
316 if self.config.read_mode == DeltaReadMode::Incremental && target_version > 0 {
317 let log_store = table.log_store();
318 if let Ok(None) = log_store.read_commit_entry(target_version).await {
319 warn!(
320 target_version,
321 known_latest = self.known_latest_version,
322 "version unavailable, falling back to snapshot at latest"
323 );
324 use_snapshot_fallback = true;
325 }
326 }
327
328 let target_version = if use_snapshot_fallback {
331 self.known_latest_version
332 } else {
333 target_version
334 };
335
336 let (batches, fully_consumed) = if use_snapshot_fallback {
342 delta_io::read_batches_at_version(table, target_version, max_records).await?
343 } else if self.config.cdf_enabled
344 && self.config.read_mode == DeltaReadMode::Incremental
345 && target_version > 0
346 {
347 let taken_table =
350 self.table
351 .take()
352 .ok_or_else(|| ConnectorError::InvalidState {
353 expected: "table initialized".into(),
354 actual: "table not initialized".into(),
355 })?;
356 let cdf_batches =
357 delta_io::read_cdf_batches(taken_table, target_version, target_version).await?;
358
359 self.reopen_table().await?;
361
362 let mut mapped = Vec::new();
364 for batch in &cdf_batches {
365 if let Some(mapped_batch) = delta_io::map_cdf_to_changelog(batch)? {
366 mapped.push(mapped_batch);
367 }
368 }
369 (mapped, true)
370 } else {
371 match self.config.read_mode {
372 DeltaReadMode::Snapshot => {
373 delta_io::read_batches_at_version(table, target_version, max_records)
374 .await?
375 }
376 DeltaReadMode::Incremental => {
377 let (b, _) = delta_io::read_version_diff(
380 table,
381 target_version,
382 usize::MAX,
383 partition_filter.as_deref(),
384 )
385 .await?;
386 (b, true)
387 }
388 }
389 };
390
391 {
395 let table = self
396 .table
397 .as_ref()
398 .ok_or_else(|| ConnectorError::InvalidState {
399 expected: "table initialized".into(),
400 actual: "table not initialized".into(),
401 })?;
402 if let Ok(snapshot) = table.snapshot() {
403 let new_schema = snapshot.snapshot().arrow_schema();
404 if let Some(existing) = &self.schema {
405 if existing.fields() != new_schema.fields() {
406 match self.config.schema_evolution_action {
407 SchemaEvolutionAction::Warn => {
408 warn!(
409 table_path = %self.config.table_path,
410 old_fields = ?existing.fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
411 new_fields = ?new_schema.fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
412 "Delta Lake source: schema evolved, projecting to original"
413 );
414 let indices: Vec<Option<usize>> = existing
417 .fields()
418 .iter()
419 .map(|f| new_schema.index_of(f.name()).ok())
420 .collect();
421 self.projection_indices = Some(indices);
422 }
425 SchemaEvolutionAction::Error => {
426 return Err(ConnectorError::SchemaMismatch(format!(
427 "schema evolved at version {target_version}"
428 )));
429 }
430 }
431 }
432 } else {
433 self.schema = Some(new_schema);
434 }
435 }
436 }
437
438 for batch in batches {
442 if batch.num_rows() == 0 {
443 continue;
444 }
445 let batch = if let Some(ref indices) = self.projection_indices {
447 let original_schema = self.schema.as_ref().unwrap();
448 let num_rows = batch.num_rows();
449 let columns: Vec<Arc<dyn arrow_array::Array>> = indices
450 .iter()
451 .zip(original_schema.fields())
452 .map(|(idx, field)| match idx {
453 Some(i) => batch.column(*i).clone(),
454 None => arrow_array::new_null_array(field.data_type(), num_rows),
455 })
456 .collect();
457 RecordBatch::try_new(original_schema.clone(), columns).map_err(|e| {
458 ConnectorError::ReadError(format!(
459 "failed to project batch to original schema: {e}"
460 ))
461 })?
462 } else {
463 batch
464 };
465 self.pending_batches.push_back(batch);
466 }
467
468 if !fully_consumed {
469 } else if self.pending_batches.is_empty() {
474 self.current_version = target_version;
476 } else {
477 self.inflight_version = Some(target_version);
479 }
480
481 if let Some(batch) = self.pending_batches.pop_front() {
482 self.records_read += batch.num_rows() as u64;
483
484 if self.pending_batches.is_empty() {
486 if let Some(v) = self.inflight_version.take() {
487 self.current_version = v;
488 }
489 }
490
491 return Ok(Some(SourceBatch::new(batch)));
492 }
493 }
494
495 Ok(None)
496 }
497
498 fn schema(&self) -> SchemaRef {
499 self.schema
500 .clone()
501 .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
502 }
503
504 fn checkpoint(&self) -> SourceCheckpoint {
505 let mut cp = SourceCheckpoint::new(0);
506 cp.set_offset("delta_version", self.current_version.to_string());
507 cp.set_offset("read_mode", self.config.read_mode.to_string());
508 cp
509 }
510
511 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
512 if let Some(version_str) = checkpoint.get_offset("delta_version") {
513 self.current_version = version_str.parse::<i64>().map_err(|_| {
514 ConnectorError::ConfigurationError(format!(
515 "invalid delta_version in checkpoint: '{version_str}'"
516 ))
517 })?;
518 info!(
519 restored_version = self.current_version,
520 "Delta Lake source: restored from checkpoint"
521 );
522 }
523 Ok(())
524 }
525
526 async fn close(&mut self) -> Result<(), ConnectorError> {
527 info!("closing Delta Lake source connector");
528
529 #[cfg(feature = "delta-lake")]
530 {
531 self.table = None;
532 }
533
534 self.pending_batches.clear();
535 self.state = ConnectorState::Closed;
536
537 info!(
538 table_path = %self.config.table_path,
539 current_version = self.current_version,
540 records_read = self.records_read,
541 "Delta Lake source connector closed"
542 );
543
544 Ok(())
545 }
546}
547
548impl std::fmt::Debug for DeltaSource {
549 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
550 f.debug_struct("DeltaSource")
551 .field("state", &self.state)
552 .field("table_path", &self.config.table_path)
553 .field("read_mode", &self.config.read_mode)
554 .field("current_version", &self.current_version)
555 .field("pending_batches", &self.pending_batches.len())
556 .field("records_read", &self.records_read)
557 .finish_non_exhaustive()
558 }
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Config pointing at a scratch path; none of these tests open the table.
    fn test_config() -> DeltaSourceConfig {
        DeltaSourceConfig::new("/tmp/delta_source_test")
    }

    /// Three columns: non-null `id`, nullable `name` and `value`.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds an `n`-row batch matching `test_schema`.
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> RecordBatch {
        let id_col = Int64Array::from_iter_values(0..n as i64);
        let name_col = StringArray::from(vec!["test"; n]);
        let value_col = Float64Array::from_iter_values((0..n).map(|i| i as f64 * 1.5));

        RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(id_col), Arc::new(name_col), Arc::new(value_col)],
        )
        .unwrap()
    }

    #[test]
    fn test_new_defaults() {
        let source = DeltaSource::new(test_config(), None);
        assert!(source.schema.is_none());
        assert_eq!(source.current_version(), -1);
        assert_eq!(source.state(), ConnectorState::Created);
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut source = DeltaSource::new(test_config(), None);
        source.current_version = 42;

        let checkpoint = source.checkpoint();
        assert_eq!(checkpoint.get_offset("delta_version"), Some("42"));
    }

    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut checkpoint = SourceCheckpoint::new(0);
        checkpoint.set_offset("delta_version", "10");

        let mut source = DeltaSource::new(test_config(), None);
        assert_eq!(source.current_version(), -1);

        source.restore(&checkpoint).await.unwrap();
        assert_eq!(source.current_version(), 10);
    }

    #[test]
    fn test_schema_empty_when_none() {
        let field_count = DeltaSource::new(test_config(), None).schema().fields().len();
        assert_eq!(field_count, 0);
    }

    #[tokio::test]
    async fn test_poll_not_running() {
        let mut source = DeltaSource::new(test_config(), None);
        // Freshly constructed sources are `Created`, so polling must be rejected.
        assert!(source.poll_batch(100).await.is_err());
    }

    #[tokio::test]
    async fn test_poll_returns_buffered_batches() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.pending_batches.extend([test_batch(5), test_batch(3)]);

        for expected_rows in [5, 3] {
            let polled = source
                .poll_batch(100)
                .await
                .unwrap()
                .expect("a buffered batch should be returned");
            assert_eq!(polled.records.num_rows(), expected_rows);
        }

        assert_eq!(source.records_read, 8);
    }

    #[tokio::test]
    async fn test_poll_batch_returns_buffered_incrementally() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.pending_batches.extend((0..10).map(|_| test_batch(100)));

        // Buffered batches are returned whole, one per poll, regardless of max_records.
        let polled = source
            .poll_batch(50)
            .await
            .unwrap()
            .expect("a buffered batch should be returned");
        assert_eq!(polled.records.num_rows(), 100);
        assert_eq!(source.pending_batches.len(), 9);
    }

    #[tokio::test]
    async fn test_version_deferred_until_buffer_drained() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.current_version = 5;
        source.pending_batches.extend((0..3).map(|_| test_batch(10)));

        for remaining in (0..3).rev() {
            let polled = source.poll_batch(100).await.unwrap();
            assert!(polled.is_some());
            assert_eq!(source.pending_batches.len(), remaining);
        }

        assert_eq!(source.records_read, 30);
    }

    #[test]
    fn test_poll_interval_is_stored() {
        let interval = std::time::Duration::from_millis(500);
        let mut config = test_config();
        config.poll_interval = interval;

        let source = DeltaSource::new(config, None);
        assert_eq!(source.config().poll_interval, interval);
    }

    #[test]
    fn test_debug_output() {
        let rendered = format!("{:?}", DeltaSource::new(test_config(), None));
        for needle in ["DeltaSource", "/tmp/delta_source_test"] {
            assert!(rendered.contains(needle));
        }
    }

    #[tokio::test]
    async fn test_close() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.pending_batches.push_back(test_batch(5));

        source.close().await.unwrap();

        assert!(source.pending_batches.is_empty());
        assert_eq!(source.state(), ConnectorState::Closed);
    }

    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_open_requires_feature() {
        let connector_config = ConnectorConfig::new("delta-lake");
        let mut source = DeltaSource::new(test_config(), None);

        let err = source
            .open(&connector_config)
            .await
            .expect_err("open must fail without the delta-lake feature")
            .to_string();
        assert!(err.contains("delta-lake"), "error: {err}");
    }
}