laminar_connectors/mongodb/
large_event.rs1use std::collections::HashMap;
19use std::time::{Duration, Instant};
20
21const DEFAULT_REASSEMBLY_TIMEOUT: Duration = Duration::from_secs(30);
23
24#[derive(Debug, thiserror::Error)]
26pub enum LargeEventError {
27 #[error("large event reassembly failed: expected {expected} fragments, received {received}")]
29 ReassemblyFailed {
30 expected: u32,
32 received: u32,
34 },
35
36 #[error("fragment index {index} out of range (total: {total})")]
38 FragmentOutOfRange {
39 index: u32,
41 total: u32,
43 },
44}
45
46#[derive(Debug, Clone)]
48pub struct SplitEventInfo {
49 pub fragment: u32,
51 pub of: u32,
53}
54
55#[derive(Debug, Clone)]
57pub struct EventFragment {
58 pub split_info: SplitEventInfo,
60 pub body: String,
62}
63
64#[derive(Debug)]
66struct PendingReassembly {
67 total: u32,
69 fragments: HashMap<u32, String>,
71 started_at: Instant,
73}
74
75#[derive(Debug)]
81pub struct LargeEventReassembler {
82 pending: HashMap<String, PendingReassembly>,
84 timeout: Duration,
86}
87
88impl LargeEventReassembler {
89 #[must_use]
91 pub fn new() -> Self {
92 Self {
93 pending: HashMap::new(),
94 timeout: DEFAULT_REASSEMBLY_TIMEOUT,
95 }
96 }
97
98 #[must_use]
100 pub fn with_timeout(timeout: Duration) -> Self {
101 Self {
102 pending: HashMap::new(),
103 timeout,
104 }
105 }
106
107 pub fn add_fragment(
120 &mut self,
121 correlation_id: &str,
122 fragment: EventFragment,
123 ) -> Result<Option<String>, LargeEventError> {
124 let info = &fragment.split_info;
125
126 if info.fragment == 0 || info.fragment > info.of {
127 return Err(LargeEventError::FragmentOutOfRange {
128 index: info.fragment,
129 total: info.of,
130 });
131 }
132
133 let entry = self
134 .pending
135 .entry(correlation_id.to_string())
136 .or_insert_with(|| PendingReassembly {
137 total: info.of,
138 fragments: HashMap::new(),
139 started_at: Instant::now(),
140 });
141
142 entry.fragments.insert(info.fragment, fragment.body);
143
144 #[allow(clippy::cast_possible_truncation)]
145 if entry.fragments.len() as u32 == entry.total {
146 let reassembly = self.pending.remove(correlation_id).unwrap_or_else(|| {
147 unreachable!("pending entry disappeared during reassembly")
149 });
150 let mut body = String::new();
151 for i in 1..=reassembly.total {
152 if let Some(part) = reassembly.fragments.get(&i) {
153 body.push_str(part);
154 }
155 }
156 Ok(Some(body))
157 } else {
158 Ok(None)
159 }
160 }
161
162 pub fn evict_expired(&mut self) -> Vec<(String, LargeEventError)> {
164 let now = Instant::now();
165 let mut expired = Vec::new();
166
167 self.pending.retain(|id, pending| {
168 if now.duration_since(pending.started_at) > self.timeout {
169 expired.push((
170 id.clone(),
171 LargeEventError::ReassemblyFailed {
172 expected: pending.total,
173 #[allow(clippy::cast_possible_truncation)]
174 received: pending.fragments.len() as u32,
175 },
176 ));
177 false
178 } else {
179 true
180 }
181 });
182
183 expired
184 }
185
186 #[must_use]
188 pub fn pending_count(&self) -> usize {
189 self.pending.len()
190 }
191}
192
193impl Default for LargeEventReassembler {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199#[cfg(test)]
200mod tests {
201 use super::*;
202
203 fn make_fragment(index: u32, total: u32, body: &str) -> EventFragment {
204 EventFragment {
205 split_info: SplitEventInfo {
206 fragment: index,
207 of: total,
208 },
209 body: body.to_string(),
210 }
211 }
212
213 #[test]
214 fn test_single_fragment_event() {
215 let mut r = LargeEventReassembler::new();
216 let result = r
217 .add_fragment("tok1", make_fragment(1, 1, "full_body"))
218 .unwrap();
219 assert_eq!(result.as_deref(), Some("full_body"));
220 assert_eq!(r.pending_count(), 0);
221 }
222
223 #[test]
224 fn test_two_fragment_event() {
225 let mut r = LargeEventReassembler::new();
226
227 let result = r
228 .add_fragment("tok1", make_fragment(1, 2, "first_"))
229 .unwrap();
230 assert!(result.is_none());
231 assert_eq!(r.pending_count(), 1);
232
233 let result = r
234 .add_fragment("tok1", make_fragment(2, 2, "second"))
235 .unwrap();
236 assert_eq!(result.as_deref(), Some("first_second"));
237 assert_eq!(r.pending_count(), 0);
238 }
239
240 #[test]
241 fn test_out_of_order_fragments() {
242 let mut r = LargeEventReassembler::new();
243
244 let result = r.add_fragment("tok1", make_fragment(2, 3, "B")).unwrap();
246 assert!(result.is_none());
247
248 let result = r.add_fragment("tok1", make_fragment(3, 3, "C")).unwrap();
249 assert!(result.is_none());
250
251 let result = r.add_fragment("tok1", make_fragment(1, 3, "A")).unwrap();
252 assert_eq!(result.as_deref(), Some("ABC"));
253 }
254
255 #[test]
256 fn test_fragment_out_of_range() {
257 let mut r = LargeEventReassembler::new();
258 let err = r
259 .add_fragment("tok1", make_fragment(0, 2, "body"))
260 .unwrap_err();
261 assert!(matches!(err, LargeEventError::FragmentOutOfRange { .. }));
262
263 let err = r
264 .add_fragment("tok1", make_fragment(3, 2, "body"))
265 .unwrap_err();
266 assert!(matches!(err, LargeEventError::FragmentOutOfRange { .. }));
267 }
268
269 #[test]
270 fn test_evict_expired() {
271 let mut r = LargeEventReassembler::with_timeout(Duration::from_millis(0));
272
273 r.add_fragment("tok1", make_fragment(1, 2, "partial"))
274 .unwrap();
275
276 std::thread::sleep(Duration::from_millis(1));
278 let expired = r.evict_expired();
279 assert_eq!(expired.len(), 1);
280 assert_eq!(expired[0].0, "tok1");
281 assert!(matches!(
282 expired[0].1,
283 LargeEventError::ReassemblyFailed {
284 expected: 2,
285 received: 1,
286 }
287 ));
288 assert_eq!(r.pending_count(), 0);
289 }
290
291 #[test]
292 fn test_multiple_concurrent_events() {
293 let mut r = LargeEventReassembler::new();
294
295 r.add_fragment("tok1", make_fragment(1, 2, "A1")).unwrap();
296 r.add_fragment("tok2", make_fragment(1, 2, "B1")).unwrap();
297 assert_eq!(r.pending_count(), 2);
298
299 let result = r.add_fragment("tok1", make_fragment(2, 2, "A2")).unwrap();
300 assert_eq!(result.as_deref(), Some("A1A2"));
301 assert_eq!(r.pending_count(), 1);
302
303 let result = r.add_fragment("tok2", make_fragment(2, 2, "B2")).unwrap();
304 assert_eq!(result.as_deref(), Some("B1B2"));
305 assert_eq!(r.pending_count(), 0);
306 }
307}