// laminar_connectors/mongodb/sink.rs
//! `MongoDB` sink connector implementation.
//!
//! Implements [`SinkConnector`] for writing Arrow `RecordBatch` data to
//! `MongoDB` collections. Supports insert, upsert, replace, and CDC replay
//! write modes, with optional time series collection support.
//!
//! # Architecture
//!
//! - **Ring 0**: No sink code — data arrives via SPSC channel (~5ns push)
//! - **Ring 1**: Batch buffering, write dispatch, flush management
//! - **Ring 2**: Connection pool, collection creation, write concern config
//!
//! # Batching
//!
//! Writes are buffered up to `batch_size` records and flushed when:
//! - The batch is full
//! - `flush_interval` has elapsed
//! - A shutdown signal or epoch boundary is reached
//!
//! Insert mode uses `insert_many` for batch efficiency. Upsert, replace,
//! and CDC replay modes issue individual operations per document.

23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use arrow_array::RecordBatch;
27use arrow_schema::{DataType, SchemaRef};
28use async_trait::async_trait;
29use tracing::{debug, info};
30
31use crate::config::{ConnectorConfig, ConnectorState};
32use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
33use crate::error::ConnectorError;
34use crate::health::HealthStatus;
35use crate::metrics::ConnectorMetrics;
36
37use super::config::MongoDbSinkConfig;
38use super::metrics::MongoDbSinkMetrics;
39use super::timeseries::CollectionKind;
40use super::write_model::WriteMode;
41
/// `MongoDB` sink connector.
///
/// Writes Arrow `RecordBatch` records to a `MongoDB` collection using
/// configurable write modes. Supports standard and time series collections.
///
/// Batches are buffered in memory and flushed by [`should_flush`] criteria
/// (batch size or elapsed interval); see the module docs for details.
pub struct MongoDbSink {
    /// Sink configuration (connection URI, database/collection, write mode,
    /// batching and write-concern settings).
    config: MongoDbSinkConfig,

    /// Arrow schema for input batches.
    schema: SchemaRef,

    /// Connector lifecycle state (`Created` → `Running` → `Closed`/`Failed`).
    state: ConnectorState,

    /// Buffered record batches awaiting flush; cleared after each
    /// successful flush.
    buffer: Vec<RecordBatch>,

    /// Total row count across all batches in `buffer`.
    buffered_rows: usize,

    /// Time of the last flush; drives the interval-based flush trigger.
    last_flush: Instant,

    /// Sink metrics (inserts, upserts, deletes, flushes, errors).
    metrics: MongoDbSinkMetrics,

    /// `MongoDB` client, populated by `connect` on open (feature-gated).
    #[cfg(feature = "mongodb-cdc")]
    client: Option<mongodb::Client>,

    /// Target collection handle with write concern applied (feature-gated).
    #[cfg(feature = "mongodb-cdc")]
    collection: Option<mongodb::Collection<mongodb::bson::Document>>,
}
76
impl MongoDbSink {
    /// Creates a new `MongoDB` sink connector.
    ///
    /// `registry` optionally registers the sink's Prometheus metrics;
    /// pass `None` to keep them unregistered (e.g. in tests).
    #[must_use]
    pub fn new(
        schema: SchemaRef,
        config: MongoDbSinkConfig,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        // Capacity counts *batches*, not rows: assumes incoming batches
        // average roughly 128 rows each (heuristic — TODO confirm against
        // upstream batch sizing), with a floor of 4 slots.
        let buf_capacity = (config.batch_size / 128).max(4);
        Self {
            config,
            schema,
            state: ConnectorState::Created,
            buffer: Vec::with_capacity(buf_capacity),
            buffered_rows: 0,
            last_flush: Instant::now(),
            metrics: MongoDbSinkMetrics::new(registry),
            #[cfg(feature = "mongodb-cdc")]
            client: None,
            #[cfg(feature = "mongodb-cdc")]
            collection: None,
        }
    }

    /// Creates a new sink from a generic [`ConnectorConfig`].
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if the configuration is invalid.
    pub fn from_config(
        schema: SchemaRef,
        config: &ConnectorConfig,
    ) -> Result<Self, ConnectorError> {
        let mongo_config = MongoDbSinkConfig::from_config(config)?;
        Ok(Self::new(schema, mongo_config, None))
    }

