Skip to main content

laminar_db/
checkpoint_coordinator.rs

1//! Unified checkpoint coordinator.
2//!
3//! Single orchestrator for checkpoint lifecycle. Lives in Ring 2 (control plane).
4//!
5//! The checkpoint manifest is the source of truth for source offsets.
6//! Kafka broker commits are advisory (for monitoring tools). On recovery,
7//! offsets restore from manifest, not consumer group state.
8//!
9//! ## Checkpoint Cycle
10//!
11//! 1. Barrier injection — `CheckpointBarrierInjector.trigger()`
12//! 2. Operator snapshot — `OperatorGraph.snapshot_state()` → operator states
13//! 3. Source snapshot — `source.checkpoint()` for each source
14//! 4. Sink pre-commit — `sink.pre_commit(epoch)` for each exactly-once sink
15//! 5. Manifest persist — `store.save(&manifest)` (atomic write)
16//! 6. Sink commit — `sink.commit_epoch(epoch)` for each exactly-once sink
17//! 7. On ANY failure at 6 — `sink.rollback_epoch()` on remaining sinks
18#![allow(clippy::disallowed_types)] // cold path
19
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use laminar_connectors::checkpoint::SourceCheckpoint;
25use laminar_connectors::connector::SourceConnector;
26use laminar_core::state::StateBackend;
27use laminar_core::storage::checkpoint_manifest::{
28    CheckpointManifest, ConnectorCheckpoint, SinkCommitStatus,
29};
30use laminar_core::storage::checkpoint_store::CheckpointStore;
31use tracing::{debug, error, info, warn};
32
33use crate::error::DbError;
34
/// Tunables for the unified checkpoint coordinator.
///
/// Each phase has its own upper bound so that a wedged sink or a hung
/// filesystem cannot stall the runtime. Per operator,
/// `state_inline_threshold` chooses between inlining the state as base64
/// inside the JSON manifest and writing it to a sidecar file.
/// `max_checkpoint_bytes` bounds the total sidecar size; a checkpoint that
/// exceeds it is rejected with `[LDB-6014]`.
#[derive(Clone, Debug)]
pub struct CheckpointConfig {
    /// Time between automatic checkpoints; `None` means manual triggering only.
    pub interval: Option<Duration>,
    /// How many completed checkpoints are kept on disk / in the object store.
    pub max_retained: usize,
    /// Longest a fan-in operator waits for barrier alignment.
    pub alignment_timeout: Duration,
    /// Longest sink pre-commit (phase 1) may take.
    pub pre_commit_timeout: Duration,
    /// Longest the manifest persist may take.
    pub persist_timeout: Duration,
    /// Longest sink commit (phase 2) may take.
    pub commit_timeout: Duration,
    /// Longest a sink rollback may take.
    pub rollback_timeout: Duration,
    /// Operator states above this many bytes go to a sidecar file instead
    /// of being inlined as base64 JSON.
    pub state_inline_threshold: usize,
    /// Longest operator-state serialization may take.
    pub serialization_timeout: Duration,
    /// Ceiling on total sidecar bytes (`None` = unlimited); warns at 80%.
    pub max_checkpoint_bytes: Option<usize>,
    /// How long checkpoint coordination waits for a quorum.
    pub quorum_timeout: Duration,
    /// How long the leader waits for every participating vnode's partial
    /// to appear before declaring the epoch restorable.
    ///
    /// Followers ack at *capture* time and upload afterwards, so the
    /// leader's durability gate has to poll for the partials rather than
    /// check once. Hitting this bound aborts the epoch.
    pub restorable_gate_timeout: Duration,
    /// Upload backlog: the most epochs that may sit between `Aligned` and
    /// restorable at once.
    ///
    /// Exactly-once pipelines are limited to 1, because a sink with a
    /// single open transaction (e.g. a transactional Kafka producer)
    /// cannot overlap epochs.
    pub max_in_flight_epochs: u64,
    /// Budget for captured-state bytes that in-flight epochs hold in memory
    /// while awaiting upload.
    ///
    /// Once reached, barrier admission pauses, so checkpoint cadence falls
    /// back to upload speed instead of exhausting memory.
    pub max_staged_bytes: u64,
}

impl Default for CheckpointConfig {
    fn default() -> Self {
        const MIB: usize = 1 << 20;
        Self {
            interval: Some(Duration::from_secs(60)),
            max_retained: 3,
            alignment_timeout: Duration::from_secs(30),
            pre_commit_timeout: Duration::from_secs(30),
            persist_timeout: Duration::from_secs(120),
            commit_timeout: Duration::from_secs(60),
            rollback_timeout: Duration::from_secs(30),
            // 1 MiB: anything larger is written to a sidecar.
            state_inline_threshold: MIB,
            serialization_timeout: Duration::from_secs(120),
            max_checkpoint_bytes: None,
            quorum_timeout: Duration::from_secs(3),
            // Deliberately short, last-resort bound: membership,
            // unresponsive-peer and rotation fail-fasts detect dead
            // participants within seconds. A long value would serialize
            // recovery, since every pipelined doomed epoch burns it in turn.
            restorable_gate_timeout: Duration::from_secs(10),
            max_in_flight_epochs: 4,
            // 512 MiB of staged capture state.
            max_staged_bytes: 512 << 20,
        }
    }
}
106
107/// Parameters for a checkpoint operation.
108#[derive(Debug, Clone, Default)]
109pub struct CheckpointRequest {
110    /// Serialized operator states. `Bytes` (not `Vec<u8>`) so producers
111    /// (rkyv output, MV IPC bytes) can hand off the buffer without an
112    /// extra copy at each stage of the checkpoint pipeline.
113    pub operator_states: HashMap<String, bytes::Bytes>,
114    /// Current watermark timestamp.
115    pub watermark: Option<i64>,
116    /// Path for table store checkpoint data.
117    pub table_store_checkpoint_path: Option<String>,
118    /// Additional table offset overrides.
119    pub extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
120    /// Per-source watermark timestamps.
121    pub source_watermarks: HashMap<String, i64>,
122    /// Pipeline topology hash for change detection.
123    pub pipeline_hash: Option<u64>,
124    /// Source offset overrides for recovery.
125    pub source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
126}
127
/// Allocator for `(epoch, checkpoint_id)` pairs that needs no lock.
///
/// Ids are claimed at barrier admission, which can happen while an older
/// epoch's durable tail still holds the coordinator mutex. The counters
/// therefore live outside the coordinator's exclusive state. Ids of
/// failed epochs are abandoned and never handed out again.
#[derive(Debug)]
pub(crate) struct EpochAllocator {
    epoch: std::sync::atomic::AtomicU64,
    next_checkpoint_id: std::sync::atomic::AtomicU64,
}

impl EpochAllocator {
    fn new(epoch: u64, next_checkpoint_id: u64) -> Self {
        use std::sync::atomic::AtomicU64;
        Self {
            epoch: AtomicU64::new(epoch),
            next_checkpoint_id: AtomicU64::new(next_checkpoint_id),
        }
    }

    /// Hands out the next `(epoch, checkpoint_id)` pair and bumps both counters.
    ///
    /// The two counters are incremented one after the other, not as a
    /// single atomic unit. That is sound only because every caller runs
    /// either on the pipeline task or under the coordinator mutex, so no
    /// two `allocate` calls overlap. A call site outside those paths would
    /// need a single CAS over a packed pair instead.
    pub(crate) fn allocate(&self) -> (u64, u64) {
        use std::sync::atomic::Ordering::AcqRel;
        let epoch = self.epoch.fetch_add(1, AcqRel);
        let checkpoint_id = self.next_checkpoint_id.fetch_add(1, AcqRel);
        (epoch, checkpoint_id)
    }

    /// Returns the pair the next [`allocate`](Self::allocate) would produce,
    /// without consuming it.
    pub(crate) fn peek(&self) -> (u64, u64) {
        use std::sync::atomic::Ordering::Acquire;
        let epoch = self.epoch.load(Acquire);
        let checkpoint_id = self.next_checkpoint_id.load(Acquire);
        (epoch, checkpoint_id)
    }

    /// Raises both counters to at least the given values; never lowers them.
    ///
    /// An aborted epoch leaves artifacts behind (a Pending manifest,
    /// uploaded partials). When recovery restores an OLDER committed epoch,
    /// the ids must not drop back onto those artifacts and re-issue the
    /// aborted epoch.
    fn advance_to(&self, epoch: u64, next_checkpoint_id: u64) {
        use std::sync::atomic::Ordering::AcqRel;
        self.epoch.fetch_max(epoch, AcqRel);
        self.next_checkpoint_id.fetch_max(next_checkpoint_id, AcqRel);
    }
}
183
/// Identifier of a capture-quorum participant.
///
/// With the `cluster` feature this is the cluster `NodeId`. Without it,
/// `laminar_core::cluster` is compiled out and participant sets are always
/// empty; a plain `u64` stands in only so the shared plumbing still
/// type-checks.
#[cfg(feature = "cluster")]
pub(crate) type QuorumPeer = laminar_core::cluster::discovery::NodeId;
#[cfg(not(feature = "cluster"))]
pub(crate) type QuorumPeer = u64;

