Skip to main content

laminar_connectors/kafka/
mod.rs

1//! Kafka source and sink connectors for LaminarDB.
2//!
3//! Provides a `KafkaSource` that consumes from Kafka topics and
4//! produces Arrow `RecordBatch` data through the [`SourceConnector`]
5//! trait, and a `KafkaSink` that writes Arrow `RecordBatch` data
6//! to Kafka topics through the [`SinkConnector`] trait.
7//!
8//! Both connectors support JSON, CSV, Raw, Debezium, and Avro
9//! formats, with full Confluent Schema Registry integration for Avro.
10//!
11//! # Features
12//!
13//! - Per-partition offset tracking with checkpoint/restore (source)
14//! - At-least-once and exactly-once delivery (sink)
15//! - Confluent Schema Registry with caching and compatibility checking
16//! - Avro serialization/deserialization via `arrow-avro` (Confluent wire format)
17//! - Configurable partitioning: key-hash, round-robin, sticky (sink)
18//! - Backpressure control with high/low watermark hysteresis (source)
19//! - Consumer group rebalance tracking (source)
20//! - Dead letter queue for failed records (sink)
21//! - Atomic metrics counters
22//!
23//! # Usage
24//!
25//! ```rust,ignore
26//! use laminar_connectors::kafka::{KafkaSource, KafkaSourceConfig};
27//! use laminar_connectors::kafka::{KafkaSink, KafkaSinkConfig};
28//!
29//! // Source
30//! let config = KafkaSourceConfig::from_config(&connector_config)?;
31//! let source = KafkaSource::new(schema, config);
32//!
33//! // Sink
34//! let config = KafkaSinkConfig::from_config(&connector_config)?;
35//! let sink = KafkaSink::new(schema, config);
36//! ```
37//!
38//! [`SourceConnector`]: crate::connector::SourceConnector
39//! [`SinkConnector`]: crate::connector::SinkConnector
40
41// Source modules
42pub mod avro;
43pub mod backpressure;
44pub mod config;
45pub mod metrics;
46pub mod offsets;
47pub mod rebalance;
48pub mod source;
49pub mod watermarks;
50
51// Sink modules
52pub mod avro_serializer;
53pub mod partitioner;
54pub mod sink;
55pub mod sink_config;
56pub mod sink_metrics;
57
58// Shared modules
59pub mod schema_registry;
60
61// Discovery module (requires delta feature from laminar-core)
62#[cfg(feature = "kafka-discovery")]
63pub mod discovery;
64
65// Source re-exports
66pub use avro::AvroDeserializer;
67pub use config::{
68    AssignmentStrategy, CompatibilityLevel, IsolationLevel, KafkaSourceConfig, OffsetReset,
69    SaslMechanism, SecurityProtocol, SrAuth, StartupMode, TopicSubscription,
70};
71pub use metrics::KafkaSourceMetrics;
72pub use offsets::OffsetTracker;
73pub use source::KafkaSource;
74pub use watermarks::{
75    AlignmentCheckResult, KafkaAlignmentConfig, KafkaAlignmentMode, KafkaWatermarkTracker,
76    WatermarkMetrics, WatermarkMetricsSnapshot,
77};
78
79// Sink re-exports
80pub use avro_serializer::AvroSerializer;
81pub use partitioner::{
82    KafkaPartitioner, KeyHashPartitioner, RoundRobinPartitioner, StickyPartitioner,
83};
84pub use sink::KafkaSink;
85pub use sink_config::{
86    Acks, CompressionType, DeliveryGuarantee, KafkaSinkConfig, PartitionStrategy,
87};
88pub use sink_metrics::KafkaSinkMetrics;
89
90// Shared re-exports
91pub use schema_registry::{CachedSchema, CompatibilityResult, SchemaRegistryClient, SchemaType};
92
93use std::sync::Arc;
94
95use crate::config::{ConfigKeySpec, ConnectorInfo};
96use crate::registry::ConnectorRegistry;
97
98/// Registers the Kafka source connector with the given registry.
99///
100/// After registration, the runtime can instantiate `KafkaSource` by
101/// name when processing `CREATE SOURCE ... WITH (connector = 'kafka')`.
102pub fn register_kafka_source(registry: &ConnectorRegistry) {
103    let info = ConnectorInfo {
104        name: "kafka".to_string(),
105        display_name: "Apache Kafka Source".to_string(),
106        version: env!("CARGO_PKG_VERSION").to_string(),
107        is_source: true,
108        is_sink: false,
109        config_keys: kafka_source_config_keys(),
110    };
111
112    registry.register_source(
113        "kafka",
114        info,
115        Arc::new(|| {
116            use arrow_schema::{DataType, Field, Schema};
117
118            // Default schema — will be overridden during open() or via SQL DDL.
119            let default_schema = Arc::new(Schema::new(vec![
120                Field::new("key", DataType::Utf8, true),
121                Field::new("value", DataType::Utf8, false),
122            ]));
123            Box::new(KafkaSource::new(
124                default_schema,
125                KafkaSourceConfig::default(),
126            ))
127        }),
128    );
129}
130
131/// Registers the Kafka sink connector with the given registry.
132///
133/// After registration, the runtime can instantiate `KafkaSink` by
134/// name when processing `CREATE SINK ... WITH (connector = 'kafka')`.
135pub fn register_kafka_sink(registry: &ConnectorRegistry) {
136    let info = ConnectorInfo {
137        name: "kafka".to_string(),
138        display_name: "Apache Kafka Sink".to_string(),
139        version: env!("CARGO_PKG_VERSION").to_string(),
140        is_source: false,
141        is_sink: true,
142        config_keys: kafka_sink_config_keys(),
143    };
144
145    registry.register_sink(
146        "kafka",
147        info,
148        Arc::new(|| {
149            use arrow_schema::{DataType, Field, Schema};
150
151            // Default schema — will be overridden during open() or via SQL DDL.
152            let default_schema = Arc::new(Schema::new(vec![
153                Field::new("key", DataType::Utf8, true),
154                Field::new("value", DataType::Utf8, false),
155            ]));
156            Box::new(KafkaSink::new(default_schema, KafkaSinkConfig::default()))
157        }),
158    );
159}
160
161/// Returns the configuration key specifications for the Kafka source.
162#[allow(clippy::too_many_lines)]
163fn kafka_source_config_keys() -> Vec<ConfigKeySpec> {
164    vec![
165        // Required
166        ConfigKeySpec::required("bootstrap.servers", "Kafka broker addresses"),
167        ConfigKeySpec::required("group.id", "Consumer group identifier"),
168        ConfigKeySpec::required("topic", "Comma-separated list of topics"),
169        // Topic subscription (alternative to 'topic')
170        ConfigKeySpec::optional("topic.pattern", "Regex pattern for topic subscription", ""),
171        // Format
172        ConfigKeySpec::optional("format", "Data format (json/csv/avro/raw/debezium)", "json"),
173        // Security
174        ConfigKeySpec::optional(
175            "security.protocol",
176            "Security protocol (plaintext/ssl/sasl_plaintext/sasl_ssl)",
177            "plaintext",
178        ),
179        ConfigKeySpec::optional(
180            "sasl.mechanism",
181            "SASL mechanism (PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/GSSAPI/OAUTHBEARER)",
182            "",
183        ),
184        ConfigKeySpec::optional("sasl.username", "SASL username for PLAIN/SCRAM", ""),
185        ConfigKeySpec::optional("sasl.password", "SASL password for PLAIN/SCRAM", ""),
186        ConfigKeySpec::optional("ssl.ca.location", "SSL CA certificate file path", ""),
187        ConfigKeySpec::optional(
188            "ssl.certificate.location",
189            "Client SSL certificate file path",
190            "",
191        ),
192        ConfigKeySpec::optional("ssl.key.location", "Client SSL private key file path", ""),
193        ConfigKeySpec::optional("ssl.key.password", "Password for encrypted SSL key", ""),
194        // Consumer tuning
195        ConfigKeySpec::optional(
196            "startup.mode",
197            "Startup mode (group-offsets/earliest/latest)",
198            "group-offsets",
199        ),
200        ConfigKeySpec::optional(
201            "startup.specific.offsets",
202            "Start from specific offsets (format: 'partition:offset,...')",
203            "",
204        ),
205        ConfigKeySpec::optional(
206            "startup.timestamp.ms",
207            "Start from timestamp (milliseconds since epoch)",
208            "",
209        ),
210        ConfigKeySpec::optional(
211            "auto.offset.reset",
212            "Fallback when no committed offset (earliest/latest/none)",
213            "earliest",
214        ),
215        ConfigKeySpec::optional(
216            "isolation.level",
217            "Transaction isolation (read_uncommitted/read_committed)",
218            "read_committed",
219        ),
220        ConfigKeySpec::optional("max.poll.records", "Max records per poll", "1000"),
221        ConfigKeySpec::optional("poll.timeout.ms", "Poll timeout in milliseconds", "100"),
222        ConfigKeySpec::optional(
223            "partition.assignment.strategy",
224            "Partition assignment (range/roundrobin/cooperative-sticky)",
225            "range",
226        ),
227        // Fetch tuning
228        ConfigKeySpec::optional("fetch.min.bytes", "Minimum bytes per fetch request", "1"),
229        ConfigKeySpec::optional(
230            "fetch.max.bytes",
231            "Maximum bytes per fetch request",
232            "52428800",
233        ),
234        ConfigKeySpec::optional(
235            "fetch.max.wait.ms",
236            "Max wait time for fetch.min.bytes",
237            "500",
238        ),
239        ConfigKeySpec::optional(
240            "max.partition.fetch.bytes",
241            "Max bytes per partition per fetch",
242            "1048576",
243        ),
244        // Metadata
245        ConfigKeySpec::optional(
246            "include.metadata",
247            "Include _partition/_offset/_timestamp columns",
248            "false",
249        ),
250        ConfigKeySpec::optional("include.headers", "Include _headers column", "false"),
251        ConfigKeySpec::optional(
252            "event.time.column",
253            "Column name for event time extraction",
254            "",
255        ),
256        // Watermark
257        ConfigKeySpec::optional(
258            "max.out.of.orderness.ms",
259            "Max out-of-orderness for watermarks",
260            "5000",
261        ),
262        ConfigKeySpec::optional("idle.timeout.ms", "Idle partition timeout", "30000"),
263        ConfigKeySpec::optional(
264            "enable.watermark.tracking",
265            "Enable per-partition watermark tracking",
266            "false",
267        ),
268        ConfigKeySpec::optional(
269            "alignment.group.id",
270            "Alignment group ID for multi-source coordination",
271            "",
272        ),
273        ConfigKeySpec::optional(
274            "alignment.max.drift.ms",
275            "Maximum allowed drift between sources in alignment group",
276            "",
277        ),
278        ConfigKeySpec::optional(
279            "alignment.mode",
280            "Alignment enforcement mode (pause/warn-only/drop-excess)",
281            "pause",
282        ),
283        // Backpressure
284        ConfigKeySpec::optional(
285            "backpressure.high.watermark",
286            "Channel fill ratio to pause",
287            "0.8",
288        ),
289        ConfigKeySpec::optional(
290            "backpressure.low.watermark",
291            "Channel fill ratio to resume",
292            "0.5",
293        ),
294        // Schema Registry
295        ConfigKeySpec::optional(
296            "schema.registry.url",
297            "Confluent Schema Registry URL (required for Avro)",
298            "",
299        ),
300        ConfigKeySpec::optional("schema.registry.username", "Schema Registry username", ""),
301        ConfigKeySpec::optional("schema.registry.password", "Schema Registry password", ""),
302        ConfigKeySpec::optional(
303            "schema.registry.ssl.ca.location",
304            "Schema Registry SSL CA cert path",
305            "",
306        ),
307        ConfigKeySpec::optional(
308            "schema.registry.ssl.certificate.location",
309            "Schema Registry SSL client cert path",
310            "",
311        ),
312        ConfigKeySpec::optional(
313            "schema.registry.ssl.key.location",
314            "Schema Registry SSL client key path",
315            "",
316        ),
317        ConfigKeySpec::optional(
318            "schema.compatibility",
319            "Schema compatibility level override",
320            "",
321        ),
322    ]
323}
324
325/// Returns the configuration key specifications for the Kafka sink.
326fn kafka_sink_config_keys() -> Vec<ConfigKeySpec> {
327    vec![
328        // Required
329        ConfigKeySpec::required("bootstrap.servers", "Kafka broker addresses"),
330        ConfigKeySpec::required("topic", "Target Kafka topic"),
331        // Format
332        ConfigKeySpec::optional("format", "Serialization format (json/csv/avro/raw)", "json"),
333        // Security
334        ConfigKeySpec::optional(
335            "security.protocol",
336            "Security protocol (plaintext/ssl/sasl_plaintext/sasl_ssl)",
337            "plaintext",
338        ),
339        ConfigKeySpec::optional(
340            "sasl.mechanism",
341            "SASL mechanism (PLAIN/SCRAM-SHA-256/SCRAM-SHA-512/GSSAPI/OAUTHBEARER)",
342            "",
343        ),
344        ConfigKeySpec::optional("sasl.username", "SASL username for PLAIN/SCRAM", ""),
345        ConfigKeySpec::optional("sasl.password", "SASL password for PLAIN/SCRAM", ""),
346        ConfigKeySpec::optional("ssl.ca.location", "SSL CA certificate file path", ""),
347        ConfigKeySpec::optional(
348            "ssl.certificate.location",
349            "Client SSL certificate file path",
350            "",
351        ),
352        ConfigKeySpec::optional("ssl.key.location", "Client SSL private key file path", ""),
353        ConfigKeySpec::optional("ssl.key.password", "Password for encrypted SSL key", ""),
354        // Delivery & Transactions
355        ConfigKeySpec::optional(
356            "delivery.guarantee",
357            "Delivery guarantee (at-least-once/exactly-once)",
358            "at-least-once",
359        ),
360        ConfigKeySpec::optional(
361            "transactional.id",
362            "Transactional ID prefix (auto-generated if not set)",
363            "",
364        ),
365        ConfigKeySpec::optional(
366            "transaction.timeout.ms",
367            "Transaction timeout in milliseconds",
368            "60000",
369        ),
370        ConfigKeySpec::optional("acks", "Acknowledgment level (0/1/all)", "all"),
371        ConfigKeySpec::optional(
372            "max.in.flight.requests",
373            "Max in-flight requests (<=5 for exactly-once)",
374            "5",
375        ),
376        ConfigKeySpec::optional(
377            "delivery.timeout.ms",
378            "Delivery timeout in milliseconds",
379            "120000",
380        ),
381        // Partitioning
382        ConfigKeySpec::optional("key.column", "Column name to use as Kafka message key", ""),
383        ConfigKeySpec::optional(
384            "partitioner",
385            "Partitioning strategy (key-hash/round-robin/sticky)",
386            "key-hash",
387        ),
388        // Batching & Compression
389        ConfigKeySpec::optional("linger.ms", "Producer linger time in milliseconds", "5"),
390        ConfigKeySpec::optional("batch.size", "Producer batch size in bytes", "16384"),
391        ConfigKeySpec::optional("batch.num.messages", "Max messages per batch", "10000"),
392        ConfigKeySpec::optional(
393            "compression.type",
394            "Compression (none/gzip/snappy/lz4/zstd)",
395            "none",
396        ),
397        // Error Handling
398        ConfigKeySpec::optional(
399            "dlq.topic",
400            "Dead letter queue topic for failed records",
401            "",
402        ),
403        ConfigKeySpec::optional(
404            "flush.batch.size",
405            "Max records to buffer before flushing",
406            "1000",
407        ),
408        // Schema Registry
409        ConfigKeySpec::optional(
410            "schema.registry.url",
411            "Confluent Schema Registry URL (required for Avro)",
412            "",
413        ),
414        ConfigKeySpec::optional("schema.registry.username", "Schema Registry username", ""),
415        ConfigKeySpec::optional("schema.registry.password", "Schema Registry password", ""),
416        ConfigKeySpec::optional(
417            "schema.registry.ssl.ca.location",
418            "Schema Registry SSL CA cert path",
419            "",
420        ),
421        ConfigKeySpec::optional(
422            "schema.compatibility",
423            "Schema compatibility level override",
424            "",
425        ),
426    ]
427}
428
/// Round-trip integration tests: serialize → deserialize → verify identity.
#[cfg(test)]
mod avro_roundtrip_tests {
    use std::sync::Arc;

