// laminar_connectors/registry.rs

//! Connector registry with factory pattern.
//!
//! The [`ConnectorRegistry`] maintains a catalog of available connector
//! implementations and provides factory methods to instantiate them.
5use std::collections::HashMap;
6use std::sync::Arc;
7
8use parking_lot::RwLock;
9
10use crate::config::{ConnectorConfig, ConnectorInfo};
11use crate::connector::{SinkConnector, SourceConnector};
12use crate::error::ConnectorError;
13use crate::reference::ReferenceTableSource;
14use crate::serde::{self, Format, RecordDeserializer, RecordSerializer};
15
/// Factory function type for creating source connectors.
///
/// The callable takes no arguments and returns a freshly boxed
/// [`SourceConnector`]. It is wrapped in an [`Arc`] and bounded by
/// `Send + Sync`, so one factory can be cloned cheaply and shared
/// between threads.
pub type SourceFactory = Arc<dyn Fn() -> Box<dyn SourceConnector> + Send + Sync>;

/// Factory function type for creating sink connectors.
///
/// Same shape as [`SourceFactory`]: a shareable, argument-less callable,
/// here returning a boxed [`SinkConnector`].
pub type SinkFactory = Arc<dyn Fn() -> Box<dyn SinkConnector> + Send + Sync>;

/// Factory function type for creating reference table sources.
///
/// Unlike the source and sink factories, this callable receives the
/// [`ConnectorConfig`] and is fallible: it returns either a boxed
/// [`ReferenceTableSource`] or a [`ConnectorError`].
pub type TableSourceFactory = Arc<
    dyn Fn(&ConnectorConfig) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> + Send + Sync,
