1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
5use std::path::PathBuf;
6
7use datafusion_expr::{AggregateUDF, ScalarUDF};
8use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
9
10use crate::config::LaminarConfig;
11use crate::db::LaminarDB;
12use crate::error::DbError;
13use crate::profile::Profile;
14
15type ConnectorCallback = Box<dyn FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send>;
17
18pub struct LaminarDbBuilder {
30 config: LaminarConfig,
31 config_vars: HashMap<String, String>,
32 connector_callbacks: Vec<ConnectorCallback>,
33 profile: Profile,
34 profile_explicit: bool,
35 object_store_url: Option<String>,
36 object_store_options: HashMap<String, String>,
37 custom_udfs: Vec<ScalarUDF>,
38 custom_udafs: Vec<AggregateUDF>,
39 #[cfg(feature = "cluster")]
42 cluster_controller: Option<std::sync::Arc<laminar_core::cluster::control::ClusterController>>,
43 #[cfg(feature = "cluster")]
47 shuffle_sender: Option<std::sync::Arc<laminar_core::shuffle::ShuffleSender>>,
48 #[cfg(feature = "cluster")]
50 shuffle_receiver: Option<std::sync::Arc<laminar_core::shuffle::ShuffleReceiver>>,
51 #[cfg(feature = "cluster")]
53 decision_store: Option<std::sync::Arc<laminar_core::cluster::control::CheckpointDecisionStore>>,
54 #[cfg(feature = "cluster")]
56 assignment_snapshot_store:
57 Option<std::sync::Arc<laminar_core::cluster::control::AssignmentSnapshotStore>>,
58 #[cfg(feature = "cluster")]
60 catalog_manifest_store:
61 Option<std::sync::Arc<laminar_core::cluster::control::CatalogManifestStore>>,
62 state_backend: Option<std::sync::Arc<dyn laminar_core::state::StateBackend>>,
66 vnode_registry: Option<std::sync::Arc<laminar_core::state::VnodeRegistry>>,
68 physical_optimizer_rules: Vec<
71 std::sync::Arc<dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync>,
72 >,
73 target_partitions: Option<usize>,
76 ai_runtime: Option<std::sync::Arc<crate::ai::AiRuntime>>,
78}
79
80impl LaminarDbBuilder {
81 #[must_use]
83 pub fn new() -> Self {
84 Self {
85 config: LaminarConfig::default(),
86 config_vars: HashMap::new(),
87 connector_callbacks: Vec::new(),
88 profile: Profile::default(),
89 profile_explicit: false,
90 object_store_url: None,
91 object_store_options: HashMap::new(),
92 custom_udfs: Vec::new(),
93 custom_udafs: Vec::new(),
94 #[cfg(feature = "cluster")]
95 cluster_controller: None,
96 #[cfg(feature = "cluster")]
97 shuffle_sender: None,
98 #[cfg(feature = "cluster")]
99 shuffle_receiver: None,
100 #[cfg(feature = "cluster")]
101 decision_store: None,
102 #[cfg(feature = "cluster")]
103 assignment_snapshot_store: None,
104 #[cfg(feature = "cluster")]
105 catalog_manifest_store: None,
106 state_backend: None,
107 vnode_registry: None,
108 physical_optimizer_rules: Vec::new(),
109 target_partitions: None,
110 ai_runtime: None,
111 }
112 }
113
114 #[must_use]
118 pub fn ai(mut self, runtime: std::sync::Arc<crate::ai::AiRuntime>) -> Self {
119 self.ai_runtime = Some(runtime);
120 self
121 }
122
123 #[must_use]
126 pub fn target_partitions(mut self, n: usize) -> Self {
127 self.target_partitions = Some(n);
128 self
129 }
130
131 #[must_use]
133 pub fn physical_optimizer_rule(
134 mut self,
135 rule: std::sync::Arc<
136 dyn datafusion::physical_optimizer::PhysicalOptimizerRule + Send + Sync,
137 >,
138 ) -> Self {
139 self.physical_optimizer_rules.push(rule);
140 self
141 }
142
143 #[must_use]
147 pub fn state_backend(
148 mut self,
149 backend: std::sync::Arc<dyn laminar_core::state::StateBackend>,
150 ) -> Self {
151 self.state_backend = Some(backend);
152 self
153 }
154
155 #[must_use]
159 pub fn vnode_registry(
160 mut self,
161 registry: std::sync::Arc<laminar_core::state::VnodeRegistry>,
162 ) -> Self {
163 self.vnode_registry = Some(registry);
164 self
165 }
166
167 #[cfg(feature = "cluster")]
172 #[must_use]
173 pub fn cluster_controller(
174 mut self,
175 controller: std::sync::Arc<laminar_core::cluster::control::ClusterController>,
176 ) -> Self {
177 self.cluster_controller = Some(controller);
178 self
179 }
180
181 #[cfg(feature = "cluster")]
186 #[must_use]
187 pub fn shuffle_sender(
188 mut self,
189 sender: std::sync::Arc<laminar_core::shuffle::ShuffleSender>,
190 ) -> Self {
191 self.shuffle_sender = Some(sender);
192 self
193 }
194
195 #[cfg(feature = "cluster")]
199 #[must_use]
200 pub fn shuffle_receiver(
201 mut self,
202 receiver: std::sync::Arc<laminar_core::shuffle::ShuffleReceiver>,
203 ) -> Self {
204 self.shuffle_receiver = Some(receiver);
205 self
206 }
207
208 #[cfg(feature = "cluster")]
210 #[must_use]
211 pub fn decision_store(
212 mut self,
213 store: std::sync::Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
214 ) -> Self {
215 self.decision_store = Some(store);
216 self
217 }
218
219 #[cfg(feature = "cluster")]
221 #[must_use]
222 pub fn assignment_snapshot_store(
223 mut self,
224 store: std::sync::Arc<laminar_core::cluster::control::AssignmentSnapshotStore>,
225 ) -> Self {
226 self.assignment_snapshot_store = Some(store);
227 self
228 }
229
230 #[cfg(feature = "cluster")]
232 #[must_use]
233 pub fn catalog_manifest_store(
234 mut self,
235 store: std::sync::Arc<laminar_core::cluster::control::CatalogManifestStore>,
236 ) -> Self {
237 self.catalog_manifest_store = Some(store);
238 self
239 }
240
241 #[must_use]
243 pub fn config_var(mut self, key: &str, value: &str) -> Self {
244 self.config_vars.insert(key.to_string(), value.to_string());
245 self
246 }
247
248 #[must_use]
251 pub fn http_auth_token(mut self, token: impl Into<String>) -> Self {
252 self.config.http_auth_token = Some(crate::config::SecretString::new(token));
253 self
254 }
255
256 #[must_use]
258 pub fn buffer_size(mut self, size: usize) -> Self {
259 self.config.default_buffer_size = size;
260 self
261 }
262
263 #[must_use]
265 pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
266 self.config.default_backpressure = strategy;
267 self
268 }
269
270 #[must_use]
272 pub fn storage_dir(mut self, path: impl Into<PathBuf>) -> Self {
273 self.config.storage_dir = Some(path.into());
274 self
275 }
276
277 #[must_use]
279 pub fn checkpoint(mut self, config: StreamCheckpointConfig) -> Self {
280 self.config.checkpoint = Some(config);
281 self
282 }
283
284 #[must_use]
288 pub fn profile(mut self, profile: Profile) -> Self {
289 self.profile = profile;
290 self.profile_explicit = true;
291 self
292 }
293
294 #[must_use]
299 pub fn object_store_url(mut self, url: impl Into<String>) -> Self {
300 self.object_store_url = Some(url.into());
301 self
302 }
303
304 #[must_use]
309 pub fn object_store_options(mut self, opts: HashMap<String, String>) -> Self {
310 self.object_store_options = opts;
311 self
312 }
313
314 #[must_use]
316 pub fn delivery_guarantee(
317 mut self,
318 guarantee: laminar_connectors::connector::DeliveryGuarantee,
319 ) -> Self {
320 self.config.delivery_guarantee = guarantee;
321 self
322 }
323
324 #[must_use]
339 pub fn register_udf(mut self, udf: ScalarUDF) -> Self {
340 self.custom_udfs.push(udf);
341 self
342 }
343
344 #[must_use]
359 pub fn register_udaf(mut self, udaf: AggregateUDF) -> Self {
360 self.custom_udafs.push(udaf);
361 self
362 }
363
364 #[must_use]
367 pub fn pipeline_channel_capacity(mut self, capacity: usize) -> Self {
368 self.config.pipeline_channel_capacity = Some(capacity);
369 self
370 }
371
372 #[must_use]
375 pub fn pipeline_batch_window(mut self, window: std::time::Duration) -> Self {
376 self.config.pipeline_batch_window = Some(window);
377 self
378 }
379
380 #[must_use]
383 pub fn pipeline_drain_budget_ns(mut self, ns: u64) -> Self {
384 self.config.pipeline_drain_budget_ns = Some(ns);
385 self
386 }
387
388 #[must_use]
391 pub fn pipeline_query_budget_ns(mut self, ns: u64) -> Self {
392 self.config.pipeline_query_budget_ns = Some(ns);
393 self
394 }
395
396 #[must_use]
398 pub fn pipeline_max_input_buf_batches(mut self, batches: usize) -> Self {
399 self.config.pipeline_max_input_buf_batches = Some(batches);
400 self
401 }
402
403 #[must_use]
405 pub fn pipeline_max_input_buf_bytes(mut self, bytes: usize) -> Self {
406 self.config.pipeline_max_input_buf_bytes = Some(bytes);
407 self
408 }
409
410 #[must_use]
412 pub fn pipeline_backpressure_policy(
413 mut self,
414 policy: crate::config::BackpressurePolicy,
415 ) -> Self {
416 self.config.pipeline_backpressure_policy = policy;
417 self
418 }
419
420 #[must_use]
437 pub fn register_connector(
438 mut self,
439 f: impl FnOnce(&laminar_connectors::registry::ConnectorRegistry) + Send + 'static,
440 ) -> Self {
441 self.connector_callbacks.push(Box::new(f));
442 self
443 }
444
445 #[allow(clippy::unused_async)]
451 pub async fn build(mut self) -> Result<LaminarDB, DbError> {
452 self.config.object_store_url = self.object_store_url;
454 self.config.object_store_options = self.object_store_options;
455
456 if !self.profile_explicit {
458 self.profile = Profile::from_config(&self.config, false);
459 }
460
461 self.profile
463 .validate_features()
464 .map_err(|e| DbError::Config(e.to_string()))?;
465 self.profile
466 .validate_config(&self.config, self.config.object_store_url.as_deref())
467 .map_err(|e| DbError::Config(e.to_string()))?;
468
469 Self::validate_backpressure(&self.config)?;
470
471 match (&self.state_backend, &self.vnode_registry) {
473 (Some(_), None) => {
474 return Err(DbError::Config(
475 "state_backend is set but vnode_registry is missing".into(),
476 ));
477 }
478 (None, Some(_)) => {
479 return Err(DbError::Config(
480 "vnode_registry is set but state_backend is missing".into(),
481 ));
482 }
483 _ => {}
484 }
485
486 self.profile.apply_defaults(&mut self.config);
488
489 let mut db = LaminarDB::open_with_config_and_vars_and_rules(
490 self.config,
491 self.config_vars,
492 &self.physical_optimizer_rules,
493 self.target_partitions,
494 )?;
495 if let Some(runtime) = self.ai_runtime {
496 let handle = tokio::runtime::Handle::try_current().map_err(|_| {
498 DbError::InvalidOperation(
499 "LaminarDB::build() with an AI runtime must run inside a Tokio runtime"
500 .to_string(),
501 )
502 })?;
503 db.set_ai_runtime(runtime, handle);
504 }
505 for callback in self.connector_callbacks {
506 callback(db.connector_registry());
507 }
508 for udf in self.custom_udfs {
509 db.register_custom_udf(udf);
510 }
511 for udaf in self.custom_udafs {
512 db.register_custom_udaf(udaf);
513 }
514 #[cfg(feature = "cluster")]
515 if let Some(controller) = self.cluster_controller {
516 db.set_cluster_controller(controller);
517 }
518 #[cfg(feature = "cluster")]
519 if let Some(sender) = self.shuffle_sender {
520 db.set_shuffle_sender(sender);
521 }
522 #[cfg(feature = "cluster")]
523 if let Some(receiver) = self.shuffle_receiver {
524 db.set_shuffle_receiver(receiver);
525 }
526 #[cfg(feature = "cluster")]
527 if let Some(store) = self.decision_store {
528 db.set_decision_store(store);
529 }
530 #[cfg(feature = "cluster")]
531 if let Some(store) = self.assignment_snapshot_store {
532 db.set_assignment_snapshot_store(store);
533 }
534 #[cfg(feature = "cluster")]
535 if let Some(store) = self.catalog_manifest_store {
536 db.set_catalog_manifest_store(store);
537 }
538 if let Some(backend) = self.state_backend {
539 db.set_state_backend(backend);
540 }
541 if let Some(registry) = self.vnode_registry {
542 db.set_vnode_registry(registry);
543 }
544 Ok(db)
545 }
546
547 fn validate_backpressure(config: &LaminarConfig) -> Result<(), DbError> {
548 use crate::config::BackpressurePolicy;
549 use laminar_connectors::connector::DeliveryGuarantee;
550
551 let policy = config.pipeline_backpressure_policy;
552 if policy == BackpressurePolicy::Backpressure {
553 return Ok(());
554 }
555
556 let has_count_cap = config.pipeline_max_input_buf_batches.is_none_or(|c| c > 0);
558 let has_byte_cap = config.pipeline_max_input_buf_bytes.is_some_and(|b| b > 0);
559 if !has_count_cap && !has_byte_cap {
560 return Err(DbError::Config(format!(
561 "backpressure_policy={policy:?} requires at least one of \
562 pipeline_max_input_buf_batches (>0) or pipeline_max_input_buf_bytes"
563 )));
564 }
565
566 if policy == BackpressurePolicy::ShedOldest
567 && config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
568 {
569 return Err(DbError::Config(
570 "ShedOldest drops data; it is incompatible with exactly-once \
571 delivery. Use Backpressure or Fail, or downgrade the guarantee."
572 .into(),
573 ));
574 }
575 Ok(())
576 }
577}
578
579impl Default for LaminarDbBuilder {
580 fn default() -> Self {
581 Self::new()
582 }
583}
584
585impl std::fmt::Debug for LaminarDbBuilder {
586 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
587 f.debug_struct("LaminarDbBuilder")
588 .field("config", &self.config)
589 .field("profile", &self.profile)
590 .field("profile_explicit", &self.profile_explicit)
591 .field("object_store_url", &self.object_store_url)
592 .field(
593 "object_store_options_count",
594 &self.object_store_options.len(),
595 )
596 .field("config_vars_count", &self.config_vars.len())
597 .field("connector_callbacks", &self.connector_callbacks.len())
598 .field("custom_udfs", &self.custom_udfs.len())
599 .field("custom_udafs", &self.custom_udafs.len())
600 .finish_non_exhaustive()
601 }
602}
603
#[cfg(test)]
mod tests {
    use super::*;

