1use std::collections::HashMap;
8use std::time::Duration;
9
10use rdkafka::config::ClientConfig;
11
12use crate::config::ConnectorConfig;
13use crate::error::ConnectorError;
14use crate::serde::Format;
15
/// Security protocol used for the broker connection.
///
/// Each variant maps to a `security.protocol` value through
/// [`SecurityProtocol::as_rdkafka_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SecurityProtocol {
    /// Neither SSL nor SASL (the default).
    #[default]
    Plaintext,
    /// SSL without SASL.
    Ssl,
    /// SASL without SSL.
    SaslPlaintext,
    /// SASL combined with SSL.
    SaslSsl,
}
31
32impl SecurityProtocol {
33 #[must_use]
35 pub fn as_rdkafka_str(&self) -> &'static str {
36 match self {
37 SecurityProtocol::Plaintext => "plaintext",
38 SecurityProtocol::Ssl => "ssl",
39 SecurityProtocol::SaslPlaintext => "sasl_plaintext",
40 SecurityProtocol::SaslSsl => "sasl_ssl",
41 }
42 }
43
44 #[must_use]
46 pub fn uses_ssl(&self) -> bool {
47 matches!(self, SecurityProtocol::Ssl | SecurityProtocol::SaslSsl)
48 }
49
50 #[must_use]
52 pub fn uses_sasl(&self) -> bool {
53 matches!(
54 self,
55 SecurityProtocol::SaslPlaintext | SecurityProtocol::SaslSsl
56 )
57 }
58}
59
// String parsing for `SecurityProtocol` comes from the project-local `str_enum!`
// macro, which is defined outside this file.
// NOTE(review): the `lowercase` mode presumably lowercases the input before
// matching the literals below — confirm against the macro definition.
str_enum!(fromstr SecurityProtocol, lowercase, ConnectorError, "invalid security.protocol",
    Plaintext => "plaintext";
    Ssl => "ssl";
    SaslPlaintext => "sasl_plaintext";
    SaslSsl => "sasl_ssl"
);
66
67impl std::fmt::Display for SecurityProtocol {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 write!(f, "{}", self.as_rdkafka_str())
70 }
71}
72
/// SASL authentication mechanism.
///
/// Each variant maps to a `sasl.mechanism` value through
/// [`SaslMechanism::as_rdkafka_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SaslMechanism {
    /// `PLAIN` (the default); needs a username and password.
    #[default]
    Plain,
    /// `SCRAM-SHA-256`; needs a username and password.
    ScramSha256,
    /// `SCRAM-SHA-512`; needs a username and password.
    ScramSha512,
    /// `GSSAPI`.
    Gssapi,
    /// `OAUTHBEARER`.
    Oauthbearer,
}
88
89impl SaslMechanism {
90 #[must_use]
92 pub fn as_rdkafka_str(&self) -> &'static str {
93 match self {
94 SaslMechanism::Plain => "PLAIN",
95 SaslMechanism::ScramSha256 => "SCRAM-SHA-256",
96 SaslMechanism::ScramSha512 => "SCRAM-SHA-512",
97 SaslMechanism::Gssapi => "GSSAPI",
98 SaslMechanism::Oauthbearer => "OAUTHBEARER",
99 }
100 }
101
102 #[must_use]
104 pub fn requires_credentials(&self) -> bool {
105 matches!(
106 self,
107 SaslMechanism::Plain | SaslMechanism::ScramSha256 | SaslMechanism::ScramSha512
108 )
109 }
110}
111
// String parsing for `SaslMechanism` comes from the project-local `str_enum!`
// macro, which is defined outside this file.
// NOTE(review): the literals below use underscores, while `as_rdkafka_str`
// emits dashes and a test in this file parses "SCRAM-SHA-256". The `uppercase`
// mode therefore presumably upper-cases the input and maps '-' to '_' —
// confirm against the macro definition.
str_enum!(fromstr SaslMechanism, uppercase, ConnectorError, "invalid sasl.mechanism",
    Plain => "PLAIN";
    ScramSha256 => "SCRAM_SHA_256", "SCRAM_SHA256";
    ScramSha512 => "SCRAM_SHA_512", "SCRAM_SHA512";
    Gssapi => "GSSAPI", "KERBEROS";
    Oauthbearer => "OAUTHBEARER", "OAUTH"
);
119
120impl std::fmt::Display for SaslMechanism {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 write!(f, "{}", self.as_rdkafka_str())
123 }
124}
125
/// Consumer isolation level.
///
/// Each variant maps to an `isolation.level` value through
/// [`IsolationLevel::as_rdkafka_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Emitted as `read_uncommitted`.
    ReadUncommitted,
    /// Emitted as `read_committed` (the default).
    #[default]
    ReadCommitted,
}
137
138impl IsolationLevel {
139 #[must_use]
141 pub fn as_rdkafka_str(&self) -> &'static str {
142 match self {
143 IsolationLevel::ReadUncommitted => "read_uncommitted",
144 IsolationLevel::ReadCommitted => "read_committed",
145 }
146 }
147}
148
// String parsing for `IsolationLevel` comes from the project-local `str_enum!`
// macro, which is defined outside this file.
// NOTE(review): the `lowercase` mode presumably lowercases the input before
// matching — confirm against the macro definition.
str_enum!(fromstr IsolationLevel, lowercase, ConnectorError, "invalid isolation.level",
    ReadUncommitted => "read_uncommitted";
    ReadCommitted => "read_committed"
);
153
154impl std::fmt::Display for IsolationLevel {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 write!(f, "{}", self.as_rdkafka_str())
157 }
158}
159
/// Where consumption starts when the source comes up.
///
/// `from_config` selects `SpecificOffsets` when `startup.specific.offsets` is
/// set, otherwise `Timestamp` when `startup.timestamp.ms` is set, otherwise
/// whatever `startup.mode` parses to.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum StartupMode {
    /// The default mode; it is the only one that does not override the
    /// offset-reset policy and maps to no `auto.offset.reset` value.
    #[default]
    GroupOffsets,
    /// Maps to the `earliest` offset-reset value.
    Earliest,
    /// Maps to the `latest` offset-reset value.
    Latest,
    /// Explicit offsets keyed by partition number (partition -> offset).
    SpecificOffsets(HashMap<i32, i64>),
    /// A timestamp, parsed from `startup.timestamp.ms`.
    Timestamp(i64),
}
180
181impl StartupMode {
182 #[must_use]
184 pub fn overrides_offset_reset(&self) -> bool {
185 !matches!(self, StartupMode::GroupOffsets)
186 }
187
188 #[must_use]
190 pub fn as_offset_reset(&self) -> Option<&'static str> {
191 match self {
192 StartupMode::Earliest => Some("earliest"),
193 StartupMode::Latest => Some("latest"),
194 StartupMode::GroupOffsets
195 | StartupMode::SpecificOffsets(_)
196 | StartupMode::Timestamp(_) => None,
197 }
198 }
199}
200
201impl std::str::FromStr for StartupMode {
202 type Err = ConnectorError;
203
204 fn from_str(s: &str) -> Result<Self, Self::Err> {
205 match s.to_lowercase().replace('-', "_").as_str() {
206 "group_offsets" | "group" => Ok(StartupMode::GroupOffsets),
207 "earliest" => Ok(StartupMode::Earliest),
208 "latest" => Ok(StartupMode::Latest),
209 other => Err(ConnectorError::ConfigurationError(format!(
210 "invalid startup.mode: '{other}' (expected group-offsets/earliest/latest)"
211 ))),
212 }
213 }
214}
215
216impl std::fmt::Display for StartupMode {
217 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
218 match self {
219 StartupMode::GroupOffsets => write!(f, "group-offsets"),
220 StartupMode::Earliest => write!(f, "earliest"),
221 StartupMode::Latest => write!(f, "latest"),
222 StartupMode::SpecificOffsets(offsets) => {
223 write!(f, "specific-offsets({} partitions)", offsets.len())
224 }
225 StartupMode::Timestamp(ts) => write!(f, "timestamp({ts})"),
226 }
227 }
228}
229
/// Which topics the source consumes from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicSubscription {
    /// An explicit list of topic names (from the comma-separated `topic` key).
    Topics(Vec<String>),
    /// A topic pattern (from the `topic.pattern` key).
    /// NOTE(review): presumably a regular expression, going by the test value
    /// `events-.*` — confirm against the consumer code.
    Pattern(String),
}
238
239impl TopicSubscription {
240 #[must_use]
242 pub fn topics(&self) -> Option<&[String]> {
243 match self {
244 TopicSubscription::Topics(t) => Some(t),
245 TopicSubscription::Pattern(_) => None,
246 }
247 }
248
249 #[must_use]
251 pub fn pattern(&self) -> Option<&str> {
252 match self {
253 TopicSubscription::Topics(_) => None,
254 TopicSubscription::Pattern(p) => Some(p),
255 }
256 }
257
258 #[must_use]
260 pub fn is_pattern(&self) -> bool {
261 matches!(self, TopicSubscription::Pattern(_))
262 }
263}
264
265impl Default for TopicSubscription {
266 fn default() -> Self {
267 TopicSubscription::Topics(Vec::new())
268 }
269}
270
/// Offset-reset policy, emitted as `auto.offset.reset`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetReset {
    /// Emitted as `earliest`.
    Earliest,
    /// Emitted as `latest`.
    Latest,
    /// No automatic reset; emitted to rdkafka as `error`.
    None,
}
281
282impl OffsetReset {
283 #[must_use]
285 pub fn as_rdkafka_str(&self) -> &'static str {
286 match self {
287 OffsetReset::Earliest => "earliest",
288 OffsetReset::Latest => "latest",
289 OffsetReset::None => "error",
290 }
291 }
292}
293
// String parsing for `OffsetReset` comes from the project-local `str_enum!`
// macro, which is defined outside this file. Several aliases are accepted per
// variant.
// NOTE(review): how the `lowercase_nodash` mode normalizes input is not
// visible here — confirm against the macro definition.
str_enum!(fromstr OffsetReset, lowercase_nodash, ConnectorError, "invalid auto.offset.reset",
    Earliest => "earliest", "beginning";
    Latest => "latest", "end";
    None => "none", "error"
);
299
/// Partition assignment strategy, emitted as `partition.assignment.strategy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentStrategy {
    /// Emitted as `range`.
    Range,
    /// Emitted as `roundrobin`.
    RoundRobin,
    /// Emitted as `cooperative-sticky`.
    CooperativeSticky,
}
310
311impl AssignmentStrategy {
312 #[must_use]
314 pub fn as_rdkafka_str(&self) -> &'static str {
315 match self {
316 AssignmentStrategy::Range => "range",
317 AssignmentStrategy::RoundRobin => "roundrobin",
318 AssignmentStrategy::CooperativeSticky => "cooperative-sticky",
319 }
320 }
321}
322
// String parsing for `AssignmentStrategy` comes from the project-local
// `str_enum!` macro, which is defined outside this file.
// NOTE(review): this uses the `lowercase_nodash` mode, yet some literals below
// contain '-'. Whether those spellings are reachable depends on how the mode
// rewrites dashes — confirm against the macro definition.
str_enum!(fromstr AssignmentStrategy, lowercase_nodash, ConnectorError,
    "invalid partition.assignment.strategy",
    Range => "range";
    RoundRobin => "roundrobin", "round-robin", "round_robin";
    CooperativeSticky => "cooperative-sticky", "cooperative_sticky"
);
329
/// Schema compatibility level, parsed from the `schema.compatibility` key.
///
/// [`CompatibilityLevel::as_str`] gives the upper-case spelling of each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityLevel {
    /// `BACKWARD`
    Backward,
    /// `BACKWARD_TRANSITIVE`
    BackwardTransitive,
    /// `FORWARD`
    Forward,
    /// `FORWARD_TRANSITIVE`
    ForwardTransitive,
    /// `FULL`
    Full,
    /// `FULL_TRANSITIVE`
    FullTransitive,
    /// `NONE`
    None,
}
348
349impl CompatibilityLevel {
350 #[must_use]
352 pub fn as_str(&self) -> &'static str {
353 match self {
354 CompatibilityLevel::Backward => "BACKWARD",
355 CompatibilityLevel::BackwardTransitive => "BACKWARD_TRANSITIVE",
356 CompatibilityLevel::Forward => "FORWARD",
357 CompatibilityLevel::ForwardTransitive => "FORWARD_TRANSITIVE",
358 CompatibilityLevel::Full => "FULL",
359 CompatibilityLevel::FullTransitive => "FULL_TRANSITIVE",
360 CompatibilityLevel::None => "NONE",
361 }
362 }
363}
364
// String parsing for `CompatibilityLevel` comes from the project-local
// `str_enum!` macro, which is defined outside this file.
// NOTE(review): the `uppercase` mode presumably upper-cases the input before
// matching — confirm against the macro definition.
str_enum!(fromstr CompatibilityLevel, uppercase, ConnectorError, "invalid schema.compatibility",
    Backward => "BACKWARD";
    BackwardTransitive => "BACKWARD_TRANSITIVE";
    Forward => "FORWARD";
    ForwardTransitive => "FORWARD_TRANSITIVE";
    Full => "FULL";
    FullTransitive => "FULL_TRANSITIVE";
    None => "NONE"
);
374
375impl std::fmt::Display for CompatibilityLevel {
376 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377 write!(f, "{}", self.as_str())
378 }
379}
380
/// Schema-registry credentials.
///
/// `Debug` is implemented by hand so that the password is never printed.
#[derive(Clone)]
pub struct SrAuth {
    /// Username (from `schema.registry.username`).
    pub username: String,
    /// Password (from `schema.registry.password`); masked in `Debug` output.
    pub password: String,
}
389
390impl std::fmt::Debug for SrAuth {
391 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
392 f.debug_struct("SrAuth")
393 .field("username", &self.username)
394 .field("password", &"***")
395 .finish()
396 }
397}
398
/// Configuration for the Kafka source.
///
/// Normally built with [`KafkaSourceConfig::from_config`]. `Debug` is
/// implemented by hand so that secrets are masked.
#[derive(Clone)]
pub struct KafkaSourceConfig {
    /// Broker list (`bootstrap.servers`); must be non-empty to validate.
    pub bootstrap_servers: String,
    /// Consumer group id (`group.id`); must be non-empty to validate.
    pub group_id: String,
    /// Topic list or topic pattern to consume.
    pub subscription: TopicSubscription,

