1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
27
28use laminar_storage::checkpoint_manifest::{CheckpointManifest, SinkCommitStatus};
29use laminar_storage::checkpoint_store::CheckpointStore;
30use laminar_storage::ValidationResult;
31use tracing::{debug, error, info, warn};
32
33use crate::checkpoint_coordinator::{
34 connector_to_source_checkpoint, RegisteredSink, RegisteredSource,
35};
36use crate::error::DbError;
37
38#[derive(Debug)]
40pub struct RecoveredState {
41 pub manifest: CheckpointManifest,
43 pub sources_restored: usize,
45 pub tables_restored: usize,
47 pub sinks_rolled_back: usize,
49 pub source_errors: HashMap<String, String>,
51 pub sink_errors: HashMap<String, String>,
53}
54
55impl RecoveredState {
56 #[must_use]
58 pub fn epoch(&self) -> u64 {
59 self.manifest.epoch
60 }
61
62 #[must_use]
64 pub fn watermark(&self) -> Option<i64> {
65 self.manifest.watermark
66 }
67
68 #[must_use]
70 pub fn has_errors(&self) -> bool {
71 !self.source_errors.is_empty() || !self.sink_errors.is_empty()
72 }
73
74 #[must_use]
76 pub fn operator_states(
77 &self,
78 ) -> &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
79 &self.manifest.operator_states
80 }
81
82 #[must_use]
84 pub fn wal_position(&self) -> u64 {
85 self.manifest.wal_position
86 }
87
88 #[must_use]
90 pub fn per_core_wal_positions(&self) -> &[u64] {
91 &self.manifest.per_core_wal_positions
92 }
93
94 #[must_use]
96 pub fn table_store_checkpoint_path(&self) -> Option<&str> {
97 self.manifest.table_store_checkpoint_path.as_deref()
98 }
99
100 #[must_use]
105 pub fn has_inflight_data(&self) -> bool {
106 !self.manifest.inflight_data.is_empty()
107 }
108
109 #[must_use]
113 pub fn inflight_data(
114 &self,
115 ) -> &HashMap<String, Vec<laminar_storage::checkpoint_manifest::InFlightRecord>> {
116 &self.manifest.inflight_data
117 }
118}
119
120pub struct RecoveryManager<'a> {
126 store: &'a dyn CheckpointStore,
127 strict: bool,
132}
133
134impl<'a> RecoveryManager<'a> {
135 #[must_use]
140 pub fn new(store: &'a dyn CheckpointStore) -> Self {
141 Self {
142 store,
143 strict: true,
144 }
145 }
146
147 #[must_use]
153 pub fn lenient(store: &'a dyn CheckpointStore) -> Self {
154 Self {
155 store,
156 strict: false,
157 }
158 }
159
160 pub(crate) async fn recover(
182 &self,
183 sources: &[RegisteredSource],
184 sinks: &[RegisteredSink],
185 table_sources: &[RegisteredSource],
186 ) -> Result<Option<RecoveredState>, DbError> {
187 match self.store.load_latest() {
189 Ok(Some(manifest)) => {
190 if self.is_checkpoint_corrupt(&manifest) {
191 warn!(
192 checkpoint_id = manifest.checkpoint_id,
193 "[LDB-6010] latest checkpoint corrupt, trying fallback"
194 );
195 } else {
196 let state = self
197 .restore_from(manifest, sources, sinks, table_sources)
198 .await;
199 if let Err(e) = self.check_strict(&state) {
200 warn!(
201 checkpoint_id = state.manifest.checkpoint_id,
202 error = %e,
203 "latest checkpoint restore had strict errors, trying fallback"
204 );
205 } else {
206 return Ok(Some(state));
207 }
208 }
209 }
210 Ok(None) => {
211 info!("no checkpoint found, starting fresh");
212 return Ok(None);
213 }
214 Err(e) => {
215 warn!(error = %e, "latest checkpoint load failed, trying fallback");
216 }
217 }
218
219 let checkpoints = self.store.list().map_err(|e| {
221 DbError::Checkpoint(format!("failed to list checkpoints for fallback: {e}"))
222 })?;
223
224 if checkpoints.is_empty() {
225 warn!("no checkpoints available for fallback, starting fresh");
226 return Ok(None);
227 }
228
229 for &(checkpoint_id, _epoch) in checkpoints.iter().rev() {
230 match self.store.load_by_id(checkpoint_id) {
231 Ok(Some(manifest)) => {
232 if self.is_checkpoint_corrupt(&manifest) {
233 warn!(
234 checkpoint_id,
235 "[LDB-6010] fallback checkpoint corrupt, skipping"
236 );
237 continue;
238 }
239 info!(checkpoint_id, "recovering from fallback checkpoint");
240 let state = self
241 .restore_from(manifest, sources, sinks, table_sources)
242 .await;
243 if let Err(e) = self.check_strict(&state) {
244 warn!(
245 checkpoint_id,
246 error = %e,
247 "fallback checkpoint restore had strict errors, trying next"
248 );
249 continue;
250 }
251 return Ok(Some(state));
252 }
253 Ok(None) => {
254 debug!(checkpoint_id, "fallback checkpoint not found, skipping");
255 }
256 Err(e) => {
257 warn!(
258 checkpoint_id,
259 error = %e,
260 "fallback checkpoint load failed, trying next"
261 );
262 }
263 }
264 }
265
266 warn!("all checkpoints failed to load, starting fresh");
267 Ok(None)
268 }
269
270 fn resolve_external_states(&self, manifest: &mut CheckpointManifest) {
276 let external_ops: Vec<String> = manifest
277 .operator_states
278 .iter()
279 .filter(|(_, op)| op.external)
280 .map(|(name, _)| name.clone())
281 .collect();
282
283 if external_ops.is_empty() {
284 return;
285 }
286
287 let state_data = match self.store.load_state_data(manifest.checkpoint_id) {
288 Ok(Some(data)) => data,
289 Ok(None) => {
290 error!(
291 checkpoint_id = manifest.checkpoint_id,
292 operators = ?external_ops,
293 "sidecar state.bin missing — external operator states \
294 cannot be resolved; operators will start with empty state"
295 );
296 for name in &external_ops {
299 if let Some(op) = manifest.operator_states.get_mut(name) {
300 *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
301 }
302 }
303 return;
304 }
305 Err(e) => {
306 error!(
307 checkpoint_id = manifest.checkpoint_id,
308 error = %e,
309 operators = ?external_ops,
310 "failed to load sidecar state.bin — external operator states \
311 cannot be resolved; operators will start with empty state"
312 );
313 for name in &external_ops {
314 if let Some(op) = manifest.operator_states.get_mut(name) {
315 *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
316 }
317 }
318 return;
319 }
320 };
321
322 for (name, op) in &mut manifest.operator_states {
323 if op.external {
324 #[allow(clippy::cast_possible_truncation)] let start = op.external_offset as usize;
326 #[allow(clippy::cast_possible_truncation)]
327 let end = start + op.external_length as usize;
328 if end <= state_data.len() {
329 let external_offset = op.external_offset;
330 let external_length = op.external_length;
331 let data = &state_data[start..end];
332 *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(data);
333 debug!(
334 operator = %name,
335 offset = external_offset,
336 length = external_length,
337 "resolved external operator state from sidecar"
338 );
339 } else {
340 error!(
341 operator = %name,
342 offset = start,
343 length = op.external_length,
344 sidecar_len = state_data.len(),
345 "sidecar too small for external operator state — \
346 operator will start with empty state"
347 );
348 *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
349 }
350 }
351 }
352 }
353
354 #[allow(clippy::too_many_lines)]
359 async fn restore_from(
360 &self,
361 mut manifest: CheckpointManifest,
362 sources: &[RegisteredSource],
363 sinks: &[RegisteredSink],
364 table_sources: &[RegisteredSource],
365 ) -> RecoveredState {
366 self.resolve_external_states(&mut manifest);
368
369 let validation_errors = manifest.validate();
371 if !validation_errors.is_empty() {
372 for err in &validation_errors {
373 warn!(
374 checkpoint_id = manifest.checkpoint_id,
375 error = %err,
376 "manifest validation warning"
377 );
378 }
379 }
380
381 if !manifest.source_names.is_empty() {
384 let mut current_sources: Vec<&str> = sources.iter().map(|s| s.name.as_str()).collect();
385 current_sources.sort_unstable();
386 let checkpoint_sources: Vec<&str> =
387 manifest.source_names.iter().map(String::as_str).collect();
388 let added: Vec<&&str> = current_sources
389 .iter()
390 .filter(|n| !checkpoint_sources.contains(n))
391 .collect();
392 let removed: Vec<&&str> = checkpoint_sources
393 .iter()
394 .filter(|n| !current_sources.contains(n))
395 .collect();
396 if !added.is_empty() {
397 warn!(
398 sources = ?added,
399 "new sources added since checkpoint — no saved offsets"
400 );
401 }
402 if !removed.is_empty() {
403 warn!(
404 sources = ?removed,
405 "sources removed since checkpoint — orphaned offsets"
406 );
407 }
408 }
409 if !manifest.sink_names.is_empty() {
410 let mut current_sinks: Vec<&str> = sinks.iter().map(|s| s.name.as_str()).collect();
411 current_sinks.sort_unstable();
412 let checkpoint_sinks: Vec<&str> =
413 manifest.sink_names.iter().map(String::as_str).collect();
414 let added: Vec<&&str> = current_sinks
415 .iter()
416 .filter(|n| !checkpoint_sinks.contains(n))
417 .collect();
418 let removed: Vec<&&str> = checkpoint_sinks
419 .iter()
420 .filter(|n| !current_sinks.contains(n))
421 .collect();
422 if !added.is_empty() {
423 warn!(
424 sinks = ?added,
425 "new sinks added since checkpoint — no saved epoch"
426 );
427 }
428 if !removed.is_empty() {
429 warn!(
430 sinks = ?removed,
431 "sinks removed since checkpoint — orphaned epochs"
432 );
433 }
434 }
435
436 info!(
437 checkpoint_id = manifest.checkpoint_id,
438 epoch = manifest.epoch,
439 validation_warnings = validation_errors.len(),
440 "recovering from checkpoint"
441 );
442
443 let mut result = RecoveredState {
444 manifest: manifest.clone(),
445 sources_restored: 0,
446 tables_restored: 0,
447 sinks_rolled_back: 0,
448 source_errors: HashMap::new(),
449 sink_errors: HashMap::new(),
450 };
451
452 for source in sources {
454 if !source.supports_replay {
455 info!(
456 source = %source.name,
457 "skipping restore for non-replayable source (at-most-once)"
458 );
459 continue;
460 }
461 if let Some(cp) = manifest.source_offsets.get(&source.name) {
462 let source_cp = connector_to_source_checkpoint(cp);
463 let mut connector = source.connector.lock().await;
464 match connector.restore(&source_cp).await {
465 Ok(()) => {
466 result.sources_restored += 1;
467 debug!(source = %source.name, epoch = cp.epoch, "source restored");
468 }
469 Err(e) => {
470 let msg = format!("source restore failed: {e}");
471 warn!(source = %source.name, error = %e, "source restore failed");
472 result.source_errors.insert(source.name.clone(), msg);
473 }
474 }
475 }
476 }
477
478 for table_source in table_sources {
480 if let Some(cp) = manifest.table_offsets.get(&table_source.name) {
481 let source_cp = connector_to_source_checkpoint(cp);
482 let mut connector = table_source.connector.lock().await;
483 match connector.restore(&source_cp).await {
484 Ok(()) => {
485 result.tables_restored += 1;
486 debug!(table = %table_source.name, epoch = cp.epoch, "table source restored");
487 }
488 Err(e) => {
489 let msg = format!("table source restore failed: {e}");
490 warn!(table = %table_source.name, error = %e, "table source restore failed");
491 result.source_errors.insert(table_source.name.clone(), msg);
492 }
493 }
494 }
495 }
496
497 for sink in sinks {
502 if sink.exactly_once {
503 let already_committed = manifest
505 .sink_commit_statuses
506 .get(&sink.name)
507 .is_some_and(|s| matches!(s, SinkCommitStatus::Committed));
508
509 if already_committed {
510 debug!(
511 sink = %sink.name,
512 epoch = manifest.epoch,
513 "sink already committed, skipping rollback"
514 );
515 continue;
516 }
517
518 sink.handle.rollback_epoch(manifest.epoch).await;
520 result.sinks_rolled_back += 1;
521 debug!(sink = %sink.name, epoch = manifest.epoch, "sink rolled back");
522 }
523 }
524
525 if !manifest.inflight_data.is_empty() {
527 let total_records: usize = manifest.inflight_data.values().map(Vec::len).sum();
528 info!(
529 operators = manifest.inflight_data.len(),
530 total_records, "unaligned checkpoint: inflight data present for replay"
531 );
532 }
533
534 info!(
535 checkpoint_id = manifest.checkpoint_id,
536 epoch = manifest.epoch,
537 sources_restored = result.sources_restored,
538 tables_restored = result.tables_restored,
539 sinks_rolled_back = result.sinks_rolled_back,
540 errors = result.source_errors.len() + result.sink_errors.len(),
541 has_inflight_data = !manifest.inflight_data.is_empty(),
542 "recovery complete"
543 );
544
545 result
546 }
547
548 fn is_checkpoint_corrupt(&self, manifest: &CheckpointManifest) -> bool {
565 if manifest.state_checksum.is_none() && manifest.operator_states.is_empty() {
568 return false;
569 }
570 match self.store.validate_checkpoint(manifest.checkpoint_id) {
571 Ok(ValidationResult {
572 valid: false,
573 ref issues,
574 ..
575 }) => {
576 error!(
577 checkpoint_id = manifest.checkpoint_id,
578 issues = ?issues,
579 "[LDB-6010] checkpoint integrity check failed"
580 );
581 true
582 }
583 Ok(_) => false, Err(e) => {
585 error!(
588 checkpoint_id = manifest.checkpoint_id,
589 error = %e,
590 "[LDB-6010] checkpoint validation I/O error — \
591 treating as corrupt for safety"
592 );
593 true
594 }
595 }
596 }
597
598 fn check_strict(&self, state: &RecoveredState) -> Result<(), DbError> {
600 if !self.strict || !state.has_errors() {
601 return Ok(());
602 }
603 let mut msgs: Vec<String> = state
604 .source_errors
605 .iter()
606 .map(|(k, v)| format!("source '{k}': {v}"))
607 .collect();
608 for (k, v) in &state.sink_errors {
609 msgs.push(format!("sink '{k}': {v}"));
610 }
611 Err(DbError::Checkpoint(format!(
612 "strict recovery failed — {} restore error(s): {}",
613 msgs.len(),
614 msgs.join("; ")
615 )))
616 }
617
618 pub fn load_latest(&self) -> Result<Option<CheckpointManifest>, DbError> {
626 self.store
627 .load_latest()
628 .map_err(|e| DbError::Checkpoint(format!("failed to load checkpoint: {e}")))
629 }
630
631 pub fn load_by_id(&self, checkpoint_id: u64) -> Result<Option<CheckpointManifest>, DbError> {
637 self.store.load_by_id(checkpoint_id).map_err(|e| {
638 DbError::Checkpoint(format!("failed to load checkpoint {checkpoint_id}: {e}"))
639 })
640 }
641}
642
#[cfg(test)]
mod tests {
    use super::*;
    use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;

