1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
5use std::path::PathBuf;
6
7use datafusion_expr::{AggregateUDF, ScalarUDF};
8use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
9
10use crate::config::LaminarConfig;
11use crate::db::LaminarDB;
12use crate::error::DbError;
13use crate::profile::Profile;
14
15type ConnectorCallback = Box<dyn FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send>;
17
18pub struct LaminarDbBuilder {
30 config: LaminarConfig,
31 config_vars: HashMap<String, String>,
32 connector_callbacks: Vec<ConnectorCallback>,
33 profile: Profile,
34 profile_explicit: bool,
35 object_store_url: Option<String>,
36 object_store_options: HashMap<String, String>,
37 custom_udfs: Vec<ScalarUDF>,
38 custom_udafs: Vec<AggregateUDF>,
39}
40
41impl LaminarDbBuilder {
42 #[must_use]
44 pub fn new() -> Self {
45 Self {
46 config: LaminarConfig::default(),
47 config_vars: HashMap::new(),
48 connector_callbacks: Vec::new(),
49 profile: Profile::default(),
50 profile_explicit: false,
51 object_store_url: None,
52 object_store_options: HashMap::new(),
53 custom_udfs: Vec::new(),
54 custom_udafs: Vec::new(),
55 }
56 }
57
58 #[must_use]
60 pub fn config_var(mut self, key: &str, value: &str) -> Self {
61 self.config_vars.insert(key.to_string(), value.to_string());
62 self
63 }
64
65 #[must_use]
67 pub fn buffer_size(mut self, size: usize) -> Self {
68 self.config.default_buffer_size = size;
69 self
70 }
71
72 #[must_use]
74 pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
75 self.config.default_backpressure = strategy;
76 self
77 }
78
79 #[must_use]
81 pub fn storage_dir(mut self, path: impl Into<PathBuf>) -> Self {
82 self.config.storage_dir = Some(path.into());
83 self
84 }
85
86 #[must_use]
88 pub fn checkpoint(mut self, config: StreamCheckpointConfig) -> Self {
89 self.config.checkpoint = Some(config);
90 self
91 }
92
93 #[must_use]
97 pub fn profile(mut self, profile: Profile) -> Self {
98 self.profile = profile;
99 self.profile_explicit = true;
100 self
101 }
102
103 #[must_use]
108 pub fn object_store_url(mut self, url: impl Into<String>) -> Self {
109 self.object_store_url = Some(url.into());
110 self
111 }
112
113 #[must_use]
118 pub fn object_store_options(mut self, opts: HashMap<String, String>) -> Self {
119 self.object_store_options = opts;
120 self
121 }
122
123 #[must_use]
125 pub fn tiering(mut self, tiering: crate::config::TieringConfig) -> Self {
126 self.config.tiering = Some(tiering);
127 self
128 }
129
130 #[must_use]
135 pub fn with_tpc(mut self, tpc: crate::config::TpcRuntimeConfig) -> Self {
136 self.config.tpc = Some(tpc);
137 self
138 }
139
140 #[must_use]
155 pub fn register_udf(mut self, udf: ScalarUDF) -> Self {
156 self.custom_udfs.push(udf);
157 self
158 }
159
160 #[must_use]
175 pub fn register_udaf(mut self, udaf: AggregateUDF) -> Self {
176 self.custom_udafs.push(udaf);
177 self
178 }
179
180 #[must_use]
197 pub fn register_connector(
198 mut self,
199 f: impl FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send + 'static,
200 ) -> Self {
201 self.connector_callbacks.push(Box::new(f));
202 self
203 }
204
205 #[allow(clippy::unused_async)]
211 pub async fn build(mut self) -> Result<LaminarDB, DbError> {
212 self.config.object_store_url = self.object_store_url;
214 self.config.object_store_options = self.object_store_options;
215
216 if !self.profile_explicit {
218 self.profile = Profile::from_config(&self.config, false);
219 }
220
221 self.profile
223 .validate_features()
224 .map_err(|e| DbError::Config(e.to_string()))?;
225 self.profile
226 .validate_config(&self.config, self.config.object_store_url.as_deref())
227 .map_err(|e| DbError::Config(e.to_string()))?;
228
229 self.profile.apply_defaults(&mut self.config);
231
232 let db = LaminarDB::open_with_config_and_vars(self.config, self.config_vars)?;
233 for callback in self.connector_callbacks {
234 callback(db.connector_registry());
235 }
236 for udf in self.custom_udfs {
237 db.register_custom_udf(udf);
238 }
239 for udaf in self.custom_udafs {
240 db.register_custom_udaf(udaf);
241 }
242 Ok(db)
243 }
244}
245
246impl Default for LaminarDbBuilder {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252impl std::fmt::Debug for LaminarDbBuilder {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 f.debug_struct("LaminarDbBuilder")
255 .field("config", &self.config)
256 .field("profile", &self.profile)
257 .field("profile_explicit", &self.profile_explicit)
258 .field("object_store_url", &self.object_store_url)
259 .field(
260 "object_store_options_count",
261 &self.object_store_options.len(),
262 )
263 .field("config_vars_count", &self.config_vars.len())
264 .field("connector_callbacks", &self.connector_callbacks.len())
265 .field("custom_udfs", &self.custom_udfs.len())
266 .field("custom_udafs", &self.custom_udafs.len())
267 .finish()
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// An untouched builder produces a database that is not closed.
    #[tokio::test]
    async fn test_default_builder() {
        let builder = LaminarDbBuilder::new();
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// Config variables can be supplied before building.
    #[tokio::test]
    async fn test_builder_with_config_vars() {
        let vars = [
            ("KAFKA_BROKERS", "localhost:9092"),
            ("GROUP_ID", "test-group"),
        ];
        let builder = vars
            .iter()
            .fold(LaminarDbBuilder::new(), |acc, &(key, value)| {
                acc.config_var(key, value)
            });
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// A custom buffer size is accepted by `build`.
    #[tokio::test]
    async fn test_builder_with_options() {
        let builder = LaminarDbBuilder::new().buffer_size(131_072);
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// `LaminarDB::builder()` hands out a builder that can be built directly.
    #[tokio::test]
    async fn test_builder_from_laminardb() {
        let builder = LaminarDB::builder();
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// The `Debug` output names the type and reports the config variable count.
    #[test]
    fn test_builder_debug() {
        let builder = LaminarDbBuilder::new().config_var("K", "V");
        let rendered = format!("{builder:?}");
        assert!(rendered.contains("LaminarDbBuilder"));
        assert!(rendered.contains("config_vars_count: 1"));
    }

    /// A UDF registered on the builder is callable through SQL after `build`.
    #[tokio::test]
    async fn test_builder_register_udf() {
        use std::any::Any;
        use std::hash::{Hash, Hasher};

        use arrow::datatypes::DataType;
        use datafusion_expr::{
            ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
        };

        /// Name under which the test UDF is exposed.
        const UDF_NAME: &str = "forty_two";

        /// Zero-argument scalar UDF that always yields the `Int64` value 42.
        #[derive(Debug)]
        struct FortyTwo {
            signature: Signature,
        }

        impl FortyTwo {
            fn new() -> Self {
                let signature = Signature::new(TypeSignature::Nullary, Volatility::Immutable);
                Self { signature }
            }
        }

        // Every instance is considered equal, and hashing uses only the UDF name.
        impl PartialEq for FortyTwo {
            fn eq(&self, _: &Self) -> bool {
                true
            }
        }

        impl Eq for FortyTwo {}

        impl Hash for FortyTwo {
            fn hash<H: Hasher>(&self, state: &mut H) {
                UDF_NAME.hash(state);
            }
        }

        impl ScalarUDFImpl for FortyTwo {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &'static str {
                UDF_NAME
            }

            fn signature(&self) -> &Signature {
                &self.signature
            }

            fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                let answer = datafusion_common::ScalarValue::Int64(Some(42));
                Ok(ColumnarValue::Scalar(answer))
            }
        }

        let builder = LaminarDB::builder().register_udf(ScalarUDF::new_from_impl(FortyTwo::new()));
        let db = builder.build().await.unwrap();

        let result = db.execute("SELECT forty_two()").await;
        assert!(result.is_ok(), "UDF should be callable: {result:?}");
    }
}