laminar_connectors/kafka/sink.rs

//! Kafka sink connector.

use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow_array::{Array, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::ClientConfig;
use tracing::{debug, info, warn};

use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;
use crate::serde::{self, Format, RecordSerializer};

use super::avro_serializer::AvroSerializer;
use super::partitioner::{
    KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
};
use super::schema_registry::SchemaRegistryClient;
use super::sink_config::{KafkaSinkConfig, PartitionStrategy};
use super::sink_metrics::KafkaSinkMetrics;
use crate::connector::DeliveryGuarantee;

/// Fallback partition count used only when broker metadata query fails.
const FALLBACK_PARTITION_COUNT: i32 = 1;

/// Short deadline used by `SinkConnector::flush` — kept well below any
/// outer tokio timeout so a `spawn_blocking` flush can't outlive its
/// caller and leak a blocking thread. Thorough drains go through
/// `pre_commit` / `commit_epoch` / `close` with their own budgets.
const PERIODIC_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);

/// Contiguous key buffer — stores all key bytes in a single allocation
/// with per-row `(offset, length)` pairs. Avoids N separate heap
/// allocations for N rows.
struct KeyBuffer {
    data: Vec<u8>,
    offsets: Vec<(usize, usize)>,
}

impl KeyBuffer {
    fn with_capacity(num_rows: usize, avg_key_len: usize) -> Self {
        Self {
            data: Vec::with_capacity(num_rows * avg_key_len),
            offsets: Vec::with_capacity(num_rows),
        }
    }

    fn push(&mut self, key: &[u8]) {
        let start = self.data.len();
        self.data.extend_from_slice(key);
        self.offsets.push((start, key.len()));
    }

    fn push_empty(&mut self) {
        self.offsets.push((0, 0));
    }

    fn key(&self, i: usize) -> &[u8] {
        let (start, len) = self.offsets[i];
        &self.data[start..start + len]
    }

    #[cfg(test)]
    fn len(&self) -> usize {
        self.offsets.len()
    }
}

impl std::ops::Index<usize> for KeyBuffer {
    type Output = [u8];

    fn index(&self, i: usize) -> &[u8] {
        self.key(i)
    }
}

/// Kafka sink connector that writes Arrow `RecordBatch` data to Kafka topics.
///
/// Operates in Ring 1 (background) receiving data from Ring 0 via the
/// subscription API.
///
/// # Lifecycle
///
/// 1. Create with [`KafkaSink::new`]
/// 2. Call `open()` to create the producer and connect to Kafka
/// 3. For each epoch:
///    - `begin_epoch()` starts a Kafka transaction (exactly-once only)
///    - `write_batch()` serializes and produces records
///    - `commit_epoch()` commits the transaction
/// 4. Call `close()` for clean shutdown
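///
/// # Example
///
/// An illustrative sketch of the happy path (marked `ignore` since it
/// needs a live broker; `schema`, `sink_config`, `connector_config`,
/// and `batch` are placeholders assumed to be built by the caller):
///
/// ```ignore
/// let mut sink = KafkaSink::new(schema, sink_config, None);
/// sink.open(&connector_config).await?;
///
/// sink.begin_epoch(1).await?;      // starts a transaction (exactly-once only)
/// sink.write_batch(&batch).await?; // serialize + produce
/// sink.pre_commit(1).await?;       // flush pending records (2PC phase 1)
/// sink.commit_epoch(1).await?;     // commit the transaction
///
/// sink.close().await?;
/// ```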
pub struct KafkaSink {
    /// rdkafka producer (set during `open()`).
    producer: Option<FutureProducer>,
    /// Parsed Kafka sink configuration.
    config: KafkaSinkConfig,
    /// Format-specific serializer.
    serializer: Box<dyn RecordSerializer>,
    /// Partitioner for determining target partitions.
    partitioner: Box<dyn KafkaPartitioner>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Current `LaminarDB` epoch.
    current_epoch: u64,
    /// Last successfully committed epoch.
    last_committed_epoch: u64,
    /// Whether a Kafka transaction is currently active.
    transaction_active: bool,
    /// Dead letter queue producer (separate, non-transactional).
    dlq_producer: Option<FutureProducer>,
    /// Production metrics.
    metrics: KafkaSinkMetrics,
    /// Arrow schema for input batches.
    schema: SchemaRef,
    /// Optional Schema Registry client.
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    /// Shared Avro schema ID (updated after SR registration).
    avro_schema_id: Arc<std::sync::atomic::AtomicU32>,
    /// Cached topic partition count (queried from broker metadata after open).
    topic_partition_count: i32,
}

impl KafkaSink {
    /// Creates a new Kafka sink connector with explicit schema.
    ///
    /// # Panics
    ///
    /// Panics if `config.format` is not a supported serialization format.
    /// Call [`KafkaSinkConfig::validate`] first to catch this at config time.
    #[must_use]
    pub fn new(
        schema: SchemaRef,
        config: KafkaSinkConfig,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let serializer =
            select_serializer(config.format, &schema, Arc::clone(&avro_schema_id), None)
                .expect("format validated in KafkaSinkConfig::validate()");
        let partitioner = select_partitioner(config.partitioner);

        Self {
            producer: None,
            config,
            serializer,
            partitioner,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            transaction_active: false,
            dlq_producer: None,
            metrics: KafkaSinkMetrics::new(registry),
            schema,
            schema_registry: None,
            avro_schema_id,
            topic_partition_count: FALLBACK_PARTITION_COUNT,
        }
    }

    /// Creates a new Kafka sink with Schema Registry integration.
    ///
    /// # Panics
    ///
    /// Panics if `config.format` is not a supported serialization format.
    /// Call [`KafkaSinkConfig::validate`] first to catch this at config time.
    #[must_use]
    pub fn with_schema_registry(
        schema: SchemaRef,
        config: KafkaSinkConfig,
        sr_client: SchemaRegistryClient,
    ) -> Self {
        let sr = Arc::new(sr_client);
        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
        let serializer = select_serializer(
            config.format,
            &schema,
            Arc::clone(&avro_schema_id),
            Some(Arc::clone(&sr)),
        )
        .expect("format validated in KafkaSinkConfig::validate()");
        let partitioner = select_partitioner(config.partitioner);

        Self {
            producer: None,
            config,
            serializer,
            partitioner,
            state: ConnectorState::Created,
            current_epoch: 0,
            last_committed_epoch: 0,
            transaction_active: false,
            dlq_producer: None,
            metrics: KafkaSinkMetrics::new(None),
            schema,
            schema_registry: Some(sr),
            avro_schema_id,
            topic_partition_count: FALLBACK_PARTITION_COUNT,
        }
    }

    /// Lifecycle state (Created → Running → Closed).
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Whether Avro schema registration is available.
    #[must_use]
    pub fn has_schema_registry(&self) -> bool {
        self.schema_registry.is_some()
    }

    /// Active epoch (incremented by checkpoint coordinator).
    #[must_use]
    pub fn current_epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Last epoch that was successfully committed to Kafka.
    #[must_use]
    pub fn last_committed_epoch(&self) -> u64 {
        self.last_committed_epoch
    }

    /// Ensures the sink schema and SR registration match the actual data.
    async fn ensure_schema_ready(
        &mut self,
        batch_schema: &SchemaRef,
    ) -> Result<(), ConnectorError> {
        let schema_changed = self.schema != *batch_schema;
        let needs_registration = self.config.format == Format::Avro
            && (schema_changed
                || self
                    .avro_schema_id
                    .load(std::sync::atomic::Ordering::Relaxed)
                    == 0);

        // Register with SR *before* advancing schema/serializer so a failure
        // doesn't leave avro_schema_id stale while the serializer already
        // encodes with the new schema.
        if needs_registration {
            if let Some(ref sr) = self.schema_registry {
                let subject = format!("{}-value", self.config.topic);
                let avro_schema =
                    super::schema_registry::arrow_to_avro_schema(batch_schema, &self.config.topic)
                        .map_err(ConnectorError::Serde)?;
                let schema_id = sr
                    .register_schema(
                        &subject,
                        &avro_schema,
                        super::schema_registry::SchemaType::Avro,
                    )
                    .await
                    .map_err(|e| {
                        ConnectorError::ConnectionFailed(format!(
                            "failed to register Avro schema for '{subject}': {e}"
                        ))
                    })?;
                #[allow(clippy::cast_sign_loss)]
                self.avro_schema_id
                    .store(schema_id as u32, std::sync::atomic::Ordering::Relaxed);
                info!(subject = %subject, schema_id, "registered Avro schema");
            }
        }

        if schema_changed {
            debug!(
                old = ?self.schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
                new = ?batch_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
                "sink schema updated from incoming batch"
            );
            self.schema = batch_schema.clone();
            self.serializer = select_serializer(
                self.config.format,
                &self.schema,
                Arc::clone(&self.avro_schema_id),
                self.schema_registry.clone(),
            )?;
        }

        Ok(())
    }

    /// Extracts per-row message keys from the configured key column into
    /// a contiguous `KeyBuffer`.
    ///
    /// Returns `None` if no key column is configured.
    fn extract_keys(
        &self,
        batch: &arrow_array::RecordBatch,
    ) -> Result<Option<KeyBuffer>, ConnectorError> {
        let Some(key_col) = &self.config.key_column else {
            return Ok(None);
        };

        let col_idx = batch.schema().index_of(key_col).map_err(|_| {
            ConnectorError::ConfigurationError(format!(
                "key column '{key_col}' not found in schema"
            ))
        })?;

        let array = batch.column(col_idx);
        let num_rows = batch.num_rows();
        let mut buf = KeyBuffer::with_capacity(num_rows, 32);

        // Try to get string values; fall back to display representation.
        if let Some(str_array) = array.as_any().downcast_ref::<StringArray>() {
            for i in 0..num_rows {
                if str_array.is_null(i) {
                    buf.push_empty();
                } else {
                    buf.push(str_array.value(i).as_bytes());
                }
            }
        } else {
            // For non-string columns, use the Arrow array formatter.
            use std::fmt::Write;
            let formatter = arrow_cast::display::ArrayFormatter::try_new(
                array,
                &arrow_cast::display::FormatOptions::default(),
            )
            .map_err(|e| {
                ConnectorError::Internal(format!(
                    "failed to create array formatter for key column: {e}"
                ))
            })?;
            // Reusable string buffer for formatted values.
            let mut fmt_buf = String::with_capacity(64);
            for i in 0..num_rows {
                if array.is_null(i) {
                    buf.push_empty();
                } else {
                    fmt_buf.clear();
                    let _ = write!(fmt_buf, "{}", formatter.value(i));
                    buf.push(fmt_buf.as_bytes());
                }
            }
        }

        Ok(Some(buf))
    }

    /// Routes a failed record to the dead letter queue.
    async fn route_to_dlq(
        &self,
        payload: &[u8],
        key: Option<&[u8]>,
        error_msg: &str,
    ) -> Result<(), ConnectorError> {
        let dlq_producer = self
            .dlq_producer
            .as_ref()
            .ok_or_else(|| ConnectorError::ConfigurationError("DLQ topic not configured".into()))?;
        let dlq_topic =
            self.config.dlq_topic.as_ref().ok_or_else(|| {
                ConnectorError::ConfigurationError("DLQ topic not configured".into())
            })?;

        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_else(|_| {
                tracing::warn!("system clock before Unix epoch — using 0 for DLQ timestamp");
                std::time::Duration::ZERO
            })
            .as_millis()
            .to_string();
        let epoch_str = self.current_epoch.to_string();

        let headers = OwnedHeaders::new()
            .insert(rdkafka::message::Header {
                key: "__dlq.error",
                value: Some(error_msg.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.topic",
                value: Some(self.config.topic.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.timestamp",
                value: Some(now.as_bytes()),
            })
            .insert(rdkafka::message::Header {
                key: "__dlq.epoch",
                value: Some(epoch_str.as_bytes()),
            });

        let mut record = FutureRecord::to(dlq_topic)
            .payload(payload)
            .headers(headers);

        if let Some(k) = key {
            record = record.key(k);
        }

        dlq_producer
            .send(record, Duration::from_secs(5))
            .await
            .map_err(|(e, _)| ConnectorError::WriteError(format!("DLQ send failed: {e}")))?;

        self.metrics.record_dlq();
        Ok(())
    }

    /// Enqueues a record via the synchronous `send_result`, retrying
    /// briefly on `QueueFull`. Uses `send_result` rather than `send`
    /// because the latter is an `async fn` in rdkafka 0.39+ and only
    /// enqueues when polled — which would defeat the Vec-of-futures
    /// pipelining in `write_batch`.
    async fn enqueue_with_queue_retry(
        producer: &FutureProducer,
        mut record: FutureRecord<'_, [u8], [u8]>,
        queue_timeout: Duration,
    ) -> Result<DeliveryFuture, ConnectorError> {
        let start = Instant::now();
        loop {
            match producer.send_result(record) {
                Ok(fut) => return Ok(fut),
                Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), r))
                    if start.elapsed() < queue_timeout =>
                {
                    record = r;
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
                Err((e, _)) => {
                    return Err(ConnectorError::WriteError(format!(
                        "Kafka enqueue failed: {e}"
                    )));
                }
            }
        }
    }

    /// Flush on the blocking pool — `Producer::flush()` is a synchronous
    /// FFI call that would stall the sink task's async select loop.
    async fn flush_producer_async(
        producer: &FutureProducer,
        timeout: Duration,
    ) -> Result<(), rdkafka::error::KafkaError> {
        let p = producer.clone();
        match tokio::task::spawn_blocking(move || p.flush(timeout)).await {
            Ok(result) => result,
            Err(join_err) => {
                warn!("flush blocking task failed: {join_err}");
                Err(rdkafka::error::KafkaError::Canceled)
            }
        }
    }

    /// Run a blocking producer operation on the thread pool. All
    /// librdkafka transaction calls (`begin_transaction`,
    /// `commit_transaction`, `abort_transaction`) are synchronous FFI.
    async fn producer_blocking<F, R>(producer: &FutureProducer, op: F) -> R
    where
        F: FnOnce(&FutureProducer) -> R + Send + 'static,
        R: Send + 'static,
    {
        let p = producer.clone();
        tokio::task::spawn_blocking(move || op(&p))
            .await
            .expect("producer_blocking: blocking task panicked")
    }
}

#[async_trait]
#[allow(clippy::too_many_lines)]
impl SinkConnector for KafkaSink {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // Re-parse config if properties provided.
        if !config.properties().is_empty() {
            let parsed = KafkaSinkConfig::from_config(config)?;
            self.config = parsed;
            self.serializer = select_serializer(
                self.config.format,
                &self.schema,
                Arc::clone(&self.avro_schema_id),
                self.schema_registry.clone(),
            )?;
            self.partitioner = select_partitioner(self.config.partitioner);
        }

        info!(
            brokers = %self.config.bootstrap_servers,
            topic = %self.config.topic,
            format = %self.config.format,
            delivery = %self.config.delivery_guarantee,
            "opening Kafka sink connector"
        );

        // Build rdkafka producer.
        let rdkafka_config: ClientConfig = self.config.to_rdkafka_config();
        let producer: FutureProducer = rdkafka_config.create().map_err(|e| {
            ConnectorError::ConnectionFailed(format!("failed to create producer: {e}"))
        })?;

        // Initialize transactions if exactly-once.
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            producer
                .init_transactions(self.config.transaction_timeout)
                .map_err(|e| {
                    ConnectorError::TransactionError(format!("failed to init transactions: {e}"))
                })?;
        }

        // Create DLQ producer if configured. Inherits security settings
        // (SASL, SSL) from the main producer config but is non-transactional.
        // DLQ records bypass the exactly-once transaction to avoid coupling
        // error routing with the main data path.
        if self.config.dlq_topic.is_some() {
            let dlq_config = self.config.to_dlq_rdkafka_config();
            let dlq_producer: FutureProducer = dlq_config.create().map_err(|e| {
                ConnectorError::ConnectionFailed(format!("failed to create DLQ producer: {e}"))
            })?;
            self.dlq_producer = Some(dlq_producer);
        }

        // Initialize Schema Registry client if configured.
        if let Some(ref url) = self.config.schema_registry_url {
            if self.schema_registry.is_none() {
                let sr = if let Some(ref ca_path) = self.config.schema_registry_ssl_ca_location {
                    SchemaRegistryClient::with_tls(
                        url,
                        self.config.schema_registry_auth.clone(),
                        ca_path,
                    )?
                } else {
                    SchemaRegistryClient::new(url, self.config.schema_registry_auth.clone())
                };
                self.schema_registry = Some(Arc::new(sr));
            }
        }

        // Set SR compatibility level if configured.
        // Schema registration is deferred to first write_batch() where the
        // real pipeline output schema is known (the factory default is a
        // placeholder that would pollute the registry and break compat checks).
        if self.config.format == Format::Avro {
            if let Some(ref sr) = self.schema_registry {
                if let Some(ref compat) = self.config.schema_compatibility {
                    let subject = format!("{}-value", self.config.topic);
                    sr.set_compatibility_level(&subject, *compat)
                        .await
                        .map_err(|e| {
                            ConnectorError::ConnectionFailed(format!(
                                "failed to set SR compatibility for '{subject}': {e}"
                            ))
                        })?;
                }
            }
        }

        // Query broker metadata for actual topic partition count.
        // Reset to fallback first so a reopened sink doesn't keep a stale count.
        self.topic_partition_count = FALLBACK_PARTITION_COUNT;
        match producer
            .client()
            .fetch_metadata(Some(&self.config.topic), Duration::from_secs(5))
        {
            Ok(metadata) => {
                if let Some(topic_meta) = metadata.topics().first() {
                    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
                    let count = topic_meta.partitions().len() as i32;
                    if count > 0 {
                        self.topic_partition_count = count;
                        info!(
                            topic = %self.config.topic,
                            partitions = count,
                            "queried topic partition count from broker"
                        );
                    }
                }
            }
            Err(e) => {
                warn!(
                    topic = %self.config.topic,
                    error = %e,
                    fallback = FALLBACK_PARTITION_COUNT,
                    "failed to query topic metadata — using fallback partition count"
                );
            }
        }

        self.producer = Some(producer);
        self.state = ConnectorState::Running;
        info!("Kafka sink connector opened successfully");
        Ok(())
    }

    #[allow(clippy::cast_possible_truncation)] // Record batch row/byte counts fit in narrower types
    async fn write_batch(
        &mut self,
        batch: &arrow_array::RecordBatch,
    ) -> Result<WriteResult, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        self.ensure_schema_ready(&batch.schema()).await?;

        let producer = self
            .producer
            .as_ref()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "producer initialized".into(),
                actual: "producer is None".into(),
            })?;

        // Serialize the RecordBatch into per-row byte payloads.
        let payloads = self.serializer.serialize(batch).map_err(|e| {
            self.metrics.record_serialization_error();
            ConnectorError::Serde(e)
        })?;

        // Extract keys if key column is configured.
        let keys = self.extract_keys(batch)?;

        let mut records_written: usize = 0;
        let mut bytes_written: u64 = 0;

        // Phase 1: enqueue every record into librdkafka's internal queue.
        // Flush every flush_batch_size records to bound in-flight memory.
        let flush_threshold = self.config.flush_batch_size;
        let mut delivery_futures = Vec::with_capacity(payloads.len());
        for (i, payload) in payloads.iter().enumerate() {
            let key: Option<&[u8]> = keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
            let partition = self.partitioner.partition(key, self.topic_partition_count);

            let mut record = FutureRecord::to(&self.config.topic).payload(payload.as_slice());
            if let Some(k) = key {
                record = record.key(k);
            }
            if let Some(p) = partition {
                record = record.partition(p);
            }

            // 500ms matches the old `send(record, 500ms)` contract: ride
            // out transient QueueFull bursts before giving up.
            let fut = Self::enqueue_with_queue_retry(producer, record, Duration::from_millis(500))
                .await?;
            delivery_futures.push((Instant::now(), fut));

            if flush_threshold > 0 && (i + 1) % flush_threshold == 0 {
                Self::flush_producer_async(producer, self.config.delivery_timeout)
                    .await
                    .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
            }
        }

        // Phase 2: await each delivery report. Outer Err = oneshot canceled
        // (producer dropped); inner Err = Kafka delivery error.
        let mut failed: usize = 0;
        let mut first_error: Option<String> = None;
        for (i, (send_time, future)) in delivery_futures.into_iter().enumerate() {
            let err_msg = match future.await {
                Ok(Ok(_)) => {
                    let latency_us = send_time.elapsed().as_micros() as u64;
                    self.metrics.record_produce_latency(latency_us);
                    records_written += 1;
                    bytes_written += payloads[i].len() as u64;
                    continue;
                }
                Ok(Err((err, _))) => err.to_string(),
                Err(_canceled) => "delivery canceled — producer dropped before ack".into(),
            };

            self.metrics.record_error();
            failed += 1;
            if first_error.is_none() {
                first_error = Some(err_msg.clone());
            }

            if self.dlq_producer.is_some() {
                let key: Option<&[u8]> =
                    keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
                if let Err(dlq_err) = self.route_to_dlq(&payloads[i], key, &err_msg).await {
                    warn!(
                        original_error = %err_msg,
                        dlq_error = %dlq_err,
                        "failed to route record to DLQ — record lost"
                    );
                }
            }
        }

        self.metrics
            .record_write(records_written as u64, bytes_written);

        debug!(
            records = records_written,
            bytes = bytes_written,
            failed,
            "wrote batch to Kafka"
        );

        // With DLQ, failures are already routed — report success. Without,
        // surface the aggregate so the sink task can poison the epoch.
        if failed > 0 && self.dlq_producer.is_none() {
            return Err(ConnectorError::WriteError(format!(
                "Kafka produce: {failed}/{} records failed, first error: {}",
                payloads.len(),
                first_error.unwrap_or_else(|| "unknown".into())
            )));
        }

        Ok(WriteResult::new(records_written, bytes_written))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        self.current_epoch = epoch;

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            if self.transaction_active {
                warn!(epoch, "aborting stale transaction before new epoch");
                let txn_timeout = self.config.transaction_timeout;
                Self::producer_blocking(producer, move |p| p.abort_transaction(txn_timeout))
                    .await
                    .map_err(|e| {
                        ConnectorError::TransactionError(format!(
                            "cannot begin epoch {epoch}: abort of stale transaction failed: {e}"
                        ))
                    })?;
                self.transaction_active = false;
            }

            Self::producer_blocking(producer, FutureProducer::begin_transaction)
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to begin transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = true;
        }

        self.partitioner.reset();
        debug!(epoch, "began epoch");
        Ok(())
    }

    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch in pre_commit: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        // Flush all pending messages to Kafka brokers (phase 1).
        if let Some(ref producer) = self.producer {
            Self::flush_producer_async(producer, self.config.delivery_timeout)
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to flush before pre-commit for epoch {epoch}: {e}"
                    ))
                })?;
        }

        debug!(epoch, "pre-committed epoch (flushed)");
        Ok(())
    }

    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if epoch != self.current_epoch {
            return Err(ConnectorError::TransactionError(format!(
                "epoch mismatch: expected {}, got {epoch}",
                self.current_epoch
            )));
        }

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            Self::flush_producer_async(producer, self.config.delivery_timeout)
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!("failed to flush before commit: {e}"))
                })?;

            let txn_timeout = self.config.transaction_timeout;
            Self::producer_blocking(producer, move |p| p.commit_transaction(txn_timeout))
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to commit transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = false;
        } else {
            // At-least-once: just flush pending messages.
            if let Some(ref producer) = self.producer {
                Self::flush_producer_async(producer, self.config.delivery_timeout)
                    .await
                    .map_err(|e| {
                        ConnectorError::TransactionError(format!(
                            "failed to flush for epoch {epoch}: {e}"
                        ))
                    })?;
            }
        }

        self.last_committed_epoch = epoch;
        self.metrics.record_commit();
        debug!(epoch, "committed epoch");
        Ok(())
    }

    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
            && self.transaction_active
        {
            let producer = self
                .producer
                .as_ref()
                .ok_or_else(|| ConnectorError::InvalidState {
                    expected: "Running".into(),
                    actual: self.state.to_string(),
                })?;

            let txn_timeout = self.config.transaction_timeout;
            Self::producer_blocking(producer, move |p| p.abort_transaction(txn_timeout))
                .await
                .map_err(|e| {
                    ConnectorError::TransactionError(format!(
                        "failed to abort transaction for epoch {epoch}: {e}"
                    ))
                })?;

            self.transaction_active = false;
        }

        self.metrics.record_rollback();
        debug!(epoch, "rolled back epoch");
        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => HealthStatus::Healthy,
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    fn capabilities(&self) -> SinkConnectorCapabilities {
        // Catch stuck-broker scenarios faster than librdkafka's
        // delivery.timeout.ms (default 5min).
        let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(10))
            .with_idempotent()
            .with_partitioned();

        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
            caps = caps.with_exactly_once().with_two_phase_commit();
        }

        if self.schema_registry.is_some() {
            caps = caps.with_schema_evolution();
        }

        caps
    }

    async fn flush(&mut self) -> Result<(), ConnectorError> {
        if let Some(ref producer) = self.producer {
            Self::flush_producer_async(producer, PERIODIC_FLUSH_TIMEOUT)
                .await
                .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
        }
        Ok(())
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Kafka sink connector");

        // Abort any active transaction.
        if self.transaction_active {
            if let Err(e) = self.rollback_epoch(self.current_epoch).await {
                warn!(error = %e, "failed to abort active transaction on close");
            }
        }

        let mut first_err: Option<ConnectorError> = None;

        if let Some(ref producer) = self.producer {
            if let Err(e) = Self::flush_producer_async(producer, Duration::from_secs(30)).await {
                warn!(error = %e, "failed to flush on close");
                first_err.get_or_insert(ConnectorError::WriteError(format!(
                    "flush failed on close: {e}"
                )));
            }
        }

        if let Some(ref dlq) = self.dlq_producer {
            if let Err(e) = Self::flush_producer_async(dlq, Duration::from_secs(10)).await {
                warn!(error = %e, "failed to flush DLQ producer on close");
                first_err.get_or_insert(ConnectorError::WriteError(format!(
                    "DLQ flush failed on close: {e}"
                )));
            }
        }

        self.producer = None;
        self.dlq_producer = None;
        self.state = ConnectorState::Closed;
        info!("Kafka sink connector closed");
        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }
}

impl std::fmt::Debug for KafkaSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KafkaSink")
            .field("state", &self.state)
            .field("topic", &self.config.topic)
            .field("delivery", &self.config.delivery_guarantee)
            .field("format", &self.config.format)
            .field("current_epoch", &self.current_epoch)
            .field("last_committed_epoch", &self.last_committed_epoch)
            .field("transaction_active", &self.transaction_active)
            .finish_non_exhaustive()
    }
}

/// Selects the appropriate serializer for the given format.
///
/// For Avro, uses the shared `schema_id` handle so that Schema Registry
/// registration updates are visible to the serializer.
fn select_serializer(
    format: Format,
    schema: &SchemaRef,
    schema_id: Arc<std::sync::atomic::AtomicU32>,
    registry: Option<Arc<SchemaRegistryClient>>,
) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
    match format {
        Format::Avro => Ok(Box::new(AvroSerializer::with_shared_schema_id(
            schema.clone(),
            schema_id,
            registry,
        ))),
        other => serde::create_serializer(other).map_err(|e| {
            ConnectorError::ConfigurationError(format!("unsupported sink format '{other}': {e}"))
        }),
    }
}

/// Selects the appropriate partitioner for the given strategy.
fn select_partitioner(strategy: PartitionStrategy) -> Box<dyn KafkaPartitioner> {
    match strategy {
        PartitionStrategy::KeyHash => Box::new(KeyHashPartitioner::new()),
        PartitionStrategy::RoundRobin => Box::new(RoundRobinPartitioner::new()),
        PartitionStrategy::Sticky => Box::new(StickyPartitioner::new(100)),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ]))
    }

    fn test_config() -> KafkaSinkConfig {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.topic = "output-events".into();
        cfg
    }

    #[test]
    fn test_new_defaults() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.state(), ConnectorState::Created);
        assert!(sink.producer.is_none());
        assert_eq!(sink.current_epoch(), 0);
        assert_eq!(sink.last_committed_epoch(), 0);
        assert!(!sink.transaction_active);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let sink = KafkaSink::new(schema.clone(), test_config(), None);
        assert_eq!(sink.schema(), schema);
    }

    #[test]
    fn test_health_check_created() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut sink = KafkaSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Running;
        assert_eq!(sink.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut sink = KafkaSink::new(test_schema(), test_config(), None);
        sink.state = ConnectorState::Closed;
        assert!(matches!(sink.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_metrics_initial() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let m = sink.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    #[test]
    fn test_capabilities_at_least_once() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let caps = sink.capabilities();
        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
        assert!(!caps.schema_evolution);
    }

    #[test]
    fn test_capabilities_exactly_once() {
        let mut cfg = test_config();
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let caps = sink.capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
    }

    #[test]
    fn test_serializer_selection_json() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.serializer.format(), Format::Json);
    }

    #[test]
    fn test_serializer_selection_avro() {
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        let sink = KafkaSink::new(test_schema(), cfg, None);
        assert_eq!(sink.serializer.format(), Format::Avro);
    }

    #[test]
    fn test_with_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());

        let sink = KafkaSink::with_schema_registry(test_schema(), cfg, sr);
        assert!(sink.has_schema_registry());
        assert_eq!(sink.serializer.format(), Format::Avro);
        let caps = sink.capabilities();
        assert!(caps.schema_evolution);
    }

    #[test]
    fn test_debug_output() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let debug = format!("{sink:?}");
        assert!(debug.contains("KafkaSink"));
        assert!(debug.contains("output-events"));
    }

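    // A minimal sketch exercising KeyBuffer directly; it is crate-private,
    // so it is covered here rather than in a doc test. Keys round-trip
    // through the single contiguous allocation, and empty slots read back
    // as empty slices.
    #[test]
    fn test_key_buffer_roundtrip() {
        let mut buf = KeyBuffer::with_capacity(3, 8);
        buf.push(b"alpha");
        buf.push_empty();
        buf.push(b"beta");
        assert_eq!(buf.len(), 3);
        assert_eq!(&buf[0], b"alpha");
        assert!(buf[1].is_empty());
        assert_eq!(&buf[2], b"beta");
    }
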
    #[test]
    fn test_extract_keys_no_key_column() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();
        assert!(sink.extract_keys(&batch).unwrap().is_none());
    }

    #[test]
    fn test_extract_keys_with_key_column() {
        let mut cfg = test_config();
        cfg.key_column = Some("value".into());
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["key-a", "key-b"])),
            ],
        )
        .unwrap();
        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"key-a");
        assert_eq!(&keys[1], b"key-b");
    }
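
    // Sketch of the non-string key path: an Int64 key column goes through
    // the Arrow array formatter fallback in extract_keys(), so the keys
    // should come back as decimal string representations.
    #[test]
    fn test_extract_keys_non_string_column() {
        let mut cfg = test_config();
        cfg.key_column = Some("id".into());
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let batch = arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![7, 42])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();
        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"7");
        assert_eq!(&keys[1], b"42");
    }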
}