Skip to main content

laminar_storage/
tiering.rs

//! S3 storage class tiering for checkpoint data.
//!
//! Maps checkpoint objects to storage tiers (hot/warm/cold) with appropriate
//! S3 storage classes, object tags for lifecycle rules, and compression
//! strategies (LZ4 for hot, Zstd for warm/cold).
6
7use std::fmt;
8
9use object_store::{Attribute, Attributes, PutOptions, TagSet};
10
11// ---------------------------------------------------------------------------
12// StorageClass
13// ---------------------------------------------------------------------------
14
/// S3-compatible storage class for checkpoint objects.
///
/// Maps to the `x-amz-storage-class` header on PUT requests. Provider-specific
/// strings are used because each cloud provider has different class names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageClass {
    /// S3 Standard (default). Multi-AZ durability, low latency.
    Standard,
    /// S3 Express One Zone. Single-digit ms latency, -55% PUT / -85% GET cost.
    ExpressOneZone,
    /// S3 Intelligent-Tiering. Automatic cost optimization by access pattern.
    IntelligentTiering,
    /// S3 Glacier Instant Retrieval. Archive with millisecond access, -68% cost.
    GlacierInstantRetrieval,
}

impl StorageClass {
    /// S3 storage class header value.
    ///
    /// This is also the text produced by the [`fmt::Display`] impl.
    #[must_use]
    pub fn as_s3_str(self) -> &'static str {
        match self {
            Self::Standard => "STANDARD",
            Self::ExpressOneZone => "EXPRESS_ONEZONE",
            Self::IntelligentTiering => "INTELLIGENT_TIERING",
            Self::GlacierInstantRetrieval => "GLACIER_IR",
        }
    }

    /// Parse from a configuration string.
    ///
    /// Matching is ASCII case-insensitive, treats `-` and `_` as equivalent,
    /// and ignores leading/trailing whitespace (config values frequently carry
    /// stray padding). Besides the canonical header values, the long-form
    /// aliases `EXPRESS_ONE_ZONE` and `GLACIER_INSTANT_RETRIEVAL` are accepted.
    ///
    /// Returns `None` for empty or unrecognized input.
    #[must_use]
    pub fn from_config(s: &str) -> Option<Self> {
        // Single pass: upper-case and fold hyphens to underscores together,
        // rather than allocating once per transformation.
        let normalized: String = s
            .trim()
            .chars()
            .map(|c| if c == '-' { '_' } else { c.to_ascii_uppercase() })
            .collect();

        match normalized.as_str() {
            "STANDARD" => Some(Self::Standard),
            "EXPRESS_ONEZONE" | "EXPRESS_ONE_ZONE" => Some(Self::ExpressOneZone),
            "INTELLIGENT_TIERING" => Some(Self::IntelligentTiering),
            "GLACIER_IR" | "GLACIER_INSTANT_RETRIEVAL" => Some(Self::GlacierInstantRetrieval),
            _ => None,
        }
    }
}

impl fmt::Display for StorageClass {
    /// Writes the S3 header value (see [`StorageClass::as_s3_str`]).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_s3_str())
    }
}
61
62// ---------------------------------------------------------------------------
63// StorageTier
64// ---------------------------------------------------------------------------
65
/// Logical tier for a checkpoint object.
///
/// Each tier maps to a storage class, compression strategy, and object tag.
/// S3 Lifecycle rules target objects by the `laminardb-tier` tag to transition
/// them between storage classes over time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageTier {
    /// Active checkpoints. Fast access, LZ4 compression.
    Hot,
    /// Historical checkpoints (1-7 days). Zstd compression.
    Warm,
    /// Archive checkpoints. Zstd max compression.
    Cold,
}

impl StorageTier {
    /// Tag value for S3 Lifecycle rule targeting.
    ///
    /// Returns the lowercase tier name (`"hot"`, `"warm"` or `"cold"`); the
    /// [`fmt::Display`] impl writes the same text.
    #[must_use]
    pub fn tag_value(self) -> &'static str {
        match self {
            Self::Hot => "hot",
            Self::Warm => "warm",
            Self::Cold => "cold",
        }
    }
}

