Skip to main content

laminar_core/state/
config.rs

1//! [`StateBackendConfig`]: tagged enum selecting the runtime state
2//! backend. Three shapes: `in_process`, `local` (filesystem path),
3//! `object_store` (s3/gcs/file url).
4
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use serde::Deserialize;
9
10use super::{
11    backend::StateBackend, in_process::InProcessBackend, object_store::ObjectStoreBackend,
12};
13
/// Default number of vnodes if the user does not override.
pub const DEFAULT_VNODE_CAPACITY: u32 = 256;

/// Serde `default = "..."` hook for the `vnode_capacity` fields of
/// [`StateBackendConfig`]; yields [`DEFAULT_VNODE_CAPACITY`].
///
/// Serde's `default` attribute takes a function path rather than a
/// constant, hence this thin wrapper.
fn default_vnode_capacity() -> u32 {
    DEFAULT_VNODE_CAPACITY
}
20
/// Serde default for the `instance_id` of the `local` backend, also used
/// by the `StateBackendConfig::local` builder: a single-node deployment
/// identifies itself as `"local"` unless overridden.
fn default_instance_id() -> String {
    String::from("local")
}
24
/// How nodes discover one another in `object_store` mode.
///
/// Deserialized from the snake_case strings `"static"` / `"dynamic"`.
/// [`DiscoveryMode::Static`] is the `Default`, which is what the
/// `discovery` config field falls back to when omitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DiscoveryMode {
    /// Static vnode assignment, driven by the `vnodes` and (optionally)
    /// `merger_instance` fields of the `object_store` backend config.
    // NOTE(review): earlier wording called `vnodes` "required" here, but
    // the field is an `Option` with a serde default, so a missing list is
    // not rejected at parse time — confirm whether static mode validates
    // it elsewhere.
    #[default]
    Static,
    /// Dynamic membership — peers gossip via chitchat; vnode assignment
    /// is chosen by the coordination layer.
    Dynamic,
}
37
/// Cloud credential/config overrides for the state object store.
///
/// `#[serde(transparent)]` makes this deserialize directly from a
/// string-to-string table (e.g. a `[storage]` TOML section) with no
/// wrapper key.
///
/// `Debug` is implemented by hand and redacts values — they can hold
/// secrets (`aws_secret_access_key`, ...). Keys are still printed.
#[derive(Clone, PartialEq, Eq, Default, Deserialize)]
#[serde(transparent)]
pub struct StorageOptions(pub rustc_hash::FxHashMap<String, String>);
44
45impl std::fmt::Debug for StorageOptions {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_map()
48            .entries(self.0.keys().map(|k| (k, "[REDACTED]")))
49            .finish()
50    }
51}
52
/// Tagged-union config that selects the runtime [`StateBackend`].
///
/// Internally tagged by the `backend` key (`"in_process"`, `"local"`,
/// `"object_store"`); the remaining keys of the same table populate the
/// selected variant.
///
/// See module docs for the three deployment shapes.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(tag = "backend", rename_all = "snake_case")]
pub enum StateBackendConfig {
    /// Non-durable in-process backend. The default.
    InProcess {
        /// Number of vnodes the backend should size for.
        /// Defaults to [`DEFAULT_VNODE_CAPACITY`] when omitted.
        #[serde(default = "default_vnode_capacity")]
        vnode_capacity: u32,
    },

    /// Durable single-node backend on a local filesystem path. Shorthand
    /// for an `object_store` backend with a `file://` URL.
    Local {
        /// Filesystem root for state. Required.
        path: PathBuf,
        /// Node identity (written into epoch commit markers for audit).
        /// Defaults to `"local"` when omitted.
        #[serde(default = "default_instance_id")]
        instance_id: String,
        /// Number of vnodes the backend should size for.
        /// Defaults to [`DEFAULT_VNODE_CAPACITY`] when omitted.
        #[serde(default = "default_vnode_capacity")]
        vnode_capacity: u32,
    },

