1use std::collections::VecDeque;
19use std::sync::Arc;
20
21use arrow_array::builder::{StringBuilder, UInt32Builder};
22use arrow_array::RecordBatch;
23use arrow_schema::{DataType, Field, Schema, SchemaRef};
24use async_trait::async_trait;
25use tokio::sync::Notify;
26
27use crate::checkpoint::SourceCheckpoint;
28use crate::config::{ConnectorConfig, ConnectorState};
29use crate::connector::{SourceBatch, SourceConnector};
30use crate::error::ConnectorError;
31
32use super::change_event::{MongoDbChangeEvent, OperationType};
33use super::config::MongoDbSourceConfig;
34use super::metrics::MongoDbCdcMetrics;
35use super::resume_token::{InMemoryResumeTokenStore, ResumeToken, ResumeTokenStore};
36
37#[must_use]
51pub fn mongodb_cdc_envelope_schema() -> SchemaRef {
52 Arc::new(Schema::new(vec![
53 Field::new("_namespace", DataType::Utf8, false),
54 Field::new("_op", DataType::Utf8, false),
55 Field::new("_document_key", DataType::Utf8, false),
56 Field::new("_cluster_time_s", DataType::UInt32, false),
57 Field::new("_cluster_time_i", DataType::UInt32, false),
58 Field::new(
59 "_wall_time_ms",
60 DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
61 false,
62 ),
63 Field::new("_full_document", DataType::Utf8, true),
64 Field::new("_update_desc", DataType::Utf8, true),
65 Field::new("_resume_token", DataType::Utf8, false),
66 ]))
67}
68
/// MongoDB change-data-capture source connector.
///
/// Buffers [`MongoDbChangeEvent`]s, converts them into Arrow record batches
/// using the schema from [`mongodb_cdc_envelope_schema`], and remembers the
/// resume token of the last event handed out so progress can be checkpointed.
pub struct MongoDbCdcSource {
    /// Source configuration; validated when the connector is opened.
    config: MongoDbSourceConfig,

    /// Lifecycle state: `Created` on construction, `Running` after `open`,
    /// `Closed` after `close`.
    state: ConnectorState,

    /// Arrow schema of every batch produced by this source.
    schema: SchemaRef,

    /// Metrics handle, shared with the background reader task.
    metrics: Arc<MongoDbCdcMetrics>,

    /// Events waiting to be turned into a batch, oldest first.
    event_buffer: VecDeque<MongoDbChangeEvent>,

    /// Resume token of the last event emitted in a batch, or the token
    /// loaded from the store / restored from a checkpoint.
    last_resume_token: Option<ResumeToken>,

    /// Persistence backend for the resume token; defaults to
    /// `InMemoryResumeTokenStore`.
    resume_token_store: Box<dyn ResumeTokenStore>,

    /// Notifier signalled by the reader task after it sends an event.
    data_ready: Arc<Notify>,

    /// Set once an `Invalidate` event has been enqueued.
    invalidated: bool,

    /// Join handle of the background change stream reader task.
    #[cfg(feature = "mongodb-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Receiving end of the channel fed by the reader task.
    #[cfg(feature = "mongodb-cdc")]
    event_rx: Option<ChangeStreamRx>,

