Skip to main content

laminar_connectors/lakehouse/
delta_source.rs

1//! Delta Lake source connector.
2
3use std::collections::VecDeque;
4use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8use async_trait::async_trait;
9#[cfg(feature = "delta-lake")]
10use std::time::Instant;
11#[cfg(feature = "delta-lake")]
12use tracing::debug;
13use tracing::info;
14#[cfg(feature = "delta-lake")]
15use tracing::warn;
16
17#[cfg(feature = "delta-lake")]
18use deltalake::DeltaTable;
19
20use crate::checkpoint::SourceCheckpoint;
21use crate::config::{ConnectorConfig, ConnectorState};
22use crate::connector::{SourceBatch, SourceConnector};
23use crate::error::ConnectorError;
24
25use super::delta_source_config::DeltaSourceConfig;
26#[cfg(feature = "delta-lake")]
27use super::delta_source_config::{DeltaReadMode, SchemaEvolutionAction};
28
29/// Delta Lake source connector.
30///
31/// Reads Arrow `RecordBatch` data from Delta Lake tables by polling for
32/// new table versions. Supports both incremental (changes-only) and
33/// snapshot (full re-read) modes.
34///
35/// # Lifecycle
36///
37/// ```text
38/// new() -> open() -> [poll_batch()]* -> close()
39///                          |
40///                 checkpoint() / restore()
41/// ```
42pub struct DeltaSource {
43    /// Source configuration.
44    config: DeltaSourceConfig,
45    /// Connector lifecycle state.
46    state: ConnectorState,
47    /// Arrow schema (set from table metadata on open).
48    schema: Option<SchemaRef>,
49    /// Current Delta Lake version cursor — the last *fully consumed* version.
50    /// Only advanced after all buffered batches for a version are drained.
51    current_version: i64,
52    /// The version currently being drained. While `pending_batches` is
53    /// non-empty this holds the version they came from. Once drained,
54    /// `current_version` is advanced to this value and the field is cleared.
55    #[cfg(feature = "delta-lake")]
56    inflight_version: Option<i64>,
57    /// The latest version known at the table. Used in incremental mode to
58    /// walk versions one-by-one without re-calling `get_latest_version` for
59    /// each step.
60    #[cfg(feature = "delta-lake")]
61    known_latest_version: i64,
62    /// Buffered batches from the last version load.
63    pending_batches: VecDeque<RecordBatch>,
64    /// Total records read so far.
65    records_read: u64,
66    /// Delta Lake table handle.
67    #[cfg(feature = "delta-lake")]
68    table: Option<DeltaTable>,
69    /// Last time we checked for new Delta versions. Used to throttle
70    /// `get_latest_version()` calls to `poll_interval` instead of
71    /// hammering every source-adapter tick (10ms).
72    #[cfg(feature = "delta-lake")]
73    last_version_check: Option<Instant>,
74    /// Per-field projection: `Some(idx)` = take column idx from new batch,
75    /// `None` = emit null column. Aligned with `self.schema.fields()`.
76    #[cfg(feature = "delta-lake")]
77    projection_indices: Option<Vec<Option<usize>>>,
78}
79
80impl DeltaSource {
81    /// Creates a new Delta Lake source with the given configuration.
82    #[must_use]
83    pub fn new(config: DeltaSourceConfig, _registry: Option<&prometheus::Registry>) -> Self {
84        Self {
85            config,
86            state: ConnectorState::Created,
87            schema: None,
88            current_version: -1,
89            #[cfg(feature = "delta-lake")]
90            inflight_version: None,
91            #[cfg(feature = "delta-lake")]
92            known_latest_version: -1,
93            pending_batches: VecDeque::new(),
94            records_read: 0,
95            #[cfg(feature = "delta-lake")]
96            table: None,
97            #[cfg(feature = "delta-lake")]
98            last_version_check: None,
99            #[cfg(feature = "delta-lake")]
100            projection_indices: None,
101        }
102    }
103
104    /// Returns the current connector state.
105    #[must_use]
106    pub fn state(&self) -> ConnectorState {
107        self.state
108    }
109
110    /// Returns the current Delta Lake version cursor.
111    #[must_use]
112    pub fn current_version(&self) -> i64 {
113        self.current_version
114    }
115
116    /// Returns the source configuration.
117    #[must_use]
118    pub fn config(&self) -> &DeltaSourceConfig {
119        &self.config
120    }
121
122    /// Re-opens the Delta Lake table (e.g., after a connection failure).
123    #[cfg(feature = "delta-lake")]
124    async fn reopen_table(&mut self) -> Result<(), ConnectorError> {
125        use super::delta_io;
126
127        let table = delta_io::open_or_create_table(
128            &self.config.table_path,
129            self.config.storage_options.clone(),
130            None,
131        )
132        .await?;
133
134        self.table = Some(table);
135        Ok(())
136    }
137}
138
139#[async_trait]
140#[allow(clippy::too_many_lines)]
141impl SourceConnector for DeltaSource {
142    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
143        self.state = ConnectorState::Initializing;
144
145        // Re-parse config if properties provided.
146        if !config.properties().is_empty() {
147            self.config = DeltaSourceConfig::from_config(config)?;
148        }
149
150        info!(
151            table_path = %self.config.table_path,
152            starting_version = ?self.config.starting_version,
153            "opening Delta Lake source connector"
154        );
155
156        #[cfg(feature = "delta-lake")]
157        {
158            use super::delta_io;
159
160            // Open the existing table (source requires the table to exist).
161            let table = delta_io::open_or_create_table(
162                &self.config.table_path,
163                self.config.storage_options.clone(),
164                None,
165            )
166            .await?;
167
168            // Read schema from table.
169            if let Ok(schema) = delta_io::get_table_schema(&table) {
170                self.schema = Some(schema);
171            }
172
173            // Start from -1 (or explicit starting_version) and let
174            // poll_batch() walk versions incrementally with bounded reads.
175            if let Some(start) = self.config.starting_version {
176                self.current_version = start;
177            } else {
178                self.current_version = -1;
179            }
180            let table_version = table.version().unwrap_or(0);
181
182            info!(
183                table_path = %self.config.table_path,
184                table_version,
185                current_version = self.current_version,
186                "Delta Lake source: resolved starting version"
187            );
188
189            self.table = Some(table);
190        }
191
192        #[cfg(not(feature = "delta-lake"))]
193        {
194            self.state = ConnectorState::Failed;
195            return Err(ConnectorError::ConfigurationError(
196                "Delta Lake source requires the 'delta-lake' feature to be enabled. \
197                 Build with: cargo build --features delta-lake"
198                    .into(),
199            ));
200        }
201
202        #[cfg(feature = "delta-lake")]
203        {
204            self.state = ConnectorState::Running;
205            info!("Delta Lake source connector opened successfully");
206            Ok(())
207        }
208    }
209
210    #[allow(unused_variables)]
211    async fn poll_batch(
212        &mut self,
213        max_records: usize,
214    ) -> Result<Option<SourceBatch>, ConnectorError> {
215        if self.state != ConnectorState::Running {
216            return Err(ConnectorError::InvalidState {
217                expected: "Running".into(),
218                actual: self.state.to_string(),
219            });
220        }
221
222        // Return buffered batches first. When the buffer drains
223        // completely, advance current_version to the inflight version
224        // so that checkpoint() reports the fully-consumed position.
225        if let Some(batch) = self.pending_batches.pop_front() {
226            self.records_read += batch.num_rows() as u64;
227
228            #[cfg(feature = "delta-lake")]
229            if self.pending_batches.is_empty() {
230                if let Some(v) = self.inflight_version.take() {
231                    self.current_version = v;
232                }
233            }
234
235            return Ok(Some(SourceBatch::new(batch)));
236        }
237
238        // Check for new versions, throttled by poll_interval.
239        #[cfg(feature = "delta-lake")]
240        {
241            use super::delta_io;
242
243            // Recover from lost table handle (e.g., connection failure).
244            if self.table.is_none() {
245                match self.reopen_table().await {
246                    Ok(()) => {
247                        info!("Delta Lake source: re-opened table after lost handle");
248                    }
249                    Err(e) => {
250                        warn!(error = %e, "Delta Lake source: reopen failed, will retry");
251                        return Ok(None);
252                    }
253                }
254            }
255
256            // Throttle version checks: skip if less than poll_interval has
257            // elapsed since the last check. This prevents hammering
258            // get_latest_version() on every source-adapter tick (10ms).
259            // In incremental mode, skip the throttle if we already know
260            // there are more versions to process (catch-up).
261            let needs_refresh = self.known_latest_version <= self.current_version;
262            if needs_refresh {
263                if let Some(last_check) = self.last_version_check {
264                    if last_check.elapsed() < self.config.poll_interval {
265                        return Ok(None);
266                    }
267                }
268                self.last_version_check = Some(Instant::now());
269
270                let table = self
271                    .table
272                    .as_mut()
273                    .ok_or_else(|| ConnectorError::InvalidState {
274                        expected: "table initialized".into(),
275                        actual: "table not initialized".into(),
276                    })?;
277                let latest_version = match delta_io::get_latest_version(table).await {
278                    Ok(v) => v,
279                    Err(e) => {
280                        warn!(error = %e, "Delta Lake source: version check failed, will retry");
281                        return Ok(None);
282                    }
283                };
284                self.known_latest_version = latest_version;
285
286                if latest_version <= self.current_version {
287                    return Ok(None); // No new data
288                }
289
290                debug!(
291                    current_version = self.current_version,
292                    latest_version, "Delta Lake source: new version(s) available"
293                );
294            }
295
296            let target_version = match self.config.read_mode {
297                DeltaReadMode::Snapshot => self.known_latest_version,
298                DeltaReadMode::Incremental => self.current_version + 1,
299            };
300
301            // Read data first. Both read_batches_at_version and
302            // read_version_diff call load_version(target_version) internally,
303            // so the table's snapshot will be at target_version after this.
304            let table = self
305                .table
306                .as_mut()
307                .ok_or_else(|| ConnectorError::InvalidState {
308                    expected: "table initialized".into(),
309                    actual: "table not initialized".into(),
310                })?;
311            let partition_filter = self.config.partition_filter.clone();
312
313            // Version-gap detection: if the target commit was cleaned up,
314            // skip ahead to a snapshot at the latest available version.
315            let mut use_snapshot_fallback = false;
316            if self.config.read_mode == DeltaReadMode::Incremental && target_version > 0 {
317                let log_store = table.log_store();
318                if let Ok(None) = log_store.read_commit_entry(target_version).await {
319                    warn!(
320                        target_version,
321                        known_latest = self.known_latest_version,
322                        "version unavailable, falling back to snapshot at latest"
323                    );
324                    use_snapshot_fallback = true;
325                }
326            }
327
328            // On gap fallback, override target_version so inflight_version
329            // tracks the snapshot version, not the missing one.
330            let target_version = if use_snapshot_fallback {
331                self.known_latest_version
332            } else {
333                target_version
334            };
335
336            // Incremental reads (read_version_diff, CDF) always consume the
337            // full version — each version's diff is O(new_files), not
338            // O(table_size), so unbounded reads are safe. This avoids the
339            // re-read-from-start problem when max_records truncates a version.
340            // Snapshot reads stay bounded by max_records (can be table-sized).
341            let (batches, fully_consumed) = if use_snapshot_fallback {
342                delta_io::read_batches_at_version(table, target_version, max_records).await?
343            } else if self.config.cdf_enabled
344                && self.config.read_mode == DeltaReadMode::Incremental
345                && target_version > 0
346            {
347                // CDF mode: scan_cdf() consumes the DeltaTable. Take it,
348                // read CDF batches, then re-open the table handle.
349                let taken_table =
350                    self.table
351                        .take()
352                        .ok_or_else(|| ConnectorError::InvalidState {
353                            expected: "table initialized".into(),
354                            actual: "table not initialized".into(),
355                        })?;
356                let cdf_batches =
357                    delta_io::read_cdf_batches(taken_table, target_version, target_version).await?;
358
359                // Re-open table since scan_cdf consumed it.
360                self.reopen_table().await?;
361
362                // Map CDF _change_type to LaminarDB _op.
363                let mut mapped = Vec::new();
364                for batch in &cdf_batches {
365                    if let Some(mapped_batch) = delta_io::map_cdf_to_changelog(batch)? {
366                        mapped.push(mapped_batch);
367                    }
368                }
369                (mapped, true)
370            } else {
371                match self.config.read_mode {
372                    DeltaReadMode::Snapshot => {
373                        delta_io::read_batches_at_version(table, target_version, max_records)
374                            .await?
375                    }
376                    DeltaReadMode::Incremental => {
377                        // Read full version diff — each version is one
378                        // commit's worth of files, safe to read unbounded.
379                        let (b, _) = delta_io::read_version_diff(
380                            table,
381                            target_version,
382                            usize::MAX,
383                            partition_filter.as_deref(),
384                        )
385                        .await?;
386                        (b, true)
387                    }
388                }
389            };
390
391            // Schema evolution detection: extract schema from the snapshot
392            // that read_version_diff/read_batches_at_version already loaded.
393            // This avoids a redundant load_version call.
394            {
395                let table = self
396                    .table
397                    .as_ref()
398                    .ok_or_else(|| ConnectorError::InvalidState {
399                        expected: "table initialized".into(),
400                        actual: "table not initialized".into(),
401                    })?;
402                if let Ok(snapshot) = table.snapshot() {
403                    let new_schema = snapshot.snapshot().arrow_schema();
404                    if let Some(existing) = &self.schema {
405                        if existing.fields() != new_schema.fields() {
406                            match self.config.schema_evolution_action {
407                                SchemaEvolutionAction::Warn => {
408                                    warn!(
409                                        table_path = %self.config.table_path,
410                                        old_fields = ?existing.fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
411                                        new_fields = ?new_schema.fields().iter().map(|f| f.name().as_str()).collect::<Vec<_>>(),
412                                        "Delta Lake source: schema evolved, projecting to original"
413                                    );
414                                    // Map each original field to its index in the new
415                                    // schema, or None if the field was removed.
416                                    let indices: Vec<Option<usize>> = existing
417                                        .fields()
418                                        .iter()
419                                        .map(|f| new_schema.index_of(f.name()).ok())
420                                        .collect();
421                                    self.projection_indices = Some(indices);
422                                    // Do NOT update self.schema — keep the original
423                                    // for downstream stability.
424                                }
425                                SchemaEvolutionAction::Error => {
426                                    return Err(ConnectorError::SchemaMismatch(format!(
427                                        "schema evolved at version {target_version}"
428                                    )));
429                                }
430                            }
431                        }
432                    } else {
433                        self.schema = Some(new_schema);
434                    }
435                }
436            }
437
438            // Buffer all batches. Do NOT advance current_version yet —
439            // it is only safe to checkpoint this version after the
440            // buffer is fully drained. Store it as inflight_version.
441            for batch in batches {
442                if batch.num_rows() == 0 {
443                    continue;
444                }
445                // Apply schema projection if schema evolution was detected.
446                let batch = if let Some(ref indices) = self.projection_indices {
447                    let original_schema = self.schema.as_ref().unwrap();
448                    let num_rows = batch.num_rows();
449                    let columns: Vec<Arc<dyn arrow_array::Array>> = indices
450                        .iter()
451                        .zip(original_schema.fields())
452                        .map(|(idx, field)| match idx {
453                            Some(i) => batch.column(*i).clone(),
454                            None => arrow_array::new_null_array(field.data_type(), num_rows),
455                        })
456                        .collect();
457                    RecordBatch::try_new(original_schema.clone(), columns).map_err(|e| {
458                        ConnectorError::ReadError(format!(
459                            "failed to project batch to original schema: {e}"
460                        ))
461                    })?
462                } else {
463                    batch
464                };
465                self.pending_batches.push_back(batch);
466            }
467
468            if !fully_consumed {
469                // Snapshot mode: max_records truncated the version.
470                // Don't advance — next poll re-reads the same version.
471                // (Incremental mode always reads full versions, so
472                // fully_consumed is always true there.)
473            } else if self.pending_batches.is_empty() {
474                // Version fully consumed with no data rows (metadata-only).
475                self.current_version = target_version;
476            } else {
477                // Version fully consumed, batches buffered. Advance after drain.
478                self.inflight_version = Some(target_version);
479            }
480
481            if let Some(batch) = self.pending_batches.pop_front() {
482                self.records_read += batch.num_rows() as u64;
483
484                // Single-batch version: buffer is already empty, advance now.
485                if self.pending_batches.is_empty() {
486                    if let Some(v) = self.inflight_version.take() {
487                        self.current_version = v;
488                    }
489                }
490
491                return Ok(Some(SourceBatch::new(batch)));
492            }
493        }
494
495        Ok(None)
496    }
497
498    fn schema(&self) -> SchemaRef {
499        self.schema
500            .clone()
501            .unwrap_or_else(|| Arc::new(arrow_schema::Schema::empty()))
502    }
503
504    fn checkpoint(&self) -> SourceCheckpoint {
505        let mut cp = SourceCheckpoint::new(0);
506        cp.set_offset("delta_version", self.current_version.to_string());
507        cp.set_offset("read_mode", self.config.read_mode.to_string());
508        cp
509    }
510
511    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
512        if let Some(version_str) = checkpoint.get_offset("delta_version") {
513            self.current_version = version_str.parse::<i64>().map_err(|_| {
514                ConnectorError::ConfigurationError(format!(
515                    "invalid delta_version in checkpoint: '{version_str}'"
516                ))
517            })?;
518            info!(
519                restored_version = self.current_version,
520                "Delta Lake source: restored from checkpoint"
521            );
522        }
523        Ok(())
524    }
525
526    async fn close(&mut self) -> Result<(), ConnectorError> {
527        info!("closing Delta Lake source connector");
528
529        #[cfg(feature = "delta-lake")]
530        {
531            self.table = None;
532        }
533
534        self.pending_batches.clear();
535        self.state = ConnectorState::Closed;
536
537        info!(
538            table_path = %self.config.table_path,
539            current_version = self.current_version,
540            records_read = self.records_read,
541            "Delta Lake source connector closed"
542        );
543
544        Ok(())
545    }
546}
547
548impl std::fmt::Debug for DeltaSource {
549    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
550        f.debug_struct("DeltaSource")
551            .field("state", &self.state)
552            .field("table_path", &self.config.table_path)
553            .field("read_mode", &self.config.read_mode)
554            .field("current_version", &self.current_version)
555            .field("pending_batches", &self.pending_batches.len())
556            .field("records_read", &self.records_read)
557            .finish_non_exhaustive()
558    }
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn test_config() -> DeltaSourceConfig {
        DeltaSourceConfig::new("/tmp/delta_source_test")
    }

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ]))
    }

    /// Builds an `n`-row batch matching `test_schema()`.
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Vec<i64> = (0..n as i64).collect();
        let names: Vec<&str> = (0..n).map(|_| "test").collect();
        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(names)),
                Arc::new(Float64Array::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_new_defaults() {
        let source = DeltaSource::new(test_config(), None);
        assert_eq!(source.state(), ConnectorState::Created);
        assert_eq!(source.current_version(), -1);
        assert!(source.schema.is_none());
    }

    #[test]
    fn test_checkpoint_roundtrip() {
        let mut source = DeltaSource::new(test_config(), None);
        source.current_version = 42;

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("42"));
    }

    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let mut source = DeltaSource::new(test_config(), None);
        assert_eq!(source.current_version(), -1);

        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "10");
        source.restore(&cp).await.unwrap();

        assert_eq!(source.current_version(), 10);
    }

    #[test]
    fn test_schema_empty_when_none() {
        let source = DeltaSource::new(test_config(), None);
        let schema = source.schema();
        assert_eq!(schema.fields().len(), 0);
    }

    #[tokio::test]
    async fn test_poll_not_running() {
        let mut source = DeltaSource::new(test_config(), None);
        // state is Created, not Running
        let result = source.poll_batch(100).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_poll_returns_buffered_batches() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;

        // Manually buffer some batches.
        source.pending_batches.push_back(test_batch(5));
        source.pending_batches.push_back(test_batch(3));

        let batch1 = source.poll_batch(100).await.unwrap();
        assert!(batch1.is_some());
        assert_eq!(batch1.unwrap().records.num_rows(), 5);

        let batch2 = source.poll_batch(100).await.unwrap();
        assert!(batch2.is_some());
        assert_eq!(batch2.unwrap().records.num_rows(), 3);

        assert_eq!(source.records_read, 8);
    }

    /// D002/D003: buffered batches are handed out one per `poll_batch` call,
    /// whole. `max_records` bounds the version read (via
    /// `read_batches_at_version`), not an already-buffered batch: a 100-row
    /// batch is returned intact even when `max_records` is 50.
    #[tokio::test]
    async fn test_poll_batch_returns_buffered_incrementally() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;

        // Simulate what read_batches_at_version produces: many small batches
        for _ in 0..10 {
            source.pending_batches.push_back(test_batch(100));
        }

        // Each poll_batch returns exactly one buffered batch
        let batch = source.poll_batch(50).await.unwrap();
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().records.num_rows(), 100);
        // 9 remaining
        assert_eq!(source.pending_batches.len(), 9);
    }

    /// Version is only advanced after the inflight buffer is fully drained.
    /// With multiple buffered batches, `current_version` stays at the old value
    /// until the last batch is consumed, then jumps to the inflight version.
    #[tokio::test]
    async fn test_version_deferred_until_buffer_drained() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.current_version = 5;

        // Simulate a read of version 42 that produced three batches. In
        // production the delta-lake code path records that version in
        // `inflight_version`; set it by hand here. Without the feature the
        // field does not exist, so only the buffer drain can be checked.
        #[cfg(feature = "delta-lake")]
        {
            source.inflight_version = Some(42);
        }
        for _ in 0..3 {
            source.pending_batches.push_back(test_batch(10));
        }

        let b1 = source.poll_batch(100).await.unwrap();
        assert!(b1.is_some());
        assert_eq!(source.pending_batches.len(), 2);
        assert_eq!(source.current_version(), 5);

        let b2 = source.poll_batch(100).await.unwrap();
        assert!(b2.is_some());
        assert_eq!(source.pending_batches.len(), 1);
        assert_eq!(source.current_version(), 5);

        let b3 = source.poll_batch(100).await.unwrap();
        assert!(b3.is_some());
        assert!(source.pending_batches.is_empty());
        assert_eq!(source.records_read, 30);

        // Only the final drain promotes the inflight version.
        #[cfg(feature = "delta-lake")]
        {
            assert_eq!(source.current_version(), 42);
            assert!(source.inflight_version.is_none());
        }
        #[cfg(not(feature = "delta-lake"))]
        {
            assert_eq!(source.current_version(), 5);
        }
    }

    /// D004: `poll_interval` is parsed and stored in config.
    /// The field is used by the delta-lake feature to throttle version checks.
    #[test]
    fn test_poll_interval_is_stored() {
        let mut config = test_config();
        config.poll_interval = std::time::Duration::from_millis(500);
        let source = DeltaSource::new(config, None);
        assert_eq!(
            source.config().poll_interval,
            std::time::Duration::from_millis(500)
        );
    }

    #[test]
    fn test_debug_output() {
        let source = DeltaSource::new(test_config(), None);
        let debug = format!("{source:?}");
        assert!(debug.contains("DeltaSource"));
        assert!(debug.contains("/tmp/delta_source_test"));
    }

    #[tokio::test]
    async fn test_close() {
        let mut source = DeltaSource::new(test_config(), None);
        source.state = ConnectorState::Running;
        source.pending_batches.push_back(test_batch(5));

        source.close().await.unwrap();
        assert_eq!(source.state(), ConnectorState::Closed);
        assert!(source.pending_batches.is_empty());
    }

    /// D020: Source `open()` must error without delta-lake feature.
    #[cfg(not(feature = "delta-lake"))]
    #[tokio::test]
    async fn test_open_requires_feature() {
        let mut source = DeltaSource::new(test_config(), None);
        let connector_config = crate::config::ConnectorConfig::new("delta-lake");
        let result = source.open(&connector_config).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("delta-lake"), "error: {err}");
    }
}