// laminar_connectors/kafka/sink_config.rs
1//! Kafka sink connector configuration.
2//!
3//! [`KafkaSinkConfig`] encapsulates all tuning knobs for the Kafka producer,
4//! parsed from a SQL `WITH (...)` clause via [`ConnectorConfig`].
5
6use std::collections::HashMap;
7use std::time::Duration;
8
9use rdkafka::ClientConfig;
10
11use crate::config::ConnectorConfig;
12use crate::error::ConnectorError;
13use crate::kafka::config::{CompatibilityLevel, SaslMechanism, SecurityProtocol, SrAuth};
14use crate::serde::Format;
15
/// Configuration for the Kafka Sink Connector.
///
/// Parsed from SQL `WITH (...)` clause options.
///
/// Uses a custom `Debug` impl that redacts `sasl_password` and
/// `ssl_key_password` to prevent credential leakage in logs.
#[derive(Clone)]
pub struct KafkaSinkConfig {
    /// Kafka broker addresses (comma-separated). Required; parsing fails
    /// with `MissingConfig` if absent.
    pub bootstrap_servers: String,
    /// Target Kafka topic name. Required.
    pub topic: String,
    /// Security protocol for broker connections.
    pub security_protocol: SecurityProtocol,
    /// SASL authentication mechanism. Required by `validate` when the
    /// security protocol uses SASL.
    pub sasl_mechanism: Option<SaslMechanism>,
    /// SASL username (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
    pub sasl_username: Option<String>,
    /// SASL password (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
    /// Redacted in `Debug` output.
    pub sasl_password: Option<String>,
    /// Path to SSL CA certificate file (PEM format).
    pub ssl_ca_location: Option<String>,
    /// Path to client SSL certificate file (PEM format).
    pub ssl_certificate_location: Option<String>,
    /// Path to client SSL private key file (PEM format).
    pub ssl_key_location: Option<String>,
    /// Password for encrypted SSL private key. Redacted in `Debug` output.
    pub ssl_key_password: Option<String>,
    /// Serialization format (default: JSON).
    pub format: Format,
    /// Schema Registry URL for Avro/Protobuf. `validate` requires it when
    /// the format is Avro.
    pub schema_registry_url: Option<String>,
    /// Schema Registry authentication.
    pub schema_registry_auth: Option<SrAuth>,
    /// Schema compatibility level override.
    pub schema_compatibility: Option<CompatibilityLevel>,
    /// Schema Registry SSL CA certificate path.
    pub schema_registry_ssl_ca_location: Option<String>,
    /// Delivery guarantee level (default: at-least-once).
    pub delivery_guarantee: DeliveryGuarantee,
    /// Transactional ID prefix for exactly-once. Falls back to
    /// `laminardb-sink-<topic>` when unset.
    pub transactional_id: Option<String>,
    /// Transaction timeout (default: 60s).
    pub transaction_timeout: Duration,
    /// Acknowledgment level (default: all).
    pub acks: Acks,
    /// Maximum number of in-flight requests per connection. Must be > 0;
    /// `validate` caps it at 5 for exactly-once.
    pub max_in_flight: usize,
    /// Maximum time to wait for delivery confirmation (default: 120s).
    pub delivery_timeout: Duration,
    /// Key column name for partitioning.
    pub key_column: Option<String>,
    /// Partitioning strategy (default: key-hash).
    pub partitioner: PartitionStrategy,
    /// Maximum time to wait before sending a batch (milliseconds).
    pub linger_ms: u64,
    /// Maximum batch size in bytes.
    pub batch_size: usize,
    /// Maximum number of messages per batch; forwarded to rdkafka only
    /// when explicitly set.
    pub batch_num_messages: Option<usize>,
    /// Compression algorithm (default: none).
    pub compression: CompressionType,
    /// Dead letter queue topic for failed records.
    pub dlq_topic: Option<String>,
    /// Maximum records to buffer before flushing.
    pub flush_batch_size: usize,
    /// Additional rdkafka client properties (pass-through); security-critical
    /// keys are filtered out when building the client config.
    pub kafka_properties: HashMap<String, String>,
}
85
86impl std::fmt::Debug for KafkaSinkConfig {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("KafkaSinkConfig")
89            .field("bootstrap_servers", &self.bootstrap_servers)
90            .field("topic", &self.topic)
91            .field("format", &self.format)
92            .field("delivery_guarantee", &self.delivery_guarantee)
93            .field("security_protocol", &self.security_protocol)
94            .field("sasl_mechanism", &self.sasl_mechanism)
95            .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
96            .field(
97                "ssl_key_password",
98                &self.ssl_key_password.as_ref().map(|_| "***"),
99            )
100            .field("partitioner", &self.partitioner)
101            .field("acks", &self.acks)
102            .finish_non_exhaustive()
103    }
104}
105
impl Default for KafkaSinkConfig {
    fn default() -> Self {
        Self {
            // Connection: required fields start empty; `validate` rejects
            // a config where they were never filled in.
            bootstrap_servers: String::new(),
            topic: String::new(),
            // Security: protocol default, no SASL, no TLS material.
            security_protocol: SecurityProtocol::default(),
            sasl_mechanism: None,
            sasl_username: None,
            sasl_password: None,
            ssl_ca_location: None,
            ssl_certificate_location: None,
            ssl_key_location: None,
            ssl_key_password: None,
            // Serialization: JSON, no Schema Registry.
            format: Format::Json,
            schema_registry_url: None,
            schema_registry_auth: None,
            schema_compatibility: None,
            schema_registry_ssl_ca_location: None,
            // Delivery semantics: at-least-once, acks=all, 60s txn timeout,
            // at most 5 pipelined requests (the exactly-once upper bound).
            delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
            transactional_id: None,
            transaction_timeout: Duration::from_secs(60),
            acks: Acks::All,
            max_in_flight: 5,
            delivery_timeout: Duration::from_secs(120),
            // Partitioning and batching.
            key_column: None,
            partitioner: PartitionStrategy::KeyHash,
            linger_ms: 5,
            batch_size: 16_384,
            batch_num_messages: None,
            compression: CompressionType::None,
            dlq_topic: None,
            flush_batch_size: 1_000,
            kafka_properties: HashMap::new(),
        }
    }
}
142
143impl KafkaSinkConfig {
144    /// Parses a sink config from a [`ConnectorConfig`] (SQL WITH clause).
145    ///
146    /// # Errors
147    ///
148    /// Returns `ConnectorError::MissingConfig` if required keys are absent,
149    /// or `ConnectorError::ConfigurationError` on invalid values.
150    #[allow(clippy::too_many_lines, clippy::field_reassign_with_default)]
151    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
152        let mut cfg = Self::default();
153
154        cfg.bootstrap_servers = config
155            .get("bootstrap.servers")
156            .ok_or_else(|| ConnectorError::MissingConfig("bootstrap.servers".into()))?
157            .to_string();
158
159        cfg.topic = config
160            .get("topic")
161            .ok_or_else(|| ConnectorError::MissingConfig("topic".into()))?
162            .to_string();
163
164        if let Some(s) = config.get("security.protocol") {
165            cfg.security_protocol = s.parse()?;
166        }
167
168        if let Some(s) = config.get("sasl.mechanism") {
169            cfg.sasl_mechanism = Some(s.parse()?);
170        }
171
172        cfg.sasl_username = config.get("sasl.username").map(String::from);
173        cfg.sasl_password = config.get("sasl.password").map(String::from);
174        cfg.ssl_ca_location = config.get("ssl.ca.location").map(String::from);
175        cfg.ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
176        cfg.ssl_key_location = config.get("ssl.key.location").map(String::from);
177        cfg.ssl_key_password = config.get("ssl.key.password").map(String::from);
178
179        if let Some(fmt) = config.get("format") {
180            cfg.format = fmt.parse().map_err(ConnectorError::Serde)?;
181        }
182
183        cfg.schema_registry_url = config.get("schema.registry.url").map(String::from);
184
185        let sr_user = config.get("schema.registry.username");
186        let sr_pass = config.get("schema.registry.password");
187        if let (Some(user), Some(pass)) = (sr_user, sr_pass) {
188            cfg.schema_registry_auth = Some(SrAuth {
189                username: user.to_string(),
190                password: pass.to_string(),
191            });
192        }
193
194        if let Some(c) = config.get("schema.compatibility") {
195            cfg.schema_compatibility = Some(c.parse().map_err(|_| {
196                ConnectorError::ConfigurationError(format!("invalid schema.compatibility: '{c}'"))
197            })?);
198        }
199
200        cfg.schema_registry_ssl_ca_location = config
201            .get("schema.registry.ssl.ca.location")
202            .map(String::from);
203
204        if let Some(dg) = config.get("delivery.guarantee") {
205            cfg.delivery_guarantee = dg.parse().map_err(|_| {
206                ConnectorError::ConfigurationError(format!(
207                    "invalid delivery.guarantee: '{dg}' (expected 'at-least-once' or 'exactly-once')"
208                ))
209            })?;
210        }
211
212        cfg.transactional_id = config.get("transactional.id").map(String::from);
213
214        if let Some(v) = config.get("transaction.timeout.ms") {
215            let ms: u64 = v.parse().map_err(|_| {
216                ConnectorError::ConfigurationError(format!("invalid transaction.timeout.ms: '{v}'"))
217            })?;
218            cfg.transaction_timeout = Duration::from_millis(ms);
219        }
220
221        if let Some(a) = config.get("acks") {
222            cfg.acks = a.parse().map_err(|_| {
223                ConnectorError::ConfigurationError(format!(
224                    "invalid acks: '{a}' (expected 'all', '1', or '0')"
225                ))
226            })?;
227        }
228
229        if let Some(v) = config.get("max.in.flight.requests") {
230            cfg.max_in_flight = v.parse().map_err(|_| {
231                ConnectorError::ConfigurationError(format!("invalid max.in.flight.requests: '{v}'"))
232            })?;
233        }
234
235        if let Some(v) = config.get("delivery.timeout.ms") {
236            let ms: u64 = v.parse().map_err(|_| {
237                ConnectorError::ConfigurationError(format!("invalid delivery.timeout.ms: '{v}'"))
238            })?;
239            cfg.delivery_timeout = Duration::from_millis(ms);
240        }
241
242        cfg.key_column = config.get("key.column").map(String::from);
243
244        if let Some(p) = config.get("partitioner") {
245            cfg.partitioner = p.parse().map_err(|_| {
246                ConnectorError::ConfigurationError(format!(
247                    "invalid partitioner: '{p}' (expected 'key-hash', 'round-robin', or 'sticky')"
248                ))
249            })?;
250        }
251
252        if let Some(v) = config.get("linger.ms") {
253            cfg.linger_ms = v.parse().map_err(|_| {
254                ConnectorError::ConfigurationError(format!("invalid linger.ms: '{v}'"))
255            })?;
256        }
257
258        if let Some(v) = config.get("batch.size") {
259            cfg.batch_size = v.parse().map_err(|_| {
260                ConnectorError::ConfigurationError(format!("invalid batch.size: '{v}'"))
261            })?;
262        }
263
264        if let Some(v) = config.get("batch.num.messages") {
265            cfg.batch_num_messages = Some(v.parse().map_err(|_| {
266                ConnectorError::ConfigurationError(format!("invalid batch.num.messages: '{v}'"))
267            })?);
268        }
269
270        if let Some(c) = config.get("compression.type") {
271            cfg.compression = c.parse().map_err(|_| {
272                ConnectorError::ConfigurationError(format!("invalid compression.type: '{c}'"))
273            })?;
274        }
275
276        cfg.dlq_topic = config.get("dlq.topic").map(String::from);
277
278        if let Some(v) = config.get("flush.batch.size") {
279            cfg.flush_batch_size = v.parse().map_err(|_| {
280                ConnectorError::ConfigurationError(format!("invalid flush.batch.size: '{v}'"))
281            })?;
282        }
283
284        for (key, value) in config.properties_with_prefix("kafka.") {
285            cfg.kafka_properties.insert(key, value);
286        }
287
288        cfg.validate()?;
289        Ok(cfg)
290    }
291
292    /// Validates the configuration.
293    ///
294    /// # Errors
295    ///
296    /// Returns `ConnectorError::ConfigurationError` on invalid combinations.
297    pub fn validate(&self) -> Result<(), ConnectorError> {
298        if self.bootstrap_servers.is_empty() {
299            return Err(ConnectorError::MissingConfig("bootstrap.servers".into()));
300        }
301        if self.topic.is_empty() {
302            return Err(ConnectorError::MissingConfig("topic".into()));
303        }
304
305        if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
306            return Err(ConnectorError::ConfigurationError(
307                "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
308                    .into(),
309            ));
310        }
311
312        if let Some(mechanism) = &self.sasl_mechanism {
313            if mechanism.requires_credentials()
314                && (self.sasl_username.is_none() || self.sasl_password.is_none())
315            {
316                return Err(ConnectorError::ConfigurationError(format!(
317                    "sasl.username and sasl.password are required for {mechanism} mechanism"
318                )));
319            }
320        }
321
322        if self.security_protocol.uses_ssl() {
323            if let Some(ref ca) = self.ssl_ca_location {
324                if ca.is_empty() {
325                    return Err(ConnectorError::ConfigurationError(
326                        "ssl.ca.location cannot be empty when specified".into(),
327                    ));
328                }
329            }
330        }
331
332        if self.format == Format::Avro && self.schema_registry_url.is_none() {
333            return Err(ConnectorError::ConfigurationError(
334                "Avro format requires 'schema.registry.url'".into(),
335            ));
336        }
337
338        if self.max_in_flight == 0 {
339            return Err(ConnectorError::ConfigurationError(
340                "max.in.flight.requests must be > 0".into(),
341            ));
342        }
343
344        if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce && self.max_in_flight > 5 {
345            return Err(ConnectorError::ConfigurationError(
346                "exactly-once requires max.in.flight.requests <= 5".into(),
347            ));
348        }
349
350        Ok(())
351    }
352
353    /// Builds an rdkafka [`ClientConfig`] from this configuration.
354    ///
355    /// Always sets `enable.idempotence=true`. For exactly-once delivery,
356    /// also sets `transactional.id` and `transaction.timeout.ms`.
357    #[must_use]
358    pub fn to_rdkafka_config(&self) -> ClientConfig {
359        let mut config = ClientConfig::new();
360
361        config.set("bootstrap.servers", &self.bootstrap_servers);
362        config.set("security.protocol", self.security_protocol.as_rdkafka_str());
363
364        if let Some(ref mechanism) = self.sasl_mechanism {
365            config.set("sasl.mechanism", mechanism.as_rdkafka_str());
366        }
367
368        if let Some(ref username) = self.sasl_username {
369            config.set("sasl.username", username);
370        }
371
372        if let Some(ref password) = self.sasl_password {
373            config.set("sasl.password", password);
374        }
375
376        if let Some(ref ca) = self.ssl_ca_location {
377            config.set("ssl.ca.location", ca);
378        }
379
380        if let Some(ref cert) = self.ssl_certificate_location {
381            config.set("ssl.certificate.location", cert);
382        }
383
384        if let Some(ref key) = self.ssl_key_location {
385            config.set("ssl.key.location", key);
386        }
387
388        if let Some(ref key_pass) = self.ssl_key_password {
389            config.set("ssl.key.password", key_pass);
390        }
391
392        config
393            .set("enable.idempotence", "true")
394            .set("acks", self.acks.as_rdkafka_str())
395            .set("linger.ms", self.linger_ms.to_string())
396            .set("batch.size", self.batch_size.to_string())
397            .set("compression.type", self.compression.as_rdkafka_str())
398            .set(
399                "max.in.flight.requests.per.connection",
400                self.max_in_flight.to_string(),
401            )
402            .set(
403                "message.timeout.ms",
404                self.delivery_timeout.as_millis().to_string(),
405            );
406
407        if let Some(num_msgs) = self.batch_num_messages {
408            config.set("batch.num.messages", num_msgs.to_string());
409        }
410
411        if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
412            let txn_id = self
413                .transactional_id
414                .clone()
415                .unwrap_or_else(|| format!("laminardb-sink-{}", self.topic));
416            config.set("transactional.id", txn_id);
417            config.set(
418                "transaction.timeout.ms",
419                self.transaction_timeout.as_millis().to_string(),
420            );
421        }
422
423        // Apply pass-through properties, blocking security-critical keys
424        // that could silently downgrade authentication or break semantics.
425        for (key, value) in &self.kafka_properties {
426            if is_blocked_passthrough_key(key) {
427                tracing::warn!(
428                    key,
429                    "ignoring kafka.* pass-through property that overrides a security setting"
430                );
431                continue;
432            }
433            config.set(key, value);
434        }
435
436        config
437    }
438
439    /// Builds an rdkafka [`ClientConfig`] for the dead letter queue producer.
440    ///
441    /// Inherits security settings (SASL, SSL) from the main config but is
442    /// non-transactional. Does not set `transactional.id`.
443    #[must_use]
444    pub fn to_dlq_rdkafka_config(&self) -> ClientConfig {
445        let mut config = ClientConfig::new();
446
447        config.set("bootstrap.servers", &self.bootstrap_servers);
448        config.set("security.protocol", self.security_protocol.as_rdkafka_str());
449
450        if let Some(ref mechanism) = self.sasl_mechanism {
451            config.set("sasl.mechanism", mechanism.as_rdkafka_str());
452        }
453        if let Some(ref username) = self.sasl_username {
454            config.set("sasl.username", username);
455        }
456        if let Some(ref password) = self.sasl_password {
457            config.set("sasl.password", password);
458        }
459        if let Some(ref ca) = self.ssl_ca_location {
460            config.set("ssl.ca.location", ca);
461        }
462        if let Some(ref cert) = self.ssl_certificate_location {
463            config.set("ssl.certificate.location", cert);
464        }
465        if let Some(ref key) = self.ssl_key_location {
466            config.set("ssl.key.location", key);
467        }
468        if let Some(ref key_pass) = self.ssl_key_password {
469            config.set("ssl.key.password", key_pass);
470        }
471
472        config.set("enable.idempotence", "true");
473
474        config
475    }
476}
477
/// Returns `true` if a pass-through kafka.* key must not override explicit settings.
///
/// Blocks both `sasl.mechanism` and its librdkafka alias `sasl.mechanisms`,
/// so neither spelling can silently downgrade the configured authentication.
fn is_blocked_passthrough_key(key: &str) -> bool {
    // Any Kerberos knob could redirect authentication, so block the whole prefix.
    key.starts_with("sasl.kerberos.")
        || matches!(
            key,
            "security.protocol"
                | "sasl.mechanism"
                | "sasl.mechanisms"
                | "sasl.username"
                | "sasl.password"
                | "sasl.oauthbearer.config"
                | "ssl.ca.location"
                | "ssl.certificate.location"
                | "ssl.key.location"
                | "ssl.key.password"
                | "ssl.endpoint.identification.algorithm"
                | "enable.auto.commit"
                | "enable.idempotence"
                | "transactional.id"
        )
}
498
/// Delivery guarantee level for the Kafka sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryGuarantee {
    /// At-least-once: idempotent producer, no transactions.
    AtLeastOnce,
    /// Exactly-once: transactional producer with epoch-aligned commits.
    ExactlyOnce,
}