/// Tells `checkpoint_inner` whether the cluster capture quorum is still
/// pending.
///
/// The alternative is that a pipelined tail already completed the quorum
/// before taking the coordinator mutex.
#[derive(Debug, Clone)]
pub(crate) enum QuorumStage {
    /// The quorum and the `Aligned` announce still have to run inline
    /// (forced and timer-driven paths).
    RunInline,
    /// The quorum was reached before the coordinator lock was taken.
    ///
    /// Carries two results from it:
    /// - the merged cluster-minimum watermark, for the `Commit` announcement;
    /// - the followers that acked capture, so the durability gate can fail
    ///   fast if one of them dies instead of waiting out its whole timeout.
    #[cfg_attr(not(feature = "cluster"), allow(dead_code))]
    Done {
        /// Cluster-minimum watermark merged from the capture acks.
        min_watermark_ms: Option<i64>,
        /// Followers that acknowledged the capture quorum.
        participants: Vec<QuorumPeer>,
    },
}
211
212/// Phase of the checkpoint lifecycle.
213#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
214pub enum CheckpointPhase {
215    /// No checkpoint in progress.
216    Idle,
217    /// Operators snapshotted, collecting source positions.
218    Snapshotting,
219    /// Sinks pre-committing (phase 1).
220    PreCommitting,
221    /// Manifest being persisted.
222    Persisting,
223    /// Sinks committing (phase 2).
224    Committing,
225}
226
227impl std::fmt::Display for CheckpointPhase {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        match self {
230            Self::Idle => write!(f, "Idle"),
231            Self::Snapshotting => write!(f, "Snapshotting"),
232            Self::PreCommitting => write!(f, "PreCommitting"),
233            Self::Persisting => write!(f, "Persisting"),
234            Self::Committing => write!(f, "Committing"),
235        }
236    }
237}
238
239/// Result of a checkpoint attempt.
240#[derive(Debug, serde::Serialize, serde::Deserialize)]
241pub struct CheckpointResult {
242    /// Whether the checkpoint succeeded.
243    pub success: bool,
244    /// Checkpoint ID (if created).
245    pub checkpoint_id: u64,
246    /// Epoch number.
247    pub epoch: u64,
248    /// Duration of the checkpoint operation.
249    pub duration: Duration,
250    /// Error message if failed.
251    pub error: Option<String>,
252}
253
254/// Registered source for checkpoint coordination.
255pub(crate) struct RegisteredSource {
256    /// Source name.
257    pub name: String,
258    /// Source connector handle.
259    pub connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
260    /// Whether this source supports replay from a checkpointed position.
261    ///
262    /// Sources that do not support replay (e.g., WebSocket) degrade
263    /// exactly-once semantics to at-most-once.
264    pub supports_replay: bool,
265}
266
267/// Registered sink for checkpoint coordination.
268pub(crate) struct RegisteredSink {
269    /// Sink name.
270    pub name: String,
271    /// Sink task handle (channel-based, no mutex contention).
272    pub handle: crate::sink_task::SinkTaskHandle,
273    /// Whether this sink supports exactly-once / two-phase commit.
274    pub exactly_once: bool,
275}
276
277/// Unified checkpoint coordinator.
278///
279/// Orchestrates the full checkpoint lifecycle across sources, sinks,
280/// and operator state, persisting everything in a single
281/// [`CheckpointManifest`].
282pub struct CheckpointCoordinator {
283    config: CheckpointConfig,
284    store: Arc<dyn CheckpointStore>,
285    sinks: Vec<RegisteredSink>,
286    /// Shared with the pipeline callback, which allocates ids at
287    /// barrier admission without taking the coordinator mutex.
288    allocator: Arc<EpochAllocator>,
289    phase: CheckpointPhase,
290    checkpoints_completed: u64,
291    checkpoints_failed: u64,
292    last_checkpoint_duration: Option<Duration>,
293    duration_histogram: DurationHistogram,
294    prom: Option<Arc<crate::engine_metrics::EngineMetrics>>,
295    total_bytes_written: u64,
296    /// Consulted between manifest persist and sink commit to verify
297    /// per-vnode durability.
298    state_backend: Option<Arc<dyn StateBackend>>,
299    /// Stamped into every `write_partial` for the split-brain fence.
300    /// Zero = fence disabled.
301    assignment_version: u64,
302    /// Durable commit marker, written before sinks are told to commit
303    /// so recovery can tell the 2PC verdict apart from a mid-flight
304    /// crash. `None` falls back to "rollback on recovery".
305    decision_store: Option<Arc<laminar_core::checkpoint_decision::CheckpointDecisionStore>>,
306    /// Reported in each `BarrierAck`; the leader folds these with its
307    /// own watermark to compute the cluster-wide min.
308    local_watermark_ms: Option<i64>,
309    /// Cluster-wide min watermark as of the last committed epoch
310    /// (leader-side; fanned out in the Commit announcement).
311    #[cfg(feature = "cluster")]
312    cluster_min_watermark: Option<i64>,
313    /// Vnodes this coordinator owns; drives per-vnode marker writes.
314    vnode_set: Vec<u32>,
315    /// Vnodes the leader's durability gate checks. In cluster mode the
316    /// full registry; single-instance mirrors `vnode_set`.
317    gate_vnode_set: Vec<u32>,
318    /// First epoch admitted at-or-after the latest vnode rotation. An
319    /// in-flight epoch BELOW this captured under the previous
320    /// assignment: vnodes that changed hands mid-epoch were captured by
321    /// nobody, so its durability gate can never seal — fail it fast
322    /// instead of burning the gate timeout (with pipelining, several
323    /// such epochs would burn serially).
324    rotation_epoch_floor: u64,
325    /// Per-vnode operator-state slices for the in-flight checkpoint,
326    /// `vnode → (operator_name → bytes)`. Set fresh before each checkpoint
327    /// by the pipeline callback from `OperatorGraph::snapshot_state_by_vnode`;
328    /// folded into each owned vnode's `partial.bin` by
329    /// [`write_vnode_partials`](Self::write_vnode_partials). Empty in
330    /// single-instance mode (the partial is then just a durability marker).
331    #[allow(clippy::disallowed_types)] // matches the graph snapshot shape
332    pending_vnode_states:
333        std::collections::HashMap<u32, std::collections::HashMap<String, bytes::Bytes>>,
334    /// Per-vnode `(epoch, slices)` of the last *full* partial uploaded —
335    /// the bases for unchanged-vnode reference partials. Bytes are
336    /// refcounted, so this holds one serialized
337    /// snapshot's worth of memory, not a deep copy per epoch.
338    #[allow(clippy::disallowed_types)]
339    last_vnode_uploads:
340        std::collections::HashMap<u32, (u64, std::collections::HashMap<String, bytes::Bytes>)>,
341    /// `Some` in cluster mode, `None` in single-instance / embedded.
342    #[cfg(feature = "cluster")]
343    cluster_controller: Option<Arc<laminar_core::cluster::control::ClusterController>>,
344    /// Cached sorted sink names; invalidated on `register_sink`.
345    cached_sorted_sink_names: Option<Vec<String>>,
346}
347
348/// Highest-loadable manifest — tolerates a torn `latest.txt` pointer.
349async fn load_highest(
350    store: &dyn CheckpointStore,
351) -> Result<Option<CheckpointManifest>, laminar_core::storage::checkpoint_store::CheckpointStoreError>
352{
353    let ids = store.list_ids().await?;
354    for id in ids.iter().rev() {
355        if let Ok(Some(m)) = store.load_by_id(*id).await {
356            return Ok(Some(m));
357        }
358    }
359    Ok(None)
360}
361
362impl CheckpointCoordinator {
363    /// Creates a new checkpoint coordinator, seeded from the highest
364    /// loadable stored checkpoint.
365    ///
366    /// # Errors
367    /// Surfaces a store read failure rather than silently starting at
368    /// `(1, 1)` and clobbering on-disk state.
369    pub async fn new(
370        config: CheckpointConfig,
371        store: Box<dyn CheckpointStore>,
372    ) -> Result<Self, DbError> {
373        let store: Arc<dyn CheckpointStore> = Arc::from(store);
374        let highest = load_highest(store.as_ref()).await.map_err(|e| {
375            DbError::Checkpoint(format!(
376                "[LDB-6028] failed to list checkpoints at coordinator \
377                 construction: {e} — refusing to start at epoch 1 and \
378                 clobber existing on-disk state"
379            ))
380        })?;
381        let (next_id, epoch) = match highest.as_ref() {
382            Some(m) => (m.checkpoint_id + 1, m.epoch + 1),
383            None => (1, 1),
384        };
385
386        Ok(Self {
387            config,
388            store,
389            sinks: Vec::new(),
390            allocator: Arc::new(EpochAllocator::new(epoch, next_id)),
391            phase: CheckpointPhase::Idle,
392            checkpoints_completed: 0,
393            checkpoints_failed: 0,
394            last_checkpoint_duration: None,
395            duration_histogram: DurationHistogram::new(),
396            prom: None,
397            total_bytes_written: 0,
398            state_backend: None,
399            assignment_version: 0,
400            decision_store: None,
401            local_watermark_ms: None,
402            #[cfg(feature = "cluster")]
403            cluster_min_watermark: None,
404            vnode_set: Vec::new(),
405            gate_vnode_set: Vec::new(),
406            rotation_epoch_floor: 0,
407            pending_vnode_states: std::collections::HashMap::new(),
408            last_vnode_uploads: std::collections::HashMap::new(),
409            #[cfg(feature = "cluster")]
410            cluster_controller: None,
411            cached_sorted_sink_names: None,
412        })
413    }
414
415    /// Activates cluster-mode 2PC. Without this the coordinator runs
416    /// single-instance semantics.
417    #[cfg(feature = "cluster")]
418    pub fn set_cluster_controller(
419        &mut self,
420        controller: Arc<laminar_core::cluster::control::ClusterController>,
421    ) {
422        self.cluster_controller = Some(controller);
423    }
424
425    /// Wired with a non-empty `vnode_set` to enable per-vnode markers
426    /// and the `epoch_complete` durability gate.
427    pub fn set_state_backend(&mut self, backend: Arc<dyn StateBackend>) {
428        self.state_backend = Some(backend);
429    }
430
431    /// Install the durable commit-marker store.
432    pub fn set_decision_store(
433        &mut self,
434        store: Arc<laminar_core::checkpoint_decision::CheckpointDecisionStore>,
435    ) {
436        self.decision_store = Some(store);
437    }
438
439    /// Record the assignment generation this coordinator is writing
440    /// with. Forwarded to `backend.write_partial` so the split-brain
441    /// fence can reject stale writers. Host sets this whenever a fresh
442    /// `AssignmentSnapshot` rotates in.
443    pub fn set_assignment_version(&mut self, version: u64) {
444        self.assignment_version = version;
445    }
446
447    /// Record this instance's current local watermark, reported in
448    /// every subsequent `BarrierAck` so the leader can compute the
449    /// cluster-wide minimum. `None` disables the per-follower
450    /// contribution — leader falls back to its own watermark (and
451    /// the other followers').
452    pub fn set_local_watermark_ms(&mut self, watermark: Option<i64>) {
453        self.local_watermark_ms = watermark;
454    }
455
456    /// Stage the per-vnode operator-state slices for the next checkpoint.
457    ///
458    /// Each owned vnode's slice is folded into its `partial.bin` by
459    /// `write_vnode_partials` so a node that
460    /// later acquires the vnode can rehydrate exactly that vnode's state.
461    /// Call once per checkpoint (even with an empty map) so a prior epoch's
462    /// slices never leak forward.
463    #[allow(clippy::disallowed_types)]
464    pub fn set_pending_vnode_states(
465        &mut self,
466        states: std::collections::HashMap<u32, std::collections::HashMap<String, bytes::Bytes>>,
467    ) {
468        self.pending_vnode_states = states;
469    }
470
471    /// Vnodes this instance owns; drives marker writes. Also the
472    /// default gate set until [`Self::set_gate_vnode_set`] is called.
473    pub fn set_vnode_set(&mut self, vnodes: Vec<u32>) {
474        if self.gate_vnode_set.is_empty() {
475            self.gate_vnode_set.clone_from(&vnodes);
476        }
477        self.rotation_epoch_floor = self.allocator.peek().0;
478        // Drop reference bases for vnodes shed in a rebalance — they
479        // hold refcounts on serialized state this node no longer owns
480        // (and the new owner builds its own bases from a full upload).
481        self.last_vnode_uploads.retain(|v, _| vnodes.contains(v));
482        self.vnode_set = vnodes;
483    }
484
485    /// Set the vnodes the leader's durability gate checks (the full
486    /// registry in cluster mode). Defaults to `vnode_set` when unset.
487    pub fn set_gate_vnode_set(&mut self, vnodes: Vec<u32>) {
488        self.gate_vnode_set = vnodes;
489    }
490
491    /// Registers a sink connector for checkpoint coordination.
492    pub(crate) fn register_sink(
493        &mut self,
494        name: impl Into<String>,
495        handle: crate::sink_task::SinkTaskHandle,
496        exactly_once: bool,
497    ) {
498        self.sinks.push(RegisteredSink {
499            name: name.into(),
500            handle,
501            exactly_once,
502        });
503        // Invalidate the sorted-name cache; the next checkpoint will
504        // rebuild it.
505        self.cached_sorted_sink_names = None;
506    }
507
508    /// Begins the initial epoch on all exactly-once sinks.
509    ///
510    /// Must be called once after all sinks are registered and before any
511    /// writes occur. This starts the first Kafka transaction for exactly-once
512    /// sinks. Subsequent epochs are started automatically after each
513    /// successful checkpoint commit.
514    ///
515    /// # Errors
516    ///
517    /// Returns the first error from any sink that fails to begin the epoch.
518    pub async fn begin_initial_epoch(&self) -> Result<(), DbError> {
519        self.begin_epoch_for_sinks(self.allocator.peek().0).await
520    }
521
522    /// Shared id allocator, cloned by the pipeline callback so barrier
523    /// admission can claim ids without the coordinator mutex.
524    pub(crate) fn epoch_allocator(&self) -> Arc<EpochAllocator> {
525        Arc::clone(&self.allocator)
526    }
527
528    /// Begins an epoch on all exactly-once sinks. If any sink fails,
529    /// rolls back sinks that already started the epoch.
530    async fn begin_epoch_for_sinks(&self, epoch: u64) -> Result<(), DbError> {
531        let mut started: Vec<&RegisteredSink> = Vec::new();
532        for sink in &self.sinks {
533            if sink.exactly_once {
534                match sink.handle.begin_epoch(epoch).await {
535                    Ok(()) => {
536                        started.push(sink);
537                        debug!(sink = %sink.name, epoch, "began epoch");
538                    }
539                    Err(e) => {
540                        // Roll back sinks that already started.
541                        for s in &started {
542                            if let Err(re) = s.handle.rollback_epoch(epoch).await {
543                                error!(sink = %s.name, epoch, error = %re,
544                                    "[LDB-6016] sink rollback failed during begin_epoch recovery");
545                            }
546                        }
547                        return Err(DbError::Checkpoint(format!(
548                            "sink '{}' failed to begin epoch {epoch}: {e}",
549                            sink.name
550                        )));
551                    }
552                }
553            }
554        }
555        Ok(())
556    }
557
558    /// Inject prometheus engine metrics.
559    pub fn set_metrics(&mut self, prom: Arc<crate::engine_metrics::EngineMetrics>) {
560        self.prom = Some(prom);
561    }
562
563    /// Emits checkpoint metrics to prometheus.
564    fn emit_checkpoint_metrics(&self, success: bool, epoch: u64, duration: Duration) {
565        if let Some(ref m) = self.prom {
566            if success {
567                m.checkpoints_completed.inc();
568            } else {
569                m.checkpoints_failed.inc();
570            }
571            #[allow(clippy::cast_possible_wrap)]
572            m.checkpoint_epoch.set(epoch as i64);
573            m.checkpoint_duration.observe(duration.as_secs_f64());
574        }
575    }
576
577    /// Performs a full checkpoint cycle (steps 3-7).
578    ///
579    /// Steps 1-2 (barrier propagation + operator snapshots) are handled
580    /// externally by the DAG executor and passed in via [`CheckpointRequest`].
581    ///
582    /// # Errors
583    ///
584    /// Returns `DbError::Checkpoint` if any phase fails.
585    pub async fn checkpoint(
586        &mut self,
587        request: CheckpointRequest,
588    ) -> Result<CheckpointResult, DbError> {
589        self.checkpoint_inner(request, None, QuorumStage::RunInline)
590            .await
591    }
592
593    /// Pre-commits all exactly-once sinks (phase 1) with a timeout.
594    ///
595    /// A stuck sink will not block checkpointing indefinitely. The timeout
596    /// is configured via [`CheckpointConfig::pre_commit_timeout`].
597    async fn pre_commit_sinks(&self, epoch: u64) -> Result<(), DbError> {
598        let timeout_dur = self.config.pre_commit_timeout;
599        let start = std::time::Instant::now();
600
601        let result =
602            match tokio::time::timeout(timeout_dur, self.pre_commit_sinks_inner(epoch)).await {
603                Ok(result) => result,
604                Err(_elapsed) => Err(DbError::Checkpoint(format!(
605                    "pre-commit timed out after {}s",
606                    timeout_dur.as_secs()
607                ))),
608            };
609
610        // Record pre-commit duration regardless of success/failure.
611        if let Some(ref m) = self.prom {
612            m.sink_precommit_duration
613                .observe(start.elapsed().as_secs_f64());
614        }
615
616        result
617    }
618
619    /// Inner pre-commit loop (no timeout).
620    ///
621    /// Only sinks with `exactly_once = true` participate in two-phase commit.
622    /// At-most-once sinks are skipped — they receive no `pre_commit`/`commit`
623    /// calls and provide no transactional guarantees.
624    ///
625    /// Fires every sink's `pre_commit` concurrently via `try_join_all` —
626    /// matching the rollback path's shape. The serial version was
627    /// `sum(per-sink pre-commit latency)`; with 4 Delta sinks at ~200ms
628    /// S3 write each, that was 800ms serial. Concurrent = 200ms.
629    async fn pre_commit_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
630        let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
631            let handle = sink.handle.clone();
632            let name = sink.name.clone();
633            async move {
634                let result = handle.pre_commit(epoch).await;
635                match result {
636                    Ok(()) => {
637                        debug!(sink = %name, epoch, "sink pre-committed");
638                        Ok(())
639                    }
640                    Err(e) => Err(DbError::Checkpoint(format!(
641                        "sink '{name}' pre-commit failed: {e}"
642                    ))),
643                }
644            }
645        });
646        futures::future::try_join_all(futures).await.map(|_| ())
647    }
648
649    /// Commit each exactly-once sink in its own task, bounded by
650    /// `commit_timeout`. Per-sink isolation avoids a slow sink blanket-
651    /// failing the whole batch; cancellation lives inside the spawned
652    /// task so an outer drop doesn't leave the sink-task with a dropped
653    /// oneshot ack.
654    async fn commit_sinks_tracked(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
655        let timeout_dur = self.config.commit_timeout;
656        let start = std::time::Instant::now();
657
658        let tasks: Vec<_> = self
659            .sinks
660            .iter()
661            .filter(|s| s.exactly_once)
662            .map(|sink| {
663                let handle = sink.handle.clone();
664                let name = sink.name.clone();
665                let task = tokio::spawn(async move {
666                    tokio::time::timeout(timeout_dur, handle.commit_epoch(epoch)).await
667                });
668                (name, task)
669            })
670            .collect();
671
672        let mut statuses = HashMap::new();
673        for (name, task) in tasks {
674            let status = match task.await {
675                Ok(Ok(Ok(()))) => SinkCommitStatus::Committed,
676                Ok(Ok(Err(e))) => {
677                    error!(sink = %name, epoch, error = %e, "sink commit failed");
678                    SinkCommitStatus::Failed(format!("sink '{name}' commit failed: {e}"))
679                }
680                Ok(Err(_)) => {
681                    error!(
682                        sink = %name, epoch,
683                        timeout_secs = timeout_dur.as_secs(),
684                        "[LDB-6012] sink commit timed out",
685                    );
686                    SinkCommitStatus::Failed(format!(
687                        "sink '{name}' commit timed out after {}s",
688                        timeout_dur.as_secs()
689                    ))
690                }
691                Err(join_err) => {
692                    error!(sink = %name, epoch, error = %join_err, "sink commit task panicked");
693                    SinkCommitStatus::Failed(format!("sink '{name}' commit panicked: {join_err}"))
694                }
695            };
696            statuses.insert(name, status);
697        }
698
699        if let Some(ref m) = self.prom {
700            m.sink_commit_duration
701                .observe(start.elapsed().as_secs_f64());
702        }
703        statuses
704    }
705
706    /// Saves a manifest to the checkpoint store.
707    ///
708    /// Uses [`CheckpointStore::save_with_state`] to write optional sidecar
709    /// data **before** the manifest, ensuring atomicity: if the sidecar write
710    /// fails, the manifest is never persisted.
711    ///
712    /// Takes `Arc<CheckpointManifest>` so the caller can retain its own copy
713    /// without a deep clone. Bounded by [`CheckpointConfig::persist_timeout`]
714    /// to prevent a hung filesystem from stalling the runtime indefinitely.
715    async fn save_manifest(
716        &self,
717        manifest: Arc<CheckpointManifest>,
718        state_data: Option<Vec<bytes::Bytes>>,
719    ) -> Result<(), DbError> {
720        let timeout_dur = self.config.persist_timeout;
721        let fut = self.store.save_with_state(&manifest, state_data.as_deref());
722        match tokio::time::timeout(timeout_dur, fut).await {
723            Ok(Ok(())) => Ok(()),
724            Ok(Err(e)) => Err(DbError::from(e)),
725            Err(_elapsed) => Err(DbError::Checkpoint(format!(
726                "[LDB-6011] manifest persist timed out after {}s — \
727                 filesystem may be degraded",
728                timeout_dur.as_secs()
729            ))),
730        }
731    }
732
    /// Writes each owned vnode's `partial.bin` so the leader's
    /// `epoch_complete` gate returns true and sinks can commit.
    ///
    /// The payload is a [`VnodePartial`](crate::vnode_partial::VnodePartial)
    /// carrying the operator-state slices staged via
    /// [`set_pending_vnode_states`](Self::set_pending_vnode_states) for that
    /// vnode. A vnode with no staged state writes an empty `VnodePartial`,
    /// which still seals the durability gate (presence is all the gate checks).
    ///
    /// A vnode whose slices are byte-identical to its last full upload
    /// writes a tiny *reference* partial instead — upload cost scales
    /// with changed vnodes. References are forced back to full before
    /// their base leaves the `max_retained` prune window; bases record
    /// only after every write lands, so a partially-failed epoch
    /// re-uploads full.
    ///
    /// Fires every vnode write concurrently via `try_join_all`: the serial
    /// version was O(`vnode_count` × per-write latency) — trivial CPU but each
    /// a scheduling hop (and tens-to-hundreds of ms on a remote object store).
    ///
    /// Returns `Ok(())` without writing anything when there is no state
    /// backend or this node owns no vnodes.
    ///
    /// # Errors
    ///
    /// Propagates a partial-encoding error, or the first failed write as
    /// `[LDB-6024]`. On error, `last_vnode_uploads` is left untouched.
    async fn write_vnode_partials(
        &mut self,
        epoch: u64,
        checkpoint_id: u64,
    ) -> Result<(), DbError> {
        let Some(ref backend) = self.state_backend else {
            return Ok(());
        };
        if self.vnode_set.is_empty() {
            return Ok(());
        }
        // Stamp every write with the current assignment generation. Zero
        // means the host hasn't wired a version (single-instance path) and
        // the fence is a no-op.
        let caller_version = self.assignment_version;
        // A reference is only emitted while `epoch - base < max_ref_age`;
        // `.max(1)` guards against a zero `max_retained`.
        let max_ref_age = (self.config.max_retained as u64).max(1);

        // Step 1 (sync): classify each vnode as reference or full and
        // encode its payload.
        let mut full_uploads: Vec<(u32, std::collections::HashMap<String, bytes::Bytes>)> =
            Vec::new();
        let mut emptied: Vec<u32> = Vec::new();
        let mut reference_count: u64 = 0;
        let mut encoded: Vec<(u32, bytes::Bytes)> = Vec::with_capacity(self.vnode_set.len());
        for &v in &self.vnode_set {
            let ops = self.pending_vnode_states.get(&v);
            // A reference is possible only when the vnode has non-empty
            // staged state, a recorded base that is still inside the age
            // window, and staged slices equal to that base's slices.
            let base = ops.filter(|ops| !ops.is_empty()).and_then(|ops| {
                self.last_vnode_uploads
                    .get(&v)
                    .filter(|(base, last)| epoch.saturating_sub(*base) < max_ref_age && last == ops)
                    .map(|(base, _)| *base)
            });
            let partial = if let Some(base_epoch) = base {
                // Reference partial: no operator payload, only a pointer to
                // `base_epoch`. The recorded base is deliberately not updated.
                reference_count += 1;
                crate::vnode_partial::VnodePartial {
                    checkpoint_id,
                    operators: Vec::new(),
                    base_epoch: Some(base_epoch),
                }
            } else {
                // Full partial. Non-empty state becomes this vnode's new base
                // (recorded in step 3); no staged state drops its base.
                match ops {
                    Some(ops) if !ops.is_empty() => full_uploads.push((v, ops.clone())),
                    _ => emptied.push(v),
                }
                crate::vnode_partial::VnodePartial {
                    checkpoint_id,
                    operators: ops
                        .map(|ops| ops.iter().map(|(n, b)| (n.clone(), b.to_vec())).collect())
                        .unwrap_or_default(),
                    base_epoch: None,
                }
            };
            encoded.push((v, bytes::Bytes::from(partial.encode()?)));
        }

        // Step 2 (async): upload concurrently. `try_join_all` resolves with
        // the first error; the remaining in-flight writes are dropped then.
        let writes = encoded.into_iter().map(|(v, payload)| {
            let backend = Arc::clone(backend);
            async move {
                backend
                    .write_partial(v, epoch, caller_version, payload)
                    .await
                    .map_err(|e| {
                        DbError::Checkpoint(format!(
                            "[LDB-6024] vnode partial write failed (vnode={v}, epoch={epoch}): {e}"
                        ))
                    })
            }
        });
        futures::future::try_join_all(writes).await?;

        // Step 3 (sync): record the new bases / clear emptied vnodes.
        // Only reached once every write above has succeeded.
        for (v, ops) in full_uploads {
            self.last_vnode_uploads.insert(v, (epoch, ops));
        }
        for v in emptied {
            self.last_vnode_uploads.remove(&v);
        }
        if reference_count > 0 {
            if let Some(ref m) = self.prom {
                m.checkpoint_unchanged_vnodes.inc_by(reference_count);
            }
        }
        Ok(())
    }
837
838    /// Poll the state backend's durability gate until every vnode in
839    /// `gate_vnode_set` has its partial for `epoch` persisted (sealing
840    /// the epoch's `_COMMIT` marker), or `restorable_gate_timeout`
841    /// expires. No-op without a backend or with an empty gate set.
842    ///
843    /// Each node writes its partials *after* sink pre-commit + manifest
844    /// save, so full presence proves every node finished its durable
845    /// prepare. Transient backend errors retry until the deadline; a
846    /// split-brain commit marker aborts immediately.
847    async fn await_restorable_gate(
848        &self,
849        epoch: u64,
850        participants: &[QuorumPeer],
851    ) -> Result<(), String> {
852        use laminar_core::state::StateBackendError;
853
854        // Each poll LISTs the epoch prefix on the object store, so back
855        // off exponentially: fast for the common local/in-process case,
856        // ~1 LIST/s steady-state per slow epoch on S3 (gates also
857        // serialize on the coordinator mutex, so at most one loop polls
858        // at a time regardless of pipelining depth). Push-driven
859        // follower upload-completion acks are the protocol-level
860        // replacement for this poll (tracked in the plan).
861        const INITIAL_POLL: Duration = Duration::from_millis(100);
862        const MAX_POLL: Duration = Duration::from_secs(1);
863
864        let Some(ref backend) = self.state_backend else {
865            return Ok(());
866        };
867        if self.gate_vnode_set.is_empty() {
868            return Ok(());
869        }
870
871        let deadline = Instant::now() + self.config.restorable_gate_timeout;
872        let mut interval = INITIAL_POLL;
873        let mut last_state = String::from("not all vnodes persisted");
874        loop {
875            if epoch < self.rotation_epoch_floor {
876                return Err(format!(
877                    "vnode assignment rotated after epoch {epoch} captured \
878                     (rotation floor {}); epoch cannot seal",
879                    self.rotation_epoch_floor
880                ));
881            }
882            match backend.epoch_complete(epoch, &self.gate_vnode_set).await {
883                Ok(true) => return Ok(()),
884                // Keep the default/previous message; a lingering
885                // transient-error string is more informative than
886                // resetting it.
887                Ok(false) => {}
888                Err(e @ StateBackendError::SplitBrainCommit { .. }) => {
889                    return Err(format!("state durability gate: {e}"));
890                }
891                Err(e) => {
892                    // Transient I/O — keep polling until the deadline.
893                    debug!(epoch, error = %e, "durability gate poll error; retrying");
894                    last_state = e.to_string();
895                }
896            }
897            // Fail fast when a capture participant dies: its uploads
898            // will never arrive, so waiting out the timeout only
899            // stalls the pipeline — and with pipelining, queued doomed
900            // epochs would each burn the full timeout serially.
901            #[cfg(feature = "cluster")]
902            if let Some(cc) = self.cluster_controller.as_ref() {
903                if let Some(reason) =
904                    Self::unhealthy_participant(&cc.members_watch().borrow(), participants)
905                {
906                    return Err(format!("durability gate fail-fast: {reason}"));
907                }
908                if let Some(p) = participants
909                    .iter()
910                    .find(|p| cc.is_recently_unresponsive(**p))
911                {
912                    return Err(format!(
913                        "durability gate fail-fast: follower {} missed a capture quorum",
914                        p.0
915                    ));
916                }
917            }
918            #[cfg(not(feature = "cluster"))]
919            let _ = participants;
920            if Instant::now() >= deadline {
921                return Err(format!(
922                    "state durability gate timed out after {:?}: {last_state}",
923                    self.config.restorable_gate_timeout
924                ));
925            }
926            tokio::time::sleep(interval).await;
927            interval = (interval * 2).min(MAX_POLL);
928        }
929    }
930
    /// Common abandon path for a failed checkpoint attempt: announce
    /// `Abort` so prepared followers release immediately, roll the
    /// epoch's sink transactions back, and begin the next epoch's
    /// transactions so writes arriving after the failure are not
    /// orphaned in a rolled-back transaction. Ids were allocated at the
    /// start of [`checkpoint_inner`](Self::checkpoint_inner), so the
    /// failed epoch is abandoned, never retried.
    ///
    /// Bookkeeping (failure counter, phase reset to `Idle`, metrics) runs
    /// before the sink rollback. The reported `duration` therefore excludes
    /// rollback and next-epoch begin time. Rollback errors are logged as
    /// `[LDB-6004]` and not propagated. The returned [`CheckpointResult`]
    /// always has `success: false` and carries the caller's `error`.
    async fn fail_epoch(
        &mut self,
        checkpoint_id: u64,
        epoch: u64,
        started: Instant,
        error: String,
    ) -> CheckpointResult {
        // Cluster builds only; `announce_if_leader` is a no-op off-leader.
        #[cfg(feature = "cluster")]
        self.announce_if_leader(
            epoch,
            checkpoint_id,
            laminar_core::cluster::control::Phase::Abort,
            None,
        )
        .await;
        self.checkpoints_failed += 1;
        self.phase = CheckpointPhase::Idle;
        // Measured before rollback, so the metric covers the attempt itself.
        let duration = started.elapsed();
        self.emit_checkpoint_metrics(false, epoch, duration);
        if let Err(e) = self.rollback_sinks(epoch).await {
            error!(
                checkpoint_id, epoch, error = %e,
                "[LDB-6004] sink rollback failed after checkpoint failure",
            );
        }
        self.begin_next_epoch_bounded().await;
        // Discard any per-vnode state staged via `set_pending_vnode_states`.
        self.pending_vnode_states.clear();
        CheckpointResult {
            success: false,
            checkpoint_id,
            epoch,
            duration,
            error: Some(error),
        }
    }
