Skip to main content

laminar_db/
builder.rs

1//! Fluent builder for `LaminarDB` construction.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use datafusion_expr::{AggregateUDF, ScalarUDF};
8use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
9
10use crate::config::LaminarConfig;
11use crate::db::LaminarDB;
12use crate::error::DbError;
13use crate::profile::Profile;
14
/// Callback for registering custom connectors.
///
/// Callbacks are boxed by [`LaminarDbBuilder::register_connector`] and each
/// one is invoked exactly once by [`LaminarDbBuilder::build`], receiving the
/// connector registry of the freshly opened database.
type ConnectorCallback = Box<dyn FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send>;
17
/// Fluent builder for constructing a [`LaminarDB`] instance.
///
/// Every setter consumes the builder and returns it, so calls can be chained
/// and finished with [`LaminarDbBuilder::build`].
///
/// # Example
///
/// ```rust,ignore
/// let db = LaminarDB::builder()
///     .config_var("KAFKA_BROKERS", "localhost:9092")
///     .buffer_size(131072)
///     .build()
///     .await?;
/// ```
pub struct LaminarDbBuilder {
    /// Configuration under construction; handed to
    /// `LaminarDB::open_with_config_and_vars` by `build()`.
    config: LaminarConfig,
    /// Variables for `${VAR}` substitution in SQL, passed to the database
    /// together with `config`.
    config_vars: HashMap<String, String>,
    /// Connector-registration callbacks, run by `build()` against the new
    /// database's connector registry.
    connector_callbacks: Vec<ConnectorCallback>,
    /// Deployment profile; `build()` replaces it with the result of
    /// `Profile::from_config` unless `profile_explicit` is set.
    profile: Profile,
    /// `true` once `profile()` has been called on this builder.
    profile_explicit: bool,
    /// Object-store URL; copied into `config.object_store_url` by `build()`.
    object_store_url: Option<String>,
    /// Object-store options; copied into `config.object_store_options` by
    /// `build()`.
    object_store_options: HashMap<String, String>,
    /// Scalar UDFs registered on the database at the end of `build()`.
    custom_udfs: Vec<ScalarUDF>,
    /// Aggregate UDFs registered on the database at the end of `build()`.
    custom_udafs: Vec<AggregateUDF>,
}
40
41impl LaminarDbBuilder {
42    /// Create a new builder with default settings.
43    #[must_use]
44    pub fn new() -> Self {
45        Self {
46            config: LaminarConfig::default(),
47            config_vars: HashMap::new(),
48            connector_callbacks: Vec::new(),
49            profile: Profile::default(),
50            profile_explicit: false,
51            object_store_url: None,
52            object_store_options: HashMap::new(),
53            custom_udfs: Vec::new(),
54            custom_udafs: Vec::new(),
55        }
56    }
57
58    /// Set a config variable for `${VAR}` substitution in SQL.
59    #[must_use]
60    pub fn config_var(mut self, key: &str, value: &str) -> Self {
61        self.config_vars.insert(key.to_string(), value.to_string());
62        self
63    }
64
65    /// Set the default buffer size for streaming channels.
66    #[must_use]
67    pub fn buffer_size(mut self, size: usize) -> Self {
68        self.config.default_buffer_size = size;
69        self
70    }
71
72    /// Set the default backpressure strategy.
73    #[must_use]
74    pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
75        self.config.default_backpressure = strategy;
76        self
77    }
78
79    /// Set the storage directory for WAL and checkpoints.
80    #[must_use]
81    pub fn storage_dir(mut self, path: impl Into<PathBuf>) -> Self {
82        self.config.storage_dir = Some(path.into());
83        self
84    }
85
86    /// Set checkpoint configuration.
87    #[must_use]
88    pub fn checkpoint(mut self, config: StreamCheckpointConfig) -> Self {
89        self.config.checkpoint = Some(config);
90        self
91    }
92
93    /// Set the deployment profile.
94    ///
95    /// See [`Profile`] for the available tiers.
96    #[must_use]
97    pub fn profile(mut self, profile: Profile) -> Self {
98        self.profile = profile;
99        self.profile_explicit = true;
100        self
101    }
102
103    /// Set the object-store URL for durable checkpoints.
104    ///
105    /// Required when using [`Profile::Durable`] or
106    /// [`Profile::Delta`].
107    #[must_use]
108    pub fn object_store_url(mut self, url: impl Into<String>) -> Self {
109        self.object_store_url = Some(url.into());
110        self
111    }
112
113    /// Set explicit credential/config overrides for the object store.
114    ///
115    /// Keys are backend-specific (e.g., `aws_access_key_id`, `aws_region`).
116    /// These supplement environment-variable-based credential resolution.
117    #[must_use]
118    pub fn object_store_options(mut self, opts: HashMap<String, String>) -> Self {
119        self.object_store_options = opts;
120        self
121    }
122
123    /// Set the S3 storage class tiering configuration.
124    #[must_use]
125    pub fn tiering(mut self, tiering: crate::config::TieringConfig) -> Self {
126        self.config.tiering = Some(tiering);
127        self
128    }
129
130    /// Override thread-per-core runtime settings.
131    ///
132    /// The pipeline always uses TPC mode. This method overrides the
133    /// auto-detected defaults (core count, CPU pinning, NUMA awareness).
134    #[must_use]
135    pub fn with_tpc(mut self, tpc: crate::config::TpcRuntimeConfig) -> Self {
136        self.config.tpc = Some(tpc);
137        self
138    }
139
140    /// Register a custom scalar UDF with the database.
141    ///
142    /// The UDF will be available in SQL queries after `build()`.
143    ///
144    /// # Example
145    ///
146    /// ```rust,ignore
147    /// use datafusion_expr::ScalarUDF;
148    ///
149    /// let db = LaminarDB::builder()
150    ///     .register_udf(my_scalar_udf)
151    ///     .build()
152    ///     .await?;
153    /// ```
154    #[must_use]
155    pub fn register_udf(mut self, udf: ScalarUDF) -> Self {
156        self.custom_udfs.push(udf);
157        self
158    }
159
160    /// Register a custom aggregate UDF (UDAF) with the database.
161    ///
162    /// The UDAF will be available in SQL queries after `build()`.
163    ///
164    /// # Example
165    ///
166    /// ```rust,ignore
167    /// use datafusion_expr::AggregateUDF;
168    ///
169    /// let db = LaminarDB::builder()
170    ///     .register_udaf(my_aggregate_udf)
171    ///     .build()
172    ///     .await?;
173    /// ```
174    #[must_use]
175    pub fn register_udaf(mut self, udaf: AggregateUDF) -> Self {
176        self.custom_udafs.push(udaf);
177        self
178    }
179
180    /// Register custom connectors with the `ConnectorRegistry`.
181    ///
182    /// The callback is invoked after the database is created and built-in
183    /// connectors are registered. Use it to add user-defined source/sink
184    /// implementations.
185    ///
186    /// # Example
187    ///
188    /// ```rust,ignore
189    /// let db = LaminarDB::builder()
190    ///     .register_connector(|registry| {
191    ///         registry.register_source("my-source", info, factory);
192    ///     })
193    ///     .build()
194    ///     .await?;
195    /// ```
196    #[must_use]
197    pub fn register_connector(
198        mut self,
199        f: impl FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send + 'static,
200    ) -> Self {
201        self.connector_callbacks.push(Box::new(f));
202        self
203    }
204
205    /// Build the `LaminarDB` instance.
206    ///
207    /// # Errors
208    ///
209    /// Returns `DbError` if database creation fails.
210    #[allow(clippy::unused_async)]
211    pub async fn build(mut self) -> Result<LaminarDB, DbError> {
212        // Forward object store settings into the config before profile detection.
213        self.config.object_store_url = self.object_store_url;
214        self.config.object_store_options = self.object_store_options;
215
216        // Auto-detect profile from config if not explicitly set.
217        if !self.profile_explicit {
218            self.profile = Profile::from_config(&self.config, false);
219        }
220
221        // Validate profile feature gates and config requirements.
222        self.profile
223            .validate_features()
224            .map_err(|e| DbError::Config(e.to_string()))?;
225        self.profile
226            .validate_config(&self.config, self.config.object_store_url.as_deref())
227            .map_err(|e| DbError::Config(e.to_string()))?;
228
229        // Apply profile defaults for fields the user hasn't set.
230        self.profile.apply_defaults(&mut self.config);
231
232        let db = LaminarDB::open_with_config_and_vars(self.config, self.config_vars)?;
233        for callback in self.connector_callbacks {
234            callback(db.connector_registry());
235        }
236        for udf in self.custom_udfs {
237            db.register_custom_udf(udf);
238        }
239        for udaf in self.custom_udafs {
240            db.register_custom_udaf(udaf);
241        }
242        Ok(db)
243    }
244}
245
246impl Default for LaminarDbBuilder {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
252impl std::fmt::Debug for LaminarDbBuilder {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        f.debug_struct("LaminarDbBuilder")
255            .field("config", &self.config)
256            .field("profile", &self.profile)
257            .field("profile_explicit", &self.profile_explicit)
258            .field("object_store_url", &self.object_store_url)
259            .field(
260                "object_store_options_count",
261                &self.object_store_options.len(),
262            )
263            .field("config_vars_count", &self.config_vars.len())
264            .field("connector_callbacks", &self.connector_callbacks.len())
265            .field("custom_udfs", &self.custom_udfs.len())
266            .field("custom_udafs", &self.custom_udafs.len())
267            .finish()
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// An untouched builder produces an open database.
    #[tokio::test]
    async fn test_default_builder() {
        let builder = LaminarDbBuilder::new();
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// Config variables do not prevent the database from opening.
    #[tokio::test]
    async fn test_builder_with_config_vars() {
        let builder = LaminarDbBuilder::new()
            .config_var("KAFKA_BROKERS", "localhost:9092")
            .config_var("GROUP_ID", "test-group");
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// A custom buffer size does not prevent the database from opening.
    #[tokio::test]
    async fn test_builder_with_options() {
        let builder = LaminarDbBuilder::new().buffer_size(131_072);
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// `LaminarDB::builder()` is a working entry point to the builder.
    #[tokio::test]
    async fn test_builder_from_laminardb() {
        let builder = LaminarDB::builder();
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// The `Debug` output names the type and reports config vars by count.
    #[test]
    fn test_builder_debug() {
        let builder = LaminarDbBuilder::new().config_var("K", "V");
        let rendered = format!("{builder:?}");
        for expected in ["LaminarDbBuilder", "config_vars_count: 1"] {
            assert!(rendered.contains(expected));
        }
    }

    /// A scalar UDF registered on the builder can be called from SQL.
    #[tokio::test]
    async fn test_builder_register_udf() {
        use std::any::Any;
        use std::hash::{Hash, Hasher};

        use arrow::datatypes::DataType;
        use datafusion_expr::{
            ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
        };

        /// Zero-argument UDF whose result is always the `Int64` value 42.
        #[derive(Debug)]
        struct FortyTwo {
            signature: Signature,
        }

        impl FortyTwo {
            fn new() -> Self {
                let signature = Signature::new(TypeSignature::Nullary, Volatility::Immutable);
                Self { signature }
            }
        }

        // All instances compare equal and hash alike.
        impl PartialEq for FortyTwo {
            fn eq(&self, _: &Self) -> bool {
                true
            }
        }

        impl Eq for FortyTwo {}

        impl Hash for FortyTwo {
            fn hash<H: Hasher>(&self, state: &mut H) {
                "forty_two".hash(state);
            }
        }

        impl ScalarUDFImpl for FortyTwo {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &'static str {
                "forty_two"
            }

            fn signature(&self) -> &Signature {
                &self.signature
            }

            fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                let answer = datafusion_common::ScalarValue::Int64(Some(42));
                Ok(ColumnarValue::Scalar(answer))
            }
        }

        let udf = ScalarUDF::new_from_impl(FortyTwo::new());
        let builder = LaminarDB::builder().register_udf(udf);
        let db = builder.build().await.unwrap();

        // The UDF must be resolvable by name from SQL.
        let result = db.execute("SELECT forty_two()").await;
        assert!(result.is_ok(), "UDF should be callable: {result:?}");
    }
}