Skip to main content

laminar_connectors/kafka/
sink.rs

1//! Kafka sink connector implementation.
2//!
3//! [`KafkaSink`] implements the [`SinkConnector`] trait, writing Arrow
4//! `RecordBatch` data to Kafka topics via rdkafka's `FutureProducer`.
5//! Supports at-least-once (idempotent) and exactly-once (transactional)
6//! delivery, configurable partitioning, and dead-letter queue routing.
7
8use std::sync::Arc;
9use std::time::Duration;
10
11use arrow_array::{Array, StringArray};
12use arrow_schema::SchemaRef;
13use async_trait::async_trait;
14use rdkafka::message::OwnedHeaders;
15use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
16use rdkafka::ClientConfig;
17use tracing::{debug, info, warn};
18
19use crate::config::{ConnectorConfig, ConnectorState};
20use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
21use crate::error::ConnectorError;
22use crate::health::HealthStatus;
23use crate::metrics::ConnectorMetrics;
24use crate::serde::{self, Format, RecordSerializer};
25
26use super::avro_serializer::AvroSerializer;
27use super::partitioner::{
28    KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
29};
30use super::schema_registry::SchemaRegistryClient;
31use super::sink_config::{DeliveryGuarantee, KafkaSinkConfig, PartitionStrategy};
32use super::sink_metrics::KafkaSinkMetrics;
33
/// Fallback partition count used only when broker metadata query fails.
/// `open()` resets the cached `topic_partition_count` to this value before
/// each metadata query, so a reopened sink never keeps a stale count.
const FALLBACK_PARTITION_COUNT: i32 = 1;
36
/// Packed per-row key storage.
///
/// All key bytes live in a single `Vec<u8>`; each row records a
/// `(start, len)` descriptor into that buffer. This keeps N row keys
/// down to one heap allocation instead of N separate ones.
struct KeyBuffer {
    data: Vec<u8>,
    offsets: Vec<(usize, usize)>,
}

impl KeyBuffer {
    /// Pre-sizes storage for `num_rows` keys of roughly `avg_key_len` bytes each.
    fn with_capacity(num_rows: usize, avg_key_len: usize) -> Self {
        let data = Vec::with_capacity(num_rows * avg_key_len);
        let offsets = Vec::with_capacity(num_rows);
        Self { data, offsets }
    }

    /// Appends one key, recording where its bytes landed.
    fn push(&mut self, key: &[u8]) {
        let offset = self.data.len();
        self.offsets.push((offset, key.len()));
        self.data.extend_from_slice(key);
    }

    /// Records a row that has no key (e.g. a NULL key-column value).
    fn push_empty(&mut self) {
        self.offsets.push((0, 0));
    }

    /// Borrows the key bytes for row `i`; empty slice for key-less rows.
    fn key(&self, i: usize) -> &[u8] {
        let (offset, len) = self.offsets[i];
        &self.data[offset..offset + len]
    }

    #[cfg(test)]
    fn len(&self) -> usize {
        self.offsets.len()
    }
}

impl std::ops::Index<usize> for KeyBuffer {
    type Output = [u8];

