1use std::collections::VecDeque;
19use std::sync::Arc;
20
21use arrow_array::builder::{StringBuilder, UInt32Builder};
22use arrow_array::RecordBatch;
23use arrow_schema::{DataType, Field, Schema, SchemaRef};
24use async_trait::async_trait;
25use tokio::sync::Notify;
26
27use crate::checkpoint::SourceCheckpoint;
28use crate::config::{ConnectorConfig, ConnectorState};
29use crate::connector::{SourceBatch, SourceConnector};
30use crate::error::ConnectorError;
31use crate::health::HealthStatus;
32use crate::metrics::ConnectorMetrics;
33
34use super::change_event::{MongoDbChangeEvent, OperationType};
35use super::config::MongoDbSourceConfig;
36use super::metrics::MongoDbCdcMetrics;
37use super::resume_token::{InMemoryResumeTokenStore, ResumeToken, ResumeTokenStore};
38
39#[must_use]
53pub fn mongodb_cdc_envelope_schema() -> SchemaRef {
54 Arc::new(Schema::new(vec![
55 Field::new("_namespace", DataType::Utf8, false),
56 Field::new("_op", DataType::Utf8, false),
57 Field::new("_document_key", DataType::Utf8, false),
58 Field::new("_cluster_time_s", DataType::UInt32, false),
59 Field::new("_cluster_time_i", DataType::UInt32, false),
60 Field::new(
61 "_wall_time_ms",
62 DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
63 false,
64 ),
65 Field::new("_full_document", DataType::Utf8, true),
66 Field::new("_update_desc", DataType::Utf8, true),
67 Field::new("_resume_token", DataType::Utf8, false),
68 ]))
69}
70
/// CDC source that tails a MongoDB change stream and emits Arrow batches
/// in the envelope layout built by `mongodb_cdc_envelope_schema`.
pub struct MongoDbCdcSource {
    /// Parsed connector configuration (URI, database, collection, modes).
    config: MongoDbSourceConfig,

    /// Lifecycle state reported by `health_check`.
    state: ConnectorState,

    /// Cached envelope schema returned by `schema()`.
    schema: SchemaRef,

    /// Per-connector metrics; shared with the reader task when spawned.
    metrics: Arc<MongoDbCdcMetrics>,

    /// Events received but not yet drained into a batch.
    event_buffer: VecDeque<MongoDbChangeEvent>,

    /// Resume token of the last event handed out in a batch; used for
    /// checkpointing and persisted on close.
    last_resume_token: Option<ResumeToken>,

    /// Pluggable persistence for the resume token (in-memory by default).
    resume_token_store: Box<dyn ResumeTokenStore>,

    /// Notified by the reader task whenever a new event is available.
    data_ready: Arc<Notify>,

    /// Set once an `invalidate` event is seen; degrades health status.
    invalidated: bool,

    /// Join handle of the spawned change-stream reader task.
    #[cfg(feature = "mongodb-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Receiving half of the bounded reader → source channel.
    #[cfg(feature = "mongodb-cdc")]
    event_rx: Option<ChangeStreamRx>,