// Parsing (project-local `str_enum!` macro): accepts both the dashed and
// the collapsed spelling of each variant.
str_enum!(DeliveryGuarantee, lowercase_udash, String, "unknown delivery guarantee",
    AtLeastOnce => "at-least-once", "atleastonce";
    ExactlyOnce => "exactly-once", "exactlyonce"
);
512
/// Partitioning strategy for distributing records across Kafka partitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionStrategy {
    /// Hash the key column (Murmur2, Kafka-compatible).
    KeyHash,
    /// Round-robin across all partitions.
    RoundRobin,
    /// Sticky: batch records to the same partition until full.
    Sticky,
}

// Parsing (project-local `str_enum!` macro): dashed, collapsed, and the
// short "hash" alias are all accepted for the SQL `partitioner` option.
str_enum!(PartitionStrategy, lowercase_udash, String, "unknown partition strategy",
    KeyHash => "key-hash", "keyhash", "hash";
    RoundRobin => "round-robin", "roundrobin";
    Sticky => "sticky"
);
529
/// Compression type for produced Kafka messages.
///
/// Forwarded to rdkafka as `compression.type` via [`CompressionType::as_rdkafka_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    /// No compression.
    None,
    /// Gzip compression.
    Gzip,
    /// Snappy compression.
    Snappy,
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression.
    Zstd,
}
544
545impl CompressionType {
546    /// Returns the rdkafka configuration string.
547    #[must_use]
548    pub fn as_rdkafka_str(&self) -> &'static str {
549        match self {
550            Self::None => "none",
551            Self::Gzip => "gzip",
552            Self::Snappy => "snappy",
553            Self::Lz4 => "lz4",
554            Self::Zstd => "zstd",
555        }
556    }
557}
558
// `FromStr` only (project-local `str_enum!` macro); "zstandard" is accepted
// as an alias for Zstd in the SQL `compression.type` option.
str_enum!(fromstr CompressionType, lowercase_nodash, String, "unknown compression type",
    None => "none";
    Gzip => "gzip";
    Snappy => "snappy";
    Lz4 => "lz4";
    Zstd => "zstd", "zstandard"
);
566
567impl std::fmt::Display for CompressionType {
568    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569        write!(f, "{}", self.as_rdkafka_str())
570    }
571}
572
/// Acknowledgment level for Kafka producer.
///
/// Maps to the rdkafka `acks` property ("0", "1", or "all").
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Acks {
    /// No acknowledgment (fire-and-forget).
    None,
    /// Leader acknowledgment only.
    Leader,
    /// All in-sync replica acknowledgment.
    All,
}
583
584impl Acks {
585    /// Returns the rdkafka configuration string.
586    #[must_use]
587    pub fn as_rdkafka_str(&self) -> &'static str {
588        match self {
589            Self::None => "0",
590            Self::Leader => "1",
591            Self::All => "all",
592        }
593    }
594}
595
// `FromStr` only (project-local `str_enum!` macro); both the numeric Kafka
// spelling ("0"/"1"/"-1") and the word form are accepted.
str_enum!(fromstr Acks, lowercase_nodash, String, "unknown acks value",
    None => "0", "none";
    Leader => "1", "leader";
    All => "-1", "all"
);
601
602impl std::fmt::Display for Acks {
603    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604        write!(f, "{}", self.as_rdkafka_str())
605    }
606}
607
608#[cfg(test)]
609mod tests {
610    use super::*;
611
612    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
613        let mut config = ConnectorConfig::new("kafka");
614        for (k, v) in pairs {
615            config.set(*k, *v);
616        }
617        config
618    }
619
620    fn required_pairs() -> Vec<(&'static str, &'static str)> {
621        vec![
622            ("bootstrap.servers", "localhost:9092"),
623            ("topic", "output-events"),
624        ]
625    }
626
    // Parsing with only the two required keys yields library defaults for
    // everything else (JSON, at-least-once, plaintext).
    #[test]
    fn test_parse_required_fields() {
        let config = make_config(&required_pairs());
        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.bootstrap_servers, "localhost:9092");
        assert_eq!(cfg.topic, "output-events");
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
        assert_eq!(cfg.format, Format::Json);
        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
    }

    // bootstrap.servers is mandatory: parsing without it must fail.
    #[test]
    fn test_missing_bootstrap_servers() {
        let config = make_config(&[("topic", "t")]);
        assert!(KafkaSinkConfig::from_config(&config).is_err());
    }

    // topic is mandatory: parsing without it must fail.
    #[test]
    fn test_missing_topic() {
        let config = make_config(&[("bootstrap.servers", "b:9092")]);
        assert!(KafkaSinkConfig::from_config(&config).is_err());
    }

    // delivery.guarantee accepts the dashed spelling.
    #[test]
    fn test_parse_delivery_guarantee() {
        let mut pairs = required_pairs();
        pairs.push(("delivery.guarantee", "exactly-once"));
        let config = make_config(&pairs);
        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
    }

    // A full SASL_SSL setup round-trips protocol, mechanism, credentials,
    // and the CA path.
    #[test]
    fn test_parse_security_sasl_ssl() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("security.protocol", "sasl_ssl"),
            ("sasl.mechanism", "SCRAM-SHA-512"),
            ("sasl.username", "producer"),
            ("sasl.password", "secret123"),
            ("ssl.ca.location", "/etc/ssl/ca.pem"),
        ]);
        let config = make_config(&pairs);
        let cfg = KafkaSinkConfig::from_config(&config).unwrap();

        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha512));
        assert_eq!(cfg.sasl_username, Some("producer".to_string()));
        assert_eq!(cfg.sasl_password, Some("secret123".to_string()));
        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
    }