>;
26
/// Registry of available connector implementations.
///
/// Connectors register themselves with a factory function that creates
/// new instances. The runtime uses the registry to instantiate connectors
/// based on the `connector` property in the configuration.
///
/// Each catalog lives behind an `Arc<RwLock<..>>`, so cloning a registry
/// yields a handle to the same underlying catalogs rather than a copy, and
/// all methods take `&self`.
///
/// # Example
///
/// ```rust,ignore
/// use std::sync::Arc;
///
/// use laminar_connectors::registry::ConnectorRegistry;
///
/// let registry = ConnectorRegistry::new();
/// registry.register_source("kafka", info, Arc::new(|| Box::new(KafkaSource::new())));
///
/// // The connector type is read from `config`, not passed separately.
/// let connector = registry.create_source(&config)?;
/// ```
#[derive(Clone)]
pub struct ConnectorRegistry {
    /// Source connectors keyed by registered name.
    sources: Arc<RwLock<HashMap<String, (ConnectorInfo, SourceFactory)>>>,
    /// Sink connectors keyed by registered name.
    sinks: Arc<RwLock<HashMap<String, (ConnectorInfo, SinkFactory)>>>,
    /// Reference table sources keyed by registered name.
    table_sources: Arc<RwLock<HashMap<String, (ConnectorInfo, TableSourceFactory)>>>,
}
49
50impl ConnectorRegistry {
51    /// Creates a new empty registry.
52    #[must_use]
53    pub fn new() -> Self {
54        Self {
55            sources: Arc::new(RwLock::new(HashMap::new())),
56            sinks: Arc::new(RwLock::new(HashMap::new())),
57            table_sources: Arc::new(RwLock::new(HashMap::new())),
58        }
59    }
60
61    /// Registers a source connector factory.
62    pub fn register_source(
63        &self,
64        name: impl Into<String>,
65        info: ConnectorInfo,
66        factory: SourceFactory,
67    ) {
68        self.sources.write().insert(name.into(), (info, factory));
69    }
70
71    /// Registers a sink connector factory.
72    pub fn register_sink(
73        &self,
74        name: impl Into<String>,
75        info: ConnectorInfo,
76        factory: SinkFactory,
77    ) {
78        self.sinks.write().insert(name.into(), (info, factory));
79    }
80
81    /// Creates a new source connector instance.
82    ///
83    /// The connector type is determined by `config.connector_type()`.
84    ///
85    /// # Errors
86    ///
87    /// Returns `ConnectorError::ConfigurationError` if the connector type
88    /// is not registered.
89    ///
90    /// The factory creates a default-configured connector. The caller must
91    /// subsequently call `open(config)` to forward WITH clause properties.
92    pub fn create_source(
93        &self,
94        config: &ConnectorConfig,
95    ) -> Result<Box<dyn SourceConnector>, ConnectorError> {
96        let sources = self.sources.read();
97        let (_, factory) = sources.get(config.connector_type()).ok_or_else(|| {
98            ConnectorError::ConfigurationError(format!(
99                "unknown source connector type: '{}'",
100                config.connector_type()
101            ))
102        })?;
103        Ok(factory())
104    }
105
106    /// Creates a new sink connector instance.
107    ///
108    /// The connector type is determined by `config.connector_type()`.
109    ///
110    /// # Errors
111    ///
112    /// Returns `ConnectorError::ConfigurationError` if the connector type
113    /// is not registered.
114    pub fn create_sink(
115        &self,
116        config: &ConnectorConfig,
117    ) -> Result<Box<dyn SinkConnector>, ConnectorError> {
118        let sinks = self.sinks.read();
119        let (_, factory) = sinks.get(config.connector_type()).ok_or_else(|| {
120            ConnectorError::ConfigurationError(format!(
121                "unknown sink connector type: '{}'",
122                config.connector_type()
123            ))
124        })?;
125        Ok(factory())
126    }
127
128    /// Registers a reference table source factory.
129    pub fn register_table_source(
130        &self,
131        name: impl Into<String>,
132        info: ConnectorInfo,
133        factory: TableSourceFactory,
134    ) {
135        self.table_sources
136            .write()
137            .insert(name.into(), (info, factory));
138    }
139
140    /// Creates a new reference table source instance.
141    ///
142    /// The connector type is determined by `config.connector_type()`.
143    ///
144    /// # Errors
145    ///
146    /// Returns `ConnectorError::ConfigurationError` if the connector type
147    /// is not registered as a table source.
148    pub fn create_table_source(
149        &self,
150        config: &ConnectorConfig,
151    ) -> Result<Box<dyn ReferenceTableSource>, ConnectorError> {
152        let table_sources = self.table_sources.read();
153        let (_, factory) = table_sources.get(config.connector_type()).ok_or_else(|| {
154            ConnectorError::ConfigurationError(format!(
155                "unknown table source connector type: '{}'",
156                config.connector_type()
157            ))
158        })?;
159        factory(config)
160    }
161
162    /// Lists all registered table source connector names.
163    #[must_use]
164    pub fn list_table_sources(&self) -> Vec<String> {
165        self.table_sources.read().keys().cloned().collect()
166    }
167
168    /// Returns information about a registered source connector.
169    #[must_use]
170    pub fn source_info(&self, name: &str) -> Option<ConnectorInfo> {
171        self.sources.read().get(name).map(|(info, _)| info.clone())
172    }
173
174    /// Returns information about a registered sink connector.
175    #[must_use]
176    pub fn sink_info(&self, name: &str) -> Option<ConnectorInfo> {
177        self.sinks.read().get(name).map(|(info, _)| info.clone())
178    }
179
180    /// Lists all registered source connector names.
181    #[must_use]
182    pub fn list_sources(&self) -> Vec<String> {
183        self.sources.read().keys().cloned().collect()
184    }
185
186    /// Lists all registered sink connector names.
187    #[must_use]
188    pub fn list_sinks(&self) -> Vec<String> {
189        self.sinks.read().keys().cloned().collect()
190    }
191
192    /// Creates a deserializer for the given format string.
193    ///
194    /// # Errors
195    ///
196    /// Returns `ConnectorError::Serde` if the format is not supported.
197    pub fn create_deserializer(
198        &self,
199        format: &str,
200    ) -> Result<Box<dyn RecordDeserializer>, ConnectorError> {
201        let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
202        serde::create_deserializer(fmt).map_err(ConnectorError::Serde)
203    }
204
205    /// Creates a serializer for the given format string.
206    ///
207    /// # Errors
208    ///
209    /// Returns `ConnectorError::Serde` if the format is not supported.
210    pub fn create_serializer(
211        &self,
212        format: &str,
213    ) -> Result<Box<dyn RecordSerializer>, ConnectorError> {
214        let fmt = Format::parse(format).map_err(ConnectorError::Serde)?;
215        serde::create_serializer(fmt).map_err(ConnectorError::Serde)
216    }
217}
218
219impl Default for ConnectorRegistry {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
225impl std::fmt::Debug for ConnectorRegistry {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        f.debug_struct("ConnectorRegistry")
228            .field("sources", &self.list_sources())
229            .field("sinks", &self.list_sinks())
230            .field("table_sources", &self.list_table_sources())
231            .finish()
232    }
233}
234
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::*;

    /// Builds minimal connector metadata for a test registration.
    fn mock_info(name: &str, is_source: bool, is_sink: bool) -> ConnectorInfo {
        let label = name.to_string();
        ConnectorInfo {
            name: label.clone(),
            display_name: label,
            version: "0.1.0".to_string(),
            is_source,
            is_sink,
            config_keys: vec![],
        }
    }

    /// Factory that hands out a fresh mock source connector per call.
    fn mock_source_factory() -> SourceFactory {
        Arc::new(|| Box::new(MockSourceConnector::new()))
    }

    /// Factory that hands out a fresh mock sink connector per call.
    fn mock_sink_factory() -> SinkFactory {
        Arc::new(|| Box::new(MockSinkConnector::new()))
    }

    /// Factory that ignores the config and yields an empty mock table source.
    fn empty_table_source_factory() -> TableSourceFactory {
        use crate::reference::MockReferenceTableSource;

        Arc::new(|_config| Ok(Box::new(MockReferenceTableSource::empty())))
    }

    #[test]
    fn test_register_and_create_source() {
        let registry = ConnectorRegistry::new();
        registry.register_source("mock", mock_info("mock", true, false), mock_source_factory());

        let created = registry.create_source(&ConnectorConfig::new("mock"));
        assert!(created.is_ok());
    }

    #[test]
    fn test_register_and_create_sink() {
        let registry = ConnectorRegistry::new();
        registry.register_sink("mock", mock_info("mock", false, true), mock_sink_factory());

        let created = registry.create_sink(&ConnectorConfig::new("mock"));
        assert!(created.is_ok());
    }

    #[test]
    fn test_create_unknown_connector() {
        let registry = ConnectorRegistry::new();
        let missing = ConnectorConfig::new("nonexistent");

        // Nothing is registered, so both catalogs must reject the lookup.
        assert!(registry.create_source(&missing).is_err());
        assert!(registry.create_sink(&missing).is_err());
    }

    #[test]
    fn test_list_connectors() {
        let registry = ConnectorRegistry::new();
        registry.register_source(
            "kafka",
            mock_info("kafka", true, false),
            mock_source_factory(),
        );
        registry.register_sink("delta", mock_info("delta", false, true), mock_sink_factory());

        // Sources and sinks are separate catalogs: each holds exactly one name.
        let source_names = registry.list_sources();
        assert_eq!(source_names.len(), 1);
        assert!(source_names.iter().any(|n| n == "kafka"));

        let sink_names = registry.list_sinks();
        assert_eq!(sink_names.len(), 1);
        assert!(sink_names.iter().any(|n| n == "delta"));
    }

    #[test]
    fn test_connector_info() {
        let registry = ConnectorRegistry::new();
        registry.register_source(
            "kafka",
            mock_info("kafka", true, false),
            mock_source_factory(),
        );

        let info = registry
            .source_info("kafka")
            .expect("info for a registered source");
        assert_eq!(info.name, "kafka");

        assert!(registry.source_info("nonexistent").is_none());
    }

    #[test]
    fn test_format_registry() {
        let registry = ConnectorRegistry::new();

        assert!(registry.create_deserializer("json").is_ok());
        assert!(registry.create_serializer("csv").is_ok());
        assert!(registry.create_deserializer("unknown").is_err());
    }

    // ── Table source factory tests ──

    #[test]
    fn test_register_and_create_table_source() {
        let registry = ConnectorRegistry::new();
        registry.register_table_source(
            "mock",
            mock_info("mock", true, false),
            empty_table_source_factory(),
        );

        let created = registry.create_table_source(&ConnectorConfig::new("mock"));
        assert!(created.is_ok());
    }

    #[test]
    fn test_create_unknown_table_source() {
        let registry = ConnectorRegistry::new();
        let outcome = registry.create_table_source(&ConnectorConfig::new("nonexistent"));

        if let Err(e) = outcome {
            assert!(e.to_string().contains("unknown table source"), "got: {e}");
        } else {
            panic!("Expected error for unknown table source");
        }
    }

    #[test]
    fn test_list_table_sources() {
        let registry = ConnectorRegistry::new();
        assert!(registry.list_table_sources().is_empty());

        registry.register_table_source(
            "mock-table",
            mock_info("mock-table", true, false),
            empty_table_source_factory(),
        );

        let names = registry.list_table_sources();
        assert_eq!(names.len(), 1);
        assert!(names.iter().any(|n| n == "mock-table"));
    }
}