    /// Durable shared-state backend on S3 / GCS / Azure. Used by all
    /// distributed-embedded and cluster modes.
    ObjectStore {
        /// Object store URL: `s3://bucket/prefix`, `gs://bucket/prefix`,
        /// etc. Required.
        url: String,
        /// Cloud credentials/config overrides (e.g. `endpoint`,
        /// `aws_access_key_id`), same keys as `[checkpoint.storage]`.
        /// Anything absent falls back to the provider's standard env
        /// vars (`AWS_ACCESS_KEY_ID`, ...).
        #[serde(default)]
        storage: StorageOptions,
        /// This node's identity. Written into epoch manifests and used
        /// by the assignment-version fence to reject stale writes.
        /// Required: unlike `local`, there is no serde default here.
        instance_id: String,
        /// Number of vnodes the backend should size for.
        /// Defaults to [`DEFAULT_VNODE_CAPACITY`] when omitted.
        #[serde(default = "default_vnode_capacity")]
        vnode_capacity: u32,
        /// Static vnode subset for this instance. `None` means "all
        /// vnodes" (useful for the merger instance or for dynamic mode).
        #[serde(default)]
        vnodes: Option<Vec<u32>>,
        /// Optional merger instance — the node that fans in partials
        /// for sink emission. Only meaningful in static mode.
        #[serde(default)]
        merger_instance: Option<String>,
        /// Discovery strategy: static assignment or chitchat gossip.
        /// Defaults to [`DiscoveryMode::Static`] when omitted.
        #[serde(default)]
        discovery: DiscoveryMode,
        /// Seed peers for dynamic discovery. Empty when omitted.
        #[serde(default)]
        seed_peers: Vec<String>,
    },
}
113
114impl Default for StateBackendConfig {
115    fn default() -> Self {
116        Self::InProcess {
117            vnode_capacity: DEFAULT_VNODE_CAPACITY,
118        }
119    }
120}
121
/// Failure modes for [`StateBackendConfig::build`] and
/// [`StateBackendConfig::build_object_store`].
#[derive(Debug, thiserror::Error)]
pub enum StateBackendBuildError {
    /// Object store construction failed (bad URL, missing feature
    /// flag for the scheme, missing credentials, ...).
    ///
    /// Converted automatically from the builder's error via `?`
    /// (`#[from]`).
    #[error("state backend object store: {0}")]
    Store(#[from] crate::checkpoint::object_store_builder::ObjectStoreBuilderError),

    /// Backend construction failed at the I/O layer.
    ///
    /// Carries the underlying error rendered to a `String`, so the
    /// original error value is not retained as a `source()`.
    #[error("state backend construction failed: {0}")]
    Io(String),
}
134
135impl StateBackendConfig {
136    /// Builder: embedded library, single process.
137    #[must_use]
138    pub fn in_process() -> Self {
139        Self::InProcess {
140            vnode_capacity: DEFAULT_VNODE_CAPACITY,
141        }
142    }
143
144    /// Builder: single-node durable state on the local filesystem.
145    #[must_use]
146    pub fn local(path: impl Into<PathBuf>) -> Self {
147        Self::Local {
148            path: path.into(),
149            instance_id: default_instance_id(),
150            vnode_capacity: DEFAULT_VNODE_CAPACITY,
151        }
152    }
153
154    /// Builder: distributed-embedded over an object store, static mode.
155    /// Credentials resolve from the provider's standard env vars; use
156    /// the `storage` config field for explicit overrides.
157    #[must_use]
158    pub fn object_store(url: impl Into<String>, instance_id: impl Into<String>) -> Self {
159        Self::ObjectStore {
160            url: url.into(),
161            storage: StorageOptions::default(),
162            instance_id: instance_id.into(),
163            vnode_capacity: DEFAULT_VNODE_CAPACITY,
164            vnodes: None,
165            merger_instance: None,
166            discovery: DiscoveryMode::Static,
167            seed_peers: Vec::new(),
168        }
169    }
170
171    /// Instantiate the runtime backend.
172    ///
173    /// Declared `async` because backends added in later iterations
174    /// (object store, distributed) need to perform async setup. The
175    /// in-process path completes synchronously today; callers must
176    /// still `.await` for forward-compatibility.
177    ///
178    /// # Errors
179    /// - [`StateBackendBuildError::Store`] for a bad URL, a scheme
180    ///   whose feature flag (`aws`/`gcs`/`azure`) is not compiled in,
181    ///   or cloud-client construction failure.
182    /// - [`StateBackendBuildError::Io`] on filesystem setup.
183    #[allow(clippy::unused_async)]
184    pub async fn build(&self) -> Result<Arc<dyn StateBackend>, StateBackendBuildError> {
185        match self {
186            Self::InProcess { vnode_capacity } => {
187                Ok(Arc::new(InProcessBackend::new(*vnode_capacity)))
188            }
189            Self::Local {
190                path,
191                instance_id,
192                vnode_capacity,
193            } => {
194                std::fs::create_dir_all(path)
195                    .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
196                let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
197                    .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
198                Ok(Arc::new(ObjectStoreBackend::new(
199                    Arc::new(fs),
200                    instance_id,
201                    *vnode_capacity,
202                )))
203            }
204            Self::ObjectStore {
205                url,
206                storage,
207                instance_id,
208                vnode_capacity,
209                ..
210            } => {
211                let store = cloud_store(url, storage)?;
212                Ok(Arc::new(ObjectStoreBackend::new(
213                    store,
214                    instance_id,
215                    *vnode_capacity,
216                )))
217            }
218        }
219    }
220
221    /// Filesystem path for durable state, if any. Returns `None` for
222    /// non-filesystem backends.
223    #[must_use]
224    pub fn local_storage_dir(&self) -> Option<&std::path::Path> {
225        match self {
226            Self::Local { path, .. } => Some(path.as_path()),
227            _ => None,
228        }
229    }
230
231    /// Build the underlying `object_store` handle (if any) so callers
232    /// that need to share the same store — e.g. an
233    /// `AssignmentSnapshotStore` alongside the state backend — can
234    /// avoid re-parsing the URL. `None` for `InProcess`.
235    ///
236    /// # Errors
237    /// Same failure modes as [`Self::build`].
238    pub fn build_object_store(
239        &self,
240    ) -> Result<Option<Arc<dyn ::object_store::ObjectStore>>, StateBackendBuildError> {
241        match self {
242            Self::InProcess { .. } => Ok(None),
243            Self::Local { path, .. } => {
244                std::fs::create_dir_all(path)
245                    .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
246                let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
247                    .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
248                Ok(Some(Arc::new(fs)))
249            }
250            Self::ObjectStore { url, storage, .. } => Ok(Some(cloud_store(url, storage)?)),
251        }
252    }
253
254    /// Returns true if this backend persists state across process
255    /// restarts.
256    #[must_use]
257    pub fn is_durable(&self) -> bool {
258        !matches!(self, Self::InProcess { .. })
259    }
260
261    /// Number of vnodes this backend is sized for.
262    #[must_use]
263    pub fn vnode_capacity(&self) -> u32 {
264        match self {
265            Self::InProcess { vnode_capacity }
266            | Self::Local { vnode_capacity, .. }
267            | Self::ObjectStore { vnode_capacity, .. } => *vnode_capacity,
268        }
269    }
270}
271
272/// Cloud-store construction shared by [`StateBackendConfig::build`] and
273/// [`StateBackendConfig::build_object_store`]: translates the
274/// `StorageOptions` map into the builder's std-HashMap parameter
275/// (cold path, runs once at startup).
276fn cloud_store(
277    url: &str,
278    storage: &StorageOptions,
279) -> Result<Arc<dyn ::object_store::ObjectStore>, StateBackendBuildError> {
280    Ok(crate::checkpoint::object_store_builder::build_object_store(
281        url,
282        &storage
283            .0
284            .iter()
285            .map(|(k, v)| (k.clone(), v.clone()))
286            .collect(),
287    )?)
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare `backend = "in_process"` picks up the default capacity.
    #[test]
    fn parse_in_process_minimal() {
        let toml = r#"backend = "in_process""#;
        let c: StateBackendConfig = toml::from_str(toml).unwrap();
        assert!(matches!(
            c,
            StateBackendConfig::InProcess {
                vnode_capacity: 256
            }
        ));
        assert!(!c.is_durable());
        assert!(c.local_storage_dir().is_none());
    }

    /// `local` exposes its path and honours an explicit capacity.
    #[test]
    fn parse_local_with_path() {
        let toml = r#"
backend = "local"
path = "/var/laminar"
vnode_capacity = 128
"#;
        let c: StateBackendConfig = toml::from_str(toml).unwrap();
        assert_eq!(
            c.local_storage_dir(),
            Some(std::path::Path::new("/var/laminar"))
        );
        assert!(c.is_durable());
        if let StateBackendConfig::Local { vnode_capacity, .. } = c {
            assert_eq!(vnode_capacity, 128);
        } else {
            panic!("expected Local");
        }
    }

    /// Omitted `local` fields match what the `local()` builder produces
    /// (instance id `"local"`, default capacity).
    #[test]
    fn parse_local_defaults_match_builder() {
        let toml = r#"
backend = "local"
path = "/var/laminar"
"#;
        let c: StateBackendConfig = toml::from_str(toml).unwrap();
        assert_eq!(c, StateBackendConfig::local("/var/laminar"));
    }

    #[test]
    fn parse_object_store_static() {
        let toml = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
vnodes = [0, 1, 2, 3]
merger_instance = "node-0"
"#;
        let c: StateBackendConfig = toml::from_str(toml).unwrap();
        match c {
            StateBackendConfig::ObjectStore {
                url,
                instance_id,
                vnodes,
                merger_instance,
                discovery,
                ..
            } => {
                assert_eq!(url, "s3://bucket/laminar");
                assert_eq!(instance_id, "node-0");
                assert_eq!(vnodes, Some(vec![0, 1, 2, 3]));
                assert_eq!(merger_instance.as_deref(), Some("node-0"));
                assert_eq!(discovery, DiscoveryMode::Static);
            }
            _ => panic!("expected ObjectStore"),
        }
    }

    #[test]
    fn parse_object_store_dynamic() {
        let toml = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
discovery = "dynamic"
seed_peers = ["10.0.0.1:7946", "10.0.0.2:7946"]
"#;
        let c: StateBackendConfig = toml::from_str(toml).unwrap();
        match c {
            StateBackendConfig::ObjectStore {
                discovery,
                seed_peers,
                ..
            } => {
                assert_eq!(discovery, DiscoveryMode::Dynamic);
                assert_eq!(seed_peers.len(), 2);
            }
            _ => panic!("expected ObjectStore dynamic"),
        }
    }

    /// `instance_id` has no serde default on `object_store`, so leaving
    /// it out must be a parse error rather than a silent fallback.
    #[test]
    fn parse_object_store_requires_instance_id() {
        let toml = r#"
backend = "object_store"
url = "s3://bucket/laminar"
"#;
        assert!(toml::from_str::<StateBackendConfig>(toml).is_err());
    }

    /// An unrecognised `backend` tag is rejected.
    #[test]
    fn parse_unknown_backend_is_rejected() {
        let toml = r#"backend = "rocksdb""#;
        assert!(toml::from_str::<StateBackendConfig>(toml).is_err());
    }

    /// Secrets in `[storage]` must never show up in `Debug` output,
    /// including when formatted through the enclosing config.
    #[test]
    fn debug_redacts_storage_values() {
        let toml = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"

[storage]
aws_secret_access_key = "hunter2"
"#;
        let c: StateBackendConfig = toml::from_str(toml).unwrap();
        let rendered = format!("{c:?}");
        assert!(rendered.contains("aws_secret_access_key"), "got: {rendered}");
        assert!(rendered.contains("[REDACTED]"), "got: {rendered}");
        assert!(!rendered.contains("hunter2"), "got: {rendered}");
    }

    #[tokio::test]
    async fn build_in_process_returns_backend() {
        use bytes::Bytes;
        let c = StateBackendConfig::in_process();
        let backend = c.build().await.unwrap();
        backend
            .write_partial(0, 1, 0, Bytes::from_static(b"ok"))
            .await
            .unwrap();
        assert_eq!(
            &backend.read_partial(0, 1).await.unwrap().unwrap()[..],
            b"ok",
        );
    }

    #[tokio::test]
    async fn build_local_instantiates_backend() {
        let dir = tempfile::tempdir().unwrap();
        let c = StateBackendConfig::local(dir.path());
        let backend = c.build().await.unwrap();
        backend
            .write_partial(0, 1, 0, bytes::Bytes::from_static(b"z"))
            .await
            .unwrap();
        assert_eq!(
            &backend.read_partial(0, 1).await.unwrap().unwrap()[..],
            b"z",
        );
    }

    #[tokio::test]
    async fn build_object_store_file_url_instantiates_backend() {
        let dir = tempfile::tempdir().unwrap();
        let url = format!(
            "file://{}",
            dir.path().display().to_string().replace('\\', "/")
        );
        let c = StateBackendConfig::object_store(url, "node-0");
        let backend = c.build().await.unwrap();
        backend
            .write_partial(0, 1, 0, bytes::Bytes::from_static(b"z"))
            .await
            .unwrap();
        let got = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&got[..], b"z");
    }

