Skip to main content

laminar_db/
builder.rs

1//! Fluent builder for `LaminarDB` construction.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use datafusion_expr::{AggregateUDF, ScalarUDF};
8use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
9
10use crate::config::LaminarConfig;
11use crate::db::LaminarDB;
12use crate::error::DbError;
13use crate::profile::Profile;
14
15/// Callback for registering custom connectors.
16type ConnectorCallback = Box<dyn FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send>;
17
18/// Fluent builder for constructing a [`LaminarDB`] instance.
19///
20/// # Example
21///
22/// ```rust,ignore
23/// let db = LaminarDB::builder()
24///     .config_var("KAFKA_BROKERS", "localhost:9092")
25///     .buffer_size(131072)
26///     .build()
27///     .await?;
28/// ```
29pub struct LaminarDbBuilder {
30    config: LaminarConfig,
31    config_vars: HashMap<String, String>,
32    connector_callbacks: Vec<ConnectorCallback>,
33    profile: Profile,
34    profile_explicit: bool,
35    object_store_url: Option<String>,
36    object_store_options: HashMap<String, String>,
37    custom_udfs: Vec<ScalarUDF>,
38    custom_udafs: Vec<AggregateUDF>,
39    /// Cluster control facade installed at cluster-mode startup.
40    /// Stays `None` in embedded / single-instance builds.
41    #[cfg(feature = "cluster")]
42    cluster_controller: Option<std::sync::Arc<laminar_core::cluster::control::ClusterController>>,
43    /// Outbound shuffle handle for cluster-mode streaming aggregates.
44    /// Pair with `shuffle_receiver`; without it, streaming aggregates
45    /// run single-node even when the cluster controller is installed.
46    #[cfg(feature = "cluster")]
47    shuffle_sender: Option<std::sync::Arc<laminar_core::shuffle::ShuffleSender>>,
48    /// Inbound shuffle handle for cluster-mode streaming aggregates.
49    #[cfg(feature = "cluster")]
50    shuffle_receiver: Option<std::sync::Arc<laminar_core::shuffle::ShuffleReceiver>>,
51    /// Commit-marker store for cross-instance 2PC.
52    #[cfg(feature = "cluster")]
53    decision_store: Option<std::sync::Arc<laminar_core::cluster::control::CheckpointDecisionStore>>,
54    /// Assignment-snapshot store for dynamic rebalance.
55    #[cfg(feature = "cluster")]
56    assignment_snapshot_store:
57        Option<std::sync::Arc<laminar_core::cluster::control::AssignmentSnapshotStore>>,
58    /// Catalog-manifest store for cluster-wide DDL replay on boot/rebalance.
59    #[cfg(feature = "cluster")]
60    catalog_manifest_store:
61        Option<std::sync::Arc<laminar_core::cluster::control::CatalogManifestStore>>,
62    /// Optional state backend. When paired with `vnode_registry`, the
63    /// coordinator writes per-vnode durability markers each checkpoint
64    /// and consults `epoch_complete` before committing sinks.
65    state_backend: Option<std::sync::Arc<dyn laminar_core::state::StateBackend>>,
66    /// Optional vnode topology. See `state_backend`.
67    vnode_registry: Option<std::sync::Arc<laminar_core::state::VnodeRegistry>>,
68    /// Extra physical optimizer rules installed on the `SessionState`
69    /// at construction.
70    physical_optimizer_rules: Vec<
71        std::sync::Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
72    >,
73    /// Override for `target_partitions`; cluster mode sets this to
74    /// `vnode_count`. Default 1 for single-instance streaming.
75    target_partitions: Option<usize>,
76    /// Assembled AI subsystem; installed when `[ai]`/`[models]` are configured.
77    ai_runtime: Option<std::sync::Arc<crate::ai::AiRuntime>>,
78}
79
80impl LaminarDbBuilder {
81    /// Create a new builder with default settings.
82    #[must_use]
83    pub fn new() -> Self {
84        Self {
85            config: LaminarConfig::default(),
86            config_vars: HashMap::new(),
87            connector_callbacks: Vec::new(),
88            profile: Profile::default(),
89            profile_explicit: false,
90            object_store_url: None,
91            object_store_options: HashMap::new(),
92            custom_udfs: Vec::new(),
93            custom_udafs: Vec::new(),
94            #[cfg(feature = "cluster")]
95            cluster_controller: None,
96            #[cfg(feature = "cluster")]
97            shuffle_sender: None,
98            #[cfg(feature = "cluster")]
99            shuffle_receiver: None,
100            #[cfg(feature = "cluster")]
101            decision_store: None,
102            #[cfg(feature = "cluster")]
103            assignment_snapshot_store: None,
104            #[cfg(feature = "cluster")]
105            catalog_manifest_store: None,
106            state_backend: None,
107            vnode_registry: None,
108            physical_optimizer_rules: Vec::new(),
109            target_partitions: None,
110            ai_runtime: None,
111        }
112    }
113
114    /// Install the assembled AI subsystem (model registry + provider clients +
115    /// result cache + call log). Without it, `ai_*` SQL functions fail at plan
116    /// time. Built from server `[ai]`/`[models]` configuration.
117    #[must_use]
118    pub fn ai(mut self, runtime: std::sync::Arc<crate::ai::AiRuntime>) -> Self {
119        self.ai_runtime = Some(runtime);
120        self
121    }
122
123    /// Override `target_partitions`; requires a distributed-aware
124    /// physical optimizer rule to replace `RepartitionExec`.
125    #[must_use]
126    pub fn target_partitions(mut self, n: usize) -> Self {
127        self.target_partitions = Some(n);
128        self
129    }
130
131    /// Register an additional `PhysicalOptimizerRule` on the session state.
132    #[must_use]
133    pub fn physical_optimizer_rule(
134        mut self,
135        rule: std::sync::Arc<
136            dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync,
137        >,
138    ) -> Self {
139        self.physical_optimizer_rules.push(rule);
140        self
141    }
142
143    /// Install a state backend. Must be paired with
144    /// [`Self::vnode_registry`] — `build()` rejects a half-set
145    /// configuration.
146    #[must_use]
147    pub fn state_backend(
148        mut self,
149        backend: std::sync::Arc<dyn laminar_core::state::StateBackend>,
150    ) -> Self {
151        self.state_backend = Some(backend);
152        self
153    }
154
155    /// Install a vnode registry. Must be paired with
156    /// [`Self::state_backend`] — `build()` rejects a half-set
157    /// configuration.
158    #[must_use]
159    pub fn vnode_registry(
160        mut self,
161        registry: std::sync::Arc<laminar_core::state::VnodeRegistry>,
162    ) -> Self {
163        self.vnode_registry = Some(registry);
164        self
165    }
166
167    /// Install a cluster control facade. Activates cluster-mode
168    /// checkpoint / shuffle semantics inside the engine. Called from
169    /// `laminar-server`'s cluster startup path after discovery has
170    /// converged.
171    #[cfg(feature = "cluster")]
172    #[must_use]
173    pub fn cluster_controller(
174        mut self,
175        controller: std::sync::Arc<laminar_core::cluster::control::ClusterController>,
176    ) -> Self {
177        self.cluster_controller = Some(controller);
178        self
179    }
180
181    /// Install the outbound shuffle handle used by cluster-mode streaming
182    /// aggregates. Rows whose group key hashes to a remote vnode are
183    /// shipped through this sender. Pair with [`Self::shuffle_receiver`];
184    /// either alone is a no-op.
185    #[cfg(feature = "cluster")]
186    #[must_use]
187    pub fn shuffle_sender(
188        mut self,
189        sender: std::sync::Arc<laminar_core::shuffle::ShuffleSender>,
190    ) -> Self {
191        self.shuffle_sender = Some(sender);
192        self
193    }
194
195    /// Install the inbound shuffle handle used by cluster-mode streaming
196    /// aggregates. Remote partial-aggregate rows arrive here and are
197    /// drained into the local accumulator each cycle.
198    #[cfg(feature = "cluster")]
199    #[must_use]
200    pub fn shuffle_receiver(
201        mut self,
202        receiver: std::sync::Arc<laminar_core::shuffle::ShuffleReceiver>,
203    ) -> Self {
204        self.shuffle_receiver = Some(receiver);
205        self
206    }
207
208    /// Install the commit-marker store for cross-instance 2PC.
209    #[cfg(feature = "cluster")]
210    #[must_use]
211    pub fn decision_store(
212        mut self,
213        store: std::sync::Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
214    ) -> Self {
215        self.decision_store = Some(store);
216        self
217    }
218
219    /// Install the assignment-snapshot store for dynamic rebalance.
220    #[cfg(feature = "cluster")]
221    #[must_use]
222    pub fn assignment_snapshot_store(
223        mut self,
224        store: std::sync::Arc<laminar_core::cluster::control::AssignmentSnapshotStore>,
225    ) -> Self {
226        self.assignment_snapshot_store = Some(store);
227        self
228    }
229
230    /// Install the catalog-manifest store for cluster-wide DDL replay.
231    #[cfg(feature = "cluster")]
232    #[must_use]
233    pub fn catalog_manifest_store(
234        mut self,
235        store: std::sync::Arc<laminar_core::cluster::control::CatalogManifestStore>,
236    ) -> Self {
237        self.catalog_manifest_store = Some(store);
238        self
239    }
240
241    /// Set a config variable for `${VAR}` substitution in SQL.
242    #[must_use]
243    pub fn config_var(mut self, key: &str, value: &str) -> Self {
244        self.config_vars.insert(key.to_string(), value.to_string());
245        self
246    }
247
248    /// Set the bearer token presented when forwarding requests to the cluster
249    /// leader's HTTP API.
250    #[must_use]
251    pub fn http_auth_token(mut self, token: impl Into<String>) -> Self {
252        self.config.http_auth_token = Some(crate::config::SecretString::new(token));
253        self
254    }
255
256    /// Set the default buffer size for streaming channels.
257    #[must_use]
258    pub fn buffer_size(mut self, size: usize) -> Self {
259        self.config.default_buffer_size = size;
260        self
261    }
262
263    /// Set the default backpressure strategy.
264    #[must_use]
265    pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
266        self.config.default_backpressure = strategy;
267        self
268    }
269
270    /// Set the storage directory for WAL and checkpoints.
271    #[must_use]
272    pub fn storage_dir(mut self, path: impl Into<PathBuf>) -> Self {
273        self.config.storage_dir = Some(path.into());
274        self
275    }
276
277    /// Set checkpoint configuration.
278    #[must_use]
279    pub fn checkpoint(mut self, config: StreamCheckpointConfig) -> Self {
280        self.config.checkpoint = Some(config);
281        self
282    }
283
284    /// Set the deployment profile.
285    ///
286    /// See [`Profile`] for the available tiers.
287    #[must_use]
288    pub fn profile(mut self, profile: Profile) -> Self {
289        self.profile = profile;
290        self.profile_explicit = true;
291        self
292    }
293
294    /// Set the object-store URL for durable checkpoints.
295    ///
296    /// Required when using [`Profile::Durable`] or
297    /// [`Profile::Cluster`].
298    #[must_use]
299    pub fn object_store_url(mut self, url: impl Into<String>) -> Self {
300        self.object_store_url = Some(url.into());
301        self
302    }
303
304    /// Set explicit credential/config overrides for the object store.
305    ///
306    /// Keys are backend-specific (e.g., `aws_access_key_id`, `aws_region`).
307    /// These supplement environment-variable-based credential resolution.
308    #[must_use]
309    pub fn object_store_options(mut self, opts: HashMap<String, String>) -> Self {
310        self.object_store_options = opts;
311        self
312    }
313
314    /// Set the end-to-end delivery guarantee for the pipeline.
315    #[must_use]
316    pub fn delivery_guarantee(
317        mut self,
318        guarantee: laminar_connectors::connector::DeliveryGuarantee,
319    ) -> Self {
320        self.config.delivery_guarantee = guarantee;
321        self
322    }
323
324    /// Register a custom scalar UDF with the database.
325    ///
326    /// The UDF will be available in SQL queries after `build()`.
327    ///
328    /// # Example
329    ///
330    /// ```rust,ignore
331    /// use datafusion_expr::ScalarUDF;
332    ///
333    /// let db = LaminarDB::builder()
334    ///     .register_udf(my_scalar_udf)
335    ///     .build()
336    ///     .await?;
337    /// ```
338    #[must_use]
339    pub fn register_udf(mut self, udf: ScalarUDF) -> Self {
340        self.custom_udfs.push(udf);
341        self
342    }
343
344    /// Register a custom aggregate UDF (UDAF) with the database.
345    ///
346    /// The UDAF will be available in SQL queries after `build()`.
347    ///
348    /// # Example
349    ///
350    /// ```rust,ignore
351    /// use datafusion_expr::AggregateUDF;
352    ///
353    /// let db = LaminarDB::builder()
354    ///     .register_udaf(my_aggregate_udf)
355    ///     .build()
356    ///     .await?;
357    /// ```
358    #[must_use]
359    pub fn register_udaf(mut self, udaf: AggregateUDF) -> Self {
360        self.custom_udafs.push(udaf);
361        self
362    }
363
364    /// Source → coordinator channel capacity (default 64). Increase for
365    /// burst absorption at the cost of memory.
366    #[must_use]
367    pub fn pipeline_channel_capacity(mut self, capacity: usize) -> Self {
368        self.config.pipeline_channel_capacity = Some(capacity);
369        self
370    }
371
372    /// Micro-batch coalescing window (default 5ms for connectors, 0 for
373    /// embedded). Larger values amortize per-cycle SQL overhead.
374    #[must_use]
375    pub fn pipeline_batch_window(mut self, window: std::time::Duration) -> Self {
376        self.config.pipeline_batch_window = Some(window);
377        self
378    }
379
380    /// Max time draining the source channel per cycle, in nanoseconds
381    /// (default 1ms). Increase to process more messages per SQL execution.
382    #[must_use]
383    pub fn pipeline_drain_budget_ns(mut self, ns: u64) -> Self {
384        self.config.pipeline_drain_budget_ns = Some(ns);
385        self
386    }
387
388    /// Per-query execution budget in nanoseconds (default 8ms). When
389    /// exceeded, remaining queries are deferred to the next cycle.
390    #[must_use]
391    pub fn pipeline_query_budget_ns(mut self, ns: u64) -> Self {
392        self.config.pipeline_query_budget_ns = Some(ns);
393        self
394    }
395
396    /// Per-port operator input-buffer cap in batches (default 256).
397    #[must_use]
398    pub fn pipeline_max_input_buf_batches(mut self, batches: usize) -> Self {
399        self.config.pipeline_max_input_buf_batches = Some(batches);
400        self
401    }
402
403    /// Per-port operator input-buffer cap in bytes.
404    #[must_use]
405    pub fn pipeline_max_input_buf_bytes(mut self, bytes: usize) -> Self {
406        self.config.pipeline_max_input_buf_bytes = Some(bytes);
407        self
408    }
409
410    /// Backpressure policy (default `Backpressure`).
411    #[must_use]
412    pub fn pipeline_backpressure_policy(
413        mut self,
414        policy: crate::config::BackpressurePolicy,
415    ) -> Self {
416        self.config.pipeline_backpressure_policy = policy;
417        self
418    }
419
420    /// Register custom connectors with the `ConnectorRegistry`.
421    ///
422    /// The callback is invoked after the database is created and built-in
423    /// connectors are registered. Use it to add user-defined source/sink
424    /// implementations.
425    ///
426    /// # Example
427    ///
428    /// ```rust,ignore
429    /// let db = LaminarDB::builder()
430    ///     .register_connector(|registry| {
431    ///         registry.register_source("my-source", info, factory);
432    ///     })
433    ///     .build()
434    ///     .await?;
435    /// ```
436    #[must_use]
437    pub fn register_connector(
438        mut self,
439        f: impl FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send + 'static,
440    ) -> Self {
441        self.connector_callbacks.push(Box::new(f));
442        self
443    }
444
445    /// Build the `LaminarDB` instance.
446    ///
447    /// # Errors
448    ///
449    /// Returns `DbError` if database creation fails.
450    #[allow(clippy::unused_async)]
451    pub async fn build(mut self) -> Result<LaminarDB, DbError> {
452        // Forward object store settings into the config before profile detection.
453        self.config.object_store_url = self.object_store_url;
454        self.config.object_store_options = self.object_store_options;
455
456        // Auto-detect profile from config if not explicitly set.
457        if !self.profile_explicit {
458            self.profile = Profile::from_config(&self.config, false);
459        }
460
461        // Validate profile feature gates and config requirements.
462        self.profile
463            .validate_features()
464            .map_err(|e| DbError::Config(e.to_string()))?;
465        self.profile
466            .validate_config(&self.config, self.config.object_store_url.as_deref())
467            .map_err(|e| DbError::Config(e.to_string()))?;
468
469        Self::validate_backpressure(&self.config)?;
470
471        // State backend and vnode registry must be paired.
472        match (&self.state_backend, &self.vnode_registry) {
473            (Some(_), None) => {
474                return Err(DbError::Config(
475                    "state_backend is set but vnode_registry is missing".into(),
476                ));
477            }
478            (None, Some(_)) => {
479                return Err(DbError::Config(
480                    "vnode_registry is set but state_backend is missing".into(),
481                ));
482            }
483            _ => {}
484        }
485
486        // Apply profile defaults for fields the user hasn't set.
487        self.profile.apply_defaults(&mut self.config);
488
489        let mut db = LaminarDB::open_with_config_and_vars_and_rules(
490            self.config,
491            self.config_vars,
492            &self.physical_optimizer_rules,
493            self.target_partitions,
494        )?;
495        if let Some(runtime) = self.ai_runtime {
496            // Inference workers require a running Tokio runtime.
497            let handle = tokio::runtime::Handle::try_current().map_err(|_| {
498                DbError::InvalidOperation(
499                    "LaminarDB::build() with an AI runtime must run inside a Tokio runtime"
500                        .to_string(),
501                )
502            })?;
503            db.set_ai_runtime(runtime, handle);
504        }
505        for callback in self.connector_callbacks {
506            callback(db.connector_registry());
507        }
508        for udf in self.custom_udfs {
509            db.register_custom_udf(udf);
510        }
511        for udaf in self.custom_udafs {
512            db.register_custom_udaf(udaf);
513        }
514        #[cfg(feature = "cluster")]
515        if let Some(controller) = self.cluster_controller {
516            db.set_cluster_controller(controller);
517        }
518        #[cfg(feature = "cluster")]
519        if let Some(sender) = self.shuffle_sender {
520            db.set_shuffle_sender(sender);
521        }
522        #[cfg(feature = "cluster")]
523        if let Some(receiver) = self.shuffle_receiver {
524            db.set_shuffle_receiver(receiver);
525        }
526        #[cfg(feature = "cluster")]
527        if let Some(store) = self.decision_store {
528            db.set_decision_store(store);
529        }
530        #[cfg(feature = "cluster")]
531        if let Some(store) = self.assignment_snapshot_store {
532            db.set_assignment_snapshot_store(store);
533        }
534        #[cfg(feature = "cluster")]
535        if let Some(store) = self.catalog_manifest_store {
536            db.set_catalog_manifest_store(store);
537        }
538        if let Some(backend) = self.state_backend {
539            db.set_state_backend(backend);
540        }
541        if let Some(registry) = self.vnode_registry {
542            db.set_vnode_registry(registry);
543        }
544        Ok(db)
545    }
546
547    fn validate_backpressure(config: &LaminarConfig) -> Result<(), DbError> {
548        use crate::config::BackpressurePolicy;
549        use laminar_connectors::connector::DeliveryGuarantee;
550
551        let policy = config.pipeline_backpressure_policy;
552        if policy == BackpressurePolicy::Backpressure {
553            return Ok(());
554        }
555
556        // Non-default policy needs at least one finite, non-zero cap.
557        let has_count_cap = config.pipeline_max_input_buf_batches.is_none_or(|c| c > 0);
558        let has_byte_cap = config.pipeline_max_input_buf_bytes.is_some_and(|b| b > 0);
559        if !has_count_cap && !has_byte_cap {
560            return Err(DbError::Config(format!(
561                "backpressure_policy={policy:?} requires at least one of \
562                 pipeline_max_input_buf_batches (>0) or pipeline_max_input_buf_bytes"
563            )));
564        }
565
566        if policy == BackpressurePolicy::ShedOldest
567            && config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
568        {
569            return Err(DbError::Config(
570                "ShedOldest drops data; it is incompatible with exactly-once \
571                 delivery. Use Backpressure or Fail, or downgrade the guarantee."
572                    .into(),
573            ));
574        }
575        Ok(())
576    }
577}
578
579impl Default for LaminarDbBuilder {
580    fn default() -> Self {
581        Self::new()
582    }
583}
584
585impl std::fmt::Debug for LaminarDbBuilder {
586    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
587        f.debug_struct("LaminarDbBuilder")
588            .field("config", &self.config)
589            .field("profile", &self.profile)
590            .field("profile_explicit", &self.profile_explicit)
591            .field("object_store_url", &self.object_store_url)
592            .field(
593                "object_store_options_count",
594                &self.object_store_options.len(),
595            )
596            .field("config_vars_count", &self.config_vars.len())
597            .field("connector_callbacks", &self.connector_callbacks.len())
598            .field("custom_udfs", &self.custom_udfs.len())
599            .field("custom_udafs", &self.custom_udafs.len())
600            .finish_non_exhaustive()
601    }
602}
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::BackpressurePolicy;
    use laminar_connectors::connector::DeliveryGuarantee;

    /// An untouched builder opens a live database.
    #[tokio::test]
    async fn test_default_builder() {
        let built = LaminarDbBuilder::new().build().await;
        let db = built.unwrap();
        assert!(!db.is_closed());
    }

    /// `ShedOldest` with the batch cap zeroed and no byte cap is rejected.
    #[tokio::test]
    async fn test_shed_oldest_requires_cap() {
        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(0);
        let err = builder
            .build()
            .await
            .expect_err("ShedOldest with no caps must be rejected");
        let message = err.to_string();
        assert!(message.contains("requires at least one"), "{err}");
    }

    /// `ShedOldest` cannot be combined with exactly-once delivery.
    #[tokio::test]
    async fn test_shed_oldest_rejects_exactly_once() {
        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(64)
            .delivery_guarantee(DeliveryGuarantee::ExactlyOnce);
        let err = builder
            .build()
            .await
            .expect_err("ShedOldest + ExactlyOnce must be rejected");
        let message = err.to_string();
        assert!(message.contains("exactly-once"), "{err}");
    }

    /// `ShedOldest` with a non-zero batch cap builds successfully.
    #[tokio::test]
    async fn test_valid_shed_oldest_builds() {
        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(64);
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// Config variables do not interfere with opening the database.
    #[tokio::test]
    async fn test_builder_with_config_vars() {
        let builder = LaminarDbBuilder::new()
            .config_var("KAFKA_BROKERS", "localhost:9092")
            .config_var("GROUP_ID", "test-group");
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// A custom buffer size is accepted.
    #[tokio::test]
    async fn test_builder_with_options() {
        let builder = LaminarDbBuilder::new().buffer_size(131_072);
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// `LaminarDB::builder()` is an equivalent entry point.
    #[tokio::test]
    async fn test_builder_from_laminardb() {
        let built = LaminarDB::builder().build().await;
        let db = built.unwrap();
        assert!(!db.is_closed());
    }

    /// The `Debug` output names the type and reports the variable count.
    #[test]
    fn test_builder_debug() {
        let builder = LaminarDbBuilder::new().config_var("K", "V");
        let rendered = format!("{builder:?}");
        assert!(rendered.contains("LaminarDbBuilder"));
        assert!(rendered.contains("config_vars_count: 1"));
    }

    /// A UDF registered through the builder can be called from SQL.
    #[tokio::test]
    async fn test_builder_register_udf() {
        use std::any::Any;
        use std::hash::{Hash, Hasher};

        use arrow::datatypes::DataType;
        use datafusion_expr::{
            ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
        };

        /// Zero-argument UDF that always yields the integer 42.
        #[derive(Debug)]
        struct FortyTwo {
            signature: Signature,
        }

        impl FortyTwo {
            fn new() -> Self {
                let signature = Signature::new(TypeSignature::Nullary, Volatility::Immutable);
                Self { signature }
            }
        }

        // Every instance is interchangeable, so equality is unconditional
        // and the hash depends only on the function name.
        impl PartialEq for FortyTwo {
            fn eq(&self, _: &Self) -> bool {
                true
            }
        }

        impl Eq for FortyTwo {}

        impl Hash for FortyTwo {
            fn hash<H: Hasher>(&self, state: &mut H) {
                "forty_two".hash(state);
            }
        }

        impl ScalarUDFImpl for FortyTwo {
            fn as_any(&self) -> &dyn Any {
                self
            }
            fn name(&self) -> &'static str {
                "forty_two"
            }
            fn signature(&self) -> &Signature {
                &self.signature
            }
            fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }
            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                let answer = datafusion_common::ScalarValue::Int64(Some(42));
                Ok(ColumnarValue::Scalar(answer))
            }
        }

        let udf = ScalarUDF::new_from_impl(FortyTwo::new());
        let builder = LaminarDB::builder().register_udf(udf);
        let db = builder.build().await.unwrap();

        // Verify the UDF is queryable
        let result = db.execute("SELECT forty_two()").await;
        assert!(result.is_ok(), "UDF should be callable: {result:?}");
    }
}