1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
22use std::fs;
23use std::path::{Path, PathBuf};
24use std::time::Duration;
25
26use laminar_core::state::StateSnapshot;
27use tracing::{debug, info, warn};
28
29use super::error::IncrementalCheckpointError;
30use super::manager::{
31 CheckpointConfig, IncrementalCheckpointManager, IncrementalCheckpointMetadata,
32};
33use crate::wal::{WalEntry, WalReadResult, WriteAheadLog};
34
/// Accumulated results of replaying the WAL from a starting position.
// Debug derive added so replay results can be dumped in logs/tests;
// every field is a std type that already implements Debug.
#[derive(Debug)]
struct WalReplayResult {
    /// Number of WAL entries successfully applied during replay.
    entries_replayed: u64,
    /// WAL position immediately after the last replayed entry.
    final_position: u64,
    /// Latest committed offset observed per source name.
    source_offsets: HashMap<String, u64>,
    /// Latest watermark carried by a commit entry, if any was seen.
    watermark: Option<i64>,
    /// Replayed key/value changes (`None` value = delete); only populated
    /// when change collection is enabled in the recovery config.
    state_changes: Vec<(Vec<u8>, Option<Vec<u8>>)>,
}
48
49#[derive(Debug)]
51pub struct RecoveredState {
52 pub epoch: u64,
54 pub state_snapshot: Option<StateSnapshot>,
56 pub wal_position: u64,
58 pub wal_entries_replayed: u64,
60 pub source_offsets: HashMap<String, u64>,
62 pub watermark: Option<i64>,
64 pub checkpoint_id: Option<u64>,
66 pub state_changes: Vec<(Vec<u8>, Option<Vec<u8>>)>,
68}
69
70impl RecoveredState {
71 #[must_use]
73 pub fn empty() -> Self {
74 Self {
75 epoch: 0,
76 state_snapshot: None,
77 wal_position: 0,
78 wal_entries_replayed: 0,
79 source_offsets: HashMap::new(),
80 watermark: None,
81 checkpoint_id: None,
82 state_changes: Vec::new(),
83 }
84 }
85
86 #[must_use]
88 pub fn has_state(&self) -> bool {
89 self.state_snapshot.is_some() || !self.state_changes.is_empty()
90 }
91}
92
/// Tuning knobs for a single recovery run.
#[derive(Debug, Clone)]
pub struct RecoveryConfig {
    /// Directory that holds checkpoint data.
    pub checkpoint_dir: PathBuf,
    /// Path of the write-ahead log file.
    pub wal_path: PathBuf,
    /// Attempt a WAL repair before replay (a repair failure is logged,
    /// not fatal — replay continues against the unrepaired log).
    pub repair_wal: bool,
    /// Collect per-entry key/value changes while replaying the WAL.
    pub collect_state_changes: bool,
    /// Maximum number of WAL entries to replay; 0 means unlimited.
    pub max_wal_entries: u64,
}
107
108impl RecoveryConfig {
109 #[must_use]
111 pub fn new(checkpoint_dir: &Path, wal_path: &Path) -> Self {
112 Self {
113 checkpoint_dir: checkpoint_dir.to_path_buf(),
114 wal_path: wal_path.to_path_buf(),
115 repair_wal: true,
116 collect_state_changes: false,
117 max_wal_entries: 0,
118 }
119 }
120
121 #[must_use]
123 pub fn with_repair_wal(mut self, enabled: bool) -> Self {
124 self.repair_wal = enabled;
125 self
126 }
127
128 #[must_use]
130 pub fn with_collect_state_changes(mut self, enabled: bool) -> Self {
131 self.collect_state_changes = enabled;
132 self
133 }
134
135 #[must_use]
137 pub fn with_max_wal_entries(mut self, max: u64) -> Self {
138 self.max_wal_entries = max;
139 self
140 }
141}
142
/// Orchestrates recovery: loads the latest checkpoint, then replays the WAL
/// from the checkpoint's recorded position.
pub struct RecoveryManager {
    /// Settings controlling where to look and how to replay.
    config: RecoveryConfig,
}
148
149impl RecoveryManager {
150 #[must_use]
152 pub fn new(config: RecoveryConfig) -> Self {
153 Self { config }
154 }
155
156 pub fn recover(&self) -> Result<RecoveredState, IncrementalCheckpointError> {
167 info!(
168 checkpoint_dir = %self.config.checkpoint_dir.display(),
169 wal_path = %self.config.wal_path.display(),
170 "Starting recovery"
171 );
172
173 let mut result = RecoveredState::empty();
174
175 let checkpoint_config = CheckpointConfig::new(&self.config.checkpoint_dir);
177 let manager = IncrementalCheckpointManager::new(checkpoint_config)?;
178
179 if let Some(checkpoint) = manager.find_latest_checkpoint()? {
180 result = Self::load_checkpoint(&manager, &checkpoint);
181 info!(
182 checkpoint_id = checkpoint.id,
183 epoch = checkpoint.epoch,
184 wal_position = checkpoint.wal_position,
185 "Loaded checkpoint"
186 );
187 } else {
188 debug!("No checkpoint found, starting from WAL beginning");
189 }
190
191 if self.config.wal_path.exists() {
193 let wal_result = self.replay_wal(result.wal_position)?;
194 result.wal_entries_replayed = wal_result.entries_replayed;
195 result.wal_position = wal_result.final_position;
196
197 for (source, offset) in wal_result.source_offsets {
199 result.source_offsets.insert(source, offset);
200 }
201 if wal_result.watermark.is_some() {
202 result.watermark = wal_result.watermark;
203 }
204
205 if self.config.collect_state_changes {
207 result.state_changes = wal_result.state_changes;
208 }
209
210 info!(
211 entries_replayed = wal_result.entries_replayed,
212 final_position = wal_result.final_position,
213 "WAL replay complete"
214 );
215 }
216
217 Ok(result)
218 }
219
220 fn load_checkpoint(
222 manager: &IncrementalCheckpointManager,
223 checkpoint: &IncrementalCheckpointMetadata,
224 ) -> RecoveredState {
225 let mut result = RecoveredState::empty();
226 result.checkpoint_id = Some(checkpoint.id);
227 result.epoch = checkpoint.epoch;
228 result.wal_position = checkpoint.wal_position;
229 result.source_offsets.clone_from(&checkpoint.source_offsets);
230 result.watermark = checkpoint.watermark;
231
232 if let Ok(state_data) = manager.load_checkpoint_state(checkpoint.id) {
234 match StateSnapshot::from_bytes(&state_data) {
235 Ok(snapshot) => {
236 result.state_snapshot = Some(snapshot);
237 }
238 Err(e) => {
239 warn!(
240 checkpoint_id = checkpoint.id,
241 error = %e,
242 "Failed to deserialize state snapshot"
243 );
244 }
245 }
246 }
247
248 result
249 }
250
251 fn replay_wal(
253 &self,
254 start_position: u64,
255 ) -> Result<WalReplayResult, IncrementalCheckpointError> {
256 let mut wal = WriteAheadLog::new(&self.config.wal_path, Duration::from_millis(100))
257 .map_err(|e| IncrementalCheckpointError::Wal(e.to_string()))?;
258
259 if self.config.repair_wal {
261 if let Err(e) = wal.repair() {
262 warn!(error = %e, "WAL repair failed, continuing anyway");
263 }
264 }
265
266 let mut reader = wal
267 .read_from(start_position)
268 .map_err(|e| IncrementalCheckpointError::Wal(e.to_string()))?;
269
270 let mut result = WalReplayResult {
271 entries_replayed: 0,
272 final_position: start_position,
273 source_offsets: HashMap::new(),
274 watermark: None,
275 state_changes: Vec::new(),
276 };
277
278 let max_entries = if self.config.max_wal_entries > 0 {
279 self.config.max_wal_entries
280 } else {
281 u64::MAX
282 };
283
284 loop {
285 if result.entries_replayed >= max_entries {
286 debug!(max_entries, "Reached max WAL entries limit");
287 break;
288 }
289
290 match reader.read_next() {
291 Ok(WalReadResult::Entry(entry)) => {
292 result.final_position = reader.position();
293 result.entries_replayed += 1;
294
295 match entry {
296 WalEntry::Put { key, value } => {
297 if self.config.collect_state_changes {
298 result.state_changes.push((key, Some(value)));
299 }
300 }
301 WalEntry::Delete { key } => {
302 if self.config.collect_state_changes {
303 result.state_changes.push((key, None));
304 }
305 }
306 WalEntry::Commit { offsets, watermark } => {
307 for (source, offset) in offsets {
308 result.source_offsets.insert(source, offset);
309 }
310 if watermark.is_some() {
311 result.watermark = watermark;
312 }
313 }
314 WalEntry::Checkpoint { id } => {
315 debug!(checkpoint_id = id, "Skipping checkpoint marker in WAL");
316 }
317 }
318 }
319 Ok(WalReadResult::Eof) => {
320 debug!("Reached end of WAL");
321 break;
322 }
323 Ok(WalReadResult::TornWrite { position, reason }) => {
324 warn!(position, reason, "Torn write detected, stopping replay");
325 break;
326 }
327 Ok(WalReadResult::ChecksumMismatch { position, .. }) => {
328 warn!(position, "CRC mismatch detected, stopping replay");
329 break;
330 }
331 Ok(WalReadResult::Corrupted { position, reason }) => {
332 warn!(
333 position,
334 reason, "[LDB-6006] Corrupted WAL entry detected, stopping replay"
335 );
336 break;
337 }
338 Err(e) => {
339 return Err(IncrementalCheckpointError::Wal(format!(
340 "WAL read error: {e}"
341 )));
342 }
343 }
344 }
345
346 Ok(result)
347 }
348
349 pub fn recover_simple(
355 checkpoint_dir: &Path,
356 wal_path: &Path,
357 ) -> Result<RecoveredState, IncrementalCheckpointError> {
358 let config = RecoveryConfig::new(checkpoint_dir, wal_path);
359 let manager = RecoveryManager::new(config);
360 manager.recover()
361 }
362
363 pub fn recover_with_changes(
369 checkpoint_dir: &Path,
370 wal_path: &Path,
371 ) -> Result<RecoveredState, IncrementalCheckpointError> {
372 let config = RecoveryConfig::new(checkpoint_dir, wal_path).with_collect_state_changes(true);
373 let manager = RecoveryManager::new(config);
374 manager.recover()
375 }
376}
377
378pub fn validate_checkpoint(
384 checkpoint_dir: &Path,
385) -> Result<IncrementalCheckpointMetadata, IncrementalCheckpointError> {
386 let metadata_path = checkpoint_dir.join("metadata.json");
387
388 if !metadata_path.exists() {
389 return Err(IncrementalCheckpointError::NotFound(
390 "metadata.json not found".to_string(),
391 ));
392 }
393
394 let metadata_json = fs::read_to_string(&metadata_path)?;
395 let metadata = IncrementalCheckpointMetadata::from_json(&metadata_json)?;
396
397 let state_path = checkpoint_dir.join("state.bin");
399 if state_path.exists() {
400 let state_data = fs::read(&state_path)?;
401 StateSnapshot::from_bytes(&state_data)
402 .map_err(|e| IncrementalCheckpointError::Corruption(e.to_string()))?;
403 }
404
405 Ok(metadata)
406}
407
408pub fn wal_size(wal_path: &Path) -> Result<u64, IncrementalCheckpointError> {
414 if !wal_path.exists() {
415 return Ok(0);
416 }
417
418 let metadata = fs::metadata(wal_path)?;
419 Ok(metadata.len())
420}
421
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tempfile::TempDir;

    // A fresh empty state reports zeroed counters and no recovered data.
    #[test]
    fn test_recovered_state_empty() {
        let state = RecoveredState::empty();
        assert_eq!(state.epoch, 0);
        assert!(state.state_snapshot.is_none());
        assert!(state.source_offsets.is_empty());
        assert!(!state.has_state());
    }

    // Builder methods set each configuration knob independently.
    #[test]
    fn test_recovery_config() {
        let config = RecoveryConfig::new(Path::new("/checkpoints"), Path::new("/wal.log"))
            .with_repair_wal(true)
            .with_collect_state_changes(true)
            .with_max_wal_entries(1000);

        assert!(config.repair_wal);
        assert!(config.collect_state_changes);
        assert_eq!(config.max_wal_entries, 1000);
    }

    // With neither a checkpoint nor a WAL file, recovery yields an empty state.
    #[test]
    fn test_recovery_no_checkpoint_no_wal() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        fs::create_dir_all(&checkpoint_dir).unwrap();

        let config = RecoveryConfig::new(&checkpoint_dir, &wal_path);
        let manager = RecoveryManager::new(config);

        let result = manager.recover().unwrap();
        assert_eq!(result.epoch, 0);
        assert!(result.state_snapshot.is_none());
        assert!(result.checkpoint_id.is_none());
        assert_eq!(result.wal_entries_replayed, 0);
    }

    // Checkpoint-only recovery restores epoch, offsets, watermark, and snapshot.
    #[test]
    fn test_recovery_with_checkpoint_only() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        // Write a checkpoint containing two key/value pairs at epoch 42.
        let config = CheckpointConfig::new(&checkpoint_dir);
        let mut ckpt_manager = IncrementalCheckpointManager::new(config).unwrap();
        ckpt_manager.set_epoch(42);

        let mut offsets = HashMap::new();
        offsets.insert("source1".to_string(), 100);

        let state_data = StateSnapshot::new(vec![
            (b"key1".to_vec(), b"value1".to_vec()),
            (b"key2".to_vec(), b"value2".to_vec()),
        ])
        .to_bytes()
        .unwrap();

        let metadata = ckpt_manager
            .create_checkpoint_with_state(500, offsets, Some(5000), &state_data)
            .unwrap();

        // Recover and verify every checkpoint field made it through.
        let recovery_config = RecoveryConfig::new(&checkpoint_dir, &wal_path);
        let recovery_manager = RecoveryManager::new(recovery_config);

        let result = recovery_manager.recover().unwrap();
        assert_eq!(result.epoch, 42);
        assert_eq!(result.checkpoint_id, Some(metadata.id));
        assert_eq!(result.wal_position, 500);
        assert_eq!(result.watermark, Some(5000));
        assert_eq!(result.source_offsets.get("source1"), Some(&100));
        assert!(result.state_snapshot.is_some());

        let snapshot = result.state_snapshot.unwrap();
        assert_eq!(snapshot.len(), 2);
    }

    // WAL-only recovery replays puts and commits; the commit carries
    // offsets and the watermark, and puts become collected state changes.
    #[test]
    fn test_recovery_with_wal_only() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        fs::create_dir_all(&checkpoint_dir).unwrap();

        // Scope the WAL so it is flushed and closed before recovery reads it.
        {
            let mut wal = WriteAheadLog::new(&wal_path, Duration::from_millis(10)).unwrap();
            wal.set_sync_on_write(true);

            wal.append(&WalEntry::Put {
                key: b"key1".to_vec(),
                value: b"value1".to_vec(),
            })
            .unwrap();

            wal.append(&WalEntry::Put {
                key: b"key2".to_vec(),
                value: b"value2".to_vec(),
            })
            .unwrap();

            let mut offsets = HashMap::new();
            offsets.insert("source1".to_string(), 50);
            wal.append(&WalEntry::Commit {
                offsets,
                watermark: Some(1000),
            })
            .unwrap();

            wal.sync().unwrap();
        }

        let config =
            RecoveryConfig::new(&checkpoint_dir, &wal_path).with_collect_state_changes(true);
        let manager = RecoveryManager::new(config);

        let result = manager.recover().unwrap();
        assert!(result.checkpoint_id.is_none());
        // 3 entries total: 2 puts + 1 commit; only puts count as changes.
        assert_eq!(result.wal_entries_replayed, 3);
        assert_eq!(result.watermark, Some(1000));
        assert_eq!(result.source_offsets.get("source1"), Some(&50));
        assert_eq!(result.state_changes.len(), 2);
    }

    // Checkpoint plus WAL: replayed WAL values (offsets, watermark)
    // override what the checkpoint recorded.
    #[test]
    fn test_recovery_checkpoint_plus_wal() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        // Checkpoint at epoch 10 with one key and watermark 1000.
        let config = CheckpointConfig::new(&checkpoint_dir);
        let mut ckpt_manager = IncrementalCheckpointManager::new(config).unwrap();
        ckpt_manager.set_epoch(10);

        let state_data = StateSnapshot::new(vec![(b"key1".to_vec(), b"value1".to_vec())])
            .to_bytes()
            .unwrap();

        ckpt_manager
            .create_checkpoint_with_state(0, HashMap::new(), Some(1000), &state_data)
            .unwrap();

        // WAL after the checkpoint: one put, one delete, one commit.
        {
            let mut wal = WriteAheadLog::new(&wal_path, Duration::from_millis(10)).unwrap();
            wal.set_sync_on_write(true);

            wal.append(&WalEntry::Put {
                key: b"key2".to_vec(),
                value: b"value2".to_vec(),
            })
            .unwrap();

            wal.append(&WalEntry::Delete {
                key: b"key1".to_vec(),
            })
            .unwrap();

            let mut offsets = HashMap::new();
            offsets.insert("source1".to_string(), 100);
            wal.append(&WalEntry::Commit {
                offsets,
                watermark: Some(2000),
            })
            .unwrap();

            wal.sync().unwrap();
        }

        let recovery_config =
            RecoveryConfig::new(&checkpoint_dir, &wal_path).with_collect_state_changes(true);
        let recovery_manager = RecoveryManager::new(recovery_config);

        let result = recovery_manager.recover().unwrap();
        assert!(result.checkpoint_id.is_some());
        assert_eq!(result.wal_entries_replayed, 3);
        // WAL watermark (2000) supersedes the checkpoint's (1000).
        assert_eq!(result.watermark, Some(2000)); assert_eq!(result.source_offsets.get("source1"), Some(&100));
        assert!(result.state_snapshot.is_some());
        assert_eq!(result.state_changes.len(), 2);
    }

    // validate_checkpoint accepts a freshly written checkpoint directory.
    #[test]
    fn test_validate_checkpoint() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");

        let config = CheckpointConfig::new(&checkpoint_dir);
        let mut manager = IncrementalCheckpointManager::new(config).unwrap();
        manager.set_epoch(5);

        let state_data = StateSnapshot::new(vec![(b"key".to_vec(), b"value".to_vec())])
            .to_bytes()
            .unwrap();

        let metadata = manager
            .create_checkpoint_with_state(100, HashMap::new(), None, &state_data)
            .unwrap();

        // Validation is done on the individual checkpoint's own directory.
        let checkpoint_path = metadata.checkpoint_path(&checkpoint_dir);
        let validated = validate_checkpoint(&checkpoint_path).unwrap();
        assert_eq!(validated.id, metadata.id);
        assert_eq!(validated.epoch, 5);
    }

    // wal_size reports 0 for a missing file and > 0 after an append + sync.
    #[test]
    fn test_wal_size() {
        let temp_dir = TempDir::new().unwrap();
        let wal_path = temp_dir.path().join("wal.log");

        assert_eq!(wal_size(&wal_path).unwrap(), 0);

        {
            let mut wal = WriteAheadLog::new(&wal_path, Duration::from_millis(10)).unwrap();
            wal.append(&WalEntry::Put {
                key: b"key".to_vec(),
                value: b"value".to_vec(),
            })
            .unwrap();
            wal.sync().unwrap();
        }

        let size = wal_size(&wal_path).unwrap();
        assert!(size > 0);
    }

    // max_wal_entries caps replay: 100 entries written, only 10 replayed.
    #[test]
    fn test_recovery_max_entries() {
        let temp_dir = TempDir::new().unwrap();
        let checkpoint_dir = temp_dir.path().join("checkpoints");
        let wal_path = temp_dir.path().join("wal.log");

        fs::create_dir_all(&checkpoint_dir).unwrap();

        {
            let mut wal = WriteAheadLog::new(&wal_path, Duration::from_millis(10)).unwrap();
            wal.set_sync_on_write(true);

            for i in 0..100 {
                wal.append(&WalEntry::Put {
                    key: format!("key{i}").into_bytes(),
                    value: format!("value{i}").into_bytes(),
                })
                .unwrap();
            }
            wal.sync().unwrap();
        }

        let config = RecoveryConfig::new(&checkpoint_dir, &wal_path).with_max_wal_entries(10);
        let manager = RecoveryManager::new(config);

        let result = manager.recover().unwrap();
        assert_eq!(result.wal_entries_replayed, 10);
    }
}