    /// Without the `aws` feature an `s3://` URL must fail with the
    /// missing-feature error, not silently fall back to local.
    #[cfg(not(feature = "aws"))]
    #[tokio::test]
    async fn build_object_store_s3_requires_aws_feature() {
        use crate::checkpoint::object_store_builder::ObjectStoreBuilderError;

        let c = StateBackendConfig::object_store("s3://bucket/path", "node-0");
        let err = match c.build().await {
            Ok(_) => panic!("s3 must not build without the aws feature"),
            Err(e) => e,
        };
        assert!(
            matches!(
                err,
                StateBackendBuildError::Store(ObjectStoreBuilderError::MissingFeature { .. })
            ),
            "got: {err}",
        );
    }

    /// With the `aws` feature, an `s3://` URL + explicit `storage`
    /// credentials builds a client (construction is offline — no
    /// network until first use).
    #[cfg(feature = "aws")]
    #[tokio::test]
    async fn build_object_store_s3_builds_with_storage_options() {
        let toml = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"

[storage]
endpoint = "http://127.0.0.1:9000"
aws_access_key_id = "k"
aws_secret_access_key = "s"
region = "us-east-1"
allow_http = "true"
"#;
        let c: StateBackendConfig = toml::from_str(toml).unwrap();
        c.build().await.expect("s3 client must build offline");
    }