    fn index(&self, i: usize) -> &[u8] {
        self.key(i)
    }
}
81
/// Kafka sink connector that writes Arrow `RecordBatch` data to Kafka topics.
///
/// Operates in Ring 1 (background) receiving data from Ring 0 via the
/// subscription API.
///
/// # Lifecycle
///
/// 1. Create with [`KafkaSink::new`]
/// 2. Call `open()` to create the producer and connect to Kafka
/// 3. For each epoch:
///    - `begin_epoch()` starts a Kafka transaction (exactly-once only)
///    - `write_batch()` serializes and produces records
///    - `commit_epoch()` commits the transaction
/// 4. Call `close()` for clean shutdown
pub struct KafkaSink {
    /// rdkafka producer (set during `open()`, dropped in `close()`).
    producer: Option<FutureProducer>,
    /// Parsed Kafka sink configuration.
    config: KafkaSinkConfig,
    /// Format-specific serializer (rebuilt when the input schema changes).
    serializer: Box<dyn RecordSerializer>,
    /// Partitioner for determining target partitions.
    partitioner: Box<dyn KafkaPartitioner>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Current `LaminarDB` epoch.
    current_epoch: u64,
    /// Last successfully committed epoch.
    last_committed_epoch: u64,
    /// Whether a Kafka transaction is currently active.
    transaction_active: bool,
    /// Dead letter queue producer (separate, non-transactional).
    dlq_producer: Option<FutureProducer>,
    /// Production metrics.
    metrics: KafkaSinkMetrics,
    /// Arrow schema for input batches.
    schema: SchemaRef,
    /// Optional Schema Registry client.
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    /// Shared Avro schema ID (0 = not yet registered; updated after SR registration).
    avro_schema_id: Arc<std::sync::atomic::AtomicU32>,
    /// Cached topic partition count (queried from broker metadata after open).
    topic_partition_count: i32,
}
126
impl KafkaSink {
    /// Creates a new Kafka sink connector with explicit schema.
    ///
    /// The sink starts in `ConnectorState::Created`; no producer exists
    /// until `open()` is called.
    #[must_use]
    pub fn new(schema: SchemaRef, config: KafkaSinkConfig) -> Self {
        // 0 means "not yet registered"; ensure_schema_ready() stores the
        // real Schema Registry ID here after registration.
        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let serializer =
            select_serializer(config.format, &schema, Arc::clone(&avro_schema_id), None);
        let partitioner = select_partitioner(config.partitioner);

        Self {
            producer: None,
            config,
            serializer,
            partitioner,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            transaction_active: false,
            dlq_producer: None,
            metrics: KafkaSinkMetrics::new(),
            schema,
            schema_registry: None,
            avro_schema_id,
            topic_partition_count: FALLBACK_PARTITION_COUNT,
        }
    }

    /// Creates a new Kafka sink with Schema Registry integration.
    ///
    /// Same as [`KafkaSink::new`], but the serializer is wired to the
    /// provided Schema Registry client so Avro registrations become
    /// visible to it via the shared `avro_schema_id` handle.
    #[must_use]
    pub fn with_schema_registry(
        schema: SchemaRef,
        config: KafkaSinkConfig,
        sr_client: SchemaRegistryClient,
    ) -> Self {
        let sr = Arc::new(sr_client);
        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let serializer = select_serializer(
            config.format,
            &schema,
            Arc::clone(&avro_schema_id),
            Some(Arc::clone(&sr)),
        );
        let partitioner = select_partitioner(config.partitioner);

        Self {
            producer: None,
            config,
            serializer,
            partitioner,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            transaction_active: false,
            dlq_producer: None,
            metrics: KafkaSinkMetrics::new(),
            schema,
            schema_registry: Some(sr),
            avro_schema_id,
            topic_partition_count: FALLBACK_PARTITION_COUNT,
        }
    }

    /// Lifecycle state (Created → Running → Closed).
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Whether Avro schema registration is available.
    #[must_use]
    pub fn has_schema_registry(&self) -> bool {
        self.schema_registry.is_some()
    }

