// laminar_db/recovery_manager.rs
//! Unified recovery manager.
//!
//! Single recovery path that loads a
//! [`CheckpointManifest`](laminar_storage::checkpoint_manifest::CheckpointManifest) and restores
//! ALL state: source offsets, sink epochs, operator states, table offsets,
//! and watermarks.
//!
//! ## Recovery Protocol
//!
//! 1. `store.load_latest()` → `Option<CheckpointManifest>`
//! 2. If `None` → fresh start (no recovery needed)
//! 3. For each source: `source.restore(manifest.source_offsets[name])`
//! 4. For each table source: `source.restore(manifest.table_offsets[name])`
//! 5. For each exactly-once sink: `sink.rollback_epoch(manifest.epoch)`
//! 6. Return recovered state (watermark, epoch, operator states)
//!    — caller is responsible for restoring DAG/TPC operators from `operator_states`
//!
//! ## Fallback Recovery
//!
//! If the latest checkpoint is corrupt or fails to restore, the manager
//! iterates through all available checkpoints in reverse chronological order
//! until one succeeds. This prevents a single corrupt checkpoint from causing
//! total data loss.
#![allow(clippy::disallowed_types)] // cold path
25
use std::collections::HashMap;

use laminar_storage::checkpoint_manifest::{CheckpointManifest, SinkCommitStatus};
use laminar_storage::checkpoint_store::CheckpointStore;
use laminar_storage::ValidationResult;
use tracing::{debug, error, info, warn};

use crate::checkpoint_coordinator::{
    connector_to_source_checkpoint, RegisteredSink, RegisteredSource,
};
use crate::error::DbError;
37
/// Result of a successful recovery from a checkpoint.
///
/// Returned by [`RecoveryManager::recover`]; the counters and error maps let
/// the caller judge whether a partial recovery is acceptable.
#[derive(Debug)]
pub struct RecoveredState {
    /// The manifest that was loaded and restored from.
    pub manifest: CheckpointManifest,
    /// Number of sources successfully restored.
    pub sources_restored: usize,
    /// Number of table sources successfully restored.
    pub tables_restored: usize,
    /// Number of sinks rolled back.
    pub sinks_rolled_back: usize,
    /// Sources that failed to restore (name → error message).
    /// Table-source restore failures are recorded here as well.
    pub source_errors: HashMap<String, String>,
    /// Sinks that failed to roll back (name → error message).
    // NOTE(review): the rollback path in `restore_from` is fire-and-forget and
    // never inserts into this map in the code visible here — confirm whether
    // any other path populates it.
    pub sink_errors: HashMap<String, String>,
}
54
55impl RecoveredState {
56    /// Returns the recovered epoch.
57    #[must_use]
58    pub fn epoch(&self) -> u64 {
59        self.manifest.epoch
60    }
61
62    /// Returns the recovered watermark.
63    #[must_use]
64    pub fn watermark(&self) -> Option<i64> {
65        self.manifest.watermark
66    }
67
68    /// Returns whether there were any errors during recovery.
69    #[must_use]
70    pub fn has_errors(&self) -> bool {
71        !self.source_errors.is_empty() || !self.sink_errors.is_empty()
72    }
73
74    /// Returns the recovered operator states (for DAG restoration).
75    #[must_use]
76    pub fn operator_states(
77        &self,
78    ) -> &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
79        &self.manifest.operator_states
80    }
81
82    /// Returns the WAL position from the manifest.
83    #[must_use]
84    pub fn wal_position(&self) -> u64 {
85        self.manifest.wal_position
86    }
87
88    /// Returns the per-core WAL positions from the manifest.
89    #[must_use]
90    pub fn per_core_wal_positions(&self) -> &[u64] {
91        &self.manifest.per_core_wal_positions
92    }
93
94    /// Returns the table store checkpoint path, if any.
95    #[must_use]
96    pub fn table_store_checkpoint_path(&self) -> Option<&str> {
97        self.manifest.table_store_checkpoint_path.as_deref()
98    }
99
100    /// Returns whether this checkpoint has in-flight data (unaligned checkpoint).
101    ///
102    /// When true, the caller should replay the in-flight events before resuming
103    /// normal processing to maintain exactly-once semantics.
104    #[must_use]
105    pub fn has_inflight_data(&self) -> bool {
106        !self.manifest.inflight_data.is_empty()
107    }
108
109    /// Returns the in-flight data from an unaligned checkpoint.
110    ///
111    /// Key: operator name. Value: list of in-flight records per input channel.
112    #[must_use]
113    pub fn inflight_data(
114        &self,
115    ) -> &HashMap<String, Vec<laminar_storage::checkpoint_manifest::InFlightRecord>> {
116        &self.manifest.inflight_data
117    }
118}
119
/// Unified recovery manager.
///
/// Loads the latest [`CheckpointManifest`] from a [`CheckpointStore`] and
/// restores all registered sources, sinks, and tables to their checkpointed
/// state.
pub struct RecoveryManager<'a> {
    store: &'a dyn CheckpointStore,
    /// When true (the default — see [`Self::new`]), any source/sink restore
    /// failure causes `recover` to reject the checkpoint and fall back to an
    /// older one instead of returning the partially-restored state.
    /// When false ([`Self::lenient`]), failures are logged and recorded in
    /// `RecoveredState` but recovery continues — the pipeline resumes with
    /// potentially mismatched offsets.
    strict: bool,
}
133
impl<'a> RecoveryManager<'a> {
    /// Creates a new recovery manager using the given checkpoint store.
    ///
    /// Defaults to strict mode: any source/sink restore failure aborts
    /// the entire recovery. Use [`Self::lenient`] to allow partial recovery.
    #[must_use]
    pub fn new(store: &'a dyn CheckpointStore) -> Self {
        Self {
            store,
            strict: true,
        }
    }

