pub struct CheckpointCoordinator { /* private fields */ }Expand description
Orchestrates the checkpoint lifecycle across sources, sinks, and operator state.
Implementations§
Source§impl CheckpointCoordinator
impl CheckpointCoordinator
Sourcepub async fn new(
config: CheckpointConfig,
store: Box<dyn CheckpointStore>,
) -> Result<Self, DbError>
pub async fn new( config: CheckpointConfig, store: Box<dyn CheckpointStore>, ) -> Result<Self, DbError>
Create a coordinator seeded from the highest stored checkpoint.
§Errors
Returns a store read failure rather than silently starting at epoch 1 and clobbering on-disk state.
Sourcepub fn set_cluster_controller(&mut self, controller: Arc<ClusterController>)
pub fn set_cluster_controller(&mut self, controller: Arc<ClusterController>)
Activate cluster-mode 2PC. Without this the coordinator runs single-instance semantics.
Sourcepub fn set_state_backend(&mut self, backend: Arc<dyn StateBackend>)
pub fn set_state_backend(&mut self, backend: Arc<dyn StateBackend>)
Wire a state backend to enable per-vnode markers and the durability gate.
Sourcepub fn set_decision_store(&mut self, store: Arc<CheckpointDecisionStore>)
pub fn set_decision_store(&mut self, store: Arc<CheckpointDecisionStore>)
Wire the durable commit-marker store.
Sourcepub fn set_assignment_version(&mut self, version: u64)
pub fn set_assignment_version(&mut self, version: u64)
Set the assignment generation forwarded to write_partial for the split-brain fence.
Sourcepub fn set_local_watermark_ms(&mut self, watermark: Option<i64>)
pub fn set_local_watermark_ms(&mut self, watermark: Option<i64>)
Set the local watermark reported in BarrierAck so the leader can fold it into the
cluster-wide minimum. None disables this instance’s contribution.
Sourcepub fn set_vnode_set(&mut self, vnodes: Vec<u32>)
pub fn set_vnode_set(&mut self, vnodes: Vec<u32>)
Set the owned vnodes. Also the default gate set until set_gate_vnode_set is called.
Sourcepub fn set_gate_vnode_set(&mut self, vnodes: Vec<u32>)
pub fn set_gate_vnode_set(&mut self, vnodes: Vec<u32>)
Set the vnodes the durability gate checks (the full registry in cluster mode).
Sourcepub async fn begin_initial_epoch(&self) -> Result<(), DbError>
pub async fn begin_initial_epoch(&self) -> Result<(), DbError>
Begin the initial epoch on all exactly-once sinks.
Must be called once after all sinks are registered and before any writes. Subsequent epochs are started automatically after each successful checkpoint commit.
§Errors
Returns the first sink error.
Sourcepub fn set_metrics(&mut self, prom: Arc<EngineMetrics>)
pub fn set_metrics(&mut self, prom: Arc<EngineMetrics>)
Wire Prometheus engine metrics.
Sourcepub async fn checkpoint(
&mut self,
request: CheckpointRequest,
) -> Result<CheckpointResult, DbError>
pub async fn checkpoint( &mut self, request: CheckpointRequest, ) -> Result<CheckpointResult, DbError>
Run a full checkpoint cycle.
Barrier propagation and operator snapshots (steps 1-2) are handled by the caller and
passed in via CheckpointRequest.
§Errors
Returns DbError::Checkpoint if any phase fails.
Sourcepub async fn reconcile_prepared_on_init(&self)
pub async fn reconcile_prepared_on_init(&self)
On startup, reconcile any Pending sinks in the last manifest.
Commit marker present → drive local commit; marker absent → rollback. In cluster mode the leader re-announces the decision.
Sourcepub fn phase(&self) -> CheckpointPhase
pub fn phase(&self) -> CheckpointPhase
Current checkpoint phase.
Sourcepub fn next_checkpoint_id(&self) -> u64
pub fn next_checkpoint_id(&self) -> u64
Next checkpoint ID to be allocated.
Sourcepub fn config(&self) -> &CheckpointConfig
pub fn config(&self) -> &CheckpointConfig
Checkpoint configuration.
Sourcepub fn stats(&self) -> CheckpointStats
pub fn stats(&self) -> CheckpointStats
Checkpoint performance statistics.
Sourcepub fn store(&self) -> &dyn CheckpointStore
pub fn store(&self) -> &dyn CheckpointStore
The underlying checkpoint store.
Sourcepub async fn checkpoint_with_offsets(
&mut self,
request: CheckpointRequest,
) -> Result<CheckpointResult, DbError>
pub async fn checkpoint_with_offsets( &mut self, request: CheckpointRequest, ) -> Result<CheckpointResult, DbError>
Run a full checkpoint using pre-captured source offsets.
Non-empty source_offset_overrides bypass the live snapshot call — required for
barrier-aligned checkpoints where source positions must match operator state exactly.
§Errors
Returns DbError::Checkpoint if any phase fails.
Sourcepub async fn follower_checkpoint(
&mut self,
request: CheckpointRequest,
ann: BarrierAnnouncement,
decision_timeout: Duration,
) -> Result<bool, DbError>
pub async fn follower_checkpoint( &mut self, request: CheckpointRequest, ann: BarrierAnnouncement, decision_timeout: Duration, ) -> Result<bool, DbError>
Follower checkpoint: ack the capture, run the durable prepare, then wait for the
leader’s commit/abort. Returns Ok(true) = committed, Ok(false) = aborted.
The ack means “aligned + captured”; the leader verifies prepare completion through the restorable gate (partials written last imply the full prepare finished).
§Errors
Propagates sink pre-commit, manifest save, or marker-write failures.
Sourcepub async fn recover(&mut self) -> Result<Option<RecoveredState>, DbError>
pub async fn recover(&mut self) -> Result<Option<RecoveredState>, DbError>
Recover from the latest stored checkpoint.
Returns Ok(None) for a fresh start (no checkpoint found).
§Errors
Returns DbError::Checkpoint if the store read fails.
Sourcepub async fn load_latest_manifest(
&self,
) -> Result<Option<CheckpointManifest>, DbError>
pub async fn load_latest_manifest( &self, ) -> Result<Option<CheckpointManifest>, DbError>
Trait Implementations§
Auto Trait Implementations§
impl !RefUnwindSafe for CheckpointCoordinator
impl !UnwindSafe for CheckpointCoordinator
impl Freeze for CheckpointCoordinator
impl Send for CheckpointCoordinator
impl Sync for CheckpointCoordinator
impl Unpin for CheckpointCoordinator
impl UnsafeUnpin for CheckpointCoordinator
Blanket Implementations§
§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
§type ArchivedMetadata = ()
type ArchivedMetadata = ()
§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
§impl<U> As for U
impl<U> As for U
§fn as_<T>(self) -> Twhere
T: CastFrom<U>,
U: Sized,
fn as_<T>(self) -> Twhere
T: CastFrom<U>,
U: Sized,
self to type T. The semantics of numeric casting with the as operator are followed, so <T as As>::as_::<U> can be used in the same way as T as U for numeric conversions. Read more§impl<T> AsAny for T
impl<T> AsAny for T
§fn any_ref(&self) -> &(dyn Any + Send + Sync + 'static)
fn any_ref(&self) -> &(dyn Any + Send + Sync + 'static)
dyn Any reference to the object: Read more§fn as_any(self: Arc<T>) -> Arc<dyn Any + Send + Sync>
fn as_any(self: Arc<T>) -> Arc<dyn Any + Send + Sync>
Arc<dyn Any> reference to the object: Read more§fn into_any(self: Box<T>) -> Box<dyn Any + Send + Sync>
fn into_any(self: Box<T>) -> Box<dyn Any + Send + Sync>
Box<dyn Any>: Read more§fn type_name(&self) -> &'static str
fn type_name(&self) -> &'static str
std::any::type_name, since Any does not provide it and
Any::type_id is useless as a debugging aid (its Debug is just a mess of hex digits).Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> Downcast for Twhere
T: AsAny + ?Sized,
impl<T> Downcast for Twhere
T: AsAny + ?Sized,
§fn downcast_ref<T>(&self) -> Option<&T>where
T: AsAny,
fn downcast_ref<T>(&self) -> Option<&T>where
T: AsAny,
Any.§fn downcast_mut<T>(&mut self) -> Option<&mut T>where
T: AsAny,
fn downcast_mut<T>(&mut self) -> Option<&mut T>where
T: AsAny,
Any.§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::Request§impl<L> LayerExt<L> for L
impl<L> LayerExt<L> for L
§fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>where
L: Layer<S>,
fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>where
L: Layer<S>,
Layered].§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
impl<T> MaybeSend for Twhere
T: Send,
§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Pointee for T
impl<T> Pointee for T
§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.