1use std::collections::HashMap;
5use std::time::Duration;
6
7use rdkafka::config::ClientConfig;
8
9use crate::config::ConnectorConfig;
10use crate::error::ConnectorError;
11use crate::serde::Format;
12
/// Security protocol selected for the Kafka connection.
///
/// The default is [`SecurityProtocol::Plaintext`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SecurityProtocol {
    /// Neither SSL nor SASL is in use.
    #[default]
    Plaintext,
    /// SSL without SASL.
    Ssl,
    /// SASL without SSL.
    SaslPlaintext,
    /// SASL combined with SSL.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns the string handed to rdkafka as the `security.protocol` value.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Plaintext => "plaintext",
            Self::Ssl => "ssl",
            Self::SaslPlaintext => "sasl_plaintext",
            Self::SaslSsl => "sasl_ssl",
        }
    }

    /// Returns `true` for the two SSL-based variants (`Ssl`, `SaslSsl`).
    #[must_use]
    pub fn uses_ssl(&self) -> bool {
        match *self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// Returns `true` for the two SASL-based variants (`SaslPlaintext`, `SaslSsl`).
    #[must_use]
    pub fn uses_sasl(&self) -> bool {
        match *self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }
}
56
// String-parsing support for `SecurityProtocol`, generated by the project's
// `str_enum!` macro (defined outside this file view). `from_config` depends on
// it through `s.parse::<SecurityProtocol>()`.
// NOTE(review): the `lowercase` argument presumably lower-cases the input
// before matching the literals below — confirm against the macro definition.
str_enum!(fromstr SecurityProtocol, lowercase, ConnectorError, "invalid security.protocol",
    Plaintext => "plaintext";
    Ssl => "ssl";
    SaslPlaintext => "sasl_plaintext";
    SaslSsl => "sasl_ssl"
);
63
64impl std::fmt::Display for SecurityProtocol {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 write!(f, "{}", self.as_rdkafka_str())
67 }
68}
69
/// SASL mechanism used to authenticate against the brokers.
///
/// The default is [`SaslMechanism::Plain`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SaslMechanism {
    /// `PLAIN`.
    #[default]
    Plain,
    /// `SCRAM-SHA-256`.
    ScramSha256,
    /// `SCRAM-SHA-512`.
    ScramSha512,
    /// `GSSAPI`.
    Gssapi,
    /// `OAUTHBEARER`.
    Oauthbearer,
}

impl SaslMechanism {
    /// Returns the string handed to rdkafka as the `sasl.mechanism` value.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Plain => "PLAIN",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::Gssapi => "GSSAPI",
            Self::Oauthbearer => "OAUTHBEARER",
        }
    }

    /// Returns `true` for `Plain` and both SCRAM variants, which
    /// `KafkaSourceConfig::validate` requires a username and password for.
    #[must_use]
    pub fn requires_credentials(&self) -> bool {
        match *self {
            Self::Plain | Self::ScramSha256 | Self::ScramSha512 => true,
            Self::Gssapi | Self::Oauthbearer => false,
        }
    }
}
108
// String-parsing support for `SaslMechanism`, generated by the project's
// `str_enum!` macro (defined outside this file view). `from_config` depends on
// it through `s.parse::<SaslMechanism>()`. Several variants list more than one
// accepted spelling.
// NOTE(review): the `uppercase` argument presumably normalizes the input
// (case, and possibly `-` vs `_`) before matching — confirm against the macro
// definition, since `as_rdkafka_str` emits the dashed form `SCRAM-SHA-256`.
str_enum!(fromstr SaslMechanism, uppercase, ConnectorError, "invalid sasl.mechanism",
    Plain => "PLAIN";
    ScramSha256 => "SCRAM_SHA_256", "SCRAM_SHA256";
    ScramSha512 => "SCRAM_SHA_512", "SCRAM_SHA512";
    Gssapi => "GSSAPI", "KERBEROS";
    Oauthbearer => "OAUTHBEARER", "OAUTH"
);
116
117impl std::fmt::Display for SaslMechanism {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 write!(f, "{}", self.as_rdkafka_str())
120 }
121}
122
/// Consumer isolation level.
///
/// The default is [`IsolationLevel::ReadCommitted`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// `read_uncommitted`.
    ReadUncommitted,
    /// `read_committed`.
    #[default]
    ReadCommitted,
}

impl IsolationLevel {
    /// Returns the string handed to rdkafka as the `isolation.level` value.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::ReadUncommitted => "read_uncommitted",
            Self::ReadCommitted => "read_committed",
        }
    }
}
145
// String-parsing support for `IsolationLevel`, generated by the project's
// `str_enum!` macro (defined outside this file view). `from_config` depends on
// it through `s.parse::<IsolationLevel>()`.
// NOTE(review): the `lowercase` argument presumably lower-cases the input
// before matching — confirm against the macro definition.
str_enum!(fromstr IsolationLevel, lowercase, ConnectorError, "invalid isolation.level",
    ReadUncommitted => "read_uncommitted";
    ReadCommitted => "read_committed"
);
150
151impl std::fmt::Display for IsolationLevel {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 write!(f, "{}", self.as_rdkafka_str())
154 }
155}
156
/// Where the source starts reading from.
///
/// The default is [`StartupMode::GroupOffsets`]. `KafkaSourceConfig::from_config`
/// selects `SpecificOffsets` when `startup.specific.offsets` is set, otherwise
/// `Timestamp` when `startup.timestamp.ms` is set, otherwise it parses
/// `startup.mode`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum StartupMode {
    /// Parsed from `group-offsets` / `group`.
    /// NOTE(review): presumably resumes from the consumer group's committed
    /// offsets — confirm against the consumer implementation.
    #[default]
    GroupOffsets,
    /// Parsed from `earliest`; `from_config` also forces `auto.offset.reset`
    /// to earliest in this mode.
    Earliest,
    /// Parsed from `latest`; `from_config` also forces `auto.offset.reset`
    /// to latest in this mode.
    Latest,
    /// Explicit offsets as produced by `parse_specific_offsets`, which maps a
    /// partition number to an offset.
    SpecificOffsets(HashMap<i32, i64>),
    /// Timestamp taken from `startup.timestamp.ms` (so the unit is milliseconds).
    Timestamp(i64),
}

// NOTE(review): this inherent impl currently defines no methods.
impl StartupMode {}
179
180impl std::str::FromStr for StartupMode {
181 type Err = ConnectorError;
182
183 fn from_str(s: &str) -> Result<Self, Self::Err> {
184 match s.to_lowercase().replace('-', "_").as_str() {
185 "group_offsets" | "group" => Ok(StartupMode::GroupOffsets),
186 "earliest" => Ok(StartupMode::Earliest),
187 "latest" => Ok(StartupMode::Latest),
188 "timestamp" => Err(ConnectorError::ConfigurationError(
189 "use 'startup.timestamp.ms' to start from a timestamp, \
190 not 'startup.mode = timestamp'"
191 .into(),
192 )),
193 other => Err(ConnectorError::ConfigurationError(format!(
194 "invalid startup.mode: '{other}' (expected group-offsets/earliest/latest)"
195 ))),
196 }
197 }
198}
199
200impl std::fmt::Display for StartupMode {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 match self {
203 StartupMode::GroupOffsets => write!(f, "group-offsets"),
204 StartupMode::Earliest => write!(f, "earliest"),
205 StartupMode::Latest => write!(f, "latest"),
206 StartupMode::SpecificOffsets(offsets) => {
207 write!(f, "specific-offsets({} partitions)", offsets.len())
208 }
209 StartupMode::Timestamp(ts) => write!(f, "timestamp({ts})"),
210 }
211 }
212}
213
/// How the source selects the topics it consumes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicSubscription {
    /// An explicit list of topic names.
    Topics(Vec<String>),
    /// A topic pattern string (taken from `topic.pattern`).
    Pattern(String),
}

impl TopicSubscription {
    /// Returns the explicit topic list, or `None` for a pattern subscription.
    #[must_use]
    pub fn topics(&self) -> Option<&[String]> {
        if let TopicSubscription::Topics(names) = self {
            Some(names.as_slice())
        } else {
            None
        }
    }

    /// Returns the pattern, or `None` for an explicit topic list.
    #[must_use]
    pub fn pattern(&self) -> Option<&str> {
        if let TopicSubscription::Pattern(expr) = self {
            Some(expr.as_str())
        } else {
            None
        }
    }

    /// Returns `true` when this is a pattern subscription.
    #[must_use]
    pub fn is_pattern(&self) -> bool {
        self.pattern().is_some()
    }
}

/// The default subscription is an empty explicit topic list.
impl Default for TopicSubscription {
    fn default() -> Self {
        TopicSubscription::Topics(Vec::default())
    }
}
254
/// Value for the consumer's `auto.offset.reset` setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetReset {
    /// `earliest`.
    Earliest,
    /// `latest`.
    Latest,
    /// No automatic reset; sent to rdkafka as `error`.
    None,
}

