Skip to main content

laminar_connectors/mongodb/
large_event.rs

1//! Large event fragment reassembly for `MongoDB` change streams.
2//!
3//! `MongoDB` ≥ 6.0.9 supports the `$changeStreamSplitLargeEvent` pipeline
4//! stage, which splits change events exceeding the 16 MiB BSON limit into
5//! ordered fragments. Each fragment carries:
6//!
7//! - `splitEvent.fragment`: 1-based ordinal of this fragment
8//! - `splitEvent.of`: total number of fragments
9//!
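//! This module buffers fragments under a caller-supplied correlation ID
//! (typically the resume token of the first fragment) and reassembles them
//! into a single event once all fragments arrive.
//!
//! # Error Handling
//!
//! Fragments may arrive in any order. A fragment whose ordinal is out of
//! range is rejected with `LargeEventError::FragmentOutOfRange`. Fragment
//! sets that are still incomplete after the configured timeout are reported
//! as `LargeEventError::ReassemblyFailed` by `evict_expired`.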
17
18use std::collections::HashMap;
19use std::time::{Duration, Instant};
20
/// Timeout used by `LargeEventReassembler::new`: how long an incomplete set
/// of fragments may stay buffered before it is considered failed.
const DEFAULT_REASSEMBLY_TIMEOUT: Duration = Duration::from_millis(30_000);
23
24/// Error when large event reassembly fails.
25#[derive(Debug, thiserror::Error)]
26pub enum LargeEventError {
27    /// Not all fragments arrived before the timeout.
28    #[error("large event reassembly failed: expected {expected} fragments, received {received}")]
29    ReassemblyFailed {
30        /// Total expected fragments.
31        expected: u32,
32        /// Fragments actually received.
33        received: u32,
34    },
35
36    /// Fragment ordinal out of range.
37    #[error("fragment index {index} out of range (total: {total})")]
38    FragmentOutOfRange {
39        /// The fragment ordinal received.
40        index: u32,
41        /// The total fragments expected.
42        total: u32,
43    },
44}
45
/// Metadata about a split event fragment (the change event's `splitEvent`
/// document).
///
/// Well-formed values satisfy `1 <= fragment <= of`;
/// `LargeEventReassembler::add_fragment` rejects anything else.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SplitEventInfo {
    /// 1-based ordinal of this fragment (`splitEvent.fragment`).
    pub fragment: u32,
    /// Total number of fragments (`splitEvent.of`).
    pub of: u32,
}

/// A single fragment of a large change event.
///
/// Fragments that share a correlation ID are concatenated in ascending
/// ordinal order by `LargeEventReassembler::add_fragment`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventFragment {
    /// Fragment metadata (ordinal and total count).
    pub split_info: SplitEventInfo,
    /// The partial JSON body of this fragment.
    pub body: String,
}
63
/// Buffered state for one split large event that has not yet completed.
#[derive(Debug)]
struct PendingReassembly {
    /// Number of fragments the event was split into.
    total: u32,
    /// Bodies received so far, keyed by their 1-based ordinal.
    fragments: HashMap<u32, String>,
    /// Arrival time of the first fragment seen for this event.
    started_at: Instant,
}
74
75/// Buffers and reassembles split large change events.
76///
77/// Events are keyed by a caller-provided correlation ID (typically the
78/// resume token of the first fragment). Completed events are returned
79/// as reassembled JSON strings.
80#[derive(Debug)]
81pub struct LargeEventReassembler {
82    /// In-progress reassemblies keyed by correlation ID.
83    pending: HashMap<String, PendingReassembly>,
84    /// Timeout for fragment completion.
85    timeout: Duration,
86}
87
88impl LargeEventReassembler {
89    /// Creates a new reassembler with the default timeout (30s).
90    #[must_use]
91    pub fn new() -> Self {
92        Self {
93            pending: HashMap::new(),
94            timeout: DEFAULT_REASSEMBLY_TIMEOUT,
95        }
96    }
97
98    /// Creates a new reassembler with a custom timeout.
99    #[must_use]
100    pub fn with_timeout(timeout: Duration) -> Self {
101        Self {
102            pending: HashMap::new(),
103            timeout,
104        }
105    }
106
107    /// Adds a fragment and returns the reassembled body if all fragments
108    /// have arrived.
109    ///
110    /// # Arguments
111    ///
112    /// - `correlation_id`: Unique ID to group fragments (e.g., resume token).
113    /// - `fragment`: The fragment to add.
114    ///
115    /// # Errors
116    ///
117    /// Returns `LargeEventError::FragmentOutOfRange` if the fragment ordinal
118    /// is invalid.
119    pub fn add_fragment(
120        &mut self,
121        correlation_id: &str,
122        fragment: EventFragment,
123    ) -> Result<Option<String>, LargeEventError> {
124        let info = &fragment.split_info;
125
126        if info.fragment == 0 || info.fragment > info.of {
127            return Err(LargeEventError::FragmentOutOfRange {
128                index: info.fragment,
129                total: info.of,
130            });
131        }
132
133        let entry = self
134            .pending
135            .entry(correlation_id.to_string())
136            .or_insert_with(|| PendingReassembly {
137                total: info.of,
138                fragments: HashMap::new(),
139                started_at: Instant::now(),
140            });
141
142        entry.fragments.insert(info.fragment, fragment.body);
143
144        #[allow(clippy::cast_possible_truncation)]
145        if entry.fragments.len() as u32 == entry.total {
146            let reassembly = self.pending.remove(correlation_id).unwrap_or_else(|| {
147                // Should be unreachable since we just checked.
148                unreachable!("pending entry disappeared during reassembly")
149            });
150            let mut body = String::new();
151            for i in 1..=reassembly.total {
152                if let Some(part) = reassembly.fragments.get(&i) {
153                    body.push_str(part);
154                }
155            }
156            Ok(Some(body))
157        } else {
158            Ok(None)
159        }
160    }
161
162    /// Evicts timed-out pending reassemblies and returns errors for each.
163    pub fn evict_expired(&mut self) -> Vec<(String, LargeEventError)> {
164        let now = Instant::now();
165        let mut expired = Vec::new();
166
167        self.pending.retain(|id, pending| {
168            if now.duration_since(pending.started_at) > self.timeout {
169                expired.push((
170                    id.clone(),
171                    LargeEventError::ReassemblyFailed {
172                        expected: pending.total,
173                        #[allow(clippy::cast_possible_truncation)]
174                        received: pending.fragments.len() as u32,
175                    },
176                ));
177                false
178            } else {
179                true
180            }
181        });
182
183        expired
184    }
185
186    /// Returns the number of in-progress reassemblies.
187    #[must_use]
188    pub fn pending_count(&self) -> usize {
189        self.pending.len()
190    }
191}
192
193impl Default for LargeEventReassembler {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds fragment `index` of `total` carrying `body`.
    fn make_fragment(index: u32, total: u32, body: &str) -> EventFragment {
        EventFragment {
            split_info: SplitEventInfo {
                fragment: index,
                of: total,
            },
            body: body.to_string(),
        }
    }

    #[test]
    fn test_single_fragment_event() {
        let mut reassembler = LargeEventReassembler::new();
        let assembled = reassembler
            .add_fragment("tok1", make_fragment(1, 1, "full_body"))
            .unwrap();
        assert_eq!(assembled.as_deref(), Some("full_body"));
        assert_eq!(reassembler.pending_count(), 0);
    }

    #[test]
    fn test_two_fragment_event() {
        let mut reassembler = LargeEventReassembler::new();

        let first = reassembler
            .add_fragment("tok1", make_fragment(1, 2, "first_"))
            .unwrap();
        assert!(first.is_none());
        assert_eq!(reassembler.pending_count(), 1);

        let second = reassembler
            .add_fragment("tok1", make_fragment(2, 2, "second"))
            .unwrap();
        assert_eq!(second.as_deref(), Some("first_second"));
        assert_eq!(reassembler.pending_count(), 0);
    }

    #[test]
    fn test_out_of_order_fragments() {
        let mut reassembler = LargeEventReassembler::new();

        // Fragments 2 and 3 arrive before fragment 1; output is still in ordinal order.
        let after_b = reassembler
            .add_fragment("tok1", make_fragment(2, 3, "B"))
            .unwrap();
        assert!(after_b.is_none());

        let after_c = reassembler
            .add_fragment("tok1", make_fragment(3, 3, "C"))
            .unwrap();
        assert!(after_c.is_none());

        let after_a = reassembler
            .add_fragment("tok1", make_fragment(1, 3, "A"))
            .unwrap();
        assert_eq!(after_a.as_deref(), Some("ABC"));
    }

    #[test]
    fn test_fragment_out_of_range() {
        let mut reassembler = LargeEventReassembler::new();

        let zero = reassembler
            .add_fragment("tok1", make_fragment(0, 2, "body"))
            .unwrap_err();
        assert!(matches!(zero, LargeEventError::FragmentOutOfRange { .. }));

        let too_large = reassembler
            .add_fragment("tok1", make_fragment(3, 2, "body"))
            .unwrap_err();
        assert!(matches!(too_large, LargeEventError::FragmentOutOfRange { .. }));

        // Rejected fragments must not leave a pending entry behind.
        assert_eq!(reassembler.pending_count(), 0);
    }

    #[test]
    fn test_duplicate_fragment_is_not_double_counted() {
        let mut reassembler = LargeEventReassembler::new();

        let first = reassembler
            .add_fragment("tok1", make_fragment(1, 2, "A"))
            .unwrap();
        assert!(first.is_none());

        // Re-delivery of an already-buffered ordinal must not complete the event.
        let redelivered = reassembler
            .add_fragment("tok1", make_fragment(1, 2, "A"))
            .unwrap();
        assert!(redelivered.is_none());
        assert_eq!(reassembler.pending_count(), 1);

        let last = reassembler
            .add_fragment("tok1", make_fragment(2, 2, "B"))
            .unwrap();
        assert_eq!(last.as_deref(), Some("AB"));
        assert_eq!(reassembler.pending_count(), 0);
    }

    #[test]
    fn test_evict_expired() {
        let mut reassembler = LargeEventReassembler::with_timeout(Duration::from_millis(0));

        reassembler
            .add_fragment("tok1", make_fragment(1, 2, "partial"))
            .unwrap();

        // Immediate expiry.
        std::thread::sleep(Duration::from_millis(1));
        let expired = reassembler.evict_expired();
        assert_eq!(expired.len(), 1);

        let (id, error) = &expired[0];
        assert_eq!(id, "tok1");
        assert!(matches!(
            error,
            LargeEventError::ReassemblyFailed {
                expected: 2,
                received: 1,
            }
        ));
        assert_eq!(reassembler.pending_count(), 0);
    }

    #[test]
    fn test_multiple_concurrent_events() {
        let mut reassembler = LargeEventReassembler::new();

        reassembler
            .add_fragment("tok1", make_fragment(1, 2, "A1"))
            .unwrap();
        reassembler
            .add_fragment("tok2", make_fragment(1, 2, "B1"))
            .unwrap();
        assert_eq!(reassembler.pending_count(), 2);

        let tok1 = reassembler
            .add_fragment("tok1", make_fragment(2, 2, "A2"))
            .unwrap();
        assert_eq!(tok1.as_deref(), Some("A1A2"));
        assert_eq!(reassembler.pending_count(), 1);

        let tok2 = reassembler
            .add_fragment("tok2", make_fragment(2, 2, "B2"))
            .unwrap();
        assert_eq!(tok2.as_deref(), Some("B1B2"));
        assert_eq!(reassembler.pending_count(), 0);
    }
}