Skip to main content

laminar_connectors/storage/
resolver.rs

//! Cloud storage credential resolver.
//!
//! [`StorageCredentialResolver`] merges explicit `storage.*` config options with
//! environment variable fallbacks, producing a [`ResolvedStorageOptions`] ready
//! for consumption by `object_store` / `deltalake`.
//!
//! Resolution priority chain:
//! 1. Explicit options from SQL `WITH` clause (`storage.*` keys)
//! 2. Environment variables (`AWS_ACCESS_KEY_ID`, etc.)
//! 3. Instance metadata / default credential providers (handled by `object_store`)
11#![allow(clippy::disallowed_types)] // cold path: storage configuration
12
13use std::collections::HashMap;
14
15use super::provider::StorageProvider;
16
/// AWS S3 environment variable fallbacks.
///
/// Each entry is `(option_key, env_var)`: the option key is the name used by
/// `object_store` / `deltalake`, and the env var is the process environment
/// variable consulted when that key was not supplied explicitly.
const AWS_ENV_MAPPING: &[(&str, &str)] = &[
    ("aws_access_key_id", "AWS_ACCESS_KEY_ID"),
    ("aws_secret_access_key", "AWS_SECRET_ACCESS_KEY"),
    ("aws_region", "AWS_REGION"),
    ("aws_session_token", "AWS_SESSION_TOKEN"),
    ("aws_endpoint", "AWS_ENDPOINT_URL"),
    ("aws_profile", "AWS_PROFILE"),
    ("aws_s3_allow_unsafe_rename", "AWS_S3_ALLOW_UNSAFE_RENAME"),
];
29
/// Azure ADLS environment variable fallbacks.
///
/// Each entry is `(option_key, env_var)`. Note that the service-principal
/// keys (`azure_storage_client_id`, `azure_storage_tenant_id`,
/// `azure_storage_client_secret`) are read from the un-prefixed
/// `AZURE_CLIENT_ID` / `AZURE_TENANT_ID` / `AZURE_CLIENT_SECRET` variables.
const AZURE_ENV_MAPPING: &[(&str, &str)] = &[
    ("azure_storage_account_name", "AZURE_STORAGE_ACCOUNT_NAME"),
    ("azure_storage_account_key", "AZURE_STORAGE_ACCOUNT_KEY"),
    ("azure_storage_sas_token", "AZURE_STORAGE_SAS_TOKEN"),
    ("azure_storage_client_id", "AZURE_CLIENT_ID"),
    ("azure_storage_tenant_id", "AZURE_TENANT_ID"),
    ("azure_storage_client_secret", "AZURE_CLIENT_SECRET"),
];
39
/// GCS environment variable fallbacks.
///
/// Each entry is `(option_key, env_var)`: a credentials file path taken from
/// `GOOGLE_APPLICATION_CREDENTIALS`, or an inline key taken from
/// `GOOGLE_SERVICE_ACCOUNT_KEY`.
const GCS_ENV_MAPPING: &[(&str, &str)] = &[
    (
        "google_service_account_path",
        "GOOGLE_APPLICATION_CREDENTIALS",
    ),
    ("google_service_account_key", "GOOGLE_SERVICE_ACCOUNT_KEY"),
];
48
49/// Resolved storage credentials ready for `object_store` / `deltalake`.
50#[derive(Debug, Clone)]
51pub struct ResolvedStorageOptions {
52    /// Detected cloud provider.
53    pub provider: StorageProvider,
54    /// Merged options (explicit config + env vars).
55    /// Keys match what `deltalake`/`object_store` expect.
56    pub options: HashMap<String, String>,
57    /// Keys that were resolved from environment variables (not explicit config).
58    pub env_resolved_keys: Vec<String>,
59}
60
61impl ResolvedStorageOptions {
62    /// Returns true if any credentials were found (explicit or from env).
63    #[must_use]
64    pub fn has_credentials(&self) -> bool {
65        match self.provider {
66            StorageProvider::AwsS3 => {
67                self.options.contains_key("aws_access_key_id")
68                    || self.options.contains_key("aws_profile")
69            }
70            StorageProvider::AzureAdls => {
71                self.options.contains_key("azure_storage_account_key")
72                    || self.options.contains_key("azure_storage_sas_token")
73                    || self.options.contains_key("azure_storage_client_id")
74            }
75            StorageProvider::Gcs => {
76                self.options.contains_key("google_service_account_path")
77                    || self.options.contains_key("google_service_account_key")
78            }
79            StorageProvider::Local => false,
80        }
81    }
82}
83
/// Storage credential resolver.
///
/// A stateless namespace type; all functionality is exposed through
/// associated functions. Credentials are resolved by priority chain:
/// 1. Explicit `storage.*` keys from SQL WITH clause
/// 2. Environment variables
/// 3. Instance metadata / default credential provider (handled downstream by `object_store`)
pub struct StorageCredentialResolver;
91
92impl StorageCredentialResolver {
93    /// Resolves storage credentials for the given table path.
94    ///
95    /// Merges explicit options with environment variable fallbacks
96    /// appropriate for the detected cloud provider.
97    ///
98    /// # Arguments
99    ///
100    /// * `table_path` - URI of the table (`s3://`, `az://`, `gs://`, or local path)
101    /// * `explicit_options` - Options from SQL WITH clause (`storage.` prefix already stripped)
102    ///
103    /// # Returns
104    ///
105    /// [`ResolvedStorageOptions`] with merged credentials.
106    #[must_use]
107    pub fn resolve(
108        table_path: &str,
109        explicit_options: &HashMap<String, String>,
110    ) -> ResolvedStorageOptions {
111        let provider = StorageProvider::detect(table_path);
112
113        if provider == StorageProvider::Local {
114            return ResolvedStorageOptions {
115                provider,
116                options: explicit_options.clone(),
117                env_resolved_keys: Vec::new(),
118            };
119        }
120
121        let env_mapping = match provider {
122            StorageProvider::AwsS3 => AWS_ENV_MAPPING,
123            StorageProvider::AzureAdls => AZURE_ENV_MAPPING,
124            StorageProvider::Gcs => GCS_ENV_MAPPING,
125            StorageProvider::Local => &[],
126        };
127
128        let mut resolved = explicit_options.clone();
129        let mut env_resolved = Vec::new();
130
131        for (option_key, env_var) in env_mapping {
132            if !resolved.contains_key(*option_key) {
133                if let Ok(val) = std::env::var(env_var) {
134                    if !val.is_empty() {
135                        resolved.insert((*option_key).to_string(), val);
136                        env_resolved.push((*option_key).to_string());
137                    }
138                }
139            }
140        }
141
142        ResolvedStorageOptions {
143            provider,
144            options: resolved,
145            env_resolved_keys: env_resolved,
146        }
147    }
148
149    /// Resolves credentials using a custom environment lookup function.
150    ///
151    /// Allows injecting env var values without mutating the actual
152    /// process environment.
153    #[cfg(test)]
154    #[must_use]
155    pub fn resolve_with_env<F>(
156        table_path: &str,
157        explicit_options: &HashMap<String, String>,
158        env_lookup: F,
159    ) -> ResolvedStorageOptions
160    where
161        F: Fn(&str) -> Option<String>,
162    {
163        let provider = StorageProvider::detect(table_path);
164
165        if provider == StorageProvider::Local {
166            return ResolvedStorageOptions {
167                provider,
168                options: explicit_options.clone(),
169                env_resolved_keys: Vec::new(),
170            };
171        }
172
173        let env_mapping = match provider {
174            StorageProvider::AwsS3 => AWS_ENV_MAPPING,
175            StorageProvider::AzureAdls => AZURE_ENV_MAPPING,
176            StorageProvider::Gcs => GCS_ENV_MAPPING,
177            StorageProvider::Local => &[],
178        };
179
180        let mut resolved = explicit_options.clone();
181        let mut env_resolved = Vec::new();
182
183        for (option_key, env_var) in env_mapping {
184            if !resolved.contains_key(*option_key) {
185                if let Some(val) = env_lookup(env_var) {
186                    if !val.is_empty() {
187                        resolved.insert((*option_key).to_string(), val);
188                        env_resolved.push((*option_key).to_string());
189                    }
190                }
191            }
192        }
193
194        ResolvedStorageOptions {
195            provider,
196            options: resolved,
197            env_resolved_keys: env_resolved,
198        }
199    }
200}
201
/// Unit tests for the resolver. Every test injects its environment through
/// `resolve_with_env`, so the real process environment is never read or mutated.
#[cfg(test)]
mod tests {
    use super::*;

