laminar_storage/
object_store_builder.rs1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
12use std::sync::Arc;
13
14use object_store::local::LocalFileSystem;
15use object_store::ObjectStore;
16
17#[derive(Debug, thiserror::Error)]
19pub enum ObjectStoreBuilderError {
20 #[error("scheme '{scheme}' requires the '{feature}' feature flag (compile with --features {feature})")]
22 MissingFeature {
23 scheme: String,
25 feature: String,
27 },
28
29 #[error("unsupported object store URL scheme: '{0}'")]
31 UnsupportedScheme(String),
32
33 #[error("invalid object store URL: {0}")]
35 InvalidUrl(String),
36
37 #[error("object store build error: {0}")]
39 Build(String),
40}
41
42impl From<object_store::Error> for ObjectStoreBuilderError {
43 fn from(e: object_store::Error) -> Self {
44 Self::Build(e.to_string())
45 }
46}
47
48#[allow(clippy::implicit_hasher)]
64pub fn build_object_store(
65 url: &str,
66 options: &HashMap<String, String>,
67) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
68 let scheme = url
69 .find("://")
70 .map(|i| &url[..i])
71 .ok_or_else(|| ObjectStoreBuilderError::InvalidUrl(format!("no scheme in '{url}'")))?;
72
73 match scheme {
74 "file" => build_local_file_system(url),
75 "s3" => build_s3(url, options),
76 "gs" => build_gcs(url, options),
77 "az" | "abfs" | "abfss" => build_azure(url, options),
78 other => Err(ObjectStoreBuilderError::UnsupportedScheme(
79 other.to_string(),
80 )),
81 }
82}
83
84fn build_local_file_system(url: &str) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
86 let path = url
88 .strip_prefix("file://")
89 .ok_or_else(|| ObjectStoreBuilderError::InvalidUrl(url.to_string()))?;
90
91 if path.is_empty() {
92 return Err(ObjectStoreBuilderError::InvalidUrl(
93 "file:// URL has empty path".to_string(),
94 ));
95 }
96
97 let path = strip_windows_leading_slash(path);
101
102 std::fs::create_dir_all(path).map_err(|e| {
104 ObjectStoreBuilderError::InvalidUrl(format!("failed to create directory '{path}': {e}"))
105 })?;
106
107 let fs = LocalFileSystem::new_with_prefix(path)?;
108 Ok(Arc::new(fs))
109}
110
/// Drops the leading `/` from paths shaped like `/C:/...`.
///
/// Such paths are what remains of `file:///C:/...` once `file://` has been
/// removed. Only a slash followed by one ASCII letter and a colon is treated
/// as a drive prefix; every other input is returned unchanged.
#[cfg(windows)]
fn strip_windows_leading_slash(path: &str) -> &str {
    match path.as_bytes() {
        // The first byte is an ASCII '/', so index 1 is a valid char boundary.
        [b'/', drive, b':', ..] if drive.is_ascii_alphabetic() => &path[1..],
        _ => path,
    }
}

