laminar_core/dag/checkpoint.rs
1//! Chandy-Lamport barrier checkpointing for DAG pipelines.
2//!
3//! **Note:** This module is used by the DAG execution model only. The TPC
4//! pipeline path uses [`CheckpointCoordinator`](crate::checkpoint) and
5//! barrier injection via [`CheckpointBarrierInjector`](crate::checkpoint::CheckpointBarrierInjector).
6//! Both models share the same barrier types from [`crate::checkpoint::barrier`].
7//!
8//! This module implements barrier-based checkpointing:
9//!
10//! - [`CheckpointBarrier`] — marker injected at source nodes
11//! - [`BarrierAligner`] — buffers events at fan-in (MPSC) nodes until all
12//! upstream inputs have delivered their barrier
13//! - [`DagCheckpointCoordinator`] — Ring 1 orchestrator that triggers
14//! checkpoints, tracks progress, and produces snapshots
15//! - [`DagCheckpointConfig`] — tuning knobs (interval, timeout, retention)
16//!
17//! Barriers do NOT flow through event queues — they are handled by a separate
18//! orchestration path, keeping the hot-path [`Event`]
19//! type unchanged.
20
21use std::collections::VecDeque;
22use std::time::Duration;
23
24use rustc_hash::FxHashMap;
25
26use crate::operator::{Event, OperatorState};
27
28use super::error::DagError;
29use super::recovery::DagCheckpointSnapshot;
30use super::topology::NodeId;
31
/// Numeric identifier of a single checkpoint.
///
/// A plain alias for `u64`, so identifiers are cheap to copy and compare.
pub type CheckpointId = u64;
34
/// Strategy a node uses to line up checkpoint barriers arriving on its inputs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BarrierType {
    /// The node snapshots only after every input has delivered the barrier;
    /// events arriving on inputs that already delivered it are held back in
    /// a buffer until alignment is reached.
    Aligned,
}
42
43/// A checkpoint barrier injected at source nodes and propagated through the DAG.
44#[derive(Debug, Clone)]
45pub struct CheckpointBarrier {
46 /// Unique identifier for this checkpoint.
47 pub checkpoint_id: CheckpointId,
48 /// Monotonically increasing epoch counter.
49 pub epoch: u64,
50 /// Timestamp when the barrier was created (event-time millis).
51 pub timestamp: i64,
52 /// Barrier alignment strategy.
53 pub barrier_type: BarrierType,
54}
55
56/// Configuration for DAG checkpointing.
57#[derive(Debug, Clone)]
58pub struct DagCheckpointConfig {
59 /// How often to trigger checkpoints.
60 pub interval: Duration,
61 /// Barrier alignment strategy.
62 pub barrier_type: BarrierType,
63 /// Maximum time to wait for barrier alignment at fan-in nodes.
64 pub alignment_timeout: Duration,
65 /// Whether to use incremental checkpoints (future use).
66 pub incremental: bool,
67 /// Maximum number of concurrent in-flight checkpoints.
68 pub max_concurrent: usize,
69 /// Maximum number of completed snapshots to retain.
70 pub max_retained: usize,
71}
72
73impl Default for DagCheckpointConfig {
74 fn default() -> Self {
75 Self {
76 interval: Duration::from_secs(60),
77 barrier_type: BarrierType::Aligned,
78 alignment_timeout: Duration::from_secs(10),
79 incremental: false,
80 max_concurrent: 1,
81 max_retained: 3,
82 }
83 }
84}
85
86// ---------------------------------------------------------------------------
87// BarrierAligner
88// ---------------------------------------------------------------------------
89
90/// Result of presenting a barrier to the aligner.
91#[derive(Debug)]
92pub enum AlignmentResult {
93 /// Not all inputs have delivered their barrier yet.
94 Pending,
95 /// All inputs have delivered their barrier; checkpoint can proceed.
96 Aligned {
97 /// The checkpoint barrier (from the final input).
98 barrier: CheckpointBarrier,
99 /// Events that were buffered from already-aligned inputs.
100 buffered_events: Vec<Event>,
101 },
102}
103
104/// Buffers events at fan-in (MPSC) nodes until all upstream inputs
105/// have delivered their checkpoint barrier.
106///
107/// For nodes with a single input (`expected_inputs == 1`), alignment
108/// is immediate — no buffering occurs.
109#[derive(Debug)]
110pub struct BarrierAligner {
111 /// Number of upstream inputs that must deliver a barrier.
112 expected_inputs: usize,
113 /// Barriers received so far, keyed by source node.
114 barriers_received: FxHashMap<NodeId, CheckpointBarrier>,
115 /// Events buffered from sources that have already delivered their barrier.
116 buffered_events: FxHashMap<NodeId, VecDeque<Event>>,
117 /// The checkpoint currently being aligned (if any).
118 current_checkpoint_id: Option<CheckpointId>,
119}
120
121impl BarrierAligner {
122 /// Creates a new barrier aligner for a node with `expected_inputs` upstream edges.
123 #[must_use]
124 pub fn new(expected_inputs: usize) -> Self {
125 Self {
126 expected_inputs,
127 barriers_received: FxHashMap::default(),
128 buffered_events: FxHashMap::default(),
129 current_checkpoint_id: None,
130 }
131 }
132
133 /// Processes a barrier arriving from `source_node`.
134 ///
135 /// Returns [`AlignmentResult::Pending`] until all inputs have reported,
136 /// then returns [`AlignmentResult::Aligned`] with the barrier and any
137 /// buffered events sorted by event timestamp.
138 ///
139 /// ## Cross-input ordering
140 ///
141 /// Buffered events from different upstream inputs are merged and sorted
142 /// by `event.timestamp` to ensure deterministic event-time ordering at
143 /// fan-in nodes. Operators that require event-time ordering can rely on
144 /// this guarantee for the post-alignment batch.
145 ///
146 /// # Panics
147 ///
148 /// Panics if the internal barrier map is empty after insertion (should
149 /// never happen).
150 pub fn on_barrier(
151 &mut self,
152 source_node: NodeId,
153 barrier: CheckpointBarrier,
154 ) -> AlignmentResult {
155 self.current_checkpoint_id = Some(barrier.checkpoint_id);
156 self.barriers_received.insert(source_node, barrier);
157
158 if self.barriers_received.len() >= self.expected_inputs {
159 // All inputs aligned — drain buffered events.
160 // Sort by event timestamp to ensure deterministic ordering
161 // regardless of FxHashMap iteration order.
162 let mut all_buffered = Vec::new();
163 for (_node, mut events) in self.buffered_events.drain() {
164 all_buffered.extend(events.drain(..));
165 }
166 all_buffered.sort_by_key(|e| e.timestamp);
167
168 // Take the last barrier received as the canonical one.
169 let barrier = self
170 .barriers_received
171 .values()
172 .last()
173 .cloned()
174 .expect("at least one barrier");
175
176 AlignmentResult::Aligned {
177 barrier,
178 buffered_events: all_buffered,
179 }
180 } else {
181 AlignmentResult::Pending
182 }
183 }
184
185 /// Buffers an event from `source_node` if that source has already
186 /// delivered its barrier for the current checkpoint.
187 ///
188 /// Returns `true` if the event was buffered (source already aligned),
189 /// `false` if the source has not yet delivered its barrier (event should
190 /// be processed normally).
191 pub fn buffer_if_aligned(&mut self, source_node: NodeId, event: Event) -> bool {
192 if self.barriers_received.contains_key(&source_node) {
193 self.buffered_events
194 .entry(source_node)
195 .or_default()
196 .push_back(event);
197 true
198 } else {
199 false
200 }
201 }
202
203 /// Returns whether `source_node` has already delivered its barrier.
204 #[must_use]
205 pub fn is_source_aligned(&self, source_node: NodeId) -> bool {
206 self.barriers_received.contains_key(&source_node)
207 }
208
209 /// Resets alignment state after a checkpoint completes.
210 pub fn complete_checkpoint(&mut self) {
211 self.barriers_received.clear();
212 self.buffered_events.clear();
213 self.current_checkpoint_id = None;
214 }
215
216 /// Returns how many barriers have been received so far.
217 #[must_use]
218 pub fn barriers_received_count(&self) -> usize {
219 self.barriers_received.len()
220 }
221
222 /// Returns the expected number of upstream inputs.
223 #[must_use]
224 pub fn expected_inputs(&self) -> usize {
225 self.expected_inputs
226 }
227}
228
229// ---------------------------------------------------------------------------
230// DagCheckpointCoordinator
231// ---------------------------------------------------------------------------
232
233/// Tracks progress of an in-flight checkpoint.
234struct CheckpointProgress {
235 /// Checkpoint identifier.
236 checkpoint_id: CheckpointId,
237 /// Epoch number.
238 epoch: u64,
239 /// Operator states reported by completed nodes.
240 completed_nodes: FxHashMap<NodeId, OperatorState>,
241 /// Nodes that have not yet reported.
242 pending_nodes: Vec<NodeId>,
243 /// When the checkpoint was triggered (event-time millis).
244 triggered_at: i64,
245}
246
247/// Ring 1 checkpoint coordinator.
248///
249/// Orchestrates the checkpoint lifecycle:
250/// 1. [`trigger_checkpoint()`](Self::trigger_checkpoint) — creates a barrier
251/// 2. Barrier propagates through the DAG (external to this struct)
252/// 3. [`on_node_snapshot_complete()`](Self::on_node_snapshot_complete) —
253/// each node reports its state
254/// 4. [`finalize_checkpoint()`](Self::finalize_checkpoint) — produces a
255/// [`DagCheckpointSnapshot`]
256pub struct DagCheckpointCoordinator {
257 /// Configuration.
258 config: DagCheckpointConfig,
259 /// Source node IDs (barrier injection points).
260 source_nodes: Vec<NodeId>,
261 /// All node IDs in the DAG.
262 all_nodes: Vec<NodeId>,
263 /// Next epoch counter.
264 next_epoch: u64,
265 /// Next checkpoint ID counter.
266 next_checkpoint_id: CheckpointId,
267 /// Currently in-flight checkpoint progress.
268 in_progress: Option<CheckpointProgress>,
269 /// Completed snapshots (bounded by `max_retained`).
270 completed_snapshots: Vec<DagCheckpointSnapshot>,
271 /// Maximum number of snapshots to retain.
272 max_retained: usize,
273}
274
275impl DagCheckpointCoordinator {
276 /// Creates a new checkpoint coordinator.
277 ///
278 /// # Arguments
279 ///
280 /// * `source_nodes` — DAG source node IDs (barrier injection points)
281 /// * `all_nodes` — all node IDs in the DAG
282 /// * `config` — checkpoint configuration
283 #[must_use]
284 pub fn new(
285 source_nodes: Vec<NodeId>,
286 all_nodes: Vec<NodeId>,
287 config: DagCheckpointConfig,
288 ) -> Self {
289 let max_retained = config.max_retained;
290 Self {
291 config,
292 source_nodes,
293 all_nodes,
294 next_epoch: 1,
295 next_checkpoint_id: 1,
296 in_progress: None,
297 completed_snapshots: Vec::new(),
298 max_retained,
299 }
300 }
301
302 /// Triggers a new checkpoint by creating a barrier.
303 ///
304 /// # Errors
305 ///
306 /// Returns [`DagError::CheckpointInProgress`] if a checkpoint is already
307 /// in flight.
308 pub fn trigger_checkpoint(&mut self) -> Result<CheckpointBarrier, DagError> {
309 if let Some(ref progress) = self.in_progress {
310 return Err(DagError::CheckpointInProgress(progress.epoch));
311 }
312
313 let checkpoint_id = self.next_checkpoint_id;
314 self.next_checkpoint_id += 1;
315 let epoch = self.next_epoch;
316 self.next_epoch += 1;
317
318 #[allow(clippy::cast_possible_truncation)]
319 // Timestamp ms fits i64 for ~292 years from epoch
320 let timestamp = std::time::SystemTime::now()
321 .duration_since(std::time::UNIX_EPOCH)
322 .map_or(0, |d| d.as_millis() as i64);
323
324 let barrier = CheckpointBarrier {
325 checkpoint_id,
326 epoch,
327 timestamp,
328 barrier_type: self.config.barrier_type,
329 };
330
331 self.in_progress = Some(CheckpointProgress {
332 checkpoint_id,
333 epoch,
334 completed_nodes: FxHashMap::default(),
335 pending_nodes: self.all_nodes.clone(),
336 triggered_at: barrier.timestamp,
337 });
338
339 Ok(barrier)
340 }
341
342 /// Records that a node has completed its snapshot.
343 ///
344 /// Returns `true` if all nodes have now reported (checkpoint is ready
345 /// to finalize).
346 pub fn on_node_snapshot_complete(&mut self, node_id: NodeId, state: OperatorState) -> bool {
347 if let Some(ref mut progress) = self.in_progress {
348 progress.pending_nodes.retain(|&n| n != node_id);
349 progress.completed_nodes.insert(node_id, state);
350 progress.pending_nodes.is_empty()
351 } else {
352 false
353 }
354 }
355
356 /// Finalizes the current checkpoint, producing a snapshot.
357 ///
358 /// # Errors
359 ///
360 /// Returns [`DagError::NoCheckpointInProgress`] if no checkpoint is active.
361 /// Returns [`DagError::CheckpointIncomplete`] if not all nodes have reported.
362 pub fn finalize_checkpoint(&mut self) -> Result<DagCheckpointSnapshot, DagError> {
363 let progress = self
364 .in_progress
365 .take()
366 .ok_or(DagError::NoCheckpointInProgress)?;
367
368 if !progress.pending_nodes.is_empty() {
369 let pending = progress.pending_nodes.len();
370 // Put progress back so it can continue.
371 self.in_progress = Some(progress);
372 return Err(DagError::CheckpointIncomplete { pending });
373 }
374
375 let snapshot = DagCheckpointSnapshot::from_operator_states(
376 progress.checkpoint_id,
377 progress.epoch,
378 progress.triggered_at,
379 &progress.completed_nodes,
380 );
381
382 self.completed_snapshots.push(snapshot.clone());
383
384 // Trim old snapshots.
385 while self.completed_snapshots.len() > self.max_retained {
386 self.completed_snapshots.remove(0);
387 }
388
389 Ok(snapshot)
390 }
391
392 /// Returns whether a checkpoint is currently in progress.
393 #[must_use]
394 pub fn is_checkpoint_in_progress(&self) -> bool {
395 self.in_progress.is_some()
396 }
397
398 /// Returns completed snapshots.
399 #[must_use]
400 pub fn completed_snapshots(&self) -> &[DagCheckpointSnapshot] {
401 &self.completed_snapshots
402 }
403
404 /// Returns the latest completed snapshot (if any).
405 #[must_use]
406 pub fn latest_snapshot(&self) -> Option<&DagCheckpointSnapshot> {
407 self.completed_snapshots.last()
408 }
409
410 /// Returns the current epoch counter (next epoch to be assigned).
411 #[must_use]
412 pub fn current_epoch(&self) -> u64 {
413 self.next_epoch
414 }
415
416 /// Returns the source node IDs.
417 #[must_use]
418 pub fn source_nodes(&self) -> &[NodeId] {
419 &self.source_nodes
420 }
421}
422
423impl std::fmt::Debug for DagCheckpointCoordinator {
424 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425 f.debug_struct("DagCheckpointCoordinator")
426 .field("next_epoch", &self.next_epoch)
427 .field("next_checkpoint_id", &self.next_checkpoint_id)
428 .field("in_progress", &self.in_progress.is_some())
429 .field("completed_count", &self.completed_snapshots.len())
430 .field("source_nodes", &self.source_nodes)
431 .finish_non_exhaustive()
432 }
433}