Skip to main content

laminar_db/
checkpoint_coordinator.rs

1//! Unified checkpoint coordinator.
2//!
3//! Single orchestrator for checkpoint lifecycle. Lives in Ring 2 (control plane).
4//!
5//! The checkpoint manifest is the source of truth for source offsets.
6//! Kafka broker commits are advisory (for monitoring tools). On recovery,
7//! offsets restore from manifest, not consumer group state.
8//!
9//! ## Checkpoint Cycle
10//!
11//! 1. Barrier injection — `CheckpointBarrierInjector.trigger()`
12//! 2. Operator snapshot — `OperatorGraph.snapshot_state()` → operator states
13//! 3. Source snapshot — `source.checkpoint()` for each source
14//! 4. Sink pre-commit — `sink.pre_commit(epoch)` for each exactly-once sink
15//! 5. Manifest persist — `store.save(&manifest)` (atomic write)
16//! 6. Sink commit — `sink.commit_epoch(epoch)` for each exactly-once sink
17//! 7. On ANY failure at 6 — `sink.rollback_epoch()` on remaining sinks
18#![allow(clippy::disallowed_types)] // cold path
19
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use laminar_connectors::checkpoint::SourceCheckpoint;
25use laminar_connectors::connector::SourceConnector;
26use laminar_core::state::StateBackend;
27use laminar_storage::checkpoint_manifest::{
28    CheckpointManifest, ConnectorCheckpoint, SinkCommitStatus,
29};
30use laminar_storage::checkpoint_store::CheckpointStore;
31use tracing::{debug, error, info, warn};
32
33use crate::error::DbError;
34
/// Tunables for the unified checkpoint pipeline.
///
/// The per-phase timeouts keep a stuck sink or a hung filesystem from
/// stalling the runtime. `state_inline_threshold` is the per-operator
/// cut-over between state inlined as base64 in the JSON manifest and
/// state written to a sidecar file. `max_checkpoint_bytes` bounds the
/// total sidecar size; a checkpoint over the cap is rejected with
/// `[LDB-6014]`.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Time between automatic checkpoints; `None` means checkpoints are
    /// only taken manually.
    pub interval: Option<Duration>,
    /// How many completed checkpoints are kept on disk / in the object
    /// store.
    pub max_retained: usize,
    /// Longest wait for barrier alignment at fan-in operators.
    pub alignment_timeout: Duration,
    /// Longest wait for sink pre-commit (phase 1).
    pub pre_commit_timeout: Duration,
    /// Longest wait for the manifest persist.
    pub persist_timeout: Duration,
    /// Longest wait for sink commit (phase 2).
    pub commit_timeout: Duration,
    /// Longest wait for sink rollback.
    pub rollback_timeout: Duration,
    /// Operator state larger than this goes to a sidecar instead of
    /// being inlined as base64 JSON.
    pub state_inline_threshold: usize,
    /// Longest wait for operator state serialization.
    pub serialization_timeout: Duration,
    /// Limit on total sidecar bytes; `None` disables the limit. The
    /// warn threshold is 80% of the cap.
    pub max_checkpoint_bytes: Option<usize>,
}
65
66impl Default for CheckpointConfig {
67    fn default() -> Self {
68        Self {
69            interval: Some(Duration::from_secs(60)),
70            max_retained: 3,
71            alignment_timeout: Duration::from_secs(30),
72            pre_commit_timeout: Duration::from_secs(30),
73            persist_timeout: Duration::from_secs(120),
74            commit_timeout: Duration::from_secs(60),
75            rollback_timeout: Duration::from_secs(30),
76            serialization_timeout: Duration::from_secs(120),
77            state_inline_threshold: 1_048_576,
78            max_checkpoint_bytes: None,
79        }
80    }
81}
82
83/// Parameters for a checkpoint operation.
84#[derive(Debug, Clone, Default)]
85pub struct CheckpointRequest {
86    /// Serialized operator states. `Bytes` (not `Vec<u8>`) so producers
87    /// (rkyv output, MV IPC bytes) can hand off the buffer without an
88    /// extra copy at each stage of the checkpoint pipeline.
89    pub operator_states: HashMap<String, bytes::Bytes>,
90    /// Current watermark timestamp.
91    pub watermark: Option<i64>,
92    /// Path for table store checkpoint data.
93    pub table_store_checkpoint_path: Option<String>,
94    /// Additional table offset overrides.
95    pub extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
96    /// Per-source watermark timestamps.
97    pub source_watermarks: HashMap<String, i64>,
98    /// Pipeline topology hash for change detection.
99    pub pipeline_hash: Option<u64>,
100    /// Source offset overrides for recovery.
101    pub source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
102}
103
104/// Phase of the checkpoint lifecycle.
105#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
106pub enum CheckpointPhase {
107    /// No checkpoint in progress.
108    Idle,
109    /// Operators snapshotted, collecting source positions.
110    Snapshotting,
111    /// Sinks pre-committing (phase 1).
112    PreCommitting,
113    /// Manifest being persisted.
114    Persisting,
115    /// Sinks committing (phase 2).
116    Committing,
117}
118
119impl std::fmt::Display for CheckpointPhase {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        match self {
122            Self::Idle => write!(f, "Idle"),
123            Self::Snapshotting => write!(f, "Snapshotting"),
124            Self::PreCommitting => write!(f, "PreCommitting"),
125            Self::Persisting => write!(f, "Persisting"),
126            Self::Committing => write!(f, "Committing"),
127        }
128    }
129}
130
131/// Result of a checkpoint attempt.
132#[derive(Debug, serde::Serialize)]
133pub struct CheckpointResult {
134    /// Whether the checkpoint succeeded.
135    pub success: bool,
136    /// Checkpoint ID (if created).
137    pub checkpoint_id: u64,
138    /// Epoch number.
139    pub epoch: u64,
140    /// Duration of the checkpoint operation.
141    pub duration: Duration,
142    /// Error message if failed.
143    pub error: Option<String>,
144}
145
146/// Registered source for checkpoint coordination.
147pub(crate) struct RegisteredSource {
148    /// Source name.
149    pub name: String,
150    /// Source connector handle.
151    pub connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
152    /// Whether this source supports replay from a checkpointed position.
153    ///
154    /// Sources that do not support replay (e.g., WebSocket) degrade
155    /// exactly-once semantics to at-most-once.
156    pub supports_replay: bool,
157}
158
159/// Registered sink for checkpoint coordination.
160pub(crate) struct RegisteredSink {
161    /// Sink name.
162    pub name: String,
163    /// Sink task handle (channel-based, no mutex contention).
164    pub handle: crate::sink_task::SinkTaskHandle,
165    /// Whether this sink supports exactly-once / two-phase commit.
166    pub exactly_once: bool,
167}
168
169/// Unified checkpoint coordinator.
170///
171/// Orchestrates the full checkpoint lifecycle across sources, sinks,
172/// and operator state, persisting everything in a single
173/// [`CheckpointManifest`].
174pub struct CheckpointCoordinator {
175    config: CheckpointConfig,
176    store: Arc<dyn CheckpointStore>,
177    sinks: Vec<RegisteredSink>,
178    next_checkpoint_id: u64,
179    epoch: u64,
180    phase: CheckpointPhase,
181    checkpoints_completed: u64,
182    checkpoints_failed: u64,
183    last_checkpoint_duration: Option<Duration>,
184    duration_histogram: DurationHistogram,
185    prom: Option<Arc<crate::engine_metrics::EngineMetrics>>,
186    total_bytes_written: u64,
187    /// Consulted between manifest persist and sink commit to verify
188    /// per-vnode durability.
189    state_backend: Option<Arc<dyn StateBackend>>,
190    /// Stamped into every `write_partial` for the split-brain fence.
191    /// Zero = fence disabled.
192    assignment_version: u64,
193    /// Shared commit-marker store. Written before `Commit` is
194    /// announced; absence means "no quorum, safe to abort". `None`
195    /// in single-instance mode.
196    #[cfg(feature = "cluster-unstable")]
197    decision_store: Option<Arc<laminar_core::cluster::control::CheckpointDecisionStore>>,
198    /// Reported in each `BarrierAck`; the leader folds these with its
199    /// own watermark to compute the cluster-wide min.
200    local_watermark_ms: Option<i64>,
201    /// Cluster-wide min watermark as of the last committed epoch
202    /// (leader-side; fanned out in the Commit announcement).
203    #[cfg(feature = "cluster-unstable")]
204    cluster_min_watermark: Option<i64>,
205    /// Vnodes this coordinator owns; drives per-vnode marker writes.
206    vnode_set: Vec<u32>,
207    /// Vnodes the leader's durability gate checks. In cluster mode the
208    /// full registry; single-instance mirrors `vnode_set`.
209    gate_vnode_set: Vec<u32>,
210    /// `Some` in cluster mode, `None` in single-instance / embedded.
211    #[cfg(feature = "cluster-unstable")]
212    cluster_controller: Option<Arc<laminar_core::cluster::control::ClusterController>>,
213    /// Cached sorted sink names; invalidated on `register_sink`.
214    cached_sorted_sink_names: Option<Vec<String>>,
215}
216
217impl CheckpointCoordinator {
218    /// Creates a new checkpoint coordinator, seeded from the latest
219    /// stored checkpoint.
220    ///
221    /// # Errors
222    /// Returns [`DbError::Checkpoint`] if `store.load_latest()` fails.
223    /// A store read error is surfaced rather than silently starting at
224    /// `(1, 1)` and clobbering existing on-disk state. `Ok(None)` is
225    /// the fresh-start path and is not an error.
226    pub async fn new(
227        config: CheckpointConfig,
228        store: Box<dyn CheckpointStore>,
229    ) -> Result<Self, DbError> {
230        let store: Arc<dyn CheckpointStore> = Arc::from(store);
231        let (next_id, epoch) = match store.load_latest().await {
232            Ok(Some(m)) => (m.checkpoint_id + 1, m.epoch + 1),
233            Ok(None) => (1, 1),
234            Err(e) => {
235                return Err(DbError::Checkpoint(format!(
236                    "[LDB-6028] failed to load latest checkpoint at coordinator \
237                     construction: {e} — refusing to start at epoch 1 and \
238                     clobber existing on-disk state"
239                )));
240            }
241        };
242
243        Ok(Self {
244            config,
245            store,
246            sinks: Vec::new(),
247            next_checkpoint_id: next_id,
248            epoch,
249            phase: CheckpointPhase::Idle,
250            checkpoints_completed: 0,
251            checkpoints_failed: 0,
252            last_checkpoint_duration: None,
253            duration_histogram: DurationHistogram::new(),
254            prom: None,
255            total_bytes_written: 0,
256            state_backend: None,
257            assignment_version: 0,
258            #[cfg(feature = "cluster-unstable")]
259            decision_store: None,
260            local_watermark_ms: None,
261            #[cfg(feature = "cluster-unstable")]
262            cluster_min_watermark: None,
263            vnode_set: Vec::new(),
264            gate_vnode_set: Vec::new(),
265            #[cfg(feature = "cluster-unstable")]
266            cluster_controller: None,
267            cached_sorted_sink_names: None,
268        })
269    }
270
271    /// Activates cluster-mode 2PC. Without this the coordinator runs
272    /// single-instance semantics.
273    #[cfg(feature = "cluster-unstable")]
274    pub fn set_cluster_controller(
275        &mut self,
276        controller: Arc<laminar_core::cluster::control::ClusterController>,
277    ) {
278        self.cluster_controller = Some(controller);
279    }
280
281    /// Wired with a non-empty `vnode_set` to enable per-vnode markers
282    /// and the `epoch_complete` durability gate.
283    pub fn set_state_backend(&mut self, backend: Arc<dyn StateBackend>) {
284        self.state_backend = Some(backend);
285    }
286
287    /// Install the shared commit-marker store.
288    #[cfg(feature = "cluster-unstable")]
289    pub fn set_decision_store(
290        &mut self,
291        store: Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
292    ) {
293        self.decision_store = Some(store);
294    }
295
296    /// Record the assignment generation this coordinator is writing
297    /// with. Forwarded to `backend.write_partial` so the split-brain
298    /// fence can reject stale writers. Host sets this whenever a fresh
299    /// `AssignmentSnapshot` rotates in.
300    pub fn set_assignment_version(&mut self, version: u64) {
301        self.assignment_version = version;
302    }
303
304    /// Record this instance's current local watermark, reported in
305    /// every subsequent `BarrierAck` so the leader can compute the
306    /// cluster-wide minimum. `None` disables the per-follower
307    /// contribution — leader falls back to its own watermark (and
308    /// the other followers').
309    pub fn set_local_watermark_ms(&mut self, watermark: Option<i64>) {
310        self.local_watermark_ms = watermark;
311    }
312
313    /// Vnodes this instance owns; drives marker writes. Also the
314    /// default gate set until [`Self::set_gate_vnode_set`] is called.
315    pub fn set_vnode_set(&mut self, vnodes: Vec<u32>) {
316        if self.gate_vnode_set.is_empty() {
317            self.gate_vnode_set.clone_from(&vnodes);
318        }
319        self.vnode_set = vnodes;
320    }
321
322    /// Set the vnodes the leader's durability gate checks (the full
323    /// registry in cluster mode). Defaults to `vnode_set` when unset.
324    pub fn set_gate_vnode_set(&mut self, vnodes: Vec<u32>) {
325        self.gate_vnode_set = vnodes;
326    }
327
328    /// Registers a sink connector for checkpoint coordination.
329    pub(crate) fn register_sink(
330        &mut self,
331        name: impl Into<String>,
332        handle: crate::sink_task::SinkTaskHandle,
333        exactly_once: bool,
334    ) {
335        self.sinks.push(RegisteredSink {
336            name: name.into(),
337            handle,
338            exactly_once,
339        });
340        // Invalidate the sorted-name cache; the next checkpoint will
341        // rebuild it.
342        self.cached_sorted_sink_names = None;
343    }
344
345    /// Begins the initial epoch on all exactly-once sinks.
346    ///
347    /// Must be called once after all sinks are registered and before any
348    /// writes occur. This starts the first Kafka transaction for exactly-once
349    /// sinks. Subsequent epochs are started automatically after each
350    /// successful checkpoint commit.
351    ///
352    /// # Errors
353    ///
354    /// Returns the first error from any sink that fails to begin the epoch.
355    pub async fn begin_initial_epoch(&self) -> Result<(), DbError> {
356        self.begin_epoch_for_sinks(self.epoch).await
357    }
358
359    /// Begins an epoch on all exactly-once sinks. If any sink fails,
360    /// rolls back sinks that already started the epoch.
361    async fn begin_epoch_for_sinks(&self, epoch: u64) -> Result<(), DbError> {
362        let mut started: Vec<&RegisteredSink> = Vec::new();
363        for sink in &self.sinks {
364            if sink.exactly_once {
365                match sink.handle.begin_epoch(epoch).await {
366                    Ok(()) => {
367                        started.push(sink);
368                        debug!(sink = %sink.name, epoch, "began epoch");
369                    }
370                    Err(e) => {
371                        // Roll back sinks that already started.
372                        for s in &started {
373                            if let Err(re) = s.handle.rollback_epoch(epoch).await {
374                                error!(sink = %s.name, epoch, error = %re,
375                                    "[LDB-6016] sink rollback failed during begin_epoch recovery");
376                            }
377                        }
378                        return Err(DbError::Checkpoint(format!(
379                            "sink '{}' failed to begin epoch {epoch}: {e}",
380                            sink.name
381                        )));
382                    }
383                }
384            }
385        }
386        Ok(())
387    }
388
389    /// Inject prometheus engine metrics.
390    pub fn set_metrics(&mut self, prom: Arc<crate::engine_metrics::EngineMetrics>) {
391        self.prom = Some(prom);
392    }
393
394    /// Emits checkpoint metrics to prometheus.
395    fn emit_checkpoint_metrics(&self, success: bool, epoch: u64, duration: Duration) {
396        if let Some(ref m) = self.prom {
397            if success {
398                m.checkpoints_completed.inc();
399            } else {
400                m.checkpoints_failed.inc();
401            }
402            #[allow(clippy::cast_possible_wrap)]
403            m.checkpoint_epoch.set(epoch as i64);
404            m.checkpoint_duration.observe(duration.as_secs_f64());
405        }
406    }
407
408    /// Performs a full checkpoint cycle (steps 3-7).
409    ///
410    /// Steps 1-2 (barrier propagation + operator snapshots) are handled
411    /// externally by the DAG executor and passed in via [`CheckpointRequest`].
412    ///
413    /// # Errors
414    ///
415    /// Returns `DbError::Checkpoint` if any phase fails.
416    pub async fn checkpoint(
417        &mut self,
418        request: CheckpointRequest,
419    ) -> Result<CheckpointResult, DbError> {
420        self.checkpoint_inner(request).await
421    }
422
423    /// Pre-commits all exactly-once sinks (phase 1) with a timeout.
424    ///
425    /// A stuck sink will not block checkpointing indefinitely. The timeout
426    /// is configured via [`CheckpointConfig::pre_commit_timeout`].
427    async fn pre_commit_sinks(&self, epoch: u64) -> Result<(), DbError> {
428        let timeout_dur = self.config.pre_commit_timeout;
429        let start = std::time::Instant::now();
430
431        let result =
432            match tokio::time::timeout(timeout_dur, self.pre_commit_sinks_inner(epoch)).await {
433                Ok(result) => result,
434                Err(_elapsed) => Err(DbError::Checkpoint(format!(
435                    "pre-commit timed out after {}s",
436                    timeout_dur.as_secs()
437                ))),
438            };
439
440        // Record pre-commit duration regardless of success/failure.
441        if let Some(ref m) = self.prom {
442            m.sink_precommit_duration
443                .observe(start.elapsed().as_secs_f64());
444        }
445
446        result
447    }
448
449    /// Inner pre-commit loop (no timeout).
450    ///
451    /// Only sinks with `exactly_once = true` participate in two-phase commit.
452    /// At-most-once sinks are skipped — they receive no `pre_commit`/`commit`
453    /// calls and provide no transactional guarantees.
454    ///
455    /// Fires every sink's `pre_commit` concurrently via `try_join_all` —
456    /// matching the rollback path's shape. The serial version was
457    /// `sum(per-sink pre-commit latency)`; with 4 Delta sinks at ~200ms
458    /// S3 write each, that was 800ms serial. Concurrent = 200ms.
459    async fn pre_commit_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
460        let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
461            let handle = sink.handle.clone();
462            let name = sink.name.clone();
463            async move {
464                let result = handle.pre_commit(epoch).await;
465                match result {
466                    Ok(()) => {
467                        debug!(sink = %name, epoch, "sink pre-committed");
468                        Ok(())
469                    }
470                    Err(e) => Err(DbError::Checkpoint(format!(
471                        "sink '{name}' pre-commit failed: {e}"
472                    ))),
473                }
474            }
475        });
476        futures::future::try_join_all(futures).await.map(|_| ())
477    }
478
479    /// Commits all exactly-once sinks with per-sink status tracking.
480    ///
481    /// Returns a map of sink name → commit status. Sinks that committed
482    /// successfully are `Committed`; failures are `Failed(message)`.
483    /// All sinks are attempted even if some fail.
484    ///
485    /// Bounded by [`CheckpointConfig::commit_timeout`] to prevent a stuck
486    /// sink from blocking checkpoint completion indefinitely.
487    async fn commit_sinks_tracked(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
488        let timeout_dur = self.config.commit_timeout;
489        let start = std::time::Instant::now();
490
491        let statuses = match tokio::time::timeout(timeout_dur, self.commit_sinks_inner(epoch)).await
492        {
493            Ok(statuses) => statuses,
494            Err(_elapsed) => {
495                error!(
496                    epoch,
497                    timeout_secs = timeout_dur.as_secs(),
498                    "[LDB-6012] sink commit timed out — marking all pending sinks as failed"
499                );
500                self.sinks
501                    .iter()
502                    .filter(|s| s.exactly_once)
503                    .map(|s| {
504                        (
505                            s.name.clone(),
506                            SinkCommitStatus::Failed(format!(
507                                "sink '{}' commit timed out after {}s",
508                                s.name,
509                                timeout_dur.as_secs()
510                            )),
511                        )
512                    })
513                    .collect()
514            }
515        };
516
517        // Record commit duration regardless of success/failure.
518        if let Some(ref m) = self.prom {
519            m.sink_commit_duration
520                .observe(start.elapsed().as_secs_f64());
521        }
522
523        statuses
524    }
525
526    /// Inner commit loop (no timeout).
527    ///
528    /// Fires every sink's `commit_epoch` concurrently via `join_all`
529    /// (not `try_join_all` — one sink failing must not short-circuit
530    /// the others, because each sink's status is tracked independently
531    /// and callers inspect the map). Matches the rollback path's
532    /// pattern at `rollback_sinks_inner`.
533    async fn commit_sinks_inner(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
534        let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
535            let handle = sink.handle.clone();
536            let name = sink.name.clone();
537            async move {
538                let status = match handle.commit_epoch(epoch).await {
539                    Ok(()) => {
540                        debug!(sink = %name, epoch, "sink committed");
541                        SinkCommitStatus::Committed
542                    }
543                    Err(e) => {
544                        let msg = format!("sink '{name}' commit failed: {e}");
545                        error!(sink = %name, epoch, error = %e, "sink commit failed");
546                        SinkCommitStatus::Failed(msg)
547                    }
548                };
549                (name, status)
550            }
551        });
552        let results = futures::future::join_all(futures).await;
553        results.into_iter().collect()
554    }
555
556    /// Saves a manifest to the checkpoint store.
557    ///
558    /// Uses [`CheckpointStore::save_with_state`] to write optional sidecar
559    /// data **before** the manifest, ensuring atomicity: if the sidecar write
560    /// fails, the manifest is never persisted.
561    ///
562    /// Takes `Arc<CheckpointManifest>` so the caller can retain its own copy
563    /// without a deep clone. Bounded by [`CheckpointConfig::persist_timeout`]
564    /// to prevent a hung filesystem from stalling the runtime indefinitely.
565    async fn save_manifest(
566        &self,
567        manifest: Arc<CheckpointManifest>,
568        state_data: Option<Vec<bytes::Bytes>>,
569    ) -> Result<(), DbError> {
570        let timeout_dur = self.config.persist_timeout;
571        let fut = self.store.save_with_state(&manifest, state_data.as_deref());
572        match tokio::time::timeout(timeout_dur, fut).await {
573            Ok(Ok(())) => Ok(()),
574            Ok(Err(e)) => Err(DbError::from(e)),
575            Err(_elapsed) => Err(DbError::Checkpoint(format!(
576                "[LDB-6011] manifest persist timed out after {}s — \
577                 filesystem may be degraded",
578                timeout_dur.as_secs()
579            ))),
580        }
581    }
582
583    /// Writes a per-vnode durability marker so the leader's
584    /// `epoch_complete` gate returns true and sinks can commit.
585    ///
586    /// Fires every vnode marker concurrently via `try_join_all`.
587    /// The serial version was O(`vnode_count` × per-write latency) —
588    /// ~256 awaits on a typical cluster, trivial CPU but each a
589    /// scheduling hop (and on remote object-store backends each
590    /// write is tens to hundreds of ms). Concurrent dispatch
591    /// collapses that to one round-trip's worth of wall time.
592    async fn write_vnode_markers(&self, epoch: u64, checkpoint_id: u64) -> Result<(), DbError> {
593        let Some(ref backend) = self.state_backend else {
594            return Ok(());
595        };
596        if self.vnode_set.is_empty() {
597            return Ok(());
598        }
599        let payload = bytes::Bytes::from(format!("ckpt:{checkpoint_id}").into_bytes());
600        // Stamp every marker with the current assignment generation.
601        // Zero means the host hasn't wired a version (single-instance
602        // path) and the fence is a no-op.
603        let caller_version = self.assignment_version;
604        let writes = self.vnode_set.iter().map(|&v| {
605            let backend = Arc::clone(backend);
606            let payload = payload.clone();
607            async move {
608                backend
609                    .write_partial(v, epoch, caller_version, payload)
610                    .await
611                    .map_err(|e| {
612                        DbError::Checkpoint(format!(
613                            "[LDB-6024] vnode marker write failed (vnode={v}, epoch={epoch}): {e}"
614                        ))
615                    })
616            }
617        });
618        futures::future::try_join_all(writes).await.map(|_| ())
619    }
620
621    /// On startup, reconcile any Pending sinks in the last manifest
622    /// against the durable commit marker. Marker present → drive
623    /// local commit (idempotent); marker absent → rollback. Runs on
624    /// every node; the leader additionally re-announces the decision.
625    #[cfg(feature = "cluster-unstable")]
626    pub async fn reconcile_prepared_on_init(&self) {
627        use laminar_core::cluster::control::Phase;
628
629        let Ok(Some(last)) = self.store.load_latest().await else {
630            return;
631        };
632        let has_pending = last
633            .sink_commit_statuses
634            .values()
635            .any(|s| matches!(s, SinkCommitStatus::Pending));
636        if !has_pending {
637            return;
638        }
639
640        let epoch = last.epoch;
641        let checkpoint_id = last.checkpoint_id;
642
643        let committed = match self.decision_store.as_ref() {
644            Some(ds) => ds.is_committed(epoch).await.unwrap_or_else(|e| {
645                warn!(
646                    epoch, checkpoint_id, error = %e,
647                    "[LDB-6040] decision store read failed — defaulting to Abort",
648                );
649                false
650            }),
651            None => false,
652        };
653
654        let is_leader = self
655            .cluster_controller
656            .as_ref()
657            .is_some_and(|cc| cc.is_leader());
658
659        if committed {
660            info!(
661                epoch,
662                checkpoint_id, "recovering Pending epoch as Committed"
663            );
664            let statuses = self.commit_sinks_tracked(epoch).await;
665            if let Err(e) = self
666                .persist_recovered_statuses(checkpoint_id, statuses)
667                .await
668            {
669                warn!(epoch, checkpoint_id, error = %e, "post-recovery manifest update failed");
670            }
671            if is_leader {
672                self.announce_if_leader(epoch, checkpoint_id, Phase::Commit, None)
673                    .await;
674            }
675        } else {
676            warn!(
677                epoch,
678                checkpoint_id, "[LDB-6035] Pending epoch with no commit marker — rolling back",
679            );
680            if let Err(e) = self.rollback_sinks(epoch).await {
681                error!(epoch, checkpoint_id, error = %e, "sink rollback failed during recovery");
682            }
683            if is_leader {
684                self.announce_if_leader(epoch, checkpoint_id, Phase::Abort, None)
685                    .await;
686            }
687        }
688
689        // Brief pause so the announcement gossips before the next checkpoint tick.
690        if is_leader {
691            tokio::time::sleep(Duration::from_millis(200)).await;
692        }
693    }
694
695    /// Overwrite the Pending sink statuses on the last manifest with
696    /// the outcomes produced during recovery.
697    #[cfg(feature = "cluster-unstable")]
698    async fn persist_recovered_statuses(
699        &self,
700        checkpoint_id: u64,
701        statuses: HashMap<String, SinkCommitStatus>,
702    ) -> Result<(), DbError> {
703        if statuses.is_empty() {
704            return Ok(());
705        }
706        match self.store.load_by_id(checkpoint_id).await {
707            Ok(Some(mut m)) => {
708                m.sink_commit_statuses = statuses;
709                self.update_manifest_only(Arc::new(m)).await
710            }
711            Ok(None) => Ok(()),
712            Err(e) => Err(DbError::from(e)),
713        }
714    }
715
716    /// No-op when not the leader. Errors are logged — worst case is a
717    /// longer follower timeout, not a correctness issue.
718    ///
719    /// `min_watermark_ms` is typically `None` on `Prepare`/`Abort` and
720    /// `Some(cluster_min)` on `Commit` (computed from follower acks +
721    /// local watermark). Downstream operators read the published
722    /// value from [`ClusterController`] so event-time decisions stay
723    /// consistent across the cluster.
724    #[cfg(feature = "cluster-unstable")]
725    async fn announce_if_leader(
726        &self,
727        epoch: u64,
728        checkpoint_id: u64,
729        phase: laminar_core::cluster::control::Phase,
730        min_watermark_ms: Option<i64>,
731    ) {
732        let Some(cc) = self.cluster_controller.as_ref() else {
733            return;
734        };
735        if !cc.is_leader() {
736            return;
737        }
738        let ann = laminar_core::cluster::control::BarrierAnnouncement {
739            epoch,
740            checkpoint_id,
741            phase,
742            flags: 0,
743            min_watermark_ms,
744        };
745        if let Err(e) = cc.announce_barrier(&ann).await {
746            warn!(
747                epoch,
748                checkpoint_id,
749                ?phase,
750                error = %e,
751                "[LDB-6031] barrier announcement failed",
752            );
753        }
754    }
755
/// Announces PREPARE and blocks until followers ack, fail, or time out.
///
/// Returns `None` when quorum was reached or when there is nothing to do
/// (no cluster controller, or this instance is not the leader). Returns
/// `Some(msg)` describing the failure on timeout or follower failure; in
/// both failure cases an `Abort` is announced before returning.
///
/// On success the cluster-wide minimum watermark (the leader's local
/// watermark folded with the minimum reported by followers) is stored in
/// `self.cluster_min_watermark` so the subsequent `Commit` announcement
/// can fan it out, and is published to the controller when present.
#[cfg(feature = "cluster-unstable")]
async fn await_prepare_quorum(&mut self, epoch: u64, checkpoint_id: u64) -> Option<String> {
    use laminar_core::cluster::control::{Phase, QuorumOutcome};

    let cc = self.cluster_controller.as_ref()?;
    if !cc.is_leader() {
        return None;
    }
    self.announce_if_leader(epoch, checkpoint_id, Phase::Prepare, None)
        .await;

    // Everyone alive except ourselves must ack.
    let own_id = cc.instance_id();
    let mut followers = cc.live_instances();
    followers.retain(|id| *id != own_id);

    if followers.is_empty() {
        // Leader-only cluster — the cluster-wide min is just the leader's
        // local watermark (if any).
        let local = self.local_watermark_ms;
        self.cluster_min_watermark = local;
        if let Some(wm) = local {
            cc.publish_cluster_min_watermark(wm);
        }
        return None;
    }

    let outcome = cc
        .wait_for_quorum(epoch, &followers, Duration::from_secs(30))
        .await;
    let failure = match outcome {
        QuorumOutcome::Reached {
            min_follower_watermark_ms,
            ..
        } => {
            // Fold the follower min with the leader's own watermark. A
            // side that is `None` (unreported) is non-blocking: it is
            // simply left out of the min.
            let merged = match (self.local_watermark_ms, min_follower_watermark_ms) {
                (Some(local), Some(remote)) => Some(local.min(remote)),
                (local, remote) => local.or(remote),
            };
            self.cluster_min_watermark = merged;
            // Leader-side mirror: publish to the controller so this
            // instance's operators see the same value the followers pick
            // up via `observe_barrier(Commit)`. Without this, the leader's
            // event-time decisions lag a full checkpoint behind its own
            // announcements.
            if let Some(wm) = merged {
                cc.publish_cluster_min_watermark(wm);
            }
            return None;
        }
        QuorumOutcome::TimedOut { missing, .. } => format!(
            "quorum timeout: {} follower(s) did not ack",
            missing.len()
        ),
        QuorumOutcome::Failed { failures } => {
            let first = failures.first().map_or("unknown", |(_, msg)| msg.as_str());
            format!(
                "follower snapshot failed on {} peer(s): {first}",
                failures.len()
            )
        }
    };

    // Both failure outcomes abort the epoch cluster-wide.
    self.announce_if_leader(epoch, checkpoint_id, Phase::Abort, None)
        .await;
    Some(failure)
}
831
832    /// Overwrites an existing manifest with updated fields (e.g., sink commit
833    /// statuses after Step 6). Uses [`CheckpointStore::update_manifest`] which
834    /// does NOT use conditional PUT, so the overwrite always succeeds.
835    async fn update_manifest_only(&self, manifest: Arc<CheckpointManifest>) -> Result<(), DbError> {
836        let timeout_dur = self.config.persist_timeout;
837        let fut = self.store.update_manifest(&manifest);
838        match tokio::time::timeout(timeout_dur, fut).await {
839            Ok(Ok(())) => Ok(()),
840            Ok(Err(e)) => Err(DbError::from(e)),
841            Err(_elapsed) => Err(DbError::Checkpoint(format!(
842                "manifest update timed out after {}s",
843                timeout_dur.as_secs()
844            ))),
845        }
846    }
847
848    /// Returns initial sink commit statuses (all `Pending`) for the manifest.
849    fn initial_sink_commit_statuses(&self) -> HashMap<String, SinkCommitStatus> {
850        self.sinks
851            .iter()
852            .filter(|s| s.exactly_once)
853            .map(|s| (s.name.clone(), SinkCommitStatus::Pending))
854            .collect()
855    }
856
857    /// Packs operator states into a manifest with optional sidecar chunks.
858    ///
859    /// States larger than `threshold` are stored in a sidecar blob rather
860    /// than base64-inlined in the JSON manifest. The returned `Vec<Bytes>`
861    /// is handed to `save_with_state` as a chain — the object-store path
862    /// builds a multi-chunk `PutPayload` (no contiguous buffer), the FS
863    /// path writes chunks sequentially. Either way no full-state copy
864    /// happens here.
865    fn pack_operator_states(
866        manifest: &mut CheckpointManifest,
867        operator_states: &HashMap<String, bytes::Bytes>,
868        threshold: usize,
869    ) -> Option<Vec<bytes::Bytes>> {
870        let mut sidecar_chunks: Vec<bytes::Bytes> = Vec::new();
871        let mut offset: u64 = 0;
872        for (name, data) in operator_states {
873            let (op_ckpt, maybe_blob) =
874                laminar_storage::checkpoint_manifest::OperatorCheckpoint::from_bytes_shared(
875                    data.clone(),
876                    threshold,
877                    offset,
878                );
879            if let Some(blob) = maybe_blob {
880                offset += blob.len() as u64;
881                sidecar_chunks.push(blob);
882            }
883            manifest.operator_states.insert(name.clone(), op_ckpt);
884        }
885
886        if sidecar_chunks.is_empty() {
887            None
888        } else {
889            Some(sidecar_chunks)
890        }
891    }
892
893    /// Rolls back all exactly-once sinks in parallel, bounded by
894    /// [`CheckpointConfig::rollback_timeout`].
895    async fn rollback_sinks(&self, epoch: u64) -> Result<(), DbError> {
896        let timeout_dur = self.config.rollback_timeout;
897        match tokio::time::timeout(timeout_dur, self.rollback_sinks_inner(epoch)).await {
898            Ok(result) => result,
899            Err(_elapsed) => {
900                error!(
901                    epoch,
902                    timeout_secs = timeout_dur.as_secs(),
903                    "[LDB-6016] sink rollback timed out"
904                );
905                Err(DbError::Checkpoint(format!(
906                    "rollback timed out after {}s",
907                    timeout_dur.as_secs()
908                )))
909            }
910        }
911    }
912
913    async fn rollback_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
914        let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
915            let handle = sink.handle.clone();
916            let name = sink.name.clone();
917            async move {
918                let result = handle.rollback_epoch(epoch).await;
919                (name, result)
920            }
921        });
922        let results = futures::future::join_all(futures).await;
923
924        let mut errors = Vec::new();
925        for (name, result) in results {
926            if let Err(e) = result {
927                error!(sink = %name, epoch, error = %e, "[LDB-6016] sink rollback failed");
928                errors.push(format!("sink '{name}': {e}"));
929            }
930        }
931        if errors.is_empty() {
932            Ok(())
933        } else {
934            Err(DbError::Checkpoint(format!(
935                "rollback failed: {}",
936                errors.join("; ")
937            )))
938        }
939    }
940
941    /// Collects the last committed epoch from each sink.
942    fn collect_sink_epochs(&self) -> HashMap<String, u64> {
943        let mut epochs = HashMap::with_capacity(self.sinks.len());
944        for sink in &self.sinks {
945            // The epoch being committed is the current one
946            if sink.exactly_once {
947                epochs.insert(sink.name.clone(), self.epoch);
948            }
949        }
950        epochs
951    }
952
953    /// Returns sorted sink names for topology tracking in the manifest.
954    ///
955    /// Computed once per topology change (via `register_sink`) and
956    /// cached; subsequent checkpoints clone the cached Vec rather than
957    /// re-sorting the sinks list.
958    fn sorted_sink_names(&mut self) -> Vec<String> {
959        if self.cached_sorted_sink_names.is_none() {
960            let mut names: Vec<String> = self.sinks.iter().map(|s| s.name.clone()).collect();
961            names.sort();
962            self.cached_sorted_sink_names = Some(names);
963        }
964        // Invariant: set above.
965        self.cached_sorted_sink_names.as_ref().unwrap().clone()
966    }
967
968    /// Returns the current phase.
969    #[must_use]
970    pub fn phase(&self) -> CheckpointPhase {
971        self.phase
972    }
973
974    /// Returns the current epoch.
975    #[must_use]
976    pub fn epoch(&self) -> u64 {
977        self.epoch
978    }
979
980    /// Returns the next checkpoint ID.
981    #[must_use]
982    pub fn next_checkpoint_id(&self) -> u64 {
983        self.next_checkpoint_id
984    }
985
986    /// Returns the checkpoint config.
987    #[must_use]
988    pub fn config(&self) -> &CheckpointConfig {
989        &self.config
990    }
991
992    /// Returns checkpoint statistics.
993    #[must_use]
994    pub fn stats(&self) -> CheckpointStats {
995        let (p50, p95, p99) = self.duration_histogram.percentiles();
996        // Histogram stores microseconds; stats fields are milliseconds.
997        CheckpointStats {
998            completed: self.checkpoints_completed,
999            failed: self.checkpoints_failed,
1000            last_duration: self.last_checkpoint_duration,
1001            duration_p50_ms: p50 / 1_000,
1002            duration_p95_ms: p95 / 1_000,
1003            duration_p99_ms: p99 / 1_000,
1004            total_bytes_written: self.total_bytes_written,
1005            current_phase: self.phase,
1006            current_epoch: self.epoch,
1007        }
1008    }
1009
1010    /// Returns a reference to the underlying store.
1011    #[must_use]
1012    pub fn store(&self) -> &dyn CheckpointStore {
1013        &*self.store
1014    }
1015
1016    /// Performs a full checkpoint with pre-captured source offsets.
1017    ///
1018    /// When [`CheckpointRequest::source_offset_overrides`] is non-empty,
1019    /// those sources skip the live `snapshot_sources()` call and use the
1020    /// provided offsets instead. This is essential for barrier-aligned
1021    /// checkpoints where source positions must match the operator state
1022    /// at the barrier point.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns `DbError::Checkpoint` if any phase fails.
1027    pub async fn checkpoint_with_offsets(
1028        &mut self,
1029        request: CheckpointRequest,
1030    ) -> Result<CheckpointResult, DbError> {
1031        self.checkpoint_inner(request).await
1032    }
1033
/// Follower half of a cluster checkpoint: pre-commit + save + markers +
/// ack, then wait for the leader's commit/abort decision.
///
/// Steps:
/// 1. Align `self.epoch` / `self.next_checkpoint_id` with the leader's
///    announcement so a later leader-mode call resumes correctly.
/// 2. Run the local prepare and ack the result to the leader (best
///    effort — the leader's quorum wait tolerates missed acks).
/// 3. Poll for the leader's decision for `epoch` until `decision_timeout`
///    elapses. On timeout the decision store (if configured) is consulted:
///    a recorded commit marker drives the commit, anything else (including
///    a read failure) is treated as Abort.
///
/// Returns `Ok(true)` when the epoch was committed and `Ok(false)` when it
/// was aborted or timed out. Every local failure path — prepare error,
/// Abort, decision timeout — rolls back the sinks, counts the checkpoint
/// as failed, and returns the coordinator to `Idle`.
///
/// # Errors
/// Returns `DbError::Checkpoint` (`[LDB-6033]`) when no cluster controller
/// is attached, and propagates sink pre-commit, manifest save, or
/// marker-write failures from the prepare step.
#[cfg(feature = "cluster-unstable")]
pub async fn follower_checkpoint(
    &mut self,
    request: CheckpointRequest,
    ann: laminar_core::cluster::control::BarrierAnnouncement,
    decision_timeout: Duration,
) -> Result<bool, DbError> {
    use laminar_core::cluster::control::{BarrierAck, Phase};

    // Pause between two looks at the leader's latest announcement.
    const DECISION_POLL_INTERVAL: Duration = Duration::from_millis(50);

    let Some(cc) = self.cluster_controller.clone() else {
        return Err(DbError::Checkpoint(
            "[LDB-6033] follower_checkpoint called without cluster controller".into(),
        ));
    };

    let epoch = ann.epoch;
    let checkpoint_id = ann.checkpoint_id;
    // Align with the leader so a later leader-mode call resumes correctly.
    self.epoch = epoch;
    self.next_checkpoint_id = checkpoint_id.saturating_add(1);

    // Phase 1: local prepare.
    let prepare_result = self.follower_prepare(request, epoch, checkpoint_id).await;
    let prepare_err = prepare_result.as_ref().err().map(ToString::to_string);
    cc.ack_barrier(&BarrierAck {
        epoch,
        ok: prepare_err.is_none(),
        error: prepare_err,
        // Leader folds this into the cluster-wide min before
        // announcing Commit. `None` is non-blocking.
        local_watermark_ms: self.local_watermark_ms,
    })
    .await
    .ok(); // best effort; leader's quorum wait tolerates missed acks
    if let Err(e) = prepare_result {
        // A failed prepare is a failed checkpoint: account for it the same
        // way as an Abort or a decision timeout.
        self.abort_follower_epoch(epoch).await;
        return Err(e);
    }

    // Phase 2: wait for the leader's decision.
    let deadline = Instant::now() + decision_timeout;
    loop {
        // Observation errors and announcements for other epochs are
        // ignored; the loop simply polls again.
        match cc.observe_barrier().await.ok().flatten() {
            Some(a) if a.epoch == epoch && a.phase == Phase::Commit => {
                return Ok(self.drive_follower_commit(epoch, checkpoint_id).await);
            }
            Some(a) if a.epoch == epoch && a.phase == Phase::Abort => {
                self.abort_follower_epoch(epoch).await;
                return Ok(false);
            }
            _ => {}
        }
        if Instant::now() >= deadline {
            // No announcement arrived in time — fall back to the durable
            // decision marker, if a decision store is configured.
            let committed = match self.decision_store.as_ref() {
                Some(ds) => ds.is_committed(epoch).await.unwrap_or_else(|e| {
                    warn!(
                        epoch, checkpoint_id, error = %e,
                        "[LDB-6045] decision store read failed — defaulting to Abort",
                    );
                    false
                }),
                None => false,
            };
            if committed {
                warn!(
                    epoch,
                    checkpoint_id,
                    "[LDB-6046] follower timeout but marker present — driving commit",
                );
                return Ok(self.drive_follower_commit(epoch, checkpoint_id).await);
            }
            warn!(
                epoch,
                checkpoint_id, "[LDB-6034] follower decision timeout; rolling back",
            );
            self.abort_follower_epoch(epoch).await;
            return Ok(false);
        }
        tokio::time::sleep(DECISION_POLL_INTERVAL).await;
    }
}

