laminar_connectors/kafka/source.rs

1//! Kafka source connector implementation.
2//!
3//! [`KafkaSource`] implements the [`SourceConnector`] trait, consuming
4//! from Kafka topics via rdkafka's `StreamConsumer`, deserializing
5//! messages using pluggable formats, and producing Arrow `RecordBatch`
6//! data through the connector SDK.
7
8use arrow_schema::SchemaRef;
9use async_trait::async_trait;
10use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
11use rdkafka::message::Message;
12use rdkafka::ClientConfig;
13use rdkafka::TopicPartitionList;
14use std::collections::BTreeSet;
15use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17use tokio::sync::Notify;
18use tracing::{debug, info, warn};
19
20use super::rebalance::LaminarConsumerContext;
21
22/// Locks a mutex, recovering from poison if a prior holder panicked.
23///
24/// Used for state shared with rdkafka's rebalance callback thread.
25/// Poison indicates a panic in the callback — the data may be stale
26/// but is structurally sound, so we recover rather than propagate.
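///
/// A minimal usage sketch (hypothetical caller; mirrors how `poll_batch`
/// reads the shared rebalance state):
///
/// ```ignore
/// let assigned = lock_or_recover(&rebalance_state)
///     .assigned_partitions()
///     .clone();
/// ```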
27fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
28    mutex.lock().unwrap_or_else(|poisoned| {
29        tracing::warn!("mutex poisoned, recovering");
30        poisoned.into_inner()
31    })
32}
33
34use crate::checkpoint::SourceCheckpoint;
35use crate::config::{ConnectorConfig, ConnectorState};
36use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
37use crate::error::ConnectorError;
38use crate::health::HealthStatus;
39use crate::metrics::ConnectorMetrics;
40use crate::serde::{self, Format, RecordDeserializer};
41
42use super::avro::AvroDeserializer;
43use super::config::{
44    resolve_value_subject, KafkaSourceConfig, SchemaEvolutionStrategy, StartupMode,
45    TopicSubscription,
46};
47use super::metrics::KafkaSourceMetrics;
48use super::offsets::OffsetTracker;
49use super::rebalance::RebalanceState;
50use super::schema_registry::SchemaRegistryClient;
51use super::watermarks::KafkaWatermarkTracker;
52
53use crate::schema::evolution::SchemaEvolution;
54use crate::schema::traits::{CompatibilityMode, EvolutionVerdict};
55
56/// Payload sent from the background Kafka reader task to [`KafkaSource::poll_batch`].
57struct KafkaPayload {
58    data: Vec<u8>,
59    topic: Arc<str>,
60    partition: i32,
61    offset: i64,
62    timestamp_ms: Option<i64>,
63    /// Kafka message headers serialized as a JSON array of [key, value] pairs,
64    /// e.g. `[["trace-id","abc"]]`. Only populated when `include_headers` is enabled.
65    headers_json: Option<String>,
66}
67
68/// Single-consumer async receiver for the reader → `poll_batch` queue.
69type KafkaPayloadRx = crossfire::AsyncRx<crossfire::mpsc::Array<KafkaPayload>>;
70
71/// Kafka source connector that consumes messages and produces Arrow batches.
72///
73/// Operates in Ring 1 (background) and pushes deserialized `RecordBatch`
74/// data to Ring 0 via the streaming `Source<T>` API.
75///
76/// # Lifecycle
77///
78/// 1. Create with [`KafkaSource::new`] or [`KafkaSource::with_schema_registry`]
79/// 2. Call `open()` to connect to Kafka and subscribe to topics
80/// 3. Call `poll_batch()` in a loop to consume messages
81/// 4. Call `checkpoint()` / `restore()` for fault tolerance
82/// 5. Call `close()` for clean shutdown
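///
/// # Example
///
/// A minimal driver-loop sketch. `schema`, `kafka_config`, `connector_config`,
/// and `process` are assumed to exist elsewhere; only the call sequence is
/// taken from this module (checkpoint/restore and `close()` are omitted).
///
/// ```ignore
/// let mut source = KafkaSource::new(schema, kafka_config, None);
/// source.open(&connector_config).await?;
/// loop {
///     // Poll up to 1024 records; `None` means no data was ready this cycle.
///     if let Some(batch) = source.poll_batch(1024).await? {
///         process(batch);
///     }
/// }
/// ```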
83pub struct KafkaSource {
84    consumer: Option<StreamConsumer<LaminarConsumerContext>>,
85    config: KafkaSourceConfig,
86    deserializer: Box<dyn RecordDeserializer>,
87    offsets: OffsetTracker,
88    state: ConnectorState,
89    metrics: KafkaSourceMetrics,
90    schema: SchemaRef,
91    channel_len: Arc<AtomicUsize>,
92    rebalance_state: Arc<Mutex<RebalanceState>>,
93    /// Shared rebalance counter bridging `LaminarConsumerContext` → `KafkaSourceMetrics`.
94    rebalance_counter: Arc<AtomicU64>,
95    /// Monotonic counter bumped on each partition revoke event.
96    ///
97    /// Shared with `LaminarConsumerContext` for lock-free revoke detection
98    /// from `poll_batch()`. The source compares `last_seen_revoke_gen`
99    /// against this value each poll cycle and only locks `rebalance_state`
100    /// when a change is detected to purge revoked partition offsets.
101    revoke_generation: Arc<AtomicU64>,
102    /// Last observed value of `revoke_generation`, cached per poll cycle.
103    last_seen_revoke_gen: u64,
104    schema_registry: Option<Arc<SchemaRegistryClient>>,
105    data_ready: Arc<Notify>,
106    checkpoint_request: Arc<AtomicBool>,
107    msg_rx: Option<KafkaPayloadRx>,
108    reader_handle: Option<tokio::task::JoinHandle<()>>,
109    commit_handle: Option<tokio::task::JoinHandle<()>>,
110    hwm_handle: Option<tokio::task::JoinHandle<()>>,
111    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
112    offset_commit_tx: Option<tokio::sync::watch::Sender<TopicPartitionList>>,
113    watermark_tracker: Option<KafkaWatermarkTracker>,
114    /// Receiver for high watermark data from the background reader task.
115    /// Each entry is `(topic, partition, high_watermark)` for lag computation.
116    #[allow(clippy::type_complexity)]
117    high_watermarks_rx: Option<tokio::sync::watch::Receiver<Vec<(Arc<str>, i32, i64)>>>,
118    /// Shared flag: `true` when the reader task has paused Kafka partitions
119    /// due to downstream backpressure. Used to re-pause newly assigned
120    /// partitions during rebalance.
121    reader_paused: Arc<AtomicBool>,
122    /// Set by `commit_callback` on async commit failure; reader task
123    /// escalates to `CommitMode::Sync` on the next commit timer tick.
124    commit_retry_needed: Arc<AtomicBool>,
125    /// Offset snapshot shared with the rebalance callback for seek-on-assign.
126    /// Updated once per `poll_batch()` cycle (not per message).
127    offset_snapshot: Arc<Mutex<OffsetTracker>>,
128
129    /// Last Avro writer schema from the schema registry, used to diff
130    /// successive versions for evolution detection.
131    last_avro_schema: Option<SchemaRef>,
132
133    // Reusable poll_batch buffers — cleared each cycle, capacity retained.
134    poll_payload_buf: Vec<u8>,
135    poll_payload_offsets: Vec<(usize, usize)>,
136    poll_meta_partitions: Vec<i32>,
137    poll_meta_offsets: Vec<i64>,
138    poll_meta_timestamps: Vec<Option<i64>>,
139    poll_meta_headers: Vec<Option<String>>,
140}
141
142impl KafkaSource {
143    /// Creates a new Kafka source connector with explicit schema.
144    #[must_use]
145    pub fn new(
146        schema: SchemaRef,
147        config: KafkaSourceConfig,
148        registry: Option<&prometheus::Registry>,
149    ) -> Self {
150        Self::build_base(schema, config, select_deserializer, None, registry)
151    }
152
153    /// Creates a new Kafka source connector with Schema Registry.
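    ///
    /// A construction sketch; `sr_url`, `sr_auth`, `schema`, and `kafka_config`
    /// are placeholders with the types used elsewhere in this module.
    ///
    /// ```ignore
    /// let sr = SchemaRegistryClient::new(sr_url, sr_auth);
    /// let source = KafkaSource::with_schema_registry(schema, kafka_config, sr);
    /// ```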
154    #[must_use]
155    pub fn with_schema_registry(
156        schema: SchemaRef,
157        config: KafkaSourceConfig,
158        sr_client: SchemaRegistryClient,
159    ) -> Self {
160        let sr = Arc::new(sr_client);
161        let sr_clone = Arc::clone(&sr);
162        let deser_factory = move |format: Format| -> Box<dyn RecordDeserializer> {
163            if format == Format::Avro {
164                Box::new(AvroDeserializer::with_schema_registry(sr_clone))
165            } else {
166                select_deserializer(format)
167            }
168        };
169        Self::build_base(schema, config, deser_factory, Some(sr), None)
170    }
171
172    /// Build a Schema Registry client from the parsed config, or
173    /// `Ok(None)` when `schema.registry.url` is not set.
174    fn build_sr_client(
175        config: &KafkaSourceConfig,
176    ) -> Result<Option<SchemaRegistryClient>, ConnectorError> {
177        let Some(sr_url) = config.schema_registry_url.as_ref() else {
178            return Ok(None);
179        };
180        let client = if let Some(ca) = config.schema_registry_ssl_ca_location.as_deref() {
181            SchemaRegistryClient::with_tls_mtls(
182                sr_url.clone(),
183                config.schema_registry_auth.clone(),
184                ca,
185                config.schema_registry_ssl_certificate_location.as_deref(),
186                config.schema_registry_ssl_key_location.as_deref(),
187            )?
188        } else {
189            SchemaRegistryClient::new(sr_url.clone(), config.schema_registry_auth.clone())
190        };
191        Ok(Some(client))
192    }
193
194    fn build_base(
195        schema: SchemaRef,
196        config: KafkaSourceConfig,
197        deser_factory: impl FnOnce(Format) -> Box<dyn RecordDeserializer>,
198        schema_registry: Option<Arc<SchemaRegistryClient>>,
199        registry: Option<&prometheus::Registry>,
200    ) -> Self {
201        let deserializer = deser_factory(config.format);
202        let channel_len = Arc::new(AtomicUsize::new(0));
203
204        let watermark_tracker = if config.enable_watermark_tracking {
205            Some(
206                KafkaWatermarkTracker::new(0, config.idle_timeout)
207                    .with_max_out_of_orderness(config.max_out_of_orderness),
208            )
209        } else {
210            None
211        };
212
213        Self {
214            consumer: None,
215            config,
216            deserializer,
217            offsets: OffsetTracker::new(),
218            state: ConnectorState::Created,
219            metrics: KafkaSourceMetrics::new(registry),
220            schema,
221            channel_len,
222            rebalance_state: Arc::new(Mutex::new(RebalanceState::new())),
223            rebalance_counter: Arc::new(AtomicU64::new(0)),
224            revoke_generation: Arc::new(AtomicU64::new(0)),
225            last_seen_revoke_gen: 0,
226            schema_registry,
227            data_ready: Arc::new(Notify::new()),
228            checkpoint_request: Arc::new(AtomicBool::new(false)),
229            msg_rx: None,
230            reader_handle: None,
231            commit_handle: None,
232            hwm_handle: None,
233            reader_shutdown: None,
234            offset_commit_tx: None,
235            watermark_tracker,
236            high_watermarks_rx: None,
237            reader_paused: Arc::new(AtomicBool::new(false)),
238            commit_retry_needed: Arc::new(AtomicBool::new(false)),
239            offset_snapshot: Arc::new(Mutex::new(OffsetTracker::new())),
240            last_avro_schema: None,
241            poll_payload_buf: Vec::new(),
242            poll_payload_offsets: Vec::new(),
243            poll_meta_partitions: Vec::new(),
244            poll_meta_offsets: Vec::new(),
245            poll_meta_timestamps: Vec::new(),
246            poll_meta_headers: Vec::new(),
247        }
248    }
249
250    /// Lifecycle state (Created → Initializing → Running → Closed).
251    #[must_use]
252    pub fn state(&self) -> ConnectorState {
253        self.state
254    }
255
256    /// Per-topic-partition offset state for checkpoint and monitoring.
257    #[must_use]
258    pub fn offsets(&self) -> &OffsetTracker {
259        &self.offsets
260    }
261
262    /// Shared backpressure fill counter for downstream wiring.
263    #[must_use]
264    pub fn channel_len(&self) -> Arc<AtomicUsize> {
265        Arc::clone(&self.channel_len)
266    }
267
268    /// Shared partition assignment state (updated by rebalance callbacks).
269    #[must_use]
270    pub fn rebalance_state(&self) -> Arc<Mutex<RebalanceState>> {
271        Arc::clone(&self.rebalance_state)
272    }
273
274    /// Whether a Schema Registry client is configured.
275    #[must_use]
276    pub fn has_schema_registry(&self) -> bool {
277        self.schema_registry.is_some()
278    }
279
280    /// Current combined watermark from the watermark tracker.
281    ///
282    /// Returns `None` if watermark tracking is disabled or no partitions
283    /// have received data yet.
284    #[must_use]
285    pub fn current_watermark(&self) -> Option<i64> {
286        self.watermark_tracker
287            .as_ref()
288            .and_then(KafkaWatermarkTracker::current_watermark)
289    }
290
291    /// Returns the configured event-time column name, if any.
292    ///
293    /// Used by the pipeline to identify which column contains event timestamps
294    /// for watermark generation, instead of hardcoding `event_time`/`timestamp`.
295    #[must_use]
296    pub fn event_time_column(&self) -> Option<&str> {
297        self.config.event_time_column.as_deref()
298    }
299
300    /// Spawns background tasks on first `poll_batch()` call.
301    ///
302    /// Three tasks handle separate concerns:
303    /// - **Reader**: consumes messages, manages backpressure, detects revokes
304    /// - **Commit**: periodic advisory broker offset commits
305    /// - **HWM**: periodic high watermark queries for lag monitoring
306    ///
307    /// Deferred to allow `restore()` to access the consumer directly after `open()`.
308    #[allow(clippy::too_many_lines)]
309    fn ensure_reader_started(&mut self) {
310        if self.reader_handle.is_some() || self.consumer.is_none() {
311            return;
312        }
313
314        let consumer = Arc::new(self.consumer.take().unwrap());
315        let (msg_tx, msg_rx) =
316            crossfire::mpsc::bounded_async::<KafkaPayload>(self.config.reader_channel_capacity);
317        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
318        let (offset_tx, offset_rx) = tokio::sync::watch::channel(TopicPartitionList::new());
319        let reader_offset_rx = offset_rx.clone(); // reader reads final offsets on shutdown
320        let (hwm_tx, hwm_rx) = tokio::sync::watch::channel(Vec::new());
321        // Channel for the reader to publish seen partitions to the HWM task.
322        let (seen_tx, seen_rx) = tokio::sync::watch::channel(Vec::<(Arc<str>, i32)>::new());
323        let data_ready = Arc::clone(&self.data_ready);
324        let channel_len = Arc::clone(&self.channel_len);
325        let capture_headers = self.config.include_headers;
326        let broker_commit_interval = self.config.broker_commit_interval;
327        let reader_channel_capacity = self.config.reader_channel_capacity;
328        let reader_paused = Arc::clone(&self.reader_paused);
329        let revoke_generation = Arc::clone(&self.revoke_generation);
330        let rebalance_state = Arc::clone(&self.rebalance_state);
331        let commit_retry_needed = Arc::clone(&self.commit_retry_needed);
332        let pause_threshold = self.config.backpressure_high_watermark;
333        let resume_threshold = self.config.backpressure_low_watermark;
334
335        // -- Commit task: periodic advisory broker offset commits --
336        let commit_consumer = Arc::clone(&consumer);
337        let commit_retry = Arc::clone(&commit_retry_needed);
338        let commit_metrics = self.metrics.clone();
339        let mut commit_shutdown = shutdown_rx.clone();
340        let commit_handle = tokio::spawn(async move {
341            if broker_commit_interval.is_zero() {
342                // Disabled — wait for shutdown.
343                let _ = commit_shutdown.changed().await;
344                return;
345            }
346            let mut timer = tokio::time::interval(broker_commit_interval);
347            timer.tick().await; // skip first
348
349            loop {
350                tokio::select! {
351                    biased;
352                    _ = commit_shutdown.changed() => break,
353                    _ = timer.tick() => {
354                        let tpl = offset_rx.borrow().clone();
355                        if tpl.count() == 0 { continue; }
356                        if commit_retry
357                            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
358                            .is_ok()
359                        {
360                            let c = Arc::clone(&commit_consumer);
361                            let flag = Arc::clone(&commit_retry);
362                            let n = tpl.count();
363                            // Await the blocking commit so it completes before
364                            // close() drops the consumer Arc.
365                            //
366                            // Metrics: librdkafka fires `commit_callback` for
367                            // BOTH sync and async commits, and the callback is
368                            // our authoritative counter source. We only bump
369                            // `commit_failures` here for outcomes the callback
370                            // can't observe — a panicked blocking task, or a
371                            // tokio timeout while the commit is still pending.
372                            match tokio::time::timeout(
373                                std::time::Duration::from_secs(10),
374                                tokio::task::spawn_blocking(move || {
375                                    c.commit(&tpl, CommitMode::Sync)
376                                }),
377                            )
378                            .await
379                            {
380                                Ok(Ok(Ok(()))) => {
381                                    // callback already bumped `commits`
382                                    info!(partition_count = n, "sync offset commit retry succeeded");
383                                }
384                                Ok(Ok(Err(e))) => {
385                                    // callback already bumped `commit_failures`
386                                    flag.store(true, Ordering::Release);
387                                    warn!(error = %e, "sync offset commit retry failed");
388                                }
389                                Ok(Err(e)) => {
390                                    commit_metrics.record_commit_failure();
391                                    warn!(error = %e, "sync commit blocking task panicked");
392                                }
393                                Err(_) => {
394                                    commit_metrics.record_commit_failure();
395                                    flag.store(true, Ordering::Release);
396                                    warn!("sync offset commit retry timed out");
397                                }
398                            }
399                        } else {
400                            // Async enqueue only — the broker-confirmed outcome
401                            // (Ok or Err) arrives asynchronously via
402                            // `LaminarConsumerContext::commit_callback`, which
403                            // bumps `commits` / `commit_failures`. Counting at
404                            // enqueue would overcount (every rejected commit
405                            // would show as a success here). An immediate `Err`
406                            // below means librdkafka couldn't even queue the
407                            // commit — rare, but worth observing.
408                            match commit_consumer.commit(&tpl, CommitMode::Async) {
409                                Ok(()) => {
410                                    info!(partition_count = tpl.count(), "periodic broker offset commit (advisory)");
411                                }
412                                Err(e) => {
413                                    commit_metrics.record_commit_failure();
414                                    warn!(error = %e, "periodic broker offset commit failed");
415                                }
416                            }
417                        }
418                    }
419                }
420            }
421        });
422
423        // -- HWM task: periodic high watermark queries for lag monitoring --
424        let hwm_consumer = Arc::clone(&consumer);
425        let mut hwm_shutdown = shutdown_rx.clone();
426        let hwm_handle = tokio::spawn(async move {
427            let mut timer = tokio::time::interval(std::time::Duration::from_secs(30));
428            timer.tick().await; // skip first
429
430            loop {
431                tokio::select! {
432                    biased;
433                    _ = hwm_shutdown.changed() => break,
434                    _ = timer.tick() => {
435                        let partitions: Vec<_> = seen_rx.borrow().clone();
436                        if partitions.is_empty() { continue; }
437                        let c = Arc::clone(&hwm_consumer);
438                        let watermarks = tokio::time::timeout(
439                            std::time::Duration::from_secs(10),
440                            tokio::task::spawn_blocking(move || {
441                                let mut results = Vec::with_capacity(partitions.len());
442                                for (topic, partition) in &partitions {
443                                    match c.fetch_watermarks(topic, *partition, std::time::Duration::from_secs(1)) {
444                                        Ok((_low, high)) => results.push((Arc::clone(topic), *partition, high)),
445                                        Err(e) => debug!(%topic, partition, error = %e, "HWM fetch failed"),
446                                    }
447                                }
448                                results
449                            }),
450                        )
451                        .await
452                        .unwrap_or(Ok(Vec::new()))
453                        .unwrap_or_default();
454                        if !watermarks.is_empty() {
455                            let _ = hwm_tx.send(watermarks);
456                        }
457                    }
458                }
459            }
460        });
461
462        // -- Reader task: message consumption, backpressure, revoke pruning --
463        let mut reader_shutdown = shutdown_rx;
464        let reader_handle = tokio::spawn(async move {
465            let mut cached_topic: Arc<str> = Arc::from("");
466            let mut seen_partitions: std::collections::HashSet<(Arc<str>, i32)> =
467                std::collections::HashSet::new();
468            let mut is_paused = false;
469            let mut last_revoke_gen: u64 = 0;
470
471            loop {
472                // Prune seen_partitions on revoke so the HWM task stops querying them.
473                let current_gen = revoke_generation.load(Ordering::Acquire);
474                if current_gen != last_revoke_gen {
475                    last_revoke_gen = current_gen;
476                    let assigned = lock_or_recover(&rebalance_state)
477                        .assigned_partitions()
478                        .clone();
479                    seen_partitions.retain(|(t, p)| assigned.contains(&(t.to_string(), *p)));
480                    let _ = seen_tx.send(seen_partitions.iter().cloned().collect());
481                }
482
483                // Backpressure: pause/resume Kafka partitions based on channel fill.
484                #[allow(clippy::cast_precision_loss)]
485                let fill = if reader_channel_capacity > 0 {
486                    channel_len.load(Ordering::Acquire) as f64 / reader_channel_capacity as f64
487                } else {
488                    0.0
489                };
490                if fill >= pause_threshold && !is_paused {
491                    if let Ok(assignment) = consumer.assignment() {
492                        if consumer.pause(&assignment).is_ok() {
493                            is_paused = true;
494                            reader_paused.store(true, Ordering::Release);
495                            debug!("reader: paused Kafka partitions (fill={fill:.2})");
496                        }
497                    }
498                } else if fill <= resume_threshold && is_paused {
499                    if let Ok(assignment) = consumer.assignment() {
500                        if consumer.resume(&assignment).is_ok() {
501                            is_paused = false;
502                            reader_paused.store(false, Ordering::Release);
503                            debug!("reader: resumed Kafka partitions (fill={fill:.2})");
504                        }
505                    }
506                }
507
508                let msg_result = tokio::select! {
509                    biased;
510                    _ = reader_shutdown.changed() => break,
511                    msg = tokio::time::timeout(
512                        std::time::Duration::from_millis(200),
513                        consumer.recv(),
514                    ) => match msg {
515                        Ok(result) => result,
516                        Err(_timeout) => continue,
517                    },
518                };
519                match msg_result {
520                    Ok(msg) => {
521                        if let Some(payload) = msg.payload() {
522                            let topic = msg.topic();
523                            if &*cached_topic != topic {
524                                cached_topic = Arc::from(topic);
525                            }
526                            if seen_partitions.insert((Arc::clone(&cached_topic), msg.partition()))
527                            {
528                                let _ = seen_tx.send(seen_partitions.iter().cloned().collect());
529                            }
530                            let timestamp_ms = match msg.timestamp() {
531                                rdkafka::Timestamp::CreateTime(ts)
532                                | rdkafka::Timestamp::LogAppendTime(ts) => Some(ts),
533                                rdkafka::Timestamp::NotAvailable => None,
534                            };
535                            let headers_json = if capture_headers {
536                                use rdkafka::message::Headers;
537                                msg.headers().and_then(|hdrs| {
538                                    let pairs: Vec<(String, serde_json::Value)> = (0..hdrs.count())
539                                        .map(|i| {
540                                            let h = hdrs.get(i);
541                                            let val = match h.value {
542                                                Some(v) => serde_json::Value::String(
543                                                    String::from_utf8_lossy(v).into_owned(),
544                                                ),
545                                                None => serde_json::Value::Null,
546                                            };
547                                            (h.key.to_string(), val)
548                                        })
549                                        .collect();
550                                    serde_json::to_string(&pairs).ok()
551                                })
552                            } else {
553                                None
554                            };
555                            let kp = KafkaPayload {
556                                data: payload.to_vec(),
557                                topic: Arc::clone(&cached_topic),
558                                partition: msg.partition(),
559                                offset: msg.offset(),
560                                timestamp_ms,
561                                headers_json,
562                            };
563                            match msg_tx.try_send(kp) {
564                                Ok(()) => {
565                                    channel_len.fetch_add(1, Ordering::Relaxed);
566                                }
567                                Err(crossfire::TrySendError::Full(kp)) => {
568                                    if !is_paused {
569                                        if let Ok(assignment) = consumer.assignment() {
570                                            if consumer.pause(&assignment).is_ok() {
571                                                is_paused = true;
572                                                reader_paused.store(true, Ordering::Release);
573                                                debug!("reader: paused partitions (channel full)");
574                                            }
575                                        }
576                                    }
577                                    channel_len.fetch_add(1, Ordering::Relaxed);
578                                    let send_ok = tokio::select! {
579                                        biased;
580                                        _ = reader_shutdown.changed() => false,
581                                        result = msg_tx.send(kp) => result.is_ok(),
582                                    };
583                                    if !send_ok {
584                                        channel_len.fetch_sub(1, Ordering::Relaxed);
585                                        break;
586                                    }
587                                }
588                                Err(crossfire::TrySendError::Disconnected(_)) => break,
589                            }
590                            data_ready.notify_one();
591                        }
592                    }
593                    Err(e) => {
594                        warn!(error = %e, "Kafka consumer error");
595                    }
596                }
597            }
598
599            // Final commit + unsubscribe (reader owns the consumer Arc).
600            // The close() method sends latest offsets via offset_tx before
601            // signaling shutdown, so the commit task may have already
602            // committed them — this is a best-effort final flush.
603            let tpl = reader_offset_rx.borrow().clone();
604            if tpl.count() > 0 {
605                match consumer.commit(&tpl, CommitMode::Sync) {
606                    Ok(()) => info!(
607                        partition_count = tpl.count(),
608                        "committed final offsets on shutdown"
609                    ),
610                    Err(e) => warn!(error = %e, "failed to commit final offsets on shutdown"),
611                }
612            }
613            consumer.unsubscribe();
614        });
615
616        self.msg_rx = Some(msg_rx);
617        self.reader_handle = Some(reader_handle);
618        self.commit_handle = Some(commit_handle);
619        self.hwm_handle = Some(hwm_handle);
620        self.reader_shutdown = Some(shutdown_tx);
621        self.offset_commit_tx = Some(offset_tx);
622        self.high_watermarks_rx = Some(hwm_rx);
623    }
624}
625
626#[async_trait]
627#[allow(clippy::too_many_lines)] // poll_batch has legitimate complexity (backpressure + deser + poison pill fallback)
628impl SourceConnector for KafkaSource {
629    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
630        self.state = ConnectorState::Initializing;
631
632        // If config provided, re-parse (supports runtime config override).
633        let kafka_config = if config.properties().is_empty() {
634            self.config.clone()
635        } else {
636            let parsed = KafkaSourceConfig::from_config(config)?;
637            self.config = parsed.clone();
638            parsed
639        };
640
641        // Re-select deserializer (factory defaults to JSON).
642        if let Some(sr_client) = Self::build_sr_client(&kafka_config)? {
643            let sr = Arc::new(sr_client);
644            self.schema_registry = Some(Arc::clone(&sr));
645            self.deserializer = if kafka_config.format == Format::Avro {
646                Box::new(AvroDeserializer::with_schema_registry(sr))
647            } else {
648                select_deserializer(kafka_config.format)
649            };
650        } else if let Some(ref sr) = self.schema_registry {
651            // Preserve SR client injected via with_schema_registry().
652            self.deserializer = if kafka_config.format == Format::Avro {
653                Box::new(AvroDeserializer::with_schema_registry(Arc::clone(sr)))
654            } else {
655                select_deserializer(kafka_config.format)
656            };
657        } else {
658            self.deserializer = select_deserializer(kafka_config.format);
659        }
660
661        // New deserializer has empty known_ids; reset evolution baseline to match.
662        self.last_avro_schema = None;
663
664        // Override schema from SQL DDL if provided.
665        if let Some(schema) = config.arrow_schema() {
666            info!(
667                fields = schema.fields().len(),
668                "using SQL-defined schema for deserialization"
669            );
670            self.schema = schema;
671        }
672
673        info!(
674            brokers = %kafka_config.bootstrap_servers,
675            subscription = ?kafka_config.subscription,
676            group_id = %kafka_config.group_id,
677            format = %kafka_config.format,
678            schema_fields = self.schema.fields().len(),
679            "opening Kafka source connector"
680        );
681
682        // Build rdkafka consumer with rebalance-aware context.
683        let rdkafka_config: ClientConfig = kafka_config.to_rdkafka_config();
684        let context = LaminarConsumerContext::new(
685            Arc::clone(&self.checkpoint_request),
686            Arc::clone(&self.rebalance_state),
687            Arc::clone(&self.rebalance_counter),
688            Arc::clone(&self.revoke_generation),
689            Arc::clone(&self.reader_paused),
690            Arc::clone(&self.commit_retry_needed),
691            Arc::clone(&self.offset_snapshot),
692            // IntCounter::clone is an Arc bump; these are shared with the
693            // metrics struct and bumped from librdkafka's background thread
694            // inside `commit_callback`.
695            self.metrics.commits.clone(),
696            self.metrics.commit_failures.clone(),
697        );
698        let consumer: StreamConsumer<LaminarConsumerContext> =
699            rdkafka_config.create_with_context(context).map_err(|e| {
700                ConnectorError::ConnectionFailed(format!("failed to create consumer: {e}"))
701            })?;
702
703        // Subscribe to topics (list or regex pattern).
704        match &kafka_config.subscription {
705            TopicSubscription::Topics(topics) => {
706                let topic_refs: Vec<&str> = topics.iter().map(String::as_str).collect();
707                consumer.subscribe(&topic_refs).map_err(|e| {
708                    ConnectorError::ConnectionFailed(format!("failed to subscribe: {e}"))
709                })?;
710            }
711            TopicSubscription::Pattern(pattern) => {
712                // rdkafka requires a ^ prefix for regex patterns
713                let regex_pattern = if pattern.starts_with('^') {
714                    pattern.clone()
715                } else {
716                    format!("^{pattern}")
717                };
718                consumer.subscribe(&[&regex_pattern]).map_err(|e| {
719                    ConnectorError::ConnectionFailed(format!("failed to subscribe to pattern: {e}"))
720                })?;
721            }
722        }
723
724        // Apply startup mode positioning before starting the reader.
725        match &kafka_config.startup_mode {
726            // GroupOffsets/Earliest/Latest are handled via auto.offset.reset in to_rdkafka_config().
727            StartupMode::GroupOffsets | StartupMode::Earliest | StartupMode::Latest => {}
728            StartupMode::SpecificOffsets(offsets) => {
729                let mut tpl = rdkafka::TopicPartitionList::new();
730                let topics = match &kafka_config.subscription {
731                    TopicSubscription::Topics(t) => t.clone(),
732                    TopicSubscription::Pattern(_) => Vec::new(),
733                };
734                for topic in &topics {
735                    for (&partition, &offset) in offsets {
736                        if let Err(e) = tpl.add_partition_offset(
737                            topic,
738                            partition,
739                            rdkafka::Offset::Offset(offset),
740                        ) {
741                            tracing::warn!(
742                                %topic, partition, offset,
743                                error = %e,
744                                "failed to add specific offset to partition list"
745                            );
746                        }
747                    }
748                }
749                if tpl.count() > 0 {
750                    consumer.assign(&tpl).map_err(|e| {
751                        ConnectorError::ConnectionFailed(format!(
752                            "failed to assign specific offsets: {e}"
753                        ))
754                    })?;
755                    info!(
756                        partition_count = tpl.count(),
757                        "assigned consumer to specific offsets"
758                    );
759                }
760            }
761            StartupMode::Timestamp(ts_ms) => {
762                // rdkafka requires assignment before offsets_for_times.
763                // Wait briefly for partition assignment from the group coordinator,
764                // then seek each assigned partition to the target timestamp.
765                let mut tpl = rdkafka::TopicPartitionList::new();
766                let topics = match &kafka_config.subscription {
767                    TopicSubscription::Topics(t) => t.clone(),
768                    TopicSubscription::Pattern(_) => Vec::new(),
769                };
770                // Query metadata to discover partition count per topic.
771                if let Ok(metadata) = consumer.fetch_metadata(
772                    topics.first().map(String::as_str),
773                    std::time::Duration::from_secs(10),
774                ) {
775                    for topic_meta in metadata.topics() {
776                        for partition_meta in topic_meta.partitions() {
777                            if let Err(e) = tpl.add_partition_offset(
778                                topic_meta.name(),
779                                partition_meta.id(),
780                                rdkafka::Offset::Offset(*ts_ms),
781                            ) {
782                                tracing::warn!(
783                                    topic = topic_meta.name(),
784                                    partition = partition_meta.id(),
785                                    error = %e,
786                                    "failed to add timestamp offset to partition list"
787                                );
788                            }
789                        }
790                    }
791                }
792                if tpl.count() > 0 {
793                    match consumer.offsets_for_times(tpl, std::time::Duration::from_secs(10)) {
794                        Ok(resolved) => {
795                            consumer.assign(&resolved).map_err(|e| {
796                                ConnectorError::ConnectionFailed(format!(
797                                    "failed to assign timestamp offsets: {e}"
798                                ))
799                            })?;
800                            info!(
801                                timestamp_ms = ts_ms,
802                                partition_count = resolved.count(),
803                                "assigned consumer to timestamp offsets"
804                            );
805                        }
806                        Err(e) => {
807                            warn!(
808                                error = %e,
809                                timestamp_ms = ts_ms,
810                                "failed to resolve timestamp offsets, falling back to group offsets"
811                            );
812                        }
813                    }
814                }
815            }
816        }
817
818        self.consumer = Some(consumer);
819        self.state = ConnectorState::Running;
820
821        // Reader is NOT started here — deferred to first `poll_batch()`.
822        // This allows `restore()` to access the consumer directly between
823        // `open()` and the first poll, calling `consumer.assign()` to seek
824        // to checkpoint offsets. The pipeline's fallback poll interval
825        // ensures the first `poll_batch()` fires promptly even without
826        // a `data_ready_notify()` signal.
827
828        // Eagerly fetch the SR schema so the Arrow schema is available at
829        // plan time (before the first poll_batch).
830        if let Some(ref sr) = self.schema_registry {
831            if let TopicSubscription::Topics(topics) = &kafka_config.subscription {
832                if topics.len() > 1 {
833                    warn!("multiple topics with schema registry — using first topic's schema");
834                }
835                if let Some(topic) = topics.first() {
836                    let subject = resolve_value_subject(
837                        kafka_config.schema_registry_subject_strategy,
838                        kafka_config.schema_registry_record_name.as_deref(),
839                        topic,
840                    );
841                    match tokio::time::timeout(
842                        kafka_config.schema_registry_discovery_timeout,
843                        sr.get_latest_schema(&subject),
844                    )
845                    .await
846                    {
847                        Ok(Ok(cached)) => {
848                            if let Some(avro_deser) = self
849                                .deserializer
850                                .as_any_mut()
851                                .and_then(|any| any.downcast_mut::<AvroDeserializer>())
852                            {
853                                if let Err(e) =
854                                    avro_deser.register_schema(cached.id, &cached.schema_str)
855                                {
856                                    warn!(%subject, error = %e, "SR schema register failed");
857                                } else {
858                                    // Keep the catalog schema pinned — planner
859                                    // plans are already built against it.
860                                    log_schema_drift(&self.schema, &cached.arrow_schema, &subject);
861                                    info!(%subject, schema_id = cached.id,
862                                        "SR schema fetched at open()");
863                                    self.last_avro_schema = Some(cached.arrow_schema);
864                                }
865                            }
866                        }
867                        Ok(Err(e)) => {
868                            warn!(%subject, error = %e, "SR unavailable at open(), will resolve lazily");
869                        }
870                        Err(_elapsed) => {
871                            warn!(%subject, "SR prefetch timed out at open(), will resolve lazily");
872                        }
873                    }
874                }
875            }
876        }
877
878        info!("Kafka source connector opened successfully");
879        Ok(())
880    }
881
882    async fn discover_schema(&mut self, properties: &std::collections::HashMap<String, String>) {
883        let cfg = crate::config::ConnectorConfig::with_properties("kafka", properties.clone());
884        let Ok(kafka_config) = KafkaSourceConfig::from_config(&cfg) else {
885            return;
886        };
887        if kafka_config.format != Format::Avro {
888            return;
889        }
890
891        let topic = match &kafka_config.subscription {
892            TopicSubscription::Topics(topics) => match topics.first() {
893                Some(t) => {
894                    if topics.len() > 1 {
895                        warn!(topics = ?topics, chosen = %t,
896                            "multi-topic source: using first topic's SR schema");
897                    }
898                    t.clone()
899                }
900                None => return,
901            },
902            TopicSubscription::Pattern(pattern) => {
903                warn!(%pattern,
904                    "topic.pattern cannot auto-discover a schema — declare columns explicitly");
905                return;
906            }
907        };
908
909        let sr_client = match Self::build_sr_client(&kafka_config) {
910            Ok(Some(c)) => c,
911            Ok(None) => return,
912            Err(e) => {
913                warn!(error = %e, "Schema Registry client build failed");
914                return;
915            }
916        };
917
918        let subject = resolve_value_subject(
919            kafka_config.schema_registry_subject_strategy,
920            kafka_config.schema_registry_record_name.as_deref(),
921            &topic,
922        );
923        let timeout = kafka_config.schema_registry_discovery_timeout;
924
925        match tokio::time::timeout(timeout, sr_client.get_latest_schema(&subject)).await {
926            Ok(Ok(cached)) => {
927                self.metrics.record_sr_discovery_success();
928                info!(%subject, schema_id = cached.id,
929                    fields = cached.arrow_schema.fields().len(),
930                    "discovered Avro schema from Schema Registry");
931                self.schema = cached.arrow_schema;
932            }
933            Ok(Err(e)) => {
934                self.metrics.record_sr_discovery_failure();
935                warn!(%subject, error = %e, "Schema Registry lookup failed");
936            }
937            Err(_) => {
938                self.metrics.record_sr_discovery_timeout();
939                warn!(%subject, timeout_secs = timeout.as_secs(),
940                    "Schema Registry lookup timed out");
941            }
942        }
943    }
944
945    #[allow(clippy::cast_possible_truncation)] // Kafka partition/offset values fit in narrower types
946    async fn poll_batch(
947        &mut self,
948        max_records: usize,
949    ) -> Result<Option<SourceBatch>, ConnectorError> {
950        if self.state != ConnectorState::Running {
951            return Err(ConnectorError::InvalidState {
952                expected: "Running".into(),
953                actual: self.state.to_string(),
954            });
955        }
956
957        // Lazily spawn the background reader task on first poll.
958        self.ensure_reader_started();
959
960        let rx = self
961            .msg_rx
962            .as_mut()
963            .ok_or_else(|| ConnectorError::InvalidState {
964                expected: "reader initialized".into(),
965                actual: "reader is None".into(),
966            })?;
967
968        let limit = max_records.min(self.config.max_poll_records);
969
970        // Reuse struct-level buffers — clear without freeing capacity.
971        self.poll_payload_buf.clear();
972        self.poll_payload_offsets.clear();
973        self.poll_meta_partitions.clear();
974        self.poll_meta_offsets.clear();
975        self.poll_meta_timestamps.clear();
976        self.poll_meta_headers.clear();
977
978        let mut total_bytes: u64 = 0;
979        let mut last_topic = String::new();
980        let mut last_partition_id: i32 = 0;
981        let mut last_offset: i64 = -1;
982        let include_metadata = self.config.include_metadata;
983        let include_headers = self.config.include_headers;
984
985        while self.poll_payload_offsets.len() < limit {
986            match rx.try_recv() {
987                Ok(kp) => {
988                    self.channel_len.fetch_sub(1, Ordering::Release);
989                    total_bytes += kp.data.len() as u64;
990                    let start = self.poll_payload_buf.len();
991                    self.poll_payload_buf.extend_from_slice(&kp.data);
992                    self.poll_payload_offsets.push((start, kp.data.len()));
993
994                    self.offsets.update_arc(&kp.topic, kp.partition, kp.offset);
995
996                    if include_metadata {
997                        self.poll_meta_partitions.push(kp.partition);
998                        self.poll_meta_offsets.push(kp.offset);
999                        self.poll_meta_timestamps.push(kp.timestamp_ms);
1000                    }
1001                    if include_headers {
1002                        self.poll_meta_headers.push(kp.headers_json);
1003                    }
1004
1005                    // Update watermark tracker with event timestamp.
1006                    if let Some(ref mut tracker) = self.watermark_tracker {
1007                        if let Some(ts) = kp.timestamp_ms {
1008                            tracker.update_partition(kp.partition, ts);
1009                        }
1010                    }
1011
1012                    if last_topic.as_str() != &*kp.topic || last_partition_id != kp.partition {
1013                        last_topic = kp.topic.to_string();
1014                        last_partition_id = kp.partition;
1015                    }
1016                    last_offset = kp.offset;
1017                }
1018                Err(crossfire::TryRecvError::Empty) => break,
1019                Err(crossfire::TryRecvError::Disconnected) => {
1020                    self.state = ConnectorState::Failed;
1021                    return Err(ConnectorError::Internal(
1022                        "Kafka reader task exited unexpectedly".into(),
1023                    ));
1024                }
1025            }
1026        }
1027
1028        // Check for idle partitions on each poll cycle.
1029        if let Some(ref mut tracker) = self.watermark_tracker {
1030            tracker.check_idle_partitions();
1031        }
1032
1033        // Sync rebalance counter → metrics (bridge from rdkafka background thread).
1034        let rebalance_events = self.rebalance_counter.swap(0, Ordering::Relaxed);
1035        for _ in 0..rebalance_events {
1036            self.metrics.record_rebalance();
1037        }
1038
1039        // Lock-free revoke detection: check if a rebalance revoke happened
1040        // since the last poll cycle. If so, purge offsets for revoked partitions.
1041        let current_revoke_gen = self.revoke_generation.load(Ordering::Acquire);
1042        let had_revoke = current_revoke_gen != self.last_seen_revoke_gen;
1043        if had_revoke {
1044            self.last_seen_revoke_gen = current_revoke_gen;
1045            // Clone is intentional: only runs on rebalance events (rare), not per-poll.
1046            let assigned = lock_or_recover(&self.rebalance_state)
1047                .assigned_partitions()
1048                .clone();
1049            let before = self.offsets.partition_count();
1050            self.offsets.retain_assigned(&assigned);
1051            let after = self.offsets.partition_count();
1052            if before != after {
1053                debug!(
1054                    before,
1055                    after, "purged revoked partition offsets after rebalance"
1056                );
1057            }
1058        }
1059
1060        // Publish current offsets to the reader task for periodic broker commits
1061        // and to the rebalance callback for seek-on-assign.
1062        //
1063        // Filter by the current assignment: `retain_assigned` only runs when
1064        // `had_revoke` is newly detected, so between the revoke callback and
1065        // the next `poll_batch` the tracker may transiently hold entries for
1066        // partitions we no longer own. An unfiltered commit of those offsets
1067        // is rejected by the broker (`IllegalGeneration` / "Consumer not
1068        // assigned"), which flips `commit_retry_needed` on and stalls the
1069        // commit loop against the same stale TPL — visible as a sawtooth in
1070        // consumer lag.
1071        if had_revoke || !self.poll_payload_offsets.is_empty() {
1072            if let Some(ref tx) = self.offset_commit_tx {
1073                let assigned = lock_or_recover(&self.rebalance_state)
1074                    .assigned_partitions()
1075                    .clone();
1076                let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
1077                if tx.send(tpl).is_err() {
1078                    debug!("offset_commit_tx closed, reader task shutting down");
1079                }
1080            }
1081            lock_or_recover(&self.offset_snapshot).clone_from(&self.offsets);
1082        }
1083
1084        // Compute consumer lag from high watermarks (moved from metrics()
1085        // to avoid side-effects in a &self getter).
1086        if let Some(ref hwm_rx) = self.high_watermarks_rx {
1087            let watermarks = hwm_rx.borrow();
1088            let mut total_lag: u64 = 0;
1089            for (topic, partition, high_watermark) in watermarks.iter() {
1090                if let Some(current_offset) = self.offsets.get(topic, *partition) {
1091                    let lag = high_watermark.saturating_sub(current_offset + 1);
1092                    #[allow(clippy::cast_sign_loss)]
1093                    if lag > 0 {
1094                        total_lag += lag as u64;
1095                    }
1096                }
1097            }
1098            self.metrics.set_lag(total_lag);
1099        }
1100
1101        if self.poll_payload_offsets.is_empty() {
1102            return Ok(None);
1103        }
1104
1105        // PartitionInfo reflects the last topic-partition seen in this batch.
1106        // Per-partition offsets are tracked correctly in `self.offsets` and
1107        // persisted via checkpoint(); this field is informational only.
1108        let last_partition = if last_offset >= 0 {
1109            Some(PartitionInfo::new(
1110                format!("{last_topic}-{last_partition_id}"),
1111                last_offset.to_string(),
1112            ))
1113        } else {
1114            None
1115        };
1116
1117        // Resolve Avro schemas from Schema Registry before deserialization.
1118        // Also detect schema evolution when new schema IDs appear.
1119        if let Some(avro_deser) = self
1120            .deserializer
1121            .as_any_mut()
1122            .and_then(|any| any.downcast_mut::<AvroDeserializer>())
1123        {
1124            let mut new_schema_ids = Vec::new();
1125            for &(start, len) in &self.poll_payload_offsets {
1126                if let Some(schema_id) = AvroDeserializer::extract_confluent_id(
1127                    &self.poll_payload_buf[start..start + len],
1128                ) {
1129                    let is_new = avro_deser
1130                        .ensure_schema_registered(schema_id)
1131                        .await
1132                        .map_err(ConnectorError::Serde)?;
1133                    if is_new {
1134                        new_schema_ids.push(schema_id);
1135                    }
1136                }
1137            }
1138
1139            // Detect schema evolution by diffing successive writer schemas.
1140            if !new_schema_ids.is_empty()
1141                && self.config.schema_evolution_strategy != SchemaEvolutionStrategy::Ignore
1142            {
1143                if let Some(ref sr) = self.schema_registry {
1144                    let compat = self
1145                        .config
1146                        .schema_compatibility
1147                        .map_or(CompatibilityMode::Backward, CompatibilityMode::from);
1148                    let evolver = SchemaEvolution::new(compat);
1149
1150                    for id in new_schema_ids {
1151                        let cached = sr.resolve_confluent_id(id).await.map_err(|e| {
1152                            ConnectorError::SchemaMismatch(format!(
1153                                "failed to resolve schema {id}: {e}"
1154                            ))
1155                        })?;
1156
1157                        let Some(ref prev) = self.last_avro_schema else {
1158                            // First schema — establish baseline, nothing to diff.
1159                            info!(schema_id = id, "initial Avro schema registered");
1160                            self.last_avro_schema = Some(Arc::clone(&cached.arrow_schema));
1161                            continue;
1162                        };
1163
1164                        let changes = evolver.diff_schemas(prev, &cached.arrow_schema);
1165                        self.last_avro_schema = Some(Arc::clone(&cached.arrow_schema));
1166
1167                        if changes.is_empty() {
1168                            info!(
1169                                schema_id = id,
1170                                "new Avro schema ID registered, no field changes"
1171                            );
1172                            continue;
1173                        }
1174                        let verdict = evolver.evaluate_evolution(&changes);
1175                        match &verdict {
1176                            EvolutionVerdict::Compatible => {
1177                                info!(schema_id = id, ?changes, "schema evolved (compatible)");
1178                            }
1179                            EvolutionVerdict::RequiresMigration => {
1180                                warn!(
1181                                    schema_id = id,
1182                                    ?changes,
1183                                    "schema evolved (requires migration)"
1184                                );
1185                            }
1186                            EvolutionVerdict::Incompatible(reason) => {
1187                                if self.config.schema_evolution_strategy
1188                                    == SchemaEvolutionStrategy::Reject
1189                                {
1190                                    return Err(ConnectorError::SchemaMismatch(format!(
1191                                        "incompatible schema evolution for ID {id}: {reason}"
1192                                    )));
1193                                }
1194                                warn!(
1195                                    schema_id = id, %reason, ?changes,
1196                                    "incompatible schema evolution detected"
1197                                );
1198                            }
1199                        }
1200                    }
1201                }
1202            }
1203        }
1204
1205        let refs: Vec<&[u8]> = self
1206            .poll_payload_offsets
1207            .iter()
1208            .map(|&(start, len)| &self.poll_payload_buf[start..start + len])
1209            .collect();
1210
1211        // Try batch deserialization first (fast path). If it fails, fall back
1212        // to per-record deserialization to isolate poison pills.
1213        let (batch, good_indices) = match self.deserializer.deserialize_batch(&refs, &self.schema) {
1214            Ok(batch) => (batch, None),
1215            Err(batch_err) => {
1216                // Per-record fallback: deserialize one at a time, collect
1217                // successful batches directly (avoids double-deserialization).
1218                // Track indices of successful records so metadata vectors can
1219                // be filtered to match the reduced row count.
1220                let mut good_batches = Vec::with_capacity(refs.len());
1221                let mut good_idx = Vec::with_capacity(refs.len());
1222                let mut error_count = 0u64;
1223                for (i, r) in refs.iter().enumerate() {
1224                    match self
1225                        .deserializer
1226                        .deserialize_batch(std::slice::from_ref(r), &self.schema)
1227                    {
1228                        Ok(batch) => {
1229                            good_batches.push(batch);
1230                            good_idx.push(i);
1231                        }
1232                        Err(e) => {
1233                            error_count += 1;
1234                            self.metrics.record_error();
1235                            warn!(error = %e, "skipping poison pill record");
1236                        }
1237                    }
1238                }
1239                if good_batches.is_empty() {
1240                    return Err(ConnectorError::Serde(batch_err));
1241                }
1242                // Escalate if the error rate exceeds the configured threshold.
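                // Worked example, assuming a threshold of 0.05: 3 bad records out of
                // 50 is a 6% error rate and escalates; 2 out of 50 (4%) is tolerated
                // and only logged below.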
1243                #[allow(clippy::cast_precision_loss)]
1244                if error_count > 0 {
1245                    let error_rate = error_count as f64 / refs.len() as f64;
1246                    if error_rate > self.config.max_deser_error_rate {
1247                        return Err(ConnectorError::Serde(batch_err));
1248                    }
1249                    warn!(
1250                        skipped = error_count,
1251                        total = refs.len(),
1252                        error_rate = %format_args!("{error_rate:.3}"),
1253                        "deserialized batch with poison pill isolation"
1254                    );
1255                }
1256                let concat_schema = good_batches[0].schema();
1257                let batch = arrow_select::concat::concat_batches(&concat_schema, &good_batches)
1258                    .map_err(|e| {
1259                        ConnectorError::Internal(format!("failed to concat batches: {e}"))
1260                    })?;
1261                (batch, Some(good_idx))
1262            }
1263        };
1264
1265        // If poison pill fallback filtered records, also filter metadata
1266        // vectors so their lengths match the deserialized batch row count.
1267        if let Some(ref idx) = good_indices {
1268            if include_metadata {
1269                self.poll_meta_partitions =
1270                    idx.iter().map(|&i| self.poll_meta_partitions[i]).collect();
1271                self.poll_meta_offsets = idx.iter().map(|&i| self.poll_meta_offsets[i]).collect();
1272                self.poll_meta_timestamps =
1273                    idx.iter().map(|&i| self.poll_meta_timestamps[i]).collect();
1274            }
1275            if include_headers {
1276                self.poll_meta_headers = idx
1277                    .iter()
1278                    .map(|&i| std::mem::take(&mut self.poll_meta_headers[i]))
1279                    .collect();
1280            }
1281        }
1282
1283        // Append metadata columns if configured.
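        // Columns appended after the value columns: `_partition` (Int32, non-null),
        // `_offset` (Int64, non-null), `_timestamp` (Timestamp[ms], nullable), and
        // `_headers` (Utf8 JSON string, nullable) when header capture is enabled.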
1284        let needs_meta = include_metadata && !self.poll_meta_partitions.is_empty();
1285        let needs_headers = include_headers && !self.poll_meta_headers.is_empty();
1286        let batch = if needs_meta || needs_headers {
1287            use arrow_schema::{DataType, Field};
1288
1289            let mut fields = batch.schema().fields().to_vec();
1290            let mut columns: Vec<Arc<dyn arrow_array::Array>> = batch.columns().to_vec();
1291
1292            if needs_meta {
1293                use arrow_array::{Int32Array, Int64Array, TimestampMillisecondArray};
1294                use arrow_schema::TimeUnit;
1295                fields.push(Arc::new(Field::new("_partition", DataType::Int32, false)));
1296                columns.push(Arc::new(Int32Array::from(std::mem::take(
1297                    &mut self.poll_meta_partitions,
1298                ))));
1299                fields.push(Arc::new(Field::new("_offset", DataType::Int64, false)));
1300                columns.push(Arc::new(Int64Array::from(std::mem::take(
1301                    &mut self.poll_meta_offsets,
1302                ))));
1303                fields.push(Arc::new(Field::new(
1304                    "_timestamp",
1305                    DataType::Timestamp(TimeUnit::Millisecond, None),
1306                    true,
1307                )));
1308                columns.push(Arc::new(TimestampMillisecondArray::from(std::mem::take(
1309                    &mut self.poll_meta_timestamps,
1310                ))));
1311            }
1312            if needs_headers {
1313                fields.push(Arc::new(Field::new("_headers", DataType::Utf8, true)));
1314                columns.push(Arc::new(arrow_array::StringArray::from(std::mem::take(
1315                    &mut self.poll_meta_headers,
1316                ))));
1317            }
1318
1319            let meta_schema = Arc::new(arrow_schema::Schema::new(fields));
1320            arrow_array::RecordBatch::try_new(meta_schema, columns).map_err(|e| {
1321                ConnectorError::Internal(format!("failed to append metadata columns: {e}"))
1322            })?
1323        } else {
1324            batch
1325        };
1326
1327        let num_rows = batch.num_rows();
1328        self.metrics.record_poll(num_rows as u64, total_bytes);
1329
1330        let source_batch = if let Some(partition) = last_partition {
1331            SourceBatch::with_partition(batch, partition)
1332        } else {
1333            SourceBatch::new(batch)
1334        };
1335
1336        debug!(
1337            records = num_rows,
1338            bytes = total_bytes,
1339            "polled batch from Kafka"
1340        );
1341
1342        Ok(Some(source_batch))
1343    }
1344
1345    fn schema(&self) -> SchemaRef {
1346        self.schema.clone()
1347    }
1348
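    // Offsets are filtered to the partitions currently assigned to this consumer,
    // so offsets for partitions revoked in a rebalance are neither checkpointed
    // nor re-committed by an instance that no longer owns them.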
1349    fn checkpoint(&self) -> SourceCheckpoint {
1350        let assigned = lock_or_recover(&self.rebalance_state)
1351            .assigned_partitions()
1352            .clone();
1353
1354        // Push filtered offsets to the reader task so it can commit them
1355        // on shutdown even if close() is called without a subsequent checkpoint.
1356        if let Some(ref tx) = self.offset_commit_tx {
1357            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
1358            let _ = tx.send(tpl);
1359        }
1360
1361        self.offsets.to_checkpoint_filtered(&assigned)
1362    }
1363
1364    /// Restores the consumer to checkpointed offsets.
1365    ///
1366    /// Offsets are staged in `offset_snapshot`; the actual `seek_partitions`
1367    /// runs in `post_rebalance` when the group coordinator hands us the
1368    /// assignment. Calling `consumer.assign()` here would switch out of
1369    /// `subscribe()` mode and produce `Local: Erroneous state`.
1370    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
1371        info!(
1372            epoch = checkpoint.epoch(),
1373            partition_count = checkpoint.offsets().len(),
1374            "staging checkpointed offsets for seek-on-assign"
1375        );
1376
1377        self.offsets = OffsetTracker::from_checkpoint(checkpoint);
1378        match self.offset_snapshot.lock() {
1379            Ok(mut snapshot) => snapshot.clone_from(&self.offsets),
1380            Err(poisoned) => poisoned.into_inner().clone_from(&self.offsets),
1381        }
1382
1383        Ok(())
1384    }
1385
1386    fn health_check(&self) -> HealthStatus {
1387        match self.state {
1388            ConnectorState::Running => {
1389                if self.reader_paused.load(Ordering::Acquire) {
1390                    HealthStatus::Degraded("backpressure: consumption paused".into())
1391                } else {
1392                    HealthStatus::Healthy
1393                }
1394            }
1395            ConnectorState::Created | ConnectorState::Initializing => HealthStatus::Unknown,
1396            ConnectorState::Paused => HealthStatus::Degraded("connector paused".into()),
1397            ConnectorState::Recovering => HealthStatus::Degraded("recovering".into()),
1398            ConnectorState::Closed => HealthStatus::Unhealthy("closed".into()),
1399            ConnectorState::Failed => HealthStatus::Unhealthy("failed".into()),
1400        }
1401    }
1402
1403    fn metrics(&self) -> ConnectorMetrics {
1404        self.metrics.to_connector_metrics()
1405    }
1406
1407    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
1408        Some(Arc::clone(&self.data_ready))
1409    }
1410
1411    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
1412        Some(Arc::clone(&self.checkpoint_request))
1413    }
1414
1415    async fn close(&mut self) -> Result<(), ConnectorError> {
1416        info!("closing Kafka source connector");
1417
1418        let assigned = lock_or_recover(&self.rebalance_state)
1419            .assigned_partitions()
1420            .clone();
1421
1422        // Send final filtered offsets to the reader task before signaling shutdown.
1423        if let Some(ref tx) = self.offset_commit_tx {
1424            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
1425            if tpl.count() > 0 {
1426                let _ = tx.send(tpl);
1427            }
1428        }
1429
1430        // Signal shutdown and wait for all background tasks to exit.
1431        if let Some(tx) = self.reader_shutdown.take() {
1432            let _ = tx.send(true);
1433        }
1434        let timeout = std::time::Duration::from_secs(5);
1435        if let Some(handle) = self.reader_handle.take() {
1436            let _ = tokio::time::timeout(timeout, handle).await;
1437        }
1438        if let Some(handle) = self.commit_handle.take() {
1439            let _ = tokio::time::timeout(timeout, handle).await;
1440        }
1441        if let Some(handle) = self.hwm_handle.take() {
1442            let _ = tokio::time::timeout(timeout, handle).await;
1443        }
1444        self.msg_rx = None;
1445        self.offset_commit_tx = None;
1446
1447        // If the consumer was never handed off to the reader task (e.g., close()
1448        // was called before any poll_batch()), commit the final offsets directly.
1449        if let Some(ref consumer) = self.consumer {
1450            let tpl = self.offsets.to_topic_partition_list_filtered(&assigned);
1451            if tpl.count() > 0 {
1452                if let Err(e) = consumer.commit(&tpl, CommitMode::Sync) {
1453                    warn!(error = %e, "failed to commit final offsets");
1454                }
1455            }
1456            consumer.unsubscribe();
1457        }
1458
1459        self.consumer = None;
1460        self.state = ConnectorState::Closed;
1461        info!("Kafka source connector closed");
1462        Ok(())
1463    }
1464}
1465
1466impl std::fmt::Debug for KafkaSource {
1467    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1468        f.debug_struct("KafkaSource")
1469            .field("state", &self.state)
1470            .field("subscription", &self.config.subscription)
1471            .field("group_id", &self.config.group_id)
1472            .field("format", &self.config.format)
1473            .field("partitions", &self.offsets.partition_count())
1474            .finish_non_exhaustive()
1475    }
1476}
1477
1478/// Warn if the CREATE-SOURCE catalog schema has drifted from the live
1479/// Schema Registry schema. Empty `declared` means nothing was declared.
1480fn log_schema_drift(declared: &arrow_schema::Schema, live: &arrow_schema::Schema, subject: &str) {
1481    if declared.fields().is_empty() || declared.fields() == live.fields() {
1482        return;
1483    }
1484    let decl: BTreeSet<&str> = declared
1485        .fields()
1486        .iter()
1487        .map(|f| f.name().as_str())
1488        .collect();
1489    let lv: BTreeSet<&str> = live.fields().iter().map(|f| f.name().as_str()).collect();
1490    warn!(
1491        %subject,
1492        missing_in_sr = ?decl.difference(&lv).collect::<Vec<_>>(),
1493        added_in_sr = ?lv.difference(&decl).collect::<Vec<_>>(),
1494        "schema drift: re-apply CREATE SOURCE DDL to pick up the current SR schema"
1495    );
1496}
1497
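/// Selects the record deserializer for the configured wire format. Avro gets the
/// dedicated, schema-registry-aware [`AvroDeserializer`]; any other format that the
/// generic factory rejects falls back to JSON with a warning rather than failing.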
1498fn select_deserializer(format: Format) -> Box<dyn RecordDeserializer> {
1499    match format {
1500        Format::Avro => Box::new(AvroDeserializer::new()),
1501        other => serde::create_deserializer(other).unwrap_or_else(|_| {
1502            warn!(format = %other, "unsupported format, falling back to JSON");
1503            Box::new(serde::json::JsonDeserializer::new())
1504        }),
1505    }
1506}
1507
1508#[cfg(test)]
1509mod tests {
1510    use super::*;
1511    use arrow_schema::{DataType, Field, Schema};
1512
1513    fn test_schema() -> SchemaRef {
1514        Arc::new(Schema::new(vec![
1515            Field::new("id", DataType::Int64, false),
1516            Field::new("value", DataType::Utf8, false),
1517        ]))
1518    }
1519
1520    fn test_config() -> KafkaSourceConfig {
1521        let mut cfg = KafkaSourceConfig::default();
1522        cfg.bootstrap_servers = "localhost:9092".into();
1523        cfg.group_id = "test-group".into();
1524        cfg.subscription = TopicSubscription::Topics(vec!["events".into()]);
1525        cfg
1526    }
1527
1528    #[test]
1529    fn test_new_defaults() {
1530        let source = KafkaSource::new(test_schema(), test_config(), None);
1531        assert_eq!(source.state(), ConnectorState::Created);
1532        assert!(source.consumer.is_none());
1533        assert_eq!(source.offsets().partition_count(), 0);
1534    }
1535
1536    #[test]
1537    fn test_schema_returned() {
1538        let schema = test_schema();
1539        let source = KafkaSource::new(schema.clone(), test_config(), None);
1540        assert_eq!(source.schema(), schema);
1541    }
1542
1543    #[test]
1544    fn test_checkpoint_empty() {
1545        let source = KafkaSource::new(test_schema(), test_config(), None);
1546        let cp = source.checkpoint();
1547        assert!(cp.is_empty());
1548    }
1549
1550    #[test]
1551    fn test_checkpoint_with_offsets() {
1552        let mut source = KafkaSource::new(test_schema(), test_config(), None);
1553        source.offsets.update("events", 0, 100);
1554        source.offsets.update("events", 1, 200);
1555
1556        // Simulate rebalance assign so partitions are in the assigned set.
1557        {
1558            let mut state = source.rebalance_state.lock().unwrap();
1559            state.on_assign(&[("events".into(), 0), ("events".into(), 1)]);
1560        }
1561
1562        let cp = source.checkpoint();
1563        assert_eq!(cp.get_offset("events-0"), Some("100"));
1564        assert_eq!(cp.get_offset("events-1"), Some("200"));
1565    }
1566
1567    #[test]
1568    fn test_health_check_created() {
1569        let source = KafkaSource::new(test_schema(), test_config(), None);
1570        assert_eq!(source.health_check(), HealthStatus::Unknown);
1571    }
1572
1573    #[test]
1574    fn test_health_check_running() {
1575        let mut source = KafkaSource::new(test_schema(), test_config(), None);
1576        source.state = ConnectorState::Running;
1577        assert_eq!(source.health_check(), HealthStatus::Healthy);
1578    }
1579
1580    #[test]
1581    fn test_health_check_closed() {
1582        let mut source = KafkaSource::new(test_schema(), test_config(), None);
1583        source.state = ConnectorState::Closed;
1584        assert!(matches!(source.health_check(), HealthStatus::Unhealthy(_)));
1585    }
1586
1587    #[test]
1588    fn test_metrics_initial() {
1589        let source = KafkaSource::new(test_schema(), test_config(), None);
1590        let m = source.metrics();
1591        assert_eq!(m.records_total, 0);
1592        assert_eq!(m.bytes_total, 0);
1593        assert_eq!(m.errors_total, 0);
1594    }
1595
1596    #[test]
1597    fn test_deserializer_selection_json() {
1598        let source = KafkaSource::new(test_schema(), test_config(), None);
1599        assert_eq!(source.deserializer.format(), Format::Json);
1600    }
1601
1602    #[test]
1603    fn test_deserializer_selection_csv() {
1604        let mut cfg = test_config();
1605        cfg.format = Format::Csv;
1606        let source = KafkaSource::new(test_schema(), cfg, None);
1607        assert_eq!(source.deserializer.format(), Format::Csv);
1608    }
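
    // Hedged sketch: exercises the free `select_deserializer` helper directly. It
    // assumes `AvroDeserializer::new().format()` reports `Format::Avro`, mirroring
    // the JSON/CSV selection tests above.
    #[test]
    fn test_select_deserializer_avro() {
        let deser = select_deserializer(Format::Avro);
        assert_eq!(deser.format(), Format::Avro);
    }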
1609
1610    #[test]
1611    fn test_with_schema_registry() {
1612        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
1613        let mut cfg = test_config();
1614        cfg.format = Format::Avro;
1615        cfg.schema_registry_url = Some("http://localhost:8081".into());
1616
1617        let source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);
1618        assert!(source.schema_registry.is_some());
1619        assert_eq!(source.deserializer.format(), Format::Avro);
1620    }
1621
1622    #[tokio::test]
1623    async fn test_open_preserves_injected_schema_registry() {
1624        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
1625        let mut cfg = test_config();
1626        cfg.format = Format::Avro;
1627        cfg.schema_registry_url = Some("http://localhost:8081".into());
1628        let mut source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);
1629
1630        // open() with empty config should preserve injected SR.
1631        let empty_config = crate::config::ConnectorConfig::new("kafka");
1632        // open() will fail to connect (no broker), but the deserializer
1633        // re-selection happens before the connection attempt.
1634        let _ = source.open(&empty_config).await;
1635        assert!(source.schema_registry.is_some());
1636        assert_eq!(source.deserializer.format(), Format::Avro);
1637    }
1638
1639    #[test]
1640    fn test_debug_output() {
1641        let source = KafkaSource::new(test_schema(), test_config(), None);
1642        let debug = format!("{source:?}");
1643        assert!(debug.contains("KafkaSource"));
1644        assert!(debug.contains("events"));
1645    }
1646
1647    #[test]
1648    fn test_checkpoint_filters_revoked_partitions() {
1649        let mut source = KafkaSource::new(test_schema(), test_config(), None);
1650        source.offsets.update("events", 0, 100);
1651        source.offsets.update("events", 1, 200);
1652        source.offsets.update("events", 2, 300);
1653
1654        // Simulate rebalance: only partitions 0 and 2 are assigned.
1655        {
1656            let mut state = source.rebalance_state.lock().unwrap();
1657            state.on_assign(&[("events".into(), 0), ("events".into(), 2)]);
1658        }
1659
1660        let cp = source.checkpoint();
1661        assert_eq!(cp.get_offset("events-0"), Some("100"));
1662        assert_eq!(cp.get_offset("events-1"), None); // revoked — filtered out
1663        assert_eq!(cp.get_offset("events-2"), Some("300"));
1664    }
1665
1666    #[test]
1667    fn test_checkpoint_empty_before_first_rebalance() {
1668        let mut source = KafkaSource::new(test_schema(), test_config(), None);
1669        source.offsets.update("events", 0, 100);
1670        source.offsets.update("events", 1, 200);
1671
1672        // No rebalance has occurred — assigned_partitions is empty.
1673        // No assigned partitions means no offsets should be checkpointed.
1674        let cp = source.checkpoint();
1675        assert!(cp.is_empty());
1676    }
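
    // Hedged sketch of a checkpoint-then-restore round trip. It assumes
    // `OffsetTracker::from_checkpoint` repopulates one tracked partition per
    // checkpointed offset, which is observable through `partition_count()`.
    #[tokio::test]
    async fn test_restore_stages_checkpointed_offsets() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        {
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0)]);
        }
        let cp = source.checkpoint();

        let mut restored = KafkaSource::new(test_schema(), test_config(), None);
        restored
            .restore(&cp)
            .await
            .expect("restore stages offsets without touching the consumer");
        assert_eq!(restored.offsets().partition_count(), 1);
    }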
1677
1678    // discover_schema tests. Control-flow cases (when to skip, when to
1679    // fail) use plain config inputs; the happy path uses a wiremock HTTP
1680    // server mocking Confluent Schema Registry's REST API.
1681
1682    fn empty_schema() -> SchemaRef {
1683        Arc::new(Schema::empty())
1684    }
1685
1686    fn props(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
1687        pairs
1688            .iter()
1689            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
1690            .collect()
1691    }
1692
1693    #[tokio::test]
1694    async fn discover_schema_skips_non_avro_format() {
1695        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
1696        source
1697            .discover_schema(&props(&[
1698                ("bootstrap.servers", "localhost:9092"),
1699                ("group.id", "g"),
1700                ("topic", "t"),
1701                ("format", "json"),
1702                ("schema.registry.url", "http://localhost:8081"),
1703            ]))
1704            .await;
1705        // Schema must remain untouched (still empty).
1706        assert_eq!(source.schema().fields().len(), 0);
1707    }
1708
1709    #[tokio::test]
1710    async fn discover_schema_skips_without_sr_url() {
1711        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
1712        source
1713            .discover_schema(&props(&[
1714                ("bootstrap.servers", "localhost:9092"),
1715                ("group.id", "g"),
1716                ("topic", "t"),
1717                ("format", "avro"),
1718            ]))
1719            .await;
1720        assert_eq!(source.schema().fields().len(), 0);
1721    }
1722
1723    #[tokio::test]
1724    async fn discover_schema_skips_topic_pattern() {
1725        // topic.pattern cannot map to a single SR subject. Must skip
1726        // cleanly, not panic or select an arbitrary topic.
1727        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
1728        source
1729            .discover_schema(&props(&[
1730                ("bootstrap.servers", "localhost:9092"),
1731                ("group.id", "g"),
1732                ("topic.pattern", "events-.*"),
1733                ("format", "avro"),
1734                ("schema.registry.url", "http://localhost:8081"),
1735            ]))
1736            .await;
1737        assert_eq!(source.schema().fields().len(), 0);
1738    }
1739
1740    #[tokio::test]
1741    async fn discover_schema_unreachable_sr_leaves_schema_empty() {
1742        // Use a reserved-documentation IP on a closed port, and bound the whole
1743        // test with a generous wall-clock budget so that a forgotten internal
1744        // timeout makes the test fail loudly.
1745        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
1746        let start = std::time::Instant::now();
1747        tokio::time::timeout(
1748            std::time::Duration::from_secs(20),
1749            source.discover_schema(&props(&[
1750                ("bootstrap.servers", "localhost:9092"),
1751                ("group.id", "g"),
1752                ("topic", "t"),
1753                ("format", "avro"),
1754                // TEST-NET-1 per RFC 5737 — guaranteed not to route.
1755                ("schema.registry.url", "http://192.0.2.1:65535"),
1756            ])),
1757        )
1758        .await
1759        .expect("discover_schema must honor its own 10s timeout");
1760        assert!(
1761            start.elapsed() < std::time::Duration::from_secs(15),
1762            "discover_schema should have returned well before the outer 20s budget"
1763        );
1764        assert_eq!(source.schema().fields().len(), 0);
1765    }
1766
1767    /// Happy path: wiremock SR returns a record-with-map Avro schema
1768    /// (the original "No Field name data" bug shape); `discover_schema`
1769    /// converts it correctly and preserves the Map type.
1770    #[tokio::test]
1771    async fn discover_schema_happy_path_with_wiremock_sr() {
1772        use wiremock::matchers::{method, path};
1773        use wiremock::{Mock, MockServer, ResponseTemplate};
1774
1775        let avro_schema = serde_json::json!({
1776            "type": "record",
1777            "name": "event",
1778            "fields": [
1779                {"name": "id", "type": "long"},
1780                {"name": "data", "type": {"type": "map", "values": "string"}}
1781            ]
1782        })
1783        .to_string();
1784
1785        let sr = MockServer::start().await;
1786        Mock::given(method("GET"))
1787            .and(path("/subjects/ion_tw-value/versions/latest"))
1788            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1789                "id": 42,
1790                "version": 1,
1791                "subject": "ion_tw-value",
1792                "schema": avro_schema,
1793                "schemaType": "AVRO",
1794            })))
1795            .mount(&sr)
1796            .await;
1797
1798        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
1799        source
1800            .discover_schema(&props(&[
1801                ("bootstrap.servers", "localhost:9092"),
1802                ("group.id", "g"),
1803                ("topic", "ion_tw"),
1804                ("format", "avro"),
1805                ("schema.registry.url", &sr.uri()),
1806            ]))
1807            .await;
1808
1809        let schema = source.schema();
1810        assert_eq!(schema.fields().len(), 2, "expected [id, data]");
1811        assert_eq!(schema.field(0).name(), "id");
1812        assert_eq!(schema.field(1).name(), "data");
1813        assert!(
1814            matches!(
1815                schema.field(1).data_type(),
1816                arrow_schema::DataType::Map(_, _)
1817            ),
1818            "'data' field must survive as a Map type (got {:?})",
1819            schema.field(1).data_type()
1820        );
1821    }
1822
1823    /// Record-name subject strategy resolves to `{record_name}-value`
1824    /// rather than the default `{topic}-value`.
1825    #[tokio::test]
1826    async fn discover_schema_happy_path_record_name_strategy() {
1827        use wiremock::matchers::{method, path};
1828        use wiremock::{Mock, MockServer, ResponseTemplate};
1829
1830        let avro_schema = serde_json::json!({
1831            "type": "record",
1832            "name": "com.acme.Order",
1833            "fields": [
1834                {"name": "order_id", "type": "string"},
1835                {"name": "amount", "type": "double"}
1836            ]
1837        })
1838        .to_string();
1839
1840        let sr = MockServer::start().await;
1841        Mock::given(method("GET"))
1842            .and(path("/subjects/com.acme.Order-value/versions/latest"))
1843            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1844                "id": 7,
1845                "version": 1,
1846                "subject": "com.acme.Order-value",
1847                "schema": avro_schema,
1848                "schemaType": "AVRO",
1849            })))
1850            .mount(&sr)
1851            .await;
1852
1853        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
1854        source
1855            .discover_schema(&props(&[
1856                ("bootstrap.servers", "localhost:9092"),
1857                ("group.id", "g"),
1858                ("topic", "orders"),
1859                ("format", "avro"),
1860                ("schema.registry.url", &sr.uri()),
1861                ("schema.registry.subject.name.strategy", "record-name"),
1862                ("schema.registry.record.name", "com.acme.Order"),
1863            ]))
1864            .await;
1865
1866        let schema = source.schema();
1867        assert_eq!(schema.fields().len(), 2);
1868        assert_eq!(schema.field(0).name(), "order_id");
1869        assert_eq!(schema.field(1).name(), "amount");
1870    }
1871
1872    /// Drift detection: catalog has a stale 2-field schema, live SR
1873    /// has evolved to 3 fields. Catalog stays pinned; only
1874    /// `last_avro_schema` tracks the live SR shape.
1875    #[tokio::test]
1876    async fn open_logs_drift_when_sr_evolved_since_ddl() {
1877        use wiremock::matchers::{method, path};
1878        use wiremock::{Mock, MockServer, ResponseTemplate};
1879
1880        let evolved_schema = serde_json::json!({
1881            "type": "record",
1882            "name": "event",
1883            "fields": [
1884                {"name": "id", "type": "long"},
1885                {"name": "data", "type": {"type": "map", "values": "string"}},
1886                {"name": "version", "type": "int"}
1887            ]
1888        })
1889        .to_string();
1890
1891        let sr = MockServer::start().await;
1892        Mock::given(method("GET"))
1893            .and(path("/subjects/ion_tw-value/versions/latest"))
1894            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
1895                "id": 99,
1896                "version": 2,
1897                "subject": "ion_tw-value",
1898                "schema": evolved_schema,
1899                "schemaType": "AVRO",
1900            })))
1901            .mount(&sr)
1902            .await;
1903
1904        // Catalog schema baked at CREATE SOURCE time — only two fields,
1905        // predates the `version` field that was just added in SR.
1906        let stale_catalog = Arc::new(Schema::new(vec![
1907            Field::new("id", DataType::Int64, false),
1908            Field::new(
1909                "data",
1910                DataType::Map(
1911                    Arc::new(Field::new(
1912                        "entries",
1913                        DataType::Struct(arrow_schema::Fields::from(vec![
1914                            Field::new("key", DataType::Utf8, false),
1915                            Field::new("value", DataType::Utf8, true),
1916                        ])),
1917                        false,
1918                    )),
1919                    false,
1920                ),
1921                true,
1922            ),
1923        ]));
1924
1925        let mut cfg = KafkaSourceConfig::default();
1926        cfg.bootstrap_servers = "localhost:9092".into();
1927        cfg.group_id = "g".into();
1928        cfg.subscription = TopicSubscription::Topics(vec!["ion_tw".into()]);
1929        cfg.format = Format::Avro;
1930        cfg.schema_registry_url = Some(sr.uri());
1931        let sr_client = SchemaRegistryClient::new(sr.uri(), None);
1932        let mut source = KafkaSource::with_schema_registry(stale_catalog, cfg, sr_client);
1933
1934        let empty_cfg = crate::config::ConnectorConfig::new("kafka");
1935        let _ = source.open(&empty_cfg).await; // broker unreachable — later errors irrelevant
1936
1937        assert_eq!(
1938            source.schema().fields().len(),
1939            2,
1940            "catalog schema must stay pinned even after SR drift"
1941        );
1942        assert_eq!(
1943            source.last_avro_schema.as_ref().map(|s| s.fields().len()),
1944            Some(3),
1945            "last_avro_schema should reflect the evolved SR shape"
1946        );
1947    }
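
    // Hedged smoke test for the drift helper: an empty declared schema is the
    // "nothing declared" case and must return early without logging or panicking.
    #[test]
    fn test_log_schema_drift_skips_empty_declared() {
        let declared = Schema::empty();
        let live = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
        log_schema_drift(&declared, &live, "events-value");
    }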
1948}