    /// Creates a lenient recovery manager.
    ///
    /// In lenient mode, source/sink restore failures are logged and
    /// recorded in `RecoveredState` but do not abort recovery. The
    /// pipeline resumes with potentially mismatched offsets.
    #[must_use]
    pub fn lenient(store: &'a dyn CheckpointStore) -> Self {
        Self {
            store,
            strict: false,
        }
    }

    /// Attempts to recover from the latest checkpoint, with fallback to older
    /// checkpoints if the latest is corrupt or fails to restore.
    ///
    /// Returns `Ok(None)` if no checkpoint exists (fresh start) or if every
    /// available checkpoint fails to load or restore.
    /// Returns `Ok(Some(RecoveredState))` on successful recovery.
    ///
    /// In lenient mode, recovery is best-effort: individual source/sink
    /// failures are recorded in `RecoveredState` but do not abort recovery
    /// (e.g., one source fails to seek but others succeed). In strict mode
    /// (the default), any restore failure rejects the checkpoint and the
    /// manager falls back to an older one.
    ///
    /// ## Fallback Behavior
    ///
    /// If the latest checkpoint fails (corrupt manifest, deserialization
    /// error, or strict-mode restore errors), the manager iterates through
    /// all available checkpoints in reverse chronological order until one
    /// succeeds or all are exhausted. This prevents a single corrupt
    /// checkpoint from causing total data loss.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` only if listing checkpoints for the
    /// fallback path fails. Strict-mode restore failures do not surface as
    /// errors here — they trigger fallback, and if every checkpoint is
    /// exhausted the result is `Ok(None)` (fresh start).
    pub(crate) async fn recover(
        &self,
        sources: &[RegisteredSource],
        sinks: &[RegisteredSink],
        table_sources: &[RegisteredSource],
    ) -> Result<Option<RecoveredState>, DbError> {
        // Fast path: try load_latest() first.
        match self.store.load_latest() {
            Ok(Some(manifest)) => {
                if self.is_checkpoint_corrupt(&manifest) {
                    warn!(
                        checkpoint_id = manifest.checkpoint_id,
                        "[LDB-6010] latest checkpoint corrupt, trying fallback"
                    );
                } else {
                    let state = self
                        .restore_from(manifest, sources, sinks, table_sources)
                        .await;
                    // Strict-mode restore errors reject this checkpoint and
                    // fall through to the fallback scan below.
                    if let Err(e) = self.check_strict(&state) {
                        warn!(
                            checkpoint_id = state.manifest.checkpoint_id,
                            error = %e,
                            "latest checkpoint restore had strict errors, trying fallback"
                        );
                    } else {
                        return Ok(Some(state));
                    }
                }
            }
            Ok(None) => {
                info!("no checkpoint found, starting fresh");
                return Ok(None);
            }
            Err(e) => {
                warn!(error = %e, "latest checkpoint load failed, trying fallback");
            }
        }

        // Fallback: iterate through all checkpoints in reverse order.
        let checkpoints = self.store.list().map_err(|e| {
            DbError::Checkpoint(format!("failed to list checkpoints for fallback: {e}"))
        })?;

        if checkpoints.is_empty() {
            warn!("no checkpoints available for fallback, starting fresh");
            return Ok(None);
        }

        for &(checkpoint_id, _epoch) in checkpoints.iter().rev() {
            match self.store.load_by_id(checkpoint_id) {
                Ok(Some(manifest)) => {
                    if self.is_checkpoint_corrupt(&manifest) {
                        warn!(
                            checkpoint_id,
                            "[LDB-6010] fallback checkpoint corrupt, skipping"
                        );
                        continue;
                    }
                    info!(checkpoint_id, "recovering from fallback checkpoint");
                    let state = self
                        .restore_from(manifest, sources, sinks, table_sources)
                        .await;
                    if let Err(e) = self.check_strict(&state) {
                        warn!(
                            checkpoint_id,
                            error = %e,
                            "fallback checkpoint restore had strict errors, trying next"
                        );
                        continue;
                    }
                    return Ok(Some(state));
                }
                Ok(None) => {
                    debug!(checkpoint_id, "fallback checkpoint not found, skipping");
                }
                Err(e) => {
                    warn!(
                        checkpoint_id,
                        error = %e,
                        "fallback checkpoint load failed, trying next"
                    );
                }
            }
        }

        warn!("all checkpoints failed to load, starting fresh");
        Ok(None)
    }

