#![allow(clippy::disallowed_types)]

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use laminar_connectors::checkpoint::SourceCheckpoint;
use laminar_connectors::connector::SourceConnector;
use laminar_core::state::StateBackend;
use laminar_storage::checkpoint_manifest::{
    CheckpointManifest, ConnectorCheckpoint, SinkCommitStatus,
};
use laminar_storage::checkpoint_store::CheckpointStore;
use tracing::{debug, error, info, warn};

use crate::error::DbError;

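/// Tuning knobs for the checkpoint coordinator: cadence, retention,
/// per-phase timeouts, and size limits.
///
/// A minimal sketch of overriding individual knobs via struct-update
/// syntax (the values here are illustrative, not recommendations):
///
/// ```ignore
/// let config = CheckpointConfig {
///     interval: Some(Duration::from_secs(10)),
///     max_retained: 5,
///     ..CheckpointConfig::default()
/// };
/// ```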
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Interval between automatic checkpoints; `None` disables them.
    pub interval: Option<Duration>,
    /// Number of completed checkpoints to retain before pruning.
    pub max_retained: usize,
    /// Maximum time to wait for checkpoint barrier alignment.
    pub alignment_timeout: Duration,
    /// Maximum time to wait for sink pre-commit.
    pub pre_commit_timeout: Duration,
    /// Maximum time to wait for manifest and state persistence.
    pub persist_timeout: Duration,
    /// Maximum time to wait for sink commit.
    pub commit_timeout: Duration,
    /// Maximum time to wait for sink rollback.
    pub rollback_timeout: Duration,
    /// Size threshold (bytes) above which operator state is written to a
    /// sidecar file rather than inlined in the manifest.
    pub state_inline_threshold: usize,
    /// Maximum time to wait for operator state serialization.
    pub serialization_timeout: Duration,
    /// Optional hard cap on checkpoint sidecar size, in bytes; exceeding
    /// it fails the checkpoint.
    pub max_checkpoint_bytes: Option<usize>,
}

impl Default for CheckpointConfig {
    fn default() -> Self {
        Self {
            interval: Some(Duration::from_secs(60)),
            max_retained: 3,
            alignment_timeout: Duration::from_secs(30),
            pre_commit_timeout: Duration::from_secs(30),
            persist_timeout: Duration::from_secs(120),
            commit_timeout: Duration::from_secs(60),
            rollback_timeout: Duration::from_secs(30),
            serialization_timeout: Duration::from_secs(120),
            state_inline_threshold: 1_048_576,
            max_checkpoint_bytes: None,
        }
    }
}

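/// Inputs for a single checkpoint attempt. `CheckpointRequest::default()`
/// yields an empty request; callers fill in only what they have, as the
/// tests below do:
///
/// ```ignore
/// let request = CheckpointRequest {
///     watermark: Some(1_000),
///     ..CheckpointRequest::default()
/// };
/// ```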
#[derive(Debug, Clone, Default)]
pub struct CheckpointRequest {
    /// Serialized operator state, keyed by operator name.
    pub operator_states: HashMap<String, bytes::Bytes>,
    /// Global watermark at checkpoint time, if known.
    pub watermark: Option<i64>,
    /// Path of the table store's own checkpoint, if one was taken.
    pub table_store_checkpoint_path: Option<String>,
    /// Additional table offsets to record in the manifest.
    pub extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
    /// Per-source watermarks, keyed by source name.
    pub source_watermarks: HashMap<String, i64>,
    /// Hash of the pipeline definition, for compatibility checks on restore.
    pub pipeline_hash: Option<u64>,
    /// Source offsets to record, keyed by source name.
    pub source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
}

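/// Phase of the checkpoint protocol the coordinator is currently in. A
/// successful checkpoint moves through
/// `Idle → Snapshotting → PreCommitting → Persisting → Committing → Idle`.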
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub enum CheckpointPhase {
    Idle,
    Snapshotting,
    PreCommitting,
    Persisting,
    Committing,
}

impl std::fmt::Display for CheckpointPhase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Idle => write!(f, "Idle"),
            Self::Snapshotting => write!(f, "Snapshotting"),
            Self::PreCommitting => write!(f, "PreCommitting"),
            Self::Persisting => write!(f, "Persisting"),
            Self::Committing => write!(f, "Committing"),
        }
    }
}

#[derive(Debug, serde::Serialize)]
pub struct CheckpointResult {
    /// Whether the checkpoint completed successfully.
    pub success: bool,
    /// Identifier of this checkpoint.
    pub checkpoint_id: u64,
    /// Epoch this checkpoint covered.
    pub epoch: u64,
    /// Wall-clock duration of the attempt.
    pub duration: Duration,
    /// Failure reason when `success` is false; also carries a
    /// begin-next-epoch error after an otherwise successful checkpoint.
    pub error: Option<String>,
}

/// A source registered with the coordinator.
pub(crate) struct RegisteredSource {
    pub name: String,
    pub connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
    pub supports_replay: bool,
}

/// A sink registered with the coordinator.
pub(crate) struct RegisteredSink {
    pub name: String,
    pub handle: crate::sink_task::SinkTaskHandle,
    pub exactly_once: bool,
}

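/// Coordinates two-phase checkpoints across registered sinks: pre-commit
/// every exactly-once sink, persist the manifest (plus any operator-state
/// sidecar), then commit, rolling back on failure at any step.
///
/// A minimal lifecycle sketch with a filesystem-backed store (mirrors the
/// tests at the bottom of this file):
///
/// ```ignore
/// let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
/// let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), store).await?;
/// coord.begin_initial_epoch().await?;
/// let result = coord.checkpoint(CheckpointRequest::default()).await?;
/// assert!(result.success);
/// ```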
pub struct CheckpointCoordinator {
    config: CheckpointConfig,
    store: Arc<dyn CheckpointStore>,
    sinks: Vec<RegisteredSink>,
    next_checkpoint_id: u64,
    epoch: u64,
    phase: CheckpointPhase,
    checkpoints_completed: u64,
    checkpoints_failed: u64,
    last_checkpoint_duration: Option<Duration>,
    duration_histogram: DurationHistogram,
    prom: Option<Arc<crate::engine_metrics::EngineMetrics>>,
    total_bytes_written: u64,
    state_backend: Option<Arc<dyn StateBackend>>,
    /// Current vnode assignment version, passed through to marker writes.
    assignment_version: u64,
    #[cfg(feature = "cluster-unstable")]
    decision_store: Option<Arc<laminar_core::cluster::control::CheckpointDecisionStore>>,
    local_watermark_ms: Option<i64>,
    #[cfg(feature = "cluster-unstable")]
    cluster_min_watermark: Option<i64>,
    /// Vnodes this node writes checkpoint markers for.
    vnode_set: Vec<u32>,
    /// Vnodes the durability gate checks; pinned to the first non-empty
    /// assignment unless overridden via `set_gate_vnode_set`.
    gate_vnode_set: Vec<u32>,
    #[cfg(feature = "cluster-unstable")]
    cluster_controller: Option<Arc<laminar_core::cluster::control::ClusterController>>,
    /// Memoized sorted sink names; invalidated by `register_sink`.
    cached_sorted_sink_names: Option<Vec<String>>,
}

impl CheckpointCoordinator {
    /// Creates a coordinator, resuming the checkpoint id and epoch from
    /// the latest stored manifest if one exists.
    pub async fn new(
        config: CheckpointConfig,
        store: Box<dyn CheckpointStore>,
    ) -> Result<Self, DbError> {
        let store: Arc<dyn CheckpointStore> = Arc::from(store);
        let (next_id, epoch) = match store.load_latest().await {
            Ok(Some(m)) => (m.checkpoint_id + 1, m.epoch + 1),
            Ok(None) => (1, 1),
            Err(e) => {
                return Err(DbError::Checkpoint(format!(
                    "[LDB-6028] failed to load latest checkpoint at coordinator \
                     construction: {e} — refusing to start at epoch 1 and \
                     clobber existing on-disk state"
                )));
            }
        };

        Ok(Self {
            config,
            store,
            sinks: Vec::new(),
            next_checkpoint_id: next_id,
            epoch,
            phase: CheckpointPhase::Idle,
            checkpoints_completed: 0,
            checkpoints_failed: 0,
            last_checkpoint_duration: None,
            duration_histogram: DurationHistogram::new(),
            prom: None,
            total_bytes_written: 0,
            state_backend: None,
            assignment_version: 0,
            #[cfg(feature = "cluster-unstable")]
            decision_store: None,
            local_watermark_ms: None,
            #[cfg(feature = "cluster-unstable")]
            cluster_min_watermark: None,
            vnode_set: Vec::new(),
            gate_vnode_set: Vec::new(),
            #[cfg(feature = "cluster-unstable")]
            cluster_controller: None,
            cached_sorted_sink_names: None,
        })
    }

    #[cfg(feature = "cluster-unstable")]
    pub fn set_cluster_controller(
        &mut self,
        controller: Arc<laminar_core::cluster::control::ClusterController>,
    ) {
        self.cluster_controller = Some(controller);
    }

    pub fn set_state_backend(&mut self, backend: Arc<dyn StateBackend>) {
        self.state_backend = Some(backend);
    }

    #[cfg(feature = "cluster-unstable")]
    pub fn set_decision_store(
        &mut self,
        store: Arc<laminar_core::cluster::control::CheckpointDecisionStore>,
    ) {
        self.decision_store = Some(store);
    }

    pub fn set_assignment_version(&mut self, version: u64) {
        self.assignment_version = version;
    }

    pub fn set_local_watermark_ms(&mut self, watermark: Option<i64>) {
        self.local_watermark_ms = watermark;
    }

    /// Sets the vnode set for marker writes. The durability-gate set is
    /// pinned to the first non-empty assignment so later reassignments do
    /// not silently change what the gate checks.
    pub fn set_vnode_set(&mut self, vnodes: Vec<u32>) {
        if self.gate_vnode_set.is_empty() {
            self.gate_vnode_set.clone_from(&vnodes);
        }
        self.vnode_set = vnodes;
    }

    pub fn set_gate_vnode_set(&mut self, vnodes: Vec<u32>) {
        self.gate_vnode_set = vnodes;
    }

    pub(crate) fn register_sink(
        &mut self,
        name: impl Into<String>,
        handle: crate::sink_task::SinkTaskHandle,
        exactly_once: bool,
    ) {
        self.sinks.push(RegisteredSink {
            name: name.into(),
            handle,
            exactly_once,
        });
        // Invalidate the memoized sorted sink-name list.
        self.cached_sorted_sink_names = None;
    }

    /// Begins the coordinator's current epoch on every exactly-once sink.
    /// Call once at startup, before the first checkpoint.
    pub async fn begin_initial_epoch(&self) -> Result<(), DbError> {
        self.begin_epoch_for_sinks(self.epoch).await
    }

    /// Begins `epoch` on all exactly-once sinks; on failure, rolls back
    /// the sinks that already started so none is left mid-epoch.
    async fn begin_epoch_for_sinks(&self, epoch: u64) -> Result<(), DbError> {
        let mut started: Vec<&RegisteredSink> = Vec::new();
        for sink in &self.sinks {
            if sink.exactly_once {
                match sink.handle.begin_epoch(epoch).await {
                    Ok(()) => {
                        started.push(sink);
                        debug!(sink = %sink.name, epoch, "began epoch");
                    }
                    Err(e) => {
                        for s in &started {
                            if let Err(re) = s.handle.rollback_epoch(epoch).await {
                                error!(sink = %s.name, epoch, error = %re,
                                    "[LDB-6016] sink rollback failed during begin_epoch recovery");
                            }
                        }
                        return Err(DbError::Checkpoint(format!(
                            "sink '{}' failed to begin epoch {epoch}: {e}",
                            sink.name
                        )));
                    }
                }
            }
        }
        Ok(())
    }

    pub fn set_metrics(&mut self, prom: Arc<crate::engine_metrics::EngineMetrics>) {
        self.prom = Some(prom);
    }

    fn emit_checkpoint_metrics(&self, success: bool, epoch: u64, duration: Duration) {
        if let Some(ref m) = self.prom {
            if success {
                m.checkpoints_completed.inc();
            } else {
                m.checkpoints_failed.inc();
            }
            #[allow(clippy::cast_possible_wrap)]
            m.checkpoint_epoch.set(epoch as i64);
            m.checkpoint_duration.observe(duration.as_secs_f64());
        }
    }

    /// Runs a full checkpoint with the given request; see
    /// `checkpoint_inner` for the phase-by-phase flow.
    pub async fn checkpoint(
        &mut self,
        request: CheckpointRequest,
    ) -> Result<CheckpointResult, DbError> {
        self.checkpoint_inner(request).await
    }

    async fn pre_commit_sinks(&self, epoch: u64) -> Result<(), DbError> {
        let timeout_dur = self.config.pre_commit_timeout;
        let start = std::time::Instant::now();

        let result =
            match tokio::time::timeout(timeout_dur, self.pre_commit_sinks_inner(epoch)).await {
                Ok(result) => result,
                Err(_elapsed) => Err(DbError::Checkpoint(format!(
                    "pre-commit timed out after {}s",
                    timeout_dur.as_secs()
                ))),
            };

        if let Some(ref m) = self.prom {
            m.sink_precommit_duration
                .observe(start.elapsed().as_secs_f64());
        }

        result
    }

    /// Pre-commits all exactly-once sinks concurrently; the first failure
    /// aborts the join and is returned.
    async fn pre_commit_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
        let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
            let handle = sink.handle.clone();
            let name = sink.name.clone();
            async move {
                let result = handle.pre_commit(epoch).await;
                match result {
                    Ok(()) => {
                        debug!(sink = %name, epoch, "sink pre-committed");
                        Ok(())
                    }
                    Err(e) => Err(DbError::Checkpoint(format!(
                        "sink '{name}' pre-commit failed: {e}"
                    ))),
                }
            }
        });
        futures::future::try_join_all(futures).await.map(|_| ())
    }

    /// Commits all exactly-once sinks for `epoch` and returns a per-sink
    /// status map; on overall timeout, every sink is marked `Failed`.
    async fn commit_sinks_tracked(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
        let timeout_dur = self.config.commit_timeout;
        let start = std::time::Instant::now();

        let statuses = match tokio::time::timeout(timeout_dur, self.commit_sinks_inner(epoch)).await
        {
            Ok(statuses) => statuses,
            Err(_elapsed) => {
                error!(
                    epoch,
                    timeout_secs = timeout_dur.as_secs(),
                    "[LDB-6012] sink commit timed out — marking all pending sinks as failed"
                );
                self.sinks
                    .iter()
                    .filter(|s| s.exactly_once)
                    .map(|s| {
                        (
                            s.name.clone(),
                            SinkCommitStatus::Failed(format!(
                                "sink '{}' commit timed out after {}s",
                                s.name,
                                timeout_dur.as_secs()
                            )),
                        )
                    })
                    .collect()
            }
        };

        if let Some(ref m) = self.prom {
            m.sink_commit_duration
                .observe(start.elapsed().as_secs_f64());
        }

        statuses
    }

    /// Commits sinks concurrently; individual failures are captured as
    /// `Failed` statuses rather than aborting the join.
    async fn commit_sinks_inner(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
        let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
            let handle = sink.handle.clone();
            let name = sink.name.clone();
            async move {
                let status = match handle.commit_epoch(epoch).await {
                    Ok(()) => {
                        debug!(sink = %name, epoch, "sink committed");
                        SinkCommitStatus::Committed
                    }
                    Err(e) => {
                        let msg = format!("sink '{name}' commit failed: {e}");
                        error!(sink = %name, epoch, error = %e, "sink commit failed");
                        SinkCommitStatus::Failed(msg)
                    }
                };
                (name, status)
            }
        });
        let results = futures::future::join_all(futures).await;
        results.into_iter().collect()
    }

    /// Persists the manifest (and optional state sidecar chunks) under
    /// the configured persist timeout.
    async fn save_manifest(
        &self,
        manifest: Arc<CheckpointManifest>,
        state_data: Option<Vec<bytes::Bytes>>,
    ) -> Result<(), DbError> {
        let timeout_dur = self.config.persist_timeout;
        let fut = self.store.save_with_state(&manifest, state_data.as_deref());
        match tokio::time::timeout(timeout_dur, fut).await {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) => Err(DbError::from(e)),
            Err(_elapsed) => Err(DbError::Checkpoint(format!(
                "[LDB-6011] manifest persist timed out after {}s — \
                 filesystem may be degraded",
                timeout_dur.as_secs()
            ))),
        }
    }

    /// Writes a per-vnode durability marker for this checkpoint epoch.
    /// A no-op when no state backend is configured or the vnode set is
    /// empty.
    async fn write_vnode_markers(&self, epoch: u64, checkpoint_id: u64) -> Result<(), DbError> {
        let Some(ref backend) = self.state_backend else {
            return Ok(());
        };
        if self.vnode_set.is_empty() {
            return Ok(());
        }
        let payload = bytes::Bytes::from(format!("ckpt:{checkpoint_id}").into_bytes());
        let caller_version = self.assignment_version;
        let writes = self.vnode_set.iter().map(|&v| {
            let backend = Arc::clone(backend);
            let payload = payload.clone();
            async move {
                backend
                    .write_partial(v, epoch, caller_version, payload)
                    .await
                    .map_err(|e| {
                        DbError::Checkpoint(format!(
                            "[LDB-6024] vnode marker write failed (vnode={v}, epoch={epoch}): {e}"
                        ))
                    })
            }
        });
        futures::future::try_join_all(writes).await.map(|_| ())
    }

    #[cfg(feature = "cluster-unstable")]
    /// On startup, resolves a manifest left with `Pending` sink statuses:
    /// commits the epoch if the decision store holds a commit marker for
    /// it, otherwise rolls the sinks back.
    pub async fn reconcile_prepared_on_init(&self) {
        use laminar_core::cluster::control::Phase;

        let Ok(Some(last)) = self.store.load_latest().await else {
            return;
        };
        let has_pending = last
            .sink_commit_statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Pending));
        if !has_pending {
            return;
        }

        let epoch = last.epoch;
        let checkpoint_id = last.checkpoint_id;

        let committed = match self.decision_store.as_ref() {
            Some(ds) => ds.is_committed(epoch).await.unwrap_or_else(|e| {
                warn!(
                    epoch, checkpoint_id, error = %e,
                    "[LDB-6040] decision store read failed — defaulting to Abort",
                );
                false
            }),
            None => false,
        };

        let is_leader = self
            .cluster_controller
            .as_ref()
            .is_some_and(|cc| cc.is_leader());

        if committed {
            info!(
                epoch,
                checkpoint_id, "recovering Pending epoch as Committed"
            );
            let statuses = self.commit_sinks_tracked(epoch).await;
            if let Err(e) = self
                .persist_recovered_statuses(checkpoint_id, statuses)
                .await
            {
                warn!(epoch, checkpoint_id, error = %e, "post-recovery manifest update failed");
            }
            if is_leader {
                self.announce_if_leader(epoch, checkpoint_id, Phase::Commit, None)
                    .await;
            }
        } else {
            warn!(
                epoch,
                checkpoint_id, "[LDB-6035] Pending epoch with no commit marker — rolling back",
            );
            if let Err(e) = self.rollback_sinks(epoch).await {
                error!(epoch, checkpoint_id, error = %e, "sink rollback failed during recovery");
            }
            if is_leader {
                self.announce_if_leader(epoch, checkpoint_id, Phase::Abort, None)
                    .await;
            }
        }

        if is_leader {
            // Brief pause so followers can observe the announcement.
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    }

    #[cfg(feature = "cluster-unstable")]
    /// Writes recovered sink commit statuses back into the stored
    /// manifest for `checkpoint_id`, if it still exists.
    async fn persist_recovered_statuses(
        &self,
        checkpoint_id: u64,
        statuses: HashMap<String, SinkCommitStatus>,
    ) -> Result<(), DbError> {
        if statuses.is_empty() {
            return Ok(());
        }
        match self.store.load_by_id(checkpoint_id).await {
            Ok(Some(mut m)) => {
                m.sink_commit_statuses = statuses;
                self.update_manifest_only(Arc::new(m)).await
            }
            Ok(None) => Ok(()),
            Err(e) => Err(DbError::from(e)),
        }
    }

    #[cfg(feature = "cluster-unstable")]
    /// Publishes a barrier announcement for `(epoch, checkpoint_id)` if
    /// this node is the cluster leader; publish errors are logged, not
    /// propagated.
    async fn announce_if_leader(
        &self,
        epoch: u64,
        checkpoint_id: u64,
        phase: laminar_core::cluster::control::Phase,
        min_watermark_ms: Option<i64>,
    ) {
        let Some(cc) = self.cluster_controller.as_ref() else {
            return;
        };
        if !cc.is_leader() {
            return;
        }
        let ann = laminar_core::cluster::control::BarrierAnnouncement {
            epoch,
            checkpoint_id,
            phase,
            flags: 0,
            min_watermark_ms,
        };
        if let Err(e) = cc.announce_barrier(&ann).await {
            warn!(
                epoch,
                checkpoint_id,
                ?phase,
                error = %e,
                "[LDB-6031] barrier announcement failed",
            );
        }
    }

    #[cfg(feature = "cluster-unstable")]
    /// Leader-side prepare: announces `Prepare`, waits for follower acks,
    /// and merges follower watermarks into the cluster minimum. Returns
    /// `Some(reason)` when the quorum failed and the epoch must abort.
    async fn await_prepare_quorum(&mut self, epoch: u64, checkpoint_id: u64) -> Option<String> {
        use laminar_core::cluster::control::{Phase, QuorumOutcome};
        let cc = self.cluster_controller.as_ref()?;
        if !cc.is_leader() {
            return None;
        }
        self.announce_if_leader(epoch, checkpoint_id, Phase::Prepare, None)
            .await;

        let mut followers = cc.live_instances();
        followers.retain(|id| *id != cc.instance_id());
        if followers.is_empty() {
            // Single-node cluster: the local watermark is the cluster minimum.
            self.cluster_min_watermark = self.local_watermark_ms;
            if let Some(wm) = self.local_watermark_ms {
                cc.publish_cluster_min_watermark(wm);
            }
            return None;
        }

        let outcome = cc
            .wait_for_quorum(epoch, &followers, Duration::from_secs(30))
            .await;
        match outcome {
            QuorumOutcome::Reached {
                min_follower_watermark_ms,
                ..
            } => {
                // The cluster minimum is the smaller of the local watermark
                // and the minimum across followers.
                let merged = match (self.local_watermark_ms, min_follower_watermark_ms) {
                    (Some(a), Some(b)) => Some(a.min(b)),
                    (Some(a), None) => Some(a),
                    (None, Some(b)) => Some(b),
                    (None, None) => None,
                };
                self.cluster_min_watermark = merged;
                if let Some(wm) = merged {
                    cc.publish_cluster_min_watermark(wm);
                }
                None
            }
            QuorumOutcome::TimedOut { missing, .. } => {
                self.announce_if_leader(epoch, checkpoint_id, Phase::Abort, None)
                    .await;
                Some(format!(
                    "quorum timeout: {} follower(s) did not ack",
                    missing.len()
                ))
            }
            QuorumOutcome::Failed { failures } => {
                self.announce_if_leader(epoch, checkpoint_id, Phase::Abort, None)
                    .await;
                let first = failures.first().map_or("unknown", |(_, msg)| msg.as_str());
                Some(format!(
                    "follower snapshot failed on {} peer(s): {first}",
                    failures.len()
                ))
            }
        }
    }

    /// Rewrites an existing manifest in place (e.g. to record sink commit
    /// statuses) under the persist timeout.
    async fn update_manifest_only(&self, manifest: Arc<CheckpointManifest>) -> Result<(), DbError> {
        let timeout_dur = self.config.persist_timeout;
        let fut = self.store.update_manifest(&manifest);
        match tokio::time::timeout(timeout_dur, fut).await {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) => Err(DbError::from(e)),
            Err(_elapsed) => Err(DbError::Checkpoint(format!(
                "manifest update timed out after {}s",
                timeout_dur.as_secs()
            ))),
        }
    }

    /// Every exactly-once sink starts an epoch as `Pending`.
    fn initial_sink_commit_statuses(&self) -> HashMap<String, SinkCommitStatus> {
        self.sinks
            .iter()
            .filter(|s| s.exactly_once)
            .map(|s| (s.name.clone(), SinkCommitStatus::Pending))
            .collect()
    }

    /// Splits operator state between the manifest and a sidecar: blobs
    /// larger than `threshold` are appended to the sidecar (the manifest
    /// entry records their offset and length within the concatenated
    /// chunks), smaller blobs are inlined. Returns the sidecar chunks, or
    /// `None` when everything fit inline.
    fn pack_operator_states(
        manifest: &mut CheckpointManifest,
        operator_states: &HashMap<String, bytes::Bytes>,
        threshold: usize,
    ) -> Option<Vec<bytes::Bytes>> {
        let mut sidecar_chunks: Vec<bytes::Bytes> = Vec::new();
        let mut offset: u64 = 0;
        for (name, data) in operator_states {
            let (op_ckpt, maybe_blob) =
                laminar_storage::checkpoint_manifest::OperatorCheckpoint::from_bytes_shared(
                    data.clone(),
                    threshold,
                    offset,
                );
            if let Some(blob) = maybe_blob {
                offset += blob.len() as u64;
                sidecar_chunks.push(blob);
            }
            manifest.operator_states.insert(name.clone(), op_ckpt);
        }

        if sidecar_chunks.is_empty() {
            None
        } else {
            Some(sidecar_chunks)
        }
    }

    /// Rolls back `epoch` on all exactly-once sinks under the rollback
    /// timeout.
    async fn rollback_sinks(&self, epoch: u64) -> Result<(), DbError> {
        let timeout_dur = self.config.rollback_timeout;
        match tokio::time::timeout(timeout_dur, self.rollback_sinks_inner(epoch)).await {
            Ok(result) => result,
            Err(_elapsed) => {
                error!(
                    epoch,
                    timeout_secs = timeout_dur.as_secs(),
                    "[LDB-6016] sink rollback timed out"
                );
                Err(DbError::Checkpoint(format!(
                    "rollback timed out after {}s",
                    timeout_dur.as_secs()
                )))
            }
        }
    }

    async fn rollback_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
        let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
            let handle = sink.handle.clone();
            let name = sink.name.clone();
            async move {
                let result = handle.rollback_epoch(epoch).await;
                (name, result)
            }
        });
        let results = futures::future::join_all(futures).await;

        let mut errors = Vec::new();
        for (name, result) in results {
            if let Err(e) = result {
                error!(sink = %name, epoch, error = %e, "[LDB-6016] sink rollback failed");
                errors.push(format!("sink '{name}': {e}"));
            }
        }
        if errors.is_empty() {
            Ok(())
        } else {
            Err(DbError::Checkpoint(format!(
                "rollback failed: {}",
                errors.join("; ")
            )))
        }
    }

    /// Maps every exactly-once sink to the current epoch.
    fn collect_sink_epochs(&self) -> HashMap<String, u64> {
        let mut epochs = HashMap::with_capacity(self.sinks.len());
        for sink in &self.sinks {
            // Only exactly-once sinks participate in epochs.
            if sink.exactly_once {
                epochs.insert(sink.name.clone(), self.epoch);
            }
        }
        epochs
    }

    /// Returns all sink names, sorted; memoized until `register_sink`
    /// invalidates the cache.
    fn sorted_sink_names(&mut self) -> Vec<String> {
        if self.cached_sorted_sink_names.is_none() {
            let mut names: Vec<String> = self.sinks.iter().map(|s| s.name.clone()).collect();
            names.sort();
            self.cached_sorted_sink_names = Some(names);
        }
        self.cached_sorted_sink_names.as_ref().unwrap().clone()
    }

    /// Current protocol phase.
    #[must_use]
    pub fn phase(&self) -> CheckpointPhase {
        self.phase
    }

    /// Current epoch.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Identifier the next checkpoint will use.
    #[must_use]
    pub fn next_checkpoint_id(&self) -> u64 {
        self.next_checkpoint_id
    }

    /// The coordinator's configuration.
    #[must_use]
    pub fn config(&self) -> &CheckpointConfig {
        &self.config
    }

    /// Point-in-time counters; histogram percentiles are converted from
    /// microseconds to milliseconds.
    #[must_use]
    pub fn stats(&self) -> CheckpointStats {
        let (p50, p95, p99) = self.duration_histogram.percentiles();
        CheckpointStats {
            completed: self.checkpoints_completed,
            failed: self.checkpoints_failed,
            last_duration: self.last_checkpoint_duration,
            duration_p50_ms: p50 / 1_000,
            duration_p95_ms: p95 / 1_000,
            duration_p99_ms: p99 / 1_000,
            total_bytes_written: self.total_bytes_written,
            current_phase: self.phase,
            current_epoch: self.epoch,
        }
    }

    /// The underlying checkpoint store.
    #[must_use]
    pub fn store(&self) -> &dyn CheckpointStore {
        &*self.store
    }

    /// Runs a checkpoint from a request whose source and table offsets
    /// have already been resolved. Currently identical to
    /// [`checkpoint`](Self::checkpoint).
    pub async fn checkpoint_with_offsets(
        &mut self,
        request: CheckpointRequest,
    ) -> Result<CheckpointResult, DbError> {
        self.checkpoint_inner(request).await
    }

    #[cfg(feature = "cluster-unstable")]
    /// Follower-side checkpoint: prepares locally for the leader-announced
    /// `(epoch, checkpoint_id)`, acks the barrier, then waits up to
    /// `decision_timeout` for the leader's Commit/Abort decision. Returns
    /// `Ok(true)` if the epoch committed.
    pub async fn follower_checkpoint(
        &mut self,
        request: CheckpointRequest,
        ann: laminar_core::cluster::control::BarrierAnnouncement,
        decision_timeout: Duration,
    ) -> Result<bool, DbError> {
        use laminar_core::cluster::control::{BarrierAck, Phase};

        let Some(cc) = self.cluster_controller.clone() else {
            return Err(DbError::Checkpoint(
                "[LDB-6033] follower_checkpoint called without cluster controller".into(),
            ));
        };

        let epoch = ann.epoch;
        let checkpoint_id = ann.checkpoint_id;
        self.epoch = epoch;
        self.next_checkpoint_id = checkpoint_id.saturating_add(1);

        let prepare_result = self.follower_prepare(request, epoch, checkpoint_id).await;
        let prepare_err = prepare_result.as_ref().err().map(ToString::to_string);
        cc.ack_barrier(&BarrierAck {
            epoch,
            ok: prepare_err.is_none(),
            error: prepare_err.clone(),
            local_watermark_ms: self.local_watermark_ms,
        })
        .await
        .ok();

        if let Err(e) = prepare_result {
            self.rollback_sinks(epoch).await.ok();
            self.phase = CheckpointPhase::Idle;
            return Err(e);
        }

        // Poll for the leader's decision until the deadline.
        let deadline = Instant::now() + decision_timeout;
        loop {
            match cc.observe_barrier().await.ok().flatten() {
                Some(a) if a.epoch == epoch && a.phase == Phase::Commit => {
                    return Ok(self.drive_follower_commit(epoch, checkpoint_id).await);
                }
                Some(a) if a.epoch == epoch && a.phase == Phase::Abort => {
                    self.rollback_sinks(epoch).await.ok();
                    self.checkpoints_failed += 1;
                    self.phase = CheckpointPhase::Idle;
                    return Ok(false);
                }
                _ => {}
            }
            if Instant::now() >= deadline {
                let committed = match self.decision_store.as_ref() {
                    Some(ds) => ds.is_committed(epoch).await.unwrap_or_else(|e| {
                        warn!(
                            epoch, checkpoint_id, error = %e,
                            "[LDB-6045] decision store read failed — defaulting to Abort",
                        );
                        false
                    }),
                    None => false,
                };
                if committed {
                    warn!(
                        epoch,
                        checkpoint_id,
                        "[LDB-6046] follower timeout but marker present — driving commit",
                    );
                    return Ok(self.drive_follower_commit(epoch, checkpoint_id).await);
                }
                warn!(
                    epoch,
                    checkpoint_id, "[LDB-6034] follower decision timeout; rolling back",
                );
                self.rollback_sinks(epoch).await.ok();
                self.checkpoints_failed += 1;
                self.phase = CheckpointPhase::Idle;
                return Ok(false);
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }

    #[cfg(feature = "cluster-unstable")]
    /// Commits sinks after a leader Commit decision and records the
    /// outcome; rolls back and reports failure on partial commit.
    async fn drive_follower_commit(&mut self, epoch: u64, checkpoint_id: u64) -> bool {
        let statuses = self.commit_sinks_tracked(epoch).await;
        let has_failures = statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));
        if has_failures {
            error!(
                epoch,
                checkpoint_id, "follower sink commit partially failed — rolling back",
            );
            self.rollback_sinks(epoch).await.ok();
            self.checkpoints_failed += 1;
            self.phase = CheckpointPhase::Idle;
            return false;
        }
        if let Err(e) = self
            .persist_recovered_statuses(checkpoint_id, statuses)
            .await
        {
            warn!(
                checkpoint_id,
                epoch,
                error = %e,
                "follower post-commit manifest update failed",
            );
        }
        self.checkpoints_completed += 1;
        self.epoch = epoch.saturating_add(1);
        self.phase = CheckpointPhase::Idle;
        true
    }

    #[cfg(feature = "cluster-unstable")]
    /// Follower prepare: pre-commits sinks, then builds and persists the
    /// manifest for the leader-assigned `(epoch, checkpoint_id)` and
    /// writes vnode markers.
    async fn follower_prepare(
        &mut self,
        request: CheckpointRequest,
        epoch: u64,
        checkpoint_id: u64,
    ) -> Result<(), DbError> {
        let CheckpointRequest {
            operator_states,
            watermark,
            table_store_checkpoint_path,
            extra_table_offsets,
            source_watermarks,
            pipeline_hash,
            source_offset_overrides,
        } = request;

        self.phase = CheckpointPhase::PreCommitting;
        self.pre_commit_sinks(epoch).await?;

        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
        manifest.source_offsets = source_offset_overrides;
        manifest.table_offsets = extra_table_offsets;
        manifest.sink_epochs = self.collect_sink_epochs();
        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
        manifest.watermark = watermark;
        manifest.source_watermarks = source_watermarks;
        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
        manifest.source_names = {
            let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
            names.sort();
            names
        };
        manifest.sink_names = self.sorted_sink_names();
        manifest.pipeline_hash = pipeline_hash;
        let state_data = Self::pack_operator_states(
            &mut manifest,
            &operator_states,
            self.config.state_inline_threshold,
        );

        self.phase = CheckpointPhase::Persisting;
        self.save_manifest(Arc::new(manifest), state_data).await?;
        self.write_vnode_markers(epoch, checkpoint_id).await?;
        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    /// The full checkpoint flow: snapshot, sink pre-commit, manifest
    /// persist, cluster coordination (prepare quorum, durability gate,
    /// commit marker), sink commit, then epoch advance.
    async fn checkpoint_inner(
        &mut self,
        request: CheckpointRequest,
    ) -> Result<CheckpointResult, DbError> {
        let CheckpointRequest {
            operator_states,
            watermark,
            table_store_checkpoint_path,
            extra_table_offsets,
            source_watermarks,
            pipeline_hash,
            source_offset_overrides,
        } = request;
        let start = Instant::now();
        let checkpoint_id = self.next_checkpoint_id;
        let epoch = self.epoch;

        info!(checkpoint_id, epoch, "starting checkpoint");

        self.phase = CheckpointPhase::Snapshotting;
        let source_offsets = source_offset_overrides;
        let table_offsets = extra_table_offsets;

        self.phase = CheckpointPhase::PreCommitting;
        if let Err(e) = self.pre_commit_sinks(epoch).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                error!(
                    checkpoint_id,
                    epoch,
                    error = %rollback_err,
                    "[LDB-6004] sink rollback failed after pre-commit failure"
                );
            }
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            error!(checkpoint_id, epoch, error = %e, "pre-commit failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("pre-commit failed: {e}")),
            });
        }

        let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
        manifest.source_offsets = source_offsets;
        manifest.table_offsets = table_offsets;
        manifest.sink_epochs = self.collect_sink_epochs();
        manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
        manifest.watermark = watermark;
        manifest.source_watermarks = source_watermarks;
        manifest.table_store_checkpoint_path = table_store_checkpoint_path;
        manifest.source_names = {
            let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
            names.sort();
            names
        };
        manifest.sink_names = self.sorted_sink_names();
        manifest.pipeline_hash = pipeline_hash;

        let state_data = Self::pack_operator_states(
            &mut manifest,
            &operator_states,
            self.config.state_inline_threshold,
        );
        let sidecar_bytes = state_data
            .as_ref()
            .map_or(0, |chunks| chunks.iter().map(bytes::Bytes::len).sum());
        if sidecar_bytes > 0 {
            debug!(
                checkpoint_id,
                sidecar_bytes, "writing operator state sidecar"
            );
        }

        if let Some(cap) = self.config.max_checkpoint_bytes {
            if sidecar_bytes > cap {
                self.phase = CheckpointPhase::Idle;
                self.checkpoints_failed += 1;
                let duration = start.elapsed();
                self.emit_checkpoint_metrics(false, epoch, duration);
                let msg = format!(
                    "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
                     cap {cap} bytes — checkpoint rejected"
                );
                error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
                return Ok(CheckpointResult {
                    success: false,
                    checkpoint_id,
                    epoch,
                    duration,
                    error: Some(msg),
                });
            }
            // Warn once a checkpoint crosses 80% of the configured cap.
            let warn_threshold = cap * 4 / 5;
            if sidecar_bytes > warn_threshold {
                warn!(
                    checkpoint_id,
                    epoch, sidecar_bytes, cap, "checkpoint size approaching cap (>80%)"
                );
            }
        }
        let checkpoint_bytes = sidecar_bytes as u64;

        self.phase = CheckpointPhase::Persisting;
        // Arc so the post-commit status update below can reuse the manifest.
        let mut manifest = Arc::new(manifest);
        if let Err(e) = self.save_manifest(Arc::clone(&manifest), state_data).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                error!(
                    checkpoint_id,
                    epoch,
                    error = %rollback_err,
                    "[LDB-6004] sink rollback failed after manifest persist failure — \
                     sinks may be in an inconsistent state"
                );
            }
            error!(checkpoint_id, epoch, error = %e, "[LDB-6008] manifest persist failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("manifest persist failed: {e}")),
            });
        }

        if let Err(e) = self.write_vnode_markers(epoch, checkpoint_id).await {
            self.phase = CheckpointPhase::Idle;
            self.checkpoints_failed += 1;
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                error!(
                    checkpoint_id,
                    epoch,
                    error = %rollback_err,
                    "[LDB-6025] sink rollback failed after marker write failure",
                );
            }
            error!(checkpoint_id, epoch, error = %e, "vnode marker write failed");
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some(format!("vnode marker write failed: {e}")),
            });
        }

        #[cfg(feature = "cluster-unstable")]
        {
            if let Some(quorum_failure) = self.await_prepare_quorum(epoch, checkpoint_id).await {
                self.phase = CheckpointPhase::Idle;
                self.checkpoints_failed += 1;
                let duration = start.elapsed();
                self.emit_checkpoint_metrics(false, epoch, duration);
                if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                    error!(
                        checkpoint_id,
                        epoch,
                        error = %rollback_err,
                        "[LDB-6032] sink rollback failed after quorum miss",
                    );
                }
                return Ok(CheckpointResult {
                    success: false,
                    checkpoint_id,
                    epoch,
                    duration,
                    error: Some(quorum_failure),
                });
            }
        }

        if let Some(ref backend) = self.state_backend {
            // State durability gate: every gate vnode must have persisted
            // this epoch before sinks are allowed to commit.
            if !self.gate_vnode_set.is_empty() {
                match backend.epoch_complete(epoch, &self.gate_vnode_set).await {
                    Ok(true) => {}
                    Ok(false) => {
                        warn!(
                            checkpoint_id,
                            epoch,
                            vnodes = self.gate_vnode_set.len(),
                            "[LDB-6020] state durability gate returned false — \
                             rolling back sinks",
                        );
                        #[cfg(feature = "cluster-unstable")]
                        self.announce_if_leader(
                            epoch,
                            checkpoint_id,
                            laminar_core::cluster::control::Phase::Abort,
                            None,
                        )
                        .await;
                        self.checkpoints_failed += 1;
                        self.phase = CheckpointPhase::Idle;
                        let duration = start.elapsed();
                        self.emit_checkpoint_metrics(false, epoch, duration);
                        if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                            error!(
                                checkpoint_id,
                                epoch,
                                error = %rollback_err,
                                "[LDB-6021] sink rollback failed after durability gate miss",
                            );
                        }
                        return Ok(CheckpointResult {
                            success: false,
                            checkpoint_id,
                            epoch,
                            duration,
                            error: Some("state durability gate: not all vnodes persisted".into()),
                        });
                    }
                    Err(e) => {
                        warn!(
                            checkpoint_id,
                            epoch,
                            error = %e,
                            "[LDB-6022] state backend error during durability gate — \
                             treating as gate miss, rolling back sinks",
                        );
                        #[cfg(feature = "cluster-unstable")]
                        self.announce_if_leader(
                            epoch,
                            checkpoint_id,
                            laminar_core::cluster::control::Phase::Abort,
                            None,
                        )
                        .await;
                        self.checkpoints_failed += 1;
                        self.phase = CheckpointPhase::Idle;
                        let duration = start.elapsed();
                        self.emit_checkpoint_metrics(false, epoch, duration);
                        if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                            error!(
                                checkpoint_id,
                                epoch,
                                error = %rollback_err,
                                "[LDB-6023] sink rollback failed after durability gate error",
                            );
                        }
                        return Ok(CheckpointResult {
                            success: false,
                            checkpoint_id,
                            epoch,
                            duration,
                            error: Some(format!("state durability gate: {e}")),
                        });
                    }
                }
            }
        }

        // The leader records the commit decision durably before announcing
        // it; without a marker the epoch must abort.
        #[cfg(feature = "cluster-unstable")]
        if self
            .cluster_controller
            .as_ref()
            .is_some_and(|cc| cc.is_leader())
        {
            let marker_result = match self.decision_store.as_ref() {
                Some(ds) => ds.record_committed(epoch).await.map_err(|e| e.to_string()),
                None => Err("no decision store configured for cluster leader".to_string()),
            };
            if let Err(reason) = marker_result {
                error!(
                    checkpoint_id, epoch, error = %reason,
                    "[LDB-6038] cannot record commit marker — aborting epoch",
                );
                self.announce_if_leader(
                    epoch,
                    checkpoint_id,
                    laminar_core::cluster::control::Phase::Abort,
                    None,
                )
                .await;
                self.checkpoints_failed += 1;
                self.phase = CheckpointPhase::Idle;
                let duration = start.elapsed();
                self.emit_checkpoint_metrics(false, epoch, duration);
                if let Err(rollback_err) = self.rollback_sinks(epoch).await {
                    error!(
                        checkpoint_id, epoch, error = %rollback_err,
                        "[LDB-6039] sink rollback failed after commit marker failure",
                    );
                }
                return Ok(CheckpointResult {
                    success: false,
                    checkpoint_id,
                    epoch,
                    duration,
                    error: Some(format!("commit marker: {reason}")),
                });
            }
        }

        // Announce Commit to followers, carrying the merged cluster-minimum
        // watermark.
        #[cfg(feature = "cluster-unstable")]
        self.announce_if_leader(
            epoch,
            checkpoint_id,
            laminar_core::cluster::control::Phase::Commit,
            self.cluster_min_watermark,
        )
        .await;

        self.phase = CheckpointPhase::Committing;
        let sink_statuses = self.commit_sinks_tracked(epoch).await;
        let has_failures = sink_statuses
            .values()
            .any(|s| matches!(s, SinkCommitStatus::Failed(_)));

        if !sink_statuses.is_empty() {
            // Record per-sink commit statuses back into the stored manifest.
            Arc::make_mut(&mut manifest).sink_commit_statuses = sink_statuses;
            if let Err(e) = self.update_manifest_only(Arc::clone(&manifest)).await {
                warn!(
                    checkpoint_id,
                    epoch,
                    error = %e,
                    "post-commit manifest update failed"
                );
            }
        }

        if has_failures {
            self.checkpoints_failed += 1;
            error!(
                checkpoint_id,
                epoch, "sink commit partially failed — epoch NOT advanced, will retry"
            );
            self.phase = CheckpointPhase::Idle;
            let duration = start.elapsed();
            self.emit_checkpoint_metrics(false, epoch, duration);
            return Ok(CheckpointResult {
                success: false,
                checkpoint_id,
                epoch,
                duration,
                error: Some("partial sink commit failure".into()),
            });
        }

        self.phase = CheckpointPhase::Idle;
        self.next_checkpoint_id += 1;
        self.epoch += 1;
        self.checkpoints_completed += 1;
        self.total_bytes_written += checkpoint_bytes;
        let duration = start.elapsed();
        self.last_checkpoint_duration = Some(duration);
        self.duration_histogram.record(duration);
        self.emit_checkpoint_metrics(true, epoch, duration);

        if let Some(ref m) = self.prom {
            #[allow(clippy::cast_possible_wrap)]
            m.checkpoint_size_bytes.set(checkpoint_bytes as i64);
        }

        // Prune state-backend partials (and cluster decision markers)
        // older than the retention horizon.
        if let Some(ref backend) = self.state_backend {
            let horizon = epoch.saturating_sub(self.config.max_retained as u64);
            if horizon > 0 {
                if let Err(e) = backend.prune_before(horizon).await {
                    warn!(
                        epoch,
                        horizon,
                        error = %e,
                        "[LDB-6026] state backend prune failed; old partials will linger"
                    );
                }
                #[cfg(feature = "cluster-unstable")]
                if let Some(ref ds) = self.decision_store {
                    if let Err(e) = ds.prune_before(horizon).await {
                        warn!(epoch, horizon, error = %e, "decision prune failed");
                    }
                }
            }
        }

        let next_epoch = self.epoch;
        let begin_epoch_error = match self.begin_epoch_for_sinks(next_epoch).await {
            Ok(()) => None,
            Err(e) => {
                error!(
                    next_epoch,
                    error = %e,
                    "[LDB-6015] failed to begin next epoch — writes will be non-transactional"
                );
                Some(e.to_string())
            }
        };

        info!(
            checkpoint_id,
            epoch,
            duration_ms = duration.as_millis(),
            "checkpoint completed"
        );

        Ok(CheckpointResult {
            success: true,
            checkpoint_id,
            epoch,
            duration,
            error: begin_epoch_error,
        })
    }

    /// Restores coordinator state from the latest recoverable checkpoint,
    /// advancing `epoch` and `next_checkpoint_id` past it. Returns the
    /// recovered state, or `None` when no checkpoint exists.
    pub async fn recover(
        &mut self,
    ) -> Result<Option<crate::recovery_manager::RecoveredState>, DbError> {
        use crate::recovery_manager::RecoveryManager;

        let mgr = RecoveryManager::new(&*self.store);
        let result = mgr.recover(&[], &self.sinks, &[]).await?;

        if let Some(ref recovered) = result {
            // Resume one past the recovered epoch and checkpoint id.
            self.epoch = recovered.epoch() + 1;
            self.next_checkpoint_id = recovered.manifest.checkpoint_id + 1;
            info!(
                epoch = self.epoch,
                checkpoint_id = self.next_checkpoint_id,
                "coordinator epoch set after recovery"
            );
        }

        Ok(result)
    }

    /// Loads the most recent checkpoint manifest from the store, if any.
    pub async fn load_latest_manifest(&self) -> Result<Option<CheckpointManifest>, DbError> {
        self.store.load_latest().await.map_err(DbError::from)
    }
}

impl std::fmt::Debug for CheckpointCoordinator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CheckpointCoordinator")
            .field("epoch", &self.epoch)
            .field("next_checkpoint_id", &self.next_checkpoint_id)
            .field("phase", &self.phase)
            .field("sinks", &self.sinks.len())
            .field("completed", &self.checkpoints_completed)
            .field("failed", &self.checkpoints_failed)
            .finish_non_exhaustive()
    }
}

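/// Fixed-capacity ring buffer of duration samples, stored in
/// microseconds, for cheap p50/p95/p99 estimates over the most recent
/// `CAPACITY` (100) checkpoints.
///
/// A small usage sketch (percentiles are in microseconds, as the tests
/// below assume):
///
/// ```ignore
/// let mut h = DurationHistogram::new();
/// h.record(Duration::from_millis(42));
/// assert_eq!(h.percentile(0.50), 42_000);
/// ```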
#[derive(Clone)]
pub struct DurationHistogram {
    /// Ring buffer of samples, in microseconds.
    samples: Box<[u64; Self::CAPACITY]>,
    /// Next write position in the ring.
    cursor: usize,
    /// Total samples ever recorded (may exceed `CAPACITY`).
    count: u64,
}

impl Default for DurationHistogram {
    fn default() -> Self {
        Self::new()
    }
}

impl DurationHistogram {
    const CAPACITY: usize = 100;

    #[must_use]
    pub fn new() -> Self {
        Self {
            samples: Box::new([0; Self::CAPACITY]),
            cursor: 0,
            count: 0,
        }
    }

    /// Records a duration, overwriting the oldest sample once the ring
    /// buffer is full.
    pub fn record(&mut self, duration: Duration) {
        #[allow(clippy::cast_possible_truncation)]
        let us = duration.as_micros() as u64;
        self.samples[self.cursor] = us;
        self.cursor = (self.cursor + 1) % Self::CAPACITY;
        self.count += 1;
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Number of samples currently held (at most `CAPACITY`).
    #[must_use]
    pub fn len(&self) -> usize {
        if self.count >= Self::CAPACITY as u64 {
            Self::CAPACITY
        } else {
            #[allow(clippy::cast_possible_truncation)]
            {
                self.count as usize
            }
        }
    }

    /// Returns the `p`-th percentile (`0.0..=1.0`) in microseconds, or 0
    /// when no samples have been recorded.
    #[must_use]
    pub fn percentile(&self, p: f64) -> u64 {
        let n = self.len();
        if n == 0 {
            return 0;
        }
        let mut sorted: Vec<u64> = self.samples[..n].to_vec();
        sorted.sort_unstable();
        #[allow(
            clippy::cast_possible_truncation,
            clippy::cast_sign_loss,
            clippy::cast_precision_loss
        )]
        let idx = ((p * (n as f64 - 1.0)).ceil() as usize).min(n - 1);
        sorted[idx]
    }

    /// Returns `(p50, p95, p99)` in microseconds with a single sort.
    #[must_use]
    pub fn percentiles(&self) -> (u64, u64, u64) {
        let n = self.len();
        if n == 0 {
            return (0, 0, 0);
        }
        let mut sorted: Vec<u64> = self.samples[..n].to_vec();
        sorted.sort_unstable();
        #[allow(
            clippy::cast_possible_truncation,
            clippy::cast_sign_loss,
            clippy::cast_precision_loss
        )]
        let at = |p: f64| -> u64 {
            let idx = ((p * (n as f64 - 1.0)).ceil() as usize).min(n - 1);
            sorted[idx]
        };
        (at(0.50), at(0.95), at(0.99))
    }
}

impl std::fmt::Debug for DurationHistogram {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (p50, p95, p99) = self.percentiles();
        f.debug_struct("DurationHistogram")
            .field("samples_len", &self.samples.len())
            .field("cursor", &self.cursor)
            .field("count", &self.count)
            .field("p50_us", &p50)
            .field("p95_us", &p95)
            .field("p99_us", &p99)
            .finish()
    }
}

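/// Point-in-time snapshot of coordinator counters for stats endpoints.
/// Percentiles are reported in milliseconds, converted from the
/// histogram's microsecond samples.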
#[derive(Debug, Clone, serde::Serialize)]
pub struct CheckpointStats {
    /// Checkpoints completed successfully.
    pub completed: u64,
    /// Checkpoints that failed.
    pub failed: u64,
    /// Duration of the most recent successful checkpoint.
    pub last_duration: Option<Duration>,
    /// p50 checkpoint duration, in milliseconds.
    pub duration_p50_ms: u64,
    /// p95 checkpoint duration, in milliseconds.
    pub duration_p95_ms: u64,
    /// p99 checkpoint duration, in milliseconds.
    pub duration_p99_ms: u64,
    /// Total sidecar bytes written across successful checkpoints.
    pub total_bytes_written: u64,
    /// Phase the coordinator is currently in.
    pub current_phase: CheckpointPhase,
    /// Epoch the coordinator is currently in.
    pub current_epoch: u64,
}

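/// Converts a connector-level [`SourceCheckpoint`] into the manifest's
/// [`ConnectorCheckpoint`] representation. Together with
/// [`connector_to_source_checkpoint`] this round-trips losslessly
/// (offsets, epoch, and metadata are all preserved), e.g.:
///
/// ```ignore
/// let cc = source_to_connector_checkpoint(&cp);
/// let back = connector_to_source_checkpoint(&cc);
/// assert_eq!(back.epoch(), cp.epoch());
/// ```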
#[must_use]
pub fn source_to_connector_checkpoint(cp: &SourceCheckpoint) -> ConnectorCheckpoint {
    ConnectorCheckpoint {
        offsets: cp.offsets().clone(),
        epoch: cp.epoch(),
        metadata: cp.metadata().clone(),
    }
}

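/// Rebuilds a [`SourceCheckpoint`] from a stored [`ConnectorCheckpoint`].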
#[must_use]
pub fn connector_to_source_checkpoint(cp: &ConnectorCheckpoint) -> SourceCheckpoint {
    let mut source_cp = SourceCheckpoint::with_offsets(cp.epoch, cp.offsets.clone());
    for (k, v) in &cp.metadata {
        source_cp.set_metadata(k.clone(), v.clone());
    }
    source_cp
}

#[cfg(test)]
mod tests {
    use super::*;
    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;

    async fn make_coordinator(dir: &std::path::Path) -> CheckpointCoordinator {
        let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
        CheckpointCoordinator::new(CheckpointConfig::default(), store)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_coordinator_new() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path()).await;

        assert_eq!(coord.epoch(), 1);
        assert_eq!(coord.next_checkpoint_id(), 1);
        assert_eq!(coord.phase(), CheckpointPhase::Idle);
    }

    #[tokio::test]
    async fn test_coordinator_resumes_from_stored_checkpoint() {
        let dir = tempfile::tempdir().unwrap();

        let store = FileSystemCheckpointStore::new(dir.path(), 3);
        // Seed the store with a manifest at checkpoint_id=5, epoch=10.
        let m = CheckpointManifest::new(5, 10);
        store.save(&m).await.unwrap();

        let coord = make_coordinator(dir.path()).await;
        // The coordinator resumes one past the stored manifest.
        assert_eq!(coord.epoch(), 11);
        assert_eq!(coord.next_checkpoint_id(), 6);
    }

    #[test]
    fn test_checkpoint_phase_display() {
        assert_eq!(CheckpointPhase::Idle.to_string(), "Idle");
        assert_eq!(CheckpointPhase::Snapshotting.to_string(), "Snapshotting");
        assert_eq!(CheckpointPhase::PreCommitting.to_string(), "PreCommitting");
        assert_eq!(CheckpointPhase::Persisting.to_string(), "Persisting");
        assert_eq!(CheckpointPhase::Committing.to_string(), "Committing");
    }

    #[test]
    fn test_source_to_connector_checkpoint() {
        let mut cp = SourceCheckpoint::new(5);
        cp.set_offset("partition-0", "1234");
        cp.set_metadata("topic", "events");

        let cc = source_to_connector_checkpoint(&cp);
        assert_eq!(cc.epoch, 5);
        assert_eq!(cc.offsets.get("partition-0"), Some(&"1234".into()));
        assert_eq!(cc.metadata.get("topic"), Some(&"events".into()));
    }

    #[test]
    fn test_connector_to_source_checkpoint() {
        let cc = ConnectorCheckpoint {
            offsets: HashMap::from([("lsn".into(), "0/ABCD".into())]),
            epoch: 3,
            metadata: HashMap::from([("type".into(), "postgres".into())]),
        };

        let cp = connector_to_source_checkpoint(&cc);
        assert_eq!(cp.epoch(), 3);
        assert_eq!(cp.get_offset("lsn"), Some("0/ABCD"));
        assert_eq!(cp.get_metadata("type"), Some("postgres"));
    }

    #[tokio::test]
    async fn test_stats_initial() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path()).await;
        let stats = coord.stats();

        assert_eq!(stats.completed, 0);
        assert_eq!(stats.failed, 0);
        assert!(stats.last_duration.is_none());
        assert_eq!(stats.duration_p50_ms, 0);
        assert_eq!(stats.duration_p95_ms, 0);
        assert_eq!(stats.duration_p99_ms, 0);
        assert_eq!(stats.current_phase, CheckpointPhase::Idle);
    }

    #[tokio::test]
    async fn test_checkpoint_no_sources_no_sinks() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let result = coord
            .checkpoint(CheckpointRequest {
                watermark: Some(1000),
                ..CheckpointRequest::default()
            })
            .await
            .unwrap();

        assert!(result.success);
        assert_eq!(result.checkpoint_id, 1);
        assert_eq!(result.epoch, 1);

        let loaded = coord.store().load_latest().await.unwrap().unwrap();
        assert_eq!(loaded.checkpoint_id, 1);
        assert_eq!(loaded.epoch, 1);
        assert_eq!(loaded.watermark, Some(1000));

        // A second checkpoint advances both the id and the epoch.
        let result2 = coord
            .checkpoint(CheckpointRequest {
                watermark: Some(2000),
                ..CheckpointRequest::default()
            })
            .await
            .unwrap();

        assert!(result2.success);
        assert_eq!(result2.checkpoint_id, 2);
        assert_eq!(result2.epoch, 2);

        let stats = coord.stats();
        assert_eq!(stats.completed, 2);
        assert_eq!(stats.failed, 0);
    }

    #[tokio::test]
    async fn test_checkpoint_with_operator_states() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let mut ops = HashMap::new();
        ops.insert(
            "window-agg".into(),
            bytes::Bytes::from_static(b"state-data"),
        );
        ops.insert("filter".into(), bytes::Bytes::from_static(b"filter-state"));

        let result = coord
            .checkpoint(CheckpointRequest {
                operator_states: ops,
                ..CheckpointRequest::default()
            })
            .await
            .unwrap();

        assert!(result.success);

        let loaded = coord.store().load_latest().await.unwrap().unwrap();
        assert_eq!(loaded.operator_states.len(), 2);

        let window_op = loaded.operator_states.get("window-agg").unwrap();
        assert_eq!(window_op.decode_inline().unwrap(), b"state-data");
    }

    #[tokio::test]
    async fn test_checkpoint_with_table_store_path() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let result = coord
            .checkpoint(CheckpointRequest {
                table_store_checkpoint_path: Some("/tmp/rocksdb_cp".into()),
                ..CheckpointRequest::default()
            })
            .await
            .unwrap();

        assert!(result.success);

        let loaded = coord.store().load_latest().await.unwrap().unwrap();
        assert_eq!(
            loaded.table_store_checkpoint_path.as_deref(),
            Some("/tmp/rocksdb_cp")
        );
    }

    #[tokio::test]
    async fn test_load_latest_manifest_empty() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path()).await;
        assert!(coord.load_latest_manifest().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_coordinator_debug() {
        let dir = tempfile::tempdir().unwrap();
        let coord = make_coordinator(dir.path()).await;
        let debug = format!("{coord:?}");
        assert!(debug.contains("CheckpointCoordinator"));
        assert!(debug.contains("epoch: 1"));
    }

    #[tokio::test]
    async fn test_checkpoint_emits_metrics_on_success() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let registry = prometheus::Registry::new();
        let prom = Arc::new(crate::engine_metrics::EngineMetrics::new(&registry));
        coord.set_metrics(Arc::clone(&prom));

        let result = coord
            .checkpoint(CheckpointRequest {
                watermark: Some(1000),
                ..CheckpointRequest::default()
            })
            .await
            .unwrap();

        assert!(result.success);
        assert_eq!(prom.checkpoints_completed.get(), 1);
        assert_eq!(prom.checkpoints_failed.get(), 0);
        assert_eq!(prom.checkpoint_epoch.get(), 1);

        // A second checkpoint increments the counter and the epoch gauge.
        let result2 = coord
            .checkpoint(CheckpointRequest {
                watermark: Some(2000),
                ..CheckpointRequest::default()
            })
            .await
            .unwrap();

        assert!(result2.success);
        assert_eq!(prom.checkpoints_completed.get(), 2);
        assert_eq!(prom.checkpoint_epoch.get(), 2);
    }

    #[tokio::test]
    async fn test_checkpoint_without_metrics() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();

        assert!(result.success);
    }

    #[test]
    fn test_histogram_empty() {
        let h = DurationHistogram::new();
        assert_eq!(h.len(), 0);
        assert_eq!(h.percentile(0.50), 0);
        assert_eq!(h.percentile(0.99), 0);
        let (p50, p95, p99) = h.percentiles();
        assert_eq!((p50, p95, p99), (0, 0, 0));
    }

    #[test]
    fn test_histogram_single_sample() {
        let mut h = DurationHistogram::new();
        h.record(Duration::from_millis(42));
        assert_eq!(h.len(), 1);
        // 42 ms = 42_000 µs.
        assert_eq!(h.percentile(0.50), 42_000);
        assert_eq!(h.percentile(0.99), 42_000);
    }

    #[test]
    fn test_histogram_sub_millisecond() {
        let mut h = DurationHistogram::new();
        h.record(Duration::from_micros(500));
        assert_eq!(h.percentile(0.50), 500);
        assert_eq!(h.percentile(0.99), 500);
    }

    #[test]
    fn test_histogram_percentiles() {
        let mut h = DurationHistogram::new();
        for i in 1..=100 {
            h.record(Duration::from_millis(i));
        }
        assert_eq!(h.len(), 100);

        let p50 = h.percentile(0.50);
        let p95 = h.percentile(0.95);
        let p99 = h.percentile(0.99);

        assert!((49_000..=51_000).contains(&p50), "p50={p50}");
        assert!((94_000..=96_000).contains(&p95), "p95={p95}");
        assert!((98_000..=100_000).contains(&p99), "p99={p99}");
    }

    #[test]
    fn test_histogram_wraps_ring_buffer() {
        let mut h = DurationHistogram::new();
        for i in 1..=150 {
            h.record(Duration::from_millis(i));
        }
        assert_eq!(h.len(), 100);
        assert_eq!(h.count, 150);

        // Samples 1..=50 ms were overwritten; the ring now holds 51..=150 ms.
        let p50 = h.percentile(0.50);
        assert!((99_000..=101_000).contains(&p50), "p50={p50}");
    }

    #[tokio::test]
    async fn test_sidecar_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let config = CheckpointConfig {
            state_inline_threshold: 100, // force the 200-byte blob external
            ..CheckpointConfig::default()
        };
        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();

        let mut ops = HashMap::new();
        // 50 bytes stays inline; 200 bytes exceeds the 100-byte threshold.
        ops.insert("small".into(), bytes::Bytes::from(vec![0xAAu8; 50]));
        ops.insert("large".into(), bytes::Bytes::from(vec![0xBBu8; 200]));

        let result = coord
            .checkpoint(CheckpointRequest {
                operator_states: ops,
                ..CheckpointRequest::default()
            })
            .await
            .unwrap();
        assert!(result.success);

        let loaded = coord.store().load_latest().await.unwrap().unwrap();
        let small_op = loaded.operator_states.get("small").unwrap();
        assert!(!small_op.external, "small state should be inline");
        assert_eq!(small_op.decode_inline().unwrap(), vec![0xAAu8; 50]);

        let large_op = loaded.operator_states.get("large").unwrap();
        assert!(large_op.external, "large state should be external");
        assert_eq!(large_op.external_length, 200);

        // The sidecar holds exactly the large blob.
        let state_data = coord.store().load_state_data(1).await.unwrap().unwrap();
        assert_eq!(state_data.len(), 200);
        assert!(state_data.iter().all(|&b| b == 0xBB));
    }

    #[tokio::test]
    async fn test_all_inline_no_sidecar() {
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        // The default 1 MiB threshold keeps everything inline.
        let config = CheckpointConfig::default();
        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();

        let mut ops = HashMap::new();
        ops.insert("op1".into(), bytes::Bytes::from_static(b"small-state"));

        let result = coord
            .checkpoint(CheckpointRequest {
                operator_states: ops,
                ..CheckpointRequest::default()
            })
            .await
            .unwrap();
        assert!(result.success);

        // No sidecar file is written when all state fits inline.
        assert!(coord.store().load_state_data(1).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn durability_gate_skipped_when_vnode_set_empty() {
        // With no state backend and an empty vnode set, the durability
        // gate is skipped entirely.
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;
        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();
        assert!(result.success, "baseline checkpoint must succeed");
    }

    #[tokio::test]
    async fn bridge_writes_markers_and_gate_passes() {
        use laminar_core::state::InProcessBackend;
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;
        let backend = Arc::new(InProcessBackend::new(4));
        coord.set_state_backend(backend.clone());
        coord.set_vnode_set(vec![0, 1, 2, 3]);

        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();
        assert!(result.success, "bridge writes markers → gate passes");
        for v in 0..4 {
            assert!(
                backend.read_partial(v, 1).await.unwrap().is_some(),
                "bridge should have written marker for vnode {v}",
            );
        }
    }

    #[cfg(feature = "cluster-unstable")]
    #[tokio::test]
    async fn reconcile_announces_abort_when_no_decision_store() {
        use laminar_core::cluster::control::{
            BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase, ANNOUNCEMENT_KEY,
        };
        use laminar_core::cluster::discovery::NodeId;
        use laminar_storage::checkpoint_manifest::SinkCommitStatus;
        use tokio::sync::watch;

        // Leave behind an orphaned manifest (checkpoint 42, epoch 7) with a
        // sink stuck in Pending so reconciliation has something to resolve.
        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let mut orphan = CheckpointManifest::new(42, 7);
        orphan
            .sink_commit_statuses
            .insert("kafka_out".into(), SinkCommitStatus::Pending);
        store.save_with_state(&orphan, None).await.unwrap();

        let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
            .await
            .unwrap();
        let self_id = NodeId(1);
        let kv = Arc::new(InMemoryKv::new(self_id));
        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
        let (_tx, rx) = watch::channel(Vec::new());
        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
        coord.set_cluster_controller(controller);

        coord.reconcile_prepared_on_init().await;

        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
        assert_eq!(ann.phase, Phase::Abort);
        assert_eq!(ann.epoch, 7);
        assert_eq!(ann.checkpoint_id, 42);
    }

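    /// If the decision store records a commit marker for the orphaned
    /// epoch, reconcile re-announces Commit so sinks can finalize.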
    #[cfg(feature = "cluster-unstable")]
    #[tokio::test]
    async fn reconcile_announces_commit_when_marker_present() {
        use laminar_core::cluster::control::{
            BarrierAnnouncement, CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
            Phase, ANNOUNCEMENT_KEY,
        };
        use laminar_core::cluster::discovery::NodeId;
        use laminar_storage::checkpoint_manifest::SinkCommitStatus;
        use object_store::local::LocalFileSystem;
        use tokio::sync::watch;

        let ckpt_dir = tempfile::tempdir().unwrap();
        let decision_dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(ckpt_dir.path(), 3));
        let mut orphan = CheckpointManifest::new(42, 7);
        orphan
            .sink_commit_statuses
            .insert("kafka_out".into(), SinkCommitStatus::Pending);
        store.save_with_state(&orphan, None).await.unwrap();

        let decision_os: Arc<dyn object_store::ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
        let decision_store = Arc::new(CheckpointDecisionStore::new(decision_os));
        decision_store.record_committed(7).await.unwrap();

        let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
            .await
            .unwrap();
        let self_id = NodeId(1);
        let kv = Arc::new(InMemoryKv::new(self_id));
        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
        let (_tx, rx) = watch::channel(Vec::new());
        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
        coord.set_cluster_controller(controller);
        coord.set_decision_store(decision_store);

        coord.reconcile_prepared_on_init().await;

        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
        assert_eq!(ann.phase, Phase::Commit);
        assert_eq!(ann.epoch, 7);
        assert_eq!(ann.checkpoint_id, 42);
    }

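    /// A decision store with no marker for the orphaned epoch means the
    /// commit never happened; reconcile must announce Abort.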
    #[cfg(feature = "cluster-unstable")]
    #[tokio::test]
    async fn reconcile_announces_abort_when_marker_missing() {
        use laminar_core::cluster::control::{
            BarrierAnnouncement, CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
            Phase, ANNOUNCEMENT_KEY,
        };
        use laminar_core::cluster::discovery::NodeId;
        use laminar_storage::checkpoint_manifest::SinkCommitStatus;
        use object_store::local::LocalFileSystem;
        use tokio::sync::watch;

        let ckpt_dir = tempfile::tempdir().unwrap();
        let decision_dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(ckpt_dir.path(), 3));
        let mut orphan = CheckpointManifest::new(11, 3);
        orphan
            .sink_commit_statuses
            .insert("out".into(), SinkCommitStatus::Pending);
        store.save_with_state(&orphan, None).await.unwrap();

        // The decision store exists but holds no marker for epoch 3.
        let decision_os: Arc<dyn object_store::ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
        let decision_store = Arc::new(CheckpointDecisionStore::new(decision_os));

        let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
            .await
            .unwrap();
        let self_id = NodeId(1);
        let kv = Arc::new(InMemoryKv::new(self_id));
        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
        let (_tx, rx) = watch::channel(Vec::new());
        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
        coord.set_cluster_controller(controller);
        coord.set_decision_store(decision_store);

        coord.reconcile_prepared_on_init().await;

        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
        let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
        assert_eq!(ann.phase, Phase::Abort);
        assert_eq!(ann.epoch, 3);
    }

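    /// A manifest whose sinks are all Committed needs no recovery action,
    /// so reconcile must not publish any announcement.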
    #[cfg(feature = "cluster-unstable")]
    #[tokio::test]
    async fn reconcile_silent_when_manifest_clean() {
        use laminar_core::cluster::control::{
            ClusterController, ClusterKv, InMemoryKv, ANNOUNCEMENT_KEY,
        };
        use laminar_core::cluster::discovery::NodeId;
        use laminar_storage::checkpoint_manifest::SinkCommitStatus;
        use tokio::sync::watch;

        let dir = tempfile::tempdir().unwrap();
        let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
        let mut clean = CheckpointManifest::new(5, 3);
        clean
            .sink_commit_statuses
            .insert("out".into(), SinkCommitStatus::Committed);
        store.save_with_state(&clean, None).await.unwrap();

        let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
            .await
            .unwrap();
        let self_id = NodeId(1);
        let kv = Arc::new(InMemoryKv::new(self_id));
        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
        let (_tx, rx) = watch::channel(Vec::new());
        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
        coord.set_cluster_controller(controller);

        coord.reconcile_prepared_on_init().await;

        // Nothing was pending, so no announcement should have been written.
        assert!(kv.read_from(self_id, ANNOUNCEMENT_KEY).await.is_none());
    }

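    /// Drives `follower_checkpoint` against a seeded leader announcement:
    /// the follower prepares, acks under its own node id, observes the
    /// leader's Commit, and persists the committed manifest locally.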
    #[cfg(feature = "cluster-unstable")]
    #[tokio::test]
    async fn follower_checkpoint_commits_on_leader_commit() {
        use laminar_core::cluster::control::{
            BarrierAck, BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase,
            ACK_KEY, ANNOUNCEMENT_KEY,
        };
        use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
        use tokio::sync::watch;

        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let leader_id = NodeId(1);
        let follower_id = NodeId(7);

        // One shared in-memory KV plays both roles: the test seeds the
        // leader's announcements into it, and the follower reads them back.
        let kv = Arc::new(InMemoryKv::new(follower_id));
        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
        let leader_info = NodeInfo {
            id: leader_id,
            name: "leader".into(),
            rpc_address: String::new(),
            raft_address: String::new(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 0,
        };
        let (_tx, rx) = watch::channel(vec![leader_info]);
        let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
        coord.set_cluster_controller(controller);

        let prepare_json = serde_json::to_string(&BarrierAnnouncement {
            epoch: 1,
            checkpoint_id: 1,
            phase: Phase::Prepare,
            flags: 0,
            min_watermark_ms: None,
        })
        .unwrap();
        let commit_json = serde_json::to_string(&BarrierAnnouncement {
            epoch: 1,
            checkpoint_id: 1,
            phase: Phase::Commit,
            flags: 0,
            min_watermark_ms: None,
        })
        .unwrap();
        // Seed the leader's Prepare and then its Commit announcement; the
        // follower should observe Commit when it polls ANNOUNCEMENT_KEY.
        kv.seed(leader_id, ANNOUNCEMENT_KEY, prepare_json);
        kv.seed(leader_id, ANNOUNCEMENT_KEY, commit_json);

        let ann = BarrierAnnouncement {
            epoch: 1,
            checkpoint_id: 1,
            phase: Phase::Prepare,
            flags: 0,
            min_watermark_ms: None,
        };
        let committed = coord
            .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(2))
            .await
            .unwrap();
        assert!(committed, "follower should commit on leader's Commit");

        // The follower acks its local prepare under its own node id.
        let ack_raw = kv.read_from(follower_id, ACK_KEY).await.unwrap();
        let ack: BarrierAck = serde_json::from_str(&ack_raw).unwrap();
        assert_eq!(ack.epoch, 1);
        assert!(ack.ok, "prepare succeeded, ack should be ok");

        // The committed manifest lands in the follower's local store.
        let stored = coord.store().load_latest().await.unwrap().unwrap();
        assert_eq!(stored.epoch, 1);
    }

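    /// When the seeded leader announcement is Abort, `follower_checkpoint`
    /// must roll back and report the epoch as uncommitted.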
    #[cfg(feature = "cluster-unstable")]
    #[tokio::test]
    async fn follower_checkpoint_rolls_back_on_leader_abort() {
        use laminar_core::cluster::control::{
            BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase, ANNOUNCEMENT_KEY,
        };
        use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
        use tokio::sync::watch;

        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let leader_id = NodeId(1);
        let follower_id = NodeId(9);
        let kv = Arc::new(InMemoryKv::new(follower_id));
        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
        let leader_info = NodeInfo {
            id: leader_id,
            name: "leader".into(),
            rpc_address: String::new(),
            raft_address: String::new(),
            state: NodeState::Active,
            metadata: NodeMetadata::default(),
            last_heartbeat_ms: 0,
        };
        let (_tx, rx) = watch::channel(vec![leader_info]);
        let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
        coord.set_cluster_controller(controller);

        let abort_json = serde_json::to_string(&BarrierAnnouncement {
            epoch: 1,
            checkpoint_id: 1,
            phase: Phase::Abort,
            flags: 0,
            min_watermark_ms: None,
        })
        .unwrap();
        kv.seed(leader_id, ANNOUNCEMENT_KEY, abort_json);

        let ann = BarrierAnnouncement {
            epoch: 1,
            checkpoint_id: 1,
            phase: Phase::Prepare,
            flags: 0,
            min_watermark_ms: None,
        };
        let committed = coord
            .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(2))
            .await
            .unwrap();
        assert!(!committed, "follower should roll back on leader's Abort");
    }

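    /// On a solo cluster the leader mirrors the cluster-wide minimum
    /// watermark into its controller, and a later, smaller local watermark
    /// must not lower the published value.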
    #[cfg(feature = "cluster-unstable")]
    #[tokio::test]
    async fn leader_publishes_cluster_min_watermark_to_controller() {
        use laminar_core::cluster::control::{
            CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
        };
        use laminar_core::cluster::discovery::NodeId;
        use object_store::local::LocalFileSystem;
        use tokio::sync::watch;

        let dir = tempfile::tempdir().unwrap();
        let decision_dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let decision_os: Arc<dyn object_store::ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
        coord.set_decision_store(Arc::new(CheckpointDecisionStore::new(decision_os)));

        let self_id = NodeId(1);
        let kv = Arc::new(InMemoryKv::new(self_id));
        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
        let (_tx, rx) = watch::channel(Vec::new());
        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
        coord.set_cluster_controller(Arc::clone(&controller));

        assert_eq!(controller.cluster_min_watermark(), None);

        // First checkpoint: on a solo cluster the local watermark is the
        // cluster-wide min.
        coord.set_local_watermark_ms(Some(12_345));
        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();
        assert!(result.success, "solo-cluster checkpoint should succeed");

        assert_eq!(
            controller.cluster_min_watermark(),
            Some(12_345),
            "leader must mirror the cluster-wide min into its controller",
        );

        // Second checkpoint with a lower local watermark: the published
        // cluster min must stay monotonically non-decreasing.
        coord.set_local_watermark_ms(Some(42));
        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();
        assert!(result.success);
        assert_eq!(
            controller.cluster_min_watermark(),
            Some(12_345),
            "stale local watermark must not lower the published cluster min",
        );
    }

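    /// A solo-cluster leader runs the full announce cycle; after a
    /// successful checkpoint the readable announcement is Commit.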
    #[cfg(feature = "cluster-unstable")]
    #[tokio::test]
    async fn leader_announces_prepare_and_commit_on_solo_cluster() {
        use laminar_core::cluster::control::{
            CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv, Phase,
            ANNOUNCEMENT_KEY,
        };
        use laminar_core::cluster::discovery::NodeId;
        use object_store::local::LocalFileSystem;
        use tokio::sync::watch;

        let dir = tempfile::tempdir().unwrap();
        let decision_dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let decision_os: Arc<dyn object_store::ObjectStore> =
            Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
        coord.set_decision_store(Arc::new(CheckpointDecisionStore::new(decision_os)));

        let self_id = NodeId(1);
        let kv = Arc::new(InMemoryKv::new(self_id));
        let kv_trait: Arc<dyn ClusterKv> = kv.clone();
        let (_tx, rx) = watch::channel(Vec::new());
        let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
        coord.set_cluster_controller(controller);

        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();
        assert!(result.success, "solo-cluster checkpoint should succeed");

        // Prepare is announced first and then superseded by Commit, so the
        // final announcement on the key must be Commit.
        let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
        let ann: laminar_core::cluster::control::BarrierAnnouncement =
            serde_json::from_str(&raw).unwrap();
        assert_eq!(ann.phase, Phase::Commit);
        assert_eq!(ann.epoch, result.epoch);
    }

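    /// The durability gate must check the full gate vnode registry, not
    /// just the vnodes this node owns and wrote markers for.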
    #[tokio::test]
    async fn gate_checks_full_registry_not_just_owned() {
        use laminar_core::state::InProcessBackend;
        // This node owns vnodes 0 and 1 and writes markers for them, but
        // the gate registry covers 0..=3; vnodes 2 and 3 have no markers.
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;
        let backend = Arc::new(InProcessBackend::new(4));
        coord.set_state_backend(backend.clone());
        coord.set_vnode_set(vec![0, 1]);
        coord.set_gate_vnode_set(vec![0, 1, 2, 3]);

        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();
        assert!(
            !result.success,
            "gate must fail when follower markers are missing",
        );
        let err = result.error.expect("failure produces an error message");
        assert!(
            err.contains("not all vnodes persisted"),
            "expected full-registry gate miss, got: {err}",
        );
    }

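    /// Once markers exist for the non-owned vnodes as well, the
    /// full-registry gate passes.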
    #[tokio::test]
    async fn gate_passes_when_all_registry_markers_present() {
        use bytes::Bytes;
        use laminar_core::state::InProcessBackend;
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;
        let backend = Arc::new(InProcessBackend::new(4));
        // Simulate follower-written markers for the vnodes this node does
        // not own (2 and 3), so the full registry is covered.
        backend
            .write_partial(2, 1, 0, Bytes::from_static(b"follower"))
            .await
            .unwrap();
        backend
            .write_partial(3, 1, 0, Bytes::from_static(b"follower"))
            .await
            .unwrap();
        coord.set_state_backend(backend);
        coord.set_vnode_set(vec![0, 1]);
        coord.set_gate_vnode_set(vec![0, 1, 2, 3]);

        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();
        assert!(result.success, "gate should pass: every vnode has a marker");
    }

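    /// A vnode outside the backend's range makes the marker write fail,
    /// which must abort the whole checkpoint.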
    #[tokio::test]
    async fn marker_write_failure_aborts_checkpoint() {
        use laminar_core::state::InProcessBackend;
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;
        coord.set_state_backend(Arc::new(InProcessBackend::new(2)));
        // vnode 99 is out of range for a two-vnode backend, so its marker
        // write must fail.
        coord.set_vnode_set(vec![0, 99]);

        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();
        assert!(
            !result.success,
            "out-of-range vnode must fail the checkpoint"
        );
        let err = result.error.expect("failure produces an error message");
        assert!(err.contains("vnode marker write failed"), "got: {err}");
    }

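    /// After several successful checkpoints the coordinator's stats report
    /// the completed count and the most recent duration.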
    #[tokio::test]
    async fn test_stats_include_percentiles_after_checkpoints() {
        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        // Run a few checkpoints to populate the duration histogram.
        for _ in 0..3 {
            let result = coord
                .checkpoint(CheckpointRequest::default())
                .await
                .unwrap();
            assert!(result.success);
        }

        let stats = coord.stats();
        assert_eq!(stats.completed, 3);
        assert!(stats.last_duration.is_some());
    }

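    /// Test sink whose `pre_commit` always fails, recording how many times
    /// `rollback_epoch` is invoked.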
    struct FailingPreCommitSink {
        /// Incremented each time `rollback_epoch` is called.
        rollback_count: Arc<std::sync::atomic::AtomicU64>,
        schema: arrow::datatypes::SchemaRef,
    }

    #[async_trait::async_trait]
    impl laminar_connectors::connector::SinkConnector for FailingPreCommitSink {
        async fn open(
            &mut self,
            _config: &laminar_connectors::config::ConnectorConfig,
        ) -> Result<(), laminar_connectors::error::ConnectorError> {
            Ok(())
        }

        async fn write_batch(
            &mut self,
            _batch: &arrow::array::RecordBatch,
        ) -> Result<
            laminar_connectors::connector::WriteResult,
            laminar_connectors::error::ConnectorError,
        > {
            Ok(laminar_connectors::connector::WriteResult::new(0, 0))
        }

        async fn pre_commit(
            &mut self,
            epoch: u64,
        ) -> Result<(), laminar_connectors::error::ConnectorError> {
            Err(laminar_connectors::error::ConnectorError::TransactionError(
                format!("synthetic pre_commit failure at epoch {epoch}"),
            ))
        }

        async fn rollback_epoch(
            &mut self,
            _epoch: u64,
        ) -> Result<(), laminar_connectors::error::ConnectorError> {
            self.rollback_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            Ok(())
        }

        async fn close(&mut self) -> Result<(), laminar_connectors::error::ConnectorError> {
            Ok(())
        }

        fn schema(&self) -> arrow::datatypes::SchemaRef {
            Arc::clone(&self.schema)
        }

        fn capabilities(&self) -> laminar_connectors::connector::SinkConnectorCapabilities {
            laminar_connectors::connector::SinkConnectorCapabilities::new(Duration::from_secs(5))
                .with_exactly_once()
                .with_two_phase_commit()
        }
    }

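    /// A sink that fails `pre_commit` must fail the checkpoint and trigger
    /// exactly one `rollback_epoch` call.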
    #[tokio::test]
    async fn test_pre_commit_failure_triggers_rollback() {
        use arrow::datatypes::{DataType, Field, Schema};

        let dir = tempfile::tempdir().unwrap();
        let mut coord = make_coordinator(dir.path()).await;

        let rollback_count = Arc::new(std::sync::atomic::AtomicU64::new(0));
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
        let sink = FailingPreCommitSink {
            rollback_count: Arc::clone(&rollback_count),
            schema,
        };
        let (event_tx, _event_rx) = laminar_core::streaming::channel::channel::<
            crate::sink_task::SinkEvent,
        >(crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY);
        let handle = crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
            name: "failing-sink".into(),
            sink_id: Arc::from("failing-sink"),
            connector: Box::new(sink),
            exactly_once: true,
            channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
            flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
            write_timeout: Duration::from_secs(5),
            event_tx,
        });
        coord.register_sink("failing-sink", handle, true);

        coord.begin_initial_epoch().await.unwrap();

        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();

        assert!(!result.success);
        assert!(
            result
                .error
                .as_deref()
                .is_some_and(|e| e.contains("pre-commit failed")),
            "error should mention pre-commit: got {:?}",
            result.error
        );
        assert_eq!(
            rollback_count.load(std::sync::atomic::Ordering::Relaxed),
            1,
            "rollback_epoch should have been called once"
        );
    }

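    /// Test sink whose `pre_commit` fails and whose `rollback_epoch` never
    /// returns, for exercising the coordinator's rollback timeout.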
    struct StuckRollbackSink {
        schema: arrow::datatypes::SchemaRef,
    }

    #[async_trait::async_trait]
    impl laminar_connectors::connector::SinkConnector for StuckRollbackSink {
        async fn open(
            &mut self,
            _config: &laminar_connectors::config::ConnectorConfig,
        ) -> Result<(), laminar_connectors::error::ConnectorError> {
            Ok(())
        }

        async fn write_batch(
            &mut self,
            _batch: &arrow::array::RecordBatch,
        ) -> Result<
            laminar_connectors::connector::WriteResult,
            laminar_connectors::error::ConnectorError,
        > {
            Ok(laminar_connectors::connector::WriteResult::new(0, 0))
        }

        async fn pre_commit(
            &mut self,
            _epoch: u64,
        ) -> Result<(), laminar_connectors::error::ConnectorError> {
            Err(laminar_connectors::error::ConnectorError::TransactionError(
                "synthetic pre_commit failure".into(),
            ))
        }

        async fn rollback_epoch(
            &mut self,
            _epoch: u64,
        ) -> Result<(), laminar_connectors::error::ConnectorError> {
            // Hang forever; the coordinator's rollback_timeout must bound this.
            std::future::pending::<()>().await;
            Ok(())
        }

        async fn close(&mut self) -> Result<(), laminar_connectors::error::ConnectorError> {
            Ok(())
        }

        fn schema(&self) -> arrow::datatypes::SchemaRef {
            Arc::clone(&self.schema)
        }

        fn capabilities(&self) -> laminar_connectors::connector::SinkConnectorCapabilities {
            laminar_connectors::connector::SinkConnectorCapabilities::new(Duration::from_secs(5))
                .with_exactly_once()
                .with_two_phase_commit()
        }
    }

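    /// With tokio time paused, a sink whose rollback hangs must still let
    /// the checkpoint fail within `rollback_timeout` instead of blocking.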
    #[tokio::test(start_paused = true)]
    async fn test_rollback_sinks_bounded_by_timeout() {
        use arrow::datatypes::{DataType, Field, Schema};

        let dir = tempfile::tempdir().unwrap();
        let config = CheckpointConfig {
            rollback_timeout: Duration::from_millis(100),
            ..Default::default()
        };
        let store = Box::new(
            laminar_storage::checkpoint_store::FileSystemCheckpointStore::new(dir.path(), 3),
        );
        let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();

        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
        let sink = StuckRollbackSink { schema };
        let (event_tx, _event_rx) = laminar_core::streaming::channel::channel::<
            crate::sink_task::SinkEvent,
        >(crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY);
        let handle = crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
            name: "stuck-sink".into(),
            sink_id: Arc::from("stuck-sink"),
            connector: Box::new(sink),
            exactly_once: true,
            channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
            flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
            write_timeout: Duration::from_secs(5),
            event_tx,
        });
        coord.register_sink("stuck-sink", handle, true);
        coord.begin_initial_epoch().await.unwrap();

        // Paused time auto-advances on await, so the 100ms rollback
        // timeout elapses without real waiting.
        let result = coord
            .checkpoint(CheckpointRequest::default())
            .await
            .unwrap();

        assert!(!result.success);
        assert!(
            result
                .error
                .as_deref()
                .is_some_and(|e| e.contains("pre-commit failed")),
            "checkpoint result should reflect pre-commit failure: got {:?}",
            result.error
        );
    }
}