pub struct CheckpointCoordinator { /* private fields */ }Expand description
Unified checkpoint coordinator.
Orchestrates the full checkpoint lifecycle across sources, sinks,
and operator state, persisting everything in a single
CheckpointManifest.
Implementations§
Source§impl CheckpointCoordinator
impl CheckpointCoordinator
Sourcepub fn new(config: CheckpointConfig, store: Box<dyn CheckpointStore>) -> Self
pub fn new(config: CheckpointConfig, store: Box<dyn CheckpointStore>) -> Self
Creates a new checkpoint coordinator.
Sourcepub async fn begin_initial_epoch(&self) -> Result<(), DbError>
pub async fn begin_initial_epoch(&self) -> Result<(), DbError>
Begins the initial epoch on all exactly-once sinks.
Must be called once after all sinks are registered and before any writes occur. This starts the first Kafka transaction for exactly-once sinks. Subsequent epochs are started automatically after each successful checkpoint commit.
§Errors
Returns the first error from any sink that fails to begin the epoch.
Sourcepub fn set_counters(&mut self, counters: Arc<PipelineCounters>)
pub fn set_counters(&mut self, counters: Arc<PipelineCounters>)
Sets the shared pipeline counters for checkpoint metrics emission.
When set, checkpoint completion and failure will update the counters automatically.
Sourcepub fn register_wal_manager(&mut self, wal_manager: PerCoreWalManager)
pub fn register_wal_manager(&mut self, wal_manager: PerCoreWalManager)
Registers a per-core WAL manager for checkpoint coordination.
When registered, prepare_wal_for_checkpoint()
will write epoch barriers and sync all segments, and
truncate_wal_after_checkpoint()
will reset all segments.
Sourcepub fn register_changelog_drainer(&mut self, drainer: ChangelogDrainer)
pub fn register_changelog_drainer(&mut self, drainer: ChangelogDrainer)
Registers a changelog drainer to flush before checkpointing.
Multiple drainers may be registered (one per core or per state store).
All are flushed during
prepare_wal_for_checkpoint().
Sourcepub fn prepare_wal_for_checkpoint(
&mut self,
) -> Result<WalPrepareResult, DbError>
pub fn prepare_wal_for_checkpoint( &mut self, ) -> Result<WalPrepareResult, DbError>
Prepares the WAL for a checkpoint.
- Flushes all registered
ChangelogDrainerinstances (Ring 1 → Ring 0 catchup) - Writes epoch barriers to all per-core WAL segments
- Syncs all WAL segments (
fdatasync) - Records and returns per-core WAL positions
Call this before checkpoint() and pass the returned
positions into that method.
§Errors
Returns DbError::Checkpoint if WAL operations fail.
Sourcepub fn truncate_wal_after_checkpoint(
&mut self,
current_positions: Vec<u64>,
) -> Result<(), DbError>
pub fn truncate_wal_after_checkpoint( &mut self, current_positions: Vec<u64>, ) -> Result<(), DbError>
Truncates (resets) all per-core WAL segments after a successful checkpoint.
Call this after checkpoint() returns success.
WAL data before the checkpoint position is no longer needed.
§Errors
Returns DbError::Checkpoint if truncation fails.
Sourcepub fn wal_manager(&self) -> Option<&PerCoreWalManager>
pub fn wal_manager(&self) -> Option<&PerCoreWalManager>
Returns a reference to the registered WAL manager, if any.
Sourcepub fn wal_manager_mut(&mut self) -> Option<&mut PerCoreWalManager>
pub fn wal_manager_mut(&mut self) -> Option<&mut PerCoreWalManager>
Returns a mutable reference to the registered WAL manager, if any.
Sourcepub fn changelog_drainers(&self) -> &[ChangelogDrainer]
pub fn changelog_drainers(&self) -> &[ChangelogDrainer]
Returns a slice of registered changelog drainers.
Sourcepub fn changelog_drainers_mut(&mut self) -> &mut [ChangelogDrainer]
pub fn changelog_drainers_mut(&mut self) -> &mut [ChangelogDrainer]
Returns a mutable slice of registered changelog drainers.
Sourcepub async fn checkpoint(
&mut self,
operator_states: HashMap<String, Vec<u8>>,
watermark: Option<i64>,
table_store_checkpoint_path: Option<String>,
source_watermarks: HashMap<String, i64>,
pipeline_hash: Option<u64>,
) -> Result<CheckpointResult, DbError>
pub async fn checkpoint( &mut self, operator_states: HashMap<String, Vec<u8>>, watermark: Option<i64>, table_store_checkpoint_path: Option<String>, source_watermarks: HashMap<String, i64>, pipeline_hash: Option<u64>, ) -> Result<CheckpointResult, DbError>
Performs a full checkpoint cycle (steps 3-7).
Steps 1-2 (barrier propagation + operator snapshots) are handled
externally by the DAG executor and passed in as operator_states.
§Arguments
operator_states— serialized operator state fromDagCheckpointCoordinatorwatermark— current global watermarktable_store_checkpoint_path— table store checkpoint pathsource_watermarks— per-source watermarkspipeline_hash— pipeline identity hash
§Errors
Returns DbError::Checkpoint if any phase fails.
Sourcepub fn phase(&self) -> CheckpointPhase
pub fn phase(&self) -> CheckpointPhase
Returns the current phase.
Sourcepub fn next_checkpoint_id(&self) -> u64
pub fn next_checkpoint_id(&self) -> u64
Returns the next checkpoint ID.
Sourcepub fn config(&self) -> &CheckpointConfig
pub fn config(&self) -> &CheckpointConfig
Returns the checkpoint config.
Sourcepub fn smoothed_duration_ms(&self) -> f64
pub fn smoothed_duration_ms(&self) -> f64
Returns the current smoothed checkpoint duration (milliseconds).
Returns 0.0 if no checkpoints have been completed or adaptive mode is not enabled.
Sourcepub fn unaligned_checkpoint_count(&self) -> u64
pub fn unaligned_checkpoint_count(&self) -> u64
Returns the number of unaligned checkpoints completed.
Sourcepub fn stats(&self) -> CheckpointStats
pub fn stats(&self) -> CheckpointStats
Returns checkpoint statistics.
Sourcepub fn store(&self) -> &dyn CheckpointStore
pub fn store(&self) -> &dyn CheckpointStore
Returns a reference to the underlying store.
Sourcepub async fn checkpoint_with_extra_tables(
&mut self,
operator_states: HashMap<String, Vec<u8>>,
watermark: Option<i64>,
table_store_checkpoint_path: Option<String>,
extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
source_watermarks: HashMap<String, i64>,
pipeline_hash: Option<u64>,
) -> Result<CheckpointResult, DbError>
pub async fn checkpoint_with_extra_tables( &mut self, operator_states: HashMap<String, Vec<u8>>, watermark: Option<i64>, table_store_checkpoint_path: Option<String>, extra_table_offsets: HashMap<String, ConnectorCheckpoint>, source_watermarks: HashMap<String, i64>, pipeline_hash: Option<u64>, ) -> Result<CheckpointResult, DbError>
Performs a full checkpoint cycle with additional table offsets.
Identical to checkpoint() but merges
extra_table_offsets into the manifest’s table_offsets field.
This is useful for ReferenceTableSource instances that are not
registered as SourceConnector but still need their offsets persisted.
§Errors
Returns DbError::Checkpoint if any phase fails.
Sourcepub async fn checkpoint_with_offsets(
&mut self,
operator_states: HashMap<String, Vec<u8>>,
watermark: Option<i64>,
table_store_checkpoint_path: Option<String>,
extra_table_offsets: HashMap<String, ConnectorCheckpoint>,
source_watermarks: HashMap<String, i64>,
pipeline_hash: Option<u64>,
source_offset_overrides: HashMap<String, ConnectorCheckpoint>,
) -> Result<CheckpointResult, DbError>
pub async fn checkpoint_with_offsets( &mut self, operator_states: HashMap<String, Vec<u8>>, watermark: Option<i64>, table_store_checkpoint_path: Option<String>, extra_table_offsets: HashMap<String, ConnectorCheckpoint>, source_watermarks: HashMap<String, i64>, pipeline_hash: Option<u64>, source_offset_overrides: HashMap<String, ConnectorCheckpoint>, ) -> Result<CheckpointResult, DbError>
Performs a full checkpoint with pre-captured source offsets.
When source_offset_overrides is non-empty, those sources skip the
live snapshot_sources() call and use the provided offsets instead.
This is essential for barrier-aligned checkpoints where source
positions must match the operator state at the barrier point.
§Errors
Returns DbError::Checkpoint if any phase fails.
Sourcepub async fn recover(&mut self) -> Result<Option<RecoveredState>, DbError>
pub async fn recover(&mut self) -> Result<Option<RecoveredState>, DbError>
Attempts recovery from the latest checkpoint.
Creates a RecoveryManager
using the coordinator’s store and delegates recovery to it.
On success, advances self.epoch past the recovered epoch so the
next checkpoint gets a fresh epoch number.
Returns Ok(None) for a fresh start (no checkpoint found).
§Errors
Returns DbError::Checkpoint if the store itself fails.
Sourcepub fn load_latest_manifest(
&self,
) -> Result<Option<CheckpointManifest>, DbError>
pub fn load_latest_manifest( &self, ) -> Result<Option<CheckpointManifest>, DbError>
Trait Implementations§
Auto Trait Implementations§
impl Freeze for CheckpointCoordinator
impl !RefUnwindSafe for CheckpointCoordinator
impl Send for CheckpointCoordinator
impl Sync for CheckpointCoordinator
impl Unpin for CheckpointCoordinator
impl UnsafeUnpin for CheckpointCoordinator
impl !UnwindSafe for CheckpointCoordinator
Blanket Implementations§
§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
§type ArchivedMetadata = ()
type ArchivedMetadata = ()
§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
§impl<T> AsAny for T
impl<T> AsAny for T
§fn any_ref(&self) -> &(dyn Any + Send + Sync + 'static)
fn any_ref(&self) -> &(dyn Any + Send + Sync + 'static)
dyn Any reference to the object: Read more§fn as_any(self: Arc<T>) -> Arc<dyn Any + Send + Sync>
fn as_any(self: Arc<T>) -> Arc<dyn Any + Send + Sync>
Arc<dyn Any> reference to the object: Read more§fn into_any(self: Box<T>) -> Box<dyn Any + Send + Sync>
fn into_any(self: Box<T>) -> Box<dyn Any + Send + Sync>
Box<dyn Any>: Read more§fn type_name(&self) -> &'static str
fn type_name(&self) -> &'static str
std::any::type_name, since Any does not provide it and
Any::type_id is useless as a debugging aid (its Debug is just a mess of hex digits).Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2where
T: SharedNiching<N1, N2>,
N1: Niching<T>,
N2: Niching<T>,
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> Pointee for T
impl<T> Pointee for T
§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
§impl<T> Scope for T
impl<T> Scope for T
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.