    /// Watch sender used to ask the reader task to shut down.
    #[cfg(feature = "mongodb-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
126
/// Message passed from the background change stream reader to the source.
///
/// `dead_code` is allowed because every user of this type in this file is
/// gated behind the `mongodb-cdc` feature, so the enum is unused without it.
#[allow(dead_code)] enum ChangeStreamPayload {
    /// A parsed change event (boxed, presumably to keep the payload small).
    Event(Box<MongoDbChangeEvent>),
    /// A fatal reader error; surfaced to the caller as `ConnectorError::ReadError`.
    Error(String),
}
135
/// Sending half of the bounded channel, owned by the reader task.
#[cfg(feature = "mongodb-cdc")]
type ChangeStreamTx = crossfire::MAsyncTx<crossfire::mpsc::Array<ChangeStreamPayload>>;
/// Receiving half of the bounded channel, owned by the source.
#[cfg(feature = "mongodb-cdc")]
type ChangeStreamRx = crossfire::AsyncRx<crossfire::mpsc::Array<ChangeStreamPayload>>;
142
143impl MongoDbCdcSource {
144 #[must_use]
146 pub fn new(config: MongoDbSourceConfig, registry: Option<&prometheus::Registry>) -> Self {
147 Self {
148 config,
149 state: ConnectorState::Created,
150 schema: mongodb_cdc_envelope_schema(),
151 metrics: Arc::new(MongoDbCdcMetrics::new(registry)),
152 event_buffer: VecDeque::new(),
153 last_resume_token: None,
154 resume_token_store: Box::new(InMemoryResumeTokenStore::new()),
155 data_ready: Arc::new(Notify::new()),
156 invalidated: false,
157 #[cfg(feature = "mongodb-cdc")]
158 reader_handle: None,
159 #[cfg(feature = "mongodb-cdc")]
160 event_rx: None,
161 #[cfg(feature = "mongodb-cdc")]
162 reader_shutdown: None,
163 }
164 }
165
166 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
172 let mongo_config = MongoDbSourceConfig::from_config(config)?;
173 Ok(Self::new(mongo_config, None))
174 }
175
176 #[must_use]
178 pub fn with_resume_token_store(mut self, store: Box<dyn ResumeTokenStore>) -> Self {
179 self.resume_token_store = store;
180 self
181 }
182
183 #[must_use]
185 pub fn config(&self) -> &MongoDbSourceConfig {
186 &self.config
187 }
188
189 #[must_use]
191 pub fn last_resume_token(&self) -> Option<&ResumeToken> {
192 self.last_resume_token.as_ref()
193 }
194
195 #[must_use]
197 pub fn buffered_events(&self) -> usize {
198 self.event_buffer.len()
199 }
200
201 #[must_use]
203 pub fn is_invalidated(&self) -> bool {
204 self.invalidated
205 }
206
207 pub fn enqueue_event(&mut self, event: MongoDbChangeEvent) {
210 if event.operation_type == OperationType::Invalidate {
211 self.invalidated = true;
212 }
213 self.metrics.record_event(event.operation_type.as_str());
214 self.event_buffer.push_back(event);
215 }
216
217 pub fn drain_to_batch(
224 &mut self,
225 max_records: usize,
226 ) -> Result<Option<SourceBatch>, ConnectorError> {
227 if self.event_buffer.is_empty() {
228 return Ok(None);
229 }
230
231 let count = max_records.min(self.event_buffer.len());
232 let events: Vec<MongoDbChangeEvent> = self.event_buffer.drain(..count).collect();
233
234 let batch = events_to_record_batch(&events, &self.schema)?;
235 self.metrics.record_batch();
236
237 if let Some(last) = events.last() {
239 self.last_resume_token = Some(ResumeToken::new(last.resume_token.clone()));
240 }
241
242 Ok(Some(SourceBatch::new(batch)))
243 }
244}
245
246fn events_to_record_batch(
248 events: &[MongoDbChangeEvent],
249 schema: &SchemaRef,
250) -> Result<RecordBatch, ConnectorError> {
251 let len = events.len();
252
253 let mut ns_builder = StringBuilder::with_capacity(len, len * 32);
254 let mut op_builder = StringBuilder::with_capacity(len, len * 4);
255 let mut dk_builder = StringBuilder::with_capacity(len, len * 64);
256 let mut cts_builder = UInt32Builder::with_capacity(len);
257 let mut ct_inc_builder = UInt32Builder::with_capacity(len);
258 let mut wt_builder = arrow_array::builder::TimestampMillisecondBuilder::with_capacity(len);
259 let mut fd_builder = StringBuilder::with_capacity(len, len * 128);
260 let mut ud_builder = StringBuilder::with_capacity(len, len * 64);
261 let mut rt_builder = StringBuilder::with_capacity(len, len * 64);
262
263 for event in events {
264 ns_builder.append_value(event.namespace.full_name());
265 op_builder.append_value(event.operation_type.as_str());
266 dk_builder.append_value(&event.document_key);
267 cts_builder.append_value(event.cluster_time_secs);
268 ct_inc_builder.append_value(event.cluster_time_inc);
269 wt_builder.append_value(event.wall_time_ms);
270
271 match &event.full_document {
272 Some(doc) => fd_builder.append_value(doc),
273 None => fd_builder.append_null(),
274 }
275
276 match &event.update_description {
277 Some(desc) => {
278 let json = serde_json::to_string(desc)
279 .map_err(|e| ConnectorError::Internal(format!("serialize update_desc: {e}")))?;
280 ud_builder.append_value(&json);
281 }
282 None => ud_builder.append_null(),
283 }
284
285 rt_builder.append_value(&event.resume_token);
286 }
287
288 RecordBatch::try_new(
289 Arc::clone(schema),
290 vec![
291 Arc::new(ns_builder.finish()),
292 Arc::new(op_builder.finish()),
293 Arc::new(dk_builder.finish()),
294 Arc::new(cts_builder.finish()),
295 Arc::new(ct_inc_builder.finish()),
296 Arc::new(wt_builder.finish()),
297 Arc::new(fd_builder.finish()),
298 Arc::new(ud_builder.finish()),
299 Arc::new(rt_builder.finish()),
300 ],
301 )
302 .map_err(|e| ConnectorError::Internal(format!("arrow batch: {e}")))
303}
304
305#[async_trait]
306impl SourceConnector for MongoDbCdcSource {
307 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
308 self.config.validate()?;
309
310 let persisted_token = self
312 .resume_token_store
313 .load()
314 .await
315 .map_err(|e| ConnectorError::Internal(format!("load resume token: {e}")))?;
316
317 if let Some(token) = persisted_token {
318 tracing::info!(resume_token = %token, "resuming from persisted token");
319 self.last_resume_token = Some(token);
320 }
321
322 #[cfg(feature = "mongodb-cdc")]
324 {
325 self.start_change_stream_reader().await?;
326 }
327
328 self.state = ConnectorState::Running;
329 tracing::info!(
330 database = %self.config.database,
331 collection = %self.config.collection,
332 full_document_mode = ?self.config.full_document_mode,
333 "MongoDB CDC source opened"
334 );
335
336 Ok(())
337 }
338
339 async fn poll_batch(
340 &mut self,
341 max_records: usize,
342 ) -> Result<Option<SourceBatch>, ConnectorError> {
343 #[cfg(feature = "mongodb-cdc")]
344 {
345 self.drain_channel()?;
346 }
347
348 self.drain_to_batch(max_records)
349 }
350
351 fn schema(&self) -> SchemaRef {
352 Arc::clone(&self.schema)
353 }
354
355 fn checkpoint(&self) -> SourceCheckpoint {
356 let mut cp = SourceCheckpoint::new(0);
357 if let Some(ref token) = self.last_resume_token {
358 cp.set_offset("resume_token", token.as_str());
359 }
360 cp.set_metadata("connector", "mongodb-cdc");
361 cp.set_metadata("database", &self.config.database);
362 cp.set_metadata("collection", &self.config.collection);
363 cp
364 }
365
366 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
367 if let Some(token_str) = checkpoint.get_offset("resume_token") {
368 let token = ResumeToken::new(token_str.to_string());
369 tracing::info!(resume_token = %token, "restoring from checkpoint");
370 self.last_resume_token = Some(token.clone());
371 self.resume_token_store
372 .save(&token)
373 .await
374 .map_err(|e| ConnectorError::Internal(format!("save resume token: {e}")))?;
375 }
376 Ok(())
377 }
378
379 async fn close(&mut self) -> Result<(), ConnectorError> {
380 if let Some(ref token) = self.last_resume_token {
382 if let Err(e) = self.resume_token_store.save(token).await {
383 tracing::warn!(error = %e, "failed to persist resume token on close");
384 }
385 }
386
387 #[cfg(feature = "mongodb-cdc")]
389 {
390 if let Some(tx) = self.reader_shutdown.take() {
391 let _ = tx.send(true);
392 }
393 if let Some(handle) = self.reader_handle.take() {
394 let _ = handle.await;
395 }
396 }
397
398 self.state = ConnectorState::Closed;
399 tracing::info!("MongoDB CDC source closed");
400 Ok(())
401 }
402
403 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
404 Some(Arc::clone(&self.data_ready))
405 }
406
407 fn supports_replay(&self) -> bool {
408 true
410 }
411}
412
#[cfg(feature = "mongodb-cdc")]
impl MongoDbCdcSource {
    /// Connects to MongoDB and spawns the background change stream reader.
    ///
    /// Parses the connection URI, warns about a very small `max_pool_size`,
    /// rejects time series collections (unless watching `*`), then creates
    /// the bounded event channel and the shutdown signal and spawns
    /// `run_change_stream_reader` on the Tokio runtime.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConnectionFailed` when the URI cannot be
    /// parsed or the client cannot be created, and propagates errors from
    /// the time series preflight check.
    async fn start_change_stream_reader(&mut self) -> Result<(), ConnectorError> {
        use mongodb::options::ClientOptions;

        let client_options = ClientOptions::parse(&self.config.connection_uri)
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("parse URI: {e}")))?;

        if let Some(pool) = client_options.max_pool_size {
            if pool <= 1 {
                tracing::warn!(
                    max_pool_size = pool,
                    "max_pool_size is very small — on sharded clusters, \
                     mongos opens per-shard cursors and may exhaust the pool"
                );
            }
        }

        let client = mongodb::Client::with_options(client_options)
            .map_err(|e| ConnectorError::ConnectionFailed(format!("create client: {e}")))?;

        let db = client.database(&self.config.database);

        // `*` watches the whole database, so there is no single collection to check.
        if self.config.collection != "*" {
            preflight_timeseries_guard(&db, &self.config.database, &self.config.collection).await?;
        }

        let (tx, rx) =
            crossfire::mpsc::bounded_async::<ChangeStreamPayload>(self.config.max_buffered_events);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let config = self.config.clone();
        let resume_token = self.last_resume_token.clone();
        let data_ready = Arc::clone(&self.data_ready);
        let metrics = Arc::clone(&self.metrics);

        let handle = tokio::spawn(async move {
            if let Err(e) = run_change_stream_reader(
                db,
                config,
                resume_token,
                tx,
                shutdown_rx,
                data_ready,
                metrics,
            )
            .await
            {
                tracing::error!(error = %e, "change stream reader task failed");
            }
        });

        self.reader_handle = Some(handle);
        self.event_rx = Some(rx);
        self.reader_shutdown = Some(shutdown_tx);

        Ok(())
    }

    /// Moves payloads queued by the reader task into the event buffer.
    ///
    /// At most enough payloads are pulled to bring the buffer up to
    /// `max_buffered_events`; the rest stay in the bounded channel so the
    /// reader is throttled instead of the buffer growing without limit.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ReadError` when the reader reported a fatal
    /// error. Events received before the error are still enqueued.
    fn drain_channel(&mut self) -> Result<(), ConnectorError> {
        // `max(1)` keeps the source making progress even with a zero limit.
        let limit = self.config.max_buffered_events.max(1);
        let room = limit.saturating_sub(self.event_buffer.len());

        // Collected first because `enqueue_event` needs `&mut self` while the
        // receiver is borrowed.
        let mut payloads = Vec::new();
        if let Some(rx) = &mut self.event_rx {
            while payloads.len() < room {
                match rx.try_recv() {
                    Ok(payload) => payloads.push(payload),
                    Err(_) => break,
                }
            }
        }

        for payload in payloads {
            match payload {
                ChangeStreamPayload::Event(event) => {
                    self.enqueue_event(*event);
                }
                ChangeStreamPayload::Error(msg) => {
                    self.metrics.record_error();
                    return Err(ConnectorError::ReadError(msg));
                }
            }
        }
        Ok(())
    }
}
502
/// Rejects time series collections before a change stream is opened on them.
///
/// Looks up `collection` by name in `db` and inspects the first matching
/// entry. A collection that is not listed passes the check.
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` when the collection listing
/// cannot be obtained or read, and `ConnectorError::ConfigurationError` when
/// the collection is a time series collection.
#[cfg(feature = "mongodb-cdc")]
async fn preflight_timeseries_guard(
    db: &mongodb::Database,
    database: &str,
    collection: &str,
) -> Result<(), ConnectorError> {
    use futures_util::StreamExt;
    use mongodb::bson::doc;

    let mut specs = db
        .list_collections()
        .filter(doc! { "name": collection })
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("list collections: {e}")))?;

    match specs.next().await {
        // No such collection is visible; nothing to reject.
        None => Ok(()),
        Some(entry) => {
            let spec = entry.map_err(|e| {
                ConnectorError::ConnectionFailed(format!("read collection spec: {e}"))
            })?;

            if spec.collection_type == mongodb::results::CollectionType::Timeseries {
                Err(ConnectorError::ConfigurationError(format!(
                    "time series collection {database}.{collection} does not support change streams"
                )))
            } else {
                Ok(())
            }
        }
    }
}
532
/// Number of consecutive change stream failures after which the reader task
/// reports a fatal error and stops.
#[cfg(feature = "mongodb-cdc")]
const MAX_FAILURES: u32 = 10;
536
/// Waits until `delay` has elapsed or the shutdown channel changes,
/// whichever happens first.
///
/// Returns `true` when the reader should stop: either shutdown was requested
/// or the shutdown sender was dropped (the owning source is gone).
#[cfg(feature = "mongodb-cdc")]
async fn backoff_or_shutdown(
    shutdown_rx: &mut tokio::sync::watch::Receiver<bool>,
    delay: std::time::Duration,
) -> bool {
    tokio::select! {
        changed = shutdown_rx.changed() => changed.is_err() || *shutdown_rx.borrow(),
        () = tokio::time::sleep(delay) => false,
    }
}

/// Background task that reads a MongoDB change stream and forwards events.
///
/// Opens a change stream on `config.collection` (or on the whole database
/// when the collection is `*`), resuming after `resume_token` when one is
/// given, otherwise starting at `config.start_at_operation_time` when set.
/// Every event is parsed, sent through `tx`, and followed by a notification
/// on `data_ready`.
///
/// On a stream error or an exhausted cursor the task backs off, creates a
/// fresh client and reopens the stream after the last seen token. After
/// `MAX_FAILURES` consecutive failures it sends a `ChangeStreamPayload::Error`
/// and stops. The task also stops when shutdown is requested, when the
/// shutdown sender is dropped, or when the event channel is closed.
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` when the initial resume token cannot
/// be parsed; the same message is also sent to the source as an error payload.
#[cfg(feature = "mongodb-cdc")]
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_change_stream_reader(
    db: mongodb::Database,
    config: MongoDbSourceConfig,
    resume_token: Option<ResumeToken>,
    tx: ChangeStreamTx,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    data_ready: Arc<Notify>,
    metrics: Arc<MongoDbCdcMetrics>,
) -> Result<(), ConnectorError> {
    use futures_util::StreamExt;
    use mongodb::options::ChangeStreamOptions;

    let full_document = match config.full_document_mode {
        super::config::FullDocumentMode::Delta => None,
        super::config::FullDocumentMode::UpdateLookup => {
            Some(mongodb::options::FullDocumentType::UpdateLookup)
        }
        super::config::FullDocumentMode::RequirePostImage => {
            Some(mongodb::options::FullDocumentType::Required)
        }
        super::config::FullDocumentMode::WhenAvailable => {
            Some(mongodb::options::FullDocumentType::WhenAvailable)
        }
    };

    let mut last_token: Option<mongodb::change_stream::event::ResumeToken> =
        if let Some(ref token) = resume_token {
            match serde_json::from_str::<mongodb::change_stream::event::ResumeToken>(
                token.as_str(),
            ) {
                Ok(parsed) => Some(parsed),
                Err(e) => {
                    // Report through the channel too; otherwise the source
                    // would poll forever without learning the reader is gone.
                    let msg = format!("parse resume token: {e}");
                    let _ = tx.send(ChangeStreamPayload::Error(msg.clone())).await;
                    data_ready.notify_one();
                    return Err(ConnectorError::ReadError(msg));
                }
            }
        } else {
            None
        };

    let pipeline: Vec<mongodb::bson::Document> = config
        .pipeline
        .iter()
        .filter_map(|stage| match mongodb::bson::to_document(stage) {
            Ok(doc) => Some(doc),
            Err(e) => {
                // Still skipped as before, but no longer silently.
                tracing::warn!(
                    error = %e,
                    "skipping pipeline stage that cannot be converted to a BSON document"
                );
                None
            }
        })
        .collect();

    let mut current_db = db;
    let mut consecutive_failures: u32 = 0;

    'reconnect: loop {
        let mut options = ChangeStreamOptions::default();
        options.full_document = full_document.clone();
        options.max_await_time = config
            .max_await_time_ms
            .map(std::time::Duration::from_millis);
        options.batch_size = config.batch_size;
        options.resume_after = last_token.clone();

        // A start time only applies when there is no token to resume after.
        if last_token.is_none() {
            if let Some((secs, inc)) = config.start_at_operation_time {
                options.start_at_operation_time = Some(mongodb::bson::Timestamp {
                    time: secs,
                    increment: inc,
                });
            }
        }

        let cursor_result = if config.collection == "*" {
            current_db
                .watch()
                .pipeline(pipeline.clone())
                .with_options(options)
                .await
        } else {
            current_db
                .collection::<mongodb::bson::Document>(&config.collection)
                .watch()
                .pipeline(pipeline.clone())
                .with_options(options)
                .await
        };

        let mut cursor = match cursor_result {
            Ok(c) => c,
            Err(e) => {
                consecutive_failures += 1;
                if consecutive_failures >= MAX_FAILURES {
                    let msg =
                        format!("change stream open failed after {MAX_FAILURES} attempts: {e}");
                    tracing::error!(%msg);
                    let _ = tx.send(ChangeStreamPayload::Error(msg)).await;
                    // Wake the consumer so it polls and sees the error.
                    data_ready.notify_one();
                    break 'reconnect;
                }
                let backoff = crate::retry::Backoff::broker_reconnect().delay(consecutive_failures);
                tracing::warn!(
                    attempt = consecutive_failures,
                    ?backoff,
                    error = %e,
                    "failed to open change stream, retrying"
                );
                metrics.record_reconnect();
                if backoff_or_shutdown(&mut shutdown_rx, backoff).await {
                    break 'reconnect;
                }
                continue 'reconnect;
            }
        };

        tracing::info!(
            database = %config.database,
            collection = %config.collection,
            resumed = last_token.is_some(),
            "change stream reader started"
        );

        'recv: loop {
            tokio::select! {
                biased;
                changed = shutdown_rx.changed() => {
                    // `Err` means the sender was dropped. Without this check the
                    // branch would be ready on every iteration and, being first in
                    // a biased select, would starve the cursor in a busy loop.
                    if changed.is_err() || *shutdown_rx.borrow() {
                        tracing::info!("change stream reader shutting down");
                        break 'reconnect;
                    }
                }
                next = cursor.next() => {
                    match next {
                        Some(Ok(cs_event)) => {
                            consecutive_failures = 0;

                            last_token = Some(cs_event.id.clone());

                            let change_event = parse_change_stream_event(&cs_event);

                            if tx
                                .send(ChangeStreamPayload::Event(Box::new(change_event)))
                                .await
                                .is_err()
                            {
                                tracing::warn!("source channel closed, stopping reader");
                                break 'reconnect;
                            }
                            data_ready.notify_one();
                        }
                        Some(Err(e)) => {
                            tracing::error!(error = %e, "change stream error");
                            break 'recv;
                        }
                        None => {
                            tracing::info!("change stream cursor exhausted");
                            consecutive_failures = 0;
                            break 'recv;
                        }
                    }
                }
            }
        }

        consecutive_failures += 1;
        if consecutive_failures >= MAX_FAILURES {
            let msg = format!("change stream failed after {MAX_FAILURES} consecutive failures");
            tracing::error!(%msg);
            let _ = tx.send(ChangeStreamPayload::Error(msg)).await;
            // Wake the consumer so it polls and sees the error.
            data_ready.notify_one();
            break 'reconnect;
        }

        let backoff = crate::retry::Backoff::broker_reconnect().delay(consecutive_failures);
        tracing::warn!(
            resume_token = ?last_token,
            attempt = consecutive_failures,
            ?backoff,
            "reconnecting change stream"
        );
        metrics.record_reconnect();

        if backoff_or_shutdown(&mut shutdown_rx, backoff).await {
            break 'reconnect;
        }

        // Build a fresh client for the next attempt; on failure keep the old
        // database handle and let the next open attempt decide.
        match mongodb::options::ClientOptions::parse(&config.connection_uri).await {
            Ok(new_opts) => match mongodb::Client::with_options(new_opts) {
                Ok(new_client) => {
                    current_db = new_client.database(&config.database);
                }
                Err(e) => {
                    tracing::warn!(error = %e, "failed to create client on reconnect");
                }
            },
            Err(e) => {
                tracing::warn!(error = %e, "failed to parse URI on reconnect");
            }
        }
    }

    Ok(())
}
754
/// Converts a driver change stream event into the connector's
/// [`MongoDbChangeEvent`].
///
/// Missing or unserializable parts fall back to defaults rather than failing:
///
/// * an absent namespace becomes empty database and collection names;
/// * an absent or unserializable document key becomes an empty string;
/// * an unserializable full document becomes `None`;
/// * updated fields that cannot be converted to JSON are skipped;
/// * an absent cluster time becomes `(0, 0)` and an absent wall time `0`;
/// * an unserializable resume token becomes an empty string.
///
/// Operation types without a dedicated mapping are logged and carried as
/// `OperationType::Other` with their debug representation.
#[cfg(feature = "mongodb-cdc")]
fn parse_change_stream_event(
    event: &mongodb::change_stream::event::ChangeStreamEvent<mongodb::bson::Document>,
) -> MongoDbChangeEvent {
    use super::change_event::{Namespace, UpdateDescription};
    use mongodb::change_stream::event::OperationType as MongoOpType;

    let operation_type = match event.operation_type {
        MongoOpType::Insert => OperationType::Insert,
        MongoOpType::Update => OperationType::Update,
        MongoOpType::Replace => OperationType::Replace,
        MongoOpType::Delete => OperationType::Delete,
        MongoOpType::Drop => OperationType::Drop,
        MongoOpType::Rename => OperationType::Rename,
        MongoOpType::Invalidate => OperationType::Invalidate,
        MongoOpType::DropDatabase => OperationType::DropDatabase,
        ref other => {
            tracing::warn!(?other, "unmapped MongoDB operation type");
            OperationType::Other(format!("{other:?}"))
        }
    };

    let namespace = event.ns.as_ref().map_or_else(
        || Namespace {
            db: String::new(),
            coll: String::new(),
        },
        |ns| Namespace {
            db: ns.db.clone(),
            coll: ns.coll.clone().unwrap_or_default(),
        },
    );

    let document_key = event
        .document_key
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok())
        .unwrap_or_default();

    let full_document = event
        .full_document
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok());

    let update_description = event.update_description.as_ref().map(|ud| {
        let updated_fields = ud
            .updated_fields
            .iter()
            .filter_map(|(k, v)| serde_json::to_value(v).ok().map(|jv| (k.clone(), jv)))
            .collect();

        let removed_fields = ud.removed_fields.clone();

        let truncated_arrays = ud
            .truncated_arrays
            .as_deref()
            .unwrap_or_default()
            .iter()
            .map(|t| super::change_event::TruncatedArray {
                field: t.field.clone(),
                // A checked conversion: a negative size maps to 0 instead of
                // wrapping around to a huge value as an `as` cast would.
                new_size: u32::try_from(t.new_size).unwrap_or_default(),
            })
            .collect();

        UpdateDescription {
            updated_fields,
            removed_fields,
            truncated_arrays,
        }
    });

    let (cluster_time_secs, cluster_time_inc) = event
        .cluster_time
        .map_or((0, 0), |ts| (ts.time, ts.increment));

    let wall_time_ms = event
        .wall_time
        .map_or(0, mongodb::bson::DateTime::timestamp_millis);

    let resume_token = serde_json::to_string(&event.id).unwrap_or_default();

    MongoDbChangeEvent {
        operation_type,
        namespace,
        document_key,
        full_document,
        update_description,
        cluster_time_secs,
        cluster_time_inc,
        resume_token,
        wall_time_ms,
    }
}
851
#[cfg(test)]
mod tests {
    use super::super::change_event::Namespace;
    use super::*;

