Skip to main content

laminar_db/
recovery_manager.rs

1//! Unified recovery manager.
2//!
3//! Single recovery path that loads a
4//! [`CheckpointManifest`](laminar_core::storage::checkpoint_manifest::CheckpointManifest) and restores
5//! ALL state: source offsets, sink epochs, operator states, table offsets,
6//! and watermarks.
7//!
8//! ## Recovery Protocol
9//!
10//! 1. `store.load_latest()` → `Option<CheckpointManifest>`
11//! 2. If `None` → fresh start (no recovery needed)
12//! 3. For each source: `source.restore(manifest.source_offsets[name])`
13//! 4. For each table source: `source.restore(manifest.table_offsets[name])`
14//! 5. For each exactly-once sink: `sink.rollback_epoch(manifest.epoch)`
15//! 6. Return recovered state (watermark, epoch, operator states)
16//!    — caller is responsible for restoring DAG/TPC operators from `operator_states`
17//!
18//! ## Fallback Recovery
19//!
20//! If the latest checkpoint is corrupt or fails to restore, the manager
21//! iterates through all available checkpoints in reverse chronological order
22//! until one succeeds. This prevents a single corrupt checkpoint from causing
23//! total data loss.
24#![allow(clippy::disallowed_types)] // cold path
25
26use std::collections::HashMap;
27
28use bytes::Bytes;
29use laminar_core::state::StateBackend;
30use laminar_core::storage::checkpoint_manifest::{CheckpointManifest, SinkCommitStatus};
31use laminar_core::storage::checkpoint_store::CheckpointStore;
32use laminar_core::storage::ValidationResult;
33use tracing::{debug, error, info, warn};
34
35use crate::checkpoint_coordinator::{
36    connector_to_source_checkpoint, RegisteredSink, RegisteredSource,
37};
38use crate::error::DbError;
39
40/// Result of a successful recovery from a checkpoint.
41#[derive(Debug)]
42pub struct RecoveredState {
43    /// The manifest that was loaded and restored from.
44    pub manifest: CheckpointManifest,
45    /// Number of sources successfully restored.
46    pub sources_restored: usize,
47    /// Number of table sources successfully restored.
48    pub tables_restored: usize,
49    /// Number of sinks rolled back.
50    pub sinks_rolled_back: usize,
51    /// Sources that failed to restore (name → error message).
52    pub source_errors: HashMap<String, String>,
53    /// Sinks that failed to roll back (name → error message).
54    pub sink_errors: HashMap<String, String>,
55}
56
57impl RecoveredState {
58    /// Returns the recovered epoch.
59    #[must_use]
60    pub fn epoch(&self) -> u64 {
61        self.manifest.epoch
62    }
63
64    /// Returns the recovered watermark.
65    #[must_use]
66    pub fn watermark(&self) -> Option<i64> {
67        self.manifest.watermark
68    }
69
70    /// Returns whether there were any errors during recovery.
71    #[must_use]
72    pub fn has_errors(&self) -> bool {
73        !self.source_errors.is_empty() || !self.sink_errors.is_empty()
74    }
75
76    /// Returns the recovered operator states (for DAG restoration).
77    #[must_use]
78    pub fn operator_states(
79        &self,
80    ) -> &HashMap<String, laminar_core::storage::checkpoint_manifest::OperatorCheckpoint> {
81        &self.manifest.operator_states
82    }
83
84    /// Returns the table store checkpoint path, if any.
85    #[must_use]
86    pub fn table_store_checkpoint_path(&self) -> Option<&str> {
87        self.manifest.table_store_checkpoint_path.as_deref()
88    }
89}
90
91/// Outcome of rehydrating a set of vnodes from durable state.
92///
93/// Best-effort by construction: a per-vnode read failure is recorded in
94/// [`errors`](Self::errors) rather than aborting the others, so a single
95/// transient object-store hiccup can't strand an entire rebalance.
96#[derive(Debug, Default)]
97pub struct VnodeRehydration {
98    /// Committed epoch the partials were read from. `None` when the
99    /// backend reported no committed epoch (every requested vnode is
100    /// treated as a fresh start).
101    pub epoch: Option<u64>,
102    /// vnode → restored partial bytes, for vnodes that had durable state
103    /// at [`epoch`](Self::epoch).
104    pub restored: HashMap<u32, Bytes>,
105    /// Requested vnodes with no durable partial — never written, or
106    /// pruned below the committed epoch. These resume from empty state.
107    pub missing: Vec<u32>,
108    /// vnode → error message for reads that failed.
109    pub errors: HashMap<u32, String>,
110}
111
112impl VnodeRehydration {
113    /// Number of vnodes whose state was successfully read back.
114    #[must_use]
115    pub fn restored_count(&self) -> usize {
116        self.restored.len()
117    }
118
119    /// Whether any per-vnode read failed.
120    #[must_use]
121    pub fn has_errors(&self) -> bool {
122        !self.errors.is_empty()
123    }
124}
125
126/// Reads committed per-vnode partial state from a durable
127/// [`StateBackend`] so a node that newly acquires vnodes during a
128/// rebalance resumes from the last committed epoch instead of empty
129/// state.
130///
131/// This is the read half of the rehydration life cycle: it resolves the
132/// highest committed epoch once via
133/// [`StateBackend::latest_committed_epoch`] and pulls each vnode's
134/// `partial.bin` at that epoch. Applying the bytes into live operators is
135/// the caller's responsibility (operators that adopt the per-vnode state
136/// backend consume the staged blobs on their next cycle).
137pub struct VnodeRehydrator<'a> {
138    backend: &'a dyn StateBackend,
139}
140
141impl<'a> VnodeRehydrator<'a> {
142    /// Wrap the durable backend the partials live on.
143    #[must_use]
144    pub fn new(backend: &'a dyn StateBackend) -> Self {
145        Self { backend }
146    }
147
148    /// Read the latest committed partial for each vnode in `vnodes`.
149    ///
150    /// Resolves the highest committed epoch once, then reads each
151    /// vnode's `partial.bin` at that epoch. Best-effort: a read failure
152    /// for one vnode is recorded in [`VnodeRehydration::errors`] and does
153    /// not abort the rest. Returns an empty report (with `epoch = None`,
154    /// every vnode in `missing`) when the store has no committed epoch.
155    pub async fn rehydrate(&self, vnodes: &[u32]) -> VnodeRehydration {
156        let mut report = VnodeRehydration::default();
157        if vnodes.is_empty() {
158            return report;
159        }
160
161        let epoch = match self.backend.latest_committed_epoch().await {
162            Ok(Some(epoch)) => epoch,
163            Ok(None) => {
164                debug!(
165                    vnodes = ?vnodes,
166                    "rehydrate: no committed epoch on backend — vnodes start fresh"
167                );
168                report.missing = vnodes.to_vec();
169                return report;
170            }
171            Err(e) => {
172                warn!(
173                    error = %e,
174                    "[LDB-6050] rehydrate: latest_committed_epoch failed — \
175                     vnodes start fresh"
176                );
177                report.missing = vnodes.to_vec();
178                return report;
179            }
180        };
181        report.epoch = Some(epoch);
182
183        for &vnode in vnodes {
184            match self.backend.read_partial(vnode, epoch).await {
185                Ok(Some(bytes)) => {
186                    // An unchanged-vnode partial is a reference to the
187                    // epoch of its last full upload —
188                    // follow the single hop to the real state. A blob
189                    // that doesn't decode is handed through unchanged
190                    // (the apply path skips undecodable partials).
191                    let bytes = match crate::vnode_partial::VnodePartial::decode(&bytes) {
192                        Ok(p) => match p.base_epoch {
193                            Some(base) => match self.backend.read_partial(vnode, base).await {
194                                Ok(Some(base_bytes)) => base_bytes,
195                                Ok(None) => {
196                                    warn!(
197                                        vnode,
198                                        epoch,
199                                        base,
200                                        "[LDB-6052] reference partial's base is missing — \
201                                         vnode starts fresh"
202                                    );
203                                    report.missing.push(vnode);
204                                    continue;
205                                }
206                                Err(e) => {
207                                    warn!(
208                                        vnode, epoch, base, error = %e,
209                                        "[LDB-6051] rehydrate: base read failed — vnode starts fresh"
210                                    );
211                                    report.errors.insert(vnode, e.to_string());
212                                    continue;
213                                }
214                            },
215                            None => bytes,
216                        },
217                        Err(_) => bytes,
218                    };
219                    debug!(
220                        vnode,
221                        epoch,
222                        bytes = bytes.len(),
223                        "rehydrated vnode partial"
224                    );
225                    report.restored.insert(vnode, bytes);
226                }
227                Ok(None) => {
228                    debug!(
229                        vnode,
230                        epoch, "rehydrate: no partial for vnode at committed epoch"
231                    );
232                    report.missing.push(vnode);
233                }
234                Err(e) => {
235                    warn!(
236                        vnode,
237                        epoch,
238                        error = %e,
239                        "[LDB-6051] rehydrate: read_partial failed — vnode starts fresh"
240                    );
241                    report.errors.insert(vnode, e.to_string());
242                }
243            }
244        }
245
246        info!(
247            epoch,
248            restored = report.restored.len(),
249            missing = report.missing.len(),
250            errors = report.errors.len(),
251            "vnode rehydration complete"
252        );
253        report
254    }
255}
256
257/// Recovery manager.
258///
259/// Loads the latest [`CheckpointManifest`] from a [`CheckpointStore`] and
260/// restores all registered sources, sinks, and tables to their checkpointed
261/// state.
262pub struct RecoveryManager<'a> {
263    store: &'a dyn CheckpointStore,
264    /// When true, any source/sink restore failure aborts the entire recovery.
265    /// When false (default), failures are logged and recorded in `RecoveredState`
266    /// but recovery continues — the pipeline resumes with potentially
267    /// mismatched offsets.
268    strict: bool,
269}
270
271impl<'a> RecoveryManager<'a> {
272    /// Creates a new recovery manager using the given checkpoint store.
273    ///
274    /// Defaults to strict mode: any source/sink restore failure aborts
275    /// the entire recovery. Use [`Self::lenient`] to allow partial recovery.
276    #[must_use]
277    pub fn new(store: &'a dyn CheckpointStore) -> Self {
278        Self {
279            store,
280            strict: true,
281        }
282    }
283
284    /// Creates a lenient recovery manager.
285    ///
286    /// In lenient mode, source/sink restore failures are logged and
287    /// recorded in `RecoveredState` but do not abort recovery. The
288    /// pipeline resumes with potentially mismatched offsets.
289    #[must_use]
290    pub fn lenient(store: &'a dyn CheckpointStore) -> Self {
291        Self {
292            store,
293            strict: false,
294        }
295    }
296
297    /// Attempts to recover from the latest checkpoint, with fallback to older
298    /// checkpoints if the latest is corrupt or fails to restore.
299    ///
300    /// Returns `Ok(None)` if no checkpoint exists (fresh start).
301    /// Returns `Ok(Some(RecoveredState))` on successful recovery.
302    ///
303    /// Recovery is best-effort: individual source/sink failures are recorded
304    /// in `RecoveredState` but do not abort the entire recovery. This allows
305    /// partial recovery (e.g., one source fails to seek but others succeed).
306    ///
307    /// ## Fallback Behavior
308    ///
309    /// If the latest checkpoint fails (corrupt manifest, deserialization error),
310    /// the manager iterates through all available checkpoints in reverse
311    /// chronological order until one succeeds or all are exhausted. This
312    /// prevents a single corrupt checkpoint from causing total data loss.
313    ///
314    /// # Errors
315    ///
316    /// Returns `DbError::Checkpoint` if the checkpoint store fails, or
317    /// in strict mode, if any source/sink restore fails.
318    pub(crate) async fn recover(
319        &self,
320        sources: &[RegisteredSource],
321        sinks: &[RegisteredSink],
322        table_sources: &[RegisteredSource],
323    ) -> Result<Option<RecoveredState>, DbError> {
324        // Fast path: try load_latest() first.
325        match self.store.load_latest().await {
326            Ok(Some(manifest)) => {
327                if self.is_checkpoint_corrupt(&manifest).await {
328                    warn!(
329                        checkpoint_id = manifest.checkpoint_id,
330                        "[LDB-6010] latest checkpoint corrupt, trying fallback"
331                    );
332                } else if Self::has_pending_sinks(&manifest) {
333                    warn!(
334                        checkpoint_id = manifest.checkpoint_id,
335                        epoch = manifest.epoch,
336                        "[LDB-6015] checkpoint has uncommitted sinks — source offsets \
337                         may be past uncommitted data, falling back to previous checkpoint"
338                    );
339                } else {
340                    let state = self
341                        .restore_from(manifest, sources, sinks, table_sources)
342                        .await;
343                    if let Err(e) = self.check_strict(&state) {
344                        warn!(
345                            checkpoint_id = state.manifest.checkpoint_id,
346                            error = %e,
347                            "latest checkpoint restore had strict errors, trying fallback"
348                        );
349                    } else {
350                        return Ok(Some(state));
351                    }
352                }
353            }
354            Ok(None) => {
355                info!("no checkpoint found, starting fresh");
356                return Ok(None);
357            }
358            Err(e) => {
359                warn!(error = %e, "latest checkpoint load failed, trying fallback");
360            }
361        }
362
363        // Fallback: iterate through all checkpoints in reverse order.
364        let checkpoints = self.store.list().await.map_err(DbError::from)?;
365
366        if checkpoints.is_empty() {
367            warn!("no checkpoints available for fallback, starting fresh");
368            return Ok(None);
369        }
370
371        for &(checkpoint_id, _epoch) in checkpoints.iter().rev() {
372            match self.store.load_by_id(checkpoint_id).await {
373                Ok(Some(manifest)) => {
374                    if self.is_checkpoint_corrupt(&manifest).await {
375                        warn!(
376                            checkpoint_id,
377                            "[LDB-6010] fallback checkpoint corrupt, skipping"
378                        );
379                        continue;
380                    }
381                    if Self::has_pending_sinks(&manifest) {
382                        warn!(
383                            checkpoint_id,
384                            "[LDB-6015] fallback checkpoint has uncommitted sinks, skipping"
385                        );
386                        continue;
387                    }
388                    info!(checkpoint_id, "recovering from fallback checkpoint");
389                    let state = self
390                        .restore_from(manifest, sources, sinks, table_sources)
391                        .await;
392                    if let Err(e) = self.check_strict(&state) {
393                        warn!(
394                            checkpoint_id,
395                            error = %e,
396                            "fallback checkpoint restore had strict errors, trying next"
397                        );
398                        continue;
399                    }
400                    return Ok(Some(state));
401                }
402                Ok(None) => {
403                    debug!(checkpoint_id, "fallback checkpoint not found, skipping");
404                }
405                Err(e) => {
406                    warn!(
407                        checkpoint_id,
408                        error = %e,
409                        "fallback checkpoint load failed, trying next"
410                    );
411                }
412            }
413        }
414
415        warn!("all checkpoints failed to load, starting fresh");
416        Ok(None)
417    }
418
419    /// Resolves external operator states by loading the sidecar file.
420    ///
421    /// For any operator state marked as `external`, loads the corresponding
422    /// bytes from `state.bin` and replaces it with an inline entry. This
423    /// makes the rest of recovery code work uniformly with inline state.
424    ///
425    /// Returns `true` if all external states were resolved successfully.
426    /// Returns `false` if any state could not be resolved (missing sidecar,
427    /// truncated sidecar, or I/O error). In strict mode, the caller should
428    /// treat a `false` return as a corrupt checkpoint and try fallback.
429    async fn resolve_external_states(&self, manifest: &mut CheckpointManifest) -> bool {
430        let external_ops: Vec<String> = manifest
431            .operator_states
432            .iter()
433            .filter(|(_, op)| op.external)
434            .map(|(name, _)| name.clone())
435            .collect();
436
437        if external_ops.is_empty() {
438            return true;
439        }
440
441        let state_data = match self.store.load_state_data(manifest.checkpoint_id).await {
442            Ok(Some(data)) => data,
443            Ok(None) => {
444                error!(
445                    checkpoint_id = manifest.checkpoint_id,
446                    operators = ?external_ops,
447                    "[LDB-6010] sidecar state.bin missing — external operator states \
448                     cannot be resolved; operators will start with empty state"
449                );
450                // Clear external flag so recovery doesn't attempt to
451                // dereference invalid offsets later
452                for name in &external_ops {
453                    if let Some(op) = manifest.operator_states.get_mut(name) {
454                        *op =
455                            laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::inline(
456                                &[],
457                            );
458                    }
459                }
460                return false;
461            }
462            Err(e) => {
463                error!(
464                    checkpoint_id = manifest.checkpoint_id,
465                    error = %e,
466                    operators = ?external_ops,
467                    "[LDB-6010] failed to load sidecar state.bin — external operator states \
468                     cannot be resolved; operators will start with empty state"
469                );
470                for name in &external_ops {
471                    if let Some(op) = manifest.operator_states.get_mut(name) {
472                        *op =
473                            laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::inline(
474                                &[],
475                            );
476                    }
477                }
478                return false;
479            }
480        };
481
482        let mut all_resolved = true;
483        for (name, op) in &mut manifest.operator_states {
484            if op.external {
485                // `external_offset`/`external_length` are u64 from the on-disk
486                // manifest; use checked arithmetic so a corrupt/tampered
487                // manifest can't overflow past the length check.
488                let range = match (
489                    usize::try_from(op.external_offset),
490                    usize::try_from(op.external_length),
491                ) {
492                    (Ok(start), Ok(len)) => start.checked_add(len).map(|end| (start, end)),
493                    _ => None,
494                }
495                .filter(|&(_, end)| end <= state_data.len());
496                if let Some((start, end)) = range {
497                    let external_offset = op.external_offset;
498                    let external_length = op.external_length;
499                    let data = &state_data[start..end];
500                    *op = laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::inline(
501                        data,
502                    );
503                    debug!(
504                        operator = %name,
505                        offset = external_offset,
506                        length = external_length,
507                        "resolved external operator state from sidecar"
508                    );
509                } else {
510                    error!(
511                        operator = %name,
512                        offset = op.external_offset,
513                        length = op.external_length,
514                        sidecar_len = state_data.len(),
515                        "[LDB-6010] sidecar too small or offset/length out of \
516                         range for external operator state — operator will \
517                         start with empty state"
518                    );
519                    *op =
520                        laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
521                    all_resolved = false;
522                }
523            }
524        }
525        all_resolved
526    }
527
528    /// Restores pipeline state from a loaded manifest.
529    ///
530    /// This is the inner restore logic shared by both the fast path
531    /// (latest checkpoint) and fallback path (older checkpoints).
532    #[allow(clippy::too_many_lines)]
533    async fn restore_from(
534        &self,
535        mut manifest: CheckpointManifest,
536        sources: &[RegisteredSource],
537        sinks: &[RegisteredSink],
538        table_sources: &[RegisteredSource],
539    ) -> RecoveredState {
540        // Resolve external operator states from sidecar before recovery.
541        // In strict mode, unresolved sidecar state is recorded as a source
542        // error so check_strict() will reject this checkpoint.
543        let sidecar_ok = self.resolve_external_states(&mut manifest).await;
544        if !sidecar_ok && self.strict {
545            warn!(
546                checkpoint_id = manifest.checkpoint_id,
547                "[LDB-6010] sidecar resolution failed in strict mode — \
548                 checkpoint will be rejected"
549            );
550        }
551
552        // Validate manifest consistency before restoring state.
553        // `DEFAULT_VNODE_COUNT` is a placeholder; the runtime
554        // `VnodeRegistry` value is not threaded through recovery yet.
555        let validation_errors =
556            manifest.validate(laminar_core::storage::checkpoint_manifest::DEFAULT_VNODE_COUNT);
557        if !validation_errors.is_empty() {
558            for err in &validation_errors {
559                warn!(
560                    checkpoint_id = manifest.checkpoint_id,
561                    error = %err,
562                    "manifest validation warning"
563                );
564            }
565        }
566
567        // Topology drift detection: compare current sources/sinks against
568        // the checkpoint to warn the operator about changes.
569        if !manifest.source_names.is_empty() {
570            let mut current_sources: Vec<&str> = sources.iter().map(|s| s.name.as_str()).collect();
571            current_sources.sort_unstable();
572            let checkpoint_sources: Vec<&str> =
573                manifest.source_names.iter().map(String::as_str).collect();
574            let added: Vec<&&str> = current_sources
575                .iter()
576                .filter(|n| !checkpoint_sources.contains(n))
577                .collect();
578            let removed: Vec<&&str> = checkpoint_sources
579                .iter()
580                .filter(|n| !current_sources.contains(n))
581                .collect();
582            if !added.is_empty() {
583                warn!(
584                    sources = ?added,
585                    "new sources added since checkpoint — no saved offsets"
586                );
587            }
588            if !removed.is_empty() {
589                warn!(
590                    sources = ?removed,
591                    "sources removed since checkpoint — orphaned offsets"
592                );
593            }
594        }
595        if !manifest.sink_names.is_empty() {
596            let mut current_sinks: Vec<&str> = sinks.iter().map(|s| s.name.as_str()).collect();
597            current_sinks.sort_unstable();
598            let checkpoint_sinks: Vec<&str> =
599                manifest.sink_names.iter().map(String::as_str).collect();
600            let added: Vec<&&str> = current_sinks
601                .iter()
602                .filter(|n| !checkpoint_sinks.contains(n))
603                .collect();
604            let removed: Vec<&&str> = checkpoint_sinks
605                .iter()
606                .filter(|n| !current_sinks.contains(n))
607                .collect();
608            if !added.is_empty() {
609                warn!(
610                    sinks = ?added,
611                    "new sinks added since checkpoint — no saved epoch"
612                );
613            }
614            if !removed.is_empty() {
615                warn!(
616                    sinks = ?removed,
617                    "sinks removed since checkpoint — orphaned epochs"
618                );
619            }
620        }
621
622        info!(
623            checkpoint_id = manifest.checkpoint_id,
624            epoch = manifest.epoch,
625            validation_warnings = validation_errors.len(),
626            "recovering from checkpoint"
627        );
628
629        let mut result = RecoveredState {
630            manifest: manifest.clone(),
631            sources_restored: 0,
632            tables_restored: 0,
633            sinks_rolled_back: 0,
634            source_errors: HashMap::new(),
635            sink_errors: HashMap::new(),
636        };
637
638        // Record sidecar failure so check_strict() rejects this checkpoint.
639        if !sidecar_ok {
640            result.source_errors.insert(
641                "__sidecar__".into(),
642                "[LDB-6010] sidecar state.bin missing or truncated — \
643                 operator state cannot be fully restored"
644                    .into(),
645            );
646        }
647
648        // Restore source offsets.
649        for source in sources {
650            if !source.supports_replay {
651                info!(
652                    source = %source.name,
653                    "skipping restore for non-replayable source (at-most-once)"
654                );
655                continue;
656            }
657            if let Some(cp) = manifest.source_offsets.get(&source.name) {
658                let source_cp = connector_to_source_checkpoint(cp);
659                let mut last_err = None;
660                for attempt in 0..3u32 {
661                    let mut connector = source.connector.lock().await;
662                    match connector.restore(&source_cp).await {
663                        Ok(()) => {
664                            last_err = None;
665                            break;
666                        }
667                        Err(e) => {
668                            warn!(
669                                source = %source.name, attempt,
670                                error = %e, "source restore failed, retrying"
671                            );
672                            last_err = Some(e);
673                            drop(connector);
674                            if attempt < 2 {
675                                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
676                            }
677                        }
678                    }
679                }
680                if let Some(e) = last_err {
681                    let msg = format!("source restore failed after 3 attempts: {e}");
682                    result.source_errors.insert(source.name.clone(), msg);
683                } else {
684                    result.sources_restored += 1;
685                    debug!(source = %source.name, epoch = cp.epoch, "source restored");
686                }
687            }
688        }
689
690        // Restore table source offsets.
691        for table_source in table_sources {
692            if let Some(cp) = manifest.table_offsets.get(&table_source.name) {
693                let source_cp = connector_to_source_checkpoint(cp);
694                let mut connector = table_source.connector.lock().await;
695                match connector.restore(&source_cp).await {
696                    Ok(()) => {
697                        result.tables_restored += 1;
698                        debug!(table = %table_source.name, epoch = cp.epoch, "table source restored");
699                    }
700                    Err(e) => {
701                        let msg = format!("table source restore failed: {e}");
702                        warn!(table = %table_source.name, error = %e, "table source restore failed");
703                        result.source_errors.insert(table_source.name.clone(), msg);
704                    }
705                }
706            }
707        }
708
709        // Roll back exactly-once sinks that did NOT commit (Pending or Failed).
710        // Already-Committed sinks are left alone.
711        for sink in sinks {
712            if sink.exactly_once {
713                // Check per-sink commit status — if the sink committed, skip rollback.
714                let already_committed = manifest
715                    .sink_commit_statuses
716                    .get(&sink.name)
717                    .is_some_and(|s| matches!(s, SinkCommitStatus::Committed));
718
719                if already_committed {
720                    debug!(
721                        sink = %sink.name,
722                        epoch = manifest.epoch,
723                        "sink already committed, skipping rollback"
724                    );
725                    continue;
726                }
727
728                match sink.handle.rollback_epoch(manifest.epoch).await {
729                    Ok(()) => {
730                        result.sinks_rolled_back += 1;
731                        debug!(sink = %sink.name, epoch = manifest.epoch, "sink rolled back");
732                    }
733                    Err(e) => {
734                        result
735                            .sink_errors
736                            .insert(sink.name.clone(), format!("rollback failed: {e}"));
737                        warn!(
738                            sink = %sink.name,
739                            epoch = manifest.epoch,
740                            error = %e,
741                            "[LDB-6016] sink rollback failed during recovery"
742                        );
743                    }
744                }
745            }
746        }
747
748        info!(
749            checkpoint_id = manifest.checkpoint_id,
750            epoch = manifest.epoch,
751            sources_restored = result.sources_restored,
752            tables_restored = result.tables_restored,
753            sinks_rolled_back = result.sinks_rolled_back,
754            errors = result.source_errors.len() + result.sink_errors.len(),
755            "recovery complete"
756        );
757
758        result
759    }
760
761    /// Checks whether a checkpoint's sidecar data is corrupt.
762    ///
763    /// Returns `true` if the checkpoint has a `state_checksum` and
764    /// [`CheckpointStore::validate_checkpoint`] reports a checksum mismatch
765    /// or missing sidecar. Returns `false` if there is no sidecar, or
766    /// if the sidecar is valid, or if validation I/O fails (we proceed
767    /// with caution rather than blocking recovery).
768    /// Returns `true` if the checkpoint fails integrity validation.
769    ///
770    /// Checks both sidecar checksum and manifest validity. Any validation
771    /// failure (sidecar corruption, missing data, or manifest issues like
772    /// epoch=0) causes the checkpoint to be rejected so fallback can try
773    /// an older one.
774    ///
775    /// Only returns `false` (proceed) when validation passes OR when the
776    /// checkpoint has no sidecar to validate.
777    async fn is_checkpoint_corrupt(&self, manifest: &CheckpointManifest) -> bool {
778        // No sidecar and no state_checksum → nothing to validate beyond
779        // manifest parsing (which already succeeded if we got here).
780        if manifest.state_checksum.is_none() && manifest.operator_states.is_empty() {
781            return false;
782        }
783        match self.store.validate_checkpoint(manifest.checkpoint_id).await {
784            Ok(ValidationResult {
785                valid: false,
786                ref issues,
787                ..
788            }) => {
789                error!(
790                    checkpoint_id = manifest.checkpoint_id,
791                    issues = ?issues,
792                    "[LDB-6010] checkpoint integrity check failed"
793                );
794                true
795            }
796            Ok(_) => false, // valid
797            Err(e) => {
798                // I/O errors during validation are treated as corruption —
799                // if we can't verify the checkpoint, don't trust it.
800                error!(
801                    checkpoint_id = manifest.checkpoint_id,
802                    error = %e,
803                    "[LDB-6010] checkpoint validation I/O error — \
804                     treating as corrupt for safety"
805                );
806                true
807            }
808        }
809    }
810
811    /// In strict mode, returns an error if any source or sink had restore failures.
812    /// Returns true if any exactly-once sink has Pending commit status.
813    ///
814    /// A checkpoint with Pending sinks was persisted before sink commit
815    /// completed. Recovering from it would advance source offsets past
816    /// data the sinks never received — causing silent data loss.
817    fn has_pending_sinks(manifest: &CheckpointManifest) -> bool {
818        manifest
819            .sink_commit_statuses
820            .values()
821            .any(|s| matches!(s, SinkCommitStatus::Pending))
822    }
823
824    fn check_strict(&self, state: &RecoveredState) -> Result<(), DbError> {
825        if !self.strict || !state.has_errors() {
826            return Ok(());
827        }
828        let mut msgs: Vec<String> = state
829            .source_errors
830            .iter()
831            .map(|(k, v)| format!("source '{k}': {v}"))
832            .collect();
833        for (k, v) in &state.sink_errors {
834            msgs.push(format!("sink '{k}': {v}"));
835        }
836        Err(DbError::Checkpoint(format!(
837            "strict recovery failed — {} restore error(s): {}",
838            msgs.len(),
839            msgs.join("; ")
840        )))
841    }
842
843    /// Loads the latest manifest without performing recovery.
844    ///
845    /// Useful for inspecting checkpoint state or building a recovery plan.
846    ///
847    /// # Errors
848    ///
849    /// Returns `DbError::Checkpoint` if the store fails.
850    pub async fn load_latest(&self) -> Result<Option<CheckpointManifest>, DbError> {
851        self.store.load_latest().await.map_err(DbError::from)
852    }
853
854    /// Loads a specific checkpoint by ID.
855    ///
856    /// # Errors
857    ///
858    /// Returns `DbError::Checkpoint` if the store fails.
859    pub async fn load_by_id(
860        &self,
861        checkpoint_id: u64,
862    ) -> Result<Option<CheckpointManifest>, DbError> {
863        self.store
864            .load_by_id(checkpoint_id)
865            .await
866            .map_err(DbError::from)
867    }
868}
869
#[cfg(test)]
mod tests {
    use super::*;
    use laminar_core::storage::checkpoint_manifest::OperatorCheckpoint;
    use laminar_core::storage::checkpoint_store::FileSystemCheckpointStore;

