Skip to main content

laminar_connectors/kafka/
sink.rs

1//! Kafka sink connector.
2
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use arrow_array::{Array, StringArray};
7use arrow_schema::SchemaRef;
8use async_trait::async_trait;
9use rdkafka::error::{KafkaError, RDKafkaErrorCode};
10use rdkafka::message::OwnedHeaders;
11use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
12use rdkafka::ClientConfig;
13use tracing::{debug, info, warn};
14
15use crate::config::{ConnectorConfig, ConnectorState};
16use crate::connector::{SinkConnector, SinkConnectorCapabilities, WriteResult};
17use crate::error::ConnectorError;
18use crate::serde::{self, Format, RecordSerializer};
19
20use super::avro_serializer::AvroSerializer;
21use super::partitioner::{
22    KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
23};
24use super::schema_registry::SchemaRegistryClient;
25use super::sink_config::{KafkaSinkConfig, PartitionStrategy};
26use super::sink_metrics::KafkaSinkMetrics;
27use crate::connector::DeliveryGuarantee;
28
/// Fallback partition count used only when broker metadata query fails.
///
/// `open()` resets the cached partition count to this value before it
/// queries the broker, so this is also what stays in effect when the
/// metadata response lists no topic or reports zero partitions.
const FALLBACK_PARTITION_COUNT: i32 = 1;

/// Short deadline used by `SinkConnector::flush` — kept well below any
/// outer tokio timeout so a `spawn_blocking` flush can't outlive its
/// caller and leak a blocking thread. Thorough drains go through
/// `pre_commit` / `commit_epoch` / `close` with their own budgets.
const PERIODIC_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);
37
/// Packed storage for the per-row Kafka keys of one batch.
///
/// Every key's bytes are appended to a single `data` vector; `offsets`
/// records one `(start, length)` pair per row. This replaces N small heap
/// allocations (one per row) with two growable buffers.
struct KeyBuffer {
    /// Concatenated key bytes of all non-empty rows.
    data: Vec<u8>,
    /// `(start, length)` into `data` for each row, in row order.
    offsets: Vec<(usize, usize)>,
}

impl KeyBuffer {
    /// Pre-sizes the buffer for `num_rows` keys of roughly `avg_key_len`
    /// bytes each. Both values are capacity hints only.
    fn with_capacity(num_rows: usize, avg_key_len: usize) -> Self {
        let data = Vec::with_capacity(num_rows * avg_key_len);
        let offsets = Vec::with_capacity(num_rows);
        Self { data, offsets }
    }

    /// Appends `key` as the next row's key.
    fn push(&mut self, key: &[u8]) {
        // Record where the key will land before the bytes are appended.
        self.offsets.push((self.data.len(), key.len()));
        self.data.extend_from_slice(key);
    }

    /// Appends a row that has no key (stored as a zero-length slice).
    fn push_empty(&mut self) {
        self.offsets.push((0, 0));
    }

    /// Returns the key bytes of row `i`.
    ///
    /// Panics if `i` is not a row that was previously pushed.
    fn key(&self, i: usize) -> &[u8] {
        let (start, len) = self.offsets[i];
        &self.data[start..][..len]
    }

    /// Number of rows pushed so far (test-only helper).
    #[cfg(test)]
    fn len(&self) -> usize {
        self.offsets.len()
    }
}

/// `buffer[i]` is shorthand for [`KeyBuffer::key`].
impl std::ops::Index<usize> for KeyBuffer {
    type Output = [u8];

