1use std::collections::VecDeque;
4use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8use async_trait::async_trait;
9#[cfg(feature = "delta-lake")]
10use std::time::Instant;
11#[cfg(feature = "delta-lake")]
12use tracing::debug;
13use tracing::info;
14#[cfg(feature = "delta-lake")]
15use tracing::warn;
16
17#[cfg(feature = "delta-lake")]
18use deltalake::DeltaTable;
19
20use crate::checkpoint::SourceCheckpoint;
21use crate::config::{ConnectorConfig, ConnectorState};
22use crate::connector::{SourceBatch, SourceConnector};
23use crate::error::ConnectorError;
24use crate::health::HealthStatus;
25use crate::metrics::ConnectorMetrics;
26
27use super::delta_source_config::DeltaSourceConfig;
28#[cfg(feature = "delta-lake")]
29use super::delta_source_config::{DeltaReadMode, SchemaEvolutionAction};
30
/// Streaming source connector that reads record batches out of a Delta Lake
/// table, either as a one-shot snapshot or incrementally version-by-version.
pub struct DeltaSource {
    /// Parsed source configuration (table path, read mode, poll interval, …).
    config: DeltaSourceConfig,
    /// Connector lifecycle state; `poll_batch` only admits `Running`.
    state: ConnectorState,
    /// Arrow schema captured from the table; `None` until first resolved.
    schema: Option<SchemaRef>,
    /// Last fully consumed Delta table version; `-1` means "nothing read yet".
    current_version: i64,
    /// Version whose batches are buffered but not yet fully drained; committed
    /// to `current_version` only once `pending_batches` empties.
    #[cfg(feature = "delta-lake")]
    inflight_version: Option<i64>,
    /// Latest table version observed during the last version check; `-1` until
    /// the first successful check.
    #[cfg(feature = "delta-lake")]
    known_latest_version: i64,
    /// Batches read from the table but not yet handed to the caller.
    pending_batches: VecDeque<RecordBatch>,
    /// Total rows emitted so far (reported via `metrics()`).
    records_read: u64,
    /// Open Delta table handle; `None` before `open()` / after `close()`, and
    /// temporarily taken during CDF reads.
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    /// Timestamp of the last remote version check, used to rate-limit polling
    /// to `config.poll_interval`.
    #[cfg(feature = "delta-lake")]
    last_version_check: Option<Instant>,
    /// Per-field mapping from the original schema into an evolved schema
    /// (`None` entry ⇒ field dropped, filled with nulls); set when schema
    /// evolution is detected with the `Warn` action.
    #[cfg(feature = "delta-lake")]
    projection_indices: Option<Vec<Option<usize>>>,
}
81
82impl DeltaSource {
83 #[must_use]
85 pub fn new(config: DeltaSourceConfig, _registry: Option<&prometheus::Registry>) -> Self {
86 Self {
87 config,
88 state: ConnectorState::Created,
89 schema: None,
90 current_version: -1,
91 #[cfg(feature = "delta-lake")]
92 inflight_version: None,
93 #[cfg(feature = "delta-lake")]
94 known_latest_version: -1,
95 pending_batches: VecDeque::new(),
96 records_read: 0,
97 #[cfg(feature = "delta-lake")]
98 table: None,
99 #[cfg(feature = "delta-lake")]
100 last_version_check: None,
101 #[cfg(feature = "delta-lake")]
102 projection_indices: None,
103 }
104 }
105
106 #[must_use]
108 pub fn state(&self) -> ConnectorState {
109 self.state
110 }
111
112 #[must_use]
114 pub fn current_version(&self) -> i64 {
115 self.current_version
116 }
117
118 #[must_use]
120 pub fn config(&self) -> &DeltaSourceConfig {
121 &self.config
122 }
123
124 #[cfg(feature = "delta-lake")]
126 async fn reopen_table(&mut self) -> Result<(), ConnectorError> {
127 use super::delta_io;
128
129 let table = delta_io::open_or_create_table(
130 &self.config.table_path,
131 self.config.storage_options.clone(),
132 None,
133 )
134 .await?;
135
136 self.table = Some(table);
137 Ok(())
138 }
139}
140
#[async_trait]
#[allow(clippy::too_many_lines)]
impl SourceConnector for DeltaSource {
    /// Opens the connector: applies config overrides, opens (or creates) the
    /// Delta table, captures its schema, and resolves the starting version.
    /// Without the `delta-lake` feature this always fails with a
    /// configuration error.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // Properties supplied at open time replace the constructor config;
        // an empty property map keeps whatever `new()` received.
        if !config.properties().is_empty() {
            self.config = DeltaSourceConfig::from_config(config)?;
        }

