1use std::path::PathBuf;
6use std::sync::Arc;
7
8use serde::Deserialize;
9
10use super::{
11 backend::StateBackend, in_process::InProcessBackend, object_store::ObjectStoreBackend,
12};
13
/// Vnode capacity used when a configuration does not specify one.
pub const DEFAULT_VNODE_CAPACITY: u32 = 256;

/// Serde default provider for the `vnode_capacity` fields.
///
/// Serde's `default = "..."` attribute needs a function path, so this simply
/// forwards [`DEFAULT_VNODE_CAPACITY`].
fn default_vnode_capacity() -> u32 {
    DEFAULT_VNODE_CAPACITY
}
20
/// Serde default provider for the `instance_id` of a local backend.
///
/// Returns the owned string `"local"`.
fn default_instance_id() -> String {
    String::from("local")
}
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub enum DiscoveryMode {
29 #[default]
32 Static,
33 Dynamic,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
42#[serde(tag = "backend", rename_all = "snake_case")]
43pub enum StateBackendConfig {
44 InProcess {
46 #[serde(default = "default_vnode_capacity")]
48 vnode_capacity: u32,
49 },
50
51 Local {
54 path: PathBuf,
56 #[serde(default = "default_instance_id")]
58 instance_id: String,
59 #[serde(default = "default_vnode_capacity")]
61 vnode_capacity: u32,
62 },
63
64 ObjectStore {
67 url: String,
70 instance_id: String,
73 #[serde(default = "default_vnode_capacity")]
75 vnode_capacity: u32,
76 #[serde(default)]
79 vnodes: Option<Vec<u32>>,
80 #[serde(default)]
83 merger_instance: Option<String>,
84 #[serde(default)]
86 discovery: DiscoveryMode,
87 #[serde(default)]
89 seed_peers: Vec<String>,
90 },
91}
92
93impl Default for StateBackendConfig {
94 fn default() -> Self {
95 Self::InProcess {
96 vnode_capacity: DEFAULT_VNODE_CAPACITY,
97 }
98 }
99}
100
101#[derive(Debug, thiserror::Error)]
103pub enum StateBackendBuildError {
104 #[error("state backend '{0}' is not yet implemented")]
107 NotImplemented(&'static str),
108
109 #[error("state backend construction failed: {0}")]
111 Io(String),
112}
113
114impl StateBackendConfig {
115 #[must_use]
117 pub fn in_process() -> Self {
118 Self::InProcess {
119 vnode_capacity: DEFAULT_VNODE_CAPACITY,
120 }
121 }
122
123 #[must_use]
125 pub fn local(path: impl Into<PathBuf>) -> Self {
126 Self::Local {
127 path: path.into(),
128 instance_id: default_instance_id(),
129 vnode_capacity: DEFAULT_VNODE_CAPACITY,
130 }
131 }
132
133 #[must_use]
135 pub fn object_store(url: impl Into<String>, instance_id: impl Into<String>) -> Self {
136 Self::ObjectStore {
137 url: url.into(),
138 instance_id: instance_id.into(),
139 vnode_capacity: DEFAULT_VNODE_CAPACITY,
140 vnodes: None,
141 merger_instance: None,
142 discovery: DiscoveryMode::Static,
143 seed_peers: Vec::new(),
144 }
145 }
146
147 #[allow(clippy::unused_async)]
159 pub async fn build(&self) -> Result<Arc<dyn StateBackend>, StateBackendBuildError> {
160 match self {
161 Self::InProcess { vnode_capacity } => {
162 Ok(Arc::new(InProcessBackend::new(*vnode_capacity)))
163 }
164 Self::Local {
165 path,
166 instance_id,
167 vnode_capacity,
168 } => {
169 std::fs::create_dir_all(path)
170 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
171 let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
172 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
173 Ok(Arc::new(ObjectStoreBackend::new(
174 Arc::new(fs),
175 instance_id,
176 *vnode_capacity,
177 )))
178 }
179 Self::ObjectStore {
180 url,
181 instance_id,
182 vnode_capacity,
183 ..
184 } => {
185 let store = build_object_store(url)?;
186 Ok(Arc::new(ObjectStoreBackend::new(
187 store,
188 instance_id,
189 *vnode_capacity,
190 )))
191 }
192 }
193 }
194
195 #[must_use]
198 pub fn local_storage_dir(&self) -> Option<&std::path::Path> {
199 match self {
200 Self::Local { path, .. } => Some(path.as_path()),
201 _ => None,
202 }
203 }
204
205 pub fn build_object_store(
213 &self,
214 ) -> Result<Option<Arc<dyn ::object_store::ObjectStore>>, StateBackendBuildError> {
215 match self {
216 Self::InProcess { .. } => Ok(None),
217 Self::Local { path, .. } => {
218 std::fs::create_dir_all(path)
219 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
220 let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
221 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
222 Ok(Some(Arc::new(fs)))
223 }
224 Self::ObjectStore { url, .. } => Ok(Some(build_object_store(url)?)),
225 }
226 }
227
228 #[must_use]
231 pub fn is_durable(&self) -> bool {
232 !matches!(self, Self::InProcess { .. })
233 }
234
235 #[must_use]
237 pub fn vnode_capacity(&self) -> u32 {
238 match self {
239 Self::InProcess { vnode_capacity }
240 | Self::Local { vnode_capacity, .. }
241 | Self::ObjectStore { vnode_capacity, .. } => *vnode_capacity,
242 }
243 }
244}
245
246fn build_object_store(
253 url: &str,
254) -> Result<Arc<dyn ::object_store::ObjectStore>, StateBackendBuildError> {
255 if let Some(path) = url.strip_prefix("file://") {
256 let path = path.trim_start_matches('/');
257 std::fs::create_dir_all(path).map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
258 let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
259 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
260 Ok(Arc::new(fs))
261 } else {
262 Err(StateBackendBuildError::NotImplemented("object_store"))
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a TOML snippet into a config, panicking if it is rejected.
    fn parse(src: &str) -> StateBackendConfig {
        toml::from_str(src).unwrap()
    }

    /// Writes `payload` through `backend` and asserts the same bytes are read
    /// back from the same slot.
    async fn assert_round_trip(backend: Arc<dyn StateBackend>, payload: &'static [u8]) {
        backend
            .write_partial(0, 1, 0, bytes::Bytes::from_static(payload))
            .await
            .unwrap();
        let stored = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&stored[..], payload);
    }

    // The bare tag is enough: capacity falls back to the default of 256.
    #[test]
    fn parse_in_process_minimal() {
        let cfg = parse(r#"backend = "in_process""#);
        assert_eq!(
            cfg,
            StateBackendConfig::InProcess {
                vnode_capacity: 256
            }
        );
        assert!(!cfg.is_durable());
        assert_eq!(cfg.local_storage_dir(), None);
    }

    // An explicit capacity overrides the default; the path is exposed as-is.
    #[test]
    fn parse_local_with_path() {
        let cfg = parse(
            r#"
backend = "local"
path = "/var/laminar"
vnode_capacity = 128
"#,
        );
        let expected_dir = std::path::Path::new("/var/laminar");
        assert_eq!(cfg.local_storage_dir(), Some(expected_dir));
        assert!(cfg.is_durable());
        match cfg {
            StateBackendConfig::Local { vnode_capacity, .. } => assert_eq!(vnode_capacity, 128),
            _ => panic!("expected Local"),
        }
    }

    // Omitting `discovery` selects the static mode.
    #[test]
    fn parse_object_store_static() {
        let cfg = parse(
            r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
vnodes = [0, 1, 2, 3]
merger_instance = "node-0"
"#,
        );
        if let StateBackendConfig::ObjectStore {
            url,
            instance_id,
            vnodes,
            merger_instance,
            discovery,
            ..
        } = cfg
        {
            assert_eq!(url, "s3://bucket/laminar");
            assert_eq!(instance_id, "node-0");
            assert_eq!(vnodes, Some(vec![0, 1, 2, 3]));
            assert_eq!(merger_instance.as_deref(), Some("node-0"));
            assert_eq!(discovery, DiscoveryMode::Static);
        } else {
            panic!("expected ObjectStore");
        }
    }

    // Dynamic discovery plus two seed peers.
    #[test]
    fn parse_object_store_dynamic() {
        let cfg = parse(
            r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
discovery = "dynamic"
seed_peers = ["10.0.0.1:7946", "10.0.0.2:7946"]
"#,
        );
        if let StateBackendConfig::ObjectStore {
            discovery,
            seed_peers,
            ..
        } = cfg
        {
            assert_eq!(discovery, DiscoveryMode::Dynamic);
            assert_eq!(seed_peers.len(), 2);
        } else {
            panic!("expected ObjectStore dynamic");
        }
    }

    #[tokio::test]
    async fn build_in_process_returns_backend() {
        let cfg = StateBackendConfig::in_process();
        let backend = cfg.build().await.unwrap();
        assert_round_trip(backend, b"ok").await;
    }

    #[tokio::test]
    async fn build_local_instantiates_backend() {
        let scratch = tempfile::tempdir().unwrap();
        let cfg = StateBackendConfig::local(scratch.path());
        let backend = cfg.build().await.unwrap();
        assert_round_trip(backend, b"z").await;
    }

    #[tokio::test]
    async fn build_object_store_file_url_instantiates_backend() {
        let scratch = tempfile::tempdir().unwrap();
        // Normalize Windows separators so the path is usable inside a URL.
        let forward_slashed = scratch.path().display().to_string().replace('\\', "/");
        let url = format!("file://{}", forward_slashed);
        let cfg = StateBackendConfig::object_store(url, "node-0");
        let backend = cfg.build().await.unwrap();
        assert_round_trip(backend, b"z").await;
    }

    #[tokio::test]
    async fn build_object_store_s3_returns_not_implemented() {
        let cfg = StateBackendConfig::object_store("s3://bucket/path", "node-0");
        let outcome = cfg.build().await;
        assert!(matches!(
            outcome,
            Err(StateBackendBuildError::NotImplemented("object_store"))
        ));
    }

    #[test]
    fn default_is_in_process() {
        assert!(matches!(
            StateBackendConfig::default(),
            StateBackendConfig::InProcess { .. }
        ));
    }

    #[test]
    fn partial_eq_works() {
        let in_process = StateBackendConfig::in_process();
        assert_eq!(in_process, StateBackendConfig::in_process());
        assert_ne!(in_process, StateBackendConfig::local("/tmp/x"));
    }
}