impl OffsetReset {
    /// Returns the string handed to rdkafka as the `auto.offset.reset` value.
    ///
    /// Note that [`OffsetReset::None`] maps to `"error"`, not `"none"`.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Earliest => "earliest",
            Self::Latest => "latest",
            Self::None => "error",
        }
    }
}
277
// String-parsing support for `OffsetReset`, generated by the project's
// `str_enum!` macro (defined outside this file view). `from_config` depends on
// it through `s.parse::<OffsetReset>()`. Each variant accepts two spellings.
// NOTE(review): `lowercase_nodash` presumably lower-cases the input and
// normalizes dashes before matching — confirm against the macro definition.
str_enum!(fromstr OffsetReset, lowercase_nodash, ConnectorError, "invalid auto.offset.reset",
    Earliest => "earliest", "beginning";
    Latest => "latest", "end";
    None => "none", "error"
);
283
/// Value for the consumer's `partition.assignment.strategy` setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentStrategy {
    /// `range`.
    Range,
    /// `roundrobin`.
    RoundRobin,
    /// `cooperative-sticky`.
    CooperativeSticky,
}

impl AssignmentStrategy {
    /// Returns the string handed to rdkafka as the
    /// `partition.assignment.strategy` value.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Range => "range",
            Self::RoundRobin => "roundrobin",
            Self::CooperativeSticky => "cooperative-sticky",
        }
    }
}
306
// String-parsing support for `AssignmentStrategy`, generated by the project's
// `str_enum!` macro (defined outside this file view). `from_config` depends on
// it through `s.parse::<AssignmentStrategy>()`. Several spellings are accepted
// per variant.
// NOTE(review): `lowercase_nodash` presumably lower-cases the input and
// normalizes dashes before matching — confirm against the macro definition.
str_enum!(fromstr AssignmentStrategy, lowercase_nodash, ConnectorError,
    "invalid partition.assignment.strategy",
    Range => "range";
    RoundRobin => "roundrobin", "round-robin", "round_robin";
    CooperativeSticky => "cooperative-sticky", "cooperative_sticky"
);
313
/// Schema compatibility level (the `schema.compatibility` setting).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityLevel {
    /// `BACKWARD`.
    Backward,
    /// `BACKWARD_TRANSITIVE`.
    BackwardTransitive,
    /// `FORWARD`.
    Forward,
    /// `FORWARD_TRANSITIVE`.
    ForwardTransitive,
    /// `FULL`.
    Full,
    /// `FULL_TRANSITIVE`.
    FullTransitive,
    /// `NONE`.
    None,
}

impl CompatibilityLevel {
    /// Returns the upper-case name of the level.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Backward => "BACKWARD",
            Self::BackwardTransitive => "BACKWARD_TRANSITIVE",
            Self::Forward => "FORWARD",
            Self::ForwardTransitive => "FORWARD_TRANSITIVE",
            Self::Full => "FULL",
            Self::FullTransitive => "FULL_TRANSITIVE",
            Self::None => "NONE",
        }
    }
}
348
// String-parsing support for `CompatibilityLevel`, generated by the project's
// `str_enum!` macro (defined outside this file view). `from_config` depends on
// it through `s.parse::<CompatibilityLevel>()`. The accepted literals match
// the strings returned by `CompatibilityLevel::as_str`.
// NOTE(review): the `uppercase` argument presumably upper-cases the input
// before matching — confirm against the macro definition.
str_enum!(fromstr CompatibilityLevel, uppercase, ConnectorError, "invalid schema.compatibility",
    Backward => "BACKWARD";
    BackwardTransitive => "BACKWARD_TRANSITIVE";
    Forward => "FORWARD";
    ForwardTransitive => "FORWARD_TRANSITIVE";
    Full => "FULL";
    FullTransitive => "FULL_TRANSITIVE";
    None => "NONE"
);
358
359impl std::fmt::Display for CompatibilityLevel {
360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361 write!(f, "{}", self.as_str())
362 }
363}
364
/// Strategy selected by the `schema.evolution.strategy` setting.
///
/// The default is [`SchemaEvolutionStrategy::Log`].
/// NOTE(review): what each variant does when a schema change is observed is
/// implemented outside this file — presumably log / reject / ignore the
/// change; confirm against the consumer of this setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SchemaEvolutionStrategy {
    /// Configured as `log`.
    #[default]
    Log,
    /// Configured as `reject`.
    Reject,
    /// Configured as `ignore`.
    Ignore,
}
376
// String-parsing support for `SchemaEvolutionStrategy`, generated by the
// project's `str_enum!` macro (defined outside this file view). `from_config`
// depends on it through `s.parse::<SchemaEvolutionStrategy>()`.
// NOTE(review): the `lowercase` argument presumably lower-cases the input
// before matching — confirm against the macro definition.
str_enum!(fromstr SchemaEvolutionStrategy, lowercase, ConnectorError,
    "invalid schema.evolution.strategy",
    Log => "log";
    Reject => "reject";
    Ignore => "ignore"
);
383
384impl std::fmt::Display for SchemaEvolutionStrategy {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 match self {
387 SchemaEvolutionStrategy::Log => write!(f, "log"),
388 SchemaEvolutionStrategy::Reject => write!(f, "reject"),
389 SchemaEvolutionStrategy::Ignore => write!(f, "ignore"),
390 }
391 }
392}
393
/// How the schema-registry subject for a record value is named
/// (the `schema.registry.subject.name.strategy` setting).
///
/// The default is [`SubjectNameStrategy::TopicName`]. The subject formats
/// below are the ones produced by `resolve_value_subject`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SubjectNameStrategy {
    /// Subject is `{topic}-value`.
    #[default]
    TopicName,
    /// Subject is `{record_name}-value`; requires `schema.registry.record.name`.
    RecordName,
    /// Subject is `{topic}-{record_name}-value`; requires
    /// `schema.registry.record.name`.
    TopicRecordName,
}
405
// String-parsing support for `SubjectNameStrategy`, generated by the project's
// `str_enum!` macro (defined outside this file view). `from_config` depends on
// it through `s.parse::<SubjectNameStrategy>()`. Each variant accepts a dashed
// form, a compact form, and a `...strategy`-suffixed form.
// NOTE(review): `lowercase_nodash` presumably lower-cases the input and
// normalizes dashes before matching — confirm against the macro definition.
str_enum!(fromstr SubjectNameStrategy, lowercase_nodash, ConnectorError,
    "invalid schema.registry.subject.name.strategy",
    TopicName => "topic-name", "topicname", "topicnamestrategy";
    RecordName => "record-name", "recordname", "recordnamestrategy";
    TopicRecordName => "topic-record-name", "topicrecordname", "topicrecordnamestrategy"
);
412
413impl std::fmt::Display for SubjectNameStrategy {
414 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
415 match self {
416 SubjectNameStrategy::TopicName => write!(f, "topic-name"),
417 SubjectNameStrategy::RecordName => write!(f, "record-name"),
418 SubjectNameStrategy::TopicRecordName => write!(f, "topic-record-name"),
419 }
420 }
421}
422
423pub(crate) fn resolve_value_subject(
427 strategy: SubjectNameStrategy,
428 record_name: Option<&str>,
429 topic: &str,
430) -> String {
431 let name = || record_name.expect("from_config validates record.name");
432 match strategy {
433 SubjectNameStrategy::TopicName => format!("{topic}-value"),
434 SubjectNameStrategy::RecordName => format!("{}-value", name()),
435 SubjectNameStrategy::TopicRecordName => format!("{topic}-{}-value", name()),
436 }
437}
438
439impl From<CompatibilityLevel> for crate::schema::traits::CompatibilityMode {
442 fn from(level: CompatibilityLevel) -> Self {
443 use crate::schema::traits::CompatibilityMode;
444 match level {
445 CompatibilityLevel::Backward => CompatibilityMode::Backward,
446 CompatibilityLevel::BackwardTransitive => CompatibilityMode::BackwardTransitive,
447 CompatibilityLevel::Forward => CompatibilityMode::Forward,
448 CompatibilityLevel::ForwardTransitive => CompatibilityMode::ForwardTransitive,
449 CompatibilityLevel::Full => CompatibilityMode::Full,
450 CompatibilityLevel::FullTransitive => CompatibilityMode::FullTransitive,
451 CompatibilityLevel::None => CompatibilityMode::None,
452 }
453 }
454}
455
/// Username/password pair for the schema registry
/// (`schema.registry.username` / `schema.registry.password`).
#[derive(Clone)]
pub struct SrAuth {
    /// Schema-registry user name.
    pub username: String,
    /// Schema-registry password; never printed by the `Debug` impl.
    pub password: String,
}

