laminar_core/state/backend.rs
1//! The `StateBackend` trait: the single contract between streaming
2//! operators and the storage tier. Backends persist per-(vnode, epoch)
3//! partial-state blobs and expose an `epoch_complete` durability gate.
4
5use async_trait::async_trait;
6use bytes::Bytes;
7
8/// Errors a [`StateBackend`] can raise.
9#[derive(Debug, thiserror::Error)]
10pub enum StateBackendError {
11 /// Underlying I/O failure (filesystem, `object_store`, network).
12 #[error("I/O error: {0}")]
13 Io(String),
14
15 /// Serialization or framing error in the stored partial bytes.
16 #[error("serialization error: {0}")]
17 Serialization(String),
18
19 /// The partial for `(vnode, epoch)` is not present.
20 #[error("not found: vnode={vnode} epoch={epoch}")]
21 NotFound {
22 /// Virtual node ID.
23 vnode: u32,
24 /// Epoch number.
25 epoch: u64,
26 },
27
28 /// The caller's assignment version is older than the backend's
29 /// authoritative version. Thrown by [`StateBackend::write_partial`]
30 /// when a stale writer (e.g. the losing side of a split-brain)
31 /// attempts to persist state at a version that has since been
32 /// superseded. The caller should abandon the write, refresh its
33 /// assignment snapshot, and retry at the new version.
34 #[error("stale assignment version: caller={caller} < authoritative={authoritative}")]
35 StaleVersion {
36 /// Version the writer believes is current.
37 caller: u64,
38 /// Authoritative version seen by the backend.
39 authoritative: u64,
40 },
41
42 /// Raised by [`StateBackend::epoch_complete`] when the caller
43 /// observes an existing commit marker whose audit body names a
44 /// different instance. Two leaders raced on the same epoch and
45 /// the other one won. The caller (losing leader) must NOT proceed
46 /// with its own sink-commit phase — its view of the state is not
47 /// the one durably sealed.
48 #[error(
49 "split-brain commit detected: epoch already committed by {committer:?}, we are {self_id:?}"
50 )]
51 SplitBrainCommit {
52 /// Instance id recorded in the existing commit marker.
53 committer: String,
54 /// This backend's own instance id.
55 self_id: String,
56 },
57}
58
59/// A pluggable state store used by streaming operators for partial
60/// aggregates and watermarks.
61///
62/// ## Object Safety
63///
64/// The trait is deliberately object-safe:
65///
66/// - No generic methods.
67/// - No `Self`-returning methods.
68/// - All async methods are `async_trait` boxed futures.
69///
70/// This lets the engine hold `Arc<dyn StateBackend>` and swap
71/// implementations at construction time without touching call sites.
72///
73/// ## Concurrency
74///
75/// Implementations must be `Send + Sync + 'static`. The engine expects
76/// to share a single backend across many worker tasks concurrently.
77///
78/// ## Idempotence
79///
80/// [`write_partial`](Self::write_partial) must be idempotent for a
81/// given `(vnode, epoch)` pair — recovery may replay the same write.
82/// Overwriting the same bytes is acceptable; conflicting bytes for the
83/// same key indicate a bug upstream.
84#[async_trait]
85pub trait StateBackend: Send + Sync + 'static {
86 /// Persist a partial aggregate for `(vnode, epoch)`.
87 ///
88 /// `assignment_version` is the [`VnodeRegistry::assignment_version`]
89 /// the writer observed when it started this write. Backends that
90 /// implement the split-brain fence compare it against their own
91 /// authoritative version and return
92 /// [`StateBackendError::StaleVersion`] if the writer is behind.
93 /// Backends that opt out of fencing accept any version.
94 ///
95 /// [`VnodeRegistry::assignment_version`]: crate::state::VnodeRegistry::assignment_version
96 async fn write_partial(
97 &self,
98 vnode: u32,
99 epoch: u64,
100 assignment_version: u64,
101 bytes: Bytes,
102 ) -> Result<(), StateBackendError>;
103
104 /// Read the partial aggregate for `(vnode, epoch)`, if any.
105 async fn read_partial(
106 &self,
107 vnode: u32,
108 epoch: u64,
109 ) -> Result<Option<Bytes>, StateBackendError>;
110
111 /// Durability barrier: returns true iff every `vnode` in the set
112 /// has a partial persisted for `epoch`.
113 ///
114 /// The checkpoint coordinator calls this after sinks precommit
115 /// and before they commit. Sinks do not commit until this returns
116 /// `Ok(true)`.
117 async fn epoch_complete(&self, epoch: u64, vnodes: &[u32]) -> Result<bool, StateBackendError>;
118
119 /// Garbage-collect every partial and commit marker whose epoch is
120 /// strictly less than `before`. Called by the checkpoint
121 /// coordinator after a successful checkpoint commit so the backend
122 /// does not retain state for epochs that can never be recovered.
123 ///
124 /// Required — there is intentionally no default. Without it an
125 /// in-memory backend leaks a `Bytes` per vnode per checkpoint
126 /// forever, and an object-store backend leaves `epoch=N/…` objects
127 /// forever. Test backends that truly do not accumulate state should
128 /// implement `Ok(())` explicitly so the choice is visible.
129 async fn prune_before(&self, before: u64) -> Result<(), StateBackendError>;
130
131 /// Raise the backend's authoritative assignment version — the
132 /// minimum [`VnodeRegistry::assignment_version`] it will accept on
133 /// [`write_partial`](Self::write_partial). Hosts call this on boot
134 /// after adopting an `AssignmentSnapshot` and on each subsequent
135 /// rotation so stale writers from a deposed leader are fenced out.
136 ///
137 /// Default is a no-op — backends that opt out of fencing (e.g. the
138 /// in-process backend used for single-node deployments) inherit it
139 /// unchanged. Monotonic on implementations that do fence: a call
140 /// with `version <= current` is a no-op.
141 ///
142 /// [`VnodeRegistry::assignment_version`]: crate::state::VnodeRegistry::assignment_version
143 fn set_authoritative_version(&self, _version: u64) {}
144
145 /// Current authoritative assignment version. `0` means the fence is
146 /// disabled — every caller version is accepted. Backends that do
147 /// not fence return `0` unconditionally.
148 fn authoritative_version(&self) -> u64 {
149 0
150 }
151}
152
153const _: Option<&dyn StateBackend> = None;