Skip to main content

laminar_connectors/mongodb/
resume_token.rs

//! `MongoDB` change stream resume token persistence.
//!
//! Resume tokens are opaque BSON documents that allow a change stream
//! to be resumed from a specific position; this module handles them in
//! their JSON-serialized form. It provides a pluggable storage trait and
//! three implementations:
//!
//! - [`FileResumeTokenStore`]: Persists to a local file (embedded/test use)
//! - [`MongoResumeTokenStore`]: Persists to a dedicated `MongoDB` collection
//!   (production use, feature-gated behind `mongodb-cdc`)
//! - [`InMemoryResumeTokenStore`]: Keeps the token in process memory only;
//!   nothing survives a restart (test use)
//!
//! # Resume Token Semantics
//!
//! - Track `postBatchResumeToken` from every `getMore` response, **not**
//!   just individual event `_id` fields. This prevents unnecessary oplog
//!   re-scanning across empty `getMore` batches.
//! - On startup: if a persisted token exists, open the stream with `resumeAfter`.
//! - On `invalidate` events, switch to `startAfter`: a change stream cannot
//!   be resumed with `resumeAfter` from an invalidate event's token.
19
20use std::path::PathBuf;
21
22use crate::error::ConnectorError;
23
/// An opaque resume token from a `MongoDB` change stream.
///
/// Wraps the serialized JSON representation of the `_id` field from
/// a change event or `postBatchResumeToken` from a `getMore` response.
///
/// The string is kept verbatim: no JSON validation happens on
/// construction, and equality (`PartialEq`/`Eq`) is plain string
/// equality, so two tokens with different JSON formatting of the same
/// BSON value compare unequal.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ResumeToken {
    /// JSON-serialized token data.
    // With the derived serde impls this field name appears in the
    // serialized form (`{"data": "..."}`); renaming it changes the format.
    data: String,
}
33
34impl ResumeToken {
35    /// Creates a new resume token from a JSON string.
36    #[must_use]
37    pub fn new(data: String) -> Self {
38        Self { data }
39    }
40
41    /// Returns the JSON representation of the token.
42    #[must_use]
43    pub fn as_str(&self) -> &str {
44        &self.data
45    }
46
47    /// Consumes the token and returns the inner JSON string.
48    #[must_use]
49    pub fn into_inner(self) -> String {
50        self.data
51    }
52}
53
54impl std::fmt::Display for ResumeToken {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.write_str(&self.data)
57    }
58}
59
/// Errors from resume token store operations.
///
/// The `From` impl in this module turns any of these into
/// `ConnectorError::Internal`.
#[derive(Debug, thiserror::Error)]
pub enum ResumeTokenStoreError {
    /// I/O error reading/writing the token.
    ///
    /// `#[from]` lets `?` convert a `std::io::Error` into this variant.
    #[error("resume token I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Serialization error.
    ///
    /// For example, a stored token document whose `token` field is missing
    /// or is not a string.
    #[error("resume token serialization error: {0}")]
    Serialization(String),

    /// Store-specific error (e.g., `MongoDB` write failure).
    ///
    /// Also used when a blocking file-I/O task cannot be joined.
    #[error("resume token store error: {0}")]
    Store(String),
}
75
/// Trait for pluggable resume token persistence.
///
/// Implementations must be `Send + Sync` for use from async contexts.
///
/// # Cancellation Safety
///
/// Implementations should ensure that `save` is atomic (write-then-rename
/// for files, or upsert for databases) so that a cancelled future does
/// not leave a partially-written token.
///
/// # Contract
///
/// The implementations in this module treat `save` as an overwrite of the
/// single stored token, and after a successful `clear` a subsequent `load`
/// returns `Ok(None)`.
#[async_trait::async_trait]
pub trait ResumeTokenStore: Send + Sync {
    /// Loads the most recently persisted resume token, if any.
    ///
    /// `Ok(None)` means no token is currently stored.
    ///
    /// # Errors
    ///
    /// Returns `ResumeTokenStoreError` on I/O or deserialization failure.
    async fn load(&self) -> Result<Option<ResumeToken>, ResumeTokenStoreError>;

