1use std::collections::HashMap;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use crate::config::{ConnectorConfig, ConnectorInfo};
11use crate::connector::{SinkConnector, SourceConnector};
12use crate::error::ConnectorError;
13use crate::reference::ReferenceTableSource;
14use crate::serde::{self, Format, RecordDeserializer, RecordSerializer};
15
16pub type SourceFactory = Arc<dyn Fn() -> Box<dyn SourceConnector> + Send + Sync>;
18
19pub type SinkFactory = Arc<dyn Fn() -> Box<dyn SinkConnector> + Send + Sync>;
21
22pub type TableSourceFactory = Arc<
24 dyn Fn(&ConnectorConfig) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> + Send + Sync,
25>;
26
27#[derive(Clone)]
44pub struct ConnectorRegistry {
45 sources: Arc<RwLock<HashMap<String, (ConnectorInfo, SourceFactory)>>>,
46 sinks: Arc<RwLock<HashMap<String, (ConnectorInfo, SinkFactory)>>>,
47 table_sources: Arc<RwLock<HashMap<String, (ConnectorInfo, TableSourceFactory)>>>,
48}
49
50impl ConnectorRegistry {
51 #[must_use]
53 pub fn new() -> Self {
54 Self {
55 sources: Arc::new(RwLock::new(HashMap::new())),
56 sinks: Arc::new(RwLock::new(HashMap::new())),
57 table_sources: Arc::new(RwLock::new(HashMap::new())),
58 }
59 }
60
61 pub fn register_source(
63 &self,
64 name: impl Into<String>,
65 info: ConnectorInfo,
66 factory: SourceFactory,
67 ) {
68 self.sources.write().insert(name.into(), (info, factory));
69 }
70
71 pub fn register_sink(
73 &self,
74 name: impl Into<String>,
75 info: ConnectorInfo,
76 factory: SinkFactory,
77 ) {
78 self.sinks.write().insert(name.into(), (info, factory));
79 }
80
81 pub fn create_source(
93 &self,
94 config: &ConnectorConfig,
95 ) -> Result<Box<dyn SourceConnector>, ConnectorError> {
96 let sources = self.sources.read();
97 let (_, factory) = sources.get(config.connector_type()).ok_or_else(|| {
98 ConnectorError::ConfigurationError(format!(
99 "unknown source connector type: '{}'",
100 config.connector_type()
101 ))
102 })?;
103 Ok(factory())
104 }
105
106 pub fn create_sink(
115 &self,
116 config: &ConnectorConfig,
117 ) -> Result<Box<dyn SinkConnector>, ConnectorError> {
118 let sinks = self.sinks.read();
119 let (_, factory) = sinks.get(config.connector_type()).ok_or_else(|| {
120 ConnectorError::ConfigurationError(format!(
121 "unknown sink connector type: '{}'",
122 config.connector_type()
123 ))
124 })?;
125 Ok(factory())
126 }
127
128 pub fn register_table_source(
130 &self,
131 name: impl Into<String>,
132 info: ConnectorInfo,
133 factory: TableSourceFactory,
134 ) {
135 self.table_sources
136 .write()
137 .insert(name.into(), (info, factory));
138 }
139
140 pub fn create_table_source(
149 &self,
150 config: &ConnectorConfig,
151 ) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> {
152 let table_sources = self.table_sources.read();
153 let (_, factory) = table_sources.get(config.connector_type()).ok_or_else(|| {
154 ConnectorError::ConfigurationError(format!(
155 "unknown table source connector type: '{}'",
156 config.connector_type()
157 ))
158 })?;
159 factory(config)
160 }
161
162 #[must_use]
164 pub fn list_table_sources(&self) -> Vec<String> {
165 self.table_sources.read().keys().cloned().collect()
166 }
167
168 #[must_use]
170 pub fn source_info(&self, name: &str) -> Option<ConnectorInfo> {
171 self.sources.read().get(name).map(|(info, _)| info.clone())
172 }
173
174 #[must_use]
176 pub fn sink_info(&self, name: &str) -> Option<ConnectorInfo> {
177 self.sinks.read().get(name).map(|(info, _)| info.clone())
178 }
179
180 #[must_use]
182 pub fn list_sources(&self) -> Vec<String> {
183 self.sources.read().keys().cloned().collect()
184 }
185
186 #[must_use]
188 pub fn list_sinks(&self) -> Vec<String> {
189 self.sinks.read().keys().cloned().collect()
190 }
191
192 pub fn create_deserializer(
198 &self,
199 format: &str,
200 ) -> Result<Box<dyn RecordDeserializer>, ConnectorError> {
201 let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
202 serde::create_deserializer(fmt).map_err(ConnectorError::Serde)
203 }
204
205 pub fn create_serializer(
211 &self,
212 format: &str,
213 ) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
214 let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
215 serde::create_serializer(fmt).map_err(ConnectorError::Serde)
216 }
217}
218
219impl Default for ConnectorRegistry {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
225impl std::fmt::Debug for ConnectorRegistry {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 f.debug_struct("ConnectorRegistry")
228 .field("sources", &self.list_sources())
229 .field("sinks", &self.list_sinks())
230 .field("table_sources", &self.list_table_sources())
231 .finish()
232 }
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238 use crate::testing::*;
239
240 fn mock_info(name: &str, is_source: bool, is_sink: bool) -> ConnectorInfo {
241 ConnectorInfo {
242 name: name.to_string(),
243 display_name: name.to_string(),
244 version: "0.1.0".to_string(),
245 is_source,
246 is_sink,
247 config_keys: vec![],
248 }
249 }
250
251 #[test]
252 fn test_register_and_create_source() {
253 let registry = ConnectorRegistry::new();
254 registry.register_source(
255 "mock",
256 mock_info("mock", true, false),
257 Arc::new(|| Box::new(MockSourceConnector::new())),
258 );
259
260 let config = ConnectorConfig::new("mock");
261 let connector = registry.create_source(&config);
262 assert!(connector.is_ok());
263 }
264
265 #[test]
266 fn test_register_and_create_sink() {
267 let registry = ConnectorRegistry::new();
268 registry.register_sink(
269 "mock",
270 mock_info("mock", false, true),
271 Arc::new(|| Box::new(MockSinkConnector::new())),
272 );
273
274 let config = ConnectorConfig::new("mock");
275 let connector = registry.create_sink(&config);
276 assert!(connector.is_ok());
277 }
278
279 #[test]
280 fn test_create_unknown_connector() {
281 let registry = ConnectorRegistry::new();
282 let config = ConnectorConfig::new("nonexistent");
283
284 assert!(registry.create_source(&config).is_err());
285 assert!(registry.create_sink(&config).is_err());
286 }
287
288 #[test]
289 fn test_list_connectors() {
290 let registry = ConnectorRegistry::new();
291 registry.register_source(
292 "kafka",
293 mock_info("kafka", true, false),
294 Arc::new(|| Box::new(MockSourceConnector::new())),
295 );
296 registry.register_sink(
297 "delta",
298 mock_info("delta", false, true),
299 Arc::new(|| Box::new(MockSinkConnector::new())),
300 );
301
302 let sources = registry.list_sources();
303 assert_eq!(sources.len(), 1);
304 assert!(sources.contains(&"kafka".to_string()));
305
306 let sinks = registry.list_sinks();
307 assert_eq!(sinks.len(), 1);
308 assert!(sinks.contains(&"delta".to_string()));
309 }
310
311 #[test]
312 fn test_connector_info() {
313 let registry = ConnectorRegistry::new();
314 registry.register_source(
315 "kafka",
316 mock_info("kafka", true, false),
317 Arc::new(|| Box::new(MockSourceConnector::new())),
318 );
319
320 let info = registry.source_info("kafka");
321 assert!(info.is_some());
322 assert_eq!(info.unwrap().name, "kafka");
323
324 assert!(registry.source_info("nonexistent").is_none());
325 }
326
327 #[test]
328 fn test_format_registry() {
329 let registry = ConnectorRegistry::new();
330
331 assert!(registry.create_deserializer("json").is_ok());
332 assert!(registry.create_serializer("csv").is_ok());
333 assert!(registry.create_deserializer("unknown").is_err());
334 }
335
336 #[test]
339 fn test_register_and_create_table_source() {
340 use crate::reference::MockReferenceTableSource;
341
342 let registry = ConnectorRegistry::new();
343 registry.register_table_source(
344 "mock",
345 mock_info("mock", true, false),
346 Arc::new(|_config| Ok(Box::new(MockReferenceTableSource::empty()))),
347 );
348
349 let config = ConnectorConfig::new("mock");
350 let source = registry.create_table_source(&config);
351 assert!(source.is_ok());
352 }
353
354 #[test]
355 fn test_create_unknown_table_source() {
356 let registry = ConnectorRegistry::new();
357 let config = ConnectorConfig::new("nonexistent");
358 let result = registry.create_table_source(&config);
359 match result {
360 Err(e) => assert!(e.to_string().contains("unknown table source"), "got: {e}"),
361 Ok(_) => panic!("Expected error for unknown table source"),
362 }
363 }
364
365 #[test]
366 fn test_list_table_sources() {
367 use crate::reference::MockReferenceTableSource;
368
369 let registry = ConnectorRegistry::new();
370 assert!(registry.list_table_sources().is_empty());
371
372 registry.register_table_source(
373 "mock-table",
374 mock_info("mock-table", true, false),
375 Arc::new(|_config| Ok(Box::new(MockReferenceTableSource::empty()))),
376 );
377
378 let names = registry.list_table_sources();
379 assert_eq!(names.len(), 1);
380 assert!(names.contains(&"mock-table".to_string()));
381 }
382}