1use std::sync::Arc;
9use std::time::Duration;
10
11use arrow_array::{Array, StringArray};
12use arrow_schema::SchemaRef;
13use async_trait::async_trait;
14use rdkafka::message::OwnedHeaders;
15use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
16use rdkafka::ClientConfig;
17use tracing::{debug, info, warn};
18
19use crate::config::{ConnectorConfig, ConnectorState};
20use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
21use crate::error::ConnectorError;
22use crate::health::HealthStatus;
23use crate::metrics::ConnectorMetrics;
24use crate::serde::{self, Format, RecordSerializer};
25
26use super::avro_serializer::AvroSerializer;
27use super::partitioner::{
28 KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
29};
30use super::schema_registry::SchemaRegistryClient;
31use super::sink_config::{DeliveryGuarantee, KafkaSinkConfig, PartitionStrategy};
32use super::sink_metrics::KafkaSinkMetrics;
33
/// Partition count assumed for the target topic when broker metadata cannot
/// be fetched during `open` (partitioners then effectively pick partition 0).
const FALLBACK_PARTITION_COUNT: i32 = 1;
36
/// Append-only arena for per-row Kafka message keys.
///
/// All key bytes live in a single contiguous buffer; each row records a
/// `(start, len)` window into it. Null/absent keys are stored as zero-length
/// windows so row indices stay aligned with batch rows.
struct KeyBuffer {
    data: Vec<u8>,
    offsets: Vec<(usize, usize)>,
}

impl KeyBuffer {
    /// Pre-sizes the arena for `num_rows` keys of roughly `avg_key_len` bytes.
    fn with_capacity(num_rows: usize, avg_key_len: usize) -> Self {
        Self {
            data: Vec::with_capacity(num_rows * avg_key_len),
            offsets: Vec::with_capacity(num_rows),
        }
    }

    /// Appends one key: records its window, then copies the bytes in.
    fn push(&mut self, key: &[u8]) {
        self.offsets.push((self.data.len(), key.len()));
        self.data.extend_from_slice(key);
    }

    /// Records a null key as a zero-length window (no bytes stored).
    fn push_empty(&mut self) {
        self.offsets.push((0, 0));
    }

    /// Returns the key bytes for row `i`; empty slice for null keys.
    /// Panics if `i` is out of bounds, like slice indexing.
    fn key(&self, i: usize) -> &[u8] {
        let (start, len) = self.offsets[i];
        &self.data[start..][..len]
    }

    /// Number of rows recorded (test-only helper).
    #[cfg(test)]
    fn len(&self) -> usize {
        self.offsets.len()
    }
}

impl std::ops::Index<usize> for KeyBuffer {
    type Output = [u8];

    /// `buf[i]` is sugar for `buf.key(i)`.
    fn index(&self, i: usize) -> &[u8] {
        self.key(i)
    }
}
81
/// Kafka sink connector: writes Arrow record batches to a Kafka topic with
/// at-least-once or (transactional) exactly-once delivery semantics.
pub struct KafkaSink {
    /// Main producer; `None` until `open` succeeds and again after `close`.
    producer: Option<FutureProducer>,
    /// Effective sink configuration (may be replaced from connector properties in `open`).
    config: KafkaSinkConfig,
    /// Row serializer matching `config.format`.
    serializer: Box<dyn RecordSerializer>,
    /// Partition-selection strategy matching `config.partitioner`.
    partitioner: Box<dyn KafkaPartitioner>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Epoch currently being written.
    current_epoch: u64,
    /// Last epoch that committed successfully.
    last_committed_epoch: u64,
    /// True while a Kafka transaction is open (exactly-once mode only).
    transaction_active: bool,
    /// Separate producer for the dead-letter-queue topic, when configured.
    dlq_producer: Option<FutureProducer>,
    /// Sink-local metrics (writes, errors, commits, DLQ routing).
    metrics: KafkaSinkMetrics,
    /// Current Arrow schema; replaced when an incoming batch's schema differs.
    schema: SchemaRef,
    /// Optional Schema Registry client; required for Avro schema registration.
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    /// Registry-assigned Avro schema id, shared with the serializer; 0 means "not yet registered".
    avro_schema_id: Arc<std::sync::atomic::AtomicU32>,
    /// Partition count of the target topic, queried from the broker in `open`.
    topic_partition_count: i32,
}
126
impl KafkaSink {
    /// Builds a sink from an Arrow schema and a parsed sink config.
    ///
    /// No network activity happens here; the producer is created in `open`.
    #[must_use]
    pub fn new(schema: SchemaRef, config: KafkaSinkConfig) -> Self {
        // The schema-id cell is shared with the Avro serializer so an id
        // stored later (in `ensure_schema_ready`) becomes visible to it.
        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let serializer =
            select_serializer(config.format, &schema, Arc::clone(&avro_schema_id), None);
        let partitioner = select_partitioner(config.partitioner);

        Self {
            producer: None,
            config,
            serializer,
            partitioner,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            transaction_active: false,
            dlq_producer: None,
            metrics: KafkaSinkMetrics::new(),
            schema,
            schema_registry: None,
            avro_schema_id,
            topic_partition_count: FALLBACK_PARTITION_COUNT,
        }
    }

