// laminar_connectors/mongodb/source.rs
//! `MongoDB` CDC source connector implementation.
//!
//! Implements [`SourceConnector`] for streaming change events from `MongoDB`
//! change streams into `LaminarDB` as Arrow `RecordBatch`es.
//!
//! # Architecture
//!
//! - **Ring 0**: No CDC code — just SPSC channel pop (~5ns)
//! - **Ring 1**: Change stream consumption, event parsing, Arrow conversion
//! - **Ring 2**: Connection management, collection validation, health checks
//!
//! # Cancellation Safety
//!
//! The `poll_batch` method is cancellation-safe: dropping the future
//! mid-execution does not lose events. Buffered events remain in the
//! internal `VecDeque` and will be returned on the next poll.
18use std::collections::VecDeque;
19use std::sync::Arc;
20
21use arrow_array::builder::{StringBuilder, UInt32Builder};
22use arrow_array::RecordBatch;
23use arrow_schema::{DataType, Field, Schema, SchemaRef};
24use async_trait::async_trait;
25use tokio::sync::Notify;
26
27use crate::checkpoint::SourceCheckpoint;
28use crate::config::{ConnectorConfig, ConnectorState};
29use crate::connector::{SourceBatch, SourceConnector};
30use crate::error::ConnectorError;
31use crate::health::HealthStatus;
32use crate::metrics::ConnectorMetrics;
33
34use super::change_event::{MongoDbChangeEvent, OperationType};
35use super::config::MongoDbSourceConfig;
36use super::metrics::MongoDbCdcMetrics;
37use super::resume_token::{InMemoryResumeTokenStore, ResumeToken, ResumeTokenStore};
38
39/// Returns the Arrow schema for `MongoDB` CDC envelope records.
40///
41/// | Column              | Type   | Nullable | Description                        |
42/// |---------------------|--------|----------|------------------------------------|
43/// | `_namespace`        | Utf8   | no       | `database.collection`              |
44/// | `_op`               | Utf8   | no       | Operation code (I/U/R/D/DROP/...)  |
45/// | `_document_key`     | Utf8   | no       | Document key JSON                  |
46/// | `_cluster_time_s`   | UInt32 | no       | Cluster time seconds               |
47/// | `_cluster_time_i`   | UInt32 | no       | Cluster time increment             |
48/// | `_wall_time_ms`     | Timestamp(ms) | no | Wall clock timestamp             |
49/// | `_full_document`    | Utf8   | yes      | Full document JSON                 |
50/// | `_update_desc`      | Utf8   | yes      | Update description JSON            |
51/// | `_resume_token`     | Utf8   | no       | Opaque resume token JSON           |
52#[must_use]
53pub fn mongodb_cdc_envelope_schema() -> SchemaRef {
54    Arc::new(Schema::new(vec![
55        Field::new("_namespace", DataType::Utf8, false),
56        Field::new("_op", DataType::Utf8, false),
57        Field::new("_document_key", DataType::Utf8, false),
58        Field::new("_cluster_time_s", DataType::UInt32, false),
59        Field::new("_cluster_time_i", DataType::UInt32, false),
60        Field::new(
61            "_wall_time_ms",
62            DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
63            false,
64        ),
65        Field::new("_full_document", DataType::Utf8, true),
66        Field::new("_update_desc", DataType::Utf8, true),
67        Field::new("_resume_token", DataType::Utf8, false),
68    ]))
69}
70
/// `MongoDB` CDC source connector.
///
/// Streams change events from a `MongoDB` change stream using the
/// `SourceConnector` trait. Events are buffered internally and
/// converted to Arrow `RecordBatch`es on `poll_batch`.
///
/// # Resume Token Handling
///
/// The connector tracks resume tokens from individual change event `_id`
/// fields. Tokens are persisted via a pluggable [`ResumeTokenStore`].
///
/// # Sharded Cluster Note
///
/// On sharded clusters, `mongos` opens per-shard cursors and merges
/// results transparently. Ensure `max_pool_size` is at least as large
/// as the expected number of concurrent change streams to avoid
/// connection starvation.
pub struct MongoDbCdcSource {
    /// Connector configuration.
    config: MongoDbSourceConfig,

    /// Current lifecycle state (Created → Running → Closed/Failed).
    state: ConnectorState,

    /// Output schema (CDC envelope); fixed at construction time.
    schema: SchemaRef,

    /// Lock-free metrics.
    metrics: Arc<MongoDbCdcMetrics>,

    /// Buffered change events awaiting `poll_batch`. Events left here
    /// after a cancelled poll are returned on the next poll (see the
    /// module-level cancellation-safety note).
    event_buffer: VecDeque<MongoDbChangeEvent>,

