1use std::collections::HashMap;
5use std::time::Duration;
6
7use rdkafka::config::ClientConfig;
8
9use crate::config::ConnectorConfig;
10use crate::error::ConnectorError;
11use crate::serde::Format;
12
/// Transport-level security used for the broker connection.
///
/// The default is [`SecurityProtocol::Plaintext`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SecurityProtocol {
    /// Neither TLS nor SASL.
    #[default]
    Plaintext,
    /// TLS only.
    Ssl,
    /// SASL without TLS.
    SaslPlaintext,
    /// SASL on top of TLS.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns the value written to rdkafka's `security.protocol` property.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Plaintext => "plaintext",
            Self::Ssl => "ssl",
            Self::SaslPlaintext => "sasl_plaintext",
            Self::SaslSsl => "sasl_ssl",
        }
    }

    /// `true` for the two TLS-based protocols (`Ssl`, `SaslSsl`).
    #[must_use]
    pub fn uses_ssl(&self) -> bool {
        match *self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// `true` for the two SASL-based protocols (`SaslPlaintext`, `SaslSsl`).
    #[must_use]
    pub fn uses_sasl(&self) -> bool {
        match *self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }
}
56
// String parsing for `SecurityProtocol`, produced by the crate's `str_enum!` macro
// in its `fromstr` form. `from_config` depends on it through
// `parse::<SecurityProtocol>()`. Each row pairs a variant with its accepted spelling.
// NOTE(review): `lowercase` presumably selects how the input is normalized before
// matching, and the string literal is presumably the error-message prefix — confirm
// against the macro definition.
str_enum!(fromstr SecurityProtocol, lowercase, ConnectorError, "invalid security.protocol",
    Plaintext => "plaintext";
    Ssl => "ssl";
    SaslPlaintext => "sasl_plaintext";
    SaslSsl => "sasl_ssl"
);
63
64impl std::fmt::Display for SecurityProtocol {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 write!(f, "{}", self.as_rdkafka_str())
67 }
68}
69
/// SASL authentication mechanism.
///
/// The default is [`SaslMechanism::Plain`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SaslMechanism {
    /// `PLAIN`.
    #[default]
    Plain,
    /// `SCRAM-SHA-256`.
    ScramSha256,
    /// `SCRAM-SHA-512`.
    ScramSha512,
    /// `GSSAPI`.
    Gssapi,
    /// `OAUTHBEARER`.
    Oauthbearer,
}

impl SaslMechanism {
    /// Returns the value written to rdkafka's `sasl.mechanism` property.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Plain => "PLAIN",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::Gssapi => "GSSAPI",
            Self::Oauthbearer => "OAUTHBEARER",
        }
    }

    /// `true` for the mechanisms that need a username and password
    /// (`Plain` and both SCRAM variants).
    #[must_use]
    pub fn requires_credentials(&self) -> bool {
        match *self {
            Self::Plain | Self::ScramSha256 | Self::ScramSha512 => true,
            Self::Gssapi | Self::Oauthbearer => false,
        }
    }
}
108
// String parsing for `SaslMechanism`, produced by the crate's `str_enum!` macro
// (`fromstr` form); used by `from_config` through `parse::<SaslMechanism>()`.
// Several variants accept more than one spelling (e.g. `GSSAPI` or `KERBEROS`).
// NOTE(review): the spellings listed here use `_`, while `as_rdkafka_str` emits `-`
// (e.g. `SCRAM-SHA-256`). Verify that the `uppercase` mode maps `-` to `_` before
// matching; otherwise the canonical names would not parse back.
str_enum!(fromstr SaslMechanism, uppercase, ConnectorError, "invalid sasl.mechanism",
    Plain => "PLAIN";
    ScramSha256 => "SCRAM_SHA_256", "SCRAM_SHA256";
    ScramSha512 => "SCRAM_SHA_512", "SCRAM_SHA512";
    Gssapi => "GSSAPI", "KERBEROS";
    Oauthbearer => "OAUTHBEARER", "OAUTH"
);
116
117impl std::fmt::Display for SaslMechanism {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 write!(f, "{}", self.as_rdkafka_str())
120 }
121}
122
/// Consumer isolation level.
///
/// The default is [`IsolationLevel::ReadCommitted`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// `read_uncommitted`.
    ReadUncommitted,
    /// `read_committed`.
    #[default]
    ReadCommitted,
}

impl IsolationLevel {
    /// Returns the value written to rdkafka's `isolation.level` property.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::ReadCommitted => "read_committed",
            Self::ReadUncommitted => "read_uncommitted",
        }
    }
}
145
// String parsing for `IsolationLevel`, produced by the crate's `str_enum!` macro
// (`fromstr` form); used by `from_config` through `parse::<IsolationLevel>()`.
// NOTE(review): `lowercase` presumably selects the input normalization — confirm
// against the macro definition.
str_enum!(fromstr IsolationLevel, lowercase, ConnectorError, "invalid isolation.level",
    ReadUncommitted => "read_uncommitted";
    ReadCommitted => "read_committed"
);
150
151impl std::fmt::Display for IsolationLevel {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 write!(f, "{}", self.as_rdkafka_str())
154 }
155}
156
/// Where the source starts reading.
///
/// `from_config` picks the variant in this priority order:
/// `startup.specific.offsets`, then `startup.timestamp.ms`, then `startup.mode`,
/// and finally the default ([`StartupMode::GroupOffsets`]).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum StartupMode {
    /// Default. Selected by `startup.mode = group-offsets` (or `group`).
    #[default]
    GroupOffsets,
    /// Selected by `startup.mode = earliest`; `from_config` then also sets
    /// `auto_offset_reset` to `Earliest`.
    Earliest,
    /// Selected by `startup.mode = latest`; `from_config` then also sets
    /// `auto_offset_reset` to `Latest`.
    Latest,
    /// Explicit start offsets, keyed by partition (`i32`) with the offset (`i64`)
    /// as value. Parsed from `startup.specific.offsets`.
    SpecificOffsets(HashMap<i32, i64>),
    /// Value of `startup.timestamp.ms`.
    /// NOTE(review): presumably epoch milliseconds — confirm with the consumer code.
    Timestamp(i64),
}

// Currently has no inherent methods.
impl StartupMode {}
179
180impl std::str::FromStr for StartupMode {
181 type Err = ConnectorError;
182
183 fn from_str(s: &str) -> Result<Self, Self::Err> {
184 match s.to_lowercase().replace('-', "_").as_str() {
185 "group_offsets" | "group" => Ok(StartupMode::GroupOffsets),
186 "earliest" => Ok(StartupMode::Earliest),
187 "latest" => Ok(StartupMode::Latest),
188 "timestamp" => Err(ConnectorError::ConfigurationError(
189 "use 'startup.timestamp.ms' to start from a timestamp, \
190 not 'startup.mode = timestamp'"
191 .into(),
192 )),
193 other => Err(ConnectorError::ConfigurationError(format!(
194 "invalid startup.mode: '{other}' (expected group-offsets/earliest/latest)"
195 ))),
196 }
197 }
198}
199
200impl std::fmt::Display for StartupMode {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 match self {
203 StartupMode::GroupOffsets => write!(f, "group-offsets"),
204 StartupMode::Earliest => write!(f, "earliest"),
205 StartupMode::Latest => write!(f, "latest"),
206 StartupMode::SpecificOffsets(offsets) => {
207 write!(f, "specific-offsets({} partitions)", offsets.len())
208 }
209 StartupMode::Timestamp(ts) => write!(f, "timestamp({ts})"),
210 }
211 }
212}
213
/// What the consumer subscribes to: an explicit topic list or a pattern string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TopicSubscription {
    /// Explicit list of topic names.
    Topics(Vec<String>),
    /// A single pattern string.
    Pattern(String),
}

impl TopicSubscription {
    /// Returns the topic list, or `None` for a pattern subscription.
    #[must_use]
    pub fn topics(&self) -> Option<&[String]> {
        if let Self::Topics(names) = self {
            Some(names.as_slice())
        } else {
            None
        }
    }

    /// Returns the pattern, or `None` for an explicit topic list.
    #[must_use]
    pub fn pattern(&self) -> Option<&str> {
        if let Self::Pattern(expr) = self {
            Some(expr.as_str())
        } else {
            None
        }
    }

    /// `true` when this is a pattern subscription.
    #[must_use]
    pub fn is_pattern(&self) -> bool {
        match self {
            Self::Pattern(_) => true,
            Self::Topics(_) => false,
        }
    }
}

impl Default for TopicSubscription {
    /// An explicit topic list with no entries.
    fn default() -> Self {
        Self::Topics(vec![])
    }
}
254
/// Policy behind rdkafka's `auto.offset.reset` property.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetReset {
    /// Emitted as `earliest`.
    Earliest,
    /// Emitted as `latest`.
    Latest,
    /// Emitted as `error` (see [`OffsetReset::as_rdkafka_str`]).
    None,
}