    /// Resolves external operator states by loading the sidecar file.
    ///
    /// For any operator state marked as `external`, loads the corresponding
    /// bytes from `state.bin` and replaces it with an inline entry. This
    /// makes the rest of recovery code work uniformly with inline state.
    ///
    /// If the sidecar is missing, unreadable, or too small, the affected
    /// operators are downgraded to empty inline state (logged as errors)
    /// rather than failing recovery.
    fn resolve_external_states(&self, manifest: &mut CheckpointManifest) {
        let external_ops: Vec<String> = manifest
            .operator_states
            .iter()
            .filter(|(_, op)| op.external)
            .map(|(name, _)| name.clone())
            .collect();

        if external_ops.is_empty() {
            return;
        }

        let state_data = match self.store.load_state_data(manifest.checkpoint_id) {
            Ok(Some(data)) => data,
            Ok(None) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    operators = ?external_ops,
                    "sidecar state.bin missing — external operator states \
                     cannot be resolved; operators will start with empty state"
                );
                // Clear external flag so recovery doesn't attempt to
                // dereference invalid offsets later
                for name in &external_ops {
                    if let Some(op) = manifest.operator_states.get_mut(name) {
                        *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                    }
                }
                return;
            }
            Err(e) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %e,
                    operators = ?external_ops,
                    "failed to load sidecar state.bin — external operator states \
                     cannot be resolved; operators will start with empty state"
                );
                // Same downgrade as the missing-sidecar case above.
                for name in &external_ops {
                    if let Some(op) = manifest.operator_states.get_mut(name) {
                        *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                    }
                }
                return;
            }
        };

        for (name, op) in &mut manifest.operator_states {
            if op.external {
                #[allow(clippy::cast_possible_truncation)] // Sidecar files are always < 4 GB
                let start = op.external_offset as usize;
                #[allow(clippy::cast_possible_truncation)]
                let end = start + op.external_length as usize;
                if end <= state_data.len() {
                    let external_offset = op.external_offset;
                    let external_length = op.external_length;
                    let data = &state_data[start..end];
                    *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(data);
                    debug!(
                        operator = %name,
                        offset = external_offset,
                        length = external_length,
                        "resolved external operator state from sidecar"
                    );
                } else {
                    // Offset/length point past the end of the sidecar —
                    // treat as corrupt and fall back to empty state.
                    error!(
                        operator = %name,
                        offset = start,
                        length = op.external_length,
                        sidecar_len = state_data.len(),
                        "sidecar too small for external operator state — \
                         operator will start with empty state"
                    );
                    *op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
                }
            }
        }
    }

    /// Restores pipeline state from a loaded manifest.
    ///
    /// This is the inner restore logic shared by both the fast path
    /// (latest checkpoint) and fallback path (older checkpoints).
    /// Failures are recorded in the returned state; strictness is
    /// enforced by the caller via [`Self::check_strict`].
    #[allow(clippy::too_many_lines)]
    async fn restore_from(
        &self,
        mut manifest: CheckpointManifest,
        sources: &[RegisteredSource],
        sinks: &[RegisteredSink],
        table_sources: &[RegisteredSource],
    ) -> RecoveredState {
        // Resolve external operator states from sidecar before recovery.
        self.resolve_external_states(&mut manifest);

        // Validate manifest consistency before restoring state.
        let validation_errors = manifest.validate();
        if !validation_errors.is_empty() {
            for err in &validation_errors {
                warn!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %err,
                    "manifest validation warning"
                );
            }
        }

        // Topology drift detection: compare current sources/sinks against
        // the checkpoint to warn the operator about changes.
        if !manifest.source_names.is_empty() {
            let mut current_sources: Vec<&str> = sources.iter().map(|s| s.name.as_str()).collect();
            current_sources.sort_unstable();
            let checkpoint_sources: Vec<&str> =
                manifest.source_names.iter().map(String::as_str).collect();
            let added: Vec<&&str> = current_sources
                .iter()
                .filter(|n| !checkpoint_sources.contains(n))
                .collect();
            let removed: Vec<&&str> = checkpoint_sources
                .iter()
                .filter(|n| !current_sources.contains(n))
                .collect();
            if !added.is_empty() {
                warn!(
                    sources = ?added,
                    "new sources added since checkpoint — no saved offsets"
                );
            }
            if !removed.is_empty() {
                warn!(
                    sources = ?removed,
                    "sources removed since checkpoint — orphaned offsets"
                );
            }
        }
        if !manifest.sink_names.is_empty() {
            let mut current_sinks: Vec<&str> = sinks.iter().map(|s| s.name.as_str()).collect();
            current_sinks.sort_unstable();
            let checkpoint_sinks: Vec<&str> =
                manifest.sink_names.iter().map(String::as_str).collect();
            let added: Vec<&&str> = current_sinks
                .iter()
                .filter(|n| !checkpoint_sinks.contains(n))
                .collect();
            let removed: Vec<&&str> = checkpoint_sinks
                .iter()
                .filter(|n| !current_sinks.contains(n))
                .collect();
            if !added.is_empty() {
                warn!(
                    sinks = ?added,
                    "new sinks added since checkpoint — no saved epoch"
                );
            }
            if !removed.is_empty() {
                warn!(
                    sinks = ?removed,
                    "sinks removed since checkpoint — orphaned epochs"
                );
            }
        }

        info!(
            checkpoint_id = manifest.checkpoint_id,
            epoch = manifest.epoch,
            validation_warnings = validation_errors.len(),
            "recovering from checkpoint"
        );

        // Clone kept in the result so the rest of this function can keep
        // reading `manifest`; recovery is a cold path, so the copy is fine.
        let mut result = RecoveredState {
            manifest: manifest.clone(),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::new(),
            sink_errors: HashMap::new(),
        };

        // Step 3: Restore source offsets
        for source in sources {
            if !source.supports_replay {
                info!(
                    source = %source.name,
                    "skipping restore for non-replayable source (at-most-once)"
                );
                continue;
            }
            if let Some(cp) = manifest.source_offsets.get(&source.name) {
                let source_cp = connector_to_source_checkpoint(cp);
                let mut connector = source.connector.lock().await;
                match connector.restore(&source_cp).await {
                    Ok(()) => {
                        result.sources_restored += 1;
                        debug!(source = %source.name, epoch = cp.epoch, "source restored");
                    }
                    Err(e) => {
                        let msg = format!("source restore failed: {e}");
                        warn!(source = %source.name, error = %e, "source restore failed");
                        result.source_errors.insert(source.name.clone(), msg);
                    }
                }
            }
        }

        // Step 4: Restore table source offsets
        for table_source in table_sources {
            if let Some(cp) = manifest.table_offsets.get(&table_source.name) {
                let source_cp = connector_to_source_checkpoint(cp);
                let mut connector = table_source.connector.lock().await;
                match connector.restore(&source_cp).await {
                    Ok(()) => {
                        result.tables_restored += 1;
                        debug!(table = %table_source.name, epoch = cp.epoch, "table source restored");
                    }
                    Err(e) => {
                        let msg = format!("table source restore failed: {e}");
                        warn!(table = %table_source.name, error = %e, "table source restore failed");
                        result.source_errors.insert(table_source.name.clone(), msg);
                    }
                }
            }
        }

        // Step 5: Rollback sinks for exactly-once semantics.
        // Only roll back sinks that did NOT successfully commit (Pending or Failed).
        // Sinks with SinkCommitStatus::Committed already completed their commit
        // and should not be rolled back.
        for sink in sinks {
            if sink.exactly_once {
                // Check per-sink commit status — if the sink committed, skip rollback.
                let already_committed = manifest
                    .sink_commit_statuses
                    .get(&sink.name)
                    .is_some_and(|s| matches!(s, SinkCommitStatus::Committed));

                if already_committed {
                    debug!(
                        sink = %sink.name,
                        epoch = manifest.epoch,
                        "sink already committed, skipping rollback"
                    );
                    continue;
                }

                // Rollback is fire-and-forget via the task channel.
                sink.handle.rollback_epoch(manifest.epoch).await;
                result.sinks_rolled_back += 1;
                debug!(sink = %sink.name, epoch = manifest.epoch, "sink rolled back");
            }
        }

        // Log inflight data if present (unaligned checkpoint recovery).
        if !manifest.inflight_data.is_empty() {
            let total_records: usize = manifest.inflight_data.values().map(Vec::len).sum();
            info!(
                operators = manifest.inflight_data.len(),
                total_records, "unaligned checkpoint: inflight data present for replay"
            );
        }

        info!(
            checkpoint_id = manifest.checkpoint_id,
            epoch = manifest.epoch,
            sources_restored = result.sources_restored,
            tables_restored = result.tables_restored,
            sinks_rolled_back = result.sinks_rolled_back,
            errors = result.source_errors.len() + result.sink_errors.len(),
            has_inflight_data = !manifest.inflight_data.is_empty(),
            "recovery complete"
        );

        result
    }

    /// Returns `true` if the checkpoint fails integrity validation.
    ///
    /// Checks both sidecar checksum and manifest validity. Any validation
    /// failure (sidecar corruption, missing data, or manifest issues like
    /// epoch=0) causes the checkpoint to be rejected so fallback can try
    /// an older one. Validation I/O errors are also treated as corruption:
    /// if the checkpoint cannot be verified, it is not trusted.
    ///
    /// Only returns `false` (proceed) when validation passes OR when the
    /// checkpoint has no sidecar to validate.
    fn is_checkpoint_corrupt(&self, manifest: &CheckpointManifest) -> bool {
        // No sidecar and no state_checksum → nothing to validate beyond
        // manifest parsing (which already succeeded if we got here).
        if manifest.state_checksum.is_none() && manifest.operator_states.is_empty() {
            return false;
        }
        match self.store.validate_checkpoint(manifest.checkpoint_id) {
            Ok(ValidationResult {
                valid: false,
                ref issues,
                ..
            }) => {
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    issues = ?issues,
                    "[LDB-6010] checkpoint integrity check failed"
                );
                true
            }
            Ok(_) => false, // valid
            Err(e) => {
                // I/O errors during validation are treated as corruption —
                // if we can't verify the checkpoint, don't trust it.
                error!(
                    checkpoint_id = manifest.checkpoint_id,
                    error = %e,
                    "[LDB-6010] checkpoint validation I/O error — \
                     treating as corrupt for safety"
                );
                true
            }
        }
    }

    /// In strict mode, returns an error if any source or sink had restore failures.
    ///
    /// `recover` uses this to reject a checkpoint and fall back to an older
    /// one, rather than propagating the error to its caller.
    fn check_strict(&self, state: &RecoveredState) -> Result<(), DbError> {
        if !self.strict || !state.has_errors() {
            return Ok(());
        }
        let mut msgs: Vec<String> = state
            .source_errors
            .iter()
            .map(|(k, v)| format!("source '{k}': {v}"))
            .collect();
        for (k, v) in &state.sink_errors {
            msgs.push(format!("sink '{k}': {v}"));
        }
        Err(DbError::Checkpoint(format!(
            "strict recovery failed — {} restore error(s): {}",
            msgs.len(),
            msgs.join("; ")
        )))
    }

    /// Loads the latest manifest without performing recovery.
    ///
    /// Useful for inspecting checkpoint state or building a recovery plan.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` if the store fails.
    pub fn load_latest(&self) -> Result<Option<CheckpointManifest>, DbError> {
        self.store
            .load_latest()
            .map_err(|e| DbError::Checkpoint(format!("failed to load checkpoint: {e}")))
    }

    /// Loads a specific checkpoint by ID.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Checkpoint` if the store fails.
    pub fn load_by_id(&self, checkpoint_id: u64) -> Result<Option<CheckpointManifest>, DbError> {
        self.store.load_by_id(checkpoint_id).map_err(|e| {
            DbError::Checkpoint(format!("failed to load checkpoint {checkpoint_id}: {e}"))
        })
    }
}
642
#[cfg(test)]
mod tests {
    use super::*;
    use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;
648
    /// Builds a filesystem-backed checkpoint store rooted at `dir`.
    // NOTE(review): the `3` is presumably a retention/max-checkpoints limit —
    // confirm against `FileSystemCheckpointStore::new`.
    fn make_store(dir: &std::path::Path) -> FileSystemCheckpointStore {
        FileSystemCheckpointStore::new(dir, 3)
    }
652
    // An empty store is a fresh start: recover() returns Ok(None).
    #[tokio::test]
    async fn test_recover_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(result.is_none());
    }
