// laminar_connectors/mongodb/timeseries.rs
use crate::error::ConnectorError;
18
19#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
21#[serde(tag = "kind", rename_all = "snake_case")]
22pub enum CollectionKind {
23 #[default]
25 Standard,
26 TimeSeries(TimeSeriesConfig),
28}
29
30#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
35pub struct TimeSeriesConfig {
36 pub time_field: String,
38
39 pub meta_field: Option<String>,
42
43 pub granularity: TimeSeriesGranularity,
45
46 pub expire_after_seconds: Option<u64>,
48}
49
50#[derive(
65 Debug, Clone, Copy, PartialEq, Eq, Hash, Default, serde::Serialize, serde::Deserialize,
66)]
67#[serde(tag = "type", rename_all = "snake_case")]
68pub enum TimeSeriesGranularity {
69 #[default]
71 Seconds,
72 Minutes,
74 Hours,
76 Custom {
82 bucket_max_span_seconds: u32,
84 bucket_rounding_seconds: u32,
86 },
87}
88
89impl TimeSeriesGranularity {
90 pub fn custom(
97 bucket_max_span_seconds: u32,
98 bucket_rounding_seconds: u32,
99 ) -> Result<Self, ConnectorError> {
100 if bucket_max_span_seconds != bucket_rounding_seconds {
101 return Err(ConnectorError::ConfigurationError(format!(
102 "time series custom granularity requires bucket_max_span_seconds ({bucket_max_span_seconds}) \
103 == bucket_rounding_seconds ({bucket_rounding_seconds})"
104 )));
105 }
106 if bucket_max_span_seconds == 0 {
107 return Err(ConnectorError::ConfigurationError(
108 "time series custom granularity bucket_max_span_seconds must be > 0".to_string(),
109 ));
110 }
111 Ok(Self::Custom {
112 bucket_max_span_seconds,
113 bucket_rounding_seconds,
114 })
115 }
116
117 #[must_use]
120 fn ordinal(self) -> u32 {
121 match self {
122 Self::Seconds => 1,
123 Self::Minutes => 2,
124 Self::Hours => 3,
125 Self::Custom {
126 bucket_max_span_seconds,
127 ..
128 } => bucket_max_span_seconds,
129 }
130 }
131
132 #[must_use]
136 pub fn is_finer_or_equal(self, other: Self) -> bool {
137 self.ordinal() <= other.ordinal()
138 }
139
140 pub fn validate_change(current: Self, requested: Self) -> Result<(), ConnectorError> {
147 if !current.is_finer_or_equal(requested) {
148 return Err(ConnectorError::ConfigurationError(format!(
149 "cannot decrease time series granularity from {current:?} to {requested:?}"
150 )));
151 }
152 Ok(())
153 }
154}
155
156impl std::fmt::Display for TimeSeriesGranularity {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 match self {
159 Self::Seconds => f.write_str("seconds"),
160 Self::Minutes => f.write_str("minutes"),
161 Self::Hours => f.write_str("hours"),
162 Self::Custom {
163 bucket_max_span_seconds,
164 ..
165 } => write!(f, "custom({bucket_max_span_seconds}s)"),
166 }
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    /// The default granularity is `Seconds`.
    #[test]
    fn test_granularity_default() {
        let default_granularity = TimeSeriesGranularity::default();
        assert_eq!(default_granularity, TimeSeriesGranularity::Seconds);
    }

    /// Equal, non-zero parameters produce a `Custom` value.
    #[test]
    fn test_custom_granularity_valid() {
        let built = TimeSeriesGranularity::custom(3600, 3600);
        assert!(matches!(
            built.unwrap(),
            TimeSeriesGranularity::Custom { .. }
        ));
    }

    /// Differing parameters are rejected with a message naming the span field.
    #[test]
    fn test_custom_granularity_mismatch() {
        let message = TimeSeriesGranularity::custom(3600, 1800)
            .unwrap_err()
            .to_string();
        assert!(message.contains("bucket_max_span_seconds"));
    }

    /// Zero parameters are rejected.
    #[test]
    fn test_custom_granularity_zero() {
        let message = TimeSeriesGranularity::custom(0, 0)
            .unwrap_err()
            .to_string();
        assert!(message.contains("must be > 0"));
    }

    /// Presets are ordered Seconds <= Minutes <= Hours.
    #[test]
    fn test_granularity_ordering() {
        use super::TimeSeriesGranularity::{Hours, Minutes, Seconds};

        assert!(Seconds.is_finer_or_equal(Minutes));
        assert!(Minutes.is_finer_or_equal(Hours));
        assert!(!Hours.is_finer_or_equal(Seconds));
        assert!(Seconds.is_finer_or_equal(Seconds));
    }

    /// Moving to a coarser preset is accepted.
    #[test]
    fn test_validate_change_increase_ok() {
        let outcome = TimeSeriesGranularity::validate_change(
            TimeSeriesGranularity::Seconds,
            TimeSeriesGranularity::Minutes,
        );
        assert!(outcome.is_ok());
    }

    /// Moving to a finer preset is rejected.
    #[test]
    fn test_validate_change_decrease_denied() {
        let message = TimeSeriesGranularity::validate_change(
            TimeSeriesGranularity::Hours,
            TimeSeriesGranularity::Seconds,
        )
        .unwrap_err()
        .to_string();
        assert!(message.contains("decrease"));
    }

    /// The default collection kind is `Standard`.
    #[test]
    fn test_collection_kind_default() {
        let kind = CollectionKind::default();
        assert!(matches!(kind, CollectionKind::Standard));
    }

    /// `Display` output for every variant.
    #[test]
    fn test_granularity_display() {
        let presets = [
            (TimeSeriesGranularity::Seconds, "seconds"),
            (TimeSeriesGranularity::Minutes, "minutes"),
            (TimeSeriesGranularity::Hours, "hours"),
        ];
        for (granularity, expected) in presets {
            assert_eq!(granularity.to_string(), expected);
        }

        let custom = TimeSeriesGranularity::custom(7200, 7200).unwrap();
        assert_eq!(custom.to_string(), "custom(7200s)");
    }
}