impl OffsetReset {
    /// Returns the value written to rdkafka's `auto.offset.reset` property.
    ///
    /// Note that [`OffsetReset::None`] is spelled `error` for rdkafka.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Earliest => "earliest",
            Self::Latest => "latest",
            Self::None => "error",
        }
    }
}
277
// String parsing for `OffsetReset`, produced by the crate's `str_enum!` macro
// (`fromstr` form); used by `from_config` through `parse::<OffsetReset>()`.
// Each variant accepts two spellings (e.g. `earliest` or `beginning`).
// NOTE(review): the exact normalization performed by `lowercase_nodash` is not
// visible here — confirm against the macro definition.
str_enum!(fromstr OffsetReset, lowercase_nodash, ConnectorError, "invalid auto.offset.reset",
    Earliest => "earliest", "beginning";
    Latest => "latest", "end";
    None => "none", "error"
);
283
/// Consumer-group partition assignment strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AssignmentStrategy {
    /// `range`.
    Range,
    /// `roundrobin`.
    RoundRobin,
    /// `cooperative-sticky`.
    CooperativeSticky,
}

impl AssignmentStrategy {
    /// Returns the value written to rdkafka's `partition.assignment.strategy`
    /// property.
    #[must_use]
    pub fn as_rdkafka_str(&self) -> &'static str {
        match *self {
            Self::Range => "range",
            Self::RoundRobin => "roundrobin",
            Self::CooperativeSticky => "cooperative-sticky",
        }
    }
}
306
// String parsing for `AssignmentStrategy`, produced by the crate's `str_enum!`
// macro (`fromstr` form); used by `from_config` through
// `parse::<AssignmentStrategy>()`. Dash and underscore spellings are listed
// explicitly for the multi-word variants.
// NOTE(review): the exact normalization performed by `lowercase_nodash` is not
// visible here — confirm against the macro definition.
str_enum!(fromstr AssignmentStrategy, lowercase_nodash, ConnectorError,
    "invalid partition.assignment.strategy",
    Range => "range";
    RoundRobin => "roundrobin", "round-robin", "round_robin";
    CooperativeSticky => "cooperative-sticky", "cooperative_sticky"
);
313
/// Schema compatibility level, as configured through `schema.compatibility`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityLevel {
    /// `BACKWARD`.
    Backward,
    /// `BACKWARD_TRANSITIVE`.
    BackwardTransitive,
    /// `FORWARD`.
    Forward,
    /// `FORWARD_TRANSITIVE`.
    ForwardTransitive,
    /// `FULL`.
    Full,
    /// `FULL_TRANSITIVE`.
    FullTransitive,
    /// `NONE`.
    None,
}

impl CompatibilityLevel {
    /// Returns the upper-case name of the level.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Backward => "BACKWARD",
            Self::BackwardTransitive => "BACKWARD_TRANSITIVE",
            Self::Forward => "FORWARD",
            Self::ForwardTransitive => "FORWARD_TRANSITIVE",
            Self::Full => "FULL",
            Self::FullTransitive => "FULL_TRANSITIVE",
            Self::None => "NONE",
        }
    }
}
348
// String parsing for `CompatibilityLevel`, produced by the crate's `str_enum!`
// macro (`fromstr` form); used by `from_config` through
// `parse::<CompatibilityLevel>()`. The accepted spellings are the same strings
// that `CompatibilityLevel::as_str` returns.
// NOTE(review): `uppercase` presumably selects the input normalization — confirm
// against the macro definition.
str_enum!(fromstr CompatibilityLevel, uppercase, ConnectorError, "invalid schema.compatibility",
    Backward => "BACKWARD";
    BackwardTransitive => "BACKWARD_TRANSITIVE";
    Forward => "FORWARD";
    ForwardTransitive => "FORWARD_TRANSITIVE";
    Full => "FULL";
    FullTransitive => "FULL_TRANSITIVE";
    None => "NONE"
);
358
359impl std::fmt::Display for CompatibilityLevel {
360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361 write!(f, "{}", self.as_str())
362 }
363}
364
/// Strategy selected through `schema.evolution.strategy`.
///
/// The default is [`SchemaEvolutionStrategy::Log`].
/// NOTE(review): what each strategy does at runtime is decided by code outside
/// this file — see the schema handling code for the exact semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SchemaEvolutionStrategy {
    /// Configured as `log` (default).
    #[default]
    Log,
    /// Configured as `reject`.
    Reject,
    /// Configured as `ignore`.
    Ignore,
}
376
// String parsing for `SchemaEvolutionStrategy`, produced by the crate's
// `str_enum!` macro (`fromstr` form); used by `from_config` through
// `parse::<SchemaEvolutionStrategy>()`.
// NOTE(review): `lowercase` presumably selects the input normalization — confirm
// against the macro definition.
str_enum!(fromstr SchemaEvolutionStrategy, lowercase, ConnectorError,
    "invalid schema.evolution.strategy",
    Log => "log";
    Reject => "reject";
    Ignore => "ignore"
);
383
384impl std::fmt::Display for SchemaEvolutionStrategy {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 match self {
387 SchemaEvolutionStrategy::Log => write!(f, "log"),
388 SchemaEvolutionStrategy::Reject => write!(f, "reject"),
389 SchemaEvolutionStrategy::Ignore => write!(f, "ignore"),
390 }
391 }
392}
393
/// How the schema-registry subject for record values is derived
/// (see `resolve_value_subject`).
///
/// The default is [`SubjectNameStrategy::TopicName`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SubjectNameStrategy {
    /// Subject is `<topic>-value` (default).
    #[default]
    TopicName,
    /// Subject is `<record name>-value`; `from_config` requires
    /// `schema.registry.record.name` for this strategy.
    RecordName,
    /// Subject is `<topic>-<record name>-value`; `from_config` requires
    /// `schema.registry.record.name` for this strategy.
    TopicRecordName,
}
405
// String parsing for `SubjectNameStrategy`, produced by the crate's `str_enum!`
// macro (`fromstr` form); used by `from_config` through
// `parse::<SubjectNameStrategy>()`. Each variant accepts a dashed spelling, a
// squashed spelling, and a `...strategy`-suffixed spelling.
// NOTE(review): the exact normalization performed by `lowercase_nodash` is not
// visible here — confirm against the macro definition.
str_enum!(fromstr SubjectNameStrategy, lowercase_nodash, ConnectorError,
    "invalid schema.registry.subject.name.strategy",
    TopicName => "topic-name", "topicname", "topicnamestrategy";
    RecordName => "record-name", "recordname", "recordnamestrategy";
    TopicRecordName => "topic-record-name", "topicrecordname", "topicrecordnamestrategy"
);
412
413impl std::fmt::Display for SubjectNameStrategy {
414 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
415 match self {
416 SubjectNameStrategy::TopicName => write!(f, "topic-name"),
417 SubjectNameStrategy::RecordName => write!(f, "record-name"),
418 SubjectNameStrategy::TopicRecordName => write!(f, "topic-record-name"),
419 }
420 }
421}
422
423pub(crate) fn resolve_value_subject(
427 strategy: SubjectNameStrategy,
428 record_name: Option<&str>,
429 topic: &str,
430) -> String {
431 let name = || record_name.expect("from_config validates record.name");
432 match strategy {
433 SubjectNameStrategy::TopicName => format!("{topic}-value"),
434 SubjectNameStrategy::RecordName => format!("{}-value", name()),
435 SubjectNameStrategy::TopicRecordName => format!("{topic}-{}-value", name()),
436 }
437}
438
439impl From<CompatibilityLevel> for crate::schema::traits::CompatibilityMode {
442 fn from(level: CompatibilityLevel) -> Self {
443 use crate::schema::traits::CompatibilityMode;
444 match level {
445 CompatibilityLevel::Backward => CompatibilityMode::Backward,
446 CompatibilityLevel::BackwardTransitive => CompatibilityMode::BackwardTransitive,
447 CompatibilityLevel::Forward => CompatibilityMode::Forward,
448 CompatibilityLevel::ForwardTransitive => CompatibilityMode::ForwardTransitive,
449 CompatibilityLevel::Full => CompatibilityMode::Full,
450 CompatibilityLevel::FullTransitive => CompatibilityMode::FullTransitive,
451 CompatibilityLevel::None => CompatibilityMode::None,
452 }
453 }
454}
455
/// Username/password pair for the schema registry.
///
/// `Debug` is implemented by hand so the password is never printed.
#[derive(Clone)]
pub struct SrAuth {
    /// Taken from `schema.registry.username`.
    pub username: String,
    /// Taken from `schema.registry.password`; masked in `Debug` output.
    pub password: String,
}

impl std::fmt::Debug for SrAuth {
    /// Prints the username verbatim and a fixed `"***"` mask for the password.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            username,
            password: _,
        } = self;
        let mut out = f.debug_struct("SrAuth");
        out.field("username", username);
        out.field("password", &"***");
        out.finish()
    }
}
473
/// Fully resolved configuration for the Kafka source.
///
/// Usually built with `KafkaSourceConfig::from_config`, which reads the keys named
/// on each field below and then runs `validate`. `Debug` is implemented by hand so
/// secrets are masked.
#[derive(Clone)]
// The independent on/off switches below are plain `bool`s by design of this struct.
#[allow(clippy::struct_excessive_bools)]
pub struct KafkaSourceConfig {
    /// `bootstrap.servers` (required, must be non-empty).
    pub bootstrap_servers: String,
    /// `group.id` (required, must be non-empty).
    pub group_id: String,
    /// From `topic.pattern` when present, otherwise from the comma-separated `topic`.
    pub subscription: TopicSubscription,