    /// Returns a reference to the sink configuration.
    #[must_use]
    pub fn config(&self) -> &MongoDbSinkConfig {
        &self.config
    }

    /// Returns the number of buffered rows not yet flushed.
    #[must_use]
    pub fn buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Returns whether a flush is needed based on batch size or interval.
    #[must_use]
    fn should_flush(&self) -> bool {
        self.buffered_rows >= self.config.batch_size
            || self.last_flush.elapsed() >= self.config.flush_interval()
    }

    /// Converts buffered Arrow batches to JSON documents for writing.
    ///
    /// Returns `(docs, byte_estimate)`. The byte estimate is accumulated
    /// during conversion to avoid a redundant serialization pass later.
    fn batches_to_json_docs(&self) -> (Vec<serde_json::Value>, u64) {
        let mut docs = Vec::with_capacity(self.buffered_rows);
        let mut byte_estimate: u64 = 0;

        for batch in &self.buffer {
            // One JSON object per row; keys are the Arrow field names.
            for row_idx in 0..batch.num_rows() {
                let mut doc = serde_json::Map::new();

                for (col_idx, field) in batch.schema().fields().iter().enumerate() {
                    let col = batch.column(col_idx);
                    let value = arrow_value_to_json(col, row_idx);
                    doc.insert(field.name().clone(), value);
                }

                let val = serde_json::Value::Object(doc);
                // A serialization failure contributes 0 to the estimate
                // rather than failing the whole flush.
                byte_estimate += serde_json::to_string(&val).map_or(0, |s| s.len() as u64);
                docs.push(val);
            }
        }

        (docs, byte_estimate)
    }

    /// Clears the internal buffer after a flush and resets the flush timer.
    fn clear_buffer(&mut self) {
        self.buffer.clear();
        self.buffered_rows = 0;
        self.last_flush = Instant::now();
    }

    /// Internal flush that returns a [`WriteResult`] with actual counts.
    ///
    /// Both `write_batch` (on auto-flush) and `flush` delegate here.
    /// The buffer is cleared only after a successful write, so a failed
    /// flush retains the buffered data for the next attempt.
    async fn flush_inner(&mut self) -> Result<WriteResult, ConnectorError> {
        if self.buffer.is_empty() {
            return Ok(WriteResult::new(0, 0));
        }

        let (docs, byte_estimate) = self.batches_to_json_docs();
        let doc_count = docs.len();

        #[cfg(feature = "mongodb-cdc")]
        {
            self.write_docs(&docs).await?;
        }

        #[cfg(not(feature = "mongodb-cdc"))]
        {
            // Without the driver feature, the flush is a logged no-op so the
            // buffering/metrics paths stay testable.
            debug!(
                count = doc_count,
                "flush (no-op without mongodb-cdc feature)"
            );
        }

        self.metrics.record_flush(doc_count as u64, byte_estimate);
        self.clear_buffer();

        Ok(WriteResult::new(doc_count, byte_estimate))
    }
}
197
198/// Extracts a JSON value from an Arrow array at the given row index.
199fn arrow_value_to_json(col: &dyn arrow_array::Array, row: usize) -> serde_json::Value {
200    use arrow_array::{BooleanArray, LargeStringArray, StringArray};
201
202    if col.is_null(row) {
203        return serde_json::Value::Null;
204    }
205
206    match col.data_type() {
207        DataType::Boolean => {
208            let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
209            serde_json::Value::Bool(arr.value(row))
210        }
211        DataType::Int8
212        | DataType::Int16
213        | DataType::Int32
214        | DataType::Int64
215        | DataType::UInt8
216        | DataType::UInt16
217        | DataType::UInt32
218        | DataType::UInt64 => json_from_primitive(col, row),
219        DataType::Float32 | DataType::Float64 => json_from_float(col, row),
220        DataType::Utf8 => {
221            let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
222            serde_json::Value::String(arr.value(row).to_string())
223        }
224        DataType::LargeUtf8 => {
225            let arr = col.as_any().downcast_ref::<LargeStringArray>().unwrap();
226            serde_json::Value::String(arr.value(row).to_string())
227        }
228        _ => {
229            // Fallback: use Arrow's display format.
230            let formatted = arrow_cast::display::ArrayFormatter::try_new(
231                col,
232                &arrow_cast::display::FormatOptions::default(),
233            );
234            match formatted {
235                Ok(fmt) => serde_json::Value::String(fmt.value(row).to_string()),
236                Err(_) => serde_json::Value::Null,
237            }
238        }
239    }
240}
241
242/// Helper to extract a JSON number from a primitive Arrow array.
243fn json_from_primitive(col: &dyn arrow_array::Array, row: usize) -> serde_json::Value {
244    let formatted = arrow_cast::display::ArrayFormatter::try_new(
245        col,
246        &arrow_cast::display::FormatOptions::default(),
247    );
248    match formatted {
249        Ok(fmt) => {
250            let s = fmt.value(row).to_string();
251            if let Ok(n) = s.parse::<i64>() {
252                serde_json::Value::Number(n.into())
253            } else {
254                serde_json::Value::String(s)
255            }
256        }
257        Err(_) => serde_json::Value::Null,
258    }
259}
260
261/// Helper to extract a JSON number from a float Arrow array.
262fn json_from_float(col: &dyn arrow_array::Array, row: usize) -> serde_json::Value {
263    let formatted = arrow_cast::display::ArrayFormatter::try_new(
264        col,
265        &arrow_cast::display::FormatOptions::default(),
266    );
267    match formatted {
268        Ok(fmt) => {
269            let s = fmt.value(row).to_string();
270            if let Ok(n) = s.parse::<f64>() {
271                serde_json::json!(n)
272            } else {
273                serde_json::Value::String(s)
274            }
275        }
276        Err(_) => serde_json::Value::Null,
277    }
278}
279
#[async_trait]
impl SinkConnector for MongoDbSink {
    /// Validates the configuration, connects to `MongoDB` (when the driver
    /// feature is enabled), and transitions the sink to `Running`.
    async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.config.validate()?;

