Skip to main content

laminar_db/
config.rs

1//! Configuration for `LaminarDB`.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use laminar_connectors::connector::DeliveryGuarantee;
8use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
9
/// Policy applied once an operator's input buffer grows past its cap.
///
/// The default is [`BackpressurePolicy::Backpressure`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum BackpressurePolicy {
    /// Hold the producer back: sources block on `send`, so nothing is lost.
    #[default]
    Backpressure,
    /// Discard the oldest batches; the drops are counted in
    /// `shed_records_total`.
    ShedOldest,
    /// Fail the cycle with an error.
    Fail,
}
21
/// Owned string holding a credential for [`LaminarConfig`].
///
/// Its `Debug` output is a fixed placeholder, so formatting a value that
/// contains a `SecretString` with `{:?}` never prints the secret itself.
#[derive(Clone)]
pub struct SecretString(String);

impl SecretString {
    /// Wrap `value` as a secret.
    pub fn new(value: impl Into<String>) -> Self {
        let secret: String = value.into();
        Self(secret)
    }

    /// Return the wrapped secret as a string slice.
    ///
    /// Call this only at the point where the secret is actually needed.
    #[must_use]
    pub fn expose(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Debug for SecretString {
    /// Always writes the quoted placeholder; the wrapped value is never read.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("\"[REDACTED]\"")
    }
}
45
46/// Configuration for a `LaminarDB` instance.
47#[derive(Debug, Clone)]
48pub struct LaminarConfig {
49    /// Streaming channel buffer size.
50    pub default_buffer_size: usize,
51    /// Backpressure strategy.
52    pub default_backpressure: BackpressureStrategy,
53    /// Checkpoint directory. `None` = in-memory only.
54    pub storage_dir: Option<PathBuf>,
55    /// Checkpoint config. `None` = disabled.
56    pub checkpoint: Option<StreamCheckpointConfig>,
57    /// Cloud checkpoint URL, e.g. `s3://bucket/prefix`.
58    pub object_store_url: Option<String>,
59    /// Credential/config overrides for the object store.
60    pub object_store_options: HashMap<String, String>,
61    /// Bearer token presented when forwarding requests to the cluster leader's
62    /// HTTP API (set when the server gates `/api/v1` with `console_token`).
63    pub http_auth_token: Option<SecretString>,
64    /// Delivery guarantee.
65    pub delivery_guarantee: DeliveryGuarantee,
66    /// Per-operator state limit. At 80% warns, at 100% errors. `None` = unlimited.
67    pub max_state_bytes_per_operator: Option<usize>,
68
69    /// Source-to-coordinator channel capacity. `None` = 64.
70    pub pipeline_channel_capacity: Option<usize>,
71    /// Micro-batch coalescing window. `None` = 5ms connectors / 0 embedded.
72    pub pipeline_batch_window: Option<std::time::Duration>,
73    /// Drain budget per cycle (ns). `None` = 1ms.
74    pub pipeline_drain_budget_ns: Option<u64>,
75    /// Per-query budget (ns). `None` = 8ms.
76    pub pipeline_query_budget_ns: Option<u64>,
77    /// Per-port operator input-buffer cap (batches). `None` = 256.
78    pub pipeline_max_input_buf_batches: Option<usize>,
79    /// Per-port operator input-buffer cap (bytes). `None` = disabled.
80    pub pipeline_max_input_buf_bytes: Option<usize>,
81    /// Backpressure policy. See [`BackpressurePolicy`].
82    pub pipeline_backpressure_policy: BackpressurePolicy,
83}
84
85impl Default for LaminarConfig {
86    fn default() -> Self {
87        Self {
88            default_buffer_size: 65536,
89            default_backpressure: BackpressureStrategy::Block,
90            storage_dir: None,
91            checkpoint: None,
92            object_store_url: None,
93            object_store_options: HashMap::new(),
94            http_auth_token: None,
95            delivery_guarantee: DeliveryGuarantee::default(),
96            max_state_bytes_per_operator: None,
97            pipeline_channel_capacity: None,
98            pipeline_batch_window: None,
99            pipeline_drain_budget_ns: None,
100            pipeline_query_budget_ns: None,
101            pipeline_max_input_buf_batches: None,
102            pipeline_max_input_buf_bytes: None,
103            pipeline_backpressure_policy: BackpressurePolicy::default(),
104        }
105    }
106}