1use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use arrow_array::{Array, StringArray};
7use arrow_schema::SchemaRef;
8use async_trait::async_trait;
9use rdkafka::error::{KafkaError, RDKafkaErrorCode};
10use rdkafka::message::OwnedHeaders;
11use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
12use rdkafka::ClientConfig;
13use tracing::{debug, info, warn};
14
15use crate::config::{ConnectorConfig, ConnectorState};
16use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
17use crate::error::ConnectorError;
18use crate::serde::{self, Format, RecordSerializer};
19
20use super::avro_serializer::AvroSerializer;
21use super::partitioner::{
22 KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
23};
24use super::schema_registry::SchemaRegistryClient;
25use super::sink_config::{KafkaSinkConfig, PartitionStrategy};
26use super::sink_metrics::KafkaSinkMetrics;
27use crate::connector::DeliveryGuarantee;
28
/// Partition count assumed for the output topic until (or unless) broker
/// metadata reports a positive count in `open`.
const FALLBACK_PARTITION_COUNT: i32 = 1;

/// Flush timeout used by the periodic `flush()` call.
const PERIODIC_FLUSH_TIMEOUT: Duration = Duration::from_millis(5_000);
37
/// Per-row message keys packed into a single contiguous byte buffer.
///
/// Every row is recorded as a `(start, len)` window into `data`, so a whole
/// batch of keys costs two vectors rather than one allocation per row. Rows
/// added with [`KeyBuffer::push_empty`] resolve to an empty slice.
struct KeyBuffer {
    /// Concatenated bytes of all pushed keys, in push order.
    data: Vec<u8>,
    /// `(start, len)` window into `data` for each pushed row.
    offsets: Vec<(usize, usize)>,
}

impl KeyBuffer {
    /// Pre-sizes the buffer for `num_rows` keys of roughly `avg_key_len` bytes.
    fn with_capacity(num_rows: usize, avg_key_len: usize) -> Self {
        let data = Vec::with_capacity(num_rows * avg_key_len);
        let offsets = Vec::with_capacity(num_rows);
        Self { data, offsets }
    }

    /// Appends `key` as the next row.
    fn push(&mut self, key: &[u8]) {
        let window = (self.data.len(), key.len());
        self.data.extend_from_slice(key);
        self.offsets.push(window);
    }

    /// Appends a row whose key is the empty slice.
    fn push_empty(&mut self) {
        self.push(&[]);
    }

    /// Returns the key bytes stored for row `i`.
    ///
    /// # Panics
    ///
    /// Panics if `i` is not less than the number of rows pushed.
    fn key(&self, i: usize) -> &[u8] {
        let (start, len) = self.offsets[i];
        &self.data[start..][..len]
    }

    /// Number of rows pushed so far.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.offsets.len()
    }
}

impl std::ops::Index<usize> for KeyBuffer {
    type Output = [u8];