973
974    /// Begin the (already-allocated) next epoch's sink transactions
975    /// after a failed or partially-committed checkpoint, bounded by
976    /// `rollback_timeout` — the sink that just failed may be wedged,
977    /// and an unbounded `begin_epoch` await would hang the coordinator
978    /// on it.
979    async fn begin_next_epoch_bounded(&self) {
980        let next_epoch = self.allocator.peek().0;
981        match tokio::time::timeout(
982            self.config.rollback_timeout,
983            self.begin_epoch_for_sinks(next_epoch),
984        )
985        .await
986        {
987            Ok(Ok(())) => {}
988            Ok(Err(e)) => error!(
989                next_epoch, error = %e,
990                "[LDB-6015] failed to begin next epoch after abandoning a \
991                 failed one — writes will be non-transactional",
992            ),
993            Err(_) => error!(
994                next_epoch,
995                timeout_secs = self.config.rollback_timeout.as_secs(),
996                "[LDB-6015] begin next epoch timed out after a failed \
997                 checkpoint — writes will be non-transactional",
998            ),
999        }
1000    }
1001
    /// On startup, reconcile any Pending sinks in the last manifest
    /// against the durable commit marker. Marker present → drive
    /// local commit (idempotent); marker absent → rollback. Runs on
    /// every node; in cluster mode the leader additionally
    /// re-announces the decision.
    ///
    /// Only the highest stored checkpoint (`load_highest`) is inspected. The
    /// method does nothing unless at least one of its sink statuses is
    /// `Pending`. A missing decision store, or a failed read from it, is
    /// treated as "no marker" and therefore rolls back. All errors are
    /// logged; none are returned.
    pub async fn reconcile_prepared_on_init(&self) {
        let last = match load_highest(self.store.as_ref()).await {
            Ok(Some(m)) => m,
            Ok(None) => return,
            Err(e) => {
                error!(error = %e, "[LDB-6041] reconcile skipped: could not load checkpoints");
                return;
            }
        };
        let has_pending = last
            .sink_commit_statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Pending));
        if !has_pending {
            return;
        }

        let epoch = last.epoch;
        let checkpoint_id = last.checkpoint_id;

        // Durable commit decision for this epoch; every failure mode maps to `false`.
        let committed = match self.decision_store.as_ref() {
            Some(ds) => ds.is_committed(epoch).await.unwrap_or_else(|e| {
                warn!(
                    epoch, checkpoint_id, error = %e,
                    "[LDB-6040] decision store read failed — defaulting to Abort",
                );
                false
            }),
            None => false,
        };

        #[cfg(feature = "cluster")]
        let is_leader = self
            .cluster_controller
            .as_ref()
            .is_some_and(|cc| cc.is_leader());

        if committed {
            info!(
                epoch,
                checkpoint_id, "recovering Pending epoch as Committed"
            );
            // Re-drive sink commits, then write the per-sink outcomes back
            // onto the manifest. A failed write-back is only logged.
            let statuses = self.commit_sinks_tracked(epoch).await;
            if let Err(e) = self
                .persist_recovered_statuses(checkpoint_id, statuses)
                .await
            {
                warn!(epoch, checkpoint_id, error = %e, "post-recovery manifest update failed");
            }
            #[cfg(feature = "cluster")]
            if is_leader {
                self.announce_if_leader(
                    epoch,
                    checkpoint_id,
                    laminar_core::cluster::control::Phase::Commit,
                    None,
                )
                .await;
            }
        } else {
            warn!(
                epoch,
                checkpoint_id, "[LDB-6035] Pending epoch with no commit marker — rolling back",
            );
            // Note: the manifest's Pending statuses are not rewritten on this path.
            if let Err(e) = self.rollback_sinks(epoch).await {
                error!(epoch, checkpoint_id, error = %e, "sink rollback failed during recovery");
            }
            #[cfg(feature = "cluster")]
            if is_leader {
                self.announce_if_leader(
                    epoch,
                    checkpoint_id,
                    laminar_core::cluster::control::Phase::Abort,
                    None,
                )
                .await;
            }
        }

        // Brief pause so the announcement gossips before the next checkpoint tick.
        #[cfg(feature = "cluster")]
        if is_leader {
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    }
1092
1093    /// Overwrite the Pending sink statuses on the last manifest with
1094    /// the outcomes produced during recovery.
1095    async fn persist_recovered_statuses(
1096        &self,
1097        checkpoint_id: u64,
1098        statuses: HashMap<String, SinkCommitStatus>,
1099    ) -> Result<(), DbError> {
1100        if statuses.is_empty() {
1101            return Ok(());
1102        }
1103        match self.store.load_by_id(checkpoint_id).await {
1104            Ok(Some(mut m)) => {
1105                m.sink_commit_statuses = statuses;
1106                self.update_manifest_only(Arc::new(m)).await
1107            }
1108            Ok(None) => Ok(()),
1109            Err(e) => Err(DbError::from(e)),
1110        }
1111    }
1112
1113    /// No-op when not the leader. Errors are logged — worst case is a
1114    /// longer follower timeout, not a correctness issue.
1115    ///
1116    /// `min_watermark_ms` is typically `None` on `Prepare`/`Abort` and
1117    /// `Some(cluster_min)` on `Commit` (computed from follower acks +
1118    /// local watermark). Downstream operators read the published
1119    /// value from [`ClusterController`] so event-time decisions stay
1120    /// consistent across the cluster.
1121    #[cfg(feature = "cluster")]
1122    async fn announce_if_leader(
1123        &self,
1124        epoch: u64,
1125        checkpoint_id: u64,
1126        phase: laminar_core::cluster::control::Phase,
1127        min_watermark_ms: Option<i64>,
1128    ) {
1129        let Some(cc) = self.cluster_controller.as_ref() else {
1130            return;
1131        };
1132        if !cc.is_leader() {
1133            return;
1134        }
1135        let ann = laminar_core::cluster::control::BarrierAnnouncement {
1136            epoch,
1137            checkpoint_id,
1138            phase,
1139            flags: 0,
1140            min_watermark_ms,
1141        };
1142        if let Err(e) = cc.announce_barrier(&ann).await {
1143            warn!(
1144                epoch,
1145                checkpoint_id,
1146                ?phase,
1147                error = %e,
1148                "[LDB-6031] barrier announcement failed",
1149            );
1150        }
1151    }
1152
1153    /// Announce PREPARE and block for follower acks. On quorum (or
1154    /// no-op — not leader / no controller) returns the capture-time
1155    /// follower set for the durability gate fail-fast and writes the
1156    /// cluster-wide minimum watermark into `self.cluster_min_watermark`
1157    /// so the subsequent `Commit` announcement can fan it out. On
1158    /// failure, announces `Abort` and returns the failure message.
1159    #[cfg(feature = "cluster")]
1160    async fn await_prepare_quorum(
1161        &mut self,
1162        epoch: u64,
1163        checkpoint_id: u64,
1164    ) -> Result<Vec<laminar_core::cluster::discovery::NodeId>, String> {
1165        use laminar_core::cluster::control::Phase;
1166        let Some(cc) = self.cluster_controller.clone() else {
1167            return Ok(Vec::new());
1168        };
1169        if !cc.is_leader() {
1170            return Ok(Vec::new());
1171        }
1172        match Self::run_prepare_quorum(
1173            &cc,
1174            self.config.quorum_timeout,
1175            epoch,
1176            checkpoint_id,
1177            self.local_watermark_ms,
1178        )
1179        .await
1180        {
1181            Ok((merged, participants)) => {
1182                self.cluster_min_watermark = merged;
1183                Ok(participants)
1184            }
1185            Err(msg) => {
1186                self.announce_if_leader(epoch, checkpoint_id, Phase::Abort, None)
1187                    .await;
1188                Err(msg)
1189            }
1190        }
1191    }
1192
1193    /// Shared health predicate for the capture quorum and the
1194    /// durability gate: a participant that is suspected, draining,
1195    /// left, or vanished can no longer contribute to this epoch, so
1196    /// waiting out the full timeout only stalls the pipeline.
1197    #[cfg(feature = "cluster")]
1198    fn unhealthy_participant(
1199        members: &[laminar_core::cluster::discovery::NodeInfo],
1200        participants: &[QuorumPeer],
1201    ) -> Option<String> {
1202        use laminar_core::cluster::discovery::NodeState;
1203        for &id in participants {
1204            match members.iter().find(|m| m.id.0 == id.0) {
1205                Some(node)
1206                    if matches!(
1207                        node.state,
1208                        NodeState::Suspected | NodeState::Left | NodeState::Draining
1209                    ) =>
1210                {
1211                    return Some(format!(
1212                        "Follower {} transitioned to unhealthy state {:?}",
1213                        id.0, node.state
1214                    ));
1215                }
1216                Some(_) => {}
1217                None => {
1218                    return Some(format!("Follower {} missing from cluster membership", id.0));
1219                }
1220            }
1221        }
1222        None
1223    }
1224
    /// The capture-quorum stage, callable without the coordinator mutex
    /// so pipelined barrier tails can reach `Aligned` while an earlier
    /// epoch's durable tail still holds it. Announces `Prepare`, waits
    /// for every live follower's capture ack (failing fast on unhealthy
    /// membership), and returns the merged cluster-min watermark. The
    /// caller announces `Aligned` on success / `Abort` on failure and
    /// publishes the watermark via the announcement.
    ///
    /// On success the return value also carries the follower set that was
    /// asked to ack; it is empty for a leader-only cluster. When a merged
    /// watermark exists, it is additionally published locally via
    /// `publish_cluster_min_watermark`.
    ///
    /// # Errors
    ///
    /// Returns a message on quorum timeout (the missing followers are also
    /// marked unresponsive), on follower snapshot failure, or on membership
    /// fail-fast.
    #[cfg(feature = "cluster")]
    pub(crate) async fn run_prepare_quorum(
        cc: &laminar_core::cluster::control::ClusterController,
        quorum_timeout: Duration,
        epoch: u64,
        checkpoint_id: u64,
        local_watermark_ms: Option<i64>,
    ) -> Result<(Option<i64>, Vec<laminar_core::cluster::discovery::NodeId>), String> {
        use laminar_core::cluster::control::{BarrierAnnouncement, Phase, QuorumOutcome};

        // A failed announcement is logged, not fatal: the bounded quorum
        // wait below still decides the outcome.
        if let Err(e) = cc
            .announce_barrier(&BarrierAnnouncement {
                epoch,
                checkpoint_id,
                phase: Phase::Prepare,
                flags: 0,
                min_watermark_ms: None,
            })
            .await
        {
            warn!(epoch, checkpoint_id, error = %e, "[LDB-6031] prepare announcement failed");
        }

        // Ack set: every live instance except this (leader) node.
        let mut followers = cc.live_instances();
        followers.retain(|id| *id != cc.instance_id());
        if followers.is_empty() {
            // Leader-only cluster — cluster-wide min is just the
            // leader's local watermark (if any).
            if let Some(wm) = local_watermark_ms {
                cc.publish_cluster_min_watermark(wm);
            }
            return Ok((local_watermark_ms, Vec::new()));
        }

        let mut members_rx = cc.members_watch();

        let quorum_fut = cc.wait_for_quorum(epoch, &followers, quorum_timeout);
        // Resolves only when a follower turns unhealthy; re-checks on
        // every membership change.
        let membership_fut = async {
            loop {
                if let Some(reason) = Self::unhealthy_participant(&members_rx.borrow(), &followers)
                {
                    return reason;
                }
                if members_rx.changed().await.is_err() {
                    // Membership watch closed (host shutting down): this
                    // arm can no longer fail fast, so park it and let the
                    // sibling `select!` arm — the deadline-bounded quorum
                    // wait — decide the outcome. Not a leak: the select
                    // always completes via that arm.
                    futures::future::pending::<()>().await;
                }
            }
        };

        // Whichever arm finishes first decides; the other is dropped.
        let outcome = tokio::select! {
            o = quorum_fut => Ok(o),
            e = membership_fut => Err(e),
        };

        match outcome {
            Ok(QuorumOutcome::Reached {
                min_follower_watermark_ms,
                ref acks,
            }) => {
                // Demonstrably alive — clear any quorum-miss suspicion.
                cc.note_responsive(acks);
                // Fold follower min with the leader's own watermark.
                let merged = match (local_watermark_ms, min_follower_watermark_ms) {
                    (Some(a), Some(b)) => Some(a.min(b)),
                    (Some(a), None) => Some(a),
                    (None, Some(b)) => Some(b),
                    (None, None) => None,
                };
                if let Some(wm) = merged {
                    cc.publish_cluster_min_watermark(wm);
                }
                Ok((merged, followers))
            }
            Ok(QuorumOutcome::TimedOut { missing, .. }) => {
                // Gossip failure detection can lag a hard kill by tens
                // of seconds; record the leader's own faster signal so
                // the durability gates of already-captured epochs fail
                // fast instead of each burning their full timeout.
                cc.note_unresponsive(&missing);
                Err(format!(
                    "quorum timeout: {} follower(s) did not ack",
                    missing.len()
                ))
            }
            Ok(QuorumOutcome::Failed { failures }) => {
                // Only the first failure message is surfaced, with a count.
                let first = failures.first().map_or("unknown", |(_, msg)| msg.as_str());
                Err(format!(
                    "follower snapshot failed on {} peer(s): {first}",
                    failures.len()
                ))
            }
            Err(err_msg) => Err(format!("fail-fast: {err_msg}")),
        }
    }