    /// `security.protocol`; falls back to the enum default.
    pub security_protocol: SecurityProtocol,
    /// `sasl.mechanism`; `validate` requires it when the protocol uses SASL.
    pub sasl_mechanism: Option<SaslMechanism>,
    /// `sasl.username`.
    pub sasl_username: Option<String>,
    /// `sasl.password`; masked in `Debug` output.
    pub sasl_password: Option<String>,
    /// `ssl.ca.location`.
    pub ssl_ca_location: Option<String>,
    /// `ssl.certificate.location`.
    pub ssl_certificate_location: Option<String>,
    /// `ssl.key.location`.
    pub ssl_key_location: Option<String>,
    /// `ssl.key.password`; masked in `Debug` output.
    pub ssl_key_password: Option<String>,

    /// `format`; defaults to `Format::Json`.
    pub format: Format,
    /// `schema.registry.url`; `validate` requires it when `format` is Avro.
    pub schema_registry_url: Option<String>,
    /// `schema.registry.username` + `schema.registry.password` (both or neither).
    pub schema_registry_auth: Option<SrAuth>,
    /// `schema.compatibility`.
    pub schema_compatibility: Option<CompatibilityLevel>,
    /// `schema.evolution.strategy`; falls back to the enum default.
    pub schema_evolution_strategy: SchemaEvolutionStrategy,
    /// `schema.registry.ssl.ca.location`.
    pub schema_registry_ssl_ca_location: Option<String>,
    /// `schema.registry.ssl.certificate.location`.
    pub schema_registry_ssl_certificate_location: Option<String>,
    /// `schema.registry.ssl.key.location`.
    pub schema_registry_ssl_key_location: Option<String>,
    /// `schema.registry.subject.name.strategy`; falls back to the enum default.
    pub schema_registry_subject_strategy: SubjectNameStrategy,
    /// `schema.registry.record.name`; required by `from_config` for the
    /// `RecordName` and `TopicRecordName` strategies.
    pub schema_registry_record_name: Option<String>,
    /// `schema.registry.discovery.timeout.ms`; defaults to 10 s.
    pub schema_registry_discovery_timeout: Duration,
    /// `event.time.column`.
    pub event_time_column: Option<String>,
    /// `include.metadata`; defaults to `false`.
    pub include_metadata: bool,
    /// `include.headers`; defaults to `false`.
    pub include_headers: bool,

    /// See [`StartupMode`] for how `from_config` selects the variant.
    pub startup_mode: StartupMode,
    /// `auto.offset.reset`, defaulting to `Earliest`; forced to match the startup
    /// mode when that is `Earliest` or `Latest`.
    pub auto_offset_reset: OffsetReset,
    /// `isolation.level`; falls back to the enum default.
    pub isolation_level: IsolationLevel,
    /// `max.poll.records`; defaults to 1000 and must be > 0.
    pub max_poll_records: usize,
    /// `partition.assignment.strategy`; defaults to `Range`.
    pub partition_assignment_strategy: AssignmentStrategy,
    /// `fetch.min.bytes`; only forwarded to rdkafka when set.
    pub fetch_min_bytes: Option<i32>,
    /// `fetch.max.bytes`; only forwarded to rdkafka when set.
    pub fetch_max_bytes: Option<i32>,
    /// `fetch.max.wait.ms`; forwarded to rdkafka as `fetch.wait.max.ms` when set.
    pub fetch_max_wait_ms: Option<i32>,
    /// `max.partition.fetch.bytes`; only forwarded to rdkafka when set.
    pub max_partition_fetch_bytes: Option<i32>,

    /// `max.out.of.orderness.ms`; defaults to 5 s.
    pub max_out_of_orderness: Duration,
    /// `idle.timeout.ms`; defaults to 30 s.
    pub idle_timeout: Duration,
    /// `enable.watermark.tracking`; defaults to `false`.
    pub enable_watermark_tracking: bool,
    /// `session.timeout.ms`; defaults to 45 s.
    pub session_timeout: Duration,
    /// `heartbeat.interval.ms`; defaults to 10 s. `validate` requires three times
    /// this value to be below `session_timeout`.
    pub heartbeat_interval: Duration,
    /// `max.poll.interval.ms`; defaults to 600 s and must be at least 60 s.
    pub max_poll_interval: Duration,
    /// `queued.max.messages.kbytes`; defaults to 16384.
    pub queued_max_messages_kbytes: u32,

    /// `broker.commit.interval.ms`; defaults to 5 s.
    pub broker_commit_interval: Duration,

    /// `reader.channel.capacity`; defaults to 1024 and must be >= `max_poll_records`.
    pub reader_channel_capacity: usize,
    /// `backpressure.high.watermark`; defaults to 0.8, must lie in `0.0..=1.0` and
    /// be greater than the low watermark.
    pub backpressure_high_watermark: f64,
    /// `backpressure.low.watermark`; defaults to 0.5 and must lie in `0.0..=1.0`.
    pub backpressure_low_watermark: f64,

    /// `max.deser.error.rate`; defaults to 0.5 and must lie in `0.0..=1.0`.
    pub max_deser_error_rate: f64,

