laminar_db/pipeline/config.rs
1//! Pipeline configuration.
2
3use std::time::Duration;
4
5use laminar_connectors::connector::DeliveryGuarantee;
6
7/// Configuration for the event-driven connector pipeline.
8#[derive(Debug, Clone)]
9pub struct PipelineConfig {
10 /// Maximum records per `poll_batch()` call.
11 pub max_poll_records: usize,
12
13 /// Channel capacity for per-source `mpsc` sender → coordinator.
14 pub channel_capacity: usize,
15
16 /// Fallback poll interval when a source returns `data_ready_notify() == None`.
17 pub fallback_poll_interval: Duration,
18
19 /// Checkpoint interval (if checkpointing is enabled).
20 ///
21 /// This controls the **timer-based** checkpoint mode (at-least-once).
22 /// For exactly-once semantics, use barrier-aligned checkpoints via
23 /// [`PipelineCallback::checkpoint_with_barrier`](super::callback::PipelineCallback::checkpoint_with_barrier) instead. Timer-based
24 /// checkpoints capture offsets *before* operator state, which means
25 /// on recovery the consumer replays from the offset and operators may
26 /// re-process some records (at-least-once, not exactly-once).
27 pub checkpoint_interval: Option<Duration>,
28
29 /// Coordinator micro-batch window.
30 ///
31 /// After receiving the first event in a cycle, the coordinator sleeps
32 /// for this duration to let more events accumulate in the channel
33 /// before executing the SQL cycle. This bounds the number of SQL
34 /// executions per second while preserving low latency.
35 ///
36 /// During this window the coordinator does **not** drain the channel,
37 /// so the bounded channel provides natural backpressure to source
38 /// tasks (they block on `tx.send().await` when the channel fills).
39 ///
40 /// Set to `Duration::ZERO` to execute immediately (no batching).
41 pub batch_window: Duration,
42
43 /// Maximum time to wait for all sources to align on a checkpoint
44 /// barrier before cancelling the checkpoint.
45 pub barrier_alignment_timeout: Duration,
46
47 /// End-to-end delivery guarantee for the pipeline.
48 ///
49 /// Validated at startup: `ExactlyOnce` requires all sources to support
50 /// replay, all sinks to support exactly-once, and checkpointing to be
51 /// enabled. See [`DeliveryGuarantee`] for details.
52 pub delivery_guarantee: DeliveryGuarantee,
53}
54
55impl Default for PipelineConfig {
56 fn default() -> Self {
57 Self {
58 max_poll_records: 1024,
59 channel_capacity: 64,
60 fallback_poll_interval: Duration::from_millis(10),
61 checkpoint_interval: None,
62 batch_window: Duration::from_millis(5),
63 barrier_alignment_timeout: Duration::from_secs(30),
64 delivery_guarantee: DeliveryGuarantee::default(),
65 }
66 }
67}