laminar_storage/
object_store_factory.rs1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
12use std::sync::Arc;
13
14use object_store::local::LocalFileSystem;
15use object_store::ObjectStore;
16
17#[derive(Debug, thiserror::Error)]
19pub enum ObjectStoreFactoryError {
20 #[error("scheme '{scheme}' requires the '{feature}' feature flag (compile with --features {feature})")]
22 MissingFeature {
23 scheme: String,
25 feature: String,
27 },
28
29 #[error("unsupported object store URL scheme: '{0}'")]
31 UnsupportedScheme(String),
32
33 #[error("invalid object store URL: {0}")]
35 InvalidUrl(String),
36
37 #[error("object store build error: {0}")]
39 Build(String),
40}
41
42impl From<object_store::Error> for ObjectStoreFactoryError {
43 fn from(e: object_store::Error) -> Self {
44 Self::Build(e.to_string())
45 }
46}
47
48#[allow(clippy::implicit_hasher)]
64pub fn build_object_store(
65 url: &str,
66 options: &HashMap<String, String>,
67) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
68 let scheme = url
69 .find("://")
70 .map(|i| &url[..i])
71 .ok_or_else(|| ObjectStoreFactoryError::InvalidUrl(format!("no scheme in '{url}'")))?;
72
73 match scheme {
74 "file" => build_local_file_system(url),
75 "s3" => build_s3(url, options),
76 "gs" => build_gcs(url, options),
77 "az" | "abfs" | "abfss" => build_azure(url, options),
78 other => Err(ObjectStoreFactoryError::UnsupportedScheme(
79 other.to_string(),
80 )),
81 }
82}
83
84fn build_local_file_system(url: &str) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
86 let path = url
88 .strip_prefix("file://")
89 .ok_or_else(|| ObjectStoreFactoryError::InvalidUrl(url.to_string()))?;
90
91 if path.is_empty() {
92 return Err(ObjectStoreFactoryError::InvalidUrl(
93 "file:// URL has empty path".to_string(),
94 ));
95 }
96
97 let fs = LocalFileSystem::new_with_prefix(path)?;
98 Ok(Arc::new(fs))
99}
100
101#[cfg(feature = "aws")]
106fn build_s3(
107 url: &str,
108 options: &HashMap<String, String>,
109) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
110 use object_store::aws::AmazonS3Builder;
111
112 let mut builder = AmazonS3Builder::from_env().with_url(url);
113
114 for (key, value) in options {
115 let config_key = key.parse().map_err(|e: object_store::Error| {
116 ObjectStoreFactoryError::Build(format!("invalid S3 config key '{key}': {e}"))
117 })?;
118 builder = builder.with_config(config_key, value);
119 }
120
121 let store = builder.build()?;
122 Ok(Arc::new(store))
123}
124
125#[cfg(not(feature = "aws"))]
126fn build_s3(
127 _url: &str,
128 _options: &HashMap<String, String>,
129) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
130 Err(ObjectStoreFactoryError::MissingFeature {
131 scheme: "s3".to_string(),
132 feature: "aws".to_string(),
133 })
134}
135
136#[cfg(feature = "gcs")]
141fn build_gcs(
142 url: &str,
143 options: &HashMap<String, String>,
144) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
145 use object_store::gcp::GoogleCloudStorageBuilder;
146
147 let mut builder = GoogleCloudStorageBuilder::from_env().with_url(url);
148
149 for (key, value) in options {
150 let config_key = key.parse().map_err(|e: object_store::Error| {
151 ObjectStoreFactoryError::Build(format!("invalid GCS config key '{key}': {e}"))
152 })?;
153 builder = builder.with_config(config_key, value);
154 }
155
156 let store = builder.build()?;
157 Ok(Arc::new(store))
158}
159
160#[cfg(not(feature = "gcs"))]
161fn build_gcs(
162 _url: &str,
163 _options: &HashMap<String, String>,
164) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
165 Err(ObjectStoreFactoryError::MissingFeature {
166 scheme: "gs".to_string(),
167 feature: "gcs".to_string(),
168 })
169}
170
171#[cfg(feature = "azure")]
176fn build_azure(
177 url: &str,
178 options: &HashMap<String, String>,
179) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
180 use object_store::azure::MicrosoftAzureBuilder;
181
182 let mut builder = MicrosoftAzureBuilder::from_env().with_url(url);
183
184 for (key, value) in options {
185 let config_key = key.parse().map_err(|e: object_store::Error| {
186 ObjectStoreFactoryError::Build(format!("invalid Azure config key '{key}': {e}"))
187 })?;
188 builder = builder.with_config(config_key, value);
189 }
190
191 let store = builder.build()?;
192 Ok(Arc::new(store))
193}
194
195#[cfg(not(feature = "azure"))]
196fn build_azure(
197 _url: &str,
198 _options: &HashMap<String, String>,
199) -> Result<Arc<dyn ObjectStore>, ObjectStoreFactoryError> {
200 Err(ObjectStoreFactoryError::MissingFeature {
201 scheme: "az".to_string(),
202 feature: "azure".to_string(),
203 })
204}
205
206#[cfg(test)]
207mod tests {
208 use super::*;
209
210 #[test]
211 fn test_file_scheme_creates_local_fs() {
212 let dir = tempfile::tempdir().unwrap();
213 let url = format!("file://{}", dir.path().to_str().unwrap());
214 let store = build_object_store(&url, &HashMap::new());
215 assert!(store.is_ok(), "file:// should succeed: {store:?}");
216 }
217
218 #[test]
219 fn test_file_scheme_empty_path_errors() {
220 let result = build_object_store("file://", &HashMap::new());
221 assert!(result.is_err());
222 let err = result.unwrap_err().to_string();
223 assert!(err.contains("empty path"), "got: {err}");
224 }
225
226 #[test]
227 fn test_unknown_scheme_errors() {
228 let result = build_object_store("ftp://bucket/prefix", &HashMap::new());
229 assert!(result.is_err());
230 let err = result.unwrap_err().to_string();
231 assert!(err.contains("unsupported"), "got: {err}");
232 }
233
234 #[test]
235 fn test_no_scheme_errors() {
236 let result = build_object_store("/just/a/path", &HashMap::new());
237 assert!(result.is_err());
238 let err = result.unwrap_err().to_string();
239 assert!(err.contains("no scheme"), "got: {err}");
240 }
241
242 #[test]
243 fn test_s3_without_feature_errors() {
244 let result = build_object_store("s3://my-bucket/prefix", &HashMap::new());
247 if cfg!(feature = "aws") {
248 assert!(result.is_err() || result.is_ok());
250 } else {
251 let err = result.unwrap_err().to_string();
252 assert!(err.contains("aws"), "got: {err}");
253 }
254 }
255
256 #[test]
257 fn test_gs_without_feature_errors() {
258 let result = build_object_store("gs://my-bucket/prefix", &HashMap::new());
259 if cfg!(feature = "gcs") {
260 assert!(result.is_err() || result.is_ok());
261 } else {
262 let err = result.unwrap_err().to_string();
263 assert!(err.contains("gcs"), "got: {err}");
264 }
265 }
266
267 #[test]
268 fn test_azure_without_feature_errors() {
269 let result = build_object_store("az://my-container/prefix", &HashMap::new());
270 if cfg!(feature = "azure") {
271 assert!(result.is_err() || result.is_ok());
272 } else {
273 let err = result.unwrap_err().to_string();
274 assert!(err.contains("azure"), "got: {err}");
275 }
276 }
277}