    /// Like `new`, but wires in a pre-built Schema Registry client (used by
    /// the serializer for Avro and advertised via `capabilities`).
    #[must_use]
    pub fn with_schema_registry(
        schema: SchemaRef,
        config: KafkaSinkConfig,
        sr_client: SchemaRegistryClient,
    ) -> Self {
        let sr = Arc::new(sr_client);
        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let serializer = select_serializer(
            config.format,
            &schema,
            Arc::clone(&avro_schema_id),
            Some(Arc::clone(&sr)),
        );
        let partitioner = select_partitioner(config.partitioner);

        Self {
            producer: None,
            config,
            serializer,
            partitioner,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            transaction_active: false,
            dlq_producer: None,
            metrics: KafkaSinkMetrics::new(),
            schema,
            schema_registry: Some(sr),
            avro_schema_id,
            topic_partition_count: FALLBACK_PARTITION_COUNT,
        }
    }

    /// Current lifecycle state of the connector.
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Whether a Schema Registry client is configured.
    #[must_use]
    pub fn has_schema_registry(&self) -> bool {
        self.schema_registry.is_some()
    }

    /// Epoch currently being written (set by `begin_epoch`).
    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Last epoch that committed successfully.
    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    /// Keeps the serializer and (for Avro) the registered schema in sync with
    /// the schema of the incoming batch.
    ///
    /// # Errors
    /// `ConnectorError::Serde` if Arrow→Avro conversion fails;
    /// `ConnectorError::ConnectionFailed` if registry registration fails.
    async fn ensure_schema_ready(
        &mut self,
        batch_schema: &SchemaRef,
    ) -> Result<(), ConnectorError> {
        let schema_changed = self.schema != *batch_schema;
        // Avro payloads embed a registry schema id, so (re)registration is
        // needed when the schema changed or no id was ever stored (still 0).
        let needs_registration = self.config.format == Format::Avro
            && (schema_changed
                || self
                    .avro_schema_id
                    .load(std::sync::atomic::Ordering::Relaxed)
                    == 0);

        if needs_registration {
            // NOTE(review): when no registry client is configured this silently
            // skips registration and the schema id stays 0 — confirm the Avro
            // serializer tolerates an unset id in that configuration.
            if let Some(ref sr) = self.schema_registry {
                // TopicNameStrategy subject: "<topic>-value".
                let subject = format!("{}-value", self.config.topic);
                let avro_schema =
                    super::schema_registry::arrow_to_avro_schema(batch_schema, &self.config.topic)
                        .map_err(ConnectorError::Serde)?;
                let schema_id = sr
                    .register_schema(
                        &subject,
                        &avro_schema,
                        super::schema_registry::SchemaType::Avro,
                    )
                    .await
                    .map_err(|e| {
                        ConnectorError::ConnectionFailed(format!(
                            "failed to register Avro schema for '{subject}': {e}"
                        ))
                    })?;
                #[allow(clippy::cast_sign_loss)]
                self.avro_schema_id
                    .store(schema_id as u32, std::sync::atomic::Ordering::Relaxed);
                info!(subject = %subject, schema_id, "registered Avro schema");
            }
        }

        if schema_changed {
            debug!(
                old = ?self.schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
                new = ?batch_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
                "sink schema updated from incoming batch"
            );
            // Adopt the new schema and rebuild the serializer around it.
            self.schema = batch_schema.clone();
            self.serializer = select_serializer(
                self.config.format,
                &self.schema,
                Arc::clone(&self.avro_schema_id),
                self.schema_registry.clone(),
            );
        }

        Ok(())
    }