    use arrow_array::{
        BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
    };
    use arrow_schema::{DataType, Field, Schema, SchemaRef};

    use super::avro::AvroDeserializer;
    use super::avro_serializer::AvroSerializer;
    use super::schema_registry::arrow_to_avro_schema;
    use crate::serde::{RecordDeserializer, RecordSerializer};

    /// Downcasts column `idx` of `batch` to the concrete array type `T`.
    fn col<T: 'static>(batch: &RecordBatch, idx: usize) -> &T {
        batch.column(idx).as_any().downcast_ref::<T>().unwrap()
    }

    /// Serializes a batch, deserializes each record, and asserts equality.
    fn roundtrip(batch: &RecordBatch, schema: &SchemaRef) -> RecordBatch {
        let avro_json = arrow_to_avro_schema(schema, "roundtrip_test").expect("Arrow→Avro schema");

        let encoded = AvroSerializer::new(schema.clone(), 1)
            .serialize(batch)
            .expect("serialize");

        let mut decoder = AvroDeserializer::new();
        decoder.register_schema(1, &avro_json).expect("register schema");

        let views: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();
        decoder.deserialize_batch(&views, schema).expect("deserialize")
    }

    #[test]
    fn test_roundtrip_primitives() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("price", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["AAPL", "GOOG", "MSFT"])),
                Arc::new(Float64Array::from(vec![150.0, 2800.0, 300.0])),
            ],
        )
        .unwrap();

        let out = roundtrip(&batch, &schema);
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);

        let ids = col::<Int64Array>(&out, 0);
        for (i, expected) in [1i64, 2, 3].into_iter().enumerate() {
            assert_eq!(ids.value(i), expected);
        }

        let names = col::<StringArray>(&out, 1);
        for (i, expected) in ["AAPL", "GOOG", "MSFT"].into_iter().enumerate() {
            assert_eq!(names.value(i), expected);
        }

        let prices = col::<Float64Array>(&out, 2);
        assert!((prices.value(0) - 150.0).abs() < f64::EPSILON);
        assert!((prices.value(1) - 2800.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_roundtrip_all_primitive_types() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("b", DataType::Boolean, false),
            Field::new("i32", DataType::Int32, false),
            Field::new("i64", DataType::Int64, false),
            Field::new("f32", DataType::Float32, false),
            Field::new("f64", DataType::Float64, false),
            Field::new("s", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(BooleanArray::from(vec![true, false])),
                Arc::new(Int32Array::from(vec![42, -1])),
                Arc::new(Int64Array::from(vec![100_000_000, -999])),
                Arc::new(Float32Array::from(vec![3.14f32, -0.001f32])),
                Arc::new(Float64Array::from(vec![2.718, 1e10])),
                Arc::new(StringArray::from(vec!["hello", "world"])),
            ],
        )
        .unwrap();

        let out = roundtrip(&batch, &schema);
        assert_eq!(out.num_rows(), 2);
        assert_eq!(out.num_columns(), 6);

        let flags = col::<BooleanArray>(&out, 0);
        assert!(flags.value(0));
        assert!(!flags.value(1));

        let ints = col::<Int32Array>(&out, 1);
        assert_eq!(ints.value(0), 42);
        assert_eq!(ints.value(1), -1);
    }

    #[test]
    fn test_roundtrip_single_row() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("val", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![99])),
                Arc::new(StringArray::from(vec!["single"])),
            ],
        )
        .unwrap();

        let out = roundtrip(&batch, &schema);
        assert_eq!(out.num_rows(), 1);
        assert_eq!(col::<StringArray>(&out, 1).value(0), "single");
    }

    #[test]
    fn test_roundtrip_confluent_wire_format() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2])),
                Arc::new(StringArray::from(vec!["a", "b"])),
            ],
        )
        .unwrap();

        let encoded = AvroSerializer::new(schema.clone(), 42)
            .serialize(&batch)
            .unwrap();

        // Every record must carry the Confluent header: 0x00 magic + 4-byte BE schema ID.
        for frame in &encoded {
            assert!(frame.len() >= 5, "record too short");
            assert_eq!(frame[0], 0x00, "magic byte");
            let id = u32::from_be_bytes([frame[1], frame[2], frame[3], frame[4]]);
            assert_eq!(id, 42, "schema ID in header");
        }

        // Also verify round-trip works with schema ID 42.
        let mut decoder = AvroDeserializer::new();
        decoder
            .register_schema(42, &arrow_to_avro_schema(&schema, "test").unwrap())
            .unwrap();

        let views: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();
        let out = decoder.deserialize_batch(&views, &schema).unwrap();
        assert_eq!(out.num_rows(), 2);
    }

    #[test]
    fn test_roundtrip_empty_batch() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        let encoded = AvroSerializer::new(schema.clone(), 1)
            .serialize(&RecordBatch::new_empty(schema))
            .unwrap();
        assert!(encoded.is_empty());
    }

    #[test]
    fn test_roundtrip_many_rows() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("idx", DataType::Int64, false),
            Field::new("label", DataType::Utf8, false),
        ]));
        let n: i64 = 100;
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from((0..n).collect::<Vec<_>>())),
                Arc::new(StringArray::from(
                    (0..n).map(|i| format!("row-{i}")).collect::<Vec<_>>(),
                )),
            ],
        )
        .unwrap();

        let out = roundtrip(&batch, &schema);
        assert_eq!(out.num_rows(), n as usize);

        let ids = col::<Int64Array>(&out, 0);
        for i in 0..n as usize {
            assert_eq!(ids.value(i), i as i64);
        }
    }
}
657
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_register_kafka_source() {
        let reg = ConnectorRegistry::new();
        register_kafka_source(&reg);

        assert!(reg.list_sources().contains(&"kafka".to_string()));

        let info = reg.source_info("kafka");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "kafka");
        assert!(info.is_source);
        assert!(!info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_factory_creates_source() {
        let reg = ConnectorRegistry::new();
        register_kafka_source(&reg);

        let cfg = crate::config::ConnectorConfig::new("kafka");
        assert!(reg.create_source(&cfg).is_ok());
    }

    #[test]
    fn test_register_kafka_sink() {
        let reg = ConnectorRegistry::new();
        register_kafka_sink(&reg);

        assert!(reg.list_sinks().contains(&"kafka".to_string()));

        let info = reg.sink_info("kafka");
        assert!(info.is_some());
        let info = info.unwrap();
        assert_eq!(info.name, "kafka");
        assert!(!info.is_source);
        assert!(info.is_sink);
        assert!(!info.config_keys.is_empty());
    }

    #[test]
    fn test_factory_creates_sink() {
        let reg = ConnectorRegistry::new();
        register_kafka_sink(&reg);

        let cfg = crate::config::ConnectorConfig::new("kafka");
        assert!(reg.create_sink(&cfg).is_ok());
    }
}