1331
1332    /// Overwrites an existing manifest with updated fields (e.g., sink commit
1333    /// statuses after Step 6). Uses [`CheckpointStore::update_manifest`] which
1334    /// does NOT use conditional PUT, so the overwrite always succeeds.
1335    async fn update_manifest_only(&self, manifest: Arc<CheckpointManifest>) -> Result<(), DbError> {
1336        let timeout_dur = self.config.persist_timeout;
1337        let fut = self.store.update_manifest(&manifest);
1338        match tokio::time::timeout(timeout_dur, fut).await {
1339            Ok(Ok(())) => Ok(()),
1340            Ok(Err(e)) => Err(DbError::from(e)),
1341            Err(_elapsed) => Err(DbError::Checkpoint(format!(
1342                "manifest update timed out after {}s",
1343                timeout_dur.as_secs()
1344            ))),
1345        }
1346    }
1347
1348    /// Returns initial sink commit statuses (all `Pending`) for the manifest.
1349    fn initial_sink_commit_statuses(&self) -> HashMap<String, SinkCommitStatus> {
1350        self.sinks
1351            .iter()
1352            .filter(|s| s.exactly_once)
1353            .map(|s| (s.name.clone(), SinkCommitStatus::Pending))
1354            .collect()
1355    }
1356
1357    /// Packs operator states into a manifest with optional sidecar chunks.
1358    ///
1359    /// States larger than `threshold` are stored in a sidecar blob rather
1360    /// than base64-inlined in the JSON manifest. The returned `Vec<Bytes>`
1361    /// is handed to `save_with_state` as a chain — the object-store path
1362    /// builds a multi-chunk `PutPayload` (no contiguous buffer), the FS
1363    /// path writes chunks sequentially. Either way no full-state copy
1364    /// happens here.
1365    fn pack_operator_states(
1366        manifest: &mut CheckpointManifest,
1367        operator_states: &HashMap<String, bytes::Bytes>,
1368        threshold: usize,
1369    ) -> Option<Vec<bytes::Bytes>> {
1370        let mut sidecar_chunks: Vec<bytes::Bytes> = Vec::new();
1371        let mut offset: u64 = 0;
1372        for (name, data) in operator_states {
1373            let (op_ckpt, maybe_blob) =
1374                laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::from_bytes_shared(
1375                    data.clone(),
1376                    threshold,
1377                    offset,
1378                );
1379            if let Some(blob) = maybe_blob {
1380                offset += blob.len() as u64;
1381                sidecar_chunks.push(blob);
1382            }
1383            manifest.operator_states.insert(name.clone(), op_ckpt);
1384        }
1385
1386        if sidecar_chunks.is_empty() {
1387            None
1388        } else {
1389            Some(sidecar_chunks)
1390        }
1391    }
1392
1393    /// Rolls back all exactly-once sinks in parallel, bounded by
1394    /// [`CheckpointConfig::rollback_timeout`].
1395    async fn rollback_sinks(&self, epoch: u64) -> Result<(), DbError> {
1396        let timeout_dur = self.config.rollback_timeout;
1397        match tokio::time::timeout(timeout_dur, self.rollback_sinks_inner(epoch)).await {
1398            Ok(result) => result,
1399            Err(_elapsed) => {
1400                error!(
1401                    epoch,
1402                    timeout_secs = timeout_dur.as_secs(),
1403                    "[LDB-6016] sink rollback timed out"
1404                );
1405                Err(DbError::Checkpoint(format!(
1406                    "rollback timed out after {}s",
1407                    timeout_dur.as_secs()
1408                )))
1409            }
1410        }
1411    }
1412
1413    async fn rollback_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
1414        let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
1415            let handle = sink.handle.clone();
1416            let name = sink.name.clone();
1417            async move {
1418                // Live abandon, not a hard rollback — a healthy sink
1419                // keeps its pending output (see `SinkCommand::RollbackEpoch`).
1420                // Restart-time rollback (recovery manager) stays hard.
1421                let result = handle.abandon_epoch(epoch).await;
1422                (name, result)
1423            }
1424        });
1425        let results = futures::future::join_all(futures).await;
1426
1427        let mut errors = Vec::new();
1428        for (name, result) in results {
1429            if let Err(e) = result {
1430                error!(sink = %name, epoch, error = %e, "[LDB-6016] sink rollback failed");
1431                errors.push(format!("sink '{name}': {e}"));
1432            }
1433        }
1434        if errors.is_empty() {
1435            Ok(())
1436        } else {
1437            Err(DbError::Checkpoint(format!(
1438                "rollback failed: {}",
1439                errors.join("; ")
1440            )))
1441        }
1442    }
1443
1444    /// Per-sink epoch map for the manifest: every exactly-once sink is
1445    /// committing `epoch` (passed in — `self.epoch` already points at
1446    /// the next epoch once a checkpoint is underway).
1447    fn collect_sink_epochs(&self, epoch: u64) -> HashMap<String, u64> {
1448        let mut epochs = HashMap::with_capacity(self.sinks.len());
1449        for sink in &self.sinks {
1450            if sink.exactly_once {
1451                epochs.insert(sink.name.clone(), epoch);
1452            }
1453        }
1454        epochs
1455    }
1456
1457    /// Returns sorted sink names for topology tracking in the manifest.
1458    ///
1459    /// Computed once per topology change (via `register_sink`) and
1460    /// cached; subsequent checkpoints clone the cached Vec rather than
1461    /// re-sorting the sinks list.
1462    fn sorted_sink_names(&mut self) -> Vec<String> {
1463        if self.cached_sorted_sink_names.is_none() {
1464            let mut names: Vec<String> = self.sinks.iter().map(|s| s.name.clone()).collect();
1465            names.sort();
1466            self.cached_sorted_sink_names = Some(names);
1467        }
1468        // Invariant: set above.
1469        self.cached_sorted_sink_names.as_ref().unwrap().clone()
1470    }
1471
1472    /// Returns the current phase.
1473    #[must_use]
1474    pub fn phase(&self) -> CheckpointPhase {
1475        self.phase
1476    }
1477
1478    /// Returns the next epoch to be allocated.
1479    #[must_use]
1480    pub fn epoch(&self) -> u64 {
1481        self.allocator.peek().0
1482    }
1483
1484    /// Returns the next checkpoint ID to be allocated.
1485    #[must_use]
1486    pub fn next_checkpoint_id(&self) -> u64 {
1487        self.allocator.peek().1
1488    }
1489
1490    /// Returns the checkpoint config.
1491    #[must_use]
1492    pub fn config(&self) -> &CheckpointConfig {
1493        &self.config
1494    }
1495
1496    /// Returns checkpoint statistics.
1497    #[must_use]
1498    pub fn stats(&self) -> CheckpointStats {
1499        let (p50, p95, p99) = self.duration_histogram.percentiles();
1500        // Histogram stores microseconds; stats fields are milliseconds.
1501        CheckpointStats {
1502            completed: self.checkpoints_completed,
1503            failed: self.checkpoints_failed,
1504            last_duration: self.last_checkpoint_duration,
1505            duration_p50_ms: p50 / 1_000,
1506            duration_p95_ms: p95 / 1_000,
1507            duration_p99_ms: p99 / 1_000,
1508            total_bytes_written: self.total_bytes_written,
1509            current_phase: self.phase,
1510            current_epoch: self.allocator.peek().0,
1511        }
1512    }
1513
1514    /// Returns a reference to the underlying store.
1515    #[must_use]
1516    pub fn store(&self) -> &dyn CheckpointStore {
1517        &*self.store
1518    }
1519
1520    /// Performs a full checkpoint with pre-captured source offsets.
1521    ///
1522    /// When [`CheckpointRequest::source_offset_overrides`] is non-empty,
1523    /// those sources skip the live `snapshot_sources()` call and use the
1524    /// provided offsets instead. This is essential for barrier-aligned
1525    /// checkpoints where source positions must match the operator state
1526    /// at the barrier point.
1527    ///
1528    /// # Errors
1529    ///
1530    /// Returns `DbError::Checkpoint` if any phase fails.
1531    pub async fn checkpoint_with_offsets(
1532        &mut self,
1533        request: CheckpointRequest,
1534    ) -> Result<CheckpointResult, DbError> {
1535        self.checkpoint_inner(request, None, QuorumStage::RunInline)
1536            .await
1537    }
1538
1539    /// Pipelined-barrier entry point: ids were allocated at admission
1540    /// and the capture quorum already ran before the coordinator mutex
1541    /// was taken (see `QuorumStage::Done`).
1542    ///
1543    /// # Errors
1544    /// Returns `DbError::Checkpoint` if any phase fails.
1545    pub(crate) async fn checkpoint_preallocated(
1546        &mut self,
1547        request: CheckpointRequest,
1548        epoch: u64,
1549        checkpoint_id: u64,
1550        quorum: QuorumStage,
1551    ) -> Result<CheckpointResult, DbError> {
1552        self.checkpoint_inner(request, Some((epoch, checkpoint_id)), quorum)
1553            .await
1554    }
1555
1556    /// Abandon a pre-allocated epoch whose pre-mutex stage (alignment
1557    /// or capture quorum) failed: announce `Abort`, roll back, and
1558    /// begin the next epoch's sink transactions.
1559    #[cfg(feature = "cluster")]
1560    pub(crate) async fn abandon_epoch(
1561        &mut self,
1562        checkpoint_id: u64,
1563        epoch: u64,
1564        error: String,
1565    ) -> CheckpointResult {
1566        self.fail_epoch(checkpoint_id, epoch, Instant::now(), error)
1567            .await
1568    }
1569
1570    /// Follower half of a cluster checkpoint: ack the capture, run the
1571    /// durable prepare (pre-commit + manifest + partial uploads), then
1572    /// wait for the leader's commit/abort.
1573    /// `Ok(true)` = committed, `Ok(false)` = aborted/timed out.
1574    ///
1575    /// The ack precedes the durable prepare — it means "aligned +
1576    /// captured". The leader verifies prepare completion through the
1577    /// restorable gate instead (partials are written last, so their
1578    /// presence implies the whole prepare finished).
1579    ///
1580    /// # Errors
1581    /// Propagates sink pre-commit, manifest save, or marker-write failures.
1582    #[cfg(feature = "cluster")]
1583    pub async fn follower_checkpoint(
1584        &mut self,
1585        request: CheckpointRequest,
1586        ann: laminar_core::cluster::control::BarrierAnnouncement,
1587        decision_timeout: Duration,
1588    ) -> Result<bool, DbError> {
1589        use laminar_core::cluster::control::BarrierAck;
1590
1591        let Some(cc) = self.cluster_controller.clone() else {
1592            return Err(DbError::Checkpoint(
1593                "[LDB-6033] follower_checkpoint called without cluster controller".into(),
1594            ));
1595        };
1596
1597        // Capture ack — state is already consistently captured by the
1598        // caller; the leader needs nothing more to release pipelines.
1599        cc.ack_barrier(&BarrierAck {
1600            epoch: ann.epoch,
1601            ok: true,
1602            error: None,
1603            // Leader folds this into the cluster-wide min announced on
1604            // Aligned/Commit. `None` is non-blocking.
1605            local_watermark_ms: self.local_watermark_ms,
1606        })
1607        .await
1608        .ok(); // best effort; leader's quorum wait tolerates missed acks
1609
1610        self.follower_checkpoint_acked(request, ann, decision_timeout)
1611            .await
1612    }
1613
1614    /// [`follower_checkpoint`](Self::follower_checkpoint) minus the
1615    /// capture ack: prepare, await the decision, then commit/rollback.
1616    /// Pipelined tails call the three stages separately so the
1617    /// decision wait does not hold the coordinator mutex (the next
1618    /// epoch's uploads would queue behind it for up to
1619    /// `decision_timeout`).
1620    ///
1621    /// # Errors
1622    /// Propagates sink pre-commit, manifest save, or marker-write failures.
1623    #[cfg(feature = "cluster")]
1624    pub(crate) async fn follower_checkpoint_acked(
1625        &mut self,
1626        request: CheckpointRequest,
1627        ann: laminar_core::cluster::control::BarrierAnnouncement,
1628        decision_timeout: Duration,
1629    ) -> Result<bool, DbError> {
1630        let Some(cc) = self.cluster_controller.clone() else {
1631            return Err(DbError::Checkpoint(
1632                "[LDB-6033] follower_checkpoint called without cluster controller".into(),
1633            ));
1634        };
1635        let (epoch, checkpoint_id) = (ann.epoch, ann.checkpoint_id);
1636        self.follower_prepare_acked(request, epoch, checkpoint_id)
1637            .await?;
1638        let committed = Self::await_follower_decision(
1639            &cc,
1640            self.decision_store.as_deref(),
1641            epoch,
1642            checkpoint_id,
1643            decision_timeout,
1644        )
1645        .await;
1646        Ok(self.follower_finish(epoch, checkpoint_id, committed).await)
1647    }
1648
1649    /// Stage 1 of the follower tail: durable prepare (sink pre-commit +
1650    /// manifest + partial uploads), after the capture ack. On failure,
1651    /// a best-effort `ok = false` ack overwrites the capture ack and
1652    /// the epoch rolls back.
1653    ///
1654    /// # Errors
1655    /// Propagates sink pre-commit, manifest save, or marker-write failures.
1656    #[cfg(feature = "cluster")]
1657    pub(crate) async fn follower_prepare_acked(
1658        &mut self,
1659        request: CheckpointRequest,
1660        epoch: u64,
1661        checkpoint_id: u64,
1662    ) -> Result<(), DbError> {
1663        use laminar_core::cluster::control::BarrierAck;
1664
1665        // Track the leader's ids so a later leader-mode call resumes
1666        // correctly. Monotonic: a depth>1 tail finishing late must not
1667        // walk the allocator backwards past a successor's ids.
1668        self.allocator
1669            .advance_to(epoch, checkpoint_id.saturating_add(1));
1670
1671        if let Err(e) = self.follower_prepare(request, epoch, checkpoint_id).await {
1672            if let Some(cc) = self.cluster_controller.clone() {
1673                cc.ack_barrier(&BarrierAck {
1674                    epoch,
1675                    ok: false,
1676                    error: Some(e.to_string()),
1677                    local_watermark_ms: self.local_watermark_ms,
1678                })
1679                .await
1680                .ok();
1681            }
1682            self.rollback_sinks(epoch).await.ok();
1683            self.phase = CheckpointPhase::Idle;
1684            // The rollback aborted the sinks' open transaction; open
1685            // the next epoch's so post-failure writes stay
1686            // transactional (mirrors the leader's `fail_epoch`).
1687            self.begin_next_epoch_bounded().await;
1688            return Err(e);
1689        }
1690        Ok(())
1691    }
1692
1693    /// Stage 2 of the follower tail: wait for the leader's decision —
1694    /// `&self`-free so the wait never holds the coordinator mutex.
1695    /// Only Commit/Abort (or epoch advancement) end the wait; a newer
1696    /// epoch's announcement can supersede this epoch's Commit under
1697    /// latest-wins observation, so the durable marker is the
1698    /// tie-breaker. Returns the verdict.
1699    #[cfg(feature = "cluster")]
1700    pub(crate) async fn await_follower_decision(
1701        cc: &laminar_core::cluster::control::ClusterController,
1702        decision_store: Option<&laminar_core::checkpoint_decision::CheckpointDecisionStore>,
1703        epoch: u64,
1704        checkpoint_id: u64,
1705        decision_timeout: Duration,
1706    ) -> bool {
1707        use laminar_core::cluster::control::Phase;
1708
1709        let is_marked = || async {
1710            match decision_store {
1711                Some(ds) => ds.is_committed(epoch).await.unwrap_or_else(|e| {
1712                    warn!(
1713                        epoch, checkpoint_id, error = %e,
1714                        "[LDB-6045] decision store read failed — defaulting to Abort",
1715                    );
1716                    false
1717                }),
1718                None => false,
1719            }
1720        };
1721
1722        let deadline = Instant::now() + decision_timeout;
1723        loop {
1724            let remaining = deadline.saturating_duration_since(Instant::now());
1725            let decision = cc
1726                .wait_for_barrier(
1727                    |a| {
1728                        a.epoch > epoch
1729                            || (a.epoch == epoch && matches!(a.phase, Phase::Commit | Phase::Abort))
1730                    },
1731                    remaining,
1732                )
1733                .await;
1734            match decision {
1735                Some(a) if a.epoch == epoch => return a.phase == Phase::Commit,
1736                Some(a) => {
1737                    // Newer epoch supersedes the decision announcement.
1738                    if is_marked().await {
1739                        info!(
1740                            epoch,
1741                            checkpoint_id,
1742                            observed_epoch = a.epoch,
1743                            "newer epoch observed with commit marker present — committing",
1744                        );
1745                        return true;
1746                    }
1747                    // No marker yet — keep waiting. Each pass through
1748                    // this (rare) state costs an object-store HEAD, so
1749                    // pace the re-check well below the decision timeout.
1750                    tokio::time::sleep(Duration::from_millis(500)).await;
1751                }
1752                None => {
1753                    // Deadline: one last durable-marker check.
1754                    if is_marked().await {
1755                        warn!(
1756                            epoch,
1757                            checkpoint_id,
1758                            "[LDB-6046] follower timeout but marker present — committing",
1759                        );
1760                        return true;
1761                    }
1762                    warn!(
1763                        epoch,
1764                        checkpoint_id, "[LDB-6034] follower decision timeout; rolling back",
1765                    );
1766                    return false;
1767                }
1768            }
1769        }
1770    }
1771
1772    /// Stage 3 of the follower tail: act on the decision. Returns
1773    /// `true` on a clean commit.
1774    #[cfg(feature = "cluster")]
1775    pub(crate) async fn follower_finish(
1776        &mut self,
1777        epoch: u64,
1778        checkpoint_id: u64,
1779        committed: bool,
1780    ) -> bool {
1781        let clean = if committed {
1782            self.drive_follower_commit(epoch, checkpoint_id).await
1783        } else {
1784            self.rollback_sinks(epoch).await.ok();
1785            self.checkpoints_failed += 1;
1786            self.phase = CheckpointPhase::Idle;
1787            false
1788        };
1789        // Commit and rollback both close the sinks' open transaction;
1790        // open the next epoch's so subsequent writes are transactional
1791        // (the leader's mirror lives in `checkpoint_inner`/`fail_epoch`).
1792        self.begin_next_epoch_bounded().await;
1793        clean
1794    }
1795
1796    /// Durable commit-marker store handle, for the lock-free decision
1797    /// wait in pipelined follower tails.
1798    #[cfg(feature = "cluster")]
1799    pub(crate) fn decision_store_handle(
1800        &self,
1801    ) -> Option<Arc<laminar_core::checkpoint_decision::CheckpointDecisionStore>> {
1802        self.decision_store.clone()
1803    }
1804
1805    /// Commit this follower's sinks for `epoch` and update its
1806    /// manifest's Pending entries. Returns `true` on clean commit.
1807    #[cfg(feature = "cluster")]
1808    async fn drive_follower_commit(&mut self, epoch: u64, checkpoint_id: u64) -> bool {
1809        let statuses = self.commit_sinks_tracked(epoch).await;
1810        let has_failures = statuses
1811            .values()
1812            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));
1813        if has_failures {
1814            error!(
1815                epoch,
1816                checkpoint_id, "follower sink commit partially failed — rolling back",
1817            );
1818            self.rollback_sinks(epoch).await.ok();
1819            self.checkpoints_failed += 1;
1820            self.phase = CheckpointPhase::Idle;
1821            return false;
1822        }
1823        // Overwrite the follower's own manifest so the Pending sink
1824        // entries stamped during prepare get replaced with the
1825        // Committed statuses we just produced.
1826        if let Err(e) = self
1827            .persist_recovered_statuses(checkpoint_id, statuses)
1828            .await
1829        {
1830            warn!(
1831                checkpoint_id,
1832                epoch,
1833                error = %e,
1834                "follower post-commit manifest update failed",
1835            );
1836        }
1837        self.checkpoints_completed += 1;
1838        let (_, next_id) = self.allocator.peek();
1839        self.allocator.advance_to(epoch.saturating_add(1), next_id);
1840        self.phase = CheckpointPhase::Idle;
1841        true
1842    }
1843
1844    /// Pre-commit + save manifest + write vnode markers.
1845    #[cfg(feature = "cluster")]
1846    async fn follower_prepare(
1847        &mut self,
1848        request: CheckpointRequest,
1849        epoch: u64,
1850        checkpoint_id: u64,
1851    ) -> Result<(), DbError> {
1852        let CheckpointRequest {
1853            operator_states,
1854            watermark,
1855            table_store_checkpoint_path,
1856            extra_table_offsets,
1857            source_watermarks,
1858            pipeline_hash,
1859            source_offset_overrides,
1860        } = request;
1861
1862        self.phase = CheckpointPhase::PreCommitting;
1863        if let Err(e) = self.pre_commit_sinks(epoch).await {
1864            self.pending_vnode_states.clear();
1865            return Err(e);
1866        }
1867
1868        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
1869        manifest.source_offsets = source_offset_overrides;
1870        manifest.table_offsets = extra_table_offsets;
1871        manifest.sink_epochs = self.collect_sink_epochs(epoch);
1872        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
1873        manifest.watermark = watermark;
1874        manifest.source_watermarks = source_watermarks;
1875        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
1876        manifest.source_names = {
1877            let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
1878            names.sort();
1879            names
1880        };
1881        manifest.sink_names = self.sorted_sink_names();
1882        manifest.pipeline_hash = pipeline_hash;
1883        let state_data = Self::pack_operator_states(
1884            &mut manifest,
1885            &operator_states,
1886            self.config.state_inline_threshold,
1887        );
1888
1889        self.phase = CheckpointPhase::Persisting;
1890        if let Err(e) = self.save_manifest(Arc::new(manifest), state_data).await {
1891            self.pending_vnode_states.clear();
1892            return Err(e);
1893        }
1894        if let Err(e) = self.write_vnode_partials(epoch, checkpoint_id).await {
1895            self.pending_vnode_states.clear();
1896            return Err(e);
1897        }
1898        self.pending_vnode_states.clear();
1899        Ok(())
1900    }
1901
1902    /// Shared checkpoint implementation for all checkpoint entry points.
1903    ///
1904    /// When [`CheckpointRequest::source_offset_overrides`] is non-empty,
1905    /// those sources use the provided offsets instead of calling
1906    /// `snapshot_sources()`. This ensures barrier-aligned and pre-captured
1907    /// offsets are used atomically.
1908    #[allow(clippy::too_many_lines)]
1909    async fn checkpoint_inner(
1910        &mut self,
1911        request: CheckpointRequest,
1912        ids: Option<(u64, u64)>,
1913        quorum: QuorumStage,
1914    ) -> Result<CheckpointResult, DbError> {
1915        let CheckpointRequest {
1916            operator_states,
1917            watermark,
1918            table_store_checkpoint_path,
1919            extra_table_offsets,
1920            source_watermarks,
1921            pipeline_hash,
1922            source_offset_overrides,
1923        } = request;
1924        let start = Instant::now();
1925        // Ids are allocated up front (Flink-style): a failed attempt is
1926        // abandoned — never retried under the same ids — so a future
1927        // epoch's identity never depends on this one's outcome, which
1928        // is what allows epochs to overlap. Pipelined barrier paths
1929        // allocate at admission and pass the ids in; forced/timer
1930        // paths allocate here.
1931        let (epoch, checkpoint_id) = ids.unwrap_or_else(|| self.allocator.allocate());
1932
1933        info!(checkpoint_id, epoch, "starting checkpoint");
1934
1935        // Source offsets are provided by the caller (pre-captured at barrier
1936        // alignment or pre-spawn). Table offsets come from extra_table_offsets.
1937        self.phase = CheckpointPhase::Snapshotting;
1938        let source_offsets = source_offset_overrides;
1939        let table_offsets = extra_table_offsets;
1940
1941        // Two-level completion, level 1: collect capture acks from
1942        // every live follower, then announce `Aligned` (the pipeline
1943        // resume gate); the restorable gate below verifies their
1944        // durable prepares. Pipelined barrier tails run this stage
1945        // pre-mutex and pass `Done` here.
1946        #[cfg(feature = "cluster")]
1947        #[allow(unused_assignments)] // both match arms assign; init keeps non-cluster shape
1948        let mut quorum_participants: Vec<QuorumPeer> = Vec::new();
1949        #[cfg(feature = "cluster")]
1950        match quorum {
1951            QuorumStage::RunInline => {
1952                match self.await_prepare_quorum(epoch, checkpoint_id).await {
1953                    Ok(p) => quorum_participants = p,
1954                    Err(quorum_failure) => {
1955                        error!(checkpoint_id, epoch, error = %quorum_failure, "[LDB-6032] quorum miss");
1956                        return Ok(self
1957                            .fail_epoch(checkpoint_id, epoch, start, quorum_failure)
1958                            .await);
1959                    }
1960                }
1961                self.announce_if_leader(
1962                    epoch,
1963                    checkpoint_id,
1964                    laminar_core::cluster::control::Phase::Aligned,
1965                    self.cluster_min_watermark,
1966                )
1967                .await;
1968            }
1969            QuorumStage::Done {
1970                min_watermark_ms,
1971                participants,
1972            } => {
1973                self.cluster_min_watermark = min_watermark_ms;
1974                quorum_participants = participants;
1975            }
1976        }
1977        #[cfg(not(feature = "cluster"))]
1978        let _ = quorum;
1979
1980        self.phase = CheckpointPhase::PreCommitting;
1981        if let Err(e) = self.pre_commit_sinks(epoch).await {
1982            error!(checkpoint_id, epoch, error = %e, "pre-commit failed");
1983            return Ok(self
1984                .fail_epoch(
1985                    checkpoint_id,
1986                    epoch,
1987                    start,
1988                    format!("pre-commit failed: {e}"),
1989                )
1990                .await);
1991        }
1992
1993        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
1994        manifest.source_offsets = source_offsets;
1995        manifest.table_offsets = table_offsets;
1996        manifest.sink_epochs = self.collect_sink_epochs(epoch);
1997        // Mark all exactly-once sinks as Pending before commit phase.
1998        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
1999        manifest.watermark = watermark;
2000        // Use caller-provided per-source watermarks if available. When empty,
2001        // leave source_watermarks empty — recovery falls back to the global
2002        // manifest.watermark. Do NOT fabricate per-source values from the
2003        // global watermark, as that loses granularity on recovery.
2004        manifest.source_watermarks = source_watermarks;
2005        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
2006        manifest.source_names = {
2007            let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
2008            names.sort();
2009            names
2010        };
2011        manifest.sink_names = self.sorted_sink_names();
2012        manifest.pipeline_hash = pipeline_hash;
2013
2014        let state_data = Self::pack_operator_states(
2015            &mut manifest,
2016            &operator_states,
2017            self.config.state_inline_threshold,
2018        );
2019        let sidecar_bytes = state_data
2020            .as_ref()
2021            .map_or(0, |chunks| chunks.iter().map(bytes::Bytes::len).sum());
2022        if sidecar_bytes > 0 {
2023            debug!(
2024                checkpoint_id,
2025                sidecar_bytes, "writing operator state sidecar"
2026            );
2027        }
2028
2029        if let Some(cap) = self.config.max_checkpoint_bytes {
2030            if sidecar_bytes > cap {
2031                let msg = format!(
2032                    "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
2033                     cap {cap} bytes — checkpoint rejected"
2034                );
2035                error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
2036                // `fail_epoch` also rolls the pre-committed sinks back —
2037                // previously this path returned without a rollback,
2038                // leaving the epoch's transactions open.
2039                return Ok(self.fail_epoch(checkpoint_id, epoch, start, msg).await);
2040            }
2041            let warn_threshold = cap * 4 / 5; // 80%
2042            if sidecar_bytes > warn_threshold {
2043                warn!(
2044                    checkpoint_id,
2045                    epoch, sidecar_bytes, cap, "checkpoint size approaching cap (>80%)"
2046                );
2047            }
2048        }
2049        let checkpoint_bytes = sidecar_bytes as u64;
2050
2051        self.phase = CheckpointPhase::Persisting;
2052        // Arc-wrap so the save task gets a cheap refcount bump instead of
2053        // a deep clone. After `save_manifest.await` the task drops its
2054        // Arc and we're the sole owner; `Arc::make_mut` below gets us a
2055        // free mutable reference for the post-commit sink-status update.
2056        let mut manifest = Arc::new(manifest);
2057        if let Err(e) = self.save_manifest(Arc::clone(&manifest), state_data).await {
2058            error!(checkpoint_id, epoch, error = %e, "[LDB-6008] manifest persist failed");
2059            return Ok(self
2060                .fail_epoch(
2061                    checkpoint_id,
2062                    epoch,
2063                    start,
2064                    format!("manifest persist failed: {e}"),
2065                )
2066                .await);
2067        }
2068
2069        // Publish each owned vnode's partial (operator-state slice + commit
2070        // marker in one blob) so the durability gate below has something to
2071        // check and a future owner can rehydrate the vnode's state.
2072        if let Err(e) = self.write_vnode_partials(epoch, checkpoint_id).await {
2073            error!(checkpoint_id, epoch, error = %e, "[LDB-6025] vnode partial write failed");
2074            return Ok(self
2075                .fail_epoch(
2076                    checkpoint_id,
2077                    epoch,
2078                    start,
2079                    format!("vnode partial write failed: {e}"),
2080                )
2081                .await);
2082        }
2083
2084        // Durability gate (level 2, "restorable"): confirm every
2085        // participating vnode has its partial persisted before sinks
2086        // commit. `gate_vnode_set` is the FULL registry in cluster mode —
2087        // the leader verifies markers from every follower's
2088        // shared-storage writes, not just its own. Single-instance
2089        // defaults to `vnode_set` (the two match). Followers upload
2090        // asynchronously after their capture ack, so this polls until
2091        // `restorable_gate_timeout` rather than checking once.
2092        #[cfg(not(feature = "cluster"))]
2093        let quorum_participants: Vec<QuorumPeer> = Vec::new();
2094        let gate_start = Instant::now();
2095        let gate_result = self
2096            .await_restorable_gate(epoch, &quorum_participants)
2097            .await;
2098        // Observed on failure too — a burned gate timeout is the signal
2099        // that decides when the push-driven completion-ack follow-up is
2100        // worth building.
2101        if let Some(ref m) = self.prom {
2102            m.checkpoint_restorable_gate_wait
2103                .observe(gate_start.elapsed().as_secs_f64());
2104        }
2105        if let Err(gate_err) = gate_result {
2106            warn!(
2107                checkpoint_id,
2108                epoch,
2109                vnodes = self.gate_vnode_set.len(),
2110                error = %gate_err,
2111                "[LDB-6020] state durability gate failed — rolling back sinks",
2112            );
2113            return Ok(self.fail_epoch(checkpoint_id, epoch, start, gate_err).await);
2114        }
2115
2116        // Record the commit marker before issuing sink commits — this
2117        // is the durable record of the commit decision that recovery
2118        // reads to distinguish "committed mid-flight" from "never
2119        // committed". Cluster mode gates on leadership.
2120        let is_decision_leader = {
2121            #[cfg(feature = "cluster")]
2122            {
2123                self.cluster_controller
2124                    .as_ref()
2125                    .is_none_or(|cc| cc.is_leader())
2126            }
2127            #[cfg(not(feature = "cluster"))]
2128            {
2129                true
2130            }
2131        };
2132        if is_decision_leader {
2133            if let Some(ds) = self.decision_store.as_ref() {
2134                if let Err(e) = ds.record_committed(epoch).await {
2135                    error!(
2136                        checkpoint_id, epoch, error = %e,
2137                        "[LDB-6038] cannot record commit marker — aborting epoch",
2138                    );
2139                    return Ok(self
2140                        .fail_epoch(checkpoint_id, epoch, start, format!("commit marker: {e}"))
2141                        .await);
2142                }
2143            }
2144        }
2145
2146        #[cfg(feature = "cluster")]
2147        // Fan out the cluster-wide min watermark computed during
2148        // `await_prepare_quorum`. Followers consume this from
2149        // `observe_barrier` and update their consumer-side view.
2150        self.announce_if_leader(
2151            epoch,
2152            checkpoint_id,
2153            laminar_core::cluster::control::Phase::Commit,
2154            self.cluster_min_watermark,
2155        )
2156        .await;
2157
2158        self.phase = CheckpointPhase::Committing;
2159        let sink_statuses = self.commit_sinks_tracked(epoch).await;
2160        let has_failures = sink_statuses
2161            .values()
2162            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));
2163
2164        if !sink_statuses.is_empty() {
2165            // `Arc::make_mut`: COW. Refcount is 1 here (spawn_blocking
2166            // task in save_manifest has already returned and dropped its
2167            // clone), so this is a zero-copy mutable borrow.
2168            Arc::make_mut(&mut manifest).sink_commit_statuses = sink_statuses;
2169            if let Err(e) = self.update_manifest_only(Arc::clone(&manifest)).await {
2170                warn!(
2171                    checkpoint_id,
2172                    epoch,
2173                    error = %e,
2174                    "post-commit manifest update failed"
2175                );
2176            }
2177        }
2178
2179        if has_failures {
2180            // The commit decision is already durable (marker + Commit
2181            // announcement), so the epoch is NOT rolled back: the failed
2182            // sinks' statuses are recorded in the manifest and re-driven
2183            // by `reconcile_prepared_on_init` on restart. Begin the next
2184            // epoch so subsequent writes stay transactional.
2185            self.checkpoints_failed += 1;
2186            error!(
2187                checkpoint_id,
2188                epoch,
2189                "sink commit partially failed after the commit decision — \
2190                 statuses recorded for recovery-time re-drive"
2191            );
2192            self.phase = CheckpointPhase::Idle;
2193            let duration = start.elapsed();
2194            self.emit_checkpoint_metrics(false, epoch, duration);
2195            self.begin_next_epoch_bounded().await;
2196            self.pending_vnode_states.clear();
2197            return Ok(CheckpointResult {
2198                success: false,
2199                checkpoint_id,
2200                epoch,
2201                duration,
2202                error: Some("partial sink commit failure".into()),
2203            });
2204        }
2205
2206        self.phase = CheckpointPhase::Idle;
2207        self.checkpoints_completed += 1;
2208        self.total_bytes_written += checkpoint_bytes;
2209        let duration = start.elapsed();
2210        self.last_checkpoint_duration = Some(duration);
2211        self.duration_histogram.record(duration);
2212        self.emit_checkpoint_metrics(true, epoch, duration);
2213
2214        // Emit checkpoint size metrics.
2215        if let Some(ref m) = self.prom {
2216            #[allow(clippy::cast_possible_wrap)]
2217            m.checkpoint_size_bytes.set(checkpoint_bytes as i64);
2218        }
2219
2220        // Garbage-collect state-backend partials / commit markers for
2221        // epochs no longer needed for recovery. Without this the
2222        // in-process backend grows per-checkpoint forever and the
2223        // object-store backend leaks `epoch=N/…` objects indefinitely.
2224        // `max_retained` is in terms of checkpoints which map 1:1 to
2225        // epochs here, so prune everything older than
2226        // `epoch - max_retained`.
2227        if let Some(ref backend) = self.state_backend {
2228            let horizon = epoch.saturating_sub(self.config.max_retained as u64);
2229            if horizon > 0 {
2230                if let Err(e) = backend.prune_before(horizon).await {
2231                    warn!(
2232                        epoch,
2233                        horizon,
2234                        error = %e,
2235                        "[LDB-6026] state backend prune failed; old partials will linger"
2236                    );
2237                }
2238                if let Some(ref ds) = self.decision_store {
2239                    if let Err(e) = ds.prune_before(horizon).await {
2240                        warn!(epoch, horizon, error = %e, "decision prune failed");
2241                    }
2242                }
2243            }
2244        }
2245
2246        let next_epoch = self.allocator.peek().0;
2247        let begin_epoch_error = match self.begin_epoch_for_sinks(next_epoch).await {
2248            Ok(()) => None,
2249            Err(e) => {
2250                error!(
2251                    next_epoch,
2252                    error = %e,
2253                    "[LDB-6015] failed to begin next epoch — writes will be non-transactional"
2254                );
2255                Some(e.to_string())
2256            }
2257        };
2258
2259        info!(
2260            checkpoint_id,
2261            epoch,
2262            duration_ms = duration.as_millis(),
2263            "checkpoint completed"
2264        );
2265
2266        // The checkpoint itself succeeded (state persisted, sinks committed).
2267        // begin_epoch failure for the *next* epoch is reported as a warning
2268        // but does not retroactively fail the completed checkpoint.
2269        self.pending_vnode_states.clear();
2270        Ok(CheckpointResult {
2271            success: true,
2272            checkpoint_id,
2273            epoch,
2274            duration,
2275            error: begin_epoch_error,
2276        })
2277    }
2278
2279    /// Attempts recovery from the latest checkpoint.
2280    ///
2281    /// Creates a [`RecoveryManager`](crate::recovery_manager::RecoveryManager)
2282    /// using the coordinator's store and delegates recovery to it.
2283    /// On success, advances `self.epoch` past the recovered epoch so the
2284    /// next checkpoint gets a fresh epoch number.
2285    ///
2286    /// Returns `Ok(None)` for a fresh start (no checkpoint found).
2287    ///
2288    /// # Errors
2289    ///
2290    /// Returns `DbError::Checkpoint` if the store itself fails.
2291    pub async fn recover(
2292        &mut self,
2293    ) -> Result<Option<crate::recovery_manager::RecoveredState>, DbError> {
2294        use crate::recovery_manager::RecoveryManager;
2295
2296        let mgr = RecoveryManager::new(&*self.store);
2297        // Sources and table sources are restored by the pipeline lifecycle
2298        // (via SourceRegistration.restore_checkpoint), not by the coordinator.
2299        // Pass empty slices — the coordinator only manages sink recovery here.
2300        let result = mgr.recover(&[], &self.sinks, &[]).await?;
2301
2302        if let Some(ref recovered) = result {
2303            // Monotonic: the recovered (committed) epoch may be older
2304            // than a Pending manifest this node seeded its ids from.
2305            self.allocator
2306                .advance_to(recovered.epoch() + 1, recovered.manifest.checkpoint_id + 1);
2307            let (epoch, checkpoint_id) = self.allocator.peek();
2308            info!(epoch, checkpoint_id, "coordinator epoch set after recovery");
2309        }
2310
2311        Ok(result)
2312    }
2313
2314    /// Loads the latest manifest from the store.
2315    ///
2316    /// # Errors
2317    ///
2318    /// Returns `DbError::Checkpoint` on store errors.
2319    pub async fn load_latest_manifest(&self) -> Result<Option<CheckpointManifest>, DbError> {
2320        self.store.load_latest().await.map_err(DbError::from)
2321    }
2322}
2323
2324impl std::fmt::Debug for CheckpointCoordinator {
2325    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2326        f.debug_struct("CheckpointCoordinator")
2327            .field("allocator", &self.allocator)
2328            .field("phase", &self.phase)
2329            .field("sinks", &self.sinks.len())
2330            .field("completed", &self.checkpoints_completed)
2331            .field("failed", &self.checkpoints_failed)
2332            .finish_non_exhaustive()
2333    }
2334}
2335
/// Fixed-size ring buffer for duration percentile tracking.
///
/// Stores the last `CAPACITY` durations in **microseconds** and computes
/// p50/p95/p99 via sorted extraction. No heap allocation happens after
/// construction: percentile queries sort a stack copy of the buffer.
#[derive(Clone)]
pub struct DurationHistogram {
    /// Ring buffer of durations in microseconds.
    samples: Box<[u64; Self::CAPACITY]>,
    /// Write cursor (wraps at `CAPACITY`).
    cursor: usize,
    /// Total samples written (may exceed `CAPACITY`).
    count: u64,
}

