1use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use arrow_array::RecordBatch;
27use arrow_schema::{DataType, SchemaRef};
28use async_trait::async_trait;
29use tracing::{debug, info};
30
31use crate::config::{ConnectorConfig, ConnectorState};
32use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
33use crate::error::ConnectorError;
34use crate::health::HealthStatus;
35use crate::metrics::ConnectorMetrics;
36
37use super::config::MongoDbSinkConfig;
38use super::metrics::MongoDbSinkMetrics;
39use super::timeseries::CollectionKind;
40use super::write_model::WriteMode;
41
/// Arrow → MongoDB sink connector.
///
/// Buffers incoming [`RecordBatch`]es and flushes them when either
/// `config.batch_size` rows have accumulated or `config.flush_interval()`
/// has elapsed (see `should_flush`). Actual network writes are only compiled
/// in with the `mongodb-cdc` feature; without it a flush is a logged no-op.
pub struct MongoDbSink {
    // Sink configuration: connection URI, database/collection, write mode,
    // write concern, batching and ordering options.
    config: MongoDbSinkConfig,

    // Arrow schema of the batches this sink accepts.
    schema: SchemaRef,

    // Connector lifecycle state (Created until `open`, then Running,
    // then Closed; Failed is mapped to Unhealthy by `health_check`).
    state: ConnectorState,

    // Batches accumulated since the last flush.
    buffer: Vec<RecordBatch>,

    // Total row count across `buffer`, maintained incrementally so
    // `should_flush` does not have to re-sum the batches.
    buffered_rows: usize,

    // Instant of the last (or initial) flush; drives the time-based trigger.
    last_flush: Instant,

    // Prometheus-backed sink metrics (flushes, inserts, upserts, errors, ...).
    metrics: MongoDbSinkMetrics,

    #[cfg(feature = "mongodb-cdc")]
    // MongoDB client, populated by `connect()` during `open()`.
    client: Option<mongodb::Client>,

