1use arrow_schema::SchemaRef;
9use async_trait::async_trait;
10use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
11use rdkafka::message::Message;
12use rdkafka::ClientConfig;
13use rdkafka::TopicPartitionList;
14use std::collections::BTreeSet;
15use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17use tokio::sync::{watch, Notify};
18use tracing::{debug, info, warn};
19
20use super::rebalance::LaminarConsumerContext;
21
22fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
28 mutex.lock().unwrap_or_else(|poisoned| {
29 tracing::warn!("mutex poisoned, recovering");
30 poisoned.into_inner()
31 })
32}
33
34use crate::checkpoint::SourceCheckpoint;
35use crate::config::{ConnectorConfig, ConnectorState};
36use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
37use crate::error::ConnectorError;
38use crate::serde::{self, Format, RecordDeserializer};
39
40use super::avro::AvroDeserializer;
41use super::config::{
42 resolve_value_subject, KafkaSourceConfig, SchemaEvolutionStrategy, StartupMode,
43 TopicSubscription,
44};
45use super::metrics::KafkaSourceMetrics;
46use super::offsets::OffsetTracker;
47use super::rebalance::RebalanceState;
48use super::schema_registry::SchemaRegistryClient;
49use super::watermarks::KafkaWatermarkTracker;
50
51use crate::schema::evolution::SchemaEvolution;
52use crate::schema::traits::{CompatibilityMode, EvolutionVerdict};
53
/// One Kafka record copied out of the consumer by the background reader task
/// and handed to `poll_batch` over the bounded reader channel.
struct KafkaPayload {
    /// Copy of the message payload bytes (the reader only forwards messages
    /// that have a payload).
    data: Vec<u8>,
    /// Source topic; the reader reuses one `Arc<str>` across consecutive
    /// messages from the same topic.
    topic: Arc<str>,
    /// Source partition.
    partition: i32,
    /// Offset of this record within its partition.
    offset: i64,
    /// Message timestamp (create time or log-append time), or `None` when the
    /// message carries no timestamp.
    timestamp_ms: Option<i64>,
    /// Headers serialized as a JSON array of `[key, value]` pairs, where each
    /// value is a lossy UTF-8 string or `null`. Only populated when header
    /// capture (`include_headers`) is enabled.
    headers_json: Option<String>,
}

/// Receiving half of the bounded reader-task -> `poll_batch` channel.
type KafkaPayloadRx = crossfire::AsyncRx<crossfire::mpsc::Array<KafkaPayload>>;
68
/// Kafka source connector.
///
/// After `open()`, the first `poll_batch` call moves the consumer into a
/// background reader task that forwards records over a bounded channel.
/// Companion tasks handle broker offset commits and high-watermark polling
/// for lag metrics.
pub struct KafkaSource {
    /// The consumer created by `open()`; taken (set to `None`) when the
    /// background tasks are started.
    consumer: Option<StreamConsumer<LaminarConsumerContext>>,
    /// Active configuration; replaced in `open()` when properties are supplied.
    config: KafkaSourceConfig,
    /// Payload decoder for the configured format; rebuilt in `open()`.
    deserializer: Box<dyn RecordDeserializer>,
    /// Last consumed offset per (topic, partition), updated by `poll_batch`
    /// and pruned to the assigned partitions after a revoke.
    offsets: OffsetTracker,
    /// Connector lifecycle state.
    state: ConnectorState,
    /// Source metrics (rebalances, lag, commit timings, SR discovery, ...).
    metrics: KafkaSourceMetrics,
    /// Output Arrow schema; may be replaced by a SQL-defined schema in
    /// `open()` or by Schema Registry discovery.
    schema: SchemaRef,
    /// Number of records sent into the reader channel and not yet received:
    /// incremented by the reader task, decremented by `poll_batch`. Drives
    /// the pause/resume backpressure thresholds.
    channel_len: Arc<AtomicUsize>,
    /// Rebalance bookkeeping shared with the consumer context; its assigned
    /// partition set is used to prune state after a revoke.
    rebalance_state: Arc<Mutex<RebalanceState>>,
    /// Rebalance event count shared with the consumer context; drained
    /// (swapped to 0) by `poll_batch` into the rebalance metric.
    rebalance_counter: Arc<AtomicU64>,
    /// Revocation generation shared with the consumer context; the reader
    /// task and `poll_batch` each compare it to their last seen value to
    /// detect revokes.
    revoke_generation: Arc<AtomicU64>,
    /// The `revoke_generation` value last processed by `poll_batch`.
    last_seen_revoke_gen: u64,
    /// Schema Registry client, when one is configured.
    schema_registry: Option<Arc<SchemaRegistryClient>>,
    /// Notified by the reader task after each forwarded record.
    data_ready: Arc<Notify>,
    /// Flag handed to the consumer context; its meaning is defined there
    /// (presumably a checkpoint request raised on rebalance — TODO confirm).
    checkpoint_request: Arc<AtomicBool>,
    /// Receiving end of the reader channel; `None` until the reader starts.
    msg_rx: Option<KafkaPayloadRx>,
    /// Background reader task.
    reader_handle: Option<tokio::task::JoinHandle<()>>,
    /// Background broker-commit task.
    commit_handle: Option<tokio::task::JoinHandle<()>>,
    /// Background high-watermark polling task.
    hwm_handle: Option<tokio::task::JoinHandle<()>>,
    /// Shutdown signal observed by the reader and high-watermark tasks.
    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
    /// Offsets to commit to the broker; the commit task commits the latest
    /// value published on this watch channel.
    commit_tx: Option<watch::Sender<Option<TopicPartitionList>>>,
    /// Per-partition event-time watermark tracking, when enabled in config.
    watermark_tracker: Option<KafkaWatermarkTracker>,
    /// Latest broker high watermarks as (topic, partition, high), refreshed
    /// periodically by the high-watermark task; used for the lag metric.
    #[allow(clippy::type_complexity)]
    high_watermarks_rx: Option<tokio::sync::watch::Receiver<Vec<(Arc<str>, i32, i64)>>>,
    /// `true` while the reader task has paused the consumer's partitions.
    reader_paused: Arc<AtomicBool>,
    /// Copy of `offsets` published by `poll_batch`; read by the reader task
    /// when re-assigning vnode-owned partitions, and shared with the consumer
    /// context.
    offset_snapshot: Arc<Mutex<OffsetTracker>>,

    /// Vnode registry and this node's id when partition assignment is
    /// controlled by the engine instead of the consumer group.
    vnode_assignment: Option<(
        Arc<laminar_core::state::VnodeRegistry>,
        laminar_core::state::NodeId,
    )>,
    /// (topic, partition count) captured in `open()` for vnode-controlled
    /// assignment; reused by the reader task on vnode re-assignment.
    vnode_topic_meta: Vec<(Arc<str>, i32)>,

    /// Most recently observed Avro schema (as Arrow), used to detect schema
    /// evolution between Schema Registry ids.
    last_avro_schema: Option<SchemaRef>,

    /// Scratch buffers reused by `poll_batch` to avoid per-batch allocation:
    /// concatenated payload bytes ...
    poll_payload_buf: Vec<u8>,
    /// ... and each payload's (start, len) within `poll_payload_buf`.
    poll_payload_offsets: Vec<(usize, usize)>,
    /// Per-record partition (filled when `include_metadata` is set).
    poll_meta_partitions: Vec<i32>,
    /// Per-record offset (filled when `include_metadata` is set).
    poll_meta_offsets: Vec<i64>,
    /// Per-record timestamp (filled when `include_metadata` is set).
    poll_meta_timestamps: Vec<Option<i64>>,
    /// Per-record headers JSON (filled when `include_headers` is set).
    poll_meta_headers: Vec<Option<String>>,
}
152
impl KafkaSource {
    /// Creates a Kafka source that uses the deserializer selected for the
    /// configured format and has no Schema Registry client.
    ///
    /// `registry` is forwarded to `KafkaSourceMetrics::new`; presumably the
    /// metrics are registered there when it is `Some` — TODO confirm.
    #[must_use]
    pub fn new(
        schema: SchemaRef,
        config: KafkaSourceConfig,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        Self::build_base(schema, config, select_deserializer, None, registry)
    }

    /// Creates a Kafka source backed by the given Schema Registry client.
    ///
    /// For `Format::Avro` the deserializer resolves schemas through
    /// `sr_client`; other formats use `select_deserializer`. Metrics are built
    /// without a Prometheus registry (`None`).
    #[must_use]
    pub fn with_schema_registry(
        schema: SchemaRef,
        config: KafkaSourceConfig,
        sr_client: SchemaRegistryClient,
    ) -> Self {
        let sr = Arc::new(sr_client);
        let sr_clone = Arc::clone(&sr);
        let deser_factory = move |format: Format| -> Box<dyn RecordDeserializer> {
            if format == Format::Avro {
                Box::new(AvroDeserializer::with_schema_registry(sr_clone))
            } else {
                select_deserializer(format)
            }
        };
        Self::build_base(schema, config, deser_factory, Some(sr), None)
    }

