// laminar_connectors/kafka/config.rs
1//! Kafka source connector configuration.
2//!
3//! Provides [`KafkaSourceConfig`] for configuring the Kafka consumer,
4//! including broker connection, format, Schema Registry, backpressure,
5//! and pass-through `rdkafka` properties.
6
7use std::collections::HashMap;
8use std::time::Duration;
9
10use rdkafka::config::ClientConfig;
11
12use crate::config::ConnectorConfig;
13use crate::error::ConnectorError;
14use crate::serde::Format;
15
/// Kafka security protocol for broker connections.
///
/// Determines encryption (SSL/TLS) and authentication (SASL) requirements.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SecurityProtocol {
    /// Plain-text communication (no encryption, no authentication).
    #[default]
    Plaintext,
    /// SSL/TLS encryption without SASL authentication.
    Ssl,
    /// SASL authentication over plain-text connection.
    SaslPlaintext,
    /// SASL authentication over SSL/TLS encrypted connection.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns the value string rdkafka expects for `security.protocol`.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            Self::Plaintext => "plaintext",
            Self::Ssl => "ssl",
            Self::SaslPlaintext => "sasl_plaintext",
            Self::SaslSsl => "sasl_ssl",
        }
    }

    /// Returns true if this protocol uses SSL/TLS.
    #[must_use]
    pub fn uses_ssl(&self) -> bool {
        // Exhaustive match so adding a variant forces a decision here.
        match self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// Returns true if this protocol uses SASL authentication.
    #[must_use]
    pub fn uses_sasl(&self) -> bool {
        match self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }
}
59
// `FromStr` for `SecurityProtocol` via the project-local `str_enum!` macro.
// The `lowercase` mode presumably normalizes input before matching (confirm
// against the macro definition); unknown values produce a `ConnectorError`
// with the given message prefix. Accepted strings match the rdkafka values
// emitted by `as_rdkafka_str`, so Display -> FromStr round-trips.
str_enum!(fromstr SecurityProtocol, lowercase, ConnectorError, "invalid security.protocol",
    Plaintext => "plaintext";
    Ssl => "ssl";
    SaslPlaintext => "sasl_plaintext";
    SaslSsl => "sasl_ssl"
);
66
67impl std::fmt::Display for SecurityProtocol {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        write!(f, "{}", self.as_rdkafka_str())
70    }
71}
72
/// SASL authentication mechanism for Kafka.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SaslMechanism {
    /// PLAIN: Simple username/password authentication.
    #[default]
    Plain,
    /// SCRAM-SHA-256: Salted Challenge Response Authentication Mechanism.
    ScramSha256,
    /// SCRAM-SHA-512: Salted Challenge Response Authentication Mechanism (stronger).
    ScramSha512,
    /// GSSAPI: Kerberos authentication.
    Gssapi,
    /// OAUTHBEARER: OAuth 2.0 bearer token authentication.
    Oauthbearer,
}

impl SaslMechanism {
    /// Returns the value string rdkafka expects for `sasl.mechanism`.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            Self::Plain => "PLAIN",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::Gssapi => "GSSAPI",
            Self::Oauthbearer => "OAUTHBEARER",
        }
    }

    /// Returns true if this mechanism requires username/password.
    #[must_use]
    pub fn requires_credentials(&self) -> bool {
        // Exhaustive match: GSSAPI/OAUTHBEARER authenticate via Kerberos
        // tickets / bearer tokens rather than sasl.username + sasl.password.
        match self {
            Self::Plain | Self::ScramSha256 | Self::ScramSha512 => true,
            Self::Gssapi | Self::Oauthbearer => false,
        }
    }
}
111
// `FromStr` for `SaslMechanism` via the project `str_enum!` macro (`uppercase`
// normalization mode). Includes the aliases KERBEROS (-> Gssapi) and
// OAUTH (-> Oauthbearer).
// NOTE(review): `Display`/`as_rdkafka_str` emit hyphenated forms
// ("SCRAM-SHA-256") but the accepted strings here use underscores — confirm
// the macro's `uppercase` mode also maps '-' to '_'; otherwise
// `to_string()` -> `parse()` round-trips fail for the SCRAM mechanisms.
str_enum!(fromstr SaslMechanism, uppercase, ConnectorError, "invalid sasl.mechanism",
    Plain => "PLAIN";
    ScramSha256 => "SCRAM_SHA_256", "SCRAM_SHA256";
    ScramSha512 => "SCRAM_SHA_512", "SCRAM_SHA512";
    Gssapi => "GSSAPI", "KERBEROS";
    Oauthbearer => "OAUTHBEARER", "OAUTH"
);
119
120impl std::fmt::Display for SaslMechanism {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        write!(f, "{}", self.as_rdkafka_str())
123    }
124}
125
/// Consumer isolation level for reading transactional messages.
///
/// Controls whether to read uncommitted messages from transactional producers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Read all messages including uncommitted transactional messages.
    ReadUncommitted,
    /// Only read committed messages (recommended for transactional pipelines).
    #[default]
    ReadCommitted,
}

impl IsolationLevel {
    /// Returns the value string rdkafka expects for `isolation.level`.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            Self::ReadUncommitted => "read_uncommitted",
            Self::ReadCommitted => "read_committed",
        }
    }
}
148
// `FromStr` for `IsolationLevel` via the project `str_enum!` macro
// (`lowercase` normalization mode — presumably lowercases input before
// matching; confirm against the macro definition). Accepted strings match
// `as_rdkafka_str`, so Display -> FromStr round-trips.
str_enum!(fromstr IsolationLevel, lowercase, ConnectorError, "invalid isolation.level",
    ReadUncommitted => "read_uncommitted";
    ReadCommitted => "read_committed"
);
153
154impl std::fmt::Display for IsolationLevel {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        write!(f, "{}", self.as_rdkafka_str())
157    }
158}
159
/// Consumer startup mode controlling where to begin consuming.
///
/// This is a higher-level abstraction than `auto.offset.reset` that provides
/// more control over initial positioning, including timestamp-based and
/// partition-specific offset assignment.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum StartupMode {
    /// Use committed group offsets, fall back to `auto.offset.reset` if none exist.
    #[default]
    GroupOffsets,
    /// Start from the earliest available offset in each partition.
    Earliest,
    /// Start from the latest offset in each partition (only new messages).
    Latest,
    /// Start from specific offsets per partition (`partition_id` -> offset).
    SpecificOffsets(HashMap<i32, i64>),
    /// Start from a specific timestamp (milliseconds since epoch).
    /// The consumer seeks to the first message with timestamp >= this value.
    Timestamp(i64),
}

impl StartupMode {
    /// Returns true if this mode overrides auto.offset.reset behavior.
    ///
    /// Every mode except `GroupOffsets` dictates its own starting position.
    #[must_use]
    pub fn overrides_offset_reset(&self) -> bool {
        !matches!(self, Self::GroupOffsets)
    }

    /// Returns the equivalent auto.offset.reset value, if applicable.
    ///
    /// `None` for modes that either defer to committed offsets or position
    /// the consumer explicitly (specific offsets / timestamp).
    #[must_use]
    pub fn as_offset_reset(&self) -> Option<&'static str> {
        match self {
            Self::Earliest => Some("earliest"),
            Self::Latest => Some("latest"),
            Self::GroupOffsets | Self::SpecificOffsets(_) | Self::Timestamp(_) => None,
        }
    }
}
200
201impl std::str::FromStr for StartupMode {
202    type Err = ConnectorError;
203
204    fn from_str(s: &str) -> Result<Self, Self::Err> {
205        match s.to_lowercase().replace('-', "_").as_str() {
206            "group_offsets" | "group" => Ok(StartupMode::GroupOffsets),
207            "earliest" => Ok(StartupMode::Earliest),
208            "latest" => Ok(StartupMode::Latest),
209            other => Err(ConnectorError::ConfigurationError(format!(
210                "invalid startup.mode: '{other}' (expected group-offsets/earliest/latest)"
211            ))),
212        }
213    }
214}
215
216impl std::fmt::Display for StartupMode {
217    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218        match self {
219            StartupMode::GroupOffsets => write!(f, "group-offsets"),
220            StartupMode::Earliest => write!(f, "earliest"),
221            StartupMode::Latest => write!(f, "latest"),
222            StartupMode::SpecificOffsets(offsets) => {
223                write!(f, "specific-offsets({} partitions)", offsets.len())
224            }
225            StartupMode::Timestamp(ts) => write!(f, "timestamp({ts})"),
226        }
227    }
228}
229
/// Topic subscription mode: explicit list or regex pattern.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicSubscription {
    /// Subscribe to a specific list of topic names.
    Topics(Vec<String>),
    /// Subscribe to topics matching a regex pattern (e.g., `events-.*`).
    Pattern(String),
}