    /// Value for `security.protocol`.
    pub security_protocol: SecurityProtocol,
    /// Value for `sasl.mechanism`; required when the protocol uses SASL.
    pub sasl_mechanism: Option<SaslMechanism>,
    /// Value for `sasl.username`.
    pub sasl_username: Option<String>,
    /// Value for `sasl.password` (masked in `Debug` output).
    pub sasl_password: Option<String>,
    /// Value for `ssl.ca.location`.
    pub ssl_ca_location: Option<String>,
    /// Value for `ssl.certificate.location`.
    pub ssl_certificate_location: Option<String>,
    /// Value for `ssl.key.location`.
    pub ssl_key_location: Option<String>,
    /// Value for `ssl.key.password` (masked in `Debug` output).
    pub ssl_key_password: Option<String>,

    /// Payload format (`format`).
    pub format: Format,
    /// Schema registry URL (`schema.registry.url`); required for Avro.
    pub schema_registry_url: Option<String>,
    /// Schema registry credentials, present only when both username and
    /// password were configured.
    pub schema_registry_auth: Option<SrAuth>,
    /// Value of `schema.compatibility`.
    pub schema_compatibility: Option<CompatibilityLevel>,
    /// Value of `schema.registry.ssl.ca.location`.
    pub schema_registry_ssl_ca_location: Option<String>,
    /// Value of `schema.registry.ssl.certificate.location`.
    pub schema_registry_ssl_certificate_location: Option<String>,
    /// Value of `schema.registry.ssl.key.location`.
    pub schema_registry_ssl_key_location: Option<String>,
    /// Value of `event.time.column`.
    pub event_time_column: Option<String>,
    /// Value of `include.metadata`.
    pub include_metadata: bool,
    /// Value of `include.headers`.
    pub include_headers: bool,

    /// Where consumption starts.
    pub startup_mode: StartupMode,
    /// Value for `auto.offset.reset`.
    pub auto_offset_reset: OffsetReset,
    /// Value for `isolation.level`.
    pub isolation_level: IsolationLevel,
    /// Value of `max.poll.records`; must be greater than zero to validate.
    pub max_poll_records: usize,
    /// Value for `partition.assignment.strategy`.
    pub partition_assignment_strategy: AssignmentStrategy,
    /// Value for `fetch.min.bytes`.
    pub fetch_min_bytes: Option<i32>,
    /// Value for `fetch.max.bytes`.
    pub fetch_max_bytes: Option<i32>,
    /// Parsed from `fetch.max.wait.ms`; emitted to rdkafka as `fetch.wait.max.ms`.
    pub fetch_max_wait_ms: Option<i32>,
    /// Value for `max.partition.fetch.bytes`.
    pub max_partition_fetch_bytes: Option<i32>,