    /// Persists a resume token, overwriting any previous value.
    ///
    /// # Errors
    ///
    /// Returns `ResumeTokenStoreError` on I/O or serialization failure.
    async fn save(&self, token: &ResumeToken) -> Result<(), ResumeTokenStoreError>;

    /// Clears the persisted resume token.
    ///
    /// # Errors
    ///
    /// Returns `ResumeTokenStoreError` on I/O failure.
    async fn clear(&self) -> Result<(), ResumeTokenStoreError>;
}
108
/// Resume token store backed by a single file on the local filesystem.
///
/// Intended for embedded deployments and tests. The token text is written
/// to a temporary sibling file which is then renamed over the target path,
/// so a partially written file is never observed under the final name.
#[derive(Debug, Clone)]
pub struct FileResumeTokenStore {
    /// Location of the file that holds the token.
    path: PathBuf,
}

impl FileResumeTokenStore {
    /// Builds a store that reads and writes its token at `path`.
    ///
    /// Nothing is touched on disk here; the file is only accessed by
    /// `load`, `save` and `clear`.
    #[must_use]
    pub fn new(path: PathBuf) -> Self {
        FileResumeTokenStore { path }
    }
}
126
127#[async_trait::async_trait]
128impl ResumeTokenStore for FileResumeTokenStore {
129    async fn load(&self) -> Result<Option<ResumeToken>, ResumeTokenStoreError> {
130        let path = self.path.clone();
131        tokio::task::spawn_blocking(move || {
132            if !path.exists() {
133                return Ok(None);
134            }
135            let data = std::fs::read_to_string(&path)?;
136            let trimmed = data.trim();
137            if trimmed.is_empty() {
138                return Ok(None);
139            }
140            Ok(Some(ResumeToken::new(trimmed.to_string())))
141        })
142        .await
143        .map_err(|e| ResumeTokenStoreError::Store(format!("spawn_blocking join: {e}")))?
144    }
145
146    async fn save(&self, token: &ResumeToken) -> Result<(), ResumeTokenStoreError> {
147        let path = self.path.clone();
148        let data = token.as_str().to_string();
149        tokio::task::spawn_blocking(move || {
150            // Write to a temp file then rename for atomicity.
151            // On Windows, rename fails if the destination exists.
152            let tmp_path = path.with_extension("tmp");
153            std::fs::write(&tmp_path, &data)?;
154            #[cfg(windows)]
155            {
156                let _ = std::fs::remove_file(&path);
157            }
158            std::fs::rename(&tmp_path, &path)?;
159            Ok(())
160        })
161        .await
162        .map_err(|e| ResumeTokenStoreError::Store(format!("spawn_blocking join: {e}")))?
163    }
164
165    async fn clear(&self) -> Result<(), ResumeTokenStoreError> {
166        let path = self.path.clone();
167        tokio::task::spawn_blocking(move || {
168            if path.exists() {
169                std::fs::remove_file(&path)?;
170            }
171            Ok(())
172        })
173        .await
174        .map_err(|e| ResumeTokenStoreError::Store(format!("spawn_blocking join: {e}")))?
175    }
176}
177
/// Resume token store that keeps the token in a `MongoDB` collection
/// (only compiled with the `mongodb-cdc` feature).
///
/// Each source instance owns one document whose `_id` is its `source_id`.
/// `save` upserts that document, so at most one token document exists per
/// source instance.
#[cfg(feature = "mongodb-cdc")]
#[derive(Debug, Clone)]
pub struct MongoResumeTokenStore {
    /// Collection holding one token document per source instance.
    collection: mongodb::Collection<mongodb::bson::Document>,
    /// Value used as the `_id` of this instance's token document.
    source_id: String,
}