    /// An untouched builder must produce an open database.
    #[tokio::test]
    async fn test_default_builder() {
        let built = LaminarDbBuilder::new().build().await.unwrap();
        assert!(!built.is_closed());
    }

    /// `ShedOldest` with the batch cap forced to zero and no byte cap has
    /// nothing to trigger on and must be rejected.
    #[tokio::test]
    async fn test_shed_oldest_requires_cap() {
        use crate::config::BackpressurePolicy;

        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(0);
        let rejection = builder
            .build()
            .await
            .expect_err("ShedOldest with no caps must be rejected");
        assert!(
            rejection.to_string().contains("requires at least one"),
            "{rejection}"
        );
    }

    /// `ShedOldest` drops data, so pairing it with exactly-once must fail
    /// even when a cap is configured.
    #[tokio::test]
    async fn test_shed_oldest_rejects_exactly_once() {
        use crate::config::BackpressurePolicy;
        use laminar_connectors::connector::DeliveryGuarantee;

        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(64)
            .delivery_guarantee(DeliveryGuarantee::ExactlyOnce);
        let rejection = builder
            .build()
            .await
            .expect_err("ShedOldest + ExactlyOnce must be rejected");
        assert!(rejection.to_string().contains("exactly-once"), "{rejection}");
    }

    /// `ShedOldest` with a non-zero batch cap is a valid configuration.
    #[tokio::test]
    async fn test_valid_shed_oldest_builds() {
        use crate::config::BackpressurePolicy;

        let builder = LaminarDbBuilder::new()
            .pipeline_backpressure_policy(BackpressurePolicy::ShedOldest)
            .pipeline_max_input_buf_batches(64);
        let built = builder.build().await.unwrap();
        assert!(!built.is_closed());
    }