678
    // SSL without SASL: client cert, key, and key password are all parsed.
    #[test]
    fn test_parse_security_ssl_only() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("security.protocol", "ssl"),
            ("ssl.ca.location", "/etc/ssl/ca.pem"),
            ("ssl.certificate.location", "/etc/ssl/client.pem"),
            ("ssl.key.location", "/etc/ssl/client.key"),
            ("ssl.key.password", "keypass"),
        ]);
        let config = make_config(&pairs);
        let cfg = KafkaSinkConfig::from_config(&config).unwrap();

        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
        assert_eq!(
            cfg.ssl_certificate_location,
            Some("/etc/ssl/client.pem".to_string())
        );
        assert_eq!(
            cfg.ssl_key_location,
            Some("/etc/ssl/client.key".to_string())
        );
        assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
    }

    // Exercises every optional WITH-clause key in one parse and checks each
    // lands on the corresponding config field.
    #[test]
    fn test_parse_all_optional_fields() {
        let mut pairs = required_pairs();
        pairs.extend_from_slice(&[
            ("format", "avro"),
            ("delivery.guarantee", "exactly-once"),
            ("transactional.id", "my-txn"),
            ("transaction.timeout.ms", "30000"),
            ("key.column", "order_id"),
            ("partitioner", "round-robin"),
            ("linger.ms", "10"),
            ("batch.size", "32768"),
            ("batch.num.messages", "5000"),
            ("compression.type", "zstd"),
            ("acks", "1"),
            ("max.in.flight.requests", "3"),
            ("delivery.timeout.ms", "60000"),
            ("dlq.topic", "my-dlq"),
            ("flush.batch.size", "500"),
            ("schema.registry.url", "http://sr:8081"),
            ("schema.registry.username", "user"),
            ("schema.registry.password", "pass"),
            ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
        ]);
        let config = make_config(&pairs);
        let cfg = KafkaSinkConfig::from_config(&config).unwrap();

        assert_eq!(cfg.format, Format::Avro);
        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
        assert_eq!(cfg.transactional_id.as_deref(), Some("my-txn"));
        assert_eq!(cfg.transaction_timeout, Duration::from_millis(30_000));
        assert_eq!(cfg.key_column.as_deref(), Some("order_id"));
        assert_eq!(cfg.partitioner, PartitionStrategy::RoundRobin);
        assert_eq!(cfg.linger_ms, 10);
        assert_eq!(cfg.batch_size, 32_768);
        assert_eq!(cfg.batch_num_messages, Some(5000));
        assert_eq!(cfg.compression, CompressionType::Zstd);
        assert_eq!(cfg.acks, Acks::Leader);
        assert_eq!(cfg.max_in_flight, 3);
        assert_eq!(cfg.delivery_timeout, Duration::from_millis(60_000));
        assert_eq!(cfg.dlq_topic.as_deref(), Some("my-dlq"));
        assert_eq!(cfg.flush_batch_size, 500);
        assert_eq!(cfg.schema_registry_url.as_deref(), Some("http://sr:8081"));
        assert!(cfg.schema_registry_auth.is_some());
        assert_eq!(
            cfg.schema_registry_ssl_ca_location,
            Some("/etc/ssl/sr-ca.pem".to_string())
        );
    }
