1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
27
28use bytes::Bytes;
29use laminar_core::state::StateBackend;
30use laminar_core::storage::checkpoint_manifest::{CheckpointManifest, SinkCommitStatus};
31use laminar_core::storage::checkpoint_store::CheckpointStore;
32use laminar_core::storage::ValidationResult;
33use tracing::{debug, error, info, warn};
34
35use crate::checkpoint_coordinator::{
36 connector_to_source_checkpoint, RegisteredSink, RegisteredSource,
37};
38use crate::error::DbError;
39
40#[derive(Debug)]
42pub struct RecoveredState {
43 pub manifest: CheckpointManifest,
45 pub sources_restored: usize,
47 pub tables_restored: usize,
49 pub sinks_rolled_back: usize,
51 pub source_errors: HashMap<String, String>,
53 pub sink_errors: HashMap<String, String>,
55}
56
57impl RecoveredState {
58 #[must_use]
60 pub fn epoch(&self) -> u64 {
61 self.manifest.epoch
62 }
63
64 #[must_use]
66 pub fn watermark(&self) -> Option<i64> {
67 self.manifest.watermark
68 }
69
70 #[must_use]
72 pub fn has_errors(&self) -> bool {
73 !self.source_errors.is_empty() || !self.sink_errors.is_empty()
74 }
75
76 #[must_use]
78 pub fn operator_states(
79 &self,
80 ) -> &HashMap<String, laminar_core::storage::checkpoint_manifest::OperatorCheckpoint> {
81 &self.manifest.operator_states
82 }
83
84 #[must_use]
86 pub fn table_store_checkpoint_path(&self) -> Option<&str> {
87 self.manifest.table_store_checkpoint_path.as_deref()
88 }
89}
90
91#[derive(Debug, Default)]
97pub struct VnodeRehydration {
98 pub epoch: Option<u64>,
102 pub restored: HashMap<u32, Bytes>,
105 pub missing: Vec<u32>,
108 pub errors: HashMap<u32, String>,
110}
111
112impl VnodeRehydration {
113 #[must_use]
115 pub fn restored_count(&self) -> usize {
116 self.restored.len()
117 }
118
119 #[must_use]
121 pub fn has_errors(&self) -> bool {
122 !self.errors.is_empty()
123 }
124}
125
126pub struct VnodeRehydrator<'a> {
138 backend: &'a dyn StateBackend,
139}
140
141impl<'a> VnodeRehydrator<'a> {
142 #[must_use]
144 pub fn new(backend: &'a dyn StateBackend) -> Self {
145 Self { backend }
146 }
147
148 pub async fn rehydrate(&self, vnodes: &[u32]) -> VnodeRehydration {
156 let mut report = VnodeRehydration::default();
157 if vnodes.is_empty() {
158 return report;
159 }
160
161 let epoch = match self.backend.latest_committed_epoch().await {
162 Ok(Some(epoch)) => epoch,
163 Ok(None) => {
164 debug!(
165 vnodes = ?vnodes,
166 "rehydrate: no committed epoch on backend — vnodes start fresh"
167 );
168 report.missing = vnodes.to_vec();
169 return report;
170 }
171 Err(e) => {
172 warn!(
173 error = %e,
174 "[LDB-6050] rehydrate: latest_committed_epoch failed — \
175 vnodes start fresh"
176 );
177 report.missing = vnodes.to_vec();
178 return report;
179 }
180 };
181 report.epoch = Some(epoch);
182
183 for &vnode in vnodes {
184 match self.backend.read_partial(vnode, epoch).await {
185 Ok(Some(bytes)) => {
186 let bytes = match crate::vnode_partial::VnodePartial::decode(&bytes) {
192 Ok(p) => match p.base_epoch {
193 Some(base) => match self.backend.read_partial(vnode, base).await {
194 Ok(Some(base_bytes)) => base_bytes,
195 Ok(None) => {
196 warn!(
197 vnode,
198 epoch,
199 base,
200 "[LDB-6052] reference partial's base is missing — \
201 vnode starts fresh"
202 );
203 report.missing.push(vnode);
204 continue;
205 }
206 Err(e) => {
207 warn!(
208 vnode, epoch, base, error = %e,
209 "[LDB-6051] rehydrate: base read failed — vnode starts fresh"
210 );
211 report.errors.insert(vnode, e.to_string());
212 continue;
213 }
214 },
215 None => bytes,
216 },
217 Err(_) => bytes,
218 };
219 debug!(
220 vnode,
221 epoch,
222 bytes = bytes.len(),
223 "rehydrated vnode partial"
224 );
225 report.restored.insert(vnode, bytes);
226 }
227 Ok(None) => {
228 debug!(
229 vnode,
230 epoch, "rehydrate: no partial for vnode at committed epoch"
231 );
232 report.missing.push(vnode);
233 }
234 Err(e) => {
235 warn!(
236 vnode,
237 epoch,
238 error = %e,
239 "[LDB-6051] rehydrate: read_partial failed — vnode starts fresh"
240 );
241 report.errors.insert(vnode, e.to_string());
242 }
243 }
244 }
245
246 info!(
247 epoch,
248 restored = report.restored.len(),
249 missing = report.missing.len(),
250 errors = report.errors.len(),
251 "vnode rehydration complete"
252 );
253 report
254 }
255}
256
/// Restores sources, table sources and sinks from the newest usable
/// checkpoint in a [`CheckpointStore`].
pub struct RecoveryManager<'a> {
    /// Store the checkpoint manifests (and sidecar state) are loaded from.
    store: &'a dyn CheckpointStore,
    /// When `true`, a checkpoint whose restore reports any source or sink
    /// error (including an unresolved sidecar) is rejected and older
    /// checkpoints are tried; when `false`, such errors are only reported.
    strict: bool,
}
270
271impl<'a> RecoveryManager<'a> {
272 #[must_use]
277 pub fn new(store: &'a dyn CheckpointStore) -> Self {
278 Self {
279 store,
280 strict: true,
281 }
282 }
283
284 #[must_use]
290 pub fn lenient(store: &'a dyn CheckpointStore) -> Self {
291 Self {
292 store,
293 strict: false,
294 }
295 }
296
297 pub(crate) async fn recover(
319 &self,
320 sources: &[RegisteredSource],
321 sinks: &[RegisteredSink],
322 table_sources: &[RegisteredSource],
323 ) -> Result<Option<RecoveredState>, DbError> {
324 match self.store.load_latest().await {
326 Ok(Some(manifest)) => {
327 if self.is_checkpoint_corrupt(&manifest).await {
328 warn!(
329 checkpoint_id = manifest.checkpoint_id,
330 "[LDB-6010] latest checkpoint corrupt, trying fallback"
331 );
332 } else if Self::has_pending_sinks(&manifest) {
333 warn!(
334 checkpoint_id = manifest.checkpoint_id,
335 epoch = manifest.epoch,
336 "[LDB-6015] checkpoint has uncommitted sinks — source offsets \
337 may be past uncommitted data, falling back to previous checkpoint"
338 );
339 } else {
340 let state = self
341 .restore_from(manifest, sources, sinks, table_sources)
342 .await;
343 if let Err(e) = self.check_strict(&state) {
344 warn!(
345 checkpoint_id = state.manifest.checkpoint_id,
346 error = %e,
347 "latest checkpoint restore had strict errors, trying fallback"
348 );
349 } else {
350 return Ok(Some(state));
351 }
352 }
353 }
354 Ok(None) => {
355 info!("no checkpoint found, starting fresh");
356 return Ok(None);
357 }
358 Err(e) => {
359 warn!(error = %e, "latest checkpoint load failed, trying fallback");
360 }
361 }
362
363 let checkpoints = self.store.list().await.map_err(DbError::from)?;
365
366 if checkpoints.is_empty() {
367 warn!("no checkpoints available for fallback, starting fresh");
368 return Ok(None);
369 }
370
371 for &(checkpoint_id, _epoch) in checkpoints.iter().rev() {
372 match self.store.load_by_id(checkpoint_id).await {
373 Ok(Some(manifest)) => {
374 if self.is_checkpoint_corrupt(&manifest).await {
375 warn!(
376 checkpoint_id,
377 "[LDB-6010] fallback checkpoint corrupt, skipping"
378 );
379 continue;
380 }
381 if Self::has_pending_sinks(&manifest) {
382 warn!(
383 checkpoint_id,
384 "[LDB-6015] fallback checkpoint has uncommitted sinks, skipping"
385 );
386 continue;
387 }
388 info!(checkpoint_id, "recovering from fallback checkpoint");
389 let state = self
390 .restore_from(manifest, sources, sinks, table_sources)
391 .await;
392 if let Err(e) = self.check_strict(&state) {
393 warn!(
394 checkpoint_id,
395 error = %e,
396 "fallback checkpoint restore had strict errors, trying next"
397 );
398 continue;
399 }
400 return Ok(Some(state));
401 }
402 Ok(None) => {
403 debug!(checkpoint_id, "fallback checkpoint not found, skipping");
404 }
405 Err(e) => {
406 warn!(
407 checkpoint_id,
408 error = %e,
409 "fallback checkpoint load failed, trying next"
410 );
411 }
412 }
413 }
414
415 warn!("all checkpoints failed to load, starting fresh");
416 Ok(None)
417 }
418
419 async fn resolve_external_states(&self, manifest: &mut CheckpointManifest) -> bool {
430 let external_ops: Vec<String> = manifest
431 .operator_states
432 .iter()
433 .filter(|(_, op)| op.external)
434 .map(|(name, _)| name.clone())
435 .collect();
436
437 if external_ops.is_empty() {
438 return true;
439 }
440
441 let state_data = match self.store.load_state_data(manifest.checkpoint_id).await {
442 Ok(Some(data)) => data,
443 Ok(None) => {
444 error!(
445 checkpoint_id = manifest.checkpoint_id,
446 operators = ?external_ops,
447 "[LDB-6010] sidecar state.bin missing — external operator states \
448 cannot be resolved; operators will start with empty state"
449 );
450 for name in &external_ops {
453 if let Some(op) = manifest.operator_states.get_mut(name) {
454 *op =
455 laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::inline(
456 &[],
457 );
458 }
459 }
460 return false;
461 }
462 Err(e) => {
463 error!(
464 checkpoint_id = manifest.checkpoint_id,
465 error = %e,
466 operators = ?external_ops,
467 "[LDB-6010] failed to load sidecar state.bin — external operator states \
468 cannot be resolved; operators will start with empty state"
469 );
470 for name in &external_ops {
471 if let Some(op) = manifest.operator_states.get_mut(name) {
472 *op =
473 laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::inline(
474 &[],
475 );
476 }
477 }
478 return false;
479 }
480 };
481
482 let mut all_resolved = true;
483 for (name, op) in &mut manifest.operator_states {
484 if op.external {
485 let range = match (
489 usize::try_from(op.external_offset),
490 usize::try_from(op.external_length),
491 ) {
492 (Ok(start), Ok(len)) => start.checked_add(len).map(|end| (start, end)),
493 _ => None,
494 }
495 .filter(|&(_, end)| end <= state_data.len());
496 if let Some((start, end)) = range {
497 let external_offset = op.external_offset;
498 let external_length = op.external_length;
499 let data = &state_data[start..end];
500 *op = laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::inline(
501 data,
502 );
503 debug!(
504 operator = %name,
505 offset = external_offset,
506 length = external_length,
507 "resolved external operator state from sidecar"
508 );
509 } else {
510 error!(
511 operator = %name,
512 offset = op.external_offset,
513 length = op.external_length,
514 sidecar_len = state_data.len(),
515 "[LDB-6010] sidecar too small or offset/length out of \
516 range for external operator state — operator will \
517 start with empty state"
518 );
519 *op =
520 laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
521 all_resolved = false;
522 }
523 }
524 }
525 all_resolved
526 }
527
528 #[allow(clippy::too_many_lines)]
533 async fn restore_from(
534 &self,
535 mut manifest: CheckpointManifest,
536 sources: &[RegisteredSource],
537 sinks: &[RegisteredSink],
538 table_sources: &[RegisteredSource],
539 ) -> RecoveredState {
540 let sidecar_ok = self.resolve_external_states(&mut manifest).await;
544 if !sidecar_ok && self.strict {
545 warn!(
546 checkpoint_id = manifest.checkpoint_id,
547 "[LDB-6010] sidecar resolution failed in strict mode — \
548 checkpoint will be rejected"
549 );
550 }
551
552 let validation_errors =
556 manifest.validate(laminar_core::storage::checkpoint_manifest::DEFAULT_VNODE_COUNT);
557 if !validation_errors.is_empty() {
558 for err in &validation_errors {
559 warn!(
560 checkpoint_id = manifest.checkpoint_id,
561 error = %err,
562 "manifest validation warning"
563 );
564 }
565 }
566
567 if !manifest.source_names.is_empty() {
570 let mut current_sources: Vec<&str> = sources.iter().map(|s| s.name.as_str()).collect();
571 current_sources.sort_unstable();
572 let checkpoint_sources: Vec<&str> =
573 manifest.source_names.iter().map(String::as_str).collect();
574 let added: Vec<&&str> = current_sources
575 .iter()
576 .filter(|n| !checkpoint_sources.contains(n))
577 .collect();
578 let removed: Vec<&&str> = checkpoint_sources
579 .iter()
580 .filter(|n| !current_sources.contains(n))
581 .collect();
582 if !added.is_empty() {
583 warn!(
584 sources = ?added,
585 "new sources added since checkpoint — no saved offsets"
586 );
587 }
588 if !removed.is_empty() {
589 warn!(
590 sources = ?removed,
591 "sources removed since checkpoint — orphaned offsets"
592 );
593 }
594 }
595 if !manifest.sink_names.is_empty() {
596 let mut current_sinks: Vec<&str> = sinks.iter().map(|s| s.name.as_str()).collect();
597 current_sinks.sort_unstable();
598 let checkpoint_sinks: Vec<&str> =
599 manifest.sink_names.iter().map(String::as_str).collect();
600 let added: Vec<&&str> = current_sinks
601 .iter()
602 .filter(|n| !checkpoint_sinks.contains(n))
603 .collect();
604 let removed: Vec<&&str> = checkpoint_sinks
605 .iter()
606 .filter(|n| !current_sinks.contains(n))
607 .collect();
608 if !added.is_empty() {
609 warn!(
610 sinks = ?added,
611 "new sinks added since checkpoint — no saved epoch"
612 );
613 }
614 if !removed.is_empty() {
615 warn!(
616 sinks = ?removed,
617 "sinks removed since checkpoint — orphaned epochs"
618 );
619 }
620 }
621
622 info!(
623 checkpoint_id = manifest.checkpoint_id,
624 epoch = manifest.epoch,
625 validation_warnings = validation_errors.len(),
626 "recovering from checkpoint"
627 );
628
629 let mut result = RecoveredState {
630 manifest: manifest.clone(),
631 sources_restored: 0,
632 tables_restored: 0,
633 sinks_rolled_back: 0,
634 source_errors: HashMap::new(),
635 sink_errors: HashMap::new(),
636 };
637
638 if !sidecar_ok {
640 result.source_errors.insert(
641 "__sidecar__".into(),
642 "[LDB-6010] sidecar state.bin missing or truncated — \
643 operator state cannot be fully restored"
644 .into(),
645 );
646 }
647
648 for source in sources {
650 if !source.supports_replay {
651 info!(
652 source = %source.name,
653 "skipping restore for non-replayable source (at-most-once)"
654 );
655 continue;
656 }
657 if let Some(cp) = manifest.source_offsets.get(&source.name) {
658 let source_cp = connector_to_source_checkpoint(cp);
659 let mut last_err = None;
660 for attempt in 0..3u32 {
661 let mut connector = source.connector.lock().await;
662 match connector.restore(&source_cp).await {
663 Ok(()) => {
664 last_err = None;
665 break;
666 }
667 Err(e) => {
668 warn!(
669 source = %source.name, attempt,
670 error = %e, "source restore failed, retrying"
671 );
672 last_err = Some(e);
673 drop(connector);
674 if attempt < 2 {
675 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
676 }
677 }
678 }
679 }
680 if let Some(e) = last_err {
681 let msg = format!("source restore failed after 3 attempts: {e}");
682 result.source_errors.insert(source.name.clone(), msg);
683 } else {
684 result.sources_restored += 1;
685 debug!(source = %source.name, epoch = cp.epoch, "source restored");
686 }
687 }
688 }
689
690 for table_source in table_sources {
692 if let Some(cp) = manifest.table_offsets.get(&table_source.name) {
693 let source_cp = connector_to_source_checkpoint(cp);
694 let mut connector = table_source.connector.lock().await;
695 match connector.restore(&source_cp).await {
696 Ok(()) => {
697 result.tables_restored += 1;
698 debug!(table = %table_source.name, epoch = cp.epoch, "table source restored");
699 }
700 Err(e) => {
701 let msg = format!("table source restore failed: {e}");
702 warn!(table = %table_source.name, error = %e, "table source restore failed");
703 result.source_errors.insert(table_source.name.clone(), msg);
704 }
705 }
706 }
707 }
708
709 for sink in sinks {
712 if sink.exactly_once {
713 let already_committed = manifest
715 .sink_commit_statuses
716 .get(&sink.name)
717 .is_some_and(|s| matches!(s, SinkCommitStatus::Committed));
718
719 if already_committed {
720 debug!(
721 sink = %sink.name,
722 epoch = manifest.epoch,
723 "sink already committed, skipping rollback"
724 );
725 continue;
726 }
727
728 match sink.handle.rollback_epoch(manifest.epoch).await {
729 Ok(()) => {
730 result.sinks_rolled_back += 1;
731 debug!(sink = %sink.name, epoch = manifest.epoch, "sink rolled back");
732 }
733 Err(e) => {
734 result
735 .sink_errors
736 .insert(sink.name.clone(), format!("rollback failed: {e}"));
737 warn!(
738 sink = %sink.name,
739 epoch = manifest.epoch,
740 error = %e,
741 "[LDB-6016] sink rollback failed during recovery"
742 );
743 }
744 }
745 }
746 }
747
748 info!(
749 checkpoint_id = manifest.checkpoint_id,
750 epoch = manifest.epoch,
751 sources_restored = result.sources_restored,
752 tables_restored = result.tables_restored,
753 sinks_rolled_back = result.sinks_rolled_back,
754 errors = result.source_errors.len() + result.sink_errors.len(),
755 "recovery complete"
756 );
757
758 result
759 }
760
761 async fn is_checkpoint_corrupt(&self, manifest: &CheckpointManifest) -> bool {
778 if manifest.state_checksum.is_none() && manifest.operator_states.is_empty() {
781 return false;
782 }
783 match self.store.validate_checkpoint(manifest.checkpoint_id).await {
784 Ok(ValidationResult {
785 valid: false,
786 ref issues,
787 ..
788 }) => {
789 error!(
790 checkpoint_id = manifest.checkpoint_id,
791 issues = ?issues,
792 "[LDB-6010] checkpoint integrity check failed"
793 );
794 true
795 }
796 Ok(_) => false, Err(e) => {
798 error!(
801 checkpoint_id = manifest.checkpoint_id,
802 error = %e,
803 "[LDB-6010] checkpoint validation I/O error — \
804 treating as corrupt for safety"
805 );
806 true
807 }
808 }
809 }
810
811 fn has_pending_sinks(manifest: &CheckpointManifest) -> bool {
818 manifest
819 .sink_commit_statuses
820 .values()
821 .any(|s| matches!(s, SinkCommitStatus::Pending))
822 }
823
824 fn check_strict(&self, state: &RecoveredState) -> Result<(), DbError> {
825 if !self.strict || !state.has_errors() {
826 return Ok(());
827 }
828 let mut msgs: Vec<String> = state
829 .source_errors
830 .iter()
831 .map(|(k, v)| format!("source '{k}': {v}"))
832 .collect();
833 for (k, v) in &state.sink_errors {
834 msgs.push(format!("sink '{k}': {v}"));
835 }
836 Err(DbError::Checkpoint(format!(
837 "strict recovery failed — {} restore error(s): {}",
838 msgs.len(),
839 msgs.join("; ")
840 )))
841 }
842
843 pub async fn load_latest(&self) -> Result<Option<CheckpointManifest>, DbError> {
851 self.store.load_latest().await.map_err(DbError::from)
852 }
853
854 pub async fn load_by_id(
860 &self,
861 checkpoint_id: u64,
862 ) -> Result<Option<CheckpointManifest>, DbError> {
863 self.store
864 .load_by_id(checkpoint_id)
865 .await
866 .map_err(DbError::from)
867 }
868}
869
#[cfg(test)]
mod tests {
    // Checkpoint-level recovery tests. Each test uses a
    // `FileSystemCheckpointStore` rooted in a fresh temporary directory.
    use super::*;
    use laminar_core::storage::checkpoint_manifest::OperatorCheckpoint;
    use laminar_core::storage::checkpoint_store::FileSystemCheckpointStore;