        info!(
            table_path = %self.config.table_path,
            starting_version = ?self.config.starting_version,
            "opening Delta Lake source connector"
        );

        #[cfg(feature = "delta-lake")]
        {
            use super::delta_io;

            let table = delta_io::open_or_create_table(
                &self.config.table_path,
                self.config.storage_options.clone(),
                None,
            )
            .await?;

            // Schema capture is best-effort here; on failure `self.schema`
            // stays `None` and is re-resolved during `poll_batch`.
            if let Ok(schema) = delta_io::get_table_schema(&table) {
                self.schema = Some(schema);
            }

            // `current_version` means "last consumed", so an explicit
            // starting version is treated as already consumed and reading
            // begins at `start + 1` (or version 0 when unset).
            if let Some(start) = self.config.starting_version {
                self.current_version = start;
            } else {
                self.current_version = -1;
            }
            let table_version = table.version().unwrap_or(0);

            info!(
                table_path = %self.config.table_path,
                table_version,
                current_version = self.current_version,
                "Delta Lake source: resolved starting version"
            );

            self.table = Some(table);
        }

        #[cfg(not(feature = "delta-lake"))]
        {
            self.state = ConnectorState::Failed;
            return Err(ConnectorError::ConfigurationError(
                "Delta Lake source requires the 'delta-lake' feature to be enabled. \
                 Build with: cargo build --features delta-lake"
                    .into(),
            ));
        }