754
    // Avro without schema.registry.url is rejected by validate().
    #[test]
    fn test_validate_avro_requires_sr() {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "b:9092".into();
        cfg.topic = "t".into();
        cfg.format = Format::Avro;
        assert!(cfg.validate().is_err());
    }

    // Exactly-once caps max.in.flight.requests at 5.
    #[test]
    fn test_validate_exactly_once_max_in_flight() {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "b:9092".into();
        cfg.topic = "t".into();
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        cfg.max_in_flight = 10;
        assert!(cfg.validate().is_err());
    }

    // SASL security protocols require an explicit sasl.mechanism.
    #[test]
    fn test_validate_sasl_without_mechanism() {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "b:9092".into();
        cfg.topic = "t".into();
        cfg.security_protocol = SecurityProtocol::SaslSsl;
        // sasl_mechanism not set
        assert!(cfg.validate().is_err());
    }

    // Credential-based mechanisms need both a username and a password.
    #[test]
    fn test_validate_sasl_plain_without_credentials() {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "b:9092".into();
        cfg.topic = "t".into();
        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
        cfg.sasl_mechanism = Some(SaslMechanism::ScramSha256);
        // username/password not set
        assert!(cfg.validate().is_err());
    }
794
    // At-least-once: idempotent, non-transactional producer.
    #[test]
    fn test_rdkafka_config_at_least_once() {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "b:9092".into();
        cfg.topic = "t".into();
        let rdk = cfg.to_rdkafka_config();
        assert_eq!(rdk.get("enable.idempotence"), Some("true"));
        assert!(rdk.get("transactional.id").is_none());
        assert_eq!(rdk.get("security.protocol"), Some("plaintext"));
    }

    // Exactly-once: idempotent AND transactional, with an auto-generated
    // transactional.id when none is configured.
    #[test]
    fn test_rdkafka_config_exactly_once() {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "b:9092".into();
        cfg.topic = "t".into();
        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
        let rdk = cfg.to_rdkafka_config();
        assert_eq!(rdk.get("enable.idempotence"), Some("true"));
        assert!(rdk.get("transactional.id").is_some());
    }

    // SASL/SSL settings propagate into the rdkafka client config.
    #[test]
    fn test_rdkafka_config_with_security() {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "b:9092".into();
        cfg.topic = "t".into();
        cfg.security_protocol = SecurityProtocol::SaslSsl;
        cfg.sasl_mechanism = Some(SaslMechanism::Plain);
        cfg.sasl_username = Some("user".into());
        cfg.sasl_password = Some("pass".into());
        cfg.ssl_ca_location = Some("/ca.pem".into());

        let rdk = cfg.to_rdkafka_config();
        assert_eq!(rdk.get("security.protocol"), Some("sasl_ssl"));
        assert_eq!(rdk.get("sasl.mechanism"), Some("PLAIN"));
        assert_eq!(rdk.get("sasl.username"), Some("user"));
        assert_eq!(rdk.get("sasl.password"), Some("pass"));
        assert_eq!(rdk.get("ssl.ca.location"), Some("/ca.pem"));
    }

    // batch.num.messages is forwarded only when explicitly set.
    #[test]
    fn test_rdkafka_config_with_batch_num_messages() {
        let mut cfg = KafkaSinkConfig::default();
        cfg.bootstrap_servers = "b:9092".into();
        cfg.topic = "t".into();
        cfg.batch_num_messages = Some(10_000);

        let rdk = cfg.to_rdkafka_config();
        assert_eq!(rdk.get("batch.num.messages"), Some("10000"));
    }

    // kafka.-prefixed options are stored with the prefix stripped.
    #[test]
    fn test_kafka_passthrough_properties() {
        let mut pairs = required_pairs();
        pairs.push(("kafka.socket.timeout.ms", "5000"));
        pairs.push(("kafka.queue.buffering.max.messages", "100000"));
        let config = make_config(&pairs);
        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
        assert_eq!(
            cfg.kafka_properties.get("socket.timeout.ms").unwrap(),
            "5000"
        );
    }