impl TopicSubscription {
    /// Returns the topic names if this is a `Topics` subscription.
    #[must_use]
    pub fn topics(&self) -> Option<&[String]> {
        if let TopicSubscription::Topics(names) = self {
            Some(names.as_slice())
        } else {
            None
        }
    }

    /// Returns the pattern if this is a `Pattern` subscription.
    #[must_use]
    pub fn pattern(&self) -> Option<&str> {
        if let TopicSubscription::Pattern(regex) = self {
            Some(regex.as_str())
        } else {
            None
        }
    }

    /// Returns true if this is a pattern-based subscription.
    #[must_use]
    pub fn is_pattern(&self) -> bool {
        self.pattern().is_some()
    }
}

impl Default for TopicSubscription {
    /// Defaults to an empty explicit topic list; `validate()` rejects the
    /// config until at least one topic (or a pattern) is supplied.
    fn default() -> Self {
        TopicSubscription::Topics(Vec::new())
    }
}
270
/// Auto-offset reset policy for new consumer groups.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetReset {
    /// Start from the earliest available offset.
    Earliest,
    /// Start from the latest offset (only new messages).
    Latest,
    /// Fail if no committed offset exists.
    None,
}

impl OffsetReset {
    /// Returns the value string rdkafka expects for `auto.offset.reset`.
    ///
    /// `None` maps to rdkafka's `"error"` value, which surfaces a consumer
    /// error when no committed offset exists.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            Self::Earliest => "earliest",
            Self::Latest => "latest",
            Self::None => "error",
        }
    }
}
293
// `FromStr` for `OffsetReset` via the project `str_enum!` macro. The
// `lowercase_nodash` mode presumably also strips/normalizes '-' — confirm
// against the macro definition. Accepts the Kafka CLI aliases
// "beginning"/"end", and "error" as an alias for `None` (matching the
// rdkafka value emitted by `as_rdkafka_str`).
str_enum!(fromstr OffsetReset, lowercase_nodash, ConnectorError, "invalid auto.offset.reset",
    Earliest => "earliest", "beginning";
    Latest => "latest", "end";
    None => "none", "error"
);
299
/// Kafka partition assignment strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentStrategy {
    /// Range assignment (default).
    Range,
    /// Round-robin assignment.
    RoundRobin,
    /// Cooperative sticky assignment.
    CooperativeSticky,
}

impl AssignmentStrategy {
    /// Returns the value string rdkafka expects for
    /// `partition.assignment.strategy`.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match self {
            Self::Range => "range",
            Self::RoundRobin => "roundrobin",
            Self::CooperativeSticky => "cooperative-sticky",
        }
    }
}
322
// `FromStr` for `AssignmentStrategy` via the project `str_enum!` macro
// (`lowercase_nodash` normalization). Hyphen and underscore spellings are
// listed explicitly for round-robin and cooperative-sticky.
str_enum!(fromstr AssignmentStrategy, lowercase_nodash, ConnectorError,
    "invalid partition.assignment.strategy",
    Range => "range";
    RoundRobin => "roundrobin", "round-robin", "round_robin";
    CooperativeSticky => "cooperative-sticky", "cooperative_sticky"
);
329
/// Schema Registry compatibility level.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityLevel {
    /// New schema can read old data.
    Backward,
    /// Backward compatible with all prior versions.
    BackwardTransitive,
    /// Old schema can read new data.
    Forward,
    /// Forward compatible with all prior versions.
    ForwardTransitive,
    /// Both backward and forward compatible.
    Full,
    /// Full compatible with all prior versions.
    FullTransitive,
    /// No compatibility checking.
    None,
}

impl CompatibilityLevel {
    /// Returns the string used by the Schema Registry REST API.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Backward => "BACKWARD",
            Self::BackwardTransitive => "BACKWARD_TRANSITIVE",
            Self::Forward => "FORWARD",
            Self::ForwardTransitive => "FORWARD_TRANSITIVE",
            Self::Full => "FULL",
            Self::FullTransitive => "FULL_TRANSITIVE",
            Self::None => "NONE",
        }
    }
}
364
// `FromStr` for `CompatibilityLevel` via the project `str_enum!` macro
// (`uppercase` normalization). Accepted strings mirror the Schema Registry
// API values returned by `as_str`, so Display -> FromStr round-trips.
str_enum!(fromstr CompatibilityLevel, uppercase, ConnectorError, "invalid schema.compatibility",
    Backward => "BACKWARD";
    BackwardTransitive => "BACKWARD_TRANSITIVE";
    Forward => "FORWARD";
    ForwardTransitive => "FORWARD_TRANSITIVE";
    Full => "FULL";
    FullTransitive => "FULL_TRANSITIVE";
    None => "NONE"
);
374
375impl std::fmt::Display for CompatibilityLevel {
376    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377        write!(f, "{}", self.as_str())
378    }
379}
380
/// Schema Registry authentication credentials.
///
/// `Debug` is implemented by hand so the password is always redacted.
#[derive(Clone)]
pub struct SrAuth {
    /// Basic auth username.
    pub username: String,
    /// Basic auth password.
    pub password: String,
}

impl std::fmt::Debug for SrAuth {
    /// Renders the username but replaces the password with `"***"` so
    /// credentials never leak into logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("SrAuth");
        dbg.field("username", &self.username);
        dbg.field("password", &"***");
        dbg.finish()
    }
}
398
/// Kafka source connector configuration.
///
/// Uses a custom `Debug` impl that redacts `sasl_password` and
/// `ssl_key_password` to prevent credential leakage in logs.
///
/// Construct via [`KafkaSourceConfig::from_config`] (which validates) or via
/// `Default` plus field assignment followed by an explicit `validate()` call.
#[derive(Clone)]
pub struct KafkaSourceConfig {
    // -- Required --
    /// Comma-separated list of broker addresses.
    /// Must be non-empty (enforced by `validate()`).
    pub bootstrap_servers: String,
    /// Consumer group identifier. Must be non-empty (enforced by `validate()`).
    pub group_id: String,
    /// Topic subscription (explicit list or regex pattern).
    /// An empty list/pattern is rejected by `validate()`.
    pub subscription: TopicSubscription,

    // -- Security --
    /// Security protocol for broker connections.
    pub security_protocol: SecurityProtocol,
    /// SASL authentication mechanism.
    /// Required by `validate()` when `security_protocol` uses SASL.
    pub sasl_mechanism: Option<SaslMechanism>,
    /// SASL username (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
    /// Required by `validate()` when the mechanism needs credentials.
    pub sasl_username: Option<String>,
    /// SASL password (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
    /// Redacted in `Debug` output.
    pub sasl_password: Option<String>,
    /// Path to SSL CA certificate file (PEM format).
    pub ssl_ca_location: Option<String>,
    /// Path to client SSL certificate file (PEM format).
    pub ssl_certificate_location: Option<String>,
    /// Path to client SSL private key file (PEM format).
    pub ssl_key_location: Option<String>,
    /// Password for encrypted SSL private key. Redacted in `Debug` output.
    pub ssl_key_password: Option<String>,

    // -- Format & Schema --
    /// Data format for deserialization.
    pub format: Format,
    /// Confluent Schema Registry URL.
    pub schema_registry_url: Option<String>,
    /// Schema Registry authentication credentials.
    pub schema_registry_auth: Option<SrAuth>,
    /// Override compatibility level for the subject.
    pub schema_compatibility: Option<CompatibilityLevel>,
    /// Schema Registry SSL CA certificate path.
    pub schema_registry_ssl_ca_location: Option<String>,
    /// Schema Registry SSL client certificate path.
    pub schema_registry_ssl_certificate_location: Option<String>,
    /// Schema Registry SSL client key path.
    pub schema_registry_ssl_key_location: Option<String>,
    /// Column name containing the event timestamp.
    pub event_time_column: Option<String>,
    /// Whether to include Kafka metadata columns (_partition, _offset, _timestamp).
    pub include_metadata: bool,
    /// Whether to include Kafka headers as a map column (_headers).
    pub include_headers: bool,

