laminar_connectors/mongodb/
resume_token.rs1use std::path::PathBuf;
21
22use crate::error::ConnectorError;
23
24#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
29pub struct ResumeToken {
30 data: String,
32}
33
34impl ResumeToken {
35 #[must_use]
37 pub fn new(data: String) -> Self {
38 Self { data }
39 }
40
41 #[must_use]
43 pub fn as_str(&self) -> &str {
44 &self.data
45 }
46
47 #[must_use]
49 pub fn into_inner(self) -> String {
50 self.data
51 }
52}
53
54impl std::fmt::Display for ResumeToken {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.write_str(&self.data)
57 }
58}
59
60#[derive(Debug, thiserror::Error)]
62pub enum ResumeTokenStoreError {
63 #[error("resume token I/O error: {0}")]
65 Io(#[from] std::io::Error),
66
67 #[error("resume token serialization error: {0}")]
69 Serialization(String),
70
71 #[error("resume token store error: {0}")]
73 Store(String),
74}
75
76#[async_trait::async_trait]
86pub trait ResumeTokenStore: Send + Sync {
87 async fn load(&self) -> Result<Option<ResumeToken>, ResumeTokenStoreError>;
93
94 async fn save(&self, token: &ResumeToken) -> Result<(), ResumeTokenStoreError>;
100
101 async fn clear(&self) -> Result<(), ResumeTokenStoreError>;
107}
108
109#[derive(Debug, Clone)]
114pub struct FileResumeTokenStore {
115 path: PathBuf,
117}
118
119impl FileResumeTokenStore {
120 #[must_use]
122 pub fn new(path: PathBuf) -> Self {
123 Self { path }
124 }
125}
126
127#[async_trait::async_trait]
128impl ResumeTokenStore for FileResumeTokenStore {
129 async fn load(&self) -> Result<Option<ResumeToken>, ResumeTokenStoreError> {
130 let path = self.path.clone();
131 tokio::task::spawn_blocking(move || {
132 if !path.exists() {
133 return Ok(None);
134 }
135 let data = std::fs::read_to_string(&path)?;
136 let trimmed = data.trim();
137 if trimmed.is_empty() {
138 return Ok(None);
139 }
140 Ok(Some(ResumeToken::new(trimmed.to_string())))
141 })
142 .await
143 .map_err(|e| ResumeTokenStoreError::Store(format!("spawn_blocking join: {e}")))?
144 }
145
146 async fn save(&self, token: &ResumeToken) -> Result<(), ResumeTokenStoreError> {
147 let path = self.path.clone();
148 let data = token.as_str().to_string();
149 tokio::task::spawn_blocking(move || {
150 let tmp_path = path.with_extension("tmp");
153 std::fs::write(&tmp_path, &data)?;
154 #[cfg(windows)]
155 {
156 let _ = std::fs::remove_file(&path);
157 }
158 std::fs::rename(&tmp_path, &path)?;
159 Ok(())
160 })
161 .await
162 .map_err(|e| ResumeTokenStoreError::Store(format!("spawn_blocking join: {e}")))?
163 }
164
165 async fn clear(&self) -> Result<(), ResumeTokenStoreError> {
166 let path = self.path.clone();
167 tokio::task::spawn_blocking(move || {
168 if path.exists() {
169 std::fs::remove_file(&path)?;
170 }
171 Ok(())
172 })
173 .await
174 .map_err(|e| ResumeTokenStoreError::Store(format!("spawn_blocking join: {e}")))?
175 }
176}
177
178#[cfg(feature = "mongodb-cdc")]
184#[derive(Debug, Clone)]
185pub struct MongoResumeTokenStore {
186 collection: mongodb::Collection<mongodb::bson::Document>,
188 source_id: String,
190}
191
192#[cfg(feature = "mongodb-cdc")]
193impl MongoResumeTokenStore {
194 #[must_use]
196 pub fn new(
197 collection: mongodb::Collection<mongodb::bson::Document>,
198 source_id: String,
199 ) -> Self {
200 Self {
201 collection,
202 source_id,
203 }
204 }
205}
206
207#[cfg(feature = "mongodb-cdc")]
208#[async_trait::async_trait]
209impl ResumeTokenStore for MongoResumeTokenStore {
210 async fn load(&self) -> Result<Option<ResumeToken>, ResumeTokenStoreError> {
211 use mongodb::bson::doc;
212
213 let filter = doc! { "_id": &self.source_id };
214 let result = self
215 .collection
216 .find_one(filter)
217 .await
218 .map_err(|e| ResumeTokenStoreError::Store(e.to_string()))?;
219
220 match result {
221 Some(doc) => {
222 let token_str = doc
223 .get_str("token")
224 .map_err(|e| ResumeTokenStoreError::Serialization(e.to_string()))?;
225 Ok(Some(ResumeToken::new(token_str.to_string())))
226 }
227 None => Ok(None),
228 }
229 }
230
231 async fn save(&self, token: &ResumeToken) -> Result<(), ResumeTokenStoreError> {
232 use mongodb::bson::doc;
233
234 let filter = doc! { "_id": &self.source_id };
235 let update = doc! {
236 "$set": {
237 "token": token.as_str(),
238 "updated_at": mongodb::bson::DateTime::now(),
239 }
240 };
241 let opts = mongodb::options::UpdateOptions::builder()
242 .upsert(true)
243 .build();
244
245 self.collection
246 .update_one(filter, update)
247 .with_options(opts)
248 .await
249 .map_err(|e| ResumeTokenStoreError::Store(e.to_string()))?;
250
251 Ok(())
252 }
253
254 async fn clear(&self) -> Result<(), ResumeTokenStoreError> {
255 use mongodb::bson::doc;
256
257 let filter = doc! { "_id": &self.source_id };
258 self.collection
259 .delete_one(filter)
260 .await
261 .map_err(|e| ResumeTokenStoreError::Store(e.to_string()))?;
262
263 Ok(())
264 }
265}
266
267#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
269#[serde(tag = "type", rename_all = "snake_case")]
270pub enum ResumeTokenStoreConfig {
271 File {
273 path: String,
275 },
276 Mongo {
278 collection: String,
280 source_id: String,
282 },
283 #[default]
285 Memory,
286}
287
288#[derive(Debug, Default)]
290pub struct InMemoryResumeTokenStore {
291 token: parking_lot::Mutex<Option<ResumeToken>>,
295}
296
297impl InMemoryResumeTokenStore {
298 #[must_use]
300 pub fn new() -> Self {
301 Self::default()
302 }
303}
304
305#[async_trait::async_trait]
306impl ResumeTokenStore for InMemoryResumeTokenStore {
307 async fn load(&self) -> Result<Option<ResumeToken>, ResumeTokenStoreError> {
308 Ok(self.token.lock().clone())
309 }
310
311 async fn save(&self, token: &ResumeToken) -> Result<(), ResumeTokenStoreError> {
312 *self.token.lock() = Some(token.clone());
313 Ok(())
314 }
315
316 async fn clear(&self) -> Result<(), ResumeTokenStoreError> {
317 *self.token.lock() = None;
318 Ok(())
319 }
320}
321
322impl From<ResumeTokenStoreError> for ConnectorError {
323 fn from(e: ResumeTokenStoreError) -> Self {
324 ConnectorError::Internal(format!("resume token store: {e}"))
325 }
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331
332 #[test]
333 fn test_resume_token_new() {
334 let token = ResumeToken::new(r#"{"_data": "abc123"}"#.to_string());
335 assert_eq!(token.as_str(), r#"{"_data": "abc123"}"#);
336 }
337
338 #[test]
339 fn test_resume_token_display() {
340 let token = ResumeToken::new("tok123".to_string());
341 assert_eq!(token.to_string(), "tok123");
342 }
343
344 #[test]
345 fn test_resume_token_into_inner() {
346 let token = ResumeToken::new("data".to_string());
347 assert_eq!(token.into_inner(), "data");
348 }
349
350 #[test]
351 fn test_resume_token_store_config_default() {
352 assert!(matches!(
353 ResumeTokenStoreConfig::default(),
354 ResumeTokenStoreConfig::Memory
355 ));
356 }
357
358 #[tokio::test]
359 async fn test_file_store_roundtrip() {
360 let dir = tempfile::tempdir().unwrap();
361 let path = dir.path().join("token.json");
362 let store = FileResumeTokenStore::new(path);
363
364 assert!(store.load().await.unwrap().is_none());
366
367 let token = ResumeToken::new(r#"{"_data": "test"}"#.to_string());
369 store.save(&token).await.unwrap();
370 let loaded = store.load().await.unwrap().unwrap();
371 assert_eq!(loaded.as_str(), token.as_str());
372
373 store.clear().await.unwrap();
375 assert!(store.load().await.unwrap().is_none());
376 }
377
378 #[tokio::test]
379 async fn test_in_memory_store_roundtrip() {
380 let store = InMemoryResumeTokenStore::new();
381
382 assert!(store.load().await.unwrap().is_none());
383
384 let token = ResumeToken::new("test_token".to_string());
385 store.save(&token).await.unwrap();
386 let loaded = store.load().await.unwrap().unwrap();
387 assert_eq!(loaded.as_str(), "test_token");
388
389 store.clear().await.unwrap();
390 assert!(store.load().await.unwrap().is_none());
391 }
392}