pub struct CheckpointCoordinator { /* private fields */ }
Unified checkpoint coordinator.
Orchestrates the full checkpoint lifecycle across sources, sinks,
and operator state, persisting everything in a single
CheckpointManifest.
Implementations
impl CheckpointCoordinator
pub async fn new(
    config: CheckpointConfig,
    store: Box<dyn CheckpointStore>,
) -> Result<Self, DbError>
Creates a new checkpoint coordinator, seeded from the latest stored checkpoint.
§Errors
Returns DbError::Checkpoint if store.load_latest() fails.
A store read error is surfaced rather than silently starting at
(1, 1) and clobbering existing on-disk state. Ok(None) is
the fresh-start path and is not an error.
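The three outcomes of store.load_latest() described above can be sketched as follows. This is a minimal, synchronous stand-in rather than the crate's implementation: StoredCheckpoint, Seed, StoreError, and seed_from_latest are hypothetical names, and the sketch assumes that (1, 1) denotes the starting (epoch, checkpoint ID) pair and that seeding continues one past the stored values.

```rust
// Hypothetical stand-ins; the crate's real types are not shown in this section.
#[derive(Debug, PartialEq)]
struct StoredCheckpoint {
    epoch: u64,
    checkpoint_id: u64,
}

#[derive(Debug, PartialEq)]
struct Seed {
    next_epoch: u64,
    next_checkpoint_id: u64,
}

#[derive(Debug, PartialEq)]
struct StoreError(String);

/// Maps the result of a `load_latest()`-style call onto the starting counters.
fn seed_from_latest(
    latest: Result<Option<StoredCheckpoint>, StoreError>,
) -> Result<Seed, StoreError> {
    match latest {
        // Read failure: surface it instead of silently restarting at (1, 1).
        Err(e) => Err(e),
        // No checkpoint stored yet: the fresh-start path, not an error.
        Ok(None) => Ok(Seed { next_epoch: 1, next_checkpoint_id: 1 }),
        // Assumption: continue one past the stored values.
        Ok(Some(c)) => Ok(Seed {
            next_epoch: c.epoch + 1,
            next_checkpoint_id: c.checkpoint_id + 1,
        }),
    }
}
```

The point of the Err arm is that a transient store failure can never be mistaken for an empty store.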
pub fn set_cluster_controller(&mut self, controller: Arc<ClusterController>)
Activates cluster-mode 2PC. Without this the coordinator runs single-instance semantics.
pub fn set_state_backend(&mut self, backend: Arc<dyn StateBackend>)
Sets the state backend. Wire it together with a non-empty vnode_set
to enable per-vnode markers and the epoch_complete durability gate.
pub fn set_decision_store(&mut self, store: Arc<CheckpointDecisionStore>)
Installs the shared commit-marker store.
pub fn set_assignment_version(&mut self, version: u64)
Records the assignment generation this coordinator is writing
with. The value is forwarded to backend.write_partial so the
split-brain fence can reject stale writers. The host sets this
whenever a fresh AssignmentSnapshot rotates in.
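One way such a fence could reject stale writers is sketched below. The section does not describe the actual check inside backend.write_partial, so AssignmentFence, StaleWriter, and admit are hypothetical, and the rule used here (reject any write stamped with a version older than the newest one seen) is an assumption.

```rust
/// Hypothetical fence: remembers the newest assignment version it has seen.
#[derive(Debug, Default)]
struct AssignmentFence {
    newest_seen: u64,
}

/// Hypothetical rejection error carrying both versions for diagnostics.
#[derive(Debug, PartialEq)]
struct StaleWriter {
    writer_version: u64,
    newest_seen: u64,
}

impl AssignmentFence {
    /// Admits a write stamped with `writer_version`, or rejects it when a
    /// newer assignment generation has already written (assumed rule).
    fn admit(&mut self, writer_version: u64) -> Result<(), StaleWriter> {
        if writer_version < self.newest_seen {
            return Err(StaleWriter {
                writer_version,
                newest_seen: self.newest_seen,
            });
        }
        self.newest_seen = writer_version;
        Ok(())
    }
}
```

Under this rule a coordinator that missed a snapshot rotation keeps failing its writes until the host calls set_assignment_version with the current generation.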
pub fn set_local_watermark_ms(&mut self, watermark: Option<i64>)
Records this instance’s current local watermark, which is reported
in every subsequent BarrierAck so the leader can compute the
cluster-wide minimum. None disables this follower’s contribution;
the leader then falls back to its own watermark and those of the
other followers.
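The leader-side minimum can be sketched as a fold over optional contributions. The function name cluster_min_watermark and the choice to model the leader's own watermark as an Option<i64> are assumptions made for illustration; only the "skip None, take the minimum" behaviour comes from the description above.

```rust
/// Computes the cluster-wide minimum watermark (milliseconds) from the
/// leader's own value and the values reported in follower acks.
/// `None` entries contribute nothing and are skipped.
fn cluster_min_watermark(leader: Option<i64>, follower_acks: &[Option<i64>]) -> Option<i64> {
    follower_acks
        .iter()
        .copied()
        // Assumption: the leader's watermark takes part like any other value.
        .chain(std::iter::once(leader))
        // Drop the `None` contributions.
        .flatten()
        .min()
}
```

With this shape, a follower that reports None cannot hold the cluster-wide watermark back, but it also cannot advance it.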
pub fn set_vnode_set(&mut self, vnodes: Vec<u32>)
Vnodes this instance owns; drives marker writes. Also the
default gate set until Self::set_gate_vnode_set is called.
pub fn set_gate_vnode_set(&mut self, vnodes: Vec<u32>)
Set the vnodes the leader’s durability gate checks (the full
registry in cluster mode). Defaults to vnode_set when unset.
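The defaulting rule shared by set_vnode_set and set_gate_vnode_set can be sketched with an optional override. VnodeConfig and effective_gate_set are hypothetical names; how the coordinator stores these sets internally is not shown in this section.

```rust
/// Hypothetical holder for the two vnode sets.
struct VnodeConfig {
    /// Vnodes this instance owns.
    vnode_set: Vec<u32>,
    /// Explicit gate set; `None` until a gate set has been provided.
    gate_vnode_set: Option<Vec<u32>>,
}

impl VnodeConfig {
    /// Returns the vnodes the durability gate should check: the explicit
    /// gate set when present, otherwise the owned vnode set.
    fn effective_gate_set(&self) -> &[u32] {
        self.gate_vnode_set
            .as_deref()
            .unwrap_or(self.vnode_set.as_slice())
    }
}
```

In cluster mode the gate set would typically be the full registry, while vnode_set stays limited to the vnodes owned locally.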
pub async fn begin_initial_epoch(&self) -> Result<(), DbError>
Begins the initial epoch on all exactly-once sinks.
Must be called once after all sinks are registered and before any writes occur. This starts the first Kafka transaction for exactly-once sinks. Subsequent epochs are started automatically after each successful checkpoint commit.
§Errors
Returns the first error from any sink that fails to begin the epoch.
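A minimal, synchronous sketch of the "first error wins" behaviour follows. The Sink trait, FakeSink, and the String error type are hypothetical stand-ins for the crate's async sink API, and stopping at the first failing sink (rather than attempting the remaining ones) is an assumption.

```rust
/// Hypothetical, synchronous stand-in for a sink.
trait Sink {
    fn is_exactly_once(&self) -> bool;
    fn begin_epoch(&mut self, epoch: u64) -> Result<(), String>;
}

/// Toy sink used only to exercise the loop below.
struct FakeSink {
    name: &'static str,
    exactly_once: bool,
    fail: bool,
}

impl Sink for FakeSink {
    fn is_exactly_once(&self) -> bool {
        self.exactly_once
    }

    fn begin_epoch(&mut self, epoch: u64) -> Result<(), String> {
        if self.fail {
            Err(format!("{}: cannot begin epoch {}", self.name, epoch))
        } else {
            Ok(())
        }
    }
}

/// Begins `epoch` on every exactly-once sink, in registration order.
/// Other sinks are skipped; the first failure is returned immediately.
fn begin_initial_epoch(sinks: &mut [Box<dyn Sink>], epoch: u64) -> Result<(), String> {
    for sink in sinks.iter_mut().filter(|s| s.is_exactly_once()) {
        sink.begin_epoch(epoch)?;
    }
    Ok(())
}
```

Sinks that are not exactly-once never open a transaction here, so a failure in one of them cannot block startup.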
pub fn set_metrics(&mut self, prom: Arc<EngineMetrics>)
Injects the Prometheus engine metrics.
pub async fn checkpoint(
    &mut self,
    request: CheckpointRequest,
) -> Result<CheckpointResult, DbError>
Performs a full checkpoint cycle (steps 3-7).
Steps 1-2 (barrier propagation + operator snapshots) are handled
externally by the DAG executor and passed in via CheckpointRequest.
§Errors
Returns DbError::Checkpoint if any phase fails.
pub async fn reconcile_prepared_on_init(&self)
On startup, reconciles any Pending sinks in the last manifest against the durable commit marker. If the marker is present, the local commit is driven to completion (idempotent); if it is absent, the sink is rolled back. Runs on every node; the leader additionally re-announces the decision.
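The reconciliation rule amounts to a small decision table, sketched below. SinkStatus, Action, and reconcile are hypothetical names, and the Committed variant is included only to show that sinks which are not pending are left alone; the manifest may track other states.

```rust
/// Hypothetical per-sink status as recorded in the last manifest.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SinkStatus {
    Pending,
    Committed,
}

/// What startup reconciliation should do for one sink.
#[derive(Debug, PartialEq)]
enum Action {
    Commit,
    Rollback,
    Nothing,
}

/// Decides the action from the sink's status and whether the durable
/// commit marker for that checkpoint exists.
fn reconcile(status: SinkStatus, marker_present: bool) -> Action {
    match (status, marker_present) {
        // The commit decision was made durable: finish it locally.
        (SinkStatus::Pending, true) => Action::Commit,
        // No decision was recorded: undo the prepared work.
        (SinkStatus::Pending, false) => Action::Rollback,
        // Nothing is in flight for this sink.
        (SinkStatus::Committed, _) => Action::Nothing,
    }
}
```

Because the commit path is idempotent, running this on every node after a restart is safe even if some nodes had already committed.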
pub fn phase(&self) -> CheckpointPhase
Returns the current phase.
pub fn next_checkpoint_id(&self) -> u64
Returns the next checkpoint ID.
pub fn config(&self) -> &CheckpointConfig
Returns the checkpoint config.
pub fn stats(&self) -> CheckpointStats
Returns checkpoint statistics.
pub fn store(&self) -> &dyn CheckpointStore
Returns a reference to the underlying store.
pub async fn checkpoint_with_offsets(
    &mut self,
    request: CheckpointRequest,
) -> Result<CheckpointResult, DbError>
Performs a full checkpoint with pre-captured source offsets.
When CheckpointRequest::source_offset_overrides is non-empty,
those sources skip the live snapshot_sources() call and use the
provided offsets instead. This is essential for barrier-aligned
checkpoints where source positions must match the operator state
at the barrier point.
§Errors
Returns DbError::Checkpoint if any phase fails.
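The override rule can be sketched as a per-source choice between a pre-captured offset and a live snapshot. resolve_offsets is a hypothetical helper, the snapshot_live closure stands in for the per-source part of snapshot_sources(), and modelling an offset as a single u64 keyed by source name is an assumption.

```rust
use std::collections::HashMap;

/// Resolves the offset recorded for each source: an override wins when
/// present; otherwise the source is snapshotted live.
fn resolve_offsets<F>(
    sources: &[&str],
    overrides: &HashMap<String, u64>,
    mut snapshot_live: F,
) -> HashMap<String, u64>
where
    F: FnMut(&str) -> u64,
{
    sources
        .iter()
        .map(|&name| {
            let offset = match overrides.get(name) {
                // Captured at the barrier, so it matches the operator state.
                Some(&captured) => captured,
                // No override: fall back to the live position.
                None => snapshot_live(name),
            };
            (name.to_string(), offset)
        })
        .collect()
}
```

Note that the live snapshot is never invoked for an overridden source, so a source that has advanced past the barrier cannot leak a later position into the manifest.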
pub async fn follower_checkpoint(
    &mut self,
    request: CheckpointRequest,
    ann: BarrierAnnouncement,
    decision_timeout: Duration,
) -> Result<bool, DbError>
Follower half of a cluster checkpoint: pre-commit + save +
markers + ack, then wait for the leader’s commit/abort.
Ok(true) = committed, Ok(false) = aborted/timed out.
§Errors
Propagates sink pre-commit, manifest save, or marker-write failures.
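The final wait for the leader's decision, and its mapping onto the boolean result, can be sketched with a standard-library channel. This is a synchronous illustration only: the Decision enum and await_decision are hypothetical, the real method is async, and treating a disconnected leader like a timeout is an assumption.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical decision message sent by the leader.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Decision {
    Commit,
    Abort,
}

/// Waits up to `decision_timeout` for the leader's decision.
/// Returns `true` for commit and `false` for abort or timeout.
fn await_decision(rx: &Receiver<Decision>, decision_timeout: Duration) -> bool {
    match rx.recv_timeout(decision_timeout) {
        Ok(Decision::Commit) => true,
        Ok(Decision::Abort) => false,
        // No decision arrived in time.
        Err(RecvTimeoutError::Timeout) => false,
        // Assumption: a vanished leader is handled like a timeout.
        Err(RecvTimeoutError::Disconnected) => false,
    }
}
```

A false result does not by itself say whether the leader aborted or was merely slow; the durable commit marker remains the source of truth on the next startup.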
pub async fn recover(&mut self) -> Result<Option<RecoveredState>, DbError>
Attempts recovery from the latest checkpoint.
Creates a RecoveryManager using the coordinator’s store and delegates recovery to it.
On success, advances self.epoch past the recovered epoch so the
next checkpoint gets a fresh epoch number.
Returns Ok(None) for a fresh start (no checkpoint found).
§Errors
Returns DbError::Checkpoint if the store itself fails.
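The epoch bookkeeping after recovery can be sketched as follows. Coordinator and RecoveredState here are simplified, hypothetical stand-ins carrying only an epoch field, and the sketch assumes that "past the recovered epoch" means at least the recovered epoch plus one, never moving the counter backwards.

```rust
/// Simplified stand-in for the coordinator's epoch counter.
struct Coordinator {
    epoch: u64,
}

/// Simplified stand-in for the recovered state.
struct RecoveredState {
    epoch: u64,
}

impl Coordinator {
    /// Applies the outcome of a recovery attempt to the epoch counter.
    fn apply_recovery(&mut self, recovered: Option<&RecoveredState>) {
        // `None` is the fresh-start path: the counter is left untouched.
        if let Some(state) = recovered {
            // Assumption: jump to recovered + 1 without ever going backwards.
            self.epoch = self.epoch.max(state.epoch + 1);
        }
    }
}
```

Advancing the counter this way keeps the next checkpoint from reusing an epoch number that already exists in the store.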
pub async fn load_latest_manifest(
    &self,
) -> Result<Option<CheckpointManifest>, DbError>
Auto Trait Implementations
impl Freeze for CheckpointCoordinator
impl !RefUnwindSafe for CheckpointCoordinator
impl Send for CheckpointCoordinator
impl Sync for CheckpointCoordinator
impl Unpin for CheckpointCoordinator
impl UnsafeUnpin for CheckpointCoordinator
impl !UnwindSafe for CheckpointCoordinator
Blanket Implementations
impl<T> ArchivePointee for T
impl<U> As for U
impl<T> AsAny for T
impl<T> BorrowMut<T> for T where T: ?Sized
impl<T> Conv for T
impl<T> Downcast for T where T: AsAny + ?Sized
impl<T> FmtForward for T
impl<T> FutureExt for T
impl<T> Instrument for T
impl<T> IntoEither for T
impl<T> IntoRequest<T> for T
impl<L> LayerExt<L> for L
impl<T> LayoutRaw for T
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2 where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>
impl<T> Pipe for T where T: ?Sized
impl<T> Pointable for T
impl<T> Pointee for T
impl<T> PolicyExt for T where T: ?Sized
impl<T> Scope for T
impl<T> Tap for T