pub mod avro;
pub mod config;
pub mod metrics;
pub mod offsets;
pub mod rebalance;
pub mod source;
pub mod watermarks;

pub mod avro_serializer;
pub mod partitioner;
pub mod sink;
pub mod sink_config;
pub mod sink_metrics;

pub mod schema_registry;

pub use avro::AvroDeserializer;
pub use config::{
    AssignmentStrategy, CompatibilityLevel, IsolationLevel, KafkaSourceConfig, OffsetReset,
    SaslMechanism, SchemaEvolutionStrategy, SecurityProtocol, SrAuth, StartupMode,
    TopicSubscription,
};
pub use metrics::KafkaSourceMetrics;
pub use offsets::OffsetTracker;
pub use source::KafkaSource;
pub use watermarks::{KafkaWatermarkTracker, WatermarkMetrics};

pub use avro_serializer::AvroSerializer;
pub use partitioner::{
    KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
};
pub use sink::KafkaSink;
pub use sink_config::{
    Acks, CompressionType, DeliveryGuarantee, KafkaSinkConfig, PartitionStrategy,
};
pub use sink_metrics::KafkaSinkMetrics;

pub use schema_registry::{CachedSchema, CompatibilityResult, SchemaRegistryClient, SchemaType};

use std::sync::Arc;

use crate::config::{ConfigKeySpec, ConnectorInfo};
use crate::registry::ConnectorRegistry;

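/// Registers the Kafka source connector under the name `"kafka"`.
///
/// The factory closure builds a [`KafkaSource`] with an empty Arrow schema,
/// a default [`KafkaSourceConfig`], and an optional Prometheus registry for
/// metrics; see [`kafka_source_config_keys`] for the supported settings.
///
/// A minimal usage sketch (error handling elided):
///
/// ```ignore
/// let registry = ConnectorRegistry::new();
/// register_kafka_source(&registry);
///
/// let config = crate::config::ConnectorConfig::new("kafka");
/// let source = registry.create_source(&config, None).expect("kafka source");
/// ```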
pub fn register_kafka_source(registry: &ConnectorRegistry) {
    let info = ConnectorInfo {
        name: "kafka".to_string(),
        display_name: "Apache Kafka Source".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        is_source: true,
        is_sink: false,
        config_keys: kafka_source_config_keys(),
    };

    registry.register_source(
        "kafka",
        info,
        Arc::new(|registry: Option<&prometheus::Registry>| {
            let empty = Arc::new(arrow_schema::Schema::empty());
            Box::new(KafkaSource::new(
                empty,
                KafkaSourceConfig::default(),
                registry,
            ))
        }),
    );
}

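/// Registers the Kafka sink connector under the name `"kafka"`.
///
/// Mirrors [`register_kafka_source`]: the factory closure builds a
/// [`KafkaSink`] with an empty Arrow schema, a default [`KafkaSinkConfig`],
/// and an optional Prometheus registry; see [`kafka_sink_config_keys`] for
/// the supported settings.
///
/// A minimal usage sketch (error handling elided):
///
/// ```ignore
/// let registry = ConnectorRegistry::new();
/// register_kafka_sink(&registry);
///
/// let config = crate::config::ConnectorConfig::new("kafka");
/// let sink = registry.create_sink(&config, None).expect("kafka sink");
/// ```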
pub fn register_kafka_sink(registry: &ConnectorRegistry) {
    let info = ConnectorInfo {
        name: "kafka".to_string(),
        display_name: "Apache Kafka Sink".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
        is_source: false,
        is_sink: true,
        config_keys: kafka_sink_config_keys(),
    };

    registry.register_sink(
        "kafka",
        info,
        Arc::new(|registry: Option<&prometheus::Registry>| {
            let empty = Arc::new(arrow_schema::Schema::empty());
            Box::new(KafkaSink::new(empty, KafkaSinkConfig::default(), registry))
        }),
    );
}

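/// Config key specs advertised for the Kafka source.
///
/// `bootstrap.servers`, `group.id`, and `topic` are required; every other
/// key is optional and falls back to the default listed here. An
/// illustrative minimal configuration (values are placeholders):
///
/// ```text
/// bootstrap.servers = broker-1:9092,broker-2:9092
/// group.id          = my-pipeline
/// topic             = orders,payments
/// ```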
#[allow(clippy::too_many_lines)]
fn kafka_source_config_keys() -> Vec<ConfigKeySpec> {
    vec![
        ConfigKeySpec::required("bootstrap.servers", "Kafka broker addresses"),
        ConfigKeySpec::required("group.id", "Consumer group identifier"),
        ConfigKeySpec::required("topic", "Comma-separated list of topics"),
        ConfigKeySpec::optional("topic.pattern", "Regex pattern for topic subscription", ""),
        ConfigKeySpec::optional("format", "Data format (json/csv/avro/raw/debezium)", "json"),
        ConfigKeySpec::optional(
            "security.protocol",
            "Security protocol (plaintext/ssl/sasl_plaintext/sasl_ssl)",
            "plaintext",
        ),
        ConfigKeySpec::optional(
            "sasl.mechanism",
            "SASL mechanism (PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/GSSAPI/OAUTHBEARER)",
            "",
        ),
        ConfigKeySpec::optional("sasl.username", "SASL username for PLAIN/SCRAM", ""),
        ConfigKeySpec::optional("sasl.password", "SASL password for PLAIN/SCRAM", ""),
        ConfigKeySpec::optional("ssl.ca.location", "SSL CA certificate file path", ""),
        ConfigKeySpec::optional(
            "ssl.certificate.location",
            "Client SSL certificate file path",
            "",
        ),
        ConfigKeySpec::optional("ssl.key.location", "Client SSL private key file path", ""),
        ConfigKeySpec::optional("ssl.key.password", "Password for encrypted SSL key", ""),
        ConfigKeySpec::optional(
            "startup.mode",
            "Startup mode (group-offsets/earliest/latest)",
            "group-offsets",
        ),
        ConfigKeySpec::optional(
            "startup.specific.offsets",
            "Start from specific offsets (format: 'partition:offset,...')",
            "",
        ),
        ConfigKeySpec::optional(
            "startup.timestamp.ms",
            "Start from timestamp (milliseconds since epoch)",
            "",
        ),
        ConfigKeySpec::optional(
            "auto.offset.reset",
            "Fallback when no committed offset (earliest/latest/none)",
            "earliest",
        ),
        ConfigKeySpec::optional(
            "isolation.level",
            "Transaction isolation (read_uncommitted/read_committed)",
            "read_committed",
        ),
        ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
        ConfigKeySpec::optional(
            "partition.assignment.strategy",
            "Partition assignment (range/roundrobin/cooperative-sticky)",
            "range",
        ),
        ConfigKeySpec::optional(
            "session.timeout.ms",
            "Consumer session timeout in milliseconds (production-safe default)",
            "45000",
        ),
        ConfigKeySpec::optional(
            "heartbeat.interval.ms",
            "Consumer heartbeat interval in milliseconds",
            "10000",
        ),
        ConfigKeySpec::optional(
            "queued.max.messages.kbytes",
            "Max per-partition pre-fetch queue size in kbytes",
            "16384",
        ),
        ConfigKeySpec::optional("fetch.min.bytes", "Minimum bytes per fetch request", "1"),
        ConfigKeySpec::optional(
            "fetch.max.bytes",
            "Maximum bytes per fetch request",
            "52428800",
        ),
        ConfigKeySpec::optional(
            "fetch.max.wait.ms",
            "Max wait time for fetch.min.bytes",
            "500",
        ),
        ConfigKeySpec::optional(
            "max.partition.fetch.bytes",
            "Max bytes per partition per fetch",
            "1048576",
        ),
        ConfigKeySpec::optional(
            "include.metadata",
            "Include _partition/_offset/_timestamp columns",
            "false",
        ),
        ConfigKeySpec::optional("include.headers", "Include _headers column", "false"),
        ConfigKeySpec::optional(
            "event.time.column",
            "Column name for event time extraction",
            "",
        ),
        ConfigKeySpec::optional(
            "max.out.of.orderness.ms",
            "Max out-of-orderness for watermarks",
            "5000",
        ),
        ConfigKeySpec::optional("idle.timeout.ms", "Idle partition timeout", "30000"),
        ConfigKeySpec::optional(
            "enable.watermark.tracking",
            "Enable per-partition watermark tracking",
            "false",
        ),
        ConfigKeySpec::optional(
            "backpressure.high.watermark",
            "Channel fill ratio to pause",
            "0.8",
        ),
        ConfigKeySpec::optional(
            "backpressure.low.watermark",
            "Channel fill ratio to resume",
            "0.5",
        ),
        ConfigKeySpec::optional(
            "max.deser.error.rate",
            "Max tolerated deserialization error rate per batch (0.0-1.0)",
            "0.5",
        ),
        ConfigKeySpec::optional(
            "schema.registry.url",
            "Confluent Schema Registry URL (required for Avro)",
            "",
        ),
        ConfigKeySpec::optional("schema.registry.username", "Schema Registry username", ""),
        ConfigKeySpec::optional("schema.registry.password", "Schema Registry password", ""),
        ConfigKeySpec::optional(
            "schema.registry.ssl.ca.location",
            "Schema Registry SSL CA cert path",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.registry.ssl.certificate.location",
            "Schema Registry SSL client cert path",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.registry.ssl.key.location",
            "Schema Registry SSL client key path",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.compatibility",
            "Schema compatibility level override",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.evolution.strategy",
            "Runtime schema evolution handling (log/reject/ignore)",
            "log",
        ),
    ]
}

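/// Config key specs advertised for the Kafka sink.
///
/// `bootstrap.servers` and `topic` are required. As noted in the key
/// descriptions below, `delivery.guarantee = exactly-once` uses Kafka
/// transactions and expects `max.in.flight.requests` to stay at 5 or below
/// (the default).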
fn kafka_sink_config_keys() -> Vec<ConfigKeySpec> {
    vec![
        ConfigKeySpec::required("bootstrap.servers", "Kafka broker addresses"),
        ConfigKeySpec::required("topic", "Target Kafka topic"),
        ConfigKeySpec::optional("format", "Serialization format (json/csv/avro/raw)", "json"),
        ConfigKeySpec::optional(
            "security.protocol",
            "Security protocol (plaintext/ssl/sasl_plaintext/sasl_ssl)",
            "plaintext",
        ),
        ConfigKeySpec::optional(
            "sasl.mechanism",
            "SASL mechanism (PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/GSSAPI/OAUTHBEARER)",
            "",
        ),
        ConfigKeySpec::optional("sasl.username", "SASL username for PLAIN/SCRAM", ""),
        ConfigKeySpec::optional("sasl.password", "SASL password for PLAIN/SCRAM", ""),
        ConfigKeySpec::optional("ssl.ca.location", "SSL CA certificate file path", ""),
        ConfigKeySpec::optional(
            "ssl.certificate.location",
            "Client SSL certificate file path",
            "",
        ),
        ConfigKeySpec::optional("ssl.key.location", "Client SSL private key file path", ""),
        ConfigKeySpec::optional("ssl.key.password", "Password for encrypted SSL key", ""),
        ConfigKeySpec::optional(
            "delivery.guarantee",
            "Delivery guarantee (at-least-once/exactly-once)",
            "at-least-once",
        ),
        ConfigKeySpec::optional(
            "transactional.id",
            "Transactional ID prefix (auto-generated if not set)",
            "",
        ),
        ConfigKeySpec::optional(
            "transaction.timeout.ms",
            "Transaction timeout in milliseconds",
            "60000",
        ),
        ConfigKeySpec::optional("acks", "Acknowledgment level (0/1/all)", "all"),
        ConfigKeySpec::optional(
            "max.in.flight.requests",
            "Max in-flight requests (<=5 for exactly-once)",
            "5",
        ),
        ConfigKeySpec::optional(
            "delivery.timeout.ms",
            "Delivery timeout in milliseconds",
            "120000",
        ),
        ConfigKeySpec::optional("key.column", "Column name to use as Kafka message key", ""),
        ConfigKeySpec::optional(
            "partitioner",
            "Partitioning strategy (key-hash/round-robin/sticky)",
            "key-hash",
        ),
        ConfigKeySpec::optional("linger.ms", "Producer linger time in milliseconds", "5"),
        ConfigKeySpec::optional("batch.size", "Producer batch size in bytes", "16384"),
        ConfigKeySpec::optional("batch.num.messages", "Max messages per batch", "10000"),
        ConfigKeySpec::optional(
            "compression.type",
            "Compression (none/gzip/snappy/lz4/zstd)",
            "none",
        ),
        ConfigKeySpec::optional(
            "dlq.topic",
            "Dead letter queue topic for failed records",
            "",
        ),
        ConfigKeySpec::optional(
            "flush.batch.size",
            "Max records to buffer before flushing",
            "1000",
        ),
        ConfigKeySpec::optional(
            "schema.registry.url",
            "Confluent Schema Registry URL (required for Avro)",
            "",
        ),
        ConfigKeySpec::optional("schema.registry.username", "Schema Registry username", ""),
        ConfigKeySpec::optional("schema.registry.password", "Schema Registry password", ""),
        ConfigKeySpec::optional(
            "schema.registry.ssl.ca.location",
            "Schema Registry SSL CA cert path",
            "",
        ),
        ConfigKeySpec::optional(
            "schema.compatibility",
            "Schema compatibility level override",
            "",
        ),
    ]
}

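/// Round-trip tests: Arrow `RecordBatch` → Confluent wire-format Avro
/// records → Arrow `RecordBatch`, covering primitive types, batch sizes,
/// and the wire-format header.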
#[cfg(test)]
mod avro_roundtrip_tests {
    use std::sync::Arc;

    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
    };
    use arrow_schema::{DataType, Field, Schema, SchemaRef};

    use super::avro::AvroDeserializer;
    use super::avro_serializer::AvroSerializer;
    use super::schema_registry::arrow_to_avro_schema;
    use crate::serde::{RecordDeserializer, RecordSerializer};

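    /// Serializes `batch` under schema ID 1, registers the Arrow-derived
    /// Avro schema with a fresh deserializer, and decodes the records back
    /// into an Arrow `RecordBatch`.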
    fn roundtrip(batch: &RecordBatch, schema: &SchemaRef) -> RecordBatch {
        let avro_schema_json =
            arrow_to_avro_schema(schema, "roundtrip_test").expect("Arrow→Avro schema");

        let ser = AvroSerializer::new(schema.clone(), 1);
        let records = ser.serialize(batch).expect("serialize");

        let mut deser = AvroDeserializer::new();
        deser
            .register_schema(1, &avro_schema_json)
            .expect("register schema");

        let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();
        deser
            .deserialize_batch(&record_refs, schema)
            .expect("deserialize")
    }

    #[test]
    fn test_roundtrip_primitives() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("price", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["AAPL", "GOOG", "MSFT"])),
                Arc::new(Float64Array::from(vec![150.0, 2800.0, 300.0])),
            ],
        )
        .unwrap();

        let result = roundtrip(&batch, &schema);
        assert_eq!(result.num_rows(), 3);
        assert_eq!(result.num_columns(), 3);

        let ids = result
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 2);
        assert_eq!(ids.value(2), 3);

        let names = result
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(names.value(0), "AAPL");
        assert_eq!(names.value(1), "GOOG");
        assert_eq!(names.value(2), "MSFT");

        let prices = result
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!((prices.value(0) - 150.0).abs() < f64::EPSILON);
        assert!((prices.value(1) - 2800.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_roundtrip_all_primitive_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("b", DataType::Boolean, false),
            Field::new("i32", DataType::Int32, false),
            Field::new("i64", DataType::Int64, false),
            Field::new("f32", DataType::Float32, false),
            Field::new("f64", DataType::Float64, false),
            Field::new("s", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(BooleanArray::from(vec![true, false])),
                Arc::new(Int32Array::from(vec![42, -1])),
                Arc::new(Int64Array::from(vec![100_000_000, -999])),
                Arc::new(Float32Array::from(vec![3.14f32, -0.001f32])),
                Arc::new(Float64Array::from(vec![2.718, 1e10])),
                Arc::new(StringArray::from(vec!["hello", "world"])),
            ],
        )
        .unwrap();

        let result = roundtrip(&batch, &schema);
        assert_eq!(result.num_rows(), 2);
        assert_eq!(result.num_columns(), 6);

        let bools = result
            .column(0)
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(bools.value(0));
        assert!(!bools.value(1));

        let ints = result
            .column(1)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(ints.value(0), 42);
        assert_eq!(ints.value(1), -1);
    }

    #[test]
    fn test_roundtrip_single_row() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("val", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![99])),
                Arc::new(StringArray::from(vec!["single"])),
            ],
        )
        .unwrap();

        let result = roundtrip(&batch, &schema);
        assert_eq!(result.num_rows(), 1);
        let val = result
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(val.value(0), "single");
    }

    #[test]
    fn test_roundtrip_confluent_wire_format() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();

        let ser = AvroSerializer::new(schema.clone(), 42);
        let records = ser.serialize(&batch).unwrap();

        for record in &records {
            assert!(record.len() >= 5, "record too short");
            assert_eq!(record[0], 0x00, "magic byte");
            let schema_id = u32::from_be_bytes([record[1], record[2], record[3], record[4]]);
            assert_eq!(schema_id, 42, "schema ID in header");
        }

        let avro_schema_json = arrow_to_avro_schema(&schema, "test").unwrap();
        let mut deser = AvroDeserializer::new();
        deser.register_schema(42, &avro_schema_json).unwrap();

        let record_refs: Vec<&[u8]> = records.iter().map(Vec::as_slice).collect();
        let result = deser.deserialize_batch(&record_refs, &schema).unwrap();
        assert_eq!(result.num_rows(), 2);
    }

    #[test]
    fn test_roundtrip_empty_batch() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let batch = RecordBatch::new_empty(schema.clone());

        let ser = AvroSerializer::new(schema.clone(), 1);
        let records = ser.serialize(&batch).unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn test_roundtrip_many_rows() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("idx", DataType::Int64, false),
            Field::new("label", DataType::Utf8, false),
        ]));
        let n = 100;
        let ids: Vec<i64> = (0..n).collect();
        let labels: Vec<String> = (0..n).map(|i| format!("row-{i}")).collect();
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(labels)),
            ],
        )
        .unwrap();

        let result = roundtrip(&batch, &schema);
        assert_eq!(result.num_rows(), n as usize);

        let ids = result
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        for i in 0..n as usize {
            assert_eq!(ids.value(i), i as i64);
        }
    }
}

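/// Registry wiring tests: registration metadata and factory construction
/// for the Kafka source and sink.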
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_register_kafka_source() {
        let registry = ConnectorRegistry::new();
        register_kafka_source(&registry);

        let sources = registry.list_sources();
        assert!(sources.contains(&"kafka".to_string()));

        let info = registry.source_info("kafka");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "kafka");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_factory_creates_source() {
        let registry = ConnectorRegistry::new();
        register_kafka_source(&registry);

        let config = crate::config::ConnectorConfig::new("kafka");
        let source = registry.create_source(&config, None);
        assert!(source.is_ok());
    }

    #[test]
    fn test_register_kafka_sink() {
        let registry = ConnectorRegistry::new();
        register_kafka_sink(&registry);

        let sinks = registry.list_sinks();
        assert!(sinks.contains(&"kafka".to_string()));

        let info = registry.sink_info("kafka");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "kafka");
        assert!(!info.is_source);
        assert!(info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_factory_creates_sink() {
        let registry = ConnectorRegistry::new();
        register_kafka_sink(&registry);

        let config = crate::config::ConnectorConfig::new("kafka");
        let sink = registry.create_sink(&config, None);
        assert!(sink.is_ok());
    }
}