    // Builds a file-system checkpoint store rooted at `dir`. The `3` is
    // passed straight to `FileSystemCheckpointStore::new` (presumably a
    // retention count — TODO confirm).
    fn make_store(dir: &std::path::Path) -> FileSystemCheckpointStore {
        FileSystemCheckpointStore::new(dir, 3)
    }

    // An empty store yields `Ok(None)`: the pipeline starts fresh.
    #[tokio::test]
    async fn test_recover_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(result.is_none());
    }

    // A manifest with no registered components restores cleanly and reports
    // its epoch with all counters at zero.
    #[tokio::test]
    async fn test_recover_empty_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Checkpoint id 1 at epoch 5.
        let manifest = CheckpointManifest::new(1, 5);
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.epoch(), 5);
        assert_eq!(result.sources_restored, 0);
        assert_eq!(result.tables_restored, 0);
        assert_eq!(result.sinks_rolled_back, 0);
        assert!(!result.has_errors());
    }

    // The saved watermark is surfaced through `RecoveredState::watermark`.
    #[tokio::test]
    async fn test_recover_with_watermark() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 3);
        manifest.watermark = Some(42_000);
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.watermark(), Some(42_000));
    }

    // Inline operator states round-trip through save + recover unchanged.
    #[tokio::test]
    async fn test_recover_with_operator_states() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 7);
        manifest
            .operator_states
            .insert("0".to_string(), OperatorCheckpoint::inline(b"window-state"));
        manifest
            .operator_states
            .insert("3".to_string(), OperatorCheckpoint::inline(b"filter-state"));
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.operator_states().len(), 2);
        let op0 = result.operator_states().get("0").unwrap();
        assert_eq!(op0.decode_inline().unwrap(), b"window-state");
    }

    // The table-store checkpoint path is carried through recovery as-is.
    #[tokio::test]
    async fn test_recover_table_store_path() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.table_store_checkpoint_path = Some("/data/table_store_cp_001".into());
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(
            result.table_store_checkpoint_path(),
            Some("/data/table_store_cp_001")
        );
    }

    // `load_latest` on an empty store returns `Ok(None)`.
    #[tokio::test]
    async fn test_load_latest_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        assert!(mgr.load_latest().await.unwrap().is_none());
    }

    // `load_by_id` finds each saved checkpoint and returns `Ok(None)` for an
    // unknown id.
    #[tokio::test]
    async fn test_load_by_id() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        store.save(&CheckpointManifest::new(1, 1)).await.unwrap();
        store.save(&CheckpointManifest::new(2, 2)).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let m = mgr.load_by_id(1).await.unwrap().unwrap();
        assert_eq!(m.checkpoint_id, 1);

        let m2 = mgr.load_by_id(2).await.unwrap().unwrap();
        assert_eq!(m2.checkpoint_id, 2);

        assert!(mgr.load_by_id(999).await.unwrap().is_none());
    }

    // If the newest manifest on disk is unreadable, recovery falls back to the
    // previous checkpoint.
    #[tokio::test]
    async fn test_recover_fallback_to_previous_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        let mut m1 = CheckpointManifest::new(1, 10);
        m1.watermark = Some(1000);
        store.save(&m1).await.unwrap();

        let mut m2 = CheckpointManifest::new(2, 20);
        m2.watermark = Some(2000);
        store.save(&m2).await.unwrap();

        // Overwrite checkpoint 2's manifest with invalid JSON. This relies on
        // the store's on-disk layout:
        // `<root>/checkpoints/checkpoint_<id:06>/manifest.json`.
        let latest_manifest_path = dir
            .path()
            .join("checkpoints")
            .join("checkpoint_000002")
            .join("manifest.json");
        std::fs::write(&latest_manifest_path, "not valid json!!!").unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();

        let recovered = result.expect("should recover from fallback checkpoint");
        // Everything must come from checkpoint 1, not the corrupted checkpoint 2.
        assert_eq!(recovered.manifest.checkpoint_id, 1);
        assert_eq!(recovered.epoch(), 10);
        assert_eq!(recovered.watermark(), Some(1000));
    }

    // When the only checkpoint's manifest is unreadable, recovery starts fresh
    // instead of failing.
    #[tokio::test]
    async fn test_recover_all_checkpoints_corrupt_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        store.save(&CheckpointManifest::new(1, 5)).await.unwrap();

        let manifest_path = dir
            .path()
            .join("checkpoints")
            .join("checkpoint_000001")
            .join("manifest.json");
        std::fs::write(&manifest_path, "corrupt").unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();

        assert!(result.is_none());
    }

    // With two healthy checkpoints, the newest (id 2) is used directly.
    #[tokio::test]
    async fn test_recover_latest_ok_no_fallback_needed() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        store.save(&CheckpointManifest::new(1, 10)).await.unwrap();
        store.save(&CheckpointManifest::new(2, 20)).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.manifest.checkpoint_id, 2);
        assert_eq!(result.epoch(), 20);
    }

    // An external operator state pointing into the sidecar is resolved into
    // inline state containing exactly the referenced bytes.
    #[tokio::test]
    async fn test_recover_with_sidecar_state() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 5);
        // 2048 bytes at offset 0 of the sidecar.
        let large_data = vec![0xAB; 2048];
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 2048));

        // Sidecar data for checkpoint 1 is written before the manifest.
        store
            .save_state_data(1, &[bytes::Bytes::copy_from_slice(&large_data)])
            .await
            .unwrap();
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        let op = result.operator_states().get("big-op").unwrap();
        assert!(!op.external, "external state should be resolved to inline");
        assert_eq!(op.decode_inline().unwrap(), large_data);
    }

    // Inline and external operator states can coexist in one manifest; both
    // come back as inline data.
    #[tokio::test]
    async fn test_recover_mixed_inline_and_external() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 3);
        manifest
            .operator_states
            .insert("small-op".into(), OperatorCheckpoint::inline(b"tiny"));
        let large_data = vec![0xCD; 4096];
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 4096));

        store
            .save_state_data(1, &[bytes::Bytes::copy_from_slice(&large_data)])
            .await
            .unwrap();
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        let small = result.operator_states().get("small-op").unwrap();
        assert_eq!(small.decode_inline().unwrap(), b"tiny");

        let big = result.operator_states().get("big-op").unwrap();
        assert_eq!(big.decode_inline().unwrap(), large_data);
    }

    // In lenient mode a missing sidecar does not reject the checkpoint; the
    // unresolved external state is replaced with empty inline state.
    #[tokio::test]
    async fn test_recover_missing_sidecar_graceful() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 1);
        // External state, but no `save_state_data` call: the sidecar is missing.
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::lenient(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        let op = result.operator_states().get("orphan").unwrap();
        assert!(
            !op.external,
            "unresolved external state replaced with inline empty"
        );
        assert!(
            op.state_b64.as_ref().is_none_or(String::is_empty),
            "replaced state should be empty"
        );
    }

    // `has_errors` is false with no errors and true once any source error is
    // present.
    #[tokio::test]
    async fn test_recovered_state_has_errors() {
        let state = RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::new(),
            sink_errors: HashMap::new(),
        };
        assert!(!state.has_errors());

        let state_with_errors = RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::from([("source1".into(), "failed".into())]),
            sink_errors: HashMap::new(),
        };
        assert!(state_with_errors.has_errors());
    }

    // In strict mode (the `new` default) the same missing sidecar rejects the
    // only checkpoint, so recovery starts fresh.
    #[tokio::test]
    async fn test_recover_missing_sidecar_strict_rejects() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 1);
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(
            result.is_none(),
            "strict mode should reject checkpoint with missing sidecar"
        );
    }

    // A newest checkpoint with a `Pending` sink is skipped in favour of the
    // older checkpoint whose sinks are all `Committed`.
    #[tokio::test]
    async fn test_recover_skips_pending_sinks_falls_back() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut m1 = CheckpointManifest::new(1, 1);
        m1.sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Committed);
        store.save(&m1).await.unwrap();

        let mut m2 = CheckpointManifest::new(2, 2);
        m2.sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Pending);
        store.save(&m2).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        let state = result.expect("should recover from epoch 1 fallback");

        assert_eq!(
            state.epoch(),
            1,
            "recovery must skip checkpoint with Pending sinks"
        );
    }

    // If every checkpoint has a `Pending` sink, none is usable and recovery
    // starts fresh.
    #[tokio::test]
    async fn test_recover_all_pending_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut m = CheckpointManifest::new(1, 1);
        m.sink_commit_statuses
            .insert("sink".into(), SinkCommitStatus::Pending);
        store.save(&m).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(
            result.is_none(),
            "should start fresh when all checkpoints have pending sinks"
        );
    }
}
1250
#[cfg(test)]
mod rehydration_tests {
    // `VnodeRehydrator` tests against in-process and object-store state
    // backends.
    use super::*;
    use bytes::Bytes;
    use laminar_core::state::{InProcessBackend, ObjectStoreBackend};

