Skip to main content

laminar_core/checkpoint/
object_store_builder.rs

1//! Factory for building `ObjectStore` instances from URL schemes.
2//!
//! Detects the cloud provider from the URL scheme (`s3://`, `gs://`, `az://`,
//! `abfs://`, `abfss://`, `file://`) and constructs the appropriate backend.
//! Cloud providers require their respective feature flags (`aws`, `gcs`, `azure`).
6//!
7//! Credentials are resolved via `from_env()` (reads standard env vars like
8//! `AWS_ACCESS_KEY_ID`) with explicit overrides from the `options` map.
9
10#![allow(clippy::disallowed_types)] // cold path: object store setup
11
12use std::collections::HashMap;
13use std::sync::Arc;
14
15use object_store::local::LocalFileSystem;
16use object_store::ObjectStore;
17
18/// Errors from object store construction.
19#[derive(Debug, thiserror::Error)]
20pub enum ObjectStoreBuilderError {
21    /// The URL scheme requires a feature that is not compiled in.
22    #[error("scheme '{scheme}' requires the '{feature}' feature flag (compile with --features {feature})")]
23    MissingFeature {
24        /// The URL scheme (e.g., "s3").
25        scheme: String,
26        /// The required cargo feature.
27        feature: String,
28    },
29
30    /// Unrecognized URL scheme.
31    #[error("unsupported object store URL scheme: '{0}'")]
32    UnsupportedScheme(String),
33
34    /// The URL could not be parsed.
35    #[error("invalid object store URL: {0}")]
36    InvalidUrl(String),
37
38    /// Backend construction failed.
39    #[error("object store build error: {0}")]
40    Build(String),
41}
42
43impl From<object_store::Error> for ObjectStoreBuilderError {
44    fn from(e: object_store::Error) -> Self {
45        Self::Build(e.to_string())
46    }
47}
48
49/// Build an [`ObjectStore`] from a URL and optional configuration overrides.
50///
51/// # Supported schemes
52///
53/// | Scheme | Feature | Builder |
54/// |--------|---------|---------|
55/// | `file://` | (always) | `LocalFileSystem` |
56/// | `s3://` | `aws` | `AmazonS3Builder` |
57/// | `gs://` | `gcs` | `GoogleCloudStorageBuilder` |
58/// | `az://`, `abfs://` | `azure` | `MicrosoftAzureBuilder` |
59///
60/// The URL's path (everything after the bucket/container) is applied as a
61/// key prefix on the returned store, so every consumer — checkpoint
62/// manifests, decision markers, control plane, state partials — is rooted
63/// under it. The cloud builders themselves only consume the bucket from the
64/// URL; without the wrapper, two clusters sharing a bucket with different
65/// path prefixes would silently collide at the bucket root.
66///
67/// # Errors
68///
69/// Returns [`ObjectStoreBuilderError`] if the scheme is unsupported, requires
70/// an uncompiled feature, or the backend fails to build.
71#[allow(clippy::implicit_hasher)]
72pub fn build_object_store(
73    url: &str,
74    options: &HashMap<String, String>,
75) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
76    let scheme = url
77        .find("://")
78        .map(|i| &url[..i])
79        .ok_or_else(|| ObjectStoreBuilderError::InvalidUrl(format!("no scheme in '{url}'")))?;
80
81    let store = match scheme {
82        // file:// uses the whole path as the filesystem root — already rooted.
83        "file" => return build_local_file_system(url),
84        "s3" => build_s3(url, options),
85        "gs" => build_gcs(url, options),
86        "az" | "abfs" | "abfss" => build_azure(url, options),
87        other => Err(ObjectStoreBuilderError::UnsupportedScheme(
88            other.to_string(),
89        )),
90    }?;
91
92    Ok(match url_path_prefix(url) {
93        "" => store,
94        prefix => Arc::new(object_store::prefix::PrefixStore::new(
95            store,
96            object_store::path::Path::from(prefix),
97        )),
98    })
99}
100
/// The key prefix encoded in a cloud URL's path: everything after the
/// bucket/container authority, with surrounding `/` trimmed and any query
/// (`?…`) or fragment (`#…`) dropped.
///
/// Examples: `s3://bucket/a/b/` → `a/b`; `s3://bucket/a?x=1` → `a`;
/// `s3://bucket` → empty string (no prefix).
fn url_path_prefix(url: &str) -> &str {
    let after_scheme = url.find("://").map_or(url, |i| &url[i + 3..]);
    // RFC 3986 §3: the path ends at the first `?` (query) or `#` (fragment).
    // Neither is part of the key prefix, and a `/` inside a query must not be
    // mistaken for the start of the path.
    let authority_and_path = after_scheme
        .find(|c: char| c == '?' || c == '#')
        .map_or(after_scheme, |i| &after_scheme[..i]);
    authority_and_path
        .find('/')
        .map_or("", |i| authority_and_path[i + 1..].trim_matches('/'))
}
109
110/// Normalized filesystem path from a `file://` URL: scheme stripped and
111/// the Windows drive-letter slash removed (`file:///C:/x` → `C:/x`).
112///
113/// # Errors
114/// Returns [`ObjectStoreBuilderError::InvalidUrl`] when the scheme is
115/// missing or the path is empty.
116pub fn file_url_path(url: &str) -> Result<&str, ObjectStoreBuilderError> {
117    let path = url
118        .strip_prefix("file://")
119        .ok_or_else(|| ObjectStoreBuilderError::InvalidUrl(url.to_string()))?;
120    if path.is_empty() {
121        return Err(ObjectStoreBuilderError::InvalidUrl(
122            "file:// URL has empty path".to_string(),
123        ));
124    }
125    Ok(strip_windows_leading_slash(path))
126}
127
128/// Extract the local path from a `file://` URL and create a `LocalFileSystem`.
129fn build_local_file_system(url: &str) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
130    let path = file_url_path(url)?;
131
132    // Ensure the directory exists — LocalFileSystem doesn't create it.
133    std::fs::create_dir_all(path).map_err(|e| {
134        ObjectStoreBuilderError::InvalidUrl(format!("failed to create directory '{path}': {e}"))
135    })?;
136
137    let fs = LocalFileSystem::new_with_prefix(path)?;
138    Ok(Arc::new(fs))
139}
140
/// Drop the `/` that a `file:///C:/...` URL leaves in front of a Windows
/// drive letter, e.g. `"/C:/foo"` → `"C:/foo"`.
///
/// Only input starting with `/`, an ASCII letter, then `:` is touched; every
/// other input comes back unchanged. This variant is compiled on Windows only.
#[cfg(windows)]
fn strip_windows_leading_slash(path: &str) -> &str {
    match path.as_bytes() {
        [b'/', drive, b':', ..] if drive.is_ascii_alphabetic() => &path[1..],
        _ => path,
    }
}
155
/// Identity on non-Windows targets: a path beginning with `/` is already an
/// absolute path here, so nothing is stripped.
#[cfg(not(windows))]
fn strip_windows_leading_slash(path: &str) -> &str {
    // Intentionally returns its input untouched.
    path
}
161
162// ---------------------------------------------------------------------------
163// S3 (feature = "aws")
164// ---------------------------------------------------------------------------
165
/// Build an Amazon S3 store.
///
/// Starts from `AmazonS3Builder::from_env()`, applies `url`, then layers each
/// `options` entry on top after parsing its key as an S3 config key.
///
/// # Errors
/// [`ObjectStoreBuilderError::Build`] if an `options` key is not a valid S3
/// config key or the builder fails.
#[cfg(feature = "aws")]
fn build_s3(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
    use object_store::aws::AmazonS3Builder;

    let mut s3 = AmazonS3Builder::from_env().with_url(url);
    for (name, setting) in options.iter() {
        let parsed = name.parse().map_err(|err: object_store::Error| {
            ObjectStoreBuilderError::Build(format!("invalid S3 config key '{name}': {err}"))
        })?;
        s3 = s3.with_config(parsed, setting);
    }

    let built = s3.build()?;
    Ok(Arc::new(built))
}
185
186#[cfg(not(feature = "aws"))]
187fn build_s3(
188    _url: &str,
189    _options: &HashMap<String, String>,
190) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
191    Err(ObjectStoreBuilderError::MissingFeature {
192        scheme: "s3".to_string(),
193        feature: "aws".to_string(),
194    })
195}
196
197// ---------------------------------------------------------------------------
198// GCS (feature = "gcs")
199// ---------------------------------------------------------------------------
200
/// Build a Google Cloud Storage store.
///
/// Starts from `GoogleCloudStorageBuilder::from_env()`, applies `url`, then
/// layers each `options` entry on top after parsing its key as a GCS config key.
///
/// # Errors
/// [`ObjectStoreBuilderError::Build`] if an `options` key is not a valid GCS
/// config key or the builder fails.
#[cfg(feature = "gcs")]
fn build_gcs(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
    use object_store::gcp::GoogleCloudStorageBuilder;

    let mut gcs = GoogleCloudStorageBuilder::from_env().with_url(url);
    for (name, setting) in options.iter() {
        let parsed = name.parse().map_err(|err: object_store::Error| {
            ObjectStoreBuilderError::Build(format!("invalid GCS config key '{name}': {err}"))
        })?;
        gcs = gcs.with_config(parsed, setting);
    }

    let built = gcs.build()?;
    Ok(Arc::new(built))
}
220
221#[cfg(not(feature = "gcs"))]
222fn build_gcs(
223    _url: &str,
224    _options: &HashMap<String, String>,
225) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
226    Err(ObjectStoreBuilderError::MissingFeature {
227        scheme: "gs".to_string(),
228        feature: "gcs".to_string(),
229    })
230}
231
232// ---------------------------------------------------------------------------
233// Azure (feature = "azure")
234// ---------------------------------------------------------------------------
235
/// Build a Microsoft Azure store.
///
/// Starts from `MicrosoftAzureBuilder::from_env()`, applies `url`, then layers
/// each `options` entry on top after parsing its key as an Azure config key.
///
/// # Errors
/// [`ObjectStoreBuilderError::Build`] if an `options` key is not a valid Azure
/// config key or the builder fails.
#[cfg(feature = "azure")]
fn build_azure(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
    use object_store::azure::MicrosoftAzureBuilder;

    let mut azure = MicrosoftAzureBuilder::from_env().with_url(url);
    for (name, setting) in options.iter() {
        let parsed = name.parse().map_err(|err: object_store::Error| {
            ObjectStoreBuilderError::Build(format!("invalid Azure config key '{name}': {err}"))
        })?;
        azure = azure.with_config(parsed, setting);
    }

    let built = azure.build()?;
    Ok(Arc::new(built))
}
255
256#[cfg(not(feature = "azure"))]
257fn build_azure(
258    _url: &str,
259    _options: &HashMap<String, String>,
260) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
261    Err(ObjectStoreBuilderError::MissingFeature {
262        scheme: "az".to_string(),
263        feature: "azure".to_string(),
264    })
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a cloud URL and check the feature-gating contract.
    ///
    /// With the backend's feature compiled out, the error must name the
    /// feature. With it compiled in, the build may still fail for
    /// environmental reasons (no region, credentials or account), but it must
    /// never claim that the feature is missing.
    fn assert_feature_gate(url: &str, feature: &str, enabled: bool) {
        let result = build_object_store(url, &HashMap::new());
        if enabled {
            assert!(
                !matches!(result, Err(ObjectStoreBuilderError::MissingFeature { .. })),
                "'{feature}' is compiled in but {url} reported it missing: {result:?}"
            );
        } else {
            let err = result.unwrap_err().to_string();
            assert!(err.contains(feature), "got: {err}");
        }
    }

    #[test]
    fn test_file_scheme_creates_local_fs() {
        let dir = tempfile::tempdir().unwrap();
        let url = format!("file://{}", dir.path().to_str().unwrap());
        let store = build_object_store(&url, &HashMap::new());
        assert!(store.is_ok(), "file:// should succeed: {store:?}");
    }

    #[test]
    fn test_file_scheme_empty_path_errors() {
        let result = build_object_store("file://", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("empty path"), "got: {err}");
    }

    #[test]
    fn test_unknown_scheme_errors() {
        let result = build_object_store("ftp://bucket/prefix", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("unsupported"), "got: {err}");
    }

    #[test]
    fn test_no_scheme_errors() {
        let result = build_object_store("/just/a/path", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("no scheme"), "got: {err}");
    }

    #[test]
    fn test_file_url_path() {
        assert!(matches!(
            file_url_path("/no/scheme"),
            Err(ObjectStoreBuilderError::InvalidUrl(_))
        ));
        assert!(matches!(
            file_url_path("file://"),
            Err(ObjectStoreBuilderError::InvalidUrl(_))
        ));
        if cfg!(windows) {
            assert_eq!(file_url_path("file:///C:/data").unwrap(), "C:/data");
        } else {
            assert_eq!(file_url_path("file:///tmp/data").unwrap(), "/tmp/data");
        }
    }

    #[test]
    fn test_s3_without_feature_errors() {
        assert_feature_gate("s3://my-bucket/prefix", "aws", cfg!(feature = "aws"));
    }

    #[test]
    fn test_gs_without_feature_errors() {
        assert_feature_gate("gs://my-bucket/prefix", "gcs", cfg!(feature = "gcs"));
    }

    #[test]
    fn test_azure_without_feature_errors() {
        let enabled = cfg!(feature = "azure");
        assert_feature_gate("az://my-container/prefix", "azure", enabled);
        assert_feature_gate(
            "abfss://container@account.dfs.core.windows.net/prefix",
            "azure",
            enabled,
        );
    }

    /// The URL path roots ALL consumers of the store (manifests,
    /// decision markers, control plane, state partials) — two clusters
    /// sharing a bucket must not collide at the root.
    #[test]
    fn url_path_prefix_extraction() {
        assert_eq!(url_path_prefix("s3://bucket"), "");
        assert_eq!(url_path_prefix("s3://bucket/"), "");
        assert_eq!(url_path_prefix("s3://bucket/a"), "a");
        assert_eq!(url_path_prefix("s3://bucket/a/b/"), "a/b");
        assert_eq!(url_path_prefix("gs://bucket/x"), "x");
        assert_eq!(url_path_prefix("az://my-container/prefix"), "prefix");
        assert_eq!(
            url_path_prefix("abfss://container@account.dfs.core.windows.net/p/q"),
            "p/q"
        );
    }

    #[test]
    #[cfg(windows)]
    fn test_strip_windows_leading_slash() {
        // Windows drive letter patterns — slash stripped
        assert_eq!(strip_windows_leading_slash("/C:/foo"), "C:/foo");
        assert_eq!(strip_windows_leading_slash("/D:/"), "D:/");
        assert_eq!(strip_windows_leading_slash("/c:/bar"), "c:/bar");

        // Non-drive paths — no change
        assert_eq!(strip_windows_leading_slash("/path/to"), "/path/to");
        assert_eq!(strip_windows_leading_slash("/tmp"), "/tmp");

        // Edge cases
        assert_eq!(strip_windows_leading_slash("/"), "/");
        assert_eq!(strip_windows_leading_slash(""), "");
        assert_eq!(strip_windows_leading_slash("C:/foo"), "C:/foo");
    }

    #[test]
    #[cfg(not(windows))]
    fn test_strip_windows_leading_slash() {
        // On non-Windows, all paths are returned unchanged
        assert_eq!(strip_windows_leading_slash("/C:/foo"), "/C:/foo");
        assert_eq!(strip_windows_leading_slash("/path/to"), "/path/to");
        assert_eq!(strip_windows_leading_slash(""), "");
    }
}