laminar_storage/checkpoint/
layout.rs1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
31use std::fmt;
32
33use serde::{Deserialize, Serialize};
34use uuid::Uuid;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
42pub struct CheckpointId(Uuid);
43
44impl CheckpointId {
45 #[must_use]
47 pub fn now() -> Self {
48 Self(Uuid::now_v7())
49 }
50
51 #[must_use]
53 pub const fn from_uuid(uuid: Uuid) -> Self {
54 Self(uuid)
55 }
56
57 #[must_use]
59 pub const fn as_uuid(&self) -> Uuid {
60 self.0
61 }
62
63 #[must_use]
65 pub fn to_string_id(&self) -> String {
66 self.0.to_string()
67 }
68}
69
70impl fmt::Display for CheckpointId {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 write!(f, "{}", self.0)
73 }
74}
75
/// Builds the storage paths for checkpoint artifacts under a shared prefix.
#[derive(Clone, Debug)]
pub struct CheckpointPaths {
    /// Prefix prepended to every path this layout produces.
    pub(crate) base_prefix: String,
}
85
86impl CheckpointPaths {
87 #[must_use]
91 pub fn new(base_prefix: &str) -> Self {
92 let base_prefix = if base_prefix.ends_with('/') {
93 base_prefix.to_string()
94 } else {
95 format!("{base_prefix}/")
96 };
97 Self { base_prefix }
98 }
99
100 #[must_use]
102 pub fn latest_pointer(&self) -> String {
103 format!("{}_{}", self.base_prefix, "latest")
104 }
105
106 #[must_use]
108 pub fn checkpoint_dir(&self, id: &CheckpointId) -> String {
109 format!("{}{}/", self.base_prefix, id)
110 }
111
112 #[must_use]
114 pub fn manifest(&self, id: &CheckpointId) -> String {
115 format!("{}{}manifest.json", self.base_prefix, id)
116 }
117
118 #[must_use]
120 pub fn snapshot(&self, id: &CheckpointId, operator: &str, partition: u32) -> String {
121 format!(
122 "{}{}operators/{}/partition-{partition}.snap",
123 self.base_prefix, id, operator
124 )
125 }
126
127 #[must_use]
129 pub fn delta(&self, id: &CheckpointId, operator: &str, partition: u32) -> String {
130 format!(
131 "{}{}operators/{}/partition-{partition}.delta",
132 self.base_prefix, id, operator
133 )
134 }
135
136 #[must_use]
138 pub fn source_offset(&self, id: &CheckpointId, source_name: &str) -> String {
139 format!("{}{}offsets/{source_name}.json", self.base_prefix, id)
140 }
141}
142
143impl Default for CheckpointPaths {
144 fn default() -> Self {
145 Self::new("checkpoints/")
146 }
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
155pub struct CheckpointManifestV2 {
156 pub version: u32,
158 pub checkpoint_id: CheckpointId,
160 pub epoch: u64,
162 pub timestamp_ms: u64,
164
165 #[serde(default)]
167 pub operators: HashMap<String, OperatorSnapshotEntry>,
168
169 #[serde(default)]
171 pub source_offsets: HashMap<String, SourceOffsetEntry>,
172
173 #[serde(default)]
175 pub parent_id: Option<CheckpointId>,
176
177 #[serde(default)]
179 pub watermark: Option<i64>,
180
181 #[serde(default)]
183 pub total_size_bytes: u64,
184}
185
186impl CheckpointManifestV2 {
187 #[must_use]
189 #[allow(clippy::cast_possible_truncation)]
190 pub fn new(checkpoint_id: CheckpointId, epoch: u64) -> Self {
191 let timestamp_ms = std::time::SystemTime::now()
192 .duration_since(std::time::UNIX_EPOCH)
193 .unwrap_or_default()
194 .as_millis() as u64;
195
196 Self {
197 version: 2,
198 checkpoint_id,
199 epoch,
200 timestamp_ms,
201 operators: HashMap::new(),
202 source_offsets: HashMap::new(),
203 parent_id: None,
204 watermark: None,
205 total_size_bytes: 0,
206 }
207 }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
212pub struct OperatorSnapshotEntry {
213 pub partitions: Vec<PartitionSnapshotEntry>,
215 #[serde(default)]
217 pub total_bytes: u64,
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
222pub struct PartitionSnapshotEntry {
223 pub partition_id: u32,
225 pub is_delta: bool,
227 pub path: String,
229 pub size_bytes: u64,
231 #[serde(default)]
233 pub sha256: Option<String>,
234}
235
236#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
238pub struct SourceOffsetEntry {
239 pub source_type: String,
241 pub offsets: HashMap<String, String>,
243 pub epoch: u64,
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a [`PartitionSnapshotEntry`] fixture.
    fn partition_entry(
        partition_id: u32,
        is_delta: bool,
        path: &str,
        size_bytes: u64,
        sha256: Option<&str>,
    ) -> PartitionSnapshotEntry {
        PartitionSnapshotEntry {
            partition_id,
            is_delta,
            path: path.to_owned(),
            size_bytes,
            sha256: sha256.map(str::to_owned),
        }
    }

    #[test]
    fn test_checkpoint_id_time_sortable() {
        let earlier = CheckpointId::now();
        // Cross at least one millisecond boundary so the v7 timestamps differ.
        std::thread::sleep(std::time::Duration::from_millis(2));
        let later = CheckpointId::now();

        assert!(earlier < later, "UUID v7 should be time-sortable");
        // The textual form must sort in the same order as the binary form.
        assert!(earlier.to_string_id() < later.to_string_id());
    }

    #[test]
    fn test_checkpoint_id_display() {
        let rendered = CheckpointId::now().to_string();
        // Hyphenated UUID: 32 hex digits plus 4 hyphens.
        assert_eq!(rendered.len(), 36);
        assert_eq!(rendered.matches('-').count(), 4);
    }

    #[test]
    fn test_checkpoint_paths() {
        let paths = CheckpointPaths::new("s3://my-bucket/checkpoints");
        let id = CheckpointId::now();

        assert!(paths.latest_pointer().ends_with("_latest"));

        let manifest_path = paths.manifest(&id);
        assert!(manifest_path.ends_with("manifest.json"));
        assert!(manifest_path.contains(&id.to_string()));

        let snapshot_path = paths.snapshot(&id, "window-agg", 3);
        assert!(snapshot_path.contains("operators/window-agg/"));
        assert!(snapshot_path.ends_with("partition-3.snap"));

        let delta_path = paths.delta(&id, "window-agg", 3);
        assert!(delta_path.ends_with("partition-3.delta"));

        let offset_path = paths.source_offset(&id, "kafka-trades");
        assert!(offset_path.ends_with("kafka-trades.json"));
    }

    #[test]
    fn test_checkpoint_paths_trailing_slash() {
        let id = CheckpointId::now();
        let with_slash = CheckpointPaths::new("prefix/");
        let without_slash = CheckpointPaths::new("prefix");

        assert_eq!(with_slash.manifest(&id), without_slash.manifest(&id));
    }

    #[test]
    fn test_manifest_v2_json_round_trip() {
        let id = CheckpointId::now();
        let mut manifest = CheckpointManifestV2::new(id, 10);
        manifest.watermark = Some(5000);
        manifest.parent_id = Some(CheckpointId::now());

        let partitions = vec![
            partition_entry(
                0,
                false,
                "operators/window-agg/partition-0.snap",
                1024,
                Some("abcd1234"),
            ),
            partition_entry(
                1,
                true,
                "operators/window-agg/partition-1.delta",
                256,
                None,
            ),
        ];
        manifest.operators.insert(
            "window-agg".to_owned(),
            OperatorSnapshotEntry {
                partitions,
                total_bytes: 1280,
            },
        );

        let offsets: HashMap<String, String> = [("partition-0", "1234"), ("partition-1", "5678")]
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect();
        manifest.source_offsets.insert(
            "kafka-trades".to_owned(),
            SourceOffsetEntry {
                source_type: "kafka".to_owned(),
                offsets,
                epoch: 10,
            },
        );

        let json = serde_json::to_string_pretty(&manifest).unwrap();
        let restored: CheckpointManifestV2 = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.version, 2);
        assert_eq!(restored.checkpoint_id, id);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(5000));
        assert!(restored.parent_id.is_some());

        let operator = &restored.operators["window-agg"];
        assert_eq!(operator.partitions.len(), 2);
        assert_eq!(operator.total_bytes, 1280);

        let source = &restored.source_offsets["kafka-trades"];
        assert_eq!(source.source_type, "kafka");
        assert_eq!(
            source.offsets.get("partition-0").map(String::as_str),
            Some("1234")
        );
    }

    #[test]
    fn test_manifest_v2_backward_compat_missing_fields() {
        let id = CheckpointId::now();
        // Only the required fields; everything marked `#[serde(default)]` is omitted.
        let json = format!(
            r#"{{
    "version": 2,
    "checkpoint_id": "{id}",
    "epoch": 1,
    "timestamp_ms": 1000
}}"#
        );

        let manifest: CheckpointManifestV2 = serde_json::from_str(&json).unwrap();

        assert_eq!(manifest.version, 2);
        assert!(manifest.operators.is_empty());
        assert!(manifest.source_offsets.is_empty());
        assert!(manifest.parent_id.is_none());
        assert!(manifest.watermark.is_none());
    }
}