    /// Extracts per-row message keys from the configured key column.
    ///
    /// Returns `Ok(None)` when no key column is configured. String columns are
    /// used byte-for-byte; other types are rendered via Arrow's display
    /// formatter. Null values become empty keys.
    ///
    /// NOTE(review): an empty-string key is indistinguishable from a null key
    /// here; `write_batch` later treats empty as "no key".
    ///
    /// # Errors
    /// `ConfigurationError` if the key column is missing from the batch schema;
    /// `Internal` if the display formatter cannot be built.
    fn extract_keys(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> Result<Option<KeyBuffer>, ConnectorError> {
        let Some(key_col) = &self.config.key_column else {
            return Ok(None);
        };

        let col_idx = batch.schema().index_of(key_col).map_err(|_| {
            ConnectorError::ConfigurationError(format!(
                "key column '{key_col}' not found in schema"
            ))
        })?;

        let array = batch.column(col_idx);
        let num_rows = batch.num_rows();
        // 32 bytes is a guessed average key length for pre-sizing.
        let mut buf = KeyBuffer::with_capacity(num_rows, 32);

        if let Some(str_array) = array.as_any().downcast_ref::<StringArray>() {
            // Fast path: copy string bytes directly.
            for i in 0..num_rows {
                if str_array.is_null(i) {
                    buf.push_empty();
                } else {
                    buf.push(str_array.value(i).as_bytes());
                }
            }
        } else {
            use std::fmt::Write;
            // Generic path: render each value as display text.
            let formatter = arrow_cast::display::ArrayFormatter::try_new(
                array,
                &arrow_cast::display::FormatOptions::default(),
            )
            .map_err(|e| {
                ConnectorError::Internal(format!(
                    "failed to create array formatter for key column: {e}"
                ))
            })?;
            // One scratch String reused across rows to avoid per-row allocation.
            let mut fmt_buf = String::with_capacity(64);
            for i in 0..num_rows {
                if array.is_null(i) {
                    buf.push_empty();
                } else {
                    fmt_buf.clear();
                    // write! into a String cannot fail; result intentionally ignored.
                    let _ = write!(fmt_buf, "{}", formatter.value(i));
                    buf.push(fmt_buf.as_bytes());
                }
            }
        }

        Ok(Some(buf))
    }

    /// Sends one failed payload to the dead-letter-queue topic, attaching
    /// `__dlq.*` headers (error text, original topic, timestamp, epoch).
    ///
    /// # Errors
    /// `ConfigurationError` if no DLQ producer/topic is configured;
    /// `WriteError` if the DLQ produce itself fails.
    async fn route_to_dlq(
        &self,
        payload: &[u8],
        key: Option<&[u8]>,
        error_msg: &str,
    ) -> Result<(), ConnectorError> {
        let dlq_producer = self
            .dlq_producer
            .as_ref()
            .ok_or_else(|| ConnectorError::ConfigurationError("DLQ topic not configured".into()))?;
        let dlq_topic =
            self.config.dlq_topic.as_ref().ok_or_else(|| {
                ConnectorError::ConfigurationError("DLQ topic not configured".into())
            })?;

        // Wall-clock millis; defaults to 0 if the clock is before the epoch.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis()
            .to_string();
        let epoch_str = self.current_epoch.to_string();

        let headers = OwnedHeaders::new()
            .insert(rdkafka::message::Header {
                key: "__dlq.error",
                value: Some(error_msg.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.topic",
                value: Some(self.config.topic.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.timestamp",
                value: Some(now.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.epoch",
                value: Some(epoch_str.as_bytes()),
            });

        let mut record = FutureRecord::to(dlq_topic)
            .payload(payload)
            .headers(headers);

        // Preserve the original record's key so DLQ consumers can correlate.
        if let Some(k) = key {
            record = record.key(k);
        }

        dlq_producer
            .send(record, Duration::from_secs(5))
            .await
            .map_err(|(e, _)| ConnectorError::WriteError(format!("DLQ send failed: {e}")))?;

        self.metrics.record_dlq();
        Ok(())
    }
}
388
#[async_trait]
#[allow(clippy::too_many_lines)]
impl SinkConnector for KafkaSink {
    /// Connects to Kafka: optionally re-parses the config from connector
    /// properties, creates the main (and DLQ) producers, initializes
    /// transactions for exactly-once, sets up the Schema Registry client, and
    /// queries the target topic's partition count.
    ///
    /// # Errors
    /// `ConnectionFailed` for producer/registry setup failures,
    /// `TransactionError` if transaction init fails.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // Non-empty properties override the config supplied at construction;
        // serializer and partitioner are rebuilt to match.
        if !config.properties().is_empty() {
            let parsed = KafkaSinkConfig::from_config(config)?;
            self.config = parsed;
            self.serializer = select_serializer(
                self.config.format,
                &self.schema,
                Arc::clone(&self.avro_schema_id),
                self.schema_registry.clone(),
            );
            self.partitioner = select_partitioner(self.config.partitioner);
        }

        info!(
            brokers = %self.config.bootstrap_servers,
            topic = %self.config.topic,
            format = %self.config.format,
            delivery = %self.config.delivery_guarantee,
            "opening Kafka sink connector"
        );

        let rdkafka_config: ClientConfig = self.config.to_rdkafka_config();
        let producer: FutureProducer = rdkafka_config.create().map_err(|e| {
            ConnectorError::ConnectionFailed(format!("failed to create producer: {e}"))
        })?;

        // Exactly-once requires transactional producer initialization up front.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            producer
                .init_transactions(self.config.transaction_timeout)
                .map_err(|e| {
                    ConnectorError::TransactionError(format!("failed to init transactions: {e}"))
                })?;
        }

        // Dedicated producer for the dead-letter queue, if one is configured.
        if self.config.dlq_topic.is_some() {
            let dlq_config = self.config.to_dlq_rdkafka_config();
            let dlq_producer: FutureProducer = dlq_config.create().map_err(|e| {
                ConnectorError::ConnectionFailed(format!("failed to create DLQ producer: {e}"))
            })?;
            self.dlq_producer = Some(dlq_producer);
        }

        // Build a Schema Registry client from config unless one was injected
        // via `with_schema_registry`.
        if let Some(ref url) = self.config.schema_registry_url {
            if self.schema_registry.is_none() {
                let sr = if let Some(ref ca_path) = self.config.schema_registry_ssl_ca_location {
                    SchemaRegistryClient::with_tls(
                        url,
                        self.config.schema_registry_auth.clone(),
                        ca_path,
                    )?
                } else {
                    SchemaRegistryClient::new(url, self.config.schema_registry_auth.clone())
                };
                self.schema_registry = Some(Arc::new(sr));
            }
        }

        // Optionally pin the subject's compatibility level before any
        // Avro schema registration happens.
        if self.config.format == Format::Avro {
            if let Some(ref sr) = self.schema_registry {
                if let Some(ref compat) = self.config.schema_compatibility {
                    let subject = format!("{}-value", self.config.topic);
                    sr.set_compatibility_level(&subject, *compat)
                        .await
                        .map_err(|e| {
                            ConnectorError::ConnectionFailed(format!(
                                "failed to set SR compatibility for '{subject}': {e}"
                            ))
                        })?;
                }
            }
        }

        // Best-effort partition-count discovery; failure degrades to the
        // fallback rather than aborting open.
        self.topic_partition_count = FALLBACK_PARTITION_COUNT;
        match producer
            .client()
            .fetch_metadata(Some(&self.config.topic), Duration::from_secs(5))
        {
            Ok(metadata) => {
                // NOTE(review): assumes the first topic entry is the one we
                // asked for; metadata was requested for a single topic, but
                // matching on `topic_meta.name()` would be safer — confirm.
                if let Some(topic_meta) = metadata.topics().first() {
                    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
                    let count = topic_meta.partitions().len() as i32;
                    if count > 0 {
                        self.topic_partition_count = count;
                        info!(
                            topic = %self.config.topic,
                            partitions = count,
                            "queried topic partition count from broker"
                        );
                    }
                }
            }
            Err(e) => {
                warn!(
                    topic = %self.config.topic,
                    error = %e,
                    fallback = FALLBACK_PARTITION_COUNT,
                    "failed to query topic metadata — using fallback partition count"
                );
            }
        }

        self.producer = Some(producer);
        self.state = ConnectorState::Running;
        info!("Kafka sink connector opened successfully");
        Ok(())
    }

    /// Serializes `batch` and produces one record per row, then awaits all
    /// delivery confirmations. Failed deliveries go to the DLQ when one is
    /// configured; otherwise the first failure aborts with `WriteError`.
    ///
    /// NOTE(review): `producer.flush` is a blocking call inside an async fn
    /// (here and in the periodic flush below) — it can stall the executor
    /// thread; consider `spawn_blocking` or rdkafka's async flush if available.
    ///
    /// # Errors
    /// `InvalidState` if not running, `Serde` on serialization failure,
    /// `WriteError` on produce/flush/DLQ failures.
    #[allow(clippy::cast_possible_truncation)] async fn write_batch(
        &mut self,
        batch: &arrow_array::RecordBatch,
    ) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        // Re-register schema / rebuild serializer if the batch schema changed.
        self.ensure_schema_ready(&batch.schema()).await?;

        let producer = self
            .producer
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "producer initialized".into(),
                actual: "producer is None".into(),
            })?;

        let payloads = self.serializer.serialize(batch).map_err(|e| {
            self.metrics.record_serialization_error();
            ConnectorError::Serde(e)
        })?;

        let keys = self.extract_keys(batch)?;

        let mut records_written: usize = 0;
        let mut bytes_written: u64 = 0;

        // Enqueue all records first, flushing every `flush_batch_size`
        // records (0 disables periodic flushing); await confirmations after.
        let flush_threshold = self.config.flush_batch_size;
        let mut delivery_futures = Vec::with_capacity(payloads.len());
        for (i, payload) in payloads.iter().enumerate() {
            // Empty key (null or empty string) is treated as "no key".
            let key: Option<&[u8]> = keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());

            // Partitioner may return None to let librdkafka pick the partition.
            let partition = self.partitioner.partition(key, self.topic_partition_count);

            let mut record = FutureRecord::to(&self.config.topic).payload(payload);

            if let Some(k) = key {
                record = record.key(k);
            }
            if let Some(p) = partition {
                record = record.partition(p);
            }

            // 500 ms is the enqueue (queue-full retry) timeout, not the
            // delivery timeout.
            delivery_futures.push(producer.send(record, Duration::from_millis(500)));

            if flush_threshold > 0 && (i + 1) % flush_threshold == 0 {
                producer
                    .flush(self.config.delivery_timeout)
                    .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
            }
        }

        // Await every delivery report, routing failures to the DLQ if possible.
        for (i, future) in delivery_futures.into_iter().enumerate() {
            match future.await {
                Ok(_delivery) => {
                    records_written += 1;
                    bytes_written += payloads[i].len() as u64;
                }
                Err((err, _msg)) => {
                    self.metrics.record_error();
                    let err_msg = err.to_string();

                    if self.dlq_producer.is_some() {
                        let key: Option<&[u8]> =
                            keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
                        self.route_to_dlq(&payloads[i], key, &err_msg).await?;
                    } else {
                        return Err(ConnectorError::WriteError(format!(
                            "Kafka produce failed: {err_msg}"
                        )));
                    }
                }
            }
        }

        self.metrics
            .record_write(records_written as u64, bytes_written);

        debug!(
            records = records_written,
            bytes = bytes_written,
            "wrote batch to Kafka"
        );

        Ok(WriteResult::new(records_written, bytes_written))
    }

    /// Current sink schema (may differ from the construction-time schema if
    /// incoming batches changed shape).
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Starts epoch `epoch`: opens a Kafka transaction in exactly-once mode
    /// and resets the partitioner's per-epoch state.
    ///
    /// # Errors
    /// `InvalidState` if the producer is missing, `TransactionError` if the
    /// transaction cannot be started.
    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            producer.begin_transaction().map_err(|e| {
                ConnectorError::TransactionError(format!(
                    "failed to begin transaction for epoch {epoch}: {e}"
                ))
            })?;

            self.transaction_active = true;
        }

        self.partitioner.reset();
        debug!(epoch, "began epoch");
        Ok(())
    }

    /// Phase one of two-phase commit: flushes all in-flight records for
    /// `epoch` without committing the transaction yet.
    ///
    /// # Errors
    /// `TransactionError` on epoch mismatch or flush failure.
    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch in pre_commit: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        if let Some(ref producer) = self.producer {
            producer.flush(self.config.delivery_timeout).map_err(|e| {
                ConnectorError::TransactionError(format!(
                    "failed to flush before pre-commit for epoch {epoch}: {e}"
                ))
            })?;
        }

        debug!(epoch, "pre-committed epoch (flushed)");
        Ok(())
    }

    /// Finalizes `epoch`: in exactly-once mode flushes and commits the Kafka
    /// transaction; otherwise just flushes outstanding records.
    ///
    /// # Errors
    /// `TransactionError` on epoch mismatch, flush, or commit failure;
    /// `InvalidState` if the producer is missing in exactly-once mode.
    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            // Flush first so every record is in the transaction before commit.
            producer.flush(self.config.delivery_timeout).map_err(|e| {
                ConnectorError::TransactionError(format!("failed to flush before commit: {e}"))
            })?;

            producer
                .commit_transaction(self.config.transaction_timeout)
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to commit transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = false;
        } else {
            // At-least-once: a flush is the whole commit.
            if let Some(ref producer) = self.producer {
                producer.flush(self.config.delivery_timeout).map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to flush for epoch {epoch}: {e}"
                    ))
                })?;
            }
        }

        self.last_committed_epoch = epoch;
        self.metrics.record_commit();
        debug!(epoch, "committed epoch");
        Ok(())
    }

    /// Aborts the active Kafka transaction for `epoch` (exactly-once mode
    /// only); records a rollback in metrics either way.
    ///
    /// # Errors
    /// `InvalidState` if the producer is missing, `TransactionError` if the
    /// abort fails.
    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && self.transaction_active
        {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            producer
                .abort_transaction(self.config.transaction_timeout)
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to abort transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = false;
        }

        self.metrics.record_rollback();
        debug!(epoch, "rolled back epoch");
        Ok(())
    }

    /// Maps the connector lifecycle state onto a health status.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    /// Snapshot of sink metrics in the generic connector format.
    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    /// Advertises idempotent, partitioned writes; exactly-once and two-phase
    /// commit when configured; schema evolution when a registry is present.
    fn capabilities(&self) -> SinkConnectorCapabilities {
        let mut caps = SinkConnectorCapabilities::default()
            .with_idempotent()
            .with_partitioned();

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }

        if self.schema_registry.is_some() {
            caps = caps.with_schema_evolution();
        }

        caps
    }

    /// Flushes all outstanding records; no-op if the producer is not open.
    ///
    /// # Errors
    /// `WriteError` if the flush fails or times out.
    async fn flush(&mut self) -> Result<(), ConnectorError> {
        if let Some(ref producer) = self.producer {
            producer
                .flush(self.config.delivery_timeout)
                .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
        }
        Ok(())
    }

    /// Shuts the sink down: best-effort abort of any active transaction,
    /// best-effort final flush (30 s cap), then drops both producers and
    /// marks the connector `Closed`. Failures during shutdown are logged,
    /// not returned.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Kafka sink connector");

        if self.transaction_active {
            if let Err(e) = self.rollback_epoch(self.current_epoch).await {
                warn!(error = %e, "failed to abort active transaction on close");
            }
        }

        if let Some(ref producer) = self.producer {
            if let Err(e) = producer.flush(Duration::from_secs(30)) {
                warn!(error = %e, "failed to flush on close");
            }
        }

        self.producer = None;
        self.dlq_producer = None;
        self.state = ConnectorState::Closed;
        info!("Kafka sink connector closed");
        Ok(())
    }
}
811
812impl std::fmt::Debug for KafkaSink {
813 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
814 f.debug_struct("KafkaSink")
815 .field("state", &self.state)
816 .field("topic", &self.config.topic)
817 .field("delivery", &self.config.delivery_guarantee)
818 .field("format", &self.config.format)
819 .field("current_epoch", &self.current_epoch)
820 .field("last_committed_epoch", &self.last_committed_epoch)
821 .field("transaction_active", &self.transaction_active)
822 .finish_non_exhaustive()
823 }
824}
825
826fn select_serializer(
831 format: Format,
832 schema: &SchemaRef,
833 schema_id: Arc<std::sync::atomic::AtomicU32>,
834 registry: Option<Arc<SchemaRegistryClient>>,
835) -> Box<dyn RecordSerializer> {
836 match format {
837 Format::Avro => Box::new(AvroSerializer::with_shared_schema_id(
838 schema.clone(),
839 schema_id,
840 registry,
841 )),
842 other => serde::create_serializer(other).unwrap_or_else(|_| {
843 tracing::warn!(format = %other, "unsupported serializer format, falling back to JSON");
844 Box::new(serde::json::JsonSerializer::new())
845 }),
846 }
847}
848
849fn select_partitioner(strategy: PartitionStrategy) -> Box<dyn KafkaPartitioner> {
851 match strategy {
852 PartitionStrategy::KeyHash => Box::new(KeyHashPartitioner::new()),
853 PartitionStrategy::RoundRobin => Box::new(RoundRobinPartitioner::new()),
854 PartitionStrategy::Sticky => Box::new(StickyPartitioner::new(100)),
855 }
856}
857
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};

    /// Shared two-column schema: `id: Int64`, `value: Utf8` (both non-null).
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Default config pointed at a local broker and a fixed test topic.
    fn test_config() -> KafkaSinkConfig {
        let mut cfg = KafkaSinkConfig::default();
        cfg.topic = "output-events".into();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg
    }

    #[test]
    fn test_new_defaults() {
        let sink = KafkaSink::new(test_schema(), test_config());
        assert!(sink.producer.is_none());
        assert!(!sink.transaction_active);
        assert_eq!(sink.state(), ConnectorState::Created);
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = KafkaSink::new(Arc::clone(&schema), test_config());
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_health_check_created() {
        let sink = KafkaSink::new(test_schema(), test_config());
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut sink = KafkaSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Running;
        assert_eq!(sink.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut sink = KafkaSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Closed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_metrics_initial() {
        let snapshot = KafkaSink::new(test_schema(), test_config()).metrics();
        assert_eq!(snapshot.records_total, 0);
        assert_eq!(snapshot.bytes_total, 0);
        assert_eq!(snapshot.errors_total, 0);
    }

    #[test]
    fn test_capabilities_at_least_once() {
        let caps = KafkaSink::new(test_schema(), test_config()).capabilities();
        assert!(caps.idempotent);
        assert!(caps.partitioned);
        assert!(!caps.exactly_once);
        assert!(!caps.schema_evolution);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut cfg = test_config();
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let caps = KafkaSink::new(test_schema(), cfg).capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
    }

    #[test]
    fn test_serializer_selection_json() {
        // The default config format is JSON.
        let sink = KafkaSink::new(test_schema(), test_config());
        assert_eq!(sink.serializer.format(), Format::Json);
    }

    #[test]
    fn test_serializer_selection_avro() {
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        let sink = KafkaSink::new(test_schema(), cfg);
        assert_eq!(sink.serializer.format(), Format::Avro);
    }

    #[test]
    fn test_with_schema_registry() {
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);

        let sink = KafkaSink::with_schema_registry(test_schema(), cfg, sr);
        assert!(sink.has_schema_registry());
        assert_eq!(sink.serializer.format(), Format::Avro);
        assert!(sink.capabilities().schema_evolution);
    }

    #[test]
    fn test_debug_output() {
        let rendered = format!("{:?}", KafkaSink::new(test_schema(), test_config()));
        assert!(rendered.contains("KafkaSink"));
        assert!(rendered.contains("output-events"));
    }

    #[test]
    fn test_extract_keys_no_key_column() {
        let sink = KafkaSink::new(test_schema(), test_config());
        let ids = Int64Array::from(vec![1, 2]);
        let values = StringArray::from(vec!["a", "b"]);
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(ids), Arc::new(values)],
        )
        .unwrap();
        // No key_column configured → no key buffer at all.
        assert!(sink.extract_keys(&batch).unwrap().is_none());
    }

    #[test]
    fn test_extract_keys_with_key_column() {
        let mut cfg = test_config();
        cfg.key_column = Some("value".into());
        let sink = KafkaSink::new(test_schema(), cfg);

        let ids = Int64Array::from(vec![1, 2]);
        let values = StringArray::from(vec!["key-a", "key-b"]);
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(ids), Arc::new(values)],
        )
        .unwrap();

        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"key-a");
        assert_eq!(&keys[1], b"key-b");
    }
}