1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
5use std::path::PathBuf;
6
7use datafusion_expr::{AggregateUDF, ScalarUDF};
8use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
9
10use crate::config::LaminarConfig;
11use crate::db::LaminarDB;
12use crate::error::DbError;
13use crate::profile::Profile;
14
15type ConnectorCallback = Box<dyn FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send>;
17
18pub struct LaminarDbBuilder {
30 config: LaminarConfig,
31 config_vars: HashMap<String, String>,
32 connector_callbacks: Vec<ConnectorCallback>,
33 profile: Profile,
34 profile_explicit: bool,
35 object_store_url: Option<String>,
36 object_store_options: HashMap<String, String>,
37 custom_udfs: Vec<ScalarUDF>,
38 custom_udafs: Vec<AggregateUDF>,
39 #[cfg(feature = "cluster-unstable")]
42 cluster_controller: Option<std::sync::Arc<laminar_core::cluster::control::ClusterController>>,
43 #[cfg(feature = "cluster-unstable")]
47 shuffle_sender: Option<std::sync::Arc<laminar_core::shuffle::ShuffleSender>>,
48 #[cfg(feature = "cluster-unstable")]
50 shuffle_receiver: Option<std::sync::Arc<laminar_core::shuffle::ShuffleReceiver>>,
51 #[cfg(feature = "cluster-unstable")]
53 decision_store: Option<std::sync::Arc<laminar_core::cluster::control::CheckpointDecisionStore>>,
54 #[cfg(feature = "cluster-unstable")]
56 assignment_snapshot_store:
57 Option<std::sync::Arc<laminar_core::cluster::control::AssignmentSnapshotStore>>,
58 state_backend: Option<std::sync::Arc<dyn laminar_core::state::StateBackend>>,
62 vnode_registry: Option<std::sync::Arc<laminar_core::state::VnodeRegistry>>,
64 physical_optimizer_rules: Vec<
67 std::sync::Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
68 >,
69 target_partitions: Option<usize>,
72}
73
74impl LaminarDbBuilder {
75 #[must_use]
77 pub fn new() -> Self {
78 Self {
79 config: LaminarConfig::default(),
80 config_vars: HashMap::new(),
81 connector_callbacks: Vec::new(),
82 profile: Profile::default(),
83 profile_explicit: false,
84 object_store_url: None,
85 object_store_options: HashMap::new(),
86 custom_udfs: Vec::new(),
87 custom_udafs: Vec::new(),
88 #[cfg(feature = "cluster-unstable")]
89 cluster_controller: None,
90 #[cfg(feature = "cluster-unstable")]
91 shuffle_sender: None,
92 #[cfg(feature = "cluster-unstable")]
93 shuffle_receiver: None,
94 #[cfg(feature = "cluster-unstable")]
95 decision_store: None,
96 #[cfg(feature = "cluster-unstable")]
97 assignment_snapshot_store: None,
98 state_backend: None,
99 vnode_registry: None,
100 physical_optimizer_rules: Vec::new(),
101 target_partitions: None,
102 }
103 }
104
105 #[must_use]
108 pub fn target_partitions(mut self, n: usize) -> Self {
109 self.target_partitions = Some(n);
110 self
111 }
112
113 #[must_use]
115 pub fn physical_optimizer_rule(
116 mut self,
117 rule: std::sync::Arc<
118 dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync,
119 >,
120 ) -> Self {
121 self.physical_optimizer_rules.push(rule);
122 self
123 }
124
125 #[must_use]
129 pub fn state_backend(
130 mut self,
131 backend: std::sync::Arc<dyn laminar_core::state::StateBackend>,
132 ) -> Self {
133 self.state_backend = Some(backend);
134 self
135 }
136
137 #[must_use]
141 pub fn vnode_registry(
142 mut self,
143 registry: std::sync::Arc<laminar_core::state::VnodeRegistry>,
144 ) -> Self {
145 self.vnode_registry = Some(registry);
146 self
147 }
148
149 #[cfg(feature = "cluster-unstable")]
154 #[must_use]
155 pub fn cluster_controller(
156 mut self,
157 controller: std::sync::Arc<laminar_core::cluster::control::ClusterController>,
158 ) -> Self {
159 self.cluster_controller = Some(controller);
160 self
161 }
162
163 #[cfg(feature = "cluster-unstable")]
168 #[must_use]
169 pub fn shuffle_sender(
170 mut self,
171 sender: std::sync::Arc<laminar_core::shuffle::ShuffleSender>,
172 ) -> Self {
173 self.shuffle_sender = Some(sender);
174 self
175 }
176
177 #[cfg(feature = "cluster-unstable")]
181 #[must_use]
182 pub fn shuffle_receiver(
183 mut self,
184 receiver: std::sync::Arc<laminar_core::shuffle::ShuffleReceiver>,
185 ) -> Self {
186 self.shuffle_receiver = Some(receiver);
187 self
188 }
189
190 #[cfg(feature = "cluster-unstable")]
192 #[must_use]
193 pub fn decision_store(
194 mut self,
195 store: std::sync::Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
196 ) -> Self {
197 self.decision_store = Some(store);
198 self
199 }
200
201 #[cfg(feature = "cluster-unstable")]
203 #[must_use]
204 pub fn assignment_snapshot_store(
205 mut self,
206 store: std::sync::Arc<laminar_core::cluster::control::AssignmentSnapshotStore>,
207 ) -> Self {
208 self.assignment_snapshot_store = Some(store);
209 self
210 }
211
212 #[must_use]
214 pub fn config_var(mut self, key: &str, value: &str) -> Self {
215 self.config_vars.insert(key.to_string(), value.to_string());
216 self
217 }
218
219 #[must_use]
221 pub fn buffer_size(mut self, size: usize) -> Self {
222 self.config.default_buffer_size = size;
223 self
224 }
225
226 #[must_use]
228 pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
229 self.config.default_backpressure = strategy;
230 self
231 }
232
233 #[must_use]
235 pub fn storage_dir(mut self, path: impl Into<PathBuf>) -> Self {
236 self.config.storage_dir = Some(path.into());
237 self
238 }
239
240 #[must_use]
242 pub fn checkpoint(mut self, config: StreamCheckpointConfig) -> Self {
243 self.config.checkpoint = Some(config);
244 self
245 }
246
247 #[must_use]
251 pub fn profile(mut self, profile: Profile) -> Self {
252 self.profile = profile;
253 self.profile_explicit = true;
254 self
255 }
256
257 #[must_use]
262 pub fn object_store_url(mut self, url: impl Into<String>) -> Self {
263 self.object_store_url = Some(url.into());
264 self
265 }
266
267 #[must_use]
272 pub fn object_store_options(mut self, opts: HashMap<String, String>) -> Self {
273 self.object_store_options = opts;
274 self
275 }
276
277 #[must_use]
279 pub fn tiering(mut self, tiering: crate::config::TieringConfig) -> Self {
280 self.config.tiering = Some(tiering);
281 self
282 }
283
284 #[must_use]
286 pub fn delivery_guarantee(
287 mut self,
288 guarantee: laminar_connectors::connector::DeliveryGuarantee,
289 ) -> Self {
290 self.config.delivery_guarantee = guarantee;
291 self
292 }
293
294 #[must_use]
309 pub fn register_udf(mut self, udf: ScalarUDF) -> Self {
310 self.custom_udfs.push(udf);
311 self
312 }
313
314 #[must_use]
329 pub fn register_udaf(mut self, udaf: AggregateUDF) -> Self {
330 self.custom_udafs.push(udaf);
331 self
332 }
333
334 #[must_use]
337 pub fn pipeline_channel_capacity(mut self, capacity: usize) -> Self {
338 self.config.pipeline_channel_capacity = Some(capacity);
339 self
340 }
341
342 #[must_use]
345 pub fn pipeline_batch_window(mut self, window: std::time::Duration) -> Self {
346 self.config.pipeline_batch_window = Some(window);
347 self
348 }
349
350 #[must_use]
353 pub fn pipeline_drain_budget_ns(mut self, ns: u64) -> Self {
354 self.config.pipeline_drain_budget_ns = Some(ns);
355 self
356 }
357
358 #[must_use]
361 pub fn pipeline_query_budget_ns(mut self, ns: u64) -> Self {
362 self.config.pipeline_query_budget_ns = Some(ns);
363 self
364 }
365
366 #[must_use]
368 pub fn pipeline_max_input_buf_batches(mut self, batches: usize) -> Self {
369 self.config.pipeline_max_input_buf_batches = Some(batches);
370 self
371 }
372
373 #[must_use]
375 pub fn pipeline_max_input_buf_bytes(mut self, bytes: usize) -> Self {
376 self.config.pipeline_max_input_buf_bytes = Some(bytes);
377 self
378 }
379
380 #[must_use]
382 pub fn pipeline_backpressure_policy(
383 mut self,
384 policy: crate::config::BackpressurePolicy,
385 ) -> Self {
386 self.config.pipeline_backpressure_policy = policy;
387 self
388 }
389
390 #[must_use]
407 pub fn register_connector(
408 mut self,
409 f: impl FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send + 'static,
410 ) -> Self {
411 self.connector_callbacks.push(Box::new(f));
412 self
413 }
414
415 #[allow(clippy::unused_async)]
421 pub async fn build(mut self) -> Result<LaminarDB, DbError> {
422 self.config.object_store_url = self.object_store_url;
424 self.config.object_store_options = self.object_store_options;
425
426 if !self.profile_explicit {
428 self.profile = Profile::from_config(&self.config, false);
429 }
430
431 self.profile
433 .validate_features()
434 .map_err(|e| DbError::Config(e.to_string()))?;
435 self.profile
436 .validate_config(&self.config, self.config.object_store_url.as_deref())
437 .map_err(|e| DbError::Config(e.to_string()))?;
438
439 Self::validate_backpressure(&self.config)?;
440
441 match (&self.state_backend, &self.vnode_registry) {
444 (Some(_), None) => {
445 return Err(DbError::Config(
446 "state_backend is set but vnode_registry is missing".into(),
447 ));
448 }
449 (None, Some(_)) => {
450 return Err(DbError::Config(
451 "vnode_registry is set but state_backend is missing".into(),
452 ));
453 }
454 _ => {}
455 }
456
457 self.profile.apply_defaults(&mut self.config);
459
460 let db = LaminarDB::open_with_config_and_vars_and_rules(
461 self.config,
462 self.config_vars,
463 &self.physical_optimizer_rules,
464 self.target_partitions,
465 )?;
466 for callback in self.connector_callbacks {
467 callback(db.connector_registry());
468 }
469 for udf in self.custom_udfs {
470 db.register_custom_udf(udf);
471 }
472 for udaf in self.custom_udafs {
473 db.register_custom_udaf(udaf);
474 }
475 #[cfg(feature = "cluster-unstable")]
476 if let Some(controller) = self.cluster_controller {
477 db.set_cluster_controller(controller);
478 }
479 #[cfg(feature = "cluster-unstable")]
480 if let Some(sender) = self.shuffle_sender {
481 db.set_shuffle_sender(sender);
482 }
483 #[cfg(feature = "cluster-unstable")]
484 if let Some(receiver) = self.shuffle_receiver {
485 db.set_shuffle_receiver(receiver);
486 }
487 #[cfg(feature = "cluster-unstable")]
488 if let Some(store) = self.decision_store {
489 db.set_decision_store(store);
490 }
491 #[cfg(feature = "cluster-unstable")]
492 if let Some(store) = self.assignment_snapshot_store {
493 db.set_assignment_snapshot_store(store);
494 }
495 if let Some(backend) = self.state_backend {
496 db.set_state_backend(backend);
497 }
498 if let Some(registry) = self.vnode_registry {
499 db.set_vnode_registry(registry);
500 }
501 Ok(db)
502 }
503
504 fn validate_backpressure(config: &LaminarConfig) -> Result<(), DbError> {
505 use crate::config::BackpressurePolicy;
506 use laminar_connectors::connector::DeliveryGuarantee;
507
508 let policy = config.pipeline_backpressure_policy;
509 if policy == BackpressurePolicy::Backpressure {
510 return Ok(());
511 }
512
513 let has_count_cap = config.pipeline_max_input_buf_batches.is_none_or(|c| c > 0);
515 let has_byte_cap = config.pipeline_max_input_buf_bytes.is_some_and(|b| b > 0);
516 if !has_count_cap && !has_byte_cap {
517 return Err(DbError::Config(format!(
518 "backpressure_policy={policy:?} requires at least one of \
519 pipeline_max_input_buf_batches (>0) or pipeline_max_input_buf_bytes"
520 )));
521 }
522
523 if policy == BackpressurePolicy::ShedOldest
524 && config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
525 {
526 return Err(DbError::Config(
527 "ShedOldest drops data; it is incompatible with exactly-once \
528 delivery. Use Backpressure or Fail, or downgrade the guarantee."
529 .into(),
530 ));
531 }
532 Ok(())
533 }
534}
535
536impl Default for LaminarDbBuilder {
537 fn default() -> Self {
538 Self::new()
539 }
540}
541
542impl std::fmt::Debug for LaminarDbBuilder {
543 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544 f.debug_struct("LaminarDbBuilder")
545 .field("config", &self.config)
546 .field("profile", &self.profile)
547 .field("profile_explicit", &self.profile_explicit)
548 .field("object_store_url", &self.object_store_url)
549 .field(
550 "object_store_options_count",
551 &self.object_store_options.len(),
552 )
553 .field("config_vars_count", &self.config_vars.len())
554 .field("connector_callbacks", &self.connector_callbacks.len())
555 .field("custom_udfs", &self.custom_udfs.len())
556 .field("custom_udafs", &self.custom_udafs.len())
557 .finish_non_exhaustive()
558 }
559}
560
#[cfg(test)]
mod tests {
    use super::*;