    /// Config variables do not get in the way of building.
    #[tokio::test]
    async fn test_builder_with_config_vars() {
        let vars = [
            ("KAFKA_BROKERS", "localhost:9092"),
            ("GROUP_ID", "test-group"),
        ];
        let builder = vars
            .into_iter()
            .fold(LaminarDbBuilder::new(), |acc, (key, value)| {
                acc.config_var(key, value)
            });
        let built = builder.build().await.unwrap();
        assert!(!built.is_closed());
    }

    /// A custom buffer size is accepted.
    #[tokio::test]
    async fn test_builder_with_options() {
        let builder = LaminarDbBuilder::new().buffer_size(131_072);
        let built = builder.build().await.unwrap();
        assert!(!built.is_closed());
    }

    /// `LaminarDB::builder()` is a working entry point to the builder.
    #[tokio::test]
    async fn test_builder_from_laminardb() {
        let built = LaminarDB::builder().build().await.unwrap();
        assert!(!built.is_closed());
    }

    /// The `Debug` output names the type and reports the config-var count.
    #[test]
    fn test_builder_debug() {
        let builder = LaminarDbBuilder::new().config_var("K", "V");
        let rendered = format!("{builder:?}");
        assert!(rendered.contains("LaminarDbBuilder"));
        assert!(rendered.contains("config_vars_count: 1"));
    }

