laminar_core/dag/recovery/
mod.rs1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
12
13use rustc_hash::FxHashMap;
14use serde::{Deserialize, Serialize};
15
16use crate::operator::OperatorState;
17
18use super::error::DagError;
19use super::topology::NodeId;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct SerializableOperatorState {
26 pub operator_id: String,
28 pub data: Vec<u8>,
30}
31
32impl From<OperatorState> for SerializableOperatorState {
33 fn from(state: OperatorState) -> Self {
34 Self {
35 operator_id: state.operator_id,
36 data: state.data,
37 }
38 }
39}
40
41impl From<SerializableOperatorState> for OperatorState {
42 fn from(state: SerializableOperatorState) -> Self {
43 Self {
44 operator_id: state.operator_id,
45 data: state.data,
46 }
47 }
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct DagCheckpointSnapshot {
56 pub checkpoint_id: u64,
58 pub epoch: u64,
60 pub timestamp: i64,
62 pub node_states: HashMap<u32, SerializableOperatorState>,
64 pub source_offsets: HashMap<String, u64>,
66 pub watermark: Option<i64>,
68}
69
70impl DagCheckpointSnapshot {
71 pub(crate) fn from_operator_states(
76 checkpoint_id: u64,
77 epoch: u64,
78 timestamp: i64,
79 states: &FxHashMap<NodeId, OperatorState>,
80 ) -> Self {
81 let node_states = states
82 .iter()
83 .map(|(node_id, state)| (node_id.0, SerializableOperatorState::from(state.clone())))
84 .collect();
85
86 Self {
87 checkpoint_id,
88 epoch,
89 timestamp,
90 node_states,
91 source_offsets: HashMap::new(),
92 watermark: None,
93 }
94 }
95
96 #[must_use]
98 pub fn to_operator_states(&self) -> FxHashMap<NodeId, OperatorState> {
99 self.node_states
100 .iter()
101 .map(|(&id, state)| (NodeId(id), OperatorState::from(state.clone())))
102 .collect()
103 }
104}
105
106pub struct RecoveredDagState {
108 pub snapshot: DagCheckpointSnapshot,
110 pub operator_states: FxHashMap<NodeId, OperatorState>,
112 pub source_offsets: HashMap<String, u64>,
114 pub watermark: Option<i64>,
116}
117
118impl std::fmt::Debug for RecoveredDagState {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 f.debug_struct("RecoveredDagState")
121 .field("checkpoint_id", &self.snapshot.checkpoint_id)
122 .field("epoch", &self.snapshot.epoch)
123 .field("operator_count", &self.operator_states.len())
124 .field("source_offsets", &self.source_offsets)
125 .field("watermark", &self.watermark)
126 .finish()
127 }
128}
129
130#[derive(Debug)]
135pub struct DagRecoveryManager {
136 snapshots: Vec<DagCheckpointSnapshot>,
138}
139
140impl DagRecoveryManager {
141 #[must_use]
143 pub fn new() -> Self {
144 Self {
145 snapshots: Vec::new(),
146 }
147 }
148
149 #[must_use]
151 pub fn with_snapshots(snapshots: Vec<DagCheckpointSnapshot>) -> Self {
152 Self { snapshots }
153 }
154
155 pub fn add_snapshot(&mut self, snapshot: DagCheckpointSnapshot) {
157 self.snapshots.push(snapshot);
158 }
159
160 pub fn recover_latest(&self) -> Result<RecoveredDagState, DagError> {
166 let snapshot = self
167 .snapshots
168 .iter()
169 .max_by_key(|s| s.checkpoint_id)
170 .ok_or(DagError::CheckpointNotFound)?
171 .clone();
172
173 Ok(Self::build_recovered_state(snapshot))
174 }
175
176 pub fn recover_by_id(&self, checkpoint_id: u64) -> Result<RecoveredDagState, DagError> {
183 let snapshot = self
184 .snapshots
185 .iter()
186 .find(|s| s.checkpoint_id == checkpoint_id)
187 .ok_or(DagError::CheckpointNotFound)?
188 .clone();
189
190 Ok(Self::build_recovered_state(snapshot))
191 }
192
193 #[must_use]
195 pub fn snapshot_count(&self) -> usize {
196 self.snapshots.len()
197 }
198
199 #[must_use]
201 pub fn has_snapshots(&self) -> bool {
202 !self.snapshots.is_empty()
203 }
204
205 fn build_recovered_state(snapshot: DagCheckpointSnapshot) -> RecoveredDagState {
207 let operator_states = snapshot.to_operator_states();
208 let source_offsets = snapshot.source_offsets.clone();
209 let watermark = snapshot.watermark;
210
211 RecoveredDagState {
212 snapshot,
213 operator_states,
214 source_offsets,
215 watermark,
216 }
217 }
218}
219
220impl Default for DagRecoveryManager {
221 fn default() -> Self {
222 Self::new()
223 }
224}
225
226#[cfg(test)]
227mod tests;