Skip to main content

laminar_db/
builder.rs

1//! Fluent builder for `LaminarDB` construction.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use datafusion_expr::{AggregateUDF, ScalarUDF};
8use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
9
10use crate::config::LaminarConfig;
11use crate::db::LaminarDB;
12use crate::error::DbError;
13use crate::profile::Profile;
14
15/// Callback for registering custom connectors.
16type ConnectorCallback = Box<dyn FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send>;
17
18/// Fluent builder for constructing a [`LaminarDB`] instance.
19///
20/// # Example
21///
22/// ```rust,ignore
23/// let db = LaminarDB::builder()
24///     .config_var("KAFKA_BROKERS", "localhost:9092")
25///     .buffer_size(131072)
26///     .build()
27///     .await?;
28/// ```
29pub struct LaminarDbBuilder {
30    config: LaminarConfig,
31    config_vars: HashMap<String, String>,
32    connector_callbacks: Vec<ConnectorCallback>,
33    profile: Profile,
34    profile_explicit: bool,
35    object_store_url: Option<String>,
36    object_store_options: HashMap<String, String>,
37    custom_udfs: Vec<ScalarUDF>,
38    custom_udafs: Vec<AggregateUDF>,
39    /// Cluster control facade installed at cluster-mode startup.
40    /// Stays `None` in embedded / single-instance builds.
41    #[cfg(feature = "cluster-unstable")]
42    cluster_controller: Option<std::sync::Arc<laminar_core::cluster::control::ClusterController>>,
43    /// Outbound shuffle handle for cluster-mode streaming aggregates.
44    /// Pair with `shuffle_receiver`; without it, streaming aggregates
45    /// run single-node even when the cluster controller is installed.
46    #[cfg(feature = "cluster-unstable")]
47    shuffle_sender: Option<std::sync::Arc<laminar_core::shuffle::ShuffleSender>>,
48    /// Inbound shuffle handle for cluster-mode streaming aggregates.
49    #[cfg(feature = "cluster-unstable")]
50    shuffle_receiver: Option<std::sync::Arc<laminar_core::shuffle::ShuffleReceiver>>,
51    /// Commit-marker store for cross-instance 2PC.
52    #[cfg(feature = "cluster-unstable")]
53    decision_store: Option<std::sync::Arc<laminar_core::cluster::control::CheckpointDecisionStore>>,
54    /// Assignment-snapshot store for dynamic rebalance.
55    #[cfg(feature = "cluster-unstable")]
56    assignment_snapshot_store:
57        Option<std::sync::Arc<laminar_core::cluster::control::AssignmentSnapshotStore>>,
58    /// Optional state backend. When paired with `vnode_registry`, the
59    /// coordinator writes per-vnode durability markers each checkpoint
60    /// and consults `epoch_complete` before committing sinks.
61    state_backend: Option<std::sync::Arc<dyn laminar_core::state::StateBackend>>,
62    /// Optional vnode topology. See `state_backend`.
63    vnode_registry: Option<std::sync::Arc<laminar_core::state::VnodeRegistry>>,
64    /// Extra physical optimizer rules installed on the `SessionState`
65    /// at construction.
66    physical_optimizer_rules: Vec<
67        std::sync::Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
68    >,
69    /// Override for `target_partitions`; cluster mode sets this to
70    /// `vnode_count`. Default 1 for single-instance streaming.
71    target_partitions: Option<usize>,
72}
73
74impl LaminarDbBuilder {
75    /// Create a new builder with default settings.
76    #[must_use]
77    pub fn new() -> Self {
78        Self {
79            config: LaminarConfig::default(),
80            config_vars: HashMap::new(),
81            connector_callbacks: Vec::new(),
82            profile: Profile::default(),
83            profile_explicit: false,
84            object_store_url: None,
85            object_store_options: HashMap::new(),
86            custom_udfs: Vec::new(),
87            custom_udafs: Vec::new(),
88            #[cfg(feature = "cluster-unstable")]
89            cluster_controller: None,
90            #[cfg(feature = "cluster-unstable")]
91            shuffle_sender: None,
92            #[cfg(feature = "cluster-unstable")]
93            shuffle_receiver: None,
94            #[cfg(feature = "cluster-unstable")]
95            decision_store: None,
96            #[cfg(feature = "cluster-unstable")]
97            assignment_snapshot_store: None,
98            state_backend: None,
99            vnode_registry: None,
100            physical_optimizer_rules: Vec::new(),
101            target_partitions: None,
102        }
103    }
104
105    /// Override `target_partitions`; requires a distributed-aware
106    /// physical optimizer rule to replace `RepartitionExec`.
107    #[must_use]
108    pub fn target_partitions(mut self, n: usize) -> Self {
109        self.target_partitions = Some(n);
110        self
111    }
112
113    /// Register an additional `PhysicalOptimizerRule` on the session state.
114    #[must_use]
115    pub fn physical_optimizer_rule(
116        mut self,
117        rule: std::sync::Arc<
118            dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync,
119        >,
120    ) -> Self {
121        self.physical_optimizer_rules.push(rule);
122        self
123    }
124
125    /// Install a state backend. Must be paired with
126    /// [`Self::vnode_registry`] — `build()` rejects a half-set
127    /// configuration.
128    #[must_use]
129    pub fn state_backend(
130        mut self,
131        backend: std::sync::Arc<dyn laminar_core::state::StateBackend>,
132    ) -> Self {
133        self.state_backend = Some(backend);
134        self
135    }
136
137    /// Install a vnode registry. Must be paired with
138    /// [`Self::state_backend`] — `build()` rejects a half-set
139    /// configuration.
140    #[must_use]
141    pub fn vnode_registry(
142        mut self,
143        registry: std::sync::Arc<laminar_core::state::VnodeRegistry>,
144    ) -> Self {
145        self.vnode_registry = Some(registry);
146        self
147    }
148
149    /// Install a cluster control facade. Activates cluster-mode
150    /// checkpoint / shuffle semantics inside the engine. Called from
151    /// `laminar-server`'s cluster startup path after discovery has
152    /// converged.
153    #[cfg(feature = "cluster-unstable")]
154    #[must_use]
155    pub fn cluster_controller(
156        mut self,
157        controller: std::sync::Arc<laminar_core::cluster::control::ClusterController>,
158    ) -> Self {
159        self.cluster_controller = Some(controller);
160        self
161    }
162
163    /// Install the outbound shuffle handle used by cluster-mode streaming
164    /// aggregates. Rows whose group key hashes to a remote vnode are
165    /// shipped through this sender. Pair with [`Self::shuffle_receiver`];
166    /// either alone is a no-op.
167    #[cfg(feature = "cluster-unstable")]
168    #[must_use]
169    pub fn shuffle_sender(
170        mut self,
171        sender: std::sync::Arc<laminar_core::shuffle::ShuffleSender>,
172    ) -> Self {
173        self.shuffle_sender = Some(sender);
174        self
175    }
176
177    /// Install the inbound shuffle handle used by cluster-mode streaming
178    /// aggregates. Remote partial-aggregate rows arrive here and are
179    /// drained into the local accumulator each cycle.
180    #[cfg(feature = "cluster-unstable")]
181    #[must_use]
182    pub fn shuffle_receiver(
183        mut self,
184        receiver: std::sync::Arc<laminar_core::shuffle::ShuffleReceiver>,
185    ) -> Self {
186        self.shuffle_receiver = Some(receiver);
187        self
188    }
189
190    /// Install the commit-marker store for cross-instance 2PC.
191    #[cfg(feature = "cluster-unstable")]
192    #[must_use]
193    pub fn decision_store(
194        mut self,
195        store: std::sync::Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
196    ) -> Self {
197        self.decision_store = Some(store);
198        self
199    }
200
201    /// Install the assignment-snapshot store for dynamic rebalance.
202    #[cfg(feature = "cluster-unstable")]
203    #[must_use]
204    pub fn assignment_snapshot_store(
205        mut self,
206        store: std::sync::Arc<laminar_core::cluster::control::AssignmentSnapshotStore>,
207    ) -> Self {
208        self.assignment_snapshot_store = Some(store);
209        self
210    }
211
212    /// Set a config variable for `${VAR}` substitution in SQL.
213    #[must_use]
214    pub fn config_var(mut self, key: &str, value: &str) -> Self {
215        self.config_vars.insert(key.to_string(), value.to_string());
216        self
217    }
218
219    /// Set the default buffer size for streaming channels.
220    #[must_use]
221    pub fn buffer_size(mut self, size: usize) -> Self {
222        self.config.default_buffer_size = size;
223        self
224    }
225
226    /// Set the default backpressure strategy.
227    #[must_use]
228    pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
229        self.config.default_backpressure = strategy;
230        self
231    }
232
233    /// Set the storage directory for WAL and checkpoints.
234    #[must_use]
235    pub fn storage_dir(mut self, path: impl Into<PathBuf>) -> Self {
236        self.config.storage_dir = Some(path.into());
237        self
238    }
239
240    /// Set checkpoint configuration.
241    #[must_use]
242    pub fn checkpoint(mut self, config: StreamCheckpointConfig) -> Self {
243        self.config.checkpoint = Some(config);
244        self
245    }
246
247    /// Set the deployment profile.
248    ///
249    /// See [`Profile`] for the available tiers.
250    #[must_use]
251    pub fn profile(mut self, profile: Profile) -> Self {
252        self.profile = profile;
253        self.profile_explicit = true;
254        self
255    }
256
257    /// Set the object-store URL for durable checkpoints.
258    ///
259    /// Required when using [`Profile::Durable`] or
260    /// [`Profile::Cluster`].
261    #[must_use]
262    pub fn object_store_url(mut self, url: impl Into<String>) -> Self {
263        self.object_store_url = Some(url.into());
264        self
265    }
266
267    /// Set explicit credential/config overrides for the object store.
268    ///
269    /// Keys are backend-specific (e.g., `aws_access_key_id`, `aws_region`).
270    /// These supplement environment-variable-based credential resolution.
271    #[must_use]
272    pub fn object_store_options(mut self, opts: HashMap<String, String>) -> Self {
273        self.object_store_options = opts;
274        self
275    }
276
277    /// Set the S3 storage class tiering configuration.
278    #[must_use]
279    pub fn tiering(mut self, tiering: crate::config::TieringConfig) -> Self {
280        self.config.tiering = Some(tiering);
281        self
282    }
283
284    /// Set the end-to-end delivery guarantee for the pipeline.
285    #[must_use]
286    pub fn delivery_guarantee(
287        mut self,
288        guarantee: laminar_connectors::connector::DeliveryGuarantee,
289    ) -> Self {
290        self.config.delivery_guarantee = guarantee;
291        self
292    }
293
294    /// Register a custom scalar UDF with the database.
295    ///
296    /// The UDF will be available in SQL queries after `build()`.
297    ///
298    /// # Example
299    ///
300    /// ```rust,ignore
301    /// use datafusion_expr::ScalarUDF;
302    ///
303    /// let db = LaminarDB::builder()
304    ///     .register_udf(my_scalar_udf)
305    ///     .build()
306    ///     .await?;
307    /// ```
308    #[must_use]
309    pub fn register_udf(mut self, udf: ScalarUDF) -> Self {
310        self.custom_udfs.push(udf);
311        self
312    }
313
314    /// Register a custom aggregate UDF (UDAF) with the database.
315    ///
316    /// The UDAF will be available in SQL queries after `build()`.
317    ///
318    /// # Example
319    ///
320    /// ```rust,ignore
321    /// use datafusion_expr::AggregateUDF;
322    ///
323    /// let db = LaminarDB::builder()
324    ///     .register_udaf(my_aggregate_udf)
325    ///     .build()
326    ///     .await?;
327    /// ```
328    #[must_use]
329    pub fn register_udaf(mut self, udaf: AggregateUDF) -> Self {
330        self.custom_udafs.push(udaf);
331        self
332    }
333
334    /// Source → coordinator channel capacity (default 64). Increase for
335    /// burst absorption at the cost of memory.
336    #[must_use]
337    pub fn pipeline_channel_capacity(mut self, capacity: usize) -> Self {
338        self.config.pipeline_channel_capacity = Some(capacity);
339        self
340    }
341
342    /// Micro-batch coalescing window (default 5ms for connectors, 0 for
343    /// embedded). Larger values amortize per-cycle SQL overhead.
344    #[must_use]
345    pub fn pipeline_batch_window(mut self, window: std::time::Duration) -> Self {
346        self.config.pipeline_batch_window = Some(window);
347        self
348    }
349
350    /// Max time draining the source channel per cycle, in nanoseconds
351    /// (default 1ms). Increase to process more messages per SQL execution.
352    #[must_use]
353    pub fn pipeline_drain_budget_ns(mut self, ns: u64) -> Self {
354        self.config.pipeline_drain_budget_ns = Some(ns);
355        self
356    }
357
358    /// Per-query execution budget in nanoseconds (default 8ms). When
359    /// exceeded, remaining queries are deferred to the next cycle.
360    #[must_use]
361    pub fn pipeline_query_budget_ns(mut self, ns: u64) -> Self {
362        self.config.pipeline_query_budget_ns = Some(ns);
363        self
364    }
365
366    /// Per-port operator input-buffer cap in batches (default 256).
367    #[must_use]
368    pub fn pipeline_max_input_buf_batches(mut self, batches: usize) -> Self {
369        self.config.pipeline_max_input_buf_batches = Some(batches);
370        self
371    }
372
373    /// Per-port operator input-buffer cap in bytes.
374    #[must_use]
375    pub fn pipeline_max_input_buf_bytes(mut self, bytes: usize) -> Self {
376        self.config.pipeline_max_input_buf_bytes = Some(bytes);
377        self
378    }
379
380    /// Backpressure policy (default `Backpressure`).
381    #[must_use]
382    pub fn pipeline_backpressure_policy(
383        mut self,
384        policy: crate::config::BackpressurePolicy,
385    ) -> Self {
386        self.config.pipeline_backpressure_policy = policy;
387        self
388    }
389
390    /// Register custom connectors with the `ConnectorRegistry`.
391    ///
392    /// The callback is invoked after the database is created and built-in
393    /// connectors are registered. Use it to add user-defined source/sink
394    /// implementations.
395    ///
396    /// # Example
397    ///
398    /// ```rust,ignore
399    /// let db = LaminarDB::builder()
400    ///     .register_connector(|registry| {
401    ///         registry.register_source("my-source", info, factory);
402    ///     })
403    ///     .build()
404    ///     .await?;
405    /// ```
406    #[must_use]
407    pub fn register_connector(
408        mut self,
409        f: impl FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send + 'static,
410    ) -> Self {
411        self.connector_callbacks.push(Box::new(f));
412        self
413    }
414
415    /// Build the `LaminarDB` instance.
416    ///
417    /// # Errors
418    ///
419    /// Returns `DbError` if database creation fails.
420    #[allow(clippy::unused_async)]
421    pub async fn build(mut self) -> Result<LaminarDB, DbError> {
422        // Forward object store settings into the config before profile detection.
423        self.config.object_store_url = self.object_store_url;
424        self.config.object_store_options = self.object_store_options;
425
426        // Auto-detect profile from config if not explicitly set.
427        if !self.profile_explicit {
428            self.profile = Profile::from_config(&self.config, false);
429        }
430
431        // Validate profile feature gates and config requirements.
432        self.profile
433            .validate_features()
434            .map_err(|e| DbError::Config(e.to_string()))?;
435        self.profile
436            .validate_config(&self.config, self.config.object_store_url.as_deref())
437            .map_err(|e| DbError::Config(e.to_string()))?;
438
439        Self::validate_backpressure(&self.config)?;
440
441        // State backend and vnode registry must be paired — half-set
442        // would leave the durability gate silently disabled.
443        match (&self.state_backend, &self.vnode_registry) {
444            (Some(_), None) => {
445                return Err(DbError::Config(
446                    "state_backend is set but vnode_registry is missing".into(),
447                ));
448            }
449            (None, Some(_)) => {
450                return Err(DbError::Config(
451                    "vnode_registry is set but state_backend is missing".into(),
452                ));
453            }
454            _ => {}
455        }
456
457        // Apply profile defaults for fields the user hasn't set.
458        self.profile.apply_defaults(&mut self.config);
459
460        let db = LaminarDB::open_with_config_and_vars_and_rules(
461            self.config,
462            self.config_vars,
463            &self.physical_optimizer_rules,
464            self.target_partitions,
465        )?;
466        for callback in self.connector_callbacks {
467            callback(db.connector_registry());
468        }
469        for udf in self.custom_udfs {
470            db.register_custom_udf(udf);
471        }
472        for udaf in self.custom_udafs {
473            db.register_custom_udaf(udaf);
474        }
475        #[cfg(feature = "cluster-unstable")]
476        if let Some(controller) = self.cluster_controller {
477            db.set_cluster_controller(controller);
478        }
479        #[cfg(feature = "cluster-unstable")]
480        if let Some(sender) = self.shuffle_sender {
481            db.set_shuffle_sender(sender);
482        }
483        #[cfg(feature = "cluster-unstable")]
484        if let Some(receiver) = self.shuffle_receiver {
485            db.set_shuffle_receiver(receiver);
486        }
487        #[cfg(feature = "cluster-unstable")]
488        if let Some(store) = self.decision_store {
489            db.set_decision_store(store);
490        }
491        #[cfg(feature = "cluster-unstable")]
492        if let Some(store) = self.assignment_snapshot_store {
493            db.set_assignment_snapshot_store(store);
494        }
495        if let Some(backend) = self.state_backend {
496            db.set_state_backend(backend);
497        }
498        if let Some(registry) = self.vnode_registry {
499            db.set_vnode_registry(registry);
500        }
501        Ok(db)
502    }
503
504    fn validate_backpressure(config: &LaminarConfig) -> Result<(), DbError> {
505        use crate::config::BackpressurePolicy;
506        use laminar_connectors::connector::DeliveryGuarantee;
507
508        let policy = config.pipeline_backpressure_policy;
509        if policy == BackpressurePolicy::Backpressure {
510            return Ok(());
511        }
512
513        // Non-default policy needs at least one finite, non-zero cap.
514        let has_count_cap = config.pipeline_max_input_buf_batches.is_none_or(|c| c > 0);
515        let has_byte_cap = config.pipeline_max_input_buf_bytes.is_some_and(|b| b > 0);
516        if !has_count_cap && !has_byte_cap {
517            return Err(DbError::Config(format!(
518                "backpressure_policy={policy:?} requires at least one of \
519                 pipeline_max_input_buf_batches (>0) or pipeline_max_input_buf_bytes"
520            )));
521        }
522
523        if policy == BackpressurePolicy::ShedOldest
524            && config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
525        {
526            return Err(DbError::Config(
527                "ShedOldest drops data; it is incompatible with exactly-once \
528                 delivery. Use Backpressure or Fail, or downgrade the guarantee."
529                    .into(),
530            ));
531        }
532        Ok(())
533    }
534}
535
536impl Default for LaminarDbBuilder {
537    fn default() -> Self {
538        Self::new()
539    }
540}
541
542impl std::fmt::Debug for LaminarDbBuilder {
543    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544        f.debug_struct("LaminarDbBuilder")
545            .field("config", &self.config)
546            .field("profile", &self.profile)
547            .field("profile_explicit", &self.profile_explicit)
548            .field("object_store_url", &self.object_store_url)
549            .field(
550                "object_store_options_count",
551                &self.object_store_options.len(),
552            )
553            .field("config_vars_count", &self.config_vars.len())
554            .field("connector_callbacks", &self.connector_callbacks.len())
555            .field("custom_udfs", &self.custom_udfs.len())
556            .field("custom_udafs", &self.custom_udafs.len())
557            .finish_non_exhaustive()
558    }
559}
560
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::BackpressurePolicy;
    use laminar_connectors::connector::DeliveryGuarantee;

    /// Shared check: a freshly built database must report itself as open.
    fn assert_open(db: &LaminarDB) {
        assert!(!db.is_closed());
    }

    #[tokio::test]
    async fn test_default_builder() {
        let built = LaminarDbBuilder::new().build().await.unwrap();
        assert_open(&built);
    }

    #[tokio::test]
    async fn test_shed_oldest_requires_cap() {
        // A zero batch cap and no byte cap leaves ShedOldest with no bound.
        let outcome = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(0)
            .build()
            .await;
        let err = outcome.expect_err("ShedOldest with no caps must be rejected");
        assert!(err.to_string().contains("requires at least one"), "{err}");
    }

    #[tokio::test]
    async fn test_shed_oldest_rejects_exactly_once() {
        let outcome = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(64)
            .delivery_guarantee(DeliveryGuarantee::ExactlyOnce)
            .build()
            .await;
        let err = outcome.expect_err("ShedOldest + ExactlyOnce must be rejected");
        assert!(err.to_string().contains("exactly-once"), "{err}");
    }

    #[tokio::test]
    async fn test_valid_shed_oldest_builds() {
        let built = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(64)
            .build()
            .await
            .unwrap();
        assert_open(&built);
    }

    #[tokio::test]
    async fn test_builder_with_config_vars() {
        let built = LaminarDbBuilder::new()
            .config_var("KAFKA_BROKERS", "localhost:9092")
            .config_var("GROUP_ID", "test-group")
            .build()
            .await
            .unwrap();
        assert_open(&built);
    }

    #[tokio::test]
    async fn test_builder_with_options() {
        let built = LaminarDbBuilder::new()
            .buffer_size(131_072)
            .build()
            .await
            .unwrap();
        assert_open(&built);
    }

    #[tokio::test]
    async fn test_builder_from_laminardb() {
        let built = LaminarDB::builder().build().await.unwrap();
        assert_open(&built);
    }

    #[test]
    fn test_builder_debug() {
        let rendered = format!("{:?}", LaminarDbBuilder::new().config_var("K", "V"));
        assert!(rendered.contains("LaminarDbBuilder"));
        assert!(rendered.contains("config_vars_count: 1"));
    }

    #[tokio::test]
    async fn test_builder_register_udf() {
        use std::any::Any;
        use std::hash::{Hash, Hasher};

        use arrow::datatypes::DataType;
        use datafusion_common::ScalarValue;
        use datafusion_expr::{
            ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
        };

        /// SQL-visible name of the test function.
        const FORTY_TWO: &str = "forty_two";

        /// Nullary scalar UDF that always yields the `Int64` constant 42.
        #[derive(Debug)]
        struct FortyTwo {
            signature: Signature,
        }

        impl FortyTwo {
            fn new() -> Self {
                let signature = Signature::new(TypeSignature::Nullary, Volatility::Immutable);
                Self { signature }
            }
        }

        // Every instance is interchangeable: all compare equal and hash by name.
        impl PartialEq for FortyTwo {
            fn eq(&self, _other: &Self) -> bool {
                true
            }
        }

        impl Eq for FortyTwo {}

        impl Hash for FortyTwo {
            fn hash<H: Hasher>(&self, state: &mut H) {
                FORTY_TWO.hash(state);
            }
        }

        impl ScalarUDFImpl for FortyTwo {
            fn as_any(&self) -> &dyn Any {
                self
            }
            fn name(&self) -> &'static str {
                FORTY_TWO
            }
            fn signature(&self) -> &Signature {
                &self.signature
            }
            fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }
            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                let answer = ScalarValue::Int64(Some(42));
                Ok(ColumnarValue::Scalar(answer))
            }
        }

        let db = LaminarDB::builder()
            .register_udf(ScalarUDF::new_from_impl(FortyTwo::new()))
            .build()
            .await
            .unwrap();

        // The function must resolve from SQL once the database is built.
        let result = db.execute("SELECT forty_two()").await;
        assert!(result.is_ok(), "UDF should be callable: {result:?}");
    }
}