    /// Last persisted resume token.
    last_resume_token: Option<ResumeToken>,

    /// Resume token store (defaults to an in-memory store; replace via
    /// `with_resume_token_store`).
    resume_token_store: Box<dyn ResumeTokenStore>,

    /// Notification handle signalled when data arrives from the stream.
    data_ready: Arc<Notify>,

    /// Whether an invalidate event has been received. Once set, this
    /// flag is never cleared for the lifetime of the source.
    invalidated: bool,

    /// Background change stream reader task handle (feature-gated).
    #[cfg(feature = "mongodb-cdc")]
    reader_handle: Option<tokio::task::JoinHandle<()>>,

    /// Channel receiver for change events from the background task.
    #[cfg(feature = "mongodb-cdc")]
    event_rx: Option<ChangeStreamRx>,

    /// Shutdown signal for the background reader task.
    #[cfg(feature = "mongodb-cdc")]
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
}
128
/// Payload from the background change stream reader task.
#[allow(dead_code)]
enum ChangeStreamPayload {
    /// A change event (boxed so the enum stays small on the channel).
    Event(Box<MongoDbChangeEvent>),
    /// Fatal error from the reader task; surfaced from `poll_batch`.
    Error(String),
}

/// Cloneable async sender for the change stream reader → `poll_batch` queue.
#[cfg(feature = "mongodb-cdc")]
type ChangeStreamTx = crossfire::MAsyncTx<crossfire::mpsc::Array<ChangeStreamPayload>>;
/// Single-consumer async receiver for the change stream reader → `poll_batch` queue.
#[cfg(feature = "mongodb-cdc")]
type ChangeStreamRx = crossfire::AsyncRx<crossfire::mpsc::Array<ChangeStreamPayload>>;
144
145impl MongoDbCdcSource {
146    /// Creates a new `MongoDB` CDC source with the given configuration.
147    #[must_use]
148    pub fn new(config: MongoDbSourceConfig, registry: Option<&prometheus::Registry>) -> Self {
149        Self {
150            config,
151            state: ConnectorState::Created,
152            schema: mongodb_cdc_envelope_schema(),
153            metrics: Arc::new(MongoDbCdcMetrics::new(registry)),
154            event_buffer: VecDeque::new(),
155            last_resume_token: None,
156            resume_token_store: Box::new(InMemoryResumeTokenStore::new()),
157            data_ready: Arc::new(Notify::new()),
158            invalidated: false,
159            #[cfg(feature = "mongodb-cdc")]
160            reader_handle: None,
161            #[cfg(feature = "mongodb-cdc")]
162            event_rx: None,
163            #[cfg(feature = "mongodb-cdc")]
164            reader_shutdown: None,
165        }
166    }
167
168    /// Creates a new source from a generic [`ConnectorConfig`].
169    ///
170    /// # Errors
171    ///
172    /// Returns `ConnectorError` if the configuration is invalid.
173    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
174        let mongo_config = MongoDbSourceConfig::from_config(config)?;
175        Ok(Self::new(mongo_config, None))
176    }
177
178    /// Sets a custom resume token store.
179    #[must_use]
180    pub fn with_resume_token_store(mut self, store: Box<dyn ResumeTokenStore>) -> Self {
181        self.resume_token_store = store;
182        self
183    }
184
185    /// Returns a reference to the source configuration.
186    #[must_use]
187    pub fn config(&self) -> &MongoDbSourceConfig {
188        &self.config
189    }
190
191    /// Returns the last persisted resume token.
192    #[must_use]
193    pub fn last_resume_token(&self) -> Option<&ResumeToken> {
194        self.last_resume_token.as_ref()
195    }
196
197    /// Returns the number of buffered events.
198    #[must_use]
199    pub fn buffered_events(&self) -> usize {
200        self.event_buffer.len()
201    }
202
203    /// Returns `true` if the stream has been invalidated.
204    #[must_use]
205    pub fn is_invalidated(&self) -> bool {
206        self.invalidated
207    }
208
209    /// Enqueues a change event for processing (used by tests and the
210    /// background reader task).
211    pub fn enqueue_event(&mut self, event: MongoDbChangeEvent) {
212        if event.operation_type == OperationType::Invalidate {
213            self.invalidated = true;
214        }
215        self.metrics.record_event(event.operation_type.as_str());
216        self.event_buffer.push_back(event);
217    }
218
219    /// Drains up to `max_records` events from the buffer and converts
220    /// them to an Arrow `RecordBatch`.
221    ///
222    /// # Errors
223    ///
224    /// Returns `ConnectorError` if Arrow batch construction fails.
225    pub fn drain_to_batch(
226        &mut self,
227        max_records: usize,
228    ) -> Result<Option<SourceBatch>, ConnectorError> {
229        if self.event_buffer.is_empty() {
230            return Ok(None);
231        }
232
233        let count = max_records.min(self.event_buffer.len());
234        let events: Vec<MongoDbChangeEvent> = self.event_buffer.drain(..count).collect();
235
236        let batch = events_to_record_batch(&events, &self.schema)?;
237        self.metrics.record_batch();
238
239        // Track resume token from the last event in this batch.
240        if let Some(last) = events.last() {
241            self.last_resume_token = Some(ResumeToken::new(last.resume_token.clone()));
242        }
243
244        Ok(Some(SourceBatch::new(batch)))
245    }
246}
247
248/// Converts a batch of [`MongoDbChangeEvent`]s to an Arrow `RecordBatch`.
249fn events_to_record_batch(
250    events: &[MongoDbChangeEvent],
251    schema: &SchemaRef,
252) -> Result<RecordBatch, ConnectorError> {
253    let len = events.len();
254
255    let mut ns_builder = StringBuilder::with_capacity(len, len * 32);
256    let mut op_builder = StringBuilder::with_capacity(len, len * 4);
257    let mut dk_builder = StringBuilder::with_capacity(len, len * 64);
258    let mut cts_builder = UInt32Builder::with_capacity(len);
259    let mut ct_inc_builder = UInt32Builder::with_capacity(len);
260    let mut wt_builder = arrow_array::builder::TimestampMillisecondBuilder::with_capacity(len);
261    let mut fd_builder = StringBuilder::with_capacity(len, len * 128);
262    let mut ud_builder = StringBuilder::with_capacity(len, len * 64);
263    let mut rt_builder = StringBuilder::with_capacity(len, len * 64);
264
265    for event in events {
266        ns_builder.append_value(event.namespace.full_name());
267        op_builder.append_value(event.operation_type.as_str());
268        dk_builder.append_value(&event.document_key);
269        cts_builder.append_value(event.cluster_time_secs);
270        ct_inc_builder.append_value(event.cluster_time_inc);
271        wt_builder.append_value(event.wall_time_ms);
272
273        match &event.full_document {
274            Some(doc) => fd_builder.append_value(doc),
275            None => fd_builder.append_null(),
276        }
277
278        match &event.update_description {
279            Some(desc) => {
280                let json = serde_json::to_string(desc)
281                    .map_err(|e| ConnectorError::Internal(format!("serialize update_desc: {e}")))?;
282                ud_builder.append_value(&json);
283            }
284            None => ud_builder.append_null(),
285        }
286
287        rt_builder.append_value(&event.resume_token);
288    }
289
290    RecordBatch::try_new(
291        Arc::clone(schema),
292        vec![
293            Arc::new(ns_builder.finish()),
294            Arc::new(op_builder.finish()),
295            Arc::new(dk_builder.finish()),
296            Arc::new(cts_builder.finish()),
297            Arc::new(ct_inc_builder.finish()),
298            Arc::new(wt_builder.finish()),
299            Arc::new(fd_builder.finish()),
300            Arc::new(ud_builder.finish()),
301            Arc::new(rt_builder.finish()),
302        ],
303    )
304    .map_err(|e| ConnectorError::Internal(format!("arrow batch: {e}")))
305}
306
#[async_trait]
impl SourceConnector for MongoDbCdcSource {
    /// Opens the source: validates configuration, loads any persisted
    /// resume token, and (when the `mongodb-cdc` feature is enabled)
    /// starts the background change stream reader task.
    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.config.validate()?;

