1use std::collections::HashMap;
7use std::time::Duration;
8
9use rdkafka::ClientConfig;
10
11use crate::config::ConnectorConfig;
12use crate::error::ConnectorError;
13use crate::kafka::config::{CompatibilityLevel, SaslMechanism, SecurityProtocol, SrAuth};
14use crate::serde::Format;
15
16#[derive(Clone)]
23pub struct KafkaSinkConfig {
24 pub bootstrap_servers: String,
26 pub topic: String,
28 pub security_protocol: SecurityProtocol,
30 pub sasl_mechanism: Option<SaslMechanism>,
32 pub sasl_username: Option<String>,
34 pub sasl_password: Option<String>,
36 pub ssl_ca_location: Option<String>,
38 pub ssl_certificate_location: Option<String>,
40 pub ssl_key_location: Option<String>,
42 pub ssl_key_password: Option<String>,
44 pub format: Format,
46 pub schema_registry_url: Option<String>,
48 pub schema_registry_auth: Option<SrAuth>,
50 pub schema_compatibility: Option<CompatibilityLevel>,
52 pub schema_registry_ssl_ca_location: Option<String>,
54 pub delivery_guarantee: DeliveryGuarantee,
56 pub transactional_id: Option<String>,
61 pub transaction_timeout: Duration,
63 pub acks: Acks,
65 pub max_in_flight: usize,
67 pub delivery_timeout: Duration,
69 pub key_column: Option<String>,
71 pub partitioner: PartitionStrategy,
73 pub linger_ms: u64,
75 pub batch_size: usize,
77 pub batch_num_messages: Option<usize>,
79 pub compression: CompressionType,
81 pub dlq_topic: Option<String>,
83 pub flush_batch_size: usize,
85 pub kafka_properties: HashMap<String, String>,
87}
88
89impl std::fmt::Debug for KafkaSinkConfig {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("KafkaSinkConfig")
92 .field("bootstrap_servers", &self.bootstrap_servers)
93 .field("topic", &self.topic)
94 .field("format", &self.format)
95 .field("delivery_guarantee", &self.delivery_guarantee)
96 .field("security_protocol", &self.security_protocol)
97 .field("sasl_mechanism", &self.sasl_mechanism)
98 .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
99 .field(
100 "ssl_key_password",
101 &self.ssl_key_password.as_ref().map(|_| "***"),
102 )
103 .field("partitioner", &self.partitioner)
104 .field("acks", &self.acks)
105 .finish_non_exhaustive()
106 }
107}
108
109impl Default for KafkaSinkConfig {
110 fn default() -> Self {
111 Self {
112 bootstrap_servers: String::new(),
113 topic: String::new(),
114 security_protocol: SecurityProtocol::default(),
115 sasl_mechanism: None,
116 sasl_username: None,
117 sasl_password: None,
118 ssl_ca_location: None,
119 ssl_certificate_location: None,
120 ssl_key_location: None,
121 ssl_key_password: None,
122 format: Format::Json,
123 schema_registry_url: None,
124 schema_registry_auth: None,
125 schema_compatibility: None,
126 schema_registry_ssl_ca_location: None,
127 delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
128 transactional_id: None,
129 transaction_timeout: Duration::from_secs(60),
130 acks: Acks::All,
131 max_in_flight: 5,
132 delivery_timeout: Duration::from_secs(120),
133 key_column: None,
134 partitioner: PartitionStrategy::KeyHash,
135 linger_ms: 5,
136 batch_size: 16_384,
137 batch_num_messages: None,
138 compression: CompressionType::None,
139 dlq_topic: None,
140 flush_batch_size: 1_000,
141 kafka_properties: HashMap::new(),
142 }
143 }
144}
145
146impl KafkaSinkConfig {
147 #[allow(clippy::too_many_lines, clippy::field_reassign_with_default)]
154 pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
155 let mut cfg = Self::default();
156
157 cfg.bootstrap_servers = config
158 .get("bootstrap.servers")
159 .ok_or_else(|| ConnectorError::missing_config("bootstrap.servers"))?
160 .to_string();
161
162 cfg.topic = config
163 .get("topic")
164 .ok_or_else(|| ConnectorError::missing_config("topic"))?
165 .to_string();
166
167 if let Some(s) = config.get("security.protocol") {
168 cfg.security_protocol = s.parse()?;
169 }
170
171 if let Some(s) = config.get("sasl.mechanism") {
172 cfg.sasl_mechanism = Some(s.parse()?);
173 }
174
175 cfg.sasl_username = config.get("sasl.username").map(String::from);
176 cfg.sasl_password = config.get("sasl.password").map(String::from);
177 cfg.ssl_ca_location = config.get("ssl.ca.location").map(String::from);
178 cfg.ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
179 cfg.ssl_key_location = config.get("ssl.key.location").map(String::from);
180 cfg.ssl_key_password = config.get("ssl.key.password").map(String::from);
181
182 if let Some(fmt) = config.get("format") {
183 cfg.format = fmt.parse().map_err(ConnectorError::Serde)?;
184 }
185
186 cfg.schema_registry_url = config.get("schema.registry.url").map(String::from);
187
188 let sr_user = config.get("schema.registry.username");
189 let sr_pass = config.get("schema.registry.password");
190 if let (Some(user), Some(pass)) = (sr_user, sr_pass) {
191 cfg.schema_registry_auth = Some(SrAuth {
192 username: user.to_string(),
193 password: pass.to_string(),
194 });
195 }
196
197 if let Some(c) = config.get("schema.compatibility") {
198 cfg.schema_compatibility = Some(c.parse().map_err(|_| {
199 ConnectorError::ConfigurationError(format!("invalid schema.compatibility: '{c}'"))
200 })?);
201 }
202
203 cfg.schema_registry_ssl_ca_location = config
204 .get("schema.registry.ssl.ca.location")
205 .map(String::from);
206
207 if let Some(dg) = config.get("delivery.guarantee") {
208 cfg.delivery_guarantee = dg.parse().map_err(|_| {
209 ConnectorError::ConfigurationError(format!(
210 "invalid delivery.guarantee: '{dg}' (expected 'at-least-once' or 'exactly-once')"
211 ))
212 })?;
213 }
214
215 cfg.transactional_id = config.get("transactional.id").map(String::from);
216
217 if let Some(v) = config.get("transaction.timeout.ms") {
218 let ms: u64 = v.parse().map_err(|_| {
219 ConnectorError::ConfigurationError(format!("invalid transaction.timeout.ms: '{v}'"))
220 })?;
221 cfg.transaction_timeout = Duration::from_millis(ms);
222 }
223
224 if let Some(a) = config.get("acks") {
225 cfg.acks = a.parse().map_err(|_| {
226 ConnectorError::ConfigurationError(format!(
227 "invalid acks: '{a}' (expected 'all', '1', or '0')"
228 ))
229 })?;
230 }
231
232 if let Some(v) = config.get("max.in.flight.requests") {
233 cfg.max_in_flight = v.parse().map_err(|_| {
234 ConnectorError::ConfigurationError(format!("invalid max.in.flight.requests: '{v}'"))
235 })?;
236 }
237
238 if let Some(v) = config.get("delivery.timeout.ms") {
239 let ms: u64 = v.parse().map_err(|_| {
240 ConnectorError::ConfigurationError(format!("invalid delivery.timeout.ms: '{v}'"))
241 })?;
242 cfg.delivery_timeout = Duration::from_millis(ms);
243 }
244
245 cfg.key_column = config.get("key.column").map(String::from);
246
247 if let Some(p) = config.get("partitioner") {
248 cfg.partitioner = p.parse().map_err(|_| {
249 ConnectorError::ConfigurationError(format!(
250 "invalid partitioner: '{p}' (expected 'key-hash', 'round-robin', or 'sticky')"
251 ))
252 })?;
253 }
254
255 if let Some(v) = config.get("linger.ms") {
256 cfg.linger_ms = v.parse().map_err(|_| {
257 ConnectorError::ConfigurationError(format!("invalid linger.ms: '{v}'"))
258 })?;
259 }
260
261 if let Some(v) = config.get("batch.size") {
262 cfg.batch_size = v.parse().map_err(|_| {
263 ConnectorError::ConfigurationError(format!("invalid batch.size: '{v}'"))
264 })?;
265 }
266
267 if let Some(v) = config.get("batch.num.messages") {
268 cfg.batch_num_messages = Some(v.parse().map_err(|_| {
269 ConnectorError::ConfigurationError(format!("invalid batch.num.messages: '{v}'"))
270 })?);
271 }
272
273 if let Some(c) = config.get("compression.type") {
274 cfg.compression = c.parse().map_err(|_| {
275 ConnectorError::ConfigurationError(format!("invalid compression.type: '{c}'"))
276 })?;
277 }
278
279 cfg.dlq_topic = config.get("dlq.topic").map(String::from);
280
281 if let Some(v) = config.get("flush.batch.size") {
282 cfg.flush_batch_size = v.parse().map_err(|_| {
283 ConnectorError::ConfigurationError(format!("invalid flush.batch.size: '{v}'"))
284 })?;
285 }
286
287 for (key, value) in config.properties_with_prefix("kafka.") {
288 cfg.kafka_properties.insert(key, value);
289 }
290
291 cfg.validate()?;
292 Ok(cfg)
293 }
294
295 pub fn validate(&self) -> Result<(), ConnectorError> {
301 if self.bootstrap_servers.is_empty() {
302 return Err(ConnectorError::missing_config("bootstrap.servers"));
303 }
304 if self.topic.is_empty() {
305 return Err(ConnectorError::missing_config("topic"));
306 }
307
308 if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
309 return Err(ConnectorError::ConfigurationError(
310 "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
311 .into(),
312 ));
313 }
314
315 if let Some(mechanism) = &self.sasl_mechanism {
316 if mechanism.requires_credentials()
317 && (self.sasl_username.is_none() || self.sasl_password.is_none())
318 {
319 return Err(ConnectorError::ConfigurationError(format!(
320 "sasl.username and sasl.password are required for {mechanism} mechanism"
321 )));
322 }
323 }
324
325 if self.security_protocol.uses_ssl() {
326 if let Some(ref ca) = self.ssl_ca_location {
327 if ca.is_empty() {
328 return Err(ConnectorError::ConfigurationError(
329 "ssl.ca.location cannot be empty when specified".into(),
330 ));
331 }
332 }
333 }
334
335 if self.format == Format::Debezium {
336 return Err(ConnectorError::ConfigurationError(
337 "Debezium is a deserialization-only format and cannot be used for sinks".into(),
338 ));
339 }
340
341 if self.format == Format::Avro && self.schema_registry_url.is_none() {
342 return Err(ConnectorError::ConfigurationError(
343 "Avro format requires 'schema.registry.url'".into(),
344 ));
345 }
346
347 if self.max_in_flight == 0 {
348 return Err(ConnectorError::ConfigurationError(
349 "max.in.flight.requests must be > 0".into(),
350 ));
351 }
352
353 if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce && self.max_in_flight > 5 {
354 return Err(ConnectorError::ConfigurationError(
355 "exactly-once requires max.in.flight.requests <= 5".into(),
356 ));
357 }
358
359 Ok(())
360 }
361
362 #[must_use]
367 pub fn to_rdkafka_config(&self) -> ClientConfig {
368 let mut config = ClientConfig::new();
369
370 config.set("bootstrap.servers", &self.bootstrap_servers);
371 config.set("security.protocol", self.security_protocol.as_rdkafka_str());
372
373 if let Some(ref mechanism) = self.sasl_mechanism {
374 config.set("sasl.mechanism", mechanism.as_rdkafka_str());
375 }
376
377 if let Some(ref username) = self.sasl_username {
378 config.set("sasl.username", username);
379 }
380
381 if let Some(ref password) = self.sasl_password {
382 config.set("sasl.password", password);
383 }
384
385 if let Some(ref ca) = self.ssl_ca_location {
386 config.set("ssl.ca.location", ca);
387 }
388
389 if let Some(ref cert) = self.ssl_certificate_location {
390 config.set("ssl.certificate.location", cert);
391 }
392
393 if let Some(ref key) = self.ssl_key_location {
394 config.set("ssl.key.location", key);
395 }
396
397 if let Some(ref key_pass) = self.ssl_key_password {
398 config.set("ssl.key.password", key_pass);
399 }
400
401 let message_timeout = if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
407 self.delivery_timeout.min(self.transaction_timeout)
408 } else {
409 self.delivery_timeout
410 };
411
412 config
413 .set("enable.idempotence", "true")
414 .set("acks", self.acks.as_rdkafka_str())
415 .set("linger.ms", self.linger_ms.to_string())
416 .set("batch.size", self.batch_size.to_string())
417 .set("compression.type", self.compression.as_rdkafka_str())
418 .set(
419 "max.in.flight.requests.per.connection",
420 self.max_in_flight.to_string(),
421 )
422 .set(
423 "message.timeout.ms",
424 message_timeout.as_millis().to_string(),
425 );
426
427 if let Some(num_msgs) = self.batch_num_messages {
428 config.set("batch.num.messages", num_msgs.to_string());
429 }
430
431 if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
432 let txn_id = self
433 .transactional_id
434 .clone()
435 .unwrap_or_else(|| format!("laminardb-sink-{}", self.topic));
436 config.set("transactional.id", txn_id);
437 config.set(
438 "transaction.timeout.ms",
439 self.transaction_timeout.as_millis().to_string(),
440 );
441 }
442
443 for (key, value) in &self.kafka_properties {
446 if is_blocked_passthrough_key(key) {
447 tracing::warn!(
448 key,
449 "ignoring kafka.* pass-through property that overrides a security setting"
450 );
451 continue;
452 }
453 config.set(key, value);
454 }
455
456 if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
461 let get_ms = |cfg: &ClientConfig, key: &str| -> Option<u64> {
462 cfg.get(key).and_then(|v| v.parse().ok())
463 };
464 if let (Some(msg), Some(txn)) = (
465 get_ms(&config, "message.timeout.ms"),
466 get_ms(&config, "transaction.timeout.ms"),
467 ) {
468 if msg > txn {
469 tracing::warn!(
470 msg,
471 txn,
472 "clamping message.timeout.ms to transaction.timeout.ms \
473 (librdkafka rejects transactional producers otherwise)"
474 );
475 config.set("message.timeout.ms", txn.to_string());
476 }
477 }
478 }
479
480 config
481 }
482
483 #[must_use]
488 pub fn to_dlq_rdkafka_config(&self) -> ClientConfig {
489 let mut config = ClientConfig::new();
490
491 config.set("bootstrap.servers", &self.bootstrap_servers);
492 config.set("security.protocol", self.security_protocol.as_rdkafka_str());
493
494 if let Some(ref mechanism) = self.sasl_mechanism {
495 config.set("sasl.mechanism", mechanism.as_rdkafka_str());
496 }
497 if let Some(ref username) = self.sasl_username {
498 config.set("sasl.username", username);
499 }
500 if let Some(ref password) = self.sasl_password {
501 config.set("sasl.password", password);
502 }
503 if let Some(ref ca) = self.ssl_ca_location {
504 config.set("ssl.ca.location", ca);
505 }
506 if let Some(ref cert) = self.ssl_certificate_location {
507 config.set("ssl.certificate.location", cert);
508 }
509 if let Some(ref key) = self.ssl_key_location {
510 config.set("ssl.key.location", key);
511 }
512 if let Some(ref key_pass) = self.ssl_key_password {
513 config.set("ssl.key.password", key_pass);
514 }
515
516 config.set("enable.idempotence", "true");
517
518 config
519 }
520}
521
522fn is_blocked_passthrough_key(key: &str) -> bool {
524 key.starts_with("sasl.kerberos.")
525 || matches!(
526 key,
527 "security.protocol"
528 | "sasl.mechanism"
529 | "sasl.username"
530 | "sasl.password"
531 | "sasl.oauthbearer.config"
532 | "ssl.ca.location"
533 | "ssl.certificate.location"
534 | "ssl.key.location"
535 | "ssl.key.password"
536 | "ssl.endpoint.identification.algorithm"
537 | "enable.auto.commit"
538 | "enable.idempotence"
539 | "transactional.id"
540 )
541}
542
543pub use crate::connector::DeliveryGuarantee;
544
545#[derive(Debug, Clone, Copy, PartialEq, Eq)]
547pub enum PartitionStrategy {
548 KeyHash,
550 RoundRobin,
552 Sticky,
554}
555
556str_enum!(PartitionStrategy, lowercase_udash, String, "unknown partition strategy",
557 KeyHash => "key-hash", "keyhash", "hash";
558 RoundRobin => "round-robin", "roundrobin";
559 Sticky => "sticky"
560);
561
562#[derive(Debug, Clone, Copy, PartialEq, Eq)]
564pub enum CompressionType {
565 None,
567 Gzip,
569 Snappy,
571 Lz4,
573 Zstd,
575}
576
577impl CompressionType {
578 #[must_use]
580 pub fn as_rdkafka_str(&self) -> &'static str {
581 match self {
582 Self::None => "none",
583 Self::Gzip => "gzip",
584 Self::Snappy => "snappy",
585 Self::Lz4 => "lz4",
586 Self::Zstd => "zstd",
587 }
588 }
589}
590
591str_enum!(fromstr CompressionType, lowercase_nodash, String, "unknown compression type",
592 None => "none";
593 Gzip => "gzip";
594 Snappy => "snappy";
595 Lz4 => "lz4";
596 Zstd => "zstd", "zstandard"
597);
598
599impl std::fmt::Display for CompressionType {
600 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601 write!(f, "{}", self.as_rdkafka_str())
602 }
603}
604
605#[derive(Debug, Clone, Copy, PartialEq, Eq)]
607pub enum Acks {
608 None,
610 Leader,
612 All,
614}
615
616impl Acks {
617 #[must_use]
619 pub fn as_rdkafka_str(&self) -> &'static str {
620 match self {
621 Self::None => "0",
622 Self::Leader => "1",
623 Self::All => "all",
624 }
625 }
626}
627
628str_enum!(fromstr Acks, lowercase_nodash, String, "unknown acks value",
629 None => "0", "none";
630 Leader => "1", "leader";
631 All => "-1", "all"
632);
633
634impl std::fmt::Display for Acks {
635 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
636 write!(f, "{}", self.as_rdkafka_str())
637 }
638}
639
640#[cfg(test)]
641mod tests {
642 use super::*;
643
644 fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
645 let mut config = ConnectorConfig::new("kafka");
646 for (k, v) in pairs {
647 config.set(*k, *v);
648 }
649 config
650 }
651
652 fn required_pairs() -> Vec<(&'static str, &'static str)> {
653 vec![
654 ("bootstrap.servers", "localhost:9092"),
655 ("topic", "output-events"),
656 ]
657 }
658
659 #[test]
660 fn test_parse_required_fields() {
661 let config = make_config(&required_pairs());
662 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
663 assert_eq!(cfg.bootstrap_servers, "localhost:9092");
664 assert_eq!(cfg.topic, "output-events");
665 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
666 assert_eq!(cfg.format, Format::Json);
667 assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
668 }
669
670 #[test]
671 fn test_missing_bootstrap_servers() {
672 let config = make_config(&[("topic", "t")]);
673 assert!(KafkaSinkConfig::from_config(&config).is_err());
674 }
675
676 #[test]
677 fn test_missing_topic() {
678 let config = make_config(&[("bootstrap.servers", "b:9092")]);
679 assert!(KafkaSinkConfig::from_config(&config).is_err());
680 }
681
682 #[test]
683 fn test_parse_delivery_guarantee() {
684 let mut pairs = required_pairs();
685 pairs.push(("delivery.guarantee", "exactly-once"));
686 let config = make_config(&pairs);
687 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
688 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
689 }
690
691 #[test]
692 fn test_parse_security_sasl_ssl() {
693 let mut pairs = required_pairs();
694 pairs.extend_from_slice(&[
695 ("security.protocol", "sasl_ssl"),
696 ("sasl.mechanism", "SCRAM-SHA-512"),
697 ("sasl.username", "producer"),
698 ("sasl.password", "secret123"),
699 ("ssl.ca.location", "/etc/ssl/ca.pem"),
700 ]);
701 let config = make_config(&pairs);
702 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
703
704 assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
705 assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha512));
706 assert_eq!(cfg.sasl_username, Some("producer".to_string()));
707 assert_eq!(cfg.sasl_password, Some("secret123".to_string()));
708 assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
709 }
710
711 #[test]
712 fn test_parse_security_ssl_only() {
713 let mut pairs = required_pairs();
714 pairs.extend_from_slice(&[
715 ("security.protocol", "ssl"),
716 ("ssl.ca.location", "/etc/ssl/ca.pem"),
717 ("ssl.certificate.location", "/etc/ssl/client.pem"),
718 ("ssl.key.location", "/etc/ssl/client.key"),
719 ("ssl.key.password", "keypass"),
720 ]);
721 let config = make_config(&pairs);
722 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
723
724 assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
725 assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
726 assert_eq!(
727 cfg.ssl_certificate_location,
728 Some("/etc/ssl/client.pem".to_string())
729 );
730 assert_eq!(
731 cfg.ssl_key_location,
732 Some("/etc/ssl/client.key".to_string())
733 );
734 assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
735 }
736
737 #[test]
738 fn test_parse_all_optional_fields() {
739 let mut pairs = required_pairs();
740 pairs.extend_from_slice(&[
741 ("format", "avro"),
742 ("delivery.guarantee", "exactly-once"),
743 ("transactional.id", "my-txn"),
744 ("transaction.timeout.ms", "30000"),
745 ("key.column", "order_id"),
746 ("partitioner", "round-robin"),
747 ("linger.ms", "10"),
748 ("batch.size", "32768"),
749 ("batch.num.messages", "5000"),
750 ("compression.type", "zstd"),
751 ("acks", "1"),
752 ("max.in.flight.requests", "3"),
753 ("delivery.timeout.ms", "60000"),
754 ("dlq.topic", "my-dlq"),
755 ("flush.batch.size", "500"),
756 ("schema.registry.url", "http://sr:8081"),
757 ("schema.registry.username", "user"),
758 ("schema.registry.password", "pass"),
759 ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
760 ]);
761 let config = make_config(&pairs);
762 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
763
764 assert_eq!(cfg.format, Format::Avro);
765 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
766 assert_eq!(cfg.transactional_id.as_deref(), Some("my-txn"));
767 assert_eq!(cfg.transaction_timeout, Duration::from_secs(30));
768 assert_eq!(cfg.key_column.as_deref(), Some("order_id"));
769 assert_eq!(cfg.partitioner, PartitionStrategy::RoundRobin);
770 assert_eq!(cfg.linger_ms, 10);
771 assert_eq!(cfg.batch_size, 32_768);
772 assert_eq!(cfg.batch_num_messages, Some(5000));
773 assert_eq!(cfg.compression, CompressionType::Zstd);
774 assert_eq!(cfg.acks, Acks::Leader);
775 assert_eq!(cfg.max_in_flight, 3);
776 assert_eq!(cfg.delivery_timeout, Duration::from_secs(60));
777 assert_eq!(cfg.dlq_topic.as_deref(), Some("my-dlq"));
778 assert_eq!(cfg.flush_batch_size, 500);
779 assert_eq!(cfg.schema_registry_url.as_deref(), Some("http://sr:8081"));
780 assert!(cfg.schema_registry_auth.is_some());
781 assert_eq!(
782 cfg.schema_registry_ssl_ca_location,
783 Some("/etc/ssl/sr-ca.pem".to_string())
784 );
785 }
786
787 #[test]
788 fn test_validate_avro_requires_sr() {
789 let mut cfg = KafkaSinkConfig::default();
790 cfg.bootstrap_servers = "b:9092".into();
791 cfg.topic = "t".into();
792 cfg.format = Format::Avro;
793 assert!(cfg.validate().is_err());
794 }
795
796 #[test]
797 fn test_validate_exactly_once_max_in_flight() {
798 let mut cfg = KafkaSinkConfig::default();
799 cfg.bootstrap_servers = "b:9092".into();
800 cfg.topic = "t".into();
801 cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
802 cfg.max_in_flight = 10;
803 assert!(cfg.validate().is_err());
804 }
805
806 #[test]
807 fn test_validate_sasl_without_mechanism() {
808 let mut cfg = KafkaSinkConfig::default();
809 cfg.bootstrap_servers = "b:9092".into();
810 cfg.topic = "t".into();
811 cfg.security_protocol = SecurityProtocol::SaslSsl;
812 assert!(cfg.validate().is_err());
814 }
815
816 #[test]
817 fn test_validate_sasl_plain_without_credentials() {
818 let mut cfg = KafkaSinkConfig::default();
819 cfg.bootstrap_servers = "b:9092".into();
820 cfg.topic = "t".into();
821 cfg.security_protocol = SecurityProtocol::SaslPlaintext;
822 cfg.sasl_mechanism = Some(SaslMechanism::ScramSha256);
823 assert!(cfg.validate().is_err());
825 }
826
827 #[test]
828 fn test_rdkafka_config_at_least_once() {
829 let mut cfg = KafkaSinkConfig::default();
830 cfg.bootstrap_servers = "b:9092".into();
831 cfg.topic = "t".into();
832 let rdk = cfg.to_rdkafka_config();
833 assert_eq!(rdk.get("enable.idempotence"), Some("true"));
834 assert!(rdk.get("transactional.id").is_none());
835 assert_eq!(rdk.get("security.protocol"), Some("plaintext"));
836 }
837
838 #[test]
839 fn test_rdkafka_config_exactly_once() {
840 let mut cfg = KafkaSinkConfig::default();
841 cfg.bootstrap_servers = "b:9092".into();
842 cfg.topic = "t".into();
843 cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
844 let rdk = cfg.to_rdkafka_config();
845 assert_eq!(rdk.get("enable.idempotence"), Some("true"));
846 assert!(rdk.get("transactional.id").is_some());
847 assert_eq!(rdk.get("message.timeout.ms"), Some("60000"));
851 assert_eq!(rdk.get("transaction.timeout.ms"), Some("60000"));
852 }
853
854 #[test]
855 fn test_rdkafka_config_with_security() {
856 let mut cfg = KafkaSinkConfig::default();
857 cfg.bootstrap_servers = "b:9092".into();
858 cfg.topic = "t".into();
859 cfg.security_protocol = SecurityProtocol::SaslSsl;
860 cfg.sasl_mechanism = Some(SaslMechanism::Plain);
861 cfg.sasl_username = Some("user".into());
862 cfg.sasl_password = Some("pass".into());
863 cfg.ssl_ca_location = Some("/ca.pem".into());
864
865 let rdk = cfg.to_rdkafka_config();
866 assert_eq!(rdk.get("security.protocol"), Some("sasl_ssl"));
867 assert_eq!(rdk.get("sasl.mechanism"), Some("PLAIN"));
868 assert_eq!(rdk.get("sasl.username"), Some("user"));
869 assert_eq!(rdk.get("sasl.password"), Some("pass"));
870 assert_eq!(rdk.get("ssl.ca.location"), Some("/ca.pem"));
871 }
872
873 #[test]
874 fn test_rdkafka_config_with_batch_num_messages() {
875 let mut cfg = KafkaSinkConfig::default();
876 cfg.bootstrap_servers = "b:9092".into();
877 cfg.topic = "t".into();
878 cfg.batch_num_messages = Some(10_000);
879
880 let rdk = cfg.to_rdkafka_config();
881 assert_eq!(rdk.get("batch.num.messages"), Some("10000"));
882 }
883
884 #[test]
885 fn test_kafka_passthrough_properties() {
886 let mut pairs = required_pairs();
887 pairs.push(("kafka.socket.timeout.ms", "5000"));
888 pairs.push(("kafka.queue.buffering.max.messages", "100000"));
889 let config = make_config(&pairs);
890 let cfg = KafkaSinkConfig::from_config(&config).unwrap();
891 assert_eq!(
892 cfg.kafka_properties.get("socket.timeout.ms").unwrap(),
893 "5000"
894 );
895 }
896
897 #[test]
898 fn test_defaults() {
899 let cfg = KafkaSinkConfig::default();
900 assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
901 assert_eq!(cfg.partitioner, PartitionStrategy::KeyHash);
902 assert_eq!(cfg.compression, CompressionType::None);
903 assert_eq!(cfg.acks, Acks::All);
904 assert_eq!(cfg.linger_ms, 5);
905 assert_eq!(cfg.batch_size, 16_384);
906 assert_eq!(cfg.max_in_flight, 5);
907 assert_eq!(cfg.flush_batch_size, 1_000);
908 assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
909 assert!(cfg.sasl_mechanism.is_none());
910 assert!(cfg.batch_num_messages.is_none());
911 }
912
913 #[test]
914 fn test_enum_display() {
915 assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
916 assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
917 assert_eq!(PartitionStrategy::KeyHash.to_string(), "key-hash");
918 assert_eq!(PartitionStrategy::RoundRobin.to_string(), "round-robin");
919 assert_eq!(PartitionStrategy::Sticky.to_string(), "sticky");
920 assert_eq!(CompressionType::Zstd.to_string(), "zstd");
921 assert_eq!(Acks::All.to_string(), "all");
922 }
923
924 #[test]
925 fn test_enum_parse() {
926 assert_eq!(
927 "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
928 DeliveryGuarantee::AtLeastOnce
929 );
930 assert_eq!(
931 "exactly-once".parse::<DeliveryGuarantee>().unwrap(),
932 DeliveryGuarantee::ExactlyOnce
933 );
934 assert_eq!(
935 "key-hash".parse::<PartitionStrategy>().unwrap(),
936 PartitionStrategy::KeyHash
937 );
938 assert_eq!(
939 "round-robin".parse::<PartitionStrategy>().unwrap(),
940 PartitionStrategy::RoundRobin
941 );
942 assert_eq!(
943 "sticky".parse::<PartitionStrategy>().unwrap(),
944 PartitionStrategy::Sticky
945 );
946 assert_eq!(
947 "gzip".parse::<CompressionType>().unwrap(),
948 CompressionType::Gzip
949 );
950 assert_eq!(
951 "snappy".parse::<CompressionType>().unwrap(),
952 CompressionType::Snappy
953 );
954 assert_eq!(
955 "lz4".parse::<CompressionType>().unwrap(),
956 CompressionType::Lz4
957 );
958 assert_eq!(
959 "zstd".parse::<CompressionType>().unwrap(),
960 CompressionType::Zstd
961 );
962 assert_eq!("all".parse::<Acks>().unwrap(), Acks::All);
963 assert_eq!("1".parse::<Acks>().unwrap(), Acks::Leader);
964 assert_eq!("0".parse::<Acks>().unwrap(), Acks::None);
965 }
966}