662
    // A saved manifest with no sources/sinks/tables recovers cleanly:
    // epoch comes back, all counters stay at zero, no errors recorded.
    #[tokio::test]
    async fn test_recover_empty_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        // Save a basic checkpoint (checkpoint_id 1, epoch 5)
        let manifest = CheckpointManifest::new(1, 5);
        store.save(&manifest).unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.epoch(), 5);
        assert_eq!(result.sources_restored, 0);
        assert_eq!(result.tables_restored, 0);
        assert_eq!(result.sinks_rolled_back, 0);
        assert!(!result.has_errors());
    }
681
    // The checkpointed watermark round-trips through save → recover.
    #[tokio::test]
    async fn test_recover_with_watermark() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 3);
        manifest.watermark = Some(42_000);
        store.save(&manifest).unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.watermark(), Some(42_000));
    }
696
    // Inline operator states round-trip: both entries survive recovery and
    // the stored bytes decode back to the original payload.
    #[tokio::test]
    async fn test_recover_with_operator_states() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 7);
        manifest
            .operator_states
            .insert("0".to_string(), OperatorCheckpoint::inline(b"window-state"));
        manifest
            .operator_states
            .insert("3".to_string(), OperatorCheckpoint::inline(b"filter-state"));
        store.save(&manifest).unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.operator_states().len(), 2);
        let op0 = result.operator_states().get("0").unwrap();
        assert_eq!(op0.decode_inline().unwrap(), b"window-state");
    }