        // Load any persisted resume token so the stream resumes from
        // the last checkpointed position instead of the present.
        let persisted_token = self
            .resume_token_store
            .load()
            .await
            .map_err(|e| ConnectorError::Internal(format!("load resume token: {e}")))?;

        if let Some(token) = persisted_token {
            tracing::info!(resume_token = %token, "resuming from persisted token");
            self.last_resume_token = Some(token);
        }

        // Feature-gated: start the background change stream reader.
        #[cfg(feature = "mongodb-cdc")]
        {
            self.start_change_stream_reader().await?;
        }

        self.state = ConnectorState::Running;
        tracing::info!(
            database = %self.config.database,
            collection = %self.config.collection,
            full_document_mode = ?self.config.full_document_mode,
            "MongoDB CDC source opened"
        );

        Ok(())
    }

    /// Polls for buffered change events, converting up to `max_records`
    /// into a single Arrow `RecordBatch` (or `None` if nothing buffered).
    ///
    /// Cancellation-safe: both drain steps are synchronous, so dropping
    /// the future cannot lose buffered events.
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        // Move any events produced by the background reader into the
        // in-memory buffer before draining it.
        #[cfg(feature = "mongodb-cdc")]
        {
            self.drain_channel()?;
        }

        self.drain_to_batch(max_records)
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Builds a checkpoint carrying the latest resume token (if any)
    /// plus identifying metadata for this connector instance.
    fn checkpoint(&self) -> SourceCheckpoint {
        let mut cp = SourceCheckpoint::new(0);
        if let Some(ref token) = self.last_resume_token {
            cp.set_offset("resume_token", token.as_str());
        }
        cp.set_metadata("connector", "mongodb-cdc");
        cp.set_metadata("database", &self.config.database);
        cp.set_metadata("collection", &self.config.collection);
        cp
    }

    /// Restores the resume token from `checkpoint` and persists it to
    /// the token store, so a subsequent `open` resumes from that point.
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        if let Some(token_str) = checkpoint.get_offset("resume_token") {
            let token = ResumeToken::new(token_str.to_string());
            tracing::info!(resume_token = %token, "restoring from checkpoint");
            self.last_resume_token = Some(token.clone());
            self.resume_token_store
                .save(&token)
                .await
                .map_err(|e| ConnectorError::Internal(format!("save resume token: {e}")))?;
        }
        Ok(())
    }

    /// Maps the lifecycle state (and stream invalidation) to a health
    /// status.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => {
                if self.invalidated {
                    // Running, but the server-side stream was invalidated;
                    // no further events will arrive until reopened.
                    HealthStatus::Degraded("change stream invalidated".to_string())
                } else {
                    HealthStatus::Healthy
                }
            }
            ConnectorState::Created => HealthStatus::Unknown,
            ConnectorState::Closed | ConnectorState::Failed => {
                HealthStatus::Unhealthy("closed".to_string())
            }
            _ => HealthStatus::Degraded(format!("state: {}", self.state)),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    /// Closes the source: persists the last resume token (best effort),
    /// signals the background reader to stop, then awaits its handle.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        // Persist the last resume token before shutting down. Failure is
        // logged, not fatal: losing the token only costs replayed events.
        if let Some(ref token) = self.last_resume_token {
            if let Err(e) = self.resume_token_store.save(token).await {
                tracing::warn!(error = %e, "failed to persist resume token on close");
            }
        }

        // Shut down the background reader task: signal first, then join.
        #[cfg(feature = "mongodb-cdc")]
        {
            if let Some(tx) = self.reader_shutdown.take() {
                let _ = tx.send(true);
            }
            if let Some(handle) = self.reader_handle.take() {
                let _ = handle.await;
            }
        }

        self.state = ConnectorState::Closed;
        tracing::info!("MongoDB CDC source closed");
        Ok(())
    }

    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        Some(Arc::clone(&self.data_ready))
    }

    fn supports_replay(&self) -> bool {
        // MongoDB change streams support resume from a token.
        true
    }
}
435
// ── Feature-gated I/O (real MongoDB driver) ──

