Skip to main content

laminar_db/
config.rs

1//! Configuration for `LaminarDB`.
2#![allow(clippy::disallowed_types)] // cold path
3
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use laminar_connectors::connector::DeliveryGuarantee;
8use laminar_core::streaming::{BackpressureStrategy, StreamCheckpointConfig};
9
/// Action taken once an operator's input buffer grows past its cap.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum BackpressurePolicy {
    /// Hold the producer back: sources block on `send`, so no data is lost.
    ///
    /// This is the default policy.
    #[default]
    Backpressure,
    /// Discard the oldest batches; the drops are counted in `shed_records_total`.
    ShedOldest,
    /// Fail the cycle with an error.
    Fail,
}
21
/// Matching rule for unquoted SQL identifiers against Arrow schema field names.
///
/// The default is `CaseSensitive`, which lets mixed-case columns coming from
/// external sources (Kafka, CDC, WebSocket) be used without double-quoting.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum IdentifierCaseSensitivity {
    /// Keep identifiers exactly as written (the default).
    #[default]
    CaseSensitive,
    /// Fold unquoted identifiers to lowercase, as standard SQL does.
    Lowercase,
}
34
/// Tiering of checkpoints across S3 storage classes, used to optimize cost.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TieringConfig {
    /// Hot-tier storage class, e.g. `"EXPRESS_ONE_ZONE"` or `"STANDARD"`.
    pub hot_class: String,
    /// Warm-tier storage class, e.g. `"STANDARD"`.
    pub warm_class: String,
    /// Cold-tier storage class, e.g. `"GLACIER_IR"`. An empty string means
    /// there is no cold tier.
    pub cold_class: String,
    /// Number of seconds before the hot-to-warm transition.
    pub hot_retention_secs: u64,
    /// Number of seconds before the warm-to-cold transition. `0` means there
    /// is no cold tier.
    pub warm_retention_secs: u64,
}

impl Default for TieringConfig {
    /// Returns a configuration with `"STANDARD"` for both the hot and warm
    /// classes, an empty cold class, 24 hours of hot retention and 7 days of
    /// warm retention.
    fn default() -> Self {
        // Retention periods are expressed in whole days below.
        const SECS_PER_DAY: u64 = 24 * 60 * 60;
        let standard = "STANDARD";

        Self {
            hot_class: String::from(standard),
            warm_class: String::from(standard),
            cold_class: String::new(),
            hot_retention_secs: SECS_PER_DAY,      // 24h
            warm_retention_secs: 7 * SECS_PER_DAY, // 7d
        }
    }
}
61
62/// Configuration for a `LaminarDB` instance.
63#[derive(Debug, Clone)]
64pub struct LaminarConfig {
65    /// Streaming channel buffer size.
66    pub default_buffer_size: usize,
67    /// Backpressure strategy.
68    pub default_backpressure: BackpressureStrategy,
69    /// Checkpoint directory. `None` = in-memory only.
70    pub storage_dir: Option<PathBuf>,
71    /// Checkpoint config. `None` = disabled.
72    pub checkpoint: Option<StreamCheckpointConfig>,
73    /// Identifier case sensitivity.
74    pub identifier_case: IdentifierCaseSensitivity,
75    /// Cloud checkpoint URL, e.g. `s3://bucket/prefix`.
76    pub object_store_url: Option<String>,
77    /// Credential/config overrides for the object store.
78    pub object_store_options: HashMap<String, String>,
79    /// S3 tiering. `None` = default STANDARD.
80    pub tiering: Option<TieringConfig>,
81    /// Delivery guarantee.
82    pub delivery_guarantee: DeliveryGuarantee,
83    /// Per-operator state limit. At 80% warns, at 100% errors. `None` = unlimited.
84    pub max_state_bytes_per_operator: Option<usize>,
85
86    /// Source-to-coordinator channel capacity. `None` = 64.
87    pub pipeline_channel_capacity: Option<usize>,
88    /// Micro-batch coalescing window. `None` = 5ms connectors / 0 embedded.
89    pub pipeline_batch_window: Option<std::time::Duration>,
90    /// Drain budget per cycle (ns). `None` = 1ms.
91    pub pipeline_drain_budget_ns: Option<u64>,
92    /// Per-query budget (ns). `None` = 8ms.
93    pub pipeline_query_budget_ns: Option<u64>,
94    /// Per-port operator input-buffer cap (batches). `None` = 256.
95    pub pipeline_max_input_buf_batches: Option<usize>,
96    /// Per-port operator input-buffer cap (bytes). `None` = disabled.
97    pub pipeline_max_input_buf_bytes: Option<usize>,
98    /// Backpressure policy. See [`BackpressurePolicy`].
99    pub pipeline_backpressure_policy: BackpressurePolicy,
100}
101
102impl Default for LaminarConfig {
103    fn default() -> Self {
104        Self {
105            default_buffer_size: 65536,
106            default_backpressure: BackpressureStrategy::Block,
107            storage_dir: None,
108            checkpoint: None,
109            identifier_case: IdentifierCaseSensitivity::default(),
110            object_store_url: None,
111            object_store_options: HashMap::new(),
112            tiering: None,
113            delivery_guarantee: DeliveryGuarantee::default(),
114            max_state_bytes_per_operator: None,
115            pipeline_channel_capacity: None,
116            pipeline_batch_window: None,
117            pipeline_drain_budget_ns: None,
118            pipeline_query_budget_ns: None,
119            pipeline_max_input_buf_batches: None,
120            pipeline_max_input_buf_bytes: None,
121            pipeline_backpressure_policy: BackpressurePolicy::default(),
122        }
123    }
124}