1#![allow(clippy::disallowed_types)] use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use laminar_connectors::checkpoint::SourceCheckpoint;
25use laminar_connectors::connector::SourceConnector;
26use laminar_core::state::StateBackend;
27use laminar_core::storage::checkpoint_manifest::{
28 CheckpointManifest, ConnectorCheckpoint, SinkCommitStatus,
29};
30use laminar_core::storage::checkpoint_store::CheckpointStore;
31use tracing::{debug, error, info, warn};
32
33use crate::error::DbError;
34
/// Tunable limits and timeouts for the checkpoint coordinator.
///
/// Several fields are not consumed in the part of the file next to this
/// definition; their descriptions are read off the field names and are
/// marked as assumptions to confirm.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Interval between checkpoints (presumably `None` switches periodic
    /// checkpointing off — TODO confirm against the scheduler).
    pub interval: Option<Duration>,
    /// How many checkpoints to keep. `write_vnode_partials` also uses it
    /// (floored at 1) as the maximum age, in epochs, of a re-referenced upload.
    pub max_retained: usize,
    /// NOTE(review): presumably the barrier-alignment deadline — confirm.
    pub alignment_timeout: Duration,
    /// Upper bound on the sink pre-commit phase.
    pub pre_commit_timeout: Duration,
    /// Upper bound on a manifest save or a manifest-only update.
    pub persist_timeout: Duration,
    /// Per-sink upper bound on committing an epoch.
    pub commit_timeout: Duration,
    /// Upper bound on sink rollback; also bounds re-opening the next epoch
    /// after a failed checkpoint.
    pub rollback_timeout: Duration,
    /// NOTE(review): presumably the byte threshold deciding whether operator
    /// state is inlined in the manifest or moved to a sidecar — confirm.
    pub state_inline_threshold: usize,
    /// NOTE(review): presumably bounds state serialization — confirm.
    pub serialization_timeout: Duration,
    /// NOTE(review): presumably an optional cap on checkpoint size — confirm.
    pub max_checkpoint_bytes: Option<usize>,
    /// Timeout handed to the prepare-quorum wait in cluster mode.
    pub quorum_timeout: Duration,
    /// Deadline for the state durability gate polled by `await_restorable_gate`.
    pub restorable_gate_timeout: Duration,
    /// NOTE(review): presumably caps the number of in-flight epochs — confirm.
    pub max_in_flight_epochs: u64,
    /// NOTE(review): presumably caps the bytes staged at once — confirm.
    pub max_staged_bytes: u64,
}

impl Default for CheckpointConfig {
    /// Returns the stock configuration: a 60 s interval, 3 retained
    /// checkpoints, a 1 MiB inline threshold and a 512 MiB staging cap.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        let secs = Duration::from_secs;
        Self {
            interval: Some(secs(60)),
            max_retained: 3,
            alignment_timeout: secs(30),
            pre_commit_timeout: secs(30),
            persist_timeout: secs(120),
            commit_timeout: secs(60),
            rollback_timeout: secs(30),
            state_inline_threshold: MIB,
            serialization_timeout: secs(120),
            max_checkpoint_bytes: None,
            quorum_timeout: secs(3),
            restorable_gate_timeout: secs(10),
            max_in_flight_epochs: 4,
            max_staged_bytes: 512 * 1024 * 1024,
        }
    }
}
106
107#[derive(Debug, Clone, Default)]
109pub struct CheckpointRequest {
110 pub operator_states: HashMap<String, bytes::Bytes>,
114 pub watermark: Option<i64>,
116 pub table_store_checkpoint_path: Option<String>,
118 pub extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
120 pub source_watermarks: HashMap<String, i64>,
122 pub pipeline_hash: Option<u64>,
124 pub source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
126}
127
/// Hands out `(epoch, checkpoint_id)` pairs from two atomic counters.
#[derive(Debug)]
pub(crate) struct EpochAllocator {
    /// Next epoch to hand out.
    epoch: std::sync::atomic::AtomicU64,
    /// Next checkpoint id to hand out.
    next_checkpoint_id: std::sync::atomic::AtomicU64,
}

impl EpochAllocator {
    /// Creates an allocator whose first `allocate` returns
    /// `(epoch, next_checkpoint_id)`.
    fn new(epoch: u64, next_checkpoint_id: u64) -> Self {
        use std::sync::atomic::AtomicU64;
        Self {
            epoch: AtomicU64::new(epoch),
            next_checkpoint_id: AtomicU64::new(next_checkpoint_id),
        }
    }

    /// Returns the current `(epoch, checkpoint_id)` and bumps both counters
    /// by one.
    ///
    /// The two increments are separate atomic operations, so the pair is not
    /// taken as a single atomic unit.
    pub(crate) fn allocate(&self) -> (u64, u64) {
        use std::sync::atomic::Ordering::AcqRel;
        let epoch = self.epoch.fetch_add(1, AcqRel);
        let checkpoint_id = self.next_checkpoint_id.fetch_add(1, AcqRel);
        (epoch, checkpoint_id)
    }

    /// Reads the `(epoch, checkpoint_id)` that the next `allocate` would
    /// return, without consuming it.
    pub(crate) fn peek(&self) -> (u64, u64) {
        use std::sync::atomic::Ordering::Acquire;
        let epoch = self.epoch.load(Acquire);
        let checkpoint_id = self.next_checkpoint_id.load(Acquire);
        (epoch, checkpoint_id)
    }