#[cfg(feature = "mongodb-cdc")]
impl MongoResumeTokenStore {
    /// Builds a store that keeps the token for `source_id` in `collection`.
    ///
    /// No database round-trip happens here.
    #[must_use]
    pub fn new(
        collection: mongodb::Collection<mongodb::bson::Document>,
        source_id: String,
    ) -> Self {
        Self {
            source_id,
            collection,
        }
    }
}
206
#[cfg(feature = "mongodb-cdc")]
#[async_trait::async_trait]
impl ResumeTokenStore for MongoResumeTokenStore {
    /// Fetches this source's token document and extracts its `token` string.
    async fn load(&self) -> Result<Option<ResumeToken>, ResumeTokenStoreError> {
        use mongodb::bson::doc;

        let found = self
            .collection
            .find_one(doc! { "_id": &self.source_id })
            .await
            .map_err(|e| ResumeTokenStoreError::Store(e.to_string()))?;

        // A present document without a string `token` field is a
        // serialization error; an absent document means "no token".
        found
            .map(|document| {
                document
                    .get_str("token")
                    .map(|token| ResumeToken::new(token.to_string()))
                    .map_err(|e| ResumeTokenStoreError::Serialization(e.to_string()))
            })
            .transpose()
    }

    /// Upserts this source's token document with the new token and a
    /// fresh `updated_at` timestamp.
    async fn save(&self, token: &ResumeToken) -> Result<(), ResumeTokenStoreError> {
        use mongodb::bson::doc;

        let changes = doc! {
            "$set": {
                "token": token.as_str(),
                "updated_at": mongodb::bson::DateTime::now(),
            }
        };
        let upsert = mongodb::options::UpdateOptions::builder()
            .upsert(true)
            .build();

        self.collection
            .update_one(doc! { "_id": &self.source_id }, changes)
            .with_options(upsert)
            .await
            .map(|_| ())
            .map_err(|e| ResumeTokenStoreError::Store(e.to_string()))
    }

