Skip to main content

laminar_connectors/kafka/
config.rs

1//! Kafka source config — broker connection, format, Schema Registry,
2//! backpressure, and pass-through `rdkafka` properties.
3
4use std::collections::HashMap;
5use std::time::Duration;
6
7use rdkafka::config::ClientConfig;
8
9use crate::config::ConnectorConfig;
10use crate::error::ConnectorError;
11use crate::serde::Format;
12
/// Transport security used for Kafka broker connections.
///
/// Combines two independent choices — TLS encryption and SASL
/// authentication — into the four values librdkafka accepts for
/// `security.protocol`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SecurityProtocol {
    /// Neither TLS nor SASL: unencrypted and unauthenticated (the default).
    #[default]
    Plaintext,
    /// TLS-encrypted transport without SASL authentication.
    Ssl,
    /// SASL authentication over an unencrypted transport.
    SaslPlaintext,
    /// SASL authentication over a TLS-encrypted transport.
    SaslSsl,
}

impl SecurityProtocol {
    /// The value to hand to librdkafka's `security.protocol` property.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Plaintext => "plaintext",
            Self::Ssl => "ssl",
            Self::SaslPlaintext => "sasl_plaintext",
            Self::SaslSsl => "sasl_ssl",
        }
    }

    /// `true` for the TLS-encrypted protocols (`Ssl`, `SaslSsl`).
    #[must_use]
    pub fn uses_ssl(&self) -> bool {
        match *self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// `true` for the SASL-authenticated protocols (`SaslPlaintext`, `SaslSsl`).
    #[must_use]
    pub fn uses_sasl(&self) -> bool {
        match *self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }
}
56
// `FromStr` for `SecurityProtocol`, generated by the crate's `str_enum!`
// macro with `ConnectorError` as the error type and
// "invalid security.protocol" as the error context. Each arm lists the
// spelling accepted for a variant; these match what `as_rdkafka_str` emits.
// NOTE(review): `lowercase` presumably lower-cases the input before
// matching — confirm against the macro definition.
str_enum!(fromstr SecurityProtocol, lowercase, ConnectorError, "invalid security.protocol",
    Plaintext => "plaintext";
    Ssl => "ssl";
    SaslPlaintext => "sasl_plaintext";
    SaslSsl => "sasl_ssl"
);
63
64impl std::fmt::Display for SecurityProtocol {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        write!(f, "{}", self.as_rdkafka_str())
67    }
68}
69
/// SASL mechanism used to authenticate against Kafka brokers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SaslMechanism {
    /// `PLAIN` — username/password authentication (the default).
    #[default]
    Plain,
    /// `SCRAM-SHA-256` — salted challenge-response authentication.
    ScramSha256,
    /// `SCRAM-SHA-512` — salted challenge-response with the stronger hash.
    ScramSha512,
    /// `GSSAPI` — Kerberos authentication.
    Gssapi,
    /// `OAUTHBEARER` — OAuth 2.0 bearer-token authentication.
    Oauthbearer,
}

impl SaslMechanism {
    /// The value to hand to librdkafka's `sasl.mechanism` property.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Plain => "PLAIN",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::Gssapi => "GSSAPI",
            Self::Oauthbearer => "OAUTHBEARER",
        }
    }

    /// Whether this mechanism authenticates with a username and password.
    ///
    /// `true` for `PLAIN` and both SCRAM variants, `false` for `GSSAPI` and
    /// `OAUTHBEARER`.
    #[must_use]
    pub fn requires_credentials(&self) -> bool {
        match *self {
            Self::Plain | Self::ScramSha256 | Self::ScramSha512 => true,
            Self::Gssapi | Self::Oauthbearer => false,
        }
    }
}
108
// `FromStr` for `SaslMechanism`, generated by the crate's `str_enum!` macro
// with `ConnectorError` as the error type and "invalid sasl.mechanism" as the
// error context. Besides the primary names it accepts the aliases
// `SCRAM_SHA256`/`SCRAM_SHA512`, `KERBEROS` (for GSSAPI) and `OAUTH`.
// NOTE(review): `uppercase` presumably upper-cases the input before matching.
// The SCRAM arms use underscores, while `as_rdkafka_str`/`Display` emit
// `SCRAM-SHA-256`. Confirm that the macro also maps `-` to `_`; otherwise the
// canonical dashed spelling would be rejected.
str_enum!(fromstr SaslMechanism, uppercase, ConnectorError, "invalid sasl.mechanism",
    Plain => "PLAIN";
    ScramSha256 => "SCRAM_SHA_256", "SCRAM_SHA256";
    ScramSha512 => "SCRAM_SHA_512", "SCRAM_SHA512";
    Gssapi => "GSSAPI", "KERBEROS";
    Oauthbearer => "OAUTHBEARER", "OAUTH"
);
116
117impl std::fmt::Display for SaslMechanism {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        write!(f, "{}", self.as_rdkafka_str())
120    }
121}
122
/// Visibility of records written by transactional producers.
///
/// Corresponds to the consumer's `isolation.level` setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Also return records from transactions that have not committed.
    ReadUncommitted,
    /// Return only records from committed transactions (the default, and the
    /// recommended choice for transactional pipelines).
    #[default]
    ReadCommitted,
}

impl IsolationLevel {
    /// The value to hand to librdkafka's `isolation.level` property.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::ReadCommitted => "read_committed",
            Self::ReadUncommitted => "read_uncommitted",
        }
    }
}
145
// `FromStr` for `IsolationLevel`, generated by the crate's `str_enum!` macro
// with `ConnectorError` as the error type and "invalid isolation.level" as
// the error context. The accepted spellings match `as_rdkafka_str`.
// NOTE(review): `lowercase` presumably lower-cases the input first — confirm.
str_enum!(fromstr IsolationLevel, lowercase, ConnectorError, "invalid isolation.level",
    ReadUncommitted => "read_uncommitted";
    ReadCommitted => "read_committed"
);
150
151impl std::fmt::Display for IsolationLevel {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        write!(f, "{}", self.as_rdkafka_str())
154    }
155}
156
/// Where a consumer begins reading when the source starts.
///
/// This sits a level above librdkafka's `auto.offset.reset`. Besides the usual
/// earliest/latest choices, it can position partitions at explicit offsets or
/// at the first record at or after a timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum StartupMode {
    /// Resume from the group's committed offsets. Partitions with no committed
    /// offset fall back to `auto.offset.reset`.
    #[default]
    GroupOffsets,
    /// Start at the earliest available offset of every partition.
    Earliest,
    /// Start at the latest offset of every partition (new records only).
    Latest,
    /// Start at explicit per-partition offsets (`partition_id -> offset`).
    SpecificOffsets(HashMap<i32, i64>),
    /// Start at a timestamp in milliseconds since the epoch. Each partition
    /// seeks to its first record whose timestamp is `>=` this value.
    Timestamp(i64),
}
179
180impl std::str::FromStr for StartupMode {
181    type Err = ConnectorError;
182
183    fn from_str(s: &str) -> Result<Self, Self::Err> {
184        match s.to_lowercase().replace('-', "_").as_str() {
185            "group_offsets" | "group" => Ok(StartupMode::GroupOffsets),
186            "earliest" => Ok(StartupMode::Earliest),
187            "latest" => Ok(StartupMode::Latest),
188            "timestamp" => Err(ConnectorError::ConfigurationError(
189                "use 'startup.timestamp.ms' to start from a timestamp, \
190                 not 'startup.mode = timestamp'"
191                    .into(),
192            )),
193            other => Err(ConnectorError::ConfigurationError(format!(
194                "invalid startup.mode: '{other}' (expected group-offsets/earliest/latest)"
195            ))),
196        }
197    }
198}
199
200impl std::fmt::Display for StartupMode {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        match self {
203            StartupMode::GroupOffsets => write!(f, "group-offsets"),
204            StartupMode::Earliest => write!(f, "earliest"),
205            StartupMode::Latest => write!(f, "latest"),
206            StartupMode::SpecificOffsets(offsets) => {
207                write!(f, "specific-offsets({} partitions)", offsets.len())
208            }
209            StartupMode::Timestamp(ts) => write!(f, "timestamp({ts})"),
210        }
211    }
212}
213
/// Which topics the consumer subscribes to: a fixed set of names or a single
/// regex pattern.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicSubscription {
    /// An explicit list of topic names.
    Topics(Vec<String>),
    /// A regular expression matched against topic names (e.g. `events-.*`).
    Pattern(String),
}

impl TopicSubscription {
    /// Borrows the topic list, or returns `None` for a pattern subscription.
    #[must_use]
    pub fn topics(&self) -> Option<&[String]> {
        if let TopicSubscription::Topics(names) = self {
            Some(names.as_slice())
        } else {
            None
        }
    }

    /// Borrows the regex pattern, or returns `None` for an explicit topic list.
    #[must_use]
    pub fn pattern(&self) -> Option<&str> {
        if let TopicSubscription::Pattern(regex) = self {
            Some(regex.as_str())
        } else {
            None
        }
    }

    /// `true` when subscribing by regex pattern rather than by name.
    #[must_use]
    pub fn is_pattern(&self) -> bool {
        self.pattern().is_some()
    }
}

impl Default for TopicSubscription {
    /// An explicit subscription with no topics.
    fn default() -> Self {
        TopicSubscription::Topics(Vec::new())
    }
}
254
/// Policy used when a consumer group has no committed offset to resume from
/// (`auto.offset.reset`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetReset {
    /// Begin at the earliest available offset.
    Earliest,
    /// Begin at the latest offset, receiving only new records.
    Latest,
    /// Do not reset; a missing committed offset is an error.
    None,
}

impl OffsetReset {
    /// The value to hand to librdkafka's `auto.offset.reset` property.
    ///
    /// [`OffsetReset::None`] is spelled `"error"` on the librdkafka side.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Earliest => "earliest",
            Self::Latest => "latest",
            Self::None => "error",
        }
    }
}
277
// `FromStr` for `OffsetReset`, generated by the crate's `str_enum!` macro
// with `ConnectorError` as the error type and "invalid auto.offset.reset" as
// the error context. It accepts `beginning`/`end` as aliases for
// earliest/latest. Both `none` and `error` select `OffsetReset::None`, which
// `as_rdkafka_str` renders as "error".
// NOTE(review): the exact input normalization of `lowercase_nodash` is
// defined by the macro — confirm there.
str_enum!(fromstr OffsetReset, lowercase_nodash, ConnectorError, "invalid auto.offset.reset",
    Earliest => "earliest", "beginning";
    Latest => "latest", "end";
    None => "none", "error"
);
283
/// How partitions are distributed among members of a consumer group.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentStrategy {
    /// Range assignment (the strategy `KafkaSourceConfig::default` selects).
    Range,
    /// Round-robin assignment.
    RoundRobin,
    /// Cooperative sticky (incremental) assignment.
    CooperativeSticky,
}

