//! Crash recovery: restores sources, tables, and sinks from the most recent
//! usable checkpoint, falling back to older checkpoints when the latest one
//! is corrupt, has uncommitted sinks, or fails a strict restore.

#![allow(clippy::disallowed_types)]

use std::collections::HashMap;

use laminar_storage::checkpoint_manifest::{CheckpointManifest, SinkCommitStatus};
use laminar_storage::checkpoint_store::CheckpointStore;
use laminar_storage::ValidationResult;
use tracing::{debug, error, info, warn};

use crate::checkpoint_coordinator::{
    connector_to_source_checkpoint, RegisteredSink, RegisteredSource,
};
use crate::error::DbError;

/// Summary of what was restored from a checkpoint during recovery.
#[derive(Debug)]
pub struct RecoveredState {
    /// The checkpoint manifest that recovery was performed from.
    pub manifest: CheckpointManifest,
    /// Number of streaming sources whose offsets were restored.
    pub sources_restored: usize,
    /// Number of table sources whose offsets were restored.
    pub tables_restored: usize,
    /// Number of exactly-once sinks rolled back to the checkpoint epoch.
    pub sinks_rolled_back: usize,
    /// Per-source restore errors, keyed by source name.
    pub source_errors: HashMap<String, String>,
    /// Per-sink rollback errors, keyed by sink name.
    pub sink_errors: HashMap<String, String>,
}

impl RecoveredState {
    /// Epoch of the checkpoint that was recovered.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.manifest.epoch
    }

    /// Watermark recorded in the checkpoint, if any.
    #[must_use]
    pub fn watermark(&self) -> Option<i64> {
        self.manifest.watermark
    }

    /// Returns `true` if any source restore or sink rollback failed.
    #[must_use]
    pub fn has_errors(&self) -> bool {
        !self.source_errors.is_empty() || !self.sink_errors.is_empty()
    }

    /// Operator states captured in the checkpoint, keyed by operator name.
    #[must_use]
    pub fn operator_states(
        &self,
    ) -> &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
        &self.manifest.operator_states
    }

    /// Path of the table-store checkpoint, if one was taken.
    #[must_use]
    pub fn table_store_checkpoint_path(&self) -> Option<&str> {
        self.manifest.table_store_checkpoint_path.as_deref()
    }
}
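
// A minimal sketch of inspecting the result of recovery; `resume_from` and
// `report_degraded` are hypothetical caller-side hooks, not part of this
// crate:
//
//     if let Some(state) = manager.recover(&sources, &sinks, &tables).await? {
//         resume_from(state.epoch(), state.watermark());
//         if state.has_errors() {
//             report_degraded(&state.source_errors, &state.sink_errors);
//         }
//     }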

/// Coordinates recovery from a [`CheckpointStore`], preferring the latest
/// checkpoint and falling back to progressively older ones when necessary.
pub struct RecoveryManager<'a> {
    /// Store that checkpoint manifests and sidecar state are loaded from.
    store: &'a dyn CheckpointStore,
    /// When `true`, a checkpoint whose restore produced any errors is
    /// rejected and the next-older checkpoint is tried instead.
    strict: bool,
}

impl<'a> RecoveryManager<'a> {
    /// Creates a strict recovery manager: any restore error causes the
    /// checkpoint to be rejected in favor of an older one.
    #[must_use]
    pub fn new(store: &'a dyn CheckpointStore) -> Self {
        Self {
            store,
            strict: true,
        }
    }

    /// Creates a lenient recovery manager: restore errors are recorded in
    /// [`RecoveredState`] but do not cause the checkpoint to be rejected.
    #[must_use]
    pub fn lenient(store: &'a dyn CheckpointStore) -> Self {
        Self {
            store,
            strict: false,
        }
    }
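
    // A sketch of picking a mode at startup; `cfg.strict_recovery` is a
    // hypothetical configuration flag:
    //
    //     let manager = if cfg.strict_recovery {
    //         RecoveryManager::new(&store)
    //     } else {
    //         RecoveryManager::lenient(&store)
    //     };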

    /// Attempts to recover from the latest checkpoint, then falls back to
    /// progressively older checkpoints when the latest is corrupt, has
    /// uncommitted sinks, or (in strict mode) fails to restore cleanly.
    /// Returns `Ok(None)` when no usable checkpoint exists.
    pub(crate) async fn recover(
        &self,
        sources: &[RegisteredSource],
        sinks: &[RegisteredSink],
        table_sources: &[RegisteredSource],
    ) -> Result<Option<RecoveredState>, DbError> {
        match self.store.load_latest().await {
            Ok(Some(manifest)) => {
                if self.is_checkpoint_corrupt(&manifest).await {
                    warn!(
                        checkpoint_id = manifest.checkpoint_id,
                        "[LDB-6010] latest checkpoint corrupt, trying fallback"
                    );
                } else if Self::has_pending_sinks(&manifest) {
                    warn!(
                        checkpoint_id = manifest.checkpoint_id,
                        epoch = manifest.epoch,
                        "[LDB-6015] checkpoint has uncommitted sinks — source offsets \
                         may be past uncommitted data, falling back to previous checkpoint"
                    );
                } else {
                    let state = self
                        .restore_from(manifest, sources, sinks, table_sources)
                        .await;
                    if let Err(e) = self.check_strict(&state) {
                        warn!(
                            checkpoint_id = state.manifest.checkpoint_id,
                            error = %e,
                            "latest checkpoint restore had strict errors, trying fallback"
                        );
                    } else {
                        return Ok(Some(state));
                    }
                }
            }
            Ok(None) => {
                info!("no checkpoint found, starting fresh");
                return Ok(None);
            }
            Err(e) => {
                warn!(error = %e, "latest checkpoint load failed, trying fallback");
            }
        }

        let checkpoints = self.store.list().await.map_err(DbError::from)?;

        if checkpoints.is_empty() {
            warn!("no checkpoints available for fallback, starting fresh");
            return Ok(None);
        }

        // Walk fallback candidates from newest to oldest.
        for &(checkpoint_id, _epoch) in checkpoints.iter().rev() {
            match self.store.load_by_id(checkpoint_id).await {
                Ok(Some(manifest)) => {
                    if self.is_checkpoint_corrupt(&manifest).await {
                        warn!(
                            checkpoint_id,
                            "[LDB-6010] fallback checkpoint corrupt, skipping"
                        );
                        continue;
                    }
                    if Self::has_pending_sinks(&manifest) {
                        warn!(
                            checkpoint_id,
                            "[LDB-6015] fallback checkpoint has uncommitted sinks, skipping"
                        );
                        continue;
                    }
                    info!(checkpoint_id, "recovering from fallback checkpoint");
                    let state = self
                        .restore_from(manifest, sources, sinks, table_sources)
                        .await;
                    if let Err(e) = self.check_strict(&state) {
                        warn!(
                            checkpoint_id,
                            error = %e,
                            "fallback checkpoint restore had strict errors, trying next"
                        );
                        continue;
                    }
                    return Ok(Some(state));
                }
                Ok(None) => {
                    debug!(checkpoint_id, "fallback checkpoint not found, skipping");
                }
                Err(e) => {
                    warn!(
                        checkpoint_id,
                        error = %e,
                        "fallback checkpoint load failed, trying next"
                    );
                }
            }
        }

        warn!("all checkpoints failed to load, starting fresh");
        Ok(None)
    }
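
    // The per-candidate acceptance rule used above, as a standalone sketch
    // (a hypothetical helper; `recover` inlines this logic so it can log a
    // distinct LDB-6010 or LDB-6015 warning for each rejection):
    //
    //     async fn is_usable(&self, m: &CheckpointManifest) -> bool {
    //         !self.is_checkpoint_corrupt(m).await && !Self::has_pending_sinks(m)
    //     }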

    /// Resolves operator states marked `external` by loading the checkpoint's
    /// sidecar (`state.bin`) and inlining each referenced byte range. Returns
    /// `false` if the sidecar is missing, unreadable, or too small, in which
    /// case affected operators are reset to empty inline state.
    async fn resolve_external_states(&self, manifest: &mut CheckpointManifest) -> bool {
        let external_ops: Vec<String> = manifest
            .operator_states
            .iter()
            .filter(|(_, op)| op.external)
            .map(|(name, _)| name.clone())
            .collect();

        if external_ops.is_empty() {
            return true;
        }

        let state_data = match self.store.load_state_data(manifest.checkpoint_id).await {
            Ok(Some(data)) => data,
            Ok(None) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    operators = ?external_ops,
                    "[LDB-6010] sidecar state.bin missing — external operator states \
                     cannot be resolved; operators will start with empty state"
                );
                for name in &external_ops {
                    if let Some(op) = manifest.operator_states.get_mut(name) {
                        *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                    }
                }
                return false;
            }
            Err(e) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %e,
                    operators = ?external_ops,
                    "[LDB-6010] failed to load sidecar state.bin — external operator states \
                     cannot be resolved; operators will start with empty state"
                );
                for name in &external_ops {
                    if let Some(op) = manifest.operator_states.get_mut(name) {
                        *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                    }
                }
                return false;
            }
        };

        let mut all_resolved = true;
        for (name, op) in &mut manifest.operator_states {
            if op.external {
                #[allow(clippy::cast_possible_truncation)]
                let start = op.external_offset as usize;
                #[allow(clippy::cast_possible_truncation)]
                let end = start + op.external_length as usize;
                if end <= state_data.len() {
                    let external_offset = op.external_offset;
                    let external_length = op.external_length;
                    let data = &state_data[start..end];
                    *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(data);
                    debug!(
                        operator = %name,
                        offset = external_offset,
                        length = external_length,
                        "resolved external operator state from sidecar"
                    );
                } else {
                    error!(
                        operator = %name,
                        offset = start,
                        length = op.external_length,
                        sidecar_len = state_data.len(),
                        "[LDB-6010] sidecar too small for external operator state — \
                         operator will start with empty state"
                    );
                    *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                    all_resolved = false;
                }
            }
        }
        all_resolved
    }
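
    // Layout sketch of the sidecar: `state.bin` is one concatenated buffer,
    // and each external `OperatorCheckpoint` addresses a slice of it by
    // (offset, length). Hypothetical example with two operators saved back
    // to back:
    //
    //     state.bin = [ bytes of "agg" | bytes of "join" ]
    //     "agg"  -> OperatorCheckpoint::external(0, 1024)
    //     "join" -> OperatorCheckpoint::external(1024, 4096)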

    /// Restores sources, table sources, and sinks from `manifest`, resolving
    /// any external operator state from the sidecar first. Errors are
    /// collected in the returned [`RecoveredState`] rather than aborting.
    #[allow(clippy::too_many_lines)]
    async fn restore_from(
        &self,
        mut manifest: CheckpointManifest,
        sources: &[RegisteredSource],
        sinks: &[RegisteredSink],
        table_sources: &[RegisteredSource],
    ) -> RecoveredState {
        let sidecar_ok = self.resolve_external_states(&mut manifest).await;
        if !sidecar_ok && self.strict {
            warn!(
                checkpoint_id = manifest.checkpoint_id,
                "[LDB-6010] sidecar resolution failed in strict mode — \
                 checkpoint will be rejected"
            );
        }

        let validation_errors =
            manifest.validate(laminar_storage::checkpoint_manifest::DEFAULT_VNODE_COUNT);
        if !validation_errors.is_empty() {
            for err in &validation_errors {
                warn!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %err,
                    "manifest validation warning"
                );
            }
        }

        // Diff registered source/sink names against the checkpoint's to
        // surface topology changes since the checkpoint was taken.
        if !manifest.source_names.is_empty() {
            let mut current_sources: Vec<&str> = sources.iter().map(|s| s.name.as_str()).collect();
            current_sources.sort_unstable();
            let checkpoint_sources: Vec<&str> =
                manifest.source_names.iter().map(String::as_str).collect();
            let added: Vec<&&str> = current_sources
                .iter()
                .filter(|n| !checkpoint_sources.contains(n))
                .collect();
            let removed: Vec<&&str> = checkpoint_sources
                .iter()
                .filter(|n| !current_sources.contains(n))
                .collect();
            if !added.is_empty() {
                warn!(
                    sources = ?added,
                    "new sources added since checkpoint — no saved offsets"
                );
            }
            if !removed.is_empty() {
                warn!(
                    sources = ?removed,
                    "sources removed since checkpoint — orphaned offsets"
                );
            }
        }
        if !manifest.sink_names.is_empty() {
            let mut current_sinks: Vec<&str> = sinks.iter().map(|s| s.name.as_str()).collect();
            current_sinks.sort_unstable();
            let checkpoint_sinks: Vec<&str> =
                manifest.sink_names.iter().map(String::as_str).collect();
            let added: Vec<&&str> = current_sinks
                .iter()
                .filter(|n| !checkpoint_sinks.contains(n))
                .collect();
            let removed: Vec<&&str> = checkpoint_sinks
                .iter()
                .filter(|n| !current_sinks.contains(n))
                .collect();
            if !added.is_empty() {
                warn!(
                    sinks = ?added,
                    "new sinks added since checkpoint — no saved epoch"
                );
            }
            if !removed.is_empty() {
                warn!(
                    sinks = ?removed,
                    "sinks removed since checkpoint — orphaned epochs"
                );
            }
        }

        info!(
            checkpoint_id = manifest.checkpoint_id,
            epoch = manifest.epoch,
            validation_warnings = validation_errors.len(),
            "recovering from checkpoint"
        );

        let mut result = RecoveredState {
            manifest: manifest.clone(),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::new(),
            sink_errors: HashMap::new(),
        };

        if !sidecar_ok {
            result.source_errors.insert(
                "__sidecar__".into(),
                "[LDB-6010] sidecar state.bin missing or truncated — \
                 operator state cannot be fully restored"
                    .into(),
            );
        }

        for source in sources {
            if !source.supports_replay {
                info!(
                    source = %source.name,
                    "skipping restore for non-replayable source (at-most-once)"
                );
                continue;
            }
            if let Some(cp) = manifest.source_offsets.get(&source.name) {
                let source_cp = connector_to_source_checkpoint(cp);
                let mut last_err = None;
                // Retry transient restore failures up to three times, one
                // second apart, releasing the connector lock between attempts.
                for attempt in 0..3u32 {
                    let mut connector = source.connector.lock().await;
                    match connector.restore(&source_cp).await {
                        Ok(()) => {
                            last_err = None;
                            break;
                        }
                        Err(e) => {
                            warn!(
                                source = %source.name, attempt,
                                error = %e, "source restore failed, retrying"
                            );
                            last_err = Some(e);
                            drop(connector);
                            if attempt < 2 {
                                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                            }
                        }
                    }
                }
                if let Some(e) = last_err {
                    let msg = format!("source restore failed after 3 attempts: {e}");
                    result.source_errors.insert(source.name.clone(), msg);
                } else {
                    result.sources_restored += 1;
                    debug!(source = %source.name, epoch = cp.epoch, "source restored");
                }
            }
        }

        for table_source in table_sources {
            if let Some(cp) = manifest.table_offsets.get(&table_source.name) {
                let source_cp = connector_to_source_checkpoint(cp);
                let mut connector = table_source.connector.lock().await;
                match connector.restore(&source_cp).await {
                    Ok(()) => {
                        result.tables_restored += 1;
                        debug!(table = %table_source.name, epoch = cp.epoch, "table source restored");
                    }
                    Err(e) => {
                        let msg = format!("table source restore failed: {e}");
                        warn!(table = %table_source.name, error = %e, "table source restore failed");
                        result.source_errors.insert(table_source.name.clone(), msg);
                    }
                }
            }
        }

        for sink in sinks {
            if sink.exactly_once {
                // A sink that already committed this epoch must not be rolled
                // back; that would discard durably committed output.
                let already_committed = manifest
                    .sink_commit_statuses
                    .get(&sink.name)
                    .is_some_and(|s| matches!(s, SinkCommitStatus::Committed));

                if already_committed {
                    debug!(
                        sink = %sink.name,
                        epoch = manifest.epoch,
                        "sink already committed, skipping rollback"
                    );
                    continue;
                }

                match sink.handle.rollback_epoch(manifest.epoch).await {
                    Ok(()) => {
                        result.sinks_rolled_back += 1;
                        debug!(sink = %sink.name, epoch = manifest.epoch, "sink rolled back");
                    }
                    Err(e) => {
                        result
                            .sink_errors
                            .insert(sink.name.clone(), format!("rollback failed: {e}"));
                        warn!(
                            sink = %sink.name,
                            epoch = manifest.epoch,
                            error = %e,
                            "[LDB-6016] sink rollback failed during recovery"
                        );
                    }
                }
            }
        }

        info!(
            checkpoint_id = manifest.checkpoint_id,
            epoch = manifest.epoch,
            sources_restored = result.sources_restored,
            tables_restored = result.tables_restored,
            sinks_rolled_back = result.sinks_rolled_back,
            errors = result.source_errors.len() + result.sink_errors.len(),
            "recovery complete"
        );

        result
    }

    /// Returns `true` if the checkpoint fails its integrity check. A
    /// validation I/O error is also treated as corrupt, on the principle
    /// that recovery should never trust a checkpoint it cannot verify.
    async fn is_checkpoint_corrupt(&self, manifest: &CheckpointManifest) -> bool {
        // Nothing to validate: no checksum and no operator state.
        if manifest.state_checksum.is_none() && manifest.operator_states.is_empty() {
            return false;
        }
        match self.store.validate_checkpoint(manifest.checkpoint_id).await {
            Ok(ValidationResult {
                valid: false,
                ref issues,
                ..
            }) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    issues = ?issues,
                    "[LDB-6010] checkpoint integrity check failed"
                );
                true
            }
            Ok(_) => false,
            Err(e) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %e,
                    "[LDB-6010] checkpoint validation I/O error — \
                     treating as corrupt for safety"
                );
                true
            }
        }
    }

    /// Returns `true` if any sink in the manifest is still `Pending`, i.e.
    /// the checkpoint's commit phase never completed for that sink.
    fn has_pending_sinks(manifest: &CheckpointManifest) -> bool {
        manifest
            .sink_commit_statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Pending))
    }
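
    // Sketch of a manifest this rejects, mirroring
    // `test_recover_skips_pending_sinks_falls_back` below:
    //
    //     let mut m = CheckpointManifest::new(2, 2);
    //     m.sink_commit_statuses
    //         .insert("delta_sink".into(), SinkCommitStatus::Pending);
    //     assert!(RecoveryManager::has_pending_sinks(&m));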

    /// In strict mode, converts any restore errors recorded in `state` into
    /// a [`DbError::Checkpoint`] so the caller rejects the checkpoint.
    fn check_strict(&self, state: &RecoveredState) -> Result<(), DbError> {
        if !self.strict || !state.has_errors() {
            return Ok(());
        }
        let mut msgs: Vec<String> = state
            .source_errors
            .iter()
            .map(|(k, v)| format!("source '{k}': {v}"))
            .collect();
        for (k, v) in &state.sink_errors {
            msgs.push(format!("sink '{k}': {v}"));
        }
        Err(DbError::Checkpoint(format!(
            "strict recovery failed — {} restore error(s): {}",
            msgs.len(),
            msgs.join("; ")
        )))
    }
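
    // Shape of the strict-mode error this builds (the source and sink names
    // here are hypothetical):
    //
    //     strict recovery failed — 2 restore error(s): source 'orders':
    //     source restore failed after 3 attempts: ...; sink 'delta_sink':
    //     rollback failed: ...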

    /// Loads the latest checkpoint manifest without restoring anything.
    pub async fn load_latest(&self) -> Result<Option<CheckpointManifest>, DbError> {
        self.store.load_latest().await.map_err(DbError::from)
    }

    /// Loads a specific checkpoint manifest by id without restoring anything.
    pub async fn load_by_id(
        &self,
        checkpoint_id: u64,
    ) -> Result<Option<CheckpointManifest>, DbError> {
        self.store
            .load_by_id(checkpoint_id)
            .await
            .map_err(DbError::from)
    }
}
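
// End-to-end sketch of driving recovery, mirroring the tests below
// (`data_dir`, `sources`, `sinks`, and `table_sources` are assumed to come
// from the caller):
//
//     let store = FileSystemCheckpointStore::new(data_dir, 3);
//     let manager = RecoveryManager::new(&store);
//     match manager.recover(&sources, &sinks, &table_sources).await? {
//         Some(state) => info!(epoch = state.epoch(), "resumed from checkpoint"),
//         None => info!("no usable checkpoint, starting fresh"),
//     }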

#[cfg(test)]
mod tests {
    use super::*;
    use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;

    fn make_store(dir: &std::path::Path) -> FileSystemCheckpointStore {
        FileSystemCheckpointStore::new(dir, 3)
    }

    #[tokio::test]
    async fn test_recover_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_recover_empty_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let manifest = CheckpointManifest::new(1, 5);
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.epoch(), 5);
        assert_eq!(result.sources_restored, 0);
        assert_eq!(result.tables_restored, 0);
        assert_eq!(result.sinks_rolled_back, 0);
        assert!(!result.has_errors());
    }

    #[tokio::test]
    async fn test_recover_with_watermark() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 3);
        manifest.watermark = Some(42_000);
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.watermark(), Some(42_000));
    }

    #[tokio::test]
    async fn test_recover_with_operator_states() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 7);
        manifest
            .operator_states
            .insert("0".to_string(), OperatorCheckpoint::inline(b"window-state"));
        manifest
            .operator_states
            .insert("3".to_string(), OperatorCheckpoint::inline(b"filter-state"));
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.operator_states().len(), 2);
        let op0 = result.operator_states().get("0").unwrap();
        assert_eq!(op0.decode_inline().unwrap(), b"window-state");
    }

    #[tokio::test]
    async fn test_recover_table_store_path() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.table_store_checkpoint_path = Some("/data/rocksdb_cp_001".into());
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(
            result.table_store_checkpoint_path(),
            Some("/data/rocksdb_cp_001")
        );
    }

    #[tokio::test]
    async fn test_load_latest_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        assert!(mgr.load_latest().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_load_by_id() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        store.save(&CheckpointManifest::new(1, 1)).await.unwrap();
        store.save(&CheckpointManifest::new(2, 2)).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let m = mgr.load_by_id(1).await.unwrap().unwrap();
        assert_eq!(m.checkpoint_id, 1);

        let m2 = mgr.load_by_id(2).await.unwrap().unwrap();
        assert_eq!(m2.checkpoint_id, 2);

        assert!(mgr.load_by_id(999).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_recover_fallback_to_previous_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        let mut m1 = CheckpointManifest::new(1, 10);
        m1.watermark = Some(1000);
        store.save(&m1).await.unwrap();

        let mut m2 = CheckpointManifest::new(2, 20);
        m2.watermark = Some(2000);
        store.save(&m2).await.unwrap();

        // Corrupt the newest manifest on disk so recovery must fall back.
        let latest_manifest_path = dir
            .path()
            .join("checkpoints")
            .join("checkpoint_000002")
            .join("manifest.json");
        std::fs::write(&latest_manifest_path, "not valid json!!!").unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();

        let recovered = result.expect("should recover from fallback checkpoint");
        assert_eq!(recovered.manifest.checkpoint_id, 1);
        assert_eq!(recovered.epoch(), 10);
        assert_eq!(recovered.watermark(), Some(1000));
    }

    #[tokio::test]
    async fn test_recover_all_checkpoints_corrupt_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        store.save(&CheckpointManifest::new(1, 5)).await.unwrap();

        // Corrupt the only manifest so no checkpoint is usable.
        let manifest_path = dir
            .path()
            .join("checkpoints")
            .join("checkpoint_000001")
            .join("manifest.json");
        std::fs::write(&manifest_path, "corrupt").unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();

        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_recover_latest_ok_no_fallback_needed() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        store.save(&CheckpointManifest::new(1, 10)).await.unwrap();
        store.save(&CheckpointManifest::new(2, 20)).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.manifest.checkpoint_id, 2);
        assert_eq!(result.epoch(), 20);
    }

    #[tokio::test]
    async fn test_recover_with_sidecar_state() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Large operator state lives in the sidecar, not the manifest.
        let mut manifest = CheckpointManifest::new(1, 5);
        let large_data = vec![0xAB; 2048];
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 2048));

        store
            .save_state_data(1, &[bytes::Bytes::copy_from_slice(&large_data)])
            .await
            .unwrap();
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        let op = result.operator_states().get("big-op").unwrap();
        assert!(!op.external, "external state should be resolved to inline");
        assert_eq!(op.decode_inline().unwrap(), large_data);
    }

    #[tokio::test]
    async fn test_recover_mixed_inline_and_external() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 3);
        manifest
            .operator_states
            .insert("small-op".into(), OperatorCheckpoint::inline(b"tiny"));

        let large_data = vec![0xCD; 4096];
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 4096));

        store
            .save_state_data(1, &[bytes::Bytes::copy_from_slice(&large_data)])
            .await
            .unwrap();
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        let small = result.operator_states().get("small-op").unwrap();
        assert_eq!(small.decode_inline().unwrap(), b"tiny");

        let big = result.operator_states().get("big-op").unwrap();
        assert_eq!(big.decode_inline().unwrap(), large_data);
    }

    #[tokio::test]
    async fn test_recover_missing_sidecar_graceful() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // External state is referenced, but no sidecar is ever written.
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).await.unwrap();

        // Lenient mode records the problem but still uses the checkpoint.
        let mgr = RecoveryManager::lenient(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        let op = result.operator_states().get("orphan").unwrap();
        assert!(
            !op.external,
            "unresolved external state replaced with inline empty"
        );
        assert!(
            op.state_b64.as_ref().is_none_or(String::is_empty),
            "replaced state should be empty"
        );
    }

    #[tokio::test]
    async fn test_recovered_state_has_errors() {
        let state = RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::new(),
            sink_errors: HashMap::new(),
        };
        assert!(!state.has_errors());

        let state_with_errors = RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::from([("source1".into(), "failed".into())]),
            sink_errors: HashMap::new(),
        };
        assert!(state_with_errors.has_errors());
    }

    #[tokio::test]
    async fn test_recover_missing_sidecar_strict_rejects() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // External state is referenced, but no sidecar is ever written.
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).await.unwrap();

        // Strict mode treats the unresolved sidecar as a fatal restore error.
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(
            result.is_none(),
            "strict mode should reject checkpoint with missing sidecar"
        );
    }

    #[tokio::test]
    async fn test_recover_skips_pending_sinks_falls_back() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Checkpoint 1: sink fully committed.
        let mut m1 = CheckpointManifest::new(1, 1);
        m1.sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Committed);
        store.save(&m1).await.unwrap();

        // Checkpoint 2: sink commit never completed.
        let mut m2 = CheckpointManifest::new(2, 2);
        m2.sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Pending);
        store.save(&m2).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        let state = result.expect("should recover from epoch 1 fallback");

        assert_eq!(
            state.epoch(),
            1,
            "recovery must skip checkpoint with Pending sinks"
        );
    }

    #[tokio::test]
    async fn test_recover_all_pending_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // The only checkpoint has an uncommitted sink.
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_commit_statuses
            .insert("sink".into(), SinkCommitStatus::Pending);
        store.save(&m).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(
            result.is_none(),
            "should start fresh when all checkpoints have pending sinks"
        );
    }
}