#[cfg(feature = "mongodb-cdc")]
impl MongoDbCdcSource {
    /// Starts the background change stream reader task.
    ///
    /// Connects to `MongoDB`, runs a pre-flight check for time series
    /// collections, then spawns [`run_change_stream_reader`] wired to
    /// this source via a bounded event channel and a shutdown watch.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the URI cannot be parsed, the client
    /// cannot be created, or the pre-flight check fails.
    async fn start_change_stream_reader(&mut self) -> Result<(), ConnectorError> {
        use mongodb::options::ClientOptions;

        let client_options = ClientOptions::parse(&self.config.connection_uri)
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("parse URI: {e}")))?;

        // Warn if pool size may be too small for sharded clusters.
        if let Some(pool) = client_options.max_pool_size {
            if pool <= 1 {
                tracing::warn!(
                    max_pool_size = pool,
                    "max_pool_size is very small — on sharded clusters, \
                     mongos opens per-shard cursors and may exhaust the pool"
                );
            }
        }

        let client = mongodb::Client::with_options(client_options)
            .map_err(|e| ConnectorError::ConnectionFailed(format!("create client: {e}")))?;

        let db = client.database(&self.config.database);

        // Pre-flight: detect time series collections ("*" means watching
        // the whole database, so there is no single collection to check).
        if self.config.collection != "*" {
            preflight_timeseries_guard(&db, &self.config.database, &self.config.collection).await?;
        }

        // Bounded channel: backpressure caps memory between reader and poll.
        let (tx, rx) =
            crossfire::mpsc::bounded_async::<ChangeStreamPayload>(self.config.max_buffered_events);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let config = self.config.clone();
        let resume_token = self.last_resume_token.clone();
        let data_ready = Arc::clone(&self.data_ready);
        let metrics = Arc::clone(&self.metrics);

        let handle = tokio::spawn(async move {
            if let Err(e) = run_change_stream_reader(
                db,
                config,
                resume_token,
                tx,
                shutdown_rx,
                data_ready,
                metrics,
            )
            .await
            {
                tracing::error!(error = %e, "change stream reader task failed");
            }
        });

        self.reader_handle = Some(handle);
        self.event_rx = Some(rx);
        self.reader_shutdown = Some(shutdown_tx);

        Ok(())
    }

    /// Drains events from the background reader channel into the buffer.
    ///
    /// Non-blocking: loops `try_recv` until the channel is empty. An
    /// `Error` payload is surfaced as `ConnectorError::ReadError`; events
    /// drained before it have already been enqueued.
    fn drain_channel(&mut self) -> Result<(), ConnectorError> {
        // Collect payloads first to avoid double borrow of `self`.
        let mut payloads = Vec::new();
        if let Some(rx) = &mut self.event_rx {
            while let Ok(payload) = rx.try_recv() {
                payloads.push(payload);
            }
        }

        for payload in payloads {
            match payload {
                ChangeStreamPayload::Event(event) => {
                    self.enqueue_event(*event);
                }
                ChangeStreamPayload::Error(msg) => {
                    self.metrics.record_error();
                    return Err(ConnectorError::ReadError(msg));
                }
            }
        }
        Ok(())
    }
}
525
/// Pre-flight check: reject time series collections.
///
/// Looks up `collection` in the database's collection listing and fails
/// if its type is `Timeseries`. A collection that does not appear in the
/// listing passes the check.
///
/// # Errors
///
/// Returns `ConnectorError::ConnectionFailed` if the listing cannot be
/// read, or `ConnectorError::ConfigurationError` for a time series
/// collection.
#[cfg(feature = "mongodb-cdc")]
async fn preflight_timeseries_guard(
    db: &mongodb::Database,
    database: &str,
    collection: &str,
) -> Result<(), ConnectorError> {
    use futures_util::StreamExt;
    use mongodb::bson::doc;

    // Server-side filter: list only the collection we care about.
    let filter = doc! { "name": collection };
    let mut cursor = db
        .list_collections()
        .filter(filter)
        .await
        .map_err(|e| ConnectorError::ConnectionFailed(format!("list collections: {e}")))?;
    if let Some(result) = cursor.next().await {
        let spec = result
            .map_err(|e| ConnectorError::ConnectionFailed(format!("read collection spec: {e}")))?;
        // Check if collection_type indicates time series.
        if spec.collection_type == mongodb::results::CollectionType::Timeseries {
            return Err(ConnectorError::ConfigurationError(format!(
                "time series collection {database}.{collection} does not support change streams"
            )));
        }
    }

    Ok(())
}
555
/// Maximum consecutive failures before the reader gives up.
///
/// Applies to both the open-stream path and the recv loop in
/// `run_change_stream_reader`; a successful event resets the counter.
#[cfg(feature = "mongodb-cdc")]
const MAX_FAILURES: u32 = 10;
559
/// Background task that reads from the `MongoDB` change stream and sends
/// events to the source via a channel.
///
/// Uses a `'reconnect` / `'recv` double-loop pattern (mirroring the
/// Postgres CDC source) with exponential backoff capped at 30 seconds.
/// The outer loop (re)opens the cursor; the inner loop pumps events
/// until an error, cursor exhaustion, or shutdown.
#[cfg(feature = "mongodb-cdc")]
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
async fn run_change_stream_reader(
    db: mongodb::Database,
    config: MongoDbSourceConfig,
    resume_token: Option<ResumeToken>,
    tx: ChangeStreamTx,
    mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
    data_ready: Arc<Notify>,
    metrics: Arc<MongoDbCdcMetrics>,
) -> Result<(), ConnectorError> {
    use futures_util::StreamExt;
    use mongodb::options::ChangeStreamOptions;

    // Map our config enum onto the driver's full-document option.
    let full_document = match config.full_document_mode {
        super::config::FullDocumentMode::Delta => None,
        super::config::FullDocumentMode::UpdateLookup => {
            Some(mongodb::options::FullDocumentType::UpdateLookup)
        }
        super::config::FullDocumentMode::RequirePostImage => {
            Some(mongodb::options::FullDocumentType::Required)
        }
        super::config::FullDocumentMode::WhenAvailable => {
            Some(mongodb::options::FullDocumentType::WhenAvailable)
        }
    };

    // Initialize the last resume token for reconnection. A token that
    // fails to parse is a hard error: silently ignoring it would replay
    // or skip events.
    let mut last_token: Option<mongodb::change_stream::event::ResumeToken> =
        if let Some(ref token) = resume_token {
            serde_json::from_str::<mongodb::change_stream::event::ResumeToken>(token.as_str())
                .map(Some)
                .map_err(|e| ConnectorError::ReadError(format!("parse resume token: {e}")))?
        } else {
            None
        };

    // Build a static pipeline (without $changeStreamSplitLargeEvent — the
    // mongodb v3 driver drops the splitEvent field during deserialization,
    // so fragment detection is not possible; see config validation).
    let pipeline: Vec<mongodb::bson::Document> = config
        .pipeline
        .iter()
        .filter_map(|v| mongodb::bson::to_document(v).ok())
        .collect();

    let mut current_db = db;
    let mut consecutive_failures: u32 = 0;

    'reconnect: loop {
        // Build options for this connection attempt.
        let mut options = ChangeStreamOptions::default();
        options.full_document = full_document.clone();
        options.max_await_time = config
            .max_await_time_ms
            .map(std::time::Duration::from_millis);
        options.batch_size = config.batch_size;
        options.resume_after = last_token.clone();

        // Only set start_at_operation_time on fresh starts (no resume token).
        if last_token.is_none() {
            if let Some((secs, inc)) = config.start_at_operation_time {
                options.start_at_operation_time = Some(mongodb::bson::Timestamp {
                    time: secs,
                    increment: inc,
                });
            }
        }

        // Open the change stream cursor ("*" watches the whole database,
        // otherwise a single collection).
        let cursor_result = if config.collection == "*" {
            current_db
                .watch()
                .pipeline(pipeline.clone())
                .with_options(options)
                .await
        } else {
            current_db
                .collection::<mongodb::bson::Document>(&config.collection)
                .watch()
                .pipeline(pipeline.clone())
                .with_options(options)
                .await
        };

        let mut cursor = match cursor_result {
            Ok(c) => c,
            Err(e) => {
                consecutive_failures += 1;
                if consecutive_failures >= MAX_FAILURES {
                    let msg =
                        format!("change stream open failed after {MAX_FAILURES} attempts: {e}");
                    tracing::error!(%msg);
                    // Best effort: the source may already have dropped the rx.
                    let _ = tx.send(ChangeStreamPayload::Error(msg)).await;
                    break 'reconnect;
                }
                // Exponential backoff, capped at 30s (shift is at most
                // MAX_FAILURES - 1, so it cannot overflow).
                let backoff_secs = (1u64 << consecutive_failures).min(30);
                tracing::warn!(
                    attempt = consecutive_failures,
                    backoff_secs,
                    error = %e,
                    "failed to open change stream, retrying"
                );
                metrics.record_reconnect();
                // Interruptible backoff: wake early on shutdown.
                tokio::select! {
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            break 'reconnect;
                        }
                    }
                    () = tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)) => {}
                }
                continue 'reconnect;
            }
        };

        tracing::info!(
            database = %config.database,
            collection = %config.collection,
            resumed = last_token.is_some(),
            "change stream reader started"
        );

        'recv: loop {
            // `biased` checks shutdown before pulling another event.
            tokio::select! {
                biased;
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        tracing::info!("change stream reader shutting down");
                        break 'reconnect;
                    }
                }
                next = cursor.next() => {
                    match next {
                        Some(Ok(cs_event)) => {
                            consecutive_failures = 0;

                            // Track resume token for reconnection.
                            last_token = Some(cs_event.id.clone());

                            let change_event = parse_change_stream_event(&cs_event);

                            // A closed channel means the source was dropped
                            // or closed — stop the reader entirely.
                            if tx
                                .send(ChangeStreamPayload::Event(Box::new(change_event)))
                                .await
                                .is_err()
                            {
                                tracing::warn!("source channel closed, stopping reader");
                                break 'reconnect;
                            }
                            data_ready.notify_one();
                        }
                        Some(Err(e)) => {
                            tracing::error!(error = %e, "change stream error");
                            break 'recv;
                        }
                        None => {
                            tracing::info!("change stream cursor exhausted");
                            consecutive_failures = 0;
                            break 'recv;
                        }
                    }
                }
            }
        }

        // Exited recv loop due to error or cursor exhaustion — attempt reconnect.
        consecutive_failures += 1;
        if consecutive_failures >= MAX_FAILURES {
            let msg = format!("change stream failed after {MAX_FAILURES} consecutive failures");
            tracing::error!(%msg);
            let _ = tx.send(ChangeStreamPayload::Error(msg)).await;
            break 'reconnect;
        }

        let backoff_secs = (1u64 << consecutive_failures).min(30);
        tracing::warn!(
            resume_token = ?last_token,
            attempt = consecutive_failures,
            backoff_secs,
            "reconnecting change stream"
        );
        metrics.record_reconnect();

        // Interruptible backoff.
        tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    break 'reconnect;
                }
            }
            () = tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)) => {}
        }

        // Re-create client and database for reconnection. On failure we
        // keep the old handle and let the next open attempt surface the
        // error through the failure counter above.
        match mongodb::options::ClientOptions::parse(&config.connection_uri).await {
            Ok(new_opts) => match mongodb::Client::with_options(new_opts) {
                Ok(new_client) => {
                    current_db = new_client.database(&config.database);
                }
                Err(e) => {
                    tracing::warn!(error = %e, "failed to create client on reconnect");
                }
            },
            Err(e) => {
                tracing::warn!(error = %e, "failed to parse URI on reconnect");
            }
        }
    }

    Ok(())
}
777
/// Parses a `ChangeStreamEvent<Document>` into a [`MongoDbChangeEvent`].
///
/// Lossy by design: unserializable sub-documents become empty/`None`
/// rather than failing the whole event, and missing namespace/cluster
/// time fall back to empty strings / zeros.
#[cfg(feature = "mongodb-cdc")]
fn parse_change_stream_event(
    event: &mongodb::change_stream::event::ChangeStreamEvent<mongodb::bson::Document>,
) -> MongoDbChangeEvent {
    use super::change_event::{Namespace, UpdateDescription};
    use mongodb::change_stream::event::OperationType as MongoOpType;

    // Map driver operation types onto our enum; anything unknown is
    // preserved as Other with its Debug representation.
    let operation_type = match event.operation_type {
        MongoOpType::Insert => OperationType::Insert,
        MongoOpType::Update => OperationType::Update,
        MongoOpType::Replace => OperationType::Replace,
        MongoOpType::Delete => OperationType::Delete,
        MongoOpType::Drop => OperationType::Drop,
        MongoOpType::Rename => OperationType::Rename,
        MongoOpType::Invalidate => OperationType::Invalidate,
        MongoOpType::DropDatabase => OperationType::DropDatabase,
        ref other => {
            tracing::warn!(?other, "unmapped MongoDB operation type");
            OperationType::Other(format!("{other:?}"))
        }
    };

    // Namespace may be absent (e.g. for invalidate-style events) —
    // fall back to empty strings rather than failing.
    let namespace = event.ns.as_ref().map_or_else(
        || Namespace {
            db: String::new(),
            coll: String::new(),
        },
        |ns| Namespace {
            db: ns.db.clone(),
            coll: ns.coll.clone().unwrap_or_default(),
        },
    );

    let document_key = event
        .document_key
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok())
        .unwrap_or_default();

    let full_document = event
        .full_document
        .as_ref()
        .and_then(|d| serde_json::to_string(d).ok());

    // Convert the driver's update description into our JSON-friendly form,
    // dropping individual fields that fail to serialize.
    let update_description = event.update_description.as_ref().map(|ud| {
        let updated_fields = ud
            .updated_fields
            .iter()
            .filter_map(|(k, v)| serde_json::to_value(v).ok().map(|jv| (k.clone(), jv)))
            .collect();

        let removed_fields = ud.removed_fields.clone();

        #[allow(clippy::cast_sign_loss)]
        let truncated_arrays = ud
            .truncated_arrays
            .as_deref()
            .unwrap_or_default()
            .iter()
            .map(|t| super::change_event::TruncatedArray {
                field: t.field.clone(),
                new_size: t.new_size as u32,
            })
            .collect();

        UpdateDescription {
            updated_fields,
            removed_fields,
            truncated_arrays,
        }
    });

    // Missing cluster/wall time defaults to zero.
    let (cluster_time_secs, cluster_time_inc) = event
        .cluster_time
        .map_or((0, 0), |ts| (ts.time, ts.increment));

    let wall_time_ms = event
        .wall_time
        .map_or(0, mongodb::bson::DateTime::timestamp_millis);

    // Serialize the ResumeToken via serde (it implements Serialize).
    let resume_token = serde_json::to_string(&event.id).unwrap_or_default();

    MongoDbChangeEvent {
        operation_type,
        namespace,
        document_key,
        full_document,
        update_description,
        cluster_time_secs,
        cluster_time_inc,
        resume_token,
        wall_time_ms,
    }
}
874
#[cfg(test)]
mod tests {
    use super::super::change_event::Namespace;
    use super::*;