    /// Raises each counter to at least the given value; a counter that is
    /// already higher is left untouched (`fetch_max`).
    fn advance_to(&self, epoch: u64, next_checkpoint_id: u64) {
        use std::sync::atomic::Ordering::AcqRel;
        self.epoch.fetch_max(epoch, AcqRel);
        self.next_checkpoint_id.fetch_max(next_checkpoint_id, AcqRel);
    }
}
183
/// Identifier of a quorum participant: the cluster `NodeId` when the
/// `cluster` feature is enabled.
#[cfg(feature = "cluster")]
pub(crate) type QuorumPeer = laminar_core::cluster::discovery::NodeId;
/// Identifier of a quorum participant: a plain `u64` stand-in when the
/// `cluster` feature is disabled.
#[cfg(not(feature = "cluster"))]
pub(crate) type QuorumPeer = u64;
191
192#[derive(Debug, Clone)]
196pub(crate) enum QuorumStage {
197 RunInline,
199 #[cfg_attr(not(feature = "cluster"), allow(dead_code))]
204 Done {
205 min_watermark_ms: Option<i64>,
207 participants: Vec<QuorumPeer>,
209 },
210}
211
212#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
214pub enum CheckpointPhase {
215 Idle,
217 Snapshotting,
219 PreCommitting,
221 Persisting,
223 Committing,
225}
226
227impl std::fmt::Display for CheckpointPhase {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 match self {
230 Self::Idle => write!(f, "Idle"),
231 Self::Snapshotting => write!(f, "Snapshotting"),
232 Self::PreCommitting => write!(f, "PreCommitting"),
233 Self::Persisting => write!(f, "Persisting"),
234 Self::Committing => write!(f, "Committing"),
235 }
236 }
237}
238
239#[derive(Debug, serde::Serialize, serde::Deserialize)]
241pub struct CheckpointResult {
242 pub success: bool,
244 pub checkpoint_id: u64,
246 pub epoch: u64,
248 pub duration: Duration,
250 pub error: Option<String>,
252}
253
254pub(crate) struct RegisteredSource {
256 pub name: String,
258 pub connector: Arc<tokio::sync::Mutex<Box<dyn SourceConnector>>>,
260 pub supports_replay: bool,
265}
266
267pub(crate) struct RegisteredSink {
269 pub name: String,
271 pub handle: crate::sink_task::SinkTaskHandle,
273 pub exactly_once: bool,
275}
276
277pub struct CheckpointCoordinator {
283 config: CheckpointConfig,
284 store: Arc<dyn CheckpointStore>,
285 sinks: Vec<RegisteredSink>,
286 allocator: Arc<EpochAllocator>,
289 phase: CheckpointPhase,
290 checkpoints_completed: u64,
291 checkpoints_failed: u64,
292 last_checkpoint_duration: Option<Duration>,
293 duration_histogram: DurationHistogram,
294 prom: Option<Arc<crate::engine_metrics::EngineMetrics>>,
295 total_bytes_written: u64,
296 state_backend: Option<Arc<dyn StateBackend>>,
299 assignment_version: u64,
302 decision_store: Option<Arc<laminar_core::checkpoint_decision::CheckpointDecisionStore>>,
306 local_watermark_ms: Option<i64>,
309 #[cfg(feature = "cluster")]
312 cluster_min_watermark: Option<i64>,
313 vnode_set: Vec<u32>,
315 gate_vnode_set: Vec<u32>,
318 rotation_epoch_floor: u64,
325 #[allow(clippy::disallowed_types)] pending_vnode_states:
333 std::collections::HashMap<u32, std::collections::HashMap<String, bytes::Bytes>>,
334 #[allow(clippy::disallowed_types)]
339 last_vnode_uploads:
340 std::collections::HashMap<u32, (u64, std::collections::HashMap<String, bytes::Bytes>)>,
341 #[cfg(feature = "cluster")]
343 cluster_controller: Option<Arc<laminar_core::cluster::control::ClusterController>>,
344 cached_sorted_sink_names: Option<Vec<String>>,
346}
347
348async fn load_highest(
350 store: &dyn CheckpointStore,
351) -> Result<Option<CheckpointManifest>, laminar_core::storage::checkpoint_store::CheckpointStoreError>
352{
353 let ids = store.list_ids().await?;
354 for id in ids.iter().rev() {
355 if let Ok(Some(m)) = store.load_by_id(*id).await {
356 return Ok(Some(m));
357 }
358 }
359 Ok(None)
360}
361
362impl CheckpointCoordinator {
363 pub async fn new(
370 config: CheckpointConfig,
371 store: Box<dyn CheckpointStore>,
372 ) -> Result<Self, DbError> {
373 let store: Arc<dyn CheckpointStore> = Arc::from(store);
374 let highest = load_highest(store.as_ref()).await.map_err(|e| {
375 DbError::Checkpoint(format!(
376 "[LDB-6028] failed to list checkpoints at coordinator \
377 construction: {e} — refusing to start at epoch 1 and \
378 clobber existing on-disk state"
379 ))
380 })?;
381 let (next_id, epoch) = match highest.as_ref() {
382 Some(m) => (m.checkpoint_id + 1, m.epoch + 1),
383 None => (1, 1),
384 };
385
386 Ok(Self {
387 config,
388 store,
389 sinks: Vec::new(),
390 allocator: Arc::new(EpochAllocator::new(epoch, next_id)),
391 phase: CheckpointPhase::Idle,
392 checkpoints_completed: 0,
393 checkpoints_failed: 0,
394 last_checkpoint_duration: None,
395 duration_histogram: DurationHistogram::new(),
396 prom: None,
397 total_bytes_written: 0,
398 state_backend: None,
399 assignment_version: 0,
400 decision_store: None,
401 local_watermark_ms: None,
402 #[cfg(feature = "cluster")]
403 cluster_min_watermark: None,
404 vnode_set: Vec::new(),
405 gate_vnode_set: Vec::new(),
406 rotation_epoch_floor: 0,
407 pending_vnode_states: std::collections::HashMap::new(),
408 last_vnode_uploads: std::collections::HashMap::new(),
409 #[cfg(feature = "cluster")]
410 cluster_controller: None,
411 cached_sorted_sink_names: None,
412 })
413 }
414
415 #[cfg(feature = "cluster")]
418 pub fn set_cluster_controller(
419 &mut self,
420 controller: Arc<laminar_core::cluster::control::ClusterController>,
421 ) {
422 self.cluster_controller = Some(controller);
423 }
424
425 pub fn set_state_backend(&mut self, backend: Arc<dyn StateBackend>) {
428 self.state_backend = Some(backend);
429 }
430
431 pub fn set_decision_store(
433 &mut self,
434 store: Arc<laminar_core::checkpoint_decision::CheckpointDecisionStore>,
435 ) {
436 self.decision_store = Some(store);
437 }
438
439 pub fn set_assignment_version(&mut self, version: u64) {
444 self.assignment_version = version;
445 }
446
447 pub fn set_local_watermark_ms(&mut self, watermark: Option<i64>) {
453 self.local_watermark_ms = watermark;
454 }
455
456 #[allow(clippy::disallowed_types)]
464 pub fn set_pending_vnode_states(
465 &mut self,
466 states: std::collections::HashMap<u32, std::collections::HashMap<String, bytes::Bytes>>,
467 ) {
468 self.pending_vnode_states = states;
469 }
470
471 pub fn set_vnode_set(&mut self, vnodes: Vec<u32>) {
474 if self.gate_vnode_set.is_empty() {
475 self.gate_vnode_set.clone_from(&vnodes);
476 }
477 self.rotation_epoch_floor = self.allocator.peek().0;
478 self.last_vnode_uploads.retain(|v, _| vnodes.contains(v));
482 self.vnode_set = vnodes;
483 }
484
485 pub fn set_gate_vnode_set(&mut self, vnodes: Vec<u32>) {
488 self.gate_vnode_set = vnodes;
489 }
490
491 pub(crate) fn register_sink(
493 &mut self,
494 name: impl Into<String>,
495 handle: crate::sink_task::SinkTaskHandle,
496 exactly_once: bool,
497 ) {
498 self.sinks.push(RegisteredSink {
499 name: name.into(),
500 handle,
501 exactly_once,
502 });
503 self.cached_sorted_sink_names = None;
506 }
507
508 pub async fn begin_initial_epoch(&self) -> Result<(), DbError> {
519 self.begin_epoch_for_sinks(self.allocator.peek().0).await
520 }
521
522 pub(crate) fn epoch_allocator(&self) -> Arc<EpochAllocator> {
525 Arc::clone(&self.allocator)
526 }
527
528 async fn begin_epoch_for_sinks(&self, epoch: u64) -> Result<(), DbError> {
531 let mut started: Vec<&RegisteredSink> = Vec::new();
532 for sink in &self.sinks {
533 if sink.exactly_once {
534 match sink.handle.begin_epoch(epoch).await {
535 Ok(()) => {
536 started.push(sink);
537 debug!(sink = %sink.name, epoch, "began epoch");
538 }
539 Err(e) => {
540 for s in &started {
542 if let Err(re) = s.handle.rollback_epoch(epoch).await {
543 error!(sink = %s.name, epoch, error = %re,
544 "[LDB-6016] sink rollback failed during begin_epoch recovery");
545 }
546 }
547 return Err(DbError::Checkpoint(format!(
548 "sink '{}' failed to begin epoch {epoch}: {e}",
549 sink.name
550 )));
551 }
552 }
553 }
554 }
555 Ok(())
556 }
557
558 pub fn set_metrics(&mut self, prom: Arc<crate::engine_metrics::EngineMetrics>) {
560 self.prom = Some(prom);
561 }
562
563 fn emit_checkpoint_metrics(&self, success: bool, epoch: u64, duration: Duration) {
565 if let Some(ref m) = self.prom {
566 if success {
567 m.checkpoints_completed.inc();
568 } else {
569 m.checkpoints_failed.inc();
570 }
571 #[allow(clippy::cast_possible_wrap)]
572 m.checkpoint_epoch.set(epoch as i64);
573 m.checkpoint_duration.observe(duration.as_secs_f64());
574 }
575 }
576
577 pub async fn checkpoint(
586 &mut self,
587 request: CheckpointRequest,
588 ) -> Result<CheckpointResult, DbError> {
589 self.checkpoint_inner(request, None, QuorumStage::RunInline)
590 .await
591 }
592
593 async fn pre_commit_sinks(&self, epoch: u64) -> Result<(), DbError> {
598 let timeout_dur = self.config.pre_commit_timeout;
599 let start = std::time::Instant::now();
600
601 let result =
602 match tokio::time::timeout(timeout_dur, self.pre_commit_sinks_inner(epoch)).await {
603 Ok(result) => result,
604 Err(_elapsed) => Err(DbError::Checkpoint(format!(
605 "pre-commit timed out after {}s",
606 timeout_dur.as_secs()
607 ))),
608 };
609
610 if let Some(ref m) = self.prom {
612 m.sink_precommit_duration
613 .observe(start.elapsed().as_secs_f64());
614 }
615
616 result
617 }
618
619 async fn pre_commit_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
630 let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
631 let handle = sink.handle.clone();
632 let name = sink.name.clone();
633 async move {
634 let result = handle.pre_commit(epoch).await;
635 match result {
636 Ok(()) => {
637 debug!(sink = %name, epoch, "sink pre-committed");
638 Ok(())
639 }
640 Err(e) => Err(DbError::Checkpoint(format!(
641 "sink '{name}' pre-commit failed: {e}"
642 ))),
643 }
644 }
645 });
646 futures::future::try_join_all(futures).await.map(|_| ())
647 }
648
649 async fn commit_sinks_tracked(&self, epoch: u64) -> HashMap<String, SinkCommitStatus> {
655 let timeout_dur = self.config.commit_timeout;
656 let start = std::time::Instant::now();
657
658 let tasks: Vec<_> = self
659 .sinks
660 .iter()
661 .filter(|s| s.exactly_once)
662 .map(|sink| {
663 let handle = sink.handle.clone();
664 let name = sink.name.clone();
665 let task = tokio::spawn(async move {
666 tokio::time::timeout(timeout_dur, handle.commit_epoch(epoch)).await
667 });
668 (name, task)
669 })
670 .collect();
671
672 let mut statuses = HashMap::new();
673 for (name, task) in tasks {
674 let status = match task.await {
675 Ok(Ok(Ok(()))) => SinkCommitStatus::Committed,
676 Ok(Ok(Err(e))) => {
677 error!(sink = %name, epoch, error = %e, "sink commit failed");
678 SinkCommitStatus::Failed(format!("sink '{name}' commit failed: {e}"))
679 }
680 Ok(Err(_)) => {
681 error!(
682 sink = %name, epoch,
683 timeout_secs = timeout_dur.as_secs(),
684 "[LDB-6012] sink commit timed out",
685 );
686 SinkCommitStatus::Failed(format!(
687 "sink '{name}' commit timed out after {}s",
688 timeout_dur.as_secs()
689 ))
690 }
691 Err(join_err) => {
692 error!(sink = %name, epoch, error = %join_err, "sink commit task panicked");
693 SinkCommitStatus::Failed(format!("sink '{name}' commit panicked: {join_err}"))
694 }
695 };
696 statuses.insert(name, status);
697 }
698
699 if let Some(ref m) = self.prom {
700 m.sink_commit_duration
701 .observe(start.elapsed().as_secs_f64());
702 }
703 statuses
704 }
705
706 async fn save_manifest(
716 &self,
717 manifest: Arc<CheckpointManifest>,
718 state_data: Option<Vec<bytes::Bytes>>,
719 ) -> Result<(), DbError> {
720 let timeout_dur = self.config.persist_timeout;
721 let fut = self.store.save_with_state(&manifest, state_data.as_deref());
722 match tokio::time::timeout(timeout_dur, fut).await {
723 Ok(Ok(())) => Ok(()),
724 Ok(Err(e)) => Err(DbError::from(e)),
725 Err(_elapsed) => Err(DbError::Checkpoint(format!(
726 "[LDB-6011] manifest persist timed out after {}s — \
727 filesystem may be degraded",
728 timeout_dur.as_secs()
729 ))),
730 }
731 }
732
733 async fn write_vnode_partials(
753 &mut self,
754 epoch: u64,
755 checkpoint_id: u64,
756 ) -> Result<(), DbError> {
757 let Some(ref backend) = self.state_backend else {
758 return Ok(());
759 };
760 if self.vnode_set.is_empty() {
761 return Ok(());
762 }
763 let caller_version = self.assignment_version;
767 let max_ref_age = (self.config.max_retained as u64).max(1);
768
769 let mut full_uploads: Vec<(u32, std::collections::HashMap<String, bytes::Bytes>)> =
772 Vec::new();
773 let mut emptied: Vec<u32> = Vec::new();
774 let mut reference_count: u64 = 0;
775 let mut encoded: Vec<(u32, bytes::Bytes)> = Vec::with_capacity(self.vnode_set.len());
776 for &v in &self.vnode_set {
777 let ops = self.pending_vnode_states.get(&v);
778 let base = ops.filter(|ops| !ops.is_empty()).and_then(|ops| {
779 self.last_vnode_uploads
780 .get(&v)
781 .filter(|(base, last)| epoch.saturating_sub(*base) < max_ref_age && last == ops)
782 .map(|(base, _)| *base)
783 });
784 let partial = if let Some(base_epoch) = base {
785 reference_count += 1;
786 crate::vnode_partial::VnodePartial {
787 checkpoint_id,
788 operators: Vec::new(),
789 base_epoch: Some(base_epoch),
790 }
791 } else {
792 match ops {
793 Some(ops) if !ops.is_empty() => full_uploads.push((v, ops.clone())),
794 _ => emptied.push(v),
795 }
796 crate::vnode_partial::VnodePartial {
797 checkpoint_id,
798 operators: ops
799 .map(|ops| ops.iter().map(|(n, b)| (n.clone(), b.to_vec())).collect())
800 .unwrap_or_default(),
801 base_epoch: None,
802 }
803 };
804 encoded.push((v, bytes::Bytes::from(partial.encode()?)));
805 }
806
807 let writes = encoded.into_iter().map(|(v, payload)| {
809 let backend = Arc::clone(backend);
810 async move {
811 backend
812 .write_partial(v, epoch, caller_version, payload)
813 .await
814 .map_err(|e| {
815 DbError::Checkpoint(format!(
816 "[LDB-6024] vnode partial write failed (vnode={v}, epoch={epoch}): {e}"
817 ))
818 })
819 }
820 });
821 futures::future::try_join_all(writes).await?;
822
823 for (v, ops) in full_uploads {
825 self.last_vnode_uploads.insert(v, (epoch, ops));
826 }
827 for v in emptied {
828 self.last_vnode_uploads.remove(&v);
829 }
830 if reference_count > 0 {
831 if let Some(ref m) = self.prom {
832 m.checkpoint_unchanged_vnodes.inc_by(reference_count);
833 }
834 }
835 Ok(())
836 }
837
838 async fn await_restorable_gate(
848 &self,
849 epoch: u64,
850 participants: &[QuorumPeer],
851 ) -> Result<(), String> {
852 use laminar_core::state::StateBackendError;
853
854 const INITIAL_POLL: Duration = Duration::from_millis(100);
862 const MAX_POLL: Duration = Duration::from_secs(1);
863
864 let Some(ref backend) = self.state_backend else {
865 return Ok(());
866 };
867 if self.gate_vnode_set.is_empty() {
868 return Ok(());
869 }
870
871 let deadline = Instant::now() + self.config.restorable_gate_timeout;
872 let mut interval = INITIAL_POLL;
873 let mut last_state = String::from("not all vnodes persisted");
874 loop {
875 if epoch < self.rotation_epoch_floor {
876 return Err(format!(
877 "vnode assignment rotated after epoch {epoch} captured \
878 (rotation floor {}); epoch cannot seal",
879 self.rotation_epoch_floor
880 ));
881 }
882 match backend.epoch_complete(epoch, &self.gate_vnode_set).await {
883 Ok(true) => return Ok(()),
884 Ok(false) => {}
888 Err(e @ StateBackendError::SplitBrainCommit { .. }) => {
889 return Err(format!("state durability gate: {e}"));
890 }
891 Err(e) => {
892 debug!(epoch, error = %e, "durability gate poll error; retrying");
894 last_state = e.to_string();
895 }
896 }
897 #[cfg(feature = "cluster")]
902 if let Some(cc) = self.cluster_controller.as_ref() {
903 if let Some(reason) =
904 Self::unhealthy_participant(&cc.members_watch().borrow(), participants)
905 {
906 return Err(format!("durability gate fail-fast: {reason}"));
907 }
908 if let Some(p) = participants
909 .iter()
910 .find(|p| cc.is_recently_unresponsive(**p))
911 {
912 return Err(format!(
913 "durability gate fail-fast: follower {} missed a capture quorum",
914 p.0
915 ));
916 }
917 }
918 #[cfg(not(feature = "cluster"))]
919 let _ = participants;
920 if Instant::now() >= deadline {
921 return Err(format!(
922 "state durability gate timed out after {:?}: {last_state}",
923 self.config.restorable_gate_timeout
924 ));
925 }
926 tokio::time::sleep(interval).await;
927 interval = (interval * 2).min(MAX_POLL);
928 }
929 }
930
931 async fn fail_epoch(
939 &mut self,
940 checkpoint_id: u64,
941 epoch: u64,
942 started: Instant,
943 error: String,
944 ) -> CheckpointResult {
945 #[cfg(feature = "cluster")]
946 self.announce_if_leader(
947 epoch,
948 checkpoint_id,
949 laminar_core::cluster::control::Phase::Abort,
950 None,
951 )
952 .await;
953 self.checkpoints_failed += 1;
954 self.phase = CheckpointPhase::Idle;
955 let duration = started.elapsed();
956 self.emit_checkpoint_metrics(false, epoch, duration);
957 if let Err(e) = self.rollback_sinks(epoch).await {
958 error!(
959 checkpoint_id, epoch, error = %e,
960 "[LDB-6004] sink rollback failed after checkpoint failure",
961 );
962 }
963 self.begin_next_epoch_bounded().await;
964 self.pending_vnode_states.clear();
965 CheckpointResult {
966 success: false,
967 checkpoint_id,
968 epoch,
969 duration,
970 error: Some(error),
971 }
972 }
973
974 async fn begin_next_epoch_bounded(&self) {
980 let next_epoch = self.allocator.peek().0;
981 match tokio::time::timeout(
982 self.config.rollback_timeout,
983 self.begin_epoch_for_sinks(next_epoch),
984 )
985 .await
986 {
987 Ok(Ok(())) => {}
988 Ok(Err(e)) => error!(
989 next_epoch, error = %e,
990 "[LDB-6015] failed to begin next epoch after abandoning a \
991 failed one — writes will be non-transactional",
992 ),
993 Err(_) => error!(
994 next_epoch,
995 timeout_secs = self.config.rollback_timeout.as_secs(),
996 "[LDB-6015] begin next epoch timed out after a failed \
997 checkpoint — writes will be non-transactional",
998 ),
999 }
1000 }
1001
1002 pub async fn reconcile_prepared_on_init(&self) {
1008 let last = match load_highest(self.store.as_ref()).await {
1009 Ok(Some(m)) => m,
1010 Ok(None) => return,
1011 Err(e) => {
1012 error!(error = %e, "[LDB-6041] reconcile skipped: could not load checkpoints");
1013 return;
1014 }
1015 };
1016 let has_pending = last
1017 .sink_commit_statuses
1018 .values()
1019 .any(|s| matches!(s, SinkCommitStatus::Pending));
1020 if !has_pending {
1021 return;
1022 }
1023
1024 let epoch = last.epoch;
1025 let checkpoint_id = last.checkpoint_id;
1026
1027 let committed = match self.decision_store.as_ref() {
1028 Some(ds) => ds.is_committed(epoch).await.unwrap_or_else(|e| {
1029 warn!(
1030 epoch, checkpoint_id, error = %e,
1031 "[LDB-6040] decision store read failed — defaulting to Abort",
1032 );
1033 false
1034 }),
1035 None => false,
1036 };
1037
1038 #[cfg(feature = "cluster")]
1039 let is_leader = self
1040 .cluster_controller
1041 .as_ref()
1042 .is_some_and(|cc| cc.is_leader());
1043
1044 if committed {
1045 info!(
1046 epoch,
1047 checkpoint_id, "recovering Pending epoch as Committed"
1048 );
1049 let statuses = self.commit_sinks_tracked(epoch).await;
1050 if let Err(e) = self
1051 .persist_recovered_statuses(checkpoint_id, statuses)
1052 .await
1053 {
1054 warn!(epoch, checkpoint_id, error = %e, "post-recovery manifest update failed");
1055 }
1056 #[cfg(feature = "cluster")]
1057 if is_leader {
1058 self.announce_if_leader(
1059 epoch,
1060 checkpoint_id,
1061 laminar_core::cluster::control::Phase::Commit,
1062 None,
1063 )
1064 .await;
1065 }
1066 } else {
1067 warn!(
1068 epoch,
1069 checkpoint_id, "[LDB-6035] Pending epoch with no commit marker — rolling back",
1070 );
1071 if let Err(e) = self.rollback_sinks(epoch).await {
1072 error!(epoch, checkpoint_id, error = %e, "sink rollback failed during recovery");
1073 }
1074 #[cfg(feature = "cluster")]
1075 if is_leader {
1076 self.announce_if_leader(
1077 epoch,
1078 checkpoint_id,
1079 laminar_core::cluster::control::Phase::Abort,
1080 None,
1081 )
1082 .await;
1083 }
1084 }
1085
1086 #[cfg(feature = "cluster")]
1088 if is_leader {
1089 tokio::time::sleep(Duration::from_millis(200)).await;
1090 }
1091 }
1092
1093 async fn persist_recovered_statuses(
1096 &self,
1097 checkpoint_id: u64,
1098 statuses: HashMap<String, SinkCommitStatus>,
1099 ) -> Result<(), DbError> {
1100 if statuses.is_empty() {
1101 return Ok(());
1102 }
1103 match self.store.load_by_id(checkpoint_id).await {
1104 Ok(Some(mut m)) => {
1105 m.sink_commit_statuses = statuses;
1106 self.update_manifest_only(Arc::new(m)).await
1107 }
1108 Ok(None) => Ok(()),
1109 Err(e) => Err(DbError::from(e)),
1110 }
1111 }
1112
1113 #[cfg(feature = "cluster")]
1122 async fn announce_if_leader(
1123 &self,
1124 epoch: u64,
1125 checkpoint_id: u64,
1126 phase: laminar_core::cluster::control::Phase,
1127 min_watermark_ms: Option<i64>,
1128 ) {
1129 let Some(cc) = self.cluster_controller.as_ref() else {
1130 return;
1131 };
1132 if !cc.is_leader() {
1133 return;
1134 }
1135 let ann = laminar_core::cluster::control::BarrierAnnouncement {
1136 epoch,
1137 checkpoint_id,
1138 phase,
1139 flags: 0,
1140 min_watermark_ms,
1141 };
1142 if let Err(e) = cc.announce_barrier(&ann).await {
1143 warn!(
1144 epoch,
1145 checkpoint_id,
1146 ?phase,
1147 error = %e,
1148 "[LDB-6031] barrier announcement failed",
1149 );
1150 }
1151 }
1152
1153 #[cfg(feature = "cluster")]
1160 async fn await_prepare_quorum(
1161 &mut self,
1162 epoch: u64,
1163 checkpoint_id: u64,
1164 ) -> Result<Vec<laminar_core::cluster::discovery::NodeId>, String> {
1165 use laminar_core::cluster::control::Phase;
1166 let Some(cc) = self.cluster_controller.clone() else {
1167 return Ok(Vec::new());
1168 };
1169 if !cc.is_leader() {
1170 return Ok(Vec::new());
1171 }
1172 match Self::run_prepare_quorum(
1173 &cc,
1174 self.config.quorum_timeout,
1175 epoch,
1176 checkpoint_id,
1177 self.local_watermark_ms,
1178 )
1179 .await
1180 {
1181 Ok((merged, participants)) => {
1182 self.cluster_min_watermark = merged;
1183 Ok(participants)
1184 }
1185 Err(msg) => {
1186 self.announce_if_leader(epoch, checkpoint_id, Phase::Abort, None)
1187 .await;
1188 Err(msg)
1189 }
1190 }
1191 }
1192
1193 #[cfg(feature = "cluster")]
1198 fn unhealthy_participant(
1199 members: &[laminar_core::cluster::discovery::NodeInfo],
1200 participants: &[QuorumPeer],
1201 ) -> Option<String> {
1202 use laminar_core::cluster::discovery::NodeState;
1203 for &id in participants {
1204 match members.iter().find(|m| m.id.0 == id.0) {
1205 Some(node)
1206 if matches!(
1207 node.state,
1208 NodeState::Suspected | NodeState::Left | NodeState::Draining
1209 ) =>
1210 {
1211 return Some(format!(
1212 "Follower {} transitioned to unhealthy state {:?}",
1213 id.0, node.state
1214 ));
1215 }
1216 Some(_) => {}
1217 None => {
1218 return Some(format!("Follower {} missing from cluster membership", id.0));
1219 }
1220 }
1221 }
1222 None
1223 }
1224
1225 #[cfg(feature = "cluster")]
1233 pub(crate) async fn run_prepare_quorum(
1234 cc: &laminar_core::cluster::control::ClusterController,
1235 quorum_timeout: Duration,
1236 epoch: u64,
1237 checkpoint_id: u64,
1238 local_watermark_ms: Option<i64>,
1239 ) -> Result<(Option<i64>, Vec<laminar_core::cluster::discovery::NodeId>), String> {
1240 use laminar_core::cluster::control::{BarrierAnnouncement, Phase, QuorumOutcome};
1241
1242 if let Err(e) = cc
1243 .announce_barrier(&BarrierAnnouncement {
1244 epoch,
1245 checkpoint_id,
1246 phase: Phase::Prepare,
1247 flags: 0,
1248 min_watermark_ms: None,
1249 })
1250 .await
1251 {
1252 warn!(epoch, checkpoint_id, error = %e, "[LDB-6031] prepare announcement failed");
1253 }
1254
1255 let mut followers = cc.live_instances();
1256 followers.retain(|id| *id != cc.instance_id());
1257 if followers.is_empty() {
1258 if let Some(wm) = local_watermark_ms {
1261 cc.publish_cluster_min_watermark(wm);
1262 }
1263 return Ok((local_watermark_ms, Vec::new()));
1264 }
1265
1266 let mut members_rx = cc.members_watch();
1267
1268 let quorum_fut = cc.wait_for_quorum(epoch, &followers, quorum_timeout);
1269 let membership_fut = async {
1270 loop {
1271 if let Some(reason) = Self::unhealthy_participant(&members_rx.borrow(), &followers)
1272 {
1273 return reason;
1274 }
1275 if members_rx.changed().await.is_err() {
1276 futures::future::pending::<()>().await;
1282 }
1283 }
1284 };
1285
1286 let outcome = tokio::select! {
1287 o = quorum_fut => Ok(o),
1288 e = membership_fut => Err(e),
1289 };
1290
1291 match outcome {
1292 Ok(QuorumOutcome::Reached {
1293 min_follower_watermark_ms,
1294 ref acks,
1295 }) => {
1296 cc.note_responsive(acks);
1298 let merged = match (local_watermark_ms, min_follower_watermark_ms) {
1300 (Some(a), Some(b)) => Some(a.min(b)),
1301 (Some(a), None) => Some(a),
1302 (None, Some(b)) => Some(b),
1303 (None, None) => None,
1304 };
1305 if let Some(wm) = merged {
1306 cc.publish_cluster_min_watermark(wm);
1307 }
1308 Ok((merged, followers))
1309 }
1310 Ok(QuorumOutcome::TimedOut { missing, .. }) => {
1311 cc.note_unresponsive(&missing);
1316 Err(format!(
1317 "quorum timeout: {} follower(s) did not ack",
1318 missing.len()
1319 ))
1320 }
1321 Ok(QuorumOutcome::Failed { failures }) => {
1322 let first = failures.first().map_or("unknown", |(_, msg)| msg.as_str());
1323 Err(format!(
1324 "follower snapshot failed on {} peer(s): {first}",
1325 failures.len()
1326 ))
1327 }
1328 Err(err_msg) => Err(format!("fail-fast: {err_msg}")),
1329 }
1330 }
1331
1332 async fn update_manifest_only(&self, manifest: Arc<CheckpointManifest>) -> Result<(), DbError> {
1336 let timeout_dur = self.config.persist_timeout;
1337 let fut = self.store.update_manifest(&manifest);
1338 match tokio::time::timeout(timeout_dur, fut).await {
1339 Ok(Ok(())) => Ok(()),
1340 Ok(Err(e)) => Err(DbError::from(e)),
1341 Err(_elapsed) => Err(DbError::Checkpoint(format!(
1342 "manifest update timed out after {}s",
1343 timeout_dur.as_secs()
1344 ))),
1345 }
1346 }
1347
1348 fn initial_sink_commit_statuses(&self) -> HashMap<String, SinkCommitStatus> {
1350 self.sinks
1351 .iter()
1352 .filter(|s| s.exactly_once)
1353 .map(|s| (s.name.clone(), SinkCommitStatus::Pending))
1354 .collect()
1355 }
1356
1357 fn pack_operator_states(
1366 manifest: &mut CheckpointManifest,
1367 operator_states: &HashMap<String, bytes::Bytes>,
1368 threshold: usize,
1369 ) -> Option<Vec<bytes::Bytes>> {
1370 let mut sidecar_chunks: Vec<bytes::Bytes> = Vec::new();
1371 let mut offset: u64 = 0;
1372 for (name, data) in operator_states {
1373 let (op_ckpt, maybe_blob) =
1374 laminar_core::storage::checkpoint_manifest::OperatorCheckpoint::from_bytes_shared(
1375 data.clone(),
1376 threshold,
1377 offset,
1378 );
1379 if let Some(blob) = maybe_blob {
1380 offset += blob.len() as u64;
1381 sidecar_chunks.push(blob);
1382 }
1383 manifest.operator_states.insert(name.clone(), op_ckpt);
1384 }
1385
1386 if sidecar_chunks.is_empty() {
1387 None
1388 } else {
1389 Some(sidecar_chunks)
1390 }
1391 }
1392
1393 async fn rollback_sinks(&self, epoch: u64) -> Result<(), DbError> {
1396 let timeout_dur = self.config.rollback_timeout;
1397 match tokio::time::timeout(timeout_dur, self.rollback_sinks_inner(epoch)).await {
1398 Ok(result) => result,
1399 Err(_elapsed) => {
1400 error!(
1401 epoch,
1402 timeout_secs = timeout_dur.as_secs(),
1403 "[LDB-6016] sink rollback timed out"
1404 );
1405 Err(DbError::Checkpoint(format!(
1406 "rollback timed out after {}s",
1407 timeout_dur.as_secs()
1408 )))
1409 }
1410 }
1411 }
1412
1413 async fn rollback_sinks_inner(&self, epoch: u64) -> Result<(), DbError> {
1414 let futures = self.sinks.iter().filter(|s| s.exactly_once).map(|sink| {
1415 let handle = sink.handle.clone();
1416 let name = sink.name.clone();
1417 async move {
1418 let result = handle.abandon_epoch(epoch).await;
1422 (name, result)
1423 }
1424 });
1425 let results = futures::future::join_all(futures).await;
1426
1427 let mut errors = Vec::new();
1428 for (name, result) in results {
1429 if let Err(e) = result {
1430 error!(sink = %name, epoch, error = %e, "[LDB-6016] sink rollback failed");
1431 errors.push(format!("sink '{name}': {e}"));
1432 }
1433 }
1434 if errors.is_empty() {
1435 Ok(())
1436 } else {
1437 Err(DbError::Checkpoint(format!(
1438 "rollback failed: {}",
1439 errors.join("; ")
1440 )))
1441 }
1442 }
1443
1444 fn collect_sink_epochs(&self, epoch: u64) -> HashMap<String, u64> {
1448 let mut epochs = HashMap::with_capacity(self.sinks.len());
1449 for sink in &self.sinks {
1450 if sink.exactly_once {
1451 epochs.insert(sink.name.clone(), epoch);
1452 }
1453 }
1454 epochs
1455 }
1456
1457 fn sorted_sink_names(&mut self) -> Vec<String> {
1463 if self.cached_sorted_sink_names.is_none() {
1464 let mut names: Vec<String> = self.sinks.iter().map(|s| s.name.clone()).collect();
1465 names.sort();
1466 self.cached_sorted_sink_names = Some(names);
1467 }
1468 self.cached_sorted_sink_names.as_ref().unwrap().clone()
1470 }
1471
1472 #[must_use]
1474 pub fn phase(&self) -> CheckpointPhase {
1475 self.phase
1476 }
1477
1478 #[must_use]
1480 pub fn epoch(&self) -> u64 {
1481 self.allocator.peek().0
1482 }
1483
1484 #[must_use]
1486 pub fn next_checkpoint_id(&self) -> u64 {
1487 self.allocator.peek().1
1488 }
1489
1490 #[must_use]
1492 pub fn config(&self) -> &CheckpointConfig {
1493 &self.config
1494 }
1495
1496 #[must_use]
1498 pub fn stats(&self) -> CheckpointStats {
1499 let (p50, p95, p99) = self.duration_histogram.percentiles();
1500 CheckpointStats {
1502 completed: self.checkpoints_completed,
1503 failed: self.checkpoints_failed,
1504 last_duration: self.last_checkpoint_duration,
1505 duration_p50_ms: p50 / 1_000,
1506 duration_p95_ms: p95 / 1_000,
1507 duration_p99_ms: p99 / 1_000,
1508 total_bytes_written: self.total_bytes_written,
1509 current_phase: self.phase,
1510 current_epoch: self.allocator.peek().0,
1511 }
1512 }
1513
1514 #[must_use]
1516 pub fn store(&self) -> &dyn CheckpointStore {
1517 &*self.store
1518 }
1519
1520 pub async fn checkpoint_with_offsets(
1532 &mut self,
1533 request: CheckpointRequest,
1534 ) -> Result<CheckpointResult, DbError> {
1535 self.checkpoint_inner(request, None, QuorumStage::RunInline)
1536 .await
1537 }
1538
1539 pub(crate) async fn checkpoint_preallocated(
1546 &mut self,
1547 request: CheckpointRequest,
1548 epoch: u64,
1549 checkpoint_id: u64,
1550 quorum: QuorumStage,
1551 ) -> Result<CheckpointResult, DbError> {
1552 self.checkpoint_inner(request, Some((epoch, checkpoint_id)), quorum)
1553 .await
1554 }
1555
1556 #[cfg(feature = "cluster")]
1560 pub(crate) async fn abandon_epoch(
1561 &mut self,
1562 checkpoint_id: u64,
1563 epoch: u64,
1564 error: String,
1565 ) -> CheckpointResult {
1566 self.fail_epoch(checkpoint_id, epoch, Instant::now(), error)
1567 .await
1568 }
1569
1570 #[cfg(feature = "cluster")]
1583 pub async fn follower_checkpoint(
1584 &mut self,
1585 request: CheckpointRequest,
1586 ann: laminar_core::cluster::control::BarrierAnnouncement,
1587 decision_timeout: Duration,
1588 ) -> Result<bool, DbError> {
1589 use laminar_core::cluster::control::BarrierAck;
1590
1591 let Some(cc) = self.cluster_controller.clone() else {
1592 return Err(DbError::Checkpoint(
1593 "[LDB-6033] follower_checkpoint called without cluster controller".into(),
1594 ));
1595 };
1596
1597 cc.ack_barrier(&BarrierAck {
1600 epoch: ann.epoch,
1601 ok: true,
1602 error: None,
1603 local_watermark_ms: self.local_watermark_ms,
1606 })
1607 .await
1608 .ok(); self.follower_checkpoint_acked(request, ann, decision_timeout)
1611 .await
1612 }
1613
1614 #[cfg(feature = "cluster")]
1624 pub(crate) async fn follower_checkpoint_acked(
1625 &mut self,
1626 request: CheckpointRequest,
1627 ann: laminar_core::cluster::control::BarrierAnnouncement,
1628 decision_timeout: Duration,
1629 ) -> Result<bool, DbError> {
1630 let Some(cc) = self.cluster_controller.clone() else {
1631 return Err(DbError::Checkpoint(
1632 "[LDB-6033] follower_checkpoint called without cluster controller".into(),
1633 ));
1634 };
1635 let (epoch, checkpoint_id) = (ann.epoch, ann.checkpoint_id);
1636 self.follower_prepare_acked(request, epoch, checkpoint_id)
1637 .await?;
1638 let committed = Self::await_follower_decision(
1639 &cc,
1640 self.decision_store.as_deref(),
1641 epoch,
1642 checkpoint_id,
1643 decision_timeout,
1644 )
1645 .await;
1646 Ok(self.follower_finish(epoch, checkpoint_id, committed).await)
1647 }
1648
1649 #[cfg(feature = "cluster")]
1657 pub(crate) async fn follower_prepare_acked(
1658 &mut self,
1659 request: CheckpointRequest,
1660 epoch: u64,
1661 checkpoint_id: u64,
1662 ) -> Result<(), DbError> {
1663 use laminar_core::cluster::control::BarrierAck;
1664
1665 self.allocator
1669 .advance_to(epoch, checkpoint_id.saturating_add(1));
1670
1671 if let Err(e) = self.follower_prepare(request, epoch, checkpoint_id).await {
1672 if let Some(cc) = self.cluster_controller.clone() {
1673 cc.ack_barrier(&BarrierAck {
1674 epoch,
1675 ok: false,
1676 error: Some(e.to_string()),
1677 local_watermark_ms: self.local_watermark_ms,
1678 })
1679 .await
1680 .ok();
1681 }
1682 self.rollback_sinks(epoch).await.ok();
1683 self.phase = CheckpointPhase::Idle;
1684 self.begin_next_epoch_bounded().await;
1688 return Err(e);
1689 }
1690 Ok(())
1691 }
1692
    /// Waits for the commit/abort decision for `epoch` and returns `true`
    /// when the follower should commit, `false` when it should roll back.
    ///
    /// The decision is taken from, in order of preference:
    ///
    /// 1. a barrier announcement for exactly `epoch` in phase `Commit` or
    ///    `Abort` (commit iff the phase is `Commit`);
    /// 2. the commit marker in `decision_store`, consulted when a newer
    ///    epoch is observed or when the wait times out.
    ///
    /// Without a decision store, or when reading it fails (logged as
    /// `[LDB-6045]`), the marker counts as absent.
    #[cfg(feature = "cluster")]
    pub(crate) async fn await_follower_decision(
        cc: &laminar_core::cluster::control::ClusterController,
        decision_store: Option<&laminar_core::checkpoint_decision::CheckpointDecisionStore>,
        epoch: u64,
        checkpoint_id: u64,
        decision_timeout: Duration,
    ) -> bool {
        use laminar_core::cluster::control::Phase;

        // Reports whether the decision store holds a commit marker for
        // `epoch`; read errors and a missing store both yield `false`.
        let is_marked = || async {
            match decision_store {
                Some(ds) => ds.is_committed(epoch).await.unwrap_or_else(|e| {
                    warn!(
                        epoch, checkpoint_id, error = %e,
                        "[LDB-6045] decision store read failed — defaulting to Abort",
                    );
                    false
                }),
                None => false,
            }
        };

        let deadline = Instant::now() + decision_timeout;
        loop {
            // Time left until the overall deadline; zero once it has passed.
            let remaining = deadline.saturating_duration_since(Instant::now());
            let decision = cc
                .wait_for_barrier(
                    // Wake on a decision for this epoch, or on any announcement
                    // for a later epoch.
                    |a| {
                        a.epoch > epoch
                            || (a.epoch == epoch && matches!(a.phase, Phase::Commit | Phase::Abort))
                    },
                    remaining,
                )
                .await;
            match decision {
                // Explicit decision for this epoch.
                Some(a) if a.epoch == epoch => return a.phase == Phase::Commit,
                // A later epoch was announced; fall back to the commit marker.
                Some(a) => {
                    if is_marked().await {
                        info!(
                            epoch,
                            checkpoint_id,
                            observed_epoch = a.epoch,
                            "newer epoch observed with commit marker present — committing",
                        );
                        return true;
                    }
                    // Marker not present yet: pause, then wait again.
                    // NOTE(review): this branch does not check `deadline`
                    // itself; leaving the loop relies on `wait_for_barrier`
                    // returning `None` once `remaining` reaches zero —
                    // confirm against its implementation.
                    tokio::time::sleep(Duration::from_millis(500)).await;
                }
                // Timed out: one last look at the commit marker.
                None => {
                    if is_marked().await {
                        warn!(
                            epoch,
                            checkpoint_id,
                            "[LDB-6046] follower timeout but marker present — committing",
                        );
                        return true;
                    }
                    warn!(
                        epoch,
                        checkpoint_id, "[LDB-6034] follower decision timeout; rolling back",
                    );
                    return false;
                }
            }
        }
    }