    /// Active epoch (incremented by checkpoint coordinator).
    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Last epoch that was successfully committed to Kafka.
    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    /// Ensures the sink schema and SR registration match the actual data.
    ///
    /// Called from `write_batch()` before serialization. Registers the Avro
    /// schema when the incoming batch schema differs from the cached one, or
    /// when no schema ID has been stored yet (ID 0). On schema change the
    /// cached schema and the serializer are both rebuilt.
    ///
    /// # Errors
    ///
    /// `ConnectorError::Serde` if Arrow→Avro schema conversion fails;
    /// `ConnectorError::ConnectionFailed` if SR registration fails.
    async fn ensure_schema_ready(
        &mut self,
        batch_schema: &SchemaRef,
    ) -> Result<(), ConnectorError> {
        let schema_changed = self.schema != *batch_schema;
        let needs_registration = self.config.format == Format::Avro
            && (schema_changed
                || self
                    .avro_schema_id
                    .load(std::sync::atomic::Ordering::Relaxed)
                    == 0);

        // Register with SR *before* advancing schema/serializer so a failure
        // doesn't leave avro_schema_id stale while the serializer already
        // encodes with the new schema.
        if needs_registration {
            if let Some(ref sr) = self.schema_registry {
                // Confluent convention: value schemas live under "<topic>-value".
                let subject = format!("{}-value", self.config.topic);
                let avro_schema =
                    super::schema_registry::arrow_to_avro_schema(batch_schema, &self.config.topic)
                        .map_err(ConnectorError::Serde)?;
                let schema_id = sr
                    .register_schema(
                        &subject,
                        &avro_schema,
                        super::schema_registry::SchemaType::Avro,
                    )
                    .await
                    .map_err(|e| {
                        ConnectorError::ConnectionFailed(format!(
                            "failed to register Avro schema for '{subject}': {e}"
                        ))
                    })?;
                // SR IDs are non-negative in practice; the cast is flagged
                // but intentional.
                #[allow(clippy::cast_sign_loss)]
                self.avro_schema_id
                    .store(schema_id as u32, std::sync::atomic::Ordering::Relaxed);
                info!(subject = %subject, schema_id, "registered Avro schema");
            }
        }

        if schema_changed {
            debug!(
                old = ?self.schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
                new = ?batch_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
                "sink schema updated from incoming batch"
            );
            self.schema = batch_schema.clone();
            self.serializer = select_serializer(
                self.config.format,
                &self.schema,
                Arc::clone(&self.avro_schema_id),
                self.schema_registry.clone(),
            );
        }

        Ok(())
    }