    // Writes `tag` as the partial of every vnode in `vnodes` at `epoch`, then
    // asserts that `epoch_complete` returns true for that set. The tests below
    // rely on this making `epoch` the backend's latest committed epoch. The
    // third `write_partial` argument (`0`) is passed through unexplained
    // (meaning unknown from here — TODO confirm).
    async fn seal_epoch(backend: &dyn StateBackend, epoch: u64, vnodes: &[u32], tag: &[u8]) {
        for &v in vnodes {
            backend
                .write_partial(v, epoch, 0, Bytes::copy_from_slice(tag))
                .await
                .unwrap();
        }
        assert!(backend.epoch_complete(epoch, vnodes).await.unwrap());
    }

    // Vnodes with a partial at the committed epoch are restored; vnode 3 has
    // none and is reported missing (not as an error).
    #[tokio::test]
    async fn rehydrate_reads_committed_partials() {
        let backend = InProcessBackend::new(4);
        seal_epoch(&backend, 7, &[0, 1, 2], b"v7").await;

        let report = VnodeRehydrator::new(&backend).rehydrate(&[0, 1, 3]).await;

        assert_eq!(report.epoch, Some(7));
        assert_eq!(report.restored_count(), 2);
        // Raw tag bytes don't decode as a `VnodePartial`, so they are restored
        // verbatim.
        assert_eq!(report.restored.get(&0).map(|b| &b[..]), Some(&b"v7"[..]));
        assert_eq!(report.restored.get(&1).map(|b| &b[..]), Some(&b"v7"[..]));
        assert_eq!(report.missing, vec![3]);
        assert!(!report.has_errors());
    }

