Skip to main content

laminar_connectors/kafka/
sink_config.rs

1//! Kafka sink connector configuration.
2//!
3//! [`KafkaSinkConfig`] encapsulates all tuning knobs for the Kafka producer,
4//! parsed from a SQL `WITH (...)` clause via [`ConnectorConfig`].
5
6use std::collections::HashMap;
7use std::time::Duration;
8
9use rdkafka::ClientConfig;
10
11use crate::config::ConnectorConfig;
12use crate::error::ConnectorError;
13use crate::kafka::config::{CompatibilityLevel, SaslMechanism, SecurityProtocol, SrAuth};
14use crate::serde::Format;
15
16/// Configuration for the Kafka Sink Connector.
17///
18/// Parsed from SQL `WITH (...)` clause options.
19///
20/// Uses a custom `Debug` impl that redacts `sasl_password` and
21/// `ssl_key_password` to prevent credential leakage in logs.
22#[derive(Clone)]
23pub struct KafkaSinkConfig {
24    /// Kafka broker addresses (comma-separated).
25    pub bootstrap_servers: String,
26    /// Target Kafka topic name.
27    pub topic: String,
28    /// Security protocol for broker connections.
29    pub security_protocol: SecurityProtocol,
30    /// SASL authentication mechanism.
31    pub sasl_mechanism: Option<SaslMechanism>,
32    /// SASL username (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
33    pub sasl_username: Option<String>,
34    /// SASL password (for PLAIN, SCRAM-SHA-256, SCRAM-SHA-512).
35    pub sasl_password: Option<String>,
36    /// Path to SSL CA certificate file (PEM format).
37    pub ssl_ca_location: Option<String>,
38    /// Path to client SSL certificate file (PEM format).
39    pub ssl_certificate_location: Option<String>,
40    /// Path to client SSL private key file (PEM format).
41    pub ssl_key_location: Option<String>,
42    /// Password for encrypted SSL private key.
43    pub ssl_key_password: Option<String>,
44    /// Serialization format.
45    pub format: Format,
46    /// Schema Registry URL for Avro/Protobuf.
47    pub schema_registry_url: Option<String>,
48    /// Schema Registry authentication.
49    pub schema_registry_auth: Option<SrAuth>,
50    /// Schema compatibility level override.
51    pub schema_compatibility: Option<CompatibilityLevel>,
52    /// Schema Registry SSL CA certificate path.
53    pub schema_registry_ssl_ca_location: Option<String>,
54    /// Delivery guarantee level.
55    pub delivery_guarantee: DeliveryGuarantee,
56    /// Transactional ID prefix for exactly-once.
57    ///
58    // TODO(distributed): embed the lease epoch here so a new lease holder
59    // fences the previous one via Kafka's producer epoch.
60    pub transactional_id: Option<String>,
61    /// Transaction timeout.
62    pub transaction_timeout: Duration,
63    /// Acknowledgment level.
64    pub acks: Acks,
65    /// Maximum number of in-flight requests per connection.
66    pub max_in_flight: usize,
67    /// Maximum time to wait for delivery confirmation.
68    pub delivery_timeout: Duration,
69    /// Key column name for partitioning.
70    pub key_column: Option<String>,
71    /// Partitioning strategy.
72    pub partitioner: PartitionStrategy,
73    /// Maximum time to wait before sending a batch (milliseconds).
74    pub linger_ms: u64,
75    /// Maximum batch size in bytes.
76    pub batch_size: usize,
77    /// Maximum number of messages per batch.
78    pub batch_num_messages: Option<usize>,
79    /// Compression algorithm.
80    pub compression: CompressionType,
81    /// Dead letter queue topic for failed records.
82    pub dlq_topic: Option<String>,
83    /// Maximum records to buffer before flushing.
84    pub flush_batch_size: usize,
85    /// Additional rdkafka client properties (pass-through).
86    pub kafka_properties: HashMap<String, String>,
87}
88
89impl std::fmt::Debug for KafkaSinkConfig {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct("KafkaSinkConfig")
92            .field("bootstrap_servers", &self.bootstrap_servers)
93            .field("topic", &self.topic)
94            .field("format", &self.format)
95            .field("delivery_guarantee", &self.delivery_guarantee)
96            .field("security_protocol", &self.security_protocol)
97            .field("sasl_mechanism", &self.sasl_mechanism)
98            .field("sasl_password", &self.sasl_password.as_ref().map(|_| "***"))
99            .field(
100                "ssl_key_password",
101                &self.ssl_key_password.as_ref().map(|_| "***"),
102            )
103            .field("partitioner", &self.partitioner)
104            .field("acks", &self.acks)
105            .finish_non_exhaustive()
106    }
107}
108
109impl Default for KafkaSinkConfig {
110    fn default() -> Self {
111        Self {
112            bootstrap_servers: String::new(),
113            topic: String::new(),
114            security_protocol: SecurityProtocol::default(),
115            sasl_mechanism: None,
116            sasl_username: None,
117            sasl_password: None,
118            ssl_ca_location: None,
119            ssl_certificate_location: None,
120            ssl_key_location: None,
121            ssl_key_password: None,
122            format: Format::Json,
123            schema_registry_url: None,
124            schema_registry_auth: None,
125            schema_compatibility: None,
126            schema_registry_ssl_ca_location: None,
127            delivery_guarantee: DeliveryGuarantee::AtLeastOnce,
128            transactional_id: None,
129            transaction_timeout: Duration::from_secs(60),
130            acks: Acks::All,
131            max_in_flight: 5,
132            delivery_timeout: Duration::from_secs(120),
133            key_column: None,
134            partitioner: PartitionStrategy::KeyHash,
135            linger_ms: 5,
136            batch_size: 16_384,
137            batch_num_messages: None,
138            compression: CompressionType::None,
139            dlq_topic: None,
140            flush_batch_size: 1_000,
141            kafka_properties: HashMap::new(),
142        }
143    }
144}
145
146impl KafkaSinkConfig {
147    /// Parses a sink config from a [`ConnectorConfig`] (SQL WITH clause).
148    ///
149    /// # Errors
150    ///
151    /// Returns `ConnectorError::MissingConfig` if required keys are absent,
152    /// or `ConnectorError::ConfigurationError` on invalid values.
153    #[allow(clippy::too_many_lines, clippy::field_reassign_with_default)]
154    pub fn from_config(config: &ConnectorConfig) -> Result<Self, ConnectorError> {
155        let mut cfg = Self::default();
156
157        cfg.bootstrap_servers = config
158            .get("bootstrap.servers")
159            .ok_or_else(|| ConnectorError::missing_config("bootstrap.servers"))?
160            .to_string();
161
162        cfg.topic = config
163            .get("topic")
164            .ok_or_else(|| ConnectorError::missing_config("topic"))?
165            .to_string();
166
167        if let Some(s) = config.get("security.protocol") {
168            cfg.security_protocol = s.parse()?;
169        }
170
171        if let Some(s) = config.get("sasl.mechanism") {
172            cfg.sasl_mechanism = Some(s.parse()?);
173        }
174
175        cfg.sasl_username = config.get("sasl.username").map(String::from);
176        cfg.sasl_password = config.get("sasl.password").map(String::from);
177        cfg.ssl_ca_location = config.get("ssl.ca.location").map(String::from);
178        cfg.ssl_certificate_location = config.get("ssl.certificate.location").map(String::from);
179        cfg.ssl_key_location = config.get("ssl.key.location").map(String::from);
180        cfg.ssl_key_password = config.get("ssl.key.password").map(String::from);
181
182        if let Some(fmt) = config.get("format") {
183            cfg.format = fmt.parse().map_err(ConnectorError::Serde)?;
184        }
185
186        cfg.schema_registry_url = config.get("schema.registry.url").map(String::from);
187
188        let sr_user = config.get("schema.registry.username");
189        let sr_pass = config.get("schema.registry.password");
190        if let (Some(user), Some(pass)) = (sr_user, sr_pass) {
191            cfg.schema_registry_auth = Some(SrAuth {
192                username: user.to_string(),
193                password: pass.to_string(),
194            });
195        }
196
197        if let Some(c) = config.get("schema.compatibility") {
198            cfg.schema_compatibility = Some(c.parse().map_err(|_| {
199                ConnectorError::ConfigurationError(format!("invalid schema.compatibility: '{c}'"))
200            })?);
201        }
202
203        cfg.schema_registry_ssl_ca_location = config
204            .get("schema.registry.ssl.ca.location")
205            .map(String::from);
206
207        if let Some(dg) = config.get("delivery.guarantee") {
208            cfg.delivery_guarantee = dg.parse().map_err(|_| {
209                ConnectorError::ConfigurationError(format!(
210                    "invalid delivery.guarantee: '{dg}' (expected 'at-least-once' or 'exactly-once')"
211                ))
212            })?;
213        }
214
215        cfg.transactional_id = config.get("transactional.id").map(String::from);
216
217        if let Some(v) = config.get("transaction.timeout.ms") {
218            let ms: u64 = v.parse().map_err(|_| {
219                ConnectorError::ConfigurationError(format!("invalid transaction.timeout.ms: '{v}'"))
220            })?;
221            cfg.transaction_timeout = Duration::from_millis(ms);
222        }
223
224        if let Some(a) = config.get("acks") {
225            cfg.acks = a.parse().map_err(|_| {
226                ConnectorError::ConfigurationError(format!(
227                    "invalid acks: '{a}' (expected 'all', '1', or '0')"
228                ))
229            })?;
230        }
231
232        if let Some(v) = config.get("max.in.flight.requests") {
233            cfg.max_in_flight = v.parse().map_err(|_| {
234                ConnectorError::ConfigurationError(format!("invalid max.in.flight.requests: '{v}'"))
235            })?;
236        }
237
238        if let Some(v) = config.get("delivery.timeout.ms") {
239            let ms: u64 = v.parse().map_err(|_| {
240                ConnectorError::ConfigurationError(format!("invalid delivery.timeout.ms: '{v}'"))
241            })?;
242            cfg.delivery_timeout = Duration::from_millis(ms);
243        }
244
245        cfg.key_column = config.get("key.column").map(String::from);
246
247        if let Some(p) = config.get("partitioner") {
248            cfg.partitioner = p.parse().map_err(|_| {
249                ConnectorError::ConfigurationError(format!(
250                    "invalid partitioner: '{p}' (expected 'key-hash', 'round-robin', or 'sticky')"
251                ))
252            })?;
253        }
254
255        if let Some(v) = config.get("linger.ms") {
256            cfg.linger_ms = v.parse().map_err(|_| {
257                ConnectorError::ConfigurationError(format!("invalid linger.ms: '{v}'"))
258            })?;
259        }
260
261        if let Some(v) = config.get("batch.size") {
262            cfg.batch_size = v.parse().map_err(|_| {
263                ConnectorError::ConfigurationError(format!("invalid batch.size: '{v}'"))
264            })?;
265        }
266
267        if let Some(v) = config.get("batch.num.messages") {
268            cfg.batch_num_messages = Some(v.parse().map_err(|_| {
269                ConnectorError::ConfigurationError(format!("invalid batch.num.messages: '{v}'"))
270            })?);
271        }
272
273        if let Some(c) = config.get("compression.type") {
274            cfg.compression = c.parse().map_err(|_| {
275                ConnectorError::ConfigurationError(format!("invalid compression.type: '{c}'"))
276            })?;
277        }
278
279        cfg.dlq_topic = config.get("dlq.topic").map(String::from);
280
281        if let Some(v) = config.get("flush.batch.size") {
282            cfg.flush_batch_size = v.parse().map_err(|_| {
283                ConnectorError::ConfigurationError(format!("invalid flush.batch.size: '{v}'"))
284            })?;
285        }
286
287        for (key, value) in config.properties_with_prefix("kafka.") {
288            cfg.kafka_properties.insert(key, value);
289        }
290
291        cfg.validate()?;
292        Ok(cfg)
293    }
294
295    /// Validates the configuration.
296    ///
297    /// # Errors
298    ///
299    /// Returns `ConnectorError::ConfigurationError` on invalid combinations.
300    pub fn validate(&self) -> Result<(), ConnectorError> {
301        if self.bootstrap_servers.is_empty() {
302            return Err(ConnectorError::missing_config("bootstrap.servers"));
303        }
304        if self.topic.is_empty() {
305            return Err(ConnectorError::missing_config("topic"));
306        }
307
308        if self.security_protocol.uses_sasl() && self.sasl_mechanism.is_none() {
309            return Err(ConnectorError::ConfigurationError(
310                "sasl.mechanism is required when security.protocol is sasl_plaintext or sasl_ssl"
311                    .into(),
312            ));
313        }
314
315        if let Some(mechanism) = &self.sasl_mechanism {
316            if mechanism.requires_credentials()
317                && (self.sasl_username.is_none() || self.sasl_password.is_none())
318            {
319                return Err(ConnectorError::ConfigurationError(format!(
320                    "sasl.username and sasl.password are required for {mechanism} mechanism"
321                )));
322            }
323        }
324
325        if self.security_protocol.uses_ssl() {
326            if let Some(ref ca) = self.ssl_ca_location {
327                if ca.is_empty() {
328                    return Err(ConnectorError::ConfigurationError(
329                        "ssl.ca.location cannot be empty when specified".into(),
330                    ));
331                }
332            }
333        }
334
335        if self.format == Format::Debezium {
336            return Err(ConnectorError::ConfigurationError(
337                "Debezium is a deserialization-only format and cannot be used for sinks".into(),
338            ));
339        }
340
341        if self.format == Format::Avro && self.schema_registry_url.is_none() {
342            return Err(ConnectorError::ConfigurationError(
343                "Avro format requires 'schema.registry.url'".into(),
344            ));
345        }
346
347        if self.max_in_flight == 0 {
348            return Err(ConnectorError::ConfigurationError(
349                "max.in.flight.requests must be > 0".into(),
350            ));
351        }
352
353        if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce && self.max_in_flight > 5 {
354            return Err(ConnectorError::ConfigurationError(
355                "exactly-once requires max.in.flight.requests <= 5".into(),
356            ));
357        }
358
359        Ok(())
360    }
361
362    /// Builds an rdkafka [`ClientConfig`] from this configuration.
363    ///
364    /// Always sets `enable.idempotence=true`. For exactly-once delivery,
365    /// also sets `transactional.id` and `transaction.timeout.ms`.
366    #[must_use]
367    pub fn to_rdkafka_config(&self) -> ClientConfig {
368        let mut config = ClientConfig::new();
369
370        config.set("bootstrap.servers", &self.bootstrap_servers);
371        config.set("security.protocol", self.security_protocol.as_rdkafka_str());
372
373        if let Some(ref mechanism) = self.sasl_mechanism {
374            config.set("sasl.mechanism", mechanism.as_rdkafka_str());
375        }
376
377        if let Some(ref username) = self.sasl_username {
378            config.set("sasl.username", username);
379        }
380
381        if let Some(ref password) = self.sasl_password {
382            config.set("sasl.password", password);
383        }
384
385        if let Some(ref ca) = self.ssl_ca_location {
386            config.set("ssl.ca.location", ca);
387        }
388
389        if let Some(ref cert) = self.ssl_certificate_location {
390            config.set("ssl.certificate.location", cert);
391        }
392
393        if let Some(ref key) = self.ssl_key_location {
394            config.set("ssl.key.location", key);
395        }
396
397        if let Some(ref key_pass) = self.ssl_key_password {
398            config.set("ssl.key.password", key_pass);
399        }
400
401        // librdkafka rejects a transactional producer whose
402        // message.timeout.ms exceeds transaction.timeout.ms (a message
403        // outliving its transaction could never commit) — and the
404        // defaults conflict (delivery 120s > transaction 60s), so an
405        // unclamped exactly-once config fails producer creation.
406        let message_timeout = if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
407            self.delivery_timeout.min(self.transaction_timeout)
408        } else {
409            self.delivery_timeout
410        };
411
412        config
413            .set("enable.idempotence", "true")
414            .set("acks", self.acks.as_rdkafka_str())
415            .set("linger.ms", self.linger_ms.to_string())
416            .set("batch.size", self.batch_size.to_string())
417            .set("compression.type", self.compression.as_rdkafka_str())
418            .set(
419                "max.in.flight.requests.per.connection",
420                self.max_in_flight.to_string(),
421            )
422            .set(
423                "message.timeout.ms",
424                message_timeout.as_millis().to_string(),
425            );
426
427        if let Some(num_msgs) = self.batch_num_messages {
428            config.set("batch.num.messages", num_msgs.to_string());
429        }
430
431        if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
432            let txn_id = self
433                .transactional_id
434                .clone()
435                .unwrap_or_else(|| format!("laminardb-sink-{}", self.topic));
436            config.set("transactional.id", txn_id);
437            config.set(
438                "transaction.timeout.ms",
439                self.transaction_timeout.as_millis().to_string(),
440            );
441        }
442
443        // Apply pass-through properties, blocking security-critical keys
444        // that could silently downgrade authentication or break semantics.
445        for (key, value) in &self.kafka_properties {
446            if is_blocked_passthrough_key(key) {
447                tracing::warn!(
448                    key,
449                    "ignoring kafka.* pass-through property that overrides a security setting"
450                );
451                continue;
452            }
453            config.set(key, value);
454        }
455
456        // Re-apply the transactional invariant AFTER pass-throughs: a
457        // user override of either timeout must not reintroduce
458        // message.timeout.ms > transaction.timeout.ms, which librdkafka
459        // rejects at producer creation.
460        if self.delivery_guarantee == DeliveryGuarantee::ExactlyOnce {
461            let get_ms = |cfg: &ClientConfig, key: &str| -> Option<u64> {
462                cfg.get(key).and_then(|v| v.parse().ok())
463            };
464            if let (Some(msg), Some(txn)) = (
465                get_ms(&config, "message.timeout.ms"),
466                get_ms(&config, "transaction.timeout.ms"),
467            ) {
468                if msg > txn {
469                    tracing::warn!(
470                        msg,
471                        txn,
472                        "clamping message.timeout.ms to transaction.timeout.ms \
473                         (librdkafka rejects transactional producers otherwise)"
474                    );
475                    config.set("message.timeout.ms", txn.to_string());
476                }
477            }
478        }
479
480        config
481    }
482
483    /// Builds an rdkafka [`ClientConfig`] for the dead letter queue producer.
484    ///
485    /// Inherits security settings (SASL, SSL) from the main config but is
486    /// non-transactional. Does not set `transactional.id`.
487    #[must_use]
488    pub fn to_dlq_rdkafka_config(&self) -> ClientConfig {
489        let mut config = ClientConfig::new();
490
491        config.set("bootstrap.servers", &self.bootstrap_servers);
492        config.set("security.protocol", self.security_protocol.as_rdkafka_str());
493
494        if let Some(ref mechanism) = self.sasl_mechanism {
495            config.set("sasl.mechanism", mechanism.as_rdkafka_str());
496        }
497        if let Some(ref username) = self.sasl_username {
498            config.set("sasl.username", username);
499        }
500        if let Some(ref password) = self.sasl_password {
501            config.set("sasl.password", password);
502        }
503        if let Some(ref ca) = self.ssl_ca_location {
504            config.set("ssl.ca.location", ca);
505        }
506        if let Some(ref cert) = self.ssl_certificate_location {
507            config.set("ssl.certificate.location", cert);
508        }
509        if let Some(ref key) = self.ssl_key_location {
510            config.set("ssl.key.location", key);
511        }
512        if let Some(ref key_pass) = self.ssl_key_password {
513            config.set("ssl.key.password", key_pass);
514        }
515
516        config.set("enable.idempotence", "true");
517
518        config
519    }
520}
521
522/// Returns `true` if a pass-through kafka.* key must not override explicit settings.
523fn is_blocked_passthrough_key(key: &str) -> bool {
524    key.starts_with("sasl.kerberos.")
525        || matches!(
526            key,
527            "security.protocol"
528                | "sasl.mechanism"
529                | "sasl.username"
530                | "sasl.password"
531                | "sasl.oauthbearer.config"
532                | "ssl.ca.location"
533                | "ssl.certificate.location"
534                | "ssl.key.location"
535                | "ssl.key.password"
536                | "ssl.endpoint.identification.algorithm"
537                | "enable.auto.commit"
538                | "enable.idempotence"
539                | "transactional.id"
540        )
541}
542
543pub use crate::connector::DeliveryGuarantee;
544
545/// Partitioning strategy for distributing records across Kafka partitions.
546#[derive(Debug, Clone, Copy, PartialEq, Eq)]
547pub enum PartitionStrategy {
548    /// Hash the key column (Murmur2, Kafka-compatible).
549    KeyHash,
550    /// Round-robin across all partitions.
551    RoundRobin,
552    /// Sticky: batch records to the same partition until full.
553    Sticky,
554}
555
556str_enum!(PartitionStrategy, lowercase_udash, String, "unknown partition strategy",
557    KeyHash => "key-hash", "keyhash", "hash";
558    RoundRobin => "round-robin", "roundrobin";
559    Sticky => "sticky"
560);
561
562/// Compression type for produced Kafka messages.
563#[derive(Debug, Clone, Copy, PartialEq, Eq)]
564pub enum CompressionType {
565    /// No compression.
566    None,
567    /// Gzip compression.
568    Gzip,
569    /// Snappy compression.
570    Snappy,
571    /// LZ4 compression.
572    Lz4,
573    /// Zstandard compression.
574    Zstd,
575}
576
577impl CompressionType {
578    /// Returns the rdkafka configuration string.
579    #[must_use]
580    pub fn as_rdkafka_str(&self) -> &'static str {
581        match self {
582            Self::None => "none",
583            Self::Gzip => "gzip",
584            Self::Snappy => "snappy",
585            Self::Lz4 => "lz4",
586            Self::Zstd => "zstd",
587        }
588    }
589}
590
591str_enum!(fromstr CompressionType, lowercase_nodash, String, "unknown compression type",
592    None => "none";
593    Gzip => "gzip";
594    Snappy => "snappy";
595    Lz4 => "lz4";
596    Zstd => "zstd", "zstandard"
597);
598
599impl std::fmt::Display for CompressionType {
600    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601        write!(f, "{}", self.as_rdkafka_str())
602    }
603}
604
605/// Acknowledgment level for Kafka producer.
606#[derive(Debug, Clone, Copy, PartialEq, Eq)]
607pub enum Acks {
608    /// No acknowledgment (fire-and-forget).
609    None,
610    /// Leader acknowledgment only.
611    Leader,
612    /// All in-sync replica acknowledgment.
613    All,
614}
615
616impl Acks {
617    /// Returns the rdkafka configuration string.
618    #[must_use]
619    pub fn as_rdkafka_str(&self) -> &'static str {
620        match self {
621            Self::None => "0",
622            Self::Leader => "1",
623            Self::All => "all",
624        }
625    }
626}
627
628str_enum!(fromstr Acks, lowercase_nodash, String, "unknown acks value",
629    None => "0", "none";
630    Leader => "1", "leader";
631    All => "-1", "all"
632);
633
634impl std::fmt::Display for Acks {
635    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
636        write!(f, "{}", self.as_rdkafka_str())
637    }
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643
644    fn make_config(pairs: &[(&str, &str)]) -> ConnectorConfig {
645        let mut config = ConnectorConfig::new("kafka");
646        for (k, v) in pairs {
647            config.set(*k, *v);
648        }
649        config
650    }
651
652    fn required_pairs() -> Vec<(&'static str, &'static str)> {
653        vec![
654            ("bootstrap.servers", "localhost:9092"),
655            ("topic", "output-events"),
656        ]
657    }
658
659    #[test]
660    fn test_parse_required_fields() {
661        let config = make_config(&required_pairs());
662        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
663        assert_eq!(cfg.bootstrap_servers, "localhost:9092");
664        assert_eq!(cfg.topic, "output-events");
665        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
666        assert_eq!(cfg.format, Format::Json);
667        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
668    }
669
670    #[test]
671    fn test_missing_bootstrap_servers() {
672        let config = make_config(&[("topic", "t")]);
673        assert!(KafkaSinkConfig::from_config(&config).is_err());
674    }
675
676    #[test]
677    fn test_missing_topic() {
678        let config = make_config(&[("bootstrap.servers", "b:9092")]);
679        assert!(KafkaSinkConfig::from_config(&config).is_err());
680    }
681
682    #[test]
683    fn test_parse_delivery_guarantee() {
684        let mut pairs = required_pairs();
685        pairs.push(("delivery.guarantee", "exactly-once"));
686        let config = make_config(&pairs);
687        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
688        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
689    }
690
691    #[test]
692    fn test_parse_security_sasl_ssl() {
693        let mut pairs = required_pairs();
694        pairs.extend_from_slice(&[
695            ("security.protocol", "sasl_ssl"),
696            ("sasl.mechanism", "SCRAM-SHA-512"),
697            ("sasl.username", "producer"),
698            ("sasl.password", "secret123"),
699            ("ssl.ca.location", "/etc/ssl/ca.pem"),
700        ]);
701        let config = make_config(&pairs);
702        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
703
704        assert_eq!(cfg.security_protocol, SecurityProtocol::SaslSsl);
705        assert_eq!(cfg.sasl_mechanism, Some(SaslMechanism::ScramSha512));
706        assert_eq!(cfg.sasl_username, Some("producer".to_string()));
707        assert_eq!(cfg.sasl_password, Some("secret123".to_string()));
708        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
709    }
710
711    #[test]
712    fn test_parse_security_ssl_only() {
713        let mut pairs = required_pairs();
714        pairs.extend_from_slice(&[
715            ("security.protocol", "ssl"),
716            ("ssl.ca.location", "/etc/ssl/ca.pem"),
717            ("ssl.certificate.location", "/etc/ssl/client.pem"),
718            ("ssl.key.location", "/etc/ssl/client.key"),
719            ("ssl.key.password", "keypass"),
720        ]);
721        let config = make_config(&pairs);
722        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
723
724        assert_eq!(cfg.security_protocol, SecurityProtocol::Ssl);
725        assert_eq!(cfg.ssl_ca_location, Some("/etc/ssl/ca.pem".to_string()));
726        assert_eq!(
727            cfg.ssl_certificate_location,
728            Some("/etc/ssl/client.pem".to_string())
729        );
730        assert_eq!(
731            cfg.ssl_key_location,
732            Some("/etc/ssl/client.key".to_string())
733        );
734        assert_eq!(cfg.ssl_key_password, Some("keypass".to_string()));
735    }
736
737    #[test]
738    fn test_parse_all_optional_fields() {
739        let mut pairs = required_pairs();
740        pairs.extend_from_slice(&[
741            ("format", "avro"),
742            ("delivery.guarantee", "exactly-once"),
743            ("transactional.id", "my-txn"),
744            ("transaction.timeout.ms", "30000"),
745            ("key.column", "order_id"),
746            ("partitioner", "round-robin"),
747            ("linger.ms", "10"),
748            ("batch.size", "32768"),
749            ("batch.num.messages", "5000"),
750            ("compression.type", "zstd"),
751            ("acks", "1"),
752            ("max.in.flight.requests", "3"),
753            ("delivery.timeout.ms", "60000"),
754            ("dlq.topic", "my-dlq"),
755            ("flush.batch.size", "500"),
756            ("schema.registry.url", "http://sr:8081"),
757            ("schema.registry.username", "user"),
758            ("schema.registry.password", "pass"),
759            ("schema.registry.ssl.ca.location", "/etc/ssl/sr-ca.pem"),
760        ]);
761        let config = make_config(&pairs);
762        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
763
764        assert_eq!(cfg.format, Format::Avro);
765        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::ExactlyOnce);
766        assert_eq!(cfg.transactional_id.as_deref(), Some("my-txn"));
767        assert_eq!(cfg.transaction_timeout, Duration::from_secs(30));
768        assert_eq!(cfg.key_column.as_deref(), Some("order_id"));
769        assert_eq!(cfg.partitioner, PartitionStrategy::RoundRobin);
770        assert_eq!(cfg.linger_ms, 10);
771        assert_eq!(cfg.batch_size, 32_768);
772        assert_eq!(cfg.batch_num_messages, Some(5000));
773        assert_eq!(cfg.compression, CompressionType::Zstd);
774        assert_eq!(cfg.acks, Acks::Leader);
775        assert_eq!(cfg.max_in_flight, 3);
776        assert_eq!(cfg.delivery_timeout, Duration::from_secs(60));
777        assert_eq!(cfg.dlq_topic.as_deref(), Some("my-dlq"));
778        assert_eq!(cfg.flush_batch_size, 500);
779        assert_eq!(cfg.schema_registry_url.as_deref(), Some("http://sr:8081"));
780        assert!(cfg.schema_registry_auth.is_some());
781        assert_eq!(
782            cfg.schema_registry_ssl_ca_location,
783            Some("/etc/ssl/sr-ca.pem".to_string())
784        );
785    }
786
787    #[test]
788    fn test_validate_avro_requires_sr() {
789        let mut cfg = KafkaSinkConfig::default();
790        cfg.bootstrap_servers = "b:9092".into();
791        cfg.topic = "t".into();
792        cfg.format = Format::Avro;
793        assert!(cfg.validate().is_err());
794    }
795
796    #[test]
797    fn test_validate_exactly_once_max_in_flight() {
798        let mut cfg = KafkaSinkConfig::default();
799        cfg.bootstrap_servers = "b:9092".into();
800        cfg.topic = "t".into();
801        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
802        cfg.max_in_flight = 10;
803        assert!(cfg.validate().is_err());
804    }
805
806    #[test]
807    fn test_validate_sasl_without_mechanism() {
808        let mut cfg = KafkaSinkConfig::default();
809        cfg.bootstrap_servers = "b:9092".into();
810        cfg.topic = "t".into();
811        cfg.security_protocol = SecurityProtocol::SaslSsl;
812        // sasl_mechanism not set
813        assert!(cfg.validate().is_err());
814    }
815
816    #[test]
817    fn test_validate_sasl_plain_without_credentials() {
818        let mut cfg = KafkaSinkConfig::default();
819        cfg.bootstrap_servers = "b:9092".into();
820        cfg.topic = "t".into();
821        cfg.security_protocol = SecurityProtocol::SaslPlaintext;
822        cfg.sasl_mechanism = Some(SaslMechanism::ScramSha256);
823        // username/password not set
824        assert!(cfg.validate().is_err());
825    }
826
827    #[test]
828    fn test_rdkafka_config_at_least_once() {
829        let mut cfg = KafkaSinkConfig::default();
830        cfg.bootstrap_servers = "b:9092".into();
831        cfg.topic = "t".into();
832        let rdk = cfg.to_rdkafka_config();
833        assert_eq!(rdk.get("enable.idempotence"), Some("true"));
834        assert!(rdk.get("transactional.id").is_none());
835        assert_eq!(rdk.get("security.protocol"), Some("plaintext"));
836    }
837
838    #[test]
839    fn test_rdkafka_config_exactly_once() {
840        let mut cfg = KafkaSinkConfig::default();
841        cfg.bootstrap_servers = "b:9092".into();
842        cfg.topic = "t".into();
843        cfg.delivery_guarantee = DeliveryGuarantee::ExactlyOnce;
844        let rdk = cfg.to_rdkafka_config();
845        assert_eq!(rdk.get("enable.idempotence"), Some("true"));
846        assert!(rdk.get("transactional.id").is_some());
847        // librdkafka rejects message.timeout.ms > transaction.timeout.ms;
848        // the default 120s delivery timeout must clamp to the 60s
849        // transaction timeout or producer creation fails outright.
850        assert_eq!(rdk.get("message.timeout.ms"), Some("60000"));
851        assert_eq!(rdk.get("transaction.timeout.ms"), Some("60000"));
852    }
853
854    #[test]
855    fn test_rdkafka_config_with_security() {
856        let mut cfg = KafkaSinkConfig::default();
857        cfg.bootstrap_servers = "b:9092".into();
858        cfg.topic = "t".into();
859        cfg.security_protocol = SecurityProtocol::SaslSsl;
860        cfg.sasl_mechanism = Some(SaslMechanism::Plain);
861        cfg.sasl_username = Some("user".into());
862        cfg.sasl_password = Some("pass".into());
863        cfg.ssl_ca_location = Some("/ca.pem".into());
864
865        let rdk = cfg.to_rdkafka_config();
866        assert_eq!(rdk.get("security.protocol"), Some("sasl_ssl"));
867        assert_eq!(rdk.get("sasl.mechanism"), Some("PLAIN"));
868        assert_eq!(rdk.get("sasl.username"), Some("user"));
869        assert_eq!(rdk.get("sasl.password"), Some("pass"));
870        assert_eq!(rdk.get("ssl.ca.location"), Some("/ca.pem"));
871    }
872
873    #[test]
874    fn test_rdkafka_config_with_batch_num_messages() {
875        let mut cfg = KafkaSinkConfig::default();
876        cfg.bootstrap_servers = "b:9092".into();
877        cfg.topic = "t".into();
878        cfg.batch_num_messages = Some(10_000);
879
880        let rdk = cfg.to_rdkafka_config();
881        assert_eq!(rdk.get("batch.num.messages"), Some("10000"));
882    }
883
884    #[test]
885    fn test_kafka_passthrough_properties() {
886        let mut pairs = required_pairs();
887        pairs.push(("kafka.socket.timeout.ms", "5000"));
888        pairs.push(("kafka.queue.buffering.max.messages", "100000"));
889        let config = make_config(&pairs);
890        let cfg = KafkaSinkConfig::from_config(&config).unwrap();
891        assert_eq!(
892            cfg.kafka_properties.get("socket.timeout.ms").unwrap(),
893            "5000"
894        );
895    }
896
897    #[test]
898    fn test_defaults() {
899        let cfg = KafkaSinkConfig::default();
900        assert_eq!(cfg.delivery_guarantee, DeliveryGuarantee::AtLeastOnce);
901        assert_eq!(cfg.partitioner, PartitionStrategy::KeyHash);
902        assert_eq!(cfg.compression, CompressionType::None);
903        assert_eq!(cfg.acks, Acks::All);
904        assert_eq!(cfg.linger_ms, 5);
905        assert_eq!(cfg.batch_size, 16_384);
906        assert_eq!(cfg.max_in_flight, 5);
907        assert_eq!(cfg.flush_batch_size, 1_000);
908        assert_eq!(cfg.security_protocol, SecurityProtocol::Plaintext);
909        assert!(cfg.sasl_mechanism.is_none());
910        assert!(cfg.batch_num_messages.is_none());
911    }
912
913    #[test]
914    fn test_enum_display() {
915        assert_eq!(DeliveryGuarantee::AtLeastOnce.to_string(), "at-least-once");
916        assert_eq!(DeliveryGuarantee::ExactlyOnce.to_string(), "exactly-once");
917        assert_eq!(PartitionStrategy::KeyHash.to_string(), "key-hash");
918        assert_eq!(PartitionStrategy::RoundRobin.to_string(), "round-robin");
919        assert_eq!(PartitionStrategy::Sticky.to_string(), "sticky");
920        assert_eq!(CompressionType::Zstd.to_string(), "zstd");
921        assert_eq!(Acks::All.to_string(), "all");
922    }
923
924    #[test]
925    fn test_enum_parse() {
926        assert_eq!(
927            "at-least-once".parse::<DeliveryGuarantee>().unwrap(),
928            DeliveryGuarantee::AtLeastOnce
929        );
930        assert_eq!(
931            "exactly-once".parse::<DeliveryGuarantee>().unwrap(),
932            DeliveryGuarantee::ExactlyOnce
933        );
934        assert_eq!(
935            "key-hash".parse::<PartitionStrategy>().unwrap(),
936            PartitionStrategy::KeyHash
937        );
938        assert_eq!(
939            "round-robin".parse::<PartitionStrategy>().unwrap(),
940            PartitionStrategy::RoundRobin
941        );
942        assert_eq!(
943            "sticky".parse::<PartitionStrategy>().unwrap(),
944            PartitionStrategy::Sticky
945        );
946        assert_eq!(
947            "gzip".parse::<CompressionType>().unwrap(),
948            CompressionType::Gzip
949        );
950        assert_eq!(
951            "snappy".parse::<CompressionType>().unwrap(),
952            CompressionType::Snappy
953        );
954        assert_eq!(
955            "lz4".parse::<CompressionType>().unwrap(),
956            CompressionType::Lz4
957        );
958        assert_eq!(
959            "zstd".parse::<CompressionType>().unwrap(),
960            CompressionType::Zstd
961        );
962        assert_eq!("all".parse::<Acks>().unwrap(), Acks::All);
963        assert_eq!("1".parse::<Acks>().unwrap(), Acks::Leader);
964        assert_eq!("0".parse::<Acks>().unwrap(), Acks::None);
965    }
966}