718
    // Global and per-core WAL positions round-trip through recovery.
    #[tokio::test]
    async fn test_recover_wal_positions() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 2);
        manifest.wal_position = 4096;
        manifest.per_core_wal_positions = vec![100, 200, 300];
        store.save(&manifest).unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(result.wal_position(), 4096);
        assert_eq!(result.per_core_wal_positions(), &[100, 200, 300]);
    }
735
    // The table-store checkpoint path round-trips through recovery.
    #[tokio::test]
    async fn test_recover_table_store_path() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.table_store_checkpoint_path = Some("/data/rocksdb_cp_001".into());
        store.save(&manifest).unwrap();

        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();

        assert_eq!(
            result.table_store_checkpoint_path(),
            Some("/data/rocksdb_cp_001")
        );
    }
753
    // load_latest on an empty store returns Ok(None) rather than erroring.
    #[test]
    fn test_load_latest_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);

        assert!(mgr.load_latest().unwrap().is_none());
    }
762
    // load_by_id fetches the exact checkpoint requested, and returns
    // Ok(None) for an id that was never saved.
    #[test]
    fn test_load_by_id() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());

        store.save(&CheckpointManifest::new(1, 1)).unwrap();
        store.save(&CheckpointManifest::new(2, 2)).unwrap();

        let mgr = RecoveryManager::new(&store);
        let m = mgr.load_by_id(1).unwrap().unwrap();
        assert_eq!(m.checkpoint_id, 1);

        let m2 = mgr.load_by_id(2).unwrap().unwrap();
        assert_eq!(m2.checkpoint_id, 2);

        assert!(mgr.load_by_id(999).unwrap().is_none());
    }