    #[cfg(feature = "mongodb-cdc")]
    // Target collection handle with the configured write concern applied.
    collection: Option<mongodb::Collection<mongodb::bson::Document>>,
}
76
77impl MongoDbSink {
78 #[must_use]
80 pub fn new(
81 schema: SchemaRef,
82 config: MongoDbSinkConfig,
83 registry: Option<&prometheus::Registry>,
84 ) -> Self {
85 let buf_capacity = (config.batch_size / 128).max(4);
86 Self {
87 config,
88 schema,
89 state: ConnectorState::Created,
90 buffer: Vec::with_capacity(buf_capacity),
91 buffered_rows: 0,
92 last_flush: Instant::now(),
93 metrics: MongoDbSinkMetrics::new(registry),
94 #[cfg(feature = "mongodb-cdc")]
95 client: None,
96 #[cfg(feature = "mongodb-cdc")]
97 collection: None,
98 }
99 }
100
101 pub fn from_config(
107 schema: SchemaRef,
108 config: &ConnectorConfig,
109 ) -> Result<Self, ConnectorError> {
110 let mongo_config = MongoDbSinkConfig::from_config(config)?;
111 Ok(Self::new(schema, mongo_config, None))
112 }
113
114 #[must_use]
116 pub fn config(&self) -> &MongoDbSinkConfig {
117 &self.config
118 }
119
120 #[must_use]
122 pub fn buffered_rows(&self) -> usize {
123 self.buffered_rows
124 }
125
126 #[must_use]
128 fn should_flush(&self) -> bool {
129 self.buffered_rows >= self.config.batch_size
130 || self.last_flush.elapsed() >= self.config.flush_interval()
131 }
132
133 fn batches_to_json_docs(&self) -> (Vec<serde_json::Value>, u64) {
138 let mut docs = Vec::with_capacity(self.buffered_rows);
139 let mut byte_estimate: u64 = 0;
140
141 for batch in &self.buffer {
142 for row_idx in 0..batch.num_rows() {
143 let mut doc = serde_json::Map::new();
144
145 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
146 let col = batch.column(col_idx);
147 let value = arrow_value_to_json(col, row_idx);
148 doc.insert(field.name().clone(), value);
149 }
150
151 let val = serde_json::Value::Object(doc);
152 byte_estimate += serde_json::to_string(&val).map_or(0, |s| s.len() as u64);
153 docs.push(val);
154 }
155 }
156
157 (docs, byte_estimate)
158 }
159
160 fn clear_buffer(&mut self) {
162 self.buffer.clear();
163 self.buffered_rows = 0;
164 self.last_flush = Instant::now();
165 }
166
167 async fn flush_inner(&mut self) -> Result<WriteResult, ConnectorError> {
171 if self.buffer.is_empty() {
172 return Ok(WriteResult::new(0, 0));
173 }
174
175 let (docs, byte_estimate) = self.batches_to_json_docs();
176 let doc_count = docs.len();
177
178 #[cfg(feature = "mongodb-cdc")]
179 {
180 self.write_docs(&docs).await?;
181 }
182
183 #[cfg(not(feature = "mongodb-cdc"))]
184 {
185 debug!(
186 count = doc_count,
187 "flush (no-op without mongodb-cdc feature)"
188 );
189 }
190
191 self.metrics.record_flush(doc_count as u64, byte_estimate);
192 self.clear_buffer();
193
194 Ok(WriteResult::new(doc_count, byte_estimate))
195 }
196}
197
198fn arrow_value_to_json(col: &dyn arrow_array::Array, row: usize) -> serde_json::Value {
200 use arrow_array::{BooleanArray, LargeStringArray, StringArray};
201
202 if col.is_null(row) {
203 return serde_json::Value::Null;
204 }
205
206 match col.data_type() {
207 DataType::Boolean => {
208 let arr = col.as_any().downcast_ref::<BooleanArray>().unwrap();
209 serde_json::Value::Bool(arr.value(row))
210 }
211 DataType::Int8
212 | DataType::Int16
213 | DataType::Int32
214 | DataType::Int64
215 | DataType::UInt8
216 | DataType::UInt16
217 | DataType::UInt32
218 | DataType::UInt64 => json_from_primitive(col, row),
219 DataType::Float32 | DataType::Float64 => json_from_float(col, row),
220 DataType::Utf8 => {
221 let arr = col.as_any().downcast_ref::<StringArray>().unwrap();
222 serde_json::Value::String(arr.value(row).to_string())
223 }
224 DataType::LargeUtf8 => {
225 let arr = col.as_any().downcast_ref::<LargeStringArray>().unwrap();
226 serde_json::Value::String(arr.value(row).to_string())
227 }
228 _ => {
229 let formatted = arrow_cast::display::ArrayFormatter::try_new(
231 col,
232 &arrow_cast::display::FormatOptions::default(),
233 );
234 match formatted {
235 Ok(fmt) => serde_json::Value::String(fmt.value(row).to_string()),
236 Err(_) => serde_json::Value::Null,
237 }
238 }
239 }
240}
241
242fn json_from_primitive(col: &dyn arrow_array::Array, row: usize) -> serde_json::Value {
244 let formatted = arrow_cast::display::ArrayFormatter::try_new(
245 col,
246 &arrow_cast::display::FormatOptions::default(),
247 );
248 match formatted {
249 Ok(fmt) => {
250 let s = fmt.value(row).to_string();
251 if let Ok(n) = s.parse::<i64>() {
252 serde_json::Value::Number(n.into())
253 } else {
254 serde_json::Value::String(s)
255 }
256 }
257 Err(_) => serde_json::Value::Null,
258 }
259}
260
261fn json_from_float(col: &dyn arrow_array::Array, row: usize) -> serde_json::Value {
263 let formatted = arrow_cast::display::ArrayFormatter::try_new(
264 col,
265 &arrow_cast::display::FormatOptions::default(),
266 );
267 match formatted {
268 Ok(fmt) => {
269 let s = fmt.value(row).to_string();
270 if let Ok(n) = s.parse::<f64>() {
271 serde_json::json!(n)
272 } else {
273 serde_json::Value::String(s)
274 }
275 }
276 Err(_) => serde_json::Value::Null,
277 }
278}
279
280#[async_trait]
281impl SinkConnector for MongoDbSink {
282 async fn open(&mut self, _config: &ConnectorConfig) -> Result<(), ConnectorError> {
283 self.config.validate()?;
284
285 #[cfg(feature = "mongodb-cdc")]
286 {
287 self.connect().await?;
288 }
289
290 self.state = ConnectorState::Running;
291 info!(
292 database = %self.config.database,
293 collection = %self.config.collection,
294 write_mode = ?self.config.write_mode,
295 ordered = self.config.ordered,
296 "MongoDB sink opened"
297 );
298
299 Ok(())
300 }
301
302 async fn write_batch(&mut self, batch: &RecordBatch) -> Result<WriteResult, ConnectorError> {
303 let rows = batch.num_rows();
304 if rows == 0 {
305 return Ok(WriteResult::new(0, 0));
306 }
307
308 self.buffer.push(batch.clone());
309 self.buffered_rows += rows;
310
311 if self.should_flush() {
312 return self.flush_inner().await;
313 }
314
315 Ok(WriteResult::new(0, 0))
317 }
318
319 fn schema(&self) -> SchemaRef {
320 Arc::clone(&self.schema)
321 }
322
323 fn health_check(&self) -> HealthStatus {
324 match self.state {
325 ConnectorState::Running => HealthStatus::Healthy,
326 ConnectorState::Created => HealthStatus::Unknown,
327 ConnectorState::Closed | ConnectorState::Failed => {
328 HealthStatus::Unhealthy("closed".to_string())
329 }
330 _ => HealthStatus::Degraded(format!("state: {}", self.state)),
331 }
332 }
333
334 fn metrics(&self) -> ConnectorMetrics {
335 self.metrics.to_connector_metrics()
336 }
337
338 fn capabilities(&self) -> SinkConnectorCapabilities {
339 let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(30)).with_idempotent();
340
341 if matches!(self.config.write_mode, WriteMode::Upsert { .. }) {
342 caps = caps.with_upsert();
343 }
344 if matches!(self.config.write_mode, WriteMode::CdcReplay) {
345 caps = caps.with_changelog();
346 }
347
348 caps
349 }
350
351 async fn flush(&mut self) -> Result<(), ConnectorError> {
352 self.flush_inner().await.map(|_| ())
353 }
354
355 async fn close(&mut self) -> Result<(), ConnectorError> {
356 if !self.buffer.is_empty() {
358 self.flush().await?;
359 }
360
361 self.state = ConnectorState::Closed;
362 info!("MongoDB sink closed");
363 Ok(())
364 }
365}
366
#[cfg(feature = "mongodb-cdc")]
impl MongoDbSink {
    /// Establishes the MongoDB connection during `open()`.
    ///
    /// Parses the configured URI, builds the client, creates the time series
    /// collection when one is configured, and stores a collection handle with
    /// the sink's write concern applied.
    async fn connect(&mut self) -> Result<(), ConnectorError> {
        use mongodb::options::{ClientOptions, CollectionOptions};

        let client_options = ClientOptions::parse(&self.config.connection_uri)
            .await
            .map_err(|e| ConnectorError::ConnectionFailed(format!("parse URI: {e}")))?;

        let client = mongodb::Client::with_options(client_options)
            .map_err(|e| ConnectorError::ConnectionFailed(format!("create client: {e}")))?;

        let db = client.database(&self.config.database);

        // Time series collections must be created explicitly with their
        // options before the first write.
        if let CollectionKind::TimeSeries(ref ts_config) = self.config.collection_kind {
            self.ensure_timeseries_collection(&db, ts_config).await?;
        }

        // Translate the sink's write-concern config into driver options.
        let wc = {
            use super::config::WriteConcernLevel;
            let mut wc = mongodb::options::WriteConcern::default();
            wc.w = Some(match &self.config.write_concern.w {
                WriteConcernLevel::Majority => mongodb::options::Acknowledgment::Majority,
                WriteConcernLevel::Nodes(n) => mongodb::options::Acknowledgment::Nodes(*n),
            });
            wc.journal = Some(self.config.write_concern.journal);
            wc.w_timeout = self
                .config
                .write_concern
                .timeout_ms
                .map(std::time::Duration::from_millis);
            wc
        };

        let coll_opts = CollectionOptions::builder().write_concern(wc).build();

        let collection = db
            .collection_with_options::<mongodb::bson::Document>(&self.config.collection, coll_opts);

        self.client = Some(client);
        self.collection = Some(collection);

        Ok(())
    }