1771
1772 #[cfg(feature = "cluster")]
1775 pub(crate) async fn follower_finish(
1776 &mut self,
1777 epoch: u64,
1778 checkpoint_id: u64,
1779 committed: bool,
1780 ) -> bool {
1781 let clean = if committed {
1782 self.drive_follower_commit(epoch, checkpoint_id).await
1783 } else {
1784 self.rollback_sinks(epoch).await.ok();
1785 self.checkpoints_failed += 1;
1786 self.phase = CheckpointPhase::Idle;
1787 false
1788 };
1789 self.begin_next_epoch_bounded().await;
1793 clean
1794 }
1795
1796 #[cfg(feature = "cluster")]
1799 pub(crate) fn decision_store_handle(
1800 &self,
1801 ) -> Option<Arc<laminar_core::checkpoint_decision::CheckpointDecisionStore>> {
1802 self.decision_store.clone()
1803 }
1804
1805 #[cfg(feature = "cluster")]
1808 async fn drive_follower_commit(&mut self, epoch: u64, checkpoint_id: u64) -> bool {
1809 let statuses = self.commit_sinks_tracked(epoch).await;
1810 let has_failures = statuses
1811 .values()
1812 .any(|s| matches!(s, SinkCommitStatus::Failed(_)));
1813 if has_failures {
1814 error!(
1815 epoch,
1816 checkpoint_id, "follower sink commit partially failed — rolling back",
1817 );
1818 self.rollback_sinks(epoch).await.ok();
1819 self.checkpoints_failed += 1;
1820 self.phase = CheckpointPhase::Idle;
1821 return false;
1822 }
1823 if let Err(e) = self
1827 .persist_recovered_statuses(checkpoint_id, statuses)
1828 .await
1829 {
1830 warn!(
1831 checkpoint_id,
1832 epoch,
1833 error = %e,
1834 "follower post-commit manifest update failed",
1835 );
1836 }
1837 self.checkpoints_completed += 1;
1838 let (_, next_id) = self.allocator.peek();
1839 self.allocator.advance_to(epoch.saturating_add(1), next_id);
1840 self.phase = CheckpointPhase::Idle;
1841 true
1842 }
1843
1844 #[cfg(feature = "cluster")]
1846 async fn follower_prepare(
1847 &mut self,
1848 request: CheckpointRequest,
1849 epoch: u64,
1850 checkpoint_id: u64,
1851 ) -> Result<(), DbError> {
1852 let CheckpointRequest {
1853 operator_states,
1854 watermark,
1855 table_store_checkpoint_path,
1856 extra_table_offsets,
1857 source_watermarks,
1858 pipeline_hash,
1859 source_offset_overrides,
1860 } = request;
1861
1862 self.phase = CheckpointPhase::PreCommitting;
1863 if let Err(e) = self.pre_commit_sinks(epoch).await {
1864 self.pending_vnode_states.clear();
1865 return Err(e);
1866 }
1867
1868 let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
1869 manifest.source_offsets = source_offset_overrides;
1870 manifest.table_offsets = extra_table_offsets;
1871 manifest.sink_epochs = self.collect_sink_epochs(epoch);
1872 manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
1873 manifest.watermark = watermark;
1874 manifest.source_watermarks = source_watermarks;
1875 manifest.table_store_checkpoint_path = table_store_checkpoint_path;
1876 manifest.source_names = {
1877 let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
1878 names.sort();
1879 names
1880 };
1881 manifest.sink_names = self.sorted_sink_names();
1882 manifest.pipeline_hash = pipeline_hash;
1883 let state_data = Self::pack_operator_states(
1884 &mut manifest,
1885 &operator_states,
1886 self.config.state_inline_threshold,
1887 );
1888
1889 self.phase = CheckpointPhase::Persisting;
1890 if let Err(e) = self.save_manifest(Arc::new(manifest), state_data).await {
1891 self.pending_vnode_states.clear();
1892 return Err(e);
1893 }
1894 if let Err(e) = self.write_vnode_partials(epoch, checkpoint_id).await {
1895 self.pending_vnode_states.clear();
1896 return Err(e);
1897 }
1898 self.pending_vnode_states.clear();
1899 Ok(())
1900 }
1901
1902 #[allow(clippy::too_many_lines)]
1909 async fn checkpoint_inner(
1910 &mut self,
1911 request: CheckpointRequest,
1912 ids: Option<(u64, u64)>,
1913 quorum: QuorumStage,
1914 ) -> Result<CheckpointResult, DbError> {
1915 let CheckpointRequest {
1916 operator_states,
1917 watermark,
1918 table_store_checkpoint_path,
1919 extra_table_offsets,
1920 source_watermarks,
1921 pipeline_hash,
1922 source_offset_overrides,
1923 } = request;
1924 let start = Instant::now();
1925 let (epoch, checkpoint_id) = ids.unwrap_or_else(|| self.allocator.allocate());
1932
1933 info!(checkpoint_id, epoch, "starting checkpoint");
1934
1935 self.phase = CheckpointPhase::Snapshotting;
1938 let source_offsets = source_offset_overrides;
1939 let table_offsets = extra_table_offsets;
1940
1941 #[cfg(feature = "cluster")]
1947 #[allow(unused_assignments)] let mut quorum_participants: Vec<QuorumPeer> = Vec::new();
1949 #[cfg(feature = "cluster")]
1950 match quorum {
1951 QuorumStage::RunInline => {
1952 match self.await_prepare_quorum(epoch, checkpoint_id).await {
1953 Ok(p) => quorum_participants = p,
1954 Err(quorum_failure) => {
1955 error!(checkpoint_id, epoch, error = %quorum_failure, "[LDB-6032] quorum miss");
1956 return Ok(self
1957 .fail_epoch(checkpoint_id, epoch, start, quorum_failure)
1958 .await);
1959 }
1960 }
1961 self.announce_if_leader(
1962 epoch,
1963 checkpoint_id,
1964 laminar_core::cluster::control::Phase::Aligned,
1965 self.cluster_min_watermark,
1966 )
1967 .await;
1968 }
1969 QuorumStage::Done {
1970 min_watermark_ms,
1971 participants,
1972 } => {
1973 self.cluster_min_watermark = min_watermark_ms;
1974 quorum_participants = participants;
1975 }
1976 }
1977 #[cfg(not(feature = "cluster"))]
1978 let _ = quorum;
1979
1980 self.phase = CheckpointPhase::PreCommitting;
1981 if let Err(e) = self.pre_commit_sinks(epoch).await {
1982 error!(checkpoint_id, epoch, error = %e, "pre-commit failed");
1983 return Ok(self
1984 .fail_epoch(
1985 checkpoint_id,
1986 epoch,
1987 start,
1988 format!("pre-commit failed: {e}"),
1989 )
1990 .await);
1991 }
1992
1993 let mut manifest = CheckpointManifest::new(checkpoint_id, epoch);
1994 manifest.source_offsets = source_offsets;
1995 manifest.table_offsets = table_offsets;
1996 manifest.sink_epochs = self.collect_sink_epochs(epoch);
1997 manifest.sink_commit_statuses = self.initial_sink_commit_statuses();
1999 manifest.watermark = watermark;
2000 manifest.source_watermarks = source_watermarks;
2005 manifest.table_store_checkpoint_path = table_store_checkpoint_path;
2006 manifest.source_names = {
2007 let mut names: Vec<String> = manifest.source_offsets.keys().cloned().collect();
2008 names.sort();
2009 names
2010 };
2011 manifest.sink_names = self.sorted_sink_names();
2012 manifest.pipeline_hash = pipeline_hash;
2013
2014 let state_data = Self::pack_operator_states(
2015 &mut manifest,
2016 &operator_states,
2017 self.config.state_inline_threshold,
2018 );
2019 let sidecar_bytes = state_data
2020 .as_ref()
2021 .map_or(0, |chunks| chunks.iter().map(bytes::Bytes::len).sum());
2022 if sidecar_bytes > 0 {
2023 debug!(
2024 checkpoint_id,
2025 sidecar_bytes, "writing operator state sidecar"
2026 );
2027 }
2028
2029 if let Some(cap) = self.config.max_checkpoint_bytes {
2030 if sidecar_bytes > cap {
2031 let msg = format!(
2032 "[LDB-6014] checkpoint size {sidecar_bytes} bytes exceeds \
2033 cap {cap} bytes — checkpoint rejected"
2034 );
2035 error!(checkpoint_id, epoch, sidecar_bytes, cap, "{msg}");
2036 return Ok(self.fail_epoch(checkpoint_id, epoch, start, msg).await);
2040 }
2041 let warn_threshold = cap * 4 / 5; if sidecar_bytes > warn_threshold {
2043 warn!(
2044 checkpoint_id,
2045 epoch, sidecar_bytes, cap, "checkpoint size approaching cap (>80%)"
2046 );
2047 }
2048 }
2049 let checkpoint_bytes = sidecar_bytes as u64;
2050
2051 self.phase = CheckpointPhase::Persisting;
2052 let mut manifest = Arc::new(manifest);
2057 if let Err(e) = self.save_manifest(Arc::clone(&manifest), state_data).await {
2058 error!(checkpoint_id, epoch, error = %e, "[LDB-6008] manifest persist failed");
2059 return Ok(self
2060 .fail_epoch(
2061 checkpoint_id,
2062 epoch,
2063 start,
2064 format!("manifest persist failed: {e}"),
2065 )
2066 .await);
2067 }
2068
2069 if let Err(e) = self.write_vnode_partials(epoch, checkpoint_id).await {
2073 error!(checkpoint_id, epoch, error = %e, "[LDB-6025] vnode partial write failed");
2074 return Ok(self
2075 .fail_epoch(
2076 checkpoint_id,
2077 epoch,
2078 start,
2079 format!("vnode partial write failed: {e}"),
2080 )
2081 .await);
2082 }
2083
2084 #[cfg(not(feature = "cluster"))]
2093 let quorum_participants: Vec<QuorumPeer> = Vec::new();
2094 let gate_start = Instant::now();
2095 let gate_result = self
2096 .await_restorable_gate(epoch, &quorum_participants)
2097 .await;
2098 if let Some(ref m) = self.prom {
2102 m.checkpoint_restorable_gate_wait
2103 .observe(gate_start.elapsed().as_secs_f64());
2104 }
2105 if let Err(gate_err) = gate_result {
2106 warn!(
2107 checkpoint_id,
2108 epoch,
2109 vnodes = self.gate_vnode_set.len(),
2110 error = %gate_err,
2111 "[LDB-6020] state durability gate failed — rolling back sinks",
2112 );
2113 return Ok(self.fail_epoch(checkpoint_id, epoch, start, gate_err).await);
2114 }
2115
2116 let is_decision_leader = {
2121 #[cfg(feature = "cluster")]
2122 {
2123 self.cluster_controller
2124 .as_ref()
2125 .is_none_or(|cc| cc.is_leader())
2126 }
2127 #[cfg(not(feature = "cluster"))]
2128 {
2129 true
2130 }
2131 };
2132 if is_decision_leader {
2133 if let Some(ds) = self.decision_store.as_ref() {
2134 if let Err(e) = ds.record_committed(epoch).await {
2135 error!(
2136 checkpoint_id, epoch, error = %e,
2137 "[LDB-6038] cannot record commit marker — aborting epoch",
2138 );
2139 return Ok(self
2140 .fail_epoch(checkpoint_id, epoch, start, format!("commit marker: {e}"))
2141 .await);
2142 }
2143 }
2144 }
2145
2146 #[cfg(feature = "cluster")]
2147 self.announce_if_leader(
2151 epoch,
2152 checkpoint_id,
2153 laminar_core::cluster::control::Phase::Commit,
2154 self.cluster_min_watermark,
2155 )
2156 .await;
2157
2158 self.phase = CheckpointPhase::Committing;
2159 let sink_statuses = self.commit_sinks_tracked(epoch).await;
2160 let has_failures = sink_statuses
2161 .values()
2162 .any(|s| matches!(s, SinkCommitStatus::Failed(_)));
2163
2164 if !sink_statuses.is_empty() {
2165 Arc::make_mut(&mut manifest).sink_commit_statuses = sink_statuses;
2169 if let Err(e) = self.update_manifest_only(Arc::clone(&manifest)).await {
2170 warn!(
2171 checkpoint_id,
2172 epoch,
2173 error = %e,
2174 "post-commit manifest update failed"
2175 );
2176 }
2177 }
2178
2179 if has_failures {
2180 self.checkpoints_failed += 1;
2186 error!(
2187 checkpoint_id,
2188 epoch,
2189 "sink commit partially failed after the commit decision — \
2190 statuses recorded for recovery-time re-drive"
2191 );
2192 self.phase = CheckpointPhase::Idle;
2193 let duration = start.elapsed();
2194 self.emit_checkpoint_metrics(false, epoch, duration);
2195 self.begin_next_epoch_bounded().await;
2196 self.pending_vnode_states.clear();
2197 return Ok(CheckpointResult {
2198 success: false,
2199 checkpoint_id,
2200 epoch,
2201 duration,
2202 error: Some("partial sink commit failure".into()),
2203 });
2204 }
2205
2206 self.phase = CheckpointPhase::Idle;
2207 self.checkpoints_completed += 1;
2208 self.total_bytes_written += checkpoint_bytes;
2209 let duration = start.elapsed();
2210 self.last_checkpoint_duration = Some(duration);
2211 self.duration_histogram.record(duration);
2212 self.emit_checkpoint_metrics(true, epoch, duration);
2213
2214 if let Some(ref m) = self.prom {
2216 #[allow(clippy::cast_possible_wrap)]
2217 m.checkpoint_size_bytes.set(checkpoint_bytes as i64);
2218 }
2219
2220 if let Some(ref backend) = self.state_backend {
2228 let horizon = epoch.saturating_sub(self.config.max_retained as u64);
2229 if horizon > 0 {
2230 if let Err(e) = backend.prune_before(horizon).await {
2231 warn!(
2232 epoch,
2233 horizon,
2234 error = %e,
2235 "[LDB-6026] state backend prune failed; old partials will linger"
2236 );
2237 }
2238 if let Some(ref ds) = self.decision_store {
2239 if let Err(e) = ds.prune_before(horizon).await {
2240 warn!(epoch, horizon, error = %e, "decision prune failed");
2241 }
2242 }
2243 }
2244 }
2245
2246 let next_epoch = self.allocator.peek().0;
2247 let begin_epoch_error = match self.begin_epoch_for_sinks(next_epoch).await {
2248 Ok(()) => None,
2249 Err(e) => {
2250 error!(
2251 next_epoch,
2252 error = %e,
2253 "[LDB-6015] failed to begin next epoch — writes will be non-transactional"
2254 );
2255 Some(e.to_string())
2256 }
2257 };
2258
2259 info!(
2260 checkpoint_id,
2261 epoch,
2262 duration_ms = duration.as_millis(),
2263 "checkpoint completed"
2264 );
2265
2266 self.pending_vnode_states.clear();
2270 Ok(CheckpointResult {
2271 success: true,
2272 checkpoint_id,
2273 epoch,
2274 duration,
2275 error: begin_epoch_error,
2276 })
2277 }
2278
2279 pub async fn recover(
2292 &mut self,
2293 ) -> Result<Option<crate::recovery_manager::RecoveredState>, DbError> {
2294 use crate::recovery_manager::RecoveryManager;
2295
2296 let mgr = RecoveryManager::new(&*self.store);
2297 let result = mgr.recover(&[], &self.sinks, &[]).await?;
2301
2302 if let Some(ref recovered) = result {
2303 self.allocator
2306 .advance_to(recovered.epoch() + 1, recovered.manifest.checkpoint_id + 1);
2307 let (epoch, checkpoint_id) = self.allocator.peek();
2308 info!(epoch, checkpoint_id, "coordinator epoch set after recovery");
2309 }
2310
2311 Ok(result)
2312 }
2313
2314 pub async fn load_latest_manifest(&self) -> Result<Option<CheckpointManifest>, DbError> {
2320 self.store.load_latest().await.map_err(DbError::from)
2321 }
2322}
2323
2324impl std::fmt::Debug for CheckpointCoordinator {
2325 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2326 f.debug_struct("CheckpointCoordinator")
2327 .field("allocator", &self.allocator)
2328 .field("phase", &self.phase)
2329 .field("sinks", &self.sinks.len())
2330 .field("completed", &self.checkpoints_completed)
2331 .field("failed", &self.checkpoints_failed)
2332 .finish_non_exhaustive()
2333 }
2334}
2335
/// Fixed-size ring buffer holding the most recently recorded durations.
///
/// Samples are stored in microseconds. Once `CAPACITY` samples have been
/// recorded, each new sample overwrites the oldest one, so percentiles
/// describe at most the last `CAPACITY` recordings.
#[derive(Clone)]
pub struct DurationHistogram {
    /// Recorded durations in microseconds; slots not yet written hold `0`.
    samples: Box<[u64; Self::CAPACITY]>,
    /// Slot the next `record` call writes to.
    cursor: usize,
    /// Total number of samples ever recorded (not capped at the capacity).
    count: u64,
}