        #[cfg(feature = "delta-lake")]
        {
            self.state = ConnectorState::Running;
            info!("Delta Lake source connector opened successfully");
            Ok(())
        }
    }

    /// Returns the next buffered batch, reading new table versions on demand.
    ///
    /// Version-commit protocol: `current_version` only advances once every
    /// batch of a version has been handed out; until then the version is
    /// parked in `inflight_version` so a checkpoint taken mid-drain replays
    /// the partially consumed version rather than skipping data.
    ///
    /// # Errors
    /// Returns `InvalidState` when not `Running`, plus any read/projection
    /// errors from the underlying table.
    #[allow(unused_variables)]
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        // Fast path: drain previously buffered batches before touching the
        // table again.
        if let Some(batch) = self.pending_batches.pop_front() {
            self.records_read += batch.num_rows() as u64;

            // Buffer just emptied: commit the in-flight version.
            #[cfg(feature = "delta-lake")]
            if self.pending_batches.is_empty() {
                if let Some(v) = self.inflight_version.take() {
                    self.current_version = v;
                }
            }

            return Ok(Some(SourceBatch::new(batch)));
        }

        #[cfg(feature = "delta-lake")]
        {
            use super::delta_io;

            // The handle can be missing after a failed CDF reopen; retry
            // lazily and report "no data yet" instead of erroring.
            if self.table.is_none() {
                match self.reopen_table().await {
                    Ok(()) => {
                        info!("Delta Lake source: re-opened table after lost handle");
                    }
                    Err(e) => {
                        warn!(error = %e, "Delta Lake source: reopen failed, will retry");
                        return Ok(None);
                    }
                }
            }

            // Only hit the (potentially remote) log when we have no known
            // unread version, and rate-limit those checks to poll_interval.
            let needs_refresh = self.known_latest_version <= self.current_version;
            if needs_refresh {
                if let Some(last_check) = self.last_version_check {
                    if last_check.elapsed() < self.config.poll_interval {
                        return Ok(None);
                    }
                }
                self.last_version_check = Some(Instant::now());

                let table = self
                    .table
                    .as_mut()
                    .ok_or_else(|| ConnectorError::InvalidState {
                        expected: "table initialized".into(),
                        actual: "table not initialized".into(),
                    })?;
                let latest_version = match delta_io::get_latest_version(table).await {
                    Ok(v) => v,
                    Err(e) => {
                        warn!(error = %e, "Delta Lake source: version check failed, will retry");
                        return Ok(None);
                    }
                };
                self.known_latest_version = latest_version;

                // Nothing new has been committed upstream.
                if latest_version <= self.current_version {
                    return Ok(None);
                }

                debug!(
                    current_version = self.current_version,
                    latest_version, "Delta Lake source: new version(s) available"
                );
            }

            // Snapshot mode always jumps to the latest version; incremental
            // mode walks one version at a time.
            let target_version = match self.config.read_mode {
                DeltaReadMode::Snapshot => self.known_latest_version,
                DeltaReadMode::Incremental => self.current_version + 1,
            };

            let table = self
                .table
                .as_mut()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "table initialized".into(),
                    actual: "table not initialized".into(),
                })?;
            let partition_filter = self.config.partition_filter.clone();

            // A commit file may be missing (e.g. vacuumed / checkpointed
            // away); fall back to a full snapshot at the latest version.
            let mut use_snapshot_fallback = false;
            if self.config.read_mode == DeltaReadMode::Incremental && target_version > 0 {
                let log_store = table.log_store();
                if let Ok(None) = log_store.read_commit_entry(target_version).await {
                    warn!(
                        target_version,
                        known_latest = self.known_latest_version,
                        "version unavailable, falling back to snapshot at latest"
                    );
                    use_snapshot_fallback = true;
                }
            }

            let target_version = if use_snapshot_fallback {
                self.known_latest_version
            } else {
                target_version
            };

            // `fully_consumed` tells us whether the whole version was read in
            // this call; only then may `target_version` be committed below.
            let (batches, fully_consumed) = if use_snapshot_fallback {
                delta_io::read_batches_at_version(table, target_version, max_records).await?
            } else if self.config.cdf_enabled
                && self.config.read_mode == DeltaReadMode::Incremental
                && target_version > 0
            {
                // The CDF reader takes ownership of the table handle, so we
                // move it out and reopen afterwards.
                let taken_table =
                    self.table
                        .take()
                        .ok_or_else(|| ConnectorError::InvalidState {
                            expected: "table initialized".into(),
                            actual: "table not initialized".into(),
                        })?;
                let cdf_batches =
                    delta_io::read_cdf_batches(taken_table, target_version, target_version).await?;

                self.reopen_table().await?;

                // Map raw CDF batches into changelog form; `None` results are
                // dropped (presumably rows with no changelog representation —
                // see delta_io::map_cdf_to_changelog).
                let mut mapped = Vec::new();
                for batch in &cdf_batches {
                    if let Some(mapped_batch) = delta_io::map_cdf_to_changelog(batch)? {
                        mapped.push(mapped_batch);
                    }
                }
                (mapped, true)
            } else {
                match self.config.read_mode {
                    DeltaReadMode::Snapshot => {
                        delta_io::read_batches_at_version(table, target_version, max_records)
                            .await?
                    }
                    DeltaReadMode::Incremental => {
                        // usize::MAX: read the entire version diff in one go
                        // so `fully_consumed` can honestly be `true`.
                        let (b, _) = delta_io::read_version_diff(
                            table,
                            target_version,
                            usize::MAX,
                            partition_filter.as_deref(),
                        )
                        .await?;
                        (b, true)
                    }
                }
            };

            // Schema-evolution check against the current table snapshot.
            {
                let table = self
                    .table
                    .as_ref()
                    .ok_or_else(|| ConnectorError::InvalidState {
                        expected: "table initialized".into(),
                        actual: "table not initialized".into(),
                    })?;
                if let Ok(snapshot) = table.snapshot() {
                    let new_schema = snapshot.snapshot().arrow_schema();
                    if let Some(existing) = &self.schema {
                        if existing.fields() != new_schema.fields() {
                            match self.config.schema_evolution_action {
                                SchemaEvolutionAction::Warn => {
                                    warn!(
                                        table_path = %self.config.table_path,
                                        old_fields = ?existing.fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
                                        new_fields = ?new_schema.fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
                                        "Delta Lake source: schema evolved, projecting to original"
                                    );
                                    // For each original field, find its index
                                    // in the evolved schema (None ⇒ dropped).
                                    let indices: Vec<Option<usize>> = existing
                                        .fields()
                                        .iter()
                                        .map(|f| new_schema.index_of(f.name()).ok())
                                        .collect();
                                    self.projection_indices = Some(indices);
                                }
                                SchemaEvolutionAction::Error => {
                                    return Err(ConnectorError::SchemaMismatch(format!(
                                        "schema evolved at version {target_version}"
                                    )));
                                }
                            }
                        }
                    } else {
                        // First resolution: adopt the table schema as-is.
                        self.schema = Some(new_schema);
                    }
                }
            }

            // Buffer non-empty batches, projecting evolved batches back onto
            // the original schema when required.
            for batch in batches {
                if batch.num_rows() == 0 {
                    continue;
                }
                let batch = if let Some(ref indices) = self.projection_indices {
                    // Invariant: projection_indices is only set after
                    // self.schema was populated, so unwrap cannot fail here.
                    let original_schema = self.schema.as_ref().unwrap();
                    let num_rows = batch.num_rows();
                    let columns: Vec<Arc<dyn arrow_array::Array>> = indices
                        .iter()
                        .zip(original_schema.fields())
                        .map(|(idx, field)| match idx {
                            Some(i) => batch.column(*i).clone(),
                            // Dropped columns are back-filled with nulls.
                            None => arrow_array::new_null_array(field.data_type(), num_rows),
                        })
                        .collect();
                    RecordBatch::try_new(original_schema.clone(), columns).map_err(|e| {
                        ConnectorError::ReadError(format!(
                            "failed to project batch to original schema: {e}"
                        ))
                    })?
                } else {
                    batch
                };
                self.pending_batches.push_back(batch);
            }

            // Commit bookkeeping: a partially read version advances nothing;
            // an empty buffer (all batches empty/filtered) commits right away;
            // otherwise the version is parked until the buffer drains.
            if !fully_consumed {
            } else if self.pending_batches.is_empty() {
                self.current_version = target_version;
            } else {
                self.inflight_version = Some(target_version);
            }

            // Hand out the first freshly buffered batch, mirroring the fast
            // path at the top of the method.
            if let Some(batch) = self.pending_batches.pop_front() {
                self.records_read += batch.num_rows() as u64;

                if self.pending_batches.is_empty() {
                    if let Some(v) = self.inflight_version.take() {
                        self.current_version = v;
                    }
                }

                return Ok(Some(SourceBatch::new(batch)));
            }
        }

        Ok(None)
    }

    /// Resolved table schema, or an empty schema before the first resolution.
    fn schema(&self) -> SchemaRef {
        self.schema
            .clone()
            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
    }

    /// Snapshot of the read position: the last fully consumed version and the
    /// read mode (stored for diagnostics; `restore` only reads the version).
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", self.current_version.to_string());
        cp.set_offset("read_mode", self.config.read_mode.to_string());
        cp
    }

    /// Restores the read position from a checkpoint. A checkpoint without a
    /// `delta_version` offset is a silent no-op.
    ///
    /// # Errors
    /// Returns `ConfigurationError` when the stored version is not an `i64`.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        if let Some(version_str) = checkpoint.get_offset("delta_version") {
            self.current_version = version_str.parse::<i64>().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid delta_version in checkpoint: '{version_str}'"
                ))
            })?;
            info!(
                restored_version = self.current_version,
                "Delta Lake source: restored from checkpoint"
            );
        }
        Ok(())
    }

    /// Maps lifecycle state onto the connector health model.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    /// Reports total rows emitted; all other metric fields are defaulted.
    fn metrics(&self) -> ConnectorMetrics {
        ConnectorMetrics {
            records_total: self.records_read,
            ..ConnectorMetrics::default()
        }
    }

    /// Releases the table handle, discards buffered batches, and moves to
    /// `Closed`. Undelivered buffered rows are dropped, not flushed.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Delta Lake source connector");

        #[cfg(feature = "delta-lake")]
        {
            self.table = None;
        }

        self.pending_batches.clear();
        self.state = ConnectorState::Closed;

        info!(
            table_path = %self.config.table_path,
            current_version = self.current_version,
            records_read = self.records_read,
            "Delta Lake source connector closed"
        );

        Ok(())
    }
}
567
568impl std::fmt::Debug for DeltaSource {
569 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570 f.debug_struct("DeltaSource")
571 .field("state", &self.state)
572 .field("table_path", &self.config.table_path)
573 .field("read_mode", &self.config.read_mode)
574 .field("current_version", &self.current_version)
575 .field("pending_batches", &self.pending_batches.len())
576 .field("records_read", &self.records_read)
577 .finish_non_exhaustive()
578 }
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    // Minimal config pointing at a path that is never actually opened in
    // these tests (no test here calls `open`/`poll` against real storage).
    fn test_config() -> DeltaSourceConfig {
        DeltaSourceConfig::new("/tmp/delta_source_test")
    }

    // Three-column schema shared by all fixture batches.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    // Builds an n-row batch matching `test_schema()`.
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    // A fresh source starts Created, unversioned, and without a schema.
    #[test]
    fn test_new_defaults() {
        let source = DeltaSource::new(test_config(), None);
        assert_eq!(source.state(), ConnectorState::Created);
        assert_eq!(source.current_version(), -1);
        assert!(source.schema.is_none());
    }

    // checkpoint() serializes the current version as a string offset.
    #[test]
    fn test_checkpoint_roundtrip() {
        let mut source = DeltaSource::new(test_config(), None);
        source.current_version = 42;

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("42"));
    }

    // restore() parses `delta_version` back into current_version.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = DeltaSource::new(test_config(), None);
        assert_eq!(source.current_version(), -1);

        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "10");
        source.restore(&cp).await.unwrap();

        assert_eq!(source.current_version(), 10);
    }

    // Health follows lifecycle state: Created→Unknown, Running→Healthy,
    // Closed→Unhealthy.
    #[test]
    fn test_health_check() {
        let mut source = DeltaSource::new(test_config(), None);
        assert_eq!(source.health_check(), HealthStatus::Unknown);

        source.state = ConnectorState::Running;
        assert_eq!(source.health_check(), HealthStatus::Healthy);

        source.state = ConnectorState::Closed;
        assert!(matches!(source.health_check(), HealthStatus::Unhealthy(_)));
    }

    // schema() falls back to an empty schema before resolution.
    #[test]
    fn test_schema_empty_when_none() {
        let source = DeltaSource::new(test_config(), None);
        let schema = source.schema();
        assert_eq!(schema.fields().len(), 0);
    }

    // poll_batch rejects any state other than Running.
    #[tokio::test]
    async fn test_poll_not_running() {
        let mut source = DeltaSource::new(test_config(), None);
        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    // Buffered batches are returned FIFO and counted in records_read.
    #[tokio::test]
    async fn test_poll_returns_buffered_batches() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;

        source.pending_batches.push_back(test_batch(5));
        source.pending_batches.push_back(test_batch(3));

        let batch1 = source.poll_batch(100).await.unwrap();
        assert!(batch1.is_some());
        assert_eq!(batch1.unwrap().records.num_rows(), 5);

        let batch2 = source.poll_batch(100).await.unwrap();
        assert!(batch2.is_some());
        assert_eq!(batch2.unwrap().records.num_rows(), 3);

        assert_eq!(source.records_read, 8);
    }

    // One poll returns exactly one buffered batch — whole batches are never
    // split, even when max_records is smaller than the batch.
    #[tokio::test]
    async fn test_poll_batch_returns_buffered_incrementally() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;

        for _ in 0..10 {
            source.pending_batches.push_back(test_batch(100));
        }

        let batch = source.poll_batch(50).await.unwrap();
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().records.num_rows(), 100);
        assert_eq!(source.pending_batches.len(), 9);
    }

    // Draining a multi-batch buffer: each poll pops one batch and the row
    // total accumulates across polls.
    #[tokio::test]
    async fn test_version_deferred_until_buffer_drained() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.current_version = 5;

        source.pending_batches.push_back(test_batch(10));
        source.pending_batches.push_back(test_batch(10));
        source.pending_batches.push_back(test_batch(10));

        let b1 = source.poll_batch(100).await.unwrap();
        assert!(b1.is_some());
        assert_eq!(source.pending_batches.len(), 2);

        let b2 = source.poll_batch(100).await.unwrap();
        assert!(b2.is_some());
        assert_eq!(source.pending_batches.len(), 1);

        let b3 = source.poll_batch(100).await.unwrap();
        assert!(b3.is_some());
        assert!(source.pending_batches.is_empty());
        assert_eq!(source.records_read, 30);
    }

    // poll_interval from the config is preserved by the constructor.
    #[test]
    fn test_poll_interval_is_stored() {
        let mut config = test_config();
        config.poll_interval = std::time::Duration::from_millis(500);
        let source = DeltaSource::new(config, None);
        assert_eq!(
            source.config().poll_interval,
            std::time::Duration::from_millis(500)
        );
    }

    // Debug output names the type and includes the table path.
    #[test]
    fn test_debug_output() {
        let source = DeltaSource::new(test_config(), None);
        let debug = format!("{source:?}");
        assert!(debug.contains("DeltaSource"));
        assert!(debug.contains("/tmp/delta_source_test"));
    }

    // close() drops buffered batches and transitions to Closed.
    #[tokio::test]
    async fn test_close() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.pending_batches.push_back(test_batch(5));

        source.close().await.unwrap();
        assert_eq!(source.state(), ConnectorState::Closed);
        assert!(source.pending_batches.is_empty());
    }

    // Without the delta-lake feature, open() must fail with a message that
    // names the missing feature.
    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_open_requires_feature() {
        let mut source = DeltaSource::new(test_config(), None);
        let connector_config = crate::config::ConnectorConfig::new("delta-lake");
        let result = source.open(&connector_config).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("delta-lake"), "error: {err}");
    }
}
790}