// laminar_connectors/lakehouse/delta_table_provider.rs
#[cfg(feature = "delta-lake")]
use std::collections::HashMap;

#[cfg(feature = "delta-lake")]
use std::sync::Arc;

#[cfg(feature = "delta-lake")]
use datafusion::prelude::SessionContext;

#[cfg(feature = "delta-lake")]
use tracing::info;

#[cfg(feature = "delta-lake")]
use crate::error::ConnectorError;
/// Registers a Delta Lake table with a DataFusion [`SessionContext`] so it
/// can be queried by SQL under `name`.
///
/// Opens the table at `table_uri` via [`super::delta_io`], builds a DataFusion
/// `TableProvider` from it, and registers that provider on `ctx`.
///
/// # Arguments
///
/// * `ctx` - DataFusion session context to register the table with.
/// * `name` - SQL-visible table name.
/// * `table_uri` - URI of the Delta table (local path or object store).
/// * `storage_options` - Forwarded to the object-store layer (credentials,
///   endpoints, …); pass an empty map for local filesystem tables.
///
/// # Errors
///
/// Returns [`ConnectorError::Internal`] if the table provider cannot be
/// built or if registration with the session context fails, and propagates
/// any error from `delta_io::open_or_create_table`.
#[cfg(feature = "delta-lake")]
#[allow(clippy::implicit_hasher)]
pub async fn register_delta_table(
    ctx: &SessionContext,
    name: &str,
    table_uri: &str,
    storage_options: HashMap<String, String>,
) -> Result<(), ConnectorError> {
    use super::delta_io;

    info!(
        name,
        table_uri, "registering Delta Lake table as TableProvider"
    );

    // `None`: no schema supplied — presumably opens the existing table rather
    // than creating one (the test below passes `Some(&schema)` when creating).
    let table = delta_io::open_or_create_table(table_uri, storage_options, None).await?;

    let provider = table.table_provider().build().await.map_err(|e| {
        ConnectorError::Internal(format!("failed to build table provider: {e}"))
    })?;

    ctx.register_table(name, Arc::new(provider)).map_err(|e| {
        ConnectorError::Internal(format!("failed to register Delta table '{name}': {e}"))
    })?;

    info!(name, table_uri, "Delta Lake table registered successfully");

    Ok(())
}
#[cfg(all(test, feature = "delta-lake"))]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use tempfile::TempDir;

    /// Three-column schema (`id` / `name` / `value`) shared by the tests.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("value", DataType::Float64, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds a batch of `n` rows that matches [`test_schema`].
    #[allow(clippy::cast_precision_loss)]
    fn test_batch(n: usize) -> arrow_array::RecordBatch {
        let id_col: Vec<i64> = (0..n as i64).collect();
        let name_col = vec!["test"; n];
        let value_col: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();

        arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(id_col)),
                Arc::new(StringArray::from(name_col)),
                Arc::new(Float64Array::from(value_col)),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_register_and_query_delta_table() {
        use super::super::delta_io;
        use deltalake::protocol::SaveMode;

        // Create an empty Delta table in a scratch directory.
        let dir = TempDir::new().unwrap();
        let uri = dir.path().to_str().unwrap();
        let schema = test_schema();
        let table = delta_io::open_or_create_table(uri, HashMap::new(), Some(&schema))
            .await
            .unwrap();

        // Append ten rows; the commit should advance the table to version 1.
        let (_table, version) = delta_io::write_batches(
            table,
            vec![test_batch(10)],
            "test-writer",
            1,
            SaveMode::Append,
            None,
            false,
        )
        .await
        .unwrap();
        assert_eq!(version, 1);

        // Register with DataFusion and count the rows via SQL.
        let ctx = SessionContext::new();
        register_delta_table(&ctx, "test_delta", uri, HashMap::new())
            .await
            .unwrap();

        let batches = ctx
            .sql("SELECT COUNT(*) AS cnt FROM test_delta")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();

        assert_eq!(batches.len(), 1);
        let cnt = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0);
        assert_eq!(cnt, 10);
    }
}