Skip to main content

laminar_core/state/
config.rs

//! [`StateBackendConfig`]: tagged enum selecting the runtime state
//! backend. Three shapes: `in_process`, `local` (filesystem path),
//! `object_store` (s3/gcs/file url).
4
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use serde::Deserialize;
9
10use super::{
11    backend::StateBackend, in_process::InProcessBackend, object_store::ObjectStoreBackend,
12};
13
/// Vnode count used whenever a configuration does not set
/// `vnode_capacity` explicitly.
pub const DEFAULT_VNODE_CAPACITY: u32 = 256;
16
17fn default_vnode_capacity() -> u32 {
18    DEFAULT_VNODE_CAPACITY
19}
20
/// Serde `default = "..."` hook for `instance_id`; yields the literal
/// `"local"`.
fn default_instance_id() -> String {
    String::from("local")
}
24
25/// How nodes discover one another in `object_store` mode.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub enum DiscoveryMode {
29    /// Static vnode assignment. `vnodes` and (optionally) `merger_instance`
30    /// are required in this mode.
31    #[default]
32    Static,
33    /// Dynamic membership — peers gossip via chitchat; vnode assignment
34    /// is chosen by the coordination layer.
35    Dynamic,
36}
37
38/// Tagged-union config that selects the runtime [`StateBackend`].
39///
40/// See module docs for the five deployment shapes.
41#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
42#[serde(tag = "backend", rename_all = "snake_case")]
43pub enum StateBackendConfig {
44    /// Non-durable in-process backend. The default.
45    InProcess {
46        /// Number of vnodes the backend should size for.
47        #[serde(default = "default_vnode_capacity")]
48        vnode_capacity: u32,
49    },
50
51    /// Durable single-node backend on a local filesystem path. Shorthand
52    /// for an `object_store` backend with a `file://` URL.
53    Local {
54        /// Filesystem root for state.
55        path: PathBuf,
56        /// Node identity (written into epoch commit markers for audit).
57        #[serde(default = "default_instance_id")]
58        instance_id: String,
59        /// Number of vnodes the backend should size for.
60        #[serde(default = "default_vnode_capacity")]
61        vnode_capacity: u32,
62    },
63
64    /// Durable shared-state backend on S3 / GCS / Azure. Used by all
65    /// distributed-embedded and constellation modes.
66    ObjectStore {
67        /// Object store URL: `s3://bucket/prefix`, `gs://bucket/prefix`,
68        /// etc.
69        url: String,
70        /// This node's identity. Written into epoch manifests and used
71        /// by the assignment-version fence to reject stale writes.
72        instance_id: String,
73        /// Number of vnodes the backend should size for.
74        #[serde(default = "default_vnode_capacity")]
75        vnode_capacity: u32,
76        /// Static vnode subset for this instance. `None` means "all
77        /// vnodes" (useful for the merger instance or for dynamic mode).
78        #[serde(default)]
79        vnodes: Option<Vec<u32>>,
80        /// Optional merger instance — the node that fans in partials
81        /// for sink emission. Only meaningful in static mode.
82        #[serde(default)]
83        merger_instance: Option<String>,
84        /// Discovery strategy: static assignment or chitchat gossip.
85        #[serde(default)]
86        discovery: DiscoveryMode,
87        /// Seed peers for dynamic discovery.
88        #[serde(default)]
89        seed_peers: Vec<String>,
90    },
91}
92
93impl Default for StateBackendConfig {
94    fn default() -> Self {
95        Self::InProcess {
96            vnode_capacity: DEFAULT_VNODE_CAPACITY,
97        }
98    }
99}
100
101/// Failure modes for [`StateBackendConfig::build`].
102#[derive(Debug, thiserror::Error)]
103pub enum StateBackendBuildError {
104    /// The selected backend exists in config but its runtime impl has
105    /// not been wired up yet.
106    #[error("state backend '{0}' is not yet implemented")]
107    NotImplemented(&'static str),
108
109    /// Backend construction failed at the I/O layer.
110    #[error("state backend construction failed: {0}")]
111    Io(String),
112}
113
114impl StateBackendConfig {
115    /// Builder: embedded library, single process.
116    #[must_use]
117    pub fn in_process() -> Self {
118        Self::InProcess {
119            vnode_capacity: DEFAULT_VNODE_CAPACITY,
120        }
121    }
122
123    /// Builder: single-node durable state on the local filesystem.
124    #[must_use]
125    pub fn local(path: impl Into<PathBuf>) -> Self {
126        Self::Local {
127            path: path.into(),
128            instance_id: default_instance_id(),
129            vnode_capacity: DEFAULT_VNODE_CAPACITY,
130        }
131    }
132
133    /// Builder: distributed-embedded over an object store, static mode.
134    #[must_use]
135    pub fn object_store(url: impl Into<String>, instance_id: impl Into<String>) -> Self {
136        Self::ObjectStore {
137            url: url.into(),
138            instance_id: instance_id.into(),
139            vnode_capacity: DEFAULT_VNODE_CAPACITY,
140            vnodes: None,
141            merger_instance: None,
142            discovery: DiscoveryMode::Static,
143            seed_peers: Vec::new(),
144        }
145    }
146
147    /// Instantiate the runtime backend.
148    ///
149    /// Declared `async` because backends added in later iterations
150    /// (object store, distributed) need to perform async setup. The
151    /// in-process path completes synchronously today; callers must
152    /// still `.await` for forward-compatibility.
153    ///
154    /// # Errors
155    /// - [`StateBackendBuildError::NotImplemented`] for remote
156    ///   `object_store` schemes not yet wired (`s3://`, `gs://`, `az://`).
157    /// - [`StateBackendBuildError::Io`] on filesystem/network setup.
158    #[allow(clippy::unused_async)]
159    pub async fn build(&self) -> Result<Arc<dyn StateBackend>, StateBackendBuildError> {
160        match self {
161            Self::InProcess { vnode_capacity } => {
162                Ok(Arc::new(InProcessBackend::new(*vnode_capacity)))
163            }
164            Self::Local {
165                path,
166                instance_id,
167                vnode_capacity,
168            } => {
169                std::fs::create_dir_all(path)
170                    .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
171                let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
172                    .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
173                Ok(Arc::new(ObjectStoreBackend::new(
174                    Arc::new(fs),
175                    instance_id,
176                    *vnode_capacity,
177                )))
178            }
179            Self::ObjectStore {
180                url,
181                instance_id,
182                vnode_capacity,
183                ..
184            } => {
185                let store = build_object_store(url)?;
186                Ok(Arc::new(ObjectStoreBackend::new(
187                    store,
188                    instance_id,
189                    *vnode_capacity,
190                )))
191            }
192        }
193    }
194
195    /// Filesystem path for durable state, if any. Returns `None` for
196    /// non-filesystem backends.
197    #[must_use]
198    pub fn local_storage_dir(&self) -> Option<&std::path::Path> {
199        match self {
200            Self::Local { path, .. } => Some(path.as_path()),
201            _ => None,
202        }
203    }
204
205    /// Build the underlying `object_store` handle (if any) so callers
206    /// that need to share the same store — e.g. an
207    /// `AssignmentSnapshotStore` alongside the state backend — can
208    /// avoid re-parsing the URL. `None` for `InProcess`.
209    ///
210    /// # Errors
211    /// Same failure modes as [`Self::build`].
212    pub fn build_object_store(
213        &self,
214    ) -> Result<Option<Arc<dyn ::object_store::ObjectStore>>, StateBackendBuildError> {
215        match self {
216            Self::InProcess { .. } => Ok(None),
217            Self::Local { path, .. } => {
218                std::fs::create_dir_all(path)
219                    .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
220                let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
221                    .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
222                Ok(Some(Arc::new(fs)))
223            }
224            Self::ObjectStore { url, .. } => Ok(Some(build_object_store(url)?)),
225        }
226    }
227
228    /// Returns true if this backend persists state across process
229    /// restarts.
230    #[must_use]
231    pub fn is_durable(&self) -> bool {
232        !matches!(self, Self::InProcess { .. })
233    }
234
235    /// Number of vnodes this backend is sized for.
236    #[must_use]
237    pub fn vnode_capacity(&self) -> u32 {
238        match self {
239            Self::InProcess { vnode_capacity }
240            | Self::Local { vnode_capacity, .. }
241            | Self::ObjectStore { vnode_capacity, .. } => *vnode_capacity,
242        }
243    }
244}
245
246/// Dispatch a URL to the matching `object_store` implementation.
247///
248/// Supported: `file://<path>`.
249/// Returns `NotImplemented` for `s3://`, `gs://`, `az://` — these will
250/// be added in a later iteration once the workspace pulls in
251/// `url::Url` and the corresponding `object_store` features.
252fn build_object_store(
253    url: &str,
254) -> Result<Arc<dyn ::object_store::ObjectStore>, StateBackendBuildError> {
255    if let Some(path) = url.strip_prefix("file://") {
256        let path = path.trim_start_matches('/');
257        std::fs::create_dir_all(path).map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
258        let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
259            .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
260        Ok(Arc::new(fs))
261    } else {
262        Err(StateBackendBuildError::NotImplemented("object_store"))
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare `backend = "in_process"` table picks up the default capacity.
    #[test]
    fn parse_in_process_minimal() {
        let parsed: StateBackendConfig = toml::from_str(r#"backend = "in_process""#).unwrap();
        assert_eq!(
            parsed,
            StateBackendConfig::InProcess {
                vnode_capacity: 256
            }
        );
        assert!(!parsed.is_durable());
        assert!(parsed.local_storage_dir().is_none());
    }

    /// `local` keeps the configured path and honours a capacity override.
    #[test]
    fn parse_local_with_path() {
        let src = r#"
backend = "local"
path = "/var/laminar"
vnode_capacity = 128
"#;
        let parsed: StateBackendConfig = toml::from_str(src).unwrap();
        assert!(parsed.is_durable());
        assert_eq!(
            parsed.local_storage_dir(),
            Some(std::path::Path::new("/var/laminar"))
        );
        match parsed {
            StateBackendConfig::Local { vnode_capacity, .. } => {
                assert_eq!(vnode_capacity, 128);
            }
            _ => panic!("expected Local"),
        }
    }

    /// Static object-store config: explicit vnodes and merger, default discovery.
    #[test]
    fn parse_object_store_static() {
        let src = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
vnodes = [0, 1, 2, 3]
merger_instance = "node-0"
"#;
        let parsed: StateBackendConfig = toml::from_str(src).unwrap();
        if let StateBackendConfig::ObjectStore {
            url,
            instance_id,
            vnodes,
            merger_instance,
            discovery,
            ..
        } = parsed
        {
            assert_eq!(url, "s3://bucket/laminar");
            assert_eq!(instance_id, "node-0");
            assert_eq!(vnodes, Some(vec![0, 1, 2, 3]));
            assert_eq!(merger_instance.as_deref(), Some("node-0"));
            assert_eq!(discovery, DiscoveryMode::Static);
        } else {
            panic!("expected ObjectStore");
        }
    }

    /// Dynamic object-store config: discovery mode and seed peers are parsed.
    #[test]
    fn parse_object_store_dynamic() {
        let src = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
discovery = "dynamic"
seed_peers = ["10.0.0.1:7946", "10.0.0.2:7946"]
"#;
        let parsed: StateBackendConfig = toml::from_str(src).unwrap();
        if let StateBackendConfig::ObjectStore {
            discovery,
            seed_peers,
            ..
        } = parsed
        {
            assert_eq!(discovery, DiscoveryMode::Dynamic);
            assert_eq!(seed_peers.len(), 2);
        } else {
            panic!("expected ObjectStore dynamic");
        }
    }

    /// The in-process backend round-trips a partial.
    #[tokio::test]
    async fn build_in_process_returns_backend() {
        use bytes::Bytes;
        let backend = StateBackendConfig::in_process().build().await.unwrap();
        backend
            .write_partial(0, 1, 0, Bytes::from_static(b"ok"))
            .await
            .unwrap();
        let got = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&got[..], b"ok");
    }

    /// The local backend round-trips a partial inside a temp directory.
    #[tokio::test]
    async fn build_local_instantiates_backend() {
        let dir = tempfile::tempdir().unwrap();
        let backend = StateBackendConfig::local(dir.path())
            .build()
            .await
            .unwrap();
        backend
            .write_partial(0, 1, 0, bytes::Bytes::from_static(b"z"))
            .await
            .unwrap();
        let got = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&got[..], b"z");
    }

    /// An `object_store` config with a `file://` URL round-trips a partial.
    #[tokio::test]
    async fn build_object_store_file_url_instantiates_backend() {
        let dir = tempfile::tempdir().unwrap();
        // Normalise Windows separators so the URL only contains `/`.
        let fs_path = dir.path().display().to_string().replace('\\', "/");
        let url = format!("file://{}", fs_path);
        let backend = StateBackendConfig::object_store(url, "node-0")
            .build()
            .await
            .unwrap();
        backend
            .write_partial(0, 1, 0, bytes::Bytes::from_static(b"z"))
            .await
            .unwrap();
        let got = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&got[..], b"z");
    }

    /// Remote schemes are rejected with `NotImplemented`.
    #[tokio::test]
    async fn build_object_store_s3_returns_not_implemented() {
        let config = StateBackendConfig::object_store("s3://bucket/path", "node-0");
        let outcome = config.build().await;
        assert!(matches!(
            outcome,
            Err(StateBackendBuildError::NotImplemented("object_store"))
        ));
    }

    /// `Default` yields the in-process variant.
    #[test]
    fn default_is_in_process() {
        let config = StateBackendConfig::default();
        assert!(matches!(config, StateBackendConfig::InProcess { .. }));
    }

    /// Equality distinguishes variants and matches identical configs.
    #[test]
    fn partial_eq_works() {
        let in_process = StateBackendConfig::in_process();
        assert_eq!(in_process, StateBackendConfig::in_process());
        assert_ne!(in_process, StateBackendConfig::local("/tmp/x"));
    }
}