Skip to main content

laminar_db/pipeline/
config.rs

1//! Pipeline configuration.
2
3use std::time::Duration;
4
5use laminar_connectors::connector::DeliveryGuarantee;
6
7use crate::config::BackpressurePolicy;
8
9/// Configuration for the event-driven connector pipeline.
10#[derive(Debug, Clone)]
11pub struct PipelineConfig {
12    /// Maximum records per `poll_batch()` call.
13    pub max_poll_records: usize,
14
15    /// Channel capacity for per-source `mpsc` sender → coordinator.
16    pub channel_capacity: usize,
17
18    /// Fallback poll interval when a source returns `data_ready_notify() == None`.
19    pub fallback_poll_interval: Duration,
20
21    /// Checkpoint interval (if checkpointing is enabled).
22    ///
23    /// This controls the **timer-based** checkpoint mode (at-least-once).
24    /// For exactly-once semantics, use barrier-aligned checkpoints via
25    /// [`PipelineCallback::checkpoint_with_barrier`](super::callback::PipelineCallback::checkpoint_with_barrier) instead. Timer-based
26    /// checkpoints capture offsets *before* operator state, which means
27    /// on recovery the consumer replays from the offset and operators may
28    /// re-process some records (at-least-once, not exactly-once).
29    pub checkpoint_interval: Option<Duration>,
30
31    /// Coordinator micro-batch window.
32    ///
33    /// After receiving the first event in a cycle, the coordinator sleeps
34    /// for this duration to let more events accumulate in the channel
35    /// before executing the SQL cycle. This bounds the number of SQL
36    /// executions per second while preserving low latency.
37    ///
38    /// During this window the coordinator does **not** drain the channel,
39    /// so the bounded channel provides natural backpressure to source
40    /// tasks (they block on `tx.send().await` when the channel fills).
41    ///
42    /// Set to `Duration::ZERO` to execute immediately (no batching).
43    pub batch_window: Duration,
44
45    /// Maximum time to wait for all sources to align on a checkpoint
46    /// barrier before cancelling the checkpoint.
47    pub barrier_alignment_timeout: Duration,
48
49    /// End-to-end delivery guarantee for the pipeline.
50    ///
51    /// Validated at startup: `ExactlyOnce` requires all sources to support
52    /// replay, all sinks to support exactly-once, and checkpointing to be
53    /// enabled. See [`DeliveryGuarantee`] for details.
54    pub delivery_guarantee: DeliveryGuarantee,
55
56    /// Maximum wall-clock time for a single processing cycle (nanoseconds).
57    /// Logged at DEBUG when exceeded. Default: 10ms.
58    pub cycle_budget_ns: u64,
59
60    /// Maximum wall-clock time for the message drain phase (nanoseconds).
61    /// The drain loop terminates early when this budget is exhausted.
62    /// Default: 1ms.
63    pub drain_budget_ns: u64,
64
65    /// Maximum wall-clock time for per-query execution within a cycle
66    /// (nanoseconds). When elapsed time exceeds this budget, remaining
67    /// queries are deferred to the next cycle. At least one query always
68    /// executes for forward progress. Default: 8ms.
69    pub query_budget_ns: u64,
70
71    /// Maximum wall-clock time for background work (barriers, checkpoint,
72    /// table polling) after SQL execution (nanoseconds). When exceeded,
73    /// low-priority tasks (table polling) are skipped. Default: 5ms.
74    pub background_budget_ns: u64,
75
76    /// Per-input-port batch cap. Default: 256.
77    pub max_input_buf_batches: usize,
78
79    /// Per-input-port byte cap. `None` = disabled.
80    pub max_input_buf_bytes: Option<usize>,
81
82    /// What to do when either cap is exceeded.
83    pub backpressure_policy: BackpressurePolicy,
84}
85
86impl Default for PipelineConfig {
87    fn default() -> Self {
88        Self {
89            max_poll_records: 1024,
90            channel_capacity: 64,
91            fallback_poll_interval: Duration::from_millis(10),
92            checkpoint_interval: None,
93            batch_window: Duration::from_millis(5),
94            barrier_alignment_timeout: Duration::from_secs(30),
95            delivery_guarantee: DeliveryGuarantee::default(),
96            cycle_budget_ns: 10_000_000,     // 10ms
97            drain_budget_ns: 1_000_000,      // 1ms
98            query_budget_ns: 8_000_000,      // 8ms
99            background_budget_ns: 5_000_000, // 5ms
100            max_input_buf_batches: 256,
101            max_input_buf_bytes: None,
102            backpressure_policy: BackpressurePolicy::default(),
103        }
104    }
105}