    // When several epochs are sealed, the newest one is read.
    #[tokio::test]
    async fn rehydrate_reads_latest_committed_epoch() {
        let backend = InProcessBackend::new(4);
        seal_epoch(&backend, 3, &[0, 1], b"old").await;
        seal_epoch(&backend, 9, &[0, 1], b"new").await;

        let report = VnodeRehydrator::new(&backend).rehydrate(&[0, 1]).await;

        assert_eq!(report.epoch, Some(9), "must read the highest sealed epoch");
        assert_eq!(report.restored.get(&0).map(|b| &b[..]), Some(&b"new"[..]));
    }

    // Epoch 6 holds only a reference partial (`base_epoch: Some(5)`).
    // Rehydration must return the full partial stored at epoch 5 instead.
    #[tokio::test]
    async fn rehydrate_resolves_reference_partials() {
        let backend = InProcessBackend::new(4);

        let full = crate::vnode_partial::VnodePartial {
            checkpoint_id: 1,
            operators: vec![("agg".into(), vec![1, 2, 3])],
            base_epoch: None,
        };
        backend
            .write_partial(0, 5, 0, Bytes::from(full.encode().unwrap()))
            .await
            .unwrap();
        assert!(backend.epoch_complete(5, &[0]).await.unwrap());

        let reference = crate::vnode_partial::VnodePartial {
            checkpoint_id: 2,
            operators: Vec::new(),
            base_epoch: Some(5),
        };
        backend
            .write_partial(0, 6, 0, Bytes::from(reference.encode().unwrap()))
            .await
            .unwrap();
        assert!(backend.epoch_complete(6, &[0]).await.unwrap());

        let report = VnodeRehydrator::new(&backend).rehydrate(&[0]).await;
        // The report names the committed epoch (6) even though the bytes come
        // from the base at epoch 5.
        assert_eq!(report.epoch, Some(6));
        let restored = crate::vnode_partial::VnodePartial::decode(
            report.restored.get(&0).expect("vnode restored"),
        )
        .unwrap();
        assert_eq!(
            restored.base_epoch, None,
            "the resolved partial must be the full base, not the reference",
        );
        assert_eq!(restored.operators[0].1, vec![1, 2, 3]);
    }