impl Default for DurationHistogram {
    fn default() -> Self {
        Self::new()
    }
}

impl DurationHistogram {
    const CAPACITY: usize = 100;

    /// Creates an empty histogram.
    #[must_use]
    pub fn new() -> Self {
        Self {
            samples: Box::new([0; Self::CAPACITY]),
            cursor: 0,
            count: 0,
        }
    }

    /// Records a duration sample (stored in microseconds).
    ///
    /// Durations too large to fit in `u64` microseconds saturate to
    /// `u64::MAX` instead of wrapping to a small, misleading value.
    pub fn record(&mut self, duration: Duration) {
        let us = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);
        self.samples[self.cursor] = us;
        self.cursor = (self.cursor + 1) % Self::CAPACITY;
        self.count += 1;
    }

    /// Returns `true` if no samples have been recorded.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Returns the number of recorded samples (up to `CAPACITY`).
    #[must_use]
    pub fn len(&self) -> usize {
        // A count that does not even fit in `usize` is certainly >= CAPACITY.
        usize::try_from(self.count).map_or(Self::CAPACITY, |c| c.min(Self::CAPACITY))
    }

    /// Computes a percentile (0.0–1.0) from recorded samples.
    ///
    /// Returns 0 if no samples have been recorded. Values of `p` below 0
    /// (or NaN) select the smallest sample; values above 1 select the largest.
    #[must_use]
    pub fn percentile(&self, p: f64) -> u64 {
        if self.is_empty() {
            return 0;
        }
        let (sorted, n) = self.sorted_samples();
        sorted[Self::rank_index(p, n)]
    }

    /// Returns (p50, p95, p99) in microseconds. Sorts once.
    #[must_use]
    pub fn percentiles(&self) -> (u64, u64, u64) {
        if self.is_empty() {
            return (0, 0, 0);
        }
        let (sorted, n) = self.sorted_samples();
        let at = |p: f64| sorted[Self::rank_index(p, n)];
        (at(0.50), at(0.95), at(0.99))
    }

    /// Copies the buffer onto the stack and sorts its live prefix ascending.
    ///
    /// Returns the copy together with the number of live samples `n`. Only
    /// `buf[..n]` is meaningful. While the ring has not wrapped, the live
    /// samples are exactly the first `n` slots. After wrapping, every slot
    /// is live.
    fn sorted_samples(&self) -> ([u64; Self::CAPACITY], usize) {
        let n = self.len();
        let mut buf = *self.samples; // `[u64; N]` is `Copy`: no heap allocation
        buf[..n].sort_unstable();
        (buf, n)
    }

    /// Maps percentile `p` to an index into `n` sorted samples (`n > 0`).
    ///
    /// Rounds `p * (n - 1)` up and clamps the result to the last index.
    // Casts are intentional: `n <= CAPACITY` is exactly representable as f64,
    // and the float -> usize `as` cast saturates (negative/NaN -> 0).
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    fn rank_index(p: f64, n: usize) -> usize {
        ((p * (n as f64 - 1.0)).ceil() as usize).min(n - 1)
    }
}

