Skip to main content

laminar_connectors/mongodb/
source.rs

1//! `MongoDB` CDC source connector implementation.
2//!
3//! Implements [`SourceConnector`] for streaming change events from `MongoDB`
4//! change streams into `LaminarDB` as Arrow `RecordBatch`es.
5//!
6//! # Architecture
7//!
8//! - **Ring 0**: No CDC code — just SPSC channel pop (~5ns)
9//! - **Ring 1**: Change stream consumption, event parsing, Arrow conversion
10//! - **Ring 2**: Connection management, collection validation, health checks
11//!
12//! # Cancellation Safety
13//!
14//! The `poll_batch` method is cancellation-safe: dropping the future
15//! mid-execution does not lose events. Buffered events remain in the
16//! internal `VecDeque` and will be returned on the next poll.
17
18use std::collections::VecDeque;
19use std::sync::Arc;
20
21use arrow_array::builder::{StringBuilder, UInt32Builder};
22use arrow_array::RecordBatch;
23use arrow_schema::{DataType, Field, Schema, SchemaRef};
24use async_trait::async_trait;
25use tokio::sync::Notify;
26
27use crate::checkpoint::SourceCheckpoint;
28use crate::config::{ConnectorConfig, ConnectorState};
29use crate::connector::{SourceBatch, SourceConnector};
30use crate::error::ConnectorError;
31
32use super::change_event::{MongoDbChangeEvent, OperationType};
33use super::config::MongoDbSourceConfig;
34use super::metrics::MongoDbCdcMetrics;
35use super::resume_token::{InMemoryResumeTokenStore, ResumeToken, ResumeTokenStore};
36
37/// Returns the Arrow schema for `MongoDB` CDC envelope records.
38///
39/// | Column              | Type   | Nullable | Description                        |
40/// |---------------------|--------|----------|------------------------------------|
41/// | `_namespace`        | Utf8   | no       | `database.collection`              |
42/// | `_op`               | Utf8   | no       | Operation code (I/U/R/D/DROP/...)  |
43/// | `_document_key`     | Utf8   | no       | Document key JSON                  |
44/// | `_cluster_time_s`   | UInt32 | no       | Cluster time seconds               |
45/// | `_cluster_time_i`   | UInt32 | no       | Cluster time increment             |
46/// | `_wall_time_ms`     | Timestamp(ms) | no | Wall clock timestamp             |
47/// | `_full_document`    | Utf8   | yes      | Full document JSON                 |
48/// | `_update_desc`      | Utf8   | yes      | Update description JSON            |
49/// | `_resume_token`     | Utf8   | no       | Opaque resume token JSON           |
50#[must_use]
51pub fn mongodb_cdc_envelope_schema() -> SchemaRef {
52    Arc::new(Schema::new(vec![
53        Field::new("_namespace", DataType::Utf8, false),
54        Field::new("_op", DataType::Utf8, false),
55        Field::new("_document_key", DataType::Utf8, false),
56        Field::new("_cluster_time_s", DataType::UInt32, false),
57        Field::new("_cluster_time_i", DataType::UInt32, false),
58        Field::new(
59            "_wall_time_ms",
60            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
61            false,
62        ),
63        Field::new("_full_document", DataType::Utf8, true),
64        Field::new("_update_desc", DataType::Utf8, true),
65        Field::new("_resume_token", DataType::Utf8, false),
66    ]))
67}
68
69/// `MongoDB` CDC source connector.
70///
71/// Streams change events from a `MongoDB` change stream using the
72/// `SourceConnector` trait. Events are buffered internally and
73/// converted to Arrow `RecordBatch`es on `poll_batch`.
74///
75/// # Resume Token Handling
76///
77/// The connector tracks resume tokens from individual change event `_id`
78/// fields. Tokens are persisted via a pluggable [`ResumeTokenStore`].
79///
80/// # Sharded Cluster Note
81///
82/// On sharded clusters, `mongos` opens per-shard cursors and merges
83/// results transparently. Ensure `max_pool_size` is at least as large
84/// as the expected number of concurrent change streams to avoid
85/// connection starvation.
86pub struct MongoDbCdcSource {
87    /// Connector configuration.
88    config: MongoDbSourceConfig,
89
90    /// Current lifecycle state.
91    state: ConnectorState,
92
93    /// Output schema (CDC envelope).
94    schema: SchemaRef,
95
96    /// Lock-free metrics.
97    metrics: Arc<MongoDbCdcMetrics>,
98
99    /// Buffered change events awaiting `poll_batch`.
100    event_buffer: VecDeque<MongoDbChangeEvent>,
101
102    /// Last persisted resume token.
103    last_resume_token: Option<ResumeToken>,
104
105    /// Resume token store.
106    resume_token_store: Box<dyn ResumeTokenStore>,
107
108    /// Notification handle signalled when data arrives from the stream.
109    data_ready: Arc<Notify>,
110
111    /// Whether an invalidate event has been received.
112    invalidated: bool,
113
114    /// Background change stream reader task handle (feature-gated).
115    #[cfg(feature = "mongodb-cdc")]
116    reader_handle: Option<tokio::task::JoinHandle<()>>,
117
118    /// Channel receiver for change events from the background task.
119    #[cfg(feature = "mongodb-cdc")]
120    event_rx: Option<ChangeStreamRx>,
121
122    /// Shutdown signal for the background reader task.
123    #[cfg(feature = "mongodb-cdc")]
124    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
125}
126
127/// Payload from the background change stream reader task.
128#[allow(dead_code)] // constructed + consumed only with feature = "mongodb-cdc"
129enum ChangeStreamPayload {
130    /// A change event.
131    Event(Box<MongoDbChangeEvent>),
132    /// Fatal error from the reader task.
133    Error(String),
134}
135
136/// Cloneable async sender for the change stream reader → `poll_batch` queue.
137#[cfg(feature = "mongodb-cdc")]
138type ChangeStreamTx = crossfire::MAsyncTx<crossfire::mpsc::Array<ChangeStreamPayload>>;
139/// Single-consumer async receiver for the change stream reader → `poll_batch` queue.
140#[cfg(feature = "mongodb-cdc")]
141type ChangeStreamRx = crossfire::AsyncRx<crossfire::mpsc::Array<ChangeStreamPayload>>;
142
143impl MongoDbCdcSource {
144    /// Creates a new `MongoDB` CDC source with the given configuration.
145    #[must_use]
146    pub fn new(config: MongoDbSourceConfig, registry: Option<&prometheus::Registry>) -> Self {
147        Self {
148            config,
149            state: ConnectorState::Created,
150            schema: mongodb_cdc_envelope_schema(),
151            metrics: Arc::new(MongoDbCdcMetrics::new(registry)),
152            event_buffer: VecDeque::new(),
153            last_resume_token: None,
154            resume_token_store: Box::new(InMemoryResumeTokenStore::new()),
155            data_ready: Arc::new(Notify::new()),
156            invalidated: false,
157            #[cfg(feature = "mongodb-cdc")]
158            reader_handle: None,
159            #[cfg(feature = "mongodb-cdc")]
160            event_rx: None,
161            #[cfg(feature = "mongodb-cdc")]
162            reader_shutdown: None,
163        }
164    }
165
166    /// Creates a new source from a generic [`ConnectorConfig`].
167    ///
168    /// # Errors
169    ///
170    /// Returns `ConnectorError` if the configuration is invalid.
171    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
172        let mongo_config = MongoDbSourceConfig::from_config(config)?;
173        Ok(Self::new(mongo_config, None))
174    }
175
176    /// Sets a custom resume token store.
177    #[must_use]
178    pub fn with_resume_token_store(mut self, store: Box<dyn ResumeTokenStore>) -> Self {
179        self.resume_token_store = store;
180        self
181    }
182
183    /// Returns a reference to the source configuration.
184    #[must_use]
185    pub fn config(&self) -> &MongoDbSourceConfig {
186        &self.config
187    }
188
189    /// Returns the last persisted resume token.
190    #[must_use]
191    pub fn last_resume_token(&self) -> Option<&ResumeToken> {
192        self.last_resume_token.as_ref()
193    }
194
195    /// Returns the number of buffered events.
196    #[must_use]
197    pub fn buffered_events(&self) -> usize {
198        self.event_buffer.len()
199    }
200
201    /// Returns `true` if the stream has been invalidated.
202    #[must_use]
203    pub fn is_invalidated(&self) -> bool {
204        self.invalidated
205    }
206
207    /// Enqueues a change event for processing (used by tests and the
208    /// background reader task).
209    pub fn enqueue_event(&mut self, event: MongoDbChangeEvent) {
210        if event.operation_type == OperationType::Invalidate {
211            self.invalidated = true;
212        }
213        self.metrics.record_event(event.operation_type.as_str());
214        self.event_buffer.push_back(event);
215    }
216
217    /// Drains up to `max_records` events from the buffer and converts
218    /// them to an Arrow `RecordBatch`.
219    ///
220    /// # Errors
221    ///
222    /// Returns `ConnectorError` if Arrow batch construction fails.
223    pub fn drain_to_batch(
224        &mut self,
225        max_records: usize,
226    ) -> Result<Option<SourceBatch>, ConnectorError> {
227        if self.event_buffer.is_empty() {
228            return Ok(None);
229        }
230
231        let count = max_records.min(self.event_buffer.len());
232        let events: Vec<MongoDbChangeEvent> = self.event_buffer.drain(..count).collect();
233
234        let batch = events_to_record_batch(&events, &self.schema)?;
235        self.metrics.record_batch();
236
237        // Track resume token from the last event in this batch.
238        if let Some(last) = events.last() {
239            self.last_resume_token = Some(ResumeToken::new(last.resume_token.clone()));
240        }
241
242        Ok(Some(SourceBatch::new(batch)))
243    }
244}
245
246/// Converts a batch of [`MongoDbChangeEvent`]s to an Arrow `RecordBatch`.
247fn events_to_record_batch(
248    events: &[MongoDbChangeEvent],
249    schema: &SchemaRef,
250) -> Result<RecordBatch, ConnectorError> {
251    let len = events.len();
252
253    let mut ns_builder = StringBuilder::with_capacity(len, len * 32);
254    let mut op_builder = StringBuilder::with_capacity(len, len * 4);
255    let mut dk_builder = StringBuilder::with_capacity(len, len * 64);
256    let mut cts_builder = UInt32Builder::with_capacity(len);
257    let mut ct_inc_builder = UInt32Builder::with_capacity(len);
258    let mut wt_builder = arrow_array::builder::TimestampMillisecondBuilder::with_capacity(len);
259    let mut fd_builder = StringBuilder::with_capacity(len, len * 128);
260    let mut ud_builder = StringBuilder::with_capacity(len, len * 64);
261    let mut rt_builder = StringBuilder::with_capacity(len, len * 64);
262
263    for event in events {
264        ns_builder.append_value(event.namespace.full_name());
265        op_builder.append_value(event.operation_type.as_str());
266        dk_builder.append_value(&event.document_key);
267        cts_builder.append_value(event.cluster_time_secs);
268        ct_inc_builder.append_value(event.cluster_time_inc);
269        wt_builder.append_value(event.wall_time_ms);
270
271        match &event.full_document {
272            Some(doc) => fd_builder.append_value(doc),
273            None => fd_builder.append_null(),
274        }
275
276        match &event.update_description {
277            Some(desc) => {
278                let json = serde_json::to_string(desc)
279                    .map_err(|e| ConnectorError::Internal(format!("serialize update_desc: {e}")))?;
280                ud_builder.append_value(&json);
281            }
282            None => ud_builder.append_null(),
283        }
284
285        rt_builder.append_value(&event.resume_token);
286    }
287
288    RecordBatch::try_new(
289        Arc::clone(schema),
290        vec![
291            Arc::new(ns_builder.finish()),
292            Arc::new(op_builder.finish()),
293            Arc::new(dk_builder.finish()),
294            Arc::new(cts_builder.finish()),
295            Arc::new(ct_inc_builder.finish()),
296            Arc::new(wt_builder.finish()),
297            Arc::new(fd_builder.finish()),
298            Arc::new(ud_builder.finish()),
299            Arc::new(rt_builder.finish()),
300        ],
301    )
302    .map_err(|e| ConnectorError::Internal(format!("arrow batch: {e}")))
303}
304
305#[async_trait]
306impl SourceConnector for MongoDbCdcSource {
307    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
308        self.config.validate()?;
309
310        // Load any persisted resume token.
311        let persisted_token = self
312            .resume_token_store
313            .load()
314            .await
315            .map_err(|e| ConnectorError::Internal(format!("load resume token: {e}")))?;
316
317        if let Some(token) = persisted_token {
318            tracing::info!(resume_token = %token, "resuming from persisted token");
319            self.last_resume_token = Some(token);
320        }
321
322        // Feature-gated: start the background change stream reader.
323        #[cfg(feature = "mongodb-cdc")]
324        {
325            self.start_change_stream_reader().await?;
326        }
327
328        self.state = ConnectorState::Running;
329        tracing::info!(
330            database = %self.config.database,
331            collection = %self.config.collection,
332            full_document_mode = ?self.config.full_document_mode,
333            "MongoDB CDC source opened"
334        );
335
336        Ok(())
337    }
338
339    async fn poll_batch(
340        &mut self,
341        max_records: usize,
342    ) -> Result<Option<SourceBatch>, ConnectorError> {
343        #[cfg(feature = "mongodb-cdc")]
344        {
345            self.drain_channel()?;
346        }
347
348        self.drain_to_batch(max_records)
349    }
350
351    fn schema(&self) -> SchemaRef {
352        Arc::clone(&self.schema)
353    }
354
355    fn checkpoint(&self) -> SourceCheckpoint {
356        let mut cp = SourceCheckpoint::new(0);
357        if let Some(ref token) = self.last_resume_token {
358            cp.set_offset("resume_token", token.as_str());
359        }
360        cp.set_metadata("connector", "mongodb-cdc");
361        cp.set_metadata("database", &self.config.database);
362        cp.set_metadata("collection", &self.config.collection);
363        cp
364    }
365
366    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
367        if let Some(token_str) = checkpoint.get_offset("resume_token") {
368            let token = ResumeToken::new(token_str.to_string());
369            tracing::info!(resume_token = %token, "restoring from checkpoint");
370            self.last_resume_token = Some(token.clone());
371            self.resume_token_store
372                .save(&token)
373                .await
374                .map_err(|e| ConnectorError::Internal(format!("save resume token: {e}")))?;
375        }
376        Ok(())
377    }
378
379    async fn close(&mut self) -> Result<(), ConnectorError> {
380        // Persist the last resume token before shutting down.
381        if let Some(ref token) = self.last_resume_token {
382            if let Err(e) = self.resume_token_store.save(token).await {
383                tracing::warn!(error = %e, "failed to persist resume token on close");
384            }
385        }
386
387        // Shut down the background reader task.
388        #[cfg(feature = "mongodb-cdc")]
389        {
390            if let Some(tx) = self.reader_shutdown.take() {
391                let _ = tx.send(true);
392            }
393            if let Some(handle) = self.reader_handle.take() {
394                let _ = handle.await;
395            }
396        }
397
398        self.state = ConnectorState::Closed;
399        tracing::info!("MongoDB CDC source closed");
400        Ok(())
401    }
402
403    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
404        Some(Arc::clone(&self.data_ready))
405    }
406
407    fn supports_replay(&self) -> bool {
408        // MongoDB change streams support resume from a token.
409        true
410    }
411}
412
413// ── Feature-gated I/O (real MongoDB driver) ──
414
#[cfg(feature = "mongodb-cdc")]
impl MongoDbCdcSource {
    /// Connects to `MongoDB` and spawns the background change stream reader.
    ///
    /// Parses `connection_uri`, warns when `max_pool_size` is at most 1,
    /// rejects time series collections (skipped when `collection == "*"`),
    /// then spawns `run_change_stream_reader`, resuming from the current
    /// `last_resume_token` when one is set.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConnectionFailed` if the URI cannot be parsed,
    /// the client cannot be created or the collection lookup fails, and
    /// `ConnectorError::ConfigurationError` for a time series collection.
    async fn start_change_stream_reader(&mut self) -> Result<(), ConnectorError> {
        use mongodb::options::ClientOptions;

        let client_options = ClientOptions::parse(&self.config.connection_uri)
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("parse URI: {e}")))?;

        // Warn if pool size may be too small for sharded clusters.
        if let Some(pool) = client_options.max_pool_size {
            if pool <= 1 {
                tracing::warn!(
                    max_pool_size = pool,
                    "max_pool_size is very small — on sharded clusters, \
                     mongos opens per-shard cursors and may exhaust the pool"
                );
            }
        }

        let client = mongodb::Client::with_options(client_options)
            .map_err(|e| ConnectorError::ConnectionFailed(format!("create client: {e}")))?;

        let db = client.database(&self.config.database);

        // Pre-flight: detect time series collections.
        if self.config.collection != "*" {
            preflight_timeseries_guard(&db, &self.config.database, &self.config.collection).await?;
        }

        let (tx, rx) =
            crossfire::mpsc::bounded_async::<ChangeStreamPayload>(self.config.max_buffered_events);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let config = self.config.clone();
        let resume_token = self.last_resume_token.clone();
        let data_ready = Arc::clone(&self.data_ready);
        let metrics = Arc::clone(&self.metrics);

        let handle = tokio::spawn(async move {
            if let Err(e) = run_change_stream_reader(
                db,
                config,
                resume_token,
                tx,
                shutdown_rx,
                data_ready,
                metrics,
            )
            .await
            {
                tracing::error!(error = %e, "change stream reader task failed");
            }
        });

        self.reader_handle = Some(handle);
        self.event_rx = Some(rx);
        self.reader_shutdown = Some(shutdown_tx);

        Ok(())
    }

    /// Moves payloads from the background reader channel into the event
    /// buffer, stopping once the buffer holds `max_buffered_events` events.
    ///
    /// Events received before an error payload are kept in the buffer.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ReadError` when the reader reported a fatal
    /// error.
    fn drain_channel(&mut self) -> Result<(), ConnectorError> {
        // Cap the in-memory buffer at the channel capacity. Moving the whole
        // channel on every poll let the unbounded buffer grow without limit
        // whenever `max_records` was smaller than the arrival rate, defeating
        // the bounded channel's backpressure on the reader task.
        let limit = self.config.max_buffered_events.max(1);

        while self.event_buffer.len() < limit {
            let payload = match self.event_rx.as_mut().map(|rx| rx.try_recv()) {
                Some(Ok(payload)) => payload,
                // No receiver, channel empty, or disconnected.
                _ => break,
            };

            match payload {
                ChangeStreamPayload::Event(event) => {
                    self.enqueue_event(*event);
                }
                ChangeStreamPayload::Error(msg) => {
                    self.metrics.record_error();
                    return Err(ConnectorError::ReadError(msg));
                }
            }
        }
        Ok(())
    }
}
502
/// Pre-flight check: rejects time series collections, which do not support
/// change streams.
///
/// Looks `collection` up in `db` by name; a collection that is not found
/// passes the check.
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if listing collections or
/// reading the spec fails, and `ConnectorError::ConfigurationError` if the
/// collection is a time series collection.
#[cfg(feature = "mongodb-cdc")]
async fn preflight_timeseries_guard(
    db: &mongodb::Database,
    database: &str,
    collection: &str,
) -> Result<(), ConnectorError> {
    use futures_util::StreamExt;
    use mongodb::bson::doc;

    let mut specs = db
        .list_collections()
        .filter(doc! { "name": collection })
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("list collections: {e}")))?;

    // Only the first match matters: collection names are unique per database.
    let first_spec = specs
        .next()
        .await
        .transpose()
        .map_err(|e| ConnectorError::ConnectionFailed(format!("read collection spec: {e}")))?;

    match first_spec {
        Some(spec) if spec.collection_type == mongodb::results::CollectionType::Timeseries => {
            Err(ConnectorError::ConfigurationError(format!(
                "time series collection {database}.{collection} does not support change streams"
            )))
        }
        _ => Ok(()),
    }
}
532
/// Maximum consecutive failures before the reader gives up.
///
/// Failed cursor opens and every exit from the receive loop (stream error or
/// cursor exhaustion) count; a successfully received event resets the count.
#[cfg(feature = "mongodb-cdc")]
const MAX_FAILURES: u32 = 10;