    /// Contiguous key buffer: all key bytes in one allocation with per-row offsets.
    ///
    /// Returns `None` if no key column is configured. NULL key values become
    /// empty entries; string columns contribute their raw bytes, any other
    /// column type goes through Arrow's display formatting.
    ///
    /// # Errors
    ///
    /// `ConfigurationError` if the configured key column is missing from the
    /// batch schema; `Internal` if the Arrow display formatter cannot be built.
    fn extract_keys(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> Result<Option<KeyBuffer>, ConnectorError> {
        let Some(key_col) = &self.config.key_column else {
            return Ok(None);
        };

        let col_idx = batch.schema().index_of(key_col).map_err(|_| {
            ConnectorError::ConfigurationError(format!(
                "key column '{key_col}' not found in schema"
            ))
        })?;

        let array = batch.column(col_idx);
        let num_rows = batch.num_rows();
        // 32 is a guessed average key length for the initial allocation;
        // the buffer grows if keys are longer.
        let mut buf = KeyBuffer::with_capacity(num_rows, 32);

        // Try to get string values; fall back to display representation.
        if let Some(str_array) = array.as_any().downcast_ref::<StringArray>() {
            for i in 0..num_rows {
                if str_array.is_null(i) {
                    buf.push_empty();
                } else {
                    buf.push(str_array.value(i).as_bytes());
                }
            }
        } else {
            // For non-string columns, use the Arrow array formatter.
            use std::fmt::Write;
            let formatter = arrow_cast::display::ArrayFormatter::try_new(
                array,
                &arrow_cast::display::FormatOptions::default(),
            )
            .map_err(|e| {
                ConnectorError::Internal(format!(
                    "failed to create array formatter for key column: {e}"
                ))
            })?;
            // Reusable string buffer for formatted values.
            let mut fmt_buf = String::with_capacity(64);
            for i in 0..num_rows {
                if array.is_null(i) {
                    buf.push_empty();
                } else {
                    fmt_buf.clear();
                    // write! into a String is infallible; the Result is ignored.
                    let _ = write!(fmt_buf, "{}", formatter.value(i));
                    buf.push(fmt_buf.as_bytes());
                }
            }
        }

        Ok(Some(buf))
    }

    /// Routes a failed record to the dead letter queue.
    ///
    /// Attaches `__dlq.*` headers carrying the error message, the original
    /// topic, a wall-clock timestamp (ms since epoch), and the current epoch.
    /// Sent on the separate non-transactional DLQ producer with a 5s timeout.
    ///
    /// # Errors
    ///
    /// `ConfigurationError` if no DLQ producer/topic is configured;
    /// `WriteError` if the DLQ send itself fails.
    async fn route_to_dlq(
        &self,
        payload: &[u8],
        key: Option<&[u8]>,
        error_msg: &str,
    ) -> Result<(), ConnectorError> {
        let dlq_producer = self
            .dlq_producer
            .as_ref()
            .ok_or_else(|| ConnectorError::ConfigurationError("DLQ topic not configured".into()))?;
        let dlq_topic =
            self.config.dlq_topic.as_ref().ok_or_else(|| {
                ConnectorError::ConfigurationError("DLQ topic not configured".into())
            })?;

        // Wall-clock timestamp; unwrap_or_default() yields 0 if the system
        // clock is before the Unix epoch rather than panicking.
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis()
            .to_string();
        let epoch_str = self.current_epoch.to_string();

        let headers = OwnedHeaders::new()
            .insert(rdkafka::message::Header {
                key: "__dlq.error",
                value: Some(error_msg.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.topic",
                value: Some(self.config.topic.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.timestamp",
                value: Some(now.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.epoch",
                value: Some(epoch_str.as_bytes()),
            });

        let mut record = FutureRecord::to(dlq_topic)
            .payload(payload)
            .headers(headers);

        if let Some(k) = key {
            record = record.key(k);
        }

        dlq_producer
            .send(record, Duration::from_secs(5))
            .await
            .map_err(|(e, _)| ConnectorError::WriteError(format!("DLQ send failed: {e}")))?;

        self.metrics.record_dlq();
        Ok(())
    }
}
388
#[async_trait]
#[allow(clippy::too_many_lines)]
impl SinkConnector for KafkaSink {
    /// Creates the producer(s), initializes transactions/SR, and queries
    /// broker metadata. Leaves the sink in `Running` on success.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // Re-parse config if properties provided. Serializer and
        // partitioner are rebuilt to match the re-parsed config.
        if !config.properties().is_empty() {
            let parsed = KafkaSinkConfig::from_config(config)?;
            self.config = parsed;
            self.serializer = select_serializer(
                self.config.format,
                &self.schema,
                Arc::clone(&self.avro_schema_id),
                self.schema_registry.clone(),
            );
            self.partitioner = select_partitioner(self.config.partitioner);
        }

        info!(
            brokers = %self.config.bootstrap_servers,
            topic = %self.config.topic,
            format = %self.config.format,
            delivery = %self.config.delivery_guarantee,
            "opening Kafka sink connector"
        );

        // Build rdkafka producer.
        let rdkafka_config: ClientConfig = self.config.to_rdkafka_config();
        let producer: FutureProducer = rdkafka_config.create().map_err(|e| {
            ConnectorError::ConnectionFailed(format!("failed to create producer: {e}"))
        })?;

        // Initialize transactions if exactly-once.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            producer
                .init_transactions(self.config.transaction_timeout)
                .map_err(|e| {
                    ConnectorError::TransactionError(format!("failed to init transactions: {e}"))
                })?;
        }

        // Create DLQ producer if configured. Inherits security settings
        // (SASL, SSL) from the main producer config but is non-transactional.
        // DLQ records bypass the exactly-once transaction to avoid coupling
        // error routing with the main data path.
        if self.config.dlq_topic.is_some() {
            let dlq_config = self.config.to_dlq_rdkafka_config();
            let dlq_producer: FutureProducer = dlq_config.create().map_err(|e| {
                ConnectorError::ConnectionFailed(format!("failed to create DLQ producer: {e}"))
            })?;
            self.dlq_producer = Some(dlq_producer);
        }

        // Initialize Schema Registry client if configured (a client passed
        // via with_schema_registry() takes precedence and is kept as-is).
        if let Some(ref url) = self.config.schema_registry_url {
            if self.schema_registry.is_none() {
                let sr = if let Some(ref ca_path) = self.config.schema_registry_ssl_ca_location {
                    SchemaRegistryClient::with_tls(
                        url,
                        self.config.schema_registry_auth.clone(),
                        ca_path,
                    )?
                } else {
                    SchemaRegistryClient::new(url, self.config.schema_registry_auth.clone())
                };
                self.schema_registry = Some(Arc::new(sr));
            }
        }

        // Set SR compatibility level if configured.
        // Schema registration is deferred to first write_batch() where the
        // real pipeline output schema is known (the factory default is a
        // placeholder that would pollute the registry and break compat checks).
        if self.config.format == Format::Avro {
            if let Some(ref sr) = self.schema_registry {
                if let Some(ref compat) = self.config.schema_compatibility {
                    let subject = format!("{}-value", self.config.topic);
                    sr.set_compatibility_level(&subject, *compat)
                        .await
                        .map_err(|e| {
                            ConnectorError::ConnectionFailed(format!(
                                "failed to set SR compatibility for '{subject}': {e}"
                            ))
                        })?;
                }
            }
        }

        // Query broker metadata for actual topic partition count.
        // Reset to fallback first so a reopened sink doesn't keep a stale count.
        self.topic_partition_count = FALLBACK_PARTITION_COUNT;
        match producer
            .client()
            .fetch_metadata(Some(&self.config.topic), Duration::from_secs(5))
        {
            Ok(metadata) => {
                if let Some(topic_meta) = metadata.topics().first() {
                    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
                    let count = topic_meta.partitions().len() as i32;
                    if count > 0 {
                        self.topic_partition_count = count;
                        info!(
                            topic = %self.config.topic,
                            partitions = count,
                            "queried topic partition count from broker"
                        );
                    }
                }
            }
            Err(e) => {
                // Non-fatal: the sink still opens, partitioning just
                // degrades to the single-partition fallback.
                warn!(
                    topic = %self.config.topic,
                    error = %e,
                    fallback = FALLBACK_PARTITION_COUNT,
                    "failed to query topic metadata — using fallback partition count"
                );
            }
        }

        self.producer = Some(producer);
        self.state = ConnectorState::Running;
        info!("Kafka sink connector opened successfully");
        Ok(())
    }

    /// Serializes and produces one batch; failed records go to the DLQ
    /// when configured, otherwise the first failure aborts the batch.
    #[allow(clippy::cast_possible_truncation)] // Record batch row/byte counts fit in narrower types
    async fn write_batch(
        &mut self,
        batch: &arrow_array::RecordBatch,
    ) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        self.ensure_schema_ready(&batch.schema()).await?;

        let producer = self
            .producer
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "producer initialized".into(),
                actual: "producer is None".into(),
            })?;