impl Default for DurationHistogram {
    /// Equivalent to [`DurationHistogram::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl DurationHistogram {
    /// Number of samples retained.
    const CAPACITY: usize = 100;

    /// Creates an empty histogram.
    #[must_use]
    pub fn new() -> Self {
        Self {
            samples: Box::new([0; Self::CAPACITY]),
            cursor: 0,
            count: 0,
        }
    }

    /// Records one duration, overwriting the oldest sample once the buffer
    /// is full.
    ///
    /// Durations whose microsecond count does not fit in a `u64` are stored
    /// as `u64::MAX` instead of being silently truncated.
    pub fn record(&mut self, duration: Duration) {
        let us = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);
        self.samples[self.cursor] = us;
        self.cursor = (self.cursor + 1) % Self::CAPACITY;
        self.count += 1;
    }

    /// Returns `true` when nothing has been recorded yet.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Number of samples currently held, at most `CAPACITY`.
    #[must_use]
    pub fn len(&self) -> usize {
        // A count too large for `usize` is necessarily past the capacity.
        usize::try_from(self.count).map_or(Self::CAPACITY, |n| n.min(Self::CAPACITY))
    }

    /// Copies the held samples into a freshly sorted vector.
    fn sorted_samples(&self) -> Vec<u64> {
        // Before the buffer wraps the valid samples are the first `len()`
        // slots; afterwards every slot is valid.
        let mut sorted = self.samples[..self.len()].to_vec();
        sorted.sort_unstable();
        sorted
    }

    /// Index of the `p` percentile in a sorted slice of `n >= 1` samples:
    /// `ceil(p * (n - 1))`, clamped to the last index.
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss,
        clippy::cast_precision_loss
    )]
    fn rank_index(p: f64, n: usize) -> usize {
        // The float-to-int cast saturates, so a negative or NaN `p` selects
        // index 0 and an oversized `p` is clamped by `min`.
        ((p * (n as f64 - 1.0)).ceil() as usize).min(n - 1)
    }

    /// Returns the `p` percentile (with `p` as a fraction, e.g. `0.95`) of
    /// the held samples in microseconds, or `0` when the histogram is empty.
    #[must_use]
    pub fn percentile(&self, p: f64) -> u64 {
        let sorted = self.sorted_samples();
        if sorted.is_empty() {
            return 0;
        }
        sorted[Self::rank_index(p, sorted.len())]
    }

    /// Returns the `(p50, p95, p99)` percentiles in microseconds, sorting
    /// the samples only once. All three are `0` when the histogram is empty.
    #[must_use]
    pub fn percentiles(&self) -> (u64, u64, u64) {
        let sorted = self.sorted_samples();
        if sorted.is_empty() {
            return (0, 0, 0);
        }
        let at = |p: f64| sorted[Self::rank_index(p, sorted.len())];
        (at(0.50), at(0.95), at(0.99))
    }
}

