pub trait StateBackend:
    Send
    + Sync
    + 'static {
    // Required methods
    fn write_partial<'life0, 'async_trait>(
        &'life0 self,
        vnode: u32,
        epoch: u64,
        assignment_version: u64,
        bytes: Bytes,
    ) -> Pin<Box<dyn Future<Output = Result<(), StateBackendError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn read_partial<'life0, 'async_trait>(
        &'life0 self,
        vnode: u32,
        epoch: u64,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Bytes>, StateBackendError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn epoch_complete<'life0, 'life1, 'async_trait>(
        &'life0 self,
        epoch: u64,
        vnodes: &'life1 [u32],
    ) -> Pin<Box<dyn Future<Output = Result<bool, StateBackendError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn prune_before<'life0, 'async_trait>(
        &'life0 self,
        before: u64,
    ) -> Pin<Box<dyn Future<Output = Result<(), StateBackendError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided methods
    fn set_authoritative_version(&self, _version: u64) { ... }
    fn authoritative_version(&self) -> u64 { ... }
}
A pluggable state store used by streaming operators for partial aggregates and watermarks.
§Object Safety
The trait is deliberately object-safe:
- No generic methods.
- No Self-returning methods.
- All async methods are async_trait-boxed futures.
This lets the engine hold Arc<dyn StateBackend> and swap
implementations at construction time without touching call sites.
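The boxed-future return type is what keeps the trait usable behind Arc<dyn StateBackend>. The sketch below is a minimal, std-only illustration under stated assumptions: MiniBackend, EmptyBackend, EchoBackend, BoxFuture, block_on and demo are hypothetical names, Vec<u8> stands in for Bytes, and std::future::ready stands in for real I/O. It hand-writes the same Pin<Box<dyn Future + Send>> shape that async_trait generates, then swaps two implementations behind one trait object.

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// The shape async_trait generates: each async method becomes an ordinary
// method returning a boxed, Send future that borrows `self`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hypothetical cut-down stand-in for StateBackend (one method, Vec<u8> instead of Bytes).
trait MiniBackend: Send + Sync + 'static {
    fn read_partial(&self, vnode: u32, epoch: u64) -> BoxFuture<'_, Option<Vec<u8>>>;
}

// Two interchangeable implementations behind the same trait object.
struct EmptyBackend;
struct EchoBackend;

impl MiniBackend for EmptyBackend {
    fn read_partial(&self, _vnode: u32, _epoch: u64) -> BoxFuture<'_, Option<Vec<u8>>> {
        Box::pin(ready(None::<Vec<u8>>))
    }
}

impl MiniBackend for EchoBackend {
    fn read_partial(&self, vnode: u32, _epoch: u64) -> BoxFuture<'_, Option<Vec<u8>>> {
        // Returns the vnode id as a one-byte payload.
        Box::pin(ready(Some(vec![vnode as u8])))
    }
}

// Minimal executor for futures that complete without waiting;
// a real engine would drive these on its async runtime instead.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

// Call sites only see Arc<dyn MiniBackend>, so the implementation can be swapped freely.
fn demo(backend: Arc<dyn MiniBackend>) -> Option<Vec<u8>> {
    block_on(backend.read_partial(7, 42))
}
```

Because the method has no type parameters and returns a concrete boxed type, the compiler can build a vtable for it; a trait using native async fn would not be usable through dyn in the same way.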
§Concurrency
Implementations must be Send + Sync + 'static. The engine expects
to share a single backend across many worker tasks concurrently.
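A minimal sketch of what the Send + Sync + 'static bound buys, assuming a simplified synchronous stand-in for the trait: Backend, InMemory, partial_count and run_workers are hypothetical names, and Vec<u8> replaces Bytes. Several threads share one Arc<dyn Backend>; because every method takes &self, the in-memory implementation relies on a Mutex for interior mutability.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical synchronous stand-in for StateBackend.
trait Backend: Send + Sync + 'static {
    fn write_partial(&self, vnode: u32, epoch: u64, bytes: Vec<u8>);
    fn partial_count(&self) -> usize;
}

#[derive(Default)]
struct InMemory {
    // Methods take &self, so writes go through a lock.
    partials: Mutex<HashMap<(u32, u64), Vec<u8>>>,
}

impl Backend for InMemory {
    fn write_partial(&self, vnode: u32, epoch: u64, bytes: Vec<u8>) {
        self.partials.lock().unwrap().insert((vnode, epoch), bytes);
    }

    fn partial_count(&self) -> usize {
        self.partials.lock().unwrap().len()
    }
}

// Spawns one worker per vnode; all of them share the same backend handle.
fn run_workers(backend: Arc<dyn Backend>, vnodes: u32, epoch: u64) -> usize {
    let handles: Vec<_> = (0..vnodes)
        .map(|vnode| {
            let backend = Arc::clone(&backend);
            thread::spawn(move || backend.write_partial(vnode, epoch, vec![vnode as u8]))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    backend.partial_count()
}
```

Dropping Sync from the supertrait list would make Arc<dyn Backend> non-Send, and thread::spawn would reject the closure at compile time.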
§Idempotence
write_partial must be idempotent for a
given (vnode, epoch) pair — recovery may replay the same write.
Overwriting the same bytes is acceptable; conflicting bytes for the
same key indicate a bug upstream.
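One way an in-memory backend could honor this contract, sketched with hypothetical names (IdempotentStore, WriteError::Conflict) and Vec<u8> in place of Bytes. The documentation only says that conflicting bytes indicate an upstream bug; surfacing them as an error, as below, is an assumption of this sketch rather than documented behavior.

```rust
use std::collections::HashMap;

// Hypothetical error type for the sketch; not a documented StateBackendError variant.
#[derive(Debug, PartialEq)]
enum WriteError {
    Conflict { vnode: u32, epoch: u64 },
}

#[derive(Default)]
struct IdempotentStore {
    partials: HashMap<(u32, u64), Vec<u8>>,
}

impl IdempotentStore {
    fn write_partial(&mut self, vnode: u32, epoch: u64, bytes: &[u8]) -> Result<(), WriteError> {
        match self.partials.get(&(vnode, epoch)) {
            // Replay of an identical write during recovery: accept it unchanged.
            Some(existing) if existing.as_slice() == bytes => Ok(()),
            // Different bytes for the same key: report the upstream bug instead of hiding it.
            Some(_) => Err(WriteError::Conflict { vnode, epoch }),
            // First write for this key.
            None => {
                self.partials.insert((vnode, epoch), bytes.to_vec());
                Ok(())
            }
        }
    }
}
```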
Required Methods§
fn write_partial<'life0, 'async_trait>(
    &'life0 self,
    vnode: u32,
    epoch: u64,
    assignment_version: u64,
    bytes: Bytes,
) -> Pin<Box<dyn Future<Output = Result<(), StateBackendError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Persist a partial aggregate for (vnode, epoch).
assignment_version is the VnodeRegistry::assignment_version
the writer observed when it started this write. Backends that
implement the split-brain fence compare it against their own
authoritative version and return
StateBackendError::StaleVersion if the writer is behind.
Backends that opt out of fencing accept any version.
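A minimal sketch of the split-brain fence check, assuming a hypothetical FencedStore that holds its authoritative version in an AtomicU64. FenceError::StaleVersion and its fields are illustrative stand-ins for StateBackendError::StaleVersion, whose real shape is not shown here; Vec<u8> replaces Bytes.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

// Hypothetical error type; only the StaleVersion name comes from the docs.
#[derive(Debug, PartialEq)]
enum FenceError {
    StaleVersion { writer: u64, authoritative: u64 },
}

// Hypothetical fencing backend.
struct FencedStore {
    authoritative: AtomicU64,
    partials: Mutex<HashMap<(u32, u64), Vec<u8>>>,
}

impl FencedStore {
    fn new(authoritative: u64) -> Self {
        FencedStore {
            authoritative: AtomicU64::new(authoritative),
            partials: Mutex::new(HashMap::new()),
        }
    }

    fn write_partial(
        &self,
        vnode: u32,
        epoch: u64,
        assignment_version: u64,
        bytes: Vec<u8>,
    ) -> Result<(), FenceError> {
        let authoritative = self.authoritative.load(Ordering::Acquire);
        // Reject writers whose observed assignment is older than the authoritative one.
        // An authoritative value of 0 rejects nothing, which is how "fence disabled" falls out.
        if assignment_version < authoritative {
            return Err(FenceError::StaleVersion {
                writer: assignment_version,
                authoritative,
            });
        }
        self.partials.lock().unwrap().insert((vnode, epoch), bytes);
        Ok(())
    }
}
```

Here the version check and the insert are separate steps, so a concurrent raise of the authoritative version can slip between them; a production backend would more likely enforce the fence inside the storage write itself, for example as a conditional write.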
fn read_partial<'life0, 'async_trait>(
    &'life0 self,
    vnode: u32,
    epoch: u64,
) -> Pin<Box<dyn Future<Output = Result<Option<Bytes>, StateBackendError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Read the partial aggregate for (vnode, epoch), if any.
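A hypothetical recovery helper showing how a caller might treat the Option: Some carries the persisted partial, and None means nothing was written for that exact (vnode, epoch). Store, restore_sum and the 8-byte little-endian sum encoding are assumptions for illustration only; the real aggregate encoding is not specified here.

```rust
use std::collections::HashMap;
use std::convert::TryInto;

// Hypothetical in-memory stand-in; Vec<u8> replaces Bytes.
struct Store {
    partials: HashMap<(u32, u64), Vec<u8>>,
}

impl Store {
    // Mirrors read_partial: Some(bytes) for an exact key match, None otherwise.
    fn read_partial(&self, vnode: u32, epoch: u64) -> Option<Vec<u8>> {
        self.partials.get(&(vnode, epoch)).cloned()
    }
}

// Decode a running u64 sum from the partial, or start from zero
// when no partial was persisted for that epoch.
fn restore_sum(store: &Store, vnode: u32, epoch: u64) -> u64 {
    match store.read_partial(vnode, epoch) {
        Some(bytes) => {
            let raw: [u8; 8] = bytes
                .as_slice()
                .try_into()
                .expect("partial should hold exactly 8 bytes");
            u64::from_le_bytes(raw)
        }
        None => 0,
    }
}
```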
fn epoch_complete<'life0, 'life1, 'async_trait>(
    &'life0 self,
    epoch: u64,
    vnodes: &'life1 [u32],
) -> Pin<Box<dyn Future<Output = Result<bool, StateBackendError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Durability barrier: returns true iff every vnode in the set
has a partial persisted for epoch.
The checkpoint coordinator calls this after sinks precommit
and before they commit. Sinks do not commit until this returns
Ok(true).
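The barrier and the coordinator step around it, sketched synchronously. Store, try_commit and the commits log are hypothetical; the store only tracks which (vnode, epoch) keys have a persisted partial, and pushing onto commits stands in for telling sinks to commit.

```rust
use std::collections::HashSet;

// Hypothetical stand-in that only records which (vnode, epoch) keys are persisted.
#[derive(Default)]
struct Store {
    persisted: HashSet<(u32, u64)>,
}

impl Store {
    fn write_partial(&mut self, vnode: u32, epoch: u64) {
        self.persisted.insert((vnode, epoch));
    }

    // Durability barrier: true iff every listed vnode has a partial for `epoch`.
    fn epoch_complete(&self, epoch: u64, vnodes: &[u32]) -> bool {
        vnodes.iter().all(|&vnode| self.persisted.contains(&(vnode, epoch)))
    }
}

// Runs after sinks have precommitted `epoch`; they commit only once the barrier holds.
fn try_commit(store: &Store, epoch: u64, vnodes: &[u32], commits: &mut Vec<u64>) -> bool {
    if store.epoch_complete(epoch, vnodes) {
        commits.push(epoch);
        true
    } else {
        // Not durable yet: sinks stay precommitted and the coordinator can retry.
        false
    }
}
```

Iterator::all returns true for an empty slice, so in this sketch epoch_complete(epoch, &[]) holds vacuously; a coordinator would want to pass the full vnode assignment rather than a possibly empty subset.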
fn prune_before<'life0, 'async_trait>(
    &'life0 self,
    before: u64,
) -> Pin<Box<dyn Future<Output = Result<(), StateBackendError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Garbage-collect every partial and commit marker whose epoch is
strictly less than before. Called by the checkpoint
coordinator after a successful checkpoint commit so the backend
does not retain state for epochs that can never be recovered.
This method is required; there is intentionally no default. A silent
no-op default would let an in-memory backend leak one Bytes per vnode
per checkpoint forever, and would leave an object-store backend's
epoch=N/… objects in place forever. Test backends that truly do not
accumulate state should implement it as Ok(()) explicitly so the
choice is visible.
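A minimal in-memory sketch of the pruning rule, assuming a hypothetical Store whose commit_markers set models the commit markers mentioned above and with Vec<u8> in place of Bytes. Every entry with an epoch strictly below before is dropped; entries for the epoch equal to before are kept.

```rust
use std::collections::{BTreeSet, HashMap};

// Hypothetical in-memory backend state.
#[derive(Default)]
struct Store {
    partials: HashMap<(u32, u64), Vec<u8>>,
    commit_markers: BTreeSet<u64>,
}

impl Store {
    // Drop every partial and marker with epoch < before; epoch == before survives.
    fn prune_before(&mut self, before: u64) {
        self.partials.retain(|&(_, epoch), _| epoch >= before);
        // split_off returns the elements >= before; that half becomes the new set.
        self.commit_markers = self.commit_markers.split_off(&before);
    }
}
```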
Provided Methods§
fn set_authoritative_version(&self, _version: u64)
Raise the backend’s authoritative assignment version, i.e. the
minimum VnodeRegistry::assignment_version it will accept on
write_partial. Hosts call this on boot after adopting an
AssignmentSnapshot and on each subsequent rotation, so that stale
writers from a deposed leader are fenced out.
The default is a no-op, so backends that opt out of fencing (e.g. the
in-process backend used for single-node deployments) inherit it
unchanged. On implementations that do fence, the version is
monotonic: a call with version <= current is a no-op.
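The monotonic rule maps directly onto AtomicU64::fetch_max, which stores the larger of the current and supplied values in one atomic step. Fence is a hypothetical holder for the version on a backend that does fence; the rest of the backend is omitted.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical version holder for a fencing backend; starts at 0 (fence disabled).
#[derive(Default)]
struct Fence {
    authoritative: AtomicU64,
}

impl Fence {
    // fetch_max leaves the value unchanged when `version <= current`,
    // so a late or duplicate call can never lower the fence.
    fn set_authoritative_version(&self, version: u64) {
        self.authoritative.fetch_max(version, Ordering::AcqRel);
    }

    fn authoritative_version(&self) -> u64 {
        self.authoritative.load(Ordering::Acquire)
    }
}
```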
fn authoritative_version(&self) -> u64
Current authoritative assignment version. 0 means the fence is
disabled: every caller version is accepted. Backends that do
not fence return 0 unconditionally.
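How a non-fencing backend could inherit both defaults, sketched with a hypothetical VersionFence trait that carries only the two provided methods. It assumes the default authoritative_version simply returns 0, as the text above implies; InProcess and accepts are likewise illustrative names.

```rust
// Hypothetical trait holding only the provided methods, with the described defaults.
trait VersionFence {
    // Default: no-op, for backends that opt out of fencing.
    fn set_authoritative_version(&self, _version: u64) {}

    // Default: 0, meaning the fence is disabled.
    fn authoritative_version(&self) -> u64 {
        0
    }
}

// Single-node backend that opts out by inheriting both defaults.
struct InProcess;

impl VersionFence for InProcess {}

// The acceptance rule a fencing backend applies on write_partial:
// the writer must have observed at least the authoritative version.
fn accepts(backend: &dyn VersionFence, writer_version: u64) -> bool {
    writer_version >= backend.authoritative_version()
}
```

With the fence at 0, the writer_version >= authoritative rule admits every u64, which is why 0 doubles as "disabled".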