        // Serialize the RecordBatch into per-row byte payloads.
        let payloads = self.serializer.serialize(batch).map_err(|e| {
            self.metrics.record_serialization_error();
            ConnectorError::Serde(e)
        })?;

        // Extract keys if key column is configured.
        let keys = self.extract_keys(batch)?;

        let mut records_written: usize = 0;
        let mut bytes_written: u64 = 0;

        // Phase 1: Enqueue records into librdkafka's internal queue.
        // Flush every flush_batch_size records to bound memory usage
        // and provide delivery confirmation in chunks.
        //
        // NOTE(review): rdkafka's async `FutureProducer::send` returns a
        // lazy future — if enqueueing only happens when the future is first
        // polled, records pushed here don't reach librdkafka's queue until
        // Phase 2 awaits them, and this intermediate flush would only cover
        // previously-polled sends. Confirm against the rdkafka version in use.
        let flush_threshold = self.config.flush_batch_size;
        let mut delivery_futures = Vec::with_capacity(payloads.len());
        for (i, payload) in payloads.iter().enumerate() {
            // Empty keys (which include NULL key values) are sent keyless.
            // NOTE(review): a legitimately empty string key is also dropped
            // here — confirm that is intended.
            let key: Option<&[u8]> = keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());

            // Determine partition using the count queried from broker metadata in open().
            let partition = self.partitioner.partition(key, self.topic_partition_count);