impl std::fmt::Debug for DurationHistogram {
    /// Shows the number of recorded samples, the buffer capacity, the cursor
    /// and total count, and the current p50/p95/p99 in microseconds.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (p50, p95, p99) = self.percentiles();
        f.debug_struct("DurationHistogram")
            .field("samples_len", &self.len())
            .field("capacity", &Self::CAPACITY)
            .field("cursor", &self.cursor)
            .field("count", &self.count)
            .field("p50_us", &p50)
            .field("p95_us", &p95)
            .field("p99_us", &p99)
            .finish()
    }
}
2453
/// Checkpoint performance statistics.
///
/// A point-in-time snapshot of the coordinator's counters and latency
/// percentiles. It derives `serde::Serialize`, so field declaration order is
/// also the serialized field order.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CheckpointStats {
    /// Total completed checkpoints.
    pub completed: u64,
    /// Total failed checkpoints.
    // NOTE(review): presumably mirrors the coordinator's `checkpoints_failed`
    // counter, which also counts partial sink-commit failures — confirm.
    pub failed: u64,
    /// Duration of the last checkpoint.
    // NOTE(review): presumably `None` until the first successful checkpoint
    // records a duration — confirm against the `stats()` builder.
    pub last_duration: Option<Duration>,
    /// p50 checkpoint duration in milliseconds.
    // The backing `DurationHistogram` stores microseconds; the conversion to
    // milliseconds happens wherever this struct is populated.
    pub duration_p50_ms: u64,
    /// p95 checkpoint duration in milliseconds.
    pub duration_p95_ms: u64,
    /// p99 checkpoint duration in milliseconds.
    pub duration_p99_ms: u64,
    /// Total bytes written across all checkpoints.
    pub total_bytes_written: u64,
    /// Current checkpoint phase.
    pub current_phase: CheckpointPhase,
    /// Current epoch number.
    pub current_epoch: u64,
}
2476
2477/// Converts a `SourceCheckpoint` to a `ConnectorCheckpoint`.
2478#[must_use]
2479pub fn source_to_connector_checkpoint(cp: &SourceCheckpoint) -> ConnectorCheckpoint {
2480    ConnectorCheckpoint {
2481        offsets: cp.offsets().clone(),
2482        epoch: cp.epoch(),
2483        metadata: cp.metadata().clone(),
2484    }
2485}
2486
2487/// Converts a `ConnectorCheckpoint` back to a `SourceCheckpoint`.
2488#[must_use]
2489pub fn connector_to_source_checkpoint(cp: &ConnectorCheckpoint) -> SourceCheckpoint {
2490    let mut source_cp = SourceCheckpoint::with_offsets(cp.epoch, cp.offsets.clone());
2491    for (k, v) in &cp.metadata {
2492        source_cp.set_metadata(k.clone(), v.clone());
2493    }
2494    source_cp
2495}
2496
2497#[cfg(test)]
2498mod tests {
2499    use super::*;
2500    use laminar_core::storage::checkpoint_store::FileSystemCheckpointStore;
2501
2502    async fn make_coordinator(dir: &std::path::Path) -> CheckpointCoordinator {
2503        let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
2504        CheckpointCoordinator::new(CheckpointConfig::default(), store)
2505            .await
2506            .unwrap()
2507    }
2508
2509    /// Coordinator whose restorable gate gives up quickly — for tests
2510    /// that exercise a gate *miss* (the default 30s poll would stall
2511    /// the suite).
2512    async fn make_coordinator_with_fast_gate(dir: &std::path::Path) -> CheckpointCoordinator {
2513        let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
2514        let config = CheckpointConfig {
2515            restorable_gate_timeout: Duration::from_millis(250),
2516            ..CheckpointConfig::default()
2517        };
2518        CheckpointCoordinator::new(config, store).await.unwrap()
2519    }
2520
2521    #[tokio::test]
2522    async fn test_coordinator_new() {
2523        let dir = tempfile::tempdir().unwrap();
2524        let coord = make_coordinator(dir.path()).await;
2525
2526        assert_eq!(coord.epoch(), 1);
2527        assert_eq!(coord.next_checkpoint_id(), 1);
2528        assert_eq!(coord.phase(), CheckpointPhase::Idle);
2529    }
2530
2531    #[tokio::test]
2532    async fn test_coordinator_resumes_from_stored_checkpoint() {
2533        let dir = tempfile::tempdir().unwrap();
2534
2535        // Save a checkpoint manually
2536        let store = FileSystemCheckpointStore::new(dir.path(), 3);
2537        let m = CheckpointManifest::new(5, 10);
2538        store.save(&m).await.unwrap();
2539
2540        // Coordinator should resume from epoch 11, checkpoint_id 6
2541        let coord = make_coordinator(dir.path()).await;
2542        assert_eq!(coord.epoch(), 11);
2543        assert_eq!(coord.next_checkpoint_id(), 6);
2544    }
2545
2546    #[test]
2547    fn test_checkpoint_phase_display() {
2548        assert_eq!(CheckpointPhase::Idle.to_string(), "Idle");
2549        assert_eq!(CheckpointPhase::Snapshotting.to_string(), "Snapshotting");
2550        assert_eq!(CheckpointPhase::PreCommitting.to_string(), "PreCommitting");
2551        assert_eq!(CheckpointPhase::Persisting.to_string(), "Persisting");
2552        assert_eq!(CheckpointPhase::Committing.to_string(), "Committing");
2553    }
2554
2555    #[test]
2556    fn test_source_to_connector_checkpoint() {
2557        let mut cp = SourceCheckpoint::new(5);
2558        cp.set_offset("partition-0", "1234");
2559        cp.set_metadata("topic", "events");
2560
2561        let cc = source_to_connector_checkpoint(&cp);
2562        assert_eq!(cc.epoch, 5);
2563        assert_eq!(cc.offsets.get("partition-0"), Some(&"1234".into()));
2564        assert_eq!(cc.metadata.get("topic"), Some(&"events".into()));
2565    }
2566
2567    #[test]
2568    fn test_connector_to_source_checkpoint() {
2569        let cc = ConnectorCheckpoint {
2570            offsets: HashMap::from([("lsn".into(), "0/ABCD".into())]),
2571            epoch: 3,
2572            metadata: HashMap::from([("type".into(), "postgres".into())]),
2573        };
2574
2575        let cp = connector_to_source_checkpoint(&cc);
2576        assert_eq!(cp.epoch(), 3);
2577        assert_eq!(cp.get_offset("lsn"), Some("0/ABCD"));
2578        assert_eq!(cp.get_metadata("type"), Some("postgres"));
2579    }
2580
2581    #[tokio::test]
2582    async fn test_stats_initial() {
2583        let dir = tempfile::tempdir().unwrap();
2584        let coord = make_coordinator(dir.path()).await;
2585        let stats = coord.stats();
2586
2587        assert_eq!(stats.completed, 0);
2588        assert_eq!(stats.failed, 0);
2589        assert!(stats.last_duration.is_none());
2590        assert_eq!(stats.duration_p50_ms, 0);
2591        assert_eq!(stats.duration_p95_ms, 0);
2592        assert_eq!(stats.duration_p99_ms, 0);
2593        assert_eq!(stats.current_phase, CheckpointPhase::Idle);
2594    }
2595
2596    #[tokio::test]
2597    async fn test_checkpoint_no_sources_no_sinks() {
2598        let dir = tempfile::tempdir().unwrap();
2599        let mut coord = make_coordinator(dir.path()).await;
2600
2601        let result = coord
2602            .checkpoint(CheckpointRequest {
2603                watermark: Some(1000),
2604                ..CheckpointRequest::default()
2605            })
2606            .await
2607            .unwrap();
2608
2609        assert!(result.success);
2610        assert_eq!(result.checkpoint_id, 1);
2611        assert_eq!(result.epoch, 1);
2612
2613        // Verify manifest was persisted
2614        let loaded = coord.store().load_latest().await.unwrap().unwrap();
2615        assert_eq!(loaded.checkpoint_id, 1);
2616        assert_eq!(loaded.epoch, 1);
2617        assert_eq!(loaded.watermark, Some(1000));
2618
2619        // Second checkpoint should increment
2620        let result2 = coord
2621            .checkpoint(CheckpointRequest {
2622                watermark: Some(2000),
2623                ..CheckpointRequest::default()
2624            })
2625            .await
2626            .unwrap();
2627
2628        assert!(result2.success);
2629        assert_eq!(result2.checkpoint_id, 2);
2630        assert_eq!(result2.epoch, 2);
2631
2632        let stats = coord.stats();
2633        assert_eq!(stats.completed, 2);
2634        assert_eq!(stats.failed, 0);
2635    }
2636
2637    #[tokio::test]
2638    async fn test_checkpoint_with_operator_states() {
2639        let dir = tempfile::tempdir().unwrap();
2640        let mut coord = make_coordinator(dir.path()).await;
2641
2642        let mut ops = HashMap::new();
2643        ops.insert(
2644            "window-agg".into(),
2645            bytes::Bytes::from_static(b"state-data"),
2646        );
2647        ops.insert("filter".into(), bytes::Bytes::from_static(b"filter-state"));
2648
2649        let result = coord
2650            .checkpoint(CheckpointRequest {
2651                operator_states: ops,
2652                ..CheckpointRequest::default()
2653            })
2654            .await
2655            .unwrap();
2656
2657        assert!(result.success);
2658
2659        let loaded = coord.store().load_latest().await.unwrap().unwrap();
2660        assert_eq!(loaded.operator_states.len(), 2);
2661
2662        let window_op = loaded.operator_states.get("window-agg").unwrap();
2663        assert_eq!(window_op.decode_inline().unwrap(), b"state-data");
2664    }
2665
2666    #[tokio::test]
2667    async fn test_checkpoint_with_table_store_path() {
2668        let dir = tempfile::tempdir().unwrap();
2669        let mut coord = make_coordinator(dir.path()).await;
2670
2671        let result = coord
2672            .checkpoint(CheckpointRequest {
2673                table_store_checkpoint_path: Some("/tmp/table_store_cp".into()),
2674                ..CheckpointRequest::default()
2675            })
2676            .await
2677            .unwrap();
2678
2679        assert!(result.success);
2680
2681        let loaded = coord.store().load_latest().await.unwrap().unwrap();
2682        assert_eq!(
2683            loaded.table_store_checkpoint_path.as_deref(),
2684            Some("/tmp/table_store_cp")
2685        );
2686    }
2687
2688    #[tokio::test]
2689    async fn test_load_latest_manifest_empty() {
2690        let dir = tempfile::tempdir().unwrap();
2691        let coord = make_coordinator(dir.path()).await;
2692        assert!(coord.load_latest_manifest().await.unwrap().is_none());
2693    }
2694
2695    #[tokio::test]
2696    async fn test_coordinator_debug() {
2697        let dir = tempfile::tempdir().unwrap();
2698        let coord = make_coordinator(dir.path()).await;
2699        let debug = format!("{coord:?}");
2700        assert!(debug.contains("CheckpointCoordinator"));
2701        assert!(debug.contains("epoch: 1"));
2702    }
2703
2704    #[tokio::test]
2705    async fn test_checkpoint_emits_metrics_on_success() {
2706        let dir = tempfile::tempdir().unwrap();
2707        let mut coord = make_coordinator(dir.path()).await;
2708
2709        let registry = prometheus::Registry::new();
2710        let prom = Arc::new(crate::engine_metrics::EngineMetrics::new(&registry));
2711        coord.set_metrics(Arc::clone(&prom));
2712
2713        let result = coord
2714            .checkpoint(CheckpointRequest {
2715                watermark: Some(1000),
2716                ..CheckpointRequest::default()
2717            })
2718            .await
2719            .unwrap();
2720
2721        assert!(result.success);
2722        assert_eq!(prom.checkpoints_completed.get(), 1);
2723        assert_eq!(prom.checkpoints_failed.get(), 0);
2724        assert_eq!(prom.checkpoint_epoch.get(), 1);
2725
2726        // Second checkpoint
2727        let result2 = coord
2728            .checkpoint(CheckpointRequest {
2729                watermark: Some(2000),
2730                ..CheckpointRequest::default()
2731            })
2732            .await
2733            .unwrap();
2734
2735        assert!(result2.success);
2736        assert_eq!(prom.checkpoints_completed.get(), 2);
2737        assert_eq!(prom.checkpoint_epoch.get(), 2);
2738    }
2739
2740    #[tokio::test]
2741    async fn test_checkpoint_without_metrics() {
2742        // Verify checkpoint works fine without metrics set
2743        let dir = tempfile::tempdir().unwrap();
2744        let mut coord = make_coordinator(dir.path()).await;
2745
2746        let result = coord
2747            .checkpoint(CheckpointRequest::default())
2748            .await
2749            .unwrap();
2750
2751        assert!(result.success);
2752        // No panics — metrics emission is a no-op
2753    }
2754
2755    #[test]
2756    fn test_histogram_empty() {
2757        let h = DurationHistogram::new();
2758        assert_eq!(h.len(), 0);
2759        assert_eq!(h.percentile(0.50), 0);
2760        assert_eq!(h.percentile(0.99), 0);
2761        let (p50, p95, p99) = h.percentiles();
2762        assert_eq!((p50, p95, p99), (0, 0, 0));
2763    }
2764
2765    #[test]
2766    fn test_histogram_single_sample() {
2767        let mut h = DurationHistogram::new();
2768        h.record(Duration::from_millis(42));
2769        assert_eq!(h.len(), 1);
2770        // 42ms = 42_000μs
2771        assert_eq!(h.percentile(0.50), 42_000);
2772        assert_eq!(h.percentile(0.99), 42_000);
2773    }
2774
2775    #[test]
2776    fn test_histogram_sub_millisecond() {
2777        let mut h = DurationHistogram::new();
2778        // 500μs — previously truncated to 0 with as_millis()
2779        h.record(Duration::from_micros(500));
2780        assert_eq!(h.percentile(0.50), 500);
2781        assert_eq!(h.percentile(0.99), 500);
2782    }
2783
2784    #[test]
2785    fn test_histogram_percentiles() {
2786        let mut h = DurationHistogram::new();
2787        // Record 1..=100ms in order → 1000..=100_000 μs.
2788        for i in 1..=100 {
2789            h.record(Duration::from_millis(i));
2790        }
2791        assert_eq!(h.len(), 100);
2792
2793        let p50 = h.percentile(0.50);
2794        let p95 = h.percentile(0.95);
2795        let p99 = h.percentile(0.99);
2796
2797        // Values in μs: 1000..=100_000
2798        //   p50 ≈ 50_000, p95 ≈ 95_000, p99 ≈ 99_000
2799        assert!((49_000..=51_000).contains(&p50), "p50={p50}");
2800        assert!((94_000..=96_000).contains(&p95), "p95={p95}");
2801        assert!((98_000..=100_000).contains(&p99), "p99={p99}");
2802    }
2803
2804    #[test]
2805    fn test_histogram_wraps_ring_buffer() {
2806        let mut h = DurationHistogram::new();
2807        // Write 150 samples — first 50 are overwritten.
2808        for i in 1..=150 {
2809            h.record(Duration::from_millis(i));
2810        }
2811        assert_eq!(h.len(), 100);
2812        assert_eq!(h.count, 150);
2813
2814        // Only samples 51..=150 remain in the buffer (51_000..=150_000 μs).
2815        let p50 = h.percentile(0.50);
2816        assert!((99_000..=101_000).contains(&p50), "p50={p50}");
2817    }
2818
2819    #[tokio::test]
2820    async fn test_sidecar_round_trip() {
2821        let dir = tempfile::tempdir().unwrap();
2822        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2823        let config = CheckpointConfig {
2824            state_inline_threshold: 100, // 100 bytes threshold
2825            ..CheckpointConfig::default()
2826        };
2827        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
2828
2829        // Small state stays inline, large state goes to sidecar
2830        let mut ops = HashMap::new();
2831        ops.insert("small".into(), bytes::Bytes::from(vec![0xAAu8; 50]));
2832        ops.insert("large".into(), bytes::Bytes::from(vec![0xBBu8; 200]));
2833
2834        let result = coord
2835            .checkpoint(CheckpointRequest {
2836                operator_states: ops,
2837                ..CheckpointRequest::default()
2838            })
2839            .await
2840            .unwrap();
2841        assert!(result.success);
2842
2843        // Verify manifest
2844        let loaded = coord.store().load_latest().await.unwrap().unwrap();
2845        let small_op = loaded.operator_states.get("small").unwrap();
2846        assert!(!small_op.external, "small state should be inline");
2847        assert_eq!(small_op.decode_inline().unwrap(), vec![0xAAu8; 50]);
2848
2849        let large_op = loaded.operator_states.get("large").unwrap();
2850        assert!(large_op.external, "large state should be external");
2851        assert_eq!(large_op.external_length, 200);
2852
2853        // Verify sidecar file exists and has correct data
2854        let state_data = coord.store().load_state_data(1).await.unwrap().unwrap();
2855        assert_eq!(state_data.len(), 200);
2856        assert!(state_data.iter().all(|&b| b == 0xBB));
2857    }
2858
2859    #[tokio::test]
2860    async fn test_all_inline_no_sidecar() {
2861        let dir = tempfile::tempdir().unwrap();
2862        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2863        let config = CheckpointConfig::default(); // 1MB threshold
2864        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
2865
2866        let mut ops = HashMap::new();
2867        ops.insert("op1".into(), bytes::Bytes::from_static(b"small-state"));
2868
2869        let result = coord
2870            .checkpoint(CheckpointRequest {
2871                operator_states: ops,
2872                ..CheckpointRequest::default()
2873            })
2874            .await
2875            .unwrap();
2876        assert!(result.success);
2877
2878        // No sidecar file
2879        assert!(coord.store().load_state_data(1).await.unwrap().is_none());
2880    }
2881
2882    // Durability gate tests.
2883
2884    #[tokio::test]
2885    async fn durability_gate_skipped_when_vnode_set_empty() {
2886        // With no state backend installed AND empty vnode set, the commit
2887        // path behaves as before. Regression guard: the durability gate
2888        // must not change single-instance semantics.
2889        let dir = tempfile::tempdir().unwrap();
2890        let mut coord = make_coordinator(dir.path()).await;
2891        let result = coord
2892            .checkpoint(CheckpointRequest::default())
2893            .await
2894            .unwrap();
2895        assert!(result.success, "baseline checkpoint must succeed");
2896    }
2897
2898    #[tokio::test]
2899    async fn bridge_writes_markers_and_gate_passes() {
2900        use laminar_core::state::InProcessBackend;
2901        let dir = tempfile::tempdir().unwrap();
2902        let mut coord = make_coordinator(dir.path()).await;
2903        let backend = Arc::new(InProcessBackend::new(4));
2904        coord.set_state_backend(backend.clone());
2905        coord.set_vnode_set(vec![0, 1, 2, 3]);
2906
2907        let result = coord
2908            .checkpoint(CheckpointRequest::default())
2909            .await
2910            .unwrap();
2911        assert!(result.success, "bridge writes markers → gate passes");
2912        // Every owned vnode has a marker for the completed epoch.
2913        for v in 0..4 {
2914            assert!(
2915                backend.read_partial(v, 1).await.unwrap().is_some(),
2916                "bridge should have written marker for vnode {v}",
2917            );
2918        }
2919    }
2920
2921    #[cfg(feature = "cluster")]
2922    #[tokio::test]
2923    async fn reconcile_announces_abort_when_no_decision_store() {
2924        // Fallback path: if no decision store is wired (e.g. legacy
2925        // deployments), absence of a marker == Abort. This is the
2926        // pre-decision-store behavior preserved for compatibility.
2927        use laminar_core::cluster::control::{
2928            BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase, ANNOUNCEMENT_KEY,
2929        };
2930        use laminar_core::cluster::discovery::NodeId;
2931        use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
2932        use tokio::sync::watch;
2933
2934        let dir = tempfile::tempdir().unwrap();
2935        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2936        let mut orphan = CheckpointManifest::new(42, 7);
2937        orphan
2938            .sink_commit_statuses
2939            .insert("kafka_out".into(), SinkCommitStatus::Pending);
2940        store.save_with_state(&orphan, None).await.unwrap();
2941
2942        let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
2943            .await
2944            .unwrap();
2945        let self_id = NodeId(1);
2946        let kv = Arc::new(InMemoryKv::new(self_id));
2947        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2948        let (_tx, rx) = watch::channel(Vec::new());
2949        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2950        let mut coord = coord;
2951        coord.set_cluster_controller(controller);
2952
2953        coord.reconcile_prepared_on_init().await;
2954
2955        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
2956        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
2957        assert_eq!(ann.phase, Phase::Abort);
2958        assert_eq!(ann.epoch, 7);
2959        assert_eq!(ann.checkpoint_id, 42);
2960    }
2961
2962    #[cfg(feature = "cluster")]
2963    #[tokio::test]
2964    async fn reconcile_announces_commit_when_marker_present() {
2965        use laminar_core::cluster::control::{
2966            BarrierAnnouncement, CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
2967            Phase, ANNOUNCEMENT_KEY,
2968        };
2969        use laminar_core::cluster::discovery::NodeId;
2970        use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
2971        use object_store::local::LocalFileSystem;
2972        use tokio::sync::watch;
2973
2974        let ckpt_dir = tempfile::tempdir().unwrap();
2975        let decision_dir = tempfile::tempdir().unwrap();
2976        let store = Box::new(FileSystemCheckpointStore::new(ckpt_dir.path(), 3));
2977        let mut orphan = CheckpointManifest::new(42, 7);
2978        orphan
2979            .sink_commit_statuses
2980            .insert("kafka_out".into(), SinkCommitStatus::Pending);
2981        store.save_with_state(&orphan, None).await.unwrap();
2982
2983        let decision_os: Arc<dyn object_store::ObjectStore> =
2984            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
2985        let decision_store = Arc::new(CheckpointDecisionStore::new(decision_os));
2986        decision_store.record_committed(7).await.unwrap();
2987
2988        let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
2989            .await
2990            .unwrap();
2991        let self_id = NodeId(1);
2992        let kv = Arc::new(InMemoryKv::new(self_id));
2993        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2994        let (_tx, rx) = watch::channel(Vec::new());
2995        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2996        let mut coord = coord;
2997        coord.set_cluster_controller(controller);
2998        coord.set_decision_store(decision_store);
2999
3000        coord.reconcile_prepared_on_init().await;
3001
3002        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
3003        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
3004        assert_eq!(ann.phase, Phase::Commit);
3005        assert_eq!(ann.epoch, 7);
3006        assert_eq!(ann.checkpoint_id, 42);
3007    }
3008
3009    #[cfg(feature = "cluster")]
3010    #[tokio::test]
3011    async fn reconcile_announces_abort_when_marker_missing() {
3012        // Decision store is wired but has no marker for this epoch —
3013        // the "leader crashed before commit point" case.
3014        use laminar_core::cluster::control::{
3015            BarrierAnnouncement, CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
3016            Phase, ANNOUNCEMENT_KEY,
3017        };
3018        use laminar_core::cluster::discovery::NodeId;
3019        use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
3020        use object_store::local::LocalFileSystem;
3021        use tokio::sync::watch;
3022
3023        let ckpt_dir = tempfile::tempdir().unwrap();
3024        let decision_dir = tempfile::tempdir().unwrap();
3025        let store = Box::new(FileSystemCheckpointStore::new(ckpt_dir.path(), 3));
3026        let mut orphan = CheckpointManifest::new(11, 3);
3027        orphan
3028            .sink_commit_statuses
3029            .insert("out".into(), SinkCommitStatus::Pending);
3030        store.save_with_state(&orphan, None).await.unwrap();
3031
3032        let decision_os: Arc<dyn object_store::ObjectStore> =
3033            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
3034        let decision_store = Arc::new(CheckpointDecisionStore::new(decision_os));
3035
3036        let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
3037            .await
3038            .unwrap();
3039        let self_id = NodeId(1);
3040        let kv = Arc::new(InMemoryKv::new(self_id));
3041        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3042        let (_tx, rx) = watch::channel(Vec::new());
3043        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
3044        let mut coord = coord;
3045        coord.set_cluster_controller(controller);
3046        coord.set_decision_store(decision_store);
3047
3048        coord.reconcile_prepared_on_init().await;
3049
3050        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
3051        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
3052        assert_eq!(ann.phase, Phase::Abort);
3053        assert_eq!(ann.epoch, 3);
3054    }
3055
3056    #[cfg(feature = "cluster")]
3057    #[tokio::test]
3058    async fn reconcile_silent_when_manifest_clean() {
3059        use laminar_core::cluster::control::{
3060            ClusterController, ClusterKv, InMemoryKv, ANNOUNCEMENT_KEY,
3061        };
3062        use laminar_core::cluster::discovery::NodeId;
3063        use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
3064        use tokio::sync::watch;
3065
3066        let dir = tempfile::tempdir().unwrap();
3067        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
3068        let mut clean = CheckpointManifest::new(5, 3);
3069        clean
3070            .sink_commit_statuses
3071            .insert("out".into(), SinkCommitStatus::Committed);
3072        store.save_with_state(&clean, None).await.unwrap();
3073
3074        let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
3075            .await
3076            .unwrap();
3077        let self_id = NodeId(1);
3078        let kv = Arc::new(InMemoryKv::new(self_id));
3079        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3080        let (_tx, rx) = watch::channel(Vec::new());
3081        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
3082        let mut coord = coord;
3083        coord.set_cluster_controller(controller);
3084
3085        coord.reconcile_prepared_on_init().await;
3086
3087        // No announcement emitted.
3088        assert!(kv.read_from(self_id, ANNOUNCEMENT_KEY).await.is_none());
3089    }
3090
3091    #[cfg(feature = "cluster")]
3092    #[tokio::test]
3093    async fn follower_checkpoint_commits_on_leader_commit() {
3094        use laminar_core::cluster::control::{
3095            BarrierAck, BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase,
3096            ACK_KEY, ANNOUNCEMENT_KEY,
3097        };
3098        use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
3099        use tokio::sync::watch;
3100
3101        let dir = tempfile::tempdir().unwrap();
3102        let mut coord = make_coordinator(dir.path()).await;
3103
3104        let leader_id = NodeId(1);
3105        let follower_id = NodeId(7);
3106
3107        // Follower's KV sees both its own writes and a seeded view of the
3108        // leader's announcements. `members_rx` includes the leader so
3109        // `current_leader()` picks the lowest id (the leader, not self).
3110        let kv = Arc::new(InMemoryKv::new(follower_id));
3111        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3112        let leader_info = NodeInfo {
3113            id: leader_id,
3114            name: "leader".into(),
3115            rpc_address: String::new(),
3116            raft_address: String::new(),
3117            state: NodeState::Active,
3118            metadata: NodeMetadata::default(),
3119            last_heartbeat_ms: 0,
3120        };
3121        let (_tx, rx) = watch::channel(vec![leader_info]);
3122        let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
3123        coord.set_cluster_controller(controller);
3124
3125        // Leader has already announced PREPARE and COMMIT (simulates
3126        // a fast-gossip scenario; follower sees both on its first poll).
3127        let prepare_json = serde_json::to_string(&BarrierAnnouncement {
3128            epoch: 1,
3129            checkpoint_id: 1,
3130            phase: Phase::Prepare,
3131            flags: 0,
3132            min_watermark_ms: None,
3133        })
3134        .unwrap();
3135        let commit_json = serde_json::to_string(&BarrierAnnouncement {
3136            epoch: 1,
3137            checkpoint_id: 1,
3138            phase: Phase::Commit,
3139            flags: 0,
3140            min_watermark_ms: None,
3141        })
3142        .unwrap();
3143        // Overwrite the prepare with commit — observe_barrier reads the
3144        // latest value. Real gossip shows both in order; for the unit
3145        // test, landing on Commit is enough for the decision loop.
3146        kv.seed(leader_id, ANNOUNCEMENT_KEY, prepare_json);
3147        kv.seed(leader_id, ANNOUNCEMENT_KEY, commit_json);
3148
3149        let ann = BarrierAnnouncement {
3150            epoch: 1,
3151            checkpoint_id: 1,
3152            phase: Phase::Prepare,
3153            flags: 0,
3154            min_watermark_ms: None,
3155        };
3156        let committed = coord
3157            .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(2))
3158            .await
3159            .unwrap();
3160        assert!(committed, "follower should commit on leader's Commit");
3161
3162        // Follower's ack landed in its own KV.
3163        let ack_raw = kv.read_from(follower_id, ACK_KEY).await.unwrap();
3164        let ack: BarrierAck = serde_json::from_str(&ack_raw).unwrap();
3165        assert_eq!(ack.epoch, 1);
3166        assert!(ack.ok, "prepare succeeded, ack should be ok");
3167
3168        // Follower's manifest is on disk at the leader's epoch.
3169        let stored = coord.store().load_latest().await.unwrap().unwrap();
3170        assert_eq!(stored.epoch, 1);
3171    }
3172
3173    #[cfg(feature = "cluster")]
3174    #[tokio::test]
3175    async fn follower_checkpoint_rolls_back_on_leader_abort() {
3176        use laminar_core::cluster::control::{
3177            BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase, ANNOUNCEMENT_KEY,
3178        };
3179        use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
3180        use tokio::sync::watch;
3181
3182        let dir = tempfile::tempdir().unwrap();
3183        let mut coord = make_coordinator(dir.path()).await;
3184
3185        let leader_id = NodeId(1);
3186        let follower_id = NodeId(9);
3187        let kv = Arc::new(InMemoryKv::new(follower_id));
3188        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3189        let leader_info = NodeInfo {
3190            id: leader_id,
3191            name: "leader".into(),
3192            rpc_address: String::new(),
3193            raft_address: String::new(),
3194            state: NodeState::Active,
3195            metadata: NodeMetadata::default(),
3196            last_heartbeat_ms: 0,
3197        };
3198        let (_tx, rx) = watch::channel(vec![leader_info]);
3199        let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
3200        coord.set_cluster_controller(controller);
3201
3202        let abort_json = serde_json::to_string(&BarrierAnnouncement {
3203            epoch: 1,
3204            checkpoint_id: 1,
3205            phase: Phase::Abort,
3206            flags: 0,
3207            min_watermark_ms: None,
3208        })
3209        .unwrap();
3210        kv.seed(leader_id, ANNOUNCEMENT_KEY, abort_json);
3211
3212        let ann = BarrierAnnouncement {
3213            epoch: 1,
3214            checkpoint_id: 1,
3215            phase: Phase::Prepare,
3216            flags: 0,
3217            min_watermark_ms: None,
3218        };
3219        let committed = coord
3220            .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(2))
3221            .await
3222            .unwrap();
3223        assert!(!committed, "follower should roll back on leader's Abort");
3224    }
3225
3226    /// KV that records every announcement written, preserving order —
3227    /// the single-slot `InMemoryKv` only keeps the latest.
3228    #[cfg(feature = "cluster")]
3229    struct RecordingKv {
3230        inner: laminar_core::cluster::control::InMemoryKv,
3231        announcements: Arc<parking_lot::Mutex<Vec<String>>>,
3232    }
3233
3234    #[cfg(feature = "cluster")]
3235    #[async_trait::async_trait]
3236    impl laminar_core::cluster::control::ClusterKv for RecordingKv {
3237        async fn write(&self, key: &str, value: String) {
3238            if key == laminar_core::cluster::control::ANNOUNCEMENT_KEY {
3239                self.announcements.lock().push(value.clone());
3240            }
3241            self.inner.write(key, value).await;
3242        }
3243        async fn read_from(
3244            &self,
3245            who: laminar_core::cluster::discovery::NodeId,
3246            key: &str,
3247        ) -> Option<String> {
3248            self.inner.read_from(who, key).await
3249        }
3250        async fn scan(&self, key: &str) -> Vec<(laminar_core::cluster::discovery::NodeId, String)> {
3251            self.inner.scan(key).await
3252        }
3253        async fn scan_prefix(
3254            &self,
3255            prefix: &str,
3256        ) -> Vec<(laminar_core::cluster::discovery::NodeId, String, String)> {
3257            self.inner.scan_prefix(prefix).await
3258        }
3259    }
3260
3261    /// Two-level completion: the leader must announce `Aligned`
3262    /// (the pipeline resume gate) after the capture quorum and *before*
3263    /// the durable tail's `Commit`.
3264    #[cfg(feature = "cluster")]
3265    #[tokio::test]
3266    async fn leader_announces_aligned_between_prepare_and_commit() {
3267        use laminar_core::cluster::control::{
3268            BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase,
3269        };
3270        use laminar_core::cluster::discovery::NodeId;
3271        use tokio::sync::watch;
3272
3273        let dir = tempfile::tempdir().unwrap();
3274        let mut coord = make_coordinator(dir.path()).await;
3275
3276        let self_id = NodeId(1);
3277        let announcements = Arc::new(parking_lot::Mutex::new(Vec::new()));
3278        let kv: Arc<dyn ClusterKv> = Arc::new(RecordingKv {
3279            inner: InMemoryKv::new(self_id),
3280            announcements: Arc::clone(&announcements),
3281        });
3282        let (_tx, rx) = watch::channel(Vec::new());
3283        let controller = Arc::new(ClusterController::new(self_id, kv, None, rx));
3284        coord.set_cluster_controller(controller);
3285
3286        let result = coord
3287            .checkpoint(CheckpointRequest::default())
3288            .await
3289            .unwrap();
3290        assert!(result.success);
3291
3292        let phases: Vec<Phase> = announcements
3293            .lock()
3294            .iter()
3295            .map(|json| {
3296                serde_json::from_str::<BarrierAnnouncement>(json)
3297                    .unwrap()
3298                    .phase
3299            })
3300            .collect();
3301        assert_eq!(
3302            phases,
3303            vec![Phase::Prepare, Phase::Aligned, Phase::Commit],
3304            "two-level completion must announce Aligned between Prepare and Commit",
3305        );
3306    }
3307
3308    /// The follower acks at capture (before its durable
3309    /// prepare). If the prepare then fails, a best-effort `ok = false`
3310    /// ack overwrites the capture ack so a still-polling leader can
3311    /// fail the quorum fast instead of waiting for its gate timeout.
3312    #[cfg(feature = "cluster")]
3313    #[tokio::test]
3314    async fn follower_prepare_failure_overwrites_capture_ack() {
3315        use laminar_core::cluster::control::{
3316            BarrierAck, BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase,
3317            ACK_KEY,
3318        };
3319        use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
3320        use laminar_core::state::InProcessBackend;
3321        use tokio::sync::watch;
3322
3323        let dir = tempfile::tempdir().unwrap();
3324        let mut coord = make_coordinator(dir.path()).await;
3325
3326        let leader_id = NodeId(1);
3327        let follower_id = NodeId(7);
3328        let kv = Arc::new(InMemoryKv::new(follower_id));
3329        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3330        let leader_info = NodeInfo {
3331            id: leader_id,
3332            name: "leader".into(),
3333            rpc_address: String::new(),
3334            raft_address: String::new(),
3335            state: NodeState::Active,
3336            metadata: NodeMetadata::default(),
3337            last_heartbeat_ms: 0,
3338        };
3339        let (_tx, rx) = watch::channel(vec![leader_info]);
3340        let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
3341        coord.set_cluster_controller(controller);
3342        // Backend sized for 2 vnodes but the follower claims vnode 99 —
3343        // `write_vnode_partials` (the last prepare step) fails.
3344        coord.set_state_backend(Arc::new(InProcessBackend::new(2)));
3345        coord.set_vnode_set(vec![99]);
3346
3347        let ann = BarrierAnnouncement {
3348            epoch: 1,
3349            checkpoint_id: 1,
3350            phase: Phase::Prepare,
3351            flags: 0,
3352            min_watermark_ms: None,
3353        };
3354        let result = coord
3355            .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(1))
3356            .await;
3357        assert!(result.is_err(), "prepare failure must surface as an error");
3358
3359        let ack_raw = kv.read_from(follower_id, ACK_KEY).await.unwrap();
3360        let ack: BarrierAck = serde_json::from_str(&ack_raw).unwrap();
3361        assert_eq!(ack.epoch, 1);
3362        assert!(!ack.ok, "the failure ack must overwrite the capture ack");
3363        assert!(
3364            ack.error.unwrap().contains("vnode partial write failed"),
3365            "failure ack should carry the prepare error",
3366        );
3367    }
3368
3369    #[cfg(feature = "cluster")]
3370    #[tokio::test]
3371    async fn leader_publishes_cluster_min_watermark_to_controller() {
3372        // On a solo cluster, `await_prepare_quorum` computes the
3373        // cluster-wide min as "leader's local watermark" (no followers
3374        // to fold). This must be mirrored into the controller atomic so
3375        // the leader's own operators consume the same value that
3376        // followers pick up via `observe_barrier(Commit)` — otherwise
3377        // the leader would drive event-time decisions off a watermark
3378        // that none of its peers have acked yet.
3379        use laminar_core::cluster::control::{
3380            CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
3381        };
3382        use laminar_core::cluster::discovery::NodeId;
3383        use object_store::local::LocalFileSystem;
3384        use tokio::sync::watch;
3385
3386        let dir = tempfile::tempdir().unwrap();
3387        let decision_dir = tempfile::tempdir().unwrap();
3388        let mut coord = make_coordinator(dir.path()).await;
3389
3390        let decision_os: Arc<dyn object_store::ObjectStore> =
3391            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
3392        coord.set_decision_store(Arc::new(CheckpointDecisionStore::new(decision_os)));
3393
3394        let self_id = NodeId(1);
3395        let kv = Arc::new(InMemoryKv::new(self_id));
3396        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3397        let (_tx, rx) = watch::channel(Vec::new());
3398        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
3399        coord.set_cluster_controller(Arc::clone(&controller));
3400
3401        // Pre-condition: controller atomic is at its "unset" sentinel.
3402        assert_eq!(controller.cluster_min_watermark(), None);
3403
3404        // Seed a local watermark on the coordinator and drive a full
3405        // checkpoint. Solo cluster → leader's local value *is* the
3406        // cluster-wide min.
3407        coord.set_local_watermark_ms(Some(12_345));
3408        let result = coord
3409            .checkpoint(CheckpointRequest::default())
3410            .await
3411            .unwrap();
3412        assert!(result.success, "solo-cluster checkpoint should succeed");
3413
3414        assert_eq!(
3415            controller.cluster_min_watermark(),
3416            Some(12_345),
3417            "leader must mirror the cluster-wide min into its controller",
3418        );
3419
3420        // A subsequent checkpoint with a lower local watermark must
3421        // NOT regress the published value — event-time progress is
3422        // monotonic (same invariant the follower path already enforces).
3423        coord.set_local_watermark_ms(Some(42));
3424        let result = coord
3425            .checkpoint(CheckpointRequest::default())
3426            .await
3427            .unwrap();
3428        assert!(result.success);
3429        assert_eq!(
3430            controller.cluster_min_watermark(),
3431            Some(12_345),
3432            "stale local watermark must not lower the published cluster min",
3433        );
3434    }
3435
3436    #[cfg(feature = "cluster")]
3437    #[tokio::test]
3438    async fn leader_announces_prepare_and_commit_on_solo_cluster() {
3439        use laminar_core::cluster::control::{
3440            CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv, Phase,
3441            ANNOUNCEMENT_KEY,
3442        };
3443        use laminar_core::cluster::discovery::NodeId;
3444        use object_store::local::LocalFileSystem;
3445        use tokio::sync::watch;
3446
3447        let dir = tempfile::tempdir().unwrap();
3448        let decision_dir = tempfile::tempdir().unwrap();
3449        let mut coord = make_coordinator(dir.path()).await;
3450
3451        let decision_os: Arc<dyn object_store::ObjectStore> =
3452            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
3453        coord.set_decision_store(Arc::new(CheckpointDecisionStore::new(decision_os)));
3454
3455        let self_id = NodeId(1);
3456        let kv = Arc::new(InMemoryKv::new(self_id));
3457        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3458        let (_tx, rx) = watch::channel(Vec::new()); // solo — no peers
3459        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
3460        coord.set_cluster_controller(controller);
3461
3462        let result = coord
3463            .checkpoint(CheckpointRequest::default())
3464            .await
3465            .unwrap();
3466        assert!(result.success, "solo-cluster checkpoint should succeed");
3467
3468        // The last announce on the leader's KV is COMMIT (PREPARE was
3469        // overwritten in the same slot).
3470        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
3471        let ann: laminar_core::cluster::control::BarrierAnnouncement =
3472            serde_json::from_str(&raw).unwrap();
3473        assert_eq!(ann.phase, Phase::Commit);
3474        assert_eq!(ann.epoch, result.epoch);
3475    }
3476
3477    #[tokio::test]
3478    async fn gate_checks_full_registry_not_just_owned() {
3479        // Leader owns vnodes {0, 1}. Cluster has 4 vnodes total; a
3480        // follower (simulated by pre-populating half the backend) owns
3481        // {2, 3}. If the follower's markers are missing, the leader's
3482        // gate must fail even though the leader wrote its own.
3483        use laminar_core::state::InProcessBackend;
3484        let dir = tempfile::tempdir().unwrap();
3485        let mut coord = make_coordinator_with_fast_gate(dir.path()).await;
3486        let backend = Arc::new(InProcessBackend::new(4));
3487        coord.set_state_backend(backend.clone());
3488        coord.set_vnode_set(vec![0, 1]); // leader's owned subset
3489        coord.set_gate_vnode_set(vec![0, 1, 2, 3]); // full cluster registry
3490
3491        let result = coord
3492            .checkpoint(CheckpointRequest::default())
3493            .await
3494            .unwrap();
3495        assert!(
3496            !result.success,
3497            "gate must fail when follower markers are missing",
3498        );
3499        let err = result.error.expect("failure produces an error message");
3500        assert!(
3501            err.contains("not all vnodes persisted"),
3502            "expected full-registry gate miss, got: {err}",
3503        );
3504    }
3505
3506    /// Followers ack at capture and upload partials
3507    /// asynchronously, so the leader's restorable gate must *wait* for
3508    /// late partials rather than failing on the first check.
3509    #[tokio::test]
3510    async fn restorable_gate_waits_for_async_follower_uploads() {
3511        use bytes::Bytes;
3512        use laminar_core::state::InProcessBackend;
3513        let dir = tempfile::tempdir().unwrap();
3514        let mut coord = make_coordinator(dir.path()).await;
3515        let backend = Arc::new(InProcessBackend::new(4));
3516        // Leader's own partials are present; the "follower's" vnodes
3517        // {2, 3} land only after a delay, simulating its background
3518        // upload completing while the leader polls.
3519        backend
3520            .write_partial(0, 1, 0, Bytes::from_static(b"leader"))
3521            .await
3522            .unwrap();
3523        backend
3524            .write_partial(1, 1, 0, Bytes::from_static(b"leader"))
3525            .await
3526            .unwrap();
3527        let late = Arc::clone(&backend);
3528        tokio::spawn(async move {
3529            tokio::time::sleep(Duration::from_millis(300)).await;
3530            for v in [2u32, 3] {
3531                late.write_partial(v, 1, 0, Bytes::from_static(b"follower"))
3532                    .await
3533                    .unwrap();
3534            }
3535        });
3536        coord.set_state_backend(backend);
3537        coord.set_vnode_set(vec![0, 1]);
3538        coord.set_gate_vnode_set(vec![0, 1, 2, 3]);
3539
3540        let start = std::time::Instant::now();
3541        coord
3542            .await_restorable_gate(1, &[])
3543            .await
3544            .expect("gate must seal once the late partials land");
3545        assert!(
3546            start.elapsed() >= Duration::from_millis(250),
3547            "gate returned before the late partials could have landed",
3548        );
3549    }
3550
3551    #[tokio::test]
3552    async fn gate_passes_when_all_registry_markers_present() {
3553        // Same topology as the previous test, but now the follower's
3554        // markers are pre-populated — the gate sees a complete set
3555        // across the full registry and the checkpoint succeeds.
3556        use bytes::Bytes;
3557        use laminar_core::state::InProcessBackend;
3558        let dir = tempfile::tempdir().unwrap();
3559        let mut coord = make_coordinator(dir.path()).await;
3560        let backend = Arc::new(InProcessBackend::new(4));
3561        // Simulate the follower's prior write on vnodes {2, 3} for the
3562        // epoch the leader is about to use (fresh store starts at 1).
3563        backend
3564            .write_partial(2, 1, 0, Bytes::from_static(b"follower"))
3565            .await
3566            .unwrap();
3567        backend
3568            .write_partial(3, 1, 0, Bytes::from_static(b"follower"))
3569            .await
3570            .unwrap();
3571        coord.set_state_backend(backend);
3572        coord.set_vnode_set(vec![0, 1]);
3573        coord.set_gate_vnode_set(vec![0, 1, 2, 3]);
3574
3575        let result = coord
3576            .checkpoint(CheckpointRequest::default())
3577            .await
3578            .unwrap();
3579        assert!(result.success, "gate should pass: every vnode has a marker");
3580    }
3581
3582    #[tokio::test]
3583    async fn marker_write_failure_aborts_checkpoint() {
3584        use laminar_core::state::InProcessBackend;
3585        let dir = tempfile::tempdir().unwrap();
3586        let mut coord = make_coordinator(dir.path()).await;
3587        // Backend is sized for 2 vnodes, but we claim to own vnode 99 →
3588        // bridge fails its write, checkpoint aborts cleanly.
3589        coord.set_state_backend(Arc::new(InProcessBackend::new(2)));
3590        coord.set_vnode_set(vec![0, 99]);
3591
3592        let result = coord
3593            .checkpoint(CheckpointRequest::default())
3594            .await
3595            .unwrap();
3596        assert!(
3597            !result.success,
3598            "out-of-range vnode must fail the checkpoint"
3599        );
3600        let err = result.error.expect("failure produces an error message");
3601        assert!(err.contains("vnode partial write failed"), "got: {err}");
3602    }
3603
3604    /// A vnode whose slices didn't change uploads a
3605    /// reference to its last full partial instead of the state bytes,
3606    /// and is forced back to full before the base ages out of the
3607    /// prune retention window.
3608    #[tokio::test]
3609    async fn unchanged_vnode_state_becomes_reference_partial() {
3610        use laminar_core::state::InProcessBackend;
3611
3612        let dir = tempfile::tempdir().unwrap();
3613        let config = CheckpointConfig {
3614            max_retained: 2, // reference age cap = 2 epochs
3615            ..CheckpointConfig::default()
3616        };
3617        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
3618        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
3619        let backend = Arc::new(InProcessBackend::new(2));
3620        coord.set_state_backend(Arc::clone(&backend) as Arc<dyn StateBackend>);
3621        coord.set_vnode_set(vec![0]);
3622
3623        let slices = || {
3624            let mut ops = std::collections::HashMap::new();
3625            ops.insert("agg".to_string(), bytes::Bytes::from_static(b"state-v1"));
3626            std::collections::HashMap::from([(0u32, ops)])
3627        };
3628
3629        // Epoch 1: full upload.
3630        coord.set_pending_vnode_states(slices());
3631        let r1 = coord
3632            .checkpoint(CheckpointRequest::default())
3633            .await
3634            .unwrap();
3635        assert!(r1.success);
3636        let p1 = crate::vnode_partial::VnodePartial::decode(
3637            &backend.read_partial(0, r1.epoch).await.unwrap().unwrap(),
3638        )
3639        .unwrap();
3640        assert_eq!(p1.base_epoch, None, "first upload must be full");
3641        assert!(!p1.operators.is_empty());
3642
3643        // Epoch 2: identical slices → reference to epoch 1.
3644        coord.set_pending_vnode_states(slices());
3645        let r2 = coord
3646            .checkpoint(CheckpointRequest::default())
3647            .await
3648            .unwrap();
3649        assert!(r2.success);
3650        let p2 = crate::vnode_partial::VnodePartial::decode(
3651            &backend.read_partial(0, r2.epoch).await.unwrap().unwrap(),
3652        )
3653        .unwrap();
3654        assert_eq!(
3655            p2.base_epoch,
3656            Some(r1.epoch),
3657            "unchanged slice must reference its base"
3658        );
3659        assert!(p2.operators.is_empty());
3660
3661        // Epoch 3: still identical, but the base would hit the age cap —
3662        // forced back to a full upload (new base).
3663        coord.set_pending_vnode_states(slices());
3664        let r3 = coord
3665            .checkpoint(CheckpointRequest::default())
3666            .await
3667            .unwrap();
3668        assert!(r3.success);
3669        let p3 = crate::vnode_partial::VnodePartial::decode(
3670            &backend.read_partial(0, r3.epoch).await.unwrap().unwrap(),
3671        )
3672        .unwrap();
3673        assert_eq!(
3674            p3.base_epoch, None,
3675            "reference age cap must force a full re-upload",
3676        );
3677
3678        // Changed slices always upload full.
3679        let mut changed = std::collections::HashMap::new();
3680        let mut ops = std::collections::HashMap::new();
3681        ops.insert("agg".to_string(), bytes::Bytes::from_static(b"state-v2"));
3682        changed.insert(0u32, ops);
3683        coord.set_pending_vnode_states(changed);
3684        let r4 = coord
3685            .checkpoint(CheckpointRequest::default())
3686            .await
3687            .unwrap();
3688        assert!(r4.success);
3689        let p4 = crate::vnode_partial::VnodePartial::decode(
3690            &backend.read_partial(0, r4.epoch).await.unwrap().unwrap(),
3691        )
3692        .unwrap();
3693        assert_eq!(p4.base_epoch, None);
3694        assert!(!p4.operators.is_empty());
3695    }
3696
3697    /// `InProcessBackend` wrapper with a per-write delay (forces epoch
3698    /// overlap) and injected failures keyed by `(epoch, vnode)`.
3699    struct FaultBackend {
3700        inner: laminar_core::state::InProcessBackend,
3701        fail: parking_lot::Mutex<std::collections::HashSet<(u64, u32)>>,
3702        write_delay: Duration,
3703    }
3704
3705    #[async_trait::async_trait]
3706    impl StateBackend for FaultBackend {
3707        async fn write_partial(
3708            &self,
3709            vnode: u32,
3710            epoch: u64,
3711            assignment_version: u64,
3712            bytes: bytes::Bytes,
3713        ) -> Result<(), laminar_core::state::StateBackendError> {
3714            tokio::time::sleep(self.write_delay).await;
3715            if self.fail.lock().contains(&(epoch, vnode)) {
3716                return Err(laminar_core::state::StateBackendError::Io(
3717                    "injected write failure".into(),
3718                ));
3719            }
3720            self.inner
3721                .write_partial(vnode, epoch, assignment_version, bytes)
3722                .await
3723        }
3724
3725        async fn read_partial(
3726            &self,
3727            vnode: u32,
3728            epoch: u64,
3729        ) -> Result<Option<bytes::Bytes>, laminar_core::state::StateBackendError> {
3730            self.inner.read_partial(vnode, epoch).await
3731        }
3732
3733        async fn epoch_complete(
3734            &self,
3735            epoch: u64,
3736            vnodes: &[u32],
3737        ) -> Result<bool, laminar_core::state::StateBackendError> {
3738            self.inner.epoch_complete(epoch, vnodes).await
3739        }
3740
3741        async fn prune_before(
3742            &self,
3743            before: u64,
3744        ) -> Result<(), laminar_core::state::StateBackendError> {
3745            self.inner.prune_before(before).await
3746        }
3747
3748        async fn latest_committed_epoch(
3749            &self,
3750        ) -> Result<Option<u64>, laminar_core::state::StateBackendError> {
3751            self.inner.latest_committed_epoch().await
3752        }
3753
3754        fn set_authoritative_version(&self, version: u64) {
3755            self.inner.set_authoritative_version(version);
3756        }
3757
3758        fn authoritative_version(&self) -> u64 {
3759            self.inner.authoritative_version()
3760        }
3761    }
3762
    /// Fault injection at pipeline depth > 1. Four
    /// epochs are admitted (ids allocated, tails spawned) while the
    /// first is still uploading; the third epoch's upload partially
    /// fails — one vnode's write lands, the other's is injected to
    /// fail. Must hold:
    /// - tails complete in admission order (FIFO coordinator mutex);
    /// - the failed epoch is abandoned without disturbing successors;
    /// - the recovery point is the last successful epoch;
    /// - the partial that *landed* for the failed epoch never becomes
    ///   a reference base (a successor with identical state must
    ///   re-upload full, or reference an older *successful* epoch).
    ///
    /// NOTE(review): admission order reaching the mutex in order relies
    /// on the 10ms pacing sleeps between `admit` calls (see below); the
    /// statement order here is load-bearing.
    #[tokio::test]
    #[allow(clippy::too_many_lines)] // four-epoch fault sequence reads better unsplit
    async fn overlapping_epoch_failure_is_isolated() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
            .await
            .unwrap();
        // Every write sleeps 100ms before it lands (or fails), keeping
        // epoch A's upload in flight while B, C and D are admitted.
        let backend = Arc::new(FaultBackend {
            inner: laminar_core::state::InProcessBackend::new(2),
            fail: parking_lot::Mutex::new(std::collections::HashSet::new()),
            write_delay: Duration::from_millis(100),
        });
        coord.set_state_backend(Arc::clone(&backend) as Arc<dyn StateBackend>);
        coord.set_vnode_set(vec![0, 1]);

        // Take the allocator before `coord` moves into the shared mutex:
        // ids are claimed through it without the coordinator lock.
        let allocator = coord.epoch_allocator();
        let coordinator = Arc::new(tokio::sync::Mutex::new(Some(coord)));
        // Each tail sends its CheckpointResult here, so receive order is
        // completion order.
        let (done_tx, mut done_rx) = tokio::sync::mpsc::unbounded_channel::<CheckpointResult>();

        // Admit an epoch exactly as the pipeline callback does: claim
        // ids lock-free, spawn the tail; the FIFO mutex serializes the
        // durable work.
        let admit = |tag: &'static [u8]| {
            let (epoch, checkpoint_id) = allocator.allocate();
            let coordinator = Arc::clone(&coordinator);
            let done = done_tx.clone();
            // Both vnodes carry the same single operator "agg" with state `tag`.
            let states = std::collections::HashMap::from([
                (
                    0u32,
                    std::collections::HashMap::from([(
                        "agg".to_string(),
                        bytes::Bytes::from_static(tag),
                    )]),
                ),
                (
                    1u32,
                    std::collections::HashMap::from([(
                        "agg".to_string(),
                        bytes::Bytes::from_static(tag),
                    )]),
                ),
            ]);
            tokio::spawn(async move {
                // Stage states and run the checkpoint under one guard so
                // no other tail can interleave between the two calls.
                let mut guard = coordinator.lock().await;
                let coord = guard.as_mut().unwrap();
                coord.set_pending_vnode_states(states);
                let result = coord
                    .checkpoint_preallocated(
                        CheckpointRequest::default(),
                        epoch,
                        checkpoint_id,
                        QuorumStage::RunInline,
                    )
                    .await
                    .unwrap();
                done.send(result).unwrap();
            });
            epoch
        };

        // All four admitted while epoch A's tail is still uploading
        // (each write sleeps 100ms; admissions are microseconds apart,
        // paced just enough that lock-queue order is admission order).
        let a = admit(b"v1");
        tokio::time::sleep(Duration::from_millis(10)).await;
        let b = admit(b"v1"); // unchanged → reference to A
        tokio::time::sleep(Duration::from_millis(10)).await;
        // Peek the id the next `admit` will claim so the fault is armed
        // for exactly epoch C, before C's tail can start writing.
        let (c_epoch, _) = allocator.peek();
        backend.fail.lock().insert((c_epoch, 1)); // vnode 0 lands, vnode 1 fails
        let c = admit(b"v2"); // changed → full attempt, partially fails
        tokio::time::sleep(Duration::from_millis(10)).await;
        let d = admit(b"v2"); // same state as the failed epoch

        let mut results = Vec::new();
        for _ in 0..4 {
            results.push(done_rx.recv().await.unwrap());
        }

        assert_eq!(
            results.iter().map(|r| r.epoch).collect::<Vec<_>>(),
            vec![a, b, c, d],
            "tails must complete in admission order",
        );
        assert_eq!(
            results.iter().map(|r| r.success).collect::<Vec<_>>(),
            vec![true, true, false, true],
            "the failed epoch must not disturb its successors",
        );
        // C failed specifically on the injected vnode write.
        assert!(results[2]
            .error
            .as_deref()
            .is_some_and(|e| e.contains("vnode partial write failed")));

        // Recovery point: the failed epoch was never sealed.
        assert_eq!(
            backend.latest_committed_epoch().await.unwrap(),
            Some(d),
            "the last successful epoch is the recovery point",
        );

        // B was unchanged from A → reference. D matches the FAILED
        // epoch's state, and C's vnode-0 write landed before the
        // injected failure — D must not reference it (bases are
        // recorded only after every write in an epoch lands).
        let p_b = crate::vnode_partial::VnodePartial::decode(
            &backend.read_partial(0, b).await.unwrap().unwrap(),
        )
        .unwrap();
        assert_eq!(p_b.base_epoch, Some(a));
        for vnode in [0u32, 1] {
            let p_d = crate::vnode_partial::VnodePartial::decode(
                &backend.read_partial(vnode, d).await.unwrap().unwrap(),
            )
            .unwrap();
            assert_eq!(
                p_d.base_epoch, None,
                "vnode {vnode}: a successor of a failed epoch must re-upload full, \
                 never reference the failed epoch's stray partial",
            );
            assert_eq!(p_d.operators[0].1, b"v2");
        }
        // D's id immediately follows C's: the failed attempt consumed it.
        assert_eq!(c, d - 1, "abandoned epoch's id is burned, not reused");
    }
