Skip to main content

laminar_db/pipeline/
config.rs

1//! Pipeline configuration.
2
3use std::time::Duration;
4
5use laminar_connectors::connector::DeliveryGuarantee;
6
7/// Configuration for the event-driven connector pipeline.
8#[derive(Debug, Clone)]
9pub struct PipelineConfig {
10    /// Maximum records per `poll_batch()` call.
11    pub max_poll_records: usize,
12
13    /// Channel capacity for per-source `mpsc` sender → coordinator.
14    pub channel_capacity: usize,
15
16    /// Fallback poll interval when a source returns `data_ready_notify() == None`.
17    pub fallback_poll_interval: Duration,
18
19    /// Checkpoint interval (if checkpointing is enabled).
20    ///
21    /// This controls the **timer-based** checkpoint mode (at-least-once).
22    /// For exactly-once semantics, use barrier-aligned checkpoints via
23    /// [`PipelineCallback::checkpoint_with_barrier`](super::callback::PipelineCallback::checkpoint_with_barrier) instead. Timer-based
24    /// checkpoints capture offsets *before* operator state, which means
25    /// on recovery the consumer replays from the offset and operators may
26    /// re-process some records (at-least-once, not exactly-once).
27    pub checkpoint_interval: Option<Duration>,
28
29    /// Coordinator micro-batch window.
30    ///
31    /// After receiving the first event in a cycle, the coordinator sleeps
32    /// for this duration to let more events accumulate in the channel
33    /// before executing the SQL cycle. This bounds the number of SQL
34    /// executions per second while preserving low latency.
35    ///
36    /// During this window the coordinator does **not** drain the channel,
37    /// so the bounded channel provides natural backpressure to source
38    /// tasks (they block on `tx.send().await` when the channel fills).
39    ///
40    /// Set to `Duration::ZERO` to execute immediately (no batching).
41    pub batch_window: Duration,
42
43    /// Maximum time to wait for all sources to align on a checkpoint
44    /// barrier before cancelling the checkpoint.
45    pub barrier_alignment_timeout: Duration,
46
47    /// End-to-end delivery guarantee for the pipeline.
48    ///
49    /// Validated at startup: `ExactlyOnce` requires all sources to support
50    /// replay, all sinks to support exactly-once, and checkpointing to be
51    /// enabled. See [`DeliveryGuarantee`] for details.
52    pub delivery_guarantee: DeliveryGuarantee,
53}
54
55impl Default for PipelineConfig {
56    fn default() -> Self {
57        Self {
58            max_poll_records: 1024,
59            channel_capacity: 64,
60            fallback_poll_interval: Duration::from_millis(10),
61            checkpoint_interval: None,
62            batch_window: Duration::from_millis(5),
63            barrier_alignment_timeout: Duration::from_secs(30),
64            delivery_guarantee: DeliveryGuarantee::default(),
65        }
66    }
67}