laminar_db/pipeline/config.rs
1//! Pipeline configuration.
2
3use std::time::Duration;
4
5use laminar_connectors::connector::DeliveryGuarantee;
6
7use crate::config::BackpressurePolicy;
8
9/// Configuration for the event-driven connector pipeline.
10#[derive(Debug, Clone)]
11pub struct PipelineConfig {
12 /// Maximum records per `poll_batch()` call.
13 pub max_poll_records: usize,
14
15 /// Channel capacity for per-source `mpsc` sender → coordinator.
16 pub channel_capacity: usize,
17
18 /// Fallback poll interval when a source returns `data_ready_notify() == None`.
19 pub fallback_poll_interval: Duration,
20
21 /// Checkpoint interval (if checkpointing is enabled).
22 ///
23 /// This controls the **timer-based** checkpoint mode (at-least-once).
24 /// For exactly-once semantics, use barrier-aligned checkpoints via
25 /// [`PipelineCallback::checkpoint_with_barrier`](super::callback::PipelineCallback::checkpoint_with_barrier) instead. Timer-based
26 /// checkpoints capture offsets *before* operator state, which means
27 /// on recovery the consumer replays from the offset and operators may
28 /// re-process some records (at-least-once, not exactly-once).
29 pub checkpoint_interval: Option<Duration>,
30
31 /// Coordinator micro-batch window.
32 ///
33 /// After receiving the first event in a cycle, the coordinator sleeps
34 /// for this duration to let more events accumulate in the channel
35 /// before executing the SQL cycle. This bounds the number of SQL
36 /// executions per second while preserving low latency.
37 ///
38 /// During this window the coordinator does **not** drain the channel,
39 /// so the bounded channel provides natural backpressure to source
40 /// tasks (they block on `tx.send().await` when the channel fills).
41 ///
42 /// Set to `Duration::ZERO` to execute immediately (no batching).
43 pub batch_window: Duration,
44
45 /// Maximum time to wait for all sources to align on a checkpoint
46 /// barrier before cancelling the checkpoint.
47 pub barrier_alignment_timeout: Duration,
48
49 /// End-to-end delivery guarantee for the pipeline.
50 ///
51 /// Validated at startup: `ExactlyOnce` requires all sources to support
52 /// replay, all sinks to support exactly-once, and checkpointing to be
53 /// enabled. See [`DeliveryGuarantee`] for details.
54 pub delivery_guarantee: DeliveryGuarantee,
55
56 /// Maximum wall-clock time for a single processing cycle (nanoseconds).
57 /// Logged at DEBUG when exceeded. Default: 10ms.
58 pub cycle_budget_ns: u64,
59
60 /// Maximum wall-clock time for the message drain phase (nanoseconds).
61 /// The drain loop terminates early when this budget is exhausted.
62 /// Default: 1ms.
63 pub drain_budget_ns: u64,
64
65 /// Maximum wall-clock time for per-query execution within a cycle
66 /// (nanoseconds). When elapsed time exceeds this budget, remaining
67 /// queries are deferred to the next cycle. At least one query always
68 /// executes for forward progress. Default: 8ms.
69 pub query_budget_ns: u64,
70
71 /// Maximum wall-clock time for background work (barriers, checkpoint,
72 /// table polling) after SQL execution (nanoseconds). When exceeded,
73 /// low-priority tasks (table polling) are skipped. Default: 5ms.
74 pub background_budget_ns: u64,
75
76 /// Per-input-port batch cap. Default: 256.
77 pub max_input_buf_batches: usize,
78
79 /// Per-input-port byte cap. `None` = disabled.
80 pub max_input_buf_bytes: Option<usize>,
81
82 /// What to do when either cap is exceeded.
83 pub backpressure_policy: BackpressurePolicy,
84}
85
86impl Default for PipelineConfig {
87 fn default() -> Self {
88 Self {
89 max_poll_records: 1024,
90 channel_capacity: 64,
91 fallback_poll_interval: Duration::from_millis(10),
92 checkpoint_interval: None,
93 batch_window: Duration::from_millis(5),
94 barrier_alignment_timeout: Duration::from_secs(30),
95 delivery_guarantee: DeliveryGuarantee::default(),
96 cycle_budget_ns: 10_000_000, // 10ms
97 drain_budget_ns: 1_000_000, // 1ms
98 query_budget_ns: 8_000_000, // 8ms
99 background_budget_ns: 5_000_000, // 5ms
100 max_input_buf_batches: 256,
101 max_input_buf_bytes: None,
102 backpressure_policy: BackpressurePolicy::default(),
103 }
104 }
105}