// laminar_connectors/lookup/delta_reference.rs

1//! Delta Lake reference table source for lookup/enrichment joins.
2//!
3//! Implements `ReferenceTableSource` to populate lookup tables from Delta
4//! Lake tables. Supports catalog resolution (Unity, Glue) via the existing
5//! `resolve_catalog_options` function in `delta_io`.
6
7use std::collections::VecDeque;
8
9use arrow_array::RecordBatch;
10use tracing::info;
11
12#[cfg(feature = "delta-lake")]
13use std::time::Instant;
14
15#[cfg(feature = "delta-lake")]
16use tracing::{debug, warn};
17
18#[cfg(feature = "delta-lake")]
19use deltalake::DeltaTable;
20
21use crate::checkpoint::SourceCheckpoint;
22use crate::config::ConnectorConfig;
23use crate::error::ConnectorError;
24use crate::lakehouse::delta_source_config::DeltaSourceConfig;
25#[cfg(feature = "delta-lake")]
26use crate::lakehouse::delta_source_config::SchemaEvolutionAction;
27use crate::reference::ReferenceTableSource;
28
/// Lifecycle phase.
///
/// The source moves forward only: `Init` -> `Snapshot` -> `Changes`
/// (driven by `poll_snapshot`), with `Closed` reachable from any phase
/// via `close()`.
#[allow(dead_code)] // Variants used only with delta-lake feature
enum Phase {
    /// Not yet opened; the first `poll_snapshot()` call opens the table.
    Init,
    /// Draining the initial full-table snapshot from `pending_batches`.
    Snapshot,
    /// Snapshot fully drained; now polling for new Delta versions.
    Changes,
    /// `close()` was called; no further data is produced.
    Closed,
}
37
/// Delta Lake reference table source for `CREATE LOOKUP TABLE`.
///
/// Reads a Delta Lake table as a dimension/reference table for enrichment
/// joins. The table is loaded as a full snapshot on first access, then
/// incrementally refreshed as new Delta versions appear.
pub struct DeltaReferenceTableSource {
    #[allow(dead_code)] // read in feature-gated methods
    config: DeltaSourceConfig,
    /// Current lifecycle phase; see [`Phase`].
    phase: Phase,
    /// Open table handle; `None` until `open_table()` succeeds and after
    /// `close()`.
    #[cfg(feature = "delta-lake")]
    table: Option<DeltaTable>,
    /// Latest fully-consumed Delta version; `-1` before the first snapshot.
    /// This is the value reported by `checkpoint()`.
    current_version: i64,
    /// Version being drained. Promoted to `current_version` only when
    /// `pending_batches` is fully consumed, so checkpoint is safe.
    #[cfg(feature = "delta-lake")]
    inflight_version: Option<i64>,
    /// Batches read from the table but not yet handed to the caller.
    pending_batches: VecDeque<RecordBatch>,
    /// Time of the last version poll; used to throttle `check_for_changes`
    /// to the configured `poll_interval`.
    #[cfg(feature = "delta-lake")]
    last_version_check: Option<Instant>,
    /// Schema observed on the last read; compared against new versions to
    /// detect schema evolution.
    #[cfg(feature = "delta-lake")]
    known_schema: Option<arrow_schema::SchemaRef>,
}
60
61impl DeltaReferenceTableSource {
62    /// Creates a new Delta Lake reference table source from a pre-parsed config.
63    #[must_use]
64    pub fn from_source_config(config: DeltaSourceConfig) -> Self {
65        Self {
66            config,
67            phase: Phase::Init,
68            #[cfg(feature = "delta-lake")]
69            table: None,
70            current_version: -1,
71            #[cfg(feature = "delta-lake")]
72            inflight_version: None,
73            pending_batches: VecDeque::new(),
74            #[cfg(feature = "delta-lake")]
75            last_version_check: None,
76            #[cfg(feature = "delta-lake")]
77            known_schema: None,
78        }
79    }
80
81    /// Creates a new source from a [`ConnectorConfig`] (SQL WITH clause).
82    ///
83    /// # Errors
84    ///
85    /// Returns `ConnectorError` if required config keys are missing.
86    pub fn from_connector_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
87        let source_config = DeltaSourceConfig::from_config(config)?;
88        Ok(Self::from_source_config(source_config))
89    }
90
    /// Resolves the configured catalog (if any) and opens the Delta table,
    /// storing the handle in `self.table`.
    #[cfg(feature = "delta-lake")]
    async fn open_table(&mut self) -> Result<(), ConnectorError> {
        use crate::lakehouse::delta_io;

        // Resolve catalog indirection (Unity, Glue, or none) into a concrete
        // table path plus storage options before opening.
        let (resolved_path, resolved_opts) = delta_io::resolve_catalog_options(
            &self.config.catalog_type,
            self.config.catalog_database.as_deref(),
            self.config.catalog_name.as_deref(),
            self.config.catalog_schema.as_deref(),
            &self.config.table_path,
            &self.config.storage_options,
        )
        .await?;

        info!(
            table_path = %self.config.table_path,
            resolved_path = %resolved_path,
            catalog = %self.config.catalog_type,
            "delta lookup: resolved catalog"
        );

        // No schema hint is supplied (`None`).
        // NOTE(review): `open_or_create_table` may create an empty table if
        // the path does not exist — confirm that is intended for lookups
        // rather than surfacing a "table not found" error.
        let table = delta_io::open_or_create_table(&resolved_path, resolved_opts, None).await?;

        info!(
            resolved_path = %resolved_path,
            table_version = table.version().unwrap_or(0),
            "delta lookup: table opened"
        );

        self.table = Some(table);
        Ok(())
    }