    /// Signals the reader task to stop on `close()`.
    #[cfg(feature = "mongodb-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
128
/// Message sent from the background change-stream reader to the source.
#[allow(dead_code)] // variants are only constructed under the `mongodb-cdc` feature
enum ChangeStreamPayload {
    /// A decoded change event (boxed to keep the enum small).
    Event(Box<MongoDbChangeEvent>),
    /// A terminal reader error, surfaced to the caller on the next poll.
    Error(String),
}

// Sender half used by the spawned reader task.
#[cfg(feature = "mongodb-cdc")]
type ChangeStreamTx = crossfire::MAsyncTx<crossfire::mpsc::Array<ChangeStreamPayload>>;
// Receiver half drained by `poll_batch`.
#[cfg(feature = "mongodb-cdc")]
type ChangeStreamRx = crossfire::AsyncRx<crossfire::mpsc::Array<ChangeStreamPayload>>;
144
145impl MongoDbCdcSource {
146 #[must_use]
148 pub fn new(config: MongoDbSourceConfig, registry: Option<&prometheus::Registry>) -> Self {
149 Self {
150 config,
151 state: ConnectorState::Created,
152 schema: mongodb_cdc_envelope_schema(),
153 metrics: Arc::new(MongoDbCdcMetrics::new(registry)),
154 event_buffer: VecDeque::new(),
155 last_resume_token: None,
156 resume_token_store: Box::new(InMemoryResumeTokenStore::new()),
157 data_ready: Arc::new(Notify::new()),
158 invalidated: false,
159 #[cfg(feature = "mongodb-cdc")]
160 reader_handle: None,
161 #[cfg(feature = "mongodb-cdc")]
162 event_rx: None,
163 #[cfg(feature = "mongodb-cdc")]
164 reader_shutdown: None,
165 }
166 }
167
168 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
174 let mongo_config = MongoDbSourceConfig::from_config(config)?;
175 Ok(Self::new(mongo_config, None))
176 }
177
178 #[must_use]
180 pub fn with_resume_token_store(mut self, store: Box<dyn ResumeTokenStore>) -> Self {
181 self.resume_token_store = store;
182 self
183 }
184
185 #[must_use]
187 pub fn config(&self) -> &MongoDbSourceConfig {
188 &self.config
189 }
190
191 #[must_use]
193 pub fn last_resume_token(&self) -> Option<&ResumeToken> {
194 self.last_resume_token.as_ref()
195 }
196
197 #[must_use]
199 pub fn buffered_events(&self) -> usize {
200 self.event_buffer.len()
201 }
202
203 #[must_use]
205 pub fn is_invalidated(&self) -> bool {
206 self.invalidated
207 }
208
209 pub fn enqueue_event(&mut self, event: MongoDbChangeEvent) {
212 if event.operation_type == OperationType::Invalidate {
213 self.invalidated = true;
214 }
215 self.metrics.record_event(event.operation_type.as_str());
216 self.event_buffer.push_back(event);
217 }
218
219 pub fn drain_to_batch(
226 &mut self,
227 max_records: usize,
228 ) -> Result<Option<SourceBatch>, ConnectorError> {
229 if self.event_buffer.is_empty() {
230 return Ok(None);
231 }
232
233 let count = max_records.min(self.event_buffer.len());
234 let events: Vec<MongoDbChangeEvent> = self.event_buffer.drain(..count).collect();
235
236 let batch = events_to_record_batch(&events, &self.schema)?;
237 self.metrics.record_batch();
238
239 if let Some(last) = events.last() {
241 self.last_resume_token = Some(ResumeToken::new(last.resume_token.clone()));
242 }
243
244 Ok(Some(SourceBatch::new(batch)))
245 }
246}
247
248fn events_to_record_batch(
250 events: &[MongoDbChangeEvent],
251 schema: &SchemaRef,
252) -> Result<RecordBatch, ConnectorError> {
253 let len = events.len();
254
255 let mut ns_builder = StringBuilder::with_capacity(len, len * 32);
256 let mut op_builder = StringBuilder::with_capacity(len, len * 4);
257 let mut dk_builder = StringBuilder::with_capacity(len, len * 64);
258 let mut cts_builder = UInt32Builder::with_capacity(len);
259 let mut ct_inc_builder = UInt32Builder::with_capacity(len);
260 let mut wt_builder = arrow_array::builder::TimestampMillisecondBuilder::with_capacity(len);
261 let mut fd_builder = StringBuilder::with_capacity(len, len * 128);
262 let mut ud_builder = StringBuilder::with_capacity(len, len * 64);
263 let mut rt_builder = StringBuilder::with_capacity(len, len * 64);
264
265 for event in events {
266 ns_builder.append_value(event.namespace.full_name());
267 op_builder.append_value(event.operation_type.as_str());
268 dk_builder.append_value(&event.document_key);
269 cts_builder.append_value(event.cluster_time_secs);
270 ct_inc_builder.append_value(event.cluster_time_inc);
271 wt_builder.append_value(event.wall_time_ms);
272
273 match &event.full_document {
274 Some(doc) => fd_builder.append_value(doc),
275 None => fd_builder.append_null(),
276 }
277
278 match &event.update_description {
279 Some(desc) => {
280 let json = serde_json::to_string(desc)
281 .map_err(|e| ConnectorError::Internal(format!("serialize update_desc: {e}")))?;
282 ud_builder.append_value(&json);
283 }
284 None => ud_builder.append_null(),
285 }
286
287 rt_builder.append_value(&event.resume_token);
288 }
289
290 RecordBatch::try_new(
291 Arc::clone(schema),
292 vec![
293 Arc::new(ns_builder.finish()),
294 Arc::new(op_builder.finish()),
295 Arc::new(dk_builder.finish()),
296 Arc::new(cts_builder.finish()),
297 Arc::new(ct_inc_builder.finish()),
298 Arc::new(wt_builder.finish()),
299 Arc::new(fd_builder.finish()),
300 Arc::new(ud_builder.finish()),
301 Arc::new(rt_builder.finish()),
302 ],
303 )
304 .map_err(|e| ConnectorError::Internal(format!("arrow batch: {e}")))
305}
306
#[async_trait]
impl SourceConnector for MongoDbCdcSource {
    /// Validates the config, loads any persisted resume token, and (when the
    /// `mongodb-cdc` feature is enabled) spawns the change-stream reader.
    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.config.validate()?;

