
laminar_db/recovery_manager.rs

//! Unified recovery manager.
//!
//! Single recovery path that loads a
//! [`CheckpointManifest`](laminar_storage::checkpoint_manifest::CheckpointManifest) and restores
//! ALL state: source offsets, sink epochs, operator states, table offsets,
//! and watermarks.
//!
//! ## Recovery Protocol
//!
//! 1. `store.load_latest()` → `Option<CheckpointManifest>`
//! 2. If `None` → fresh start (no recovery needed)
//! 3. For each source: `source.restore(manifest.source_offsets[name])`
//! 4. For each table source: `source.restore(manifest.table_offsets[name])`
//! 5. For each exactly-once sink: `sink.rollback_epoch(manifest.epoch)`
//! 6. Return recovered state (watermark, epoch, operator states)
//!    — caller is responsible for restoring DAG/TPC operators from `operator_states`
//!
//! ## Fallback Recovery
//!
//! If the latest checkpoint is corrupt or fails to restore, the manager
//! iterates through all available checkpoints in reverse chronological order
//! until one succeeds. This prevents a single corrupt checkpoint from causing
//! total data loss.
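//!
//! ## Example
//!
//! A minimal sketch of the crate-internal call sequence, assuming a
//! `FileSystemCheckpointStore` rooted at an illustrative path and already
//! registered `sources`, `sinks`, and `table_sources`:
//!
//! ```ignore
//! let store = FileSystemCheckpointStore::new(std::path::Path::new("/data/checkpoints"), 3);
//! let manager = RecoveryManager::new(&store); // strict by default
//! match manager.recover(&sources, &sinks, &table_sources).await? {
//!     Some(state) => {
//!         // Sources/sinks are already rewound; the caller still has to
//!         // restore DAG/TPC operators from `state.operator_states()`.
//!         tracing::info!(epoch = state.epoch(), "resuming from checkpoint");
//!     }
//!     None => tracing::info!("no usable checkpoint, starting fresh"),
//! }
//! ```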
#![allow(clippy::disallowed_types)] // cold path

use std::collections::HashMap;

use laminar_storage::checkpoint_manifest::{CheckpointManifest, SinkCommitStatus};
use laminar_storage::checkpoint_store::CheckpointStore;
use laminar_storage::ValidationResult;
use tracing::{debug, error, info, warn};

use crate::checkpoint_coordinator::{
    connector_to_source_checkpoint, RegisteredSink, RegisteredSource,
};
use crate::error::DbError;

/// Result of a successful recovery from a checkpoint.
#[derive(Debug)]
pub struct RecoveredState {
    /// The manifest that was loaded and restored from.
    pub manifest: CheckpointManifest,
    /// Number of sources successfully restored.
    pub sources_restored: usize,
    /// Number of table sources successfully restored.
    pub tables_restored: usize,
    /// Number of sinks rolled back.
    pub sinks_rolled_back: usize,
    /// Sources that failed to restore (name → error message).
    pub source_errors: HashMap<String, String>,
    /// Sinks that failed to roll back (name → error message).
    pub sink_errors: HashMap<String, String>,
}

impl RecoveredState {
    /// Returns the recovered epoch.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.manifest.epoch
    }

    /// Returns the recovered watermark.
    #[must_use]
    pub fn watermark(&self) -> Option<i64> {
        self.manifest.watermark
    }

    /// Returns whether there were any errors during recovery.
    #[must_use]
    pub fn has_errors(&self) -> bool {
        !self.source_errors.is_empty() || !self.sink_errors.is_empty()
    }

    /// Returns the recovered operator states (for DAG restoration).
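    ///
    /// A hedged sketch of how a caller might consume these states; the
    /// `dag.restore_operator` hook is illustrative, not part of this module:
    ///
    /// ```ignore
    /// for (operator_id, cp) in recovered.operator_states() {
    ///     let bytes = cp.decode_inline().unwrap_or_default();
    ///     // hand the decoded bytes to the matching DAG/TPC operator
    ///     dag.restore_operator(operator_id, &bytes)?;
    /// }
    /// ```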
    #[must_use]
    pub fn operator_states(
        &self,
    ) -> &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
        &self.manifest.operator_states
    }

    /// Returns the table store checkpoint path, if any.
    #[must_use]
    pub fn table_store_checkpoint_path(&self) -> Option<&str> {
        self.manifest.table_store_checkpoint_path.as_deref()
    }
}

/// Recovery manager.
///
/// Loads the latest [`CheckpointManifest`] from a [`CheckpointStore`] and
/// restores all registered sources, sinks, and tables to their checkpointed
/// state.
pub struct RecoveryManager<'a> {
    store: &'a dyn CheckpointStore,
    /// When true (the default), any source/sink restore failure causes the
    /// checkpoint to be rejected and an older checkpoint to be tried.
    /// When false, failures are logged and recorded in `RecoveredState`
    /// but recovery continues — the pipeline resumes with potentially
    /// mismatched offsets.
    strict: bool,
}

impl<'a> RecoveryManager<'a> {
    /// Creates a new recovery manager using the given checkpoint store.
    ///
    /// Defaults to strict mode: any source/sink restore failure causes the
    /// checkpoint to be rejected and an older one to be tried.
    /// Use [`Self::lenient`] to allow partial recovery.
    #[must_use]
    pub fn new(store: &'a dyn CheckpointStore) -> Self {
        Self {
            store,
            strict: true,
        }
    }

    /// Creates a lenient recovery manager.
    ///
    /// In lenient mode, source/sink restore failures are logged and
    /// recorded in `RecoveredState` but do not abort recovery. The
    /// pipeline resumes with potentially mismatched offsets.
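    ///
    /// A small usage sketch, assuming the `sources`/`sinks`/`table_sources`
    /// slices are already registered elsewhere:
    ///
    /// ```ignore
    /// let manager = RecoveryManager::lenient(&store);
    /// if let Some(state) = manager.recover(&sources, &sinks, &table_sources).await? {
    ///     if state.has_errors() {
    ///         // Partial recovery: inspect which sources/sinks failed.
    ///         for (name, err) in &state.source_errors {
    ///             tracing::warn!(source = %name, error = %err, "restored with error");
    ///         }
    ///     }
    /// }
    /// ```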
    #[must_use]
    pub fn lenient(store: &'a dyn CheckpointStore) -> Self {
        Self {
            store,
            strict: false,
        }
    }

    /// Attempts to recover from the latest checkpoint, with fallback to older
    /// checkpoints if the latest is corrupt or fails to restore.
    ///
    /// Returns `Ok(None)` if no checkpoint exists (fresh start).
    /// Returns `Ok(Some(RecoveredState))` on successful recovery.
    ///
    /// In lenient mode, recovery is best-effort: individual source/sink failures
    /// are recorded in `RecoveredState` but do not abort the entire recovery,
    /// allowing partial recovery (e.g., one source fails to seek but others
    /// succeed). In strict mode (the default), any such failure causes the
    /// checkpoint to be rejected and an older one to be tried.
    ///
    /// ## Fallback Behavior
    ///
    /// If the latest checkpoint fails (corrupt manifest, deserialization error),
    /// the manager iterates through all available checkpoints in reverse
    /// chronological order until one succeeds or all are exhausted. This
    /// prevents a single corrupt checkpoint from causing total data loss.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` if listing checkpoints from the store
    /// fails during fallback.
    pub(crate) async fn recover(
        &self,
        sources: &[RegisteredSource],
        sinks: &[RegisteredSink],
        table_sources: &[RegisteredSource],
    ) -> Result<Option<RecoveredState>, DbError> {
        // Fast path: try load_latest() first.
        match self.store.load_latest().await {
            Ok(Some(manifest)) => {
                if self.is_checkpoint_corrupt(&manifest).await {
                    warn!(
                        checkpoint_id = manifest.checkpoint_id,
                        "[LDB-6010] latest checkpoint corrupt, trying fallback"
                    );
                } else if Self::has_pending_sinks(&manifest) {
                    warn!(
                        checkpoint_id = manifest.checkpoint_id,
                        epoch = manifest.epoch,
                        "[LDB-6015] checkpoint has uncommitted sinks — source offsets \
                         may be past uncommitted data, falling back to previous checkpoint"
                    );
                } else {
                    let state = self
                        .restore_from(manifest, sources, sinks, table_sources)
                        .await;
                    if let Err(e) = self.check_strict(&state) {
                        warn!(
                            checkpoint_id = state.manifest.checkpoint_id,
                            error = %e,
                            "latest checkpoint restore had strict errors, trying fallback"
                        );
                    } else {
                        return Ok(Some(state));
                    }
                }
            }
            Ok(None) => {
                info!("no checkpoint found, starting fresh");
                return Ok(None);
            }
            Err(e) => {
                warn!(error = %e, "latest checkpoint load failed, trying fallback");
            }
        }

        // Fallback: iterate through all checkpoints in reverse order.
        let checkpoints = self.store.list().await.map_err(DbError::from)?;

        if checkpoints.is_empty() {
            warn!("no checkpoints available for fallback, starting fresh");
            return Ok(None);
        }

        for &(checkpoint_id, _epoch) in checkpoints.iter().rev() {
            match self.store.load_by_id(checkpoint_id).await {
                Ok(Some(manifest)) => {
                    if self.is_checkpoint_corrupt(&manifest).await {
                        warn!(
                            checkpoint_id,
                            "[LDB-6010] fallback checkpoint corrupt, skipping"
                        );
                        continue;
                    }
                    if Self::has_pending_sinks(&manifest) {
                        warn!(
                            checkpoint_id,
                            "[LDB-6015] fallback checkpoint has uncommitted sinks, skipping"
                        );
                        continue;
                    }
                    info!(checkpoint_id, "recovering from fallback checkpoint");
                    let state = self
                        .restore_from(manifest, sources, sinks, table_sources)
                        .await;
                    if let Err(e) = self.check_strict(&state) {
                        warn!(
                            checkpoint_id,
                            error = %e,
                            "fallback checkpoint restore had strict errors, trying next"
                        );
                        continue;
                    }
                    return Ok(Some(state));
                }
                Ok(None) => {
                    debug!(checkpoint_id, "fallback checkpoint not found, skipping");
                }
                Err(e) => {
                    warn!(
                        checkpoint_id,
                        error = %e,
                        "fallback checkpoint load failed, trying next"
                    );
                }
            }
        }

        warn!("all checkpoints failed to load, starting fresh");
        Ok(None)
    }

    /// Resolves external operator states by loading the sidecar file.
    ///
    /// For any operator state marked as `external`, loads the corresponding
    /// bytes from `state.bin` and replaces it with an inline entry. This
    /// makes the rest of recovery code work uniformly with inline state.
    ///
    /// Returns `true` if all external states were resolved successfully.
    /// Returns `false` if any state could not be resolved (missing sidecar,
    /// truncated sidecar, or I/O error). In strict mode, the caller should
    /// treat a `false` return as a corrupt checkpoint and try fallback.
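    ///
    /// A minimal sketch of the sidecar layout this resolution assumes: each
    /// external entry is an `(offset, length)` byte range inside the
    /// checkpoint's `state.bin` (the concrete operators and offsets here are
    /// illustrative):
    ///
    /// ```ignore
    /// // state.bin: [ op "agg" 0..2048 | op "join" 2048..6144 ]
    /// let agg = OperatorCheckpoint::external(0, 2048);
    /// let join = OperatorCheckpoint::external(2048, 4096);
    /// // resolve_external_states() loads state.bin once and rewrites each
    /// // entry as OperatorCheckpoint::inline(&state_data[start..start + len]).
    /// ```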
    async fn resolve_external_states(&self, manifest: &mut CheckpointManifest) -> bool {
        let external_ops: Vec<String> = manifest
            .operator_states
            .iter()
            .filter(|(_, op)| op.external)
            .map(|(name, _)| name.clone())
            .collect();

        if external_ops.is_empty() {
            return true;
        }

        let state_data = match self.store.load_state_data(manifest.checkpoint_id).await {
            Ok(Some(data)) => data,
            Ok(None) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    operators = ?external_ops,
                    "[LDB-6010] sidecar state.bin missing — external operator states \
                     cannot be resolved; operators will start with empty state"
                );
                // Clear external flag so recovery doesn't attempt to
                // dereference invalid offsets later
                for name in &external_ops {
                    if let Some(op) = manifest.operator_states.get_mut(name) {
                        *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                    }
                }
                return false;
            }
            Err(e) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %e,
                    operators = ?external_ops,
                    "[LDB-6010] failed to load sidecar state.bin — external operator states \
                     cannot be resolved; operators will start with empty state"
                );
                for name in &external_ops {
                    if let Some(op) = manifest.operator_states.get_mut(name) {
                        *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                    }
                }
                return false;
            }
        };

        let mut all_resolved = true;
        for (name, op) in &mut manifest.operator_states {
            if op.external {
                #[allow(clippy::cast_possible_truncation)] // Sidecar files are always < 4 GB
                let start = op.external_offset as usize;
                #[allow(clippy::cast_possible_truncation)]
                let end = start + op.external_length as usize;
                if end <= state_data.len() {
                    let external_offset = op.external_offset;
                    let external_length = op.external_length;
                    let data = &state_data[start..end];
                    *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(data);
                    debug!(
                        operator = %name,
                        offset = external_offset,
                        length = external_length,
                        "resolved external operator state from sidecar"
                    );
                } else {
                    error!(
                        operator = %name,
                        offset = start,
                        length = op.external_length,
                        sidecar_len = state_data.len(),
                        "[LDB-6010] sidecar too small for external operator state — \
                         operator will start with empty state"
                    );
                    *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                    all_resolved = false;
                }
            }
        }
        all_resolved
    }

    /// Restores pipeline state from a loaded manifest.
    ///
    /// This is the inner restore logic shared by both the fast path
    /// (latest checkpoint) and fallback path (older checkpoints).
    #[allow(clippy::too_many_lines)]
    async fn restore_from(
        &self,
        mut manifest: CheckpointManifest,
        sources: &[RegisteredSource],
        sinks: &[RegisteredSink],
        table_sources: &[RegisteredSource],
    ) -> RecoveredState {
        // Resolve external operator states from sidecar before recovery.
        // In strict mode, unresolved sidecar state is recorded as a source
        // error so check_strict() will reject this checkpoint.
        let sidecar_ok = self.resolve_external_states(&mut manifest).await;
        if !sidecar_ok && self.strict {
            warn!(
                checkpoint_id = manifest.checkpoint_id,
                "[LDB-6010] sidecar resolution failed in strict mode — \
                 checkpoint will be rejected"
            );
        }

        // Validate manifest consistency before restoring state.
        // `DEFAULT_VNODE_COUNT` is a placeholder; the runtime
        // `VnodeRegistry` value is not threaded through recovery yet.
        let validation_errors =
            manifest.validate(laminar_storage::checkpoint_manifest::DEFAULT_VNODE_COUNT);
        if !validation_errors.is_empty() {
            for err in &validation_errors {
                warn!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %err,
                    "manifest validation warning"
                );
            }
        }

        // Topology drift detection: compare current sources/sinks against
        // the checkpoint to warn the operator about changes.
        if !manifest.source_names.is_empty() {
            let mut current_sources: Vec<&str> = sources.iter().map(|s| s.name.as_str()).collect();
            current_sources.sort_unstable();
            let checkpoint_sources: Vec<&str> =
                manifest.source_names.iter().map(String::as_str).collect();
            let added: Vec<&&str> = current_sources
                .iter()
                .filter(|n| !checkpoint_sources.contains(n))
                .collect();
            let removed: Vec<&&str> = checkpoint_sources
                .iter()
                .filter(|n| !current_sources.contains(n))
                .collect();
            if !added.is_empty() {
                warn!(
                    sources = ?added,
                    "new sources added since checkpoint — no saved offsets"
                );
            }
            if !removed.is_empty() {
                warn!(
                    sources = ?removed,
                    "sources removed since checkpoint — orphaned offsets"
                );
            }
        }
        if !manifest.sink_names.is_empty() {
            let mut current_sinks: Vec<&str> = sinks.iter().map(|s| s.name.as_str()).collect();
            current_sinks.sort_unstable();
            let checkpoint_sinks: Vec<&str> =
                manifest.sink_names.iter().map(String::as_str).collect();
            let added: Vec<&&str> = current_sinks
                .iter()
                .filter(|n| !checkpoint_sinks.contains(n))
                .collect();
            let removed: Vec<&&str> = checkpoint_sinks
                .iter()
                .filter(|n| !current_sinks.contains(n))
                .collect();
            if !added.is_empty() {
                warn!(
                    sinks = ?added,
                    "new sinks added since checkpoint — no saved epoch"
                );
            }
            if !removed.is_empty() {
                warn!(
                    sinks = ?removed,
                    "sinks removed since checkpoint — orphaned epochs"
                );
            }
        }

        info!(
            checkpoint_id = manifest.checkpoint_id,
            epoch = manifest.epoch,
            validation_warnings = validation_errors.len(),
            "recovering from checkpoint"
        );

        let mut result = RecoveredState {
            manifest: manifest.clone(),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::new(),
            sink_errors: HashMap::new(),
        };

        // Record sidecar failure so check_strict() rejects this checkpoint.
        if !sidecar_ok {
            result.source_errors.insert(
                "__sidecar__".into(),
                "[LDB-6010] sidecar state.bin missing or truncated — \
                 operator state cannot be fully restored"
                    .into(),
            );
        }

        // Step 3: Restore source offsets
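        // Each replayable source gets up to 3 restore attempts with a 1s
        // pause between them; a final failure is recorded in `source_errors`
        // and handled according to strict/lenient mode by the caller.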
        for source in sources {
            if !source.supports_replay {
                info!(
                    source = %source.name,
                    "skipping restore for non-replayable source (at-most-once)"
                );
                continue;
            }
            if let Some(cp) = manifest.source_offsets.get(&source.name) {
                let source_cp = connector_to_source_checkpoint(cp);
                let mut last_err = None;
                for attempt in 0..3u32 {
                    let mut connector = source.connector.lock().await;
                    match connector.restore(&source_cp).await {
                        Ok(()) => {
                            last_err = None;
                            break;
                        }
                        Err(e) => {
                            warn!(
                                source = %source.name, attempt,
                                error = %e, "source restore failed, retrying"
                            );
                            last_err = Some(e);
                            drop(connector);
                            if attempt < 2 {
                                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                            }
                        }
                    }
                }
                if let Some(e) = last_err {
                    let msg = format!("source restore failed after 3 attempts: {e}");
                    result.source_errors.insert(source.name.clone(), msg);
                } else {
                    result.sources_restored += 1;
                    debug!(source = %source.name, epoch = cp.epoch, "source restored");
                }
            }
        }

        // Step 4: Restore table source offsets
        for table_source in table_sources {
            if let Some(cp) = manifest.table_offsets.get(&table_source.name) {
                let source_cp = connector_to_source_checkpoint(cp);
                let mut connector = table_source.connector.lock().await;
                match connector.restore(&source_cp).await {
                    Ok(()) => {
                        result.tables_restored += 1;
                        debug!(table = %table_source.name, epoch = cp.epoch, "table source restored");
                    }
                    Err(e) => {
                        let msg = format!("table source restore failed: {e}");
                        warn!(table = %table_source.name, error = %e, "table source restore failed");
                        result.source_errors.insert(table_source.name.clone(), msg);
                    }
                }
            }
        }

        // Step 5: Rollback sinks for exactly-once semantics.
        // Only roll back sinks that did NOT successfully commit (Pending or Failed).
        // Sinks with SinkCommitStatus::Committed already completed their commit
        // and should not be rolled back.
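        // Per exactly-once sink the decision is (illustrative summary):
        //   Committed                              → skip rollback (epoch already durable)
        //   Pending / Failed / no recorded status  → rollback_epoch(manifest.epoch)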
        for sink in sinks {
            if sink.exactly_once {
                // Check per-sink commit status — if the sink committed, skip rollback.
                let already_committed = manifest
                    .sink_commit_statuses
                    .get(&sink.name)
                    .is_some_and(|s| matches!(s, SinkCommitStatus::Committed));

                if already_committed {
                    debug!(
                        sink = %sink.name,
                        epoch = manifest.epoch,
                        "sink already committed, skipping rollback"
                    );
                    continue;
                }

                match sink.handle.rollback_epoch(manifest.epoch).await {
                    Ok(()) => {
                        result.sinks_rolled_back += 1;
                        debug!(sink = %sink.name, epoch = manifest.epoch, "sink rolled back");
                    }
                    Err(e) => {
                        result
                            .sink_errors
                            .insert(sink.name.clone(), format!("rollback failed: {e}"));
                        warn!(
                            sink = %sink.name,
                            epoch = manifest.epoch,
                            error = %e,
                            "[LDB-6016] sink rollback failed during recovery"
                        );
                    }
                }
            }
        }

        info!(
            checkpoint_id = manifest.checkpoint_id,
            epoch = manifest.epoch,
            sources_restored = result.sources_restored,
            tables_restored = result.tables_restored,
            sinks_rolled_back = result.sinks_rolled_back,
            errors = result.source_errors.len() + result.sink_errors.len(),
            "recovery complete"
        );

        result
    }

    /// Returns `true` if the checkpoint fails integrity validation.
    ///
    /// Checks both sidecar checksum and manifest validity via
    /// [`CheckpointStore::validate_checkpoint`]. Any validation failure
    /// (sidecar corruption, missing data, or manifest issues like epoch=0),
    /// as well as any I/O error during validation, causes the checkpoint
    /// to be rejected so fallback can try an older one.
    ///
    /// Only returns `false` (proceed) when validation passes OR when the
    /// checkpoint has no sidecar to validate.
    async fn is_checkpoint_corrupt(&self, manifest: &CheckpointManifest) -> bool {
        // No sidecar and no state_checksum → nothing to validate beyond
        // manifest parsing (which already succeeded if we got here).
        if manifest.state_checksum.is_none() && manifest.operator_states.is_empty() {
            return false;
        }
        match self.store.validate_checkpoint(manifest.checkpoint_id).await {
            Ok(ValidationResult {
                valid: false,
                ref issues,
                ..
            }) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    issues = ?issues,
                    "[LDB-6010] checkpoint integrity check failed"
                );
                true
            }
            Ok(_) => false, // valid
            Err(e) => {
                // I/O errors during validation are treated as corruption —
                // if we can't verify the checkpoint, don't trust it.
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %e,
                    "[LDB-6010] checkpoint validation I/O error — \
                     treating as corrupt for safety"
                );
                true
            }
        }
    }

    /// Returns `true` if any exactly-once sink has `Pending` commit status.
    ///
    /// A checkpoint with Pending sinks was persisted before sink commit
    /// completed. Recovering from it would advance source offsets past
    /// data the sinks never received — causing silent data loss.
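    ///
    /// A small illustration (the sink name is hypothetical):
    ///
    /// ```ignore
    /// let mut m = CheckpointManifest::new(7, 42);
    /// m.sink_commit_statuses
    ///     .insert("s3_sink".into(), SinkCommitStatus::Pending);
    /// assert!(RecoveryManager::has_pending_sinks(&m));
    /// ```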
    fn has_pending_sinks(manifest: &CheckpointManifest) -> bool {
        manifest
            .sink_commit_statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Pending))
    }

    /// In strict mode, returns an error if any source or sink had restore failures.
    fn check_strict(&self, state: &RecoveredState) -> Result<(), DbError> {
        if !self.strict || !state.has_errors() {
            return Ok(());
        }
        let mut msgs: Vec<String> = state
            .source_errors
            .iter()
            .map(|(k, v)| format!("source '{k}': {v}"))
            .collect();
        for (k, v) in &state.sink_errors {
            msgs.push(format!("sink '{k}': {v}"));
        }
        Err(DbError::Checkpoint(format!(
            "strict recovery failed — {} restore error(s): {}",
            msgs.len(),
            msgs.join("; ")
        )))
    }

    /// Loads the latest manifest without performing recovery.
    ///
    /// Useful for inspecting checkpoint state or building a recovery plan.
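    ///
    /// A short inspection sketch (the `manager` binding is assumed to exist):
    ///
    /// ```ignore
    /// if let Some(manifest) = manager.load_latest().await? {
    ///     tracing::info!(
    ///         checkpoint_id = manifest.checkpoint_id,
    ///         epoch = manifest.epoch,
    ///         "latest checkpoint"
    ///     );
    /// }
    /// ```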
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` if the store fails.
    pub async fn load_latest(&self) -> Result<Option<CheckpointManifest>, DbError> {
        self.store.load_latest().await.map_err(DbError::from)
    }

    /// Loads a specific checkpoint by ID.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` if the store fails.
    pub async fn load_by_id(
        &self,
        checkpoint_id: u64,
    ) -> Result<Option<CheckpointManifest>, DbError> {
        self.store
            .load_by_id(checkpoint_id)
            .await
            .map_err(DbError::from)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;

    fn make_store(dir: &std::path::Path) -> FileSystemCheckpointStore {
        FileSystemCheckpointStore::new(dir, 3)
    }

    #[tokio::test]
    async fn test_recover_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_recover_empty_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Save a basic checkpoint
        let manifest = CheckpointManifest::new(1, 5);
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.epoch(), 5);
        assert_eq!(result.sources_restored, 0);
        assert_eq!(result.tables_restored, 0);
        assert_eq!(result.sinks_rolled_back, 0);
        assert!(!result.has_errors());
    }

    #[tokio::test]
    async fn test_recover_with_watermark() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 3);
        manifest.watermark = Some(42_000);
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.watermark(), Some(42_000));
    }

    #[tokio::test]
    async fn test_recover_with_operator_states() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 7);
        manifest
            .operator_states
            .insert("0".to_string(), OperatorCheckpoint::inline(b"window-state"));
        manifest
            .operator_states
            .insert("3".to_string(), OperatorCheckpoint::inline(b"filter-state"));
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.operator_states().len(), 2);
        let op0 = result.operator_states().get("0").unwrap();
        assert_eq!(op0.decode_inline().unwrap(), b"window-state");
    }

    #[tokio::test]
    async fn test_recover_table_store_path() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.table_store_checkpoint_path = Some("/data/rocksdb_cp_001".into());
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(
            result.table_store_checkpoint_path(),
            Some("/data/rocksdb_cp_001")
        );
    }

    #[tokio::test]
    async fn test_load_latest_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        assert!(mgr.load_latest().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_load_by_id() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        store.save(&CheckpointManifest::new(1, 1)).await.unwrap();
        store.save(&CheckpointManifest::new(2, 2)).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let m = mgr.load_by_id(1).await.unwrap().unwrap();
        assert_eq!(m.checkpoint_id, 1);

        let m2 = mgr.load_by_id(2).await.unwrap().unwrap();
        assert_eq!(m2.checkpoint_id, 2);

        assert!(mgr.load_by_id(999).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_recover_fallback_to_previous_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        // Save two valid checkpoints
        let mut m1 = CheckpointManifest::new(1, 10);
        m1.watermark = Some(1000);
        store.save(&m1).await.unwrap();

        let mut m2 = CheckpointManifest::new(2, 20);
        m2.watermark = Some(2000);
        store.save(&m2).await.unwrap();

        // Corrupt the latest checkpoint by writing invalid JSON
        let latest_manifest_path = dir
            .path()
            .join("checkpoints")
            .join("checkpoint_000002")
            .join("manifest.json");
        std::fs::write(&latest_manifest_path, "not valid json!!!").unwrap();

        // latest.txt still points at checkpoint 2, whose manifest is now
        // corrupt, so the fast path must fail and recovery must fall back.

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();

        // Should fall back to checkpoint 1
        let recovered = result.expect("should recover from fallback checkpoint");
        assert_eq!(recovered.manifest.checkpoint_id, 1);
        assert_eq!(recovered.epoch(), 10);
        assert_eq!(recovered.watermark(), Some(1000));
    }

    #[tokio::test]
    async fn test_recover_all_checkpoints_corrupt_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        // Save a checkpoint then corrupt it
        store.save(&CheckpointManifest::new(1, 5)).await.unwrap();

        let manifest_path = dir
            .path()
            .join("checkpoints")
            .join("checkpoint_000001")
            .join("manifest.json");
        std::fs::write(&manifest_path, "corrupt").unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();

        // All checkpoints corrupt → fresh start
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_recover_latest_ok_no_fallback_needed() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);

        store.save(&CheckpointManifest::new(1, 10)).await.unwrap();
        store.save(&CheckpointManifest::new(2, 20)).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        // Should use the latest (no fallback needed)
        assert_eq!(result.manifest.checkpoint_id, 2);
        assert_eq!(result.epoch(), 20);
    }

    #[tokio::test]
    async fn test_recover_with_sidecar_state() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Build a manifest with an external operator state
        let mut manifest = CheckpointManifest::new(1, 5);
        let large_data = vec![0xAB; 2048];
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 2048));

        // Write sidecar first, then manifest
        store
            .save_state_data(1, &[bytes::Bytes::copy_from_slice(&large_data)])
            .await
            .unwrap();
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        // External state should have been resolved to inline
        let op = result.operator_states().get("big-op").unwrap();
        assert!(!op.external, "external state should be resolved to inline");
        assert_eq!(op.decode_inline().unwrap(), large_data);
    }

    #[tokio::test]
    async fn test_recover_mixed_inline_and_external() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 3);
        // Small inline state
        manifest
            .operator_states
            .insert("small-op".into(), OperatorCheckpoint::inline(b"tiny"));
        // Large external state
        let large_data = vec![0xCD; 4096];
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 4096));

        store
            .save_state_data(1, &[bytes::Bytes::copy_from_slice(&large_data)])
            .await
            .unwrap();
        store.save(&manifest).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        let small = result.operator_states().get("small-op").unwrap();
        assert_eq!(small.decode_inline().unwrap(), b"tiny");

        let big = result.operator_states().get("big-op").unwrap();
        assert_eq!(big.decode_inline().unwrap(), large_data);
    }

    #[tokio::test]
    async fn test_recover_missing_sidecar_graceful() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Manifest references external state but sidecar is missing
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).await.unwrap();

        // Use lenient mode — graceful degradation replaces missing
        // sidecar state with empty inline. Strict mode rejects this
        // checkpoint entirely (see test_recover_missing_sidecar_strict_rejects).
        let mgr = RecoveryManager::lenient(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        // Should still recover (gracefully) — external state replaced with
        // empty inline to avoid dangling offset references
        let op = result.operator_states().get("orphan").unwrap();
        assert!(
            !op.external,
            "unresolved external state replaced with inline empty"
        );
        assert!(
            op.state_b64.as_ref().is_none_or(String::is_empty),
            "replaced state should be empty"
        );
    }

    #[tokio::test]
    async fn test_recovered_state_has_errors() {
        let state = RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::new(),
            sink_errors: HashMap::new(),
        };
        assert!(!state.has_errors());

        let state_with_errors = RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::from([("source1".into(), "failed".into())]),
            sink_errors: HashMap::new(),
        };
        assert!(state_with_errors.has_errors());
    }

    #[tokio::test]
    async fn test_recover_missing_sidecar_strict_rejects() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Manifest references external state but sidecar is missing
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).await.unwrap();

        // Strict mode: missing sidecar causes the checkpoint to be rejected
        // and recovery falls back. With only one checkpoint, this means
        // fresh start.
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(
            result.is_none(),
            "strict mode should reject checkpoint with missing sidecar"
        );
    }

    #[tokio::test]
    async fn test_recover_skips_pending_sinks_falls_back() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Epoch 1: fully committed checkpoint (good).
        let mut m1 = CheckpointManifest::new(1, 1);
        m1.sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Committed);
        store.save(&m1).await.unwrap();

        // Epoch 2: crashed between manifest persist and sink commit (Pending).
        let mut m2 = CheckpointManifest::new(2, 2);
        m2.sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Pending);
        store.save(&m2).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        let state = result.expect("should recover from epoch 1 fallback");

        // Must fall back to epoch 1 (the last fully committed checkpoint),
        // not epoch 2 (which has uncommitted sink data).
        assert_eq!(
            state.epoch(),
            1,
            "recovery must skip checkpoint with Pending sinks"
        );
    }

    #[tokio::test]
    async fn test_recover_all_pending_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Only checkpoint has Pending sinks — no safe fallback.
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_commit_statuses
            .insert("sink".into(), SinkCommitStatus::Pending);
        store.save(&m).await.unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(
            result.is_none(),
            "should start fresh when all checkpoints have pending sinks"
        );
    }
}