    /// Opens a filesystem-backed checkpoint store rooted at `dir`.
    fn make_store(dir: &std::path::Path) -> FileSystemCheckpointStore {
        FileSystemCheckpointStore::new(dir, 3)
    }

    /// Replaces the on-disk `manifest.json` of `checkpoint_dir` (a directory
    /// name under `<root>/checkpoints`) with `contents`.
    fn clobber_manifest(root: &std::path::Path, checkpoint_dir: &str, contents: &str) {
        let manifest_file = root
            .join("checkpoints")
            .join(checkpoint_dir)
            .join("manifest.json");
        std::fs::write(&manifest_file, contents).unwrap();
    }

    /// An empty store yields no recovered state.
    #[tokio::test]
    async fn test_recover_no_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());
        let manager = RecoveryManager::new(&store);

        let outcome = manager.recover(&[], &[], &[]).await.unwrap();
        assert!(outcome.is_none());
    }

    /// A bare manifest recovers with its epoch and zeroed counters.
    #[tokio::test]
    async fn test_recover_empty_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        // Persist a manifest that carries nothing but its id and epoch.
        let bare = CheckpointManifest::new(1, 5);
        store.save(&bare).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let recovered = manager.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(recovered.epoch(), 5);
        assert_eq!(recovered.sources_restored, 0);
        assert_eq!(recovered.tables_restored, 0);
        assert_eq!(recovered.sinks_rolled_back, 0);
        assert!(!recovered.has_errors());
    }

    /// The watermark stored in the manifest is surfaced after recovery.
    #[tokio::test]
    async fn test_recover_with_watermark() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        let mut with_watermark = CheckpointManifest::new(1, 3);
        with_watermark.watermark = Some(42_000);
        store.save(&with_watermark).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let recovered = manager.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(recovered.watermark(), Some(42_000));
    }

    /// Inline operator states round-trip through save and recover.
    #[tokio::test]
    async fn test_recover_with_operator_states() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        let mut with_ops = CheckpointManifest::new(1, 7);
        with_ops
            .operator_states
            .insert("0".to_string(), OperatorCheckpoint::inline(b"window-state"));
        with_ops
            .operator_states
            .insert("3".to_string(), OperatorCheckpoint::inline(b"filter-state"));
        store.save(&with_ops).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let recovered = manager.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(recovered.operator_states().len(), 2);
        let window_op = recovered.operator_states().get("0").unwrap();
        assert_eq!(window_op.decode_inline().unwrap(), b"window-state");
    }

    /// The table-store checkpoint path is carried through recovery.
    #[tokio::test]
    async fn test_recover_table_store_path() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        let mut with_path = CheckpointManifest::new(1, 1);
        with_path.table_store_checkpoint_path = Some("/data/table_store_cp_001".into());
        store.save(&with_path).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let recovered = manager.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(
            recovered.table_store_checkpoint_path(),
            Some("/data/table_store_cp_001")
        );
    }

    /// `load_latest` on an empty store returns `None`.
    #[tokio::test]
    async fn test_load_latest_no_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());
        let manager = RecoveryManager::new(&store);

        let latest = manager.load_latest().await.unwrap();
        assert!(latest.is_none());
    }

    /// `load_by_id` finds saved checkpoints and returns `None` for unknown IDs.
    #[tokio::test]
    async fn test_load_by_id() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        store.save(&CheckpointManifest::new(1, 1)).await.unwrap();
        store.save(&CheckpointManifest::new(2, 2)).await.unwrap();

        let manager = RecoveryManager::new(&store);

        let first = manager.load_by_id(1).await.unwrap().unwrap();
        assert_eq!(first.checkpoint_id, 1);

        let second = manager.load_by_id(2).await.unwrap().unwrap();
        assert_eq!(second.checkpoint_id, 2);

        let unknown = manager.load_by_id(999).await.unwrap();
        assert!(unknown.is_none());
    }

    /// An unreadable newest manifest makes recovery use the previous one.
    #[tokio::test]
    async fn test_recover_fallback_to_previous_checkpoint() {
        let tmp = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(tmp.path(), 10);

        // Two well-formed checkpoints, oldest first.
        let mut older = CheckpointManifest::new(1, 10);
        older.watermark = Some(1000);
        store.save(&older).await.unwrap();

        let mut newer = CheckpointManifest::new(2, 20);
        newer.watermark = Some(2000);
        store.save(&newer).await.unwrap();

        // Replace the newest manifest with text that is not valid JSON.
        clobber_manifest(tmp.path(), "checkpoint_000002", "not valid json!!!");

        let manager = RecoveryManager::new(&store);
        let outcome = manager.recover(&[], &[], &[]).await.unwrap();

        // Checkpoint 1 is the one that must be picked up.
        let recovered = outcome.expect("should recover from fallback checkpoint");
        assert_eq!(recovered.manifest.checkpoint_id, 1);
        assert_eq!(recovered.epoch(), 10);
        assert_eq!(recovered.watermark(), Some(1000));
    }

    /// With every manifest unreadable, recovery reports a fresh start.
    #[tokio::test]
    async fn test_recover_all_checkpoints_corrupt_starts_fresh() {
        let tmp = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(tmp.path(), 10);

        // The only checkpoint is saved and then overwritten with garbage.
        store.save(&CheckpointManifest::new(1, 5)).await.unwrap();
        clobber_manifest(tmp.path(), "checkpoint_000001", "corrupt");

        let manager = RecoveryManager::new(&store);
        let outcome = manager.recover(&[], &[], &[]).await.unwrap();

        // Nothing usable is left, so no state is recovered.
        assert!(outcome.is_none());
    }

    /// A healthy newest checkpoint is used directly.
    #[tokio::test]
    async fn test_recover_latest_ok_no_fallback_needed() {
        let tmp = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(tmp.path(), 10);

        store.save(&CheckpointManifest::new(1, 10)).await.unwrap();
        store.save(&CheckpointManifest::new(2, 20)).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let recovered = manager.recover(&[], &[], &[]).await.unwrap().unwrap();

        // The newest checkpoint wins; no fallback takes place.
        assert_eq!(recovered.manifest.checkpoint_id, 2);
        assert_eq!(recovered.epoch(), 20);
    }

    /// External operator state is loaded from the sidecar and turned inline.
    #[tokio::test]
    async fn test_recover_with_sidecar_state() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        // The manifest references 2048 bytes of external operator state.
        let payload = vec![0xAB; 2048];
        let mut with_external = CheckpointManifest::new(1, 5);
        with_external
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 2048));

        // Sidecar data goes to the store before the manifest does.
        store
            .save_state_data(1, &[bytes::Bytes::copy_from_slice(&payload)])
            .await
            .unwrap();
        store.save(&with_external).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let recovered = manager.recover(&[], &[], &[]).await.unwrap().unwrap();

        // After recovery the state is inline and byte-identical.
        let big_op = recovered.operator_states().get("big-op").unwrap();
        assert!(
            !big_op.external,
            "external state should be resolved to inline"
        );
        assert_eq!(big_op.decode_inline().unwrap(), payload);
    }

    /// Inline and external operator states can coexist in one checkpoint.
    #[tokio::test]
    async fn test_recover_mixed_inline_and_external() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        let payload = vec![0xCD; 4096];
        let mut mixed = CheckpointManifest::new(1, 3);
        // One small state kept inline in the manifest ...
        mixed
            .operator_states
            .insert("small-op".into(), OperatorCheckpoint::inline(b"tiny"));
        // ... and one large state that lives in the sidecar.
        mixed
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 4096));

        store
            .save_state_data(1, &[bytes::Bytes::copy_from_slice(&payload)])
            .await
            .unwrap();
        store.save(&mixed).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let recovered = manager.recover(&[], &[], &[]).await.unwrap().unwrap();

        let small_op = recovered.operator_states().get("small-op").unwrap();
        assert_eq!(small_op.decode_inline().unwrap(), b"tiny");

        let big_op = recovered.operator_states().get("big-op").unwrap();
        assert_eq!(big_op.decode_inline().unwrap(), payload);
    }

    /// Lenient mode tolerates a missing sidecar by substituting empty state.
    #[tokio::test]
    async fn test_recover_missing_sidecar_graceful() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        // External state is referenced, but no sidecar is ever written.
        let mut dangling = CheckpointManifest::new(1, 1);
        dangling
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&dangling).await.unwrap();

        // Lenient mode degrades gracefully here; strict mode rejects the
        // checkpoint (see test_recover_missing_sidecar_strict_rejects).
        let manager = RecoveryManager::lenient(&store);
        let recovered = manager.recover(&[], &[], &[]).await.unwrap().unwrap();

        // The dangling external reference must come back as empty inline
        // state rather than an unresolved offset.
        let orphan = recovered.operator_states().get("orphan").unwrap();
        assert!(
            !orphan.external,
            "unresolved external state replaced with inline empty"
        );
        assert!(
            orphan.state_b64.as_ref().is_none_or(String::is_empty),
            "replaced state should be empty"
        );
    }

    /// `has_errors` is false with no recorded errors and true with one.
    #[tokio::test]
    async fn test_recovered_state_has_errors() {
        // Builds a state that differs only in its recorded source errors.
        let state_with = |source_errors| RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors,
            sink_errors: HashMap::new(),
        };

        let clean = state_with(HashMap::new());
        assert!(!clean.has_errors());

        let failing = state_with(HashMap::from([("source1".into(), "failed".into())]));
        assert!(failing.has_errors());
    }

    /// Strict mode refuses a checkpoint whose sidecar is missing.
    #[tokio::test]
    async fn test_recover_missing_sidecar_strict_rejects() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        // External state is referenced, but no sidecar is ever written.
        let mut dangling = CheckpointManifest::new(1, 1);
        dangling
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&dangling).await.unwrap();

        // The only checkpoint is rejected, leaving nothing to fall back to,
        // so the result is a fresh start.
        let manager = RecoveryManager::new(&store);
        let outcome = manager.recover(&[], &[], &[]).await.unwrap();
        assert!(
            outcome.is_none(),
            "strict mode should reject checkpoint with missing sidecar"
        );
    }

    /// A checkpoint with a Pending sink is skipped for an older committed one.
    #[tokio::test]
    async fn test_recover_skips_pending_sinks_falls_back() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        // Epoch 1: the sink is recorded as Committed.
        let mut committed = CheckpointManifest::new(1, 1);
        committed
            .sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Committed);
        store.save(&committed).await.unwrap();

        // Epoch 2: the manifest was persisted while the sink was still Pending.
        let mut pending = CheckpointManifest::new(2, 2);
        pending
            .sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Pending);
        store.save(&pending).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let outcome = manager.recover(&[], &[], &[]).await.unwrap();
        let recovered = outcome.expect("should recover from epoch 1 fallback");

        // Epoch 2 holds uncommitted sink data, so epoch 1 must be chosen.
        assert_eq!(
            recovered.epoch(),
            1,
            "recovery must skip checkpoint with Pending sinks"
        );
    }

    /// If the only checkpoint has a Pending sink, recovery starts fresh.
    #[tokio::test]
    async fn test_recover_all_pending_starts_fresh() {
        let tmp = tempfile::tempdir().unwrap();
        let store = make_store(tmp.path());

        // A single checkpoint, and its sink never left Pending.
        let mut pending = CheckpointManifest::new(1, 1);
        pending
            .sink_commit_statuses
            .insert("sink".into(), SinkCommitStatus::Pending);
        store.save(&pending).await.unwrap();

        let manager = RecoveryManager::new(&store);
        let outcome = manager.recover(&[], &[], &[]).await.unwrap();
        assert!(
            outcome.is_none(),
            "should start fresh when all checkpoints have pending sinks"
        );
    }
}
1250
#[cfg(test)]
mod rehydration_tests {
    use super::*;
    use bytes::Bytes;
    use laminar_core::state::{InProcessBackend, ObjectStoreBackend};

