Skip to main content

laminar_connectors/kafka/
source.rs

1//! Kafka source connector implementation.
2//!
3//! [`KafkaSource`] implements the [`SourceConnector`] trait, consuming
4//! from Kafka topics via rdkafka's `StreamConsumer`, deserializing
5//! messages using pluggable formats, and producing Arrow `RecordBatch`
6//! data through the connector SDK.
7
8use arrow_schema::SchemaRef;
9use async_trait::async_trait;
10use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
11use rdkafka::message::Message;
12use rdkafka::ClientConfig;
13use rdkafka::TopicPartitionList;
14use std::collections::BTreeSet;
15use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17use tokio::sync::{watch, Notify};
18use tracing::{debug, info, warn};
19
20use super::rebalance::LaminarConsumerContext;
21
22/// Locks a mutex, recovering from poison if a prior holder panicked.
23///
24/// Used for state shared with rdkafka's rebalance callback thread.
25/// Poison indicates a panic in the callback — the data may be stale
26/// but is structurally sound, so we recover rather than propagate.
27fn lock_or_recover<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
28    mutex.lock().unwrap_or_else(|poisoned| {
29        tracing::warn!("mutex poisoned, recovering");
30        poisoned.into_inner()
31    })
32}
33
34use crate::checkpoint::SourceCheckpoint;
35use crate::config::{ConnectorConfig, ConnectorState};
36use crate::connector::{PartitionInfo, SourceBatch, SourceConnector};
37use crate::error::ConnectorError;
38use crate::serde::{self, Format, RecordDeserializer};
39
40use super::avro::AvroDeserializer;
41use super::config::{
42    resolve_value_subject, KafkaSourceConfig, SchemaEvolutionStrategy, StartupMode,
43    TopicSubscription,
44};
45use super::metrics::KafkaSourceMetrics;
46use super::offsets::OffsetTracker;
47use super::rebalance::RebalanceState;
48use super::schema_registry::SchemaRegistryClient;
49use super::watermarks::KafkaWatermarkTracker;
50
51use crate::schema::evolution::SchemaEvolution;
52use crate::schema::traits::{CompatibilityMode, EvolutionVerdict};
53
/// Payload sent from the background Kafka reader task to [`KafkaSource::poll_batch`].
///
/// One value per consumed Kafka message that carried a payload (messages
/// without a payload are not forwarded by the reader).
struct KafkaPayload {
    /// Raw message payload bytes, copied out of the rdkafka message.
    data: Vec<u8>,
    /// Source topic; the reader reuses one `Arc` while consecutive messages
    /// come from the same topic to avoid a per-message allocation.
    topic: Arc<str>,
    /// Source partition.
    partition: i32,
    /// Offset of this message within `partition`.
    offset: i64,
    /// Message timestamp in milliseconds (`CreateTime` or `LogAppendTime`),
    /// or `None` when the broker reports the timestamp as not available.
    timestamp_ms: Option<i64>,
    /// Kafka message headers serialized as a JSON array of `[key, value]`
    /// pairs in header order, e.g. `[["k1","v1"],["k2",null]]`. Values are
    /// lossily decoded as UTF-8; a header without a value becomes `null`.
    /// The pair-list form (rather than a JSON object) preserves duplicate keys.
    ///
    /// `None` when `include_headers` is disabled, the message has no headers,
    /// or serialization fails.
    headers_json: Option<String>,
}
65
/// Single-consumer async receiver for the reader → `poll_batch` queue.
///
/// Created in `ensure_reader_started` by `crossfire::mpsc::bounded_async`,
/// bounded by `reader_channel_capacity`; the matching sender lives in the
/// reader task.
type KafkaPayloadRx = crossfire::AsyncRx<crossfire::mpsc::Array<KafkaPayload>>;
68
69/// Kafka source connector that consumes messages and produces Arrow batches.
70///
71/// Operates in Ring 1 (background) and pushes deserialized `RecordBatch`
72/// data to Ring 0 via the streaming `Source<T>` API.
73///
74/// # Lifecycle
75///
76/// 1. Create with [`KafkaSource::new`] or [`KafkaSource::with_schema_registry`]
77/// 2. Call `open()` to connect to Kafka and subscribe to topics
78/// 3. Call `poll_batch()` in a loop to consume messages
79/// 4. Call `checkpoint()` / `restore()` for fault tolerance
80/// 5. Call `close()` for clean shutdown
81pub struct KafkaSource {
82    consumer: Option<StreamConsumer<LaminarConsumerContext>>,
83    config: KafkaSourceConfig,
84    deserializer: Box<dyn RecordDeserializer>,
85    offsets: OffsetTracker,
86    state: ConnectorState,
87    metrics: KafkaSourceMetrics,
88    schema: SchemaRef,
89    channel_len: Arc<AtomicUsize>,
90    rebalance_state: Arc<Mutex<RebalanceState>>,
91    /// Shared rebalance counter bridging `LaminarConsumerContext` → `KafkaSourceMetrics`.
92    rebalance_counter: Arc<AtomicU64>,
93    /// Monotonic counter bumped on each partition revoke event.
94    ///
95    /// Shared with `LaminarConsumerContext` for lock-free revoke detection
96    /// from `poll_batch()`. The source compares `last_seen_revoke_gen`
97    /// against this value each poll cycle and only locks `rebalance_state`
98    /// when a change is detected to purge revoked partition offsets.
99    revoke_generation: Arc<AtomicU64>,
100    /// Last observed value of `revoke_generation`, cached per poll cycle.
101    last_seen_revoke_gen: u64,
102    schema_registry: Option<Arc<SchemaRegistryClient>>,
103    data_ready: Arc<Notify>,
104    checkpoint_request: Arc<AtomicBool>,
105    msg_rx: Option<KafkaPayloadRx>,
106    reader_handle: Option<tokio::task::JoinHandle<()>>,
107    commit_handle: Option<tokio::task::JoinHandle<()>>,
108    hwm_handle: Option<tokio::task::JoinHandle<()>>,
109    reader_shutdown: Option<tokio::sync::watch::Sender<bool>>,
110    /// Latest TPL the commit task should flush (last-writer-wins).
111    commit_tx: Option<watch::Sender<Option<TopicPartitionList>>>,
112    watermark_tracker: Option<KafkaWatermarkTracker>,
113    /// Receiver for high watermark data from the background reader task.
114    /// Each entry is `(topic, partition, high_watermark)` for lag computation.
115    #[allow(clippy::type_complexity)]
116    high_watermarks_rx: Option<tokio::sync::watch::Receiver<Vec<(Arc<str>, i32, i64)>>>,
117    /// Shared flag: `true` when the reader task has paused Kafka partitions
118    /// due to downstream backpressure. Used to re-pause newly assigned
119    /// partitions during rebalance.
120    reader_paused: Arc<AtomicBool>,
121    /// Offset snapshot shared with the rebalance callback for seek-on-assign.
122    /// Updated once per `poll_batch()` cycle (not per message).
123    offset_snapshot: Arc<Mutex<OffsetTracker>>,
124
125    /// Cluster vnode assignment, set in cluster mode via
126    /// [`set_vnode_assignment`](SourceConnector::set_vnode_assignment). When
127    /// present, `open()` uses engine-controlled manual `assign()` of the
128    /// partitions this node owns (`partition % vnode_count`) instead of
129    /// consumer-group `subscribe()`, and the reader re-binds when the
130    /// assignment version rotates. `None` → legacy broker-driven subscribe.
131    vnode_assignment: Option<(
132        Arc<laminar_core::state::VnodeRegistry>,
133        laminar_core::state::NodeId,
134    )>,
135    /// `(topic, partition_count)` captured at `open()` when vnode-assigned, so
136    /// the reader can recompute owned partitions on assignment rotation without
137    /// re-fetching metadata (partition counts are stable).
138    vnode_topic_meta: Vec<(Arc<str>, i32)>,
139
140    /// Last Avro writer schema from the schema registry, used to diff
141    /// successive versions for evolution detection.
142    last_avro_schema: Option<SchemaRef>,
143
144    // Reusable poll_batch buffers — cleared each cycle, capacity retained.
145    poll_payload_buf: Vec<u8>,
146    poll_payload_offsets: Vec<(usize, usize)>,
147    poll_meta_partitions: Vec<i32>,
148    poll_meta_offsets: Vec<i64>,
149    poll_meta_timestamps: Vec<Option<i64>>,
150    poll_meta_headers: Vec<Option<String>>,
151}
152
153impl KafkaSource {
154    /// Creates a new Kafka source connector with explicit schema.
155    #[must_use]
156    pub fn new(
157        schema: SchemaRef,
158        config: KafkaSourceConfig,
159        registry: Option<&prometheus::Registry>,
160    ) -> Self {
161        Self::build_base(schema, config, select_deserializer, None, registry)
162    }
163
164    /// Creates a new Kafka source connector with Schema Registry.
165    #[must_use]
166    pub fn with_schema_registry(
167        schema: SchemaRef,
168        config: KafkaSourceConfig,
169        sr_client: SchemaRegistryClient,
170    ) -> Self {
171        let sr = Arc::new(sr_client);
172        let sr_clone = Arc::clone(&sr);
173        let deser_factory = move |format: Format| -> Box<dyn RecordDeserializer> {
174            if format == Format::Avro {
175                Box::new(AvroDeserializer::with_schema_registry(sr_clone))
176            } else {
177                select_deserializer(format)
178            }
179        };
180        Self::build_base(schema, config, deser_factory, Some(sr), None)
181    }
182
183    /// Build a Schema Registry client from the parsed config, or
184    /// `Ok(None)` when `schema.registry.url` is not set.
185    fn build_sr_client(
186        config: &KafkaSourceConfig,
187    ) -> Result<Option<SchemaRegistryClient>, ConnectorError> {
188        let Some(sr_url) = config.schema_registry_url.as_ref() else {
189            return Ok(None);
190        };
191        let client = if let Some(ca) = config.schema_registry_ssl_ca_location.as_deref() {
192            SchemaRegistryClient::with_tls_mtls(
193                sr_url.clone(),
194                config.schema_registry_auth.clone(),
195                ca,
196                config.schema_registry_ssl_certificate_location.as_deref(),
197                config.schema_registry_ssl_key_location.as_deref(),
198            )?
199        } else {
200            SchemaRegistryClient::new(sr_url.clone(), config.schema_registry_auth.clone())
201        };
202        Ok(Some(client))
203    }
204
205    fn build_base(
206        schema: SchemaRef,
207        config: KafkaSourceConfig,
208        deser_factory: impl FnOnce(Format) -> Box<dyn RecordDeserializer>,
209        schema_registry: Option<Arc<SchemaRegistryClient>>,
210        registry: Option<&prometheus::Registry>,
211    ) -> Self {
212        let deserializer = deser_factory(config.format);
213        let channel_len = Arc::new(AtomicUsize::new(0));
214
215        let watermark_tracker = if config.enable_watermark_tracking {
216            Some(
217                KafkaWatermarkTracker::new(0, config.idle_timeout)
218                    .with_max_out_of_orderness(config.max_out_of_orderness),
219            )
220        } else {
221            None
222        };
223
224        Self {
225            consumer: None,
226            config,
227            deserializer,
228            offsets: OffsetTracker::new(),
229            state: ConnectorState::Created,
230            metrics: KafkaSourceMetrics::new(registry),
231            schema,
232            channel_len,
233            rebalance_state: Arc::new(Mutex::new(RebalanceState::new())),
234            rebalance_counter: Arc::new(AtomicU64::new(0)),
235            revoke_generation: Arc::new(AtomicU64::new(0)),
236            last_seen_revoke_gen: 0,
237            schema_registry,
238            data_ready: Arc::new(Notify::new()),
239            checkpoint_request: Arc::new(AtomicBool::new(false)),
240            msg_rx: None,
241            reader_handle: None,
242            commit_handle: None,
243            hwm_handle: None,
244            reader_shutdown: None,
245            commit_tx: None,
246            watermark_tracker,
247            high_watermarks_rx: None,
248            reader_paused: Arc::new(AtomicBool::new(false)),
249            offset_snapshot: Arc::new(Mutex::new(OffsetTracker::new())),
250            vnode_assignment: None,
251            vnode_topic_meta: Vec::new(),
252            last_avro_schema: None,
253            poll_payload_buf: Vec::new(),
254            poll_payload_offsets: Vec::new(),
255            poll_meta_partitions: Vec::new(),
256            poll_meta_offsets: Vec::new(),
257            poll_meta_timestamps: Vec::new(),
258            poll_meta_headers: Vec::new(),
259        }
260    }
261
262    /// Lifecycle state (Created → Initializing → Running → Closed).
263    #[must_use]
264    pub fn state(&self) -> ConnectorState {
265        self.state
266    }
267
268    /// Per-topic-partition offset state for checkpoint and monitoring.
269    #[must_use]
270    pub fn offsets(&self) -> &OffsetTracker {
271        &self.offsets
272    }
273
274    /// Shared backpressure fill counter for downstream wiring.
275    #[must_use]
276    pub fn channel_len(&self) -> Arc<AtomicUsize> {
277        Arc::clone(&self.channel_len)
278    }
279
280    /// Shared partition assignment state (updated by rebalance callbacks).
281    #[must_use]
282    pub fn rebalance_state(&self) -> Arc<Mutex<RebalanceState>> {
283        Arc::clone(&self.rebalance_state)
284    }
285
286    /// Whether a Schema Registry client is configured.
287    #[must_use]
288    pub fn has_schema_registry(&self) -> bool {
289        self.schema_registry.is_some()
290    }
291
292    /// Current combined watermark from the watermark tracker.
293    ///
294    /// Returns `None` if watermark tracking is disabled or no partitions
295    /// have received data yet.
296    #[must_use]
297    pub fn current_watermark(&self) -> Option<i64> {
298        self.watermark_tracker
299            .as_ref()
300            .and_then(KafkaWatermarkTracker::current_watermark)
301    }
302
303    /// Returns the configured event-time column name, if any.
304    ///
305    /// Used by the pipeline to identify which column contains event timestamps
306    /// for watermark generation, instead of hardcoding `event_time`/`timestamp`.
307    #[must_use]
308    pub fn event_time_column(&self) -> Option<&str> {
309        self.config.event_time_column.as_deref()
310    }
311
312    /// Spawns background tasks on first `poll_batch()` call.
313    ///
314    /// Three tasks handle separate concerns:
315    /// - **Reader**: consumes messages, manages backpressure, detects revokes
316    /// - **Commit**: services on-checkpoint broker offset commits
317    /// - **HWM**: periodic high watermark queries for lag monitoring
318    ///
319    /// Deferred to allow `restore()` to access the consumer directly after `open()`.
320    #[allow(clippy::too_many_lines)]
321    fn ensure_reader_started(&mut self) {
322        if self.reader_handle.is_some() || self.consumer.is_none() {
323            return;
324        }
325
326        let consumer = Arc::new(self.consumer.take().unwrap());
327        let (msg_tx, msg_rx) =
328            crossfire::mpsc::bounded_async::<KafkaPayload>(self.config.reader_channel_capacity);
329        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
330        let (commit_tx, mut commit_rx) = watch::channel::<Option<TopicPartitionList>>(None);
331        let (hwm_tx, hwm_rx) = tokio::sync::watch::channel(Vec::new());
332        // Channel for the reader to publish seen partitions to the HWM task.
333        let (seen_tx, seen_rx) = tokio::sync::watch::channel(Vec::<(Arc<str>, i32)>::new());
334        let data_ready = Arc::clone(&self.data_ready);
335        let channel_len = Arc::clone(&self.channel_len);
336        let capture_headers = self.config.include_headers;
337        let reader_channel_capacity = self.config.reader_channel_capacity;
338        let reader_paused = Arc::clone(&self.reader_paused);
339        let revoke_generation = Arc::clone(&self.revoke_generation);
340        let rebalance_state = Arc::clone(&self.rebalance_state);
341        let pause_threshold = self.config.backpressure_high_watermark;
342        let resume_threshold = self.config.backpressure_low_watermark;
343
344        // Broker commit task: flushes the latest durable TPL to the broker
345        // after each checkpoint. Lifecycle is driven by the watch sender —
346        // when close() drops commit_tx, any pending TPL is processed and
347        // then changed() returns Err, ending the loop.
348        let commit_consumer = Arc::clone(&consumer);
349        let commit_metrics = self.metrics.clone();
350        let commit_handle = tokio::spawn(async move {
351            while commit_rx.changed().await.is_ok() {
352                let Some(tpl) = commit_rx.borrow_and_update().clone() else {
353                    continue;
354                };
355                if tpl.count() == 0 {
356                    continue;
357                }
358                let start = std::time::Instant::now();
359                let c = Arc::clone(&commit_consumer);
360                let result =
361                    tokio::task::spawn_blocking(move || c.commit(&tpl, CommitMode::Sync)).await;
362                commit_metrics.observe_broker_commit_duration(start.elapsed().as_secs_f64());
363
364                match result {
365                    Ok(Ok(())) => {}
366                    Ok(Err(e)) => {
367                        // Failure counting is centralized in
368                        // LaminarConsumerContext::commit_callback (which
369                        // librdkafka fires for both sync and async
370                        // commits) — bumping here would double-count.
371                        warn!(error = %e, "broker offset commit rejected");
372                    }
373                    Err(e) => {
374                        // The callback does not fire for spawn_blocking
375                        // panics, so this is the only path that records
376                        // them.
377                        commit_metrics.commit_failures_panic.inc();
378                        warn!(error = %e, "broker offset commit task panicked");
379                    }
380                }
381            }
382        });
383
384        // -- HWM task: periodic high watermark queries for lag monitoring --
385        let hwm_consumer = Arc::clone(&consumer);
386        let mut hwm_shutdown = shutdown_rx.clone();
387        let hwm_handle = tokio::spawn(async move {
388            let mut timer = tokio::time::interval(std::time::Duration::from_secs(30));
389            timer.tick().await; // skip first
390
391            loop {
392                tokio::select! {
393                    biased;
394                    _ = hwm_shutdown.changed() => break,
395                    _ = timer.tick() => {
396                        let partitions: Vec<_> = seen_rx.borrow().clone();
397                        if partitions.is_empty() { continue; }
398                        let c = Arc::clone(&hwm_consumer);
399                        let watermarks = tokio::time::timeout(
400                            std::time::Duration::from_secs(10),
401                            tokio::task::spawn_blocking(move || {
402                                let mut results = Vec::with_capacity(partitions.len());
403                                for (topic, partition) in &partitions {
404                                    match c.fetch_watermarks(topic, *partition, std::time::Duration::from_secs(1)) {
405                                        Ok((_low, high)) => results.push((Arc::clone(topic), *partition, high)),
406                                        Err(e) => debug!(%topic, partition, error = %e, "HWM fetch failed"),
407                                    }
408                                }
409                                results
410                            }),
411                        )
412                        .await
413                        .unwrap_or(Ok(Vec::new()))
414                        .unwrap_or_default();
415                        if !watermarks.is_empty() {
416                            let _ = hwm_tx.send(watermarks);
417                        }
418                    }
419                }
420            }
421        });
422
423        // -- Reader task: message consumption, backpressure, revoke pruning --
424        // Engine-controlled re-assignment inputs (cluster mode; `None` otherwise).
425        let vnode_reassign = self
426            .vnode_assignment
427            .as_ref()
428            .map(|(r, s)| (Arc::clone(r), *s));
429        let vnode_topic_meta = self.vnode_topic_meta.clone();
430        let reassign_snapshot = Arc::clone(&self.offset_snapshot);
431        let reassign_default_offset = startup_default_offset(&self.config.startup_mode);
432        let mut reader_shutdown = shutdown_rx;
433        let reader_handle = tokio::spawn(async move {
434            let mut cached_topic: Arc<str> = Arc::from("");
435            let mut seen_partitions: std::collections::HashSet<(Arc<str>, i32)> =
436                std::collections::HashSet::new();
437            let mut is_paused = false;
438            let mut last_revoke_gen: u64 = 0;
439            // Track the vnode assignment generation; open() already assigned at
440            // the current version, so only a later rotation triggers a rebind.
441            let mut last_assignment_version = vnode_reassign
442                .as_ref()
443                .map_or(0, |(r, _)| r.assignment_version());
444
445            loop {
446                // Engine-controlled re-assignment: when the vnode assignment
447                // rotates, rebind to the partitions this node now owns, seeking
448                // each to its tracked offset (manual assign gets no broker
449                // rebalance callback to do this for us).
450                if let Some((registry, self_id)) = &vnode_reassign {
451                    let version = registry.assignment_version();
452                    if version != last_assignment_version {
453                        last_assignment_version = version;
454                        let offsets = lock_or_recover(&reassign_snapshot).clone();
455                        let tpl = build_vnode_assignment_tpl(
456                            registry,
457                            *self_id,
458                            &vnode_topic_meta,
459                            &offsets,
460                            reassign_default_offset,
461                        );
462                        match consumer.assign(&tpl) {
463                            Ok(()) => info!(
464                                version,
465                                owned_partitions = tpl.count(),
466                                "Kafka source re-assigned partitions after vnode rotation"
467                            ),
468                            Err(e) => warn!(
469                                version,
470                                error = %e,
471                                "Kafka source partition re-assignment failed"
472                            ),
473                        }
474                    }
475                }
476
477                // Prune seen_partitions on revoke so HWM task stops querying them.
478                let current_gen = revoke_generation.load(Ordering::Acquire);
479                if current_gen != last_revoke_gen {
480                    last_revoke_gen = current_gen;
481                    let assigned = lock_or_recover(&rebalance_state)
482                        .assigned_partitions()
483                        .clone();
484                    seen_partitions.retain(|(t, p)| assigned.contains(&(t.to_string(), *p)));
485                    let _ = seen_tx.send(seen_partitions.iter().cloned().collect());
486                }
487
488                // Backpressure: pause/resume Kafka partitions based on channel fill.
489                #[allow(clippy::cast_precision_loss)]
490                let fill = if reader_channel_capacity > 0 {
491                    channel_len.load(Ordering::Acquire) as f64 / reader_channel_capacity as f64
492                } else {
493                    0.0
494                };
495                if fill >= pause_threshold && !is_paused {
496                    if let Ok(assignment) = consumer.assignment() {
497                        if consumer.pause(&assignment).is_ok() {
498                            is_paused = true;
499                            reader_paused.store(true, Ordering::Release);
500                            debug!("reader: paused Kafka partitions (fill={fill:.2})");
501                        }
502                    }
503                } else if fill <= resume_threshold && is_paused {
504                    if let Ok(assignment) = consumer.assignment() {
505                        if consumer.resume(&assignment).is_ok() {
506                            is_paused = false;
507                            reader_paused.store(false, Ordering::Release);
508                            debug!("reader: resumed Kafka partitions (fill={fill:.2})");
509                        }
510                    }
511                }
512
513                let msg_result = tokio::select! {
514                    biased;
515                    _ = reader_shutdown.changed() => break,
516                    msg = tokio::time::timeout(
517                        std::time::Duration::from_millis(200),
518                        consumer.recv(),
519                    ) => match msg {
520                        Ok(result) => result,
521                        Err(_timeout) => continue,
522                    },
523                };
524                match msg_result {
525                    Ok(msg) => {
526                        if let Some(payload) = msg.payload() {
527                            let topic = msg.topic();
528                            if &*cached_topic != topic {
529                                cached_topic = Arc::from(topic);
530                            }
531                            if seen_partitions.insert((Arc::clone(&cached_topic), msg.partition()))
532                            {
533                                let _ = seen_tx.send(seen_partitions.iter().cloned().collect());
534                            }
535                            let timestamp_ms = match msg.timestamp() {
536                                rdkafka::Timestamp::CreateTime(ts)
537                                | rdkafka::Timestamp::LogAppendTime(ts) => Some(ts),
538                                rdkafka::Timestamp::NotAvailable => None,
539                            };
540                            let headers_json = if capture_headers {
541                                use rdkafka::message::Headers;
542                                msg.headers().and_then(|hdrs| {
543                                    let pairs: Vec<(String, serde_json::Value)> = (0..hdrs.count())
544                                        .map(|i| {
545                                            let h = hdrs.get(i);
546                                            let val = match h.value {
547                                                Some(v) => serde_json::Value::String(
548                                                    String::from_utf8_lossy(v).into_owned(),
549                                                ),
550                                                None => serde_json::Value::Null,
551                                            };
552                                            (h.key.to_string(), val)
553                                        })
554                                        .collect();
555                                    serde_json::to_string(&pairs).ok()
556                                })
557                            } else {
558                                None
559                            };
560                            let kp = KafkaPayload {
561                                data: payload.to_vec(),
562                                topic: Arc::clone(&cached_topic),
563                                partition: msg.partition(),
564                                offset: msg.offset(),
565                                timestamp_ms,
566                                headers_json,
567                            };
568                            match msg_tx.try_send(kp) {
569                                Ok(()) => {
570                                    channel_len.fetch_add(1, Ordering::Relaxed);
571                                }
572                                Err(crossfire::TrySendError::Full(kp)) => {
573                                    if !is_paused {
574                                        if let Ok(assignment) = consumer.assignment() {
575                                            if consumer.pause(&assignment).is_ok() {
576                                                is_paused = true;
577                                                reader_paused.store(true, Ordering::Release);
578                                                debug!("reader: paused partitions (channel full)");
579                                            }
580                                        }
581                                    }
582                                    channel_len.fetch_add(1, Ordering::Relaxed);
583                                    let send_ok = tokio::select! {
584                                        biased;
585                                        _ = reader_shutdown.changed() => false,
586                                        result = msg_tx.send(kp) => result.is_ok(),
587                                    };
588                                    if !send_ok {
589                                        channel_len.fetch_sub(1, Ordering::Relaxed);
590                                        break;
591                                    }
592                                }
593                                Err(crossfire::TrySendError::Disconnected(_)) => break,
594                            }
595                            data_ready.notify_one();
596                        }
597                    }
598                    Err(e) => {
599                        warn!(error = %e, "Kafka consumer error");
600                    }
601                }
602            }
603
604            // Reader does NOT commit on shutdown. With on-checkpoint
605            // semantics the only durable broker offsets are those that
606            // belong to a committed epoch; close() drains the commit-request
607            // channel before unsubscribing, ensuring nothing is lost.
608            consumer.unsubscribe();
609        });
610
611        self.msg_rx = Some(msg_rx);
612        self.reader_handle = Some(reader_handle);
613        self.commit_handle = Some(commit_handle);
614        self.hwm_handle = Some(hwm_handle);
615        self.reader_shutdown = Some(shutdown_tx);
616        self.commit_tx = Some(commit_tx);
617        self.high_watermarks_rx = Some(hwm_rx);
618    }
619}
620
621/// Build the manual-`assign()` partition list for a vnode-assigned Kafka
622/// source: the partitions this node owns (`partition % vnode_count`), each
623/// positioned at its checkpointed offset + 1 when known (resume after the last
624/// consumed record), else `default_offset`.
625fn build_vnode_assignment_tpl(
626    registry: &laminar_core::state::VnodeRegistry,
627    self_id: laminar_core::state::NodeId,
628    topic_meta: &[(Arc<str>, i32)],
629    offsets: &OffsetTracker,
630    default_offset: rdkafka::Offset,
631) -> TopicPartitionList {
632    let mut tpl = TopicPartitionList::new();
633    for (topic, count) in topic_meta {
634        for partition in crate::partition_assignment::owned_partitions(*count, registry, self_id) {
635            let offset = match offsets.get(topic.as_ref(), partition) {
636                Some(o) => rdkafka::Offset::Offset(o + 1),
637                None => default_offset,
638            };
639            if let Err(e) = tpl.add_partition_offset(topic.as_ref(), partition, offset) {
640                warn!(
641                    topic = %topic, partition, error = %e,
642                    "failed to add vnode-owned partition to assignment"
643                );
644            }
645        }
646    }
647    tpl
648}
649
650/// rdkafka start position for a partition that has no checkpointed offset under
651/// engine-controlled assignment, derived from the configured startup mode.
652fn startup_default_offset(mode: &StartupMode) -> rdkafka::Offset {
653    match mode {
654        StartupMode::Earliest => rdkafka::Offset::Beginning,
655        StartupMode::Latest => rdkafka::Offset::End,
656        // GroupOffsets resumes from committed offsets (falling back to
657        // `auto.offset.reset`); Specific/Timestamp aren't combined with vnode
658        // assignment, so they also defer to the stored position.
659        _ => rdkafka::Offset::Stored,
660    }
661}
662
663#[async_trait]
664#[allow(clippy::too_many_lines)] // poll_batch has legitimate complexity (backpressure + deser + poison pill fallback)
665impl SourceConnector for KafkaSource {
666    fn set_vnode_assignment(
667        &mut self,
668        registry: Arc<laminar_core::state::VnodeRegistry>,
669        self_id: laminar_core::state::NodeId,
670    ) {
671        info!(
672            self_id = self_id.0,
673            vnode_count = registry.vnode_count(),
674            "Kafka source: engine-controlled partition→vnode assignment enabled"
675        );
676        self.vnode_assignment = Some((registry, self_id));
677    }
678
679    async fn open(&mut self, config: &ConnectorConfig) -> Result<(), ConnectorError> {
680        self.state = ConnectorState::Initializing;
681
682        // If config provided, re-parse (supports runtime config override).
683        let kafka_config = if config.properties().is_empty() {
684            self.config.clone()
685        } else {
686            let parsed = KafkaSourceConfig::from_config(config)?;
687            self.config = parsed.clone();
688            parsed
689        };
690
691        // Re-select deserializer (factory defaults to JSON).
692        if let Some(sr_client) = Self::build_sr_client(&kafka_config)? {
693            let sr = Arc::new(sr_client);
694            self.schema_registry = Some(Arc::clone(&sr));
695            self.deserializer = if kafka_config.format == Format::Avro {
696                Box::new(AvroDeserializer::with_schema_registry(sr))
697            } else {
698                select_deserializer(kafka_config.format)
699            };
700        } else if let Some(ref sr) = self.schema_registry {
701            // Preserve SR client injected via with_schema_registry().
702            self.deserializer = if kafka_config.format == Format::Avro {
703                Box::new(AvroDeserializer::with_schema_registry(Arc::clone(sr)))
704            } else {
705                select_deserializer(kafka_config.format)
706            };
707        } else {
708            self.deserializer = select_deserializer(kafka_config.format);
709        }
710
711        // New deserializer has empty known_ids; reset evolution baseline to match.
712        self.last_avro_schema = None;
713
714        // Override schema from SQL DDL if provided.
715        if let Some(schema) = config.arrow_schema() {
716            info!(
717                fields = schema.fields().len(),
718                "using SQL-defined schema for deserialization"
719            );
720            self.schema = schema;
721        }
722
723        info!(
724            brokers = %kafka_config.bootstrap_servers,
725            subscription = ?kafka_config.subscription,
726            group_id = %kafka_config.group_id,
727            format = %kafka_config.format,
728            schema_fields = self.schema.fields().len(),
729            "opening Kafka source connector"
730        );
731
732        // Build rdkafka consumer with rebalance-aware context.
733        let rdkafka_config: ClientConfig = kafka_config.to_rdkafka_config();
734        let context = LaminarConsumerContext::new(
735            Arc::clone(&self.checkpoint_request),
736            Arc::clone(&self.rebalance_state),
737            Arc::clone(&self.rebalance_counter),
738            Arc::clone(&self.revoke_generation),
739            Arc::clone(&self.reader_paused),
740            Arc::clone(&self.offset_snapshot),
741            // IntCounter::clone is an Arc bump; these are shared with the
742            // metrics struct and bumped from librdkafka's background thread
743            // inside `commit_callback`.
744            self.metrics.commits.clone(),
745            self.metrics.commit_failures_rejected.clone(),
746        );
747        let consumer: StreamConsumer<LaminarConsumerContext> =
748            rdkafka_config.create_with_context(context).map_err(|e| {
749                ConnectorError::ConnectionFailed(format!("failed to create consumer: {e}"))
750            })?;
751
752        // Engine-controlled partition assignment (cluster mode): bind each
753        // partition to a vnode (`partition % vnode_count`) and manually
754        // `assign()` only those this node owns, instead of consumer-group
755        // `subscribe()`. Manual assign bypasses the broker rebalance callbacks,
756        // so partitions are positioned here directly (checkpointed offset, else
757        // the startup default). The reader loop re-binds on assignment rotation.
758        let vnode = self
759            .vnode_assignment
760            .as_ref()
761            .map(|(r, s)| (Arc::clone(r), *s));
762        let vnode_assigned = if let Some((registry, self_id)) = vnode {
763            if let TopicSubscription::Topics(topics) = &kafka_config.subscription {
764                let mut topic_meta: Vec<(Arc<str>, i32)> = Vec::with_capacity(topics.len());
765                for topic in topics {
766                    let md = consumer
767                        .fetch_metadata(Some(topic), std::time::Duration::from_secs(10))
768                        .map_err(|e| {
769                            ConnectorError::ConnectionFailed(format!(
770                                "metadata fetch for '{topic}': {e}"
771                            ))
772                        })?;
773                    let count = md
774                        .topics()
775                        .iter()
776                        .find(|t| t.name() == topic.as_str())
777                        .map_or(0usize, |t| t.partitions().len());
778                    topic_meta.push((
779                        Arc::from(topic.as_str()),
780                        i32::try_from(count).unwrap_or(i32::MAX),
781                    ));
782                }
783                let default_offset = startup_default_offset(&kafka_config.startup_mode);
784                let tpl = build_vnode_assignment_tpl(
785                    &registry,
786                    self_id,
787                    &topic_meta,
788                    &self.offsets,
789                    default_offset,
790                );
791                let owned = tpl.count();
792                consumer.assign(&tpl).map_err(|e| {
793                    ConnectorError::ConnectionFailed(format!("vnode partition assign failed: {e}"))
794                })?;
795                self.vnode_topic_meta = topic_meta;
796                info!(
797                    owned_partitions = owned,
798                    "Kafka source assigned vnode-owned partitions (engine-controlled)"
799                );
800                true
801            } else {
802                warn!(
803                    "vnode-aware assignment is unsupported with topic patterns — \
804                     falling back to consumer-group subscribe()"
805                );
806                false
807            }
808        } else {
809            false
810        };
811
812        // Subscribe to topics (list or regex pattern). Skipped when the engine
813        // manually assigned partitions above.
814        if !vnode_assigned {
815            match &kafka_config.subscription {
816                TopicSubscription::Topics(topics) => {
817                    let topic_refs: Vec<&str> = topics.iter().map(String::as_str).collect();
818                    consumer.subscribe(&topic_refs).map_err(|e| {
819                        ConnectorError::ConnectionFailed(format!("failed to subscribe: {e}"))
820                    })?;
821                }
822                TopicSubscription::Pattern(pattern) => {
823                    // rdkafka requires a ^ prefix for regex patterns
824                    let regex_pattern = if pattern.starts_with('^') {
825                        pattern.clone()
826                    } else {
827                        format!("^{pattern}")
828                    };
829                    consumer.subscribe(&[&regex_pattern]).map_err(|e| {
830                        ConnectorError::ConnectionFailed(format!(
831                            "failed to subscribe to pattern: {e}"
832                        ))
833                    })?;
834                }
835            }
836
837            // Apply startup mode positioning before starting the reader.
838            match &kafka_config.startup_mode {
839                // GroupOffsets/Earliest/Latest are handled via auto.offset.reset in to_rdkafka_config().
840                StartupMode::GroupOffsets | StartupMode::Earliest | StartupMode::Latest => {}
841                StartupMode::SpecificOffsets(offsets) => {
842                    let mut tpl = rdkafka::TopicPartitionList::new();
843                    let topics = match &kafka_config.subscription {
844                        TopicSubscription::Topics(t) => t.clone(),
845                        TopicSubscription::Pattern(_) => Vec::new(),
846                    };
847                    for topic in &topics {
848                        for (&partition, &offset) in offsets {
849                            if let Err(e) = tpl.add_partition_offset(
850                                topic,
851                                partition,
852                                rdkafka::Offset::Offset(offset),
853                            ) {
854                                tracing::warn!(
855                                    %topic, partition, offset,
856                                    error = %e,
857                                    "failed to add specific offset to partition list"
858                                );
859                            }
860                        }
861                    }
862                    if tpl.count() > 0 {
863                        consumer.assign(&tpl).map_err(|e| {
864                            ConnectorError::ConnectionFailed(format!(
865                                "failed to assign specific offsets: {e}"
866                            ))
867                        })?;
868                        info!(
869                            partition_count = tpl.count(),
870                            "assigned consumer to specific offsets"
871                        );
872                    }
873                }
874                StartupMode::Timestamp(ts_ms) => {
875                    // rdkafka requires assignment before offsets_for_times.
876                    // Wait briefly for partition assignment from the group coordinator,
877                    // then seek each assigned partition to the target timestamp.
878                    let mut tpl = rdkafka::TopicPartitionList::new();
879                    let topics = match &kafka_config.subscription {
880                        TopicSubscription::Topics(t) => t.clone(),
881                        TopicSubscription::Pattern(_) => Vec::new(),
882                    };
883                    // Query metadata to discover partition count per topic.
884                    if let Ok(metadata) = consumer.fetch_metadata(
885                        topics.first().map(String::as_str),
886                        std::time::Duration::from_secs(10),
887                    ) {
888                        for topic_meta in metadata.topics() {
889                            for partition_meta in topic_meta.partitions() {
890                                if let Err(e) = tpl.add_partition_offset(
891                                    topic_meta.name(),
892                                    partition_meta.id(),
893                                    rdkafka::Offset::Offset(*ts_ms),
894                                ) {
895                                    tracing::warn!(
896                                        topic = topic_meta.name(),
897                                        partition = partition_meta.id(),
898                                        error = %e,
899                                        "failed to add timestamp offset to partition list"
900                                    );
901                                }
902                            }
903                        }
904                    }
905                    if tpl.count() > 0 {
906                        match consumer.offsets_for_times(tpl, std::time::Duration::from_secs(10)) {
907                            Ok(resolved) => {
908                                consumer.assign(&resolved).map_err(|e| {
909                                    ConnectorError::ConnectionFailed(format!(
910                                        "failed to assign timestamp offsets: {e}"
911                                    ))
912                                })?;
913                                info!(
914                                    timestamp_ms = ts_ms,
915                                    partition_count = resolved.count(),
916                                    "assigned consumer to timestamp offsets"
917                                );
918                            }
919                            Err(e) => {
920                                warn!(
921                                    error = %e,
922                                    timestamp_ms = ts_ms,
923                                    "failed to resolve timestamp offsets, falling back to group offsets"
924                                );
925                            }
926                        }
927                    }
928                }
929            }
930        } // end `if !vnode_assigned`
931
932        self.consumer = Some(consumer);
933        self.state = ConnectorState::Running;
934
935        // Reader is NOT started here — deferred to first `poll_batch()`.
936        // This allows `restore()` to access the consumer directly between
937        // `open()` and the first poll, calling `consumer.assign()` to seek
938        // to checkpoint offsets. The pipeline's fallback poll interval
939        // ensures the first `poll_batch()` fires promptly even without
940        // a `data_ready_notify()` signal.
941
942        // Eagerly fetch the SR schema so the Arrow schema is available at
943        // plan time (before the first poll_batch).
944        if let Some(ref sr) = self.schema_registry {
945            if let TopicSubscription::Topics(topics) = &kafka_config.subscription {
946                if topics.len() > 1 {
947                    warn!("multiple topics with schema registry — using first topic's schema");
948                }
949                if let Some(topic) = topics.first() {
950                    let subject = resolve_value_subject(
951                        kafka_config.schema_registry_subject_strategy,
952                        kafka_config.schema_registry_record_name.as_deref(),
953                        topic,
954                    );
955                    match tokio::time::timeout(
956                        kafka_config.schema_registry_discovery_timeout,
957                        sr.get_latest_schema(&subject),
958                    )
959                    .await
960                    {
961                        Ok(Ok(cached)) => {
962                            if let Some(avro_deser) = self
963                                .deserializer
964                                .as_any_mut()
965                                .and_then(|any| any.downcast_mut::<AvroDeserializer>())
966                            {
967                                if let Err(e) =
968                                    avro_deser.register_schema(cached.id, &cached.schema_str)
969                                {
970                                    warn!(%subject, error = %e, "SR schema register failed");
971                                } else {
972                                    // Keep the catalog schema pinned — planner
973                                    // plans are already built against it.
974                                    log_schema_drift(&self.schema, &cached.arrow_schema, &subject);
975                                    info!(%subject, schema_id = cached.id,
976                                        "SR schema fetched at open()");
977                                    self.last_avro_schema = Some(cached.arrow_schema);
978                                }
979                            }
980                        }
981                        Ok(Err(e)) => {
982                            warn!(%subject, error = %e, "SR unavailable at open(), will resolve lazily");
983                        }
984                        Err(_elapsed) => {
985                            warn!(%subject, "SR prefetch timed out at open(), will resolve lazily");
986                        }
987                    }
988                }
989            }
990        }
991
992        info!("Kafka source connector opened successfully");
993        Ok(())
994    }
995
996    async fn discover_schema(
997        &mut self,
998        properties: &std::collections::HashMap<String, String>,
999    ) -> Result<(), ConnectorError> {
1000        let cfg = crate::config::ConnectorConfig::with_properties("kafka", properties.clone());
1001        let kafka_config = KafkaSourceConfig::from_config(&cfg)?;
1002        if kafka_config.format != Format::Avro {
1003            return Ok(());
1004        }
1005
1006        let topic = match &kafka_config.subscription {
1007            TopicSubscription::Topics(topics) => match topics.first() {
1008                Some(t) => {
1009                    if topics.len() > 1 {
1010                        warn!(topics = ?topics, chosen = %t,
1011                            "multi-topic source: using first topic's SR schema");
1012                    }
1013                    t.clone()
1014                }
1015                None => return Ok(()),
1016            },
1017            TopicSubscription::Pattern(pattern) => {
1018                return Err(ConnectorError::ConfigurationError(format!(
1019                    "topic.pattern '{pattern}' cannot auto-discover a schema; \
1020                     declare columns explicitly"
1021                )));
1022            }
1023        };
1024
1025        let Some(sr_client) = Self::build_sr_client(&kafka_config)? else {
1026            return Ok(());
1027        };
1028
1029        let subject = resolve_value_subject(
1030            kafka_config.schema_registry_subject_strategy,
1031            kafka_config.schema_registry_record_name.as_deref(),
1032            &topic,
1033        );
1034        let timeout = kafka_config.schema_registry_discovery_timeout;
1035
1036        match tokio::time::timeout(timeout, sr_client.get_latest_schema(&subject)).await {
1037            Ok(Ok(cached)) => {
1038                self.metrics.record_sr_discovery_success();
1039                info!(%subject, schema_id = cached.id,
1040                    fields = cached.arrow_schema.fields().len(),
1041                    "discovered Avro schema from Schema Registry");
1042                self.schema = cached.arrow_schema;
1043                Ok(())
1044            }
1045            Ok(Err(e)) => {
1046                self.metrics.record_sr_discovery_failure();
1047                Err(ConnectorError::ConnectionFailed(format!(
1048                    "Schema Registry lookup failed for subject '{subject}': {e}"
1049                )))
1050            }
1051            Err(_) => {
1052                self.metrics.record_sr_discovery_timeout();
1053                Err(ConnectorError::Timeout(
1054                    u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX),
1055                ))
1056            }
1057        }
1058    }
1059
1060    #[allow(clippy::cast_possible_truncation)] // Kafka partition/offset values fit in narrower types
1061    async fn poll_batch(
1062        &mut self,
1063        max_records: usize,
1064    ) -> Result<Option<SourceBatch>, ConnectorError> {
1065        if self.state != ConnectorState::Running {
1066            return Err(ConnectorError::InvalidState {
1067                expected: "Running".into(),
1068                actual: self.state.to_string(),
1069            });
1070        }
1071
1072        // Lazily spawn the background reader task on first poll.
1073        self.ensure_reader_started();
1074
1075        let rx = self
1076            .msg_rx
1077            .as_mut()
1078            .ok_or_else(|| ConnectorError::InvalidState {
1079                expected: "reader initialized".into(),
1080                actual: "reader is None".into(),
1081            })?;
1082
1083        let limit = max_records.min(self.config.max_poll_records);
1084
1085        // Reuse struct-level buffers — clear without freeing capacity.
1086        self.poll_payload_buf.clear();
1087        self.poll_payload_offsets.clear();
1088        self.poll_meta_partitions.clear();
1089        self.poll_meta_offsets.clear();
1090        self.poll_meta_timestamps.clear();
1091        self.poll_meta_headers.clear();
1092
1093        let mut total_bytes: u64 = 0;
1094        let mut last_topic = String::new();
1095        let mut last_partition_id: i32 = 0;
1096        let mut last_offset: i64 = -1;
1097        let include_metadata = self.config.include_metadata;
1098        let include_headers = self.config.include_headers;
1099
1100        while self.poll_payload_offsets.len() < limit {
1101            match rx.try_recv() {
1102                Ok(kp) => {
1103                    self.channel_len.fetch_sub(1, Ordering::Release);
1104                    total_bytes += kp.data.len() as u64;
1105                    let start = self.poll_payload_buf.len();
1106                    self.poll_payload_buf.extend_from_slice(&kp.data);
1107                    self.poll_payload_offsets.push((start, kp.data.len()));
1108
1109                    self.offsets.update_arc(&kp.topic, kp.partition, kp.offset);
1110
1111                    if include_metadata {
1112                        self.poll_meta_partitions.push(kp.partition);
1113                        self.poll_meta_offsets.push(kp.offset);
1114                        self.poll_meta_timestamps.push(kp.timestamp_ms);
1115                    }
1116                    if include_headers {
1117                        self.poll_meta_headers.push(kp.headers_json);
1118                    }
1119
1120                    // Update watermark tracker with event timestamp.
1121                    if let Some(ref mut tracker) = self.watermark_tracker {
1122                        if let Some(ts) = kp.timestamp_ms {
1123                            tracker.update_partition(kp.partition, ts);
1124                        }
1125                    }
1126
1127                    if last_topic.as_str() != &*kp.topic || last_partition_id != kp.partition {
1128                        last_topic = kp.topic.to_string();
1129                        last_partition_id = kp.partition;
1130                    }
1131                    last_offset = kp.offset;
1132                }
1133                Err(crossfire::TryRecvError::Empty) => break,
1134                Err(crossfire::TryRecvError::Disconnected) => {
1135                    self.state = ConnectorState::Failed;
1136                    return Err(ConnectorError::Internal(
1137                        "Kafka reader task exited unexpectedly".into(),
1138                    ));
1139                }
1140            }
1141        }
1142
1143        // Check for idle partitions on each poll cycle.
1144        if let Some(ref mut tracker) = self.watermark_tracker {
1145            tracker.check_idle_partitions();
1146        }
1147
1148        // Sync rebalance counter → metrics (bridge from rdkafka background thread).
1149        let rebalance_events = self.rebalance_counter.swap(0, Ordering::Relaxed);
1150        for _ in 0..rebalance_events {
1151            self.metrics.record_rebalance();
1152        }
1153
1154        // Lock-free revoke detection: check if a rebalance revoke happened
1155        // since the last poll cycle. If so, purge offsets for revoked partitions.
1156        let current_revoke_gen = self.revoke_generation.load(Ordering::Acquire);
1157        let had_revoke = current_revoke_gen != self.last_seen_revoke_gen;
1158        if had_revoke {
1159            self.last_seen_revoke_gen = current_revoke_gen;
1160            // Clone is intentional: only runs on rebalance events (rare), not per-poll.
1161            let assigned = lock_or_recover(&self.rebalance_state)
1162                .assigned_partitions()
1163                .clone();
1164            let before = self.offsets.partition_count();
1165            self.offsets.retain_assigned(&assigned);
1166            let after = self.offsets.partition_count();
1167            if before != after {
1168                debug!(
1169                    before,
1170                    after, "purged revoked partition offsets after rebalance"
1171                );
1172            }
1173        }
1174
1175        // Update the offset snapshot used by `post_rebalance` for seek-on-assign.
1176        // No broker commit happens here — that's driven exclusively by
1177        // `notify_epoch_committed` against the TPL captured at checkpoint time.
1178        if had_revoke || !self.poll_payload_offsets.is_empty() {
1179            lock_or_recover(&self.offset_snapshot).clone_from(&self.offsets);
1180        }
1181
1182        // Compute consumer lag from high watermarks (moved from metrics()
1183        // to avoid side-effects in a &self getter).
1184        if let Some(ref hwm_rx) = self.high_watermarks_rx {
1185            let watermarks = hwm_rx.borrow();
1186            let mut total_lag: u64 = 0;
1187            for (topic, partition, high_watermark) in watermarks.iter() {
1188                if let Some(current_offset) = self.offsets.get(topic, *partition) {
1189                    let lag = high_watermark.saturating_sub(current_offset + 1);
1190                    #[allow(clippy::cast_sign_loss)]
1191                    if lag > 0 {
1192                        total_lag += lag as u64;
1193                    }
1194                }
1195            }
1196            self.metrics.set_lag(total_lag);
1197        }
1198
1199        if self.poll_payload_offsets.is_empty() {
1200            return Ok(None);
1201        }
1202
1203        // PartitionInfo reflects the last topic-partition seen in this batch.
1204        // Per-partition offsets are tracked correctly in `self.offsets` and
1205        // persisted via checkpoint(); this field is informational only.
1206        let last_partition = if last_offset >= 0 {
1207            Some(PartitionInfo::new(
1208                format!("{last_topic}-{last_partition_id}"),
1209                last_offset.to_string(),
1210            ))
1211        } else {
1212            None
1213        };
1214
1215        // Resolve Avro schemas from Schema Registry before deserialization.
1216        // Also detect schema evolution when new schema IDs appear.
1217        if let Some(avro_deser) = self
1218            .deserializer
1219            .as_any_mut()
1220            .and_then(|any| any.downcast_mut::<AvroDeserializer>())
1221        {
1222            let mut new_schema_ids = Vec::new();
1223            for &(start, len) in &self.poll_payload_offsets {
1224                if let Some(schema_id) = AvroDeserializer::extract_confluent_id(
1225                    &self.poll_payload_buf[start..start + len],
1226                ) {
1227                    let is_new = avro_deser
1228                        .ensure_schema_registered(schema_id)
1229                        .await
1230                        .map_err(ConnectorError::Serde)?;
1231                    if is_new {
1232                        new_schema_ids.push(schema_id);
1233                    }
1234                }
1235            }
1236
1237            // Detect schema evolution by diffing successive writer schemas.
1238            if !new_schema_ids.is_empty()
1239                && self.config.schema_evolution_strategy != SchemaEvolutionStrategy::Ignore
1240            {
1241                if let Some(ref sr) = self.schema_registry {
1242                    let compat = self
1243                        .config
1244                        .schema_compatibility
1245                        .map_or(CompatibilityMode::Backward, CompatibilityMode::from);
1246                    let evolver = SchemaEvolution::new(compat);
1247
1248                    for id in new_schema_ids {
1249                        let cached = sr.resolve_confluent_id(id).await.map_err(|e| {
1250                            ConnectorError::SchemaMismatch(format!(
1251                                "failed to resolve schema {id}: {e}"
1252                            ))
1253                        })?;
1254
1255                        let Some(ref prev) = self.last_avro_schema else {
1256                            // First schema — establish baseline, nothing to diff.
1257                            info!(schema_id = id, "initial Avro schema registered");
1258                            self.last_avro_schema = Some(Arc::clone(&cached.arrow_schema));
1259                            continue;
1260                        };
1261
1262                        let changes = evolver.diff_schemas(prev, &cached.arrow_schema);
1263                        self.last_avro_schema = Some(Arc::clone(&cached.arrow_schema));
1264
1265                        if changes.is_empty() {
1266                            info!(
1267                                schema_id = id,
1268                                "new Avro schema ID registered, no field changes"
1269                            );
1270                            continue;
1271                        }
1272                        let verdict = evolver.evaluate_evolution(&changes);
1273                        match &verdict {
1274                            EvolutionVerdict::Compatible => {
1275                                info!(schema_id = id, ?changes, "schema evolved (compatible)");
1276                            }
1277                            EvolutionVerdict::RequiresMigration => {
1278                                warn!(
1279                                    schema_id = id,
1280                                    ?changes,
1281                                    "schema evolved (requires migration)"
1282                                );
1283                            }
1284                            EvolutionVerdict::Incompatible(reason) => {
1285                                if self.config.schema_evolution_strategy
1286                                    == SchemaEvolutionStrategy::Reject
1287                                {
1288                                    return Err(ConnectorError::SchemaMismatch(format!(
1289                                        "incompatible schema evolution for ID {id}: {reason}"
1290                                    )));
1291                                }
1292                                warn!(
1293                                    schema_id = id, %reason, ?changes,
1294                                    "incompatible schema evolution detected"
1295                                );
1296                            }
1297                        }
1298                    }
1299                }
1300            }
1301        }
1302
1303        let refs: Vec<&[u8]> = self
1304            .poll_payload_offsets
1305            .iter()
1306            .map(|&(start, len)| &self.poll_payload_buf[start..start + len])
1307            .collect();
1308
1309        // Try batch deserialization first (fast path). If it fails, fall back
1310        // to per-record deserialization to isolate poison pills.
1311        let (batch, good_indices) = match self.deserializer.deserialize_batch(&refs, &self.schema) {
1312            Ok(batch) => (batch, None),
1313            Err(batch_err) => {
1314                // Per-record fallback: deserialize one at a time, collect
1315                // successful batches directly (avoids double-deserialization).
1316                // Track indices of successful records so metadata vectors can
1317                // be filtered to match the reduced row count.
1318                let mut good_batches = Vec::with_capacity(refs.len());
1319                let mut good_idx = Vec::with_capacity(refs.len());
1320                let mut error_count = 0u64;
1321                for (i, r) in refs.iter().enumerate() {
1322                    match self
1323                        .deserializer
1324                        .deserialize_batch(std::slice::from_ref(r), &self.schema)
1325                    {
1326                        Ok(batch) => {
1327                            good_batches.push(batch);
1328                            good_idx.push(i);
1329                        }
1330                        Err(e) => {
1331                            error_count += 1;
1332                            self.metrics.record_error();
1333                            warn!(error = %e, "skipping poison pill record");
1334                        }
1335                    }
1336                }
1337                if good_batches.is_empty() {
1338                    return Err(ConnectorError::Serde(batch_err));
1339                }
1340                // Escalate if the error rate exceeds the configured threshold.
1341                #[allow(clippy::cast_precision_loss)]
1342                if error_count > 0 {
1343                    let error_rate = error_count as f64 / refs.len() as f64;
1344                    if error_rate > self.config.max_deser_error_rate {
1345                        return Err(ConnectorError::Serde(batch_err));
1346                    }
1347                    warn!(
1348                        skipped = error_count,
1349                        total = refs.len(),
1350                        error_rate = %format_args!("{error_rate:.1}"),
1351                        "deserialized batch with poison pill isolation"
1352                    );
1353                }
1354                let concat_schema = good_batches[0].schema();
1355                let batch = arrow_select::concat::concat_batches(&concat_schema, &good_batches)
1356                    .map_err(|e| {
1357                        ConnectorError::Internal(format!("failed to concat batches: {e}"))
1358                    })?;
1359                (batch, Some(good_idx))
1360            }
1361        };
1362
1363        // If poison pill fallback filtered records, also filter metadata
1364        // vectors so their lengths match the deserialized batch row count.
1365        if let Some(ref idx) = good_indices {
1366            if include_metadata {
1367                self.poll_meta_partitions =
1368                    idx.iter().map(|&i| self.poll_meta_partitions[i]).collect();
1369                self.poll_meta_offsets = idx.iter().map(|&i| self.poll_meta_offsets[i]).collect();
1370                self.poll_meta_timestamps =
1371                    idx.iter().map(|&i| self.poll_meta_timestamps[i]).collect();
1372            }
1373            if include_headers {
1374                self.poll_meta_headers = idx
1375                    .iter()
1376                    .map(|&i| std::mem::take(&mut self.poll_meta_headers[i]))
1377                    .collect();
1378            }
1379        }
1380
1381        // Append metadata columns if configured.
1382        let needs_meta = include_metadata && !self.poll_meta_partitions.is_empty();
1383        let needs_headers = include_headers && !self.poll_meta_headers.is_empty();
1384        let batch = if needs_meta || needs_headers {
1385            use arrow_schema::{DataType, Field};
1386
1387            let mut fields = batch.schema().fields().to_vec();
1388            let mut columns: Vec<Arc<dyn arrow_array::Array>> = batch.columns().to_vec();
1389
1390            if needs_meta {
1391                use arrow_array::{Int32Array, Int64Array, TimestampMillisecondArray};
1392                use arrow_schema::TimeUnit;
1393                fields.push(Arc::new(Field::new("_partition", DataType::Int32, false)));
1394                columns.push(Arc::new(Int32Array::from(std::mem::take(
1395                    &mut self.poll_meta_partitions,
1396                ))));
1397                fields.push(Arc::new(Field::new("_offset", DataType::Int64, false)));
1398                columns.push(Arc::new(Int64Array::from(std::mem::take(
1399                    &mut self.poll_meta_offsets,
1400                ))));
1401                fields.push(Arc::new(Field::new(
1402                    "_timestamp",
1403                    DataType::Timestamp(TimeUnit::Millisecond, None),
1404                    true,
1405                )));
1406                columns.push(Arc::new(TimestampMillisecondArray::from(std::mem::take(
1407                    &mut self.poll_meta_timestamps,
1408                ))));
1409            }
1410            if needs_headers {
1411                fields.push(Arc::new(Field::new("_headers", DataType::Utf8, true)));
1412                columns.push(Arc::new(arrow_array::StringArray::from(std::mem::take(
1413                    &mut self.poll_meta_headers,
1414                ))));
1415            }
1416
1417            let meta_schema = Arc::new(arrow_schema::Schema::new(fields));
1418            arrow_array::RecordBatch::try_new(meta_schema, columns).map_err(|e| {
1419                ConnectorError::Internal(format!("failed to append metadata columns: {e}"))
1420            })?
1421        } else {
1422            batch
1423        };
1424
1425        let num_rows = batch.num_rows();
1426        self.metrics.record_poll(num_rows as u64, total_bytes);
1427
1428        let source_batch = if let Some(partition) = last_partition {
1429            SourceBatch::with_partition(batch, partition)
1430        } else {
1431            SourceBatch::new(batch)
1432        };
1433
1434        debug!(
1435            records = num_rows,
1436            bytes = total_bytes,
1437            "polled batch from Kafka"
1438        );
1439
1440        Ok(Some(source_batch))
1441    }
1442
1443    fn schema(&self) -> SchemaRef {
1444        self.schema.clone()
1445    }
1446
1447    fn checkpoint(&self) -> SourceCheckpoint {
1448        let assigned = lock_or_recover(&self.rebalance_state)
1449            .assigned_partitions()
1450            .clone();
1451        self.offsets.to_checkpoint_filtered(&assigned)
1452    }
1453
1454    /// Restores the consumer to checkpointed offsets.
1455    ///
1456    /// Offsets are staged in `offset_snapshot`; the actual `seek_partitions`
1457    /// runs in `post_rebalance` when the group coordinator hands us the
1458    /// assignment. Calling `consumer.assign()` here would switch out of
1459    /// `subscribe()` mode and produce `Local: Erroneous state`.
1460    async fn restore(&mut self, checkpoint: &SourceCheckpoint) -> Result<(), ConnectorError> {
1461        info!(
1462            epoch = checkpoint.epoch(),
1463            partition_count = checkpoint.offsets().len(),
1464            "staging checkpointed offsets for seek-on-assign"
1465        );
1466
1467        self.offsets = OffsetTracker::from_checkpoint(checkpoint);
1468        match self.offset_snapshot.lock() {
1469            Ok(mut snapshot) => snapshot.clone_from(&self.offsets),
1470            Err(poisoned) => poisoned.into_inner().clone_from(&self.offsets),
1471        }
1472
1473        Ok(())
1474    }
1475
1476    fn data_ready_notify(&self) -> Option<Arc<Notify>> {
1477        Some(Arc::clone(&self.data_ready))
1478    }
1479
1480    fn checkpoint_requested(&self) -> Option<Arc<AtomicBool>> {
1481        Some(Arc::clone(&self.checkpoint_request))
1482    }
1483
1484    async fn notify_epoch_committed(
1485        &mut self,
1486        _epoch: u64,
1487        checkpoint: &SourceCheckpoint,
1488    ) -> Result<(), ConnectorError> {
1489        if !self.config.broker_commit_on_checkpoint || checkpoint.is_empty() {
1490            return Ok(());
1491        }
1492        let Some(ref tx) = self.commit_tx else {
1493            return Ok(());
1494        };
1495        let tpl = OffsetTracker::from_checkpoint(checkpoint).to_topic_partition_list();
1496        if tpl.count() == 0 {
1497            return Ok(());
1498        }
1499        if tx.send(Some(tpl)).is_err() {
1500            self.metrics.commit_failures_enqueue_dropped.inc();
1501        }
1502        Ok(())
1503    }
1504
1505    async fn close(&mut self) -> Result<(), ConnectorError> {
1506        info!("closing Kafka source connector");
1507
1508        // Close the commit-request channel so the commit task drains
1509        // anything already queued, then exits. We do NOT issue a fresh
1510        // commit here — under on-checkpoint semantics the only durable
1511        // broker offsets are those tied to a committed epoch, and those
1512        // were already enqueued from `notify_epoch_committed`.
1513        self.commit_tx = None;
1514
1515        // Signal shutdown and wait for all background tasks to exit.
1516        if let Some(tx) = self.reader_shutdown.take() {
1517            let _ = tx.send(true);
1518        }
1519        let timeout = std::time::Duration::from_secs(5);
1520        if let Some(handle) = self.reader_handle.take() {
1521            let _ = tokio::time::timeout(timeout, handle).await;
1522        }
1523        if let Some(handle) = self.commit_handle.take() {
1524            let _ = tokio::time::timeout(timeout, handle).await;
1525        }
1526        if let Some(handle) = self.hwm_handle.take() {
1527            let _ = tokio::time::timeout(timeout, handle).await;
1528        }
1529        self.msg_rx = None;
1530
1531        // If consumer was never moved to the reader (e.g., close() called
1532        // before any poll_batch()), unsubscribe directly. No commit:
1533        // nothing has been processed, so there's no durable offset to ack.
1534        if let Some(ref consumer) = self.consumer {
1535            consumer.unsubscribe();
1536        }
1537
1538        self.consumer = None;
1539        self.state = ConnectorState::Closed;
1540        info!("Kafka source connector closed");
1541        Ok(())
1542    }
1543}
1544
1545impl std::fmt::Debug for KafkaSource {
1546    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1547        f.debug_struct("KafkaSource")
1548            .field("state", &self.state)
1549            .field("subscription", &self.config.subscription)
1550            .field("group_id", &self.config.group_id)
1551            .field("format", &self.config.format)
1552            .field("partitions", &self.offsets.partition_count())
1553            .finish_non_exhaustive()
1554    }
1555}
1556
1557/// Warn if the CREATE-SOURCE catalog schema has drifted from the live
1558/// Schema Registry schema. Empty `declared` means nothing was declared.
1559fn log_schema_drift(declared: &arrow_schema::Schema, live: &arrow_schema::Schema, subject: &str) {
1560    if declared.fields().is_empty() || declared.fields() == live.fields() {
1561        return;
1562    }
1563    let decl: BTreeSet<&str> = declared
1564        .fields()
1565        .iter()
1566        .map(|f| f.name().as_str())
1567        .collect();
1568    let lv: BTreeSet<&str> = live.fields().iter().map(|f| f.name().as_str()).collect();
1569    warn!(
1570        %subject,
1571        missing_in_sr = ?decl.difference(&lv).collect::<Vec<_>>(),
1572        added_in_sr = ?lv.difference(&decl).collect::<Vec<_>>(),
1573        "schema drift: re-apply CREATE SOURCE DDL to pick up the current SR schema"
1574    );
1575}
1576
1577fn select_deserializer(format: Format) -> Box<dyn RecordDeserializer> {
1578    match format {
1579        Format::Avro => Box::new(AvroDeserializer::new()),
1580        other => serde::create_deserializer(other).unwrap_or_else(|_| {
1581            warn!(format = %other, "unsupported format, falling back to JSON");
1582            Box::new(serde::json::JsonDeserializer::new())
1583        }),
1584    }
1585}
1586
// Unit tests for `KafkaSource`. Tests that call `open()` tolerate the
// connection failure expected when no broker is running. The Schema
// Registry happy paths use a wiremock server instead of a real registry.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};

    // Two-column schema (`id`, `value`) shared by the non-discovery tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Utf8, false),
        ]))
    }

    // Minimal config subscribed to the `events` topic. Several assertions
    // below (checkpoint keys, Debug output) rely on that topic name.
    fn test_config() -> KafkaSourceConfig {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "test-group".into();
        cfg.subscription = TopicSubscription::Topics(vec!["events".into()]);
        cfg
    }

    #[test]
    fn test_new_defaults() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        assert_eq!(source.state(), ConnectorState::Created);
        assert!(source.consumer.is_none());
        assert_eq!(source.offsets().partition_count(), 0);
    }

    #[test]
    fn test_schema_returned() {
        let schema = test_schema();
        let source = KafkaSource::new(schema.clone(), test_config(), None);
        assert_eq!(source.schema(), schema);
    }

    #[test]
    fn test_checkpoint_empty() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        let cp = source.checkpoint();
        assert!(cp.is_empty());
    }

    #[test]
    fn test_checkpoint_with_offsets() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);

        // Simulate rebalance assign so partitions are in the assigned set.
        {
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0), ("events".into(), 1)]);
        }

        // Checkpoint keys use the `{topic}-{partition}` form.
        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), Some("200"));
    }

    #[test]
    fn test_deserializer_selection_json() {
        // JSON is expected here because test_config() does not set a format.
        let source = KafkaSource::new(test_schema(), test_config(), None);
        assert_eq!(source.deserializer.format(), Format::Json);
    }

    #[test]
    fn test_deserializer_selection_csv() {
        let mut cfg = test_config();
        cfg.format = Format::Csv;
        let source = KafkaSource::new(test_schema(), cfg, None);
        assert_eq!(source.deserializer.format(), Format::Csv);
    }

    #[test]
    fn test_with_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());

        let source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);
        assert!(source.schema_registry.is_some());
        assert_eq!(source.deserializer.format(), Format::Avro);
    }

    #[tokio::test]
    async fn test_open_preserves_injected_schema_registry() {
        let sr = SchemaRegistryClient::new("http://localhost:8081", None);
        let mut cfg = test_config();
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some("http://localhost:8081".into());
        let mut source = KafkaSource::with_schema_registry(test_schema(), cfg, sr);

        // open() with empty config should preserve injected SR.
        let empty_config = crate::config::ConnectorConfig::new("kafka");
        // open() will fail to connect (no broker), but the deserializer
        // re-selection happens before the connection attempt.
        let _ = source.open(&empty_config).await;
        assert!(source.schema_registry.is_some());
        assert_eq!(source.deserializer.format(), Format::Avro);
    }

    #[test]
    fn test_debug_output() {
        let source = KafkaSource::new(test_schema(), test_config(), None);
        let debug = format!("{source:?}");
        assert!(debug.contains("KafkaSource"));
        // `events` comes from the subscription field of the Debug output.
        assert!(debug.contains("events"));
    }

    #[test]
    fn test_checkpoint_filters_revoked_partitions() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);
        source.offsets.update("events", 2, 300);

        // Simulate rebalance: only partitions 0 and 2 are assigned.
        {
            let mut state = source.rebalance_state.lock().unwrap();
            state.on_assign(&[("events".into(), 0), ("events".into(), 2)]);
        }

        let cp = source.checkpoint();
        assert_eq!(cp.get_offset("events-0"), Some("100"));
        assert_eq!(cp.get_offset("events-1"), None); // revoked — filtered out
        assert_eq!(cp.get_offset("events-2"), Some("300"));
    }

    #[test]
    fn test_checkpoint_empty_before_first_rebalance() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        source.offsets.update("events", 0, 100);
        source.offsets.update("events", 1, 200);

        // No rebalance has occurred — assigned_partitions is empty.
        // No assigned partitions means no offsets should be checkpointed.
        let cp = source.checkpoint();
        assert!(cp.is_empty());
    }

    // discover_schema tests. Control-flow cases (when to skip, when to
    // fail) use plain config inputs; the happy path uses a wiremock HTTP
    // server mocking Confluent Schema Registry's REST API.

    // Zero-field schema: lets the tests detect whether discovery replaced it.
    fn empty_schema() -> SchemaRef {
        Arc::new(Schema::empty())
    }

    // Builds an owned property map from borrowed key/value pairs.
    fn props(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
            .collect()
    }

    #[tokio::test]
    async fn discover_schema_skips_non_avro_format() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "json"),
                ("schema.registry.url", "http://localhost:8081"),
            ]))
            .await
            .expect("non-avro format is a legitimate skip");
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_errors_on_avro_without_sr_url() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let err = source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "avro"),
            ]))
            .await
            .expect_err("avro without schema.registry.url must surface a configuration error");
        let msg = err.to_string();
        assert!(
            msg.contains("schema.registry.url"),
            "error must name the missing key, got: {msg}"
        );
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_errors_on_topic_pattern() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let err = source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic.pattern", "events-.*"),
                ("format", "avro"),
                ("schema.registry.url", "http://localhost:8081"),
            ]))
            .await
            .expect_err("topic.pattern + avro must surface a configuration error");
        let msg = err.to_string();
        assert!(
            msg.contains("topic.pattern"),
            "error must name the offending key, got: {msg}"
        );
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_errors_on_sr_unreachable() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let start = std::time::Instant::now();
        // 192.0.2.1 is in TEST-NET-1 (RFC 5737, reserved for documentation),
        // so the registry connection is not expected to succeed. The outer
        // 20s timeout guards against discover_schema hanging.
        let result = tokio::time::timeout(
            std::time::Duration::from_secs(20),
            source.discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "avro"),
                ("schema.registry.url", "http://192.0.2.1:65535"),
            ])),
        )
        .await
        .expect("discover_schema must honor its own 10s timeout");
        assert!(
            start.elapsed() < std::time::Duration::from_secs(15),
            "discover_schema should have returned well before the outer 20s budget"
        );
        let err = result.expect_err("unreachable SR must surface as Err");
        assert!(
            matches!(
                err,
                ConnectorError::ConnectionFailed(_) | ConnectorError::Timeout(_)
            ),
            "expected ConnectionFailed or Timeout, got: {err:?}"
        );
        assert_eq!(source.schema().fields().len(), 0);
    }

    #[tokio::test]
    async fn discover_schema_propagates_broker_commit_interval_rejection() {
        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        let err = source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "t"),
                ("format", "avro"),
                ("schema.registry.url", "http://localhost:8081"),
                ("broker.commit.interval.ms", "5000"),
            ]))
            .await
            .expect_err("deprecated config key must produce a propagated error");
        let msg = err.to_string();
        assert!(
            msg.contains("broker.commit.interval.ms"),
            "error must name the offending key, got: {msg}"
        );
    }

    /// Happy path: wiremock SR returns a record-with-map Avro schema
    /// (the original "No Field name data" bug shape); `discover_schema`
    /// converts it correctly and preserves the Map type.
    #[tokio::test]
    async fn discover_schema_happy_path_with_wiremock_sr() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let avro_schema = serde_json::json!({
            "type": "record",
            "name": "event",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "data", "type": {"type": "map", "values": "string"}}
            ]
        })
        .to_string();

        // Default subject strategy: `{topic}-value`, i.e. `ion_tw-value`.
        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/ion_tw-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 42,
                "version": 1,
                "subject": "ion_tw-value",
                "schema": avro_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "ion_tw"),
                ("format", "avro"),
                ("schema.registry.url", &sr.uri()),
            ]))
            .await
            .expect("happy-path discovery must succeed");

        let schema = source.schema();
        assert_eq!(schema.fields().len(), 2, "expected [id, data]");
        assert_eq!(schema.field(0).name(), "id");
        assert_eq!(schema.field(1).name(), "data");
        assert!(
            matches!(
                schema.field(1).data_type(),
                arrow_schema::DataType::Map(_, _)
            ),
            "'data' field must survive as a Map type (got {:?})",
            schema.field(1).data_type()
        );
    }

    /// Record-name subject strategy resolves to `{record_name}-value`
    /// rather than the default `{topic}-value`.
    #[tokio::test]
    async fn discover_schema_happy_path_record_name_strategy() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let avro_schema = serde_json::json!({
            "type": "record",
            "name": "com.acme.Order",
            "fields": [
                {"name": "order_id", "type": "string"},
                {"name": "amount", "type": "double"}
            ]
        })
        .to_string();

        // Only the record-name subject is mocked, so a lookup of
        // `orders-value` would not match this mock.
        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/com.acme.Order-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 7,
                "version": 1,
                "subject": "com.acme.Order-value",
                "schema": avro_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        let mut source = KafkaSource::new(empty_schema(), KafkaSourceConfig::default(), None);
        source
            .discover_schema(&props(&[
                ("bootstrap.servers", "localhost:9092"),
                ("group.id", "g"),
                ("topic", "orders"),
                ("format", "avro"),
                ("schema.registry.url", &sr.uri()),
                ("schema.registry.subject.name.strategy", "record-name"),
                ("schema.registry.record.name", "com.acme.Order"),
            ]))
            .await
            .expect("happy-path discovery must succeed");

        let schema = source.schema();
        assert_eq!(schema.fields().len(), 2);
        assert_eq!(schema.field(0).name(), "order_id");
        assert_eq!(schema.field(1).name(), "amount");
    }

    /// Drift detection: catalog has a stale 2-field schema, live SR
    /// has evolved to 3 fields. Catalog stays pinned; only
    /// `last_avro_schema` tracks the live SR shape.
    #[tokio::test]
    async fn open_logs_drift_when_sr_evolved_since_ddl() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};

        let evolved_schema = serde_json::json!({
            "type": "record",
            "name": "event",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "data", "type": {"type": "map", "values": "string"}},
                {"name": "version", "type": "int"}
            ]
        })
        .to_string();

        let sr = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/subjects/ion_tw-value/versions/latest"))
            .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
                "id": 99,
                "version": 2,
                "subject": "ion_tw-value",
                "schema": evolved_schema,
                "schemaType": "AVRO",
            })))
            .mount(&sr)
            .await;

        // Catalog schema baked at CREATE SOURCE time — only two fields,
        // predates the `version` field that was just added in SR.
        let stale_catalog = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new(
                "data",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(arrow_schema::Fields::from(vec![
                            Field::new("key", DataType::Utf8, false),
                            Field::new("value", DataType::Utf8, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                true,
            ),
        ]));

        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["ion_tw".into()]);
        cfg.format = Format::Avro;
        cfg.schema_registry_url = Some(sr.uri());
        let sr_client = SchemaRegistryClient::new(sr.uri(), None);
        let mut source = KafkaSource::with_schema_registry(stale_catalog, cfg, sr_client);

        let empty_cfg = crate::config::ConnectorConfig::new("kafka");
        let _ = source.open(&empty_cfg).await; // broker unreachable — later errors irrelevant

        assert_eq!(
            source.schema().fields().len(),
            2,
            "catalog schema must stay pinned even after SR drift"
        );
        assert_eq!(
            source.last_avro_schema.as_ref().map(|s| s.fields().len()),
            Some(3),
            "last_avro_schema should reflect the evolved SR shape"
        );
    }

    // The hook builds a TPL from a durable SourceCheckpoint and uses
    // offset+1 (next-to-fetch) per Kafka convention. We exercise the
    // translation directly via OffsetTracker, since the hook delegates
    // to it.
    #[test]
    fn test_checkpoint_to_tpl_uses_next_offset() {
        let mut offsets = std::collections::HashMap::new();
        offsets.insert("events-0".to_string(), "100".to_string());
        offsets.insert("events-1".to_string(), "200".to_string());
        let cp = SourceCheckpoint::with_offsets(1, offsets);
        let tpl = OffsetTracker::from_checkpoint(&cp).to_topic_partition_list();
        assert_eq!(tpl.count(), 2);
        for elem in tpl.elements() {
            let expected = match elem.partition() {
                0 => rdkafka::Offset::Offset(101),
                1 => rdkafka::Offset::Offset(201),
                p => panic!("unexpected partition {p}"),
            };
            assert_eq!(elem.offset(), expected);
        }
    }

    #[tokio::test]
    async fn test_notify_epoch_committed_empty_cp_is_noop() {
        let mut source = KafkaSource::new(test_schema(), test_config(), None);
        // No reader, no commit channel — opt-out path. Empty cp must not panic.
        source
            .notify_epoch_committed(1, &SourceCheckpoint::new(1))
            .await
            .unwrap();
    }
}