impl fmt::Display for StorageTier {
    /// Writes the lowercase tag value (see [`StorageTier::tag_value`]).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.tag_value())
    }
}
98
99// ---------------------------------------------------------------------------
100// TieringPolicy
101// ---------------------------------------------------------------------------
102
103/// Builds [`PutOptions`] and selects compression for each storage tier.
104///
105/// Constructed from config strings (hot/warm/cold class names). When no
106/// tiering is configured, all objects use `Standard` class with LZ4.
107#[derive(Debug, Clone)]
108#[allow(clippy::struct_field_names)]
109pub struct TieringPolicy {
110    hot_class: StorageClass,
111    warm_class: StorageClass,
112    cold_class: Option<StorageClass>,
113}
114
115impl TieringPolicy {
116    /// Create a tiering policy from config class names.
117    ///
118    /// Falls back to `Standard` for unrecognized or empty class strings.
119    #[must_use]
120    pub fn new(hot_class: &str, warm_class: &str, cold_class: &str) -> Self {
121        Self {
122            hot_class: StorageClass::from_config(hot_class).unwrap_or(StorageClass::Standard),
123            warm_class: StorageClass::from_config(warm_class).unwrap_or(StorageClass::Standard),
124            cold_class: if cold_class.is_empty() {
125                None
126            } else {
127                StorageClass::from_config(cold_class)
128            },
129        }
130    }
131
132    /// Default policy: all tiers use `Standard`.
133    #[must_use]
134    pub fn standard() -> Self {
135        Self {
136            hot_class: StorageClass::Standard,
137            warm_class: StorageClass::Standard,
138            cold_class: None,
139        }
140    }
141
142    /// Storage class for the given tier.
143    #[must_use]
144    pub fn storage_class(&self, tier: StorageTier) -> StorageClass {
145        match tier {
146            StorageTier::Hot => self.hot_class,
147            StorageTier::Warm => self.warm_class,
148            StorageTier::Cold => self.cold_class.unwrap_or(self.warm_class),
149        }
150    }
151
152    /// Whether a cold tier is configured.
153    #[must_use]
154    pub fn has_cold_tier(&self) -> bool {
155        self.cold_class.is_some()
156    }
157
158    /// Build [`PutOptions`] for the given tier.
159    ///
160    /// Sets:
161    /// - `Attribute::StorageClass` to the tier's S3 class
162    /// - `laminardb-tier` tag for lifecycle rule targeting
163    #[must_use]
164    pub fn put_options(&self, tier: StorageTier) -> PutOptions {
165        let class = self.storage_class(tier);
166
167        let mut attrs = Attributes::new();
168        attrs.insert(Attribute::StorageClass, class.as_s3_str().into());
169
170        let mut tags = TagSet::default();
171        tags.push("laminardb-tier", tier.tag_value());
172
173        PutOptions {
174            attributes: attrs,
175            tags,
176            ..PutOptions::default()
177        }
178    }
179
180    /// Build [`PutOptions`] for the given tier with conditional-create mode.
181    #[must_use]
182    pub fn put_options_create(&self, tier: StorageTier) -> PutOptions {
183        let mut opts = self.put_options(tier);
184        opts.mode = object_store::PutMode::Create;
185        opts
186    }
187}
188
189impl Default for TieringPolicy {
190    fn default() -> Self {
191        Self::standard()
192    }
193}
194
195// ---------------------------------------------------------------------------
196// Tier-aware compression (test-only)
197// ---------------------------------------------------------------------------
198
/// Zstd compression level for warm tier (fast, good ratio).
#[cfg(test)]
const ZSTD_WARM_LEVEL: i32 = 3;