780
781    #[tokio::test]
782    async fn test_recover_fallback_to_previous_checkpoint() {
783        let dir = tempfile::tempdir().unwrap();
784        let store = FileSystemCheckpointStore::new(dir.path(), 10);
785
786        // Save two valid checkpoints
787        let mut m1 = CheckpointManifest::new(1, 10);
788        m1.watermark = Some(1000);
789        store.save(&m1).unwrap();
790
791        let mut m2 = CheckpointManifest::new(2, 20);
792        m2.watermark = Some(2000);
793        store.save(&m2).unwrap();
794
795        // Corrupt the latest checkpoint by writing invalid JSON
796        let latest_manifest_path = dir
797            .path()
798            .join("checkpoints")
799            .join("checkpoint_000002")
800            .join("manifest.json");
801        std::fs::write(&latest_manifest_path, "not valid json!!!").unwrap();
802
803        // Also corrupt latest.txt to point to the corrupt checkpoint
804        // (it already does from the save, but the manifest file is now corrupt)
805
806        let mgr = RecoveryManager::new(&store);
807        let result = mgr.recover(&[], &[], &[]).await.unwrap();
808
809        // Should fall back to checkpoint 1
810        let recovered = result.expect("should recover from fallback checkpoint");
811        assert_eq!(recovered.manifest.checkpoint_id, 1);
812        assert_eq!(recovered.epoch(), 10);
813        assert_eq!(recovered.watermark(), Some(1000));
814    }
815
816    #[tokio::test]
817    async fn test_recover_all_checkpoints_corrupt_starts_fresh() {
818        let dir = tempfile::tempdir().unwrap();
819        let store = FileSystemCheckpointStore::new(dir.path(), 10);
820
821        // Save a checkpoint then corrupt it
822        store.save(&CheckpointManifest::new(1, 5)).unwrap();
823
824        let manifest_path = dir
825            .path()
826            .join("checkpoints")
827            .join("checkpoint_000001")
828            .join("manifest.json");
829        std::fs::write(&manifest_path, "corrupt").unwrap();
830
831        let mgr = RecoveryManager::new(&store);
832        let result = mgr.recover(&[], &[], &[]).await.unwrap();
833
834        // All checkpoints corrupt → fresh start
835        assert!(result.is_none());
836    }
837
838    #[tokio::test]
839    async fn test_recover_latest_ok_no_fallback_needed() {
840        let dir = tempfile::tempdir().unwrap();
841        let store = FileSystemCheckpointStore::new(dir.path(), 10);
842
843        store.save(&CheckpointManifest::new(1, 10)).unwrap();
844        store.save(&CheckpointManifest::new(2, 20)).unwrap();
845
846        let mgr = RecoveryManager::new(&store);
847        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
848
849        // Should use the latest (no fallback needed)
850        assert_eq!(result.manifest.checkpoint_id, 2);
851        assert_eq!(result.epoch(), 20);
852    }
853
854    #[tokio::test]
855    async fn test_recover_with_sidecar_state() {
856        let dir = tempfile::tempdir().unwrap();
857        let store = make_store(dir.path());
858
859        // Build a manifest with an external operator state
860        let mut manifest = CheckpointManifest::new(1, 5);
861        let large_data = vec![0xAB; 2048];
862        manifest
863            .operator_states
864            .insert("big-op".into(), OperatorCheckpoint::external(0, 2048));
865
866        // Write sidecar first, then manifest
867        store.save_state_data(1, &large_data).unwrap();
868        store.save(&manifest).unwrap();
869
870        let mgr = RecoveryManager::new(&store);
871        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
872
873        // External state should have been resolved to inline
874        let op = result.operator_states().get("big-op").unwrap();
875        assert!(!op.external, "external state should be resolved to inline");
876        assert_eq!(op.decode_inline().unwrap(), large_data);
877    }
878
879    #[tokio::test]
880    async fn test_recover_mixed_inline_and_external() {
881        let dir = tempfile::tempdir().unwrap();
882        let store = make_store(dir.path());
883
884        let mut manifest = CheckpointManifest::new(1, 3);
885        // Small inline state
886        manifest
887            .operator_states
888            .insert("small-op".into(), OperatorCheckpoint::inline(b"tiny"));
889        // Large external state
890        let large_data = vec![0xCD; 4096];
891        manifest
892            .operator_states
893            .insert("big-op".into(), OperatorCheckpoint::external(0, 4096));
894
895        store.save_state_data(1, &large_data).unwrap();
896        store.save(&manifest).unwrap();
897
898        let mgr = RecoveryManager::new(&store);
899        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
900
901        let small = result.operator_states().get("small-op").unwrap();
902        assert_eq!(small.decode_inline().unwrap(), b"tiny");
903
904        let big = result.operator_states().get("big-op").unwrap();
905        assert_eq!(big.decode_inline().unwrap(), large_data);
906    }
907
908    #[tokio::test]
909    async fn test_recover_with_inflight_data() {
910        use laminar_storage::checkpoint_manifest::InFlightRecord;
911
912        let dir = tempfile::tempdir().unwrap();
913        let store = make_store(dir.path());
914
915        let mut manifest = CheckpointManifest::new(1, 5);
916        // Pre-encoded "inflight-event" in base64
917        let record = InFlightRecord {
918            input_id: 0,
919            data_b64: "aW5mbGlnaHQtZXZlbnQ=".into(),
920        };
921        manifest
922            .inflight_data
923            .insert("join-op".into(), vec![record]);
924        store.save(&manifest).unwrap();
925
926        let mgr = RecoveryManager::new(&store);
927        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
928
929        assert!(result.has_inflight_data());
930        let inflight = result.inflight_data();
931        assert_eq!(inflight.len(), 1);
932        let records = inflight.get("join-op").unwrap();
933        assert_eq!(records.len(), 1);
934        assert_eq!(records[0].input_id, 0);
935        assert_eq!(records[0].data_b64, "aW5mbGlnaHQtZXZlbnQ=");
936    }
937
938    #[tokio::test]
939    async fn test_recover_missing_sidecar_graceful() {
940        let dir = tempfile::tempdir().unwrap();
941        let store = make_store(dir.path());
942
943        // Manifest references external state but sidecar is missing
944        let mut manifest = CheckpointManifest::new(1, 1);
945        manifest
946            .operator_states
947            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
948        store.save(&manifest).unwrap();
949
950        let mgr = RecoveryManager::new(&store);
951        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
952
953        // Should still recover (gracefully) — external state replaced with
954        // empty inline to avoid dangling offset references
955        let op = result.operator_states().get("orphan").unwrap();
956        assert!(
957            !op.external,
958            "unresolved external state replaced with inline empty"
959        );
960        assert!(
961            op.state_b64.as_ref().is_none_or(String::is_empty),
962            "replaced state should be empty"
963        );
964    }
965
966    #[tokio::test]
967    async fn test_recovered_state_has_errors() {
968        let state = RecoveredState {
969            manifest: CheckpointManifest::new(1, 1),
970            sources_restored: 0,
971            tables_restored: 0,
972            sinks_rolled_back: 0,
973            source_errors: HashMap::new(),
974            sink_errors: HashMap::new(),
975        };
976        assert!(!state.has_errors());
977
978        let state_with_errors = RecoveredState {
979            manifest: CheckpointManifest::new(1, 1),
980            sources_restored: 0,
981            tables_restored: 0,
982            sinks_rolled_back: 0,
983            source_errors: HashMap::from([("source1".into(), "failed".into())]),
984            sink_errors: HashMap::new(),
985        };
986        assert!(state_with_errors.has_errors());
987    }
988}