859
860    #[test]
861    fn test_defaults() {
862        let cfg = KafkaSinkConfig::default();
863        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
864        assert_eq!(cfg.partitioner, PartitionStrategy::KeyHash);
865        assert_eq!(cfg.compression, CompressionType::None);
866        assert_eq!(cfg.acks, Acks::All);
867        assert_eq!(cfg.linger_ms, 5);
868        assert_eq!(cfg.batch_size, 16_384);
869        assert_eq!(cfg.max_in_flight, 5);
870        assert_eq!(cfg.flush_batch_size, 1_000);
871        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
872        assert!(cfg.sasl_mechanism.is_none());
873        assert!(cfg.batch_num_messages.is_none());
874    }
875
876    #[test]
877    fn test_enum_display() {
878        assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
879        assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
880        assert_eq!(PartitionStrategy::KeyHash.to_string(), "key-hash");
881        assert_eq!(PartitionStrategy::RoundRobin.to_string(), "round-robin");
882        assert_eq!(PartitionStrategy::Sticky.to_string(), "sticky");
883        assert_eq!(CompressionType::Zstd.to_string(), "zstd");
884        assert_eq!(Acks::All.to_string(), "all");
885    }
886
887    #[test]
888    fn test_enum_parse() {
889        assert_eq!(
890            "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
891            DeliveryGuarantee::AtLeastOnce
892        );
893        assert_eq!(
894            "exactly-once".parse::<DeliveryGuarantee>().unwrap(),
895            DeliveryGuarantee::ExactlyOnce
896        );
897        assert_eq!(
898            "key-hash".parse::<PartitionStrategy>().unwrap(),
899            PartitionStrategy::KeyHash
900        );
901        assert_eq!(
902            "round-robin".parse::<PartitionStrategy>().unwrap(),
903            PartitionStrategy::RoundRobin
904        );
905        assert_eq!(
906            "sticky".parse::<PartitionStrategy>().unwrap(),
907            PartitionStrategy::Sticky
908        );
909        assert_eq!(
910            "gzip".parse::<CompressionType>().unwrap(),
911            CompressionType::Gzip
912        );
913        assert_eq!(
914            "snappy".parse::<CompressionType>().unwrap(),
915            CompressionType::Snappy
916        );
917        assert_eq!(
918            "lz4".parse::<CompressionType>().unwrap(),
919            CompressionType::Lz4
920        );
921        assert_eq!(
922            "zstd".parse::<CompressionType>().unwrap(),
923            CompressionType::Zstd
924        );
925        assert_eq!("all".parse::<Acks>().unwrap(), Acks::All);
926        assert_eq!("1".parse::<Acks>().unwrap(), Acks::Leader);
927        assert_eq!("0".parse::<Acks>().unwrap(), Acks::None);
928    }
929}