impl AssignmentStrategy {
    /// The value to hand to librdkafka's `partition.assignment.strategy` property.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Range => "range",
            Self::RoundRobin => "roundrobin",
            Self::CooperativeSticky => "cooperative-sticky",
        }
    }
}
306
// `FromStr` for `AssignmentStrategy`, generated by the crate's `str_enum!`
// macro with `ConnectorError` as the error type and
// "invalid partition.assignment.strategy" as the error context.
// NOTE(review): the aliases mix dashed, underscored and joined spellings. If
// `lowercase_nodash` strips `-` from the input before matching, the dashed
// literals can never match, and `cooperative-sticky` input would need a
// `cooperativesticky` alias. Confirm against the macro definition.
str_enum!(fromstr AssignmentStrategy, lowercase_nodash, ConnectorError,
    "invalid partition.assignment.strategy",
    Range => "range";
    RoundRobin => "roundrobin", "round-robin", "round_robin";
    CooperativeSticky => "cooperative-sticky", "cooperative_sticky"
);
313
/// Compatibility rule the Schema Registry enforces between schema versions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityLevel {
    /// A reader on the new schema can read data written with the old one.
    Backward,
    /// `Backward`, checked against every prior version.
    BackwardTransitive,
    /// A reader on the old schema can read data written with the new one.
    Forward,
    /// `Forward`, checked against every prior version.
    ForwardTransitive,
    /// Both `Backward` and `Forward`.
    Full,
    /// `Full`, checked against every prior version.
    FullTransitive,
    /// Compatibility is not checked.
    None,
}

impl CompatibilityLevel {
    /// The level name as used by the Schema Registry API: upper case with
    /// underscores, e.g. `BACKWARD_TRANSITIVE`.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Backward => "BACKWARD",
            Self::BackwardTransitive => "BACKWARD_TRANSITIVE",
            Self::Forward => "FORWARD",
            Self::ForwardTransitive => "FORWARD_TRANSITIVE",
            Self::Full => "FULL",
            Self::FullTransitive => "FULL_TRANSITIVE",
            Self::None => "NONE",
        }
    }
}
348
// `FromStr` for `CompatibilityLevel`, generated by the crate's `str_enum!`
// macro with `ConnectorError` as the error type and
// "invalid schema.compatibility" as the error context. The accepted names
// are the same strings `as_str` returns.
// NOTE(review): `uppercase` presumably upper-cases the input first — confirm.
str_enum!(fromstr CompatibilityLevel, uppercase, ConnectorError, "invalid schema.compatibility",
    Backward => "BACKWARD";
    BackwardTransitive => "BACKWARD_TRANSITIVE";
    Forward => "FORWARD";
    ForwardTransitive => "FORWARD_TRANSITIVE";
    Full => "FULL";
    FullTransitive => "FULL_TRANSITIVE";
    None => "NONE"
);
358
359impl std::fmt::Display for CompatibilityLevel {
360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361        write!(f, "{}", self.as_str())
362    }
363}
364
/// How the source reacts when Avro schema evolution is detected while
/// consuming.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SchemaEvolutionStrategy {
    /// Log the schema change and keep processing (the default).
    #[default]
    Log,
    /// Fail with an error when the change is incompatible.
    Reject,
    /// Skip schema diffing entirely; changes go undetected.
    Ignore,
}
376
// `FromStr` for `SchemaEvolutionStrategy`, generated by the crate's
// `str_enum!` macro with `ConnectorError` as the error type and
// "invalid schema.evolution.strategy" as the error context. The accepted
// names are the same strings `Display` prints.
// NOTE(review): `lowercase` presumably lower-cases the input first — confirm.
str_enum!(fromstr SchemaEvolutionStrategy, lowercase, ConnectorError,
    "invalid schema.evolution.strategy",
    Log => "log";
    Reject => "reject";
    Ignore => "ignore"
);
383
384impl std::fmt::Display for SchemaEvolutionStrategy {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        match self {
387            SchemaEvolutionStrategy::Log => write!(f, "log"),
388            SchemaEvolutionStrategy::Reject => write!(f, "reject"),
389            SchemaEvolutionStrategy::Ignore => write!(f, "ignore"),
390        }
391    }
392}
393
/// Confluent strategy for deriving Schema Registry subject names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SubjectNameStrategy {
    /// Subject is `{topic}-value`; this is Confluent's default and ours.
    #[default]
    TopicName,
    /// Subject is `{record_name}-value`; needs `schema.registry.record.name`.
    RecordName,
    /// Subject is `{topic}-{record_name}-value`; needs
    /// `schema.registry.record.name`.
    TopicRecordName,
}
405
// `FromStr` for `SubjectNameStrategy`, generated by the crate's `str_enum!`
// macro with `ConnectorError` as the error type and
// "invalid schema.registry.subject.name.strategy" as the error context.
// Each variant accepts three spellings: kebab-case (`topic-name`), joined
// (`topicname`), and a Confluent class-style name (`topicnamestrategy`,
// presumably so that `TopicNameStrategy` matches after case folding).
// NOTE(review): if `lowercase_nodash` strips `-` from the input, the
// kebab-case literals are redundant with the joined ones. Confirm the
// normalization in the macro definition.
str_enum!(fromstr SubjectNameStrategy, lowercase_nodash, ConnectorError,
    "invalid schema.registry.subject.name.strategy",
    TopicName => "topic-name", "topicname", "topicnamestrategy";
    RecordName => "record-name", "recordname", "recordnamestrategy";
    TopicRecordName => "topic-record-name", "topicrecordname", "topicrecordnamestrategy"
);
412
413impl std::fmt::Display for SubjectNameStrategy {
414    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
415        match self {
416            SubjectNameStrategy::TopicName => write!(f, "topic-name"),
417            SubjectNameStrategy::RecordName => write!(f, "record-name"),
418            SubjectNameStrategy::TopicRecordName => write!(f, "topic-record-name"),
419        }
420    }
421}
422
423/// Build the SR `-value` subject for a topic. `from_config` validates
424/// that `record_name` is present for the record-based strategies, so
425/// the `expect`s are unreachable in practice.
426pub(crate) fn resolve_value_subject(
427    strategy: SubjectNameStrategy,
428    record_name: Option<&str>,
429    topic: &str,
430) -> String {
431    let name = || record_name.expect("from_config validates record.name");
432    match strategy {
433        SubjectNameStrategy::TopicName => format!("{topic}-value"),
434        SubjectNameStrategy::RecordName => format!("{}-value", name()),
435        SubjectNameStrategy::TopicRecordName => format!("{topic}-{}-value", name()),
436    }
437}
438
439/// Maps the Kafka-level [`CompatibilityLevel`] to the schema module's
440/// `CompatibilityMode` for evolution evaluation.
441impl From<CompatibilityLevel> for crate::schema::traits::CompatibilityMode {
442    fn from(level: CompatibilityLevel) -> Self {
443        use crate::schema::traits::CompatibilityMode;
444        match level {
445            CompatibilityLevel::Backward => CompatibilityMode::Backward,
446            CompatibilityLevel::BackwardTransitive => CompatibilityMode::BackwardTransitive,
447            CompatibilityLevel::Forward => CompatibilityMode::Forward,
448            CompatibilityLevel::ForwardTransitive => CompatibilityMode::ForwardTransitive,
449            CompatibilityLevel::Full => CompatibilityMode::Full,
450            CompatibilityLevel::FullTransitive => CompatibilityMode::FullTransitive,
451            CompatibilityLevel::None => CompatibilityMode::None,
452        }
453    }
454}
455
/// Username/password pair for Schema Registry basic authentication.
///
/// `Debug` is written by hand so that the password is never printed.
#[derive(Clone)]
pub struct SrAuth {
    /// Basic-auth user name.
    pub username: String,
    /// Basic-auth password; `Debug` shows it as `"***"`.
    pub password: String,
}

