1use std::collections::HashMap;
4use std::sync::Arc;
5
6use arrow_schema::SchemaRef;
7use async_trait::async_trait;
8use parking_lot::RwLock;
9
10use crate::config::{ConnectorConfig, ConnectorInfo};
11use crate::connector::{SinkConnector, SourceConnector};
12use crate::error::ConnectorError;
13use crate::reference::ReferenceTableSource;
14use crate::serde::{self, Format, RecordDeserializer, RecordSerializer};
15
16pub type SourceFactory =
21 Arc<dyn Fn(Option<&prometheus::Registry>) -> Box<dyn SourceConnector> + Send + Sync>;
22
23pub type SinkFactory =
28 Arc<dyn Fn(Option<&prometheus::Registry>) -> Box<dyn SinkConnector> + Send + Sync>;
29
30pub type TableSourceFactory = Arc<
32 dyn Fn(&ConnectorConfig) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> + Send + Sync,
33>;
34
35#[async_trait]
42pub trait LookupSourceFactory: Send + Sync {
43 async fn build(
50 &self,
51 config: ConnectorConfig,
52 declared_schema: Option<arrow_schema::SchemaRef>,
53 ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, ConnectorError>;
54}
55
56#[derive(Clone)]
60pub struct ConnectorRegistry {
61 sources: Arc<RwLock<HashMap<String, (ConnectorInfo, SourceFactory)>>>,
62 sinks: Arc<RwLock<HashMap<String, (ConnectorInfo, SinkFactory)>>>,
63 table_sources: Arc<RwLock<HashMap<String, (ConnectorInfo, TableSourceFactory)>>>,
64 lookup_sources: Arc<RwLock<HashMap<String, Arc<dyn LookupSourceFactory>>>>,
65}
66
67impl ConnectorRegistry {
68 #[must_use]
70 pub fn new() -> Self {
71 Self {
72 sources: Arc::new(RwLock::new(HashMap::new())),
73 sinks: Arc::new(RwLock::new(HashMap::new())),
74 table_sources: Arc::new(RwLock::new(HashMap::new())),
75 lookup_sources: Arc::new(RwLock::new(HashMap::new())),
76 }
77 }
78
79 pub fn register_source(
81 &self,
82 name: impl Into<String>,
83 info: ConnectorInfo,
84 factory: SourceFactory,
85 ) {
86 self.sources.write().insert(name.into(), (info, factory));
87 }
88
89 pub fn register_sink(
91 &self,
92 name: impl Into<String>,
93 info: ConnectorInfo,
94 factory: SinkFactory,
95 ) {
96 self.sinks.write().insert(name.into(), (info, factory));
97 }
98
99 pub async fn default_source_schema(
114 &self,
115 connector_type: &str,
116 properties: &std::collections::HashMap<String, String>,
117 ) -> Result<Option<SchemaRef>, ConnectorError> {
118 let factory = {
119 let sources = self.sources.read();
120 let Some((_, factory)) = sources.get(connector_type) else {
121 return Ok(None);
122 };
123 factory.clone()
124 };
125
126 let mut instance = factory(None);
127 instance.discover_schema(properties).await?;
128 let schema = instance.schema();
129 Ok((!schema.fields().is_empty()).then_some(schema))
130 }
131
132 pub fn create_source(
144 &self,
145 config: &ConnectorConfig,
146 registry: Option<&prometheus::Registry>,
147 ) -> Result<Box<dyn SourceConnector>, ConnectorError> {
148 let sources = self.sources.read();
149 let (_, factory) = sources.get(config.connector_type()).ok_or_else(|| {
150 ConnectorError::ConfigurationError(format!(
151 "unknown source connector type: '{}'",
152 config.connector_type()
153 ))
154 })?;
155 Ok(factory(registry))
156 }
157
158 pub fn create_sink(
170 &self,
171 config: &ConnectorConfig,
172 registry: Option<&prometheus::Registry>,
173 ) -> Result<Box<dyn SinkConnector>, ConnectorError> {
174 let sinks = self.sinks.read();
175 let (_, factory) = sinks.get(config.connector_type()).ok_or_else(|| {
176 ConnectorError::ConfigurationError(format!(
177 "unknown sink connector type: '{}'",
178 config.connector_type()
179 ))
180 })?;
181 Ok(factory(registry))
182 }
183
184 pub fn register_table_source(
186 &self,
187 name: impl Into<String>,
188 info: ConnectorInfo,
189 factory: TableSourceFactory,
190 ) {
191 self.table_sources
192 .write()
193 .insert(name.into(), (info, factory));
194 }
195
196 pub fn create_table_source(
205 &self,
206 config: &ConnectorConfig,
207 ) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> {
208 let table_sources = self.table_sources.read();
209 let (_, factory) = table_sources.get(config.connector_type()).ok_or_else(|| {
210 ConnectorError::ConfigurationError(format!(
211 "unknown table source connector type: '{}'",
212 config.connector_type()
213 ))
214 })?;
215 factory(config)
216 }
217
218 #[must_use]
220 pub fn list_table_sources(&self) -> Vec<String> {
221 self.table_sources.read().keys().cloned().collect()
222 }
223
224 pub fn register_lookup_source(
226 &self,
227 name: impl Into<String>,
228 factory: Arc<dyn LookupSourceFactory>,
229 ) {
230 self.lookup_sources.write().insert(name.into(), factory);
231 }
232
233 pub async fn create_lookup_source(
238 &self,
239 config: ConnectorConfig,
240 declared_schema: Option<arrow_schema::SchemaRef>,
241 ) -> Option<Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, ConnectorError>>
242 {
243 let factory = {
244 let lookup_sources = self.lookup_sources.read();
245 Arc::clone(lookup_sources.get(config.connector_type())?)
246 };
247 Some(factory.build(config, declared_schema).await)
248 }
249
250 #[must_use]
252 pub fn source_info(&self, name: &str) -> Option<ConnectorInfo> {
253 self.sources.read().get(name).map(|(info, _)| info.clone())
254 }
255
256 #[must_use]
258 pub fn sink_info(&self, name: &str) -> Option<ConnectorInfo> {
259 self.sinks.read().get(name).map(|(info, _)| info.clone())
260 }
261
262 #[must_use]
264 pub fn list_sources(&self) -> Vec<String> {
265 self.sources.read().keys().cloned().collect()
266 }
267
268 #[must_use]
270 pub fn list_sinks(&self) -> Vec<String> {
271 self.sinks.read().keys().cloned().collect()
272 }
273
274 pub fn create_deserializer(
280 &self,
281 format: &str,
282 ) -> Result<Box<dyn RecordDeserializer>, ConnectorError> {
283 let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
284 serde::create_deserializer(fmt).map_err(ConnectorError::Serde)
285 }
286
287 pub fn create_serializer(
293 &self,
294 format: &str,
295 ) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
296 let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
297 serde::create_serializer(fmt).map_err(ConnectorError::Serde)
298 }
299}
300
301impl Default for ConnectorRegistry {
302 fn default() -> Self {
303 Self::new()
304 }
305}
306
307impl std::fmt::Debug for ConnectorRegistry {
308 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
309 f.debug_struct("ConnectorRegistry")
310 .field("sources", &self.list_sources())
311 .field("sinks", &self.list_sinks())
312 .field("table_sources", &self.list_table_sources())
313 .finish()
314 }
315}
316
// Unit tests for `ConnectorRegistry`. They use mock connectors from
// `crate::testing` and `crate::reference`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::*;

    // Builds a minimal `ConnectorInfo` for tests. `display_name` reuses `name`
    // and `config_keys` is left empty.
    fn mock_info(name: &str, is_source: bool, is_sink: bool) -> ConnectorInfo {
        ConnectorInfo {
            name: name.to_string(),
            display_name: name.to_string(),
            version: "0.1.0".to_string(),
            is_source,
            is_sink,
            config_keys: vec![],
        }
    }

    // A registered source factory can be instantiated via `create_source`.
    #[test]
    fn test_register_and_create_source() {
        let registry = ConnectorRegistry::new();
        registry.register_source(
            "mock",
            mock_info("mock", true, false),
            Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new())),
        );

        let config = ConnectorConfig::new("mock");
        let connector = registry.create_source(&config, None);
        assert!(connector.is_ok());
    }

    // A registered sink factory can be instantiated via `create_sink`.
    #[test]
    fn test_register_and_create_sink() {
        let registry = ConnectorRegistry::new();
        registry.register_sink(
            "mock",
            mock_info("mock", false, true),
            Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSinkConnector::new())),
        );

        let config = ConnectorConfig::new("mock");
        let connector = registry.create_sink(&config, None);
        assert!(connector.is_ok());
    }

    // Unregistered connector types yield an error for both sources and sinks.
    #[test]
    fn test_create_unknown_connector() {
        let registry = ConnectorRegistry::new();
        let config = ConnectorConfig::new("nonexistent");

        assert!(registry.create_source(&config, None).is_err());
        assert!(registry.create_sink(&config, None).is_err());
    }

    // Source and sink registrations are listed separately; each list contains
    // only its own category.
    #[test]
    fn test_list_connectors() {
        let registry = ConnectorRegistry::new();
        registry.register_source(
            "kafka",
            mock_info("kafka", true, false),
            Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new())),
        );
        registry.register_sink(
            "delta",
            mock_info("delta", false, true),
            Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSinkConnector::new())),
        );

        let sources = registry.list_sources();
        assert_eq!(sources.len(), 1);
        assert!(sources.contains(&"kafka".to_string()));

        let sinks = registry.list_sinks();
        assert_eq!(sinks.len(), 1);
        assert!(sinks.contains(&"delta".to_string()));
    }

    // `source_info` returns the stored metadata, or `None` for unknown names.
    #[test]
    fn test_connector_info() {
        let registry = ConnectorRegistry::new();
        registry.register_source(
            "kafka",
            mock_info("kafka", true, false),
            Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new())),
        );

        let info = registry.source_info("kafka");
        assert!(info.is_some());
        assert_eq!(info.unwrap().name, "kafka");

        assert!(registry.source_info("nonexistent").is_none());
    }

    // Known formats ("json", "csv") resolve; an unknown format name errors.
    #[test]
    fn test_format_registry() {
        let registry = ConnectorRegistry::new();

        assert!(registry.create_deserializer("json").is_ok());
        assert!(registry.create_serializer("csv").is_ok());
        assert!(registry.create_deserializer("unknown").is_err());
    }

    // Schema discovery through a registered source yields a non-empty schema.
    // NOTE(review): this assumes `MockSourceConnector` reports at least one
    // field after `discover_schema`; the mock is defined outside this file.
    #[tokio::test]
    async fn default_source_schema_some_when_discovered() {
        let registry = ConnectorRegistry::new();
        registry.register_source(
            "mock",
            mock_info("mock", true, false),
            Arc::new(|_: Option<&prometheus::Registry>| Box::new(MockSourceConnector::new())),
        );
        let schema = registry
            .default_source_schema("mock", &std::collections::HashMap::new())
            .await
            .expect("discovery must not fail");
        assert!(schema.is_some_and(|s| !s.fields().is_empty()));
    }

    // An unregistered connector type is reported as `Ok(None)`, not an error.
    #[tokio::test]
    async fn default_source_schema_none_for_unknown_connector() {
        let registry = ConnectorRegistry::new();
        assert!(registry
            .default_source_schema("nope", &std::collections::HashMap::new())
            .await
            .expect("unknown connector is Ok(None), not Err")
            .is_none());
    }

    // A registered table-source factory can be built via `create_table_source`.
    #[test]
    fn test_register_and_create_table_source() {
        use crate::reference::MockReferenceTableSource;

        let registry = ConnectorRegistry::new();
        registry.register_table_source(
            "mock",
            mock_info("mock", true, false),
            Arc::new(|_config| Ok(Box::new(MockReferenceTableSource::empty()))),
        );

        let config = ConnectorConfig::new("mock");
        let source = registry.create_table_source(&config);
        assert!(source.is_ok());
    }

    // An unregistered table source errors, and the message names the category.
    #[test]
    fn test_create_unknown_table_source() {
        let registry = ConnectorRegistry::new();
        let config = ConnectorConfig::new("nonexistent");
        let result = registry.create_table_source(&config);
        match result {
            Err(e) => assert!(e.to_string().contains("unknown table source"), "got: {e}"),
            Ok(_) => panic!("Expected error for unknown table source"),
        }
    }

    // The table-source listing starts empty and reflects new registrations.
    #[test]
    fn test_list_table_sources() {
        use crate::reference::MockReferenceTableSource;

        let registry = ConnectorRegistry::new();
        assert!(registry.list_table_sources().is_empty());

        registry.register_table_source(
            "mock-table",
            mock_info("mock-table", true, false),
            Arc::new(|_config| Ok(Box::new(MockReferenceTableSource::empty()))),
        );

        let names = registry.list_table_sources();
        assert_eq!(names.len(), 1);
        assert!(names.contains(&"mock-table".to_string()));
    }
}