    /// Builds a source pointed at a throwaway localhost URI.
    fn make_source(db: &str, coll: &str) -> MongoDbCdcSource {
        MongoDbCdcSource::new(
            MongoDbSourceConfig::new("mongodb://localhost:27017", db, coll),
            None,
        )
    }

    /// Builds a canonical change event carrying the given operation type.
    fn make_event(op: OperationType) -> MongoDbChangeEvent {
        MongoDbChangeEvent {
            operation_type: op,
            namespace: Namespace {
                db: "testdb".to_string(),
                coll: "users".to_string(),
            },
            document_key: r#"{"_id": "1"}"#.to_string(),
            full_document: Some(r#"{"_id": "1", "name": "Alice"}"#.to_string()),
            update_description: None,
            cluster_time_secs: 1_700_000_000,
            cluster_time_inc: 1,
            resume_token: r#"{"_data": "token1"}"#.to_string(),
            wall_time_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn test_schema() {
        let schema = mongodb_cdc_envelope_schema();
        assert_eq!(schema.fields().len(), 9);
        assert_eq!(schema.field(0).name(), "_namespace");
        let full_doc = schema.field(6);
        assert_eq!(full_doc.name(), "_full_document");
        assert!(full_doc.is_nullable());
    }

    #[test]
    fn test_new_source() {
        let source = make_source("db", "coll");
        assert_eq!(source.buffered_events(), 0);
        assert!(!source.is_invalidated());
        assert!(source.last_resume_token().is_none());
    }

    #[test]
    fn test_enqueue_event() {
        let mut source = make_source("db", "coll");

        source.enqueue_event(make_event(OperationType::Insert));

        assert!(!source.is_invalidated());
        assert_eq!(source.buffered_events(), 1);
    }

    #[test]
    fn test_enqueue_invalidate() {
        let mut source = make_source("db", "coll");

        let event = MongoDbChangeEvent {
            full_document: None,
            ..make_event(OperationType::Invalidate)
        };
        source.enqueue_event(event);

        assert!(source.is_invalidated());
    }

    #[test]
    fn test_events_to_record_batch() {
        let schema = mongodb_cdc_envelope_schema();
        let events = [
            make_event(OperationType::Insert),
            make_event(OperationType::Delete),
        ];

        let batch = events_to_record_batch(&events, &schema).unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 9);
    }

    #[test]
    fn test_events_to_record_batch_empty() {
        let schema = mongodb_cdc_envelope_schema();

        let batch = events_to_record_batch(&[], &schema).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 9);
    }

    #[test]
    fn test_drain_to_batch() {
        let mut source = make_source("db", "coll");

        // Nothing buffered yet: drain yields None.
        assert!(source.drain_to_batch(10).unwrap().is_none());

        for _ in 0..5 {
            source.enqueue_event(make_event(OperationType::Insert));
        }

        // Partial drain honors the requested batch limit.
        let first = source.drain_to_batch(3).unwrap().unwrap();
        assert_eq!(first.num_rows(), 3);
        assert_eq!(source.buffered_events(), 2);

        // Second drain empties the buffer.
        let rest = source.drain_to_batch(10).unwrap().unwrap();
        assert_eq!(rest.num_rows(), 2);
        assert_eq!(source.buffered_events(), 0);
    }

    #[test]
    fn test_checkpoint() {
        let mut source = make_source("testdb", "users");

        // No resume token recorded yet: no offset, metadata still present.
        let cp = source.checkpoint();
        assert!(cp.get_offset("resume_token").is_none());
        assert_eq!(cp.get_metadata("connector"), Some("mongodb-cdc"));

        // Token present: surfaced as the checkpoint offset.
        source.last_resume_token = Some(ResumeToken::new("tok123".to_string()));
        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("resume_token"), Some("tok123"));
    }