/// `Debug` output shows the username but replaces the password with `***`.
impl std::fmt::Debug for SrAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SrAuth");
        out.field("username", &self.username);
        // Fixed placeholder so the secret never reaches logs.
        out.field("password", &"***");
        out.finish()
    }
}
473
/// Full configuration of the Kafka source connector.
///
/// Usually built with `KafkaSourceConfig::from_config`, which reads the
/// configuration keys named on each field below and then runs `validate`.
/// `Debug` is implemented by hand so that passwords are masked.
#[derive(Clone)]
#[allow(clippy::struct_excessive_bools)] pub struct KafkaSourceConfig {
    /// `bootstrap.servers` (required, must not be empty).
    pub bootstrap_servers: String,
    /// `group.id` (required, must not be empty).
    pub group_id: String,
    /// Built from `topic.pattern` if present, otherwise from the
    /// comma-separated `topic` value.
    pub subscription: TopicSubscription,

    /// `security.protocol`.
    pub security_protocol: SecurityProtocol,
    /// `sasl.mechanism`; required when the protocol uses SASL.
    pub sasl_mechanism: Option<SaslMechanism>,
    /// `sasl.username`.
    pub sasl_username: Option<String>,
    /// `sasl.password`; masked in `Debug` output.
    pub sasl_password: Option<String>,
    /// `ssl.ca.location`.
    pub ssl_ca_location: Option<String>,
    /// `ssl.certificate.location`.
    pub ssl_certificate_location: Option<String>,
    /// `ssl.key.location`.
    pub ssl_key_location: Option<String>,
    /// `ssl.key.password`; masked in `Debug` output.
    pub ssl_key_password: Option<String>,

    /// `format`; `from_config` falls back to `Format::Json`.
    pub format: Format,
    /// `schema.registry.url`; required when the format is Avro.
    pub schema_registry_url: Option<String>,
    /// `schema.registry.username` + `schema.registry.password` (both or neither).
    pub schema_registry_auth: Option<SrAuth>,
    /// `schema.compatibility`.
    pub schema_compatibility: Option<CompatibilityLevel>,
    /// `schema.evolution.strategy`.
    pub schema_evolution_strategy: SchemaEvolutionStrategy,
    /// `schema.registry.ssl.ca.location`.
    pub schema_registry_ssl_ca_location: Option<String>,
    /// `schema.registry.ssl.certificate.location`.
    pub schema_registry_ssl_certificate_location: Option<String>,
    /// `schema.registry.ssl.key.location`.
    pub schema_registry_ssl_key_location: Option<String>,
    /// `schema.registry.subject.name.strategy`.
    pub schema_registry_subject_strategy: SubjectNameStrategy,
    /// `schema.registry.record.name`; required by the record-name based
    /// subject strategies.
    pub schema_registry_record_name: Option<String>,
    /// `schema.registry.discovery.timeout.ms` (default 10 s).
    pub schema_registry_discovery_timeout: Duration,
    /// `event.time.column`.
    pub event_time_column: Option<String>,
    /// `include.metadata` (default `false`).
    pub include_metadata: bool,
    /// `include.headers` (default `false`).
    pub include_headers: bool,

    /// Derived from `startup.specific.offsets`, `startup.timestamp.ms`, or
    /// `startup.mode`, in that order of precedence.
    pub startup_mode: StartupMode,
    /// `auto.offset.reset`; overridden by the `Earliest` / `Latest` startup modes.
    pub auto_offset_reset: OffsetReset,
    /// `isolation.level`.
    pub isolation_level: IsolationLevel,
    /// `max.poll.records` (default 1000, must be > 0).
    pub max_poll_records: usize,
    /// `partition.assignment.strategy`.
    pub partition_assignment_strategy: AssignmentStrategy,
    /// `fetch.min.bytes`; forwarded to rdkafka only when set.
    pub fetch_min_bytes: Option<i32>,
    /// `fetch.max.bytes`; forwarded to rdkafka only when set.
    pub fetch_max_bytes: Option<i32>,
    /// `fetch.max.wait.ms`; forwarded to rdkafka as `fetch.wait.max.ms` when set.
    pub fetch_max_wait_ms: Option<i32>,
    /// `max.partition.fetch.bytes`; forwarded to rdkafka only when set.
    pub max_partition_fetch_bytes: Option<i32>,

    /// `max.out.of.orderness.ms` (default 5 s).
    pub max_out_of_orderness: Duration,
    /// `idle.timeout.ms` (default 30 s).
    pub idle_timeout: Duration,
    /// `enable.watermark.tracking` (default `false`).
    pub enable_watermark_tracking: bool,
    /// `session.timeout.ms` (default 45 s).
    pub session_timeout: Duration,
    /// `heartbeat.interval.ms` (default 10 s); three times this value must be
    /// below `session_timeout`.
    pub heartbeat_interval: Duration,
    /// `max.poll.interval.ms` (default 600 s, minimum 60 s).
    pub max_poll_interval: Duration,
    /// `queued.max.messages.kbytes` (default 16384).
    pub queued_max_messages_kbytes: u32,

    /// `broker.commit.on.checkpoint` (default `true`).
    pub broker_commit_on_checkpoint: bool,

    /// `reader.channel.capacity` (default 1024, must be >= `max_poll_records`).
    pub reader_channel_capacity: usize,
    /// `backpressure.high.watermark` (default 0.8, within 0.0..=1.0 and above
    /// the low watermark).
    pub backpressure_high_watermark: f64,
    /// `backpressure.low.watermark` (default 0.5, within 0.0..=1.0).
    pub backpressure_low_watermark: f64,

    /// `max.deser.error.rate` (default 0.5, within 0.0..=1.0).
    pub max_deser_error_rate: f64,

    /// Properties collected via `properties_with_prefix("kafka.")` and passed
    /// through to rdkafka, except keys rejected by `is_blocked_passthrough_key`.
    pub kafka_properties: HashMap<String, String>,
}
616
617impl std::fmt::Debug for KafkaSourceConfig {
618 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
619 f.debug_struct("KafkaSourceConfig")
620 .field("bootstrap_servers", &self.bootstrap_servers)
621 .field("group_id", &self.group_id)
622 .field("subscription", &self.subscription)
623 .field("format", &self.format)
624 .field("security_protocol", &self.security_protocol)
625 .field("sasl_mechanism", &self.sasl_mechanism)
626 .field("sasl_username", &self.sasl_username)
627 .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
628 .field(
629 "ssl_key_password",
630 &self.ssl_key_password.as_ref().map(|_| "***"),
631 )
632 .field("max_poll_records", &self.max_poll_records)
633 .finish_non_exhaustive()
634 }
635}
636
637impl Default for KafkaSourceConfig {
638 fn default() -> Self {
639 Self {
640 bootstrap_servers: String::new(),
641 group_id: String::new(),
642 subscription: TopicSubscription::default(),
643 security_protocol: SecurityProtocol::default(),
644 sasl_mechanism: None,
645 sasl_username: None,
646 sasl_password: None,
647 ssl_ca_location: None,
648 ssl_certificate_location: None,
649 ssl_key_location: None,
650 ssl_key_password: None,
651 format: Format::Json,
652 schema_registry_url: None,
653 schema_registry_auth: None,
654 schema_compatibility: None,
655 schema_evolution_strategy: SchemaEvolutionStrategy::default(),
656 schema_registry_ssl_ca_location: None,
657 schema_registry_ssl_certificate_location: None,
658 schema_registry_ssl_key_location: None,
659 schema_registry_subject_strategy: SubjectNameStrategy::default(),
660 schema_registry_record_name: None,
661 schema_registry_discovery_timeout: Duration::from_secs(10),
662 event_time_column: None,
663 include_metadata: false,
664 include_headers: false,
665 startup_mode: StartupMode::default(),
666 auto_offset_reset: OffsetReset::Earliest,
667 isolation_level: IsolationLevel::default(),
668 max_poll_records: 1000,
669 partition_assignment_strategy: AssignmentStrategy::Range,
670 fetch_min_bytes: None,
671 fetch_max_bytes: None,
672 fetch_max_wait_ms: None,
673 max_partition_fetch_bytes: None,
674 max_out_of_orderness: Duration::from_secs(5),
675 idle_timeout: Duration::from_secs(30),
676 enable_watermark_tracking: false,
677 session_timeout: Duration::from_secs(45),
678 heartbeat_interval: Duration::from_secs(10),
679 max_poll_interval: Duration::from_secs(600),
680 queued_max_messages_kbytes: 16384,
681 broker_commit_on_checkpoint: true,
682 reader_channel_capacity: 1024,
683 backpressure_high_watermark: 0.8,
684 backpressure_low_watermark: 0.5,
685 max_deser_error_rate: 0.5,
686 kafka_properties: HashMap::new(),
687 }
688 }
689}
690
691impl KafkaSourceConfig {
692 #[allow(deprecated, clippy::too_many_lines)]
698 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
699 let bootstrap_servers = config.require("bootstrap.servers")?.to_string();
700 let group_id = config.require("group.id")?.to_string();
701
702 let subscription = if let Some(pattern) = config.get("topic.pattern") {
703 TopicSubscription::Pattern(pattern.to_string())
704 } else {
705 let topics_str = config.require("topic")?;
706 let topics: Vec<String> = topics_str
707 .split(',')
708 .map(|s| s.trim().to_string())
709 .collect();
710 TopicSubscription::Topics(topics.clone())
711 };
712
713 let security_protocol = match config.get("security.protocol") {
714 Some(s) => s.parse::<SecurityProtocol>()?,
715 None => SecurityProtocol::default(),
716 };
717
718 let sasl_mechanism = match config.get("sasl.mechanism") {
719 Some(s) => Some(s.parse::<SaslMechanism>()?),
720 None => None,
721 };
722
723 let sasl_username = config.get("sasl.username").map(String::from);
724 let sasl_password = config.get("sasl.password").map(String::from);
725 let ssl_ca_location = config.get("ssl.ca.location").map(String::from);
726 let ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
727 let ssl_key_location = config.get("ssl.key.location").map(String::from);
728 let ssl_key_password = config.get("ssl.key.password").map(String::from);
729
730 let format = match config.get("format") {
731 Some(f) => f
732 .parse::<Format>()
733 .map_err(|e| ConnectorError::ConfigurationError(e.to_string()))?,
734 None => Format::Json,
735 };
736
737 let schema_registry_url = config.get("schema.registry.url").map(String::from);
738
739 let schema_registry_auth = match (
740 config.get("schema.registry.username"),
741 config.get("schema.registry.password"),
742 ) {
743 (Some(u), Some(p)) => Some(SrAuth {
744 username: u.to_string(),
745 password: p.to_string(),
746 }),
747 (Some(_), None) | (None, Some(_)) => {
748 return Err(ConnectorError::ConfigurationError(
749 "schema.registry.username and schema.registry.password must both be set"
750 .to_string(),
751 ));
752 }
753 (None, None) => None,
754 };
755
756 let schema_compatibility = match config.get("schema.compatibility") {
757 Some(s) => Some(s.parse::<CompatibilityLevel>()?),
758 None => None,
759 };
760
761 let schema_evolution_strategy = match config.get("schema.evolution.strategy") {
762 Some(s) => s.parse::<SchemaEvolutionStrategy>()?,
763 None => SchemaEvolutionStrategy::default(),
764 };
765
766 let schema_registry_ssl_ca_location = config
767 .get("schema.registry.ssl.ca.location")
768 .map(String::from);
769 let schema_registry_ssl_certificate_location = config
770 .get("schema.registry.ssl.certificate.location")
771 .map(String::from);
772 let schema_registry_ssl_key_location = config
773 .get("schema.registry.ssl.key.location")
774 .map(String::from);
775
776 let schema_registry_subject_strategy =
777 match config.get("schema.registry.subject.name.strategy") {
778 Some(s) => s.parse::<SubjectNameStrategy>()?,
779 None => SubjectNameStrategy::default(),
780 };
781
782 let schema_registry_record_name =
783 config.get("schema.registry.record.name").map(String::from);
784
785 if matches!(
786 schema_registry_subject_strategy,
787 SubjectNameStrategy::RecordName | SubjectNameStrategy::TopicRecordName
788 ) && schema_registry_record_name.is_none()
789 {
790 return Err(ConnectorError::ConfigurationError(format!(
791 "schema.registry.subject.name.strategy={schema_registry_subject_strategy} \
792 requires schema.registry.record.name"
793 )));
794 }
795
796 let schema_registry_discovery_timeout = config
797 .get_parsed::<u64>("schema.registry.discovery.timeout.ms")?
798 .map_or(Duration::from_secs(10), Duration::from_millis);
799
800 let event_time_column = config.get("event.time.column").map(String::from);
801
802 let include_metadata = config
803 .get_parsed::<bool>("include.metadata")?
804 .unwrap_or(false);
805
806 let include_headers = config
807 .get_parsed::<bool>("include.headers")?
808 .unwrap_or(false);
809
810 let startup_mode = if let Some(offsets_str) = config.get("startup.specific.offsets") {
811 let offsets = parse_specific_offsets(offsets_str)?;
812 StartupMode::SpecificOffsets(offsets)
813 } else if let Some(ts_str) = config.get("startup.timestamp.ms") {
814 let ts: i64 = ts_str.parse().map_err(|_| {
815 ConnectorError::ConfigurationError(format!(
816 "invalid startup.timestamp.ms: '{ts_str}'"
817 ))
818 })?;
819 StartupMode::Timestamp(ts)
820 } else {
821 match config.get("startup.mode") {
822 Some(s) => s.parse::<StartupMode>()?,
823 None => StartupMode::default(),
824 }
825 };
826
827 let auto_offset_reset = match &startup_mode {
832 StartupMode::Earliest => OffsetReset::Earliest,
833 StartupMode::Latest => OffsetReset::Latest,
834 _ => match config.get("auto.offset.reset") {
835 Some(s) => s.parse::<OffsetReset>()?,
836 None => OffsetReset::Earliest,
837 },
838 };
839
840 let isolation_level = match config.get("isolation.level") {
841 Some(s) => s.parse::<IsolationLevel>()?,
842 None => IsolationLevel::default(),
843 };
844
845 let max_poll_records = config
846 .get_parsed::<usize>("max.poll.records")?
847 .unwrap_or(1000);
848
849 let partition_assignment_strategy = match config.get("partition.assignment.strategy") {
850 Some(s) => s.parse::<AssignmentStrategy>()?,
851 None => AssignmentStrategy::Range,
852 };
853
854 let fetch_min_bytes = config.get_parsed::<i32>("fetch.min.bytes")?;
855 let fetch_max_bytes = config.get_parsed::<i32>("fetch.max.bytes")?;
856 let fetch_max_wait_ms = config.get_parsed::<i32>("fetch.max.wait.ms")?;
857 let max_partition_fetch_bytes = config.get_parsed::<i32>("max.partition.fetch.bytes")?;
858
859 let max_out_of_orderness_ms = config
860 .get_parsed::<u64>("max.out.of.orderness.ms")?
861 .unwrap_or(5000);
862
863 let idle_timeout_ms = config
864 .get_parsed::<u64>("idle.timeout.ms")?
865 .unwrap_or(30_000);
866
867 let enable_watermark_tracking = config
868 .get_parsed::<bool>("enable.watermark.tracking")?
869 .unwrap_or(false);
870
871 let session_timeout_ms = config
872 .get_parsed::<u64>("session.timeout.ms")?
873 .unwrap_or(45_000);
874 let heartbeat_interval_ms = config
875 .get_parsed::<u64>("heartbeat.interval.ms")?
876 .unwrap_or(10_000);
877 let queued_max_messages_kbytes = config
878 .get_parsed::<u32>("queued.max.messages.kbytes")?
879 .unwrap_or(16384);
880 let max_poll_interval_ms = config
881 .get_parsed::<u64>("max.poll.interval.ms")?
882 .unwrap_or(600_000);
883
884 if config
889 .properties()
890 .contains_key("broker.commit.interval.ms")
891 {
892 return Err(ConnectorError::ConfigurationError(
893 "broker.commit.interval.ms is no longer supported — broker offset \
894 commits now happen on checkpoint completion. Set \
895 broker.commit.on.checkpoint=false to disable."
896 .into(),
897 ));
898 }
899 let broker_commit_on_checkpoint = config
900 .get_parsed::<bool>("broker.commit.on.checkpoint")?
901 .unwrap_or(true);
902
903 let reader_channel_capacity = config
904 .get_parsed::<usize>("reader.channel.capacity")?
905 .unwrap_or(1024);
906
907 let backpressure_high_watermark = config
908 .get_parsed::<f64>("backpressure.high.watermark")?
909 .unwrap_or(0.8);
910
911 let backpressure_low_watermark = config
912 .get_parsed::<f64>("backpressure.low.watermark")?
913 .unwrap_or(0.5);
914
915 let max_deser_error_rate = config
916 .get_parsed::<f64>("max.deser.error.rate")?
917 .unwrap_or(0.5);
918
919 let kafka_properties = config.properties_with_prefix("kafka.");
920
921 let cfg = Self {
922 bootstrap_servers,
923 group_id,
924 subscription,
925 security_protocol,
926 sasl_mechanism,
927 sasl_username,
928 sasl_password,
929 ssl_ca_location,
930 ssl_certificate_location,
931 ssl_key_location,
932 ssl_key_password,
933 format,
934 schema_registry_url,
935 schema_registry_auth,
936 schema_compatibility,
937 schema_evolution_strategy,
938 schema_registry_ssl_ca_location,
939 schema_registry_ssl_certificate_location,
940 schema_registry_ssl_key_location,
941 schema_registry_subject_strategy,
942 schema_registry_record_name,
943 schema_registry_discovery_timeout,
944 event_time_column,
945 include_metadata,
946 include_headers,
947 startup_mode,
948 auto_offset_reset,
949 isolation_level,
950 max_poll_records,
951 partition_assignment_strategy,
952 fetch_min_bytes,
953 fetch_max_bytes,
954 fetch_max_wait_ms,
955 max_partition_fetch_bytes,
956 max_out_of_orderness: Duration::from_millis(max_out_of_orderness_ms),
957 idle_timeout: Duration::from_millis(idle_timeout_ms),
958 enable_watermark_tracking,
959 session_timeout: Duration::from_millis(session_timeout_ms),
960 heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
961 max_poll_interval: Duration::from_millis(max_poll_interval_ms),
962 queued_max_messages_kbytes,
963 broker_commit_on_checkpoint,
964 reader_channel_capacity,
965 backpressure_high_watermark,
966 backpressure_low_watermark,
967 max_deser_error_rate,
968 kafka_properties,
969 };
970
971 cfg.validate()?;
972 Ok(cfg)
973 }
974
975 pub fn validate(&self) -> Result<(), ConnectorError> {
981 if self.bootstrap_servers.is_empty() {
982 return Err(ConnectorError::ConfigurationError(
983 "bootstrap.servers cannot be empty".into(),
984 ));
985 }
986 if self.group_id.is_empty() {
987 return Err(ConnectorError::ConfigurationError(
988 "group.id cannot be empty".into(),
989 ));
990 }
991
992 match &self.subscription {
994 TopicSubscription::Topics(t) if t.is_empty() => {
995 return Err(ConnectorError::ConfigurationError(
996 "at least one topic is required (or use topic.pattern)".into(),
997 ));
998 }
999 TopicSubscription::Pattern(p) if p.is_empty() => {
1000 return Err(ConnectorError::ConfigurationError(
1001 "topic.pattern cannot be empty".into(),
1002 ));
1003 }
1004 _ => {}
1005 }
1006
1007 if self.max_poll_records == 0 {
1008 return Err(ConnectorError::ConfigurationError(
1009 "max.poll.records must be > 0".into(),
1010 ));
1011 }
1012 if self.reader_channel_capacity < self.max_poll_records {
1013 return Err(ConnectorError::ConfigurationError(format!(
1014 "reader.channel.capacity ({}) must be >= max.poll.records ({})",
1015 self.reader_channel_capacity, self.max_poll_records
1016 )));
1017 }
1018
1019 if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
1020 return Err(ConnectorError::ConfigurationError(
1021 "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
1022 .into(),
1023 ));
1024 }
1025
1026 if let Some(mechanism) = &self.sasl_mechanism {
1027 if mechanism.requires_credentials()
1028 && (self.sasl_username.is_none() || self.sasl_password.is_none())
1029 {
1030 return Err(ConnectorError::ConfigurationError(format!(
1031 "sasl.username and sasl.password are required for {mechanism} mechanism"
1032 )));
1033 }
1034 }
1035
1036 if self.security_protocol.uses_ssl() {
1037 if let Some(ref ca) = self.ssl_ca_location {
1038 if ca.is_empty() {
1039 return Err(ConnectorError::ConfigurationError(
1040 "ssl.ca.location cannot be empty when specified".into(),
1041 ));
1042 }
1043 }
1044 }
1045
1046 if self.max_poll_interval.as_secs() < 60 {
1047 return Err(ConnectorError::ConfigurationError(format!(
1048 "max.poll.interval.ms ({}) must be >= 60000 (60s)",
1049 self.max_poll_interval.as_millis()
1050 )));
1051 }
1052
1053 if self.heartbeat_interval.as_millis() * 3 >= self.session_timeout.as_millis() {
1055 return Err(ConnectorError::ConfigurationError(format!(
1056 "heartbeat.interval.ms ({}) * 3 must be < session.timeout.ms ({})",
1057 self.heartbeat_interval.as_millis(),
1058 self.session_timeout.as_millis()
1059 )));
1060 }
1061
1062 if self.backpressure_high_watermark <= self.backpressure_low_watermark {
1063 return Err(ConnectorError::ConfigurationError(
1064 "backpressure.high.watermark must be > backpressure.low.watermark".into(),
1065 ));
1066 }
1067 if !(0.0..=1.0).contains(&self.backpressure_high_watermark) {
1068 return Err(ConnectorError::ConfigurationError(
1069 "backpressure.high.watermark must be between 0.0 and 1.0".into(),
1070 ));
1071 }
1072 if !(0.0..=1.0).contains(&self.backpressure_low_watermark) {
1073 return Err(ConnectorError::ConfigurationError(
1074 "backpressure.low.watermark must be between 0.0 and 1.0".into(),
1075 ));
1076 }
1077
1078 if !(0.0..=1.0).contains(&self.max_deser_error_rate) {
1079 return Err(ConnectorError::ConfigurationError(
1080 "max.deser.error.rate must be between 0.0 and 1.0".into(),
1081 ));
1082 }
1083
1084 if self.format == Format::Avro && self.schema_registry_url.is_none() {
1085 return Err(ConnectorError::ConfigurationError(
1086 "schema.registry.url is required when format is 'avro'".into(),
1087 ));
1088 }
1089
1090 Ok(())
1091 }
1092
1093 #[must_use]
1095 pub fn to_rdkafka_config(&self) -> ClientConfig {
1096 let mut config = ClientConfig::new();
1097
1098 config.set("bootstrap.servers", &self.bootstrap_servers);
1099 config.set("group.id", &self.group_id);
1100 config.set("enable.auto.commit", "false");
1101 config.set("enable.auto.offset.store", "false");
1102 config.set("auto.offset.reset", self.auto_offset_reset.as_rdkafka_str());
1103 config.set(
1104 "partition.assignment.strategy",
1105 self.partition_assignment_strategy.as_rdkafka_str(),
1106 );
1107 config.set("security.protocol", self.security_protocol.as_rdkafka_str());
1108
1109 if let Some(ref mechanism) = self.sasl_mechanism {
1110 config.set("sasl.mechanism", mechanism.as_rdkafka_str());
1111 }
1112 if let Some(ref username) = self.sasl_username {
1113 config.set("sasl.username", username);
1114 }
1115 if let Some(ref password) = self.sasl_password {
1116 config.set("sasl.password", password);
1117 }
1118 if let Some(ref ca) = self.ssl_ca_location {
1119 config.set("ssl.ca.location", ca);
1120 }
1121 if let Some(ref cert) = self.ssl_certificate_location {
1122 config.set("ssl.certificate.location", cert);
1123 }
1124 if let Some(ref key) = self.ssl_key_location {
1125 config.set("ssl.key.location", key);
1126 }
1127 if let Some(ref key_pass) = self.ssl_key_password {
1128 config.set("ssl.key.password", key_pass);
1129 }
1130
1131 config.set("isolation.level", self.isolation_level.as_rdkafka_str());
1132 config.set(
1133 "session.timeout.ms",
1134 self.session_timeout.as_millis().to_string(),
1135 );
1136 config.set(
1137 "heartbeat.interval.ms",
1138 self.heartbeat_interval.as_millis().to_string(),
1139 );
1140 config.set(
1141 "queued.max.messages.kbytes",
1142 self.queued_max_messages_kbytes.to_string(),
1143 );
1144 config.set(
1145 "max.poll.interval.ms",
1146 self.max_poll_interval.as_millis().to_string(),
1147 );
1148
1149 if let Some(fetch_min) = self.fetch_min_bytes {
1150 config.set("fetch.min.bytes", fetch_min.to_string());
1151 }
1152
1153 if let Some(fetch_max) = self.fetch_max_bytes {
1154 config.set("fetch.max.bytes", fetch_max.to_string());
1155 }
1156
1157 if let Some(wait_ms) = self.fetch_max_wait_ms {
1158 config.set("fetch.wait.max.ms", wait_ms.to_string());
1159 }
1160
1161 if let Some(partition_max) = self.max_partition_fetch_bytes {
1162 config.set("max.partition.fetch.bytes", partition_max.to_string());
1163 }
1164
1165 for (key, value) in &self.kafka_properties {
1168 if is_blocked_passthrough_key(key) {
1169 tracing::warn!(
1170 key,
1171 "ignoring kafka.* pass-through property that overrides a security setting"
1172 );
1173 continue;
1174 }
1175 config.set(key, value);
1176 }
1177
1178 config
1179 }
1180}
1181
/// Returns `true` for pass-through property keys that must not be forwarded
/// to rdkafka by `to_rdkafka_config`.
///
/// A key is blocked when it starts with `sasl.kerberos.` or exactly matches
/// one of the listed security, commit, offset-reset, or timing settings.
/// Matching is case-sensitive.
fn is_blocked_passthrough_key(key: &str) -> bool {
    const BLOCKED_KEYS: &[&str] = &[
        "security.protocol",
        "sasl.mechanism",
        "sasl.username",
        "sasl.password",
        "sasl.oauthbearer.config",
        "ssl.ca.location",
        "ssl.certificate.location",
        "ssl.key.location",
        "ssl.key.password",
        "ssl.endpoint.identification.algorithm",
        "enable.auto.commit",
        "enable.auto.offset.store",
        "enable.idempotence",
        "auto.offset.reset",
        "session.timeout.ms",
        "heartbeat.interval.ms",
        "max.poll.interval.ms",
        "queued.max.messages.kbytes",
        "auto.commit.interval.ms",
    ];

    if key.starts_with("sasl.kerberos.") {
        return true;
    }
    BLOCKED_KEYS.iter().any(|blocked| *blocked == key)
}
1210
1211fn parse_specific_offsets(s: &str) -> Result<HashMap<i32, i64>, ConnectorError> {
1215 let mut offsets = HashMap::new();
1216
1217 for pair in s.split(',') {
1218 let pair = pair.trim();
1219 if pair.is_empty() {
1220 continue;
1221 }
1222
1223 let parts: Vec<&str> = pair.split(':').collect();
1224 if parts.len() != 2 {
1225 return Err(ConnectorError::ConfigurationError(format!(
1226 "invalid offset format '{pair}' (expected 'partition:offset')"
1227 )));
1228 }
1229
1230 let partition: i32 = parts[0].trim().parse().map_err(|_| {
1231 ConnectorError::ConfigurationError(format!(
1232 "invalid partition number '{}' in '{pair}'",
1233 parts[0]
1234 ))
1235 })?;
1236
1237 let offset: i64 = parts[1].trim().parse().map_err(|_| {
1238 ConnectorError::ConfigurationError(format!("invalid offset '{}' in '{pair}'", parts[1]))
1239 })?;
1240
1241 offsets.insert(partition, offset);
1242 }
1243
1244 if offsets.is_empty() {
1245 return Err(ConnectorError::ConfigurationError(
1246 "startup.specific.offsets cannot be empty".into(),
1247 ));
1248 }
1249
1250 Ok(offsets)
1251}
1252
#[cfg(test)]
mod tests {
    use super::*;