    /// Same as [`KeyBuffer::key`]; enables `buf[i]`.
    fn index(&self, i: usize) -> &[u8] {
        self.key(i)
    }
}
82
/// Kafka sink connector: serializes Arrow record batches and produces them to
/// a single topic, optionally inside Kafka transactions (exactly-once mode),
/// with an optional dead-letter-queue producer for failed records.
pub struct KafkaSink {
    /// Main producer; `None` until `open` succeeds and again after `close`.
    producer: Option<FutureProducer>,
    /// Sink configuration; replaced in `open` when runtime properties are given.
    config: KafkaSinkConfig,
    /// Turns each batch into the per-record payloads that are produced.
    serializer: Box<dyn RecordSerializer>,
    /// Picks an explicit partition (or none) for each record.
    partitioner: Box<dyn KafkaPartitioner>,
    /// Lifecycle state; `write_batch` requires `Running`.
    state: ConnectorState,
    /// Most recent epoch seen by `begin_epoch` / `pre_commit` / `commit_epoch`.
    current_epoch: u64,
    /// Last epoch that `commit_epoch` completed successfully.
    last_committed_epoch: u64,
    /// `true` between a successful `begin_transaction` and the matching
    /// commit/abort (only ever set in exactly-once mode).
    transaction_active: bool,
    /// Producer for the DLQ topic; created in `open` when a DLQ topic is configured.
    dlq_producer: Option<FutureProducer>,
    /// Produce/commit/error counters and latency metrics.
    metrics: KafkaSinkMetrics,
    /// Schema the current serializer was built for; follows incoming batch schemas.
    schema: SchemaRef,
    /// Optional Schema Registry client, shared with the Avro serializer.
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    /// Registered Avro schema ID shared with the Avro serializer; `0` means
    /// "not registered yet" and triggers registration on the next batch.
    avro_schema_id: Arc<std::sync::atomic::AtomicU32>,
    /// Partition count handed to the partitioner; queried from the broker in
    /// `open`, otherwise `FALLBACK_PARTITION_COUNT`.
    topic_partition_count: i32,
}
127
128impl KafkaSink {
129 #[must_use]
136 pub fn new(
137 schema: SchemaRef,
138 config: KafkaSinkConfig,
139 registry: Option<&prometheus::Registry>,
140 ) -> Self {
141 let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
142 let serializer =
143 select_serializer(config.format, &schema, Arc::clone(&avro_schema_id), None)
144 .expect("format validated in KafkaSinkConfig::validate()");
145 let partitioner = select_partitioner(config.partitioner);
146
147 Self {
148 producer: None,
149 config,
150 serializer,
151 partitioner,
152 state: ConnectorState::Created,
153 current_epoch: 0,
154 last_committed_epoch: 0,
155 transaction_active: false,
156 dlq_producer: None,
157 metrics: KafkaSinkMetrics::new(registry),
158 schema,
159 schema_registry: None,
160 avro_schema_id,
161 topic_partition_count: FALLBACK_PARTITION_COUNT,
162 }
163 }
164
165 #[must_use]
172 pub fn with_schema_registry(
173 schema: SchemaRef,
174 config: KafkaSinkConfig,
175 sr_client: SchemaRegistryClient,
176 ) -> Self {
177 let sr = Arc::new(sr_client);
178 let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
179 let serializer = select_serializer(
180 config.format,
181 &schema,
182 Arc::clone(&avro_schema_id),
183 Some(Arc::clone(&sr)),
184 )
185 .expect("format validated in KafkaSinkConfig::validate()");
186 let partitioner = select_partitioner(config.partitioner);
187
188 Self {
189 producer: None,
190 config,
191 serializer,
192 partitioner,
193 state: ConnectorState::Created,
194 current_epoch: 0,
195 last_committed_epoch: 0,
196 transaction_active: false,
197 dlq_producer: None,
198 metrics: KafkaSinkMetrics::new(None),
199 schema,
200 schema_registry: Some(sr),
201 avro_schema_id,
202 topic_partition_count: FALLBACK_PARTITION_COUNT,
203 }
204 }
205
206 #[must_use]
208 pub fn state(&self) -> ConnectorState {
209 self.state
210 }
211
212 #[must_use]
214 pub fn has_schema_registry(&self) -> bool {
215 self.schema_registry.is_some()
216 }
217
218 #[must_use]
220 pub fn current_epoch(&self) -> u64 {
221 self.current_epoch
222 }
223
224 #[must_use]
226 pub fn last_committed_epoch(&self) -> u64 {
227 self.last_committed_epoch
228 }
229
230 async fn ensure_schema_ready(
232 &mut self,
233 batch_schema: &SchemaRef,
234 ) -> Result<(), ConnectorError> {
235 let schema_changed = self.schema != *batch_schema;
236 let needs_registration = self.config.format == Format::Avro
237 && (schema_changed
238 || self
239 .avro_schema_id
240 .load(std::sync::atomic::Ordering::Relaxed)
241 == 0);
242
243 if needs_registration {
247 if let Some(ref sr) = self.schema_registry {
248 let subject = format!("{}-value", self.config.topic);
249 let avro_schema =
250 super::schema_registry::arrow_to_avro_schema(batch_schema, &self.config.topic)
251 .map_err(ConnectorError::Serde)?;
252 let schema_id = sr
253 .register_schema(
254 &subject,
255 &avro_schema,
256 super::schema_registry::SchemaType::Avro,
257 )
258 .await
259 .map_err(|e| {
260 ConnectorError::ConnectionFailed(format!(
261 "failed to register Avro schema for '{subject}': {e}"
262 ))
263 })?;
264 #[allow(clippy::cast_sign_loss)]
265 self.avro_schema_id
266 .store(schema_id as u32, std::sync::atomic::Ordering::Relaxed);
267 info!(subject = %subject, schema_id, "registered Avro schema");
268 }
269 }
270
271 if schema_changed {
272 debug!(
273 old = ?self.schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
274 new = ?batch_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
275 "sink schema updated from incoming batch"
276 );
277 self.schema = batch_schema.clone();
278 self.serializer = select_serializer(
279 self.config.format,
280 &self.schema,
281 Arc::clone(&self.avro_schema_id),
282 self.schema_registry.clone(),
283 )?;
284 }
285
286 Ok(())
287 }
288
289 fn extract_keys(
293 &self,
294 batch: &arrow_array::RecordBatch,
295 ) -> Result<Option<KeyBuffer>, ConnectorError> {
296 let Some(key_col) = &self.config.key_column else {
297 return Ok(None);
298 };
299
300 let col_idx = batch.schema().index_of(key_col).map_err(|_| {
301 ConnectorError::ConfigurationError(format!(
302 "key column '{key_col}' not found in schema"
303 ))
304 })?;
305
306 let array = batch.column(col_idx);
307 let num_rows = batch.num_rows();
308 let mut buf = KeyBuffer::with_capacity(num_rows, 32);
309
310 if let Some(str_array) = array.as_any().downcast_ref::<StringArray>() {
312 for i in 0..num_rows {
313 if str_array.is_null(i) {
314 buf.push_empty();
315 } else {
316 buf.push(str_array.value(i).as_bytes());
317 }
318 }
319 } else {
320 use std::fmt::Write;
322 let formatter = arrow_cast::display::ArrayFormatter::try_new(
323 array,
324 &arrow_cast::display::FormatOptions::default(),
325 )
326 .map_err(|e| {
327 ConnectorError::Internal(format!(
328 "failed to create array formatter for key column: {e}"
329 ))
330 })?;
331 let mut fmt_buf = String::with_capacity(64);
333 for i in 0..num_rows {
334 if array.is_null(i) {
335 buf.push_empty();
336 } else {
337 fmt_buf.clear();
338 let _ = write!(fmt_buf, "{}", formatter.value(i));
339 buf.push(fmt_buf.as_bytes());
340 }
341 }
342 }
343
344 Ok(Some(buf))
345 }
346
347 async fn route_to_dlq(
349 &self,
350 payload: &[u8],
351 key: Option<&[u8]>,
352 error_msg: &str,
353 ) -> Result<(), ConnectorError> {
354 let dlq_producer = self
355 .dlq_producer
356 .as_ref()
357 .ok_or_else(|| ConnectorError::ConfigurationError("DLQ topic not configured".into()))?;
358 let dlq_topic =
359 self.config.dlq_topic.as_ref().ok_or_else(|| {
360 ConnectorError::ConfigurationError("DLQ topic not configured".into())
361 })?;
362
363 let now = std::time::SystemTime::now()
364 .duration_since(std::time::UNIX_EPOCH)
365 .unwrap_or_else(|_| {
366 tracing::warn!("system clock before Unix epoch — using 0 for DLQ timestamp");
367 std::time::Duration::ZERO
368 })
369 .as_millis()
370 .to_string();
371 let epoch_str = self.current_epoch.to_string();
372
373 let headers = OwnedHeaders::new()
374 .insert(rdkafka::message::Header {
375 key: "__dlq.error",
376 value: Some(error_msg.as_bytes()),
377 })
378 .insert(rdkafka::message::Header {
379 key: "__dlq.topic",
380 value: Some(self.config.topic.as_bytes()),
381 })
382 .insert(rdkafka::message::Header {
383 key: "__dlq.timestamp",
384 value: Some(now.as_bytes()),
385 })
386 .insert(rdkafka::message::Header {
387 key: "__dlq.epoch",
388 value: Some(epoch_str.as_bytes()),
389 });
390
391 let mut record = FutureRecord::to(dlq_topic)
392 .payload(payload)
393 .headers(headers);
394
395 if let Some(k) = key {
396 record = record.key(k);
397 }
398
399 dlq_producer
400 .send(record, Duration::from_secs(5))
401 .await
402 .map_err(|(e, _)| ConnectorError::WriteError(format!("DLQ send failed: {e}")))?;
403
404 self.metrics.record_dlq();
405 Ok(())
406 }
407
408 async fn enqueue_with_queue_retry(
413 producer: &FutureProducer,
414 mut record: FutureRecord<'_, [u8], [u8]>,
415 queue_timeout: Duration,
416 ) -> Result<DeliveryFuture, ConnectorError> {
417 let start = Instant::now();
418 loop {
419 match producer.send_result(record) {
420 Ok(fut) => return Ok(fut),
421 Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), r))
422 if start.elapsed() < queue_timeout =>
423 {
424 record = r;
425 tokio::time::sleep(Duration::from_millis(100)).await;
426 }
427 Err((e, _)) => {
428 return Err(ConnectorError::WriteError(format!(
429 "Kafka enqueue failed: {e}"
430 )));
431 }
432 }
433 }
434 }
435
436 async fn flush_producer_async(
439 producer: &FutureProducer,
440 timeout: Duration,
441 ) -> Result<(), rdkafka::error::KafkaError> {
442 let p = producer.clone();
443 match tokio::task::spawn_blocking(move || p.flush(timeout)).await {
444 Ok(result) => result,
445 Err(join_err) => {
446 warn!("flush blocking task failed: {join_err}");
447 Err(rdkafka::error::KafkaError::Canceled)
448 }
449 }
450 }
451
452 async fn producer_blocking<F, R>(producer: &FutureProducer, op: F) -> R
456 where
457 F: FnOnce(&FutureProducer) -> R + Send + 'static,
458 R: Send + 'static,
459 {
460 let p = producer.clone();
461 tokio::task::spawn_blocking(move || op(&p))
462 .await
463 .expect("producer_blocking: blocking task panicked")
464 }
465}
466
467#[async_trait]
468#[allow(clippy::too_many_lines)]
469impl SinkConnector for KafkaSink {
470 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
471 self.state = ConnectorState::Initializing;
472
473 if !config.properties().is_empty() {
475 let parsed = KafkaSinkConfig::from_config(config)?;
476 self.config = parsed;
477 self.serializer = select_serializer(
478 self.config.format,
479 &self.schema,
480 Arc::clone(&self.avro_schema_id),
481 self.schema_registry.clone(),
482 )?;
483 self.partitioner = select_partitioner(self.config.partitioner);
484 }
485
486 info!(
487 brokers = %self.config.bootstrap_servers,
488 topic = %self.config.topic,
489 format = %self.config.format,
490 delivery = %self.config.delivery_guarantee,
491 "opening Kafka sink connector"
492 );
493
494 let rdkafka_config: ClientConfig = self.config.to_rdkafka_config();
496 let producer: FutureProducer = rdkafka_config.create().map_err(|e| {
497 ConnectorError::ConnectionFailed(format!("failed to create producer: {e}"))
498 })?;
499
500 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
502 producer
503 .init_transactions(self.config.transaction_timeout)
504 .map_err(|e| {
505 ConnectorError::TransactionError(format!("failed to init transactions: {e}"))
506 })?;
507 }
508
509 if self.config.dlq_topic.is_some() {
514 let dlq_config = self.config.to_dlq_rdkafka_config();
515 let dlq_producer: FutureProducer = dlq_config.create().map_err(|e| {
516 ConnectorError::ConnectionFailed(format!("failed to create DLQ producer: {e}"))
517 })?;
518 self.dlq_producer = Some(dlq_producer);
519 }
520
521 if let Some(ref url) = self.config.schema_registry_url {
523 if self.schema_registry.is_none() {
524 let sr = if let Some(ref ca_path) = self.config.schema_registry_ssl_ca_location {
525 SchemaRegistryClient::with_tls(
526 url,
527 self.config.schema_registry_auth.clone(),
528 ca_path,
529 )?
530 } else {
531 SchemaRegistryClient::new(url, self.config.schema_registry_auth.clone())
532 };
533 self.schema_registry = Some(Arc::new(sr));
534 }
535 }
536
537 if self.config.format == Format::Avro {
542 if let Some(ref sr) = self.schema_registry {
543 if let Some(ref compat) = self.config.schema_compatibility {
544 let subject = format!("{}-value", self.config.topic);
545 sr.set_compatibility_level(&subject, *compat)
546 .await
547 .map_err(|e| {
548 ConnectorError::ConnectionFailed(format!(
549 "failed to set SR compatibility for '{subject}': {e}"
550 ))
551 })?;
552 }
553 }
554 }
555
556 self.topic_partition_count = FALLBACK_PARTITION_COUNT;
559 match producer
560 .client()
561 .fetch_metadata(Some(&self.config.topic), Duration::from_secs(5))
562 {
563 Ok(metadata) => {
564 if let Some(topic_meta) = metadata.topics().first() {
565 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
566 let count = topic_meta.partitions().len() as i32;
567 if count > 0 {
568 self.topic_partition_count = count;
569 info!(
570 topic = %self.config.topic,
571 partitions = count,
572 "queried topic partition count from broker"
573 );
574 }
575 }
576 }
577 Err(e) => {
578 warn!(
579 topic = %self.config.topic,
580 error = %e,
581 fallback = FALLBACK_PARTITION_COUNT,
582 "failed to query topic metadata — using fallback partition count"
583 );
584 }
585 }
586
587 self.producer = Some(producer);
588 self.state = ConnectorState::Running;
589 info!("Kafka sink connector opened successfully");
590 Ok(())
591 }
592
593 #[allow(clippy::cast_possible_truncation)] async fn write_batch(
595 &mut self,
596 batch: &arrow_array::RecordBatch,
597 ) -> Result<WriteResult, ConnectorError> {
598 if self.state != ConnectorState::Running {
599 return Err(ConnectorError::InvalidState {
600 expected: "Running".into(),
601 actual: self.state.to_string(),
602 });
603 }
604
605 self.ensure_schema_ready(&batch.schema()).await?;
606
607 let producer = self
608 .producer
609 .as_ref()
610 .ok_or_else(|| ConnectorError::InvalidState {
611 expected: "producer initialized".into(),
612 actual: "producer is None".into(),
613 })?;
614
615 let payloads = self.serializer.serialize(batch).map_err(|e| {
617 self.metrics.record_serialization_error();
618 ConnectorError::Serde(e)
619 })?;
620
621 let keys = self.extract_keys(batch)?;
623
624 let mut records_written: usize = 0;
625 let mut bytes_written: u64 = 0;
626
627 let flush_threshold = self.config.flush_batch_size;
630 let mut delivery_futures = Vec::with_capacity(payloads.len());
631 for (i, payload) in payloads.iter().enumerate() {
632 let key: Option<&[u8]> = keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
633 let partition = self.partitioner.partition(key, self.topic_partition_count);
634
635 let mut record = FutureRecord::to(&self.config.topic).payload(payload.as_slice());
636 if let Some(k) = key {
637 record = record.key(k);
638 }
639 if let Some(p) = partition {
640 record = record.partition(p);
641 }
642
643 let fut = Self::enqueue_with_queue_retry(producer, record, Duration::from_millis(500))
646 .await?;
647 delivery_futures.push((Instant::now(), fut));
648
649 if flush_threshold > 0 && (i + 1) % flush_threshold == 0 {
650 Self::flush_producer_async(producer, self.config.delivery_timeout)
651 .await
652 .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
653 }
654 }
655
656 let mut failed: usize = 0;
659 let mut first_error: Option<String> = None;
660 for (i, (send_time, future)) in delivery_futures.into_iter().enumerate() {
661 let err_msg = match future.await {
662 Ok(Ok(_)) => {
663 let latency_us = send_time.elapsed().as_micros() as u64;
664 self.metrics.record_produce_latency(latency_us);
665 records_written += 1;
666 bytes_written += payloads[i].len() as u64;
667 continue;
668 }
669 Ok(Err((err, _))) => err.to_string(),
670 Err(_canceled) => "delivery canceled — producer dropped before ack".into(),
671 };
672
673 self.metrics.record_error();
674 failed += 1;
675 if first_error.is_none() {
676 first_error = Some(err_msg.clone());
677 }
678
679 if self.dlq_producer.is_some() {
680 let key: Option<&[u8]> =
681 keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
682 if let Err(dlq_err) = self.route_to_dlq(&payloads[i], key, &err_msg).await {
683 warn!(
684 original_error = %err_msg,
685 dlq_error = %dlq_err,
686 "failed to route record to DLQ — record lost"
687 );
688 }
689 }
690 }
691
692 self.metrics
693 .record_write(records_written as u64, bytes_written);
694
695 debug!(
696 records = records_written,
697 bytes = bytes_written,
698 failed,
699 "wrote batch to Kafka"
700 );
701
702 if failed > 0 && self.dlq_producer.is_none() {
705 return Err(ConnectorError::WriteError(format!(
706 "Kafka produce: {failed}/{} records failed, first error: {}",
707 payloads.len(),
708 first_error.unwrap_or_else(|| "unknown".into())
709 )));
710 }
711
712 Ok(WriteResult::new(records_written, bytes_written))
713 }
714
715 fn schema(&self) -> SchemaRef {
716 self.schema.clone()
717 }
718
719 async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
720 self.current_epoch = epoch;
721
722 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
723 let producer = self
724 .producer
725 .as_ref()
726 .ok_or_else(|| ConnectorError::InvalidState {
727 expected: "Running".into(),
728 actual: self.state.to_string(),
729 })?;
730
731 if self.transaction_active {
736 debug!(
737 epoch,
738 "transaction already open — pending rows ride into this epoch"
739 );
740 self.partitioner.reset();
741 return Ok(());
742 }
743
744 Self::producer_blocking(producer, FutureProducer::begin_transaction)
745 .await
746 .map_err(|e| {
747 ConnectorError::TransactionError(format!(
748 "failed to begin transaction for epoch {epoch}: {e}"
749 ))
750 })?;
751
752 self.transaction_active = true;
753 }
754
755 self.partitioner.reset();
756 debug!(epoch, "began epoch");
757 Ok(())
758 }
759
760 async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
761 if epoch < self.current_epoch {
768 return Err(ConnectorError::TransactionError(format!(
769 "stale epoch in pre_commit: current {}, got {epoch}",
770 self.current_epoch
771 )));
772 }
773 self.current_epoch = epoch;
774
775 if let Some(ref producer) = self.producer {
777 Self::flush_producer_async(producer, self.config.delivery_timeout)
778 .await
779 .map_err(|e| {
780 ConnectorError::TransactionError(format!(
781 "failed to flush before pre-commit for epoch {epoch}: {e}"
782 ))
783 })?;
784 }
785
786 debug!(epoch, "pre-committed epoch (flushed)");
787 Ok(())
788 }
789
790 async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
791 if epoch < self.current_epoch {
793 return Err(ConnectorError::TransactionError(format!(
794 "stale epoch in commit: current {}, got {epoch}",
795 self.current_epoch
796 )));
797 }
798 self.current_epoch = epoch;
799
800 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
801 let producer = self
802 .producer
803 .as_ref()
804 .ok_or_else(|| ConnectorError::InvalidState {
805 expected: "Running".into(),
806 actual: self.state.to_string(),
807 })?;
808
809 Self::flush_producer_async(producer, self.config.delivery_timeout)
810 .await
811 .map_err(|e| {
812 ConnectorError::TransactionError(format!("failed to flush before commit: {e}"))
813 })?;
814
815 let txn_timeout = self.config.transaction_timeout;
816 Self::producer_blocking(producer, move |p| p.commit_transaction(txn_timeout))
817 .await
818 .map_err(|e| {
819 ConnectorError::TransactionError(format!(
820 "failed to commit transaction for epoch {epoch}: {e}"
821 ))
822 })?;
823
824 self.transaction_active = false;
825 } else {
826 if let Some(ref producer) = self.producer {
828 Self::flush_producer_async(producer, self.config.delivery_timeout)
829 .await
830 .map_err(|e| {
831 ConnectorError::TransactionError(format!(
832 "failed to flush for epoch {epoch}: {e}"
833 ))
834 })?;
835 }
836 }
837
838 self.last_committed_epoch = epoch;
839 self.metrics.record_commit();
840 debug!(epoch, "committed epoch");
841 Ok(())
842 }
843
844 async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
845 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
846 && self.transaction_active
847 {
848 let producer = self
849 .producer
850 .as_ref()
851 .ok_or_else(|| ConnectorError::InvalidState {
852 expected: "Running".into(),
853 actual: self.state.to_string(),
854 })?;
855
856 let txn_timeout = self.config.transaction_timeout;
857 Self::producer_blocking(producer, move |p| p.abort_transaction(txn_timeout))
858 .await
859 .map_err(|e| {
860 ConnectorError::TransactionError(format!(
861 "failed to abort transaction for epoch {epoch}: {e}"
862 ))
863 })?;
864
865 self.transaction_active = false;
866 }
867
868 self.metrics.record_rollback();
869 debug!(epoch, "rolled back epoch");
870 Ok(())
871 }
872
873 fn capabilities(&self) -> SinkConnectorCapabilities {
874 let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(10))
877 .with_idempotent()
878 .with_partitioned();
879
880 if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
881 caps = caps
886 .with_exactly_once()
887 .with_two_phase_commit()
888 .with_preserves_pending_on_abandon();
889 }
890
891 if self.schema_registry.is_some() {
892 caps = caps.with_schema_evolution();
893 }
894
895 caps
896 }
897
898 async fn flush(&mut self) -> Result<(), ConnectorError> {
899 if let Some(ref producer) = self.producer {
900 Self::flush_producer_async(producer, PERIODIC_FLUSH_TIMEOUT)
901 .await
902 .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
903 }
904 Ok(())
905 }
906
907 async fn close(&mut self) -> Result<(), ConnectorError> {
908 info!("closing Kafka sink connector");
909
910 if self.transaction_active {
912 if let Err(e) = self.rollback_epoch(self.current_epoch).await {
913 warn!(error = %e, "failed to abort active transaction on close");
914 }
915 }
916
917 let mut first_err: Option<ConnectorError> = None;
918
919 if let Some(ref producer) = self.producer {
920 if let Err(e) = Self::flush_producer_async(producer, Duration::from_secs(30)).await {
921 warn!(error = %e, "failed to flush on close");
922 first_err.get_or_insert(ConnectorError::WriteError(format!(
923 "flush failed on close: {e}"
924 )));
925 }
926 }
927
928 if let Some(ref dlq) = self.dlq_producer {
929 if let Err(e) = Self::flush_producer_async(dlq, Duration::from_secs(10)).await {
930 warn!(error = %e, "failed to flush DLQ producer on close");
931 first_err.get_or_insert(ConnectorError::WriteError(format!(
932 "DLQ flush failed on close: {e}"
933 )));
934 }
935 }
936
937 self.producer = None;
938 self.dlq_producer = None;
939 self.state = ConnectorState::Closed;
940 info!("Kafka sink connector closed");
941 match first_err {
942 Some(e) => Err(e),
943 None => Ok(()),
944 }
945 }
946}
947
948impl std::fmt::Debug for KafkaSink {
949 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
950 f.debug_struct("KafkaSink")
951 .field("state", &self.state)
952 .field("topic", &self.config.topic)
953 .field("delivery", &self.config.delivery_guarantee)
954 .field("format", &self.config.format)
955 .field("current_epoch", &self.current_epoch)
956 .field("last_committed_epoch", &self.last_committed_epoch)
957 .field("transaction_active", &self.transaction_active)
958 .finish_non_exhaustive()
959 }
960}
961
962fn select_serializer(
967 format: Format,
968 schema: &SchemaRef,
969 schema_id: Arc<std::sync::atomic::AtomicU32>,
970 registry: Option<Arc<SchemaRegistryClient>>,
971) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
972 match format {
973 Format::Avro => Ok(Box::new(AvroSerializer::with_shared_schema_id(
974 schema.clone(),
975 schema_id,
976 registry,
977 ))),
978 other => serde::create_serializer(other).map_err(|e| {
979 ConnectorError::ConfigurationError(format!("unsupported sink format '{other}': {e}"))
980 }),
981 }
982}
983
984fn select_partitioner(strategy: PartitionStrategy) -> Box<dyn KafkaPartitioner> {
986 match strategy {
987 PartitionStrategy::KeyHash => Box::new(KeyHashPartitioner::new()),
988 PartitionStrategy::RoundRobin => Box::new(RoundRobinPartitioner::new()),
989 PartitionStrategy::Sticky => Box::new(StickyPartitioner::new(100)),
990 }
991}
992
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};

    /// Two-column schema: non-null `id: Int64`, non-null `value: Utf8`.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ]))
    }

    /// Default sink config pointed at a local broker and the `output-events` topic.
    fn test_config() -> KafkaSinkConfig {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.topic = "output-events".into();
        cfg
    }

    /// Batch over `test_schema()` with the given ids and values.
    fn test_batch(ids: Vec<i64>, values: Vec<&str>) -> arrow_array::RecordBatch {
        arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(values)),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_new_defaults() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.state(), ConnectorState::Created);
        assert!(sink.producer.is_none());
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert!(!sink.transaction_active);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = KafkaSink::new(schema.clone(), test_config(), None);
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_capabilities_at_least_once() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let caps = sink.capabilities();
        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
        assert!(!caps.schema_evolution);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut cfg = test_config();
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
    }

    #[test]
    fn test_serializer_selection_json() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.serializer.format(), Format::Json);
    }

    #[test]
    fn test_serializer_selection_avro() {
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        let sink = KafkaSink::new(test_schema(), cfg, None);
        assert_eq!(sink.serializer.format(), Format::Avro);
    }

    #[test]
    fn test_with_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());

        let sink = KafkaSink::with_schema_registry(test_schema(), cfg, sr);
        assert!(sink.has_schema_registry());
        assert_eq!(sink.serializer.format(), Format::Avro);
        let caps = sink.capabilities();
        assert!(caps.schema_evolution);
    }

    #[test]
    fn test_debug_output() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let debug = format!("{sink:?}");
        assert!(debug.contains("KafkaSink"));
        assert!(debug.contains("output-events"));
    }

    #[test]
    fn test_extract_keys_no_key_column() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let batch = test_batch(vec![1, 2], vec!["a", "b"]);
        assert!(sink.extract_keys(&batch).unwrap().is_none());
    }

    #[test]
    fn test_extract_keys_with_key_column() {
        let mut cfg = test_config();
        cfg.key_column = Some("value".into());
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let batch = test_batch(vec![1, 2], vec!["key-a", "key-b"]);
        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"key-a");
        assert_eq!(&keys[1], b"key-b");
    }

    /// Non-string key columns go through the Arrow display-formatter path.
    #[test]
    fn test_extract_keys_non_string_column_uses_display_format() {
        let mut cfg = test_config();
        cfg.key_column = Some("id".into());
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let batch = test_batch(vec![7, -3], vec!["a", "b"]);
        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"7");
        assert_eq!(&keys[1], b"-3");
    }

    /// Null key values become empty keys (sent without a key by `write_batch`).
    #[test]
    fn test_extract_keys_null_key_is_empty() {
        let schema: SchemaRef = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, true),
        ]));
        let mut cfg = test_config();
        cfg.key_column = Some("value".into());
        let sink = KafkaSink::new(schema.clone(), cfg, None);
        let batch = arrow_array::RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec![Some("k"), None])),
            ],
        )
        .unwrap();
        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"k");
        assert!(keys[1].is_empty());
    }

    #[test]
    fn test_extract_keys_missing_column_is_config_error() {
        let mut cfg = test_config();
        cfg.key_column = Some("missing".into());
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let batch = test_batch(vec![1], vec!["a"]);
        assert!(matches!(
            sink.extract_keys(&batch),
            Err(ConnectorError::ConfigurationError(_))
        ));
    }

    #[test]
    fn test_key_buffer_mixed_entries() {
        let mut kb = KeyBuffer::with_capacity(3, 4);
        kb.push(b"ab");
        kb.push_empty();
        kb.push(b"xyz");
        assert_eq!(kb.len(), 3);
        assert_eq!(&kb[0], b"ab");
        assert!(kb[1].is_empty());
        assert_eq!(&kb[2], b"xyz");
    }
}