    fn index(&self, i: usize) -> &[u8] {
        self.key(i)
    }
}
82
/// Kafka sink connector that writes Arrow `RecordBatch` data to Kafka topics.
///
/// Operates in Ring 1 (background) receiving data from Ring 0 via the
/// subscription API.
///
/// # Lifecycle
///
/// 1. Create with [`KafkaSink::new`]
/// 2. Call `open()` to create the producer and connect to Kafka
/// 3. For each epoch:
///    - `begin_epoch()` starts a Kafka transaction (exactly-once only)
///    - `write_batch()` serializes and produces records
///    - `commit_epoch()` commits the transaction
/// 4. Call `close()` for clean shutdown
pub struct KafkaSink {
    /// rdkafka producer (set during `open()`); `None` until then.
    producer: Option<FutureProducer>,
    /// Parsed Kafka sink configuration. Replaced by `open()` when the
    /// connector config carries properties.
    config: KafkaSinkConfig,
    /// Format-specific serializer. Rebuilt when the config is re-parsed
    /// or when an incoming batch's schema differs from `schema`.
    serializer: Box<dyn RecordSerializer>,
    /// Partitioner for determining target partitions.
    partitioner: Box<dyn KafkaPartitioner>,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Current `LaminarDB` epoch.
    current_epoch: u64,
    /// Last successfully committed epoch.
    last_committed_epoch: u64,
    /// Whether a Kafka transaction is currently active.
    transaction_active: bool,
    /// Dead letter queue producer (separate, non-transactional).
    /// Created in `open()` only when a DLQ topic is configured.
    dlq_producer: Option<FutureProducer>,
    /// Production metrics.
    metrics: KafkaSinkMetrics,
    /// Arrow schema for input batches.
    schema: SchemaRef,
    /// Optional Schema Registry client.
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    /// Shared Avro schema ID (updated after SR registration).
    /// A value of 0 is treated as "not registered yet".
    avro_schema_id: Arc<std::sync::atomic::AtomicU32>,
    /// Cached topic partition count (queried from broker metadata after open).
    /// Starts at `FALLBACK_PARTITION_COUNT`.
    topic_partition_count: i32,
}
127
128impl KafkaSink {
129    /// Creates a new Kafka sink connector with explicit schema.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `config.format` is not a supported serialization format.
134    /// Call [`KafkaSinkConfig::validate`] first to catch this at config time.
135    #[must_use]
136    pub fn new(
137        schema: SchemaRef,
138        config: KafkaSinkConfig,
139        registry: Option<&prometheus::Registry>,
140    ) -> Self {
141        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
142        let serializer =
143            select_serializer(config.format, &schema, Arc::clone(&avro_schema_id), None)
144                .expect("format validated in KafkaSinkConfig::validate()");
145        let partitioner = select_partitioner(config.partitioner);
146
147        Self {
148            producer: None,
149            config,
150            serializer,
151            partitioner,
152            state: ConnectorState::Created,
153            current_epoch: 0,
154            last_committed_epoch: 0,
155            transaction_active: false,
156            dlq_producer: None,
157            metrics: KafkaSinkMetrics::new(registry),
158            schema,
159            schema_registry: None,
160            avro_schema_id,
161            topic_partition_count: FALLBACK_PARTITION_COUNT,
162        }
163    }
164
165    /// Creates a new Kafka sink with Schema Registry integration.
166    ///
167    /// # Panics
168    ///
169    /// Panics if `config.format` is not a supported serialization format.
170    /// Call [`KafkaSinkConfig::validate`] first to catch this at config time.
171    #[must_use]
172    pub fn with_schema_registry(
173        schema: SchemaRef,
174        config: KafkaSinkConfig,
175        sr_client: SchemaRegistryClient,
176    ) -> Self {
177        let sr = Arc::new(sr_client);
178        let avro_schema_id = Arc::new(std::sync::atomic::AtomicU32::new(0));
179        let serializer = select_serializer(
180            config.format,
181            &schema,
182            Arc::clone(&avro_schema_id),
183            Some(Arc::clone(&sr)),
184        )
185        .expect("format validated in KafkaSinkConfig::validate()");
186        let partitioner = select_partitioner(config.partitioner);
187
188        Self {
189            producer: None,
190            config,
191            serializer,
192            partitioner,
193            state: ConnectorState::Created,
194            current_epoch: 0,
195            last_committed_epoch: 0,
196            transaction_active: false,
197            dlq_producer: None,
198            metrics: KafkaSinkMetrics::new(None),
199            schema,
200            schema_registry: Some(sr),
201            avro_schema_id,
202            topic_partition_count: FALLBACK_PARTITION_COUNT,
203        }
204    }
205
206    /// Lifecycle state (Created → Running → Closed).
207    #[must_use]
208    pub fn state(&self) -> ConnectorState {
209        self.state
210    }
211
212    /// Whether Avro schema registration is available.
213    #[must_use]
214    pub fn has_schema_registry(&self) -> bool {
215        self.schema_registry.is_some()
216    }
217
218    /// Active epoch (incremented by checkpoint coordinator).
219    #[must_use]
220    pub fn current_epoch(&self) -> u64 {
221        self.current_epoch
222    }
223
224    /// Last epoch that was successfully committed to Kafka.
225    #[must_use]
226    pub fn last_committed_epoch(&self) -> u64 {
227        self.last_committed_epoch
228    }
229
230    /// Ensures the sink schema and SR registration match the actual data.
231    async fn ensure_schema_ready(
232        &mut self,
233        batch_schema: &SchemaRef,
234    ) -> Result<(), ConnectorError> {
235        let schema_changed = self.schema != *batch_schema;
236        let needs_registration = self.config.format == Format::Avro
237            && (schema_changed
238                || self
239                    .avro_schema_id
240                    .load(std::sync::atomic::Ordering::Relaxed)
241                    == 0);
242
243        // Register with SR *before* advancing schema/serializer so a failure
244        // doesn't leave avro_schema_id stale while the serializer already
245        // encodes with the new schema.
246        if needs_registration {
247            if let Some(ref sr) = self.schema_registry {
248                let subject = format!("{}-value", self.config.topic);
249                let avro_schema =
250                    super::schema_registry::arrow_to_avro_schema(batch_schema, &self.config.topic)
251                        .map_err(ConnectorError::Serde)?;
252                let schema_id = sr
253                    .register_schema(
254                        &subject,
255                        &avro_schema,
256                        super::schema_registry::SchemaType::Avro,
257                    )
258                    .await
259                    .map_err(|e| {
260                        ConnectorError::ConnectionFailed(format!(
261                            "failed to register Avro schema for '{subject}': {e}"
262                        ))
263                    })?;
264                #[allow(clippy::cast_sign_loss)]
265                self.avro_schema_id
266                    .store(schema_id as u32, std::sync::atomic::Ordering::Relaxed);
267                info!(subject = %subject, schema_id, "registered Avro schema");
268            }
269        }
270
271        if schema_changed {
272            debug!(
273                old = ?self.schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
274                new = ?batch_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>(),
275                "sink schema updated from incoming batch"
276            );
277            self.schema = batch_schema.clone();
278            self.serializer = select_serializer(
279                self.config.format,
280                &self.schema,
281                Arc::clone(&self.avro_schema_id),
282                self.schema_registry.clone(),
283            )?;
284        }
285
286        Ok(())
287    }
288
289    /// Contiguous key buffer: all key bytes in one allocation with per-row offsets.
290    ///
291    /// Returns `None` if no key column is configured.
292    fn extract_keys(
293        &self,
294        batch: &arrow_array::RecordBatch,
295    ) -> Result<Option<KeyBuffer>, ConnectorError> {
296        let Some(key_col) = &self.config.key_column else {
297            return Ok(None);
298        };
299
300        let col_idx = batch.schema().index_of(key_col).map_err(|_| {
301            ConnectorError::ConfigurationError(format!(
302                "key column '{key_col}' not found in schema"
303            ))
304        })?;
305
306        let array = batch.column(col_idx);
307        let num_rows = batch.num_rows();
308        let mut buf = KeyBuffer::with_capacity(num_rows, 32);
309
310        // Try to get string values; fall back to display representation.
311        if let Some(str_array) = array.as_any().downcast_ref::<StringArray>() {
312            for i in 0..num_rows {
313                if str_array.is_null(i) {
314                    buf.push_empty();
315                } else {
316                    buf.push(str_array.value(i).as_bytes());
317                }
318            }
319        } else {
320            // For non-string columns, use the Arrow array formatter.
321            use std::fmt::Write;
322            let formatter = arrow_cast::display::ArrayFormatter::try_new(
323                array,
324                &arrow_cast::display::FormatOptions::default(),
325            )
326            .map_err(|e| {
327                ConnectorError::Internal(format!(
328                    "failed to create array formatter for key column: {e}"
329                ))
330            })?;
331            // Reusable string buffer for formatted values.
332            let mut fmt_buf = String::with_capacity(64);
333            for i in 0..num_rows {
334                if array.is_null(i) {
335                    buf.push_empty();
336                } else {
337                    fmt_buf.clear();
338                    let _ = write!(fmt_buf, "{}", formatter.value(i));
339                    buf.push(fmt_buf.as_bytes());
340                }
341            }
342        }
343
344        Ok(Some(buf))
345    }
346
347    /// Routes a failed record to the dead letter queue.
348    async fn route_to_dlq(
349        &self,
350        payload: &[u8],
351        key: Option<&[u8]>,
352        error_msg: &str,
353    ) -> Result<(), ConnectorError> {
354        let dlq_producer = self
355            .dlq_producer
356            .as_ref()
357            .ok_or_else(|| ConnectorError::ConfigurationError("DLQ topic not configured".into()))?;
358        let dlq_topic =
359            self.config.dlq_topic.as_ref().ok_or_else(|| {
360                ConnectorError::ConfigurationError("DLQ topic not configured".into())
361            })?;
362
363        let now = std::time::SystemTime::now()
364            .duration_since(std::time::UNIX_EPOCH)
365            .unwrap_or_else(|_| {
366                tracing::warn!("system clock before Unix epoch — using 0 for DLQ timestamp");
367                std::time::Duration::ZERO
368            })
369            .as_millis()
370            .to_string();
371        let epoch_str = self.current_epoch.to_string();
372
373        let headers = OwnedHeaders::new()
374            .insert(rdkafka::message::Header {
375                key: "__dlq.error",
376                value: Some(error_msg.as_bytes()),
377            })
378            .insert(rdkafka::message::Header {
379                key: "__dlq.topic",
380                value: Some(self.config.topic.as_bytes()),
381            })
382            .insert(rdkafka::message::Header {
383                key: "__dlq.timestamp",
384                value: Some(now.as_bytes()),
385            })
386            .insert(rdkafka::message::Header {
387                key: "__dlq.epoch",
388                value: Some(epoch_str.as_bytes()),
389            });
390
391        let mut record = FutureRecord::to(dlq_topic)
392            .payload(payload)
393            .headers(headers);
394
395        if let Some(k) = key {
396            record = record.key(k);
397        }
398
399        dlq_producer
400            .send(record, Duration::from_secs(5))
401            .await
402            .map_err(|(e, _)| ConnectorError::WriteError(format!("DLQ send failed: {e}")))?;
403
404        self.metrics.record_dlq();
405        Ok(())
406    }
407
408    /// Synchronously enqueue a record with a short retry on `QueueFull`.
409    /// Uses `send_result` rather than `send`, because the latter is
410    /// `async fn` in rdkafka 0.39+ and only enqueues when polled — which
411    /// would defeat the Vec-of-futures pipelining in `write_batch`.
412    async fn enqueue_with_queue_retry(
413        producer: &FutureProducer,
414        mut record: FutureRecord<'_, [u8], [u8]>,
415        queue_timeout: Duration,
416    ) -> Result<DeliveryFuture, ConnectorError> {
417        let start = Instant::now();
418        loop {
419            match producer.send_result(record) {
420                Ok(fut) => return Ok(fut),
421                Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), r))
422                    if start.elapsed() < queue_timeout =>
423                {
424                    record = r;
425                    tokio::time::sleep(Duration::from_millis(100)).await;
426                }
427                Err((e, _)) => {
428                    return Err(ConnectorError::WriteError(format!(
429                        "Kafka enqueue failed: {e}"
430                    )));
431                }
432            }
433        }
434    }
435
436    /// Flush on the blocking pool — `Producer::flush()` is a synchronous
437    /// FFI call that would stall the sink task's async select loop.
438    async fn flush_producer_async(
439        producer: &FutureProducer,
440        timeout: Duration,
441    ) -> Result<(), rdkafka::error::KafkaError> {
442        let p = producer.clone();
443        match tokio::task::spawn_blocking(move || p.flush(timeout)).await {
444            Ok(result) => result,
445            Err(join_err) => {
446                warn!("flush blocking task failed: {join_err}");
447                Err(rdkafka::error::KafkaError::Canceled)
448            }
449        }
450    }
451
452    /// Run a blocking producer operation on the thread pool. All
453    /// librdkafka transaction calls (`begin_transaction`,
454    /// `commit_transaction`, `abort_transaction`) are synchronous FFI.
455    async fn producer_blocking<F, R>(producer: &FutureProducer, op: F) -> R
456    where
457        F: FnOnce(&FutureProducer) -> R + Send + 'static,
458        R: Send + 'static,
459    {
460        let p = producer.clone();
461        tokio::task::spawn_blocking(move || op(&p))
462            .await
463            .expect("producer_blocking: blocking task panicked")
464    }
465}
466
467#[async_trait]
468#[allow(clippy::too_many_lines)]
469impl SinkConnector for KafkaSink {
470    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
471        self.state = ConnectorState::Initializing;
472
473        // Re-parse config if properties provided.
474        if !config.properties().is_empty() {
475            let parsed = KafkaSinkConfig::from_config(config)?;
476            self.config = parsed;
477            self.serializer = select_serializer(
478                self.config.format,
479                &self.schema,
480                Arc::clone(&self.avro_schema_id),
481                self.schema_registry.clone(),
482            )?;
483            self.partitioner = select_partitioner(self.config.partitioner);
484        }
485
486        info!(
487            brokers = %self.config.bootstrap_servers,
488            topic = %self.config.topic,
489            format = %self.config.format,
490            delivery = %self.config.delivery_guarantee,
491            "opening Kafka sink connector"
492        );
493
494        // Build rdkafka producer.
495        let rdkafka_config: ClientConfig = self.config.to_rdkafka_config();
496        let producer: FutureProducer = rdkafka_config.create().map_err(|e| {
497            ConnectorError::ConnectionFailed(format!("failed to create producer: {e}"))
498        })?;
499
500        // Initialize transactions if exactly-once.
501        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
502            producer
503                .init_transactions(self.config.transaction_timeout)
504                .map_err(|e| {
505                    ConnectorError::TransactionError(format!("failed to init transactions: {e}"))
506                })?;
507        }
508
509        // Create DLQ producer if configured. Inherits security settings
510        // (SASL, SSL) from the main producer config but is non-transactional.
511        // DLQ records bypass the exactly-once transaction to avoid coupling
512        // error routing with the main data path.
513        if self.config.dlq_topic.is_some() {
514            let dlq_config = self.config.to_dlq_rdkafka_config();
515            let dlq_producer: FutureProducer = dlq_config.create().map_err(|e| {
516                ConnectorError::ConnectionFailed(format!("failed to create DLQ producer: {e}"))
517            })?;
518            self.dlq_producer = Some(dlq_producer);
519        }
520
521        // Initialize Schema Registry client if configured.
522        if let Some(ref url) = self.config.schema_registry_url {
523            if self.schema_registry.is_none() {
524                let sr = if let Some(ref ca_path) = self.config.schema_registry_ssl_ca_location {
525                    SchemaRegistryClient::with_tls(
526                        url,
527                        self.config.schema_registry_auth.clone(),
528                        ca_path,
529                    )?
530                } else {
531                    SchemaRegistryClient::new(url, self.config.schema_registry_auth.clone())
532                };
533                self.schema_registry = Some(Arc::new(sr));
534            }
535        }
536
537        // Set SR compatibility level if configured.
538        // Schema registration is deferred to first write_batch() where the
539        // real pipeline output schema is known (the factory default is a
540        // placeholder that would pollute the registry and break compat checks).
541        if self.config.format == Format::Avro {
542            if let Some(ref sr) = self.schema_registry {
543                if let Some(ref compat) = self.config.schema_compatibility {
544                    let subject = format!("{}-value", self.config.topic);
545                    sr.set_compatibility_level(&subject, *compat)
546                        .await
547                        .map_err(|e| {
548                            ConnectorError::ConnectionFailed(format!(
549                                "failed to set SR compatibility for '{subject}': {e}"
550                            ))
551                        })?;
552                }
553            }
554        }
555
556        // Query broker metadata for actual topic partition count.
557        // Reset to fallback first so a reopened sink doesn't keep a stale count.
558        self.topic_partition_count = FALLBACK_PARTITION_COUNT;
559        match producer
560            .client()
561            .fetch_metadata(Some(&self.config.topic), Duration::from_secs(5))
562        {
563            Ok(metadata) => {
564                if let Some(topic_meta) = metadata.topics().first() {
565                    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
566                    let count = topic_meta.partitions().len() as i32;
567                    if count > 0 {
568                        self.topic_partition_count = count;
569                        info!(
570                            topic = %self.config.topic,
571                            partitions = count,
572                            "queried topic partition count from broker"
573                        );
574                    }
575                }
576            }
577            Err(e) => {
578                warn!(
579                    topic = %self.config.topic,
580                    error = %e,
581                    fallback = FALLBACK_PARTITION_COUNT,
582                    "failed to query topic metadata — using fallback partition count"
583                );
584            }
585        }
586
587        self.producer = Some(producer);
588        self.state = ConnectorState::Running;
589        info!("Kafka sink connector opened successfully");
590        Ok(())
591    }
592
593    #[allow(clippy::cast_possible_truncation)] // Record batch row/byte counts fit in narrower types
594    async fn write_batch(
595        &mut self,
596        batch: &arrow_array::RecordBatch,
597    ) -> Result<WriteResult, ConnectorError> {
598        if self.state != ConnectorState::Running {
599            return Err(ConnectorError::InvalidState {
600                expected: "Running".into(),
601                actual: self.state.to_string(),
602            });
603        }
604
605        self.ensure_schema_ready(&batch.schema()).await?;
606
607        let producer = self
608            .producer
609            .as_ref()
610            .ok_or_else(|| ConnectorError::InvalidState {
611                expected: "producer initialized".into(),
612                actual: "producer is None".into(),
613            })?;
614
615        // Serialize the RecordBatch into per-row byte payloads.
616        let payloads = self.serializer.serialize(batch).map_err(|e| {
617            self.metrics.record_serialization_error();
618            ConnectorError::Serde(e)
619        })?;
620
621        // Extract keys if key column is configured.
622        let keys = self.extract_keys(batch)?;
623
624        let mut records_written: usize = 0;
625        let mut bytes_written: u64 = 0;
626
627        // Phase 1: enqueue every record into librdkafka's internal queue.
628        // Flush every flush_batch_size records to bound in-flight memory.
629        let flush_threshold = self.config.flush_batch_size;
630        let mut delivery_futures = Vec::with_capacity(payloads.len());
631        for (i, payload) in payloads.iter().enumerate() {
632            let key: Option<&[u8]> = keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
633            let partition = self.partitioner.partition(key, self.topic_partition_count);
634
635            let mut record = FutureRecord::to(&self.config.topic).payload(payload.as_slice());
636            if let Some(k) = key {
637                record = record.key(k);
638            }
639            if let Some(p) = partition {
640                record = record.partition(p);
641            }
642
643            // 500ms matches the old `send(record, 500ms)` contract: ride
644            // out transient QueueFull bursts before giving up.
645            let fut = Self::enqueue_with_queue_retry(producer, record, Duration::from_millis(500))
646                .await?;
647            delivery_futures.push((Instant::now(), fut));
648
649            if flush_threshold > 0 && (i + 1) % flush_threshold == 0 {
650                Self::flush_producer_async(producer, self.config.delivery_timeout)
651                    .await
652                    .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
653            }
654        }
655
656        // Phase 2: await each delivery report. Outer Err = oneshot canceled
657        // (producer dropped); inner Err = Kafka delivery error.
658        let mut failed: usize = 0;
659        let mut first_error: Option<String> = None;
660        for (i, (send_time, future)) in delivery_futures.into_iter().enumerate() {
661            let err_msg = match future.await {
662                Ok(Ok(_)) => {
663                    let latency_us = send_time.elapsed().as_micros() as u64;
664                    self.metrics.record_produce_latency(latency_us);
665                    records_written += 1;
666                    bytes_written += payloads[i].len() as u64;
667                    continue;
668                }
669                Ok(Err((err, _))) => err.to_string(),
670                Err(_canceled) => "delivery canceled — producer dropped before ack".into(),
671            };
672
673            self.metrics.record_error();
674            failed += 1;
675            if first_error.is_none() {
676                first_error = Some(err_msg.clone());
677            }
678
679            if self.dlq_producer.is_some() {
680                let key: Option<&[u8]> =
681                    keys.as_ref().map(|kb| kb.key(i)).filter(|k| !k.is_empty());
682                if let Err(dlq_err) = self.route_to_dlq(&payloads[i], key, &err_msg).await {
683                    warn!(
684                        original_error = %err_msg,
685                        dlq_error = %dlq_err,
686                        "failed to route record to DLQ — record lost"
687                    );
688                }
689            }
690        }
691
692        self.metrics
693            .record_write(records_written as u64, bytes_written);
694
695        debug!(
696            records = records_written,
697            bytes = bytes_written,
698            failed,
699            "wrote batch to Kafka"
700        );
701
702        // With DLQ, failures are already routed — report success. Without,
703        // surface the aggregate so the sink task can poison the epoch.
704        if failed > 0 && self.dlq_producer.is_none() {
705            return Err(ConnectorError::WriteError(format!(
706                "Kafka produce: {failed}/{} records failed, first error: {}",
707                payloads.len(),
708                first_error.unwrap_or_else(|| "unknown".into())
709            )));
710        }
711
712        Ok(WriteResult::new(records_written, bytes_written))
713    }
714
715    fn schema(&self) -> SchemaRef {
716        self.schema.clone()
717    }
718
719    async fn begin_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
720        self.current_epoch = epoch;
721
722        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
723            let producer = self
724                .producer
725                .as_ref()
726                .ok_or_else(|| ConnectorError::InvalidState {
727                    expected: "Running".into(),
728                    actual: self.state.to_string(),
729                })?;
730
731            // An already-open transaction is the continuation case: an
732            // abandoned epoch keeps its pending rows and the next
733            // epoch's commit covers them. Aborting here would lose them
734            // (a live abort is never replayed).
735            if self.transaction_active {
736                debug!(
737                    epoch,
738                    "transaction already open — pending rows ride into this epoch"
739                );
740                self.partitioner.reset();
741                return Ok(());
742            }
743
744            Self::producer_blocking(producer, FutureProducer::begin_transaction)
745                .await
746                .map_err(|e| {
747                    ConnectorError::TransactionError(format!(
748                        "failed to begin transaction for epoch {epoch}: {e}"
749                    ))
750                })?;
751
752            self.transaction_active = true;
753        }
754
755        self.partitioner.reset();
756        debug!(epoch, "began epoch");
757        Ok(())
758    }
759
760    async fn pre_commit(&mut self, epoch: u64) -> Result<(), ConnectorError> {
761        // Monotonic, not strict equality: the coordinator abandons
762        // failed epochs (ids are never reused), so the epoch sealing
763        // the currently open transaction may skip past the one
764        // `begin_epoch` was called with. Only a STALE epoch is an
765        // error — there is one open transaction and it always holds
766        // the rows since the last barrier.
767        if epoch < self.current_epoch {
768            return Err(ConnectorError::TransactionError(format!(
769                "stale epoch in pre_commit: current {}, got {epoch}",
770                self.current_epoch
771            )));
772        }
773        self.current_epoch = epoch;
774
775        // Flush all pending messages to Kafka brokers (phase 1).
776        if let Some(ref producer) = self.producer {
777            Self::flush_producer_async(producer, self.config.delivery_timeout)
778                .await
779                .map_err(|e| {
780                    ConnectorError::TransactionError(format!(
781                        "failed to flush before pre-commit for epoch {epoch}: {e}"
782                    ))
783                })?;
784        }
785
786        debug!(epoch, "pre-committed epoch (flushed)");
787        Ok(())
788    }
789
790    async fn commit_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
791        // Monotonic for the same reason as `pre_commit`.
792        if epoch < self.current_epoch {
793            return Err(ConnectorError::TransactionError(format!(
794                "stale epoch in commit: current {}, got {epoch}",
795                self.current_epoch
796            )));
797        }
798        self.current_epoch = epoch;
799
800        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
801            let producer = self
802                .producer
803                .as_ref()
804                .ok_or_else(|| ConnectorError::InvalidState {
805                    expected: "Running".into(),
806                    actual: self.state.to_string(),
807                })?;
808
809            Self::flush_producer_async(producer, self.config.delivery_timeout)
810                .await
811                .map_err(|e| {
812                    ConnectorError::TransactionError(format!("failed to flush before commit: {e}"))
813                })?;
814
815            let txn_timeout = self.config.transaction_timeout;
816            Self::producer_blocking(producer, move |p| p.commit_transaction(txn_timeout))
817                .await
818                .map_err(|e| {
819                    ConnectorError::TransactionError(format!(
820                        "failed to commit transaction for epoch {epoch}: {e}"
821                    ))
822                })?;
823
824            self.transaction_active = false;
825        } else {
826            // At-least-once: just flush pending messages.
827            if let Some(ref producer) = self.producer {
828                Self::flush_producer_async(producer, self.config.delivery_timeout)
829                    .await
830                    .map_err(|e| {
831                        ConnectorError::TransactionError(format!(
832                            "failed to flush for epoch {epoch}: {e}"
833                        ))
834                    })?;
835            }
836        }
837
838        self.last_committed_epoch = epoch;
839        self.metrics.record_commit();
840        debug!(epoch, "committed epoch");
841        Ok(())
842    }
843
844    async fn rollback_epoch(&mut self, epoch: u64) -> Result<(), ConnectorError> {
845        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce
846            && self.transaction_active
847        {
848            let producer = self
849                .producer
850                .as_ref()
851                .ok_or_else(|| ConnectorError::InvalidState {
852                    expected: "Running".into(),
853                    actual: self.state.to_string(),
854                })?;
855
856            let txn_timeout = self.config.transaction_timeout;
857            Self::producer_blocking(producer, move |p| p.abort_transaction(txn_timeout))
858                .await
859                .map_err(|e| {
860                    ConnectorError::TransactionError(format!(
861                        "failed to abort transaction for epoch {epoch}: {e}"
862                    ))
863                })?;
864
865            self.transaction_active = false;
866        }
867
868        self.metrics.record_rollback();
869        debug!(epoch, "rolled back epoch");
870        Ok(())
871    }
872
873    fn capabilities(&self) -> SinkConnectorCapabilities {
874        // Catch stuck-broker scenarios faster than librdkafka's
875        // delivery.timeout.ms (default 5min).
876        let mut caps = SinkConnectorCapabilities::new(Duration::from_secs(10))
877            .with_idempotent()
878            .with_partitioned();
879
880        if self.config.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
881            // The single open transaction holds all rows since the last
882            // commit, so an abandoned epoch's rows ride into the next
883            // epoch's commit — no rollback needed (or wanted) on a live
884            // coordination failure.
885            caps = caps
886                .with_exactly_once()
887                .with_two_phase_commit()
888                .with_preserves_pending_on_abandon();
889        }
890
891        if self.schema_registry.is_some() {
892            caps = caps.with_schema_evolution();
893        }
894
895        caps
896    }
897
898    async fn flush(&mut self) -> Result<(), ConnectorError> {
899        if let Some(ref producer) = self.producer {
900            Self::flush_producer_async(producer, PERIODIC_FLUSH_TIMEOUT)
901                .await
902                .map_err(|e| ConnectorError::WriteError(format!("flush failed: {e}")))?;
903        }
904        Ok(())
905    }
906
907    async fn close(&mut self) -> Result<(), ConnectorError> {
908        info!("closing Kafka sink connector");
909
910        // Abort any active transaction.
911        if self.transaction_active {
912            if let Err(e) = self.rollback_epoch(self.current_epoch).await {
913                warn!(error = %e, "failed to abort active transaction on close");
914            }
915        }
916
917        let mut first_err: Option<ConnectorError> = None;
918
919        if let Some(ref producer) = self.producer {
920            if let Err(e) = Self::flush_producer_async(producer, Duration::from_secs(30)).await {
921                warn!(error = %e, "failed to flush on close");
922                first_err.get_or_insert(ConnectorError::WriteError(format!(
923                    "flush failed on close: {e}"
924                )));
925            }
926        }
927
928        if let Some(ref dlq) = self.dlq_producer {
929            if let Err(e) = Self::flush_producer_async(dlq, Duration::from_secs(10)).await {
930                warn!(error = %e, "failed to flush DLQ producer on close");
931                first_err.get_or_insert(ConnectorError::WriteError(format!(
932                    "DLQ flush failed on close: {e}"
933                )));
934            }
935        }
936
937        self.producer = None;
938        self.dlq_producer = None;
939        self.state = ConnectorState::Closed;
940        info!("Kafka sink connector closed");
941        match first_err {
942            Some(e) => Err(e),
943            None => Ok(()),
944        }
945    }
946}
947
948impl std::fmt::Debug for KafkaSink {
949    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
950        f.debug_struct("KafkaSink")
951            .field("state", &self.state)
952            .field("topic", &self.config.topic)
953            .field("delivery", &self.config.delivery_guarantee)
954            .field("format", &self.config.format)
955            .field("current_epoch", &self.current_epoch)
956            .field("last_committed_epoch", &self.last_committed_epoch)
957            .field("transaction_active", &self.transaction_active)
958            .finish_non_exhaustive()
959    }
960}
961
962/// Selects the appropriate serializer for the given format.
963///
964/// For Avro, uses the shared `schema_id` handle so that Schema Registry
965/// registration updates are visible to the serializer.
966fn select_serializer(
967    format: Format,
968    schema: &SchemaRef,
969    schema_id: Arc<std::sync::atomic::AtomicU32>,
970    registry: Option<Arc<SchemaRegistryClient>>,
971) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
972    match format {
973        Format::Avro => Ok(Box::new(AvroSerializer::with_shared_schema_id(
974            schema.clone(),
975            schema_id,
976            registry,
977        ))),
978        other => serde::create_serializer(other).map_err(|e| {
979            ConnectorError::ConfigurationError(format!("unsupported sink format '{other}': {e}"))
980        }),
981    }
982}
983
984/// Selects the appropriate partitioner for the given strategy.
985fn select_partitioner(strategy: PartitionStrategy) -> Box<dyn KafkaPartitioner> {
986    match strategy {
987        PartitionStrategy::KeyHash => Box::new(KeyHashPartitioner::new()),
988        PartitionStrategy::RoundRobin => Box::new(RoundRobinPartitioner::new()),
989        PartitionStrategy::Sticky => Box::new(StickyPartitioner::new(100)),
990    }
991}
992
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::Int64Array;
    use arrow_schema::{DataType, Field, Schema};

    /// Two-column schema used by every test: non-nullable `id: Int64`
    /// and non-nullable `value: Utf8`.
    fn test_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Default sink configuration pointed at a local broker and the
    /// `output-events` topic.
    fn test_config() -> KafkaSinkConfig {
        let mut cfg = KafkaSinkConfig::default();
        cfg.topic = "output-events".into();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg
    }

    /// Two-row batch over `test_schema()` with ids `1, 2` and the given
    /// string values.
    fn two_row_batch(values: Vec<&str>) -> arrow_array::RecordBatch {
        arrow_array::RecordBatch::try_new(
            test_schema(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(values)),
            ],
        )
        .unwrap()
    }

    // A freshly constructed sink has no producer, no epochs and no
    // open transaction.
    #[test]
    fn test_new_defaults() {
        let fresh = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(fresh.state(), ConnectorState::Created);
        assert!(fresh.producer.is_none());
        assert_eq!(fresh.current_epoch(), 0);
        assert_eq!(fresh.last_committed_epoch(), 0);
        assert!(!fresh.transaction_active);
    }

    // The sink reports the schema it was constructed with.
    #[test]
    fn test_schema_returned() {
        let expected = test_schema();
        let sink = KafkaSink::new(expected.clone(), test_config(), None);
        assert_eq!(sink.schema(), expected);
    }

    // Default delivery: idempotent and partitioned only.
    #[test]
    fn test_capabilities_at_least_once() {
        let caps = KafkaSink::new(test_schema(), test_config(), None).capabilities();
        assert!(!caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
        assert!(!caps.schema_evolution);
    }

    // Exactly-once delivery additionally advertises exactly-once.
    #[test]
    fn test_capabilities_exactly_once() {
        let mut cfg = test_config();
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let caps = KafkaSink::new(test_schema(), cfg, None).capabilities();
        assert!(caps.exactly_once);
        assert!(caps.idempotent);
        assert!(caps.partitioned);
    }

    // The default configuration selects the JSON serializer.
    #[test]
    fn test_serializer_selection_json() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        assert_eq!(sink.serializer.format(), Format::Json);
    }

    // Setting the Avro format selects the Avro serializer.
    #[test]
    fn test_serializer_selection_avro() {
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        let sink = KafkaSink::new(test_schema(), cfg, None);
        assert_eq!(sink.serializer.format(), Format::Avro);
    }

    // Attaching a schema registry client enables schema evolution.
    #[test]
    fn test_with_schema_registry() {
        let registry = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());

        let sink = KafkaSink::with_schema_registry(test_schema(), cfg, registry);
        assert!(sink.has_schema_registry());
        assert_eq!(sink.serializer.format(), Format::Avro);
        assert!(sink.capabilities().schema_evolution);
    }

    // The Debug rendering names the type and the configured topic.
    #[test]
    fn test_debug_output() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let rendered = format!("{sink:?}");
        assert!(rendered.contains("KafkaSink"));
        assert!(rendered.contains("output-events"));
    }

    // Without a configured key column no keys are extracted.
    #[test]
    fn test_extract_keys_no_key_column() {
        let sink = KafkaSink::new(test_schema(), test_config(), None);
        let batch = two_row_batch(vec!["a", "b"]);
        assert!(sink.extract_keys(&batch).unwrap().is_none());
    }

    // With `value` as the key column, each row's string becomes its key.
    #[test]
    fn test_extract_keys_with_key_column() {
        let mut cfg = test_config();
        cfg.key_column = Some("value".into());
        let sink = KafkaSink::new(test_schema(), cfg, None);
        let batch = two_row_batch(vec!["key-a", "key-b"]);
        let keys = sink.extract_keys(&batch).unwrap().unwrap();
        assert_eq!(keys.len(), 2);
        assert_eq!(&keys[0], b"key-a");
        assert_eq!(&keys[1], b"key-b");
    }
}