Skip to main content

laminar_storage/
object_store_factory.rs

//! Factory for building `ObjectStore` instances from URL schemes.
//!
//! Detects the storage backend from the URL scheme (`file://`, `s3://`,
//! `gs://`, `az://`, `abfs://`, `abfss://`) and constructs the matching store.
//! Cloud backends require their respective feature flags (`aws`, `gcs`,
//! `azure`); `file://` is always available.
//!
//! Credentials are resolved via `from_env()` (reads standard env vars like
//! `AWS_ACCESS_KEY_ID`), with explicit overrides taken from the `options` map.
9
10#[allow(clippy::disallowed_types)] // cold path: object store setup
11use std::collections::HashMap;
12use std::sync::Arc;
13
14use object_store::local::LocalFileSystem;
15use object_store::ObjectStore;
16
17/// Errors from object store construction.
18#[derive(Debug, thiserror::Error)]
19pub enum ObjectStoreFactoryError {
20    /// The URL scheme requires a feature that is not compiled in.
21    #[error("scheme '{scheme}' requires the '{feature}' feature flag (compile with --features {feature})")]
22    MissingFeature {
23        /// The URL scheme (e.g., "s3").
24        scheme: String,
25        /// The required cargo feature.
26        feature: String,
27    },
28
29    /// Unrecognized URL scheme.
30    #[error("unsupported object store URL scheme: '{0}'")]
31    UnsupportedScheme(String),
32
33    /// The URL could not be parsed.
34    #[error("invalid object store URL: {0}")]
35    InvalidUrl(String),
36
37    /// Backend construction failed.
38    #[error("object store build error: {0}")]
39    Build(String),
40}
41
42impl From<object_store::Error> for ObjectStoreFactoryError {
43    fn from(e: object_store::Error) -> Self {
44        Self::Build(e.to_string())
45    }
46}
47
48/// Build an [`ObjectStore`] from a URL and optional configuration overrides.
49///
50/// # Supported schemes
51///
52/// | Scheme | Feature | Builder |
53/// |--------|---------|---------|
54/// | `file://` | (always) | `LocalFileSystem` |
55/// | `s3://` | `aws` | `AmazonS3Builder` |
56/// | `gs://` | `gcs` | `GoogleCloudStorageBuilder` |
57/// | `az://`, `abfs://` | `azure` | `MicrosoftAzureBuilder` |
58///
59/// # Errors
60///
61/// Returns [`ObjectStoreFactoryError`] if the scheme is unsupported, requires
62/// an uncompiled feature, or the backend fails to build.
63#[allow(clippy::implicit_hasher)]
64pub fn build_object_store(
65    url: &str,
66    options: &HashMap<String, String>,
67) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
68    let scheme = url
69        .find("://")
70        .map(|i| &url[..i])
71        .ok_or_else(|| ObjectStoreFactoryError::InvalidUrl(format!("no scheme in '{url}'")))?;
72
73    match scheme {
74        "file" => build_local_file_system(url),
75        "s3" => build_s3(url, options),
76        "gs" => build_gcs(url, options),
77        "az" | "abfs" | "abfss" => build_azure(url, options),
78        other => Err(ObjectStoreFactoryError::UnsupportedScheme(
79            other.to_string(),
80        )),
81    }
82}
83
84/// Extract the local path from a `file://` URL and create a `LocalFileSystem`.
85fn build_local_file_system(url: &str) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
86    // file:///path/to/dir → /path/to/dir
87    let path = url
88        .strip_prefix("file://")
89        .ok_or_else(|| ObjectStoreFactoryError::InvalidUrl(url.to_string()))?;
90
91    if path.is_empty() {
92        return Err(ObjectStoreFactoryError::InvalidUrl(
93            "file:// URL has empty path".to_string(),
94        ));
95    }
96
97    let fs = LocalFileSystem::new_with_prefix(path)?;
98    Ok(Arc::new(fs))
99}
100
101// ---------------------------------------------------------------------------
102// S3 (feature = "aws")
103// ---------------------------------------------------------------------------
104
/// Build an S3-backed store for `url`.
///
/// The builder starts from `AmazonS3Builder::from_env()` and the URL. Each
/// `options` entry is then parsed as an S3 config key and applied on top. A
/// key that fails to parse aborts with `Build`, as does a failing `build()`.
#[cfg(feature = "aws")]
fn build_s3(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
    use object_store::aws::AmazonS3Builder;

    let configured = options.iter().try_fold(
        AmazonS3Builder::from_env().with_url(url),
        |pending, (key, value)| -> Result<AmazonS3Builder, ObjectStoreFactoryError> {
            let parsed = key.parse().map_err(|e: object_store::Error| {
                ObjectStoreFactoryError::Build(format!("invalid S3 config key '{key}': {e}"))
            })?;
            Ok(pending.with_config(parsed, value))
        },
    )?;

    Ok(Arc::new(configured.build()?))
}
124
125#[cfg(not(feature = "aws"))]
126fn build_s3(
127    _url: &str,
128    _options: &HashMap<String, String>,
129) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
130    Err(ObjectStoreFactoryError::MissingFeature {
131        scheme: "s3".to_string(),
132        feature: "aws".to_string(),
133    })
134}
135
136// ---------------------------------------------------------------------------
137// GCS (feature = "gcs")
138// ---------------------------------------------------------------------------
139
/// Build a Google Cloud Storage-backed store for `url`.
///
/// The builder starts from `GoogleCloudStorageBuilder::from_env()` and the
/// URL. Each `options` entry is then parsed as a GCS config key and applied on
/// top. A key that fails to parse aborts with `Build`, as does a failing
/// `build()`.
#[cfg(feature = "gcs")]
fn build_gcs(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
    use object_store::gcp::GoogleCloudStorageBuilder;

    let configured = options.iter().try_fold(
        GoogleCloudStorageBuilder::from_env().with_url(url),
        |pending, (key, value)| -> Result<GoogleCloudStorageBuilder, ObjectStoreFactoryError> {
            let parsed = key.parse().map_err(|e: object_store::Error| {
                ObjectStoreFactoryError::Build(format!("invalid GCS config key '{key}': {e}"))
            })?;
            Ok(pending.with_config(parsed, value))
        },
    )?;

    Ok(Arc::new(configured.build()?))
}
159
160#[cfg(not(feature = "gcs"))]
161fn build_gcs(
162    _url: &str,
163    _options: &HashMap<String, String>,
164) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
165    Err(ObjectStoreFactoryError::MissingFeature {
166        scheme: "gs".to_string(),
167        feature: "gcs".to_string(),
168    })
169}
170
171// ---------------------------------------------------------------------------
172// Azure (feature = "azure")
173// ---------------------------------------------------------------------------
174
/// Build an Azure-backed store for `url`.
///
/// The builder starts from `MicrosoftAzureBuilder::from_env()` and the URL.
/// Each `options` entry is then parsed as an Azure config key and applied on
/// top. A key that fails to parse aborts with `Build`, as does a failing
/// `build()`.
#[cfg(feature = "azure")]
fn build_azure(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
    use object_store::azure::MicrosoftAzureBuilder;

    let configured = options.iter().try_fold(
        MicrosoftAzureBuilder::from_env().with_url(url),
        |pending, (key, value)| -> Result<MicrosoftAzureBuilder, ObjectStoreFactoryError> {
            let parsed = key.parse().map_err(|e: object_store::Error| {
                ObjectStoreFactoryError::Build(format!("invalid Azure config key '{key}': {e}"))
            })?;
            Ok(pending.with_config(parsed, value))
        },
    )?;

    Ok(Arc::new(configured.build()?))
}
194
195#[cfg(not(feature = "azure"))]
196fn build_azure(
197    _url: &str,
198    _options: &HashMap<String, String>,
199) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
200    Err(ObjectStoreFactoryError::MissingFeature {
201        scheme: "az".to_string(),
202        feature: "azure".to_string(),
203    })
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the factory's return type.
    type BuildResult = Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError>;

    /// True when `result` is specifically a `MissingFeature` error.
    fn is_missing_feature(result: &BuildResult) -> bool {
        matches!(result, Err(ObjectStoreFactoryError::MissingFeature { .. }))
    }

    /// Shared check for the cloud schemes.
    ///
    /// With the feature compiled in, the builder may succeed or fail on
    /// missing configuration, but it must never report `MissingFeature`.
    /// Without it, the error must be `MissingFeature` and name the feature.
    fn assert_feature_gated(url: &str, feature: &str, enabled: bool) {
        let result = build_object_store(url, &HashMap::new());
        if enabled {
            assert!(
                !is_missing_feature(&result),
                "{url}: feature '{feature}' is enabled, got: {result:?}"
            );
        } else {
            assert!(is_missing_feature(&result), "{url}: got: {result:?}");
            let err = result.unwrap_err().to_string();
            assert!(err.contains(feature), "{url}: got: {err}");
        }
    }

    #[test]
    fn test_file_scheme_creates_local_fs() {
        let dir = tempfile::tempdir().unwrap();
        let url = format!("file://{}", dir.path().to_str().unwrap());
        let store = build_object_store(&url, &HashMap::new());
        assert!(store.is_ok(), "file:// should succeed: {store:?}");
    }

    #[test]
    fn test_file_scheme_empty_path_errors() {
        let result = build_object_store("file://", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("empty path"), "got: {err}");
    }

    #[test]
    fn test_unknown_scheme_errors() {
        let result = build_object_store("ftp://bucket/prefix", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("unsupported"), "got: {err}");
    }

    #[test]
    fn test_no_scheme_errors() {
        let result = build_object_store("/just/a/path", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("no scheme"), "got: {err}");
    }

    #[test]
    fn test_s3_without_feature_errors() {
        assert_feature_gated("s3://my-bucket/prefix", "aws", cfg!(feature = "aws"));
    }

    #[test]
    fn test_gs_without_feature_errors() {
        assert_feature_gated("gs://my-bucket/prefix", "gcs", cfg!(feature = "gcs"));
    }

    #[test]
    fn test_azure_without_feature_errors() {
        // All three Azure spellings must reach the Azure backend.
        for url in [
            "az://my-container/prefix",
            "abfs://my-container/prefix",
            "abfss://my-container/prefix",
        ] {
            assert_feature_gated(url, "azure", cfg!(feature = "azure"));
        }
    }
}