    /// Writes `tag` as the partial of every vnode in `vnodes` for `epoch`,
    /// then asserts that the backend reports the epoch as complete.
    async fn seal_epoch(backend: &dyn StateBackend, epoch: u64, vnodes: &[u32], tag: &[u8]) {
        for vnode in vnodes.iter().copied() {
            let payload = Bytes::copy_from_slice(tag);
            backend
                .write_partial(vnode, epoch, 0, payload)
                .await
                .unwrap();
        }
        let sealed = backend.epoch_complete(epoch, vnodes).await.unwrap();
        assert!(sealed);
    }

    /// Sealed vnodes are restored; a never-written vnode is listed as missing.
    #[tokio::test]
    async fn rehydrate_reads_committed_partials() {
        let backend = InProcessBackend::new(4);
        seal_epoch(&backend, 7, &[0, 1, 2], b"v7").await;

        let rehydrator = VnodeRehydrator::new(&backend);
        let report = rehydrator.rehydrate(&[0, 1, 3]).await;

        assert_eq!(report.epoch, Some(7));
        assert_eq!(report.restored_count(), 2);
        assert_eq!(report.restored.get(&0).map(|b| &b[..]), Some(&b"v7"[..]));
        assert_eq!(report.restored.get(&1).map(|b| &b[..]), Some(&b"v7"[..]));
        // Nothing was ever written for vnode 3: reported missing, not an error.
        assert_eq!(report.missing, vec![3]);
        assert!(!report.has_errors());
    }