    /// `build_object_store` yields no handle for `in_process`, and for
    /// `local` it creates the (possibly nested) directory and returns one.
    #[test]
    fn build_object_store_handle_per_backend() {
        let none = StateBackendConfig::in_process()
            .build_object_store()
            .unwrap();
        assert!(none.is_none());

        let dir = tempfile::tempdir().unwrap();
        let nested = dir.path().join("state").join("nested");
        let some = StateBackendConfig::local(nested.clone())
            .build_object_store()
            .unwrap();
        assert!(some.is_some());
        assert!(nested.is_dir());
    }

    /// The capacity and durability accessors agree across all variants.
    #[test]
    fn accessors_cover_all_variants() {
        let in_process = StateBackendConfig::in_process();
        let local = StateBackendConfig::local("/tmp/x");
        let remote = StateBackendConfig::object_store("s3://bucket/path", "node-0");

        for c in [&in_process, &local, &remote] {
            assert_eq!(c.vnode_capacity(), DEFAULT_VNODE_CAPACITY);
        }
        assert!(!in_process.is_durable());
        assert!(local.is_durable());
        assert!(remote.is_durable());
        // Only `local` reports a filesystem directory.
        assert!(remote.local_storage_dir().is_none());

        let sized: StateBackendConfig =
            toml::from_str("backend = \"in_process\"\nvnode_capacity = 64").unwrap();
        assert_eq!(sized.vnode_capacity(), 64);
    }

    #[test]
    fn default_is_in_process() {
        let c = StateBackendConfig::default();
        assert!(matches!(c, StateBackendConfig::InProcess { .. }));
        assert_eq!(c, StateBackendConfig::in_process());
    }

    #[test]
    fn partial_eq_works() {
        assert_eq!(
            StateBackendConfig::in_process(),
            StateBackendConfig::in_process()
        );
        assert_ne!(
            StateBackendConfig::in_process(),
            StateBackendConfig::local("/tmp/x")
        );
    }
}