// laminar_connectors/kafka/source.rs
1//! Kafka source connector implementation.
2//!
3//! [`KafkaSource`] implements the [`SourceConnector`] trait, consuming
4//! from Kafka topics via rdkafka's `StreamConsumer`, deserializing
5//! messages using pluggable formats, and producing Arrow `RecordBatch`
6//! data through the connector SDK.
7
8use arrow_schema::SchemaRef;
9use async_trait::async_trait;
10use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
11use rdkafka::message::Message;
12use rdkafka::ClientConfig;
13use rdkafka::TopicPartitionList;
14use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
15use std::sync::{Arc, Mutex};
16use tokio::sync::Notify;
17use tracing::{debug, info, warn};
18
19use super::rebalance::LaminarConsumerContext;
20
21use crate::checkpoint::SourceCheckpoint;
22use crate::config::{ConnectorConfig, ConnectorState};
23use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
24use crate::error::ConnectorError;
25use crate::health::HealthStatus;
26use crate::metrics::ConnectorMetrics;
27use crate::serde::{self, Format, RecordDeserializer};
28
29use super::avro::AvroDeserializer;
30use super::backpressure::KafkaBackpressureController;
31use super::config::{KafkaSourceConfig, StartupMode, TopicSubscription};
32use super::metrics::KafkaSourceMetrics;
33use super::offsets::OffsetTracker;
34use super::rebalance::RebalanceState;
35use super::schema_registry::SchemaRegistryClient;
36use super::watermarks::KafkaWatermarkTracker;
37
38/// Payload sent from the background Kafka reader task to [`KafkaSource::poll_batch`].
/// Payload sent from the background Kafka reader task to [`KafkaSource::poll_batch`].
struct KafkaPayload {
    /// Raw message bytes copied out of the rdkafka message payload.
    data: Vec<u8>,
    /// Topic the message was read from; `Arc<str>` so the reader task can
    /// clone it cheaply per message instead of allocating a `String` each time.
    topic: Arc<str>,
    /// Kafka partition id of the message.
    partition: i32,
    /// Kafka offset of the message within its partition.
    offset: i64,
    /// Broker `CreateTime`/`LogAppendTime` in milliseconds since epoch,
    /// or `None` when the broker reports the timestamp as not available.
    timestamp_ms: Option<i64>,
    /// Kafka message headers serialized as a JSON object string
    /// (`{"key":"value",...}`; a header with no value becomes `"key":""`).
    /// Only populated when `include_headers` is enabled.
    headers_json: Option<String>,
}
49
50/// Kafka source connector that consumes messages and produces Arrow batches.
51///
52/// Operates in Ring 1 (background) and pushes deserialized `RecordBatch`
53/// data to Ring 0 via the streaming `Source<T>` API.
54///
55/// # Lifecycle
56///
57/// 1. Create with [`KafkaSource::new`] or [`KafkaSource::with_schema_registry`]
58/// 2. Call `open()` to connect to Kafka and subscribe to topics
59/// 3. Call `poll_batch()` in a loop to consume messages
60/// 4. Call `checkpoint()` / `restore()` for fault tolerance
61/// 5. Call `close()` for clean shutdown
pub struct KafkaSource {
    /// rdkafka consumer; owned here between `open()` and the first
    /// `poll_batch()`, after which `ensure_reader_started()` takes it and
    /// moves it into the background reader task.
    consumer: Option<StreamConsumer<LaminarConsumerContext>>,
    /// Parsed Kafka source configuration (may be re-parsed in `open()`).
    config: KafkaSourceConfig,
    /// Format-specific record deserializer (JSON/Avro/etc., pluggable).
    deserializer: Box<dyn RecordDeserializer>,
    /// Per-topic-partition offset state for checkpoint and monitoring.
    offsets: OffsetTracker,
    /// Lifecycle state (Created → Initializing → Running → Closed).
    state: ConnectorState,
    /// Kafka-specific connector metrics (rebalances, throughput, ...).
    metrics: KafkaSourceMetrics,
    /// Arrow schema for output batches; may be overridden from SQL DDL in `open()`.
    schema: SchemaRef,
    /// Pause/resume controller driven by the reader-channel fill level.
    backpressure: KafkaBackpressureController,
    /// Shared fill counter for the reader → poll channel; incremented by the
    /// reader task, decremented as `poll_batch()` drains messages.
    channel_len: Arc<AtomicUsize>,
    /// Shared partition assignment state (updated by rebalance callbacks).
    rebalance_state: Arc<Mutex<RebalanceState>>,
    /// Shared rebalance counter bridging `LaminarConsumerContext` → `KafkaSourceMetrics`.
    rebalance_counter: Arc<AtomicU64>,
    /// Monotonic counter bumped on each partition revoke event.
    ///
    /// Shared with `LaminarConsumerContext` for lock-free revoke detection
    /// from `poll_batch()`. The source compares `last_seen_revoke_gen`
    /// against this value each poll cycle and only locks `rebalance_state`
    /// when a change is detected to purge revoked partition offsets.
    revoke_generation: Arc<AtomicU64>,
    /// Last observed value of `revoke_generation`, cached per poll cycle.
    last_seen_revoke_gen: u64,
    /// Optional Schema Registry client (used by the Avro deserializer).
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    /// Notified by the reader task whenever a message is queued, so the
    /// pipeline can wake and call `poll_batch()` promptly.
    data_ready: Arc<Notify>,
    /// Flag shared with `LaminarConsumerContext`; presumably raised by the
    /// rebalance callbacks to request a checkpoint — confirm in rebalance.rs.
    checkpoint_request: Arc<AtomicBool>,
    /// Receiving end of the reader-task message channel.
    msg_rx: Option<tokio::sync::mpsc::Receiver<KafkaPayload>>,
    /// Join handle of the background reader task.
    reader_handle: Option<tokio::task::JoinHandle<()>>,
    /// Watch channel used to signal the reader task to shut down.
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// Watch channel publishing the latest offsets to the reader task for
    /// periodic (advisory) and final broker commits.
    offset_commit_tx: Option<tokio::sync::watch::Sender<TopicPartitionList>>,
    /// Event-time watermark tracker; `None` when tracking is disabled.
    watermark_tracker: Option<KafkaWatermarkTracker>,
}
93
94impl KafkaSource {
95    /// Creates a new Kafka source connector with explicit schema.
96    ///
97    /// # Arguments
98    ///
99    /// * `schema` - Arrow schema for output batches
100    /// * `config` - Parsed Kafka source configuration
101    #[must_use]
102    pub fn new(schema: SchemaRef, config: KafkaSourceConfig) -> Self {
103        let deserializer = select_deserializer(config.format);
104        let channel_len = Arc::new(AtomicUsize::new(0));
105        let backpressure = KafkaBackpressureController::new(
106            config.backpressure_high_watermark,
107            config.backpressure_low_watermark,
108            config.max_poll_records * 10, // rough channel capacity estimate
109            Arc::clone(&channel_len),
110        );
111
112        let watermark_tracker = if config.enable_watermark_tracking {
113            Some(
114                KafkaWatermarkTracker::new(0, config.idle_timeout)
115                    .with_max_out_of_orderness(config.max_out_of_orderness),
116            )
117        } else {
118            None
119        };
120
121        Self {
122            consumer: None,
123            config,
124            deserializer,
125            offsets: OffsetTracker::new(),
126            state: ConnectorState::Created,
127            metrics: KafkaSourceMetrics::new(),
128            schema,
129            backpressure,
130            channel_len,
131            rebalance_state: Arc::new(Mutex::new(RebalanceState::new())),
132            rebalance_counter: Arc::new(AtomicU64::new(0)),
133            revoke_generation: Arc::new(AtomicU64::new(0)),
134            last_seen_revoke_gen: 0,
135            schema_registry: None,
136            data_ready: Arc::new(Notify::new()),
137            checkpoint_request: Arc::new(AtomicBool::new(false)),
138            msg_rx: None,
139            reader_handle: None,
140            reader_shutdown: None,
141            offset_commit_tx: None,
142            watermark_tracker,
143        }
144    }
145
146    /// Creates a new Kafka source connector with Schema Registry.
147    ///
148    /// # Arguments
149    ///
150    /// * `schema` - Arrow schema for output batches
151    /// * `config` - Parsed Kafka source configuration
152    /// * `sr_client` - Schema Registry client
153    #[must_use]
154    pub fn with_schema_registry(
155        schema: SchemaRef,
156        config: KafkaSourceConfig,
157        sr_client: SchemaRegistryClient,
158    ) -> Self {
159        let sr = Arc::new(sr_client);
160        let deserializer: Box<dyn RecordDeserializer> = if config.format == Format::Avro {
161            Box::new(AvroDeserializer::with_schema_registry(Arc::clone(&sr)))
162        } else {
163            select_deserializer(config.format)
164        };
165
166        let channel_len = Arc::new(AtomicUsize::new(0));
167        let backpressure = KafkaBackpressureController::new(
168            config.backpressure_high_watermark,
169            config.backpressure_low_watermark,
170            config.max_poll_records * 10,
171            Arc::clone(&channel_len),
172        );
173
174        let watermark_tracker = if config.enable_watermark_tracking {
175            Some(
176                KafkaWatermarkTracker::new(0, config.idle_timeout)
177                    .with_max_out_of_orderness(config.max_out_of_orderness),
178            )
179        } else {
180            None
181        };
182
183        Self {
184            consumer: None,
185            config,
186            deserializer,
187            offsets: OffsetTracker::new(),
188            state: ConnectorState::Created,
189            metrics: KafkaSourceMetrics::new(),
190            schema,
191            backpressure,
192            channel_len,
193            rebalance_state: Arc::new(Mutex::new(RebalanceState::new())),
194            rebalance_counter: Arc::new(AtomicU64::new(0)),
195            revoke_generation: Arc::new(AtomicU64::new(0)),
196            last_seen_revoke_gen: 0,
197            schema_registry: Some(sr),
198            data_ready: Arc::new(Notify::new()),
199            checkpoint_request: Arc::new(AtomicBool::new(false)),
200            msg_rx: None,
201            reader_handle: None,
202            reader_shutdown: None,
203            offset_commit_tx: None,
204            watermark_tracker,
205        }
206    }
207
208    /// Lifecycle state (Created → Initializing → Running → Closed).
209    #[must_use]
210    pub fn state(&self) -> ConnectorState {
211        self.state
212    }
213
214    /// Per-topic-partition offset state for checkpoint and monitoring.
215    #[must_use]
216    pub fn offsets(&self) -> &OffsetTracker {
217        &self.offsets
218    }
219
220    /// Shared backpressure fill counter for downstream wiring.
221    #[must_use]
222    pub fn channel_len(&self) -> Arc<AtomicUsize> {
223        Arc::clone(&self.channel_len)
224    }
225
226    /// Shared partition assignment state (updated by rebalance callbacks).
227    #[must_use]
228    pub fn rebalance_state(&self) -> Arc<Mutex<RebalanceState>> {
229        Arc::clone(&self.rebalance_state)
230    }
231
232    /// Whether a Schema Registry client is configured.
233    #[must_use]
234    pub fn has_schema_registry(&self) -> bool {
235        self.schema_registry.is_some()
236    }
237
238    /// Current combined watermark from the watermark tracker.
239    ///
240    /// Returns `None` if watermark tracking is disabled or no partitions
241    /// have received data yet.
242    #[must_use]
243    pub fn current_watermark(&self) -> Option<i64> {
244        self.watermark_tracker
245            .as_ref()
246            .and_then(KafkaWatermarkTracker::current_watermark)
247    }
248
249    /// Returns the configured event-time column name, if any.
250    ///
251    /// Used by the pipeline to identify which column contains event timestamps
252    /// for watermark generation, instead of hardcoding `event_time`/`timestamp`.
253    #[must_use]
254    pub fn event_time_column(&self) -> Option<&str> {
255        self.config.event_time_column.as_deref()
256    }
257
258    /// Spawns the background reader task on first `poll_batch()` call.
259    ///
260    /// Deferred to allow `restore()` to access the consumer directly after `open()`.
261    #[allow(clippy::too_many_lines)]
262    fn ensure_reader_started(&mut self) {
263        if self.reader_handle.is_some() || self.consumer.is_none() {
264            return;
265        }
266
267        let consumer = self.consumer.take().unwrap();
268        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(4096);
269        let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
270        let (offset_tx, offset_rx) = tokio::sync::watch::channel(TopicPartitionList::new());
271        let data_ready = Arc::clone(&self.data_ready);
272        let channel_len = Arc::clone(&self.channel_len);
273        let capture_headers = self.config.include_headers;
274        let broker_commit_interval = self.config.broker_commit_interval;
275
276        // The reader task owns the consumer. On shutdown it commits the latest
277        // offsets received via the offset_rx watch channel, then unsubscribes.
278        let reader_handle = tokio::spawn(async move {
279            let mut cached_topic: Arc<str> = Arc::from("");
280
281            // Periodic broker commit timer. Advisory — keeps kafka-consumer-groups
282            // lag monitoring accurate. Zero interval disables periodic commits.
283            let commit_enabled = !broker_commit_interval.is_zero();
284            let mut commit_timer = tokio::time::interval(if commit_enabled {
285                broker_commit_interval
286            } else {
287                // tokio::time::interval panics on Duration::ZERO; use a
288                // large interval that will never fire before shutdown.
289                std::time::Duration::from_secs(86400)
290            });
291            // Skip the first tick (fires immediately).
292            commit_timer.tick().await;
293
294            loop {
295                let msg_result = tokio::select! {
296                    biased;
297                    _ = shutdown_rx.changed() => break,
298                    _ = commit_timer.tick(), if commit_enabled => {
299                        let tpl = offset_rx.borrow().clone();
300                        if tpl.count() > 0 {
301                            match consumer.commit(&tpl, CommitMode::Async) {
302                                Ok(()) => info!(
303                                    partitions = tpl.count(),
304                                    "periodic broker offset commit (advisory)"
305                                ),
306                                Err(e) => warn!(
307                                    error = %e,
308                                    "periodic broker offset commit failed (advisory)"
309                                ),
310                            }
311                        }
312                        continue;
313                    },
314                    msg = consumer.recv() => msg,
315                };
316                match msg_result {
317                    Ok(msg) => {
318                        if let Some(payload) = msg.payload() {
319                            let topic = msg.topic();
320                            if &*cached_topic != topic {
321                                cached_topic = Arc::from(topic);
322                            }
323                            let timestamp_ms = match msg.timestamp() {
324                                rdkafka::Timestamp::CreateTime(ts)
325                                | rdkafka::Timestamp::LogAppendTime(ts) => Some(ts),
326                                rdkafka::Timestamp::NotAvailable => None,
327                            };
328                            let headers_json = if capture_headers {
329                                use rdkafka::message::Headers;
330                                msg.headers().map(|hdrs| {
331                                    let mut json = String::from('{');
332                                    for i in 0..hdrs.count() {
333                                        let header = hdrs.get(i);
334                                        if i > 0 {
335                                            json.push(',');
336                                        }
337                                        json.push('"');
338                                        json.push_str(header.key);
339                                        json.push_str("\":\"");
340                                        if let Some(val) = header.value {
341                                            json.push_str(&String::from_utf8_lossy(val));
342                                        }
343                                        json.push('"');
344                                    }
345                                    json.push('}');
346                                    json
347                                })
348                            } else {
349                                None
350                            };
351                            let kp = KafkaPayload {
352                                data: payload.to_vec(),
353                                topic: Arc::clone(&cached_topic),
354                                partition: msg.partition(),
355                                offset: msg.offset(),
356                                timestamp_ms,
357                                headers_json,
358                            };
359                            if msg_tx.send(kp).await.is_err() {
360                                break;
361                            }
362                            channel_len.fetch_add(1, Ordering::Relaxed);
363                            data_ready.notify_one();
364                        }
365                    }
366                    Err(e) => {
367                        warn!(error = %e, "Kafka consumer error");
368                    }
369                }
370            }
371
372            // Commit final offsets before unsubscribing. The close() method
373            // sends the latest TPL via offset_tx before signaling shutdown,
374            // so offset_rx.borrow() contains the most recent offsets.
375            let tpl = offset_rx.borrow().clone();
376            if tpl.count() > 0 {
377                match consumer.commit(&tpl, CommitMode::Sync) {
378                    Ok(()) => info!(
379                        partitions = tpl.count(),
380                        "committed final offsets on shutdown"
381                    ),
382                    Err(e) => warn!(error = %e, "failed to commit final offsets on shutdown"),
383                }
384            }
385
386            consumer.unsubscribe();
387        });
388
389        self.msg_rx = Some(msg_rx);
390        self.reader_handle = Some(reader_handle);
391        self.reader_shutdown = Some(shutdown_tx);
392        self.offset_commit_tx = Some(offset_tx);
393    }
394}
395
396#[async_trait]
397#[allow(clippy::too_many_lines)] // poll_batch has legitimate complexity (backpressure + deser + poison pill fallback)
398impl SourceConnector for KafkaSource {
    /// Connects to Kafka: re-parses runtime config, selects the
    /// deserializer, creates the consumer, subscribes to topics, and
    /// positions offsets per the configured startup mode.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConnectionFailed` when consumer creation,
    /// subscription, or offset assignment fails, and propagates config /
    /// Schema Registry construction errors.
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // If config provided, re-parse (supports runtime config override).
        let kafka_config = if config.properties().is_empty() {
            self.config.clone()
        } else {
            let parsed = KafkaSourceConfig::from_config(config)?;
            self.config = parsed.clone();
            parsed
        };

        // Re-select deserializer (factory defaults to JSON).
        if let Some(ref sr_url) = kafka_config.schema_registry_url {
            // A CA location in config implies TLS (optionally mTLS when a
            // client cert/key is also configured); otherwise plain HTTP(S).
            let sr_client = if let Some(ref ca) = kafka_config.schema_registry_ssl_ca_location {
                SchemaRegistryClient::with_tls_mtls(
                    sr_url.clone(),
                    kafka_config.schema_registry_auth.clone(),
                    ca,
                    kafka_config
                        .schema_registry_ssl_certificate_location
                        .as_deref(),
                    kafka_config.schema_registry_ssl_key_location.as_deref(),
                )?
            } else {
                SchemaRegistryClient::new(sr_url.clone(), kafka_config.schema_registry_auth.clone())
            };
            let sr = Arc::new(sr_client);
            self.schema_registry = Some(Arc::clone(&sr));
            self.deserializer = if kafka_config.format == Format::Avro {
                Box::new(AvroDeserializer::with_schema_registry(sr))
            } else {
                select_deserializer(kafka_config.format)
            };
        } else if let Some(ref sr) = self.schema_registry {
            // Preserve SR client injected via with_schema_registry().
            self.deserializer = if kafka_config.format == Format::Avro {
                Box::new(AvroDeserializer::with_schema_registry(Arc::clone(sr)))
            } else {
                select_deserializer(kafka_config.format)
            };
        } else {
            self.deserializer = select_deserializer(kafka_config.format);
        }

        // Override schema from SQL DDL if provided.
        if let Some(schema) = config.arrow_schema() {
            info!(
                fields = schema.fields().len(),
                "using SQL-defined schema for deserialization"
            );
            self.schema = schema;
        }

        info!(
            brokers = %kafka_config.bootstrap_servers,
            subscription = ?kafka_config.subscription,
            group_id = %kafka_config.group_id,
            format = %kafka_config.format,
            schema_fields = self.schema.fields().len(),
            "opening Kafka source connector"
        );

        // Build rdkafka consumer with rebalance-aware context.
        let rdkafka_config: ClientConfig = kafka_config.to_rdkafka_config();
        let context = LaminarConsumerContext::new(
            Arc::clone(&self.checkpoint_request),
            Arc::clone(&self.rebalance_state),
            Arc::clone(&self.rebalance_counter),
            Arc::clone(&self.revoke_generation),
        );
        let consumer: StreamConsumer<LaminarConsumerContext> =
            rdkafka_config.create_with_context(context).map_err(|e| {
                ConnectorError::ConnectionFailed(format!("failed to create consumer: {e}"))
            })?;

        // Subscribe to topics (list or regex pattern).
        match &kafka_config.subscription {
            TopicSubscription::Topics(topics) => {
                let topic_refs: Vec<&str> = topics.iter().map(String::as_str).collect();
                consumer.subscribe(&topic_refs).map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to subscribe: {e}"))
                })?;
            }
            TopicSubscription::Pattern(pattern) => {
                // rdkafka requires a ^ prefix for regex patterns
                let regex_pattern = if pattern.starts_with('^') {
                    pattern.clone()
                } else {
                    format!("^{pattern}")
                };
                consumer.subscribe(&[&regex_pattern]).map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to subscribe to pattern: {e}"))
                })?;
            }
        }

        // Apply startup mode positioning before starting the reader.
        match &kafka_config.startup_mode {
            // GroupOffsets/Earliest/Latest are handled via auto.offset.reset in to_rdkafka_config().
            StartupMode::GroupOffsets | StartupMode::Earliest | StartupMode::Latest => {}
            StartupMode::SpecificOffsets(offsets) => {
                let mut tpl = rdkafka::TopicPartitionList::new();
                // NOTE(review): pattern subscriptions yield no topic names here,
                // so specific offsets are silently skipped for patterns.
                let topics = match &kafka_config.subscription {
                    TopicSubscription::Topics(t) => t.clone(),
                    TopicSubscription::Pattern(_) => Vec::new(),
                };
                for topic in &topics {
                    for (&partition, &offset) in offsets {
                        // Best-effort: ignore failures to add individual entries.
                        tpl.add_partition_offset(topic, partition, rdkafka::Offset::Offset(offset))
                            .ok();
                    }
                }
                if tpl.count() > 0 {
                    consumer.assign(&tpl).map_err(|e| {
                        ConnectorError::ConnectionFailed(format!(
                            "failed to assign specific offsets: {e}"
                        ))
                    })?;
                    info!(
                        partitions = tpl.count(),
                        "assigned consumer to specific offsets"
                    );
                }
            }
            StartupMode::Timestamp(ts_ms) => {
                // rdkafka requires assignment before offsets_for_times.
                // Wait briefly for partition assignment from the group coordinator,
                // then seek each assigned partition to the target timestamp.
                let mut tpl = rdkafka::TopicPartitionList::new();
                let topics = match &kafka_config.subscription {
                    TopicSubscription::Topics(t) => t.clone(),
                    TopicSubscription::Pattern(_) => Vec::new(),
                };
                // Query metadata to discover partition count per topic.
                // With no topic name (pattern/empty), this fetches metadata
                // for all topics visible to the consumer.
                if let Ok(metadata) = consumer.fetch_metadata(
                    topics.first().map(String::as_str),
                    std::time::Duration::from_secs(10),
                ) {
                    for topic_meta in metadata.topics() {
                        for partition_meta in topic_meta.partitions() {
                            // The Offset field carries the *timestamp* here,
                            // as required by offsets_for_times().
                            tpl.add_partition_offset(
                                topic_meta.name(),
                                partition_meta.id(),
                                rdkafka::Offset::Offset(*ts_ms),
                            )
                            .ok();
                        }
                    }
                }
                if tpl.count() > 0 {
                    match consumer.offsets_for_times(tpl, std::time::Duration::from_secs(10)) {
                        Ok(resolved) => {
                            consumer.assign(&resolved).map_err(|e| {
                                ConnectorError::ConnectionFailed(format!(
                                    "failed to assign timestamp offsets: {e}"
                                ))
                            })?;
                            info!(
                                timestamp_ms = ts_ms,
                                partitions = resolved.count(),
                                "assigned consumer to timestamp offsets"
                            );
                        }
                        Err(e) => {
                            // Non-fatal: fall back to group offsets / auto.offset.reset.
                            warn!(
                                error = %e,
                                timestamp_ms = ts_ms,
                                "failed to resolve timestamp offsets, falling back to group offsets"
                            );
                        }
                    }
                }
            }
        }

        self.consumer = Some(consumer);
        self.state = ConnectorState::Running;

        // Reader is NOT started here — deferred to first `poll_batch()`.
        // This allows `restore()` to access the consumer directly between
        // `open()` and the first poll, calling `consumer.assign()` to seek
        // to checkpoint offsets. The pipeline's fallback poll interval
        // ensures the first `poll_batch()` fires promptly even without
        // a `data_ready_notify()` signal.

        info!("Kafka source connector opened successfully");
        Ok(())
    }