3898
3899    /// A follower persists its manifest before learning the leader
3900    /// aborted, so an aborted epoch's Pending manifest can be the
3901    /// highest on disk at restart. Construction seeds ids from it
3902    /// (high is safe); recovery then restores from the older committed
3903    /// epoch and must NOT walk the ids back down — that would
3904    /// re-allocate the aborted epoch over its stale artifacts.
3905    #[tokio::test]
3906    async fn recovery_never_walks_ids_back_onto_aborted_epochs() {
3907        use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
3908
3909        let dir = tempfile::tempdir().unwrap();
3910        let store = FileSystemCheckpointStore::new(dir.path(), 5);
3911        // Committed epoch 3.
3912        let mut committed = CheckpointManifest::new(3, 3);
3913        committed
3914            .sink_commit_statuses
3915            .insert("out".into(), SinkCommitStatus::Committed);
3916        store.save(&committed).await.unwrap();
3917        // Aborted epoch 5: persisted by a follower before the leader's
3918        // Abort, never committed.
3919        let mut aborted = CheckpointManifest::new(5, 5);
3920        aborted
3921            .sink_commit_statuses
3922            .insert("out".into(), SinkCommitStatus::Pending);
3923        store.save(&aborted).await.unwrap();
3924
3925        let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), Box::new(store))
3926            .await
3927            .unwrap();
3928        assert_eq!(coord.epoch(), 6, "seeds from the highest loadable manifest");
3929
3930        let recovered = coord.recover().await.unwrap().expect("recovers");
3931        assert_eq!(recovered.epoch(), 3, "restores from the committed epoch");
3932        assert_eq!(
3933            coord.epoch(),
3934            6,
3935            "ids must stay above the aborted epoch, never re-allocating it",
3936        );
3937    }
3938
3939    #[test]
3940    fn epoch_allocator_allocates_monotonic_pairs() {
3941        let a = EpochAllocator::new(5, 9);
3942        assert_eq!(a.peek(), (5, 9));
3943        assert_eq!(a.allocate(), (5, 9));
3944        assert_eq!(a.allocate(), (6, 10));
3945        assert_eq!(a.peek(), (7, 11));
3946        a.advance_to(20, 30);
3947        assert_eq!(a.allocate(), (20, 30));
3948        // Monotonic: never walks backwards.
3949        a.advance_to(5, 5);
3950        assert_eq!(a.peek(), (21, 31));
3951    }
3952
3953    /// Ids are allocated at the start of an attempt: a failed epoch is
3954    /// abandoned (Flink-style), never retried under the same ids.
3955    #[tokio::test]
3956    async fn failed_epoch_is_abandoned_not_retried() {
3957        let dir = tempfile::tempdir().unwrap();
3958        let config = CheckpointConfig {
3959            max_checkpoint_bytes: Some(16),
3960            ..CheckpointConfig::default()
3961        };
3962        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
3963        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
3964
3965        // Oversized state → size-cap rejection.
3966        let mut ops = HashMap::new();
3967        ops.insert("big".to_string(), bytes::Bytes::from(vec![0u8; 2_000_000]));
3968        let failed = coord
3969            .checkpoint(CheckpointRequest {
3970                operator_states: ops,
3971                ..CheckpointRequest::default()
3972            })
3973            .await
3974            .unwrap();
3975        assert!(!failed.success);
3976
3977        let ok = coord
3978            .checkpoint(CheckpointRequest::default())
3979            .await
3980            .unwrap();
3981        assert!(ok.success);
3982        assert_eq!(
3983            ok.epoch,
3984            failed.epoch + 1,
3985            "the failed epoch must be abandoned, not reused",
3986        );
3987        assert_eq!(ok.checkpoint_id, failed.checkpoint_id + 1);
3988    }
3989
3990    #[tokio::test]
3991    async fn test_stats_include_percentiles_after_checkpoints() {
3992        let dir = tempfile::tempdir().unwrap();
3993        let mut coord = make_coordinator(dir.path()).await;
3994
3995        // Run 3 checkpoints.
3996        for _ in 0..3 {
3997            let result = coord
3998                .checkpoint(CheckpointRequest::default())
3999                .await
4000                .unwrap();
4001            assert!(result.success);
4002        }
4003
4004        let stats = coord.stats();
4005        assert_eq!(stats.completed, 3);
4006        // After 3 fast checkpoints, percentiles should be > 0
4007        // (they're real durations, not zero).
4008        assert!(stats.last_duration.is_some());
4009    }
4010
4011    /// Sink whose `pre_commit` always fails; counts `rollback_epoch` calls.
4012    struct FailingPreCommitSink {
4013        rollback_count: Arc<std::sync::atomic::AtomicU64>,
4014        schema: arrow::datatypes::SchemaRef,
4015    }
4016
4017    #[async_trait::async_trait]
4018    impl laminar_connectors::connector::SinkConnector for FailingPreCommitSink {
4019        async fn open(
4020            &mut self,
4021            _config: &laminar_connectors::config::ConnectorConfig,
4022        ) -> Result<(), laminar_connectors::error::ConnectorError> {
4023            Ok(())
4024        }
4025
4026        async fn write_batch(
4027            &mut self,
4028            _batch: &arrow::array::RecordBatch,
4029        ) -> Result<
4030            laminar_connectors::connector::WriteResult,
4031            laminar_connectors::error::ConnectorError,
4032        > {
4033            Ok(laminar_connectors::connector::WriteResult::new(0, 0))
4034        }
4035
4036        async fn pre_commit(
4037            &mut self,
4038            epoch: u64,
4039        ) -> Result<(), laminar_connectors::error::ConnectorError> {
4040            Err(laminar_connectors::error::ConnectorError::TransactionError(
4041                format!("synthetic pre_commit failure at epoch {epoch}"),
4042            ))
4043        }
4044
4045        async fn rollback_epoch(
4046            &mut self,
4047            _epoch: u64,
4048        ) -> Result<(), laminar_connectors::error::ConnectorError> {
4049            self.rollback_count
4050                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
4051            Ok(())
4052        }
4053
4054        async fn close(&mut self) -> Result<(), laminar_connectors::error::ConnectorError> {
4055            Ok(())
4056        }
4057
4058        fn schema(&self) -> arrow::datatypes::SchemaRef {
4059            Arc::clone(&self.schema)
4060        }
4061
4062        fn capabilities(&self) -> laminar_connectors::connector::SinkConnectorCapabilities {
4063            laminar_connectors::connector::SinkConnectorCapabilities::new(Duration::from_secs(5))
4064                .with_exactly_once()
4065                .with_two_phase_commit()
4066                .with_preserves_pending_on_abandon()
4067        }
4068    }
4069
4070    /// A pre-commit failure abandons the epoch but must NOT hard-roll
4071    /// back a connector that preserves pending output (see
4072    /// `SinkCommand::RollbackEpoch`): the pending rows ride into the
4073    /// next epoch's commit. Connectors without the capability ARE
4074    /// rolled back.
4075    #[tokio::test]
4076    async fn pre_commit_failure_abandons_without_connector_rollback() {
4077        use arrow::datatypes::{DataType, Field, Schema};
4078
4079        let dir = tempfile::tempdir().unwrap();
4080        let mut coord = make_coordinator(dir.path()).await;
4081
4082        let rollback_count = Arc::new(std::sync::atomic::AtomicU64::new(0));
4083        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
4084        let sink = FailingPreCommitSink {
4085            rollback_count: Arc::clone(&rollback_count),
4086            schema,
4087        };
4088        let (event_tx, _event_rx) = laminar_core::streaming::channel::channel::<
4089            crate::sink_task::SinkEvent,
4090        >(crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY);
4091        let handle = crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
4092            name: "failing-sink".into(),
4093            sink_id: Arc::from("failing-sink"),
4094            connector: Box::new(sink),
4095            exactly_once: true,
4096            channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
4097            flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
4098            write_timeout: Duration::from_secs(5),
4099            event_tx,
4100        });
4101        coord.register_sink("failing-sink", handle, true);
4102
4103        coord.begin_initial_epoch().await.unwrap();
4104
4105        let result = coord
4106            .checkpoint(CheckpointRequest::default())
4107            .await
4108            .unwrap();
4109
4110        assert!(!result.success);
4111        assert!(
4112            result
4113                .error
4114                .as_deref()
4115                .is_some_and(|e| e.contains("pre-commit failed")),
4116            "error should mention pre-commit: got {:?}",
4117            result.error
4118        );
4119        assert_eq!(
4120            rollback_count.load(std::sync::atomic::Ordering::Relaxed),
4121            0,
4122            "a healthy sink must keep its pending output on a live abandon"
4123        );
4124    }
4125
4126    /// Writes fail (poisoning the epoch); `rollback_epoch` hangs
4127    /// forever. The poisoned epoch is what makes the live abandon take
4128    /// the forced connector-rollback path that can hang.
4129    struct StuckRollbackSink {
4130        schema: arrow::datatypes::SchemaRef,
4131    }
4132
4133    #[async_trait::async_trait]
4134    impl laminar_connectors::connector::SinkConnector for StuckRollbackSink {
4135        async fn open(
4136            &mut self,
4137            _config: &laminar_connectors::config::ConnectorConfig,
4138        ) -> Result<(), laminar_connectors::error::ConnectorError> {
4139            Ok(())
4140        }
4141
4142        async fn write_batch(
4143            &mut self,
4144            _batch: &arrow::array::RecordBatch,
4145        ) -> Result<
4146            laminar_connectors::connector::WriteResult,
4147            laminar_connectors::error::ConnectorError,
4148        > {
4149            Err(laminar_connectors::error::ConnectorError::WriteError(
4150                "synthetic write failure".into(),
4151            ))
4152        }
4153
4154        async fn pre_commit(
4155            &mut self,
4156            _epoch: u64,
4157        ) -> Result<(), laminar_connectors::error::ConnectorError> {
4158            Err(laminar_connectors::error::ConnectorError::TransactionError(
4159                "synthetic pre_commit failure".into(),
4160            ))
4161        }
4162
4163        async fn rollback_epoch(
4164            &mut self,
4165            _epoch: u64,
4166        ) -> Result<(), laminar_connectors::error::ConnectorError> {
4167            // Hang until the test runtime drops us.
4168            std::future::pending::<()>().await;
4169            Ok(())
4170        }
4171
4172        async fn close(&mut self) -> Result<(), laminar_connectors::error::ConnectorError> {
4173            Ok(())
4174        }
4175
4176        fn schema(&self) -> arrow::datatypes::SchemaRef {
4177            Arc::clone(&self.schema)
4178        }
4179
4180        fn capabilities(&self) -> laminar_connectors::connector::SinkConnectorCapabilities {
4181            laminar_connectors::connector::SinkConnectorCapabilities::new(Duration::from_secs(5))
4182                .with_exactly_once()
4183                .with_two_phase_commit()
4184        }
4185    }
4186
4187    #[tokio::test(start_paused = true)]
4188    async fn test_rollback_sinks_bounded_by_timeout() {
4189        use arrow::datatypes::{DataType, Field, Schema};
4190
4191        let dir = tempfile::tempdir().unwrap();
4192        let config = CheckpointConfig {
4193            rollback_timeout: Duration::from_millis(100),
4194            ..Default::default()
4195        };
4196        let store = Box::new(
4197            laminar_core::storage::checkpoint_store::FileSystemCheckpointStore::new(dir.path(), 3),
4198        );
4199        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
4200
4201        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
4202        let sink = StuckRollbackSink { schema };
4203        let (event_tx, _event_rx) = laminar_core::streaming::channel::channel::<
4204            crate::sink_task::SinkEvent,
4205        >(crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY);
4206        let handle = crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
4207            name: "stuck-sink".into(),
4208            sink_id: Arc::from("stuck-sink"),
4209            connector: Box::new(sink),
4210            exactly_once: true,
4211            channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
4212            flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
4213            write_timeout: Duration::from_secs(5),
4214            event_tx,
4215        });
4216        coord.register_sink("stuck-sink", handle.clone(), true);
4217        coord.begin_initial_epoch().await.unwrap();
4218
4219        // Poison the epoch with a failing write — only a poisoned sink
4220        // takes the forced connector-rollback path that can hang.
4221        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
4222        let batch = arrow::array::RecordBatch::try_new(
4223            schema,
4224            vec![Arc::new(arrow::array::Int32Array::from(vec![1]))],
4225        )
4226        .unwrap();
4227        handle.write_batch(batch).await.unwrap();
4228        handle.sync().await.unwrap();
4229
4230        // Poisoned pre_commit fails → rollback_sinks fires → connector
4231        // rollback hangs → 100ms rollback_timeout fires → coordinator
4232        // returns instead of wedging.
4233        let result = coord
4234            .checkpoint(CheckpointRequest::default())
4235            .await
4236            .unwrap();
4237
4238        assert!(!result.success);
4239        assert!(
4240            result
4241                .error
4242                .as_deref()
4243                .is_some_and(|e| e.contains("pre-commit failed")),
4244            "checkpoint result should reflect pre-commit failure: got {:?}",
4245            result.error
4246        );
4247    }
4248}