/// Returns `path` unchanged: outside Windows there are no drive letters, so
/// a leading `/` is always part of the path.
#[cfg(not(windows))]
fn strip_windows_leading_slash(path: &str) -> &str {
    path
}
131
/// Builds an Amazon S3 store for `url` (requires the `aws` feature).
///
/// The builder starts from the process environment and the URL; every entry
/// of `options` is then applied on top as an S3 configuration key.
///
/// # Errors
///
/// Returns [`ObjectStoreBuilderError::Build`] when an option key is not a
/// valid S3 configuration key or when the builder fails.
#[cfg(feature = "aws")]
fn build_s3(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
    use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};

    // Thread the builder through the options, stopping at the first bad key.
    let configured = options.iter().try_fold(
        AmazonS3Builder::from_env().with_url(url),
        |builder, (key, value)| {
            key.parse::<AmazonS3ConfigKey>()
                .map(|config_key| builder.with_config(config_key, value))
                .map_err(|e| {
                    ObjectStoreBuilderError::Build(format!("invalid S3 config key '{key}': {e}"))
                })
        },
    )?;

    Ok(Arc::new(configured.build()?))
}
155
156#[cfg(not(feature = "aws"))]
157fn build_s3(
158 _url: &str,
159 _options: &HashMap<String, String>,
160) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
161 Err(ObjectStoreBuilderError::MissingFeature {
162 scheme: "s3".to_string(),
163 feature: "aws".to_string(),
164 })
165}
166
/// Builds a Google Cloud Storage store for `url` (requires the `gcs` feature).
///
/// The builder starts from the process environment and the URL; every entry
/// of `options` is then applied on top as a GCS configuration key.
///
/// # Errors
///
/// Returns [`ObjectStoreBuilderError::Build`] when an option key is not a
/// valid GCS configuration key or when the builder fails.
#[cfg(feature = "gcs")]
fn build_gcs(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
    use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};

    // Thread the builder through the options, stopping at the first bad key.
    let configured = options.iter().try_fold(
        GoogleCloudStorageBuilder::from_env().with_url(url),
        |builder, (key, value)| {
            key.parse::<GoogleConfigKey>()
                .map(|config_key| builder.with_config(config_key, value))
                .map_err(|e| {
                    ObjectStoreBuilderError::Build(format!("invalid GCS config key '{key}': {e}"))
                })
        },
    )?;

    Ok(Arc::new(configured.build()?))
}
190
191#[cfg(not(feature = "gcs"))]
192fn build_gcs(
193 _url: &str,
194 _options: &HashMap<String, String>,
195) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
196 Err(ObjectStoreBuilderError::MissingFeature {
197 scheme: "gs".to_string(),
198 feature: "gcs".to_string(),
199 })
200}
201
/// Builds a Microsoft Azure store for `url` (requires the `azure` feature).
///
/// The builder starts from the process environment and the URL; every entry
/// of `options` is then applied on top as an Azure configuration key.
///
/// # Errors
///
/// Returns [`ObjectStoreBuilderError::Build`] when an option key is not a
/// valid Azure configuration key or when the builder fails.
#[cfg(feature = "azure")]
fn build_azure(
    url: &str,
    options: &HashMap<String, String>,
) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
    use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};

    // Thread the builder through the options, stopping at the first bad key.
    let configured = options.iter().try_fold(
        MicrosoftAzureBuilder::from_env().with_url(url),
        |builder, (key, value)| {
            key.parse::<AzureConfigKey>()
                .map(|config_key| builder.with_config(config_key, value))
                .map_err(|e| {
                    ObjectStoreBuilderError::Build(format!("invalid Azure config key '{key}': {e}"))
                })
        },
    )?;

    Ok(Arc::new(configured.build()?))
}
225
226#[cfg(not(feature = "azure"))]
227fn build_azure(
228 _url: &str,
229 _options: &HashMap<String, String>,
230) -> Result<Arc<dyn ObjectStore>, ObjectStoreBuilderError> {
231 Err(ObjectStoreBuilderError::MissingFeature {
232 scheme: "az".to_string(),
233 feature: "azure".to_string(),
234 })
235}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// A `file://` URL pointing at an existing temp directory builds a store.
    #[test]
    fn test_file_scheme_creates_local_fs() {
        let dir = tempfile::tempdir().unwrap();
        let url = format!("file://{}", dir.path().to_str().unwrap());
        let store = build_object_store(&url, &HashMap::new());
        assert!(store.is_ok(), "file:// should succeed: {store:?}");
    }

    /// `file://` with nothing after it is rejected.
    #[test]
    fn test_file_scheme_empty_path_errors() {
        let result = build_object_store("file://", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("empty path"), "got: {err}");
    }

    /// Schemes without a backend are reported as unsupported.
    #[test]
    fn test_unknown_scheme_errors() {
        let result = build_object_store("ftp://bucket/prefix", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("unsupported"), "got: {err}");
    }

    /// A bare path has no `://` and is therefore not a valid URL.
    #[test]
    fn test_no_scheme_errors() {
        let result = build_object_store("/just/a/path", &HashMap::new());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("no scheme"), "got: {err}");
    }

    /// Without the `aws` feature, `s3://` names the missing feature; with it,
    /// the build may succeed or fail but must not claim the feature is missing.
    #[test]
    fn test_s3_without_feature_errors() {
        let result = build_object_store("s3://my-bucket/prefix", &HashMap::new());
        if cfg!(feature = "aws") {
            assert!(
                !matches!(
                    &result,
                    Err(ObjectStoreBuilderError::MissingFeature { .. })
                ),
                "aws feature is enabled, got: {result:?}"
            );
        } else {
            let err = result.unwrap_err().to_string();
            assert!(err.contains("aws"), "got: {err}");
        }
    }

    /// Same contract as the S3 test, for `gs://` and the `gcs` feature.
    #[test]
    fn test_gs_without_feature_errors() {
        let result = build_object_store("gs://my-bucket/prefix", &HashMap::new());
        if cfg!(feature = "gcs") {
            assert!(
                !matches!(
                    &result,
                    Err(ObjectStoreBuilderError::MissingFeature { .. })
                ),
                "gcs feature is enabled, got: {result:?}"
            );
        } else {
            let err = result.unwrap_err().to_string();
            assert!(err.contains("gcs"), "got: {err}");
        }
    }

    /// Same contract as the S3 test, for every Azure scheme alias.
    #[test]
    fn test_azure_without_feature_errors() {
        for url in [
            "az://my-container/prefix",
            "abfs://my-container/prefix",
            "abfss://my-container/prefix",
        ] {
            let result = build_object_store(url, &HashMap::new());
            if cfg!(feature = "azure") {
                assert!(
                    !matches!(
                        &result,
                        Err(ObjectStoreBuilderError::MissingFeature { .. })
                    ),
                    "azure feature is enabled, got: {result:?}"
                );
            } else {
                let err = result.unwrap_err().to_string();
                assert!(err.contains("azure"), "got: {err}");
            }
        }
    }

    #[test]
    #[cfg(windows)]
    fn test_strip_windows_leading_slash() {
        // Drive-letter paths lose the leading slash.
        assert_eq!(strip_windows_leading_slash("/C:/foo"), "C:/foo");
        assert_eq!(strip_windows_leading_slash("/D:/"), "D:/");
        assert_eq!(strip_windows_leading_slash("/c:/bar"), "c:/bar");

        // Paths without a drive letter are left alone.
        assert_eq!(strip_windows_leading_slash("/path/to"), "/path/to");
        assert_eq!(strip_windows_leading_slash("/tmp"), "/tmp");

        // Short inputs and paths with no leading slash are left alone.
        assert_eq!(strip_windows_leading_slash("/"), "/");
        assert_eq!(strip_windows_leading_slash(""), "");
        assert_eq!(strip_windows_leading_slash("C:/foo"), "C:/foo");
    }

    #[test]
    #[cfg(not(windows))]
    fn test_strip_windows_leading_slash() {
        // Outside Windows the helper never changes its input.
        assert_eq!(strip_windows_leading_slash("/C:/foo"), "/C:/foo");
        assert_eq!(strip_windows_leading_slash("/path/to"), "/path/to");
        assert_eq!(strip_windows_leading_slash(""), "");
    }
}