588
589    #[allow(clippy::cast_possible_truncation)] // Kafka partition/offset values fit in narrower types
590    async fn poll_batch(
591        &mut self,
592        max_records: usize,
593    ) -> Result<Option<SourceBatch>, ConnectorError> {
594        if self.state != ConnectorState::Running {
595            return Err(ConnectorError::InvalidState {
596                expected: "Running".into(),
597                actual: self.state.to_string(),
598            });
599        }
600
601        // Check backpressure.
602        if self.backpressure.should_pause() {
603            self.backpressure.set_paused(true);
604            debug!("backpressure: pausing consumption");
605            return Ok(None);
606        }
607        if self.backpressure.should_resume() {
608            self.backpressure.set_paused(false);
609            debug!("backpressure: resuming consumption");
610        }
611        if self.backpressure.is_paused() {
612            return Ok(None);
613        }
614
615        // Lazily spawn the background reader task on first poll.
616        self.ensure_reader_started();
617
618        let rx = self
619            .msg_rx
620            .as_mut()
621            .ok_or_else(|| ConnectorError::InvalidState {
622                expected: "reader initialized".into(),
623                actual: "reader is None".into(),
624            })?;
625
626        let limit = max_records.min(self.config.max_poll_records);
627
628        // Drain messages from the background reader task.
629        let mut payload_buf: Vec<u8> = Vec::with_capacity(limit * 256);
630        let mut payload_offsets: Vec<(usize, usize)> = Vec::with_capacity(limit);
631        let mut total_bytes: u64 = 0;
632        let mut last_topic = String::new();
633        let mut last_partition_id: i32 = 0;
634        let mut last_offset: i64 = -1;
635        // Metadata columns (only collected when include_metadata is true).
636        let include_metadata = self.config.include_metadata;
637        let include_headers = self.config.include_headers;
638        let mut meta_partitions: Vec<i32> = if include_metadata {
639            Vec::with_capacity(limit)
640        } else {
641            Vec::new()
642        };
643        let mut meta_offsets: Vec<i64> = if include_metadata {
644            Vec::with_capacity(limit)
645        } else {
646            Vec::new()
647        };
648        let mut meta_timestamps: Vec<Option<i64>> = if include_metadata {
649            Vec::with_capacity(limit)
650        } else {
651            Vec::new()
652        };
653        let mut meta_headers: Vec<Option<String>> = if include_headers {
654            Vec::with_capacity(limit)
655        } else {
656            Vec::new()
657        };
658
659        while payload_offsets.len() < limit {
660            match rx.try_recv() {
661                Ok(kp) => {
662                    self.channel_len.fetch_sub(1, Ordering::Relaxed);
663                    total_bytes += kp.data.len() as u64;
664                    let start = payload_buf.len();
665                    payload_buf.extend_from_slice(&kp.data);
666                    payload_offsets.push((start, kp.data.len()));
667
668                    self.offsets.update_arc(&kp.topic, kp.partition, kp.offset);
669
670                    if include_metadata {
671                        meta_partitions.push(kp.partition);
672                        meta_offsets.push(kp.offset);
673                        meta_timestamps.push(kp.timestamp_ms);
674                    }
675                    if include_headers {
676                        meta_headers.push(kp.headers_json);
677                    }
678
679                    // Update watermark tracker with event timestamp.
680                    if let Some(ref mut tracker) = self.watermark_tracker {
681                        if let Some(ts) = kp.timestamp_ms {
682                            tracker.update_partition(kp.partition, ts);
683                        }
684                    }
685
686                    if last_topic.as_str() != &*kp.topic || last_partition_id != kp.partition {
687                        last_topic = kp.topic.to_string();
688                        last_partition_id = kp.partition;
689                    }
690                    last_offset = kp.offset;
691                }
692                Err(_) => break,
693            }
694        }
695
696        // Check for idle partitions on each poll cycle.
697        if let Some(ref mut tracker) = self.watermark_tracker {
698            tracker.check_idle_partitions();
699        }
700
701        // Sync rebalance counter → metrics (bridge from rdkafka background thread).
702        let rebalance_events = self.rebalance_counter.swap(0, Ordering::Relaxed);
703        for _ in 0..rebalance_events {
704            self.metrics.record_rebalance();
705        }
706
707        // Lock-free revoke detection: check if a rebalance revoke happened
708        // since the last poll cycle. If so, purge offsets for revoked partitions.
709        let current_revoke_gen = self.revoke_generation.load(Ordering::Relaxed);
710        let had_revoke = current_revoke_gen != self.last_seen_revoke_gen;
711        if had_revoke {
712            self.last_seen_revoke_gen = current_revoke_gen;
713            let assigned = match self.rebalance_state.lock() {
714                Ok(state) => state.assigned_partitions().clone(),
715                Err(poisoned) => poisoned.into_inner().assigned_partitions().clone(),
716            };
717            let before = self.offsets.partition_count();
718            self.offsets.retain_assigned(&assigned);
719            let after = self.offsets.partition_count();
720            if before != after {
721                debug!(
722                    before,
723                    after, "purged revoked partition offsets after rebalance"
724                );
725            }
726        }
727
728        // Publish current offsets to the reader task for periodic broker commits.
729        // After retain_assigned, self.offsets only contains assigned partitions.
730        if had_revoke || !payload_offsets.is_empty() {
731            if let Some(ref tx) = self.offset_commit_tx {
732                let tpl = self.offsets.to_topic_partition_list();
733                if tx.send(tpl).is_err() {
734                    debug!("offset_commit_tx closed, reader task shutting down");
735                }
736            }
737        }
738
739        if payload_offsets.is_empty() {
740            return Ok(None);
741        }
742
743        // PartitionInfo reflects the last topic-partition seen in this batch.
744        // Per-partition offsets are tracked correctly in `self.offsets` and
745        // persisted via checkpoint(); this field is informational only.
746        let last_partition = if last_offset >= 0 {
747            Some(PartitionInfo::new(
748                format!("{last_topic}-{last_partition_id}"),
749                last_offset.to_string(),
750            ))
751        } else {
752            None
753        };
754
755        // Resolve Avro schemas from Schema Registry before deserialization.
756        if let Some(avro_deser) = self
757            .deserializer
758            .as_any_mut()
759            .and_then(|any| any.downcast_mut::<AvroDeserializer>())
760        {
761            for &(start, len) in &payload_offsets {
762                if let Some(schema_id) =
763                    AvroDeserializer::extract_confluent_id(&payload_buf[start..start + len])
764                {
765                    avro_deser
766                        .ensure_schema_registered(schema_id)
767                        .await
768                        .map_err(ConnectorError::Serde)?;
769                }
770            }
771        }
772
773        let refs: Vec<&[u8]> = payload_offsets
774            .iter()
775            .map(|&(start, len)| &payload_buf[start..start + len])
776            .collect();
777
778        // Try batch deserialization first (fast path). If it fails, fall back
779        // to per-record deserialization to isolate poison pills.
780        let batch = match self.deserializer.deserialize_batch(&refs, &self.schema) {
781            Ok(batch) => batch,
782            Err(batch_err) => {
783                // Per-record fallback: deserialize one at a time, skip failures.
784                let mut good_refs = Vec::with_capacity(refs.len());
785                let mut error_count = 0u64;
786                for r in &refs {
787                    match self
788                        .deserializer
789                        .deserialize_batch(std::slice::from_ref(r), &self.schema)
790                    {
791                        Ok(_) => good_refs.push(*r),
792                        Err(e) => {
793                            error_count += 1;
794                            self.metrics.record_error();
795                            warn!(error = %e, "skipping poison pill record");
796                        }
797                    }
798                }
799                if good_refs.is_empty() {
800                    // All records failed — propagate the original batch error.
801                    return Err(ConnectorError::Serde(batch_err));
802                }
803                if error_count > 0 {
804                    warn!(
805                        skipped = error_count,
806                        total = refs.len(),
807                        "deserialized batch with poison pill isolation"
808                    );
809                }
810                self.deserializer
811                    .deserialize_batch(&good_refs, &self.schema)
812                    .map_err(ConnectorError::Serde)?
813            }
814        };
815
816        // Append metadata columns if configured.
817        let batch = if include_metadata && !meta_partitions.is_empty() {
818            use arrow_array::{Int32Array, Int64Array};
819            use arrow_schema::{DataType, Field};
820
821            let mut fields = batch.schema().fields().to_vec();
822            let mut columns: Vec<Arc<dyn arrow_array::Array>> = batch.columns().to_vec();
823
824            fields.push(Arc::new(Field::new("_partition", DataType::Int32, false)));
825            columns.push(Arc::new(Int32Array::from(meta_partitions)));
826
827            fields.push(Arc::new(Field::new("_offset", DataType::Int64, false)));
828            columns.push(Arc::new(Int64Array::from(meta_offsets)));
829
830            fields.push(Arc::new(Field::new("_timestamp", DataType::Int64, true)));
831            columns.push(Arc::new(Int64Array::from(meta_timestamps)));
832
833            if include_headers && !meta_headers.is_empty() {
834                fields.push(Arc::new(Field::new("_headers", DataType::Utf8, true)));
835                columns.push(Arc::new(arrow_array::StringArray::from(meta_headers)));
836            }
837
838            let meta_schema = Arc::new(arrow_schema::Schema::new(fields));
839            arrow_array::RecordBatch::try_new(meta_schema, columns).map_err(|e| {
840                ConnectorError::Internal(format!("failed to append metadata columns: {e}"))
841            })?
842        } else {
843            batch
844        };
845
846        let num_rows = batch.num_rows();
847        self.metrics.record_poll(num_rows as u64, total_bytes);
848
849        let source_batch = if let Some(partition) = last_partition {
850            SourceBatch::with_partition(batch, partition)
851        } else {
852            SourceBatch::new(batch)
853        };
854
855        debug!(
856            records = num_rows,
857            bytes = total_bytes,
858            "polled batch from Kafka"
859        );
860
861        Ok(Some(source_batch))
862    }
863
864    fn schema(&self) -> SchemaRef {
865        self.schema.clone()
866    }
867
868    fn checkpoint(&self) -> SourceCheckpoint {
869        // Filter to only currently assigned partitions — prevents revoked
870        // partition offsets from leaking into checkpoints or broker commits.
871        let assigned = match self.rebalance_state.lock() {
872            Ok(state) => state.assigned_partitions().clone(),
873            Err(poisoned) => poisoned.into_inner().assigned_partitions().clone(),
874        };
875
876        // Push filtered offsets to the reader task so it can commit them
877        // on shutdown even if close() is called without a subsequent checkpoint.
878        if let Some(ref tx) = self.offset_commit_tx {
879            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
880            let _ = tx.send(tpl);
881        }
882
883        self.offsets.to_checkpoint_filtered(&assigned)
884    }
885
886    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
887        info!(
888            epoch = checkpoint.epoch(),
889            "restoring Kafka source from checkpoint"
890        );
891
892        self.offsets = OffsetTracker::from_checkpoint(checkpoint);
893
894        if let Some(ref consumer) = self.consumer {
895            let tpl = self.offsets.to_topic_partition_list();
896            consumer.assign(&tpl).map_err(|e| {
897                ConnectorError::CheckpointError(format!("failed to seek to offsets: {e}"))
898            })?;
899            info!(
900                partitions = self.offsets.partition_count(),
901                "restored consumer to checkpointed offsets"
902            );
903        }
904
905        Ok(())
906    }
907
908    fn health_check(&self) -> HealthStatus {
909        match self.state {
910            ConnectorState::Running => {
911                if self.backpressure.is_paused() {
912                    HealthStatus::Degraded("backpressure: consumption paused".into())
913                } else {
914                    HealthStatus::Healthy
915                }
916            }
917            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
918            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
919            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
920            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
921            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
922        }
923    }
924
    /// Snapshots the Kafka-specific counters into the generic SDK
    /// [`ConnectorMetrics`] form.
    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }
928
929    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
930        Some(Arc::clone(&self.data_ready))
931    }
932
933    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
934        Some(Arc::clone(&self.checkpoint_request))
935    }
936
937    async fn close(&mut self) -> Result<(), ConnectorError> {
938        info!("closing Kafka source connector");
939
940        // Filter to assigned partitions — same as checkpoint().
941        let assigned = match self.rebalance_state.lock() {
942            Ok(state) => state.assigned_partitions().clone(),
943            Err(poisoned) => poisoned.into_inner().assigned_partitions().clone(),
944        };
945
946        // Send final filtered offsets to the reader task before signaling shutdown.
947        if let Some(ref tx) = self.offset_commit_tx {
948            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
949            if tpl.count() > 0 {
950                let _ = tx.send(tpl);
951            }
952        }
953
954        // Signal shutdown and wait for the reader task to commit and exit.
955        if let Some(tx) = self.reader_shutdown.take() {
956            let _ = tx.send(true);
957        }
958        if let Some(handle) = self.reader_handle.take() {
959            let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
960        }
961        self.msg_rx = None;
962        self.offset_commit_tx = None;
963
964        // If consumer was never moved to the reader (e.g., close() called
965        // before any poll_batch()), commit directly.
966        if let Some(ref consumer) = self.consumer {
967            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
968            if tpl.count() > 0 {
969                if let Err(e) = consumer.commit(&tpl, CommitMode::Sync) {
970                    warn!(error = %e, "failed to commit final offsets");
971                }
972            }
973            consumer.unsubscribe();
974        }
975
976        self.consumer = None;
977        self.state = ConnectorState::Closed;
978        info!("Kafka source connector closed");
979        Ok(())
980    }
981}
982
983impl std::fmt::Debug for KafkaSource {
984    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
985        f.debug_struct("KafkaSource")
986            .field("state", &self.state)
987            .field("subscription", &self.config.subscription)
988            .field("group_id", &self.config.group_id)
989            .field("format", &self.config.format)
990            .field("partitions", &self.offsets.partition_count())
991            .finish_non_exhaustive()
992    }
993}
994
995fn select_deserializer(format: Format) -> Box<dyn RecordDeserializer> {
996    match format {
997        Format::Avro => Box::new(AvroDeserializer::new()),
998        other => serde::create_deserializer(other).unwrap_or_else(|_| {
999            warn!(format = %other, "unsupported format, falling back to JSON");
1000            Box::new(serde::json::JsonDeserializer::new())
1001        }),
1002    }
1003}
1004
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    /// Builds the two-column (`id: Int64`, `value: Utf8`) schema shared by
    /// every test.
    fn sample_schema() -> SchemaRef {
        let fields = vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds a minimal source config: local broker, one topic, fixed group.
    fn sample_config() -> KafkaSourceConfig {
        let mut config = KafkaSourceConfig::default();
        config.bootstrap_servers = "localhost:9092".into();
        config.group_id = "test-group".into();
        config.subscription = TopicSubscription::Topics(vec!["events".into()]);
        config
    }

    #[test]
    fn test_new_defaults() {
        let src = KafkaSource::new(sample_schema(), sample_config());
        assert_eq!(src.state(), ConnectorState::Created);
        assert!(src.consumer.is_none());
        assert_eq!(src.offsets().partition_count(), 0);
    }

    #[test]
    fn test_schema_returned() {
        let schema = sample_schema();
        let src = KafkaSource::new(schema.clone(), sample_config());
        assert_eq!(src.schema(), schema);
    }

    #[test]
    fn test_checkpoint_empty() {
        let src = KafkaSource::new(sample_schema(), sample_config());
        assert!(src.checkpoint().is_empty());
    }

    #[test]
    fn test_checkpoint_with_offsets() {
        let mut src = KafkaSource::new(sample_schema(), sample_config());
        src.offsets.update("events", 0, 100);
        src.offsets.update("events", 1, 200);

        // Mark both partitions as assigned, as a real rebalance would.
        src.rebalance_state
            .lock()
            .unwrap()
            .on_assign(&[("events".into(), 0), ("events".into(), 1)]);

        let cp = src.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), Some("200"));
    }

    #[test]
    fn test_health_check_created() {
        let src = KafkaSource::new(sample_schema(), sample_config());
        assert_eq!(src.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut src = KafkaSource::new(sample_schema(), sample_config());
        src.state = ConnectorState::Running;
        assert_eq!(src.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut src = KafkaSource::new(sample_schema(), sample_config());
        src.state = ConnectorState::Closed;
        assert!(matches!(src.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_metrics_initial() {
        let metrics = KafkaSource::new(sample_schema(), sample_config()).metrics();
        assert_eq!(metrics.records_total, 0);
        assert_eq!(metrics.bytes_total, 0);
        assert_eq!(metrics.errors_total, 0);
    }

    #[test]
    fn test_deserializer_selection_json() {
        let src = KafkaSource::new(sample_schema(), sample_config());
        assert_eq!(src.deserializer.format(), Format::Json);
    }

    #[test]
    fn test_deserializer_selection_csv() {
        let mut config = sample_config();
        config.format = Format::Csv;
        let src = KafkaSource::new(sample_schema(), config);
        assert_eq!(src.deserializer.format(), Format::Csv);
    }

    #[test]
    fn test_with_schema_registry() {
        let registry = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut config = sample_config();
        config.format = Format::Avro;
        config.schema_registry_url = Some("http://localhost:8081".into());

        let src = KafkaSource::with_schema_registry(sample_schema(), config, registry);
        assert!(src.schema_registry.is_some());
        assert_eq!(src.deserializer.format(), Format::Avro);
    }

    #[tokio::test]
    async fn test_open_preserves_injected_schema_registry() {
        let registry = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut config = sample_config();
        config.format = Format::Avro;
        config.schema_registry_url = Some("http://localhost:8081".into());
        let mut src = KafkaSource::with_schema_registry(sample_schema(), config, registry);

        // open() cannot reach a broker here and will error, but the
        // deserializer re-selection happens before the connection attempt —
        // that is the behavior under test: the injected SR must survive.
        let _ = src.open(&crate::config::ConnectorConfig::new("kafka")).await;
        assert!(src.schema_registry.is_some());
        assert_eq!(src.deserializer.format(), Format::Avro);
    }

    #[test]
    fn test_debug_output() {
        let rendered = format!("{:?}", KafkaSource::new(sample_schema(), sample_config()));
        assert!(rendered.contains("KafkaSource"));
        assert!(rendered.contains("events"));
    }

    #[test]
    fn test_checkpoint_filters_revoked_partitions() {
        let mut src = KafkaSource::new(sample_schema(), sample_config());
        src.offsets.update("events", 0, 100);
        src.offsets.update("events", 1, 200);
        src.offsets.update("events", 2, 300);

        // Rebalance leaves only partitions 0 and 2 assigned; partition 1 is
        // revoked and must not appear in the checkpoint.
        src.rebalance_state
            .lock()
            .unwrap()
            .on_assign(&[("events".into(), 0), ("events".into(), 2)]);

        let cp = src.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), None);
        assert_eq!(cp.get_offset("events-2"), Some("300"));
    }

    #[test]
    fn test_checkpoint_empty_before_first_rebalance() {
        let mut src = KafkaSource::new(sample_schema(), sample_config());
        src.offsets.update("events", 0, 100);
        src.offsets.update("events", 1, 200);

        // No rebalance has occurred yet, so the assigned set is empty and
        // nothing may be checkpointed.
        assert!(src.checkpoint().is_empty());
    }
}