
laminar_connectors/kafka/config.rs

//! Kafka source config — broker connection, format, Schema Registry,
//! backpressure, and pass-through `rdkafka` properties.

use std::collections::HashMap;
use std::time::Duration;

use rdkafka::config::ClientConfig;

use crate::config::ConnectorConfig;
use crate::error::ConnectorError;
use crate::serde::Format;

/// Kafka security protocol for broker connections.
///
/// Determines encryption (SSL/TLS) and authentication (SASL) requirements.
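///
/// Illustrative mapping to librdkafka values (a sketch based on
/// `as_rdkafka_str` below):
///
/// ```ignore
/// assert_eq!(SecurityProtocol::SaslSsl.as_rdkafka_str(), "sasl_ssl");
/// assert!(SecurityProtocol::SaslSsl.uses_ssl() && SecurityProtocol::SaslSsl.uses_sasl());
/// ```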
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SecurityProtocol {
    /// Plain-text communication (no encryption, no authentication).
    #[default]
    Plaintext,
    /// SSL/TLS encryption without SASL authentication.
    Ssl,
    /// SASL authentication over plain-text connection.
    SaslPlaintext,
    /// SASL authentication over SSL/TLS encrypted connection.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns the rdkafka config value string.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            SecurityProtocol::Plaintext => "plaintext",
            SecurityProtocol::Ssl => "ssl",
            SecurityProtocol::SaslPlaintext => "sasl_plaintext",
            SecurityProtocol::SaslSsl => "sasl_ssl",
        }
    }

    /// Returns true if this protocol uses SSL/TLS.
    #[must_use]
    pub fn uses_ssl(&self) -> bool {
        matches!(self, SecurityProtocol::Ssl | SecurityProtocol::SaslSsl)
    }

    /// Returns true if this protocol uses SASL authentication.
    #[must_use]
    pub fn uses_sasl(&self) -> bool {
        matches!(
            self,
            SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl
        )
    }
}

str_enum!(fromstr SecurityProtocol, lowercase, ConnectorError, "invalid security.protocol",
    Plaintext => "plaintext";
    Ssl => "ssl";
    SaslPlaintext => "sasl_plaintext";
    SaslSsl => "sasl_ssl"
);

impl std::fmt::Display for SecurityProtocol {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_rdkafka_str())
    }
}

/// SASL authentication mechanism for Kafka.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SaslMechanism {
    /// PLAIN: Simple username/password authentication.
    #[default]
    Plain,
    /// SCRAM-SHA-256: Salted Challenge Response Authentication Mechanism.
    ScramSha256,
    /// SCRAM-SHA-512: Salted Challenge Response Authentication Mechanism (stronger).
    ScramSha512,
    /// GSSAPI: Kerberos authentication.
    Gssapi,
    /// OAUTHBEARER: OAuth 2.0 bearer token authentication.
    Oauthbearer,
}

impl SaslMechanism {
    /// Returns the rdkafka config value string.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            SaslMechanism::Plain => "PLAIN",
            SaslMechanism::ScramSha256 => "SCRAM-SHA-256",
            SaslMechanism::ScramSha512 => "SCRAM-SHA-512",
            SaslMechanism::Gssapi => "GSSAPI",
            SaslMechanism::Oauthbearer => "OAUTHBEARER",
        }
    }

    /// Returns true if this mechanism requires username/password.
    #[must_use]
    pub fn requires_credentials(&self) -> bool {
        matches!(
            self,
            SaslMechanism::Plain | SaslMechanism::ScramSha256 | SaslMechanism::ScramSha512
        )
    }
}

str_enum!(fromstr SaslMechanism, uppercase, ConnectorError, "invalid sasl.mechanism",
    Plain => "PLAIN";
    ScramSha256 => "SCRAM_SHA_256", "SCRAM_SHA256";
    ScramSha512 => "SCRAM_SHA_512", "SCRAM_SHA512";
    Gssapi => "GSSAPI", "KERBEROS";
    Oauthbearer => "OAUTHBEARER", "OAUTH"
);

impl std::fmt::Display for SaslMechanism {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_rdkafka_str())
    }
}

/// Consumer isolation level for reading transactional messages.
///
/// Controls whether to read uncommitted messages from transactional producers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read all messages including uncommitted transactional messages.
    ReadUncommitted,
    /// Only read committed messages (recommended for transactional pipelines).
    #[default]
    ReadCommitted,
}

impl IsolationLevel {
    /// Returns the rdkafka config value string.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            IsolationLevel::ReadUncommitted => "read_uncommitted",
            IsolationLevel::ReadCommitted => "read_committed",
        }
    }
}

str_enum!(fromstr IsolationLevel, lowercase, ConnectorError, "invalid isolation.level",
    ReadUncommitted => "read_uncommitted";
    ReadCommitted => "read_committed"
);

impl std::fmt::Display for IsolationLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_rdkafka_str())
    }
}

/// Consumer startup mode controlling where to begin consuming.
///
/// This is a higher-level abstraction than `auto.offset.reset` that provides
/// more control over initial positioning, including timestamp-based and
/// partition-specific offset assignment.
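///
/// Illustrative mapping from config keys (a sketch of `from_config`'s
/// precedence: specific offsets win over timestamp, which wins over mode):
///
/// ```ignore
/// // startup.specific.offsets = "0:100,1:200"   -> StartupMode::SpecificOffsets
/// // startup.timestamp.ms     = "1700000000000" -> StartupMode::Timestamp(1_700_000_000_000)
/// // startup.mode             = "latest"        -> StartupMode::Latest
/// ```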
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum StartupMode {
    /// Use committed group offsets, fall back to `auto.offset.reset` if none exist.
    #[default]
    GroupOffsets,
    /// Start from the earliest available offset in each partition.
    Earliest,
    /// Start from the latest offset in each partition (only new messages).
    Latest,
    /// Start from specific offsets per partition (`partition_id` -> offset).
    SpecificOffsets(HashMap<i32, i64>),
    /// Start from a specific timestamp (milliseconds since epoch).
    /// The consumer seeks to the first message with timestamp >= this value.
    Timestamp(i64),
}

impl std::str::FromStr for StartupMode {
    type Err = ConnectorError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().replace('-', "_").as_str() {
            "group_offsets" | "group" => Ok(StartupMode::GroupOffsets),
            "earliest" => Ok(StartupMode::Earliest),
            "latest" => Ok(StartupMode::Latest),
            "timestamp" => Err(ConnectorError::ConfigurationError(
                "use 'startup.timestamp.ms' to start from a timestamp, \
                 not 'startup.mode = timestamp'"
                    .into(),
            )),
            other => Err(ConnectorError::ConfigurationError(format!(
                "invalid startup.mode: '{other}' (expected group-offsets/earliest/latest)"
            ))),
        }
    }
}

impl std::fmt::Display for StartupMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StartupMode::GroupOffsets => write!(f, "group-offsets"),
            StartupMode::Earliest => write!(f, "earliest"),
            StartupMode::Latest => write!(f, "latest"),
            StartupMode::SpecificOffsets(offsets) => {
                write!(f, "specific-offsets({} partitions)", offsets.len())
            }
            StartupMode::Timestamp(ts) => write!(f, "timestamp({ts})"),
        }
    }
}

/// Topic subscription mode: explicit list or regex pattern.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicSubscription {
    /// Subscribe to a specific list of topic names.
    Topics(Vec<String>),
    /// Subscribe to topics matching a regex pattern (e.g., `events-.*`).
    Pattern(String),
}

impl TopicSubscription {
    /// Returns the topic names if this is a `Topics` subscription.
    #[must_use]
    pub fn topics(&self) -> Option<&[String]> {
        match self {
            TopicSubscription::Topics(t) => Some(t),
            TopicSubscription::Pattern(_) => None,
        }
    }

    /// Returns the pattern if this is a `Pattern` subscription.
    #[must_use]
    pub fn pattern(&self) -> Option<&str> {
        match self {
            TopicSubscription::Topics(_) => None,
            TopicSubscription::Pattern(p) => Some(p),
        }
    }

    /// Returns true if this is a pattern-based subscription.
    #[must_use]
    pub fn is_pattern(&self) -> bool {
        matches!(self, TopicSubscription::Pattern(_))
    }
}

impl Default for TopicSubscription {
    fn default() -> Self {
        TopicSubscription::Topics(Vec::new())
    }
}

/// Auto-offset reset policy for new consumer groups.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetReset {
    /// Start from the earliest available offset.
    Earliest,
    /// Start from the latest offset (only new messages).
    Latest,
    /// Fail if no committed offset exists.
    None,
}

impl OffsetReset {
    /// Returns the rdkafka config value string.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            OffsetReset::Earliest => "earliest",
            OffsetReset::Latest => "latest",
            OffsetReset::None => "error",
        }
    }
}

str_enum!(fromstr OffsetReset, lowercase_nodash, ConnectorError, "invalid auto.offset.reset",
    Earliest => "earliest", "beginning";
    Latest => "latest", "end";
    None => "none", "error"
);

/// Kafka partition assignment strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentStrategy {
    /// Range assignment (default).
    Range,
    /// Round-robin assignment.
    RoundRobin,
    /// Cooperative sticky assignment.
    CooperativeSticky,
}

impl AssignmentStrategy {
    /// Returns the rdkafka config value string.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            AssignmentStrategy::Range => "range",
            AssignmentStrategy::RoundRobin => "roundrobin",
            AssignmentStrategy::CooperativeSticky => "cooperative-sticky",
        }
    }
}

str_enum!(fromstr AssignmentStrategy, lowercase_nodash, ConnectorError,
    "invalid partition.assignment.strategy",
    Range => "range";
    RoundRobin => "roundrobin", "round-robin", "round_robin";
    CooperativeSticky => "cooperative-sticky", "cooperative_sticky"
);

/// Schema Registry compatibility level.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityLevel {
    /// New schema can read old data.
    Backward,
    /// Backward compatible with all prior versions.
    BackwardTransitive,
    /// Old schema can read new data.
    Forward,
    /// Forward compatible with all prior versions.
    ForwardTransitive,
    /// Both backward and forward compatible.
    Full,
    /// Both backward and forward compatible with all prior versions.
    FullTransitive,
    /// No compatibility checking.
    None,
}

impl CompatibilityLevel {
    /// Returns the Schema Registry API string.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            CompatibilityLevel::Backward => "BACKWARD",
            CompatibilityLevel::BackwardTransitive => "BACKWARD_TRANSITIVE",
            CompatibilityLevel::Forward => "FORWARD",
            CompatibilityLevel::ForwardTransitive => "FORWARD_TRANSITIVE",
            CompatibilityLevel::Full => "FULL",
            CompatibilityLevel::FullTransitive => "FULL_TRANSITIVE",
            CompatibilityLevel::None => "NONE",
        }
    }
}

str_enum!(fromstr CompatibilityLevel, uppercase, ConnectorError, "invalid schema.compatibility",
    Backward => "BACKWARD";
    BackwardTransitive => "BACKWARD_TRANSITIVE";
    Forward => "FORWARD";
    ForwardTransitive => "FORWARD_TRANSITIVE";
    Full => "FULL";
    FullTransitive => "FULL_TRANSITIVE";
    None => "NONE"
);

impl std::fmt::Display for CompatibilityLevel {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

/// Strategy for handling Avro schema evolution at runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SchemaEvolutionStrategy {
    /// Log schema changes, continue processing.
    #[default]
    Log,
    /// Return an error on incompatible schema changes.
    Reject,
    /// No detection — skip schema diffing entirely.
    Ignore,
}

str_enum!(fromstr SchemaEvolutionStrategy, lowercase, ConnectorError,
    "invalid schema.evolution.strategy",
    Log => "log";
    Reject => "reject";
    Ignore => "ignore"
);

impl std::fmt::Display for SchemaEvolutionStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SchemaEvolutionStrategy::Log => write!(f, "log"),
            SchemaEvolutionStrategy::Reject => write!(f, "reject"),
            SchemaEvolutionStrategy::Ignore => write!(f, "ignore"),
        }
    }
}

/// Confluent subject-name strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SubjectNameStrategy {
    /// `{topic}-value` (Confluent default).
    #[default]
    TopicName,
    /// `{record_name}-value`. Requires `schema.registry.record.name`.
    RecordName,
    /// `{topic}-{record_name}-value`. Requires `schema.registry.record.name`.
    TopicRecordName,
}

str_enum!(fromstr SubjectNameStrategy, lowercase_nodash, ConnectorError,
    "invalid schema.registry.subject.name.strategy",
    TopicName => "topic-name", "topicname", "topicnamestrategy";
    RecordName => "record-name", "recordname", "recordnamestrategy";
    TopicRecordName => "topic-record-name", "topicrecordname", "topicrecordnamestrategy"
);

impl std::fmt::Display for SubjectNameStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SubjectNameStrategy::TopicName => write!(f, "topic-name"),
            SubjectNameStrategy::RecordName => write!(f, "record-name"),
            SubjectNameStrategy::TopicRecordName => write!(f, "topic-record-name"),
        }
    }
}

/// Build the SR `-value` subject for a topic. `from_config` validates
/// that `record_name` is present for the record-based strategies, so
/// the `expect`s are unreachable in practice.
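///
/// A sketch of the resulting subjects:
///
/// ```ignore
/// // TopicName:       topic "events"                -> "events-value"
/// // RecordName:      record "User"                 -> "User-value"
/// // TopicRecordName: topic "events", record "User" -> "events-User-value"
/// ```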
pub(crate) fn resolve_value_subject(
    strategy: SubjectNameStrategy,
    record_name: Option<&str>,
    topic: &str,
) -> String {
    let name = || record_name.expect("from_config validates record.name");
    match strategy {
        SubjectNameStrategy::TopicName => format!("{topic}-value"),
        SubjectNameStrategy::RecordName => format!("{}-value", name()),
        SubjectNameStrategy::TopicRecordName => format!("{topic}-{}-value", name()),
    }
}

/// Maps the Kafka-level [`CompatibilityLevel`] to the schema module's
/// `CompatibilityMode` for evolution evaluation.
impl From<CompatibilityLevel> for crate::schema::traits::CompatibilityMode {
    fn from(level: CompatibilityLevel) -> Self {
        use crate::schema::traits::CompatibilityMode;
        match level {
            CompatibilityLevel::Backward => CompatibilityMode::Backward,
            CompatibilityLevel::BackwardTransitive => CompatibilityMode::BackwardTransitive,
            CompatibilityLevel::Forward => CompatibilityMode::Forward,
            CompatibilityLevel::ForwardTransitive => CompatibilityMode::ForwardTransitive,
            CompatibilityLevel::Full => CompatibilityMode::Full,
            CompatibilityLevel::FullTransitive => CompatibilityMode::FullTransitive,
            CompatibilityLevel::None => CompatibilityMode::None,
        }
    }
}

/// Schema Registry authentication credentials.
#[derive(Clone)]
pub struct SrAuth {
    /// Basic auth username.
    pub username: String,
    /// Basic auth password.
    pub password: String,
}

impl std::fmt::Debug for SrAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SrAuth")
            .field("username", &self.username)
            .field("password", &"***")
            .finish()
    }
}

/// Kafka source connector configuration.
///
/// Uses a custom `Debug` impl that redacts `sasl_password` and
/// `ssl_key_password` to prevent credential leakage in logs.
#[derive(Clone)]
#[allow(clippy::struct_excessive_bools)] // Config struct — each bool is an independent user-facing knob.
pub struct KafkaSourceConfig {
    // -- Required --
    /// Comma-separated list of broker addresses.
    pub bootstrap_servers: String,
    /// Consumer group identifier.
    pub group_id: String,
    /// Topic subscription (explicit list or regex pattern).
    pub subscription: TopicSubscription,

    // -- Security --
    /// Security protocol for broker connections.
    pub security_protocol: SecurityProtocol,
    /// SASL authentication mechanism.
    pub sasl_mechanism: Option<SaslMechanism>,
    /// SASL username (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
    pub sasl_username: Option<String>,
    /// SASL password (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
    pub sasl_password: Option<String>,
    /// Path to SSL CA certificate file (PEM format).
    pub ssl_ca_location: Option<String>,
    /// Path to client SSL certificate file (PEM format).
    pub ssl_certificate_location: Option<String>,
    /// Path to client SSL private key file (PEM format).
    pub ssl_key_location: Option<String>,
    /// Password for encrypted SSL private key.
    pub ssl_key_password: Option<String>,

    // -- Format & Schema --
    /// Data format for deserialization.
    pub format: Format,
    /// Confluent Schema Registry URL.
    pub schema_registry_url: Option<String>,
    /// Schema Registry authentication credentials.
    pub schema_registry_auth: Option<SrAuth>,
    /// Override compatibility level for the subject.
    pub schema_compatibility: Option<CompatibilityLevel>,
    /// How to handle Avro schema evolution detected at runtime.
    pub schema_evolution_strategy: SchemaEvolutionStrategy,
    /// Schema Registry SSL CA certificate path.
    pub schema_registry_ssl_ca_location: Option<String>,
    /// Schema Registry SSL client certificate path.
    pub schema_registry_ssl_certificate_location: Option<String>,
    /// Schema Registry SSL client key path.
    pub schema_registry_ssl_key_location: Option<String>,
    /// Confluent subject-name strategy. Default: `TopicName`.
    pub schema_registry_subject_strategy: SubjectNameStrategy,
    /// Record name for `record-name` / `topic-record-name` strategies.
    pub schema_registry_record_name: Option<String>,
    /// Deadline for Schema Registry lookups performed during schema
    /// auto-discovery at DDL time. Default: 10s.
    pub schema_registry_discovery_timeout: Duration,
    /// Column name containing the event timestamp.
    pub event_time_column: Option<String>,
    /// Whether to include Kafka metadata columns (`_partition`, `_offset`, `_timestamp`).
    pub include_metadata: bool,
    /// Whether to include Kafka headers as a map column (`_headers`).
    pub include_headers: bool,

    // -- Consumer tuning --
    /// Consumer startup mode (controls initial offset positioning).
    pub startup_mode: StartupMode,
    /// Where to start reading when no committed offset exists.
    pub auto_offset_reset: OffsetReset,
    /// Consumer transaction isolation level.
    pub isolation_level: IsolationLevel,
    /// Maximum records per poll batch.
    pub max_poll_records: usize,
    /// Partition assignment strategy.
    pub partition_assignment_strategy: AssignmentStrategy,
    /// Minimum bytes to return from a fetch (allows batching).
    pub fetch_min_bytes: Option<i32>,
    /// Maximum bytes to return from broker per request.
    pub fetch_max_bytes: Option<i32>,
    /// Maximum time broker waits for fetch.min.bytes.
    pub fetch_max_wait_ms: Option<i32>,
    /// Maximum bytes per partition to return from broker.
    pub max_partition_fetch_bytes: Option<i32>,

    // -- Watermark --
    /// Maximum expected out-of-orderness for watermark generation.
    pub max_out_of_orderness: Duration,
    /// Timeout before marking a partition as idle.
    pub idle_timeout: Duration,
    /// Enable per-partition watermark tracking for this source.
    pub enable_watermark_tracking: bool,

    // -- Consumer group timing --
    /// Consumer session timeout (default: 45s — production-safe; rdkafka's
    /// aggressive 10s default causes rebalance storms under GC pauses).
    pub session_timeout: Duration,
    /// Consumer heartbeat interval (default: 10s). Must satisfy
    /// `heartbeat_interval * 3 < session_timeout` per Kafka broker requirement.
    pub heartbeat_interval: Duration,
    /// Maximum interval between calls to `rd_kafka_consumer_poll()` before
    /// the broker considers this consumer dead and triggers a rebalance.
    ///
    /// Default: 600s (10 minutes). rdkafka's default is 300s, but with
    /// reader-side pause/resume the reader keeps polling even under
    /// backpressure, so this is a safety margin. Must be >= 60s.
    pub max_poll_interval: Duration,
    /// Maximum per-partition pre-fetch queue size in kbytes (default: 16384 = 16MB).
    /// rdkafka's 64MB default is too aggressive for an embedded database.
    pub queued_max_messages_kbytes: u32,

    // -- Broker commit --
    /// Interval at which to asynchronously commit offsets to the Kafka broker.
    ///
    /// This is advisory — it keeps `kafka-consumer-groups` lag monitoring
    /// accurate. The authoritative offset state lives in `LaminarDB`'s
    /// checkpoint system. Set to `Duration::ZERO` to disable periodic
    /// broker commits (default: 5s).
    pub broker_commit_interval: Duration,

    // -- Backpressure --
    /// Capacity of the bounded channel between the background Kafka reader
    /// task and `poll_batch()` (default: 1024). Must be >= `max_poll_records`.
    pub reader_channel_capacity: usize,
    /// Channel fill ratio at which to pause consumption.
    pub backpressure_high_watermark: f64,
    /// Channel fill ratio at which to resume consumption.
    pub backpressure_low_watermark: f64,

    // -- Error handling --
    /// Maximum tolerated deserialization error rate per batch (0.0-1.0).
    ///
    /// When the poison pill fallback is active and the error rate exceeds
    /// this threshold, the batch is rejected instead of returning partial
    /// results. Prevents silent data loss when a schema change makes most
    /// records unparseable. Default: 0.5 (50%).
    pub max_deser_error_rate: f64,

    // -- Pass-through --
    /// Additional rdkafka properties passed directly to librdkafka.
    pub kafka_properties: HashMap<String, String>,
}

impl std::fmt::Debug for KafkaSourceConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KafkaSourceConfig")
            .field("bootstrap_servers", &self.bootstrap_servers)
            .field("group_id", &self.group_id)
            .field("subscription", &self.subscription)
            .field("format", &self.format)
            .field("security_protocol", &self.security_protocol)
            .field("sasl_mechanism", &self.sasl_mechanism)
            .field("sasl_username", &self.sasl_username)
            .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
            .field(
                "ssl_key_password",
                &self.ssl_key_password.as_ref().map(|_| "***"),
            )
            .field("max_poll_records", &self.max_poll_records)
            .finish_non_exhaustive()
    }
}

impl Default for KafkaSourceConfig {
    fn default() -> Self {
        Self {
            bootstrap_servers: String::new(),
            group_id: String::new(),
            subscription: TopicSubscription::default(),
            security_protocol: SecurityProtocol::default(),
            sasl_mechanism: None,
            sasl_username: None,
            sasl_password: None,
            ssl_ca_location: None,
            ssl_certificate_location: None,
            ssl_key_location: None,
            ssl_key_password: None,
            format: Format::Json,
            schema_registry_url: None,
            schema_registry_auth: None,
            schema_compatibility: None,
            schema_evolution_strategy: SchemaEvolutionStrategy::default(),
            schema_registry_ssl_ca_location: None,
            schema_registry_ssl_certificate_location: None,
            schema_registry_ssl_key_location: None,
            schema_registry_subject_strategy: SubjectNameStrategy::default(),
            schema_registry_record_name: None,
            schema_registry_discovery_timeout: Duration::from_secs(10),
            event_time_column: None,
            include_metadata: false,
            include_headers: false,
            startup_mode: StartupMode::default(),
            auto_offset_reset: OffsetReset::Earliest,
            isolation_level: IsolationLevel::default(),
            max_poll_records: 1000,
            partition_assignment_strategy: AssignmentStrategy::Range,
            fetch_min_bytes: None,
            fetch_max_bytes: None,
            fetch_max_wait_ms: None,
            max_partition_fetch_bytes: None,
            max_out_of_orderness: Duration::from_secs(5),
            idle_timeout: Duration::from_secs(30),
            enable_watermark_tracking: false,
            session_timeout: Duration::from_secs(45),
            heartbeat_interval: Duration::from_secs(10),
            max_poll_interval: Duration::from_secs(600),
            queued_max_messages_kbytes: 16384,
            broker_commit_interval: Duration::from_secs(5),
            reader_channel_capacity: 1024,
            backpressure_high_watermark: 0.8,
            backpressure_low_watermark: 0.5,
            max_deser_error_rate: 0.5,
            kafka_properties: HashMap::new(),
        }
    }
}

impl KafkaSourceConfig {
    /// Parses a [`KafkaSourceConfig`] from a [`ConnectorConfig`].
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError` if required fields are missing or values are invalid.
    #[allow(deprecated, clippy::too_many_lines)]
    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
        let bootstrap_servers = config.require("bootstrap.servers")?.to_string();
        let group_id = config.require("group.id")?.to_string();

        let subscription = if let Some(pattern) = config.get("topic.pattern") {
            TopicSubscription::Pattern(pattern.to_string())
        } else {
            let topics_str = config.require("topic")?;
            let topics: Vec<String> = topics_str
                .split(',')
                .map(|s| s.trim().to_string())
                .collect();
            TopicSubscription::Topics(topics)
        };

        let security_protocol = match config.get("security.protocol") {
            Some(s) => s.parse::<SecurityProtocol>()?,
            None => SecurityProtocol::default(),
        };

        let sasl_mechanism = match config.get("sasl.mechanism") {
            Some(s) => Some(s.parse::<SaslMechanism>()?),
            None => None,
        };

        let sasl_username = config.get("sasl.username").map(String::from);
        let sasl_password = config.get("sasl.password").map(String::from);
        let ssl_ca_location = config.get("ssl.ca.location").map(String::from);
        let ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
        let ssl_key_location = config.get("ssl.key.location").map(String::from);
        let ssl_key_password = config.get("ssl.key.password").map(String::from);

        let format = match config.get("format") {
            Some(f) => f
                .parse::<Format>()
                .map_err(|e| ConnectorError::ConfigurationError(e.to_string()))?,
            None => Format::Json,
        };

        let schema_registry_url = config.get("schema.registry.url").map(String::from);

        let schema_registry_auth = match (
            config.get("schema.registry.username"),
            config.get("schema.registry.password"),
        ) {
            (Some(u), Some(p)) => Some(SrAuth {
                username: u.to_string(),
                password: p.to_string(),
            }),
            (Some(_), None) | (None, Some(_)) => {
                return Err(ConnectorError::ConfigurationError(
                    "schema.registry.username and schema.registry.password must both be set"
                        .to_string(),
                ));
            }
            (None, None) => None,
        };

        let schema_compatibility = match config.get("schema.compatibility") {
            Some(s) => Some(s.parse::<CompatibilityLevel>()?),
            None => None,
        };

        let schema_evolution_strategy = match config.get("schema.evolution.strategy") {
            Some(s) => s.parse::<SchemaEvolutionStrategy>()?,
            None => SchemaEvolutionStrategy::default(),
        };

        let schema_registry_ssl_ca_location = config
            .get("schema.registry.ssl.ca.location")
            .map(String::from);
        let schema_registry_ssl_certificate_location = config
            .get("schema.registry.ssl.certificate.location")
            .map(String::from);
        let schema_registry_ssl_key_location = config
            .get("schema.registry.ssl.key.location")
            .map(String::from);

        let schema_registry_subject_strategy =
            match config.get("schema.registry.subject.name.strategy") {
                Some(s) => s.parse::<SubjectNameStrategy>()?,
                None => SubjectNameStrategy::default(),
            };

        let schema_registry_record_name =
            config.get("schema.registry.record.name").map(String::from);

        if matches!(
            schema_registry_subject_strategy,
            SubjectNameStrategy::RecordName | SubjectNameStrategy::TopicRecordName
        ) && schema_registry_record_name.is_none()
        {
            return Err(ConnectorError::ConfigurationError(format!(
                "schema.registry.subject.name.strategy={schema_registry_subject_strategy} \
                 requires schema.registry.record.name"
            )));
        }

        let schema_registry_discovery_timeout = config
            .get_parsed::<u64>("schema.registry.discovery.timeout.ms")?
            .map_or(Duration::from_secs(10), Duration::from_millis);

        let event_time_column = config.get("event.time.column").map(String::from);

        let include_metadata = config
            .get_parsed::<bool>("include.metadata")?
            .unwrap_or(false);

        let include_headers = config
            .get_parsed::<bool>("include.headers")?
            .unwrap_or(false);

        let startup_mode = if let Some(offsets_str) = config.get("startup.specific.offsets") {
            let offsets = parse_specific_offsets(offsets_str)?;
            StartupMode::SpecificOffsets(offsets)
        } else if let Some(ts_str) = config.get("startup.timestamp.ms") {
            let ts: i64 = ts_str.parse().map_err(|_| {
                ConnectorError::ConfigurationError(format!(
                    "invalid startup.timestamp.ms: '{ts_str}'"
                ))
            })?;
            StartupMode::Timestamp(ts)
        } else {
            match config.get("startup.mode") {
                Some(s) => s.parse::<StartupMode>()?,
                None => StartupMode::default(),
            }
        };

        // StartupMode::Earliest/Latest override auto.offset.reset so that
        // `startup.mode = latest` works without the user also having to set
        // `auto.offset.reset = latest`. Leaving the two disconnected was a
        // silent data-correctness bug: such consumers started from earliest.
        let auto_offset_reset = match &startup_mode {
            StartupMode::Earliest => OffsetReset::Earliest,
            StartupMode::Latest => OffsetReset::Latest,
            _ => match config.get("auto.offset.reset") {
                Some(s) => s.parse::<OffsetReset>()?,
                None => OffsetReset::Earliest,
            },
        };

        let isolation_level = match config.get("isolation.level") {
            Some(s) => s.parse::<IsolationLevel>()?,
            None => IsolationLevel::default(),
        };

        let max_poll_records = config
            .get_parsed::<usize>("max.poll.records")?
            .unwrap_or(1000);

        let partition_assignment_strategy = match config.get("partition.assignment.strategy") {
            Some(s) => s.parse::<AssignmentStrategy>()?,
            None => AssignmentStrategy::Range,
        };

        let fetch_min_bytes = config.get_parsed::<i32>("fetch.min.bytes")?;
        let fetch_max_bytes = config.get_parsed::<i32>("fetch.max.bytes")?;
        let fetch_max_wait_ms = config.get_parsed::<i32>("fetch.max.wait.ms")?;
        let max_partition_fetch_bytes = config.get_parsed::<i32>("max.partition.fetch.bytes")?;

        let max_out_of_orderness_ms = config
            .get_parsed::<u64>("max.out.of.orderness.ms")?
            .unwrap_or(5000);

        let idle_timeout_ms = config
            .get_parsed::<u64>("idle.timeout.ms")?
            .unwrap_or(30_000);

        let enable_watermark_tracking = config
            .get_parsed::<bool>("enable.watermark.tracking")?
            .unwrap_or(false);

        let session_timeout_ms = config
            .get_parsed::<u64>("session.timeout.ms")?
            .unwrap_or(45_000);
        let heartbeat_interval_ms = config
            .get_parsed::<u64>("heartbeat.interval.ms")?
            .unwrap_or(10_000);
        let queued_max_messages_kbytes = config
            .get_parsed::<u32>("queued.max.messages.kbytes")?
            .unwrap_or(16384);
        let max_poll_interval_ms = config
            .get_parsed::<u64>("max.poll.interval.ms")?
            .unwrap_or(600_000);

        let broker_commit_interval_ms = config
            .get_parsed::<u64>("broker.commit.interval.ms")?
            .unwrap_or(5_000);

        let reader_channel_capacity = config
            .get_parsed::<usize>("reader.channel.capacity")?
            .unwrap_or(1024);

        let backpressure_high_watermark = config
            .get_parsed::<f64>("backpressure.high.watermark")?
            .unwrap_or(0.8);

        let backpressure_low_watermark = config
            .get_parsed::<f64>("backpressure.low.watermark")?
            .unwrap_or(0.5);

        let max_deser_error_rate = config
            .get_parsed::<f64>("max.deser.error.rate")?
            .unwrap_or(0.5);

        let kafka_properties = config.properties_with_prefix("kafka.");

        let cfg = Self {
            bootstrap_servers,
            group_id,
            subscription,
            security_protocol,
            sasl_mechanism,
            sasl_username,
            sasl_password,
            ssl_ca_location,
            ssl_certificate_location,
            ssl_key_location,
            ssl_key_password,
            format,
            schema_registry_url,
            schema_registry_auth,
            schema_compatibility,
            schema_evolution_strategy,
            schema_registry_ssl_ca_location,
            schema_registry_ssl_certificate_location,
            schema_registry_ssl_key_location,
            schema_registry_subject_strategy,
            schema_registry_record_name,
            schema_registry_discovery_timeout,
            event_time_column,
            include_metadata,
            include_headers,
            startup_mode,
            auto_offset_reset,
            isolation_level,
            max_poll_records,
            partition_assignment_strategy,
            fetch_min_bytes,
            fetch_max_bytes,
            fetch_max_wait_ms,
            max_partition_fetch_bytes,
            max_out_of_orderness: Duration::from_millis(max_out_of_orderness_ms),
            idle_timeout: Duration::from_millis(idle_timeout_ms),
            enable_watermark_tracking,
            session_timeout: Duration::from_millis(session_timeout_ms),
            heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
            max_poll_interval: Duration::from_millis(max_poll_interval_ms),
            queued_max_messages_kbytes,
            broker_commit_interval: Duration::from_millis(broker_commit_interval_ms),
            reader_channel_capacity,
            backpressure_high_watermark,
            backpressure_low_watermark,
            max_deser_error_rate,
            kafka_properties,
        };

        cfg.validate()?;
        Ok(cfg)
    }

    /// Validates the configuration.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::ConfigurationError` if the configuration is invalid.
    pub fn validate(&self) -> Result<(), ConnectorError> {
        if self.bootstrap_servers.is_empty() {
            return Err(ConnectorError::ConfigurationError(
                "bootstrap.servers cannot be empty".into(),
            ));
        }
        if self.group_id.is_empty() {
            return Err(ConnectorError::ConfigurationError(
                "group.id cannot be empty".into(),
            ));
        }

        // Validate topic subscription
        match &self.subscription {
            TopicSubscription::Topics(t) if t.is_empty() => {
                return Err(ConnectorError::ConfigurationError(
                    "at least one topic is required (or use topic.pattern)".into(),
                ));
            }
            TopicSubscription::Pattern(p) if p.is_empty() => {
                return Err(ConnectorError::ConfigurationError(
                    "topic.pattern cannot be empty".into(),
                ));
            }
            _ => {}
        }

        if self.max_poll_records == 0 {
            return Err(ConnectorError::ConfigurationError(
                "max.poll.records must be > 0".into(),
            ));
        }
        if self.reader_channel_capacity < self.max_poll_records {
            return Err(ConnectorError::ConfigurationError(format!(
                "reader.channel.capacity ({}) must be >= max.poll.records ({})",
                self.reader_channel_capacity, self.max_poll_records
            )));
        }

        if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
            return Err(ConnectorError::ConfigurationError(
                "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
                    .into(),
            ));
        }

        if let Some(mechanism) = &self.sasl_mechanism {
            if mechanism.requires_credentials()
                && (self.sasl_username.is_none() || self.sasl_password.is_none())
            {
                return Err(ConnectorError::ConfigurationError(format!(
                    "sasl.username and sasl.password are required for {mechanism} mechanism"
                )));
            }
        }

        if self.security_protocol.uses_ssl() {
            if let Some(ref ca) = self.ssl_ca_location {
                if ca.is_empty() {
                    return Err(ConnectorError::ConfigurationError(
                        "ssl.ca.location cannot be empty when specified".into(),
                    ));
                }
            }
        }

        if self.max_poll_interval.as_secs() < 60 {
            return Err(ConnectorError::ConfigurationError(format!(
                "max.poll.interval.ms ({}) must be >= 60000 (60s)",
                self.max_poll_interval.as_millis()
            )));
        }

        // Kafka broker requires heartbeat_interval * 3 < session_timeout.
        if self.heartbeat_interval.as_millis() * 3 >= self.session_timeout.as_millis() {
            return Err(ConnectorError::ConfigurationError(format!(
                "heartbeat.interval.ms ({}) * 3 must be < session.timeout.ms ({})",
                self.heartbeat_interval.as_millis(),
                self.session_timeout.as_millis()
            )));
        }

        if self.backpressure_high_watermark <= self.backpressure_low_watermark {
            return Err(ConnectorError::ConfigurationError(
                "backpressure.high.watermark must be > backpressure.low.watermark".into(),
            ));
        }
        if !(0.0..=1.0).contains(&self.backpressure_high_watermark) {
            return Err(ConnectorError::ConfigurationError(
                "backpressure.high.watermark must be between 0.0 and 1.0".into(),
            ));
        }
        if !(0.0..=1.0).contains(&self.backpressure_low_watermark) {
            return Err(ConnectorError::ConfigurationError(
                "backpressure.low.watermark must be between 0.0 and 1.0".into(),
            ));
        }

        if !(0.0..=1.0).contains(&self.max_deser_error_rate) {
            return Err(ConnectorError::ConfigurationError(
                "max.deser.error.rate must be between 0.0 and 1.0".into(),
            ));
        }

        if self.format == Format::Avro && self.schema_registry_url.is_none() {
            return Err(ConnectorError::ConfigurationError(
                "schema.registry.url is required when format is 'avro'".into(),
            ));
        }

        Ok(())
    }

    /// Builds an rdkafka [`ClientConfig`] from this configuration.
    #[must_use]
    pub fn to_rdkafka_config(&self) -> ClientConfig {
        let mut config = ClientConfig::new();

        config.set("bootstrap.servers", &self.bootstrap_servers);
        config.set("group.id", &self.group_id);
        config.set("enable.auto.commit", "false");
        config.set("enable.auto.offset.store", "false");
        config.set("auto.offset.reset", self.auto_offset_reset.as_rdkafka_str());
        config.set(
            "partition.assignment.strategy",
            self.partition_assignment_strategy.as_rdkafka_str(),
        );
        config.set("security.protocol", self.security_protocol.as_rdkafka_str());

        if let Some(ref mechanism) = self.sasl_mechanism {
            config.set("sasl.mechanism", mechanism.as_rdkafka_str());
        }
        if let Some(ref username) = self.sasl_username {
            config.set("sasl.username", username);
        }
        if let Some(ref password) = self.sasl_password {
            config.set("sasl.password", password);
        }
        if let Some(ref ca) = self.ssl_ca_location {
            config.set("ssl.ca.location", ca);
        }
        if let Some(ref cert) = self.ssl_certificate_location {
            config.set("ssl.certificate.location", cert);
        }
        if let Some(ref key) = self.ssl_key_location {
            config.set("ssl.key.location", key);
        }
        if let Some(ref key_pass) = self.ssl_key_password {
            config.set("ssl.key.password", key_pass);
        }

        config.set("isolation.level", self.isolation_level.as_rdkafka_str());
        config.set(
            "session.timeout.ms",
            self.session_timeout.as_millis().to_string(),
        );
        config.set(
            "heartbeat.interval.ms",
            self.heartbeat_interval.as_millis().to_string(),
        );
        config.set(
            "queued.max.messages.kbytes",
            self.queued_max_messages_kbytes.to_string(),
        );
        config.set(
            "max.poll.interval.ms",
            self.max_poll_interval.as_millis().to_string(),
        );

        if let Some(fetch_min) = self.fetch_min_bytes {
            config.set("fetch.min.bytes", fetch_min.to_string());
        }

        if let Some(fetch_max) = self.fetch_max_bytes {
            config.set("fetch.max.bytes", fetch_max.to_string());
        }

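        // Note: librdkafka spells this property `fetch.wait.max.ms`; our
        // `fetch.max.wait.ms` key follows the Java client's naming.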
        if let Some(wait_ms) = self.fetch_max_wait_ms {
            config.set("fetch.wait.max.ms", wait_ms.to_string());
        }

        if let Some(partition_max) = self.max_partition_fetch_bytes {
            config.set("max.partition.fetch.bytes", partition_max.to_string());
        }

        // Apply pass-through properties, blocking security-critical keys
        // that could silently downgrade authentication or break semantics.
        for (key, value) in &self.kafka_properties {
            if is_blocked_passthrough_key(key) {
                tracing::warn!(
                    key,
                    "ignoring kafka.* pass-through property that overrides a security setting"
                );
                continue;
            }
            config.set(key, value);
        }

        config
    }
}

/// Returns `true` if a pass-through kafka.* key must not override explicit settings.
fn is_blocked_passthrough_key(key: &str) -> bool {
    key.starts_with("sasl.kerberos.")
        || matches!(
            key,
            "security.protocol"
                | "sasl.mechanism"
                | "sasl.username"
                | "sasl.password"
                | "sasl.oauthbearer.config"
                | "ssl.ca.location"
                | "ssl.certificate.location"
                | "ssl.key.location"
                | "ssl.key.password"
                | "ssl.endpoint.identification.algorithm"
                | "enable.auto.commit"
                | "enable.auto.offset.store"
                | "enable.idempotence"
                | "auto.offset.reset"
                | "session.timeout.ms"
                | "heartbeat.interval.ms"
                | "max.poll.interval.ms"
                | "queued.max.messages.kbytes"
        )
}

/// Parses a specific offsets string in the format "partition:offset,partition:offset,...".
///
/// Example: "0:100,1:200,2:300" maps partition 0 to offset 100, partition 1 to offset 200, etc.
fn parse_specific_offsets(s: &str) -> Result<HashMap<i32, i64>, ConnectorError> {
    let mut offsets = HashMap::new();

    for pair in s.split(',') {
        let pair = pair.trim();
        if pair.is_empty() {
            continue;
        }

        let parts: Vec<&str> = pair.split(':').collect();
        if parts.len() != 2 {
            return Err(ConnectorError::ConfigurationError(format!(
                "invalid offset format '{pair}' (expected 'partition:offset')"
            )));
        }

        let partition: i32 = parts[0].trim().parse().map_err(|_| {
            ConnectorError::ConfigurationError(format!(
                "invalid partition number '{}' in '{pair}'",
                parts[0]
            ))
        })?;

        let offset: i64 = parts[1].trim().parse().map_err(|_| {
            ConnectorError::ConfigurationError(format!("invalid offset '{}' in '{pair}'", parts[1]))
        })?;

        offsets.insert(partition, offset);
    }

    if offsets.is_empty() {
        return Err(ConnectorError::ConfigurationError(
            "startup.specific.offsets cannot be empty".into(),
        ));
    }

    Ok(offsets)
}

#[cfg(test)]
mod tests {
    use super::*;

    fn make_config(extra: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("group.id", "test-group");
        config.set("topic", "events");
        for (k, v) in extra {
            config.set(*k, *v);
        }
        config
    }

    #[test]
    #[allow(deprecated)]
    fn test_parse_required_fields() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
        assert_eq!(cfg.bootstrap_servers, "localhost:9092");
        assert_eq!(cfg.group_id, "test-group");
        assert_eq!(cfg.subscription.topics().unwrap(), &["events"]);
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Topics(ref t) if t == &["events"]
        ));
    }
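
    // A sketch exercising from_config's startup-mode precedence:
    // startup.specific.offsets takes priority and parses "partition:offset"
    // pairs via parse_specific_offsets.
    #[test]
    fn test_parse_startup_specific_offsets() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[(
            "startup.specific.offsets",
            "0:100, 1:200",
        )]))
        .unwrap();
        match cfg.startup_mode {
            StartupMode::SpecificOffsets(ref offsets) => {
                assert_eq!(offsets.get(&0), Some(&100));
                assert_eq!(offsets.get(&1), Some(&200));
            }
            ref other => panic!("expected SpecificOffsets, got {other}"),
        }
    }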

    #[test]
    fn test_parse_missing_required() {
        let config = ConnectorConfig::new("kafka");
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    #[allow(deprecated)]
    fn test_parse_multi_topic() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[("topic", "a, b, c")])).unwrap();
        assert_eq!(cfg.subscription.topics().unwrap(), &["a", "b", "c"]);
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Topics(ref t) if t == &["a", "b", "c"]
        ));
    }

    #[test]
    fn test_parse_topic_pattern() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("group.id", "test-group");
        config.set("topic.pattern", "events-.*");

        let cfg = KafkaSourceConfig::from_config(&config).unwrap();
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Pattern(ref p) if p == "events-.*"
        ));
        assert!(cfg.subscription.is_pattern());
        assert_eq!(cfg.subscription.pattern(), Some("events-.*"));
        assert!(cfg.subscription.topics().is_none());
    }
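
    // A sketch of the documented override in from_config: `startup.mode =
    // latest` should also force auto.offset.reset to latest.
    #[test]
    fn test_startup_mode_latest_overrides_offset_reset() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "latest")])).unwrap();
        assert_eq!(cfg.startup_mode, StartupMode::Latest);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
    }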

    #[test]
    fn test_parse_defaults() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
        assert_eq!(cfg.format, Format::Json);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
        assert_eq!(cfg.isolation_level, IsolationLevel::ReadCommitted);
        assert_eq!(cfg.max_poll_records, 1000);
        assert_eq!(cfg.partition_assignment_strategy, AssignmentStrategy::Range);
        assert!(!cfg.include_metadata);
        assert!(!cfg.include_headers);
        assert!(cfg.schema_registry_url.is_none());
        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
        assert!(cfg.sasl_mechanism.is_none());
        assert_eq!(cfg.broker_commit_interval, Duration::from_secs(5));
        assert_eq!(cfg.reader_channel_capacity, 1024);
    }
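
    // A sketch of validate()'s consumer-group timing rule:
    // heartbeat.interval.ms * 3 must stay strictly below session.timeout.ms.
    #[test]
    fn test_validate_heartbeat_vs_session_timeout() {
        let config = make_config(&[
            ("session.timeout.ms", "30000"),
            ("heartbeat.interval.ms", "10000"),
        ]);
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }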

    #[test]
    fn test_parse_broker_commit_interval() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("broker.commit.interval.ms", "5000")]))
                .unwrap();
        assert_eq!(cfg.broker_commit_interval, Duration::from_secs(5));
    }

    #[test]
    fn test_parse_broker_commit_interval_zero_disables() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("broker.commit.interval.ms", "0")]))
                .unwrap();
        assert!(cfg.broker_commit_interval.is_zero());
    }
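
    // A sketch of the backpressure invariant in validate(): the high
    // watermark must sit strictly above the low watermark.
    #[test]
    fn test_validate_backpressure_watermarks() {
        let config = make_config(&[
            ("backpressure.high.watermark", "0.4"),
            ("backpressure.low.watermark", "0.5"),
        ]);
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }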

    #[test]
    fn test_parse_optional_fields() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("format", "csv"),
            ("auto.offset.reset", "latest"),
            ("max.poll.records", "500"),
            ("include.metadata", "true"),
            ("include.headers", "true"),
            ("event.time.column", "ts"),
            ("partition.assignment.strategy", "roundrobin"),
            ("isolation.level", "read_uncommitted"),
        ]))
        .unwrap();

        assert_eq!(cfg.format, Format::Csv);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
        assert_eq!(cfg.isolation_level, IsolationLevel::ReadUncommitted);
        assert_eq!(cfg.max_poll_records, 500);
        assert!(cfg.include_metadata);
        assert!(cfg.include_headers);
        assert_eq!(cfg.event_time_column, Some("ts".to_string()));
        assert_eq!(
            cfg.partition_assignment_strategy,
            AssignmentStrategy::RoundRobin
        );
    }
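
    // A sketch of the channel-sizing rule in validate():
    // reader.channel.capacity must be >= max.poll.records.
    #[test]
    fn test_validate_channel_capacity_vs_poll_records() {
        let config = make_config(&[
            ("max.poll.records", "2048"),
            ("reader.channel.capacity", "1024"),
        ]);
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }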
1354
1355    #[test]
1356    fn test_parse_security_sasl_ssl() {
1357        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1358            ("security.protocol", "sasl_ssl"),
1359            ("sasl.mechanism", "SCRAM-SHA-256"),
1360            ("sasl.username", "alice"),
1361            ("sasl.password", "secret"),
1362            ("ssl.ca.location", "/etc/ssl/ca.pem"),
1363        ]))
1364        .unwrap();
1365
1366        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
1367        assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha256));
1368        assert_eq!(cfg.sasl_username, Some("alice".to_string()));
1369        assert_eq!(cfg.sasl_password, Some("secret".to_string()));
1370        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
1371    }
1372
1373    #[test]
1374    fn test_parse_security_ssl_only() {
1375        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1376            ("security.protocol", "ssl"),
1377            ("ssl.ca.location", "/etc/ssl/ca.pem"),
1378            ("ssl.certificate.location", "/etc/ssl/client.pem"),
1379            ("ssl.key.location", "/etc/ssl/client.key"),
1380            ("ssl.key.password", "keypass"),
1381        ]))
1382        .unwrap();
1383
1384        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
1385        assert!(cfg.security_protocol.uses_ssl());
1386        assert!(!cfg.security_protocol.uses_sasl());
1387        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
1388        assert_eq!(
1389            cfg.ssl_certificate_location,
1390            Some("/etc/ssl/client.pem".to_string())
1391        );
1392        assert_eq!(
1393            cfg.ssl_key_location,
1394            Some("/etc/ssl/client.key".to_string())
1395        );
1396        assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
1397    }
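
    // Note: `ssl.certificate.location` + `ssl.key.location` (as in the test
    // above) configure a client certificate, i.e. mutual TLS, where the broker
    // also verifies the client.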

    #[test]
    fn test_parse_fetch_tuning() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("fetch.min.bytes", "1024"),
            ("fetch.max.bytes", "52428800"),
            ("fetch.max.wait.ms", "500"),
            ("max.partition.fetch.bytes", "1048576"),
        ]))
        .unwrap();

        assert_eq!(cfg.fetch_min_bytes, Some(1024));
        assert_eq!(cfg.fetch_max_bytes, Some(52_428_800));
        assert_eq!(cfg.fetch_max_wait_ms, Some(500));
        assert_eq!(cfg.max_partition_fetch_bytes, Some(1_048_576));
    }

    #[test]
    fn test_parse_kafka_passthrough() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("kafka.session.timeout.ms", "30000"),
            ("kafka.max.poll.interval.ms", "300000"),
            ("kafka.fetch.message.max.bytes", "2097152"),
        ]))
        .unwrap();

        assert_eq!(cfg.kafka_properties.len(), 3);
        // session.timeout.ms and max.poll.interval.ms are stored in
        // kafka_properties, but is_blocked_passthrough_key() filters them out
        // in to_rdkafka_config(), so they cannot override explicit settings.
        assert_eq!(
            cfg.kafka_properties.get("session.timeout.ms"),
            Some(&"30000".to_string())
        );
        assert_eq!(
            cfg.kafka_properties.get("max.poll.interval.ms"),
            Some(&"300000".to_string())
        );
    }
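
    // Sketch (assumption): a pass-through key that is *not* on the blocked
    // list should survive into the final rdkafka config. `fetch.error.backoff.ms`
    // is a real librdkafka property, assumed here not to be blocked.
    #[test]
    fn test_parse_kafka_passthrough_unblocked_key() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[(
            "kafka.fetch.error.backoff.ms",
            "750",
        )]))
        .unwrap();
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("fetch.error.backoff.ms"), Some("750"));
    }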

    #[test]
    fn test_parse_schema_registry() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("format", "avro"),
            ("schema.registry.url", "http://localhost:8081"),
            ("schema.registry.username", "user"),
            ("schema.registry.password", "pass"),
            ("schema.compatibility", "FULL_TRANSITIVE"),
            ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
        ]))
        .unwrap();

        assert_eq!(cfg.format, Format::Avro);
        assert_eq!(
            cfg.schema_registry_url,
            Some("http://localhost:8081".to_string())
        );
        assert!(cfg.schema_registry_auth.is_some());
        let auth = cfg.schema_registry_auth.unwrap();
        assert_eq!(auth.username, "user");
        assert_eq!(auth.password, "pass");
        assert_eq!(
            cfg.schema_compatibility,
            Some(CompatibilityLevel::FullTransitive)
        );
        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/etc/ssl/sr-ca.pem".to_string())
        );
    }

    #[test]
    fn test_parse_sr_auth_partial() {
        let config = make_config(&[
            ("schema.registry.url", "http://localhost:8081"),
            ("schema.registry.username", "user"),
            // missing password
        ]);
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_validate_avro_without_sr() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        cfg.format = Format::Avro;
        // No schema_registry_url
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_backpressure_watermarks() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        cfg.backpressure_high_watermark = 0.3;
        cfg.backpressure_low_watermark = 0.5;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_sasl_without_mechanism() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        // sasl_mechanism not set
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_sasl_plain_without_credentials() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        cfg.sasl_mechanism = Some(SaslMechanism::Plain);
        // username/password not set
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_empty_topic_pattern() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Pattern(String::new());
        assert!(cfg.validate().is_err());
    }

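    // Sketch (assumption): with the three required fields set and everything
    // else left at defaults, validation should pass; this presumes the
    // `Default` impl picks mutually consistent values.
    #[test]
    fn test_validate_minimal_config_ok() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        assert!(cfg.validate().is_ok());
    }
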
    #[test]
    fn test_rdkafka_config() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("auto.offset.reset", "latest"),
            ("kafka.fetch.min.bytes", "1024"),
        ]))
        .unwrap();

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(rdkafka.get("group.id"), Some("test-group"));
        assert_eq!(rdkafka.get("enable.auto.commit"), Some("false"));
        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
        assert_eq!(rdkafka.get("security.protocol"), Some("plaintext"));
        assert_eq!(rdkafka.get("isolation.level"), Some("read_committed"));
    }

    #[test]
    fn test_rdkafka_config_with_security() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("security.protocol", "sasl_ssl"),
            ("sasl.mechanism", "PLAIN"),
            ("sasl.username", "user"),
            ("sasl.password", "pass"),
            ("ssl.ca.location", "/ca.pem"),
        ]))
        .unwrap();

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("security.protocol"), Some("sasl_ssl"));
        assert_eq!(rdkafka.get("sasl.mechanism"), Some("PLAIN"));
        assert_eq!(rdkafka.get("sasl.username"), Some("user"));
        assert_eq!(rdkafka.get("sasl.password"), Some("pass"));
        assert_eq!(rdkafka.get("ssl.ca.location"), Some("/ca.pem"));
    }

    #[test]
    fn test_rdkafka_config_with_fetch_tuning() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("fetch.min.bytes", "1024"),
            ("fetch.max.bytes", "1048576"),
            ("fetch.max.wait.ms", "500"),
            ("max.partition.fetch.bytes", "262144"),
            ("isolation.level", "read_uncommitted"),
        ]))
        .unwrap();

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
        assert_eq!(rdkafka.get("fetch.max.bytes"), Some("1048576"));
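        // librdkafka calls this property `fetch.wait.max.ms`, unlike the Java
        // client's `fetch.max.wait.ms`; the connector key is renamed when the
        // rdkafka config is built.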
        assert_eq!(rdkafka.get("fetch.wait.max.ms"), Some("500"));
        assert_eq!(rdkafka.get("max.partition.fetch.bytes"), Some("262144"));
        assert_eq!(rdkafka.get("isolation.level"), Some("read_uncommitted"));
    }

    #[test]
    fn test_offset_reset_parsing() {
        assert_eq!(
            "earliest".parse::<OffsetReset>().unwrap(),
            OffsetReset::Earliest
        );
        assert_eq!(
            "latest".parse::<OffsetReset>().unwrap(),
            OffsetReset::Latest
        );
        assert_eq!("none".parse::<OffsetReset>().unwrap(), OffsetReset::None);
        assert!("invalid".parse::<OffsetReset>().is_err());
    }

    #[test]
    fn test_compatibility_level_parsing() {
        assert_eq!(
            "BACKWARD".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::Backward
        );
        assert_eq!(
            "full_transitive".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::FullTransitive
        );
        assert_eq!(
            "NONE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::None
        );
        assert!("invalid".parse::<CompatibilityLevel>().is_err());
    }

    #[test]
    fn test_security_protocol_parsing() {
        assert_eq!(
            "plaintext".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::Plaintext
        );
        assert_eq!(
            "SSL".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::Ssl
        );
        assert_eq!(
            "sasl_plaintext".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::SaslPlaintext
        );
        assert_eq!(
            "SASL_SSL".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::SaslSsl
        );
        assert_eq!(
            "sasl-ssl".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::SaslSsl
        );
        assert!("invalid".parse::<SecurityProtocol>().is_err());
    }

    #[test]
    fn test_sasl_mechanism_parsing() {
        assert_eq!(
            "PLAIN".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::Plain
        );
        assert_eq!(
            "SCRAM-SHA-256".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::ScramSha256
        );
        assert_eq!(
            "scram_sha_512".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::ScramSha512
        );
        assert_eq!(
            "GSSAPI".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::Gssapi
        );
        assert_eq!(
            "OAUTHBEARER".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::Oauthbearer
        );
        assert!("invalid".parse::<SaslMechanism>().is_err());
    }

    #[test]
    fn test_isolation_level_parsing() {
        assert_eq!(
            "read_uncommitted".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::ReadUncommitted
        );
        assert_eq!(
            "read_committed".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::ReadCommitted
        );
        assert_eq!(
            "read-committed".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::ReadCommitted
        );
        assert!("invalid".parse::<IsolationLevel>().is_err());
    }

    #[test]
    fn test_topic_subscription_accessors() {
        let topics = TopicSubscription::Topics(vec!["a".into(), "b".into()]);
        assert_eq!(
            topics.topics(),
            Some(&["a".to_string(), "b".to_string()][..])
        );
        assert!(topics.pattern().is_none());
        assert!(!topics.is_pattern());

        let pattern = TopicSubscription::Pattern("events-.*".into());
        assert!(pattern.topics().is_none());
        assert_eq!(pattern.pattern(), Some("events-.*"));
        assert!(pattern.is_pattern());
    }

    #[test]
    fn test_security_protocol_helpers() {
        assert!(!SecurityProtocol::Plaintext.uses_ssl());
        assert!(!SecurityProtocol::Plaintext.uses_sasl());

        assert!(SecurityProtocol::Ssl.uses_ssl());
        assert!(!SecurityProtocol::Ssl.uses_sasl());

        assert!(!SecurityProtocol::SaslPlaintext.uses_ssl());
        assert!(SecurityProtocol::SaslPlaintext.uses_sasl());

        assert!(SecurityProtocol::SaslSsl.uses_ssl());
        assert!(SecurityProtocol::SaslSsl.uses_sasl());
    }

    #[test]
    fn test_sasl_mechanism_helpers() {
        assert!(SaslMechanism::Plain.requires_credentials());
        assert!(SaslMechanism::ScramSha256.requires_credentials());
        assert!(SaslMechanism::ScramSha512.requires_credentials());
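        // GSSAPI authenticates with Kerberos tickets/keytabs and OAUTHBEARER
        // with bearer tokens, so neither takes a username/password pair.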
        assert!(!SaslMechanism::Gssapi.requires_credentials());
        assert!(!SaslMechanism::Oauthbearer.requires_credentials());
    }

    #[test]
    fn test_enum_display() {
        assert_eq!(SecurityProtocol::SaslSsl.to_string(), "sasl_ssl");
        assert_eq!(SaslMechanism::ScramSha256.to_string(), "SCRAM-SHA-256");
        assert_eq!(IsolationLevel::ReadCommitted.to_string(), "read_committed");
    }

    #[test]
    fn test_startup_mode_parsing() {
        assert_eq!(
            "group-offsets".parse::<StartupMode>().unwrap(),
            StartupMode::GroupOffsets
        );
        assert_eq!(
            "group_offsets".parse::<StartupMode>().unwrap(),
            StartupMode::GroupOffsets
        );
        assert_eq!(
            "earliest".parse::<StartupMode>().unwrap(),
            StartupMode::Earliest
        );
        assert_eq!(
            "latest".parse::<StartupMode>().unwrap(),
            StartupMode::Latest
        );
        assert!("invalid".parse::<StartupMode>().is_err());
    }

    #[test]
    fn test_startup_mode_display() {
        assert_eq!(StartupMode::GroupOffsets.to_string(), "group-offsets");
        assert_eq!(StartupMode::Earliest.to_string(), "earliest");
        assert_eq!(StartupMode::Latest.to_string(), "latest");

        let specific = StartupMode::SpecificOffsets(HashMap::from([(0, 100), (1, 200)]));
        assert!(specific.to_string().contains("2 partitions"));

        let ts = StartupMode::Timestamp(1234567890000);
        assert!(ts.to_string().contains("1234567890000"));
    }

    #[test]
    fn test_startup_mode_latest_overrides_offset_reset() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "latest")])).unwrap();
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
    }

    #[test]
    fn test_startup_mode_earliest_overrides_offset_reset() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "earliest")])).unwrap();
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
    }

    #[test]
    fn test_startup_mode_group_offsets_uses_explicit_offset_reset() {
        // When startup.mode is group-offsets (the default), the explicit
        // auto.offset.reset setting should be used.
        let cfg = KafkaSourceConfig::from_config(&make_config(&[("auto.offset.reset", "latest")]))
            .unwrap();
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
    }

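    // `startup.specific.offsets` takes a comma-separated list of
    // `partition:offset` pairs, e.g. "0:100,1:200,2:300"; whitespace around
    // entries is tolerated.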
    #[test]
    fn test_parse_specific_offsets() {
        let offsets = parse_specific_offsets("0:100,1:200,2:300").unwrap();
        assert_eq!(offsets.get(&0), Some(&100));
        assert_eq!(offsets.get(&1), Some(&200));
        assert_eq!(offsets.get(&2), Some(&300));
    }

    #[test]
    fn test_parse_specific_offsets_with_spaces() {
        let offsets = parse_specific_offsets(" 0:100 , 1:200 ").unwrap();
        assert_eq!(offsets.get(&0), Some(&100));
        assert_eq!(offsets.get(&1), Some(&200));
    }

    #[test]
    fn test_parse_specific_offsets_errors() {
        assert!(parse_specific_offsets("").is_err());
        assert!(parse_specific_offsets("0").is_err());
        assert!(parse_specific_offsets("0:abc").is_err());
        assert!(parse_specific_offsets("abc:100").is_err());
        assert!(parse_specific_offsets("0:100:extra").is_err());
    }

    #[test]
    fn test_parse_startup_mode_from_config() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "earliest")])).unwrap();
        assert_eq!(cfg.startup_mode, StartupMode::Earliest);

        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "latest")])).unwrap();
        assert_eq!(cfg.startup_mode, StartupMode::Latest);

        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "group-offsets")]))
                .unwrap();
        assert_eq!(cfg.startup_mode, StartupMode::GroupOffsets);
    }

    #[test]
    fn test_parse_startup_specific_offsets_from_config() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[(
            "startup.specific.offsets",
            "0:100,1:200",
        )]))
        .unwrap();

        match cfg.startup_mode {
            StartupMode::SpecificOffsets(offsets) => {
                assert_eq!(offsets.get(&0), Some(&100));
                assert_eq!(offsets.get(&1), Some(&200));
            }
            _ => panic!("expected SpecificOffsets"),
        }
    }

    #[test]
    fn test_parse_startup_timestamp_from_config() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.timestamp.ms", "1234567890")]))
                .unwrap();

        assert_eq!(cfg.startup_mode, StartupMode::Timestamp(1234567890));
    }

    #[test]
    fn test_parse_schema_registry_ssl_fields() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("schema.registry.url", "https://sr:8081"),
            ("schema.registry.ssl.ca.location", "/ca.pem"),
            ("schema.registry.ssl.certificate.location", "/cert.pem"),
            ("schema.registry.ssl.key.location", "/key.pem"),
        ]))
        .unwrap();

        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/ca.pem".to_string())
        );
        assert_eq!(
            cfg.schema_registry_ssl_certificate_location,
            Some("/cert.pem".to_string())
        );
        assert_eq!(
            cfg.schema_registry_ssl_key_location,
            Some("/key.pem".to_string())
        );
    }

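    // The watermark settings follow the usual bounded-out-of-orderness model:
    // `max.out.of.orderness.ms` bounds how far event time may lag the maximum
    // observed timestamp before a record counts as late, while `idle.timeout.ms`
    // controls how long a silent partition may hold back the watermark.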
    #[test]
    fn test_parse_watermark_defaults() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(5));
        assert_eq!(cfg.idle_timeout, Duration::from_secs(30));
        assert!(!cfg.enable_watermark_tracking);
    }

    #[test]
    fn test_parse_watermark_tracking_enabled() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("enable.watermark.tracking", "true"),
            ("max.out.of.orderness.ms", "10000"),
            ("idle.timeout.ms", "60000"),
        ]))
        .unwrap();

        assert!(cfg.enable_watermark_tracking);
        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(10));
        assert_eq!(cfg.idle_timeout, Duration::from_secs(60));
    }

    // -- startup.mode = timestamp error --

    #[test]
    fn test_startup_mode_timestamp_error() {
        let config = make_config(&[("startup.mode", "timestamp")]);
        let err = KafkaSourceConfig::from_config(&config).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("startup.timestamp.ms"),
            "error should mention startup.timestamp.ms, got: {msg}"
        );
    }

    // -- session timeout / heartbeat interval --

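    // Kafka's guidance is to keep heartbeat.interval.ms at no more than one
    // third of session.timeout.ms; the validation exercised below enforces
    // heartbeat * 3 < session.
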
    #[test]
    fn test_session_timeout_heartbeat_defaults() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
        assert_eq!(cfg.session_timeout, Duration::from_secs(45));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_session_timeout_heartbeat_custom() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("session.timeout.ms", "60000"),
            ("heartbeat.interval.ms", "15000"),
        ]))
        .unwrap();
        assert_eq!(cfg.session_timeout, Duration::from_secs(60));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(15));
    }

    #[test]
    fn test_session_timeout_heartbeat_validation_fails() {
        // heartbeat=20s * 3 = 60s >= session=45s → error
        let config = make_config(&[
            ("session.timeout.ms", "45000"),
            ("heartbeat.interval.ms", "20000"),
        ]);
        let err = KafkaSourceConfig::from_config(&config).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("heartbeat.interval.ms"), "got: {msg}");
    }

    #[test]
    fn test_session_timeout_heartbeat_validation_passes() {
        // heartbeat=10s * 3 = 30s < session=45s → ok
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("session.timeout.ms", "45000"),
            ("heartbeat.interval.ms", "10000"),
        ]))
        .unwrap();
        assert_eq!(cfg.session_timeout, Duration::from_secs(45));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_session_timeout_heartbeat_in_rdkafka_config() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("session.timeout.ms", "60000"),
            ("heartbeat.interval.ms", "15000"),
        ]))
        .unwrap();
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("session.timeout.ms"), Some("60000"));
        assert_eq!(rdkafka.get("heartbeat.interval.ms"), Some("15000"));
    }

    #[test]
    fn test_session_timeout_passthrough_blocked() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("kafka.session.timeout.ms", "99999")]))
                .unwrap();
        // The pass-through should be blocked; rdkafka config should use the default (45000).
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("session.timeout.ms"), Some("45000"));
    }

    // -- queued.max.messages.kbytes --

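    // `queued.max.messages.kbytes` caps librdkafka's local pre-fetch queue
    // (kilobytes of queued messages per consumer), one of the main client-side
    // memory/backpressure knobs.
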
    #[test]
    fn test_queued_max_messages_kbytes_default() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
        assert_eq!(cfg.queued_max_messages_kbytes, 16384);
    }

    #[test]
    fn test_queued_max_messages_kbytes_custom() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[(
            "queued.max.messages.kbytes",
            "32768",
        )]))
        .unwrap();
        assert_eq!(cfg.queued_max_messages_kbytes, 32768);
    }

    #[test]
    fn test_queued_max_messages_kbytes_in_rdkafka_config() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("queued.max.messages.kbytes", "8192")]))
                .unwrap();
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("queued.max.messages.kbytes"), Some("8192"));
    }

    // -- Schema Registry subject-name strategy --

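    // These mirror Confluent's TopicName / RecordName / TopicRecordName
    // strategies, except that this resolver appends the `-value` suffix in all
    // three variants (Confluent's record-name strategies omit it).
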
    #[test]
    fn resolve_subject_topic_name() {
        assert_eq!(
            resolve_value_subject(SubjectNameStrategy::TopicName, None, "orders"),
            "orders-value"
        );
    }

    #[test]
    fn resolve_subject_record_name() {
        assert_eq!(
            resolve_value_subject(
                SubjectNameStrategy::RecordName,
                Some("com.acme.Order"),
                "orders"
            ),
            "com.acme.Order-value"
        );
    }

    #[test]
    fn resolve_subject_topic_record_name() {
        assert_eq!(
            resolve_value_subject(
                SubjectNameStrategy::TopicRecordName,
                Some("com.acme.Order"),
                "orders"
            ),
            "orders-com.acme.Order-value"
        );
    }

    #[test]
    fn parse_subject_strategy_from_config() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("schema.registry.url", "http://sr:8081"),
            ("schema.registry.subject.name.strategy", "record-name"),
            ("schema.registry.record.name", "com.acme.Order"),
        ]))
        .unwrap();
        assert_eq!(
            cfg.schema_registry_subject_strategy,
            SubjectNameStrategy::RecordName
        );
        assert_eq!(
            cfg.schema_registry_record_name.as_deref(),
            Some("com.acme.Order")
        );
    }

    #[test]
    fn parse_subject_strategy_rejects_missing_record_name() {
        let err = KafkaSourceConfig::from_config(&make_config(&[
            ("schema.registry.url", "http://sr:8081"),
            ("schema.registry.subject.name.strategy", "record-name"),
        ]))
        .unwrap_err();
        assert!(matches!(err, ConnectorError::ConfigurationError(_)));
    }

    #[test]
    fn parse_discovery_timeout_default_ten_seconds() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("schema.registry.url", "http://sr")]))
                .unwrap();
        assert_eq!(
            cfg.schema_registry_discovery_timeout,
            Duration::from_secs(10)
        );
    }

    #[test]
    fn parse_discovery_timeout_override() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("schema.registry.url", "http://sr"),
            ("schema.registry.discovery.timeout.ms", "25000"),
        ]))
        .unwrap();
        assert_eq!(
            cfg.schema_registry_discovery_timeout,
            Duration::from_secs(25)
        );
    }
}