/// Gives up on `epoch` locally: rolls back the sinks (best effort, the
/// rollback error is discarded), counts the checkpoint as failed, and
/// returns the coordinator to `Idle`.
#[cfg(feature = "cluster-unstable")]
async fn abort_follower_epoch(&mut self, epoch: u64) {
    self.rollback_sinks(epoch).await.ok();
    self.checkpoints_failed += 1;
    self.phase = CheckpointPhase::Idle;
}
1126
/// Commits this follower's sinks for `epoch` and, on success, replaces
/// the `Pending` sink entries in its manifest with the resulting
/// statuses.
///
/// Returns `true` on a clean commit. If any sink reports `Failed`, the
/// sinks are rolled back (best effort), the checkpoint is counted as
/// failed, and `false` is returned. A failure to rewrite the manifest
/// after a clean commit is only logged; the checkpoint still counts as
/// completed and the epoch advances. The phase is `Idle` on return in
/// both cases.
#[cfg(feature = "cluster-unstable")]
async fn drive_follower_commit(&mut self, epoch: u64, checkpoint_id: u64) -> bool {
    let statuses = self.commit_sinks_tracked(epoch).await;
    let all_committed = statuses
        .values()
        .all(|status| !matches!(status, SinkCommitStatus::Failed(_)));

    if all_committed {
        // Overwrite the follower's own manifest so the Pending sink
        // entries stamped during prepare are replaced with the statuses
        // we just produced.
        let persisted = self
            .persist_recovered_statuses(checkpoint_id, statuses)
            .await;
        if let Err(e) = persisted {
            warn!(
                checkpoint_id,
                epoch,
                error = %e,
                "follower post-commit manifest update failed",
            );
        }
        self.checkpoints_completed += 1;
        self.epoch = epoch.saturating_add(1);
    } else {
        error!(
            epoch,
            checkpoint_id, "follower sink commit partially failed — rolling back",
        );
        self.rollback_sinks(epoch).await.ok();
        self.checkpoints_failed += 1;
    }

    self.phase = CheckpointPhase::Idle;
    all_committed
}
1164
/// Follower-side prepare: pre-commit sinks, build and save the manifest,
/// then write the vnode markers.
///
/// The manifest is assembled from the caller's `request`: source offsets
/// come from `source_offset_overrides`, table offsets from
/// `extra_table_offsets`, and all exactly-once sinks start as `Pending`.
///
/// The sidecar size is checked against `config.max_checkpoint_bytes`
/// before anything is persisted, mirroring the leader path, so an
/// oversized follower checkpoint is rejected with `[LDB-6014]` instead of
/// being written.
///
/// The phase is left at whatever step was reached; the caller is
/// responsible for rolling back sinks and resetting the phase on error.
///
/// # Errors
/// Propagates sink pre-commit, manifest save, and marker-write failures,
/// and returns `DbError::Checkpoint` when the size cap is exceeded.
#[cfg(feature = "cluster-unstable")]
async fn follower_prepare(
    &mut self,
    request: CheckpointRequest,
    epoch: u64,
    checkpoint_id: u64,
) -> Result<(), DbError> {
    let CheckpointRequest {
        operator_states,
        watermark,
        table_store_checkpoint_path,
        extra_table_offsets,
        source_watermarks,
        pipeline_hash,
        source_offset_overrides,
    } = request;

    self.phase = CheckpointPhase::PreCommitting;
    self.pre_commit_sinks(epoch).await?;

    let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
    manifest.source_offsets = source_offset_overrides;
    manifest.table_offsets = extra_table_offsets;
    manifest.sink_epochs = self.collect_sink_epochs();
    manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
    manifest.watermark = watermark;
    manifest.source_watermarks = source_watermarks;
    manifest.table_store_checkpoint_path = table_store_checkpoint_path;
    manifest.source_names = {
        let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
        names.sort();
        names
    };
    manifest.sink_names = self.sorted_sink_names();
    manifest.pipeline_hash = pipeline_hash;
    let state_data = Self::pack_operator_states(
        &mut manifest,
        &operator_states,
        self.config.state_inline_threshold,
    );

    // Apply the same size cap as the leader path. Returning `Err` makes
    // the caller ack the failure and roll back the pre-committed sinks.
    if let Some(cap) = self.config.max_checkpoint_bytes {
        let sidecar_bytes: usize = state_data
            .as_ref()
            .map_or(0, |chunks| chunks.iter().map(bytes::Bytes::len).sum());
        if sidecar_bytes > cap {
            let msg = format!(
                "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
                 cap {cap} bytes — checkpoint rejected"
            );
            error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
            return Err(DbError::Checkpoint(msg));
        }
    }

    self.phase = CheckpointPhase::Persisting;
    self.save_manifest(Arc::new(manifest), state_data).await?;
    self.write_vnode_markers(epoch, checkpoint_id).await?;
    Ok(())
}
1212
1213    /// Shared checkpoint implementation for all checkpoint entry points.
1214    ///
1215    /// When [`CheckpointRequest::source_offset_overrides`] is non-empty,
1216    /// those sources use the provided offsets instead of calling
1217    /// `snapshot_sources()`. This ensures barrier-aligned and pre-captured
1218    /// offsets are used atomically.
1219    #[allow(clippy::too_many_lines)]
1220    async fn checkpoint_inner(
1221        &mut self,
1222        request: CheckpointRequest,
1223    ) -> Result<CheckpointResult, DbError> {
1224        let CheckpointRequest {
1225            operator_states,
1226            watermark,
1227            table_store_checkpoint_path,
1228            extra_table_offsets,
1229            source_watermarks,
1230            pipeline_hash,
1231            source_offset_overrides,
1232        } = request;
1233        let start = Instant::now();
1234        let checkpoint_id = self.next_checkpoint_id;
1235        let epoch = self.epoch;
1236
1237        info!(checkpoint_id, epoch, "starting checkpoint");
1238
1239        // Source offsets are provided by the caller (pre-captured at barrier
1240        // alignment or pre-spawn). Table offsets come from extra_table_offsets.
1241        self.phase = CheckpointPhase::Snapshotting;
1242        let source_offsets = source_offset_overrides;
1243        let table_offsets = extra_table_offsets;
1244
1245        self.phase = CheckpointPhase::PreCommitting;
1246        if let Err(e) = self.pre_commit_sinks(epoch).await {
1247            self.phase = CheckpointPhase::Idle;
1248            self.checkpoints_failed += 1;
1249            // Roll back unconditionally — `rollback_epoch` is idempotent.
1250            // Without this, a poisoned epoch leaves Kafka transactions
1251            // open until the broker-side transaction.timeout.ms fires.
1252            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
1253                error!(
1254                    checkpoint_id,
1255                    epoch,
1256                    error = %rollback_err,
1257                    "[LDB-6004] sink rollback failed after pre-commit failure"
1258                );
1259            }
1260            let duration = start.elapsed();
1261            self.emit_checkpoint_metrics(false, epoch, duration);
1262            error!(checkpoint_id, epoch, error = %e, "pre-commit failed");
1263            return Ok(CheckpointResult {
1264                success: false,
1265                checkpoint_id,
1266                epoch,
1267                duration,
1268                error: Some(format!("pre-commit failed: {e}")),
1269            });
1270        }
1271
1272        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
1273        manifest.source_offsets = source_offsets;
1274        manifest.table_offsets = table_offsets;
1275        manifest.sink_epochs = self.collect_sink_epochs();
1276        // Mark all exactly-once sinks as Pending before commit phase.
1277        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
1278        manifest.watermark = watermark;
1279        // Use caller-provided per-source watermarks if available. When empty,
1280        // leave source_watermarks empty — recovery falls back to the global
1281        // manifest.watermark. Do NOT fabricate per-source values from the
1282        // global watermark, as that loses granularity on recovery.
1283        manifest.source_watermarks = source_watermarks;
1284        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
1285        manifest.source_names = {
1286            let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
1287            names.sort();
1288            names
1289        };
1290        manifest.sink_names = self.sorted_sink_names();
1291        manifest.pipeline_hash = pipeline_hash;
1292
1293        let state_data = Self::pack_operator_states(
1294            &mut manifest,
1295            &operator_states,
1296            self.config.state_inline_threshold,
1297        );
1298        let sidecar_bytes = state_data
1299            .as_ref()
1300            .map_or(0, |chunks| chunks.iter().map(bytes::Bytes::len).sum());
1301        if sidecar_bytes > 0 {
1302            debug!(
1303                checkpoint_id,
1304                sidecar_bytes, "writing operator state sidecar"
1305            );
1306        }
1307
1308        if let Some(cap) = self.config.max_checkpoint_bytes {
1309            if sidecar_bytes > cap {
1310                self.phase = CheckpointPhase::Idle;
1311                self.checkpoints_failed += 1;
1312                let duration = start.elapsed();
1313                self.emit_checkpoint_metrics(false, epoch, duration);
1314                let msg = format!(
1315                    "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
1316                     cap {cap} bytes — checkpoint rejected"
1317                );
1318                error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
1319                return Ok(CheckpointResult {
1320                    success: false,
1321                    checkpoint_id,
1322                    epoch,
1323                    duration,
1324                    error: Some(msg),
1325                });
1326            }
1327            let warn_threshold = cap * 4 / 5; // 80%
1328            if sidecar_bytes > warn_threshold {
1329                warn!(
1330                    checkpoint_id,
1331                    epoch, sidecar_bytes, cap, "checkpoint size approaching cap (>80%)"
1332                );
1333            }
1334        }
1335        let checkpoint_bytes = sidecar_bytes as u64;
1336
1337        self.phase = CheckpointPhase::Persisting;
1338        // Arc-wrap so the save task gets a cheap refcount bump instead of
1339        // a deep clone. After `save_manifest.await` the task drops its
1340        // Arc and we're the sole owner; `Arc::make_mut` below gets us a
1341        // free mutable reference for the post-commit sink-status update.
1342        let mut manifest = Arc::new(manifest);
1343        if let Err(e) = self.save_manifest(Arc::clone(&manifest), state_data).await {
1344            self.phase = CheckpointPhase::Idle;
1345            self.checkpoints_failed += 1;
1346            let duration = start.elapsed();
1347            self.emit_checkpoint_metrics(false, epoch, duration);
1348            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
1349                error!(
1350                    checkpoint_id,
1351                    epoch,
1352                    error = %rollback_err,
1353                    "[LDB-6004] sink rollback failed after manifest persist failure — \
1354                     sinks may be in an inconsistent state"
1355                );
1356            }
1357            error!(checkpoint_id, epoch, error = %e, "[LDB-6008] manifest persist failed");
1358            return Ok(CheckpointResult {
1359                success: false,
1360                checkpoint_id,
1361                epoch,
1362                duration,
1363                error: Some(format!("manifest persist failed: {e}")),
1364            });
1365        }
1366
1367        // Bridge: publish per-vnode durability markers so the gate below
1368        // has something to check. Replace with real per-operator partial
1369        // writes when operators adopt the state backend directly.
1370        if let Err(e) = self.write_vnode_markers(epoch, checkpoint_id).await {
1371            self.phase = CheckpointPhase::Idle;
1372            self.checkpoints_failed += 1;
1373            let duration = start.elapsed();
1374            self.emit_checkpoint_metrics(false, epoch, duration);
1375            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
1376                error!(
1377                    checkpoint_id,
1378                    epoch,
1379                    error = %rollback_err,
1380                    "[LDB-6025] sink rollback failed after marker write failure",
1381                );
1382            }
1383            error!(checkpoint_id, epoch, error = %e, "vnode marker write failed");
1384            return Ok(CheckpointResult {
1385                success: false,
1386                checkpoint_id,
1387                epoch,
1388                duration,
1389                error: Some(format!("vnode marker write failed: {e}")),
1390            });
1391        }
1392
1393        // Cluster 2PC phase 1: announce PREPARE and wait for followers
1394        // to snapshot + ack.
1395        #[cfg(feature = "cluster-unstable")]
1396        {
1397            if let Some(quorum_failure) = self.await_prepare_quorum(epoch, checkpoint_id).await {
1398                self.phase = CheckpointPhase::Idle;
1399                self.checkpoints_failed += 1;
1400                let duration = start.elapsed();
1401                self.emit_checkpoint_metrics(false, epoch, duration);
1402                if let Err(rollback_err) = self.rollback_sinks(epoch).await {
1403                    error!(
1404                        checkpoint_id,
1405                        epoch,
1406                        error = %rollback_err,
1407                        "[LDB-6032] sink rollback failed after quorum miss",
1408                    );
1409                }
1410                return Ok(CheckpointResult {
1411                    success: false,
1412                    checkpoint_id,
1413                    epoch,
1414                    duration,
1415                    error: Some(quorum_failure),
1416                });
1417            }
1418        }
1419
1420        // Durability gate: confirm every participating vnode has its
1421        // partial persisted before sinks commit. `gate_vnode_set` is the
1422        // FULL registry in cluster mode — the leader verifies markers
1423        // from every follower's shared-storage writes, not just its own.
1424        // Single-instance defaults to `vnode_set` (the two match).
1425        if let Some(ref backend) = self.state_backend {
1426            if !self.gate_vnode_set.is_empty() {
1427                match backend.epoch_complete(epoch, &self.gate_vnode_set).await {
1428                    Ok(true) => {}
1429                    Ok(false) => {
1430                        warn!(
1431                            checkpoint_id,
1432                            epoch,
1433                            vnodes = self.gate_vnode_set.len(),
1434                            "[LDB-6020] state durability gate returned false — \
1435                             rolling back sinks",
1436                        );
1437                        #[cfg(feature = "cluster-unstable")]
1438                        self.announce_if_leader(
1439                            epoch,
1440                            checkpoint_id,
1441                            laminar_core::cluster::control::Phase::Abort,
1442                            None,
1443                        )
1444                        .await;
1445                        self.checkpoints_failed += 1;
1446                        self.phase = CheckpointPhase::Idle;
1447                        let duration = start.elapsed();
1448                        self.emit_checkpoint_metrics(false, epoch, duration);
1449                        if let Err(rollback_err) = self.rollback_sinks(epoch).await {
1450                            error!(
1451                                checkpoint_id,
1452                                epoch,
1453                                error = %rollback_err,
1454                                "[LDB-6021] sink rollback failed after durability gate miss",
1455                            );
1456                        }
1457                        return Ok(CheckpointResult {
1458                            success: false,
1459                            checkpoint_id,
1460                            epoch,
1461                            duration,
1462                            error: Some("state durability gate: not all vnodes persisted".into()),
1463                        });
1464                    }
1465                    Err(e) => {
1466                        warn!(
1467                            checkpoint_id,
1468                            epoch,
1469                            error = %e,
1470                            "[LDB-6022] state backend error during durability gate — \
1471                             treating as gate miss, rolling back sinks",
1472                        );
1473                        #[cfg(feature = "cluster-unstable")]
1474                        self.announce_if_leader(
1475                            epoch,
1476                            checkpoint_id,
1477                            laminar_core::cluster::control::Phase::Abort,
1478                            None,
1479                        )
1480                        .await;
1481                        self.checkpoints_failed += 1;
1482                        self.phase = CheckpointPhase::Idle;
1483                        let duration = start.elapsed();
1484                        self.emit_checkpoint_metrics(false, epoch, duration);
1485                        if let Err(rollback_err) = self.rollback_sinks(epoch).await {
1486                            error!(
1487                                checkpoint_id,
1488                                epoch,
1489                                error = %rollback_err,
1490                                "[LDB-6023] sink rollback failed after durability gate error",
1491                            );
1492                        }
1493                        return Ok(CheckpointResult {
1494                            success: false,
1495                            checkpoint_id,
1496                            epoch,
1497                            duration,
1498                            error: Some(format!("state durability gate: {e}")),
1499                        });
1500                    }
1501                }
1502            }
1503        }
1504
1505        // Record the commit marker before announcing Commit so a new
1506        // leader elected mid-2PC can recover the decision. A
1507        // cluster leader without a decision store is a misconfig.
1508        #[cfg(feature = "cluster-unstable")]
1509        if self
1510            .cluster_controller
1511            .as_ref()
1512            .is_some_and(|cc| cc.is_leader())
1513        {
1514            let marker_result = match self.decision_store.as_ref() {
1515                Some(ds) => ds.record_committed(epoch).await.map_err(|e| e.to_string()),
1516                None => Err("no decision store configured for cluster leader".to_string()),
1517            };
1518            if let Err(reason) = marker_result {
1519                error!(
1520                    checkpoint_id, epoch, error = %reason,
1521                    "[LDB-6038] cannot record commit marker — aborting epoch",
1522                );
1523                self.announce_if_leader(
1524                    epoch,
1525                    checkpoint_id,
1526                    laminar_core::cluster::control::Phase::Abort,
1527                    None,
1528                )
1529                .await;
1530                self.checkpoints_failed += 1;
1531                self.phase = CheckpointPhase::Idle;
1532                let duration = start.elapsed();
1533                self.emit_checkpoint_metrics(false, epoch, duration);
1534                if let Err(rollback_err) = self.rollback_sinks(epoch).await {
1535                    error!(
1536                        checkpoint_id, epoch, error = %rollback_err,
1537                        "[LDB-6039] sink rollback failed after commit marker failure",
1538                    );
1539                }
1540                return Ok(CheckpointResult {
1541                    success: false,
1542                    checkpoint_id,
1543                    epoch,
1544                    duration,
1545                    error: Some(format!("commit marker: {reason}")),
1546                });
1547            }
1548        }
1549
1550        #[cfg(feature = "cluster-unstable")]
1551        // Fan out the cluster-wide min watermark computed during
1552        // `await_prepare_quorum`. Followers consume this from
1553        // `observe_barrier` and update their consumer-side view.
1554        self.announce_if_leader(
1555            epoch,
1556            checkpoint_id,
1557            laminar_core::cluster::control::Phase::Commit,
1558            self.cluster_min_watermark,
1559        )
1560        .await;
1561
1562        self.phase = CheckpointPhase::Committing;
1563        let sink_statuses = self.commit_sinks_tracked(epoch).await;
1564        let has_failures = sink_statuses
1565            .values()
1566            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));
1567
1568        if !sink_statuses.is_empty() {
1569            // `Arc::make_mut`: COW. Refcount is 1 here (spawn_blocking
1570            // task in save_manifest has already returned and dropped its
1571            // clone), so this is a zero-copy mutable borrow.
1572            Arc::make_mut(&mut manifest).sink_commit_statuses = sink_statuses;
1573            if let Err(e) = self.update_manifest_only(Arc::clone(&manifest)).await {
1574                warn!(
1575                    checkpoint_id,
1576                    epoch,
1577                    error = %e,
1578                    "post-commit manifest update failed"
1579                );
1580            }
1581        }
1582
1583        if has_failures {
1584            self.checkpoints_failed += 1;
1585            error!(
1586                checkpoint_id,
1587                epoch, "sink commit partially failed — epoch NOT advanced, will retry"
1588            );
1589            self.phase = CheckpointPhase::Idle;
1590            let duration = start.elapsed();
1591            self.emit_checkpoint_metrics(false, epoch, duration);
1592            return Ok(CheckpointResult {
1593                success: false,
1594                checkpoint_id,
1595                epoch,
1596                duration,
1597                error: Some("partial sink commit failure".into()),
1598            });
1599        }
1600
1601        self.phase = CheckpointPhase::Idle;
1602        self.next_checkpoint_id += 1;
1603        self.epoch += 1;
1604        self.checkpoints_completed += 1;
1605        self.total_bytes_written += checkpoint_bytes;
1606        let duration = start.elapsed();
1607        self.last_checkpoint_duration = Some(duration);
1608        self.duration_histogram.record(duration);
1609        self.emit_checkpoint_metrics(true, epoch, duration);
1610
1611        // Emit checkpoint size metrics.
1612        if let Some(ref m) = self.prom {
1613            #[allow(clippy::cast_possible_wrap)]
1614            m.checkpoint_size_bytes.set(checkpoint_bytes as i64);
1615        }
1616
1617        // Garbage-collect state-backend partials / commit markers for
1618        // epochs no longer needed for recovery. Without this the
1619        // in-process backend grows per-checkpoint forever and the
1620        // object-store backend leaks `epoch=N/…` objects indefinitely.
1621        // `max_retained` is in terms of checkpoints which map 1:1 to
1622        // epochs here, so prune everything older than
1623        // `epoch - max_retained`.
1624        if let Some(ref backend) = self.state_backend {
1625            let horizon = epoch.saturating_sub(self.config.max_retained as u64);
1626            if horizon > 0 {
1627                if let Err(e) = backend.prune_before(horizon).await {
1628                    warn!(
1629                        epoch,
1630                        horizon,
1631                        error = %e,
1632                        "[LDB-6026] state backend prune failed; old partials will linger"
1633                    );
1634                }
1635                #[cfg(feature = "cluster-unstable")]
1636                if let Some(ref ds) = self.decision_store {
1637                    if let Err(e) = ds.prune_before(horizon).await {
1638                        warn!(epoch, horizon, error = %e, "decision prune failed");
1639                    }
1640                }
1641            }
1642        }
1643
1644        let next_epoch = self.epoch;
1645        let begin_epoch_error = match self.begin_epoch_for_sinks(next_epoch).await {
1646            Ok(()) => None,
1647            Err(e) => {
1648                error!(
1649                    next_epoch,
1650                    error = %e,
1651                    "[LDB-6015] failed to begin next epoch — writes will be non-transactional"
1652                );
1653                Some(e.to_string())
1654            }
1655        };
1656
1657        info!(
1658            checkpoint_id,
1659            epoch,
1660            duration_ms = duration.as_millis(),
1661            "checkpoint completed"
1662        );
1663
1664        // The checkpoint itself succeeded (state persisted, sinks committed).
1665        // begin_epoch failure for the *next* epoch is reported as a warning
1666        // but does not retroactively fail the completed checkpoint.
1667        Ok(CheckpointResult {
1668            success: true,
1669            checkpoint_id,
1670            epoch,
1671            duration,
1672            error: begin_epoch_error,
1673        })
1674    }
1675
1676    /// Attempts recovery from the latest checkpoint.
1677    ///
1678    /// Creates a [`RecoveryManager`](crate::recovery_manager::RecoveryManager)
1679    /// using the coordinator's store and delegates recovery to it.
1680    /// On success, advances `self.epoch` past the recovered epoch so the
1681    /// next checkpoint gets a fresh epoch number.
1682    ///
1683    /// Returns `Ok(None)` for a fresh start (no checkpoint found).
1684    ///
1685    /// # Errors
1686    ///
1687    /// Returns `DbError::Checkpoint` if the store itself fails.
1688    pub async fn recover(
1689        &mut self,
1690    ) -> Result<Option<crate::recovery_manager::RecoveredState>, DbError> {
1691        use crate::recovery_manager::RecoveryManager;
1692
1693        let mgr = RecoveryManager::new(&*self.store);
1694        // Sources and table sources are restored by the pipeline lifecycle
1695        // (via SourceRegistration.restore_checkpoint), not by the coordinator.
1696        // Pass empty slices — the coordinator only manages sink recovery here.
1697        let result = mgr.recover(&[], &self.sinks, &[]).await?;
1698
1699        if let Some(ref recovered) = result {
1700            // Advance epoch past the recovered one
1701            self.epoch = recovered.epoch() + 1;
1702            self.next_checkpoint_id = recovered.manifest.checkpoint_id + 1;
1703            info!(
1704                epoch = self.epoch,
1705                checkpoint_id = self.next_checkpoint_id,
1706                "coordinator epoch set after recovery"
1707            );
1708        }
1709
1710        Ok(result)
1711    }
1712
1713    /// Loads the latest manifest from the store.
1714    ///
1715    /// # Errors
1716    ///
1717    /// Returns `DbError::Checkpoint` on store errors.
1718    pub async fn load_latest_manifest(&self) -> Result<Option<CheckpointManifest>, DbError> {
1719        self.store.load_latest().await.map_err(DbError::from)
1720    }
1721}
1722
1723impl std::fmt::Debug for CheckpointCoordinator {
1724    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1725        f.debug_struct("CheckpointCoordinator")
1726            .field("epoch", &self.epoch)
1727            .field("next_checkpoint_id", &self.next_checkpoint_id)
1728            .field("phase", &self.phase)
1729            .field("sinks", &self.sinks.len())
1730            .field("completed", &self.checkpoints_completed)
1731            .field("failed", &self.checkpoints_failed)
1732            .finish_non_exhaustive()
1733    }
1734}
1735
/// Fixed-size ring buffer for duration percentile tracking.
///
/// Stores the last `CAPACITY` durations in **microseconds** and computes
/// p50/p95/p99 by sorting a copy of the recorded samples. The buffer is
/// allocated once at construction; recording and percentile queries perform
/// no further heap allocation.
#[derive(Clone)]
pub struct DurationHistogram {
    /// Ring buffer of durations in microseconds.
    samples: Box<[u64; Self::CAPACITY]>,
    /// Write cursor (wraps at `CAPACITY`).
    cursor: usize,
    /// Total samples written (may exceed `CAPACITY`).
    count: u64,
}