    // With nothing committed, every requested vnode is missing and no epoch is
    // reported.
    #[tokio::test]
    async fn rehydrate_no_committed_epoch_is_fresh() {
        let backend = InProcessBackend::new(4);
        let report = VnodeRehydrator::new(&backend).rehydrate(&[0, 1]).await;
        assert_eq!(report.epoch, None);
        assert!(report.restored.is_empty());
        assert_eq!(report.missing, vec![0, 1]);
    }

    // An empty request returns an empty report, even when an epoch is
    // committed.
    #[tokio::test]
    async fn rehydrate_empty_request_is_noop() {
        let backend = InProcessBackend::new(4);
        seal_epoch(&backend, 1, &[0], b"x").await;
        let report = VnodeRehydrator::new(&backend).rehydrate(&[]).await;
        assert_eq!(report.epoch, None);
        assert!(report.restored.is_empty());
        assert!(report.missing.is_empty());
    }

    // Same read path as above, but against an object-store backend on a local
    // temporary directory.
    #[tokio::test]
    async fn rehydrate_over_object_store_backend() {
        use object_store::local::LocalFileSystem;
        use object_store::ObjectStore;

        let dir = tempfile::tempdir().unwrap();
        let store: std::sync::Arc<dyn ObjectStore> =
            std::sync::Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
        let backend = ObjectStoreBackend::new(store, "node-0", 4);
        seal_epoch(&backend, 5, &[0, 1], b"durable").await;

        let report = VnodeRehydrator::new(&backend).rehydrate(&[0, 1]).await;

        assert_eq!(report.epoch, Some(5));
        assert_eq!(report.restored_count(), 2);
        assert_eq!(
            report.restored.get(&1).map(|b| &b[..]),
            Some(&b"durable"[..])
        );
    }
}