    /// Parsed from `max.out.of.orderness.ms`.
    pub max_out_of_orderness: Duration,
    /// Parsed from `idle.timeout.ms`.
    pub idle_timeout: Duration,
    /// Value of `enable.watermark.tracking`.
    pub enable_watermark_tracking: bool,
    /// Value of `alignment.group.id`.
    pub alignment_group_id: Option<String>,
    /// Parsed from `alignment.max.drift.ms`.
    pub alignment_max_drift: Option<Duration>,
    /// Value of `alignment.mode`.
    pub alignment_mode: Option<super::watermarks::KafkaAlignmentMode>,

    /// Parsed from `broker.commit.interval.ms`.
    /// NOTE(review): a test name in this file suggests that zero disables
    /// broker commits — confirm against the consumer code.
    pub broker_commit_interval: Duration,

    /// Value of `backpressure.high.watermark`; must lie in `0.0..=1.0` and
    /// exceed the low watermark to validate.
    pub backpressure_high_watermark: f64,
    /// Value of `backpressure.low.watermark`; must lie in `0.0..=1.0`.
    pub backpressure_low_watermark: f64,

    /// Pass-through client properties collected from `kafka.`-prefixed keys.
    pub kafka_properties: HashMap<String, String>,
}
506
507impl std::fmt::Debug for KafkaSourceConfig {
508 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
509 f.debug_struct("KafkaSourceConfig")
510 .field("bootstrap_servers", &self.bootstrap_servers)
511 .field("group_id", &self.group_id)
512 .field("subscription", &self.subscription)
513 .field("format", &self.format)
514 .field("security_protocol", &self.security_protocol)
515 .field("sasl_mechanism", &self.sasl_mechanism)
516 .field("sasl_username", &self.sasl_username)
517 .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
518 .field(
519 "ssl_key_password",
520 &self.ssl_key_password.as_ref().map(|_| "***"),
521 )
522 .field("max_poll_records", &self.max_poll_records)
523 .finish_non_exhaustive()
524 }
525}
526
527impl Default for KafkaSourceConfig {
528 fn default() -> Self {
529 Self {
530 bootstrap_servers: String::new(),
531 group_id: String::new(),
532 subscription: TopicSubscription::default(),
533 security_protocol: SecurityProtocol::default(),
534 sasl_mechanism: None,
535 sasl_username: None,
536 sasl_password: None,
537 ssl_ca_location: None,
538 ssl_certificate_location: None,
539 ssl_key_location: None,
540 ssl_key_password: None,
541 format: Format::Json,
542 schema_registry_url: None,
543 schema_registry_auth: None,
544 schema_compatibility: None,
545 schema_registry_ssl_ca_location: None,
546 schema_registry_ssl_certificate_location: None,
547 schema_registry_ssl_key_location: None,
548 event_time_column: None,
549 include_metadata: false,
550 include_headers: false,
551 startup_mode: StartupMode::default(),
552 auto_offset_reset: OffsetReset::Earliest,
553 isolation_level: IsolationLevel::default(),
554 max_poll_records: 1000,
555 partition_assignment_strategy: AssignmentStrategy::Range,
556 fetch_min_bytes: None,
557 fetch_max_bytes: None,
558 fetch_max_wait_ms: None,
559 max_partition_fetch_bytes: None,
560 max_out_of_orderness: Duration::from_secs(5),
561 idle_timeout: Duration::from_secs(30),
562 enable_watermark_tracking: false,
563 alignment_group_id: None,
564 alignment_max_drift: None,
565 alignment_mode: None,
566 broker_commit_interval: Duration::from_secs(60),
567 backpressure_high_watermark: 0.8,
568 backpressure_low_watermark: 0.5,
569 kafka_properties: HashMap::new(),
570 }
571 }
572}
573
574impl KafkaSourceConfig {
575 #[allow(deprecated, clippy::too_many_lines)]
581 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
582 let bootstrap_servers = config.require("bootstrap.servers")?.to_string();
583 let group_id = config.require("group.id")?.to_string();
584
585 let subscription = if let Some(pattern) = config.get("topic.pattern") {
586 TopicSubscription::Pattern(pattern.to_string())
587 } else {
588 let topics_str = config.require("topic")?;
589 let topics: Vec<String> = topics_str
590 .split(',')
591 .map(|s| s.trim().to_string())
592 .collect();
593 TopicSubscription::Topics(topics.clone())
594 };
595
596 let security_protocol = match config.get("security.protocol") {
597 Some(s) => s.parse::<SecurityProtocol>()?,
598 None => SecurityProtocol::default(),
599 };
600
601 let sasl_mechanism = match config.get("sasl.mechanism") {
602 Some(s) => Some(s.parse::<SaslMechanism>()?),
603 None => None,
604 };
605
606 let sasl_username = config.get("sasl.username").map(String::from);
607 let sasl_password = config.get("sasl.password").map(String::from);
608 let ssl_ca_location = config.get("ssl.ca.location").map(String::from);
609 let ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
610 let ssl_key_location = config.get("ssl.key.location").map(String::from);
611 let ssl_key_password = config.get("ssl.key.password").map(String::from);
612
613 let format = match config.get("format") {
614 Some(f) => f
615 .parse::<Format>()
616 .map_err(|e| ConnectorError::ConfigurationError(e.to_string()))?,
617 None => Format::Json,
618 };
619
620 let schema_registry_url = config.get("schema.registry.url").map(String::from);
621
622 let schema_registry_auth = match (
623 config.get("schema.registry.username"),
624 config.get("schema.registry.password"),
625 ) {
626 (Some(u), Some(p)) => Some(SrAuth {
627 username: u.to_string(),
628 password: p.to_string(),
629 }),
630 (Some(_), None) | (None, Some(_)) => {
631 return Err(ConnectorError::ConfigurationError(
632 "schema.registry.username and schema.registry.password must both be set"
633 .to_string(),
634 ));
635 }
636 (None, None) => None,
637 };
638
639 let schema_compatibility = match config.get("schema.compatibility") {
640 Some(s) => Some(s.parse::<CompatibilityLevel>()?),
641 None => None,
642 };
643
644 let schema_registry_ssl_ca_location = config
645 .get("schema.registry.ssl.ca.location")
646 .map(String::from);
647 let schema_registry_ssl_certificate_location = config
648 .get("schema.registry.ssl.certificate.location")
649 .map(String::from);
650 let schema_registry_ssl_key_location = config
651 .get("schema.registry.ssl.key.location")
652 .map(String::from);
653
654 let event_time_column = config.get("event.time.column").map(String::from);
655
656 let include_metadata = config
657 .get_parsed::<bool>("include.metadata")?
658 .unwrap_or(false);
659
660 let include_headers = config
661 .get_parsed::<bool>("include.headers")?
662 .unwrap_or(false);
663
664 let startup_mode = if let Some(offsets_str) = config.get("startup.specific.offsets") {
665 let offsets = parse_specific_offsets(offsets_str)?;
666 StartupMode::SpecificOffsets(offsets)
667 } else if let Some(ts_str) = config.get("startup.timestamp.ms") {
668 let ts: i64 = ts_str.parse().map_err(|_| {
669 ConnectorError::ConfigurationError(format!(
670 "invalid startup.timestamp.ms: '{ts_str}'"
671 ))
672 })?;
673 StartupMode::Timestamp(ts)
674 } else {
675 match config.get("startup.mode") {
676 Some(s) => s.parse::<StartupMode>()?,
677 None => StartupMode::default(),
678 }
679 };
680
681 let auto_offset_reset = match config.get("auto.offset.reset") {
682 Some(s) => s.parse::<OffsetReset>()?,
683 None => OffsetReset::Earliest,
684 };
685
686 let isolation_level = match config.get("isolation.level") {
687 Some(s) => s.parse::<IsolationLevel>()?,
688 None => IsolationLevel::default(),
689 };
690
691 let max_poll_records = config
692 .get_parsed::<usize>("max.poll.records")?
693 .unwrap_or(1000);
694
695 let partition_assignment_strategy = match config.get("partition.assignment.strategy") {
696 Some(s) => s.parse::<AssignmentStrategy>()?,
697 None => AssignmentStrategy::Range,
698 };
699
700 let fetch_min_bytes = config.get_parsed::<i32>("fetch.min.bytes")?;
701 let fetch_max_bytes = config.get_parsed::<i32>("fetch.max.bytes")?;
702 let fetch_max_wait_ms = config.get_parsed::<i32>("fetch.max.wait.ms")?;
703 let max_partition_fetch_bytes = config.get_parsed::<i32>("max.partition.fetch.bytes")?;
704
705 let max_out_of_orderness_ms = config
706 .get_parsed::<u64>("max.out.of.orderness.ms")?
707 .unwrap_or(5000);
708
709 let idle_timeout_ms = config
710 .get_parsed::<u64>("idle.timeout.ms")?
711 .unwrap_or(30_000);
712
713 let enable_watermark_tracking = config
714 .get_parsed::<bool>("enable.watermark.tracking")?
715 .unwrap_or(false);
716
717 let alignment_group_id = config.get("alignment.group.id").map(String::from);
718
719 let alignment_max_drift_ms = config.get_parsed::<u64>("alignment.max.drift.ms")?;
720
721 let alignment_mode = match config.get("alignment.mode") {
722 Some(s) => Some(
723 s.parse::<super::watermarks::KafkaAlignmentMode>()
724 .map_err(ConnectorError::ConfigurationError)?,
725 ),
726 None => None,
727 };
728
729 let broker_commit_interval_ms = config
730 .get_parsed::<u64>("broker.commit.interval.ms")?
731 .unwrap_or(60_000);
732
733 let backpressure_high_watermark = config
734 .get_parsed::<f64>("backpressure.high.watermark")?
735 .unwrap_or(0.8);
736
737 let backpressure_low_watermark = config
738 .get_parsed::<f64>("backpressure.low.watermark")?
739 .unwrap_or(0.5);
740
741 let kafka_properties = config.properties_with_prefix("kafka.");
742
743 let cfg = Self {
744 bootstrap_servers,
745 group_id,
746 subscription,
747 security_protocol,
748 sasl_mechanism,
749 sasl_username,
750 sasl_password,
751 ssl_ca_location,
752 ssl_certificate_location,
753 ssl_key_location,
754 ssl_key_password,
755 format,
756 schema_registry_url,
757 schema_registry_auth,
758 schema_compatibility,
759 schema_registry_ssl_ca_location,
760 schema_registry_ssl_certificate_location,
761 schema_registry_ssl_key_location,
762 event_time_column,
763 include_metadata,
764 include_headers,
765 startup_mode,
766 auto_offset_reset,
767 isolation_level,
768 max_poll_records,
769 partition_assignment_strategy,
770 fetch_min_bytes,
771 fetch_max_bytes,
772 fetch_max_wait_ms,
773 max_partition_fetch_bytes,
774 max_out_of_orderness: Duration::from_millis(max_out_of_orderness_ms),
775 idle_timeout: Duration::from_millis(idle_timeout_ms),
776 enable_watermark_tracking,
777 alignment_group_id,
778 alignment_max_drift: alignment_max_drift_ms.map(Duration::from_millis),
779 alignment_mode,
780 broker_commit_interval: Duration::from_millis(broker_commit_interval_ms),
781 backpressure_high_watermark,
782 backpressure_low_watermark,
783 kafka_properties,
784 };
785
786 cfg.validate()?;
787 Ok(cfg)
788 }
789
790 pub fn validate(&self) -> Result<(), ConnectorError> {
796 if self.bootstrap_servers.is_empty() {
797 return Err(ConnectorError::ConfigurationError(
798 "bootstrap.servers cannot be empty".into(),
799 ));
800 }
801 if self.group_id.is_empty() {
802 return Err(ConnectorError::ConfigurationError(
803 "group.id cannot be empty".into(),
804 ));
805 }
806
807 match &self.subscription {
809 TopicSubscription::Topics(t) if t.is_empty() => {
810 return Err(ConnectorError::ConfigurationError(
811 "at least one topic is required (or use topic.pattern)".into(),
812 ));
813 }
814 TopicSubscription::Pattern(p) if p.is_empty() => {
815 return Err(ConnectorError::ConfigurationError(
816 "topic.pattern cannot be empty".into(),
817 ));
818 }
819 _ => {}
820 }
821
822 if self.max_poll_records == 0 {
823 return Err(ConnectorError::ConfigurationError(
824 "max.poll.records must be > 0".into(),
825 ));
826 }
827
828 if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
829 return Err(ConnectorError::ConfigurationError(
830 "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
831 .into(),
832 ));
833 }
834
835 if let Some(mechanism) = &self.sasl_mechanism {
836 if mechanism.requires_credentials()
837 && (self.sasl_username.is_none() || self.sasl_password.is_none())
838 {
839 return Err(ConnectorError::ConfigurationError(format!(
840 "sasl.username and sasl.password are required for {mechanism} mechanism"
841 )));
842 }
843 }
844
845 if self.security_protocol.uses_ssl() {
846 if let Some(ref ca) = self.ssl_ca_location {
847 if ca.is_empty() {
848 return Err(ConnectorError::ConfigurationError(
849 "ssl.ca.location cannot be empty when specified".into(),
850 ));
851 }
852 }
853 }
854
855 if self.backpressure_high_watermark <= self.backpressure_low_watermark {
856 return Err(ConnectorError::ConfigurationError(
857 "backpressure.high.watermark must be > backpressure.low.watermark".into(),
858 ));
859 }
860 if !(0.0..=1.0).contains(&self.backpressure_high_watermark) {
861 return Err(ConnectorError::ConfigurationError(
862 "backpressure.high.watermark must be between 0.0 and 1.0".into(),
863 ));
864 }
865 if !(0.0..=1.0).contains(&self.backpressure_low_watermark) {
866 return Err(ConnectorError::ConfigurationError(
867 "backpressure.low.watermark must be between 0.0 and 1.0".into(),
868 ));
869 }
870
871 if self.format == Format::Avro && self.schema_registry_url.is_none() {
872 return Err(ConnectorError::ConfigurationError(
873 "schema.registry.url is required when format is 'avro'".into(),
874 ));
875 }
876
877 Ok(())
878 }
879
880 #[must_use]
882 pub fn to_rdkafka_config(&self) -> ClientConfig {
883 let mut config = ClientConfig::new();
884
885 config.set("bootstrap.servers", &self.bootstrap_servers);
886 config.set("group.id", &self.group_id);
887 config.set("enable.auto.commit", "false");
888 config.set("auto.offset.reset", self.auto_offset_reset.as_rdkafka_str());
889 config.set(
890 "partition.assignment.strategy",
891 self.partition_assignment_strategy.as_rdkafka_str(),
892 );
893 config.set("security.protocol", self.security_protocol.as_rdkafka_str());
894
895 if let Some(ref mechanism) = self.sasl_mechanism {
896 config.set("sasl.mechanism", mechanism.as_rdkafka_str());
897 }
898 if let Some(ref username) = self.sasl_username {
899 config.set("sasl.username", username);
900 }
901 if let Some(ref password) = self.sasl_password {
902 config.set("sasl.password", password);
903 }
904 if let Some(ref ca) = self.ssl_ca_location {
905 config.set("ssl.ca.location", ca);
906 }
907 if let Some(ref cert) = self.ssl_certificate_location {
908 config.set("ssl.certificate.location", cert);
909 }
910 if let Some(ref key) = self.ssl_key_location {
911 config.set("ssl.key.location", key);
912 }
913 if let Some(ref key_pass) = self.ssl_key_password {
914 config.set("ssl.key.password", key_pass);
915 }
916
917 config.set("isolation.level", self.isolation_level.as_rdkafka_str());
918
919 if let Some(fetch_min) = self.fetch_min_bytes {
920 config.set("fetch.min.bytes", fetch_min.to_string());
921 }
922
923 if let Some(fetch_max) = self.fetch_max_bytes {
924 config.set("fetch.max.bytes", fetch_max.to_string());
925 }
926
927 if let Some(wait_ms) = self.fetch_max_wait_ms {
928 config.set("fetch.wait.max.ms", wait_ms.to_string());
929 }
930
931 if let Some(partition_max) = self.max_partition_fetch_bytes {
932 config.set("max.partition.fetch.bytes", partition_max.to_string());
933 }
934
935 for (key, value) in &self.kafka_properties {
938 if is_blocked_passthrough_key(key) {
939 tracing::warn!(
940 key,
941 "ignoring kafka.* pass-through property that overrides a security setting"
942 );
943 continue;
944 }
945 config.set(key, value);
946 }
947
948 config
949 }
950}
951
/// Returns `true` when `key` must not be supplied through the `kafka.*`
/// pass-through properties.
///
/// A key is blocked when it is in the fixed list below or starts with
/// `sasl.kerberos.`. Matching is exact and case-sensitive.
fn is_blocked_passthrough_key(key: &str) -> bool {
    const BLOCKED_PREFIX: &str = "sasl.kerberos.";
    const BLOCKED_KEYS: &[&str] = &[
        "security.protocol",
        "sasl.mechanism",
        "sasl.username",
        "sasl.password",
        "sasl.oauthbearer.config",
        "ssl.ca.location",
        "ssl.certificate.location",
        "ssl.key.location",
        "ssl.key.password",
        "ssl.endpoint.identification.algorithm",
        "enable.auto.commit",
        "enable.idempotence",
    ];

    BLOCKED_KEYS.contains(&key) || key.starts_with(BLOCKED_PREFIX)
}
971
972fn parse_specific_offsets(s: &str) -> Result<HashMap<i32, i64>, ConnectorError> {
976 let mut offsets = HashMap::new();
977
978 for pair in s.split(',') {
979 let pair = pair.trim();
980 if pair.is_empty() {
981 continue;
982 }
983
984 let parts: Vec<&str> = pair.split(':').collect();
985 if parts.len() != 2 {
986 return Err(ConnectorError::ConfigurationError(format!(
987 "invalid offset format '{pair}' (expected 'partition:offset')"
988 )));
989 }
990
991 let partition: i32 = parts[0].trim().parse().map_err(|_| {
992 ConnectorError::ConfigurationError(format!(
993 "invalid partition number '{}' in '{pair}'",
994 parts[0]
995 ))
996 })?;
997
998 let offset: i64 = parts[1].trim().parse().map_err(|_| {
999 ConnectorError::ConfigurationError(format!("invalid offset '{}' in '{pair}'", parts[1]))
1000 })?;
1001
1002 offsets.insert(partition, offset);
1003 }
1004
1005 if offsets.is_empty() {
1006 return Err(ConnectorError::ConfigurationError(
1007 "startup.specific.offsets cannot be empty".into(),
1008 ));
1009 }
1010
1011 Ok(offsets)
1012}
1013
1014#[cfg(test)]
1015mod tests {
1016 use super::*;
1017
1018 fn make_config(extra: &[(&str, &str)]) -> ConnectorConfig {
1019 let mut config = ConnectorConfig::new("kafka");
1020 config.set("bootstrap.servers", "localhost:9092");
1021 config.set("group.id", "test-group");
1022 config.set("topic", "events");
1023 for (k, v) in extra {
1024 config.set(*k, *v);
1025 }
1026 config
1027 }
1028
1029 #[test]
1030 #[allow(deprecated)]
1031 fn test_parse_required_fields() {
1032 let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
1033 assert_eq!(cfg.bootstrap_servers, "localhost:9092");
1034 assert_eq!(cfg.group_id, "test-group");
1035 assert_eq!(cfg.subscription.topics().unwrap(), &["events"]);
1036 assert!(matches!(
1037 cfg.subscription,
1038 TopicSubscription::Topics(ref t) if t == &["events"]
1039 ));
1040 }
1041
1042 #[test]
1043 fn test_parse_missing_required() {
1044 let config = ConnectorConfig::new("kafka");
1045 assert!(KafkaSourceConfig::from_config(&config).is_err());
1046 }
1047
1048 #[test]
1049 #[allow(deprecated)]
1050 fn test_parse_multi_topic() {
1051 let cfg = KafkaSourceConfig::from_config(&make_config(&[("topic", "a, b, c")])).unwrap();
1052 assert_eq!(cfg.subscription.topics().unwrap(), &["a", "b", "c"]);
1053 assert!(matches!(
1054 cfg.subscription,
1055 TopicSubscription::Topics(ref t) if t == &["a", "b", "c"]
1056 ));
1057 }
1058
1059 #[test]
1060 fn test_parse_topic_pattern() {
1061 let mut config = ConnectorConfig::new("kafka");
1062 config.set("bootstrap.servers", "localhost:9092");
1063 config.set("group.id", "test-group");
1064 config.set("topic.pattern", "events-.*");
1065
1066 let cfg = KafkaSourceConfig::from_config(&config).unwrap();
1067 assert!(matches!(
1068 cfg.subscription,
1069 TopicSubscription::Pattern(ref p) if p == "events-.*"
1070 ));
1071 assert!(cfg.subscription.is_pattern());
1072 assert_eq!(cfg.subscription.pattern(), Some("events-.*"));
1073 assert!(cfg.subscription.topics().is_none());
1074 }
1075
1076 #[test]
1077 fn test_parse_defaults() {
1078 let cfg = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();
1079 assert_eq!(cfg.format, Format::Json);
1080 assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
1081 assert_eq!(cfg.isolation_level, IsolationLevel::ReadCommitted);
1082 assert_eq!(cfg.max_poll_records, 1000);
1083 assert_eq!(cfg.partition_assignment_strategy, AssignmentStrategy::Range);
1084 assert!(!cfg.include_metadata);
1085 assert!(!cfg.include_headers);
1086 assert!(cfg.schema_registry_url.is_none());
1087 assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
1088 assert!(cfg.sasl_mechanism.is_none());
1089 assert_eq!(cfg.broker_commit_interval, Duration::from_secs(60));
1090 }
1091
1092 #[test]
1093 fn test_parse_broker_commit_interval() {
1094 let cfg =
1095 KafkaSourceConfig::from_config(&make_config(&[("broker.commit.interval.ms", "5000")]))
1096 .unwrap();
1097 assert_eq!(cfg.broker_commit_interval, Duration::from_millis(5000));
1098 }
1099
1100 #[test]
1101 fn test_parse_broker_commit_interval_zero_disables() {
1102 let cfg =
1103 KafkaSourceConfig::from_config(&make_config(&[("broker.commit.interval.ms", "0")]))
1104 .unwrap();
1105 assert!(cfg.broker_commit_interval.is_zero());
1106 }
1107
1108 #[test]
1109 fn test_parse_optional_fields() {
1110 let cfg = KafkaSourceConfig::from_config(&make_config(&[
1111 ("format", "csv"),
1112 ("auto.offset.reset", "latest"),
1113 ("max.poll.records", "500"),
1114 ("include.metadata", "true"),
1115 ("include.headers", "true"),
1116 ("event.time.column", "ts"),
1117 ("partition.assignment.strategy", "roundrobin"),
1118 ("isolation.level", "read_uncommitted"),
1119 ]))
1120 .unwrap();
1121
1122 assert_eq!(cfg.format, Format::Csv);
1123 assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
1124 assert_eq!(cfg.isolation_level, IsolationLevel::ReadUncommitted);
1125 assert_eq!(cfg.max_poll_records, 500);
1126 assert!(cfg.include_metadata);
1127 assert!(cfg.include_headers);
1128 assert_eq!(cfg.event_time_column, Some("ts".to_string()));
1129 assert_eq!(
1130 cfg.partition_assignment_strategy,
1131 AssignmentStrategy::RoundRobin
1132 );
1133 }
1134
1135 #[test]
1136 fn test_parse_security_sasl_ssl() {
1137 let cfg = KafkaSourceConfig::from_config(&make_config(&[
1138 ("security.protocol", "sasl_ssl"),
1139 ("sasl.mechanism", "SCRAM-SHA-256"),
1140 ("sasl.username", "alice"),
1141 ("sasl.password", "secret"),
1142 ("ssl.ca.location", "/etc/ssl/ca.pem"),
1143 ]))
1144 .unwrap();
1145
1146 assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
1147 assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha256));
1148 assert_eq!(cfg.sasl_username, Some("alice".to_string()));
1149 assert_eq!(cfg.sasl_password, Some("secret".to_string()));
1150 assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
1151 }
1152
1153 #[test]
1154 fn test_parse_security_ssl_only() {
1155 let cfg = KafkaSourceConfig::from_config(&make_config(&[
1156 ("security.protocol", "ssl"),
1157 ("ssl.ca.location", "/etc/ssl/ca.pem"),
1158 ("ssl.certificate.location", "/etc/ssl/client.pem"),
1159 ("ssl.key.location", "/etc/ssl/client.key"),
1160 ("ssl.key.password", "keypass"),
1161 ]))
1162 .unwrap();
1163
1164 assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
1165 assert!(cfg.security_protocol.uses_ssl());
1166 assert!(!cfg.security_protocol.uses_sasl());
1167 assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
1168 assert_eq!(
1169 cfg.ssl_certificate_location,
1170 Some("/etc/ssl/client.pem".to_string())
1171 );
1172 assert_eq!(
1173 cfg.ssl_key_location,
1174 Some("/etc/ssl/client.key".to_string())
1175 );
1176 assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
1177 }
1178
/// Checks that the fetch-tuning options are parsed into their numeric fields.
#[test]
fn test_parse_fetch_tuning() {
    let options = [
        ("fetch.min.bytes", "1024"),
        ("fetch.max.bytes", "52428800"),
        ("fetch.max.wait.ms", "500"),
        ("max.partition.fetch.bytes", "1048576"),
    ];
    let parsed = KafkaSourceConfig::from_config(&make_config(&options)).unwrap();

    assert_eq!(parsed.fetch_min_bytes, Some(1024), "fetch.min.bytes");
    assert_eq!(parsed.fetch_max_bytes, Some(52_428_800), "fetch.max.bytes");
    assert_eq!(parsed.fetch_max_wait_ms, Some(500), "fetch.max.wait.ms");
    assert_eq!(
        parsed.max_partition_fetch_bytes,
        Some(1_048_576),
        "max.partition.fetch.bytes"
    );
}
1194
/// Checks that `kafka.`-prefixed options end up in `kafka_properties` with
/// the prefix removed, and that every supplied pass-through option is present.
#[test]
fn test_parse_kafka_passthrough() {
    let cfg = KafkaSourceConfig::from_config(&make_config(&[
        ("kafka.session.timeout.ms", "30000"),
        ("kafka.max.poll.interval.ms", "300000"),
    ]))
    .unwrap();

    assert_eq!(cfg.kafka_properties.len(), 2);

    // Check both entries rather than only the first, so a dropped or
    // mis-keyed property cannot hide behind the length assertion.
    for (key, value) in [
        ("session.timeout.ms", "30000"),
        ("max.poll.interval.ms", "300000"),
    ] {
        assert_eq!(
            cfg.kafka_properties.get(key).map(String::as_str),
            Some(value),
            "pass-through property `{key}`"
        );
    }
}
1209
/// Checks that the schema-registry options (URL, basic-auth credentials,
/// compatibility level and CA location) are parsed alongside the Avro format.
#[test]
fn test_parse_schema_registry() {
    let cfg = KafkaSourceConfig::from_config(&make_config(&[
        ("format", "avro"),
        ("schema.registry.url", "http://localhost:8081"),
        ("schema.registry.username", "user"),
        ("schema.registry.password", "pass"),
        ("schema.compatibility", "FULL_TRANSITIVE"),
        ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
    ]))
    .unwrap();

    assert_eq!(cfg.format, Format::Avro);
    assert_eq!(
        cfg.schema_registry_url.as_deref(),
        Some("http://localhost:8081")
    );

    // `expect` already fails the test with a clear message when auth is
    // missing, so no separate `is_some()` assertion is needed. Borrowing via
    // `as_ref` avoids moving the field out of `cfg`.
    let auth = cfg
        .schema_registry_auth
        .as_ref()
        .expect("schema registry auth should be parsed when username and password are set");
    assert_eq!(auth.username, "user");
    assert_eq!(auth.password, "pass");

    assert_eq!(
        cfg.schema_compatibility,
        Some(CompatibilityLevel::FullTransitive)
    );
    assert_eq!(
        cfg.schema_registry_ssl_ca_location.as_deref(),
        Some("/etc/ssl/sr-ca.pem")
    );
}
1240
/// Checks that supplying a schema-registry username without a password is
/// rejected when building the source config.
#[test]
fn test_parse_sr_auth_partial() {
    let username_only = make_config(&[
        ("schema.registry.url", "http://localhost:8081"),
        ("schema.registry.username", "user"),
    ]);

    let outcome = KafkaSourceConfig::from_config(&username_only);
    assert!(outcome.is_err());
}
1250
/// Checks that `validate` fails for an Avro-format config that sets only the
/// connection fields (no schema-registry settings are supplied here).
#[test]
fn test_validate_avro_without_sr() {
    // Struct-update syntax replaces field-by-field mutation of a default
    // value (clippy::field_reassign_with_default).
    let cfg = KafkaSourceConfig {
        bootstrap_servers: "localhost:9092".into(),
        group_id: "g".into(),
        subscription: TopicSubscription::Topics(vec!["t".into()]),
        format: Format::Avro,
        ..KafkaSourceConfig::default()
    };

    assert!(cfg.validate().is_err());
}
1261
/// Checks that `validate` fails when the backpressure high watermark (0.3) is
/// set below the low watermark (0.5).
#[test]
fn test_validate_backpressure_watermarks() {
    // Struct-update syntax replaces field-by-field mutation of a default
    // value (clippy::field_reassign_with_default).
    let cfg = KafkaSourceConfig {
        bootstrap_servers: "localhost:9092".into(),
        group_id: "g".into(),
        subscription: TopicSubscription::Topics(vec!["t".into()]),
        backpressure_high_watermark: 0.3,
        backpressure_low_watermark: 0.5,
        ..KafkaSourceConfig::default()
    };

    assert!(cfg.validate().is_err());
}
1272
/// Checks that `validate` fails when a SASL security protocol is selected
/// but no SASL mechanism is set on the config.
#[test]
fn test_validate_sasl_without_mechanism() {
    // Struct-update syntax replaces field-by-field mutation of a default
    // value (clippy::field_reassign_with_default).
    let cfg = KafkaSourceConfig {
        bootstrap_servers: "localhost:9092".into(),
        group_id: "g".into(),
        subscription: TopicSubscription::Topics(vec!["t".into()]),
        security_protocol: SecurityProtocol::SaslPlaintext,
        ..KafkaSourceConfig::default()
    };

    assert!(cfg.validate().is_err());
}
1283
/// Checks that `validate` fails when the PLAIN SASL mechanism is selected
/// but no username or password is set on the config.
#[test]
fn test_validate_sasl_plain_without_credentials() {
    // Struct-update syntax replaces field-by-field mutation of a default
    // value (clippy::field_reassign_with_default).
    let cfg = KafkaSourceConfig {
        bootstrap_servers: "localhost:9092".into(),
        group_id: "g".into(),
        subscription: TopicSubscription::Topics(vec!["t".into()]),
        security_protocol: SecurityProtocol::SaslPlaintext,
        sasl_mechanism: Some(SaslMechanism::Plain),
        ..KafkaSourceConfig::default()
    };

    assert!(cfg.validate().is_err());
}
1295
/// Checks that `validate` fails when the subscription is a topic pattern
/// whose pattern string is empty.
#[test]
fn test_validate_empty_topic_pattern() {
    // Struct-update syntax replaces field-by-field mutation of a default
    // value (clippy::field_reassign_with_default).
    let cfg = KafkaSourceConfig {
        bootstrap_servers: "localhost:9092".into(),
        group_id: "g".into(),
        subscription: TopicSubscription::Pattern(String::new()),
        ..KafkaSourceConfig::default()
    };

    assert!(cfg.validate().is_err());
}
1304
/// Checks the rdkafka settings generated from a parsed config: connection
/// fields, disabled auto-commit, the offset reset policy, a `kafka.`
/// pass-through option, and the security/isolation values expected when
/// those options are not supplied.
#[test]
fn test_rdkafka_config() {
    let options = [
        ("auto.offset.reset", "latest"),
        ("kafka.fetch.min.bytes", "1024"),
    ];
    let parsed = KafkaSourceConfig::from_config(&make_config(&options)).unwrap();
    let rdkafka = parsed.to_rdkafka_config();

    let expected = [
        ("bootstrap.servers", "localhost:9092"),
        ("group.id", "test-group"),
        ("enable.auto.commit", "false"),
        ("auto.offset.reset", "latest"),
        ("fetch.min.bytes", "1024"),
        ("security.protocol", "plaintext"),
        ("isolation.level", "read_committed"),
    ];
    for (key, value) in expected {
        assert_eq!(rdkafka.get(key), Some(value), "rdkafka setting `{key}`");
    }
}
1322
/// Checks that SASL_SSL security options are forwarded to the generated
/// rdkafka settings.
#[test]
fn test_rdkafka_config_with_security() {
    let options = [
        ("security.protocol", "sasl_ssl"),
        ("sasl.mechanism", "PLAIN"),
        ("sasl.username", "user"),
        ("sasl.password", "pass"),
        ("ssl.ca.location", "/ca.pem"),
    ];
    let parsed = KafkaSourceConfig::from_config(&make_config(&options)).unwrap();
    let rdkafka = parsed.to_rdkafka_config();

    let expected = [
        ("security.protocol", "sasl_ssl"),
        ("sasl.mechanism", "PLAIN"),
        ("sasl.username", "user"),
        ("sasl.password", "pass"),
        ("ssl.ca.location", "/ca.pem"),
    ];
    for (key, value) in expected {
        assert_eq!(rdkafka.get(key), Some(value), "rdkafka setting `{key}`");
    }
}
1341
/// Checks that fetch-tuning and isolation-level options are forwarded to the
/// generated rdkafka settings; `fetch.max.wait.ms` is expected under the
/// rdkafka key `fetch.wait.max.ms`.
#[test]
fn test_rdkafka_config_with_fetch_tuning() {
    let options = [
        ("fetch.min.bytes", "1024"),
        ("fetch.max.bytes", "1048576"),
        ("fetch.max.wait.ms", "500"),
        ("max.partition.fetch.bytes", "262144"),
        ("isolation.level", "read_uncommitted"),
    ];
    let parsed = KafkaSourceConfig::from_config(&make_config(&options)).unwrap();
    let rdkafka = parsed.to_rdkafka_config();

    let expected = [
        ("fetch.min.bytes", "1024"),
        ("fetch.max.bytes", "1048576"),
        ("fetch.wait.max.ms", "500"),
        ("max.partition.fetch.bytes", "262144"),
        ("isolation.level", "read_uncommitted"),
    ];
    for (key, value) in expected {
        assert_eq!(rdkafka.get(key), Some(value), "rdkafka setting `{key}`");
    }
}
1360
/// Checks string parsing of `OffsetReset`: each accepted spelling and one
/// rejected value.
#[test]
fn test_offset_reset_parsing() {
    let accepted = [
        ("earliest", OffsetReset::Earliest),
        ("latest", OffsetReset::Latest),
        ("none", OffsetReset::None),
    ];
    for (text, expected) in accepted {
        let parsed: OffsetReset = text.parse().unwrap();
        assert_eq!(parsed, expected, "input `{text}`");
    }

    assert!("invalid".parse::<OffsetReset>().is_err());
}
1374
/// Checks string parsing of `CompatibilityLevel`, including upper- and
/// lower-case spellings, and one rejected value.
#[test]
fn test_compatibility_level_parsing() {
    let accepted = [
        ("BACKWARD", CompatibilityLevel::Backward),
        ("full_transitive", CompatibilityLevel::FullTransitive),
        ("NONE", CompatibilityLevel::None),
    ];
    for (text, expected) in accepted {
        let parsed: CompatibilityLevel = text.parse().unwrap();
        assert_eq!(parsed, expected, "input `{text}`");
    }

    assert!("invalid".parse::<CompatibilityLevel>().is_err());
}
1391
/// Checks string parsing of `SecurityProtocol`: lower-case, upper-case and
/// hyphenated spellings, plus one rejected value.
#[test]
fn test_security_protocol_parsing() {
    let accepted = [
        ("plaintext", SecurityProtocol::Plaintext),
        ("SSL", SecurityProtocol::Ssl),
        ("sasl_plaintext", SecurityProtocol::SaslPlaintext),
        ("SASL_SSL", SecurityProtocol::SaslSsl),
        ("sasl-ssl", SecurityProtocol::SaslSsl),
    ];
    for (text, expected) in accepted {
        let parsed: SecurityProtocol = text.parse().unwrap();
        assert_eq!(parsed, expected, "input `{text}`");
    }

    assert!("invalid".parse::<SecurityProtocol>().is_err());
}
1416
/// Checks string parsing of `SaslMechanism`: the canonical upper-case names,
/// a lower-case underscore spelling, and one rejected value.
#[test]
fn test_sasl_mechanism_parsing() {
    let accepted = [
        ("PLAIN", SaslMechanism::Plain),
        ("SCRAM-SHA-256", SaslMechanism::ScramSha256),
        ("scram_sha_512", SaslMechanism::ScramSha512),
        ("GSSAPI", SaslMechanism::Gssapi),
        ("OAUTHBEARER", SaslMechanism::Oauthbearer),
    ];
    for (text, expected) in accepted {
        let parsed: SaslMechanism = text.parse().unwrap();
        assert_eq!(parsed, expected, "input `{text}`");
    }

    assert!("invalid".parse::<SaslMechanism>().is_err());
}
1441
/// Checks string parsing of `IsolationLevel`: underscore and hyphenated
/// spellings, plus one rejected value.
#[test]
fn test_isolation_level_parsing() {
    let accepted = [
        ("read_uncommitted", IsolationLevel::ReadUncommitted),
        ("read_committed", IsolationLevel::ReadCommitted),
        ("read-committed", IsolationLevel::ReadCommitted),
    ];
    for (text, expected) in accepted {
        let parsed: IsolationLevel = text.parse().unwrap();
        assert_eq!(parsed, expected, "input `{text}`");
    }

    assert!("invalid".parse::<IsolationLevel>().is_err());
}
1458
/// Checks the `topics`, `pattern` and `is_pattern` accessors for both
/// `TopicSubscription` variants.
#[test]
fn test_topic_subscription_accessors() {
    // Explicit topic list: only `topics()` yields a value.
    let names = vec!["a".to_string(), "b".to_string()];
    let by_list = TopicSubscription::Topics(names.clone());
    assert_eq!(by_list.topics(), Some(names.as_slice()));
    assert!(by_list.pattern().is_none());
    assert!(!by_list.is_pattern());

    // Pattern subscription: only `pattern()` yields a value.
    let by_pattern = TopicSubscription::Pattern("events-.*".into());
    assert!(by_pattern.topics().is_none());
    assert_eq!(by_pattern.pattern(), Some("events-.*"));
    assert!(by_pattern.is_pattern());
}
1474
/// Checks `uses_ssl` and `uses_sasl` for every `SecurityProtocol` variant.
#[test]
fn test_security_protocol_helpers() {
    // (protocol, expected uses_ssl, expected uses_sasl)
    let matrix = [
        (SecurityProtocol::Plaintext, false, false),
        (SecurityProtocol::Ssl, true, false),
        (SecurityProtocol::SaslPlaintext, false, true),
        (SecurityProtocol::SaslSsl, true, true),
    ];
    for (protocol, ssl, sasl) in matrix {
        assert_eq!(protocol.uses_ssl(), ssl, "{protocol:?}.uses_ssl()");
        assert_eq!(protocol.uses_sasl(), sasl, "{protocol:?}.uses_sasl()");
    }
}
1489
/// Checks `requires_credentials` for every `SaslMechanism` variant.
#[test]
fn test_sasl_mechanism_helpers() {
    // (mechanism, expected requires_credentials)
    let matrix = [
        (SaslMechanism::Plain, true),
        (SaslMechanism::ScramSha256, true),
        (SaslMechanism::ScramSha512, true),
        (SaslMechanism::Gssapi, false),
        (SaslMechanism::Oauthbearer, false),
    ];
    for (mechanism, needs_credentials) in matrix {
        assert_eq!(
            mechanism.requires_credentials(),
            needs_credentials,
            "{mechanism:?}.requires_credentials()"
        );
    }
}
1498
/// Checks the `Display` output of one variant from each of
/// `SecurityProtocol`, `SaslMechanism` and `IsolationLevel`.
#[test]
fn test_enum_display() {
    let protocol = SecurityProtocol::SaslSsl.to_string();
    let mechanism = SaslMechanism::ScramSha256.to_string();
    let isolation = IsolationLevel::ReadCommitted.to_string();

    assert_eq!(protocol, "sasl_ssl");
    assert_eq!(mechanism, "SCRAM-SHA-256");
    assert_eq!(isolation, "read_committed");
}
1505
/// Checks string parsing of `StartupMode`: hyphenated and underscore
/// spellings of group offsets, `earliest`, `latest`, and one rejected value.
#[test]
fn test_startup_mode_parsing() {
    let accepted = [
        ("group-offsets", StartupMode::GroupOffsets),
        ("group_offsets", StartupMode::GroupOffsets),
        ("earliest", StartupMode::Earliest),
        ("latest", StartupMode::Latest),
    ];
    for (text, expected) in accepted {
        let parsed: StartupMode = text.parse().unwrap();
        assert_eq!(parsed, expected, "input `{text}`");
    }

    assert!("invalid".parse::<StartupMode>().is_err());
}
1526
/// Checks the `Display` output of `StartupMode`: exact text for the simple
/// variants, and the key substring for the data-carrying variants.
#[test]
fn test_startup_mode_display() {
    let simple = [
        (StartupMode::GroupOffsets, "group-offsets"),
        (StartupMode::Earliest, "earliest"),
        (StartupMode::Latest, "latest"),
    ];
    for (mode, expected) in simple {
        assert_eq!(mode.to_string(), expected);
    }

    // Specific offsets are summarized by partition count.
    let by_offsets = StartupMode::SpecificOffsets(HashMap::from([(0, 100), (1, 200)]));
    let rendered = by_offsets.to_string();
    assert!(rendered.contains("2 partitions"));

    // Timestamp mode includes the raw timestamp value.
    let rendered = StartupMode::Timestamp(1_234_567_890_000).to_string();
    assert!(rendered.contains("1234567890000"));
}
1539
/// Checks `overrides_offset_reset` and `as_offset_reset` for every
/// `StartupMode` variant.
#[test]
fn test_startup_mode_helpers() {
    // Only group offsets leaves the offset reset policy alone.
    assert!(!StartupMode::GroupOffsets.overrides_offset_reset());
    let overriding = [
        StartupMode::Earliest,
        StartupMode::Latest,
        StartupMode::SpecificOffsets(HashMap::new()),
        StartupMode::Timestamp(0),
    ];
    for mode in overriding {
        assert!(mode.overrides_offset_reset());
    }

    // Only `Earliest` and `Latest` map to an offset reset string.
    let reset_strings = [
        (StartupMode::GroupOffsets, None),
        (StartupMode::Earliest, Some("earliest")),
        (StartupMode::Latest, Some("latest")),
        (StartupMode::SpecificOffsets(HashMap::new()), None),
        (StartupMode::Timestamp(0), None),
    ];
    for (mode, expected) in reset_strings {
        assert_eq!(mode.as_offset_reset(), expected);
    }
}
1556
/// Checks that a comma-separated `partition:offset` list is parsed into a
/// map containing each pair.
#[test]
fn test_parse_specific_offsets() {
    let parsed = parse_specific_offsets("0:100,1:200,2:300").unwrap();

    for (partition, offset) in [(0, 100), (1, 200), (2, 300)] {
        assert_eq!(parsed.get(&partition), Some(&offset));
    }
}
1564
/// Checks that whitespace around entries in the `partition:offset` list is
/// tolerated.
#[test]
fn test_parse_specific_offsets_with_spaces() {
    let parsed = parse_specific_offsets(" 0:100 , 1:200 ").unwrap();

    for (partition, offset) in [(0, 100), (1, 200)] {
        assert_eq!(parsed.get(&partition), Some(&offset));
    }
}
1571
/// Checks that malformed `partition:offset` lists are rejected: empty input,
/// a missing offset, non-numeric parts, and an extra `:` segment.
#[test]
fn test_parse_specific_offsets_errors() {
    let malformed = ["", "0", "0:abc", "abc:100", "0:100:extra"];

    for input in malformed {
        assert!(
            parse_specific_offsets(input).is_err(),
            "input `{input}` should be rejected"
        );
    }
}
1580
/// Checks that the `startup.mode` option selects the matching `StartupMode`
/// when building the source config.
#[test]
fn test_parse_startup_mode_from_config() {
    let cases = [
        ("earliest", StartupMode::Earliest),
        ("latest", StartupMode::Latest),
        ("group-offsets", StartupMode::GroupOffsets),
    ];

    for (value, expected) in cases {
        let parsed =
            KafkaSourceConfig::from_config(&make_config(&[("startup.mode", value)])).unwrap();
        assert_eq!(parsed.startup_mode, expected, "startup.mode = `{value}`");
    }
}
1596
/// Checks that the `startup.specific.offsets` option yields
/// `StartupMode::SpecificOffsets` holding exactly the supplied pairs.
#[test]
fn test_parse_startup_specific_offsets_from_config() {
    let cfg = KafkaSourceConfig::from_config(&make_config(&[(
        "startup.specific.offsets",
        "0:100,1:200",
    )]))
    .unwrap();

    // Comparing against the full expected value checks the variant and the
    // whole map at once, and a mismatch prints the actual startup mode
    // instead of a bare "expected SpecificOffsets" panic.
    assert_eq!(
        cfg.startup_mode,
        StartupMode::SpecificOffsets(HashMap::from([(0, 100), (1, 200)]))
    );
}
1613
/// Checks that the `startup.timestamp.ms` option yields
/// `StartupMode::Timestamp` with the parsed value.
#[test]
fn test_parse_startup_timestamp_from_config() {
    let options = [("startup.timestamp.ms", "1234567890")];
    let parsed = KafkaSourceConfig::from_config(&make_config(&options)).unwrap();

    assert_eq!(parsed.startup_mode, StartupMode::Timestamp(1_234_567_890));
}
1622
/// Checks that the schema-registry SSL options (CA, certificate and key
/// locations) are parsed into their dedicated fields.
#[test]
fn test_parse_schema_registry_ssl_fields() {
    let options = [
        ("schema.registry.url", "https://sr:8081"),
        ("schema.registry.ssl.ca.location", "/ca.pem"),
        ("schema.registry.ssl.certificate.location", "/cert.pem"),
        ("schema.registry.ssl.key.location", "/key.pem"),
    ];
    let parsed = KafkaSourceConfig::from_config(&make_config(&options)).unwrap();

    assert_eq!(
        parsed.schema_registry_ssl_ca_location.as_deref(),
        Some("/ca.pem")
    );
    assert_eq!(
        parsed.schema_registry_ssl_certificate_location.as_deref(),
        Some("/cert.pem")
    );
    assert_eq!(
        parsed.schema_registry_ssl_key_location.as_deref(),
        Some("/key.pem")
    );
}
1646
/// Checks the watermark and alignment values of a config built without any
/// extra options.
#[test]
fn test_parse_watermark_defaults() {
    let parsed = KafkaSourceConfig::from_config(&make_config(&[])).unwrap();

    // Watermark timing and tracking.
    let five_seconds = Duration::from_secs(5);
    let thirty_seconds = Duration::from_secs(30);
    assert_eq!(parsed.max_out_of_orderness, five_seconds);
    assert_eq!(parsed.idle_timeout, thirty_seconds);
    assert!(!parsed.enable_watermark_tracking);

    // Alignment settings are all unset.
    assert!(parsed.alignment_group_id.is_none());
    assert!(parsed.alignment_max_drift.is_none());
    assert!(parsed.alignment_mode.is_none());
}
1657
/// Checks that watermark tracking can be enabled and that the
/// millisecond-valued timing options are parsed into durations.
#[test]
fn test_parse_watermark_tracking_enabled() {
    let options = [
        ("enable.watermark.tracking", "true"),
        ("max.out.of.orderness.ms", "10000"),
        ("idle.timeout.ms", "60000"),
    ];
    let parsed = KafkaSourceConfig::from_config(&make_config(&options)).unwrap();

    assert!(parsed.enable_watermark_tracking);

    // 10000 ms and 60000 ms, expressed in whole seconds.
    assert_eq!(parsed.max_out_of_orderness, Duration::from_secs(10));
    assert_eq!(parsed.idle_timeout, Duration::from_secs(60));
}
1671
/// Checks that the alignment options (group id, max drift and mode) are
/// parsed into their optional fields.
#[test]
fn test_parse_alignment_config() {
    use crate::kafka::KafkaAlignmentMode;

    let options = [
        ("alignment.group.id", "orders-payments"),
        ("alignment.max.drift.ms", "300000"),
        ("alignment.mode", "pause"),
    ];
    let parsed = KafkaSourceConfig::from_config(&make_config(&options)).unwrap();

    assert_eq!(parsed.alignment_group_id.as_deref(), Some("orders-payments"));
    // 300000 ms, expressed in whole seconds.
    assert_eq!(parsed.alignment_max_drift, Some(Duration::from_secs(300)));
    assert_eq!(parsed.alignment_mode, Some(KafkaAlignmentMode::Pause));
}
1690
/// Checks that the hyphenated `alignment.mode` values map to the matching
/// `KafkaAlignmentMode` variants.
#[test]
fn test_parse_alignment_mode_variants() {
    use crate::kafka::KafkaAlignmentMode;

    let cases = [
        ("warn-only", KafkaAlignmentMode::WarnOnly),
        ("drop-excess", KafkaAlignmentMode::DropExcess),
    ];

    for (value, expected) in cases {
        let parsed =
            KafkaSourceConfig::from_config(&make_config(&[("alignment.mode", value)])).unwrap();
        assert_eq!(
            parsed.alignment_mode,
            Some(expected),
            "alignment.mode = `{value}`"
        );
    }
}
1704
/// Checks that an unrecognized `alignment.mode` value is rejected when
/// building the source config.
#[test]
fn test_parse_alignment_mode_invalid() {
    let bad_mode = make_config(&[("alignment.mode", "invalid")]);

    let outcome = KafkaSourceConfig::from_config(&bad_mode);
    assert!(outcome.is_err());
}
1710}