        // Prefer a previously persisted token so a restart resumes where the
        // last run left off instead of starting from "now".
        let persisted_token = self
            .resume_token_store
            .load()
            .await
            .map_err(|e| ConnectorError::Internal(format!("load resume token: {e}")))?;

        if let Some(token) = persisted_token {
            tracing::info!(resume_token = %token, "resuming from persisted token");
            self.last_resume_token = Some(token);
        }

        #[cfg(feature = "mongodb-cdc")]
        {
            self.start_change_stream_reader().await?;
        }

        self.state = ConnectorState::Running;
        tracing::info!(
            database = %self.config.database,
            collection = %self.config.collection,
            full_document_mode = ?self.config.full_document_mode,
            "MongoDB CDC source opened"
        );

        Ok(())
    }

    /// Moves any events received from the reader channel into the local
    /// buffer, then drains up to `max_records` of them into a batch.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        #[cfg(feature = "mongodb-cdc")]
        {
            self.drain_channel()?;
        }

        self.drain_to_batch(max_records)
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Builds a checkpoint carrying the latest resume token (when one has
    /// been seen) plus identifying metadata for the connector.
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        if let Some(ref token) = self.last_resume_token {
            cp.set_offset("resume_token", token.as_str());
        }
        cp.set_metadata("connector", "mongodb-cdc");
        cp.set_metadata("database", &self.config.database);
        cp.set_metadata("collection", &self.config.collection);
        cp
    }

    /// Restores the resume token from a checkpoint and persists it to the
    /// token store so a subsequent crash still resumes from the same point.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        if let Some(token_str) = checkpoint.get_offset("resume_token") {
            let token = ResumeToken::new(token_str.to_string());
            tracing::info!(resume_token = %token, "restoring from checkpoint");
            self.last_resume_token = Some(token.clone());
            self.resume_token_store
                .save(&token)
                .await
                .map_err(|e| ConnectorError::Internal(format!("save resume token: {e}")))?;
        }
        Ok(())
    }

    /// Maps connector state to health: running is healthy unless the change
    /// stream was invalidated (collection dropped/renamed), which degrades it.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => {
                if self.invalidated {
                    HealthStatus::Degraded("change stream invalidated".to_string())
                } else {
                    HealthStatus::Healthy
                }
            }
            ConnectorState::Created => HealthStatus::Unknown,
            ConnectorState::Closed | ConnectorState::Failed => {
                HealthStatus::Unhealthy("closed".to_string())
            }
            _ => HealthStatus::Degraded(format!("state: {}", self.state)),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    /// Best-effort persists the resume token, then signals the reader task to
    /// stop and waits for it to exit before marking the source closed.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        // Persistence failure is logged, not fatal: closing must succeed.
        if let Some(ref token) = self.last_resume_token {
            if let Err(e) = self.resume_token_store.save(token).await {
                tracing::warn!(error = %e, "failed to persist resume token on close");
            }
        }

        #[cfg(feature = "mongodb-cdc")]
        {
            // Signal shutdown first, then join the reader task.
            if let Some(tx) = self.reader_shutdown.take() {
                let _ = tx.send(true);
            }
            if let Some(handle) = self.reader_handle.take() {
                let _ = handle.await;
            }
        }

        self.state = ConnectorState::Closed;
        tracing::info!("MongoDB CDC source closed");
        Ok(())
    }

    /// Notify handle pinged by the reader task when new events arrive.
    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        Some(Arc::clone(&self.data_ready))
    }

    /// Resume tokens allow replaying from a checkpointed position.
    fn supports_replay(&self) -> bool {
        true
    }
}
435
#[cfg(feature = "mongodb-cdc")]
impl MongoDbCdcSource {
    /// Connects to MongoDB, runs preflight checks, and spawns the background
    /// change-stream reader task wired to this source via a bounded channel.
    ///
    /// # Errors
    /// Returns `ConnectionFailed` when the URI cannot be parsed or the client
    /// cannot be created, and propagates preflight failures.
    async fn start_change_stream_reader(&mut self) -> Result<(), ConnectorError> {
        use mongodb::options::ClientOptions;

        let client_options = ClientOptions::parse(&self.config.connection_uri)
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("parse URI: {e}")))?;

        // A tiny pool can deadlock watch() on sharded clusters; warn early.
        if let Some(pool) = client_options.max_pool_size {
            if pool <= 1 {
                tracing::warn!(
                    max_pool_size = pool,
                    "max_pool_size is very small — on sharded clusters, \
                     mongos opens per-shard cursors and may exhaust the pool"
                );
            }
        }

        let client = mongodb::Client::with_options(client_options)
            .map_err(|e| ConnectorError::ConnectionFailed(format!("create client: {e}")))?;

        let db = client.database(&self.config.database);

        // "*" means watch the whole database, so there is no single
        // collection to preflight.
        if self.config.collection != "*" {
            preflight_timeseries_guard(&db, &self.config.database, &self.config.collection).await?;
        }

        let (tx, rx) =
            crossfire::mpsc::bounded_async::<ChangeStreamPayload>(self.config.max_buffered_events);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let config = self.config.clone();
        let resume_token = self.last_resume_token.clone();
        let data_ready = Arc::clone(&self.data_ready);
        let metrics = Arc::clone(&self.metrics);

        let handle = tokio::spawn(async move {
            if let Err(e) = run_change_stream_reader(
                db,
                config,
                resume_token,
                tx,
                shutdown_rx,
                data_ready,
                metrics,
            )
            .await
            {
                tracing::error!(error = %e, "change stream reader task failed");
            }
        });

        self.reader_handle = Some(handle);
        self.event_rx = Some(rx);
        self.reader_shutdown = Some(shutdown_tx);

        Ok(())
    }

    /// Moves every payload currently queued in the reader channel into the
    /// local event buffer.
    ///
    /// Payloads are pulled one at a time, so no intermediate `Vec` is
    /// allocated per poll. On a reader error the error is returned
    /// immediately; payloads still in the channel stay queued for the next
    /// poll rather than being drained and dropped.
    ///
    /// # Errors
    /// Returns `ReadError` when the reader reported a terminal error.
    fn drain_channel(&mut self) -> Result<(), ConnectorError> {
        loop {
            // Scope the mutable borrow of `event_rx` to the `try_recv` call
            // so `self` can be mutably borrowed again below.
            let payload = match self.event_rx.as_mut() {
                Some(rx) => match rx.try_recv() {
                    Ok(p) => p,
                    Err(_) => break, // channel empty or disconnected
                },
                None => break, // reader never started (feature-off or pre-open)
            };

            match payload {
                ChangeStreamPayload::Event(event) => self.enqueue_event(*event),
                ChangeStreamPayload::Error(msg) => {
                    self.metrics.record_error();
                    return Err(ConnectorError::ReadError(msg));
                }
            }
        }
        Ok(())
    }
}
525
/// Rejects time series collections up front: MongoDB does not support change
/// streams on them, so failing here yields a clearer error than a cursor
/// failure later.
#[cfg(feature = "mongodb-cdc")]
async fn preflight_timeseries_guard(
    db: &mongodb::Database,
    database: &str,
    collection: &str,
) -> Result<(), ConnectorError> {
    use futures_util::StreamExt;
    use mongodb::bson::doc;

    let mut listing = db
        .list_collections()
        .filter(doc! { "name": collection })
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("list collections: {e}")))?;

    // The name filter yields at most one spec; absence means nothing to check.
    let spec = match listing.next().await.transpose() {
        Ok(None) => return Ok(()),
        Ok(Some(spec)) => spec,
        Err(e) => {
            return Err(ConnectorError::ConnectionFailed(format!(
                "read collection spec: {e}"
            )))
        }
    };

    if spec.collection_type == mongodb::results::CollectionType::Timeseries {
        return Err(ConnectorError::ConfigurationError(format!(
            "time series collection {database}.{collection} does not support change streams"
        )));
    }

    Ok(())
}
555
// Give up — and surface a terminal error to the source — after this many
// consecutive change-stream open/read failures.
#[cfg(feature = "mongodb-cdc")]
const MAX_FAILURES: u32 = 10;
559
/// Background task: opens a MongoDB change stream, forwards each event over
/// `tx`, and transparently reconnects with exponential backoff on errors.
///
/// Terminates when: shutdown is signalled, the source-side channel closes,
/// or `MAX_FAILURES` consecutive failures occur (in which case a terminal
/// `ChangeStreamPayload::Error` is sent first).
#[cfg(feature = "mongodb-cdc")]
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_change_stream_reader(
    db: mongodb::Database,
    config: MongoDbSourceConfig,
    resume_token: Option<ResumeToken>,
    tx: ChangeStreamTx,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    data_ready: Arc<Notify>,
    metrics: Arc<MongoDbCdcMetrics>,
) -> Result<(), ConnectorError> {
    use futures_util::StreamExt;
    use mongodb::options::ChangeStreamOptions;

    // Translate the connector-level full-document mode into the driver enum;
    // Delta means "no full document requested".
    let full_document = match config.full_document_mode {
        super::config::FullDocumentMode::Delta => None,
        super::config::FullDocumentMode::UpdateLookup => {
            Some(mongodb::options::FullDocumentType::UpdateLookup)
        }
        super::config::FullDocumentMode::RequirePostImage => {
            Some(mongodb::options::FullDocumentType::Required)
        }
        super::config::FullDocumentMode::WhenAvailable => {
            Some(mongodb::options::FullDocumentType::WhenAvailable)
        }
    };

    // The stored token is the JSON serialization of the driver's token type
    // (see parse_change_stream_event); a parse failure is unrecoverable.
    let mut last_token: Option<mongodb::change_stream::event::ResumeToken> =
        if let Some(ref token) = resume_token {
            serde_json::from_str::<mongodb::change_stream::event::ResumeToken>(token.as_str())
                .map(Some)
                .map_err(|e| ConnectorError::ReadError(format!("parse resume token: {e}")))?
        } else {
            None
        };

    // NOTE(review): stages that fail BSON conversion are silently dropped
    // here — presumably validated earlier; confirm against config parsing.
    let pipeline: Vec<mongodb::bson::Document> = config
        .pipeline
        .iter()
        .filter_map(|v| mongodb::bson::to_document(v).ok())
        .collect();

    let mut current_db = db;
    let mut consecutive_failures: u32 = 0;

    // Outer loop: (re)open the change stream; inner loop: consume events.
    'reconnect: loop {
        let mut options = ChangeStreamOptions::default();
        options.full_document = full_document.clone();
        options.max_await_time = config
            .max_await_time_ms
            .map(std::time::Duration::from_millis);
        options.batch_size = config.batch_size;
        options.resume_after = last_token.clone();

        // start_at_operation_time only applies on a fresh start; once we
        // have a token, resume_after takes precedence.
        if last_token.is_none() {
            if let Some((secs, inc)) = config.start_at_operation_time {
                options.start_at_operation_time = Some(mongodb::bson::Timestamp {
                    time: secs,
                    increment: inc,
                });
            }
        }

        // "*" watches the whole database; otherwise a single collection.
        let cursor_result = if config.collection == "*" {
            current_db
                .watch()
                .pipeline(pipeline.clone())
                .with_options(options)
                .await
        } else {
            current_db
                .collection::<mongodb::bson::Document>(&config.collection)
                .watch()
                .pipeline(pipeline.clone())
                .with_options(options)
                .await
        };

        let mut cursor = match cursor_result {
            Ok(c) => c,
            Err(e) => {
                consecutive_failures += 1;
                if consecutive_failures >= MAX_FAILURES {
                    let msg =
                        format!("change stream open failed after {MAX_FAILURES} attempts: {e}");
                    tracing::error!(%msg);
                    let _ = tx.send(ChangeStreamPayload::Error(msg)).await;
                    break 'reconnect;
                }
                // Exponential backoff (2^n seconds) capped at 30s.
                let backoff_secs = (1u64 << consecutive_failures).min(30);
                tracing::warn!(
                    attempt = consecutive_failures,
                    backoff_secs,
                    error = %e,
                    "failed to open change stream, retrying"
                );
                metrics.record_reconnect();
                // Sleep, but wake immediately if shutdown is requested.
                tokio::select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            break 'reconnect;
                        }
                    }
                    () = tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)) => {}
                }
                continue 'reconnect;
            }
        };

        tracing::info!(
            database = %config.database,
            collection = %config.collection,
            resumed = last_token.is_some(),
            "change stream reader started"
        );

        'recv: loop {
            tokio::select! {
                // `biased` checks shutdown before the (potentially ready)
                // cursor so close() is honored promptly.
                biased;
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        tracing::info!("change stream reader shutting down");
                        break 'reconnect;
                    }
                }
                next = cursor.next() => {
                    match next {
                        Some(Ok(cs_event)) => {
                            consecutive_failures = 0;

                            // Track the token BEFORE sending so a reconnect
                            // resumes after this event.
                            last_token = Some(cs_event.id.clone());

                            let change_event = parse_change_stream_event(&cs_event);

                            // A closed channel means the source is gone.
                            if tx
                                .send(ChangeStreamPayload::Event(Box::new(change_event)))
                                .await
                                .is_err()
                            {
                                tracing::warn!("source channel closed, stopping reader");
                                break 'reconnect;
                            }
                            data_ready.notify_one();
                        }
                        Some(Err(e)) => {
                            tracing::error!(error = %e, "change stream error");
                            break 'recv;
                        }
                        None => {
                            tracing::info!("change stream cursor exhausted");
                            consecutive_failures = 0;
                            break 'recv;
                        }
                    }
                }
            }
        }

        // Reached only via `break 'recv` (stream error or exhaustion):
        // count the failure and decide whether to reconnect or give up.
        consecutive_failures += 1;
        if consecutive_failures >= MAX_FAILURES {
            let msg = format!("change stream failed after {MAX_FAILURES} consecutive failures");
            tracing::error!(%msg);
            let _ = tx.send(ChangeStreamPayload::Error(msg)).await;
            break 'reconnect;
        }

        let backoff_secs = (1u64 << consecutive_failures).min(30);
        tracing::warn!(
            resume_token = ?last_token,
            attempt = consecutive_failures,
            backoff_secs,
            "reconnecting change stream"
        );
        metrics.record_reconnect();

        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    break 'reconnect;
                }
            }
            () = tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)) => {}
        }

        // Rebuild the client on reconnect in case the old connection pool is
        // wedged; on failure, retry with the existing handle.
        match mongodb::options::ClientOptions::parse(&config.connection_uri).await {
            Ok(new_opts) => match mongodb::Client::with_options(new_opts) {
                Ok(new_client) => {
                    current_db = new_client.database(&config.database);
                }
                Err(e) => {
                    tracing::warn!(error = %e, "failed to create client on reconnect");
                }
            },
            Err(e) => {
                tracing::warn!(error = %e, "failed to parse URI on reconnect");
            }
        }
    }

    Ok(())
}
777
/// Converts a driver-level change stream event into the connector's
/// [`MongoDbChangeEvent`], serializing BSON payloads to JSON strings.
#[cfg(feature = "mongodb-cdc")]
fn parse_change_stream_event(
    event: &mongodb::change_stream::event::ChangeStreamEvent<mongodb::bson::Document>,
) -> MongoDbChangeEvent {
    use super::change_event::{Namespace, UpdateDescription};
    use mongodb::change_stream::event::OperationType as MongoOpType;

    // Map known driver operation types 1:1; anything else is preserved as
    // Other(debug-string) so no event is dropped.
    let operation_type = match event.operation_type {
        MongoOpType::Insert => OperationType::Insert,
        MongoOpType::Update => OperationType::Update,
        MongoOpType::Replace => OperationType::Replace,
        MongoOpType::Delete => OperationType::Delete,
        MongoOpType::Drop => OperationType::Drop,
        MongoOpType::Rename => OperationType::Rename,
        MongoOpType::Invalidate => OperationType::Invalidate,
        MongoOpType::DropDatabase => OperationType::DropDatabase,
        ref other => {
            tracing::warn!(?other, "unmapped MongoDB operation type");
            OperationType::Other(format!("{other:?}"))
        }
    };

    // Some events (e.g. invalidate) carry no namespace; use empty strings.
    let namespace = event.ns.as_ref().map_or_else(
        || Namespace {
            db: String::new(),
            coll: String::new(),
        },
        |ns| Namespace {
            db: ns.db.clone(),
            coll: ns.coll.clone().unwrap_or_default(),
        },
    );

    // JSON-serialize the document key; serialization failure yields "".
    let document_key = event
        .document_key
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok())
        .unwrap_or_default();

    let full_document = event
        .full_document
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok());

    let update_description = event.update_description.as_ref().map(|ud| {
        // Fields whose values fail JSON conversion are skipped.
        let updated_fields = ud
            .updated_fields
            .iter()
            .filter_map(|(k, v)| serde_json::to_value(v).ok().map(|jv| (k.clone(), jv)))
            .collect();

        let removed_fields = ud.removed_fields.clone();

        // Driver reports sizes as signed; sizes are non-negative in practice,
        // hence the allowed sign-loss cast.
        #[allow(clippy::cast_sign_loss)]
        let truncated_arrays = ud
            .truncated_arrays
            .as_deref()
            .unwrap_or_default()
            .iter()
            .map(|t| super::change_event::TruncatedArray {
                field: t.field.clone(),
                new_size: t.new_size as u32,
            })
            .collect();

        UpdateDescription {
            updated_fields,
            removed_fields,
            truncated_arrays,
        }
    });

    // Missing cluster time becomes the (0, 0) sentinel.
    let (cluster_time_secs, cluster_time_inc) = event
        .cluster_time
        .map_or((0, 0), |ts| (ts.time, ts.increment));

    // Missing wall time becomes 0 (epoch) rather than a null column value.
    let wall_time_ms = event
        .wall_time
        .map_or(0, mongodb::bson::DateTime::timestamp_millis);

    // Token is stored as its JSON serialization; the reader parses it back
    // with serde_json on resume.
    let resume_token = serde_json::to_string(&event.id).unwrap_or_default();

    MongoDbChangeEvent {
        operation_type,
        namespace,
        document_key,
        full_document,
        update_description,
        cluster_time_secs,
        cluster_time_inc,
        resume_token,
        wall_time_ms,
    }
}
874
#[cfg(test)]
mod tests {
    use super::super::change_event::Namespace;
    use super::*;