    /// Deletes this source's token document, if it exists.
    async fn clear(&self) -> Result<(), ResumeTokenStoreError> {
        use mongodb::bson::doc;

        self.collection
            .delete_one(doc! { "_id": &self.source_id })
            .await
            .map(|_| ())
            .map_err(|e| ResumeTokenStoreError::Store(e.to_string()))
    }
}
266
/// Configuration for resume token persistence.
///
/// Serialized as an internally tagged enum. The variant name, in
/// `snake_case`, goes in a `"type"` field next to the variant's own fields,
/// e.g. `{"type": "file", "path": "token.json"}`,
/// `{"type": "mongo", "collection": "...", "source_id": "..."}` or
/// `{"type": "memory"}`.
///
/// `Default` yields [`ResumeTokenStoreConfig::Memory`], which does not
/// survive restarts.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResumeTokenStoreConfig {
    /// File-based token storage (for embedded/test use).
    File {
        /// Path to the token file.
        path: String,
    },
    /// `MongoDB`-based token storage (production).
    Mongo {
        /// Collection name for token storage (uses the same connection).
        collection: String,
        /// Unique source instance identifier.
        source_id: String,
    },
    /// In-memory only (no persistence across restarts).
    #[default]
    Memory,
}
287
288/// In-memory resume token store (no persistence, for testing).
289#[derive(Debug, Default)]
290pub struct InMemoryResumeTokenStore {
291    /// `parking_lot::Mutex` is safe here — none of the impl methods hold
292    /// the guard across an `.await`. The trait is async for parity with
293    /// the MongoDB-backed store which does actually await network I/O.
294    token: parking_lot::Mutex<Option<ResumeToken>>,
295}
296
297impl InMemoryResumeTokenStore {
298    /// Creates a new in-memory store.
299    #[must_use]
300    pub fn new() -> Self {
301        Self::default()
302    }
303}
304
305#[async_trait::async_trait]
306impl ResumeTokenStore for InMemoryResumeTokenStore {
307    async fn load(&self) -> Result<Option<ResumeToken>, ResumeTokenStoreError> {
308        Ok(self.token.lock().clone())
309    }
310
311    async fn save(&self, token: &ResumeToken) -> Result<(), ResumeTokenStoreError> {
312        *self.token.lock() = Some(token.clone());
313        Ok(())
314    }
315
316    async fn clear(&self) -> Result<(), ResumeTokenStoreError> {
317        *self.token.lock() = None;
318        Ok(())
319    }
320}
321
322impl From<ResumeTokenStoreError> for ConnectorError {
323    fn from(e: ResumeTokenStoreError) -> Self {
324        ConnectorError::Internal(format!("resume token store: {e}"))
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_resume_token_new() {
        let token = ResumeToken::new(r#"{"_data": "abc123"}"#.to_string());
        assert_eq!(token.as_str(), r#"{"_data": "abc123"}"#);
    }

    #[test]
    fn test_resume_token_display() {
        let token = ResumeToken::new("tok123".to_string());
        assert_eq!(token.to_string(), "tok123");
    }

    #[test]
    fn test_resume_token_into_inner() {
        let token = ResumeToken::new("data".to_string());
        assert_eq!(token.into_inner(), "data");
    }

    #[test]
    fn test_resume_token_store_config_default() {
        assert!(matches!(
            ResumeTokenStoreConfig::default(),
            ResumeTokenStoreConfig::Memory
        ));
    }

    #[tokio::test]
    async fn test_file_store_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("token.json");
        let store = FileResumeTokenStore::new(path);

        // Empty initially.
        assert!(store.load().await.unwrap().is_none());

        // Save and load.
        let token = ResumeToken::new(r#"{"_data": "test"}"#.to_string());
        store.save(&token).await.unwrap();
        let loaded = store.load().await.unwrap().unwrap();
        assert_eq!(loaded.as_str(), token.as_str());

        // Clear.
        store.clear().await.unwrap();
        assert!(store.load().await.unwrap().is_none());
    }

    /// Saving over an existing token must replace it and leave no temp
    /// file behind (this is the path where the destination already exists).
    #[tokio::test]
    async fn test_file_store_save_overwrites_previous_token() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileResumeTokenStore::new(dir.path().join("token.json"));

        store.save(&ResumeToken::new("first".to_string())).await.unwrap();
        store.save(&ResumeToken::new("second".to_string())).await.unwrap();
        assert_eq!(store.load().await.unwrap().unwrap().as_str(), "second");

        let names: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .collect();
        assert_eq!(names, vec![std::ffi::OsString::from("token.json")]);
    }

    #[tokio::test]
    async fn test_file_store_clear_without_file_is_ok() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileResumeTokenStore::new(dir.path().join("missing.json"));

        store.clear().await.unwrap();
        assert!(store.load().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_file_store_load_trims_and_ignores_blank_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("token.json");
        let store = FileResumeTokenStore::new(path.clone());

        std::fs::write(&path, " \n\t").unwrap();
        assert!(store.load().await.unwrap().is_none());

        std::fs::write(&path, "  tok\n").unwrap();
        assert_eq!(store.load().await.unwrap().unwrap().as_str(), "tok");
    }

    #[tokio::test]
    async fn test_in_memory_store_roundtrip() {
        let store = InMemoryResumeTokenStore::new();

        assert!(store.load().await.unwrap().is_none());

        let token = ResumeToken::new("test_token".to_string());
        store.save(&token).await.unwrap();
        let loaded = store.load().await.unwrap().unwrap();
        assert_eq!(loaded.as_str(), "test_token");

        store.clear().await.unwrap();
        assert!(store.load().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_in_memory_store_save_overwrites() {
        let store = InMemoryResumeTokenStore::new();

        store.save(&ResumeToken::new("a".to_string())).await.unwrap();
        store.save(&ResumeToken::new("b".to_string())).await.unwrap();
        assert_eq!(store.load().await.unwrap().unwrap().as_str(), "b");

        // Clearing twice is harmless.
        store.clear().await.unwrap();
        store.clear().await.unwrap();
        assert!(store.load().await.unwrap().is_none());
    }
}