    /// Properties collected via `properties_with_prefix("kafka.")`; applied last in
    /// `to_rdkafka_config`, except for keys rejected by `is_blocked_passthrough_key`.
    pub kafka_properties: HashMap<String, String>,
}
614
615impl std::fmt::Debug for KafkaSourceConfig {
616 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
617 f.debug_struct("KafkaSourceConfig")
618 .field("bootstrap_servers", &self.bootstrap_servers)
619 .field("group_id", &self.group_id)
620 .field("subscription", &self.subscription)
621 .field("format", &self.format)
622 .field("security_protocol", &self.security_protocol)
623 .field("sasl_mechanism", &self.sasl_mechanism)
624 .field("sasl_username", &self.sasl_username)
625 .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
626 .field(
627 "ssl_key_password",
628 &self.ssl_key_password.as_ref().map(|_| "***"),
629 )
630 .field("max_poll_records", &self.max_poll_records)
631 .finish_non_exhaustive()
632 }
633}
634
635impl Default for KafkaSourceConfig {
636 fn default() -> Self {
637 Self {
638 bootstrap_servers: String::new(),
639 group_id: String::new(),
640 subscription: TopicSubscription::default(),
641 security_protocol: SecurityProtocol::default(),
642 sasl_mechanism: None,
643 sasl_username: None,
644 sasl_password: None,
645 ssl_ca_location: None,
646 ssl_certificate_location: None,
647 ssl_key_location: None,
648 ssl_key_password: None,
649 format: Format::Json,
650 schema_registry_url: None,
651 schema_registry_auth: None,
652 schema_compatibility: None,
653 schema_evolution_strategy: SchemaEvolutionStrategy::default(),
654 schema_registry_ssl_ca_location: None,
655 schema_registry_ssl_certificate_location: None,
656 schema_registry_ssl_key_location: None,
657 schema_registry_subject_strategy: SubjectNameStrategy::default(),
658 schema_registry_record_name: None,
659 schema_registry_discovery_timeout: Duration::from_secs(10),
660 event_time_column: None,
661 include_metadata: false,
662 include_headers: false,
663 startup_mode: StartupMode::default(),
664 auto_offset_reset: OffsetReset::Earliest,
665 isolation_level: IsolationLevel::default(),
666 max_poll_records: 1000,
667 partition_assignment_strategy: AssignmentStrategy::Range,
668 fetch_min_bytes: None,
669 fetch_max_bytes: None,
670 fetch_max_wait_ms: None,
671 max_partition_fetch_bytes: None,
672 max_out_of_orderness: Duration::from_secs(5),
673 idle_timeout: Duration::from_secs(30),
674 enable_watermark_tracking: false,
675 session_timeout: Duration::from_secs(45),
676 heartbeat_interval: Duration::from_secs(10),
677 max_poll_interval: Duration::from_secs(600),
678 queued_max_messages_kbytes: 16384,
679 broker_commit_interval: Duration::from_secs(5),
680 reader_channel_capacity: 1024,
681 backpressure_high_watermark: 0.8,
682 backpressure_low_watermark: 0.5,
683 max_deser_error_rate: 0.5,
684 kafka_properties: HashMap::new(),
685 }
686 }
687}
688
689impl KafkaSourceConfig {
690 #[allow(deprecated, clippy::too_many_lines)]
696 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
697 let bootstrap_servers = config.require("bootstrap.servers")?.to_string();
698 let group_id = config.require("group.id")?.to_string();
699
700 let subscription = if let Some(pattern) = config.get("topic.pattern") {
701 TopicSubscription::Pattern(pattern.to_string())
702 } else {
703 let topics_str = config.require("topic")?;
704 let topics: Vec<String> = topics_str
705 .split(',')
706 .map(|s| s.trim().to_string())
707 .collect();
708 TopicSubscription::Topics(topics.clone())
709 };
710
711 let security_protocol = match config.get("security.protocol") {
712 Some(s) => s.parse::<SecurityProtocol>()?,
713 None => SecurityProtocol::default(),
714 };
715
716 let sasl_mechanism = match config.get("sasl.mechanism") {
717 Some(s) => Some(s.parse::<SaslMechanism>()?),
718 None => None,
719 };
720
721 let sasl_username = config.get("sasl.username").map(String::from);
722 let sasl_password = config.get("sasl.password").map(String::from);
723 let ssl_ca_location = config.get("ssl.ca.location").map(String::from);
724 let ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
725 let ssl_key_location = config.get("ssl.key.location").map(String::from);
726 let ssl_key_password = config.get("ssl.key.password").map(String::from);
727
728 let format = match config.get("format") {
729 Some(f) => f
730 .parse::<Format>()
731 .map_err(|e| ConnectorError::ConfigurationError(e.to_string()))?,
732 None => Format::Json,
733 };
734
735 let schema_registry_url = config.get("schema.registry.url").map(String::from);
736
737 let schema_registry_auth = match (
738 config.get("schema.registry.username"),
739 config.get("schema.registry.password"),
740 ) {
741 (Some(u), Some(p)) => Some(SrAuth {
742 username: u.to_string(),
743 password: p.to_string(),
744 }),
745 (Some(_), None) | (None, Some(_)) => {
746 return Err(ConnectorError::ConfigurationError(
747 "schema.registry.username and schema.registry.password must both be set"
748 .to_string(),
749 ));
750 }
751 (None, None) => None,
752 };
753
754 let schema_compatibility = match config.get("schema.compatibility") {
755 Some(s) => Some(s.parse::<CompatibilityLevel>()?),
756 None => None,
757 };
758
759 let schema_evolution_strategy = match config.get("schema.evolution.strategy") {
760 Some(s) => s.parse::<SchemaEvolutionStrategy>()?,
761 None => SchemaEvolutionStrategy::default(),
762 };
763
764 let schema_registry_ssl_ca_location = config
765 .get("schema.registry.ssl.ca.location")
766 .map(String::from);
767 let schema_registry_ssl_certificate_location = config
768 .get("schema.registry.ssl.certificate.location")
769 .map(String::from);
770 let schema_registry_ssl_key_location = config
771 .get("schema.registry.ssl.key.location")
772 .map(String::from);
773
774 let schema_registry_subject_strategy =
775 match config.get("schema.registry.subject.name.strategy") {
776 Some(s) => s.parse::<SubjectNameStrategy>()?,
777 None => SubjectNameStrategy::default(),
778 };
779
780 let schema_registry_record_name =
781 config.get("schema.registry.record.name").map(String::from);
782
783 if matches!(
784 schema_registry_subject_strategy,
785 SubjectNameStrategy::RecordName | SubjectNameStrategy::TopicRecordName
786 ) && schema_registry_record_name.is_none()
787 {
788 return Err(ConnectorError::ConfigurationError(format!(
789 "schema.registry.subject.name.strategy={schema_registry_subject_strategy} \
790 requires schema.registry.record.name"
791 )));
792 }
793
794 let schema_registry_discovery_timeout = config
795 .get_parsed::<u64>("schema.registry.discovery.timeout.ms")?
796 .map_or(Duration::from_secs(10), Duration::from_millis);
797
798 let event_time_column = config.get("event.time.column").map(String::from);
799
800 let include_metadata = config
801 .get_parsed::<bool>("include.metadata")?
802 .unwrap_or(false);
803
804 let include_headers = config
805 .get_parsed::<bool>("include.headers")?
806 .unwrap_or(false);
807
808 let startup_mode = if let Some(offsets_str) = config.get("startup.specific.offsets") {
809 let offsets = parse_specific_offsets(offsets_str)?;
810 StartupMode::SpecificOffsets(offsets)
811 } else if let Some(ts_str) = config.get("startup.timestamp.ms") {
812 let ts: i64 = ts_str.parse().map_err(|_| {
813 ConnectorError::ConfigurationError(format!(
814 "invalid startup.timestamp.ms: '{ts_str}'"
815 ))
816 })?;
817 StartupMode::Timestamp(ts)
818 } else {
819 match config.get("startup.mode") {
820 Some(s) => s.parse::<StartupMode>()?,
821 None => StartupMode::default(),
822 }
823 };
824
825 let auto_offset_reset = match &startup_mode {
830 StartupMode::Earliest => OffsetReset::Earliest,
831 StartupMode::Latest => OffsetReset::Latest,
832 _ => match config.get("auto.offset.reset") {
833 Some(s) => s.parse::<OffsetReset>()?,
834 None => OffsetReset::Earliest,
835 },
836 };
837
838 let isolation_level = match config.get("isolation.level") {
839 Some(s) => s.parse::<IsolationLevel>()?,
840 None => IsolationLevel::default(),
841 };
842
843 let max_poll_records = config
844 .get_parsed::<usize>("max.poll.records")?
845 .unwrap_or(1000);
846
847 let partition_assignment_strategy = match config.get("partition.assignment.strategy") {
848 Some(s) => s.parse::<AssignmentStrategy>()?,
849 None => AssignmentStrategy::Range,
850 };
851
852 let fetch_min_bytes = config.get_parsed::<i32>("fetch.min.bytes")?;
853 let fetch_max_bytes = config.get_parsed::<i32>("fetch.max.bytes")?;
854 let fetch_max_wait_ms = config.get_parsed::<i32>("fetch.max.wait.ms")?;
855 let max_partition_fetch_bytes = config.get_parsed::<i32>("max.partition.fetch.bytes")?;
856
857 let max_out_of_orderness_ms = config
858 .get_parsed::<u64>("max.out.of.orderness.ms")?
859 .unwrap_or(5000);
860
861 let idle_timeout_ms = config
862 .get_parsed::<u64>("idle.timeout.ms")?
863 .unwrap_or(30_000);
864
865 let enable_watermark_tracking = config
866 .get_parsed::<bool>("enable.watermark.tracking")?
867 .unwrap_or(false);
868
869 let session_timeout_ms = config
870 .get_parsed::<u64>("session.timeout.ms")?
871 .unwrap_or(45_000);
872 let heartbeat_interval_ms = config
873 .get_parsed::<u64>("heartbeat.interval.ms")?
874 .unwrap_or(10_000);
875 let queued_max_messages_kbytes = config
876 .get_parsed::<u32>("queued.max.messages.kbytes")?
877 .unwrap_or(16384);
878 let max_poll_interval_ms = config
879 .get_parsed::<u64>("max.poll.interval.ms")?
880 .unwrap_or(600_000);
881
882 let broker_commit_interval_ms = config
883 .get_parsed::<u64>("broker.commit.interval.ms")?
884 .unwrap_or(5_000);
885
886 let reader_channel_capacity = config
887 .get_parsed::<usize>("reader.channel.capacity")?
888 .unwrap_or(1024);
889
890 let backpressure_high_watermark = config
891 .get_parsed::<f64>("backpressure.high.watermark")?
892 .unwrap_or(0.8);
893
894 let backpressure_low_watermark = config
895 .get_parsed::<f64>("backpressure.low.watermark")?
896 .unwrap_or(0.5);
897
898 let max_deser_error_rate = config
899 .get_parsed::<f64>("max.deser.error.rate")?
900 .unwrap_or(0.5);
901
902 let kafka_properties = config.properties_with_prefix("kafka.");
903
904 let cfg = Self {
905 bootstrap_servers,
906 group_id,
907 subscription,
908 security_protocol,
909 sasl_mechanism,
910 sasl_username,
911 sasl_password,
912 ssl_ca_location,
913 ssl_certificate_location,
914 ssl_key_location,
915 ssl_key_password,
916 format,
917 schema_registry_url,
918 schema_registry_auth,
919 schema_compatibility,
920 schema_evolution_strategy,
921 schema_registry_ssl_ca_location,
922 schema_registry_ssl_certificate_location,
923 schema_registry_ssl_key_location,
924 schema_registry_subject_strategy,
925 schema_registry_record_name,
926 schema_registry_discovery_timeout,
927 event_time_column,
928 include_metadata,
929 include_headers,
930 startup_mode,
931 auto_offset_reset,
932 isolation_level,
933 max_poll_records,
934 partition_assignment_strategy,
935 fetch_min_bytes,
936 fetch_max_bytes,
937 fetch_max_wait_ms,
938 max_partition_fetch_bytes,
939 max_out_of_orderness: Duration::from_millis(max_out_of_orderness_ms),
940 idle_timeout: Duration::from_millis(idle_timeout_ms),
941 enable_watermark_tracking,
942 session_timeout: Duration::from_millis(session_timeout_ms),
943 heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
944 max_poll_interval: Duration::from_millis(max_poll_interval_ms),
945 queued_max_messages_kbytes,
946 broker_commit_interval: Duration::from_millis(broker_commit_interval_ms),
947 reader_channel_capacity,
948 backpressure_high_watermark,
949 backpressure_low_watermark,
950 max_deser_error_rate,
951 kafka_properties,
952 };
953
954 cfg.validate()?;
955 Ok(cfg)
956 }
957
958 pub fn validate(&self) -> Result<(), ConnectorError> {
964 if self.bootstrap_servers.is_empty() {
965 return Err(ConnectorError::ConfigurationError(
966 "bootstrap.servers cannot be empty".into(),
967 ));
968 }
969 if self.group_id.is_empty() {
970 return Err(ConnectorError::ConfigurationError(
971 "group.id cannot be empty".into(),
972 ));
973 }
974
975 match &self.subscription {
977 TopicSubscription::Topics(t) if t.is_empty() => {
978 return Err(ConnectorError::ConfigurationError(
979 "at least one topic is required (or use topic.pattern)".into(),
980 ));
981 }
982 TopicSubscription::Pattern(p) if p.is_empty() => {
983 return Err(ConnectorError::ConfigurationError(
984 "topic.pattern cannot be empty".into(),
985 ));
986 }
987 _ => {}
988 }
989
990 if self.max_poll_records == 0 {
991 return Err(ConnectorError::ConfigurationError(
992 "max.poll.records must be > 0".into(),
993 ));
994 }
995 if self.reader_channel_capacity < self.max_poll_records {
996 return Err(ConnectorError::ConfigurationError(format!(
997 "reader.channel.capacity ({}) must be >= max.poll.records ({})",
998 self.reader_channel_capacity, self.max_poll_records
999 )));
1000 }
1001
1002 if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
1003 return Err(ConnectorError::ConfigurationError(
1004 "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
1005 .into(),
1006 ));
1007 }
1008
1009 if let Some(mechanism) = &self.sasl_mechanism {
1010 if mechanism.requires_credentials()
1011 && (self.sasl_username.is_none() || self.sasl_password.is_none())
1012 {
1013 return Err(ConnectorError::ConfigurationError(format!(
1014 "sasl.username and sasl.password are required for {mechanism} mechanism"
1015 )));
1016 }
1017 }
1018
1019 if self.security_protocol.uses_ssl() {
1020 if let Some(ref ca) = self.ssl_ca_location {
1021 if ca.is_empty() {
1022 return Err(ConnectorError::ConfigurationError(
1023 "ssl.ca.location cannot be empty when specified".into(),
1024 ));
1025 }
1026 }
1027 }
1028
1029 if self.max_poll_interval.as_secs() < 60 {
1030 return Err(ConnectorError::ConfigurationError(format!(
1031 "max.poll.interval.ms ({}) must be >= 60000 (60s)",
1032 self.max_poll_interval.as_millis()
1033 )));
1034 }
1035
1036 if self.heartbeat_interval.as_millis() * 3 >= self.session_timeout.as_millis() {
1038 return Err(ConnectorError::ConfigurationError(format!(
1039 "heartbeat.interval.ms ({}) * 3 must be < session.timeout.ms ({})",
1040 self.heartbeat_interval.as_millis(),
1041 self.session_timeout.as_millis()
1042 )));
1043 }
1044
1045 if self.backpressure_high_watermark <= self.backpressure_low_watermark {
1046 return Err(ConnectorError::ConfigurationError(
1047 "backpressure.high.watermark must be > backpressure.low.watermark".into(),
1048 ));
1049 }
1050 if !(0.0..=1.0).contains(&self.backpressure_high_watermark) {
1051 return Err(ConnectorError::ConfigurationError(
1052 "backpressure.high.watermark must be between 0.0 and 1.0".into(),
1053 ));
1054 }
1055 if !(0.0..=1.0).contains(&self.backpressure_low_watermark) {
1056 return Err(ConnectorError::ConfigurationError(
1057 "backpressure.low.watermark must be between 0.0 and 1.0".into(),
1058 ));
1059 }
1060
1061 if !(0.0..=1.0).contains(&self.max_deser_error_rate) {
1062 return Err(ConnectorError::ConfigurationError(
1063 "max.deser.error.rate must be between 0.0 and 1.0".into(),
1064 ));
1065 }
1066
1067 if self.format == Format::Avro && self.schema_registry_url.is_none() {
1068 return Err(ConnectorError::ConfigurationError(
1069 "schema.registry.url is required when format is 'avro'".into(),
1070 ));
1071 }
1072
1073 Ok(())
1074 }
1075
1076 #[must_use]
1078 pub fn to_rdkafka_config(&self) -> ClientConfig {
1079 let mut config = ClientConfig::new();
1080
1081 config.set("bootstrap.servers", &self.bootstrap_servers);
1082 config.set("group.id", &self.group_id);
1083 config.set("enable.auto.commit", "false");
1084 config.set("enable.auto.offset.store", "false");
1085 config.set("auto.offset.reset", self.auto_offset_reset.as_rdkafka_str());
1086 config.set(
1087 "partition.assignment.strategy",
1088 self.partition_assignment_strategy.as_rdkafka_str(),
1089 );
1090 config.set("security.protocol", self.security_protocol.as_rdkafka_str());
1091
1092 if let Some(ref mechanism) = self.sasl_mechanism {
1093 config.set("sasl.mechanism", mechanism.as_rdkafka_str());
1094 }
1095 if let Some(ref username) = self.sasl_username {
1096 config.set("sasl.username", username);
1097 }
1098 if let Some(ref password) = self.sasl_password {
1099 config.set("sasl.password", password);
1100 }
1101 if let Some(ref ca) = self.ssl_ca_location {
1102 config.set("ssl.ca.location", ca);
1103 }
1104 if let Some(ref cert) = self.ssl_certificate_location {
1105 config.set("ssl.certificate.location", cert);
1106 }
1107 if let Some(ref key) = self.ssl_key_location {
1108 config.set("ssl.key.location", key);
1109 }
1110 if let Some(ref key_pass) = self.ssl_key_password {
1111 config.set("ssl.key.password", key_pass);
1112 }
1113
1114 config.set("isolation.level", self.isolation_level.as_rdkafka_str());
1115 config.set(
1116 "session.timeout.ms",
1117 self.session_timeout.as_millis().to_string(),
1118 );
1119 config.set(
1120 "heartbeat.interval.ms",
1121 self.heartbeat_interval.as_millis().to_string(),
1122 );
1123 config.set(
1124 "queued.max.messages.kbytes",
1125 self.queued_max_messages_kbytes.to_string(),
1126 );
1127 config.set(
1128 "max.poll.interval.ms",
1129 self.max_poll_interval.as_millis().to_string(),
1130 );
1131
1132 if let Some(fetch_min) = self.fetch_min_bytes {
1133 config.set("fetch.min.bytes", fetch_min.to_string());
1134 }
1135
1136 if let Some(fetch_max) = self.fetch_max_bytes {
1137 config.set("fetch.max.bytes", fetch_max.to_string());
1138 }
1139
1140 if let Some(wait_ms) = self.fetch_max_wait_ms {
1141 config.set("fetch.wait.max.ms", wait_ms.to_string());
1142 }
1143
1144 if let Some(partition_max) = self.max_partition_fetch_bytes {
1145 config.set("max.partition.fetch.bytes", partition_max.to_string());
1146 }
1147
1148 for (key, value) in &self.kafka_properties {
1151 if is_blocked_passthrough_key(key) {
1152 tracing::warn!(
1153 key,
1154 "ignoring kafka.* pass-through property that overrides a security setting"
1155 );
1156 continue;
1157 }
1158 config.set(key, value);
1159 }
1160
1161 config
1162 }
1163}
1164
/// Returns `true` when `key` must not be set through the `kafka.*` pass-through.
///
/// A key is blocked when it starts with `sasl.kerberos.` or exactly equals one of
/// the reserved property names listed below. Matching is case-sensitive.
fn is_blocked_passthrough_key(key: &str) -> bool {
    const RESERVED_PREFIX: &str = "sasl.kerberos.";
    const RESERVED_KEYS: &[&str] = &[
        "security.protocol",
        "sasl.mechanism",
        "sasl.username",
        "sasl.password",
        "sasl.oauthbearer.config",
        "ssl.ca.location",
        "ssl.certificate.location",
        "ssl.key.location",
        "ssl.key.password",
        "ssl.endpoint.identification.algorithm",
        "enable.auto.commit",
        "enable.auto.offset.store",
        "enable.idempotence",
        "auto.offset.reset",
        "session.timeout.ms",
        "heartbeat.interval.ms",
        "max.poll.interval.ms",
        "queued.max.messages.kbytes",
    ];

    key.starts_with(RESERVED_PREFIX) || RESERVED_KEYS.contains(&key)
}
1190
1191fn parse_specific_offsets(s: &str) -> Result<HashMap<i32, i64>, ConnectorError> {
1195 let mut offsets = HashMap::new();
1196
1197 for pair in s.split(',') {
1198 let pair = pair.trim();
1199 if pair.is_empty() {
1200 continue;
1201 }
1202
1203 let parts: Vec<&str> = pair.split(':').collect();
1204 if parts.len() != 2 {
1205 return Err(ConnectorError::ConfigurationError(format!(
1206 "invalid offset format '{pair}' (expected 'partition:offset')"
1207 )));
1208 }
1209
1210 let partition: i32 = parts[0].trim().parse().map_err(|_| {
1211 ConnectorError::ConfigurationError(format!(
1212 "invalid partition number '{}' in '{pair}'",
1213 parts[0]
1214 ))
1215 })?;
1216
1217 let offset: i64 = parts[1].trim().parse().map_err(|_| {
1218 ConnectorError::ConfigurationError(format!("invalid offset '{}' in '{pair}'", parts[1]))
1219 })?;
1220
1221 offsets.insert(partition, offset);
1222 }
1223
1224 if offsets.is_empty() {
1225 return Err(ConnectorError::ConfigurationError(
1226 "startup.specific.offsets cannot be empty".into(),
1227 ));
1228 }
1229
1230 Ok(offsets)
1231}
1232
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a connector config with the three keys most tests start from
    /// (`bootstrap.servers`, `group.id`, `topic`) and then applies `extra`
    /// in order on top of them.
    fn make_config(extra: &[(&str, &str)]) -> ConnectorConfig {
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("group.id", "test-group");
        config.set("topic", "events");
        for (k, v) in extra {
            config.set(*k, *v);
        }
        config
    }

    /// Parses `make_config(extra)` into a `KafkaSourceConfig`, panicking at
    /// the caller's location when parsing fails.
    #[track_caller]
    fn parse_ok(extra: &[(&str, &str)]) -> KafkaSourceConfig {
        KafkaSourceConfig::from_config(&make_config(extra)).unwrap()
    }

    /// Starts from `KafkaSourceConfig::default()` and fills in only the broker
    /// list, the group id and a single-topic subscription; the `validate()`
    /// tests override one more field each.
    fn base_config() -> KafkaSourceConfig {
        KafkaSourceConfig {
            bootstrap_servers: "localhost:9092".into(),
            group_id: "g".into(),
            subscription: TopicSubscription::Topics(vec!["t".into()]),
            ..KafkaSourceConfig::default()
        }
    }

    #[test]
    // NOTE(review): every item used in this test is also used by other tests
    // in this module without this attribute — confirm whether it is still needed.
    #[allow(deprecated)]
    fn test_parse_required_fields() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.bootstrap_servers, "localhost:9092");
        assert_eq!(cfg.group_id, "test-group");
        assert_eq!(cfg.subscription.topics().unwrap(), &["events"]);
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Topics(ref t) if t == &["events"]
        ));
    }

    #[test]
    fn test_parse_missing_required() {
        // No keys at all: none of the base keys from `make_config` are set.
        let config = ConnectorConfig::new("kafka");
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    // NOTE(review): see `test_parse_required_fields` — confirm this allow is
    // still needed.
    #[allow(deprecated)]
    fn test_parse_multi_topic() {
        let cfg = parse_ok(&[("topic", "a, b, c")]);
        assert_eq!(cfg.subscription.topics().unwrap(), &["a", "b", "c"]);
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Topics(ref t) if t == &["a", "b", "c"]
        ));
    }

    #[test]
    fn test_parse_topic_pattern() {
        // Built by hand because `make_config` always sets `topic`.
        let mut config = ConnectorConfig::new("kafka");
        config.set("bootstrap.servers", "localhost:9092");
        config.set("group.id", "test-group");
        config.set("topic.pattern", "events-.*");

        let cfg = KafkaSourceConfig::from_config(&config).unwrap();
        assert!(matches!(
            cfg.subscription,
            TopicSubscription::Pattern(ref p) if p == "events-.*"
        ));
        assert!(cfg.subscription.is_pattern());
        assert_eq!(cfg.subscription.pattern(), Some("events-.*"));
        assert!(cfg.subscription.topics().is_none());
    }

    #[test]
    fn test_parse_defaults() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.format, Format::Json);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
        assert_eq!(cfg.isolation_level, IsolationLevel::ReadCommitted);
        assert_eq!(cfg.max_poll_records, 1000);
        assert_eq!(cfg.partition_assignment_strategy, AssignmentStrategy::Range);
        assert!(!cfg.include_metadata);
        assert!(!cfg.include_headers);
        assert!(cfg.schema_registry_url.is_none());
        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
        assert!(cfg.sasl_mechanism.is_none());
        assert_eq!(cfg.broker_commit_interval, Duration::from_secs(5));
        assert_eq!(cfg.reader_channel_capacity, 1024);
    }

    #[test]
    fn test_parse_broker_commit_interval() {
        // Deliberately different from the 5 s default asserted in
        // `test_parse_defaults`, so this fails if the key is ignored.
        let cfg = parse_ok(&[("broker.commit.interval.ms", "10000")]);
        assert_eq!(cfg.broker_commit_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_parse_broker_commit_interval_zero_disables() {
        let cfg = parse_ok(&[("broker.commit.interval.ms", "0")]);
        assert!(cfg.broker_commit_interval.is_zero());
    }

    #[test]
    fn test_parse_optional_fields() {
        let cfg = parse_ok(&[
            ("format", "csv"),
            ("auto.offset.reset", "latest"),
            ("max.poll.records", "500"),
            ("include.metadata", "true"),
            ("include.headers", "true"),
            ("event.time.column", "ts"),
            ("partition.assignment.strategy", "roundrobin"),
            ("isolation.level", "read_uncommitted"),
        ]);

        assert_eq!(cfg.format, Format::Csv);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
        assert_eq!(cfg.isolation_level, IsolationLevel::ReadUncommitted);
        assert_eq!(cfg.max_poll_records, 500);
        assert!(cfg.include_metadata);
        assert!(cfg.include_headers);
        assert_eq!(cfg.event_time_column, Some("ts".to_string()));
        assert_eq!(
            cfg.partition_assignment_strategy,
            AssignmentStrategy::RoundRobin
        );
    }

    #[test]
    fn test_parse_security_sasl_ssl() {
        let cfg = parse_ok(&[
            ("security.protocol", "sasl_ssl"),
            ("sasl.mechanism", "SCRAM-SHA-256"),
            ("sasl.username", "alice"),
            ("sasl.password", "secret"),
            ("ssl.ca.location", "/etc/ssl/ca.pem"),
        ]);

        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha256));
        assert_eq!(cfg.sasl_username, Some("alice".to_string()));
        assert_eq!(cfg.sasl_password, Some("secret".to_string()));
        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
    }

    #[test]
    fn test_parse_security_ssl_only() {
        let cfg = parse_ok(&[
            ("security.protocol", "ssl"),
            ("ssl.ca.location", "/etc/ssl/ca.pem"),
            ("ssl.certificate.location", "/etc/ssl/client.pem"),
            ("ssl.key.location", "/etc/ssl/client.key"),
            ("ssl.key.password", "keypass"),
        ]);

        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
        assert!(cfg.security_protocol.uses_ssl());
        assert!(!cfg.security_protocol.uses_sasl());
        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
        assert_eq!(
            cfg.ssl_certificate_location,
            Some("/etc/ssl/client.pem".to_string())
        );
        assert_eq!(
            cfg.ssl_key_location,
            Some("/etc/ssl/client.key".to_string())
        );
        assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
    }

    #[test]
    fn test_parse_fetch_tuning() {
        let cfg = parse_ok(&[
            ("fetch.min.bytes", "1024"),
            ("fetch.max.bytes", "52428800"),
            ("fetch.max.wait.ms", "500"),
            ("max.partition.fetch.bytes", "1048576"),
        ]);

        assert_eq!(cfg.fetch_min_bytes, Some(1024));
        assert_eq!(cfg.fetch_max_bytes, Some(52_428_800));
        assert_eq!(cfg.fetch_max_wait_ms, Some(500));
        assert_eq!(cfg.max_partition_fetch_bytes, Some(1_048_576));
    }

    #[test]
    fn test_parse_kafka_passthrough() {
        let cfg = parse_ok(&[
            ("kafka.session.timeout.ms", "30000"),
            ("kafka.max.poll.interval.ms", "300000"),
            ("kafka.fetch.message.max.bytes", "2097152"),
        ]);

        // All three entries are retained at parse time, keyed without the
        // `kafka.` prefix; blocking happens later, when the rdkafka config is
        // built (see `test_session_timeout_passthrough_blocked`).
        assert_eq!(cfg.kafka_properties.len(), 3);
        assert_eq!(
            cfg.kafka_properties.get("session.timeout.ms"),
            Some(&"30000".to_string())
        );
        assert_eq!(
            cfg.kafka_properties.get("max.poll.interval.ms"),
            Some(&"300000".to_string())
        );
    }

    #[test]
    fn test_parse_schema_registry() {
        let cfg = parse_ok(&[
            ("format", "avro"),
            ("schema.registry.url", "http://localhost:8081"),
            ("schema.registry.username", "user"),
            ("schema.registry.password", "pass"),
            ("schema.compatibility", "FULL_TRANSITIVE"),
            ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
        ]);

        assert_eq!(cfg.format, Format::Avro);
        assert_eq!(
            cfg.schema_registry_url,
            Some("http://localhost:8081".to_string())
        );
        let auth = cfg
            .schema_registry_auth
            .expect("schema registry credentials should be parsed");
        assert_eq!(auth.username, "user");
        assert_eq!(auth.password, "pass");
        assert_eq!(
            cfg.schema_compatibility,
            Some(CompatibilityLevel::FullTransitive)
        );
        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/etc/ssl/sr-ca.pem".to_string())
        );
    }

    #[test]
    fn test_parse_sr_auth_partial() {
        // A username without a matching password must be rejected.
        let config = make_config(&[
            ("schema.registry.url", "http://localhost:8081"),
            ("schema.registry.username", "user"),
        ]);
        assert!(KafkaSourceConfig::from_config(&config).is_err());
    }

    #[test]
    fn test_validate_avro_without_sr() {
        let mut cfg = base_config();
        cfg.format = Format::Avro;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_backpressure_watermarks() {
        // High watermark below the low watermark.
        let mut cfg = base_config();
        cfg.backpressure_high_watermark = 0.3;
        cfg.backpressure_low_watermark = 0.5;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_sasl_without_mechanism() {
        let mut cfg = base_config();
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_sasl_plain_without_credentials() {
        let mut cfg = base_config();
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        cfg.sasl_mechanism = Some(SaslMechanism::Plain);
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_validate_empty_topic_pattern() {
        let mut cfg = base_config();
        cfg.subscription = TopicSubscription::Pattern(String::new());
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn test_rdkafka_config() {
        let cfg = parse_ok(&[
            ("auto.offset.reset", "latest"),
            ("kafka.fetch.min.bytes", "1024"),
        ]);

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(rdkafka.get("group.id"), Some("test-group"));
        assert_eq!(rdkafka.get("enable.auto.commit"), Some("false"));
        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
        assert_eq!(rdkafka.get("security.protocol"), Some("plaintext"));
        assert_eq!(rdkafka.get("isolation.level"), Some("read_committed"));
    }

    #[test]
    fn test_rdkafka_config_with_security() {
        let cfg = parse_ok(&[
            ("security.protocol", "sasl_ssl"),
            ("sasl.mechanism", "PLAIN"),
            ("sasl.username", "user"),
            ("sasl.password", "pass"),
            ("ssl.ca.location", "/ca.pem"),
        ]);

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("security.protocol"), Some("sasl_ssl"));
        assert_eq!(rdkafka.get("sasl.mechanism"), Some("PLAIN"));
        assert_eq!(rdkafka.get("sasl.username"), Some("user"));
        assert_eq!(rdkafka.get("sasl.password"), Some("pass"));
        assert_eq!(rdkafka.get("ssl.ca.location"), Some("/ca.pem"));
    }

    #[test]
    fn test_rdkafka_config_with_fetch_tuning() {
        let cfg = parse_ok(&[
            ("fetch.min.bytes", "1024"),
            ("fetch.max.bytes", "1048576"),
            ("fetch.max.wait.ms", "500"),
            ("max.partition.fetch.bytes", "262144"),
            ("isolation.level", "read_uncommitted"),
        ]);

        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("fetch.min.bytes"), Some("1024"));
        assert_eq!(rdkafka.get("fetch.max.bytes"), Some("1048576"));
        // The connector key is `fetch.max.wait.ms`; it is emitted to rdkafka
        // under the name `fetch.wait.max.ms`.
        assert_eq!(rdkafka.get("fetch.wait.max.ms"), Some("500"));
        assert_eq!(rdkafka.get("max.partition.fetch.bytes"), Some("262144"));
        assert_eq!(rdkafka.get("isolation.level"), Some("read_uncommitted"));
    }

    #[test]
    fn test_offset_reset_parsing() {
        assert_eq!(
            "earliest".parse::<OffsetReset>().unwrap(),
            OffsetReset::Earliest
        );
        assert_eq!(
            "latest".parse::<OffsetReset>().unwrap(),
            OffsetReset::Latest
        );
        assert_eq!("none".parse::<OffsetReset>().unwrap(), OffsetReset::None);
        assert!("invalid".parse::<OffsetReset>().is_err());
    }

    #[test]
    fn test_compatibility_level_parsing() {
        assert_eq!(
            "BACKWARD".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::Backward
        );
        assert_eq!(
            "full_transitive".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::FullTransitive
        );
        assert_eq!(
            "NONE".parse::<CompatibilityLevel>().unwrap(),
            CompatibilityLevel::None
        );
        assert!("invalid".parse::<CompatibilityLevel>().is_err());
    }

    #[test]
    fn test_security_protocol_parsing() {
        assert_eq!(
            "plaintext".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::Plaintext
        );
        assert_eq!(
            "SSL".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::Ssl
        );
        assert_eq!(
            "sasl_plaintext".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::SaslPlaintext
        );
        assert_eq!(
            "SASL_SSL".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::SaslSsl
        );
        assert_eq!(
            "sasl-ssl".parse::<SecurityProtocol>().unwrap(),
            SecurityProtocol::SaslSsl
        );
        assert!("invalid".parse::<SecurityProtocol>().is_err());
    }

    #[test]
    fn test_sasl_mechanism_parsing() {
        assert_eq!(
            "PLAIN".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::Plain
        );
        assert_eq!(
            "SCRAM-SHA-256".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::ScramSha256
        );
        assert_eq!(
            "scram_sha_512".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::ScramSha512
        );
        assert_eq!(
            "GSSAPI".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::Gssapi
        );
        assert_eq!(
            "OAUTHBEARER".parse::<SaslMechanism>().unwrap(),
            SaslMechanism::Oauthbearer
        );
        assert!("invalid".parse::<SaslMechanism>().is_err());
    }

    #[test]
    fn test_isolation_level_parsing() {
        assert_eq!(
            "read_uncommitted".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::ReadUncommitted
        );
        assert_eq!(
            "read_committed".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::ReadCommitted
        );
        assert_eq!(
            "read-committed".parse::<IsolationLevel>().unwrap(),
            IsolationLevel::ReadCommitted
        );
        assert!("invalid".parse::<IsolationLevel>().is_err());
    }

    #[test]
    fn test_topic_subscription_accessors() {
        let topics = TopicSubscription::Topics(vec!["a".into(), "b".into()]);
        assert_eq!(
            topics.topics(),
            Some(&["a".to_string(), "b".to_string()][..])
        );
        assert!(topics.pattern().is_none());
        assert!(!topics.is_pattern());

        let pattern = TopicSubscription::Pattern("events-.*".into());
        assert!(pattern.topics().is_none());
        assert_eq!(pattern.pattern(), Some("events-.*"));
        assert!(pattern.is_pattern());
    }

    #[test]
    fn test_security_protocol_helpers() {
        assert!(!SecurityProtocol::Plaintext.uses_ssl());
        assert!(!SecurityProtocol::Plaintext.uses_sasl());

        assert!(SecurityProtocol::Ssl.uses_ssl());
        assert!(!SecurityProtocol::Ssl.uses_sasl());

        assert!(!SecurityProtocol::SaslPlaintext.uses_ssl());
        assert!(SecurityProtocol::SaslPlaintext.uses_sasl());

        assert!(SecurityProtocol::SaslSsl.uses_ssl());
        assert!(SecurityProtocol::SaslSsl.uses_sasl());
    }

    #[test]
    fn test_sasl_mechanism_helpers() {
        assert!(SaslMechanism::Plain.requires_credentials());
        assert!(SaslMechanism::ScramSha256.requires_credentials());
        assert!(SaslMechanism::ScramSha512.requires_credentials());
        assert!(!SaslMechanism::Gssapi.requires_credentials());
        assert!(!SaslMechanism::Oauthbearer.requires_credentials());
    }

    #[test]
    fn test_enum_display() {
        assert_eq!(SecurityProtocol::SaslSsl.to_string(), "sasl_ssl");
        assert_eq!(SaslMechanism::ScramSha256.to_string(), "SCRAM-SHA-256");
        assert_eq!(IsolationLevel::ReadCommitted.to_string(), "read_committed");
    }

    #[test]
    fn test_startup_mode_parsing() {
        assert_eq!(
            "group-offsets".parse::<StartupMode>().unwrap(),
            StartupMode::GroupOffsets
        );
        assert_eq!(
            "group_offsets".parse::<StartupMode>().unwrap(),
            StartupMode::GroupOffsets
        );
        assert_eq!(
            "earliest".parse::<StartupMode>().unwrap(),
            StartupMode::Earliest
        );
        assert_eq!(
            "latest".parse::<StartupMode>().unwrap(),
            StartupMode::Latest
        );
        assert!("invalid".parse::<StartupMode>().is_err());
    }

    #[test]
    fn test_startup_mode_display() {
        assert_eq!(StartupMode::GroupOffsets.to_string(), "group-offsets");
        assert_eq!(StartupMode::Earliest.to_string(), "earliest");
        assert_eq!(StartupMode::Latest.to_string(), "latest");

        let specific = StartupMode::SpecificOffsets(HashMap::from([(0, 100), (1, 200)]));
        assert!(specific.to_string().contains("2 partitions"));

        let ts = StartupMode::Timestamp(1_234_567_890_000);
        assert!(ts.to_string().contains("1234567890000"));
    }

    #[test]
    fn test_startup_mode_latest_overrides_offset_reset() {
        // The default reset policy is `Earliest` (see `test_parse_defaults`),
        // so observing `Latest` shows the startup mode took effect.
        let cfg = parse_ok(&[("startup.mode", "latest")]);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("auto.offset.reset"), Some("latest"));
    }

    #[test]
    fn test_startup_mode_earliest_overrides_offset_reset() {
        // NOTE(review): `Earliest` is also the default reset policy, so this
        // passes even if `startup.mode` were ignored. Consider also setting
        // `auto.offset.reset=latest` here once the intended precedence is
        // confirmed.
        let cfg = parse_ok(&[("startup.mode", "earliest")]);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Earliest);
    }

    #[test]
    fn test_startup_mode_group_offsets_uses_explicit_offset_reset() {
        // No `startup.mode` is set, so the explicit reset policy is kept.
        let cfg = parse_ok(&[("auto.offset.reset", "latest")]);
        assert_eq!(cfg.auto_offset_reset, OffsetReset::Latest);
    }

    #[test]
    fn test_parse_specific_offsets() {
        let offsets = parse_specific_offsets("0:100,1:200,2:300").unwrap();
        assert_eq!(offsets.get(&0), Some(&100));
        assert_eq!(offsets.get(&1), Some(&200));
        assert_eq!(offsets.get(&2), Some(&300));
    }

    #[test]
    fn test_parse_specific_offsets_with_spaces() {
        let offsets = parse_specific_offsets(" 0:100 , 1:200 ").unwrap();
        assert_eq!(offsets.get(&0), Some(&100));
        assert_eq!(offsets.get(&1), Some(&200));
    }

    #[test]
    fn test_parse_specific_offsets_errors() {
        assert!(parse_specific_offsets("").is_err());
        assert!(parse_specific_offsets("0").is_err());
        assert!(parse_specific_offsets("0:abc").is_err());
        assert!(parse_specific_offsets("abc:100").is_err());
        assert!(parse_specific_offsets("0:100:extra").is_err());
    }

    #[test]
    fn test_parse_startup_mode_from_config() {
        let cfg = parse_ok(&[("startup.mode", "earliest")]);
        assert_eq!(cfg.startup_mode, StartupMode::Earliest);

        let cfg = parse_ok(&[("startup.mode", "latest")]);
        assert_eq!(cfg.startup_mode, StartupMode::Latest);

        let cfg = parse_ok(&[("startup.mode", "group-offsets")]);
        assert_eq!(cfg.startup_mode, StartupMode::GroupOffsets);
    }

    #[test]
    fn test_parse_startup_specific_offsets_from_config() {
        let cfg = parse_ok(&[("startup.specific.offsets", "0:100,1:200")]);

        match cfg.startup_mode {
            StartupMode::SpecificOffsets(offsets) => {
                assert_eq!(offsets.get(&0), Some(&100));
                assert_eq!(offsets.get(&1), Some(&200));
            }
            _ => panic!("expected SpecificOffsets"),
        }
    }

    #[test]
    fn test_parse_startup_timestamp_from_config() {
        let cfg = parse_ok(&[("startup.timestamp.ms", "1234567890")]);

        assert_eq!(cfg.startup_mode, StartupMode::Timestamp(1_234_567_890));
    }

    #[test]
    fn test_parse_schema_registry_ssl_fields() {
        let cfg = parse_ok(&[
            ("schema.registry.url", "https://sr:8081"),
            ("schema.registry.ssl.ca.location", "/ca.pem"),
            ("schema.registry.ssl.certificate.location", "/cert.pem"),
            ("schema.registry.ssl.key.location", "/key.pem"),
        ]);

        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/ca.pem".to_string())
        );
        assert_eq!(
            cfg.schema_registry_ssl_certificate_location,
            Some("/cert.pem".to_string())
        );
        assert_eq!(
            cfg.schema_registry_ssl_key_location,
            Some("/key.pem".to_string())
        );
    }

    #[test]
    fn test_parse_watermark_defaults() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(5));
        assert_eq!(cfg.idle_timeout, Duration::from_secs(30));
        assert!(!cfg.enable_watermark_tracking);
    }

    #[test]
    fn test_parse_watermark_tracking_enabled() {
        let cfg = parse_ok(&[
            ("enable.watermark.tracking", "true"),
            ("max.out.of.orderness.ms", "10000"),
            ("idle.timeout.ms", "60000"),
        ]);

        assert!(cfg.enable_watermark_tracking);
        assert_eq!(cfg.max_out_of_orderness, Duration::from_secs(10));
        assert_eq!(cfg.idle_timeout, Duration::from_secs(60));
    }

    #[test]
    fn test_startup_mode_timestamp_error() {
        // `startup.mode=timestamp` without `startup.timestamp.ms` must fail
        // with an error that names the missing key.
        let config = make_config(&[("startup.mode", "timestamp")]);
        let err = KafkaSourceConfig::from_config(&config).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("startup.timestamp.ms"),
            "error should mention startup.timestamp.ms, got: {msg}"
        );
    }

    #[test]
    fn test_session_timeout_heartbeat_defaults() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.session_timeout, Duration::from_secs(45));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_session_timeout_heartbeat_custom() {
        let cfg = parse_ok(&[
            ("session.timeout.ms", "60000"),
            ("heartbeat.interval.ms", "15000"),
        ]);
        assert_eq!(cfg.session_timeout, Duration::from_secs(60));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(15));
    }

    #[test]
    fn test_session_timeout_heartbeat_validation_fails() {
        // A 20 s heartbeat against a 45 s session timeout is rejected.
        let config = make_config(&[
            ("session.timeout.ms", "45000"),
            ("heartbeat.interval.ms", "20000"),
        ]);
        let err = KafkaSourceConfig::from_config(&config).unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("heartbeat.interval.ms"), "got: {msg}");
    }

    #[test]
    fn test_session_timeout_heartbeat_validation_passes() {
        // A 10 s heartbeat against a 45 s session timeout is accepted.
        let cfg = parse_ok(&[
            ("session.timeout.ms", "45000"),
            ("heartbeat.interval.ms", "10000"),
        ]);
        assert_eq!(cfg.session_timeout, Duration::from_secs(45));
        assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_session_timeout_heartbeat_in_rdkafka_config() {
        let cfg = parse_ok(&[
            ("session.timeout.ms", "60000"),
            ("heartbeat.interval.ms", "15000"),
        ]);
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("session.timeout.ms"), Some("60000"));
        assert_eq!(rdkafka.get("heartbeat.interval.ms"), Some("15000"));
    }

    #[test]
    fn test_session_timeout_passthrough_blocked() {
        let cfg = parse_ok(&[("kafka.session.timeout.ms", "99999")]);
        let rdkafka = cfg.to_rdkafka_config();
        // `session.timeout.ms` is on the pass-through blocklist, so the
        // typed default (45 s) must win over the `kafka.*` value.
        assert_eq!(rdkafka.get("session.timeout.ms"), Some("45000"));
    }

    #[test]
    fn test_queued_max_messages_kbytes_default() {
        let cfg = parse_ok(&[]);
        assert_eq!(cfg.queued_max_messages_kbytes, 16384);
    }

    #[test]
    fn test_queued_max_messages_kbytes_custom() {
        let cfg = parse_ok(&[("queued.max.messages.kbytes", "32768")]);
        assert_eq!(cfg.queued_max_messages_kbytes, 32768);
    }

    #[test]
    fn test_queued_max_messages_kbytes_in_rdkafka_config() {
        let cfg = parse_ok(&[("queued.max.messages.kbytes", "8192")]);
        let rdkafka = cfg.to_rdkafka_config();
        assert_eq!(rdkafka.get("queued.max.messages.kbytes"), Some("8192"));
    }

    #[test]
    fn resolve_subject_topic_name() {
        assert_eq!(
            resolve_value_subject(SubjectNameStrategy::TopicName, None, "orders"),
            "orders-value"
        );
    }

    #[test]
    fn resolve_subject_record_name() {
        assert_eq!(
            resolve_value_subject(
                SubjectNameStrategy::RecordName,
                Some("com.acme.Order"),
                "orders"
            ),
            "com.acme.Order-value"
        );
    }

    #[test]
    fn resolve_subject_topic_record_name() {
        assert_eq!(
            resolve_value_subject(
                SubjectNameStrategy::TopicRecordName,
                Some("com.acme.Order"),
                "orders"
            ),
            "orders-com.acme.Order-value"
        );
    }

    #[test]
    fn parse_subject_strategy_from_config() {
        let cfg = parse_ok(&[
            ("schema.registry.url", "http://sr:8081"),
            ("schema.registry.subject.name.strategy", "record-name"),
            ("schema.registry.record.name", "com.acme.Order"),
        ]);
        assert_eq!(
            cfg.schema_registry_subject_strategy,
            SubjectNameStrategy::RecordName
        );
        assert_eq!(
            cfg.schema_registry_record_name.as_deref(),
            Some("com.acme.Order")
        );
    }

    #[test]
    fn parse_subject_strategy_rejects_missing_record_name() {
        // `record-name` strategy without `schema.registry.record.name`.
        let err = KafkaSourceConfig::from_config(&make_config(&[
            ("schema.registry.url", "http://sr:8081"),
            ("schema.registry.subject.name.strategy", "record-name"),
        ]))
        .unwrap_err();
        assert!(matches!(err, ConnectorError::ConfigurationError(_)));
    }

    #[test]
    fn parse_discovery_timeout_default_ten_seconds() {
        let cfg = parse_ok(&[("schema.registry.url", "http://sr")]);
        assert_eq!(
            cfg.schema_registry_discovery_timeout,
            Duration::from_secs(10)
        );
    }

    #[test]
    fn parse_discovery_timeout_override() {
        let cfg = parse_ok(&[
            ("schema.registry.url", "http://sr"),
            ("schema.registry.discovery.timeout.ms", "25000"),
        ]);
        assert_eq!(
            cfg.schema_registry_discovery_timeout,
            Duration::from_secs(25)
        );
    }
}