Skip to main content

laminar_connectors/lakehouse/
delta_table_provider.rs

//! Delta Lake table provider integration with `DataFusion`.
//!
//! This module provides a thin helper that opens a Delta Lake table and
//! registers it as a `TableProvider` in a `SessionContext`, so the table can
//! be queried with SQL. The helper is only compiled when the `delta-lake`
//! feature is enabled.
//!
//! # Usage
//!
//! ```rust,ignore
//! use laminar_connectors::lakehouse::delta_table_provider::register_delta_table;
//! use datafusion::prelude::SessionContext;
//! use std::collections::HashMap;
//!
//! let ctx = SessionContext::new();
//! register_delta_table(&ctx, "my_table", "/path/to/delta/table", HashMap::new()).await?;
//!
//! // Now query it:
//! let df = ctx.sql("SELECT * FROM my_table").await?;
//! ```
19
20#[cfg(feature = "delta-lake")]
21use std::collections::HashMap;
22
23#[cfg(feature = "delta-lake")]
24use std::sync::Arc;
25
26#[cfg(feature = "delta-lake")]
27use datafusion::prelude::SessionContext;
28
29#[cfg(feature = "delta-lake")]
30use tracing::info;
31
32#[cfg(feature = "delta-lake")]
33use crate::error::ConnectorError;
34
/// Opens a Delta Lake table and exposes it to SQL by registering it as a
/// `TableProvider` under `name` in the given `DataFusion` `SessionContext`.
///
/// The table's object store is attached to the session before the provider
/// is registered. This lets scans of object-store-backed tables (`s3://`,
/// `az://`, `gs://`) resolve their URLs.
///
/// # Arguments
///
/// * `ctx` - The `DataFusion` session context that receives the table
/// * `name` - The SQL table name to register under (e.g., `"trades"`)
/// * `table_uri` - Location of the Delta Lake table (local path, `s3://`, `az://`, `gs://`)
/// * `storage_options` - Storage credentials and configuration, passed through
///   unchanged to the table opener
///
/// # Errors
///
/// * If the table cannot be opened, the error from
///   `delta_io::open_or_create_table` is propagated unchanged. It is intended
///   to be `ConnectorError::ConnectionFailed`; confirm against `delta_io`.
/// * Returns `ConnectorError::Internal` if attaching the object store,
///   building the provider, or registering it with the session fails.
#[cfg(feature = "delta-lake")]
// `storage_options` is forwarded as-is to `delta_io::open_or_create_table`.
// Keeping the concrete `HashMap` type, rather than being generic over the
// hasher, presumably matches that function's signature.
#[allow(clippy::implicit_hasher)]
pub async fn register_delta_table(
    ctx: &SessionContext,
    name: &str,
    table_uri: &str,
    storage_options: HashMap<String, String>,
) -> Result<(), ConnectorError> {
    use super::delta_io;

    info!(name, table_uri, "registering Delta Lake table as TableProvider");

    // No schema is supplied (`None`): the intent is to open a table that
    // already exists at `table_uri`.
    // NOTE(review): confirm how `open_or_create_table` behaves when no table
    // exists and no schema is given.
    let delta_table = delta_io::open_or_create_table(table_uri, storage_options, None).await?;

    // Register the table's object store with the session so scans can resolve
    // non-local URLs (s3://, az://, gs://). Without it, reading an
    // object-store-backed table fails with "No suitable object store found".
    // Local-filesystem tables use DataFusion's built-in store.
    if let Err(e) = delta_table.update_datafusion_session(&ctx.state()) {
        return Err(ConnectorError::Internal(format!(
            "register Delta object store: {e}"
        )));
    }

    // `DeltaTableProvider` implements `TableProvider`, so the built provider
    // can be handed to the session directly.
    let delta_provider = match delta_table.table_provider().build().await {
        Ok(built) => built,
        Err(e) => {
            return Err(ConnectorError::Internal(format!(
                "failed to build table provider: {e}"
            )));
        }
    };

    if let Err(e) = ctx.register_table(name, Arc::new(delta_provider)) {
        return Err(ConnectorError::Internal(format!(
            "failed to register Delta table '{name}': {e}"
        )));
    }

    info!(name, table_uri, "Delta Lake table registered successfully");

    Ok(())
}
89
#[cfg(all(test, feature = "delta-lake"))]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use tempfile::TempDir;

    /// Schema of the fixture table: a non-nullable `id` (Int64) plus nullable
    /// `name` (Utf8) and `value` (Float64) columns.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds an `n`-row batch matching `test_schema()`.
    ///
    /// Ids run `0..n`, every name is `"test"`, and `value` is `row * 1.5`.
    // `usize -> f64` can lose precision for huge values; the batches built
    // here are tiny, so the lint is silenced.
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> RecordBatch {
        let id_column = Int64Array::from((0_i64..).take(n).collect::<Vec<_>>());
        let name_column = StringArray::from(vec!["test"; n]);
        let value_column =
            Float64Array::from((0..n).map(|row| row as f64 * 1.5).collect::<Vec<_>>());

        RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(id_column),
                Arc::new(name_column),
                Arc::new(value_column),
            ],
        )
        .unwrap()
    }

    /// Writes ten rows into a fresh local Delta table, registers it through
    /// `register_delta_table`, and checks that SQL sees all ten rows.
    #[tokio::test]
    async fn test_register_and_query_delta_table() {
        use super::super::delta_io;
        use deltalake::protocol::SaveMode;

        // `temp_dir` must stay alive for the whole test: it owns the table files.
        let temp_dir = TempDir::new().unwrap();
        let table_path = temp_dir.path().to_str().unwrap();

        // Seed the table: create it with the fixture schema, then append one batch.
        let schema = test_schema();
        let created = delta_io::open_or_create_table(table_path, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (_table, version) = delta_io::write_batches(
            created,
            vec![test_batch(10)],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        assert_eq!(version, 1);

        // Expose the table to SQL and count its rows.
        let ctx = SessionContext::new();
        register_delta_table(&ctx, "test_delta", table_path, HashMap::new())
            .await
            .unwrap();

        let batches = ctx
            .sql("SELECT COUNT(*) AS cnt FROM test_delta")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);

        let counts = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(counts.value(0), 10);
    }
}