    // ── Fixtures ────────────────────────────────────────────────────────

    /// Builds a `kafka` connector config holding the three keys every test
    /// needs (`bootstrap.servers`, `group.id`, `topic`) and then applies
    /// `extra` on top, in order.
    fn make_config(extra: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("group.id", "test-group");
        config.set("topic", "events");
        for &(key, value) in extra {
            config.set(key, value);
        }
        config
    }

    /// Parses `make_config(extra)` and panics (at the calling test's
    /// location) if parsing fails.
    #[track_caller]
    fn parse_ok(extra: &[(&str, &str)]) -> KafkaSourceConfig {
        KafkaSourceConfig::from_config(&make_config(extra)).unwrap()
    }

    /// Parses `make_config(extra)` and returns the error, panicking (at the
    /// calling test's location) if parsing unexpectedly succeeds.
    #[track_caller]
    fn parse_err(extra: &[(&str, &str)]) -> ConnectorError {
        KafkaSourceConfig::from_config(&make_config(extra)).unwrap_err()
    }

    /// Default config with brokers, group id and a single topic filled in.
    /// Used as the starting point for the `validate()` tests, each of which
    /// then changes the fields under test.
    fn base_config() -> KafkaSourceConfig {
        KafkaSourceConfig {
            bootstrap_servers: "localhost:9092".into(),
            group_id: "g".into(),
            subscription: TopicSubscription::Topics(vec!["t".into()]),
            ..KafkaSourceConfig::default()
        }
    }

