1use std::path::PathBuf;
6use std::sync::Arc;
7
8use serde::Deserialize;
9
10use super::{
11 backend::StateBackend, in_process::InProcessBackend, object_store::ObjectStoreBackend,
12};
13
/// Vnode capacity applied when a configuration does not set `vnode_capacity`.
pub const DEFAULT_VNODE_CAPACITY: u32 = 256;

/// Serde default provider for `vnode_capacity` fields.
///
/// Exists because `#[serde(default = "...")]` takes a function path, not a
/// constant; it simply yields [`DEFAULT_VNODE_CAPACITY`].
fn default_vnode_capacity() -> u32 {
    DEFAULT_VNODE_CAPACITY
}
20
/// Serde default provider for the `instance_id` of a local backend.
///
/// Returns the fixed identifier `"local"` as an owned `String`.
fn default_instance_id() -> String {
    String::from("local")
}
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)]
27#[serde(rename_all = "snake_case")]
28pub enum DiscoveryMode {
29 #[default]
32 Static,
33 Dynamic,
36}
37
38#[derive(Clone, PartialEq, Eq, Default, Deserialize)]
42#[serde(transparent)]
43pub struct StorageOptions(pub rustc_hash::FxHashMap<String, String>);
44
45impl std::fmt::Debug for StorageOptions {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_map()
48 .entries(self.0.keys().map(|k| (k, "[REDACTED]")))
49 .finish()
50 }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
57#[serde(tag = "backend", rename_all = "snake_case")]
58pub enum StateBackendConfig {
59 InProcess {
61 #[serde(default = "default_vnode_capacity")]
63 vnode_capacity: u32,
64 },
65
66 Local {
69 path: PathBuf,
71 #[serde(default = "default_instance_id")]
73 instance_id: String,
74 #[serde(default = "default_vnode_capacity")]
76 vnode_capacity: u32,
77 },
78
79 ObjectStore {
82 url: String,
85 #[serde(default)]
90 storage: StorageOptions,
91 instance_id: String,
94 #[serde(default = "default_vnode_capacity")]
96 vnode_capacity: u32,
97 #[serde(default)]
100 vnodes: Option<Vec<u32>>,
101 #[serde(default)]
104 merger_instance: Option<String>,
105 #[serde(default)]
107 discovery: DiscoveryMode,
108 #[serde(default)]
110 seed_peers: Vec<String>,
111 },
112}
113
114impl Default for StateBackendConfig {
115 fn default() -> Self {
116 Self::InProcess {
117 vnode_capacity: DEFAULT_VNODE_CAPACITY,
118 }
119 }
120}
121
122#[derive(Debug, thiserror::Error)]
124pub enum StateBackendBuildError {
125 #[error("state backend object store: {0}")]
128 Store(#[from] crate::checkpoint::object_store_builder::ObjectStoreBuilderError),
129
130 #[error("state backend construction failed: {0}")]
132 Io(String),
133}
134
135impl StateBackendConfig {
136 #[must_use]
138 pub fn in_process() -> Self {
139 Self::InProcess {
140 vnode_capacity: DEFAULT_VNODE_CAPACITY,
141 }
142 }
143
144 #[must_use]
146 pub fn local(path: impl Into<PathBuf>) -> Self {
147 Self::Local {
148 path: path.into(),
149 instance_id: default_instance_id(),
150 vnode_capacity: DEFAULT_VNODE_CAPACITY,
151 }
152 }
153
154 #[must_use]
158 pub fn object_store(url: impl Into<String>, instance_id: impl Into<String>) -> Self {
159 Self::ObjectStore {
160 url: url.into(),
161 storage: StorageOptions::default(),
162 instance_id: instance_id.into(),
163 vnode_capacity: DEFAULT_VNODE_CAPACITY,
164 vnodes: None,
165 merger_instance: None,
166 discovery: DiscoveryMode::Static,
167 seed_peers: Vec::new(),
168 }
169 }
170
171 #[allow(clippy::unused_async)]
184 pub async fn build(&self) -> Result<Arc<dyn StateBackend>, StateBackendBuildError> {
185 match self {
186 Self::InProcess { vnode_capacity } => {
187 Ok(Arc::new(InProcessBackend::new(*vnode_capacity)))
188 }
189 Self::Local {
190 path,
191 instance_id,
192 vnode_capacity,
193 } => {
194 std::fs::create_dir_all(path)
195 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
196 let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
197 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
198 Ok(Arc::new(ObjectStoreBackend::new(
199 Arc::new(fs),
200 instance_id,
201 *vnode_capacity,
202 )))
203 }
204 Self::ObjectStore {
205 url,
206 storage,
207 instance_id,
208 vnode_capacity,
209 ..
210 } => {
211 let store = cloud_store(url, storage)?;
212 Ok(Arc::new(ObjectStoreBackend::new(
213 store,
214 instance_id,
215 *vnode_capacity,
216 )))
217 }
218 }
219 }
220
221 #[must_use]
224 pub fn local_storage_dir(&self) -> Option<&std::path::Path> {
225 match self {
226 Self::Local { path, .. } => Some(path.as_path()),
227 _ => None,
228 }
229 }
230
231 pub fn build_object_store(
239 &self,
240 ) -> Result<Option<Arc<dyn ::object_store::ObjectStore>>, StateBackendBuildError> {
241 match self {
242 Self::InProcess { .. } => Ok(None),
243 Self::Local { path, .. } => {
244 std::fs::create_dir_all(path)
245 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
246 let fs = ::object_store::local::LocalFileSystem::new_with_prefix(path)
247 .map_err(|e| StateBackendBuildError::Io(e.to_string()))?;
248 Ok(Some(Arc::new(fs)))
249 }
250 Self::ObjectStore { url, storage, .. } => Ok(Some(cloud_store(url, storage)?)),
251 }
252 }
253
254 #[must_use]
257 pub fn is_durable(&self) -> bool {
258 !matches!(self, Self::InProcess { .. })
259 }
260
261 #[must_use]
263 pub fn vnode_capacity(&self) -> u32 {
264 match self {
265 Self::InProcess { vnode_capacity }
266 | Self::Local { vnode_capacity, .. }
267 | Self::ObjectStore { vnode_capacity, .. } => *vnode_capacity,
268 }
269 }
270}
271
272fn cloud_store(
277 url: &str,
278 storage: &StorageOptions,
279) -> Result<Arc<dyn ::object_store::ObjectStore>, StateBackendBuildError> {
280 Ok(crate::checkpoint::object_store_builder::build_object_store(
281 url,
282 &storage
283 .0
284 .iter()
285 .map(|(k, v)| (k.clone(), v.clone()))
286 .collect(),
287 )?)
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare `backend = "in_process"` picks up the default vnode capacity.
    #[test]
    fn parse_in_process_minimal() {
        let parsed: StateBackendConfig = toml::from_str(r#"backend = "in_process""#).unwrap();
        assert_eq!(
            parsed,
            StateBackendConfig::InProcess {
                vnode_capacity: 256
            }
        );
        assert!(!parsed.is_durable());
        assert!(parsed.local_storage_dir().is_none());
    }

    /// A local backend keeps its path and an explicit vnode capacity.
    #[test]
    fn parse_local_with_path() {
        let source = r#"
backend = "local"
path = "/var/laminar"
vnode_capacity = 128
"#;
        let parsed: StateBackendConfig = toml::from_str(source).unwrap();
        assert_eq!(
            parsed.local_storage_dir(),
            Some(std::path::Path::new("/var/laminar"))
        );
        assert!(parsed.is_durable());
        match parsed {
            StateBackendConfig::Local { vnode_capacity, .. } => {
                assert_eq!(vnode_capacity, 128);
            }
            _ => panic!("expected Local"),
        }
    }

    /// Omitting `discovery` on an object-store backend yields static mode.
    #[test]
    fn parse_object_store_static() {
        let source = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
vnodes = [0, 1, 2, 3]
merger_instance = "node-0"
"#;
        let parsed: StateBackendConfig = toml::from_str(source).unwrap();
        if let StateBackendConfig::ObjectStore {
            url,
            instance_id,
            vnodes,
            merger_instance,
            discovery,
            ..
        } = parsed
        {
            assert_eq!(url, "s3://bucket/laminar");
            assert_eq!(instance_id, "node-0");
            assert_eq!(vnodes, Some(vec![0, 1, 2, 3]));
            assert_eq!(merger_instance.as_deref(), Some("node-0"));
            assert_eq!(discovery, DiscoveryMode::Static);
        } else {
            panic!("expected ObjectStore");
        }
    }

    /// `discovery = "dynamic"` and the seed peer list both round-trip.
    #[test]
    fn parse_object_store_dynamic() {
        let source = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"
discovery = "dynamic"
seed_peers = ["10.0.0.1:7946", "10.0.0.2:7946"]
"#;
        let parsed: StateBackendConfig = toml::from_str(source).unwrap();
        if let StateBackendConfig::ObjectStore {
            discovery,
            seed_peers,
            ..
        } = parsed
        {
            assert_eq!(discovery, DiscoveryMode::Dynamic);
            assert_eq!(seed_peers.len(), 2);
        } else {
            panic!("expected ObjectStore dynamic");
        }
    }

    /// The in-process backend accepts a write and serves it back.
    #[tokio::test]
    async fn build_in_process_returns_backend() {
        let config = StateBackendConfig::in_process();
        let backend = config.build().await.unwrap();
        backend
            .write_partial(0, 1, 0, bytes::Bytes::from_static(b"ok"))
            .await
            .unwrap();
        let got = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&got[..], b"ok");
    }

    /// The local backend works against a temporary directory.
    #[tokio::test]
    async fn build_local_instantiates_backend() {
        let scratch = tempfile::tempdir().unwrap();
        let config = StateBackendConfig::local(scratch.path());
        let backend = config.build().await.unwrap();
        backend
            .write_partial(0, 1, 0, bytes::Bytes::from_static(b"z"))
            .await
            .unwrap();
        let got = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&got[..], b"z");
    }

    /// A `file://` URL goes through the object-store path and still works.
    #[tokio::test]
    async fn build_object_store_file_url_instantiates_backend() {
        let scratch = tempfile::tempdir().unwrap();
        // Backslashes are normalized so the path is usable inside a URL.
        let normalized = scratch.path().display().to_string().replace('\\', "/");
        let url = format!("file://{}", normalized);
        let config = StateBackendConfig::object_store(url, "node-0");
        let backend = config.build().await.unwrap();
        backend
            .write_partial(0, 1, 0, bytes::Bytes::from_static(b"z"))
            .await
            .unwrap();
        let got = backend.read_partial(0, 1).await.unwrap().unwrap();
        assert_eq!(&got[..], b"z");
    }

    /// Without the `aws` feature an `s3://` URL must fail with `MissingFeature`.
    #[cfg(not(feature = "aws"))]
    #[tokio::test]
    async fn build_object_store_s3_requires_aws_feature() {
        use crate::checkpoint::object_store_builder::ObjectStoreBuilderError;

        let config = StateBackendConfig::object_store("s3://bucket/path", "node-0");
        // The success type has no `Debug`, so `expect_err` is not available.
        let err = config
            .build()
            .await
            .err()
            .expect("s3 must not build without the aws feature");
        assert!(
            matches!(
                err,
                StateBackendBuildError::Store(ObjectStoreBuilderError::MissingFeature { .. })
            ),
            "got: {err}",
        );
    }

    /// With the `aws` feature the S3 client builds from storage options alone.
    #[cfg(feature = "aws")]
    #[tokio::test]
    async fn build_object_store_s3_builds_with_storage_options() {
        let source = r#"
backend = "object_store"
url = "s3://bucket/laminar"
instance_id = "node-0"

[storage]
endpoint = "http://127.0.0.1:9000"
aws_access_key_id = "k"
aws_secret_access_key = "s"
region = "us-east-1"
allow_http = "true"
"#;
        let config: StateBackendConfig = toml::from_str(source).unwrap();
        config.build().await.expect("s3 client must build offline");
    }

    /// `Default` resolves to the in-process variant.
    #[test]
    fn default_is_in_process() {
        let config = StateBackendConfig::default();
        assert!(matches!(config, StateBackendConfig::InProcess { .. }));
    }

    /// Equal configs compare equal; different variants do not.
    #[test]
    fn partial_eq_works() {
        let in_process = StateBackendConfig::in_process();
        assert_eq!(in_process, StateBackendConfig::in_process());
        assert_ne!(in_process, StateBackendConfig::local("/tmp/x"));
    }
}