    // -- Consumer tuning --
    /// Consumer startup mode (controls initial offset positioning).
    pub startup_mode: StartupMode,
    /// Where to start reading when no committed offset exists.
    pub auto_offset_reset: OffsetReset,
    /// Consumer transaction isolation level.
    pub isolation_level: IsolationLevel,
    /// Maximum records per poll batch. Must be > 0 (enforced by `validate()`).
    pub max_poll_records: usize,
    /// Partition assignment strategy.
    pub partition_assignment_strategy: AssignmentStrategy,
    /// Minimum bytes to return from a fetch (allows batching).
    pub fetch_min_bytes: Option<i32>,
    /// Maximum bytes to return from broker per request.
    pub fetch_max_bytes: Option<i32>,
    /// Maximum time broker waits for fetch.min.bytes.
    pub fetch_max_wait_ms: Option<i32>,
    /// Maximum bytes per partition to return from broker.
    pub max_partition_fetch_bytes: Option<i32>,

    // -- Watermark --
    /// Maximum expected out-of-orderness for watermark generation.
    pub max_out_of_orderness: Duration,
    /// Timeout before marking a partition as idle.
    pub idle_timeout: Duration,
    /// Enable per-partition watermark tracking (integrates with watermark tracking).
    pub enable_watermark_tracking: bool,
    /// Alignment group ID for multi-source coordination (integrates with watermark tracking).
    pub alignment_group_id: Option<String>,
    /// Maximum allowed drift between sources in alignment group.
    pub alignment_max_drift: Option<Duration>,
    /// Enforcement mode for watermark alignment.
    pub alignment_mode: Option<super::watermarks::KafkaAlignmentMode>,

    // -- Broker commit --
    /// Interval at which to asynchronously commit offsets to the Kafka broker.
    ///
    /// This is advisory — it keeps `kafka-consumer-groups` lag monitoring
    /// accurate. The authoritative offset state lives in `LaminarDB`'s
    /// checkpoint system. Set to `Duration::ZERO` to disable periodic
    /// broker commits (default: 60s).
    pub broker_commit_interval: Duration,

    // -- Backpressure --
    /// Channel fill ratio at which to pause consumption.
    /// Must be in [0.0, 1.0] and strictly greater than the low watermark
    /// (enforced by `validate()`).
    pub backpressure_high_watermark: f64,
    /// Channel fill ratio at which to resume consumption.
    /// Must be in [0.0, 1.0] (enforced by `validate()`).
    pub backpressure_low_watermark: f64,

