laminar_connectors/lakehouse/
delta_table_provider.rs1#[cfg(feature = "delta-lake")]
21use std::collections::HashMap;
22
23#[cfg(feature = "delta-lake")]
24use std::sync::Arc;
25
26#[cfg(feature = "delta-lake")]
27use datafusion::prelude::SessionContext;
28
29#[cfg(feature = "delta-lake")]
30use tracing::info;
31
32#[cfg(feature = "delta-lake")]
33use crate::error::ConnectorError;
34
/// Registers the Delta Lake table at `table_uri` on a DataFusion session under `name`.
///
/// The steps, in order:
/// 1. open the table through `delta_io::open_or_create_table` (no schema is passed),
/// 2. call `update_datafusion_session` on the table with the session state,
/// 3. build a table provider from the table,
/// 4. register that provider on `ctx` as `name`.
///
/// An `info` event is emitted before the first step and after the last one.
///
/// # Errors
///
/// Propagates (via `?`) any error returned by `delta_io::open_or_create_table`.
/// Returns [`ConnectorError::Internal`] when updating the session, building the
/// provider, or registering the table fails; the underlying error is included
/// in the message.
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher)]
pub async fn register_delta_table(
    ctx: &SessionContext,
    name: &str,
    table_uri: &str,
    storage_options: HashMap<String, String>,
) -> Result<(), ConnectorError> {
    use super::delta_io::open_or_create_table;

    info!(
        name,
        table_uri, "registering Delta Lake table as TableProvider"
    );

    let delta_table = open_or_create_table(table_uri, storage_options, None).await?;

    // NOTE(review): judging by the error text below, this call makes the
    // table's object store known to the session — confirm against deltalake docs.
    if let Err(e) = delta_table.update_datafusion_session(&ctx.state()) {
        return Err(ConnectorError::Internal(format!(
            "register Delta object store: {e}"
        )));
    }

    let build_result = delta_table.table_provider().build().await;
    let provider = match build_result {
        Ok(provider) => provider,
        Err(e) => {
            return Err(ConnectorError::Internal(format!(
                "failed to build table provider: {e}"
            )));
        }
    };

    // Only the failure case matters here; the success value is discarded.
    let registration = ctx.register_table(name, Arc::new(provider));
    if let Err(e) = registration {
        return Err(ConnectorError::Internal(format!(
            "failed to register Delta table '{name}': {e}"
        )));
    }

    info!(name, table_uri, "Delta Lake table registered successfully");

    Ok(())
}
89
// Compiled only for test builds that enable the `delta-lake` feature.
#[cfg(all(test, feature = "delta-lake"))]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use tempfile::TempDir;

    /// Schema shared by the tests: a non-nullable `id` (Int64) plus nullable
    /// `name` (Utf8) and `value` (Float64) columns.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds an `n`-row batch matching `test_schema()`: `id` counts up from 0,
    /// `name` is always `"test"`, and `value` is the row index times 1.5.
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> arrow_array::RecordBatch {
        let id_column = Int64Array::from((0..n as i64).collect::<Vec<i64>>());
        let name_column = StringArray::from(vec!["test"; n]);
        let value_column =
            Float64Array::from((0..n).map(|i| i as f64 * 1.5).collect::<Vec<f64>>());

        let columns: Vec<arrow_array::ArrayRef> = vec![
            Arc::new(id_column),
            Arc::new(name_column),
            Arc::new(value_column),
        ];

        arrow_array::RecordBatch::try_new(test_schema(), columns).unwrap()
    }

    /// Writes ten rows to a fresh Delta table in a temp directory, registers
    /// the table on a new session, and checks that `COUNT(*)` reports ten rows.
    #[tokio::test]
    async fn test_register_and_query_delta_table() {
        use super::super::delta_io::{open_or_create_table, write_batches};
        use deltalake::protocol::SaveMode;

        // The directory is removed when `scratch` is dropped at the end of the test.
        let scratch = TempDir::new().unwrap();
        let table_uri = scratch.path().to_str().unwrap();

        // Create the table with an explicit schema, then append a single batch.
        let schema = test_schema();
        let empty_table = open_or_create_table(table_uri, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        let (_written_table, committed_version) = write_batches(
            empty_table,
            vec![test_batch(10)],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
            None,
            false,
            None,
        )
        .await
        .unwrap();
        assert_eq!(committed_version, 1);

        // Register the table by path on a fresh session and query it via SQL.
        let session = SessionContext::new();
        register_delta_table(&session, "test_delta", table_uri, HashMap::new())
            .await
            .unwrap();

        let batches = session
            .sql("SELECT COUNT(*) AS cnt FROM test_delta")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(batches.len(), 1);

        let counts = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(counts.value(0), 10);
    }
}