// laminar_core/state/backend.rs

//! The `StateBackend` trait: the single contract between streaming
//! operators and the storage tier. Backends persist per-(vnode, epoch)
//! partial-state blobs and expose an `epoch_complete` durability gate.
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8/// Errors a [`StateBackend`] can raise.
9#[derive(Debug, thiserror::Error)]
10pub enum StateBackendError {
11    /// Underlying I/O failure (filesystem, `object_store`, network).
12    #[error("I/O error: {0}")]
13    Io(String),
14
15    /// Serialization or framing error in the stored partial bytes.
16    #[error("serialization error: {0}")]
17    Serialization(String),
18
19    /// The partial for `(vnode, epoch)` is not present.
20    #[error("not found: vnode={vnode} epoch={epoch}")]
21    NotFound {
22        /// Virtual node ID.
23        vnode: u32,
24        /// Epoch number.
25        epoch: u64,
26    },
27
28    /// The caller's assignment version is older than the backend's
29    /// authoritative version. Thrown by [`StateBackend::write_partial`]
30    /// when a stale writer (e.g. the losing side of a split-brain)
31    /// attempts to persist state at a version that has since been
32    /// superseded. The caller should abandon the write, refresh its
33    /// assignment snapshot, and retry at the new version.
34    #[error("stale assignment version: caller={caller} < authoritative={authoritative}")]
35    StaleVersion {
36        /// Version the writer believes is current.
37        caller: u64,
38        /// Authoritative version seen by the backend.
39        authoritative: u64,
40    },
41
42    /// Raised by [`StateBackend::epoch_complete`] when the caller
43    /// observes an existing commit marker whose audit body names a
44    /// different instance. Two leaders raced on the same epoch and
45    /// the other one won. The caller (losing leader) must NOT proceed
46    /// with its own sink-commit phase — its view of the state is not
47    /// the one durably sealed.
48    #[error(
49        "split-brain commit detected: epoch already committed by {committer:?}, we are {self_id:?}"
50    )]
51    SplitBrainCommit {
52        /// Instance id recorded in the existing commit marker.
53        committer: String,
54        /// This backend's own instance id.
55        self_id: String,
56    },
57}
58
59/// A pluggable state store used by streaming operators for partial
60/// aggregates and watermarks.
61///
62/// ## Object Safety
63///
64/// The trait is deliberately object-safe:
65///
66/// - No generic methods.
67/// - No `Self`-returning methods.
68/// - All async methods are `async_trait` boxed futures.
69///
70/// This lets the engine hold `Arc<dyn StateBackend>` and swap
71/// implementations at construction time without touching call sites.
72///
73/// ## Concurrency
74///
75/// Implementations must be `Send + Sync + 'static`. The engine expects
76/// to share a single backend across many worker tasks concurrently.
77///
78/// ## Idempotence
79///
80/// [`write_partial`](Self::write_partial) must be idempotent for a
81/// given `(vnode, epoch)` pair — recovery may replay the same write.
82/// Overwriting the same bytes is acceptable; conflicting bytes for the
83/// same key indicate a bug upstream.
84#[async_trait]
85pub trait StateBackend: Send + Sync + 'static {
86    /// Persist a partial aggregate for `(vnode, epoch)`.
87    ///
88    /// `assignment_version` is the [`VnodeRegistry::assignment_version`]
89    /// the writer observed when it started this write. Backends that
90    /// implement the split-brain fence compare it against their own
91    /// authoritative version and return
92    /// [`StateBackendError::StaleVersion`] if the writer is behind.
93    /// Backends that opt out of fencing accept any version.
94    ///
95    /// [`VnodeRegistry::assignment_version`]: crate::state::VnodeRegistry::assignment_version
96    async fn write_partial(
97        &self,
98        vnode: u32,
99        epoch: u64,
100        assignment_version: u64,
101        bytes: Bytes,
102    ) -> Result<(), StateBackendError>;
103
104    /// Read the partial aggregate for `(vnode, epoch)`, if any.
105    async fn read_partial(
106        &self,
107        vnode: u32,
108        epoch: u64,
109    ) -> Result<Option<Bytes>, StateBackendError>;
110
111    /// Durability barrier: returns true iff every `vnode` in the set
112    /// has a partial persisted for `epoch`.
113    ///
114    /// The checkpoint coordinator calls this after sinks precommit
115    /// and before they commit. Sinks do not commit until this returns
116    /// `Ok(true)`.
117    async fn epoch_complete(&self, epoch: u64, vnodes: &[u32]) -> Result<bool, StateBackendError>;
118
119    /// Garbage-collect every partial and commit marker whose epoch is
120    /// strictly less than `before`. Called by the checkpoint
121    /// coordinator after a successful checkpoint commit so the backend
122    /// does not retain state for epochs that can never be recovered.
123    ///
124    /// Required — there is intentionally no default. Without it an
125    /// in-memory backend leaks a `Bytes` per vnode per checkpoint
126    /// forever, and an object-store backend leaves `epoch=N/…` objects
127    /// forever. Test backends that truly do not accumulate state should
128    /// implement `Ok(())` explicitly so the choice is visible.
129    async fn prune_before(&self, before: u64) -> Result<(), StateBackendError>;
130
131    /// Raise the backend's authoritative assignment version — the
132    /// minimum [`VnodeRegistry::assignment_version`] it will accept on
133    /// [`write_partial`](Self::write_partial). Hosts call this on boot
134    /// after adopting an `AssignmentSnapshot` and on each subsequent
135    /// rotation so stale writers from a deposed leader are fenced out.
136    ///
137    /// Default is a no-op — backends that opt out of fencing (e.g. the
138    /// in-process backend used for single-node deployments) inherit it
139    /// unchanged. Monotonic on implementations that do fence: a call
140    /// with `version <= current` is a no-op.
141    ///
142    /// [`VnodeRegistry::assignment_version`]: crate::state::VnodeRegistry::assignment_version
143    fn set_authoritative_version(&self, _version: u64) {}
144
145    /// Current authoritative assignment version. `0` means the fence is
146    /// disabled — every caller version is accepted. Backends that do
147    /// not fence return `0` unconditionally.
148    fn authoritative_version(&self) -> u64 {
149        0
150    }
151}
152
153const _: Option<&dyn StateBackend> = None;