    // -- Pass-through --
    /// Additional rdkafka properties passed directly to librdkafka.
    /// Populated by `from_config` from keys prefixed with `kafka.`.
    pub kafka_properties: HashMap<String, String>,
}
506
507impl std::fmt::Debug for KafkaSourceConfig {
508    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
509        f.debug_struct("KafkaSourceConfig")
510            .field("bootstrap_servers", &self.bootstrap_servers)
511            .field("group_id", &self.group_id)
512            .field("subscription", &self.subscription)
513            .field("format", &self.format)
514            .field("security_protocol", &self.security_protocol)
515            .field("sasl_mechanism", &self.sasl_mechanism)
516            .field("sasl_username", &self.sasl_username)
517            .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
518            .field(
519                "ssl_key_password",
520                &self.ssl_key_password.as_ref().map(|_| "***"),
521            )
522            .field("max_poll_records", &self.max_poll_records)
523            .finish_non_exhaustive()
524    }
525}
526
impl Default for KafkaSourceConfig {
    /// Returns a configuration with empty connection fields (which fail
    /// `validate()` until set) and the same fallback values that
    /// `from_config` uses for absent keys.
    fn default() -> Self {
        Self {
            // Required fields start empty; callers must fill them in.
            bootstrap_servers: String::new(),
            group_id: String::new(),
            subscription: TopicSubscription::default(),
            // Security: plaintext, no SASL/SSL material.
            security_protocol: SecurityProtocol::default(),
            sasl_mechanism: None,
            sasl_username: None,
            sasl_password: None,
            ssl_ca_location: None,
            ssl_certificate_location: None,
            ssl_key_location: None,
            ssl_key_password: None,
            // Format & schema: JSON, no Schema Registry.
            format: Format::Json,
            schema_registry_url: None,
            schema_registry_auth: None,
            schema_compatibility: None,
            schema_registry_ssl_ca_location: None,
            schema_registry_ssl_certificate_location: None,
            schema_registry_ssl_key_location: None,
            event_time_column: None,
            include_metadata: false,
            include_headers: false,
            // Consumer tuning: group offsets, earliest on reset,
            // read-committed, 1000 records/poll, range assignment.
            startup_mode: StartupMode::default(),
            auto_offset_reset: OffsetReset::Earliest,
            isolation_level: IsolationLevel::default(),
            max_poll_records: 1000,
            partition_assignment_strategy: AssignmentStrategy::Range,
            fetch_min_bytes: None,
            fetch_max_bytes: None,
            fetch_max_wait_ms: None,
            max_partition_fetch_bytes: None,
            // Watermarks: 5s out-of-orderness, 30s idle, tracking off.
            max_out_of_orderness: Duration::from_secs(5),
            idle_timeout: Duration::from_secs(30),
            enable_watermark_tracking: false,
            alignment_group_id: None,
            alignment_max_drift: None,
            alignment_mode: None,
            // Advisory broker commit every 60s (ZERO disables).
            broker_commit_interval: Duration::from_secs(60),
            // Pause at 80% channel fill, resume at 50%.
            backpressure_high_watermark: 0.8,
            backpressure_low_watermark: 0.5,
            kafka_properties: HashMap::new(),
        }
    }
}
573
574impl KafkaSourceConfig {
575    /// Parses a [`KafkaSourceConfig`] from a [`ConnectorConfig`].
576    ///
577    /// # Errors
578    ///
579    /// Returns `ConnectorError` if required fields are missing or values are invalid.
580    #[allow(deprecated, clippy::too_many_lines)]
581    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
582        let bootstrap_servers = config.require("bootstrap.servers")?.to_string();
583        let group_id = config.require("group.id")?.to_string();
584
585        let subscription = if let Some(pattern) = config.get("topic.pattern") {
586            TopicSubscription::Pattern(pattern.to_string())
587        } else {
588            let topics_str = config.require("topic")?;
589            let topics: Vec<String> = topics_str
590                .split(',')
591                .map(|s| s.trim().to_string())
592                .collect();
593            TopicSubscription::Topics(topics.clone())
594        };
595
596        let security_protocol = match config.get("security.protocol") {
597            Some(s) => s.parse::<SecurityProtocol>()?,
598            None => SecurityProtocol::default(),
599        };
600
601        let sasl_mechanism = match config.get("sasl.mechanism") {
602            Some(s) => Some(s.parse::<SaslMechanism>()?),
603            None => None,
604        };
605
606        let sasl_username = config.get("sasl.username").map(String::from);
607        let sasl_password = config.get("sasl.password").map(String::from);
608        let ssl_ca_location = config.get("ssl.ca.location").map(String::from);
609        let ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
610        let ssl_key_location = config.get("ssl.key.location").map(String::from);
611        let ssl_key_password = config.get("ssl.key.password").map(String::from);
612
613        let format = match config.get("format") {
614            Some(f) => f
615                .parse::<Format>()
616                .map_err(|e| ConnectorError::ConfigurationError(e.to_string()))?,
617            None => Format::Json,
618        };
619
620        let schema_registry_url = config.get("schema.registry.url").map(String::from);
621
622        let schema_registry_auth = match (
623            config.get("schema.registry.username"),
624            config.get("schema.registry.password"),
625        ) {
626            (Some(u), Some(p)) => Some(SrAuth {
627                username: u.to_string(),
628                password: p.to_string(),
629            }),
630            (Some(_), None) | (None, Some(_)) => {
631                return Err(ConnectorError::ConfigurationError(
632                    "schema.registry.username and schema.registry.password must both be set"
633                        .to_string(),
634                ));
635            }
636            (None, None) => None,
637        };
638
639        let schema_compatibility = match config.get("schema.compatibility") {
640            Some(s) => Some(s.parse::<CompatibilityLevel>()?),
641            None => None,
642        };
643
644        let schema_registry_ssl_ca_location = config
645            .get("schema.registry.ssl.ca.location")
646            .map(String::from);
647        let schema_registry_ssl_certificate_location = config
648            .get("schema.registry.ssl.certificate.location")
649            .map(String::from);
650        let schema_registry_ssl_key_location = config
651            .get("schema.registry.ssl.key.location")
652            .map(String::from);
653
654        let event_time_column = config.get("event.time.column").map(String::from);
655
656        let include_metadata = config
657            .get_parsed::<bool>("include.metadata")?
658            .unwrap_or(false);
659
660        let include_headers = config
661            .get_parsed::<bool>("include.headers")?
662            .unwrap_or(false);
663
664        let startup_mode = if let Some(offsets_str) = config.get("startup.specific.offsets") {
665            let offsets = parse_specific_offsets(offsets_str)?;
666            StartupMode::SpecificOffsets(offsets)
667        } else if let Some(ts_str) = config.get("startup.timestamp.ms") {
668            let ts: i64 = ts_str.parse().map_err(|_| {
669                ConnectorError::ConfigurationError(format!(
670                    "invalid startup.timestamp.ms: '{ts_str}'"
671                ))
672            })?;
673            StartupMode::Timestamp(ts)
674        } else {
675            match config.get("startup.mode") {
676                Some(s) => s.parse::<StartupMode>()?,
677                None => StartupMode::default(),
678            }
679        };
680
681        let auto_offset_reset = match config.get("auto.offset.reset") {
682            Some(s) => s.parse::<OffsetReset>()?,
683            None => OffsetReset::Earliest,
684        };
685
686        let isolation_level = match config.get("isolation.level") {
687            Some(s) => s.parse::<IsolationLevel>()?,
688            None => IsolationLevel::default(),
689        };
690
691        let max_poll_records = config
692            .get_parsed::<usize>("max.poll.records")?
693            .unwrap_or(1000);
694
695        let partition_assignment_strategy = match config.get("partition.assignment.strategy") {
696            Some(s) => s.parse::<AssignmentStrategy>()?,
697            None => AssignmentStrategy::Range,
698        };
699
700        let fetch_min_bytes = config.get_parsed::<i32>("fetch.min.bytes")?;
701        let fetch_max_bytes = config.get_parsed::<i32>("fetch.max.bytes")?;
702        let fetch_max_wait_ms = config.get_parsed::<i32>("fetch.max.wait.ms")?;
703        let max_partition_fetch_bytes = config.get_parsed::<i32>("max.partition.fetch.bytes")?;
704
705        let max_out_of_orderness_ms = config
706            .get_parsed::<u64>("max.out.of.orderness.ms")?
707            .unwrap_or(5000);
708
709        let idle_timeout_ms = config
710            .get_parsed::<u64>("idle.timeout.ms")?
711            .unwrap_or(30_000);
712
713        let enable_watermark_tracking = config
714            .get_parsed::<bool>("enable.watermark.tracking")?
715            .unwrap_or(false);
716
717        let alignment_group_id = config.get("alignment.group.id").map(String::from);
718
719        let alignment_max_drift_ms = config.get_parsed::<u64>("alignment.max.drift.ms")?;
720
721        let alignment_mode = match config.get("alignment.mode") {
722            Some(s) => Some(
723                s.parse::<super::watermarks::KafkaAlignmentMode>()
724                    .map_err(ConnectorError::ConfigurationError)?,
725            ),
726            None => None,
727        };
728
729        let broker_commit_interval_ms = config
730            .get_parsed::<u64>("broker.commit.interval.ms")?
731            .unwrap_or(60_000);
732
733        let backpressure_high_watermark = config
734            .get_parsed::<f64>("backpressure.high.watermark")?
735            .unwrap_or(0.8);
736
737        let backpressure_low_watermark = config
738            .get_parsed::<f64>("backpressure.low.watermark")?
739            .unwrap_or(0.5);
740
741        let kafka_properties = config.properties_with_prefix("kafka.");
742
743        let cfg = Self {
744            bootstrap_servers,
745            group_id,
746            subscription,
747            security_protocol,
748            sasl_mechanism,
749            sasl_username,
750            sasl_password,
751            ssl_ca_location,
752            ssl_certificate_location,
753            ssl_key_location,
754            ssl_key_password,
755            format,
756            schema_registry_url,
757            schema_registry_auth,
758            schema_compatibility,
759            schema_registry_ssl_ca_location,
760            schema_registry_ssl_certificate_location,
761            schema_registry_ssl_key_location,
762            event_time_column,
763            include_metadata,
764            include_headers,
765            startup_mode,
766            auto_offset_reset,
767            isolation_level,
768            max_poll_records,
769            partition_assignment_strategy,
770            fetch_min_bytes,
771            fetch_max_bytes,
772            fetch_max_wait_ms,
773            max_partition_fetch_bytes,
774            max_out_of_orderness: Duration::from_millis(max_out_of_orderness_ms),
775            idle_timeout: Duration::from_millis(idle_timeout_ms),
776            enable_watermark_tracking,
777            alignment_group_id,
778            alignment_max_drift: alignment_max_drift_ms.map(Duration::from_millis),
779            alignment_mode,
780            broker_commit_interval: Duration::from_millis(broker_commit_interval_ms),
781            backpressure_high_watermark,
782            backpressure_low_watermark,
783            kafka_properties,
784        };
785
786        cfg.validate()?;
787        Ok(cfg)
788    }
789
790    /// Validates the configuration.
791    ///
792    /// # Errors
793    ///
794    /// Returns `ConnectorError::ConfigurationError` if the configuration is invalid.
795    pub fn validate(&self) -> Result<(), ConnectorError> {
796        if self.bootstrap_servers.is_empty() {
797            return Err(ConnectorError::ConfigurationError(
798                "bootstrap.servers cannot be empty".into(),
799            ));
800        }
801        if self.group_id.is_empty() {
802            return Err(ConnectorError::ConfigurationError(
803                "group.id cannot be empty".into(),
804            ));
805        }
806
807        // Validate topic subscription
808        match &self.subscription {
809            TopicSubscription::Topics(t) if t.is_empty() => {
810                return Err(ConnectorError::ConfigurationError(
811                    "at least one topic is required (or use topic.pattern)".into(),
812                ));
813            }
814            TopicSubscription::Pattern(p) if p.is_empty() => {
815                return Err(ConnectorError::ConfigurationError(
816                    "topic.pattern cannot be empty".into(),
817                ));
818            }
819            _ => {}
820        }
821
822        if self.max_poll_records == 0 {
823            return Err(ConnectorError::ConfigurationError(
824                "max.poll.records must be > 0".into(),
825            ));
826        }
827
828        if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
829            return Err(ConnectorError::ConfigurationError(
830                "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
831                    .into(),
832            ));
833        }
834
835        if let Some(mechanism) = &self.sasl_mechanism {
836            if mechanism.requires_credentials()
837                && (self.sasl_username.is_none() || self.sasl_password.is_none())
838            {
839                return Err(ConnectorError::ConfigurationError(format!(
840                    "sasl.username and sasl.password are required for {mechanism} mechanism"
841                )));
842            }
843        }
844
845        if self.security_protocol.uses_ssl() {
846            if let Some(ref ca) = self.ssl_ca_location {
847                if ca.is_empty() {
848                    return Err(ConnectorError::ConfigurationError(
849                        "ssl.ca.location cannot be empty when specified".into(),
850                    ));
851                }
852            }
853        }
854
855        if self.backpressure_high_watermark <= self.backpressure_low_watermark {
856            return Err(ConnectorError::ConfigurationError(
857                "backpressure.high.watermark must be > backpressure.low.watermark".into(),
858            ));
859        }
860        if !(0.0..=1.0).contains(&self.backpressure_high_watermark) {
861            return Err(ConnectorError::ConfigurationError(
862                "backpressure.high.watermark must be between 0.0 and 1.0".into(),
863            ));
864        }
865        if !(0.0..=1.0).contains(&self.backpressure_low_watermark) {
866            return Err(ConnectorError::ConfigurationError(
867                "backpressure.low.watermark must be between 0.0 and 1.0".into(),
868            ));
869        }
870
871        if self.format == Format::Avro && self.schema_registry_url.is_none() {
872            return Err(ConnectorError::ConfigurationError(
873                "schema.registry.url is required when format is 'avro'".into(),
874            ));
875        }
876
877        Ok(())
878    }
879
880    /// Builds an rdkafka [`ClientConfig`] from this configuration.
881    #[must_use]
882    pub fn to_rdkafka_config(&self) -> ClientConfig {
883        let mut config = ClientConfig::new();
884
885        config.set("bootstrap.servers", &self.bootstrap_servers);
886        config.set("group.id", &self.group_id);
887        config.set("enable.auto.commit", "false");
888        config.set("auto.offset.reset", self.auto_offset_reset.as_rdkafka_str());
889        config.set(
890            "partition.assignment.strategy",
891            self.partition_assignment_strategy.as_rdkafka_str(),
892        );
893        config.set("security.protocol", self.security_protocol.as_rdkafka_str());
894
895        if let Some(ref mechanism) = self.sasl_mechanism {
896            config.set("sasl.mechanism", mechanism.as_rdkafka_str());
897        }
898        if let Some(ref username) = self.sasl_username {
899            config.set("sasl.username", username);
900        }
901        if let Some(ref password) = self.sasl_password {
902            config.set("sasl.password", password);
903        }
904        if let Some(ref ca) = self.ssl_ca_location {
905            config.set("ssl.ca.location", ca);
906        }
907        if let Some(ref cert) = self.ssl_certificate_location {
908            config.set("ssl.certificate.location", cert);
909        }
910        if let Some(ref key) = self.ssl_key_location {
911            config.set("ssl.key.location", key);
912        }
913        if let Some(ref key_pass) = self.ssl_key_password {
914            config.set("ssl.key.password", key_pass);
915        }
916
917        config.set("isolation.level", self.isolation_level.as_rdkafka_str());
918
919        if let Some(fetch_min) = self.fetch_min_bytes {
920            config.set("fetch.min.bytes", fetch_min.to_string());
921        }
922
923        if let Some(fetch_max) = self.fetch_max_bytes {
924            config.set("fetch.max.bytes", fetch_max.to_string());
925        }
926
927        if let Some(wait_ms) = self.fetch_max_wait_ms {
928            config.set("fetch.wait.max.ms", wait_ms.to_string());
929        }
930
931        if let Some(partition_max) = self.max_partition_fetch_bytes {
932            config.set("max.partition.fetch.bytes", partition_max.to_string());
933        }
934
935        // Apply pass-through properties, blocking security-critical keys
936        // that could silently downgrade authentication or break semantics.
937        for (key, value) in &self.kafka_properties {
938            if is_blocked_passthrough_key(key) {
939                tracing::warn!(
940                    key,
941                    "ignoring kafka.* pass-through property that overrides a security setting"
942                );
943                continue;
944            }
945            config.set(key, value);
946        }
947
948        config
949    }
950}
951
/// Returns `true` if a pass-through kafka.* key must not override explicit settings.
///
/// Blocked keys fall into two groups:
/// - security material (`security.protocol`, SASL mechanism/credentials, SSL
///   trust and key locations), where an override could silently downgrade
///   authentication or encryption;
/// - offset/commit semantics (`enable.auto.commit`, `enable.idempotence`),
///   which the connector manages itself.
fn is_blocked_passthrough_key(key: &str) -> bool {
    // Kerberos/GSSAPI has an open-ended `sasl.kerberos.*` property family,
    // so block the whole prefix instead of enumerating members.
    key.starts_with("sasl.kerberos.")
        || matches!(
            key,
            "security.protocol"
                // librdkafka's canonical property is the plural `sasl.mechanisms`;
                // `sasl.mechanism` is only an alias, so both must be blocked.
                | "sasl.mechanism"
                | "sasl.mechanisms"
                | "sasl.username"
                | "sasl.password"
                | "sasl.oauthbearer.config"
                // Both file-location and inline-PEM forms of TLS material.
                | "ssl.ca.location"
                | "ssl.ca.pem"
                | "ssl.certificate.location"
                | "ssl.certificate.pem"
                | "ssl.key.location"
                | "ssl.key.pem"
                | "ssl.key.password"
                | "ssl.keystore.location"
                | "ssl.keystore.password"
                | "ssl.endpoint.identification.algorithm"
                | "enable.auto.commit"
                | "enable.idempotence"
        )
}
971
972/// Parses a specific offsets string in the format "partition:offset,partition:offset,...".
973///
974/// Example: "0:100,1:200,2:300" maps partition 0 to offset 100, partition 1 to offset 200, etc.
975fn parse_specific_offsets(s: &str) -> Result<HashMap<i32, i64>, ConnectorError> {
976    let mut offsets = HashMap::new();
977
978    for pair in s.split(',') {
979        let pair = pair.trim();
980        if pair.is_empty() {
981            continue;
982        }
983
984        let parts: Vec<&str> = pair.split(':').collect();
985        if parts.len() != 2 {
986            return Err(ConnectorError::ConfigurationError(format!(
987                "invalid offset format '{pair}' (expected 'partition:offset')"
988            )));
989        }
990
991        let partition: i32 = parts[0].trim().parse().map_err(|_| {
992            ConnectorError::ConfigurationError(format!(
993                "invalid partition number '{}' in '{pair}'",
994                parts[0]
995            ))
996        })?;
997
998        let offset: i64 = parts[1].trim().parse().map_err(|_| {
999            ConnectorError::ConfigurationError(format!("invalid offset '{}' in '{pair}'", parts[1]))
1000        })?;
1001
1002        offsets.insert(partition, offset);
1003    }
1004
1005    if offsets.is_empty() {
1006        return Err(ConnectorError::ConfigurationError(
1007            "startup.specific.offsets cannot be empty".into(),
1008        ));
1009    }
1010
1011    Ok(offsets)
1012}
1013
1014#[cfg(test)]
1015mod tests {
1016    use super::*;
1017
1018    fn make_config(extra: &[(&str, &str)]) -> ConnectorConfig {
1019        let mut config = ConnectorConfig::new("kafka");
1020        config.set("bootstrap.servers", "localhost:9092");
1021        config.set("group.id", "test-group");
1022        config.set("topic", "events");
1023        for (k, v) in extra {
1024            config.set(*k, *v);
1025        }
1026        config
1027    }
1028
    // Minimal config parses, and the single `topic` value becomes a
    // one-element Topics subscription.
    #[test]
    #[allow(deprecated)]
    fn test_parse_required_fields() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
        assert_eq!(cfg.bootstrap_servers, "localhost:9092");
        assert_eq!(cfg.group_id, "test-group");
        assert_eq!(cfg.subscription.topics().unwrap(), &["events"]);
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Topics(ref t) if t == &["events"]
        ));
    }

    // An empty connector config is rejected (required keys missing).
    #[test]
    fn test_parse_missing_required() {
        let config = ConnectorConfig::new("kafka");
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }

    // A comma-separated `topic` value is split and trimmed into multiple topics.
    #[test]
    #[allow(deprecated)]
    fn test_parse_multi_topic() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[("topic", "a, b, c")])).unwrap();
        assert_eq!(cfg.subscription.topics().unwrap(), &["a", "b", "c"]);
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Topics(ref t) if t == &["a", "b", "c"]
        ));
    }

    // `topic.pattern` (without `topic`) yields a Pattern subscription and the
    // accessors reflect that.
    #[test]
    fn test_parse_topic_pattern() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("group.id", "test-group");
        config.set("topic.pattern", "events-.*");

        let cfg = KafkaSourceConfig::from_config(&config).unwrap();
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Pattern(ref p) if p == "events-.*"
        ));
        assert!(cfg.subscription.is_pattern());
        assert_eq!(cfg.subscription.pattern(), Some("events-.*"));
        assert!(cfg.subscription.topics().is_none());
    }

    // Pins every documented default so a default change is a conscious one.
    #[test]
    fn test_parse_defaults() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
        assert_eq!(cfg.format, Format::Json);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
        assert_eq!(cfg.isolation_level, IsolationLevel::ReadCommitted);
        assert_eq!(cfg.max_poll_records, 1000);
        assert_eq!(cfg.partition_assignment_strategy, AssignmentStrategy::Range);
        assert!(!cfg.include_metadata);
        assert!(!cfg.include_headers);
        assert!(cfg.schema_registry_url.is_none());
        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
        assert!(cfg.sasl_mechanism.is_none());
        assert_eq!(cfg.broker_commit_interval, Duration::from_secs(60));
    }

    // `broker.commit.interval.ms` is parsed as milliseconds.
    #[test]
    fn test_parse_broker_commit_interval() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("broker.commit.interval.ms", "5000")]))
                .unwrap();
        assert_eq!(cfg.broker_commit_interval, Duration::from_millis(5000));
    }

    // A zero interval is accepted (zero duration; used to disable commits).
    #[test]
    fn test_parse_broker_commit_interval_zero_disables() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("broker.commit.interval.ms", "0")]))
                .unwrap();
        assert!(cfg.broker_commit_interval.is_zero());
    }