impl std::fmt::Debug for SrAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const MASK: &str = "***";
        let mut out = f.debug_struct("SrAuth");
        out.field("username", &self.username);
        out.field("password", &MASK);
        out.finish()
    }
}
473
474/// Kafka source connector configuration.
475///
476/// Uses a custom `Debug` impl that redacts `sasl_password` and
477/// `ssl_key_password` to prevent credential leakage in logs.
478#[derive(Clone)]
479#[allow(clippy::struct_excessive_bools)] // Config struct — each bool is an independent user-facing knob.
480pub struct KafkaSourceConfig {
481    // -- Required --
482    /// Comma-separated list of broker addresses.
483    pub bootstrap_servers: String,
484    /// Consumer group identifier.
485    pub group_id: String,
486    /// Topic subscription (explicit list or regex pattern).
487    pub subscription: TopicSubscription,
488
489    // -- Security --
490    /// Security protocol for broker connections.
491    pub security_protocol: SecurityProtocol,
492    /// SASL authentication mechanism.
493    pub sasl_mechanism: Option<SaslMechanism>,
494    /// SASL username (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
495    pub sasl_username: Option<String>,
496    /// SASL password (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
497    pub sasl_password: Option<String>,
498    /// Path to SSL CA certificate file (PEM format).
499    pub ssl_ca_location: Option<String>,
500    /// Path to client SSL certificate file (PEM format).
501    pub ssl_certificate_location: Option<String>,
502    /// Path to client SSL private key file (PEM format).
503    pub ssl_key_location: Option<String>,
504    /// Password for encrypted SSL private key.
505    pub ssl_key_password: Option<String>,
506
507    // -- Format & Schema --
508    /// Data format for deserialization.
509    pub format: Format,
510    /// Confluent Schema Registry URL.
511    pub schema_registry_url: Option<String>,
512    /// Schema Registry authentication credentials.
513    pub schema_registry_auth: Option<SrAuth>,
514    /// Override compatibility level for the subject.
515    pub schema_compatibility: Option<CompatibilityLevel>,
516    /// How to handle Avro schema evolution detected at runtime.
517    pub schema_evolution_strategy: SchemaEvolutionStrategy,
518    /// Schema Registry SSL CA certificate path.
519    pub schema_registry_ssl_ca_location: Option<String>,
520    /// Schema Registry SSL client certificate path.
521    pub schema_registry_ssl_certificate_location: Option<String>,
522    /// Schema Registry SSL client key path.
523    pub schema_registry_ssl_key_location: Option<String>,
524    /// Confluent subject-name strategy. Default: `TopicName`.
525    pub schema_registry_subject_strategy: SubjectNameStrategy,
526    /// Record name for `record-name` / `topic-record-name` strategies.
527    pub schema_registry_record_name: Option<String>,
528    /// Deadline for Schema Registry lookups performed during schema
529    /// auto-discovery at DDL time. Default: 10s.
530    pub schema_registry_discovery_timeout: Duration,
531    /// Column name containing the event timestamp.
532    pub event_time_column: Option<String>,
533    /// Whether to include Kafka metadata columns (_partition, _offset, _timestamp).
534    pub include_metadata: bool,
535    /// Whether to include Kafka headers as a map column (_headers).
536    pub include_headers: bool,
537
538    // -- Consumer tuning --
539    /// Consumer startup mode (controls initial offset positioning).
540    pub startup_mode: StartupMode,
541    /// Where to start reading when no committed offset exists.
542    pub auto_offset_reset: OffsetReset,
543    /// Consumer transaction isolation level.
544    pub isolation_level: IsolationLevel,
545    /// Maximum records per poll batch.
546    pub max_poll_records: usize,
547    /// Partition assignment strategy.
548    pub partition_assignment_strategy: AssignmentStrategy,
549    /// Minimum bytes to return from a fetch (allows batching).
550    pub fetch_min_bytes: Option<i32>,
551    /// Maximum bytes to return from broker per request.
552    pub fetch_max_bytes: Option<i32>,
553    /// Maximum time broker waits for fetch.min.bytes.
554    pub fetch_max_wait_ms: Option<i32>,
555    /// Maximum bytes per partition to return from broker.
556    pub max_partition_fetch_bytes: Option<i32>,
557
558    // -- Watermark --
559    /// Maximum expected out-of-orderness for watermark generation.
560    pub max_out_of_orderness: Duration,
561    /// Timeout before marking a partition as idle.
562    pub idle_timeout: Duration,
563    /// Enable per-partition watermark tracking (integrates with watermark tracking).
564    pub enable_watermark_tracking: bool,
565    // -- Consumer group timing --
566    /// Consumer session timeout (default: 45s — production-safe; rdkafka's
567    /// aggressive 10s default causes rebalance storms under GC pauses).
568    pub session_timeout: Duration,
569    /// Consumer heartbeat interval (default: 10s). Must satisfy
570    /// `heartbeat_interval * 3 < session_timeout` per Kafka broker requirement.
571    pub heartbeat_interval: Duration,
572    /// Maximum interval between calls to `rd_kafka_consumer_poll()` before
573    /// the broker considers this consumer dead and triggers a rebalance.
574    ///
575    /// Default: 600s (10 minutes). rdkafka's default is 300s, but with
576    /// reader-side pause/resume the reader keeps polling even under
577    /// backpressure, so this is a safety margin. Must be >= 60s.
578    pub max_poll_interval: Duration,
579    /// Maximum per-partition pre-fetch queue size in kbytes (default: 16384 = 16MB).
580    /// rdkafka's 64MB default is too aggressive for an embedded database.
581    pub queued_max_messages_kbytes: u32,
582
583    // -- Broker commit --
584    /// Whether to commit consumed offsets to the Kafka broker after each
585    /// `LaminarDB` checkpoint completes. Default: `true`.
586    ///
587    /// Advisory — `LaminarDB` recovery uses its own manifest, not broker-stored
588    /// offsets. This exists so external tooling (`kafka-consumer-groups`,
589    /// kafka-exporter, Burrow) sees progress, and so `StartupMode::GroupOffsets`
590    /// can act as a "lost-checkpoint" fallback that resumes from the last
591    /// durable epoch (no skip-ahead window).
592    pub broker_commit_on_checkpoint: bool,
593
594    // -- Backpressure --
595    /// Capacity of the bounded channel between the background Kafka reader
596    /// task and `poll_batch()` (default: 1024). Must be >= `max_poll_records`.
597    pub reader_channel_capacity: usize,
598    /// Channel fill ratio at which to pause consumption.
599    pub backpressure_high_watermark: f64,
600    /// Channel fill ratio at which to resume consumption.
601    pub backpressure_low_watermark: f64,
602
603    // -- Error handling --
604    /// Maximum tolerated deserialization error rate per batch (0.0-1.0).
605    ///
606    /// When the poison pill fallback is active and the error rate exceeds
607    /// this threshold, the batch is rejected instead of returning partial
608    /// results. Prevents silent data loss when a schema change makes most
609    /// records unparseable. Default: 0.5 (50%).
610    pub max_deser_error_rate: f64,
611
612    // -- Pass-through --
613    /// Additional rdkafka properties passed directly to librdkafka.
614    pub kafka_properties: HashMap<String, String>,
615}
616
617impl std::fmt::Debug for KafkaSourceConfig {
618    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
619        f.debug_struct("KafkaSourceConfig")
620            .field("bootstrap_servers", &self.bootstrap_servers)
621            .field("group_id", &self.group_id)
622            .field("subscription", &self.subscription)
623            .field("format", &self.format)
624            .field("security_protocol", &self.security_protocol)
625            .field("sasl_mechanism", &self.sasl_mechanism)
626            .field("sasl_username", &self.sasl_username)
627            .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
628            .field(
629                "ssl_key_password",
630                &self.ssl_key_password.as_ref().map(|_| "***"),
631            )
632            .field("max_poll_records", &self.max_poll_records)
633            .finish_non_exhaustive()
634    }
635}
636
637impl Default for KafkaSourceConfig {
638    fn default() -> Self {
639        Self {
640            bootstrap_servers: String::new(),
641            group_id: String::new(),
642            subscription: TopicSubscription::default(),
643            security_protocol: SecurityProtocol::default(),
644            sasl_mechanism: None,
645            sasl_username: None,
646            sasl_password: None,
647            ssl_ca_location: None,
648            ssl_certificate_location: None,
649            ssl_key_location: None,
650            ssl_key_password: None,
651            format: Format::Json,
652            schema_registry_url: None,
653            schema_registry_auth: None,
654            schema_compatibility: None,
655            schema_evolution_strategy: SchemaEvolutionStrategy::default(),
656            schema_registry_ssl_ca_location: None,
657            schema_registry_ssl_certificate_location: None,
658            schema_registry_ssl_key_location: None,
659            schema_registry_subject_strategy: SubjectNameStrategy::default(),
660            schema_registry_record_name: None,
661            schema_registry_discovery_timeout: Duration::from_secs(10),
662            event_time_column: None,
663            include_metadata: false,
664            include_headers: false,
665            startup_mode: StartupMode::default(),
666            auto_offset_reset: OffsetReset::Earliest,
667            isolation_level: IsolationLevel::default(),
668            max_poll_records: 1000,
669            partition_assignment_strategy: AssignmentStrategy::Range,
670            fetch_min_bytes: None,
671            fetch_max_bytes: None,
672            fetch_max_wait_ms: None,
673            max_partition_fetch_bytes: None,
674            max_out_of_orderness: Duration::from_secs(5),
675            idle_timeout: Duration::from_secs(30),
676            enable_watermark_tracking: false,
677            session_timeout: Duration::from_secs(45),
678            heartbeat_interval: Duration::from_secs(10),
679            max_poll_interval: Duration::from_secs(600),
680            queued_max_messages_kbytes: 16384,
681            broker_commit_on_checkpoint: true,
682            reader_channel_capacity: 1024,
683            backpressure_high_watermark: 0.8,
684            backpressure_low_watermark: 0.5,
685            max_deser_error_rate: 0.5,
686            kafka_properties: HashMap::new(),
687        }
688    }
689}
690
691impl KafkaSourceConfig {
692    /// Parses a [`KafkaSourceConfig`] from a [`ConnectorConfig`].
693    ///
694    /// # Errors
695    ///
696    /// Returns `ConnectorError` if required fields are missing or values are invalid.
697    #[allow(deprecated, clippy::too_many_lines)]
698    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
699        let bootstrap_servers = config.require("bootstrap.servers")?.to_string();
700        let group_id = config.require("group.id")?.to_string();
701
702        let subscription = if let Some(pattern) = config.get("topic.pattern") {
703            TopicSubscription::Pattern(pattern.to_string())
704        } else {
705            let topics_str = config.require("topic")?;
706            let topics: Vec<String> = topics_str
707                .split(',')
708                .map(|s| s.trim().to_string())
709                .collect();
710            TopicSubscription::Topics(topics.clone())
711        };
712
713        let security_protocol = match config.get("security.protocol") {
714            Some(s) => s.parse::<SecurityProtocol>()?,
715            None => SecurityProtocol::default(),
716        };
717
718        let sasl_mechanism = match config.get("sasl.mechanism") {
719            Some(s) => Some(s.parse::<SaslMechanism>()?),
720            None => None,
721        };
722
723        let sasl_username = config.get("sasl.username").map(String::from);
724        let sasl_password = config.get("sasl.password").map(String::from);
725        let ssl_ca_location = config.get("ssl.ca.location").map(String::from);
726        let ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
727        let ssl_key_location = config.get("ssl.key.location").map(String::from);
728        let ssl_key_password = config.get("ssl.key.password").map(String::from);
729
730        let format = match config.get("format") {
731            Some(f) => f
732                .parse::<Format>()
733                .map_err(|e| ConnectorError::ConfigurationError(e.to_string()))?,
734            None => Format::Json,
735        };
736
737        let schema_registry_url = config.get("schema.registry.url").map(String::from);
738
739        let schema_registry_auth = match (
740            config.get("schema.registry.username"),
741            config.get("schema.registry.password"),
742        ) {
743            (Some(u), Some(p)) => Some(SrAuth {
744                username: u.to_string(),
745                password: p.to_string(),
746            }),
747            (Some(_), None) | (None, Some(_)) => {
748                return Err(ConnectorError::ConfigurationError(
749                    "schema.registry.username and schema.registry.password must both be set"
750                        .to_string(),
751                ));
752            }
753            (None, None) => None,
754        };
755
756        let schema_compatibility = match config.get("schema.compatibility") {
757            Some(s) => Some(s.parse::<CompatibilityLevel>()?),
758            None => None,
759        };
760
761        let schema_evolution_strategy = match config.get("schema.evolution.strategy") {
762            Some(s) => s.parse::<SchemaEvolutionStrategy>()?,
763            None => SchemaEvolutionStrategy::default(),
764        };
765
766        let schema_registry_ssl_ca_location = config
767            .get("schema.registry.ssl.ca.location")
768            .map(String::from);
769        let schema_registry_ssl_certificate_location = config
770            .get("schema.registry.ssl.certificate.location")
771            .map(String::from);
772        let schema_registry_ssl_key_location = config
773            .get("schema.registry.ssl.key.location")
774            .map(String::from);
775
776        let schema_registry_subject_strategy =
777            match config.get("schema.registry.subject.name.strategy") {
778                Some(s) => s.parse::<SubjectNameStrategy>()?,
779                None => SubjectNameStrategy::default(),
780            };
781
782        let schema_registry_record_name =
783            config.get("schema.registry.record.name").map(String::from);
784
785        if matches!(
786            schema_registry_subject_strategy,
787            SubjectNameStrategy::RecordName | SubjectNameStrategy::TopicRecordName
788        ) && schema_registry_record_name.is_none()
789        {
790            return Err(ConnectorError::ConfigurationError(format!(
791                "schema.registry.subject.name.strategy={schema_registry_subject_strategy} \
792                 requires schema.registry.record.name"
793            )));
794        }
795
796        let schema_registry_discovery_timeout = config
797            .get_parsed::<u64>("schema.registry.discovery.timeout.ms")?
798            .map_or(Duration::from_secs(10), Duration::from_millis);
799
800        let event_time_column = config.get("event.time.column").map(String::from);
801
802        let include_metadata = config
803            .get_parsed::<bool>("include.metadata")?
804            .unwrap_or(false);
805
806        let include_headers = config
807            .get_parsed::<bool>("include.headers")?
808            .unwrap_or(false);
809
810        let startup_mode = if let Some(offsets_str) = config.get("startup.specific.offsets") {
811            let offsets = parse_specific_offsets(offsets_str)?;
812            StartupMode::SpecificOffsets(offsets)
813        } else if let Some(ts_str) = config.get("startup.timestamp.ms") {
814            let ts: i64 = ts_str.parse().map_err(|_| {
815                ConnectorError::ConfigurationError(format!(
816                    "invalid startup.timestamp.ms: '{ts_str}'"
817                ))
818            })?;
819            StartupMode::Timestamp(ts)
820        } else {
821            match config.get("startup.mode") {
822                Some(s) => s.parse::<StartupMode>()?,
823                None => StartupMode::default(),
824            }
825        };
826
827        // StartupMode::Earliest/Latest override auto.offset.reset so that
828        // `startup.mode = latest` works without requiring the user to also
829        // set `auto.offset.reset = latest` (the previous disconnect was a
830        // silent data-correctness bug: consumers started from earliest).
831        let auto_offset_reset = match &startup_mode {
832            StartupMode::Earliest => OffsetReset::Earliest,
833            StartupMode::Latest => OffsetReset::Latest,
834            _ => match config.get("auto.offset.reset") {
835                Some(s) => s.parse::<OffsetReset>()?,
836                None => OffsetReset::Earliest,
837            },
838        };
839
840        let isolation_level = match config.get("isolation.level") {
841            Some(s) => s.parse::<IsolationLevel>()?,
842            None => IsolationLevel::default(),
843        };
844
845        let max_poll_records = config
846            .get_parsed::<usize>("max.poll.records")?
847            .unwrap_or(1000);
848
849        let partition_assignment_strategy = match config.get("partition.assignment.strategy") {
850            Some(s) => s.parse::<AssignmentStrategy>()?,
851            None => AssignmentStrategy::Range,
852        };
853
854        let fetch_min_bytes = config.get_parsed::<i32>("fetch.min.bytes")?;
855        let fetch_max_bytes = config.get_parsed::<i32>("fetch.max.bytes")?;
856        let fetch_max_wait_ms = config.get_parsed::<i32>("fetch.max.wait.ms")?;
857        let max_partition_fetch_bytes = config.get_parsed::<i32>("max.partition.fetch.bytes")?;
858
859        let max_out_of_orderness_ms = config
860            .get_parsed::<u64>("max.out.of.orderness.ms")?
861            .unwrap_or(5000);
862
863        let idle_timeout_ms = config
864            .get_parsed::<u64>("idle.timeout.ms")?
865            .unwrap_or(30_000);
866
867        let enable_watermark_tracking = config
868            .get_parsed::<bool>("enable.watermark.tracking")?
869            .unwrap_or(false);
870
871        let session_timeout_ms = config
872            .get_parsed::<u64>("session.timeout.ms")?
873            .unwrap_or(45_000);
874        let heartbeat_interval_ms = config
875            .get_parsed::<u64>("heartbeat.interval.ms")?
876            .unwrap_or(10_000);
877        let queued_max_messages_kbytes = config
878            .get_parsed::<u32>("queued.max.messages.kbytes")?
879            .unwrap_or(16384);
880        let max_poll_interval_ms = config
881            .get_parsed::<u64>("max.poll.interval.ms")?
882            .unwrap_or(600_000);
883
884        // `broker.commit.interval.ms` is no longer supported. Broker offset
885        // commits now run on checkpoint completion (`notify_epoch_committed`),
886        // not on a wall-clock timer. Reject explicitly so silent semantic
887        // drift can't happen.
888        if config
889            .properties()
890            .contains_key("broker.commit.interval.ms")
891        {
892            return Err(ConnectorError::ConfigurationError(
893                "broker.commit.interval.ms is no longer supported — broker offset \
894                 commits now happen on checkpoint completion. Set \
895                 broker.commit.on.checkpoint=false to disable."
896                    .into(),
897            ));
898        }
899        let broker_commit_on_checkpoint = config
900            .get_parsed::<bool>("broker.commit.on.checkpoint")?
901            .unwrap_or(true);
902
903        let reader_channel_capacity = config
904            .get_parsed::<usize>("reader.channel.capacity")?
905            .unwrap_or(1024);
906
907        let backpressure_high_watermark = config
908            .get_parsed::<f64>("backpressure.high.watermark")?
909            .unwrap_or(0.8);
910
911        let backpressure_low_watermark = config
912            .get_parsed::<f64>("backpressure.low.watermark")?
913            .unwrap_or(0.5);
914
915        let max_deser_error_rate = config
916            .get_parsed::<f64>("max.deser.error.rate")?
917            .unwrap_or(0.5);
918
919        let kafka_properties = config.properties_with_prefix("kafka.");
920
921        let cfg = Self {
922            bootstrap_servers,
923            group_id,
924            subscription,
925            security_protocol,
926            sasl_mechanism,
927            sasl_username,
928            sasl_password,
929            ssl_ca_location,
930            ssl_certificate_location,
931            ssl_key_location,
932            ssl_key_password,
933            format,
934            schema_registry_url,
935            schema_registry_auth,
936            schema_compatibility,
937            schema_evolution_strategy,
938            schema_registry_ssl_ca_location,
939            schema_registry_ssl_certificate_location,
940            schema_registry_ssl_key_location,
941            schema_registry_subject_strategy,
942            schema_registry_record_name,
943            schema_registry_discovery_timeout,
944            event_time_column,
945            include_metadata,
946            include_headers,
947            startup_mode,
948            auto_offset_reset,
949            isolation_level,
950            max_poll_records,
951            partition_assignment_strategy,
952            fetch_min_bytes,
953            fetch_max_bytes,
954            fetch_max_wait_ms,
955            max_partition_fetch_bytes,
956            max_out_of_orderness: Duration::from_millis(max_out_of_orderness_ms),
957            idle_timeout: Duration::from_millis(idle_timeout_ms),
958            enable_watermark_tracking,
959            session_timeout: Duration::from_millis(session_timeout_ms),
960            heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
961            max_poll_interval: Duration::from_millis(max_poll_interval_ms),
962            queued_max_messages_kbytes,
963            broker_commit_on_checkpoint,
964            reader_channel_capacity,
965            backpressure_high_watermark,
966            backpressure_low_watermark,
967            max_deser_error_rate,
968            kafka_properties,
969        };
970
971        cfg.validate()?;
972        Ok(cfg)
973    }
974
975    /// Validates the configuration.
976    ///
977    /// # Errors
978    ///
979    /// Returns `ConnectorError::ConfigurationError` if the configuration is invalid.
980    pub fn validate(&self) -> Result<(), ConnectorError> {
981        if self.bootstrap_servers.is_empty() {
982            return Err(ConnectorError::ConfigurationError(
983                "bootstrap.servers cannot be empty".into(),
984            ));
985        }
986        if self.group_id.is_empty() {
987            return Err(ConnectorError::ConfigurationError(
988                "group.id cannot be empty".into(),
989            ));
990        }
991
992        // Validate topic subscription
993        match &self.subscription {
994            TopicSubscription::Topics(t) if t.is_empty() => {
995                return Err(ConnectorError::ConfigurationError(
996                    "at least one topic is required (or use topic.pattern)".into(),
997                ));
998            }
999            TopicSubscription::Pattern(p) if p.is_empty() => {
1000                return Err(ConnectorError::ConfigurationError(
1001                    "topic.pattern cannot be empty".into(),
1002                ));
1003            }
1004            _ => {}
1005        }
1006
1007        if self.max_poll_records == 0 {
1008            return Err(ConnectorError::ConfigurationError(
1009                "max.poll.records must be > 0".into(),
1010            ));
1011        }
1012        if self.reader_channel_capacity < self.max_poll_records {
1013            return Err(ConnectorError::ConfigurationError(format!(
1014                "reader.channel.capacity ({}) must be >= max.poll.records ({})",
1015                self.reader_channel_capacity, self.max_poll_records
1016            )));
1017        }
1018
1019        if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
1020            return Err(ConnectorError::ConfigurationError(
1021                "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
1022                    .into(),
1023            ));
1024        }
1025
1026        if let Some(mechanism) = &self.sasl_mechanism {
1027            if mechanism.requires_credentials()
1028                && (self.sasl_username.is_none() || self.sasl_password.is_none())
1029            {
1030                return Err(ConnectorError::ConfigurationError(format!(
1031                    "sasl.username and sasl.password are required for {mechanism} mechanism"
1032                )));
1033            }
1034        }
1035
1036        if self.security_protocol.uses_ssl() {
1037            if let Some(ref ca) = self.ssl_ca_location {
1038                if ca.is_empty() {
1039                    return Err(ConnectorError::ConfigurationError(
1040                        "ssl.ca.location cannot be empty when specified".into(),
1041                    ));
1042                }
1043            }
1044        }
1045
1046        if self.max_poll_interval.as_secs() < 60 {
1047            return Err(ConnectorError::ConfigurationError(format!(
1048                "max.poll.interval.ms ({}) must be >= 60000 (60s)",
1049                self.max_poll_interval.as_millis()
1050            )));
1051        }
1052
1053        // Kafka broker requires heartbeat_interval * 3 < session_timeout.
1054        if self.heartbeat_interval.as_millis() * 3 >= self.session_timeout.as_millis() {
1055            return Err(ConnectorError::ConfigurationError(format!(
1056                "heartbeat.interval.ms ({}) * 3 must be < session.timeout.ms ({})",
1057                self.heartbeat_interval.as_millis(),
1058                self.session_timeout.as_millis()
1059            )));
1060        }
1061
1062        if self.backpressure_high_watermark <= self.backpressure_low_watermark {
1063            return Err(ConnectorError::ConfigurationError(
1064                "backpressure.high.watermark must be > backpressure.low.watermark".into(),
1065            ));
1066        }
1067        if !(0.0..=1.0).contains(&self.backpressure_high_watermark) {
1068            return Err(ConnectorError::ConfigurationError(
1069                "backpressure.high.watermark must be between 0.0 and 1.0".into(),
1070            ));
1071        }
1072        if !(0.0..=1.0).contains(&self.backpressure_low_watermark) {
1073            return Err(ConnectorError::ConfigurationError(
1074                "backpressure.low.watermark must be between 0.0 and 1.0".into(),
1075            ));
1076        }
1077
1078        if !(0.0..=1.0).contains(&self.max_deser_error_rate) {
1079            return Err(ConnectorError::ConfigurationError(
1080                "max.deser.error.rate must be between 0.0 and 1.0".into(),
1081            ));
1082        }
1083
1084        if self.format == Format::Avro && self.schema_registry_url.is_none() {
1085            return Err(ConnectorError::ConfigurationError(
1086                "schema.registry.url is required when format is 'avro'".into(),
1087            ));
1088        }
1089
1090        Ok(())
1091    }
1092
1093    /// Builds an rdkafka [`ClientConfig`] from this configuration.
1094    #[must_use]
1095    pub fn to_rdkafka_config(&self) -> ClientConfig {
1096        let mut config = ClientConfig::new();
1097
1098        config.set("bootstrap.servers", &self.bootstrap_servers);
1099        config.set("group.id", &self.group_id);
1100        config.set("enable.auto.commit", "false");
1101        config.set("enable.auto.offset.store", "false");
1102        config.set("auto.offset.reset", self.auto_offset_reset.as_rdkafka_str());
1103        config.set(
1104            "partition.assignment.strategy",
1105            self.partition_assignment_strategy.as_rdkafka_str(),
1106        );
1107        config.set("security.protocol", self.security_protocol.as_rdkafka_str());
1108
1109        if let Some(ref mechanism) = self.sasl_mechanism {
1110            config.set("sasl.mechanism", mechanism.as_rdkafka_str());
1111        }
1112        if let Some(ref username) = self.sasl_username {
1113            config.set("sasl.username", username);
1114        }
1115        if let Some(ref password) = self.sasl_password {
1116            config.set("sasl.password", password);
1117        }
1118        if let Some(ref ca) = self.ssl_ca_location {
1119            config.set("ssl.ca.location", ca);
1120        }
1121        if let Some(ref cert) = self.ssl_certificate_location {
1122            config.set("ssl.certificate.location", cert);
1123        }
1124        if let Some(ref key) = self.ssl_key_location {
1125            config.set("ssl.key.location", key);
1126        }
1127        if let Some(ref key_pass) = self.ssl_key_password {
1128            config.set("ssl.key.password", key_pass);
1129        }
1130
1131        config.set("isolation.level", self.isolation_level.as_rdkafka_str());
1132        config.set(
1133            "session.timeout.ms",
1134            self.session_timeout.as_millis().to_string(),
1135        );
1136        config.set(
1137            "heartbeat.interval.ms",
1138            self.heartbeat_interval.as_millis().to_string(),
1139        );
1140        config.set(
1141            "queued.max.messages.kbytes",
1142            self.queued_max_messages_kbytes.to_string(),
1143        );
1144        config.set(
1145            "max.poll.interval.ms",
1146            self.max_poll_interval.as_millis().to_string(),
1147        );
1148
1149        if let Some(fetch_min) = self.fetch_min_bytes {
1150            config.set("fetch.min.bytes", fetch_min.to_string());
1151        }
1152
1153        if let Some(fetch_max) = self.fetch_max_bytes {
1154            config.set("fetch.max.bytes", fetch_max.to_string());
1155        }
1156
1157        if let Some(wait_ms) = self.fetch_max_wait_ms {
1158            config.set("fetch.wait.max.ms", wait_ms.to_string());
1159        }
1160
1161        if let Some(partition_max) = self.max_partition_fetch_bytes {
1162            config.set("max.partition.fetch.bytes", partition_max.to_string());
1163        }
1164
1165        // Apply pass-through properties, blocking security-critical keys
1166        // that could silently downgrade authentication or break semantics.
1167        for (key, value) in &self.kafka_properties {
1168            if is_blocked_passthrough_key(key) {
1169                tracing::warn!(
1170                    key,
1171                    "ignoring kafka.* pass-through property that overrides a security setting"
1172                );
1173                continue;
1174            }
1175            config.set(key, value);
1176        }
1177
1178        config
1179    }
1180}
1181
/// Returns `true` if a `kafka.*` pass-through key must be ignored.
///
/// Blocked keys fall into three groups:
/// - Security settings (protocol, SASL, Kerberos, TLS material and TLS
///   verification), so a pass-through entry can never silently downgrade
///   authentication or encryption.
/// - Commit/delivery settings this connector manages itself. Auto-commit and
///   automatic offset storage are always disabled when building the client config.
/// - Group-membership timeouts and queue sizing, which have dedicated,
///   validated top-level keys.
fn is_blocked_passthrough_key(key: &str) -> bool {
    key.starts_with("sasl.kerberos.")
        || matches!(
            key,
            "security.protocol"
                | "sasl.mechanism"
                | "sasl.username"
                | "sasl.password"
                | "sasl.oauthbearer.config"
                | "ssl.ca.location"
                | "ssl.certificate.location"
                | "ssl.key.location"
                | "ssl.key.password"
                | "ssl.endpoint.identification.algorithm"
                // `false` disables broker certificate verification entirely —
                // a stronger TLS downgrade than disabling hostname checks.
                | "enable.ssl.certificate.verification"
                | "enable.auto.commit"
                | "enable.auto.offset.store"
                | "enable.idempotence"
                | "auto.offset.reset"
                | "session.timeout.ms"
                | "heartbeat.interval.ms"
                | "max.poll.interval.ms"
                | "queued.max.messages.kbytes"
                // librdkafka's own auto-commit interval — only meaningful
                // when `enable.auto.commit=true`, which we hard-disable.
                | "auto.commit.interval.ms"
        )
}
1210
1211/// Parses a specific offsets string in the format "partition:offset,partition:offset,...".
1212///
1213/// Example: "0:100,1:200,2:300" maps partition 0 to offset 100, partition 1 to offset 200, etc.
1214fn parse_specific_offsets(s: &str) -> Result<HashMap<i32, i64>, ConnectorError> {
1215    let mut offsets = HashMap::new();
1216
1217    for pair in s.split(',') {
1218        let pair = pair.trim();
1219        if pair.is_empty() {
1220            continue;
1221        }
1222
1223        let parts: Vec<&str> = pair.split(':').collect();
1224        if parts.len() != 2 {
1225            return Err(ConnectorError::ConfigurationError(format!(
1226                "invalid offset format '{pair}' (expected 'partition:offset')"
1227            )));
1228        }
1229
1230        let partition: i32 = parts[0].trim().parse().map_err(|_| {
1231            ConnectorError::ConfigurationError(format!(
1232                "invalid partition number '{}' in '{pair}'",
1233                parts[0]
1234            ))
1235        })?;
1236
1237        let offset: i64 = parts[1].trim().parse().map_err(|_| {
1238            ConnectorError::ConfigurationError(format!("invalid offset '{}' in '{pair}'", parts[1]))
1239        })?;
1240
1241        offsets.insert(partition, offset);
1242    }
1243
1244    if offsets.is_empty() {
1245        return Err(ConnectorError::ConfigurationError(
1246            "startup.specific.offsets cannot be empty".into(),
1247        ));
1248    }
1249
1250    Ok(offsets)
1251}
1252
1253#[cfg(test)]
1254mod tests {
1255    use super::*;
1256
1257    fn make_config(extra: &[(&str, &str)]) -> ConnectorConfig {
1258        let mut config = ConnectorConfig::new("kafka");
1259        config.set("bootstrap.servers", "localhost:9092");
1260        config.set("group.id", "test-group");
1261        config.set("topic", "events");
1262        for (k, v) in extra {
1263            config.set(*k, *v);
1264        }
1265        config
1266    }
1267
1268    #[test]
1269    #[allow(deprecated)]
1270    fn test_parse_required_fields() {
1271        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
1272        assert_eq!(cfg.bootstrap_servers, "localhost:9092");
1273        assert_eq!(cfg.group_id, "test-group");
1274        assert_eq!(cfg.subscription.topics().unwrap(), &["events"]);
1275        assert!(matches!(
1276            cfg.subscription,
1277            TopicSubscription::Topics(ref t) if t == &["events"]
1278        ));
1279    }
1280
1281    #[test]
1282    fn test_parse_missing_required() {
1283        let config = ConnectorConfig::new("kafka");
1284        assert!(KafkaSourceConfig::from_config(&config).is_err());
1285    }
1286
1287    #[test]
1288    #[allow(deprecated)]
1289    fn test_parse_multi_topic() {
1290        let cfg = KafkaSourceConfig::from_config(&make_config(&[("topic", "a, b, c")])).unwrap();
1291        assert_eq!(cfg.subscription.topics().unwrap(), &["a", "b", "c"]);
1292        assert!(matches!(
1293            cfg.subscription,
1294            TopicSubscription::Topics(ref t) if t == &["a", "b", "c"]
1295        ));
1296    }
1297
1298    #[test]
1299    fn test_parse_topic_pattern() {
1300        let mut config = ConnectorConfig::new("kafka");
1301        config.set("bootstrap.servers", "localhost:9092");
1302        config.set("group.id", "test-group");
1303        config.set("topic.pattern", "events-.*");
1304
1305        let cfg = KafkaSourceConfig::from_config(&config).unwrap();
1306        assert!(matches!(
1307            cfg.subscription,
1308            TopicSubscription::Pattern(ref p) if p == "events-.*"
1309        ));
1310        assert!(cfg.subscription.is_pattern());
1311        assert_eq!(cfg.subscription.pattern(), Some("events-.*"));
1312        assert!(cfg.subscription.topics().is_none());
1313    }
1314
1315    #[test]
1316    fn test_parse_defaults() {
1317        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
1318        assert_eq!(cfg.format, Format::Json);
1319        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
1320        assert_eq!(cfg.isolation_level, IsolationLevel::ReadCommitted);
1321        assert_eq!(cfg.max_poll_records, 1000);
1322        assert_eq!(cfg.partition_assignment_strategy, AssignmentStrategy::Range);
1323        assert!(!cfg.include_metadata);
1324        assert!(!cfg.include_headers);
1325        assert!(cfg.schema_registry_url.is_none());
1326        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
1327        assert!(cfg.sasl_mechanism.is_none());
1328        assert!(cfg.broker_commit_on_checkpoint);
1329        assert_eq!(cfg.reader_channel_capacity, 1024);
1330    }
1331
1332    #[test]
1333    fn test_parse_broker_commit_on_checkpoint_disabled() {
1334        let cfg = KafkaSourceConfig::from_config(&make_config(&[(
1335            "broker.commit.on.checkpoint",
1336            "false",
1337        )]))
1338        .unwrap();
1339        assert!(!cfg.broker_commit_on_checkpoint);
1340    }
1341
1342    #[test]
1343    fn test_parse_broker_commit_interval_rejected() {
1344        let err =
1345            KafkaSourceConfig::from_config(&make_config(&[("broker.commit.interval.ms", "5000")]))
1346                .unwrap_err();
1347        let msg = format!("{err}");
1348        assert!(
1349            msg.contains("broker.commit.interval.ms"),
1350            "expected hard-error mentioning the deprecated key, got: {msg}"
1351        );
1352    }
1353
1354    #[test]
1355    fn test_parse_optional_fields() {
1356        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1357            ("format", "csv"),
1358            ("auto.offset.reset", "latest"),
1359            ("max.poll.records", "500"),
1360            ("include.metadata", "true"),
1361            ("include.headers", "true"),
1362            ("event.time.column", "ts"),
1363            ("partition.assignment.strategy", "roundrobin"),
1364            ("isolation.level", "read_uncommitted"),
1365        ]))
1366        .unwrap();
1367
1368        assert_eq!(cfg.format, Format::Csv);
1369        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
1370        assert_eq!(cfg.isolation_level, IsolationLevel::ReadUncommitted);
1371        assert_eq!(cfg.max_poll_records, 500);
1372        assert!(cfg.include_metadata);
1373        assert!(cfg.include_headers);
1374        assert_eq!(cfg.event_time_column, Some("ts".to_string()));
1375        assert_eq!(
1376            cfg.partition_assignment_strategy,
1377            AssignmentStrategy::RoundRobin
1378        );
1379    }
1380
1381    #[test]
1382    fn test_parse_security_sasl_ssl() {
1383        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1384            ("security.protocol", "sasl_ssl"),
1385            ("sasl.mechanism", "SCRAM-SHA-256"),
1386            ("sasl.username", "alice"),
1387            ("sasl.password", "secret"),
1388            ("ssl.ca.location", "/etc/ssl/ca.pem"),
1389        ]))
1390        .unwrap();
1391
1392        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
1393        assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha256));
1394        assert_eq!(cfg.sasl_username, Some("alice".to_string()));
1395        assert_eq!(cfg.sasl_password, Some("secret".to_string()));
1396        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
1397    }
1398
1399    #[test]
1400    fn test_parse_security_ssl_only() {
1401        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1402            ("security.protocol", "ssl"),
1403            ("ssl.ca.location", "/etc/ssl/ca.pem"),
1404            ("ssl.certificate.location", "/etc/ssl/client.pem"),
1405            ("ssl.key.location", "/etc/ssl/client.key"),
1406            ("ssl.key.password", "keypass"),
1407        ]))
1408        .unwrap();
1409
1410        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
1411        assert!(cfg.security_protocol.uses_ssl());
1412        assert!(!cfg.security_protocol.uses_sasl());
1413        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
1414        assert_eq!(
1415            cfg.ssl_certificate_location,
1416            Some("/etc/ssl/client.pem".to_string())
1417        );
1418        assert_eq!(
1419            cfg.ssl_key_location,
1420            Some("/etc/ssl/client.key".to_string())
1421        );
1422        assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
1423    }
1424
1425    #[test]
1426    fn test_parse_fetch_tuning() {
1427        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1428            ("fetch.min.bytes", "1024"),
1429            ("fetch.max.bytes", "52428800"),
1430            ("fetch.max.wait.ms", "500"),
1431            ("max.partition.fetch.bytes", "1048576"),
1432        ]))
1433        .unwrap();
1434
1435        assert_eq!(cfg.fetch_min_bytes, Some(1024));
1436        assert_eq!(cfg.fetch_max_bytes, Some(52_428_800));
1437        assert_eq!(cfg.fetch_max_wait_ms, Some(500));
1438        assert_eq!(cfg.max_partition_fetch_bytes, Some(1_048_576));
1439    }
1440
1441    #[test]
1442    fn test_parse_kafka_passthrough() {
1443        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1444            ("kafka.session.timeout.ms", "30000"),
1445            ("kafka.max.poll.interval.ms", "300000"),
1446            ("kafka.fetch.message.max.bytes", "2097152"),
1447        ]))
1448        .unwrap();
1449
1450        assert_eq!(cfg.kafka_properties.len(), 3);
1451        // session.timeout.ms and max.poll.interval.ms are passed through as
1452        // kafka_properties, but blocked by is_blocked_passthrough_key() in
1453        // to_rdkafka_config() — they won't override explicit settings.
1454        assert_eq!(
1455            cfg.kafka_properties.get("session.timeout.ms"),
1456            Some(&"30000".to_string())
1457        );
1458        assert_eq!(
1459            cfg.kafka_properties.get("max.poll.interval.ms"),
1460            Some(&"300000".to_string())
1461        );
1462    }
1463
1464    #[test]
1465    fn test_parse_schema_registry() {
1466        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1467            ("format", "avro"),
1468            ("schema.registry.url", "http://localhost:8081"),
1469            ("schema.registry.username", "user"),
1470            ("schema.registry.password", "pass"),
1471            ("schema.compatibility", "FULL_TRANSITIVE"),
1472            ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
1473        ]))
1474        .unwrap();
1475
1476        assert_eq!(cfg.format, Format::Avro);
1477        assert_eq!(
1478            cfg.schema_registry_url,
1479            Some("http://localhost:8081".to_string())
1480        );
1481        assert!(cfg.schema_registry_auth.is_some());
1482        let auth = cfg.schema_registry_auth.unwrap();
1483        assert_eq!(auth.username, "user");
1484        assert_eq!(auth.password, "pass");
1485        assert_eq!(
1486            cfg.schema_compatibility,
1487            Some(CompatibilityLevel::FullTransitive)
1488        );
1489        assert_eq!(
1490            cfg.schema_registry_ssl_ca_location,
1491            Some("/etc/ssl/sr-ca.pem".to_string())
1492        );
1493    }
1494
1495    #[test]
1496    fn test_parse_sr_auth_partial() {
1497        let config = make_config(&[
1498            ("schema.registry.url", "http://localhost:8081"),
1499            ("schema.registry.username", "user"),
1500            // missing password
1501        ]);
1502        assert!(KafkaSourceConfig::from_config(&config).is_err());
1503    }
1504
1505    #[test]
1506    fn test_validate_avro_without_sr() {
1507        let mut cfg = KafkaSourceConfig::default();
1508        cfg.bootstrap_servers = "localhost:9092".into();
1509        cfg.group_id = "g".into();
1510        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
1511        cfg.format = Format::Avro;
1512        // No schema_registry_url
1513        assert!(cfg.validate().is_err());
1514    }
1515
1516    #[test]
1517    fn test_validate_backpressure_watermarks() {
1518        let mut cfg = KafkaSourceConfig::default();
1519        cfg.bootstrap_servers = "localhost:9092".into();
1520        cfg.group_id = "g".into();
1521        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
1522        cfg.backpressure_high_watermark = 0.3;
1523        cfg.backpressure_low_watermark = 0.5;
1524        assert!(cfg.validate().is_err());
1525    }
1526
1527    #[test]
1528    fn test_validate_sasl_without_mechanism() {
1529        let mut cfg = KafkaSourceConfig::default();
1530        cfg.bootstrap_servers = "localhost:9092".into();
1531        cfg.group_id = "g".into();
1532        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
1533        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
1534        // sasl_mechanism not set
1535        assert!(cfg.validate().is_err());
1536    }
1537
1538    #[test]
1539    fn test_validate_sasl_plain_without_credentials() {
1540        let mut cfg = KafkaSourceConfig::default();
1541        cfg.bootstrap_servers = "localhost:9092".into();
1542        cfg.group_id = "g".into();
1543        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
1544        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
1545        cfg.sasl_mechanism = Some(SaslMechanism::Plain);
1546        // username/password not set
1547        assert!(cfg.validate().is_err());
1548    }
1549
1550    #[test]
1551    fn test_validate_empty_topic_pattern() {
1552        let mut cfg = KafkaSourceConfig::default();
1553        cfg.bootstrap_servers = "localhost:9092".into();
1554        cfg.group_id = "g".into();
1555        cfg.subscription = TopicSubscription::Pattern(String::new());
1556        assert!(cfg.validate().is_err());
1557    }
1558
1559    #[test]
1560    fn test_rdkafka_config() {
1561        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1562            ("auto.offset.reset", "latest"),
1563            ("kafka.fetch.min.bytes", "1024"),
1564        ]))
1565        .unwrap();
1566
1567        let rdkafka = cfg.to_rdkafka_config();
1568        assert_eq!(rdkafka.get("bootstrap.servers"), Some("localhost:9092"));
1569        assert_eq!(rdkafka.get("group.id"), Some("test-group"));
1570        assert_eq!(rdkafka.get("enable.auto.commit"), Some("false"));
1571        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
1572        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
1573        assert_eq!(rdkafka.get("security.protocol"), Some("plaintext"));
1574        assert_eq!(rdkafka.get("isolation.level"), Some("read_committed"));
1575    }
1576
1577    #[test]
1578    fn test_rdkafka_config_with_security() {
1579        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1580            ("security.protocol", "sasl_ssl"),
1581            ("sasl.mechanism", "PLAIN"),
1582            ("sasl.username", "user"),
1583            ("sasl.password", "pass"),
1584            ("ssl.ca.location", "/ca.pem"),
1585        ]))
1586        .unwrap();
1587
1588        let rdkafka = cfg.to_rdkafka_config();
1589        assert_eq!(rdkafka.get("security.protocol"), Some("sasl_ssl"));
1590        assert_eq!(rdkafka.get("sasl.mechanism"), Some("PLAIN"));
1591        assert_eq!(rdkafka.get("sasl.username"), Some("user"));
1592        assert_eq!(rdkafka.get("sasl.password"), Some("pass"));
1593        assert_eq!(rdkafka.get("ssl.ca.location"), Some("/ca.pem"));
1594    }
1595
1596    #[test]
1597    fn test_rdkafka_config_with_fetch_tuning() {
1598        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1599            ("fetch.min.bytes", "1024"),
1600            ("fetch.max.bytes", "1048576"),
1601            ("fetch.max.wait.ms", "500"),
1602            ("max.partition.fetch.bytes", "262144"),
1603            ("isolation.level", "read_uncommitted"),
1604        ]))
1605        .unwrap();
1606
1607        let rdkafka = cfg.to_rdkafka_config();
1608        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
1609        assert_eq!(rdkafka.get("fetch.max.bytes"), Some("1048576"));
1610        assert_eq!(rdkafka.get("fetch.wait.max.ms"), Some("500"));
1611        assert_eq!(rdkafka.get("max.partition.fetch.bytes"), Some("262144"));
1612        assert_eq!(rdkafka.get("isolation.level"), Some("read_uncommitted"));
1613    }
1614
1615    #[test]
1616    fn test_offset_reset_parsing() {
1617        assert_eq!(
1618            "earliest".parse::<OffsetReset>().unwrap(),
1619            OffsetReset::Earliest
1620        );
1621        assert_eq!(
1622            "latest".parse::<OffsetReset>().unwrap(),
1623            OffsetReset::Latest
1624        );
1625        assert_eq!("none".parse::<OffsetReset>().unwrap(), OffsetReset::None);
1626        assert!("invalid".parse::<OffsetReset>().is_err());
1627    }
1628
1629    #[test]
1630    fn test_compatibility_level_parsing() {
1631        assert_eq!(
1632            "BACKWARD".parse::<CompatibilityLevel>().unwrap(),
1633            CompatibilityLevel::Backward
1634        );
1635        assert_eq!(
1636            "full_transitive".parse::<CompatibilityLevel>().unwrap(),
1637            CompatibilityLevel::FullTransitive
1638        );
1639        assert_eq!(
1640            "NONE".parse::<CompatibilityLevel>().unwrap(),
1641            CompatibilityLevel::None
1642        );
1643        assert!("invalid".parse::<CompatibilityLevel>().is_err());
1644    }
1645
1646    #[test]
1647    fn test_security_protocol_parsing() {
1648        assert_eq!(
1649            "plaintext".parse::<SecurityProtocol>().unwrap(),
1650            SecurityProtocol::Plaintext
1651        );
1652        assert_eq!(
1653            "SSL".parse::<SecurityProtocol>().unwrap(),
1654            SecurityProtocol::Ssl
1655        );
1656        assert_eq!(
1657            "sasl_plaintext".parse::<SecurityProtocol>().unwrap(),
1658            SecurityProtocol::SaslPlaintext
1659        );
1660        assert_eq!(
1661            "SASL_SSL".parse::<SecurityProtocol>().unwrap(),
1662            SecurityProtocol::SaslSsl
1663        );
1664        assert_eq!(
1665            "sasl-ssl".parse::<SecurityProtocol>().unwrap(),
1666            SecurityProtocol::SaslSsl
1667        );
1668        assert!("invalid".parse::<SecurityProtocol>().is_err());
1669    }
1670
1671    #[test]
1672    fn test_sasl_mechanism_parsing() {
1673        assert_eq!(
1674            "PLAIN".parse::<SaslMechanism>().unwrap(),
1675            SaslMechanism::Plain
1676        );
1677        assert_eq!(
1678            "SCRAM-SHA-256".parse::<SaslMechanism>().unwrap(),
1679            SaslMechanism::ScramSha256
1680        );
1681        assert_eq!(
1682            "scram_sha_512".parse::<SaslMechanism>().unwrap(),
1683            SaslMechanism::ScramSha512
1684        );
1685        assert_eq!(
1686            "GSSAPI".parse::<SaslMechanism>().unwrap(),
1687            SaslMechanism::Gssapi
1688        );
1689        assert_eq!(
1690            "OAUTHBEARER".parse::<SaslMechanism>().unwrap(),
1691            SaslMechanism::Oauthbearer
1692        );
1693        assert!("invalid".parse::<SaslMechanism>().is_err());
1694    }
1695
1696    #[test]
1697    fn test_isolation_level_parsing() {
1698        assert_eq!(
1699            "read_uncommitted".parse::<IsolationLevel>().unwrap(),
1700            IsolationLevel::ReadUncommitted
1701        );
1702        assert_eq!(
1703            "read_committed".parse::<IsolationLevel>().unwrap(),
1704            IsolationLevel::ReadCommitted
1705        );
1706        assert_eq!(
1707            "read-committed".parse::<IsolationLevel>().unwrap(),
1708            IsolationLevel::ReadCommitted
1709        );
1710        assert!("invalid".parse::<IsolationLevel>().is_err());
1711    }
1712
1713    #[test]
1714    fn test_topic_subscription_accessors() {
1715        let topics = TopicSubscription::Topics(vec!["a".into(), "b".into()]);
1716        assert_eq!(
1717            topics.topics(),
1718            Some(&["a".to_string(), "b".to_string()][..])
1719        );
1720        assert!(topics.pattern().is_none());
1721        assert!(!topics.is_pattern());
1722
1723        let pattern = TopicSubscription::Pattern("events-.*".into());
1724        assert!(pattern.topics().is_none());
1725        assert_eq!(pattern.pattern(), Some("events-.*"));
1726        assert!(pattern.is_pattern());
1727    }
1728
1729    #[test]
1730    fn test_security_protocol_helpers() {
1731        assert!(!SecurityProtocol::Plaintext.uses_ssl());
1732        assert!(!SecurityProtocol::Plaintext.uses_sasl());
1733
1734        assert!(SecurityProtocol::Ssl.uses_ssl());
1735        assert!(!SecurityProtocol::Ssl.uses_sasl());
1736
1737        assert!(!SecurityProtocol::SaslPlaintext.uses_ssl());
1738        assert!(SecurityProtocol::SaslPlaintext.uses_sasl());
1739
1740        assert!(SecurityProtocol::SaslSsl.uses_ssl());
1741        assert!(SecurityProtocol::SaslSsl.uses_sasl());
1742    }
1743
1744    #[test]
1745    fn test_sasl_mechanism_helpers() {
1746        assert!(SaslMechanism::Plain.requires_credentials());
1747        assert!(SaslMechanism::ScramSha256.requires_credentials());
1748        assert!(SaslMechanism::ScramSha512.requires_credentials());
1749        assert!(!SaslMechanism::Gssapi.requires_credentials());
1750        assert!(!SaslMechanism::Oauthbearer.requires_credentials());
1751    }
1752
1753    #[test]
1754    fn test_enum_display() {
1755        assert_eq!(SecurityProtocol::SaslSsl.to_string(), "sasl_ssl");
1756        assert_eq!(SaslMechanism::ScramSha256.to_string(), "SCRAM-SHA-256");
1757        assert_eq!(IsolationLevel::ReadCommitted.to_string(), "read_committed");
1758    }
1759
1760    #[test]
1761    fn test_startup_mode_parsing() {
1762        assert_eq!(
1763            "group-offsets".parse::<StartupMode>().unwrap(),
1764            StartupMode::GroupOffsets
1765        );
1766        assert_eq!(
1767            "group_offsets".parse::<StartupMode>().unwrap(),
1768            StartupMode::GroupOffsets
1769        );
1770        assert_eq!(
1771            "earliest".parse::<StartupMode>().unwrap(),
1772            StartupMode::Earliest
1773        );
1774        assert_eq!(
1775            "latest".parse::<StartupMode>().unwrap(),
1776            StartupMode::Latest
1777        );
1778        assert!("invalid".parse::<StartupMode>().is_err());
1779    }
1780
1781    #[test]
1782    fn test_startup_mode_display() {
1783        assert_eq!(StartupMode::GroupOffsets.to_string(), "group-offsets");
1784        assert_eq!(StartupMode::Earliest.to_string(), "earliest");
1785        assert_eq!(StartupMode::Latest.to_string(), "latest");
1786
1787        let specific = StartupMode::SpecificOffsets(HashMap::from([(0, 100), (1, 200)]));
1788        assert!(specific.to_string().contains("2 partitions"));
1789
1790        let ts = StartupMode::Timestamp(1234567890000);
1791        assert!(ts.to_string().contains("1234567890000"));
1792    }
1793
1794    #[test]
1795    fn test_startup_mode_latest_overrides_offset_reset() {
1796        let cfg =
1797            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "latest")])).unwrap();
1798        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
1799        let rdkafka = cfg.to_rdkafka_config();
1800        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
1801    }
1802
1803    #[test]
1804    fn test_startup_mode_earliest_overrides_offset_reset() {
1805        let cfg =
1806            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "earliest")])).unwrap();
1807        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
1808    }
1809
1810    #[test]
1811    fn test_startup_mode_group_offsets_uses_explicit_offset_reset() {
1812        // When startup.mode is group-offsets (default), the explicit
1813        // auto.offset.reset setting should be used.
1814        let cfg = KafkaSourceConfig::from_config(&make_config(&[("auto.offset.reset", "latest")]))
1815            .unwrap();
1816        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
1817    }
1818
1819    #[test]
1820    fn test_parse_specific_offsets() {
1821        let offsets = parse_specific_offsets("0:100,1:200,2:300").unwrap();
1822        assert_eq!(offsets.get(&0), Some(&100));
1823        assert_eq!(offsets.get(&1), Some(&200));
1824        assert_eq!(offsets.get(&2), Some(&300));
1825    }
1826
1827    #[test]
1828    fn test_parse_specific_offsets_with_spaces() {
1829        let offsets = parse_specific_offsets(" 0:100 , 1:200 ").unwrap();
1830        assert_eq!(offsets.get(&0), Some(&100));
1831        assert_eq!(offsets.get(&1), Some(&200));
1832    }
1833
1834    #[test]
1835    fn test_parse_specific_offsets_errors() {
1836        assert!(parse_specific_offsets("").is_err());
1837        assert!(parse_specific_offsets("0").is_err());
1838        assert!(parse_specific_offsets("0:abc").is_err());
1839        assert!(parse_specific_offsets("abc:100").is_err());
1840        assert!(parse_specific_offsets("0:100:extra").is_err());
1841    }
1842
1843    #[test]
1844    fn test_parse_startup_mode_from_config() {
1845        let cfg =
1846            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "earliest")])).unwrap();
1847        assert_eq!(cfg.startup_mode, StartupMode::Earliest);
1848
1849        let cfg =
1850            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "latest")])).unwrap();
1851        assert_eq!(cfg.startup_mode, StartupMode::Latest);
1852
1853        let cfg =
1854            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "group-offsets")]))
1855                .unwrap();
1856        assert_eq!(cfg.startup_mode, StartupMode::GroupOffsets);
1857    }
1858
1859    #[test]
1860    fn test_parse_startup_specific_offsets_from_config() {
1861        let cfg = KafkaSourceConfig::from_config(&make_config(&[(
1862            "startup.specific.offsets",
1863            "0:100,1:200",
1864        )]))
1865        .unwrap();
1866
1867        match cfg.startup_mode {
1868            StartupMode::SpecificOffsets(offsets) => {
1869                assert_eq!(offsets.get(&0), Some(&100));
1870                assert_eq!(offsets.get(&1), Some(&200));
1871            }
1872            _ => panic!("expected SpecificOffsets"),
1873        }
1874    }
1875
1876    #[test]
1877    fn test_parse_startup_timestamp_from_config() {
1878        let cfg =
1879            KafkaSourceConfig::from_config(&make_config(&[("startup.timestamp.ms", "1234567890")]))
1880                .unwrap();
1881
1882        assert_eq!(cfg.startup_mode, StartupMode::Timestamp(1234567890));
1883    }
1884
1885    #[test]
1886    fn test_parse_schema_registry_ssl_fields() {
1887        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1888            ("schema.registry.url", "https://sr:8081"),
1889            ("schema.registry.ssl.ca.location", "/ca.pem"),
1890            ("schema.registry.ssl.certificate.location", "/cert.pem"),
1891            ("schema.registry.ssl.key.location", "/key.pem"),
1892        ]))
1893        .unwrap();
1894
1895        assert_eq!(
1896            cfg.schema_registry_ssl_ca_location,
1897            Some("/ca.pem".to_string())
1898        );
1899        assert_eq!(
1900            cfg.schema_registry_ssl_certificate_location,
1901            Some("/cert.pem".to_string())
1902        );
1903        assert_eq!(
1904            cfg.schema_registry_ssl_key_location,
1905            Some("/key.pem".to_string())
1906        );
1907    }
1908
1909    #[test]
1910    fn test_parse_watermark_defaults() {
1911        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
1912        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(5));
1913        assert_eq!(cfg.idle_timeout, Duration::from_secs(30));
1914        assert!(!cfg.enable_watermark_tracking);
1915    }
1916
1917    #[test]
1918    fn test_parse_watermark_tracking_enabled() {
1919        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1920            ("enable.watermark.tracking", "true"),
1921            ("max.out.of.orderness.ms", "10000"),
1922            ("idle.timeout.ms", "60000"),
1923        ]))
1924        .unwrap();
1925
1926        assert!(cfg.enable_watermark_tracking);
1927        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(10));
1928        assert_eq!(cfg.idle_timeout, Duration::from_secs(60));
1929    }
1930
1931    // -- startup.mode = timestamp error --
1932
1933    #[test]
1934    fn test_startup_mode_timestamp_error() {
1935        let config = make_config(&[("startup.mode", "timestamp")]);
1936        let err = KafkaSourceConfig::from_config(&config).unwrap_err();
1937        let msg = err.to_string();
1938        assert!(
1939            msg.contains("startup.timestamp.ms"),
1940            "error should mention startup.timestamp.ms, got: {msg}"
1941        );
1942    }
1943
1944    // -- session timeout / heartbeat interval --
1945
1946    #[test]
1947    fn test_session_timeout_heartbeat_defaults() {
1948        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
1949        assert_eq!(cfg.session_timeout, Duration::from_secs(45));
1950        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
1951    }
1952
1953    #[test]
1954    fn test_session_timeout_heartbeat_custom() {
1955        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1956            ("session.timeout.ms", "60000"),
1957            ("heartbeat.interval.ms", "15000"),
1958        ]))
1959        .unwrap();
1960        assert_eq!(cfg.session_timeout, Duration::from_secs(60));
1961        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(15));
1962    }
1963
1964    #[test]
1965    fn test_session_timeout_heartbeat_validation_fails() {
1966        // heartbeat=20s * 3 = 60s >= session=45s → error
1967        let config = make_config(&[
1968            ("session.timeout.ms", "45000"),
1969            ("heartbeat.interval.ms", "20000"),
1970        ]);
1971        let err = KafkaSourceConfig::from_config(&config).unwrap_err();
1972        let msg = err.to_string();
1973        assert!(msg.contains("heartbeat.interval.ms"), "got: {msg}");
1974    }
1975
1976    #[test]
1977    fn test_session_timeout_heartbeat_validation_passes() {
1978        // heartbeat=10s * 3 = 30s < session=45s → ok
1979        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1980            ("session.timeout.ms", "45000"),
1981            ("heartbeat.interval.ms", "10000"),
1982        ]))
1983        .unwrap();
1984        assert_eq!(cfg.session_timeout, Duration::from_secs(45));
1985        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
1986    }
1987
1988    #[test]
1989    fn test_session_timeout_heartbeat_in_rdkafka_config() {
1990        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1991            ("session.timeout.ms", "60000"),
1992            ("heartbeat.interval.ms", "15000"),
1993        ]))
1994        .unwrap();
1995        let rdkafka = cfg.to_rdkafka_config();
1996        assert_eq!(rdkafka.get("session.timeout.ms"), Some("60000"));
1997        assert_eq!(rdkafka.get("heartbeat.interval.ms"), Some("15000"));
1998    }
1999
2000    #[test]
2001    fn test_session_timeout_passthrough_blocked() {
2002        let cfg =
2003            KafkaSourceConfig::from_config(&make_config(&[("kafka.session.timeout.ms", "99999")]))
2004                .unwrap();
2005        // The pass-through should be blocked; rdkafka config should use the default (45000).
2006        let rdkafka = cfg.to_rdkafka_config();
2007        assert_eq!(rdkafka.get("session.timeout.ms"), Some("45000"));
2008    }
2009
2010    // -- queued.max.messages.kbytes --
2011
2012    #[test]
2013    fn test_queued_max_messages_kbytes_default() {
2014        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
2015        assert_eq!(cfg.queued_max_messages_kbytes, 16384);
2016    }
2017
2018    #[test]
2019    fn test_queued_max_messages_kbytes_custom() {
2020        let cfg = KafkaSourceConfig::from_config(&make_config(&[(
2021            "queued.max.messages.kbytes",
2022            "32768",
2023        )]))
2024        .unwrap();
2025        assert_eq!(cfg.queued_max_messages_kbytes, 32768);
2026    }
2027
2028    #[test]
2029    fn test_queued_max_messages_kbytes_in_rdkafka_config() {
2030        let cfg =
2031            KafkaSourceConfig::from_config(&make_config(&[("queued.max.messages.kbytes", "8192")]))
2032                .unwrap();
2033        let rdkafka = cfg.to_rdkafka_config();
2034        assert_eq!(rdkafka.get("queued.max.messages.kbytes"), Some("8192"));
2035    }
2036
2037    // ── Schema Registry subject-name strategy ──
2038
2039    #[test]
2040    fn resolve_subject_topic_name() {
2041        assert_eq!(
2042            resolve_value_subject(SubjectNameStrategy::TopicName, None, "orders"),
2043            "orders-value"
2044        );
2045    }
2046
2047    #[test]
2048    fn resolve_subject_record_name() {
2049        assert_eq!(
2050            resolve_value_subject(
2051                SubjectNameStrategy::RecordName,
2052                Some("com.acme.Order"),
2053                "orders"
2054            ),
2055            "com.acme.Order-value"
2056        );
2057    }
2058
2059    #[test]
2060    fn resolve_subject_topic_record_name() {
2061        assert_eq!(
2062            resolve_value_subject(
2063                SubjectNameStrategy::TopicRecordName,
2064                Some("com.acme.Order"),
2065                "orders"
2066            ),
2067            "orders-com.acme.Order-value"
2068        );
2069    }
2070
2071    #[test]
2072    fn parse_subject_strategy_from_config() {
2073        let cfg = KafkaSourceConfig::from_config(&make_config(&[
2074            ("schema.registry.url", "http://sr:8081"),
2075            ("schema.registry.subject.name.strategy", "record-name"),
2076            ("schema.registry.record.name", "com.acme.Order"),
2077        ]))
2078        .unwrap();
2079        assert_eq!(
2080            cfg.schema_registry_subject_strategy,
2081            SubjectNameStrategy::RecordName
2082        );
2083        assert_eq!(
2084            cfg.schema_registry_record_name.as_deref(),
2085            Some("com.acme.Order")
2086        );
2087    }
2088
2089    #[test]
2090    fn parse_subject_strategy_rejects_missing_record_name() {
2091        let err = KafkaSourceConfig::from_config(&make_config(&[
2092            ("schema.registry.url", "http://sr:8081"),
2093            ("schema.registry.subject.name.strategy", "record-name"),
2094        ]))
2095        .unwrap_err();
2096        assert!(matches!(err, ConnectorError::ConfigurationError(_)));
2097    }
2098
2099    #[test]
2100    fn parse_discovery_timeout_default_ten_seconds() {
2101        let cfg =
2102            KafkaSourceConfig::from_config(&make_config(&[("schema.registry.url", "http://sr")]))
2103                .unwrap();
2104        assert_eq!(
2105            cfg.schema_registry_discovery_timeout,
2106            Duration::from_secs(10)
2107        );
2108    }
2109
2110    #[test]
2111    fn parse_discovery_timeout_override() {
2112        let cfg = KafkaSourceConfig::from_config(&make_config(&[
2113            ("schema.registry.url", "http://sr"),
2114            ("schema.registry.discovery.timeout.ms", "25000"),
2115        ]))
2116        .unwrap();
2117        assert_eq!(
2118            cfg.schema_registry_discovery_timeout,
2119            Duration::from_secs(25)
2120        );
2121    }
2122}