123
    /// Loads all batches at the latest version into `pending_batches`.
    /// When `partition_filter` is set, applies it as a WHERE clause.
    #[cfg(feature = "delta-lake")]
    async fn load_snapshot(&mut self) -> Result<(), ConnectorError> {
        use crate::lakehouse::delta_io;

        let table = self
            .table
            .as_mut()
            .ok_or_else(|| ConnectorError::Internal("table not opened".into()))?;

        let latest = delta_io::get_latest_version(table).await?;

        // Filtered reads go through DataFusion; unfiltered reads scan the
        // version's files directly with no row cap (usize::MAX).
        let batches = if self.config.partition_filter.is_some() {
            self.load_snapshot_filtered(latest).await?
        } else {
            let (b, _) = delta_io::read_batches_at_version(table, latest, usize::MAX).await?;
            b
        };

        // Seed schema from table metadata if no rows returned (empty/filtered table).
        if let Some(first) = batches.first() {
            self.known_schema = Some(first.schema());
        } else if self.known_schema.is_none() {
            // Re-borrow immutably: the previous `table` borrow has ended.
            let table = self
                .table
                .as_ref()
                .ok_or_else(|| ConnectorError::Internal("table not opened".into()))?;
            // Best-effort: failure to read table schema is not fatal here.
            if let Ok(s) = delta_io::get_table_schema(table) {
                self.known_schema = Some(s);
            }
        }

        let total_rows: usize = batches.iter().map(RecordBatch::num_rows).sum();
        info!(
            version = latest,
            batches = batches.len(),
            rows = total_rows,
            partition_filter = ?self.config.partition_filter,
            "delta lookup: snapshot loaded"
        );

        self.pending_batches = VecDeque::from(batches);
        // Unlike change diffs, the snapshot version is recorded immediately
        // rather than deferred until the buffer drains.
        self.current_version = latest;
        self.last_version_check = Some(Instant::now());
        Ok(())
    }