1107
    // Each optional top-level key maps onto its typed field.
    #[test]
    fn test_parse_optional_fields() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("format", "csv"),
            ("auto.offset.reset", "latest"),
            ("max.poll.records", "500"),
            ("include.metadata", "true"),
            ("include.headers", "true"),
            ("event.time.column", "ts"),
            ("partition.assignment.strategy", "roundrobin"),
            ("isolation.level", "read_uncommitted"),
        ]))
        .unwrap();

        assert_eq!(cfg.format, Format::Csv);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
        assert_eq!(cfg.isolation_level, IsolationLevel::ReadUncommitted);
        assert_eq!(cfg.max_poll_records, 500);
        assert!(cfg.include_metadata);
        assert!(cfg.include_headers);
        assert_eq!(cfg.event_time_column, Some("ts".to_string()));
        assert_eq!(
            cfg.partition_assignment_strategy,
            AssignmentStrategy::RoundRobin
        );
    }

    // Full SASL_SSL setup: protocol, mechanism, credentials and CA all land
    // in their respective fields.
    #[test]
    fn test_parse_security_sasl_ssl() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("security.protocol", "sasl_ssl"),
            ("sasl.mechanism", "SCRAM-SHA-256"),
            ("sasl.username", "alice"),
            ("sasl.password", "secret"),
            ("ssl.ca.location", "/etc/ssl/ca.pem"),
        ]))
        .unwrap();

        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha256));
        assert_eq!(cfg.sasl_username, Some("alice".to_string()));
        assert_eq!(cfg.sasl_password, Some("secret".to_string()));
        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
    }

    // Mutual-TLS-style setup: SSL without SASL, client cert/key/password.
    #[test]
    fn test_parse_security_ssl_only() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("security.protocol", "ssl"),
            ("ssl.ca.location", "/etc/ssl/ca.pem"),
            ("ssl.certificate.location", "/etc/ssl/client.pem"),
            ("ssl.key.location", "/etc/ssl/client.key"),
            ("ssl.key.password", "keypass"),
        ]))
        .unwrap();

        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
        assert!(cfg.security_protocol.uses_ssl());
        assert!(!cfg.security_protocol.uses_sasl());
        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
        assert_eq!(
            cfg.ssl_certificate_location,
            Some("/etc/ssl/client.pem".to_string())
        );
        assert_eq!(
            cfg.ssl_key_location,
            Some("/etc/ssl/client.key".to_string())
        );
        assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
    }

    // Fetch tuning keys parse into their numeric Option fields.
    #[test]
    fn test_parse_fetch_tuning() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("fetch.min.bytes", "1024"),
            ("fetch.max.bytes", "52428800"),
            ("fetch.max.wait.ms", "500"),
            ("max.partition.fetch.bytes", "1048576"),
        ]))
        .unwrap();

        assert_eq!(cfg.fetch_min_bytes, Some(1024));
        assert_eq!(cfg.fetch_max_bytes, Some(52_428_800));
        assert_eq!(cfg.fetch_max_wait_ms, Some(500));
        assert_eq!(cfg.max_partition_fetch_bytes, Some(1_048_576));
    }
