
laminar_connectors/kafka/mod.rs

//! Kafka source and sink connectors.
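//!
//! # Example
//!
//! A minimal registration sketch; the import paths assume this module is
//! reachable as `laminar_connectors::kafka` (adjust to the actual crate layout):
//!
//! ```ignore
//! use laminar_connectors::kafka::{register_kafka_sink, register_kafka_source};
//! use laminar_connectors::registry::ConnectorRegistry;
//!
//! let registry = ConnectorRegistry::new();
//! register_kafka_source(&registry);
//! register_kafka_sink(&registry);
//! ```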

// Source modules
pub mod avro;
pub mod config;
pub mod metrics;
pub mod offsets;
pub mod rebalance;
pub mod source;
pub mod watermarks;

// Sink modules
pub mod avro_serializer;
pub mod partitioner;
pub mod sink;
pub mod sink_config;
pub mod sink_metrics;

// Shared modules
pub mod schema_registry;

// Source re-exports
pub use avro::AvroDeserializer;
pub use config::{
    AssignmentStrategy, CompatibilityLevel, IsolationLevel, KafkaSourceConfig, OffsetReset,
    SaslMechanism, SchemaEvolutionStrategy, SecurityProtocol, SrAuth, StartupMode,
    TopicSubscription,
};
pub use metrics::KafkaSourceMetrics;
pub use offsets::OffsetTracker;
pub use source::KafkaSource;
pub use watermarks::{KafkaWatermarkTracker, WatermarkMetrics};

// Sink re-exports
pub use avro_serializer::AvroSerializer;
pub use partitioner::{
    KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
};
pub use sink::KafkaSink;
pub use sink_config::{
    Acks, CompressionType, DeliveryGuarantee, KafkaSinkConfig, PartitionStrategy,
};
pub use sink_metrics::KafkaSinkMetrics;

// Shared re-exports
pub use schema_registry::{CachedSchema, CompatibilityResult, SchemaRegistryClient, SchemaType};

use std::sync::Arc;

use crate::config::{ConfigKeySpec, ConnectorInfo};
use crate::registry::ConnectorRegistry;

/// Registers the Kafka source connector with the given registry.
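///
/// # Example
///
/// A sketch of creating a source through the registry's factory, mirroring
/// `test_factory_creates_source` below (import paths are assumed):
///
/// ```ignore
/// use laminar_connectors::config::ConnectorConfig;
/// use laminar_connectors::kafka::register_kafka_source;
/// use laminar_connectors::registry::ConnectorRegistry;
///
/// let registry = ConnectorRegistry::new();
/// register_kafka_source(&registry);
///
/// // `None`: no Prometheus registry is attached to the connector's metrics.
/// let config = ConnectorConfig::new("kafka");
/// let source = registry.create_source(&config, None).expect("kafka source is registered");
/// ```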
pub fn register_kafka_source(registry: &ConnectorRegistry) {
    let info = ConnectorInfo {
        name: "kafka".to_string(),
        display_name: "Apache Kafka Source".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        is_source: true,
        is_sink: false,
        config_keys: kafka_source_config_keys(),
    };

    registry.register_source(
        "kafka",
        info,
        Arc::new(|registry: Option<&prometheus::Registry>| {
            // Empty schema — filled in by discover_schema / open() / SQL DDL columns.
            let empty = Arc::new(arrow_schema::Schema::empty());
            Box::new(KafkaSource::new(
                empty,
                KafkaSourceConfig::default(),
                registry,
            ))
        }),
    );
}

/// Registers the Kafka sink connector with the given registry.
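///
/// # Example
///
/// As with the source, the sink is created through the registry's factory,
/// mirroring `test_factory_creates_sink` below (import paths are assumed):
///
/// ```ignore
/// use laminar_connectors::config::ConnectorConfig;
/// use laminar_connectors::kafka::register_kafka_sink;
/// use laminar_connectors::registry::ConnectorRegistry;
///
/// let registry = ConnectorRegistry::new();
/// register_kafka_sink(&registry);
///
/// let config = ConnectorConfig::new("kafka");
/// let sink = registry.create_sink(&config, None).expect("kafka sink is registered");
/// ```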
pub fn register_kafka_sink(registry: &ConnectorRegistry) {
    let info = ConnectorInfo {
        name: "kafka".to_string(),
        display_name: "Apache Kafka Sink".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        is_source: false,
        is_sink: true,
        config_keys: kafka_sink_config_keys(),
    };

    registry.register_sink(
        "kafka",
        info,
        Arc::new(|registry: Option<&prometheus::Registry>| {
            // Empty schema — the sink's schema is bound from the upstream query at build time.
            let empty = Arc::new(arrow_schema::Schema::empty());
            Box::new(KafkaSink::new(empty, KafkaSinkConfig::default(), registry))
        }),
    );
}

/// Returns the configuration key specifications for the Kafka source.
#[allow(clippy::too_many_lines)]
fn kafka_source_config_keys() -> Vec<ConfigKeySpec> {
    vec![
        // Required
        ConfigKeySpec::required("bootstrap.servers", "Kafka broker addresses"),
        ConfigKeySpec::required("group.id", "Consumer group identifier"),
        ConfigKeySpec::required("topic", "Comma-separated list of topics"),
        // Topic subscription (alternative to 'topic')
        ConfigKeySpec::optional("topic.pattern", "Regex pattern for topic subscription", ""),
        // Format
        ConfigKeySpec::optional("format", "Data format (json/csv/avro/raw/debezium)", "json"),
        // Security
        ConfigKeySpec::optional(
            "security.protocol",
            "Security protocol (plaintext/ssl/sasl_plaintext/sasl_ssl)",
            "plaintext",
        ),
        ConfigKeySpec::optional(
            "sasl.mechanism",
            "SASL mechanism (PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/GSSAPI/OAUTHBEARER)",
            "",
        ),
        ConfigKeySpec::optional("sasl.username", "SASL username for PLAIN/SCRAM", ""),
        ConfigKeySpec::optional("sasl.password", "SASL password for PLAIN/SCRAM", ""),
        ConfigKeySpec::optional("ssl.ca.location", "SSL CA certificate file path", ""),
        ConfigKeySpec::optional(
            "ssl.certificate.location",
            "Client SSL certificate file path",
            "",
        ),
        ConfigKeySpec::optional("ssl.key.location", "Client SSL private key file path", ""),
        ConfigKeySpec::optional("ssl.key.password", "Password for encrypted SSL key", ""),
        // Consumer tuning
        ConfigKeySpec::optional(
            "startup.mode",
            "Startup mode (group-offsets/earliest/latest)",
            "group-offsets",
        ),
        ConfigKeySpec::optional(
            "startup.specific.offsets",
            "Start from specific offsets (format: 'partition:offset,...')",
            "",
        ),
        ConfigKeySpec::optional(
            "startup.timestamp.ms",
            "Start from timestamp (milliseconds since epoch)",
            "",
        ),
        ConfigKeySpec::optional(
            "auto.offset.reset",
            "Fallback when no committed offset (earliest/latest/none)",
            "earliest",
        ),
        ConfigKeySpec::optional(
            "isolation.level",
            "Transaction isolation (read_uncommitted/read_committed)",
            "read_committed",
        ),
        ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
        ConfigKeySpec::optional(
            "partition.assignment.strategy",
            "Partition assignment (range/roundrobin/cooperative-sticky)",
            "range",
        ),
        // Consumer group timing
        ConfigKeySpec::optional(
            "session.timeout.ms",
            "Consumer session timeout in milliseconds (production-safe default)",
            "45000",
        ),
        ConfigKeySpec::optional(
            "heartbeat.interval.ms",
            "Consumer heartbeat interval in milliseconds",
            "10000",
        ),
        ConfigKeySpec::optional(
            "queued.max.messages.kbytes",
            "Max per-partition pre-fetch queue size in kbytes",
            "16384",
        ),
        // Fetch tuning
        ConfigKeySpec::optional("fetch.min.bytes", "Minimum bytes per fetch request", "1"),
        ConfigKeySpec::optional(
            "fetch.max.bytes",
            "Maximum bytes per fetch request",
            "52428800",
        ),
        ConfigKeySpec::optional(
            "fetch.max.wait.ms",
            "Max wait time for fetch.min.bytes",
            "500",
        ),
        ConfigKeySpec::optional(
            "max.partition.fetch.bytes",
            "Max bytes per partition per fetch",
            "1048576",
        ),
        // Metadata
        ConfigKeySpec::optional(
            "include.metadata",
            "Include _partition/_offset/_timestamp columns",
            "false",
        ),
        ConfigKeySpec::optional("include.headers", "Include _headers column", "false"),
        ConfigKeySpec::optional(
            "event.time.column",
            "Column name for event time extraction",
            "",
        ),
        // Watermark
        ConfigKeySpec::optional(
            "max.out.of.orderness.ms",
            "Max out-of-orderness for watermarks",
            "5000",
        ),
        ConfigKeySpec::optional("idle.timeout.ms", "Idle partition timeout", "30000"),
        ConfigKeySpec::optional(
            "enable.watermark.tracking",
            "Enable per-partition watermark tracking",
            "false",
        ),
        // Backpressure
        ConfigKeySpec::optional(
            "backpressure.high.watermark",
            "Channel fill ratio to pause",
            "0.8",
        ),
        ConfigKeySpec::optional(
            "backpressure.low.watermark",
            "Channel fill ratio to resume",
            "0.5",
        ),
        // Error handling
        ConfigKeySpec::optional(
            "max.deser.error.rate",
            "Max tolerated deserialization error rate per batch (0.0-1.0)",
            "0.5",
        ),
        // Schema Registry
        ConfigKeySpec::optional(
            "schema.registry.url",
            "Confluent Schema Registry URL (required for Avro)",
            "",
        ),
        ConfigKeySpec::optional("schema.registry.username", "Schema Registry username", ""),
        ConfigKeySpec::optional("schema.registry.password", "Schema Registry password", ""),
        ConfigKeySpec::optional(
            "schema.registry.ssl.ca.location",
            "Schema Registry SSL CA cert path",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.registry.ssl.certificate.location",
            "Schema Registry SSL client cert path",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.registry.ssl.key.location",
            "Schema Registry SSL client key path",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.compatibility",
            "Schema compatibility level override",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.evolution.strategy",
            "Runtime schema evolution handling (log/reject/ignore)",
            "log",
        ),
    ]
}

/// Returns the configuration key specifications for the Kafka sink.
fn kafka_sink_config_keys() -> Vec<ConfigKeySpec> {
    vec![
        // Required
        ConfigKeySpec::required("bootstrap.servers", "Kafka broker addresses"),
        ConfigKeySpec::required("topic", "Target Kafka topic"),
        // Format
        ConfigKeySpec::optional("format", "Serialization format (json/csv/avro/raw)", "json"),
        // Security
        ConfigKeySpec::optional(
            "security.protocol",
            "Security protocol (plaintext/ssl/sasl_plaintext/sasl_ssl)",
            "plaintext",
        ),
        ConfigKeySpec::optional(
            "sasl.mechanism",
            "SASL mechanism (PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/GSSAPI/OAUTHBEARER)",
            "",
        ),
        ConfigKeySpec::optional("sasl.username", "SASL username for PLAIN/SCRAM", ""),
        ConfigKeySpec::optional("sasl.password", "SASL password for PLAIN/SCRAM", ""),
        ConfigKeySpec::optional("ssl.ca.location", "SSL CA certificate file path", ""),
        ConfigKeySpec::optional(
            "ssl.certificate.location",
            "Client SSL certificate file path",
            "",
        ),
        ConfigKeySpec::optional("ssl.key.location", "Client SSL private key file path", ""),
        ConfigKeySpec::optional("ssl.key.password", "Password for encrypted SSL key", ""),
        // Delivery & Transactions
        ConfigKeySpec::optional(
            "delivery.guarantee",
            "Delivery guarantee (at-least-once/exactly-once)",
            "at-least-once",
        ),
        ConfigKeySpec::optional(
            "transactional.id",
            "Transactional ID prefix (auto-generated if not set)",
            "",
        ),
        ConfigKeySpec::optional(
            "transaction.timeout.ms",
            "Transaction timeout in milliseconds",
            "60000",
        ),
        ConfigKeySpec::optional("acks", "Acknowledgment level (0/1/all)", "all"),
        ConfigKeySpec::optional(
            "max.in.flight.requests",
            "Max in-flight requests (<=5 for exactly-once)",
            "5",
        ),
        ConfigKeySpec::optional(
            "delivery.timeout.ms",
            "Delivery timeout in milliseconds",
            "120000",
        ),
        // Partitioning
        ConfigKeySpec::optional("key.column", "Column name to use as Kafka message key", ""),
        ConfigKeySpec::optional(
            "partitioner",
            "Partitioning strategy (key-hash/round-robin/sticky)",
            "key-hash",
        ),
        // Batching & Compression
        ConfigKeySpec::optional("linger.ms", "Producer linger time in milliseconds", "5"),
        ConfigKeySpec::optional("batch.size", "Producer batch size in bytes", "16384"),
        ConfigKeySpec::optional("batch.num.messages", "Max messages per batch", "10000"),
        ConfigKeySpec::optional(
            "compression.type",
            "Compression (none/gzip/snappy/lz4/zstd)",
            "none",
        ),
        // Error Handling
        ConfigKeySpec::optional(
            "dlq.topic",
            "Dead letter queue topic for failed records",
            "",
        ),
        ConfigKeySpec::optional(
            "flush.batch.size",
            "Max records to buffer before flushing",
            "1000",
        ),
        // Schema Registry
        ConfigKeySpec::optional(
            "schema.registry.url",
            "Confluent Schema Registry URL (required for Avro)",
            "",
        ),
        ConfigKeySpec::optional("schema.registry.username", "Schema Registry username", ""),
        ConfigKeySpec::optional("schema.registry.password", "Schema Registry password", ""),
        ConfigKeySpec::optional(
            "schema.registry.ssl.ca.location",
            "Schema Registry SSL CA cert path",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.compatibility",
            "Schema compatibility level override",
            "",
        ),
    ]
}

/// Round-trip integration tests: serialize → deserialize → verify identity.
#[cfg(test)]
mod avro_roundtrip_tests {
    use std::sync::Arc;

    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
    };
    use arrow_schema::{DataType, Field, Schema, SchemaRef};

    use super::avro::AvroDeserializer;
    use super::avro_serializer::AvroSerializer;
    use super::schema_registry::arrow_to_avro_schema;
    use crate::serde::{RecordDeserializer, RecordSerializer};

    /// Serializes a batch, deserializes it back, and returns the reconstructed
    /// batch for the caller to verify; each step is asserted to succeed.
    fn roundtrip(batch: &RecordBatch, schema: &SchemaRef) -> RecordBatch {
        let avro_schema_json =
            arrow_to_avro_schema(schema, "roundtrip_test").expect("Arrow→Avro schema");

        let ser = AvroSerializer::new(schema.clone(), 1);
        let records = ser.serialize(batch).expect("serialize");

        let mut deser = AvroDeserializer::new();
        deser
            .register_schema(1, &avro_schema_json)
            .expect("register schema");

        let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();
        deser
            .deserialize_batch(&record_refs, schema)
            .expect("deserialize")
    }

    #[test]
    fn test_roundtrip_primitives() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("price", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["AAPL", "GOOG", "MSFT"])),
                Arc::new(Float64Array::from(vec![150.0, 2800.0, 300.0])),
            ],
        )
        .unwrap();

        let result = roundtrip(&batch, &schema);
        assert_eq!(result.num_rows(), 3);
        assert_eq!(result.num_columns(), 3);

        let ids = result
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 2);
        assert_eq!(ids.value(2), 3);

        let names = result
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "AAPL");
        assert_eq!(names.value(1), "GOOG");
        assert_eq!(names.value(2), "MSFT");

        let prices = result
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!((prices.value(0) - 150.0).abs() < f64::EPSILON);
        assert!((prices.value(1) - 2800.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_roundtrip_all_primitive_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("b", DataType::Boolean, false),
            Field::new("i32", DataType::Int32, false),
            Field::new("i64", DataType::Int64, false),
            Field::new("f32", DataType::Float32, false),
            Field::new("f64", DataType::Float64, false),
            Field::new("s", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(BooleanArray::from(vec![true, false])),
                Arc::new(Int32Array::from(vec![42, -1])),
                Arc::new(Int64Array::from(vec![100_000_000, -999])),
                Arc::new(Float32Array::from(vec![3.14f32, -0.001f32])),
                Arc::new(Float64Array::from(vec![2.718, 1e10])),
                Arc::new(StringArray::from(vec!["hello", "world"])),
            ],
        )
        .unwrap();

        let result = roundtrip(&batch, &schema);
        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.num_columns(), 6);

        let bools = result
            .column(0)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(bools.value(0));
        assert!(!bools.value(1));

        let ints = result
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(ints.value(0), 42);
        assert_eq!(ints.value(1), -1);
    }

    #[test]
    fn test_roundtrip_single_row() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("val", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![99])),
                Arc::new(StringArray::from(vec!["single"])),
            ],
        )
        .unwrap();

        let result = roundtrip(&batch, &schema);
        assert_eq!(result.num_rows(), 1);
        let val = result
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(val.value(0), "single");
    }

    #[test]
    fn test_roundtrip_confluent_wire_format() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();

        let ser = AvroSerializer::new(schema.clone(), 42);
        let records = ser.serialize(&batch).unwrap();

        // Verify Confluent wire format: 0x00 + 4-byte BE schema ID.
        for record in &records {
            assert!(record.len() >= 5, "record too short");
            assert_eq!(record[0], 0x00, "magic byte");
            let schema_id = u32::from_be_bytes([record[1], record[2], record[3], record[4]]);
            assert_eq!(schema_id, 42, "schema ID in header");
        }

        // Also verify round-trip works with schema ID 42.
        let avro_schema_json = arrow_to_avro_schema(&schema, "test").unwrap();
        let mut deser = AvroDeserializer::new();
        deser.register_schema(42, &avro_schema_json).unwrap();

        let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();
        let result = deser.deserialize_batch(&record_refs, &schema).unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    #[test]
    fn test_roundtrip_empty_batch() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch = RecordBatch::new_empty(schema.clone());

        let ser = AvroSerializer::new(schema.clone(), 1);
        let records = ser.serialize(&batch).unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn test_roundtrip_many_rows() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("idx", DataType::Int64, false),
            Field::new("label", DataType::Utf8, false),
        ]));
        let n = 100;
        let ids: Vec<i64> = (0..n).collect();
        let labels: Vec<String> = (0..n).map(|i| format!("row-{i}")).collect();
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(labels)),
            ],
        )
        .unwrap();

        let result = roundtrip(&batch, &schema);
        assert_eq!(result.num_rows(), n as usize);

        let ids = result
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        for i in 0..n as usize {
            assert_eq!(ids.value(i), i as i64);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_register_kafka_source() {
        let registry = ConnectorRegistry::new();
        register_kafka_source(&registry);

        let sources = registry.list_sources();
        assert!(sources.contains(&"kafka".to_string()));

        let info = registry.source_info("kafka");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "kafka");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_kafka_source(&registry);

        let config = crate::config::ConnectorConfig::new("kafka");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    #[test]
    fn test_register_kafka_sink() {
        let registry = ConnectorRegistry::new();
        register_kafka_sink(&registry);

        let sinks = registry.list_sinks();
        assert!(sinks.contains(&"kafka".to_string()));

        let info = registry.sink_info("kafka");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "kafka");
        assert!(!info.is_source);
        assert!(info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_kafka_sink(&registry);

        let config = crate::config::ConnectorConfig::new("kafka");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }
}