    /// A UDF registered through the builder is callable from SQL.
    #[tokio::test]
    async fn test_builder_register_udf() {
        use std::any::Any;
        use std::hash::{Hash, Hasher};

        use arrow::datatypes::DataType;
        use datafusion_expr::{
            ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
        };

        /// Zero-argument UDF that always yields the Int64 scalar 42.
        #[derive(Debug)]
        struct FortyTwo {
            signature: Signature,
        }

        // Every instance is interchangeable, so equality and hashing ignore
        // the signature field.
        impl PartialEq for FortyTwo {
            fn eq(&self, _: &Self) -> bool {
                true
            }
        }

        impl Eq for FortyTwo {}

        impl Hash for FortyTwo {
            fn hash<H: Hasher>(&self, state: &mut H) {
                "forty_two".hash(state);
            }
        }

        impl ScalarUDFImpl for FortyTwo {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &'static str {
                "forty_two"
            }

            fn signature(&self) -> &Signature {
                &self.signature
            }

            fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
                Ok(DataType::Int64)
            }

            fn invoke_with_args(
                &self,
                _args: ScalarFunctionArgs,
            ) -> datafusion_common::Result<ColumnarValue> {
                let answer = datafusion_common::ScalarValue::Int64(Some(42));
                Ok(ColumnarValue::Scalar(answer))
            }
        }

        let forty_two = FortyTwo {
            signature: Signature::new(TypeSignature::Nullary, Volatility::Immutable),
        };
        let built = LaminarDB::builder()
            .register_udf(ScalarUDF::new_from_impl(forty_two))
            .build()
            .await
            .unwrap();

        let result = built.execute("SELECT forty_two()").await;
        assert!(result.is_ok(), "UDF should be callable: {result:?}");
    }
}