impl Default for DurationHistogram {
    fn default() -> Self {
        Self::new()
    }
}

impl DurationHistogram {
    /// Number of most-recent samples retained.
    const CAPACITY: usize = 100;

    /// Creates an empty histogram.
    #[must_use]
    pub fn new() -> Self {
        Self {
            samples: Box::new([0; Self::CAPACITY]),
            cursor: 0,
            count: 0,
        }
    }

    /// Records a duration sample (stored in microseconds).
    ///
    /// Once `CAPACITY` samples have been written, each new sample overwrites
    /// the oldest one. Durations whose microsecond count does not fit in a
    /// `u64` are stored as `u64::MAX` instead of wrapping around.
    pub fn record(&mut self, duration: Duration) {
        // `as_micros` yields a `u128`; saturate so an oversized duration shows
        // up as "huge" rather than as an arbitrary wrapped value.
        let us = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);
        self.samples[self.cursor] = us;
        self.cursor = (self.cursor + 1) % Self::CAPACITY;
        self.count += 1;
    }

    /// Returns `true` if no samples have been recorded.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Returns the number of recorded samples (up to `CAPACITY`).
    #[must_use]
    pub fn len(&self) -> usize {
        // A count that does not fit in `usize` is necessarily above `CAPACITY`.
        usize::try_from(self.count).map_or(Self::CAPACITY, |c| c.min(Self::CAPACITY))
    }

    /// Computes a percentile (0.0–1.0) from recorded samples.
    ///
    /// Values of `p` outside that range are clamped to the smallest or
    /// largest retained sample. Returns 0 if no samples have been recorded.
    #[must_use]
    pub fn percentile(&self, p: f64) -> u64 {
        let (sorted, n) = self.sorted_samples();
        if n == 0 {
            return 0;
        }
        sorted[Self::rank(p, n)]
    }

    /// Returns (p50, p95, p99) in microseconds. Sorts once.
    ///
    /// Returns `(0, 0, 0)` if no samples have been recorded.
    #[must_use]
    pub fn percentiles(&self) -> (u64, u64, u64) {
        let (sorted, n) = self.sorted_samples();
        if n == 0 {
            return (0, 0, 0);
        }
        let at = |p: f64| sorted[Self::rank(p, n)];
        (at(0.50), at(0.95), at(0.99))
    }

    /// Copies the buffer onto the stack and sorts its live prefix.
    ///
    /// Returns the copy together with the number of live samples `n`; only
    /// `[..n]` is sorted and meaningful. Before the ring wraps, the live
    /// samples occupy exactly the first `n` slots because the cursor starts
    /// at zero; after wrapping every slot is live.
    fn sorted_samples(&self) -> ([u64; Self::CAPACITY], usize) {
        let n = self.len();
        let mut sorted = *self.samples;
        sorted[..n].sort_unstable();
        (sorted, n)
    }

    /// Maps percentile `p` to an index into `n` sorted samples (`n >= 1`).
    fn rank(p: f64, n: usize) -> usize {
        // `n` never exceeds `CAPACITY`, so the conversion to `f64` is exact.
        // The float-to-int cast saturates (negative or NaN becomes 0) and the
        // result is clamped to the last valid index.
        #[allow(
            clippy::cast_possible_truncation,
            clippy::cast_sign_loss,
            clippy::cast_precision_loss
        )]
        let raw = (p * (n as f64 - 1.0)).ceil() as usize;
        raw.min(n - 1)
    }
}
1839
1840impl std::fmt::Debug for DurationHistogram {
1841    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1842        let (p50, p95, p99) = self.percentiles();
1843        f.debug_struct("DurationHistogram")
1844            .field("samples_len", &self.samples.len())
1845            .field("cursor", &self.cursor)
1846            .field("count", &self.count)
1847            .field("p50_us", &p50)
1848            .field("p95_us", &p95)
1849            .field("p99_us", &p99)
1850            .finish()
1851    }
1852}
1853
/// Checkpoint performance statistics.
///
/// A plain, cloneable and serializable snapshot of checkpoint counters,
/// duration percentiles, and the current phase/epoch.
// NOTE(review): the producer of this struct is outside this view — confirm
// how the millisecond percentiles are derived from the recorded durations.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CheckpointStats {
    /// Total completed checkpoints.
    pub completed: u64,
    /// Total failed checkpoints.
    pub failed: u64,
    /// Duration of the last checkpoint.
    pub last_duration: Option<Duration>,
    /// p50 checkpoint duration in milliseconds.
    pub duration_p50_ms: u64,
    /// p95 checkpoint duration in milliseconds.
    pub duration_p95_ms: u64,
    /// p99 checkpoint duration in milliseconds.
    pub duration_p99_ms: u64,
    /// Total bytes written across all checkpoints.
    pub total_bytes_written: u64,
    /// Current checkpoint phase.
    pub current_phase: CheckpointPhase,
    /// Current epoch number.
    pub current_epoch: u64,
}
1876
1877/// Converts a `SourceCheckpoint` to a `ConnectorCheckpoint`.
1878#[must_use]
1879pub fn source_to_connector_checkpoint(cp: &SourceCheckpoint) -> ConnectorCheckpoint {
1880    ConnectorCheckpoint {
1881        offsets: cp.offsets().clone(),
1882        epoch: cp.epoch(),
1883        metadata: cp.metadata().clone(),
1884    }
1885}
1886
1887/// Converts a `ConnectorCheckpoint` back to a `SourceCheckpoint`.
1888#[must_use]
1889pub fn connector_to_source_checkpoint(cp: &ConnectorCheckpoint) -> SourceCheckpoint {
1890    let mut source_cp = SourceCheckpoint::with_offsets(cp.epoch, cp.offsets.clone());
1891    for (k, v) in &cp.metadata {
1892        source_cp.set_metadata(k.clone(), v.clone());
1893    }
1894    source_cp
1895}
1896
1897#[cfg(test)]
1898mod tests {
1899    use super::*;
1900    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;
1901
1902    async fn make_coordinator(dir: &std::path::Path) -> CheckpointCoordinator {
1903        let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
1904        CheckpointCoordinator::new(CheckpointConfig::default(), store)
1905            .await
1906            .unwrap()
1907    }
1908
1909    #[tokio::test]
1910    async fn test_coordinator_new() {
1911        let dir = tempfile::tempdir().unwrap();
1912        let coord = make_coordinator(dir.path()).await;
1913
1914        assert_eq!(coord.epoch(), 1);
1915        assert_eq!(coord.next_checkpoint_id(), 1);
1916        assert_eq!(coord.phase(), CheckpointPhase::Idle);
1917    }
1918
1919    #[tokio::test]
1920    async fn test_coordinator_resumes_from_stored_checkpoint() {
1921        let dir = tempfile::tempdir().unwrap();
1922
1923        // Save a checkpoint manually
1924        let store = FileSystemCheckpointStore::new(dir.path(), 3);
1925        let m = CheckpointManifest::new(5, 10);
1926        store.save(&m).await.unwrap();
1927
1928        // Coordinator should resume from epoch 11, checkpoint_id 6
1929        let coord = make_coordinator(dir.path()).await;
1930        assert_eq!(coord.epoch(), 11);
1931        assert_eq!(coord.next_checkpoint_id(), 6);
1932    }
1933
1934    #[test]
1935    fn test_checkpoint_phase_display() {
1936        assert_eq!(CheckpointPhase::Idle.to_string(), "Idle");
1937        assert_eq!(CheckpointPhase::Snapshotting.to_string(), "Snapshotting");
1938        assert_eq!(CheckpointPhase::PreCommitting.to_string(), "PreCommitting");
1939        assert_eq!(CheckpointPhase::Persisting.to_string(), "Persisting");
1940        assert_eq!(CheckpointPhase::Committing.to_string(), "Committing");
1941    }
1942
1943    #[test]
1944    fn test_source_to_connector_checkpoint() {
1945        let mut cp = SourceCheckpoint::new(5);
1946        cp.set_offset("partition-0", "1234");
1947        cp.set_metadata("topic", "events");
1948
1949        let cc = source_to_connector_checkpoint(&cp);
1950        assert_eq!(cc.epoch, 5);
1951        assert_eq!(cc.offsets.get("partition-0"), Some(&"1234".into()));
1952        assert_eq!(cc.metadata.get("topic"), Some(&"events".into()));
1953    }
1954
1955    #[test]
1956    fn test_connector_to_source_checkpoint() {
1957        let cc = ConnectorCheckpoint {
1958            offsets: HashMap::from([("lsn".into(), "0/ABCD".into())]),
1959            epoch: 3,
1960            metadata: HashMap::from([("type".into(), "postgres".into())]),
1961        };
1962
1963        let cp = connector_to_source_checkpoint(&cc);
1964        assert_eq!(cp.epoch(), 3);
1965        assert_eq!(cp.get_offset("lsn"), Some("0/ABCD"));
1966        assert_eq!(cp.get_metadata("type"), Some("postgres"));
1967    }
1968
1969    #[tokio::test]
1970    async fn test_stats_initial() {
1971        let dir = tempfile::tempdir().unwrap();
1972        let coord = make_coordinator(dir.path()).await;
1973        let stats = coord.stats();
1974
1975        assert_eq!(stats.completed, 0);
1976        assert_eq!(stats.failed, 0);
1977        assert!(stats.last_duration.is_none());
1978        assert_eq!(stats.duration_p50_ms, 0);
1979        assert_eq!(stats.duration_p95_ms, 0);
1980        assert_eq!(stats.duration_p99_ms, 0);
1981        assert_eq!(stats.current_phase, CheckpointPhase::Idle);
1982    }
1983
1984    #[tokio::test]
1985    async fn test_checkpoint_no_sources_no_sinks() {
1986        let dir = tempfile::tempdir().unwrap();
1987        let mut coord = make_coordinator(dir.path()).await;
1988
1989        let result = coord
1990            .checkpoint(CheckpointRequest {
1991                watermark: Some(1000),
1992                ..CheckpointRequest::default()
1993            })
1994            .await
1995            .unwrap();
1996
1997        assert!(result.success);
1998        assert_eq!(result.checkpoint_id, 1);
1999        assert_eq!(result.epoch, 1);
2000
2001        // Verify manifest was persisted
2002        let loaded = coord.store().load_latest().await.unwrap().unwrap();
2003        assert_eq!(loaded.checkpoint_id, 1);
2004        assert_eq!(loaded.epoch, 1);
2005        assert_eq!(loaded.watermark, Some(1000));
2006
2007        // Second checkpoint should increment
2008        let result2 = coord
2009            .checkpoint(CheckpointRequest {
2010                watermark: Some(2000),
2011                ..CheckpointRequest::default()
2012            })
2013            .await
2014            .unwrap();
2015
2016        assert!(result2.success);
2017        assert_eq!(result2.checkpoint_id, 2);
2018        assert_eq!(result2.epoch, 2);
2019
2020        let stats = coord.stats();
2021        assert_eq!(stats.completed, 2);
2022        assert_eq!(stats.failed, 0);
2023    }
2024
2025    #[tokio::test]
2026    async fn test_checkpoint_with_operator_states() {
2027        let dir = tempfile::tempdir().unwrap();
2028        let mut coord = make_coordinator(dir.path()).await;
2029
2030        let mut ops = HashMap::new();
2031        ops.insert(
2032            "window-agg".into(),
2033            bytes::Bytes::from_static(b"state-data"),
2034        );
2035        ops.insert("filter".into(), bytes::Bytes::from_static(b"filter-state"));
2036
2037        let result = coord
2038            .checkpoint(CheckpointRequest {
2039                operator_states: ops,
2040                ..CheckpointRequest::default()
2041            })
2042            .await
2043            .unwrap();
2044
2045        assert!(result.success);
2046
2047        let loaded = coord.store().load_latest().await.unwrap().unwrap();
2048        assert_eq!(loaded.operator_states.len(), 2);
2049
2050        let window_op = loaded.operator_states.get("window-agg").unwrap();
2051        assert_eq!(window_op.decode_inline().unwrap(), b"state-data");
2052    }
2053
2054    #[tokio::test]
2055    async fn test_checkpoint_with_table_store_path() {
2056        let dir = tempfile::tempdir().unwrap();
2057        let mut coord = make_coordinator(dir.path()).await;
2058
2059        let result = coord
2060            .checkpoint(CheckpointRequest {
2061                table_store_checkpoint_path: Some("/tmp/rocksdb_cp".into()),
2062                ..CheckpointRequest::default()
2063            })
2064            .await
2065            .unwrap();
2066
2067        assert!(result.success);
2068
2069        let loaded = coord.store().load_latest().await.unwrap().unwrap();
2070        assert_eq!(
2071            loaded.table_store_checkpoint_path.as_deref(),
2072            Some("/tmp/rocksdb_cp")
2073        );
2074    }
2075
2076    #[tokio::test]
2077    async fn test_load_latest_manifest_empty() {
2078        let dir = tempfile::tempdir().unwrap();
2079        let coord = make_coordinator(dir.path()).await;
2080        assert!(coord.load_latest_manifest().await.unwrap().is_none());
2081    }
2082
2083    #[tokio::test]
2084    async fn test_coordinator_debug() {
2085        let dir = tempfile::tempdir().unwrap();
2086        let coord = make_coordinator(dir.path()).await;
2087        let debug = format!("{coord:?}");
2088        assert!(debug.contains("CheckpointCoordinator"));
2089        assert!(debug.contains("epoch: 1"));
2090    }
2091
2092    #[tokio::test]
2093    async fn test_checkpoint_emits_metrics_on_success() {
2094        let dir = tempfile::tempdir().unwrap();
2095        let mut coord = make_coordinator(dir.path()).await;
2096
2097        let registry = prometheus::Registry::new();
2098        let prom = Arc::new(crate::engine_metrics::EngineMetrics::new(&registry));
2099        coord.set_metrics(Arc::clone(&prom));
2100
2101        let result = coord
2102            .checkpoint(CheckpointRequest {
2103                watermark: Some(1000),
2104                ..CheckpointRequest::default()
2105            })
2106            .await
2107            .unwrap();
2108
2109        assert!(result.success);
2110        assert_eq!(prom.checkpoints_completed.get(), 1);
2111        assert_eq!(prom.checkpoints_failed.get(), 0);
2112        assert_eq!(prom.checkpoint_epoch.get(), 1);
2113
2114        // Second checkpoint
2115        let result2 = coord
2116            .checkpoint(CheckpointRequest {
2117                watermark: Some(2000),
2118                ..CheckpointRequest::default()
2119            })
2120            .await
2121            .unwrap();
2122
2123        assert!(result2.success);
2124        assert_eq!(prom.checkpoints_completed.get(), 2);
2125        assert_eq!(prom.checkpoint_epoch.get(), 2);
2126    }
2127
2128    #[tokio::test]
2129    async fn test_checkpoint_without_metrics() {
2130        // Verify checkpoint works fine without metrics set
2131        let dir = tempfile::tempdir().unwrap();
2132        let mut coord = make_coordinator(dir.path()).await;
2133
2134        let result = coord
2135            .checkpoint(CheckpointRequest::default())
2136            .await
2137            .unwrap();
2138
2139        assert!(result.success);
2140        // No panics — metrics emission is a no-op
2141    }
2142
2143    #[test]
2144    fn test_histogram_empty() {
2145        let h = DurationHistogram::new();
2146        assert_eq!(h.len(), 0);
2147        assert_eq!(h.percentile(0.50), 0);
2148        assert_eq!(h.percentile(0.99), 0);
2149        let (p50, p95, p99) = h.percentiles();
2150        assert_eq!((p50, p95, p99), (0, 0, 0));
2151    }
2152
2153    #[test]
2154    fn test_histogram_single_sample() {
2155        let mut h = DurationHistogram::new();
2156        h.record(Duration::from_millis(42));
2157        assert_eq!(h.len(), 1);
2158        // 42ms = 42_000μs
2159        assert_eq!(h.percentile(0.50), 42_000);
2160        assert_eq!(h.percentile(0.99), 42_000);
2161    }
2162
2163    #[test]
2164    fn test_histogram_sub_millisecond() {
2165        let mut h = DurationHistogram::new();
2166        // 500μs — previously truncated to 0 with as_millis()
2167        h.record(Duration::from_micros(500));
2168        assert_eq!(h.percentile(0.50), 500);
2169        assert_eq!(h.percentile(0.99), 500);
2170    }
2171
2172    #[test]
2173    fn test_histogram_percentiles() {
2174        let mut h = DurationHistogram::new();
2175        // Record 1..=100ms in order → 1000..=100_000 μs.
2176        for i in 1..=100 {
2177            h.record(Duration::from_millis(i));
2178        }
2179        assert_eq!(h.len(), 100);
2180
2181        let p50 = h.percentile(0.50);
2182        let p95 = h.percentile(0.95);
2183        let p99 = h.percentile(0.99);
2184
2185        // Values in μs: 1000..=100_000
2186        //   p50 ≈ 50_000, p95 ≈ 95_000, p99 ≈ 99_000
2187        assert!((49_000..=51_000).contains(&p50), "p50={p50}");
2188        assert!((94_000..=96_000).contains(&p95), "p95={p95}");
2189        assert!((98_000..=100_000).contains(&p99), "p99={p99}");
2190    }
2191
2192    #[test]
2193    fn test_histogram_wraps_ring_buffer() {
2194        let mut h = DurationHistogram::new();
2195        // Write 150 samples — first 50 are overwritten.
2196        for i in 1..=150 {
2197            h.record(Duration::from_millis(i));
2198        }
2199        assert_eq!(h.len(), 100);
2200        assert_eq!(h.count, 150);
2201
2202        // Only samples 51..=150 remain in the buffer (51_000..=150_000 μs).
2203        let p50 = h.percentile(0.50);
2204        assert!((99_000..=101_000).contains(&p50), "p50={p50}");
2205    }
2206
2207    #[tokio::test]
2208    async fn test_sidecar_round_trip() {
2209        let dir = tempfile::tempdir().unwrap();
2210        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2211        let config = CheckpointConfig {
2212            state_inline_threshold: 100, // 100 bytes threshold
2213            ..CheckpointConfig::default()
2214        };
2215        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
2216
2217        // Small state stays inline, large state goes to sidecar
2218        let mut ops = HashMap::new();
2219        ops.insert("small".into(), bytes::Bytes::from(vec![0xAAu8; 50]));
2220        ops.insert("large".into(), bytes::Bytes::from(vec![0xBBu8; 200]));
2221
2222        let result = coord
2223            .checkpoint(CheckpointRequest {
2224                operator_states: ops,
2225                ..CheckpointRequest::default()
2226            })
2227            .await
2228            .unwrap();
2229        assert!(result.success);
2230
2231        // Verify manifest
2232        let loaded = coord.store().load_latest().await.unwrap().unwrap();
2233        let small_op = loaded.operator_states.get("small").unwrap();
2234        assert!(!small_op.external, "small state should be inline");
2235        assert_eq!(small_op.decode_inline().unwrap(), vec![0xAAu8; 50]);
2236
2237        let large_op = loaded.operator_states.get("large").unwrap();
2238        assert!(large_op.external, "large state should be external");
2239        assert_eq!(large_op.external_length, 200);
2240
2241        // Verify sidecar file exists and has correct data
2242        let state_data = coord.store().load_state_data(1).await.unwrap().unwrap();
2243        assert_eq!(state_data.len(), 200);
2244        assert!(state_data.iter().all(|&b| b == 0xBB));
2245    }
2246
2247    #[tokio::test]
2248    async fn test_all_inline_no_sidecar() {
2249        let dir = tempfile::tempdir().unwrap();
2250        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2251        let config = CheckpointConfig::default(); // 1MB threshold
2252        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
2253
2254        let mut ops = HashMap::new();
2255        ops.insert("op1".into(), bytes::Bytes::from_static(b"small-state"));
2256
2257        let result = coord
2258            .checkpoint(CheckpointRequest {
2259                operator_states: ops,
2260                ..CheckpointRequest::default()
2261            })
2262            .await
2263            .unwrap();
2264        assert!(result.success);
2265
2266        // No sidecar file
2267        assert!(coord.store().load_state_data(1).await.unwrap().is_none());
2268    }
2269
2270    // Durability gate tests.
2271
2272    #[tokio::test]
2273    async fn durability_gate_skipped_when_vnode_set_empty() {
2274        // With no state backend installed AND empty vnode set, the commit
2275        // path behaves as before. Regression guard: the durability gate
2276        // must not change single-instance semantics.
2277        let dir = tempfile::tempdir().unwrap();
2278        let mut coord = make_coordinator(dir.path()).await;
2279        let result = coord
2280            .checkpoint(CheckpointRequest::default())
2281            .await
2282            .unwrap();
2283        assert!(result.success, "baseline checkpoint must succeed");
2284    }
2285
2286    #[tokio::test]
2287    async fn bridge_writes_markers_and_gate_passes() {
2288        use laminar_core::state::InProcessBackend;
2289        let dir = tempfile::tempdir().unwrap();
2290        let mut coord = make_coordinator(dir.path()).await;
2291        let backend = Arc::new(InProcessBackend::new(4));
2292        coord.set_state_backend(backend.clone());
2293        coord.set_vnode_set(vec![0, 1, 2, 3]);
2294
2295        let result = coord
2296            .checkpoint(CheckpointRequest::default())
2297            .await
2298            .unwrap();
2299        assert!(result.success, "bridge writes markers → gate passes");
2300        // Every owned vnode has a marker for the completed epoch.
2301        for v in 0..4 {
2302            assert!(
2303                backend.read_partial(v, 1).await.unwrap().is_some(),
2304                "bridge should have written marker for vnode {v}",
2305            );
2306        }
2307    }
2308
2309    #[cfg(feature = "cluster-unstable")]
2310    #[tokio::test]
2311    async fn reconcile_announces_abort_when_no_decision_store() {
2312        // Fallback path: if no decision store is wired (e.g. legacy
2313        // deployments), absence of a marker == Abort. This is the
2314        // pre-decision-store behavior preserved for compatibility.
2315        use laminar_core::cluster::control::{
2316            BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase, ANNOUNCEMENT_KEY,
2317        };
2318        use laminar_core::cluster::discovery::NodeId;
2319        use laminar_storage::checkpoint_manifest::SinkCommitStatus;
2320        use tokio::sync::watch;
2321
2322        let dir = tempfile::tempdir().unwrap();
2323        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2324        let mut orphan = CheckpointManifest::new(42, 7);
2325        orphan
2326            .sink_commit_statuses
2327            .insert("kafka_out".into(), SinkCommitStatus::Pending);
2328        store.save_with_state(&orphan, None).await.unwrap();
2329
2330        let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
2331            .await
2332            .unwrap();
2333        let self_id = NodeId(1);
2334        let kv = Arc::new(InMemoryKv::new(self_id));
2335        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2336        let (_tx, rx) = watch::channel(Vec::new());
2337        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2338        let mut coord = coord;
2339        coord.set_cluster_controller(controller);
2340
2341        coord.reconcile_prepared_on_init().await;
2342
2343        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
2344        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
2345        assert_eq!(ann.phase, Phase::Abort);
2346        assert_eq!(ann.epoch, 7);
2347        assert_eq!(ann.checkpoint_id, 42);
2348    }
2349
2350    #[cfg(feature = "cluster-unstable")]
2351    #[tokio::test]
2352    async fn reconcile_announces_commit_when_marker_present() {
2353        use laminar_core::cluster::control::{
2354            BarrierAnnouncement, CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
2355            Phase, ANNOUNCEMENT_KEY,
2356        };
2357        use laminar_core::cluster::discovery::NodeId;
2358        use laminar_storage::checkpoint_manifest::SinkCommitStatus;
2359        use object_store::local::LocalFileSystem;
2360        use tokio::sync::watch;
2361
2362        let ckpt_dir = tempfile::tempdir().unwrap();
2363        let decision_dir = tempfile::tempdir().unwrap();
2364        let store = Box::new(FileSystemCheckpointStore::new(ckpt_dir.path(), 3));
2365        let mut orphan = CheckpointManifest::new(42, 7);
2366        orphan
2367            .sink_commit_statuses
2368            .insert("kafka_out".into(), SinkCommitStatus::Pending);
2369        store.save_with_state(&orphan, None).await.unwrap();
2370
2371        let decision_os: Arc<dyn object_store::ObjectStore> =
2372            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
2373        let decision_store = Arc::new(CheckpointDecisionStore::new(decision_os));
2374        decision_store.record_committed(7).await.unwrap();
2375
2376        let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
2377            .await
2378            .unwrap();
2379        let self_id = NodeId(1);
2380        let kv = Arc::new(InMemoryKv::new(self_id));
2381        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2382        let (_tx, rx) = watch::channel(Vec::new());
2383        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2384        let mut coord = coord;
2385        coord.set_cluster_controller(controller);
2386        coord.set_decision_store(decision_store);
2387
2388        coord.reconcile_prepared_on_init().await;
2389
2390        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
2391        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
2392        assert_eq!(ann.phase, Phase::Commit);
2393        assert_eq!(ann.epoch, 7);
2394        assert_eq!(ann.checkpoint_id, 42);
2395    }
2396
2397    #[cfg(feature = "cluster-unstable")]
2398    #[tokio::test]
2399    async fn reconcile_announces_abort_when_marker_missing() {
2400        // Decision store is wired but has no marker for this epoch —
2401        // the "leader crashed before commit point" case.
2402        use laminar_core::cluster::control::{
2403            BarrierAnnouncement, CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
2404            Phase, ANNOUNCEMENT_KEY,
2405        };
2406        use laminar_core::cluster::discovery::NodeId;
2407        use laminar_storage::checkpoint_manifest::SinkCommitStatus;
2408        use object_store::local::LocalFileSystem;
2409        use tokio::sync::watch;
2410
2411        let ckpt_dir = tempfile::tempdir().unwrap();
2412        let decision_dir = tempfile::tempdir().unwrap();
2413        let store = Box::new(FileSystemCheckpointStore::new(ckpt_dir.path(), 3));
2414        let mut orphan = CheckpointManifest::new(11, 3);
2415        orphan
2416            .sink_commit_statuses
2417            .insert("out".into(), SinkCommitStatus::Pending);
2418        store.save_with_state(&orphan, None).await.unwrap();
2419
2420        let decision_os: Arc<dyn object_store::ObjectStore> =
2421            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
2422        let decision_store = Arc::new(CheckpointDecisionStore::new(decision_os));
2423
2424        let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
2425            .await
2426            .unwrap();
2427        let self_id = NodeId(1);
2428        let kv = Arc::new(InMemoryKv::new(self_id));
2429        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2430        let (_tx, rx) = watch::channel(Vec::new());
2431        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2432        let mut coord = coord;
2433        coord.set_cluster_controller(controller);
2434        coord.set_decision_store(decision_store);
2435
2436        coord.reconcile_prepared_on_init().await;
2437
2438        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
2439        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
2440        assert_eq!(ann.phase, Phase::Abort);
2441        assert_eq!(ann.epoch, 3);
2442    }
2443
2444    #[cfg(feature = "cluster-unstable")]
2445    #[tokio::test]
2446    async fn reconcile_silent_when_manifest_clean() {
2447        use laminar_core::cluster::control::{
2448            ClusterController, ClusterKv, InMemoryKv, ANNOUNCEMENT_KEY,
2449        };
2450        use laminar_core::cluster::discovery::NodeId;
2451        use laminar_storage::checkpoint_manifest::SinkCommitStatus;
2452        use tokio::sync::watch;
2453
2454        let dir = tempfile::tempdir().unwrap();
2455        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2456        let mut clean = CheckpointManifest::new(5, 3);
2457        clean
2458            .sink_commit_statuses
2459            .insert("out".into(), SinkCommitStatus::Committed);
2460        store.save_with_state(&clean, None).await.unwrap();
2461
2462        let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
2463            .await
2464            .unwrap();
2465        let self_id = NodeId(1);
2466        let kv = Arc::new(InMemoryKv::new(self_id));
2467        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2468        let (_tx, rx) = watch::channel(Vec::new());
2469        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2470        let mut coord = coord;
2471        coord.set_cluster_controller(controller);
2472
2473        coord.reconcile_prepared_on_init().await;
2474
2475        // No announcement emitted.
2476        assert!(kv.read_from(self_id, ANNOUNCEMENT_KEY).await.is_none());
2477    }
2478
2479    #[cfg(feature = "cluster-unstable")]
2480    #[tokio::test]
2481    async fn follower_checkpoint_commits_on_leader_commit() {
2482        use laminar_core::cluster::control::{
2483            BarrierAck, BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase,
2484            ACK_KEY, ANNOUNCEMENT_KEY,
2485        };
2486        use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
2487        use tokio::sync::watch;
2488
2489        let dir = tempfile::tempdir().unwrap();
2490        let mut coord = make_coordinator(dir.path()).await;
2491
2492        let leader_id = NodeId(1);
2493        let follower_id = NodeId(7);
2494
2495        // Follower's KV sees both its own writes and a seeded view of the
2496        // leader's announcements. `members_rx` includes the leader so
2497        // `current_leader()` picks the lowest id (the leader, not self).
2498        let kv = Arc::new(InMemoryKv::new(follower_id));
2499        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2500        let leader_info = NodeInfo {
2501            id: leader_id,
2502            name: "leader".into(),
2503            rpc_address: String::new(),
2504            raft_address: String::new(),
2505            state: NodeState::Active,
2506            metadata: NodeMetadata::default(),
2507            last_heartbeat_ms: 0,
2508        };
2509        let (_tx, rx) = watch::channel(vec![leader_info]);
2510        let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
2511        coord.set_cluster_controller(controller);
2512
2513        // Leader has already announced PREPARE and COMMIT (simulates
2514        // a fast-gossip scenario; follower sees both on its first poll).
2515        let prepare_json = serde_json::to_string(&BarrierAnnouncement {
2516            epoch: 1,
2517            checkpoint_id: 1,
2518            phase: Phase::Prepare,
2519            flags: 0,
2520            min_watermark_ms: None,
2521        })
2522        .unwrap();
2523        let commit_json = serde_json::to_string(&BarrierAnnouncement {
2524            epoch: 1,
2525            checkpoint_id: 1,
2526            phase: Phase::Commit,
2527            flags: 0,
2528            min_watermark_ms: None,
2529        })
2530        .unwrap();
2531        // Overwrite the prepare with commit — observe_barrier reads the
2532        // latest value. Real gossip shows both in order; for the unit
2533        // test, landing on Commit is enough for the decision loop.
2534        kv.seed(leader_id, ANNOUNCEMENT_KEY, prepare_json);
2535        kv.seed(leader_id, ANNOUNCEMENT_KEY, commit_json);
2536
2537        let ann = BarrierAnnouncement {
2538            epoch: 1,
2539            checkpoint_id: 1,
2540            phase: Phase::Prepare,
2541            flags: 0,
2542            min_watermark_ms: None,
2543        };
2544        let committed = coord
2545            .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(2))
2546            .await
2547            .unwrap();
2548        assert!(committed, "follower should commit on leader's Commit");
2549
2550        // Follower's ack landed in its own KV.
2551        let ack_raw = kv.read_from(follower_id, ACK_KEY).await.unwrap();
2552        let ack: BarrierAck = serde_json::from_str(&ack_raw).unwrap();
2553        assert_eq!(ack.epoch, 1);
2554        assert!(ack.ok, "prepare succeeded, ack should be ok");
2555
2556        // Follower's manifest is on disk at the leader's epoch.
2557        let stored = coord.store().load_latest().await.unwrap().unwrap();
2558        assert_eq!(stored.epoch, 1);
2559    }
2560
2561    #[cfg(feature = "cluster-unstable")]
2562    #[tokio::test]
2563    async fn follower_checkpoint_rolls_back_on_leader_abort() {
2564        use laminar_core::cluster::control::{
2565            BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase, ANNOUNCEMENT_KEY,
2566        };
2567        use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
2568        use tokio::sync::watch;
2569
2570        let dir = tempfile::tempdir().unwrap();
2571        let mut coord = make_coordinator(dir.path()).await;
2572
2573        let leader_id = NodeId(1);
2574        let follower_id = NodeId(9);
2575        let kv = Arc::new(InMemoryKv::new(follower_id));
2576        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2577        let leader_info = NodeInfo {
2578            id: leader_id,
2579            name: "leader".into(),
2580            rpc_address: String::new(),
2581            raft_address: String::new(),
2582            state: NodeState::Active,
2583            metadata: NodeMetadata::default(),
2584            last_heartbeat_ms: 0,
2585        };
2586        let (_tx, rx) = watch::channel(vec![leader_info]);
2587        let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
2588        coord.set_cluster_controller(controller);
2589
2590        let abort_json = serde_json::to_string(&BarrierAnnouncement {
2591            epoch: 1,
2592            checkpoint_id: 1,
2593            phase: Phase::Abort,
2594            flags: 0,
2595            min_watermark_ms: None,
2596        })
2597        .unwrap();
2598        kv.seed(leader_id, ANNOUNCEMENT_KEY, abort_json);
2599
2600        let ann = BarrierAnnouncement {
2601            epoch: 1,
2602            checkpoint_id: 1,
2603            phase: Phase::Prepare,
2604            flags: 0,
2605            min_watermark_ms: None,
2606        };
2607        let committed = coord
2608            .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(2))
2609            .await
2610            .unwrap();
2611        assert!(!committed, "follower should roll back on leader's Abort");
2612    }
2613
2614    #[cfg(feature = "cluster-unstable")]
2615    #[tokio::test]
2616    async fn leader_publishes_cluster_min_watermark_to_controller() {
2617        // On a solo cluster, `await_prepare_quorum` computes the
2618        // cluster-wide min as "leader's local watermark" (no followers
2619        // to fold). This must be mirrored into the controller atomic so
2620        // the leader's own operators consume the same value that
2621        // followers pick up via `observe_barrier(Commit)` — otherwise
2622        // the leader would drive event-time decisions off a watermark
2623        // that none of its peers have acked yet.
2624        use laminar_core::cluster::control::{
2625            CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
2626        };
2627        use laminar_core::cluster::discovery::NodeId;
2628        use object_store::local::LocalFileSystem;
2629        use tokio::sync::watch;
2630
2631        let dir = tempfile::tempdir().unwrap();
2632        let decision_dir = tempfile::tempdir().unwrap();
2633        let mut coord = make_coordinator(dir.path()).await;
2634
2635        let decision_os: Arc<dyn object_store::ObjectStore> =
2636            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
2637        coord.set_decision_store(Arc::new(CheckpointDecisionStore::new(decision_os)));
2638
2639        let self_id = NodeId(1);
2640        let kv = Arc::new(InMemoryKv::new(self_id));
2641        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2642        let (_tx, rx) = watch::channel(Vec::new());
2643        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2644        coord.set_cluster_controller(Arc::clone(&controller));
2645
2646        // Pre-condition: controller atomic is at its "unset" sentinel.
2647        assert_eq!(controller.cluster_min_watermark(), None);
2648
2649        // Seed a local watermark on the coordinator and drive a full
2650        // checkpoint. Solo cluster → leader's local value *is* the
2651        // cluster-wide min.
2652        coord.set_local_watermark_ms(Some(12_345));
2653        let result = coord
2654            .checkpoint(CheckpointRequest::default())
2655            .await
2656            .unwrap();
2657        assert!(result.success, "solo-cluster checkpoint should succeed");
2658
2659        assert_eq!(
2660            controller.cluster_min_watermark(),
2661            Some(12_345),
2662            "leader must mirror the cluster-wide min into its controller",
2663        );
2664
2665        // A subsequent checkpoint with a lower local watermark must
2666        // NOT regress the published value — event-time progress is
2667        // monotonic (same invariant the follower path already enforces).
2668        coord.set_local_watermark_ms(Some(42));
2669        let result = coord
2670            .checkpoint(CheckpointRequest::default())
2671            .await
2672            .unwrap();
2673        assert!(result.success);
2674        assert_eq!(
2675            controller.cluster_min_watermark(),
2676            Some(12_345),
2677            "stale local watermark must not lower the published cluster min",
2678        );
2679    }
2680
2681    #[cfg(feature = "cluster-unstable")]
2682    #[tokio::test]
2683    async fn leader_announces_prepare_and_commit_on_solo_cluster() {
2684        use laminar_core::cluster::control::{
2685            CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv, Phase,
2686            ANNOUNCEMENT_KEY,
2687        };
2688        use laminar_core::cluster::discovery::NodeId;
2689        use object_store::local::LocalFileSystem;
2690        use tokio::sync::watch;
2691
2692        let dir = tempfile::tempdir().unwrap();
2693        let decision_dir = tempfile::tempdir().unwrap();
2694        let mut coord = make_coordinator(dir.path()).await;
2695
2696        let decision_os: Arc<dyn object_store::ObjectStore> =
2697            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
2698        coord.set_decision_store(Arc::new(CheckpointDecisionStore::new(decision_os)));
2699
2700        let self_id = NodeId(1);
2701        let kv = Arc::new(InMemoryKv::new(self_id));
2702        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2703        let (_tx, rx) = watch::channel(Vec::new()); // solo — no peers
2704        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2705        coord.set_cluster_controller(controller);
2706
2707        let result = coord
2708            .checkpoint(CheckpointRequest::default())
2709            .await
2710            .unwrap();
2711        assert!(result.success, "solo-cluster checkpoint should succeed");
2712
2713        // The last announce on the leader's KV is COMMIT (PREPARE was
2714        // overwritten in the same slot).
2715        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
2716        let ann: laminar_core::cluster::control::BarrierAnnouncement =
2717            serde_json::from_str(&raw).unwrap();
2718        assert_eq!(ann.phase, Phase::Commit);
2719        assert_eq!(ann.epoch, result.epoch);
2720    }
2721
2722    #[tokio::test]
2723    async fn gate_checks_full_registry_not_just_owned() {
2724        // Leader owns vnodes {0, 1}. Cluster has 4 vnodes total; a
2725        // follower (simulated by pre-populating half the backend) owns
2726        // {2, 3}. If the follower's markers are missing, the leader's
2727        // gate must fail even though the leader wrote its own.
2728        use laminar_core::state::InProcessBackend;
2729        let dir = tempfile::tempdir().unwrap();
2730        let mut coord = make_coordinator(dir.path()).await;
2731        let backend = Arc::new(InProcessBackend::new(4));
2732        coord.set_state_backend(backend.clone());
2733        coord.set_vnode_set(vec![0, 1]); // leader's owned subset
2734        coord.set_gate_vnode_set(vec![0, 1, 2, 3]); // full cluster registry
2735
2736        let result = coord
2737            .checkpoint(CheckpointRequest::default())
2738            .await
2739            .unwrap();
2740        assert!(
2741            !result.success,
2742            "gate must fail when follower markers are missing",
2743        );
2744        let err = result.error.expect("failure produces an error message");
2745        assert!(
2746            err.contains("not all vnodes persisted"),
2747            "expected full-registry gate miss, got: {err}",
2748        );
2749    }
2750
2751    #[tokio::test]
2752    async fn gate_passes_when_all_registry_markers_present() {
2753        // Same topology as the previous test, but now the follower's
2754        // markers are pre-populated — the gate sees a complete set
2755        // across the full registry and the checkpoint succeeds.
2756        use bytes::Bytes;
2757        use laminar_core::state::InProcessBackend;
2758        let dir = tempfile::tempdir().unwrap();
2759        let mut coord = make_coordinator(dir.path()).await;
2760        let backend = Arc::new(InProcessBackend::new(4));
2761        // Simulate the follower's prior write on vnodes {2, 3} for the
2762        // epoch the leader is about to use (fresh store starts at 1).
2763        backend
2764            .write_partial(2, 1, 0, Bytes::from_static(b"follower"))
2765            .await
2766            .unwrap();
2767        backend
2768            .write_partial(3, 1, 0, Bytes::from_static(b"follower"))
2769            .await
2770            .unwrap();
2771        coord.set_state_backend(backend);
2772        coord.set_vnode_set(vec![0, 1]);
2773        coord.set_gate_vnode_set(vec![0, 1, 2, 3]);
2774
2775        let result = coord
2776            .checkpoint(CheckpointRequest::default())
2777            .await
2778            .unwrap();
2779        assert!(result.success, "gate should pass: every vnode has a marker");
2780    }
2781
2782    #[tokio::test]
2783    async fn marker_write_failure_aborts_checkpoint() {
2784        use laminar_core::state::InProcessBackend;
2785        let dir = tempfile::tempdir().unwrap();
2786        let mut coord = make_coordinator(dir.path()).await;
2787        // Backend is sized for 2 vnodes, but we claim to own vnode 99 →
2788        // bridge fails its write, checkpoint aborts cleanly.
2789        coord.set_state_backend(Arc::new(InProcessBackend::new(2)));
2790        coord.set_vnode_set(vec![0, 99]);
2791
2792        let result = coord
2793            .checkpoint(CheckpointRequest::default())
2794            .await
2795            .unwrap();
2796        assert!(
2797            !result.success,
2798            "out-of-range vnode must fail the checkpoint"
2799        );
2800        let err = result.error.expect("failure produces an error message");
2801        assert!(err.contains("vnode marker write failed"), "got: {err}");
2802    }
2803
2804    #[tokio::test]
2805    async fn test_stats_include_percentiles_after_checkpoints() {
2806        let dir = tempfile::tempdir().unwrap();
2807        let mut coord = make_coordinator(dir.path()).await;
2808
2809        // Run 3 checkpoints.
2810        for _ in 0..3 {
2811            let result = coord
2812                .checkpoint(CheckpointRequest::default())
2813                .await
2814                .unwrap();
2815            assert!(result.success);
2816        }
2817
2818        let stats = coord.stats();
2819        assert_eq!(stats.completed, 3);
2820        // After 3 fast checkpoints, percentiles should be > 0
2821        // (they're real durations, not zero).
2822        assert!(stats.last_duration.is_some());
2823    }
2824
2825    /// Sink whose `pre_commit` always fails; counts `rollback_epoch` calls.
2826    struct FailingPreCommitSink {
2827        rollback_count: Arc<std::sync::atomic::AtomicU64>,
2828        schema: arrow::datatypes::SchemaRef,
2829    }
2830
2831    #[async_trait::async_trait]
2832    impl laminar_connectors::connector::SinkConnector for FailingPreCommitSink {
2833        async fn open(
2834            &mut self,
2835            _config: &laminar_connectors::config::ConnectorConfig,
2836        ) -> Result<(), laminar_connectors::error::ConnectorError> {
2837            Ok(())
2838        }
2839
2840        async fn write_batch(
2841            &mut self,
2842            _batch: &arrow::array::RecordBatch,
2843        ) -> Result<
2844            laminar_connectors::connector::WriteResult,
2845            laminar_connectors::error::ConnectorError,
2846        > {
2847            Ok(laminar_connectors::connector::WriteResult::new(0, 0))
2848        }
2849
2850        async fn pre_commit(
2851            &mut self,
2852            epoch: u64,
2853        ) -> Result<(), laminar_connectors::error::ConnectorError> {
2854            Err(laminar_connectors::error::ConnectorError::TransactionError(
2855                format!("synthetic pre_commit failure at epoch {epoch}"),
2856            ))
2857        }
2858
2859        async fn rollback_epoch(
2860            &mut self,
2861            _epoch: u64,
2862        ) -> Result<(), laminar_connectors::error::ConnectorError> {
2863            self.rollback_count
2864                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2865            Ok(())
2866        }
2867
2868        async fn close(&mut self) -> Result<(), laminar_connectors::error::ConnectorError> {
2869            Ok(())
2870        }
2871
2872        fn schema(&self) -> arrow::datatypes::SchemaRef {
2873            Arc::clone(&self.schema)
2874        }
2875
2876        fn capabilities(&self) -> laminar_connectors::connector::SinkConnectorCapabilities {
2877            laminar_connectors::connector::SinkConnectorCapabilities::new(Duration::from_secs(5))
2878                .with_exactly_once()
2879                .with_two_phase_commit()
2880        }
2881    }
2882
2883    #[tokio::test]
2884    async fn test_pre_commit_failure_triggers_rollback() {
2885        use arrow::datatypes::{DataType, Field, Schema};
2886
2887        let dir = tempfile::tempdir().unwrap();
2888        let mut coord = make_coordinator(dir.path()).await;
2889
2890        let rollback_count = Arc::new(std::sync::atomic::AtomicU64::new(0));
2891        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
2892        let sink = FailingPreCommitSink {
2893            rollback_count: Arc::clone(&rollback_count),
2894            schema,
2895        };
2896        let (event_tx, _event_rx) = laminar_core::streaming::channel::channel::<
2897            crate::sink_task::SinkEvent,
2898        >(crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY);
2899        let handle = crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
2900            name: "failing-sink".into(),
2901            sink_id: Arc::from("failing-sink"),
2902            connector: Box::new(sink),
2903            exactly_once: true,
2904            channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
2905            flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
2906            write_timeout: Duration::from_secs(5),
2907            event_tx,
2908        });
2909        coord.register_sink("failing-sink", handle, true);
2910
2911        coord.begin_initial_epoch().await.unwrap();
2912
2913        let result = coord
2914            .checkpoint(CheckpointRequest::default())
2915            .await
2916            .unwrap();
2917
2918        assert!(!result.success);
2919        assert!(
2920            result
2921                .error
2922                .as_deref()
2923                .is_some_and(|e| e.contains("pre-commit failed")),
2924            "error should mention pre-commit: got {:?}",
2925            result.error
2926        );
2927        assert_eq!(
2928            rollback_count.load(std::sync::atomic::Ordering::Relaxed),
2929            1,
2930            "rollback_epoch should have been called once"
2931        );
2932    }
2933
2934    /// `pre_commit` fails; `rollback_epoch` hangs forever.
2935    struct StuckRollbackSink {
2936        schema: arrow::datatypes::SchemaRef,
2937    }
2938
2939    #[async_trait::async_trait]
2940    impl laminar_connectors::connector::SinkConnector for StuckRollbackSink {
2941        async fn open(
2942            &mut self,
2943            _config: &laminar_connectors::config::ConnectorConfig,
2944        ) -> Result<(), laminar_connectors::error::ConnectorError> {
2945            Ok(())
2946        }
2947
2948        async fn write_batch(
2949            &mut self,
2950            _batch: &arrow::array::RecordBatch,
2951        ) -> Result<
2952            laminar_connectors::connector::WriteResult,
2953            laminar_connectors::error::ConnectorError,
2954        > {
2955            Ok(laminar_connectors::connector::WriteResult::new(0, 0))
2956        }
2957
2958        async fn pre_commit(
2959            &mut self,
2960            _epoch: u64,
2961        ) -> Result<(), laminar_connectors::error::ConnectorError> {
2962            Err(laminar_connectors::error::ConnectorError::TransactionError(
2963                "synthetic pre_commit failure".into(),
2964            ))
2965        }
2966
2967        async fn rollback_epoch(
2968            &mut self,
2969            _epoch: u64,
2970        ) -> Result<(), laminar_connectors::error::ConnectorError> {
2971            // Hang until the test runtime drops us.
2972            std::future::pending::<()>().await;
2973            Ok(())
2974        }
2975
2976        async fn close(&mut self) -> Result<(), laminar_connectors::error::ConnectorError> {
2977            Ok(())
2978        }
2979
2980        fn schema(&self) -> arrow::datatypes::SchemaRef {
2981            Arc::clone(&self.schema)
2982        }
2983
2984        fn capabilities(&self) -> laminar_connectors::connector::SinkConnectorCapabilities {
2985            laminar_connectors::connector::SinkConnectorCapabilities::new(Duration::from_secs(5))
2986                .with_exactly_once()
2987                .with_two_phase_commit()
2988        }
2989    }
2990
2991    #[tokio::test(start_paused = true)]
2992    async fn test_rollback_sinks_bounded_by_timeout() {
2993        use arrow::datatypes::{DataType, Field, Schema};
2994
2995        let dir = tempfile::tempdir().unwrap();
2996        let config = CheckpointConfig {
2997            rollback_timeout: Duration::from_millis(100),
2998            ..Default::default()
2999        };
3000        let store = Box::new(
3001            laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(dir.path(), 3),
3002        );
3003        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
3004
3005        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
3006        let sink = StuckRollbackSink { schema };
3007        let (event_tx, _event_rx) = laminar_core::streaming::channel::channel::<
3008            crate::sink_task::SinkEvent,
3009        >(crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY);
3010        let handle = crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
3011            name: "stuck-sink".into(),
3012            sink_id: Arc::from("stuck-sink"),
3013            connector: Box::new(sink),
3014            exactly_once: true,
3015            channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
3016            flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
3017            write_timeout: Duration::from_secs(5),
3018            event_tx,
3019        });
3020        coord.register_sink("stuck-sink", handle, true);
3021        coord.begin_initial_epoch().await.unwrap();
3022
3023        // pre_commit fails → rollback_sinks fires → hangs → 100ms
3024        // rollback_timeout fires → coordinator returns.
3025        let result = coord
3026            .checkpoint(CheckpointRequest::default())
3027            .await
3028            .unwrap();
3029
3030        assert!(!result.success);
3031        assert!(
3032            result
3033                .error
3034                .as_deref()
3035                .is_some_and(|e| e.contains("pre-commit failed")),
3036            "checkpoint result should reflect pre-commit failure: got {:?}",
3037            result.error
3038        );
3039    }
3040}