Skip to main content

laminar_core/dag/
checkpoint.rs

1//! Chandy-Lamport barrier checkpointing for DAG pipelines.
2//!
3//! **Note:** This module is used by the DAG execution model only. The TPC
4//! pipeline path uses [`CheckpointCoordinator`](crate::checkpoint) and
5//! barrier injection via [`CheckpointBarrierInjector`](crate::checkpoint::CheckpointBarrierInjector).
6//! Both models share the same barrier types from [`crate::checkpoint::barrier`].
7//!
8//! This module implements barrier-based checkpointing:
9//!
10//! - [`CheckpointBarrier`] — marker injected at source nodes
11//! - [`BarrierAligner`] — buffers events at fan-in (MPSC) nodes until all
12//!   upstream inputs have delivered their barrier
13//! - [`DagCheckpointCoordinator`] — Ring 1 orchestrator that triggers
14//!   checkpoints, tracks progress, and produces snapshots
15//! - [`DagCheckpointConfig`] — tuning knobs (interval, timeout, retention)
16//!
17//! Barriers do NOT flow through event queues — they are handled by a separate
18//! orchestration path, keeping the hot-path [`Event`]
19//! type unchanged.
20
21use std::collections::VecDeque;
22use std::time::Duration;
23
24use rustc_hash::FxHashMap;
25
26use crate::operator::{Event, OperatorState};
27
28use super::error::DagError;
29use super::recovery::DagCheckpointSnapshot;
30use super::topology::NodeId;
31
/// Checkpoint identifier.
///
/// A plain `u64`. [`DagCheckpointCoordinator`] assigns these sequentially,
/// starting at 1, one per successfully triggered checkpoint.
pub type CheckpointId = u64;
34
/// Barrier type for checkpoint coordination.
///
/// [`Aligned`](Self::Aligned) is currently the only strategy. The value is
/// carried on every [`CheckpointBarrier`] and is taken from
/// [`DagCheckpointConfig::barrier_type`] when the coordinator creates a
/// barrier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BarrierType {
    /// All inputs must deliver the barrier before the node snapshots.
    /// Events from already-aligned inputs are buffered until alignment
    /// (see [`BarrierAligner`]).
    Aligned,
}
42
/// A checkpoint barrier injected at source nodes and propagated through the DAG.
///
/// Within this module, barriers are created by
/// [`DagCheckpointCoordinator::trigger_checkpoint`]; delivery to nodes is
/// handled outside this type.
#[derive(Debug, Clone)]
pub struct CheckpointBarrier {
    /// Unique identifier for this checkpoint.
    pub checkpoint_id: CheckpointId,
    /// Monotonically increasing epoch counter.
    pub epoch: u64,
    /// Timestamp when the barrier was created, in milliseconds.
    ///
    /// The coordinator in this module fills this with wall-clock time
    /// (milliseconds since the Unix epoch, or 0 if the system clock reads
    /// earlier than the epoch) rather than stream event time.
    pub timestamp: i64,
    /// Barrier alignment strategy.
    pub barrier_type: BarrierType,
}
55
/// Configuration for DAG checkpointing.
///
/// In the code visible in this module, [`DagCheckpointCoordinator`] reads
/// only `barrier_type` and `max_retained`. The remaining fields are
/// presumably consumed by whatever schedules and drives checkpoints —
/// verify against the caller.
#[derive(Debug, Clone)]
pub struct DagCheckpointConfig {
    /// How often to trigger checkpoints.
    pub interval: Duration,
    /// Barrier alignment strategy; copied onto every barrier the
    /// coordinator creates.
    pub barrier_type: BarrierType,
    /// Maximum time to wait for barrier alignment at fan-in nodes.
    pub alignment_timeout: Duration,
    /// Whether to use incremental checkpoints (future use).
    pub incremental: bool,
    /// Maximum number of concurrent in-flight checkpoints.
    ///
    /// NOTE(review): the coordinator in this module tracks at most one
    /// in-flight checkpoint regardless of this value — confirm whether a
    /// larger setting is honored elsewhere.
    pub max_concurrent: usize,
    /// Maximum number of completed snapshots to retain. Older snapshots are
    /// discarded when a checkpoint is finalized.
    pub max_retained: usize,
}
72
73impl Default for DagCheckpointConfig {
74    fn default() -> Self {
75        Self {
76            interval: Duration::from_secs(60),
77            barrier_type: BarrierType::Aligned,
78            alignment_timeout: Duration::from_secs(10),
79            incremental: false,
80            max_concurrent: 1,
81            max_retained: 3,
82        }
83    }
84}
85
86// ---------------------------------------------------------------------------
87// BarrierAligner
88// ---------------------------------------------------------------------------
89
/// Result of presenting a barrier to the aligner.
///
/// Returned by [`BarrierAligner::on_barrier`].
#[derive(Debug)]
pub enum AlignmentResult {
    /// Not all inputs have delivered their barrier yet.
    Pending,
    /// All inputs have delivered their barrier; checkpoint can proceed.
    Aligned {
        /// One of the barriers delivered by the upstream inputs for this
        /// alignment.
        barrier: CheckpointBarrier,
        /// Events that were buffered from already-aligned inputs, merged
        /// across inputs and sorted by event timestamp.
        buffered_events: Vec<Event>,
    },
}
103
/// Buffers events at fan-in (MPSC) nodes until all upstream inputs
/// have delivered their checkpoint barrier.
///
/// For nodes with a single input (`expected_inputs == 1`), alignment
/// is immediate — no buffering occurs.
///
/// Received barriers stay recorded until
/// [`complete_checkpoint`](Self::complete_checkpoint) is called.
#[derive(Debug)]
pub struct BarrierAligner {
    /// Number of upstream inputs that must deliver a barrier.
    expected_inputs: usize,
    /// Barriers received so far, keyed by source node. A repeated barrier
    /// from the same source replaces the earlier entry.
    barriers_received: FxHashMap<NodeId, CheckpointBarrier>,
    /// Events buffered from sources that have already delivered their barrier,
    /// kept per source in arrival order.
    buffered_events: FxHashMap<NodeId, VecDeque<Event>>,
    /// The checkpoint currently being aligned (if any). Overwritten by each
    /// arriving barrier and cleared when the checkpoint completes; it is not
    /// read anywhere else in this module.
    current_checkpoint_id: Option<CheckpointId>,
}
120
121impl BarrierAligner {
122    /// Creates a new barrier aligner for a node with `expected_inputs` upstream edges.
123    #[must_use]
124    pub fn new(expected_inputs: usize) -> Self {
125        Self {
126            expected_inputs,
127            barriers_received: FxHashMap::default(),
128            buffered_events: FxHashMap::default(),
129            current_checkpoint_id: None,
130        }
131    }
132
133    /// Processes a barrier arriving from `source_node`.
134    ///
135    /// Returns [`AlignmentResult::Pending`] until all inputs have reported,
136    /// then returns [`AlignmentResult::Aligned`] with the barrier and any
137    /// buffered events sorted by event timestamp.
138    ///
139    /// ## Cross-input ordering
140    ///
141    /// Buffered events from different upstream inputs are merged and sorted
142    /// by `event.timestamp` to ensure deterministic event-time ordering at
143    /// fan-in nodes. Operators that require event-time ordering can rely on
144    /// this guarantee for the post-alignment batch.
145    ///
146    /// # Panics
147    ///
148    /// Panics if the internal barrier map is empty after insertion (should
149    /// never happen).
150    pub fn on_barrier(
151        &mut self,
152        source_node: NodeId,
153        barrier: CheckpointBarrier,
154    ) -> AlignmentResult {
155        self.current_checkpoint_id = Some(barrier.checkpoint_id);
156        self.barriers_received.insert(source_node, barrier);
157
158        if self.barriers_received.len() >= self.expected_inputs {
159            // All inputs aligned — drain buffered events.
160            // Sort by event timestamp to ensure deterministic ordering
161            // regardless of FxHashMap iteration order.
162            let mut all_buffered = Vec::new();
163            for (_node, mut events) in self.buffered_events.drain() {
164                all_buffered.extend(events.drain(..));
165            }
166            all_buffered.sort_by_key(|e| e.timestamp);
167
168            // Take the last barrier received as the canonical one.
169            let barrier = self
170                .barriers_received
171                .values()
172                .last()
173                .cloned()
174                .expect("at least one barrier");
175
176            AlignmentResult::Aligned {
177                barrier,
178                buffered_events: all_buffered,
179            }
180        } else {
181            AlignmentResult::Pending
182        }
183    }
184
185    /// Buffers an event from `source_node` if that source has already
186    /// delivered its barrier for the current checkpoint.
187    ///
188    /// Returns `true` if the event was buffered (source already aligned),
189    /// `false` if the source has not yet delivered its barrier (event should
190    /// be processed normally).
191    pub fn buffer_if_aligned(&mut self, source_node: NodeId, event: Event) -> bool {
192        if self.barriers_received.contains_key(&source_node) {
193            self.buffered_events
194                .entry(source_node)
195                .or_default()
196                .push_back(event);
197            true
198        } else {
199            false
200        }
201    }
202
203    /// Returns whether `source_node` has already delivered its barrier.
204    #[must_use]
205    pub fn is_source_aligned(&self, source_node: NodeId) -> bool {
206        self.barriers_received.contains_key(&source_node)
207    }
208
209    /// Resets alignment state after a checkpoint completes.
210    pub fn complete_checkpoint(&mut self) {
211        self.barriers_received.clear();
212        self.buffered_events.clear();
213        self.current_checkpoint_id = None;
214    }
215
216    /// Returns how many barriers have been received so far.
217    #[must_use]
218    pub fn barriers_received_count(&self) -> usize {
219        self.barriers_received.len()
220    }
221
222    /// Returns the expected number of upstream inputs.
223    #[must_use]
224    pub fn expected_inputs(&self) -> usize {
225        self.expected_inputs
226    }
227}
228
229// ---------------------------------------------------------------------------
230// DagCheckpointCoordinator
231// ---------------------------------------------------------------------------
232
/// Tracks progress of an in-flight checkpoint.
///
/// Created when a checkpoint is triggered and consumed when it is finalized.
struct CheckpointProgress {
    /// Checkpoint identifier.
    checkpoint_id: CheckpointId,
    /// Epoch number.
    epoch: u64,
    /// Operator states reported by completed nodes.
    completed_nodes: FxHashMap<NodeId, OperatorState>,
    /// Nodes that have not yet reported. Starts as a copy of every node in
    /// the DAG and shrinks as nodes report.
    pending_nodes: Vec<NodeId>,
    /// When the checkpoint was triggered, in milliseconds. Copied from the
    /// barrier's timestamp, which the coordinator takes from the wall clock
    /// (milliseconds since the Unix epoch).
    triggered_at: i64,
}
246
/// Ring 1 checkpoint coordinator.
///
/// Orchestrates the checkpoint lifecycle:
/// 1. [`trigger_checkpoint()`](Self::trigger_checkpoint) — creates a barrier
/// 2. Barrier propagates through the DAG (external to this struct)
/// 3. [`on_node_snapshot_complete()`](Self::on_node_snapshot_complete) —
///    each node reports its state
/// 4. [`finalize_checkpoint()`](Self::finalize_checkpoint) — produces a
///    [`DagCheckpointSnapshot`]
///
/// At most one checkpoint is in flight at a time; triggering another while
/// one is active is an error.
pub struct DagCheckpointCoordinator {
    /// Configuration.
    config: DagCheckpointConfig,
    /// Source node IDs (barrier injection points).
    source_nodes: Vec<NodeId>,
    /// All node IDs in the DAG. Every one of them must report a snapshot
    /// before a checkpoint can be finalized.
    all_nodes: Vec<NodeId>,
    /// Next epoch counter. Starts at 1.
    next_epoch: u64,
    /// Next checkpoint ID counter. Starts at 1.
    next_checkpoint_id: CheckpointId,
    /// Currently in-flight checkpoint progress.
    in_progress: Option<CheckpointProgress>,
    /// Completed snapshots (bounded by `max_retained`), oldest first.
    completed_snapshots: Vec<DagCheckpointSnapshot>,
    /// Maximum number of snapshots to retain. Copied from
    /// `config.max_retained` at construction.
    max_retained: usize,
}
274
275impl DagCheckpointCoordinator {
276    /// Creates a new checkpoint coordinator.
277    ///
278    /// # Arguments
279    ///
280    /// * `source_nodes` — DAG source node IDs (barrier injection points)
281    /// * `all_nodes` — all node IDs in the DAG
282    /// * `config` — checkpoint configuration
283    #[must_use]
284    pub fn new(
285        source_nodes: Vec<NodeId>,
286        all_nodes: Vec<NodeId>,
287        config: DagCheckpointConfig,
288    ) -> Self {
289        let max_retained = config.max_retained;
290        Self {
291            config,
292            source_nodes,
293            all_nodes,
294            next_epoch: 1,
295            next_checkpoint_id: 1,
296            in_progress: None,
297            completed_snapshots: Vec::new(),
298            max_retained,
299        }
300    }
301
302    /// Triggers a new checkpoint by creating a barrier.
303    ///
304    /// # Errors
305    ///
306    /// Returns [`DagError::CheckpointInProgress`] if a checkpoint is already
307    /// in flight.
308    pub fn trigger_checkpoint(&mut self) -> Result<CheckpointBarrier, DagError> {
309        if let Some(ref progress) = self.in_progress {
310            return Err(DagError::CheckpointInProgress(progress.epoch));
311        }
312
313        let checkpoint_id = self.next_checkpoint_id;
314        self.next_checkpoint_id += 1;
315        let epoch = self.next_epoch;
316        self.next_epoch += 1;
317
318        #[allow(clippy::cast_possible_truncation)]
319        // Timestamp ms fits i64 for ~292 years from epoch
320        let timestamp = std::time::SystemTime::now()
321            .duration_since(std::time::UNIX_EPOCH)
322            .map_or(0, |d| d.as_millis() as i64);
323
324        let barrier = CheckpointBarrier {
325            checkpoint_id,
326            epoch,
327            timestamp,
328            barrier_type: self.config.barrier_type,
329        };
330
331        self.in_progress = Some(CheckpointProgress {
332            checkpoint_id,
333            epoch,
334            completed_nodes: FxHashMap::default(),
335            pending_nodes: self.all_nodes.clone(),
336            triggered_at: barrier.timestamp,
337        });
338
339        Ok(barrier)
340    }
341
342    /// Records that a node has completed its snapshot.
343    ///
344    /// Returns `true` if all nodes have now reported (checkpoint is ready
345    /// to finalize).
346    pub fn on_node_snapshot_complete(&mut self, node_id: NodeId, state: OperatorState) -> bool {
347        if let Some(ref mut progress) = self.in_progress {
348            progress.pending_nodes.retain(|&n| n != node_id);
349            progress.completed_nodes.insert(node_id, state);
350            progress.pending_nodes.is_empty()
351        } else {
352            false
353        }
354    }
355
356    /// Finalizes the current checkpoint, producing a snapshot.
357    ///
358    /// # Errors
359    ///
360    /// Returns [`DagError::NoCheckpointInProgress`] if no checkpoint is active.
361    /// Returns [`DagError::CheckpointIncomplete`] if not all nodes have reported.
362    pub fn finalize_checkpoint(&mut self) -> Result<DagCheckpointSnapshot, DagError> {
363        let progress = self
364            .in_progress
365            .take()
366            .ok_or(DagError::NoCheckpointInProgress)?;
367
368        if !progress.pending_nodes.is_empty() {
369            let pending = progress.pending_nodes.len();
370            // Put progress back so it can continue.
371            self.in_progress = Some(progress);
372            return Err(DagError::CheckpointIncomplete { pending });
373        }
374
375        let snapshot = DagCheckpointSnapshot::from_operator_states(
376            progress.checkpoint_id,
377            progress.epoch,
378            progress.triggered_at,
379            &progress.completed_nodes,
380        );
381
382        self.completed_snapshots.push(snapshot.clone());
383
384        // Trim old snapshots.
385        while self.completed_snapshots.len() > self.max_retained {
386            self.completed_snapshots.remove(0);
387        }
388
389        Ok(snapshot)
390    }
391
392    /// Returns whether a checkpoint is currently in progress.
393    #[must_use]
394    pub fn is_checkpoint_in_progress(&self) -> bool {
395        self.in_progress.is_some()
396    }
397
398    /// Returns completed snapshots.
399    #[must_use]
400    pub fn completed_snapshots(&self) -> &[DagCheckpointSnapshot] {
401        &self.completed_snapshots
402    }
403
404    /// Returns the latest completed snapshot (if any).
405    #[must_use]
406    pub fn latest_snapshot(&self) -> Option<&DagCheckpointSnapshot> {
407        self.completed_snapshots.last()
408    }
409
410    /// Returns the current epoch counter (next epoch to be assigned).
411    #[must_use]
412    pub fn current_epoch(&self) -> u64 {
413        self.next_epoch
414    }
415
416    /// Returns the source node IDs.
417    #[must_use]
418    pub fn source_nodes(&self) -> &[NodeId] {
419        &self.source_nodes
420    }
421}
422
423impl std::fmt::Debug for DagCheckpointCoordinator {
424    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425        f.debug_struct("DagCheckpointCoordinator")
426            .field("next_epoch", &self.next_epoch)
427            .field("next_checkpoint_id", &self.next_checkpoint_id)
428            .field("in_progress", &self.in_progress.is_some())
429            .field("completed_count", &self.completed_snapshots.len())
430            .field("source_nodes", &self.source_nodes)
431            .finish_non_exhaustive()
432    }
433}