1pub mod avro;
43pub mod backpressure;
44pub mod config;
45pub mod metrics;
46pub mod offsets;
47pub mod rebalance;
48pub mod source;
49pub mod watermarks;
50
51pub mod avro_serializer;
53pub mod partitioner;
54pub mod sink;
55pub mod sink_config;
56pub mod sink_metrics;
57
58pub mod schema_registry;
60
61#[cfg(feature = "kafka-discovery")]
63pub mod discovery;
64
65pub use avro::AvroDeserializer;
67pub use config::{
68 AssignmentStrategy, CompatibilityLevel, IsolationLevel, KafkaSourceConfig, OffsetReset,
69 SaslMechanism, SecurityProtocol, SrAuth, StartupMode, TopicSubscription,
70};
71pub use metrics::KafkaSourceMetrics;
72pub use offsets::OffsetTracker;
73pub use source::KafkaSource;
74pub use watermarks::{
75 AlignmentCheckResult, KafkaAlignmentConfig, KafkaAlignmentMode, KafkaWatermarkTracker,
76 WatermarkMetrics, WatermarkMetricsSnapshot,
77};
78
79pub use avro_serializer::AvroSerializer;
81pub use partitioner::{
82 KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
83};
84pub use sink::KafkaSink;
85pub use sink_config::{
86 Acks, CompressionType, DeliveryGuarantee, KafkaSinkConfig, PartitionStrategy,
87};
88pub use sink_metrics::KafkaSinkMetrics;
89
90pub use schema_registry::{CachedSchema, CompatibilityResult, SchemaRegistryClient, SchemaType};
92
93use std::sync::Arc;
94
95use crate::config::{ConfigKeySpec, ConnectorInfo};
96use crate::registry::ConnectorRegistry;
97
98pub fn register_kafka_source(registry: &ConnectorRegistry) {
103 let info = ConnectorInfo {
104 name: "kafka".to_string(),
105 display_name: "Apache Kafka Source".to_string(),
106 version: env!("CARGO_PKG_VERSION").to_string(),
107 is_source: true,
108 is_sink: false,
109 config_keys: kafka_source_config_keys(),
110 };
111
112 registry.register_source(
113 "kafka",
114 info,
115 Arc::new(|| {
116 use arrow_schema::{DataType, Field, Schema};
117
118 let default_schema = Arc::new(Schema::new(vec![
120 Field::new("key", DataType::Utf8, true),
121 Field::new("value", DataType::Utf8, false),
122 ]));
123 Box::new(KafkaSource::new(
124 default_schema,
125 KafkaSourceConfig::default(),
126 ))
127 }),
128 );
129}
130
131pub fn register_kafka_sink(registry: &ConnectorRegistry) {
136 let info = ConnectorInfo {
137 name: "kafka".to_string(),
138 display_name: "Apache Kafka Sink".to_string(),
139 version: env!("CARGO_PKG_VERSION").to_string(),
140 is_source: false,
141 is_sink: true,
142 config_keys: kafka_sink_config_keys(),
143 };
144
145 registry.register_sink(
146 "kafka",
147 info,
148 Arc::new(|| {
149 use arrow_schema::{DataType, Field, Schema};
150
151 let default_schema = Arc::new(Schema::new(vec![
153 Field::new("key", DataType::Utf8, true),
154 Field::new("value", DataType::Utf8, false),
155 ]));
156 Box::new(KafkaSink::new(default_schema, KafkaSinkConfig::default()))
157 }),
158 );
159}
160
161#[allow(clippy::too_many_lines)]
163fn kafka_source_config_keys() -> Vec<ConfigKeySpec> {
164 vec![
165 ConfigKeySpec::required("bootstrap.servers", "Kafka broker addresses"),
167 ConfigKeySpec::required("group.id", "Consumer group identifier"),
168 ConfigKeySpec::required("topic", "Comma-separated list of topics"),
169 ConfigKeySpec::optional("topic.pattern", "Regex pattern for topic subscription", ""),
171 ConfigKeySpec::optional("format", "Data format (json/csv/avro/raw/debezium)", "json"),
173 ConfigKeySpec::optional(
175 "security.protocol",
176 "Security protocol (plaintext/ssl/sasl_plaintext/sasl_ssl)",
177 "plaintext",
178 ),
179 ConfigKeySpec::optional(
180 "sasl.mechanism",
181 "SASL mechanism (PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/GSSAPI/OAUTHBEARER)",
182 "",
183 ),
184 ConfigKeySpec::optional("sasl.username", "SASL username for PLAIN/SCRAM", ""),
185 ConfigKeySpec::optional("sasl.password", "SASL password for PLAIN/SCRAM", ""),
186 ConfigKeySpec::optional("ssl.ca.location", "SSL CA certificate file path", ""),
187 ConfigKeySpec::optional(
188 "ssl.certificate.location",
189 "Client SSL certificate file path",
190 "",
191 ),
192 ConfigKeySpec::optional("ssl.key.location", "Client SSL private key file path", ""),
193 ConfigKeySpec::optional("ssl.key.password", "Password for encrypted SSL key", ""),
194 ConfigKeySpec::optional(
196 "startup.mode",
197 "Startup mode (group-offsets/earliest/latest)",
198 "group-offsets",
199 ),
200 ConfigKeySpec::optional(
201 "startup.specific.offsets",
202 "Start from specific offsets (format: 'partition:offset,...')",
203 "",
204 ),
205 ConfigKeySpec::optional(
206 "startup.timestamp.ms",
207 "Start from timestamp (milliseconds since epoch)",
208 "",
209 ),
210 ConfigKeySpec::optional(
211 "auto.offset.reset",
212 "Fallback when no committed offset (earliest/latest/none)",
213 "earliest",
214 ),
215 ConfigKeySpec::optional(
216 "isolation.level",
217 "Transaction isolation (read_uncommitted/read_committed)",
218 "read_committed",
219 ),
220 ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
221 ConfigKeySpec::optional("poll.timeout.ms", "Poll timeout in milliseconds", "100"),
222 ConfigKeySpec::optional(
223 "partition.assignment.strategy",
224 "Partition assignment (range/roundrobin/cooperative-sticky)",
225 "range",
226 ),
227 ConfigKeySpec::optional("fetch.min.bytes", "Minimum bytes per fetch request", "1"),
229 ConfigKeySpec::optional(
230 "fetch.max.bytes",
231 "Maximum bytes per fetch request",
232 "52428800",
233 ),
234 ConfigKeySpec::optional(
235 "fetch.max.wait.ms",
236 "Max wait time for fetch.min.bytes",
237 "500",
238 ),
239 ConfigKeySpec::optional(
240 "max.partition.fetch.bytes",
241 "Max bytes per partition per fetch",
242 "1048576",
243 ),
244 ConfigKeySpec::optional(
246 "include.metadata",
247 "Include _partition/_offset/_timestamp columns",
248 "false",
249 ),
250 ConfigKeySpec::optional("include.headers", "Include _headers column", "false"),
251 ConfigKeySpec::optional(
252 "event.time.column",
253 "Column name for event time extraction",
254 "",
255 ),
256 ConfigKeySpec::optional(
258 "max.out.of.orderness.ms",
259 "Max out-of-orderness for watermarks",
260 "5000",
261 ),
262 ConfigKeySpec::optional("idle.timeout.ms", "Idle partition timeout", "30000"),
263 ConfigKeySpec::optional(
264 "enable.watermark.tracking",
265 "Enable per-partition watermark tracking",
266 "false",
267 ),
268 ConfigKeySpec::optional(
269 "alignment.group.id",
270 "Alignment group ID for multi-source coordination",
271 "",
272 ),
273 ConfigKeySpec::optional(
274 "alignment.max.drift.ms",
275 "Maximum allowed drift between sources in alignment group",
276 "",
277 ),
278 ConfigKeySpec::optional(
279 "alignment.mode",
280 "Alignment enforcement mode (pause/warn-only/drop-excess)",
281 "pause",
282 ),
283 ConfigKeySpec::optional(
285 "backpressure.high.watermark",
286 "Channel fill ratio to pause",
287 "0.8",
288 ),
289 ConfigKeySpec::optional(
290 "backpressure.low.watermark",
291 "Channel fill ratio to resume",
292 "0.5",
293 ),
294 ConfigKeySpec::optional(
296 "schema.registry.url",
297 "Confluent Schema Registry URL (required for Avro)",
298 "",
299 ),
300 ConfigKeySpec::optional("schema.registry.username", "Schema Registry username", ""),
301 ConfigKeySpec::optional("schema.registry.password", "Schema Registry password", ""),
302 ConfigKeySpec::optional(
303 "schema.registry.ssl.ca.location",
304 "Schema Registry SSL CA cert path",
305 "",
306 ),
307 ConfigKeySpec::optional(
308 "schema.registry.ssl.certificate.location",
309 "Schema Registry SSL client cert path",
310 "",
311 ),
312 ConfigKeySpec::optional(
313 "schema.registry.ssl.key.location",
314 "Schema Registry SSL client key path",
315 "",
316 ),
317 ConfigKeySpec::optional(
318 "schema.compatibility",
319 "Schema compatibility level override",
320 "",
321 ),
322 ]
323}
324
325fn kafka_sink_config_keys() -> Vec<ConfigKeySpec> {
327 vec![
328 ConfigKeySpec::required("bootstrap.servers", "Kafka broker addresses"),
330 ConfigKeySpec::required("topic", "Target Kafka topic"),
331 ConfigKeySpec::optional("format", "Serialization format (json/csv/avro/raw)", "json"),
333 ConfigKeySpec::optional(
335 "security.protocol",
336 "Security protocol (plaintext/ssl/sasl_plaintext/sasl_ssl)",
337 "plaintext",
338 ),
339 ConfigKeySpec::optional(
340 "sasl.mechanism",
341 "SASL mechanism (PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/GSSAPI/OAUTHBEARER)",
342 "",
343 ),
344 ConfigKeySpec::optional("sasl.username", "SASL username for PLAIN/SCRAM", ""),
345 ConfigKeySpec::optional("sasl.password", "SASL password for PLAIN/SCRAM", ""),
346 ConfigKeySpec::optional("ssl.ca.location", "SSL CA certificate file path", ""),
347 ConfigKeySpec::optional(
348 "ssl.certificate.location",
349 "Client SSL certificate file path",
350 "",
351 ),
352 ConfigKeySpec::optional("ssl.key.location", "Client SSL private key file path", ""),
353 ConfigKeySpec::optional("ssl.key.password", "Password for encrypted SSL key", ""),
354 ConfigKeySpec::optional(
356 "delivery.guarantee",
357 "Delivery guarantee (at-least-once/exactly-once)",
358 "at-least-once",
359 ),
360 ConfigKeySpec::optional(
361 "transactional.id",
362 "Transactional ID prefix (auto-generated if not set)",
363 "",
364 ),
365 ConfigKeySpec::optional(
366 "transaction.timeout.ms",
367 "Transaction timeout in milliseconds",
368 "60000",
369 ),
370 ConfigKeySpec::optional("acks", "Acknowledgment level (0/1/all)", "all"),
371 ConfigKeySpec::optional(
372 "max.in.flight.requests",
373 "Max in-flight requests (<=5 for exactly-once)",
374 "5",
375 ),
376 ConfigKeySpec::optional(
377 "delivery.timeout.ms",
378 "Delivery timeout in milliseconds",
379 "120000",
380 ),
381 ConfigKeySpec::optional("key.column", "Column name to use as Kafka message key", ""),
383 ConfigKeySpec::optional(
384 "partitioner",
385 "Partitioning strategy (key-hash/round-robin/sticky)",
386 "key-hash",
387 ),
388 ConfigKeySpec::optional("linger.ms", "Producer linger time in milliseconds", "5"),
390 ConfigKeySpec::optional("batch.size", "Producer batch size in bytes", "16384"),
391 ConfigKeySpec::optional("batch.num.messages", "Max messages per batch", "10000"),
392 ConfigKeySpec::optional(
393 "compression.type",
394 "Compression (none/gzip/snappy/lz4/zstd)",
395 "none",
396 ),
397 ConfigKeySpec::optional(
399 "dlq.topic",
400 "Dead letter queue topic for failed records",
401 "",
402 ),
403 ConfigKeySpec::optional(
404 "flush.batch.size",
405 "Max records to buffer before flushing",
406 "1000",
407 ),
408 ConfigKeySpec::optional(
410 "schema.registry.url",
411 "Confluent Schema Registry URL (required for Avro)",
412 "",
413 ),
414 ConfigKeySpec::optional("schema.registry.username", "Schema Registry username", ""),
415 ConfigKeySpec::optional("schema.registry.password", "Schema Registry password", ""),
416 ConfigKeySpec::optional(
417 "schema.registry.ssl.ca.location",
418 "Schema Registry SSL CA cert path",
419 "",
420 ),
421 ConfigKeySpec::optional(
422 "schema.compatibility",
423 "Schema compatibility level override",
424 "",
425 ),
426 ]
427}
428
/// End-to-end Avro tests: serialize an Arrow batch to Confluent wire
/// frames, then deserialize the frames back and compare the columns.
#[cfg(test)]
mod avro_roundtrip_tests {
    use std::sync::Arc;

    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
    };
    use arrow_schema::{DataType, Field, Schema, SchemaRef};

    use super::avro::AvroDeserializer;
    use super::avro_serializer::AvroSerializer;
    use super::schema_registry::arrow_to_avro_schema;
    use crate::serde::{RecordDeserializer, RecordSerializer};

    /// Downcasts column `idx` of `batch` to the concrete Arrow array type `A`.
    fn col<A: 'static>(batch: &RecordBatch, idx: usize) -> &A {
        batch.column(idx).as_any().downcast_ref::<A>().unwrap()
    }

    /// Encodes `batch` under schema id 1, then decodes the frames back
    /// into a `RecordBatch` with a deserializer holding the same schema.
    fn roundtrip(batch: &RecordBatch, schema: &SchemaRef) -> RecordBatch {
        let avro_json =
            arrow_to_avro_schema(schema, "roundtrip_test").expect("Arrow→Avro schema");

        let encoded = AvroSerializer::new(schema.clone(), 1)
            .serialize(batch)
            .expect("serialize");

        let mut decoder = AvroDeserializer::new();
        decoder
            .register_schema(1, &avro_json)
            .expect("register schema");

        let frames: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();
        decoder
            .deserialize_batch(&frames, schema)
            .expect("deserialize")
    }

    #[test]
    fn test_roundtrip_primitives() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("price", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["AAPL", "GOOG", "MSFT"])),
                Arc::new(Float64Array::from(vec![150.0, 2800.0, 300.0])),
            ],
        )
        .unwrap();

        let out = roundtrip(&batch, &schema);
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);

        let ids = col::<Int64Array>(&out, 0);
        for (row, want) in [1i64, 2, 3].into_iter().enumerate() {
            assert_eq!(ids.value(row), want);
        }

        let names = col::<StringArray>(&out, 1);
        for (row, want) in ["AAPL", "GOOG", "MSFT"].into_iter().enumerate() {
            assert_eq!(names.value(row), want);
        }

        let prices = col::<Float64Array>(&out, 2);
        assert!((prices.value(0) - 150.0).abs() < f64::EPSILON);
        assert!((prices.value(1) - 2800.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_roundtrip_all_primitive_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("b", DataType::Boolean, false),
            Field::new("i32", DataType::Int32, false),
            Field::new("i64", DataType::Int64, false),
            Field::new("f32", DataType::Float32, false),
            Field::new("f64", DataType::Float64, false),
            Field::new("s", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(BooleanArray::from(vec![true, false])),
                Arc::new(Int32Array::from(vec![42, -1])),
                Arc::new(Int64Array::from(vec![100_000_000, -999])),
                Arc::new(Float32Array::from(vec![3.14f32, -0.001f32])),
                Arc::new(Float64Array::from(vec![2.718, 1e10])),
                Arc::new(StringArray::from(vec!["hello", "world"])),
            ],
        )
        .unwrap();

        let out = roundtrip(&batch, &schema);
        assert_eq!(out.num_rows(), 2);
        assert_eq!(out.num_columns(), 6);

        let flags = col::<BooleanArray>(&out, 0);
        assert!(flags.value(0));
        assert!(!flags.value(1));

        let ints = col::<Int32Array>(&out, 1);
        assert_eq!(ints.value(0), 42);
        assert_eq!(ints.value(1), -1);
    }

    #[test]
    fn test_roundtrip_single_row() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("val", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![99])),
                Arc::new(StringArray::from(vec!["single"])),
            ],
        )
        .unwrap();

        let out = roundtrip(&batch, &schema);
        assert_eq!(out.num_rows(), 1);
        assert_eq!(col::<StringArray>(&out, 1).value(0), "single");
    }

    #[test]
    fn test_roundtrip_confluent_wire_format() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();

        let encoded = AvroSerializer::new(schema.clone(), 42)
            .serialize(&batch)
            .unwrap();

        // Every frame starts with the Confluent header: magic byte 0x00
        // followed by the schema id as a big-endian u32.
        for frame in &encoded {
            assert!(frame.len() >= 5, "record too short");
            assert_eq!(frame[0], 0x00, "magic byte");
            let id = u32::from_be_bytes(frame[1..5].try_into().unwrap());
            assert_eq!(id, 42, "schema ID in header");
        }

        let avro_json = arrow_to_avro_schema(&schema, "test").unwrap();
        let mut decoder = AvroDeserializer::new();
        decoder.register_schema(42, &avro_json).unwrap();

        let frames: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();
        let out = decoder.deserialize_batch(&frames, &schema).unwrap();
        assert_eq!(out.num_rows(), 2);
    }

    #[test]
    fn test_roundtrip_empty_batch() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch = RecordBatch::new_empty(schema.clone());

        // Zero rows must serialize to zero wire frames.
        let encoded = AvroSerializer::new(schema.clone(), 1)
            .serialize(&batch)
            .unwrap();
        assert!(encoded.is_empty());
    }

    #[test]
    fn test_roundtrip_many_rows() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("idx", DataType::Int64, false),
            Field::new("label", DataType::Utf8, false),
        ]));
        let n: i64 = 100;
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from((0..n).collect::<Vec<i64>>())),
                Arc::new(StringArray::from(
                    (0..n).map(|i| format!("row-{i}")).collect::<Vec<String>>(),
                )),
            ],
        )
        .unwrap();

        let out = roundtrip(&batch, &schema);
        assert_eq!(out.num_rows(), n as usize);

        let ids = col::<Int64Array>(&out, 0);
        for row in 0..n as usize {
            assert_eq!(ids.value(row), row as i64);
        }
    }
}
657
/// Registry integration tests: registration exposes the connector and
/// the stored factory can construct it from a default configuration.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_register_kafka_source() {
        let registry = ConnectorRegistry::new();
        register_kafka_source(&registry);

        // Listed under its registered name.
        assert!(registry.list_sources().contains(&"kafka".to_string()));

        // Metadata marks it as a source only, with a populated key table.
        let info = registry.source_info("kafka").unwrap();
        assert_eq!(info.name, "kafka");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_kafka_source(&registry);

        let cfg = crate::config::ConnectorConfig::new("kafka");
        assert!(registry.create_source(&cfg).is_ok());
    }

    #[test]
    fn test_register_kafka_sink() {
        let registry = ConnectorRegistry::new();
        register_kafka_sink(&registry);

        // Listed under its registered name.
        assert!(registry.list_sinks().contains(&"kafka".to_string()));

        // Metadata marks it as a sink only, with a populated key table.
        let info = registry.sink_info("kafka").unwrap();
        assert_eq!(info.name, "kafka");
        assert!(!info.is_source);
        assert!(info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_kafka_sink(&registry);

        let cfg = crate::config::ConnectorConfig::new("kafka");
        assert!(registry.create_sink(&cfg).is_ok());
    }
}