1use std::collections::HashMap;
4use std::sync::Arc;
5
6use arrow_schema::SchemaRef;
7use async_trait::async_trait;
8use parking_lot::RwLock;
9
10use crate::config::{ConnectorConfig, ConnectorInfo};
11use crate::connector::{SinkConnector, SourceConnector};
12use crate::error::ConnectorError;
13use crate::reference::ReferenceTableSource;
14use crate::serde::{self, Format, RecordDeserializer, RecordSerializer};
15
/// Shared factory closure that produces a boxed [`SourceConnector`].
///
/// The closure is called with an optional reference to a Prometheus registry
/// (presumably so the connector can register its metrics — confirm against the
/// connector implementations). The `Send + Sync` bounds allow the factory to
/// be shared across threads behind the [`Arc`].
pub type SourceFactory =
    Arc<dyn Fn(Option<&prometheus::Registry>) -> Box<dyn SourceConnector> + Send + Sync>;
22
/// Shared factory closure that produces a boxed [`SinkConnector`].
///
/// The closure is called with an optional reference to a Prometheus registry
/// (presumably so the connector can register its metrics — confirm against the
/// connector implementations). The `Send + Sync` bounds allow the factory to
/// be shared across threads behind the [`Arc`].
pub type SinkFactory =
    Arc<dyn Fn(Option<&prometheus::Registry>) -> Box<dyn SinkConnector> + Send + Sync>;
29
/// Shared, fallible factory closure that produces a boxed [`ReferenceTableSource`].
///
/// Unlike [`SourceFactory`] and [`SinkFactory`], this factory receives the
/// [`ConnectorConfig`] itself and may fail with a [`ConnectorError`].
pub type TableSourceFactory = Arc<
    dyn Fn(&ConnectorConfig) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> + Send + Sync,
