use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow_array::{Array, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::ClientConfig;
use tracing::{debug, info, warn};

use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;
use crate::serde::{self, Format, RecordSerializer};

use super::avro_serializer::AvroSerializer;
use super::partitioner::{
    KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
};
use super::schema_registry::SchemaRegistryClient;
use super::sink_config::{KafkaSinkConfig, PartitionStrategy};
use super::sink_metrics::KafkaSinkMetrics;
use crate::connector::DeliveryGuarantee;

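/// Partition count assumed for the target topic when broker metadata cannot
/// be fetched during `open`.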
const FALLBACK_PARTITION_COUNT: i32 = 1;

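/// Upper bound on how long a periodic `flush()` call may block the producer
/// before giving up.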
const PERIODIC_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);

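/// Flat arena of message keys for one batch: every key's bytes are appended
/// to a single `data` buffer, and `offsets` stores each key's `(start, len)`
/// within it, avoiding one heap allocation per key.
///
/// A minimal usage sketch (illustrative; `KeyBuffer` is crate-private, so
/// this is not a compiled doc-test):
///
/// ```ignore
/// let mut buf = KeyBuffer::with_capacity(2, 8);
/// buf.push(b"key-a");
/// buf.push_empty(); // null key becomes a zero-length slice
/// assert_eq!(&buf[0], b"key-a");
/// assert!(buf.key(1).is_empty());
/// ```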
struct KeyBuffer {
    /// Concatenated bytes of every key in the batch.
    data: Vec<u8>,
    /// `(start, len)` of each key within `data`; `(0, 0)` marks an empty key.
    offsets: Vec<(usize, usize)>,
}

impl KeyBuffer {
    fn with_capacity(num_rows: usize, avg_key_len: usize) -> Self {
        Self {
            data: Vec::with_capacity(num_rows * avg_key_len),
            offsets: Vec::with_capacity(num_rows),
        }
    }

    fn push(&mut self, key: &[u8]) {
        let start = self.data.len();
        self.data.extend_from_slice(key);
        self.offsets.push((start, key.len()));
    }

    /// Records a null/absent key as a zero-length slice.
    fn push_empty(&mut self) {
        self.offsets.push((0, 0));
    }

    fn key(&self, i: usize) -> &[u8] {
        let (start, len) = self.offsets[i];
        &self.data[start..start + len]
    }

    #[cfg(test)]
    fn len(&self) -> usize {
        self.offsets.len()
    }
}

impl std::ops::Index<usize> for KeyBuffer {
    type Output = [u8];

    fn index(&self, i: usize) -> &[u8] {
        self.key(i)
    }
}

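/// Kafka sink connector that writes Arrow `RecordBatch`es to a topic.
///
/// Supports pluggable serialization formats (Avro optionally backed by a
/// Confluent Schema Registry), key-hash/round-robin/sticky partitioning, an
/// optional dead-letter queue for failed records, and exactly-once delivery
/// via Kafka transactions driven by the epoch lifecycle
/// (`begin_epoch` / `pre_commit` / `commit_epoch` / `rollback_epoch`).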
pub struct KafkaSink {
    /// Kafka producer; `None` until `open` succeeds and after `close`.
    producer: Option<FutureProducer>,
    /// Parsed sink configuration.
    config: KafkaSinkConfig,
    /// Serializes each record batch into per-row payloads.
    serializer: Box<dyn RecordSerializer>,
    /// Maps message keys to topic partitions.
    partitioner: Box<dyn KafkaPartitioner>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Epoch currently being written.
    current_epoch: u64,
    /// Most recently committed epoch.
    last_committed_epoch: u64,
    /// Whether a Kafka transaction is currently open.
    transaction_active: bool,
    /// Producer for the dead-letter-queue topic, if configured.
    dlq_producer: Option<FutureProducer>,
    /// Sink-level metrics.
    metrics: KafkaSinkMetrics,
    /// Arrow schema of incoming batches.
    schema: SchemaRef,
    /// Optional Schema Registry client for Avro.
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    /// Registered Avro schema id, shared with the serializer.
    avro_schema_id: Arc<std::sync::atomic::AtomicU32>,
    /// Partition count of the target topic, queried during `open`.
    topic_partition_count: i32,
}

impl KafkaSink {
    #[must_use]
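    /// Creates a sink that serializes batches of `schema` and produces them
    /// to the topic named in `config`, optionally registering Prometheus
    /// metrics with `registry`.
    ///
    /// # Panics
    ///
    /// Panics if `config.format` has no serializer;
    /// `KafkaSinkConfig::validate()` is expected to rule that out beforehand.
    ///
    /// A minimal construction sketch, mirroring the unit tests below
    /// (illustrative, not a compiled doc-test):
    ///
    /// ```ignore
    /// let mut cfg = KafkaSinkConfig::default();
    /// cfg.bootstrap_servers = "localhost:9092".into();
    /// cfg.topic = "output-events".into();
    /// let sink = KafkaSink::new(schema, cfg, None);
    /// assert_eq!(sink.state(), ConnectorState::Created);
    /// ```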
    pub fn new(
        schema: SchemaRef,
        config: KafkaSinkConfig,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let serializer =
            select_serializer(config.format, &schema, Arc::clone(&avro_schema_id), None)
                .expect("format validated in KafkaSinkConfig::validate()");
        let partitioner = select_partitioner(config.partitioner);

        Self {
            producer: None,
            config,
            serializer,
            partitioner,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            transaction_active: false,
            dlq_producer: None,
            metrics: KafkaSinkMetrics::new(registry),
            schema,
            schema_registry: None,
            avro_schema_id,
            topic_partition_count: FALLBACK_PARTITION_COUNT,
        }
    }

    #[must_use]
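    /// Creates a sink wired to an existing Schema Registry client, used for
    /// Avro serialization with registry-assigned schema ids.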
    pub fn with_schema_registry(
        schema: SchemaRef,
        config: KafkaSinkConfig,
        sr_client: SchemaRegistryClient,
    ) -> Self {
        let sr = Arc::new(sr_client);
        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let serializer = select_serializer(
            config.format,
            &schema,
            Arc::clone(&avro_schema_id),
            Some(Arc::clone(&sr)),
        )
        .expect("format validated in KafkaSinkConfig::validate()");
        let partitioner = select_partitioner(config.partitioner);

        Self {
            producer: None,
            config,
            serializer,
            partitioner,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            transaction_active: false,
            dlq_producer: None,
            metrics: KafkaSinkMetrics::new(None),
            schema,
            schema_registry: Some(sr),
            avro_schema_id,
            topic_partition_count: FALLBACK_PARTITION_COUNT,
        }
    }

    #[must_use]
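    /// Current lifecycle state of the connector.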
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    #[must_use]
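    /// Whether a Schema Registry client is configured.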
    pub fn has_schema_registry(&self) -> bool {
        self.schema_registry.is_some()
    }

    #[must_use]
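    /// Epoch currently being written.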
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    #[must_use]
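    /// Most recently committed epoch.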
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

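    /// Tracks the schema of incoming batches: registers the Avro schema with
    /// the Schema Registry when needed (first Avro batch, or a schema
    /// change) and rebuilds the serializer when the batch schema differs
    /// from the one seen so far.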
    async fn ensure_schema_ready(
        &mut self,
        batch_schema: &SchemaRef,
    ) -> Result<(), ConnectorError> {
        let schema_changed = self.schema != *batch_schema;
        let needs_registration = self.config.format == Format::Avro
            && (schema_changed
                || self
                    .avro_schema_id
                    .load(std::sync::atomic::Ordering::Relaxed)
                    == 0);

        // Register (or re-register) the Avro schema before producing any
        // payload that references its id.
        if needs_registration {
            if let Some(ref sr) = self.schema_registry {
                let subject = format!("{}-value", self.config.topic);
                let avro_schema =
                    super::schema_registry::arrow_to_avro_schema(batch_schema, &self.config.topic)
                        .map_err(ConnectorError::Serde)?;
                let schema_id = sr
                    .register_schema(
                        &subject,
                        &avro_schema,
                        super::schema_registry::SchemaType::Avro,
                    )
                    .await
                    .map_err(|e| {
                        ConnectorError::ConnectionFailed(format!(
                            "failed to register Avro schema for '{subject}': {e}"
                        ))
                    })?;
                #[allow(clippy::cast_sign_loss)]
                self.avro_schema_id
                    .store(schema_id as u32, std::sync::atomic::Ordering::Relaxed);
                info!(subject = %subject, schema_id, "registered Avro schema");
            }
        }

        // Rebuild the serializer so it matches the new batch schema.
        if schema_changed {
            debug!(
                old = ?self.schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
                new = ?batch_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
                "sink schema updated from incoming batch"
            );
            self.schema = batch_schema.clone();
            self.serializer = select_serializer(
                self.config.format,
                &self.schema,
                Arc::clone(&self.avro_schema_id),
                self.schema_registry.clone(),
            )?;
        }

        Ok(())
    }

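    /// Extracts per-row message keys from the configured key column, if any.
    ///
    /// String columns are copied byte-for-byte; other types are rendered
    /// through Arrow's display formatter. Null values become empty keys,
    /// which are later treated as "no key".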
    fn extract_keys(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> Result<Option<KeyBuffer>, ConnectorError> {
        let Some(key_col) = &self.config.key_column else {
            return Ok(None);
        };

        let col_idx = batch.schema().index_of(key_col).map_err(|_| {
            ConnectorError::ConfigurationError(format!(
                "key column '{key_col}' not found in schema"
            ))
        })?;

        let array = batch.column(col_idx);
        let num_rows = batch.num_rows();
        let mut buf = KeyBuffer::with_capacity(num_rows, 32);

        // Fast path: string columns are copied byte-for-byte.
        if let Some(str_array) = array.as_any().downcast_ref::<StringArray>() {
            for i in 0..num_rows {
                if str_array.is_null(i) {
                    buf.push_empty();
                } else {
                    buf.push(str_array.value(i).as_bytes());
                }
            }
        } else {
            // Fallback: render any other type through Arrow's display
            // formatter, reusing one scratch string across rows.
            use std::fmt::Write;
            let formatter = arrow_cast::display::ArrayFormatter::try_new(
                array,
                &arrow_cast::display::FormatOptions::default(),
            )
            .map_err(|e| {
                ConnectorError::Internal(format!(
                    "failed to create array formatter for key column: {e}"
                ))
            })?;
            let mut fmt_buf = String::with_capacity(64);
            for i in 0..num_rows {
                if array.is_null(i) {
                    buf.push_empty();
                } else {
                    fmt_buf.clear();
                    let _ = write!(fmt_buf, "{}", formatter.value(i));
                    buf.push(fmt_buf.as_bytes());
                }
            }
        }

        Ok(Some(buf))
    }

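    /// Sends a failed record to the dead-letter-queue topic, attaching the
    /// original error, source topic, timestamp, and epoch as `__dlq.*`
    /// headers.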
    async fn route_to_dlq(
        &self,
        payload: &[u8],
        key: Option<&[u8]>,
        error_msg: &str,
    ) -> Result<(), ConnectorError> {
        let dlq_producer = self
            .dlq_producer
            .as_ref()
            .ok_or_else(|| ConnectorError::ConfigurationError("DLQ topic not configured".into()))?;
        let dlq_topic =
            self.config.dlq_topic.as_ref().ok_or_else(|| {
                ConnectorError::ConfigurationError("DLQ topic not configured".into())
            })?;

        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_else(|_| {
                warn!("system clock before Unix epoch — using 0 for DLQ timestamp");
                std::time::Duration::ZERO
            })
            .as_millis()
            .to_string();
        let epoch_str = self.current_epoch.to_string();

        // Attach failure context so DLQ consumers can diagnose the record.
        let headers = OwnedHeaders::new()
            .insert(rdkafka::message::Header {
                key: "__dlq.error",
                value: Some(error_msg.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.topic",
                value: Some(self.config.topic.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.timestamp",
                value: Some(now.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.epoch",
                value: Some(epoch_str.as_bytes()),
            });

        let mut record = FutureRecord::to(dlq_topic)
            .payload(payload)
            .headers(headers);

        if let Some(k) = key {
            record = record.key(k);
        }

        dlq_producer
            .send(record, Duration::from_secs(5))
            .await
            .map_err(|(e, _)| ConnectorError::WriteError(format!("DLQ send failed: {e}")))?;

        self.metrics.record_dlq();
        Ok(())
    }

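    /// Enqueues a record on the producer, retrying with a 100 ms backoff
    /// while the local queue is full (`QueueFull`), up to `queue_timeout`.
    /// Any other enqueue error fails immediately.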
    async fn enqueue_with_queue_retry(
        producer: &FutureProducer,
        mut record: FutureRecord<'_, [u8], [u8]>,
        queue_timeout: Duration,
    ) -> Result<DeliveryFuture, ConnectorError> {
        let start = Instant::now();
        loop {
            match producer.send_result(record) {
                Ok(fut) => return Ok(fut),
                // The local queue is full: back off briefly and retry with
                // the record handed back by rdkafka.
                Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), r))
                    if start.elapsed() < queue_timeout =>
                {
                    record = r;
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
                Err((e, _)) => {
                    return Err(ConnectorError::WriteError(format!(
                        "Kafka enqueue failed: {e}"
                    )));
                }
            }
        }
    }

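    /// Runs the blocking `Producer::flush` on a `spawn_blocking` thread so
    /// it does not stall the async runtime.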
    async fn flush_producer_async(
        producer: &FutureProducer,
        timeout: Duration,
    ) -> Result<(), rdkafka::error::KafkaError> {
        let p = producer.clone();
        match tokio::task::spawn_blocking(move || p.flush(timeout)).await {
            Ok(result) => result,
            Err(join_err) => {
                warn!("flush blocking task failed: {join_err}");
                Err(rdkafka::error::KafkaError::Canceled)
            }
        }
    }

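    /// Runs a blocking producer operation (e.g. transaction begin, commit,
    /// or abort) on a `spawn_blocking` thread and returns its result.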
    async fn producer_blocking<F, R>(producer: &FutureProducer, op: F) -> R
    where
        F: FnOnce(&FutureProducer) -> R + Send + 'static,
        R: Send + 'static,
    {
        let p = producer.clone();
        tokio::task::spawn_blocking(move || op(&p))
            .await
            .expect("producer_blocking: blocking task panicked")
    }
}

#[async_trait]
#[allow(clippy::too_many_lines)]
impl SinkConnector for KafkaSink {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // Runtime properties override the construction-time configuration.
        if !config.properties().is_empty() {
            let parsed = KafkaSinkConfig::from_config(config)?;
            self.config = parsed;
            self.serializer = select_serializer(
                self.config.format,
                &self.schema,
                Arc::clone(&self.avro_schema_id),
                self.schema_registry.clone(),
            )?;
            self.partitioner = select_partitioner(self.config.partitioner);
        }

        info!(
            brokers = %self.config.bootstrap_servers,
            topic = %self.config.topic,
            format = %self.config.format,
            delivery = %self.config.delivery_guarantee,
            "opening Kafka sink connector"
        );

        let rdkafka_config: ClientConfig = self.config.to_rdkafka_config();
        let producer: FutureProducer = rdkafka_config.create().map_err(|e| {
            ConnectorError::ConnectionFailed(format!("failed to create producer: {e}"))
        })?;

        // Exactly-once delivery requires a transactional producer.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            producer
                .init_transactions(self.config.transaction_timeout)
                .map_err(|e| {
                    ConnectorError::TransactionError(format!("failed to init transactions: {e}"))
                })?;
        }

        // The DLQ gets its own producer, configured separately from the
        // main one.
        if self.config.dlq_topic.is_some() {
            let dlq_config = self.config.to_dlq_rdkafka_config();
            let dlq_producer: FutureProducer = dlq_config.create().map_err(|e| {
                ConnectorError::ConnectionFailed(format!("failed to create DLQ producer: {e}"))
            })?;
            self.dlq_producer = Some(dlq_producer);
        }

        // Build a Schema Registry client from config unless one was injected.
        if let Some(ref url) = self.config.schema_registry_url {
            if self.schema_registry.is_none() {
                let sr = if let Some(ref ca_path) = self.config.schema_registry_ssl_ca_location {
                    SchemaRegistryClient::with_tls(
                        url,
                        self.config.schema_registry_auth.clone(),
                        ca_path,
                    )?
                } else {
                    SchemaRegistryClient::new(url, self.config.schema_registry_auth.clone())
                };
                self.schema_registry = Some(Arc::new(sr));
            }
        }

        // Apply the configured compatibility level to the value subject
        // before any Avro schema is registered.
        if self.config.format == Format::Avro {
            if let Some(ref sr) = self.schema_registry {
                if let Some(ref compat) = self.config.schema_compatibility {
                    let subject = format!("{}-value", self.config.topic);
                    sr.set_compatibility_level(&subject, *compat)
                        .await
                        .map_err(|e| {
                            ConnectorError::ConnectionFailed(format!(
                                "failed to set SR compatibility for '{subject}': {e}"
                            ))
                        })?;
                }
            }
        }

        // Query the real partition count so the partitioner can spread load;
        // fall back to a single partition if metadata is unavailable.
        self.topic_partition_count = FALLBACK_PARTITION_COUNT;
        match producer
            .client()
            .fetch_metadata(Some(&self.config.topic), Duration::from_secs(5))
        {
            Ok(metadata) => {
                if let Some(topic_meta) = metadata.topics().first() {
                    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
                    let count = topic_meta.partitions().len() as i32;
                    if count > 0 {
                        self.topic_partition_count = count;
                        info!(
                            topic = %self.config.topic,
                            partitions = count,
                            "queried topic partition count from broker"
                        );
                    }
                }
            }
            Err(e) => {
                warn!(
                    topic = %self.config.topic,
                    error = %e,
                    fallback = FALLBACK_PARTITION_COUNT,
                    "failed to query topic metadata — using fallback partition count"
                );
            }
        }

        self.producer = Some(producer);
        self.state = ConnectorState::Running;
        info!("Kafka sink connector opened successfully");
        Ok(())
    }

    #[allow(clippy::cast_possible_truncation)]
    async fn write_batch(
        &mut self,
        batch: &arrow_array::RecordBatch,
    ) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        self.ensure_schema_ready(&batch.schema()).await?;

        let producer = self
            .producer
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "producer initialized".into(),
                actual: "producer is None".into(),
            })?;

        // Serialize the whole batch up front; a serialization failure fails
        // the batch before anything is enqueued.
        let payloads = self.serializer.serialize(batch).map_err(|e| {
            self.metrics.record_serialization_error();
            ConnectorError::Serde(e)
        })?;

        let keys = self.extract_keys(batch)?;

        let mut records_written: usize = 0;
        let mut bytes_written: u64 = 0;

        // Enqueue every record, collecting delivery futures so the waits
        // happen in a second pass.
        let flush_threshold = self.config.flush_batch_size;
        let mut delivery_futures = Vec::with_capacity(payloads.len());
        for (i, payload) in payloads.iter().enumerate() {
            let key: Option<&[u8]> = keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
            let partition = self.partitioner.partition(key, self.topic_partition_count);

            let mut record = FutureRecord::to(&self.config.topic).payload(payload.as_slice());
            if let Some(k) = key {
                record = record.key(k);
            }
            if let Some(p) = partition {
                record = record.partition(p);
            }

            let fut = Self::enqueue_with_queue_retry(producer, record, Duration::from_millis(500))
                .await?;
            delivery_futures.push((Instant::now(), fut));

            // Periodic flush bounds the number of in-flight messages.
            if flush_threshold > 0 && (i + 1) % flush_threshold == 0 {
                Self::flush_producer_async(producer, self.config.delivery_timeout)
                    .await
                    .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
            }
        }

        // Await every delivery; failed records are counted and, when a DLQ
        // is configured, routed there instead of failing the batch.
        let mut failed: usize = 0;
        let mut first_error: Option<String> = None;
        for (i, (send_time, future)) in delivery_futures.into_iter().enumerate() {
            let err_msg = match future.await {
                Ok(Ok(_)) => {
                    let latency_us = send_time.elapsed().as_micros() as u64;
                    self.metrics.record_produce_latency(latency_us);
                    records_written += 1;
                    bytes_written += payloads[i].len() as u64;
                    continue;
                }
                Ok(Err((err, _))) => err.to_string(),
                Err(_canceled) => "delivery canceled — producer dropped before ack".into(),
            };

            self.metrics.record_error();
            failed += 1;
            if first_error.is_none() {
                first_error = Some(err_msg.clone());
            }

            if self.dlq_producer.is_some() {
                let key: Option<&[u8]> =
                    keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
                if let Err(dlq_err) = self.route_to_dlq(&payloads[i], key, &err_msg).await {
                    warn!(
                        original_error = %err_msg,
                        dlq_error = %dlq_err,
                        "failed to route record to DLQ — record lost"
                    );
                }
            }
        }

        self.metrics
            .record_write(records_written as u64, bytes_written);

        debug!(
            records = records_written,
            bytes = bytes_written,
            failed,
            "wrote batch to Kafka"
        );

        // Without a DLQ, any delivery failure fails the whole batch.
        if failed > 0 && self.dlq_producer.is_none() {
            return Err(ConnectorError::WriteError(format!(
                "Kafka produce: {failed}/{} records failed, first error: {}",
                payloads.len(),
                first_error.unwrap_or_else(|| "unknown".into())
            )));
        }

        Ok(WriteResult::new(records_written, bytes_written))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            if self.transaction_active {
                warn!(epoch, "aborting stale transaction before new epoch");
                let txn_timeout = self.config.transaction_timeout;
                Self::producer_blocking(producer, move |p| p.abort_transaction(txn_timeout))
                    .await
                    .map_err(|e| {
                        ConnectorError::TransactionError(format!(
                            "cannot begin epoch {epoch}: abort of stale transaction failed: {e}"
                        ))
                    })?;
                self.transaction_active = false;
            }

            Self::producer_blocking(producer, FutureProducer::begin_transaction)
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to begin transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = true;
        }

        self.partitioner.reset();
        debug!(epoch, "began epoch");
        Ok(())
    }

    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch in pre_commit: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        // Flush so every delivery outcome is known before the commit.
        if let Some(ref producer) = self.producer {
            Self::flush_producer_async(producer, self.config.delivery_timeout)
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to flush before pre-commit for epoch {epoch}: {e}"
                    ))
                })?;
        }

        debug!(epoch, "pre-committed epoch (flushed)");
        Ok(())
    }

    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            Self::flush_producer_async(producer, self.config.delivery_timeout)
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!("failed to flush before commit: {e}"))
                })?;

            let txn_timeout = self.config.transaction_timeout;
            Self::producer_blocking(producer, move |p| p.commit_transaction(txn_timeout))
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to commit transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = false;
        } else {
            // At-least-once: a flush on commit is sufficient.
            if let Some(ref producer) = self.producer {
                Self::flush_producer_async(producer, self.config.delivery_timeout)
                    .await
                    .map_err(|e| {
                        ConnectorError::TransactionError(format!(
                            "failed to flush for epoch {epoch}: {e}"
                        ))
                    })?;
            }
        }

        self.last_committed_epoch = epoch;
        self.metrics.record_commit();
        debug!(epoch, "committed epoch");
        Ok(())
    }

    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && self.transaction_active
        {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            let txn_timeout = self.config.transaction_timeout;
            Self::producer_blocking(producer, move |p| p.abort_transaction(txn_timeout))
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to abort transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = false;
        }

        self.metrics.record_rollback();
        debug!(epoch, "rolled back epoch");
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(10))
            .with_idempotent()
            .with_partitioned();

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }

        if self.schema_registry.is_some() {
            caps = caps.with_schema_evolution();
        }

        caps
    }

    async fn flush(&mut self) -> Result<(), ConnectorError> {
        if let Some(ref producer) = self.producer {
            Self::flush_producer_async(producer, PERIODIC_FLUSH_TIMEOUT)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
        }
        Ok(())
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Kafka sink connector");

        // Abort any still-open transaction before tearing the producer down.
        if self.transaction_active {
            if let Err(e) = self.rollback_epoch(self.current_epoch).await {
                warn!(error = %e, "failed to abort active transaction on close");
            }
        }

        let mut first_err: Option<ConnectorError> = None;

        if let Some(ref producer) = self.producer {
            if let Err(e) = Self::flush_producer_async(producer, Duration::from_secs(30)).await {
                warn!(error = %e, "failed to flush on close");
                first_err.get_or_insert(ConnectorError::WriteError(format!(
                    "flush failed on close: {e}"
                )));
            }
        }

        if let Some(ref dlq) = self.dlq_producer {
            if let Err(e) = Self::flush_producer_async(dlq, Duration::from_secs(10)).await {
                warn!(error = %e, "failed to flush DLQ producer on close");
                first_err.get_or_insert(ConnectorError::WriteError(format!(
                    "DLQ flush failed on close: {e}"
                )));
            }
        }

        self.producer = None;
        self.dlq_producer = None;
        self.state = ConnectorState::Closed;
        info!("Kafka sink connector closed");
        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }
}

impl std::fmt::Debug for KafkaSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KafkaSink")
            .field("state", &self.state)
            .field("topic", &self.config.topic)
            .field("delivery", &self.config.delivery_guarantee)
            .field("format", &self.config.format)
            .field("current_epoch", &self.current_epoch)
            .field("last_committed_epoch", &self.last_committed_epoch)
            .field("transaction_active", &self.transaction_active)
            .finish_non_exhaustive()
    }
}

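/// Builds the serializer for `format`: Avro gets a dedicated serializer that
/// shares the registered schema id (and optional registry client); all other
/// formats go through `serde::create_serializer`.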
fn select_serializer(
    format: Format,
    schema: &SchemaRef,
    schema_id: Arc<std::sync::atomic::AtomicU32>,
    registry: Option<Arc<SchemaRegistryClient>>,
) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
    match format {
        Format::Avro => Ok(Box::new(AvroSerializer::with_shared_schema_id(
            schema.clone(),
            schema_id,
            registry,
        ))),
        other => serde::create_serializer(other).map_err(|e| {
            ConnectorError::ConfigurationError(format!("unsupported sink format '{other}': {e}"))
        }),
    }
}

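/// Maps the configured partition strategy to its partitioner implementation.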
fn select_partitioner(strategy: PartitionStrategy) -> Box<dyn KafkaPartitioner> {
    match strategy {
        PartitionStrategy::KeyHash => Box::new(KeyHashPartitioner::new()),
        PartitionStrategy::RoundRobin => Box::new(RoundRobinPartitioner::new()),
        PartitionStrategy::Sticky => Box::new(StickyPartitioner::new(100)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ]))
    }

    fn test_config() -> KafkaSinkConfig {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.topic = "output-events".into();
        cfg
    }

    #[test]
    fn test_new_defaults() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.state(), ConnectorState::Created);
        assert!(sink.producer.is_none());
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert!(!sink.transaction_active);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = KafkaSink::new(schema.clone(), test_config(), None);
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_health_check_created() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut sink = KafkaSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;
        assert_eq!(sink.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut sink = KafkaSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Closed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_metrics_initial() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    #[test]
    fn test_capabilities_at_least_once() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let caps = sink.capabilities();
        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
        assert!(!caps.schema_evolution);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut cfg = test_config();
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
    }

    #[test]
    fn test_serializer_selection_json() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.serializer.format(), Format::Json);
    }

    #[test]
    fn test_serializer_selection_avro() {
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        let sink = KafkaSink::new(test_schema(), cfg, None);
        assert_eq!(sink.serializer.format(), Format::Avro);
    }

    #[test]
    fn test_with_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());

        let sink = KafkaSink::with_schema_registry(test_schema(), cfg, sr);
        assert!(sink.has_schema_registry());
        assert_eq!(sink.serializer.format(), Format::Avro);
        let caps = sink.capabilities();
        assert!(caps.schema_evolution);
    }

    #[test]
    fn test_debug_output() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let debug = format!("{sink:?}");
        assert!(debug.contains("KafkaSink"));
        assert!(debug.contains("output-events"));
    }

    #[test]
    fn test_extract_keys_no_key_column() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();
        assert!(sink.extract_keys(&batch).unwrap().is_none());
    }

    #[test]
    fn test_extract_keys_with_key_column() {
        let mut cfg = test_config();
        cfg.key_column = Some("value".into());
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["key-a", "key-b"])),
            ],
        )
        .unwrap();
        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"key-a");
        assert_eq!(&keys[1], b"key-b");
    }
}