    /// With two sealed epochs, the higher one is the one read back.
    #[tokio::test]
    async fn rehydrate_reads_latest_committed_epoch() {
        let backend = InProcessBackend::new(4);
        seal_epoch(&backend, 3, &[0, 1], b"old").await;
        seal_epoch(&backend, 9, &[0, 1], b"new").await;

        let rehydrator = VnodeRehydrator::new(&backend);
        let report = rehydrator.rehydrate(&[0, 1]).await;

        assert_eq!(report.epoch, Some(9), "must read the highest sealed epoch");
        assert_eq!(report.restored.get(&0).map(|b| &b[..]), Some(&b"new"[..]));
    }

    /// A reference partial resolves (one hop) to the full partial it
    /// points at.
    #[tokio::test]
    async fn rehydrate_resolves_reference_partials() {
        use crate::vnode_partial::VnodePartial;

        let backend = InProcessBackend::new(4);

        // Epoch 5 carries the full partial with real operator bytes.
        let base = VnodePartial {
            checkpoint_id: 1,
            operators: vec![("agg".into(), vec![1, 2, 3])],
            base_epoch: None,
        };
        backend
            .write_partial(0, 5, 0, Bytes::from(base.encode().unwrap()))
            .await
            .unwrap();
        assert!(backend.epoch_complete(5, &[0]).await.unwrap());

        // Epoch 6 carries only a pointer back to epoch 5.
        let pointer = VnodePartial {
            checkpoint_id: 2,
            operators: Vec::new(),
            base_epoch: Some(5),
        };
        backend
            .write_partial(0, 6, 0, Bytes::from(pointer.encode().unwrap()))
            .await
            .unwrap();
        assert!(backend.epoch_complete(6, &[0]).await.unwrap());

        let rehydrator = VnodeRehydrator::new(&backend);
        let report = rehydrator.rehydrate(&[0]).await;
        assert_eq!(report.epoch, Some(6));

        let raw = report.restored.get(&0).expect("vnode restored");
        let resolved = VnodePartial::decode(raw).unwrap();
        assert_eq!(
            resolved.base_epoch, None,
            "the resolved partial must be the full base, not the reference",
        );
        assert_eq!(resolved.operators[0].1, vec![1, 2, 3]);
    }

