// laminar_connectors/lakehouse/delta_source.rs

1//! Delta Lake source connector.
2
3use std::collections::VecDeque;
4use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8use async_trait::async_trait;
9#[cfg(feature = "delta-lake")]
10use std::time::Instant;
11#[cfg(feature = "delta-lake")]
12use tracing::debug;
13use tracing::info;
14#[cfg(feature = "delta-lake")]
15use tracing::warn;
16
17#[cfg(feature = "delta-lake")]
18use deltalake::DeltaTable;
19
20use crate::checkpoint::SourceCheckpoint;
21use crate::config::{ConnectorConfig, ConnectorState};
22use crate::connector::{SourceBatch, SourceConnector};
23use crate::error::ConnectorError;
24use crate::health::HealthStatus;
25use crate::metrics::ConnectorMetrics;
26
27use super::delta_source_config::DeltaSourceConfig;
28#[cfg(feature = "delta-lake")]
29use super::delta_source_config::{DeltaReadMode, SchemaEvolutionAction};
30
/// Delta Lake source connector.
///
/// Reads Arrow `RecordBatch` data from Delta Lake tables by polling for
/// new table versions. Supports both incremental (changes-only) and
/// snapshot (full re-read) modes.
///
/// Most runtime state (table handle, version walk, projection) is gated
/// on the `delta-lake` cargo feature; without it, `open()` fails.
///
/// # Lifecycle
///
/// ```text
/// new() -> open() -> [poll_batch()]* -> close()
///                          |
///                 checkpoint() / restore()
/// ```
pub struct DeltaSource {
    /// Source configuration.
    config: DeltaSourceConfig,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Arrow schema (set from table metadata on open).
    schema: Option<SchemaRef>,
    /// Current Delta Lake version cursor — the last *fully consumed* version.
    /// Only advanced after all buffered batches for a version are drained.
    /// `-1` is the sentinel for "nothing consumed yet".
    current_version: i64,
    /// The version currently being drained. While `pending_batches` is
    /// non-empty this holds the version they came from. Once drained,
    /// `current_version` is advanced to this value and the field is cleared.
    #[cfg(feature = "delta-lake")]
    inflight_version: Option<i64>,
    /// The latest version known at the table. Used in incremental mode to
    /// walk versions one-by-one without re-calling `get_latest_version` for
    /// each step.
    #[cfg(feature = "delta-lake")]
    known_latest_version: i64,
    /// Buffered batches from the last version load.
    pending_batches: VecDeque<RecordBatch>,
    /// Total records read so far.
    records_read: u64,
    /// Delta Lake table handle.
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    /// Last time we checked for new Delta versions. Used to throttle
    /// `get_latest_version()` calls to `poll_interval` instead of
    /// hammering every source-adapter tick (10ms).
    #[cfg(feature = "delta-lake")]
    last_version_check: Option<Instant>,
    /// Per-field projection: `Some(idx)` = take column idx from new batch,
    /// `None` = emit null column. Aligned with `self.schema.fields()`.
    #[cfg(feature = "delta-lake")]
    projection_indices: Option<Vec<Option<usize>>>,
}
81
82impl DeltaSource {
83    /// Creates a new Delta Lake source with the given configuration.
84    #[must_use]
85    pub fn new(config: DeltaSourceConfig, _registry: Option<&prometheus::Registry>) -> Self {
86        Self {
87            config,
88            state: ConnectorState::Created,
89            schema: None,
90            current_version: -1,
91            #[cfg(feature = "delta-lake")]
92            inflight_version: None,
93            #[cfg(feature = "delta-lake")]
94            known_latest_version: -1,
95            pending_batches: VecDeque::new(),
96            records_read: 0,
97            #[cfg(feature = "delta-lake")]
98            table: None,
99            #[cfg(feature = "delta-lake")]
100            last_version_check: None,
101            #[cfg(feature = "delta-lake")]
102            projection_indices: None,
103        }
104    }
105
106    /// Returns the current connector state.
107    #[must_use]
108    pub fn state(&self) -> ConnectorState {
109        self.state
110    }
111
112    /// Returns the current Delta Lake version cursor.
113    #[must_use]
114    pub fn current_version(&self) -> i64 {
115        self.current_version
116    }
117
118    /// Returns the source configuration.
119    #[must_use]
120    pub fn config(&self) -> &DeltaSourceConfig {
121        &self.config
122    }
123
124    /// Re-opens the Delta Lake table (e.g., after a connection failure).
125    #[cfg(feature = "delta-lake")]
126    async fn reopen_table(&mut self) -> Result<(), ConnectorError> {
127        use super::delta_io;
128
129        let table = delta_io::open_or_create_table(
130            &self.config.table_path,
131            self.config.storage_options.clone(),
132            None,
133        )
134        .await?;
135
136        self.table = Some(table);
137        Ok(())
138    }
139}
140
141#[async_trait]
142#[allow(clippy::too_many_lines)]
143impl SourceConnector for DeltaSource {
144    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
145        self.state = ConnectorState::Initializing;
146
147        // Re-parse config if properties provided.
148        if !config.properties().is_empty() {
149            self.config = DeltaSourceConfig::from_config(config)?;
150        }
151
152        info!(
153            table_path = %self.config.table_path,
154            starting_version = ?self.config.starting_version,
155            "opening Delta Lake source connector"
156        );
157
158        #[cfg(feature = "delta-lake")]
159        {
160            use super::delta_io;
161
162            // Open the existing table (source requires the table to exist).
163            let table = delta_io::open_or_create_table(
164                &self.config.table_path,
165                self.config.storage_options.clone(),
166                None,
167            )
168            .await?;
169
170            // Read schema from table.
171            if let Ok(schema) = delta_io::get_table_schema(&table) {
172                self.schema = Some(schema);
173            }
174
175            // Start from -1 (or explicit starting_version) and let
176            // poll_batch() walk versions incrementally with bounded reads.
177            if let Some(start) = self.config.starting_version {
178                self.current_version = start;
179            } else {
180                self.current_version = -1;
181            }
182            let table_version = table.version().unwrap_or(0);
183
184            info!(
185                table_path = %self.config.table_path,
186                table_version,
187                current_version = self.current_version,
188                "Delta Lake source: resolved starting version"
189            );
190
191            self.table = Some(table);
192        }
193
194        #[cfg(not(feature = "delta-lake"))]
195        {
196            self.state = ConnectorState::Failed;
197            return Err(ConnectorError::ConfigurationError(
198                "Delta Lake source requires the 'delta-lake' feature to be enabled. \
199                 Build with: cargo build --features delta-lake"
200                    .into(),
201            ));
202        }
203
204        #[cfg(feature = "delta-lake")]
205        {
206            self.state = ConnectorState::Running;
207            info!("Delta Lake source connector opened successfully");
208            Ok(())
209        }
210    }
211
212    #[allow(unused_variables)]
213    async fn poll_batch(
214        &mut self,
215        max_records: usize,
216    ) -> Result<Option<SourceBatch>, ConnectorError> {
217        if self.state != ConnectorState::Running {
218            return Err(ConnectorError::InvalidState {
219                expected: "Running".into(),
220                actual: self.state.to_string(),
221            });
222        }
223
224        // Return buffered batches first. When the buffer drains
225        // completely, advance current_version to the inflight version
226        // so that checkpoint() reports the fully-consumed position.
227        if let Some(batch) = self.pending_batches.pop_front() {
228            self.records_read += batch.num_rows() as u64;
229
230            #[cfg(feature = "delta-lake")]
231            if self.pending_batches.is_empty() {
232                if let Some(v) = self.inflight_version.take() {
233                    self.current_version = v;
234                }
235            }
236
237            return Ok(Some(SourceBatch::new(batch)));
238        }
239
240        // Check for new versions, throttled by poll_interval.
241        #[cfg(feature = "delta-lake")]
242        {
243            use super::delta_io;
244
245            // Recover from lost table handle (e.g., connection failure).
246            if self.table.is_none() {
247                match self.reopen_table().await {
248                    Ok(()) => {
249                        info!("Delta Lake source: re-opened table after lost handle");
250                    }
251                    Err(e) => {
252                        warn!(error = %e, "Delta Lake source: reopen failed, will retry");
253                        return Ok(None);
254                    }
255                }
256            }
257
258            // Throttle version checks: skip if less than poll_interval has
259            // elapsed since the last check. This prevents hammering
260            // get_latest_version() on every source-adapter tick (10ms).
261            // In incremental mode, skip the throttle if we already know
262            // there are more versions to process (catch-up).
263            let needs_refresh = self.known_latest_version <= self.current_version;
264            if needs_refresh {
265                if let Some(last_check) = self.last_version_check {
266                    if last_check.elapsed() < self.config.poll_interval {
267                        return Ok(None);
268                    }
269                }
270                self.last_version_check = Some(Instant::now());
271
272                let table = self
273                    .table
274                    .as_mut()
275                    .ok_or_else(|| ConnectorError::InvalidState {
276                        expected: "table initialized".into(),
277                        actual: "table not initialized".into(),
278                    })?;
279                let latest_version = match delta_io::get_latest_version(table).await {
280                    Ok(v) => v,
281                    Err(e) => {
282                        warn!(error = %e, "Delta Lake source: version check failed, will retry");
283                        return Ok(None);
284                    }
285                };
286                self.known_latest_version = latest_version;
287
288                if latest_version <= self.current_version {
289                    return Ok(None); // No new data
290                }
291
292                debug!(
293                    current_version = self.current_version,
294                    latest_version, "Delta Lake source: new version(s) available"
295                );
296            }
297
298            let target_version = match self.config.read_mode {
299                DeltaReadMode::Snapshot => self.known_latest_version,
300                DeltaReadMode::Incremental => self.current_version + 1,
301            };
302
303            // Read data first. Both read_batches_at_version and
304            // read_version_diff call load_version(target_version) internally,
305            // so the table's snapshot will be at target_version after this.
306            let table = self
307                .table
308                .as_mut()
309                .ok_or_else(|| ConnectorError::InvalidState {
310                    expected: "table initialized".into(),
311                    actual: "table not initialized".into(),
312                })?;
313            let partition_filter = self.config.partition_filter.clone();
314
315            // Version-gap detection: if the target commit was cleaned up,
316            // skip ahead to a snapshot at the latest available version.
317            let mut use_snapshot_fallback = false;
318            if self.config.read_mode == DeltaReadMode::Incremental && target_version > 0 {
319                let log_store = table.log_store();
320                if let Ok(None) = log_store.read_commit_entry(target_version).await {
321                    warn!(
322                        target_version,
323                        known_latest = self.known_latest_version,
324                        "version unavailable, falling back to snapshot at latest"
325                    );
326                    use_snapshot_fallback = true;
327                }
328            }
329
330            // On gap fallback, override target_version so inflight_version
331            // tracks the snapshot version, not the missing one.
332            let target_version = if use_snapshot_fallback {
333                self.known_latest_version
334            } else {
335                target_version
336            };
337
338            // Incremental reads (read_version_diff, CDF) always consume the
339            // full version — each version's diff is O(new_files), not
340            // O(table_size), so unbounded reads are safe. This avoids the
341            // re-read-from-start problem when max_records truncates a version.
342            // Snapshot reads stay bounded by max_records (can be table-sized).
343            let (batches, fully_consumed) = if use_snapshot_fallback {
344                delta_io::read_batches_at_version(table, target_version, max_records).await?
345            } else if self.config.cdf_enabled
346                && self.config.read_mode == DeltaReadMode::Incremental
347                && target_version > 0
348            {
349                // CDF mode: scan_cdf() consumes the DeltaTable. Take it,
350                // read CDF batches, then re-open the table handle.
351                let taken_table =
352                    self.table
353                        .take()
354                        .ok_or_else(|| ConnectorError::InvalidState {
355                            expected: "table initialized".into(),
356                            actual: "table not initialized".into(),
357                        })?;
358                let cdf_batches =
359                    delta_io::read_cdf_batches(taken_table, target_version, target_version).await?;
360
361                // Re-open table since scan_cdf consumed it.
362                self.reopen_table().await?;
363
364                // Map CDF _change_type to LaminarDB _op.
365                let mut mapped = Vec::new();
366                for batch in &cdf_batches {
367                    if let Some(mapped_batch) = delta_io::map_cdf_to_changelog(batch)? {
368                        mapped.push(mapped_batch);
369                    }
370                }
371                (mapped, true)
372            } else {
373                match self.config.read_mode {
374                    DeltaReadMode::Snapshot => {
375                        delta_io::read_batches_at_version(table, target_version, max_records)
376                            .await?
377                    }
378                    DeltaReadMode::Incremental => {
379                        // Read full version diff — each version is one
380                        // commit's worth of files, safe to read unbounded.
381                        let (b, _) = delta_io::read_version_diff(
382                            table,
383                            target_version,
384                            usize::MAX,
385                            partition_filter.as_deref(),
386                        )
387                        .await?;
388                        (b, true)
389                    }
390                }
391            };
392
393            // Schema evolution detection: extract schema from the snapshot
394            // that read_version_diff/read_batches_at_version already loaded.
395            // This avoids a redundant load_version call.
396            {
397                let table = self
398                    .table
399                    .as_ref()
400                    .ok_or_else(|| ConnectorError::InvalidState {
401                        expected: "table initialized".into(),
402                        actual: "table not initialized".into(),
403                    })?;
404                if let Ok(snapshot) = table.snapshot() {
405                    let new_schema = snapshot.snapshot().arrow_schema();
406                    if let Some(existing) = &self.schema {
407                        if existing.fields() != new_schema.fields() {
408                            match self.config.schema_evolution_action {
409                                SchemaEvolutionAction::Warn => {
410                                    warn!(
411                                        table_path = %self.config.table_path,
412                                        old_fields = ?existing.fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
413                                        new_fields = ?new_schema.fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
414                                        "Delta Lake source: schema evolved, projecting to original"
415                                    );
416                                    // Map each original field to its index in the new
417                                    // schema, or None if the field was removed.
418                                    let indices: Vec<Option<usize>> = existing
419                                        .fields()
420                                        .iter()
421                                        .map(|f| new_schema.index_of(f.name()).ok())
422                                        .collect();
423                                    self.projection_indices = Some(indices);
424                                    // Do NOT update self.schema — keep the original
425                                    // for downstream stability.
426                                }
427                                SchemaEvolutionAction::Error => {
428                                    return Err(ConnectorError::SchemaMismatch(format!(
429                                        "schema evolved at version {target_version}"
430                                    )));
431                                }
432                            }
433                        }
434                    } else {
435                        self.schema = Some(new_schema);
436                    }
437                }
438            }
439
440            // Buffer all batches. Do NOT advance current_version yet —
441            // it is only safe to checkpoint this version after the
442            // buffer is fully drained. Store it as inflight_version.
443            for batch in batches {
444                if batch.num_rows() == 0 {
445                    continue;
446                }
447                // Apply schema projection if schema evolution was detected.
448                let batch = if let Some(ref indices) = self.projection_indices {
449                    let original_schema = self.schema.as_ref().unwrap();
450                    let num_rows = batch.num_rows();
451                    let columns: Vec<Arc<dyn arrow_array::Array>> = indices
452                        .iter()
453                        .zip(original_schema.fields())
454                        .map(|(idx, field)| match idx {
455                            Some(i) => batch.column(*i).clone(),
456                            None => arrow_array::new_null_array(field.data_type(), num_rows),
457                        })
458                        .collect();
459                    RecordBatch::try_new(original_schema.clone(), columns).map_err(|e| {
460                        ConnectorError::ReadError(format!(
461                            "failed to project batch to original schema: {e}"
462                        ))
463                    })?
464                } else {
465                    batch
466                };
467                self.pending_batches.push_back(batch);
468            }
469
470            if !fully_consumed {
471                // Snapshot mode: max_records truncated the version.
472                // Don't advance — next poll re-reads the same version.
473                // (Incremental mode always reads full versions, so
474                // fully_consumed is always true there.)
475            } else if self.pending_batches.is_empty() {
476                // Version fully consumed with no data rows (metadata-only).
477                self.current_version = target_version;
478            } else {
479                // Version fully consumed, batches buffered. Advance after drain.
480                self.inflight_version = Some(target_version);
481            }
482
483            if let Some(batch) = self.pending_batches.pop_front() {
484                self.records_read += batch.num_rows() as u64;
485
486                // Single-batch version: buffer is already empty, advance now.
487                if self.pending_batches.is_empty() {
488                    if let Some(v) = self.inflight_version.take() {
489                        self.current_version = v;
490                    }
491                }
492
493                return Ok(Some(SourceBatch::new(batch)));
494            }
495        }
496
497        Ok(None)
498    }
499
500    fn schema(&self) -> SchemaRef {
501        self.schema
502            .clone()
503            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
504    }
505
506    fn checkpoint(&self) -> SourceCheckpoint {
507        let mut cp = SourceCheckpoint::new(0);
508        cp.set_offset("delta_version", self.current_version.to_string());
509        cp.set_offset("read_mode", self.config.read_mode.to_string());
510        cp
511    }
512
513    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
514        if let Some(version_str) = checkpoint.get_offset("delta_version") {
515            self.current_version = version_str.parse::<i64>().map_err(|_| {
516                ConnectorError::ConfigurationError(format!(
517                    "invalid delta_version in checkpoint: '{version_str}'"
518                ))
519            })?;
520            info!(
521                restored_version = self.current_version,
522                "Delta Lake source: restored from checkpoint"
523            );
524        }
525        Ok(())
526    }
527
528    fn health_check(&self) -> HealthStatus {
529        match self.state {
530            ConnectorState::Running => HealthStatus::Healthy,
531            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
532            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
533            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
534            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
535            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
536        }
537    }
538
539    fn metrics(&self) -> ConnectorMetrics {
540        ConnectorMetrics {
541            records_total: self.records_read,
542            ..ConnectorMetrics::default()
543        }
544    }
545
546    async fn close(&mut self) -> Result<(), ConnectorError> {
547        info!("closing Delta Lake source connector");
548
549        #[cfg(feature = "delta-lake")]
550        {
551            self.table = None;
552        }
553
554        self.pending_batches.clear();
555        self.state = ConnectorState::Closed;
556
557        info!(
558            table_path = %self.config.table_path,
559            current_version = self.current_version,
560            records_read = self.records_read,
561            "Delta Lake source connector closed"
562        );
563
564        Ok(())
565    }
566}
567
568impl std::fmt::Debug for DeltaSource {
569    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570        f.debug_struct("DeltaSource")
571            .field("state", &self.state)
572            .field("table_path", &self.config.table_path)
573            .field("read_mode", &self.config.read_mode)
574            .field("current_version", &self.current_version)
575            .field("pending_batches", &self.pending_batches.len())
576            .field("records_read", &self.records_read)
577            .finish_non_exhaustive()
578    }
579}
580
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    /// Minimal config pointing at a throwaway path (never opened in tests).
    fn test_config() -> DeltaSourceConfig {
        DeltaSourceConfig::new("/tmp/delta_source_test")
    }

    /// Three-column schema used by `test_batch`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds an n-row batch matching `test_schema`.
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_new_defaults() {
        let source = DeltaSource::new(test_config(), None);
        assert_eq!(source.state(), ConnectorState::Created);
        assert_eq!(source.current_version(), -1);
        assert!(source.schema.is_none());
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut source = DeltaSource::new(test_config(), None);
        source.current_version = 42;

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("42"));
    }

    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = DeltaSource::new(test_config(), None);
        assert_eq!(source.current_version(), -1);

        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "10");
        source.restore(&cp).await.unwrap();

        assert_eq!(source.current_version(), 10);
    }

    #[test]
    fn test_health_check() {
        let mut source = DeltaSource::new(test_config(), None);
        assert_eq!(source.health_check(), HealthStatus::Unknown);

        source.state = ConnectorState::Running;
        assert_eq!(source.health_check(), HealthStatus::Healthy);

        source.state = ConnectorState::Closed;
        assert!(matches!(source.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_schema_empty_when_none() {
        let source = DeltaSource::new(test_config(), None);
        let schema = source.schema();
        assert_eq!(schema.fields().len(), 0);
    }

    #[tokio::test]
    async fn test_poll_not_running() {
        let mut source = DeltaSource::new(test_config(), None);
        // state is Created, not Running
        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_poll_returns_buffered_batches() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;

        // Manually buffer some batches.
        source.pending_batches.push_back(test_batch(5));
        source.pending_batches.push_back(test_batch(3));

        let batch1 = source.poll_batch(100).await.unwrap();
        assert!(batch1.is_some());
        assert_eq!(batch1.unwrap().records.num_rows(), 5);

        let batch2 = source.poll_batch(100).await.unwrap();
        assert!(batch2.is_some());
        assert_eq!(batch2.unwrap().records.num_rows(), 3);

        assert_eq!(source.records_read, 8);
    }

    /// D002/D003: Verify `max_records` bounds the pending buffer.
    /// Without the delta-lake feature, `poll_batch` returns buffered data
    /// incrementally; with the feature, `read_batches_at_version` applies LIMIT.
    #[tokio::test]
    async fn test_poll_batch_returns_buffered_incrementally() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;

        // Simulate what read_batches_at_version produces: many small batches
        for _ in 0..10 {
            source.pending_batches.push_back(test_batch(100));
        }

        // Each poll_batch returns exactly one buffered batch
        let batch = source.poll_batch(50).await.unwrap();
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().records.num_rows(), 100);
        // 9 remaining
        assert_eq!(source.pending_batches.len(), 9);
    }

    /// Version is only advanced after the inflight buffer is fully drained.
    /// With multiple buffered batches, `current_version` stays at the old value
    /// until the last batch is consumed, then jumps to the target version.
    #[tokio::test]
    async fn test_version_deferred_until_buffer_drained() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.current_version = 5;

        // Simulate: a version load buffered 3 batches. In production the
        // delta-lake cfg block records the loaded version as
        // inflight_version; the drain-then-advance step in poll_batch is
        // cfg-gated, so here we verify only the feature-independent part:
        // the buffer drains exactly one batch per poll.
        source.pending_batches.push_back(test_batch(10));
        source.pending_batches.push_back(test_batch(10));
        source.pending_batches.push_back(test_batch(10));

        // Without delta-lake feature, inflight_version doesn't exist, so
        // current_version won't auto-advance. Verify the buffer drains.
        let b1 = source.poll_batch(100).await.unwrap();
        assert!(b1.is_some());
        assert_eq!(source.pending_batches.len(), 2);

        let b2 = source.poll_batch(100).await.unwrap();
        assert!(b2.is_some());
        assert_eq!(source.pending_batches.len(), 1);

        let b3 = source.poll_batch(100).await.unwrap();
        assert!(b3.is_some());
        assert!(source.pending_batches.is_empty());
        assert_eq!(source.records_read, 30);
    }

    /// D004: `poll_interval` is parsed and stored in config.
    /// The field is used by the delta-lake feature to throttle version checks.
    #[test]
    fn test_poll_interval_is_stored() {
        let mut config = test_config();
        config.poll_interval = std::time::Duration::from_millis(500);
        let source = DeltaSource::new(config, None);
        assert_eq!(
            source.config().poll_interval,
            std::time::Duration::from_millis(500)
        );
    }

    #[test]
    fn test_debug_output() {
        let source = DeltaSource::new(test_config(), None);
        let debug = format!("{source:?}");
        assert!(debug.contains("DeltaSource"));
        assert!(debug.contains("/tmp/delta_source_test"));
    }

    #[tokio::test]
    async fn test_close() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.pending_batches.push_back(test_batch(5));

        source.close().await.unwrap();
        assert_eq!(source.state(), ConnectorState::Closed);
        assert!(source.pending_batches.is_empty());
    }

    /// D020: Source `open()` must error without delta-lake feature.
    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_open_requires_feature() {
        let mut source = DeltaSource::new(test_config(), None);
        let connector_config = crate::config::ConnectorConfig::new("delta-lake");
        let result = source.open(&connector_config).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("delta-lake"), "error: {err}");
    }
}
790}