    // ── Required keys and subscription ──────────────────────────────────

    // NOTE(review): the `allow(deprecated)` presumably covers
    // `TopicSubscription::topics` — confirm it is still needed.
    #[test]
    #[allow(deprecated)]
    fn test_parse_required_fields() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.bootstrap_servers, "localhost:9092");
        assert_eq!(cfg.group_id, "test-group");
        assert_eq!(cfg.subscription.topics().unwrap(), &["events"]);
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Topics(ref t) if t == &["events"]
        ));
    }

    // A config without any of the required keys must be rejected.
    #[test]
    fn test_parse_missing_required() {
        let empty = ConnectorConfig::new("kafka");
        assert!(KafkaSourceConfig::from_config(&empty).is_err());
    }

    // A comma-separated `topic` value is split and each entry is trimmed.
    // NOTE(review): see `test_parse_required_fields` about `allow(deprecated)`.
    #[test]
    #[allow(deprecated)]
    fn test_parse_multi_topic() {
        let cfg = parse_ok(&[("topic", "a, b, c")]);
        assert_eq!(cfg.subscription.topics().unwrap(), &["a", "b", "c"]);
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Topics(ref t) if t == &["a", "b", "c"]
        ));
    }

    // `topic.pattern` (without `topic`) yields a pattern subscription, so
    // the config is built by hand instead of through `make_config`.
    #[test]
    fn test_parse_topic_pattern() {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("group.id", "test-group");
        config.set("topic.pattern", "events-.*");

        let cfg = KafkaSourceConfig::from_config(&config).unwrap();
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Pattern(ref p) if p == "events-.*"
        ));
        assert!(cfg.subscription.is_pattern());
        assert_eq!(cfg.subscription.pattern(), Some("events-.*"));
        assert!(cfg.subscription.topics().is_none());
    }

    // ── Defaults and optional fields ────────────────────────────────────

    // Values expected when only the required keys are supplied.
    #[test]
    fn test_parse_defaults() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.format, Format::Json);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
        assert_eq!(cfg.isolation_level, IsolationLevel::ReadCommitted);
        assert_eq!(cfg.max_poll_records, 1000);
        assert_eq!(cfg.partition_assignment_strategy, AssignmentStrategy::Range);
        assert!(!cfg.include_metadata);
        assert!(!cfg.include_headers);
        assert!(cfg.schema_registry_url.is_none());
        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
        assert!(cfg.sasl_mechanism.is_none());
        assert!(cfg.broker_commit_on_checkpoint);
        assert_eq!(cfg.reader_channel_capacity, 1024);
    }

    #[test]
    fn test_parse_broker_commit_on_checkpoint_disabled() {
        let cfg = parse_ok(&[("broker.commit.on.checkpoint", "false")]);
        assert!(!cfg.broker_commit_on_checkpoint);
    }

    // `broker.commit.interval.ms` is rejected outright and the error names
    // the offending key.
    #[test]
    fn test_parse_broker_commit_interval_rejected() {
        let msg = parse_err(&[("broker.commit.interval.ms", "5000")]).to_string();
        assert!(
            msg.contains("broker.commit.interval.ms"),
            "expected hard-error mentioning the deprecated key, got: {msg}"
        );
    }

    #[test]
    fn test_parse_optional_fields() {
        let cfg = parse_ok(&[
            ("format", "csv"),
            ("auto.offset.reset", "latest"),
            ("max.poll.records", "500"),
            ("include.metadata", "true"),
            ("include.headers", "true"),
            ("event.time.column", "ts"),
            ("partition.assignment.strategy", "roundrobin"),
            ("isolation.level", "read_uncommitted"),
        ]);

        assert_eq!(cfg.format, Format::Csv);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
        assert_eq!(cfg.isolation_level, IsolationLevel::ReadUncommitted);
        assert_eq!(cfg.max_poll_records, 500);
        assert!(cfg.include_metadata);
        assert!(cfg.include_headers);
        assert_eq!(cfg.event_time_column, Some("ts".to_string()));
        assert_eq!(
            cfg.partition_assignment_strategy,
            AssignmentStrategy::RoundRobin
        );
    }

    // ── Security ────────────────────────────────────────────────────────

    #[test]
    fn test_parse_security_sasl_ssl() {
        let cfg = parse_ok(&[
            ("security.protocol", "sasl_ssl"),
            ("sasl.mechanism", "SCRAM-SHA-256"),
            ("sasl.username", "alice"),
            ("sasl.password", "secret"),
            ("ssl.ca.location", "/etc/ssl/ca.pem"),
        ]);

        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha256));
        assert_eq!(cfg.sasl_username, Some("alice".to_string()));
        assert_eq!(cfg.sasl_password, Some("secret".to_string()));
        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
    }

    #[test]
    fn test_parse_security_ssl_only() {
        let cfg = parse_ok(&[
            ("security.protocol", "ssl"),
            ("ssl.ca.location", "/etc/ssl/ca.pem"),
            ("ssl.certificate.location", "/etc/ssl/client.pem"),
            ("ssl.key.location", "/etc/ssl/client.key"),
            ("ssl.key.password", "keypass"),
        ]);

        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
        assert!(cfg.security_protocol.uses_ssl());
        assert!(!cfg.security_protocol.uses_sasl());
        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
        assert_eq!(
            cfg.ssl_certificate_location,
            Some("/etc/ssl/client.pem".to_string())
        );
        assert_eq!(
            cfg.ssl_key_location,
            Some("/etc/ssl/client.key".to_string())
        );
        assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
    }

    // ── Fetch tuning and pass-through ───────────────────────────────────

    #[test]
    fn test_parse_fetch_tuning() {
        let cfg = parse_ok(&[
            ("fetch.min.bytes", "1024"),
            ("fetch.max.bytes", "52428800"),
            ("fetch.max.wait.ms", "500"),
            ("max.partition.fetch.bytes", "1048576"),
        ]);

        assert_eq!(cfg.fetch_min_bytes, Some(1024));
        assert_eq!(cfg.fetch_max_bytes, Some(52_428_800));
        assert_eq!(cfg.fetch_max_wait_ms, Some(500));
        assert_eq!(cfg.max_partition_fetch_bytes, Some(1_048_576));
    }

    // `kafka.*` keys are collected with the `kafka.` prefix stripped. All
    // three are stored at parse time, including ones that are later blocked
    // when the rdkafka config is built.
    #[test]
    fn test_parse_kafka_passthrough() {
        let cfg = parse_ok(&[
            ("kafka.session.timeout.ms", "30000"),
            ("kafka.max.poll.interval.ms", "300000"),
            ("kafka.fetch.message.max.bytes", "2097152"),
        ]);

        assert_eq!(cfg.kafka_properties.len(), 3);
        assert_eq!(
            cfg.kafka_properties.get("session.timeout.ms"),
            Some(&"30000".to_string())
        );
        assert_eq!(
            cfg.kafka_properties.get("max.poll.interval.ms"),
            Some(&"300000".to_string())
        );
    }

    // ── Schema registry ─────────────────────────────────────────────────

    #[test]
    fn test_parse_schema_registry() {
        let cfg = parse_ok(&[
            ("format", "avro"),
            ("schema.registry.url", "http://localhost:8081"),
            ("schema.registry.username", "user"),
            ("schema.registry.password", "pass"),
            ("schema.compatibility", "FULL_TRANSITIVE"),
            ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
        ]);

        assert_eq!(cfg.format, Format::Avro);
        assert_eq!(
            cfg.schema_registry_url,
            Some("http://localhost:8081".to_string())
        );
        let auth = cfg
            .schema_registry_auth
            .as_ref()
            .expect("schema registry credentials should be parsed");
        assert_eq!(auth.username, "user");
        assert_eq!(auth.password, "pass");
        assert_eq!(
            cfg.schema_compatibility,
            Some(CompatibilityLevel::FullTransitive)
        );
        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/etc/ssl/sr-ca.pem".to_string())
        );
    }

    // A username without a matching password is a configuration error.
    #[test]
    fn test_parse_sr_auth_partial() {
        let config = make_config(&[
            ("schema.registry.url", "http://localhost:8081"),
            ("schema.registry.username", "user"),
        ]);
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }

    // ── validate() ──────────────────────────────────────────────────────

    // Avro format with no schema registry URL configured is rejected.
    #[test]
    fn test_validate_avro_without_sr() {
        let mut cfg = base_config();
        cfg.format = Format::Avro;
        assert!(cfg.validate().is_err());
    }

    // A high watermark (0.3) below the low watermark (0.5) is rejected.
    #[test]
    fn test_validate_backpressure_watermarks() {
        let mut cfg = base_config();
        cfg.backpressure_high_watermark = 0.3;
        cfg.backpressure_low_watermark = 0.5;
        assert!(cfg.validate().is_err());
    }

    // A SASL protocol with no mechanism selected is rejected.
    #[test]
    fn test_validate_sasl_without_mechanism() {
        let mut cfg = base_config();
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        assert!(cfg.validate().is_err());
    }

    // PLAIN is selected but neither username nor password is set.
    #[test]
    fn test_validate_sasl_plain_without_credentials() {
        let mut cfg = base_config();
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        cfg.sasl_mechanism = Some(SaslMechanism::Plain);
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_empty_topic_pattern() {
        let mut cfg = base_config();
        cfg.subscription = TopicSubscription::Pattern(String::new());
        assert!(cfg.validate().is_err());
    }

    // ── to_rdkafka_config() ─────────────────────────────────────────────

    // `kafka.fetch.min.bytes` is not a blocked key, so it reaches the
    // rdkafka config through the pass-through path.
    #[test]
    fn test_rdkafka_config() {
        let cfg = parse_ok(&[
            ("auto.offset.reset", "latest"),
            ("kafka.fetch.min.bytes", "1024"),
        ]);

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(rdkafka.get("group.id"), Some("test-group"));
        assert_eq!(rdkafka.get("enable.auto.commit"), Some("false"));
        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
        assert_eq!(rdkafka.get("security.protocol"), Some("plaintext"));
        assert_eq!(rdkafka.get("isolation.level"), Some("read_committed"));
    }

    #[test]
    fn test_rdkafka_config_with_security() {
        let cfg = parse_ok(&[
            ("security.protocol", "sasl_ssl"),
            ("sasl.mechanism", "PLAIN"),
            ("sasl.username", "user"),
            ("sasl.password", "pass"),
            ("ssl.ca.location", "/ca.pem"),
        ]);

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("security.protocol"), Some("sasl_ssl"));
        assert_eq!(rdkafka.get("sasl.mechanism"), Some("PLAIN"));
        assert_eq!(rdkafka.get("sasl.username"), Some("user"));
        assert_eq!(rdkafka.get("sasl.password"), Some("pass"));
        assert_eq!(rdkafka.get("ssl.ca.location"), Some("/ca.pem"));
    }

    // The connector key `fetch.max.wait.ms` is emitted under the rdkafka
    // name `fetch.wait.max.ms`.
    #[test]
    fn test_rdkafka_config_with_fetch_tuning() {
        let cfg = parse_ok(&[
            ("fetch.min.bytes", "1024"),
            ("fetch.max.bytes", "1048576"),
            ("fetch.max.wait.ms", "500"),
            ("max.partition.fetch.bytes", "262144"),
            ("isolation.level", "read_uncommitted"),
        ]);

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
        assert_eq!(rdkafka.get("fetch.max.bytes"), Some("1048576"));
        assert_eq!(rdkafka.get("fetch.wait.max.ms"), Some("500"));
        assert_eq!(rdkafka.get("max.partition.fetch.bytes"), Some("262144"));
        assert_eq!(rdkafka.get("isolation.level"), Some("read_uncommitted"));
    }

    // ── Enum parsing, helpers and Display ───────────────────────────────

    #[test]
    fn test_offset_reset_parsing() {
        for (input, expected) in [
            ("earliest", OffsetReset::Earliest),
            ("latest", OffsetReset::Latest),
            ("none", OffsetReset::None),
        ] {
            assert_eq!(
                input.parse::<OffsetReset>().unwrap(),
                expected,
                "input: {input}"
            );
        }
        assert!("invalid".parse::<OffsetReset>().is_err());
    }

    // Upper- and lower-case spellings are both accepted.
    #[test]
    fn test_compatibility_level_parsing() {
        for (input, expected) in [
            ("BACKWARD", CompatibilityLevel::Backward),
            ("full_transitive", CompatibilityLevel::FullTransitive),
            ("NONE", CompatibilityLevel::None),
        ] {
            assert_eq!(
                input.parse::<CompatibilityLevel>().unwrap(),
                expected,
                "input: {input}"
            );
        }
        assert!("invalid".parse::<CompatibilityLevel>().is_err());
    }

    // Case variations and the `-` separator are accepted alongside `_`.
    #[test]
    fn test_security_protocol_parsing() {
        for (input, expected) in [
            ("plaintext", SecurityProtocol::Plaintext),
            ("SSL", SecurityProtocol::Ssl),
            ("sasl_plaintext", SecurityProtocol::SaslPlaintext),
            ("SASL_SSL", SecurityProtocol::SaslSsl),
            ("sasl-ssl", SecurityProtocol::SaslSsl),
        ] {
            assert_eq!(
                input.parse::<SecurityProtocol>().unwrap(),
                expected,
                "input: {input}"
            );
        }
        assert!("invalid".parse::<SecurityProtocol>().is_err());
    }

    // Both `SCRAM-SHA-256` and `scram_sha_512` style spellings are accepted.
    #[test]
    fn test_sasl_mechanism_parsing() {
        for (input, expected) in [
            ("PLAIN", SaslMechanism::Plain),
            ("SCRAM-SHA-256", SaslMechanism::ScramSha256),
            ("scram_sha_512", SaslMechanism::ScramSha512),
            ("GSSAPI", SaslMechanism::Gssapi),
            ("OAUTHBEARER", SaslMechanism::Oauthbearer),
        ] {
            assert_eq!(
                input.parse::<SaslMechanism>().unwrap(),
                expected,
                "input: {input}"
            );
        }
        assert!("invalid".parse::<SaslMechanism>().is_err());
    }

    #[test]
    fn test_isolation_level_parsing() {
        for (input, expected) in [
            ("read_uncommitted", IsolationLevel::ReadUncommitted),
            ("read_committed", IsolationLevel::ReadCommitted),
            ("read-committed", IsolationLevel::ReadCommitted),
        ] {
            assert_eq!(
                input.parse::<IsolationLevel>().unwrap(),
                expected,
                "input: {input}"
            );
        }
        assert!("invalid".parse::<IsolationLevel>().is_err());
    }

    #[test]
    fn test_topic_subscription_accessors() {
        let topics = TopicSubscription::Topics(vec!["a".into(), "b".into()]);
        assert_eq!(
            topics.topics(),
            Some(&["a".to_string(), "b".to_string()][..])
        );
        assert!(topics.pattern().is_none());
        assert!(!topics.is_pattern());

        let pattern = TopicSubscription::Pattern("events-.*".into());
        assert!(pattern.topics().is_none());
        assert_eq!(pattern.pattern(), Some("events-.*"));
        assert!(pattern.is_pattern());
    }

    // Each row is (protocol, expected uses_ssl, expected uses_sasl).
    #[test]
    fn test_security_protocol_helpers() {
        for (protocol, ssl, sasl) in [
            (SecurityProtocol::Plaintext, false, false),
            (SecurityProtocol::Ssl, true, false),
            (SecurityProtocol::SaslPlaintext, false, true),
            (SecurityProtocol::SaslSsl, true, true),
        ] {
            assert_eq!(protocol.uses_ssl(), ssl, "uses_ssl for {protocol}");
            assert_eq!(protocol.uses_sasl(), sasl, "uses_sasl for {protocol}");
        }
    }

    // Only PLAIN and the SCRAM variants need a username/password pair.
    #[test]
    fn test_sasl_mechanism_helpers() {
        for (mechanism, needs_credentials) in [
            (SaslMechanism::Plain, true),
            (SaslMechanism::ScramSha256, true),
            (SaslMechanism::ScramSha512, true),
            (SaslMechanism::Gssapi, false),
            (SaslMechanism::Oauthbearer, false),
        ] {
            assert_eq!(
                mechanism.requires_credentials(),
                needs_credentials,
                "requires_credentials for {mechanism}"
            );
        }
    }

    #[test]
    fn test_enum_display() {
        assert_eq!(SecurityProtocol::SaslSsl.to_string(), "sasl_ssl");
        assert_eq!(SaslMechanism::ScramSha256.to_string(), "SCRAM-SHA-256");
        assert_eq!(IsolationLevel::ReadCommitted.to_string(), "read_committed");
    }

    // ── Startup mode ────────────────────────────────────────────────────

    #[test]
    fn test_startup_mode_parsing() {
        for (input, expected) in [
            ("group-offsets", StartupMode::GroupOffsets),
            ("group_offsets", StartupMode::GroupOffsets),
            ("earliest", StartupMode::Earliest),
            ("latest", StartupMode::Latest),
        ] {
            assert_eq!(
                input.parse::<StartupMode>().unwrap(),
                expected,
                "input: {input}"
            );
        }
        assert!("invalid".parse::<StartupMode>().is_err());
    }

    #[test]
    fn test_startup_mode_display() {
        assert_eq!(StartupMode::GroupOffsets.to_string(), "group-offsets");
        assert_eq!(StartupMode::Earliest.to_string(), "earliest");
        assert_eq!(StartupMode::Latest.to_string(), "latest");

        let specific = StartupMode::SpecificOffsets(HashMap::from([(0, 100), (1, 200)]));
        assert!(specific.to_string().contains("2 partitions"));

        let ts = StartupMode::Timestamp(1_234_567_890_000);
        assert!(ts.to_string().contains("1234567890000"));
    }

    // `startup.mode=latest` changes `auto.offset.reset` from its default
    // (earliest), both on the parsed config and in the rdkafka output.
    #[test]
    fn test_startup_mode_latest_overrides_offset_reset() {
        let cfg = parse_ok(&[("startup.mode", "latest")]);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
    }

    #[test]
    fn test_startup_mode_earliest_overrides_offset_reset() {
        let cfg = parse_ok(&[("startup.mode", "earliest")]);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
    }

    // With no `startup.mode` given, an explicit `auto.offset.reset` is kept.
    #[test]
    fn test_startup_mode_group_offsets_uses_explicit_offset_reset() {
        let cfg = parse_ok(&[("auto.offset.reset", "latest")]);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
    }

    #[test]
    fn test_parse_specific_offsets() {
        let offsets = parse_specific_offsets("0:100,1:200,2:300").unwrap();
        assert_eq!(offsets.get(&0), Some(&100));
        assert_eq!(offsets.get(&1), Some(&200));
        assert_eq!(offsets.get(&2), Some(&300));
    }

    #[test]
    fn test_parse_specific_offsets_with_spaces() {
        let offsets = parse_specific_offsets(" 0:100 , 1:200 ").unwrap();
        assert_eq!(offsets.get(&0), Some(&100));
        assert_eq!(offsets.get(&1), Some(&200));
    }

    // Empty input, a missing field, non-numeric fields and a surplus field
    // are all rejected.
    #[test]
    fn test_parse_specific_offsets_errors() {
        for input in ["", "0", "0:abc", "abc:100", "0:100:extra"] {
            assert!(
                parse_specific_offsets(input).is_err(),
                "expected an error for input: {input:?}"
            );
        }
    }

    #[test]
    fn test_parse_startup_mode_from_config() {
        for (value, expected) in [
            ("earliest", StartupMode::Earliest),
            ("latest", StartupMode::Latest),
            ("group-offsets", StartupMode::GroupOffsets),
        ] {
            let cfg = parse_ok(&[("startup.mode", value)]);
            assert_eq!(cfg.startup_mode, expected, "startup.mode: {value}");
        }
    }

    // Supplying `startup.specific.offsets` alone selects the
    // `SpecificOffsets` mode with the parsed map.
    #[test]
    fn test_parse_startup_specific_offsets_from_config() {
        let cfg = parse_ok(&[("startup.specific.offsets", "0:100,1:200")]);

        match cfg.startup_mode {
            StartupMode::SpecificOffsets(offsets) => {
                assert_eq!(offsets.get(&0), Some(&100));
                assert_eq!(offsets.get(&1), Some(&200));
            }
            _ => panic!("expected SpecificOffsets"),
        }
    }

    // Supplying `startup.timestamp.ms` alone selects the `Timestamp` mode.
    #[test]
    fn test_parse_startup_timestamp_from_config() {
        let cfg = parse_ok(&[("startup.timestamp.ms", "1234567890")]);

        assert_eq!(cfg.startup_mode, StartupMode::Timestamp(1_234_567_890));
    }

    #[test]
    fn test_parse_schema_registry_ssl_fields() {
        let cfg = parse_ok(&[
            ("schema.registry.url", "https://sr:8081"),
            ("schema.registry.ssl.ca.location", "/ca.pem"),
            ("schema.registry.ssl.certificate.location", "/cert.pem"),
            ("schema.registry.ssl.key.location", "/key.pem"),
        ]);

        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/ca.pem".to_string())
        );
        assert_eq!(
            cfg.schema_registry_ssl_certificate_location,
            Some("/cert.pem".to_string())
        );
        assert_eq!(
            cfg.schema_registry_ssl_key_location,
            Some("/key.pem".to_string())
        );
    }

    // ── Watermark tracking ──────────────────────────────────────────────

    #[test]
    fn test_parse_watermark_defaults() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(5));
        assert_eq!(cfg.idle_timeout, Duration::from_secs(30));
        assert!(!cfg.enable_watermark_tracking);
    }

    #[test]
    fn test_parse_watermark_tracking_enabled() {
        let cfg = parse_ok(&[
            ("enable.watermark.tracking", "true"),
            ("max.out.of.orderness.ms", "10000"),
            ("idle.timeout.ms", "60000"),
        ]);

        assert!(cfg.enable_watermark_tracking);
        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(10));
        assert_eq!(cfg.idle_timeout, Duration::from_secs(60));
    }

    // `startup.mode=timestamp` without `startup.timestamp.ms` fails, and the
    // error points the user at the missing key.
    #[test]
    fn test_startup_mode_timestamp_error() {
        let msg = parse_err(&[("startup.mode", "timestamp")]).to_string();
        assert!(
            msg.contains("startup.timestamp.ms"),
            "error should mention startup.timestamp.ms, got: {msg}"
        );
    }

    // ── Session timeout / heartbeat ─────────────────────────────────────

    #[test]
    fn test_session_timeout_heartbeat_defaults() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.session_timeout, Duration::from_secs(45));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_session_timeout_heartbeat_custom() {
        let cfg = parse_ok(&[
            ("session.timeout.ms", "60000"),
            ("heartbeat.interval.ms", "15000"),
        ]);
        assert_eq!(cfg.session_timeout, Duration::from_secs(60));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(15));
    }

    // NOTE(review): a 20s heartbeat with a 45s session is rejected while 10s
    // passes; presumably the heartbeat must stay within a fraction of the
    // session timeout — confirm the exact rule against `validate()`.
    #[test]
    fn test_session_timeout_heartbeat_validation_fails() {
        let msg = parse_err(&[
            ("session.timeout.ms", "45000"),
            ("heartbeat.interval.ms", "20000"),
        ])
        .to_string();
        assert!(msg.contains("heartbeat.interval.ms"), "got: {msg}");
    }

    #[test]
    fn test_session_timeout_heartbeat_validation_passes() {
        let cfg = parse_ok(&[
            ("session.timeout.ms", "45000"),
            ("heartbeat.interval.ms", "10000"),
        ]);
        assert_eq!(cfg.session_timeout, Duration::from_secs(45));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_session_timeout_heartbeat_in_rdkafka_config() {
        let cfg = parse_ok(&[
            ("session.timeout.ms", "60000"),
            ("heartbeat.interval.ms", "15000"),
        ]);
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("session.timeout.ms"), Some("60000"));
        assert_eq!(rdkafka.get("heartbeat.interval.ms"), Some("15000"));
    }

    // `kafka.session.timeout.ms` is a blocked pass-through key, so the
    // rdkafka config keeps the default 45s value instead of 99999.
    #[test]
    fn test_session_timeout_passthrough_blocked() {
        let cfg = parse_ok(&[("kafka.session.timeout.ms", "99999")]);
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("session.timeout.ms"), Some("45000"));
    }

    // ── queued.max.messages.kbytes ──────────────────────────────────────

    #[test]
    fn test_queued_max_messages_kbytes_default() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.queued_max_messages_kbytes, 16_384);
    }

    #[test]
    fn test_queued_max_messages_kbytes_custom() {
        let cfg = parse_ok(&[("queued.max.messages.kbytes", "32768")]);
        assert_eq!(cfg.queued_max_messages_kbytes, 32_768);
    }

    #[test]
    fn test_queued_max_messages_kbytes_in_rdkafka_config() {
        let cfg = parse_ok(&[("queued.max.messages.kbytes", "8192")]);
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("queued.max.messages.kbytes"), Some("8192"));
    }

    // ── Schema registry subject resolution ──────────────────────────────

    // TopicName strategy: subject is `<topic>-value`.
    #[test]
    fn resolve_subject_topic_name() {
        assert_eq!(
            resolve_value_subject(SubjectNameStrategy::TopicName, None, "orders"),
            "orders-value"
        );
    }

    // RecordName strategy: subject is `<record>-value`; the topic is unused.
    #[test]
    fn resolve_subject_record_name() {
        assert_eq!(
            resolve_value_subject(
                SubjectNameStrategy::RecordName,
                Some("com.acme.Order"),
                "orders"
            ),
            "com.acme.Order-value"
        );
    }

    // TopicRecordName strategy: subject is `<topic>-<record>-value`.
    #[test]
    fn resolve_subject_topic_record_name() {
        assert_eq!(
            resolve_value_subject(
                SubjectNameStrategy::TopicRecordName,
                Some("com.acme.Order"),
                "orders"
            ),
            "orders-com.acme.Order-value"
        );
    }

    #[test]
    fn parse_subject_strategy_from_config() {
        let cfg = parse_ok(&[
            ("schema.registry.url", "http://sr:8081"),
            ("schema.registry.subject.name.strategy", "record-name"),
            ("schema.registry.record.name", "com.acme.Order"),
        ]);
        assert_eq!(
            cfg.schema_registry_subject_strategy,
            SubjectNameStrategy::RecordName
        );
        assert_eq!(
            cfg.schema_registry_record_name.as_deref(),
            Some("com.acme.Order")
        );
    }

    // The record-name strategy without `schema.registry.record.name` is a
    // configuration error.
    #[test]
    fn parse_subject_strategy_rejects_missing_record_name() {
        let err = parse_err(&[
            ("schema.registry.url", "http://sr:8081"),
            ("schema.registry.subject.name.strategy", "record-name"),
        ]);
        assert!(matches!(err, ConnectorError::ConfigurationError(_)));
    }

    // ── Schema registry discovery timeout ───────────────────────────────

    #[test]
    fn parse_discovery_timeout_default_ten_seconds() {
        let cfg = parse_ok(&[("schema.registry.url", "http://sr")]);
        assert_eq!(
            cfg.schema_registry_discovery_timeout,
            Duration::from_secs(10)
        );
    }

    #[test]
    fn parse_discovery_timeout_override() {
        let cfg = parse_ok(&[
            ("schema.registry.url", "http://sr"),
            ("schema.registry.discovery.timeout.ms", "25000"),
        ]);
        assert_eq!(
            cfg.schema_registry_discovery_timeout,
            Duration::from_secs(25)
        );
    }
}