/// Sends `msg` to the source as a fatal [`ChangeStreamPayload::Error`] and
/// returns it as `ConnectorError::ReadError`.
///
/// The reader task's own `Err` result is only logged by the task wrapper, so
/// a failure that `poll_batch` must see has to travel through the channel.
#[cfg(feature = "mongodb-cdc")]
async fn report_fatal(tx: &ChangeStreamTx, msg: String) -> Result<(), ConnectorError> {
    // A failed send means the receiver is gone; nobody is left to tell.
    let _ = tx.send(ChangeStreamPayload::Error(msg.clone())).await;
    Err(ConnectorError::ReadError(msg))
}

/// Background task that reads from the `MongoDB` change stream and sends
/// events to the source via a channel.
///
/// Uses a `'reconnect` / `'recv` double-loop pattern (mirroring the
/// Postgres CDC source), sleeping between attempts for the delay given by
/// `crate::retry::Backoff::broker_reconnect()`.
///
/// The task stops when:
/// - shutdown is signalled (`true` on `shutdown_rx`) or the watch sender is
///   dropped;
/// - sending to the source fails (its receiver is gone);
/// - [`MAX_FAILURES`] consecutive failures occur (an `Error` payload is sent
///   first).
///
/// # Errors
///
/// Returns `ConnectorError::ReadError` (after also sending it as an `Error`
/// payload) when the initial resume token or a configured pipeline stage
/// cannot be converted.
#[cfg(feature = "mongodb-cdc")]
// The arguments are exactly what the spawning task moves in, and the
// reconnect/receive loop is a single state machine that reads best unsplit.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_change_stream_reader(
    db: mongodb::Database,
    config: MongoDbSourceConfig,
    resume_token: Option<ResumeToken>,
    tx: ChangeStreamTx,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    data_ready: Arc<Notify>,
    metrics: Arc<MongoDbCdcMetrics>,
) -> Result<(), ConnectorError> {
    use futures_util::StreamExt;
    use mongodb::options::ChangeStreamOptions;

    let full_document = match config.full_document_mode {
        super::config::FullDocumentMode::Delta => None,
        super::config::FullDocumentMode::UpdateLookup => {
            Some(mongodb::options::FullDocumentType::UpdateLookup)
        }
        super::config::FullDocumentMode::RequirePostImage => {
            Some(mongodb::options::FullDocumentType::Required)
        }
        super::config::FullDocumentMode::WhenAvailable => {
            Some(mongodb::options::FullDocumentType::WhenAvailable)
        }
    };

    // Initialize the last resume token for reconnection.
    let parsed_token = resume_token
        .as_ref()
        .map(|token| {
            serde_json::from_str::<mongodb::change_stream::event::ResumeToken>(token.as_str())
        })
        .transpose();
    let mut last_token: Option<mongodb::change_stream::event::ResumeToken> = match parsed_token
    {
        Ok(token) => token,
        Err(e) => return report_fatal(&tx, format!("parse resume token: {e}")).await,
    };

    // Build a static pipeline (without $changeStreamSplitLargeEvent — the
    // mongodb v3 driver drops the splitEvent field during deserialization,
    // so fragment detection is not possible; see config validation).
    // A stage that cannot be converted is a hard error: silently dropping it
    // (e.g. a `$match`) would widen the stream without any warning.
    let converted_pipeline = config
        .pipeline
        .iter()
        .enumerate()
        .map(|(idx, stage)| {
            mongodb::bson::to_document(stage)
                .map_err(|e| format!("convert pipeline stage {idx}: {e}"))
        })
        .collect::<Result<Vec<mongodb::bson::Document>, String>>();
    let pipeline = match converted_pipeline {
        Ok(stages) => stages,
        Err(msg) => return report_fatal(&tx, msg).await,
    };

    let mut current_db = db;
    let mut consecutive_failures: u32 = 0;

    'reconnect: loop {
        // Build options for this connection attempt.
        let mut options = ChangeStreamOptions::default();
        options.full_document = full_document.clone();
        options.max_await_time = config
            .max_await_time_ms
            .map(std::time::Duration::from_millis);
        options.batch_size = config.batch_size;
        // NOTE(review): after an `invalidate` event the server rejects
        // `resume_after` with that token (`start_after` would be required),
        // so reconnects after an invalidate keep failing until MAX_FAILURES.
        options.resume_after = last_token.clone();

        // Only set start_at_operation_time on fresh starts (no resume token).
        if last_token.is_none() {
            if let Some((secs, inc)) = config.start_at_operation_time {
                options.start_at_operation_time = Some(mongodb::bson::Timestamp {
                    time: secs,
                    increment: inc,
                });
            }
        }

        // Open the change stream cursor.
        let cursor_result = if config.collection == "*" {
            current_db
                .watch()
                .pipeline(pipeline.clone())
                .with_options(options)
                .await
        } else {
            current_db
                .collection::<mongodb::bson::Document>(&config.collection)
                .watch()
                .pipeline(pipeline.clone())
                .with_options(options)
                .await
        };

        let mut cursor = match cursor_result {
            Ok(c) => c,
            Err(e) => {
                consecutive_failures += 1;
                if consecutive_failures >= MAX_FAILURES {
                    let msg =
                        format!("change stream open failed after {MAX_FAILURES} attempts: {e}");
                    tracing::error!(%msg);
                    let _ = tx.send(ChangeStreamPayload::Error(msg)).await;
                    break 'reconnect;
                }
                let backoff = crate::retry::Backoff::broker_reconnect().delay(consecutive_failures);
                tracing::warn!(
                    attempt = consecutive_failures,
                    ?backoff,
                    error = %e,
                    "failed to open change stream, retrying"
                );
                metrics.record_reconnect();
                tokio::select! {
                    changed = shutdown_rx.changed() => {
                        // `Err`: the sender was dropped, so no shutdown signal
                        // can ever arrive — treat it as a shutdown.
                        if changed.is_err() || *shutdown_rx.borrow() {
                            break 'reconnect;
                        }
                    }
                    () = tokio::time::sleep(backoff) => {}
                }
                continue 'reconnect;
            }
        };

        tracing::info!(
            database = %config.database,
            collection = %config.collection,
            resumed = last_token.is_some(),
            "change stream reader started"
        );

        'recv: loop {
            tokio::select! {
                biased;
                changed = shutdown_rx.changed() => {
                    // `Err` means the sender was dropped (e.g. the source was
                    // dropped without `close`). It then resolves immediately on
                    // every poll, so ignoring it would spin this biased loop
                    // forever without ever polling the cursor.
                    if changed.is_err() || *shutdown_rx.borrow() {
                        tracing::info!("change stream reader shutting down");
                        break 'reconnect;
                    }
                }
                next = cursor.next() => {
                    match next {
                        Some(Ok(cs_event)) => {
                            consecutive_failures = 0;

                            // Track resume token for reconnection.
                            last_token = Some(cs_event.id.clone());

                            let change_event = parse_change_stream_event(&cs_event);

                            if tx
                                .send(ChangeStreamPayload::Event(Box::new(change_event)))
                                .await
                                .is_err()
                            {
                                tracing::warn!("source channel closed, stopping reader");
                                break 'reconnect;
                            }
                            data_ready.notify_one();
                        }
                        Some(Err(e)) => {
                            tracing::error!(error = %e, "change stream error");
                            break 'recv;
                        }
                        None => {
                            tracing::info!("change stream cursor exhausted");
                            consecutive_failures = 0;
                            break 'recv;
                        }
                    }
                }
            }
        }

        // Exited recv loop due to error or cursor exhaustion — attempt reconnect.
        consecutive_failures += 1;
        if consecutive_failures >= MAX_FAILURES {
            let msg = format!("change stream failed after {MAX_FAILURES} consecutive failures");
            tracing::error!(%msg);
            let _ = tx.send(ChangeStreamPayload::Error(msg)).await;
            break 'reconnect;
        }

        let backoff = crate::retry::Backoff::broker_reconnect().delay(consecutive_failures);
        tracing::warn!(
            resume_token = ?last_token,
            attempt = consecutive_failures,
            ?backoff,
            "reconnecting change stream"
        );
        metrics.record_reconnect();

        // Interruptible backoff.
        tokio::select! {
            changed = shutdown_rx.changed() => {
                if changed.is_err() || *shutdown_rx.borrow() {
                    break 'reconnect;
                }
            }
            () = tokio::time::sleep(backoff) => {}
        }

        // Re-create client and database for reconnection.
        match mongodb::options::ClientOptions::parse(&config.connection_uri).await {
            Ok(new_opts) => match mongodb::Client::with_options(new_opts) {
                Ok(new_client) => {
                    current_db = new_client.database(&config.database);
                }
                Err(e) => {
                    tracing::warn!(error = %e, "failed to create client on reconnect");
                }
            },
            Err(e) => {
                tracing::warn!(error = %e, "failed to parse URI on reconnect");
            }
        }
    }

    Ok(())
}
754
/// Converts a driver `ChangeStreamEvent<Document>` into a [`MongoDbChangeEvent`].
///
/// Missing optional fields fall back to defaults: empty namespace parts, an
/// empty `document_key`, and zero cluster/wall times. Documents and the
/// resume token are rendered as JSON with `serde_json`; a value that fails
/// to serialize becomes empty/absent rather than aborting the event.
/// Operation types without a dedicated variant are logged and carried as
/// `OperationType::Other` holding their `Debug` rendering.
#[cfg(feature = "mongodb-cdc")]
fn parse_change_stream_event(
    event: &mongodb::change_stream::event::ChangeStreamEvent<mongodb::bson::Document>,
) -> MongoDbChangeEvent {
    use super::change_event::{Namespace, UpdateDescription};
    use mongodb::change_stream::event::OperationType as MongoOpType;

    let operation_type = match event.operation_type {
        MongoOpType::Insert => OperationType::Insert,
        MongoOpType::Update => OperationType::Update,
        MongoOpType::Replace => OperationType::Replace,
        MongoOpType::Delete => OperationType::Delete,
        MongoOpType::Drop => OperationType::Drop,
        MongoOpType::Rename => OperationType::Rename,
        MongoOpType::Invalidate => OperationType::Invalidate,
        MongoOpType::DropDatabase => OperationType::DropDatabase,
        ref other => {
            tracing::warn!(?other, "unmapped MongoDB operation type");
            OperationType::Other(format!("{other:?}"))
        }
    };

    let namespace = event.ns.as_ref().map_or_else(
        || Namespace {
            db: String::new(),
            coll: String::new(),
        },
        |ns| Namespace {
            db: ns.db.clone(),
            coll: ns.coll.clone().unwrap_or_default(),
        },
    );

    let document_key = event
        .document_key
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok())
        .unwrap_or_default();

    let full_document = event
        .full_document
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok());

    let update_description = event.update_description.as_ref().map(|ud| {
        // Fields whose value cannot be converted to JSON are skipped.
        let updated_fields = ud
            .updated_fields
            .iter()
            .filter_map(|(k, v)| serde_json::to_value(v).ok().map(|jv| (k.clone(), jv)))
            .collect();

        let removed_fields = ud.removed_fields.clone();

        // The driver reports array sizes as signed integers; clamp impossible
        // negative values to 0 instead of letting an `as` cast wrap them.
        let truncated_arrays = ud
            .truncated_arrays
            .as_deref()
            .unwrap_or_default()
            .iter()
            .map(|t| super::change_event::TruncatedArray {
                field: t.field.clone(),
                new_size: u32::try_from(t.new_size).unwrap_or(0),
            })
            .collect();

        UpdateDescription {
            updated_fields,
            removed_fields,
            truncated_arrays,
        }
    });

    let (cluster_time_secs, cluster_time_inc) = event
        .cluster_time
        .map_or((0, 0), |ts| (ts.time, ts.increment));

    let wall_time_ms = event
        .wall_time
        .map_or(0, mongodb::bson::DateTime::timestamp_millis);

    // Serialize the ResumeToken via serde (it implements Serialize).
    let resume_token = serde_json::to_string(&event.id).unwrap_or_default();

    MongoDbChangeEvent {
        operation_type,
        namespace,
        document_key,
        full_document,
        update_description,
        cluster_time_secs,
        cluster_time_inc,
        resume_token,
        wall_time_ms,
    }
}
851
#[cfg(test)]
mod tests {
    use super::super::change_event::Namespace;
    use super::*;

