Skip to main content

laminar_connectors/mongodb/
timeseries.rs

1//! `MongoDB` time series collection configuration and validation.
2//!
3//! Time series collections in `MongoDB` use automatic bucketing for efficient
4//! storage and querying of time-stamped measurement data. This module
5//! provides typed configuration for creating and validating time series
6//! collections.
7//!
8//! # Important Constraints
9//!
10//! - Time series collections only accept `insert` operations; other write
11//!   modes are rejected at the sink level.
12//! - Granularity can only be increased (seconds → minutes → hours), never
13//!   decreased after collection creation.
14//! - `MongoDB` does not support `watch()` (change streams) on time series
15//!   collections — the source pre-flight guard rejects these.
16
17use crate::error::ConnectorError;
18
19/// Whether the target collection is a standard or time series collection.
20#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
21#[serde(tag = "kind", rename_all = "snake_case")]
22pub enum CollectionKind {
23    /// Standard `MongoDB` collection.
24    #[default]
25    Standard,
26    /// Time series collection with bucketing configuration.
27    TimeSeries(TimeSeriesConfig),
28}
29
30/// Configuration for a `MongoDB` time series collection.
31///
32/// Maps to `MongoDB`'s `timeseries` collection option. The `time_field`
33/// is required; `meta_field` and TTL are optional.
34#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
35pub struct TimeSeriesConfig {
36    /// The field in each document that contains the date.
37    pub time_field: String,
38
39    /// An optional field that labels the data source (e.g., sensor ID).
40    /// Documents with the same meta value are bucketed together.
41    pub meta_field: Option<String>,
42
43    /// Bucketing granularity.
44    pub granularity: TimeSeriesGranularity,
45
46    /// Optional TTL: automatically delete documents after this many seconds.
47    pub expire_after_seconds: Option<u64>,
48}
49
50/// Time series bucketing granularity.
51///
52/// Controls the bucket span for time series collections:
53///
54/// | Granularity | Bucket Span |
55/// |-------------|-------------|
56/// | Seconds     | 1 hour      |
57/// | Minutes     | 24 hours    |
58/// | Hours       | 30 days     |
59/// | Custom      | User-defined (`MongoDB` ≥ 6.3) |
60///
61/// Granularity can only increase (seconds → minutes → hours) on an
62/// existing collection. Attempting to decrease returns
63/// `GranularityDecreaseDenied`.
64#[derive(
65    Debug, Clone, Copy, PartialEq, Eq, Hash, Default, serde::Serialize, serde::Deserialize,
66)]
67#[serde(tag = "type", rename_all = "snake_case")]
68pub enum TimeSeriesGranularity {
69    /// Bucket span = 1 hour (default).
70    #[default]
71    Seconds,
72    /// Bucket span = 24 hours.
73    Minutes,
74    /// Bucket span = 30 days.
75    Hours,
76    /// Custom bucketing (`MongoDB` ≥ 6.3).
77    ///
78    /// **Invariant**: `bucket_max_span_seconds` must equal
79    /// `bucket_rounding_seconds`. This is enforced at construction via
80    /// [`TimeSeriesGranularity::custom`].
81    Custom {
82        /// Maximum span of a single bucket in seconds.
83        bucket_max_span_seconds: u32,
84        /// Rounding boundary in seconds (must equal `bucket_max_span_seconds`).
85        bucket_rounding_seconds: u32,
86    },
87}
88
89impl TimeSeriesGranularity {
90    /// Creates a `Custom` granularity, enforcing the invariant that
91    /// `bucket_max_span_seconds == bucket_rounding_seconds`.
92    ///
93    /// # Errors
94    ///
95    /// Returns `ConnectorError::ConfigurationError` if the values differ.
96    pub fn custom(
97        bucket_max_span_seconds: u32,
98        bucket_rounding_seconds: u32,
99    ) -> Result<Self, ConnectorError> {
100        if bucket_max_span_seconds != bucket_rounding_seconds {
101            return Err(ConnectorError::ConfigurationError(format!(
102                "time series custom granularity requires bucket_max_span_seconds ({bucket_max_span_seconds}) \
103                 == bucket_rounding_seconds ({bucket_rounding_seconds})"
104            )));
105        }
106        if bucket_max_span_seconds == 0 {
107            return Err(ConnectorError::ConfigurationError(
108                "time series custom granularity bucket_max_span_seconds must be > 0".to_string(),
109            ));
110        }
111        Ok(Self::Custom {
112            bucket_max_span_seconds,
113            bucket_rounding_seconds,
114        })
115    }
116
117    /// Returns an ordinal for comparison (higher = coarser granularity).
118    /// Custom granularity returns the span as its ordinal.
119    #[must_use]
120    fn ordinal(self) -> u32 {
121        match self {
122            Self::Seconds => 1,
123            Self::Minutes => 2,
124            Self::Hours => 3,
125            Self::Custom {
126                bucket_max_span_seconds,
127                ..
128            } => bucket_max_span_seconds,
129        }
130    }
131
132    /// Returns `true` if `self` is a finer (or equal) granularity than `other`.
133    ///
134    /// Used to validate that granularity changes only increase.
135    #[must_use]
136    pub fn is_finer_or_equal(self, other: Self) -> bool {
137        self.ordinal() <= other.ordinal()
138    }
139
140    /// Validates that changing from `current` to `requested` is allowed.
141    ///
142    /// # Errors
143    ///
144    /// Returns `ConnectorError::ConfigurationError` if the change would
145    /// decrease granularity.
146    pub fn validate_change(current: Self, requested: Self) -> Result<(), ConnectorError> {
147        if !current.is_finer_or_equal(requested) {
148            return Err(ConnectorError::ConfigurationError(format!(
149                "cannot decrease time series granularity from {current:?} to {requested:?}"
150            )));
151        }
152        Ok(())
153    }
154}
155
156impl std::fmt::Display for TimeSeriesGranularity {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        match self {
159            Self::Seconds => f.write_str("seconds"),
160            Self::Minutes => f.write_str("minutes"),
161            Self::Hours => f.write_str("hours"),
162            Self::Custom {
163                bucket_max_span_seconds,
164                ..
165            } => write!(f, "custom({bucket_max_span_seconds}s)"),
166        }
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173
174    #[test]
175    fn test_granularity_default() {
176        assert_eq!(
177            TimeSeriesGranularity::default(),
178            TimeSeriesGranularity::Seconds
179        );
180    }
181
182    #[test]
183    fn test_custom_granularity_valid() {
184        let g = TimeSeriesGranularity::custom(3600, 3600).unwrap();
185        assert!(matches!(g, TimeSeriesGranularity::Custom { .. }));
186    }
187
188    #[test]
189    fn test_custom_granularity_mismatch() {
190        let err = TimeSeriesGranularity::custom(3600, 1800).unwrap_err();
191        assert!(err.to_string().contains("bucket_max_span_seconds"));
192    }
193
194    #[test]
195    fn test_custom_granularity_zero() {
196        let err = TimeSeriesGranularity::custom(0, 0).unwrap_err();
197        assert!(err.to_string().contains("must be > 0"));
198    }
199
200    #[test]
201    fn test_granularity_ordering() {
202        assert!(TimeSeriesGranularity::Seconds.is_finer_or_equal(TimeSeriesGranularity::Minutes));
203        assert!(TimeSeriesGranularity::Minutes.is_finer_or_equal(TimeSeriesGranularity::Hours));
204        assert!(!TimeSeriesGranularity::Hours.is_finer_or_equal(TimeSeriesGranularity::Seconds));
205        assert!(TimeSeriesGranularity::Seconds.is_finer_or_equal(TimeSeriesGranularity::Seconds));
206    }
207
208    #[test]
209    fn test_validate_change_increase_ok() {
210        TimeSeriesGranularity::validate_change(
211            TimeSeriesGranularity::Seconds,
212            TimeSeriesGranularity::Minutes,
213        )
214        .unwrap();
215    }
216
217    #[test]
218    fn test_validate_change_decrease_denied() {
219        let err = TimeSeriesGranularity::validate_change(
220            TimeSeriesGranularity::Hours,
221            TimeSeriesGranularity::Seconds,
222        )
223        .unwrap_err();
224        assert!(err.to_string().contains("decrease"));
225    }
226
227    #[test]
228    fn test_collection_kind_default() {
229        assert!(matches!(
230            CollectionKind::default(),
231            CollectionKind::Standard
232        ));
233    }
234
235    #[test]
236    fn test_granularity_display() {
237        assert_eq!(TimeSeriesGranularity::Seconds.to_string(), "seconds");
238        assert_eq!(TimeSeriesGranularity::Minutes.to_string(), "minutes");
239        assert_eq!(TimeSeriesGranularity::Hours.to_string(), "hours");
240        let custom = TimeSeriesGranularity::custom(7200, 7200).unwrap();
241        assert_eq!(custom.to_string(), "custom(7200s)");
242    }
243}