    /// Connection URI shared by all tests; no connection is ever made.
    const TEST_URI: &str = "mongodb://localhost:27017";

    /// Creates a source for `database`.`collection` without a metrics registry.
    fn source_for(database: &str, collection: &str) -> MongoDbCdcSource {
        MongoDbCdcSource::new(
            MongoDbSourceConfig::new(TEST_URI, database, collection),
            None,
        )
    }

    /// Builds a fully populated event on `testdb.users` with operation `op`.
    fn sample_event(op: OperationType) -> MongoDbChangeEvent {
        let namespace = Namespace {
            db: "testdb".to_string(),
            coll: "users".to_string(),
        };

        MongoDbChangeEvent {
            operation_type: op,
            namespace,
            document_key: r#"{"_id": "1"}"#.to_string(),
            full_document: Some(r#"{"_id": "1", "name": "Alice"}"#.to_string()),
            update_description: None,
            cluster_time_secs: 1_700_000_000,
            cluster_time_inc: 1,
            resume_token: r#"{"_data": "token1"}"#.to_string(),
            wall_time_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn test_schema() {
        let schema = mongodb_cdc_envelope_schema();

        assert_eq!(schema.fields().len(), 9);
        assert_eq!(schema.field(0).name(), "_namespace");

        let full_document = schema.field(6);
        assert_eq!(full_document.name(), "_full_document");
        assert!(full_document.is_nullable());
    }

    #[test]
    fn test_new_source() {
        let source = source_for("db", "coll");

        assert_eq!(source.buffered_events(), 0);
        assert!(!source.is_invalidated());
        assert!(source.last_resume_token().is_none());
    }

    #[test]
    fn test_enqueue_event() {
        let mut source = source_for("db", "coll");

        source.enqueue_event(sample_event(OperationType::Insert));

        assert_eq!(source.buffered_events(), 1);
        assert!(!source.is_invalidated());
    }

    #[test]
    fn test_enqueue_invalidate() {
        let mut source = source_for("db", "coll");

        let invalidate = MongoDbChangeEvent {
            full_document: None,
            ..sample_event(OperationType::Invalidate)
        };
        source.enqueue_event(invalidate);

        assert!(source.is_invalidated());
    }

    #[test]
    fn test_events_to_record_batch() {
        let schema = mongodb_cdc_envelope_schema();
        let events = [OperationType::Insert, OperationType::Delete].map(sample_event);

        let batch = events_to_record_batch(&events, &schema).unwrap();

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 9);
    }

    #[test]
    fn test_events_to_record_batch_empty() {
        let schema = mongodb_cdc_envelope_schema();

        let batch = events_to_record_batch(&[], &schema).unwrap();

        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 9);
    }

    #[test]
    fn test_drain_to_batch() {
        let mut source = source_for("db", "coll");

        // Nothing buffered yet: no batch.
        assert!(source.drain_to_batch(10).unwrap().is_none());

        (0..5).for_each(|_| source.enqueue_event(sample_event(OperationType::Insert)));

        // A limit below the buffer size leaves the remainder queued.
        let first = source.drain_to_batch(3).unwrap().unwrap();
        assert_eq!(first.num_rows(), 3);
        assert_eq!(source.buffered_events(), 2);

        // A limit above the buffer size drains everything that is left.
        let second = source.drain_to_batch(10).unwrap().unwrap();
        assert_eq!(second.num_rows(), 2);
        assert_eq!(source.buffered_events(), 0);
    }

    #[test]
    fn test_checkpoint() {
        let mut source = source_for("testdb", "users");

        // Without a token only the metadata is present.
        let before = source.checkpoint();
        assert!(before.get_offset("resume_token").is_none());
        assert_eq!(before.get_metadata("connector"), Some("mongodb-cdc"));

        // With a token the offset is exposed verbatim.
        source.last_resume_token = Some(ResumeToken::new("tok123".to_string()));
        let after = source.checkpoint();
        assert_eq!(after.get_offset("resume_token"), Some("tok123"));
    }

    #[tokio::test]
    async fn test_restore_checkpoint() {
        let mut source = source_for("db", "coll");

        let mut checkpoint = SourceCheckpoint::new(0);
        checkpoint.set_offset("resume_token", "restored_token");

        source.restore(&checkpoint).await.unwrap();

        let restored = source.last_resume_token().unwrap();
        assert_eq!(restored.as_str(), "restored_token");
    }

    #[test]
    fn test_drain_tracks_resume_token() {
        let mut source = source_for("db", "coll");

        let event = MongoDbChangeEvent {
            resume_token: r#"{"_data": "final_token"}"#.to_string(),
            ..sample_event(OperationType::Insert)
        };
        source.enqueue_event(event);

        source.drain_to_batch(10).unwrap();

        let tracked = source.last_resume_token().unwrap();
        assert_eq!(tracked.as_str(), r#"{"_data": "final_token"}"#);
    }
}