Trait StateBackend 

pub trait StateBackend:
    Send
    + Sync
    + 'static {
    // Required methods
    fn write_partial<'life0, 'async_trait>(
        &'life0 self,
        vnode: u32,
        epoch: u64,
        assignment_version: u64,
        bytes: Bytes,
    ) -> Pin<Box<dyn Future<Output = Result<(), StateBackendError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn read_partial<'life0, 'async_trait>(
        &'life0 self,
        vnode: u32,
        epoch: u64,
    ) -> Pin<Box<dyn Future<Output = Result<Option<Bytes>, StateBackendError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn epoch_complete<'life0, 'life1, 'async_trait>(
        &'life0 self,
        epoch: u64,
        vnodes: &'life1 [u32],
    ) -> Pin<Box<dyn Future<Output = Result<bool, StateBackendError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait;
    fn prune_before<'life0, 'async_trait>(
        &'life0 self,
        before: u64,
    ) -> Pin<Box<dyn Future<Output = Result<(), StateBackendError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided methods
    fn set_authoritative_version(&self, _version: u64) { ... }
    fn authoritative_version(&self) -> u64 { ... }
}

A pluggable state store used by streaming operators for partial aggregates and watermarks.

§Object Safety

The trait is deliberately object-safe:

  • No generic methods.
  • No Self-returning methods.
  • All async methods are async_trait boxed futures.

This lets the engine hold Arc<dyn StateBackend> and swap implementations at construction time without touching call sites.
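The following is a minimal, std-only sketch of why this shape stays object-safe. `MiniBackend`, `EmptyBackend`, `FixedBackend`, `make_backend`, and `poll_once` are hypothetical names used only for illustration, `Vec<u8>` stands in for `Bytes`, and the boxed-future signature is written by hand in roughly the form `async_trait` expands to.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed, pinned future written by hand (assumption: mirrors the async_trait expansion).
pub type BoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical cut-down stand-in for `StateBackend`: no generic methods,
/// no `Self`-returning methods, async work returned as a boxed future.
pub trait MiniBackend: Send + Sync + 'static {
    fn read_partial<'a>(&'a self, vnode: u32, epoch: u64) -> BoxFut<'a, Option<Vec<u8>>>;
}

/// Backend that never holds any data.
pub struct EmptyBackend;

impl MiniBackend for EmptyBackend {
    fn read_partial<'a>(&'a self, _vnode: u32, _epoch: u64) -> BoxFut<'a, Option<Vec<u8>>> {
        Box::pin(async { None::<Vec<u8>> })
    }
}

/// Backend that returns the same payload for every key.
pub struct FixedBackend(pub Vec<u8>);

impl MiniBackend for FixedBackend {
    fn read_partial<'a>(&'a self, _vnode: u32, _epoch: u64) -> BoxFut<'a, Option<Vec<u8>>> {
        Box::pin(async move { Some(self.0.clone()) })
    }
}

/// The concrete implementation is chosen once, at construction time;
/// callers only ever see `Arc<dyn MiniBackend>`.
pub fn make_backend(fixed: Option<Vec<u8>>) -> Arc<dyn MiniBackend> {
    if let Some(bytes) = fixed {
        return Arc::new(FixedBackend(bytes));
    }
    Arc::new(EmptyBackend)
}

/// Waker that does nothing, so the sketch needs no async runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once; returns `None` if it is not ready yet.
pub fn poll_once<T>(mut fut: BoxFut<'_, T>) -> Option<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

A call site such as `poll_once(backend.read_partial(0, 1))` is identical for both implementations, which is the property the object-safety rules above are meant to preserve.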

§Concurrency

Implementations must be Send + Sync + 'static. The engine expects to share a single backend across many worker tasks concurrently.

§Idempotence

write_partial must be idempotent for a given (vnode, epoch) pair, because recovery may replay the same write. Rewriting identical bytes is acceptable; conflicting bytes for the same (vnode, epoch) key indicate an upstream bug.
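One way an in-memory backend might honor this contract is sketched below as a simplified synchronous model. `PartialStore` and `WriteError::ConflictingBytes` are hypothetical, `Vec<u8>` stands in for `Bytes`, and rejecting conflicting bytes is an assumption: the contract only says a conflict indicates a bug, not how a backend must react to one.

```rust
use std::collections::HashMap;

/// Hypothetical error type for this sketch; not the real `StateBackendError`.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteError {
    ConflictingBytes { vnode: u32, epoch: u64 },
}

/// Hypothetical in-memory store keyed by (vnode, epoch).
#[derive(Default)]
pub struct PartialStore {
    partials: HashMap<(u32, u64), Vec<u8>>,
}

impl PartialStore {
    /// Idempotent write: replaying identical bytes succeeds and changes nothing.
    pub fn write_partial(&mut self, vnode: u32, epoch: u64, bytes: Vec<u8>) -> Result<(), WriteError> {
        let key = (vnode, epoch);
        if let Some(existing) = self.partials.get(&key) {
            return if *existing == bytes {
                Ok(()) // replayed write, already persisted
            } else {
                // Assumption: surface the upstream bug instead of silently overwriting.
                Err(WriteError::ConflictingBytes { vnode, epoch })
            };
        }
        self.partials.insert(key, bytes);
        Ok(())
    }

    /// Number of stored partials.
    pub fn len(&self) -> usize {
        self.partials.len()
    }
}
```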

Required Methods§

fn write_partial<'life0, 'async_trait>( &'life0 self, vnode: u32, epoch: u64, assignment_version: u64, bytes: Bytes, ) -> Pin<Box<dyn Future<Output = Result<(), StateBackendError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Persist a partial aggregate for (vnode, epoch).

assignment_version is the VnodeRegistry::assignment_version the writer observed when it started this write. Backends that implement the split-brain fence compare it against their own authoritative version and return StateBackendError::StaleVersion if the writer is behind. Backends that opt out of fencing accept any version.
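The fence comparison can be sketched as a pure function. `check_fence` and `FenceError` are hypothetical; the `StaleVersion` name follows the variant mentioned above, but its fields are an assumption. The sketch treats an authoritative version of 0 as "fence disabled", matching the description of authoritative_version on this page.

```rust
/// Hypothetical error type; only the variant name comes from the documentation,
/// the fields are assumptions made for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum FenceError {
    StaleVersion { writer: u64, authoritative: u64 },
}

/// Rejects a write whose observed assignment version is behind the backend's
/// authoritative version. An authoritative version of 0 disables the fence.
pub fn check_fence(authoritative: u64, assignment_version: u64) -> Result<(), FenceError> {
    if authoritative != 0 && assignment_version < authoritative {
        return Err(FenceError::StaleVersion {
            writer: assignment_version,
            authoritative,
        });
    }
    Ok(())
}
```

A fencing backend would presumably run such a check at the start of write_partial, before persisting any bytes.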

fn read_partial<'life0, 'async_trait>( &'life0 self, vnode: u32, epoch: u64, ) -> Pin<Box<dyn Future<Output = Result<Option<Bytes>, StateBackendError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Read the partial aggregate for (vnode, epoch), if any.

fn epoch_complete<'life0, 'life1, 'async_trait>( &'life0 self, epoch: u64, vnodes: &'life1 [u32], ) -> Pin<Box<dyn Future<Output = Result<bool, StateBackendError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Durability barrier: returns true iff every vnode in vnodes has a partial persisted for epoch.

The checkpoint coordinator calls this after sinks precommit and before they commit. Sinks do not commit until this returns Ok(true).
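The completeness check itself reduces to an "all keys present" test. Below is a minimal synchronous sketch, assuming a hypothetical backend that tracks persisted (vnode, epoch) keys in a `HashSet`; the free function `epoch_complete` is illustrative and not the trait method.

```rust
use std::collections::HashSet;

/// Returns true only if every vnode in `vnodes` has a persisted partial for `epoch`.
/// `persisted` is a hypothetical index of (vnode, epoch) keys the backend has durably stored.
pub fn epoch_complete(persisted: &HashSet<(u32, u64)>, epoch: u64, vnodes: &[u32]) -> bool {
    vnodes.iter().all(|&vnode| persisted.contains(&(vnode, epoch)))
}
```

In this sketch an empty `vnodes` slice yields true, because `all` is vacuously true on an empty iterator. Whether a real backend should treat that case as complete is not stated here.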

fn prune_before<'life0, 'async_trait>( &'life0 self, before: u64, ) -> Pin<Box<dyn Future<Output = Result<(), StateBackendError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Garbage-collect every partial and commit marker whose epoch is strictly less than before. Called by the checkpoint coordinator after a successful checkpoint commit so the backend does not retain state for epochs that recovery will never roll back to.

Required: there is intentionally no default. Without it, an in-memory backend leaks a Bytes per vnode per checkpoint forever, and an object-store backend leaves epoch=N/… objects forever. Test backends that truly do not accumulate state should implement it as an explicit Ok(()) so the choice is visible.
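For an in-memory backend, pruning can be as small as two `retain` calls. The sketch below is synchronous and uses a hypothetical `MemState` type with assumed field names; `Vec<u8>` stands in for `Bytes`.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory state: partials keyed by (vnode, epoch),
/// plus one commit marker per committed epoch.
#[derive(Default)]
pub struct MemState {
    pub partials: HashMap<(u32, u64), Vec<u8>>,
    pub commit_markers: HashSet<u64>,
}

impl MemState {
    /// Drops everything whose epoch is strictly less than `before`;
    /// entries at exactly `before` are kept.
    pub fn prune_before(&mut self, before: u64) {
        self.partials.retain(|&(_vnode, epoch), _| epoch >= before);
        self.commit_markers.retain(|&epoch| epoch >= before);
    }
}
```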

Provided Methods§

fn set_authoritative_version(&self, _version: u64)

Raise the backend’s authoritative assignment version — the minimum VnodeRegistry::assignment_version it will accept on write_partial. Hosts call this on boot after adopting an AssignmentSnapshot and on each subsequent rotation so stale writers from a deposed leader are fenced out.

Default is a no-op, so backends that opt out of fencing (e.g. the in-process backend used for single-node deployments) inherit it unchanged. On implementations that do fence, the version is monotonic: a call with version <= current is a no-op.

fn authoritative_version(&self) -> u64

Current authoritative assignment version. 0 means the fence is disabled — every caller version is accepted. Backends that do not fence return 0 unconditionally.
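A fencing backend could keep the authoritative version in an atomic and raise it with `fetch_max`, which gives the monotonic behavior described for set_authoritative_version without a lock. `VersionFence` is a hypothetical type and the memory orderings are an assumption for this sketch.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical holder for the authoritative assignment version.
/// Starts at 0, which means the fence is disabled.
#[derive(Default)]
pub struct VersionFence {
    current: AtomicU64,
}

impl VersionFence {
    /// Raises the version; a call with `version <= current` leaves it unchanged
    /// because `fetch_max` stores the larger of the two values.
    pub fn set_authoritative_version(&self, version: u64) {
        self.current.fetch_max(version, Ordering::AcqRel);
    }

    /// Current authoritative version; 0 means every caller version is accepted.
    pub fn authoritative_version(&self) -> u64 {
        self.current.load(Ordering::Acquire)
    }
}
```

Both methods take `&self`, so the fence can be shared across worker tasks behind the same `Arc` as the backend itself.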

Implementors§