    // Fixture: a fully-populated event with the given operation type.
    fn sample_event(op: OperationType) -> MongoDbChangeEvent {
        MongoDbChangeEvent {
            operation_type: op,
            namespace: Namespace {
                db: "testdb".to_string(),
                coll: "users".to_string(),
            },
            document_key: r#"{"_id": "1"}"#.to_string(),
            full_document: Some(r#"{"_id": "1", "name": "Alice"}"#.to_string()),
            update_description: None,
            cluster_time_secs: 1_700_000_000,
            cluster_time_inc: 1,
            resume_token: r#"{"_data": "token1"}"#.to_string(),
            wall_time_ms: 1_700_000_000_000,
        }
    }

    // Envelope schema has 9 columns with the expected names/nullability.
    #[test]
    fn test_schema() {
        let schema = mongodb_cdc_envelope_schema();
        assert_eq!(schema.fields().len(), 9);
        assert_eq!(schema.field(0).name(), "_namespace");
        assert_eq!(schema.field(6).name(), "_full_document");
        assert!(schema.field(6).is_nullable());
    }

    // A fresh source starts empty, valid, and with no resume token.
    #[test]
    fn test_new_source() {
        let config = MongoDbSourceConfig::new("mongodb://localhost:27017", "db", "coll");
        let source = MongoDbCdcSource::new(config, None);
        assert_eq!(source.buffered_events(), 0);
        assert!(!source.is_invalidated());
        assert!(source.last_resume_token().is_none());
    }

    #[test]
    fn test_enqueue_event() {
        let config = MongoDbSourceConfig::new("mongodb://localhost:27017", "db", "coll");
        let mut source = MongoDbCdcSource::new(config, None);

        source.enqueue_event(sample_event(OperationType::Insert));
        assert_eq!(source.buffered_events(), 1);
        assert!(!source.is_invalidated());
    }

    // An invalidate event must latch the invalidated flag.
    #[test]
    fn test_enqueue_invalidate() {
        let config = MongoDbSourceConfig::new("mongodb://localhost:27017", "db", "coll");
        let mut source = MongoDbCdcSource::new(config, None);

        let mut event = sample_event(OperationType::Invalidate);
        event.full_document = None;
        source.enqueue_event(event);
        assert!(source.is_invalidated());
    }

    #[test]
    fn test_events_to_record_batch() {
        let schema = mongodb_cdc_envelope_schema();
        let events = vec![
            sample_event(OperationType::Insert),
            sample_event(OperationType::Delete),
        ];

        let batch = events_to_record_batch(&events, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 9);
    }

    // An empty slice still yields a valid zero-row batch.
    #[test]
    fn test_events_to_record_batch_empty() {
        let schema = mongodb_cdc_envelope_schema();
        let batch = events_to_record_batch(&[], &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 9);
    }

    // Draining respects max_records and leaves the remainder buffered.
    #[test]
    fn test_drain_to_batch() {
        let config = MongoDbSourceConfig::new("mongodb://localhost:27017", "db", "coll");
        let mut source = MongoDbCdcSource::new(config, None);

        assert!(source.drain_to_batch(10).unwrap().is_none());

        for _ in 0..5 {
            source.enqueue_event(sample_event(OperationType::Insert));
        }
        let batch = source.drain_to_batch(3).unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(source.buffered_events(), 2);

        let batch = source.drain_to_batch(10).unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(source.buffered_events(), 0);
    }

    // Checkpoint carries metadata always and the token only once seen.
    #[test]
    fn test_checkpoint() {
        let config = MongoDbSourceConfig::new("mongodb://localhost:27017", "testdb", "users");
        let mut source = MongoDbCdcSource::new(config, None);

        let cp = source.checkpoint();
        assert!(cp.get_offset("resume_token").is_none());
        assert_eq!(cp.get_metadata("connector"), Some("mongodb-cdc"));

        source.last_resume_token = Some(ResumeToken::new("tok123".to_string()));
        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("resume_token"), Some("tok123"));
    }