        #[cfg(feature = "mongodb-cdc")]
        {
            self.connect().await?;
        }

        self.state = ConnectorState::Running;
        info!(
            database = %self.config.database,
            collection = %self.config.collection,
            write_mode = ?self.config.write_mode,
            ordered = self.config.ordered,
            "MongoDB sink opened"
        );

        Ok(())
    }

    /// Buffers the batch and flushes when a threshold is crossed.
    ///
    /// The returned [`WriteResult`] counts only rows actually written by
    /// this call — a purely-buffered batch reports `(0, 0)`.
    async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
        let rows = batch.num_rows();
        if rows == 0 {
            return Ok(WriteResult::new(0, 0));
        }

        self.buffer.push(batch.clone());
        self.buffered_rows += rows;

        if self.should_flush() {
            return self.flush_inner().await;
        }

        // Just buffered, nothing written yet.
        Ok(WriteResult::new(0, 0))
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Maps lifecycle state to health: `Running` → Healthy, `Created` →
    /// Unknown, `Closed`/`Failed` → Unhealthy, anything else → Degraded.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created => HealthStatus::Unknown,
            ConnectorState::Closed | ConnectorState::Failed => {
                HealthStatus::Unhealthy("closed".to_string())
            }
            _ => HealthStatus::Degraded(format!("state: {}", self.state)),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    /// Advertises capabilities derived from the configured write mode:
    /// upsert mode adds upsert support, CDC replay adds changelog support.
    fn capabilities(&self) -> SinkConnectorCapabilities {
        let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(30)).with_idempotent();

        if matches!(self.config.write_mode, WriteMode::Upsert { .. }) {
            caps = caps.with_upsert();
        }
        if matches!(self.config.write_mode, WriteMode::CdcReplay) {
            caps = caps.with_changelog();
        }

        caps
    }

    /// Flushes buffered rows, discarding the per-flush counts.
    async fn flush(&mut self) -> Result<(), ConnectorError> {
        self.flush_inner().await.map(|_| ())
    }

    /// Flushes any remaining buffered data, then marks the sink closed.
    ///
    /// NOTE(review): if the final flush fails, the error propagates and
    /// `state` is left unchanged (not `Failed`) — confirm this is the
    /// intended contract for callers that retry `close`.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        // Flush remaining buffered data.
        if !self.buffer.is_empty() {
            self.flush().await?;
        }

        self.state = ConnectorState::Closed;
        info!("MongoDB sink closed");
        Ok(())
    }
}
366
367// ── Feature-gated I/O (real MongoDB driver) ──
368
#[cfg(feature = "mongodb-cdc")]
impl MongoDbSink {
    /// Connects to `MongoDB` and sets up the target collection with write concern.
    ///
    /// Also creates the time series collection up front when one is
    /// configured, so the first write does not race collection creation.
    async fn connect(&mut self) -> Result<(), ConnectorError> {
        use mongodb::options::{ClientOptions, CollectionOptions};

        let client_options = ClientOptions::parse(&self.config.connection_uri)
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("parse URI: {e}")))?;

        let client = mongodb::Client::with_options(client_options)
            .map_err(|e| ConnectorError::ConnectionFailed(format!("create client: {e}")))?;

        let db = client.database(&self.config.database);

        // Ensure time series collection exists if configured.
        if let CollectionKind::TimeSeries(ref ts_config) = self.config.collection_kind {
            self.ensure_timeseries_collection(&db, ts_config).await?;
        }

        // Apply write concern from configuration.
        let wc = {
            use super::config::WriteConcernLevel;
            let mut wc = mongodb::options::WriteConcern::default();
            wc.w = Some(match &self.config.write_concern.w {
                WriteConcernLevel::Majority => mongodb::options::Acknowledgment::Majority,
                WriteConcernLevel::Nodes(n) => mongodb::options::Acknowledgment::Nodes(*n),
            });
            wc.journal = Some(self.config.write_concern.journal);
            wc.w_timeout = self
                .config
                .write_concern
                .timeout_ms
                .map(std::time::Duration::from_millis);
            wc
        };

        let coll_opts = CollectionOptions::builder().write_concern(wc).build();

        let collection = db
            .collection_with_options::<mongodb::bson::Document>(&self.config.collection, coll_opts);

        self.client = Some(client);
        self.collection = Some(collection);

        Ok(())
    }

    /// Ensures a time series collection exists with the correct configuration.
    ///
    /// Issues a raw `create` command; a "collection already exists" error is
    /// treated as success (existing options are NOT reconciled — presumably
    /// intentional, verify if config drift matters).
    async fn ensure_timeseries_collection(
        &self,
        db: &mongodb::Database,
        ts_config: &super::timeseries::TimeSeriesConfig,
    ) -> Result<(), ConnectorError> {
        use mongodb::bson::doc;

        let mut ts_opts = doc! {
            "timeField": &ts_config.time_field,
        };

        if let Some(ref meta) = ts_config.meta_field {
            ts_opts.insert("metaField", meta);
        }

        // Granularity maps either to the named server granularities or to
        // explicit bucket span/rounding values for the Custom variant.
        match ts_config.granularity {
            super::timeseries::TimeSeriesGranularity::Seconds => {
                ts_opts.insert("granularity", "seconds");
            }
            super::timeseries::TimeSeriesGranularity::Minutes => {
                ts_opts.insert("granularity", "minutes");
            }
            super::timeseries::TimeSeriesGranularity::Hours => {
                ts_opts.insert("granularity", "hours");
            }
            super::timeseries::TimeSeriesGranularity::Custom {
                bucket_max_span_seconds,
                bucket_rounding_seconds,
            } => {
                ts_opts.insert("bucketMaxSpanSeconds", i64::from(bucket_max_span_seconds));
                ts_opts.insert("bucketRoundingSeconds", i64::from(bucket_rounding_seconds));
            }
        }

        let mut create_opts = doc! {
            "create": &self.config.collection,
            "timeseries": ts_opts,
        };

        if let Some(ttl) = ts_config.expire_after_seconds {
            #[allow(clippy::cast_possible_wrap)]
            create_opts.insert("expireAfterSeconds", ttl as i64);
        }

        // Try creating; ignore "already exists" errors.
        match db.run_command(create_opts).await {
            Ok(_) => {
                info!(
                    collection = %self.config.collection,
                    time_field = %ts_config.time_field,
                    granularity = %ts_config.granularity,
                    "created time series collection"
                );
            }
            Err(e) => {
                // String matching is the only portable way to detect the
                // duplicate-namespace case across server versions here.
                let msg = e.to_string();
                if !msg.contains("already exists") && !msg.contains("NamespaceExists") {
                    return Err(ConnectorError::ConnectionFailed(format!(
                        "create time series collection: {e}"
                    )));
                }
                debug!(
                    collection = %self.config.collection,
                    "time series collection already exists"
                );
            }
        }

        Ok(())
    }

    /// Extracts a CDC envelope field that may be a JSON string (from Utf8 Arrow
    /// columns) or already a JSON object. Parses strings into objects for BSON
    /// conversion.
    ///
    /// Returns `Cow::Borrowed` for objects (no copy) and `Cow::Owned` for
    /// parsed strings; any other JSON type is an error.
    fn parse_cdc_field<'a>(
        val: &'a serde_json::Value,
        field: &str,
    ) -> Result<std::borrow::Cow<'a, serde_json::Value>, ConnectorError> {
        let v = val.get(field).ok_or_else(|| {
            ConnectorError::WriteError(format!("CDC event missing {field} field"))
        })?;
        match v {
            serde_json::Value::Object(_) => Ok(std::borrow::Cow::Borrowed(v)),
            serde_json::Value::String(s) => {
                let parsed: serde_json::Value = serde_json::from_str(s)
                    .map_err(|e| ConnectorError::WriteError(format!("parse {field} JSON: {e}")))?;
                Ok(std::borrow::Cow::Owned(parsed))
            }
            _ => Err(ConnectorError::WriteError(format!(
                "{field} must be a JSON object or JSON string, got {v}"
            ))),
        }
    }

    /// Writes JSON value documents to `MongoDB` using the configured write mode.
    ///
    /// Accepts `serde_json::Value` directly (no intermediate string round-trip).
    /// Insert mode batches via `insert_many`; Upsert/Replace/CdcReplay issue
    /// one operation per document.
    ///
    /// NOTE(review): the CDC replay error paths do not call
    /// `metrics.record_error()`, unlike the Insert/Upsert/Replace paths —
    /// confirm whether that asymmetry is intentional.
    #[allow(clippy::too_many_lines)]
    async fn write_docs(&self, docs: &[serde_json::Value]) -> Result<(), ConnectorError> {
        use mongodb::bson::{doc, Document};

        let collection = self
            .collection
            .as_ref()
            .ok_or_else(|| ConnectorError::Internal("collection not initialized".to_string()))?;

        match &self.config.write_mode {
            WriteMode::Insert => {
                let bson_docs: Vec<Document> = docs
                    .iter()
                    .map(|v| {
                        mongodb::bson::to_document(v)
                            .map_err(|e| ConnectorError::WriteError(format!("to BSON: {e}")))
                    })
                    .collect::<Result<Vec<_>, _>>()?;

                let opts = mongodb::options::InsertManyOptions::builder()
                    .ordered(Some(self.config.ordered))
                    .build();

                collection
                    .insert_many(bson_docs)
                    .with_options(opts)
                    .await
                    .map_err(|e| {
                        self.metrics.record_error();
                        ConnectorError::WriteError(format!("insert_many: {e}"))
                    })?;

                self.metrics.record_inserts(docs.len() as u64);
            }

            WriteMode::Upsert { ref key_fields } => {
                for val in docs {
                    let bson_doc = mongodb::bson::to_document(val)
                        .map_err(|e| ConnectorError::WriteError(format!("to BSON: {e}")))?;

                    // Build the filter from whichever key fields are present.
                    let mut filter = Document::new();
                    for key in key_fields {
                        if let Some(v) = bson_doc.get(key) {
                            filter.insert(key, v.clone());
                        }
                    }
                    // An empty filter would match (and replace) an arbitrary
                    // document — refuse instead.
                    if filter.is_empty() {
                        return Err(ConnectorError::WriteError(format!(
                            "upsert filter is empty: none of the key_fields {key_fields:?} \
                             exist in the document"
                        )));
                    }

                    let opts = mongodb::options::ReplaceOptions::builder()
                        .upsert(Some(true))
                        .build();

                    collection
                        .replace_one(filter, bson_doc)
                        .with_options(opts)
                        .await
                        .map_err(|e| {
                            self.metrics.record_error();
                            ConnectorError::WriteError(format!("upsert: {e}"))
                        })?;
                }

                self.metrics.record_upserts(docs.len() as u64);
            }

            WriteMode::Replace { upsert_on_missing } => {
                for val in docs {
                    let bson_doc = mongodb::bson::to_document(val)
                        .map_err(|e| ConnectorError::WriteError(format!("to BSON: {e}")))?;

                    // Use _id as the filter for replacement.
                    let filter = match bson_doc.get("_id") {
                        Some(id) if *id != mongodb::bson::Bson::Null => {
                            doc! { "_id": id.clone() }
                        }
                        _ => {
                            return Err(ConnectorError::WriteError(
                                "Replace mode requires a non-null _id field in document"
                                    .to_string(),
                            ));
                        }
                    };

                    let opts = mongodb::options::ReplaceOptions::builder()
                        .upsert(Some(*upsert_on_missing))
                        .build();

                    collection
                        .replace_one(filter, bson_doc)
                        .with_options(opts)
                        .await
                        .map_err(|e| {
                            self.metrics.record_error();
                            ConnectorError::WriteError(format!("replace: {e}"))
                        })?;
                }
            }

            WriteMode::CdcReplay => {
                // CDC replay processes each document based on its _op field:
                // "I" insert, "U" update, "R" replace, "D" delete; anything
                // else is treated as a lifecycle event and skipped.
                for val in docs {
                    let op = val.get("_op").and_then(|v| v.as_str()).unwrap_or("I");

                    match op {
                        "I" => {
                            let full_doc = Self::parse_cdc_field(val, "_full_document")?;
                            let bson_doc = mongodb::bson::to_document(full_doc.as_ref())
                                .map_err(|e| ConnectorError::WriteError(format!("to BSON: {e}")))?;
                            collection.insert_one(bson_doc).await.map_err(|e| {
                                ConnectorError::WriteError(format!("cdc insert: {e}"))
                            })?;
                            self.metrics.record_inserts(1);
                        }
                        "U" => {
                            let dk = Self::parse_cdc_field(val, "_document_key")?;
                            let ud = Self::parse_cdc_field(val, "_update_desc")?;
                            let filter = mongodb::bson::to_document(dk.as_ref()).map_err(|e| {
                                ConnectorError::WriteError(format!("filter BSON: {e}"))
                            })?;

                            // Transform updateDescription into update operators.
                            // Raw format: { "updatedFields": {...}, "removedFields": [...] }
                            // Required:   { "$set": {...}, "$unset": {...} }
                            let mut update = mongodb::bson::Document::new();
                            if let Some(updated) = ud.get("updatedFields") {
                                let bson = mongodb::bson::to_bson(updated).map_err(|e| {
                                    ConnectorError::WriteError(format!("updatedFields BSON: {e}"))
                                })?;
                                update.insert("$set", bson);
                            }
                            if let Some(removed) =
                                ud.get("removedFields").and_then(|v| v.as_array())
                            {
                                if !removed.is_empty() {
                                    // $unset ignores the value; empty string
                                    // is the conventional placeholder.
                                    let unset_doc: mongodb::bson::Document = removed
                                        .iter()
                                        .filter_map(|f| f.as_str())
                                        .map(|f| {
                                            (
                                                f.to_string(),
                                                mongodb::bson::Bson::String(String::new()),
                                            )
                                        })
                                        .collect();
                                    update.insert("$unset", unset_doc);
                                }
                            }
                            // Nothing to apply (e.g. only non-string removed
                            // fields) — skip rather than send an empty update.
                            if update.is_empty() {
                                continue;
                            }

                            collection.update_one(filter, update).await.map_err(|e| {
                                ConnectorError::WriteError(format!("cdc update: {e}"))
                            })?;
                            self.metrics.record_upserts(1);
                        }
                        "R" => {
                            let dk = Self::parse_cdc_field(val, "_document_key")?;
                            let full_doc = Self::parse_cdc_field(val, "_full_document")?;
                            let filter = mongodb::bson::to_document(dk.as_ref()).map_err(|e| {
                                ConnectorError::WriteError(format!("filter BSON: {e}"))
                            })?;
                            let replacement = mongodb::bson::to_document(full_doc.as_ref())
                                .map_err(|e| {
                                    ConnectorError::WriteError(format!("replace BSON: {e}"))
                                })?;
                            let opts = mongodb::options::ReplaceOptions::builder()
                                .upsert(Some(true))
                                .build();
                            collection
                                .replace_one(filter, replacement)
                                .with_options(opts)
                                .await
                                .map_err(|e| {
                                    ConnectorError::WriteError(format!("cdc replace: {e}"))
                                })?;
                            self.metrics.record_upserts(1);
                        }
                        "D" => {
                            let dk = Self::parse_cdc_field(val, "_document_key")?;
                            let filter = mongodb::bson::to_document(dk.as_ref()).map_err(|e| {
                                ConnectorError::WriteError(format!("filter BSON: {e}"))
                            })?;
                            collection.delete_one(filter).await.map_err(|e| {
                                ConnectorError::WriteError(format!("cdc delete: {e}"))
                            })?;
                            self.metrics.record_deletes(1);
                        }
                        _ => {
                            debug!(op = op, "lifecycle event — no write issued");
                        }
                    }
                }
            }
        }

        self.metrics.record_bulk_write();
        Ok(())
    }
}
720
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{Field, Schema};

    /// Builds the two-column (`id: Int64`, `name: Utf8`) schema shared by tests.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds a batch of `n` rows: ids `0..n`, names `user_0..user_{n-1}`.
    fn test_batch(n: usize) -> RecordBatch {
        #[allow(clippy::cast_possible_wrap)]
        let ids = Int64Array::from((0..n as i64).collect::<Vec<i64>>());
        let names = StringArray::from(
            (0..n).map(|i| format!("user_{i}")).collect::<Vec<String>>(),
        );

        RecordBatch::try_new(test_schema(), vec![Arc::new(ids), Arc::new(names)]).unwrap()
    }

    /// Convenience constructor: sink over the test schema, no metrics registry.
    fn make_sink(config: MongoDbSinkConfig) -> MongoDbSink {
        MongoDbSink::new(test_schema(), config, None)
    }

    #[test]
    fn test_new_sink() {
        let sink = make_sink(MongoDbSinkConfig::new(
            "mongodb://localhost:27017",
            "db",
            "coll",
        ));
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[test]
    fn test_sink_capabilities_insert() {
        let caps = make_sink(MongoDbSinkConfig::default()).capabilities();
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
    }

    #[test]
    fn test_sink_capabilities_upsert() {
        let mut config = MongoDbSinkConfig::default();
        config.write_mode = WriteMode::Upsert {
            key_fields: vec!["id".to_string()],
        };
        assert!(make_sink(config).capabilities().upsert);
    }

    #[test]
    fn test_sink_capabilities_cdc_replay() {
        let mut config = MongoDbSinkConfig::default();
        config.write_mode = WriteMode::CdcReplay;
        assert!(make_sink(config).capabilities().changelog);
    }

    #[test]
    fn test_batches_to_json() {
        let mut sink = make_sink(MongoDbSinkConfig::default());
        sink.buffer.push(test_batch(3));
        sink.buffered_rows = 3;

        let (docs, byte_estimate) = sink.batches_to_json_docs();
        assert_eq!(docs.len(), 3);
        assert!(byte_estimate > 0);
        assert_eq!(docs[0]["id"], 0);
        assert_eq!(docs[0]["name"], "user_0");
    }

    #[test]
    fn test_should_flush_batch_size() {
        let mut config = MongoDbSinkConfig::default();
        config.batch_size = 100;
        let mut sink = make_sink(config);

        assert!(!sink.should_flush());
        sink.buffered_rows = 100;
        assert!(sink.should_flush());
    }

    #[test]
    fn test_clear_buffer() {
        let mut sink = make_sink(MongoDbSinkConfig::default());
        sink.buffer.push(test_batch(5));
        sink.buffered_rows = 5;

        sink.clear_buffer();
        assert_eq!(sink.buffered_rows, 0);
        assert!(sink.buffer.is_empty());
    }

    #[test]
    fn test_health_check() {
        let mut sink = make_sink(MongoDbSinkConfig::default());

        assert_eq!(sink.health_check(), HealthStatus::Unknown);

        sink.state = ConnectorState::Running;
        assert_eq!(sink.health_check(), HealthStatus::Healthy);

        sink.state = ConnectorState::Closed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_arrow_value_to_json_types() {
        use arrow_array::*;

        // Int64
        let val = arrow_value_to_json(&Int64Array::from(vec![42]), 0);
        assert_eq!(val, serde_json::json!(42));

        // String
        let val = arrow_value_to_json(&StringArray::from(vec!["hello"]), 0);
        assert_eq!(val, serde_json::json!("hello"));

        // Boolean
        let val = arrow_value_to_json(&BooleanArray::from(vec![true]), 0);
        assert_eq!(val, serde_json::json!(true));

        // Null
        let val = arrow_value_to_json(&Int64Array::from(vec![None::<i64>]), 0);
        assert!(val.is_null());
    }
}