171
    /// Loads snapshot with partition filter via `DataFusion` WHERE clause.
    ///
    /// Pins the table at `version`, registers it as a scan-local DataFusion
    /// table, and evaluates `SELECT * ... WHERE <partition_filter>`.
    /// Empty batches are dropped from the result.
    #[cfg(feature = "delta-lake")]
    async fn load_snapshot_filtered(
        &mut self,
        version: i64,
    ) -> Result<Vec<RecordBatch>, ConnectorError> {
        use tokio_stream::StreamExt;

        let table = self
            .table
            .as_mut()
            .ok_or_else(|| ConnectorError::Internal("table not opened".into()))?;

        table
            .load_version(version)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("load version {version}: {e}")))?;

        let provider = table
            .table_provider()
            .build()
            .await
            .map_err(|e| ConnectorError::ReadError(format!("build table provider: {e}")))?;

        // Fresh, throwaway session scoped to this single scan.
        let ctx = datafusion::prelude::SessionContext::new();
        ctx.register_table("delta_lookup_scan", std::sync::Arc::new(provider))
            .map_err(|e| ConnectorError::ReadError(format!("register scan: {e}")))?;

        // Caller guarantees partition_filter is Some.
        // NOTE(review): the filter is interpolated into SQL verbatim. It comes
        // from connector config (SQL WITH clause), so its author already
        // controls SQL — but confirm it can never be sourced from untrusted
        // input before relying on that.
        let filter = self.config.partition_filter.as_deref().unwrap_or("1=1");
        let sql = format!("SELECT * FROM delta_lookup_scan WHERE {filter}");

        let df = ctx
            .sql(&sql)
            .await
            .map_err(|e| ConnectorError::ReadError(format!("filtered scan: {e}")))?;

        let mut stream = df
            .execute_stream()
            .await
            .map_err(|e| ConnectorError::ReadError(format!("stream: {e}")))?;

        // Drain the stream, keeping only non-empty batches.
        let mut batches = Vec::new();
        while let Some(result) = stream.next().await {
            let batch = result.map_err(|e| ConnectorError::ReadError(format!("batch: {e}")))?;
            if batch.num_rows() > 0 {
                batches.push(batch);
            }
        }

        Ok(batches)
    }