    #[tokio::test]
    async fn test_restore_checkpoint() {
        let config = MongoDbSourceConfig::new("mongodb://localhost:27017", "db", "coll");
        let mut source = MongoDbCdcSource::new(config, None);

        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("resume_token", "restored_token");

        source.restore(&cp).await.unwrap();
        assert_eq!(
            source.last_resume_token().unwrap().as_str(),
            "restored_token"
        );
    }

    // Health transitions: Unknown -> Healthy -> Degraded -> Unhealthy.
    #[test]
    fn test_health_check() {
        let config = MongoDbSourceConfig::new("mongodb://localhost:27017", "db", "coll");
        let mut source = MongoDbCdcSource::new(config, None);

        assert_eq!(source.health_check(), HealthStatus::Unknown);

        source.state = ConnectorState::Running;
        assert_eq!(source.health_check(), HealthStatus::Healthy);

        source.invalidated = true;
        assert!(matches!(source.health_check(), HealthStatus::Degraded(_)));

        source.state = ConnectorState::Closed;
        assert!(matches!(source.health_check(), HealthStatus::Unhealthy(_)));
    }

    // The token of the last drained event becomes the checkpoint token.
    #[test]
    fn test_drain_tracks_resume_token() {
        let config = MongoDbSourceConfig::new("mongodb://localhost:27017", "db", "coll");
        let mut source = MongoDbCdcSource::new(config, None);

        let mut event = sample_event(OperationType::Insert);
        event.resume_token = r#"{"_data": "final_token"}"#.to_string();
        source.enqueue_event(event);

        source.drain_to_batch(10).unwrap();
        assert_eq!(
            source.last_resume_token().unwrap().as_str(),
            r#"{"_data": "final_token"}"#
        );
    }
}