Skip to main content

laminar_core/dag/recovery/
mod.rs

//! Snapshot and recovery management for DAG checkpoints.
//!
//! [`DagCheckpointSnapshot`] captures operator state at a single point in time.
//! It uses `std::collections::HashMap` (not `FxHashMap`) because it must
//! be `Serialize`/`Deserialize` for persistence by the caller.
//!
//! [`DagRecoveryManager`] holds snapshots and provides recovery APIs.
8
9#![allow(clippy::disallowed_types)] // serde serialization requires std HashMap
10
11use std::collections::HashMap;
12
13use rustc_hash::FxHashMap;
14use serde::{Deserialize, Serialize};
15
16use crate::operator::OperatorState;
17
18use super::error::DagError;
19use super::topology::NodeId;
20
21/// Serializable form of [`OperatorState`].
22///
23/// Uses standard library types so it can derive `Serialize`/`Deserialize`.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct SerializableOperatorState {
26    /// Operator identifier.
27    pub operator_id: String,
28    /// Serialized state data.
29    pub data: Vec<u8>,
30}
31
32impl From<OperatorState> for SerializableOperatorState {
33    fn from(state: OperatorState) -> Self {
34        Self {
35            operator_id: state.operator_id,
36            data: state.data,
37        }
38    }
39}
40
41impl From<SerializableOperatorState> for OperatorState {
42    fn from(state: SerializableOperatorState) -> Self {
43        Self {
44            operator_id: state.operator_id,
45            data: state.data,
46        }
47    }
48}
49
50/// A point-in-time snapshot of the entire DAG's operator state.
51///
52/// Produced by [`DagCheckpointCoordinator::finalize_checkpoint()`](super::checkpoint::DagCheckpointCoordinator::finalize_checkpoint).
53/// The snapshot is serializable — persistence is the caller's responsibility.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct DagCheckpointSnapshot {
56    /// Unique checkpoint identifier.
57    pub checkpoint_id: u64,
58    /// Monotonically increasing epoch.
59    pub epoch: u64,
60    /// Timestamp when the checkpoint was triggered (millis since epoch).
61    pub timestamp: i64,
62    /// Per-node operator state, keyed by `NodeId.0`.
63    pub node_states: HashMap<u32, SerializableOperatorState>,
64    /// Per-source offset tracking (source name → offset).
65    pub source_offsets: HashMap<String, u64>,
66    /// Watermark at checkpoint time.
67    pub watermark: Option<i64>,
68}
69
70impl DagCheckpointSnapshot {
71    /// Creates a snapshot from a map of operator states.
72    ///
73    /// Converts from `FxHashMap<NodeId, OperatorState>` (internal) to
74    /// `HashMap<u32, SerializableOperatorState>` (serializable).
75    pub(crate) fn from_operator_states(
76        checkpoint_id: u64,
77        epoch: u64,
78        timestamp: i64,
79        states: &FxHashMap<NodeId, OperatorState>,
80    ) -> Self {
81        let node_states = states
82            .iter()
83            .map(|(node_id, state)| (node_id.0, SerializableOperatorState::from(state.clone())))
84            .collect();
85
86        Self {
87            checkpoint_id,
88            epoch,
89            timestamp,
90            node_states,
91            source_offsets: HashMap::new(),
92            watermark: None,
93        }
94    }
95
96    /// Converts node states back to `FxHashMap<NodeId, OperatorState>`.
97    #[must_use]
98    pub fn to_operator_states(&self) -> FxHashMap<NodeId, OperatorState> {
99        self.node_states
100            .iter()
101            .map(|(&id, state)| (NodeId(id), OperatorState::from(state.clone())))
102            .collect()
103    }
104}
105
106/// Recovered DAG state from a checkpoint snapshot.
107pub struct RecoveredDagState {
108    /// The snapshot that was used for recovery.
109    pub snapshot: DagCheckpointSnapshot,
110    /// Operator states converted back to internal representation.
111    pub operator_states: FxHashMap<NodeId, OperatorState>,
112    /// Source offsets for resuming consumption.
113    pub source_offsets: HashMap<String, u64>,
114    /// Watermark at the time of the checkpoint.
115    pub watermark: Option<i64>,
116}
117
118impl std::fmt::Debug for RecoveredDagState {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        f.debug_struct("RecoveredDagState")
121            .field("checkpoint_id", &self.snapshot.checkpoint_id)
122            .field("epoch", &self.snapshot.epoch)
123            .field("operator_count", &self.operator_states.len())
124            .field("source_offsets", &self.source_offsets)
125            .field("watermark", &self.watermark)
126            .finish()
127    }
128}
129
130/// Manages checkpoint snapshots and provides recovery.
131///
132/// Snapshots are held in memory. Persistence to disk or object storage
133/// is the caller's responsibility.
134#[derive(Debug)]
135pub struct DagRecoveryManager {
136    /// Stored snapshots, ordered by `checkpoint_id`.
137    snapshots: Vec<DagCheckpointSnapshot>,
138}
139
140impl DagRecoveryManager {
141    /// Creates an empty recovery manager.
142    #[must_use]
143    pub fn new() -> Self {
144        Self {
145            snapshots: Vec::new(),
146        }
147    }
148
149    /// Creates a recovery manager pre-loaded with snapshots.
150    #[must_use]
151    pub fn with_snapshots(snapshots: Vec<DagCheckpointSnapshot>) -> Self {
152        Self { snapshots }
153    }
154
155    /// Adds a snapshot to the manager.
156    pub fn add_snapshot(&mut self, snapshot: DagCheckpointSnapshot) {
157        self.snapshots.push(snapshot);
158    }
159
160    /// Recovers from the latest (highest `checkpoint_id`) snapshot.
161    ///
162    /// # Errors
163    ///
164    /// Returns [`DagError::CheckpointNotFound`] if no snapshots exist.
165    pub fn recover_latest(&self) -> Result<RecoveredDagState, DagError> {
166        let snapshot = self
167            .snapshots
168            .iter()
169            .max_by_key(|s| s.checkpoint_id)
170            .ok_or(DagError::CheckpointNotFound)?
171            .clone();
172
173        Ok(Self::build_recovered_state(snapshot))
174    }
175
176    /// Recovers from a specific checkpoint by ID.
177    ///
178    /// # Errors
179    ///
180    /// Returns [`DagError::CheckpointNotFound`] if the checkpoint ID
181    /// does not exist.
182    pub fn recover_by_id(&self, checkpoint_id: u64) -> Result<RecoveredDagState, DagError> {
183        let snapshot = self
184            .snapshots
185            .iter()
186            .find(|s| s.checkpoint_id == checkpoint_id)
187            .ok_or(DagError::CheckpointNotFound)?
188            .clone();
189
190        Ok(Self::build_recovered_state(snapshot))
191    }
192
193    /// Returns the number of stored snapshots.
194    #[must_use]
195    pub fn snapshot_count(&self) -> usize {
196        self.snapshots.len()
197    }
198
199    /// Returns whether any snapshots are available.
200    #[must_use]
201    pub fn has_snapshots(&self) -> bool {
202        !self.snapshots.is_empty()
203    }
204
205    /// Builds a `RecoveredDagState` from a snapshot.
206    fn build_recovered_state(snapshot: DagCheckpointSnapshot) -> RecoveredDagState {
207        let operator_states = snapshot.to_operator_states();
208        let source_offsets = snapshot.source_offsets.clone();
209        let watermark = snapshot.watermark;
210
211        RecoveredDagState {
212            snapshot,
213            operator_states,
214            source_offsets,
215            watermark,
216        }
217    }
218}
219
220impl Default for DagRecoveryManager {
221    fn default() -> Self {
222        Self::new()
223    }
224}
225
226#[cfg(test)]
227mod tests;