/// Debug view showing the buffer size, write cursor, total sample count and
/// the current percentiles in microseconds. `samples_len` is the size of the
/// backing array, not the number of samples held.
impl std::fmt::Debug for DurationHistogram {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (p50, p95, p99) = self.percentiles();
        f.debug_struct("DurationHistogram")
            .field("samples_len", &self.samples.len())
            .field("cursor", &self.cursor)
            .field("count", &self.count)
            .field("p50_us", &p50)
            .field("p95_us", &p95)
            .field("p99_us", &p99)
            .finish()
    }
}
2453
/// Point-in-time snapshot of checkpoint coordinator statistics, as produced
/// by `CheckpointCoordinator::stats`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CheckpointStats {
    /// Number of checkpoints that completed successfully.
    pub completed: u64,
    /// Number of checkpoints counted as failed.
    pub failed: u64,
    /// Duration of the most recent successful checkpoint, if there was one.
    pub last_duration: Option<Duration>,
    /// Median checkpoint duration in whole milliseconds.
    pub duration_p50_ms: u64,
    /// 95th-percentile checkpoint duration in whole milliseconds.
    pub duration_p95_ms: u64,
    /// 99th-percentile checkpoint duration in whole milliseconds.
    pub duration_p99_ms: u64,
    /// Running total of the checkpoint byte sizes added on each successful
    /// checkpoint.
    pub total_bytes_written: u64,
    /// Phase the coordinator was in when the snapshot was taken.
    pub current_phase: CheckpointPhase,
    /// Epoch reported by the id allocator when the snapshot was taken.
    pub current_epoch: u64,
}
2476
2477#[must_use]
2479pub fn source_to_connector_checkpoint(cp: &SourceCheckpoint) -> ConnectorCheckpoint {
2480 ConnectorCheckpoint {
2481 offsets: cp.offsets().clone(),
2482 epoch: cp.epoch(),
2483 metadata: cp.metadata().clone(),
2484 }
2485}
2486
2487#[must_use]
2489pub fn connector_to_source_checkpoint(cp: &ConnectorCheckpoint) -> SourceCheckpoint {
2490 let mut source_cp = SourceCheckpoint::with_offsets(cp.epoch, cp.offsets.clone());
2491 for (k, v) in &cp.metadata {
2492 source_cp.set_metadata(k.clone(), v.clone());
2493 }
2494 source_cp
2495}
2496
2497#[cfg(test)]
2498mod tests {
2499 use super::*;
2500 use laminar_core::storage::checkpoint_store::FileSystemCheckpointStore;
2501
2502 async fn make_coordinator(dir: &std::path::Path) -> CheckpointCoordinator {
2503 let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
2504 CheckpointCoordinator::new(CheckpointConfig::default(), store)
2505 .await
2506 .unwrap()
2507 }
2508
2509 async fn make_coordinator_with_fast_gate(dir: &std::path::Path) -> CheckpointCoordinator {
2513 let store = Box::new(FileSystemCheckpointStore::new(dir, 3));
2514 let config = CheckpointConfig {
2515 restorable_gate_timeout: Duration::from_millis(250),
2516 ..CheckpointConfig::default()
2517 };
2518 CheckpointCoordinator::new(config, store).await.unwrap()
2519 }
2520
2521 #[tokio::test]
2522 async fn test_coordinator_new() {
2523 let dir = tempfile::tempdir().unwrap();
2524 let coord = make_coordinator(dir.path()).await;
2525
2526 assert_eq!(coord.epoch(), 1);
2527 assert_eq!(coord.next_checkpoint_id(), 1);
2528 assert_eq!(coord.phase(), CheckpointPhase::Idle);
2529 }
2530
2531 #[tokio::test]
2532 async fn test_coordinator_resumes_from_stored_checkpoint() {
2533 let dir = tempfile::tempdir().unwrap();
2534
2535 let store = FileSystemCheckpointStore::new(dir.path(), 3);
2537 let m = CheckpointManifest::new(5, 10);
2538 store.save(&m).await.unwrap();
2539
2540 let coord = make_coordinator(dir.path()).await;
2542 assert_eq!(coord.epoch(), 11);
2543 assert_eq!(coord.next_checkpoint_id(), 6);
2544 }
2545
2546 #[test]
2547 fn test_checkpoint_phase_display() {
2548 assert_eq!(CheckpointPhase::Idle.to_string(), "Idle");
2549 assert_eq!(CheckpointPhase::Snapshotting.to_string(), "Snapshotting");
2550 assert_eq!(CheckpointPhase::PreCommitting.to_string(), "PreCommitting");
2551 assert_eq!(CheckpointPhase::Persisting.to_string(), "Persisting");
2552 assert_eq!(CheckpointPhase::Committing.to_string(), "Committing");
2553 }
2554
2555 #[test]
2556 fn test_source_to_connector_checkpoint() {
2557 let mut cp = SourceCheckpoint::new(5);
2558 cp.set_offset("partition-0", "1234");
2559 cp.set_metadata("topic", "events");
2560
2561 let cc = source_to_connector_checkpoint(&cp);
2562 assert_eq!(cc.epoch, 5);
2563 assert_eq!(cc.offsets.get("partition-0"), Some(&"1234".into()));
2564 assert_eq!(cc.metadata.get("topic"), Some(&"events".into()));
2565 }
2566
2567 #[test]
2568 fn test_connector_to_source_checkpoint() {
2569 let cc = ConnectorCheckpoint {
2570 offsets: HashMap::from([("lsn".into(), "0/ABCD".into())]),
2571 epoch: 3,
2572 metadata: HashMap::from([("type".into(), "postgres".into())]),
2573 };
2574
2575 let cp = connector_to_source_checkpoint(&cc);
2576 assert_eq!(cp.epoch(), 3);
2577 assert_eq!(cp.get_offset("lsn"), Some("0/ABCD"));
2578 assert_eq!(cp.get_metadata("type"), Some("postgres"));
2579 }
2580
2581 #[tokio::test]
2582 async fn test_stats_initial() {
2583 let dir = tempfile::tempdir().unwrap();
2584 let coord = make_coordinator(dir.path()).await;
2585 let stats = coord.stats();
2586
2587 assert_eq!(stats.completed, 0);
2588 assert_eq!(stats.failed, 0);
2589 assert!(stats.last_duration.is_none());
2590 assert_eq!(stats.duration_p50_ms, 0);
2591 assert_eq!(stats.duration_p95_ms, 0);
2592 assert_eq!(stats.duration_p99_ms, 0);
2593 assert_eq!(stats.current_phase, CheckpointPhase::Idle);
2594 }
2595
2596 #[tokio::test]
2597 async fn test_checkpoint_no_sources_no_sinks() {
2598 let dir = tempfile::tempdir().unwrap();
2599 let mut coord = make_coordinator(dir.path()).await;
2600
2601 let result = coord
2602 .checkpoint(CheckpointRequest {
2603 watermark: Some(1000),
2604 ..CheckpointRequest::default()
2605 })
2606 .await
2607 .unwrap();
2608
2609 assert!(result.success);
2610 assert_eq!(result.checkpoint_id, 1);
2611 assert_eq!(result.epoch, 1);
2612
2613 let loaded = coord.store().load_latest().await.unwrap().unwrap();
2615 assert_eq!(loaded.checkpoint_id, 1);
2616 assert_eq!(loaded.epoch, 1);
2617 assert_eq!(loaded.watermark, Some(1000));
2618
2619 let result2 = coord
2621 .checkpoint(CheckpointRequest {
2622 watermark: Some(2000),
2623 ..CheckpointRequest::default()
2624 })
2625 .await
2626 .unwrap();
2627
2628 assert!(result2.success);
2629 assert_eq!(result2.checkpoint_id, 2);
2630 assert_eq!(result2.epoch, 2);
2631
2632 let stats = coord.stats();
2633 assert_eq!(stats.completed, 2);
2634 assert_eq!(stats.failed, 0);
2635 }
2636
2637 #[tokio::test]
2638 async fn test_checkpoint_with_operator_states() {
2639 let dir = tempfile::tempdir().unwrap();
2640 let mut coord = make_coordinator(dir.path()).await;
2641
2642 let mut ops = HashMap::new();
2643 ops.insert(
2644 "window-agg".into(),
2645 bytes::Bytes::from_static(b"state-data"),
2646 );
2647 ops.insert("filter".into(), bytes::Bytes::from_static(b"filter-state"));
2648
2649 let result = coord
2650 .checkpoint(CheckpointRequest {
2651 operator_states: ops,
2652 ..CheckpointRequest::default()
2653 })
2654 .await
2655 .unwrap();
2656
2657 assert!(result.success);
2658
2659 let loaded = coord.store().load_latest().await.unwrap().unwrap();
2660 assert_eq!(loaded.operator_states.len(), 2);
2661
2662 let window_op = loaded.operator_states.get("window-agg").unwrap();
2663 assert_eq!(window_op.decode_inline().unwrap(), b"state-data");
2664 }
2665
2666 #[tokio::test]
2667 async fn test_checkpoint_with_table_store_path() {
2668 let dir = tempfile::tempdir().unwrap();
2669 let mut coord = make_coordinator(dir.path()).await;
2670
2671 let result = coord
2672 .checkpoint(CheckpointRequest {
2673 table_store_checkpoint_path: Some("/tmp/table_store_cp".into()),
2674 ..CheckpointRequest::default()
2675 })
2676 .await
2677 .unwrap();
2678
2679 assert!(result.success);
2680
2681 let loaded = coord.store().load_latest().await.unwrap().unwrap();
2682 assert_eq!(
2683 loaded.table_store_checkpoint_path.as_deref(),
2684 Some("/tmp/table_store_cp")
2685 );
2686 }
2687
2688 #[tokio::test]
2689 async fn test_load_latest_manifest_empty() {
2690 let dir = tempfile::tempdir().unwrap();
2691 let coord = make_coordinator(dir.path()).await;
2692 assert!(coord.load_latest_manifest().await.unwrap().is_none());
2693 }
2694
2695 #[tokio::test]
2696 async fn test_coordinator_debug() {
2697 let dir = tempfile::tempdir().unwrap();
2698 let coord = make_coordinator(dir.path()).await;
2699 let debug = format!("{coord:?}");
2700 assert!(debug.contains("CheckpointCoordinator"));
2701 assert!(debug.contains("epoch: 1"));
2702 }
2703
2704 #[tokio::test]
2705 async fn test_checkpoint_emits_metrics_on_success() {
2706 let dir = tempfile::tempdir().unwrap();
2707 let mut coord = make_coordinator(dir.path()).await;
2708
2709 let registry = prometheus::Registry::new();
2710 let prom = Arc::new(crate::engine_metrics::EngineMetrics::new(®istry));
2711 coord.set_metrics(Arc::clone(&prom));
2712
2713 let result = coord
2714 .checkpoint(CheckpointRequest {
2715 watermark: Some(1000),
2716 ..CheckpointRequest::default()
2717 })
2718 .await
2719 .unwrap();
2720
2721 assert!(result.success);
2722 assert_eq!(prom.checkpoints_completed.get(), 1);
2723 assert_eq!(prom.checkpoints_failed.get(), 0);
2724 assert_eq!(prom.checkpoint_epoch.get(), 1);
2725
2726 let result2 = coord
2728 .checkpoint(CheckpointRequest {
2729 watermark: Some(2000),
2730 ..CheckpointRequest::default()
2731 })
2732 .await
2733 .unwrap();
2734
2735 assert!(result2.success);
2736 assert_eq!(prom.checkpoints_completed.get(), 2);
2737 assert_eq!(prom.checkpoint_epoch.get(), 2);
2738 }
2739
2740 #[tokio::test]
2741 async fn test_checkpoint_without_metrics() {
2742 let dir = tempfile::tempdir().unwrap();
2744 let mut coord = make_coordinator(dir.path()).await;
2745
2746 let result = coord
2747 .checkpoint(CheckpointRequest::default())
2748 .await
2749 .unwrap();
2750
2751 assert!(result.success);
2752 }
2754
2755 #[test]
2756 fn test_histogram_empty() {
2757 let h = DurationHistogram::new();
2758 assert_eq!(h.len(), 0);
2759 assert_eq!(h.percentile(0.50), 0);
2760 assert_eq!(h.percentile(0.99), 0);
2761 let (p50, p95, p99) = h.percentiles();
2762 assert_eq!((p50, p95, p99), (0, 0, 0));
2763 }
2764
2765 #[test]
2766 fn test_histogram_single_sample() {
2767 let mut h = DurationHistogram::new();
2768 h.record(Duration::from_millis(42));
2769 assert_eq!(h.len(), 1);
2770 assert_eq!(h.percentile(0.50), 42_000);
2772 assert_eq!(h.percentile(0.99), 42_000);
2773 }
2774
2775 #[test]
2776 fn test_histogram_sub_millisecond() {
2777 let mut h = DurationHistogram::new();
2778 h.record(Duration::from_micros(500));
2780 assert_eq!(h.percentile(0.50), 500);
2781 assert_eq!(h.percentile(0.99), 500);
2782 }
2783
2784 #[test]
2785 fn test_histogram_percentiles() {
2786 let mut h = DurationHistogram::new();
2787 for i in 1..=100 {
2789 h.record(Duration::from_millis(i));
2790 }
2791 assert_eq!(h.len(), 100);
2792
2793 let p50 = h.percentile(0.50);
2794 let p95 = h.percentile(0.95);
2795 let p99 = h.percentile(0.99);
2796
2797 assert!((49_000..=51_000).contains(&p50), "p50={p50}");
2800 assert!((94_000..=96_000).contains(&p95), "p95={p95}");
2801 assert!((98_000..=100_000).contains(&p99), "p99={p99}");
2802 }
2803
2804 #[test]
2805 fn test_histogram_wraps_ring_buffer() {
2806 let mut h = DurationHistogram::new();
2807 for i in 1..=150 {
2809 h.record(Duration::from_millis(i));
2810 }
2811 assert_eq!(h.len(), 100);
2812 assert_eq!(h.count, 150);
2813
2814 let p50 = h.percentile(0.50);
2816 assert!((99_000..=101_000).contains(&p50), "p50={p50}");
2817 }
2818
2819 #[tokio::test]
2820 async fn test_sidecar_round_trip() {
2821 let dir = tempfile::tempdir().unwrap();
2822 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2823 let config = CheckpointConfig {
2824 state_inline_threshold: 100, ..CheckpointConfig::default()
2826 };
2827 let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
2828
2829 let mut ops = HashMap::new();
2831 ops.insert("small".into(), bytes::Bytes::from(vec![0xAAu8; 50]));
2832 ops.insert("large".into(), bytes::Bytes::from(vec![0xBBu8; 200]));
2833
2834 let result = coord
2835 .checkpoint(CheckpointRequest {
2836 operator_states: ops,
2837 ..CheckpointRequest::default()
2838 })
2839 .await
2840 .unwrap();
2841 assert!(result.success);
2842
2843 let loaded = coord.store().load_latest().await.unwrap().unwrap();
2845 let small_op = loaded.operator_states.get("small").unwrap();
2846 assert!(!small_op.external, "small state should be inline");
2847 assert_eq!(small_op.decode_inline().unwrap(), vec![0xAAu8; 50]);
2848
2849 let large_op = loaded.operator_states.get("large").unwrap();
2850 assert!(large_op.external, "large state should be external");
2851 assert_eq!(large_op.external_length, 200);
2852
2853 let state_data = coord.store().load_state_data(1).await.unwrap().unwrap();
2855 assert_eq!(state_data.len(), 200);
2856 assert!(state_data.iter().all(|&b| b == 0xBB));
2857 }
2858
2859 #[tokio::test]
2860 async fn test_all_inline_no_sidecar() {
2861 let dir = tempfile::tempdir().unwrap();
2862 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2863 let config = CheckpointConfig::default(); let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
2865
2866 let mut ops = HashMap::new();
2867 ops.insert("op1".into(), bytes::Bytes::from_static(b"small-state"));
2868
2869 let result = coord
2870 .checkpoint(CheckpointRequest {
2871 operator_states: ops,
2872 ..CheckpointRequest::default()
2873 })
2874 .await
2875 .unwrap();
2876 assert!(result.success);
2877
2878 assert!(coord.store().load_state_data(1).await.unwrap().is_none());
2880 }
2881
2882 #[tokio::test]
2885 async fn durability_gate_skipped_when_vnode_set_empty() {
2886 let dir = tempfile::tempdir().unwrap();
2890 let mut coord = make_coordinator(dir.path()).await;
2891 let result = coord
2892 .checkpoint(CheckpointRequest::default())
2893 .await
2894 .unwrap();
2895 assert!(result.success, "baseline checkpoint must succeed");
2896 }
2897
2898 #[tokio::test]
2899 async fn bridge_writes_markers_and_gate_passes() {
2900 use laminar_core::state::InProcessBackend;
2901 let dir = tempfile::tempdir().unwrap();
2902 let mut coord = make_coordinator(dir.path()).await;
2903 let backend = Arc::new(InProcessBackend::new(4));
2904 coord.set_state_backend(backend.clone());
2905 coord.set_vnode_set(vec![0, 1, 2, 3]);
2906
2907 let result = coord
2908 .checkpoint(CheckpointRequest::default())
2909 .await
2910 .unwrap();
2911 assert!(result.success, "bridge writes markers → gate passes");
2912 for v in 0..4 {
2914 assert!(
2915 backend.read_partial(v, 1).await.unwrap().is_some(),
2916 "bridge should have written marker for vnode {v}",
2917 );
2918 }
2919 }
2920
2921 #[cfg(feature = "cluster")]
2922 #[tokio::test]
2923 async fn reconcile_announces_abort_when_no_decision_store() {
2924 use laminar_core::cluster::control::{
2928 BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase, ANNOUNCEMENT_KEY,
2929 };
2930 use laminar_core::cluster::discovery::NodeId;
2931 use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
2932 use tokio::sync::watch;
2933
2934 let dir = tempfile::tempdir().unwrap();
2935 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
2936 let mut orphan = CheckpointManifest::new(42, 7);
2937 orphan
2938 .sink_commit_statuses
2939 .insert("kafka_out".into(), SinkCommitStatus::Pending);
2940 store.save_with_state(&orphan, None).await.unwrap();
2941
2942 let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
2943 .await
2944 .unwrap();
2945 let self_id = NodeId(1);
2946 let kv = Arc::new(InMemoryKv::new(self_id));
2947 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2948 let (_tx, rx) = watch::channel(Vec::new());
2949 let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2950 let mut coord = coord;
2951 coord.set_cluster_controller(controller);
2952
2953 coord.reconcile_prepared_on_init().await;
2954
2955 let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
2956 let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
2957 assert_eq!(ann.phase, Phase::Abort);
2958 assert_eq!(ann.epoch, 7);
2959 assert_eq!(ann.checkpoint_id, 42);
2960 }
2961
2962 #[cfg(feature = "cluster")]
2963 #[tokio::test]
2964 async fn reconcile_announces_commit_when_marker_present() {
2965 use laminar_core::cluster::control::{
2966 BarrierAnnouncement, CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
2967 Phase, ANNOUNCEMENT_KEY,
2968 };
2969 use laminar_core::cluster::discovery::NodeId;
2970 use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
2971 use object_store::local::LocalFileSystem;
2972 use tokio::sync::watch;
2973
2974 let ckpt_dir = tempfile::tempdir().unwrap();
2975 let decision_dir = tempfile::tempdir().unwrap();
2976 let store = Box::new(FileSystemCheckpointStore::new(ckpt_dir.path(), 3));
2977 let mut orphan = CheckpointManifest::new(42, 7);
2978 orphan
2979 .sink_commit_statuses
2980 .insert("kafka_out".into(), SinkCommitStatus::Pending);
2981 store.save_with_state(&orphan, None).await.unwrap();
2982
2983 let decision_os: Arc<dyn object_store::ObjectStore> =
2984 Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
2985 let decision_store = Arc::new(CheckpointDecisionStore::new(decision_os));
2986 decision_store.record_committed(7).await.unwrap();
2987
2988 let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
2989 .await
2990 .unwrap();
2991 let self_id = NodeId(1);
2992 let kv = Arc::new(InMemoryKv::new(self_id));
2993 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
2994 let (_tx, rx) = watch::channel(Vec::new());
2995 let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
2996 let mut coord = coord;
2997 coord.set_cluster_controller(controller);
2998 coord.set_decision_store(decision_store);
2999
3000 coord.reconcile_prepared_on_init().await;
3001
3002 let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
3003 let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
3004 assert_eq!(ann.phase, Phase::Commit);
3005 assert_eq!(ann.epoch, 7);
3006 assert_eq!(ann.checkpoint_id, 42);
3007 }
3008
3009 #[cfg(feature = "cluster")]
3010 #[tokio::test]
3011 async fn reconcile_announces_abort_when_marker_missing() {
3012 use laminar_core::cluster::control::{
3015 BarrierAnnouncement, CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
3016 Phase, ANNOUNCEMENT_KEY,
3017 };
3018 use laminar_core::cluster::discovery::NodeId;
3019 use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
3020 use object_store::local::LocalFileSystem;
3021 use tokio::sync::watch;
3022
3023 let ckpt_dir = tempfile::tempdir().unwrap();
3024 let decision_dir = tempfile::tempdir().unwrap();
3025 let store = Box::new(FileSystemCheckpointStore::new(ckpt_dir.path(), 3));
3026 let mut orphan = CheckpointManifest::new(11, 3);
3027 orphan
3028 .sink_commit_statuses
3029 .insert("out".into(), SinkCommitStatus::Pending);
3030 store.save_with_state(&orphan, None).await.unwrap();
3031
3032 let decision_os: Arc<dyn object_store::ObjectStore> =
3033 Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
3034 let decision_store = Arc::new(CheckpointDecisionStore::new(decision_os));
3035
3036 let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
3037 .await
3038 .unwrap();
3039 let self_id = NodeId(1);
3040 let kv = Arc::new(InMemoryKv::new(self_id));
3041 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3042 let (_tx, rx) = watch::channel(Vec::new());
3043 let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
3044 let mut coord = coord;
3045 coord.set_cluster_controller(controller);
3046 coord.set_decision_store(decision_store);
3047
3048 coord.reconcile_prepared_on_init().await;
3049
3050 let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
3051 let ann: BarrierAnnouncement = serde_json::from_str(&raw).unwrap();
3052 assert_eq!(ann.phase, Phase::Abort);
3053 assert_eq!(ann.epoch, 3);
3054 }
3055
3056 #[cfg(feature = "cluster")]
3057 #[tokio::test]
3058 async fn reconcile_silent_when_manifest_clean() {
3059 use laminar_core::cluster::control::{
3060 ClusterController, ClusterKv, InMemoryKv, ANNOUNCEMENT_KEY,
3061 };
3062 use laminar_core::cluster::discovery::NodeId;
3063 use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
3064 use tokio::sync::watch;
3065
3066 let dir = tempfile::tempdir().unwrap();
3067 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
3068 let mut clean = CheckpointManifest::new(5, 3);
3069 clean
3070 .sink_commit_statuses
3071 .insert("out".into(), SinkCommitStatus::Committed);
3072 store.save_with_state(&clean, None).await.unwrap();
3073
3074 let coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
3075 .await
3076 .unwrap();
3077 let self_id = NodeId(1);
3078 let kv = Arc::new(InMemoryKv::new(self_id));
3079 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3080 let (_tx, rx) = watch::channel(Vec::new());
3081 let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
3082 let mut coord = coord;
3083 coord.set_cluster_controller(controller);
3084
3085 coord.reconcile_prepared_on_init().await;
3086
3087 assert!(kv.read_from(self_id, ANNOUNCEMENT_KEY).await.is_none());
3089 }
3090
3091 #[cfg(feature = "cluster")]
3092 #[tokio::test]
3093 async fn follower_checkpoint_commits_on_leader_commit() {
3094 use laminar_core::cluster::control::{
3095 BarrierAck, BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase,
3096 ACK_KEY, ANNOUNCEMENT_KEY,
3097 };
3098 use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
3099 use tokio::sync::watch;
3100
3101 let dir = tempfile::tempdir().unwrap();
3102 let mut coord = make_coordinator(dir.path()).await;
3103
3104 let leader_id = NodeId(1);
3105 let follower_id = NodeId(7);
3106
3107 let kv = Arc::new(InMemoryKv::new(follower_id));
3111 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3112 let leader_info = NodeInfo {
3113 id: leader_id,
3114 name: "leader".into(),
3115 rpc_address: String::new(),
3116 raft_address: String::new(),
3117 state: NodeState::Active,
3118 metadata: NodeMetadata::default(),
3119 last_heartbeat_ms: 0,
3120 };
3121 let (_tx, rx) = watch::channel(vec![leader_info]);
3122 let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
3123 coord.set_cluster_controller(controller);
3124
3125 let prepare_json = serde_json::to_string(&BarrierAnnouncement {
3128 epoch: 1,
3129 checkpoint_id: 1,
3130 phase: Phase::Prepare,
3131 flags: 0,
3132 min_watermark_ms: None,
3133 })
3134 .unwrap();
3135 let commit_json = serde_json::to_string(&BarrierAnnouncement {
3136 epoch: 1,
3137 checkpoint_id: 1,
3138 phase: Phase::Commit,
3139 flags: 0,
3140 min_watermark_ms: None,
3141 })
3142 .unwrap();
3143 kv.seed(leader_id, ANNOUNCEMENT_KEY, prepare_json);
3147 kv.seed(leader_id, ANNOUNCEMENT_KEY, commit_json);
3148
3149 let ann = BarrierAnnouncement {
3150 epoch: 1,
3151 checkpoint_id: 1,
3152 phase: Phase::Prepare,
3153 flags: 0,
3154 min_watermark_ms: None,
3155 };
3156 let committed = coord
3157 .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(2))
3158 .await
3159 .unwrap();
3160 assert!(committed, "follower should commit on leader's Commit");
3161
3162 let ack_raw = kv.read_from(follower_id, ACK_KEY).await.unwrap();
3164 let ack: BarrierAck = serde_json::from_str(&ack_raw).unwrap();
3165 assert_eq!(ack.epoch, 1);
3166 assert!(ack.ok, "prepare succeeded, ack should be ok");
3167
3168 let stored = coord.store().load_latest().await.unwrap().unwrap();
3170 assert_eq!(stored.epoch, 1);
3171 }
3172
3173 #[cfg(feature = "cluster")]
3174 #[tokio::test]
3175 async fn follower_checkpoint_rolls_back_on_leader_abort() {
3176 use laminar_core::cluster::control::{
3177 BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase, ANNOUNCEMENT_KEY,
3178 };
3179 use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
3180 use tokio::sync::watch;
3181
3182 let dir = tempfile::tempdir().unwrap();
3183 let mut coord = make_coordinator(dir.path()).await;
3184
3185 let leader_id = NodeId(1);
3186 let follower_id = NodeId(9);
3187 let kv = Arc::new(InMemoryKv::new(follower_id));
3188 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3189 let leader_info = NodeInfo {
3190 id: leader_id,
3191 name: "leader".into(),
3192 rpc_address: String::new(),
3193 raft_address: String::new(),
3194 state: NodeState::Active,
3195 metadata: NodeMetadata::default(),
3196 last_heartbeat_ms: 0,
3197 };
3198 let (_tx, rx) = watch::channel(vec![leader_info]);
3199 let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
3200 coord.set_cluster_controller(controller);
3201
3202 let abort_json = serde_json::to_string(&BarrierAnnouncement {
3203 epoch: 1,
3204 checkpoint_id: 1,
3205 phase: Phase::Abort,
3206 flags: 0,
3207 min_watermark_ms: None,
3208 })
3209 .unwrap();
3210 kv.seed(leader_id, ANNOUNCEMENT_KEY, abort_json);
3211
3212 let ann = BarrierAnnouncement {
3213 epoch: 1,
3214 checkpoint_id: 1,
3215 phase: Phase::Prepare,
3216 flags: 0,
3217 min_watermark_ms: None,
3218 };
3219 let committed = coord
3220 .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(2))
3221 .await
3222 .unwrap();
3223 assert!(!committed, "follower should roll back on leader's Abort");
3224 }
3225
3226 #[cfg(feature = "cluster")]
3229 struct RecordingKv {
3230 inner: laminar_core::cluster::control::InMemoryKv,
3231 announcements: Arc<parking_lot::Mutex<Vec<String>>>,
3232 }
3233
3234 #[cfg(feature = "cluster")]
3235 #[async_trait::async_trait]
3236 impl laminar_core::cluster::control::ClusterKv for RecordingKv {
3237 async fn write(&self, key: &str, value: String) {
3238 if key == laminar_core::cluster::control::ANNOUNCEMENT_KEY {
3239 self.announcements.lock().push(value.clone());
3240 }
3241 self.inner.write(key, value).await;
3242 }
3243 async fn read_from(
3244 &self,
3245 who: laminar_core::cluster::discovery::NodeId,
3246 key: &str,
3247 ) -> Option<String> {
3248 self.inner.read_from(who, key).await
3249 }
3250 async fn scan(&self, key: &str) -> Vec<(laminar_core::cluster::discovery::NodeId, String)> {
3251 self.inner.scan(key).await
3252 }
3253 async fn scan_prefix(
3254 &self,
3255 prefix: &str,
3256 ) -> Vec<(laminar_core::cluster::discovery::NodeId, String, String)> {
3257 self.inner.scan_prefix(prefix).await
3258 }
3259 }
3260
3261 #[cfg(feature = "cluster")]
3265 #[tokio::test]
3266 async fn leader_announces_aligned_between_prepare_and_commit() {
3267 use laminar_core::cluster::control::{
3268 BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase,
3269 };
3270 use laminar_core::cluster::discovery::NodeId;
3271 use tokio::sync::watch;
3272
3273 let dir = tempfile::tempdir().unwrap();
3274 let mut coord = make_coordinator(dir.path()).await;
3275
3276 let self_id = NodeId(1);
3277 let announcements = Arc::new(parking_lot::Mutex::new(Vec::new()));
3278 let kv: Arc<dyn ClusterKv> = Arc::new(RecordingKv {
3279 inner: InMemoryKv::new(self_id),
3280 announcements: Arc::clone(&announcements),
3281 });
3282 let (_tx, rx) = watch::channel(Vec::new());
3283 let controller = Arc::new(ClusterController::new(self_id, kv, None, rx));
3284 coord.set_cluster_controller(controller);
3285
3286 let result = coord
3287 .checkpoint(CheckpointRequest::default())
3288 .await
3289 .unwrap();
3290 assert!(result.success);
3291
3292 let phases: Vec<Phase> = announcements
3293 .lock()
3294 .iter()
3295 .map(|json| {
3296 serde_json::from_str::<BarrierAnnouncement>(json)
3297 .unwrap()
3298 .phase
3299 })
3300 .collect();
3301 assert_eq!(
3302 phases,
3303 vec![Phase::Prepare, Phase::Aligned, Phase::Commit],
3304 "two-level completion must announce Aligned between Prepare and Commit",
3305 );
3306 }
3307
3308 #[cfg(feature = "cluster")]
3313 #[tokio::test]
3314 async fn follower_prepare_failure_overwrites_capture_ack() {
3315 use laminar_core::cluster::control::{
3316 BarrierAck, BarrierAnnouncement, ClusterController, ClusterKv, InMemoryKv, Phase,
3317 ACK_KEY,
3318 };
3319 use laminar_core::cluster::discovery::{NodeId, NodeInfo, NodeMetadata, NodeState};
3320 use laminar_core::state::InProcessBackend;
3321 use tokio::sync::watch;
3322
3323 let dir = tempfile::tempdir().unwrap();
3324 let mut coord = make_coordinator(dir.path()).await;
3325
3326 let leader_id = NodeId(1);
3327 let follower_id = NodeId(7);
3328 let kv = Arc::new(InMemoryKv::new(follower_id));
3329 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3330 let leader_info = NodeInfo {
3331 id: leader_id,
3332 name: "leader".into(),
3333 rpc_address: String::new(),
3334 raft_address: String::new(),
3335 state: NodeState::Active,
3336 metadata: NodeMetadata::default(),
3337 last_heartbeat_ms: 0,
3338 };
3339 let (_tx, rx) = watch::channel(vec![leader_info]);
3340 let controller = Arc::new(ClusterController::new(follower_id, kv_trait, None, rx));
3341 coord.set_cluster_controller(controller);
3342 coord.set_state_backend(Arc::new(InProcessBackend::new(2)));
3345 coord.set_vnode_set(vec![99]);
3346
3347 let ann = BarrierAnnouncement {
3348 epoch: 1,
3349 checkpoint_id: 1,
3350 phase: Phase::Prepare,
3351 flags: 0,
3352 min_watermark_ms: None,
3353 };
3354 let result = coord
3355 .follower_checkpoint(CheckpointRequest::default(), ann, Duration::from_secs(1))
3356 .await;
3357 assert!(result.is_err(), "prepare failure must surface as an error");
3358
3359 let ack_raw = kv.read_from(follower_id, ACK_KEY).await.unwrap();
3360 let ack: BarrierAck = serde_json::from_str(&ack_raw).unwrap();
3361 assert_eq!(ack.epoch, 1);
3362 assert!(!ack.ok, "the failure ack must overwrite the capture ack");
3363 assert!(
3364 ack.error.unwrap().contains("vnode partial write failed"),
3365 "failure ack should carry the prepare error",
3366 );
3367 }
3368
3369 #[cfg(feature = "cluster")]
3370 #[tokio::test]
3371 async fn leader_publishes_cluster_min_watermark_to_controller() {
3372 use laminar_core::cluster::control::{
3380 CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv,
3381 };
3382 use laminar_core::cluster::discovery::NodeId;
3383 use object_store::local::LocalFileSystem;
3384 use tokio::sync::watch;
3385
3386 let dir = tempfile::tempdir().unwrap();
3387 let decision_dir = tempfile::tempdir().unwrap();
3388 let mut coord = make_coordinator(dir.path()).await;
3389
3390 let decision_os: Arc<dyn object_store::ObjectStore> =
3391 Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
3392 coord.set_decision_store(Arc::new(CheckpointDecisionStore::new(decision_os)));
3393
3394 let self_id = NodeId(1);
3395 let kv = Arc::new(InMemoryKv::new(self_id));
3396 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3397 let (_tx, rx) = watch::channel(Vec::new());
3398 let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
3399 coord.set_cluster_controller(Arc::clone(&controller));
3400
3401 assert_eq!(controller.cluster_min_watermark(), None);
3403
3404 coord.set_local_watermark_ms(Some(12_345));
3408 let result = coord
3409 .checkpoint(CheckpointRequest::default())
3410 .await
3411 .unwrap();
3412 assert!(result.success, "solo-cluster checkpoint should succeed");
3413
3414 assert_eq!(
3415 controller.cluster_min_watermark(),
3416 Some(12_345),
3417 "leader must mirror the cluster-wide min into its controller",
3418 );
3419
3420 coord.set_local_watermark_ms(Some(42));
3424 let result = coord
3425 .checkpoint(CheckpointRequest::default())
3426 .await
3427 .unwrap();
3428 assert!(result.success);
3429 assert_eq!(
3430 controller.cluster_min_watermark(),
3431 Some(12_345),
3432 "stale local watermark must not lower the published cluster min",
3433 );
3434 }
3435
3436 #[cfg(feature = "cluster")]
3437 #[tokio::test]
3438 async fn leader_announces_prepare_and_commit_on_solo_cluster() {
3439 use laminar_core::cluster::control::{
3440 CheckpointDecisionStore, ClusterController, ClusterKv, InMemoryKv, Phase,
3441 ANNOUNCEMENT_KEY,
3442 };
3443 use laminar_core::cluster::discovery::NodeId;
3444 use object_store::local::LocalFileSystem;
3445 use tokio::sync::watch;
3446
3447 let dir = tempfile::tempdir().unwrap();
3448 let decision_dir = tempfile::tempdir().unwrap();
3449 let mut coord = make_coordinator(dir.path()).await;
3450
3451 let decision_os: Arc<dyn object_store::ObjectStore> =
3452 Arc::new(LocalFileSystem::new_with_prefix(decision_dir.path()).unwrap());
3453 coord.set_decision_store(Arc::new(CheckpointDecisionStore::new(decision_os)));
3454
3455 let self_id = NodeId(1);
3456 let kv = Arc::new(InMemoryKv::new(self_id));
3457 let kv_trait: Arc<dyn ClusterKv> = kv.clone();
3458 let (_tx, rx) = watch::channel(Vec::new()); let controller = Arc::new(ClusterController::new(self_id, kv_trait, None, rx));
3460 coord.set_cluster_controller(controller);
3461
3462 let result = coord
3463 .checkpoint(CheckpointRequest::default())
3464 .await
3465 .unwrap();
3466 assert!(result.success, "solo-cluster checkpoint should succeed");
3467
3468 let raw = kv.read_from(self_id, ANNOUNCEMENT_KEY).await.unwrap();
3471 let ann: laminar_core::cluster::control::BarrierAnnouncement =
3472 serde_json::from_str(&raw).unwrap();
3473 assert_eq!(ann.phase, Phase::Commit);
3474 assert_eq!(ann.epoch, result.epoch);
3475 }
3476
3477 #[tokio::test]
3478 async fn gate_checks_full_registry_not_just_owned() {
3479 use laminar_core::state::InProcessBackend;
3484 let dir = tempfile::tempdir().unwrap();
3485 let mut coord = make_coordinator_with_fast_gate(dir.path()).await;
3486 let backend = Arc::new(InProcessBackend::new(4));
3487 coord.set_state_backend(backend.clone());
3488 coord.set_vnode_set(vec![0, 1]); coord.set_gate_vnode_set(vec![0, 1, 2, 3]); let result = coord
3492 .checkpoint(CheckpointRequest::default())
3493 .await
3494 .unwrap();
3495 assert!(
3496 !result.success,
3497 "gate must fail when follower markers are missing",
3498 );
3499 let err = result.error.expect("failure produces an error message");
3500 assert!(
3501 err.contains("not all vnodes persisted"),
3502 "expected full-registry gate miss, got: {err}",
3503 );
3504 }
3505
3506 #[tokio::test]
3510 async fn restorable_gate_waits_for_async_follower_uploads() {
3511 use bytes::Bytes;
3512 use laminar_core::state::InProcessBackend;
3513 let dir = tempfile::tempdir().unwrap();
3514 let mut coord = make_coordinator(dir.path()).await;
3515 let backend = Arc::new(InProcessBackend::new(4));
3516 backend
3520 .write_partial(0, 1, 0, Bytes::from_static(b"leader"))
3521 .await
3522 .unwrap();
3523 backend
3524 .write_partial(1, 1, 0, Bytes::from_static(b"leader"))
3525 .await
3526 .unwrap();
3527 let late = Arc::clone(&backend);
3528 tokio::spawn(async move {
3529 tokio::time::sleep(Duration::from_millis(300)).await;
3530 for v in [2u32, 3] {
3531 late.write_partial(v, 1, 0, Bytes::from_static(b"follower"))
3532 .await
3533 .unwrap();
3534 }
3535 });
3536 coord.set_state_backend(backend);
3537 coord.set_vnode_set(vec![0, 1]);
3538 coord.set_gate_vnode_set(vec![0, 1, 2, 3]);
3539
3540 let start = std::time::Instant::now();
3541 coord
3542 .await_restorable_gate(1, &[])
3543 .await
3544 .expect("gate must seal once the late partials land");
3545 assert!(
3546 start.elapsed() >= Duration::from_millis(250),
3547 "gate returned before the late partials could have landed",
3548 );
3549 }
3550
3551 #[tokio::test]
3552 async fn gate_passes_when_all_registry_markers_present() {
3553 use bytes::Bytes;
3557 use laminar_core::state::InProcessBackend;
3558 let dir = tempfile::tempdir().unwrap();
3559 let mut coord = make_coordinator(dir.path()).await;
3560 let backend = Arc::new(InProcessBackend::new(4));
3561 backend
3564 .write_partial(2, 1, 0, Bytes::from_static(b"follower"))
3565 .await
3566 .unwrap();
3567 backend
3568 .write_partial(3, 1, 0, Bytes::from_static(b"follower"))
3569 .await
3570 .unwrap();
3571 coord.set_state_backend(backend);
3572 coord.set_vnode_set(vec![0, 1]);
3573 coord.set_gate_vnode_set(vec![0, 1, 2, 3]);
3574
3575 let result = coord
3576 .checkpoint(CheckpointRequest::default())
3577 .await
3578 .unwrap();
3579 assert!(result.success, "gate should pass: every vnode has a marker");
3580 }
3581
3582 #[tokio::test]
3583 async fn marker_write_failure_aborts_checkpoint() {
3584 use laminar_core::state::InProcessBackend;
3585 let dir = tempfile::tempdir().unwrap();
3586 let mut coord = make_coordinator(dir.path()).await;
3587 coord.set_state_backend(Arc::new(InProcessBackend::new(2)));
3590 coord.set_vnode_set(vec![0, 99]);
3591
3592 let result = coord
3593 .checkpoint(CheckpointRequest::default())
3594 .await
3595 .unwrap();
3596 assert!(
3597 !result.success,
3598 "out-of-range vnode must fail the checkpoint"
3599 );
3600 let err = result.error.expect("failure produces an error message");
3601 assert!(err.contains("vnode partial write failed"), "got: {err}");
3602 }
3603
3604 #[tokio::test]
3609 async fn unchanged_vnode_state_becomes_reference_partial() {
3610 use laminar_core::state::InProcessBackend;
3611
3612 let dir = tempfile::tempdir().unwrap();
3613 let config = CheckpointConfig {
3614 max_retained: 2, ..CheckpointConfig::default()
3616 };
3617 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
3618 let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
3619 let backend = Arc::new(InProcessBackend::new(2));
3620 coord.set_state_backend(Arc::clone(&backend) as Arc<dyn StateBackend>);
3621 coord.set_vnode_set(vec![0]);
3622
3623 let slices = || {
3624 let mut ops = std::collections::HashMap::new();
3625 ops.insert("agg".to_string(), bytes::Bytes::from_static(b"state-v1"));
3626 std::collections::HashMap::from([(0u32, ops)])
3627 };
3628
3629 coord.set_pending_vnode_states(slices());
3631 let r1 = coord
3632 .checkpoint(CheckpointRequest::default())
3633 .await
3634 .unwrap();
3635 assert!(r1.success);
3636 let p1 = crate::vnode_partial::VnodePartial::decode(
3637 &backend.read_partial(0, r1.epoch).await.unwrap().unwrap(),
3638 )
3639 .unwrap();
3640 assert_eq!(p1.base_epoch, None, "first upload must be full");
3641 assert!(!p1.operators.is_empty());
3642
3643 coord.set_pending_vnode_states(slices());
3645 let r2 = coord
3646 .checkpoint(CheckpointRequest::default())
3647 .await
3648 .unwrap();
3649 assert!(r2.success);
3650 let p2 = crate::vnode_partial::VnodePartial::decode(
3651 &backend.read_partial(0, r2.epoch).await.unwrap().unwrap(),
3652 )
3653 .unwrap();
3654 assert_eq!(
3655 p2.base_epoch,
3656 Some(r1.epoch),
3657 "unchanged slice must reference its base"
3658 );
3659 assert!(p2.operators.is_empty());
3660
3661 coord.set_pending_vnode_states(slices());
3664 let r3 = coord
3665 .checkpoint(CheckpointRequest::default())
3666 .await
3667 .unwrap();
3668 assert!(r3.success);
3669 let p3 = crate::vnode_partial::VnodePartial::decode(
3670 &backend.read_partial(0, r3.epoch).await.unwrap().unwrap(),
3671 )
3672 .unwrap();
3673 assert_eq!(
3674 p3.base_epoch, None,
3675 "reference age cap must force a full re-upload",
3676 );
3677
3678 let mut changed = std::collections::HashMap::new();
3680 let mut ops = std::collections::HashMap::new();
3681 ops.insert("agg".to_string(), bytes::Bytes::from_static(b"state-v2"));
3682 changed.insert(0u32, ops);
3683 coord.set_pending_vnode_states(changed);
3684 let r4 = coord
3685 .checkpoint(CheckpointRequest::default())
3686 .await
3687 .unwrap();
3688 assert!(r4.success);
3689 let p4 = crate::vnode_partial::VnodePartial::decode(
3690 &backend.read_partial(0, r4.epoch).await.unwrap().unwrap(),
3691 )
3692 .unwrap();
3693 assert_eq!(p4.base_epoch, None);
3694 assert!(!p4.operators.is_empty());
3695 }
3696
3697 struct FaultBackend {
3700 inner: laminar_core::state::InProcessBackend,
3701 fail: parking_lot::Mutex<std::collections::HashSet<(u64, u32)>>,
3702 write_delay: Duration,
3703 }
3704
3705 #[async_trait::async_trait]
3706 impl StateBackend for FaultBackend {
3707 async fn write_partial(
3708 &self,
3709 vnode: u32,
3710 epoch: u64,
3711 assignment_version: u64,
3712 bytes: bytes::Bytes,
3713 ) -> Result<(), laminar_core::state::StateBackendError> {
3714 tokio::time::sleep(self.write_delay).await;
3715 if self.fail.lock().contains(&(epoch, vnode)) {
3716 return Err(laminar_core::state::StateBackendError::Io(
3717 "injected write failure".into(),
3718 ));
3719 }
3720 self.inner
3721 .write_partial(vnode, epoch, assignment_version, bytes)
3722 .await
3723 }
3724
3725 async fn read_partial(
3726 &self,
3727 vnode: u32,
3728 epoch: u64,
3729 ) -> Result<Option<bytes::Bytes>, laminar_core::state::StateBackendError> {
3730 self.inner.read_partial(vnode, epoch).await
3731 }
3732
3733 async fn epoch_complete(
3734 &self,
3735 epoch: u64,
3736 vnodes: &[u32],
3737 ) -> Result<bool, laminar_core::state::StateBackendError> {
3738 self.inner.epoch_complete(epoch, vnodes).await
3739 }
3740
3741 async fn prune_before(
3742 &self,
3743 before: u64,
3744 ) -> Result<(), laminar_core::state::StateBackendError> {
3745 self.inner.prune_before(before).await
3746 }
3747
3748 async fn latest_committed_epoch(
3749 &self,
3750 ) -> Result<Option<u64>, laminar_core::state::StateBackendError> {
3751 self.inner.latest_committed_epoch().await
3752 }
3753
3754 fn set_authoritative_version(&self, version: u64) {
3755 self.inner.set_authoritative_version(version);
3756 }
3757
3758 fn authoritative_version(&self) -> u64 {
3759 self.inner.authoritative_version()
3760 }
3761 }
3762
3763 #[tokio::test]
3775 #[allow(clippy::too_many_lines)] async fn overlapping_epoch_failure_is_isolated() {
3777 let dir = tempfile::tempdir().unwrap();
3778 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
3779 let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), store)
3780 .await
3781 .unwrap();
3782 let backend = Arc::new(FaultBackend {
3783 inner: laminar_core::state::InProcessBackend::new(2),
3784 fail: parking_lot::Mutex::new(std::collections::HashSet::new()),
3785 write_delay: Duration::from_millis(100),
3786 });
3787 coord.set_state_backend(Arc::clone(&backend) as Arc<dyn StateBackend>);
3788 coord.set_vnode_set(vec![0, 1]);
3789
3790 let allocator = coord.epoch_allocator();
3791 let coordinator = Arc::new(tokio::sync::Mutex::new(Some(coord)));
3792 let (done_tx, mut done_rx) = tokio::sync::mpsc::unbounded_channel::<CheckpointResult>();
3793
3794 let admit = |tag: &'static [u8]| {
3798 let (epoch, checkpoint_id) = allocator.allocate();
3799 let coordinator = Arc::clone(&coordinator);
3800 let done = done_tx.clone();
3801 let states = std::collections::HashMap::from([
3802 (
3803 0u32,
3804 std::collections::HashMap::from([(
3805 "agg".to_string(),
3806 bytes::Bytes::from_static(tag),
3807 )]),
3808 ),
3809 (
3810 1u32,
3811 std::collections::HashMap::from([(
3812 "agg".to_string(),
3813 bytes::Bytes::from_static(tag),
3814 )]),
3815 ),
3816 ]);
3817 tokio::spawn(async move {
3818 let mut guard = coordinator.lock().await;
3819 let coord = guard.as_mut().unwrap();
3820 coord.set_pending_vnode_states(states);
3821 let result = coord
3822 .checkpoint_preallocated(
3823 CheckpointRequest::default(),
3824 epoch,
3825 checkpoint_id,
3826 QuorumStage::RunInline,
3827 )
3828 .await
3829 .unwrap();
3830 done.send(result).unwrap();
3831 });
3832 epoch
3833 };
3834
3835 let a = admit(b"v1");
3839 tokio::time::sleep(Duration::from_millis(10)).await;
3840 let b = admit(b"v1"); tokio::time::sleep(Duration::from_millis(10)).await;
3842 let (c_epoch, _) = allocator.peek();
3843 backend.fail.lock().insert((c_epoch, 1)); let c = admit(b"v2"); tokio::time::sleep(Duration::from_millis(10)).await;
3846 let d = admit(b"v2"); let mut results = Vec::new();
3849 for _ in 0..4 {
3850 results.push(done_rx.recv().await.unwrap());
3851 }
3852
3853 assert_eq!(
3854 results.iter().map(|r| r.epoch).collect::<Vec<_>>(),
3855 vec![a, b, c, d],
3856 "tails must complete in admission order",
3857 );
3858 assert_eq!(
3859 results.iter().map(|r| r.success).collect::<Vec<_>>(),
3860 vec![true, true, false, true],
3861 "the failed epoch must not disturb its successors",
3862 );
3863 assert!(results[2]
3864 .error
3865 .as_deref()
3866 .is_some_and(|e| e.contains("vnode partial write failed")));
3867
3868 assert_eq!(
3870 backend.latest_committed_epoch().await.unwrap(),
3871 Some(d),
3872 "the last successful epoch is the recovery point",
3873 );
3874
3875 let p_b = crate::vnode_partial::VnodePartial::decode(
3880 &backend.read_partial(0, b).await.unwrap().unwrap(),
3881 )
3882 .unwrap();
3883 assert_eq!(p_b.base_epoch, Some(a));
3884 for vnode in [0u32, 1] {
3885 let p_d = crate::vnode_partial::VnodePartial::decode(
3886 &backend.read_partial(vnode, d).await.unwrap().unwrap(),
3887 )
3888 .unwrap();
3889 assert_eq!(
3890 p_d.base_epoch, None,
3891 "vnode {vnode}: a successor of a failed epoch must re-upload full, \
3892 never reference the failed epoch's stray partial",
3893 );
3894 assert_eq!(p_d.operators[0].1, b"v2");
3895 }
3896 assert_eq!(c, d - 1, "abandoned epoch's id is burned, not reused");
3897 }
3898
3899 #[tokio::test]
3906 async fn recovery_never_walks_ids_back_onto_aborted_epochs() {
3907 use laminar_core::storage::checkpoint_manifest::SinkCommitStatus;
3908
3909 let dir = tempfile::tempdir().unwrap();
3910 let store = FileSystemCheckpointStore::new(dir.path(), 5);
3911 let mut committed = CheckpointManifest::new(3, 3);
3913 committed
3914 .sink_commit_statuses
3915 .insert("out".into(), SinkCommitStatus::Committed);
3916 store.save(&committed).await.unwrap();
3917 let mut aborted = CheckpointManifest::new(5, 5);
3920 aborted
3921 .sink_commit_statuses
3922 .insert("out".into(), SinkCommitStatus::Pending);
3923 store.save(&aborted).await.unwrap();
3924
3925 let mut coord = CheckpointCoordinator::new(CheckpointConfig::default(), Box::new(store))
3926 .await
3927 .unwrap();
3928 assert_eq!(coord.epoch(), 6, "seeds from the highest loadable manifest");
3929
3930 let recovered = coord.recover().await.unwrap().expect("recovers");
3931 assert_eq!(recovered.epoch(), 3, "restores from the committed epoch");
3932 assert_eq!(
3933 coord.epoch(),
3934 6,
3935 "ids must stay above the aborted epoch, never re-allocating it",
3936 );
3937 }
3938
/// Walks an `EpochAllocator` seeded with `(5, 9)` through peek, allocate and
/// `advance_to`, asserting the exact pair seen at each step.
#[test]
fn epoch_allocator_allocates_monotonic_pairs() {
    let allocator = EpochAllocator::new(5, 9);

    // Peeking reports the next pair without handing it out.
    assert_eq!(allocator.peek(), (5, 9));

    // Two allocations hand out the seeded pair, then the pair one above it.
    let first = allocator.allocate();
    assert_eq!(first, (5, 9));
    let second = allocator.allocate();
    assert_eq!(second, (6, 10));
    assert_eq!(allocator.peek(), (7, 11));

    // Advancing past the current position moves the next allocation there.
    allocator.advance_to(20, 30);
    let jumped = allocator.allocate();
    assert_eq!(jumped, (20, 30));

    // Advancing to a lower target leaves the position where it was.
    allocator.advance_to(5, 5);
    assert_eq!(allocator.peek(), (21, 31));
}
3952
3953 #[tokio::test]
3956 async fn failed_epoch_is_abandoned_not_retried() {
3957 let dir = tempfile::tempdir().unwrap();
3958 let config = CheckpointConfig {
3959 max_checkpoint_bytes: Some(16),
3960 ..CheckpointConfig::default()
3961 };
3962 let store = Box::new(FileSystemCheckpointStore::new(dir.path(), 3));
3963 let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
3964
3965 let mut ops = HashMap::new();
3967 ops.insert("big".to_string(), bytes::Bytes::from(vec![0u8; 2_000_000]));
3968 let failed = coord
3969 .checkpoint(CheckpointRequest {
3970 operator_states: ops,
3971 ..CheckpointRequest::default()
3972 })
3973 .await
3974 .unwrap();
3975 assert!(!failed.success);
3976
3977 let ok = coord
3978 .checkpoint(CheckpointRequest::default())
3979 .await
3980 .unwrap();
3981 assert!(ok.success);
3982 assert_eq!(
3983 ok.epoch,
3984 failed.epoch + 1,
3985 "the failed epoch must be abandoned, not reused",
3986 );
3987 assert_eq!(ok.checkpoint_id, failed.checkpoint_id + 1);
3988 }
3989
3990 #[tokio::test]
3991 async fn test_stats_include_percentiles_after_checkpoints() {
3992 let dir = tempfile::tempdir().unwrap();
3993 let mut coord = make_coordinator(dir.path()).await;
3994
3995 for _ in 0..3 {
3997 let result = coord
3998 .checkpoint(CheckpointRequest::default())
3999 .await
4000 .unwrap();
4001 assert!(result.success);
4002 }
4003
4004 let stats = coord.stats();
4005 assert_eq!(stats.completed, 3);
4006 assert!(stats.last_duration.is_some());
4009 }
4010
4011 struct FailingPreCommitSink {
4013 rollback_count: Arc<std::sync::atomic::AtomicU64>,
4014 schema: arrow::datatypes::SchemaRef,
4015 }
4016
4017 #[async_trait::async_trait]
4018 impl laminar_connectors::connector::SinkConnector for FailingPreCommitSink {
4019 async fn open(
4020 &mut self,
4021 _config: &laminar_connectors::config::ConnectorConfig,
4022 ) -> Result<(), laminar_connectors::error::ConnectorError> {
4023 Ok(())
4024 }
4025
4026 async fn write_batch(
4027 &mut self,
4028 _batch: &arrow::array::RecordBatch,
4029 ) -> Result<
4030 laminar_connectors::connector::WriteResult,
4031 laminar_connectors::error::ConnectorError,
4032 > {
4033 Ok(laminar_connectors::connector::WriteResult::new(0, 0))
4034 }
4035
4036 async fn pre_commit(
4037 &mut self,
4038 epoch: u64,
4039 ) -> Result<(), laminar_connectors::error::ConnectorError> {
4040 Err(laminar_connectors::error::ConnectorError::TransactionError(
4041 format!("synthetic pre_commit failure at epoch {epoch}"),
4042 ))
4043 }
4044
4045 async fn rollback_epoch(
4046 &mut self,
4047 _epoch: u64,
4048 ) -> Result<(), laminar_connectors::error::ConnectorError> {
4049 self.rollback_count
4050 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
4051 Ok(())
4052 }
4053
4054 async fn close(&mut self) -> Result<(), laminar_connectors::error::ConnectorError> {
4055 Ok(())
4056 }
4057
4058 fn schema(&self) -> arrow::datatypes::SchemaRef {
4059 Arc::clone(&self.schema)
4060 }
4061
4062 fn capabilities(&self) -> laminar_connectors::connector::SinkConnectorCapabilities {
4063 laminar_connectors::connector::SinkConnectorCapabilities::new(Duration::from_secs(5))
4064 .with_exactly_once()
4065 .with_two_phase_commit()
4066 .with_preserves_pending_on_abandon()
4067 }
4068 }
4069
4070 #[tokio::test]
4076 async fn pre_commit_failure_abandons_without_connector_rollback() {
4077 use arrow::datatypes::{DataType, Field, Schema};
4078
4079 let dir = tempfile::tempdir().unwrap();
4080 let mut coord = make_coordinator(dir.path()).await;
4081
4082 let rollback_count = Arc::new(std::sync::atomic::AtomicU64::new(0));
4083 let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
4084 let sink = FailingPreCommitSink {
4085 rollback_count: Arc::clone(&rollback_count),
4086 schema,
4087 };
4088 let (event_tx, _event_rx) = laminar_core::streaming::channel::channel::<
4089 crate::sink_task::SinkEvent,
4090 >(crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY);
4091 let handle = crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
4092 name: "failing-sink".into(),
4093 sink_id: Arc::from("failing-sink"),
4094 connector: Box::new(sink),
4095 exactly_once: true,
4096 channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
4097 flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
4098 write_timeout: Duration::from_secs(5),
4099 event_tx,
4100 });
4101 coord.register_sink("failing-sink", handle, true);
4102
4103 coord.begin_initial_epoch().await.unwrap();
4104
4105 let result = coord
4106 .checkpoint(CheckpointRequest::default())
4107 .await
4108 .unwrap();
4109
4110 assert!(!result.success);
4111 assert!(
4112 result
4113 .error
4114 .as_deref()
4115 .is_some_and(|e| e.contains("pre-commit failed")),
4116 "error should mention pre-commit: got {:?}",
4117 result.error
4118 );
4119 assert_eq!(
4120 rollback_count.load(std::sync::atomic::Ordering::Relaxed),
4121 0,
4122 "a healthy sink must keep its pending output on a live abandon"
4123 );
4124 }
4125
4126 struct StuckRollbackSink {
4130 schema: arrow::datatypes::SchemaRef,
4131 }
4132
4133 #[async_trait::async_trait]
4134 impl laminar_connectors::connector::SinkConnector for StuckRollbackSink {
4135 async fn open(
4136 &mut self,
4137 _config: &laminar_connectors::config::ConnectorConfig,
4138 ) -> Result<(), laminar_connectors::error::ConnectorError> {
4139 Ok(())
4140 }
4141
4142 async fn write_batch(
4143 &mut self,
4144 _batch: &arrow::array::RecordBatch,
4145 ) -> Result<
4146 laminar_connectors::connector::WriteResult,
4147 laminar_connectors::error::ConnectorError,
4148 > {
4149 Err(laminar_connectors::error::ConnectorError::WriteError(
4150 "synthetic write failure".into(),
4151 ))
4152 }
4153
4154 async fn pre_commit(
4155 &mut self,
4156 _epoch: u64,
4157 ) -> Result<(), laminar_connectors::error::ConnectorError> {
4158 Err(laminar_connectors::error::ConnectorError::TransactionError(
4159 "synthetic pre_commit failure".into(),
4160 ))
4161 }
4162
4163 async fn rollback_epoch(
4164 &mut self,
4165 _epoch: u64,
4166 ) -> Result<(), laminar_connectors::error::ConnectorError> {
4167 std::future::pending::<()>().await;
4169 Ok(())
4170 }
4171
4172 async fn close(&mut self) -> Result<(), laminar_connectors::error::ConnectorError> {
4173 Ok(())
4174 }
4175
4176 fn schema(&self) -> arrow::datatypes::SchemaRef {
4177 Arc::clone(&self.schema)
4178 }
4179
4180 fn capabilities(&self) -> laminar_connectors::connector::SinkConnectorCapabilities {
4181 laminar_connectors::connector::SinkConnectorCapabilities::new(Duration::from_secs(5))
4182 .with_exactly_once()
4183 .with_two_phase_commit()
4184 }
4185 }
4186
4187 #[tokio::test(start_paused = true)]
4188 async fn test_rollback_sinks_bounded_by_timeout() {
4189 use arrow::datatypes::{DataType, Field, Schema};
4190
4191 let dir = tempfile::tempdir().unwrap();
4192 let config = CheckpointConfig {
4193 rollback_timeout: Duration::from_millis(100),
4194 ..Default::default()
4195 };
4196 let store = Box::new(
4197 laminar_core::storage::checkpoint_store::FileSystemCheckpointStore::new(dir.path(), 3),
4198 );
4199 let mut coord = CheckpointCoordinator::new(config, store).await.unwrap();
4200
4201 let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
4202 let sink = StuckRollbackSink { schema };
4203 let (event_tx, _event_rx) = laminar_core::streaming::channel::channel::<
4204 crate::sink_task::SinkEvent,
4205 >(crate::sink_task::SINK_EVENT_CHANNEL_CAPACITY);
4206 let handle = crate::sink_task::SinkTaskHandle::spawn(crate::sink_task::SinkTaskConfig {
4207 name: "stuck-sink".into(),
4208 sink_id: Arc::from("stuck-sink"),
4209 connector: Box::new(sink),
4210 exactly_once: true,
4211 channel_capacity: crate::sink_task::DEFAULT_CHANNEL_CAPACITY,
4212 flush_interval: crate::sink_task::DEFAULT_FLUSH_INTERVAL,
4213 write_timeout: Duration::from_secs(5),
4214 event_tx,
4215 });
4216 coord.register_sink("stuck-sink", handle.clone(), true);
4217 coord.begin_initial_epoch().await.unwrap();
4218
4219 let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
4222 let batch = arrow::array::RecordBatch::try_new(
4223 schema,
4224 vec![Arc::new(arrow::array::Int32Array::from(vec![1]))],
4225 )
4226 .unwrap();
4227 handle.write_batch(batch).await.unwrap();
4228 handle.sync().await.unwrap();
4229
4230 let result = coord
4234 .checkpoint(CheckpointRequest::default())
4235 .await
4236 .unwrap();
4237
4238 assert!(!result.success);
4239 assert!(
4240 result
4241 .error
4242 .as_deref()
4243 .is_some_and(|e| e.contains("pre-commit failed")),
4244 "checkpoint result should reflect pre-commit failure: got {:?}",
4245 result.error
4246 );
4247 }
4248}