1194
    // `kafka.`-prefixed keys are collected into `kafka_properties` with the
    // prefix stripped.
    #[test]
    fn test_parse_kafka_passthrough() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("kafka.session.timeout.ms", "30000"),
            ("kafka.max.poll.interval.ms", "300000"),
        ]))
        .unwrap();

        assert_eq!(cfg.kafka_properties.len(), 2);
        assert_eq!(
            cfg.kafka_properties.get("session.timeout.ms"),
            Some(&"30000".to_string())
        );
    }

    // Schema Registry URL, basic auth, compatibility level and registry CA
    // are all parsed for the Avro format.
    #[test]
    fn test_parse_schema_registry() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("format", "avro"),
            ("schema.registry.url", "http://localhost:8081"),
            ("schema.registry.username", "user"),
            ("schema.registry.password", "pass"),
            ("schema.compatibility", "FULL_TRANSITIVE"),
            ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
        ]))
        .unwrap();

        assert_eq!(cfg.format, Format::Avro);
        assert_eq!(
            cfg.schema_registry_url,
            Some("http://localhost:8081".to_string())
        );
        assert!(cfg.schema_registry_auth.is_some());
        let auth = cfg.schema_registry_auth.unwrap();
        assert_eq!(auth.username, "user");
        assert_eq!(auth.password, "pass");
        assert_eq!(
            cfg.schema_compatibility,
            Some(CompatibilityLevel::FullTransitive)
        );
        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/etc/ssl/sr-ca.pem".to_string())
        );
    }

    // A registry username without the matching password is rejected.
    #[test]
    fn test_parse_sr_auth_partial() {
        let config = make_config(&[
            ("schema.registry.url", "http://localhost:8081"),
            ("schema.registry.username", "user"),
            // missing password
        ]);
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }
1250
    // validate(): Avro without a Schema Registry URL is rejected.
    #[test]
    fn test_validate_avro_without_sr() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        cfg.format = Format::Avro;
        // No schema_registry_url
        assert!(cfg.validate().is_err());
    }

    // validate(): high watermark must strictly exceed the low watermark.
    #[test]
    fn test_validate_backpressure_watermarks() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        cfg.backpressure_high_watermark = 0.3;
        cfg.backpressure_low_watermark = 0.5;
        assert!(cfg.validate().is_err());
    }

    // validate(): a SASL protocol without a mechanism is rejected.
    #[test]
    fn test_validate_sasl_without_mechanism() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        // sasl_mechanism not set
        assert!(cfg.validate().is_err());
    }

    // validate(): PLAIN requires both username and password.
    #[test]
    fn test_validate_sasl_plain_without_credentials() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Topics(vec!["t".into()]);
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        cfg.sasl_mechanism = Some(SaslMechanism::Plain);
        // username/password not set
        assert!(cfg.validate().is_err());
    }

    // validate(): an empty subscription pattern is rejected.
    #[test]
    fn test_validate_empty_topic_pattern() {
        let mut cfg = KafkaSourceConfig::default();
        cfg.bootstrap_servers = "localhost:9092".into();
        cfg.group_id = "g".into();
        cfg.subscription = TopicSubscription::Pattern(String::new());
        assert!(cfg.validate().is_err());
    }