    /// Builds a filesystem store rooted at `dir` with `3` as the second
    /// constructor argument (presumably a retention count — confirm).
    fn make_store(dir: &std::path::Path) -> FileSystemCheckpointStore {
        FileSystemCheckpointStore::new(dir, 3)
    }

    /// Runs strict recovery on `store` with no sources, sinks or table
    /// sources, unwrapping the outer `Result`.
    async fn recover_bare(store: &FileSystemCheckpointStore) -> Option<RecoveredState> {
        RecoveryManager::new(store)
            .recover(&[], &[], &[])
            .await
            .unwrap()
    }

    /// Path of `manifest.json` inside the named checkpoint directory.
    fn manifest_file(root: &std::path::Path, checkpoint_dir: &str) -> std::path::PathBuf {
        root.join("checkpoints")
            .join(checkpoint_dir)
            .join("manifest.json")
    }

    #[tokio::test]
    async fn test_recover_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let result = recover_bare(&store).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_recover_empty_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        store.save(&CheckpointManifest::new(1, 5)).unwrap();

        let result = recover_bare(&store).await.unwrap();

        assert_eq!(result.epoch(), 5);
        assert_eq!(result.sources_restored, 0);
        assert_eq!(result.tables_restored, 0);
        assert_eq!(result.sinks_rolled_back, 0);
        assert!(!result.has_errors());
    }

    #[tokio::test]
    async fn test_recover_with_watermark() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 3);
        manifest.watermark = Some(42_000);
        store.save(&manifest).unwrap();

        let result = recover_bare(&store).await.unwrap();

        assert_eq!(result.watermark(), Some(42_000));
    }

    #[tokio::test]
    async fn test_recover_with_operator_states() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 7);
        manifest
            .operator_states
            .insert("0".to_string(), OperatorCheckpoint::inline(b"window-state"));
        manifest
            .operator_states
            .insert("3".to_string(), OperatorCheckpoint::inline(b"filter-state"));
        store.save(&manifest).unwrap();

        let result = recover_bare(&store).await.unwrap();
        let states = result.operator_states();

        assert_eq!(states.len(), 2);
        let op0 = states.get("0").unwrap();
        assert_eq!(op0.decode_inline().unwrap(), b"window-state");
    }

    #[tokio::test]
    async fn test_recover_wal_positions() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 2);
        manifest.wal_position = 4096;
        manifest.per_core_wal_positions = vec![100, 200, 300];
        store.save(&manifest).unwrap();

        let result = recover_bare(&store).await.unwrap();

        assert_eq!(result.wal_position(), 4096);
        assert_eq!(result.per_core_wal_positions(), &[100, 200, 300]);
    }

    #[tokio::test]
    async fn test_recover_table_store_path() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.table_store_checkpoint_path = Some("/data/rocksdb_cp_001".into());
        store.save(&manifest).unwrap();

        let result = recover_bare(&store).await.unwrap();

        assert_eq!(
            result.table_store_checkpoint_path(),
            Some("/data/rocksdb_cp_001")
        );
    }

    #[test]
    fn test_load_latest_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        let latest = mgr.load_latest().unwrap();
        assert!(latest.is_none());
    }

    #[test]
    fn test_load_by_id() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        store.save(&CheckpointManifest::new(1, 1)).unwrap();
        store.save(&CheckpointManifest::new(2, 2)).unwrap();

        let mgr = RecoveryManager::new(&store);

        let m = mgr.load_by_id(1).unwrap().unwrap();
        assert_eq!(m.checkpoint_id, 1);

        let m2 = mgr.load_by_id(2).unwrap().unwrap();
        assert_eq!(m2.checkpoint_id, 2);

        // An id that was never saved loads as `None`, not as an error.
        assert!(mgr.load_by_id(999).unwrap().is_none());
    }

    #[tokio::test]
    async fn test_recover_fallback_to_previous_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        let mut m1 = CheckpointManifest::new(1, 10);
        m1.watermark = Some(1000);
        store.save(&m1).unwrap();

        let mut m2 = CheckpointManifest::new(2, 20);
        m2.watermark = Some(2000);
        store.save(&m2).unwrap();

        // Overwrite the newest manifest with invalid JSON so it cannot load.
        let latest_manifest_path = manifest_file(dir.path(), "checkpoint_000002");
        std::fs::write(&latest_manifest_path, "not valid json!!!").unwrap();

        let result = recover_bare(&store).await;

        let recovered = result.expect("should recover from fallback checkpoint");
        assert_eq!(recovered.manifest.checkpoint_id, 1);
        assert_eq!(recovered.epoch(), 10);
        assert_eq!(recovered.watermark(), Some(1000));
    }

    #[tokio::test]
    async fn test_recover_all_checkpoints_corrupt_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        store.save(&CheckpointManifest::new(1, 5)).unwrap();

        // Overwrite the only manifest so no checkpoint can be loaded.
        let manifest_path = manifest_file(dir.path(), "checkpoint_000001");
        std::fs::write(&manifest_path, "corrupt").unwrap();

        let result = recover_bare(&store).await;

        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_recover_latest_ok_no_fallback_needed() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        store.save(&CheckpointManifest::new(1, 10)).unwrap();
        store.save(&CheckpointManifest::new(2, 20)).unwrap();

        let result = recover_bare(&store).await.unwrap();

        assert_eq!(result.manifest.checkpoint_id, 2);
        assert_eq!(result.epoch(), 20);
    }

    #[tokio::test]
    async fn test_recover_with_sidecar_state() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let large_data = vec![0xAB; 2048];
        let mut manifest = CheckpointManifest::new(1, 5);
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 2048));

        // Sidecar bytes are written before the manifest that points at them.
        store.save_state_data(1, &large_data).unwrap();
        store.save(&manifest).unwrap();

        let result = recover_bare(&store).await.unwrap();

        let op = result.operator_states().get("big-op").unwrap();
        assert!(!op.external, "external state should be resolved to inline");
        assert_eq!(op.decode_inline().unwrap(), large_data);
    }

    #[tokio::test]
    async fn test_recover_mixed_inline_and_external() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let large_data = vec![0xCD; 4096];
        let mut manifest = CheckpointManifest::new(1, 3);
        manifest
            .operator_states
            .insert("small-op".into(), OperatorCheckpoint::inline(b"tiny"));
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 4096));

        store.save_state_data(1, &large_data).unwrap();
        store.save(&manifest).unwrap();

        let result = recover_bare(&store).await.unwrap();
        let states = result.operator_states();

        let small = states.get("small-op").unwrap();
        assert_eq!(small.decode_inline().unwrap(), b"tiny");

        let big = states.get("big-op").unwrap();
        assert_eq!(big.decode_inline().unwrap(), large_data);
    }

    #[tokio::test]
    async fn test_recover_with_inflight_data() {
        use laminar_storage::checkpoint_manifest::InFlightRecord;

        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let record = InFlightRecord {
            input_id: 0,
            data_b64: "aW5mbGlnaHQtZXZlbnQ=".into(),
        };
        let mut manifest = CheckpointManifest::new(1, 5);
        manifest
            .inflight_data
            .insert("join-op".into(), vec![record]);
        store.save(&manifest).unwrap();

        let result = recover_bare(&store).await.unwrap();

        assert!(result.has_inflight_data());
        let inflight = result.inflight_data();
        assert_eq!(inflight.len(), 1);
        let records = inflight.get("join-op").unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].input_id, 0);
        assert_eq!(records[0].data_b64, "aW5mbGlnaHQtZXZlbnQ=");
    }

    #[tokio::test]
    async fn test_recover_missing_sidecar_graceful() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // The manifest references sidecar bytes that are never written.
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).unwrap();

        let result = recover_bare(&store).await.unwrap();

        let op = result.operator_states().get("orphan").unwrap();
        assert!(
            !op.external,
            "unresolved external state replaced with inline empty"
        );
        assert!(
            op.state_b64.as_ref().is_none_or(String::is_empty),
            "replaced state should be empty"
        );
    }

    #[tokio::test]
    async fn test_recovered_state_has_errors() {
        /// Builds a state with zeroed counters and the given source errors.
        fn state_with(source_errors: HashMap<String, String>) -> RecoveredState {
            RecoveredState {
                manifest: CheckpointManifest::new(1, 1),
                sources_restored: 0,
                tables_restored: 0,
                sinks_rolled_back: 0,
                source_errors,
                sink_errors: HashMap::new(),
            }
        }

        let state = state_with(HashMap::new());
        assert!(!state.has_errors());

        let state_with_errors = state_with(HashMap::from([("source1".into(), "failed".into())]));
        assert!(state_with_errors.has_errors());
    }
}