    /// Builds a Schema Registry client from `config`.
    ///
    /// Returns `Ok(None)` when `schema_registry_url` is not set. When a CA
    /// location is configured, the TLS constructor is used together with the
    /// optional client certificate/key; otherwise a plain client is built.
    ///
    /// NOTE(review): when a client certificate/key is configured but no CA
    /// location is, the plain constructor is used and the certificate/key are
    /// not passed on — confirm this is intended.
    ///
    /// # Errors
    ///
    /// Propagates (via `?`) the error from `SchemaRegistryClient::with_tls_mtls`.
    fn build_sr_client(
        config: &KafkaSourceConfig,
    ) -> Result<Option<SchemaRegistryClient>, ConnectorError> {
        let Some(sr_url) = config.schema_registry_url.as_ref() else {
            return Ok(None);
        };
        let client = if let Some(ca) = config.schema_registry_ssl_ca_location.as_deref() {
            SchemaRegistryClient::with_tls_mtls(
                sr_url.clone(),
                config.schema_registry_auth.clone(),
                ca,
                config.schema_registry_ssl_certificate_location.as_deref(),
                config.schema_registry_ssl_key_location.as_deref(),
            )?
        } else {
            SchemaRegistryClient::new(sr_url.clone(), config.schema_registry_auth.clone())
        };
        Ok(Some(client))
    }

    /// Shared constructor: builds the deserializer via `deser_factory`,
    /// creates the watermark tracker when enabled, and initializes all shared
    /// state, channels and scratch buffers to their empty/idle values. The
    /// consumer itself is only created in `open()`.
    fn build_base(
        schema: SchemaRef,
        config: KafkaSourceConfig,
        deser_factory: impl FnOnce(Format) -> Box<dyn RecordDeserializer>,
        schema_registry: Option<Arc<SchemaRegistryClient>>,
        registry: Option<&prometheus::Registry>,
    ) -> Self {
        let deserializer = deser_factory(config.format);
        let channel_len = Arc::new(AtomicUsize::new(0));

        // NOTE(review): the meaning of the first `KafkaWatermarkTracker::new`
        // argument (0 here) is defined by that type — confirm.
        let watermark_tracker = if config.enable_watermark_tracking {
            Some(
                KafkaWatermarkTracker::new(0, config.idle_timeout)
                    .with_max_out_of_orderness(config.max_out_of_orderness),
            )
        } else {
            None
        };

        Self {
            consumer: None,
            config,
            deserializer,
            offsets: OffsetTracker::new(),
            state: ConnectorState::Created,
            metrics: KafkaSourceMetrics::new(registry),
            schema,
            channel_len,
            rebalance_state: Arc::new(Mutex::new(RebalanceState::new())),
            rebalance_counter: Arc::new(AtomicU64::new(0)),
            revoke_generation: Arc::new(AtomicU64::new(0)),
            last_seen_revoke_gen: 0,
            schema_registry,
            data_ready: Arc::new(Notify::new()),
            checkpoint_request: Arc::new(AtomicBool::new(false)),
            msg_rx: None,
            reader_handle: None,
            commit_handle: None,
            hwm_handle: None,
            reader_shutdown: None,
            commit_tx: None,
            watermark_tracker,
            high_watermarks_rx: None,
            reader_paused: Arc::new(AtomicBool::new(false)),
            offset_snapshot: Arc::new(Mutex::new(OffsetTracker::new())),
            vnode_assignment: None,
            vnode_topic_meta: Vec::new(),
            last_avro_schema: None,
            poll_payload_buf: Vec::new(),
            poll_payload_offsets: Vec::new(),
            poll_meta_partitions: Vec::new(),
            poll_meta_offsets: Vec::new(),
            poll_meta_timestamps: Vec::new(),
            poll_meta_headers: Vec::new(),
        }
    }

    /// Current lifecycle state.
    #[must_use]
    pub fn state(&self) -> ConnectorState {
        self.state
    }

    /// Offsets consumed so far, per (topic, partition).
    #[must_use]
    pub fn offsets(&self) -> &OffsetTracker {
        &self.offsets
    }

    /// Shared handle to the reader-channel occupancy counter.
    #[must_use]
    pub fn channel_len(&self) -> Arc<AtomicUsize> {
        Arc::clone(&self.channel_len)
    }

    /// Shared handle to the rebalance state used by the consumer context.
    #[must_use]
    pub fn rebalance_state(&self) -> Arc<Mutex<RebalanceState>> {
        Arc::clone(&self.rebalance_state)
    }

    /// Whether a Schema Registry client is configured.
    #[must_use]
    pub fn has_schema_registry(&self) -> bool {
        self.schema_registry.is_some()
    }

    /// Current event-time watermark, or `None` when watermark tracking is
    /// disabled or the tracker has not produced one.
    #[must_use]
    pub fn current_watermark(&self) -> Option<i64> {
        self.watermark_tracker
            .as_ref()
            .and_then(KafkaWatermarkTracker::current_watermark)
    }

    /// Configured event-time column name, if any.
    #[must_use]
    pub fn event_time_column(&self) -> Option<&str> {
        self.config.event_time_column.as_deref()
    }