    /// No explicit options at all.
    fn empty_opts() -> HashMap<String, String> {
        HashMap::new()
    }

    /// Builds an explicit-options map from `(key, value)` string pairs.
    fn opts_of(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Environment in which no variable is set.
    fn env_none(_: &str) -> Option<String> {
        None
    }

    /// Environment providing AWS key id, secret and region.
    fn aws_env(var: &str) -> Option<String> {
        match var {
            "AWS_ACCESS_KEY_ID" => Some("AKID_FROM_ENV".to_string()),
            "AWS_SECRET_ACCESS_KEY" => Some("SECRET_FROM_ENV".to_string()),
            "AWS_REGION" => Some("us-west-2".to_string()),
            _ => None,
        }
    }

    /// Environment providing an Azure account name and key.
    fn azure_env(var: &str) -> Option<String> {
        match var {
            "AZURE_STORAGE_ACCOUNT_NAME" => Some("myaccount".to_string()),
            "AZURE_STORAGE_ACCOUNT_KEY" => Some("base64key==".to_string()),
            _ => None,
        }
    }

    /// Environment providing a GCS credentials file path.
    fn gcs_env(var: &str) -> Option<String> {
        match var {
            "GOOGLE_APPLICATION_CREDENTIALS" => Some("/path/to/creds.json".to_string()),
            _ => None,
        }
    }

    // ── Local path tests ──

    #[test]
    fn test_resolve_local_no_credentials() {
        let resolved =
            StorageCredentialResolver::resolve_with_env("/data/table", &empty_opts(), env_none);
        assert_eq!(resolved.provider, StorageProvider::Local);
        assert!(resolved.options.is_empty());
        assert!(resolved.env_resolved_keys.is_empty());
        assert!(!resolved.has_credentials());
    }

    #[test]
    fn test_resolve_local_preserves_explicit() {
        let opts = opts_of(&[("custom_key", "value")]);
        let resolved = StorageCredentialResolver::resolve_with_env("/data/table", &opts, env_none);
        assert_eq!(
            resolved.options.get("custom_key").map(String::as_str),
            Some("value")
        );
    }

    // ── S3 tests ──

    #[test]
    fn test_resolve_s3_explicit_keys() {
        let opts = opts_of(&[
            ("aws_access_key_id", "EXPLICIT_KEY"),
            ("aws_secret_access_key", "EXPLICIT_SECRET"),
            ("aws_region", "eu-west-1"),
        ]);

        let resolved =
            StorageCredentialResolver::resolve_with_env("s3://bucket/path", &opts, aws_env);
        assert_eq!(resolved.provider, StorageProvider::AwsS3);
        assert_eq!(resolved.options["aws_access_key_id"], "EXPLICIT_KEY");
        assert_eq!(resolved.options["aws_secret_access_key"], "EXPLICIT_SECRET");
        assert_eq!(resolved.options["aws_region"], "eu-west-1");
        assert!(resolved.env_resolved_keys.is_empty());
        assert!(resolved.has_credentials());
    }

    #[test]
    fn test_resolve_s3_env_fallback() {
        let resolved =
            StorageCredentialResolver::resolve_with_env("s3://bucket/path", &empty_opts(), aws_env);
        assert_eq!(resolved.options["aws_access_key_id"], "AKID_FROM_ENV");
        assert_eq!(resolved.options["aws_secret_access_key"], "SECRET_FROM_ENV");
        assert_eq!(resolved.options["aws_region"], "us-west-2");
        assert_eq!(resolved.env_resolved_keys.len(), 3);
        assert!(resolved.has_credentials());
    }

    #[test]
    fn test_resolve_s3_explicit_overrides_env() {
        let opts = opts_of(&[("aws_region", "ap-southeast-1")]);

        let resolved =
            StorageCredentialResolver::resolve_with_env("s3://bucket/path", &opts, aws_env);
        // Explicit region preserved; env keys and secret filled from env.
        assert_eq!(resolved.options["aws_region"], "ap-southeast-1");
        assert_eq!(resolved.options["aws_access_key_id"], "AKID_FROM_ENV");
        assert!(!resolved
            .env_resolved_keys
            .contains(&"aws_region".to_string()));
        assert!(resolved
            .env_resolved_keys
            .contains(&"aws_access_key_id".to_string()));
    }

    #[test]
    fn test_resolve_s3_no_credentials() {
        let resolved = StorageCredentialResolver::resolve_with_env(
            "s3://bucket/path",
            &empty_opts(),
            env_none,
        );
        assert_eq!(resolved.provider, StorageProvider::AwsS3);
        assert!(!resolved.has_credentials());
    }

    #[test]
    fn test_resolve_s3_session_token() {
        let env = |var: &str| -> Option<String> {
            match var {
                "AWS_SESSION_TOKEN" => Some("token123".to_string()),
                _ => None,
            }
        };
        let resolved =
            StorageCredentialResolver::resolve_with_env("s3://bucket/path", &empty_opts(), env);
        assert_eq!(resolved.options["aws_session_token"], "token123");
    }

    #[test]
    fn test_resolve_s3_profile() {
        let opts = opts_of(&[("aws_profile", "production")]);

        let resolved =
            StorageCredentialResolver::resolve_with_env("s3://bucket/path", &opts, env_none);
        assert!(resolved.has_credentials());
        assert_eq!(resolved.options["aws_profile"], "production");
    }

    #[test]
    fn test_resolve_s3_custom_endpoint() {
        let opts = opts_of(&[
            ("aws_endpoint", "http://localhost:9000"),
            ("aws_s3_allow_unsafe_rename", "true"),
            ("aws_access_key_id", "minioadmin"),
            ("aws_secret_access_key", "minioadmin"),
        ]);

        let resolved =
            StorageCredentialResolver::resolve_with_env("s3://bucket/path", &opts, env_none);
        assert_eq!(resolved.options["aws_endpoint"], "http://localhost:9000");
        assert_eq!(resolved.options["aws_s3_allow_unsafe_rename"], "true");
    }

    // ── Azure tests ──

    #[test]
    fn test_resolve_azure_env_fallback() {
        let resolved = StorageCredentialResolver::resolve_with_env(
            "az://container/path",
            &empty_opts(),
            azure_env,
        );
        assert_eq!(resolved.provider, StorageProvider::AzureAdls);
        assert_eq!(resolved.options["azure_storage_account_name"], "myaccount");
        assert_eq!(resolved.options["azure_storage_account_key"], "base64key==");
        assert!(resolved.has_credentials());
    }

    #[test]
    fn test_resolve_azure_sas_token() {
        let opts = opts_of(&[
            ("azure_storage_account_name", "acct"),
            ("azure_storage_sas_token", "sv=2021-06&sig=abc"),
        ]);

        let resolved = StorageCredentialResolver::resolve_with_env(
            "abfss://container@acct.dfs.core.windows.net/path",
            &opts,
            env_none,
        );
        assert!(resolved.has_credentials());
        assert_eq!(
            resolved.options["azure_storage_sas_token"],
            "sv=2021-06&sig=abc"
        );
    }

    #[test]
    fn test_resolve_azure_client_id() {
        let opts = opts_of(&[
            ("azure_storage_account_name", "acct"),
            ("azure_storage_client_id", "client-id-123"),
        ]);

        let resolved =
            StorageCredentialResolver::resolve_with_env("az://container/path", &opts, env_none);
        assert!(resolved.has_credentials());
    }

    #[test]
    fn test_resolve_azure_no_credentials() {
        let resolved = StorageCredentialResolver::resolve_with_env(
            "az://container/path",
            &empty_opts(),
            env_none,
        );
        assert!(!resolved.has_credentials());
    }

    // ── GCS tests ──

    #[test]
    fn test_resolve_gcs_env_fallback() {
        let resolved =
            StorageCredentialResolver::resolve_with_env("gs://bucket/path", &empty_opts(), gcs_env);
        assert_eq!(resolved.provider, StorageProvider::Gcs);
        assert_eq!(
            resolved.options["google_service_account_path"],
            "/path/to/creds.json"
        );
        assert!(resolved.has_credentials());
    }

    #[test]
    fn test_resolve_gcs_inline_key() {
        let opts = opts_of(&[(
            "google_service_account_key",
            r#"{"type":"service_account"}"#,
        )]);

        let resolved =
            StorageCredentialResolver::resolve_with_env("gs://bucket/path", &opts, env_none);
        assert!(resolved.has_credentials());
    }

    #[test]
    fn test_resolve_gcs_no_credentials() {
        let resolved = StorageCredentialResolver::resolve_with_env(
            "gs://bucket/path",
            &empty_opts(),
            env_none,
        );
        assert!(!resolved.has_credentials());
    }

    // ── Env tracking tests ──

    #[test]
    fn test_env_resolved_keys_tracked() {
        let resolved =
            StorageCredentialResolver::resolve_with_env("s3://bucket/path", &empty_opts(), aws_env);
        assert!(resolved
            .env_resolved_keys
            .contains(&"aws_access_key_id".to_string()));
        assert!(resolved
            .env_resolved_keys
            .contains(&"aws_secret_access_key".to_string()));
        assert!(resolved
            .env_resolved_keys
            .contains(&"aws_region".to_string()));
    }

    #[test]
    fn test_empty_env_var_not_used() {
        let env = |var: &str| -> Option<String> {
            match var {
                "AWS_REGION" => Some(String::new()),
                _ => None,
            }
        };
        let resolved =
            StorageCredentialResolver::resolve_with_env("s3://bucket/path", &empty_opts(), env);
        assert!(!resolved.options.contains_key("aws_region"));
    }

    #[test]
    fn test_s3a_resolves_as_s3() {
        let resolved = StorageCredentialResolver::resolve_with_env(
            "s3a://bucket/path",
            &empty_opts(),
            aws_env,
        );
        assert_eq!(resolved.provider, StorageProvider::AwsS3);
        assert!(resolved.has_credentials());
    }
}