1304
    // to_rdkafka_config(): core settings are forwarded, auto-commit is forced
    // off, and `kafka.*` pass-through keys land in the client config.
    #[test]
    fn test_rdkafka_config() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("auto.offset.reset", "latest"),
            ("kafka.fetch.min.bytes", "1024"),
        ]))
        .unwrap();

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(rdkafka.get("group.id"), Some("test-group"));
        assert_eq!(rdkafka.get("enable.auto.commit"), Some("false"));
        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
        assert_eq!(rdkafka.get("security.protocol"), Some("plaintext"));
        assert_eq!(rdkafka.get("isolation.level"), Some("read_committed"));
    }

    // to_rdkafka_config(): SASL/SSL settings are forwarded under their
    // librdkafka key names.
    #[test]
    fn test_rdkafka_config_with_security() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("security.protocol", "sasl_ssl"),
            ("sasl.mechanism", "PLAIN"),
            ("sasl.username", "user"),
            ("sasl.password", "pass"),
            ("ssl.ca.location", "/ca.pem"),
        ]))
        .unwrap();

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("security.protocol"), Some("sasl_ssl"));
        assert_eq!(rdkafka.get("sasl.mechanism"), Some("PLAIN"));
        assert_eq!(rdkafka.get("sasl.username"), Some("user"));
        assert_eq!(rdkafka.get("sasl.password"), Some("pass"));
        assert_eq!(rdkafka.get("ssl.ca.location"), Some("/ca.pem"));
    }

    // to_rdkafka_config(): note `fetch.max.wait.ms` maps to librdkafka's
    // `fetch.wait.max.ms` key.
    #[test]
    fn test_rdkafka_config_with_fetch_tuning() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("fetch.min.bytes", "1024"),
            ("fetch.max.bytes", "1048576"),
            ("fetch.max.wait.ms", "500"),
            ("max.partition.fetch.bytes", "262144"),
            ("isolation.level", "read_uncommitted"),
        ]))
        .unwrap();

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
        assert_eq!(rdkafka.get("fetch.max.bytes"), Some("1048576"));
        assert_eq!(rdkafka.get("fetch.wait.max.ms"), Some("500"));
        assert_eq!(rdkafka.get("max.partition.fetch.bytes"), Some("262144"));
        assert_eq!(rdkafka.get("isolation.level"), Some("read_uncommitted"));
    }
1360
1361    #[test]
1362    fn test_offset_reset_parsing() {
1363        assert_eq!(
1364            "earliest".parse::<OffsetReset>().unwrap(),
1365            OffsetReset::Earliest
1366        );
1367        assert_eq!(
1368            "latest".parse::<OffsetReset>().unwrap(),
1369            OffsetReset::Latest
1370        );
1371        assert_eq!("none".parse::<OffsetReset>().unwrap(), OffsetReset::None);
1372        assert!("invalid".parse::<OffsetReset>().is_err());
1373    }
1374
1375    #[test]
1376    fn test_compatibility_level_parsing() {
1377        assert_eq!(
1378            "BACKWARD".parse::<CompatibilityLevel>().unwrap(),
1379            CompatibilityLevel::Backward
1380        );
1381        assert_eq!(
1382            "full_transitive".parse::<CompatibilityLevel>().unwrap(),
1383            CompatibilityLevel::FullTransitive
1384        );
1385        assert_eq!(
1386            "NONE".parse::<CompatibilityLevel>().unwrap(),
1387            CompatibilityLevel::None
1388        );
1389        assert!("invalid".parse::<CompatibilityLevel>().is_err());
1390    }
1391
1392    #[test]
1393    fn test_security_protocol_parsing() {
1394        assert_eq!(
1395            "plaintext".parse::<SecurityProtocol>().unwrap(),
1396            SecurityProtocol::Plaintext
1397        );
1398        assert_eq!(
1399            "SSL".parse::<SecurityProtocol>().unwrap(),
1400            SecurityProtocol::Ssl
1401        );
1402        assert_eq!(
1403            "sasl_plaintext".parse::<SecurityProtocol>().unwrap(),
1404            SecurityProtocol::SaslPlaintext
1405        );
1406        assert_eq!(
1407            "SASL_SSL".parse::<SecurityProtocol>().unwrap(),
1408            SecurityProtocol::SaslSsl
1409        );
1410        assert_eq!(
1411            "sasl-ssl".parse::<SecurityProtocol>().unwrap(),
1412            SecurityProtocol::SaslSsl
1413        );
1414        assert!("invalid".parse::<SecurityProtocol>().is_err());
1415    }
1416
1417    #[test]
1418    fn test_sasl_mechanism_parsing() {
1419        assert_eq!(
1420            "PLAIN".parse::<SaslMechanism>().unwrap(),
1421            SaslMechanism::Plain
1422        );
1423        assert_eq!(
1424            "SCRAM-SHA-256".parse::<SaslMechanism>().unwrap(),
1425            SaslMechanism::ScramSha256
1426        );
1427        assert_eq!(
1428            "scram_sha_512".parse::<SaslMechanism>().unwrap(),
1429            SaslMechanism::ScramSha512
1430        );
1431        assert_eq!(
1432            "GSSAPI".parse::<SaslMechanism>().unwrap(),
1433            SaslMechanism::Gssapi
1434        );
1435        assert_eq!(
1436            "OAUTHBEARER".parse::<SaslMechanism>().unwrap(),
1437            SaslMechanism::Oauthbearer
1438        );
1439        assert!("invalid".parse::<SaslMechanism>().is_err());
1440    }
1441
1442    #[test]
1443    fn test_isolation_level_parsing() {
1444        assert_eq!(
1445            "read_uncommitted".parse::<IsolationLevel>().unwrap(),
1446            IsolationLevel::ReadUncommitted
1447        );
1448        assert_eq!(
1449            "read_committed".parse::<IsolationLevel>().unwrap(),
1450            IsolationLevel::ReadCommitted
1451        );
1452        assert_eq!(
1453            "read-committed".parse::<IsolationLevel>().unwrap(),
1454            IsolationLevel::ReadCommitted
1455        );
1456        assert!("invalid".parse::<IsolationLevel>().is_err());
1457    }
1458
    // TopicSubscription accessors are mutually exclusive between the two
    // variants.
    #[test]
    fn test_topic_subscription_accessors() {
        let topics = TopicSubscription::Topics(vec!["a".into(), "b".into()]);
        assert_eq!(
            topics.topics(),
            Some(&["a".to_string(), "b".to_string()][..])
        );
        assert!(topics.pattern().is_none());
        assert!(!topics.is_pattern());

        let pattern = TopicSubscription::Pattern("events-.*".into());
        assert!(pattern.topics().is_none());
        assert_eq!(pattern.pattern(), Some("events-.*"));
        assert!(pattern.is_pattern());
    }

    // Exhaustive truth table for uses_ssl()/uses_sasl() across all four
    // protocol variants.
    #[test]
    fn test_security_protocol_helpers() {
        assert!(!SecurityProtocol::Plaintext.uses_ssl());
        assert!(!SecurityProtocol::Plaintext.uses_sasl());

        assert!(SecurityProtocol::Ssl.uses_ssl());
        assert!(!SecurityProtocol::Ssl.uses_sasl());

        assert!(!SecurityProtocol::SaslPlaintext.uses_ssl());
        assert!(SecurityProtocol::SaslPlaintext.uses_sasl());

        assert!(SecurityProtocol::SaslSsl.uses_ssl());
        assert!(SecurityProtocol::SaslSsl.uses_sasl());
    }

    // Only PLAIN/SCRAM need a username/password pair; GSSAPI and OAUTHBEARER
    // authenticate by other means.
    #[test]
    fn test_sasl_mechanism_helpers() {
        assert!(SaslMechanism::Plain.requires_credentials());
        assert!(SaslMechanism::ScramSha256.requires_credentials());
        assert!(SaslMechanism::ScramSha512.requires_credentials());
        assert!(!SaslMechanism::Gssapi.requires_credentials());
        assert!(!SaslMechanism::Oauthbearer.requires_credentials());
    }

    // Display output matches the canonical configuration spellings.
    #[test]
    fn test_enum_display() {
        assert_eq!(SecurityProtocol::SaslSsl.to_string(), "sasl_ssl");
        assert_eq!(SaslMechanism::ScramSha256.to_string(), "SCRAM-SHA-256");
        assert_eq!(IsolationLevel::ReadCommitted.to_string(), "read_committed");
    }