/// Zstd compression level for cold tier (high ratio, slower).
///
/// NOTE(review): 19 is believed to be the highest level zstd offers outside
/// its "ultra" range — confirm against the zstd docs before describing this as
/// the absolute maximum.
#[cfg(test)]
const ZSTD_COLD_LEVEL: i32 = 19;
206
/// Compress `data` with the codec associated with `tier` (test-only helper).
///
/// - `Hot`: LZ4 block with the uncompressed size prepended.
/// - `Warm`: Zstd at [`ZSTD_WARM_LEVEL`].
/// - `Cold`: Zstd at [`ZSTD_COLD_LEVEL`].
///
/// # Panics
///
/// Panics if Zstd encoding fails. The input is an in-memory slice and the
/// output an in-memory buffer, so a failure is treated as a bug.
#[cfg(test)]
fn compress_for_tier(data: &[u8], tier: StorageTier) -> Vec<u8> {
    match tier {
        StorageTier::Hot => lz4_flex::compress_prepend_size(data),
        StorageTier::Warm => zstd::encode_all(data, ZSTD_WARM_LEVEL)
            .expect("zstd encoding of an in-memory slice should not fail"),
        StorageTier::Cold => zstd::encode_all(data, ZSTD_COLD_LEVEL)
            .expect("zstd encoding of an in-memory slice should not fail"),
    }
}
215
/// Decompress `compressed` with the codec associated with `tier` (test-only
/// helper); the inverse of `compress_for_tier`.
///
/// `Hot` expects a size-prepended LZ4 block; `Warm` and `Cold` both expect a
/// Zstd stream, which is decoded the same way for either tier.
///
/// # Errors
///
/// Returns a [`DecompressionError`] carrying the codec's error message when
/// the input cannot be decoded.
#[cfg(test)]
fn decompress_for_tier(
    compressed: &[u8],
    tier: StorageTier,
) -> Result<Vec<u8>, DecompressionError> {
    match tier {
        StorageTier::Hot => lz4_flex::decompress_size_prepended(compressed)
            .map_err(|e| DecompressionError(format!("LZ4 decompression failed: {e}"))),
        StorageTier::Warm | StorageTier::Cold => zstd::decode_all(compressed)
            .map_err(|e| DecompressionError(format!("Zstd decompression failed: {e}"))),
    }
}
228
/// Error from tier-aware decompression.
///
/// Wraps a human-readable message naming the codec that failed; `Display`
/// prints that message verbatim.
#[cfg(test)]
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct DecompressionError(String);
234
235// ---------------------------------------------------------------------------
236// Tests
237// ---------------------------------------------------------------------------
238
#[cfg(test)]
mod tests {
    use super::*;

    // -- StorageClass --

    #[test]
    fn test_storage_class_s3_strings() {
        assert_eq!(StorageClass::Standard.as_s3_str(), "STANDARD");
        assert_eq!(StorageClass::ExpressOneZone.as_s3_str(), "EXPRESS_ONEZONE");
        assert_eq!(
            StorageClass::IntelligentTiering.as_s3_str(),
            "INTELLIGENT_TIERING"
        );
        assert_eq!(
            StorageClass::GlacierInstantRetrieval.as_s3_str(),
            "GLACIER_IR"
        );
    }

    #[test]
    fn test_storage_class_from_config() {
        assert_eq!(
            StorageClass::from_config("STANDARD"),
            Some(StorageClass::Standard)
        );
        assert_eq!(
            StorageClass::from_config("express_one_zone"),
            Some(StorageClass::ExpressOneZone)
        );
        assert_eq!(
            StorageClass::from_config("EXPRESS-ONEZONE"),
            Some(StorageClass::ExpressOneZone)
        );
        assert_eq!(
            StorageClass::from_config("intelligent_tiering"),
            Some(StorageClass::IntelligentTiering)
        );
        assert_eq!(
            StorageClass::from_config("GLACIER_IR"),
            Some(StorageClass::GlacierInstantRetrieval)
        );
        assert_eq!(
            StorageClass::from_config("glacier_instant_retrieval"),
            Some(StorageClass::GlacierInstantRetrieval)
        );
        assert_eq!(StorageClass::from_config("unknown"), None);
        assert_eq!(StorageClass::from_config(""), None);
    }

    #[test]
    fn test_storage_class_display() {
        assert_eq!(format!("{}", StorageClass::Standard), "STANDARD");
        assert_eq!(
            format!("{}", StorageClass::GlacierInstantRetrieval),
            "GLACIER_IR"
        );
    }

    // -- StorageTier --

    #[test]
    fn test_tier_tag_values() {
        assert_eq!(StorageTier::Hot.tag_value(), "hot");
        assert_eq!(StorageTier::Warm.tag_value(), "warm");
        assert_eq!(StorageTier::Cold.tag_value(), "cold");
    }

    // -- TieringPolicy --

    #[test]
    fn test_policy_from_config() {
        let policy = TieringPolicy::new("EXPRESS_ONE_ZONE", "STANDARD", "GLACIER_IR");
        assert_eq!(
            policy.storage_class(StorageTier::Hot),
            StorageClass::ExpressOneZone
        );
        assert_eq!(
            policy.storage_class(StorageTier::Warm),
            StorageClass::Standard
        );
        assert_eq!(
            policy.storage_class(StorageTier::Cold),
            StorageClass::GlacierInstantRetrieval
        );
        assert!(policy.has_cold_tier());
    }

