1use std::collections::HashMap;
7use std::time::Duration;
8
9use rdkafka::ClientConfig;
10
11use crate::config::ConnectorConfig;
12use crate::error::ConnectorError;
13use crate::kafka::config::{CompatibilityLevel, SaslMechanism, SecurityProtocol, SrAuth};
14use crate::serde::Format;
15
16#[derive(Clone)]
23pub struct KafkaSinkConfig {
24 pub bootstrap_servers: String,
26 pub topic: String,
28 pub security_protocol: SecurityProtocol,
30 pub sasl_mechanism: Option<SaslMechanism>,
32 pub sasl_username: Option<String>,
34 pub sasl_password: Option<String>,
36 pub ssl_ca_location: Option<String>,
38 pub ssl_certificate_location: Option<String>,
40 pub ssl_key_location: Option<String>,
42 pub ssl_key_password: Option<String>,
44 pub format: Format,
46 pub schema_registry_url: Option<String>,
48 pub schema_registry_auth: Option<SrAuth>,
50 pub schema_compatibility: Option<CompatibilityLevel>,
52 pub schema_registry_ssl_ca_location: Option<String>,
54 pub delivery_guarantee: DeliveryGuarantee,
56 pub transactional_id: Option<String>,
58 pub transaction_timeout: Duration,
60 pub acks: Acks,
62 pub max_in_flight: usize,
64 pub delivery_timeout: Duration,
66 pub key_column: Option<String>,
68 pub partitioner: PartitionStrategy,
70 pub linger_ms: u64,
72 pub batch_size: usize,
74 pub batch_num_messages: Option<usize>,
76 pub compression: CompressionType,
78 pub dlq_topic: Option<String>,
80 pub flush_batch_size: usize,
82 pub kafka_properties: HashMap<String, String>,
84}
85
86impl std::fmt::Debug for KafkaSinkConfig {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 f.debug_struct("KafkaSinkConfig")
89 .field("bootstrap_servers", &self.bootstrap_servers)
90 .field("topic", &self.topic)
91 .field("format", &self.format)
92 .field("delivery_guarantee", &self.delivery_guarantee)
93 .field("security_protocol", &self.security_protocol)
94 .field("sasl_mechanism", &self.sasl_mechanism)
95 .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
96 .field(
97 "ssl_key_password",
98 &self.ssl_key_password.as_ref().map(|_| "***"),
99 )
100 .field("partitioner", &self.partitioner)
101 .field("acks", &self.acks)
102 .finish_non_exhaustive()
103 }
104}
105
106impl Default for KafkaSinkConfig {
107 fn default() -> Self {
108 Self {
109 bootstrap_servers: String::new(),
110 topic: String::new(),
111 security_protocol: SecurityProtocol::default(),
112 sasl_mechanism: None,
113 sasl_username: None,
114 sasl_password: None,
115 ssl_ca_location: None,
116 ssl_certificate_location: None,
117 ssl_key_location: None,
118 ssl_key_password: None,
119 format: Format::Json,
120 schema_registry_url: None,
121 schema_registry_auth: None,
122 schema_compatibility: None,
123 schema_registry_ssl_ca_location: None,
124 delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
125 transactional_id: None,
126 transaction_timeout: Duration::from_secs(60),
127 acks: Acks::All,
128 max_in_flight: 5,
129 delivery_timeout: Duration::from_secs(120),
130 key_column: None,
131 partitioner: PartitionStrategy::KeyHash,
132 linger_ms: 5,
133 batch_size: 16_384,
134 batch_num_messages: None,
135 compression: CompressionType::None,
136 dlq_topic: None,
137 flush_batch_size: 1_000,
138 kafka_properties: HashMap::new(),
139 }
140 }
141}
142
143impl KafkaSinkConfig {
144 #[allow(clippy::too_many_lines, clippy::field_reassign_with_default)]
151 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
152 let mut cfg = Self::default();
153
154 cfg.bootstrap_servers = config
155 .get("bootstrap.servers")
156 .ok_or_else(|| ConnectorError::MissingConfig("bootstrap.servers".into()))?
157 .to_string();
158
159 cfg.topic = config
160 .get("topic")
161 .ok_or_else(|| ConnectorError::MissingConfig("topic".into()))?
162 .to_string();
163
164 if let Some(s) = config.get("security.protocol") {
165 cfg.security_protocol = s.parse()?;
166 }
167
168 if let Some(s) = config.get("sasl.mechanism") {
169 cfg.sasl_mechanism = Some(s.parse()?);
170 }
171
172 cfg.sasl_username = config.get("sasl.username").map(String::from);
173 cfg.sasl_password = config.get("sasl.password").map(String::from);
174 cfg.ssl_ca_location = config.get("ssl.ca.location").map(String::from);
175 cfg.ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
176 cfg.ssl_key_location = config.get("ssl.key.location").map(String::from);
177 cfg.ssl_key_password = config.get("ssl.key.password").map(String::from);
178
179 if let Some(fmt) = config.get("format") {
180 cfg.format = fmt.parse().map_err(ConnectorError::Serde)?;
181 }
182
183 cfg.schema_registry_url = config.get("schema.registry.url").map(String::from);
184
185 let sr_user = config.get("schema.registry.username");
186 let sr_pass = config.get("schema.registry.password");
187 if let (Some(user), Some(pass)) = (sr_user, sr_pass) {
188 cfg.schema_registry_auth = Some(SrAuth {
189 username: user.to_string(),
190 password: pass.to_string(),
191 });
192 }
193
194 if let Some(c) = config.get("schema.compatibility") {
195 cfg.schema_compatibility = Some(c.parse().map_err(|_| {
196 ConnectorError::ConfigurationError(format!("invalid schema.compatibility: '{c}'"))
197 })?);
198 }
199
200 cfg.schema_registry_ssl_ca_location = config
201 .get("schema.registry.ssl.ca.location")
202 .map(String::from);
203
204 if let Some(dg) = config.get("delivery.guarantee") {
205 cfg.delivery_guarantee = dg.parse().map_err(|_| {
206 ConnectorError::ConfigurationError(format!(
207 "invalid delivery.guarantee: '{dg}' (expected 'at-least-once' or 'exactly-once')"
208 ))
209 })?;
210 }
211
212 cfg.transactional_id = config.get("transactional.id").map(String::from);
213
214 if let Some(v) = config.get("transaction.timeout.ms") {
215 let ms: u64 = v.parse().map_err(|_| {
216 ConnectorError::ConfigurationError(format!("invalid transaction.timeout.ms: '{v}'"))
217 })?;
218 cfg.transaction_timeout = Duration::from_millis(ms);
219 }
220
221 if let Some(a) = config.get("acks") {
222 cfg.acks = a.parse().map_err(|_| {
223 ConnectorError::ConfigurationError(format!(
224 "invalid acks: '{a}' (expected 'all', '1', or '0')"
225 ))
226 })?;
227 }
228
229 if let Some(v) = config.get("max.in.flight.requests") {
230 cfg.max_in_flight = v.parse().map_err(|_| {
231 ConnectorError::ConfigurationError(format!("invalid max.in.flight.requests: '{v}'"))
232 })?;
233 }
234
235 if let Some(v) = config.get("delivery.timeout.ms") {
236 let ms: u64 = v.parse().map_err(|_| {
237 ConnectorError::ConfigurationError(format!("invalid delivery.timeout.ms: '{v}'"))
238 })?;
239 cfg.delivery_timeout = Duration::from_millis(ms);
240 }
241
242 cfg.key_column = config.get("key.column").map(String::from);
243
244 if let Some(p) = config.get("partitioner") {
245 cfg.partitioner = p.parse().map_err(|_| {
246 ConnectorError::ConfigurationError(format!(
247 "invalid partitioner: '{p}' (expected 'key-hash', 'round-robin', or 'sticky')"
248 ))
249 })?;
250 }
251
252 if let Some(v) = config.get("linger.ms") {
253 cfg.linger_ms = v.parse().map_err(|_| {
254 ConnectorError::ConfigurationError(format!("invalid linger.ms: '{v}'"))
255 })?;
256 }
257
258 if let Some(v) = config.get("batch.size") {
259 cfg.batch_size = v.parse().map_err(|_| {
260 ConnectorError::ConfigurationError(format!("invalid batch.size: '{v}'"))
261 })?;
262 }
263
264 if let Some(v) = config.get("batch.num.messages") {
265 cfg.batch_num_messages = Some(v.parse().map_err(|_| {
266 ConnectorError::ConfigurationError(format!("invalid batch.num.messages: '{v}'"))
267 })?);
268 }
269
270 if let Some(c) = config.get("compression.type") {
271 cfg.compression = c.parse().map_err(|_| {
272 ConnectorError::ConfigurationError(format!("invalid compression.type: '{c}'"))
273 })?;
274 }
275
276 cfg.dlq_topic = config.get("dlq.topic").map(String::from);
277
278 if let Some(v) = config.get("flush.batch.size") {
279 cfg.flush_batch_size = v.parse().map_err(|_| {
280 ConnectorError::ConfigurationError(format!("invalid flush.batch.size: '{v}'"))
281 })?;
282 }
283
284 for (key, value) in config.properties_with_prefix("kafka.") {
285 cfg.kafka_properties.insert(key, value);
286 }
287
288 cfg.validate()?;
289 Ok(cfg)
290 }
291
292 pub fn validate(&self) -> Result<(), ConnectorError> {
298 if self.bootstrap_servers.is_empty() {
299 return Err(ConnectorError::MissingConfig("bootstrap.servers".into()));
300 }
301 if self.topic.is_empty() {
302 return Err(ConnectorError::MissingConfig("topic".into()));
303 }
304
305 if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
306 return Err(ConnectorError::ConfigurationError(
307 "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
308 .into(),
309 ));
310 }
311
312 if let Some(mechanism) = &self.sasl_mechanism {
313 if mechanism.requires_credentials()
314 && (self.sasl_username.is_none() || self.sasl_password.is_none())
315 {
316 return Err(ConnectorError::ConfigurationError(format!(
317 "sasl.username and sasl.password are required for {mechanism} mechanism"
318 )));
319 }
320 }
321
322 if self.security_protocol.uses_ssl() {
323 if let Some(ref ca) = self.ssl_ca_location {
324 if ca.is_empty() {
325 return Err(ConnectorError::ConfigurationError(
326 "ssl.ca.location cannot be empty when specified".into(),
327 ));
328 }
329 }
330 }
331
332 if self.format == Format::Avro && self.schema_registry_url.is_none() {
333 return Err(ConnectorError::ConfigurationError(
334 "Avro format requires 'schema.registry.url'".into(),
335 ));
336 }
337
338 if self.max_in_flight == 0 {
339 return Err(ConnectorError::ConfigurationError(
340 "max.in.flight.requests must be > 0".into(),
341 ));
342 }
343
344 if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce && self.max_in_flight > 5 {
345 return Err(ConnectorError::ConfigurationError(
346 "exactly-once requires max.in.flight.requests <= 5".into(),
347 ));
348 }
349
350 Ok(())
351 }
352
353 #[must_use]
358 pub fn to_rdkafka_config(&self) -> ClientConfig {
359 let mut config = ClientConfig::new();
360
361 config.set("bootstrap.servers", &self.bootstrap_servers);
362 config.set("security.protocol", self.security_protocol.as_rdkafka_str());
363
364 if let Some(ref mechanism) = self.sasl_mechanism {
365 config.set("sasl.mechanism", mechanism.as_rdkafka_str());
366 }
367
368 if let Some(ref username) = self.sasl_username {
369 config.set("sasl.username", username);
370 }
371
372 if let Some(ref password) = self.sasl_password {
373 config.set("sasl.password", password);
374 }
375
376 if let Some(ref ca) = self.ssl_ca_location {
377 config.set("ssl.ca.location", ca);
378 }
379
380 if let Some(ref cert) = self.ssl_certificate_location {
381 config.set("ssl.certificate.location", cert);
382 }
383
384 if let Some(ref key) = self.ssl_key_location {
385 config.set("ssl.key.location", key);
386 }
387
388 if let Some(ref key_pass) = self.ssl_key_password {
389 config.set("ssl.key.password", key_pass);
390 }
391
392 config
393 .set("enable.idempotence", "true")
394 .set("acks", self.acks.as_rdkafka_str())
395 .set("linger.ms", self.linger_ms.to_string())
396 .set("batch.size", self.batch_size.to_string())
397 .set("compression.type", self.compression.as_rdkafka_str())
398 .set(
399 "max.in.flight.requests.per.connection",
400 self.max_in_flight.to_string(),
401 )
402 .set(
403 "message.timeout.ms",
404 self.delivery_timeout.as_millis().to_string(),
405 );
406
407 if let Some(num_msgs) = self.batch_num_messages {
408 config.set("batch.num.messages", num_msgs.to_string());
409 }
410
411 if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
412 let txn_id = self
413 .transactional_id
414 .clone()
415 .unwrap_or_else(|| format!("laminardb-sink-{}", self.topic));
416 config.set("transactional.id", txn_id);
417 config.set(
418 "transaction.timeout.ms",
419 self.transaction_timeout.as_millis().to_string(),
420 );
421 }
422
423 for (key, value) in &self.kafka_properties {
426 if is_blocked_passthrough_key(key) {
427 tracing::warn!(
428 key,
429 "ignoring kafka.* pass-through property that overrides a security setting"
430 );
431 continue;
432 }
433 config.set(key, value);
434 }
435
436 config
437 }
438
439 #[must_use]
444 pub fn to_dlq_rdkafka_config(&self) -> ClientConfig {
445 let mut config = ClientConfig::new();
446
447 config.set("bootstrap.servers", &self.bootstrap_servers);
448 config.set("security.protocol", self.security_protocol.as_rdkafka_str());
449
450 if let Some(ref mechanism) = self.sasl_mechanism {
451 config.set("sasl.mechanism", mechanism.as_rdkafka_str());
452 }
453 if let Some(ref username) = self.sasl_username {
454 config.set("sasl.username", username);
455 }
456 if let Some(ref password) = self.sasl_password {
457 config.set("sasl.password", password);
458 }
459 if let Some(ref ca) = self.ssl_ca_location {
460 config.set("ssl.ca.location", ca);
461 }
462 if let Some(ref cert) = self.ssl_certificate_location {
463 config.set("ssl.certificate.location", cert);
464 }
465 if let Some(ref key) = self.ssl_key_location {
466 config.set("ssl.key.location", key);
467 }
468 if let Some(ref key_pass) = self.ssl_key_password {
469 config.set("ssl.key.password", key_pass);
470 }
471
472 config.set("enable.idempotence", "true");
473
474 config
475 }
476}
477
478fn is_blocked_passthrough_key(key: &str) -> bool {
480 key.starts_with("sasl.kerberos.")
481 || matches!(
482 key,
483 "security.protocol"
484 | "sasl.mechanism"
485 | "sasl.username"
486 | "sasl.password"
487 | "sasl.oauthbearer.config"
488 | "ssl.ca.location"
489 | "ssl.certificate.location"
490 | "ssl.key.location"
491 | "ssl.key.password"
492 | "ssl.endpoint.identification.algorithm"
493 | "enable.auto.commit"
494 | "enable.idempotence"
495 | "transactional.id"
496 )
497}
498
499#[derive(Debug, Clone, Copy, PartialEq, Eq)]
501pub enum DeliveryGuarantee {
502 AtLeastOnce,
504 ExactlyOnce,
506}
507
508str_enum!(DeliveryGuarantee, lowercase_udash, String, "unknown delivery guarantee",
509 AtLeastOnce => "at-least-once", "atleastonce";
510 ExactlyOnce => "exactly-once", "exactlyonce"
511);
512
513#[derive(Debug, Clone, Copy, PartialEq, Eq)]
515pub enum PartitionStrategy {
516 KeyHash,
518 RoundRobin,
520 Sticky,
522}
523
524str_enum!(PartitionStrategy, lowercase_udash, String, "unknown partition strategy",
525 KeyHash => "key-hash", "keyhash", "hash";
526 RoundRobin => "round-robin", "roundrobin";
527 Sticky => "sticky"
528);
529
530#[derive(Debug, Clone, Copy, PartialEq, Eq)]
532pub enum CompressionType {
533 None,
535 Gzip,
537 Snappy,
539 Lz4,
541 Zstd,
543}
544
545impl CompressionType {
546 #[must_use]
548 pub fn as_rdkafka_str(&self) -> &'static str {
549 match self {
550 Self::None => "none",
551 Self::Gzip => "gzip",
552 Self::Snappy => "snappy",
553 Self::Lz4 => "lz4",
554 Self::Zstd => "zstd",
555 }
556 }
557}
558
559str_enum!(fromstr CompressionType, lowercase_nodash, String, "unknown compression type",
560 None => "none";
561 Gzip => "gzip";
562 Snappy => "snappy";
563 Lz4 => "lz4";
564 Zstd => "zstd", "zstandard"
565);
566
567impl std::fmt::Display for CompressionType {
568 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569 write!(f, "{}", self.as_rdkafka_str())
570 }
571}
572
573#[derive(Debug, Clone, Copy, PartialEq, Eq)]
575pub enum Acks {
576 None,
578 Leader,
580 All,
582}
583
584impl Acks {
585 #[must_use]
587 pub fn as_rdkafka_str(&self) -> &'static str {
588 match self {
589 Self::None => "0",
590 Self::Leader => "1",
591 Self::All => "all",
592 }
593 }
594}
595
596str_enum!(fromstr Acks, lowercase_nodash, String, "unknown acks value",
597 None => "0", "none";
598 Leader => "1", "leader";
599 All => "-1", "all"
600);
601
602impl std::fmt::Display for Acks {
603 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604 write!(f, "{}", self.as_rdkafka_str())
605 }
606}
607
608#[cfg(test)]
609mod tests {
610 use super::*;
611
612 fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
613 let mut config = ConnectorConfig::new("kafka");
614 for (k, v) in pairs {
615 config.set(*k, *v);
616 }
617 config
618 }
619
620 fn required_pairs() -> Vec<(&'static str, &'static str)> {
621 vec![
622 ("bootstrap.servers", "localhost:9092"),
623 ("topic", "output-events"),
624 ]
625 }
626
627 #[test]
628 fn test_parse_required_fields() {
629 let config = make_config(&required_pairs());
630 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
631 assert_eq!(cfg.bootstrap_servers, "localhost:9092");
632 assert_eq!(cfg.topic, "output-events");
633 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
634 assert_eq!(cfg.format, Format::Json);
635 assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
636 }
637
638 #[test]
639 fn test_missing_bootstrap_servers() {
640 let config = make_config(&[("topic", "t")]);
641 assert!(KafkaSinkConfig::from_config(&config).is_err());
642 }
643
644 #[test]
645 fn test_missing_topic() {
646 let config = make_config(&[("bootstrap.servers", "b:9092")]);
647 assert!(KafkaSinkConfig::from_config(&config).is_err());
648 }
649
650 #[test]
651 fn test_parse_delivery_guarantee() {
652 let mut pairs = required_pairs();
653 pairs.push(("delivery.guarantee", "exactly-once"));
654 let config = make_config(&pairs);
655 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
656 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
657 }
658
659 #[test]
660 fn test_parse_security_sasl_ssl() {
661 let mut pairs = required_pairs();
662 pairs.extend_from_slice(&[
663 ("security.protocol", "sasl_ssl"),
664 ("sasl.mechanism", "SCRAM-SHA-512"),
665 ("sasl.username", "producer"),
666 ("sasl.password", "secret123"),
667 ("ssl.ca.location", "/etc/ssl/ca.pem"),
668 ]);
669 let config = make_config(&pairs);
670 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
671
672 assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
673 assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha512));
674 assert_eq!(cfg.sasl_username, Some("producer".to_string()));
675 assert_eq!(cfg.sasl_password, Some("secret123".to_string()));
676 assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
677 }
678
679 #[test]
680 fn test_parse_security_ssl_only() {
681 let mut pairs = required_pairs();
682 pairs.extend_from_slice(&[
683 ("security.protocol", "ssl"),
684 ("ssl.ca.location", "/etc/ssl/ca.pem"),
685 ("ssl.certificate.location", "/etc/ssl/client.pem"),
686 ("ssl.key.location", "/etc/ssl/client.key"),
687 ("ssl.key.password", "keypass"),
688 ]);
689 let config = make_config(&pairs);
690 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
691
692 assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
693 assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
694 assert_eq!(
695 cfg.ssl_certificate_location,
696 Some("/etc/ssl/client.pem".to_string())
697 );
698 assert_eq!(
699 cfg.ssl_key_location,
700 Some("/etc/ssl/client.key".to_string())
701 );
702 assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
703 }
704
705 #[test]
706 fn test_parse_all_optional_fields() {
707 let mut pairs = required_pairs();
708 pairs.extend_from_slice(&[
709 ("format", "avro"),
710 ("delivery.guarantee", "exactly-once"),
711 ("transactional.id", "my-txn"),
712 ("transaction.timeout.ms", "30000"),
713 ("key.column", "order_id"),
714 ("partitioner", "round-robin"),
715 ("linger.ms", "10"),
716 ("batch.size", "32768"),
717 ("batch.num.messages", "5000"),
718 ("compression.type", "zstd"),
719 ("acks", "1"),
720 ("max.in.flight.requests", "3"),
721 ("delivery.timeout.ms", "60000"),
722 ("dlq.topic", "my-dlq"),
723 ("flush.batch.size", "500"),
724 ("schema.registry.url", "http://sr:8081"),
725 ("schema.registry.username", "user"),
726 ("schema.registry.password", "pass"),
727 ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
728 ]);
729 let config = make_config(&pairs);
730 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
731
732 assert_eq!(cfg.format, Format::Avro);
733 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
734 assert_eq!(cfg.transactional_id.as_deref(), Some("my-txn"));
735 assert_eq!(cfg.transaction_timeout, Duration::from_millis(30_000));
736 assert_eq!(cfg.key_column.as_deref(), Some("order_id"));
737 assert_eq!(cfg.partitioner, PartitionStrategy::RoundRobin);
738 assert_eq!(cfg.linger_ms, 10);
739 assert_eq!(cfg.batch_size, 32_768);
740 assert_eq!(cfg.batch_num_messages, Some(5000));
741 assert_eq!(cfg.compression, CompressionType::Zstd);
742 assert_eq!(cfg.acks, Acks::Leader);
743 assert_eq!(cfg.max_in_flight, 3);
744 assert_eq!(cfg.delivery_timeout, Duration::from_millis(60_000));
745 assert_eq!(cfg.dlq_topic.as_deref(), Some("my-dlq"));
746 assert_eq!(cfg.flush_batch_size, 500);
747 assert_eq!(cfg.schema_registry_url.as_deref(), Some("http://sr:8081"));
748 assert!(cfg.schema_registry_auth.is_some());
749 assert_eq!(
750 cfg.schema_registry_ssl_ca_location,
751 Some("/etc/ssl/sr-ca.pem".to_string())
752 );
753 }
754
755 #[test]
756 fn test_validate_avro_requires_sr() {
757 let mut cfg = KafkaSinkConfig::default();
758 cfg.bootstrap_servers = "b:9092".into();
759 cfg.topic = "t".into();
760 cfg.format = Format::Avro;
761 assert!(cfg.validate().is_err());
762 }
763
764 #[test]
765 fn test_validate_exactly_once_max_in_flight() {
766 let mut cfg = KafkaSinkConfig::default();
767 cfg.bootstrap_servers = "b:9092".into();
768 cfg.topic = "t".into();
769 cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
770 cfg.max_in_flight = 10;
771 assert!(cfg.validate().is_err());
772 }
773
774 #[test]
775 fn test_validate_sasl_without_mechanism() {
776 let mut cfg = KafkaSinkConfig::default();
777 cfg.bootstrap_servers = "b:9092".into();
778 cfg.topic = "t".into();
779 cfg.security_protocol = SecurityProtocol::SaslSsl;
780 assert!(cfg.validate().is_err());
782 }
783
784 #[test]
785 fn test_validate_sasl_plain_without_credentials() {
786 let mut cfg = KafkaSinkConfig::default();
787 cfg.bootstrap_servers = "b:9092".into();
788 cfg.topic = "t".into();
789 cfg.security_protocol = SecurityProtocol::SaslPlaintext;
790 cfg.sasl_mechanism = Some(SaslMechanism::ScramSha256);
791 assert!(cfg.validate().is_err());
793 }
794
795 #[test]
796 fn test_rdkafka_config_at_least_once() {
797 let mut cfg = KafkaSinkConfig::default();
798 cfg.bootstrap_servers = "b:9092".into();
799 cfg.topic = "t".into();
800 let rdk = cfg.to_rdkafka_config();
801 assert_eq!(rdk.get("enable.idempotence"), Some("true"));
802 assert!(rdk.get("transactional.id").is_none());
803 assert_eq!(rdk.get("security.protocol"), Some("plaintext"));
804 }
805
806 #[test]
807 fn test_rdkafka_config_exactly_once() {
808 let mut cfg = KafkaSinkConfig::default();
809 cfg.bootstrap_servers = "b:9092".into();
810 cfg.topic = "t".into();
811 cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
812 let rdk = cfg.to_rdkafka_config();
813 assert_eq!(rdk.get("enable.idempotence"), Some("true"));
814 assert!(rdk.get("transactional.id").is_some());
815 }
816
817 #[test]
818 fn test_rdkafka_config_with_security() {
819 let mut cfg = KafkaSinkConfig::default();
820 cfg.bootstrap_servers = "b:9092".into();
821 cfg.topic = "t".into();
822 cfg.security_protocol = SecurityProtocol::SaslSsl;
823 cfg.sasl_mechanism = Some(SaslMechanism::Plain);
824 cfg.sasl_username = Some("user".into());
825 cfg.sasl_password = Some("pass".into());
826 cfg.ssl_ca_location = Some("/ca.pem".into());
827
828 let rdk = cfg.to_rdkafka_config();
829 assert_eq!(rdk.get("security.protocol"), Some("sasl_ssl"));
830 assert_eq!(rdk.get("sasl.mechanism"), Some("PLAIN"));
831 assert_eq!(rdk.get("sasl.username"), Some("user"));
832 assert_eq!(rdk.get("sasl.password"), Some("pass"));
833 assert_eq!(rdk.get("ssl.ca.location"), Some("/ca.pem"));
834 }
835
836 #[test]
837 fn test_rdkafka_config_with_batch_num_messages() {
838 let mut cfg = KafkaSinkConfig::default();
839 cfg.bootstrap_servers = "b:9092".into();
840 cfg.topic = "t".into();
841 cfg.batch_num_messages = Some(10_000);
842
843 let rdk = cfg.to_rdkafka_config();
844 assert_eq!(rdk.get("batch.num.messages"), Some("10000"));
845 }
846
847 #[test]
848 fn test_kafka_passthrough_properties() {
849 let mut pairs = required_pairs();
850 pairs.push(("kafka.socket.timeout.ms", "5000"));
851 pairs.push(("kafka.queue.buffering.max.messages", "100000"));
852 let config = make_config(&pairs);
853 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
854 assert_eq!(
855 cfg.kafka_properties.get("socket.timeout.ms").unwrap(),
856 "5000"
857 );
858 }
859
860 #[test]
861 fn test_defaults() {
862 let cfg = KafkaSinkConfig::default();
863 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
864 assert_eq!(cfg.partitioner, PartitionStrategy::KeyHash);
865 assert_eq!(cfg.compression, CompressionType::None);
866 assert_eq!(cfg.acks, Acks::All);
867 assert_eq!(cfg.linger_ms, 5);
868 assert_eq!(cfg.batch_size, 16_384);
869 assert_eq!(cfg.max_in_flight, 5);
870 assert_eq!(cfg.flush_batch_size, 1_000);
871 assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
872 assert!(cfg.sasl_mechanism.is_none());
873 assert!(cfg.batch_num_messages.is_none());
874 }
875
876 #[test]
877 fn test_enum_display() {
878 assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
879 assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
880 assert_eq!(PartitionStrategy::KeyHash.to_string(), "key-hash");
881 assert_eq!(PartitionStrategy::RoundRobin.to_string(), "round-robin");
882 assert_eq!(PartitionStrategy::Sticky.to_string(), "sticky");
883 assert_eq!(CompressionType::Zstd.to_string(), "zstd");
884 assert_eq!(Acks::All.to_string(), "all");
885 }
886
887 #[test]
888 fn test_enum_parse() {
889 assert_eq!(
890 "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
891 DeliveryGuarantee::AtLeastOnce
892 );
893 assert_eq!(
894 "exactly-once".parse::<DeliveryGuarantee>().unwrap(),
895 DeliveryGuarantee::ExactlyOnce
896 );
897 assert_eq!(
898 "key-hash".parse::<PartitionStrategy>().unwrap(),
899 PartitionStrategy::KeyHash
900 );
901 assert_eq!(
902 "round-robin".parse::<PartitionStrategy>().unwrap(),
903 PartitionStrategy::RoundRobin
904 );
905 assert_eq!(
906 "sticky".parse::<PartitionStrategy>().unwrap(),
907 PartitionStrategy::Sticky
908 );
909 assert_eq!(
910 "gzip".parse::<CompressionType>().unwrap(),
911 CompressionType::Gzip
912 );
913 assert_eq!(
914 "snappy".parse::<CompressionType>().unwrap(),
915 CompressionType::Snappy
916 );
917 assert_eq!(
918 "lz4".parse::<CompressionType>().unwrap(),
919 CompressionType::Lz4
920 );
921 assert_eq!(
922 "zstd".parse::<CompressionType>().unwrap(),
923 CompressionType::Zstd
924 );
925 assert_eq!("all".parse::<Acks>().unwrap(), Acks::All);
926 assert_eq!("1".parse::<Acks>().unwrap(), Acks::Leader);
927 assert_eq!("0".parse::<Acks>().unwrap(), Acks::None);
928 }
929}