224
225    /// Checks for new Delta versions and loads diffs, capped at
226    /// `MAX_VERSIONS_PER_POLL` per call to avoid blocking the pipeline.
227    #[cfg(feature = "delta-lake")]
228    async fn check_for_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
229        use crate::lakehouse::delta_io;
230
231        /// Max versions to read per `poll_changes()` call.
232        const MAX_VERSIONS_PER_POLL: i64 = 10;
233
234        // Throttle version checks to poll_interval.
235        if let Some(last) = self.last_version_check {
236            if last.elapsed() < self.config.poll_interval {
237                return Ok(None);
238            }
239        }
240
241        let table = self
242            .table
243            .as_mut()
244            .ok_or_else(|| ConnectorError::Internal("table not opened".into()))?;
245
246        let latest = delta_io::get_latest_version(table).await?;
247
248        if latest <= self.current_version {
249            self.last_version_check = Some(Instant::now());
250            return Ok(None);
251        }
252
253        let target = latest.min(self.current_version + MAX_VERSIONS_PER_POLL);
254
255        let mut all_batches: Vec<RecordBatch> = Vec::new();
256        for v in (self.current_version + 1)..=target {
257            let (batches, _) = delta_io::read_version_diff(
258                table,
259                v,
260                usize::MAX,
261                self.config.partition_filter.as_deref(),
262            )
263            .await?;
264
265            if let Some(first) = batches.first() {
266                let new_schema = first.schema();
267                if let Some(known) = &self.known_schema {
268                    if known.as_ref() != new_schema.as_ref() {
269                        match self.config.schema_evolution_action {
270                            SchemaEvolutionAction::Warn => {
271                                warn!(version = v, "delta lookup: schema changed between versions");
272                                self.known_schema = Some(new_schema);
273                            }
274                            SchemaEvolutionAction::Error => {
275                                if v > self.current_version + 1 {
276                                    self.current_version = v - 1;
277                                }
278                                return Err(ConnectorError::Internal(format!(
279                                    "delta lookup: schema evolution detected at version {v} \
280                                     (action=error)"
281                                )));
282                            }
283                        }
284                    }
285                } else {
286                    self.known_schema = Some(new_schema);
287                }
288            }
289
290            all_batches.extend(batches);
291        }
292
293        let total_rows: usize = all_batches.iter().map(RecordBatch::num_rows).sum();
294        if total_rows > 0 {
295            debug!(
296                from = self.current_version + 1,
297                to = target,
298                latest,
299                rows = total_rows,
300                "delta lookup: version diff loaded"
301            );
302        }
303
304        // If more versions remain, skip throttle so next poll re-enters immediately.
305        if target < latest {
306            self.last_version_check = None;
307        } else {
308            self.last_version_check = Some(Instant::now());
309        }
310
311        let mut batch_iter = all_batches.into_iter();
312        let first = batch_iter.next();
313        self.pending_batches.extend(batch_iter);
314
315        // Defer version advancement: only promote when buffer is drained,
316        // so checkpoint() never reports a version ahead of consumed data.
317        if self.pending_batches.is_empty() {
318            self.current_version = target;
319        } else {
320            self.inflight_version = Some(target);
321        }
322        Ok(first)
323    }
324}
325
#[async_trait::async_trait]
impl ReferenceTableSource for DeltaReferenceTableSource {
    async fn poll_snapshot(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
        // Lazy open: the first call resolves the catalog, opens the table,
        // and loads the full snapshot into `pending_batches`.
        if matches!(self.phase, Phase::Init) {
            #[cfg(feature = "delta-lake")]
            {
                self.open_table().await?;
                self.load_snapshot().await?;
                self.phase = Phase::Snapshot;
            }

            // Without the feature the source is unusable; fail loudly.
            #[cfg(not(feature = "delta-lake"))]
            {
                return Err(ConnectorError::ConfigurationError(
                    "Delta Lake lookup requires the 'delta-lake' feature. \
                     Build with: cargo build --features delta-lake"
                        .into(),
                ));
            }
        }

        // Once past the snapshot phase this method yields nothing further.
        if !matches!(self.phase, Phase::Snapshot) {
            return Ok(None);
        }

        if let Some(batch) = self.pending_batches.pop_front() {
            return Ok(Some(batch));
        }

        // Buffer drained: snapshot complete, switch to change polling.
        self.phase = Phase::Changes;
        Ok(None)
    }

    fn is_snapshot_complete(&self) -> bool {
        matches!(self.phase, Phase::Changes | Phase::Closed)
    }

    async fn poll_changes(&mut self) -> Result<Option<RecordBatch>, ConnectorError> {
        // Change polling only runs after the snapshot has been drained.
        if !matches!(self.phase, Phase::Changes) {
            return Ok(None);
        }

        if let Some(batch) = self.pending_batches.pop_front() {
            // Promote inflight version when buffer is fully drained.
            #[cfg(feature = "delta-lake")]
            if self.pending_batches.is_empty() {
                if let Some(v) = self.inflight_version.take() {
                    self.current_version = v;
                }
            }
            return Ok(Some(batch));
        }

        #[cfg(feature = "delta-lake")]
        {
            return self.check_for_changes().await;
        }

        #[cfg(not(feature = "delta-lake"))]
        Ok(None)
    }