    /// Issues a `create` command for the configured time series collection,
    /// translating the granularity and TTL options into server parameters.
    ///
    /// An "already exists" response from the server is treated as success;
    /// any other failure is surfaced as `ConnectionFailed`.
    async fn ensure_timeseries_collection(
        &self,
        db: &mongodb::Database,
        ts_config: &super::timeseries::TimeSeriesConfig,
    ) -> Result<(), ConnectorError> {
        use mongodb::bson::doc;

        let mut ts_opts = doc! {
            "timeField": &ts_config.time_field,
        };

        if let Some(ref meta) = ts_config.meta_field {
            ts_opts.insert("metaField", meta);
        }

        match ts_config.granularity {
            super::timeseries::TimeSeriesGranularity::Seconds => {
                ts_opts.insert("granularity", "seconds");
            }
            super::timeseries::TimeSeriesGranularity::Minutes => {
                ts_opts.insert("granularity", "minutes");
            }
            super::timeseries::TimeSeriesGranularity::Hours => {
                ts_opts.insert("granularity", "hours");
            }
            super::timeseries::TimeSeriesGranularity::Custom {
                bucket_max_span_seconds,
                bucket_rounding_seconds,
            } => {
                // Custom bucketing replaces the named granularity entirely.
                ts_opts.insert("bucketMaxSpanSeconds", i64::from(bucket_max_span_seconds));
                ts_opts.insert("bucketRoundingSeconds", i64::from(bucket_rounding_seconds));
            }
        }

        let mut create_opts = doc! {
            "create": &self.config.collection,
            "timeseries": ts_opts,
        };

        if let Some(ttl) = ts_config.expire_after_seconds {
            #[allow(clippy::cast_possible_wrap)]
            create_opts.insert("expireAfterSeconds", ttl as i64);
        }

        match db.run_command(create_opts).await {
            Ok(_) => {
                info!(
                    collection = %self.config.collection,
                    time_field = %ts_config.time_field,
                    granularity = %ts_config.granularity,
                    "created time series collection"
                );
            }
            Err(e) => {
                // Idempotent create: tolerate the collection already existing.
                let msg = e.to_string();
                if !msg.contains("already exists") && !msg.contains("NamespaceExists") {
                    return Err(ConnectorError::ConnectionFailed(format!(
                        "create time series collection: {e}"
                    )));
                }
                debug!(
                    collection = %self.config.collection,
                    "time series collection already exists"
                );
            }
        }

        Ok(())
    }

