use arrow_schema::SchemaRef;
use async_trait::async_trait;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Message;
use rdkafka::ClientConfig;
use rdkafka::TopicPartitionList;
use std::collections::BTreeSet;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tracing::{debug, info, warn};

use super::rebalance::LaminarConsumerContext;

fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    // A poisoned mutex only means another thread panicked while holding the
    // lock; the guarded data is still usable, so log and keep going.
    mutex.lock().unwrap_or_else(|poisoned| {
        tracing::warn!("mutex poisoned, recovering");
        poisoned.into_inner()
    })
}

use crate::checkpoint::SourceCheckpoint;
use crate::config::{ConnectorConfig, ConnectorState};
use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
use crate::error::ConnectorError;
use crate::health::HealthStatus;
use crate::metrics::ConnectorMetrics;
use crate::serde::{self, Format, RecordDeserializer};

use super::avro::AvroDeserializer;
use super::config::{
    resolve_value_subject, KafkaSourceConfig, SchemaEvolutionStrategy, StartupMode,
    TopicSubscription,
};
use super::metrics::KafkaSourceMetrics;
use super::offsets::OffsetTracker;
use super::rebalance::RebalanceState;
use super::schema_registry::SchemaRegistryClient;
use super::watermarks::KafkaWatermarkTracker;

use crate::schema::evolution::SchemaEvolution;
use crate::schema::traits::{CompatibilityMode, EvolutionVerdict};

struct KafkaPayload {
    data: Vec<u8>,
    topic: Arc<str>,
    partition: i32,
    offset: i64,
    timestamp_ms: Option<i64>,
    headers_json: Option<String>,
}

type KafkaPayloadRx = crossfire::AsyncRx<crossfire::mpsc::Array<KafkaPayload>>;

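/// Kafka source connector.
///
/// A background reader task drains the consumer into a bounded channel;
/// `poll_batch` deserializes the buffered records into Arrow batches while
/// tracking offsets, rebalances, watermarks, and consumer lag.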
pub struct KafkaSource {
    consumer: Option<StreamConsumer<LaminarConsumerContext>>,
    config: KafkaSourceConfig,
    deserializer: Box<dyn RecordDeserializer>,
    offsets: OffsetTracker,
    state: ConnectorState,
    metrics: KafkaSourceMetrics,
    schema: SchemaRef,
    channel_len: Arc<AtomicUsize>,
    rebalance_state: Arc<Mutex<RebalanceState>>,
    rebalance_counter: Arc<AtomicU64>,
    revoke_generation: Arc<AtomicU64>,
    last_seen_revoke_gen: u64,
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    data_ready: Arc<Notify>,
    checkpoint_request: Arc<AtomicBool>,
    msg_rx: Option<KafkaPayloadRx>,
    reader_handle: Option<tokio::task::JoinHandle<()>>,
    commit_handle: Option<tokio::task::JoinHandle<()>>,
    hwm_handle: Option<tokio::task::JoinHandle<()>>,
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    offset_commit_tx: Option<tokio::sync::watch::Sender<TopicPartitionList>>,
    watermark_tracker: Option<KafkaWatermarkTracker>,
    #[allow(clippy::type_complexity)]
    high_watermarks_rx: Option<tokio::sync::watch::Receiver<Vec<(Arc<str>, i32, i64)>>>,
    reader_paused: Arc<AtomicBool>,
    commit_retry_needed: Arc<AtomicBool>,
    offset_snapshot: Arc<Mutex<OffsetTracker>>,

    last_avro_schema: Option<SchemaRef>,

    poll_payload_buf: Vec<u8>,
    poll_payload_offsets: Vec<(usize, usize)>,
    poll_meta_partitions: Vec<i32>,
    poll_meta_offsets: Vec<i64>,
    poll_meta_timestamps: Vec<Option<i64>>,
    poll_meta_headers: Vec<Option<String>>,
}

impl KafkaSource {
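    /// Creates a Kafka source from a declared Arrow schema and parsed config.
    ///
    /// A minimal sketch, assuming a populated `KafkaSourceConfig` (the tests
    /// below set `bootstrap_servers`, `group_id`, and `subscription`):
    ///
    /// ```ignore
    /// let source = KafkaSource::new(schema, config, None);
    /// assert_eq!(source.state(), ConnectorState::Created);
    /// ```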
    #[must_use]
    pub fn new(
        schema: SchemaRef,
        config: KafkaSourceConfig,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        Self::build_base(schema, config, select_deserializer, None, registry)
    }

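    /// Creates a Kafka source with an injected Schema Registry client. For
    /// `Format::Avro` the deserializer resolves writer schemas through the
    /// registry; other formats fall back to `select_deserializer`.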
    #[must_use]
    pub fn with_schema_registry(
        schema: SchemaRef,
        config: KafkaSourceConfig,
        sr_client: SchemaRegistryClient,
    ) -> Self {
        let sr = Arc::new(sr_client);
        let sr_clone = Arc::clone(&sr);
        let deser_factory = move |format: Format| -> Box<dyn RecordDeserializer> {
            if format == Format::Avro {
                Box::new(AvroDeserializer::with_schema_registry(sr_clone))
            } else {
                select_deserializer(format)
            }
        };
        Self::build_base(schema, config, deser_factory, Some(sr), None)
    }

    /// Builds a Schema Registry client from the config's URL and optional
    /// TLS/mTLS settings; returns `Ok(None)` when no URL is configured.
    fn build_sr_client(
        config: &KafkaSourceConfig,
    ) -> Result<Option<SchemaRegistryClient>, ConnectorError> {
        let Some(sr_url) = config.schema_registry_url.as_ref() else {
            return Ok(None);
        };
        let client = if let Some(ca) = config.schema_registry_ssl_ca_location.as_deref() {
            SchemaRegistryClient::with_tls_mtls(
                sr_url.clone(),
                config.schema_registry_auth.clone(),
                ca,
                config.schema_registry_ssl_certificate_location.as_deref(),
                config.schema_registry_ssl_key_location.as_deref(),
            )?
        } else {
            SchemaRegistryClient::new(sr_url.clone(), config.schema_registry_auth.clone())
        };
        Ok(Some(client))
    }

    fn build_base(
        schema: SchemaRef,
        config: KafkaSourceConfig,
        deser_factory: impl FnOnce(Format) -> Box<dyn RecordDeserializer>,
        schema_registry: Option<Arc<SchemaRegistryClient>>,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        let deserializer = deser_factory(config.format);
        let channel_len = Arc::new(AtomicUsize::new(0));

        let watermark_tracker = if config.enable_watermark_tracking {
            Some(
                KafkaWatermarkTracker::new(0, config.idle_timeout)
                    .with_max_out_of_orderness(config.max_out_of_orderness),
            )
        } else {
            None
        };

        Self {
            consumer: None,
            config,
            deserializer,
            offsets: OffsetTracker::new(),
            state: ConnectorState::Created,
            metrics: KafkaSourceMetrics::new(registry),
            schema,
            channel_len,
            rebalance_state: Arc::new(Mutex::new(RebalanceState::new())),
            rebalance_counter: Arc::new(AtomicU64::new(0)),
            revoke_generation: Arc::new(AtomicU64::new(0)),
            last_seen_revoke_gen: 0,
            schema_registry,
            data_ready: Arc::new(Notify::new()),
            checkpoint_request: Arc::new(AtomicBool::new(false)),
            msg_rx: None,
            reader_handle: None,
            commit_handle: None,
            hwm_handle: None,
            reader_shutdown: None,
            offset_commit_tx: None,
            watermark_tracker,
            high_watermarks_rx: None,
            reader_paused: Arc::new(AtomicBool::new(false)),
            commit_retry_needed: Arc::new(AtomicBool::new(false)),
            offset_snapshot: Arc::new(Mutex::new(OffsetTracker::new())),
            last_avro_schema: None,
            poll_payload_buf: Vec::new(),
            poll_payload_offsets: Vec::new(),
            poll_meta_partitions: Vec::new(),
            poll_meta_offsets: Vec::new(),
            poll_meta_timestamps: Vec::new(),
            poll_meta_headers: Vec::new(),
        }
    }

    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    #[must_use]
    pub fn offsets(&self) -> &OffsetTracker {
        &self.offsets
    }

    #[must_use]
    pub fn channel_len(&self) -> Arc<AtomicUsize> {
        Arc::clone(&self.channel_len)
    }

    #[must_use]
    pub fn rebalance_state(&self) -> Arc<Mutex<RebalanceState>> {
        Arc::clone(&self.rebalance_state)
    }

    #[must_use]
    pub fn has_schema_registry(&self) -> bool {
        self.schema_registry.is_some()
    }

    #[must_use]
    pub fn current_watermark(&self) -> Option<i64> {
        self.watermark_tracker
            .as_ref()
            .and_then(KafkaWatermarkTracker::current_watermark)
    }

    #[must_use]
    pub fn event_time_column(&self) -> Option<&str> {
        self.config.event_time_column.as_deref()
    }

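    /// Lazily spawns the background tasks on first poll: a reader task that
    /// feeds records into the bounded channel (pausing partitions under
    /// backpressure), a periodic broker-offset commit task, and a
    /// high-watermark polling task used for lag metrics.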
    #[allow(clippy::too_many_lines)]
    fn ensure_reader_started(&mut self) {
        if self.reader_handle.is_some() || self.consumer.is_none() {
            return;
        }

        let consumer = Arc::new(self.consumer.take().unwrap());
        let (msg_tx, msg_rx) =
            crossfire::mpsc::bounded_async::<KafkaPayload>(self.config.reader_channel_capacity);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
        let (offset_tx, offset_rx) = tokio::sync::watch::channel(TopicPartitionList::new());
        let reader_offset_rx = offset_rx.clone();
        let (hwm_tx, hwm_rx) = tokio::sync::watch::channel(Vec::new());
        let (seen_tx, seen_rx) = tokio::sync::watch::channel(Vec::<(Arc<str>, i32)>::new());
        let data_ready = Arc::clone(&self.data_ready);
        let channel_len = Arc::clone(&self.channel_len);
        let capture_headers = self.config.include_headers;
        let broker_commit_interval = self.config.broker_commit_interval;
        let reader_channel_capacity = self.config.reader_channel_capacity;
        let reader_paused = Arc::clone(&self.reader_paused);
        let revoke_generation = Arc::clone(&self.revoke_generation);
        let rebalance_state = Arc::clone(&self.rebalance_state);
        let commit_retry_needed = Arc::clone(&self.commit_retry_needed);
        let pause_threshold = self.config.backpressure_high_watermark;
        let resume_threshold = self.config.backpressure_low_watermark;

        // Periodic broker-offset commit task.
        let commit_consumer = Arc::clone(&consumer);
        let commit_retry = Arc::clone(&commit_retry_needed);
        let commit_metrics = self.metrics.clone();
        let mut commit_shutdown = shutdown_rx.clone();
        let commit_handle = tokio::spawn(async move {
            if broker_commit_interval.is_zero() {
                // Periodic commits disabled; park until shutdown.
                let _ = commit_shutdown.changed().await;
                return;
            }
            let mut timer = tokio::time::interval(broker_commit_interval);
            timer.tick().await; // first tick completes immediately
            loop {
                tokio::select! {
                    biased;
                    _ = commit_shutdown.changed() => break,
                    _ = timer.tick() => {
                        let tpl = offset_rx.borrow().clone();
                        if tpl.count() == 0 { continue; }
                        if commit_retry
                            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
                            .is_ok()
                        {
                            // A previous commit failed: retry synchronously on a
                            // blocking thread, bounded by a timeout.
                            let c = Arc::clone(&commit_consumer);
                            let flag = Arc::clone(&commit_retry);
                            let n = tpl.count();
                            match tokio::time::timeout(
                                std::time::Duration::from_secs(10),
                                tokio::task::spawn_blocking(move || {
                                    c.commit(&tpl, CommitMode::Sync)
                                }),
                            )
                            .await
                            {
                                Ok(Ok(Ok(()))) => {
                                    info!(partition_count = n, "sync offset commit retry succeeded");
                                }
                                Ok(Ok(Err(e))) => {
                                    flag.store(true, Ordering::Release);
                                    warn!(error = %e, "sync offset commit retry failed");
                                }
                                Ok(Err(e)) => {
                                    commit_metrics.record_commit_failure();
                                    warn!(error = %e, "sync commit blocking task panicked");
                                }
                                Err(_) => {
                                    commit_metrics.record_commit_failure();
                                    flag.store(true, Ordering::Release);
                                    warn!("sync offset commit retry timed out");
                                }
                            }
                        } else {
                            match commit_consumer.commit(&tpl, CommitMode::Async) {
                                Ok(()) => {
                                    info!(partition_count = tpl.count(), "periodic broker offset commit (advisory)");
                                }
                                Err(e) => {
                                    commit_metrics.record_commit_failure();
                                    warn!(error = %e, "periodic broker offset commit failed");
                                }
                            }
                        }
                    }
                }
            }
        });

        // High-watermark polling task, feeding the lag metric in poll_batch.
        let hwm_consumer = Arc::clone(&consumer);
        let mut hwm_shutdown = shutdown_rx.clone();
        let hwm_handle = tokio::spawn(async move {
            let mut timer = tokio::time::interval(std::time::Duration::from_secs(30));
            timer.tick().await; // first tick completes immediately
            loop {
                tokio::select! {
                    biased;
                    _ = hwm_shutdown.changed() => break,
                    _ = timer.tick() => {
                        let partitions: Vec<_> = seen_rx.borrow().clone();
                        if partitions.is_empty() { continue; }
                        let c = Arc::clone(&hwm_consumer);
                        let watermarks = tokio::time::timeout(
                            std::time::Duration::from_secs(10),
                            tokio::task::spawn_blocking(move || {
                                let mut results = Vec::with_capacity(partitions.len());
                                for (topic, partition) in &partitions {
                                    match c.fetch_watermarks(topic, *partition, std::time::Duration::from_secs(1)) {
                                        Ok((_low, high)) => results.push((Arc::clone(topic), *partition, high)),
                                        Err(e) => debug!(%topic, partition, error = %e, "HWM fetch failed"),
                                    }
                                }
                                results
                            }),
                        )
                        .await
                        .unwrap_or(Ok(Vec::new()))
                        .unwrap_or_default();
                        if !watermarks.is_empty() {
                            let _ = hwm_tx.send(watermarks);
                        }
                    }
                }
            }
        });

        // Reader task: pulls messages into the bounded channel, pausing and
        // resuming partitions based on channel fill.
        let mut reader_shutdown = shutdown_rx;
        let reader_handle = tokio::spawn(async move {
            let mut cached_topic: Arc<str> = Arc::from("");
            let mut seen_partitions: std::collections::HashSet<(Arc<str>, i32)> =
                std::collections::HashSet::new();
            let mut is_paused = false;
            let mut last_revoke_gen: u64 = 0;

            loop {
                // Drop partitions we no longer own after a rebalance.
                let current_gen = revoke_generation.load(Ordering::Acquire);
                if current_gen != last_revoke_gen {
                    last_revoke_gen = current_gen;
                    let assigned = lock_or_recover(&rebalance_state)
                        .assigned_partitions()
                        .clone();
                    seen_partitions.retain(|(t, p)| assigned.contains(&(t.to_string(), *p)));
                    let _ = seen_tx.send(seen_partitions.iter().cloned().collect());
                }

                #[allow(clippy::cast_precision_loss)]
                let fill = if reader_channel_capacity > 0 {
                    channel_len.load(Ordering::Acquire) as f64 / reader_channel_capacity as f64
                } else {
                    0.0
                };
                if fill >= pause_threshold && !is_paused {
                    if let Ok(assignment) = consumer.assignment() {
                        if consumer.pause(&assignment).is_ok() {
                            is_paused = true;
                            reader_paused.store(true, Ordering::Release);
                            debug!("reader: paused Kafka partitions (fill={fill:.2})");
                        }
                    }
                } else if fill <= resume_threshold && is_paused {
                    if let Ok(assignment) = consumer.assignment() {
                        if consumer.resume(&assignment).is_ok() {
                            is_paused = false;
                            reader_paused.store(false, Ordering::Release);
                            debug!("reader: resumed Kafka partitions (fill={fill:.2})");
                        }
                    }
                }

                let msg_result = tokio::select! {
                    biased;
                    _ = reader_shutdown.changed() => break,
                    msg = tokio::time::timeout(
                        std::time::Duration::from_millis(200),
                        consumer.recv(),
                    ) => match msg {
                        Ok(result) => result,
                        Err(_timeout) => continue,
                    },
                };
                match msg_result {
                    Ok(msg) => {
                        if let Some(payload) = msg.payload() {
                            let topic = msg.topic();
                            if &*cached_topic != topic {
                                cached_topic = Arc::from(topic);
                            }
                            if seen_partitions.insert((Arc::clone(&cached_topic), msg.partition())) {
                                let _ = seen_tx.send(seen_partitions.iter().cloned().collect());
                            }
                            let timestamp_ms = match msg.timestamp() {
                                rdkafka::Timestamp::CreateTime(ts)
                                | rdkafka::Timestamp::LogAppendTime(ts) => Some(ts),
                                rdkafka::Timestamp::NotAvailable => None,
                            };
                            let headers_json = if capture_headers {
                                use rdkafka::message::Headers;
                                msg.headers().and_then(|hdrs| {
                                    let pairs: Vec<(String, serde_json::Value)> = (0..hdrs.count())
                                        .map(|i| {
                                            let h = hdrs.get(i);
                                            let val = match h.value {
                                                Some(v) => serde_json::Value::String(
                                                    String::from_utf8_lossy(v).into_owned(),
                                                ),
                                                None => serde_json::Value::Null,
                                            };
                                            (h.key.to_string(), val)
                                        })
                                        .collect();
                                    serde_json::to_string(&pairs).ok()
                                })
                            } else {
                                None
                            };
                            let kp = KafkaPayload {
                                data: payload.to_vec(),
                                topic: Arc::clone(&cached_topic),
                                partition: msg.partition(),
                                offset: msg.offset(),
                                timestamp_ms,
                                headers_json,
                            };
                            match msg_tx.try_send(kp) {
                                Ok(()) => {
                                    channel_len.fetch_add(1, Ordering::Relaxed);
                                }
                                Err(crossfire::TrySendError::Full(kp)) => {
                                    // Channel full: pause partitions, then fall back
                                    // to an awaited send so the record is not dropped.
                                    if !is_paused {
                                        if let Ok(assignment) = consumer.assignment() {
                                            if consumer.pause(&assignment).is_ok() {
                                                is_paused = true;
                                                reader_paused.store(true, Ordering::Release);
                                                debug!("reader: paused partitions (channel full)");
                                            }
                                        }
                                    }
                                    channel_len.fetch_add(1, Ordering::Relaxed);
                                    let send_ok = tokio::select! {
                                        biased;
                                        _ = reader_shutdown.changed() => false,
                                        result = msg_tx.send(kp) => result.is_ok(),
                                    };
                                    if !send_ok {
                                        channel_len.fetch_sub(1, Ordering::Relaxed);
                                        break;
                                    }
                                }
                                Err(crossfire::TrySendError::Disconnected(_)) => break,
                            }
                            data_ready.notify_one();
                        }
                    }
                    Err(e) => {
                        warn!(error = %e, "Kafka consumer error");
                    }
                }
            }

            // Best-effort final commit before the task exits.
            let tpl = reader_offset_rx.borrow().clone();
            if tpl.count() > 0 {
                match consumer.commit(&tpl, CommitMode::Sync) {
                    Ok(()) => info!(
                        partition_count = tpl.count(),
                        "committed final offsets on shutdown"
                    ),
                    Err(e) => warn!(error = %e, "failed to commit final offsets on shutdown"),
                }
            }
            consumer.unsubscribe();
        });

        self.msg_rx = Some(msg_rx);
        self.reader_handle = Some(reader_handle);
        self.commit_handle = Some(commit_handle);
        self.hwm_handle = Some(hwm_handle);
        self.reader_shutdown = Some(shutdown_tx);
        self.offset_commit_tx = Some(offset_tx);
        self.high_watermarks_rx = Some(hwm_rx);
    }
}

#[async_trait]
#[allow(clippy::too_many_lines)]
impl SourceConnector for KafkaSource {
    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
        self.state = ConnectorState::Initializing;

        // Prefer an explicit runtime config; otherwise keep the constructor's.
        let kafka_config = if config.properties().is_empty() {
            self.config.clone()
        } else {
            let parsed = KafkaSourceConfig::from_config(config)?;
            self.config = parsed.clone();
            parsed
        };

        if let Some(sr_client) = Self::build_sr_client(&kafka_config)? {
            let sr = Arc::new(sr_client);
            self.schema_registry = Some(Arc::clone(&sr));
            self.deserializer = if kafka_config.format == Format::Avro {
                Box::new(AvroDeserializer::with_schema_registry(sr))
            } else {
                select_deserializer(kafka_config.format)
            };
        } else if let Some(ref sr) = self.schema_registry {
            // Keep a client injected via `with_schema_registry`.
            self.deserializer = if kafka_config.format == Format::Avro {
                Box::new(AvroDeserializer::with_schema_registry(Arc::clone(sr)))
            } else {
                select_deserializer(kafka_config.format)
            };
        } else {
            self.deserializer = select_deserializer(kafka_config.format);
        }

        self.last_avro_schema = None;

        if let Some(schema) = config.arrow_schema() {
            info!(
                fields = schema.fields().len(),
                "using SQL-defined schema for deserialization"
            );
            self.schema = schema;
        }

        info!(
            brokers = %kafka_config.bootstrap_servers,
            subscription = ?kafka_config.subscription,
            group_id = %kafka_config.group_id,
            format = %kafka_config.format,
            schema_fields = self.schema.fields().len(),
            "opening Kafka source connector"
        );

        let rdkafka_config: ClientConfig = kafka_config.to_rdkafka_config();
        let context = LaminarConsumerContext::new(
            Arc::clone(&self.checkpoint_request),
            Arc::clone(&self.rebalance_state),
            Arc::clone(&self.rebalance_counter),
            Arc::clone(&self.revoke_generation),
            Arc::clone(&self.reader_paused),
            Arc::clone(&self.commit_retry_needed),
            Arc::clone(&self.offset_snapshot),
            self.metrics.commits.clone(),
            self.metrics.commit_failures.clone(),
        );
        let consumer: StreamConsumer<LaminarConsumerContext> =
            rdkafka_config.create_with_context(context).map_err(|e| {
                ConnectorError::ConnectionFailed(format!("failed to create consumer: {e}"))
            })?;

        match &kafka_config.subscription {
            TopicSubscription::Topics(topics) => {
                let topic_refs: Vec<&str> = topics.iter().map(String::as_str).collect();
                consumer.subscribe(&topic_refs).map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to subscribe: {e}"))
                })?;
            }
            TopicSubscription::Pattern(pattern) => {
                // librdkafka treats a leading '^' as a regex subscription.
                let regex_pattern = if pattern.starts_with('^') {
                    pattern.clone()
                } else {
                    format!("^{pattern}")
                };
                consumer.subscribe(&[&regex_pattern]).map_err(|e| {
                    ConnectorError::ConnectionFailed(format!("failed to subscribe to pattern: {e}"))
                })?;
            }
        }

        match &kafka_config.startup_mode {
            // No explicit assignment needed for these modes.
            StartupMode::GroupOffsets | StartupMode::Earliest | StartupMode::Latest => {}
            StartupMode::SpecificOffsets(offsets) => {
                let mut tpl = rdkafka::TopicPartitionList::new();
                let topics = match &kafka_config.subscription {
                    TopicSubscription::Topics(t) => t.clone(),
                    TopicSubscription::Pattern(_) => Vec::new(),
                };
                for topic in &topics {
                    for (&partition, &offset) in offsets {
                        if let Err(e) = tpl.add_partition_offset(
                            topic,
                            partition,
                            rdkafka::Offset::Offset(offset),
                        ) {
                            tracing::warn!(
                                %topic, partition, offset,
                                error = %e,
                                "failed to add specific offset to partition list"
                            );
                        }
                    }
                }
                if tpl.count() > 0 {
                    consumer.assign(&tpl).map_err(|e| {
                        ConnectorError::ConnectionFailed(format!(
                            "failed to assign specific offsets: {e}"
                        ))
                    })?;
                    info!(
                        partition_count = tpl.count(),
                        "assigned consumer to specific offsets"
                    );
                }
            }
            StartupMode::Timestamp(ts_ms) => {
                // Build a per-partition list at the requested timestamp, then
                // let the broker resolve it to concrete offsets.
                let mut tpl = rdkafka::TopicPartitionList::new();
                let topics = match &kafka_config.subscription {
                    TopicSubscription::Topics(t) => t.clone(),
                    TopicSubscription::Pattern(_) => Vec::new(),
                };
                if let Ok(metadata) = consumer.fetch_metadata(
                    topics.first().map(String::as_str),
                    std::time::Duration::from_secs(10),
                ) {
                    for topic_meta in metadata.topics() {
                        for partition_meta in topic_meta.partitions() {
                            if let Err(e) = tpl.add_partition_offset(
                                topic_meta.name(),
                                partition_meta.id(),
                                rdkafka::Offset::Offset(*ts_ms),
                            ) {
                                tracing::warn!(
                                    topic = topic_meta.name(),
                                    partition = partition_meta.id(),
                                    error = %e,
                                    "failed to add timestamp offset to partition list"
                                );
                            }
                        }
                    }
                }
                if tpl.count() > 0 {
                    match consumer.offsets_for_times(tpl, std::time::Duration::from_secs(10)) {
                        Ok(resolved) => {
                            consumer.assign(&resolved).map_err(|e| {
                                ConnectorError::ConnectionFailed(format!(
                                    "failed to assign timestamp offsets: {e}"
                                ))
                            })?;
                            info!(
                                timestamp_ms = ts_ms,
                                partition_count = resolved.count(),
                                "assigned consumer to timestamp offsets"
                            );
                        }
                        Err(e) => {
                            warn!(
                                error = %e,
                                timestamp_ms = ts_ms,
                                "failed to resolve timestamp offsets, falling back to group offsets"
                            );
                        }
                    }
                }
            }
        }

        self.consumer = Some(consumer);
        self.state = ConnectorState::Running;

        // Best-effort Schema Registry prefetch; failures fall back to lazy
        // resolution on the first Avro record.
        if let Some(ref sr) = self.schema_registry {
            if let TopicSubscription::Topics(topics) = &kafka_config.subscription {
                if topics.len() > 1 {
                    warn!("multiple topics with schema registry — using first topic's schema");
                }
                if let Some(topic) = topics.first() {
                    let subject = resolve_value_subject(
                        kafka_config.schema_registry_subject_strategy,
                        kafka_config.schema_registry_record_name.as_deref(),
                        topic,
                    );
                    match tokio::time::timeout(
                        kafka_config.schema_registry_discovery_timeout,
                        sr.get_latest_schema(&subject),
                    )
                    .await
                    {
                        Ok(Ok(cached)) => {
                            if let Some(avro_deser) = self
                                .deserializer
                                .as_any_mut()
                                .and_then(|any| any.downcast_mut::<AvroDeserializer>())
                            {
                                if let Err(e) =
                                    avro_deser.register_schema(cached.id, &cached.schema_str)
                                {
                                    warn!(%subject, error = %e, "SR schema register failed");
                                } else {
                                    log_schema_drift(&self.schema, &cached.arrow_schema, &subject);
                                    info!(%subject, schema_id = cached.id,
                                        "SR schema fetched at open()");
                                    self.last_avro_schema = Some(cached.arrow_schema);
                                }
                            }
                        }
                        Ok(Err(e)) => {
                            warn!(%subject, error = %e, "SR unavailable at open(), will resolve lazily");
                        }
                        Err(_elapsed) => {
                            warn!(%subject, "SR prefetch timed out at open(), will resolve lazily");
                        }
                    }
                }
            }
        }

        info!("Kafka source connector opened successfully");
        Ok(())
    }

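    /// Best-effort Avro schema discovery from the Schema Registry. On any
    /// failure (non-Avro format, missing SR URL, pattern subscription, lookup
    /// error, timeout) the declared schema is left unchanged.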
    async fn discover_schema(&mut self, properties: &std::collections::HashMap<String, String>) {
        let cfg = crate::config::ConnectorConfig::with_properties("kafka", properties.clone());
        let Ok(kafka_config) = KafkaSourceConfig::from_config(&cfg) else {
            return;
        };
        if kafka_config.format != Format::Avro {
            return;
        }

        let topic = match &kafka_config.subscription {
            TopicSubscription::Topics(topics) => match topics.first() {
                Some(t) => {
                    if topics.len() > 1 {
                        warn!(topics = ?topics, chosen = %t,
                            "multi-topic source: using first topic's SR schema");
                    }
                    t.clone()
                }
                None => return,
            },
            TopicSubscription::Pattern(pattern) => {
                warn!(%pattern,
                    "topic.pattern cannot auto-discover a schema — declare columns explicitly");
                return;
            }
        };

        let sr_client = match Self::build_sr_client(&kafka_config) {
            Ok(Some(c)) => c,
            Ok(None) => return,
            Err(e) => {
                warn!(error = %e, "Schema Registry client build failed");
                return;
            }
        };

        let subject = resolve_value_subject(
            kafka_config.schema_registry_subject_strategy,
            kafka_config.schema_registry_record_name.as_deref(),
            &topic,
        );
        let timeout = kafka_config.schema_registry_discovery_timeout;

        match tokio::time::timeout(timeout, sr_client.get_latest_schema(&subject)).await {
            Ok(Ok(cached)) => {
                self.metrics.record_sr_discovery_success();
                info!(%subject, schema_id = cached.id,
                    fields = cached.arrow_schema.fields().len(),
                    "discovered Avro schema from Schema Registry");
                self.schema = cached.arrow_schema;
            }
            Ok(Err(e)) => {
                self.metrics.record_sr_discovery_failure();
                warn!(%subject, error = %e, "Schema Registry lookup failed");
            }
            Err(_) => {
                self.metrics.record_sr_discovery_timeout();
                warn!(%subject, timeout_secs = timeout.as_secs(),
                    "Schema Registry lookup timed out");
            }
        }
    }

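    /// Drains up to `max_records` buffered records, runs schema-evolution
    /// checks for newly observed Avro schema IDs, and deserializes into a
    /// single Arrow batch, isolating poison-pill records where possible.
    /// Returns `Ok(None)` when nothing is buffered.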
    #[allow(clippy::cast_possible_truncation)]
    async fn poll_batch(
        &mut self,
        max_records: usize,
    ) -> Result<Option<SourceBatch>, ConnectorError> {
        if self.state != ConnectorState::Running {
            return Err(ConnectorError::InvalidState {
                expected: "Running".into(),
                actual: self.state.to_string(),
            });
        }

        self.ensure_reader_started();

        let rx = self
            .msg_rx
            .as_mut()
            .ok_or_else(|| ConnectorError::InvalidState {
                expected: "reader initialized".into(),
                actual: "reader is None".into(),
            })?;

        let limit = max_records.min(self.config.max_poll_records);

        // Reuse the poll scratch buffers across calls to avoid reallocation.
        self.poll_payload_buf.clear();
        self.poll_payload_offsets.clear();
        self.poll_meta_partitions.clear();
        self.poll_meta_offsets.clear();
        self.poll_meta_timestamps.clear();
        self.poll_meta_headers.clear();

        let mut total_bytes: u64 = 0;
        let mut last_topic = String::new();
        let mut last_partition_id: i32 = 0;
        let mut last_offset: i64 = -1;
        let include_metadata = self.config.include_metadata;
        let include_headers = self.config.include_headers;

        while self.poll_payload_offsets.len() < limit {
            match rx.try_recv() {
                Ok(kp) => {
                    self.channel_len.fetch_sub(1, Ordering::Release);
                    total_bytes += kp.data.len() as u64;
                    let start = self.poll_payload_buf.len();
                    self.poll_payload_buf.extend_from_slice(&kp.data);
                    self.poll_payload_offsets.push((start, kp.data.len()));

                    self.offsets.update_arc(&kp.topic, kp.partition, kp.offset);

                    if include_metadata {
                        self.poll_meta_partitions.push(kp.partition);
                        self.poll_meta_offsets.push(kp.offset);
                        self.poll_meta_timestamps.push(kp.timestamp_ms);
                    }
                    if include_headers {
                        self.poll_meta_headers.push(kp.headers_json);
                    }

                    if let Some(ref mut tracker) = self.watermark_tracker {
                        if let Some(ts) = kp.timestamp_ms {
                            tracker.update_partition(kp.partition, ts);
                        }
                    }

                    if last_topic.as_str() != &*kp.topic || last_partition_id != kp.partition {
                        last_topic = kp.topic.to_string();
                        last_partition_id = kp.partition;
                    }
                    last_offset = kp.offset;
                }
                Err(crossfire::TryRecvError::Empty) => break,
                Err(crossfire::TryRecvError::Disconnected) => {
                    self.state = ConnectorState::Failed;
                    return Err(ConnectorError::Internal(
                        "Kafka reader task exited unexpectedly".into(),
                    ));
                }
            }
        }

        if let Some(ref mut tracker) = self.watermark_tracker {
            tracker.check_idle_partitions();
        }

        let rebalance_events = self.rebalance_counter.swap(0, Ordering::Relaxed);
        for _ in 0..rebalance_events {
            self.metrics.record_rebalance();
        }

        // After a revoke, drop offsets for partitions we no longer own so they
        // are neither committed nor checkpointed.
        let current_revoke_gen = self.revoke_generation.load(Ordering::Acquire);
        let had_revoke = current_revoke_gen != self.last_seen_revoke_gen;
        if had_revoke {
            self.last_seen_revoke_gen = current_revoke_gen;
            let assigned = lock_or_recover(&self.rebalance_state)
                .assigned_partitions()
                .clone();
            let before = self.offsets.partition_count();
            self.offsets.retain_assigned(&assigned);
            let after = self.offsets.partition_count();
            if before != after {
                debug!(
                    before,
                    after, "purged revoked partition offsets after rebalance"
                );
            }
        }

        // Publish the latest committable offsets to the commit task and keep a
        // snapshot for the rebalance context.
        if had_revoke || !self.poll_payload_offsets.is_empty() {
            if let Some(ref tx) = self.offset_commit_tx {
                let assigned = lock_or_recover(&self.rebalance_state)
                    .assigned_partitions()
                    .clone();
                let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
                if tx.send(tpl).is_err() {
                    debug!("offset_commit_tx closed, reader task shutting down");
                }
            }
            lock_or_recover(&self.offset_snapshot).clone_from(&self.offsets);
        }

        // Update the consumer-lag metric from the latest high watermarks.
        if let Some(ref hwm_rx) = self.high_watermarks_rx {
            let watermarks = hwm_rx.borrow();
            let mut total_lag: u64 = 0;
            for (topic, partition, high_watermark) in watermarks.iter() {
                if let Some(current_offset) = self.offsets.get(topic, *partition) {
                    let lag = high_watermark.saturating_sub(current_offset + 1);
                    #[allow(clippy::cast_sign_loss)]
                    if lag > 0 {
                        total_lag += lag as u64;
                    }
                }
            }
            self.metrics.set_lag(total_lag);
        }

        if self.poll_payload_offsets.is_empty() {
            return Ok(None);
        }

        let last_partition = if last_offset >= 0 {
            Some(PartitionInfo::new(
                format!("{last_topic}-{last_partition_id}"),
                last_offset.to_string(),
            ))
        } else {
            None
        };

        // For Avro, resolve any Confluent schema IDs seen in this batch and run
        // the configured schema-evolution checks on newly observed schemas.
        if let Some(avro_deser) = self
            .deserializer
            .as_any_mut()
            .and_then(|any| any.downcast_mut::<AvroDeserializer>())
        {
            let mut new_schema_ids = Vec::new();
            for &(start, len) in &self.poll_payload_offsets {
                if let Some(schema_id) = AvroDeserializer::extract_confluent_id(
                    &self.poll_payload_buf[start..start + len],
                ) {
                    let is_new = avro_deser
                        .ensure_schema_registered(schema_id)
                        .await
                        .map_err(ConnectorError::Serde)?;
                    if is_new {
                        new_schema_ids.push(schema_id);
                    }
                }
            }

            if !new_schema_ids.is_empty()
                && self.config.schema_evolution_strategy != SchemaEvolutionStrategy::Ignore
            {
                if let Some(ref sr) = self.schema_registry {
                    let compat = self
                        .config
                        .schema_compatibility
                        .map_or(CompatibilityMode::Backward, CompatibilityMode::from);
                    let evolver = SchemaEvolution::new(compat);

                    for id in new_schema_ids {
                        let cached = sr.resolve_confluent_id(id).await.map_err(|e| {
                            ConnectorError::SchemaMismatch(format!(
                                "failed to resolve schema {id}: {e}"
                            ))
                        })?;

                        let Some(ref prev) = self.last_avro_schema else {
                            info!(schema_id = id, "initial Avro schema registered");
                            self.last_avro_schema = Some(Arc::clone(&cached.arrow_schema));
                            continue;
                        };

                        let changes = evolver.diff_schemas(prev, &cached.arrow_schema);
                        self.last_avro_schema = Some(Arc::clone(&cached.arrow_schema));

                        if changes.is_empty() {
                            info!(
                                schema_id = id,
                                "new Avro schema ID registered, no field changes"
                            );
                            continue;
                        }
                        let verdict = evolver.evaluate_evolution(&changes);
                        match &verdict {
                            EvolutionVerdict::Compatible => {
                                info!(schema_id = id, ?changes, "schema evolved (compatible)");
                            }
                            EvolutionVerdict::RequiresMigration => {
                                warn!(
                                    schema_id = id,
                                    ?changes,
                                    "schema evolved (requires migration)"
                                );
                            }
                            EvolutionVerdict::Incompatible(reason) => {
                                if self.config.schema_evolution_strategy
                                    == SchemaEvolutionStrategy::Reject
                                {
                                    return Err(ConnectorError::SchemaMismatch(format!(
                                        "incompatible schema evolution for ID {id}: {reason}"
                                    )));
                                }
                                warn!(
                                    schema_id = id, %reason, ?changes,
                                    "incompatible schema evolution detected"
                                );
                            }
                        }
                    }
                }
            }
        }

        let refs: Vec<&[u8]> = self
            .poll_payload_offsets
            .iter()
            .map(|&(start, len)| &self.poll_payload_buf[start..start + len])
            .collect();

        // Deserialize the whole batch at once; on failure, retry record by
        // record so a single poison pill does not sink the entire poll.
        let (batch, good_indices) = match self.deserializer.deserialize_batch(&refs, &self.schema) {
            Ok(batch) => (batch, None),
            Err(batch_err) => {
                let mut good_batches = Vec::with_capacity(refs.len());
                let mut good_idx = Vec::with_capacity(refs.len());
                let mut error_count = 0u64;
                for (i, r) in refs.iter().enumerate() {
                    match self
                        .deserializer
                        .deserialize_batch(std::slice::from_ref(r), &self.schema)
                    {
                        Ok(batch) => {
                            good_batches.push(batch);
                            good_idx.push(i);
                        }
                        Err(e) => {
                            error_count += 1;
                            self.metrics.record_error();
                            warn!(error = %e, "skipping poison pill record");
                        }
                    }
                }
                if good_batches.is_empty() {
                    return Err(ConnectorError::Serde(batch_err));
                }
                #[allow(clippy::cast_precision_loss)]
                if error_count > 0 {
                    let error_rate = error_count as f64 / refs.len() as f64;
                    if error_rate > self.config.max_deser_error_rate {
                        return Err(ConnectorError::Serde(batch_err));
                    }
                    warn!(
                        skipped = error_count,
                        total = refs.len(),
                        error_rate = %format_args!("{error_rate:.1}"),
                        "deserialized batch with poison pill isolation"
                    );
                }
                let concat_schema = good_batches[0].schema();
                let batch = arrow_select::concat::concat_batches(&concat_schema, &good_batches)
                    .map_err(|e| {
                        ConnectorError::Internal(format!("failed to concat batches: {e}"))
                    })?;
                (batch, Some(good_idx))
            }
        };

        // Keep the metadata columns aligned with the surviving records.
        if let Some(ref idx) = good_indices {
            if include_metadata {
                self.poll_meta_partitions =
                    idx.iter().map(|&i| self.poll_meta_partitions[i]).collect();
                self.poll_meta_offsets = idx.iter().map(|&i| self.poll_meta_offsets[i]).collect();
                self.poll_meta_timestamps =
                    idx.iter().map(|&i| self.poll_meta_timestamps[i]).collect();
            }
            if include_headers {
                self.poll_meta_headers = idx
                    .iter()
                    .map(|&i| std::mem::take(&mut self.poll_meta_headers[i]))
                    .collect();
            }
        }

        let needs_meta = include_metadata && !self.poll_meta_partitions.is_empty();
        let needs_headers = include_headers && !self.poll_meta_headers.is_empty();
        let batch = if needs_meta || needs_headers {
            use arrow_schema::{DataType, Field};

            let mut fields = batch.schema().fields().to_vec();
            let mut columns: Vec<Arc<dyn arrow_array::Array>> = batch.columns().to_vec();

            if needs_meta {
                use arrow_array::{Int32Array, Int64Array, TimestampMillisecondArray};
                use arrow_schema::TimeUnit;
                fields.push(Arc::new(Field::new("_partition", DataType::Int32, false)));
                columns.push(Arc::new(Int32Array::from(std::mem::take(
                    &mut self.poll_meta_partitions,
                ))));
                fields.push(Arc::new(Field::new("_offset", DataType::Int64, false)));
                columns.push(Arc::new(Int64Array::from(std::mem::take(
                    &mut self.poll_meta_offsets,
                ))));
                fields.push(Arc::new(Field::new(
                    "_timestamp",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                )));
                columns.push(Arc::new(TimestampMillisecondArray::from(std::mem::take(
                    &mut self.poll_meta_timestamps,
                ))));
            }
            if needs_headers {
                fields.push(Arc::new(Field::new("_headers", DataType::Utf8, true)));
                columns.push(Arc::new(arrow_array::StringArray::from(std::mem::take(
                    &mut self.poll_meta_headers,
                ))));
            }

            let meta_schema = Arc::new(arrow_schema::Schema::new(fields));
            arrow_array::RecordBatch::try_new(meta_schema, columns).map_err(|e| {
                ConnectorError::Internal(format!("failed to append metadata columns: {e}"))
            })?
        } else {
            batch
        };

        let num_rows = batch.num_rows();
        self.metrics.record_poll(num_rows as u64, total_bytes);

        let source_batch = if let Some(partition) = last_partition {
            SourceBatch::with_partition(batch, partition)
        } else {
            SourceBatch::new(batch)
        };

        debug!(
            records = num_rows,
            bytes = total_bytes,
            "polled batch from Kafka"
        );

        Ok(Some(source_batch))
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn checkpoint(&self) -> SourceCheckpoint {
        let assigned = lock_or_recover(&self.rebalance_state)
            .assigned_partitions()
            .clone();

        // Nudge the commit task with the freshest filtered offsets.
        if let Some(ref tx) = self.offset_commit_tx {
            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
            let _ = tx.send(tpl);
        }

        self.offsets.to_checkpoint_filtered(&assigned)
    }

    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
        // Offsets are staged here and applied by the rebalance context when
        // partitions are assigned (seek-on-assign).
        info!(
            epoch = checkpoint.epoch(),
            partition_count = checkpoint.offsets().len(),
            "staging checkpointed offsets for seek-on-assign"
        );

        self.offsets = OffsetTracker::from_checkpoint(checkpoint);
        match self.offset_snapshot.lock() {
            Ok(mut snapshot) => snapshot.clone_from(&self.offsets),
            Err(poisoned) => poisoned.into_inner().clone_from(&self.offsets),
        }

        Ok(())
    }

    fn health_check(&self) -> HealthStatus {
        match self.state {
            ConnectorState::Running => {
                if self.reader_paused.load(Ordering::Acquire) {
                    HealthStatus::Degraded("backpressure: consumption paused".into())
                } else {
                    HealthStatus::Healthy
                }
            }
            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
        }
    }

    fn metrics(&self) -> ConnectorMetrics {
        self.metrics.to_connector_metrics()
    }

    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
        Some(Arc::clone(&self.data_ready))
    }

    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
        Some(Arc::clone(&self.checkpoint_request))
    }

    async fn close(&mut self) -> Result<(), ConnectorError> {
        info!("closing Kafka source connector");

        let assigned = lock_or_recover(&self.rebalance_state)
            .assigned_partitions()
            .clone();

        // Hand the final offsets to the reader task for its shutdown commit.
        if let Some(ref tx) = self.offset_commit_tx {
            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
            if tpl.count() > 0 {
                let _ = tx.send(tpl);
            }
        }

        // Signal shutdown and give each background task a bounded join.
        if let Some(tx) = self.reader_shutdown.take() {
            let _ = tx.send(true);
        }
        let timeout = std::time::Duration::from_secs(5);
        if let Some(handle) = self.reader_handle.take() {
            let _ = tokio::time::timeout(timeout, handle).await;
        }
        if let Some(handle) = self.commit_handle.take() {
            let _ = tokio::time::timeout(timeout, handle).await;
        }
        if let Some(handle) = self.hwm_handle.take() {
            let _ = tokio::time::timeout(timeout, handle).await;
        }
        self.msg_rx = None;
        self.offset_commit_tx = None;

        // If the reader never started, the consumer is still held here and the
        // final commit happens inline.
        if let Some(ref consumer) = self.consumer {
            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
            if tpl.count() > 0 {
                if let Err(e) = consumer.commit(&tpl, CommitMode::Sync) {
                    warn!(error = %e, "failed to commit final offsets");
                }
            }
            consumer.unsubscribe();
        }

        self.consumer = None;
        self.state = ConnectorState::Closed;
        info!("Kafka source connector closed");
        Ok(())
    }
}

impl std::fmt::Debug for KafkaSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KafkaSource")
            .field("state", &self.state)
            .field("subscription", &self.config.subscription)
            .field("group_id", &self.config.group_id)
            .field("format", &self.config.format)
            .field("partitions", &self.offsets.partition_count())
            .finish_non_exhaustive()
    }
}

/// Warns when the SQL-declared schema and the live SR schema differ by field
/// name, listing the fields on each side of the drift.
fn log_schema_drift(declared: &arrow_schema::Schema, live: &arrow_schema::Schema, subject: &str) {
    if declared.fields().is_empty() || declared.fields() == live.fields() {
        return;
    }
    let decl: BTreeSet<&str> = declared
        .fields()
        .iter()
        .map(|f| f.name().as_str())
        .collect();
    let lv: BTreeSet<&str> = live.fields().iter().map(|f| f.name().as_str()).collect();
    warn!(
        %subject,
        missing_in_sr = ?decl.difference(&lv).collect::<Vec<_>>(),
        added_in_sr = ?lv.difference(&decl).collect::<Vec<_>>(),
        "schema drift: re-apply CREATE SOURCE DDL to pick up the current SR schema"
    );
}

fn select_deserializer(format: Format) -> Box<dyn RecordDeserializer> {
    match format {
        Format::Avro => Box::new(AvroDeserializer::new()),
        other => serde::create_deserializer(other).unwrap_or_else(|_| {
            warn!(format = %other, "unsupported format, falling back to JSON");
            Box::new(serde::json::JsonDeserializer::new())
        }),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ]))
    }

    fn test_config() -> KafkaSourceConfig {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "test-group".into();
        cfg.subscription = TopicSubscription::Topics(vec!["events".into()]);
        cfg
    }

    #[test]
    fn test_new_defaults() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        assert_eq!(source.state(), ConnectorState::Created);
        assert!(source.consumer.is_none());
        assert_eq!(source.offsets().partition_count(), 0);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let source = KafkaSource::new(schema.clone(), test_config(), None);
        assert_eq!(source.schema(), schema);
    }

    #[test]
    fn test_checkpoint_empty() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        let cp = source.checkpoint();
        assert!(cp.is_empty());
    }

    #[test]
    fn test_checkpoint_with_offsets() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);

        {
            // Simulate a rebalance assigning both partitions.
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0), ("events".into(), 1)]);
        }

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), Some("200"));
    }

    #[test]
    fn test_health_check_created() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        assert_eq!(source.health_check(), HealthStatus::Unknown);
    }

    #[test]
    fn test_health_check_running() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.state = ConnectorState::Running;
        assert_eq!(source.health_check(), HealthStatus::Healthy);
    }

    #[test]
    fn test_health_check_closed() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.state = ConnectorState::Closed;
        assert!(matches!(source.health_check(), HealthStatus::Unhealthy(_)));
    }

    #[test]
    fn test_metrics_initial() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        let m = source.metrics();
        assert_eq!(m.records_total, 0);
        assert_eq!(m.bytes_total, 0);
        assert_eq!(m.errors_total, 0);
    }

    #[test]
    fn test_deserializer_selection_json() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        assert_eq!(source.deserializer.format(), Format::Json);
    }

    #[test]
    fn test_deserializer_selection_csv() {
        let mut cfg = test_config();
        cfg.format = Format::Csv;
        let source = KafkaSource::new(test_schema(), cfg, None);
        assert_eq!(source.deserializer.format(), Format::Csv);
    }

    #[test]
    fn test_with_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());

        let source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);
        assert!(source.schema_registry.is_some());
        assert_eq!(source.deserializer.format(), Format::Avro);
    }

    #[tokio::test]
    async fn test_open_preserves_injected_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());
        let mut source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);

        // An empty runtime config must not wipe the injected SR client.
        let empty_config = crate::config::ConnectorConfig::new("kafka");
        let _ = source.open(&empty_config).await;
        assert!(source.schema_registry.is_some());
        assert_eq!(source.deserializer.format(), Format::Avro);
    }

    #[test]
    fn test_debug_output() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        let debug = format!("{source:?}");
        assert!(debug.contains("KafkaSource"));
        assert!(debug.contains("events"));
    }

    #[test]
    fn test_checkpoint_filters_revoked_partitions() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);
        source.offsets.update("events", 2, 300);

        {
            // Partition 1 is not in the assignment, so it must be filtered out.
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0), ("events".into(), 2)]);
        }

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), None);
        assert_eq!(cp.get_offset("events-2"), Some("300"));
    }

    #[test]
    fn test_checkpoint_empty_before_first_rebalance() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);

        // With no assignment recorded yet, nothing is checkpointed.
        let cp = source.checkpoint();
        assert!(cp.is_empty());
    }

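    // Minimal added check: `lock_or_recover` must hand back the guarded data
    // even after a panicking thread has poisoned the mutex.
    #[test]
    fn test_lock_or_recover_poisoned_mutex() {
        let mutex = Arc::new(Mutex::new(5_i32));
        let m2 = Arc::clone(&mutex);
        // Panic while holding the guard to poison the mutex.
        let _ = std::thread::spawn(move || {
            let _guard = m2.lock().unwrap();
            panic!("poison the mutex");
        })
        .join();
        assert_eq!(*lock_or_recover(&mutex), 5);
    }
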
    /// Empty schema used by the discovery tests.
    fn empty_schema() -> SchemaRef {
        Arc::new(Schema::empty())
    }

    fn props(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
            .collect()
    }

    #[tokio::test]
    async fn discover_schema_skips_non_avro_format() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "json"),
                ("schema.registry.url", "http://localhost:8081"),
            ]))
            .await;
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_skips_without_sr_url() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "avro"),
            ]))
            .await;
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_skips_topic_pattern() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic.pattern", "events-.*"),
                ("format", "avro"),
                ("schema.registry.url", "http://localhost:8081"),
            ]))
            .await;
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_unreachable_sr_leaves_schema_empty() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let start = std::time::Instant::now();
        tokio::time::timeout(
            std::time::Duration::from_secs(20),
            source.discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "avro"),
                // 192.0.2.0/24 (TEST-NET-1) is reserved, so this cannot connect.
                ("schema.registry.url", "http://192.0.2.1:65535"),
            ])),
        )
        .await
        .expect("discover_schema must honor its own 10s timeout");
        assert!(
            start.elapsed() < std::time::Duration::from_secs(15),
            "discover_schema should have returned well before the outer 20s budget"
        );
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_happy_path_with_wiremock_sr() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let avro_schema = serde_json::json!({
            "type": "record",
            "name": "event",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "data", "type": {"type": "map", "values": "string"}}
            ]
        })
        .to_string();

        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/ion_tw-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 42,
                "version": 1,
                "subject": "ion_tw-value",
                "schema": avro_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "ion_tw"),
                ("format", "avro"),
                ("schema.registry.url", &sr.uri()),
            ]))
            .await;

        let schema = source.schema();
        assert_eq!(schema.fields().len(), 2, "expected [id, data]");
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "data");
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::Map(_, _)
            ),
            "'data' field must survive as a Map type (got {:?})",
            schema.field(1).data_type()
        );
    }

    #[tokio::test]
    async fn discover_schema_happy_path_record_name_strategy() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let avro_schema = serde_json::json!({
            "type": "record",
            "name": "com.acme.Order",
            "fields": [
                {"name": "order_id", "type": "string"},
                {"name": "amount", "type": "double"}
            ]
        })
        .to_string();

        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/com.acme.Order-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 7,
                "version": 1,
                "subject": "com.acme.Order-value",
                "schema": avro_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "orders"),
                ("format", "avro"),
                ("schema.registry.url", &sr.uri()),
                ("schema.registry.subject.name.strategy", "record-name"),
                ("schema.registry.record.name", "com.acme.Order"),
            ]))
            .await;

        let schema = source.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "order_id");
        assert_eq!(schema.field(1).name(), "amount");
    }

    #[tokio::test]
    async fn open_logs_drift_when_sr_evolved_since_ddl() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let evolved_schema = serde_json::json!({
            "type": "record",
            "name": "event",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "data", "type": {"type": "map", "values": "string"}},
                {"name": "version", "type": "int"}
            ]
        })
        .to_string();

        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/ion_tw-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 99,
                "version": 2,
                "subject": "ion_tw-value",
                "schema": evolved_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        // Catalog schema as declared in DDL, missing the new `version` field.
        let stale_catalog = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new(
                "data",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(arrow_schema::Fields::from(vec![
                            Field::new("key", DataType::Utf8, false),
                            Field::new("value", DataType::Utf8, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                true,
            ),
        ]));

        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["ion_tw".into()]);
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some(sr.uri());
        let sr_client = SchemaRegistryClient::new(sr.uri(), None);
        let mut source = KafkaSource::with_schema_registry(stale_catalog, cfg, sr_client);

        let empty_cfg = crate::config::ConnectorConfig::new("kafka");
        let _ = source.open(&empty_cfg).await;

        assert_eq!(
            source.schema().fields().len(),
            2,
            "catalog schema must stay pinned even after SR drift"
        );
        assert_eq!(
            source.last_avro_schema.as_ref().map(|s| s.fields().len()),
            Some(3),
            "last_avro_schema should reflect the evolved SR shape"
        );
    }
}
1948}