    fn checkpoint(&self) -> SourceCheckpoint {
        // Only the fully-consumed Delta version is checkpointed; a version
        // still being drained (`inflight_version`) is deliberately excluded.
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", self.current_version.to_string());
        cp
    }

    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        // A missing offset is not an error: the source simply starts fresh
        // (current_version stays at -1).
        if let Some(v) = checkpoint.get_offset("delta_version") {
            self.current_version = v.parse().map_err(|_| {
                ConnectorError::Internal(format!("invalid delta_version in checkpoint: '{v}'"))
            })?;
            info!(
                version = self.current_version,
                "delta lookup: restored from checkpoint"
            );
        }
        Ok(())
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        self.phase = Phase::Closed;
        // Drop the table handle and discard any undelivered batches.
        #[cfg(feature = "delta-lake")]
        {
            self.table = None;
        }
        self.pending_batches.clear();
        Ok(())
    }
}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lakehouse::delta_source_config::DeltaSourceConfig;

    // Construction from a pre-parsed config starts in `Init` at version -1.
    #[test]
    fn test_from_source_config() {
        let config = DeltaSourceConfig::new("/tmp/test_delta");
        let src = DeltaReferenceTableSource::from_source_config(config);
        assert!(!src.is_snapshot_complete());
        assert_eq!(src.current_version, -1);
    }

    // Construction from a SQL WITH-clause config with the required key.
    #[test]
    fn test_from_connector_config() {
        let mut config = ConnectorConfig::new("delta-lake");
        config.set("table.path", "/tmp/test_delta");
        let src = DeltaReferenceTableSource::from_connector_config(&config).unwrap();
        assert!(!src.is_snapshot_complete());
    }

    // Missing `table.path` must fail config parsing.
    #[test]
    fn test_from_connector_config_missing_path() {
        let config = ConnectorConfig::new("delta-lake");
        assert!(DeltaReferenceTableSource::from_connector_config(&config).is_err());
    }

    // checkpoint() serializes `current_version` under "delta_version".
    #[test]
    fn test_checkpoint_round_trip() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        src.current_version = 42;
        let cp = src.checkpoint();
        assert_eq!(cp.get_offset("delta_version"), Some("42"));
    }

    // restore() parses the stored offset back into `current_version`.
    #[tokio::test]
    async fn test_restore_from_checkpoint() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "17");
        src.restore(&cp).await.unwrap();
        assert_eq!(src.current_version, 17);
    }

    // A non-numeric checkpoint offset is rejected with an error.
    #[tokio::test]
    async fn test_restore_invalid_version() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("delta_version", "not_a_number");
        assert!(src.restore(&cp).await.is_err());
    }

    // close() moves to `Closed`, which also reports the snapshot complete.
    #[tokio::test]
    async fn test_close_sets_phase() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        src.close().await.unwrap();
        assert!(src.is_snapshot_complete());
    }

    // poll_changes() is a no-op until the snapshot phase has finished.
    #[tokio::test]
    async fn test_poll_changes_before_snapshot_returns_none() {
        let config = DeltaSourceConfig::new("/tmp/test");
        let mut src = DeltaReferenceTableSource::from_source_config(config);
        assert!(src.poll_changes().await.unwrap().is_none());
    }

    // End-to-end tests against a real local Delta table; these require the
    // delta-lake feature.
    #[cfg(feature = "delta-lake")]
    mod integration {
        use super::*;
        use arrow_array::{Int64Array, StringArray};
        use arrow_schema::{DataType, Field, Schema, SchemaRef};
        use std::collections::HashMap;
        use std::sync::Arc;
        use tempfile::TempDir;

        // Two-column (id, name) schema shared by all fixtures below.
        fn test_schema() -> SchemaRef {
            Arc::new(Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, true),
            ]))
        }

        // Builds one RecordBatch from parallel id/name slices.
        fn test_batch(ids: &[i64], names: &[&str]) -> RecordBatch {
            RecordBatch::try_new(
                test_schema(),
                vec![
                    Arc::new(Int64Array::from(ids.to_vec())),
                    Arc::new(StringArray::from(names.to_vec())),
                ],
            )
            .unwrap()
        }

        // Appends `batches` as a new Delta version and returns that version.
        async fn write_delta_version(path: &str, batches: Vec<RecordBatch>, epoch: u64) -> i64 {
            use crate::lakehouse::delta_io;
            use deltalake::protocol::SaveMode;

            let schema = test_schema();
            let table = delta_io::open_or_create_table(path, HashMap::new(), Some(&schema))
                .await
                .unwrap();

            let (_table, version) = delta_io::write_batches(
                table,
                batches,
                "test-writer",
                epoch,
                SaveMode::Append,
                None,
                false,
                None,
                false,
                None,
            )
            .await
            .unwrap();
            version
        }

        // Full snapshot drain: all rows surface, then the phase flips to
        // change polling and both poll methods return None.
        #[tokio::test]
        async fn test_snapshot_lifecycle() {
            let temp_dir = TempDir::new().unwrap();
            let table_path = temp_dir.path().to_str().unwrap();

            let batch = test_batch(&[1, 2, 3], &["Alice", "Bob", "Carol"]);
            write_delta_version(table_path, vec![batch], 1).await;

            let config = DeltaSourceConfig::new(table_path);
            let mut src = DeltaReferenceTableSource::from_source_config(config);

            assert!(!src.is_snapshot_complete());

            let mut total_rows = 0;
            while let Some(batch) = src.poll_snapshot().await.unwrap() {
                total_rows += batch.num_rows();
            }
            assert_eq!(total_rows, 3);
            assert!(src.is_snapshot_complete());
            assert!(src.poll_snapshot().await.unwrap().is_none());
            assert!(src.poll_changes().await.unwrap().is_none());

            src.close().await.unwrap();
        }

        // After a snapshot, the checkpoint reflects the loaded version.
        #[tokio::test]
        async fn test_checkpoint_preserves_version() {
            let temp_dir = TempDir::new().unwrap();
            let table_path = temp_dir.path().to_str().unwrap();

            write_delta_version(table_path, vec![test_batch(&[1], &["Alice"])], 1).await;

            let config = DeltaSourceConfig::new(table_path);
            let mut src = DeltaReferenceTableSource::from_source_config(config);
            while src.poll_snapshot().await.unwrap().is_some() {}

            let cp = src.checkpoint();
            assert!(src.current_version >= 0);
            assert_eq!(
                cp.get_offset("delta_version"),
                Some(src.current_version.to_string().as_str())
            );
            src.close().await.unwrap();
        }

        // Rows written after the snapshot surface through poll_changes() and
        // advance `current_version` (poll_interval 0 disables the throttle).
        #[tokio::test]
        async fn test_poll_changes_picks_up_new_version() {
            let temp_dir = TempDir::new().unwrap();
            let table_path = temp_dir.path().to_str().unwrap();

            // Version 1.
            write_delta_version(table_path, vec![test_batch(&[1, 2], &["Alice", "Bob"])], 1).await;

            // Snapshot.
            let mut config = DeltaSourceConfig::new(table_path);
            config.poll_interval = std::time::Duration::from_millis(0);
            let mut src = DeltaReferenceTableSource::from_source_config(config);
            while src.poll_snapshot().await.unwrap().is_some() {}
            let v1 = src.current_version;

            // Write version 2.
            write_delta_version(table_path, vec![test_batch(&[3], &["Carol"])], 2).await;

            // poll_changes should pick it up.
            let mut change_rows = 0;
            while let Some(batch) = src.poll_changes().await.unwrap() {
                change_rows += batch.num_rows();
            }
            assert_eq!(change_rows, 1);
            assert!(src.current_version > v1);
            src.close().await.unwrap();
        }

        // Explicit `catalog.type = none` resolves to the raw table path.
        #[tokio::test]
        async fn test_from_connector_config_with_catalog_none() {
            let temp_dir = TempDir::new().unwrap();
            let table_path = temp_dir.path().to_str().unwrap();

            write_delta_version(table_path, vec![test_batch(&[10], &["Test"])], 1).await;

            let mut config = ConnectorConfig::new("delta-lake");
            config.set("table.path", table_path);
            config.set("catalog.type", "none");

            let mut src = DeltaReferenceTableSource::from_connector_config(&config).unwrap();

            let mut total_rows = 0;
            while let Some(batch) = src.poll_snapshot().await.unwrap() {
                total_rows += batch.num_rows();
            }
            assert_eq!(total_rows, 1);
            assert!(src.is_snapshot_complete());
            src.close().await.unwrap();
        }
    }
}