>;
34
/// Asynchronous factory for lookup sources.
///
/// Implementations must be `Send + Sync`; the registry stores them as
/// `Arc<dyn LookupSourceFactory>`.
#[async_trait]
pub trait LookupSourceFactory: Send + Sync {
    /// Builds a lookup source from `config`, which is taken by value.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectorError`] when the implementation cannot build the
    /// lookup source.
    async fn build(
        &self,
        config: ConnectorConfig,
    ) -> Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, ConnectorError>;
}
49
/// Registry mapping connector type names to the factories that build them.
///
/// Every map lives behind an `Arc<RwLock<..>>`, so cloning the registry
/// (via the derived `Clone`) yields a handle to the same underlying maps
/// rather than an independent copy.
#[derive(Clone)]
pub struct ConnectorRegistry {
    /// Source connector factories and their metadata, keyed by registered name.
    sources: Arc<RwLock<HashMap<String, (ConnectorInfo, SourceFactory)>>>,
    /// Sink connector factories and their metadata, keyed by registered name.
    sinks: Arc<RwLock<HashMap<String, (ConnectorInfo, SinkFactory)>>>,
    /// Reference table source factories and their metadata, keyed by registered name.
    table_sources: Arc<RwLock<HashMap<String, (ConnectorInfo, TableSourceFactory)>>>,
    /// Lookup source factories, keyed by registered name (no metadata is stored).
    lookup_sources: Arc<RwLock<HashMap<String, Arc<dyn LookupSourceFactory>>>>,
}
60
61impl ConnectorRegistry {
62 #[must_use]
64 pub fn new() -> Self {
65 Self {
66 sources: Arc::new(RwLock::new(HashMap::new())),
67 sinks: Arc::new(RwLock::new(HashMap::new())),
68 table_sources: Arc::new(RwLock::new(HashMap::new())),
69 lookup_sources: Arc::new(RwLock::new(HashMap::new())),
70 }
71 }
72
73 pub fn register_source(
75 &self,
76 name: impl Into<String>,
77 info: ConnectorInfo,
78 factory: SourceFactory,
79 ) {
80 self.sources.write().insert(name.into(), (info, factory));
81 }
82
83 pub fn register_sink(
85 &self,
86 name: impl Into<String>,
87 info: ConnectorInfo,
88 factory: SinkFactory,
89 ) {
90 self.sinks.write().insert(name.into(), (info, factory));
91 }
92
93 pub async fn default_source_schema(
97 &self,
98 connector_type: &str,
99 properties: &std::collections::HashMap<String, String>,
100 ) -> Option<SchemaRef> {
101 let factory = {
104 let sources = self.sources.read();
105 let (_, factory) = sources.get(connector_type)?;
106 factory.clone()
107 };
108
109 let mut instance = factory(None);
110 instance.discover_schema(properties).await;
111 let schema = instance.schema();
112 if schema.fields().is_empty() {
113 None
114 } else {
115 Some(schema)
116 }
117 }
118
119 pub fn create_source(
131 &self,
132 config: &ConnectorConfig,
133 registry: Option<&prometheus::Registry>,
134 ) -> Result<Box<dyn SourceConnector>, ConnectorError> {
135 let sources = self.sources.read();
136 let (_, factory) = sources.get(config.connector_type()).ok_or_else(|| {
137 ConnectorError::ConfigurationError(format!(
138 "unknown source connector type: '{}'",
139 config.connector_type()
140 ))
141 })?;
142 Ok(factory(registry))
143 }
144
145 pub fn create_sink(
157 &self,
158 config: &ConnectorConfig,
159 registry: Option<&prometheus::Registry>,
160 ) -> Result<Box<dyn SinkConnector>, ConnectorError> {
161 let sinks = self.sinks.read();
162 let (_, factory) = sinks.get(config.connector_type()).ok_or_else(|| {
163 ConnectorError::ConfigurationError(format!(
164 "unknown sink connector type: '{}'",
165 config.connector_type()
166 ))
167 })?;
168 Ok(factory(registry))
169 }
170
171 pub fn register_table_source(
173 &self,
174 name: impl Into<String>,
175 info: ConnectorInfo,
176 factory: TableSourceFactory,
177 ) {
178 self.table_sources
179 .write()
180 .insert(name.into(), (info, factory));
181 }
182
183 pub fn create_table_source(
192 &self,
193 config: &ConnectorConfig,
194 ) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> {
195 let table_sources = self.table_sources.read();
196 let (_, factory) = table_sources.get(config.connector_type()).ok_or_else(|| {
197 ConnectorError::ConfigurationError(format!(
198 "unknown table source connector type: '{}'",
199 config.connector_type()
200 ))
201 })?;
202 factory(config)
203 }
204
205 #[must_use]
207 pub fn list_table_sources(&self) -> Vec<String> {
208 self.table_sources.read().keys().cloned().collect()
209 }
210
211 pub fn register_lookup_source(
213 &self,
214 name: impl Into<String>,
215 factory: Arc<dyn LookupSourceFactory>,
216 ) {
217 self.lookup_sources.write().insert(name.into(), factory);
218 }
219
220 pub async fn create_lookup_source(
225 &self,
226 config: ConnectorConfig,
227 ) -> Option<Result<Arc<dyn laminar_core::lookup::source::LookupSourceDyn>, ConnectorError>>
228 {
229 let factory = {
230 let lookup_sources = self.lookup_sources.read();
231 Arc::clone(lookup_sources.get(config.connector_type())?)
232 };
233 Some(factory.build(config).await)
234 }
235
236 #[must_use]
238 pub fn source_info(&self, name: &str) -> Option<ConnectorInfo> {
239 self.sources.read().get(name).map(|(info, _)| info.clone())
240 }
241
242 #[must_use]
244 pub fn sink_info(&self, name: &str) -> Option<ConnectorInfo> {
245 self.sinks.read().get(name).map(|(info, _)| info.clone())
246 }
247
248 #[must_use]
250 pub fn list_sources(&self) -> Vec<String> {
251 self.sources.read().keys().cloned().collect()
252 }
253
254 #[must_use]
256 pub fn list_sinks(&self) -> Vec<String> {
257 self.sinks.read().keys().cloned().collect()
258 }
259
260 pub fn create_deserializer(
266 &self,
267 format: &str,
268 ) -> Result<Box<dyn RecordDeserializer>, ConnectorError> {
269 let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
270 serde::create_deserializer(fmt).map_err(ConnectorError::Serde)
271 }
272
273 pub fn create_serializer(
279 &self,
280 format: &str,
281 ) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
282 let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
283 serde::create_serializer(fmt).map_err(ConnectorError::Serde)
284 }
285}
286
287impl Default for ConnectorRegistry {
288 fn default() -> Self {
289 Self::new()
290 }
291}
292
293impl std::fmt::Debug for ConnectorRegistry {
294 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
295 f.debug_struct("ConnectorRegistry")
296 .field("sources", &self.list_sources())
297 .field("sinks", &self.list_sinks())
298 .field("table_sources", &self.list_table_sources())
299 .finish()
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::*;

    /// Builds connector metadata whose `name` and `display_name` are both `name`.
    fn mock_info(name: &str, is_source: bool, is_sink: bool) -> ConnectorInfo {
        let label = name.to_string();
        ConnectorInfo {
            name: label.clone(),
            display_name: label,
            version: "0.1.0".to_string(),
            is_source,
            is_sink,
            config_keys: vec![],
        }
    }

    /// Factory that ignores the metrics registry and yields a mock source.
    fn mock_source_factory() -> SourceFactory {
        Arc::new(
            |_: Option<&prometheus::Registry>| -> Box<dyn SourceConnector> {
                Box::new(MockSourceConnector::new())
            },
        )
    }

    /// Factory that ignores the metrics registry and yields a mock sink.
    fn mock_sink_factory() -> SinkFactory {
        Arc::new(
            |_: Option<&prometheus::Registry>| -> Box<dyn SinkConnector> {
                Box::new(MockSinkConnector::new())
            },
        )
    }

    /// Factory that ignores the config and yields an empty mock table source.
    fn mock_table_source_factory() -> TableSourceFactory {
        use crate::reference::MockReferenceTableSource;

        Arc::new(
            |_config: &ConnectorConfig| -> Result<Box<dyn ReferenceTableSource>, ConnectorError> {
                Ok(Box::new(MockReferenceTableSource::empty()))
            },
        )
    }

    #[test]
    fn test_register_and_create_source() {
        let registry = ConnectorRegistry::new();
        registry.register_source("mock", mock_info("mock", true, false), mock_source_factory());

        let created = registry.create_source(&ConnectorConfig::new("mock"), None);
        assert!(created.is_ok());
    }

    #[test]
    fn test_register_and_create_sink() {
        let registry = ConnectorRegistry::new();
        registry.register_sink("mock", mock_info("mock", false, true), mock_sink_factory());

        let created = registry.create_sink(&ConnectorConfig::new("mock"), None);
        assert!(created.is_ok());
    }

    #[test]
    fn test_create_unknown_connector() {
        let registry = ConnectorRegistry::new();
        let missing = ConnectorConfig::new("nonexistent");

        let source_attempt = registry.create_source(&missing, None);
        assert!(source_attempt.is_err());

        let sink_attempt = registry.create_sink(&missing, None);
        assert!(sink_attempt.is_err());
    }

    #[test]
    fn test_list_connectors() {
        let registry = ConnectorRegistry::new();
        registry.register_source(
            "kafka",
            mock_info("kafka", true, false),
            mock_source_factory(),
        );
        registry.register_sink("delta", mock_info("delta", false, true), mock_sink_factory());

        // Exactly one entry per category, carrying the registered name.
        assert_eq!(registry.list_sources(), vec!["kafka".to_string()]);
        assert_eq!(registry.list_sinks(), vec!["delta".to_string()]);
    }

    #[test]
    fn test_connector_info() {
        let registry = ConnectorRegistry::new();
        registry.register_source(
            "kafka",
            mock_info("kafka", true, false),
            mock_source_factory(),
        );

        let known_name = registry.source_info("kafka").map(|info| info.name);
        assert_eq!(known_name.as_deref(), Some("kafka"));

        assert!(registry.source_info("nonexistent").is_none());
    }

    #[test]
    fn test_format_registry() {
        let registry = ConnectorRegistry::new();

        let json_deserializer = registry.create_deserializer("json");
        assert!(json_deserializer.is_ok());

        let csv_serializer = registry.create_serializer("csv");
        assert!(csv_serializer.is_ok());

        let unknown_deserializer = registry.create_deserializer("unknown");
        assert!(unknown_deserializer.is_err());
    }

    #[tokio::test]
    async fn default_source_schema_some_when_discovered() {
        let registry = ConnectorRegistry::new();
        registry.register_source("mock", mock_info("mock", true, false), mock_source_factory());

        let no_properties = std::collections::HashMap::new();
        let discovered = registry.default_source_schema("mock", &no_properties).await;

        match discovered {
            Some(schema) => assert!(!schema.fields().is_empty()),
            None => panic!("expected a discovered schema for the mock source"),
        }
    }

    #[tokio::test]
    async fn default_source_schema_none_for_unknown_connector() {
        let registry = ConnectorRegistry::new();
        let no_properties = std::collections::HashMap::new();

        let discovered = registry.default_source_schema("nope", &no_properties).await;
        assert!(discovered.is_none());
    }

    #[test]
    fn test_register_and_create_table_source() {
        let registry = ConnectorRegistry::new();
        registry.register_table_source(
            "mock",
            mock_info("mock", true, false),
            mock_table_source_factory(),
        );

        let created = registry.create_table_source(&ConnectorConfig::new("mock"));
        assert!(created.is_ok());
    }

    #[test]
    fn test_create_unknown_table_source() {
        let registry = ConnectorRegistry::new();
        let missing = ConnectorConfig::new("nonexistent");

        let Err(e) = registry.create_table_source(&missing) else {
            panic!("Expected error for unknown table source");
        };
        assert!(e.to_string().contains("unknown table source"), "got: {e}");
    }

    #[test]
    fn test_list_table_sources() {
        let registry = ConnectorRegistry::new();
        assert!(registry.list_table_sources().is_empty());

        registry.register_table_source(
            "mock-table",
            mock_info("mock-table", true, false),
            mock_table_source_factory(),
        );

        assert_eq!(
            registry.list_table_sources(),
            vec!["mock-table".to_string()]
        );
    }
}