    /// Lazily starts the background tasks on first use after `open()`.
    ///
    /// No-op if the reader is already running or no consumer exists. The
    /// consumer is moved into an `Arc` shared by three tasks:
    /// - **commit task**: commits each non-empty `TopicPartitionList` published
    ///   on `commit_tx` synchronously on a blocking thread, timing each commit;
    /// - **high-watermark task**: every 30 s fetches broker high watermarks for
    ///   partitions the reader has seen and publishes them for lag metrics;
    /// - **reader task**: receives messages, applies pause/resume backpressure
    ///   based on channel fill, handles vnode re-assignment and revokes, and
    ///   forwards records to `poll_batch` through the bounded channel.
    ///
    /// The reader and high-watermark tasks stop when `reader_shutdown` changes
    /// (or its sender is dropped); the commit task stops when `commit_tx` is
    /// dropped.
    #[allow(clippy::too_many_lines)]
    fn ensure_reader_started(&mut self) {
        if self.reader_handle.is_some() || self.consumer.is_none() {
            return;
        }

        // Guarded by the `is_none()` check above.
        let consumer = Arc::new(self.consumer.take().unwrap());
        let (msg_tx, msg_rx) =
            crossfire::mpsc::bounded_async::<KafkaPayload>(self.config.reader_channel_capacity);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
        let (commit_tx, mut commit_rx) = watch::channel::<Option<TopicPartitionList>>(None);
        let (hwm_tx, hwm_rx) = tokio::sync::watch::channel(Vec::new());
        // Partitions the reader has received data from; consumed by the HWM task.
        let (seen_tx, seen_rx) = tokio::sync::watch::channel(Vec::<(Arc<str>, i32)>::new());
        let data_ready = Arc::clone(&self.data_ready);
        let channel_len = Arc::clone(&self.channel_len);
        let capture_headers = self.config.include_headers;
        let reader_channel_capacity = self.config.reader_channel_capacity;
        let reader_paused = Arc::clone(&self.reader_paused);
        let revoke_generation = Arc::clone(&self.revoke_generation);
        let rebalance_state = Arc::clone(&self.rebalance_state);
        let pause_threshold = self.config.backpressure_high_watermark;
        let resume_threshold = self.config.backpressure_low_watermark;

        // --- Commit task ---
        let commit_consumer = Arc::clone(&consumer);
        let commit_metrics = self.metrics.clone();
        let commit_handle = tokio::spawn(async move {
            // A watch channel only keeps the latest value, so commits that are
            // published faster than they complete are coalesced.
            while commit_rx.changed().await.is_ok() {
                let Some(tpl) = commit_rx.borrow_and_update().clone() else {
                    continue;
                };
                if tpl.count() == 0 {
                    continue;
                }
                let start = std::time::Instant::now();
                let c = Arc::clone(&commit_consumer);
                // Synchronous commit blocks, so run it off the async runtime.
                let result =
                    tokio::task::spawn_blocking(move || c.commit(&tpl, CommitMode::Sync)).await;
                commit_metrics.observe_broker_commit_duration(start.elapsed().as_secs_f64());

                match result {
                    Ok(Ok(())) => {}
                    Ok(Err(e)) => {
                        // NOTE(review): only logged here; the consumer context
                        // receives `commit_failures_rejected` and may count it.
                        warn!(error = %e, "broker offset commit rejected");
                    }
                    Err(e) => {
                        // JoinError from spawn_blocking (panic or cancellation).
                        commit_metrics.commit_failures_panic.inc();
                        warn!(error = %e, "broker offset commit task panicked");
                    }
                }
            }
        });

        // --- High-watermark task ---
        let hwm_consumer = Arc::clone(&consumer);
        let mut hwm_shutdown = shutdown_rx.clone();
        let hwm_handle = tokio::spawn(async move {
            let mut timer = tokio::time::interval(std::time::Duration::from_secs(30));
            // A tokio interval's first tick completes immediately; consume it
            // so the first fetch happens one full period after start.
            timer.tick().await;
            loop {
                tokio::select! {
                    biased;
                    _ = hwm_shutdown.changed() => break,
                    _ = timer.tick() => {
                        let partitions: Vec<_> = seen_rx.borrow().clone();
                        if partitions.is_empty() { continue; }
                        let c = Arc::clone(&hwm_consumer);
                        // Blocking broker calls: 1 s per partition, 10 s overall.
                        // On timeout or join error the round is skipped.
                        let watermarks = tokio::time::timeout(
                            std::time::Duration::from_secs(10),
                            tokio::task::spawn_blocking(move || {
                                let mut results = Vec::with_capacity(partitions.len());
                                for (topic, partition) in &partitions {
                                    match c.fetch_watermarks(topic, *partition, std::time::Duration::from_secs(1)) {
                                        Ok((_low, high)) => results.push((Arc::clone(topic), *partition, high)),
                                        Err(e) => debug!(%topic, partition, error = %e, "HWM fetch failed"),
                                    }
                                }
                                results
                            }),
                        )
                        .await
                        .unwrap_or(Ok(Vec::new()))
                        .unwrap_or_default();
                        if !watermarks.is_empty() {
                            let _ = hwm_tx.send(watermarks);
                        }
                    }
                }
            }
        });

        // --- Reader task ---
        let vnode_reassign = self
            .vnode_assignment
            .as_ref()
            .map(|(r, s)| (Arc::clone(r), *s));
        let vnode_topic_meta = self.vnode_topic_meta.clone();
        let reassign_snapshot = Arc::clone(&self.offset_snapshot);
        let reassign_default_offset = startup_default_offset(&self.config.startup_mode);
        let mut reader_shutdown = shutdown_rx;
        let reader_handle = tokio::spawn(async move {
            // Reused so consecutive messages from one topic share an Arc<str>.
            let mut cached_topic: Arc<str> = Arc::from("");
            let mut seen_partitions: std::collections::HashSet<(Arc<str>, i32)> =
                std::collections::HashSet::new();
            let mut is_paused = false;
            let mut last_revoke_gen: u64 = 0;
            let mut last_assignment_version = vnode_reassign
                .as_ref()
                .map_or(0, |(r, _)| r.assignment_version());

            loop {
                // Vnode mode: re-assign owned partitions when the registry's
                // assignment version changes, resuming from the offsets
                // snapshot published by poll_batch.
                if let Some((registry, self_id)) = &vnode_reassign {
                    let version = registry.assignment_version();
                    if version != last_assignment_version {
                        last_assignment_version = version;
                        let offsets = lock_or_recover(&reassign_snapshot).clone();
                        let tpl = build_vnode_assignment_tpl(
                            registry,
                            *self_id,
                            &vnode_topic_meta,
                            &offsets,
                            reassign_default_offset,
                        );
                        match consumer.assign(&tpl) {
                            Ok(()) => info!(
                                version,
                                owned_partitions = tpl.count(),
                                "Kafka source re-assigned partitions after vnode rotation"
                            ),
                            Err(e) => warn!(
                                version,
                                error = %e,
                                "Kafka source partition re-assignment failed"
                            ),
                        }
                    }
                }

                // After a revoke, drop partitions no longer assigned so the
                // HWM task stops polling them.
                let current_gen = revoke_generation.load(Ordering::Acquire);
                if current_gen != last_revoke_gen {
                    last_revoke_gen = current_gen;
                    let assigned = lock_or_recover(&rebalance_state)
                        .assigned_partitions()
                        .clone();
                    seen_partitions.retain(|(t, p)| assigned.contains(&(t.to_string(), *p)));
                    let _ = seen_tx.send(seen_partitions.iter().cloned().collect());
                }

                // Backpressure: pause all assigned partitions at or above the
                // high-water fill ratio, resume at or below the low-water ratio.
                #[allow(clippy::cast_precision_loss)]
                let fill = if reader_channel_capacity > 0 {
                    channel_len.load(Ordering::Acquire) as f64 / reader_channel_capacity as f64
                } else {
                    0.0
                };
                if fill >= pause_threshold && !is_paused {
                    if let Ok(assignment) = consumer.assignment() {
                        if consumer.pause(&assignment).is_ok() {
                            is_paused = true;
                            reader_paused.store(true, Ordering::Release);
                            debug!("reader: paused Kafka partitions (fill={fill:.2})");
                        }
                    }
                } else if fill <= resume_threshold && is_paused {
                    if let Ok(assignment) = consumer.assignment() {
                        if consumer.resume(&assignment).is_ok() {
                            is_paused = false;
                            reader_paused.store(false, Ordering::Release);
                            debug!("reader: resumed Kafka partitions (fill={fill:.2})");
                        }
                    }
                }

                // Bounded wait so the checks above re-run at least every 200 ms
                // (e.g. to resume while paused and no messages arrive).
                let msg_result = tokio::select! {
                    biased;
                    _ = reader_shutdown.changed() => break,
                    msg = tokio::time::timeout(
                        std::time::Duration::from_millis(200),
                        consumer.recv(),
                    ) => match msg {
                        Ok(result) => result,
                        Err(_timeout) => continue,
                    },
                };
                match msg_result {
                    Ok(msg) => {
                        // Messages without a payload are not forwarded.
                        if let Some(payload) = msg.payload() {
                            let topic = msg.topic();
                            if &*cached_topic != topic {
                                cached_topic = Arc::from(topic);
                            }
                            if seen_partitions.insert((Arc::clone(&cached_topic), msg.partition()))
                            {
                                let _ = seen_tx.send(seen_partitions.iter().cloned().collect());
                            }
                            let timestamp_ms = match msg.timestamp() {
                                rdkafka::Timestamp::CreateTime(ts)
                                | rdkafka::Timestamp::LogAppendTime(ts) => Some(ts),
                                rdkafka::Timestamp::NotAvailable => None,
                            };
                            // Headers as a JSON array of [key, value] pairs.
                            let headers_json = if capture_headers {
                                use rdkafka::message::Headers;
                                msg.headers().and_then(|hdrs| {
                                    let pairs: Vec<(String, serde_json::Value)> = (0..hdrs.count())
                                        .map(|i| {
                                            let h = hdrs.get(i);
                                            let val = match h.value {
                                                Some(v) => serde_json::Value::String(
                                                    String::from_utf8_lossy(v).into_owned(),
                                                ),
                                                None => serde_json::Value::Null,
                                            };
                                            (h.key.to_string(), val)
                                        })
                                        .collect();
                                    serde_json::to_string(&pairs).ok()
                                })
                            } else {
                                None
                            };
                            let kp = KafkaPayload {
                                data: payload.to_vec(),
                                topic: Arc::clone(&cached_topic),
                                partition: msg.partition(),
                                offset: msg.offset(),
                                timestamp_ms,
                                headers_json,
                            };
                            match msg_tx.try_send(kp) {
                                Ok(()) => {
                                    channel_len.fetch_add(1, Ordering::Relaxed);
                                }
                                Err(crossfire::TrySendError::Full(kp)) => {
                                    // Channel full: pause consumption, then wait
                                    // for room. The record is counted before the
                                    // await and un-counted if the send is abandoned.
                                    if !is_paused {
                                        if let Ok(assignment) = consumer.assignment() {
                                            if consumer.pause(&assignment).is_ok() {
                                                is_paused = true;
                                                reader_paused.store(true, Ordering::Release);
                                                debug!("reader: paused partitions (channel full)");
                                            }
                                        }
                                    }
                                    channel_len.fetch_add(1, Ordering::Relaxed);
                                    let send_ok = tokio::select! {
                                        biased;
                                        _ = reader_shutdown.changed() => false,
                                        result = msg_tx.send(kp) => result.is_ok(),
                                    };
                                    if !send_ok {
                                        channel_len.fetch_sub(1, Ordering::Relaxed);
                                        break;
                                    }
                                }
                                Err(crossfire::TrySendError::Disconnected(_)) => break,
                            }
                            data_ready.notify_one();
                        }
                    }
                    Err(e) => {
                        // Consumer errors are logged; the loop keeps polling.
                        warn!(error = %e, "Kafka consumer error");
                    }
                }
            }

            consumer.unsubscribe();
        });

        self.msg_rx = Some(msg_rx);
        self.reader_handle = Some(reader_handle);
        self.commit_handle = Some(commit_handle);
        self.hwm_handle = Some(hwm_handle);
        self.reader_shutdown = Some(shutdown_tx);
        self.commit_tx = Some(commit_tx);
        self.high_watermarks_rx = Some(hwm_rx);
    }
}
620
621fn build_vnode_assignment_tpl(
626 registry: &laminar_core::state::VnodeRegistry,
627 self_id: laminar_core::state::NodeId,
628 topic_meta: &[(Arc<str>, i32)],
629 offsets: &OffsetTracker,
630 default_offset: rdkafka::Offset,
631) -> TopicPartitionList {
632 let mut tpl = TopicPartitionList::new();
633 for (topic, count) in topic_meta {
634 for partition in crate::partition_assignment::owned_partitions(*count, registry, self_id) {
635 let offset = match offsets.get(topic.as_ref(), partition) {
636 Some(o) => rdkafka::Offset::Offset(o + 1),
637 None => default_offset,
638 };
639 if let Err(e) = tpl.add_partition_offset(topic.as_ref(), partition, offset) {
640 warn!(
641 topic = %topic, partition, error = %e,
642 "failed to add vnode-owned partition to assignment"
643 );
644 }
645 }
646 }
647 tpl
648}
649
650fn startup_default_offset(mode: &StartupMode) -> rdkafka::Offset {
653 match mode {
654 StartupMode::Earliest => rdkafka::Offset::Beginning,
655 StartupMode::Latest => rdkafka::Offset::End,
656 _ => rdkafka::Offset::Stored,
660 }
661}
662
663#[async_trait]
664#[allow(clippy::too_many_lines)] impl SourceConnector for KafkaSource {
666 fn set_vnode_assignment(
667 &mut self,
668 registry: Arc<laminar_core::state::VnodeRegistry>,
669 self_id: laminar_core::state::NodeId,
670 ) {
671 info!(
672 self_id = self_id.0,
673 vnode_count = registry.vnode_count(),
674 "Kafka source: engine-controlled partition→vnode assignment enabled"
675 );
676 self.vnode_assignment = Some((registry, self_id));
677 }
678
679 async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
680 self.state = ConnectorState::Initializing;
681
682 let kafka_config = if config.properties().is_empty() {
684 self.config.clone()
685 } else {
686 let parsed = KafkaSourceConfig::from_config(config)?;
687 self.config = parsed.clone();
688 parsed
689 };
690
691 if let Some(sr_client) = Self::build_sr_client(&kafka_config)? {
693 let sr = Arc::new(sr_client);
694 self.schema_registry = Some(Arc::clone(&sr));
695 self.deserializer = if kafka_config.format == Format::Avro {
696 Box::new(AvroDeserializer::with_schema_registry(sr))
697 } else {
698 select_deserializer(kafka_config.format)
699 };
700 } else if let Some(ref sr) = self.schema_registry {
701 self.deserializer = if kafka_config.format == Format::Avro {
703 Box::new(AvroDeserializer::with_schema_registry(Arc::clone(sr)))
704 } else {
705 select_deserializer(kafka_config.format)
706 };
707 } else {
708 self.deserializer = select_deserializer(kafka_config.format);
709 }
710
711 self.last_avro_schema = None;
713
714 if let Some(schema) = config.arrow_schema() {
716 info!(
717 fields = schema.fields().len(),
718 "using SQL-defined schema for deserialization"
719 );
720 self.schema = schema;
721 }
722
723 info!(
724 brokers = %kafka_config.bootstrap_servers,
725 subscription = ?kafka_config.subscription,
726 group_id = %kafka_config.group_id,
727 format = %kafka_config.format,
728 schema_fields = self.schema.fields().len(),
729 "opening Kafka source connector"
730 );
731
732 let rdkafka_config: ClientConfig = kafka_config.to_rdkafka_config();
734 let context = LaminarConsumerContext::new(
735 Arc::clone(&self.checkpoint_request),
736 Arc::clone(&self.rebalance_state),
737 Arc::clone(&self.rebalance_counter),
738 Arc::clone(&self.revoke_generation),
739 Arc::clone(&self.reader_paused),
740 Arc::clone(&self.offset_snapshot),
741 self.metrics.commits.clone(),
745 self.metrics.commit_failures_rejected.clone(),
746 );
747 let consumer: StreamConsumer<LaminarConsumerContext> =
748 rdkafka_config.create_with_context(context).map_err(|e| {
749 ConnectorError::ConnectionFailed(format!("failed to create consumer: {e}"))
750 })?;
751
752 let vnode = self
759 .vnode_assignment
760 .as_ref()
761 .map(|(r, s)| (Arc::clone(r), *s));
762 let vnode_assigned = if let Some((registry, self_id)) = vnode {
763 if let TopicSubscription::Topics(topics) = &kafka_config.subscription {
764 let mut topic_meta: Vec<(Arc<str>, i32)> = Vec::with_capacity(topics.len());
765 for topic in topics {
766 let md = consumer
767 .fetch_metadata(Some(topic), std::time::Duration::from_secs(10))
768 .map_err(|e| {
769 ConnectorError::ConnectionFailed(format!(
770 "metadata fetch for '{topic}': {e}"
771 ))
772 })?;
773 let count = md
774 .topics()
775 .iter()
776 .find(|t| t.name() == topic.as_str())
777 .map_or(0usize, |t| t.partitions().len());
778 topic_meta.push((
779 Arc::from(topic.as_str()),
780 i32::try_from(count).unwrap_or(i32::MAX),
781 ));
782 }
783 let default_offset = startup_default_offset(&kafka_config.startup_mode);
784 let tpl = build_vnode_assignment_tpl(
785 ®istry,
786 self_id,
787 &topic_meta,
788 &self.offsets,
789 default_offset,
790 );
791 let owned = tpl.count();
792 consumer.assign(&tpl).map_err(|e| {
793 ConnectorError::ConnectionFailed(format!("vnode partition assign failed: {e}"))
794 })?;
795 self.vnode_topic_meta = topic_meta;
796 info!(
797 owned_partitions = owned,
798 "Kafka source assigned vnode-owned partitions (engine-controlled)"
799 );
800 true
801 } else {
802 warn!(
803 "vnode-aware assignment is unsupported with topic patterns — \
804 falling back to consumer-group subscribe()"
805 );
806 false
807 }
808 } else {
809 false
810 };
811
812 if !vnode_assigned {
815 match &kafka_config.subscription {
816 TopicSubscription::Topics(topics) => {
817 let topic_refs: Vec<&str> = topics.iter().map(String::as_str).collect();
818 consumer.subscribe(&topic_refs).map_err(|e| {
819 ConnectorError::ConnectionFailed(format!("failed to subscribe: {e}"))
820 })?;
821 }
822 TopicSubscription::Pattern(pattern) => {
823 let regex_pattern = if pattern.starts_with('^') {
825 pattern.clone()
826 } else {
827 format!("^{pattern}")
828 };
829 consumer.subscribe(&[®ex_pattern]).map_err(|e| {
830 ConnectorError::ConnectionFailed(format!(
831 "failed to subscribe to pattern: {e}"
832 ))
833 })?;
834 }
835 }
836
837 match &kafka_config.startup_mode {
839 StartupMode::GroupOffsets | StartupMode::Earliest | StartupMode::Latest => {}
841 StartupMode::SpecificOffsets(offsets) => {
842 let mut tpl = rdkafka::TopicPartitionList::new();
843 let topics = match &kafka_config.subscription {
844 TopicSubscription::Topics(t) => t.clone(),
845 TopicSubscription::Pattern(_) => Vec::new(),
846 };
847 for topic in &topics {
848 for (&partition, &offset) in offsets {
849 if let Err(e) = tpl.add_partition_offset(
850 topic,
851 partition,
852 rdkafka::Offset::Offset(offset),
853 ) {
854 tracing::warn!(
855 %topic, partition, offset,
856 error = %e,
857 "failed to add specific offset to partition list"
858 );
859 }
860 }
861 }
862 if tpl.count() > 0 {
863 consumer.assign(&tpl).map_err(|e| {
864 ConnectorError::ConnectionFailed(format!(
865 "failed to assign specific offsets: {e}"
866 ))
867 })?;
868 info!(
869 partition_count = tpl.count(),
870 "assigned consumer to specific offsets"
871 );
872 }
873 }
874 StartupMode::Timestamp(ts_ms) => {
875 let mut tpl = rdkafka::TopicPartitionList::new();
879 let topics = match &kafka_config.subscription {
880 TopicSubscription::Topics(t) => t.clone(),
881 TopicSubscription::Pattern(_) => Vec::new(),
882 };
883 if let Ok(metadata) = consumer.fetch_metadata(
885 topics.first().map(String::as_str),
886 std::time::Duration::from_secs(10),
887 ) {
888 for topic_meta in metadata.topics() {
889 for partition_meta in topic_meta.partitions() {
890 if let Err(e) = tpl.add_partition_offset(
891 topic_meta.name(),
892 partition_meta.id(),
893 rdkafka::Offset::Offset(*ts_ms),
894 ) {
895 tracing::warn!(
896 topic = topic_meta.name(),
897 partition = partition_meta.id(),
898 error = %e,
899 "failed to add timestamp offset to partition list"
900 );
901 }
902 }
903 }
904 }
905 if tpl.count() > 0 {
906 match consumer.offsets_for_times(tpl, std::time::Duration::from_secs(10)) {
907 Ok(resolved) => {
908 consumer.assign(&resolved).map_err(|e| {
909 ConnectorError::ConnectionFailed(format!(
910 "failed to assign timestamp offsets: {e}"
911 ))
912 })?;
913 info!(
914 timestamp_ms = ts_ms,
915 partition_count = resolved.count(),
916 "assigned consumer to timestamp offsets"
917 );
918 }
919 Err(e) => {
920 warn!(
921 error = %e,
922 timestamp_ms = ts_ms,
923 "failed to resolve timestamp offsets, falling back to group offsets"
924 );
925 }
926 }
927 }
928 }
929 }
930 } self.consumer = Some(consumer);
933 self.state = ConnectorState::Running;
934
935 if let Some(ref sr) = self.schema_registry {
945 if let TopicSubscription::Topics(topics) = &kafka_config.subscription {
946 if topics.len() > 1 {
947 warn!("multiple topics with schema registry — using first topic's schema");
948 }
949 if let Some(topic) = topics.first() {
950 let subject = resolve_value_subject(
951 kafka_config.schema_registry_subject_strategy,
952 kafka_config.schema_registry_record_name.as_deref(),
953 topic,
954 );
955 match tokio::time::timeout(
956 kafka_config.schema_registry_discovery_timeout,
957 sr.get_latest_schema(&subject),
958 )
959 .await
960 {
961 Ok(Ok(cached)) => {
962 if let Some(avro_deser) = self
963 .deserializer
964 .as_any_mut()
965 .and_then(|any| any.downcast_mut::<AvroDeserializer>())
966 {
967 if let Err(e) =
968 avro_deser.register_schema(cached.id, &cached.schema_str)
969 {
970 warn!(%subject, error = %e, "SR schema register failed");
971 } else {
972 log_schema_drift(&self.schema, &cached.arrow_schema, &subject);
975 info!(%subject, schema_id = cached.id,
976 "SR schema fetched at open()");
977 self.last_avro_schema = Some(cached.arrow_schema);
978 }
979 }
980 }
981 Ok(Err(e)) => {
982 warn!(%subject, error = %e, "SR unavailable at open(), will resolve lazily");
983 }
984 Err(_elapsed) => {
985 warn!(%subject, "SR prefetch timed out at open(), will resolve lazily");
986 }
987 }
988 }
989 }
990 }
991
992 info!("Kafka source connector opened successfully");
993 Ok(())
994 }
995
996 async fn discover_schema(
997 &mut self,
998 properties: &std::collections::HashMap<String, String>,
999 ) -> Result<(), ConnectorError> {
1000 let cfg = crate::config::ConnectorConfig::with_properties("kafka", properties.clone());
1001 let kafka_config = KafkaSourceConfig::from_config(&cfg)?;
1002 if kafka_config.format != Format::Avro {
1003 return Ok(());
1004 }
1005
1006 let topic = match &kafka_config.subscription {
1007 TopicSubscription::Topics(topics) => match topics.first() {
1008 Some(t) => {
1009 if topics.len() > 1 {
1010 warn!(topics = ?topics, chosen = %t,
1011 "multi-topic source: using first topic's SR schema");
1012 }
1013 t.clone()
1014 }
1015 None => return Ok(()),
1016 },
1017 TopicSubscription::Pattern(pattern) => {
1018 return Err(ConnectorError::ConfigurationError(format!(
1019 "topic.pattern '{pattern}' cannot auto-discover a schema; \
1020 declare columns explicitly"
1021 )));
1022 }
1023 };
1024
1025 let Some(sr_client) = Self::build_sr_client(&kafka_config)? else {
1026 return Ok(());
1027 };
1028
1029 let subject = resolve_value_subject(
1030 kafka_config.schema_registry_subject_strategy,
1031 kafka_config.schema_registry_record_name.as_deref(),
1032 &topic,
1033 );
1034 let timeout = kafka_config.schema_registry_discovery_timeout;
1035
1036 match tokio::time::timeout(timeout, sr_client.get_latest_schema(&subject)).await {
1037 Ok(Ok(cached)) => {
1038 self.metrics.record_sr_discovery_success();
1039 info!(%subject, schema_id = cached.id,
1040 fields = cached.arrow_schema.fields().len(),
1041 "discovered Avro schema from Schema Registry");
1042 self.schema = cached.arrow_schema;
1043 Ok(())
1044 }
1045 Ok(Err(e)) => {
1046 self.metrics.record_sr_discovery_failure();
1047 Err(ConnectorError::ConnectionFailed(format!(
1048 "Schema Registry lookup failed for subject '{subject}': {e}"
1049 )))
1050 }
1051 Err(_) => {
1052 self.metrics.record_sr_discovery_timeout();
1053 Err(ConnectorError::Timeout(
1054 u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX),
1055 ))
1056 }
1057 }
1058 }
1059
1060 #[allow(clippy::cast_possible_truncation)] async fn poll_batch(
1062 &mut self,
1063 max_records: usize,
1064 ) -> Result<Option<SourceBatch>, ConnectorError> {
1065 if self.state != ConnectorState::Running {
1066 return Err(ConnectorError::InvalidState {
1067 expected: "Running".into(),
1068 actual: self.state.to_string(),
1069 });
1070 }
1071
1072 self.ensure_reader_started();
1074
1075 let rx = self
1076 .msg_rx
1077 .as_mut()
1078 .ok_or_else(|| ConnectorError::InvalidState {
1079 expected: "reader initialized".into(),
1080 actual: "reader is None".into(),
1081 })?;
1082
1083 let limit = max_records.min(self.config.max_poll_records);
1084
1085 self.poll_payload_buf.clear();
1087 self.poll_payload_offsets.clear();
1088 self.poll_meta_partitions.clear();
1089 self.poll_meta_offsets.clear();
1090 self.poll_meta_timestamps.clear();
1091 self.poll_meta_headers.clear();
1092
1093 let mut total_bytes: u64 = 0;
1094 let mut last_topic = String::new();
1095 let mut last_partition_id: i32 = 0;
1096 let mut last_offset: i64 = -1;
1097 let include_metadata = self.config.include_metadata;
1098 let include_headers = self.config.include_headers;
1099
1100 while self.poll_payload_offsets.len() < limit {
1101 match rx.try_recv() {
1102 Ok(kp) => {
1103 self.channel_len.fetch_sub(1, Ordering::Release);
1104 total_bytes += kp.data.len() as u64;
1105 let start = self.poll_payload_buf.len();
1106 self.poll_payload_buf.extend_from_slice(&kp.data);
1107 self.poll_payload_offsets.push((start, kp.data.len()));
1108
1109 self.offsets.update_arc(&kp.topic, kp.partition, kp.offset);
1110
1111 if include_metadata {
1112 self.poll_meta_partitions.push(kp.partition);
1113 self.poll_meta_offsets.push(kp.offset);
1114 self.poll_meta_timestamps.push(kp.timestamp_ms);
1115 }
1116 if include_headers {
1117 self.poll_meta_headers.push(kp.headers_json);
1118 }
1119
1120 if let Some(ref mut tracker) = self.watermark_tracker {
1122 if let Some(ts) = kp.timestamp_ms {
1123 tracker.update_partition(kp.partition, ts);
1124 }
1125 }
1126
1127 if last_topic.as_str() != &*kp.topic || last_partition_id != kp.partition {
1128 last_topic = kp.topic.to_string();
1129 last_partition_id = kp.partition;
1130 }
1131 last_offset = kp.offset;
1132 }
1133 Err(crossfire::TryRecvError::Empty) => break,
1134 Err(crossfire::TryRecvError::Disconnected) => {
1135 self.state = ConnectorState::Failed;
1136 return Err(ConnectorError::Internal(
1137 "Kafka reader task exited unexpectedly".into(),
1138 ));
1139 }
1140 }
1141 }
1142
1143 if let Some(ref mut tracker) = self.watermark_tracker {
1145 tracker.check_idle_partitions();
1146 }
1147
1148 let rebalance_events = self.rebalance_counter.swap(0, Ordering::Relaxed);
1150 for _ in 0..rebalance_events {
1151 self.metrics.record_rebalance();
1152 }
1153
1154 let current_revoke_gen = self.revoke_generation.load(Ordering::Acquire);
1157 let had_revoke = current_revoke_gen != self.last_seen_revoke_gen;
1158 if had_revoke {
1159 self.last_seen_revoke_gen = current_revoke_gen;
1160 let assigned = lock_or_recover(&self.rebalance_state)
1162 .assigned_partitions()
1163 .clone();
1164 let before = self.offsets.partition_count();
1165 self.offsets.retain_assigned(&assigned);
1166 let after = self.offsets.partition_count();
1167 if before != after {
1168 debug!(
1169 before,
1170 after, "purged revoked partition offsets after rebalance"
1171 );
1172 }
1173 }
1174
1175 if had_revoke || !self.poll_payload_offsets.is_empty() {
1179 lock_or_recover(&self.offset_snapshot).clone_from(&self.offsets);
1180 }
1181
1182 if let Some(ref hwm_rx) = self.high_watermarks_rx {
1185 let watermarks = hwm_rx.borrow();
1186 let mut total_lag: u64 = 0;
1187 for (topic, partition, high_watermark) in watermarks.iter() {
1188 if let Some(current_offset) = self.offsets.get(topic, *partition) {
1189 let lag = high_watermark.saturating_sub(current_offset + 1);
1190 #[allow(clippy::cast_sign_loss)]
1191 if lag > 0 {
1192 total_lag += lag as u64;
1193 }
1194 }
1195 }
1196 self.metrics.set_lag(total_lag);
1197 }
1198
1199 if self.poll_payload_offsets.is_empty() {
1200 return Ok(None);
1201 }
1202
1203 let last_partition = if last_offset >= 0 {
1207 Some(PartitionInfo::new(
1208 format!("{last_topic}-{last_partition_id}"),
1209 last_offset.to_string(),
1210 ))
1211 } else {
1212 None
1213 };
1214
1215 if let Some(avro_deser) = self
1218 .deserializer
1219 .as_any_mut()
1220 .and_then(|any| any.downcast_mut::<AvroDeserializer>())
1221 {
1222 let mut new_schema_ids = Vec::new();
1223 for &(start, len) in &self.poll_payload_offsets {
1224 if let Some(schema_id) = AvroDeserializer::extract_confluent_id(
1225 &self.poll_payload_buf[start..start + len],
1226 ) {
1227 let is_new = avro_deser
1228 .ensure_schema_registered(schema_id)
1229 .await
1230 .map_err(ConnectorError::Serde)?;
1231 if is_new {
1232 new_schema_ids.push(schema_id);
1233 }
1234 }
1235 }
1236
1237 if !new_schema_ids.is_empty()
1239 && self.config.schema_evolution_strategy != SchemaEvolutionStrategy::Ignore
1240 {
1241 if let Some(ref sr) = self.schema_registry {
1242 let compat = self
1243 .config
1244 .schema_compatibility
1245 .map_or(CompatibilityMode::Backward, CompatibilityMode::from);
1246 let evolver = SchemaEvolution::new(compat);
1247
1248 for id in new_schema_ids {
1249 let cached = sr.resolve_confluent_id(id).await.map_err(|e| {
1250 ConnectorError::SchemaMismatch(format!(
1251 "failed to resolve schema {id}: {e}"
1252 ))
1253 })?;
1254
1255 let Some(ref prev) = self.last_avro_schema else {
1256 info!(schema_id = id, "initial Avro schema registered");
1258 self.last_avro_schema = Some(Arc::clone(&cached.arrow_schema));
1259 continue;
1260 };
1261
1262 let changes = evolver.diff_schemas(prev, &cached.arrow_schema);
1263 self.last_avro_schema = Some(Arc::clone(&cached.arrow_schema));
1264
1265 if changes.is_empty() {
1266 info!(
1267 schema_id = id,
1268 "new Avro schema ID registered, no field changes"
1269 );
1270 continue;
1271 }
1272 let verdict = evolver.evaluate_evolution(&changes);
1273 match &verdict {
1274 EvolutionVerdict::Compatible => {
1275 info!(schema_id = id, ?changes, "schema evolved (compatible)");
1276 }
1277 EvolutionVerdict::RequiresMigration => {
1278 warn!(
1279 schema_id = id,
1280 ?changes,
1281 "schema evolved (requires migration)"
1282 );
1283 }
1284 EvolutionVerdict::Incompatible(reason) => {
1285 if self.config.schema_evolution_strategy
1286 == SchemaEvolutionStrategy::Reject
1287 {
1288 return Err(ConnectorError::SchemaMismatch(format!(
1289 "incompatible schema evolution for ID {id}: {reason}"
1290 )));
1291 }
1292 warn!(
1293 schema_id = id, %reason, ?changes,
1294 "incompatible schema evolution detected"
1295 );
1296 }
1297 }
1298 }
1299 }
1300 }
1301 }
1302
1303 let refs: Vec<&[u8]> = self
1304 .poll_payload_offsets
1305 .iter()
1306 .map(|&(start, len)| &self.poll_payload_buf[start..start + len])
1307 .collect();
1308
1309 let (batch, good_indices) = match self.deserializer.deserialize_batch(&refs, &self.schema) {
1312 Ok(batch) => (batch, None),
1313 Err(batch_err) => {
1314 let mut good_batches = Vec::with_capacity(refs.len());
1319 let mut good_idx = Vec::with_capacity(refs.len());
1320 let mut error_count = 0u64;
1321 for (i, r) in refs.iter().enumerate() {
1322 match self
1323 .deserializer
1324 .deserialize_batch(std::slice::from_ref(r), &self.schema)
1325 {
1326 Ok(batch) => {
1327 good_batches.push(batch);
1328 good_idx.push(i);
1329 }
1330 Err(e) => {
1331 error_count += 1;
1332 self.metrics.record_error();
1333 warn!(error = %e, "skipping poison pill record");
1334 }
1335 }
1336 }
1337 if good_batches.is_empty() {
1338 return Err(ConnectorError::Serde(batch_err));
1339 }
1340 #[allow(clippy::cast_precision_loss)]
1342 if error_count > 0 {
1343 let error_rate = error_count as f64 / refs.len() as f64;
1344 if error_rate > self.config.max_deser_error_rate {
1345 return Err(ConnectorError::Serde(batch_err));
1346 }
1347 warn!(
1348 skipped = error_count,
1349 total = refs.len(),
1350 error_rate = %format_args!("{error_rate:.1}"),
1351 "deserialized batch with poison pill isolation"
1352 );
1353 }
1354 let concat_schema = good_batches[0].schema();
1355 let batch = arrow_select::concat::concat_batches(&concat_schema, &good_batches)
1356 .map_err(|e| {
1357 ConnectorError::Internal(format!("failed to concat batches: {e}"))
1358 })?;
1359 (batch, Some(good_idx))
1360 }
1361 };
1362
1363 if let Some(ref idx) = good_indices {
1366 if include_metadata {
1367 self.poll_meta_partitions =
1368 idx.iter().map(|&i| self.poll_meta_partitions[i]).collect();
1369 self.poll_meta_offsets = idx.iter().map(|&i| self.poll_meta_offsets[i]).collect();
1370 self.poll_meta_timestamps =
1371 idx.iter().map(|&i| self.poll_meta_timestamps[i]).collect();
1372 }
1373 if include_headers {
1374 self.poll_meta_headers = idx
1375 .iter()
1376 .map(|&i| std::mem::take(&mut self.poll_meta_headers[i]))
1377 .collect();
1378 }
1379 }
1380
1381 let needs_meta = include_metadata && !self.poll_meta_partitions.is_empty();
1383 let needs_headers = include_headers && !self.poll_meta_headers.is_empty();
1384 let batch = if needs_meta || needs_headers {
1385 use arrow_schema::{DataType, Field};
1386
1387 let mut fields = batch.schema().fields().to_vec();
1388 let mut columns: Vec<Arc<dyn arrow_array::Array>> = batch.columns().to_vec();
1389
1390 if needs_meta {
1391 use arrow_array::{Int32Array, Int64Array, TimestampMillisecondArray};
1392 use arrow_schema::TimeUnit;
1393 fields.push(Arc::new(Field::new("_partition", DataType::Int32, false)));
1394 columns.push(Arc::new(Int32Array::from(std::mem::take(
1395 &mut self.poll_meta_partitions,
1396 ))));
1397 fields.push(Arc::new(Field::new("_offset", DataType::Int64, false)));
1398 columns.push(Arc::new(Int64Array::from(std::mem::take(
1399 &mut self.poll_meta_offsets,
1400 ))));
1401 fields.push(Arc::new(Field::new(
1402 "_timestamp",
1403 DataType::Timestamp(TimeUnit::Millisecond, None),
1404 true,
1405 )));
1406 columns.push(Arc::new(TimestampMillisecondArray::from(std::mem::take(
1407 &mut self.poll_meta_timestamps,
1408 ))));
1409 }
1410 if needs_headers {
1411 fields.push(Arc::new(Field::new("_headers", DataType::Utf8, true)));
1412 columns.push(Arc::new(arrow_array::StringArray::from(std::mem::take(
1413 &mut self.poll_meta_headers,
1414 ))));
1415 }
1416
1417 let meta_schema = Arc::new(arrow_schema::Schema::new(fields));
1418 arrow_array::RecordBatch::try_new(meta_schema, columns).map_err(|e| {
1419 ConnectorError::Internal(format!("failed to append metadata columns: {e}"))
1420 })?
1421 } else {
1422 batch
1423 };
1424
1425 let num_rows = batch.num_rows();
1426 self.metrics.record_poll(num_rows as u64, total_bytes);
1427
1428 let source_batch = if let Some(partition) = last_partition {
1429 SourceBatch::with_partition(batch, partition)
1430 } else {
1431 SourceBatch::new(batch)
1432 };
1433
1434 debug!(
1435 records = num_rows,
1436 bytes = total_bytes,
1437 "polled batch from Kafka"
1438 );
1439
1440 Ok(Some(source_batch))
1441 }
1442
1443 fn schema(&self) -> SchemaRef {
1444 self.schema.clone()
1445 }
1446
1447 fn checkpoint(&self) -> SourceCheckpoint {
1448 let assigned = lock_or_recover(&self.rebalance_state)
1449 .assigned_partitions()
1450 .clone();
1451 self.offsets.to_checkpoint_filtered(&assigned)
1452 }
1453
1454 async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
1461 info!(
1462 epoch = checkpoint.epoch(),
1463 partition_count = checkpoint.offsets().len(),
1464 "staging checkpointed offsets for seek-on-assign"
1465 );
1466
1467 self.offsets = OffsetTracker::from_checkpoint(checkpoint);
1468 match self.offset_snapshot.lock() {
1469 Ok(mut snapshot) => snapshot.clone_from(&self.offsets),
1470 Err(poisoned) => poisoned.into_inner().clone_from(&self.offsets),
1471 }
1472
1473 Ok(())
1474 }
1475
1476 fn data_ready_notify(&self) -> Option<Arc<Notify>> {
1477 Some(Arc::clone(&self.data_ready))
1478 }
1479
1480 fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
1481 Some(Arc::clone(&self.checkpoint_request))
1482 }
1483
1484 async fn notify_epoch_committed(
1485 &mut self,
1486 _epoch: u64,
1487 checkpoint: &SourceCheckpoint,
1488 ) -> Result<(), ConnectorError> {
1489 if !self.config.broker_commit_on_checkpoint || checkpoint.is_empty() {
1490 return Ok(());
1491 }
1492 let Some(ref tx) = self.commit_tx else {
1493 return Ok(());
1494 };
1495 let tpl = OffsetTracker::from_checkpoint(checkpoint).to_topic_partition_list();
1496 if tpl.count() == 0 {
1497 return Ok(());
1498 }
1499 if tx.send(Some(tpl)).is_err() {
1500 self.metrics.commit_failures_enqueue_dropped.inc();
1501 }
1502 Ok(())
1503 }
1504
1505 async fn close(&mut self) -> Result<(), ConnectorError> {
1506 info!("closing Kafka source connector");
1507
1508 self.commit_tx = None;
1514
1515 if let Some(tx) = self.reader_shutdown.take() {
1517 let _ = tx.send(true);
1518 }
1519 let timeout = std::time::Duration::from_secs(5);
1520 if let Some(handle) = self.reader_handle.take() {
1521 let _ = tokio::time::timeout(timeout, handle).await;
1522 }
1523 if let Some(handle) = self.commit_handle.take() {
1524 let _ = tokio::time::timeout(timeout, handle).await;
1525 }
1526 if let Some(handle) = self.hwm_handle.take() {
1527 let _ = tokio::time::timeout(timeout, handle).await;
1528 }
1529 self.msg_rx = None;
1530
1531 if let Some(ref consumer) = self.consumer {
1535 consumer.unsubscribe();
1536 }
1537
1538 self.consumer = None;
1539 self.state = ConnectorState::Closed;
1540 info!("Kafka source connector closed");
1541 Ok(())
1542 }
1543}
1544
1545impl std::fmt::Debug for KafkaSource {
1546 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1547 f.debug_struct("KafkaSource")
1548 .field("state", &self.state)
1549 .field("subscription", &self.config.subscription)
1550 .field("group_id", &self.config.group_id)
1551 .field("format", &self.config.format)
1552 .field("partitions", &self.offsets.partition_count())
1553 .finish_non_exhaustive()
1554 }
1555}
1556
1557fn log_schema_drift(declared: &arrow_schema::Schema, live: &arrow_schema::Schema, subject: &str) {
1560 if declared.fields().is_empty() || declared.fields() == live.fields() {
1561 return;
1562 }
1563 let decl: BTreeSet<&str> = declared
1564 .fields()
1565 .iter()
1566 .map(|f| f.name().as_str())
1567 .collect();
1568 let lv: BTreeSet<&str> = live.fields().iter().map(|f| f.name().as_str()).collect();
1569 warn!(
1570 %subject,
1571 missing_in_sr = ?decl.difference(&lv).collect::<Vec<_>>(),
1572 added_in_sr = ?lv.difference(&decl).collect::<Vec<_>>(),
1573 "schema drift: re-apply CREATE SOURCE DDL to pick up the current SR schema"
1574 );
1575}
1576
1577fn select_deserializer(format: Format) -> Box<dyn RecordDeserializer> {
1578 match format {
1579 Format::Avro => Box::new(AvroDeserializer::new()),
1580 other => serde::create_deserializer(other).unwrap_or_else(|_| {
1581 warn!(format = %other, "unsupported format, falling back to JSON");
1582 Box::new(serde::json::JsonDeserializer::new())
1583 }),
1584 }
1585}
1586
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    /// Two-column schema used by tests that do not exercise schema discovery.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ]))
    }

    /// Minimal config subscribed to the single topic `events`.
    fn test_config() -> KafkaSourceConfig {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "test-group".into();
        cfg.subscription = TopicSubscription::Topics(vec!["events".into()]);
        cfg
    }

    #[test]
    fn test_new_defaults() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        assert_eq!(source.state(), ConnectorState::Created);
        assert!(source.consumer.is_none());
        assert_eq!(source.offsets().partition_count(), 0);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let source = KafkaSource::new(schema.clone(), test_config(), None);
        assert_eq!(source.schema(), schema);
    }

    #[test]
    fn test_checkpoint_empty() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        let cp = source.checkpoint();
        assert!(cp.is_empty());
    }

    #[test]
    fn test_checkpoint_with_offsets() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);

        {
            // Simulate a rebalance that assigns both partitions to this consumer.
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0), ("events".into(), 1)]);
        }

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), Some("200"));
    }

    #[test]
    fn test_deserializer_selection_json() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        assert_eq!(source.deserializer.format(), Format::Json);
    }

    #[test]
    fn test_deserializer_selection_csv() {
        let mut cfg = test_config();
        cfg.format = Format::Csv;
        let source = KafkaSource::new(test_schema(), cfg, None);
        assert_eq!(source.deserializer.format(), Format::Csv);
    }

    #[test]
    fn test_with_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());

        let source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);
        assert!(source.schema_registry.is_some());
        assert_eq!(source.deserializer.format(), Format::Avro);
    }

    #[tokio::test]
    async fn test_open_preserves_injected_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());
        let mut source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);

        let empty_config = crate::config::ConnectorConfig::new("kafka");
        // The outcome of `open` is irrelevant here; only the injected registry and the
        // selected deserializer must survive it.
        let _ = source.open(&empty_config).await;
        assert!(source.schema_registry.is_some());
        assert_eq!(source.deserializer.format(), Format::Avro);
    }

    #[test]
    fn test_debug_output() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        let debug = format!("{source:?}");
        assert!(debug.contains("KafkaSource"));
        assert!(debug.contains("events"));
    }

    #[test]
    fn test_checkpoint_filters_revoked_partitions() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);
        source.offsets.update("events", 2, 300);

        {
            // Partition 1 is tracked but not assigned, as if it had been revoked.
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0), ("events".into(), 2)]);
        }

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), None);
        assert_eq!(cp.get_offset("events-2"), Some("300"));
    }

    #[test]
    fn test_checkpoint_empty_before_first_rebalance() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);

        let cp = source.checkpoint();
        // No assignment has happened yet, so no tracked partition counts as owned.
        assert!(cp.is_empty());
    }

    #[tokio::test]
    async fn test_restore_replaces_offsets_and_snapshot() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 5);
        source.offsets.update("events", 1, 6);

        let mut offsets = std::collections::HashMap::new();
        offsets.insert("events-2".to_string(), "42".to_string());
        let cp = SourceCheckpoint::with_offsets(3, offsets);
        source.restore(&cp).await.unwrap();

        // Restore replaces (not merges) the tracker and mirrors it into the snapshot.
        assert_eq!(source.offsets().partition_count(), 1);
        assert_eq!(source.offset_snapshot.lock().unwrap().partition_count(), 1);
    }

    /// Schema with no fields, standing in for "nothing declared yet".
    fn empty_schema() -> SchemaRef {
        Arc::new(Schema::empty())
    }

    /// Builds a property map from string pairs.
    fn props(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
            .collect()
    }

    #[tokio::test]
    async fn discover_schema_skips_non_avro_format() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "json"),
                ("schema.registry.url", "http://localhost:8081"),
            ]))
            .await
            .expect("non-avro format is a legitimate skip");
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_errors_on_avro_without_sr_url() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let err = source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "avro"),
            ]))
            .await
            .expect_err("avro without schema.registry.url must surface a configuration error");
        let msg = err.to_string();
        assert!(
            msg.contains("schema.registry.url"),
            "error must name the missing key, got: {msg}"
        );
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_errors_on_topic_pattern() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let err = source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic.pattern", "events-.*"),
                ("format", "avro"),
                ("schema.registry.url", "http://localhost:8081"),
            ]))
            .await
            .expect_err("topic.pattern + avro must surface a configuration error");
        let msg = err.to_string();
        assert!(
            msg.contains("topic.pattern"),
            "error must name the offending key, got: {msg}"
        );
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_errors_on_sr_unreachable() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let start = std::time::Instant::now();
        // 192.0.2.0/24 is reserved documentation address space (TEST-NET-1), so this
        // address should not answer.
        let result = tokio::time::timeout(
            std::time::Duration::from_secs(20),
            source.discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "avro"),
                ("schema.registry.url", "http://192.0.2.1:65535"),
            ])),
        )
        .await
        .expect("discover_schema must honor its own 10s timeout");
        assert!(
            start.elapsed() < std::time::Duration::from_secs(15),
            "discover_schema should have returned well before the outer 20s budget"
        );
        let err = result.expect_err("unreachable SR must surface as Err");
        assert!(
            matches!(
                err,
                ConnectorError::ConnectionFailed(_) | ConnectorError::Timeout(_)
            ),
            "expected ConnectionFailed or Timeout, got: {err:?}"
        );
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_propagates_broker_commit_interval_rejection() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let err = source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "avro"),
                ("schema.registry.url", "http://localhost:8081"),
                ("broker.commit.interval.ms", "5000"),
            ]))
            .await
            .expect_err("deprecated config key must produce a propagated error");
        let msg = err.to_string();
        assert!(
            msg.contains("broker.commit.interval.ms"),
            "error must name the offending key, got: {msg}"
        );
    }

    /// End-to-end discovery against a mocked Schema Registry. The subject is derived from
    /// the topic as `<topic>-value`, and the Avro `map` field must survive as an Arrow `Map`.
    #[tokio::test]
    async fn discover_schema_happy_path_with_wiremock_sr() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let avro_schema = serde_json::json!({
            "type": "record",
            "name": "event",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "data", "type": {"type": "map", "values": "string"}}
            ]
        })
        .to_string();

        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/ion_tw-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 42,
                "version": 1,
                "subject": "ion_tw-value",
                "schema": avro_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "ion_tw"),
                ("format", "avro"),
                ("schema.registry.url", &sr.uri()),
            ]))
            .await
            .expect("happy-path discovery must succeed");

        let schema = source.schema();
        assert_eq!(schema.fields().len(), 2, "expected [id, data]");
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "data");
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::Map(_, _)
            ),
            "'data' field must survive as a Map type (got {:?})",
            schema.field(1).data_type()
        );
    }

    /// With the `record-name` subject strategy, the subject comes from
    /// `schema.registry.record.name` rather than from the topic.
    #[tokio::test]
    async fn discover_schema_happy_path_record_name_strategy() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let avro_schema = serde_json::json!({
            "type": "record",
            "name": "com.acme.Order",
            "fields": [
                {"name": "order_id", "type": "string"},
                {"name": "amount", "type": "double"}
            ]
        })
        .to_string();

        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/com.acme.Order-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 7,
                "version": 1,
                "subject": "com.acme.Order-value",
                "schema": avro_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "orders"),
                ("format", "avro"),
                ("schema.registry.url", &sr.uri()),
                ("schema.registry.subject.name.strategy", "record-name"),
                ("schema.registry.record.name", "com.acme.Order"),
            ]))
            .await
            .expect("happy-path discovery must succeed");

        let schema = source.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "order_id");
        assert_eq!(schema.field(1).name(), "amount");
    }

    /// When the registry schema has gained a field since the DDL, `open` keeps the catalog
    /// schema pinned but records the evolved registry shape in `last_avro_schema`.
    #[tokio::test]
    async fn open_logs_drift_when_sr_evolved_since_ddl() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let evolved_schema = serde_json::json!({
            "type": "record",
            "name": "event",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "data", "type": {"type": "map", "values": "string"}},
                {"name": "version", "type": "int"}
            ]
        })
        .to_string();

        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/ion_tw-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 99,
                "version": 2,
                "subject": "ion_tw-value",
                "schema": evolved_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        // Catalog schema as of the original DDL: only `id` and `data`, no `version`.
        let stale_catalog = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new(
                "data",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(arrow_schema::Fields::from(vec![
                            Field::new("key", DataType::Utf8, false),
                            Field::new("value", DataType::Utf8, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                true,
            ),
        ]));

        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["ion_tw".into()]);
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some(sr.uri());
        let sr_client = SchemaRegistryClient::new(sr.uri(), None);
        let mut source = KafkaSource::with_schema_registry(stale_catalog, cfg, sr_client);

        let empty_cfg = crate::config::ConnectorConfig::new("kafka");
        // The result of `open` is ignored: only its schema side effects are asserted.
        let _ = source.open(&empty_cfg).await;
        assert_eq!(
            source.schema().fields().len(),
            2,
            "catalog schema must stay pinned even after SR drift"
        );
        assert_eq!(
            source.last_avro_schema.as_ref().map(|s| s.fields().len()),
            Some(3),
            "last_avro_schema should reflect the evolved SR shape"
        );
    }

    /// Broker commits carry the next offset to consume, i.e. checkpointed offset + 1.
    #[test]
    fn test_checkpoint_to_tpl_uses_next_offset() {
        let mut offsets = std::collections::HashMap::new();
        offsets.insert("events-0".to_string(), "100".to_string());
        offsets.insert("events-1".to_string(), "200".to_string());
        let cp = SourceCheckpoint::with_offsets(1, offsets);
        let tpl = OffsetTracker::from_checkpoint(&cp).to_topic_partition_list();
        assert_eq!(tpl.count(), 2);
        for elem in tpl.elements() {
            let expected = match elem.partition() {
                0 => rdkafka::Offset::Offset(101),
                1 => rdkafka::Offset::Offset(201),
                p => panic!("unexpected partition {p}"),
            };
            assert_eq!(elem.offset(), expected);
        }
    }

    #[tokio::test]
    async fn test_notify_epoch_committed_empty_cp_is_noop() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        // An empty checkpoint must short-circuit before any commit is enqueued.
        source
            .notify_epoch_committed(1, &SourceCheckpoint::new(1))
            .await
            .unwrap();
    }
}