            // Build the Kafka record.
            let mut record = FutureRecord::to(&self.config.topic).payload(payload);

            if let Some(k) = key {
                record = record.key(k);
            }
            if let Some(p) = partition {
                record = record.partition(p);
            }

            // Allow 500ms for rdkafka's internal queue to drain before failing
            // with QueueFull. Zero timeout causes legitimate messages to be
            // rejected under transient burst load.
            delivery_futures.push(producer.send(record, Duration::from_millis(500)));

            // Intermediate flush to bound in-flight records and memory.
            if flush_threshold > 0 && (i + 1) % flush_threshold == 0 {
                producer
                    .flush(self.config.delivery_timeout)
                    .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
            }
        }

        // Phase 2: Collect delivery reports.
        for (i, future) in delivery_futures.into_iter().enumerate() {
            match future.await {
                Ok(_delivery) => {
                    records_written += 1;
                    bytes_written += payloads[i].len() as u64;
                }
                Err((err, _msg)) => {
                    self.metrics.record_error();
                    let err_msg = err.to_string();

                    // With a DLQ, failures are routed and the batch continues;
                    // without one, the first failure aborts the whole batch.
                    if self.dlq_producer.is_some() {
                        let key: Option<&[u8]> =
                            keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
                        self.route_to_dlq(&payloads[i], key, &err_msg).await?;
                    } else {
                        return Err(ConnectorError::WriteError(format!(
                            "Kafka produce failed: {err_msg}"
                        )));
                    }
                }
            }
        }

        self.metrics
            .record_write(records_written as u64, bytes_written);

        debug!(
            records = records_written,
            bytes = bytes_written,
            "wrote batch to Kafka"
        );

        Ok(WriteResult::new(records_written, bytes_written))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Records the new epoch and, for exactly-once, opens a Kafka
    /// transaction that `commit_epoch()`/`rollback_epoch()` will close.
    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            producer.begin_transaction().map_err(|e| {
                ConnectorError::TransactionError(format!(
                    "failed to begin transaction for epoch {epoch}: {e}"
                ))
            })?;

            self.transaction_active = true;
        }

        // Reset any per-epoch partitioner state.
        self.partitioner.reset();
        debug!(epoch, "began epoch");
        Ok(())
    }

    /// Two-phase-commit phase 1: flush pending messages without committing.
    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch in pre_commit: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        // Flush all pending messages to Kafka brokers (phase 1).
        if let Some(ref producer) = self.producer {
            producer.flush(self.config.delivery_timeout).map_err(|e| {
                ConnectorError::TransactionError(format!(
                    "failed to flush before pre-commit for epoch {epoch}: {e}"
                ))
            })?;
        }

        debug!(epoch, "pre-committed epoch (flushed)");
        Ok(())
    }

    /// Two-phase-commit phase 2: flush and (exactly-once) commit the
    /// Kafka transaction; at-least-once only flushes.
    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            // Flush all pending messages.
            producer.flush(self.config.delivery_timeout).map_err(|e| {
                ConnectorError::TransactionError(format!("failed to flush before commit: {e}"))
            })?;

            // Commit the Kafka transaction.
            producer
                .commit_transaction(self.config.transaction_timeout)
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to commit transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = false;
        } else {
            // At-least-once: just flush pending messages.
            if let Some(ref producer) = self.producer {
                producer.flush(self.config.delivery_timeout).map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to flush for epoch {epoch}: {e}"
                    ))
                })?;
            }
        }

        self.last_committed_epoch = epoch;
        self.metrics.record_commit();
        debug!(epoch, "committed epoch");
        Ok(())
    }

    /// Aborts the active Kafka transaction (if any); a no-op flag-wise
    /// for at-least-once delivery.
    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && self.transaction_active
        {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            producer
                .abort_transaction(self.config.transaction_timeout)
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to abort transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = false;
        }

        self.metrics.record_rollback();
        debug!(epoch, "rolled back epoch");
        Ok(())
    }

    /// Maps lifecycle state to a health verdict; no broker round-trip.
    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    /// Advertises capabilities based on delivery guarantee and SR presence.
    fn capabilities(&self) -> SinkConnectorCapabilities {
        let mut caps = SinkConnectorCapabilities::default()
            .with_idempotent()
            .with_partitioned();

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }

        if self.schema_registry.is_some() {
            caps = caps.with_schema_evolution();
        }

        caps
    }

    async fn flush(&mut self) -> Result<(), ConnectorError> {
        if let Some(ref producer) = self.producer {
            producer
                .flush(self.config.delivery_timeout)
                .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
        }
        Ok(())
    }

    /// Best-effort shutdown: abort/flush failures are logged, not returned,
    /// so the sink always ends in `Closed`.
    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Kafka sink connector");

        // Abort any active transaction.
        if self.transaction_active {
            if let Err(e) = self.rollback_epoch(self.current_epoch).await {
                warn!(error = %e, "failed to abort active transaction on close");
            }
        }

        // Flush remaining messages.
        if let Some(ref producer) = self.producer {
            if let Err(e) = producer.flush(Duration::from_secs(30)) {
                warn!(error = %e, "failed to flush on close");
            }
        }

        self.producer = None;
        self.dlq_producer = None;
        self.state = ConnectorState::Closed;
        info!("Kafka sink connector closed");
        Ok(())
    }
}
811
812impl std::fmt::Debug for KafkaSink {
813    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
814        f.debug_struct("KafkaSink")
815            .field("state", &self.state)
816            .field("topic", &self.config.topic)
817            .field("delivery", &self.config.delivery_guarantee)
818            .field("format", &self.config.format)
819            .field("current_epoch", &self.current_epoch)
820            .field("last_committed_epoch", &self.last_committed_epoch)
821            .field("transaction_active", &self.transaction_active)
822            .finish_non_exhaustive()
823    }
824}
825
826/// Selects the appropriate serializer for the given format.
827///
828/// For Avro, uses the shared `schema_id` handle so that Schema Registry
829/// registration updates are visible to the serializer.
830fn select_serializer(
831    format: Format,
832    schema: &SchemaRef,
833    schema_id: Arc<std::sync::atomic::AtomicU32>,
834    registry: Option<Arc<SchemaRegistryClient>>,
835) -> Box<dyn RecordSerializer> {
836    match format {
837        Format::Avro => Box::new(AvroSerializer::with_shared_schema_id(
838            schema.clone(),
839            schema_id,
840            registry,
841        )),
842        other => serde::create_serializer(other).unwrap_or_else(|_| {
843            tracing::warn!(format = %other, "unsupported serializer format, falling back to JSON");
844            Box::new(serde::json::JsonSerializer::new())
845        }),
846    }
847}
848
849/// Selects the appropriate partitioner for the given strategy.
850fn select_partitioner(strategy: PartitionStrategy) -> Box<dyn KafkaPartitioner> {
851    match strategy {
852        PartitionStrategy::KeyHash => Box::new(KeyHashPartitioner::new()),
853        PartitionStrategy::RoundRobin => Box::new(RoundRobinPartitioner::new()),
854        PartitionStrategy::Sticky => Box::new(StickyPartitioner::new(100)),
855    }
856}
857
#[cfg(test)]
mod tests {
    //! Unit tests for [`KafkaSink`] that exercise construction, state,
    //! capabilities, serializer selection, and key extraction without a
    //! live broker (no `open()` is called).
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};

    /// Two-column schema (`id: Int64`, `value: Utf8`) shared by all tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ]))
    }

    /// Minimal config pointing at a local broker.
    //
    // Struct-update syntax instead of default-then-reassign (avoids clippy's
    // `field_reassign_with_default`).
    fn test_config() -> KafkaSinkConfig {
        KafkaSinkConfig {
            bootstrap_servers: "localhost:9092".into(),
            topic: "output-events".into(),
            ..KafkaSinkConfig::default()
        }
    }

    #[test]
    fn test_new_defaults() {
        let sink = KafkaSink::new(test_schema(), test_config());
        assert_eq!(sink.state(), ConnectorState::Created);
        assert!(sink.producer.is_none());
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert!(!sink.transaction_active);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = KafkaSink::new(schema.clone(), test_config());
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_health_check_created() {
        // A sink that was never opened reports Unknown, not Unhealthy.
        let sink = KafkaSink::new(test_schema(), test_config());
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut sink = KafkaSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Running;
        assert_eq!(sink.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut sink = KafkaSink::new(test_schema(), test_config());
        sink.state = ConnectorState::Closed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_metrics_initial() {
        let sink = KafkaSink::new(test_schema(), test_config());
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    #[test]
    fn test_capabilities_at_least_once() {
        let sink = KafkaSink::new(test_schema(), test_config());
        let caps = sink.capabilities();
        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
        assert!(!caps.schema_evolution);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let cfg = KafkaSinkConfig {
            delivery_guarantee: DeliveryGuarantee::ExactlyOnce,
            ..test_config()
        };
        let sink = KafkaSink::new(test_schema(), cfg);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
    }

    #[test]
    fn test_serializer_selection_json() {
        // Default format is JSON.
        let sink = KafkaSink::new(test_schema(), test_config());
        assert_eq!(sink.serializer.format(), Format::Json);
    }

    #[test]
    fn test_serializer_selection_avro() {
        let cfg = KafkaSinkConfig {
            format: Format::Avro,
            ..test_config()
        };
        let sink = KafkaSink::new(test_schema(), cfg);
        assert_eq!(sink.serializer.format(), Format::Avro);
    }

    #[test]
    fn test_with_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let cfg = KafkaSinkConfig {
            format: Format::Avro,
            schema_registry_url: Some("http://localhost:8081".into()),
            ..test_config()
        };

        let sink = KafkaSink::with_schema_registry(test_schema(), cfg, sr);
        assert!(sink.has_schema_registry());
        assert_eq!(sink.serializer.format(), Format::Avro);
        let caps = sink.capabilities();
        assert!(caps.schema_evolution);
    }

    #[test]
    fn test_debug_output() {
        let sink = KafkaSink::new(test_schema(), test_config());
        let debug = format!("{sink:?}");
        assert!(debug.contains("KafkaSink"));
        assert!(debug.contains("output-events"));
    }

    #[test]
    fn test_extract_keys_no_key_column() {
        // Without a configured key column, no keys are extracted.
        let sink = KafkaSink::new(test_schema(), test_config());
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();
        assert!(sink.extract_keys(&batch).unwrap().is_none());
    }

    #[test]
    fn test_extract_keys_with_key_column() {
        let cfg = KafkaSinkConfig {
            key_column: Some("value".into()),
            ..test_config()
        };
        let sink = KafkaSink::new(test_schema(), cfg);
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["key-a", "key-b"])),
            ],
        )
        .unwrap();
        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"key-a");
        assert_eq!(&keys[1], b"key-b");
    }
}