    /// Extracts a CDC payload field (`_full_document`, `_document_key`,
    /// `_update_desc`) that may arrive either as an embedded JSON object or
    /// as a JSON-encoded string.
    ///
    /// Returns `Cow::Borrowed` for objects (no copy) and `Cow::Owned` for
    /// parsed strings; any other value shape is a `WriteError`.
    fn parse_cdc_field<'a>(
        val: &'a serde_json::Value,
        field: &str,
    ) -> Result<std::borrow::Cow<'a, serde_json::Value>, ConnectorError> {
        let v = val.get(field).ok_or_else(|| {
            ConnectorError::WriteError(format!("CDC event missing {field} field"))
        })?;
        match v {
            serde_json::Value::Object(_) => Ok(std::borrow::Cow::Borrowed(v)),
            serde_json::Value::String(s) => {
                let parsed: serde_json::Value = serde_json::from_str(s)
                    .map_err(|e| ConnectorError::WriteError(format!("parse {field} JSON: {e}")))?;
                Ok(std::borrow::Cow::Owned(parsed))
            }
            _ => Err(ConnectorError::WriteError(format!(
                "{field} must be a JSON object or JSON string, got {v}"
            ))),
        }
    }

    /// Writes `docs` to the target collection according to the configured
    /// write mode (Insert / Upsert / Replace / CdcReplay).
    ///
    /// Driver write failures bump the error metric before being surfaced as
    /// `WriteError`; BSON conversion failures are surfaced without a metric
    /// (consistent with the original Insert/Upsert behavior).
    #[allow(clippy::too_many_lines)]
    async fn write_docs(&self, docs: &[serde_json::Value]) -> Result<(), ConnectorError> {
        use mongodb::bson::{doc, Document};

        let collection = self
            .collection
            .as_ref()
            .ok_or_else(|| ConnectorError::Internal("collection not initialized".to_string()))?;

        match &self.config.write_mode {
            WriteMode::Insert => {
                // Bulk insert; `ordered` controls stop-on-first-error.
                let bson_docs: Vec<Document> = docs
                    .iter()
                    .map(|v| {
                        mongodb::bson::to_document(v)
                            .map_err(|e| ConnectorError::WriteError(format!("to BSON: {e}")))
                    })
                    .collect::<Result<Vec<_>, _>>()?;

                let opts = mongodb::options::InsertManyOptions::builder()
                    .ordered(Some(self.config.ordered))
                    .build();

                collection
                    .insert_many(bson_docs)
                    .with_options(opts)
                    .await
                    .map_err(|e| {
                        self.metrics.record_error();
                        ConnectorError::WriteError(format!("insert_many: {e}"))
                    })?;

                self.metrics.record_inserts(docs.len() as u64);
            }

            WriteMode::Upsert { ref key_fields } => {
                // One replace-with-upsert per document, filtered by the
                // configured key fields.
                for val in docs {
                    let bson_doc = mongodb::bson::to_document(val)
                        .map_err(|e| ConnectorError::WriteError(format!("to BSON: {e}")))?;

                    let mut filter = Document::new();
                    for key in key_fields {
                        if let Some(v) = bson_doc.get(key) {
                            filter.insert(key, v.clone());
                        }
                    }
                    // An empty filter would match (and replace) an arbitrary
                    // document — refuse rather than corrupt data.
                    if filter.is_empty() {
                        return Err(ConnectorError::WriteError(format!(
                            "upsert filter is empty: none of the key_fields {key_fields:?} \
                             exist in the document"
                        )));
                    }

                    let opts = mongodb::options::ReplaceOptions::builder()
                        .upsert(Some(true))
                        .build();

                    collection
                        .replace_one(filter, bson_doc)
                        .with_options(opts)
                        .await
                        .map_err(|e| {
                            self.metrics.record_error();
                            ConnectorError::WriteError(format!("upsert: {e}"))
                        })?;
                }

                self.metrics.record_upserts(docs.len() as u64);
            }

            WriteMode::Replace { upsert_on_missing } => {
                // Replace keyed strictly by `_id`, which must be present and
                // non-null in every document.
                for val in docs {
                    let bson_doc = mongodb::bson::to_document(val)
                        .map_err(|e| ConnectorError::WriteError(format!("to BSON: {e}")))?;

                    let filter = match bson_doc.get("_id") {
                        Some(id) if *id != mongodb::bson::Bson::Null => {
                            doc! { "_id": id.clone() }
                        }
                        _ => {
                            return Err(ConnectorError::WriteError(
                                "Replace mode requires a non-null _id field in document"
                                    .to_string(),
                            ));
                        }
                    };

                    let opts = mongodb::options::ReplaceOptions::builder()
                        .upsert(Some(*upsert_on_missing))
                        .build();

                    collection
                        .replace_one(filter, bson_doc)
                        .with_options(opts)
                        .await
                        .map_err(|e| {
                            self.metrics.record_error();
                            ConnectorError::WriteError(format!("replace: {e}"))
                        })?;
                }
                // NOTE(review): unlike Insert/Upsert, Replace does not bump a
                // success counter — confirm whether replaces should be counted
                // (e.g. via record_upserts) or get a dedicated metric.
            }

            WriteMode::CdcReplay => {
                // Replay CDC events one by one; `_op` selects the operation
                // (default "I" when absent).
                for val in docs {
                    let op = val.get("_op").and_then(|v| v.as_str()).unwrap_or("I");

                    match op {
                        "I" => {
                            let full_doc = Self::parse_cdc_field(val, "_full_document")?;
                            let bson_doc = mongodb::bson::to_document(full_doc.as_ref())
                                .map_err(|e| ConnectorError::WriteError(format!("to BSON: {e}")))?;
                            collection.insert_one(bson_doc).await.map_err(|e| {
                                self.metrics.record_error();
                                ConnectorError::WriteError(format!("cdc insert: {e}"))
                            })?;
                            self.metrics.record_inserts(1);
                        }
                        "U" => {
                            let dk = Self::parse_cdc_field(val, "_document_key")?;
                            let ud = Self::parse_cdc_field(val, "_update_desc")?;
                            let filter = mongodb::bson::to_document(dk.as_ref()).map_err(|e| {
                                ConnectorError::WriteError(format!("filter BSON: {e}"))
                            })?;

                            // Build a $set/$unset update from the change
                            // stream's update description.
                            let mut update = mongodb::bson::Document::new();
                            if let Some(updated) = ud.get("updatedFields") {
                                let bson = mongodb::bson::to_bson(updated).map_err(|e| {
                                    ConnectorError::WriteError(format!("updatedFields BSON: {e}"))
                                })?;
                                update.insert("$set", bson);
                            }
                            if let Some(removed) =
                                ud.get("removedFields").and_then(|v| v.as_array())
                            {
                                if !removed.is_empty() {
                                    let unset_doc: mongodb::bson::Document = removed
                                        .iter()
                                        .filter_map(|f| f.as_str())
                                        .map(|f| {
                                            (
                                                f.to_string(),
                                                mongodb::bson::Bson::String(String::new()),
                                            )
                                        })
                                        .collect();
                                    update.insert("$unset", unset_doc);
                                }
                            }
                            // Nothing to set or unset: skip the round trip.
                            if update.is_empty() {
                                continue;
                            }

                            collection.update_one(filter, update).await.map_err(|e| {
                                self.metrics.record_error();
                                ConnectorError::WriteError(format!("cdc update: {e}"))
                            })?;
                            self.metrics.record_upserts(1);
                        }
                        "R" => {
                            let dk = Self::parse_cdc_field(val, "_document_key")?;
                            let full_doc = Self::parse_cdc_field(val, "_full_document")?;
                            let filter = mongodb::bson::to_document(dk.as_ref()).map_err(|e| {
                                ConnectorError::WriteError(format!("filter BSON: {e}"))
                            })?;
                            let replacement = mongodb::bson::to_document(full_doc.as_ref())
                                .map_err(|e| {
                                    ConnectorError::WriteError(format!("replace BSON: {e}"))
                                })?;
                            let opts = mongodb::options::ReplaceOptions::builder()
                                .upsert(Some(true))
                                .build();
                            collection
                                .replace_one(filter, replacement)
                                .with_options(opts)
                                .await
                                .map_err(|e| {
                                    self.metrics.record_error();
                                    ConnectorError::WriteError(format!("cdc replace: {e}"))
                                })?;
                            self.metrics.record_upserts(1);
                        }
                        "D" => {
                            let dk = Self::parse_cdc_field(val, "_document_key")?;
                            let filter = mongodb::bson::to_document(dk.as_ref()).map_err(|e| {
                                ConnectorError::WriteError(format!("filter BSON: {e}"))
                            })?;
                            collection.delete_one(filter).await.map_err(|e| {
                                self.metrics.record_error();
                                ConnectorError::WriteError(format!("cdc delete: {e}"))
                            })?;
                            self.metrics.record_deletes(1);
                        }
                        _ => {
                            debug!(op = op, "lifecycle event — no write issued");
                        }
                    }
                }
            }
        }

        self.metrics.record_bulk_write();
        Ok(())
    }
}
720
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{Field, Schema};

    /// Schema shared by every test: (id: Int64, name: Utf8), both non-null.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds an `n`-row batch with id = 0..n and name = "user_{id}".
    #[allow(clippy::cast_possible_wrap)]
    fn test_batch(n: usize) -> RecordBatch {
        let ids: Int64Array = (0..n as i64).collect::<Vec<_>>().into();
        let names: StringArray = (0..n)
            .map(|i| format!("user_{i}"))
            .collect::<Vec<_>>()
            .into();

        RecordBatch::try_new(test_schema(), vec![Arc::new(ids), Arc::new(names)]).unwrap()
    }

    #[test]
    fn test_new_sink() {
        let cfg = MongoDbSinkConfig::new("mongodb://localhost:27017", "db", "coll");
        let sink = MongoDbSink::new(test_schema(), cfg, None);
        assert_eq!(sink.buffered_rows(), 0);
    }

    #[test]
    fn test_sink_capabilities_insert() {
        let sink = MongoDbSink::new(test_schema(), MongoDbSinkConfig::default(), None);
        let caps = sink.capabilities();
        assert!(caps.idempotent);
        assert!(!caps.upsert);
        assert!(!caps.changelog);
    }

    #[test]
    fn test_sink_capabilities_upsert() {
        let mut cfg = MongoDbSinkConfig::default();
        cfg.write_mode = WriteMode::Upsert {
            key_fields: vec!["id".to_string()],
        };
        let sink = MongoDbSink::new(test_schema(), cfg, None);
        assert!(sink.capabilities().upsert);
    }

    #[test]
    fn test_sink_capabilities_cdc_replay() {
        let mut cfg = MongoDbSinkConfig::default();
        cfg.write_mode = WriteMode::CdcReplay;
        let sink = MongoDbSink::new(test_schema(), cfg, None);
        assert!(sink.capabilities().changelog);
    }

    #[test]
    fn test_batches_to_json() {
        let mut sink = MongoDbSink::new(test_schema(), MongoDbSinkConfig::default(), None);
        sink.buffer.push(test_batch(3));
        sink.buffered_rows = 3;

        let (docs, bytes) = sink.batches_to_json_docs();
        assert_eq!(docs.len(), 3);
        assert!(bytes > 0);

        // First row round-trips field values into the JSON document.
        assert_eq!(docs[0]["id"], 0);
        assert_eq!(docs[0]["name"], "user_0");
    }

    #[test]
    fn test_should_flush_batch_size() {
        let mut cfg = MongoDbSinkConfig::default();
        cfg.batch_size = 100;
        let mut sink = MongoDbSink::new(test_schema(), cfg, None);

        assert!(!sink.should_flush());
        sink.buffered_rows = 100;
        assert!(sink.should_flush());
    }

    #[test]
    fn test_clear_buffer() {
        let mut sink = MongoDbSink::new(test_schema(), MongoDbSinkConfig::default(), None);

        sink.buffer.push(test_batch(5));
        sink.buffered_rows = 5;

        sink.clear_buffer();
        assert_eq!(sink.buffered_rows, 0);
        assert!(sink.buffer.is_empty());
    }

    #[test]
    fn test_health_check() {
        let mut sink = MongoDbSink::new(test_schema(), MongoDbSinkConfig::default(), None);

        assert_eq!(sink.health_check(), HealthStatus::Unknown);

        sink.state = ConnectorState::Running;
        assert_eq!(sink.health_check(), HealthStatus::Healthy);

        sink.state = ConnectorState::Closed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_arrow_value_to_json_types() {
        use arrow_array::*;

        assert_eq!(
            arrow_value_to_json(&Int64Array::from(vec![42]), 0),
            serde_json::json!(42)
        );
        assert_eq!(
            arrow_value_to_json(&StringArray::from(vec!["hello"]), 0),
            serde_json::json!("hello")
        );
        assert_eq!(
            arrow_value_to_json(&BooleanArray::from(vec![true]), 0),
            serde_json::json!(true)
        );
        assert!(arrow_value_to_json(&Int64Array::from(vec![None::<i64>]), 0).is_null());
    }
}