    #[test]
    fn test_policy_no_cold_tier() {
        let policy = TieringPolicy::new("STANDARD", "STANDARD", "");
        assert!(!policy.has_cold_tier());
        // Cold falls back to warm class
        assert_eq!(
            policy.storage_class(StorageTier::Cold),
            StorageClass::Standard
        );
    }

    #[test]
    fn test_policy_unrecognized_cold_class_disables_cold_tier() {
        // A non-empty but unknown cold class is treated like "no cold tier":
        // it must not silently become `Standard`.
        let policy = TieringPolicy::new("STANDARD", "INTELLIGENT_TIERING", "NOT_A_CLASS");
        assert!(!policy.has_cold_tier());
        // Cold resolves to the (non-default) warm class, proving the fallback
        // really goes through warm rather than `Standard`.
        assert_eq!(
            policy.storage_class(StorageTier::Cold),
            StorageClass::IntelligentTiering
        );
    }

    #[test]
    fn test_policy_standard_default() {
        let policy = TieringPolicy::standard();
        assert_eq!(
            policy.storage_class(StorageTier::Hot),
            StorageClass::Standard
        );
        assert_eq!(
            policy.storage_class(StorageTier::Warm),
            StorageClass::Standard
        );
        assert!(!policy.has_cold_tier());
    }

    #[test]
    fn test_policy_unknown_config_falls_back() {
        let policy = TieringPolicy::new("NONEXISTENT", "ALSO_BAD", "");
        assert_eq!(
            policy.storage_class(StorageTier::Hot),
            StorageClass::Standard
        );
        assert_eq!(
            policy.storage_class(StorageTier::Warm),
            StorageClass::Standard
        );
    }

    #[test]
    fn test_put_options_has_storage_class() {
        let policy = TieringPolicy::new("EXPRESS_ONE_ZONE", "STANDARD", "GLACIER_IR");
        let opts = policy.put_options(StorageTier::Hot);

        let class = opts.attributes.get(&Attribute::StorageClass);
        assert!(class.is_some());
        assert_eq!(class.unwrap().as_ref(), "EXPRESS_ONEZONE");
    }

    #[test]
    fn test_put_options_has_tier_tag() {
        let policy = TieringPolicy::new("STANDARD", "STANDARD", "");
        let opts = policy.put_options(StorageTier::Warm);

        let encoded = opts.tags.encoded();
        assert!(
            encoded.contains("laminardb-tier=warm"),
            "tag encoding: {encoded}"
        );
    }

    #[test]
    fn test_put_options_create_mode() {
        let policy = TieringPolicy::standard();
        let opts = policy.put_options_create(StorageTier::Hot);
        assert!(matches!(opts.mode, object_store::PutMode::Create));
    }

    // -- Compression --

    #[test]
    fn test_lz4_roundtrip_hot() {
        let data = b"checkpoint state data for hot tier recovery";
        let compressed = compress_for_tier(data, StorageTier::Hot);
        let decompressed = decompress_for_tier(&compressed, StorageTier::Hot).unwrap();
        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_zstd_roundtrip_warm() {
        let data = b"checkpoint state data for warm tier archival";
        let compressed = compress_for_tier(data, StorageTier::Warm);
        let decompressed = decompress_for_tier(&compressed, StorageTier::Warm).unwrap();
        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_zstd_roundtrip_cold() {
        let data = b"checkpoint state data for cold tier archive compliance";
        let compressed = compress_for_tier(data, StorageTier::Cold);
        let decompressed = decompress_for_tier(&compressed, StorageTier::Cold).unwrap();
        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_cold_compresses_better_than_warm() {
        // Repetitive data to make compression ratio visible
        let data: Vec<u8> = (0u8..=255).cycle().take(10_000).collect();
        let warm = compress_for_tier(&data, StorageTier::Warm);
        let cold = compress_for_tier(&data, StorageTier::Cold);
        assert!(
            cold.len() <= warm.len(),
            "cold ({}) should be <= warm ({})",
            cold.len(),
            warm.len()
        );
    }

    #[test]
    fn test_decompress_corrupt_data() {
        let bad = b"not valid compressed data";
        assert!(decompress_for_tier(bad, StorageTier::Hot).is_err());
        assert!(decompress_for_tier(bad, StorageTier::Warm).is_err());
        assert!(decompress_for_tier(bad, StorageTier::Cold).is_err());
    }
}
441}