    /// With no sealed epoch, every requested vnode is reported missing.
    #[tokio::test]
    async fn rehydrate_no_committed_epoch_is_fresh() {
        let backend = InProcessBackend::new(4);

        let rehydrator = VnodeRehydrator::new(&backend);
        let report = rehydrator.rehydrate(&[0, 1]).await;

        assert_eq!(report.epoch, None);
        assert!(report.restored.is_empty());
        assert_eq!(report.missing, vec![0, 1]);
    }

    /// Requesting no vnodes produces an empty report, even with sealed data.
    #[tokio::test]
    async fn rehydrate_empty_request_is_noop() {
        let backend = InProcessBackend::new(4);
        seal_epoch(&backend, 1, &[0], b"x").await;

        let rehydrator = VnodeRehydrator::new(&backend);
        let report = rehydrator.rehydrate(&[]).await;

        assert_eq!(report.epoch, None);
        assert!(report.restored.is_empty());
        assert!(report.missing.is_empty());
    }

    /// Rehydration also works against an object-store backend on local disk.
    #[tokio::test]
    async fn rehydrate_over_object_store_backend() {
        use object_store::local::LocalFileSystem;
        use object_store::ObjectStore;
        use std::sync::Arc;

        let tmp = tempfile::tempdir().unwrap();
        let local_fs = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
        let object_store: Arc<dyn ObjectStore> = Arc::new(local_fs);
        let backend = ObjectStoreBackend::new(object_store, "node-0", 4);
        seal_epoch(&backend, 5, &[0, 1], b"durable").await;

        let rehydrator = VnodeRehydrator::new(&backend);
        let report = rehydrator.rehydrate(&[0, 1]).await;

        assert_eq!(report.epoch, Some(5));
        assert_eq!(report.restored_count(), 2);
        assert_eq!(
            report.restored.get(&1).map(|b| &b[..]),
            Some(&b"durable"[..])
        );
    }
}
1375}