use arrow_schema::SchemaRef;
use async_trait::async_trait;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::ClientConfig;
use rdkafka::TopicPartitionList;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tracing::{debug, info, warn};

use super::rebalance::LaminarConsumerContext;

use crate::checkpoint::SourceCheckpoint;
use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;
use crate::serde::{self, Format, RecordDeserializer};

use super::avro::AvroDeserializer;
use super::backpressure::KafkaBackpressureController;
use super::config::{KafkaSourceConfig, StartupMode, TopicSubscription};
use super::metrics::KafkaSourceMetrics;
use super::offsets::OffsetTracker;
use super::rebalance::RebalanceState;
use super::schema_registry::SchemaRegistryClient;
use super::watermarks::KafkaWatermarkTracker;

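/// Raw message captured by the background reader task and handed to
/// `poll_batch` over the in-process channel.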
struct KafkaPayload {
    data: Vec<u8>,
    topic: Arc<str>,
    partition: i32,
    offset: i64,
    timestamp_ms: Option<i64>,
    headers_json: Option<String>,
}

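/// Kafka source connector that consumes records from one or more topics,
/// deserializes them into Arrow record batches, and tracks offsets,
/// watermarks, and consumer-group rebalances for checkpointing.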
pub struct KafkaSource {
    consumer: Option<StreamConsumer<LaminarConsumerContext>>,
    config: KafkaSourceConfig,
    deserializer: Box<dyn RecordDeserializer>,
    offsets: OffsetTracker,
    state: ConnectorState,
    metrics: KafkaSourceMetrics,
    schema: SchemaRef,
    backpressure: KafkaBackpressureController,
    channel_len: Arc<AtomicUsize>,
    rebalance_state: Arc<Mutex<RebalanceState>>,
    rebalance_counter: Arc<AtomicU64>,
    revoke_generation: Arc<AtomicU64>,
    last_seen_revoke_gen: u64,
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    data_ready: Arc<Notify>,
    checkpoint_request: Arc<AtomicBool>,
    msg_rx: Option<tokio::sync::mpsc::Receiver<KafkaPayload>>,
    reader_handle: Option<tokio::task::JoinHandle<()>>,
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    offset_commit_tx: Option<tokio::sync::watch::Sender<TopicPartitionList>>,
    watermark_tracker: Option<KafkaWatermarkTracker>,
}

impl KafkaSource {
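    /// Creates a Kafka source for the given Arrow schema and configuration.
    /// The underlying consumer is not created until `open` is called.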
    #[must_use]
    pub fn new(schema: SchemaRef, config: KafkaSourceConfig) -> Self {
        let deserializer = select_deserializer(config.format);
        let channel_len = Arc::new(AtomicUsize::new(0));
        let backpressure = KafkaBackpressureController::new(
            config.backpressure_high_watermark,
            config.backpressure_low_watermark,
            config.max_poll_records * 10,
            Arc::clone(&channel_len),
        );

        let watermark_tracker = if config.enable_watermark_tracking {
            Some(
                KafkaWatermarkTracker::new(0, config.idle_timeout)
                    .with_max_out_of_orderness(config.max_out_of_orderness),
            )
        } else {
            None
        };

        Self {
            consumer: None,
            config,
            deserializer,
            offsets: OffsetTracker::new(),
            state: ConnectorState::Created,
            metrics: KafkaSourceMetrics::new(),
            schema,
            backpressure,
            channel_len,
            rebalance_state: Arc::new(Mutex::new(RebalanceState::new())),
            rebalance_counter: Arc::new(AtomicU64::new(0)),
            revoke_generation: Arc::new(AtomicU64::new(0)),
            last_seen_revoke_gen: 0,
            schema_registry: None,
            data_ready: Arc::new(Notify::new()),
            checkpoint_request: Arc::new(AtomicBool::new(false)),
            msg_rx: None,
            reader_handle: None,
            reader_shutdown: None,
            offset_commit_tx: None,
            watermark_tracker,
        }
    }

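    /// Creates a Kafka source with an injected Schema Registry client; when
    /// the configured format is Avro, the client is wired into the Avro
    /// deserializer.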
    #[must_use]
    pub fn with_schema_registry(
        schema: SchemaRef,
        config: KafkaSourceConfig,
        sr_client: SchemaRegistryClient,
    ) -> Self {
        let sr = Arc::new(sr_client);
        let deserializer: Box<dyn RecordDeserializer> = if config.format == Format::Avro {
            Box::new(AvroDeserializer::with_schema_registry(Arc::clone(&sr)))
        } else {
            select_deserializer(config.format)
        };

        let channel_len = Arc::new(AtomicUsize::new(0));
        let backpressure = KafkaBackpressureController::new(
            config.backpressure_high_watermark,
            config.backpressure_low_watermark,
            config.max_poll_records * 10,
            Arc::clone(&channel_len),
        );

        let watermark_tracker = if config.enable_watermark_tracking {
            Some(
                KafkaWatermarkTracker::new(0, config.idle_timeout)
                    .with_max_out_of_orderness(config.max_out_of_orderness),
            )
        } else {
            None
        };

        Self {
            consumer: None,
            config,
            deserializer,
            offsets: OffsetTracker::new(),
            state: ConnectorState::Created,
            metrics: KafkaSourceMetrics::new(),
            schema,
            backpressure,
            channel_len,
            rebalance_state: Arc::new(Mutex::new(RebalanceState::new())),
            rebalance_counter: Arc::new(AtomicU64::new(0)),
            revoke_generation: Arc::new(AtomicU64::new(0)),
            last_seen_revoke_gen: 0,
            schema_registry: Some(sr),
            data_ready: Arc::new(Notify::new()),
            checkpoint_request: Arc::new(AtomicBool::new(false)),
            msg_rx: None,
            reader_handle: None,
            reader_shutdown: None,
            offset_commit_tx: None,
            watermark_tracker,
        }
    }

    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    #[must_use]
    pub fn offsets(&self) -> &OffsetTracker {
        &self.offsets
    }

    #[must_use]
    pub fn channel_len(&self) -> Arc<AtomicUsize> {
        Arc::clone(&self.channel_len)
    }

    #[must_use]
    pub fn rebalance_state(&self) -> Arc<Mutex<RebalanceState>> {
        Arc::clone(&self.rebalance_state)
    }

    #[must_use]
    pub fn has_schema_registry(&self) -> bool {
        self.schema_registry.is_some()
    }

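    /// Returns the current event-time watermark, if watermark tracking is
    /// enabled and a watermark has been established.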
    #[must_use]
    pub fn current_watermark(&self) -> Option<i64> {
        self.watermark_tracker
            .as_ref()
            .and_then(KafkaWatermarkTracker::current_watermark)
    }

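    /// Returns the configured event-time column name, if any.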
    #[must_use]
    pub fn event_time_column(&self) -> Option<&str> {
        self.config.event_time_column.as_deref()
    }

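    /// Spawns the background reader task on first use. The task owns the
    /// consumer, forwards payloads over an mpsc channel, and periodically
    /// commits advisory offsets to the broker.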
    #[allow(clippy::too_many_lines)]
    fn ensure_reader_started(&mut self) {
        if self.reader_handle.is_some() || self.consumer.is_none() {
            return;
        }

        let consumer = self.consumer.take().unwrap();
        let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(4096);
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
        let (offset_tx, offset_rx) = tokio::sync::watch::channel(TopicPartitionList::new());
        let data_ready = Arc::clone(&self.data_ready);
        let channel_len = Arc::clone(&self.channel_len);
        let capture_headers = self.config.include_headers;
        let broker_commit_interval = self.config.broker_commit_interval;

        let reader_handle = tokio::spawn(async move {
            let mut cached_topic: Arc<str> = Arc::from("");

            let commit_enabled = !broker_commit_interval.is_zero();
            let mut commit_timer = tokio::time::interval(if commit_enabled {
                broker_commit_interval
            } else {
                std::time::Duration::from_secs(86400)
            });
            commit_timer.tick().await;

            loop {
                let msg_result = tokio::select! {
                    biased;
                    _ = shutdown_rx.changed() => break,
                    _ = commit_timer.tick(), if commit_enabled => {
                        let tpl = offset_rx.borrow().clone();
                        if tpl.count() > 0 {
                            match consumer.commit(&tpl, CommitMode::Async) {
                                Ok(()) => info!(
                                    partitions = tpl.count(),
                                    "periodic broker offset commit (advisory)"
                                ),
                                Err(e) => warn!(
                                    error = %e,
                                    "periodic broker offset commit failed (advisory)"
                                ),
                            }
                        }
                        continue;
                    },
                    msg = consumer.recv() => msg,
                };
                match msg_result {
                    Ok(msg) => {
                        if let Some(payload) = msg.payload() {
                            let topic = msg.topic();
                            if &*cached_topic != topic {
                                cached_topic = Arc::from(topic);
                            }
                            let timestamp_ms = match msg.timestamp() {
                                rdkafka::Timestamp::CreateTime(ts)
                                | rdkafka::Timestamp::LogAppendTime(ts) => Some(ts),
                                rdkafka::Timestamp::NotAvailable => None,
                            };
                            let headers_json = if capture_headers {
                                use rdkafka::message::Headers;
                                msg.headers().map(|hdrs| {
                                    let mut json = String::from('{');
                                    for i in 0..hdrs.count() {
                                        let header = hdrs.get(i);
                                        if i > 0 {
                                            json.push(',');
                                        }
                                        json.push('"');
                                        json.push_str(header.key);
                                        json.push_str("\":\"");
                                        if let Some(val) = header.value {
                                            json.push_str(&String::from_utf8_lossy(val));
                                        }
                                        json.push('"');
                                    }
                                    json.push('}');
                                    json
                                })
                            } else {
                                None
                            };
                            let kp = KafkaPayload {
                                data: payload.to_vec(),
                                topic: Arc::clone(&cached_topic),
                                partition: msg.partition(),
                                offset: msg.offset(),
                                timestamp_ms,
                                headers_json,
                            };
                            if msg_tx.send(kp).await.is_err() {
                                break;
                            }
                            channel_len.fetch_add(1, Ordering::Relaxed);
                            data_ready.notify_one();
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "Kafka consumer error");
                    }
                }
            }

            let tpl = offset_rx.borrow().clone();
            if tpl.count() > 0 {
                match consumer.commit(&tpl, CommitMode::Sync) {
                    Ok(()) => info!(
                        partitions = tpl.count(),
                        "committed final offsets on shutdown"
                    ),
                    Err(e) => warn!(error = %e, "failed to commit final offsets on shutdown"),
                }
            }

            consumer.unsubscribe();
        });

        self.msg_rx = Some(msg_rx);
        self.reader_handle = Some(reader_handle);
        self.reader_shutdown = Some(shutdown_tx);
        self.offset_commit_tx = Some(offset_tx);
    }
}

#[async_trait]
#[allow(clippy::too_many_lines)]
impl SourceConnector for KafkaSource {
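    /// Builds the rdkafka consumer, applies the configured subscription and
    /// startup mode, and moves the connector into the `Running` state.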
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        let kafka_config = if config.properties().is_empty() {
            self.config.clone()
        } else {
            let parsed = KafkaSourceConfig::from_config(config)?;
            self.config = parsed.clone();
            parsed
        };

        if let Some(ref sr_url) = kafka_config.schema_registry_url {
            let sr_client = if let Some(ref ca) = kafka_config.schema_registry_ssl_ca_location {
                SchemaRegistryClient::with_tls_mtls(
                    sr_url.clone(),
                    kafka_config.schema_registry_auth.clone(),
                    ca,
                    kafka_config
                        .schema_registry_ssl_certificate_location
                        .as_deref(),
                    kafka_config.schema_registry_ssl_key_location.as_deref(),
                )?
            } else {
                SchemaRegistryClient::new(sr_url.clone(), kafka_config.schema_registry_auth.clone())
            };
            let sr = Arc::new(sr_client);
            self.schema_registry = Some(Arc::clone(&sr));
            self.deserializer = if kafka_config.format == Format::Avro {
                Box::new(AvroDeserializer::with_schema_registry(sr))
            } else {
                select_deserializer(kafka_config.format)
            };
        } else if let Some(ref sr) = self.schema_registry {
            self.deserializer = if kafka_config.format == Format::Avro {
                Box::new(AvroDeserializer::with_schema_registry(Arc::clone(sr)))
            } else {
                select_deserializer(kafka_config.format)
            };
        } else {
            self.deserializer = select_deserializer(kafka_config.format);
        }

        if let Some(schema) = config.arrow_schema() {
            info!(
                fields = schema.fields().len(),
                "using SQL-defined schema for deserialization"
            );
            self.schema = schema;
        }

        info!(
            brokers = %kafka_config.bootstrap_servers,
            subscription = ?kafka_config.subscription,
            group_id = %kafka_config.group_id,
            format = %kafka_config.format,
            schema_fields = self.schema.fields().len(),
            "opening Kafka source connector"
        );

        let rdkafka_config: ClientConfig = kafka_config.to_rdkafka_config();
        let context = LaminarConsumerContext::new(
            Arc::clone(&self.checkpoint_request),
            Arc::clone(&self.rebalance_state),
            Arc::clone(&self.rebalance_counter),
            Arc::clone(&self.revoke_generation),
        );
        let consumer: StreamConsumer<LaminarConsumerContext> =
            rdkafka_config.create_with_context(context).map_err(|e| {
                ConnectorError::ConnectionFailed(format!("failed to create consumer: {e}"))
            })?;

        match &kafka_config.subscription {
            TopicSubscription::Topics(topics) => {
                let topic_refs: Vec<&str> = topics.iter().map(String::as_str).collect();
                consumer.subscribe(&topic_refs).map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to subscribe: {e}"))
                })?;
            }
            TopicSubscription::Pattern(pattern) => {
                let regex_pattern = if pattern.starts_with('^') {
                    pattern.clone()
                } else {
                    format!("^{pattern}")
                };
                consumer.subscribe(&[&regex_pattern]).map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to subscribe to pattern: {e}"))
                })?;
            }
        }

        match &kafka_config.startup_mode {
            StartupMode::GroupOffsets | StartupMode::Earliest | StartupMode::Latest => {}
            StartupMode::SpecificOffsets(offsets) => {
                let mut tpl = rdkafka::TopicPartitionList::new();
                let topics = match &kafka_config.subscription {
                    TopicSubscription::Topics(t) => t.clone(),
                    TopicSubscription::Pattern(_) => Vec::new(),
                };
                for topic in &topics {
                    for (&partition, &offset) in offsets {
                        tpl.add_partition_offset(topic, partition, rdkafka::Offset::Offset(offset))
                            .ok();
                    }
                }
                if tpl.count() > 0 {
                    consumer.assign(&tpl).map_err(|e| {
                        ConnectorError::ConnectionFailed(format!(
                            "failed to assign specific offsets: {e}"
                        ))
                    })?;
                    info!(
                        partitions = tpl.count(),
                        "assigned consumer to specific offsets"
                    );
                }
            }
            StartupMode::Timestamp(ts_ms) => {
                let mut tpl = rdkafka::TopicPartitionList::new();
                let topics = match &kafka_config.subscription {
                    TopicSubscription::Topics(t) => t.clone(),
                    TopicSubscription::Pattern(_) => Vec::new(),
                };
                if let Ok(metadata) = consumer.fetch_metadata(
                    topics.first().map(String::as_str),
                    std::time::Duration::from_secs(10),
                ) {
                    for topic_meta in metadata.topics() {
                        for partition_meta in topic_meta.partitions() {
                            tpl.add_partition_offset(
                                topic_meta.name(),
                                partition_meta.id(),
                                rdkafka::Offset::Offset(*ts_ms),
                            )
                            .ok();
                        }
                    }
                }
                if tpl.count() > 0 {
                    match consumer.offsets_for_times(tpl, std::time::Duration::from_secs(10)) {
                        Ok(resolved) => {
                            consumer.assign(&resolved).map_err(|e| {
                                ConnectorError::ConnectionFailed(format!(
                                    "failed to assign timestamp offsets: {e}"
                                ))
                            })?;
                            info!(
                                timestamp_ms = ts_ms,
                                partitions = resolved.count(),
                                "assigned consumer to timestamp offsets"
                            );
                        }
                        Err(e) => {
                            warn!(
                                error = %e,
                                timestamp_ms = ts_ms,
                                "failed to resolve timestamp offsets, falling back to group offsets"
                            );
                        }
                    }
                }
            }
        }

        self.consumer = Some(consumer);
        self.state = ConnectorState::Running;

        info!("Kafka source connector opened successfully");
        Ok(())
    }

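    /// Drains up to `max_records` messages from the reader channel,
    /// deserializes them into a single Arrow batch (isolating poison-pill
    /// records on failure), and optionally appends `_partition`, `_offset`,
    /// `_timestamp`, and `_headers` metadata columns.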
    #[allow(clippy::cast_possible_truncation)]
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        if self.backpressure.should_pause() {
            self.backpressure.set_paused(true);
            debug!("backpressure: pausing consumption");
            return Ok(None);
        }
        if self.backpressure.should_resume() {
            self.backpressure.set_paused(false);
            debug!("backpressure: resuming consumption");
        }
        if self.backpressure.is_paused() {
            return Ok(None);
        }

        self.ensure_reader_started();

        let rx = self
            .msg_rx
            .as_mut()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "reader initialized".into(),
                actual: "reader is None".into(),
            })?;

        let limit = max_records.min(self.config.max_poll_records);

        let mut payload_buf: Vec<u8> = Vec::with_capacity(limit * 256);
        let mut payload_offsets: Vec<(usize, usize)> = Vec::with_capacity(limit);
        let mut total_bytes: u64 = 0;
        let mut last_topic = String::new();
        let mut last_partition_id: i32 = 0;
        let mut last_offset: i64 = -1;
        let include_metadata = self.config.include_metadata;
        let include_headers = self.config.include_headers;
        let mut meta_partitions: Vec<i32> = if include_metadata {
            Vec::with_capacity(limit)
        } else {
            Vec::new()
        };
        let mut meta_offsets: Vec<i64> = if include_metadata {
            Vec::with_capacity(limit)
        } else {
            Vec::new()
        };
        let mut meta_timestamps: Vec<Option<i64>> = if include_metadata {
            Vec::with_capacity(limit)
        } else {
            Vec::new()
        };
        let mut meta_headers: Vec<Option<String>> = if include_headers {
            Vec::with_capacity(limit)
        } else {
            Vec::new()
        };

        while payload_offsets.len() < limit {
            match rx.try_recv() {
                Ok(kp) => {
                    self.channel_len.fetch_sub(1, Ordering::Relaxed);
                    total_bytes += kp.data.len() as u64;
                    let start = payload_buf.len();
                    payload_buf.extend_from_slice(&kp.data);
                    payload_offsets.push((start, kp.data.len()));

                    self.offsets.update_arc(&kp.topic, kp.partition, kp.offset);

                    if include_metadata {
                        meta_partitions.push(kp.partition);
                        meta_offsets.push(kp.offset);
                        meta_timestamps.push(kp.timestamp_ms);
                    }
                    if include_headers {
                        meta_headers.push(kp.headers_json);
                    }

                    if let Some(ref mut tracker) = self.watermark_tracker {
                        if let Some(ts) = kp.timestamp_ms {
                            tracker.update_partition(kp.partition, ts);
                        }
                    }

                    if last_topic.as_str() != &*kp.topic || last_partition_id != kp.partition {
                        last_topic = kp.topic.to_string();
                        last_partition_id = kp.partition;
                    }
                    last_offset = kp.offset;
                }
                Err(_) => break,
            }
        }

        if let Some(ref mut tracker) = self.watermark_tracker {
            tracker.check_idle_partitions();
        }

        let rebalance_events = self.rebalance_counter.swap(0, Ordering::Relaxed);
        for _ in 0..rebalance_events {
            self.metrics.record_rebalance();
        }

        let current_revoke_gen = self.revoke_generation.load(Ordering::Relaxed);
        let had_revoke = current_revoke_gen != self.last_seen_revoke_gen;
        if had_revoke {
            self.last_seen_revoke_gen = current_revoke_gen;
            let assigned = match self.rebalance_state.lock() {
                Ok(state) => state.assigned_partitions().clone(),
                Err(poisoned) => poisoned.into_inner().assigned_partitions().clone(),
            };
            let before = self.offsets.partition_count();
            self.offsets.retain_assigned(&assigned);
            let after = self.offsets.partition_count();
            if before != after {
                debug!(
                    before,
                    after, "purged revoked partition offsets after rebalance"
                );
            }
        }

        if had_revoke || !payload_offsets.is_empty() {
            if let Some(ref tx) = self.offset_commit_tx {
                let tpl = self.offsets.to_topic_partition_list();
                if tx.send(tpl).is_err() {
                    debug!("offset_commit_tx closed, reader task shutting down");
                }
            }
        }

        if payload_offsets.is_empty() {
            return Ok(None);
        }

        let last_partition = if last_offset >= 0 {
            Some(PartitionInfo::new(
                format!("{last_topic}-{last_partition_id}"),
                last_offset.to_string(),
            ))
        } else {
            None
        };

        if let Some(avro_deser) = self
            .deserializer
            .as_any_mut()
            .and_then(|any| any.downcast_mut::<AvroDeserializer>())
        {
            for &(start, len) in &payload_offsets {
                if let Some(schema_id) =
                    AvroDeserializer::extract_confluent_id(&payload_buf[start..start + len])
                {
                    avro_deser
                        .ensure_schema_registered(schema_id)
                        .await
                        .map_err(ConnectorError::Serde)?;
                }
            }
        }

        let refs: Vec<&[u8]> = payload_offsets
            .iter()
            .map(|&(start, len)| &payload_buf[start..start + len])
            .collect();

        let batch = match self.deserializer.deserialize_batch(&refs, &self.schema) {
            Ok(batch) => batch,
            Err(batch_err) => {
                let mut good_refs = Vec::with_capacity(refs.len());
                let mut error_count = 0u64;
                for r in &refs {
                    match self
                        .deserializer
                        .deserialize_batch(std::slice::from_ref(r), &self.schema)
                    {
                        Ok(_) => good_refs.push(*r),
                        Err(e) => {
                            error_count += 1;
                            self.metrics.record_error();
                            warn!(error = %e, "skipping poison pill record");
                        }
                    }
                }
                if good_refs.is_empty() {
                    return Err(ConnectorError::Serde(batch_err));
                }
                if error_count > 0 {
                    warn!(
                        skipped = error_count,
                        total = refs.len(),
                        "deserialized batch with poison pill isolation"
                    );
                }
                self.deserializer
                    .deserialize_batch(&good_refs, &self.schema)
                    .map_err(ConnectorError::Serde)?
            }
        };

        let batch = if include_metadata && !meta_partitions.is_empty() {
            use arrow_array::{Int32Array, Int64Array};
            use arrow_schema::{DataType, Field};

            let mut fields = batch.schema().fields().to_vec();
            let mut columns: Vec<Arc<dyn arrow_array::Array>> = batch.columns().to_vec();

            fields.push(Arc::new(Field::new("_partition", DataType::Int32, false)));
            columns.push(Arc::new(Int32Array::from(meta_partitions)));

            fields.push(Arc::new(Field::new("_offset", DataType::Int64, false)));
            columns.push(Arc::new(Int64Array::from(meta_offsets)));

            fields.push(Arc::new(Field::new("_timestamp", DataType::Int64, true)));
            columns.push(Arc::new(Int64Array::from(meta_timestamps)));

            if include_headers && !meta_headers.is_empty() {
                fields.push(Arc::new(Field::new("_headers", DataType::Utf8, true)));
                columns.push(Arc::new(arrow_array::StringArray::from(meta_headers)));
            }

            let meta_schema = Arc::new(arrow_schema::Schema::new(fields));
            arrow_array::RecordBatch::try_new(meta_schema, columns).map_err(|e| {
                ConnectorError::Internal(format!("failed to append metadata columns: {e}"))
            })?
        } else {
            batch
        };

        let num_rows = batch.num_rows();
        self.metrics.record_poll(num_rows as u64, total_bytes);

        let source_batch = if let Some(partition) = last_partition {
            SourceBatch::with_partition(batch, partition)
        } else {
            SourceBatch::new(batch)
        };

        debug!(
            records = num_rows,
            bytes = total_bytes,
            "polled batch from Kafka"
        );

        Ok(Some(source_batch))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

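    /// Builds a checkpoint from the offsets of currently assigned partitions
    /// and forwards them to the reader task for an advisory broker commit.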
    fn checkpoint(&self) -> SourceCheckpoint {
        let assigned = match self.rebalance_state.lock() {
            Ok(state) => state.assigned_partitions().clone(),
            Err(poisoned) => poisoned.into_inner().assigned_partitions().clone(),
        };

        if let Some(ref tx) = self.offset_commit_tx {
            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
            let _ = tx.send(tpl);
        }

        self.offsets.to_checkpoint_filtered(&assigned)
    }

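    /// Restores offsets from a checkpoint and, if a consumer already exists,
    /// re-assigns it to the checkpointed positions.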
    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        info!(
            epoch = checkpoint.epoch(),
            "restoring Kafka source from checkpoint"
        );

        self.offsets = OffsetTracker::from_checkpoint(checkpoint);

        if let Some(ref consumer) = self.consumer {
            let tpl = self.offsets.to_topic_partition_list();
            consumer.assign(&tpl).map_err(|e| {
                ConnectorError::CheckpointError(format!("failed to seek to offsets: {e}"))
            })?;
            info!(
                partitions = self.offsets.partition_count(),
                "restored consumer to checkpointed offsets"
            );
        }

        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => {
                if self.backpressure.is_paused() {
                    HealthStatus::Degraded("backpressure: consumption paused".into())
                } else {
                    HealthStatus::Healthy
                }
            }
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        Some(Arc::clone(&self.data_ready))
    }

    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
        Some(Arc::clone(&self.checkpoint_request))
    }

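    /// Stops the reader task, commits final offsets for assigned partitions,
    /// and releases the consumer.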
    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Kafka source connector");

        let assigned = match self.rebalance_state.lock() {
            Ok(state) => state.assigned_partitions().clone(),
            Err(poisoned) => poisoned.into_inner().assigned_partitions().clone(),
        };

        if let Some(ref tx) = self.offset_commit_tx {
            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
            if tpl.count() > 0 {
                let _ = tx.send(tpl);
            }
        }

        if let Some(tx) = self.reader_shutdown.take() {
            let _ = tx.send(true);
        }
        if let Some(handle) = self.reader_handle.take() {
            let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
        }
        self.msg_rx = None;
        self.offset_commit_tx = None;

        if let Some(ref consumer) = self.consumer {
            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
            if tpl.count() > 0 {
                if let Err(e) = consumer.commit(&tpl, CommitMode::Sync) {
                    warn!(error = %e, "failed to commit final offsets");
                }
            }
            consumer.unsubscribe();
        }

        self.consumer = None;
        self.state = ConnectorState::Closed;
        info!("Kafka source connector closed");
        Ok(())
    }
}

impl std::fmt::Debug for KafkaSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KafkaSource")
            .field("state", &self.state)
            .field("subscription", &self.config.subscription)
            .field("group_id", &self.config.group_id)
            .field("format", &self.config.format)
            .field("partitions", &self.offsets.partition_count())
            .finish_non_exhaustive()
    }
}

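/// Selects a deserializer for the configured wire format, falling back to
/// JSON when the format is not supported by the serde registry.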
fn select_deserializer(format: Format) -> Box<dyn RecordDeserializer> {
    match format {
        Format::Avro => Box::new(AvroDeserializer::new()),
        other => serde::create_deserializer(other).unwrap_or_else(|_| {
            warn!(format = %other, "unsupported format, falling back to JSON");
            Box::new(serde::json::JsonDeserializer::new())
        }),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ]))
    }

    fn test_config() -> KafkaSourceConfig {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "test-group".into();
        cfg.subscription = TopicSubscription::Topics(vec!["events".into()]);
        cfg
    }

    #[test]
    fn test_new_defaults() {
        let source = KafkaSource::new(test_schema(), test_config());
        assert_eq!(source.state(), ConnectorState::Created);
        assert!(source.consumer.is_none());
        assert_eq!(source.offsets().partition_count(), 0);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let source = KafkaSource::new(schema.clone(), test_config());
        assert_eq!(source.schema(), schema);
    }

    #[test]
    fn test_checkpoint_empty() {
        let source = KafkaSource::new(test_schema(), test_config());
        let cp = source.checkpoint();
        assert!(cp.is_empty());
    }

    #[test]
    fn test_checkpoint_with_offsets() {
        let mut source = KafkaSource::new(test_schema(), test_config());
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);

        {
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0), ("events".into(), 1)]);
        }

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), Some("200"));
    }

    #[test]
    fn test_health_check_created() {
        let source = KafkaSource::new(test_schema(), test_config());
        assert_eq!(source.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut source = KafkaSource::new(test_schema(), test_config());
        source.state = ConnectorState::Running;
        assert_eq!(source.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut source = KafkaSource::new(test_schema(), test_config());
        source.state = ConnectorState::Closed;
        assert!(matches!(source.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_metrics_initial() {
        let source = KafkaSource::new(test_schema(), test_config());
        let m = source.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    #[test]
    fn test_deserializer_selection_json() {
        let source = KafkaSource::new(test_schema(), test_config());
        assert_eq!(source.deserializer.format(), Format::Json);
    }

    #[test]
    fn test_deserializer_selection_csv() {
        let mut cfg = test_config();
        cfg.format = Format::Csv;
        let source = KafkaSource::new(test_schema(), cfg);
        assert_eq!(source.deserializer.format(), Format::Csv);
    }

    #[test]
    fn test_with_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());

        let source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);
        assert!(source.schema_registry.is_some());
        assert_eq!(source.deserializer.format(), Format::Avro);
    }

    #[tokio::test]
    async fn test_open_preserves_injected_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());
        let mut source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);

        let empty_config = crate::config::ConnectorConfig::new("kafka");
        let _ = source.open(&empty_config).await;
        assert!(source.schema_registry.is_some());
        assert_eq!(source.deserializer.format(), Format::Avro);
    }

    #[test]
    fn test_debug_output() {
        let source = KafkaSource::new(test_schema(), test_config());
        let debug = format!("{source:?}");
        assert!(debug.contains("KafkaSource"));
        assert!(debug.contains("events"));
    }

    #[test]
    fn test_checkpoint_filters_revoked_partitions() {
        let mut source = KafkaSource::new(test_schema(), test_config());
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);
        source.offsets.update("events", 2, 300);

        {
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0), ("events".into(), 2)]);
        }

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), None);
        assert_eq!(cp.get_offset("events-2"), Some("300"));
    }

    #[test]
    fn test_checkpoint_empty_before_first_rebalance() {
        let mut source = KafkaSource::new(test_schema(), test_config());
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);

        let cp = source.checkpoint();
        assert!(cp.is_empty());
    }
}