// laminar_connectors/lakehouse/delta_table_provider.rs
//! Delta Lake table provider integration with `DataFusion`.
//!
//! This module provides a thin helper to open a Delta Lake table and
//! register it as a `TableProvider` in a `SessionContext`.
//!
//! # Usage
//!
//! ```rust,ignore
//! use laminar_connectors::lakehouse::delta_table_provider::register_delta_table;
//! use datafusion::prelude::SessionContext;
//! use std::collections::HashMap;
//!
//! let ctx = SessionContext::new();
//! register_delta_table(&ctx, "my_table", "/path/to/delta/table", HashMap::new()).await?;
//!
//! // Now query it:
//! let df = ctx.sql("SELECT * FROM my_table").await?;
//! ```

#[cfg(feature = "delta-lake")]
use std::collections::HashMap;
#[cfg(feature = "delta-lake")]
use std::sync::Arc;

#[cfg(feature = "delta-lake")]
use datafusion::prelude::SessionContext;
#[cfg(feature = "delta-lake")]
use tracing::info;

#[cfg(feature = "delta-lake")]
use crate::error::ConnectorError;
/// Opens a Delta Lake table and registers it as a table provider in the
/// given `DataFusion` `SessionContext`.
///
/// # Arguments
///
/// * `ctx` - The `DataFusion` session context to register in
/// * `name` - The SQL table name (e.g., `"trades"`)
/// * `table_uri` - Path to the Delta Lake table (local, `s3://`, `az://`, `gs://`)
/// * `storage_options` - Storage credentials and configuration
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the table cannot be opened,
/// or `ConnectorError::Internal` if registration fails.
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher)]
pub async fn register_delta_table(
    ctx: &SessionContext,
    name: &str,
    table_uri: &str,
    storage_options: HashMap<String, String>,
) -> Result<(), ConnectorError> {
    use super::delta_io;

    info!(
        name,
        table_uri, "registering Delta Lake table as TableProvider"
    );

    // Open the table at `table_uri`. We pass `None` for the schema here —
    // presumably `open_or_create_table` only creates a new table when a
    // schema is supplied, so this opens an existing one; errors bubble up
    // via `?` (delta_io maps them to ConnectorError).
    let delta_table = delta_io::open_or_create_table(table_uri, storage_options, None).await?;

    // Turn the table into a DeltaTableProvider (implements TableProvider).
    let provider = match delta_table.table_provider().build().await {
        Ok(p) => p,
        Err(e) => {
            return Err(ConnectorError::Internal(format!(
                "failed to build table provider: {e}"
            )));
        }
    };

    // Make it queryable under `name` in the session context.
    if let Err(e) = ctx.register_table(name, Arc::new(provider)) {
        return Err(ConnectorError::Internal(format!(
            "failed to register Delta table '{name}': {e}"
        )));
    }

    info!(name, table_uri, "Delta Lake table registered successfully");

    Ok(())
}
81
#[cfg(all(test, feature = "delta-lake"))]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use tempfile::TempDir;

    /// Schema used by every test: (id: i64 non-null, name: utf8?, value: f64?).
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds an `n`-row batch: id = 0..n, name = "test", value = id * 1.5.
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> arrow_array::RecordBatch {
        let id_col: Int64Array = (0..n as i64).collect::<Vec<i64>>().into();
        let name_col: StringArray = vec!["test"; n].into();
        let value_col: Float64Array = (0..n)
            .map(|i| i as f64 * 1.5)
            .collect::<Vec<f64>>()
            .into();

        arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(id_col), Arc::new(name_col), Arc::new(value_col)],
        )
        .unwrap()
    }

    /// End-to-end: create a Delta table, append rows, register it as a
    /// TableProvider, and verify a COUNT(*) query through DataFusion.
    #[tokio::test]
    async fn test_register_and_query_delta_table() {
        use super::super::delta_io;
        use deltalake::protocol::SaveMode;

        let temp_dir = TempDir::new().unwrap();
        let uri = temp_dir.path().to_str().unwrap();

        // Create the table (schema supplied => creation allowed) and append
        // ten rows in one commit.
        let schema = test_schema();
        let created = delta_io::open_or_create_table(uri, HashMap::new(), Some(&schema))
            .await
            .unwrap();
        let (_table, version) = delta_io::write_batches(
            created,
            vec![test_batch(10)],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();
        // Creation is version 0; the append commit should land as version 1.
        assert_eq!(version, 1);

        // Register under a SQL name and run an aggregate query.
        let ctx = SessionContext::new();
        register_delta_table(&ctx, "test_delta", uri, HashMap::new())
            .await
            .unwrap();

        let batches = ctx
            .sql("SELECT COUNT(*) AS cnt FROM test_delta")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();

        assert_eq!(batches.len(), 1);
        let cnt_col = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(cnt_col.value(0), 10);
    }
}