    /// Connection string used to build test configs; the tests below never
    /// call `open`, so nothing connects to it.
    const TEST_URI: &str = "mongodb://localhost:27017";

    /// Builds a source over the `db.coll` namespace, passing `None` as the
    /// second constructor argument.
    fn make_source() -> MongoDbCdcSource {
        MongoDbCdcSource::new(MongoDbSourceConfig::new(TEST_URI, "db", "coll"), None)
    }

    /// Produces a `testdb.users` change event with the given operation type,
    /// a populated full document, and no update description.
    fn sample_event(op: OperationType) -> MongoDbChangeEvent {
        let namespace = Namespace {
            db: String::from("testdb"),
            coll: String::from("users"),
        };

        MongoDbChangeEvent {
            operation_type: op,
            namespace,
            document_key: String::from(r#"{"_id": "1"}"#),
            full_document: Some(String::from(r#"{"_id": "1", "name": "Alice"}"#)),
            update_description: None,
            cluster_time_secs: 1_700_000_000,
            cluster_time_inc: 1,
            resume_token: String::from(r#"{"_data": "token1"}"#),
            wall_time_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn test_schema() {
        let schema = mongodb_cdc_envelope_schema();
        assert_eq!(schema.fields().len(), 9);
        assert_eq!(schema.field(0).name(), "_namespace");

        // Column 6 carries the optional full document payload.
        let full_doc = schema.field(6);
        assert_eq!(full_doc.name(), "_full_document");
        assert!(full_doc.is_nullable());
    }

    #[test]
    fn test_new_source() {
        let fresh = make_source();
        assert_eq!(fresh.buffered_events(), 0);
        assert!(!fresh.is_invalidated());
        assert!(fresh.last_resume_token().is_none());
    }

    #[test]
    fn test_enqueue_event() {
        let mut src = make_source();
        src.enqueue_event(sample_event(OperationType::Insert));

        assert_eq!(src.buffered_events(), 1);
        assert!(!src.is_invalidated());
    }

    #[test]
    fn test_enqueue_invalidate() {
        let mut src = make_source();
        let invalidate = MongoDbChangeEvent {
            full_document: None,
            ..sample_event(OperationType::Invalidate)
        };

        src.enqueue_event(invalidate);
        assert!(src.is_invalidated());
    }

    #[test]
    fn test_events_to_record_batch() {
        let schema = mongodb_cdc_envelope_schema();
        let events = [OperationType::Insert, OperationType::Delete].map(sample_event);

        let rb = events_to_record_batch(&events, &schema).unwrap();
        assert_eq!(rb.num_rows(), 2);
        assert_eq!(rb.num_columns(), 9);
    }

    #[test]
    fn test_events_to_record_batch_empty() {
        let schema = mongodb_cdc_envelope_schema();

        let rb = events_to_record_batch(&[], &schema).unwrap();
        assert_eq!(rb.num_rows(), 0);
        assert_eq!(rb.num_columns(), 9);
    }

    #[test]
    fn test_drain_to_batch() {
        let mut src = make_source();

        // Nothing buffered yet, so draining yields no batch.
        assert!(src.drain_to_batch(10).unwrap().is_none());

        for ev in std::iter::repeat_with(|| sample_event(OperationType::Insert)).take(5) {
            src.enqueue_event(ev);
        }

        // A capped drain takes only `max` events and leaves the rest queued.
        let first = src.drain_to_batch(3).unwrap().unwrap();
        assert_eq!(first.num_rows(), 3);
        assert_eq!(src.buffered_events(), 2);

        // A generous cap empties whatever is left.
        let rest = src.drain_to_batch(10).unwrap().unwrap();
        assert_eq!(rest.num_rows(), 2);
        assert_eq!(src.buffered_events(), 0);
    }

    #[test]
    fn test_checkpoint() {
        let config = MongoDbSourceConfig::new(TEST_URI, "testdb", "users");
        let mut src = MongoDbCdcSource::new(config, None);

        // Before any token is recorded the offset is absent.
        let empty_cp = src.checkpoint();
        assert!(empty_cp.get_offset("resume_token").is_none());
        assert_eq!(empty_cp.get_metadata("connector"), Some("mongodb-cdc"));

        // After a token is set it is surfaced as the checkpoint offset.
        src.last_resume_token = Some(ResumeToken::new("tok123".to_string()));
        let token_cp = src.checkpoint();
        assert_eq!(token_cp.get_offset("resume_token"), Some("tok123"));
    }

    #[tokio::test]
    async fn test_restore_checkpoint() {
        let mut src = make_source();

        let mut saved = SourceCheckpoint::new(0);
        saved.set_offset("resume_token", "restored_token");

        src.restore(&saved).await.unwrap();
        let token = src.last_resume_token().unwrap();
        assert_eq!(token.as_str(), "restored_token");
    }

    #[test]
    fn test_drain_tracks_resume_token() {
        const FINAL_TOKEN: &str = r#"{"_data": "final_token"}"#;

        let mut src = make_source();
        let event = MongoDbChangeEvent {
            resume_token: FINAL_TOKEN.to_string(),
            ..sample_event(OperationType::Insert)
        };
        src.enqueue_event(event);

        // Draining should record the token of the last event it emitted.
        src.drain_to_batch(10).unwrap();
        assert_eq!(src.last_resume_token().unwrap().as_str(), FINAL_TOKEN);
    }
}