1505
    // StartupMode accepts both `-` and `_` separators; unknown values error.
    #[test]
    fn test_startup_mode_parsing() {
        assert_eq!(
            "group-offsets".parse::<StartupMode>().unwrap(),
            StartupMode::GroupOffsets
        );
        assert_eq!(
            "group_offsets".parse::<StartupMode>().unwrap(),
            StartupMode::GroupOffsets
        );
        assert_eq!(
            "earliest".parse::<StartupMode>().unwrap(),
            StartupMode::Earliest
        );
        assert_eq!(
            "latest".parse::<StartupMode>().unwrap(),
            StartupMode::Latest
        );
        assert!("invalid".parse::<StartupMode>().is_err());
    }

    // Display: simple modes use their canonical spelling; data-carrying modes
    // summarize their payload (partition count / timestamp).
    #[test]
    fn test_startup_mode_display() {
        assert_eq!(StartupMode::GroupOffsets.to_string(), "group-offsets");
        assert_eq!(StartupMode::Earliest.to_string(), "earliest");
        assert_eq!(StartupMode::Latest.to_string(), "latest");

        let specific = StartupMode::SpecificOffsets(HashMap::from([(0, 100), (1, 200)]));
        assert!(specific.to_string().contains("2 partitions"));

        let ts = StartupMode::Timestamp(1234567890000);
        assert!(ts.to_string().contains("1234567890000"));
    }

    // Every mode except GroupOffsets overrides auto.offset.reset; only
    // Earliest/Latest translate directly into an offset-reset string.
    #[test]
    fn test_startup_mode_helpers() {
        assert!(!StartupMode::GroupOffsets.overrides_offset_reset());
        assert!(StartupMode::Earliest.overrides_offset_reset());
        assert!(StartupMode::Latest.overrides_offset_reset());
        assert!(StartupMode::SpecificOffsets(HashMap::new()).overrides_offset_reset());
        assert!(StartupMode::Timestamp(0).overrides_offset_reset());

        assert!(StartupMode::GroupOffsets.as_offset_reset().is_none());
        assert_eq!(StartupMode::Earliest.as_offset_reset(), Some("earliest"));
        assert_eq!(StartupMode::Latest.as_offset_reset(), Some("latest"));
        assert!(StartupMode::SpecificOffsets(HashMap::new())
            .as_offset_reset()
            .is_none());
        assert!(StartupMode::Timestamp(0).as_offset_reset().is_none());
    }
1556
    // parse_specific_offsets: well-formed input maps each partition to its
    // offset.
    #[test]
    fn test_parse_specific_offsets() {
        let offsets = parse_specific_offsets("0:100,1:200,2:300").unwrap();
        assert_eq!(offsets.get(&0), Some(&100));
        assert_eq!(offsets.get(&1), Some(&200));
        assert_eq!(offsets.get(&2), Some(&300));
    }

    // parse_specific_offsets: whitespace around entries is tolerated.
    #[test]
    fn test_parse_specific_offsets_with_spaces() {
        let offsets = parse_specific_offsets(" 0:100 , 1:200 ").unwrap();
        assert_eq!(offsets.get(&0), Some(&100));
        assert_eq!(offsets.get(&1), Some(&200));
    }

    // parse_specific_offsets: empty input, missing/extra colon fields and
    // non-numeric parts are all rejected.
    #[test]
    fn test_parse_specific_offsets_errors() {
        assert!(parse_specific_offsets("").is_err());
        assert!(parse_specific_offsets("0").is_err());
        assert!(parse_specific_offsets("0:abc").is_err());
        assert!(parse_specific_offsets("abc:100").is_err());
        assert!(parse_specific_offsets("0:100:extra").is_err());
    }

    // `startup.mode` strings map onto the simple StartupMode variants.
    #[test]
    fn test_parse_startup_mode_from_config() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "earliest")])).unwrap();
        assert_eq!(cfg.startup_mode, StartupMode::Earliest);

        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "latest")])).unwrap();
        assert_eq!(cfg.startup_mode, StartupMode::Latest);

        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", "group-offsets")]))
                .unwrap();
        assert_eq!(cfg.startup_mode, StartupMode::GroupOffsets);
    }

    // `startup.specific.offsets` yields the SpecificOffsets mode with the
    // parsed partition → offset map.
    #[test]
    fn test_parse_startup_specific_offsets_from_config() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[(
            "startup.specific.offsets",
            "0:100,1:200",
        )]))
        .unwrap();

        match cfg.startup_mode {
            StartupMode::SpecificOffsets(offsets) => {
                assert_eq!(offsets.get(&0), Some(&100));
                assert_eq!(offsets.get(&1), Some(&200));
            }
            _ => panic!("expected SpecificOffsets"),
        }
    }

    // `startup.timestamp.ms` yields the Timestamp mode.
    #[test]
    fn test_parse_startup_timestamp_from_config() {
        let cfg =
            KafkaSourceConfig::from_config(&make_config(&[("startup.timestamp.ms", "1234567890")]))
                .unwrap();

        assert_eq!(cfg.startup_mode, StartupMode::Timestamp(1234567890));
    }

    // Schema Registry TLS locations are independent of the broker SSL fields.
    #[test]
    fn test_parse_schema_registry_ssl_fields() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("schema.registry.url", "https://sr:8081"),
            ("schema.registry.ssl.ca.location", "/ca.pem"),
            ("schema.registry.ssl.certificate.location", "/cert.pem"),
            ("schema.registry.ssl.key.location", "/key.pem"),
        ]))
        .unwrap();

        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/ca.pem".to_string())
        );
        assert_eq!(
            cfg.schema_registry_ssl_certificate_location,
            Some("/cert.pem".to_string())
        );
        assert_eq!(
            cfg.schema_registry_ssl_key_location,
            Some("/key.pem".to_string())
        );
    }

    // Watermark defaults: 5s out-of-orderness, 30s idle timeout, tracking
    // and alignment disabled.
    #[test]
    fn test_parse_watermark_defaults() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(5));
        assert_eq!(cfg.idle_timeout, Duration::from_secs(30));
        assert!(!cfg.enable_watermark_tracking);
        assert!(cfg.alignment_group_id.is_none());
        assert!(cfg.alignment_max_drift.is_none());
        assert!(cfg.alignment_mode.is_none());
    }

    // Watermark tracking keys parse as millisecond durations.
    #[test]
    fn test_parse_watermark_tracking_enabled() {
        let cfg = KafkaSourceConfig::from_config(&make_config(&[
            ("enable.watermark.tracking", "true"),
            ("max.out.of.orderness.ms", "10000"),
            ("idle.timeout.ms", "60000"),
        ]))
        .unwrap();

        assert!(cfg.enable_watermark_tracking);
        assert_eq!(cfg.max_out_of_orderness, Duration::from_millis(10_000));
        assert_eq!(cfg.idle_timeout, Duration::from_millis(60_000));
    }
1671
1672    #[test]
1673    fn test_parse_alignment_config() {
1674        use crate::kafka::KafkaAlignmentMode;
1675
1676        let cfg = KafkaSourceConfig::from_config(&make_config(&[
1677            ("alignment.group.id", "orders-payments"),
1678            ("alignment.max.drift.ms", "300000"),
1679            ("alignment.mode", "pause"),
1680        ]))
1681        .unwrap();
1682
1683        assert_eq!(cfg.alignment_group_id, Some("orders-payments".to_string()));
1684        assert_eq!(
1685            cfg.alignment_max_drift,
1686            Some(Duration::from_millis(300_000))
1687        );
1688        assert_eq!(cfg.alignment_mode, Some(KafkaAlignmentMode::Pause));
1689    }
1690
1691    #[test]
1692    fn test_parse_alignment_mode_variants() {
1693        use crate::kafka::KafkaAlignmentMode;
1694
1695        let cfg = KafkaSourceConfig::from_config(&make_config(&[("alignment.mode", "warn-only")]))
1696            .unwrap();
1697        assert_eq!(cfg.alignment_mode, Some(KafkaAlignmentMode::WarnOnly));
1698
1699        let cfg =
1700            KafkaSourceConfig::from_config(&make_config(&[("alignment.mode", "drop-excess")]))
1701                .unwrap();
1702        assert_eq!(cfg.alignment_mode, Some(KafkaAlignmentMode::DropExcess));
1703    }
1704
1705    #[test]
1706    fn test_parse_alignment_mode_invalid() {
1707        let config = make_config(&[("alignment.mode", "invalid")]);
1708        assert!(KafkaSourceConfig::from_config(&config).is_err());
1709    }
1710}