    #[tokio::test]
    async fn test_restore_checkpoint() {
        let mut source = make_source("db", "coll");

        let mut cp = SourceCheckpoint::new(0);
        cp.set_offset("resume_token", "restored_token");
        source.restore(&cp).await.unwrap();

        let token = source.last_resume_token().unwrap();
        assert_eq!(token.as_str(), "restored_token");
    }

    #[test]
    fn test_health_check() {
        let mut source = make_source("db", "coll");
        assert_eq!(source.health_check(), HealthStatus::Unknown);

        source.state = ConnectorState::Running;
        assert_eq!(source.health_check(), HealthStatus::Healthy);

        source.invalidated = true;
        assert!(matches!(source.health_check(), HealthStatus::Degraded(_)));

        source.state = ConnectorState::Closed;
        assert!(matches!(source.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_drain_tracks_resume_token() {
        let mut source = make_source("db", "coll");

        let event = MongoDbChangeEvent {
            resume_token: r#"{"_data": "final_token"}"#.to_string(),
            ..make_event(OperationType::Insert)
        };
        source.enqueue_event(event);
        source.drain_to_batch(10).unwrap();

        assert_eq!(
            source.last_resume_token().unwrap().as_str(),
            r#"{"_data": "final_token"}"#
        );
    }
}