    /// A builder without any customisation yields an open database.
    #[tokio::test]
    async fn test_default_builder() {
        let builder = LaminarDbBuilder::new();
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// `ShedOldest` with the batch cap disabled and no byte cap is rejected.
    #[tokio::test]
    async fn test_shed_oldest_requires_cap() {
        use crate::config::BackpressurePolicy;

        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(0);
        let err = builder
            .build()
            .await
            .expect_err("ShedOldest with no caps must be rejected");

        let message = err.to_string();
        assert!(message.contains("requires at least one"), "{message}");
    }

    /// `ShedOldest` cannot be combined with exactly-once delivery.
    #[tokio::test]
    async fn test_shed_oldest_rejects_exactly_once() {
        use crate::config::BackpressurePolicy;
        use laminar_connectors::connector::DeliveryGuarantee;

        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(64)
            .delivery_guarantee(DeliveryGuarantee::ExactlyOnce);
        let err = builder
            .build()
            .await
            .expect_err("ShedOldest + ExactlyOnce must be rejected");

        let message = err.to_string();
        assert!(message.contains("exactly-once"), "{message}");
    }

    /// `ShedOldest` with a positive batch cap builds successfully.
    #[tokio::test]
    async fn test_valid_shed_oldest_builds() {
        use crate::config::BackpressurePolicy;

        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(64);
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// Configuration variables do not prevent the database from opening.
    #[tokio::test]
    async fn test_builder_with_config_vars() {
        let builder = LaminarDbBuilder::new()
            .config_var("KAFKA_BROKERS", "localhost:9092")
            .config_var("GROUP_ID", "test-group");
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// A custom buffer size is accepted.
    #[tokio::test]
    async fn test_builder_with_options() {
        let builder = LaminarDbBuilder::new().buffer_size(131_072);
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// `LaminarDB::builder()` hands out a working builder.
    #[tokio::test]
    async fn test_builder_from_laminardb() {
        let builder = LaminarDB::builder();
        let db = builder.build().await.unwrap();
        assert!(!db.is_closed());
    }

    /// The `Debug` output names the type and reports the variable count.
    #[test]
    fn test_builder_debug() {
        let builder = LaminarDbBuilder::new().config_var("K", "V");
        let rendered = format!("{builder:?}");
        assert!(rendered.contains("LaminarDbBuilder"));
        assert!(rendered.contains("config_vars_count: 1"));
    }

    /// A UDF registered through the builder is callable from SQL.
    #[tokio::test]
    async fn test_builder_register_udf() {
        use std::any::Any;
        use std::hash::{Hash, Hasher};

        use arrow::datatypes::DataType;
        use datafusion_expr::{
            ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
        };

        /// Zero-argument scalar UDF that always returns `42` as `Int64`.
        #[derive(Debug)]
        struct FortyTwo {
            signature: Signature,
        }

        impl FortyTwo {
            fn new() -> Self {
                let signature = Signature::new(TypeSignature::Nullary, Volatility::Immutable);
                Self { signature }
            }
        }

        // Every instance is treated as equal to every other one, and the hash
        // below is the same constant for all of them, so `Eq` and `Hash` agree.
        impl PartialEq for FortyTwo {
            fn eq(&self, _: &Self) -> bool {
                true
            }
        }

        impl Eq for FortyTwo {}

        impl Hash for FortyTwo {
            fn hash<H: Hasher>(&self, state: &mut H) {
                "forty_two".hash(state);
            }
        }

        impl ScalarUDFImpl for FortyTwo {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &'static str {
                "forty_two"
            }

            fn signature(&self) -> &Signature {
                &self.signature
            }

            fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                let answer = datafusion_common::ScalarValue::Int64(Some(42));
                Ok(ColumnarValue::Scalar(answer))
            }
        }

        let udf = ScalarUDF::new_from_impl(FortyTwo::new());
        let builder = LaminarDB::builder().register_udf(udf);
        let db = builder.build().await.unwrap();

        // The call only needs to succeed; the returned rows are not inspected.
        let result = db.execute("SELECT forty_two()").await;
        assert!(result.is_ok(), "UDF should be callable: {result:?}");
    }
}